File Coverage

blib/lib/Async/Simple/Task/ForkTmpFile.pm
Criterion Covered Total %
statement 47 57 82.4
branch 8 16 50.0
condition 1 2 50.0
subroutine 8 9 88.8
pod 2 2 100.0
total 66 86 76.7


line stmt bran cond sub pod time code
1             package Async::Simple::Task::ForkTmpFile;
2              
3             =head1 NAME
4              
5             Async::Simple::Task::ForkTmpFile - Forks child process.
6             It waits for "data" whic will be passed via "put", and executed "sub" with this "data" as an argument.
7             Result of execution will be returned to parent by "get".
8              
9             The behaviour of this class all the same as of Async::Simple::Task::Fork
10             except that it use files as interprocess transport instead of pipes.
11              
12             This class is recommended only as workaround for systems which don't support
13             bidirectional pipes.
14              
15             =head1 SYNOPSIS
16              
17             use Async::Simple::Task::ForkTmpFile;
18              
19             my $sub = sub { sleep 1; return $_[0]{x} + 1 }; # Accepts $data as @_ and returns any type you need
20              
21             my $task = Async::Simple::Task::ForkTmpFile->new( task => &$sub ); # Creates a child process, which waits for data and execute &$sub if data is passed
22              
23             my $data = { x => 123 }; # Any type you wish: scalar, array, hash
24              
25             $task->put( $data ); # Put a task data to sub in the child process
26              
27             # ...do something useful in parent while our data working ...
28              
29             my $result = $task->get; # result = undef because result is not ready yet
30             sleep 2; # or do something else....
31             my $result = $task->get; # result = 2
32              
33             $task->put( $data ); # Put another data to task sub and so on,....
34              
35             Result and data can be of any type and deep which can be translated via Data::Serializer->new( serializer => Storable ) # by default
36              
37             If your "sub" can return undef you should check $task->has_result, as a mark that result is ready.
38              
39              
40             =head1 DESCRIPTION
41              
42             Allows to initialize fork process.
43              
44             After that, executes "sub" for each "data" passed to child process.
45              
46              
47             =head1 METHODS
48              
49             =head2 C<new>
50              
51             Forks a process
52              
53             my $task = Async::Simple::Task::ForkTmpFile->new( task => &$sub, %other_optional_params );
54              
55             Params (all param except "task" are optional):
56              
57             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
58              
59             timeout => timeout in seconds between child checkings for new data passed. default 0.01
60              
61             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
62              
63              
64             =head2 C<put>
65              
66             Puts data to task.
67              
68             $self->put( $data );
69              
70              
71             =head2 C<get>
72              
73             Tries to read result from task.
74              
75             Returns undef if it is not ready.
76              
77             In case, your function can return undef, you shoud check $task->has_answer, as a mark of ready result.
78              
79             my $result = $self->get();
80              
81              
82             =head1 SUPPORT AND DOCUMENTATION
83              
84             After installing, you can find documentation for this module with the perldoc command.
85              
86             perldoc Async::Simple::Task::ForkTmpFile
87              
88             You can also look for information at:
89              
90             RT, CPAN's request tracker (report bugs here)
91             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task-ForkTmpFile
92              
93             AnnoCPAN, Annotated CPAN documentation
94             http://annocpan.org/dist/Async-Simple-Task-ForkTmpFile
95              
96             CPAN Ratings
97             http://cpanratings.perl.org/d/Async-Simple-Task-ForkTmpFile
98              
99             Search CPAN
100             http://search.cpan.org/dist/Async-Simple-Task-ForkTmpFile/
101              
102              
103             =head1 AUTHOR
104              
105             ANTONC <antonc@cpan.org>
106              
107             =head1 LICENSE
108              
109             This program is free software; you can redistribute it and/or modify it
110             under the terms of the the Artistic License (2.0). You may obtain a
111             copy of the full license at:
112              
113             L<http://www.perlfoundation.org/artistic_license_2_0>
114              
115             =cut
116              
117              
118 2     2   138729 use Modern::Perl;
  2         6  
  2         21  
119 2     2   767 use Moose;
  2         434740  
  2         19  
120 2     2   16109 use namespace::autoclean;
  2         6319  
  2         14  
121 2     2   481 use Data::Serializer;
  2         2353  
  2         63  
122 2     2   14 use Time::HiRes qw/ alarm sleep /;
  2         4  
  2         19  
123 2     2   872 use File::Temp ();
  2         8262  
  2         798  
