File Coverage

blib/lib/Async/Simple/Task/Fork.pm
Criterion Covered Total %
statement 56 75 74.6
branch 13 26 50.0
condition 2 10 20.0
subroutine 12 12 100.0
pod 6 6 100.0
total 89 129 68.9


line stmt bran cond sub pod time code
1             package Async::Simple::Task::Fork;
2              
3             =head1 NAME
4              
5             Async::Simple::Task::Fork - Forks child process.
6             It waits for "data" which will be passed via "put", and executes "sub" with this "data" as an argument.
7             Result of execution will be returned to parent by "get".
8              
9              
10             =head1 SYNOPSIS
11              
12             use Async::Simple::Task::Fork;
13              
14             my $sub = sub { sleep 1; return $_[0]{x} + 1 }; # Accepts $data as @_ and returns any type you need
15              
16             my $task = Async::Simple::Task::Fork->new( task => $sub ); # Creates a child process, which waits for data and executes $sub when data is passed
17              
18             my $data = { x => 123 }; # Any type you wish: scalar, array, hash
19              
20             $task->put( $data ); # Put a task data to sub in the child process
21              
22             # ...do something useful in parent while our data working ...
23              
24             my $result = $task->get; # result = undef because result is not ready yet
25             sleep 2; # or do something else....
26             $result = $task->get; # result = 124
27              
28             $task->put( $data ); # Put another data to task sub and so on,....
29              
30             Result and data can be of any type and depth that can be serialized via Data::Serializer->new( serializer => 'Storable' ), which is the default serializer.
31              
32             If your "sub" can return undef you should check $task->has_answer, as a mark that result is ready.
33              
34              
35             =head1 DESCRIPTION
36              
37             Initializes a forked child process.
38              
39             After that, executes "sub" for each "data" passed to child process.
40              
41              
42             =head1 METHODS
43              
44             =head2 C<new>
45              
46             Forks a process
47              
48             my $task = Async::Simple::Task::Fork->new( task => $sub, %other_optional_params );
49              
50             Params (all params except "task" are optional):
51              
52             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
53              
54             timeout => timeout in seconds between child checkings for new data passed. default 0.01
55              
56             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
57              
58              
59             =head2 C<put>
60              
61             Puts data to task.
62              
63             $self->put( $data );
64              
65              
66             =head2 C<get>
67              
68             Tries to read result from task.
69              
70             Returns undef if it is not ready.
71              
72             If your function can return undef, you should check $task->has_answer, as a mark of ready result.
73              
74             my $result = $self->get();
75              
76              
77             =head1 SUPPORT AND DOCUMENTATION
78              
79             After installing, you can find documentation for this module with the perldoc command.
80              
81             perldoc Async::Simple::Task::Fork
82              
83             You can also look for information at:
84              
85             RT, CPAN's request tracker (report bugs here)
86             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task-Fork
87              
88             AnnoCPAN, Annotated CPAN documentation
89             http://annocpan.org/dist/Async-Simple-Task-Fork
90              
91             CPAN Ratings
92             http://cpanratings.perl.org/d/Async-Simple-Task-Fork
93              
94             Search CPAN
95             http://search.cpan.org/dist/Async-Simple-Task-Fork/
96              
97              
98             =head1 AUTHOR
99              
100             ANTONC <antonc@cpan.org>
101              
102             =head1 LICENSE
103              
104             This program is free software; you can redistribute it and/or modify it
105             under the terms of the Artistic License (2.0). You may obtain a
106             copy of the full license at:
107              
108             L<http://www.perlfoundation.org/artistic_license_2_0>
109              
110             =cut
111              
112              
113 4     4   147355 use Modern::Perl;
  4         9  
  4         45  
114 4     4   1028 use Moose;
  4         512074  
  4         33  
115 4     4   35031 use namespace::autoclean;
  4         5766  
  4         38  
116 4     4   1916 use Data::Serializer;
  4         9176  
  4         176  
117 4     4   414 use Time::HiRes qw/ alarm sleep /;
  4         1038  
  4         38  
