File Coverage

blib/lib/Future/IO/ImplBase.pm
Criterion Covered Total %
statement 103 144 71.5
branch 24 42 57.1
condition 12 41 29.2
subroutine 28 35 80.0
pod 12 12 100.0
total 179 274 65.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2026 -- leonerd@leonerd.org.uk
5              
6             package Future::IO::ImplBase 0.22;
7              
8 21     21   344 use v5.14;
  21         110  
9 21     21   144 use warnings;
  21         79  
  21         1464  
10              
11 21     21   117 use Future::IO qw( POLLIN POLLOUT POLLPRI );
  21         33  
  21         1842  
12              
13 21     21   5544 use Errno qw( EAGAIN EWOULDBLOCK EINPROGRESS );
  21         18030  
  21         2304  
14 21     21   8353 use Socket qw( SOL_SOCKET SO_ERROR );
  21         61902  
  21         3605  
15 21     21   11041 use Struct::Dumb qw( readonly_struct );
  21         198470  
  21         236  
16 21     21   1941 use Time::HiRes qw( time );
  21         44  
  21         218  
17              
18             # connect() yields EWOULDBLOCK on MSWin32
19 21     21   2009 use constant CONNECT_EWOULDBLOCK => ( $^O eq "MSWin32" );
  21         54  
  21         1693  
20              
21 21     21   150 use constant HAVE_MULTIPLE_FILEHANDLES => 1;
  21         56  
  21         1698  
