line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Gearman::Server::Client; |
2
|
3
|
|
|
3
|
|
13
|
use version (); |
|
3
|
|
|
|
|
4
|
|
|
3
|
|
|
|
|
117
|
|
3
|
|
|
|
|
|
|
$Gearman::Server::Client::VERSION = version->declare("1.140_001"); |
4
|
|
|
|
|
|
|
|
5
|
3
|
|
|
3
|
|
13
|
use strict; |
|
3
|
|
|
|
|
4
|
|
|
3
|
|
|
|
|
57
|
|
6
|
3
|
|
|
3
|
|
11
|
use warnings; |
|
3
|
|
|
|
|
9
|
|
|
3
|
|
|
|
|
86
|
|
7
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
=head1 NAME |
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
Gearman::Server::Client - client for L based on L |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
=head1 DESCRIPTION |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
Used by L to instantiate connections from clients. |
15
|
|
|
|
|
|
|
Clients speak either a binary protocol, for normal operation (calling |
16
|
|
|
|
|
|
|
functions, grabbing function call requests, returning function values, |
17
|
|
|
|
|
|
|
etc), or a text-based line protocol, for relatively rare |
18
|
|
|
|
|
|
|
administrative / monitoring commands. |
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
The binary protocol commands aren't currently documented. (FIXME) But |
21
|
|
|
|
|
|
|
they're well-implemented in L, L, |
22
|
|
|
|
|
|
|
and L, if that's any consolation. |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
The line-based administrative commands are documented below. |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
=head1 METHODS |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
=cut |
29
|
|
|
|
|
|
|
|
30
|
3
|
|
|
3
|
|
1474
|
use Gearman::Util; |
|
3
|
|
|
|
|
14107
|
|
|
3
|
|
|
|
|
109
|
|
31
|
3
|
|
|
3
|
|
1874
|
use Danga::Socket; |
|
3
|
|
|
|
|
45369
|
|
|
3
|
|
|
|
|
157
|
|
32
|
3
|
|
|
3
|
|
31
|
use base 'Danga::Socket'; |
|
3
|
|
|
|
|
7
|
|
|
3
|
|
|
|
|
750
|
|
33
|
|
|
|
|
|
|
use fields ( |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
# { $job_name => $timeout } $timeout can be undef indicating no timeout |
36
|
3
|
|
|
|
|
23
|
'can_do', |
37
|
|
|
|
|
|
|
'can_do_list', |
38
|
|
|
|
|
|
|
'can_do_iter', |
39
|
|
|
|
|
|
|
'fast_read', |
40
|
|
|
|
|
|
|
'fast_buffer', |
41
|
|
|
|
|
|
|
'read_buf', |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# 0/1: they've said they're sleeping and we haven't woken them up |
44
|
|
|
|
|
|
|
'sleeping', |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# Timer for job cancellation |
47
|
|
|
|
|
|
|
'timer', |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
# { $job_handle => Job } |
50
|
|
|
|
|
|
|
'doing', |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# opaque string, no whitespace. workers give this so checker scripts |
53
|
|
|
|
|
|
|
# can tell apart the same worker connected to multiple jobservers. |
54
|
|
|
|
|
|
|
'client_id', |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
# pointer up to client's server |
57
|
|
|
|
|
|
|
'server', |
58
|
|
|
|
|
|
|
'options', |
59
|
|
|
|
|
|
|
'jobs_done_since_sleep', |
60
|
3
|
|
|
3
|
|
26
|
); |
|
3
|
|
|
|
|
7
|
|
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# 60k read buffer default, similar to perlbal's backend read. |
63
|
3
|
|
|
3
|
|
411
|
use constant READ_SIZE => 60 * 1024; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
285
|
|
64
|
3
|
|
|
3
|
|
18
|
use constant MAX_READ_SIZE => 512 * 1024; |
|
3
|
|
|
|
|
7
|
|
|
3
|
|
|
|
|
17182
|
|
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
# Class Method: |
67
|
|
|
|
|
|
|
sub new { |
68
|
2
|
|
|
2
|
1
|
4537
|
my Gearman::Server::Client $self = shift; |
69
|
2
|
|
|
|
|
5
|
my ($sock, $server) = @_; |
70
|
2
|
50
|
|
|
|
20
|
$self = fields::new($self) unless ref $self; |
71
|
2
|
|
|
|
|
510
|
$self->SUPER::new($sock); |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
# Number of bytes to read as fast as we can (don't try to process them) |
74
|
2
|
|
|
|
|
89
|
$self->{fast_read} = undef; |
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
# Array of buffers used during fast read operation |
77
|
2
|
|
|
|
|
9
|
$self->{fast_buffer} = []; |
78
|
2
|
|
|
|
|
6
|
$self->{read_buf} = ''; |
79
|
2
|
|
|
|
|
5
|
$self->{sleeping} = 0; |
80
|
2
|
|
|
|
|
7
|
$self->{can_do} = {}; |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
# handle -> Job |
83
|
2
|
|
|
|
|
6
|
$self->{doing} = {}; |
84
|
2
|
|
|
|
|
6
|
$self->{can_do_list} = []; |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
# numeric iterator for where we start looking for jobs |
87
|
2
|
|
|
|
|
8
|
$self->{can_do_iter} = 0; |
88
|
2
|
|
|
|
|
7
|
$self->{client_id} = "-"; |
89
|
2
|
|
|
|
|
6
|
$self->{server} = $server; |
90
|
2
|
|
|
|
|
6
|
$self->{options} = {}; |
91
|
2
|
|
|
|
|
7
|
$self->{jobs_done_since_sleep} = 0; |
92
|
|
|
|
|
|
|
|
93
|
2
|
|
|
|
|
7
|
return $self; |
94
|
|
|
|
|
|
|
} ## end sub new |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
=head2 option($option) |
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
=cut |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
sub option { |
101
|
0
|
|
|
0
|
1
|
|
my Gearman::Server::Client $self = shift; |
102
|
0
|
|
|
|
|
|
my $option = shift; |
103
|
|
|
|
|
|
|
|
104
|
0
|
|
|
|
|
|
return $self->{options}->{$option}; |
105
|
|
|
|
|
|
|
} ## end sub option |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=head2 close() |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
=cut |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
sub close { |
112
|
0
|
|
|
0
|
1
|
|
my Gearman::Server::Client $self = shift; |
113
|
|
|
|
|
|
|
|
114
|
0
|
|
|
|
|
|
my $doing = $self->{doing}; |
115
|
|
|
|
|
|
|
|
116
|
0
|
|
|
|
|
|
while (my ($handle, $job) = each %$doing) { |
117
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_fail", $handle); |
118
|
0
|
|
|
|
|
|
$job->relay_to_listeners($msg); |
119
|
0
|
|
|
|
|
|
$job->note_finished(0); |
120
|
|
|
|
|
|
|
} |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# Clear the doing list, since it may contain a set of jobs which contain |
123
|
|
|
|
|
|
|
# references back to us. |
124
|
0
|
|
|
|
|
|
%$doing = (); |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
# Remove self from sleepers, otherwise it will be leaked if another worker |
127
|
|
|
|
|
|
|
# for the job never connects. |
128
|
0
|
|
|
|
|
|
my $sleepers = $self->{server}{sleepers}; |
129
|
0
|
|
|
|
|
|
my $sleepers_list = $self->{server}{sleepers_list}; |
130
|
0
|
|
|
|
|
|
for my $job (@{ $self->{can_do_list} }) { |
|
0
|
|
|
|
|
|
|
131
|
0
|
|
|
|
|
|
my $sleeping = $sleepers->{$job}; |
132
|
0
|
|
|
|
|
|
delete $sleeping->{$self}; |
133
|
|
|
|
|
|
|
|
134
|
0
|
|
|
|
|
|
my $new_sleepers_list; |
135
|
0
|
|
|
|
|
|
for my $client (@{ $sleepers_list->{$job} }) { |
|
0
|
|
|
|
|
|
|
136
|
0
|
0
|
|
|
|
|
next unless $client; |
137
|
0
|
0
|
|
|
|
|
push @{$new_sleepers_list}, $client unless $sleeping->{$client}; |
|
0
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
} |
139
|
0
|
0
|
|
|
|
|
if ($new_sleepers_list) { |
140
|
0
|
|
|
|
|
|
$self->{server}{sleepers_list}->{$job} = $new_sleepers_list; |
141
|
|
|
|
|
|
|
} |
142
|
|
|
|
|
|
|
else { |
143
|
0
|
|
|
|
|
|
delete $self->{server}{sleepers_list}->{$job}; |
144
|
|
|
|
|
|
|
} |
145
|
|
|
|
|
|
|
|
146
|
0
|
0
|
|
|
|
|
delete $sleepers->{$job} unless %$sleeping; |
147
|
|
|
|
|
|
|
} ## end for my $job (@{ $self->...}) |
148
|
|
|
|
|
|
|
|
149
|
0
|
|
|
|
|
|
$self->{server}->note_disconnected_client($self); |
150
|
|
|
|
|
|
|
|
151
|
0
|
|
|
|
|
|
$self->CMD_reset_abilities; |
152
|
|
|
|
|
|
|
|
153
|
0
|
|
|
|
|
|
$self->SUPER::close; |
154
|
|
|
|
|
|
|
} ## end sub close |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
=head2 event_read() |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
read from socket |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
=cut |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
sub event_read { |
163
|
0
|
|
|
0
|
1
|
|
my Gearman::Server::Client $self = shift; |
164
|
|
|
|
|
|
|
|
165
|
0
|
|
0
|
|
|
|
my $read_size = $self->{fast_read} || READ_SIZE; |
166
|
0
|
|
|
|
|
|
my $bref = $self->read($read_size); |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
# Delay close till after buffers are written on EOF. If we are unable |
169
|
|
|
|
|
|
|
# to write 'err' or 'hup' will be thrown and we'll close faster. |
170
|
0
|
0
|
|
0
|
|
|
return $self->write(sub { $self->close }) unless defined $bref; |
|
0
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
|
172
|
0
|
0
|
|
|
|
|
if ($self->{fast_read}) { |
173
|
0
|
|
|
|
|
|
push @{ $self->{fast_buffer} }, $$bref; |
|
0
|
|
|
|
|
|
|
174
|
0
|
|
|
|
|
|
$self->{fast_read} -= length($$bref); |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
# If fast_read is still positive, then we need to read more data |
177
|
0
|
0
|
|
|
|
|
return if ($self->{fast_read} > 0); |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
# Append the whole giant read buffer to our main read buffer |
180
|
0
|
|
|
|
|
|
$self->{read_buf} .= join('', @{ $self->{fast_buffer} }); |
|
0
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
# Reset the fast read state for next time. |
183
|
0
|
|
|
|
|
|
$self->{fast_buffer} = []; |
184
|
0
|
|
|
|
|
|
$self->{fast_read} = undef; |
185
|
|
|
|
|
|
|
} ## end if ($self->{fast_read}) |
186
|
|
|
|
|
|
|
else { |
187
|
|
|
|
|
|
|
# Exact read size length likely means we have more sitting on the |
188
|
|
|
|
|
|
|
# socket. Buffer up to half a meg in one go. |
189
|
0
|
0
|
|
|
|
|
if (length($$bref) == READ_SIZE) { |
190
|
0
|
|
|
|
|
|
my $limit = int(MAX_READ_SIZE / READ_SIZE); |
191
|
0
|
|
|
|
|
|
my @crefs = ($$bref); |
192
|
0
|
|
|
|
|
|
while (my $cref = $self->read(READ_SIZE)) { |
193
|
0
|
|
|
|
|
|
push(@crefs, $$cref); |
194
|
0
|
0
|
0
|
|
|
|
last if (length($$cref) < READ_SIZE || $limit-- < 1); |
195
|
|
|
|
|
|
|
} |
196
|
0
|
|
|
|
|
|
$bref = \join('', @crefs); |
197
|
|
|
|
|
|
|
} ## end if (length($$bref) == ...) |
198
|
0
|
|
|
|
|
|
$self->{read_buf} .= $$bref; |
199
|
|
|
|
|
|
|
} ## end else [ if ($self->{fast_read})] |
200
|
|
|
|
|
|
|
|
201
|
0
|
|
|
|
|
|
my $found_cmd; |
202
|
0
|
|
|
|
|
|
do { |
203
|
0
|
|
|
|
|
|
$found_cmd = 1; |
204
|
0
|
|
|
|
|
|
my $blen = length($self->{read_buf}); |
205
|
|
|
|
|
|
|
|
206
|
0
|
0
|
|
|
|
|
if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) { |
|
|
0
|
|
|
|
|
|
207
|
0
|
|
|
|
|
|
my ($cmd, $len) = unpack("NN", $1); |
208
|
0
|
0
|
|
|
|
|
if ($blen < $len + 12) { |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
# Start a fast read loop to get all the data we need, less |
211
|
|
|
|
|
|
|
# what we already have in the buffer. |
212
|
0
|
|
|
|
|
|
$self->{fast_read} = $len + 12 - $blen; |
213
|
0
|
|
|
|
|
|
return; |
214
|
|
|
|
|
|
|
} ## end if ($blen < $len + 12) |
215
|
|
|
|
|
|
|
|
216
|
0
|
|
|
|
|
|
$self->process_cmd($cmd, substr($self->{read_buf}, 12, $len)); |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
# and slide down buf: |
219
|
0
|
|
|
|
|
|
$self->{read_buf} = substr($self->{read_buf}, 12 + $len); |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
} ## end if ($self->{read_buf} ...) |
222
|
|
|
|
|
|
|
elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) { |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
# ASCII command case (useful for telnetting in) |
225
|
0
|
|
|
|
|
|
my $line = $1; |
226
|
0
|
|
|
|
|
|
$self->process_line($line); |
227
|
|
|
|
|
|
|
} ## end elsif ($self->{read_buf} ...) |
228
|
|
|
|
|
|
|
else { |
229
|
0
|
|
|
|
|
|
$found_cmd = 0; |
230
|
|
|
|
|
|
|
} |
231
|
|
|
|
|
|
|
} while ($found_cmd); |
232
|
|
|
|
|
|
|
} ## end sub event_read |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
=head2 event_write() |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
=cut |
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
sub event_write { |
239
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
240
|
0
|
|
|
|
|
|
my $done = $self->write(undef); |
241
|
0
|
0
|
|
|
|
|
$self->watch_write(0) if $done; |
242
|
|
|
|
|
|
|
} |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
=head2 process_line($line) |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
Line based command processor |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
=cut |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
sub process_line { |
251
|
0
|
|
|
0
|
1
|
|
my Gearman::Server::Client $self = shift; |
252
|
0
|
|
|
|
|
|
my $line = shift; |
253
|
|
|
|
|
|
|
|
254
|
0
|
0
|
0
|
|
|
|
if ($line && $line =~ /^(\w+)\s*(.*)/) { |
255
|
0
|
|
|
|
|
|
my ($cmd, $args) = ($1, $2); |
256
|
0
|
|
|
|
|
|
$cmd = lc($cmd); |
257
|
0
|
|
|
|
|
|
my $code = $self->can("TXTCMD_$cmd"); |
258
|
0
|
0
|
|
|
|
|
if ($code) { |
259
|
0
|
|
|
|
|
|
$code->($self, $args); |
260
|
0
|
|
|
|
|
|
return; |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
} ## end if ($line && $line =~ ...) |
263
|
|
|
|
|
|
|
|
264
|
0
|
|
|
|
|
|
return $self->err_line('unknown_command'); |
265
|
|
|
|
|
|
|
} ## end sub process_line |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
=head1 Binary Protocol Structure |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
All binary protocol exchanges between clients (which can be callers, |
270
|
|
|
|
|
|
|
workers, or both) and the Gearman server have common packet header: |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
4 byte magic -- either "\0REQ" for requests to the server, or |
273
|
|
|
|
|
|
|
"\0RES" for responses from the server |
274
|
|
|
|
|
|
|
4 byte type -- network order integer, representing the packet type |
275
|
|
|
|
|
|
|
4 byte length -- network order length, for data segment. |
276
|
|
|
|
|
|
|
data -- optional, if length is non-zero |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
=head1 Binary Protocol Commands |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
=head2 echo_req (type=16) |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
A debug command. The server will reply with the same data, in a echo_res (type=17) packet. |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
=head2 (and many more...) |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
FIXME: auto-generate protocol docs from internal Gearman::Util table, |
287
|
|
|
|
|
|
|
once annotated with some English? |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=cut |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
sub CMD_echo_req { |
292
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
293
|
0
|
|
|
|
|
|
my $blobref = shift; |
294
|
|
|
|
|
|
|
|
295
|
0
|
|
|
|
|
|
return $self->res_packet("echo_res", $$blobref); |
296
|
|
|
|
|
|
|
} ## end sub CMD_echo_req |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
sub CMD_work_status { |
299
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
300
|
0
|
|
|
|
|
|
my $ar = shift; |
301
|
0
|
|
|
|
|
|
my ($handle, $nu, $de) = split(/\0/, $$ar); |
302
|
|
|
|
|
|
|
|
303
|
0
|
|
|
|
|
|
my $job = $self->{doing}{$handle}; |
304
|
0
|
0
|
0
|
|
|
|
return $self->error_packet("not_worker") |
305
|
|
|
|
|
|
|
unless $job && $job->worker == $self; |
306
|
|
|
|
|
|
|
|
307
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_status", $$ar); |
308
|
0
|
|
|
|
|
|
$job->relay_to_listeners($msg); |
309
|
0
|
|
|
|
|
|
$job->status([$nu, $de]); |
310
|
0
|
|
|
|
|
|
return 1; |
311
|
|
|
|
|
|
|
} ## end sub CMD_work_status |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
sub CMD_work_complete { |
314
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
315
|
0
|
|
|
|
|
|
my $ar = shift; |
316
|
|
|
|
|
|
|
|
317
|
0
|
|
|
|
|
|
$$ar =~ s/^(.+?)\0//; |
318
|
0
|
|
|
|
|
|
my $handle = $1; |
319
|
|
|
|
|
|
|
|
320
|
0
|
|
|
|
|
|
my $job = delete $self->{doing}{$handle}; |
321
|
0
|
0
|
0
|
|
|
|
return $self->error_packet("not_worker") |
322
|
|
|
|
|
|
|
unless $job && $job->worker == $self; |
323
|
|
|
|
|
|
|
|
324
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_complete", |
325
|
|
|
|
|
|
|
join("\0", $handle, $$ar)); |
326
|
0
|
|
|
|
|
|
$job->relay_to_listeners($msg); |
327
|
0
|
|
|
|
|
|
$job->note_finished(1); |
328
|
0
|
0
|
|
|
|
|
if (my $timer = $self->{timer}) { |
329
|
0
|
|
|
|
|
|
$timer->cancel; |
330
|
0
|
|
|
|
|
|
$self->{timer} = undef; |
331
|
|
|
|
|
|
|
} |
332
|
|
|
|
|
|
|
|
333
|
0
|
|
|
|
|
|
return 1; |
334
|
|
|
|
|
|
|
} ## end sub CMD_work_complete |
335
|
|
|
|
|
|
|
|
336
|
|
|
|
|
|
|
sub CMD_work_fail { |
337
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
338
|
0
|
|
|
|
|
|
my $ar = shift; |
339
|
0
|
|
|
|
|
|
my $handle = $$ar; |
340
|
0
|
|
|
|
|
|
my $job = delete $self->{doing}{$handle}; |
341
|
0
|
0
|
0
|
|
|
|
return $self->error_packet("not_worker") |
342
|
|
|
|
|
|
|
unless $job && $job->worker == $self; |
343
|
|
|
|
|
|
|
|
344
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_fail", $handle); |
345
|
0
|
|
|
|
|
|
$job->relay_to_listeners($msg); |
346
|
0
|
|
|
|
|
|
$job->note_finished(1); |
347
|
0
|
0
|
|
|
|
|
if (my $timer = $self->{timer}) { |
348
|
0
|
|
|
|
|
|
$timer->cancel; |
349
|
0
|
|
|
|
|
|
$self->{timer} = undef; |
350
|
|
|
|
|
|
|
} |
351
|
|
|
|
|
|
|
|
352
|
0
|
|
|
|
|
|
return 1; |
353
|
|
|
|
|
|
|
} ## end sub CMD_work_fail |
354
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
sub CMD_work_exception { |
356
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
357
|
0
|
|
|
|
|
|
my $ar = shift; |
358
|
|
|
|
|
|
|
|
359
|
0
|
|
|
|
|
|
$$ar =~ s/^(.+?)\0//; |
360
|
0
|
|
|
|
|
|
my $handle = $1; |
361
|
0
|
|
|
|
|
|
my $job = $self->{doing}{$handle}; |
362
|
|
|
|
|
|
|
|
363
|
0
|
0
|
0
|
|
|
|
return $self->error_packet("not_worker") |
364
|
|
|
|
|
|
|
unless $job && $job->worker == $self; |
365
|
|
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_exception", |
367
|
|
|
|
|
|
|
join("\0", $handle, $$ar)); |
368
|
0
|
|
|
|
|
|
$job->relay_to_option_listeners($msg, "exceptions"); |
369
|
|
|
|
|
|
|
|
370
|
0
|
|
|
|
|
|
return 1; |
371
|
|
|
|
|
|
|
} ## end sub CMD_work_exception |
372
|
|
|
|
|
|
|
|
373
|
|
|
|
|
|
|
sub CMD_pre_sleep { |
374
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
375
|
0
|
|
|
|
|
|
$self->{'sleeping'} = 1; |
376
|
0
|
|
|
|
|
|
$self->{server}->on_client_sleep($self); |
377
|
0
|
|
|
|
|
|
return 1; |
378
|
|
|
|
|
|
|
} ## end sub CMD_pre_sleep |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
sub CMD_grab_job { |
381
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
382
|
|
|
|
|
|
|
|
383
|
0
|
|
|
|
|
|
my $job; |
384
|
0
|
|
|
|
|
|
my $can_do_size = scalar @{ $self->{can_do_list} }; |
|
0
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
|
386
|
0
|
0
|
|
|
|
|
unless ($can_do_size) { |
387
|
0
|
|
|
|
|
|
$self->res_packet("no_job"); |
388
|
0
|
|
|
|
|
|
return; |
389
|
|
|
|
|
|
|
} |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
# the offset where we start asking for jobs, to prevent starvation |
392
|
|
|
|
|
|
|
# of some job types. |
393
|
0
|
|
|
|
|
|
$self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size; |
394
|
|
|
|
|
|
|
|
395
|
0
|
|
|
|
|
|
my $tried = 0; |
396
|
0
|
|
|
|
|
|
while ($tried < $can_do_size) { |
397
|
0
|
|
|
|
|
|
my $idx = ($tried + $self->{can_do_iter}) % $can_do_size; |
398
|
0
|
|
|
|
|
|
$tried++; |
399
|
0
|
|
|
|
|
|
my $job_to_grab = $self->{can_do_list}->[$idx]; |
400
|
0
|
0
|
|
|
|
|
$job = $self->{server}->grab_job($job_to_grab) |
401
|
|
|
|
|
|
|
or next; |
402
|
|
|
|
|
|
|
|
403
|
0
|
|
|
|
|
|
$job->worker($self); |
404
|
0
|
|
|
|
|
|
$self->{doing}{ $job->handle } = $job; |
405
|
|
|
|
|
|
|
|
406
|
0
|
|
|
|
|
|
my $timeout = $self->{can_do}->{$job_to_grab}; |
407
|
0
|
0
|
|
|
|
|
if (defined $timeout) { |
408
|
|
|
|
|
|
|
my $timer = Danga::Socket->AddTimer( |
409
|
|
|
|
|
|
|
$timeout, |
410
|
|
|
|
|
|
|
sub { |
411
|
0
|
0
|
|
0
|
|
|
return $self->error_packet("not_worker") |
412
|
|
|
|
|
|
|
unless $job->worker == $self; |
413
|
|
|
|
|
|
|
|
414
|
0
|
|
|
|
|
|
my $msg = Gearman::Util::pack_res_command("work_fail", |
415
|
|
|
|
|
|
|
$job->handle); |
416
|
0
|
|
|
|
|
|
$job->relay_to_listeners($msg); |
417
|
0
|
|
|
|
|
|
$job->note_finished(1); |
418
|
0
|
|
|
|
|
|
$job->clear_listeners; |
419
|
0
|
|
|
|
|
|
$self->{timer} = undef; |
420
|
|
|
|
|
|
|
} |
421
|
0
|
|
|
|
|
|
); |
422
|
0
|
|
|
|
|
|
$self->{timer} = $timer; |
423
|
|
|
|
|
|
|
} ## end if (defined $timeout) |
424
|
|
|
|
|
|
|
return $self->res_packet("job_assign", |
425
|
0
|
|
|
|
|
|
join("\0", $job->handle, $job->func, ${ $job->argref },)); |
|
0
|
|
|
|
|
|
|
426
|
|
|
|
|
|
|
} ## end while ($tried < $can_do_size) |
427
|
|
|
|
|
|
|
|
428
|
0
|
|
|
|
|
|
$self->res_packet("no_job"); |
429
|
|
|
|
|
|
|
} ## end sub CMD_grab_job |
430
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
sub CMD_can_do { |
432
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
433
|
0
|
|
|
|
|
|
my $ar = shift; |
434
|
|
|
|
|
|
|
|
435
|
0
|
|
|
|
|
|
$self->{can_do}->{$$ar} = undef; |
436
|
0
|
|
|
|
|
|
$self->_setup_can_do_list; |
437
|
|
|
|
|
|
|
} ## end sub CMD_can_do |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
sub CMD_can_do_timeout { |
440
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
441
|
0
|
|
|
|
|
|
my $ar = shift; |
442
|
|
|
|
|
|
|
|
443
|
0
|
|
|
|
|
|
my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/; |
444
|
|
|
|
|
|
|
|
445
|
0
|
0
|
|
|
|
|
if (defined $timeout) { |
446
|
0
|
|
|
|
|
|
$self->{can_do}->{$task} = $timeout; |
447
|
|
|
|
|
|
|
} |
448
|
|
|
|
|
|
|
else { |
449
|
0
|
|
|
|
|
|
$self->{can_do}->{$task} = undef; |
450
|
|
|
|
|
|
|
} |
451
|
|
|
|
|
|
|
|
452
|
0
|
|
|
|
|
|
$self->_setup_can_do_list; |
453
|
|
|
|
|
|
|
} ## end sub CMD_can_do_timeout |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
sub CMD_option_req { |
456
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
457
|
0
|
|
|
|
|
|
my $ar = shift; |
458
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
my $success = sub { |
460
|
0
|
|
|
0
|
|
|
return $self->res_packet("option_res", $$ar); |
461
|
0
|
|
|
|
|
|
}; |
462
|
|
|
|
|
|
|
|
463
|
0
|
0
|
|
|
|
|
if ($$ar eq 'exceptions') { |
464
|
0
|
|
|
|
|
|
$self->{options}->{exceptions} = 1; |
465
|
0
|
|
|
|
|
|
return $success->(); |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
|
468
|
0
|
|
|
|
|
|
return $self->error_packet("unknown_option"); |
469
|
|
|
|
|
|
|
} ## end sub CMD_option_req |
470
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
sub CMD_set_client_id { |
472
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
473
|
0
|
|
|
|
|
|
my $ar = shift; |
474
|
|
|
|
|
|
|
|
475
|
0
|
|
|
|
|
|
$self->{client_id} = $$ar; |
476
|
0
|
|
|
|
|
|
$self->{client_id} =~ s/\s+//g; |
477
|
0
|
0
|
|
|
|
|
$self->{client_id} = "-" unless length $self->{client_id}; |
478
|
|
|
|
|
|
|
} ## end sub CMD_set_client_id |
479
|
|
|
|
|
|
|
|
480
|
|
|
|
|
|
|
sub CMD_cant_do { |
481
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
482
|
0
|
|
|
|
|
|
my $ar = shift; |
483
|
|
|
|
|
|
|
|
484
|
0
|
|
|
|
|
|
delete $self->{can_do}->{$$ar}; |
485
|
0
|
|
|
|
|
|
$self->_setup_can_do_list; |
486
|
|
|
|
|
|
|
} ## end sub CMD_cant_do |
487
|
|
|
|
|
|
|
|
488
|
|
|
|
|
|
|
sub CMD_get_status { |
489
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
490
|
0
|
|
|
|
|
|
my $ar = shift; |
491
|
0
|
|
|
|
|
|
my $job = $self->{server}->job_by_handle($$ar); |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
# handles can't contain nulls |
494
|
0
|
0
|
|
|
|
|
return if $$ar =~ /\0/; |
495
|
|
|
|
|
|
|
|
496
|
0
|
|
|
|
|
|
my ($known, $running, $num, $den); |
497
|
0
|
|
|
|
|
|
$known = 0; |
498
|
0
|
|
|
|
|
|
$running = 0; |
499
|
0
|
0
|
|
|
|
|
if ($job) { |
500
|
0
|
|
|
|
|
|
$known = 1; |
501
|
0
|
0
|
|
|
|
|
$running = $job->worker ? 1 : 0; |
502
|
0
|
0
|
|
|
|
|
if (my $stat = $job->status) { |
503
|
0
|
|
|
|
|
|
($num, $den) = @$stat; |
504
|
|
|
|
|
|
|
} |
505
|
|
|
|
|
|
|
} ## end if ($job) |
506
|
|
|
|
|
|
|
|
507
|
0
|
0
|
|
|
|
|
$num = '' unless defined $num; |
508
|
0
|
0
|
|
|
|
|
$den = '' unless defined $den; |
509
|
|
|
|
|
|
|
|
510
|
0
|
|
|
|
|
|
$self->res_packet("status_res", |
511
|
|
|
|
|
|
|
join("\0", $$ar, $known, $running, $num, $den)); |
512
|
|
|
|
|
|
|
} ## end sub CMD_get_status |
513
|
|
|
|
|
|
|
|
514
|
|
|
|
|
|
|
sub CMD_reset_abilities { |
515
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
516
|
|
|
|
|
|
|
|
517
|
0
|
|
|
|
|
|
$self->{can_do} = {}; |
518
|
0
|
|
|
|
|
|
$self->_setup_can_do_list; |
519
|
|
|
|
|
|
|
} ## end sub CMD_reset_abilities |
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
sub _setup_can_do_list { |
522
|
0
|
|
|
0
|
|
|
my Gearman::Server::Client $self = shift; |
523
|
0
|
|
|
|
|
|
$self->{can_do_list} = [keys %{ $self->{can_do} }]; |
|
0
|
|
|
|
|
|
|
524
|
0
|
|
|
|
|
|
$self->{can_do_iter} = 0; |
525
|
|
|
|
|
|
|
} |
526
|
|
|
|
|
|
|
|
527
|
0
|
|
|
0
|
0
|
|
sub CMD_submit_job { push @_, 1; &_cmd_submit_job; } |
|
0
|
|
|
|
|
|
|
528
|
0
|
|
|
0
|
0
|
|
sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; } |
|
0
|
|
|
|
|
|
|
529
|
0
|
|
|
0
|
0
|
|
sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; } |
|
0
|
|
|
|
|
|
|
530
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
sub _cmd_submit_job { |
532
|
0
|
|
|
0
|
|
|
my Gearman::Server::Client $self = shift; |
533
|
0
|
|
|
|
|
|
my $ar = shift; |
534
|
0
|
|
|
|
|
|
my $subscribe = shift; |
535
|
0
|
|
|
|
|
|
my $high_pri = shift; |
536
|
|
|
|
|
|
|
|
537
|
0
|
0
|
|
|
|
|
return $self->error_packet("invalid_args", "No func/uniq header [$$ar].") |
538
|
|
|
|
|
|
|
unless $$ar =~ s/^(.+?)\0(.*?)\0//; |
539
|
|
|
|
|
|
|
|
540
|
0
|
|
|
|
|
|
my ($func, $uniq) = ($1, $2); |
541
|
|
|
|
|
|
|
|
542
|
0
|
|
|
|
|
|
my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar, |
543
|
|
|
|
|
|
|
$high_pri); |
544
|
|
|
|
|
|
|
|
545
|
0
|
0
|
|
|
|
|
if ($subscribe) { |
546
|
0
|
|
|
|
|
|
$job->add_listener($self); |
547
|
|
|
|
|
|
|
} |
548
|
|
|
|
|
|
|
else { |
549
|
|
|
|
|
|
|
# background mode |
550
|
0
|
|
|
|
|
|
$job->require_listener(0); |
551
|
|
|
|
|
|
|
} |
552
|
|
|
|
|
|
|
|
553
|
0
|
|
|
|
|
|
$self->res_packet("job_created", $job->handle); |
554
|
0
|
|
|
|
|
|
$self->{server}->wake_up_sleepers($func); |
555
|
|
|
|
|
|
|
} ## end sub _cmd_submit_job |
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
sub res_packet { |
558
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
559
|
0
|
|
|
|
|
|
my ($code, $arg) = @_; |
560
|
0
|
|
|
|
|
|
$self->write(Gearman::Util::pack_res_command($code, $arg)); |
561
|
0
|
|
|
|
|
|
return 1; |
562
|
|
|
|
|
|
|
} ## end sub res_packet |
563
|
|
|
|
|
|
|
|
564
|
|
|
|
|
|
|
sub error_packet { |
565
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
566
|
0
|
|
|
|
|
|
my ($code, $msg) = @_; |
567
|
0
|
|
|
|
|
|
$self->write(Gearman::Util::pack_res_command("error", "$code\0$msg")); |
568
|
0
|
|
|
|
|
|
return 0; |
569
|
|
|
|
|
|
|
} ## end sub error_packet |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
sub process_cmd { |
572
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
573
|
0
|
|
|
|
|
|
my $cmd = shift; |
574
|
0
|
|
|
|
|
|
my $blob = shift; |
575
|
|
|
|
|
|
|
|
576
|
0
|
|
|
|
|
|
my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd); |
577
|
0
|
|
|
|
|
|
my $ret = eval { $self->$cmd_name(\$blob); }; |
|
0
|
|
|
|
|
|
|
578
|
0
|
0
|
|
|
|
|
return $ret unless $@; |
579
|
0
|
|
|
|
|
|
warn "Error: $@\n"; |
580
|
0
|
|
|
|
|
|
return $self->error_packet("server_error", $@); |
581
|
|
|
|
|
|
|
} ## end sub process_cmd |
582
|
|
|
|
|
|
|
|
583
|
0
|
|
|
0
|
1
|
|
sub event_err { my $self = shift; $self->close; } |
|
0
|
|
|
|
|
|
|
584
|
0
|
|
|
0
|
1
|
|
sub event_hup { my $self = shift; $self->close; } |
|
0
|
|
|
|
|
|
|
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
############################################################################ |
587
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
=head1 Line based commands |
589
|
|
|
|
|
|
|
|
590
|
|
|
|
|
|
|
These commands are used for administrative or statistic tasks to be done on the |
591
|
|
|
|
|
|
|
gearman server. They can be entered using a line based client (telnet, etc.) by |
592
|
|
|
|
|
|
|
connecting to the listening port (4730) and are also intended to be machine |
593
|
|
|
|
|
|
|
parsable. |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
=head2 "workers" |
596
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
Emits list of registered workers, their fds, IPs, client ids, and list of |
598
|
|
|
|
|
|
|
registered abilities (function names they can do). Of format: |
599
|
|
|
|
|
|
|
|
600
|
|
|
|
|
|
|
fd ip.x.y.z client_id : func_a func_b func_c |
601
|
|
|
|
|
|
|
fd ip.x.y.z client_id : func_a func_b func_c |
602
|
|
|
|
|
|
|
fd ip.x.y.z client_id : func_a func_b func_c |
603
|
|
|
|
|
|
|
. |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
It ends with a line with just a period. |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=cut |
608
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
sub TXTCMD_workers { |
610
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
611
|
|
|
|
|
|
|
|
612
|
0
|
|
|
|
|
|
foreach my $cl (sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients) { |
|
0
|
|
|
|
|
|
|
613
|
0
|
|
|
|
|
|
my $fd = $cl->{fd}; |
614
|
0
|
|
|
|
|
|
$self->write("$fd " |
615
|
|
|
|
|
|
|
. $cl->peer_ip_string |
616
|
0
|
|
|
|
|
|
. " $cl->{client_id} : @{$cl->{can_do_list}}\n"); |
617
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
} ## end foreach my $cl (sort { $a->...}) |
619
|
0
|
|
|
|
|
|
$self->write(".\n"); |
620
|
|
|
|
|
|
|
} ## end sub TXTCMD_workers |
621
|
|
|
|
|
|
|
|
622
|
|
|
|
|
|
|
=head2 "status" |
623
|
|
|
|
|
|
|
|
624
|
|
|
|
|
|
|
The output format of this function is tab separated columns as follows, |
625
|
|
|
|
|
|
|
followed by a line consisting of a fullstop and a newline (".\n") to indicate |
626
|
|
|
|
|
|
|
the end of output. |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
=over |
629
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
=item Function name |
631
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
A string denoting the name of the function of the job |
633
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
=item Number in queue |
635
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
A positive integer indicating the total number of jobs for this function in the |
637
|
|
|
|
|
|
|
queue. This includes currently running ones as well (next column) |
638
|
|
|
|
|
|
|
|
639
|
|
|
|
|
|
|
=item Number of jobs running |
640
|
|
|
|
|
|
|
|
641
|
|
|
|
|
|
|
A positive integer showing how many jobs of this function are currently running |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
=item Number of capable workers |
644
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
A positive integer denoting the maximum possible count of workers that could be |
646
|
|
|
|
|
|
|
doing this job. Though they may not all be working on it due to other tasks |
647
|
|
|
|
|
|
|
holding them busy. |
648
|
|
|
|
|
|
|
|
649
|
|
|
|
|
|
|
=back |
650
|
|
|
|
|
|
|
|
651
|
|
|
|
|
|
|
=cut |
652
|
|
|
|
|
|
|
|
653
|
|
|
|
|
|
|
sub TXTCMD_status { |
654
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
655
|
|
|
|
|
|
|
|
656
|
0
|
|
|
|
|
|
my %funcs; # func -> 1 (set of all funcs to display) |
657
|
|
|
|
|
|
|
|
658
|
|
|
|
|
|
|
# keep track of how many workers can do which functions |
659
|
|
|
|
|
|
|
my %can; |
660
|
0
|
|
|
|
|
|
foreach my $client ($self->{server}->clients) { |
661
|
0
|
|
|
|
|
|
foreach my $func (@{ $client->{can_do_list} }) { |
|
0
|
|
|
|
|
|
|
662
|
0
|
|
|
|
|
|
$can{$func}++; |
663
|
0
|
|
|
|
|
|
$funcs{$func} = 1; |
664
|
|
|
|
|
|
|
} |
665
|
|
|
|
|
|
|
} ## end foreach my $client ($self->...) |
666
|
|
|
|
|
|
|
|
667
|
0
|
|
|
|
|
|
my %queued_funcs; |
668
|
|
|
|
|
|
|
my %running_funcs; |
669
|
|
|
|
|
|
|
|
670
|
0
|
|
|
|
|
|
foreach my $job ($self->{server}->jobs) { |
671
|
0
|
|
|
|
|
|
my $func = $job->func; |
672
|
0
|
|
|
|
|
|
$queued_funcs{$func}++; |
673
|
0
|
0
|
|
|
|
|
if ($job->worker) { |
674
|
0
|
|
|
|
|
|
$running_funcs{$func}++; |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
} ## end foreach my $job ($self->{server...}) |
677
|
|
|
|
|
|
|
|
678
|
|
|
|
|
|
|
# also include queued functions (even if there aren't workers) |
679
|
|
|
|
|
|
|
# in our list of funcs to show. |
680
|
0
|
|
|
|
|
|
$funcs{$_} = 1 foreach keys %queued_funcs; |
681
|
|
|
|
|
|
|
|
682
|
0
|
|
|
|
|
|
foreach my $func (sort keys %funcs) { |
683
|
0
|
|
0
|
|
|
|
my $queued = $queued_funcs{$func} || 0; |
684
|
0
|
|
0
|
|
|
|
my $running = $running_funcs{$func} || 0; |
685
|
0
|
|
0
|
|
|
|
my $can = $can{$func} || 0; |
686
|
0
|
|
|
|
|
|
$self->write("$func\t$queued\t$running\t$can\n"); |
687
|
|
|
|
|
|
|
} ## end foreach my $func (sort keys...) |
688
|
|
|
|
|
|
|
|
689
|
0
|
|
|
|
|
|
$self->write(".\n"); |
690
|
|
|
|
|
|
|
} ## end sub TXTCMD_status |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
=head2 "jobs" |
693
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
Output format is zero or more lines of: |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
[Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
Follows by a single line of: |
699
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
.\n |
701
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
\t is a literal tab character |
703
|
|
|
|
|
|
|
\n is perl's definition of newline (literal \n on linux, something else on win32) |
704
|
|
|
|
|
|
|
|
705
|
|
|
|
|
|
|
=cut |
706
|
|
|
|
|
|
|
|
707
|
|
|
|
|
|
|
sub TXTCMD_jobs { |
708
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
709
|
|
|
|
|
|
|
|
710
|
0
|
|
|
|
|
|
foreach my $job ($self->{server}->jobs) { |
711
|
0
|
|
|
|
|
|
my $func = $job->func; |
712
|
0
|
|
|
|
|
|
my $uniq = $job->uniq; |
713
|
0
|
|
|
|
|
|
my $worker_addr = "-"; |
714
|
|
|
|
|
|
|
|
715
|
0
|
0
|
|
|
|
|
if (my $worker = $job->worker) { |
716
|
0
|
|
|
|
|
|
$worker_addr = $worker->peer_addr_string; |
717
|
|
|
|
|
|
|
} |
718
|
|
|
|
|
|
|
|
719
|
0
|
|
|
|
|
|
my $listeners = $job->listeners; |
720
|
|
|
|
|
|
|
|
721
|
0
|
|
|
|
|
|
$self->write("$func\t$uniq\t$worker_addr\t$listeners\n"); |
722
|
|
|
|
|
|
|
} ## end foreach my $job ($self->{server...}) |
723
|
|
|
|
|
|
|
|
724
|
0
|
|
|
|
|
|
$self->write(".\n"); |
725
|
|
|
|
|
|
|
} ## end sub TXTCMD_jobs |
726
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
=head2 "clients" |
728
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
Output format is zero or more sections of: |
730
|
|
|
|
|
|
|
|
731
|
|
|
|
|
|
|
=over |
732
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
One line of: |
734
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
[Client Address]\n |
736
|
|
|
|
|
|
|
|
737
|
|
|
|
|
|
|
Followed by zero or more lines of: |
738
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
\t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n |
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
=back |
742
|
|
|
|
|
|
|
|
743
|
|
|
|
|
|
|
Follows by a single line of: |
744
|
|
|
|
|
|
|
|
745
|
|
|
|
|
|
|
.\n |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
\t is a literal tab character |
748
|
|
|
|
|
|
|
\n is perl's definition of newline (literal \n on linux, something else on win32) |
749
|
|
|
|
|
|
|
|
750
|
|
|
|
|
|
|
=cut |
751
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
sub TXTCMD_clients { |
753
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
754
|
|
|
|
|
|
|
|
755
|
0
|
|
|
|
|
|
my %jobs_by_client; |
756
|
|
|
|
|
|
|
|
757
|
0
|
|
|
|
|
|
foreach my $job ($self->{server}->jobs) { |
758
|
0
|
|
|
|
|
|
foreach my $client ($job->listeners) { |
759
|
0
|
|
0
|
|
|
|
my $ent = $jobs_by_client{$client} ||= []; |
760
|
0
|
|
|
|
|
|
push @$ent, $job; |
761
|
|
|
|
|
|
|
} |
762
|
|
|
|
|
|
|
} ## end foreach my $job ($self->{server...}) |
763
|
|
|
|
|
|
|
|
764
|
0
|
|
|
|
|
|
foreach my $client ($self->{server}->clients) { |
765
|
0
|
|
|
|
|
|
my $client_addr = $client->peer_addr_string; |
766
|
0
|
|
|
|
|
|
$self->write("$client_addr\n"); |
767
|
0
|
|
0
|
|
|
|
my $jobs = $jobs_by_client{$client} || []; |
768
|
|
|
|
|
|
|
|
769
|
0
|
|
|
|
|
|
foreach my $job (@$jobs) { |
770
|
0
|
|
|
|
|
|
my $func = $job->func; |
771
|
0
|
|
|
|
|
|
my $uniq = $job->uniq; |
772
|
0
|
|
|
|
|
|
my $worker_addr = "-"; |
773
|
|
|
|
|
|
|
|
774
|
0
|
0
|
|
|
|
|
if (my $worker = $job->worker) { |
775
|
0
|
|
|
|
|
|
$worker_addr = $worker->peer_addr_string; |
776
|
|
|
|
|
|
|
} |
777
|
0
|
|
|
|
|
|
$self->write("\t$func\t$uniq\t$worker_addr\n"); |
778
|
|
|
|
|
|
|
} ## end foreach my $job (@$jobs) |
779
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
} ## end foreach my $client ($self->...) |
781
|
|
|
|
|
|
|
|
782
|
0
|
|
|
|
|
|
$self->write(".\n"); |
783
|
|
|
|
|
|
|
} ## end sub TXTCMD_clients |
784
|
|
|
|
|
|
|
|
785
|
|
|
|
|
|
|
sub TXTCMD_gladiator { |
786
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
787
|
0
|
|
0
|
|
|
|
my $args = shift || ""; |
788
|
0
|
|
|
|
|
|
my $has_gladiator = eval "use Devel::Gladiator; use Devel::Peek; 1;"; |
789
|
0
|
0
|
|
|
|
|
if ($has_gladiator) { |
790
|
0
|
|
|
|
|
|
my $all = Devel::Gladiator::walk_arena(); |
791
|
0
|
|
|
|
|
|
my %ct; |
792
|
0
|
|
|
|
|
|
foreach my $it (@$all) { |
793
|
0
|
|
|
|
|
|
$ct{ ref $it }++; |
794
|
0
|
0
|
|
|
|
|
if (ref $it eq "CODE") { |
795
|
0
|
|
|
|
|
|
my $name = Devel::Peek::CvGV($it); |
796
|
0
|
0
|
|
|
|
|
$ct{$name}++ if $name =~ /ANON/; |
797
|
|
|
|
|
|
|
} |
798
|
|
|
|
|
|
|
} ## end foreach my $it (@$all) |
799
|
0
|
|
|
|
|
|
$all = undef; # required to free memory |
800
|
0
|
|
|
|
|
|
foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) { |
|
0
|
|
|
|
|
|
|
801
|
0
|
0
|
0
|
|
|
|
next unless $ct{$n} > 1 || $args eq "all"; |
802
|
0
|
|
|
|
|
|
$self->write(sprintf("%7d $n\n", $ct{$n})); |
803
|
|
|
|
|
|
|
} |
804
|
|
|
|
|
|
|
} ## end if ($has_gladiator) |
805
|
0
|
|
|
|
|
|
$self->write(".\n"); |
806
|
|
|
|
|
|
|
} ## end sub TXTCMD_gladiator |
807
|
|
|
|
|
|
|
|
808
|
|
|
|
|
|
|
=head2 "maxqueue" function [max_queue_size] |
809
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
For a given function of job, the maximum queue size is adjusted to be |
811
|
|
|
|
|
|
|
max_queue_size jobs long. A negative value indicates unlimited queue size. |
812
|
|
|
|
|
|
|
|
813
|
|
|
|
|
|
|
If the max_queue_size value is not supplied then it is unset (and the default |
814
|
|
|
|
|
|
|
maximum queue size will apply to this function). |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
This function will return OK upon success, and will return ERR incomplete_args |
817
|
|
|
|
|
|
|
upon an invalid number of arguments. |
818
|
|
|
|
|
|
|
|
819
|
|
|
|
|
|
|
=cut |
820
|
|
|
|
|
|
|
|
821
|
|
|
|
|
|
|
sub TXTCMD_maxqueue { |
822
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
823
|
0
|
|
|
|
|
|
my $args = shift; |
824
|
0
|
|
|
|
|
|
my ($func, $max) = split /\s+/, $args; |
825
|
|
|
|
|
|
|
|
826
|
0
|
0
|
|
|
|
|
unless (length $func) { |
827
|
0
|
|
|
|
|
|
return $self->err_line('incomplete_args'); |
828
|
|
|
|
|
|
|
} |
829
|
|
|
|
|
|
|
|
830
|
0
|
|
|
|
|
|
$self->{server}->set_max_queue($func, $max); |
831
|
0
|
|
|
|
|
|
$self->write("OK\n"); |
832
|
|
|
|
|
|
|
} ## end sub TXTCMD_maxqueue |
833
|
|
|
|
|
|
|
|
834
|
|
|
|
|
|
|
=head2 "version" |
835
|
|
|
|
|
|
|
|
836
|
|
|
|
|
|
|
Returns server version. |
837
|
|
|
|
|
|
|
|
838
|
|
|
|
|
|
|
=cut |
839
|
|
|
|
|
|
|
|
840
|
|
|
|
|
|
|
sub TXTCMD_version { |
841
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
842
|
0
|
|
|
|
|
|
$self->write("$Gearman::Server::VERSION\n"); |
843
|
|
|
|
|
|
|
} |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
sub err_line { |
846
|
0
|
|
|
0
|
0
|
|
my Gearman::Server::Client $self = shift; |
847
|
0
|
|
|
|
|
|
my $err_code = shift; |
848
|
0
|
|
|
|
|
|
my %err_text = ( |
849
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
# numeric iterator for where we start looking for jobl |
851
|
|
|
|
|
|
|
unknown_command => "Unknown server command", |
852
|
|
|
|
|
|
|
unknown_args => "Unknown arguments to server command", |
853
|
|
|
|
|
|
|
incomplete_args => |
854
|
|
|
|
|
|
|
"An incomplete set of arguments was sent to this command", |
855
|
|
|
|
|
|
|
); |
856
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
$self->write( |
858
|
|
|
|
|
|
|
join '', |
859
|
|
|
|
|
|
|
"ERR $err_code ", |
860
|
0
|
|
0
|
|
|
|
eurl($err_text{$err_code}) || '', "\r\n" |
861
|
|
|
|
|
|
|
); |
862
|
0
|
|
|
|
|
|
return 0; |
863
|
|
|
|
|
|
|
} ## end sub err_line |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
sub eurl { |
866
|
0
|
|
|
0
|
0
|
|
my $a = $_[0]; |
867
|
0
|
0
|
|
|
|
|
$a || return; |
868
|
0
|
|
|
|
|
|
$a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg; |
|
0
|
|
|
|
|
|
|
869
|
0
|
|
|
|
|
|
$a =~ tr/ /+/; |
870
|
0
|
|
|
|
|
|
return $a; |
871
|
|
|
|
|
|
|
} ## end sub eurl |
872
|
|
|
|
|
|
|
|
873
|
|
|
|
|
|
|
1; |