File Coverage

blib/lib/Mango.pm
Criterion Covered Total %
statement 36 214 16.8
branch 0 96 0.0
condition 1 71 1.4
subroutine 12 40 30.0
pod 7 7 100.0
total 56 428 13.0


line stmt bran cond sub pod time code
1             package Mango;
2 9     9   1722142 use Mojo::Base 'Mojo::EventEmitter';
  9         102  
  9         81  
3              
4 9     9   14130 use Carp 'croak';
  9         16  
  9         367  
5 9     9   5785 use Hash::Util::FieldHash;
  9         9354  
  9         397  
6 9     9   3864 use Mango::BSON 'bson_doc';
  9         31  
  9         681  
7 9     9   4973 use Mango::Database;
  9         32  
  9         79  
8 9     9   4268 use Mango::Protocol;
  9         25  
  9         58  
9 9     9   282 use Mojo::IOLoop;
  9         18  
  9         52  
10 9     9   4317 use Mojo::URL;
  9         64088  
  9         76  
11 9     9   347 use Mojo::Util 'dumper';
  9         17  
  9         362  
12 9     9   53 use Scalar::Util 'weaken';
  9         18  
  9         476  
13              
14 9   50 9   50 use constant DEBUG => $ENV{MANGO_DEBUG} || 0;
  9         20  
  9         734  
15 9     9   68 use constant DEFAULT_PORT => 27017;
  9         25  
  9         29549  
16              
17             has connect_opt => sub { [] };
18             has default_db => 'admin';
19             has hosts => sub { [['localhost']] };
20             has [qw(inactivity_timeout j)] => 0;
21             has ioloop => sub { Mojo::IOLoop->new };
22             has max_bson_size => 16777216;
23             has max_connections => 5;
24             has [qw(max_write_batch_size wtimeout)] => 1000;
25             has protocol => sub { Mango::Protocol->new };
26             has w => 1;
27              
28             # Private variables are not visible in the object's dump. This
29             # is good for security.
30             Hash::Util::FieldHash::fieldhash my %AUTH;
31              
32             our $VERSION = '1.30';
33              
34 0     0     sub DESTROY { shift->_cleanup }
35              
36 0 0   0 1   sub backlog { scalar @{shift->{queue} || []} }
  0            
