File Coverage

blib/lib/Future/IO.pm
Criterion Covered Total %
statement 178 218 81.6
branch 35 74 47.3
condition 40 56 71.4
subroutine 44 49 89.8
pod 18 19 94.7
total 315 416 75.7


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2026 -- leonerd@leonerd.org.uk
5              
6             package Future::IO 0.23;
7              
8 22     22   4645905 use v5.14;
  22         76  
9 22     22   163 use warnings;
  22         51  
  22         1145  
10              
11 22     22   110 use Carp;
  22         40  
  22         2283  
12              
13             # These need to be visible to sub override_impl
14             my @pollers;
15             my @alarms;
16              
17             our $IMPL;
18              
19             our $MAX_READLEN = 8192;
20             our $MAX_WRITELEN = 8192;
21              
22 22     22   6625 use IO::Poll qw( POLLIN POLLOUT POLLPRI POLLHUP POLLERR POLLNVAL );
  22         40205  
  22         1859  
23              
24 22     22   160 use Exporter 'import';
  22         47  
  22         1161  
25             BEGIN {
26             # This needs to happen at BEGIN time because stupid cyclic reasons
27 22     22   40175 our @EXPORT_OK = qw( POLLIN POLLOUT POLLPRI POLLHUP POLLERR POLLNVAL );
28             }
29              
30             =head1 NAME
31              
32             C - Future-returning IO methods
33              
34             =head1 SYNOPSIS
35              
36             =for highlighter language=perl
37              
38             use Future::IO;
39             Future::IO->load_best_impl;
40              
41             my $delay = Future::IO->sleep( 5 );
42             # $delay will become done in 5 seconds time
43              
44             my $input = Future::IO->read( \*STDIN, 4096 );
45             # $input will yield some input from the STDIN IO handle
46              
47             =head1 DESCRIPTION
48              
49             This package provides a few basic methods that behave similarly to the
50             same-named core perl functions relating to IO operations, but yield their
51             results asynchronously via L instances.
52              
53             This is provided primarily as a decoupling mechanism, to allow modules to be
54             written that perform IO in an asynchronous manner to depend directly on this,
55             while allowing asynchronous event systems to provide an implementation of
56             these operations.
57              
58             =head2 Default Implementation
59              
60             If the C method is not invoked, a default implementation of
61             these operations is provided. This implementation allows a single queue of
62             C or C calls on a single filehandle only, combined with C
63             calls. It does not support C.
64              
65             It is provided for the simple cases where modules only need one filehandle
66             (most likely a single network socket or hardware device handle), allowing such
67             modules to work without needing a better event system.
68              
69             If there are both read/write and C futures pending, the implementation
70             will use C to wait for either. This may be problematic on MSWin32,
71             depending on what type of filehandle is involved.
72              
73             If C is not being used then the default implementation will
74             temporarily set filehandles into blocking mode (by switching off the
75             C flag) while performing IO on them.
76              
77             For cases where multiple filehandles are required, or for doing more involved
78             IO operations, a real implementation based on an actual event loop should be
79             loaded. It is recommended to use the L method to do this, if
80             there are no other specific requirements. If the program is already using some
81             other event system, such as L or L, it is best to directly load
82             the relevant implementation module in the toplevel program.
83              
84             =head2 Unit Testing
85              
86             The replaceable implementation is also useful for writing unit test scripts.
87             If the implementation is set to an instance of some sort of test fixture or
88             mocking object, a unit test can check that the appropriate IO operations
89             happen as part of the test.
90              
91             A testing module which does this is provided by L.
92              
93             =head2 Cancellation
94              
95             Any C returned by a C method should support being
96             cancelled by the L method. Doing so should not cause the
97             program to break overall, nor will it upset the specific implementation being
98             used.
99              
100             I, the result of cancelling a future instance that performs some
101             actual IO work is I. It may cause no work to be performed, or it
102             may result in a partial (or even complete but as-yet unreported) IO operation
103             to have already taken place. In particular, operations that write bytes to, or
104             read bytes from filehandles may have already transferred some or all of those
105             bytes before the future was cancelled, and there is now nothing that the
106             program can do to "undo" those effects. In general it is likely that the only
107             situation where you will cancel an IO operation on a filehandle is when an
108             entire connection is being abandoned, filehandles closed, and so on.
109              
110             That said, it should be safe to cancel L and L futures, as
111             each one will operate entirely independently, and not cause any change of
112             state to any of the others. This typically allows you to wrap a "timeout"-like
113             behaviour around any other sort of IO operation by using L
114             or similar. If the real IO operation was successful, the timeout can be safely
115             cancelled. If the timeout happens, the IO operation will be cancelled, and at
116             this point the application will have to discard the filehandle (or otherwise
117             resynchronse in some application-specific manner).
118              
119             =cut
120              
121             =head1 METHODS
122              
123             =cut
124              
125             =head2 accept
126              
127             $socketfh = await Future::IO->accept( $fh );
128              
129             I
130              
131             Returns a L that will become done when a new connection has been
132             accepted on the given filehandle, which should represent a listen-mode socket.
133             The returned future will yield the newly-accepted client socket filehandle.
134              
135             =cut
136              
137             sub accept
138             {
139 1     1 1 160557 shift;
140 1         2 my ( $fh ) = @_;
141              
142 1   50     16 return ( $IMPL //= "Future::IO::_DefaultImpl" )->accept( $fh );
143             }
144              
145             =head2 alarm
146              
147             await Future::IO->alarm( $epoch );
148              
149             I
150              
151             Returns a L that will become done at a fixed point in the future,
152             given as an epoch timestamp (such as returned by C). This value may be
153             fractional.
154              
155             =cut
156              
157             sub alarm
158             {
159 1     1 1 714 shift;
160 1         3 my ( $epoch ) = @_;
161              
162 1   50     3 $IMPL //= "Future::IO::_DefaultImpl";
163              
164 1 50       75 if( $IMPL->can( "alarm" ) ) {
165 1         7 return $IMPL->alarm( $epoch );
166             }
167             else {
168 0         0 return $IMPL->sleep( $epoch - Time::HiRes::time() );
169             }
170             }
171              
172             =head2 connect
173              
174             await Future::IO->connect( $fh, $name );
175              
176             I
177              
178             Returns a L that will become done when a C has succeeded on
179             the given filehandle to the given sockname address.
180              
181             =cut
182              
183             sub connect
184             {
185 2     2 1 389834 shift;
186 2         10 my ( $fh, $name ) = @_;
187              
188 2   100     43 return ( $IMPL //= "Future::IO::_DefaultImpl" )->connect( $fh, $name );
189             }
190              
191             =head2 poll
192              
193             $revents = await Future::IO->poll( $fh, $events );
194              
195             I
196              
197             Returns a L that will become done when the indicated IO operations
198             can be performed on the given filehandle. I<$events> should be a bitfield of
199             one or more of the POSIX C constants, such as C, C or
200             C. The result of the future will be a similar bitfield, indicating
201             which operations may now take place. If the C, C or
202             C events happen, they will always be reported; you do not need to
203             request these specifically. You should always request at least one of
204             C, C or C. Older versions of some implementation
205             modules did accept just C, but that is not supported any more.
206              
207             Multiple outstanding futures may be enqueued for the same filehandle. When an
208             event happens, only the first outstanding future that is interested in it is
209             informed; the rest will remain pending for the next round of IO events, if the
210             condition still prevails.
211              
212             Note that, as compared to the real C system call, this method only
213             operates on a single filehandle; all futures returned by it are independent
214             and refer just to that one filehandle each.
215              
216             Also note that, in general, it is better to use one of the higher-level
217             methods to perform whatver IO operation is required on the given filehandle.
218             The C method is largely intended as a lowest-level fallback, for example
219             if integrating with some other library or module that performs its own
220             filehandle IO and just needs to be informed when such IO operations may be
221             performed.
222              
223             For convenience, the C constants are exported by this module. They
224             should be used in preference to the ones from C, in case a platform
225             does not provide the latter module directly.
226              
227             =cut
228              
229             sub poll
230             {
231 18     18 1 204142 shift;
232 18         30 my ( $fh, $events ) = @_;
233              
234 18   100     92 return ( $IMPL //= "Future::IO::_DefaultImpl" )->poll( $fh, $events );
235             }
236              
237             =head2 read
238              
239             $bytes = await Future::IO->read( $fh, $length );
240              
241             I Before this version this method used to be named
242             C, and still available via that alias.
243              
244             Returns a L that will become done when at least one byte can be read
245             from the given filehandle. It may return up to C<$length> bytes. On EOF, the
246             returned future will yield an empty list (or C in scalar context). On
247             any error (other than C / C which are ignored), the
248             future fails with a suitable error message.
249              
250             Note specifically this may perform only a single C call, and thus
251             is not guaranteed to actually return the full length.
252              
253             =cut
254              
255             sub read
256             {
257 12     12 1 624029 shift;
258 12         25 my ( $fh, $length ) = @_;
259              
260 12   100     103 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sysread( $fh, $length );
261             }
262              
263             *sysread = \&read;
264              
265             =head2 read_exactly
266              
267             $bytes = await Future::IO->read_exactly( $fh, $length );
268              
269             I Before this version this method used to be named
270             C, and still available via that alias.
271              
272             Returns a L that will become done when exactly the given number of
273             bytes have been read from the given filehandle. It returns exactly C<$length>
274             bytes. On EOF, the returned future will yield an empty list (or C in
275             scalar context), even if fewer bytes have already been obtained. These bytes
276             will be lost. On any error (other than C / C which are
277             ignored), the future fails with a suitable error message.
278              
279             This may make more than one C call.
280              
281             =cut
282              
283             sub read_exactly
284             {
285 2     2 1 273303 shift;
286 2         7 my ( $fh, $length ) = @_;
287              
288 2   100     17 $IMPL //= "Future::IO::_DefaultImpl";
289              
290 2 50       31 if( my $code = $IMPL->can( "sysread_exactly" ) ) {
291 0         0 return $IMPL->$code( $fh, $length );
292             }
293              
294 2         9 return _read_into_buffer( $IMPL, $fh, $length, \(my $buffer = '') );
295             }
296              
297             *sysread_exactly = \&read_exactly;
298              
299             sub _read_into_buffer
300             {
301 7     7   18 my ( $IMPL, $fh, $length, $bufref ) = @_;
302              
303             $IMPL->sysread( $fh, $length - length $$bufref )->then( sub {
304 7     7   623 my ( $more ) = @_;
305 7 100       28 return Future->done() if !defined $more; # EOF
306              
307 6         15 $$bufref .= $more;
308              
309 6 100       44 return Future->done( $$bufref ) if length $$bufref >= $length;
310 5         17 return _read_into_buffer( $IMPL, $fh, $length, $bufref );
311 7         46 });
312             }
313              
314             =head2 read_until_eof
315              
316             $f = Future::IO->read_until_eof( $fh );
317              
318             I Before this version this method used to be named
319             C, and still available via that alias.
320              
321             Returns a L that will become done when the given filehandle reaches
322             the EOF condition. The returned future will yield all of the bytes read up
323             until that point.
324              
325             =cut
326              
327             sub read_until_eof
328             {
329 8     8 1 150140 shift;
330 8         126 my ( $fh ) = @_;
331              
332 8   100     167 $IMPL //= "Future::IO::_DefaultImpl";
333              
334 8         228 return _read_until_eof( $IMPL, $fh, \(my $buffer = '') );
335             }
336              
337             *sysread_until_eof = \&read_until_eof;
338              
339             sub _read_until_eof
340             {
341 16     16   54 my ( $IMPL, $fh, $bufref ) = @_;
342              
343             $IMPL->sysread( $fh, $MAX_READLEN )->then( sub {
344 16     16   922 my ( $more ) = @_;
345 16 100       104 return Future->done( $$bufref ) if !defined $more;
346              
347 8         24 $$bufref .= $more;
348 8         101 return _read_until_eof( $IMPL, $fh, $bufref );
349 16         565 });
350             }
351              
352             =head2 recv
353              
354             =head2 recvfrom
355              
356             $bytes = await Future::IO->recv( $fh, $length );
357             $bytes = await Future::IO->recv( $fh, $length, $flags );
358              
359             ( $bytes, $from ) = await Future::IO->recvfrom( $fh, $length );
360             ( $bytes, $from ) = await Future::IO->recvfrom( $fh, $length, $flags );
361              
362             I
363              
364             Returns a L that will become done when at least one byte is received
365             from the given filehandle (presumably a socket), by using a C or
366             C system call. On any error (other than C /
367             C which are ignored) the future fails with a suitable error
368             message.
369              
370             Optionally a flags bitmask in C<$flags> can be passed. If no flags are
371             required, the value may be zero. The C method additionally returns
372             the sender's address as a second result value; this is primarily used by
373             unconnected datagram sockets.
374              
375             =cut
376              
377             sub recv
378             {
379 4     4 1 238324 shift;
380 4         11 my ( $fh, $length, $flags ) = @_;
381              
382 4   100     45 return ( $IMPL //= "Future::IO::_DefaultImpl" )->recv( $fh, $length, $flags );
383             }
384              
385             sub recvfrom
386             {
387 4     4 1 4006 shift;
388 4         10 my ( $fh, $length, $flags ) = @_;
389              
390 4   50     37 return ( $IMPL //= "Future::IO::_DefaultImpl" )->recvfrom( $fh, $length, $flags );
391             }
392              
393             =head2 send
394              
395             $sent_len = await Future::IO->send( $fh, $bytes );
396             $sent_len = await Future::IO->send( $fh, $bytes, $flags );
397             $sent_len = await Future::IO->send( $fh, $bytes, $flags, $to );
398              
399             I
400              
401             Returns a L that will become done when at least one byte has been
402             sent to the given filehandle (presumably a socket), by using a C
403             system call. On any error (other than C / C which are
404             ignored) the future fails with a suitable error message.
405              
406             Optionally a flags bitmask in C<$flags> or a destination address in C<$to> can
407             also be passed. If no flags are required, the value may be zero. If C<$to> is
408             specified then a C system call is used instead.
409              
410             =cut
411              
412             sub send
413             {
414 5     5 1 252448 shift;
415 5         14 my ( $fh, $bytes, $flags, $to ) = @_;
416              
417 5   100     46 return ( $IMPL //= "Future::IO::_DefaultImpl" )->send( $fh, $bytes, $flags, $to );
418             }
419              
420             =head2 sleep
421              
422             await Future::IO->sleep( $secs );
423              
424             Returns a L that will become done a fixed delay from now, given in
425             seconds. This value may be fractional.
426              
427             =cut
428              
429             sub sleep
430             {
431 9     9 1 152191 shift;
432 9         22 my ( $secs ) = @_;
433              
434 9   100     65 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sleep( $secs );
435             }
436              
437             =head2 waitpid
438              
439             $wstatus = await Future::IO->waitpid( $pid );
440              
441             I
442              
443             Returns a L that will become done when the given child process
444             terminates. The future will yield the wait status of the child process.
445             This can be inspected by the usual bitshifting operations as per C<$?>:
446              
447             if( my $termsig = ($wstatus & 0x7f) ) {
448             say "Terminated with signal $termsig";
449             }
450             else {
451             my $exitcode = ($wstatus >> 8);
452             say "Terminated with exit code $exitcode";
453             }
454              
455             =cut
456              
457             sub waitpid
458             {
459 10     10 1 95 shift;
460 10         99 my ( $pid ) = @_;
461              
462 10   50     902 return ( $IMPL //= "Future::IO::_DefaultImpl" )->waitpid( $pid );
463             }
464              
465             =head2 write
466              
467             $written_len = await Future::IO->write( $fh, $bytes );
468              
469             I Before this version this method used to be named
470             C, and still available via that alias.
471              
472             Returns a L that will become done when at least one byte has been
473             written to the given filehandle. It may write up to all of the bytes. On any
474             error (other than C / C which are ignored) the future
475             fails with a suitable error message.
476              
477             Note specifically this may perform only a single C call, and thus
478             is not guaranteed to actually return the full length.
479              
480             =cut
481              
482             sub write
483             {
484 10     10 1 288209 shift;
485 10         27 my ( $fh, $bytes ) = @_;
486              
487 10   100     94 return ( $IMPL //= "Future::IO::_DefaultImpl" )->syswrite( $fh, $bytes );
488             }
489              
490             *syswrite = \&write;
491              
492             =head2 write_exactly
493              
494             $written_len = await Future::IO->write_exactly( $fh, $bytes );
495              
496             I Before this version this method used to be named
497             C, and still available via that alias.
498              
499             Returns a L that will become done when exactly the given bytes have
500             been written to the given filehandle. On any error (other than C /
501             C which are ignored) the future fails with a suitable error
502             message.
503              
504             This may make more than one C call.
505              
506             =cut
507              
508             sub write_exactly
509             {
510 3     3 1 242617 shift;
511 3         69 my ( $fh, $bytes ) = @_;
512              
513 3   100     117 $IMPL //= "Future::IO::_DefaultImpl";
514              
515 3 50       348 if( my $code = $IMPL->can( "syswrite_exactly" ) ) {
516 0         0 return $IMPL->$code( $fh, $bytes );
517             }
518              
519 3         102 return _write_from_buffer( $IMPL, $fh, \$bytes, length $bytes );
520             }
521              
522             *syswrite_exactly = \&write_exactly;
523              
524             sub _write_from_buffer
525             {
526 5     5   39 my ( $IMPL, $fh, $bufref, $len ) = @_;
527              
528             $IMPL->syswrite( $fh, substr $$bufref, 0, $MAX_WRITELEN )->then( sub {
529 5     5   371 my ( $written_len ) = @_;
530 5         17 substr $$bufref, 0, $written_len, "";
531              
532 5 100       56 return Future->done( $len ) if !length $$bufref;
533 2         7 return _write_from_buffer( $IMPL, $fh, $bufref, $len );
534 5         237 });
535             }
536              
537             =head2 override_impl
538              
539             Future::IO->override_impl( $impl );
540              
541             Sets a new implementation for C, replacing the minimal default
542             internal implementation. This can either be a package name or an object
543             instance reference, but must provide the methods named above.
544              
545             This method is intended to be called by event loops and other similar places,
546             to provide a better integration. Another way, which doesn't involve directly
547             depending on C or loading it, is to use the C<$IMPL> variable; see
548             below.
549              
550             Can only be called once, and only if the default implementation is not in use,
551             therefore a module that wishes to override this ought to invoke it as soon as
552             possible on program startup, before any of the main C methods may
553             have been called.
554              
555             =cut
556              
557             my $overridden;
558              
559             sub override_impl
560             {
561 1     1 1 197163 shift;
562 1 50       6 croak "Future::IO implementation is already overridden" if defined $IMPL;
563 1 50 33     10 croak "Future::IO implementation cannot be set once default is already in use"
564             if @pollers or @alarms;
565              
566 1         25 ( $IMPL ) = @_;
567             }
568              
569             sub try_load_impl
570             {
571 0     0 0 0 shift;
572 0         0 my ( $name ) = @_;
573              
574 0 0       0 $name =~ m/::/ or $name = "Future::IO::Impl::$name";
575 0         0 my $module = "$name.pm" =~ s{::}{/}gr;
576              
577 0 0       0 eval { require $module } or return 0;
  0         0  
578 0 0       0 $name->can( "poll" ) or return 0;
579             # TODO: Consider some sort of API version check
580              
581 0         0 return 1;
582             }
583              
584             =head2 load_impl
585              
586             Future::IO->load_impl( @names );
587              
588             I
589              
590             Given a list of possible implementation module names, iterates through them
591             attempting to load each one until a suitable module is found. Any errors
592             encountered while loading each are ignored. If no module is found to be
593             suitable, an exception is thrown that likely aborts the program.
594              
595             C<@names> should contain a list of Perl module names (which likely live in the
596             C prefix). If any name does not contain a C<::>
597             separator, it will have that prefix applied to it. This allows a conveniently
598             short list; e.g.
599              
600             Future::IO->load_impl( qw( UV Glib Ppoll ) );
601              
602             This method is intended to be called once, at startup, by the main containing
603             program. Since it sets the implementation, it would generally be considered
604             inappropriate to invoke this method from some additional module that might be
605             loaded by a containing program.
606              
607             This is now discouraged, in favour of letting C decide instead by
608             using L.
609              
610             =cut
611              
612             sub load_impl
613             {
614 0     0 1 0 shift;
615              
616 0         0 foreach ( @_ ) {
617 0 0       0 Future::IO->try_load_impl( $_ ) and return;
618             }
619 0         0 die "Unable to find a usable Future::IO::Impl subclass\n";
620             }
621              
622             =head2 load_best_impl
623              
624             Future::IO->load_best_impl();
625              
626             I
627              
628             Attempt to work out and load an implementation module.
629              
630             In most situations, most programs don't really care what specific
631             implementation module they use, if they aren't already committed to some other
632             event system and also using C alongside it. This method allows
633             programs to offload the decision-making about which specific implementations
634             to try to load, to C itself.
635              
636             This method works by attempting a few different strategies to determine the
637             "best" implementation to use. It maintains a list of the currently-known CPAN
638             modules which provide implementations, and attempts them in a given preference
639             order.
640              
641             The environment variable C offers further control of the
642             behaviour of this method. Its value should be a comma-separated list of
643             implementation names to be attempted, in preference to any of the others.
644             Names prefixed with a hyphen will be skipped entirely by any attempt.
645              
646             =over 4
647              
648             =item 1.
649              
650             First, if C is set, any of the names given are tried, in
651             order.
652              
653             =item 2.
654              
655             Then any of the modules that attempt to wrap other event systems such as L
656             or L are attempted if it is detected that the other event system is
657             already loaded.
658              
659             =item 3.
660              
661             If none of these were successful, next it attempts any OS-specific modules
662             based on the OS name (given by C<$^O>).
663              
664             =item 4.
665              
666             Finally, a list of other generic modules is attempted, which also includes
667             any of the wrapper implementations that can be started independently.
668              
669             =back
670              
671             For more details, consult the implementation code in this module to find the
672             current list of known modules and the order they are attempted in.
673              
674             =cut
675              
676             # Try to account for every Future::IO::Impl::* module on CPAN
677             #
678             # Purposely omitting:
679             # Future::IO::Impl::Tickit - requires a $tickit instance to work
680              
681             my @IMPLS_WRAPPER = (
682             "UV",
683             "Glib",
684             [ IOAsync => "IO::Async::Loop" ],
685             "POE",
686             "AnyEvent",
687             );
688              
689             my %IMPLS_FOR_OS = (
690             darwin => [qw( KQueue )],
691             freebsd => [qw( KQueue )], # TODO and probably other BSDs
692             linux => [qw( Uring )],
693             openbsd => [qw( KQueue )],
694             # TODO: other OSes?
695             );
696              
697             my @IMPLS_GENERIC = (qw(
698             Ppoll
699             UV
700             Glib
701             ));
702              
703             sub load_best_impl
704             {
705 0     0 1 0 shift;
706              
707 0         0 my @prefer;
708             my %veto;
709              
710 0   0     0 foreach ( split m/,/, $ENV{PERL_FUTURE_IO_IMPL} // "" ) {
711 0 0       0 if( s/^-// ) {
712 0         0 $veto{$_} = 1;
713             }
714             else {
715 0         0 push @prefer, $_;
716             }
717             }
718              
719 0         0 foreach my $impl ( @prefer ) {
720 0 0       0 $veto{$impl} and next;
721 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
722             }
723              
724             # First, load a wrapper impl if the wrapped system is already loaded
725 0         0 foreach ( @IMPLS_WRAPPER ) {
726 0 0       0 my ( $impl, $package ) = ref $_ ? @$_ : ( $_, $_ );
727 0 0       0 $veto{$impl} and next;
728 0 0       0 eval { $package->VERSION(0) } or next;
  0         0  
729              
730 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
731             }
732              
733             # OK, maybe we can find a good impl for this particular OS
734 0 0       0 foreach my $impl ( @{ $IMPLS_FOR_OS{$^O} || [] } ) {
  0         0  
735 0 0       0 $veto{$impl} and next;
736 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
737             }
738              
739             # Failing all of that, try the generic ones
740 0         0 foreach my $impl ( @IMPLS_GENERIC ) {
741 0 0       0 $veto{$impl} and next;
742 0 0       0 Future::IO->try_load_impl( $impl ) and return 1;
743             }
744              
745 0         0 die "Unable to find a usable Future::IO::Impl subclass\n";
746             }
747              
748             =head2 HAVE_MULTIPLE_FILEHANDLES
749              
750             $has = Future::IO->HAVE_MULTIPLE_FILEHANDLES;
751              
752             I
753              
754             Returns true if the underlying IO implementation actually supports multiple
755             filehandles. The default minimal internal implementation used not to support
756             this, but I it now does; so this method always returns
757             true.
758              
759             =cut
760              
761             sub HAVE_MULTIPLE_FILEHANDLES
762             {
763 0   0 0 1 0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->HAVE_MULTIPLE_FILEHANDLES;
764             }
765              
766             package
767             Future::IO::_DefaultImpl;
768 22     22   173 use base qw( Future::IO::ImplBase );
  22         32  
  22         11839  
769 22     22   130 use Carp;
  22         29  
  22         1407  
770              
771 22     22   135 use IO::Poll qw( POLLIN POLLOUT POLLPRI );
  22         60  
  22         1127  
772 22     22   514 use Struct::Dumb qw( readonly_struct );
  22         7056  
  22         107  
773 22     22   1293 use Time::HiRes qw( time );
  22         44  
  22         92  
774              
775             readonly_struct Poller => [qw( fh events f )];
776             readonly_struct Alarm => [qw( time f )];
777              
778 22     22   2024 use constant HAVE_MULTIPLE_FILEHANDLES => 1;
  22         35  
  22         9315  
779              
780             sub alarm
781             {
782 1     1   2 my $class = shift;
783 1         4 return $class->_done_at( shift );
784             }
785              
786             sub sleep
787             {
788 7     7   12 my $class = shift;
789 7         29 return $class->_done_at( time() + shift );
790             }
791              
792             sub poll
793             {
794 167     167   230 my $class = shift;
795 167         254 my ( $fh, $events ) = @_;
796              
797 167         687 my $f = Future::IO::_DefaultImpl::F->new;
798              
799 167         1433 push @pollers, Poller( $fh, $events, $f );
800              
801             $f->on_cancel( sub {
802 10     10   1444 my $f = shift;
803              
804 10         20 my $idx = 0;
805 10   33     364 $idx++ while $idx < @pollers and $pollers[$idx]->f != $f;
806              
807 10         168 splice @pollers, $idx, 1, ();
808 167         9071 });
809              
810 167         3578 return $f;
811             }
812              
813             sub waitpid
814             {
815 0     0   0 croak "This implementation cannot handle waitpid";
816             }
817              
818             sub _done_at
819             {
820 8     8   9 shift;
821 8         13 my ( $time ) = @_;
822              
823 8         32 my $f = Future::IO::_DefaultImpl::F->new;
824              
825             # TODO: Binary search
826 8         80 my $idx = 0;
827 8   66     57 $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
828              
829 8         38 splice @alarms, $idx, 0, Alarm( $time, $f );
830              
831             $f->on_cancel( sub {
832 2     2   133 my $self = shift;
833              
834 2         5 my $idx = 0;
835 2   33     55 $idx++ while $idx < @alarms and $alarms[$idx]->f != $f;
836              
837 2         34 splice @alarms, $idx, 1, ();
838 8         465 } );
839              
840 8         186 return $f;
841             }
842              
843             package # hide
844             Future::IO::_DefaultImpl::F;
845 22     22   141 use base qw( Future );
  22         33  
  22         13755  
846 22     22   256674 use IO::Poll qw( POLLIN POLLOUT POLLPRI );
  22         34  
  22         1245  
847 22     22   100 use Time::HiRes qw( time );
  22         33  
  22         133  
848              
849             sub _await_once
850             {
851 159 50 66 159   1095 die "Cowardly refusing to sit idle and do nothing" unless @pollers || @alarms;
852              
853 159         258 my $rvec = '';
854 159         215 my $wvec = '';
855 159         206 my $evec = '';
856              
857 159         261 foreach my $p ( @pollers ) {
858 161         3512 my $fileno = $p->fh->fileno;
859              
860 161 100       117698 vec( $rvec, $fileno, 1 ) = 1 if $p->events & POLLIN;
861 161 100       3391 vec( $wvec, $fileno, 1 ) = 1 if $p->events & POLLOUT;
862 161 100       2921 vec( $evec, $fileno, 1 ) = 1 if $p->events & POLLPRI;
863             }
864              
865             # If we always select() then problematic platforms like MSWin32 would
866             # always break. Instead, we'll only select() if we're waiting on alarms, or
867             # both POLLIN and POLLOUT, or POLLPRI. If not we'll just presume the one
868             # operation we're waiting for is definitely ready right now.
869 159   100     1709 my $do_select = @alarms ||
870             ( $rvec ne '' and $wvec ne '' ) ||
871             ( $evec ne '' );
872              
873 159 100       322 if( $do_select ) {
874 13         43 my $maxwait;
875 13 100       240 $maxwait = $alarms[0]->time - time() if @alarms;
876              
877 13         1214806 my $ret = select( $rvec, $wvec, $evec, $maxwait );
878             }
879             # else just presume it's ready
880              
881             # Perl doesn't have an easy construction for iterating an array possibly
882             # splicing as you go...
883              
884 159         281 my $npollers = @pollers;
885             # Need to stop after the current set; ignoring any new that were pushed
886             # while running
887 159         487 for ( my $idx = 0; $idx < $npollers; ) {
888 161         230 my $p = $pollers[$idx];
889              
890 161         2468 my $fh = $p->fh;
891 161         1074 my $fileno = $fh->fileno;
892              
893 161         104392 my $was_blocking;
894 161 100       1073 $was_blocking = $fh->blocking(1) if !$do_select;
895              
896 161         104736 my $revents = 0;
897 161 100       552 $revents |= POLLIN if vec( $rvec, $fileno, 1 );
898 161 100       430 $revents |= POLLOUT if vec( $wvec, $fileno, 1 );
899 161 50       349 $revents |= POLLPRI if vec( $evec, $fileno, 1 );
900 161         3786 $revents &= $p->events;
901              
902 161 100       849 if( $revents ) {
903 155         274 splice @pollers, $idx, 1, ();
904 155         179 $npollers--;
905              
906 155         2456 $p->f->done( $revents );
907             }
908             else {
909 6         13 $idx++;
910             }
911              
912 161 100 100     6447 $fh->blocking(0) if !$do_select and !$was_blocking;
913             }
914              
915 159         482 my $now = time();
916 159   100     982 while( @alarms and $alarms[0]->time <= $now ) {
917 6         197 ( shift @alarms )->f->done;
918             }
919             }
920              
921             =head2 await
922              
923             $f = $f->await;
924              
925             I
926              
927             Blocks until this future is ready (either by success or failure). Does not
928             throw an exception if failed.
929              
930             =cut
931              
932             sub await
933             {
934 50     50   6829 my $self = shift;
935 50         189 _await_once until $self->is_ready;
936 50         762 return $self;
937             }
938              
939             =head1 THE C<$IMPL> VARIABLE
940              
941             I
942              
943             As an alternative to setting an implementation by using L, a
944             package variable is also available that allows modules such as event systems
945             to opportunistically provide an implementation without needing to depend on
946             the module, or loading it C. Simply directly set that package
947             variable to the name of an implementing package or an object instance.
948              
949             Additionally, implementors may use a name within the C
950             namespace, suffixed by the name of their event system.
951              
952             For example, something like the following code arrangement is recommended.
953              
954             package Future::IO::Impl::BananaLoop;
955              
956             {
957             no warnings 'once';
958             ( $Future::IO::IMPL //= __PACKAGE__ ) eq __PACKAGE__ or
959             warn "Unable to set Future::IO implementation to " . __PACKAGE__ .
960             " as it is already $Future::IO::IMPL\n";
961             }
962              
963             sub poll
964             {
965             ...
966             }
967              
968             sub sleep
969             {
970             ...
971             }
972              
973             sub sysread
974             {
975             ...
976             }
977              
978             sub syswrite
979             {
980             ...
981             }
982              
983             sub waitpid
984             {
985             ...
986             }
987              
988             Optionally, you can also implement L and
989             L:
990              
991             sub sysread_exactly
992             {
993             ...
994             }
995              
996             sub syswrite_exactly
997             {
998             ...
999             }
1000              
1001             If not, they will be emulated by C itself, making multiple calls
1002             to the non-C<_exactly> versions.
1003              
1004             =head1 AUTHOR
1005              
1006             Paul Evans
1007              
1008             =cut
1009              
1010             0x55AA;