File Coverage

blib/lib/Future/Selector.pm
Criterion Covered Total %
statement 93 94 98.9
branch 22 24 91.6
condition 15 21 71.4
subroutine 18 18 100.0
pod 4 4 100.0
total 152 161 94.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2023-2025 -- leonerd@leonerd.org.uk
5              
6 6     6   1578167 use v5.26;
  6         26  
7 6     6   44 use warnings;
  6         41  
  6         425  
8              
9 6     6   4657 use Object::Pad 0.800;
  6         91305  
  6         545  
10 6     6   4770 use Future::AsyncAwait 0.44 ':experimental(cancel)';
  6         42338  
  6         44  
11 6     6   4062 use Sublike::Extended 0.29 'method';
  6         5693  
  6         36  
12              
13             class Future::Selector 0.05;
14              
15 6     6   2572 use Carp;
  6         15  
  6         530  
16 6     6   45 use Scalar::Util qw( refaddr );
  6         13  
  6         1238  
17              
18             =head1 NAME
19              
20             C - manage a collection of pending futures
21              
22             =head1 SYNOPSIS
23              
24             use Future::AsyncAwait;
25             use Future::IO;
26             use Future::Selector;
27             use IO::Socket::IP;
28              
29             my $selector = Future::Selector->new;
30              
31             my $listensock = IO::Socket::IP->new(
32             LocalHost => "::1",
33             LocalPort => "8191",
34             Listen => 1,
35             );
36              
37             $selector->add(
38             data => "listener",
39             gen => sub { Future::IO->accept( $listensock ) },
40             );
41              
42             while(1) {
43             my @ready = await $selector->select;
44              
45             ...
46             }
47              
48             =head1 DESCRIPTION
49              
50             Objects in this class maintain a collection of pending L instances,
51             and manage the lifecycle of waiting for their eventual completion. This
52             provides a central structure for writing asynchronous event-driven programs
53             using L and L-based logic.
54              
55             When writing an asynchronous C-based client, often the program can be
56             structured similar to a straight-line synchronous program, where at any point
57             the client is just waiting on sending or receiving one particular message or
58             data-flow. It therefore suffices to use a simple call/response structure,
59             perhaps written using the C and C keywords provided by
60             L.
61              
62             In contrast, a server program often has many things happening at once. It will
63             be handling multiple clients simultaneously, as well as waiting for new client
64             connections and any other internal logic it requires to provide data to those
65             clients. There is not just one obvious pending future at any one time; there
66             could be several that all need to be monitored for success or failure.
67              
68             A C instance helps this situation, by storing an entire set
69             of pending futures that represent individual sub-divisions of the work of the
70             program (or a part of it). As each completes, the selector instance informs
71             the containing code so it can continue to perform the work required to handle
72             that part, perhaps resulting in more future instances for the selector to
73             manage.
74              
75             =head2 Program Structure
76              
77             As per the SYNOPSIS example, a typical server-style program would probably be
78             structured around a C loop that repeatedly Cs on the next
79             C
80             The data values stored with each future and returned by the C
81             can be used to help direct the program into working out what is going on. For
82             example, string names or object instances could help identify different kinds
83             of next step.
84              
85             use v5.36;
86              
87             ...
88              
89             $selector->add(
90             data => "listener",
91             gen => sub { Future::IO->accept( $listensock ) },
92             );
93              
94             while(1) {
95             foreach my ( $data, $f ) ( await $selector->select ) {
96             if( $data eq "listener" ) {
97             # a new client has been accept()ed. should now set up handling
98             # for it in some manner.
99              
100             my $sock = await $f;
101             my $clientconn = ClientConnection->new( fh => $sock );
102              
103             $selector->add( data => $clientconn, f => $clientconn->run );
104             }
105             elsif( $data isa ClientConnection ) {
106             # an existing connection's runloop has terminated. should now
107             # handle that in whatever way is appropriate
108             ...
109             }
110             ...
111             }
112             }
113              
114             Alternatively, if each stored future instance already performed all of the
115             work required to handle it before it yields success, there may be nothing for
116             the toplevel application loop to do other than repeatedly wait for things to
117             happen.
118              
119             $selector->add(
120             data => undef, # ignored
121             gen => async sub {
122             my $sock = await Future::IO->accept( $listensock );
123             my $clientconn = ClientConnection->new( fh => $sock );
124              
125             $selector->add( data => undef, f => $clientconn->run );
126             }
127             );
128              
129             await $selector->select while 1;
130              
131             Failure propagation by the C
132             encountered by individual component futures are still passed upwards through
133             the program structure, ultimately ending at the toplevel if nothing else
134             catches it first.
135              
136             =head2 Comparison With C, C, etc..
137              
138             In some ways, the operation of this class is similar to system calls like
139             C and C. However, there are several key differences:
140              
141             =over 4
142              
143             =item *
144              
145             C stores high-level futures, rather than operating directly
146             on system-level filehandles. As such, it can wait for application-level
147             events and workflow when those things are represented by futures.
148              
149             =item *
150              
151             The main waiting call, L, is a method that returns a future. This
152             could be returned from some module or component of a program, to be awaited on
153             by another outer C instance. The application is not
154             limited to exactly one as would be the case for blocking system calls, but can
155             instead create a hierarchical structure out of as many instances as are
156             required.
157              
158             =item *
159              
160             Once added, a given future remains a member of a C instance
161             until it eventually completes; which may require many calls to the C
162             method (or indeed, it may never complete during the lifetime of the program,
163             for tasks that should keep pending throughout). In this way, the object is
164             more comparable to persistent system-level schedulers like Linux's C or
165             BSD's C mechanisms, than the one-shot nature of C or
166             C themselves.
167              
168             =back
169              
170             =cut
171              
172             class Future::Selector::_Item {
173 30     30   79 field $data :param :reader;
  30         86  
174 134     134   386 field $f :param :mutator;
  134         440  
175 55     55   115 field $gen :param :reader;
  55         185  
176             }
177              
178             field %items; # keyed by refaddr
179              
180             =head1 METHODS
181              
182             =cut
183              
184             field $next_waitf;
185             field @next_ready;
186             field $next_failure;
187             field @items_needing_regen;
188              
189 37     37   73 method _item_is_ready ( $item )
  37         158  
  37         65  
  37         53  
