File Coverage

blib/lib/Mojo/Reactor/POE.pm
Criterion Covered Total %
statement 178 179 99.4
branch 44 56 78.5
condition 13 28 46.4
subroutine 40 40 100.0
pod 13 13 100.0
total 288 316 91.1


line stmt bran cond sub pod time code
1             package Mojo::Reactor::POE;
2              
3 1     1   41034 use POE; # Loaded early to avoid event loop confusion
  1         17747  
  1         6  
4 1     1   50375 BEGIN { POE::Kernel->run } # silence run() warning
5              
6 1     1   1180 use Mojo::Base 'Mojo::Reactor';
  1         2  
  1         8  
7              
8             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::POE';
9              
10 1     1   48261 use Carp 'croak';
  1         3  
  1         47  
11 1     1   633 use Mojo::Reactor::Poll;
  1         1700  
  1         10  
12 1     1   28 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         42  
13 1     1   4 use Scalar::Util 'weaken';
  1         1  
  1         42  
14              
15 1     1   5 use constant { POE_IO_READ => 0, POE_IO_WRITE => 1 };
  1         2  
  1         72  
16 1   50 1   4 use constant DEBUG => $ENV{MOJO_REACTOR_POE_DEBUG} || 0;
  1         1  
  1         2801  
17              
18             our $VERSION = '0.009';
19              
20             my $POE;
21              
22             sub DESTROY {
23 2     2   591 my $self = shift;
24 2         8 $self->reset; # Close session
25 2         10 undef $POE;
26             }
27              
28             sub again {
29 7     7 1 252 my ($self, $id) = @_;
30 7 100       252 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
31 6         16 $timer->{time} = steady_time + $timer->{after};
32             # If session doesn't exist, the time will be set when it starts
33 6 50       64 $self->_session_call(mojo_adjust_timer => $id) if $self->_session_exists;
34             }
35              
36             sub io {
37 10     10 1 2930 my ($self, $handle, $cb) = @_;
38 10         50 $self->{io}{fileno $handle} = {cb => $cb};
39 10         41 return $self->watch($handle, 1, 1);
40             }
41              
42 10     10 1 2357 sub is_running { !!(shift->{running}) }
43              
44             # We have to fall back to Mojo::Reactor::Poll, since POE::Kernel is unique
45 6 100   6 1 56396 sub new { $POE++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }
46              
47             sub next_tick {
48 15     15 1 925 my ($self, $cb) = @_;
49 15         14 push @{$self->{next_tick}}, $cb;
  15         28  
50 15   66     49 $self->{next_timer} //= $self->timer(0 => \&_next);
51 15         28 return undef;
52             }
53              
54             sub one_tick {
55 7362     7362 1 335011 my $self = shift;
56 7362         14531 $self->_init_session;
57            
58             # Stop automatically if there is nothing to watch
59 7362 100 100     6509 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  7362         21846  
  6         30  
60            
61             # Just one tick
62 7357 100       14265 local $self->{running} = 1 unless $self->{running};
63            
64 7357         18526 POE::Kernel->run_one_timeslice;
65             }
66              
67 3     3 1 491 sub recurring { shift->_timer(1, @_) }
68              
69             sub remove {
70 10     10 1 5088 my ($self, $remove) = @_;
71 10 50       39 return unless defined $remove;
72 10 100       38 if (ref $remove) {
73 5 100       37 if (exists $self->{io}{fileno $remove}) {
74 3         5 warn "-- Removed IO watcher for ".fileno($remove)."\n" if DEBUG;
75             # If session doesn't exist, the watcher won't be re-added
76 3 50       29 $self->_session_call(mojo_clear_io => fileno $remove) if $self->_session_exists;
77             }
78 5         38 return !!delete $self->{io}{fileno $remove};
79             } else {
80 5 100       20 if (exists $self->{timers}{$remove}) {
81 4         5 warn "-- Removed timer $remove\n" if DEBUG;
82             # If session doesn't exist, the timer won't be re-added
83 4 50       50 $self->_session_call(mojo_clear_timer => $remove) if $self->_session_exists;
84             }
85 5         36 return !!delete $self->{timers}{$remove};
86             }
87             }
88              
89             sub reset {
90 4     4 1 10 my $self = shift;
91             # If session doesn't exist, watchers won't be re-added
92 4 100       10 if ($self->_session_exists) {
93 2         5 $self->_session_call('mojo_clear_timers');
94 2         3 $self->_session_call(mojo_clear_io => $_) for keys %{$self->{io}};
  2         9  
95             }
96 4         8 delete @{$self}{qw(io next_tick next_timer timers)};
  4         23  
97             }
98              
99             sub start {
100 22     22 1 105 my $self = shift;
101 22         80 $self->{running}++;
102 22         108 $self->one_tick while $self->{running};
103             }
104              
105 23     23 1 213 sub stop { delete shift->{running} }
106              
107 30     30 1 63386 sub timer { shift->_timer(0, @_) }
108              
109             sub watch {
110 24     24 1 200 my ($self, $handle, $read, $write) = @_;
111            
112 24 100       203 croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
113 23         42 $io->{handle} = $handle;
114 23         34 $io->{read} = $read;
115 23         34 $io->{write} = $write;
116            
117 23         24 warn "-- Set IO watcher for ".fileno($handle)."\n" if DEBUG;
118            
119 23         57 $self->_init_session->_session_call(mojo_set_io => fileno $handle);
120            
121 23         58 return $self;
122             }
123              
124             sub _id {
125 33     33   41 my $self = shift;
126 33         62 my $id;
127 33         36 do { $id = md5_sum 't' . steady_time . rand 999 } while $self->{timers}{$id};
  33         123  
128 33         1163 return $id;
129             }
130              
131             sub _next {
132 5     5   6 my $self = shift;
133 5         10 delete $self->{next_timer};
134 5         6 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  19         92  
  14         31  
135             }
136              
137             sub _timer {
138 33     33   67 my ($self, $recurring, $after, $cb) = @_;
139            
140 33         99 my $id = $self->_id;
141 33         94 my $timer = $self->{timers}{$id}
142             = {cb => $cb, after => $after, time => steady_time + $after};
143 33 100       333 $timer->{recurring} = $after if $recurring;
144            
145 33         30 if (DEBUG) {
146             my $is_recurring = $recurring ? ' (recurring)' : '';
147             warn "-- Set timer $id after $after seconds$is_recurring\n";
148             }
149            
150 33         87 $self->_init_session->_session_call(mojo_set_timer => $id);
151            
152 33         98 return $id;
153             }
154              
155             sub _try {
156 8067     8067   12524 my ($self, $what, $cb) = (shift, shift, shift);
157 8067 100       8454 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  8067         17666  
  8066         54460  
158             }
159              
160             sub _init_session {
161 8012     8012   8506 my $self = shift;
162 8012 100       12825 unless ($self->_session_exists) {
163 6         104 my $session = POE::Session->create(
164             inline_states => {
165             _start => \&_event_start,
166             _stop => \&_event_stop,
167             mojo_set_timer => \&_event_set_timer,
168             mojo_clear_timer => \&_event_clear_timer,
169             mojo_adjust_timer => \&_event_adjust_timer,
170             mojo_clear_timers => \&_event_clear_timers,
171             mojo_set_io => \&_event_set_io,
172             mojo_clear_io => \&_event_clear_io,
173             mojo_timer => \&_event_timer,
174             mojo_io => \&_event_io,
175             },
176             heap => { mojo_reactor => $self },
177             );
178 6         710 weaken $session->get_heap()->{mojo_reactor};
179 6         43 $self->{session_id} = $session->ID;
180             }
181 8012         11496 return $self;
182             }
183              
184             sub _session_exists {
185 8029     8029   9360 my $self = shift;
186 8029         24465 return defined $self->{session_id};
187             }
188              
189             sub _session_call {
190 668     668   585 my $self = shift;
191 668 50       1425 croak 'Session call on nonexistent session' unless defined $self->{session_id};
192 668         1905 POE::Kernel->call($self->{session_id}, @_);
193 668         3740 return $self;
194             }
195              
196             sub _event_start {
197 6     6   1690 my $self = $_[HEAP]{mojo_reactor};
198 6         11 my $session = $_[SESSION];
199            
200 6         24 warn "-- POE session started\n" if DEBUG;
201             }
202              
203             sub _event_stop {
204 5     5   558 my $self = $_[HEAP]{mojo_reactor};
205            
206 5         6 warn "-- POE session stopped\n" if DEBUG;
207            
208 5 50       33 delete $self->{session_id} if $self;
209             }
210              
211             sub _event_set_timer {
212 627     627   18442 my $self = $_[HEAP]{mojo_reactor};
213 627         682 my $id = $_[ARG0];
214 627 50 33     3195 return unless exists $self->{timers}{$id}
215             and defined $self->{timers}{$id}{time};
216 627         793 my $timer = $self->{timers}{$id};
217 627         1309 my $delay_time = $timer->{time} - steady_time;
218 627         3609 my $poe_id = POE::Kernel->delay_set(mojo_timer => $delay_time, $id);
219 627         31503 $timer->{poe_id} = $poe_id;
220            
221 627         1448 warn "-- Set POE timer $poe_id in $delay_time seconds\n" if DEBUG;
222             }
223              
224             sub _event_clear_timer {
225 4     4   187 my $self = $_[HEAP]{mojo_reactor};
226 4         9 my $id = $_[ARG0];
227 4 50 33     32 return unless exists $self->{timers}{$id}
228             and defined $self->{timers}{$id}{poe_id};
229 4         10 my $timer = $self->{timers}{$id};
230 4         21 POE::Kernel->alarm_remove($timer->{poe_id});
231            
232 4         377 warn "-- Cleared POE timer $timer->{poe_id}\n" if DEBUG;
233             }
234              
235             sub _event_adjust_timer {
236 6     6   200 my $self = $_[HEAP]{mojo_reactor};
237 6         8 my $id = $_[ARG0];
238 6 50 33     56 return unless exists $self->{timers}{$id}
      33        
239             and defined $self->{timers}{$id}{time}
240             and defined $self->{timers}{$id}{poe_id};
241 6         11 my $timer = $self->{timers}{$id};
242 6         16 my $new_delay = $timer->{time} - steady_time;
243 6         44 POE::Kernel->delay_adjust($timer->{poe_id}, $new_delay);
244            
245 6         549 warn "-- Adjusted POE timer $timer->{poe_id} to $new_delay seconds\n"
246             if DEBUG;
247             }
248              
249             sub _event_clear_timers {
250 2   50 2   87 my $self = $_[HEAP]{mojo_reactor} // return;
251 2         7 POE::Kernel->alarm_remove_all;
252            
253 2         160 warn "-- Cleared all POE timers\n" if DEBUG;
254             }
255              
256             sub _event_set_io {
257 23     23   1058 my $self = $_[HEAP]{mojo_reactor};
258 23         32 my $fd = $_[ARG0];
259 23 50 33     150 return unless exists $self->{io}{$fd}
260             and defined $self->{io}{$fd}{handle};
261 23         44 my $io = $self->{io}{$fd};
262 23 100       48 if ($io->{read}) {
263 18         68 POE::Kernel->select_read($io->{handle}, 'mojo_io');
264             } else {
265 5         19 POE::Kernel->select_read($io->{handle});
266             }
267 23 100       1729 if ($io->{write}) {
268 17         59 POE::Kernel->select_write($io->{handle}, 'mojo_io');
269             } else {
270 6         22 POE::Kernel->select_write($io->{handle});
271             }
272            
273 23         1019 warn "-- Set POE IO watcher for $fd " .
274             "with read: $io->{read}, write: $io->{write}\n" if DEBUG;
275             }
276              
277             sub _event_clear_io {
278 6     6   357 my $self = $_[HEAP]{mojo_reactor};
279 6         9 my $fd = $_[ARG0];
280 6 50 33     77 return unless exists $self->{io}{$fd}
281             and defined $self->{io}{$fd}{handle};
282 6         14 my $io = $self->{io}{$fd};
283 6         28 POE::Kernel->select_read($io->{handle});
284 6         494 POE::Kernel->select_write($io->{handle});
285 6         536 delete $io->{handle};
286            
287 6         21 warn "-- Cleared POE IO watcher for $fd\n" if DEBUG;
288             }
289              
290             sub _event_timer {
291 618     618   92156 my $self = $_[HEAP]{mojo_reactor};
292 618         744 my $id = $_[ARG0];
293            
294 618         885 my $timer = $self->{timers}{$id};
295 618         442 warn "-- Event fired for timer $id\n" if DEBUG;
296 618 100       1138 if (exists $timer->{recurring}) {
297 594         1232 $timer->{time} = steady_time + $timer->{recurring};
298 594         3770 $self->_init_session->_session_call(mojo_set_timer => $id);
299             } else {
300 24         76 delete $self->{timers}{$id};
301             }
302            
303 618         1337 $self->_try('Timer', $timer->{cb});
304             }
305              
306             sub _event_io {
307 7449     7449   587369 my $self = $_[HEAP]{mojo_reactor};
308 7449         12341 my ($handle, $mode) = @_[ARG0, ARG1];
309            
310 7449         17168 my $io = $self->{io}{fileno $handle};
311             #warn "-- Event fired for IO watcher ".fileno($handle)."\n" if DEBUG;
312 7449 100       12565 if ($mode == POE_IO_READ) {
    50          
313 6789         16235 $self->_try('I/O watcher', $io->{cb}, 0);
314             } elsif ($mode == POE_IO_WRITE) {
315 660         1378 $self->_try('I/O watcher', $io->{cb}, 1);
316             } else {
317 0           die "Unknown POE I/O mode $mode";
318             }
319             }
320              
321             1;
322              
323             =head1 NAME
324              
325             Mojo::Reactor::POE - POE backend for Mojo::Reactor
326              
327             =head1 SYNOPSIS
328              
329             use Mojo::Reactor::POE;
330              
331             # Watch if handle becomes readable or writable
332             my $reactor = Mojo::Reactor::POE->new;
333             $reactor->io($first => sub {
334             my ($reactor, $writable) = @_;
335             say $writable ? 'First handle is writable' : 'First handle is readable';
336             });
337              
338             # Change to watching only if handle becomes writable
339             $reactor->watch($first, 0, 1);
340              
341             # Turn file descriptor into handle and watch if it becomes readable
342             my $second = IO::Handle->new_from_fd($fd, 'r');
343             $reactor->io($second => sub {
344             my ($reactor, $writable) = @_;
345             say $writable ? 'Second handle is writable' : 'Second handle is readable';
346             })->watch($second, 1, 0);
347              
348             # Add a timer
349             $reactor->timer(15 => sub {
350             my $reactor = shift;
351             $reactor->remove($first);
352             $reactor->remove($second);
353             say 'Timeout!';
354             });
355              
356             # Start reactor if necessary
357             $reactor->start unless $reactor->is_running;
358              
359             # Or in an application using Mojo::IOLoop
360             use POE qw(Loop::IO_Poll);
361             use Mojo::Reactor::POE;
362             use Mojo::IOLoop;
363            
364             # Or in a Mojolicious application
365             $ MOJO_REACTOR=Mojo::Reactor::POE POE_EVENT_LOOP=POE::Loop::IO_Poll hypnotoad script/myapp
366              
367             =head1 DESCRIPTION
368              
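369             L<Mojo::Reactor::POE> is an event reactor for L<Mojo::IOLoop> that uses L<POE>.
370             The usage is exactly the same as other L<Mojo::Reactor> implementations such as
371             L<Mojo::Reactor::Poll>. L<Mojo::Reactor::POE> will be used as the default
372             backend for L<Mojo::IOLoop> if it is loaded before L<Mojo::IOLoop> or any
373             module using the loop. However, when invoking a L<Mojolicious> application
374             through L<morbo> or L<hypnotoad>, the reactor must be set as the default by
375             setting the C<MOJO_REACTOR> environment variable to C<Mojo::Reactor::POE>.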
376              
377             Note that if L<POE> detects multiple potential event loops it will fail. This
378             includes L<IO::Poll> and L<Mojo::IOLoop> (loaded by L<Mojolicious>) if the
379             appropriate L<POE::Loop> modules are installed. To avoid this, load L<POE>
380             before any L<Mojolicious> module, or specify the L<POE> event loop explicitly.
381             This means that for L<Mojolicious> applications invoked through L<morbo> or
382             L<hypnotoad>, the L<POE> event loop may also need to be set in the environment.
383             See L<POE::Kernel/"Using POE with Other Event Loops">.
384              
385             =head1 EVENTS
386              
387             L<Mojo::Reactor::POE> inherits all events from L<Mojo::Reactor>.
388              
389             =head1 METHODS
390              
391             L<Mojo::Reactor::POE> inherits all methods from L<Mojo::Reactor> and implements
392             the following new ones.
393              
394             =head2 again
395              
396             $reactor->again($id);
397              
398             Restart timer. Note that this method requires an active timer.
399              
400             =head2 io
401              
402             $reactor = $reactor->io($handle => sub {...});
403              
404             Watch handle for I/O events, invoking the callback whenever handle becomes
405             readable or writable.
406              
407             # Callback will be invoked twice if handle becomes readable and writable
408             $reactor->io($handle => sub {
409             my ($reactor, $writable) = @_;
410             say $writable ? 'Handle is writable' : 'Handle is readable';
411             });
412              
413             =head2 is_running
414              
415             my $bool = $reactor->is_running;
416              
417             Check if reactor is running.
418              
419             =head2 new
420              
421             my $reactor = Mojo::Reactor::POE->new;
422              
423             Construct a new L<Mojo::Reactor::POE> object.
424              
425             =head2 next_tick
426              
427             my $undef = $reactor->next_tick(sub {...});
428              
429             Invoke callback as soon as possible, but not before returning or other
430             callbacks that have been registered with this method, always returns C<undef>.
431              
432             =head2 one_tick
433              
434             $reactor->one_tick;
435              
436             Run reactor until an event occurs or no events are being watched anymore. Note
437             that this method can recurse back into the reactor, so you need to be careful.
438              
439             # Don't block longer than 0.5 seconds
440             my $id = $reactor->timer(0.5 => sub {});
441             $reactor->one_tick;
442             $reactor->remove($id);
443              
444             =head2 recurring
445              
446             my $id = $reactor->recurring(0.25 => sub {...});
447              
448             Create a new recurring timer, invoking the callback repeatedly after a given
449             amount of time in seconds.
450              
451             =head2 remove
452              
453             my $bool = $reactor->remove($handle);
454             my $bool = $reactor->remove($id);
455              
456             Remove handle or timer.
457              
458             =head2 reset
459              
460             $reactor->reset;
461              
462             Remove all handles and timers.
463              
464             =head2 start
465              
466             $reactor->start;
467              
468             Start watching for I/O and timer events, this will block until L</"stop"> is
469             called or no events are being watched anymore. See L</"CAVEATS">.
470              
471             # Start reactor only if it is not running already
472             $reactor->start unless $reactor->is_running;
473              
474             =head2 stop
475              
476             $reactor->stop;
477              
478             Stop watching for I/O and timer events.
479              
480             =head2 timer
481              
482             my $id = $reactor->timer(0.5 => sub {...});
483              
484             Create a new timer, invoking the callback after a given amount of time in
485             seconds.
486              
487             =head2 watch
488              
489             $reactor = $reactor->watch($handle, $readable, $writable);
490              
491             Change I/O events to watch handle for with true and false values. Note that
492             this method requires an active I/O watcher.
493              
494             # Watch only for readable events
495             $reactor->watch($handle, 1, 0);
496              
497             # Watch only for writable events
498             $reactor->watch($handle, 0, 1);
499              
500             # Watch for readable and writable events
501             $reactor->watch($handle, 1, 1);
502              
503             # Pause watching for events
504             $reactor->watch($handle, 0, 0);
505              
506             =head1 CAVEATS
507              
508             When using L<Mojo::IOLoop> with L<POE>, the event loop must be controlled by
509             L<Mojo::IOLoop> or L<Mojo::Reactor::POE>, such as with the methods L</"start">,
510             L</"stop">, and L</"one_tick">. Starting or stopping the event loop through
511             L<POE> will not provide required functionality to L<Mojo::IOLoop> applications.
512              
513             Externally-added sessions will not keep the L<Mojo::IOLoop> running if
514             L<Mojo::Reactor::POE> has nothing left to watch. This can be worked around by
515             adding a recurring timer for the reactor to watch.
516              
517             =head1 BUGS
518              
519             L<POE> has a complex session system which may lead to bugs when used in this
520             manner. Report any issues on the public bugtracker.
521              
522             =head1 AUTHOR
523              
524             Dan Book, C<dbook@cpan.org>
525              
526             =head1 COPYRIGHT AND LICENSE
527              
528             Copyright 2015, Dan Book.
529              
530             This library is free software; you may redistribute it and/or modify it under
531             the terms of the Artistic License version 2.0.
532              
533             =head1 SEE ALSO
534              
535             L<Mojolicious>, L<Mojo::IOLoop>, L<POE>