File Coverage

blib/lib/Mojo/Reactor/Epoll.pm
Criterion Covered Total %
statement 115 116 99.1
branch 41 48 85.4
condition 25 44 56.8
subroutine 20 20 100.0
pod 7 7 100.0
total 208 235 88.5


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Epoll;
2 1     1   274420 use Mojo::Base 'Mojo::Reactor::Poll';
  1         11  
  1         8  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::Epoll';
5              
6 1     1   40698 use Carp 'croak';
  1         3  
  1         43  
7 1     1   496 use Linux::Epoll;
  1         536  
  1         49  
8 1     1   8 use List::Util 'min';
  1         2  
  1         52  
9 1     1   6 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         43  
10 1     1   6 use Scalar::Util 'weaken';
  1         2  
  1         37  
11 1     1   5 use Time::HiRes 'usleep';
  1         4  
  1         5  
12              
13 1   50 1   100 use constant DEBUG => $ENV{MOJO_REACTOR_EPOLL_DEBUG} || 0;
  1         3  
  1         1723  
14              
15             our $VERSION = '0.010';
16              
17             sub io {
18 10     10 1 6520 my ($self, $handle, $cb) = @_;
19 10   33     52 my $fd = fileno($handle) // croak 'Handle is closed';
20 10         38 $self->{io}{$fd}{cb} = $cb;
21 10         18 warn "-- Set IO watcher for $fd\n" if DEBUG;
22 10         33 return $self->watch($handle, 1, 1);
23             }
24              
25             sub one_tick {
26 15345     15345 1 37490 my $self = shift;
27            
28             # Just one tick
29 15345 100       27923 local $self->{running} = 1 unless $self->{running};
30            
31             # Wait for one event
32 15345         18758 my $i;
33 15345   66     43849 until ($i || !$self->{running}) {
34             # Stop automatically if there is nothing to watch
35 15345 100 100     18975 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  15345         32740  
  7         36  
36            
37             # Calculate ideal timeout based on timers
38 15339         21103 my $min = min map { $_->{time} } values %{$self->{timers}};
  17668         42877  
  15339         27367  
39 15339 100       38307 my $timeout = defined $min ? ($min - steady_time) : 0.5;
40 15339 100       78858 $timeout = 0 if $timeout < 0;
41            
42             # I/O
43 15339 100       20497 if (my $watched = keys %{$self->{io}}) {
  15339 100       30926  
44             # Set max events to half the number of descriptors, to a minimum of 10
45 13641         24386 my $maxevents = int $watched/2;
46 13641 50       23710 $maxevents = 10 if $maxevents < 10;
47            
48 13641   66     26506 my $epoll = $self->{epoll} // $self->_create_epoll;
49              
50 13641         189071 my $count = $epoll->wait($maxevents, $timeout);
51 13641 50       37529 $i += $count if defined $count;
52             }
53            
54             # Wait for timeout if epoll can't be used
55 7         175326 elsif ($timeout) { usleep $timeout * 1000000 }
56            
57             # Timers (time should not change in between timers)
58 15339         33689 my $now = steady_time;
59 15339         81108 for my $id (keys %{$self->{timers}}) {
  15339         37669  
60 17669 50       36004 next unless my $t = $self->{timers}{$id};
61 17669 100       64980 next unless $t->{time} <= $now;
62            
63             # Recurring timer
64 2354 100       3824 if ($t->{recurring}) { $t->{time} = $now + $t->{after} }
  2325         3700  
65            
66             # Normal timer
67 29         94 else { $self->remove($id) }
68            
69 2354 50 50     6199 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
70             }
71             }
72             }
73              
74 5     5 1 647 sub recurring { shift->_timer(1, @_) }
75              
76             sub remove {
77 40     40 1 3857 my ($self, $remove) = @_;
78 40 100       105 if (ref $remove) {
79 5   33     29 my $fd = fileno($remove) // croak 'Handle is closed';
80 5 50 66     38 if (exists $self->{io}{$fd} and exists $self->{io}{$fd}{epoll_cb}) {
81 3         76 $self->{epoll}->delete($remove);
82             }
83 5         11 warn "-- Removed IO watcher for $fd\n" if DEBUG;
84 5         89 return !!delete $self->{io}{$fd};
85             } else {
86 35         55 warn "-- Removed timer $remove\n" if DEBUG;
87 35         180 return !!delete $self->{timers}{$remove};
88             }
89             }
90              
91             sub reset {
92 4     4 1 1677 my $self = shift;
93 4         15 delete @{$self}{qw(epoll pending_watch)};
  4         60  
94 0         26 $self->SUPER::reset;
95             }
96              
97 35     35 1 276912 sub timer { shift->_timer(0, @_) }
98              
99             sub watch {
100 34     34 1 464 my ($self, $handle, $read, $write) = @_;
101            
102 34         73 my $fd = fileno $handle;
103 34 100       217 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
104              
105 33         52 my $epoll = $self->{epoll};
106 33 100       68 unless (defined $epoll) {
107 9         12 push @{$self->{pending_watch}}, [$handle, $read, $write];
  9         25  
108 9         29 return $self;
109             }
110            
111 24         34 my @events;
112 24 100       70 push @events, 'in', 'prio' if $read;
113 24 100       50 push @events, 'out' if $write;
114            
115 24 100       56 my $op = exists $io->{epoll_cb} ? 'modify' : 'add';
116            
117 24         72 weaken $self;
118             my $cb = $io->{epoll_cb} // sub {
119 13642     13642   39852 my ($events) = @_;
120 13642 50 66     30016 if ($events->{in} or $events->{prio} or $events->{hup} or $events->{err}) {
      33        
      33        
121 13362 50       32996 return unless exists $self->{io}{$fd};
122 13362         30730 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0);
123             }
124 13642 100 66     58622 if ($events->{out} or $events->{hup} or $events->{err}) {
      33        
125 1681 100       3232 return unless exists $self->{io}{$fd};
126 1680         3174 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1);
127             }
128 24   100     106 };
129 24         402 $epoll->$op($handle, \@events, $cb);
130            
131             # Cache callback for future modify operations, after successfully added to epoll
132 24   66     112 $io->{epoll_cb} //= $cb;
133            
134 24         94 return $self;
135             }
136              
137             sub _create_epoll {
138 3     3   8 my $self = shift;
139 3         162 $self->{epoll} = Linux::Epoll->new;
140 3   50     10 $self->watch(@$_) for @{delete $self->{pending_watch} // []};
  3         22  
141 3         22 return $self->{epoll};
142             }
143              
144             sub _id {
145 40     40   62 my $self = shift;
146 40         59 my $id;
147 40         63 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  40         111  
148 40         896 return $id;
149             }
150              
151             sub _timer {
152 40     40   105 my ($self, $recurring, $after, $cb) = @_;
153            
154 40         100 my $id = $self->_id;
155 40         113 my $timer = $self->{timers}{$id} = {
156             cb => $cb,
157             after => $after,
158             recurring => $recurring,
159             time => steady_time + $after,
160             };
161            
162 40         337 if (DEBUG) {
163             my $is_recurring = $recurring ? ' (recurring)' : '';
164             warn "-- Set timer $id after $after seconds$is_recurring\n";
165             }
166            
167 40         124 return $id;
168             }
169              
170             sub _try {
171 17396     17396   32445 my ($self, $what, $cb) = (shift, shift, shift);
172 17396 100       27851 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  17396         45051  
  17395         69201  
173             }
174              
175             1;
176              
177             =head1 NAME
178              
179             Mojo::Reactor::Epoll - epoll backend for Mojo::Reactor
180              
181             =head1 SYNOPSIS
182              
183             use Mojo::Reactor::Epoll;
184              
185             # Watch if handle becomes readable or writable
186             my $reactor = Mojo::Reactor::Epoll->new;
187             $reactor->io($first => sub {
188             my ($reactor, $writable) = @_;
189             say $writable ? 'First handle is writable' : 'First handle is readable';
190             });
191              
192             # Change to watching only if handle becomes writable
193             $reactor->watch($first, 0, 1);
194              
195             # Turn file descriptor into handle and watch if it becomes readable
196             my $second = IO::Handle->new_from_fd($fd, 'r');
197             $reactor->io($second => sub {
198             my ($reactor, $writable) = @_;
199             say $writable ? 'Second handle is writable' : 'Second handle is readable';
200             })->watch($second, 1, 0);
201              
202             # Add a timer
203             $reactor->timer(15 => sub {
204             my $reactor = shift;
205             $reactor->remove($first);
206             $reactor->remove($second);
207             say 'Timeout!';
208             });
209              
210             # Start reactor if necessary
211             $reactor->start unless $reactor->is_running;
212              
213             # Or in an application using Mojo::IOLoop
214             use Mojo::Reactor::Epoll;
215             use Mojo::IOLoop;
216            
217             # Or in a Mojolicious application
218             $ MOJO_REACTOR=Mojo::Reactor::Epoll hypnotoad script/myapp
219              
220             =head1 DESCRIPTION
221              
222             L is an event reactor for L that uses the
223             L Linux subsystem. The usage is exactly the same as other
224             L implementations such as L.
225             L will be used as the default backend for L
226             if it is loaded before L or any module using the loop. However,
227             when invoking a L application through L or L,
228             the reactor must be set as the default by setting the C
229             environment variable to C.
230              
231             =head1 EVENTS
232              
233             L inherits all events from L.
234              
235             =head1 METHODS
236              
237             L inherits all methods from L and
238             implements the following new ones.
239              
240             =head2 io
241              
242             $reactor = $reactor->io($handle => sub {...});
243              
244             Watch handle for I/O events, invoking the callback whenever handle becomes
245             readable or writable.
246              
247             # Callback will be invoked twice if handle becomes readable and writable
248             $reactor->io($handle => sub {
249             my ($reactor, $writable) = @_;
250             say $writable ? 'Handle is writable' : 'Handle is readable';
251             });
252              
253             =head2 one_tick
254              
255             $reactor->one_tick;
256              
257             Run reactor until an event occurs or no events are being watched anymore. Note
258             that this method can recurse back into the reactor, so you need to be careful.
259              
260             # Don't block longer than 0.5 seconds
261             my $id = $reactor->timer(0.5 => sub {});
262             $reactor->one_tick;
263             $reactor->remove($id);
264              
265             =head2 recurring
266              
267             my $id = $reactor->recurring(0.25 => sub {...});
268              
269             Create a new recurring timer, invoking the callback repeatedly after a given
270             amount of time in seconds.
271              
272             =head2 remove
273              
274             my $bool = $reactor->remove($handle);
275             my $bool = $reactor->remove($id);
276              
277             Remove handle or timer.
278              
279             =head2 reset
280              
281             $reactor->reset;
282              
283             Remove all handles and timers.
284              
285             =head2 timer
286              
287             my $id = $reactor->timer(0.5 => sub {...});
288              
289             Create a new timer, invoking the callback after a given amount of time in
290             seconds.
291              
292             =head2 watch
293              
294             $reactor = $reactor->watch($handle, $readable, $writable);
295              
296             Change I/O events to watch handle for with true and false values. Note that
297             this method requires an active I/O watcher.
298              
299             # Watch only for readable events
300             $reactor->watch($handle, 1, 0);
301              
302             # Watch only for writable events
303             $reactor->watch($handle, 0, 1);
304              
305             # Watch for readable and writable events
306             $reactor->watch($handle, 1, 1);
307              
308             # Pause watching for events
309             $reactor->watch($handle, 0, 0);
310              
311             =head1 CAVEATS
312              
313             The epoll notification facility is exclusive to Linux systems.
314              
315             The epoll handle is not usable across forks, and this is not currently managed
316             for you, though it is not created until the loop is started to allow for
317             preforking deployments such as L.
318              
319             =head1 BUGS
320              
321             Report any issues on the public bugtracker.
322              
323             =head1 AUTHOR
324              
325             Dan Book, C
326              
327             =head1 COPYRIGHT AND LICENSE
328              
329             Copyright 2015, Dan Book.
330              
331             This library is free software; you may redistribute it and/or modify it under
332             the terms of the Artistic License version 2.0.
333              
334             =head1 SEE ALSO
335              
336             L, L, L