File Coverage

blib/lib/Gearman/Server/Client.pm
Criterion Covered Total %
statement 44 382 11.5
branch 1 94 1.0
condition 0 37 0.0
subroutine 10 51 19.6
pod 8 37 21.6
total 63 601 10.4


line stmt bran cond sub pod time code
1             package Gearman::Server::Client;
2 3     3   13 use version ();
  3         4  
  3         117  
3             $Gearman::Server::Client::VERSION = version->declare("1.140_001");
4              
5 3     3   13 use strict;
  3         4  
  3         57  
6 3     3   11 use warnings;
  3         9  
  3         86  
7              
8             =head1 NAME
9              
10             Gearman::Server::Client - client for L<Gearman::Server> based on L<Danga::Socket>
11              
12             =head1 DESCRIPTION
13              
14             Used by L<Gearman::Server> to instantiate connections from clients.
15             Clients speak either a binary protocol, for normal operation (calling
16             functions, grabbing function call requests, returning function values,
17             etc), or a text-based line protocol, for relatively rare
18             administrative / monitoring commands.
19              
20             The binary protocol commands aren't currently documented. (FIXME) But
21             they're well-implemented in L<Gearman::Client>, L<Gearman::Worker>,
22             and L<Gearman::Server>, if that's any consolation.
23              
24             The line-based administrative commands are documented below.
25              
26             =head1 METHODS
27              
28             =cut
29              
30 3     3   1474 use Gearman::Util;
  3         14107  
  3         109  
31 3     3   1874 use Danga::Socket;
  3         45369  
  3         157  
32 3     3   31 use base 'Danga::Socket';
  3         7  
  3         750  
33             use fields (
34              
35             # { $job_name => $timeout } $timeout can be undef indicating no timeout
36 3         23 'can_do',
37             'can_do_list',
38             'can_do_iter',
39             'fast_read',
40             'fast_buffer',
41             'read_buf',
42              
43             # 0/1: they've said they're sleeping and we haven't woken them up
44             'sleeping',
45              
46             # Timer for job cancellation
47             'timer',
48              
49             # { $job_handle => Job }
50             'doing',
51              
52             # opaque string, no whitespace. workers give this so checker scripts
53             # can tell apart the same worker connected to multiple jobservers.
54             'client_id',
55              
56             # pointer up to client's server
57             'server',
58             'options',
59             'jobs_done_since_sleep',
60 3     3   26 );
  3         7  
61              
62             # 60k read buffer default, similar to perlbal's backend read.
63 3     3   411 use constant READ_SIZE => 60 * 1024;
  3         6  
  3         285  
64 3     3   18 use constant MAX_READ_SIZE => 512 * 1024;
  3         7  
  3         17182  