190             {
191 37         114 my $f = $item->f;
192              
193 37         120 delete $items{ refaddr $item };
194              
195 37 100       105 if( $item->gen ) {
196 23         52 push @items_needing_regen, $item;
197             }
198              
199 37 100       137 return if $f->is_cancelled;
200              
201 34 100       225 if( $next_waitf ) {
202 12 100       57 if( $f->is_failed ) {
203 1         39 $f->on_fail( $next_waitf ); # copy the failure
204             }
205             else {
206 11         126 $next_waitf->done( $item->data, $item->f );
207             }
208             }
209             else {
210 22 100       60 if( $f->is_failed ) {
211 3   33     38 $next_failure //= $f;
212             }
213             else {
214 19         134 push @next_ready, $item->data, $item->f;
215             }
216             }
217             }
218              
219             =head2 add
220              
221             $selector->add( data => $data, f => $f );
222              
223             Adds a new future to the collection.
224              
225             After the future becomes ready, the currently-pending C
226             next one to be created) will complete. It will yield the given data and future
227             instance if this future succeeded, or fail with the same failure if this
228             future failed. At that point it will be removed from the stored collection.
229             If the item future was cancelled, it is removed from the collection but
230             otherwise ignored; the C
231             result.
232              
233             $selector->add( data => $data, gen => $gen );
234              
235             $f = $gen->();
236              
237             Adds a new generator of futures to the collection.
238              
239             The generator is a code reference which is used to generate a future, which is
240             then added to the collection similar to the above case. Each time the future
241             becomes ready, the generator is called again to create another future to
242             continue watching. This continues until the generator returns C.
243              
244             =cut
245              
246 21     21 1 397 method add ( :$data, :$f = undef, :$gen = undef )
  21         78  
  21         111  
  21         67  
247             {
248 21 100 66     183 if( $gen and !$f ) {
    50          
249             # TODO: Consider if we should do this immediately at all?
250 5         15 $f = $gen->();
251             }
252             elsif( !$f ) {
253 0         0 croak "Require 'f' or 'gen'";
254             }
255              
256 21         397 my $item = Future::Selector::_Item->new(
257             data => $data,
258             f => $f,
259             gen => $gen,
260             );
261 21         69 $items{ refaddr $item } = $item;
262              
263 21     19   177 $f->on_ready( sub { $self->_item_is_ready( $item ) } );
  19         43895  
264              
265 21         515 return;
266             }
267              
268             =head2 select
269              
270             ( $data1, $f1, $data2, $f2, ... ) = await $selector->select();
271              
272             Returns a future that will become ready when at least one of the stored
273             futures is ready. It will yield an even-sized list of pairs, giving the
274             associated data and original (now-completed) futures that were stored.
275              
276             If you are intending to run the loop indefinitely, be careful not to write
277             code such as
278              
279             1 while await $selector->select;
280              
281             because in scalar context, the Ced future will yield its first value,
282             which will be the data associated with the first completed future. If that
283             data value was false (such as C) then the loop will stop running at
284             that point. Generally in these sorts of situations you want to use L or
285             L.
286              
287             =cut
288              
289 34     34 1 9752 method select ()
  34         131  
  34         53  
