File Coverage

blib/lib/Gearman/Worker.pm
Criterion Covered Total %
statement 100 272 36.7
branch 25 124 20.1
condition 2 11 18.1
subroutine 20 32 62.5
pod 11 11 100.0
total 158 450 35.1


line stmt bran cond sub pod time code
1             package Gearman::Worker;
2 11     11   669643 use version;
  11         14365  
  11         59  
3             $Gearman::Worker::VERSION = version->declare("2.003_002");
4              
5 11     11   761 use strict;
  11         13  
  11         177  
6 11     11   39 use warnings;
  11         15  
  11         247  
7              
8 11     11   35 use base "Gearman::Objects";
  11         12  
  11         3995  
9              
10             =head1 NAME
11              
12             Gearman::Worker - Worker for gearman distributed job system
13              
14             =head1 SYNOPSIS
15              
16             use Gearman::Worker;
17             my $worker = Gearman::Worker->new;
18             $worker->job_servers(
19             '127.0.0.1',
20             {
21             ca_certs => ...,
22             cert_file => ...,
23             host => '10.0.0.1',
24             key_file => ...,
25             port => 4733,
26             socket_cb => sub {...},
27             use_ssl => 1,
28             }
29             );
30             $worker->register_function($funcname => $subref);
31             $worker->work while 1;
32              
33             =head1 DESCRIPTION
34              
35             I<Gearman::Worker> is a worker class for the Gearman distributed job system,
36             providing a framework for receiving and serving jobs from a Gearman server.
37              
38             Callers instantiate a I<Gearman::Worker> object, register a list of functions
39             and capabilities that they can handle, then enter an event loop, waiting
40             for the server to send jobs.
41              
42             The worker can send a return value back to the server, which then gets
43             sent back to the client that requested the job; or it can simply execute
44             silently.
45              
46             =head1 USAGE
47              
48             =head2 Gearman::Worker->new(%options)
49              
50             Creates a new I<Gearman::Worker> object, and returns the object.
51              
52             If I<%options> is provided, initializes the new worker object with the
53             settings in I<%options>, which can contain:
54              
55             =over 4
56              
57             =item * job_servers
58              
59             Calls I<job_servers> (see below) to initialize the list of job
60             servers. It will be ignored if this worker is running as a child
61             process of a gearman server.
62              
63             =item * prefix
64              
65             Calls I<prefix> (see below) to set the prefix / namespace.
66              
67             =back
68              
69             =head2 $client-E<gt>prefix($prefix)
70              
71             Sets the namespace / prefix for the function names. This is useful
72             for sharing job servers between different applications or different
73             instances of the same application (different development sandboxes for
74             example).
75              
76             The namespace is currently implemented as a simple tab separated
77             concatenation of the prefix and the function name.
78              
79             =head1 EXAMPLES
80              
81             =head2 Summation
82              
83             This is an example worker that receives a request to sum up a list of
84             integers.
85              
86             use Gearman::Worker;
87             use Storable qw( thaw );
88             use List::Util qw( sum );
89             my $worker = Gearman::Worker->new;
90             $worker->job_servers('127.0.0.1');
91             $worker->register_function(sum => sub { sum @{ thaw($_[0]->arg) } });
92             $worker->work while 1;
93              
94             See the I<Gearman::Client> documentation for a sample client sending the
95             I<sum> job.
96              
97             =head1 METHODS
98              
99             =cut
100              
101             #TODO: retries?
102             #
103 11     11   56 use Carp ();
  11         14  
  11         136  
104 11     11   4074 use Gearman::Util ();
  11         17  
  11         249  
105 11     11   3391 use Gearman::Job;
  11         16  
  11         228  
106 11     11   586 use Storable ();
  11         2407  
  11         233  
