File Coverage

blib/lib/Gearman/Client/Async/Connection.pm
Criterion Covered Total %
statement 133 257 51.7
branch 18 90 20.0
condition 2 16 12.5
subroutine 29 43 67.4
pod 5 18 27.7
total 187 424 44.1


line stmt bran cond sub pod time code
1             package Gearman::Client::Async::Connection;
2 11     11   74 use strict;
  11         20  
  11         489  
3 11     11   61 use warnings;
  11         21  
  11         405  
4              
5 11     11   57 use Danga::Socket;
  11         19  
  11         275  
6 11     11   74 use base 'Danga::Socket';
  11         16  
  11         2418  
7             use fields (
8 11         115 'state', # one of 3 state constants below
9             'waiting', # hashref of $handle -> [ Task+ ]
10             'need_handle', # arrayref of Gearman::Task objects which
11             # have been submitted but need handles.
12             'parser', # parser object
13             'hostspec', # scalar: "host:ip"
14             'deadtime', # unixtime we're marked dead until.
15             'task2handle', # hashref of stringified Task -> scalar handle
16             'on_ready', # arrayref of on_ready callbacks to run on connect success
17             'on_error', # arrayref of on_error callbacks to run on connect failure
18             't_offline', # bool: fake being off the net for purposes of connecting, to force timeout
19 11     11   65 );
  11         29  
20              
21             our $T_ON_TIMEOUT;
22              
23 11     11   1329 use constant S_DISCONNECTED => \ "disconnected";
  11         32  
  11         787  
24 11     11   60 use constant S_CONNECTING => \ "connecting";
  11         19  
  11         595  
25 11     11   56 use constant S_READY => \ "ready";
  11         23  
  11         580  
26              
27 11     11   57 use Carp qw(croak);
  11         18  
  11         666  
28 11     11   85 use Gearman::Task;
  11         17  
  11         291  
29 11     11   61 use Gearman::Util;
  11         18  
  11         275  
30 11     11   279 use Scalar::Util qw(weaken);
  11         30  
  11         954  
31              
32 11     11   58 use IO::Handle;
  11         18  
  11         440  
33 11     11   56 use Socket qw(PF_INET IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM);
  11         17  
  11         28783  
