File Coverage

blib/lib/Async/Simple/Task/Fork.pm
Criterion Covered Total %
statement 55 73 75.3
branch 12 24 50.0
condition 2 10 20.0
subroutine 12 12 100.0
pod 6 6 100.0
total 87 125 69.6


line stmt bran cond sub pod time code
1             package Async::Simple::Task::Fork;
2              
3             =head1 NAME
4              
5             Async::Simple::Task::Fork - Forks child process.
6             It waits for "data" which will be passed via "put", and executes "sub" with this "data" as an argument.
7             Result of execution will be returned to parent by "get".
8              
9              
10             =head1 SYNOPSIS
11              
12             use Async::Simple::Task::Fork;
13              
14             my $sub = sub { sleep 1; return $_[0]{x} + 1 }; # Accepts $data as @_ and returns any type you need
15              
16             my $task = Async::Simple::Task::Fork->new( task => $sub ); # Creates a child process, which waits for data and executes $sub when data is passed
17              
18             my $data = { x => 123 }; # Any type you wish: scalar, array, hash
19              
20             $task->put( $data ); # Put a task data to sub in the child process
21              
22             # ...do something useful in parent while our data working ...
23              
24             my $result = $task->get; # result = undef because result is not ready yet
25             sleep 2; # or do something else....
26             $result = $task->get; # result = 124
27              
28             $task->put( $data ); # Put another data to task sub and so on,....
29              
30             Result and data can be of any type and depth that can be serialized via Data::Serializer->new( serializer => 'Storable' ) # by default
31              
32             If your "sub" can return undef you should check $task->has_answer, as a mark that the result is ready.
33              
34              
35             =head1 DESCRIPTION
36              
37             Allows you to initialize a forked child process.
38              
39             After that, executes "sub" for each "data" passed to child process.
40              
41              
42             =head1 METHODS
43              
44             =head2 C<new>
45              
46             Forks a process
47              
48             my $task = Async::Simple::Task::Fork->new( task => $sub, %other_optional_params );
49              
50             Params (all param except "task" are optional):
51              
52             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
53              
54             timeout => timeout in seconds between child checkings for new data passed. default 0.01
55              
56             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
57              
58              
59             =head2 C<put>
60              
61             Puts data to task.
62              
63             $self->put( $data );
64              
65              
66             =head2 C<get>
67              
68             Tries to read result from task.
69              
70             Returns undef if it is not ready.
71              
72             In case your function can return undef, you should check $task->has_answer, as a mark of a ready result.
73              
74             my $result = $self->get();
75              
76              
77             =head1 SUPPORT AND DOCUMENTATION
78              
79             After installing, you can find documentation for this module with the perldoc command.
80              
81             perldoc Async::Simple::Task::Fork
82              
83             You can also look for information at:
84              
85             RT, CPAN's request tracker (report bugs here)
86             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task-Fork
87              
88             AnnoCPAN, Annotated CPAN documentation
89             http://annocpan.org/dist/Async-Simple-Task-Fork
90              
91             CPAN Ratings
92             http://cpanratings.perl.org/d/Async-Simple-Task-Fork
93              
94             Search CPAN
95             http://search.cpan.org/dist/Async-Simple-Task-Fork/
96              
97              
98             =head1 AUTHOR
99              
100             ANTONC <antonc@cpan.org>
101              
102             =head1 LICENSE
103              
104             This program is free software; you can redistribute it and/or modify it
105             under the terms of the Artistic License (2.0). You may obtain a
106             copy of the full license at:
107              
108             L<http://www.perlfoundation.org/artistic_license_2_0>
109              
110             =cut
111              
112              
113 7     7   150375 use Modern::Perl;
  7         55  
  7         60  
114 7     7   1292 use Moose;
  7         502589  
  7         53  
115 7     7   52514 use namespace::autoclean;
  7         7002  
  7         57  
116 7     7   2799 use Data::Serializer;
  7         14779  
  7         297  
117 7     7   341 use Time::HiRes qw/ alarm sleep /;
  7         917  
  7         55  
