File Coverage

blib/lib/Parallel/TaskExecutor.pm
Criterion Covered Total %
statement 144 150 96.0
branch 24 32 75.0
condition 6 9 66.6
subroutine 23 24 95.8
pod 7 8 87.5
total 204 223 91.4


line stmt bran cond sub pod time code
1             package Parallel::TaskExecutor;
2              
3 54     54   1118495 use strict;
  54         90  
  54         1837  
4 54     54   217 use warnings;
  54         107  
  54         2366  
5 54     54   381 use utf8;
  54         77  
  54         425  
6              
7 54     54   31999 use Data::Dumper;
  54         485379  
  54         4418  
8 54     54   24268 use English;
  54         170886  
  54         333  
9 54     54   24077 use Exporter 'import';
  54         125  
  54         1723  
10 54     54   30164 use Hash::Util 'lock_keys';
  54         189486  
  54         323  
11 54     54   7291 use IO::Pipe;
  54         32897  
  54         1752  
12 54     54   30924 use Log::Any::Simple ':default';
  54         1257462  
  54         342  
13 54     54   177303 use Parallel::TaskExecutor::Task;
  54         255  
  54         2470  
14 54     54   432 use Readonly;
  54         109  
  54         3728  
15 54     54   325 use Scalar::Util 'weaken';
  54         106  
  54         2665  
16 54     54   32437 use Time::HiRes 'usleep';
  54         79264  
  54         364  
