File Coverage

blib/lib/Mojo/Reactor/Poll.pm
Criterion Covered Total %
statement 100 100 100.0
branch 42 46 91.3
condition 14 23 60.8
subroutine 22 22 100.0
pod 12 12 100.0
total 190 203 93.6


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Poll;
2 67     67   281067 use Mojo::Base 'Mojo::Reactor';
  67         149  
  67         535  
3              
4 67     67   1349 use Carp qw(croak);
  67         474  
  67         5028  
5 67     67   532 use IO::Poll qw(POLLERR POLLHUP POLLIN POLLNVAL POLLOUT POLLPRI);
  67         813  
  67         4855  
6 67     67   501 use List::Util qw(min);
  67         136  
  67         4560  
7 67     67   352 use Mojo::Util qw(md5_sum steady_time);
  67         144  
  67         3376  
8 67     67   385 use Time::HiRes qw(usleep);
  67         127  
  67         681  
9              
10             sub again {
11 12272     12272 1 28605 my ($self, $id, $after) = @_;
12 12272 100       38455 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
13 12271 100       27181 $timer->{after} = $after if defined $after;
14 12271         38481 $timer->{time} = steady_time + $timer->{after};
15             }
16              
17             sub io {
18 1141     1141 1 4931 my ($self, $handle, $cb) = @_;
19 1141   33     8645 $self->{io}{fileno($handle) // croak 'Handle is closed'} = {cb => $cb};
20 1141         3527 return $self->watch($handle, 1, 1);
21             }
22              
23 3532     3532 1 21624 sub is_running { !!shift->{running} }
24              
25             sub next_tick {
26 2142     2142 1 9065 my ($self, $cb) = @_;
27 2142         3611 push @{$self->{next_tick}}, $cb;
  2142         6085  
28 2142   66     11302 $self->{next_timer} //= $self->timer(0 => \&_next);
29 2142         10274 return undef;
30             }
31              
32             sub one_tick {
33 47183     47183 1 73968 my $self = shift;
34              
35             # Just one tick
36 47183 100       99425 local $self->{running} = 1 unless $self->{running};
37              
38             # Wait for one event
39 47183         63973 my $i;
40 47183   66     158499 until ($i || !$self->{running}) {
41              
42             # Stop automatically if there is nothing to watch
43 47183 100 100     67427 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  47183         105356  
  11         58  
44              
45             # Calculate ideal timeout based on timers and round up to next millisecond
46 47173         68631 my $min = min map { $_->{time} } values %{$self->{timers}};
  89782         206961  
  47173         103191  
47 47173 100       130729 my $timeout = defined $min ? $min - steady_time : 0.5;
48 47173 100       210275 $timeout = $timeout <= 0 ? 0 : int($timeout * 1000) + 1;
49              
50             # I/O
51 47173 100       67142 if (keys %{$self->{io}}) {
  47173 100       110425  
52 27465         35648 my @poll = map { $_ => $self->{io}{$_}{mode} } keys %{$self->{io}};
  64903         148166  
  27465         58684  
53              
54             # This may break in the future, but is worth it for performance
55 27465 100       7735750 if (IO::Poll::_poll($timeout, @poll) > 0) {
56 26310         79055 while (my ($fd, $mode) = splice @poll, 0, 2) {
57              
58 59720 100       117766 if ($mode & (POLLIN | POLLPRI | POLLNVAL | POLLHUP | POLLERR)) {
59 21295 50       46751 next unless my $io = $self->{io}{$fd};
60 21295 50       56019 ++$i and $self->_try('I/O watcher', $io->{cb}, 0);
61             }
62 59720 100 100     223928 next unless $mode & POLLOUT && (my $io = $self->{io}{$fd});
63 8888 50       34734 ++$i and $self->_try('I/O watcher', $io->{cb}, 1);
64             }
65             }
66             }
67              
68             # Wait for timeout if poll can't be used
69 17         852247 elsif ($timeout) { usleep($timeout * 1000) }
70              
71             # Timers (time should not change in between timers)
72 47173         114660 my $now = steady_time;
73 47173         163986 for my $id (keys %{$self->{timers}}) {
  47173         121358  
74 89841 100       208424 next unless my $t = $self->{timers}{$id};
75 89837 100       307182 next unless $t->{time} <= $now;
76              
77             # Recurring timer
78 21751 100       45407 if ($t->{recurring}) { $t->{time} = $now + $t->{after} }
  20309         40305  
79              
80             # Normal timer
81 1442         5028 else { $self->remove($id) }
82              
83 21751 50 33     87027 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
84             }
85             }
86             }
87              
88 14     14 1 1198 sub recurring { shift->_timer(1, @_) }
89              
90             sub remove {
91 3733     3733 1 13178 my ($self, $remove) = @_;
92 3733 100       16785 return !!delete $self->{timers}{$remove} unless ref $remove;
93 1042   33     22111 return !!delete $self->{io}{fileno($remove) // croak 'Handle is closed'};
94             }
95              
96 7     7 1 1008 sub reset { delete @{shift()}{qw(events io next_tick next_timer running timers)} }
  7         93  
97              
98             sub start {
99 1097     1097 1 2432 my $self = shift;
100 1097   50     6246 local $self->{running} = ($self->{running} || 0) + 1;
101 1097         4280 $self->one_tick while $self->{running};
102             }
103              
104 1117     1117 1 4771 sub stop { delete shift->{running} }
105              
106 2180     2180 1 25830 sub timer { shift->_timer(0, @_) }
107              
108             sub watch {
109 11298     11298 1 28599 my ($self, $handle, $read, $write) = @_;
110              
111 11298 100       51584 croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
112 11297         23317 $io->{mode} = 0;
113 11297 100       31550 $io->{mode} |= POLLIN | POLLPRI if $read;
114 11297 100       25764 $io->{mode} |= POLLOUT if $write;
115              
116 11297         28153 return $self;
117             }
118              
119             sub _id {
120 2194     2194   3647 my $self = shift;
121 2194         3838 my $id;
122 2194         3658 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  2194         8442  
123 2194         47802 return $id;
124             }
125              
126             sub _next {
127 1381     1381   2462 my $self = shift;
128 1381         3631 delete $self->{next_timer};
129 1381         2650 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  2141         6593  
  3522         11486  
130             }
131              
132             sub _timer {
133 2194     2194   5862 my ($self, $recurring, $after, $cb) = @_;
134 2194         6446 my $id = $self->_id;
135 2194         6611 $self->{timers}{$id} = {cb => $cb, after => $after, recurring => $recurring, time => steady_time + $after};
136 2194         25553 return $id;
137             }
138              
139             sub _try {
140 51934     51934   106135 my ($self, $what, $cb) = (shift, shift, shift);
141 51934 100       84731 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  51934         139600  
  51931         323721  
142             }
143              
144             1;
145              
146             =encoding utf8
147              
148             =head1 NAME
149              
150             Mojo::Reactor::Poll - Low-level event reactor with poll support
151              
152             =head1 SYNOPSIS
153              
154             use Mojo::Reactor::Poll;
155              
156             # Watch if handle becomes readable or writable
157             my $reactor = Mojo::Reactor::Poll->new;
158             $reactor->io($first => sub ($reactor, $writable) {
159             say $writable ? 'First handle is writable' : 'First handle is readable';
160             });
161              
162             # Change to watching only if handle becomes writable
163             $reactor->watch($first, 0, 1);
164              
165             # Turn file descriptor into handle and watch if it becomes readable
166             my $second = IO::Handle->new_from_fd($fd, 'r');
167             $reactor->io($second => sub ($reactor, $writable) {
168             say $writable ? 'Second handle is writable' : 'Second handle is readable';
169             })->watch($second, 1, 0);
170              
171             # Add a timer
172             $reactor->timer(15 => sub ($reactor) {
173             $reactor->remove($first);
174             $reactor->remove($second);
175             say 'Timeout!';
176             });
177              
178             # Start reactor if necessary
179             $reactor->start unless $reactor->is_running;
180              
181             =head1 DESCRIPTION
182              
183             L is a low-level event reactor based on L.
184              
185             =head1 EVENTS
186              
187             L inherits all events from L.
188              
189             =head1 METHODS
190              
191             L inherits all methods from L and implements the following new ones.
192              
193             =head2 again
194              
195             $reactor->again($id);
196             $reactor->again($id, 0.5);
197              
198             Restart timer and optionally change the invocation time. Note that this method requires an active timer.
199              
200             =head2 io
201              
202             $reactor = $reactor->io($handle => sub {...});
203              
204             Watch handle for I/O events, invoking the callback whenever handle becomes readable or writable.
205              
206             # Callback will be executed twice if handle becomes readable and writable
207             $reactor->io($handle => sub ($reactor, $writable) {
208             say $writable ? 'Handle is writable' : 'Handle is readable';
209             });
210              
211             =head2 is_running
212              
213             my $bool = $reactor->is_running;
214              
215             Check if reactor is running.
216              
217             =head2 next_tick
218              
219             my $undef = $reactor->next_tick(sub {...});
220              
221             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
222             method, always returns C.
223              
224             =head2 one_tick
225              
226             $reactor->one_tick;
227              
228             Run reactor until an event occurs or no events are being watched anymore.
229              
230             # Don't block longer than 0.5 seconds
231             my $id = $reactor->timer(0.5 => sub {});
232             $reactor->one_tick;
233             $reactor->remove($id);
234              
235             =head2 recurring
236              
237             my $id = $reactor->recurring(0.25 => sub {...});
238              
239             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
240              
241             =head2 remove
242              
243             my $bool = $reactor->remove($handle);
244             my $bool = $reactor->remove($id);
245              
246             Remove handle or timer.
247              
248             =head2 reset
249              
250             $reactor->reset;
251              
252             Remove all handles and timers.
253              
254             =head2 start
255              
256             $reactor->start;
257              
258             Start watching for I/O and timer events, this will block until L is called or no events are being watched
259             anymore.
260              
261             # Start reactor only if it is not running already
262             $reactor->start unless $reactor->is_running;
263              
264             =head2 stop
265              
266             $reactor->stop;
267              
268             Stop watching for I/O and timer events.
269              
270             =head2 timer
271              
272             my $id = $reactor->timer(0.5 => sub {...});
273              
274             Create a new timer, invoking the callback after a given amount of time in seconds.
275              
276             =head2 watch
277              
278             $reactor = $reactor->watch($handle, $readable, $writable);
279              
280             Change I/O events to watch handle for with true and false values. Note that this method requires an active I/O watcher.
281              
282             # Watch only for readable events
283             $reactor->watch($handle, 1, 0);
284              
285             # Watch only for writable events
286             $reactor->watch($handle, 0, 1);
287              
288             # Watch for readable and writable events
289             $reactor->watch($handle, 1, 1);
290              
291             # Pause watching for events
292             $reactor->watch($handle, 0, 0);
293              
294             =head1 SEE ALSO
295              
296             L, L, L.
297              
298             =cut