File Coverage

blib/lib/Gearman/Server/Client.pm
Criterion Covered Total %
statement 24 386 6.2
branch 0 96 0.0
condition 0 35 0.0
subroutine 8 51 15.6
pod 6 38 15.7
total 38 606 6.2


line stmt bran cond sub pod time code
1             package Gearman::Server::Client;
2 1     1   3 use version;
  1         1  
  1         5  
3             $Gearman::Server::Client::VERSION = qv("v1.130.1");
4              
5 1     1   52 use strict;
  1         1  
  1         18  
6 1     1   3 use warnings;
  1         1  
  1         19  
7              
8             =head1 NAME
9              
10             Gearman::Server::Client - client for gearmand
11              
12             =head1 NAME
13              
14             Used by L to instantiate connections from clients.
15             Clients speak either a binary protocol, for normal operation (calling
16             functions, grabbing function call requests, returning function values,
17             etc), or a text-based line protocol, for relatively rare
18             administrative / monitoring commands.
19              
20             The binary protocol commands aren't currently documented. (FIXME) But
21             they're well-implemented in L, L,
22             and L, if that's any consolation.
23              
24             The line-based administrative commands are documented below.
25              
26             =cut
27              
28 1     1   496 use Danga::Socket;
  1         12853  
  1         27  
29 1     1   4 use base 'Danga::Socket';
  1         2  
  1         152  
30             use fields (
31              
32             # { $job_name => $timeout } $timeout can be undef indicating no timeout
33 1         4 'can_do',
34             'can_do_list',
35             'can_do_iter',
36             'fast_read',
37             'fast_buffer',
38             'read_buf',
39              
40             # 0/1: they've said they're sleeping and we haven't woken them up
41             'sleeping',
42              
43             # Timer for job cancellation
44             'timer',
45              
46             # { $job_handle => Job }
47             'doing',
48              
49             # opaque string, no whitespace. workers give this so checker scripts
50             # can tell apart the same worker connected to multiple jobservers.
51             'client_id',
52              
53             # pointer up to client's server
54             'server',
55             'options',
56             'jobs_done_since_sleep',
57 1     1   5 );
  1         1  
58              
59             # 60k read buffer default, similar to perlbal's backend read.
60 1     1   71 use constant READ_SIZE => 60 * 1024;
  1         1  
  1         49  
61 1     1   3 use constant MAX_READ_SIZE => 512 * 1024;
  1         1  
  1         2795  
