line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
|
2
|
|
|
|
|
|
|
package Minion::Worker::Role::Kevin; |
3
|
|
|
|
|
|
|
$Minion::Worker::Role::Kevin::VERSION = '0.7.1'; |
4
|
|
|
|
|
|
|
# ABSTRACT: Alternative Minion worker |
5
|
1
|
|
|
1
|
|
713
|
use Mojo::Base -role; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
5
|
|
6
|
|
|
|
|
|
|
|
7
|
1
|
|
|
1
|
|
688
|
use Mojo::Log; |
|
1
|
|
|
|
|
1188
|
|
|
1
|
|
|
|
|
5
|
|
8
|
1
|
|
|
1
|
|
29
|
use Mojo::Util 'steady_time'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
42
|
|
9
|
|
|
|
|
|
|
|
10
|
1
|
|
50
|
1
|
|
4
|
use constant TRACE => $ENV{KEVIN_WORKER_TRACE} || 0; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
1057
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
has 'defaults'; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
has 'log' => sub { Mojo::Log->new }; |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
sub _defaults { |
17
|
|
|
|
|
|
|
return { |
18
|
0
|
|
|
0
|
|
|
command_interval => 10, |
19
|
|
|
|
|
|
|
dequeue_timeout => 5, |
20
|
|
|
|
|
|
|
heartbeat_interval => 300, |
21
|
|
|
|
|
|
|
jobs => 4, |
22
|
|
|
|
|
|
|
queues => ['default'], |
23
|
|
|
|
|
|
|
repair_interval => 21600, |
24
|
|
|
|
|
|
|
}; |
25
|
|
|
|
|
|
|
} |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
sub _run_defaults { |
28
|
0
|
|
|
0
|
|
|
my $self = shift; |
29
|
0
|
|
0
|
|
|
|
$self->{_run_defaults} //= {%{$self->_defaults}, %{$self->{defaults} // {}}}; |
|
0
|
|
0
|
|
|
|
|
|
0
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
} |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
sub run { |
33
|
0
|
|
|
0
|
0
|
|
my ($self, @args) = @_; |
34
|
|
|
|
|
|
|
|
35
|
0
|
|
|
|
|
|
my $status = $self->status; |
36
|
0
|
|
|
|
|
|
my $defaults = $self->_run_defaults; |
37
|
|
|
|
|
|
|
|
38
|
0
|
|
0
|
|
|
|
$status->{$_} //= $defaults->{$_} for keys %$defaults; |
39
|
0
|
|
0
|
|
|
|
$status->{performed} //= 0; |
40
|
|
|
|
|
|
|
|
41
|
0
|
|
|
|
|
|
my $now = steady_time; |
42
|
0
|
0
|
|
|
|
|
$self->{next_heartbeat} = $now if $status->{heartbeat_interval}; |
43
|
0
|
0
|
|
|
|
|
$self->{next_command} = $now if $status->{command_interval}; |
44
|
0
|
0
|
|
|
|
|
if ($status->{repair_interval}) { |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# Randomize to avoid congestion |
47
|
0
|
|
|
|
|
|
$status->{repair_interval} -= int rand $status->{repair_interval} / 2; |
48
|
|
|
|
|
|
|
|
49
|
0
|
|
|
|
|
|
$self->{next_repair} = $now; |
50
|
|
|
|
|
|
|
$self->{next_repair} += $status->{repair_interval} |
51
|
0
|
0
|
|
|
|
|
if delete $status->{fast_start}; |
52
|
|
|
|
|
|
|
} |
53
|
|
|
|
|
|
|
|
54
|
0
|
|
|
|
|
|
$self->{pid} = $$; |
55
|
0
|
|
|
0
|
|
|
local $SIG{CHLD} = sub { }; |
56
|
0
|
|
|
0
|
|
|
local $SIG{INT} = local $SIG{TERM} = sub { $self->_term(1) }; |
|
0
|
|
|
|
|
|
|
57
|
0
|
|
|
0
|
|
|
local $SIG{QUIT} = sub { $self->_term }; |
|
0
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
# Remote control commands need to validate arguments carefully |
60
|
0
|
|
|
|
|
|
my $commands = $self->commands; |
61
|
|
|
|
|
|
|
local $commands->{jobs} |
62
|
0
|
0
|
0
|
0
|
|
|
= sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ }; |
|
0
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
local $commands->{stop} |
64
|
0
|
0
|
0
|
0
|
|
|
= sub { $self->{jobs}{$_[1]}->stop if $self->{jobs}{$_[1] // ''} }; |
|
0
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
# Log fatal errors |
67
|
0
|
|
|
|
|
|
my $log = $self->log; |
68
|
0
|
|
|
|
|
|
$log->info("Worker $$ started"); |
69
|
0
|
0
|
|
|
|
|
eval { $self->_work until $self->{finished}; 1 } |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
or $log->fatal("Worker error: $@"); |
71
|
0
|
|
|
|
|
|
$self->unregister; |
72
|
0
|
|
|
|
|
|
$log->info("Worker $$ stopped"); |
73
|
|
|
|
|
|
|
} |
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
sub _term { |
76
|
0
|
|
|
0
|
|
|
my ($self, $graceful) = @_; |
77
|
0
|
0
|
|
|
|
|
return unless $self->{pid} == $$; |
78
|
0
|
|
|
|
|
|
$self->{stopping}++; |
79
|
0
|
0
|
|
|
|
|
$self->{graceful} = $graceful or kill 'KILL', keys %{$self->{jobs}}; |
|
0
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
} |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
sub _ping { |
83
|
0
|
|
|
0
|
|
|
shift->register; |
84
|
|
|
|
|
|
|
} |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
sub _work { |
87
|
0
|
|
|
0
|
|
|
my $self = shift; |
88
|
|
|
|
|
|
|
|
89
|
0
|
|
|
|
|
|
my $log = $self->log; |
90
|
0
|
|
|
|
|
|
my $status = $self->status; |
91
|
|
|
|
|
|
|
|
92
|
0
|
0
|
0
|
|
|
|
if ($self->{stopping} && !$self->{quit}++) { |
93
|
|
|
|
|
|
|
$log->info("Stopping worker $$ " |
94
|
0
|
0
|
|
|
|
|
. ($self->{graceful} ? 'gracefully' : 'immediately')); |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
# Skip hearbeats, remote command and repairs |
97
|
0
|
|
|
|
|
|
delete @{$status}{qw(heartbeat_interval command_interval )} |
98
|
0
|
0
|
|
|
|
|
unless $self->{graceful}; |
99
|
0
|
|
|
|
|
|
delete $status->{repair_interval}; |
100
|
|
|
|
|
|
|
} |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
# Send heartbeats in regular intervals |
103
|
0
|
0
|
0
|
|
|
|
if ($status->{heartbeat_interval} && $self->{next_heartbeat} < steady_time) { |
104
|
0
|
|
|
|
|
|
$log->debug('Sending heartbeat') if TRACE; |
105
|
0
|
|
|
|
|
|
$self->_ping; |
106
|
0
|
|
|
|
|
|
$self->{next_heartbeat} = steady_time + $status->{heartbeat_interval}; |
107
|
|
|
|
|
|
|
} |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
# Process worker remote control commands in regular intervals |
110
|
0
|
0
|
0
|
|
|
|
if ($status->{command_interval} && $self->{next_command} < steady_time) { |
111
|
0
|
|
|
|
|
|
$log->debug('Checking remote control') if TRACE; |
112
|
0
|
|
|
|
|
|
$self->process_commands; |
113
|
0
|
|
|
|
|
|
$self->{next_command} = steady_time + $status->{command_interval}; |
114
|
|
|
|
|
|
|
} |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
# Repair in regular intervals |
117
|
0
|
0
|
0
|
|
|
|
if ($status->{repair_interval} && $self->{next_repair} < steady_time) { |
118
|
0
|
|
|
|
|
|
$log->debug('Checking worker registry and job queue'); |
119
|
0
|
|
|
|
|
|
$self->minion->repair; |
120
|
0
|
|
|
|
|
|
$self->{next_repair} = steady_time + $status->{repair_interval}; |
121
|
|
|
|
|
|
|
} |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
# Check if jobs are finished |
124
|
0
|
|
0
|
|
|
|
my $jobs = $self->{jobs} ||= {}; |
125
|
|
|
|
|
|
|
$jobs->{$_}->is_finished and ++$status->{performed} and delete $jobs->{$_} |
126
|
0
|
|
0
|
|
|
|
for keys %$jobs; |
|
|
|
0
|
|
|
|
|
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
# Return if worker is finished |
129
|
0
|
0
|
0
|
|
|
|
++$self->{finished} and return if $self->{stopping} && !keys %{$self->{jobs}}; |
|
0
|
|
0
|
|
|
|
|
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
# Job limit has been reached or worker is stopping |
132
|
|
|
|
|
|
|
return $self->emit('busy') |
133
|
0
|
0
|
0
|
|
|
|
if ($status->{jobs} <= keys %$jobs) || $self->{stopping}; |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
# Try to get more jobs |
136
|
0
|
|
|
|
|
|
my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)}; |
|
0
|
|
|
|
|
|
|
137
|
0
|
0
|
|
|
|
|
if (my $job = $self->emit('wait')->dequeue($max => {queues => $queues})) { |
138
|
0
|
|
|
|
|
|
$jobs->{my $id = $job->id} = $job->start; |
139
|
0
|
|
|
|
|
|
my ($pid, $task) = ($job->pid, $job->task); |
140
|
0
|
|
|
|
|
|
$log->debug(qq{Process $pid is performing job "$id" with task "$task"}); |
141
|
|
|
|
|
|
|
} |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
1; |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
__END__ |