File Coverage

blib/lib/Future/IO/ImplBase.pm
Criterion Covered Total %
statement 135 153 88.2
branch 34 50 68.0
condition 24 47 51.0
subroutine 33 36 91.6
pod 12 12 100.0
total 238 298 79.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2026 -- leonerd@leonerd.org.uk
5              
6             package Future::IO::ImplBase 0.23;
7              
8 22     22   177273 use v5.14;
  22         79  
9 22     22   148 use warnings;
  22         54  
  22         1386  
10              
11 22     22   516 use Future::IO qw( POLLIN POLLOUT POLLPRI );
  22         34  
  22         1500  
12              
13 22     22   4600 use Errno qw( EAGAIN EWOULDBLOCK EINPROGRESS );
  22         14143  
  22         2058  
14 22     22   6789 use Socket qw( SOL_SOCKET SO_ERROR );
  22         50671  
  22         3046  
15 22     22   9042 use Struct::Dumb qw( readonly_struct );
  22         167361  
  22         136  
16 22     22   17701 use Time::HiRes qw( time );
  22         40  
  22         180  
17              
18             # connect() yields EWOULDBLOCK on MSWin32
19 22     22   1754 use constant CONNECT_EWOULDBLOCK => ( $^O eq "MSWin32" );
  22         46  
  22         1461  
20              
21 22     22   154 use constant HAVE_MULTIPLE_FILEHANDLES => 1;
  22         56  
  22         1378  
