File Coverage

blib/lib/IO/Async/Loop/Epoll.pm
Criterion Covered Total %
statement 195 200 97.5
branch 79 114 69.3
condition 27 30 90.0
subroutine 24 24 100.0
pod 7 8 87.5
total 332 376 88.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Loop::Epoll 0.23;
7              
8 12     12   5680270 use v5.14;
  12         83  
9 12     12   79 use warnings;
  12         28  
  12         865  
10              
11 12     12   72 use constant API_VERSION => '0.76';
  12         20  
  12         1188  
12              
13             # Only Linux is known always to be able to report EOF conditions on
14             # filehandles using EPOLLHUP
15             # This is probably redundant as epoll is probably Linux-only also, but it
16             # doesn't harm anything to test specially.
17 12     12   75 use constant _CAN_ON_HANGUP => ( $^O eq "linux" );
  12         62  
  12         774  
18              
19 12     12   96 use base qw( IO::Async::Loop );
  12         21  
  12         11475  
20              
21 12     12   301785 use Carp;
  12         33  
  12         1122  
22              
23 12     12   8167 use Linux::Epoll 0.005;
  12         61910  
  12         1058  
24              
25 12     12   127 use POSIX qw( EINTR EPERM SIG_BLOCK SIG_UNBLOCK sigprocmask sigaction ceil );
  12         69  
  12         215  
26 12     12   1844 use Scalar::Util qw( refaddr );
  12         33  
  12         732  
27              
28 12     12   76 use constant _CAN_WATCHDOG => 1;
  12         28  
  12         1143  
29 12     12   82 use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
  12         26  
  12         928  
30              
31 12     12   7716 use Struct::Dumb;
  12         111197  
  12         96  
32              
33             struct SignalWatch => [qw( code pending orig )];
34              
35             =head1 NAME
36              
37             C - use C with C on Linux
38              
39             =head1 SYNOPSIS
40              
41             =for highlighter language=perl
42              
43             use IO::Async::Loop::Epoll;
44              
45             use IO::Async::Stream;
46             use IO::Async::Signal;
47              
48             my $loop = IO::Async::Loop::Epoll->new;
49              
50             $loop->add( IO::Async::Stream->new(
51             read_handle => \*STDIN,
52             on_read => sub {
53             my ( $self, $buffref ) = @_;
54             while( $$buffref =~ s/^(.*)\r?\n// ) {
55             print "You said: $1\n";
56             }
57             },
58             ) );
59              
60             $loop->add( IO::Async::Signal->new(
61             name => 'INT',
62             on_receipt => sub {
63             print "SIGINT, will now quit\n";
64             $loop->stop;
65             },
66             ) );
67              
68             $loop->run;
69              
70             =head1 DESCRIPTION
71              
72             This subclass of L uses C on Linux to perform
73             read-ready and write-ready tests so that the OZ<>(1) high-performance
74             multiplexing of Linux's C syscall can be used.
75              
76             The C Linux subsystem uses a persistent registration system, meaning
77             that better performance can be achieved in programs using a large number of
78             filehandles. Each C syscall only has an overhead proportional
79             to the number of ready filehandles, rather than the total number being
80             watched. For more detail, see the C manpage.
81              
82             This class uses the C system call, which atomically switches
83             the process's signal mask, performs a wait exactly as C would,
84             then switches it back. This allows a process to block the signals it cares
85             about, but switch in an empty signal mask during the poll, allowing it to
86             handle file IO and signals concurrently.
87              
88             =cut
89              
90             =head1 CONSTRUCTOR
91              
92             =cut
93              
94             =head2 new
95              
96             $loop = IO::Async::Loop::Epoll->new();
97              
98             This function returns a new instance of a C object.
99              
100             =cut
101              
102             sub new
103             {
104 11     11 1 1088722 my $class = shift;
105 11         63 my ( %args ) = @_;
106              
107 11         722 my $epoll = Linux::Epoll->new;
108 11 50       145 defined $epoll or croak "Cannot create epoll handle - $!";
109              
110 11         161 my $self = $class->SUPER::__new( %args );
111              
112 11         9917 $self->{epoll} = $epoll;
113 11         158 $self->{sigmask} = POSIX::SigSet->new();
114 11         30 $self->{maxevents} = 8;
115              
116 11         31 $self->{fakeevents} = {};
117              
118 11         31 $self->{signals} = {}; # {$name} => SignalWatch
119 11         83 $self->{masks} = {};
120              
121 11         140 $self->{pid} = $$;
122              
123             # epoll gets very upset if applications close() filehandles without telling
124             # it, and then try to add that mask a second time. We can attempt to detect
125             # this by storing the mapping from fileno to refaddr($fh)
126 11         50 $self->{refaddr_for_fileno} = {};
127              
128 11         58 return $self;
129             }
130              
131             # Some bits to keep track of in {masks}
132             use constant {
133 12         35759 WATCH_READ => 0x01,
134             WATCH_WRITE => 0x02,
135             WATCH_HUP => 0x04,
136 12     12   3691 };
  12         24  
