File Coverage

blib/lib/Parallel/TaskExecutor/Task.pm
Criterion Covered Total %
statement 106 106 100.0
branch 40 48 83.3
condition 2 3 66.6
subroutine 22 22 100.0
pod 7 9 77.7
total 177 188 94.1


line stmt bran cond sub pod time code
1             package Parallel::TaskExecutor::Task;
2              
3 54     54   347 use strict;
  54         109  
  54         1878  
4 54     54   252 use warnings;
  54         77  
  54         2338  
5 54     54   230 use utf8;
  54         76  
  54         435  
6              
7 54     54   1515 use English;
  54         110  
  54         529  
8 54     54   24911 use Hash::Util 'lock_keys';
  54         102  
  54         669  
9 54     54   3955 use Log::Any::Simple ':default';
  54         86  
  54         291  
10 54     54   50175 use POSIX ':sys_wait_h';
  54         533493  
  54         398  
11 54     54   88857 use Scalar::Util 'unweaken';
  54         75  
  54         66469  
12              
13             our $VERSION = '0.05'; # Remember to change it in TaskExecutor.pm too.
14              
15             =pod
16              
17             =encoding utf8
18              
19             =head1 NAME
20              
21             Parallel::TaskExecutor::Tasks
22              
23             =head1 SYNOPSIS
24              
25             A simple task (or promise) class for the L package.
26              
27             my $executor = Parallel::TaskExecutor->new();
28             my $task = $executor->run(sub { return 'foo' });
29             $task->wait();
30             is($task->data(), 'foo');
31              
32             =head1 DESCRIPTION
33              
34             The tasks that this class exposes are lightweight promises that can be used to
35             wait for the end of the parallel processing and read the result of that
36             processing.
37              
38             =head1 METHODS
39              
40             =head2 constructor
41              
42             The constructor of this class is private, it can only be built by
43             L through a call to L.
44              
45             =cut
46              
47             sub new {
48 519     519 0 102521 my ($class, %data) = @_;
49             # %data can be anything that is needed by Parallel::TaskExecutor. However the
50             # following values are used by Parallel::TaskExecutor::Task too:
51             # - state: one of new, running, done
52             # - pid: the PID of the task
53             # - parent: the PID of the parent process. We don’t do anything if we’re
54             # called in a different process.
55             # - task_id: arbitrary identifier for the task
56             # - runner: Parallel::TaskExecutor runner for this task, kept as a weak
57             # reference
58             # - untracked: don’t count this task toward the task limit of its runner
59             # - catch_error: if false, a failed task will abort the parent.
60             # - channel: may be set to read the data produced by the task
61             # - data: will contain the data read from the channel.
62 519         558027 my $this = bless {%data, data => undef, error => undef}, $class;
63 519         2575 lock_keys(%{$this});
  519         207948  
64 519         177748 return $this;
65             }
66              
67             =pod
68              
69             =head2 destructor
70              
71             The destructor of a B object will block until the
72             task is done if you no longer keep a reference to its parent
73             L object. If the parent executor is still live, then
74             that object will be responsible to wait for the end of the task (either through
75             an explicit call to L or in its destructor).
76              
77             =cut
78              
79             sub DESTROY {
80 526     526   163226 my ($this) = @_;
81             # TODO: consider if this is the correct thing to do or if we should instead
82             # wait for the task here.
83 526 50       3637 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
84 526 100       17616 return unless $PID == $this->{parent};
85             # TODO: provide a system to not wait here, but defer that to the deletion of
86             # the runner.
87 291 100       1154 if ($this->running()) {
88 17 100       200 if ($this->{runner}) {
89 7         167 trace("Deferring reaping of task $this->{task_id}");
90             # We could unweaken the entry in the tasks hash, but it’s cleaner to have
91             # only weak objects there, and non-weak objects in zombies (otherwise we
92             # would need to rely on isweak in the TaskExecutor DESTROY method).
93 7         171 $this->{runner}{zombies}{$this} = $this;
94 7         64 delete $this->{runner}{tasks}{$this};
95             # Once we are a zombie, we can be deleted only once done, so this code path
96             # will not keep creating reference to the object.
97             } else {
98 10         445 $this->wait();
99             }
100             }
101 291         6168 return;
102             }
103              
104             =pod
105              
106             =head2 wait
107              
108             $task->wait();
109              
110             Blocks until the task is done. When this function returns, it is guaranteed that
111             the task is in the I state (that is, that done() will return true).
112              
113             Returns a true value if the child task succeeded. If the task failed and
114             B was set in the parent executor when the
115             task started, then this method will return a false value.
116              
117              
118             =cut
119              
120             sub wait { ## no critic (ProhibitBuiltinHomonyms)
121 299     299 1 19164 my ($this) = @_;
122 299 100       1176 return if $this->{state} eq 'done';
123 100         2789 trace("Starting blocking waitpid($this->{pid})");
124 100         7512 local ($ERRNO, $CHILD_ERROR) = (0, 0);
125 100         36376380 my $ret = waitpid($this->{pid}, 0);
126 100 50       1408 fatal("No children with pid $this->{pid} for task $this->{task_id}") if $ret == -1;
127             fatal(
128             "Incoherent PID returned by waitpid: actual $ret; expected $this->{pid} for task $this->{task_id}"
129 100 50       769 ) if $ret != $this->{pid};
130 100         1901 $this->_process_done();
131 99 100       1225 return $this->{error} ? 0 : 1;
132             }
133              
134             =pod
135              
136             =head2 data
137              
138             my @data = $task->data();
139             my $data = $task->data();
140              
141             Returns the result value of a finished task (produced by the code-reference that
142             was passed to the run() call of the executor). If called in list context,
143             returns all the produced value. If called in scalar context, returns only the
144             first value. Note that the code-reference itself in the task has been called in
145             a list context by default, unless the B option was passed to its
146             executor.
147              
148             It is an error to call this method on a task that is still running. So you must
149             be sure that the task is done before you call it (either through a call to
150             wait() or to done() for example). See also the get() method which combines a
151             call to wait() and to data().
152              
153             If the task failed and B was set in the parent executor when the
154             task started, then this method will die() with the child task error.
155              
156             =cut
157              
158             sub data {
159 256     256 1 19663 my ($this) = @_;
160 256 100       1585 fatal('Trying to read the data of a still running task') unless $this->done();
161 254 100       875 die $this->{error} if defined $this->{error}; ## no critic (RequireCarping)
162             # TODO: we should have a variant for undef wantarray that does not setup
163             # the whole pipe to get the return data.
164             # Note: wantarray here is not necessarily the same as when the task was set
165             # up, it is the responsibility of the caller to set the 'scalar' option
166             # correctly.
167 249 100       2171 return wantarray ? @{$this->{data}} : $this->{data}[0];
  4         92  
168             }
169              
170             =pod
171              
172             =head2 running
173              
174             print "Still running\n" if $task->running();
175              
176             Returns whether the task is still running.
177              
178             =cut
179              
180             sub running {
181 303     303 1 854 my ($this) = @_;
182 303 100       1358 $this->_try_wait() if $this->{state} eq 'running';
183 303         1307 return $this->{state} eq 'running';
184             }
185              
186             =pod
187              
188             =head2 done
189              
190             print "Done\n" if $task->done();
191              
192             Returns whether the task is done. This is guaranteed to always be the opposite
193             of done().
194              
195             =cut
196              
197             # This method is the opposite of running() because the task can only be in the
198             # state running or done once it has been returned to the caller.
199              
200             sub done {
201 256     256 1 621 my ($this) = @_;
202 256 100       939 $this->_try_wait() if $this->{state} eq 'running';
203 256         1048 return $this->{state} eq 'done';
204             }
205              
206             =pod
207              
208             =head2 get
209              
210             my $data = $task->get();
211              
212             Waits until the task is done and returns the result of the task. See the
213             documentation of the L and L methods for more
214             details, in particular regarding scalar and list context data.
215              
216             =cut
217              
218             sub get {
219 233     233 1 3057 my ($this) = @_;
220 233         1280 $this->wait();
221 233         1250 return $this->data();
222             }
223              
224             sub _try_wait {
225 150050     150050   300585 my ($this) = @_;
226 150050 50       401325 return if $this->{state} ne 'running';
227 150050         689900 trace("Starting non blocking waitpid($this->{pid})");
228 150050         3275489 local ($ERRNO, $CHILD_ERROR) = (0, 0);
229 150050         971691 my $pid = waitpid($this->{pid}, WNOHANG);
230 150050 100 66     858233 if ($pid > 0 || $pid < -1) { # Perl fake processes on Windows use negative PIDs.
231             # TODO: do the same validation on $pid than in the wait() method.
232 356         10345 $this->_process_done();
233 356         7257 return 1;
234             }
235 149694         883340 return;
236             }
237              
238             sub _process_done {
239 456     456   2211 my ($this) = @_;
240 456         3290 $this->{state} = 'done';
241 456 100       1989 if ($this->{runner}) {
242 433 100       2010 $this->{runner}{current_tasks}-- unless $this->{untracked};
243 433         3301 delete $this->{runner}{tasks}{$this}; # might not exist if we are a zombie, this is fine.
244             }
245 456 100       3219 if ($CHILD_ERROR) {
    50          
246 10 100       63 if ($this->{catch_error}) {
247 9         66 $this->{error} = "Child command failed: ${CHILD_ERROR}";
248             } else {
249             # Ideally, we should first wait for all child processes of all runners
250             # before dying, to print the dying message last.
251 1         69 error(
252             "Child process (pid == $this->{pid}, task_id == $this->{task_id}) failed (${CHILD_ERROR})");
253 1         149 exit 2;
254             }
255             } elsif ($this->{channel}) {
256 446         5771 local $INPUT_RECORD_SEPARATOR = undef;
257 446         1352 my $fh = $this->{channel};
258 446         14085 my $data = <$fh>;
259 446 50       14207 close $fh or warning("Cannot close task output channel: ${ERRNO}");
260             {
261 54     54   413 no strict; ## no critic (ProhibitNoStrict)
  54         75  
  54         3160  
  446         1212  
262 54     54   259 no warnings; ## no critic (ProhibitNoWarnings)
  54         126  
  54         22989  
263 446         86251 $this->{data} = eval $data; ## no critic (ProhibitStringyEval)
264             }
265             fatal(
266 446 50       4012 "Cannot parse the output of child task $this->{task_id} (pid == $this->{pid}): ${EVAL_ERROR}")
267             if $EVAL_ERROR;
268             }
269 455         10435 trace("Child pid == $this->{pid} returned (task id == $this->{task_id})");
270 455 100       13841 trace(" --> current tasks == $this->{runner}{current_tasks}") if $this->{runner};
271 455         5017 return;
272             }
273              
274             # Undocumented because there is too much risk that the behavior of the library
275             # would be broken if the user started doing weird thing with that.
276             sub pid {
277 5     5 0 29 my ($this) = @_;
278 5         4075 return $this->{pid};
279             }
280              
281             =pod
282              
283             =head2 signal
284              
285             $task->signal('HUP');
286              
287             Sends the given signal to the task. Signal can be anything accepted by the
288             L method, so either a signal name or a signal number. See
289             L for how to get the list of supported signals.
290              
291             Note that even if the signal kills the task you should still in general wait()
292             for it at some point. Also, unless the task gracefully handles the signal, you
293             will probably need to pass the C option to the run() call when the
294             task is started, otherwise your whole program will be aborted.
295              
296             =cut
297              
298             sub signal {
299 3     3 1 27 my ($this, $signal) = @_;
300 3 50       24 return if $this->{state} ne 'running';
301 3         124 trace("Sending signal ${signal} to process $this->{pid}");
302 3         224 kill $signal, $this->{pid};
303 3         25 return;
304             }
305              
306             =pod
307              
308             =head2 kill
309              
310             $task->kill();
311              
312             This is a synonym of L|/signal> but where the default argument is
313             C. You can still pass a different signal name if you want.
314              
315             =cut
316              
317             sub kill { ## no critic (Subroutines::ProhibitBuiltinHomonyms)
318 2     2 1 34 my ($this, $signal) = (@_, 'KILL');
319 2         20 return $this->signal($signal);
320             }
321              
322             1;