65              
66             # Class Method:
67             sub new {
68 2     2 1 4537 my Gearman::Server::Client $self = shift;
69 2         5 my ($sock, $server) = @_;
70 2 50       20 $self = fields::new($self) unless ref $self;
71 2         510 $self->SUPER::new($sock);
72              
73             # Number of bytes to read as fast as we can (don't try to process them)
74 2         89 $self->{fast_read} = undef;
75              
76             # Array of buffers used during fast read operation
77 2         9 $self->{fast_buffer} = [];
78 2         6 $self->{read_buf} = '';
79 2         5 $self->{sleeping} = 0;
80 2         7 $self->{can_do} = {};
81              
82             # handle -> Job
83 2         6 $self->{doing} = {};
84 2         6 $self->{can_do_list} = [];
85              
86             # numeric iterator for where we start looking for jobs
87 2         8 $self->{can_do_iter} = 0;
88 2         7 $self->{client_id} = "-";
89 2         6 $self->{server} = $server;
90 2         6 $self->{options} = {};
91 2         7 $self->{jobs_done_since_sleep} = 0;
92              
93 2         7 return $self;
94             } ## end sub new
95              
96             =head2 option($option)
97              
98             =cut
99              
100             sub option {
101 0     0 1   my Gearman::Server::Client $self = shift;
102 0           my $option = shift;
103              
104 0           return $self->{options}->{$option};
105             } ## end sub option
106              
107             =head2 close()
108              
109             =cut
110              
111             sub close {
112 0     0 1   my Gearman::Server::Client $self = shift;
113              
114 0           my $doing = $self->{doing};
115              
116 0           while (my ($handle, $job) = each %$doing) {
117 0           my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
118 0           $job->relay_to_listeners($msg);
119 0           $job->note_finished(0);
120             }
121              
122             # Clear the doing list, since it may contain a set of jobs which contain
123             # references back to us.
124 0           %$doing = ();
125              
126             # Remove self from sleepers, otherwise it will be leaked if another worker
127             # for the job never connects.
128 0           my $sleepers = $self->{server}{sleepers};
129 0           my $sleepers_list = $self->{server}{sleepers_list};
130 0           for my $job (@{ $self->{can_do_list} }) {
  0            
131 0           my $sleeping = $sleepers->{$job};
132 0           delete $sleeping->{$self};
133              
134 0           my $new_sleepers_list;
135 0           for my $client (@{ $sleepers_list->{$job} }) {
  0            
136 0 0         next unless $client;
137 0 0         push @{$new_sleepers_list}, $client unless $sleeping->{$client};
  0            
138             }
139 0 0         if ($new_sleepers_list) {
140 0           $self->{server}{sleepers_list}->{$job} = $new_sleepers_list;
141             }
142             else {
143 0           delete $self->{server}{sleepers_list}->{$job};
144             }
145              
146 0 0         delete $sleepers->{$job} unless %$sleeping;
147             } ## end for my $job (@{ $self->...})
148              
149 0           $self->{server}->note_disconnected_client($self);
150              
151 0           $self->CMD_reset_abilities;
152              
153 0           $self->SUPER::close;
154             } ## end sub close
155              
156             =head2 event_read()
157              
158             read from socket
159              
160             =cut
161              
162             sub event_read {
163 0     0 1   my Gearman::Server::Client $self = shift;
164              
165 0   0       my $read_size = $self->{fast_read} || READ_SIZE;
166 0           my $bref = $self->read($read_size);
167              
168             # Delay close till after buffers are written on EOF. If we are unable
169             # to write 'err' or 'hup' will be thrown and we'll close faster.
170 0 0   0     return $self->write(sub { $self->close }) unless defined $bref;
  0            
171              
172 0 0         if ($self->{fast_read}) {
173 0           push @{ $self->{fast_buffer} }, $$bref;
  0            
174 0           $self->{fast_read} -= length($$bref);
175              
176             # If fast_read is still positive, then we need to read more data
177 0 0         return if ($self->{fast_read} > 0);
178              
179             # Append the whole giant read buffer to our main read buffer
180 0           $self->{read_buf} .= join('', @{ $self->{fast_buffer} });
  0            
181              
182             # Reset the fast read state for next time.
183 0           $self->{fast_buffer} = [];
184 0           $self->{fast_read} = undef;
185             } ## end if ($self->{fast_read})
186             else {
187             # Exact read size length likely means we have more sitting on the
188             # socket. Buffer up to half a meg in one go.
189 0 0         if (length($$bref) == READ_SIZE) {
190 0           my $limit = int(MAX_READ_SIZE / READ_SIZE);
191 0           my @crefs = ($$bref);
192 0           while (my $cref = $self->read(READ_SIZE)) {
193 0           push(@crefs, $$cref);
194 0 0 0       last if (length($$cref) < READ_SIZE || $limit-- < 1);
195             }
196 0           $bref = \join('', @crefs);
197             } ## end if (length($$bref) == ...)
198 0           $self->{read_buf} .= $$bref;
199             } ## end else [ if ($self->{fast_read})]
200              
201 0           my $found_cmd;
202 0           do {
203 0           $found_cmd = 1;
204 0           my $blen = length($self->{read_buf});
205              
206 0 0         if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
    0          
207 0           my ($cmd, $len) = unpack("NN", $1);
208 0 0         if ($blen < $len + 12) {
209              
210             # Start a fast read loop to get all the data we need, less
211             # what we already have in the buffer.
212 0           $self->{fast_read} = $len + 12 - $blen;
213 0           return;
214             } ## end if ($blen < $len + 12)
215              
216 0           $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));
217              
218             # and slide down buf:
219 0           $self->{read_buf} = substr($self->{read_buf}, 12 + $len);
220              
221             } ## end if ($self->{read_buf} ...)
222             elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
223              
224             # ASCII command case (useful for telnetting in)
225 0           my $line = $1;
226 0           $self->process_line($line);
227             } ## end elsif ($self->{read_buf} ...)
228             else {
229 0           $found_cmd = 0;
230             }
231             } while ($found_cmd);
232             } ## end sub event_read
233              
234             =head2 event_write()
235              
236             =cut
237              
238             sub event_write {
239 0     0 1   my $self = shift;
240 0           my $done = $self->write(undef);
241 0 0         $self->watch_write(0) if $done;
242             }
243              
244             =head2 process_line($line)
245              
246             Line based command processor
247              
248             =cut
249              
250             sub process_line {
251 0     0 1   my Gearman::Server::Client $self = shift;
252 0           my $line = shift;
253              
254 0 0 0       if ($line && $line =~ /^(\w+)\s*(.*)/) {
255 0           my ($cmd, $args) = ($1, $2);
256 0           $cmd = lc($cmd);
257 0           my $code = $self->can("TXTCMD_$cmd");
258 0 0         if ($code) {
259 0           $code->($self, $args);
260 0           return;
261             }
262             } ## end if ($line && $line =~ ...)
263              
264 0           return $self->err_line('unknown_command');
265             } ## end sub process_line
266              
267             =head1 Binary Protocol Structure
268              
269             All binary protocol exchanges between clients (which can be callers,
270             workers, or both) and the Gearman server have common packet header:
271              
272             4 byte magic -- either "\0REQ" for requests to the server, or
273             "\0RES" for responses from the server
274             4 byte type -- network order integer, representing the packet type
275             4 byte length -- network order length, for data segment.
276             data -- optional, if length is non-zero
277              
278             =head1 Binary Protocol Commands
279              
280             =head2 echo_req (type=16)
281              
282             A debug command. The server will reply with the same data, in an echo_res (type=17) packet.
283              
284             =head2 (and many more...)
285              
286             FIXME: auto-generate protocol docs from internal Gearman::Util table,
287             once annotated with some English?
288              
289             =cut
290              
291             sub CMD_echo_req {
292 0     0 0   my Gearman::Server::Client $self = shift;
293 0           my $blobref = shift;
294              
295 0           return $self->res_packet("echo_res", $$blobref);
296             } ## end sub CMD_echo_req
297              
298             sub CMD_work_status {
299 0     0 0   my Gearman::Server::Client $self = shift;
300 0           my $ar = shift;
301 0           my ($handle, $nu, $de) = split(/\0/, $$ar);
302              
303 0           my $job = $self->{doing}{$handle};
304 0 0 0       return $self->error_packet("not_worker")
305             unless $job && $job->worker == $self;
306              
307 0           my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
308 0           $job->relay_to_listeners($msg);
309 0           $job->status([$nu, $de]);
310 0           return 1;
311             } ## end sub CMD_work_status
312              
313             sub CMD_work_complete {
314 0     0 0   my Gearman::Server::Client $self = shift;
315 0           my $ar = shift;
316              
317 0           $$ar =~ s/^(.+?)\0//;
318 0           my $handle = $1;
319              
320 0           my $job = delete $self->{doing}{$handle};
321 0 0 0       return $self->error_packet("not_worker")
322             unless $job && $job->worker == $self;
323              
324 0           my $msg = Gearman::Util::pack_res_command("work_complete",
325             join("\0", $handle, $$ar));
326 0           $job->relay_to_listeners($msg);
327 0           $job->note_finished(1);
328 0 0         if (my $timer = $self->{timer}) {
329 0           $timer->cancel;
330 0           $self->{timer} = undef;
331             }
332              
333 0           return 1;
334             } ## end sub CMD_work_complete
335              
336             sub CMD_work_fail {
337 0     0 0   my Gearman::Server::Client $self = shift;
338 0           my $ar = shift;
339 0           my $handle = $$ar;
340 0           my $job = delete $self->{doing}{$handle};
341 0 0 0       return $self->error_packet("not_worker")
342             unless $job && $job->worker == $self;
343              
344 0           my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
345 0           $job->relay_to_listeners($msg);
346 0           $job->note_finished(1);
347 0 0         if (my $timer = $self->{timer}) {
348 0           $timer->cancel;
349 0           $self->{timer} = undef;
350             }
351              
352 0           return 1;
353             } ## end sub CMD_work_fail
354              
355             sub CMD_work_exception {
356 0     0 0   my Gearman::Server::Client $self = shift;
357 0           my $ar = shift;
358              
359 0           $$ar =~ s/^(.+?)\0//;
360 0           my $handle = $1;
361 0           my $job = $self->{doing}{$handle};
362              
363 0 0 0       return $self->error_packet("not_worker")
364             unless $job && $job->worker == $self;
365              
366 0           my $msg = Gearman::Util::pack_res_command("work_exception",
367             join("\0", $handle, $$ar));
368 0           $job->relay_to_option_listeners($msg, "exceptions");
369              
370 0           return 1;
371             } ## end sub CMD_work_exception
372              
373             sub CMD_pre_sleep {
374 0     0 0   my Gearman::Server::Client $self = shift;
375 0           $self->{'sleeping'} = 1;
376 0           $self->{server}->on_client_sleep($self);
377 0           return 1;
378             } ## end sub CMD_pre_sleep
379              
380             sub CMD_grab_job {
381 0     0 0   my Gearman::Server::Client $self = shift;
382              
383 0           my $job;
384 0           my $can_do_size = scalar @{ $self->{can_do_list} };
  0            
385              
386 0 0         unless ($can_do_size) {
387 0           $self->res_packet("no_job");
388 0           return;
389             }
390              
391             # the offset where we start asking for jobs, to prevent starvation
392             # of some job types.
393 0           $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;
394              
395 0           my $tried = 0;
396 0           while ($tried < $can_do_size) {
397 0           my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
398 0           $tried++;
399 0           my $job_to_grab = $self->{can_do_list}->[$idx];
400 0 0         $job = $self->{server}->grab_job($job_to_grab)
401             or next;
402              
403 0           $job->worker($self);
404 0           $self->{doing}{ $job->handle } = $job;
405              
406 0           my $timeout = $self->{can_do}->{$job_to_grab};
407 0 0         if (defined $timeout) {
408             my $timer = Danga::Socket->AddTimer(
409             $timeout,
410             sub {
411 0 0   0     return $self->error_packet("not_worker")
412             unless $job->worker == $self;
413              
414 0           my $msg = Gearman::Util::pack_res_command("work_fail",
415             $job->handle);
416 0           $job->relay_to_listeners($msg);
417 0           $job->note_finished(1);
418 0           $job->clear_listeners;
419 0           $self->{timer} = undef;
420             }
421 0           );
422 0           $self->{timer} = $timer;
423             } ## end if (defined $timeout)
424             return $self->res_packet("job_assign",
425 0           join("\0", $job->handle, $job->func, ${ $job->argref },));
  0            
426             } ## end while ($tried < $can_do_size)
427              
428 0           $self->res_packet("no_job");
429             } ## end sub CMD_grab_job
430              
431             sub CMD_can_do {
432 0     0 0   my Gearman::Server::Client $self = shift;
433 0           my $ar = shift;
434              
435 0           $self->{can_do}->{$$ar} = undef;
436 0           $self->_setup_can_do_list;
437             } ## end sub CMD_can_do
438              
439             sub CMD_can_do_timeout {
440 0     0 0   my Gearman::Server::Client $self = shift;
441 0           my $ar = shift;
442              
443 0           my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/;
444              
445 0 0         if (defined $timeout) {
446 0           $self->{can_do}->{$task} = $timeout;
447             }
448             else {
449 0           $self->{can_do}->{$task} = undef;
450             }
451              
452 0           $self->_setup_can_do_list;
453             } ## end sub CMD_can_do_timeout
454              
455             sub CMD_option_req {
456 0     0 0   my Gearman::Server::Client $self = shift;
457 0           my $ar = shift;
458              
459             my $success = sub {
460 0     0     return $self->res_packet("option_res", $$ar);
461 0           };
462              
463 0 0         if ($$ar eq 'exceptions') {
464 0           $self->{options}->{exceptions} = 1;
465 0           return $success->();
466             }
467              
468 0           return $self->error_packet("unknown_option");
469             } ## end sub CMD_option_req
470              
471             sub CMD_set_client_id {
472 0     0 0   my Gearman::Server::Client $self = shift;
473 0           my $ar = shift;
474              
475 0           $self->{client_id} = $$ar;
476 0           $self->{client_id} =~ s/\s+//g;
477 0 0         $self->{client_id} = "-" unless length $self->{client_id};
478             } ## end sub CMD_set_client_id
479              
480             sub CMD_cant_do {
481 0     0 0   my Gearman::Server::Client $self = shift;
482 0           my $ar = shift;
483              
484 0           delete $self->{can_do}->{$$ar};
485 0           $self->_setup_can_do_list;
486             } ## end sub CMD_cant_do
487              
488             sub CMD_get_status {
489 0     0 0   my Gearman::Server::Client $self = shift;
490 0           my $ar = shift;
491 0           my $job = $self->{server}->job_by_handle($$ar);
492              
493             # handles can't contain nulls
494 0 0         return if $$ar =~ /\0/;
495              
496 0           my ($known, $running, $num, $den);
497 0           $known = 0;
498 0           $running = 0;
499 0 0         if ($job) {
500 0           $known = 1;
501 0 0         $running = $job->worker ? 1 : 0;
502 0 0         if (my $stat = $job->status) {
503 0           ($num, $den) = @$stat;
504             }
505             } ## end if ($job)
506              
507 0 0         $num = '' unless defined $num;
508 0 0         $den = '' unless defined $den;
509              
510 0           $self->res_packet("status_res",
511             join("\0", $$ar, $known, $running, $num, $den));
512             } ## end sub CMD_get_status
513              
514             sub CMD_reset_abilities {
515 0     0 0   my Gearman::Server::Client $self = shift;
516              
517 0           $self->{can_do} = {};
518 0           $self->_setup_can_do_list;
519             } ## end sub CMD_reset_abilities
520              
521             sub _setup_can_do_list {
522 0     0     my Gearman::Server::Client $self = shift;
523 0           $self->{can_do_list} = [keys %{ $self->{can_do} }];
  0            
524 0           $self->{can_do_iter} = 0;
525             }
526              
527 0     0 0   sub CMD_submit_job { push @_, 1; &_cmd_submit_job; }
  0            
