File Coverage

blib/lib/Async/Simple/Task/Fork.pm
Criterion Covered Total %
statement 56 73 76.7
branch 13 24 54.1
condition 2 10 20.0
subroutine 12 12 100.0
pod 6 6 100.0
total 89 125 71.2


line stmt bran cond sub pod time code
1             package Async::Simple::Task::Fork;
2              
3             =head1 NAME
4              
5             Async::Simple::Task::Fork - Forks child process.
6             It waits for "data" whic will be passed via "put", and executed "sub" with this "data" as an argument.
7             Result of execution will be returned to parent by "get".
8              
9              
10             =head1 SYNOPSIS
11              
12             use Async::Simple::Task::Fork;
13              
14             my $sub = sub { sleep 1; return $_[0]{x} + 1 }; # Accepts $data as @_ and returns any type you need
15              
16             my $task = Async::Simple::Task::Fork->new( task => &$sub ); # Creates a child process, which waits for data and execute &$sub if data is passed
17              
18             my $data = { x => 123 }; # Any type you wish: scalar, array, hash
19              
20             $task->put( $data ); # Put a task data to sub in the child process
21              
22             # ...do something useful in parent while our data working ...
23              
24             my $result = $task->get; # result = undef because result is not ready yet
25             sleep 2; # or do something else....
26             my $result = $task->get; # result = 2
27              
28             $task->put( $data ); # Put another data to task sub and so on,....
29              
30             Result and data can be of any type and deep which can be translated via Data::Serializer->new( serializer => Storable ) # by default
31              
32             If your "sub" can return undef you should check $task->has_result, as a mark that result is ready.
33              
34              
35             =head1 DESCRIPTION
36              
37             Allows to initialize fork process.
38              
39             After that, executes "sub" for each "data" passed to child process.
40              
41              
42             =head1 METHODS
43              
44             =head2 C<new>
45              
46             Forks a process
47              
48             my $task = Async::Simple::Task::Fork->new( task => &$sub, %other_optional_params );
49              
50             Params (all param except "task" are optional):
51              
52             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
53              
54             timeout => timeout in seconds between child checkings for new data passed. default 0.01
55              
56             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
57              
58              
59             =head2 C<put>
60              
61             Puts data to task.
62              
63             $self->put( $data );
64              
65              
66             =head2 C<get>
67              
68             Tries to read result from task.
69              
70             Returns undef if it is not ready.
71              
72             In case, your function can return undef, you shoud check $task->has_answer, as a mark of ready result.
73              
74             my $result = $self->get();
75              
76              
77             =head1 SUPPORT AND DOCUMENTATION
78              
79             After installing, you can find documentation for this module with the perldoc command.
80              
81             perldoc Async::Simple::Task::Fork
82              
83             You can also look for information at:
84              
85             RT, CPAN's request tracker (report bugs here)
86             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task-Fork
87              
88             AnnoCPAN, Annotated CPAN documentation
89             http://annocpan.org/dist/Async-Simple-Task-Fork
90              
91             CPAN Ratings
92             http://cpanratings.perl.org/d/Async-Simple-Task-Fork
93              
94             Search CPAN
95             http://search.cpan.org/dist/Async-Simple-Task-Fork/
96              
97              
98             =head1 AUTHOR
99              
100             ANTONC <antonc@cpan.org>
101              
102             =head1 LICENSE
103              
104             This program is free software; you can redistribute it and/or modify it
105             under the terms of the the Artistic License (2.0). You may obtain a
106             copy of the full license at:
107              
108             L<http://www.perlfoundation.org/artistic_license_2_0>
109              
110             =cut
111              
112              
113 4     4   142596 use Modern::Perl;
  4         9  
  4         39  
114 4     4   981 use Moose;
  4         415781  
  4         30  
115 4     4   27914 use namespace::autoclean;
  4         8047  
  4         38  
116 4     4   1401 use Data::Serializer;
  4         7344  
  4         129  
