File Coverage

blib/lib/cPanel/TaskQueue.pm
Criterion Covered Total %
statement 357 411 86.8
branch 117 164 71.3
condition 21 40 52.5
subroutine 66 70 94.2
pod 32 32 100.0
total 593 717 82.7


line stmt bran cond sub pod time code
1             package cPanel::TaskQueue;
2             {
3             $cPanel::TaskQueue::VERSION = '0.606';
4             }
5              
6             # cpanel - cPanel/TaskQueue.pm Copyright(c) 2014 cPanel, Inc.
7             # All rights Reserved.
8             # copyright@cpanel.net http://cpanel.net
9             #
10             # Redistribution and use in source and binary forms, with or without
11             # modification, are permitted provided that the following conditions are met:
12             # * Redistributions of source code must retain the above copyright
13             # notice, this list of conditions and the following disclaimer.
14             # * Redistributions in binary form must reproduce the above copyright
15             # notice, this list of conditions and the following disclaimer in the
16             # documentation and/or other materials provided with the distribution.
17             # * Neither the name of the owner nor the names of its contributors may
18             # be used to endorse or promote products derived from this software
19             # without specific prior written permission.
20             #
21             # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22             # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23             # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24             # DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
25             # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26             # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27             # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28             # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29             # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30             # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31              
32             # This module handles queuing of tasks for execution. The queue is persistent
33             # and handles consolidation of duplicate tasks.
34              
35             # ABSTRACT: Manage a FIFO queue of tasks to perform.
36              
37 34     34   792831 use strict;
  34         166  
  34         1278  
38              
39             #use warnings;
40 34     34   32467 use cPanel::TaskQueue::Task();
  34         100  
  34         914  
41 34     34   24153 use cPanel::TaskQueue::Processor();
  34         93  
  34         687  
42 34     34   36813 use cPanel::StateFile ();
  34         123  
  34         263075  
