File Coverage

blib/lib/IO/Async/Loop/FutureIO.pm
Criterion Covered Total %
statement 95 102 93.1
branch 31 44 70.4
condition 9 13 69.2
subroutine 24 27 88.8
pod 12 12 100.0
total 171 198 86.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2026 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Loop::FutureIO 0.02;
7              
8 7     7   4206960 use v5.14;
  7         41  
9 7     7   39 use warnings;
  7         27  
  7         401  
10              
11 7     7   41 use constant API_VERSION => '0.76';
  7         14  
  7         522  
12              
13             # We cannot support ->watch_process( 0 => ... ) because Future::IO does not
14             # give us a hook
15 7     7   36 use constant _CAN_WATCH_ALL_PIDS => 0;
  7         14  
  7         403  
16              
17 7     7   43 use base qw( IO::Async::Loop );
  7         14  
  7         6011  
18             IO::Async::Loop->VERSION( '0.49' );
19              
20 7     7   140842 use Carp;
  7         19  
  7         557  
21              
22 7     7   687 use Future::IO 0.19;
  7         53891  
  7         413  
23 7     7   48 use Future::IO 0.19 qw( POLLIN POLLOUT );
  7         113  
  7         9629  
24              
25             =head1 NAME
26              
27             C<IO::Async::Loop::FutureIO> - use C<IO::Async> with C<Future::IO>
28              
29             =head1 SYNOPSIS
30              
31             =for highlighter language=perl
32              
33             use IO::Async::Loop::FutureIO;
34              
35             use Future::IO;
36             Future::IO->load_best_impl;
37              
38             =head1 DESCRIPTION
39              
40             This subclass of L<IO::Async::Loop> uses L<Future::IO> to perform its work.
41              
42             Currently there are a few features that don't yet work, due to missing support
43             from C<Future::IO> itself. Hopefully a later version of C<Future::IO> will be
44             able to provide these missing pieces, and then this module will be shipped by
45             default in the main C<IO::Async> distribution.
46              
47             =head2 Missing Features
48              
49             Currently the following things do not work with this module:
50              
51             =over 4
52              
53             =item Signals
54              
55             The C<watch_signal> and C<unwatch_signal> methods are not currently
56             implemented, because C<Future::IO> does not support a general purpose signal
57             wait ability. Once this is available, these methods can be added.
58              
59             =item Watching PID 0
60              
61             Likewise, as C<Future::IO> only supports watching specific PIDs and not a
62             repeating wait for any process, this is not permitted here.
63              
64             =item C<is_running>
65              
66             The C<is_running> method cannot reliably answer whether C<Future::IO> itself
67             is currently blocked awaiting IO, so it is also not provided.
68              
69             =item Metrics
70              
71             This module would not be able to provide metrics on the overall operation of
72             C<Future::IO>, so it is not provided.
73              
74             =back
75              
76             =cut
77              
78             sub new
79             {
80 6     6 1 153 my $class = shift;
81 6         48 my $self = $class->SUPER::__new( @_ );
82              
83 6 50       2458 $Future::IO::IMPL eq "Future::IO::Impl::IOAsync" and
84             croak "Cannot mutually build Future::IO and IO::Async on each other";
85              
86 6         23 return $self;
87             }
88              
89             sub _more_f
90             {
91 60     60   13075 my $self = shift;
92 60         160 my ( $f ) = @_;
93              
94 60   66     383 my $loop_f = ( $self->{next_loop_f} //= $f->new );
95              
96             $f->on_ready( sub {
97 60 100   60   2418120 $loop_f->done unless $loop_f->is_ready;
98 60         957 });
99              
100 60         5376 return $f;
101             }
102              
103             sub loop_once
104             {
105 36     36 1 7364 my $self = shift;
106 36         94 my ( $timeout ) = @_;
107              
108 36         100 my $timeout_f;
109 36 100       214 if( defined $timeout ) {
110 35         175 $timeout_f = $self->_more_f( Future::IO->sleep( $timeout ) );
111             }
112              
113 36 50       170 if( my $f = $self->{next_loop_f} ) {
114 36         129 undef $self->{next_loop_f};
115              
116 36         189 $f->await;
117             }
118             else {
119 0         0 die "TODO: loop_once without next_loop_f";
120             }
121              
122 36 100       2985 $timeout_f->cancel if $timeout_f;
123             }
124              
125             sub _poll_for_io
126             {
127 2449     2449   7057 my ( $watch, $handle, $mask, $cb ) = @_;
128              
129 2449         4615 my $key = "${mask}_f";
130              
131             $watch->{$key} = Future::IO->poll( $handle, $mask )
132             ->on_done( sub {
133 2435     2435   1149977 $cb->();
134              
135             _poll_for_io( $watch, $handle, $mask, $cb )
136 2435 100       13217 if exists $watch->{$key};
137 2449         20931 } );
138             }
139              
140             sub watch_io
141             {
142 14     14 1 38002 my $self = shift;
143 14         96 my %params = @_;
144              
145 14 50       57 my $handle = $params{handle} or die "Need a handle";
146 14   100     95 my $watch = $self->{watch_io}{$handle} //= {};
147              
148 14 100       51 if( my $cb = $params{on_read_ready} ) {
149 9         26 _poll_for_io( $watch, $handle, POLLIN, $cb );
150             }
151 14 100       1903 if( my $cb = $params{on_write_ready} ) {
152 7         21 _poll_for_io( $watch, $handle, POLLOUT, $cb );
153             }
154             }
155              
156             sub unwatch_io
157             {
158 11     11 1 6008 my $self = shift;
159 11         46 my %params = @_;
160              
161 11 50       38 my $handle = $params{handle} or die "Need a handle";
162 11 50       53 my $watch = $self->{watch_io}{$handle} or return;
163              
164 11 100       38 if( $params{on_read_ready} ) {
165 8         14 my $mask = POLLIN;
166 8         40 ( delete $watch->{"${mask}_f"} )->cancel;
167             }
168 11 100       325 if( $params{on_write_ready} ) {
169 7         14 my $mask = POLLOUT;
170 7         27 ( delete $watch->{"${mask}_f"} )->cancel;
171             }
172              
173             keys %$watch or
174 11 100       298 delete $self->{watch_io}{$handle};
175             }
176              
177             sub watch_time
178             {
179 14     14 1 14144 my $self = shift;
180 14         268 my %params = @_;
181              
182 14 50       93 my $code = $params{code} or croak "Expected 'code' as CODE ref";
183 14   33     285 my $now = $params{now} // $self->time;
184 14   66     223 my $delay = $params{after} // ( $params{at} - $now );
185              
186 14 100       172 $delay = 0 if $delay < 0;
187              
188             return $self->_more_f(
189             Future::IO->sleep( $delay )
190 6     6   7106837 ->on_done( sub { $code->() } )
191 14         137 );
192             }
193              
194             sub unwatch_time
195             {
196 9     9 1 818 my $self = shift;
197 9         44 my ( $timer ) = @_;
198              
199 9         37 $timer->cancel;
200             }
201              
202             sub watch_idle
203             {
204 6     6 1 5270 my $self = shift;
205 6         25 my %params = @_;
206              
207 6 50       22 my $when = delete $params{when} or croak "Expected 'when'";
208              
209 6 50       20 my $code = delete $params{code} or croak "Expected 'code' as a CODE ref";
210              
211 6 50       15 $when eq "later" or croak "Expected 'when' to be 'later'";
212              
213             # Just treat it as a zero timer
214             return $self->_more_f(
215             Future::IO->sleep( 0 )
216 5     5   955 ->on_done( sub { $code->() } )
217 6         33 );
218             }
219              
220             sub unwatch_idle
221             {
222 1     1 1 5 my $self = shift;
223 1         2 my ( $idle ) = @_;
224              
225 1         4 $idle->cancel;
226             }
227              
228             sub watch_signal
229             {
230 0     0 1 0 croak "Future::IO does not currently support signal watches";
231             }
232              
233             sub unwatch_signal
234             {
235 0     0 1 0 croak "Future::IO does not currently support signal watches";
236             }
237              
238             sub watch_process
239             {
240 5     5 1 30771 my $self = shift;
241 5         97 my ( $pid, $code ) = @_;
242              
243 5 50       161 defined $pid or croak "Require a PID for ->watch_process";
244 5 50       217 $pid or croak "Require a PID for ->watch_process (cannot watch for all processes by PID=0)";
245              
246 5   100     308 my $waitpids = $self->{waitpids} //= {};
247              
248             $waitpids->{$pid} = $self->_more_f(
249             Future::IO->waitpid( $pid )
250             ->on_done( sub {
251 5     5   1122422 delete $waitpids->{$pid};
252 5         35 $code->( $pid, $_[0] );
253             })
254 5         350 );
255             }
256              
257             sub unwatch_process
258             {
259 0     0 1   my $self = shift;
260 0           my ( $pid ) = @_;
261              
262 0           my $f = delete $self->{waitpids}{$pid};
263 0 0         $f->cancel if $f;
264             }
265              
266             =head1 AUTHOR
267              
268             Paul Evans
269              
270             =cut
271              
272             0x55AA;