File Coverage

blib/lib/Job/Machine/Worker.pm
Criterion Covered Total %
statement 9 58 15.5
branch 0 2 0.0
condition 0 20 0.0
subroutine 3 19 15.7
pod 11 11 100.0
total 23 110 20.9


line stmt bran cond sub pod time code
1             package Job::Machine::Worker;
2             $Job::Machine::Worker::VERSION = '0.22';
3 1     1   12149 use strict;
  1         1  
  1         41  
4 1     1   5 use warnings;
  1         1  
  1         23  
5              
6 1     1   4 use base 'Job::Machine::Base';
  1         0  
  1         278  
7              
8             sub reply {
9 0     0 1   my ($self,$data,$queue) = @_;
10 0           my $db = $self->db;
11 0   0       $queue ||= $self->{queue};
12 0           $self->result($data,$queue);
13 0           my $task_id = $db->task_id;
14             ## Payload: Status of result, result id...
15 0           $db->notify(queue => $task_id, reply => 1);
16 0           return $task_id;
17             }
18              
19             sub result {
20 0     0 1   my ($self,$data,$queue) = @_;
21 0   0       $queue ||= $self->{queue};
22 0           $self->db->insert_result($data,$queue);
23 0           $self->db->set_task_status(200);
24             }
25              
26             sub error_result {
27 0     0 1   my ($self,$data,$queue) = @_;
28 0   0       $queue ||= $self->{queue};
29 0           $self->db->insert_result($data,$queue);
30             }
31              
32             sub receive {
33 0     0 1   my $self = shift;
34 0           $self->startup;
35 0           my $db = $self->{db};
36 0           $self->subscribe($self->{queue});
37 0           $self->_check_queue($self->{queue});
38 0   0       while ($self->keep_running && (my $notifies = $db->set_listen($self->timeout))) {
39 0           my ($queue,$pid) = @$notifies;
40 0 0 0       $self->_do_chores() && next unless $queue;
41              
42 0           $self->_check_queue($self->{queue});
43             }
44 0           return;
45             };
46              
47             sub _check_queue {
48 0     0     my $self = shift;
49 0           my $db = $self->{db};
50 0           while (my $task = $self->db->fetch_work_task) {
51             ## log process call
52 0           $self->process($task);
53             }
54             }
55              
56             sub _do_chores {
57 0     0     my $self = shift;
58 0           my $db = $self->{db};
59             my @chores = (
60             sub {
61 0     0     my $self = shift;
62 0   0       my $number = $db->revive_tasks($self->max_runtime) || 0;
63 0           $self->log("Revived tasks: $number");
64             },
65             sub {
66 0     0     my $self = shift;
67 0   0       my $number = $db->fail_tasks($self->retries) || 0;
68 0           $self->log("Failed tasks: $number");
69             },
70             sub {
71 0     0     my $self = shift;
72 0   0       my $number = $db->remove_tasks($self->remove_after) || 0;
73 0           $self->log("Removed tasks: $number");
74             },
75 0           );
76 0           my $chore = $chores[int(rand(@chores))];
77 0           $self->$chore;
78             }
79              
80 0     0 1   sub startup {}
81              
82 0     0 1   sub process {die 'Subclasss me!'}
83              
84 0     0 1   sub max_runtime {return 30*60}
85              
86 0     0 1   sub timeout {return 300}
87              
88 0     0 1   sub retries {return 3}
89              
90 0     0 1   sub remove_after {return 30}
91              
92 0     0 1   sub keep_running {return 1}
93              
94             1;
95              
96             __END__