File Coverage

blib/lib/MongoDB/ChangeStream.pm
Criterion Covered Total %
statement 36 73 49.3
branch 0 26 0.0
condition 0 11 0.0
subroutine 12 16 75.0
pod 2 3 66.6
total 50 129 38.7


line stmt bran cond sub pod time code
1             # Copyright 2018 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   404 use strict;
  59         134  
  59         1759  
16 59     59   325 use warnings;
  59         129  
  59         1918  
17             package MongoDB::ChangeStream;
18              
19             # ABSTRACT: A stream providing update information for collections.
20              
21 59     59   319 use version;
  59         129  
  59         396  
22             our $VERSION = 'v2.2.1';
23              
24 59     59   4448 use Moo;
  59         151  
  59         460  
25 59     59   21135 use MongoDB::Cursor;
  59         140  
  59         1510  
26 59     59   25580 use MongoDB::Op::_ChangeStream;
  59         217  
  59         2334  
27 59     59   485 use MongoDB::Error;
  59         132  
  59         6734  
28 59     59   407 use Safe::Isa;
  59         132  
  59         7485  
29 59     59   421 use BSON::Timestamp;
  59         126  
  59         1692  
30 59         459 use MongoDB::_Types qw(
31             MongoDBCollection
32             ArrayOfHashRef
33             Boolish
34             BSONTimestamp
35             ClientSession
36 59     59   331 );
  59         163  
37 59         270 use Types::Standard qw(
38             InstanceOf
39             HashRef
40             Maybe
41             Str
42             Num
43 59     59   87863 );
  59         160  
44              
45 59     59   67962 use namespace::clean -except => 'meta';
  59         153  
  59         334  
