File Coverage

blib/lib/Mango/Cursor.pm
Criterion Covered Total %
statement 6 65 9.2
branch 0 28 0.0
condition 0 16 0.0
subroutine 2 20 10.0
pod 5 5 100.0
total 13 134 9.7


line stmt bran cond sub pod time code
1             package Mango::Cursor;
2 9     9   59 use Mojo::Base -base;
  9         19  
  9         57  
3              
4 9     9   1141 use Mojo::IOLoop;
  9         19  
  9         42  
5              
6             has [qw(collection id ns)];
7             has [qw(batch_size limit)] => 0;
8              
9             sub add_batch {
10 0     0 1   my ($self, $docs) = @_;
11 0   0       push @{$self->{results} ||= []}, @$docs;
  0            
12 0           return $self;
13             }
14              
15             sub all {
16 0     0 1   my ($self, $cb) = @_;
17              
18             # Non-blocking
19 0           my @all;
20 0 0   0     return $self->next(sub { shift->_collect(\@all, $cb, @_) }) if $cb;
  0            
21              
22             # Blocking
23 0           while (my $next = $self->next) { push @all, $next }
  0            
24 0           return \@all;
25             }
26              
27             sub next {
28 0     0 1   my ($self, $cb) = @_;
29 0 0         return defined $self->id ? $self->_continue($cb) : $self->_start($cb);
30             }
31              
32             sub num_to_return {
33 0     0 1   my $self = shift;
34 0           my $limit = $self->limit;
35 0           my $size = $self->batch_size;
36 0 0 0       return $limit == 0 || ($size > 0 && $size < $limit) ? $size : $limit;
37             }
38              
39             sub rewind {
40 0     0 1   my ($self, $cb) = @_;
41              
42 0           delete @$self{qw(num results)};
43 0 0         return $cb ? $self->_defer($cb) : undef unless defined(my $id = $self->id);
    0          
44 0           $self->id(undef);
45              
46             # Non-blocking
47 0           my $mango = $self->collection->db->mango;
48 0 0   0     return $mango->kill_cursors($id => sub { shift; $self->$cb(@_) }) if $cb;
  0            
  0            
49              
50             # Blocking
51 0           $mango->kill_cursors($id);
52             }
53              
54             sub _collect {
55 0     0     my ($self, $all, $cb, $err, $doc) = @_;
56 0 0 0       return $self->_defer($cb, $err, $all) if $err || !$doc;
57 0           push @$all, $doc;
58 0     0     $self->next(sub { shift->_collect($all, $cb, @_) });
  0            
59             }
60              
61             sub _continue {
62 0     0     my ($self, $cb) = @_;
63              
64 0           my $collection = $self->collection;
65 0   0       my $name = $self->ns // $collection->full_name;
66 0           my $mango = $collection->db->mango;
67              
68             # Non-blocking
69 0 0         if ($cb) {
70 0 0         return $self->_defer($cb, undef, $self->_dequeue) if $self->_enough;
71             return $mango->get_more(($name, $self->num_to_return, $self->id) =>
72 0     0     sub { shift; $self->$cb(shift, $self->_enqueue(shift)) });
  0            
  0            
73             }
74              
75             # Blocking
76 0 0         return $self->_dequeue if $self->_enough;
77 0           return $self->_enqueue(
78             $mango->get_more($name, $self->num_to_return, $self->id));
79             }
80              
81             sub _defer {
82 0     0     my ($self, $cb, @args) = @_;
83 0     0     Mojo::IOLoop->next_tick(sub { $self->$cb(@args) });
  0            
84             }
85              
86             sub _dequeue {
87 0     0     my $self = shift;
88 0 0         return undef if $self->_finished;
89 0           $self->{num}++;
90 0           return shift @{$self->{results}};
  0            
91             }
92              
93             sub _enough {
94 0     0     my $self = shift;
95 0   0       return $self->id eq '0' || $self->_finished || !!@{$self->{results} // []};
96             }
97              
98             sub _enqueue {
99 0     0     my ($self, $reply) = @_;
100 0 0         return undef unless $reply;
101 0           return $self->add_batch($reply->{docs})->id($reply->{cursor})->_dequeue;
102             }
103              
104             sub _finished {
105 0     0     my $self = shift;
106 0 0         return undef unless my $limit = $self->limit;
107 0 0 0       return ($self->{num} // 0) >= abs($limit) ? 1 : undef;
108             }
109              
110 0     0     sub _start { die 'Cursor cannot be restarted' }
111              
112             1;
113              
114             =encoding utf8
115              
116             =head1 NAME
117              
118             Mango::Cursor - MongoDB cursor
119              
120             =head1 SYNOPSIS
121              
122             use Mango::Cursor;
123              
124             my $cursor = Mango::Cursor->new(collection => $collection);
125             my $docs = $cursor->all;
126              
127             =head1 DESCRIPTION
128              
129             L is a container for MongoDB cursors used by
130             L.
131              
132             =head1 ATTRIBUTES
133              
134             L implements the following attributes.
135              
136             =head2 batch_size
137              
138             my $size = $cursor->batch_size;
139             $cursor = $cursor->batch_size(10);
140              
141             Number of documents to fetch in one batch, defaults to C<0>.
142              
143             =head2 collection
144              
145             my $collection = $cursor->collection;
146             $cursor = $cursor->collection(Mango::Collection->new);
147              
148             L object this cursor belongs to.
149              
150             =head2 id
151              
152             my $id = $cursor->id;
153             $cursor = $cursor->id(123456);
154              
155             Cursor id.
156              
157             =head2 limit
158              
159             my $limit = $cursor->limit;
160             $cursor = $cursor->limit(10);
161              
162             Limit the number of documents, defaults to C<0>.
163              
164             =head1 METHODS
165              
166             L inherits all methods from L and implements the
167             following new ones.
168              
169             =head2 add_batch
170              
171             $cursor = $cursor->add_batch($docs);
172              
173             Add batch of documents to cursor.
174              
175             =head2 all
176              
177             my $docs = $cursor->all;
178              
179             Fetch all documents at once. You can also append a callback to perform
180             operation non-blocking.
181              
182             $cursor->all(sub {
183             my ($cursor, $err, $docs) = @_;
184             ...
185             });
186             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
187              
188             =head2 next
189              
190             my $doc = $cursor->next;
191              
192             Fetch next document. You can also append a callback to perform operation
193             non-blocking.
194              
195             $cursor->next(sub {
196             my ($cursor, $err, $doc) = @_;
197             ...
198             });
199             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
200              
201             =head2 rewind
202              
203             $cursor->rewind;
204              
205             Rewind cursor and kill it on the server. You can also append a callback to
206             perform operation non-blocking.
207              
208             $cursor->rewind(sub {
209             my ($cursor, $err) = @_;
210             ...
211             });
212             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
213              
214             =head2 num_to_return
215              
216             my $num = $cursor->num_to_return;
217              
218             Number of results to return with next C or C operation based
219             on L and L.
220              
221             =head1 SEE ALSO
222              
223             L, L, L.
224              
225             =cut