22              
23             =head1 NAME
24              
25             C - base class for C implementations
26              
27             =head1 DESCRIPTION
28              
29             This package provides a few utility methods that may help writing actual
30             L implementation classes. It is entirely optional; implementations
31             are not required to use it.
32              
33             =cut
34              
35             =head1 CLASS METHODS
36              
37             =cut
38              
39             =head2 APPLY
40              
41             __PACKAGE__->APPLY;
42              
43             Attempts to set the value of the C<$Future::IO::IMPL> variable to the name of
44             the calling package.
45              
46             =cut
47              
48             sub APPLY
49             {
50 5     5 1 1562790 my $pkg = shift;
51              
52 21     21   137 no warnings 'once';
  21         40  
  21         43331  
53 5 50 33     70 ( $Future::IO::IMPL //= $pkg ) eq $pkg or
54             warn "Unable to set Future::IO implementation to $pkg".
55             " as it is already $Future::IO::IMPL\n";
56             }
57              
58             =head1 DEFAULT METHODS
59              
60             These methods are provided based on lower-level functionallity that the
61             implementing class should provide.
62              
63             =cut
64              
65             =head2 accept
66              
67             Implemented by wrapping C, as L uses.
68              
69             =cut
70              
71             sub accept
72             {
73 1     1 1 2 my $self = shift;
74 1         3 my ( $fh ) = @_;
75              
76             return $self->poll( $fh, POLLIN )->then( sub {
77 1     1   130 my $accepted = $fh->accept;
78 1 50       151 if( $accepted ) {
79 1         24 return Future->done( $accepted );
80             }
81             else {
82 0         0 return Future->fail( "accept: $!\n", accept => $fh, $! );
83             }
84 1         8 } );
85             }
86              
87             =head2 alarm
88              
89             Implemented by wrapping C.
90              
91             =cut
92              
93             sub alarm
94             {
95 0     0 1 0 my $self = shift;
96 0         0 my ( $time ) = @_;
97              
98 0         0 return $self->sleep( $time - time() );
99             }
100              
101             =head2 connect
102              
103             Implemented by wrapping C, as L uses.
104              
105             =cut
106              
107             sub connect
108             {
109 2     2 1 5 my $self = shift;
110 2         4 my ( $fh, $name ) = @_;
111              
112             # We can't use IO::Socket->connect here because
113             # https://github.com/Perl/perl5/issues/19326
114              
115 2         181 my $ret = CORE::connect( $fh, $name );
116 2         18 my $errno = $!;
117              
118 2 50       21 return Future->done if $ret;
119              
120 2 50 50     9 unless( $errno == EINPROGRESS or
121             ( CONNECT_EWOULDBLOCK and $errno == EWOULDBLOCK ) ) {
122 0         0 return Future->fail( "connect: $errno\n", connect => $fh, $errno );
123             }
124              
125             # not synchronous result
126              
127             return $self->poll( $fh, POLLOUT|POLLPRI )->then( sub {
128 2     2   188 $errno = $fh->getsockopt( SOL_SOCKET, SO_ERROR );
129              
130 2 100       41 if( $errno ) {
131 1         4 $! = $errno;
132 1         17 return Future->fail( "connect: $!\n", connect => $fh, $! );
133             }
134              
135 1         8 return Future->done;
136 2         15 } );
137             }
138              
139             =head2 recv
140              
141             =head2 recvfrom
142              
143             Implemented by wrapping C, as L uses.
144              
145             =cut
146              
147             sub _recv1
148             {
149 8     8   11 my $self = shift;
150 8         17 my ( $f, $with_fromaddr, $fh, $length, $flags ) = @_;
151              
152             my $waitf = $self->poll( $fh, POLLIN )->on_done( sub {
153 6     6   398 my $fromaddr = $fh->recv( my $buf, $length, $flags );
154 6 100 66     121 if( defined $fromaddr and length $buf ) {
    50 0        
    0          
155 4 100       12 $f->done( $buf, $with_fromaddr ? ( $fromaddr ) : () );
156             }
157             elsif( defined $fromaddr ) {
158 2         6 $f->done(); # fromaddr is not interesting at EOF
159             }
160             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
161             # Try again
162 0         0 $self->_recv1( $f, $with_fromaddr, $fh, $length, $flags );
163             }
164             else {
165 0 0       0 my $name = $with_fromaddr ? "recvfrom" : "recv";
166 0         0 $f->fail( "$name: $!\n", $name => $fh, $! );
167             }
168 8         19 });
169              
170 8   33     188 $f //= $waitf->new;
171              
172 8         59 $f->on_cancel( $waitf );
173              
174 8         167 return $f;
175             }
176              
177             sub recv
178             {
179 4     4 1 8 my $self = shift;
180 4         12 return $self->_recv1( undef, 0, @_ );
181             }
182              
183             sub recvfrom
184             {
185 4     4 1 6 my $self = shift;
186 4         8 return $self->_recv1( undef, 1, @_ );
187             }
188              
189             =head2 send
190              
191             Implemented by wrapping C, as L uses.
192              
193             =cut
194              
195             sub _send1
196             {
197 5     5   7 my $self = shift;
198 5         13 my ( $f, $fh, $data, $flags, $to ) = @_;
199              
200             my $waitf = $self->poll( $fh, POLLOUT )->on_done( sub {
201 4     4   285 my $len;
202             # IO::Socket->send itself might die
203 4 50       8 my $e = eval { $len = $fh->send( $data, $flags, $to ); 1 } ? undef : $@;
  4         19  
  4         180  
204              
205 4 50 33     22 if( defined $e ) {
    100          
    50          
206 0         0 $f->fail( "send: $e\n", send => $fh, $! );
207             }
208             elsif( defined $len ) {
209 3         28 $f->done( $len );
210             }
211             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
212             # Try again
213 0         0 $self->_send1( $f, $fh, $data, $flags, $to );
214             }
215             else {
216 1         15 $f->fail( "send: $!\n", send => $fh, $! );
217             }
218 5         18 } );
219              
220 5   33     132 $f //= $waitf->new;
221              
222 5         42 $f->on_cancel( $waitf );
223              
224 5         156 return $f;
225             }
226              
227             sub send
228             {
229 5     5 1 11 my $self = shift;
230 5         18 return $self->_send1( undef, @_ );
231             }
232              
233             =head2 sysread
234              
235             Requires a lower-level method
236              
237             $f = $class->poll( $fh, POLLIN );
238              
239             which should return a Future that completes when the given filehandle may be
240             ready for reading.
241              
242             =cut
243              
244             sub _sysread1
245             {
246 134     134   283 my $self = shift;
247 134         379 my ( $f, $fh, $length ) = @_;
248              
249             my $waitf = $self->poll( $fh, POLLIN )->on_done( sub {
250 131     131   6021401 my $ret = $fh->sysread( my $buf, $length );
251 131 100 33     180959 if( $ret ) {
    100          
    50          
252 20         190 $f->done( $buf );
253             }
254             elsif( defined $ret ) {
255             # EOF
256 11         51 $f->done();
257             }
258             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
259             # Try again
260 100         488 $self->_sysread1( $f, $fh, $length );
261             }
262             else {
263 0         0 $f->fail( "sysread: $!\n", sysread => $fh, $! );
264             }
265 134         714 });
266              
267 134   66     7080 $f //= $waitf->new;
268              
269 134         1033 $f->on_cancel( $waitf );
270              
271 134         4105 return $f;
272             }
273              
274             sub sysread
275             {
276 34     34 1 96 my $self = shift;
277 34         252 return $self->_sysread1( undef, @_ );
278             }
279              
280             =head2 syswrite
281              
282             Requires a lower-level method
283              
284             $f = $class->poll( $fh, POLLOUT );
285              
286             which should return a Future that completes when the given filehandle may be
287             ready for writing.
288              
289             =cut
290              
291             sub _syswrite1
292             {
293 15     15   22 my $self = shift;
294 15         57 my ( $f, $fh, $data ) = @_;
295              
296             my $waitf = $self->poll( $fh, POLLOUT )->on_done( sub {
297 13     13   2069 my $len = $fh->syswrite( $data );
298 13 100 33     227 if( defined $len ) {
    50          
299 11         44 $f->done( $len );
300             }
301             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
302             # Try again
303 0         0 $self->_syswrite1( $f, $fh, $data );
304             }
305             else {
306 2         20 $f->fail( "syswrite: $!\n", syswrite => $fh, $! );
307             }
308 15         93 });
309              
310 15   33     1136 $f //= $waitf->new;
311              
312 15         173 $f->on_cancel( $waitf );
313              
314 15         404 return $f;
315             }
316              
317             sub syswrite
318             {
319 15     15 1 27 my $self = shift;
320 15         423 return $self->_syswrite1( undef, @_ );
321             }
322              
323             =head1 OPTIONAL METHODS
324              
325             The following methods may be directly provided by an implementation, or they
326             may be emulated by this base class by other means. It is usually better to
327             provide these methods in an implementation if it can do so more efficiently or
328             better in those modules; these emulations are provided as a worst-case
329             fallback and may not be ideal.
330              
331             These methods will require a helper method provided by the implementation
332             class to construct new C instances of its chosen type.
333              
334             $f = $class->_new_future;
335              
336             =cut
337              
338             =head2 sleep
339              
340             I
341              
342             Emulated by maintaining a queue of C and C timers. Two helper
343             methods are provided for the implementation to manage this queue.
344              
345             $timeout = $class->_timeout;
346              
347             $class->_manage_timers;
348              
349             The C<_timeout> method returns a value in seconds to the delay until when the
350             next timer will expire. This may be C if there are none waiting. The
351             C<_manage_timers> method may be called at any time to invoke any of the timers
352             that have now expired.
353              
354             =cut
355              
356             readonly_struct Alarm => [qw( time f )];
357             my @alarms;
358              
359             sub _timeout
360             {
361 0     0     shift;
362              
363 0           my $timeout;
364 0 0         if( @alarms ) {
365             # These are sorted by time order, so head is soonest
366 0           $timeout = $alarms[0]->time - time();
367 0 0         $timeout = 0 if $timeout < 0;
368             }
369              
370 0           return $timeout;
371             }
372              
373             sub _manage_timers
374             {
375 0     0     shift;
376 0           my $now = time();
377              
378 0   0       while( @alarms and $alarms[0]->time <= $now ) {
379 0           ( shift @alarms )->f->done;
380             }
381             }
382              
383             sub sleep
384             {
385 0     0 1   my $class = shift;
386 0           my ( $secs ) = @_;
387              
388 0           my $time = time() + $secs;
389              
390 0           my $f = $class->_new_future;
391              
392             # TODO: Binary search
393 0           my $idx = 0;
394 0   0       $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
395              
396 0           splice @alarms, $idx, 0, Alarm( $time, $f );
397              
398             $f->on_cancel( sub {
399 0     0     my $self = shift;
400 0           my $idx = 0;
401 0   0       $idx++ while $idx < @alarms and $alarms[$idx]->f != $self;
402              
403 0           splice @alarms, $idx, 1, ();
404 0           } );
405              
406 0           return $f;
407             }
408              
409             =head1 LEGACY METHODS
410              
411             The following methods are not considered part of the official C
412             implementation API, and should not be relied upon when writing new code.
413             However, existing code may still exist that uses them so for now they are
414             provided as wrappers.
415              
416             Later versions of this module may start printing deprecation warnings on these
417             methods, so existing code ought to be updated to use the newer forms now.
418              
419             =cut
420              
421             =head2 ready_for_read
422              
423             =head2 ready_for_write
424              
425             $f = $class->ready_for_read( $fh );
426              
427             $f = $class->ready_for_write( $fh );
428              
429             Implemented by wrapping C by passing in the C or C
430             flags respectively.
431              
432             =cut
433              
434             sub ready_for_read
435             {
436 0     0 1   my $self = shift;
437 0           my ( $fh ) = @_;
438              
439 0           return $self->poll( $fh, POLLIN );
440             # TODO: should we check the result before yielding?
441             }
442              
443             sub ready_for_write
444             {
445 0     0 1   my $self = shift;
446 0           my ( $fh ) = @_;
447              
448 0           return $self->poll( $fh, POLLOUT );
449             # TODO: should we check the result before yielding?
450             }
451              
452             =head1 AUTHOR
453              
454             Paul Evans
455              
456             =cut
457              
458             0x55AA;