62              
63             # Class Method:
64             sub new {
65 0     0 1   my Gearman::Server::Client $self = shift;
66 0           my ($sock, $server) = @_;
67 0 0         $self = fields::new($self) unless ref $self;
68 0           $self->SUPER::new($sock);
69              
70             # Number of bytes to read as fast as we can (don't try to process them)
71 0           $self->{fast_read} = undef;
72              
73             # Array of buffers used during fast read operation
74 0           $self->{fast_buffer} = [];
75 0           $self->{read_buf} = '';
76 0           $self->{sleeping} = 0;
77 0           $self->{can_do} = {};
78              
79             # handle -> Job
80 0           $self->{doing} = {};
81 0           $self->{can_do_list} = [];
82              
83             # numeric iterator for where we start looking for jobs
84 0           $self->{can_do_iter} = 0;
85 0           $self->{client_id} = "-";
86 0           $self->{server} = $server;
87 0           $self->{options} = {};
88 0           $self->{jobs_done_since_sleep} = 0;
89              
90 0           return $self;
91             } ## end sub new
92              
93             sub option {
94 0     0 0   my Gearman::Server::Client $self = shift;
95 0           my $option = shift;
96              
97 0           return $self->{options}->{$option};
98             } ## end sub option
99              
100             sub close {
101 0     0 1   my Gearman::Server::Client $self = shift;
102              
103 0           my $doing = $self->{doing};
104              
105 0           while (my ($handle, $job) = each %$doing) {
106 0           my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
107 0           $job->relay_to_listeners($msg);
108 0           $job->note_finished(0);
109             }
110              
111             # Clear the doing list, since it may contain a set of jobs which contain
112             # references back to us.
113 0           %$doing = ();
114              
115             # Remove self from sleepers, otherwise it will be leaked if another worker
116             # for the job never connects.
117 0           my $sleepers = $self->{server}{sleepers};
118 0           my $sleepers_list = $self->{server}{sleepers_list};
119 0           for my $job (@{ $self->{can_do_list} }) {
  0            
120 0           my $sleeping = $sleepers->{$job};
121 0           delete $sleeping->{$self};
122              
123 0           my $new_sleepers_list;
124 0           for my $client (@{ $sleepers_list->{$job} }) {
  0            
125 0 0         next unless $client;
126 0 0         push @{$new_sleepers_list}, $client unless $sleeping->{$client};
  0            
127             }
128 0 0         if ($new_sleepers_list) {
129 0           $self->{server}{sleepers_list}->{$job} = $new_sleepers_list;
130             }
131             else {
132 0           delete $self->{server}{sleepers_list}->{$job};
133             }
134              
135 0 0         delete $sleepers->{$job} unless %$sleeping;
136             } ## end for my $job (@{ $self->...})
137              
138 0           $self->{server}->note_disconnected_client($self);
139              
140 0           $self->CMD_reset_abilities;
141              
142 0           $self->SUPER::close;
143             } ## end sub close
144              
145             # Client
146             sub event_read {
147 0     0 1   my Gearman::Server::Client $self = shift;
148              
149 0   0       my $read_size = $self->{fast_read} || READ_SIZE;
150 0           my $bref = $self->read($read_size);
151              
152             # Delay close till after buffers are written on EOF. If we are unable
153             # to write 'err' or 'hup' will be thrown and we'll close faster.
154 0 0   0     return $self->write(sub { $self->close }) unless defined $bref;
  0            
155              
156 0 0         if ($self->{fast_read}) {
157 0           push @{ $self->{fast_buffer} }, $$bref;
  0            
158 0           $self->{fast_read} -= length($$bref);
159              
160             # If fast_read is still positive, then we need to read more data
161 0 0         return if ($self->{fast_read} > 0);
162              
163             # Append the whole giant read buffer to our main read buffer
164 0           $self->{read_buf} .= join('', @{ $self->{fast_buffer} });
  0            
165              
166             # Reset the fast read state for next time.
167 0           $self->{fast_buffer} = [];
168 0           $self->{fast_read} = undef;
169             } ## end if ($self->{fast_read})
170             else {
171             # Exact read size length likely means we have more sitting on the
172             # socket. Buffer up to half a meg in one go.
173 0 0         if (length($$bref) == READ_SIZE) {
174 0           my $limit = int(MAX_READ_SIZE / READ_SIZE);
175 0           my @crefs = ($$bref);
176 0           while (my $cref = $self->read(READ_SIZE)) {
177 0           push(@crefs, $$cref);
178 0 0 0       last if (length($$cref) < READ_SIZE || $limit-- < 1);
179             }
180 0           $bref = \join('', @crefs);
181             } ## end if (length($$bref) == ...)
182 0           $self->{read_buf} .= $$bref;
183             } ## end else [ if ($self->{fast_read})]
184              
185 0           my $found_cmd;
186 0           do {
187 0           $found_cmd = 1;
188 0           my $blen = length($self->{read_buf});
189              
190 0 0         if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
    0          
191 0           my ($cmd, $len) = unpack("NN", $1);
192 0 0         if ($blen < $len + 12) {
193              
194             # Start a fast read loop to get all the data we need, less
195             # what we already have in the buffer.
196 0           $self->{fast_read} = $len + 12 - $blen;
197 0           return;
198             } ## end if ($blen < $len + 12)
199              
200 0           $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));
201              
202             # and slide down buf:
203 0           $self->{read_buf} = substr($self->{read_buf}, 12 + $len);
204              
205             } ## end if ($self->{read_buf} ...)
206             elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
207              
208             # ASCII command case (useful for telnetting in)
209 0           my $line = $1;
210 0           $self->process_line($line);
211             } ## end elsif ($self->{read_buf} ...)
212             else {
213 0           $found_cmd = 0;
214             }
215             } while ($found_cmd);
216             } ## end sub event_read
217              
218             sub event_write {
219 0     0 1   my $self = shift;
220 0           my $done = $self->write(undef);
221 0 0         $self->watch_write(0) if $done;
222             }
223              
224             # Line based command processor
225             sub process_line {
226 0     0 0   my Gearman::Server::Client $self = shift;
227 0           my $line = shift;
228              
229 0 0 0       if ($line && $line =~ /^(\w+)\s*(.*)/) {
230 0           my ($cmd, $args) = ($1, $2);
231 0           $cmd = lc($cmd);
232 0           my $code = $self->can("TXTCMD_$cmd");
233 0 0         if ($code) {
234 0           $code->($self, $args);
235 0           return;
236             }
237             } ## end if ($line && $line =~ ...)
238              
239 0           return $self->err_line('unknown_command');
240             } ## end sub process_line
241              
242             =head1 Binary Protocol Structure
243              
244             All binary protocol exchanges between clients (which can be callers,
245             workers, or both) and the Gearman server have common packet header:
246              
247             4 byte magic -- either "\0REQ" for requests to the server, or
248             "\0RES" for responses from the server
249             4 byte type -- network order integer, representing the packet type
250             4 byte length -- network order length, for data segment.
251             data -- optional, if length is non-zero
252              
253             =head1 Binary Protocol Commands
254              
255             =head2 echo_req (type=16)
256              
257             A debug command. The server will reply with the same data, in a echo_res (type=17) packet.
258              
259             =head2 (and many more...)
260              
261             FIXME: auto-generate protocol docs from internal Gearman::Util table,
262             once annotated with some English?
263              
264             =cut
265              
266             sub CMD_echo_req {
267 0     0 0   my Gearman::Server::Client $self = shift;
268 0           my $blobref = shift;
269              
270 0           return $self->res_packet("echo_res", $$blobref);
271             } ## end sub CMD_echo_req
272              
273             sub CMD_work_status {
274 0     0 0   my Gearman::Server::Client $self = shift;
275 0           my $ar = shift;
276 0           my ($handle, $nu, $de) = split(/\0/, $$ar);
277              
278 0           my $job = $self->{doing}{$handle};
279 0 0 0       return $self->error_packet("not_worker")
280             unless $job && $job->worker == $self;
281              
282 0           my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
283 0           $job->relay_to_listeners($msg);
284 0           $job->status([$nu, $de]);
285 0           return 1;
286             } ## end sub CMD_work_status
287              
288             sub CMD_work_complete {
289 0     0 0   my Gearman::Server::Client $self = shift;
290 0           my $ar = shift;
291              
292 0           $$ar =~ s/^(.+?)\0//;
293 0           my $handle = $1;
294              
295 0           my $job = delete $self->{doing}{$handle};
296 0 0 0       return $self->error_packet("not_worker")
297             unless $job && $job->worker == $self;
298              
299 0           my $msg = Gearman::Util::pack_res_command("work_complete",
300             join("\0", $handle, $$ar));
301 0           $job->relay_to_listeners($msg);
302 0           $job->note_finished(1);
303 0 0         if (my $timer = $self->{timer}) {
304 0           $timer->cancel;
305 0           $self->{timer} = undef;
306             }
307              
308 0           return 1;
309             } ## end sub CMD_work_complete
310              
311             sub CMD_work_fail {
312 0     0 0   my Gearman::Server::Client $self = shift;
313 0           my $ar = shift;
314 0           my $handle = $$ar;
315 0           my $job = delete $self->{doing}{$handle};
316 0 0 0       return $self->error_packet("not_worker")
317             unless $job && $job->worker == $self;
318              
319 0           my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
320 0           $job->relay_to_listeners($msg);
321 0           $job->note_finished(1);
322 0 0         if (my $timer = $self->{timer}) {
323 0           $timer->cancel;
324 0           $self->{timer} = undef;
325             }
326              
327 0           return 1;
328             } ## end sub CMD_work_fail
329              
330             sub CMD_work_exception {
331 0     0 0   my Gearman::Server::Client $self = shift;
332 0           my $ar = shift;
333              
334 0           $$ar =~ s/^(.+?)\0//;
335 0           my $handle = $1;
336 0           my $job = $self->{doing}{$handle};
337              
338 0 0 0       return $self->error_packet("not_worker")
339             unless $job && $job->worker == $self;
340              
341 0           my $msg = Gearman::Util::pack_res_command("work_exception",
342             join("\0", $handle, $$ar));
343 0           $job->relay_to_option_listeners($msg, "exceptions");
344              
345 0           return 1;
346             } ## end sub CMD_work_exception
347              
348             sub CMD_pre_sleep {
349 0     0 0   my Gearman::Server::Client $self = shift;
350 0           $self->{'sleeping'} = 1;
351 0           $self->{server}->on_client_sleep($self);
352 0           return 1;
353             } ## end sub CMD_pre_sleep
354              
355             sub CMD_grab_job {
356 0     0 0   my Gearman::Server::Client $self = shift;
357              
358 0           my $job;
359 0           my $can_do_size = scalar @{ $self->{can_do_list} };
  0            
360              
361 0 0         unless ($can_do_size) {
362 0           $self->res_packet("no_job");
363 0           return;
364             }
365              
366             # the offset where we start asking for jobs, to prevent starvation
367             # of some job types.
368 0           $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;
369              
370 0           my $tried = 0;
371 0           while ($tried < $can_do_size) {
372 0           my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
373 0           $tried++;
374 0           my $job_to_grab = $self->{can_do_list}->[$idx];
375 0 0         $job = $self->{server}->grab_job($job_to_grab)
376             or next;
377              
378 0           $job->worker($self);
379 0           $self->{doing}{ $job->handle } = $job;
380              
381 0           my $timeout = $self->{can_do}->{$job_to_grab};
382 0 0         if (defined $timeout) {
383             my $timer = Danga::Socket->AddTimer(
384             $timeout,
385             sub {
386 0 0   0     return $self->error_packet("not_worker")
387             unless $job->worker == $self;
388              
389 0           my $msg = Gearman::Util::pack_res_command("work_fail",
390             $job->handle);
391 0           $job->relay_to_listeners($msg);
392 0           $job->note_finished(1);
393 0           $job->clear_listeners;
394 0           $self->{timer} = undef;
395             }
396 0           );
397 0           $self->{timer} = $timer;
398             } ## end if (defined $timeout)
399             return $self->res_packet("job_assign",
400 0           join("\0", $job->handle, $job->func, ${ $job->argref },));
  0            
401             } ## end while ($tried < $can_do_size)
402              
403 0           $self->res_packet("no_job");
404             } ## end sub CMD_grab_job
405              
406             sub CMD_can_do {
407 0     0 0   my Gearman::Server::Client $self = shift;
408 0           my $ar = shift;
409              
410 0           $self->{can_do}->{$$ar} = undef;
411 0           $self->_setup_can_do_list;
412             } ## end sub CMD_can_do
413              
414             sub CMD_can_do_timeout {
415 0     0 0   my Gearman::Server::Client $self = shift;
416 0           my $ar = shift;
417              
418 0           my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/;
419              
420 0 0         if (defined $timeout) {
421 0           $self->{can_do}->{$task} = $timeout;
422             }
423             else {
424 0           $self->{can_do}->{$task} = undef;
425             }
426              
427 0           $self->_setup_can_do_list;
428             } ## end sub CMD_can_do_timeout
429              
430             sub CMD_option_req {
431 0     0 0   my Gearman::Server::Client $self = shift;
432 0           my $ar = shift;
433              
434             my $success = sub {
435 0     0     return $self->res_packet("option_res", $$ar);
436 0           };
437              
438 0 0         if ($$ar eq 'exceptions') {
439 0           $self->{options}->{exceptions} = 1;
440 0           return $success->();
441             }
442              
443 0           return $self->error_packet("unknown_option");
444             } ## end sub CMD_option_req
445              
446             sub CMD_set_client_id {
447 0     0 0   my Gearman::Server::Client $self = shift;
448 0           my $ar = shift;
449              
450 0           $self->{client_id} = $$ar;
451 0           $self->{client_id} =~ s/\s+//g;
452 0 0         $self->{client_id} = "-" unless length $self->{client_id};
453             } ## end sub CMD_set_client_id
454              
455             sub CMD_cant_do {
456 0     0 0   my Gearman::Server::Client $self = shift;
457 0           my $ar = shift;
458              
459 0           delete $self->{can_do}->{$$ar};
460 0           $self->_setup_can_do_list;
461             } ## end sub CMD_cant_do
462              
463             sub CMD_get_status {
464 0     0 0   my Gearman::Server::Client $self = shift;
465 0           my $ar = shift;
466 0           my $job = $self->{server}->job_by_handle($$ar);
467              
468             # handles can't contain nulls
469 0 0         return if $$ar =~ /\0/;
470              
471 0           my ($known, $running, $num, $den);
472 0           $known = 0;
473 0           $running = 0;
474 0 0         if ($job) {
475 0           $known = 1;
476 0 0         $running = $job->worker ? 1 : 0;
477 0 0         if (my $stat = $job->status) {
478 0           ($num, $den) = @$stat;
479             }
480             } ## end if ($job)
481              
482 0 0         $num = '' unless defined $num;
483 0 0         $den = '' unless defined $den;
484              
485 0           $self->res_packet("status_res",
486             join("\0", $$ar, $known, $running, $num, $den));
487             } ## end sub CMD_get_status
488              
489             sub CMD_reset_abilities {
490 0     0 0   my Gearman::Server::Client $self = shift;
491              
492 0           $self->{can_do} = {};
493 0           $self->_setup_can_do_list;
494             } ## end sub CMD_reset_abilities
495              
496             sub _setup_can_do_list {
497 0     0     my Gearman::Server::Client $self = shift;
498 0           $self->{can_do_list} = [keys %{ $self->{can_do} }];
  0            
499 0           $self->{can_do_iter} = 0;
500             }
501              
502 0     0 0   sub CMD_submit_job { push @_, 1; &_cmd_submit_job; }
  0            
