| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package MogileFS::ProcManager; |
|
2
|
21
|
|
|
21
|
|
213
|
use strict; |
|
|
21
|
|
|
|
|
44
|
|
|
|
21
|
|
|
|
|
12895
|
|
|
3
|
21
|
|
|
21
|
|
135
|
use warnings; |
|
|
21
|
|
|
|
|
43
|
|
|
|
21
|
|
|
|
|
1239
|
|
|
4
|
21
|
|
|
21
|
|
573
|
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK); |
|
|
21
|
|
|
|
|
48
|
|
|
|
21
|
|
|
|
|
234
|
|
|
5
|
21
|
|
|
21
|
|
7923
|
use Symbol; |
|
|
21
|
|
|
|
|
40
|
|
|
|
21
|
|
|
|
|
1454
|
|
|
6
|
21
|
|
|
21
|
|
131
|
use Socket; |
|
|
21
|
|
|
|
|
33
|
|
|
|
21
|
|
|
|
|
18663
|
|
|
7
|
21
|
|
|
21
|
|
15849
|
use MogileFS::Connection::Client; |
|
|
21
|
|
|
|
|
80
|
|
|
|
21
|
|
|
|
|
748
|
|
|
8
|
21
|
|
|
21
|
|
25473
|
use MogileFS::Connection::Worker; |
|
|
21
|
|
|
|
|
235
|
|
|
|
21
|
|
|
|
|
692
|
|
|
9
|
21
|
|
|
21
|
|
162
|
use MogileFS::Util qw(apply_state_events); |
|
|
21
|
|
|
|
|
45
|
|
|
|
21
|
|
|
|
|
185630
|
|
|
10
|
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
# This class handles keeping lists of workers and clients and |
|
12
|
|
|
|
|
|
|
# assigning them to each other when things happen. You don't actually |
|
13
|
|
|
|
|
|
|
# instantiate a procmanager. the class itself holds all state. |
|
14
|
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
# Mappings: fd => [ clientref, jobstring, starttime ] |
|
16
|
|
|
|
|
|
|
# queues are just lists of Client class objects |
|
17
|
|
|
|
|
|
|
# ChildrenByJob: job => { pid => $client } |
|
18
|
|
|
|
|
|
|
# ErrorsTo: fid => Client |
|
19
|
|
|
|
|
|
|
# RecentQueries: [ string, string, string, ... ] |
|
20
|
|
|
|
|
|
|
# Stats: element => number |
|
21
|
|
|
|
|
|
|
our ($IsChild, @RecentQueries, |
|
22
|
|
|
|
|
|
|
%Mappings, %ChildrenByJob, %ErrorsTo, %Stats); |
|
23
|
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
our $starttime = time(); # time we got going |
|
25
|
0
|
|
|
0
|
0
|
0
|
sub server_starttime { return $starttime } |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...) |
|
28
|
|
|
|
|
|
|
my @PendingQueries; # [ MogileFS::Connection::Client, "$ip $query" ] |
|
29
|
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
my %idle_workers = (); # 'job' -> {href of idle workers} |
|
31
|
|
|
|
|
|
|
my %pending_work = (); # 'job' -> [aref of pending work] |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
$IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object |
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
# keep track of what all child pids are doing, and what jobs are being |
|
36
|
|
|
|
|
|
|
# satisifed. |
|
37
|
|
|
|
|
|
|
my %child = (); # pid -> MogileFS::Connection::Worker |
|
38
|
|
|
|
|
|
|
my %todie = (); # pid -> 1 (lists pids that we've asked to die) |
|
39
|
|
|
|
|
|
|
my %jobs = (); # jobname -> [ min, current ] |
|
40
|
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
# we start job_master after monitor has run, but this avoid undef warning |
|
42
|
|
|
|
|
|
|
# in job_needs_reduction |
|
43
|
|
|
|
|
|
|
$jobs{job_master} = [ 0, 0 ]; |
|
44
|
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies. |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
*error = \&Mgd::error; |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
my $monitor_good = 0; # ticked after monitor executes once after startup |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
my $nowish; # updated approximately once per second |
|
54
|
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
# it's pointless to spawn certain jobs without a job_master |
|
56
|
|
|
|
|
|
|
my $want_job_master; |
|
57
|
|
|
|
|
|
|
my %needs_job_master = ( |
|
58
|
|
|
|
|
|
|
delete => 1, |
|
59
|
|
|
|
|
|
|
fsck => 1, |
|
60
|
|
|
|
|
|
|
replicate => 1, |
|
61
|
|
|
|
|
|
|
); |
|
62
|
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
sub push_pre_fork_cleanup { |
|
64
|
0
|
|
|
0
|
0
|
0
|
my ($class, $code) = @_; |
|
65
|
0
|
|
|
|
|
0
|
push @prefork_cleanup, $code; |
|
66
|
|
|
|
|
|
|
} |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
sub RecentQueries { |
|
69
|
0
|
|
|
0
|
0
|
0
|
return @RecentQueries; |
|
70
|
|
|
|
|
|
|
} |
|
71
|
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
sub write_pidfile { |
|
73
|
0
|
|
|
0
|
0
|
0
|
my $class = shift; |
|
74
|
0
|
0
|
|
|
|
0
|
my $pidfile = MogileFS->config("pidfile") |
|
75
|
|
|
|
|
|
|
or return 1; |
|
76
|
0
|
|
|
|
|
0
|
my $fh; |
|
77
|
0
|
0
|
|
|
|
0
|
unless (open($fh, ">$pidfile")) { |
|
78
|
0
|
|
|
|
|
0
|
Mgd::log('err', "couldn't create pidfile '$pidfile': $!"); |
|
79
|
0
|
|
|
|
|
0
|
return 0; |
|
80
|
|
|
|
|
|
|
} |
|
81
|
0
|
0
|
0
|
|
|
0
|
unless ((print $fh "$$\n") && close($fh)) { |
|
82
|
0
|
|
|
|
|
0
|
Mgd::log('err', "couldn't write into pidfile '$pidfile': $!"); |
|
83
|
0
|
|
|
|
|
0
|
remove_pidfile(); |
|
84
|
0
|
|
|
|
|
0
|
return 0; |
|
85
|
|
|
|
|
|
|
} |
|
86
|
0
|
|
|
|
|
0
|
return 1; |
|
87
|
|
|
|
|
|
|
} |
|
88
|
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
sub remove_pidfile { |
|
90
|
0
|
|
|
0
|
0
|
0
|
my $class = shift; |
|
91
|
0
|
0
|
|
|
|
0
|
my $pidfile = MogileFS->config("pidfile") |
|
92
|
|
|
|
|
|
|
or return; |
|
93
|
0
|
|
|
|
|
0
|
unlink $pidfile; |
|
94
|
0
|
|
|
|
|
0
|
return 1; |
|
95
|
|
|
|
|
|
|
} |
|
96
|
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
sub set_min_workers { |
|
98
|
0
|
|
|
0
|
0
|
0
|
my ($class, $job, $min) = @_; |
|
99
|
0
|
|
0
|
|
|
0
|
$jobs{$job} ||= [undef, 0]; # [min, current] |
|
100
|
0
|
|
|
|
|
0
|
$jobs{$job}->[0] = $min; |
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
# TODO: set allkipsup false, so spawner re-checks? |
|
103
|
|
|
|
|
|
|
} |
|
104
|
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
sub job_to_class_suffix { |
|
106
|
0
|
|
|
0
|
0
|
0
|
my ($class, $job) = @_; |
|
107
|
|
|
|
|
|
|
return { |
|
108
|
0
|
|
|
|
|
0
|
fsck => "Fsck", |
|
109
|
|
|
|
|
|
|
queryworker => "Query", |
|
110
|
|
|
|
|
|
|
delete => "Delete", |
|
111
|
|
|
|
|
|
|
replicate => "Replicate", |
|
112
|
|
|
|
|
|
|
reaper => "Reaper", |
|
113
|
|
|
|
|
|
|
monitor => "Monitor", |
|
114
|
|
|
|
|
|
|
job_master => "JobMaster", |
|
115
|
|
|
|
|
|
|
}->{$job}; |
|
116
|
|
|
|
|
|
|
} |
|
117
|
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
sub job_to_class { |
|
119
|
0
|
|
|
0
|
0
|
0
|
my ($class, $job) = @_; |
|
120
|
0
|
0
|
|
|
|
0
|
my $suffix = $class->job_to_class_suffix($job) or return ""; |
|
121
|
0
|
|
|
|
|
0
|
return "MogileFS::Worker::$suffix"; |
|
122
|
|
|
|
|
|
|
} |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
sub child_pids { |
|
125
|
0
|
|
|
0
|
0
|
0
|
return keys %child; |
|
126
|
|
|
|
|
|
|
} |
|
127
|
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
sub WatchDog { |
|
129
|
0
|
|
|
0
|
0
|
0
|
foreach my $pid (keys %child) { |
|
130
|
0
|
|
|
|
|
0
|
my MogileFS::Connection::Worker $child = $child{$pid}; |
|
131
|
0
|
|
|
|
|
0
|
my $healthy = $child->watchdog_check; |
|
132
|
0
|
0
|
|
|
|
0
|
next if $healthy; |
|
133
|
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
# special $todie level of 2 means the watchdog tried to kill it. |
|
135
|
|
|
|
|
|
|
# TODO: Should be a CONSTANT? |
|
136
|
0
|
0
|
0
|
|
|
0
|
next if $todie{$pid} && $todie{$pid} == 2; |
|
137
|
0
|
|
|
|
|
0
|
note_pending_death($child->job, $pid, 2); |
|
138
|
|
|
|
|
|
|
|
|
139
|
0
|
|
|
|
|
0
|
error("Watchdog killing worker $pid (" . $child->job . ")"); |
|
140
|
0
|
|
|
|
|
0
|
kill 9, $pid; |
|
141
|
|
|
|
|
|
|
} |
|
142
|
|
|
|
|
|
|
} |
|
143
|
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
# returns a sub that Danga::Socket calls after each event loop round. |
|
145
|
|
|
|
|
|
|
# the sub must return 1 for the program to continue running. |
|
146
|
|
|
|
|
|
|
sub PostEventLoopChecker { |
|
147
|
0
|
|
|
0
|
0
|
0
|
my $lastspawntime = 0; # time we last ran spawn_children sub |
|
148
|
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
return sub { |
|
150
|
|
|
|
|
|
|
# run only once per second |
|
151
|
0
|
|
|
0
|
|
0
|
$nowish = time(); |
|
152
|
0
|
0
|
|
|
|
0
|
return 1 unless $nowish > $lastspawntime; |
|
153
|
0
|
|
|
|
|
0
|
$lastspawntime = $nowish; |
|
154
|
|
|
|
|
|
|
|
|
155
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->WatchDog; |
|
156
|
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
# see if anybody has died, but don't hang up on doing so |
|
158
|
0
|
|
|
|
|
0
|
while(my $pid = waitpid -1, WNOHANG) { |
|
159
|
0
|
0
|
|
|
|
0
|
last unless $pid > 0; |
|
160
|
0
|
|
|
|
|
0
|
$allkidsup = 0; # know something died |
|
161
|
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
# when a child dies, figure out what it was doing |
|
163
|
|
|
|
|
|
|
# and note that job has one less worker |
|
164
|
0
|
|
|
|
|
0
|
my $jobconn; |
|
165
|
0
|
0
|
|
|
|
0
|
if (($jobconn = delete $child{$pid})) { |
|
166
|
0
|
|
|
|
|
0
|
my $job = $jobconn->job; |
|
167
|
0
|
0
|
|
|
|
0
|
my $extra = $todie{$pid} ? "expected" : "UNEXPECTED"; |
|
168
|
0
|
|
|
|
|
0
|
error("Child $pid ($job) died: $? ($extra)"); |
|
169
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->NoteDeadChild($pid); |
|
170
|
0
|
|
|
|
|
0
|
$jobconn->close; |
|
171
|
|
|
|
|
|
|
|
|
172
|
0
|
0
|
|
|
|
0
|
if (my $jobstat = $jobs{$job}) { |
|
173
|
|
|
|
|
|
|
# if the pid is in %todie, then we have asked it to shut down |
|
174
|
|
|
|
|
|
|
# and have already decremented the jobstat counter and don't |
|
175
|
|
|
|
|
|
|
# want to do it again |
|
176
|
0
|
0
|
|
|
|
0
|
unless (my $true = delete $todie{$pid}) { |
|
177
|
|
|
|
|
|
|
# decrement the count of currently running jobs |
|
178
|
0
|
|
|
|
|
0
|
$jobstat->[1]--; |
|
179
|
|
|
|
|
|
|
} |
|
180
|
|
|
|
|
|
|
} |
|
181
|
|
|
|
|
|
|
} |
|
182
|
|
|
|
|
|
|
} |
|
183
|
|
|
|
|
|
|
|
|
184
|
0
|
0
|
|
|
|
0
|
return 1 if $allkidsup; |
|
185
|
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
# foreach job, fork enough children |
|
187
|
0
|
|
|
|
|
0
|
while (my ($job, $jobstat) = each %jobs) { |
|
188
|
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
# do not spawn job_master-dependent workers if we have no job_master |
|
190
|
0
|
0
|
0
|
|
|
0
|
next if (! $want_job_master && $needs_job_master{$job}); |
|
191
|
|
|
|
|
|
|
|
|
192
|
0
|
|
|
|
|
0
|
my $need = $jobstat->[0] - $jobstat->[1]; |
|
193
|
0
|
0
|
|
|
|
0
|
if ($need > 0) { |
|
194
|
0
|
|
|
|
|
0
|
error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need."); |
|
195
|
0
|
|
|
|
|
0
|
for (1..$need) { |
|
196
|
0
|
0
|
|
|
|
0
|
my $jobconn = make_new_child($job) |
|
197
|
|
|
|
|
|
|
or return 1; # basically bail: true value keeps event loop running |
|
198
|
0
|
|
|
|
|
0
|
$child{$jobconn->pid} = $jobconn; |
|
199
|
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# now increase the count of processes currently doing this job |
|
201
|
0
|
|
|
|
|
0
|
$jobstat->[1]++; |
|
202
|
|
|
|
|
|
|
} |
|
203
|
|
|
|
|
|
|
} |
|
204
|
|
|
|
|
|
|
} |
|
205
|
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
# if we got this far, all jobs have been re-created. note that |
|
207
|
|
|
|
|
|
|
# so we avoid more CPU usage in this post-event-loop callback later |
|
208
|
0
|
|
|
|
|
0
|
$allkidsup = 1; |
|
209
|
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
# true value keeps us running: |
|
211
|
0
|
|
|
|
|
0
|
return 1; |
|
212
|
0
|
|
|
|
|
0
|
}; |
|
213
|
|
|
|
|
|
|
} |
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
sub make_new_child { |
|
216
|
0
|
|
|
0
|
0
|
0
|
my $job = shift; |
|
217
|
|
|
|
|
|
|
|
|
218
|
0
|
|
|
|
|
0
|
my $pid; |
|
219
|
|
|
|
|
|
|
my $sigset; |
|
220
|
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
# Ensure our dbh is closed before we fork anything. |
|
222
|
|
|
|
|
|
|
# Causes problems on some platforms (Solaris+Postgres) |
|
223
|
0
|
|
|
|
|
0
|
Mgd::close_store(); |
|
224
|
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
# block signal for fork |
|
226
|
0
|
|
|
|
|
0
|
$sigset = POSIX::SigSet->new(SIGINT); |
|
227
|
0
|
0
|
|
|
|
0
|
sigprocmask(SIG_BLOCK, $sigset) |
|
228
|
|
|
|
|
|
|
or return error("Can't block SIGINT for fork: $!"); |
|
229
|
|
|
|
|
|
|
|
|
230
|
0
|
0
|
|
|
|
0
|
socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC ) |
|
231
|
|
|
|
|
|
|
or die( "socketpair failed: $!" ); |
|
232
|
|
|
|
|
|
|
|
|
233
|
0
|
0
|
|
|
|
0
|
return error("fork failed creating $job: $!") |
|
234
|
|
|
|
|
|
|
unless defined ($pid = fork); |
|
235
|
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
# enable auto-flush, so it's not pipe-buffered between parent/child |
|
237
|
0
|
|
|
|
|
0
|
select((select( $parents_ipc ), $|++)[0]); |
|
238
|
0
|
|
|
|
|
0
|
select((select( $childs_ipc ), $|++)[0]); |
|
239
|
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
# if i'm the parent |
|
241
|
0
|
0
|
|
|
|
0
|
if ($pid) { |
|
242
|
0
|
0
|
|
|
|
0
|
sigprocmask(SIG_UNBLOCK, $sigset) |
|
243
|
|
|
|
|
|
|
or return error("Can't unblock SIGINT for fork: $!"); |
|
244
|
|
|
|
|
|
|
|
|
245
|
0
|
|
|
|
|
0
|
close($childs_ipc); # unnecessary but explicit |
|
246
|
0
|
|
|
|
|
0
|
IO::Handle::blocking($parents_ipc, 0); |
|
247
|
|
|
|
|
|
|
|
|
248
|
0
|
|
|
|
|
0
|
my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc); |
|
249
|
0
|
|
|
|
|
0
|
$worker_conn->pid($pid); |
|
250
|
0
|
|
|
|
|
0
|
$worker_conn->job($job); |
|
251
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->RegisterWorkerConn($worker_conn); |
|
252
|
0
|
|
|
|
|
0
|
return $worker_conn; |
|
253
|
|
|
|
|
|
|
} |
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
# let children have different random number seeds |
|
256
|
0
|
|
|
|
|
0
|
srand(); |
|
257
|
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
# as a child, we want to close these and ignore them |
|
259
|
0
|
|
|
|
|
0
|
$_->() foreach @prefork_cleanup; |
|
260
|
0
|
|
|
|
|
0
|
close($parents_ipc); |
|
261
|
0
|
|
|
|
|
0
|
undef $parents_ipc; |
|
262
|
|
|
|
|
|
|
|
|
263
|
0
|
|
|
|
|
0
|
$SIG{INT} = 'DEFAULT'; |
|
264
|
0
|
|
|
|
|
0
|
$SIG{TERM} = 'DEFAULT'; |
|
265
|
0
|
|
|
|
|
0
|
$0 .= " [$job]"; |
|
266
|
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
# unblock signals |
|
268
|
0
|
0
|
|
|
|
0
|
sigprocmask(SIG_UNBLOCK, $sigset) |
|
269
|
|
|
|
|
|
|
or return error("Can't unblock SIGINT for fork: $!"); |
|
270
|
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
# now call our job function |
|
272
|
0
|
0
|
|
|
|
0
|
my $class = MogileFS::ProcManager->job_to_class($job) |
|
273
|
|
|
|
|
|
|
or die "No worker class defined for job '$job'\n"; |
|
274
|
0
|
|
|
|
|
0
|
my $worker = $class->new($childs_ipc); |
|
275
|
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
# set our frontend into child mode |
|
277
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->SetAsChild($worker); |
|
278
|
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
0
|
$worker->work; |
|
280
|
0
|
|
|
|
|
0
|
exit 0; |
|
281
|
|
|
|
|
|
|
} |
|
282
|
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
sub PendingQueryCount { |
|
284
|
0
|
|
|
0
|
0
|
0
|
return scalar @PendingQueries; |
|
285
|
|
|
|
|
|
|
} |
|
286
|
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
sub BoredQueryWorkerCount { |
|
288
|
0
|
|
|
0
|
0
|
0
|
return scalar @IdleQueryWorkers; |
|
289
|
|
|
|
|
|
|
} |
|
290
|
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
sub QueriesInProgressCount { |
|
292
|
0
|
|
|
0
|
0
|
0
|
return scalar keys %Mappings; |
|
293
|
|
|
|
|
|
|
} |
|
294
|
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
# Toss in any queue depths. |
|
296
|
|
|
|
|
|
|
sub StatsHash { |
|
297
|
0
|
|
|
0
|
0
|
0
|
for my $job (keys %pending_work) { |
|
298
|
0
|
|
|
|
|
0
|
$Stats{'work_queue_for_' . $job} = @{$pending_work{$job}}; |
|
|
0
|
|
|
|
|
0
|
|
|
299
|
|
|
|
|
|
|
} |
|
300
|
0
|
|
|
|
|
0
|
return \%Stats; |
|
301
|
|
|
|
|
|
|
} |
|
302
|
|
|
|
|
|
|
|
|
303
|
|
|
|
|
|
|
sub foreach_job { |
|
304
|
0
|
|
|
0
|
0
|
0
|
my ($class, $cb) = @_; |
|
305
|
0
|
|
|
|
|
0
|
foreach my $job (sort keys %ChildrenByJob) { |
|
306
|
0
|
|
|
|
|
0
|
my $ct = scalar(keys %{$ChildrenByJob{$job}}); |
|
|
0
|
|
|
|
|
0
|
|
|
307
|
0
|
|
|
|
|
0
|
$cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]); |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
308
|
|
|
|
|
|
|
} |
|
309
|
|
|
|
|
|
|
} |
|
310
|
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
sub foreach_pending_query { |
|
312
|
0
|
|
|
0
|
0
|
0
|
my ($class, $cb) = @_; |
|
313
|
0
|
|
|
|
|
0
|
foreach my $clq (@PendingQueries) { |
|
314
|
0
|
|
|
|
|
0
|
$cb->($clq->[0], # client object, |
|
315
|
|
|
|
|
|
|
$clq->[1], # "$ip $query" |
|
316
|
|
|
|
|
|
|
); |
|
317
|
|
|
|
|
|
|
} |
|
318
|
|
|
|
|
|
|
} |
|
319
|
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
sub is_monitor_good { |
|
321
|
0
|
|
|
0
|
0
|
0
|
return $monitor_good; |
|
322
|
|
|
|
|
|
|
} |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
sub is_valid_job { |
|
325
|
0
|
|
|
0
|
0
|
0
|
my ($class, $job) = @_; |
|
326
|
0
|
|
|
|
|
0
|
return defined $jobs{$job}; |
|
327
|
|
|
|
|
|
|
} |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
sub valid_jobs { |
|
330
|
0
|
|
|
0
|
0
|
0
|
return sort keys %jobs; |
|
331
|
|
|
|
|
|
|
} |
|
332
|
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
sub request_job_process { |
|
334
|
0
|
|
|
0
|
0
|
0
|
my ($class, $job, $n) = @_; |
|
335
|
0
|
0
|
|
|
|
0
|
return 0 unless $class->is_valid_job($job); |
|
336
|
0
|
0
|
0
|
|
|
0
|
return 0 if ($job =~ /^(?:job_master|monitor)$/i && $n > 1); # ghetto special case |
|
337
|
|
|
|
|
|
|
|
|
338
|
0
|
0
|
|
|
|
0
|
$want_job_master = $n if ($job eq "job_master"); |
|
339
|
|
|
|
|
|
|
|
|
340
|
0
|
|
|
|
|
0
|
$jobs{$job}->[0] = $n; |
|
341
|
0
|
|
|
|
|
0
|
$allkidsup = 0; |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
# try to clean out the queryworkers (if that's what we're doing?) |
|
344
|
0
|
0
|
|
|
|
0
|
MogileFS::ProcManager->CullQueryWorkers |
|
345
|
|
|
|
|
|
|
if $job eq 'queryworker'; |
|
346
|
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
# other workers listening off of a queue should be pinging parent |
|
348
|
|
|
|
|
|
|
# frequently. shouldn't explicitly kill them. |
|
349
|
|
|
|
|
|
|
} |
|
350
|
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
# when a child is spawned, they'll have copies of all the data from the |
|
353
|
|
|
|
|
|
|
# parent, but they don't need it. this method is called when you want |
|
354
|
|
|
|
|
|
|
# to indicate that this procmanager is running on a child and should clean. |
|
355
|
|
|
|
|
|
|
sub SetAsChild { |
|
356
|
0
|
|
|
0
|
0
|
0
|
my ($class, $worker) = @_; |
|
357
|
|
|
|
|
|
|
|
|
358
|
0
|
|
|
|
|
0
|
@IdleQueryWorkers = (); |
|
359
|
0
|
|
|
|
|
0
|
@PendingQueries = (); |
|
360
|
0
|
|
|
|
|
0
|
%Mappings = (); |
|
361
|
0
|
|
|
|
|
0
|
$IsChild = $worker; |
|
362
|
0
|
|
|
|
|
0
|
%ErrorsTo = (); |
|
363
|
0
|
|
|
|
|
0
|
%idle_workers = (); |
|
364
|
0
|
|
|
|
|
0
|
%pending_work = (); |
|
365
|
0
|
|
|
|
|
0
|
%ChildrenByJob = (); |
|
366
|
0
|
|
|
|
|
0
|
%child = (); |
|
367
|
0
|
|
|
|
|
0
|
%todie = (); |
|
368
|
0
|
|
|
|
|
0
|
%jobs = (); |
|
369
|
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
# we just forked from our parent process, also using Danga::Socket, |
|
371
|
|
|
|
|
|
|
# so we need to lose all that state and start afresh. |
|
372
|
0
|
|
|
|
|
0
|
Danga::Socket->Reset; |
|
373
|
|
|
|
|
|
|
} |
|
374
|
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
# called when a child has died. a child is someone doing a job for us, |
|
376
|
|
|
|
|
|
|
# but it might be a queryworker or any other type of job. we just want |
|
377
|
|
|
|
|
|
|
# to remove them from our list of children. they're actually respawned |
|
378
|
|
|
|
|
|
|
# by the make_new_child function elsewhere in Mgd. |
|
379
|
|
|
|
|
|
|
sub NoteDeadChild { |
|
380
|
0
|
|
|
0
|
0
|
0
|
my $pid = $_[1]; |
|
381
|
0
|
|
|
|
|
0
|
foreach my $job (keys %ChildrenByJob) { |
|
382
|
|
|
|
|
|
|
return if # bail out if we actually delete one |
|
383
|
0
|
0
|
|
|
|
0
|
delete $ChildrenByJob{$job}->{$pid}; |
|
384
|
|
|
|
|
|
|
} |
|
385
|
|
|
|
|
|
|
} |
|
386
|
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
# called when a client dies. clients are users, management or non. |
|
388
|
|
|
|
|
|
|
# we just want to remove them from the error reporting interface, if |
|
389
|
|
|
|
|
|
|
# they happen to be part of it. |
|
390
|
|
|
|
|
|
|
sub NoteDeadClient { |
|
391
|
0
|
|
|
0
|
0
|
0
|
my $client = $_[1]; |
|
392
|
0
|
|
|
|
|
0
|
delete $ErrorsTo{$client->{fd}}; |
|
393
|
|
|
|
|
|
|
} |
|
394
|
|
|
|
|
|
|
|
|
395
|
|
|
|
|
|
|
# called when the error function in Mgd is called and we're in the parent, |
|
396
|
|
|
|
|
|
|
# so it's pretty simple that basically we just spit it out to folks listening |
|
397
|
|
|
|
|
|
|
# to errors |
|
398
|
|
|
|
|
|
|
sub NoteError { |
|
399
|
4
|
50
|
|
4
|
0
|
38
|
return unless %ErrorsTo; |
|
400
|
|
|
|
|
|
|
|
|
401
|
0
|
|
|
|
|
0
|
my $msg = ":: ${$_[1]}\r\n"; |
|
|
0
|
|
|
|
|
0
|
|
|
402
|
0
|
|
|
|
|
0
|
foreach my $client (values %ErrorsTo) { |
|
403
|
0
|
|
|
|
|
0
|
$client->write(\$msg); |
|
404
|
|
|
|
|
|
|
} |
|
405
|
|
|
|
|
|
|
} |
|
406
|
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
sub RemoveErrorWatcher { |
|
408
|
0
|
|
|
0
|
0
|
0
|
my ($class, $client) = @_; |
|
409
|
0
|
|
|
|
|
0
|
return delete $ErrorsTo{$client->{fd}}; |
|
410
|
|
|
|
|
|
|
} |
|
411
|
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
sub AddErrorWatcher { |
|
413
|
0
|
|
|
0
|
0
|
0
|
my ($class, $client) = @_; |
|
414
|
0
|
|
|
|
|
0
|
$ErrorsTo{$client->{fd}} = $client; |
|
415
|
|
|
|
|
|
|
} |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
# one-time initialization of a new worker connection |
|
418
|
|
|
|
|
|
|
sub RegisterWorkerConn { |
|
419
|
0
|
|
|
0
|
0
|
0
|
my MogileFS::Connection::Worker $worker = $_[1]; |
|
420
|
0
|
|
|
|
|
0
|
$worker->watch_read(1); |
|
421
|
|
|
|
|
|
|
|
|
422
|
|
|
|
|
|
|
#warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid); |
|
423
|
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
# now do any special case startup |
|
425
|
0
|
0
|
|
|
|
0
|
if ($worker->job eq 'queryworker') { |
|
426
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->NoteIdleQueryWorker($worker); |
|
427
|
|
|
|
|
|
|
} |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
# add to normal list |
|
430
|
0
|
|
|
|
|
0
|
$ChildrenByJob{$worker->job}->{$worker->pid} = $worker; |
|
431
|
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
} |
|
433
|
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
sub EnqueueCommandRequest { |
|
435
|
0
|
|
|
0
|
0
|
0
|
my ($class, $line, $client) = @_; |
|
436
|
0
|
|
0
|
|
|
0
|
push @PendingQueries, [ |
|
437
|
|
|
|
|
|
|
$client, |
|
438
|
|
|
|
|
|
|
($client->peer_ip_string || '0.0.0.0') . " $line" |
|
439
|
|
|
|
|
|
|
]; |
|
440
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->ProcessQueues; |
|
441
|
0
|
0
|
|
|
|
0
|
if (@PendingQueries) { |
|
442
|
|
|
|
|
|
|
# Don't like the name. Feel free to change if you find better. |
|
443
|
0
|
|
|
|
|
0
|
$Stats{times_out_of_qworkers}++; |
|
444
|
|
|
|
|
|
|
} |
|
445
|
|
|
|
|
|
|
} |
|
446
|
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
# puts a worker back in the queue, deleting any outstanding jobs in |
|
448
|
|
|
|
|
|
|
# the mapping list for this fd. |
|
449
|
|
|
|
|
|
|
sub NoteIdleQueryWorker { |
|
450
|
|
|
|
|
|
|
# first arg is class, second is worker |
|
451
|
0
|
|
|
0
|
0
|
0
|
my MogileFS::Connection::Worker $worker = $_[1]; |
|
452
|
0
|
|
|
|
|
0
|
delete $Mappings{$worker->{fd}}; |
|
453
|
|
|
|
|
|
|
|
|
454
|
|
|
|
|
|
|
# see if we need to kill off some workers |
|
455
|
0
|
0
|
|
|
|
0
|
if (job_needs_reduction('queryworker')) { |
|
456
|
0
|
|
|
|
|
0
|
Mgd::error("Reducing queryworker headcount by 1."); |
|
457
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->AskWorkerToDie($worker); |
|
458
|
0
|
|
|
|
|
0
|
return; |
|
459
|
|
|
|
|
|
|
} |
|
460
|
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
# must be okay, so put it in the queue |
|
462
|
0
|
|
|
|
|
0
|
push @IdleQueryWorkers, $worker; |
|
463
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->ProcessQueues; |
|
464
|
|
|
|
|
|
|
} |
|
465
|
|
|
|
|
|
|
|
|
466
|
|
|
|
|
|
|
# if we need to kill off a worker, this function takes in the WorkerConn |
|
467
|
|
|
|
|
|
|
# object, tells it to die, marks us as having requested its death, and decrements |
|
468
|
|
|
|
|
|
|
# the count of running jobs. |
|
469
|
|
|
|
|
|
|
sub AskWorkerToDie { |
|
470
|
0
|
|
|
0
|
0
|
0
|
my MogileFS::Connection::Worker $worker = $_[1]; |
|
471
|
0
|
|
|
|
|
0
|
note_pending_death($worker->job, $worker->pid); |
|
472
|
0
|
|
|
|
|
0
|
$worker->write(":shutdown\r\n"); |
|
473
|
|
|
|
|
|
|
} |
|
474
|
|
|
|
|
|
|
|
|
475
|
|
|
|
|
|
|
# kill bored query workers so we can get down to the level requested. this |
|
476
|
|
|
|
|
|
|
# continues killing until we run out of folks to kill. |
|
477
|
|
|
|
|
|
|
sub CullQueryWorkers { |
|
478
|
0
|
|
0
|
0
|
0
|
0
|
while (@IdleQueryWorkers && job_needs_reduction('queryworker')) { |
|
479
|
0
|
|
|
|
|
0
|
my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers; |
|
480
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->AskWorkerToDie($worker); |
|
481
|
|
|
|
|
|
|
} |
|
482
|
|
|
|
|
|
|
} |
|
483
|
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
# called when we get a response from a worker. this reenqueues the |
|
485
|
|
|
|
|
|
|
# worker so it can handle another response as well as passes the answer |
|
486
|
|
|
|
|
|
|
# back on to the client. |
|
487
|
|
|
|
|
|
|
sub HandleQueryWorkerResponse { |
|
488
|
|
|
|
|
|
|
# got a response from a worker |
|
489
|
0
|
|
|
0
|
0
|
0
|
my MogileFS::Connection::Worker $worker; |
|
490
|
|
|
|
|
|
|
my $line; |
|
491
|
0
|
|
|
|
|
0
|
(undef, $worker, $line) = @_; |
|
492
|
|
|
|
|
|
|
|
|
493
|
0
|
0
|
|
|
|
0
|
return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild; |
|
494
|
0
|
0
|
0
|
|
|
0
|
return unless $worker && $Mappings{$worker->{fd}}; |
|
495
|
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
# get the client we're working with (if any) |
|
497
|
0
|
|
|
|
|
0
|
my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} }; |
|
|
0
|
|
|
|
|
0
|
|
|
498
|
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
# if we have no client, then we just got a standard message from |
|
500
|
|
|
|
|
|
|
# the queryworker and need to pass it up the line |
|
501
|
0
|
0
|
|
|
|
0
|
return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client; |
|
502
|
|
|
|
|
|
|
|
|
503
|
|
|
|
|
|
|
# at this point it was a command response, but if the client has gone |
|
504
|
|
|
|
|
|
|
# away, just reenqueue this query worker |
|
505
|
0
|
0
|
|
|
|
0
|
return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed}; |
|
506
|
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
# [client-side time to complete] |
|
508
|
0
|
|
|
|
|
0
|
my ($time, $id, $res); |
|
509
|
0
|
0
|
|
|
|
0
|
if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) { |
|
510
|
|
|
|
|
|
|
# save time and response for use later |
|
511
|
|
|
|
|
|
|
# Note the optional negative sign in the regexp. Somebody |
|
512
|
|
|
|
|
|
|
# on the mailing list was getting a time of -0.0000, causing |
|
513
|
|
|
|
|
|
|
# broken connections. |
|
514
|
0
|
|
|
|
|
0
|
($id, $time, $res) = ($1, $2, $3); |
|
515
|
|
|
|
|
|
|
} |
|
516
|
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
# now, if it doesn't match |
|
518
|
0
|
0
|
0
|
|
|
0
|
unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") { |
|
519
|
0
|
0
|
|
|
|
0
|
$id = "" unless defined $id; |
|
520
|
0
|
0
|
|
|
|
0
|
$line = "" unless defined $line; |
|
521
|
0
|
|
|
|
|
0
|
$line =~ s/\n/\\n/g; |
|
522
|
0
|
|
|
|
|
0
|
$line =~ s/\r/\\r/g; |
|
523
|
0
|
|
|
|
|
0
|
Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing"); |
|
524
|
0
|
|
|
|
|
0
|
$client->close('worker_mismatch'); |
|
525
|
0
|
|
|
|
|
0
|
return MogileFS::ProcManager->AskWorkerToDie($worker); |
|
526
|
|
|
|
|
|
|
} |
|
527
|
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
# now time this interval and add to @RecentQueries |
|
529
|
0
|
|
|
|
|
0
|
my $tinterval = Time::HiRes::time() - $starttime; |
|
530
|
0
|
|
|
|
|
0
|
push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time); |
|
531
|
0
|
0
|
|
|
|
0
|
shift @RecentQueries if scalar(@RecentQueries) > 50; |
|
532
|
|
|
|
|
|
|
|
|
533
|
|
|
|
|
|
|
# send text to client, put worker back in queue |
|
534
|
0
|
|
|
|
|
0
|
$client->write("$res\r\n"); |
|
535
|
0
|
|
|
|
|
0
|
MogileFS::ProcManager->NoteIdleQueryWorker($worker); |
|
536
|
|
|
|
|
|
|
} |
|
537
|
|
|
|
|
|
|
|
|
538
|
|
|
|
|
|
|
# new per-worker magic internal queue runner. |
|
539
|
|
|
|
|
|
|
# TODO: Since this fires only when a master asks or a worker reports |
|
540
|
|
|
|
|
|
|
# in bored, it should just operate on that *one* queue? |
|
541
|
|
|
|
|
|
|
# |
|
542
|
|
|
|
|
|
|
# new change: if worker in $job, but not in _bored, do not send work. |
|
543
|
|
|
|
|
|
|
# if work is received, only delete from _bored |
|
544
|
|
|
|
|
|
|
sub process_worker_queues { |
|
545
|
0
|
0
|
|
0
|
0
|
0
|
return if $IsChild; |
|
546
|
|
|
|
|
|
|
|
|
547
|
0
|
|
|
|
|
0
|
JOB: while (my ($job, $queue) = each %pending_work) { |
|
548
|
0
|
0
|
|
|
|
0
|
next JOB unless @$queue; |
|
549
|
0
|
0
|
0
|
|
|
0
|
next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}}; |
|
|
0
|
|
|
|
|
0
|
|
|
550
|
0
|
|
|
|
|
0
|
WORKER: for my $worker_key (keys %{$idle_workers{$job}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
551
|
0
|
|
|
|
|
0
|
my MogileFS::Connection::Worker $worker = |
|
552
|
|
|
|
|
|
|
delete $idle_workers{_bored}->{$worker_key}; |
|
553
|
0
|
0
|
0
|
|
|
0
|
if (!defined $worker || $worker->{closed}) { |
|
554
|
0
|
|
|
|
|
0
|
delete $idle_workers{$job}->{$worker_key}; |
|
555
|
0
|
|
|
|
|
0
|
next WORKER; |
|
556
|
|
|
|
|
|
|
} |
|
557
|
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
# allow workers to grab a linear range of work. |
|
559
|
0
|
|
0
|
|
|
0
|
while (@$queue && $worker->wants_todo($job)) { |
|
560
|
0
|
|
|
|
|
0
|
$worker->write(":queue_todo $job " . shift(@$queue) . "\r\n"); |
|
561
|
0
|
|
|
|
|
0
|
$Stats{'work_sent_to_' . $job}++; |
|
562
|
|
|
|
|
|
|
} |
|
563
|
0
|
0
|
|
|
|
0
|
next JOB unless @$queue; |
|
564
|
|
|
|
|
|
|
} |
|
565
|
|
|
|
|
|
|
} |
|
566
|
|
|
|
|
|
|
} |
|
567
|
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
# called from various spots to empty the queues of available pairs. |
|
569
|
|
|
|
|
|
|
sub ProcessQueues { |
|
570
|
0
|
0
|
|
0
|
0
|
0
|
return if $IsChild; |
|
571
|
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
# try to match up a client with a worker |
|
573
|
0
|
|
0
|
|
|
0
|
while (@IdleQueryWorkers && @PendingQueries) { |
|
574
|
|
|
|
|
|
|
# get client that isn't closed |
|
575
|
0
|
|
|
|
|
0
|
my $clref; |
|
576
|
0
|
|
0
|
|
|
0
|
while (!$clref && @PendingQueries) { |
|
577
|
0
|
0
|
|
|
|
0
|
$clref = shift @PendingQueries |
|
578
|
|
|
|
|
|
|
or next; |
|
579
|
0
|
0
|
|
|
|
0
|
if ($clref->[0]->{closed}) { |
|
580
|
0
|
|
|
|
|
0
|
$clref = undef; |
|
581
|
0
|
|
|
|
|
0
|
next; |
|
582
|
|
|
|
|
|
|
} |
|
583
|
|
|
|
|
|
|
} |
|
584
|
0
|
0
|
|
|
|
0
|
next unless $clref; |
|
585
|
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
# get worker and make sure it's not closed already |
|
587
|
0
|
|
|
|
|
0
|
my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers; |
|
588
|
0
|
0
|
0
|
|
|
0
|
if (!defined $worker || $worker->{closed}) { |
|
589
|
0
|
|
|
|
|
0
|
unshift @PendingQueries, $clref; |
|
590
|
0
|
|
|
|
|
0
|
next; |
|
591
|
|
|
|
|
|
|
} |
|
592
|
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
# put in mapping and send data to worker |
|
594
|
0
|
|
|
|
|
0
|
push @$clref, Time::HiRes::time(); |
|
595
|
0
|
|
|
|
|
0
|
$Mappings{$worker->{fd}} = $clref; |
|
596
|
0
|
|
|
|
|
0
|
$Stats{queries}++; |
|
597
|
|
|
|
|
|
|
|
|
598
|
|
|
|
|
|
|
# increment our counter so we know what request counter this is going out |
|
599
|
0
|
|
|
|
|
0
|
$worker->{reqid}++; |
|
600
|
|
|
|
|
|
|
# so we're writing a string of the form: |
|
601
|
|
|
|
|
|
|
# 123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n |
|
602
|
0
|
|
|
|
|
0
|
$worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n"); |
|
603
|
|
|
|
|
|
|
} |
|
604
|
|
|
|
|
|
|
} |
|
605
|
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
# Send short descriptions of the admin ("bang") commands we support to the
# user.  Class method: called as MogileFS::ProcManager->SendHelp($client),
# so the client connection arrives in $_[1].
sub SendHelp {
    my $client = $_[1];

    # send general purpose help
    # NOTE: interpolating heredoc; \' renders as a plain apostrophe.
    $client->write(<<HELP);
Mogilefsd admin commands:

    !version        Server version
    !recent         Recently executed queries and how long they took.
    !queue          Queries that are pending execution.
    !stats          General stats on what we\'re up to.
    !watch          Observe errors/messages from children.
    !jobs           Outstanding job counts, desired level, and pids.
    !shutdown       Immediately kill all of mogilefsd.

    !to <job class> <message>
                    Send <message> to all workers of <job class>.
                    Mostly used for debugging.

    !want <count> <job class>
                    Alter the level of workers of this class desired.
                    Example: !want 20 queryworker, !want 3 replicate.
                    See !jobs for what jobs are available.

HELP

}
|
634
|
|
|
|
|
|
|
|
|
635
|
|
|
|
|
|
|
# A child (worker process) has contacted us with some command/status/something.
# Class method: called as MogileFS::ProcManager->HandleChildRequest($child, $line),
# so the worker connection is $_[1] and the raw command line is $_[2].
# Dispatches on the command prefix; unknown commands are logged (truncated).
sub HandleChildRequest {
    if ($IsChild) {
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
    }

    # if they have no job set, then their first line is what job they are
    # and not a command.  they also specify their pid, just so we know what
    # connection goes with what pid, in case it's ever useful information.
    my MogileFS::Connection::Worker $child = $_[1];
    my $cmd = $_[2];

    die "Child $child with no pid?" unless $child->job;

    # at this point we've got a command of some sort
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
    } elsif ($cmd =~ /^debug (.+)$/i) {
        # pass it on to our debug handler, prefaced with the child's job
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");
    } elsif ($cmd =~ /^queue_depth (\w+)/) {
        # worker asks how much queued work exists for a job class
        # ('all' reports every queue).
        my $job = $1;
        if ($job eq 'all') {
            for my $qname (keys %pending_work) {
                my $depth = @{$pending_work{$qname}};
                $child->write(":queue_depth $qname $depth\r\n");
            }
        } else {
            my $depth = 0;
            if ($pending_work{$job}) {
                $depth = @{$pending_work{$job}};
            }
            $child->write(":queue_depth $job $depth\r\n");
        }
        MogileFS::ProcManager->process_worker_queues;
    } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
        # queue up a unit of work for job class $1
        my $job = $1;
        $pending_work{$job} ||= [];
        push(@{$pending_work{$job}}, $2);
        # Don't process queues immediately, to allow batch processing.
    } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
        # worker is idle and can accept up to $1 items of the listed types
        my $batch = $1;
        my $types = $2;
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            unless (exists $idle_workers{$child->job}) {
                $idle_workers{$child->job} = {};
            }
            $idle_workers{_bored} ||= {};
            $idle_workers{_bored}->{$child} = $child;
            for my $type (split(/\s+/, $types)) {
                $idle_workers{$type} ||= {};
                $idle_workers{$type}->{$child}++;
                $child->wants_todo($type, $batch);
            }
            MogileFS::ProcManager->process_worker_queues;
        }
    } elsif ($cmd eq ":ping") {
        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());

        # this command expects a reply, either to die or stay alive.  beginning of worker's loops
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            $child->write(":stay_alive\r\n");
        }
    } elsif ($cmd eq ":still_alive") {
        # a no-op
    } elsif ($cmd =~ /^:monitor_events/) {
        # Apply the state locally, so when we fork children they have a
        # pre-parsed factory.
        # We do not replay the events back to where it came, since this
        # severely impacts startup performance for instances with several
        # thousand domains, classes, hosts or devices.
        apply_state_events(\$cmd);
        MogileFS::ProcManager->send_to_all_children($cmd, $child);
    } elsif ($cmd eq ":monitor_just_ran") {
        send_monitor_has_run($child);
    } elsif ($cmd =~ /^:wake_a (\w+)$/) {
        MogileFS::ProcManager->wake_a($1, $child);
    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
        # and this will rebroadcast it to all other children
        # (including the one that just set it to us, but eh)
        MogileFS::Config->set_config($1, $2);
    } elsif ($cmd =~ /^:refresh_monitor$/) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
    } else {
        # unknown command; log a truncated copy so a huge line can't flood the log
        my $show = $cmd;
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
    }
}
|
738
|
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
# Class method.
# ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ], [ $just_one ])
# Given a job class and a message, write the message immediately to every
# child of that class (Danga::Socket buffers partial writes for us, so
# nothing gets interleaved mid-line).  Returns the number of children in
# the class, or 1 when $just_one stopped after the first notification,
# or 0 when no children of that class exist.
#
# If $exclude_child is given, that member of the class is skipped — useful
# for not echoing a message back to its originator.
sub ImmediateSendToChildrenByJob {
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;

    my $kids = $ChildrenByJob{$class};
    return 0 unless defined $kids && %$kids;

    my $line = "$msg\r\n";
    for my $kid (values %$kids) {
        # skip the originating child, if the caller named one
        next if $exclude_child && $exclude_child == $kid;

        $kid->write($line);
        return 1 if $just_one;
    }

    return scalar(keys %$kids);
}
|
767
|
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
# Called when we notice that a worker has bit it.  If the dead worker had
# an in-flight query (recorded in %Mappings by fd), requeue that query at
# the front of @PendingQueries so another worker picks it up.
# Class method; the dead worker connection arrives in $_[1].  No-op in
# child processes and for undefined/fd-less workers.
sub NoteDeadWorkerConn {
    return if $IsChild;

    # get parms and error check
    my MogileFS::Connection::Worker $worker = $_[1];
    return unless $worker;

    my $fd = $worker->{fd};
    return unless defined($fd);

    # if there's a mapping for this worker's fd, they had a job that didn't get done
    if ($Mappings{$fd}) {
        # unshift, since this one already went through the queue once.
        # (delete returns the removed value, so requeue and clean up in one step;
        # also use the already-captured $fd instead of re-deriving $worker->{fd}.)
        unshift @PendingQueries, delete $Mappings{$fd};

        # now try to get it processing again
        MogileFS::ProcManager->ProcessQueues;
    }
}
|
790
|
|
|
|
|
|
|
|
|
791
|
|
|
|
|
|
|
# given (job, pid), record that this worker is about to die.
# $level is so we can tell if watchdog requested the death (defaults to 1).
# Decrements the job's current-worker count ($jobs{$job}->[1]) exactly once
# per pid, even if called repeatedly for the same pid.
sub note_pending_death {
    my ($job, $pid, $level) = @_;

    defined $jobs{$job}
        or die "$job not defined in call to note_pending_death.\n";

    $level ||= 1;
    # don't double decrement: only the first notice for this pid counts.
    $jobs{$job}->[1]-- if !$todie{$pid};
    $todie{$pid} = $level;
}
|
804
|
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
# See if we should reduce the number of active children for $job.
# Returns true when the job class has more workers running than desired.
# ($jobs{$job} appears to be [desired_count, current_count] — current is
# decremented in note_pending_death; confirm against where %jobs is built.)
sub job_needs_reduction {
    my $job = shift;
    my $q;

    # drop job_master-dependent workers if there is no job_master and no
    # previously queued work.  Note the embedded assignment: $q is only
    # set when $pending_work{$job} exists, and the final clause is true
    # when the queue is either absent or present-but-empty.
    if (!$want_job_master && $needs_job_master{$job}
        && $jobs{job_master}->[1] == 0 # check if job_master is really dead
        && (($q = $pending_work{$job}) && !@$q || !$q)) {
        return 1;
    }

    # shrink when desired ([0]) has fallen below currently running ([1])
    return $jobs{$job}->[0] < $jobs{$job}->[1];
}
|
820
|
|
|
|
|
|
|
|
|
821
|
|
|
|
|
|
|
# Accessor for the package-global $IsChild flag: true in forked worker
# (child) processes, false in the parent procmanager.
sub is_child {
    return $IsChild;
}
|
824
|
|
|
|
|
|
|
|
|
825
|
|
|
|
|
|
|
# Wake a single worker of job class $class.  Works from either side of the
# fork: inside a child we ask our parent connection to relay the wake-up;
# in the parent we notify exactly one child of that class directly.
sub wake_a {
    my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)

    if (my $kid = MogileFS::ProcManager->is_child) {
        # we're in a worker: relay the request through the parent
        $kid->wake_a($class);
        return;
    }

    # parent: poke one (and only one) worker of the class, excluding the sender
    MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
}
|
834
|
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
# Write $msg (CRLF-terminated) to every known child process, optionally
# skipping one connection ($exclude — typically the message's originator).
sub send_to_all_children {
    my ($pkg, $msg, $exclude) = @_;

    my $line = $msg . "\r\n";
    for my $kid (values %child) {
        next if $exclude && $kid == $exclude;
        $kid->write($line);
    }
}
|
842
|
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
# Called when the monitor worker reports a completed run ($child is the
# monitor's connection).  On the monitor's first completed run we "gas up"
# the rest of the worker fleet — their startup is deferred until the
# monitor has built a view of the world — then notify interested worker
# types that fresh monitoring data exists.
sub send_monitor_has_run {
    my $child = shift;

    # Gas up other workers if monitor's completed for the first time.
    if (! $monitor_good) {
        # Worker classes whose desired counts come straight from config,
        # as (job => config key) pairs; order preserved from the original
        # hand-written call sequence.
        my @startup_jobs = (
            [ queryworker => 'query_jobs'     ],
            [ delete      => 'delete_jobs'    ],
            [ replicate   => 'replicate_jobs' ],
            [ reaper      => 'reaper_jobs'    ],
            [ fsck        => 'fsck_jobs'      ],
        );
        for my $pair (@startup_jobs) {
            my ($job, $conf_key) = @$pair;
            MogileFS::ProcManager->set_min_workers($job => MogileFS->config($conf_key));
        }

        # only one job_master at most
        $want_job_master = !!MogileFS->config('job_master');
        MogileFS::ProcManager->set_min_workers('job_master' => $want_job_master);

        $monitor_good = 1;
        $allkidsup = 0;
    }

    # tell worker types that care that monitoring data is available
    for my $type (qw(queryworker)) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
    }
}
|
864
|
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
1; |
|
866
|
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
# Local Variables: |
|
868
|
|
|
|
|
|
|
# mode: perl |
|
869
|
|
|
|
|
|
|
# c-basic-indent: 4 |
|
870
|
|
|
|
|
|
|
# indent-tabs-mode: nil |
|
871
|
|
|
|
|
|
|
# End: |