File Coverage

blib/lib/Couchbase/Test/Async/Loop.pm
Criterion Covered Total %
statement 78 213 36.6
branch 0 50 0.0
condition 0 9 0.0
subroutine 26 48 54.1
pod 0 10 0.0
total 104 330 31.5


line stmt bran cond sub pod time code
1             package Couchbase::Test::Async::Loop;
2 2     2   12 use strict;
  2         4  
  2         88  
3 2     2   26 use warnings;
  2         6  
  2         58  
4 2     2   40 use Couchbase::Client::Async;
  2         6  
  2         40  
5 2     2   10 use Couchbase::Client::IDXConst;
  2         6  
  2         494  
6 2     2   10 use Couchbase::Client::Errors;
  2         6  
  2         226  
7              
8 2     2   12 use POE;
  2         4  
  2         16  
9 2     2   684 use POE::Kernel;
  2         4  
  2         34  
10 2     2   96 use POE::Session;
  2         6  
  2         8  
11 2     2   166 use Data::Dumper;
  2         6  
  2         112  
12 2     2   12 use Log::Fu { level => "info" };
  2         6  
  2         18  
13 2     2   4100 use Devel::Peek;
  2         1020  
  2         12  
14 2     2   206 use Array::Assign;
  2         6  
  2         98  
15              
16 2     2   8 use base qw(POE::Sugar::Attributes);
  2         4  
  2         300  
17              
18             my $poe_kernel = "POE::Kernel";
19              
20             sub cbc_connect :Start {
21 0     0 0 0 $_[HEAP]->object->connect();
22 2     2   14 }
  2         4  
  2         18  
23              
24             sub unhandled :Event(_default) {
25 0     0 0 0 log_errf("Got unknown event %s", $_[ARG0]);
26 2     2   676 }
  2         6  
  2         10  
27              
28             sub got_error :Event {
29 0     0 0 0 log_errf("Got errnum=%d, errstr=%s",
30             $_[ARG0], $_[ARG1]);
31 0         0 $_[HEAP]->on_error(@_[ARG0,ARG1]);
32 2     2   614 }
  2         4  
  2         8  
33              
34              
35             #This would be an event-loop specific implementation of update_event
36             my %EVMETH_MAP = (
37             COUCHBASE_WRITE_EVENT, "write",
38             COUCHBASE_READ_EVENT, "read"
39             );
40              
41             sub _activate_events {
42 0     0     my ($cbc_flags, $dupfh, $opaque) = @_;
43 0           while (my ($ev,$meth) = each %EVMETH_MAP ) {
44 0 0         if($cbc_flags & $ev) {
45 0           log_debugf("Activating event %d on dupfd %d", $ev, fileno($dupfh));
46 0           $poe_kernel->${\"select_$meth"}($dupfh, "dispatch_event", $ev, $opaque);
  0            
47             }
48             }
49             }
50              
51             sub _deactivate_events {
52 0     0     my ($cbc_flags, $dupfh) = @_;
53 0           while (my ($ev,$meth) = each %EVMETH_MAP ) {
54 0 0         if($cbc_flags & $ev) {
55 0           log_debugf("Deactivating event %d on dupfd %d", $ev, fileno($dupfh));
56 0           $poe_kernel->${\"select_$meth"}($dupfh);
  0            
57             }
58             }
59             }
60              
61             sub _startstop_events {
62 0     0     my ($events,$prefix,$dupfh) = @_;
63 0           while (my ($ev,$meth) = each %EVMETH_MAP) {
64 0 0         if($events & $ev) {
65 0           log_debugf("Invoking $prefix: $meth on dupfd %d", fileno($dupfh));
66 0           $poe_kernel->${\"$prefix\_$meth"}($dupfh);
  0            
67             }
68             }
69             }
70              
71              
72             sub update_event :Event {
73 0     0 0 0 my ($evdata,$action,$flags) = @_[ARG0..ARG2];
74 0         0 my $dupfh = $evdata->[EVIDX_DUPFH];
75            
76 0 0 0     0 if($action == EVACTION_WATCH) {
    0          
    0          
77 0 0       0 if(!$dupfh) {
78 0         0 open $dupfh, ">&", $evdata->[EVIDX_FD];
79 0         0 _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
80 0         0 $evdata->[EVIDX_DUPFH] = $dupfh;
81             } else {
82 0         0 my $events_do_delete = $evdata->[EVIDX_WATCHFLAGS] & (~$flags);
83 0         0 log_debugf("Old events=%x, new events = %x, delete events %x",
84             $evdata->[EVIDX_WATCHFLAGS], $flags, $events_do_delete);
85 0         0 _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
86 0         0 _deactivate_events($events_do_delete, $dupfh);
87             }
88             } elsif ($action == EVACTION_UNWATCH) {
89 0 0       0 if(!$dupfh) {
90 0         0 warn("Unwatch requested on undefined dup'd filehandle");
91 0         0 return;
92             }
93 0         0 _deactivate_events($evdata->[EVIDX_WATCHFLAGS], $dupfh);
94             } elsif ($action == EVACTION_SUSPEND || $action == EVACTION_RESUME) {
95 0 0       0 if(!$dupfh) {
96 0         0 warn("suspend/resume requested on undefined dup'd filehandle. ".
97             "fd=".$evdata->[EVIDX_FD]);
98             }
99 0 0       0 my $prefix = $action == EVACTION_SUSPEND ? "pause" : "resume";
100 0         0 $prefix = "select_" . $prefix;
101 0         0 _startstop_events($evdata->[EVIDX_WATCHFLAGS], $prefix, $dupfh);
102             } else {
103 0         0 die("Unhandled action $action");
104             }
105 2     2   1612 }
  2         4  
  2         8  
