File Coverage

blib/lib/MogileFS/ProcManager.pm
Criterion Covered Total %
statement 28 359 7.8
branch 1 170 0.5
condition 0 72 0.0
subroutine 11 55 20.0
pod 0 46 0.0
total 40 702 5.7


line stmt bran cond sub pod time code
1             package MogileFS::ProcManager;
2 21     21   213 use strict;
  21         44  
  21         12895  
3 21     21   135 use warnings;
  21         43  
  21         1239  
4 21     21   573 use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
  21         48  
  21         234  
5 21     21   7923 use Symbol;
  21         40  
  21         1454  
6 21     21   131 use Socket;
  21         33  
  21         18663  
7 21     21   15849 use MogileFS::Connection::Client;
  21         80  
  21         748  
8 21     21   25473 use MogileFS::Connection::Worker;
  21         235  
  21         692  
9 21     21   162 use MogileFS::Util qw(apply_state_events);
  21         45  
  21         185630  
10              
11             # This class handles keeping lists of workers and clients and
12             # assigning them to each other when things happen. You don't actually
13             # instantiate a procmanager. the class itself holds all state.
14              
15             # Mappings: fd => [ clientref, jobstring, starttime ]
16             # queues are just lists of Client class objects
17             # ChildrenByJob: job => { pid => $client }
18             # ErrorsTo: fid => Client
19             # RecentQueries: [ string, string, string, ... ]
20             # Stats: element => number
21             our ($IsChild, @RecentQueries,
22             %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
23              
24             our $starttime = time(); # time we got going
25 0     0 0 0 sub server_starttime { return $starttime }
26              
27             my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
28             my @PendingQueries; # [ MogileFS::Connection::Client, "$ip $query" ]
29              
30             my %idle_workers = (); # 'job' -> {href of idle workers}
31             my %pending_work = (); # 'job' -> [aref of pending work]
32              
33             $IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object
34              
35             # keep track of what all child pids are doing, and what jobs are being
36             # satisifed.
37             my %child = (); # pid -> MogileFS::Connection::Worker
38             my %todie = (); # pid -> 1 (lists pids that we've asked to die)
39             my %jobs = (); # jobname -> [ min, current ]
40              
41             # we start job_master after monitor has run, but this avoid undef warning
42             # in job_needs_reduction
43             $jobs{job_master} = [ 0, 0 ];
44              
45             our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.
46              
47             my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child
48              
49             *error = \&Mgd::error;
50              
51             my $monitor_good = 0; # ticked after monitor executes once after startup
52              
53             my $nowish; # updated approximately once per second
54              
55             # it's pointless to spawn certain jobs without a job_master
56             my $want_job_master;
57             my %needs_job_master = (
58             delete => 1,
59             fsck => 1,
60             replicate => 1,
61             );
62              
63             sub push_pre_fork_cleanup {
64 0     0 0 0 my ($class, $code) = @_;
65 0         0 push @prefork_cleanup, $code;
66             }
67              
68             sub RecentQueries {
69 0     0 0 0 return @RecentQueries;
70             }
71              
72             sub write_pidfile {
73 0     0 0 0 my $class = shift;
74 0 0       0 my $pidfile = MogileFS->config("pidfile")
75             or return 1;
76 0         0 my $fh;
77 0 0       0 unless (open($fh, ">$pidfile")) {
78 0         0 Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
79 0         0 return 0;
80             }
81 0 0 0     0 unless ((print $fh "$$\n") && close($fh)) {
82 0         0 Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
83 0         0 remove_pidfile();
84 0         0 return 0;
85             }
86 0         0 return 1;
87             }
88              
89             sub remove_pidfile {
90 0     0 0 0 my $class = shift;
91 0 0       0 my $pidfile = MogileFS->config("pidfile")
92             or return;
93 0         0 unlink $pidfile;
94 0         0 return 1;
95             }
96              
97             sub set_min_workers {
98 0     0 0 0 my ($class, $job, $min) = @_;
99 0   0     0 $jobs{$job} ||= [undef, 0]; # [min, current]
100 0         0 $jobs{$job}->[0] = $min;
101              
102             # TODO: set allkipsup false, so spawner re-checks?
103             }
104              
105             sub job_to_class_suffix {
106 0     0 0 0 my ($class, $job) = @_;
107             return {
108 0         0 fsck => "Fsck",
109             queryworker => "Query",
110             delete => "Delete",
111             replicate => "Replicate",
112             reaper => "Reaper",
113             monitor => "Monitor",
114             job_master => "JobMaster",
115             }->{$job};
116             }
117              
118             sub job_to_class {
119 0     0 0 0 my ($class, $job) = @_;
120 0 0       0 my $suffix = $class->job_to_class_suffix($job) or return "";
121 0         0 return "MogileFS::Worker::$suffix";
122             }
123              
124             sub child_pids {
125 0     0 0 0 return keys %child;
126             }
127              
128             sub WatchDog {
129 0     0 0 0 foreach my $pid (keys %child) {
130 0         0 my MogileFS::Connection::Worker $child = $child{$pid};
131 0         0 my $healthy = $child->watchdog_check;
132 0 0       0 next if $healthy;
133              
134             # special $todie level of 2 means the watchdog tried to kill it.
135             # TODO: Should be a CONSTANT?
136 0 0 0     0 next if $todie{$pid} && $todie{$pid} == 2;
137 0         0 note_pending_death($child->job, $pid, 2);
138              
139 0         0 error("Watchdog killing worker $pid (" . $child->job . ")");
140 0         0 kill 9, $pid;
141             }
142             }
143              
144             # returns a sub that Danga::Socket calls after each event loop round.
145             # the sub must return 1 for the program to continue running.
146             sub PostEventLoopChecker {
147 0     0 0 0 my $lastspawntime = 0; # time we last ran spawn_children sub
148              
149             return sub {
150             # run only once per second
151 0     0   0 $nowish = time();
152 0 0       0 return 1 unless $nowish > $lastspawntime;
153 0         0 $lastspawntime = $nowish;
154              
155 0         0 MogileFS::ProcManager->WatchDog;
156              
157             # see if anybody has died, but don't hang up on doing so
158 0         0 while(my $pid = waitpid -1, WNOHANG) {
159 0 0       0 last unless $pid > 0;
160 0         0 $allkidsup = 0; # know something died
161              
162             # when a child dies, figure out what it was doing
163             # and note that job has one less worker
164 0         0 my $jobconn;
165 0 0       0 if (($jobconn = delete $child{$pid})) {
166 0         0 my $job = $jobconn->job;
167 0 0       0 my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
168 0         0 error("Child $pid ($job) died: $? ($extra)");
169 0         0 MogileFS::ProcManager->NoteDeadChild($pid);
170 0         0 $jobconn->close;
171              
172 0 0       0 if (my $jobstat = $jobs{$job}) {
173             # if the pid is in %todie, then we have asked it to shut down
174             # and have already decremented the jobstat counter and don't
175             # want to do it again
176 0 0       0 unless (my $true = delete $todie{$pid}) {
177             # decrement the count of currently running jobs
178 0         0 $jobstat->[1]--;
179             }
180             }
181             }
182             }
183              
184 0 0       0 return 1 if $allkidsup;
185              
186             # foreach job, fork enough children
187 0         0 while (my ($job, $jobstat) = each %jobs) {
188              
189             # do not spawn job_master-dependent workers if we have no job_master
190 0 0 0     0 next if (! $want_job_master && $needs_job_master{$job});
191              
192 0         0 my $need = $jobstat->[0] - $jobstat->[1];
193 0 0       0 if ($need > 0) {
194 0         0 error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
195 0         0 for (1..$need) {
196 0 0       0 my $jobconn = make_new_child($job)
197             or return 1; # basically bail: true value keeps event loop running
198 0         0 $child{$jobconn->pid} = $jobconn;
199              
200             # now increase the count of processes currently doing this job
201 0         0 $jobstat->[1]++;
202             }
203             }
204             }
205              
206             # if we got this far, all jobs have been re-created. note that
207             # so we avoid more CPU usage in this post-event-loop callback later
208 0         0 $allkidsup = 1;
209              
210             # true value keeps us running:
211 0         0 return 1;
212 0         0 };
213             }
214              
215             sub make_new_child {
216 0     0 0 0 my $job = shift;
217              
218 0         0 my $pid;
219             my $sigset;
220              
221             # Ensure our dbh is closed before we fork anything.
222             # Causes problems on some platforms (Solaris+Postgres)
223 0         0 Mgd::close_store();
224              
225             # block signal for fork
226 0         0 $sigset = POSIX::SigSet->new(SIGINT);
227 0 0       0 sigprocmask(SIG_BLOCK, $sigset)
228             or return error("Can't block SIGINT for fork: $!");
229              
230 0 0       0 socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
231             or die( "socketpair failed: $!" );
232              
233 0 0       0 return error("fork failed creating $job: $!")
234             unless defined ($pid = fork);
235              
236             # enable auto-flush, so it's not pipe-buffered between parent/child
237 0         0 select((select( $parents_ipc ), $|++)[0]);
238 0         0 select((select( $childs_ipc ), $|++)[0]);
239              
240             # if i'm the parent
241 0 0       0 if ($pid) {
242 0 0       0 sigprocmask(SIG_UNBLOCK, $sigset)
243             or return error("Can't unblock SIGINT for fork: $!");
244              
245 0         0 close($childs_ipc); # unnecessary but explicit
246 0         0 IO::Handle::blocking($parents_ipc, 0);
247              
248 0         0 my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
249 0         0 $worker_conn->pid($pid);
250 0         0 $worker_conn->job($job);
251 0         0 MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
252 0         0 return $worker_conn;
253             }
254              
255             # let children have different random number seeds
256 0         0 srand();
257              
258             # as a child, we want to close these and ignore them
259 0         0 $_->() foreach @prefork_cleanup;
260 0         0 close($parents_ipc);
261 0         0 undef $parents_ipc;
262              
263 0         0 $SIG{INT} = 'DEFAULT';
264 0         0 $SIG{TERM} = 'DEFAULT';
265 0         0 $0 .= " [$job]";
266              
267             # unblock signals
268 0 0       0 sigprocmask(SIG_UNBLOCK, $sigset)
269             or return error("Can't unblock SIGINT for fork: $!");
270              
271             # now call our job function
272 0 0       0 my $class = MogileFS::ProcManager->job_to_class($job)
273             or die "No worker class defined for job '$job'\n";
274 0         0 my $worker = $class->new($childs_ipc);
275              
276             # set our frontend into child mode
277 0         0 MogileFS::ProcManager->SetAsChild($worker);
278              
279 0         0 $worker->work;
280 0         0 exit 0;
281             }
282              
283             sub PendingQueryCount {
284 0     0 0 0 return scalar @PendingQueries;
285             }
286              
287             sub BoredQueryWorkerCount {
288 0     0 0 0 return scalar @IdleQueryWorkers;
289             }
290              
291             sub QueriesInProgressCount {
292 0     0 0 0 return scalar keys %Mappings;
293             }
294              
295             # Toss in any queue depths.
296             sub StatsHash {
297 0     0 0 0 for my $job (keys %pending_work) {
298 0         0 $Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
  0         0  
299             }
300 0         0 return \%Stats;
301             }
302              
303             sub foreach_job {
304 0     0 0 0 my ($class, $cb) = @_;
305 0         0 foreach my $job (sort keys %ChildrenByJob) {
306 0         0 my $ct = scalar(keys %{$ChildrenByJob{$job}});
  0         0  
307 0         0 $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
  0         0  
  0         0  
308             }
309             }
310              
311             sub foreach_pending_query {
312 0     0 0 0 my ($class, $cb) = @_;
313 0         0 foreach my $clq (@PendingQueries) {
314 0         0 $cb->($clq->[0], # client object,
315             $clq->[1], # "$ip $query"
316             );
317             }
318             }
319              
320             sub is_monitor_good {
321 0     0 0 0 return $monitor_good;
322             }
323              
324             sub is_valid_job {
325 0     0 0 0 my ($class, $job) = @_;
326 0         0 return defined $jobs{$job};
327             }
328              
329             sub valid_jobs {
330 0     0 0 0 return sort keys %jobs;
331             }
332              
333             sub request_job_process {
334 0     0 0 0 my ($class, $job, $n) = @_;
335 0 0       0 return 0 unless $class->is_valid_job($job);
336 0 0 0     0 return 0 if ($job =~ /^(?:job_master|monitor)$/i && $n > 1); # ghetto special case
337              
338 0 0       0 $want_job_master = $n if ($job eq "job_master");
339              
340 0         0 $jobs{$job}->[0] = $n;
341 0         0 $allkidsup = 0;
342              
343             # try to clean out the queryworkers (if that's what we're doing?)
344 0 0       0 MogileFS::ProcManager->CullQueryWorkers
345             if $job eq 'queryworker';
346              
347             # other workers listening off of a queue should be pinging parent
348             # frequently. shouldn't explicitly kill them.
349             }
350              
351              
352             # when a child is spawned, they'll have copies of all the data from the
353             # parent, but they don't need it. this method is called when you want
354             # to indicate that this procmanager is running on a child and should clean.
355             sub SetAsChild {
356 0     0 0 0 my ($class, $worker) = @_;
357              
358 0         0 @IdleQueryWorkers = ();
359 0         0 @PendingQueries = ();
360 0         0 %Mappings = ();
361 0         0 $IsChild = $worker;
362 0         0 %ErrorsTo = ();
363 0         0 %idle_workers = ();
364 0         0 %pending_work = ();
365 0         0 %ChildrenByJob = ();
366 0         0 %child = ();
367 0         0 %todie = ();
368 0         0 %jobs = ();
369              
370             # we just forked from our parent process, also using Danga::Socket,
371             # so we need to lose all that state and start afresh.
372 0         0 Danga::Socket->Reset;
373             }
374              
375             # called when a child has died. a child is someone doing a job for us,
376             # but it might be a queryworker or any other type of job. we just want
377             # to remove them from our list of children. they're actually respawned
378             # by the make_new_child function elsewhere in Mgd.
379             sub NoteDeadChild {
380 0     0 0 0 my $pid = $_[1];
381 0         0 foreach my $job (keys %ChildrenByJob) {
382             return if # bail out if we actually delete one
383 0 0       0 delete $ChildrenByJob{$job}->{$pid};
384             }
385             }
386              
387             # called when a client dies. clients are users, management or non.
388             # we just want to remove them from the error reporting interface, if
389             # they happen to be part of it.
390             sub NoteDeadClient {
391 0     0 0 0 my $client = $_[1];
392 0         0 delete $ErrorsTo{$client->{fd}};
393             }
394              
395             # called when the error function in Mgd is called and we're in the parent,
396             # so it's pretty simple that basically we just spit it out to folks listening
397             # to errors
398             sub NoteError {
399 4 50   4 0 38 return unless %ErrorsTo;
400              
401 0         0 my $msg = ":: ${$_[1]}\r\n";
  0         0  
402 0         0 foreach my $client (values %ErrorsTo) {
403 0         0 $client->write(\$msg);
404             }
405             }
406              
407             sub RemoveErrorWatcher {
408 0     0 0 0 my ($class, $client) = @_;
409 0         0 return delete $ErrorsTo{$client->{fd}};
410             }
411              
412             sub AddErrorWatcher {
413 0     0 0 0 my ($class, $client) = @_;
414 0         0 $ErrorsTo{$client->{fd}} = $client;
415             }
416              
417             # one-time initialization of a new worker connection
418             sub RegisterWorkerConn {
419 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
420 0         0 $worker->watch_read(1);
421              
422             #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);
423              
424             # now do any special case startup
425 0 0       0 if ($worker->job eq 'queryworker') {
426 0         0 MogileFS::ProcManager->NoteIdleQueryWorker($worker);
427             }
428              
429             # add to normal list
430 0         0 $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
431              
432             }
433              
434             sub EnqueueCommandRequest {
435 0     0 0 0 my ($class, $line, $client) = @_;
436 0   0     0 push @PendingQueries, [
437             $client,
438             ($client->peer_ip_string || '0.0.0.0') . " $line"
439             ];
440 0         0 MogileFS::ProcManager->ProcessQueues;
441 0 0       0 if (@PendingQueries) {
442             # Don't like the name. Feel free to change if you find better.
443 0         0 $Stats{times_out_of_qworkers}++;
444             }
445             }
446              
447             # puts a worker back in the queue, deleting any outstanding jobs in
448             # the mapping list for this fd.
449             sub NoteIdleQueryWorker {
450             # first arg is class, second is worker
451 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
452 0         0 delete $Mappings{$worker->{fd}};
453              
454             # see if we need to kill off some workers
455 0 0       0 if (job_needs_reduction('queryworker')) {
456 0         0 Mgd::error("Reducing queryworker headcount by 1.");
457 0         0 MogileFS::ProcManager->AskWorkerToDie($worker);
458 0         0 return;
459             }
460              
461             # must be okay, so put it in the queue
462 0         0 push @IdleQueryWorkers, $worker;
463 0         0 MogileFS::ProcManager->ProcessQueues;
464             }
465              
466             # if we need to kill off a worker, this function takes in the WorkerConn
467             # object, tells it to die, marks us as having requested its death, and decrements
468             # the count of running jobs.
469             sub AskWorkerToDie {
470 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
471 0         0 note_pending_death($worker->job, $worker->pid);
472 0         0 $worker->write(":shutdown\r\n");
473             }
474              
475             # kill bored query workers so we can get down to the level requested. this
476             # continues killing until we run out of folks to kill.
477             sub CullQueryWorkers {
478 0   0 0 0 0 while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
479 0         0 my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
480 0         0 MogileFS::ProcManager->AskWorkerToDie($worker);
481             }
482             }
483              
484             # called when we get a response from a worker. this reenqueues the
485             # worker so it can handle another response as well as passes the answer
486             # back on to the client.
487             sub HandleQueryWorkerResponse {
488             # got a response from a worker
489 0     0 0 0 my MogileFS::Connection::Worker $worker;
490             my $line;
491 0         0 (undef, $worker, $line) = @_;
492              
493 0 0       0 return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
494 0 0 0     0 return unless $worker && $Mappings{$worker->{fd}};
495              
496             # get the client we're working with (if any)
497 0         0 my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };
  0         0  
