File Coverage

blib/lib/IO/Async/Loop/Epoll.pm
Criterion Covered Total %
statement 182 191 95.2
branch 75 106 70.7
condition 27 29 93.1
subroutine 23 24 95.8
pod 7 8 87.5
total 314 358 87.7


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Loop::Epoll 0.22;
7              
8 13     13   358538 use v5.14;
  13         87  
9 13     13   75 use warnings;
  13         25  
  13         416  
10              
11 13     13   65 use constant API_VERSION => '0.76';
  13         26  
  13         987  
12              
13             # Only Linux is known always to be able to report EOF conditions on
14             # filehandles using EPOLLHUP
15             # This is probably redundant as epoll is probably Linux-only also, but it
16             # doesn't harm anything to test specially.
17 13     13   79 use constant _CAN_ON_HANGUP => ( $^O eq "linux" );
  13         18  
  13         630  
18              
19 13     13   70 use base qw( IO::Async::Loop );
  13         26  
  13         11302  
20              
21 13     13   244924 use Carp;
  13         38  
  13         859  
22              
23 13     13   7638 use Linux::Epoll 0.005;
  13         53852  
  13         794  
24              
25 13     13   103 use POSIX qw( EINTR EPERM SIG_BLOCK SIG_UNBLOCK sigprocmask sigaction ceil );
  13         36  
  13         118  
26 13     13   1545 use Scalar::Util qw( refaddr );
  13         27  
  13         575  
27              
28 13     13   83 use constant _CAN_WATCHDOG => 1;
  13         22  
  13         816  
29 13     13   87 use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
  13         34  
  13         687  
30              
31 13     13   6915 use Struct::Dumb;
  13         26895  
  13         116  
32              
33             struct SignalWatch => [qw( code pending orig )];
34              
35             =head1 NAME
36              
37             C<IO::Async::Loop::Epoll> - use C<IO::Async> with C<epoll> on Linux
38              
39             =head1 SYNOPSIS
40              
41             use IO::Async::Loop::Epoll;
42              
43             use IO::Async::Stream;
44             use IO::Async::Signal;
45              
46             my $loop = IO::Async::Loop::Epoll->new;
47              
48             $loop->add( IO::Async::Stream->new(
49             read_handle => \*STDIN,
50             on_read => sub {
51             my ( $self, $buffref ) = @_;
52             while( $$buffref =~ s/^(.*)\r?\n// ) {
53             print "You said: $1\n";
54             }
55             },
56             ) );
57              
58             $loop->add( IO::Async::Signal->new(
59             name => 'INT',
60             on_receipt => sub {
61             print "SIGINT, will now quit\n";
62             $loop->stop;
63             },
64             ) );
65              
66             $loop->run;
67              
68             =head1 DESCRIPTION
69              
70             This subclass of L<IO::Async::Loop> uses C<epoll(7)> on Linux to perform
71             read-ready and write-ready tests so that the OZ<>(1) high-performance
72             multiplexing of Linux's C<epoll_pwait(2)> syscall can be used.
73              
74             The C<epoll> Linux subsystem uses a persistent registration system, meaning
75             that better performance can be achieved in programs using a large number of
76             filehandles. Each C<epoll_pwait(2)> syscall only has an overhead proportional
77             to the number of ready filehandles, rather than the total number being
78             watched. For more detail, see the C<epoll(7)> manpage.
79              
80             This class uses the C<epoll_pwait(2)> system call, which atomically switches
81             the process's signal mask, performs a wait exactly as C<epoll_wait(2)> would,
82             then switches it back. This allows a process to block the signals it cares
83             about, but switch in an empty signal mask during the poll, allowing it to
84             handle file IO and signals concurrently.
85              
86             =cut
87              
88             =head1 CONSTRUCTOR
89              
90             =cut
91              
92             =head2 new
93              
94             $loop = IO::Async::Loop::Epoll->new()
95              
96             This function returns a new instance of a C<IO::Async::Loop::Epoll> object.
97              
98             =cut
99              
100             sub new
101             {
102 12     12 1 473 my $class = shift;
103 12         33 my ( %args ) = @_;
104              
105 12         497 my $epoll = Linux::Epoll->new;
106 12 50       76 defined $epoll or croak "Cannot create epoll handle - $!";
107              
108 12         125 my $self = $class->SUPER::__new( %args );
109              
110 12         7724 $self->{epoll} = $epoll;
111 12         132 $self->{sigmask} = POSIX::SigSet->new();
112 12         25 $self->{maxevents} = 8;
113              
114 12         28 $self->{fakeevents} = {};
115              
116 12         35 $self->{signals} = {}; # {$name} => SignalWatch
117 12         41 $self->{masks} = {};
118              
119 12         50 $self->{pid} = $$;
120              
121             # epoll gets very upset if applications close() filehandles without telling
122             # it, and then try to add that mask a second time. We can attempt to detect
123             # this by storing the mapping from fileno to refaddr($fh)
124 12         29 $self->{refaddr_for_fileno} = {};
125              
126 12         47 return $self;
127             }
128              
129             # Some bits to keep track of in {masks}
130             use constant {
131 13         25891 WATCH_READ => 0x01,
132             WATCH_WRITE => 0x02,
133             WATCH_HUP => 0x04,
134 13     13   2829 };
  13         225  