117 4     4   355 use Time::HiRes qw/ alarm sleep /;
  4         1029  
  4         28  
118              
119              
120             if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
121             die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';
122             };
123              
124              
125             our $VERSION = '0.12';
126              
127              
128             extends 'Async::Simple::Task';
129              
130              
131             =head1 Attributes
132              
133             =head2 task
134              
135             task = sub {
136             my ( $data ) = @_; # source data for task
137             ... your task code ...
138             return( $result );
139             }
140              
141             =cut
142              
143             has task => (
144             is => 'ro',
145             isa => 'CodeRef',
146             required => 1,
147             );
148              
149              
150             =head2 answer
151              
152             Result of current task
153              
154             =cut
155              
156              
157             =head2 has_answer
158              
159             has_answer is true, if the task has been finished and result has been ready
160              
161             =cut
162              
163              
164             =head2 timeout
165              
166             timeout - positive numeric value = seconds between checking for result.
167              
168             inherited from Async::Simple::Task.
169              
170             =cut
171              
172              
173             =head2 kill_on_exit
174              
175             Kills process from parent in case of object desctuction
176              
177             =cut
178              
179             has kill_on_exit => (
180             is => 'rw',
181             isa => 'Int',
182             default => 1,
183             );
184              
185              
186             # Writable pipe between parent and child.
187             # Each of them has pair of handlers, for duplex communication.
188             has writer => (
189             is => 'rw',
190             isa => 'FileHandle',
191             );
192              
193              
194             # Readable pipe between parent and child.
195             # Each of them has pair of handlers, for duplex communication.
196             has reader => (
197             is => 'rw',
198             isa => 'FileHandle',
199             );
200              
201              
202             # Object that must have 2 methods: encode + decode.
203             # Encoded data must be a singe string value
204             # By default serialization uses Data::Serializer with "Storable".
205             has serializer => (
206             is => 'ro',
207             isa => 'Any',
208             lazy => 1,
209             required => 1,
210             builder => 'get_serializer',
211             );
212              
213              
214             # In child always has value = 0
215             # For parent has Int value > 0
216             has pid => (
217             is => 'ro',
218             isa => 'Int',
219             required => 1,
220             builder => 'fork_child',
221             );
222              
223              
224             =head2 new()
225              
226             my $task = Async::Simple::Task::Fork->new( %all_optional_params );
227              
228              
229             Possible keys for %all_optional_params:
230              
231             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
232              
233             timeout => timeout in seconds between child checkings for new data passed. default 0.01
234              
235             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
236              
237             =cut
238              
239              
240             =head2 BUILD
241              
242             internal. Some tricks here:)
243              
244             1. Master process called $task->new with fork() inside
245             2. After forking done we have two processes:
246             2.1. Master gets one side of reader/writer pipes and pid of child
247             2.2. Child - another side of pipes and extra logic with everlasting loop
248              
249             =cut
250              
251             sub BUILD {
252 197     197 1 825 my ( $self ) = @_;
253              
254             # Return for master process
255             # Only child tasks must go down and make a loop
256 197 50       10341 return $self if $self->pid;
257              
258             # Child loop: untill parent is alive
259 0         0 while ( 1 ) {
260 0         0 $self->clear_answer;
261 0         0 $self->get;
262              
263 0 0       0 unless ( $self->has_answer ) {
264 0         0 sleep $self->timeout;
265 0         0 next;
266             }
267              
268 0         0 my $result = eval{ $self->task->( $self->answer ) };
  0         0  
269 0         0 $self->clear_answer;
270 0   0     0 $self->put( $result // $@ // '' );
      0        
271             }
272              
273 0         0 exit(0);
274             };
275              
276              
277             =head2 fork_child
278              
279             Makes child process and returns pid of child process to parent or 0 to child process
280              
281             =cut
282              
283             sub fork_child {
284 191     191 1 552 my ( $self ) = @_;
285              
286             # Pipes: parent -> child and child -> parent
287 191 50       8789 pipe my( $parent_reader, $child_writer ) or die 'Child to Parent pipe open error';
288 191 50       5846 pipe my( $child_reader, $parent_writer ) or die 'Parent to Child pipe open error';
289              
290 191   50     277437 my $pid = fork() // die "fork() failed: $!";
291              
292             # child
293 191 50       4271 unless ( $pid ) {
294 0         0 close $parent_reader;
295 0         0 close $parent_writer;
296              
297 0         0 $child_writer->autoflush(1);
298              
299 0         0 $self->writer( $child_writer );
300 0         0 $self->reader( $child_reader );
301              
302             # Important!
303             # Just after that we trap into BUILD
304             # with the infinitive loop for child process (pid=0)
305 0         0 return 0;
306             }
307              
308             # parent
309 191         6341 close $child_writer;
310 191         3509 close $child_reader;
311              
312 191         11047 $parent_writer->autoflush(1);
313              
314 191         62021 $self->writer( $parent_writer );
315 191         9718 $self->reader( $parent_reader );
316              
317 191         16016 return $pid;
318             };
319              
320              
321             =head2 get
322              
323             Reads from task, if something can be readed or returns undef after timeout.
324              
325             my $result = $task->get;
326              
327             Please note! If your function can return an undef value, then you shoud check
328              
329             $task->has_result.
330              
331             =cut
332              
333             sub get {
334 1757     1757 1 5513502 my ( $self ) = @_;
335              
336 1757         80134 my $pipe = $self->reader;
337 1757         5584 my $data;
338              
339             # Try to read "marker" into data within timeout
340             # Each pack starts with an empty line and serialized string of useful data.
341             eval {
342 1757     1472   32603 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
  1472         56304  
343 1757         81206 alarm $self->timeout;
344 1757         15333065 $data = <$pipe>;
345 287         5276 alarm 0;
346 1757 100       5559 } or do {
347             # Can't read data
348 1472 100       19616 return unless $data;
349             # Alarm caught but something readed, will continue
350 2         18 undef $@;
351             };
352              
353 287 50       1273 return unless defined $data;
354 287 50       1088 return unless $data eq "-\n";
355              
356             # Read useful data without any timeouts
357             # or die, if parent/child has closed connection
358 287         13183 $data = <$pipe>;
359              
360 287 50       1079 return unless defined $data;
361              
362             my $answer = $data
363 287 50       1294 ? eval{ $self->serializer->deserialize( $data )->[0] }
  287         12354  
364             : undef;
365              
366 287         66394 $self->answer( $answer );
367             };
368              
369              
370             =head2 put
371              
372             Writes task to task.
373              
374             $task->put( $data );
375              
376              
377             =cut
378              
379             sub put {
380 311     311 1 2540 my ( $self, $data ) = @_;
381              
382 311         11662 $self->clear_answer;
383              
384 311         10823 my $pipe = $self->writer;
385              
386             # Each pack starts with an empty line and serialized string of useful data
387 311         7912 say $pipe '-';
388 311         12529 say $pipe $self->serializer->serialize( [ $data ] );
389              
390             };
391              
392              
393             =head2 get_serializer
394              
395             Internal. Returns an object that must have 2 methods: serialize and deserialize.
396             By default returns Data::Serializer with Storable as backend.
397              
398             $self->serializer->serialize( $task_data_ref );
399              
400             $result_ref = $self->serializer->deserialize();
401              
402             =cut
403              
404             sub get_serializer {
405 155     155 1 391 my ( $self ) = @_;
406              
407 155         1289 Data::Serializer->new( serializer => 'Storable' );
408             };
409              
410              
411             =head2 DEMOLISH
412              
413             Destroys object and probably should finish the child process.
414              
415             =cut
416              
417             sub DEMOLISH {
418 197     197 1 583 my ( $self ) = @_;
419              
420 197 50 33     7433 return unless $self->pid && $self->kill_on_exit;
421              
422 197         6861 kill 'KILL', $self->pid;
423             }
424              
425             __PACKAGE__->meta->make_immutable;