498              
499             # if we have no client, then we just got a standard message from
500             # the queryworker and need to pass it up the line
501 0 0       0 return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;
502              
503             # at this point it was a command response, but if the client has gone
504             # away, just reenqueue this query worker
505 0 0       0 return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};
506              
507             # [client-side time to complete]
508 0         0 my ($time, $id, $res);
509 0 0       0 if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
510             # save time and response for use later
511             # Note the optional negative sign in the regexp. Somebody
512             # on the mailing list was getting a time of -0.0000, causing
513             # broken connections.
514 0         0 ($id, $time, $res) = ($1, $2, $3);
515             }
516              
517             # now, if it doesn't match
518 0 0 0     0 unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
519 0 0       0 $id = "" unless defined $id;
520 0 0       0 $line = "" unless defined $line;
521 0         0 $line =~ s/\n/\\n/g;
522 0         0 $line =~ s/\r/\\r/g;
523 0         0 Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
524 0         0 $client->close('worker_mismatch');
525 0         0 return MogileFS::ProcManager->AskWorkerToDie($worker);
526             }
527              
528             # now time this interval and add to @RecentQueries
529 0         0 my $tinterval = Time::HiRes::time() - $starttime;
530 0         0 push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
531 0 0       0 shift @RecentQueries if scalar(@RecentQueries) > 50;
532              
533             # send text to client, put worker back in queue
534 0         0 $client->write("$res\r\n");
535 0         0 MogileFS::ProcManager->NoteIdleQueryWorker($worker);
536             }
537              
538             # new per-worker magic internal queue runner.
539             # TODO: Since this fires only when a master asks or a worker reports
540             # in bored, it should just operate on that *one* queue?
541             #
542             # new change: if worker in $job, but not in _bored, do not send work.
543             # if work is received, only delete from _bored
544             sub process_worker_queues {
545 0 0   0 0 0 return if $IsChild;
546              
547 0         0 JOB: while (my ($job, $queue) = each %pending_work) {
548 0 0       0 next JOB unless @$queue;
549 0 0 0     0 next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
  0         0  
550 0         0 WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
  0         0  
551 0         0 my MogileFS::Connection::Worker $worker =
552             delete $idle_workers{_bored}->{$worker_key};
553 0 0 0     0 if (!defined $worker || $worker->{closed}) {
554 0         0 delete $idle_workers{$job}->{$worker_key};
555 0         0 next WORKER;
556             }
557              
558             # allow workers to grab a linear range of work.
559 0   0     0 while (@$queue && $worker->wants_todo($job)) {
560 0         0 $worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
561 0         0 $Stats{'work_sent_to_' . $job}++;
562             }
563 0 0       0 next JOB unless @$queue;
564             }
565             }
566             }
567              
568             # called from various spots to empty the queues of available pairs.
569             sub ProcessQueues {
570 0 0   0 0 0 return if $IsChild;
571              
572             # try to match up a client with a worker
573 0   0     0 while (@IdleQueryWorkers && @PendingQueries) {
574             # get client that isn't closed
575 0         0 my $clref;
576 0   0     0 while (!$clref && @PendingQueries) {
577 0 0       0 $clref = shift @PendingQueries
578             or next;
579 0 0       0 if ($clref->[0]->{closed}) {
580 0         0 $clref = undef;
581 0         0 next;
582             }
583             }
584 0 0       0 next unless $clref;
585              
586             # get worker and make sure it's not closed already
587 0         0 my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers;
588 0 0 0     0 if (!defined $worker || $worker->{closed}) {
589 0         0 unshift @PendingQueries, $clref;
590 0         0 next;
591             }
592              
593             # put in mapping and send data to worker
594 0         0 push @$clref, Time::HiRes::time();
595 0         0 $Mappings{$worker->{fd}} = $clref;
596 0         0 $Stats{queries}++;
597              
598             # increment our counter so we know what request counter this is going out
599 0         0 $worker->{reqid}++;
600             # so we're writing a string of the form:
601             # 123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
602 0         0 $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
603             }
604             }
605              
606             # send short descriptions of commands we support to the user
607             sub SendHelp {
608 0     0 0 0 my $client = $_[1];
609              
610             # send general purpose help
611 0         0 $client->write(<
612             Mogilefsd admin commands:
613              
614             !version Server version
615             !recent Recently executed queries and how long they took.
616             !queue Queries that are pending execution.
617             !stats General stats on what we\'re up to.
618             !watch Observe errors/messages from children.
619             !jobs Outstanding job counts, desired level, and pids.
620             !shutdown Immediately kill all of mogilefsd.
621              
622             !to
623             Send to all workers of .
624             Mostly used for debugging.
625              
626             !want
627             Alter the level of workers of this class desired.
628             Example: !want 20 queryworker, !want 3 replicate.
629             See !jobs for what jobs are available.
630              
631             HELP
632              
633             }
634              
635             # a child has contacted us with some command/status/something.
636             sub HandleChildRequest {
637 0 0   0 0 0 if ($IsChild) {
638 0         0 Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
639             }
640              
641             # if they have no job set, then their first line is what job they are
642             # and not a command. they also specify their pid, just so we know what
643             # connection goes with what pid, in case it's ever useful information.
644 0         0 my MogileFS::Connection::Worker $child = $_[1];
645 0         0 my $cmd = $_[2];
646              
647 0 0       0 die "Child $child with no pid?" unless $child->job;
648              
649             # at this point we've got a command of some sort
650 0 0       0 if ($cmd =~ /^error (.+)$/i) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
651             # pass it on to our error handler, prefaced with the child's job
652 0         0 Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
653              
654             } elsif ($cmd =~ /^debug (.+)$/i) {
655             # pass it on to our error handler, prefaced with the child's job
656 0         0 Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");
657              
658             } elsif ($cmd =~ /^queue_depth (\w+)/) {
659 0         0 my $job = $1;
660 0 0       0 if ($job eq 'all') {
661 0         0 for my $qname (keys %pending_work) {
662 0         0 my $depth = @{$pending_work{$qname}};
  0         0  
663 0         0 $child->write(":queue_depth $qname $depth\r\n");
664             }
665             } else {
666 0         0 my $depth = 0;
667 0 0       0 if ($pending_work{$job}) {
668 0         0 $depth = @{$pending_work{$job}};
  0         0  
669             }
670 0         0 $child->write(":queue_depth $job $depth\r\n");
671             }
672 0         0 MogileFS::ProcManager->process_worker_queues;
673             } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
674 0         0 my $job = $1;
675 0   0     0 $pending_work{$job} ||= [];
676 0         0 push(@{$pending_work{$job}}, $2);
  0         0  
677             # Don't process queues immediately, to allow batch processing.
678             } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
679 0         0 my $batch = $1;
680 0         0 my $types = $2;
681 0 0       0 if (job_needs_reduction($child->job)) {
682 0         0 MogileFS::ProcManager->AskWorkerToDie($child);
683             } else {
684 0 0       0 unless (exists $idle_workers{$child->job}) {
685 0         0 $idle_workers{$child->job} = {};
686             }
687 0   0     0 $idle_workers{_bored} ||= {};
688 0         0 $idle_workers{_bored}->{$child} = $child;
689 0         0 for my $type (split(/\s+/, $types)) {
690 0   0     0 $idle_workers{$type} ||= {};
691 0         0 $idle_workers{$type}->{$child}++;
692 0         0 $child->wants_todo($type, $batch);
693             }
694 0         0 MogileFS::ProcManager->process_worker_queues;
695             }
696             } elsif ($cmd eq ":ping") {
697              
698             # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());
699              
700             # this command expects a reply, either to die or stay alive. beginning of worker's loops
701 0 0       0 if (job_needs_reduction($child->job)) {
702 0         0 MogileFS::ProcManager->AskWorkerToDie($child);
703             } else {
704 0         0 $child->write(":stay_alive\r\n");
705             }
706              
707             } elsif ($cmd eq ":still_alive") {
708             # a no-op
709              
710             } elsif ($cmd =~ /^:monitor_events/) {
711             # Apply the state locally, so when we fork children they have a
712             # pre-parsed factory.
713             # We do not replay the events back to where it came, since this
714             # severely impacts startup performance for instances with several
715             # thousand domains, classes, hosts or devices.
716 0         0 apply_state_events(\$cmd);
717 0         0 MogileFS::ProcManager->send_to_all_children($cmd, $child);
718              
719             } elsif ($cmd eq ":monitor_just_ran") {
720 0         0 send_monitor_has_run($child);
721              
722             } elsif ($cmd =~ /^:wake_a (\w+)$/) {
723              
724 0         0 MogileFS::ProcManager->wake_a($1, $child);
725             } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
726             # and this will rebroadcast it to all other children
727             # (including the one that just set it to us, but eh)
728 0         0 MogileFS::Config->set_config($1, $2);
729             } elsif ($cmd =~ /^:refresh_monitor$/) {
730 0         0 MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
731             } else {
732             # unknown command
733 0         0 my $show = $cmd;
734 0 0       0 $show = substr($show, 0, 80) . "..." if length $cmd > 80;
735 0         0 Mgd::error("Unknown command [$show] from child; job=" . $child->job);
736             }
737             }
738              
739             # Class method.
740             # ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
741             # given a job class, and a message, send it to all children of that job. returns
742             # the number of children the message was sent to.
743             #
744             # if child is specified, the message will be sent to members of the job class that
745             # aren't that child. so you can exclude the one that originated the message.
746             #
747             # doesn't add to queue of things child gets on next interactive command: writes immediately
748             # (won't get in middle of partial write, though, as danga::socket queues things up)
749             #
750             # if $just_one is specified, only a single process is notified, then we stop.
751             sub ImmediateSendToChildrenByJob {
752 0     0 0 0 my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;
753              
754 0         0 my $childref = $ChildrenByJob{$class};
755 0 0 0     0 return 0 unless defined $childref && %$childref;
756              
757 0         0 foreach my $child (values %$childref) {
758             # ignore the child specified as the third arg if one is sent
759 0 0 0     0 next if $exclude_child && $exclude_child == $child;
760              
761             # send the message to this child
762 0         0 $child->write("$msg\r\n");
763 0 0       0 return 1 if $just_one;
764             }
765 0         0 return scalar(keys %$childref);
766             }
767              
768             # called when we notice that a worker has bit it. we might have to restart a
769             # job that they had been working on.
770             sub NoteDeadWorkerConn {
771 0 0   0 0 0 return if $IsChild;
772              
773             # get parms and error check
774 0         0 my MogileFS::Connection::Worker $worker = $_[1];
775 0 0       0 return unless $worker;
776              
777 0         0 my $fd = $worker->{fd};
778 0 0       0 return unless defined($fd);
779              
780             # if there's a mapping for this worker's fd, they had a job that didn't get done
781 0 0       0 if ($Mappings{$fd}) {
782             # unshift, since this one already went through the queue once
783 0         0 unshift @PendingQueries, $Mappings{$worker->{fd}};
784 0         0 delete $Mappings{$worker->{fd}};
785              
786             # now try to get it processing again
787 0         0 MogileFS::ProcManager->ProcessQueues;
788             }
789             }
790              
791             # given (job, pid), record that this worker is about to die
792             # $level is so we can tell if watchdog requested the death.
793             sub note_pending_death {
794 0     0 0 0 my ($job, $pid, $level) = @_;
795              
796 0 0       0 die "$job not defined in call to note_pending_death.\n"
797             unless defined $jobs{$job};
798              
799 0   0     0 $level ||= 1;
800             # don't double decrement.
801 0 0       0 $jobs{$job}->[1]-- unless $todie{$pid};
802 0         0 $todie{$pid} = $level;
803             }
804              
805             # see if we should reduce the number of active children
806             sub job_needs_reduction {
807 0     0 0 0 my $job = shift;
808 0         0 my $q;
809              
810             # drop job_master-dependent workers if there is no job_master and no
811             # previously queued work
812 0 0 0     0 if (!$want_job_master && $needs_job_master{$job}
      0        
      0        
      0        
813             && $jobs{job_master}->[1] == 0 # check if job_master is really dead
814             && (($q = $pending_work{$job}) && !@$q || !$q)) {
815 0         0 return 1;
816             }
817              
818 0         0 return $jobs{$job}->[0] < $jobs{$job}->[1];
819             }
820              
821             sub is_child {
822 51     51 0 579 return $IsChild;
823             }
824              
825             sub wake_a {
826 0     0 0 0 my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)
827 0         0 my $child = MogileFS::ProcManager->is_child;
828 0 0       0 if ($child) {
829 0         0 $child->wake_a($class);
830             } else {
831 0         0 MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
832             }
833             }
834              
835             sub send_to_all_children {
836 47     47 0 891 my ($pkg, $msg, $exclude) = @_;
837 47         278 foreach my $child (values %child) {
838 0 0 0       next if $exclude && $child == $exclude;
839 0           $child->write($msg . "\r\n");
840             }
841             }
842              
843             sub send_monitor_has_run {
844 0     0 0   my $child = shift;
845             # Gas up other workers if monitor's completed for the first time.
846 0 0         if (! $monitor_good) {
847 0           MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
848 0           MogileFS::ProcManager->set_min_workers('delete' => MogileFS->config('delete_jobs'));
849 0           MogileFS::ProcManager->set_min_workers('replicate' => MogileFS->config('replicate_jobs'));
850 0           MogileFS::ProcManager->set_min_workers('reaper' => MogileFS->config('reaper_jobs'));
851 0           MogileFS::ProcManager->set_min_workers('fsck' => MogileFS->config('fsck_jobs'));
852              
853             # only one job_master at most
854 0           $want_job_master = !!MogileFS->config('job_master');
855 0           MogileFS::ProcManager->set_min_workers('job_master' => $want_job_master);
856              
857 0           $monitor_good = 1;
858 0           $allkidsup = 0;
859             }
860 0           for my $type (qw(queryworker)) {
861 0           MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
862             }
863             }
864              
865             1;
866              
867             # Local Variables:
868             # mode: perl
869             # c-basic-indent: 4
870             # indent-tabs-mode: nil
871             # End: