File Coverage

blib/lib/MongoDB/Op/_BulkWrite.pm
Criterion Covered Total %
statement 51 204 25.0
branch 0 72 0.0
condition 0 28 0.0
subroutine 17 25 68.0
pod 0 2 0.0
total 68 331 20.5


line stmt bran cond sub pod time code
1             # Copyright 2014 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   373 use strict;
  59         125  
  59         1659  
16 59     59   269 use warnings;
  59         119  
  59         2012  
17             package MongoDB::Op::_BulkWrite;
18              
19             # Encapsulate a multi-document multi-operation write; returns a
20             # MongoDB::BulkWriteResult object
21              
22 59     59   284 use version;
  59         107  
  59         14206  
23             our $VERSION = 'v2.2.2';
24              
25 59     59   4583 use Moo;
  59         136  
  59         327  
26              
27 59     59   18178 use MongoDB::Error;
  59         157  
  59         7035  
28 59     59   22555 use MongoDB::BulkWriteResult;
  59         183  
  59         2082  
29 59     59   490 use MongoDB::UnacknowledgedResult;
  59         130  
  59         1283  
30 59     59   26981 use MongoDB::Op::_InsertOne;
  59         199  
  59         1985  
31 59     59   22921 use MongoDB::Op::_Update;
  59         202  
  59         2323  
32 59     59   22992 use MongoDB::Op::_Delete;
  59         194  
  59         2041  
33 59     59   438 use MongoDB::_Protocol;
  59         128  
  59         1036  
34 59     59   264 use MongoDB::_Constants;
  59         124  
  59         6578  
35 59         328 use MongoDB::_Types qw(
36             Boolish
37 59     59   337 );
  59         120  
38 59         264 use Types::Standard qw(
39             ArrayRef
40             InstanceOf
41 59     59   53234 );
  59         125  
42 59     59   43160 use Safe::Isa;
  59         152  
  59         6970  
43 59     59   361 use boolean;
  59         121  
  59         482  
44              
45 59     59   3379 use namespace::clean;
  59         110  
  59         301  
46              
# Queued operations, in submission order. Each entry is unpacked elsewhere
# in this class as a [ $type, $doc ] pair.
has queue => ( is => 'ro', isa => ArrayRef, required => 1 );

# True for an ordered bulk write, false for an unordered one.
has ordered => ( is => 'ro', isa => Boolish, required => 1 );

# Client object through which write commands are sent.
has client => (
    is       => 'ro',
    isa      => InstanceOf ['MongoDB::MongoClient'],
    required => 1,
);