137              
138             =head1 METHODS
139              
140             As this is a subclass of L, all of its methods are inherited.
141             Expect where noted below, all of the class's methods behave identically to
142             C.
143              
144             =cut
145              
146             sub DESTROY
147             {
148 7     7   33065 my $self = shift;
149              
150 7         89 foreach my $signal ( keys %{ $self->{signals} } ) {
  7         785  
151 2         16 $self->unwatch_signal( $signal );
152             }
153             }
154              
155             sub is_running
156             {
157 1     1 0 60 my $self = shift;
158 1         11 return $self->{running};
159             }
160              
161             =head2 loop_once
162              
163             $count = $loop->loop_once( $timeout );
164              
165             This method calls C, and processes the results of that call.
166             It returns the total number of C callbacks invoked, or
167             C if the underlying C method returned an error. If the
168             C was interrupted by a signal, then 0 is returned instead.
169              
170             =cut
171              
172             sub loop_once
173             {
174 55     55 1 3103754 my $self = shift;
175 55         143 my ( $timeout ) = @_;
176              
177 55 50       533 $self->post_fork if $self->{pid} != $$;
178              
179 55         747 $self->_adjust_timeout( \$timeout );
180              
181 55 100       1383 $timeout = 0 if keys %{ $self->{fakeevents} };
  55         378  
182              
183             # Round up to next milisecond to avoid zero timeouts
184 55 50       494 my $msec = defined $timeout ? ceil( $timeout * 1000 ) : -1;
185              
186 55         475 $self->pre_wait;
187 55         14503668 my $ret = $self->{epoll}->wait( $self->{maxevents}, $msec / 1000, $self->{sigmask} );
188 55         843 $self->post_wait;
189              
190 55 50 66     1628 return undef if !defined $ret and $! != EINTR;
191              
192 55   100     313 my $count = $ret || 0;
193              
194 55         88 if( WATCHDOG_ENABLE and !$self->{alarmed} ) {
195             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
196             $self->{alarmed}++;
197             }
198              
199 55         189 my $iowatches = $self->{iowatches};
200              
201 55         128 my $fakeevents = $self->{fakeevents};
202 55         221 my @fakeevents = map { [ $_ => $fakeevents->{$_} ] } keys %$fakeevents;
  1         5  
203              
204 55         170 foreach my $ev ( @fakeevents ) {
205 1         3 my ( $fd, $bits ) = @$ev;
206              
207 1         3 my $watch = $iowatches->{$fd};
208              
209 1 50       8 if( $bits & WATCH_READ ) {
210 1 50       6 $watch->[1]->() if $watch->[1];
211 1         2 $count++;
212             }
213              
214 1 50       4 if( $bits & WATCH_WRITE ) {
215 1 50       4 $watch->[2]->() if $watch->[2];
216 1         3 $count++;
217             }
218              
219 1 50       4 if( $bits & WATCH_HUP ) {
220 0 0       0 $watch->[3]->() if $watch->[3];
221 0         0 $count++;
222             }
223             }
224              
225 55         213 my $signals = $self->{signals};
226 55         189 foreach my $sigslot ( values %$signals ) {
227 12 100       698 if( $sigslot->pending ) {
228 11         291 $sigslot->pending = 0;
229 11         266 $sigslot->code->();
230 11         647 $count++;
231             }
232             }
233              
234 55         449 $count += $self->_manage_queues;
235              
236             # If we entirely filled the event buffer this time, we may have missed some
237             # Lets get a bigger buffer next time
238 55 50 66     2399 $self->{maxevents} *= 2 if defined $ret and $ret == $self->{maxevents};
239              
240 55         96 alarm( 0 ), undef $self->{alarmed} if WATCHDOG_ENABLE;
241              
242 55         289 return $count;
243             }
244              
245             sub watch_io
246             {
247 19     19 1 52943 my $self = shift;
248 19         273 my %params = @_;
249              
250 19 100       651 $self->post_fork if $self->{pid} != $$;
251              
252 19         62 my $epoll = $self->{epoll};
253              
254 19         538 $self->__watch_io( %params );
255              
256 19         1703 my $handle = $params{handle};
257 19         58 my $fd = $handle->fileno;
258              
259 19         158 my $watch = $self->{iowatches}->{$fd};
260              
261 19         157 my $alarmed = \$self->{alarmed};
262              
263 19   100     312 my $curmask = $self->{masks}->{$fd} || 0;
264             my $cb = $self->{callbacks}->{$fd} ||= sub {
265 19     19   63 my ( $events ) = @_;
266              
267 19         48 if( WATCHDOG_ENABLE and !$$alarmed ) {
268             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
269             $$alarmed = 1;
270             }
271              
272 19 100 100     92 if( $events->{in} or $events->{hup} or $events->{err} ) {
      100        
273 16 100       141 $watch->[1]->() if $watch->[1];
274             }
275              
276 19 100 100     253 if( $events->{out} or $events->{hup} or $events->{err} ) {
      100        
277 13 100       38 $watch->[2]->() if $watch->[2];
278             }
279              
280 19 100 100     148 if( $events->{hup} or $events->{err} ) {
281 6 100       59 $watch->[3]->() if $watch->[3];
282             }
283 19   66     682 };
284              
285 19         84 my $mask = $curmask;
286 19 100       179 $params{on_read_ready} and $mask |= WATCH_READ;
287 19 100       156 $params{on_write_ready} and $mask |= WATCH_WRITE;
288 19 100       146 $params{on_hangup} and $mask |= WATCH_HUP;
289              
290 19         90 my @bits;
291 19 100       162 push @bits, 'in' if $mask & WATCH_READ;
292 19 100       103 push @bits, 'out' if $mask & WATCH_WRITE;
293 19 100       227 push @bits, 'hup' if $mask & WATCH_HUP;
294              
295 19         74 my $fakeevents = $self->{fakeevents};
296              
297 19 100       206 if( !$curmask ) {
    50          
298 16 50       179 defined $self->{refaddr_for_fileno}->{$fd} and
299             croak "Epoll has already seen this filehandle; cannot add it a second time";
300 16         80 $self->{refaddr_for_fileno}->{$fd} = refaddr $handle;
301              
302 16 100       323 if( defined $epoll->add( $handle, \@bits, $cb ) ) {
    50          
303             # All OK
304             }
305             elsif( $! == EPERM ) {
306             # The filehandle isn't epoll'able. This means kernel thinks it should
307             # always be ready.
308 1         3 $fakeevents->{$fd} = $mask;
309             }
310             else {
311 0         0 croak "Cannot EPOLL_CTL_ADD($fd,$mask) - $!";
312             }
313              
314 16         176 $self->{masks}->{$fd} = $mask;
315             }
316             elsif( $mask != $curmask ) {
317 3 50       13 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
318             croak "Epoll cannot cope with fd $fd changing handle under it";
319              
320 3 50       28 if( exists $fakeevents->{$fd} ) {
321 0         0 $fakeevents->{$fd} = $mask;
322             }
323             else {
324 3 50       115 defined $epoll->modify( $handle, \@bits, $cb )
325             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
326             }
327              
328 3         21 $self->{masks}->{$fd} = $mask;
329             }
330             }
331              
332             sub unwatch_io
333             {
334 13     13 1 7775 my $self = shift;
335 13         53 my %params = @_;
336              
337 13 50       87 $self->post_fork if $self->{pid} != $$;
338              
339 13         83 $self->__unwatch_io( %params );
340              
341 13         508 my $epoll = $self->{epoll};
342              
343 13         24 my $handle = $params{handle};
344 13         32 my $fd = $handle->fileno;
345              
346 13 50       108 my $curmask = $self->{masks}->{$fd} or return;
347 13 50       137 my $cb = $self->{callbacks}->{$fd} or return;
348              
349 13         24 my $mask = $curmask;
350 13 100       40 $params{on_read_ready} and $mask &= ~WATCH_READ;
351 13 100       29 $params{on_write_ready} and $mask &= ~WATCH_WRITE;
352 13 100       87 $params{on_hangup} and $mask &= ~WATCH_HUP;
353              
354 13         23 my $fakeevents = $self->{fakeevents};
355              
356 13 50       39 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
357             croak "Epoll cannot cope with fd $fd changing handle under it";
358              
359 13 100       31 if( $mask ) {
360 1 50       7 if( exists $fakeevents->{$fd} ) {
361 0         0 $fakeevents->{$fd} = $mask;
362             }
363             else {
364 1         3 my @bits;
365 1 50       5 push @bits, 'in' if $mask & WATCH_READ;
366 1 50       3 push @bits, 'out' if $mask & WATCH_WRITE;
367 1 50       5 push @bits, 'hup' if $mask & WATCH_HUP;
368              
369 1 50       17 defined $epoll->modify( $handle, \@bits, $cb )
370             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
371             }
372              
373 1         6 $self->{masks}->{$fd} = $mask;
374             }
375             else {
376 12 100       28 if( exists $fakeevents->{$fd} ) {
377 1         3 delete $fakeevents->{$fd};
378             }
379             else {
380 11 50       166 defined $epoll->delete( $handle )
381             or croak "Cannot EPOLL_CTL_DEL($fd) - $!";
382             }
383              
384 12         44 delete $self->{masks}->{$fd};
385 12         21 delete $self->{callbacks}->{$fd};
386              
387 12         362 delete $self->{refaddr_for_fileno}->{$fd};
388             }
389             }
390              
391             sub watch_signal
392             {
393 5     5 1 19668 my $self = shift;
394 5         44 my ( $signal, $code ) = @_;
395              
396 5 100       366 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
397              
398             # We cannot simply set $SIG{$signal} = $code here, because of perl bug
399             # http://rt.perl.org/rt3/Ticket/Display.html?id=82040
400             # Instead, we'll store a tiny piece of code that just sets a flag, and
401             # check the flags on return from the epoll_pwait call.
402              
403 4         72 $self->{signals}{$signal} = SignalWatch( $code, 0, $SIG{$signal} );
404 4         819 my $pending = \$self->{signals}{$signal}->pending;
405              
406 4         101 my $signum = $self->signame2num( $signal );
407 4         2034 sigprocmask( SIG_BLOCK, POSIX::SigSet->new( $signum ) );
408              
409             # Note this is an unsafe signal handler, and as such it should do as little
410             # as possible.
411 4     11   164 my $sigaction = POSIX::SigAction->new( sub { $$pending = 1 } );
  11         156  
412 4 50       248 sigaction( $signum, $sigaction ) or croak "Unable to sigaction - $!";
413             }
414              
415             sub unwatch_signal
416             {
417 4     4 1 2620 my $self = shift;
418 4         17 my ( $signal ) = @_;
419              
420 4 50       27 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
421              
422             # When we saved the original value, we might have got an undef. But %SIG
423             # doesn't like having undef assigned back in, so we need to translate
424 4   100     185 $SIG{$signal} = ( $self->{signals}{$signal} && $self->{signals}{$signal}->orig ) || 'DEFAULT';
425              
426 4         294 delete $self->{signals}{$signal};
427            
428 4         32 my $signum = $self->signame2num( $signal );
429              
430 4         361 sigprocmask( SIG_UNBLOCK, POSIX::SigSet->new( $signum ) );
431             }
432              
433             sub post_fork
434             {
435 6     6 1 3388 my $self = shift;
436              
437 6         3.05791252876794e-309 my $epoll = $self->{epoll} = Linux::Epoll->new;
438 6         121 $self->{pid} = $$;
439              
440 6 50       338 my $watches = $self->{iowatches} or return;
441 6         129 my $fakeevents = $self->{fakeevents};
442              
443 6         174 foreach my $watch ( values %$watches ) {
444 1         3 my ( $handle ) = @$watch;
445 1         3 my $fd = $handle->fileno;
446              
447 1 50       8 next if $fakeevents->{$fd};
448              
449 1         3 my $mask = $self->{masks}->{$fd};
450 1         1 my @bits;
451 1 50       4 push @bits, 'in' if $mask & WATCH_READ;
452 1 50       3 push @bits, 'out' if $mask & WATCH_WRITE;
453 1 50       3 push @bits, 'hup' if $mask & WATCH_HUP;
454              
455 1         2 my $cb = $self->{callbacks}->{$fd};
456 1         10 $epoll->add( $handle, \@bits, $cb );
457             }
458             }
459              
460             =head1 SEE ALSO
461              
462             =over 4
463              
464             =item *
465              
466             L - O(1) multiplexing for Linux
467              
468             =item *
469              
470             L - use IO::Async with poll(2)
471              
472             =back
473              
474             =head1 AUTHOR
475              
476             Paul Evans
477              
478             =cut
479              
480             0x55AA;