46              
47             has _result => (
48             is => 'rw',
49             isa => InstanceOf['MongoDB::QueryResult'],
50             init_arg => undef,
51             );
52              
53             has _client => (
54             is => 'ro',
55             isa => InstanceOf['MongoDB::MongoClient'],
56             init_arg => 'client',
57             required => 1,
58             );
59              
60             has _op_args => (
61             is => 'ro',
62             isa => HashRef,
63             init_arg => 'op_args',
64             required => 1,
65             );
66              
67             has _pipeline => (
68             is => 'ro',
69             isa => ArrayOfHashRef,
70             init_arg => 'pipeline',
71             required => 1,
72             );
73              
74             has _full_document => (
75             is => 'ro',
76             isa => Str,
77             init_arg => 'full_document',
78             predicate => '_has_full_document',
79             );
80              
81             has _resume_after => (
82             is => 'ro',
83             init_arg => 'resume_after',
84             predicate => '_has_resume_after',
85             );
86              
87             has _start_after => (
88             is => 'ro',
89             init_arg => 'start_after',
90             predicate => '_has_start_after',
91             );
92              
93             has _all_changes_for_cluster => (
94             is => 'ro',
95             isa => Boolish,
96             init_arg => 'all_changes_for_cluster',
97             default => sub { 0 },
98             );
99              
100             has _start_at_operation_time => (
101             is => 'ro',
102             isa => BSONTimestamp,
103             init_arg => 'start_at_operation_time',
104             predicate => '_has_start_at_operation_time',
105             coerce => sub {
106             ref($_[0]) ? $_[0] : BSON::Timestamp->new(seconds => $_[0])
107             },
108             );
109              
110             has _session => (
111             is => 'ro',
112             isa => Maybe[ClientSession],
113             init_arg => 'session',
114             );
115              
116             has _options => (
117             is => 'ro',
118             isa => HashRef,
119             init_arg => 'options',
120             default => sub { {} },
121             );
122              
123             has _max_await_time_ms => (
124             is => 'ro',
125             isa => Num,
126             init_arg => 'max_await_time_ms',
127             predicate => '_has_max_await_time_ms',
128             );
129              
130             has _last_operation_time => (
131             is => 'rw',
132             init_arg => undef,
133             predicate => '_has_last_operation_time',
134             );
135              
136             has _last_resume_token => (
137             is => 'rw',
138             init_arg => undef,
139             predicate => '_has_last_resume_token',
140             );
141              
142             sub BUILD {
143 0     0 0   my ($self) = @_;
144              
145             # starting point is construction time instead of first next call
146 0           $self->_execute_query;
147             }
148              
149             sub _execute_query {
150 0     0     my ($self) = @_;
151              
152 0           my $resume_opt = {};
153              
154             # seen prior results, continuing after last resume token
155 0 0         if ($self->_has_last_resume_token) {
    0          
    0          
156 0           $resume_opt->{resume_after} = $self->_last_resume_token;
157             }
158             elsif ( $self->_has_start_after ) {
159             $self->_last_resume_token(
160 0           $resume_opt->{start_after} = $self->_start_after
161             );
162             }
163             # no results yet, but we have operation time from prior query
164             elsif ($self->_has_last_operation_time) {
165 0           $resume_opt->{start_at_operation_time} = $self->_last_operation_time;
166             }
167             # no results and no prior operation time, send specified options
168             else {
169 0 0         $resume_opt->{start_at_operation_time} = $self->_start_at_operation_time
170             if $self->_has_start_at_operation_time;
171 0 0         if ( $self->_has_resume_after ) {
172             $self->_last_resume_token(
173 0           $resume_opt->{resume_after} = $self->_resume_after
174             );
175             }
176             }
177              
178             my $op = MongoDB::Op::_ChangeStream->new(
179             pipeline => $self->_pipeline,
180             all_changes_for_cluster => $self->_all_changes_for_cluster,
181             session => $self->_session,
182             options => $self->_options,
183             client => $self->_client,
184             $self->_has_full_document
185             ? (full_document => $self->_full_document)
186             : (),
187             $self->_has_max_await_time_ms
188             ? (maxAwaitTimeMS => $self->_max_await_time_ms)
189             : (),
190             %$resume_opt,
191 0 0         %{ $self->_op_args },
  0 0          
192             );
193              
194 0           my $res = $self->_client->send_retryable_read_op($op);
195 0           $self->_result($res->{result});
196             $self->_last_operation_time($res->{operationTime})
197 0 0         if exists $res->{operationTime};
198             }
199              
200             #pod =head1 STREAM METHODS
201             #pod
202             #pod =cut
203              
204             #pod =head2 next
205             #pod
206             #pod $change_stream = $collection->watch(...);
207             #pod $change = $change_stream->next;
208             #pod
209             #pod Waits for the next change in the collection and returns it.
210             #pod
211             #pod B: This method will wait for the amount of milliseconds passed
212             #pod as C to L or the server's
213             #pod default wait-time. It will not wait indefinitely.
214             #pod
215             #pod =cut
216              
217             sub next {
218 0     0 1   my ($self) = @_;
219              
220 0           my $change;
221             my $retried;
222 0           while (1) {
223             last if eval {
224 0           $change = $self->_result->next;
225 0           1; # successfully fetched result
226 0 0 0       } or do {
227 0   0       my $error = $@ || "Unknown error";
228 0 0 0       if (
      0        
229             not($retried)
230             and $error->$_isa('MongoDB::Error')
231             and $error->_is_resumable
232             ) {
233 0           $retried = 1;
234 0           $self->_execute_query;
235             }
236             else {
237 0           die $error;
238             }
239 0           0; # failed, cursor was rebuilt
240             };
241             }
242              
243             # this differs from drivers that block indefinitely. we have to
244             # deal with the situation where no results are available.
245 0 0         if (not defined $change) {
246 0           return undef; ## no critic
247             }
248              
249 0 0         if (exists $change->{'postBatchResumeToken'}) {
    0          
250 0           $self->_last_resume_token( $change->{'postBatchResumeToken'} );
251 0           return $change;
252             }
253             elsif (exists $change->{_id}) {
254 0           $self->_last_resume_token( $change->{_id} );
255 0           return $change;
256             }
257             else {
258 0           MongoDB::InvalidOperationError->throw(
259             "Cannot provide resume functionality when the ".
260             "resume token is missing");
261             }
262             }
263              
264             #pod =head2 get_resume_token
265             #pod
266             #pod Users can inspect the C<_id> on each C to use as a
267             #pod resume token. But since MongoDB 4.2, C and C responses
268             #pod also include a C. Drivers use one or the other
269             #pod when automatically resuming.
270             #pod
271             #pod This method retrieves the same resume token that would be used to
272             #pod automatically resume. Users intending to store the resume token
273             #pod should use this method to get the most up to date resume token.
274             #pod
275             #pod For instance:
276             #pod
277             #pod if ($local_change) {
278             #pod process_change($local_change);
279             #pod }
280             #pod
281             #pod eval {
282             #pod my $change_stream = $coll->watch([], { resumeAfter => $local_resume_token });
283             #pod while ( my $change = $change_stream->next) {
284             #pod $local_resume_token = $change_stream->get_resume_token;
285             #pod $local_change = $change;
286             #pod process_change($local_change);
287             #pod }
288             #pod };
289             #pod if (my $err = $@) {
290             #pod $log->error($err);
291             #pod }
292             #pod
293             #pod In this case the current change is always persisted locally,
294             #pod including the resume token, such that on restart the application
295             #pod can still process the change while ensuring that the change stream
296             #pod continues from the right logical time in the oplog. It is the
297             #pod application's responsibility to ensure that C is
298             #pod idempotent, this design merely makes a reasonable effort to process
299             #pod each change at least once.
300             #pod
301             #pod =cut
302              
303 0     0 1   sub get_resume_token { $_[0]->_last_resume_token }
304              
305             1;
306              
307             __END__