503 0     0 0   sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; }
  0            
504 0     0 0   sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; }
  0            
505              
506             sub _cmd_submit_job {
507 0     0     my Gearman::Server::Client $self = shift;
508 0           my $ar = shift;
509 0           my $subscribe = shift;
510 0           my $high_pri = shift;
511              
512 0 0         return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
513             unless $$ar =~ s/^(.+?)\0(.*?)\0//;
514              
515 0           my ($func, $uniq) = ($1, $2);
516              
517 0           my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar,
518             $high_pri);
519              
520 0 0         if ($subscribe) {
521 0           $job->add_listener($self);
522             }
523             else {
524             # background mode
525 0           $job->require_listener(0);
526             }
527              
528 0           $self->res_packet("job_created", $job->handle);
529 0           $self->{server}->wake_up_sleepers($func);
530             } ## end sub _cmd_submit_job
531              
532             sub res_packet {
533 0     0 0   my Gearman::Server::Client $self = shift;
534 0           my ($code, $arg) = @_;
535 0           $self->write(Gearman::Util::pack_res_command($code, $arg));
536 0           return 1;
537             } ## end sub res_packet
538              
539             sub error_packet {
540 0     0 0   my Gearman::Server::Client $self = shift;
541 0           my ($code, $msg) = @_;
542 0           $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
543 0           return 0;
544             } ## end sub error_packet
545              
546             sub process_cmd {
547 0     0 0   my Gearman::Server::Client $self = shift;
548 0           my $cmd = shift;
549 0           my $blob = shift;
550              
551 0           my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
552 0           my $ret = eval { $self->$cmd_name(\$blob); };
  0            
553 0 0         return $ret unless $@;
554 0           warn "Error: $@\n";
555 0           return $self->error_packet("server_error", $@);
556             } ## end sub process_cmd
557              
558 0     0 1   sub event_err { my $self = shift; $self->close; }
  0            
