File Coverage

lib/MooseX/Workers.pm
Criterion Covered Total %
statement 2 4 50.0
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 4 6 66.6


line stmt bran cond sub pod time code
1             package MooseX::Workers;  # Role package (made a role by the use Moose::Role line that follows)
2             BEGIN {  # BEGIN block: the assignment below runs at compile time
3 1     1   26537 $MooseX::Workers::AUTHORITY = 'cpan:PERIGRIN';
4             }
5             {  # bare block: the version assignment below runs at normal run time
6             $MooseX::Workers::VERSION = '0.23';
7             }
8 1     1   419 use Moose::Role;
  0            
  0            
9             use MooseX::Workers::Job;
10              
11             use MooseX::Workers::Engine;
12              
13             has _ctor_params => (  # Private read-only attribute; the BUILDARGS wrapper below fills it with the constructor's key/value arguments
14             is => 'ro',
15             isa => 'HashRef',
16             lazy => 1,
17             default => sub { {} },  # falls back to a fresh empty hashref when no value was supplied
18             );
19              
20             around BUILDARGS => sub {  # Wraps BUILDARGS so the caller's arguments are also passed on as a hashref under _ctor_params
21             my ($orig, $class, %args) = @_;  # NOTE(review): a lone hashref argument would land in %args as a single odd element - confirm new({...}) is not expected to work
22              
23             shift;  # drop $orig from @_
24             shift;  # drop $class from @_, leaving only the caller's arguments
25            
26             return $class->$orig( _ctor_params => \%args, @_ );  # original arguments follow _ctor_params in the forwarded list
27             };
28              
29             has Engine => (  # Read-only attribute holding the MooseX::Workers::Engine object this role delegates to
30             isa => 'MooseX::Workers::Engine',
31             is => 'ro',
32             lazy => 1,
33             required => 1,  # NOTE(review): required together with lazy/builder - confirm this combination is intentional
34             builder => '_build_Engine',  # value comes from the _build_Engine method defined right after this declaration
35             handles => [  # the method names listed below are delegated to the Engine object
36             qw(
37             max_workers
38             has_workers
39             num_workers
40             put_worker
41             kill_worker
42             get_worker
43             )
44             ],
45             );
46             sub _build_Engine {  # Builder for the Engine attribute: returns a new MooseX::Workers::Engine with this object as its visitor
47             my $self = shift;
48             my @args;
49             push @args, max_workers => $self->_ctor_params->{max_workers} if exists $self->_ctor_params->{max_workers};  # forward max_workers only when that key is present in _ctor_params
50             MooseX::Workers::Engine->new( visitor => $self, @args );  # last expression, so this is the builder's return value
51             }
52              
53             sub spawn {  # Asks the Engine to add a worker for $cmd, passing $args along, via the Engine's call()
54             my ( $self, $cmd, $args ) = @_;
55             return $self->Engine->call( add_worker => $cmd => $args );  # returns whatever call() returns
56             }
57              
58             __PACKAGE__->meta->add_method( 'fork' => __PACKAGE__->can('spawn') );  # installs 'fork' as a second method name bound to the spawn code reference
59              
60             sub run_command {  # Requests a worker for $cmd through the Engine's yield() rather than call()
61             my ( $self, $cmd ) = @_;
62             $self->Engine->yield( add_worker => $cmd );  # no extra arguments are passed; the result of yield() is the implicit return value
63             }
64              
65             sub enqueue {  # Like spawn, but always passes the option hashref { enqueue => 1 } with the add_worker request
66             my ( $self, $cmd ) = @_;
67             $self->Engine->call( add_worker => $cmd, { enqueue => 1 } );  # result of call() is the implicit return value
68             }
69              
70             sub check_worker_threshold {  # Returns true when the current worker count has reached or passed the configured maximum
71             return $_[0]->num_workers >= $_[0]->max_workers;  # $_[0] is the invocant; both methods are in the Engine attribute's handles list
72             }
73              
74             sub check_worker_threashold {  # Deprecated misspelled name kept for callers; warns, then defers to check_worker_threshold
75             warn 'check_worker_threashold (note the typo) is deprecated '
76             . 'please use check_worker_threshold instead';
77             shift->check_worker_threshold;  # result of the correctly spelled method is the implicit return value
78             }
79              
80             sub stdout_filter { undef }  # Default hook: no STDOUT filter; the POD below describes it as optionally returning a POE::Filter object
81             sub stderr_filter { undef }  # Default hook: no STDERR filter; same optional contract as stdout_filter
82              
83             no Moose::Role;
84             1;
85             __END__
86              
87             =head1 NAME
88              
89             MooseX::Workers - Simple sub-process management for asynchronous tasks
90              
91             =head1 SYNOPSIS
92              
93             EXAMPLE #1:
94             package Manager;
95             # This example prints output from the children normally on both STDOUT and STDERR
96              
97             use Moose;
98             with qw(MooseX::Workers);
99              
100             sub run {
101             $_[0]->spawn( sub { sleep 3; print "Hello World\n" } );
102             warn "Running now ... ";
103             POE::Kernel->run();
104             }
105              
106             # Implement our Interface
107             sub worker_stdout { shift; warn join ' ', @_; }
108             sub worker_stderr { shift; warn join ' ', @_; }
109              
110             sub worker_manager_start { warn 'started worker manager' }
111             sub worker_manager_stop { warn 'stopped worker manager' }
112              
113             sub max_workers_reached { warn 'maximum worker count reached' }
114             sub worker_error { shift; warn join ' ', @_; }
115             sub worker_finished { warn 'a worker has finished' }
116             sub worker_started { shift; warn join ' ', @_; }
117             sub sig_child { shift; warn join ' ', @_; }
118             sub sig_TERM { shift; warn 'Handled TERM' }
119              
120             no Moose;
121              
122             Manager->new->run();
123              
124              
125             EXAMPLE #2:
126             package Manager;
127              
128             # This example prints output from the children normally on
129             # STDERR but uses STDOUT to return a hashref from the child to
130             # the parent
131              
132             use Moose;
133             with qw(MooseX::Workers);
134             use POE qw(Filter::Reference Filter::Line);
135              
136             sub run {
137             $_[0]->spawn(
138             sub {
139             sleep 3;
140              
141             # Return a hashref (arrayref, whatever) to the parent using P::F::Reference
142             print @{POE::Filter::Reference->new->put([ {msg => "Hello World"} ])}; # Note the [] around the return val
143              
144             # Print normally using P::F::Line (shown for
145             # completeness; in practice, just don't bother
146             # defining the _filter method)
147             #
148             print STDERR "Hey look, an error message";
149             }
150             );
151              
152             POE::Kernel->run();
153             }
154              
155             # Implement our Interface
156             # These two are both optional; if defined (as here), they
157             # should return a subclass of POE::Filter.
158             sub stdout_filter { POE::Filter::Reference->new }
159             sub stderr_filter { POE::Filter::Line->new }
160              
161             sub worker_stdout {
162             my ( $self, $result ) = @_; # $result will be a hashref: {msg => "Hello World"}
163             print $result->{msg};
164              
165             # Note that you can do more than just print the message --
166             # e.g. this is the way to return data from the children for
167             # accumulation in the parent.
168             }
169             sub worker_stderr {
170             my ( $self, $stderr_msg ) = @_; # $stderr_msg will be a string: "Hey look, an error message";
171             warn $stderr_msg;
172             }
173              
174             # From here down, this is identical to the previous example.
175             sub worker_manager_start { warn 'started worker manager' }
176             sub worker_manager_stop { warn 'stopped worker manager' }
177              
178             sub max_workers_reached { warn 'maximum worker count reached' }
179             sub worker_error { shift; warn join ' ', @_; }
180             sub worker_finished { warn 'a worker has finished' }
181             sub worker_started { shift; warn join ' ', @_; }
182             sub sig_child { shift; warn join ' ', @_; }
183             sub sig_TERM { shift; warn 'Handled TERM' }
184              
185             no Moose;
186              
187             Manager->new->run();
188              
189             =head1 DESCRIPTION
190              
191             MooseX::Workers is a Role that provides easy delegation of long-running tasks
192             into a managed child process. Process management is taken care of via POE and its
193             POE::Wheel::Run module.
194              
195             =head1 METHODS
196              
197             =over
198              
199             =item spawn ($command)
200              
201             =item fork ($command)
202              
203             =item run_command ($command)
204              
205             These three methods are the whole point of this module.
206             They pass $command through to the MooseX::Workers::Engine which will take
207             care of running $command for you.
208              
209             spawn() and fork() both invoke L<POE::Kernel> call(), which is synchronous.
210              
211             run_command() invokes L<POE::Kernel> yield(), which is asynchronous.
212              
213             If max_workers() has been reached, run_command() warns and does nothing. It is up to you to re-submit
214             $command. See enqueue() if you want us to run $command as soon as another worker is free.
215              
216             =item enqueue($command)
217              
218             Just like run_command(), only that if max_workers() has been set and that number of workers
219             has been reached, then we add $command to a FIFO command queue. As soon as any running
220             worker exits, the first $command in queue (if any) will be run.
221              
222             =item check_worker_threshold
223              
224             This will check to see how many workers you have compared to the max_workers limit. It returns true
225             if $num_workers is >= $max_workers.
226              
227             =item max_workers($count)
228              
229             An accessor for the maximum number of workers. This is delegated to the MooseX::Workers::Engine object.
230              
231             =item has_workers
232              
233             Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
234              
235             =item num_workers
236              
237             Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
238              
239             =item meta
240              
241             The Metaclass for MooseX::Workers::Engine; see Moose's documentation.
242              
243             =back
244              
245             =head1 INTERFACE
246              
247             MooseX::Workers::Engine supports the following callbacks:
248              
249             =over
250              
251             =item worker_manager_start
252              
253             Called when the managing session is started
254              
255             =item worker_manager_stop
256              
257             Called when the managing session stops
258              
259             =item max_workers_reached
260              
261             Called when we reach the maximum number of workers
262              
263             =item stdout_filter
264              
265             OPTIONAL. If defined, this should return an object that isa
266             POE::Filter. If it doesn't, the results are undefined. Anything that
267             a child proc sends on STDOUT will be passed through the relevant
268             filter.
269              
270             =item stderr_filter
271              
272             OPTIONAL. If defined, this should return an object that isa
273             POE::Filter. If it doesn't, the results are undefined. Anything that
274             a child proc sends on STDERR will be passed through the relevant
275             filter.
276              
277             =item worker_stdout
278              
279             Called when a child prints to STDOUT. If C<stdout_filter> was
280             defined, the output will be filtered appropriately, as described
281             above. This is useful to allow child processes to return data to the
282             parent (generally via POE::Filter::Reference).
283              
284             =item worker_stderr
285              
286             Called when a child prints to STDERR. Filtered through the result of
287             C<stderr_filter> if that method is defined.
288              
289             =item worker_error
290              
291             Called when there is an error condition detected with the child.
292              
293             =item worker_finished
294              
295             Called when a worker completes $command.
296              
297             If the command was a L<MooseX::Workers::Job>, it will get the removed job
298             instance as the first parameter.
299              
300             =item worker_done
301              
302             B<*DEPRECATED*>
303              
304             This is called before the worker is removed, so L</num_workers> and
305             L</has_workers> do not reflect that a worker has just finished. Use
306             L</worker_finished> instead.
307              
308             Gets the L<MooseX::Workers::Job> instance, if the $command was a job, and the
309             L<POE::Wheel::Run> id otherwise.
310              
311             =item worker_started
312              
313             Called when a worker starts $command
314              
315             =item sig_child
316              
317             Called when the managing session receives a SIG CHLD event
318              
319             =item sig_*
320              
321             Called when the underlying POE Kernel receives a signal; this is not limited to
322             OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept
323             arbitrary POE signals (sent via POE::Kernel->signal), but does exclude
324             SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
325              
326             These interface methods are automatically inserted when MooseX::Workers::Engine
327             detects that your manager class contains any methods beginning with sig_.
328             Signals are case-sensitive, so if you wish to handle a TERM signal, you must
329             define a sig_TERM() method. Note also that this action is performed upon
330             MooseX::Workers::Engine startup, so any run-time modification of your class
331             which 'does' MooseX::Workers is not likely to be detected.
332              
333             See the sig_TERM handler in the SYNOPSIS for an example.
334              
335             =back
336              
337             =back
338              
339             See L<MooseX::Workers::Engine> for more details.
340             Also see L<MooseX::Workers::Job> if you'd like to give your tasks
341             names, or set timeouts on them.
342              
343             =head1 WIN32 NOTES
344              
345             You don't need to binmode the STDIN/STDOUT/STDERR streams in your coderefs, this
346             is done for you. If you need utf8, it is safe to re-binmode them to
347             C<:encoding(UTF-8)>.
348              
349             Coderef workers that time out are killed with a SIGINT rather than a SIGTERM,
350             because TERM does not behave compatibly (thanks Rocco!) This is done with a:
351              
352             local $SIG{INT} = sub { exit 0 };
353              
354             that wraps the coderef.
355              
356             You cannot catch a TERM sent to the parent process (see L<perlport/kill>); use
357             INT instead.
358              
359             External programs are run with L<Win32::Job> by L<POE::Wheel::Run>. They are
360             prepended with C<cmd /c> so that builtin cmd commands also work. Use a
361             L<MooseX::Workers::Job> with a string program and arrayref args for this. If
362             you are using L<POE::Filter::Line> with an external program (which is the
363             default if you don't set the filter) the CRs from line ends will be removed
364             automatically.
365              
366             =head1 BUGS AND LIMITATIONS
367              
368             Please report any bugs or feature requests to
369             C<bug-moosex-workers@rt.cpan.org>, or through the web interface at
370             L<http://rt.cpan.org>.
371              
372             Version control: L<https://github.com/jhannah/moosex-workers>
373              
374             =head1 AUTHORS
375              
376             Chris Prather C<< <perigrin@cpan.org> >>
377              
378             Tom Lanyon C<< <dec@cpan.org> >>
379              
380             Jay Hannah C<< <jay@jays.net> >>
381              
382             Justin Hunter C<< <justin.d.hunter@gmail.com> >>
383              
384             David K. Storrs C<< <david.storrs@gmail.com> >>
385              
386             Rafael Kitover C<< <rkitover@cpan.org> >>
387              
388             =head1 LICENCE AND COPYRIGHT
389              
390             Copyright (c) 2007-2013, Chris Prather C<< <perigrin@cpan.org> >>. Some rights reserved.
391              
392             This module is free software; you can redistribute it and/or
393             modify it under the same terms as Perl itself. See L<perlartistic>.
394              
395              
396             =head1 DISCLAIMER OF WARRANTY
397              
398             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
399             FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
400             OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
401             PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
402             EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
403             WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
404             ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH
405             YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
406             NECESSARY SERVICING, REPAIR, OR CORRECTION.
407              
408             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
409             WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
410             REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE
411             LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL,
412             OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE
413             THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
414             RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
415             FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
416             SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
417             SUCH DAMAGES.
418              
419             =cut
420              
421             1;
422              
423