37              
38             sub db {
39 0     0 1   my ($self, $name) = @_;
40 0   0       $name //= $self->default_db;
41 0           my $db = Mango::Database->new(mango => $self, name => $name);
42 0           weaken $db->{mango};
43 0           return $db;
44             }
45              
46             sub from_string {
47 0     0 1   my ($self, $str, @extra) = @_;
48              
49             # Protocol
50 0 0         return $self unless $str;
51 0           my $url = Mojo::URL->new($str);
52 0 0         croak qq{Invalid MongoDB connection string "$str"}
53             unless $url->protocol eq 'mongodb';
54              
55             # Hosts
56 0           my @hosts;
57             /^([^,:]+)(?::(\d+))?/ and push @hosts, $2 ? [$1, $2] : [$1]
58 0 0 0       for split /,/, join(':', map { $_ // '' } $url->host, $url->port);
  0   0        
59 0 0         $self->hosts(\@hosts) if @hosts;
60              
61             # Database
62 0 0         if (my $db = $url->path->parts->[0]) { $self->default_db($db) }
  0            
63              
64             # User and password
65 0 0 0       if (($url->userinfo // '') =~ /^([^:]+):([^:]+)$/) {
66 0           require Mango::Auth::SCRAM;
67 0           $self->_auth(Mango::Auth::SCRAM->new)
68             ->_auth->_credentials([$self->default_db, $1, $2]);
69             }
70              
71             # Options
72 0           my $query = $url->query;
73 0 0         if (my $j = $query->param('journal')) { $self->j($j) }
  0            
74 0 0         if (my $w = $query->param('w')) { $self->w($w) }
  0            
75 0 0         if (my $timeout = $query->param('wtimeoutMS')) { $self->wtimeout($timeout) }
  0            
76              
77             # Other connection options like TLS
78 0 0         if (@extra) { $self->connect_opt(\@extra) }
  0            
79              
80 0           return $self;
81             }
82              
83 0     0 1   sub get_more { shift->_op('get_more', 1, @_) }
84              
85 0     0 1   sub kill_cursors { shift->_op('kill_cursors', 0, @_) }
86              
87 0     0 1   sub new { shift->SUPER::new->from_string(@_) }
88              
89 0     0 1   sub query { shift->_op('query', 1, @_) }
90              
91             sub _auth {
92 0     0     my ($self, $mode) = @_;
93 0 0         return $AUTH{$self} unless $mode;
94              
95 0           $AUTH{$self} = $mode;
96 0           $AUTH{$self}->mango($self);
97 0           weaken $AUTH{$self}->{mango};
98 0           return $self;
99             }
100              
101             sub _build {
102 0     0     my ($self, $name) = (shift, shift);
103 0           my $next = $self->_id;
104 0           warn "-- Operation #$next ($name)\n@{[dumper [@_]]}" if DEBUG;
105 0           my $method = "build_$name";
106 0           return ($next, $self->protocol->$method($next, @_));
107             }
108              
109             sub _cleanup {
110 0     0     my $self = shift;
111 0 0         return unless $self->_loop(0);
112              
113             # Clean up connections
114 0           delete $self->{pid};
115 0           my $connections = delete $self->{connections};
116 0           for my $c (keys %$connections) {
117 0           my $loop = $self->_loop($connections->{$c}{nb});
118 0 0         $loop->remove($c) if $loop;
119             }
120              
121             # Clean up active operations
122 0   0       my $queue = delete $self->{queue} || [];
123             $_->{last} && !$_->{start} && unshift @$queue, $_->{last}
124 0   0       for values %$connections;
      0        
125 0           $self->_finish(undef, $_->{cb}, 'Premature connection close') for @$queue;
126             }
127              
128             sub _close {
129 0     0     my ($self, $id) = @_;
130              
131 0 0         return unless my $c = delete $self->{connections}{$id};
132 0           my $last = $c->{last};
133 0 0         $self->_finish(undef, $last->{cb}, 'Premature connection close') if $last;
134 0 0         $self->_connect($c->{nb}) if @{$self->{queue}};
  0            
135             }
136              
137             sub _connect {
138 0     0     my ($self, $nb, $hosts) = @_;
139              
140 0   0       my ($host, $port) = @{shift @{$hosts ||= [@{$self->hosts}]}};
  0            
  0            
  0            
141 0   0       $port //= DEFAULT_PORT;
142 0           my @extra = @{$self->connect_opt};
  0            
143 0           weaken $self;
144 0           my $id;
145             $id = $self->_loop($nb)->client(
146             {address => $host, port => $port, @extra} => sub {
147 0     0     my ($loop, $err, $stream) = @_;
148              
149             # Connection error (try next server)
150 0 0         if ($err) {
151 0 0         return $self->_error($id, $err) unless @$hosts;
152 0           delete $self->{connections}{$id};
153 0           return $self->_connect($nb, $hosts);
154             }
155              
156             # Connection established
157 0           $stream->timeout($self->inactivity_timeout);
158 0 0         $stream->on(close => sub { $self && $self->_close($id) });
  0            
159 0 0         $stream->on(error => sub { $self && $self->_error($id, pop) });
  0            
160 0           $stream->on(read => sub { $self->_read($id, pop) });
  0            
161              
162             # Check node information with "isMaster" command
163 0           my $cb = sub { shift->_master($id, $nb, $hosts, pop) };
  0            
164 0           $self->_fast($id, $self->default_db, {isMaster => 1}, $cb);
165             }
166 0           );
167 0           $self->{connections}{$id} = { nb => $nb, start => 1 };
168              
169 0           my $num = scalar keys %{$self->{connections}};
  0            
170 0           warn "-- New connection ($host:$port:$num)\n" if DEBUG;
171             }
172              
173             sub _error {
174 0     0     my ($self, $id, $err) = @_;
175              
176 0 0         return unless my $c = delete $self->{connections}{$id};
177 0           $self->_loop($c->{nb})->remove($id);
178              
179 0   0       my $last = $c->{last} // shift @{$self->{queue}};
  0            
180 0 0         $self->_finish(undef, $last->{cb}, $err) if $last;
181             }
182              
183             sub _fast {
184 0     0     my ($self, $id, $db, $command, $cb) = @_;
185              
186             # Handle errors
187             my $wrapper = sub {
188 0     0     my ($self, $err, $reply) = @_;
189              
190 0           my $doc = $reply->{docs}[0];
191 0   0       $err ||= $self->protocol->command_error($doc);
192 0 0         return $self->$cb(undef, $doc) unless $err;
193              
194 0 0         return unless my $last = shift @{$self->{queue}};
  0            
195 0           $self->_finish(undef, $last->{cb}, $err);
196 0           };
197              
198             # Skip the queue and run command right away
199 0           my ($next, $msg)
200             = $self->_build('query', "$db.\$cmd", {}, 0, -1, $command, {});
201             $self->{connections}{$id}{fast}
202 0           = {id => $next, safe => 1, msg => $msg, cb => $wrapper};
203 0           $self->_next;
204             }
205              
206             sub _finish {
207 0     0     my ($self, $reply, $cb, $err) = @_;
208 0   0       $self->$cb($err || $self->protocol->query_failure($reply), $reply);
209             }
210              
211 0   0 0     sub _id { $_[0]{id} = $_[0]->protocol->next_id($_[0]{id} // 0) }
212              
213 0 0   0     sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }
214              
215             sub _master {
216 0     0     my ($self, $id, $nb, $hosts, $doc) = @_;
217              
218             # Check version
219             return $self->_error($id, 'MongoDB version 3.0 required')
220 0 0 0       unless ($doc->{maxWireVersion} || 0) >= 3;
221              
222             # Continue with authentication if we are connected to the primary
223 0 0         if ($doc->{ismaster}) {
224 0 0         return $self->_auth
225             ? $self->_auth->_authenticate($id)
226             : $self->emit(connection => $id)->_next;
227             }
228              
229             # Get primary and try to connect again
230 0 0 0       unshift @$hosts, [$1, $2] if ($doc->{primary} // '') =~ /^(.+):(\d+)$/;
231 0 0         return $self->_error($id, "Couldn't find primary node") unless @$hosts;
232 0           delete $self->{connections}{$id};
233 0           $self->_loop($nb)->remove($id);
234 0           $self->_connect($nb, $hosts);
235             }
236              
237             sub _next {
238 0     0     my ($self, $op) = @_;
239              
240             # Make sure all connections are saturated
241 0 0 0       push @{$self->{queue} ||= []}, $op if $op;
  0            
242 0           my $connections = $self->{connections};
243 0           my $start;
244 0   0       $self->_write($_) and $start++ for keys %$connections;
245              
246             # Check if we need a blocking connection
247 0 0         return unless $op;
248 0           my @ids = keys %$connections;
249             return $self->_connect(0)
250 0 0 0       if !$op->{nb} && !grep { !$connections->{$_}{nb} } @ids;
  0            
251              
252             # Check if we need more non-blocking connections
253             $self->_connect(1)
254 0 0 0       if !$start && @{$self->{queue}} && @ids < $self->max_connections;
  0   0        
255             }
256              
257             sub _op {
258 0     0     my ($self, $op, $safe) = (shift, shift, shift);
259 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
260 0           my ($next, $msg) = $self->_build($op, @_);
261 0           $self->_start(
262             {id => $next, safe => $safe, msg => $msg, nb => !!$cb, cb => $cb});
263             }
264              
265             sub _read {
266 0     0     my ($self, $id, $chunk) = @_;
267              
268 0           my $c = $self->{connections}{$id};
269 0           $c->{buffer} .= $chunk;
270 0           while (my $reply = $self->protocol->parse_reply(\$c->{buffer})) {
271 0           warn "-- Client <<< Server (#$reply->{to})\n@{[dumper $reply]}" if DEBUG;
272 0 0         next unless $reply->{to} == $c->{last}{id};
273 0           $self->_finish($reply, (delete $c->{last})->{cb});
274             }
275 0           $self->_next;
276             }
277              
278             sub _start {
279 0     0     my ($self, $op) = @_;
280              
281             # Fork safety
282 0 0 0       $self->_cleanup unless ($self->{pid} //= $$) eq $$;
283              
284             # Non-blocking
285 0 0         return $self->_next($op) if $op->{cb};
286              
287             # Blocking
288 0           my ($err, $reply);
289 0     0     $op->{cb} = sub { shift->ioloop->stop; ($err, $reply) = @_ };
  0            
  0            
290 0           $self->_next($op);
291 0           $self->ioloop->start;
292 0 0         return $err ? croak $err : $reply;
293             }
294              
295             sub _write {
296 0     0     my ($self, $id) = @_;
297              
298             # Make sure connection has not been corrupted while event loop was stopped
299 0           my $c = $self->{connections}{$id};
300 0 0         return $c->{start} if $c->{last};
301 0           my $loop = $self->_loop($c->{nb});
302 0 0         return undef unless my $stream = $loop->stream($id);
303 0 0 0       if (!$loop->is_running && $stream->is_readable) {
304 0           $stream->close;
305 0           return undef;
306             }
307              
308             # Fast operation
309 0 0         delete $c->{start} unless my $last = delete $c->{fast};
310              
311             # Blocking operations have a higher precedence
312             return $c->{start}
313 0 0 0       unless $last || ($c->{nb} xor !($self->{queue}->[-1] || {})->{nb});
      0        
      0        
314 0 0 0       $last ||= $c->{nb} ? shift @{$self->{queue}} : pop @{$self->{queue}};
  0            
  0            
315              
316 0 0         return $c->{start} unless $c->{last} = $last;
317 0           warn "-- Client >>> Server (#$last->{id})\n" if DEBUG;
318 0           $stream->write(delete $last->{msg});
319              
320             # Unsafe operations are done when they are written
321 0 0         return $c->{start} if $last->{safe};
322 0           weaken $self;
323 0     0     $stream->write('', sub { $self->_finish(undef, delete($c->{last})->{cb}) });
  0            
324 0           return $c->{start};
325             }
326              
327             1;
328              
329             =encoding utf8
330              
331             =head1 NAME
332              
333             Mango - Pure-Perl non-blocking I/O MongoDB driver
334              
335             =head1 SYNOPSIS
336              
337             use Mango;
338             use feature 'state';
339              
340             # Declare a Mango helper
341             sub mango { state $m = Mango->new('mongodb://localhost:27017') }
342             # or in a Mojolicious::Lite app
343             helper mango => sub { state $m = Mango->new('mongodb://localhost:27017') };
344              
345             # Insert document
346             my $oid = mango->db('test')->collection('foo')->insert({bar => 'baz'});
347              
348             # Find document
349             my $doc = mango->db('test')->collection('foo')->find_one({bar => 'baz'});
350             say $doc->{bar};
351              
352             # Update document
353             mango->db('test')->collection('foo')
354             ->update({bar => 'baz'}, {bar => 'yada'});
355              
356             # Remove document
357             mango->db('test')->collection('foo')->remove({bar => 'yada'});
358              
359             # Insert document with special BSON types
360             use Mango::BSON ':bson';
361             my $oid = mango->db('test')->collection('foo')
362             ->insert({data => bson_bin("\x00\x01"), now => bson_time});
363              
364             # Non-blocking concurrent find
365             my $delay = Mojo::IOLoop->delay(sub {
366             my ($delay, @docs) = @_;
367             ...
368             });
369             for my $name (qw(sri marty)) {
370             my $end = $delay->begin(0);
371             mango->db('test')->collection('users')->find({name => $name})->all(sub {
372             my ($cursor, $err, $docs) = @_;
373             $end->(@$docs);
374             });
375             }
376             $delay->wait;
377              
378             # Event loops such as AnyEvent are supported through EV
379             use EV;
380             use AnyEvent;
381             my $cv = AE::cv;
382             mango->db('test')->command(buildInfo => sub {
383             my ($db, $err, $doc) = @_;
384             $cv->send($doc->{version});
385             });
386             say $cv->recv;
387              
388             =head1 DESCRIPTION
389              
390             L<Mango> is a pure-Perl non-blocking I/O MongoDB driver, optimized for use
391             with the L<Mojolicious> real-time web framework, and with multiple event loop
392             support. Since MongoDB is still changing rapidly, only the latest stable
393             version is supported.
394              
395             For MongoDB 2.6 support, use L<Mango> 1.16.
396              
397             To learn more about MongoDB you should take a look at the
398             L<MongoDB Manual|https://www.mongodb.com/docs/manual/>, the documentation included
399             in this distribution is no replacement for it.
400              
401             Look at L<Mango::Collection> for CRUD operations.
402              
403             Many arguments passed to methods as well as values of attributes get
404             serialized to BSON with L<Mango::BSON>, which provides many helper functions
405             you can use to generate data types that are not available natively in Perl.
406             All connections will be reset automatically if a new process has been forked,
407             this allows multiple processes to share the same L<Mango> object safely.
408              
409             For better scalability (epoll, kqueue) and to provide IPv6, SOCKS5 as well as
410             TLS support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.20+),
411             L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL> (1.84+) will be used
412             automatically if they are installed. Individual features can also be disabled
413             with the C<MOJO_NO_IPV6>, C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment
414             variables.
415              
416             =head1 EVENTS
417              
418             L<Mango> inherits all events from L<Mojo::EventEmitter> and can emit the
419             following new ones.
420              
421             =head2 connection
422              
423             $mango->on(connection => sub {
424             my ($mango, $id) = @_;
425             ...
426             });
427              
428             Emitted when a new connection has been established.
429              
430             =head1 ATTRIBUTES
431              
432             L<Mango> implements the following attributes.
433              
434             =head2 default_db
435              
436             my $name = $mango->default_db;
437             $mango = $mango->default_db('test');
438              
439             Default database, defaults to C<admin>.
440              
441             =head2 hosts
442              
443             my $hosts = $mango->hosts;
444             $mango = $mango->hosts([['localhost', 3000], ['localhost', 4000]]);
445              
446             Servers to connect to, defaults to C<localhost> and port C<27017>.
447              
448             =head2 inactivity_timeout
449              
450             my $timeout = $mango->inactivity_timeout;
451             $mango = $mango->inactivity_timeout(15);
452              
453             Maximum amount of time in seconds a connection can be inactive before getting
454             closed, defaults to C<0>. Setting the value to C<0> will allow connections to
455             be inactive indefinitely.
456              
457             =head2 ioloop
458              
459             my $loop = $mango->ioloop;
460             $mango = $mango->ioloop(Mojo::IOLoop->new);
461              
462             Event loop object to use for blocking I/O operations, defaults to a
463             L<Mojo::IOLoop> object.
464              
465             =head2 j
466              
467             my $j = $mango->j;
468             $mango = $mango->j(1);
469              
470             Wait for all operations to have reached the journal, defaults to C<0>.
471              
472             =head2 max_bson_size
473              
474             my $max = $mango->max_bson_size;
475             $mango = $mango->max_bson_size(16777216);
476              
477             Maximum size for BSON documents in bytes, defaults to C<16777216> (16MB).
478              
479             =head2 max_connections
480              
481             my $max = $mango->max_connections;
482             $mango = $mango->max_connections(5);
483              
484             Maximum number of connections to use for non-blocking operations, defaults to
485             C<5>.
486              
487             =head2 max_write_batch_size
488              
489             my $max = $mango->max_write_batch_size;
490             $mango = $mango->max_write_batch_size(1000);
491              
492             Maximum number of write operations to batch together, defaults to C<1000>.
493              
494             =head2 protocol
495              
496             my $protocol = $mango->protocol;
497             $mango = $mango->protocol(Mango::Protocol->new);
498              
499             Protocol handler, defaults to a L<Mango::Protocol> object.
500              
501             =head2 w
502              
503             my $w = $mango->w;
504             $mango = $mango->w(2);
505              
506             Wait for all operations to have reached at least this many servers, C<1>
507             indicates just primary, C<2> indicates primary and at least one secondary,
508             defaults to C<1>.
509              
510             =head2 wtimeout
511              
512             my $timeout = $mango->wtimeout;
513             $mango = $mango->wtimeout(1);
514              
515             Timeout for write propagation in milliseconds, defaults to C<1000>.
516              
517             =head1 METHODS
518              
519             L<Mango> inherits all methods from L<Mojo::EventEmitter> and implements the following
520             new ones.
521              
522             =head2 backlog
523              
524             my $num = $mango->backlog;
525              
526             Number of queued operations that have not yet been assigned to a connection.
527              
528             =head2 db
529              
530             my $db = $mango->db;
531             my $db = $mango->db('test');
532              
533             Build L<Mango::Database> object for database, uses L</default_db> if no name
534             is provided. Note that the reference L<Mango::Database/mango> is weakened,
535             so the L<Mango> object needs to be referenced elsewhere as well.
536              
537             =head2 from_string
538              
539             $mango
540             = $mango->from_string('mongodb://sri:s3cret@localhost:3000/test?w=2');
541              
542             Parse configuration from connection string.
543              
544             =head2 get_more
545              
546             my $reply = $mango->get_more($namespace, $return, $cursor);
547              
548             Perform low level C<GET_MORE> operation. You can also append a callback to
549             perform operation non-blocking.
550              
551             $mango->get_more(($namespace, $return, $cursor) => sub {
552             my ($mango, $err, $reply) = @_;
553             ...
554             });
555             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
556              
557             =head2 kill_cursors
558              
559             $mango->kill_cursors(@ids);
560              
561             Perform low level C<KILL_CURSORS> operation. You can also append a callback to
562             perform operation non-blocking.
563              
564             $mango->kill_cursors(@ids => sub {
565             my ($mango, $err) = @_;
566             ...
567             });
568             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
569              
570             =head2 new
571              
572             my $mango = Mango->new;
573             my $mango = Mango->new('mongodb://sri:s3cret@localhost:3000/test?w=2');
574              
575             # Using TLS encryption
576             my $mango = Mango->new('mongodb://127.0.0.1:27017', tls => 1,
577             tls_cert => '/path/to/certificate.pem');
578              
579             Construct a new L<Mango> object and parse connection string with
580             L</from_string> if necessary.
581              
582             Note that it is B<strongly> recommended to build your Mango object inside
583             a helper function like shown in the synopsis. This is because the Mango's
584             object reference inside L<Mango::Database> objects is weakened to avoid
585             memory leaks. This means your Mango instance is quickly going to get
586             undefined after you use the C<db> method. So, use a helper to prevent that.
587              
588             If a username and password are provided, Mango will try to authenticate using
589             SCRAM-SHA1. B<Warning:> this will require L<Authen::SCRAM> which is not
590             installed by default.
591              
592             Any extra arguments given after the connection string will be passed to the
593             C<client> method from L<Mojo::IOLoop>. To connect to a server using
594             TLS, use the options C<tls> (boolean), C<tls_cert> and optionally C<tls_ca>.
595              
596             =head2 query
597              
598             my $reply
599             = $mango->query($namespace, $flags, $skip, $return, $query, $fields);
600              
601             Perform low level C<QUERY> operation. You can also append a callback to
602             perform operation non-blocking.
603              
604             $mango->query(($namespace, $flags, $skip, $return, $query, $fields) => sub {
605             my ($mango, $err, $reply) = @_;
606             ...
607             });
608             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
609              
610             =head1 DEBUGGING
611              
612             You can set the C<MANGO_DEBUG> environment variable to get some advanced
613             diagnostics information printed to C<STDERR>.
614              
615             MANGO_DEBUG=1
616              
617             =head1 SPONSORS
618              
619             Some of the work on this distribution has been sponsored by
620             L, thank you!
621              
622             =head1 AUTHOR
623              
624             Sebastian Riedel, C.
625              
626             Current maintainer: Olivier Duclos C.
627              
628             =head1 CREDITS
629              
630             In alphabetical order:
631              
632             =over 2
633              
634             alexbyk
635              
636             Andrey Khozov
637              
638             Colin Cyr
639              
640             =back
641              
642             =head1 COPYRIGHT AND LICENSE
643              
644             Copyright (C) 2013-2014, Sebastian Riedel.
645              
646             This program is free software, you can redistribute it and/or modify it under
647             the terms of the Artistic License version 2.0.
648              
649             =head1 SEE ALSO
650              
651             L, L,
652             L.
653              
654             =cut