line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Gearman::Taskset; |
2
|
7
|
|
|
7
|
|
1919
|
use version (); |
|
7
|
|
|
|
|
2701
|
|
|
7
|
|
|
|
|
221
|
|
3
|
|
|
|
|
|
|
$Gearman::Taskset::VERSION = version->declare("2.003_001"); |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
|
6
|
7
|
|
|
7
|
|
26
|
use strict; |
|
7
|
|
|
|
|
6
|
|
|
7
|
|
|
|
|
113
|
|
7
|
7
|
|
|
7
|
|
20
|
use warnings; |
|
7
|
|
|
|
|
11
|
|
|
7
|
|
|
|
|
241
|
|
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
=head1 NAME |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
Gearman::Taskset - a taskset in Gearman, from the point of view of a client |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
=head1 SYNOPSIS |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
use Gearman::Client; |
16
|
|
|
|
|
|
|
my $client = Gearman::Client->new; |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# waiting on a set of tasks in parallel |
19
|
|
|
|
|
|
|
my $ts = $client->new_task_set; |
20
|
|
|
|
|
|
|
$ts->add_task( "add" => "1+2", {...}); |
21
|
|
|
|
|
|
|
$ts->wait(); |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
=head1 DESCRIPTION |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
Gearman::Taskset is a Gearman::Client's representation of a queue of tasks in Gearman |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
=head1 METHODS |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
=cut |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
use fields ( |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# { handle => [Task, ...] } |
35
|
7
|
|
|
|
|
34
|
'waiting', |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
# Gearman::Client |
38
|
|
|
|
|
|
|
'client', |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# arrayref |
41
|
|
|
|
|
|
|
'need_handle', |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# default socket (non-merged requests) |
44
|
|
|
|
|
|
|
'default_sock', |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# default socket's ip/port |
47
|
|
|
|
|
|
|
'default_sockaddr', |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
# { hostport => socket } |
50
|
|
|
|
|
|
|
'loaned_sock', |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# bool, if taskset has been cancelled mid-processing |
53
|
|
|
|
|
|
|
'cancelled', |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
# hookname -> coderef |
56
|
|
|
|
|
|
|
'hooks', |
57
|
7
|
|
|
7
|
|
435
|
); |
|
7
|
|
|
|
|
1099
|
|
58
|
|
|
|
|
|
|
|
59
|
7
|
|
|
7
|
|
480
|
use Carp (); |
|
7
|
|
|
|
|
7
|
|
|
7
|
|
|
|
|
67
|
|
60
|
7
|
|
|
7
|
|
714
|
use Gearman::Util (); |
|
7
|
|
|
|
|
8
|
|
|
7
|
|
|
|
|
105
|
|
61
|
7
|
|
|
7
|
|
2422
|
use Gearman::ResponseParser::Taskset; |
|
7
|
|
|
|
|
11
|
|
|
7
|
|
|
|
|
138
|
|
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
# i thought about weakening taskset's client, but might be too weak. |
64
|
7
|
|
|
7
|
|
23
|
use Scalar::Util (); |
|
7
|
|
|
|
|
8
|
|
|
7
|
|
|
|
|
67
|
|
65
|
7
|
|
|
7
|
|
476
|
use Socket (); |
|
7
|
|
|
|
|
2567
|
|
|
7
|
|
|
|
|
77
|
|
66
|
7
|
|
|
7
|
|
520
|
use Storable (); |
|
7
|
|
|
|
|
2073
|
|
|
7
|
|
|
|
|
76
|
|
67
|
7
|
|
|
7
|
|
22
|
use Time::HiRes (); |
|
7
|
|
|
|
|
5
|
|
|
7
|
|
|
|
|
12824
|
|
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
=head2 new($client) |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
=cut |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
sub new {
    # Constructor. May be invoked on a class name (a new fields-based object
    # is allocated) or on an existing reference (which is re-initialised).
    # Croaks unless $client is a blessed Gearman::Client (or subclass).
    my ($self, $client) = @_;

    unless (Scalar::Util::blessed($client)
        && $client->isa("Gearman::Client"))
    {
        Carp::croak(
            "provided client argument is not a Gearman::Client reference");
    }

    $self = fields::new($self) if !ref $self;

    # the owning client
    $self->{client} = $client;

    # bookkeeping containers, all start out empty
    $self->{hooks}       = {};
    $self->{loaned_sock} = {};
    $self->{need_handle} = [];
    $self->{waiting}     = {};

    # not cancelled yet
    $self->{cancelled} = 0;

    return $self;
} ## end sub new
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
sub DESTROY {
    # Hand every socket this taskset borrowed back to the client's socket
    # cache (via the client's _sock_cache method).
    my $self = shift;

    # During global cleanup this may be called out of order, and the client
    # may not exist in the taskset any more.
    return unless $self->{client};

    if ($self->{default_sock}) {
        $self->client->_sock_cache($self->{default_sockaddr},
            $self->{default_sock});
    }

    # Iterate with keys() rather than each(): each() shares one iterator per
    # hash, and _ip_port() may have left it part-way through loaned_sock by
    # leaving its loop early, which would make an each() loop here skip
    # entries. keys() always resets the iterator and yields every key.
    for my $hp (keys %{ $self->{loaned_sock} }) {
        $self->client->_sock_cache($hp, $self->{loaned_sock}{$hp});
    }

    return;
} ## end sub DESTROY
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
=head2 run_hook($name) |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
run a hook callback if defined |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
=cut |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
sub run_hook {
    # Invoke the callback registered under $name (if any) with the remaining
    # arguments. An exception thrown by the callback is caught and reported
    # with warn(); it never propagates to the caller.
    my $self = shift;
    my $name = shift;

    return unless $name;

    my $hook = $self->{hooks}->{$name};
    return unless $hook;

    eval { $hook->(@_) };

    warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
} ## end sub run_hook
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
=head2 add_hook($name, [$cb]) |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
add a hook |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=cut |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
sub add_hook {
    # Register $cb under $name, or remove the hook called $name when no
    # (true) callback is supplied. Does nothing without a name.
    my ($self, $name, $cb) = @_;

    return unless $name;

    # a true callback installs the hook ...
    return $self->{hooks}->{$name} = $cb if $cb;

    # ... anything else removes it
    return delete $self->{hooks}->{$name};
} ## end sub add_hook
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
=head2 client () |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
this method is part of the "Taskset" interface, also implemented by |
145
|
|
|
|
|
|
|
Gearman::Client::Async, where no tasksets make sense, so instead the |
146
|
|
|
|
|
|
|
Gearman::Client::Async object itself is also its taskset. (the |
147
|
|
|
|
|
|
|
client tracks all tasks). so don't change this, without being aware |
148
|
|
|
|
|
|
|
of Gearman::Client::Async. similarly, don't access $ts->{client} without |
149
|
|
|
|
|
|
|
going via this accessor. |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
=cut |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
sub client {
    # Accessor: the client object stored in this taskset.
    my ($self) = @_;
    return $self->{client};
}
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
=head2 cancel() |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
Close sockets, cleanup internals. |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
=cut |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
sub cancel {
    # Mark the taskset as cancelled, close every socket it holds and drop
    # all pending bookkeeping (waiting tasks, tasks needing a handle, and
    # the client reference).
    my $self = shift;

    $self->{cancelled} = 1;

    if ($self->{default_sock}) {
        close($self->{default_sock});
        $self->{default_sock} = undef;
    }

    # values() instead of each(): each() shares one iterator per hash and
    # _ip_port() can leave it part-way through loaned_sock, which would make
    # an each() loop here miss sockets.
    for my $sock (values %{ $self->{loaned_sock} }) {

        # _get_loaned_sock() stores whatever the client returned, which can
        # be undef for an unreachable server; there is nothing to close then.
        next unless $sock;
        $sock->close;
    }

    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{client}      = undef;
} ## end sub cancel
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
# |
183
|
|
|
|
|
|
|
# _get_loaned_sock($js) |
184
|
|
|
|
|
|
|
# |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
sub _get_loaned_sock {
    # Return the socket for job server $js. A socket already loaned to this
    # taskset is reused while it reports itself connected; otherwise a new
    # one is requested from the client and remembered in loaned_sock.
    my ($self, $js) = @_;
    my $key = $self->client()->_js_str($js);

    my $cached = $self->{loaned_sock}{$key};
    if ($cached) {
        return $cached if $cached->connected;

        # stale entry: forget it and fall through to a fresh socket
        delete $self->{loaned_sock}{$key};
    }

    return $self->{loaned_sock}{$key} = $self->client()->_get_js_sock($js);
} ## end sub _get_loaned_sock
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=head2 wait(%opts) |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
event loop for reading in replies |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=cut |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
sub wait {
    # Event loop for reading replies. Runs until no task is left in
    # $self->{waiting}, the taskset is cancelled, or the optional timeout
    # expires.
    #
    # Options:
    #   timeout => seconds   overall deadline; on expiry the taskset is
    #                        cancelled (see cancel) and wait returns
    #
    # Croaks with "Job server failure: ..." when parsing a reply from a
    # job server throws.
    my ($self, %opts) = @_;

    # turn the relative timeout into an absolute deadline
    my $timeout;
    if (exists $opts{timeout}) {
        $timeout = delete $opts{timeout};
        $timeout += Time::HiRes::time() if defined $timeout;
    }

    Carp::carp "Unknown options: "
        . join(',', keys %opts)
        . " passed to Taskset->wait."
        if keys %opts;

    my %parser;    # fd -> Gearman::ResponseParser object

    my ($rin, $rout, $eout) = ('', '', '');
    my %watching;    # fd -> socket

    for my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} }) {
        next unless $sock;

        # fileno() yields undef for a closed handle, but 0 is a perfectly
        # valid descriptor (e.g. the first socket opened after a daemon
        # closed STDIN), so test definedness, not truth.
        my $fd = $sock->fileno;
        next unless defined $fd;

        vec($rin, $fd, 1) = 1;
        $watching{$fd} = $sock;
    } ## end for my $sock ($self->{default_sock...})

    while (!$self->{cancelled} && keys %{ $self->{waiting} }) {

        # without a deadline, poll in half-second slices
        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;

        # TODO drop the eout.
        my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left);
        if ($timeout && $time_left <= 0) {
            $self->cancel;
            return;
        }
        next if !$nfound;

        foreach my $fd (keys %watching) {
            next unless vec($rout, $fd, 1);

            # TODO: deal with error vector
            my $sock   = $watching{$fd};
            my $parser = $parser{$fd}
                ||= Gearman::ResponseParser::Taskset->new(
                source  => $sock,
                taskset => $self
                );
            eval { $parser->parse_sock($sock); };

            if ($@) {

                # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
                # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
                Carp::croak("Job server failure: $@");
            } ## end if ($@)
        } ## end foreach my $fd (keys %watching)
    } ## end while (!$self->{cancelled...})
} ## end sub wait
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
=head2 add_task(Gearman::Task) |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>) |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
C<$opts_hr> see L<Gearman::Task> |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=cut |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
sub add_task {
    # Build a task from the arguments (via the client), submit it on the
    # task's job server socket and wait until the server has acknowledged
    # every task still lacking a handle.
    #
    # Returns the task's handle on success, otherwise the return value of
    # $task->fail(...). Croaks when the submit packet could not be written
    # completely.
    my $self = shift;
    my $task = $self->client()->_get_task_from_args(@_);

    $task->taskset($self);

    $self->run_hook('add_task', $self, $task);

    my $jssock = $task->{jssock};

    return $task->fail("undefined jssock") unless ($jssock);

    my $req     = $task->pack_submit_packet($self->client);
    my $len     = length($req);
    my $written = $jssock->syswrite($req, $len) || 0;
    Carp::croak "Wrote $written but expected to write $len"
        unless $written == $len;

    push @{ $self->{need_handle} }, $task;
    while (@{ $self->{need_handle} }) {

        # The client is reached through the client() accessor, as this
        # module's documentation asks, instead of $self->{client} directly.
        my $rv = $self->_wait_for_packet($jssock,
            $self->client()->{command_timeout});
        next if $rv;

        # ditch it, it failed.
        # this will resubmit it if it failed.
        shift @{ $self->{need_handle} };
        return $task->fail(
            join(' ',
                "no rv on waiting for packet",
                defined($rv) ? $rv : $!)
        );
    } ## end while (@{ $self->{need_handle...}})

    return $task->handle;
} ## end sub add_task
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
# |
313
|
|
|
|
|
|
|
# _get_default_sock() |
314
|
|
|
|
|
|
|
# used in Gearman::Task->taskset only |
315
|
|
|
|
|
|
|
# |
316
|
|
|
|
|
|
|
sub _get_default_sock {
    # Return the taskset's default socket, choosing one through the client's
    # _get_random_js_sock on first use. The chosen socket is remembered as
    # default_sock / default_sockaddr and registered in loaned_sock.
    # Returns nothing when no socket could be obtained.
    my $self = shift;
    return $self->{default_sock} if $self->{default_sock};

    my $client = $self->client();

    # Prefer a socket this taskset already holds. loaned_sock is keyed by
    # the client's _js_str() form of the job server everywhere else in this
    # module, so the lookup must use that same key rather than the raw $js.
    my $getter = sub {
        my $js = shift;
        return $self->{loaned_sock}{ $client->_js_str($js) }
            || $client->_get_js_sock($js);
    };

    my ($js, $jss) = $client->_get_random_js_sock($getter);
    return unless $jss;

    my $js_str = $client->_js_str($js);
    $self->{loaned_sock}{$js_str} ||= $jss;

    $self->{default_sock}     = $jss;
    $self->{default_sockaddr} = $js_str;

    return $jss;
} ## end sub _get_default_sock
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
# |
339
|
|
|
|
|
|
|
# _get_hashed_sock($hv) |
340
|
|
|
|
|
|
|
# |
341
|
|
|
|
|
|
|
# used in Gearman::Task->taskset only |
342
|
|
|
|
|
|
|
# |
343
|
|
|
|
|
|
|
# return a socket |
344
|
|
|
|
|
|
|
sub _get_hashed_sock {
    # Return a socket for the job server selected by hash value $hv.
    # Starting at index $hv modulo the number of job servers, each server is
    # tried in turn until one yields a socket. Returns undef when none does
    # (or when the client has no job servers).
    my ($self, $hv) = @_;

    my $client      = $self->client();
    my $js_count    = $client->{js_count};
    my @job_servers = $client->job_servers();

    my $sock;
    for (my $off = 0; $off < $js_count; $off++) {
        my $idx = ($hv + $off) % ($js_count);
        $sock = $self->_get_loaned_sock($job_servers[$idx]);

        # Stop only once a socket was actually obtained; otherwise fall
        # over to the next job server instead of giving up after one try.
        last if $sock;
    }

    return $sock;
} ## end sub _get_hashed_sock
358
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
# |
360
|
|
|
|
|
|
|
# _wait_for_packet($sock, $timeout) |
361
|
|
|
|
|
|
|
# |
362
|
|
|
|
|
|
|
# $sock socket to singularly read from |
363
|
|
|
|
|
|
|
# |
364
|
|
|
|
|
|
|
# returns boolean when given a sock to wait on. |
365
|
|
|
|
|
|
|
# otherwise, return value is undefined. |
366
|
|
|
|
|
|
|
sub _wait_for_packet {
    # Read one response packet from $sock (waiting at most $timeout) and
    # pass it to process_packet. Returns process_packet's result, or 0 when
    # no packet could be read.
    my ($self, $sock, $timeout) = @_;

    #TODO check $err after read
    my $err;
    my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout);

    return 0 unless $res;

    return $self->process_packet($res, $sock);
} ## end sub _wait_for_packet
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
# |
377
|
|
|
|
|
|
|
# _ip_port($sock) |
378
|
|
|
|
|
|
|
# |
379
|
|
|
|
|
|
|
# return hostport || ipport |
380
|
|
|
|
|
|
|
# |
381
|
|
|
|
|
|
|
sub _ip_port {
    # Return the "host:port" identifying the peer of $sock.
    # If $sock is one of the sockets in loaned_sock, the key it is stored
    # under is returned; otherwise "address:port" is derived from the
    # socket's peer name. Returns nothing when $sock is false or has no
    # peer name (e.g. it is no longer connected).
    my ($self, $sock) = @_;
    $sock || return;

    my $pn = getpeername($sock);
    $pn || return;

    # look for a hostport in loaned_sock
    #
    # Iterate over keys() instead of each(): leaving an each() loop early
    # keeps the hash's single shared iterator positioned mid-hash, so the
    # next each() loop over loaned_sock (here or in another method) would
    # start in the middle and could miss its entry.
    my $hostport;
    for my $hp (keys %{ $self->{loaned_sock} }) {
        my $s = $self->{loaned_sock}{$hp};
        $s || next;
        if ($sock == $s) {
            $hostport = $hp;
            last;
        }
    } ## end for my $hp (keys %{ $self...})

    # hopefully it solves client->get_status mismatch
    $hostport && return $hostport;

    # not a loaned socket: decode the peer address ourselves
    my $fam = Socket::sockaddr_family($pn);
    my ($port, $iaddr)
        = ($fam == Socket::AF_INET6())
        ? Socket::sockaddr_in6($pn)
        : Socket::sockaddr_in($pn);

    my $addr = Socket::inet_ntop($fam, $iaddr);

    return join ':', $addr, $port;
} ## end sub _ip_port
411
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
# |
413
|
|
|
|
|
|
|
# _fail_jshandle($shandle, $type, [$message]) |
414
|
|
|
|
|
|
|
# |
415
|
|
|
|
|
|
|
# note the failure of a task given by its jobserver-specific handle |
416
|
|
|
|
|
|
|
# |
417
|
|
|
|
|
|
|
sub _fail_jshandle {
    # Fail the oldest task waiting on job server handle $shandle.
    # $type names the packet type that triggered the failure (used in error
    # messages only); $msg is the failure reason handed to the task and
    # defaults to "jshandle fail". Croaks when $shandle is missing, unknown,
    # or has no Gearman::Task queued.
    my ($self, $shandle, $type, $msg) = @_;

    Carp::croak("_fail_jshandle() called without shandle parameter")
        unless $shandle;

    my $queue = $self->{waiting}{$shandle};
    Carp::croak("Uhhhh: got $type for unknown handle: $shandle")
        unless $queue;

    my $task = shift @{$queue};
    unless (Scalar::Util::blessed($task) && $task->isa("Gearman::Task")) {
        Carp::croak(
            "Uhhhh: task_list is empty on $type for handle $shandle\n");
    }

    $task->fail($msg || "jshandle fail");

    # nobody left waiting on this handle: forget it
    delete $self->{waiting}{$shandle} unless @{$queue};
} ## end sub _fail_jshandle
434
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
=head2 process_packet($res, $sock) |
436
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
=cut |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
sub process_packet {
    # Dispatch one response packet from a job server.
    #
    #   $res   hashref with the packet's 'type' and 'blobref' (reference to
    #          the payload)
    #   $sock  the socket the packet arrived on
    #
    # Returns the handler's result (1 for every implemented type). Croaks on
    # unimplemented packet types, malformed payloads and packets referring
    # to handles with no task queued.
    my ($self, $res, $sock) = @_;

    # croak with $msg unless $task is a Gearman::Task
    my $require_task = sub {
        my ($task, $msg) = @_;
        (Scalar::Util::blessed($task) && $task->isa("Gearman::Task"))
            || Carp::croak($msg);
    };

    # strip the leading "<handle>\0" from the payload and return the handle
    my $take_handle = sub {
        my ($blobref, $kind) = @_;
        ${$blobref} =~ s/^(.+?)\0//
            or Carp::croak("Bogus $kind from server");
        return $1;
    };

    # Return the oldest task waiting on $shandle without dequeuing it.
    # The list is copied into a lexical first so that looking at an unknown
    # handle never creates an entry in $self->{waiting}.
    my $peek_task = sub {
        my ($shandle, $kind) = @_;
        my $task_list = $self->{waiting}{$shandle};
        my $task      = $task_list->[0];
        $require_task->(
            $task, "Uhhhh: task_list is empty on $kind for handle $shandle"
        );
        return $task;
    };

    my %handler_for = (

        # the server acknowledged a submitted job; payload is its handle
        job_created => sub {
            my ($shandle) = @_;
            my $task = shift @{ $self->{need_handle} };
            $require_task->(
                $task, "Um, got an unexpected job_created notification"
            );

            my $ipport = $self->_ip_port($sock);

            # did sock become disconnected in the meantime?
            unless ($ipport) {
                $self->_fail_jshandle($shandle, "job_created");
                return 1;
            }

            $task->handle("$ipport//$shandle");

            # background tasks are not waited on
            push @{ $self->{waiting}{$shandle} ||= [] }, $task
                unless $task->{background};

            return 1;
        },

        # the job finished; the rest of the payload is its result
        work_complete => sub {
            my ($blob) = @_;
            my $shandle = $take_handle->(\$blob, "work_complete");

            my $task_list = $self->{waiting}{$shandle};
            my $task      = shift @{$task_list};
            $require_task->(
                $task,
                "Uhhhh: task_list is empty on work_complete for handle $shandle"
            );

            $task->complete(\$blob);
            delete $self->{waiting}{$shandle} unless @{$task_list};

            return 1;
        },

        # intermediate data from a running job
        work_data => sub {
            my ($blob) = @_;
            my $shandle = $take_handle->(\$blob, "work_data");
            $peek_task->($shandle, "work_data")->data(\$blob);
            return 1;
        },

        # warning from a running job
        work_warning => sub {
            my ($blob) = @_;
            my $shandle = $take_handle->(\$blob, "work_warning");
            $peek_task->($shandle, "work_warning")->warning(\$blob);
            return 1;
        },

        # exception raised by a job
        work_exception => sub {
            my ($blob) = @_;
            my $shandle = $take_handle->(\$blob, "work_exception");
            my $task = $peek_task->($shandle, "work_exception");

            #FIXME we have to freeze $blob because Task->exception expected it in this form.
            # The only reason I see to do it so, is Worker->work implementation. With Gearman::Server it uses nfreeze for exception value.
            $task->exception(\Storable::freeze(\$blob));

            return 1;
        },

        # the job failed; payload is the handle, optionally followed by a message
        work_fail => sub {
            my ($blob) = @_;
            my ($shandle, $msg) = split(/\0/, $blob);
            $shandle ||= $blob;
            $self->_fail_jshandle($shandle, "work_fail", $msg);
            return 1;
        },

        # progress report: handle, numerator, denominator
        work_status => sub {
            my ($blob) = @_;
            my ($shandle, $nu, $de) = split(/\0/, $blob);
            my $task_list = $self->{waiting}{$shandle};
            Carp::croak(
                "Uhhhh: got work_status for unknown handle: $shandle")
                unless ref($task_list) eq "ARRAY" && scalar(@{$task_list});

            # FIXME: the server is (probably) sending a work_status packet for each
            # interested client, even if the clients are the same, so probably need
            # to fix the server not to do that. just put this FIXME here for now,
            # though really it's a server issue.
            for my $waiting_task (@{$task_list}) {
                $waiting_task->status($nu, $de);
            }

            return 1;
        },
    );

    my $handler = $handler_for{ $res->{type} };
    defined($handler)
        || Carp::croak(
        "Unimplemented packet type: $res->{type} [${$res->{blobref}}]");

    return $handler->(${ $res->{blobref} });
} ## end sub process_packet
576
|
|
|
|
|
|
|
|
577
|
|
|
|
|
|
|
1; |