106              
107             sub update_timer :Event {
108 0     0 0 0 my ($evdata,$action,$usecs) = @_[ARG0..ARG2];
109 0         0 my $timer_id = $evdata->[EVIDX_PLDATA];
110 0         0 my $seconds;
111            
112 0 0       0 if($usecs) {
113 0         0 $seconds = ($usecs / (1000*1000));
114             }
115 0 0       0 if($action == EVACTION_WATCH) {
116 0 0       0 if(defined $timer_id) {
117 0         0 log_debugf("Rescheduling timer %d in %0.5f seconds from now",
118             $timer_id, $seconds);
119 0         0 $poe_kernel->delay_adjust($timer_id, $seconds)
120             } else {
121 0         0 $timer_id = $poe_kernel->delay_set(
122             "dispatch_timeout", $seconds, $evdata->[EVIDX_OPAQUE]);
123 0         0 $evdata->[EVIDX_PLDATA] = $timer_id;
124 0         0 log_debugf("Scheduling timer %d for %0.5f seconds from now",
125             $timer_id, $seconds);
126             }
127             } else {
128 0 0       0 if(defined $timer_id) {
129 0         0 log_debug("Deletion requested for timer $timer_id.");
130 0         0 $poe_kernel->alarm_remove($timer_id);
131 0         0 $evdata->[EVIDX_PLDATA] = undef;
132             } else {
133 0         0 log_debug("Requested to delete non-existent timer ID");
134             }
135             }
136 2     2   748 }
  2         4  
  2         8  
137              
138             #this is what an event loop does in order to tell libcouchbase that an event
139             #has been received
140             sub dispatch_event :Event {
141 0     0 0 0 my ($flags,$opaque) = @_[ARG2..ARG3];
142 0         0 log_debugf("Flags=%d, opaque=%x", $flags, $opaque);
143 0         0 Couchbase::Client::Async->HaveEvent($flags, $opaque);
144 2     2   524 }
  2         2  
  2         8  
145              
146             sub dispatch_timeout :Event {
147 0     0 0 0 my $opaque = $_[ARG0];
148 0         0 my $flags = 0;
149 0         0 log_debugf("Dispatching timer.. opaque=%x", $opaque);
150 0         0 Couchbase::Client::Async->HaveEvent($flags, $opaque);
151 2     2   466 }
  2         4  
  2         10  
152              
153              
154             #### External interface
155              
156             use Class::XSAccessor {
157 2         28 constructor => 'new',
158             accessors => [qw(object alias on_ready on_error)]
159 2     2   416 };
  2         4  