# Read/write switch consulted by _is_retryable; on by default.
has _retryable => ( is => 'rw', isa => Boolish, default => 1 );
70              
# Compose the roles one at a time, in exactly this order (a separate
# 'with' call per role rather than a single call with the whole list).
for my $role (
    'MongoDB::Role::_PrivateConstructor',
    'MongoDB::Role::_CollectionOp',
    'MongoDB::Role::_WriteOp',
    'MongoDB::Role::_UpdatePreEncoder',
    'MongoDB::Role::_InsertPreEncoder',
    'MongoDB::Role::_BypassValidation',
  )
{
    with($role);
}
79              
# Returns a true value only when the write is acknowledged AND the
# _retryable attribute is set; otherwise returns whichever of the two
# checks came back false.
sub _is_retryable {
    my ($self) = @_;

    my $acknowledged = $self->_should_use_acknowledged_write;

    return $acknowledged && $self->_retryable;
}
84              
# Reports whether any queued update or delete operation carries a defined
# 'collation' field. Insert operations are ignored. Returns a boolean.
sub has_collation {
    my ($self) = @_;

    my $with_collation = 0;
    for my $queued ( @{ $self->queue } ) {
        my ( $type, $doc ) = @$queued;
        my $is_update_or_delete = $type eq "update" || $type eq "delete";
        $with_collation++
          if $is_update_or_delete && defined $doc->{collation};
    }

    return !!$with_collation;
}
92              
# Runs the whole queue against $link and returns the accumulated
# MongoDB::BulkWriteResult, or a MongoDB::UnacknowledgedResult when the
# write is not acknowledged.
#
# Throws MongoDB::UsageError when a collation is requested but the link
# does not support it, or when a collation is combined with an
# unacknowledged write. Write errors and write concern errors found in the
# accumulated result are raised via its assert_* methods.
sub execute {
    my ( $self, $link ) = @_;

    $link or Carp::confess("NO LINK");

    if ( $self->has_collation ) {
        unless ( $link->supports_collation ) {
            MongoDB::UsageError->throw(
                "MongoDB host '" . $link->address . "' doesn't support collation" );
        }

        unless ( $self->_should_use_acknowledged_write ) {
            MongoDB::UsageError->throw(
                "Unacknowledged bulk writes that specify a collation are not allowed");
        }
    }

    my $use_write_cmd = $link->supports_write_commands;

    # Legacy write ops can never report a modified count, so it starts out
    # undef for them; with write commands it starts at 0 so counts add up
    # normally (a mongos on a mixed topology failing to report it is handled
    # when results are merged). Unacknowledged writes still accumulate a
    # result to get bulk semantics right; it is discarded further down.
    my $initial_modified = $use_write_cmd ? 0 : undef;
    my $result           = MongoDB::BulkWriteResult->_new(
        modified_count       => $initial_modified,
        write_errors         => [],
        write_concern_errors => [],
        op_count             => 0,
        batch_count          => 0,
        inserted_count       => 0,
        upserted_count       => 0,
        matched_count        => 0,
        deleted_count        => 0,
        upserted             => [],
        inserted             => [],
    );

    my $batcher = $self->ordered ? '_batch_ordered' : '_batch_unordered';
    my @batches = $self->$batcher( $link, $self->queue );

    my $runner =
      $use_write_cmd ? '_execute_write_command_batch' : '_execute_legacy_batch';
    $self->$runner( $link, $_, $result ) for @batches;

    unless ( $self->_should_use_acknowledged_write ) {
        return MongoDB::UnacknowledgedResult->_new(
            write_errors         => [],
            write_concern_errors => [],
        );
    }

    # An error can only still be pending here for unordered bulk ops.
    $result->assert_no_write_error;

    # Write concern errors are raised once, for the bulk as a whole.
    $result->assert_no_write_concern_error;

    return $result;
}
157              
# For each queued operation type: [ write command name, key of the command
# document under which the batched operations are sent ].
my %OP_MAP;
$OP_MAP{insert} = [ 'insert', 'documents' ];
$OP_MAP{update} = [ 'update', 'updates' ];
$OP_MAP{delete} = [ 'delete', 'deletes' ];
163              
164             # _execute_write_command_batch may split batches if they are too large and
165             # execute them separately
166              
# Sends one batch ( [ $type, $docs, $idx_map ] ) as a write command and
# merges the outcome into $result. A chunk that is rejected with a
# MongoDB::_CommandSizeError is split into smaller chunks which are then
# sent individually. Returns nothing.
#
# Throws MongoDB::DocumentError when a single-operation chunk is still too
# large, re-throws any other send failure, and (for ordered bulks) raises
# the first write error as soon as it has been merged.
sub _execute_write_command_batch {
    my ( $self, $link, $batch, $result ) = @_;

    my ( $type, $docs, $idx_map ) = @$batch;
    my ( $cmd, $op_key ) = @{ $OP_MAP{$type} };

    my $boolean_ordered = boolean( $self->ordered );
    my ( $db_name, $coll_name, $wc ) =
      ( $self->db_name, $self->coll_name, $self->write_concern );

    # Work lists: chunks still to send, and the matching slices of the
    # original-queue index map. Both are kept in step.
    my @pending_chunks   = ($docs);
    my @pending_idx_maps = ($idx_map);

    my $max_bson_size                = $link->max_bson_object_size;
    my $supports_document_validation = $link->supports_document_validation;

    while (@pending_chunks) {
        my $chunk         = shift @pending_chunks;
        my $chunk_idx_map = shift @pending_idx_maps;

        # Updates and inserts are pre-encoded one by one because they need
        # custom BSON handling that cannot be applied to the command as a
        # whole. A chunk may pass through here again after a split, so
        # anything that is already a BSON::Raw is left alone.
        if ( $cmd eq 'update' ) {
            for my $update_op (@$chunk) {
                next if ref( $update_op->{u} ) eq 'BSON::Raw';

                # 'is_replace' must not be part of the command sent to the
                # server, so it is removed while encoding.
                my $is_replace = delete $update_op->{is_replace};
                $update_op->{u} =
                  $self->_pre_encode_update( $max_bson_size, $update_op->{u}, $is_replace );
            }
        }
        elsif ( $cmd eq 'insert' ) {
            for my $insert_doc (@$chunk) {
                next if ref($insert_doc) eq 'BSON::Raw';
                $insert_doc = $self->_pre_encode_insert( $max_bson_size, $insert_doc, '.' );
            }
        }

        my $cmd_doc = [
            $cmd    => $coll_name,
            $op_key => $chunk,
            ordered => $boolean_ordered,
            @{ $wc->as_args },
        ];

        if ( $cmd eq 'insert' || $cmd eq 'update' ) {
            $cmd_doc = $self->_maybe_bypass( $supports_document_validation, $cmd_doc );
        }

        my $op = MongoDB::Op::_Command->_new(
            db_name             => $db_name,
            query               => $cmd_doc,
            query_flags         => {},
            bson_codec          => $self->bson_codec,
            session             => $self->session,
            retryable_write     => $self->retryable_write,
            monitoring_callback => $self->monitoring_callback,
        );

        my $cmd_result = eval {
            my $send =
              $self->_is_retryable ? 'send_retryable_write_op' : 'send_write_op';
            $self->client->$send($op);
        };

        if ( !$cmd_result ) {
            my $error = $@ || "Unknown error";

            # A command size error is raised before anything reaches the
            # database, i.e. ahead of any retryable-write error handling.
            if ( $error->$_isa("MongoDB::_CommandSizeError") ) {
                if ( @$chunk == 1 ) {
                    MongoDB::DocumentError->throw(
                        message  => "document too large",
                        document => $chunk->[0],
                    );
                }

                unshift @pending_chunks, $self->_split_chunk( $chunk, $error->size );
                unshift @pending_idx_maps,
                  $self->_split_chunk( $chunk_idx_map, $error->size );

                redo;    # restart the loop body with the first split chunk
            }

            # The original error is going to be re-thrown regardless, but
            # write errors outrank write concern errors, so let
            # BulkWriteResult parse the failed result and raise those first.
            if ( $error->$_can('result') ) {
                MongoDB::BulkWriteResult->_parse_cmd_result(
                    op       => $type,
                    op_count => scalar @$chunk,
                    result   => $error->result,
                    cmd_doc  => $cmd_doc,
                    idx_map  => $chunk_idx_map,
                )->assert_no_write_error;
            }

            die $error;
        }

        my $r = MongoDB::BulkWriteResult->_parse_cmd_result(
            op       => $type,
            op_count => scalar @$chunk,
            result   => $cmd_result,
            cmd_doc  => $cmd_doc,
            idx_map  => $chunk_idx_map,
        );

        # Attach the operation that failed to each reported write error.
        if ( $r->count_write_errors ) {
            $_->{op} = $chunk->[ $_->{index} ] for @{ $r->write_errors };
        }

        $result->_merge_result($r);
        $result->assert_no_write_error if $boolean_ordered;
    }

    return;
}
293              
# Splits @$chunk into smaller array refs, sized from the average per-element
# share of $size so that each piece is estimated to fit within
# MAX_BSON_WIRE_SIZE.
#
#   $chunk - array ref to split; it is consumed (emptied via splice)
#   $size  - total size reported for the command built from the whole chunk
#
# Returns a list of array refs that together hold every element of the
# original chunk, in the original order. Returns an empty list for an empty
# chunk.
sub _split_chunk {
    my ( $self, $chunk, $size ) = @_;

    # Nothing to split; also avoids dividing by a zero element count.
    return unless @$chunk;

    my $avg_cmd_size       = $size / @$chunk;
    my $new_cmds_per_chunk = int( MAX_BSON_WIRE_SIZE / $avg_cmd_size );

    # When the average element is larger than the wire limit the estimate
    # truncates to 0. splice() with a length of 0 removes nothing, so the
    # loop below would never terminate; always make progress by taking at
    # least one element per piece.
    $new_cmds_per_chunk = 1 if $new_cmds_per_chunk < 1;

    my @split_chunks;
    while (@$chunk) {
        push @split_chunks, [ splice( @$chunk, 0, $new_cmds_per_chunk ) ];
    }

    return @split_chunks;
}
307              
# Groups the queue into batches while preserving operation order. A new
# batch starts whenever the operation type changes or the current batch has
# reached the link's max_write_batch_size.
#
# Returns a list of [ $type, \@docs, \@queue_indices ] entries, where the
# indices are positions in the original queue.
sub _batch_ordered {
    my ( $self, $link, $queue ) = @_;

    my $limit = $link->max_write_batch_size;

    my @batches;
    my ( $current_type, $current_size ) = ( '', 0 );

    for my $position ( 0 .. $#{$queue} ) {
        my ( $type, $doc ) = @{ $queue->[$position] };

        if ( $type eq $current_type && $current_size != $limit ) {
            # Same kind of operation and still room: extend the last batch.
            push @{ $batches[-1][1] }, $doc;
            push @{ $batches[-1][2] }, $position;
            $current_size++;
            next;
        }

        push @batches, [ $type, [$doc], [$position] ];
        ( $current_type, $current_size ) = ( $type, 1 );
    }

    return @batches;
}
334              
# Groups the queue into batches by operation type, ignoring the relative
# order of different types. Within a type, a new batch is opened whenever
# the last one has reached the link's max_write_batch_size.
#
# Returns a list of [ $type, \@docs, \@queue_indices ] entries, where the
# indices are positions in the original queue. Types are emitted in hash
# key order; a type is skipped when its last batch is empty.
sub _batch_unordered {
    my ( $self, $link, $queue ) = @_;

    # Every known type starts with a single empty batch and a matching
    # empty list of queue positions.
    my %docs_for      = map { $_ => [ [] ] } keys %OP_MAP;
    my %positions_for = map { $_ => [ [] ] } keys %OP_MAP;

    my $limit = $link->max_write_batch_size;

    for my $position ( 0 .. $#{$queue} ) {
        my ( $type, $doc ) = @{ $queue->[$position] };

        # Open a fresh batch first when the last one is already full.
        if ( @{ $docs_for{$type}[-1] } == $limit ) {
            push @{ $docs_for{$type} },      [];
            push @{ $positions_for{$type} }, [];
        }

        push @{ $docs_for{$type}[-1] },      $doc;
        push @{ $positions_for{$type}[-1] }, $position;
    }

    # Hash key order decides which type comes first, so the relative order
    # of insert/update/delete batches is not fixed.
    my @batches;
    for my $type ( keys %docs_for ) {
        my $groups = $docs_for{$type};
        next unless @{ $groups->[-1] };

        for my $n ( 0 .. $#{$groups} ) {
            push @batches, [
                $type,
                $groups->[$n],
                $positions_for{$type}[$n],    # indices from the original queue
            ];
        }
    }

    return @batches;
}
367              
# Executes one batch ( [ $type, $docs, ... ] ) using one legacy write
# operation per document and merges each outcome into $result. Returns
# nothing.
#
# For an unacknowledged write concern the operations are sent with a proxy
# write concern of w => 1 so that an ordered bulk can stop at the first
# failure; in that mode nothing is merged into $result and failures are not
# reported to the caller. For acknowledged writes, an ordered bulk raises
# the first write error as soon as it has been merged.
#
# Any failure that does not carry a write result is re-thrown, except an
# "exceeds maximum size" error during an unacknowledged write.
sub _execute_legacy_batch {
    my ( $self, $link, $batch, $result ) = @_;
    my ( $type, $docs ) = @$batch;
    my $ordered = $self->ordered;

    # if write concern is not safe, we have to proxy with a safe one so that
    # we can interrupt ordered bulks, even while ignoring the actual error
    my $wc  = $self->write_concern;
    my $w_0 = !$wc->is_acknowledged;
    if ($w_0) {
        my $wc_args = $wc->as_args();

        # Work on a copy of the options so that forcing w => 1 cannot write
        # through to whatever structure as_args handed back.
        my $wcs = @$wc_args ? { %{ $wc_args->[1] || {} } } : {};
        $wcs->{w} = 1;
        $wc = MongoDB::WriteConcern->new($wcs);
    }

    # Constructor arguments shared by every operation in this batch.
    my %common = (
        db_name             => $self->db_name,
        coll_name           => $self->coll_name,
        full_name           => $self->db_name . "." . $self->coll_name,
        write_concern       => $wc,
        bson_codec          => $self->bson_codec,
        monitoring_callback => $self->monitoring_callback,
    );

    # XXX successive inserts ought to get batched up, up to the max size for
    # batch, but we have no feedback on max size to know how many to put
    # together.

    for my $doc (@$docs) {

        my $op;
        if ( $type eq 'insert' ) {
            $op = MongoDB::Op::_InsertOne->_new( %common, document => $doc );
        }
        elsif ( $type eq 'update' ) {
            $op = MongoDB::Op::_Update->_new(
                %common,
                filter     => $doc->{q},
                update     => $doc->{u},
                multi      => $doc->{multi},
                upsert     => $doc->{upsert},
                is_replace => $doc->{is_replace},
            );
        }
        elsif ( $type eq 'delete' ) {
            $op = MongoDB::Op::_Delete->_new(
                %common,
                filter   => $doc->{q},
                just_one => !!$doc->{limit},
            );
        }

        # NOTE: this used to be "eval { ... } or do { ...; return ...; }".
        # A 'return' inside a do-block returns from the enclosing sub, so the
        # first failure silently ended the whole batch and its result was
        # never merged. The outcome is now assigned explicitly instead.
        my $op_result = eval { $op->execute($link) };
        if ( !$op_result ) {
            my $error = $@ || "Unknown error";
            if (   $error->$_isa("MongoDB::DatabaseError")
                && $error->result->does("MongoDB::Role::_WriteResult") )
            {
                # The failure carries a write result: process it like any
                # other result so its write errors get recorded.
                $op_result = $error->result;
            }
            elsif ( $w_0 && $error =~ /exceeds maximum size/ ) {
                # Unacknowledged write of an oversized document: swallow the
                # error; the undef result stops an ordered bulk below.
                # (The match is against $error; it used to test $_.)
                $op_result = undef;
            }
            else {
                die $error;
            }
        }

        my $gle_result =
          $op_result ? MongoDB::BulkWriteResult->_parse_write_op($op_result) : undef;

        # Even for {w:0}, if the batch is ordered we have to break on the first
        # error, but we don't throw the error to the user.
        if ($w_0) {
            last if $ordered && ( !$gle_result || $gle_result->count_write_errors );
        }
        else {
            $result->_merge_result($gle_result);
            $result->assert_no_write_error if $ordered;
        }
    }

    return;
}
461              
462             1;