File Coverage

blib/lib/Job/Machine/Worker.pm
Criterion Covered Total %
statement 9 68 13.2
branch 0 4 0.0
condition 0 20 0.0
subroutine 3 21 14.2
pod 12 12 100.0
total 24 125 19.2


line stmt bran cond sub pod time code
1             package Job::Machine::Worker;
2             $Job::Machine::Worker::VERSION = '0.26';
3 1     1   19421 use strict;
  1         3  
  1         26  
4 1     1   4 use warnings;
  1         2  
  1         25  
5              
6 1     1   4 use base 'Job::Machine::Base';
  1         1  
  1         489  
7              
8             sub reply {
9 0     0 1   my ($self,$data,$queue) = @_;
10 0           my $db = $self->db;
11 0   0       $queue ||= $self->{queue};
12 0           $self->result($data,$queue);
13 0           my $task_id = $db->task_id;
14             ## Payload: Status of result, result id...
15 0           $db->notify(queue => $task_id, reply => 1);
16 0           return $task_id;
17             }
18              
19             sub result {
20 0     0 1   my ($self,$data,$queue) = @_;
21 0   0       $queue ||= $self->{queue};
22 0           $self->db->insert_result($data,$queue);
23 0           $self->db->set_task_status(200);
24             }
25              
26             sub error_result {
27 0     0 1   my ($self,$data,$queue) = @_;
28 0   0       $queue ||= $self->{queue};
29 0           $self->db->insert_result($data,$queue);
30             }
31              
32             sub receive {
33 0     0 1   my $self = shift;
34 0           $self->startup;
35 0           my $db = $self->{db};
36 0           $self->_init_chores;
37 0           $self->subscribe($self->{queue});
38 0           $self->_check_queue($self->{queue});
39 0   0       while ($self->keep_running && (my $notifies = $db->set_listen($self->timeout))) {
40 0           my ($queue,$pid) = @$notifies;
41 0 0 0       $self->_do_chores() && next unless $queue;
42              
43 0           $self->_check_queue($self->{queue});
44             }
45 0           return;
46             };
47              
48             sub _check_queue {
49 0     0     my $self = shift;
50 0           my $db = $self->{db};
51 0           while (my $task = $self->db->fetch_work_task) {
52             ## log process call
53 0           $self->process($task);
54             }
55             }
56              
57              
58             sub _init_chores {
59 0     0     my $self = shift;
60 0           my $db = $self->{db};
61             my @chores = (
62             sub {
63 0     0     my $self = shift;
64 0   0       my $number = $db->revive_tasks($self->max_runtime) || 0;
65 0           $self->job_log("Revived tasks: $number");
66             },
67             sub {
68 0     0     my $self = shift;
69 0   0       my $number = $db->fail_tasks($self->retries) || 0;
70 0           $self->job_log("Failed tasks: $number");
71             },
72             sub {
73 0     0     my $self = shift;
74 0   0       my $number = $db->remove_tasks($self->remove_after) || 0;
75 0           $self->job_log("Removed tasks: $number");
76             },
77 0           );
78 0           push @{ $self->{chores} }, @chores;
  0            
79             }
80              
81              
82             sub add_chore {
83 0     0 1   my ($self, $chore) = @_ ;
84 0 0         return unless ref $chore eq 'CODE';
85              
86 0           push @{ $self->{chores} }, $chore;
  0            
87             }
88              
89             sub _do_chores {
90 0     0     my $self = shift;
91 0           my $chores = $self->{chores};
92 0           my $idx = int(rand(@{ $chores }));
  0            
93 0           my $chore = $chores->[$idx];
94 0           $self->$chore;
95             }
96              
97       0 1   sub startup {}
98              
99 0     0 1   sub process {die 'Subclasss me!'}
100              
101 0     0 1   sub max_runtime {return 30*60}
102              
103 0     0 1   sub timeout {return 300}
104              
105 0     0 1   sub retries {return 3}
106              
107 0     0 1   sub remove_after {return 30}
108              
109 0     0 1   sub keep_running {return 1}
110              
111             1;
112              
113             __END__