160              
161             sub spawn {
162 0     0 0   my ($cls,$session_name,%options) = @_;
163 0 0         my $cb_ready = delete $options{on_ready}
164             or die ("Must have on_ready callback");
165 0           my $user_error_callback = delete $options{on_error};
166            
167             my $async = Couchbase::Client::Async->new({
168             %options,
169             cb_error =>
170 0     0     sub { $poe_kernel->post($session_name, "got_error", @_) },
171             cb_update_event =>
172 0     0     sub { $poe_kernel->call($session_name, "update_event", @_) },
173            
174             cb_waitdone => $cb_ready,
175            
176             cb_update_timer =>
177 0     0     sub { $poe_kernel->call($session_name, "update_timer", @_) }
178 0           });
179            
180 0           my $o = __PACKAGE__->new(alias => $session_name, object => $async,
181             on_error => $user_error_callback);
182 0           POE::Session->create(
183             heap => $o,
184             inline_states =>
185             POE::Sugar::Attributes->inline_states(__PACKAGE__, $session_name)
186             );
187 0           $async->connect();
188 0           return $o;
189             }
190              
191             sub _single_dispatch_common {
192 0     0     my ($result,$arg) = @_;
193 0           my ($key) = keys %$result;
194 0           my ($ret) = values %$result;
195            
196 0 0         if($arg->{callback}) {
197 0           $arg->{callback}->($key, $ret, $arg->{arg});
198             } else {
199 0           $poe_kernel->post($arg->{session}, $arg->{state},
200             $key, $ret,$arg->{arg});
201             }
202             }
203              
204             sub _strop_common {
205 0     0     my ($self,$command,$key,
206             $value,$expiry,$cas,$cbparams) = @_;
207 0           my @arry;
208            
209 0 0 0       if( $cbparams->{state} && (!$cbparams->{session}) ) {
210 0           $cbparams->{session} = $_[SENDER];
211             }
212            
213 0 0 0       unless($cbparams->{state} || $cbparams->{callback}) {
214 0           die("Must have either target state or CODE reference for notification");
215             }
216            
217 0 0         if($cbparams->{callback}) {
218 0 0         unless(ref $cbparams->{callback} eq 'CODE') {
219 0           die("Callback must be a CODE reference");
220             }
221             }
222            
223 0           arry_assign_i(@arry,
224             REQIDX_KEY, $key,
225             REQIDX_VALUE, $value,
226             REQIDX_EXP, $expiry,
227             REQIDX_CAS, $cas);
228            
229 0           $self->object->request(
230             $command, REQTYPE_SINGLE,
231             \&_single_dispatch_common, $cbparams, CBTYPE_COMPLETION,
232             \@arry
233             );
234             }
235              
236              
237             sub _numop_common {
238 0     0     my ($self,$key,$delta,$initial,$expiry,$cbparams) = @_;
239 0           my @arry;
240 0           arry_assign_i(@arry,
241             REQIDX_KEY, $key,
242             REQIDX_ARITH_DELTA, $delta,
243             REQIDX_ARITH_INITIAL, $initial,
244             REQIDX_EXP, $expiry);
245 0           $self->object->request(
246             PLCBA_CMD_ARITHMETIC, REQTYPE_SINGLE,
247             \&_single_dispatch_common, $cbparams, CBTYPE_COMPLETION,
248             \@arry
249             );
250             }
251              
252             my %_state_map = (
253             add => PLCBA_CMD_ADD,
254             replace => PLCBA_CMD_REPLACE,
255             append => PLCBA_CMD_APPEND,
256             prepend => PLCBA_CMD_PREPEND,
257             set => PLCBA_CMD_SET
258             );
259              
260             sub _set_common :Event(add, replace, append, prepend, set)
261             {
262 0     0   0 my ($op_params,$cb_params) = @_[ARG0,ARG1];
263 0         0 my ($key,$value,$expiry,$cas);
264 0         0 my $command;
265            
266 0 0       0 if($_[STATE] eq 'cas') {
267 0         0 $command = PLCBA_CMD_SET;
268 0         0 ($key,$value,$cas,$expiry) = @$op_params;
269             } else {
270 0         0 $command = $_state_map{$_[STATE]};
271 0         0 ($key,$value,$expiry) = @$op_params;
272             }
273            
274 0         0 $_[HEAP]->_strop_common($command,
275             $key, $value, $expiry, $cas,
276             $cb_params);
277 2     2   2290 }
  2         4  
  2         8  
278              
279             sub get :Event {
280 0     0 0 0 my ($key,$cbparams) = @_[ARG0,ARG1];
281 0 0       0 if(ref $key) {
282 0         0 $key = $key->[0];
283             }
284 0         0 $_[HEAP]->_strop_common(
285             PLCBA_CMD_GET, $key, undef, undef, undef, $cbparams);
286 2     2   510 }
  2         6  
  2         18  
287              
288             sub arithmetic :Event {
289 0     0 0 0 my ($op_params,$cb_params) = @_[ARG0, ARG1];
290 0         0 my ($key,$delta,$initial,$expiry) = @$op_params;
291 0         0 $_[HEAP]->_numop_common($key, $delta, $initial, $expiry, $cb_params);
292 2     2   520 }
  2         4  
  2         12  
293              
294             sub _arith_basic :Event(incr, decr) {
295 0     0   0 my ($op_params,$cb_params) = @_[ARG0..ARG1];
296 0         0 my ($key,$delta,$expiry);
297 0 0       0 if(!ref $op_params) {
298 0         0 $delta = 1;
299 0         0 $key = $op_params;
300             } else {
301 0         0 ($key,$delta,$expiry) = @$op_params;
302             }
303 0 0       0 if($_[STATE] eq 'decr') {
304 0         0 $delta = (-$delta);
305             }
306 0         0 $_[HEAP]->_numop_common($key, $delta, undef, $expiry, $cb_params);
307 2     2   600 }
  2         4  
  2         8  
308              
309             sub _keyop :Event(touch, remove)
310             {
311 0     0     my ($op_params, $cb_params) = @_[ARG0,ARG1];
312 0           my ($key,$expiry,$cas);
313 0           my $command;
314            
315 0 0         if($_[STATE] eq 'touch') {
316 0           ($key,$expiry) = @$op_params;
317 0           $command = PLCBA_CMD_TOUCH;
318             } else {
319 0           ($key,$cas) = @$op_params;
320 0           $command = PLCBA_CMD_REMOVE;
321             }
322            
323 0           my @arry;
324 0           arry_assign_i(@arry,
325             REQIDX_KEY, $key,
326             REQIDX_EXP, $expiry,
327             REQIDX_CAS, $cas);
328 0           $_[HEAP]->object->request(
329             $command, REQTYPE_SINGLE,
330             \&_single_dispatch_common, $cb_params, CBTYPE_COMPLETION,
331             \@arry
332             );
333 2     2   656 }
  2         4  
  2         8  
334              
335             1;