File Coverage

lib/MooseX/Workers/Engine.pm
Criterion Covered Total %
statement 97 105 92.3
branch 36 44 81.8
condition 14 21 66.6
subroutine 19 21 90.4
pod 4 8 50.0
total 170 199 85.4


line stmt bran cond sub pod time code
1             package MooseX::Workers::Engine;
2 17     17   75 use Moose;
  17         23  
  17         120  
3 17     17   98667 use POE qw(Wheel::Run);
  17         622311  
  17         113  
4              
5             has visitor => (
6             is => 'ro',
7             does => 'MooseX::Workers',
8             );
9              
10             has max_workers => (
11             isa => 'Int',
12             is => 'rw',
13             default => sub { 5 },
14             );
15              
16             # Processes currently running
17             has process_list => (
18             traits => [ 'Hash' ],
19             isa => 'HashRef',
20             default => sub { {} },
21             handles => {
22             set_process => 'set',
23             get_process => 'get',
24             remove_process => 'delete',
25             process_list => 'kv',
26             }
27             );
28              
29             # Processes waiting to run
30             has process_queue => (
31             traits => [ 'Array' ],
32             isa => 'ArrayRef',
33             default => sub { [] },
34             handles => {
35             enqueue_process => 'push',
36             dequeue_process => 'shift',
37             process_queue => 'elements',
38             }
39             );
40              
41             has workers => (
42             traits => [ 'Hash' ],
43             isa => 'HashRef',
44             is => 'rw',
45             lazy => 1,
46             required => 1,
47             default => sub { {} },
48             handles => {
49             set_worker => 'set',
50             get_worker => 'get',
51             remove_worker => 'delete',
52             has_workers => 'count',
53             num_workers => 'count',
54             get_worker_ids => 'keys',
55             },
56             );
57              
58             has jobs => (
59             traits => [ 'Hash' ],
60             isa => 'HashRef',
61             is => 'rw',
62             lazy => 1,
63             required => 1,
64             default => sub { {} },
65             handles => {
66             set_job => 'set',
67             get_job => 'get',
68             remove_job => 'delete',
69             has_jobs => 'count',
70             num_jobs => 'count',
71             },
72             );
73              
74             has session => (
75             isa => 'POE::Session',
76             is => 'ro',
77             required => 1,
78             lazy => 1,
79             default => sub {
80             POE::Session->create(
81             object_states => [
82             $_[0] => [
83             qw(
84             _start
85             _stop
86             _worker_stdout
87             _worker_stderr
88             _worker_error
89             _worker_done
90             _worker_started
91             _sig_child
92             add_worker
93             _kill_worker
94             )
95             ],
96             ],
97             );
98             },
99             clearer => 'remove_manager',
100             predicate => 'has_manager',
101             );
102              
103             sub yield {
104 78     78 1 849 my $self = shift;
105 78         2656 $poe_kernel->post( $self->session => @_ );
106             }
107              
108             sub call {
109 120     120 1 147 my $self = shift;
110 120         2339 return $poe_kernel->call( $self->session => @_ );
111             }
112              
113             sub put_worker {
114 0     0 0 0 my ( $self, $wheel_id ) = splice @_, 0, 2;
115 0         0 $self->get_worker($wheel_id)->put(@_);
116             }
117              
118             sub kill_worker {
119 0     0 0 0 my ( $self, $wheel_id ) = splice @_, 0, 2;
120 0         0 $self->get_worker($wheel_id)->kill(@_);
121 0         0 $self->remove_worker($wheel_id);
122             }
123              
124             sub stdout_filter {
125 86     86 0 1769 my $self = $_[OBJECT];
126 86         1614 $self->visitor->stdout_filter;
127             }
128              
129             sub stderr_filter {
130 75     75 0 1048 my $self = $_[OBJECT];
131 75         1395 $self->visitor->stderr_filter;
132             }
133              
134             #
135             # EVENTS
136             #
137              
138             sub add_worker {
139 125     125 1 5458 my ( $self, $job, $args, $kernel, $heap ) = @_[ OBJECT, ARG0, ARG1, KERNEL, HEAP ];
140              
141             # if we've reached the worker threshold, set off a warning
142 125 100       3576 if ( $self->num_workers >= $self->max_workers ) {
143 52 50       101 if ( $args->{enqueue} ) {
144 52         1427 $self->enqueue_process([$job, $args]);
145 52         129 return;
146             } else {
147 0         0 $self->visitor->max_workers_reached($job);
148 0         0 return;
149             }
150             }
151              
152 73         94 my $command;
153 73 100 66     391 if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) {
154 6         127 $command = $job->command;
155 6         105 $args = $job->args;
156             }
157             else {
158 67         83 $command = $job;
159             }
160              
161 73         81 my @optional_io_filters;
162 73 100       193 push @optional_io_filters, 'StdoutFilter', $self->stdout_filter if $self->stdout_filter;
163 73 100       1064 push @optional_io_filters, 'StderrFilter', $self->stderr_filter if $self->stderr_filter;
164            
165 73 100       993 $args = [$args] unless ref $args eq 'ARRAY';
166              
167 73         497 my $wheel = POE::Wheel::Run->new(
168             Program => $command,
169             ProgramArgs => $args,
170             @optional_io_filters,
171             StdoutEvent => '_worker_stdout',
172             StderrEvent => '_worker_stderr',
173             ErrorEvent => '_worker_error',
174             CloseEvent => '_worker_done',
175             );
176 73         298446 $kernel->sig_child($wheel->PID, "_sig_child");
177              
178 73         11970 $self->set_worker( $wheel->ID => $wheel );
179 73         313 $self->set_process( $wheel->PID => $wheel->ID );
180 73 100 66     491 if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) {
181 6         34 $job->ID($wheel->ID);
182 6         43 $job->PID($wheel->PID);
183 6         18 $self->set_job( $wheel->ID => $job );
184 6 100       167 if ($job->timeout) {
185 2         31 $heap->{wheel_to_timer}{$wheel->ID} =
186             $kernel->delay_set('_kill_worker', $job->timeout, $wheel->ID);
187             }
188             }
189 73         373 $self->yield( '_worker_started' => $wheel->ID => $job );
190 73         8319 return ( $wheel->ID => $wheel->PID );
191             }
192              
193             sub _kill_worker {
194 1     1   998985 my ( $self, $wheel_id ) = @_[ OBJECT, ARG0 ];
195 1         84 my $job = $self->get_job($wheel_id);
196 1 50       39 $self->visitor->worker_timeout( $job )
197             if $self->visitor->can('worker_timeout');
198 1         979 $self->get_worker($wheel_id)->kill;
199             }
200              
201             sub _start {
202 15     15   4322 my ($self) = $_[OBJECT];
203 15 100       416 $self->visitor->worker_manager_start()
204             if $self->visitor->can('worker_manager_start');
205              
206             # Set an alias to ensure our manager session is not cleaned up.
207 15         10754 $_[KERNEL]->alias_set("manager");
208              
209             # Register the generic signal handler for any signals our visitor
210             # class wishes to receive.
211 15         825 my @visitor_methods = map { $_->name } $self->visitor->meta->get_all_methods;
  553         23051  
212 15         78 for my $sig_handler (grep { /^sig_/ } @visitor_methods){
  553         604  
213 2         9 (my $sig) = ($sig_handler =~ /^sig_(.*)/);
214 2 100 66     24 next if uc($sig) eq 'CHLD' or uc($sig) eq 'CHILD';
215              
216 1         5 $poe_kernel->state( $sig_handler, $self, '_sig_handler' );
217 1         24 $poe_kernel->sig( $sig => $sig_handler );
218             }
219             }
220              
221             sub _stop {
222 15     15   6682 my ($self) = $_[OBJECT];
223 15 100       417 $self->visitor->worker_manager_stop()
224             if $self->visitor->can('worker_manager_stop');
225 15         17908 $self->remove_manager;
226             }
227              
228             sub _sig_child {
229 73     73   20384 my ($self) = $_[OBJECT];
230 73 100       1973 $self->visitor->sig_child( $self->get_process($_[ARG1]), $_[ARG2] )
231             if $self->visitor->can('sig_child');
232 73         2652 $self->remove_process( $_[ARG1] );
233 73         285 $_[KERNEL]->sig_handled();
234             }
235              
236             # A generic sig handler (for everything except SIGCHLD)
237             sub _sig_handler {
238 1     1   214 my ($self, $state) = @_[OBJECT,STATE];
239 1         26 $self->visitor->$state( @_[ARG0..ARG9] );
240 1         357 $_[KERNEL]->sig_handled();
241             }
242              
243             sub _worker_stdout {
244 72     72   51924 my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
245 72         2434 my $job = $self->get_job($wheel_id);
246 72 50 66     1408 $self->visitor->worker_stdout( $input, $job || $wheel_id )
247             if $self->visitor->can('worker_stdout');
248             }
249              
250             sub _worker_stderr {
251 70     70   46882 my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
252 70         143 $wheel_id =~ tr[ -~][]cd;
253 70         2175 my $job = $self->get_job($wheel_id);
254 70 50 66     1430 $self->visitor->worker_stderr( $input, $job || $wheel_id )
255             if $self->visitor->can('worker_stderr');
256             }
257              
258             sub _worker_error {
259 146     146   39708 my ($self) = $_[OBJECT];
260 146 50 33     1150 return if $_[ARG0] eq "read" && $_[ARG1] == 0;
261              
262             # $operation, $errnum, $errstr, $wheel_id
263 0 0       0 $self->visitor->worker_error( @_[ ARG0 .. ARG3 ] )
264             if $self->visitor->can('worker_error');
265             }
266              
267             sub _worker_done {
268 73     73   2318 my ($self, $wheel_id, $kernel, $heap) = @_[ OBJECT, ARG0, KERNEL, HEAP ];
269 73         2768 my $job = $self->get_job($wheel_id);
270 73 100       246 $kernel->alarm_remove(delete $heap->{wheel_to_timer}{$wheel_id}) if $heap->{wheel_to_timer}{$wheel_id};
271 73 100       1709 if ($self->visitor->can('worker_done')) {
272 72 100       203 if ($job) {
273 5         122 $self->visitor->worker_done( $job );
274             } else {
275 67         1284 $self->visitor->worker_done( $wheel_id );
276             }
277             }
278 73         53176 $self->delete_worker( $wheel_id );
279              
280             # If we have free workers and processes in queue, then dequeue one of them.
281 73   100     24260 while ( $self->num_workers < $self->max_workers &&
282             (my $jobref = $self->dequeue_process)
283             ) {
284 52         79 my ($cmd, $args) = @$jobref;
285             # This has to be call(), not yield() so num_workers increments before
286             # next loop above.
287 52         210 $self->call(add_worker => $cmd, $args);
288             }
289             }
290              
291             sub delete_worker {
292 73     73 1 154 my ( $self, $wheelID ) = @_;
293 73         2746 my $wheel = $self->get_worker($wheelID);
294 73         323 $self->remove_worker( $wheel->ID );
295             }
296              
297             sub _worker_started {
298 73     73   56641 my ( $self, $wheel_id, $command ) = @_[ OBJECT, ARG0, ARG1 ];
299 73         2565 my $job = $self->get_job($wheel_id);
300 73 50       1494 if ($self->visitor->can('worker_started')) {
301 73 100       182 if ($job) {
302 6         100 $self->visitor->worker_started( $job )
303             } else {
304 67         1219 $self->visitor->worker_started( $wheel_id, $command )
305             }
306             }
307             }
308              
309              
310 17     17   1310049 no Moose;
  17         58  
  17         176  