528 0     0 0   sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; }
  0            
529 0     0 0   sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; }
  0            
530              
531             sub _cmd_submit_job {
532 0     0     my Gearman::Server::Client $self = shift;
533 0           my $ar = shift;
534 0           my $subscribe = shift;
535 0           my $high_pri = shift;
536              
537 0 0         return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
538             unless $$ar =~ s/^(.+?)\0(.*?)\0//;
539              
540 0           my ($func, $uniq) = ($1, $2);
541              
542 0           my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar,
543             $high_pri);
544              
545 0 0         if ($subscribe) {
546 0           $job->add_listener($self);
547             }
548             else {
549             # background mode
550 0           $job->require_listener(0);
551             }
552              
553 0           $self->res_packet("job_created", $job->handle);
554 0           $self->{server}->wake_up_sleepers($func);
555             } ## end sub _cmd_submit_job
556              
557             sub res_packet {
558 0     0 0   my Gearman::Server::Client $self = shift;
559 0           my ($code, $arg) = @_;
560 0           $self->write(Gearman::Util::pack_res_command($code, $arg));
561 0           return 1;
562             } ## end sub res_packet
563              
564             sub error_packet {
565 0     0 0   my Gearman::Server::Client $self = shift;
566 0           my ($code, $msg) = @_;
567 0           $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
568 0           return 0;
569             } ## end sub error_packet
570              
571             sub process_cmd {
572 0     0 0   my Gearman::Server::Client $self = shift;
573 0           my $cmd = shift;
574 0           my $blob = shift;
575              
576 0           my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
577 0           my $ret = eval { $self->$cmd_name(\$blob); };
  0            
578 0 0         return $ret unless $@;
579 0           warn "Error: $@\n";
580 0           return $self->error_packet("server_error", $@);
581             } ## end sub process_cmd
582              
583 0     0 1   sub event_err { my $self = shift; $self->close; }
  0            