34              
35             sub DEBUGGING () { 0 }
36              
37             sub new {
38 1     1 1 20 my Gearman::Client::Async::Connection $self = shift;
39              
40 1         13 my %opts = @_;
41              
42 1 50       26 $self = fields::new( $self ) unless ref $self;
43              
44 1 50       235 my $hostspec = delete( $opts{hostspec} ) or
45             croak("hostspec required");
46              
47 1 50 33     95 if (ref $hostspec eq 'GLOB') {
    50          
48             # In this case we have been passed a globref, hopefully a socket that has already
49             # been connected to the Gearman server in some way.
50 0         0 $self->SUPER::new($hostspec);
51 0         0 $self->{state} = S_CONNECTING;
52 0         0 $self->{parser} = Gearman::ResponseParser::Async->new( $self );
53 0         0 $self->watch_write(1);
54             } elsif (ref $hostspec && $hostspec->can("to_inprocess_server")) {
55             # In this case we have been passed an object that looks like a Gearman::Server,
56             # which we can just call "to_inprocess_server" on to get a socketpair connecting
57             # to it.
58 1         5 my $sock = $hostspec->to_inprocess_server;
59 1         637 $self->SUPER::new($sock);
60 1         43 $self->{state} = S_CONNECTING;
61 1         20 $self->{parser} = Gearman::ResponseParser::Async->new( $self );
62 1         11 $self->watch_write(1);
63             }else {
64 0         0 $self->{state} = S_DISCONNECTED;
65             }
66              
67 1         25 $self->{hostspec} = $hostspec;
68 1         3 $self->{waiting} = {};
69 1         3 $self->{need_handle} = [];
70 1         6 $self->{deadtime} = 0;
71 1         2 $self->{on_ready} = [];
72 1         2 $self->{on_error} = [];
73 1         2 $self->{task2handle} = {};
74              
75 1 50       4 croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
76 1         13 return $self;
77             }
78              
79             sub close_when_finished {
80 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
81             # FIXME: implement
82             }
83              
84             sub hostspec {
85 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
86              
87 0         0 return $self->{hostspec};
88             }
89              
90             sub connect {
91 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
92              
93 0         0 $self->{state} = S_CONNECTING;
94              
95 0         0 my ($host, $port) = split /:/, $self->{hostspec};
96 0   0     0 $port ||= 7003;
97              
98 0         0 warn "Connecting to $self->{hostspec}\n" if DEBUGGING;
99              
100 0         0 socket my $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP;
101 0         0 IO::Handle::blocking($sock, 0);
102 0 0       0 setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
103              
104 0 0 0     0 unless ($sock && defined fileno($sock)) {
105 0         0 warn( "Error creating socket: $!\n" );
106 0         0 return undef;
107             }
108              
109 0         0 $self->SUPER::new( $sock );
110 0         0 $self->{parser} = Gearman::ResponseParser::Async->new( $self );
111              
112 0         0 eval {
113 0         0 connect $sock, Socket::sockaddr_in($port, Socket::inet_aton($host));
114             };
115 0 0       0 if ($@) {
116 0         0 $self->on_connect_error;
117 0         0 return;
118             }
119              
120             Danga::Socket->AddTimer(0.25, sub {
121 0 0   0   0 return unless $self->{state} == S_CONNECTING;
122 0 0       0 $T_ON_TIMEOUT->() if $T_ON_TIMEOUT;
123 0         0 $self->on_connect_error;
124 0         0 });
125              
126             # unless we're faking being offline for the test suite, connect and watch
127             # for writabilty so we know the connect worked...
128 0 0       0 unless ($self->{t_offline}) {
129 0         0 $self->watch_write(1);
130             }
131             }
132              
133             sub event_write {
134 1     1 1 99 my Gearman::Client::Async::Connection $self = shift;
135              
136 1 50       5 if ($self->{state} == S_CONNECTING) {
137 1         3 $self->{state} = S_READY;
138 1         13 $self->watch_read(1);
139 1         24 warn "$self->{hostspec} connected and ready.\n" if DEBUGGING;
140 1         2 $_->() foreach @{$self->{on_ready}};
  1         6  
141 1         15 $self->destroy_callbacks;
142             }
143              
144 1 50       4 $self->watch_write(0) if $self->write(undef);
145             }
146              
147             sub destroy_callbacks {
148 1     1 0 2 my Gearman::Client::Async::Connection $self = shift;
149 1         3 $self->{on_ready} = [];
150 1         2 $self->{on_error} = [];
151             }
152              
153             sub event_read {
154 1     1 1 801 my Gearman::Client::Async::Connection $self = shift;
155              
156 1         7 my $input = $self->read( 128 * 1024 );
157 1 50       35 unless (defined $input) {
158 0 0       0 $self->mark_dead if $self->stuff_outstanding;
159 0         0 $self->close( "EOF" );
160 0         0 return;
161             }
162              
163 1         14 $self->{parser}->parse_data( $input );
164             }
165              
166             sub event_err {
167 0     0 1 0 my Gearman::Client::Async::Connection $self = shift;
168              
169 0         0 my $was_connecting = ($self->{state} == S_CONNECTING);
170              
171 0 0 0     0 if ($was_connecting && $self->{t_offline}) {
172 0         0 $self->SUPER::close( "error" );
173 0         0 return;
174             }
175              
176 0         0 $self->mark_dead;
177 0         0 $self->close( "error" );
178 0 0       0 $self->on_connect_error if $was_connecting;
179             }
180              
181             sub on_connect_error {
182 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
183 0         0 warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;
184              
185 0         0 $self->mark_dead;
186 0         0 $self->close( "error" );
187 0         0 $_->() foreach @{$self->{on_error}};
  0         0  
188 0         0 $self->destroy_callbacks;
189             }
190              
191             sub close {
192 0     0 1 0 my Gearman::Client::Async::Connection $self = shift;
193 0         0 my $reason = shift;
194              
195 0 0       0 if ($self->{state} != S_DISCONNECTED) {
196 0         0 $self->{state} = S_DISCONNECTED;
197 0         0 $self->SUPER::close( $reason );
198             }
199              
200 0         0 $self->_requeue_all;
201             }
202              
203             sub mark_dead {
204 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
205 0         0 $self->{deadtime} = time + 10;
206 0         0 warn "$self->{hostspec} marked dead for a bit." if DEBUGGING;
207             }
208              
209             sub alive {
210 2     2 0 6 my Gearman::Client::Async::Connection $self = shift;
211 2         19 return $self->{deadtime} <= time;
212             }
213              
214             sub add_task {
215 2     2 0 3 my Gearman::Client::Async::Connection $self = shift;
216 2         3 my Gearman::Task $task = shift;
217              
218 2 50       7 Carp::confess("add_task called when in wrong state")
219             unless $self->{state} == S_READY;
220              
221 2         2 warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;
222              
223 2         7 $self->write( $task->pack_submit_packet );
224 2         128 push @{$self->{need_handle}}, $task;
  2         5  
225 2         7 Scalar::Util::weaken($self->{need_handle}->[-1]);
226             }
227              
228             sub stuff_outstanding {
229 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
230             return
231             @{$self->{need_handle}} ||
232 0   0     0 %{$self->{waiting}};
233             }
234              
235             sub _requeue_all {
236 0     0   0 my Gearman::Client::Async::Connection $self = shift;
237              
238 0         0 my $need_handle = $self->{need_handle};
239 0         0 my $waiting = $self->{waiting};
240              
241 0         0 $self->{need_handle} = [];
242 0         0 $self->{waiting} = {};
243              
244 0         0 while (@$need_handle) {
245 0         0 my $task = shift @$need_handle;
246 0         0 warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
247 0 0       0 $task->fail if $task;
248             }
249              
250 0         0 while (my ($shandle, $tasklist) = each( %$waiting )) {
251 0         0 foreach my $task (@$tasklist) {
252 0         0 warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
253 0         0 $task->fail;
254             }
255             }
256             }
257              
258             sub process_packet {
259 2     2 0 3 my Gearman::Client::Async::Connection $self = shift;
260 2         4 my $res = shift;
261              
262 2         2 warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;
263              
264 2 50       52 if ($res->{type} eq "job_created") {
265              
266 2 50       3 die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
  2         7  
267 2 50       3 my Gearman::Task $task = shift @{ $self->{need_handle} } or
  2         8  
268             return 1;
269              
270              
271 2         3 my $shandle = ${ $res->{'blobref'} };
  2         4  
272 2 50       5 if ($task) {
273 2         6 $self->{task2handle}{"$task"} = $shandle;
274 2   50     4 push @{ $self->{waiting}->{$shandle} ||= [] }, $task;
  2         19  
275             }
276 2         6 return 1;
277             }
278              
279 0 0       0 if ($res->{type} eq "work_fail") {
280 0         0 my $shandle = ${ $res->{'blobref'} };
  0         0  
281 0         0 $self->_fail_jshandle($shandle);
282 0         0 return 1;
283             }
284              
285 0 0       0 if ($res->{type} eq "work_complete") {
286 0 0       0 ${ $res->{'blobref'} } =~ s/^(.+?)\0//
  0         0  
287             or die "Bogus work_complete from server";
288 0         0 my $shandle = $1;
289              
290 0 0       0 my $task_list = $self->{waiting}{$shandle} or
291             return;
292              
293 0 0       0 my Gearman::Task $task = shift @$task_list or
294             return;
295              
296 0         0 $task->complete($res->{'blobref'});
297              
298 0 0       0 unless (@$task_list) {
299 0         0 delete $self->{waiting}{$shandle};
300 0         0 delete $self->{task2handle}{"$task"};
301             }
302              
303 0         0 warn "Jobs: " . scalar( keys( %{$self->{waiting}} ) ) . "\n" if DEBUGGING;
304              
305 0         0 return 1;
306             }
307              
308 0 0       0 if ($res->{type} eq "work_status") {
309 0         0 my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
  0         0  
310              
311 0 0       0 my $task_list = $self->{waiting}{$shandle} or
312             return;
313              
314 0         0 foreach my Gearman::Task $task (@$task_list) {
315 0         0 $task->status($nu, $de);
316             }
317              
318 0         0 return 1;
319             }
320              
321 0         0 die "Unknown/unimplemented packet type: $res->{type}";
322              
323             }
324              
325             sub give_up_on {
326 0     0 0 0 my Gearman::Client::Async::Connection $self = shift;
327 0         0 my $task = shift;
328              
329 0 0       0 my $shandle = $self->{task2handle}{"$task"} or return;
330 0 0       0 my $task_list = $self->{waiting}{$shandle} or return;
331 0         0 @$task_list = grep { $_ != $task } @$task_list;
  0         0  
332 0 0       0 unless (@$task_list) {
333 0         0 delete $self->{waiting}{$shandle};
334             }
335              
336             }
337              
338             # note the failure of a task given by its jobserver-specific handle
339             sub _fail_jshandle {
340 0     0   0 my Gearman::Client::Async::Connection $self = shift;
341 0         0 my $shandle = shift;
342              
343 0 0       0 my $task_list = $self->{waiting}->{$shandle} or
344             return;
345              
346 0 0       0 my Gearman::Task $task = shift @$task_list or
347             return;
348              
349             # cleanup
350 0 0       0 unless (@$task_list) {
351 0         0 delete $self->{task2handle}{"$task"};
352 0         0 delete $self->{waiting}{$shandle};
353             }
354              
355 0         0 $task->fail;
356             }
357              
358             sub get_in_ready_state {
359 2     2 0 4 my ($self, $on_ready, $on_error) = @_;
360              
361 2 50       9 if ($self->{state} == S_READY) {
362 0         0 $on_ready->();
363 0         0 return;
364             }
365              
366 2 50       8 push @{$self->{on_ready}}, $on_ready if $on_ready;
  2         5  
367 2 50       6 push @{$self->{on_error}}, $on_error if $on_error;
  2         3  
368              
369 2 50       31 $self->connect if $self->{state} == S_DISCONNECTED;
370             }
371              
372             sub t_set_offline {
373 0     0 0 0 my ($self, $val) = @_;
374 0 0       0 $val = 1 unless defined $val;
375 0         0 $self->{t_offline} = $val;
376             }
377              
378             package Gearman::ResponseParser::Async;
379              
380 11     11   95 use strict;
  11         20  
  11         383  
381 11     11   54 use warnings;
  11         26  
  11         430  
382 11     11   56 use Scalar::Util qw(weaken);
  11         19  
  11         599  
383              
384 11     11   60 use Gearman::ResponseParser;
  11         19  
  11         342  
385 11     11   50 use base 'Gearman::ResponseParser';
  11         18  
  11         2626  
386              
387             sub new {
388 1     1   7 my $class = shift;
389              
390 1         23 my $self = $class->SUPER::new;
391              
392 1         35 $self->{_conn} = shift;
393 1         4 weaken($self->{_conn});
394              
395 1         3 return $self;
396             }
397              
398             sub on_packet {
399 2     2   119 my $self = shift;
400 2         3 my $packet = shift;
401              
402 2 50       6 return unless $self->{_conn};
403 2         10 $self->{_conn}->process_packet( $packet );
404             }
405              
406             sub on_error {
407 0     0     my $self = shift;
408              
409 0 0         return unless $self->{_conn};
410 0           $self->{_conn}->mark_unsafe;
411 0           $self->{_conn}->close;
412             }
413              
414             1;