311             1;
312             __END__
313              
314             =head1 NAME
315              
316             MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers
317              
318             =head1 SYNOPSIS
319              
320             package MooseX::Workers;
321              
322             has Engine => (
323             isa => 'MooseX::Workers::Engine',
324             is => 'ro',
325             lazy => 1,
326             required => 1,
327             default => sub { MooseX::Workers::Engine->new( visitor => $_[0] ) },
328             handles => [
329             qw(
330             max_workers
331             has_workers
332             num_workers
333             put_worker
334             kill_worker
335             )
336             ],
337             );
338              
339             =head1 DESCRIPTION
340              
341             MooseX::Workers::Engine provides the main functionality
342             to MooseX::Workers. It wraps a POE::Session and as many POE::Wheel::Run
343             objects as it needs.
344              
345             =head1 ATTRIBUTES
346              
347             =over
348              
349             =item visitor
350              
351             Hold a reference to our main object so we can use the callbacks on it.
352              
353             =item max_workers
354              
355             An Integer specifying the maximum number of workers we have.
356              
357             =item workers
358              
359             A HashRef of POE::Wheel::Run objects that are our workers, keyed by wheel ID.
360              
361             =item session
362              
363             Contains the POE::Session that controls the workers.
364              
365             =back
366              
367             =head1 METHODS
368              
369             =over
370              
371             =item yield
372              
373             Helper method to post events to our internal manager session.
374              
375             =item call
376              
377             Helper method to call events to our internal manager session.
378             This is synchronous and will block incoming data from the children
379             if it takes too long to return.
380              
381             =item set_worker($key)
382              
383             Set the worker at $key
384              
385             =item get_worker($key)
386              
387             Retrieve the worker at $key
388              
389             =item delete_worker($key)
390              
391             Remove the worker at $key
392              
393             =item has_workers
394              
395             Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
396              
397             =item num_workers
398              
399             Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
400              
401             =item has_manager
402              
403             Check to see if we have a manager session.
404              
405             =item remove_manager
406              
407             Remove the manager session.
408              
409             =item meta
410              
411             The Metaclass for MooseX::Workers::Engine see Moose's documentation.
412              
413             =back
414              
415             =head1 EVENTS
416              
417             =over
418              
419             =item add_worker ($command)
420              
421             Create a POE::Wheel::Run object to handle $command. If $command holds a scalar, it will be executed as exec($scalar).
422             Shell metacharacters will be expanded in this form. If $command holds an array reference,
423             it will be executed as exec(@$array). This form of exec() doesn't expand shell metacharacters.
424             If $command holds a code reference, it will be called in the forked child process, and then
425             the child will exit.
426              
427             See POE::Wheel::Run for more details.
428              
429             =back
430              
431             =head1 INTERFACE
432              
433             MooseX::Workers::Engine fires the following callbacks to its visitor object:
434              
435             =over
436              
437             =item worker_manager_start
438              
439             Called when the managing session is started.
440              
441             =item worker_manager_stop
442              
443             Called when the managing session stops.
444              
445             =item max_workers_reached
446              
447             Called when we reach the maximum number of workers.
448              
449             =item worker_stdout
450              
451             Called when a child prints to STDOUT.
452              
453             =item worker_stderr
454              
455             Called when a child prints to STDERR.
456              
457             =item worker_error
458              
459             Called when there is an error condition detected with the child.
460              
461             =item worker_done
462              
463             Called when a worker completes $command.
464              
465             =item worker_started
466              
467             Called when a worker starts $command.
468              
469             =item sig_child($PID, $ret)
470              
471             Called when the managing session receives a SIG CHLD event.
472              
473             =item sig_*
474              
475             Called when the underlying POE Kernel receives a signal; this is not limited to
476             OS signals (i.e. what you'd usually handle in Perl's %SIG) so will also accept
477             arbitrary POE signals (sent via POE::Kernel->signal), but does exclude
478             SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
479              
480             These interface methods are automatically inserted when MooseX::Workers::Engine
481             detects that the visitor object contains any methods beginning with sig_.
482             Signals are case-sensitive, so if you wish to handle a TERM signal, you must
483             define a sig_TERM() method. Note also that this action is performed upon
484             MooseX::Workers::Engine startup, so any run-time modification of the visitor
485             object is not likely to be detected.
486              
487             =back
488              
489             =cut
490              
491             1;
492              
493