line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Gearman::Taskset; |
2
|
7
|
|
|
7
|
|
1768
|
use version; |
|
7
|
|
|
|
|
2662
|
|
|
7
|
|
|
|
|
27
|
|
3
|
|
|
|
|
|
|
$Gearman::Taskset::VERSION = qv("2.001_001"); |
4
|
|
|
|
|
|
|
|
5
|
7
|
|
|
7
|
|
507
|
use strict; |
|
7
|
|
|
|
|
9
|
|
|
7
|
|
|
|
|
121
|
|
6
|
7
|
|
|
7
|
|
21
|
use warnings; |
|
7
|
|
|
|
|
6
|
|
|
7
|
|
|
|
|
275
|
|
7
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
=head1 NAME |
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
Gearman::Taskset - a taskset in Gearman, from the point of view of a client |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
=head1 SYNOPSIS |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
use Gearman::Client; |
15
|
|
|
|
|
|
|
my $client = Gearman::Client->new; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
# waiting on a set of tasks in parallel |
18
|
|
|
|
|
|
|
my $ts = $client->new_task_set; |
19
|
|
|
|
|
|
|
$ts->add_task( "add" => "1+2", {...}); |
20
|
|
|
|
|
|
|
$ts->wait(); |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
=head1 DESCRIPTION |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
Gearman::Taskset is a Gearman::Client's representation of a queue of tasks in Gearman |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 METHODS |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
=cut |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
use fields ( |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
# { handle => [Task, ...] } |
34
|
7
|
|
|
|
|
36
|
'waiting', |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
# Gearman::Client |
37
|
|
|
|
|
|
|
'client', |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
# arrayref |
40
|
|
|
|
|
|
|
'need_handle', |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
# default socket (non-merged requests) |
43
|
|
|
|
|
|
|
'default_sock', |
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
# default socket's ip/port |
46
|
|
|
|
|
|
|
'default_sockaddr', |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
# { hostport => socket } |
49
|
|
|
|
|
|
|
'loaned_sock', |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
# bool, if taskset has been cancelled mid-processing |
52
|
|
|
|
|
|
|
'cancelled', |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
# hookname -> coderef |
55
|
|
|
|
|
|
|
'hooks', |
56
|
7
|
|
|
7
|
|
398
|
); |
|
7
|
|
|
|
|
1068
|
|
57
|
|
|
|
|
|
|
|
58
|
7
|
|
|
7
|
|
494
|
use Carp (); |
|
7
|
|
|
|
|
7
|
|
|
7
|
|
|
|
|
75
|
|
59
|
7
|
|
|
7
|
|
758
|
use Gearman::Util (); |
|
7
|
|
|
|
|
8
|
|
|
7
|
|
|
|
|
99
|
|
60
|
7
|
|
|
7
|
|
2434
|
use Gearman::ResponseParser::Taskset; |
|
7
|
|
|
|
|
12
|
|
|
7
|
|
|
|
|
132
|
|
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# i thought about weakening taskset's client, but might be too weak. |
63
|
7
|
|
|
7
|
|
25
|
use Scalar::Util (); |
|
7
|
|
|
|
|
8
|
|
|
7
|
|
|
|
|
69
|
|
64
|
7
|
|
|
7
|
|
490
|
use Socket (); |
|
7
|
|
|
|
|
2540
|
|
|
7
|
|
|
|
|
75
|
|
65
|
7
|
|
|
7
|
|
18
|
use Time::HiRes (); |
|
7
|
|
|
|
|
6
|
|
|
7
|
|
|
|
|
10906
|
|
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=head2 new($client) |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
=cut |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
# Constructor.
#
# $client must be a blessed Gearman::Client (or subclass) instance, otherwise
# the call croaks. Invoked on a class name, a new fields-based object is
# allocated; invoked on an existing reference, that object is re-initialised
# in place. Returns the taskset.
sub new {
    my ($self, $client) = @_;

    unless (Scalar::Util::blessed($client)
        && $client->isa("Gearman::Client"))
    {
        Carp::croak
            "provided client argument is not a Gearman::Client reference";
    }

    # class-method call: allocate the restricted hash declared via "use fields"
    $self = fields::new($self) if !ref $self;

    # start from a clean state: nothing waiting, nothing borrowed, no hooks
    @{$self}{qw(waiting need_handle client loaned_sock cancelled hooks)}
        = ({}, [], $client, {}, 0, {});

    return $self;
} ## end sub new
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
# Destructor: hand every socket this taskset holds back to its client via
# $client->_put_js_sock($hostport, $sock).
sub DESTROY {
    my $self = shift;

    # During global cleanup this may be called out of order, and the client
    # may not exist in the taskset (cancel() also clears it).
    my $client = $self->{client};
    return unless $client;

    if ($self->{default_sock}) {
        $client->_put_js_sock($self->{default_sockaddr},
            $self->{default_sock});
    }

    # Walk a snapshot of the keys instead of using each(): each() shares one
    # iterator per hash, so any earlier each() loop over loaned_sock that was
    # left early (e.g. with "last") would make us silently skip sockets here.
    my $loaned = $self->{loaned_sock} || {};
    for my $hp (keys %{$loaned}) {
        $client->_put_js_sock($hp, $loaned->{$hp});
    }
} ## end sub DESTROY
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=head2 run_hook($name) |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
run a hook callback if defined |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
=cut |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
# Invoke the callback registered under $name, passing it the remaining
# arguments. Does nothing when $name is false or no callback is registered.
# An exception thrown by the callback is caught and reported with warn().
sub run_hook {
    my $self = shift;
    my $name = shift;

    return unless $name;
    my $hook = $self->{hooks}->{$name};
    return unless $hook;

    # a failing hook must never break the caller
    eval { $hook->(@_) };

    warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
} ## end sub run_hook
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
=head2 add_hook($name, [$cb]) |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
add a hook |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
=cut |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
# Register $cb as the callback for hook $name; when $cb is false the hook
# registered under $name is removed instead. A false $name is ignored.
sub add_hook {
    my ($self, $name, $cb) = @_;
    return unless $name;

    return $cb
        ? ($self->{hooks}->{$name} = $cb)
        : delete $self->{hooks}->{$name};
} ## end sub add_hook
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
=head2 client () |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
this method is part of the "Taskset" interface, also implemented by |
143
|
|
|
|
|
|
|
Gearman::Client::Async, where no tasksets make sense, so instead the |
144
|
|
|
|
|
|
|
Gearman::Client::Async object itself is also its taskset. (the |
145
|
|
|
|
|
|
|
client tracks all tasks). so don't change this, without being aware |
146
|
|
|
|
|
|
|
of Gearman::Client::Async. similarly, don't access $ts->{client} without |
147
|
|
|
|
|
|
|
going via this accessor. |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
=cut |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
# Accessor: returns the client object stored in this taskset.
sub client {
    my ($self) = @_;
    return $self->{client};
}
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
=head2 cancel() |
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
=cut |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
# Cancel the taskset: mark it cancelled, close the default socket and every
# loaned socket, forget all waiting/pending tasks and drop the client.
sub cancel {
    my $self = shift;

    $self->{cancelled} = 1;

    if ($self->{default_sock}) {
        close($self->{default_sock});
        $self->{default_sock} = undef;
    }

    # values() instead of each(): each() shares one iterator per hash, so a
    # previous each() loop over loaned_sock that exited early would make this
    # loop skip sockets and leave them open.
    for my $sock (values %{ $self->{loaned_sock} }) {

        # wait() and _ip_port() already tolerate false entries in
        # loaned_sock; do the same here instead of dying on a method call
        # on an undefined value.
        next unless $sock;
        $sock->close;
    }

    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{client}      = undef;
} ## end sub cancel
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
#=head2 _get_loaned_sock($hostport) |
179
|
|
|
|
|
|
|
# |
180
|
|
|
|
|
|
|
#=cut |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
# _get_loaned_sock($hostport)
#
# Return the socket borrowed for $hostport. A cached socket is reused while
# it reports connected(); otherwise a socket is requested from the client via
# _get_js_sock() and remembered in loaned_sock. Returns whatever the client
# returned (false when no socket could be obtained).
sub _get_loaned_sock {
    my ($self, $hostport) = @_;

    if (my $cached = $self->{loaned_sock}{$hostport}) {
        return $cached if $cached->connected;

        # stale connection: forget it and ask the client again
        delete $self->{loaned_sock}{$hostport};
    }

    my $sock = $self->{client}->_get_js_sock($hostport);

    # Only remember real sockets. A false entry in loaned_sock would later
    # make cancel() call ->close on an undefined value.
    return $sock unless $sock;

    return $self->{loaned_sock}{$hostport} = $sock;
} ## end sub _get_loaned_sock
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=head2 wait(%opts) |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
event loop for reading in replies |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
=cut |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Event loop for reading in replies: reads from the default socket and every
# loaned socket until nothing is left in {waiting} or the taskset is cancelled.
#
# Options:
#   timeout - seconds to wait in total; once it has run out the taskset is
#             cancelled and the method returns. Without a timeout select()
#             is called in 0.5 second slices.
# Any other option only produces a warning.
# Croaks with "Job server failure: ..." when parsing a socket throws.
sub wait {
    my ($self, %opts) = @_;

    # turn the relative timeout into an absolute point in time
    my $deadline;
    if (exists $opts{timeout}) {
        $deadline = delete $opts{timeout};
        $deadline += Time::HiRes::time() if defined $deadline;
    }

    if (keys %opts) {
        Carp::carp "Unknown options: "
            . join(',', keys %opts)
            . " passed to Taskset->wait.";
    }

    my %parser_for;    # fd -> Gearman::ResponseParser object
    my %sock_for;      # fd -> socket being watched
    my $rin = '';
    my ($rout, $eout) = ('', '');

    foreach my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} })
    {
        $sock || next;
        my $fd = $sock->fileno;
        vec($rin, $fd, 1) = 1;
        $sock_for{$fd} = $sock;
    } ## end foreach my $sock ($self->{default_sock...})

    until ($self->{cancelled} || !keys %{ $self->{waiting} }) {
        my $time_left = 0.5;
        $time_left = $deadline - Time::HiRes::time() if $deadline;

        # TODO drop the eout.
        my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left);

        # the deadline had already passed before this select(): give up
        if ($deadline && $time_left <= 0) {
            $self->cancel;
            return;
        }
        $nfound || next;

        for my $fd (keys %sock_for) {
            vec($rout, $fd, 1) || next;

            # TODO: deal with error vector

            my $sock = $sock_for{$fd};
            $parser_for{$fd} ||= Gearman::ResponseParser::Taskset->new(
                source  => $sock,
                taskset => $self
            );
            eval { $parser_for{$fd}->parse_sock($sock); };

            # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
            # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
            Carp::croak("Job server failure: $@") if $@;
        } ## end for my $fd (keys %sock_for)
    } ## end until ($self->{cancelled...})
} ## end sub wait
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
=head2 add_task(Gearman::Task) |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>) |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
C<$opts_hr> see L<Gearman::Task> |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
=cut |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
# Build a task from the arguments (via the client's _get_task_from_args),
# attach it to this taskset, run the 'add_task' hook and submit the task on
# its job server socket. Then wait until the pending handle queue is empty.
#
# Returns the task handle on success, or the result of $task->fail(...) when
# the task has no socket or no packet arrives. Croaks when the submit packet
# could not be written completely.
sub add_task {
    my $self = shift;

    my $task = $self->client()->_get_task_from_args(@_);
    $task->taskset($self);
    $self->run_hook('add_task', $self, $task);

    my $jssock = $task->{jssock};
    if (!$jssock) {
        return $task->fail("undefined jssock");
    }

    my $packet   = $task->pack_submit_packet($self->client);
    my $expected = length($packet);
    my $written  = $jssock->syswrite($packet, $expected) || 0;
    if ($written != $expected) {
        Carp::croak "Wrote $written but expected to write $expected";
    }

    push @{ $self->{need_handle} }, $task;
    while (@{ $self->{need_handle} }) {
        my $got = $self->_wait_for_packet($jssock,
            $self->{client}->{command_timeout});
        next if $got;

        # ditch it, it failed.
        # this will resubmit it if it failed.
        shift @{ $self->{need_handle} };
        my $detail = defined($got) ? $got : $!;
        return $task->fail(
            join(' ', "no rv on waiting for packet", $detail));
    } ## end while (@{ $self->{need_handle...}})

    return $task->handle;
} ## end sub add_task
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
# |
309
|
|
|
|
|
|
|
# _get_default_sock() |
310
|
|
|
|
|
|
|
# used in Gearman::Task->taskset only |
311
|
|
|
|
|
|
|
# |
312
|
|
|
|
|
|
|
# Return the taskset's default socket. If none is cached yet, ask the client
# for a random job server socket, remember it as default socket (together
# with its address) and as loaned socket. Returns nothing when the client
# yields no socket.
sub _get_default_sock {
    my $self = shift;

    if (my $cached = $self->{default_sock}) {
        return $cached;
    }

    # resolver handed to the client: prefer a socket we already borrowed for
    # that address, otherwise fall back to the client's own lookup
    my $resolve = sub {
        my ($hostport) = @_;
        my $known = $self->{loaned_sock}{$hostport};
        return $known if $known;
        return $self->{client}->_get_js_sock($hostport);
    };

    my ($hostport, $sock) = $self->{client}->_get_random_js_sock($resolve);
    $sock || return;

    $self->{loaned_sock}{$hostport} ||= $sock;
    @{$self}{qw(default_sock default_sockaddr)} = ($sock, $hostport);

    return $sock;
} ## end sub _get_default_sock
331
|
|
|
|
|
|
|
|
332
|
|
|
|
|
|
|
# |
333
|
|
|
|
|
|
|
# _get_hashed_sock($hv) |
334
|
|
|
|
|
|
|
# |
335
|
|
|
|
|
|
|
# used in Gearman::Task->taskset only |
336
|
|
|
|
|
|
|
# |
337
|
|
|
|
|
|
|
# return a socket |
338
|
|
|
|
|
|
|
# _get_hashed_sock($hv)
#
# Pick a job server socket for hash value $hv: start at job server index
# $hv % js_count and walk the server list (wrapping around) until
# _get_loaned_sock() yields a socket. Returns that socket, or a false value
# when no job server yields one.
sub _get_hashed_sock {
    my $self = shift;
    my $hv   = shift;

    my $cl    = $self->client;
    my $count = $cl->{js_count};
    my $sock;
    for (my $off = 0; $off < $count; $off++) {
        my $idx = ($hv + $off) % $count;
        $sock = $self->_get_loaned_sock($cl->{job_servers}[$idx]);

        # stop at the first server that gave us a socket; otherwise fall
        # through to the next one instead of giving up after one attempt
        last if $sock;
    } ## end for (my $off = 0; $off ...)

    return $sock;
} ## end sub _get_hashed_sock
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
# |
354
|
|
|
|
|
|
|
# _wait_for_packet($sock, $timeout) |
355
|
|
|
|
|
|
|
# |
356
|
|
|
|
|
|
|
# $sock socket to singularly read from |
357
|
|
|
|
|
|
|
# |
358
|
|
|
|
|
|
|
# returns boolean when given a sock to wait on. |
359
|
|
|
|
|
|
|
# otherwise, return value is undefined. |
360
|
|
|
|
|
|
|
# Read one response packet from $sock (giving up after $timeout) and feed it
# to process_packet(). Returns process_packet()'s result, or 0 when no
# packet could be read.
sub _wait_for_packet {
    my ($self, $sock, $timeout) = @_;

    #TODO check $err after read
    my $err;
    my $packet = Gearman::Util::read_res_packet($sock, \$err, $timeout);
    return 0 unless $packet;

    return $self->process_packet($packet, $sock);
} ## end sub _wait_for_packet
369
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
# |
371
|
|
|
|
|
|
|
# _ip_port($sock) |
372
|
|
|
|
|
|
|
# |
373
|
|
|
|
|
|
|
# return hostport || ipport |
374
|
|
|
|
|
|
|
# |
375
|
|
|
|
|
|
|
# _ip_port($sock)
#
# Return the address string identifying the peer of $sock: the loaned_sock
# key the socket is stored under if there is one, otherwise "ip:port" built
# from getpeername(). Returns nothing when $sock is false or has no peer.
sub _ip_port {
    my ($self, $sock) = @_;
    $sock || return;

    my $pn = getpeername($sock);
    $pn || return;

    # look for a hostport in loaned_sock
    #
    # Iterate over keys() rather than each(): leaving an each() loop with
    # "last" keeps the hash iterator half-way, so the next each() loop over
    # loaned_sock (here, in cancel() or in DESTROY) would miss entries.
    my $hostport;
    for my $hp (keys %{ $self->{loaned_sock} }) {
        my $s = $self->{loaned_sock}{$hp} or next;
        if ($sock == $s) {
            $hostport = $hp;
            last;
        }
    } ## end for my $hp (keys %{ $self...})

    # hopefully it solves client->get_status mismatch
    $hostport && return $hostport;

    # not a loaned socket: derive the address from the peer name
    my $fam = Socket::sockaddr_family($pn);
    my ($port, $iaddr)
        = ($fam == Socket::AF_INET6())
        ? Socket::sockaddr_in6($pn)
        : Socket::sockaddr_in($pn);

    my $addr = Socket::inet_ntop($fam, $iaddr);

    return join ':', $addr, $port;
} ## end sub _ip_port
405
|
|
|
|
|
|
|
|
406
|
|
|
|
|
|
|
# |
407
|
|
|
|
|
|
|
# _fail_jshandle($shandle) |
408
|
|
|
|
|
|
|
# |
409
|
|
|
|
|
|
|
# note the failure of a task given by its jobserver-specific handle |
410
|
|
|
|
|
|
|
# |
411
|
|
|
|
|
|
|
# _fail_jshandle($shandle)
#
# Note the failure of a task given by its jobserver-specific handle: the
# first task waiting under $shandle is removed from the list and failed with
# "jshandle fail"; the list itself is dropped once it is empty.
# Croaks when $shandle is missing, unknown, or has no Gearman::Task waiting.
sub _fail_jshandle {
    my ($self, $shandle) = @_;

    unless ($shandle) {
        Carp::croak "_fail_jshandle() called without shandle parameter";
    }

    my $task_list = $self->{waiting}{$shandle};
    unless ($task_list) {
        Carp::croak "Uhhhh: got work_fail for unknown handle: $shandle";
    }

    my $task = shift @$task_list;
    unless ($task && ref($task) eq "Gearman::Task") {
        Carp::croak
            "Uhhhh: task_list is empty on work_fail for handle $shandle\n";
    }

    $task->fail("jshandle fail");
    delete $self->{waiting}{$shandle} unless @$task_list;
} ## end sub _fail_jshandle
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
=head2 process_packet($res, $sock) |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
=cut |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
# process_packet($res, $sock)
#
# Dispatch one job server response.
#   $res  - hashref with {type} (packet name) and {blobref} (ref to payload)
#   $sock - socket the packet was read from
#
# Handled types: job_created, work_fail, work_complete, work_exception and
# work_status. Returns 1 for each of them; croaks on any other type, on a
# malformed payload, or on a handle nothing is waiting for.
sub process_packet {
    my ($self, $res, $sock) = @_;

    if ($res->{type} eq "job_created") {

        # the oldest task still lacking a handle is the one being answered
        my $task = shift @{ $self->{need_handle} };
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak "Um, got an unexpected job_created notification";
        my $shandle = ${ $res->{'blobref'} };
        my $ipport  = $self->_ip_port($sock);

        # did sock become disconnected in the meantime?
        if (!$ipport) {

            # The task is not registered in {waiting} yet, so it cannot be
            # looked up by handle through _fail_jshandle() (that would only
            # croak about an unknown handle). Fail the task directly.
            $task->fail("jshandle fail");
            return 1;
        } ## end if (!$ipport)

        $task->handle("$ipport//$shandle");

        # background tasks get no further packets, nothing to wait for
        return 1 if $task->{background};
        push @{ $self->{waiting}{$shandle} ||= [] }, $task;
        return 1;
    } ## end if ($res->{type} eq "job_created")

    if ($res->{type} eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $self->_fail_jshandle($shandle);
        return 1;
    }

    # payload layout for the next two types: "<handle>\0<data>"
    my $qr = qr/(.+?)\0/;

    if ($res->{type} eq "work_complete") {

        # strip the handle prefix; what stays in the blob is the result
        ${ $res->{'blobref'} } =~ s/^$qr//
            or Carp::croak "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $self->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh: got work_complete for unknown handle: $shandle\n";

        my $task = shift @$task_list;
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak
            "Uhhhh: task_list is empty on work_complete for handle $shandle\n";

        $task->complete($res->{'blobref'});
        delete $self->{waiting}{$shandle} unless @$task_list;

        return 1;
    } ## end if ($res->{type} eq "work_complete")

    if ($res->{type} eq "work_exception") {
        ${ $res->{'blobref'} } =~ s/^$qr//
            or Carp::croak "Bogus work_exception from server";
        my $shandle = $1;

        my $task_list = $self->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh: got work_exception for unknown handle: $shandle\n";

        # the task stays in the waiting list; only its exception is recorded
        my $task = $task_list->[0];
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak
            "Uhhhh: task_list is empty on work_exception for handle $shandle\n";

        $task->exception($res->{'blobref'});

        return 1;
    } ## end if ($res->{type} eq "work_exception")

    if ($res->{type} eq "work_status") {
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $self->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh: got work_status for unknown handle: $shandle\n";

        # FIXME: the server is (probably) sending a work_status packet for each
        # interested client, even if the clients are the same, so probably need
        # to fix the server not to do that. just put this FIXME here for now,
        # though really it's a server issue.
        foreach my $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    } ## end if ($res->{type} eq "work_status")

    Carp::croak
        "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";
} ## end sub process_packet
528
|
|
|
|
|
|
|
|
529
|
|
|
|
|
|
|
1; |