File Coverage

blib/lib/Gearman/Util.pm
Criterion Covered Total %
statement 54 132 40.9
branch 11 66 16.6
condition 8 16 50.0
subroutine 14 17 82.3
pod 7 7 100.0
total 94 238 39.5


line stmt bran cond sub pod time code
1             package Gearman::Util;
2 19     19   976 use version ();
  19         1285  
  19         632  
3             $Gearman::Util::VERSION = version->declare("2.003_001");
4              
5 19     19   64 use strict;
  19         24  
  19         370  
6 19     19   74 use warnings;
  19         22  
  19         490  
7              
8             # man errno
9             # Resource temporarily unavailable
10             # (may be the same value as EWOULDBLOCK) (POSIX.1)
11 19     19   858 use POSIX qw(:errno_h);
  19         8854  
  19         134  
12 19     19   12871 use Time::HiRes qw();
  19         8191  
  19         359  
13 19     19   8244 use IO::Select;
  19         22355  
  19         21666  
14              
15             =head1 NAME
16              
17             Gearman::Util - Utility functions for gearman distributed job system
18              
19             =head1 METHODS
20              
21             =cut
22              
23             sub DEBUG () {0}
24              
25             # I: to jobserver
26             # O: out of job server
27             # W: worker
28             # C: client of job server
29             # J: jobserver
30             our %cmd = (
31             1 => ['I', "can_do"], # from W: [FUNC]
32             2 => ['I', "cant_do"], # from W: [FUNC]
33             3 => ['I', "reset_abilities"], # from W: ---
34             4 => ['I', "pre_sleep"], # from W: ---
35             6 => ['O', "noop"], # J->W ---
36             7 => ['I', "submit_job"], # C->J FUNC[0]UNIQ[0]ARGS
37             8 => ['O', "job_created"], # J->C HANDLE
38             9 => ['I', "grab_job"], # W->J --
39             10 => ['O', "no_job"], # J->W --
40             11 => ['O', "job_assign"], # J->W HANDLE[0]FUNC[0]ARG
41             12 => ['IO', "work_status"], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
42             13 => ['IO', "work_complete"], # W->J/C: HANDLE[0]RES
43             14 => ['IO', "work_fail"], # W->J/C: HANDLE
44             15 => ['I', "get_status"], # C->J: HANDLE
45             16 => ['I', "echo_req"], # ?->J TEXT
46             17 => ['O', "echo_res"], # J->? TEXT
47             18 => ['I', "submit_job_bg"], # C->J " " " " "
48             19 => ['O', "error"], # J->? ERRCODE[0]ERR_TEXT
49             20 => ['O', "status_res"], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
50             21 => ['I', "submit_job_high"], # C->J FUNC[0]UNIQ[0]ARGS
51             22 => ['I', "set_client_id"], # W->J: [RANDOM_STRING_NO_WHITESPACE]
52             23 => ['I', "can_do_timeout"], # from W: FUNC[0]TIMEOUT
53              
54             # for worker to declare to the jobserver that this worker is only connected
55             # to one jobserver, so no polls/grabs will take place, and server is free
56             # to push "job_assign" packets back down.
57             24 => ['I', "all_yours"], # W->J ---
58             25 => ['IO', "work_exception"], # W->J/C: HANDLE[0]EXCEPTION
59             26 => ['I', "option_req"], # C->J: [OPT]
60             27 => ['O', "option_res"], # J->C: [OPT]
61             28 => ['IO', "work_data"], # W->J/C: HANDLE[0]RES
62             29 => ['IO', "work_warning"], # W->J/C: HANDLE[0]RES
63             32 => ['I', "submit_job_high_bg"], # C->J FUNC[0]UNIQ[0]ARGS
64             33 => ['I', "submit_job_low"], # C->J FUNC[0]UNIQ[0]ARGS
65             34 => ['I', "submit_job_low_bg"], # C->J FUNC[0]UNIQ[0]ARGS
66             );
67              
68             our %num; # name -> num
69             while (my ($num, $ary) = each %cmd) {
70             die if $num{ $ary->[1] };
71             $num{ $ary->[1] } = $num;
72             }
73              
74             =head2 cmd_name($num)
75              
76             B cmd
77              
78             =cut
79              
80             sub cmd_name {
81 39     39 1 8035 my $num = shift;
82 39         45 my $c = $cmd{$num};
83 39 50       154 return $c ? $c->[1] : undef;
84             }
85              
86             =head2 pack_req_command($key, $arg)
87              
88             B request string
89              
90             =cut
91              
92             sub pack_req_command {
93 73     73 1 624 return _pack_command("REQ", @_);
94             }
95              
96             =head2 pack_res_command($cmd, $arg)
97              
98             B response string
99              
100             =cut
101              
102             sub pack_res_command {
103 64     64 1 497 return _pack_command("RES", @_);
104             }
105              
106             =head2 read_res_packet($sock, $err_ref, $timeout)
107              
108             B undef on closed socket or malformed packet
109              
110             =cut
111              
112             sub read_res_packet {
113 0     0 1 0 warn " Entering read_res_packet" if DEBUG;
114 0         0 my $sock = shift;
115 0         0 my $err_ref = shift;
116 0         0 my $timeout = shift;
117 0         0 my $time_start = Time::HiRes::time();
118              
119             #TODO improvement for SSL socket
120             # http://search.cpan.org/~sullr/IO-Socket-SSL/lib/IO/Socket/SSL.pod#Using_Non-Blocking_Sockets
121             my $err = sub {
122 0     0   0 my $code = shift;
123 0 0       0 $sock->close() if $sock->connected;
124 0 0       0 $$err_ref = $code if ref $err_ref;
125 0         0 return undef;
126 0         0 };
127              
128 0         0 $sock->blocking(0);
129              
130 0         0 my $is = IO::Select->new($sock);
131              
132 0         0 my $readlen = 12;
133 0         0 my $offset = 0;
134 0         0 my $buf = '';
135              
136 0         0 my ($magic, $type, $len);
137              
138 0         0 warn " Starting up event loop\n" if DEBUG;
139              
140 0         0 while (1) {
141 0         0 my $time_remaining = undef;
142 0 0       0 if (defined $timeout) {
143 0         0 warn " We have a timeout of $timeout\n" if DEBUG;
144 0         0 $time_remaining = $time_start + $timeout - Time::HiRes::time();
145 0 0       0 return $err->("timeout") if $time_remaining < 0;
146             }
147              
148 0 0       0 $is->can_read($time_remaining) || next;
149              
150 0         0 warn " Entering read loop\n" if DEBUG;
151              
152 0         0 my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
153 0 0       0 if (!defined($ok)) {
    0          
154 0         0 next;
155             }
156             elsif ($ok == 0) {
157 0         0 return $err->($err_code);
158             }
159              
160 0 0       0 if (!defined $type) {
161 0 0       0 next unless length($buf) >= 12;
162 0         0 my $header = substr($buf, 0, 12, '');
163 0         0 ($magic, $type, $len) = unpack("a4NN", $header);
164 0 0       0 return $err->("malformed_magic") unless $magic eq "\0RES";
165 0         0 my $starting = length($buf);
166 0         0 $readlen = $len - $starting;
167 0         0 $offset = $starting;
168              
169 0 0       0 if ($readlen) {
170 0         0 my ($ok, $err_code)
171             = _read_sock($sock, \$buf, \$readlen, \$offset);
172 0 0       0 if (!defined($ok)) {
    0          
173 0         0 next;
174             }
175             elsif ($ok == 0) {
176 0         0 return $err->($err_code);
177             }
178             } ## end if ($readlen)
179             } ## end if (!defined $type)
180              
181 0         0 $type = $cmd{$type};
182 0 0       0 return $err->("bogus_command") unless $type;
183 0 0       0 return $err->("bogus_command_type") unless index($type->[0], "O") != -1;
184              
185 0         0 warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
186             if DEBUG;
187              
188 0         0 $sock->blocking(1);
189              
190             return {
191 0         0 type => $type->[1],
192             len => $len,
193             blobref => \$buf,
194             };
195             } ## end while (1)
196             } ## end sub read_res_packet
197              
198             sub _read_sock {
199 0     0   0 my ($sock, $buf_ref, $readlen_ref, $offset_ref) = @_;
200 0         0 local $!;
201 0         0 my $rv = sysread($sock, $$buf_ref, $$readlen_ref, $$offset_ref);
202              
203 0 0       0 unless ($rv) {
204 0         0 warn " Read error: $!\n" if DEBUG;
205 0 0       0 $! == EAGAIN && return;
206             }
207              
208 0 0       0 return (0, "read_error") unless defined $rv;
209 0 0       0 return (0, "eof") unless $rv;
210              
211 0 0       0 unless ($rv >= $$readlen_ref) {
212 0         0 warn
213             " Partial read of $rv bytes, at offset $$offset_ref, readlen was $$readlen_ref\n"
214             if DEBUG;
215 0         0 $$offset_ref += $rv;
216 0         0 $$readlen_ref -= $rv;
217              
218 0         0 return _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref);
219             } ## end unless ($rv >= $$readlen_ref)
220              
221 0         0 warn " Finished reading\n" if DEBUG;
222 0         0 return (1);
223             } ## end sub _read_sock
224              
225             =head2 read_text_status($sock, $err_ref)
226              
227             =cut
228              
229             sub read_text_status {
230 1     1 1 754 my $sock = shift;
231 1         1 my $err_ref = shift;
232              
233             my $err = sub {
234 1     1   17 my $code = shift;
235 1 50       4 $sock->close() if $sock->connected;
236 1 50       10 $$err_ref = $code if ref $err_ref;
237 1         5 return undef;
238 1         6 };
239              
240 1 50       5 $sock->connected || return $err->("can't read from unconnected socket");
241 0         0 my @lines;
242 0         0 my $complete = 0;
243 0         0 while (my $line = <$sock>) {
244 0         0 chomp $line;
245 0 0       0 return $err->($1) if $line =~ /^ERR (\w+) /;
246              
247 0 0       0 if ($line eq '.') {
248 0         0 $complete++;
249 0         0 last;
250             }
251              
252 0         0 push @lines, $line;
253             } ## end while (my $line = <$sock>)
254 0 0       0 return $err->("eof") unless $complete;
255              
256 0         0 return @lines;
257             } ## end sub read_text_status
258              
259             =head2 send_req($sock, $reqref)
260              
261             =cut
262              
263             sub send_req {
264 7     7 1 980 my ($sock, $reqref) = @_;
265 7 100       24 return 0 unless $sock;
266              
267 3         4 my $data = ${$reqref};
  3         5  
268 3         7 (my $total_len) = (my $len) = length($data);
269 3         9 my ($num_zero_writes, $offset) = (0, 0);
270 3         67 local $SIG{PIPE} = "IGNORE";
271              
272 3   33     27 while ($len && ($num_zero_writes < 5)) {
273 3         37 my $written = $sock->syswrite($data, $len, $offset);
274 2 50       33 if (!defined $written) {
    0          
275 2         2 warn "send_req: syswrite error: $!" if DEBUG;
276 2         33 return 0;
277             }
278             elsif ($written > 0) {
279 0         0 $len -= $written;
280 0         0 $offset += $written;
281             }
282             else {
283 0         0 $num_zero_writes++;
284             }
285             } ## end while ($len && ($num_zero_writes...))
286              
287 0   0     0 return ($total_len > 0 && $offset == $total_len);
288             } ## end sub send_req
289              
290             =head2 wait_for_readability($fileno, $timeout)
291              
292             given a file descriptor number and a timeout,
293              
294             wait for that descriptor to become readable
295              
296             B 0 or 1 on if it did or not
297              
298             =cut
299              
300             sub wait_for_readability {
301 1     1 1 850 my ($fileno, $timeout) = @_;
302 1 50 33     7 return 0 unless $fileno && $timeout;
303              
304 1         2 my $rin = '';
305 1         3 vec($rin, $fileno, 1) = 1;
306 1         3003210 my $nfound = select($rin, undef, undef, $timeout);
307              
308             # nfound can be undef or 0, both failures, or 1, a success
309 1 50       29 return $nfound ? 1 : 0;
310             } ## end sub wait_for_readability
311              
312             #
313             # _pack_command($prefix, $key, $arg)
314             #
315             sub _pack_command {
316 137     137   146 my ($prefix, $key, $arg) = @_;
317 137 100 100     416 ($key && $num{$key}) || die sprintf("Bogus type arg of '%s'", $key || '');
      66        
318              
319 133   100     245 $arg ||= '';
320 133         108 my $len = length($arg);
321 133         775 return "\0$prefix" . pack("NN", $num{$key}, $len) . $arg;
322             } ## end sub _pack_command
323              
324             1;