118              
119              
120              
121              
122             our $VERSION = '0.13';
123              
124              
125             extends 'Async::Simple::Task';
126              
127              
128             =head1 Attributes
129              
130             =head2 task
131              
132             task = sub {
133             my ( $data ) = @_; # source data for task
134             ... your task code ...
135             return( $result );
136             }
137              
138             =cut
139              
140             has task => (    # code reference that BUILD's child loop calls as $self->task->( $self->answer )
141             is => 'ro',
142             isa => 'CodeRef',
143             required => 1,    # must be passed to new()
144             );
145              
146              
147             =head2 answer
148              
149             Result of current task
150              
151             =cut
152              
153              
154             =head2 has_answer
155              
156             has_answer is true once the task has finished and its result is ready
157              
158             =cut
159              
160              
161             =head2 timeout
162              
163             timeout - positive numeric value = seconds between checking for result.
164              
165             inherited from Async::Simple::Task.
166              
167             =cut
168              
169              
170             =head2 kill_on_exit
171              
172             Kills the child process from the parent when the object is destroyed
173              
174             =cut
175              
176             has kill_on_exit => (    # read by DEMOLISH: when true, the parent sends KILL to the child pid
177             is => 'rw',
178             isa => 'Int',
179             default => 1,    # killing the child is the default
180             );
181              
182              
183             # Write end of a pipe between parent and child; put() prints packets to it.
184             # fork_child gives each process its own writer/reader pair, for duplex communication.
185             has writer => (
186             is => 'rw',
187             isa => 'FileHandle',
188             );
189              
190              
191             # Read end of a pipe between parent and child; get() reads packets from it.
192             # fork_child gives each process its own writer/reader pair, for duplex communication.
193             has reader => (
194             is => 'rw',
195             isa => 'FileHandle',
196             );
197              
198              
199             # Object that must have 2 methods: serialize + deserialize (these are what put() and get() call).
200             # Serialized data must be a single string value.
201             # By default (see get_serializer) serialization uses Data::Serializer with "Storable".
202             has serializer => (
203             is => 'ro',
204             isa => 'Any',
205             lazy => 1,
206             required => 1,
207             builder => 'get_serializer',
208             );
209              
210              
211             # In the child the value is always 0.
212             # In the parent it is the child's pid (Int value > 0).
213             has pid => (
214             is => 'ro',
215             isa => 'Int',
216             required => 1,
217             builder => 'fork_child',    # not lazy, so the fork is performed while the object is being constructed
218             );
219              
220              
221             =head2 new()
222              
223             my $task = Async::Simple::Task::Fork->new( %all_optional_params );
224              
225              
226             Possible keys for %all_optional_params:
227              
228             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
229              
230             timeout => timeout in seconds between child checkings for new data passed. default 0.01
231              
232             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
233              
234             =cut
235              
236              
237             =head2 BUILD
238              
239             internal. Some tricks here:)
240              
241             1. Master process called $task->new with fork() inside
242             2. After forking done we have two processes:
243             2.1. Master gets one side of reader/writer pipes and pid of child
244             2.2. Child - another side of pipes and extra logic with everlasting loop
245              
246             =cut
247              
248             sub BUILD {
249 197     197 1 764 my ( $self ) = @_;
250              
251             # The parent (non-zero pid) is finished here.
252             # Only the forked child (pid 0) continues into the service loop below.
253 197 50       11002 return $self if $self->pid;
254              
255             # Child service loop: poll for data from the parent, run the task, send the result back.
256 0         0 while ( 1 ) {
257 0         0 $self->clear_answer;
258 0         0 $self->get;    # stores a received packet via answer(); nothing is stored when no packet arrived
259              
260 0 0       0 unless ( $self->has_answer ) {
261 0         0 sleep $self->timeout;    # nothing received: wait before polling again
262 0         0 next;
263             }
264              
265 0         0 my $result = eval{ $self->task->( $self->answer ) };    # an exception from the task is trapped; $result is then undef
  0         0  
266 0         0 $self->clear_answer;
267 0   0     0 $self->put( $result // $@ // '' );    # sends the result, or the eval error text when the result is undef ('' if the task returned undef without dying)
      0        
268             }
269              
270 0         0 exit(0);    # not reached: the while (1) loop above has no exit path
271             };
272              
273              
274             =head2 fork_child
275              
276             Makes child process and returns pid of child process to parent or 0 to child process
277              
278             =cut
279              
280             sub fork_child {
281 191     191 1 643 my ( $self ) = @_;
282              
283             # This check lives here rather than in BEGIN, because this package is used via "extends" in Async::Simple::Task::ForkTmpFile
284             # TODO: Maybe it would be great to move this code (function) to a separate package
285 191 50       3189 if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
286 0         0 die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';    # NOTE(review): message says "threads" although this sub relies on fork() - confirm wording
287             };
288              
289             # Two one-way pipes: child -> parent and parent -> child
290 191 50       9554 pipe my( $parent_reader, $child_writer ) or die 'Child to Parent pipe open error';
291 191 50       4361 pipe my( $child_reader, $parent_writer ) or die 'Parent to Child pipe open error';
292              
293 191   50     338761 my $pid = fork() // die "fork() failed: $!";    # fork() yields undef on failure, 0 in the child, the child's pid in the parent
294              
295             # child: close the parent's pipe ends and keep its own
296 191 50       4235 unless ( $pid ) {
297 0         0 close $parent_reader;
298 0         0 close $parent_writer;
299              
300 0         0 $child_writer->autoflush(1);
301              
302 0         0 $self->writer( $child_writer );
303 0         0 $self->reader( $child_reader );
304              
305             # Important!
306             # The 0 returned here becomes the child's pid attribute;
307             # right after that BUILD runs and enters its endless loop for the child process (pid=0)
308 0         0 return 0;
309             }
310              
311             # parent: close the child's pipe ends and keep its own
312 191         7287 close $child_writer;
313 191         3017 close $child_reader;
314              
315 191         10818 $parent_writer->autoflush(1);
316              
317 191         62324 $self->writer( $parent_writer );
318 191         10690 $self->reader( $parent_reader );
319              
320 191         15680 return $pid;
321             };
322              
323              
324             =head2 get
325              
326             Reads from the task if something can be read, or returns undef after the timeout.
327              
328             my $result = $task->get;
329              
330             Please note! If your function can return an undef value, then you should check
331              
332             $task->has_answer.
333              
334             =cut
335              
336             sub get {
337 1737     1737 1 5515574 my ( $self ) = @_;
338              
339 1737         106468 my $pipe = $self->reader;
340 1737         4913 my $data;
341              
342             # Try to read the packet marker line into $data within the timeout.
343             # Each packet is a marker line ("-") followed by one line of serialized useful data.
344             eval {
345 1737     1450   35265 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
  1450         66437  
346 1737         88160 alarm $self->timeout;    # NOTE(review): a timeout of 0 would cancel the alarm instead of arming it - confirm 0 is never used
347 1737         15195744 $data = <$pipe>;    # waits for a line, EOF, or the alarm
348 287         5029 alarm 0;    # cancel the pending alarm; as the last statement its value is also the eval's value
349 1737 100       4992 } or do {
350             # Nothing was read: give up for this call
351 1450 50       21403 return unless $data;
352             # The eval yielded false but something was read: continue
353 0         0 undef $@;
354             };
355              
356 287 50       1347 return unless defined $data;    # EOF or read error
357 287 50       1453 return unless $data eq "-\n";    # anything other than the marker line is discarded
358              
359             # Read the useful data line without any timeout.
360             # An undef here (e.g. the other side has closed the connection) makes get return below.
361 287         21267 $data = <$pipe>;
362              
363 287 50       1077 return unless defined $data;
364              
365             my $answer = $data
366 287 50       1407 ? eval{ $self->serializer->deserialize( $data )->[0] }    # put() wraps the payload in an array ref; errors are trapped and yield undef
  287         13239  
367             : undef;
368              
369 287         69194 $self->answer( $answer );    # store the result; being the last statement, its value is what get returns
370             };
371              
372              
373             =head2 put
374              
375             Writes task data to the task.
376              
377             $task->put( $data );
378              
379              
380             =cut
381              
382             sub put {
383 311     311 1 2057 my ( $self, $data ) = @_;
384              
385 311         12270 $self->clear_answer;    # drop any previously stored answer before sending new data
386              
387 311         11177 my $pipe = $self->writer;
388              
389             # Each packet is a marker line ("-") followed by one line of serialized useful data
390 311         7329 say $pipe '-';
391 311         13618 say $pipe $self->serializer->serialize( [ $data ] );    # payload is wrapped in an array ref; get() unwraps it with ->[0]
392              
393             };
394              
395              
396             =head2 get_serializer
397              
398             Internal. Returns an object that must have 2 methods: serialize and deserialize.
399             By default returns Data::Serializer with Storable as backend.
400              
401             $self->serializer->serialize( $task_data_ref );
402              
403             $result_ref = $self->serializer->deserialize();
404              
405             =cut
406              
407             sub get_serializer {    # builder of the lazy "serializer" attribute
408 155     155 1 424 my ( $self ) = @_;
409              
410 155         1369 Data::Serializer->new( serializer => 'Storable' );    # last expression, so this object is the return value
411             };
412              
413              
414             =head2 DEMOLISH
415              
416             Destroys object and probably should finish the child process.
417              
418             =cut
419              
420             sub DEMOLISH {
421 197     197 1 572 my ( $self ) = @_;
422              
423 197 50 33     7112 return unless $self->pid && $self->kill_on_exit;    # nothing to do in the child (pid 0) or when kill_on_exit is false
424              
425 197         7706 kill 'KILL', $self->pid;    # NOTE(review): no waitpid follows in this sub - confirm the killed child is reaped elsewhere
426             }
427              
428             __PACKAGE__->meta->make_immutable;