124              
125             our $VERSION = '0.12';
126              
127             extends 'Async::Simple::Task::Fork';
128              
129              
130             =head1 Attributes
131              
132             =head2 task
133              
134             task = sub {
135             my ( $data ) = @_; # source data for task
136             ... your task code ...
137             return( $result );
138             }
139              
140             =cut
141              
142              
143             =head2 answer
144              
145             Result of current task
146              
147             =cut
148              
149              
150             =head2 has_answer
151              
152             has_answer is true, if the task has been finished and result has been ready
153              
154             =cut
155              
156              
157             =head2 timeout
158              
159             timeout - positive numeric value = seconds between checking for result.
160              
161             inherited from Async::Simple::Task.
162              
163             =cut
164              
165              
166             =head2 kill_on_exit
167              
168             Kills process from parent in case of object desctuction
169              
170             =cut
171              
172              
173             =head2 new()
174              
175             my $task = Async::Simple::Task::ForkTmpFile->new( %all_optional_params );
176              
177              
178             Possible keys for %all_optional_params:
179              
180             task => coderef, function, called for each "data" passed to child process via $task->put( $data );
181              
182             timeout => timeout in seconds between child checkings for new data passed. default 0.01
183              
184             kill_on_exit => kill (1) or not (0) subprocess on object destroy (1 by default).
185              
186             =cut
187              
188              
189             =head2 BUILD
190              
191             internal. Some tricks here:)
192              
193             1. Master process called $task->new with fork() inside
194             2. After forking done we have two processes:
195             2.1. Master gets one side of reader/writer tmp file handlers and pid of child
196             2.2. Child - another side of tmp file handlers and extra logic with everlasting loop
197              
198             =cut
199              
200              
201             =head2 fork_child
202              
203             Makes child process and returns pid of child process to parent or 0 to child process
204              
205             =cut
206              
207             sub fork_child {
208 6     6 1 19 my ( $self ) = @_;
209              
210             # Connections via tmp files: parent -> child and child -> parent
211 6         72 my $parent_writer = File::Temp->new();
212 6         4294 my $parent_reader = File::Temp->new();
213              
214 6         2360 my $parent_writer_fname = $parent_writer->filename;
215 6         68 my $parent_reader_fname = $parent_reader->filename;
216              
217              
218 6   50     11853 my $pid = fork() // die "fork() failed: $!";
219              
220             # child
221 6 50       163 unless ( $pid ) {
222 0         0 open( my $child_reader, '<', $parent_writer_fname );
223 0         0 open( my $child_writer, '>', $parent_reader_fname );
224              
225 0         0 $child_writer->autoflush(1);
226              
227 0         0 $self->writer( $child_writer );
228 0         0 $self->reader( $child_reader );
229              
230             # Important!
231             # Just after that we trap into BUILD
232             # with the infinitive loop for child process (pid=0)
233 0         0 return 0;
234             }
235              
236             # parent
237 6         393 $parent_writer->autoflush(1);
238              
239 6         2019 $self->writer( $parent_writer );
240 6         361 $self->reader( $parent_reader );
241              
242 6         364 return $pid;
243             };
244              
245              
246             =head2 get
247              
248             Reads from task, if something can be readed or returns undef after timeout.
249              
250             my $result = $task->get;
251              
252             Please note! If your function can return an undef value, then you shoud check
253              
254             $task->has_result.
255              
256             =cut
257              
258             sub get {
259 5     5 1 5505543 my ( $self ) = @_;
260              
261 5         336 my $fh = $self->reader;
262 5         14 my $data;
263              
264             # Try to read "marker" into data within timeout
265             # Each pack starts with an empty line and serialized string of useful data.
266             eval {
267 5     0   133 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
  0         0  
268 5         208 alarm $self->timeout;
269 5         251 $data = <$fh>;
270 5         97 alarm 0;
271 5 50       14 } or do {
272             # Can't read data
273 0 0       0 return unless $data;
274             # Alarm caught but something readed, will continue
275 0         0 undef $@;
276             };
277              
278 5 100       35 return unless defined $data;
279 4 50       125 return unless $data eq "-\n";
280              
281             # Read useful data without any timeouts
282             # or die, if parent/child has closed connection
283 4         20 undef $data;
284              
285 4         19 for ( 1..1000 ) {
286 4         29 $data = <$fh>;
287 4 50       41 last if defined $data;
288 0         0 sleep $self->timeout;
289             }
290              
291 4 50       24 return unless defined $data;
292              
293             my $answer = $data
294 4 50       28 ? eval{ $self->serializer->deserialize( $data )->[0] }
  4         181  
295             : undef;
296              
297 4         1296 $self->answer( $answer );
298             };
299              
300              
301             =head2 put
302              
303             Writes task to task.
304              
305             $task->put( $data );
306              
307             =cut
308              
309              
310             =head2 get_serializer
311              
312             Internal. Returns an object that must have 2 methods: serialize and deserialize.
313             By default returns Data::Serializer with Storable as backend.
314              
315             $self->serializer->serialize( $task_data_ref );
316              
317             $result_ref = $self->serializer->deserialize();
318              
319             =cut
320              
321              
322             __PACKAGE__->meta->make_immutable;