584 0     0 1   sub event_hup { my $self = shift; $self->close; }
  0            
585              
586             ############################################################################
587              
588             =head1 Line based commands
589              
590             These commands are used for administrative or statistic tasks to be done on the
591             gearman server. They can be entered using a line based client (telnet, etc.) by
592             connecting to the listening port (4730) and are also intended to be machine
593             parsable.
594              
595             =head2 "workers"
596              
597             Emits list of registered workers, their fds, IPs, client ids, and list of
598             registered abilities (function names they can do). Of format:
599              
600             fd ip.x.y.z client_id : func_a func_b func_c
601             fd ip.x.y.z client_id : func_a func_b func_c
602             fd ip.x.y.z client_id : func_a func_b func_c
603             .
604              
605             It ends with a line with just a period.
606              
607             =cut
608              
609             sub TXTCMD_workers {
610 0     0 0   my Gearman::Server::Client $self = shift;
611              
612 0           foreach my $cl (sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients) {
  0            
613 0           my $fd = $cl->{fd};
614 0           $self->write("$fd "
615             . $cl->peer_ip_string
616 0           . " $cl->{client_id} : @{$cl->{can_do_list}}\n");
617              
618             } ## end foreach my $cl (sort { $a->...})
619 0           $self->write(".\n");
620             } ## end sub TXTCMD_workers
621              
622             =head2 "status"
623              
624             The output format of this function is tab separated columns as follows,
625             followed by a line consisting of a fullstop and a newline (".\n") to indicate
626             the end of output.
627              
628             =over
629              
630             =item Function name
631              
632             A string denoting the name of the function of the job
633              
634             =item Number in queue
635              
636             A positive integer indicating the total number of jobs for this function in the
637             queue. This includes currently running ones as well (next column)
638              
639             =item Number of jobs running
640              
641             A positive integer showing how many jobs of this function are currently running
642              
643             =item Number of capable workers
644              
645             A positive integer denoting the maximum possible count of workers that could be
646             doing this job. Though they may not all be working on it due to other tasks
647             holding them busy.
648              
649             =back
650              
651             =cut
652              
653             sub TXTCMD_status {
654 0     0 0   my Gearman::Server::Client $self = shift;
655              
656 0           my %funcs; # func -> 1 (set of all funcs to display)
657              
658             # keep track of how many workers can do which functions
659             my %can;
660 0           foreach my $client ($self->{server}->clients) {
661 0           foreach my $func (@{ $client->{can_do_list} }) {
  0            
662 0           $can{$func}++;
663 0           $funcs{$func} = 1;
664             }
665             } ## end foreach my $client ($self->...)
666              
667 0           my %queued_funcs;
668             my %running_funcs;
669              
670 0           foreach my $job ($self->{server}->jobs) {
671 0           my $func = $job->func;
672 0           $queued_funcs{$func}++;
673 0 0         if ($job->worker) {
674 0           $running_funcs{$func}++;
675             }
676             } ## end foreach my $job ($self->{server...})
677              
678             # also include queued functions (even if there aren't workers)
679             # in our list of funcs to show.
680 0           $funcs{$_} = 1 foreach keys %queued_funcs;
681              
682 0           foreach my $func (sort keys %funcs) {
683 0   0       my $queued = $queued_funcs{$func} || 0;
684 0   0       my $running = $running_funcs{$func} || 0;
685 0   0       my $can = $can{$func} || 0;
686 0           $self->write("$func\t$queued\t$running\t$can\n");
687             } ## end foreach my $func (sort keys...)
688              
689 0           $self->write(".\n");
690             } ## end sub TXTCMD_status
691              
692             =head2 "jobs"
693              
694             Output format is zero or more lines of:
695              
696             [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n
697              
698             Followed by a single line of:
699              
700             .\n
701              
702             \t is a literal tab character
703             \n is perl's definition of newline (literal \n on linux, something else on win32)
704              
705             =cut
706              
707             sub TXTCMD_jobs {
708 0     0 0   my Gearman::Server::Client $self = shift;
709              
710 0           foreach my $job ($self->{server}->jobs) {
711 0           my $func = $job->func;
712 0           my $uniq = $job->uniq;
713 0           my $worker_addr = "-";
714              
715 0 0         if (my $worker = $job->worker) {
716 0           $worker_addr = $worker->peer_addr_string;
717             }
718              
719 0           my $listeners = $job->listeners;
720              
721 0           $self->write("$func\t$uniq\t$worker_addr\t$listeners\n");
722             } ## end foreach my $job ($self->{server...})
723              
724 0           $self->write(".\n");
725             } ## end sub TXTCMD_jobs
726              
727             =head2 "clients"
728              
729             Output format is zero or more sections of:
730              
731             =over
732              
733             One line of:
734              
735             [Client Address]\n
736              
737             Followed by zero or more lines of:
738              
739             \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n
740              
741             =back
742              
743             Followed by a single line of:
744              
745             .\n
746              
747             \t is a literal tab character
748             \n is perl's definition of newline (literal \n on linux, something else on win32)
749              
750             =cut
751              
752             sub TXTCMD_clients {
753 0     0 0   my Gearman::Server::Client $self = shift;
754              
755 0           my %jobs_by_client;
756              
757 0           foreach my $job ($self->{server}->jobs) {
758 0           foreach my $client ($job->listeners) {
759 0   0       my $ent = $jobs_by_client{$client} ||= [];
760 0           push @$ent, $job;
761             }
762             } ## end foreach my $job ($self->{server...})
763              
764 0           foreach my $client ($self->{server}->clients) {
765 0           my $client_addr = $client->peer_addr_string;
766 0           $self->write("$client_addr\n");
767 0   0       my $jobs = $jobs_by_client{$client} || [];
768              
769 0           foreach my $job (@$jobs) {
770 0           my $func = $job->func;
771 0           my $uniq = $job->uniq;
772 0           my $worker_addr = "-";
773              
774 0 0         if (my $worker = $job->worker) {
775 0           $worker_addr = $worker->peer_addr_string;
776             }
777 0           $self->write("\t$func\t$uniq\t$worker_addr\n");
778             } ## end foreach my $job (@$jobs)
779              
780             } ## end foreach my $client ($self->...)
781              
782 0           $self->write(".\n");
783             } ## end sub TXTCMD_clients
784              
785             sub TXTCMD_gladiator {
786 0     0 0   my Gearman::Server::Client $self = shift;
787 0   0       my $args = shift || "";
788 0           my $has_gladiator = eval "use Devel::Gladiator; use Devel::Peek; 1;";
789 0 0         if ($has_gladiator) {
790 0           my $all = Devel::Gladiator::walk_arena();
791 0           my %ct;
792 0           foreach my $it (@$all) {
793 0           $ct{ ref $it }++;
794 0 0         if (ref $it eq "CODE") {
795 0           my $name = Devel::Peek::CvGV($it);
796 0 0         $ct{$name}++ if $name =~ /ANON/;
797             }
798             } ## end foreach my $it (@$all)
799 0           $all = undef; # required to free memory
800 0           foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) {
  0            
801 0 0 0       next unless $ct{$n} > 1 || $args eq "all";
802 0           $self->write(sprintf("%7d $n\n", $ct{$n}));
803             }
804             } ## end if ($has_gladiator)
805 0           $self->write(".\n");
806             } ## end sub TXTCMD_gladiator
807              
808             =head2 "maxqueue" function [max_queue_size]
809              
810             For a given function of job, the maximum queue size is adjusted to be
811             max_queue_size jobs long. A negative value indicates unlimited queue size.
812              
813             If the max_queue_size value is not supplied then it is unset (and the default
814             maximum queue size will apply to this function).
815              
816             This function will return OK upon success, and will return ERR incomplete_args
817             upon an invalid number of arguments.
818              
819             =cut
820              
821             sub TXTCMD_maxqueue {
822 0     0 0   my Gearman::Server::Client $self = shift;
823 0           my $args = shift;
824 0           my ($func, $max) = split /\s+/, $args;
825              
826 0 0         unless (length $func) {
827 0           return $self->err_line('incomplete_args');
828             }
829              
830 0           $self->{server}->set_max_queue($func, $max);
831 0           $self->write("OK\n");
832             } ## end sub TXTCMD_maxqueue
833              
834             =head2 "version"
835              
836             Returns server version.
837              
838             =cut
839              
840             sub TXTCMD_version {
841 0     0 0   my Gearman::Server::Client $self = shift;
842 0           $self->write("$Gearman::Server::VERSION\n");
843             }
844              
845             sub err_line {
846 0     0 0   my Gearman::Server::Client $self = shift;
847 0           my $err_code = shift;
848 0           my %err_text = (
849              
850             # human-readable description for each known error code
851             unknown_command => "Unknown server command",
852             unknown_args => "Unknown arguments to server command",
853             incomplete_args =>
854             "An incomplete set of arguments was sent to this command",
855             );
856              
857             $self->write(
858             join '',
859             "ERR $err_code ",
860 0   0       eurl($err_text{$err_code}) || '', "\r\n"
861             );
862 0           return 0;
863             } ## end sub err_line
864              
865             sub eurl {
866 0     0 0   my $a = $_[0];
867 0 0         $a || return;
868 0           $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
  0            
869 0           $a =~ tr/ /+/;
870 0           return $a;
871             } ## end sub eurl
872              
873             1;