559 0     0 1   sub event_hup { my $self = shift; $self->close; }
  0            
560              
561             ############################################################################
562              
563             =head1 Line based commands
564              
565             These commands are used for administrative or statistic tasks to be done on the gearman server. They can be entered using a line based client (telnet, etc.) by connecting to the listening port (7003) and are also intended to be machine parsable.
566              
567             =head2 "workers"
568              
569             Emits list of registered workers, their fds, IPs, client ids, and list of registered abilities (function names they can do). Of format:
570              
571             fd ip.x.y.z client_id : func_a func_b func_c
572             fd ip.x.y.z client_id : func_a func_b func_c
573             fd ip.x.y.z client_id : func_a func_b func_c
574             .
575              
576             It ends with a line with just a period.
577              
578             =cut
579              
580             sub TXTCMD_workers {
581 0     0 0   my Gearman::Server::Client $self = shift;
582              
583 0           foreach my $cl (sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients) {
  0            
584 0           my $fd = $cl->{fd};
585 0           $self->write("$fd "
586             . $cl->peer_ip_string
587 0           . " $cl->{client_id} : @{$cl->{can_do_list}}\n");
588              
589             } ## end foreach my $cl (sort { $a->...})
590 0           $self->write(".\n");
591             } ## end sub TXTCMD_workers
592              
593             =head2 "status"
594              
595             The output format of this function is tab separated columns as follows, followed by a line consisting of a fullstop and a newline (".\n") to indicate the end of output.
596              
597             =over
598              
599             =item Function name
600              
601             A string denoting the name of the function of the job
602              
603             =item Number in queue
604              
605             A positive integer indicating the total number of jobs for this function in the queue. This includes currently running ones as well (next column)
606              
607             =item Number of jobs running
608              
609             A positive integer showing how many jobs of this function are currently running
610              
611             =item Number of capable workers
612              
613             A positive integer denoting the maximum possible count of workers that could be doing this job. Though they may not all be working on it due to other tasks holding them busy.
614              
615             =back
616              
617             =cut
618              
619             sub TXTCMD_status {
620 0     0 0   my Gearman::Server::Client $self = shift;
621              
622 0           my %funcs; # func -> 1 (set of all funcs to display)
623              
624             # keep track of how many workers can do which functions
625             my %can;
626 0           foreach my $client ($self->{server}->clients) {
627 0           foreach my $func (@{ $client->{can_do_list} }) {
  0            
628 0           $can{$func}++;
629 0           $funcs{$func} = 1;
630             }
631             } ## end foreach my $client ($self->...)
632              
633 0           my %queued_funcs;
634             my %running_funcs;
635              
636 0           foreach my $job ($self->{server}->jobs) {
637 0           my $func = $job->func;
638 0           $queued_funcs{$func}++;
639 0 0         if ($job->worker) {
640 0           $running_funcs{$func}++;
641             }
642             } ## end foreach my $job ($self->{server...})
643              
644             # also include queued functions (even if there aren't workers)
645             # in our list of funcs to show.
646 0           $funcs{$_} = 1 foreach keys %queued_funcs;
647              
648 0           foreach my $func (sort keys %funcs) {
649 0   0       my $queued = $queued_funcs{$func} || 0;
650 0   0       my $running = $running_funcs{$func} || 0;
651 0   0       my $can = $can{$func} || 0;
652 0           $self->write("$func\t$queued\t$running\t$can\n");
653             } ## end foreach my $func (sort keys...)
654              
655 0           $self->write(".\n");
656             } ## end sub TXTCMD_status
657              
658             =head2 "jobs"
659              
660             Output format is zero or more lines of:
661              
662             [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n
663              
664             Follows by a single line of:
665              
666             .\n
667              
668             \t is a literal tab character
669             \n is perl's definition of newline (literal \n on linux, something else on win32)
670              
671             =cut
672              
673             sub TXTCMD_jobs {
674 0     0 0   my Gearman::Server::Client $self = shift;
675              
676 0           foreach my $job ($self->{server}->jobs) {
677 0           my $func = $job->func;
678 0           my $uniq = $job->uniq;
679 0           my $worker_addr = "-";
680              
681 0 0         if (my $worker = $job->worker) {
682 0           $worker_addr = $worker->peer_addr_string;
683             }
684              
685 0           my $listeners = $job->listeners;
686              
687 0           $self->write("$func\t$uniq\t$worker_addr\t$listeners\n");
688             } ## end foreach my $job ($self->{server...})
689              
690 0           $self->write(".\n");
691             } ## end sub TXTCMD_jobs
692              
693             =head2 "clients"
694              
695             Output format is zero or more sections of:
696              
697             =over
698              
699             One line of:
700              
701             [Client Address]\n
702              
703             Followed by zero or more lines of:
704              
705             \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n
706              
707             =back
708              
709             Follows by a single line of:
710              
711             .\n
712              
713             \t is a literal tab character
714             \n is perl's definition of newline (literal \n on linux, something else on win32)
715              
716             =cut
717              
718             sub TXTCMD_clients {
719 0     0 0   my Gearman::Server::Client $self = shift;
720              
721 0           my %jobs_by_client;
722              
723 0           foreach my $job ($self->{server}->jobs) {
724 0           foreach my $client ($job->listeners) {
725 0   0       my $ent = $jobs_by_client{$client} ||= [];
726 0           push @$ent, $job;
727             }
728             } ## end foreach my $job ($self->{server...})
729              
730 0           foreach my $client ($self->{server}->clients) {
731 0           my $client_addr = $client->peer_addr_string;
732 0           $self->write("$client_addr\n");
733 0   0       my $jobs = $jobs_by_client{$client} || [];
734              
735 0           foreach my $job (@$jobs) {
736 0           my $func = $job->func;
737 0           my $uniq = $job->uniq;
738 0           my $worker_addr = "-";
739              
740 0 0         if (my $worker = $job->worker) {
741 0           $worker_addr = $worker->peer_addr_string;
742             }
743 0           $self->write("\t$func\t$uniq\t$worker_addr\n");
744             } ## end foreach my $job (@$jobs)
745              
746             } ## end foreach my $client ($self->...)
747              
748 0           $self->write(".\n");
749             } ## end sub TXTCMD_clients
750              
751             sub TXTCMD_gladiator {
752 0     0 0   my Gearman::Server::Client $self = shift;
753 0   0       my $args = shift || "";
754 0           my $has_gladiator = eval "use Devel::Gladiator; use Devel::Peek; 1;";
755 0 0         if ($has_gladiator) {
756 0           my $all = Devel::Gladiator::walk_arena();
757 0           my %ct;
758 0           foreach my $it (@$all) {
759 0           $ct{ ref $it }++;
760 0 0         if (ref $it eq "CODE") {
761 0           my $name = Devel::Peek::CvGV($it);
762 0 0         $ct{$name}++ if $name =~ /ANON/;
763             }
764             } ## end foreach my $it (@$all)
765 0           $all = undef; # required to free memory
766 0           foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) {
  0            
767 0 0 0       next unless $ct{$n} > 1 || $args eq "all";
768 0           $self->write(sprintf("%7d $n\n", $ct{$n}));
769             }
770             } ## end if ($has_gladiator)
771 0           $self->write(".\n");
772             } ## end sub TXTCMD_gladiator
773              
774             =head2 "maxqueue" function [max_queue_size]
775              
776             For a given function of job, the maximum queue size is adjusted to be max_queue_size jobs long. A negative value indicates unlimited queue size.
777              
778             If the max_queue_size value is not supplied then it is unset (and the default maximum queue size will apply to this function).
779              
780             This function will return OK upon success, and will return ERR incomplete_args upon an invalid number of arguments.
781              
782             =cut
783              
784             sub TXTCMD_maxqueue {
785 0     0 0   my Gearman::Server::Client $self = shift;
786 0           my $args = shift;
787 0           my ($func, $max) = split /\s+/, $args;
788              
789 0 0         unless (length $func) {
790 0           return $self->err_line('incomplete_args');
791             }
792              
793 0           $self->{server}->set_max_queue($func, $max);
794 0           $self->write("OK\n");
795             } ## end sub TXTCMD_maxqueue
796              
797             =head2 "shutdown" ["graceful"]
798              
799             Close the server. Or "shutdown graceful" to close the listening socket, then close the server when traffic has died away.
800              
801             =cut
802              
803             sub TXTCMD_shutdown {
804 0     0 0   my Gearman::Server::Client $self = shift;
805 0           my $args = shift;
806 0 0         if ($args eq "graceful") {
    0          
807 0           $self->write("OK\n");
808 0           Gearmand::shutdown_graceful();
809             }
810             elsif (!$args) {
811 0           $self->write("OK\n");
812 0           exit 0;
813             }
814             else {
815 0           $self->err_line('unknown_args');
816             }
817             } ## end sub TXTCMD_shutdown
818              
819             =head2 "version"
820              
821             Returns server version.
822              
823             =cut
824              
825             sub TXTCMD_version {
826 0     0 0   my Gearman::Server::Client $self = shift;
827 0           $self->write("$Gearman::Server::VERSION\n");
828             }
829              
830             sub err_line {
831 0     0 0   my Gearman::Server::Client $self = shift;
832 0           my $err_code = shift;
833             my $err_text = {
834             'unknown_command# numeric iterator for where we start looking for jobl'
835             => "Unknown server command",
836             'unknown_args' => "Unknown arguments to server command",
837             'incomplete_args' =>
838             "An incomplete set of arguments was sent to this command",
839 0           }->{$err_code};
840              
841 0           $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
842 0           return 0;
843             } ## end sub err_line
844              
845             sub eurl {
846 0     0 0   my $a = $_[0];
847 0           $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
  0            
848 0           $a =~ tr/ /+/;
849 0           return $a;
850             } ## end sub eurl
851              
852             1;