File Coverage

blib/lib/IO/Async/Function.pm
Criterion Covered Total %
statement 239 250 95.6
branch 83 112 74.1
condition 20 32 62.5
subroutine 47 48 97.9
pod 8 11 72.7
total 397 453 87.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2024 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Function 0.805;
7              
8 10     10   551453 use v5.14;
  10         36  
9 10     10   54 use warnings;
  10         16  
  10         596  
10              
11 10     10   60 use base qw( IO::Async::Notifier );
  10         20  
  10         2385  
12 10     10   8081 use IO::Async::Timer::Countdown;
  10         28  
  10         345  
13              
14 10     10   68 use Carp;
  10         20  
  10         745  
15              
16 10     10   65 use List::Util qw( first );
  10         15  
  10         627  
17              
18 10     10   3355 use Struct::Dumb qw( readonly_struct );
  10         59808  
  10         112  
19              
20             readonly_struct Pending => [qw( priority f )];
21              
22             =head1 NAME
23              
24             C<IO::Async::Function> - call a function asynchronously
25              
26             =head1 SYNOPSIS
27              
28             =for highlighter language=perl
29              
30             use Future::AsyncAwait;
31             use IO::Async::Function;
32              
33             use IO::Async::Loop;
34             my $loop = IO::Async::Loop->new;
35              
36             my $function = IO::Async::Function->new(
37             code => sub {
38             my ( $number ) = @_;
39             return is_prime( $number );
40             },
41             );
42              
43             $loop->add( $function );
44              
45             my $isprime = await $function->call(
46             args => [ 123454321 ],
47             );
48              
49             print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
50              
51             =head1 DESCRIPTION
52              
53             This subclass of L<IO::Async::Notifier> wraps a function body in a collection
54             of worker processes, to allow it to execute independently of the main process.
55             The object acts as a proxy to the function, allowing invocations to be made by
56             passing in arguments, and invoking a continuation in the main process when the
57             function returns.
58              
59             The object represents the function code itself, rather than one specific
60             invocation of it. It can be called multiple times, by the C<call> method.
61             Multiple outstanding invocations can be called; they will be dispatched in
62             the order they were queued. If only one worker process is used then results
63             will be returned in the order they were called. If multiple are used, then
64             each request will be sent in the order called, but timing differences between
65             each worker may mean results are returned in a different order.
66              
67             Since the code block will be called multiple times within the same child
68             process, it must take care not to modify any of its state that might affect
69             subsequent calls. Since it executes in a child process, it cannot make any
70             modifications to the state of the parent program. Therefore, all the data
71             required to perform its task must be represented in the call arguments, and
72             all of the result must be represented in the return values.
73              
74             The Function object is implemented using an L<IO::Async::Routine> with two
75             L<IO::Async::Channel> objects to pass calls into and results out from it.
76              
77             The L<IO::Async> framework generally provides mechanisms for multiplexing IO
78             tasks between different handles, so there aren't many occasions when such an
79             asynchronous function is necessary. Two cases where this does become useful
80             are:
81              
82             =over 4
83              
84             =item 1.
85              
86             When a large amount of computationally-intensive work needs to be performed
87             (for example, the C<is_prime> test in the example in the C<SYNOPSIS>).
88              
89             =item 2.
90              
91             When a blocking OS syscall or library-level function needs to be called, and
92             no nonblocking or asynchronous version is supplied. This is used by
93             L.
94              
95             =back
96              
97             This object is ideal for representing "pure" functions; that is, blocks of
98             code which have no stateful effect on the process, and whose result depends
99             only on the arguments passed in. For a more general co-routine ability, see
100             also L.
101              
102             =cut
103              
104             =head1 PARAMETERS
105              
106             The following named parameters may be passed to C<new> or C<configure>:
107              
108             =head2 code => CODE
109              
110             The body of the function to execute.
111              
112             @result = $code->( @args );
113              
114             =head2 init_code => CODE
115              
116             Optional. If defined, this is invoked exactly once in every child process or
117             thread, after it is created, but before the first invocation of the function
118             body itself.
119              
120             $init_code->();
121              
122             =head2 module => STRING
123              
124             =head2 func => STRING
125              
126             I
127              
128             An alternative to the C<code> argument, which names a module to load and a
129             function to call within it. C<module> should give a perl module name (i.e.
130             C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
131             the basename of a function within that module (i.e. without the module name
132             prefixed). It will be invoked, without extra arguments, as the main code
133             body of the object.
134              
135             The task of loading this module and resolving the resulting function from it
136             is only performed on the remote worker side, so the controlling process will
137             not need to actually load the module.
138              
139             =head2 init_func => STRING or ARRAY [ STRING, ... ]
140              
141             Optional addition to the C<module> and C<func> alternatives. Names a function
142             within the module to call each time a new worker is created.
143              
144             If this value is an array reference, its first element must be a string giving
145             the name of the function; the remaining values are passed to that function as
146             arguments.
147              
148             =head2 model => "fork" | "thread" | "spawn"
149              
150             Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
151             leaves the default choice up to Routine.
152              
153             =head2 min_workers => INT
154              
155             =head2 max_workers => INT
156              
157             The lower and upper bounds of worker processes to try to keep running. The
158             actual number running at any time will be kept somewhere between these bounds
159             according to load.
160              
161             =head2 max_worker_calls => INT
162              
163             Optional. If provided, stop a worker process after it has processed this
164             number of calls. (New workers may be started to replace stopped ones, within
165             the bounds given above).
166              
167             =head2 idle_timeout => NUM
168              
169             Optional. If provided, idle worker processes will be shut down after this
170             amount of time, if there are more than C<min_workers> of them.
171              
172             =head2 exit_on_die => BOOL
173              
174             Optional boolean, controls what happens after the C<code> throws an
175             exception. If missing or false, the worker will continue running to process
176             more requests. If true, the worker will be shut down. A new worker might be
177             constructed by the C<call> method to replace it, if necessary.
178              
179             =head2 setup => ARRAY
180              
181             Optional array reference. Specifies the C<setup> key to pass to the underlying
182             L when setting up new worker processes.
183              
184             =cut
185              
186             sub _init
187             {
188 49     49   378 my $self = shift;
189 49         469 $self->SUPER::_init( @_ );
190              
191 49         233 $self->{min_workers} = 1;
192 49         115 $self->{max_workers} = 8;
193              
194 49         374 $self->{workers} = {}; # {$id} => IaFunction:Worker
195              
196 49         175 $self->{pending_queue} = [];
197             }
198              
199             sub configure
200             {
201 49     49 1 97 my $self = shift;
202 49         196 my %params = @_;
203              
204 49         90 my %worker_params;
205 49         464 foreach (qw( model exit_on_die max_worker_calls )) {
206 147 100       1983 $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
207             }
208              
209 49 100       262 if( keys %worker_params ) {
210 14         65 foreach my $worker ( $self->_worker_objects ) {
211 0         0 $worker->configure( %worker_params );
212             }
213             }
214              
215 49 100       219 if( exists $params{idle_timeout} ) {
216 7         30 my $timeout = delete $params{idle_timeout};
217 7 50       58 if( !$timeout ) {
    50          
218 0 0       0 $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
219             }
220             elsif( my $idle_timer = $self->{idle_timer} ) {
221 0         0 $idle_timer->configure( delay => $timeout );
222             }
223             else {
224             $self->{idle_timer} = IO::Async::Timer::Countdown->new(
225             delay => $timeout,
226             on_expire => $self->_capture_weakself( sub {
227 1 50   1   5 my $self = shift or return;
228 1         5 my $workers = $self->{workers};
229              
230             # Shut down at most one idle worker, starting from the highest
231             # ID. Since we search from lowest to assign work, this tries
232             # to ensure we'll shut down the least useful ones first,
233             # keeping more useful ones in memory (page/cache warmth, etc..)
234 1         6 foreach my $id ( reverse sort keys %$workers ) {
235 1 50       14 next if $workers->{$id}{busy};
236              
237 1         22 $workers->{$id}->stop;
238 1         3 last;
239             }
240              
241             # Still more?
242 1 50       9 $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
243 7         129 } ),
244             );
245 7         87 $self->add_child( $self->{idle_timer} );
246             }
247             }
248              
249 49         557 foreach (qw( min_workers max_workers )) {
250 98 100       288 $self->{$_} = delete $params{$_} if exists $params{$_};
251             # TODO: something about retuning
252             }
253              
254 49         93 my $need_restart;
255              
256 49         109 foreach (qw( init_code code module init_func func setup )) {
257 294 100       688 $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
258             }
259              
260             defined $self->{code} and defined $self->{func} and
261 49 50 66     529 croak "Cannot ->configure both 'code' and 'func'";
262             defined $self->{func} and !defined $self->{module} and
263 49 50 66     264 croak "'func' parameter requires a 'module' as well";
264              
265 49         332 $self->SUPER::configure( %params );
266              
267 49 50 33     577 if( $need_restart and $self->loop ) {
268 0         0 $self->stop;
269 0         0 $self->start;
270             }
271             }
272              
273             sub _add_to_loop
274             {
275 49     49   89 my $self = shift;
276 49         236 $self->SUPER::_add_to_loop( @_ );
277              
278 49         259 $self->start;
279             }
280              
281             sub _remove_from_loop
282             {
283 37     37   68 my $self = shift;
284              
285 37         202 $self->stop;
286              
287 37         253 $self->SUPER::_remove_from_loop( @_ );
288             }
289              
290             =head1 METHODS
291              
292             The following methods documented in C<await> expressions return L<Future>
293             instances.
294              
295             =cut
296              
297             =head2 start
298              
299             $function->start;
300              
301             Start the worker processes
302              
303             =cut
304              
305             sub start
306             {
307 51     51 1 65 my $self = shift;
308              
309 51         290 $self->_new_worker for 1 .. $self->{min_workers};
310             }
311              
312             =head2 stop
313              
314             $function->stop;
315              
316             Stop the worker processes
317              
318             $f = $function->stop;
319              
320             I
321              
322             If called in non-void context, returns a L<Future> instance that
323             will complete once every worker process has stopped and exited. This may be
324             useful for waiting until all of the processes are waited on, or other
325             edge-cases, but is not otherwise particularly useful.
326              
327             =cut
328              
329             sub stop
330             {
331 43     43 1 84 my $self = shift;
332              
333 43         538 $self->{stopping} = 1;
334              
335 43         80 my @f;
336              
337 43         227 foreach my $worker ( $self->_worker_objects ) {
338 38 100       235 defined wantarray ? push @f, $worker->stop : $worker->stop;
339             }
340              
341 43 100       1051 return Future->needs_all( @f ) if defined wantarray;
342             }
343              
344             =head2 restart
345              
346             $function->restart;
347              
348             Gracefully stop and restart all the worker processes.
349              
350             =cut
351              
352             sub restart
353             {
354 2     2 1 1648 my $self = shift;
355              
356 2         13 $self->stop;
357 2         13 $self->start;
358             }
359              
360             =head2 call
361              
362             @result = await $function->call( %params );
363              
364             Schedules an invocation of the contained function to be executed on one of the
365             worker processes. If a non-busy worker is available now, it will be called
366             immediately. If not, it will be queued and sent to the next free worker that
367             becomes available.
368              
369             The request will already have been serialised by the marshaller, so it will be
370             safe to modify any referenced data structures in the arguments after this call
371             returns.
372              
373             The C<%params> hash takes the following keys:
374              
375             =over 8
376              
377             =item args => ARRAY
378              
379             A reference to the array of arguments to pass to the code.
380              
381             =item priority => NUM
382              
383             Optional. Defines the sorting order when no workers are available and calls
384             must be queued for later. A default of zero will apply if not provided.
385              
386             Higher values cause the call to be considered more important, and will be
387             placed earlier in the queue than calls with a smaller value. Calls of equal
388             priority are still handled in FIFO order.
389              
390             =back
391              
392             If the function body returns normally the list of results are provided as the
393             (successful) result of returned future. If the function throws an exception
394             this results in a failed future. In the special case that the exception is in
395             fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
396             for the C<fail> result. If the exception is not such a reference, it is used
397             as the first argument to C<fail>, in the category of C<error>.
398              
399             $f->done( @result );
400              
401             $f->fail( @{ $exception } );
402             $f->fail( $exception, error => );
403              
404             =head2 call (void)
405              
406             $function->call( %params );
407              
408             When not returning a future, the C<on_result>, C<on_return> and C<on_error>
409             arguments give continuations to handle successful results or failure.
410              
411             =over 8
412              
413             =item on_result => CODE
414              
415             A continuation that is invoked when the code has been executed. If the code
416             returned normally, it is called as:
417              
418             $on_result->( 'return', @values )
419              
420             If the code threw an exception, or some other error occurred such as a closed
421             connection or the process died, it is called as:
422              
423             $on_result->( 'error', $exception_name )
424              
425             =item on_return => CODE and on_error => CODE
426              
427             An alternative to C<on_result>. Two continuations to use in either of the
428             circumstances given above. They will be called directly, without the leading
429             'return' or 'error' value.
430              
431             =back
432              
433             =cut
434              
435             sub debug_printf_call
436             {
437 75     75 0 116 my $self = shift;
438 75         625 $self->debug_printf( "CALL" );
439             }
440              
441             sub debug_printf_result
442             {
443 52     52 0 91 my $self = shift;
444 52         239 $self->debug_printf( "RESULT" );
445             }
446              
447             sub debug_printf_failure
448             {
449 21     21 0 40 my $self = shift;
450 21         55 my ( $err ) = @_;
451 21         150 $self->debug_printf( "FAIL $err" );
452             }
453              
454             sub call
455             {
456 88     88 1 8324 my $self = shift;
457 88         1590 my %params = @_;
458              
459             # TODO: possibly just queue this?
460 88 50       340 $self->loop or croak "Cannot ->call on a Function not yet in a Loop";
461              
462 88         199 my $args = delete $params{args};
463 88 50       272 ref $args eq "ARRAY" or croak "Expected 'args' to be an array";
464              
465 88         189 my ( $on_done, $on_fail );
466 88 100 66     911 if( defined $params{on_result} ) {
    100          
    50          
467 2         7 my $on_result = delete $params{on_result};
468 2 50       7 ref $on_result or croak "Expected 'on_result' to be a reference";
469              
470             $on_done = sub {
471 2     2   57 $on_result->( return => @_ );
472 2         25 };
473             $on_fail = sub {
474 0     0   0 my ( $err, @values ) = @_;
475 0         0 $on_result->( error => @values );
476 2         17 };
477             }
478             elsif( defined $params{on_return} and defined $params{on_error} ) {
479 44         117 my $on_return = delete $params{on_return};
480 44 50       237 ref $on_return or croak "Expected 'on_return' to be a reference";
481 44         90 my $on_error = delete $params{on_error};
482 44 50       176 ref $on_error or croak "Expected 'on_error' to be a reference";
483              
484 44         63 $on_done = $on_return;
485 44         104 $on_fail = $on_error;
486             }
487             elsif( !defined wantarray ) {
488 0         0 croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
489             }
490              
491 88         418 $self->debug_printf_call( @$args );
492              
493 88         1308 my $request = IO::Async::Channel->encode( $args );
494              
495 88         158 my $future;
496 88 100       663 if( my $worker = $self->_get_worker ) {
497 64         4872 $future = $self->_call_worker( $worker, $request );
498             }
499             else {
500 23         112 $self->debug_printf( "QUEUE" );
501 23         42 my $queue = $self->{pending_queue};
502              
503             my $next = Pending(
504 23   100     295 my $priority = $params{priority} || 0,
505             my $wait_f = $self->loop->new_future,
506             );
507              
508 23 100       2378 if( $priority ) {
509 9     12   192 my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
  12         354  
510 9   33     186 splice @$queue, $idx // $#$queue+1, 0, ( $next );
511             }
512             else {
513 14         43 push @$queue, $next;
514             }
515              
516             $future = $wait_f->then( sub {
517 22     22   2125 my ( $self, $worker ) = @_;
518 22         158 $self->_call_worker( $worker, $request );
519 23         316 });
520             }
521              
522             $future->on_done( $self->_capture_weakself( sub {
523 64 50   64   236 my $self = shift or return;
524 64         439 $self->debug_printf_result( @_ );
525 87         1890 }));
526             $future->on_fail( $self->_capture_weakself( sub {
527 21 50   21   144 my $self = shift or return;
528 21         158 $self->debug_printf_failure( @_ );
529 87         2885 }));
530              
531 87 100       2567 $future->on_done( $on_done ) if $on_done;
532 87 100       955 $future->on_fail( $on_fail ) if $on_fail;
533              
534 87 100       1617 return $future if defined wantarray;
535              
536             # Caller is not going to keep hold of the Future, so we have to ensure it
537             # stays alive somehow
538 36     14   464 $self->adopt_future( $future->else( sub { Future->done } ) );
  14         714  
539             }
540              
541             sub _worker_objects
542             {
543 226     226   476 my $self = shift;
544 226         316 return values %{ $self->{workers} };
  226         1067  
545             }
546              
547             =head2 workers
548              
549             $count = $function->workers;
550              
551             Returns the total number of worker processes available
552              
553             =cut
554              
555             sub workers
556             {
557 53     53 1 9346 my $self = shift;
558 53         94 return scalar keys %{ $self->{workers} };
  53         869  
559             }
560              
561             =head2 workers_busy
562              
563             $count = $function->workers_busy;
564              
565             Returns the number of worker processes that are currently busy
566              
567             =cut
568              
569             sub workers_busy
570             {
571 9     9 1 6426 my $self = shift;
572 9         36 return scalar grep { $_->{busy} } $self->_worker_objects;
  9         117  
573             }
574              
575             =head2 workers_idle
576              
577             $count = $function->workers_idle;
578              
579             Returns the number of worker processes that are currently idle
580              
581             =cut
582              
583             sub workers_idle
584             {
585 160     160 1 329 my $self = shift;
586 160         546 return scalar grep { !$_->{busy} } $self->_worker_objects;
  165         867  
587             }
588              
589             sub _new_worker
590             {
591 55     55   83 my $self = shift;
592              
593             my $worker = IO::Async::Function::Worker->new(
594 440         1345 ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
595             max_calls => $self->{max_worker_calls},
596              
597             on_finish => $self->_capture_weakself( sub {
598 3 50   3   30 my $self = shift or return;
599 3         6 my ( $worker ) = @_;
600              
601 3 50       21 return if $self->{stopping};
602              
603 0 0       0 $self->_new_worker if $self->workers < $self->{min_workers};
604              
605 0         0 $self->_dispatch_pending;
606 55         219 } ),
607             );
608              
609 55         440 $self->add_child( $worker );
610              
611 53         1608 return $self->{workers}{$worker->id} = $worker;
612             }
613              
614             sub _get_worker
615             {
616 111     111   201 my $self = shift;
617              
618 111         185 foreach ( sort keys %{ $self->{workers} } ) {
  111         547  
619 102 100       641 return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
620             }
621              
622 35 100       405 if( $self->workers < $self->{max_workers} ) {
623 12         70 return $self->_new_worker;
624             }
625              
626 23         62 return undef;
627             }
628              
629             sub _call_worker
630             {
631 86     86   164 my $self = shift;
632 86         249 my ( $worker, $type, $args ) = @_;
633              
634 86         1390 my $future = $worker->call( $type, $args );
635              
636 86 100       2779 if( $self->workers_idle == 0 ) {
637 84 100       596 $self->{idle_timer}->stop if $self->{idle_timer};
638             }
639              
640 86         255 return $future;
641             }
642              
643             sub _dispatch_pending
644             {
645 86     86   164 my $self = shift;
646              
647 86         148 while( my $next = shift @{ $self->{pending_queue} } ) {
  87         811  
648 23 50       205 my $worker = $self->_get_worker or return;
649              
650 23         1205 my $f = $next->f;
651              
652 23 100       334 next if $f->is_cancelled;
653              
654 22         253 $self->debug_printf( "UNQUEUE" );
655 22         225 $f->done( $self, $worker );
656 22         1299 return;
657             }
658              
659 64 100       230 if( $self->workers_idle > $self->{min_workers} ) {
660 17 100 66     208 $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
661             }
662             }
663              
664             package # hide from indexer
665             IO::Async::Function::Worker;
666              
667 10     10   27910 use base qw( IO::Async::Routine );
  10         23  
  10         7514  
668              
669 10     10   112 use Carp;
  10         17  
  10         569  
670              
671 10     10   6700 use IO::Async::Channel;
  10         44  
  10         450  
672              
673 10     10   5939 use IO::Async::Internals::FunctionWorker;
  10         30  
  10         8183  
674              
675             sub new
676             {
677 55     55   172 my $class = shift;
678 55         1388 my %params = @_;
679              
680 55         472 my $arg_channel = IO::Async::Channel->new;
681 55         144 my $ret_channel = IO::Async::Channel->new;
682              
683 55         101 my $send_initial;
684              
685 55 100       267 if( defined( my $code = $params{code} ) ) {
    50          
686 49         105 my $init_code = $params{init_code};
687              
688             $params{code} = sub {
689 1 50   1   39 $init_code->() if defined $init_code;
690              
691 1         22 IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
692 49         1022 };
693             }
694             elsif( defined( my $func = $params{func} ) ) {
695 6         29 my $module = $params{module};
696 6         57 my $init_func = $params{init_func};
697 6         22 my @init_args;
698              
699 6         32 $params{module} = "IO::Async::Internals::FunctionWorker";
700 6         56 $params{func} = "run_worker";
701              
702 6 50       67 ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";
703              
704 6         32 $send_initial = [ $module, $func, $init_func, @init_args ];
705             }
706              
707 55         200 delete @params{qw( init_code init_func )};
708              
709 55         377 my $worker = $class->SUPER::new(
710             %params,
711             channels_in => [ $arg_channel ],
712             channels_out => [ $ret_channel ],
713             );
714              
715 55         241 $worker->{arg_channel} = $arg_channel;
716 55         227 $worker->{ret_channel} = $ret_channel;
717              
718 55 100       127 $worker->{send_initial} = $send_initial if $send_initial;
719              
720 55         191 return $worker;
721             }
722              
723             sub _add_to_loop
724             {
725 55     55   90 my $self = shift;
726 55         231 $self->SUPER::_add_to_loop( @_ );
727              
728 53 100       7098 $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
729             }
730              
731             sub configure
732             {
733 55     55   115 my $self = shift;
734 55         224 my %params = @_;
735              
736 55   33     401 exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
737              
738 55         300 $self->SUPER::configure( %params );
739             }
740              
741             sub stop
742             {
743 48     48   84 my $worker = shift;
744 48         304 $worker->{arg_channel}->close;
745              
746 48         61 my $ret;
747 48 100       209 $ret = $worker->result_future if defined wantarray;
748              
749 48 50       147 if( my $function = $worker->parent ) {
750 48         344 delete $function->{workers}{$worker->id};
751              
752 48 100       144 if( $worker->{busy} ) {
753 10         87 $worker->{remove_on_idle}++;
754             }
755             else {
756 38         124 $function->remove_child( $worker );
757             }
758             }
759              
760 48         199 return $ret;
761             }
762              
763             sub call
764             {
765 86     86   161 my $worker = shift;
766 86         166 my ( $args ) = @_;
767              
768 86         1391 $worker->{arg_channel}->send_encoded( $args );
769              
770 86         759 $worker->{busy} = 1;
771 86         226 $worker->{max_calls}--;
772              
773             return $worker->{ret_channel}->recv->then(
774             # on recv
775             $worker->_capture_weakself( sub {
776 83     83   241 my ( $worker, $result ) = @_;
777 83         296 my ( $type, @values ) = @$result;
778              
779             $worker->stop if !$worker->{max_calls} or
780 83 100 66     663 $worker->{exit_on_die} && $type eq "e";
      100        
781              
782 83 100       270 if( $type eq "r" ) {
    50          
783 64         363 return Future->done( @values );
784             }
785             elsif( $type eq "e" ) {
786 19         266 return Future->fail( @values );
787             }
788             else {
789 0         0 die "Unrecognised type from worker - $type\n";
790             }
791             } ),
792             # on EOF
793             $worker->_capture_weakself( sub {
794 2     2   24 my ( $worker ) = @_;
795              
796 2         38 $worker->stop;
797              
798 2         32 return Future->fail( "closed", "closed" );
799             } )
800             )->on_ready( $worker->_capture_weakself( sub {
801 86     86   216 my ( $worker, $f ) = @_;
802 86         245 $worker->{busy} = 0;
803              
804 86         348 my $function = $worker->parent;
805 86 50       737 $function->_dispatch_pending if $function;
806              
807 86 100 66     973 $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
808 86         603 }));
809             }
810              
811             =head1 EXAMPLES
812              
813             =head2 Extended Error Information on Failure
814              
815             The array-unpacking form of exception indication allows the function body to
816             more precisely control the resulting failure from the C<call> future.
817              
818             my $divider = IO::Async::Function->new(
819             code => sub {
820             my ( $numerator, $divisor ) = @_;
821             $divisor == 0 and
822             die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
823              
824             return $numerator / $divisor;
825             }
826             );
827              
828             =head1 NOTES
829              
830             For the record, 123454321 is 11111 * 11111, a square number, and therefore not
831             prime.
832              
833             =head1 AUTHOR
834              
835             Paul Evans
836              
837             =cut
838              
839             0x55AA;