17              
18             our @EXPORT_OK = qw(default_executor);
19             our %EXPORT_TAGS = (all => \@EXPORT_OK);
20              
21             our @CARP_NOT = 'Parallel::TaskExecutor::Task';
22              
23             our $VERSION = '0.05'; # Remember to change it in Task.pm too.
24              
25             =pod
26              
27             =encoding utf8
28              
29             =head1 NAME
30              
31             Parallel::TaskExecutor
32              
33             =head1 SYNOPSIS
34              
35             Cross-platform executor for parallel tasks executed in forked processes.
36              
37             my $executor = Parallel::TaskExecutor->new();
38             my $task = $executor->run(sub { return 'foo' });
39             $task->wait();
40             is($task->data(), 'foo');
41              
42             =head1 DESCRIPTION
43              
44             This module provides a simple interface to run Perl code in forked processes and
45             receive the result of their processing. This is quite similar to
46             L with a different OO approach, more centered on the task
47             object that can be seen as a very lightweight promise.
48              
49             Note that this module uses L for its logging. So you can use
50             any L to consume its log. For example, put the following in
51             your main application file:
52              
53             use Log::Any::Adapter ('Stderr');
54              
55             In addition, when testing a module that uses B, if
56             you’re using L, you should add the following line at the beginning of
57             each of your tests to initialize the multi-process feature of the test
58             framework:
59              
60             use Test2::IPC;
61              
62             =head1 METHODS
63              
64             =head2 constructor
65              
66             my $executor = Parallel::TaskExecutor->new(%options);
67              
68             Create a new executor. The main possible option is:
69              
70             =over 4
71              
72             =item *
73              
74             B (default = 4): how many different sub-processes
75             can be created in total by this object instance.
76              
77             =back
78              
79             But all the options that can be passed to run() can also be passed to new() and
80             they will apply to all the calls to this object (unless overridden in a specific
81             call to run()).
82              
83             =cut
84              
85             Readonly::Scalar my $default_max_parallel_tasks => 4;
86              
87             sub new {
88 169     169 0 18632635 my ($class, %options) = @_;
89             my $this = bless {
90 169   66     2972 max_parallel_tasks => $options{max_parallel_tasks} // $default_max_parallel_tasks,
91             options => \%options,
92             current_tasks => 0,
93             # Stores a weak reference to all the non-done tasks that have another
94             # reference held by the user.
95             tasks => {},
96             # Stores a non-weak reference to all the non-done tasks that would have went
97             # out of scope otherwise.
98             zombies => {},
99             pid => $PID,
100             }, $class;
101 169         429 lock_keys(%{$this});
  169         1666  
102 169         3795 return $this;
103             }
104              
105             =pod
106              
107             =head2 destructor
108              
109             When a B goes out of scope, its destructor will wait
110             for all the tasks that it started and for which the returned task object is not
111             live. This is a complement to the destructor of L
112             which waits for a task to be done if its parent executor is no longer live.
113              
114             =cut
115              
116             sub DESTROY {
117 115     115   7392 my ($this) = @_;
118             # TODO: consider if this is the correct thing to do or if we should instead
119             # wait for the task here.
120 115 50       2111 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
121 115 100       27807 return unless $PID == $this->{pid};
122 71         420 while (my (undef, $c) = each %{$this->{zombies}}) {
  76         835  
123             # TODO: add an option to abandon the children (but they must be awaited by
124             # someone).
125 5         148 $c->wait();
126             }
127 71         3762 return;
128             }
129              
130             =pod
131              
132             =head2 default_executor()
133              
134             my $executor = default_executor();
135              
136             Returns a default B object with an unspecified
137             parallelism (guaranteed to be more than 1 parallel tasks).
138              
139             =cut
140              
141             my $default_executor = Parallel::TaskExecutor->new(max_parallel_tasks => 10);
142              
143             sub default_executor {
144 6     6 1 1265138 return $default_executor;
145             }
146              
147             my $task_count = 0;
148              
149             # This is a very conservative estimates. On modern system the limit is 64kB.
150             Readonly::Scalar my $default_response_channel_buffer_size => 4096;
151              
152             sub _fork_and_run {
153 565     565   5530 my ($this, $sub, %options) = @_;
154 565         15409 my $miso = IO::Pipe->new(); # From the child to the parent.
155 565         114833 my $task_id = $task_count++;
156 565         5949 trace("Will fork for task ${task_id}");
157 565         1456535 my $pid = fork();
158 565 50       37463 fatal('Cannot fork a sub-process') unless defined $pid;
159 565 100       8412 $this->{current_tasks}++ unless $options{untracked};
160              
161 565 100       6063 if ($pid == 0) {
162             # In the child task
163             # TODO: the code here should be moved to the Task class. It would be clearer
164             # and probably allow a better separation of the properties of the Task class
165             # between those used by the executor or those used by the task.
166 46         9593 $miso->writer();
167 46         16596 trace("Starting child task (id == ${task_id}) in process ${PID}");
168              
169 46 100       4160 if (exists $options{SIG}) {
170 2         17 while (my ($k, $v) = each %{$options{SIG}}) {
  4         129  
171 2         108 $SIG{$k} = $v; ## no critic (RequireLocalizedPunctuationVars)
172             }
173             }
174              
175 46         3812 print $miso "ready\n";
176 46         11055 $miso->flush();
177              
178 46         327 my @out;
179 46         1729 trace("Starting user code in child task (id == ${task_id}) in process ${PID}");
180 46 100       1591 if ($options{scalar}) {
181 2         73 @out = scalar($sub->());
182             } else {
183 44         2231 @out = $sub->();
184             }
185 43         101172 trace("Serializing task result in child task (id == ${task_id}) in process ${PID}");
186 43         718 my $serialized_out;
187             {
188 43         715 local $Data::Dumper::Indent = 0;
  43         1581  
189 43         4149 local $Data::Dumper::Purity = 1;
190 43         2086 local $Data::Dumper::Sparseseen = 1;
191 43         2368 local $Data::Dumper::Varname = 'TASKEXECUTORVAR';
192 43         2340 $serialized_out = Dumper(\@out);
193             }
194 43         25763 trace("Emitting task result in child task (id == ${task_id}) in process ${PID}");
195 43         872 my $size = length($serialized_out);
196 43         402 my $max_size = $default_response_channel_buffer_size;
197 43 50       812 warning("Data returned by process ${PID} for task ${task_id} is too large (%dB)", $size)
198             if $size > $max_size;
199             # Nothing will be read before the process terminate, so the data
200 43         655 print $miso scalar($serialized_out);
201 43         949 trace("Done sending result in child task (id == ${task_id}) in process ${PID}");
202 43 50       2516 close $miso or warning("Can’t close writer side of child task miso channel: ${ERRNO}");
203 43         1011 trace("Exiting child task (id == ${task_id}) in process ${PID}");
204 43         9692 exit 0;
205             }
206              
207             # Still in the parent task
208 519         224936 trace("Started child task (id == ${task_id}) with pid == ${pid}");
209 519         72606 $miso->reader();
210             my $task = Parallel::TaskExecutor::Task->new(
211             untracked => $options{untracked},
212             task_id => $task_id,
213             runner => $this,
214             state => 'running',
215             channel => $miso,
216             pid => $pid,
217             parent => $PID,
218 519         679164 catch_error => $options{catch_error},);
219 519         8104 weaken($task->{runner});
220 519         12641 $this->{tasks}{$task} = $task;
221 519         38906 weaken($this->{tasks}{$task});
222              
223 519         1099805 my $ready = <$miso>;
224 519 50       3652 fatal(
225             "Got unexpected data during ready check of child task (id == ${task_id}) with pid == ${pid}: $ready"
226             ) unless $ready eq "ready\n";
227              
228 519 100       2981 if ($options{wait}) {
229 15         709 trace("Waiting for child $pid to exit (task id == ${task_id})");
230 15         764 $task->wait();
231 15         87 trace("OK, child $pid exited (task id == ${task_id})");
232             }
233 519         59950 return $task;
234             }
235              
236             =pod
237              
238             =head2 run()
239              
240             my $task = $executor->run($sub, %options);
241              
242             Fork a new child process and use it to execute the given I<$sub>. The execution
243             can be tracked using the returned I<$task> object of type
244             L.
245              
246             If there are already B tasks running, then the call will
247             block until the count of running tasks goes below that limit.
248              
249             The possible options are the following:
250              
251             =over 4
252              
253             =item *
254              
255             B (hash-reference): if provided, this specifies a set of signal
256             handlers to be set in the child process. These signal handler are installed
257             before the provided I<$sub> is called and before the call to run() returns.
258              
259             =item *
260              
261             B: if set to a true value, the call to run will wait for the task
262             to be complete before returning (this means that C<$task->done()> will always be
263             true when you get the task).
264              
265             =item *
266              
267             B: by default, a failure of a child task will abort the parent
268             process. If this option is set to true, the failure will be reported by the task
269             instead.
270              
271             =item *
272              
273             B: when set to true, the I<$sub> is called in scalar context. Otherwise
274             it is called in list context.
275              
276             =item *
277              
278             B: if set to true, the task will be run immediately, even if this means
279             exceeding the value for the B passed to the constructor.
280             Note however that the task will still increase by one the number of running
281             tasks tracked by the executor (unless B is also set to true).
282              
283             =item *
284              
285             B: if set to true, the task will not increase the number of running
286             task counted by the executor. However, the call to run() might still be blocked
287             if the number of outstanding tasks exceeds B (unless
288             B is set to true too).
289              
290             =back
291              
292             =cut
293              
294             Readonly::Scalar my $busy_loop_wait_time_us => 1000;
295              
296             sub run {
297 547     547 1 21365 my ($this, $sub, %options) = @_;
298 547         6450 %options = (%{$this->{options}}, %options);
  547         11821  
299             # TODO: add an option to always call _remove_done_tasks here, to cleanup.
300 547   66     26782 while (!$options{forced} && $this->{current_tasks} >= $this->{max_parallel_tasks}) {
301 55537         373208 $this->_remove_done_tasks();
302 55537         87808807 usleep($busy_loop_wait_time_us);
303             }
304 547         13800 return $this->_fork_and_run($sub, %options);
305             }
306              
307             =pod
308              
309             =head2 run_now()
310              
311             my $data = $executor->run_now($sub, %options);
312              
313             Runs the given I<$sub> in a forked process and waits for its result. This never
314             blocks (the I<$sub> is run even if the executor max parallelism is already
315             reached) and this does not increase the counted parallelism of the executor
316             either (in effect the B, B, and B options are set to
317             true).
318              
319             In addition, the B option is set to true if this method is called in
320             scalar context, unless that option was explicitly passed to the run_now() call.
321              
322             =cut
323              
324             sub run_now {
325 18     18 1 61 my ($this, $sub, %options) = @_;
326 18 100 66     205 $options{scalar} = 1 unless exists $options{scalar} || wantarray;
327 18         151 my $task = $this->_fork_and_run($sub, %options, untracked => 1, wait => 1);
328 15         159 $task->wait();
329 15         282 return $task->data();
330             }
331              
332             =pod
333              
334             =head2 wait()
335              
336             $executor->wait();
337              
338             Waits for all the outstanding tasks to terminate. This waits for all the tasks
339             independently of whether their L object is still
340             live.
341              
342             =cut
343              
344             sub wait { ## no critic (ProhibitBuiltinHomonyms)
345 3     3 1 143 my ($this) = @_;
346 3         13 my $nb_children = $this->{current_tasks};
347 3 50       11 return unless $nb_children;
348 3         97 debug("Waiting for ${nb_children} running tasks...");
349 3         86 while (my (undef, $c) = each %{$this->{zombies}}) {
  5         36  
350             # $c is never weak here and wait() will also not delete from this hash
351             # itself
352 2         68 $c->wait();
353 2         24 delete $this->{zombies}{$c}; # $c is both the key and the value.
354             }
355 3         6 while (my (undef, $c) = each %{$this->{tasks}}) {
  4         125  
356             # $c can be a weak reference, but it should never be undef because the task
357             # will remove itself from this hash when it’s done (and the reference is
358             # unweakened when it’s the last reference to the task).
359             # $c->wait() will delete this entry from the hash, but this is legal when
360             # looping with each.
361 1         63 $c->wait();
362             }
363 3         18 return;
364             }
365              
366             =pod
367              
368             =head2 set_max_parallel_tasks()
369              
370             $executor->set_max_parallel_tasks(N)
371              
372             Sets the B option of the executor.
373              
374             =cut
375              
376             sub set_max_parallel_tasks {
377 0     0 1 0 my ($this, $max_parallel_tasks) = @_;
378 0         0 $this->{max_parallel_tasks} = $max_parallel_tasks;
379 0         0 return;
380             }
381              
382             sub _remove_done_tasks {
383 55537     55537   151005 my ($this) = @_;
384 55537         106092 my $done = 0;
385 55537         112002 while (my (undef, $c) = each %{$this->{zombies}}) {
  55537         269920  
386 0 0       0 if ($c->_try_wait()) {
387 0         0 delete $this->{zombies}{$c};
388 0         0 $done += 1;
389             }
390             }
391 55537         141501 while (my (undef, $c) = each %{$this->{tasks}}) {
  205563         771858  
392             # See the comment in wait()
393 150026 100       525875 $done += $c->_try_wait() ? 1 : 0;
394             }
395 55537 100       133488 debug("Removed ${done} done tasks") if $done;
396 55537         118736 return;
397             }
398              
399             =pod
400              
401             =head2 signal_all()
402              
403             $executor->signal_all('HUP');
404              
405             Sends the given signal all the tasks. See L
406             for more details.
407              
408             =cut
409              
410             sub signal_all {
411 1     1 1 14 my ($this, $signal) = @_;
412             # See the comment in wait() for why $c can never be undef although it is a weak
413             # reference.
414 1         3 kill $signal, map { $_->pid() } values %{$this->{tasks}}, values %{$this->{zombies}};
  2         38  
  1         15  
  1         23  
415 1         21 return;
416             }
417              
418             =pod
419              
420             =head2 kill_all()
421              
422             $executor->kill_all();
423              
424             Same as L but sends the C signal by default.
425             You can still pass a specific signal if you want.
426              
427             =cut
428              
429             sub kill_all {
430 1     1 1 75 my ($this, $signal) = (@_, 'KILL');
431 1         29 return $this->signal_all($signal);
432             }
433              
434             1;
435              
436             =pod
437              
438             =head1 CAVEATS AND TODOS
439              
440             =over 4
441              
442             =item *
443              
444             The data returned by a child task can only have a limited size (4kB as of
445             writing this). In a future release, we may switch to using temporary files to
446             pass the result when this limit is reached.
447              
448             =item *
449              
450             There is currently no support to setup uni or bi-directional communication
451             channel with the child task. This must be done manually by the user.
452              
453             =back
454              
455             =head1 AUTHOR
456              
457             This program has been written by L.
458              
459             =head1 LICENSE
460              
461             Copyright 2024 Mathias Kende
462              
463             This program is distributed under the MIT (X11) License:
464             L
465              
466             Permission is hereby granted, free of charge, to any person
467             obtaining a copy of this software and associated documentation
468             files (the "Software"), to deal in the Software without
469             restriction, including without limitation the rights to use,
470             copy, modify, merge, publish, distribute, sublicense, and/or sell
471             copies of the Software, and to permit persons to whom the
472             Software is furnished to do so, subject to the following
473             conditions:
474              
475             The above copyright notice and this permission notice shall be
476             included in all copies or substantial portions of the Software.
477              
478             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
479             EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
480             OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
481             NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
482             HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
483             WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
484             FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
485             OTHER DEALINGS IN THE SOFTWARE.
486              
487             =head1 SEE ALSO
488              
489             =over 4
490              
491             =item L
492              
493             =item L
494              
495             =item L
496              
497             =item L
498              
499             =back
500              
501             =cut