File Coverage

blib/lib/Future/IO.pm
Criterion    Covered  Total     %
statement        175    215  81.4
branch            35     74  47.3
condition         40     56  71.4
subroutine        44     49  89.8
pod               18     19  94.7
total            312    413  75.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2026 -- leonerd@leonerd.org.uk
5              
6             package Future::IO 0.22;
7              
8 21     21   4934990 use v5.14;
  21         84  
9 21     21   166 use warnings;
  21         48  
  21         1213  
10              
11 21     21   156 use Carp;
  21         47  
  21         2654  
12              
13             # These need to be visible to sub override_impl
14             my @pollers;
15             my @alarms;
16              
17             our $IMPL;
18              
19             our $MAX_READLEN = 8192;
20             our $MAX_WRITELEN = 8192;
21              
22 21     21   6969 use IO::Poll qw( POLLIN POLLOUT POLLPRI POLLHUP POLLERR POLLNVAL );
  21         45937  
  21         1963  
23              
24 21     21   142 use Exporter 'import';
  21         49  
  21         1075  
25             BEGIN {
26             # This needs to happen at BEGIN time because stupid cyclic reasons
27 21     21   50957 our @EXPORT_OK = qw( POLLIN POLLOUT POLLPRI POLLHUP POLLERR POLLNVAL );
28             }
29              
30             =head1 NAME
31              
32             C<Future::IO> - Future-returning IO methods
33              
34             =head1 SYNOPSIS
35              
36             =for highlighter language=perl
37              
38             use Future::IO;
39             Future::IO->load_best_impl;
40              
41             my $delay = Future::IO->sleep( 5 );
42             # $delay will become done in 5 seconds time
43              
44             my $input = Future::IO->read( \*STDIN, 4096 );
45             # $input will yield some input from the STDIN IO handle
46              
47             =head1 DESCRIPTION
48              
49             This package provides a few basic methods that behave similarly to the
50             same-named core perl functions relating to IO operations, but yield their
51             results asynchronously via L<Future> instances.
52              
53             This is provided primarily as a decoupling mechanism, to allow modules to be
54             written that perform IO in an asynchronous manner to depend directly on this,
55             while allowing asynchronous event systems to provide an implementation of
56             these operations.
57              
58             =head2 Default Implementation
59              
60             If the C method is not invoked, a default implementation of
61             these operations is provided. This implementation allows a single queue of
62             C or C calls on a single filehandle only, combined with C
63             calls. It does not support C.
64              
65             It is provided for the simple cases where modules only need one filehandle
66             (most likely a single network socket or hardware device handle), allowing such
67             modules to work without needing a better event system.
68              
69             If there are both read/write and C futures pending, the implementation
70             will use C to wait for either. This may be problematic on MSWin32,
71             depending on what type of filehandle is involved.
72              
73             If C is not being used then the default implementation will
74             temporarily set filehandles into blocking mode (by switching off the
75             C flag) while performing IO on them.
76              
77             For cases where multiple filehandles are required, or for doing more involved
78             IO operations, a real implementation based on an actual event loop should be
79             loaded. It is recommended to use the L</load_best_impl> method to do this, if
80             there are no other specific requirements. If the program is already using some
81             other event system, such as L or L, it is best to directly load
82             the relevant implementation module in the toplevel program.
83              
84             =head2 Unit Testing
85              
86             The replaceable implementation is also useful for writing unit test scripts.
87             If the implementation is set to an instance of some sort of test fixture or
88             mocking object, a unit test can check that the appropriate IO operations
89             happen as part of the test.
90              
91             A testing module which does this is provided by L.
92              
93             =head2 Cancellation
94              
95             Any C<Future> returned by a C<Future::IO> method should support being
96             cancelled by the L<Future/cancel> method. Doing so should not cause the
97             program to break overall, nor will it upset the specific implementation being
98             used.
99              
100             I, the result of cancelling a future instance that performs some
101             actual IO work is I. It may cause no work to be performed, or it
102             may result in a partial (or even complete but as-yet unreported) IO operation
103             to have already taken place. In particular, operations that write bytes to, or
104             read bytes from filehandles may have already transferred some or all of those
105             bytes before the future was cancelled, and there is now nothing that the
106             program can do to "undo" those effects. In general it is likely that the only
107             situation where you will cancel an IO operation on a filehandle is when an
108             entire connection is being abandoned, filehandles closed, and so on.
109              
110             That said, it should be safe to cancel L and L futures, as
111             each one will operate entirely independently, and not cause any change of
112             state to any of the others. This typically allows you to wrap a "timeout"-like
113             behaviour around any other sort of IO operation by using L
114             or similar. If the real IO operation was successful, the timeout can be safely
115             cancelled. If the timeout happens, the IO operation will be cancelled, and at
116             this point the application will have to discard the filehandle (or otherwise
117             resynchronise in some application-specific manner).
118              
119             =cut
120              
121             =head1 METHODS
122              
123             =cut
124              
125             =head2 accept
126              
127             $socketfh = await Future::IO->accept( $fh );
128              
129             I
130              
131             Returns a L<Future> that will become done when a new connection has been
132             accepted on the given filehandle, which should represent a listen-mode socket.
133             The returned future will yield the newly-accepted client socket filehandle.
134              
135             =cut
136              
137             sub accept
138             {
139 1     1 1 280011 shift;
140 1         3 my ( $fh ) = @_;
141              
142 1   50     23 return ( $IMPL //= "Future::IO::_DefaultImpl" )->accept( $fh );
143             }
144              
145             =head2 alarm
146              
147             await Future::IO->alarm( $epoch );
148              
149             I
150              
151             Returns a L<Future> that will become done at a fixed point in the future,
152             given as an epoch timestamp (such as returned by C<time()>). This value may be
153             fractional.
154              
155             =cut
156              
157             sub alarm
158             {
159 1     1 1 819 shift;
160 1         3 my ( $epoch ) = @_;
161              
162 1   50     4 $IMPL //= "Future::IO::_DefaultImpl";
163              
164 1 50       17 if( $IMPL->can( "alarm" ) ) {
165 1         41 return $IMPL->alarm( $epoch );
166             }
167             else {
168 0         0 return $IMPL->sleep( $epoch - Time::HiRes::time() );
169             }
170             }
171              
172             =head2 connect
173              
174             await Future::IO->connect( $fh, $name );
175              
176             I
177              
178             Returns a L<Future> that will become done when a C<connect> has succeeded on
179             the given filehandle to the given sockname address.
180              
181             =cut
182              
183             sub connect
184             {
185 2     2 1 292876 shift;
186 2         6 my ( $fh, $name ) = @_;
187              
188 2   100     32 return ( $IMPL //= "Future::IO::_DefaultImpl" )->connect( $fh, $name );
189             }
190              
191             =head2 poll
192              
193             $revents = await Future::IO->poll( $fh, $events );
194              
195             I
196              
197             Returns a L<Future> that will become done when the indicated IO operations
198             can be performed on the given filehandle. I<$events> should be a bitfield of
199             one or more of the POSIX C constants, such as C, C or
200             C. The result of the future will be a similar bitfield, indicating
201             which operations may now take place. If the C, C or
202             C events happen, they will always be reported; you do not need to
203             request these specifically. You should always request at least one of
204             C, C or C. Older versions of some implementation
205             modules did accept just C, but that is not supported any more.
206              
207             Multiple outstanding futures may be enqueued for the same filehandle. When an
208             event happens, only the first outstanding future that is interested in it is
209             informed; the rest will remain pending for the next round of IO events, if the
210             condition still prevails.
211              
212             Note that, as compared to the real C system call, this method only
213             operates on a single filehandle; all futures returned by it are independent
214             and refer just to that one filehandle each.
215              
216             Also note that, in general, it is better to use one of the higher-level
217             methods to perform whatever IO operation is required on the given filehandle.
218             The C<poll> method is largely intended as a lowest-level fallback, for example
219             if integrating with some other library or module that performs its own
220             filehandle IO and just needs to be informed when such IO operations may be
221             performed.
222              
223             For convenience, the C constants are exported by this module. They
224             should be used in preference to the ones from C, in case a platform
225             does not provide the latter module directly.
226              
227             =cut
228              
229             sub poll
230             {
231 14     14 1 248328 shift;
232 14         34 my ( $fh, $events ) = @_;
233              
234 14   100     117 return ( $IMPL //= "Future::IO::_DefaultImpl" )->poll( $fh, $events );
235             }
236              
237             =head2 read
238              
239             $bytes = await Future::IO->read( $fh, $length );
240              
241             I Before this version this method used to be named
242             C<sysread>, and is still available via that alias.
243              
244             Returns a L<Future> that will become done when at least one byte can be read
245             from the given filehandle. It may return up to C<$length> bytes. On EOF, the
246             returned future will yield an empty list (or C<undef> in scalar context). On
247             any error (other than C<EAGAIN> / C<EWOULDBLOCK> which are ignored), the
248             future fails with a suitable error message.
249              
250             Note specifically this may perform only a single C<sysread()> call, and thus
251             is not guaranteed to actually return the full length.
252              
253             =cut
254              
255             sub read
256             {
257 12     12 1 690039 shift;
258 12         33 my ( $fh, $length ) = @_;
259              
260 12   100     123 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sysread( $fh, $length );
261             }
262              
263             *sysread = \&read;
264              
265             =head2 read_exactly
266              
267             $bytes = await Future::IO->read_exactly( $fh, $length );
268              
269             I Before this version this method used to be named
270             C<sysread_exactly>, and is still available via that alias.
271              
272             Returns a L<Future> that will become done when exactly the given number of
273             bytes have been read from the given filehandle. It returns exactly C<$length>
274             bytes. On EOF, the returned future will yield an empty list (or C<undef> in
275             scalar context), even if fewer bytes have already been obtained. These bytes
276             will be lost. On any error (other than C<EAGAIN> / C<EWOULDBLOCK> which are
277             ignored), the future fails with a suitable error message.
278              
279             This may make more than one C<sysread()> call.
280              
281             =cut
282              
283             sub read_exactly
284             {
285 2     2 1 155005 shift;
286 2         5 my ( $fh, $length ) = @_;
287              
288 2   100     13 $IMPL //= "Future::IO::_DefaultImpl";
289              
290 2 50       27 if( my $code = $IMPL->can( "sysread_exactly" ) ) {
291 0         0 return $IMPL->$code( $fh, $length );
292             }
293              
294 2         11 return _read_into_buffer( $IMPL, $fh, $length, \(my $buffer = '') );
295             }
296              
297             *sysread_exactly = \&read_exactly;
298              
299             sub _read_into_buffer
300             {
301 7     7   16 my ( $IMPL, $fh, $length, $bufref ) = @_;
302              
303             $IMPL->sysread( $fh, $length - length $$bufref )->then( sub {
304 7     7   408 my ( $more ) = @_;
305 7 100       17 return Future->done() if !defined $more; # EOF
306              
307 6         11 $$bufref .= $more;
308              
309 6 100       21 return Future->done( $$bufref ) if length $$bufref >= $length;
310 5         11 return _read_into_buffer( $IMPL, $fh, $length, $bufref );
311 7         29 });
312             }
313              
314             =head2 read_until_eof
315              
316             $f = Future::IO->read_until_eof( $fh );
317              
318             I Before this version this method used to be named
319             C<sysread_until_eof>, and is still available via that alias.
320              
321             Returns a L<Future> that will become done when the given filehandle reaches
322             the EOF condition. The returned future will yield all of the bytes read up
323             until that point.
324              
325             =cut
326              
327             sub read_until_eof
328             {
329 8     8 1 168070 shift;
330 8         20 my ( $fh ) = @_;
331              
332 8   100     111 $IMPL //= "Future::IO::_DefaultImpl";
333              
334 8         283 return _read_until_eof( $IMPL, $fh, \(my $buffer = '') );
335             }
336              
337             *sysread_until_eof = \&read_until_eof;
338              
339             sub _read_until_eof
340             {
341 16     16   83 my ( $IMPL, $fh, $bufref ) = @_;
342              
343             $IMPL->sysread( $fh, $MAX_READLEN )->then( sub {
344 16     16   1205 my ( $more ) = @_;
345 16 100       131 return Future->done( $$bufref ) if !defined $more;
346              
347 8         27 $$bufref .= $more;
348 8         38 return _read_until_eof( $IMPL, $fh, $bufref );
349 16         337 });
350             }
351              
352             =head2 recv
353              
354             =head2 recvfrom
355              
356             $bytes = await Future::IO->recv( $fh, $length );
357             $bytes = await Future::IO->recv( $fh, $length, $flags );
358              
359             ( $bytes, $from ) = await Future::IO->recvfrom( $fh, $length );
360             ( $bytes, $from ) = await Future::IO->recvfrom( $fh, $length, $flags );
361              
362             I
363              
364             Returns a L<Future> that will become done when at least one byte is received
365             from the given filehandle (presumably a socket), by using a C<recv> or
366             C<recvfrom> system call. On any error (other than C<EAGAIN> /
367             C<EWOULDBLOCK> which are ignored) the future fails with a suitable error
368             message.
369              
370             Optionally a flags bitmask in C<$flags> can be passed. If no flags are
371             required, the value may be zero. The C<recvfrom> method additionally returns
372             the sender's address as a second result value; this is primarily used by
373             unconnected datagram sockets.
374              
375             =cut
376              
377             sub recv
378             {
379 4     4 1 210729 shift;
380 4         9 my ( $fh, $length, $flags ) = @_;
381              
382 4   100     35 return ( $IMPL //= "Future::IO::_DefaultImpl" )->recv( $fh, $length, $flags );
383             }
384              
385             sub recvfrom
386             {
387 4     4 1 2543 shift;
388 4         6 my ( $fh, $length, $flags ) = @_;
389              
390 4   50     22 return ( $IMPL //= "Future::IO::_DefaultImpl" )->recvfrom( $fh, $length, $flags );
391             }
392              
393             =head2 send
394              
395             $sent_len = await Future::IO->send( $fh, $bytes );
396             $sent_len = await Future::IO->send( $fh, $bytes, $flags );
397             $sent_len = await Future::IO->send( $fh, $bytes, $flags, $to );
398              
399             I
400              
401             Returns a L<Future> that will become done when at least one byte has been
402             sent to the given filehandle (presumably a socket), by using a C<send>
403             system call. On any error (other than C<EAGAIN> / C<EWOULDBLOCK> which are
404             ignored) the future fails with a suitable error message.
405              
406             Optionally a flags bitmask in C<$flags> or a destination address in C<$to> can
407             also be passed. If no flags are required, the value may be zero. If C<$to> is
408             specified then a C<sendto> system call is used instead.
409              
410             =cut
411              
412             sub send
413             {
414 5     5 1 238256 shift;
415 5         12 my ( $fh, $bytes, $flags, $to ) = @_;
416              
417 5   100     49 return ( $IMPL //= "Future::IO::_DefaultImpl" )->send( $fh, $bytes, $flags, $to );
418             }
419              
420             =head2 sleep
421              
422             await Future::IO->sleep( $secs );
423              
424             Returns a L<Future> that will become done a fixed delay from now, given in
425             seconds. This value may be fractional.
426              
427             =cut
428              
429             sub sleep
430             {
431 9     9 1 204107 shift;
432 9         27 my ( $secs ) = @_;
433              
434 9   100     71 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sleep( $secs );
435             }
436              
437             =head2 waitpid
438              
439             $wstatus = await Future::IO->waitpid( $pid );
440              
441             I
442              
443             Returns a L<Future> that will become done when the given child process
444             terminates. The future will yield the wait status of the child process.
445             This can be inspected by the usual bitshifting operations as per C<$?>:
446              
447             if( my $termsig = ($wstatus & 0x7f) ) {
448             say "Terminated with signal $termsig";
449             }
450             else {
451             my $exitcode = ($wstatus >> 8);
452             say "Terminated with exit code $exitcode";
453             }
454              
455             =cut
456              
457             sub waitpid
458             {
459 10     10 1 90 shift;
460 10         41 my ( $pid ) = @_;
461              
462 10   50     641 return ( $IMPL //= "Future::IO::_DefaultImpl" )->waitpid( $pid );
463             }
464              
465             =head2 write
466              
467             $written_len = await Future::IO->write( $fh, $bytes );
468              
469             I Before this version this method used to be named
470             C<syswrite>, and is still available via that alias.
471              
472             Returns a L<Future> that will become done when at least one byte has been
473             written to the given filehandle. It may write up to all of the bytes. On any
474             error (other than C<EAGAIN> / C<EWOULDBLOCK> which are ignored) the future
475             fails with a suitable error message.
476              
477             Note specifically this may perform only a single C<syswrite()> call, and thus
478             is not guaranteed to actually write the full length.
479              
480             =cut
481              
482             sub write
483             {
484 10     10 1 244023 shift;
485 10         25 my ( $fh, $bytes ) = @_;
486              
487 10   100     70 return ( $IMPL //= "Future::IO::_DefaultImpl" )->syswrite( $fh, $bytes );
488             }
489              
490             *syswrite = \&write;
491              
492             =head2 write_exactly
493              
494             $written_len = await Future::IO->write_exactly( $fh, $bytes );
495              
496             I Before this version this method used to be named
497             C<syswrite_exactly>, and is still available via that alias.
498              
499             Returns a L<Future> that will become done when exactly the given bytes have
500             been written to the given filehandle. On any error (other than C<EAGAIN> /
501             C<EWOULDBLOCK> which are ignored) the future fails with a suitable error
502             message.
503              
504             This may make more than one C<syswrite()> call.
505              
506             =cut
507              
508             sub write_exactly
509             {
510 3     3 1 196963 shift;
511 3         53 my ( $fh, $bytes ) = @_;
512              
513 3   100     40 $IMPL //= "Future::IO::_DefaultImpl";
514              
515 3 50       251 if( my $code = $IMPL->can( "syswrite_exactly" ) ) {
516 0         0 return $IMPL->$code( $fh, $bytes );
517             }
518              
519 3         44 return _write_from_buffer( $IMPL, $fh, \$bytes, length $bytes );
520             }
521              
522             *syswrite_exactly = \&write_exactly;
523              
524             sub _write_from_buffer
525             {
526 5     5   25 my ( $IMPL, $fh, $bufref, $len ) = @_;
527              
528             $IMPL->syswrite( $fh, substr $$bufref, 0, $MAX_WRITELEN )->then( sub {
529 5     5   220 my ( $written_len ) = @_;
530 5         14 substr $$bufref, 0, $written_len, "";
531              
532 5 100       58 return Future->done( $len ) if !length $$bufref;
533 2         4 return _write_from_buffer( $IMPL, $fh, $bufref, $len );
534 5         141 });
535             }
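
The chunking in _write_from_buffer above can be hard to follow through the Future chaining. The sketch below shows the same idea as a plain synchronous loop: offer at most $MAX_WRITELEN bytes per call, then drop whatever was accepted from the front of the buffer. It is not the module's code; fake_syswrite and write_all are hypothetical names invented for this illustration, and the tiny limit values are chosen only to make the partial writes visible.

```perl
use v5.14;
use warnings;

our $MAX_WRITELEN = 4;   # deliberately tiny so the chunking is visible

# Hypothetical stand-in for a write call that may accept fewer bytes
# than it was offered (here: never more than 3 per call).
sub fake_syswrite
{
   my ( $out, $bytes ) = @_;
   my $n = ( length( $bytes ) > 3 ) ? 3 : length( $bytes );
   $$out .= substr( $bytes, 0, $n );
   return $n;
}

# Hypothetical synchronous analogue of the recursive Future chain:
# keep writing until the buffer is empty, then report the total length.
sub write_all
{
   my ( $out, $bytes ) = @_;
   my $len   = length $bytes;
   my $calls = 0;

   while( length $bytes ) {
      my $written = fake_syswrite( $out, substr( $bytes, 0, $MAX_WRITELEN ) );
      substr( $bytes, 0, $written, "" );   # remove the accepted prefix
      $calls++;
   }

   return ( $len, $calls );
}

my $sink = "";
my ( $len, $calls ) = write_all( \$sink, "hello world" );
say "$len bytes in $calls calls: $sink";
```

As in the listing, the value reported at the end is the original total length, not the size of the final chunk.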
536              
537             =head2 override_impl
538              
539             Future::IO->override_impl( $impl );
540              
541             Sets a new implementation for C<Future::IO>, replacing the minimal default
542             internal implementation. This can either be a package name or an object
543             instance reference, but must provide the methods named above.
544              
545             This method is intended to be called by event loops and other similar places,
546             to provide a better integration. Another way, which doesn't involve directly
547             depending on C<Future::IO> or loading it, is to use the C<$IMPL> variable; see
548             below.
549              
550             Can only be called once, and only if the default implementation is not in use,
551             therefore a module that wishes to override this ought to invoke it as soon as
552             possible on program startup, before any of the main C<Future::IO> methods may
553             have been called.
554              
555             =cut
556              
557             my $overridden;
558              
559             sub override_impl
560             {
561 1     1 1 167145 shift;
562 1 50       4 croak "Future::IO implementation is already overridden" if defined $IMPL;
563 1 50 33     7 croak "Future::IO implementation cannot be set once default is already in use"
564             if @pollers or @alarms;
565              
566 1         3 ( $IMPL ) = @_;
567             }
568              
569             sub try_load_impl
570             {
571 0     0 0 0 shift;
572 0         0 my ( $name ) = @_;
573              
574 0 0       0 $name =~ m/::/ or $name = "Future::IO::Impl::$name";
575 0         0 my $module = "$name.pm" =~ s{::}{/}gr;
576              
577 0 0       0 eval { require $module } or return 0;
  0         0  
578 0 0       0 $name->can( "poll" ) or return 0;
579             # TODO: Consider some sort of API version check
580              
581 0         0 return 1;
582             }
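
As a rough illustration of the name handling at the top of try_load_impl above, the following standalone sketch reproduces only the short-name expansion and the conversion to a module path. It loads nothing. The helper name impl_module_path is hypothetical and is not part of Future::IO.

```perl
use v5.14;
use warnings;

# Hypothetical helper (not part of Future::IO): mirrors the two string
# transformations performed before the require in try_load_impl.
sub impl_module_path
{
   my ( $name ) = @_;

   # Short names such as "UV" gain the Future::IO::Impl:: prefix
   $name =~ m/::/ or $name = "Future::IO::Impl::$name";

   # Convert the package name into the path form that require expects
   my $module = "$name.pm" =~ s{::}{/}gr;

   return ( $name, $module );
}

my ( $name, $module ) = impl_module_path( "UV" );
say "$name => $module";
```

A name that already contains `::` is left unprefixed, so a fully qualified package outside the Future::IO::Impl:: namespace can also be passed.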
583              
584             =head2 load_impl
585              
586             Future::IO->load_impl( @names );
587              
588             I
589              
590             Given a list of possible implementation module names, iterates through them
591             attempting to load each one until a suitable module is found. Any errors
592             encountered while loading each are ignored. If no module is found to be
593             suitable, an exception is thrown that likely aborts the program.
594              
595             C<@names> should contain a list of Perl module names (which likely live in the
596             C<Future::IO::Impl::> prefix). If any name does not contain a C<::>
597             separator, it will have that prefix applied to it. This allows a conveniently
598             short list; e.g.
599              
600             Future::IO->load_impl( qw( UV Glib Ppoll ) );
601              
602             This method is intended to be called once, at startup, by the main containing
603             program. Since it sets the implementation, it would generally be considered
604             inappropriate to invoke this method from some additional module that might be
605             loaded by a containing program.
606              
607             This is now discouraged, in favour of letting C<Future::IO> decide instead by
608             using L</load_best_impl>.
609              
610             =cut
611              
612             sub load_impl
613             {
614 0     0 1 0 shift;
615              
616 0         0 foreach ( @_ ) {
617 0 0       0 Future::IO->try_load_impl( $_ ) and return;
618             }
619 0         0 die "Unable to find a usable Future::IO::Impl subclass\n";
620             }
621              
622             =head2 load_best_impl
623              
624             Future::IO->load_best_impl();
625              
626             I
627              
628             Attempt to work out and load an implementation module.
629              
630             In most situations, most programs don't really care what specific
631             implementation module they use, if they aren't already committed to some other
632             event system and also using C<Future::IO> alongside it. This method allows
633             programs to offload the decision-making about which specific implementations
634             to try to load, to C<Future::IO> itself.
635              
636             This method works by attempting a few different strategies to determine the
637             "best" implementation to use. It maintains a list of the currently-known CPAN
638             modules which provide implementations, and attempts them in a given preference
639             order.
640              
641             The environment variable C<PERL_FUTURE_IO_IMPL> offers further control of the
642             behaviour of this method. Its value should be a comma-separated list of
643             implementation names to be attempted, in preference to any of the others.
644             Names prefixed with a hyphen will be skipped entirely by any attempt.
645              
646             =over 4
647              
648             =item 1.
649              
650             First, if C<PERL_FUTURE_IO_IMPL> is set, any of the names given are tried, in
651             order.
652              
653             =item 2.
654              
655             Then any of the modules that attempt to wrap other event systems such as L
656             or L are attempted if it is detected that the other event system is
657             already loaded.
658              
659             =item 3.
660              
661             If none of these were successful, next it attempts any OS-specific modules
662             based on the OS name (given by C<$^O>).
663              
664             =item 4.
665              
666             Finally, a list of other generic modules is attempted, which also includes
667             any of the wrapper implementations that can be started independently.
668              
669             =back
670              
671             For more details, consult the implementation code in this module to find the
672             current list of known modules and the order they are attempted in.
673              
674             =cut
675              
676             # Try to account for every Future::IO::Impl::* module on CPAN
677             #
678             # Purposely omitting:
679             # Future::IO::Impl::Tickit - requires a $tickit instance to work
680              
681             my @IMPLS_WRAPPER = (
682             "UV",
683             "Glib",
684             [ IOAsync => "IO::Async::Loop" ],
685             "POE",
686             "AnyEvent",
687             );
688              
689             my %IMPLS_FOR_OS = (
690             freebsd => [qw( KQueue )], # TODO and probably other BSDs
691             linux => [qw( Uring )],
692             # TODO: other OSes?
693             );
694              
695             my @IMPLS_GENERIC = (qw(
696             Ppoll
697             UV
698             Glib
699             ));
700              
701             sub load_best_impl
702             {
703 0     0 1 0 shift;
704              
705 0         0 my @prefer;
706             my %veto;
707              
708 0   0     0 foreach ( split m/,/, $ENV{PERL_FUTURE_IO_IMPL} // "" ) {
709 0 0       0 if( s/^-// ) {
710 0         0 $veto{$_} = 1;
711             }
712             else {
713 0         0 push @prefer, $_;
714             }
715             }
716              
717 0         0 foreach my $impl ( @prefer ) {
718 0 0       0 $veto{$impl} and next;
719 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
720             }
721              
722             # First, load a wrapper impl if the wrapped system is already loaded
723 0         0 foreach ( @IMPLS_WRAPPER ) {
724 0 0       0 my ( $impl, $package ) = ref $_ ? @$_ : ( $_, $_ );
725 0 0       0 $veto{$impl} and next;
726 0 0       0 eval { $package->VERSION(0) } or next;
  0         0  
727              
728 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
729             }
730              
731             # OK, maybe we can find a good impl for this particular OS
732 0 0       0 foreach my $impl ( @{ $IMPLS_FOR_OS{$^O} || [] } ) {
  0         0  
733 0 0       0 $veto{$impl} and next;
734 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
735             }
736              
737             # Failing all of that, try the generic ones
738 0         0 foreach my $impl ( @IMPLS_GENERIC ) {
739 0 0       0 $veto{$impl} and next;
740 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
741             }
742              
743 0         0 die "Unable to find a usable Future::IO::Impl subclass\n";
744             }
745              
746             =head2 HAVE_MULTIPLE_FILEHANDLES
747              
748             $has = Future::IO->HAVE_MULTIPLE_FILEHANDLES;
749              
750             I
751              
752             Returns true if the underlying IO implementation actually supports multiple
753             filehandles. The default minimal internal implementation used not to support
754             this, but it now does; so this method always returns
755             true.
756              
757             =cut
758              
759             sub HAVE_MULTIPLE_FILEHANDLES
760             {
761 0   0 0 1 0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->HAVE_MULTIPLE_FILEHANDLES;
762             }
763              
764             package
765             Future::IO::_DefaultImpl;
766 21     21   200 use base qw( Future::IO::ImplBase );
  21         41  
  21         13174  
767 21     21   168 use Carp;
  21         41  
  21         1590  
768              
769 21     21   140 use IO::Poll qw( POLLIN POLLOUT POLLPRI );
  21         58  
  21         1613  
770 21     21   145 use Struct::Dumb qw( readonly_struct );
  21         57  
  21         125  
771 21     21   1713 use Time::HiRes qw( time );
  21         39  
  21         124  
772              
773             readonly_struct Poller => [qw( fh events f )];
774             readonly_struct Alarm => [qw( time f )];
775              
776 21     21   2134 use constant HAVE_MULTIPLE_FILEHANDLES => 1;
  21         40  
  21         11336  
777              
778             sub alarm
779             {
780 1     1   4 my $class = shift;
781 1         4 return $class->_done_at( shift );
782             }
783              
784             sub sleep
785             {
786 7     7   15 my $class = shift;
787 7         40 return $class->_done_at( time() + shift );
788             }
789              
790             sub poll
791             {
792 163     163   267 my $class = shift;
793 163         287 my ( $fh, $events ) = @_;
794              
795 163         805 my $f = Future::IO::_DefaultImpl::F->new;
796              
797 163         1741 push @pollers, Poller( $fh, $events, $f );
798              
799             $f->on_cancel( sub {
800 10     10   1674 my $f = shift;
801              
802 10         16 my $idx = 0;
803 10   33     266 $idx++ while $idx < @pollers and $pollers[$idx]->f != $f;
804              
805 10         101 splice @pollers, $idx, 1, ();
806 163         10939 });
807              
808 163         4418 return $f;
809             }
810              
811             sub waitpid
812             {
813 0     0   0 croak "This implementation cannot handle waitpid";
814             }
815              
816             sub _done_at
817             {
818 8     8   15 shift;
819 8         19 my ( $time ) = @_;
820              
821 8         39 my $f = Future::IO::_DefaultImpl::F->new;
822              
823             # TODO: Binary search
824 8         74 my $idx = 0;
825 8   66     83 $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
826              
827 8         48 splice @alarms, $idx, 0, Alarm( $time, $f );
828              
829             $f->on_cancel( sub {
830 2     2   165 my $self = shift;
831              
832 2         5 my $idx = 0;
833 2   33     54 $idx++ while $idx < @alarms and $alarms[$idx]->f != $f;
834              
835 2         25 splice @alarms, $idx, 1, ();
836 8         553 } );
837              
838 8         226 return $f;
839             }
840              
841             package # hide
842             Future::IO::_DefaultImpl::F;
843 21     21   195 use base qw( Future );
  21         44  
  21         15775  
844 21     21   296497 use IO::Poll qw( POLLIN POLLOUT POLLPRI );
  21         46  
  21         1592  
845 21     21   131 use Time::HiRes qw( time );
  21         41  
  21         151  
846              
847             sub _await_once
848             {
849 47 50 66 47   630 die "Cowardly refusing to sit idle and do nothing" unless @pollers || @alarms;
850              
851 47         99 my $rvec = '';
852 47         130 my $wvec = '';
853 47         79 my $evec = '';
854              
855 47         105 foreach my $p ( @pollers ) {
856 49         1141 my $fileno = $p->fh->fileno;
857              
858 49 100       10238 vec( $rvec, $fileno, 1 ) = 1 if $p->events & POLLIN;
859 49 100       1122 vec( $wvec, $fileno, 1 ) = 1 if $p->events & POLLOUT;
860 49 100       1082 vec( $evec, $fileno, 1 ) = 1 if $p->events & POLLPRI;
861             }
862              
863             # If we always select() then problematic platforms like MSWin32 would
864             # always break. Instead, we'll only select() if we're waiting on alarms, or
865             # both POLLIN and POLLOUT, or POLLPRI. If not we'll just presume the one
866             # operation we're waiting for is definitely ready right now.
867 47   100     514 my $do_select = @alarms ||
868             ( $rvec ne '' and $wvec ne '' ) ||
869             ( $evec ne '' );
870              
871 47 100       122 if( $do_select ) {
872 13         23 my $maxwait;
873 13 100       257 $maxwait = $alarms[0]->time - time() if @alarms;
874              
875 13         1200380 my $ret = select( $rvec, $wvec, $evec, $maxwait );
876             }
877             # else just presume it's ready
878              
879             # Perl doesn't have an easy construction for iterating an array possibly
880             # splicing as you go...
881 47         251 for ( my $idx = 0; $idx < @pollers; ) {
882 157         330 my $p = $pollers[$idx];
883              
884 157         4282 my $fh = $p->fh;
885 157         1449 my $fileno = $fh->fileno;
886              
887 157         175212 my $was_blocking;
888 157 100       1153 $was_blocking = $fh->blocking(1) if !$do_select;
889              
890 157         183969 my $revents = 0;
891 157 100       580 $revents |= POLLIN if vec( $rvec, $fileno, 1 );
892 157 100       388 $revents |= POLLOUT if vec( $wvec, $fileno, 1 );
893 157 50       410 $revents |= POLLPRI if vec( $evec, $fileno, 1 );
894 157         5140 $revents &= $p->events;
895              
896 157 100       1088 $revents or $idx++, next;
897              
898 151         335 splice @pollers, $idx, 1, ();
899 151         3438 $p->f->done( $revents );
900              
901 151 100 100     6109 $fh->blocking(0) if !$do_select and !$was_blocking;
902             }
903              
904 47         234 my $now = time();
905 47   100     770 while( @alarms and $alarms[0]->time <= $now ) {
906 6         318 ( shift @alarms )->f->done;
907             }
908             }
909              
910             =head2 await
911              
912             $f = $f->await;
913              
914             I
915              
916             Blocks until this future is ready (either by success or failure). Does not
917             throw an exception if failed.
918              
919             =cut
920              
921             sub await
922             {
923 46     46   4795 my $self = shift;
924 46         168 _await_once until $self->is_ready;
925 46         870 return $self;
926             }
927              
928             =head1 THE C<$IMPL> VARIABLE
929              
930             I
931              
932             As an alternative to setting an implementation by using L</override_impl>, a
933             package variable is also available that allows modules such as event systems
934             to opportunistically provide an implementation without needing to depend on
935             the module, or loading it C. Simply directly set that package
936             variable to the name of an implementing package or an object instance.
937              
938             Additionally, implementors may use a name within the C<Future::IO::Impl::>
939             namespace, suffixed by the name of their event system.
940              
941             For example, something like the following code arrangement is recommended.
942              
943             package Future::IO::Impl::BananaLoop;
944              
945             {
946             no warnings 'once';
947             ( $Future::IO::IMPL //= __PACKAGE__ ) eq __PACKAGE__ or
948             warn "Unable to set Future::IO implementation to " . __PACKAGE__ .
949             " as it is already $Future::IO::IMPL\n";
950             }
951              
952             sub poll
953             {
954             ...
955             }
956              
957             sub sleep
958             {
959             ...
960             }
961              
962             sub sysread
963             {
964             ...
965             }
966              
967             sub syswrite
968             {
969             ...
970             }
971              
972             sub waitpid
973             {
974             ...
975             }
976              
977             Optionally, you can also implement L</sysread_exactly> and
978             L</syswrite_exactly>:
979              
980             sub sysread_exactly
981             {
982             ...
983             }
984              
985             sub syswrite_exactly
986             {
987             ...
988             }
989              
990             If not, they will be emulated by C<Future::IO> itself, making multiple calls
991             to the non-C<_exactly> versions.
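
                As a rough illustration of what such an emulation could look like, the
                sketch below loops over a partial-read operation until enough bytes have
                arrived. It is not the module's actual code. The names C<read_exactly_via>
                and C<$read_some> are hypothetical; C<$read_some> is assumed to return a
                L<Future> that yields up to the requested number of bytes, or an empty
                list at EOF.

                    use strict;
                    use warnings;
                    use Future;
                    use List::Util qw( min );

                    # Hypothetical helper, not part of Future::IO.
                    sub read_exactly_via
                    {
                        my ( $read_some, $length ) = @_;
                        my $buf = "";

                        my $step; $step = sub {
                            # Enough bytes collected: finish with the whole buffer
                            return Future->done( $buf ) if length( $buf ) >= $length;

                            # Otherwise ask only for the bytes still missing
                            return $read_some->( $length - length( $buf ) )->then( sub {
                                return Future->done unless @_;   # EOF came first
                                $buf .= $_[0];
                                return $step->();
                            } );
                        };
                        return $step->();
                    }

                    # Stand-in reader (an assumption for this demo) that hands out
                    # at most two bytes per call from an in-memory string.
                    my $data = "abcdefgh";
                    my $reader = sub {
                        my ( $want ) = @_;
                        return Future->done unless length $data;
                        return Future->done( substr( $data, 0, min( 2, $want ), "" ) );
                    };

                    my $bytes = read_exactly_via( $reader, 5 )->get;

                Because each step requests only the remaining byte count, the final
                result is never longer than C<$length>, and any unread input stays
                with the reader.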
992              
993             =head1 AUTHOR
994              
995             Paul Evans <leonerd@leonerd.org.uk>
996              
997             =cut
998              
999             0x55AA;