line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# $Id: JobQueue.pm 29 2009-07-28 06:33:37Z rcaputo $ |
2
|
|
|
|
|
|
|
# License and documentation are after __END__. |
3
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
package POE::Component::JobQueue; |
5
|
|
|
|
|
|
|
|
6
|
1
|
|
|
1
|
|
166470
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
33
|
|
7
|
|
|
|
|
|
|
|
8
|
1
|
|
|
1
|
|
4
|
use vars qw($VERSION); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
43
|
|
9
|
|
|
|
|
|
|
$VERSION = '0.571'; |
10
|
|
|
|
|
|
|
|
11
|
1
|
|
|
1
|
|
5
|
use Carp qw (croak); |
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
43
|
|
12
|
|
|
|
|
|
|
|
13
|
1
|
|
|
1
|
|
4
|
use POE::Session; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
5
|
|
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
sub DEBUG () { 0 }; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
# Spawn a new PoCo::JobQueue session. This basically is a |
18
|
|
|
|
|
|
|
# constructor, but it isn't named "new" because it doesn't create a |
19
|
|
|
|
|
|
|
# usable object. Instead, it spawns the object off as a session. |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
sub spawn { |
22
|
2
|
|
|
2
|
0
|
1274
|
my $type = shift; |
23
|
|
|
|
|
|
|
|
24
|
2
|
50
|
|
|
|
9
|
croak "$type requires an even number of parameters" if @_ % 2; |
25
|
|
|
|
|
|
|
|
26
|
2
|
|
|
|
|
9
|
my %params = @_; |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
### Parameters that are common to both types of job queue. |
29
|
|
|
|
|
|
|
|
30
|
2
|
|
|
|
|
5
|
my $alias = delete $params{Alias}; |
31
|
2
|
50
|
33
|
|
|
22
|
$alias = 'queuer' unless defined $alias and length $alias; |
32
|
|
|
|
|
|
|
|
33
|
2
|
|
|
|
|
5
|
my $worker = delete $params{Worker}; |
34
|
2
|
50
|
33
|
|
|
12
|
croak "$type requires a coderef Worker parameter" |
35
|
|
|
|
|
|
|
unless defined $worker and ref($worker) eq 'CODE'; |
36
|
|
|
|
|
|
|
|
37
|
2
|
|
|
|
|
6
|
my $worker_limit = delete $params{WorkerLimit}; |
38
|
2
|
50
|
33
|
|
|
19
|
$worker_limit = 8 unless defined $worker_limit and $worker_limit > 0; |
39
|
|
|
|
|
|
|
|
40
|
2
|
50
|
50
|
|
|
16
|
croak "$type requires either an Active or a Passive parameter block" |
41
|
|
|
|
|
|
|
unless defined($params{Active}) xor defined($params{Passive}); |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
### Parameters and states that are common to both types of queue. |
44
|
|
|
|
|
|
|
|
45
|
2
|
|
|
|
|
5
|
my @args = ( $alias, $worker_limit, $worker ); |
46
|
|
|
|
|
|
|
my %states = |
47
|
|
|
|
|
|
|
( _child => \&poco_jobqueue_both_child, |
48
|
|
|
|
|
|
|
stop => \&poco_jobqueue_both_stop, |
49
|
2
|
|
|
2
|
|
2161
|
_stop => sub {}, |
50
|
2
|
|
|
|
|
13
|
); |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
### Modal parameters and states go here. |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
# Set up for an active queue. |
55
|
2
|
100
|
|
|
|
12
|
if (exists $params{Active}) { |
|
|
50
|
|
|
|
|
|
56
|
1
|
|
|
|
|
3
|
my $active = delete $params{Active}; |
57
|
|
|
|
|
|
|
|
58
|
1
|
|
|
|
|
2
|
my $poll_interval = delete $active->{PollInterval}; |
59
|
1
|
50
|
33
|
|
|
6
|
$poll_interval = undef |
60
|
|
|
|
|
|
|
unless defined $poll_interval and $poll_interval > 0; |
61
|
|
|
|
|
|
|
|
62
|
1
|
|
|
|
|
2
|
my $ack_alias = delete $active->{AckAlias}; |
63
|
1
|
50
|
33
|
|
|
9
|
$ack_alias = undef unless defined $ack_alias and length $ack_alias; |
64
|
|
|
|
|
|
|
|
65
|
1
|
|
|
|
|
3
|
my $ack_state = delete $active->{AckState}; |
66
|
1
|
50
|
33
|
|
|
14
|
$ack_state = undef unless defined $ack_state and length $ack_state; |
67
|
|
|
|
|
|
|
|
68
|
1
|
50
|
25
|
|
|
14
|
croak "$type must have neither or both AckAlias and AckState" |
69
|
|
|
|
|
|
|
if defined($ack_alias) xor defined($ack_state); |
70
|
|
|
|
|
|
|
|
71
|
1
|
|
|
|
|
3
|
$states{_start} = \&poco_jobqueue_active_start; |
72
|
1
|
|
|
|
|
3
|
$states{dequeue} = \&poco_jobqueue_active_dequeue; |
73
|
|
|
|
|
|
|
|
74
|
1
|
|
|
|
|
4
|
push @args, $poll_interval, $ack_alias, $ack_state; |
75
|
|
|
|
|
|
|
} |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
# Set up for a passive queue. |
78
|
|
|
|
|
|
|
elsif (exists $params{Passive}) { |
79
|
1
|
|
|
|
|
2
|
my $passive = delete $params{Passive}; |
80
|
|
|
|
|
|
|
|
81
|
1
|
|
|
|
|
3
|
my $prioritizer = delete $passive->{Prioritizer}; |
82
|
1
|
50
|
|
9
|
|
5
|
$prioritizer = sub { 1 } unless defined $prioritizer; |
|
9
|
|
|
|
|
23
|
|
83
|
|
|
|
|
|
|
|
84
|
1
|
50
|
|
|
|
4
|
croak( "$type doesn't know these Passive parameters: ", |
85
|
|
|
|
|
|
|
join(', ', sort keys %$passive) |
86
|
|
|
|
|
|
|
) if scalar keys %$passive; |
87
|
|
|
|
|
|
|
|
88
|
1
|
|
|
|
|
4
|
$states{_start} = \&poco_jobqueue_passive_start; |
89
|
1
|
|
|
|
|
3
|
$states{dequeue} = \&poco_jobqueue_passive_dequeue; |
90
|
1
|
|
|
|
|
3
|
$states{enqueue} = \&poco_jobqueue_passive_enqueue; |
91
|
|
|
|
|
|
|
|
92
|
1
|
|
|
|
|
2
|
push @args, $prioritizer; |
93
|
|
|
|
|
|
|
} |
94
|
|
|
|
|
|
|
|
95
|
2
|
50
|
|
|
|
7
|
croak( "$type doesn't know these parameters: ", |
96
|
|
|
|
|
|
|
join(', ', sort keys %params) |
97
|
|
|
|
|
|
|
) if scalar keys %params; |
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
# Spawn whichever queue we've built. |
100
|
2
|
|
|
|
|
10
|
POE::Session->create |
101
|
|
|
|
|
|
|
( inline_states => \%states, |
102
|
|
|
|
|
|
|
args => \@args, |
103
|
|
|
|
|
|
|
); |
104
|
|
|
|
|
|
|
|
105
|
2
|
|
|
|
|
334
|
undef; |
106
|
|
|
|
|
|
|
} |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
# Helper function for active job queues. |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
sub poco_jobqueue_active_meta_postback { |
111
|
0
|
|
|
0
|
0
|
0
|
die "unimplemented bit"; |
112
|
|
|
|
|
|
|
} |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
# Start an active job queue. This type of queue polls for new jobs. |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
sub poco_jobqueue_active_start { |
117
|
1
|
|
|
1
|
0
|
178
|
my ( $kernel, $heap, |
118
|
|
|
|
|
|
|
$alias, $worker_limit, $worker_ref, |
119
|
|
|
|
|
|
|
$poll_interval, $ack_alias, $ack_state |
120
|
|
|
|
|
|
|
) = @_[KERNEL, HEAP, ARG0..ARG5]; |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# Common parameters. |
123
|
1
|
|
|
|
|
3
|
$heap->{alias} = $alias; |
124
|
1
|
|
|
|
|
2
|
$heap->{worker_limit} = $worker_limit; |
125
|
1
|
|
|
|
|
2
|
$heap->{worker_ref} = $worker_ref; |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
# Active queue parameters. |
128
|
1
|
|
|
|
|
3
|
$heap->{poll_interval} = $poll_interval; |
129
|
|
|
|
|
|
|
$heap->{meta_postback} = |
130
|
|
|
|
|
|
|
sub { |
131
|
10
|
|
|
10
|
|
88
|
my @job = @_; |
132
|
10
|
|
|
|
|
43
|
my $session = $kernel->alias_resolve( $ack_alias ); |
133
|
10
|
50
|
|
|
|
360
|
return $session->postback( $ack_state, @job ) if defined $session; |
134
|
0
|
|
|
|
|
0
|
return sub { 1 }; |
|
0
|
|
|
|
|
0
|
|
135
|
1
|
|
|
|
|
6
|
}; |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
# State variables. Pending polls starts at 1 because we're going to |
138
|
|
|
|
|
|
|
# fake an initial poll to get things started. |
139
|
1
|
|
|
|
|
2
|
$heap->{worker_count} = 0; |
140
|
1
|
|
|
|
|
3
|
$heap->{pending_polls} = 0; |
141
|
1
|
|
|
|
|
3
|
$heap->{latest_worker} = 0; |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
# Register an alias. |
144
|
1
|
|
|
|
|
4
|
$kernel->alias_set($alias); |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
# Start an initial set of workers. |
147
|
1
|
|
|
|
|
32
|
$kernel->yield( 'dequeue' ); |
148
|
|
|
|
|
|
|
} |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
# Start a passive job queue. This type of queue waits for something |
151
|
|
|
|
|
|
|
# else to enqueue jobs. |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
sub poco_jobqueue_passive_start { |
154
|
1
|
|
|
1
|
0
|
298
|
my ( $kernel, $heap, |
155
|
|
|
|
|
|
|
$alias, $worker_limit, $worker_ref, |
156
|
|
|
|
|
|
|
$prioritizer |
157
|
|
|
|
|
|
|
) = @_[KERNEL, HEAP, ARG0..ARG3]; |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
# Common parameters. |
160
|
1
|
|
|
|
|
3
|
$heap->{alias} = $alias; |
161
|
1
|
|
|
|
|
2
|
$heap->{worker_limit} = $worker_limit; |
162
|
1
|
|
|
|
|
3
|
$heap->{worker_ref} = $worker_ref; |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
# Active queue parameters. |
165
|
1
|
|
|
|
|
2
|
$heap->{prioritizer} = $prioritizer; |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
# State variables. |
168
|
1
|
|
|
|
|
2
|
$heap->{worker_count} = 0; |
169
|
1
|
|
|
|
|
4
|
$heap->{job_queue} = [ ]; |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
# Register an alias. |
172
|
1
|
|
|
|
|
5
|
$kernel->alias_set($alias); |
173
|
|
|
|
|
|
|
} |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
# A worker either has come or gone. Track the number of running |
176
|
|
|
|
|
|
|
# workers, and spawn new ones if appropriate. |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
sub poco_jobqueue_both_child { |
179
|
40
|
|
|
40
|
0
|
5998533
|
my ($kernel, $heap, $operation) = @_[KERNEL, HEAP, ARG0]; |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
# A worker has begun its job. Count it so we know how many exist. |
182
|
|
|
|
|
|
|
|
183
|
40
|
100
|
66
|
|
|
305
|
if ($operation eq 'gain' or $operation eq 'create') { |
184
|
20
|
|
|
|
|
21
|
DEBUG and warn "JQ: job queue $heap->{alias} got a new worker"; |
185
|
20
|
|
|
|
|
86
|
$heap->{worker_count}++; |
186
|
|
|
|
|
|
|
} |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
# A worker has finished. Decrement our worker count, and try to |
189
|
|
|
|
|
|
|
# start another worker to take its place. |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
else { |
192
|
20
|
|
|
|
|
34
|
DEBUG and warn "JQ: job queue $heap->{alias} lost a worker"; |
193
|
20
|
50
|
|
|
|
111
|
warn( "worker count ($heap->{worker_count}) exceeded the limit (", |
194
|
|
|
|
|
|
|
$heap->{worker_limit}, ")" |
195
|
|
|
|
|
|
|
) if $heap->{worker_count} > $heap->{worker_limit}; |
196
|
20
|
|
|
|
|
41
|
$heap->{worker_count}--; |
197
|
20
|
100
|
66
|
|
|
238
|
$kernel->yield('dequeue') unless ( |
198
|
|
|
|
|
|
|
$heap->{latest_worker} or $heap->{stopped} |
199
|
|
|
|
|
|
|
); |
200
|
|
|
|
|
|
|
} |
201
|
|
|
|
|
|
|
} |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
# Remove the alias, stop active polling and delete outstanding job queue |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
sub poco_jobqueue_both_stop { |
206
|
0
|
|
|
0
|
0
|
0
|
my ($kernel, $heap) = @_[KERNEL, HEAP]; |
207
|
|
|
|
|
|
|
|
208
|
0
|
|
|
|
|
0
|
$kernel->alias_remove($heap->{alias}); |
209
|
0
|
|
|
|
|
0
|
$kernel->alarm_remove_all(); |
210
|
|
|
|
|
|
|
|
211
|
0
|
0
|
|
|
|
0
|
delete $heap->{pollinterval} if ($heap->{pollinterval}); |
212
|
0
|
0
|
|
|
|
0
|
delete $heap->{job_queue} if ($heap->{job_queue}); |
213
|
|
|
|
|
|
|
|
214
|
0
|
|
|
|
|
0
|
$heap->{stopped} = 1; |
215
|
|
|
|
|
|
|
} |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
# Attempt to fill empty worker slots. |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
# This is a token for ARG0 that signifies this was a timed event. |
220
|
|
|
|
|
|
|
sub TIMED () { 31415 } |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
sub poco_jobqueue_active_dequeue { |
223
|
7
|
|
|
7
|
0
|
3857
|
my ($kernel, $heap, $is_timed) = @_[KERNEL, HEAP, ARG0]; |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
# If this is a poll from a timed event, then decrement the pending |
226
|
|
|
|
|
|
|
# polls count. The pending polls count is just to ensure that |
227
|
|
|
|
|
|
|
# redundant delays are not set, because each redundant delay would |
228
|
|
|
|
|
|
|
# force the existing one forward in time. They would delay polling |
229
|
|
|
|
|
|
|
# past the hard polling interval, which would probably be bad (and |
230
|
|
|
|
|
|
|
# could stave off polling indefinitely in some instances). I think |
231
|
|
|
|
|
|
|
# this is a bit of a hack, and something better should replace it. |
232
|
|
|
|
|
|
|
|
233
|
7
|
50
|
33
|
|
|
40
|
if (defined $is_timed and $is_timed == TIMED) { |
234
|
|
|
|
|
|
|
# Decrement the number of pending polls. There can be only one, |
235
|
|
|
|
|
|
|
# so throw in a die for assertion testing. |
236
|
0
|
0
|
|
|
|
0
|
die "pending polls should now be zero (not $heap->{pending_polls})" |
237
|
|
|
|
|
|
|
if --$heap->{pending_polls}; |
238
|
|
|
|
|
|
|
} |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
# Attempt to fill the empty worker slots. |
241
|
7
|
|
66
|
|
|
84
|
while ( |
242
|
|
|
|
|
|
|
not $heap->{stopped} |
243
|
|
|
|
|
|
|
and $heap->{worker_count} < $heap->{worker_limit} |
244
|
|
|
|
|
|
|
) { |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
# Call the worker to fetch a new job and spawn a session. |
247
|
11
|
|
|
|
|
23
|
my $previous_worker_count = $heap->{worker_count}; |
248
|
11
|
|
|
|
|
45
|
$heap->{worker_ref}->( $heap->{meta_postback} ); |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
# If the worker count hasn't changed, then we've run out of jobs. |
251
|
|
|
|
|
|
|
# Begin polling, if applicable, and exit the spawn loop. |
252
|
11
|
100
|
|
|
|
1163
|
if ($heap->{worker_count} == $previous_worker_count) { |
253
|
1
|
50
|
33
|
|
|
6
|
if (defined $heap->{poll_interval} and !$heap->{pending_polls}) { |
254
|
0
|
|
|
|
|
0
|
$heap->{pending_polls}++; |
255
|
0
|
|
|
|
|
0
|
$kernel->delay( dequeue => $heap->{poll_interval} => TIMED ); |
256
|
|
|
|
|
|
|
} |
257
|
1
|
50
|
|
|
|
5
|
$heap->{latest_worker}++ unless defined $heap->{poll_interval}; |
258
|
1
|
|
|
|
|
4
|
last; |
259
|
|
|
|
|
|
|
} |
260
|
|
|
|
|
|
|
} |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
# Attempt to fill empty worker slots. |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
sub poco_jobqueue_passive_dequeue { |
266
|
20
|
|
|
20
|
0
|
5386
|
my ($kernel, $heap) = @_[KERNEL, HEAP]; |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
# Attempt to fill the empty worker slots. |
269
|
20
|
|
66
|
|
|
163
|
while ( |
270
|
|
|
|
|
|
|
not $heap->{stopped} |
271
|
|
|
|
|
|
|
and $heap->{worker_count} < $heap->{worker_limit} |
272
|
|
|
|
|
|
|
) { |
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
# Try to fetch another job from the queue. |
275
|
15
|
|
|
|
|
421
|
my $next_job = shift @{ $heap->{job_queue} }; |
|
15
|
|
|
|
|
41
|
|
276
|
15
|
100
|
|
|
|
53
|
last unless defined $next_job; |
277
|
|
|
|
|
|
|
|
278
|
10
|
|
|
|
|
13
|
DEBUG and |
279
|
|
|
|
|
|
|
warn "JQ: job queue $heap->{alias} is starting a new worker"; |
280
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
# Start a new session with the job. |
282
|
10
|
|
|
|
|
46
|
$heap->{worker_ref}->( @$next_job ); |
283
|
|
|
|
|
|
|
} |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
# Avoid accidentally returning something. |
286
|
20
|
|
|
|
|
658
|
undef; |
287
|
|
|
|
|
|
|
} |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
# Enqueue a job in a passive queue. |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
sub poco_jobqueue_passive_enqueue { |
292
|
10
|
|
|
10
|
0
|
1544
|
my ($kernel, $sender, $heap, $return_state, @job) = |
293
|
|
|
|
|
|
|
@_[KERNEL, SENDER, HEAP, ARG0..$#_]; |
294
|
|
|
|
|
|
|
|
295
|
10
|
50
|
|
|
|
28
|
if ($heap->{stopped}) { |
296
|
0
|
|
|
|
|
0
|
DEBUG and warn( |
297
|
|
|
|
|
|
|
"JQ: $heap->{alias} can not enqueue new jobs after 'stop'\n" |
298
|
|
|
|
|
|
|
); |
299
|
|
|
|
|
|
|
|
300
|
0
|
|
|
|
|
0
|
return; |
301
|
|
|
|
|
|
|
} |
302
|
|
|
|
|
|
|
|
303
|
10
|
|
|
|
|
10
|
DEBUG and warn "JQ: job queue $heap->{alias} enqueuing a new job"; |
304
|
|
|
|
|
|
|
|
305
|
10
|
|
|
|
|
10
|
my $postback; |
306
|
10
|
50
|
|
|
|
21
|
if (defined $return_state) { |
307
|
10
|
|
|
|
|
34
|
$postback = $sender->postback( $return_state, @job ); |
308
|
|
|
|
|
|
|
} |
309
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
# Add the job to the queue. Use the prioritizer to find the right |
311
|
|
|
|
|
|
|
# place to put it. |
312
|
|
|
|
|
|
|
|
313
|
10
|
|
|
|
|
562
|
my $queue_index = @{ $heap->{job_queue} }; |
|
10
|
|
|
|
|
19
|
|
314
|
10
|
|
|
|
|
25
|
while ($queue_index--) { |
315
|
|
|
|
|
|
|
last if |
316
|
9
|
50
|
|
|
|
26
|
$heap->{prioritizer}->( $heap->{job_queue}->[$queue_index], |
317
|
|
|
|
|
|
|
\@job, |
318
|
|
|
|
|
|
|
) >= 0; |
319
|
|
|
|
|
|
|
} |
320
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
# Place the new job after the index we found. |
322
|
10
|
|
|
|
|
13
|
splice( @{$heap->{job_queue}}, $queue_index+1, 0, [ $postback, @job ] ); |
|
10
|
|
|
|
|
31
|
|
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
# Dequeue a new event. |
325
|
10
|
|
|
|
|
30
|
$kernel->yield( 'dequeue' ); |
326
|
|
|
|
|
|
|
} |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
1; |
329
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
__END__ |