line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Gearman::Taskset; |
2
|
7
|
|
|
7
|
|
2873
|
use version (); |
|
7
|
|
|
|
|
3558
|
|
|
7
|
|
|
|
|
327
|
|
3
|
|
|
|
|
|
|
$Gearman::Taskset::VERSION = version->declare("2.002.002"); #TRIAL |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
|
6
|
7
|
|
|
7
|
|
33
|
use strict; |
|
7
|
|
|
|
|
8
|
|
|
7
|
|
|
|
|
161
|
|
7
|
7
|
|
|
7
|
|
31
|
use warnings; |
|
7
|
|
|
|
|
7
|
|
|
7
|
|
|
|
|
329
|
|
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
=head1 NAME |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
Gearman::Taskset - a taskset in Gearman, from the point of view of a client |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
=head1 SYNOPSIS |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
use Gearman::Client; |
16
|
|
|
|
|
|
|
my $client = Gearman::Client->new; |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# waiting on a set of tasks in parallel |
19
|
|
|
|
|
|
|
my $ts = $client->new_task_set; |
20
|
|
|
|
|
|
|
$ts->add_task( "add" => "1+2", {...}); |
21
|
|
|
|
|
|
|
$ts->wait(); |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
=head1 DESCRIPTION |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
Gearman::Taskset is a Gearman::Client's representation of tasks queue t in Gearman |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
=head1 METHODS |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
=cut |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
use fields ( |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# { handle => [Task, ...] } |
35
|
7
|
|
|
|
|
39
|
'waiting', |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
# Gearman::Client |
38
|
|
|
|
|
|
|
'client', |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# arrayref |
41
|
|
|
|
|
|
|
'need_handle', |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# default socket (non-merged requests) |
44
|
|
|
|
|
|
|
'default_sock', |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# default socket's ip/port |
47
|
|
|
|
|
|
|
'default_sockaddr', |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
# { hostport => socket } |
50
|
|
|
|
|
|
|
'loaned_sock', |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# bool, if taskset has been cancelled mid-processing |
53
|
|
|
|
|
|
|
'cancelled', |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
# hookname -> coderef |
56
|
|
|
|
|
|
|
'hooks', |
57
|
7
|
|
|
7
|
|
484
|
); |
|
7
|
|
|
|
|
1230
|
|
58
|
|
|
|
|
|
|
|
59
|
7
|
|
|
7
|
|
590
|
use Carp (); |
|
7
|
|
|
|
|
12
|
|
|
7
|
|
|
|
|
91
|
|
60
|
7
|
|
|
7
|
|
1027
|
use Gearman::Util (); |
|
7
|
|
|
|
|
10
|
|
|
7
|
|
|
|
|
144
|
|
61
|
7
|
|
|
7
|
|
3171
|
use Gearman::ResponseParser::Taskset; |
|
7
|
|
|
|
|
15
|
|
|
7
|
|
|
|
|
188
|
|
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
# i thought about weakening taskset's client, but might be too weak. |
64
|
7
|
|
|
7
|
|
37
|
use Scalar::Util (); |
|
7
|
|
|
|
|
10
|
|
|
7
|
|
|
|
|
192
|
|
65
|
7
|
|
|
7
|
|
594
|
use Socket (); |
|
7
|
|
|
|
|
2961
|
|
|
7
|
|
|
|
|
112
|
|
66
|
7
|
|
|
7
|
|
26
|
use Time::HiRes (); |
|
7
|
|
|
|
|
10
|
|
|
7
|
|
|
|
|
15197
|
|
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=head2 new($client) |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
=cut |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
sub new { |
73
|
5
|
|
|
5
|
1
|
2553
|
my ($self, $client) = @_; |
74
|
5
|
100
|
66
|
|
|
92
|
(Scalar::Util::blessed($client) && $client->isa("Gearman::Client")) |
75
|
|
|
|
|
|
|
|| Carp::croak |
76
|
|
|
|
|
|
|
"provided client argument is not a Gearman::Client reference"; |
77
|
|
|
|
|
|
|
|
78
|
4
|
50
|
|
|
|
16
|
unless (ref $self) { |
79
|
4
|
|
|
|
|
15
|
$self = fields::new($self); |
80
|
|
|
|
|
|
|
} |
81
|
|
|
|
|
|
|
|
82
|
4
|
|
|
|
|
308
|
$self->{waiting} = {}; |
83
|
4
|
|
|
|
|
11
|
$self->{need_handle} = []; |
84
|
4
|
|
|
|
|
9
|
$self->{client} = $client; |
85
|
4
|
|
|
|
|
8
|
$self->{loaned_sock} = {}; |
86
|
4
|
|
|
|
|
8
|
$self->{cancelled} = 0; |
87
|
4
|
|
|
|
|
9
|
$self->{hooks} = {}; |
88
|
|
|
|
|
|
|
|
89
|
4
|
|
|
|
|
10
|
return $self; |
90
|
|
|
|
|
|
|
} ## end sub new |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
sub DESTROY { |
93
|
3
|
|
|
3
|
|
1035
|
my $self = shift; |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
# During global cleanup this may be called out of order, and the client my not exist in the taskset. |
96
|
3
|
50
|
|
|
|
19
|
return unless $self->{client}; |
97
|
|
|
|
|
|
|
|
98
|
3
|
50
|
|
|
|
16
|
if ($self->{default_sock}) { |
99
|
|
|
|
|
|
|
$self->client->_sock_cache($self->{default_sockaddr}, |
100
|
0
|
|
|
|
|
0
|
$self->{default_sock}); |
101
|
|
|
|
|
|
|
} |
102
|
|
|
|
|
|
|
|
103
|
3
|
|
|
|
|
8
|
while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) { |
|
3
|
|
|
|
|
134
|
|
104
|
0
|
|
|
|
|
0
|
$self->client->_sock_cache($hp, $sock); |
105
|
|
|
|
|
|
|
} |
106
|
|
|
|
|
|
|
} ## end sub DESTROY |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=head2 run_hook($name) |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
run a hook callback if defined |
111
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=cut |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
sub run_hook { |
115
|
3
|
|
|
3
|
1
|
14
|
my ($self, $name) = (shift, shift); |
116
|
3
|
100
|
33
|
|
|
28
|
($name && $self->{hooks}->{$name}) || return; |
117
|
|
|
|
|
|
|
|
118
|
1
|
|
|
|
|
4
|
eval { $self->{hooks}->{$name}->(@_) }; |
|
1
|
|
|
|
|
7
|
|
119
|
|
|
|
|
|
|
|
120
|
1
|
50
|
|
|
|
12
|
warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@; |
121
|
|
|
|
|
|
|
} ## end sub run_hook |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
=head2 add_hook($name, [$cb]) |
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
add a hook |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
=cut |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
sub add_hook { |
130
|
2
|
|
|
2
|
1
|
1482
|
my ($self, $name, $cb) = @_; |
131
|
2
|
50
|
|
|
|
12
|
$name || return; |
132
|
|
|
|
|
|
|
|
133
|
2
|
100
|
|
|
|
9
|
if ($cb) { |
134
|
1
|
|
|
|
|
15
|
$self->{hooks}->{$name} = $cb; |
135
|
|
|
|
|
|
|
} |
136
|
|
|
|
|
|
|
else { |
137
|
1
|
|
|
|
|
10
|
delete $self->{hooks}->{$name}; |
138
|
|
|
|
|
|
|
} |
139
|
|
|
|
|
|
|
} ## end sub add_hook |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
=head2 client () |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
this method is part of the "Taskset" interface, also implemented by |
144
|
|
|
|
|
|
|
Gearman::Client::Async, where no tasksets make sense, so instead the |
145
|
|
|
|
|
|
|
Gearman::Client::Async object itself is also its taskset. (the |
146
|
|
|
|
|
|
|
client tracks all tasks). so don't change this, without being aware |
147
|
|
|
|
|
|
|
of Gearman::Client::Async. similarly, don't access $ts->{client} without |
148
|
|
|
|
|
|
|
going via this accessor. |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
=cut |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
sub client { |
153
|
19
|
|
|
19
|
1
|
12070
|
return shift->{client}; |
154
|
|
|
|
|
|
|
} |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
=head2 cancel() |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
Close sockets, cleanup internals. |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
=cut |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
sub cancel { |
163
|
1
|
|
|
1
|
1
|
1889
|
my $self = shift; |
164
|
|
|
|
|
|
|
|
165
|
1
|
|
|
|
|
3
|
$self->{cancelled} = 1; |
166
|
|
|
|
|
|
|
|
167
|
1
|
50
|
|
|
|
9
|
if ($self->{default_sock}) { |
168
|
1
|
|
|
|
|
6
|
close($self->{default_sock}); |
169
|
1
|
|
|
|
|
5
|
$self->{default_sock} = undef; |
170
|
|
|
|
|
|
|
} |
171
|
|
|
|
|
|
|
|
172
|
1
|
|
|
|
|
12
|
while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) { |
|
2
|
|
|
|
|
50
|
|
173
|
1
|
|
|
|
|
12
|
$sock->close; |
174
|
|
|
|
|
|
|
} |
175
|
|
|
|
|
|
|
|
176
|
1
|
|
|
|
|
6
|
$self->{waiting} = {}; |
177
|
1
|
|
|
|
|
4
|
$self->{need_handle} = []; |
178
|
1
|
|
|
|
|
4
|
$self->{client} = undef; |
179
|
|
|
|
|
|
|
} ## end sub cancel |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
# |
182
|
|
|
|
|
|
|
# _get_loaned_sock($js) |
183
|
|
|
|
|
|
|
# |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
sub _get_loaned_sock { |
186
|
0
|
|
|
0
|
|
0
|
my ($self, $js) = @_; |
187
|
0
|
|
|
|
|
0
|
my $js_str = $self->client()->_js_str($js); |
188
|
|
|
|
|
|
|
|
189
|
0
|
0
|
|
|
|
0
|
if (my $sock = $self->{loaned_sock}{$js_str}) { |
190
|
0
|
0
|
|
|
|
0
|
return $sock if $sock->connected; |
191
|
0
|
|
|
|
|
0
|
delete $self->{loaned_sock}{$js_str}; |
192
|
|
|
|
|
|
|
} |
193
|
|
|
|
|
|
|
|
194
|
0
|
|
|
|
|
0
|
my $sock = $self->client()->_get_js_sock($js); |
195
|
|
|
|
|
|
|
|
196
|
0
|
|
|
|
|
0
|
return $self->{loaned_sock}{$js_str} = $sock; |
197
|
|
|
|
|
|
|
} ## end sub _get_loaned_sock |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
=head2 wait(%opts) |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
event loop for reading in replies |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
=cut |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
sub wait { |
206
|
0
|
|
|
0
|
1
|
0
|
my ($self, %opts) = @_; |
207
|
0
|
|
|
|
|
0
|
my $timeout; |
208
|
0
|
0
|
|
|
|
0
|
if (exists $opts{timeout}) { |
209
|
0
|
|
|
|
|
0
|
$timeout = delete $opts{timeout}; |
210
|
0
|
0
|
|
|
|
0
|
$timeout += Time::HiRes::time() if defined $timeout; |
211
|
|
|
|
|
|
|
} |
212
|
|
|
|
|
|
|
|
213
|
0
|
0
|
|
|
|
0
|
Carp::carp "Unknown options: " |
214
|
|
|
|
|
|
|
. join(',', keys %opts) |
215
|
|
|
|
|
|
|
. " passed to Taskset->wait." |
216
|
|
|
|
|
|
|
if keys %opts; |
217
|
|
|
|
|
|
|
|
218
|
0
|
|
|
|
|
0
|
my %parser; # fd -> Gearman::ResponseParser object |
219
|
|
|
|
|
|
|
|
220
|
0
|
|
|
|
|
0
|
my ($rin, $rout, $eout) = ('', '', ''); |
221
|
0
|
|
|
|
|
0
|
my %watching; |
222
|
|
|
|
|
|
|
|
223
|
0
|
|
|
|
|
0
|
for my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} }) { |
|
0
|
|
|
|
|
0
|
|
224
|
0
|
0
|
|
|
|
0
|
next unless $sock; |
225
|
0
|
0
|
|
|
|
0
|
if (my $fd = $sock->fileno) { |
226
|
0
|
|
|
|
|
0
|
vec($rin, $fd, 1) = 1; |
227
|
0
|
|
|
|
|
0
|
$watching{$fd} = $sock; |
228
|
|
|
|
|
|
|
} |
229
|
|
|
|
|
|
|
} ## end for my $sock ($self->{default_sock...}) |
230
|
|
|
|
|
|
|
|
231
|
0
|
|
0
|
|
|
0
|
while (!$self->{cancelled} && keys %{ $self->{waiting} }) { |
|
0
|
|
|
|
|
0
|
|
232
|
0
|
0
|
|
|
|
0
|
my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5; |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
# TODO drop the eout. |
235
|
0
|
|
|
|
|
0
|
my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left); |
236
|
0
|
0
|
0
|
|
|
0
|
if ($timeout && $time_left <= 0) { |
237
|
0
|
|
|
|
|
0
|
$self->cancel; |
238
|
0
|
|
|
|
|
0
|
return; |
239
|
|
|
|
|
|
|
} |
240
|
0
|
0
|
|
|
|
0
|
next if !$nfound; |
241
|
|
|
|
|
|
|
|
242
|
0
|
|
|
|
|
0
|
foreach my $fd (keys %watching) { |
243
|
0
|
0
|
|
|
|
0
|
next unless vec($rout, $fd, 1); |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
# TODO: deal with error vector |
246
|
0
|
|
|
|
|
0
|
my $sock = $watching{$fd}; |
247
|
0
|
|
0
|
|
|
0
|
my $parser = $parser{$fd} |
248
|
|
|
|
|
|
|
||= Gearman::ResponseParser::Taskset->new( |
249
|
|
|
|
|
|
|
source => $sock, |
250
|
|
|
|
|
|
|
taskset => $self |
251
|
|
|
|
|
|
|
); |
252
|
0
|
|
|
|
|
0
|
eval { $parser->parse_sock($sock); }; |
|
0
|
|
|
|
|
0
|
|
253
|
|
|
|
|
|
|
|
254
|
0
|
0
|
|
|
|
0
|
if ($@) { |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
# TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail. |
257
|
|
|
|
|
|
|
# We're not in an accessible place here, so if all job servers fail we must die to prevent hanging. |
258
|
0
|
|
|
|
|
0
|
Carp::croak("Job server failure: $@"); |
259
|
|
|
|
|
|
|
} ## end if ($@) |
260
|
|
|
|
|
|
|
} ## end foreach my $fd (keys %watching) |
261
|
|
|
|
|
|
|
} ## end while (!$self->{cancelled...}) |
262
|
|
|
|
|
|
|
} ## end sub wait |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=head2 add_task(Gearman::Task) |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr> |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
C<$opts_hr> see L |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
=cut |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
sub add_task { |
273
|
3
|
|
|
3
|
1
|
1400
|
my $self = shift; |
274
|
3
|
|
|
|
|
13
|
my $task = $self->client()->_get_task_from_args(@_); |
275
|
|
|
|
|
|
|
|
276
|
2
|
|
|
|
|
11
|
$task->taskset($self); |
277
|
|
|
|
|
|
|
|
278
|
2
|
|
|
|
|
19
|
$self->run_hook('add_task', $self, $task); |
279
|
|
|
|
|
|
|
|
280
|
2
|
|
|
|
|
3
|
my $jssock = $task->{jssock}; |
281
|
|
|
|
|
|
|
|
282
|
2
|
50
|
|
|
|
22
|
return $task->fail("undefined jssock") unless ($jssock); |
283
|
|
|
|
|
|
|
|
284
|
0
|
|
|
|
|
0
|
my $req = $task->pack_submit_packet($self->client); |
285
|
0
|
|
|
|
|
0
|
my $len = length($req); |
286
|
0
|
|
|
|
|
0
|
my $rv = $jssock->syswrite($req, $len); |
287
|
0
|
|
0
|
|
|
0
|
$rv ||= 0; |
288
|
0
|
0
|
|
|
|
0
|
Carp::croak "Wrote $rv but expected to write $len" unless $rv == $len; |
289
|
|
|
|
|
|
|
|
290
|
0
|
|
|
|
|
0
|
push @{ $self->{need_handle} }, $task; |
|
0
|
|
|
|
|
0
|
|
291
|
0
|
|
|
|
|
0
|
while (@{ $self->{need_handle} }) { |
|
0
|
|
|
|
|
0
|
|
292
|
|
|
|
|
|
|
my $rv |
293
|
|
|
|
|
|
|
= $self->_wait_for_packet($jssock, |
294
|
0
|
|
|
|
|
0
|
$self->{client}->{command_timeout}); |
295
|
0
|
0
|
|
|
|
0
|
if (!$rv) { |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
# ditch it, it failed. |
298
|
|
|
|
|
|
|
# this will resubmit it if it failed. |
299
|
0
|
|
|
|
|
0
|
shift @{ $self->{need_handle} }; |
|
0
|
|
|
|
|
0
|
|
300
|
0
|
0
|
|
|
|
0
|
return $task->fail( |
301
|
|
|
|
|
|
|
join(' ', |
302
|
|
|
|
|
|
|
"no rv on waiting for packet", |
303
|
|
|
|
|
|
|
defined($rv) ? $rv : $!) |
304
|
|
|
|
|
|
|
); |
305
|
|
|
|
|
|
|
} ## end if (!$rv) |
306
|
|
|
|
|
|
|
} ## end while (@{ $self->{need_handle...}}) |
307
|
|
|
|
|
|
|
|
308
|
0
|
|
|
|
|
0
|
return $task->handle; |
309
|
|
|
|
|
|
|
} ## end sub add_task |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
# |
312
|
|
|
|
|
|
|
# _get_default_sock() |
313
|
|
|
|
|
|
|
# used in Gearman::Task->taskset only |
314
|
|
|
|
|
|
|
# |
315
|
|
|
|
|
|
|
sub _get_default_sock { |
316
|
10
|
|
|
10
|
|
8796
|
my $self = shift; |
317
|
10
|
50
|
|
|
|
39
|
return $self->{default_sock} if $self->{default_sock}; |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
my $getter = sub { |
320
|
0
|
|
|
0
|
|
0
|
my $js = shift; |
321
|
|
|
|
|
|
|
return $self->{loaned_sock}{$js} |
322
|
0
|
|
0
|
|
|
0
|
|| $self->{client}->_get_js_sock($js); |
323
|
10
|
|
|
|
|
42
|
}; |
324
|
|
|
|
|
|
|
|
325
|
10
|
|
|
|
|
45
|
my ($js, $jss) = $self->client()->_get_random_js_sock($getter); |
326
|
10
|
50
|
|
|
|
91
|
return unless $jss; |
327
|
|
|
|
|
|
|
|
328
|
0
|
|
|
|
|
0
|
my $js_str = $self->client()->_js_str($js); |
329
|
0
|
|
0
|
|
|
0
|
$self->{loaned_sock}{$js_str} ||= $jss; |
330
|
|
|
|
|
|
|
|
331
|
0
|
|
|
|
|
0
|
$self->{default_sock} = $jss; |
332
|
0
|
|
|
|
|
0
|
$self->{default_sockaddr} = $js_str; |
333
|
|
|
|
|
|
|
|
334
|
0
|
|
|
|
|
0
|
return $jss; |
335
|
|
|
|
|
|
|
} ## end sub _get_default_sock |
336
|
|
|
|
|
|
|
|
337
|
|
|
|
|
|
|
# |
338
|
|
|
|
|
|
|
# _get_hashed_sock($hv) |
339
|
|
|
|
|
|
|
# |
340
|
|
|
|
|
|
|
# only used in Gearman::Task->taskset only |
341
|
|
|
|
|
|
|
# |
342
|
|
|
|
|
|
|
# return a socket |
343
|
|
|
|
|
|
|
sub _get_hashed_sock { |
344
|
2
|
|
|
2
|
|
2
|
my $self = shift; |
345
|
2
|
|
|
|
|
3
|
my $hv = shift; |
346
|
|
|
|
|
|
|
my ($js_count, @job_servers) |
347
|
2
|
|
|
|
|
3
|
= ($self->client()->{js_count}, $self->client()->job_servers()); |
348
|
2
|
|
|
|
|
5
|
my $sock; |
349
|
2
|
|
|
|
|
5
|
for (my $off = 0; $off < $js_count; $off++) { |
350
|
0
|
|
|
|
|
0
|
my $idx = ($hv + $off) % ($js_count); |
351
|
0
|
|
|
|
|
0
|
$sock = $self->_get_loaned_sock($job_servers[$idx]); |
352
|
0
|
|
|
|
|
0
|
last; |
353
|
|
|
|
|
|
|
} |
354
|
|
|
|
|
|
|
|
355
|
2
|
|
|
|
|
5
|
return $sock; |
356
|
|
|
|
|
|
|
} ## end sub _get_hashed_sock |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
# |
359
|
|
|
|
|
|
|
# _wait_for_packet($sock, $timeout) |
360
|
|
|
|
|
|
|
# |
361
|
|
|
|
|
|
|
# $sock socket to singularly read from |
362
|
|
|
|
|
|
|
# |
363
|
|
|
|
|
|
|
# returns boolean when given a sock to wait on. |
364
|
|
|
|
|
|
|
# otherwise, return value is undefined. |
365
|
|
|
|
|
|
|
sub _wait_for_packet { |
366
|
1
|
|
|
1
|
|
1087
|
my ($self, $sock, $timeout) = @_; |
367
|
|
|
|
|
|
|
|
368
|
|
|
|
|
|
|
#TODO check $err after read |
369
|
1
|
|
|
|
|
3
|
my $err; |
370
|
1
|
|
|
|
|
8
|
my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout); |
371
|
|
|
|
|
|
|
|
372
|
0
|
0
|
|
|
|
0
|
return $res ? $self->process_packet($res, $sock) : 0; |
373
|
|
|
|
|
|
|
} ## end sub _wait_for_packet |
374
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
# |
376
|
|
|
|
|
|
|
# _is_port($sock) |
377
|
|
|
|
|
|
|
# |
378
|
|
|
|
|
|
|
# return hostport || ipport |
379
|
|
|
|
|
|
|
# |
380
|
|
|
|
|
|
|
sub _ip_port { |
381
|
1
|
|
|
1
|
|
7
|
my ($self, $sock) = @_; |
382
|
1
|
50
|
|
|
|
9
|
$sock || return; |
383
|
|
|
|
|
|
|
|
384
|
0
|
|
|
|
|
0
|
my $pn = getpeername($sock); |
385
|
0
|
0
|
|
|
|
0
|
$pn || return; |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
# look for a hostport in loaned_sock |
388
|
0
|
|
|
|
|
0
|
my $hostport; |
389
|
0
|
|
|
|
|
0
|
while (my ($hp, $s) = each %{ $self->{loaned_sock} }) { |
|
0
|
|
|
|
|
0
|
|
390
|
0
|
0
|
|
|
|
0
|
$s || next; |
391
|
0
|
0
|
|
|
|
0
|
if ($sock == $s) { |
392
|
0
|
|
|
|
|
0
|
$hostport = $hp; |
393
|
0
|
|
|
|
|
0
|
last; |
394
|
|
|
|
|
|
|
} |
395
|
|
|
|
|
|
|
} ## end while (my ($hp, $s) = each...) |
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
# hopefully it solves client->get_status mismatch |
398
|
0
|
0
|
|
|
|
0
|
$hostport && return $hostport; |
399
|
|
|
|
|
|
|
|
400
|
0
|
|
|
|
|
0
|
my $fam = Socket::sockaddr_family($pn); |
401
|
0
|
0
|
|
|
|
0
|
my ($port, $iaddr) |
402
|
|
|
|
|
|
|
= ($fam == Socket::AF_INET6) |
403
|
|
|
|
|
|
|
? Socket::sockaddr_in6($pn) |
404
|
|
|
|
|
|
|
: Socket::sockaddr_in($pn); |
405
|
|
|
|
|
|
|
|
406
|
0
|
|
|
|
|
0
|
my $addr = Socket::inet_ntop($fam, $iaddr); |
407
|
|
|
|
|
|
|
|
408
|
0
|
|
|
|
|
0
|
return join ':', $addr, $port; |
409
|
|
|
|
|
|
|
} ## end sub _ip_port |
410
|
|
|
|
|
|
|
|
411
|
|
|
|
|
|
|
# |
412
|
|
|
|
|
|
|
# _fail_jshandle($shandle) |
413
|
|
|
|
|
|
|
# |
414
|
|
|
|
|
|
|
# note the failure of a task given by its jobserver-specific handle |
415
|
|
|
|
|
|
|
# |
416
|
|
|
|
|
|
|
sub _fail_jshandle { |
417
|
4
|
|
|
4
|
|
2231
|
my ($self, $shandle) = @_; |
418
|
4
|
100
|
|
|
|
48
|
$shandle |
419
|
|
|
|
|
|
|
or Carp::croak "_fail_jshandle() called without shandle parameter"; |
420
|
|
|
|
|
|
|
|
421
|
3
|
50
|
|
|
|
130
|
my $task_list = $self->{waiting}{$shandle} |
422
|
|
|
|
|
|
|
or Carp::croak "Uhhhh: got work_fail for unknown handle: $shandle"; |
423
|
|
|
|
|
|
|
|
424
|
0
|
|
|
|
|
0
|
my $task = shift @$task_list; |
425
|
0
|
0
|
0
|
|
|
0
|
($task && ref($task) eq "Gearman::Task") |
426
|
|
|
|
|
|
|
or Carp::croak |
427
|
|
|
|
|
|
|
"Uhhhh: task_list is empty on work_fail for handle $shandle\n"; |
428
|
|
|
|
|
|
|
|
429
|
0
|
|
|
|
|
0
|
$task->fail("jshandle fail"); |
430
|
0
|
0
|
|
|
|
0
|
delete $self->{waiting}{$shandle} unless @$task_list; |
431
|
|
|
|
|
|
|
} ## end sub _fail_jshandle |
432
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
=head2 process_packet($res, $sock) |
434
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
=cut |
436
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
sub process_packet { |
438
|
9
|
|
|
9
|
1
|
21
|
my ($self, $res, $sock) = @_; |
439
|
|
|
|
|
|
|
|
440
|
9
|
100
|
|
|
|
37
|
if ($res->{type} eq "job_created") { |
441
|
2
|
|
|
|
|
2
|
my $task = shift @{ $self->{need_handle} }; |
|
2
|
|
|
|
|
9
|
|
442
|
2
|
100
|
66
|
|
|
40
|
($task && ref($task) eq "Gearman::Task") |
443
|
|
|
|
|
|
|
or Carp::croak "Um, got an unexpected job_created notification"; |
444
|
1
|
|
|
|
|
6
|
my $shandle = ${ $res->{'blobref'} }; |
|
1
|
|
|
|
|
8
|
|
445
|
1
|
|
|
|
|
10
|
my $ipport = $self->_ip_port($sock); |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
# did sock become disconnected in the meantime? |
448
|
1
|
50
|
|
|
|
9
|
if (!$ipport) { |
449
|
1
|
|
|
|
|
9
|
$self->_fail_jshandle($shandle); |
450
|
0
|
|
|
|
|
0
|
return 1; |
451
|
|
|
|
|
|
|
} |
452
|
|
|
|
|
|
|
|
453
|
0
|
|
|
|
|
0
|
$task->handle("$ipport//$shandle"); |
454
|
0
|
0
|
|
|
|
0
|
return 1 if $task->{background}; |
455
|
0
|
|
0
|
|
|
0
|
push @{ $self->{waiting}{$shandle} ||= [] }, $task; |
|
0
|
|
|
|
|
0
|
|
456
|
0
|
|
|
|
|
0
|
return 1; |
457
|
|
|
|
|
|
|
} ## end if ($res->{type} eq "job_created") |
458
|
|
|
|
|
|
|
|
459
|
7
|
100
|
|
|
|
34
|
if ($res->{type} eq "work_fail") { |
460
|
1
|
|
|
|
|
3
|
my $shandle = ${ $res->{'blobref'} }; |
|
1
|
|
|
|
|
4
|
|
461
|
1
|
|
|
|
|
5
|
$self->_fail_jshandle($shandle); |
462
|
0
|
|
|
|
|
0
|
return 1; |
463
|
|
|
|
|
|
|
} |
464
|
|
|
|
|
|
|
|
465
|
6
|
|
|
|
|
21
|
my $qr = qr/(.+?)\0/; |
466
|
|
|
|
|
|
|
|
467
|
6
|
100
|
|
|
|
20
|
if ($res->{type} eq "work_complete") { |
468
|
2
|
100
|
|
|
|
2
|
(${ $res->{'blobref'} } =~ /^$qr/) |
|
2
|
|
|
|
|
134
|
|
469
|
|
|
|
|
|
|
or Carp::croak "Bogus work_complete from server"; |
470
|
1
|
|
|
|
|
2
|
${ $res->{'blobref'} } =~ s/^$qr//; |
|
1
|
|
|
|
|
48
|
|
471
|
1
|
|
|
|
|
8
|
my $shandle = $1; |
472
|
|
|
|
|
|
|
|
473
|
1
|
50
|
|
|
|
41
|
my $task_list = $self->{waiting}{$shandle} |
474
|
|
|
|
|
|
|
or Carp::croak |
475
|
|
|
|
|
|
|
"Uhhhh: got work_complete for unknown handle: $shandle\n"; |
476
|
|
|
|
|
|
|
|
477
|
0
|
|
|
|
|
0
|
my $task = shift @$task_list; |
478
|
0
|
0
|
0
|
|
|
0
|
($task && ref($task) eq "Gearman::Task") |
479
|
|
|
|
|
|
|
or Carp::croak |
480
|
|
|
|
|
|
|
"Uhhhh: task_list is empty on work_complete for handle $shandle\n"; |
481
|
|
|
|
|
|
|
|
482
|
0
|
|
|
|
|
0
|
$task->complete($res->{'blobref'}); |
483
|
0
|
0
|
|
|
|
0
|
delete $self->{waiting}{$shandle} unless @$task_list; |
484
|
|
|
|
|
|
|
|
485
|
0
|
|
|
|
|
0
|
return 1; |
486
|
|
|
|
|
|
|
} ## end if ($res->{type} eq "work_complete") |
487
|
|
|
|
|
|
|
|
488
|
4
|
100
|
|
|
|
16
|
if ($res->{type} eq "work_exception") { |
489
|
|
|
|
|
|
|
|
490
|
|
|
|
|
|
|
# ${ $res->{'blobref'} } =~ s/^(.+?)\0// |
491
|
|
|
|
|
|
|
# or Carp::croak "Bogus work_exception from server"; |
492
|
|
|
|
|
|
|
|
493
|
2
|
100
|
|
|
|
5
|
(${ $res->{'blobref'} } =~ /^$qr/) |
|
2
|
|
|
|
|
81
|
|
494
|
|
|
|
|
|
|
or Carp::croak "Bogus work_exception from server"; |
495
|
1
|
|
|
|
|
4
|
${ $res->{'blobref'} } =~ s/^$qr//; |
|
1
|
|
|
|
|
38
|
|
496
|
1
|
|
|
|
|
7
|
my $shandle = $1; |
497
|
|
|
|
|
|
|
|
498
|
1
|
50
|
|
|
|
33
|
my $task_list = $self->{waiting}{$shandle} |
499
|
|
|
|
|
|
|
or Carp::croak |
500
|
|
|
|
|
|
|
"Uhhhh: got work_exception for unknown handle: $shandle\n"; |
501
|
|
|
|
|
|
|
|
502
|
0
|
|
|
|
|
0
|
my $task = $task_list->[0]; |
503
|
0
|
0
|
0
|
|
|
0
|
($task && ref($task) eq "Gearman::Task") |
504
|
|
|
|
|
|
|
or Carp::croak |
505
|
|
|
|
|
|
|
"Uhhhh: task_list is empty on work_exception for handle $shandle\n"; |
506
|
|
|
|
|
|
|
|
507
|
0
|
|
|
|
|
0
|
$task->exception($res->{'blobref'}); |
508
|
|
|
|
|
|
|
|
509
|
0
|
|
|
|
|
0
|
return 1; |
510
|
|
|
|
|
|
|
} ## end if ($res->{type} eq "work_exception") |
511
|
|
|
|
|
|
|
|
512
|
2
|
100
|
|
|
|
10
|
if ($res->{type} eq "work_status") { |
513
|
1
|
|
|
|
|
4
|
my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} }); |
|
1
|
|
|
|
|
5
|
|
514
|
|
|
|
|
|
|
|
515
|
1
|
50
|
|
|
|
32
|
my $task_list = $self->{waiting}{$shandle} |
516
|
|
|
|
|
|
|
or Carp::croak |
517
|
|
|
|
|
|
|
"Uhhhh: got work_status for unknown handle: $shandle\n"; |
518
|
|
|
|
|
|
|
|
519
|
|
|
|
|
|
|
# FIXME: the server is (probably) sending a work_status packet for each |
520
|
|
|
|
|
|
|
# interested client, even if the clients are the same, so probably need |
521
|
|
|
|
|
|
|
# to fix the server not to do that. just put this FIXME here for now, |
522
|
|
|
|
|
|
|
# though really it's a server issue. |
523
|
0
|
|
|
|
|
0
|
foreach my $task (@$task_list) { |
524
|
0
|
|
|
|
|
0
|
$task->status($nu, $de); |
525
|
|
|
|
|
|
|
} |
526
|
|
|
|
|
|
|
|
527
|
0
|
|
|
|
|
0
|
return 1; |
528
|
|
|
|
|
|
|
} ## end if ($res->{type} eq "work_status") |
529
|
|
|
|
|
|
|
|
530
|
|
|
|
|
|
|
Carp::croak |
531
|
1
|
|
|
|
|
7
|
"Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]"; |
|
1
|
|
|
|
|
25
|
|
532
|
|
|
|
|
|
|
} ## end sub process_packet |
533
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
1; |