22              
23             =head1 NAME
24              
25             C - base class for C implementations
26              
27             =head1 DESCRIPTION
28              
29             This package provides a few utility methods that may help writing actual
30             L implementation classes. It is entirely optional; implementations
31             are not required to use it.
32              
33             =cut
34              
35             =head1 CLASS METHODS
36              
37             =cut
38              
39             =head2 APPLY
40              
41             __PACKAGE__->APPLY;
42              
43             Attempts to set the value of the C<$Future::IO::IMPL> variable to the name of
44             the calling package.
45              
46             =cut
47              
48             sub APPLY
49             {
50 5     5 1 868395 my $pkg = shift;
51              
52 22     22   108 no warnings 'once';
  22         48  
  22         38751  
53 5 50 33     45 ( $Future::IO::IMPL //= $pkg ) eq $pkg or
54             warn "Unable to set Future::IO implementation to $pkg".
55             " as it is already $Future::IO::IMPL\n";
56             }
57              
58             =head1 DEFAULT METHODS
59              
60             These methods are provided based on lower-level functionallity that the
61             implementing class should provide.
62              
63             =cut
64              
65             =head2 accept
66              
67             Implemented by wrapping C, as L uses.
68              
69             =cut
70              
71             sub accept
72             {
73 1     1 1 2 my $self = shift;
74 1         2 my ( $fh ) = @_;
75              
76             return $self->poll( $fh, POLLIN )->then( sub {
77 1     1   87 my $accepted = $fh->accept;
78 1 50       157 if( $accepted ) {
79 1         9 return Future->done( $accepted );
80             }
81             else {
82 0         0 return Future->fail( "accept: $!\n", accept => $fh, $! );
83             }
84 1         6 } );
85             }
86              
87             =head2 alarm
88              
89             Implemented by wrapping C.
90              
91             =cut
92              
93             sub alarm
94             {
95 0     0 1 0 my $self = shift;
96 0         0 my ( $time ) = @_;
97              
98 0         0 return $self->sleep( $time - time() );
99             }
100              
101             =head2 connect
102              
103             Implemented by wrapping C, as L uses.
104              
105             =cut
106              
107             sub connect
108             {
109 2     2 1 6 my $self = shift;
110 2         6 my ( $fh, $name ) = @_;
111              
112             # We can't use IO::Socket->connect here because
113             # https://github.com/Perl/perl5/issues/19326
114              
115 2         215 my $ret = CORE::connect( $fh, $name );
116 2         23 my $errno = $!;
117              
118 2 50       10 return Future->done if $ret;
119              
120 2 50 50     14 unless( $errno == EINPROGRESS or
121             ( CONNECT_EWOULDBLOCK and $errno == EWOULDBLOCK ) ) {
122 0         0 return Future->fail( "connect: $errno\n", connect => $fh, $errno );
123             }
124              
125             # not synchronous result
126              
127             return $self->poll( $fh, POLLOUT|POLLPRI )->then( sub {
128 2     2   297 $errno = $fh->getsockopt( SOL_SOCKET, SO_ERROR );
129              
130 2 100       58 if( $errno ) {
131 1         5 $! = $errno;
132 1         27 return Future->fail( "connect: $!\n", connect => $fh, $! );
133             }
134              
135 1         6 return Future->done;
136 2         19 } );
137             }
138              
139             =head2 recv
140              
141             =head2 recvfrom
142              
143             Implemented by wrapping C, as L uses.
144              
145             =cut
146              
147             sub _recv1
148             {
149 8     8   17 my $self = shift;
150 8         20 my ( $f, $with_fromaddr, $fh, $length, $flags ) = @_;
151              
152             my $waitf = $self->poll( $fh, POLLIN )->on_done( sub {
153 6     6   547 my $fromaddr = $fh->recv( my $buf, $length, $flags );
154 6 100 66     220 if( defined $fromaddr and length $buf ) {
    50 0        
    0          
155 4 100       49 $f->done( $buf, $with_fromaddr ? ( $fromaddr ) : () );
156             }
157             elsif( defined $fromaddr ) {
158 2         9 $f->done(); # fromaddr is not interesting at EOF
159             }
160             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
161             # Try again
162 0         0 $self->_recv1( $f, $with_fromaddr, $fh, $length, $flags );
163             }
164             else {
165 0 0       0 my $name = $with_fromaddr ? "recvfrom" : "recv";
166 0         0 $f->fail( "$name: $!\n", $name => $fh, $! );
167             }
168 8         33 });
169              
170 8   33     288 $f //= $waitf->new;
171              
172 8         114 $f->on_cancel( $waitf );
173              
174 8         248 return $f;
175             }
176              
177             sub recv
178             {
179 4     4 1 11 my $self = shift;
180 4         20 return $self->_recv1( undef, 0, @_ );
181             }
182              
183             sub recvfrom
184             {
185 4     4 1 9 my $self = shift;
186 4         13 return $self->_recv1( undef, 1, @_ );
187             }
188              
189             =head2 send
190              
191             Implemented by wrapping C, as L uses.
192              
193             =cut
194              
195             sub _send1
196             {
197 5     5   68 my $self = shift;
198 5         12 my ( $f, $fh, $data, $flags, $to ) = @_;
199              
200             my $waitf = $self->poll( $fh, POLLOUT )->on_done( sub {
201 4     4   349 my $len;
202             # IO::Socket->send itself might die
203 4 50       7 my $e = eval { $len = $fh->send( $data, $flags, $to ); 1 } ? undef : $@;
  4         19  
  4         206  
204              
205 4 50 33     31 if( defined $e ) {
    100          
    50          
206 0         0 $f->fail( "send: $e\n", send => $fh, $! );
207             }
208             elsif( defined $len ) {
209 3         9 $f->done( $len );
210             }
211             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
212             # Try again
213 0         0 $self->_send1( $f, $fh, $data, $flags, $to );
214             }
215             else {
216 1         24 $f->fail( "send: $!\n", send => $fh, $! );
217             }
218 5         19 } );
219              
220 5   33     144 $f //= $waitf->new;
221              
222 5         44 $f->on_cancel( $waitf );
223              
224 5         129 return $f;
225             }
226              
227             sub send
228             {
229 5     5 1 9 my $self = shift;
230 5         19 return $self->_send1( undef, @_ );
231             }
232              
233             =head2 sysread
234              
235             Requires a lower-level method
236              
237             $f = $class->poll( $fh, POLLIN );
238              
239             which should return a Future that completes when the given filehandle may be
240             ready for reading.
241              
242             =cut
243              
244             sub _sysread1
245             {
246 134     134   305 my $self = shift;
247 134         315 my ( $f, $fh, $length ) = @_;
248              
249             my $waitf = $self->poll( $fh, POLLIN )->on_done( sub {
250 131     131   6352483 my $ret = $fh->sysread( my $buf, $length );
251 131 100 33     108235 if( $ret ) {
    100          
    50          
252 20         109 $f->done( $buf );
253             }
254             elsif( defined $ret ) {
255             # EOF
256 11         42 $f->done();
257             }
258             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
259             # Try again
260 100         315 $self->_sysread1( $f, $fh, $length );
261             }
262             else {
263 0         0 $f->fail( "sysread: $!\n", sysread => $fh, $! );
264             }
265 134         620 });
266              
267 134   66     8996 $f //= $waitf->new;
268              
269 134         617 $f->on_cancel( $waitf );
270              
271 134         3069 return $f;
272             }
273              
274             sub sysread
275             {
276 34     34 1 139 my $self = shift;
277 34         371 return $self->_sysread1( undef, @_ );
278             }
279              
280             =head2 syswrite
281              
282             Requires a lower-level method
283              
284             $f = $class->poll( $fh, POLLOUT );
285              
286             which should return a Future that completes when the given filehandle may be
287             ready for writing.
288              
289             =cut
290              
291             sub _syswrite1
292             {
293 15     15   65 my $self = shift;
294 15         84 my ( $f, $fh, $data ) = @_;
295              
296             my $waitf = $self->poll( $fh, POLLOUT )->on_done( sub {
297 13     13   2206 my $len = $fh->syswrite( $data );
298 13 100 33     403 if( defined $len ) {
    50          
299 11         64 $f->done( $len );
300             }
301             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
302             # Try again
303 0         0 $self->_syswrite1( $f, $fh, $data );
304             }
305             else {
306 2         29 $f->fail( "syswrite: $!\n", syswrite => $fh, $! );
307             }
308 15         118 });
309              
310 15   33     1968 $f //= $waitf->new;
311              
312 15         241 $f->on_cancel( $waitf );
313              
314 15         678 return $f;
315             }
316              
317             sub syswrite
318             {
319 15     15 1 353 my $self = shift;
320 15         192 return $self->_syswrite1( undef, @_ );
321             }
322              
323             =head1 OPTIONAL METHODS
324              
325             The following methods may be directly provided by an implementation, or they
326             may be emulated by this base class by other means. It is usually better to
327             provide these methods in an implementation if it can do so more efficiently or
328             better in those modules; these emulations are provided as a worst-case
329             fallback and may not be ideal.
330              
331             These methods will require a helper method provided by the implementation
332             class to construct new C instances of its chosen type.
333              
334             $f = $class->_new_future;
335              
336             =cut
337              
338             =head2 sleep
339              
340             I
341              
342             Emulated by maintaining a queue of C and C timers. Two helper
343             methods are provided for the implementation to manage this queue.
344              
345             $timeout = $class->_timeout;
346              
347             $class->_manage_timers;
348              
349             The C<_timeout> method returns a value in seconds to the delay until when the
350             next timer will expire. This may be C if there are none waiting. The
351             C<_manage_timers> method may be called at any time to invoke any of the timers
352             that have now expired.
353              
354             =cut
355              
356             readonly_struct Alarm => [qw( time f )];
357             my @alarms;
358             my $cancelled_alarms;
359              
360             sub _timeout
361             {
362 4     4   47 shift;
363              
364 4         6 my $timeout;
365              
366 4   66     89 ( shift @alarms, $cancelled_alarms-- )
367             while @alarms and $alarms[0]->f->is_cancelled;
368              
369 4 50       120 if( @alarms ) {
370             # These are sorted by time order, so head is soonest
371 4         54 $timeout = $alarms[0]->time - time();
372 4 50       27 $timeout = 0 if $timeout < 0;
373             }
374              
375 4         18 return $timeout;
376             }
377              
378             sub _manage_timers
379             {
380 6     6   5137 shift;
381 6         15 my $now = time();
382              
383 6         53 while( @alarms ) {
384 19 100       551 last if $alarms[0]->time > $now;
385              
386 15         275 my $f = ( shift @alarms )->f;
387              
388 15 100       74 $cancelled_alarms--, next if $f->is_cancelled;
389 9         50 $f->done;
390             }
391             }
392              
393             sub _compact_alarms
394             {
395 4     4   8 @alarms = grep { !$_->f->is_cancelled } @alarms;
  186         7064  
396 4         54 $cancelled_alarms = 0;
397             }
398              
399             sub sleep
400             {
401 112     112 1 148338 my $class = shift;
402 112         120 my ( $secs ) = @_;
403              
404 112         168 my $time = time() + $secs;
405              
406 112         292 my $f = $class->_new_future;
407 112         677 my $alarm = Alarm( $time, $f );
408              
409 112 100 100     7215 if( !@alarms or $time >= $alarms[-1]->time ) {
410             # Quick path, just push it on the end
411 109         477 push @alarms, $alarm;
412             }
413             else {
414             # Need to find the right point to splice() it into. It's more likely
415             # that the new alarm goes at the end of the queue so start our search
416             # there.
417             # This isn't a full binary search but a good compromise between fast
418             # performance and simple to write.
419 3         13 my $idx = $#alarms;
420 3   66     36 $idx = int( $idx/2 ) while $idx > 0 and $alarms[$idx]->time > $time;
421 3   66     79 $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
422              
423 3         125 splice @alarms, $idx, 0, $alarm;
424             }
425              
426             $f->on_cancel( sub {
427 103     103   9399 $cancelled_alarms++;
428              
429 103 100 100     235 _compact_alarms
430             if $cancelled_alarms >= 5 and $cancelled_alarms > @alarms/2;
431 112         419 } );
432              
433 112         1398 return $f;
434             }
435              
436             =head1 LEGACY METHODS
437              
438             The following methods are not considered part of the official C
439             implementation API, and should not be relied upon when writing new code.
440             However, existing code may still exist that uses them so for now they are
441             provided as wrappers.
442              
443             Later versions of this module may start printing deprecation warnings on these
444             methods, so existing code ought to be updated to use the newer forms now.
445              
446             =cut
447              
448             =head2 ready_for_read
449              
450             =head2 ready_for_write
451              
452             $f = $class->ready_for_read( $fh );
453              
454             $f = $class->ready_for_write( $fh );
455              
456             Implemented by wrapping C by passing in the C or C
457             flags respectively.
458              
459             =cut
460              
461             sub ready_for_read
462             {
463 0     0 1   my $self = shift;
464 0           my ( $fh ) = @_;
465              
466 0           return $self->poll( $fh, POLLIN );
467             # TODO: should we check the result before yielding?
468             }
469              
470             sub ready_for_write
471             {
472 0     0 1   my $self = shift;
473 0           my ( $fh ) = @_;
474              
475 0           return $self->poll( $fh, POLLOUT );
476             # TODO: should we check the result before yielding?
477             }
478              
479             =head1 AUTHOR
480              
481             Paul Evans
482              
483             =cut
484              
485             0x55AA;