135              
136             =head1 METHODS
137              
138             As this is a subclass of L<IO::Async::Loop>, all of its methods are inherited.
139             Except where noted below, all of the class's methods behave identically to
140             C<IO::Async::Loop>.
141              
142             =cut
143              
144             sub DESTROY
145             {
146 8     8   24091 my $self = shift;
147              
148 8         97 foreach my $signal ( keys %{ $self->{signals} } ) {
  8         598  
149 5         4312 $self->unwatch_signal( $signal );
150             }
151             }
152              
153             sub is_running
154             {
155 0     0 0 0 my $self = shift;
156 0         0 return $self->{running};
157             }
158              
159             =head2 loop_once
160              
161             $count = $loop->loop_once( $timeout )
162              
163             This method calls C<epoll_pwait(2)>, and processes the results of that call.
164             It returns the total number of C<IO::Async::Notifier> callbacks invoked, or
165             C<undef> if the underlying C<epoll_pwait()> method returned an error. If the
166             C<epoll_pwait()> was interrupted by a signal, then 0 is returned instead.
167              
168             =cut
169              
170             sub loop_once
171             {
172 36     36 1 3072183 my $self = shift;
173 36         131 my ( $timeout ) = @_;
174              
175 36 50       236 $self->post_fork if $self->{pid} != $$;
176              
177 36         518 $self->_adjust_timeout( \$timeout );
178              
179 36 100       1056 $timeout = 0 if keys %{ $self->{fakeevents} };
  36         184  
180              
181             # Round up to next millisecond to avoid zero timeouts
182 36 50       531 my $msec = defined $timeout ? ceil( $timeout * 1000 ) : -1;
183              
184 36         404 $self->pre_wait;
185 36         2005185 my $ret = $self->{epoll}->wait( $self->{maxevents}, $msec / 1000, $self->{sigmask} );
186 36         535 $self->post_wait;
187              
188 36 50 66     1254 return undef if !defined $ret and $! != EINTR;
189              
190 36   100     199 my $count = $ret || 0;
191              
192 36         67 if( WATCHDOG_ENABLE and !$self->{alarmed} ) {
193             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
194             $self->{alarmed}++;
195             }
196              
197 36         89 my $iowatches = $self->{iowatches};
198              
199 36         87 my $fakeevents = $self->{fakeevents};
200 36         186 my @fakeevents = map { [ $_ => $fakeevents->{$_} ] } keys %$fakeevents;
  1         6  
201              
202 36         225 foreach my $ev ( @fakeevents ) {
203 1         3 my ( $fd, $bits ) = @$ev;
204              
205 1         4 my $watch = $iowatches->{$fd};
206              
207 1 50       5 if( $bits & WATCH_READ ) {
208 1 50       7 $watch->[1]->() if $watch->[1];
209 1         4 $count++;
210             }
211              
212 1 50       5 if( $bits & WATCH_WRITE ) {
213 1 50       9 $watch->[2]->() if $watch->[2];
214 1         5 $count++;
215             }
216              
217 1 50       5 if( $bits & WATCH_HUP ) {
218 0 0       0 $watch->[3]->() if $watch->[3];
219 0         0 $count++;
220             }
221             }
222              
223 36         131 my $signals = $self->{signals};
224 36         119 foreach my $sigslot ( values %$signals ) {
225 17 100       112 if( $sigslot->pending ) {
226 16         168 $sigslot->pending = 0;
227 16         165 $sigslot->code->();
228 16         1070 $count++;
229             }
230             }
231              
232 36         263 $count += $self->_manage_queues;
233              
234             # If we entirely filled the event buffer this time, we may have missed some
235             # Let's get a bigger buffer next time
236 36 50 66     1403 $self->{maxevents} *= 2 if defined $ret and $ret == $self->{maxevents};
237              
238 36         69 alarm( 0 ), undef $self->{alarmed} if WATCHDOG_ENABLE;
239              
240 36         152 return $count;
241             }
242              
243             sub watch_io
244             {
245 14     14 1 34372 my $self = shift;
246 14         212 my %params = @_;
247              
248 14 100       343 $self->post_fork if $self->{pid} != $$;
249              
250 14         74 my $epoll = $self->{epoll};
251              
252 14         432 $self->__watch_io( %params );
253              
254 14         1305 my $handle = $params{handle};
255 14         97 my $fd = $handle->fileno;
256              
257 14         144 my $watch = $self->{iowatches}->{$fd};
258              
259 14         44 my $alarmed = \$self->{alarmed};
260              
261 14   100     217 my $curmask = $self->{masks}->{$fd} || 0;
262             my $cb = $self->{callbacks}->{$fd} ||= sub {
263 14     14   59 my ( $events ) = @_;
264              
265 14         22 if( WATCHDOG_ENABLE and !$$alarmed ) {
266             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
267             $$alarmed = 1;
268             }
269              
270 14 100 100     80 if( $events->{in} or $events->{hup} or $events->{err} ) {
      100        
271 11 100       85 $watch->[1]->() if $watch->[1];
272             }
273              
274 14 100 100     223 if( $events->{out} or $events->{hup} or $events->{err} ) {
      100        
275 9 100       27 $watch->[2]->() if $watch->[2];
276             }
277              
278 14 100 100     95 if( $events->{hup} or $events->{err} ) {
279 6 100       54 $watch->[3]->() if $watch->[3];
280             }
281 14   100     208 };
282              
283 14         40 my $mask = $curmask;
284 14 100       53 $params{on_read_ready} and $mask |= WATCH_READ;
285 14 100       108 $params{on_write_ready} and $mask |= WATCH_WRITE;
286 14 100       126 $params{on_hangup} and $mask |= WATCH_HUP;
287              
288 14         34 my @bits;
289 14 100       45 push @bits, 'in' if $mask & WATCH_READ;
290 14 100       97 push @bits, 'out' if $mask & WATCH_WRITE;
291 14 100       40 push @bits, 'hup' if $mask & WATCH_HUP;
292              
293 14         37 my $fakeevents = $self->{fakeevents};
294              
295 14 100       43 if( !$curmask ) {
    50          
296 13 50       48 defined $self->{refaddr_for_fileno}->{$fd} and
297             croak "Epoll has already seen this filehandle; cannot add it a second time";
298 13         126 $self->{refaddr_for_fileno}->{$fd} = refaddr $handle;
299              
300 13 100       312 if( defined $epoll->add( $handle, \@bits, $cb ) ) {
    50          
301             # All OK
302             }
303             elsif( $! == EPERM ) {
304             # The filehandle isn't epoll'able. This means kernel thinks it should
305             # always be ready.
306 1         4 $fakeevents->{$fd} = $mask;
307             }
308             else {
309 0         0 croak "Cannot EPOLL_CTL_ADD($fd,$mask) - $!";
310             }
311              
312 13         111 $self->{masks}->{$fd} = $mask;
313             }
314             elsif( $mask != $curmask ) {
315 1 50       7 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
316             croak "Epoll cannot cope with fd $fd changing handle under it";
317              
318 1 50       4 if( exists $fakeevents->{$fd} ) {
319 0         0 $fakeevents->{$fd} = $mask;
320             }
321             else {
322 1 50       18 defined $epoll->modify( $handle, \@bits, $cb )
323             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
324             }
325              
326 1         7 $self->{masks}->{$fd} = $mask;
327             }
328             }
329              
330             sub unwatch_io
331             {
332 11     11 1 5686 my $self = shift;
333 11         40 my %params = @_;
334              
335 11 50       43 $self->post_fork if $self->{pid} != $$;
336              
337 11         60 $self->__unwatch_io( %params );
338              
339 11         392 my $epoll = $self->{epoll};
340              
341 11         23 my $handle = $params{handle};
342 11         25 my $fd = $handle->fileno;
343              
344 11 50       71 my $curmask = $self->{masks}->{$fd} or return;
345 11 50       31 my $cb = $self->{callbacks}->{$fd} or return;
346              
347 11         18 my $mask = $curmask;
348 11 100       31 $params{on_read_ready} and $mask &= ~WATCH_READ;
349 11 100       29 $params{on_write_ready} and $mask &= ~WATCH_WRITE;
350 11 100       25 $params{on_hangup} and $mask &= ~WATCH_HUP;
351              
352 11         22 my $fakeevents = $self->{fakeevents};
353              
354 11 50       42 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
355             croak "Epoll cannot cope with fd $fd changing handle under it";
356              
357 11 100       28 if( $mask ) {
358 1 50       5 if( exists $fakeevents->{$fd} ) {
359 0         0 $fakeevents->{$fd} = $mask;
360             }
361             else {
362 1         3 my @bits;
363 1 50       5 push @bits, 'in' if $mask & WATCH_READ;
364 1 50       5 push @bits, 'out' if $mask & WATCH_WRITE;
365 1 50       4 push @bits, 'hup' if $mask & WATCH_HUP;
366              
367 1 50       21 defined $epoll->modify( $handle, \@bits, $cb )
368             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
369             }
370              
371 1         7 $self->{masks}->{$fd} = $mask;
372             }
373             else {
374 10 100       23 if( exists $fakeevents->{$fd} ) {
375 1         2 delete $fakeevents->{$fd};
376             }
377             else {
378 9 50       136 defined $epoll->delete( $handle )
379             or croak "Cannot EPOLL_CTL_DEL($fd) - $!";
380             }
381              
382 10         32 delete $self->{masks}->{$fd};
383 10         20 delete $self->{callbacks}->{$fd};
384              
385 10         241 delete $self->{refaddr_for_fileno}->{$fd};
386             }
387             }
388              
389             sub watch_signal
390             {
391 8     8 1 15440 my $self = shift;
392 8         79 my ( $signal, $code ) = @_;
393              
394 8 100       334 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
395              
396             # We cannot simply set $SIG{$signal} = $code here, because of perl bug
397             # http://rt.perl.org/rt3/Ticket/Display.html?id=82040
398             # Instead, we'll store a tiny piece of code that just sets a flag, and
399             # check the flags on return from the epoll_pwait call.
400              
401 7         152 $self->{signals}{$signal} = SignalWatch( $code, 0, $SIG{$signal} );
402 7         489 my $pending = \$self->{signals}{$signal}->pending;
403              
404 7         276 my $signum = $self->signame2num( $signal );
405 7         3782 sigprocmask( SIG_BLOCK, POSIX::SigSet->new( $signum ) );
406              
407             # Note this is an unsafe signal handler, and as such it should do as little
408             # as possible.
409 7     16   353 my $sigaction = POSIX::SigAction->new( sub { $$pending = 1 } );
  16         283  
410 7 50       533 sigaction( $signum, $sigaction ) or croak "Unable to sigaction - $!";
411             }
412              
413             sub unwatch_signal
414             {
415 7     7 1 1939 my $self = shift;
416 7         67 my ( $signal ) = @_;
417              
418 7 50       140 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
419              
420             # When we saved the original value, we might have got an undef. But %SIG
421             # doesn't like having undef assigned back in, so we need to translate
422 7   100     451 $SIG{$signal} = ( $self->{signals}{$signal} && $self->{signals}{$signal}->orig ) || 'DEFAULT';
423              
424 7         985 delete $self->{signals}{$signal};
425            
426 7         138 my $signum = $self->signame2num( $signal );
427              
428 7         1487 sigprocmask( SIG_UNBLOCK, POSIX::SigSet->new( $signum ) );
429             }
430              
431             sub post_fork
432             {
433 4     4 1 63 my $self = shift;
434              
435 4         9.41216398963475e-316 $self->{epoll} = Linux::Epoll->new;
436 4         32 $self->{pid} = $$;
437              
438 4 50       89 my $watches = $self->{iowatches} or return;
439              
440 4         64 foreach my $watch ( values %$watches ) {
441 0           my ( $handle, $on_read_ready, $on_write_ready, $on_hangup ) = @$watch;
442 0           $self->watch_io(
443             handle => $handle,
444             on_read_ready => $on_read_ready,
445             on_write_ready => $on_write_ready,
446             on_hangup => $on_hangup,
447             );
448             }
449             }
450              
451             =head1 SEE ALSO
452              
453             =over 4
454              
455             =item *
456              
457             L<Linux::Epoll> - O(1) multiplexing for Linux
458              
459             =item *
460              
461             L<IO::Async::Loop::Poll> - use IO::Async with poll(2)
462              
463             =back
464              
465             =head1 AUTHOR
466              
467             Paul Evans <leonerd@leonerd.org.uk>
468              
469             =cut
470              
471             0x55AA;