File Coverage

blib/lib/POE/Component/JobQueue.pm
Criterion Covered Total %
statement 109 123 88.6
branch 31 58 53.4
condition 19 44 43.1
subroutine 14 16 87.5
pod 0 9 0.0
total 173 250 69.2


line stmt bran cond sub pod time code
1             # $Id: JobQueue.pm 29 2009-07-28 06:33:37Z rcaputo $
2             # License and documentation are after __END__.
3              
4             package POE::Component::JobQueue;
5              
6 1     1   166470 use strict;
  1         3  
  1         33  
7              
8 1     1   4 use vars qw($VERSION);
  1         1  
  1         43  
9             $VERSION = '0.571';
10              
11 1     1   5 use Carp qw (croak);
  1         5  
  1         43  
12              
13 1     1   4 use POE::Session;
  1         1  
  1         5  
14              
15             sub DEBUG () { 0 };
16              
17             # Spawn a new PoCo::JobQueue session. This basically is a
18             # constructor, but it isn't named "new" because it doesn't create a
19             # usable object. Instead, it spawns the object off as a session.
20              
21             sub spawn {
22 2     2 0 1274 my $type = shift;
23              
24 2 50       9 croak "$type requires an even number of parameters" if @_ % 2;
25              
26 2         9 my %params = @_;
27              
28             ### Parameters that are common to both types of job queue.
29              
30 2         5 my $alias = delete $params{Alias};
31 2 50 33     22 $alias = 'queuer' unless defined $alias and length $alias;
32              
33 2         5 my $worker = delete $params{Worker};
34 2 50 33     12 croak "$type requires a coderef Worker parameter"
35             unless defined $worker and ref($worker) eq 'CODE';
36              
37 2         6 my $worker_limit = delete $params{WorkerLimit};
38 2 50 33     19 $worker_limit = 8 unless defined $worker_limit and $worker_limit > 0;
39              
40 2 50 50     16 croak "$type requires either an Active or a Passive parameter block"
41             unless defined($params{Active}) xor defined($params{Passive});
42              
43             ### Parameters and states that are common to both types of queue.
44              
45 2         5 my @args = ( $alias, $worker_limit, $worker );
46             my %states =
47             ( _child => \&poco_jobqueue_both_child,
48             stop => \&poco_jobqueue_both_stop,
49 2     2   2161 _stop => sub {},
50 2         13 );
51              
52             ### Modal parameters and states go here.
53              
54             # Set up for an active queue.
55 2 100       12 if (exists $params{Active}) {
    50          
56 1         3 my $active = delete $params{Active};
57              
58 1         2 my $poll_interval = delete $active->{PollInterval};
59 1 50 33     6 $poll_interval = undef
60             unless defined $poll_interval and $poll_interval > 0;
61              
62 1         2 my $ack_alias = delete $active->{AckAlias};
63 1 50 33     9 $ack_alias = undef unless defined $ack_alias and length $ack_alias;
64              
65 1         3 my $ack_state = delete $active->{AckState};
66 1 50 33     14 $ack_state = undef unless defined $ack_state and length $ack_state;
67              
68 1 50 25     14 croak "$type must have neither or both AckAlias and AckState"
69             if defined($ack_alias) xor defined($ack_state);
70              
71 1         3 $states{_start} = \&poco_jobqueue_active_start;
72 1         3 $states{dequeue} = \&poco_jobqueue_active_dequeue;
73              
74 1         4 push @args, $poll_interval, $ack_alias, $ack_state;
75             }
76              
77             # Set up for a passive queue.
78             elsif (exists $params{Passive}) {
79 1         2 my $passive = delete $params{Passive};
80              
81 1         3 my $prioritizer = delete $passive->{Prioritizer};
82 1 50   9   5 $prioritizer = sub { 1 } unless defined $prioritizer;
  9         23  
83              
84 1 50       4 croak( "$type doesn't know these Passive parameters: ",
85             join(', ', sort keys %$passive)
86             ) if scalar keys %$passive;
87              
88 1         4 $states{_start} = \&poco_jobqueue_passive_start;
89 1         3 $states{dequeue} = \&poco_jobqueue_passive_dequeue;
90 1         3 $states{enqueue} = \&poco_jobqueue_passive_enqueue;
91              
92 1         2 push @args, $prioritizer;
93             }
94              
95 2 50       7 croak( "$type doesn't know these parameters: ",
96             join(', ', sort keys %params)
97             ) if scalar keys %params;
98              
99             # Spawn whichever queue we've built.
100 2         10 POE::Session->create
101             ( inline_states => \%states,
102             args => \@args,
103             );
104              
105 2         334 undef;
106             }
107              
108             # Helper function for active job queues.
109              
110             sub poco_jobqueue_active_meta_postback {
111 0     0 0 0 die "unimplemented bit";
112             }
113              
114             # Start an active job queue. This type of queue polls for new jobs.
115              
116             sub poco_jobqueue_active_start {
117 1     1 0 178 my ( $kernel, $heap,
118             $alias, $worker_limit, $worker_ref,
119             $poll_interval, $ack_alias, $ack_state
120             ) = @_[KERNEL, HEAP, ARG0..ARG5];
121              
122             # Common parameters.
123 1         3 $heap->{alias} = $alias;
124 1         2 $heap->{worker_limit} = $worker_limit;
125 1         2 $heap->{worker_ref} = $worker_ref;
126              
127             # Active queue parameters.
128 1         3 $heap->{poll_interval} = $poll_interval;
129             $heap->{meta_postback} =
130             sub {
131 10     10   88 my @job = @_;
132 10         43 my $session = $kernel->alias_resolve( $ack_alias );
133 10 50       360 return $session->postback( $ack_state, @job ) if defined $session;
134 0         0 return sub { 1 };
  0         0  
135 1         6 };
136              
137             # State variables. Pending polls starts at 1 because we're going to
138             # fake an initial poll to get things started.
139 1         2 $heap->{worker_count} = 0;
140 1         3 $heap->{pending_polls} = 0;
141 1         3 $heap->{latest_worker} = 0;
142              
143             # Register an alias.
144 1         4 $kernel->alias_set($alias);
145              
146             # Start an initial set of workers.
147 1         32 $kernel->yield( 'dequeue' );
148             }
149              
150             # Start a passive job queue. This type of queue waits for something
151             # else to enqueue jobs.
152              
153             sub poco_jobqueue_passive_start {
154 1     1 0 298 my ( $kernel, $heap,
155             $alias, $worker_limit, $worker_ref,
156             $prioritizer
157             ) = @_[KERNEL, HEAP, ARG0..ARG3];
158              
159             # Common parameters.
160 1         3 $heap->{alias} = $alias;
161 1         2 $heap->{worker_limit} = $worker_limit;
162 1         3 $heap->{worker_ref} = $worker_ref;
163              
164             # Active queue parameters.
165 1         2 $heap->{prioritizer} = $prioritizer;
166              
167             # State variables.
168 1         2 $heap->{worker_count} = 0;
169 1         4 $heap->{job_queue} = [ ];
170              
171             # Register an alias.
172 1         5 $kernel->alias_set($alias);
173             }
174              
175             # A worker either has come or gone. Track the number of running
176             # workers, and spawn new ones if appropriate.
177              
178             sub poco_jobqueue_both_child {
179 40     40 0 5998533 my ($kernel, $heap, $operation) = @_[KERNEL, HEAP, ARG0];
180              
181             # A worker has begun its job. Count it so we know how many exist.
182              
183 40 100 66     305 if ($operation eq 'gain' or $operation eq 'create') {
184 20         21 DEBUG and warn "JQ: job queue $heap->{alias} got a new worker";
185 20         86 $heap->{worker_count}++;
186             }
187              
188             # A worker has finished. Decrement our worker count, and try to
189             # start another worker to take its place.
190              
191             else {
192 20         34 DEBUG and warn "JQ: job queue $heap->{alias} lost a worker";
193 20 50       111 warn( "worker count ($heap->{worker_count}) exceeded the limit (",
194             $heap->{worker_limit}, ")"
195             ) if $heap->{worker_count} > $heap->{worker_limit};
196 20         41 $heap->{worker_count}--;
197 20 100 66     238 $kernel->yield('dequeue') unless (
198             $heap->{latest_worker} or $heap->{stopped}
199             );
200             }
201             }
202              
203             # Remove the alias, stop active polling and delete outstanding job queue
204              
205             sub poco_jobqueue_both_stop {
206 0     0 0 0 my ($kernel, $heap) = @_[KERNEL, HEAP];
207              
208 0         0 $kernel->alias_remove($heap->{alias});
209 0         0 $kernel->alarm_remove_all();
210              
211 0 0       0 delete $heap->{pollinterval} if ($heap->{pollinterval});
212 0 0       0 delete $heap->{job_queue} if ($heap->{job_queue});
213              
214 0         0 $heap->{stopped} = 1;
215             }
216              
217             # Attempt to fill empty worker slots.
218              
219             # This is a token for ARG0 that signifies this was a timed event.
220             sub TIMED () { 31415 }
221              
222             sub poco_jobqueue_active_dequeue {
223 7     7 0 3857 my ($kernel, $heap, $is_timed) = @_[KERNEL, HEAP, ARG0];
224              
225             # If this is a poll from a timed event, then decrement the pending
226             # polls count. The pending polls count is just to ensure that
227             # redundant delays are not set, because each redundant delay would
228             # force the existing one forward in time. They would delay polling
229             # past the hard polling interval, which would probably be bad (and
230             # could stave off polling indefinitely in some instances). I think
231             # this is a bit of a hack, and something better should replace it.
232              
233 7 50 33     40 if (defined $is_timed and $is_timed == TIMED) {
234             # Decrement the number of pending polls. There can be only one,
235             # so throw in a die for assertion testing.
236 0 0       0 die "pending polls should now be zero (not $heap->{pending_polls})"
237             if --$heap->{pending_polls};
238             }
239              
240             # Attempt to fill the empty worker slots.
241 7   66     84 while (
242             not $heap->{stopped}
243             and $heap->{worker_count} < $heap->{worker_limit}
244             ) {
245              
246             # Call the worker to fetch a new job and spawn a session.
247 11         23 my $previous_worker_count = $heap->{worker_count};
248 11         45 $heap->{worker_ref}->( $heap->{meta_postback} );
249              
250             # If the worker count hasn't changed, then we've run out of jobs.
251             # Begin polling, if applicable, and exit the spawn loop.
252 11 100       1163 if ($heap->{worker_count} == $previous_worker_count) {
253 1 50 33     6 if (defined $heap->{poll_interval} and !$heap->{pending_polls}) {
254 0         0 $heap->{pending_polls}++;
255 0         0 $kernel->delay( dequeue => $heap->{poll_interval} => TIMED );
256             }
257 1 50       5 $heap->{latest_worker}++ unless defined $heap->{poll_interval};
258 1         4 last;
259             }
260             }
261             }
262              
263             # Attempt to fill empty worker slots.
264              
265             sub poco_jobqueue_passive_dequeue {
266 20     20 0 5386 my ($kernel, $heap) = @_[KERNEL, HEAP];
267              
268             # Attempt to fill the empty worker slots.
269 20   66     163 while (
270             not $heap->{stopped}
271             and $heap->{worker_count} < $heap->{worker_limit}
272             ) {
273              
274             # Try to fetch another job from the queue.
275 15         421 my $next_job = shift @{ $heap->{job_queue} };
  15         41  
276 15 100       53 last unless defined $next_job;
277              
278 10         13 DEBUG and
279             warn "JQ: job queue $heap->{alias} is starting a new worker";
280              
281             # Start a new session with the job.
282 10         46 $heap->{worker_ref}->( @$next_job );
283             }
284              
285             # Avoid accidentally returning something.
286 20         658 undef;
287             }
288              
289             # Enqueue a job in a passive queue.
290              
291             sub poco_jobqueue_passive_enqueue {
292 10     10 0 1544 my ($kernel, $sender, $heap, $return_state, @job) =
293             @_[KERNEL, SENDER, HEAP, ARG0..$#_];
294              
295 10 50       28 if ($heap->{stopped}) {
296 0         0 DEBUG and warn(
297             "JQ: $heap->{alias} can not enqueue new jobs after 'stop'\n"
298             );
299              
300 0         0 return;
301             }
302              
303 10         10 DEBUG and warn "JQ: job queue $heap->{alias} enqueuing a new job";
304              
305 10         10 my $postback;
306 10 50       21 if (defined $return_state) {
307 10         34 $postback = $sender->postback( $return_state, @job );
308             }
309              
310             # Add the job to the queue. Use the prioritizer to find the right
311             # place to put it.
312              
313 10         562 my $queue_index = @{ $heap->{job_queue} };
  10         19  
314 10         25 while ($queue_index--) {
315             last if
316 9 50       26 $heap->{prioritizer}->( $heap->{job_queue}->[$queue_index],
317             \@job,
318             ) >= 0;
319             }
320              
321             # Place the new job after the index we found.
322 10         13 splice( @{$heap->{job_queue}}, $queue_index+1, 0, [ $postback, @job ] );
  10         31  
323              
324             # Dequeue a new event.
325 10         30 $kernel->yield( 'dequeue' );
326             }
327              
328             1;
329              
330             __END__