43              
44             my $WNOHANG;
45             if ( !exists $INC{'POSIX.pm'} ) {
46              
47             # If POSIX is not already loaded, try for CPanel's tiny module.
48             ## no critic (ProhibitStringyEval)
49 34     34   33203 eval 'local $SIG{__DIE__} = "DEFAULT";
  0         0  
  0         0  
50             use cPanel::POSIX::Tiny 0.8; #issafe
51             $WNOHANG = &cPanel::POSIX::Tiny::WNOHANG;';
52             }
53             if ( !$WNOHANG ) {
54             ## no critic (ProhibitStringyEval)
55 34     34   39658 eval 'use POSIX ();
  34         400572  
  34         1117  
56             $WNOHANG = &POSIX::WNOHANG;';
57             }
58              
59             # -----------------------------------------------------------------------------
60             # Policy code: The following is a little weird because its intent is to
61             # change the policy by which some code is executed, without adding a gratuitous
62             # object and polymorphism into the mix.
63              
64             my $are_policies_set = 0;
65             my $the_serializer;
66              
67             #
68             # This method allows changing the policies for logging and locking.
69             sub import {
70 24     24   1667 my $class = shift;
71 24 50       1518 die 'Not an even number of arguments to the ' . __PACKAGE__ . " module\n" if @_ % 2;
72 24 100       138 die "Policies already set elsewhere\n" if $are_policies_set;
73 22 100       10496 return 1 unless @_; # Don't set the policies flag.
74              
75 3         10 while (@_) {
76 4         15 my ( $policy, $module ) = splice( @_, 0, 2 );
77 4         8 my @methods = ();
78 4         5 my @sf_policies;
79 4 100       15 if ( '-logger' eq $policy ) {
    50          
80 3         23 cPanel::StateFile->import( '-logger' => $module );
81             }
82             elsif ( '-serializer' eq $policy ) {
83 1         3 _load_serializer_module($module);
84 1         6 $the_serializer = $module;
85             }
86             else {
87 0         0 die "Unrecognized policy '$policy'\n";
88             }
89             }
90 3         7 $are_policies_set = 1;
91 3         3643 return 1;
92             }
93              
94             sub _load_serializer_module {
95 1     1   3 my ($module) = @_;
96 1 50       4 die "Supplied serializer must be a module name.\n" if ref $module;
97 1 50       8 die "'$module' does not look like a serializer" unless $module =~ m{^\w+(?:::\w+)*$};
98 1         59 eval "use $module;"; ## no critic (ProhibitStringyEval)
99 1 50       6 die $@ if $@;
100 1 50       6 die "Supplied serializer module '$module' does not support the correct interface."
101             unless _valid_serializer($module);
102 1         3 return;
103             }
104              
105             sub _valid_serializer {
106 1     1   3 my ($serializer) = @_;
107 1         3 foreach my $method (qw/load save filename/) {
108 3 50       5 return unless eval { $serializer->can($method) };
  3         25  
109             }
110 1         9 return 1;
111             }
112              
113             sub _get_serializer {
114 30 100   30   174 unless ( defined $the_serializer ) {
115 17     18   1557 eval 'use cPanel::TQSerializer::Storable;'; ## no critic (ProhibitStringyEval)
  18         14324  
  18         53  
  18         375  
116 17 50       102 cPanel::StateFile->_throw(@_) if $@;
117 17         65 $the_serializer = 'cPanel::TQSerializer::Storable';
118             }
119 30         207 return $the_serializer;
120             }
121              
122             # Replacement for List::Util::first, so I don't need to bring in the whole module.
123             sub _first (&@) { ## no critic(ProhibitSubroutinePrototypes)
124 155     155   234 my $pred = shift;
125 155         214 local $_;
126 155         358 foreach (@_) {
127 289 100       708 return $_ if $pred->();
128             }
129 128         454 return;
130             }
131              
132             # Namespace string used when creating task ids.
133             my $taskqueue_uuid = 'TaskQueue';
134              
135             {
136              
137             # Class-wide definition of the valid processors
138             my %valid_processors;
139             my $FILETYPE = 'TaskQueue'; # Identifier at the beginning of the state file
140             my $CACHE_VERSION = 3; # Cache file version number.
141              
142             # State File
143             #
144 6     6 1 3864 sub get_name { return $_[0]->{queue_name}; }
145 3     3 1 931 sub get_default_timeout { return $_[0]->{default_task_timeout}; }
146 3     3 1 15 sub get_max_timeout { return $_[0]->{max_task_timeout}; }
147 3     3 1 16 sub get_max_running { return $_[0]->{max_in_process}; }
148 3     3 1 17 sub get_default_child_timeout { return $_[0]->{default_child_timeout}; }
149              
150             # Processing pausing
151             sub pause_processing {
152 1     1 1 2 my ($self) = @_;
153 1         6 my $guard = $self->{disk_state}->synch();
154 1         4 $self->{paused} = 1;
155 1         8 $guard->update_file();
156 1         4 return;
157             }
158              
159             sub resume_processing {
160 1     1 1 3 my ($self) = @_;
161 1         5 my $guard = $self->{disk_state}->synch();
162 1         3 $self->{paused} = 0;
163 1         6 $guard->update_file();
164 1         4 return;
165             }
166 3   100 3 1 33 sub is_paused { return $_[0]->{paused} || 0; }
167              
168             # --------------------------------------
169             # Class methods
170              
171             sub register_task_processor {
172 67     67 1 4006 my ( $class, $command, $processor ) = @_;
173              
174 67 100 100     664 unless ( defined $command and length $command ) {
175 2         15 cPanel::StateFile->_throw("Missing command in register_task_processor.\n");
176             }
177 65 100       221 unless ( defined $processor ) {
178 1         6 cPanel::StateFile->_throw("Missing task processor in register_task_processor.\n");
179             }
180 64 100       232 if ( exists $valid_processors{$command} ) {
181 1         8 cPanel::StateFile->_throw("Command '$command' already has a TaskQueue::Processor registered.\n");
182             }
183 63 100       275 if ( 'CODE' eq ref $processor ) {
    100          
184 50         10699 $valid_processors{$command} = cPanel::TaskQueue::Processor::CodeRef->new( { code => $processor } );
185 50         212 return 1;
186             }
187 13         139 elsif ( eval { $processor->isa('cPanel::TaskQueue::Processor') } ) {
188 12         41 $valid_processors{$command} = $processor;
189 12         43 return 1;
190             }
191              
192 1         7 cPanel::StateFile->_throw("Unrecognized task processor object.\n");
193             }
194              
195             sub unregister_task_processor {
196 5     5 1 1702 my ( $class, $command ) = @_;
197              
198 5 100 100     38 unless ( defined $command and length $command ) {
199 2         11 cPanel::StateFile->_throw("Missing command in unregister_task_processor.\n");
200             }
201 3 100       15 unless ( exists $valid_processors{$command} ) {
202 1         8 cPanel::StateFile->_throw("Command '$command' not registered, ignoring.\n");
203 0         0 return;
204             }
205              
206 2         21 delete $valid_processors{$command};
207 2         11 return 1;
208             }
209              
210             # Initialize parameters.
211             sub new {
212 33     33 1 29299 my ( $class, $args_ref ) = @_;
213 33 100       424 cPanel::StateFile->_throw("Args parameter must be a hash reference\n") unless 'HASH' eq ref $args_ref;
214              
215             # Deprecate the cache_dir argument, replace with state_dir
216 32 50 0     149 $args_ref->{state_dir} ||= $args_ref->{cache_dir} if exists $args_ref->{cache_dir};
217 32 100       150 cPanel::StateFile->_throw("No state directory supplied.\n") unless exists $args_ref->{state_dir};
218 31 100       142 cPanel::StateFile->_throw("No queue name supplied.\n") unless exists $args_ref->{name};
219              
220 30         56 my $serializer;
221 30 50       128 if ( defined $args_ref->{serial} ) {
222 0         0 _load_serializer_module( $args_ref->{serial} );
223 0         0 $serializer = $args_ref->{serial};
224             }
225 30   33     319 $serializer ||= _get_serializer();
226              
227             # TODO: Do I want to sanity check the arguments?
228 30         342 my $self = bless {
229             queue_name => $args_ref->{name},
230             default_task_timeout => 60,
231             max_task_timeout => 300,
232             max_in_process => 2,
233             default_child_timeout => 3600,
234             disk_state_file => $serializer->filename("$args_ref->{state_dir}/$args_ref->{name}_queue"),
235             next_id => 1,
236             queue_waiting => [],
237             processing_list => [],
238             deferral_queue => [],
239             disk_state => undef,
240             defer_obj => undef,
241             paused => 0,
242             serializer => $serializer,
243             }, $class;
244              
245             # Make a disk file to track the object.
246 30 50       438 my $state_args = {
    50          
247             state_file => $self->{disk_state_file}, data_obj => $self,
248             exists $args_ref->{state_timeout} ? ( timeout => $args_ref->{state_timeout} ) : (),
249             exists $args_ref->{logger} ? ( logger => $args_ref->{logger} ) : (),
250             };
251             eval {
252 30         291 $self->{disk_state} = cPanel::StateFile->new($state_args);
253 26         179 1;
254 30 100       77 } or do {
255 4   50     15 my $ex = $@ || 'Unreocognized exception.';
256              
257             # If not a loading error, rethrow.
258 4 50       46 cPanel::StateFile->_throw($ex) unless $ex =~ /Not a recognized|Invalid version/;
259 4         29 cPanel::StateFile->_warn($ex);
260 4         47 cPanel::StateFile->_warn("Moving bad state file and retry.\n");
261 4         59 cPanel::StateFile->_notify(
262             'Unable to load TaskQueue metadata',
263             "Loading of [$self->{disk_state_file}] failed: $ex\n" . "Moving bad file to [$self->{disk_state_file}.broken] and retrying.\n"
264             );
265 4         162 unlink "$self->{disk_state_file}.broken";
266 4         240 rename $self->{disk_state_file}, "$self->{disk_state_file}.broken";
267              
268 4         18 $self->{disk_state} = cPanel::StateFile->new($state_args);
269             };
270              
271             # Use incoming parameters to override what's in the file.
272 30 100       84 if ( grep { exists $args_ref->{$_} } qw/default_timeout max_timeout max_running default_child_timeout/ ) {
  120         322  
273 2         9 my $guard = $self->{disk_state}->synch();
274 2         2 my $altered;
275 2         13 for my $settings (
276             [qw(default_task_timeout default_timeout)],
277             [qw(max_task_timeout max_timeout)],
278             [qw(max_in_process max_running)],
279             [qw(default_child_timeout default_child_timeout)],
280             ) {
281 8         10 my ( $internal_name, $arg_name ) = @$settings;
282 8 50 33     45 if ( exists $args_ref->{$arg_name} && $self->{$internal_name} ne $args_ref->{$arg_name} ) {
283 8         15 $self->{$internal_name} = $args_ref->{$arg_name};
284 8         15 ++$altered;
285             }
286             }
287 2 50       14 $guard->update_file() if $altered;
288             }
289              
290 30         145 return $self;
291             }
292              
293             sub throw {
294 14     14 1 31 my $self = shift;
295 14 100       123 return $self->{disk_state} ? $self->{disk_state}->throw(@_) : cPanel::StateFile->_throw(@_);
296             }
297              
298             sub warn {
299 1     1 1 2 my $self = shift;
300 1 50       9 return $self->{disk_state} ? $self->{disk_state}->warn(@_) : warn @_;
301             }
302              
303             sub info {
304 1     1 1 3 my $self = shift;
305 1 50       9 return $self->{disk_state} ? $self->{disk_state}->info(@_) : undef;
306             }
307              
308             # -------------------------------------------------------
309             # Pseudo-private methods. Should not be called except under unusual circumstances.
310             sub _serializer {
311 181     181   248 my ($self) = @_;
312 181         2249 return $self->{serializer};
313             }
314              
315             sub _state_file {
316 1     1   3 my ($self) = @_;
317 1         6 return $self->{disk_state_file};
318             }
319              
320             # -------------------------------------------------------
321             # Public methods
322             sub load_from_cache {
323 9     9 1 19 my ( $self, $fh ) = @_;
324              
325 9         36 local $/;
326 9         33 my ( $magic, $version, $meta ) = $self->_serializer()->load($fh);
327              
328 9 100 100     232 $self->throw('Not a recognized TaskQueue state file.') unless defined $magic and $magic eq $FILETYPE;
329 7 100 66     53 $self->throw('Invalid version of TaskQueue state file.') unless defined $version and $version eq $CACHE_VERSION;
330              
331             # Next id should continue increasing.
332             # (We might want to deal with wrap-around at some point.)
333 5 100       22 $self->{next_id} = $meta->{nextid} if $meta->{nextid} > $self->{next_id};
334              
335             # TODO: Add more sanity checks here.
336 5 50       23 $self->{default_task_timeout} = $meta->{def_task_to} if $meta->{def_task_to} > 0;
337 5 50       35 $self->{max_task_timeout} = $meta->{max_task_to} if $meta->{max_task_to} > 0;
338 5 50       20 $self->{max_in_process} = $meta->{max_running} if $meta->{max_running} > 0;
339 5 50       20 $self->{default_child_timeout} = $meta->{def_child_to} if $meta->{def_child_to} > 0;
340 5 50 33     33 $self->{paused} = 1 if exists $meta->{paused} && $meta->{paused};
341 5 50       21 $self->{defer_obj} = exists $meta->{defer_obj} ? $meta->{defer_obj} : undef;
342              
343             # Clean queues that have been read from disk.
344 5         17 $self->{queue_waiting} = _clean_task_list( $meta->{waiting_queue} );
345 5         25 $self->{processing_list} = _clean_task_list( $meta->{processing_queue} );
346 5         12 $self->{deferral_queue} = _clean_task_list( $meta->{deferral_queue} );
347              
348 5         30 return 1;
349             }
350              
351             sub _clean_task_list {
352 15     15   21 my ($task_list) = @_;
353 15 50       33 return [] unless defined $task_list;
354             return [
355             grep {
356             defined $_
357 3 50       11 and eval { $_->isa('cPanel::TaskQueue::Task') }
  3         24  
358             } map {
359 3         5 eval { cPanel::TaskQueue::Task->reconstitute($_) }
  3         23  
  15         41  
360 15         18 } @{$task_list}
361             ];
362             }
363              
364             sub save_to_cache {
365 172     172 1 461 my ( $self, $fh ) = @_;
366              
367 172 100       2257 my $meta = {
368             nextid => $self->{next_id},
369             def_task_to => $self->{default_task_timeout},
370             max_task_to => $self->{max_task_timeout},
371             max_running => $self->{max_in_process},
372             def_child_to => $self->{default_child_timeout},
373             waiting_queue => $self->{queue_waiting},
374             processing_queue => $self->{processing_list},
375             deferral_queue => $self->{deferral_queue},
376             paused => ( $self->{paused} ? 1 : 0 ),
377             defer_obj => $self->{defer_obj},
378             };
379 172         694 return $self->_serializer()->save( $fh, $FILETYPE, $CACHE_VERSION, $meta );
380             }
381              
382             sub queue_task {
383 99     99 1 5364 my ( $self, $command ) = @_;
384              
385 99 50       282 $self->throw('Cannot queue an empty command.') unless defined $command;
386              
387 99 100       148 if ( eval { $command->isa('cPanel::TaskQueue::Task') } ) {
  99         1069  
388 1 50       5 if ( 0 == $command->retries_remaining() ) {
389 1         5 $self->info('Task with 0 retries not queued.');
390 1         11 return;
391             }
392 0         0 my $task = $command->mutate( { timeout => $self->{default_child_timeout} } );
393 0         0 return $self->_queue_the_task($task);
394             }
395              
396             # must have non-space characters to be a command.
397 98 100       512 $self->throw('Cannot queue an empty command.') unless $command =~ /\S/;
398              
399 96         1272 my $task = cPanel::TaskQueue::Task->new(
400             {
401             cmd => $command,
402             nsid => $taskqueue_uuid,
403             id => $self->{next_id}++,
404             timeout => $self->{default_child_timeout},
405             }
406             );
407 96         513 return $self->_queue_the_task($task);
408             }
409              
410             sub unqueue_task {
411 15     15 1 1236 my ( $self, $uuid ) = @_;
412              
413 15 100       45 unless ( _is_valid_uuid($uuid) ) {
414 2         8 $self->throw('No Task uuid argument passed to unqueue_cmd.');
415             }
416              
417             # Lock the queue before we begin accessing it.
418 13         57 my $guard = $self->{disk_state}->synch();
419 13         24 my $old_count = @{ $self->{queue_waiting} };
  13         32  
420              
421 13         21 $self->{queue_waiting} = [ grep { $_->uuid() ne $uuid } @{ $self->{queue_waiting} } ];
  35         96  
  13         28  
422              
423             # All changes complete, save to disk.
424 13         126 $guard->update_file();
425 13         22 return $old_count > @{ $self->{queue_waiting} };
  13         67  
426             }
427              
428             sub _is_task_in_list {
429 20     20   55 my ( $self, $uuid, $list, $subname ) = @_;
430              
431 20 100       61 unless ( _is_valid_uuid($uuid) ) {
432 4         15 $self->throw("No Task uuid argument passed to $subname.");
433             }
434              
435             # Update from disk, but don't worry about lock. Information only.
436 16         199 $self->{disk_state}->synch();
437 16     21   88 return defined _first { $_->uuid() eq $uuid } @{ $self->{$list} };
  21         71  
  16         66  
438             }
439              
440 17     17 1 5161 sub is_task_queued { return $_[0]->_is_task_in_list( $_[1], 'queue_waiting', 'is_task_queued' ); }
441 3     3 1 1106 sub is_task_processing { return $_[0]->_is_task_in_list( $_[1], 'processing_list', 'is_task_processing' ); }
442 0     0 1 0 sub is_task_deferred { return $_[0]->_is_task_in_list( $_[1], 'deferral_queue', 'is_task_deferred' ); }
443              
444             sub _list_of_all_tasks {
445 16     16   25 my ($self) = @_;
446 16         20 return @{ $self->{queue_waiting} }, @{ $self->{deferral_queue} }, @{ $self->{processing_list} };
  16         35  
  16         29  
  16         58  
447             }
448              
449             sub find_task {
450 5     5 1 16 my ( $self, $uuid ) = @_;
451              
452             # Update from disk, but don't worry about lock. Information only.
453 5         20 $self->{disk_state}->synch();
454 5     14   25 my $task = _first { $_->uuid() eq $uuid } $self->_list_of_all_tasks();
  14         51  
455              
456 5 100       29 return unless defined $task;
457 4         15 return $task->clone();
458             }
459              
460             sub find_command {
461 7     7 1 3048 my ( $self, $command ) = @_;
462              
463             # Update from disk, but don't worry about lock. Information only.
464 7         28 $self->{disk_state}->synch();
465 7     24   42 my $task = _first { $_->command() eq $command } $self->_list_of_all_tasks();
  24         62  
466              
467 7 100       42 return unless defined $task;
468 5         19 return $task->clone();
469             }
470              
471             sub find_commands {
472 4     4 1 9 my ( $self, $command ) = @_;
473              
474             # Update from disk, but don't worry about lock. Information only.
475 4         15 $self->{disk_state}->synch();
476 4         16 my @tasks = grep { $_->command() eq $command } $self->_list_of_all_tasks();
  24         57  
477              
478 4 100       19 return unless @tasks;
479 3         33 return map { $_->clone() } @tasks;
  4         13  
480             }
481              
482             sub _how_many {
483 118     118   436 my ( $self, $listname ) = @_;
484              
485             # Update from disk, but don't worry about lock. Information only.
486 118         471 $self->{disk_state}->synch();
487 118         186 return scalar @{ $self->{$listname} };
  118         1461  
488             }
489              
490 48     48 1 812 sub how_many_queued { return $_[0]->_how_many('queue_waiting'); }
491 35     35 1 148 sub how_many_deferred { return $_[0]->_how_many('deferral_queue'); }
492 35     35 1 529 sub how_many_in_process { return $_[0]->_how_many('processing_list'); }
493              
494             sub has_work_to_do {
495 7     7 1 44 my ($self) = @_;
496              
497             # Update from disk, but don't worry about lock. Possibly information only.
498 7         56 $self->{disk_state}->synch();
499 7         105 $self->_clean_completed_tasks();
500              
501             # If we are paused, there is no work to do.
502 7 100       77 return if $self->{paused};
503              
504 6   66     57 return scalar( @{ $self->{processing_list} } ) < $self->{max_in_process} && 0 != @{ $self->{queue_waiting} };
505             }
506              
507             sub peek_next_task {
508 14     14 1 645 my ($self) = @_;
509              
510             # Update from disk, but don't worry about lock. Information only.
511 14         56 $self->{disk_state}->synch();
512 14 100       19 return unless @{ $self->{queue_waiting} };
  14         60  
513              
514 13         79 return $self->{queue_waiting}->[0]->clone();
515             }
516              
517             sub process_next_task {
518 33     33 1 151 my ($self) = @_;
519              
520             # Lock the queue before doing any manipulations.
521 33         176 my $guard = $self->{disk_state}->synch();
522              
523 33         694 $self->_handle_already_running_tasks($guard);
524              
525 33 50   86   314 if ( _first { !defined $_ } @{ $self->{queue_waiting} } ) {
  86         301  
  33         150  
526              
527             # Somehow some undefined tasks got into the queue, log and
528             # delete them.
529 0         0 $self->warn('Undefined tasks found in the queue, removing...');
530 0         0 $self->{queue_waiting} = [ grep { defined $_ } @{ $self->{queue_waiting} } ];
  0         0  
  0         0  
531              
532             # Since we've changed the wait queue, we need to update disk file,
533             # otherwise changes could be lost if we return early, below.
534 0         0 $guard->update_file();
535             }
536              
537             # If we are paused, there is no work to do.
538 33 100       173 return 1 if $self->{paused};
539              
540 32         355 my ( $task, $processor );
541 32         90 while ( !$task ) {
542              
543             # We can now schedule new tasks
544 45 100       56 return 1 unless @{ $self->{queue_waiting} };
  45         172  
545 36         53 $task = shift @{ $self->{queue_waiting} };
  36         161  
546              
547             # can fail if the processor for this command was removed.
548 36         84 $processor = _get_task_processor($task);
549 36 100       116 unless ($processor) {
550              
551             # TODO: log missing processor.
552 1         6 $self->warn( q{No processor found for '} . $task->full_command() . q{'.} );
553 1         16 $guard->update_file();
554 1         26 return 1;
555             }
556              
557             # Check for deferrals.
558 35 100       443 if ( $processor->is_task_deferred( $task, $self->{defer_obj} ) ) {
559 13         16 unshift @{ $self->{deferral_queue} }, $task;
  13         31  
560 13         42 $task = undef;
561             }
562             }
563              
564 22         94 $task->begin();
565 22         29 push @{ $self->{processing_list} }, $task;
  22         54  
566 22         85 $self->_add_task_to_deferral_object( $task, $processor );
567              
568             # Finished making changes, save to disk.
569 22         83 $guard->update_file();
570              
571             # I don't want to stay locked while processing.
572 22         42 my $pid;
573             my $ex;
574             $guard->call_unlocked(
575             sub {
576 22     22   44 my $orig_alarm;
577             eval {
578 22         580 local $SIG{'ALRM'} = sub { die "time out reached\n"; };
  0         0  
579 22         163 $orig_alarm = alarm( $self->_timeout($processor) );
580 22         204 $pid = $processor->process_task( $task->clone(), $self->{disk_state}->get_logger() );
581 17         2157 alarm $orig_alarm;
582 17         1761 1;
583 22 50       34 } or do {
584 0         0 $ex = $@; # save exception for later
585 0         0 alarm $orig_alarm;
586             };
587             }
588 22         196 );
589              
590             # Deal with a child process or remove from processing.
591 17 100       424 if ($pid) {
592 15         365 $task->set_pid($pid);
593             }
594             else {
595 2         11 my $uuid = $task->uuid();
596              
597             # remove finished item from the list.
598 2         4 $self->{processing_list} = [ grep { $_->uuid() ne $uuid } @{ $self->{processing_list} } ];
  2         7  
  2         5  
599 2         9 $self->_remove_task_from_deferral_object();
600             }
601              
602             # Don't lose any exceptions.
603 17 50       88 if ($ex) {
604 0 0       0 if ( $ex eq "time out reached\n" ) {
605              
606             # TODO: log timeout condition.
607 0         0 $self->warn( q{Task '} . $task->full_command() . q{' timed out during processing.} );
608             }
609             else {
610 0         0 $self->throw($ex);
611             }
612             }
613              
614             # Finished making changes, save to disk.
615 17         158 $guard->update_file();
616 17         299 return $pid == 0;
617             }
618              
619             sub finish_all_processing {
620 0     0 1 0 my ($self) = @_;
621              
622             # Lock the queue for manipulation and to reduce new task items.
623 0         0 my $guard = $self->{disk_state}->synch();
624 0         0 while ( @{ $self->{processing_list} } ) {
  0         0  
625              
626             # we still need to remove some
627 0         0 my $pid;
628              
629             # TODO: Might want to deal with timeouts or something to keep this
630             # from waiting forever.
631 0     0   0 $guard->call_unlocked( sub { $pid = waitpid( -1, 0 ) } );
  0         0  
632              
633 0 0       0 next unless $pid;
634 0         0 $self->{processing_list} = [
635 0 0       0 grep { 0 == waitpid( $_->pid(), $WNOHANG ) }
636 0         0 grep { $_->pid() && $_->pid() != $pid } @{ $self->{processing_list} }
  0         0  
637             ];
638 0         0 $self->_process_deferrals();
639 0         0 $guard->update_file();
640             }
641 0         0 return;
642             }
643              
644             sub snapshot_task_lists {
645 8     8 1 27 my ($self) = @_;
646              
647             # Update from disk, but don't worry about lock. Information only.
648 8         71 $self->{disk_state}->synch();
649              
650             return {
651 2         9 waiting => [ map { $_->clone() } @{ $self->{queue_waiting} } ],
  8         31  
  8         68  
652 8         59 processing => [ map { $_->clone() } @{ $self->{processing_list} } ],
  20         78  
653 8         20 deferred => [ map { $_->clone() } @{ $self->{deferral_queue} } ],
  8         31  
654             };
655             }
656              
657             # ---------------------------------------------------------------
658             # Private Methods.
659              
660             sub _get_task_processor {
661 396     396   518 my ($task) = @_;
662 396         1219 return $valid_processors{ $task->command() };
663             }
664              
665             # Test whether the supplied task descriptor duplicates any in the queue.
                # Only queue_waiting is examined. Entries are visited from the last one
                # to the first, and each is handed to the processor's
                # is_dupe( $task, $queued ). Returns true when _first yields a defined
                # value, false otherwise.
                # NOTE(review): _first is defined outside this section; presumably it
                # returns the first element for which the block is true - confirm.
666             sub _is_duplicate_command {
667 94     94   132 my ( $self, $task ) = @_;
668 94         171 my $proc = _get_task_processor($task);
669              
670 94     144   780 return defined _first { $proc->is_dupe( $task, $_ ) } reverse @{ $self->{queue_waiting} };
  144         549  
  94         333  
671             }
672              
673             sub _process_overrides {
                # Drop from queue_waiting every entry for which the new task's
                # processor reports overrides( $task, $queued ) as true. The list is
                # replaced in memory only; nothing is written to disk here.
                # Returns nothing.
674 94     94   155 my ( $self, $task ) = @_;
675 94         203 my $proc = _get_task_processor($task);
676              
677 94         184 $self->{queue_waiting} = [ grep { !$proc->overrides( $task, $_ ) } @{ $self->{queue_waiting} } ];
  163         953  
  94         284  
678              
679 94         663 return;
680             }
681              
682             # Perform all of the steps needed to put a task in the queue.
683             # Only queues legal commands, that are not duplicates.
684             # If successful, returns the new queue id.
                #
                # The id returned is $task->uuid(). A task that duplicates a waiting
                # entry is not queued and the sub returns nothing (bare return).
                # An unknown command or invalid arguments are reported through
                # $self->throw() with a descriptive message.
685             sub _queue_the_task {
686 96     96   644 my ( $self, $task ) = @_;
687              
688             # Validate the incoming task.
689             # It must be a command we recognize, have valid parameters, and not be a duplicate.
690 96         251 my $proc = _get_task_processor($task);
691 96 100       438 unless ($proc) {
692 1         4 $self->throw( q{No known processor for '} . $task->command() . q{'.} );
693             }
694 95 100       515 unless ( $proc->is_valid_args($task) ) {
695 1         6 $self->throw( q{Requested command [} . $task->full_command() . q{] has invalid arguments.} );
696             }
697              
698             # Lock the queue here, because we begin looking what's currently in the queue
699             # and don't want it to change under us.
700 94         361 my $guard = $self->{disk_state}->synch();
701              
702             # Check overrides first and then duplicate. This seems backward, but
703             # actually is not. See the tests labelled 'override, not dupe' in
704             # t/07.task_queue_dupes_and_overrides.t for the case that makes sense.
705             #
706             # By making the task override its duplicates as well, we can get the
707             # behavior you expect when you think this is wrong. If we swap the order of
708             # the tests there's no way to force the right behavior.
709 94         323 $self->_process_overrides($task);
                # NOTE(review): on the duplicate path below the sub returns before
                # update_file(), so entries removed by _process_overrides are not
                # written out by this call - confirm that is intended.
710 94 100       343 return if $self->_is_duplicate_command($task);
711              
712 88         300 push @{ $self->{queue_waiting} }, $task;
  88         252  
713              
714             # Changes to the queue are complete, save to disk.
715 88         302 $guard->update_file();
716              
717 88         400 return $task->uuid();
718             }
719              
720             # Use either the timeout in the processor or the default timeout,
721             # unless that is greater than the max, then use the max.
722             sub _timeout {
723 22     22   53 my ( $self, $processor ) = @_;
724              
                # '||' rather than '//': any false get_timeout() value, including 0,
                # selects the queue's default_task_timeout.
725 22   33     204 my $timeout = $processor->get_timeout() || $self->{default_task_timeout};
726              
                # Clamp the chosen value to max_task_timeout.
727 22 50       169 return $timeout > $self->{max_task_timeout} ? $self->{max_task_timeout} : $timeout;
728             }
729              
730             # Clean the processing list of any child tasks that completed since the
731             # last time we looked. The $guard object is an optional parameter. If
732             # a guard does not exist, we will create one if necessary for any locking.
                # Returns nothing. update_file() is called only when the processing list
                # got shorter.
733             sub _clean_completed_tasks {
734 73     73   111 my ( $self, $guard ) = @_;
735              
736 73         469 my $num_processing = @{ $self->{processing_list} };
  73         295  
                # Nothing is being processed, so there is nothing to clean.
737 73 100       219 return unless $num_processing;
738              
739             # Remove tasks that have already completed from the in-memory list.
740 36         111 $self->_remove_completed_tasks_from_list();
                # Same length as before: no task was removed, so return without
                # touching the guard or the file.
741 36 50       36 return if @{ $self->{processing_list} } == $num_processing;
  36         120  
742              
743             # Was not locked, so we need to lock and remove completed tasks again.
                # When the caller supplied a guard, the removal pass above is kept as is.
744 0 0       0 if ( !$guard ) {
745 0         0 $guard = $self->{disk_state}->synch();
746 0         0 $self->_remove_completed_tasks_from_list();
747             }
748 0         0 $guard->update_file();
749 0         0 return;
750             }
751              
752             # Remove child tasks that have completed executing from the processing
753             # list in memory.
                # A task stays in the list only if it has a true pid() and waitpid() on
                # that pid with $WNOHANG returns 0. The deferral state is then
                # recomputed with _process_deferrals(). Returns nothing.
                # NOTE(review): $WNOHANG is defined outside this section; presumably it
                # holds the POSIX WNOHANG flag, making the waitpid() call non-blocking
                # with 0 meaning "still running" - confirm.
754             sub _remove_completed_tasks_from_list {
755 36     36   64 my ($self) = @_;
756 36 50       55 $self->{processing_list} = [ grep { $_->pid() && 0 == waitpid( $_->pid(), $WNOHANG ) } @{ $self->{processing_list} } ];
  40         192  
  36         200  
757 36         194 $self->_process_deferrals();
758 36         49 return;
759             }
760              
761             sub _add_task_to_deferral_object {
                # Mark every tag returned by the processor's deferral_tags($task) in
                # $self->{defer_obj} by setting its value to 1.
                # $processor is optional; when it is false the processor is looked up
                # from the task. Does nothing when $task is false. Returns nothing.
762 22     22   47 my ( $self, $task, $processor ) = @_;
763 22 50       58 return unless $task;
764              
765 22   33     57 $processor ||= _get_task_processor($task);
766 22         91 $self->{defer_obj}->{$_} = 1 foreach $processor->deferral_tags($task);
767 22         35 return;
768             }
769              
770             sub _remove_task_from_deferral_object {
                # Delete from $self->{defer_obj} every tag returned by the processor's
                # deferral_tags($task).
                # $processor is optional; when it is false the processor is looked up
                # from the task. Does nothing when $task is false. Returns nothing.
771 2     2   6 my ( $self, $task, $processor ) = @_;
772 2 50       8 return unless $task;
773              
774 0   0     0 $processor ||= _get_task_processor($task);
775 0         0 delete $self->{defer_obj}->{$_} foreach $processor->deferral_tags($task);
776 0         0 return;
777             }
778              
779             # Clean up the object that tracks deferral information.
780             # Check all tasks in the deferral queue and add them back to the waiting
781             # list if they are no longer deferred.
                # Works on the in-memory lists only and returns nothing.
782             sub _process_deferrals {
783 36     36   52 my ($self) = @_;
784              
785             # clean up the current deferral object for the tasks being processed.
                # defer_obj is rebuilt from scratch: one key (value 1) per deferral tag
                # of each task currently in processing_list.
786 36         76 $self->{defer_obj} = {};
787 36         132 foreach my $task ( @{ $self->{processing_list} } ) {
  36         467  
788 40         214 $self->{defer_obj}->{$_} = 1 foreach _get_task_processor($task)->deferral_tags($task);
789             }
790              
791             # Separate deferred tasks from non-deferred tasks.
792 36         56 my @defer;
793             my @proc;
794 36         54 foreach my $task ( @{ $self->{deferral_queue} } ) {
  36         88  
795 36 50       72 if ( _get_task_processor($task)->is_task_deferred( $task, $self->{defer_obj} ) ) {
796 36         108 push @defer, $task;
797             }
798             else {
799              
800             # collect 'no longer deferred' tasks in reverse order; they are put at the front of queue_waiting below
801 0         0 unshift @proc, $task;
802             }
803             }
804              
805             # update queues
                # queue_waiting is only rebuilt when at least one task was released.
806 36 50       96 $self->{queue_waiting} = [ @proc, @{ $self->{queue_waiting} } ] if @proc;
  0         0  
807 36         66 $self->{deferral_queue} = \@defer;
808 36         135 return;
809             }
810              
811             # Handle the case of too many tasks being processed
812             # Are there too many in processing?
813             # check to see if any processes are complete, and remove them.
814             # Are there still too many in processing?
815             # waitpid - blocks.
816             # remove process that completed
817             # remove any other completed processes
                #
                # "Too many" means processing_list holds at least max_in_process
                # entries. Returns nothing.
                # NOTE(review): $guard is used unconditionally inside the loop
                # (call_unlocked, update_file), so a guard appears to be required here
                # even though _clean_completed_tasks treats it as optional - confirm
                # against the callers.
818             sub _handle_already_running_tasks {
819 33     33   73 my ( $self, $guard ) = @_;
820              
821 33         205 $self->_clean_completed_tasks($guard);
822              
823 33         81 while ( $self->{max_in_process} <= scalar @{ $self->{processing_list} } ) {
  33         126  
824              
825             # we still need to remove some
826 0         0 my $pid;
827              
828             # TODO: Might want to deal with timeouts or something to keep this
829             # from waiting forever.
                # The blocking waitpid() for any child runs inside call_unlocked().
830 0     0   0 $guard->call_unlocked( sub { $pid = waitpid( -1, 0 ) } );
  0         0  
831              
                # NOTE(review): waitpid() returns -1 when there is no child to wait
                # for; this 'next' then retries with processing_list unchanged -
                # confirm the loop cannot spin in that situation.
832 0 0       0 next if $pid < 1;
                # Drop the task whose pid was just reaped, then refresh deferrals and
                # save the change.
833 0         0 $self->{processing_list} = [ grep { $_->pid() != $pid } @{ $self->{processing_list} } ];
  0         0  
  0         0  
834 0         0 $self->_process_deferrals();
835 0         0 $guard->update_file();
836             }
837 33         87 $self->_clean_completed_tasks($guard);
838 33         46 return;
839             }
840              
841             sub _is_valid_uuid {
                # Thin wrapper: hands its first argument to
                # cPanel::TaskQueue::Task::is_valid_taskid and returns that result.
842 35     35   197 return cPanel::TaskQueue::Task::is_valid_taskid(shift);
843             }
844             }
845              
846             # One guaranteed processor, the no-operation case.
                # Registered under the command name 'noop' with an empty sub as its code.
847             __PACKAGE__->register_task_processor( 'noop', sub { } );
848              
849             1;
850              
851             __END__