File Coverage

blib/lib/Mango/Bulk.pm
Criterion Covered Total %
statement 12 99 12.1
branch 0 32 0.0
condition 0 28 0.0
subroutine 4 21 19.0
pod 8 8 100.0
total 24 188 12.7


line stmt bran cond sub pod time code
1             package Mango::Bulk;
2 9     9   222 use Mojo::Base -base;
  9         16  
  9         57  
3              
4 9     9   1208 use Carp 'croak';
  9         33  
  9         444  
5 9     9   48 use Mango::BSON qw(bson_doc bson_encode bson_oid bson_raw);
  9         18  
  9         426  
6 9     9   4251 use Mojo::IOLoop;
  9         1153626  
  9         59  
7              
8             has 'collection';
9             has ordered => 1;
10              
11             sub execute {
12 0     0 1   my ($self, $cb) = @_;
13              
14             # Full results shared with all operations
15 0           my $full = {upserted => [], writeConcernErrors => [], writeErrors => []};
16 0           $full->{$_} = 0 for qw(nInserted nMatched nModified nRemoved nUpserted);
17              
18             # Non-blocking
19 0 0         if ($cb) {
20 0     0     return Mojo::IOLoop->next_tick(sub { shift; $self->$cb(undef, $full) })
  0            
21 0 0         unless my $group = shift @{$self->{ops}};
  0            
22 0           return $self->_next($group, $full, $cb);
23             }
24              
25             # Blocking
26 0           my $db = $self->collection->db;
27 0           my $protocol = $db->mango->protocol;
28 0           while (my $group = shift @{$self->{ops}}) {
  0            
29 0           my ($type, $offset, $command) = $self->_group($group);
30 0           _merge($type, $offset, $full, $db->command($command));
31 0 0         if (my $err = $protocol->write_error($full)) { croak $err }
  0            
32             }
33              
34 0           return $full;
35             }
36              
37 0     0 1   sub find { shift->_set(query => shift) }
38              
39             sub insert {
40 0     0 1   my ($self, $doc) = @_;
41 0   0       $doc->{_id} //= bson_oid;
42 0           return $self->_op(insert => $doc);
43             }
44              
45 0     0 1   sub remove { shift->_remove(0) }
46 0     0 1   sub remove_one { shift->_remove(1) }
47              
48 0     0 1   sub update { shift->_update(\1, @_) }
49 0     0 1   sub update_one { shift->_update(\0, @_) }
50              
51 0     0 1   sub upsert { shift->_set(upsert => 1) }
52              
53             sub _group {
54 0     0     my ($self, $group) = @_;
55              
56 0           my ($type, $offset) = splice @$group, 0, 2;
57 0           my $collection = $self->collection;
58 0 0         return $type, $offset, bson_doc $type => $collection->name,
    0          
59             $type eq 'insert' ? 'documents' : "${type}s" => $group,
60             ordered => $self->ordered ? \1 : \0,
61             writeConcern => $collection->db->build_write_concern;
62             }
63              
64             sub _merge {
65 0     0     my ($type, $offset, $full, $result) = @_;
66              
67             # Insert
68 0 0         if ($type eq 'insert') { $full->{nInserted} += $result->{n} }
  0 0          
    0          
69              
70             # Update
71             elsif ($type eq 'update') {
72 0           $full->{nModified} += $result->{n};
73              
74             # Upsert
75 0 0         if (my $upserted = $result->{upserted}) {
76 0           push @{$full->{upserted}}, map { $_->{index} += $offset; $_ } @$upserted;
  0            
  0            
  0            
77 0           $full->{nUpserted} += @$upserted;
78 0           $full->{nMatched} += $result->{n} - @$upserted;
79             }
80              
81 0           else { $full->{nMatched} += $result->{n} }
82             }
83              
84             # Delete
85 0           elsif ($type eq 'delete') { $full->{nRemoved} += $result->{n} }
86              
87             # Errors
88 0           push @{$full->{writeConcernErrors}}, $result->{writeConcernError}
89 0 0         if $result->{writeConcernError};
90 0           push @{$full->{writeErrors}},
91 0           map { $_->{index} += $offset; $_ } @{$result->{writeErrors}};
  0            
  0            
  0            
92             }
93              
94             sub _next {
95 0     0     my ($self, $group, $full, $cb) = @_;
96              
97 0           my ($type, $offset, $command) = $self->_group($group);
98             $self->collection->db->command(
99             $command => sub {
100 0     0     my ($db, $err, $result) = @_;
101              
102 0 0         _merge($type, $offset, $full, $result) unless $err;
103 0   0       $err ||= $self->collection->db->mango->protocol->write_error($full);
104 0 0         return $self->$cb($err, $full) if $err;
105              
106 0 0         return $self->$cb(undef, $full) unless my $next = shift @{$self->{ops}};
  0            
107 0           $self->_next($next, $full, $cb);
108             }
109 0           );
110             }
111              
112             sub _op {
113 0     0     my ($self, $type, $doc) = @_;
114              
115             # Pre-encode documents
116 0           my $mango = $self->collection->db->mango;
117 0           my $bson_max = $mango->max_bson_size;
118 0           my $batch_max = $mango->max_write_batch_size;
119 0   0       my $ops = $self->{ops} ||= [];
120 0 0         my $previous = @$ops ? $ops->[-1] : [];
121 0           my $bson = bson_encode $doc;
122 0           my $size = length $bson;
123 0   0       my $new = ($self->{size} // 0) + $size;
124 0   0       my $limit = $new > $bson_max || @$previous >= $batch_max + 2;
125              
126             # Group documents based on type and limits
127             push @$ops, [$type, $self->{offset} || 0] and delete $self->{size}
128 0 0 0       if !@$previous || $previous->[0] ne $type || $limit;
      0        
      0        
      0        
129 0           push @{$ops->[-1]}, bson_raw $bson;
  0            
130 0           $self->{size} += $size;
131 0           $self->{offset}++;
132              
133 0           return $self;
134             }
135              
136             sub _remove {
137 0     0     my ($self, $limit) = @_;
138 0   0       my $query = delete $self->{query} // {};
139 0           return $self->_op(delete => {q => $query, limit => $limit});
140             }
141              
142             sub _set {
143 0     0     my ($self, $key, $value) = @_;
144 0           $self->{$key} = $value;
145 0           return $self;
146             }
147              
148             sub _update {
149 0     0     my ($self, $multi, $update) = @_;
150 0   0       my $query = delete $self->{query} // {};
151 0 0         my $upsert = delete $self->{upsert} ? \1 : \0;
152 0           return $self->_op(
153             update => {q => $query, u => $update, multi => $multi, upsert => $upsert});
154             }
155              
156             1;
157              
158             =encoding utf8
159              
160             =head1 NAME
161              
162             Mango::Bulk - MongoDB bulk operations
163              
164             =head1 SYNOPSIS
165              
166             use Mango::Bulk;
167              
168             my $bulk = Mango::Bulk->new(collection => $collection);
169             $bulk->insert({foo => 'bar'})->insert({foo => 'baz'})->execute;
170              
171             =head1 DESCRIPTION
172              
173             L is a container for MongoDB bulk operations, all operations will
174             be automatically grouped so they don't exceed L.
175              
176             =head1 ATTRIBUTES
177              
178             L implements the following attributes.
179              
180             =head2 collection
181              
182             my $collection = $bulk->collection;
183             $bulk = $bulk->collection(Mango::Collection->new);
184              
185             L object this bulk operation belongs to.
186              
187             =head2 ordered
188              
189             my $bool = $bulk->ordered;
190             $bulk = $bulk->ordered($bool);
191              
192             Bulk operations are ordered, defaults to a true value.
193              
194             =head1 METHODS
195              
196             L inherits all methods from L and implements the
197             following new ones.
198              
199             =head2 execute
200              
201             my $results = $bulk->execute;
202              
203             Execute bulk operations. You can also append a callback to perform operation
204             non-blocking.
205              
206             $bulk->execute(sub {
207             my ($bulk, $err, $results) = @_;
208             ...
209             });
210             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
211              
212             =head2 find
213              
214             $bulk = $bulk->find({foo => 'bar'});
215              
216             Query for next update or remove operation.
217              
218             =head2 insert
219              
220             $bulk = $bulk->insert({foo => 'bar'});
221              
222             Insert document.
223              
224             =head2 remove
225              
226             $bulk = $bulk->remove;
227              
228             Remove multiple documents.
229              
230             =head2 remove_one
231              
232             $bulk = $bulk->remove_one;
233              
234             Remove one document.
235              
236             =head2 update
237              
238             $bulk = $bulk->update({foo => 'bar'});
239              
240             Update multiple documents.
241              
242             =head2 update_one
243              
244             $bulk = $bulk->update_one({foo => 'baz'});
245              
246             Update one document.
247              
248             =head2 upsert
249              
250             $bulk = $bulk->upsert;
251              
252             Next update operation will be an C.
253              
254             =head1 SEE ALSO
255              
256             L, L, L.
257              
258             =cut