| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | #  You may distribute under the terms of either the GNU General Public License | 
| 2 |  |  |  |  |  |  | #  or the Artistic License (the same terms as Perl itself) | 
| 3 |  |  |  |  |  |  | # | 
| 4 |  |  |  |  |  |  | #  (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk | 
| 5 |  |  |  |  |  |  |  | 
| 6 |  |  |  |  |  |  | package IO::Async::Function; | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 10 |  |  | 10 |  | 272144 | use strict; | 
|  | 10 |  |  |  |  | 18 |  | 
|  | 10 |  |  |  |  | 248 |  | 
| 9 | 10 |  |  | 10 |  | 46 | use warnings; | 
|  | 10 |  |  |  |  | 110 |  | 
|  | 10 |  |  |  |  | 413 |  | 
| 10 |  |  |  |  |  |  |  | 
| 11 |  |  |  |  |  |  | our $VERSION = '0.802'; | 
| 12 |  |  |  |  |  |  |  | 
| 13 | 10 |  |  | 10 |  | 49 | use base qw( IO::Async::Notifier ); | 
|  | 10 |  |  |  |  | 18 |  | 
|  | 10 |  |  |  |  | 1969 |  | 
| 14 | 10 |  |  | 10 |  | 3802 | use IO::Async::Timer::Countdown; | 
|  | 10 |  |  |  |  | 21 |  | 
|  | 10 |  |  |  |  | 224 |  | 
| 15 |  |  |  |  |  |  |  | 
| 16 | 10 |  |  | 10 |  | 52 | use Carp; | 
|  | 10 |  |  |  |  | 16 |  | 
|  | 10 |  |  |  |  | 443 |  | 
| 17 |  |  |  |  |  |  |  | 
| 18 | 10 |  |  | 10 |  | 49 | use List::Util qw( first ); | 
|  | 10 |  |  |  |  | 12 |  | 
|  | 10 |  |  |  |  | 599 |  | 
| 19 |  |  |  |  |  |  |  | 
| 20 | 10 |  |  | 10 |  | 3014 | use Struct::Dumb qw( readonly_struct ); | 
|  | 10 |  |  |  |  | 11798 |  | 
|  | 10 |  |  |  |  | 43 |  | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | readonly_struct Pending => [qw( priority f )]; | 
| 23 |  |  |  |  |  |  |  | 
| 24 |  |  |  |  |  |  | =head1 NAME | 
| 25 |  |  |  |  |  |  |  | 
| 26 |  |  |  |  |  |  | C<IO::Async::Function> - call a function asynchronously | 
| 27 |  |  |  |  |  |  |  | 
| 28 |  |  |  |  |  |  | =head1 SYNOPSIS | 
| 29 |  |  |  |  |  |  |  | 
| 30 |  |  |  |  |  |  | use IO::Async::Function; | 
| 31 |  |  |  |  |  |  |  | 
| 32 |  |  |  |  |  |  | use IO::Async::Loop; | 
| 33 |  |  |  |  |  |  | my $loop = IO::Async::Loop->new; | 
| 34 |  |  |  |  |  |  |  | 
| 35 |  |  |  |  |  |  | my $function = IO::Async::Function->new( | 
| 36 |  |  |  |  |  |  | code => sub { | 
| 37 |  |  |  |  |  |  | my ( $number ) = @_; | 
| 38 |  |  |  |  |  |  | return is_prime( $number ); | 
| 39 |  |  |  |  |  |  | }, | 
| 40 |  |  |  |  |  |  | ); | 
| 41 |  |  |  |  |  |  |  | 
| 42 |  |  |  |  |  |  | $loop->add( $function ); | 
| 43 |  |  |  |  |  |  |  | 
| 44 |  |  |  |  |  |  | $function->call( | 
| 45 |  |  |  |  |  |  | args => [ 123454321 ], | 
| 46 |  |  |  |  |  |  | )->on_done( sub { | 
| 47 |  |  |  |  |  |  | my $isprime = shift; | 
| 48 |  |  |  |  |  |  | print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n"; | 
| 49 |  |  |  |  |  |  | })->on_fail( sub { | 
| 50 |  |  |  |  |  |  | print STDERR "Cannot determine if it's prime - $_[0]\n"; | 
| 51 |  |  |  |  |  |  | })->get; | 
| 52 |  |  |  |  |  |  |  | 
| 53 |  |  |  |  |  |  | =head1 DESCRIPTION | 
| 54 |  |  |  |  |  |  |  | 
| 55 |  |  |  |  |  |  | This subclass of L<IO::Async::Notifier> wraps a function body in a collection | 
| 56 |  |  |  |  |  |  | of worker processes, to allow it to execute independently of the main process. | 
| 57 |  |  |  |  |  |  | The object acts as a proxy to the function, allowing invocations to be made by | 
| 58 |  |  |  |  |  |  | passing in arguments, and invoking a continuation in the main process when the | 
| 59 |  |  |  |  |  |  | function returns. | 
| 60 |  |  |  |  |  |  |  | 
| 61 |  |  |  |  |  |  | The object represents the function code itself, rather than one specific | 
| 62 |  |  |  |  |  |  | invocation of it. It can be called multiple times, by the C<call> method. | 
| 63 |  |  |  |  |  |  | Multiple outstanding invocations can be called; they will be dispatched in | 
| 64 |  |  |  |  |  |  | the order they were queued. If only one worker process is used then results | 
| 65 |  |  |  |  |  |  | will be returned in the order they were called. If multiple are used, then | 
| 66 |  |  |  |  |  |  | each request will be sent in the order called, but timing differences between | 
| 67 |  |  |  |  |  |  | each worker may mean results are returned in a different order. | 
| 68 |  |  |  |  |  |  |  | 
| 69 |  |  |  |  |  |  | Since the code block will be called multiple times within the same child | 
| 70 |  |  |  |  |  |  | process, it must take care not to modify any of its state that might affect | 
| 71 |  |  |  |  |  |  | subsequent calls. Since it executes in a child process, it cannot make any | 
| 72 |  |  |  |  |  |  | modifications to the state of the parent program. Therefore, all the data | 
| 73 |  |  |  |  |  |  | required to perform its task must be represented in the call arguments, and | 
| 74 |  |  |  |  |  |  | all of the result must be represented in the return values. | 
| 75 |  |  |  |  |  |  |  | 
| 76 |  |  |  |  |  |  | The Function object is implemented using an L<IO::Async::Routine> with two | 
| 77 |  |  |  |  |  |  | L<IO::Async::Channel> objects to pass calls into and results out from it. | 
| 78 |  |  |  |  |  |  |  | 
| 79 |  |  |  |  |  |  | The L<IO::Async> framework generally provides mechanisms for multiplexing IO | 
| 80 |  |  |  |  |  |  | tasks between different handles, so there aren't many occasions when such an | 
| 81 |  |  |  |  |  |  | asynchronous function is necessary. Two cases where this does become useful | 
| 82 |  |  |  |  |  |  | are: | 
| 83 |  |  |  |  |  |  |  | 
| 84 |  |  |  |  |  |  | =over 4 | 
| 85 |  |  |  |  |  |  |  | 
| 86 |  |  |  |  |  |  | =item 1. | 
| 87 |  |  |  |  |  |  |  | 
| 88 |  |  |  |  |  |  | When a large amount of computationally-intensive work needs to be performed | 
| 89 |  |  |  |  |  |  | (for example, the C<is_prime> test in the example in the C<SYNOPSIS>). | 
| 90 |  |  |  |  |  |  |  | 
| 91 |  |  |  |  |  |  | =item 2. | 
| 92 |  |  |  |  |  |  |  | 
| 93 |  |  |  |  |  |  | When a blocking OS syscall or library-level function needs to be called, and | 
| 94 |  |  |  |  |  |  | no nonblocking or asynchronous version is supplied. This is used by | 
| 95 |  |  |  |  |  |  | L<IO::Async::Resolver>. | 
| 96 |  |  |  |  |  |  |  | 
| 97 |  |  |  |  |  |  | =back | 
| 98 |  |  |  |  |  |  |  | 
| 99 |  |  |  |  |  |  | This object is ideal for representing "pure" functions; that is, blocks of | 
| 100 |  |  |  |  |  |  | code which have no stateful effect on the process, and whose result depends | 
| 101 |  |  |  |  |  |  | only on the arguments passed in. For a more general co-routine ability, see | 
| 102 |  |  |  |  |  |  | also L<IO::Async::Routine>. | 
| 103 |  |  |  |  |  |  |  | 
| 104 |  |  |  |  |  |  | =cut | 
| 105 |  |  |  |  |  |  |  | 
| 106 |  |  |  |  |  |  | =head1 PARAMETERS | 
| 107 |  |  |  |  |  |  |  | 
| 108 |  |  |  |  |  |  | The following named parameters may be passed to C<new> or C<configure>: | 
| 109 |  |  |  |  |  |  |  | 
| 110 |  |  |  |  |  |  | =head2 code => CODE | 
| 111 |  |  |  |  |  |  |  | 
| 112 |  |  |  |  |  |  | The body of the function to execute. | 
| 113 |  |  |  |  |  |  |  | 
| 114 |  |  |  |  |  |  | @result = $code->( @args ) | 
| 115 |  |  |  |  |  |  |  | 
| 116 |  |  |  |  |  |  | =head2 init_code => CODE | 
| 117 |  |  |  |  |  |  |  | 
| 118 |  |  |  |  |  |  | Optional. If defined, this is invoked exactly once in every child process or | 
| 119 |  |  |  |  |  |  | thread, after it is created, but before the first invocation of the function | 
| 120 |  |  |  |  |  |  | body itself. | 
| 121 |  |  |  |  |  |  |  | 
| 122 |  |  |  |  |  |  | $init_code->() | 
| 123 |  |  |  |  |  |  |  | 
| 124 |  |  |  |  |  |  | =head2 module => STRING | 
| 125 |  |  |  |  |  |  |  | 
| 126 |  |  |  |  |  |  | =head2 func => STRING | 
| 127 |  |  |  |  |  |  |  | 
| 128 |  |  |  |  |  |  | I | 
| 129 |  |  |  |  |  |  |  | 
| 130 |  |  |  |  |  |  | An alternative to the C<code> argument, which names a module to load and a | 
| 131 |  |  |  |  |  |  | function to call within it. C<module> should give a perl module name (i.e. | 
| 132 |  |  |  |  |  |  | C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give | 
| 133 |  |  |  |  |  |  | the basename of a function within that module (i.e. without the module name | 
| 134 |  |  |  |  |  |  | prefixed). It will be invoked, without extra arguments, as the main code | 
| 135 |  |  |  |  |  |  | body of the object. | 
| 136 |  |  |  |  |  |  |  | 
| 137 |  |  |  |  |  |  | The task of loading this module and resolving the resulting function from it | 
| 138 |  |  |  |  |  |  | is only performed on the remote worker side, so the controlling process will | 
| 139 |  |  |  |  |  |  | not need to actually load the module. | 
| 140 |  |  |  |  |  |  |  | 
| 141 |  |  |  |  |  |  | =head2 init_func => STRING or ARRAY [ STRING, ... ] | 
| 142 |  |  |  |  |  |  |  | 
| 143 |  |  |  |  |  |  | Optional addition to the C<module> and C<func> alternatives. Names a function | 
| 144 |  |  |  |  |  |  | within the module to call each time a new worker is created. | 
| 145 |  |  |  |  |  |  |  | 
| 146 |  |  |  |  |  |  | If this value is an array reference, its first element must be a string giving | 
| 147 |  |  |  |  |  |  | the name of the function; the remaining values are passed to that function as | 
| 148 |  |  |  |  |  |  | arguments. | 
| 149 |  |  |  |  |  |  |  | 
| 150 |  |  |  |  |  |  | =head2 model => "fork" | "thread" | "spawn" | 
| 151 |  |  |  |  |  |  |  | 
| 152 |  |  |  |  |  |  | Optional. Requests a specific L<IO::Async::Routine> model. If not supplied, | 
| 153 |  |  |  |  |  |  | leaves the default choice up to Routine. | 
| 154 |  |  |  |  |  |  |  | 
| 155 |  |  |  |  |  |  | =head2 min_workers => INT | 
| 156 |  |  |  |  |  |  |  | 
| 157 |  |  |  |  |  |  | =head2 max_workers => INT | 
| 158 |  |  |  |  |  |  |  | 
| 159 |  |  |  |  |  |  | The lower and upper bounds of worker processes to try to keep running. The | 
| 160 |  |  |  |  |  |  | actual number running at any time will be kept somewhere between these bounds | 
| 161 |  |  |  |  |  |  | according to load. | 
| 162 |  |  |  |  |  |  |  | 
| 163 |  |  |  |  |  |  | =head2 max_worker_calls => INT | 
| 164 |  |  |  |  |  |  |  | 
| 165 |  |  |  |  |  |  | Optional. If provided, stop a worker process after it has processed this | 
| 166 |  |  |  |  |  |  | number of calls. (New workers may be started to replace stopped ones, within | 
| 167 |  |  |  |  |  |  | the bounds given above). | 
| 168 |  |  |  |  |  |  |  | 
| 169 |  |  |  |  |  |  | =head2 idle_timeout => NUM | 
| 170 |  |  |  |  |  |  |  | 
| 171 |  |  |  |  |  |  | Optional. If provided, idle worker processes will be shut down after this | 
| 172 |  |  |  |  |  |  | amount of time, if there are more than C<min_workers> of them. | 
| 173 |  |  |  |  |  |  |  | 
| 174 |  |  |  |  |  |  | =head2 exit_on_die => BOOL | 
| 175 |  |  |  |  |  |  |  | 
| 176 |  |  |  |  |  |  | Optional boolean, controls what happens after the C<code> throws an | 
| 177 |  |  |  |  |  |  | exception. If missing or false, the worker will continue running to process | 
| 178 |  |  |  |  |  |  | more requests. If true, the worker will be shut down. A new worker might be | 
| 179 |  |  |  |  |  |  | constructed by the C<call> method to replace it, if necessary. | 
| 180 |  |  |  |  |  |  |  | 
| 181 |  |  |  |  |  |  | =head2 setup => ARRAY | 
| 182 |  |  |  |  |  |  |  | 
| 183 |  |  |  |  |  |  | Optional array reference. Specifies the C<setup> key to pass to the underlying | 
| 184 |  |  |  |  |  |  | L<IO::Async::Process> when setting up new worker processes. | 
| 185 |  |  |  |  |  |  |  | 
| 186 |  |  |  |  |  |  | =cut | 
| 187 |  |  |  |  |  |  |  | 
| 188 |  |  |  |  |  |  | sub _init | 
| 189 |  |  |  |  |  |  | { | 
| 190 | 49 |  |  | 49 |  | 108 | my $self = shift; | 
| 191 | 49 |  |  |  |  | 302 | $self->SUPER::_init( @_ ); | 
| 192 |  |  |  |  |  |  |  | 
| 193 | 49 |  |  |  |  | 148 | $self->{min_workers} = 1; | 
| 194 | 49 |  |  |  |  | 80 | $self->{max_workers} = 8; | 
| 195 |  |  |  |  |  |  |  | 
| 196 | 49 |  |  |  |  | 113 | $self->{workers} = {}; # {$id} => IaFunction:Worker | 
| 197 |  |  |  |  |  |  |  | 
| 198 | 49 |  |  |  |  | 122 | $self->{pending_queue} = []; | 
| 199 |  |  |  |  |  |  | } | 
| 200 |  |  |  |  |  |  |  | 
| 201 |  |  |  |  |  |  | sub configure | 
| 202 |  |  |  |  |  |  | { | 
| 203 | 49 |  |  | 49 | 1 | 71 | my $self = shift; | 
| 204 | 49 |  |  |  |  | 124 | my %params = @_; | 
| 205 |  |  |  |  |  |  |  | 
| 206 | 49 |  |  |  |  | 63 | my %worker_params; | 
| 207 | 49 |  |  |  |  | 107 | foreach (qw( model exit_on_die max_worker_calls )) { | 
| 208 | 147 | 100 |  |  |  | 349 | $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_}; | 
| 209 |  |  |  |  |  |  | } | 
| 210 |  |  |  |  |  |  |  | 
| 211 | 49 | 100 |  |  |  | 130 | if( keys %worker_params ) { | 
| 212 | 14 |  |  |  |  | 74 | foreach my $worker ( $self->_worker_objects ) { | 
| 213 | 0 |  |  |  |  | 0 | $worker->configure( %worker_params ); | 
| 214 |  |  |  |  |  |  | } | 
| 215 |  |  |  |  |  |  | } | 
| 216 |  |  |  |  |  |  |  | 
| 217 | 49 | 100 |  |  |  | 134 | if( exists $params{idle_timeout} ) { | 
| 218 | 7 |  |  |  |  | 25 | my $timeout = delete $params{idle_timeout}; | 
| 219 | 7 | 50 |  |  |  | 36 | if( !$timeout ) { | 
|  |  | 50 |  |  |  |  |  | 
| 220 | 0 | 0 |  |  |  | 0 | $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer}; | 
| 221 |  |  |  |  |  |  | } | 
| 222 |  |  |  |  |  |  | elsif( my $idle_timer = $self->{idle_timer} ) { | 
| 223 | 0 |  |  |  |  | 0 | $idle_timer->configure( delay => $timeout ); | 
| 224 |  |  |  |  |  |  | } | 
| 225 |  |  |  |  |  |  | else { | 
| 226 |  |  |  |  |  |  | $self->{idle_timer} = IO::Async::Timer::Countdown->new( | 
| 227 |  |  |  |  |  |  | delay => $timeout, | 
| 228 |  |  |  |  |  |  | on_expire => $self->_capture_weakself( sub { | 
| 229 | 1 | 50 |  | 1 |  | 22 | my $self = shift or return; | 
| 230 | 1 |  |  |  |  | 6 | my $workers = $self->{workers}; | 
| 231 |  |  |  |  |  |  |  | 
| 232 |  |  |  |  |  |  | # Shut down atmost one idle worker, starting from the highest | 
| 233 |  |  |  |  |  |  | # ID. Since we search from lowest to assign work, this tries | 
| 234 |  |  |  |  |  |  | # to ensure we'll shut down the least useful ones first, | 
| 235 |  |  |  |  |  |  | # keeping more useful ones in memory (page/cache warmth, etc..) | 
| 236 | 1 |  |  |  |  | 8 | foreach my $id ( reverse sort keys %$workers ) { | 
| 237 | 1 | 50 |  |  |  | 6 | next if $workers->{$id}{busy}; | 
| 238 |  |  |  |  |  |  |  | 
| 239 | 1 |  |  |  |  | 7 | $workers->{$id}->stop; | 
| 240 | 1 |  |  |  |  | 3 | last; | 
| 241 |  |  |  |  |  |  | } | 
| 242 |  |  |  |  |  |  |  | 
| 243 |  |  |  |  |  |  | # Still more? | 
| 244 | 1 | 50 |  |  |  | 6 | $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers}; | 
| 245 | 7 |  |  |  |  | 79 | } ), | 
| 246 |  |  |  |  |  |  | ); | 
| 247 | 7 |  |  |  |  | 59 | $self->add_child( $self->{idle_timer} ); | 
| 248 |  |  |  |  |  |  | } | 
| 249 |  |  |  |  |  |  | } | 
| 250 |  |  |  |  |  |  |  | 
| 251 | 49 |  |  |  |  | 101 | foreach (qw( min_workers max_workers )) { | 
| 252 | 98 | 100 |  |  |  | 240 | $self->{$_} = delete $params{$_} if exists $params{$_}; | 
| 253 |  |  |  |  |  |  | # TODO: something about retuning | 
| 254 |  |  |  |  |  |  | } | 
| 255 |  |  |  |  |  |  |  | 
| 256 | 49 |  |  |  |  | 65 | my $need_restart; | 
| 257 |  |  |  |  |  |  |  | 
| 258 | 49 |  |  |  |  | 112 | foreach (qw( init_code code module init_func func setup )) { | 
| 259 | 294 | 100 |  |  |  | 570 | $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_}; | 
| 260 |  |  |  |  |  |  | } | 
| 261 |  |  |  |  |  |  |  | 
| 262 |  |  |  |  |  |  | defined $self->{code} and defined $self->{func} and | 
| 263 | 49 | 50 | 66 |  |  | 492 | croak "Cannot ->configure both 'code' and 'func'"; | 
| 264 |  |  |  |  |  |  | defined $self->{func} and !defined $self->{module} and | 
| 265 | 49 | 50 | 66 |  |  | 313 | croak "'func' parameter requires a 'module' as well"; | 
| 266 |  |  |  |  |  |  |  | 
| 267 | 49 |  |  |  |  | 248 | $self->SUPER::configure( %params ); | 
| 268 |  |  |  |  |  |  |  | 
| 269 | 49 | 50 | 33 |  |  | 316 | if( $need_restart and $self->loop ) { | 
| 270 | 0 |  |  |  |  | 0 | $self->stop; | 
| 271 | 0 |  |  |  |  | 0 | $self->start; | 
| 272 |  |  |  |  |  |  | } | 
| 273 |  |  |  |  |  |  | } | 
| 274 |  |  |  |  |  |  |  | 
| 275 |  |  |  |  |  |  | sub _add_to_loop | 
| 276 |  |  |  |  |  |  | { | 
| 277 | 49 |  |  | 49 |  | 86 | my $self = shift; | 
| 278 | 49 |  |  |  |  | 166 | $self->SUPER::_add_to_loop( @_ ); | 
| 279 |  |  |  |  |  |  |  | 
| 280 | 49 |  |  |  |  | 142 | $self->start; | 
| 281 |  |  |  |  |  |  | } | 
| 282 |  |  |  |  |  |  |  | 
| 283 |  |  |  |  |  |  | sub _remove_from_loop | 
| 284 |  |  |  |  |  |  | { | 
| 285 | 37 |  |  | 37 |  | 71 | my $self = shift; | 
| 286 |  |  |  |  |  |  |  | 
| 287 | 37 |  |  |  |  | 185 | $self->stop; | 
| 288 |  |  |  |  |  |  |  | 
| 289 | 37 |  |  |  |  | 141 | $self->SUPER::_remove_from_loop( @_ ); | 
| 290 |  |  |  |  |  |  | } | 
| 291 |  |  |  |  |  |  |  | 
| 292 |  |  |  |  |  |  | =head1 METHODS | 
| 293 |  |  |  |  |  |  |  | 
| 294 |  |  |  |  |  |  | The following methods documented with a trailing call to C<< ->get >> return | 
| 295 |  |  |  |  |  |  | L<Future> instances. | 
| 296 |  |  |  |  |  |  |  | 
| 297 |  |  |  |  |  |  | =cut | 
| 298 |  |  |  |  |  |  |  | 
| 299 |  |  |  |  |  |  | =head2 start | 
| 300 |  |  |  |  |  |  |  | 
| 301 |  |  |  |  |  |  | $function->start | 
| 302 |  |  |  |  |  |  |  | 
| 303 |  |  |  |  |  |  | Start the worker processes | 
| 304 |  |  |  |  |  |  |  | 
| 305 |  |  |  |  |  |  | =cut | 
| 306 |  |  |  |  |  |  |  | 
| 307 |  |  |  |  |  |  | sub start | 
| 308 |  |  |  |  |  |  | { | 
| 309 | 51 |  |  | 51 | 1 | 67 | my $self = shift; | 
| 310 |  |  |  |  |  |  |  | 
| 311 | 51 |  |  |  |  | 204 | $self->_new_worker for 1 .. $self->{min_workers}; | 
| 312 |  |  |  |  |  |  | } | 
| 313 |  |  |  |  |  |  |  | 
| 314 |  |  |  |  |  |  | =head2 stop | 
| 315 |  |  |  |  |  |  |  | 
| 316 |  |  |  |  |  |  | $function->stop | 
| 317 |  |  |  |  |  |  |  | 
| 318 |  |  |  |  |  |  | Stop the worker processes | 
| 319 |  |  |  |  |  |  |  | 
| 320 |  |  |  |  |  |  | $f = $function->stop | 
| 321 |  |  |  |  |  |  |  | 
| 322 |  |  |  |  |  |  | I | 
| 323 |  |  |  |  |  |  |  | 
| 324 |  |  |  |  |  |  | If called in non-void context, returns a L<Future> instance that | 
| 325 |  |  |  |  |  |  | will complete once every worker process has stopped and exited. This may be | 
| 326 |  |  |  |  |  |  | useful for waiting until all of the processes are waited on, or other | 
| 327 |  |  |  |  |  |  | edge-cases, but is not otherwise particularly useful. | 
| 328 |  |  |  |  |  |  |  | 
| 329 |  |  |  |  |  |  | =cut | 
| 330 |  |  |  |  |  |  |  | 
| 331 |  |  |  |  |  |  | sub stop | 
| 332 |  |  |  |  |  |  | { | 
| 333 | 43 |  |  | 43 | 1 | 74 | my $self = shift; | 
| 334 |  |  |  |  |  |  |  | 
| 335 | 43 |  |  |  |  | 436 | $self->{stopping} = 1; | 
| 336 |  |  |  |  |  |  |  | 
| 337 | 43 |  |  |  |  | 92 | my @f; | 
| 338 |  |  |  |  |  |  |  | 
| 339 | 43 |  |  |  |  | 113 | foreach my $worker ( $self->_worker_objects ) { | 
| 340 | 38 | 100 |  |  |  | 191 | defined wantarray ? push @f, $worker->stop : $worker->stop; | 
| 341 |  |  |  |  |  |  | } | 
| 342 |  |  |  |  |  |  |  | 
| 343 | 43 | 100 |  |  |  | 1311 | return Future->needs_all( @f ) if defined wantarray; | 
| 344 |  |  |  |  |  |  | } | 
| 345 |  |  |  |  |  |  |  | 
| 346 |  |  |  |  |  |  | =head2 restart | 
| 347 |  |  |  |  |  |  |  | 
| 348 |  |  |  |  |  |  | $function->restart | 
| 349 |  |  |  |  |  |  |  | 
| 350 |  |  |  |  |  |  | Gracefully stop and restart all the worker processes. | 
| 351 |  |  |  |  |  |  |  | 
| 352 |  |  |  |  |  |  | =cut | 
| 353 |  |  |  |  |  |  |  | 
| 354 |  |  |  |  |  |  | sub restart | 
| 355 |  |  |  |  |  |  | { | 
| 356 | 2 |  |  | 2 | 1 | 1787 | my $self = shift; | 
| 357 |  |  |  |  |  |  |  | 
| 358 | 2 |  |  |  |  | 20 | $self->stop; | 
| 359 | 2 |  |  |  |  | 12 | $self->start; | 
| 360 |  |  |  |  |  |  | } | 
| 361 |  |  |  |  |  |  |  | 
| 362 |  |  |  |  |  |  | =head2 call | 
| 363 |  |  |  |  |  |  |  | 
| 364 |  |  |  |  |  |  | @result = $function->call( %params )->get | 
| 365 |  |  |  |  |  |  |  | 
| 366 |  |  |  |  |  |  | Schedules an invocation of the contained function to be executed on one of the | 
| 367 |  |  |  |  |  |  | worker processes. If a non-busy worker is available now, it will be called | 
| 368 |  |  |  |  |  |  | immediately. If not, it will be queued and sent to the next free worker that | 
| 369 |  |  |  |  |  |  | becomes available. | 
| 370 |  |  |  |  |  |  |  | 
| 371 |  |  |  |  |  |  | The request will already have been serialised by the marshaller, so it will be | 
| 372 |  |  |  |  |  |  | safe to modify any referenced data structures in the arguments after this call | 
| 373 |  |  |  |  |  |  | returns. | 
| 374 |  |  |  |  |  |  |  | 
| 375 |  |  |  |  |  |  | The C<%params> hash takes the following keys: | 
| 376 |  |  |  |  |  |  |  | 
| 377 |  |  |  |  |  |  | =over 8 | 
| 378 |  |  |  |  |  |  |  | 
| 379 |  |  |  |  |  |  | =item args => ARRAY | 
| 380 |  |  |  |  |  |  |  | 
| 381 |  |  |  |  |  |  | A reference to the array of arguments to pass to the code. | 
| 382 |  |  |  |  |  |  |  | 
| 383 |  |  |  |  |  |  | =item priority => NUM | 
| 384 |  |  |  |  |  |  |  | 
| 385 |  |  |  |  |  |  | Optional. Defines the sorting order when no workers are available and calls | 
| 386 |  |  |  |  |  |  | must be queued for later. A default of zero will apply if not provided. | 
| 387 |  |  |  |  |  |  |  | 
| 388 |  |  |  |  |  |  | Higher values cause the call to be considered more important, and will be | 
| 389 |  |  |  |  |  |  | placed earlier in the queue than calls with a smaller value. Calls of equal | 
| 390 |  |  |  |  |  |  | priority are still handled in FIFO order. | 
| 391 |  |  |  |  |  |  |  | 
| 392 |  |  |  |  |  |  | =back | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | If the function body returns normally the list of results are provided as the | 
| 395 |  |  |  |  |  |  | (successful) result of returned future. If the function throws an exception | 
| 396 |  |  |  |  |  |  | this results in a failed future. In the special case that the exception is in | 
| 397 |  |  |  |  |  |  | fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is | 
| 398 |  |  |  |  |  |  | for the C<fail> result. If the exception is not such a reference, it is used | 
| 399 |  |  |  |  |  |  | as the first argument to C<fail>, in the category of C<error>. | 
| 400 |  |  |  |  |  |  |  | 
| 401 |  |  |  |  |  |  | $f->done( @result ) | 
| 402 |  |  |  |  |  |  |  | 
| 403 |  |  |  |  |  |  | $f->fail( @{ $exception } ) | 
| 404 |  |  |  |  |  |  | $f->fail( $exception, error => ) | 
| 405 |  |  |  |  |  |  |  | 
| 406 |  |  |  |  |  |  | =head2 call (void) | 
| 407 |  |  |  |  |  |  |  | 
| 408 |  |  |  |  |  |  | $function->call( %params ) | 
| 409 |  |  |  |  |  |  |  | 
| 410 |  |  |  |  |  |  | When not returning a future, the C<on_result>, C<on_return> and C<on_error> | 
| 411 |  |  |  |  |  |  | arguments give continuations to handle successful results or failure. | 
| 412 |  |  |  |  |  |  |  | 
| 413 |  |  |  |  |  |  | =over 8 | 
| 414 |  |  |  |  |  |  |  | 
| 415 |  |  |  |  |  |  | =item on_result => CODE | 
| 416 |  |  |  |  |  |  |  | 
| 417 |  |  |  |  |  |  | A continuation that is invoked when the code has been executed. If the code | 
| 418 |  |  |  |  |  |  | returned normally, it is called as: | 
| 419 |  |  |  |  |  |  |  | 
| 420 |  |  |  |  |  |  | $on_result->( 'return', @values ) | 
| 421 |  |  |  |  |  |  |  | 
| 422 |  |  |  |  |  |  | If the code threw an exception, or some other error occurred such as a closed | 
| 423 |  |  |  |  |  |  | connection or the process died, it is called as: | 
| 424 |  |  |  |  |  |  |  | 
| 425 |  |  |  |  |  |  | $on_result->( 'error', $exception_name ) | 
| 426 |  |  |  |  |  |  |  | 
| 427 |  |  |  |  |  |  | =item on_return => CODE and on_error => CODE | 
| 428 |  |  |  |  |  |  |  | 
| 429 |  |  |  |  |  |  | An alternative to C<on_result>. Two continuations to use in either of the | 
| 430 |  |  |  |  |  |  | circumstances given above. They will be called directly, without the leading | 
| 431 |  |  |  |  |  |  | 'return' or 'error' value. | 
| 432 |  |  |  |  |  |  |  | 
| 433 |  |  |  |  |  |  | =back | 
| 434 |  |  |  |  |  |  |  | 
| 435 |  |  |  |  |  |  | =cut | 
| 436 |  |  |  |  |  |  |  | 
| 437 |  |  |  |  |  |  | sub debug_printf_call | 
| 438 |  |  |  |  |  |  | { | 
| 439 | 75 |  |  | 75 | 0 | 103 | my $self = shift; | 
| 440 | 75 |  |  |  |  | 439 | $self->debug_printf( "CALL" ); | 
| 441 |  |  |  |  |  |  | } | 
| 442 |  |  |  |  |  |  |  | 
| 443 |  |  |  |  |  |  | sub debug_printf_result | 
| 444 |  |  |  |  |  |  | { | 
| 445 | 52 |  |  | 52 | 0 | 103 | my $self = shift; | 
| 446 | 52 |  |  |  |  | 114 | $self->debug_printf( "RESULT" ); | 
| 447 |  |  |  |  |  |  | } | 
| 448 |  |  |  |  |  |  |  | 
| 449 |  |  |  |  |  |  | sub debug_printf_failure | 
| 450 |  |  |  |  |  |  | { | 
| 451 | 21 |  |  | 21 | 0 | 53 | my $self = shift; | 
| 452 | 21 |  |  |  |  | 54 | my ( $err ) = @_; | 
| 453 | 21 |  |  |  |  | 110 | $self->debug_printf( "FAIL $err" ); | 
| 454 |  |  |  |  |  |  | } | 
| 455 |  |  |  |  |  |  |  | 
| 456 |  |  |  |  |  |  | sub call | 
| 457 |  |  |  |  |  |  | { | 
| 458 | 90 |  |  | 90 | 1 | 7038 | my $self = shift; | 
| 459 | 90 |  |  |  |  | 1060 | my %params = @_; | 
| 460 |  |  |  |  |  |  |  | 
| 461 |  |  |  |  |  |  | # TODO: possibly just queue this? | 
| 462 | 90 | 50 |  |  |  | 277 | $self->loop or croak "Cannot ->call on a Function not yet in a Loop"; | 
| 463 |  |  |  |  |  |  |  | 
| 464 | 90 |  |  |  |  | 208 | my $args = delete $params{args}; | 
| 465 | 90 | 50 |  |  |  | 243 | ref $args eq "ARRAY" or croak "Expected 'args' to be an array"; | 
| 466 |  |  |  |  |  |  |  | 
| 467 | 90 |  |  |  |  | 148 | my ( $on_done, $on_fail ); | 
| 468 | 90 | 100 | 66 |  |  | 669 | if( defined $params{on_result} ) { | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
| 469 | 2 |  |  |  |  | 7 | my $on_result = delete $params{on_result}; | 
| 470 | 2 | 50 |  |  |  | 12 | ref $on_result or croak "Expected 'on_result' to be a reference"; | 
| 471 |  |  |  |  |  |  |  | 
| 472 |  |  |  |  |  |  | $on_done = sub { | 
| 473 | 2 |  |  | 2 |  | 54 | $on_result->( return => @_ ); | 
| 474 | 2 |  |  |  |  | 16 | }; | 
| 475 |  |  |  |  |  |  | $on_fail = sub { | 
| 476 | 0 |  |  | 0 |  | 0 | my ( $err, @values ) = @_; | 
| 477 | 0 |  |  |  |  | 0 | $on_result->( error => @values ); | 
| 478 | 2 |  |  |  |  | 24 | }; | 
| 479 |  |  |  |  |  |  | } | 
| 480 |  |  |  |  |  |  | elsif( defined $params{on_return} and defined $params{on_error} ) { | 
| 481 | 44 |  |  |  |  | 123 | my $on_return = delete $params{on_return}; | 
| 482 | 44 | 50 |  |  |  | 128 | ref $on_return or croak "Expected 'on_return' to be a reference"; | 
| 483 | 44 |  |  |  |  | 90 | my $on_error  = delete $params{on_error}; | 
| 484 | 44 | 50 |  |  |  | 111 | ref $on_error or croak "Expected 'on_error' to be a reference"; | 
| 485 |  |  |  |  |  |  |  | 
| 486 | 44 |  |  |  |  | 71 | $on_done = $on_return; | 
| 487 | 44 |  |  |  |  | 70 | $on_fail = $on_error; | 
| 488 |  |  |  |  |  |  | } | 
| 489 |  |  |  |  |  |  | elsif( !defined wantarray ) { | 
| 490 | 0 |  |  |  |  | 0 | croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future"; | 
| 491 |  |  |  |  |  |  | } | 
| 492 |  |  |  |  |  |  |  | 
| 493 | 90 |  |  |  |  | 301 | $self->debug_printf_call( @$args ); | 
| 494 |  |  |  |  |  |  |  | 
| 495 | 90 |  |  |  |  | 937 | my $request = IO::Async::Channel->encode( $args ); | 
| 496 |  |  |  |  |  |  |  | 
| 497 | 90 |  |  |  |  | 183 | my $future; | 
| 498 | 90 | 100 |  |  |  | 205 | if( my $worker = $self->_get_worker ) { | 
| 499 | 66 |  |  |  |  | 302 | $future = $self->_call_worker( $worker, $request ); | 
| 500 |  |  |  |  |  |  | } | 
| 501 |  |  |  |  |  |  | else { | 
| 502 | 23 |  |  |  |  | 88 | $self->debug_printf( "QUEUE" ); | 
| 503 | 23 |  |  |  |  | 59 | my $queue = $self->{pending_queue}; | 
| 504 |  |  |  |  |  |  |  | 
| 505 |  |  |  |  |  |  | my $next = Pending( | 
| 506 | 23 |  | 100 |  |  | 128 | my $priority = $params{priority} || 0, | 
| 507 |  |  |  |  |  |  | my $wait_f = $self->loop->new_future, | 
| 508 |  |  |  |  |  |  | ); | 
| 509 |  |  |  |  |  |  |  | 
| 510 | 23 | 100 |  |  |  | 383 | if( $priority ) { | 
| 511 | 9 |  |  | 12 |  | 111 | my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue; | 
|  | 12 |  |  |  |  | 72 |  | 
| 512 | 9 |  | 33 |  |  | 114 | splice @$queue, $idx // $#$queue+1, 0, ( $next ); | 
| 513 |  |  |  |  |  |  | } | 
| 514 |  |  |  |  |  |  | else { | 
| 515 | 14 |  |  |  |  | 58 | push @$queue, $next; | 
| 516 |  |  |  |  |  |  | } | 
| 517 |  |  |  |  |  |  |  | 
| 518 |  |  |  |  |  |  | $future = $wait_f->then( sub { | 
| 519 | 22 |  |  | 22 |  | 1428 | my ( $self, $worker ) = @_; | 
| 520 | 22 |  |  |  |  | 65 | $self->_call_worker( $worker, $request ); | 
| 521 | 23 |  |  |  |  | 162 | }); | 
| 522 |  |  |  |  |  |  | } | 
| 523 |  |  |  |  |  |  |  | 
| 524 |  |  |  |  |  |  | $future->on_done( $self->_capture_weakself( sub { | 
| 525 | 66 | 50 |  | 66 |  | 147 | my $self = shift or return; | 
| 526 | 66 |  |  |  |  | 205 | $self->debug_printf_result( @_ ); | 
| 527 | 89 |  |  |  |  | 1970 | })); | 
| 528 |  |  |  |  |  |  | $future->on_fail( $self->_capture_weakself( sub { | 
| 529 | 21 | 50 |  | 21 |  | 153 | my $self = shift or return; | 
| 530 | 21 |  |  |  |  | 131 | $self->debug_printf_failure( @_ ); | 
| 531 | 89 |  |  |  |  | 2201 | })); | 
| 532 |  |  |  |  |  |  |  | 
| 533 | 89 | 100 |  |  |  | 2232 | $future->on_done( $on_done ) if $on_done; | 
| 534 | 89 | 100 |  |  |  | 990 | $future->on_fail( $on_fail ) if $on_fail; | 
| 535 |  |  |  |  |  |  |  | 
| 536 | 89 | 100 |  |  |  | 1471 | return $future if defined wantarray; | 
| 537 |  |  |  |  |  |  |  | 
| 538 |  |  |  |  |  |  | # Caller is not going to keep hold of the Future, so we have to ensure it | 
| 539 |  |  |  |  |  |  | # stays alive somehow | 
| 540 | 36 |  |  | 14 |  | 235 | $self->adopt_future( $future->else( sub { Future->done } ) ); | 
|  | 14 |  |  |  |  | 611 |  | 
| 541 |  |  |  |  |  |  | } | 
| 542 |  |  |  |  |  |  |  | 
| 543 |  |  |  |  |  |  | sub _worker_objects | 
| 544 |  |  |  |  |  |  | { | 
| 545 | 230 |  |  | 230 |  | 334 | my $self = shift; | 
| 546 | 230 |  |  |  |  | 249 | return values %{ $self->{workers} }; | 
|  | 230 |  |  |  |  | 961 |  | 
| 547 |  |  |  |  |  |  | } | 
| 548 |  |  |  |  |  |  |  | 
| 549 |  |  |  |  |  |  | =head2 workers | 
| 550 |  |  |  |  |  |  |  | 
| 551 |  |  |  |  |  |  | $count = $function->workers | 
| 552 |  |  |  |  |  |  |  | 
| 553 |  |  |  |  |  |  | Returns the total number of worker processes available | 
| 554 |  |  |  |  |  |  |  | 
| 555 |  |  |  |  |  |  | =cut | 
| 556 |  |  |  |  |  |  |  | 
| 557 |  |  |  |  |  |  | sub workers | 
| 558 |  |  |  |  |  |  | { | 
| 559 | 53 |  |  | 53 | 1 | 8965 | my $self = shift; | 
| 560 | 53 |  |  |  |  | 65 | return scalar keys %{ $self->{workers} }; | 
|  | 53 |  |  |  |  | 459 |  | 
| 561 |  |  |  |  |  |  | } | 
| 562 |  |  |  |  |  |  |  | 
| 563 |  |  |  |  |  |  | =head2 workers_busy | 
| 564 |  |  |  |  |  |  |  | 
| 565 |  |  |  |  |  |  | $count = $function->workers_busy | 
| 566 |  |  |  |  |  |  |  | 
| 567 |  |  |  |  |  |  | Returns the number of worker processes that are currently busy | 
| 568 |  |  |  |  |  |  |  | 
| 569 |  |  |  |  |  |  | =cut | 
| 570 |  |  |  |  |  |  |  | 
| 571 |  |  |  |  |  |  | sub workers_busy | 
| 572 |  |  |  |  |  |  | { | 
| 573 | 9 |  |  | 9 | 1 | 4797 | my $self = shift; | 
| 574 | 9 |  |  |  |  | 21 | return scalar grep { $_->{busy} } $self->_worker_objects; | 
|  | 9 |  |  |  |  | 36 |  | 
| 575 |  |  |  |  |  |  | } | 
| 576 |  |  |  |  |  |  |  | 
| 577 |  |  |  |  |  |  | =head2 workers_idle | 
| 578 |  |  |  |  |  |  |  | 
| 579 |  |  |  |  |  |  | $count = $function->workers_idle | 
| 580 |  |  |  |  |  |  |  | 
| 581 |  |  |  |  |  |  | Returns the number of worker processes that are currently idle | 
| 582 |  |  |  |  |  |  |  | 
| 583 |  |  |  |  |  |  | =cut | 
| 584 |  |  |  |  |  |  |  | 
| 585 |  |  |  |  |  |  | sub workers_idle | 
| 586 |  |  |  |  |  |  | { | 
| 587 | 164 |  |  | 164 | 1 | 253 | my $self = shift; | 
| 588 | 164 |  |  |  |  | 399 | return scalar grep { !$_->{busy} } $self->_worker_objects; | 
|  | 169 |  |  |  |  | 694 |  | 
| 589 |  |  |  |  |  |  | } | 
| 590 |  |  |  |  |  |  |  | 
| 591 |  |  |  |  |  |  | sub _new_worker | 
| 592 |  |  |  |  |  |  | { | 
| 593 | 55 |  |  | 55 |  | 110 | my $self = shift; | 
| 594 |  |  |  |  |  |  |  | 
| 595 |  |  |  |  |  |  | my $worker = IO::Async::Function::Worker->new( | 
| 596 | 440 |  |  |  |  | 1107 | ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ), | 
| 597 |  |  |  |  |  |  | max_calls => $self->{max_worker_calls}, | 
| 598 |  |  |  |  |  |  |  | 
| 599 |  |  |  |  |  |  | on_finish => $self->_capture_weakself( sub { | 
| 600 | 3 | 50 |  | 3 |  | 33 | my $self = shift or return; | 
| 601 | 3 |  |  |  |  | 6 | my ( $worker ) = @_; | 
| 602 |  |  |  |  |  |  |  | 
| 603 | 3 | 50 |  |  |  | 36 | return if $self->{stopping}; | 
| 604 |  |  |  |  |  |  |  | 
| 605 | 0 | 0 |  |  |  | 0 | $self->_new_worker if $self->workers < $self->{min_workers}; | 
| 606 |  |  |  |  |  |  |  | 
| 607 | 0 |  |  |  |  | 0 | $self->_dispatch_pending; | 
| 608 | 55 |  |  |  |  | 112 | } ), | 
| 609 |  |  |  |  |  |  | ); | 
| 610 |  |  |  |  |  |  |  | 
| 611 | 55 |  |  |  |  | 324 | $self->add_child( $worker ); | 
| 612 |  |  |  |  |  |  |  | 
| 613 | 53 |  |  |  |  | 745 | return $self->{workers}{$worker->id} = $worker; | 
| 614 |  |  |  |  |  |  | } | 
| 615 |  |  |  |  |  |  |  | 
| 616 |  |  |  |  |  |  | sub _get_worker | 
| 617 |  |  |  |  |  |  | { | 
| 618 | 113 |  |  | 113 |  | 149 | my $self = shift; | 
| 619 |  |  |  |  |  |  |  | 
| 620 | 113 |  |  |  |  | 136 | foreach ( sort keys %{ $self->{workers} } ) { | 
|  | 113 |  |  |  |  | 414 |  | 
| 621 | 104 | 100 |  |  |  | 399 | return $self->{workers}{$_} if !$self->{workers}{$_}{busy}; | 
| 622 |  |  |  |  |  |  | } | 
| 623 |  |  |  |  |  |  |  | 
| 624 | 35 | 100 |  |  |  | 234 | if( $self->workers < $self->{max_workers} ) { | 
| 625 | 12 |  |  |  |  | 84 | return $self->_new_worker; | 
| 626 |  |  |  |  |  |  | } | 
| 627 |  |  |  |  |  |  |  | 
| 628 | 23 |  |  |  |  | 70 | return undef; | 
| 629 |  |  |  |  |  |  | } | 
| 630 |  |  |  |  |  |  |  | 
| 631 |  |  |  |  |  |  | sub _call_worker | 
| 632 |  |  |  |  |  |  | { | 
| 633 | 88 |  |  | 88 |  | 137 | my $self = shift; | 
| 634 | 88 |  |  |  |  | 184 | my ( $worker, $type, $args ) = @_; | 
| 635 |  |  |  |  |  |  |  | 
| 636 | 88 |  |  |  |  | 542 | my $future = $worker->call( $type, $args ); | 
| 637 |  |  |  |  |  |  |  | 
| 638 | 88 | 100 |  |  |  | 1895 | if( $self->workers_idle == 0 ) { | 
| 639 | 86 | 100 |  |  |  | 358 | $self->{idle_timer}->stop if $self->{idle_timer}; | 
| 640 |  |  |  |  |  |  | } | 
| 641 |  |  |  |  |  |  |  | 
| 642 | 88 |  |  |  |  | 207 | return $future; | 
| 643 |  |  |  |  |  |  | } | 
| 644 |  |  |  |  |  |  |  | 
| 645 |  |  |  |  |  |  | sub _dispatch_pending | 
| 646 |  |  |  |  |  |  | { | 
| 647 | 88 |  |  | 88 |  | 123 | my $self = shift; | 
| 648 |  |  |  |  |  |  |  | 
| 649 | 88 |  |  |  |  | 122 | while( my $next = shift @{ $self->{pending_queue} } ) { | 
|  | 89 |  |  |  |  | 464 |  | 
| 650 | 23 | 50 |  |  |  | 130 | my $worker = $self->_get_worker or return; | 
| 651 |  |  |  |  |  |  |  | 
| 652 | 23 |  |  |  |  | 83 | my $f = $next->f; | 
| 653 |  |  |  |  |  |  |  | 
| 654 | 23 | 100 |  |  |  | 196 | next if $f->is_cancelled; | 
| 655 |  |  |  |  |  |  |  | 
| 656 | 22 |  |  |  |  | 158 | $self->debug_printf( "UNQUEUE" ); | 
| 657 | 22 |  |  |  |  | 124 | $f->done( $self, $worker ); | 
| 658 | 22 |  |  |  |  | 790 | return; | 
| 659 |  |  |  |  |  |  | } | 
| 660 |  |  |  |  |  |  |  | 
| 661 | 66 | 100 |  |  |  | 159 | if( $self->workers_idle > $self->{min_workers} ) { | 
| 662 | 19 | 100 | 66 |  |  | 212 | $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running; | 
| 663 |  |  |  |  |  |  | } | 
| 664 |  |  |  |  |  |  | } | 
| 665 |  |  |  |  |  |  |  | 
| 666 |  |  |  |  |  |  | package # hide from indexer | 
| 667 |  |  |  |  |  |  | IO::Async::Function::Worker; | 
| 668 |  |  |  |  |  |  |  | 
| 669 | 10 |  |  | 10 |  | 17774 | use base qw( IO::Async::Routine ); | 
|  | 10 |  |  |  |  | 21 |  | 
|  | 10 |  |  |  |  | 4806 |  | 
| 670 |  |  |  |  |  |  |  | 
| 671 | 10 |  |  | 10 |  | 59 | use Carp; | 
|  | 10 |  |  |  |  | 18 |  | 
|  | 10 |  |  |  |  | 428 |  | 
| 672 |  |  |  |  |  |  |  | 
| 673 | 10 |  |  | 10 |  | 4402 | use IO::Async::Channel; | 
|  | 10 |  |  |  |  | 22 |  | 
|  | 10 |  |  |  |  | 259 |  | 
| 674 |  |  |  |  |  |  |  | 
| 675 | 10 |  |  | 10 |  | 4177 | use IO::Async::Internals::FunctionWorker; | 
|  | 10 |  |  |  |  | 25 |  | 
|  | 10 |  |  |  |  | 6580 |  | 
| 676 |  |  |  |  |  |  |  | 
| 677 |  |  |  |  |  |  | sub new | 
| 678 |  |  |  |  |  |  | { | 
| 679 | 55 |  |  | 55 |  | 161 | my $class = shift; | 
| 680 | 55 |  |  |  |  | 520 | my %params = @_; | 
| 681 |  |  |  |  |  |  |  | 
| 682 | 55 |  |  |  |  | 302 | my $arg_channel = IO::Async::Channel->new; | 
| 683 | 55 |  |  |  |  | 147 | my $ret_channel = IO::Async::Channel->new; | 
| 684 |  |  |  |  |  |  |  | 
| 685 | 55 |  |  |  |  | 66 | my $send_initial; | 
| 686 |  |  |  |  |  |  |  | 
| 687 | 55 | 100 |  |  |  | 238 | if( defined( my $code = $params{code} ) ) { | 
|  |  | 50 |  |  |  |  |  | 
| 688 | 49 |  |  |  |  | 78 | my $init_code = $params{init_code}; | 
| 689 |  |  |  |  |  |  |  | 
| 690 |  |  |  |  |  |  | $params{code} = sub { | 
| 691 | 1 | 50 |  | 1 |  | 20 | $init_code->() if defined $init_code; | 
| 692 |  |  |  |  |  |  |  | 
| 693 | 1 |  |  |  |  | 19 | IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel ); | 
| 694 | 49 |  |  |  |  | 493 | }; | 
| 695 |  |  |  |  |  |  | } | 
| 696 |  |  |  |  |  |  | elsif( defined( my $func = $params{func} ) ) { | 
| 697 | 6 |  |  |  |  | 30 | my $module    = $params{module}; | 
| 698 | 6 |  |  |  |  | 18 | my $init_func = $params{init_func}; | 
| 699 | 6 |  |  |  |  | 15 | my @init_args; | 
| 700 |  |  |  |  |  |  |  | 
| 701 | 6 |  |  |  |  | 38 | $params{module} = "IO::Async::Internals::FunctionWorker"; | 
| 702 | 6 |  |  |  |  | 18 | $params{func}   = "run_worker"; | 
| 703 |  |  |  |  |  |  |  | 
| 704 | 6 | 50 |  |  |  | 28 | ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY"; | 
| 705 |  |  |  |  |  |  |  | 
| 706 | 6 |  |  |  |  | 30 | $send_initial = [ $module, $func, $init_func, @init_args ]; | 
| 707 |  |  |  |  |  |  | } | 
| 708 |  |  |  |  |  |  |  | 
| 709 | 55 |  |  |  |  | 229 | delete @params{qw( init_code init_func )}; | 
| 710 |  |  |  |  |  |  |  | 
| 711 | 55 |  |  |  |  | 355 | my $worker = $class->SUPER::new( | 
| 712 |  |  |  |  |  |  | %params, | 
| 713 |  |  |  |  |  |  | channels_in  => [ $arg_channel ], | 
| 714 |  |  |  |  |  |  | channels_out => [ $ret_channel ], | 
| 715 |  |  |  |  |  |  | ); | 
| 716 |  |  |  |  |  |  |  | 
| 717 | 55 |  |  |  |  | 139 | $worker->{arg_channel} = $arg_channel; | 
| 718 | 55 |  |  |  |  | 105 | $worker->{ret_channel} = $ret_channel; | 
| 719 |  |  |  |  |  |  |  | 
| 720 | 55 | 100 |  |  |  | 114 | $worker->{send_initial} = $send_initial if $send_initial; | 
| 721 |  |  |  |  |  |  |  | 
| 722 | 55 |  |  |  |  | 151 | return $worker; | 
| 723 |  |  |  |  |  |  | } | 
| 724 |  |  |  |  |  |  |  | 
| 725 |  |  |  |  |  |  | sub _add_to_loop | 
| 726 |  |  |  |  |  |  | { | 
| 727 | 55 |  |  | 55 |  | 83 | my $self = shift; | 
| 728 | 55 |  |  |  |  | 191 | $self->SUPER::_add_to_loop( @_ ); | 
| 729 |  |  |  |  |  |  |  | 
| 730 | 53 | 100 |  |  |  | 4439 | $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial}; | 
| 731 |  |  |  |  |  |  | } | 
| 732 |  |  |  |  |  |  |  | 
| 733 |  |  |  |  |  |  | sub configure | 
| 734 |  |  |  |  |  |  | { | 
| 735 | 55 |  |  | 55 |  | 91 | my $self = shift; | 
| 736 | 55 |  |  |  |  | 219 | my %params = @_; | 
| 737 |  |  |  |  |  |  |  | 
| 738 | 55 |  | 33 |  |  | 348 | exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls ); | 
| 739 |  |  |  |  |  |  |  | 
| 740 | 55 |  |  |  |  | 420 | $self->SUPER::configure( %params ); | 
| 741 |  |  |  |  |  |  | } | 
| 742 |  |  |  |  |  |  |  | 
| 743 |  |  |  |  |  |  | sub stop | 
| 744 |  |  |  |  |  |  | { | 
| 745 | 48 |  |  | 48 |  | 109 | my $worker = shift; | 
| 746 | 48 |  |  |  |  | 192 | $worker->{arg_channel}->close; | 
| 747 |  |  |  |  |  |  |  | 
| 748 | 48 |  |  |  |  | 64 | my $ret; | 
| 749 | 48 | 100 |  |  |  | 155 | $ret = $worker->result_future if defined wantarray; | 
| 750 |  |  |  |  |  |  |  | 
| 751 | 48 | 50 |  |  |  | 134 | if( my $function = $worker->parent ) { | 
| 752 | 48 |  |  |  |  | 155 | delete $function->{workers}{$worker->id}; | 
| 753 |  |  |  |  |  |  |  | 
| 754 | 48 | 100 |  |  |  | 136 | if( $worker->{busy} ) { | 
| 755 | 10 |  |  |  |  | 93 | $worker->{remove_on_idle}++; | 
| 756 |  |  |  |  |  |  | } | 
| 757 |  |  |  |  |  |  | else { | 
| 758 | 38 |  |  |  |  | 84 | $function->remove_child( $worker ); | 
| 759 |  |  |  |  |  |  | } | 
| 760 |  |  |  |  |  |  | } | 
| 761 |  |  |  |  |  |  |  | 
| 762 | 48 |  |  |  |  | 177 | return $ret; | 
| 763 |  |  |  |  |  |  | } | 
| 764 |  |  |  |  |  |  |  | 
| 765 |  |  |  |  |  |  | sub call | 
| 766 |  |  |  |  |  |  | { | 
| 767 | 88 |  |  | 88 |  | 142 | my $worker = shift; | 
| 768 | 88 |  |  |  |  | 159 | my ( $args ) = @_; | 
| 769 |  |  |  |  |  |  |  | 
| 770 | 88 |  |  |  |  | 414 | $worker->{arg_channel}->send_encoded( $args ); | 
| 771 |  |  |  |  |  |  |  | 
| 772 | 88 |  |  |  |  | 513 | $worker->{busy} = 1; | 
| 773 | 88 |  |  |  |  | 186 | $worker->{max_calls}--; | 
| 774 |  |  |  |  |  |  |  | 
| 775 |  |  |  |  |  |  | return $worker->{ret_channel}->recv->then( | 
| 776 |  |  |  |  |  |  | # on recv | 
| 777 |  |  |  |  |  |  | $worker->_capture_weakself( sub { | 
| 778 | 85 |  |  | 85 |  | 164 | my ( $worker, $result ) = @_; | 
| 779 | 85 |  |  |  |  | 196 | my ( $type, @values ) = @$result; | 
| 780 |  |  |  |  |  |  |  | 
| 781 |  |  |  |  |  |  | $worker->stop if !$worker->{max_calls} or | 
| 782 | 85 | 100 | 66 |  |  | 435 | $worker->{exit_on_die} && $type eq "e"; | 
|  |  |  | 100 |  |  |  |  | 
| 783 |  |  |  |  |  |  |  | 
| 784 | 85 | 100 |  |  |  | 198 | if( $type eq "r" ) { | 
|  |  | 50 |  |  |  |  |  | 
| 785 | 66 |  |  |  |  | 387 | return Future->done( @values ); | 
| 786 |  |  |  |  |  |  | } | 
| 787 |  |  |  |  |  |  | elsif( $type eq "e" ) { | 
| 788 | 19 |  |  |  |  | 122 | return Future->fail( @values ); | 
| 789 |  |  |  |  |  |  | } | 
| 790 |  |  |  |  |  |  | else { | 
| 791 | 0 |  |  |  |  | 0 | die "Unrecognised type from worker - $type\n"; | 
| 792 |  |  |  |  |  |  | } | 
| 793 |  |  |  |  |  |  | } ), | 
| 794 |  |  |  |  |  |  | # on EOF | 
| 795 |  |  |  |  |  |  | $worker->_capture_weakself( sub { | 
| 796 | 2 |  |  | 2 |  | 16 | my ( $worker ) = @_; | 
| 797 |  |  |  |  |  |  |  | 
| 798 | 2 |  |  |  |  | 28 | $worker->stop; | 
| 799 |  |  |  |  |  |  |  | 
| 800 | 2 |  |  |  |  | 14 | return Future->fail( "closed", "closed" ); | 
| 801 |  |  |  |  |  |  | } ) | 
| 802 |  |  |  |  |  |  | )->on_ready( $worker->_capture_weakself( sub { | 
| 803 | 88 |  |  | 88 |  | 179 | my ( $worker, $f ) = @_; | 
| 804 | 88 |  |  |  |  | 158 | $worker->{busy} = 0; | 
| 805 |  |  |  |  |  |  |  | 
| 806 | 88 |  |  |  |  | 211 | my $function = $worker->parent; | 
| 807 | 88 | 50 |  |  |  | 327 | $function->_dispatch_pending if $function; | 
| 808 |  |  |  |  |  |  |  | 
| 809 | 88 | 100 | 66 |  |  | 811 | $function->remove_child( $worker ) if $function and $worker->{remove_on_idle}; | 
| 810 | 88 |  |  |  |  | 258 | })); | 
| 811 |  |  |  |  |  |  | } | 
| 812 |  |  |  |  |  |  |  | 
| 813 |  |  |  |  |  |  | =head1 EXAMPLES | 
| 814 |  |  |  |  |  |  |  | 
| 815 |  |  |  |  |  |  | =head2 Extended Error Information on Failure | 
| 816 |  |  |  |  |  |  |  | 
| 817 |  |  |  |  |  |  | The array-unpacking form of exception indication allows the function body to | 
| 818 |  |  |  |  |  |  | more precisely control the resulting failure from the C<call> future. | 
| 819 |  |  |  |  |  |  |  | 
| 820 |  |  |  |  |  |  | my $divider = IO::Async::Function->new( | 
| 821 |  |  |  |  |  |  | code => sub { | 
| 822 |  |  |  |  |  |  | my ( $numerator, $divisor ) = @_; | 
| 823 |  |  |  |  |  |  | $divisor == 0 and | 
| 824 |  |  |  |  |  |  | die [ "Cannot divide by zero", div_zero => $numerator, $divisor ]; | 
| 825 |  |  |  |  |  |  |  | 
| 826 |  |  |  |  |  |  | return $numerator / $divisor; | 
| 827 |  |  |  |  |  |  | } | 
| 828 |  |  |  |  |  |  | ); | 
| 829 |  |  |  |  |  |  |  | 
| 830 |  |  |  |  |  |  | =head1 NOTES | 
| 831 |  |  |  |  |  |  |  | 
| 832 |  |  |  |  |  |  | For the record, 123454321 is 11111 * 11111, a square number, and therefore not | 
| 833 |  |  |  |  |  |  | prime. | 
| 834 |  |  |  |  |  |  |  | 
| 835 |  |  |  |  |  |  | =head1 AUTHOR | 
| 836 |  |  |  |  |  |  |  | 
| 837 |  |  |  |  |  |  | Paul Evans | 
| 838 |  |  |  |  |  |  |  | 
| 839 |  |  |  |  |  |  | =cut | 
| 840 |  |  |  |  |  |  |  | 
| 841 |  |  |  |  |  |  | 0x55AA; |