118              
119              
120              
121              
122             our $VERSION = '0.18';
123              
124              
125             extends 'Async::Simple::Task';
126              
127              
128             =head1 Attributes
129              
130             =head2 task
131              
132             task = sub {
133             my ( $data ) = @_; # source data for task
134             ... your task code ...
135             return( $result );
136             }
137              
138             =cut
139              
140             has task => (
141             is => 'ro',
142             isa => 'CodeRef',
143             required => 1,
144             );
145              
146              
147             =head2 answer
148              
149             Result of current task
150              
151             =cut
152              
153              
154             =head2 has_answer
155              
156             has_answer is true, if the task has been finished and result has been ready
157              
158             =cut
159              
160              
161             =head2 timeout
162              
163             timeout - positive numeric value = seconds between checking for result.
164              
165             inherited from Async::Simple::Task.
166              
167             =cut
168              
169              
170             =head2 kill_on_exit
171              
172             Kills the child process from the parent when the object is destroyed
173              
174             =cut
175              
176             has kill_on_exit => (
177             is => 'rw',
178             isa => 'Int',
179             default => 1,
180             );
181              
182              
183             # Writable pipe between parent and child.
184             # Each of them has pair of handlers, for duplex communication.
185             has writer => (
186             is => 'rw',
187             isa => 'FileHandle',
188             );
189              
190              
191             # Readable pipe between parent and child.
192             # Each of them has pair of handlers, for duplex communication.
193             has reader => (
194             is => 'rw',
195             isa => 'FileHandle',
196             );
197              
198              
199             # Object that must have 2 methods: encode + decode.
200             # Encoded data must be a single string value
201             # By default serialization uses Data::Serializer with "Storable".
202             has serializer => (
203             is => 'ro',
204             isa => 'Any',
205             lazy => 1,
206             required => 1,
207             builder => 'get_serializer',
208             );
209              
210              
211             # In child always has value = 0
212             # For parent has Int value > 0
213             has pid => (
214             is => 'ro',
215             isa => 'Int',
216             required => 1,
217             lazy => 1,
218             builder => 'fork_child',
219             );
220              
221              
222             =head2 new()
223              
224             my $task = Async::Simple::Task::Fork->new( %all_optional_params );
225              
226              
227             Possible keys for %all_optional_params:
228              
229             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
230              
231             timeout => timeout in seconds between child checkings for new data passed. default 0.01
232              
233             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
234              
235             =cut
236              
237              
238             =head2 BUILD
239              
240             internal. Some tricks here:)
241              
242             1. Master process called $task->new with fork() inside
243             2. After forking done we have two processes:
244             2.1. Master gets one side of reader/writer pipes and pid of child
245             2.2. Child - another side of pipes and extra logic with everlasting loop
246              
247             =cut
248              
249             sub BUILD {
250 197     197 1 562 my ( $self ) = @_;
251              
252             # Return for master process
253             # Only child tasks must go down and make a loop
254 197 50       7839 return $self if $self->pid;
255              
256             # Child loop: runs as long as the parent is alive
257 0         0 while ( 1 ) {
258 0         0 $self->clear_answer;
259 0         0 $self->get;
260              
261 0 0       0 unless ( $self->has_answer ) {
262 0         0 sleep $self->timeout;
263 0         0 next;
264             }
265              
266 0         0 my $result = eval{ $self->task->( $self->answer ) };
  0         0  
267 0         0 $self->clear_answer;
268 0   0     0 $self->put( $result // $@ // '' );
      0        
269             }
270              
271 0         0 exit(0);
272             };
273              
274              
275             =head2 fork_child
276              
277             Makes child process and returns pid of child process to parent or 0 to child process
278              
279             =cut
280              
281             sub fork_child {
282 191     191 1 468 my ( $self ) = @_;
283              
284             # This is here instead of BEGIN, because this package uses as "extends" in Async::Simple::Task::ForkTmpFile
285             # TODO: Maybe it would be great to move this code(function) to separate package
286             # if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
287             # die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';
288             # };
289              
290             # Pipes: parent -> child and child -> parent
291 191 50       7621 pipe my( $parent_reader, $child_writer ) or die 'Child to Parent pipe open error';
292 191 50       4462 pipe my( $child_reader, $parent_writer ) or die 'Parent to Child pipe open error';
293              
294 191   50     251791 my $pid = fork() // die "fork() failed: $!";
295              
296             # child
297 191 50       2698 unless ( $pid ) {
298 0         0 close $parent_reader;
299 0         0 close $parent_writer;
300              
301 0         0 $child_writer->autoflush(1);
302              
303 0         0 $self->writer( $child_writer );
304 0         0 $self->reader( $child_reader );
305              
306             # Important!
307             # Just after that we trap into BUILD
308             # with the infinite loop for child process (pid=0)
309 0         0 return 0;
310             }
311              
312             # parent
313 191         6670 close $child_writer;
314 191         2771 close $child_reader;
315              
316 191         10854 $parent_writer->autoflush(1);
317              
318 191         59858 $self->writer( $parent_writer );
319 191         9257 $self->reader( $parent_reader );
320              
321 191         14599 return $pid;
322             };
323              
324              
325             =head2 get
326              
327             Reads from the task if something can be read, or returns undef after the timeout.
328              
329             my $result = $task->get;
330              
331             Please note! If your function can return an undef value, then you should check
332              
333             $task->has_answer.
334              
335             =cut
336              
337             sub get {
338 1700     1700 1 5512724 my ( $self ) = @_;
339              
340 1700         82345 my $pipe = $self->reader;
341 1700         5425 my $data;
342              
343             # Try to read "marker" into data within timeout
344             # Each pack starts with a marker line ("-") followed by the serialized string of useful data.
345             eval {
346 1700     1413   30741 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
  1413         56519  
347 1700         90896 alarm $self->timeout;
348 1700         15155089 $data = <$pipe>;
349 287         4248 alarm 0;
350 1700 100       4751 } or do {
351             # Can't read data
352 1413 50       17566 return unless $data;
353             # Alarm caught but something was read, will continue
354 0         0 undef $@;
355             };
356              
357 287 50       1263 return unless defined $data;
358 287 50       1487 return unless $data eq "-\n";
359              
360             # Read useful data without any timeouts
361             # or die, if parent/child has closed connection
362 287         23311 $data = <$pipe>;
363              
364 287 50       1134 return unless defined $data;
365              
366             my $answer = $data
367 287 50       1318 ? eval{ $self->serializer->deserialize( $data )->[0] }
  287         11844  
368             : undef;
369              
370 287         60915 $self->answer( $answer );
371             };
372              
373              
374             =head2 put
375              
376             Writes data to the task.
377              
378             $task->put( $data );
379              
380              
381             =cut
382              
383             sub put {
384 307     307 1 882 my ( $self, $data ) = @_;
385              
386 307         11217 $self->clear_answer;
387              
388 307         13244 my $pipe = $self->writer;
389              
390             # Each pack starts with a marker line ("-") followed by the serialized string of useful data
391 307         7687 say $pipe '-';
392 307         12040 say $pipe $self->serializer->serialize( [ $data ] );
393              
394             };
395              
396              
397             =head2 get_serializer
398              
399             Internal. Returns an object that must have 2 methods: serialize and deserialize.
400             By default returns Data::Serializer with Storable as backend.
401              
402             $self->serializer->serialize( $task_data_ref );
403              
404             $result_ref = $self->serializer->deserialize();
405              
406             =cut
407              
408             sub get_serializer {
409 155     155 1 346 my ( $self ) = @_;
410              
411 155         1148 Data::Serializer->new( serializer => 'Storable' );
412             };
413              
414              
415             =head2 DEMOLISH
416              
417             Destroys object and probably should finish the child process.
418              
419             =cut
420              
421             sub DEMOLISH {
422 197     197 1 546 my ( $self ) = @_;
423              
424 197 50 33     6074 return unless $self->pid && $self->kill_on_exit;
425              
426 197         5981 kill 'KILL', $self->pid;
427             }
428              
429             __PACKAGE__->meta->make_immutable;