107              
108             use fields (
109 11         66 'last_connect_fail', # host:port -> unixtime
110             'down_since', # host:port -> unixtime
111             'connecting', # host:port -> unixtime connect started at
112             'can', # ability -> subref (ability is func with optional prefix)
113             'timeouts', # ability -> timeouts
114             'client_id', # random identifier string, no whitespace
115             'parent_pipe', # bool/obj: if we're a child process of a gearman server,
116             # this is socket to our parent process. also means parent
117             # sock can never disconnect or timeout, etc..
118 11     11   35 );
  11         8  
119              
120             sub new {
121 6     6 1 14294 my ($class, %opts) = @_;
122 6         7 my $self = $class;
123 6 50       25 $self = fields::new($class) unless ref $self;
124              
125 6 100       2812 if ($ENV{GEARMAN_WORKER_USE_STDIO}) {
126 1 50       4 if ($opts{job_servers}) {
127 0         0 warn join ' ', __PACKAGE__,
128             'ignores job_servers if $ENV{GEARMAN_WORKER_USE_STDIO} is set';
129              
130             # delete job_servers to ensure Gearman::Objects
131             # does not initialize the corresponding object property
132 0         0 delete($opts{job_servers});
133             } ## end if ($opts{job_servers})
134             } ## end if ($ENV{GEARMAN_WORKER_USE_STDIO...})
135              
136 6         22 $self->SUPER::new(%opts);
137              
138 6 100       10 if ($ENV{GEARMAN_WORKER_USE_STDIO}) {
139 1 50       16 open my $sock, '+<&', \*STDIN
140             or die "Unable to dup STDIN to socket for worker to use.";
141 1         3 $self->{job_servers} = [$sock];
142 1         1 $self->{parent_pipe} = $sock;
143              
144 1 50       3 die "Unable to initialize connection to gearmand"
145             unless $self->_on_connect($sock);
146             } ## end if ($ENV{GEARMAN_WORKER_USE_STDIO...})
147              
148 5         5 $self->{last_connect_fail} = {};
149 5         5 $self->{down_since} = {};
150 5         5 $self->{can} = {};
151 5         5 $self->{timeouts} = {};
152 5         5 $self->{client_id} = join('', map { chr(int(rand(26)) + 97) } (1 .. 30));
  150         169  
153              
154 5         17 return $self;
155             } ## end sub new
156              
157             =head2 reset_abilities
158              
159             tell all the jobservers that this worker can't do anything
160              
161             =cut
162              
163             sub reset_abilities {
164 1     1 1 317 my $self = shift;
165 1         3 my $req = _rc("reset_abilities");
166 1         1 foreach my $js (@{ $self->{job_servers} }) {
  1         4  
167 0 0       0 my $jss = $self->_get_js_sock($js)
168             or next;
169              
170 0 0       0 unless (_send($jss, $req)) {
171 0         0 $self->_uncache_sock($js, "err_write_reset_abilities");
172             }
173             } ## end foreach my $js (@{ $self->{...}})
174              
175 1         2 $self->{can} = {};
176 1         3 $self->{timeouts} = {};
177             } ## end sub reset_abilities
178              
179             =head2 _uncache_sock($js, $reason)
180              
181             close TCP connection
182              
183             =cut
184              
185             sub _uncache_sock {
186 0     0   0 my ($self, $js, $reason) = @_;
187              
188             # we can't reconnect as a child process, so all we can do is die and hope our
189             # parent process respawns us...
190             die "Error/timeout talking to gearman parent process: [$reason]"
191 0 0       0 if $self->{parent_pipe};
192              
193             # normal case, we just close this TCP connection and we'll reconnect later.
194             # delete cached sock
195 0         0 $self->_sock_cache($js, undef, 1);
196             } ## end sub _uncache_sock
197              
198             =head2 work(%opts)
199              
200             Endless loop that takes a job and waits for the next one.
201             You can pass "stop_if", "on_start", "on_complete" and "on_fail" callbacks in I<%opts>.
202              
203             =cut
204              
205             my %job_done;
206              
207             sub work {
208 0     0 1 0 my ($self, %opts) = @_;
209 0   0 0   0 my $stop_if = delete($opts{stop_if}) || sub {0};
  0         0  
210 0         0 my $complete_cb = delete $opts{on_complete};
211 0         0 my $fail_cb = delete $opts{on_fail};
212 0         0 my $start_cb = delete $opts{on_start};
213 0 0       0 die "Unknown opts" if %opts;
214              
215 0         0 my $grab_req = _rc("grab_job");
216 0         0 my $presleep_req = _rc("pre_sleep");
217              
218 0         0 my $last_job_time;
219              
220             my $on_connect = sub {
221 0     0   0 return _send($_[0], $presleep_req);
222 0         0 };
223              
224 0         0 my %js_map = map { $self->_js_str($_) => $_ } $self->job_servers;
  0         0  
225              
226             # "Active" job servers are servers that have woken us up and should be
227             # queried to see if they have jobs for us to handle. On our first pass
228             # in the loop we contact all servers.
229 0         0 my %active_js = map { $_ => 1 } keys(%js_map);
  0         0  
230              
231             # ( js => last_update_time, ... )
232 0         0 my %last_update_time;
233              
234 0         0 while (1) {
235              
236             # "Jobby" job servers are the set of server which we will contact
237             # on this pass through the loop, because we need to clear and use
238             # the "Active" set to plan for our next pass through the loop.
239 0         0 my @jobby_js = keys %active_js;
240              
241 0         0 %active_js = ();
242              
243 0         0 my $js_count = @jobby_js;
244 0         0 my $js_offset = int(rand($js_count));
245 0         0 my $is_idle = 0;
246              
247 0         0 for (my $i = 0; $i < $js_count; $i++) {
248 0         0 my $js_index = ($i + $js_offset) % $js_count;
249 0         0 my $js_str = $jobby_js[$js_index];
250 0         0 my $js = $js_map{$js_str};
251 0 0       0 my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
252             or next;
253              
254             # TODO: add an optional sleep in here for the test suite
255             # to test gearmand server going away here. (SIGPIPE on
256             # send_req, etc) this testing has been done manually, at
257             # least.
258              
259 0 0       0 unless (_send($jss, $grab_req)) {
260 0 0 0     0 if ($!{EPIPE} && $self->{parent_pipe}) {
261              
262             # our parent process died, so let's just quit
263             # gracefully.
264 0         0 exit(0);
265             } ## end if ($!{EPIPE} && $self...)
266              
267 0         0 $self->_uncache_sock($js, "grab_job_timeout");
268 0         0 delete $last_update_time{$js_str};
269 0         0 next;
270             } ## end unless (_send($jss, $grab_req...))
271              
272             # if we're a child process talking over a unix pipe, give more
273             # time, since we know there are no network issues, and also
274             # because on failure, we can't "reconnect". all we can do is
275             # die and hope our parent process respawns us.
276 0 0       0 my $timeout = $self->{parent_pipe} ? 5 : 0.50;
277 0 0       0 unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout))
278             {
279 0         0 $self->_uncache_sock($js, "grab_job_timeout");
280 0         0 delete $last_update_time{$js_str};
281 0         0 next;
282             } ## end unless (Gearman::Util::wait_for_readability...)
283              
284 0         0 my $res;
285             do {
286 0         0 my $err;
287 0         0 $res = Gearman::Util::read_res_packet($jss, \$err);
288 0 0       0 unless ($res) {
289 0         0 $self->_uncache_sock($js, "read_res_error");
290 0         0 delete $last_update_time{$js_str};
291 0         0 next;
292             }
293 0         0 } while ($res->{type} eq "noop");
294              
295 0 0       0 if ($res->{type} eq "no_job") {
296 0 0       0 unless (_send($jss, $presleep_req)) {
297 0         0 delete $last_update_time{$js_str};
298 0         0 $self->_uncache_sock($js, "write_presleep_error");
299             }
300 0         0 $last_update_time{$js_str} = time;
301 0         0 next;
302             } ## end if ($res->{type} eq "no_job")
303              
304 0 0       0 unless ($res->{type} eq "job_assign") {
305 0         0 my $msg = "Uh, wasn't expecting a $res->{type} packet.";
306              
307 0 0       0 if ($res->{type} eq "error") {
308 0         0 $msg .= " [${$res->{blobref}}]\n";
  0         0  
309 0         0 $msg =~ s/\0/ -- /g;
310             }
311 0         0 die $msg;
312             } ## end unless ($res->{type} eq "job_assign")
313              
314 0 0       0 ${ $res->{blobref} } =~ s/^(.+?)\0(.+?)\0//
  0         0  
315             or die "Uh, regexp on job_assign failed";
316 0         0 my ($handle, $ability) = ($1, $2);
317             my $job = Gearman::Job->new(
318             func => $ability,
319             argref => $res->{blobref},
320 0         0 handle => $handle,
321             jss => $jss,
322             js => $js
323             );
324              
325 0         0 my $jobhandle = join("//", $js_str, $job->handle);
326 0 0       0 $start_cb->($jobhandle) if $start_cb;
327              
328 0         0 my $handler = $self->{can}{$ability};
329 0         0 my $ret = eval { $handler->($job); };
  0         0  
330 0         0 my $err = $@;
331 0 0       0 warn "Job '$ability' died: $err" if $err;
332              
333 0         0 $last_update_time{$js_str} = $last_job_time = time();
334 0 0       0 if ($err) {
335              
336             #TODO should be work_exception replaced by work_fail?
337             # see 75b65e1
338 0         0 my $exception_req
339             = _rc("work_exception",
340             _join0($handle, Storable::nfreeze(\$err)));
341 0 0       0 unless (_send($jss, $exception_req)) {
342 0         0 $self->_uncache_sock($js, "write_res_error");
343 0         0 next;
344             }
345             } ## end if ($err)
346              
347 0 0       0 if (!defined $job_done{ $job->handle }) {
348 0 0       0 if (defined $ret) {
349 0         0 $self->send_work_complete($job, $ret);
350             }
351             else {
352 0         0 $self->send_work_fail($job);
353             }
354             } ## end if (!defined $job_done...)
355              
356 0         0 my $done = delete $job_done{ $job->handle };
357 0 0       0 if ($done->{command} eq "work_complete") {
358 0 0       0 $complete_cb->($jobhandle, $ret) if $complete_cb;
359             }
360             else {
361 0 0       0 $fail_cb->($jobhandle, $err) if $fail_cb;
362             }
363              
364 0 0       0 unless ($done->{result}) {
365 0         0 $self->_uncache_sock($js, "write_res_error");
366 0         0 next;
367             }
368              
369 0         0 $active_js{$js_str} = 1;
370             } ## end for (my $i = 0; $i < $js_count...)
371              
372 0         0 my @jss;
373              
374 0         0 foreach my $js_str (keys(%js_map)) {
375             my $jss
376 0 0       0 = $self->_get_js_sock($js_map{$js_str},
377             on_connect => $on_connect)
378             or next;
379 0         0 push @jss, [$js_str, $jss];
380             } ## end foreach my $js_str (keys(%js_map...))
381              
382 0         0 $is_idle = 1;
383 0         0 my $wake_vec = '';
384              
385 0         0 foreach my $j (@jss) {
386 0         0 (undef, my $_jss) = @{$j};
  0         0  
387 0         0 my $fd = $_jss->fileno;
388 0         0 vec($wake_vec, $fd, 1) = 1;
389             }
390              
391 0 0       0 my $timeout = keys(%active_js) ? 0 : (10 + rand(2));
392              
393             # chill for some arbitrary time until we're woken up again
394 0         0 my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
395              
396 0 0       0 if ($nready) {
397 0         0 foreach my $j (@jss) {
398 0         0 my ($js_str, $jss) = @{$j};
  0         0  
399 0         0 my $fd = $jss->fileno;
400 0 0       0 $active_js{$js_str} = 1
401             if vec($wout, $fd, 1);
402             } ## end foreach my $j (@jss)
403             } ## end if ($nready)
404              
405 0 0       0 $is_idle = 0 if keys %active_js;
406              
407 0 0       0 return if $stop_if->($is_idle, $last_job_time);
408              
409 0         0 my $update_since = time - (15 + rand 60);
410              
411 0         0 while (my ($js_str, $last_update) = each %last_update_time) {
412 0 0       0 $active_js{$js_str} = 1 if $last_update < $update_since;
413             }
414             } ## end while (1)
415              
416             } ## end sub work
417              
418             =head2 $worker->register_function($funcname, $subref)
419              
420             =head2 $worker->register_function($funcname, $timeout, $subref)
421              
422             Registers the function C<$funcname> as being provided by the worker
423             C<$worker>, and advertises these capabilities to all of the job servers
424             defined in this worker.
425              
426             C<$subref> must be a subroutine reference that will be invoked when the
427             worker receives a request for this function. It will be passed a
428             L<Gearman::Job> object representing the job that has been received by the
429             worker.
430              
431             C<$timeout> is an optional parameter specifying how long the jobserver will
432             wait for your subroutine to give an answer. Exceeding this time will result
433             in the jobserver reassigning the task and ignoring your result. This prevents
434             a gimpy worker from ruining the 'user experience' in many situations.
435              
436             B<return> C<< _register_all(can_do request) >>
437              
438             =cut
439              
440             sub register_function {
441 2     2 1 444 my $self = shift;
442 2         3 my $func = shift;
443 2 100       6 my $timeout = shift unless (ref $_[0] eq 'CODE');
444 2         2 my $subref = shift;
445              
446 2         9 my $ability = $self->func($func);
447              
448 2         2 my $req;
449 2 100       4 if (defined $timeout) {
450 1         2 $req = _rc("can_do_timeout", _join0($ability, $timeout));
451 1         2 $self->{timeouts}{$ability} = $timeout;
452             }
453             else {
454 1         2 $req = _rc("can_do", $ability);
455             }
456              
457 2         4 $self->{can}{$ability} = $subref;
458              
459 2         4 return $self->_register_all($req);
460             } ## end sub register_function
461              
462             =head2 unregister_function($funcname)
463              
464             send cant_do C<$funcname> request to L<job_servers>
465              
466             B<return> C<< _register_all(cant_do) >>
467              
468             =cut
469              
470             sub unregister_function {
471 0     0 1 0 my ($self, $func) = @_;
472 0         0 my $ability = $self->func($func);
473 0         0 delete $self->{can}{$ability};
474              
475 0         0 my $req = _rc("cant_do", $ability);
476 0         0 return $self->_register_all($req);
477             } ## end sub unregister_function
478              
479             =head2 job_servers(@servers)
480              
481             Override L<Gearman::Objects> method to skip job server initialization if
482             working with L<Gearman::Server>.
483              
484             Calling this method will do nothing in a worker that is running as a child
485             process of a gearman server.
486              
487             =cut
488              
489             sub job_servers {
490 2     2 1 3 my $self = shift;
491 2 50       4 $ENV{GEARMAN_WORKER_USE_STDIO} && return $self->{job_servers};
492              
493 2         8 return $self->SUPER::job_servers(@_);
494             } ## end sub job_servers
495              
496             =head2 send_work_complete($job, $v)
497              
498             notify the server (and listening clients) that job completed successfully
499              
500             =cut
501              
502             sub send_work_complete {
503 0     0 1 0 return shift->_finish_job_request("work_complete", @_);
504             }
505              
506             =head2 send_work_data($job, $data)
507              
508             Use this method to update the client with data from a running job.
509              
510             =cut
511              
512             sub send_work_data {
513 0     0 1 0 my ($self, $job, $data) = @_;
514             return $self->_job_request("work_data", $job,
515 0 0       0 ref($data) ? ${$data} : $data);
  0         0  
516             }
517              
518             =head2 send_work_warning($job, $message)
519              
520             Use this method to send a warning C<$message> to the server (and any listening clients) with regard to the running C<$job>.
521              
522             =cut
523              
524             sub send_work_warning {
525 0     0 1 0 my ($self, $job, $msg) = @_;
526 0         0 return $self->_job_request("work_warning", $job, $msg);
527             }
528              
529             # =head2 send_work_exception($job, $exception)
530              
531             # Use this method to notify the server (and any listening clients) that the C<$job> failed with the given C<$exception>.
532              
533             # =cut
534              
535             # sub send_work_exception {
536             # my ($self, $job, $exc) = @_;
537             # return $self->_job_request("work_exception", $job, $exc);
538             # }
539              
540             =head2 send_work_fail($job, [$message])
541              
542             Use this method to notify the server (and any listening clients) that the job failed.
543              
544             =cut
545              
546             sub send_work_fail {
547 0     0 1 0 my ($self) = shift;
548 0         0 return $self->_finish_job_request("work_fail", @_);
549             }
550              
551             =head2 send_work_status($job, $numerator, $denominator)
552              
553             Use this method to periodically send status updates to the server for long-running jobs,
554             reporting the percentage complete.
555              
556             =cut
557              
558             sub send_work_status {
559 0     0 1 0 my ($self, $job, $numerator, $denominator) = @_;
560 0         0 return $self->_job_request("work_status", $job, $numerator, $denominator);
561             }
562              
563             # _finish_job_request($cmd, $job, [$v])
564             #
565             # send some data or message to the client for finished job
566             # $cmd = work_complete || work_fail
567             #
568             sub _finish_job_request {
569 0     0   0 my ($self, $cmd, $job, $v) = @_;
570 0 0       0 my $res = $self->_job_request($cmd, $job, ref($v) ? ${$v} : $v);
  0         0  
571              
572             # set job done flag because work method check it
573 0         0 $job_done{ $job->handle } = { command => $cmd, result => $res };
574              
575 0         0 return $res;
576             } ## end sub _finish_job_request
577              
578             # _job_request($cmd, $job, [$v])
579             #
580             # send some data to the client for the running job
581             #
582             sub _job_request {
583 0     0   0 my ($self, $cmd, $job, $v) = @_;
584 0 0       0 my $req = _rc($cmd, $v ? _join0($job->handle, $v) : $job->handle);
585              
586 0         0 return _send($job->{jss}, $req);
587             } ## end sub _job_request
588              
589             #
590             # _register_all($req)
591             #
592             sub _register_all {
593 2     2   2 my ($self, $req) = @_;
594              
595 2         2 my $count = 0;
596 2         3 my @job_servers = $self->job_servers();
597 2         3 foreach my $js (@job_servers) {
598 0 0       0 my $jss = $self->_get_js_sock($js)
599             or next;
600              
601 0 0       0 unless (_send($jss, $req)) {
602 0         0 $self->_uncache_sock($js, "write_register_func_error");
603 0         0 next;
604             }
605 0         0 $count++;
606             } ## end foreach my $js (@job_servers)
607              
608 2   33     9 return $count && $count == scalar(@job_servers);
609             } ## end sub _register_all
610              
611             #
612             # _get_js_sock($js, %opts)
613             #
614             sub _get_js_sock {
615 3     3   1514 my ($self, $js, %opts) = @_;
616 3 100       14 $js || return;
617              
618 2         10 my $js_str = $self->_js_str($js);
619 2         3 my $on_connect = delete $opts{on_connect};
620              
621             # Someday should warn when called with extra opts.
622              
623 2 50       4 warn "getting job server socket: $js_str" if $self->debug;
624              
625             # special case, if we're a child process of a gearman::server
626             # parent process, talking over a unix pipe...
627 2 100       10 return $self->{parent_pipe} if $self->{parent_pipe};
628              
629 1 50       8 if (my $sock = $self->_sock_cache($js)) {
630 0 0       0 return $sock if getpeername($sock);
631              
632             # delete cached sock
633 0         0 $self->_sock_cache($js, undef, 1);
634             } ## end if (my $sock = $self->...)
635              
636 1         1 my $now = time;
637 1         2 my $down_since = $self->{down_since}{$js_str};
638 1 50       2 if ($down_since) {
639 0 0       0 warn "job server down since $down_since" if $self->debug;
640              
641 0         0 my $down_for = $now - $down_since;
642 0 0       0 my $retry_period = $down_for > 60 ? 30 : (int($down_for / 2) + 1);
643 0 0       0 if ($self->{last_connect_fail}{$js_str} > $now - $retry_period) {
644 0         0 return;
645             }
646             } ## end if ($down_since)
647              
648 1 50       2 warn "connecting to '$js_str'" if $self->debug;
649              
650 1         6 my $sock = $self->socket($js, 1);
651 1 50       3 unless ($sock) {
652 1   33     7 $self->{down_since}{$js_str} ||= $now;
653 1         1 $self->{last_connect_fail}{$js_str} = $now;
654              
655 1         4 return;
656             } ## end unless ($sock)
657              
658 0         0 delete $self->{last_connect_fail}{$js_str};
659 0         0 delete $self->{down_since}{$js_str};
660              
661 0         0 $sock->autoflush(1);
662 0         0 $self->sock_nodelay($sock);
663              
664 0         0 $self->_sock_cache($js, $sock);
665              
666 0 0       0 my $ok = $on_connect ? $on_connect->($sock) : $self->_on_connect($sock);
667 0 0       0 unless ($ok) {
668              
669             # delete
670 0         0 $self->_sock_cache($js, undef, 1);
671 0         0 return;
672             } ## end unless ($ok)
673              
674 0         0 return $sock;
675             } ## end sub _get_js_sock
676              
677             #
678             # _on_connect($sock)
679             #
680             # Housekeeping things to do on connection to a server. Method call
681             # with one argument being the 'socket' we're going to take care of.
682             # returns true on success, false on failure.
683             #
684             sub _on_connect {
685 2     2   315 my ($self, $sock) = @_;
686              
687 2         5 my $cid_req = _rc("set_client_id", $self->{client_id});
688 2 50       4 return unless _send($sock, $cid_req);
689              
690             # get this socket's state caught-up
691 0         0 foreach my $ability (keys %{ $self->{can} }) {
  0         0  
692 0         0 my $timeout = $self->{timeouts}->{$ability};
693 0 0       0 unless ($self->_set_ability($sock, $ability, $timeout)) {
694 0         0 return;
695             }
696             } ## end foreach my $ability (keys %...)
697              
698 0         0 return 1;
699             } ## end sub _on_connect
700              
701             #
702             # _set_ability($sock, $ability, [$timeout])
703             #
704             sub _set_ability {
705 3     3   5 my ($self, $sock, $ability, $timeout) = @_;
706 3         3 my $req;
707 3 100       7 if (defined $timeout) {
708 1         2 $req = _rc("can_do_timeout", _join0($ability, $timeout));
709             }
710             else {
711 2         3 $req = _rc("can_do", $ability);
712             }
713 3         6 return _send($sock, $req);
714             } ## end sub _set_ability
715              
716             #
717             # _send($jss, $req)
718             #
719             # send C<$req> to C<$jss>
720             #
721             sub _send {
722 5     5   6 my ($jss, $req) = @_;
723 5         8 return Gearman::Util::send_req($jss, \$req);
724             }
725              
726             #
727             # _rc($cmd, [@val])
728             #
729             sub _rc {
730 8     8   17 return Gearman::Util::pack_req_command(@_);
731             }
732              
733             #
734             # _join0(@v)
735             #
736             sub _join0 {
737 2     2   6 return join("\0", @_);
738             }
739              
740             1;
741             __END__