290             {
291 34   100     121 my $wait_f = $next_waitf // do {
292 33 100       145 if( my @i = @items_needing_regen ) {
293 18         46 undef @items_needing_regen;
294              
295 18         40 foreach my $item ( @i ) {
296 18 50       41 my $f = $item->gen->() or next;
297              
298 18     18   545 $f->on_ready( sub { $self->_item_is_ready( $item ) } );
  18         952  
299 18         180 $item->f = $f;
300 18         80 $items{ refaddr $item } = $item;
301             }
302             }
303              
304 33 100 100     817 keys %items or @next_ready or $next_failure or
      66        
305             croak "$self cowardly refuses to sit idle and do nothing";
306              
307 30   66     138 $_->f->is_ready or $next_waitf = $_->f->new, last for values %items;
308 30   66     272 $next_waitf //= Future->new;
309              
310 30         238 $next_waitf->set_label( "Future::Selector next_waitf" );
311              
312 30 100       277 if( $next_failure ) {
    100          
313 3         13 $next_failure->on_fail( $next_waitf ); # copy the failure
314 3         193 undef $next_failure;
315             }
316             elsif( @next_ready ) {
317 14         51 $next_waitf->done( @next_ready );
318 14         578 undef @next_ready;
319             }
320              
321 30     29   195 $next_waitf->on_ready( sub { undef $next_waitf } );
  29         1030  
322             };
323              
324             # We need to ensure that overlapping calls to ->select can't accidentally
325             # cancel each other.
326             # A simple call to ->without_cancel doesn't quite work as it causes
327             # sequence futures to be lost.
328              
329 31         510 my $ret_f = $wait_f->new;
330 31         268 $wait_f->on_done( $ret_f )
331             ->on_fail( $ret_f );
332             # nothing about cancel of $ret_f here. technically if we don't tidy up the
333             # on_done/on_fail above these will retain $ret_f longer than necessary, but
334             # there's no API to do that currently. Hopefully $wait_f will get cycled
335             # and replaced soon enough anyway and that will all go then.
336 31         1873 return $ret_f;
337             }
338              
339             =head2 run
340              
341             await $selector->run();
342              
343             I
344              
345             Returns a future that represents repeatedly calling the L method
346             indefinitely. This will not return, except that if any of the contained
347             futures fails then this will fail the same way.
348              
349             This is most typically used at the toplevel of a server-type program, one
350             where there is no normal exit condition and the program is expected to remain
351             running unless some fatal error happens.
352              
353             =cut
354              
355 2     2 1 10 async method run ()
  2         5  
  2         5  
356 2         6 {
357 2         9 await $self->select while 1;
358             }
359              
360             =head2 run_until_ready
361              
362             @result = await $selector->run_until_ready( $f );
363              
364             I
365              
366             Returns a future that represents repeatedly calling the L method
367             until the given future instance is ready. When it becomes ready (either by
368             success or failure) the returned future will yield the same result. If the
369             returned future is cancelled, then C<$f> itself will be cancelled too. This
370             will not cancel a concurrently-pending C
371              
372             The given future will be added to the selector by calling this method; you
373             should I call L on it yourself first.
374              
375             This is typically used in client or hybrid code, or as a nested component of
376             a server program, which needs to wait on a result while also performing other
377             background tasks.
378              
379             Remember that since this method itself returns a future, it could easily serve
380             as the input to another outer-level selector instance.
381              
382             =cut
383              
384 3     3 1 544 async method run_until_ready ( $f )
  3         10  
  3         7  
  3         6  
385 3         12 {
386 3         14 $self->add( data => undef, f => $f );
387             CANCEL { $f->cancel; }
388 3         16 await $self->select until $f->is_ready;
  3         12  
389 1         28 return await $f;
390             }
391              
392             =head1 TODO
393              
394             =over 4
395              
396             =item *
397              
398             Convenience ->add_f / ->add_gen
399              
400             =item *
401              
402             Configurable behaviour on component future failure
403              
404             =back
405              
406             =cut
407              
408             =head1 AUTHOR
409              
410             Paul Evans
411              
412             =cut
413              
414             0x55AA;