File Coverage

blib/lib/Queue/Beanstalk.pm
Criterion Covered Total %
statement 21 246 8.5
branch 0 142 0.0
condition 0 58 0.0
subroutine 7 24 29.1
pod 6 11 54.5
total 34 481 7.0


line stmt bran cond sub pod time code
1             package Queue::Beanstalk;
2              
3 1     1   26042 use 5.006002;
  1         4  
  1         41  
4 1     1   7 use Carp;
  1         2  
  1         171  
5 1     1   1143 use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
  1         6641  
  1         253  
6 1     1   1712 use IO::Handle ();
  1         8424  
  1         30  
7 1     1   965 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
  1         1384  
  1         164  
8 1     1   6 use strict;
  1         2  
  1         35  
9 1     1   5 use warnings;
  1         2  
  1         3154  
10              
11             require Exporter;
12              
13             our @ISA = qw(Exporter);
14              
15             our @EXPORT_OK = qw();
16             our @EXPORT = qw();
17              
18             our $VERSION = '0.02';
19              
20             our $FLAG_NOSIGNAL = 0;
21             eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
22              
23             sub new {
24 0     0 1   my $classname = shift();
25              
26 0           my $self = {
27             # Defaults
28             'report_errors' => 1,
29             'random_servers' => 1,
30             'connect_timeout' => 0.25,
31             'select_timeout' => 1.0,
32             'reserve_timeout' => 10, # if there is no job to do, wait a bit
33             'auto_next_server' => 0, # usually not what you want
34             'servers' => [ '127.0.0.1:11300' ],
35              
36             # Internals
37             'errstr' => '',
38             'warnstr' => '',
39             '_connect_retries' => 0,
40             'sock' => undef,
41             };
42              
43 0 0         my $args = (@_ == 1) ? shift : { @_ }; # hashref-ify args
44              
45             # Default: Retry one for each server (problems with connecting will do a
46             # round robin connect for this many times.)
47 0 0         $self->{'max_autoretry'} = scalar(@{$args->{'servers'}||$self->{'servers'}});
  0            
48              
49 0           $self->{$_} = $args->{$_} foreach (keys %$args); # update options
50              
51              
52 0           bless $self, $classname;
53              
54             # Connect to first/random server
55 0           $self->next_server();
56              
57 0           $self;
58             }
59              
60             sub warn {
61 0     0 0   my ($self, $message) = @_;
62 0           $self->{'warnstr'} = $message;
63 0 0         carp $message if ($self->{'report_errors'});
64             }
65              
66             sub die {
67 0     0 0   my ($self, $message) = @_;
68 0           $self->{'errstr'} = $message;
69 0 0         croak $message if ($self->{'report_errors'});
70             }
71              
72             sub next_server {
73 0     0 0   my $self = shift;
74 0   0       my $internal = shift || 0;
75              
76 0 0 0       if ($self->{'random_servers'} && !$internal) {
77             # get random server
78 0           $self->{'current_server'} = int( rand( scalar(@{$self->{'servers'}}) ) );
  0            
79             } else {
80 0 0         if (!defined $self->{'current_server'}) {
81             # First connection
82 0           $self->{'current_server'} = 0;
83             } else {
84             # round robin 'election'
85 0           $self->{'current_server'}++;
86 0           $self->{'current_server'} %= scalar(@{$self->{'servers'}});
  0            
87             }
88             }
89              
90             # In case of connection errors or if all servers is in "draining mode",
91             # reconnect only this many times
92             # NOTE: Will try to reconnect 'for ever' if no servers responds
93             # and report_errors are nontrue.
94 0 0 0       if ($internal && ($self->{'_connect_retries'}++ >= $self->{'max_autoretry'})) {
95 0           $self->die('Could not connect to servers after ' . $self->{'max_autoretry'} . ' attempts.');
96             }
97 0           $self->connect();
98             }
99              
100             sub connect {
101 0     0 0   my $self = shift;
102 0           my $sock = $self->{'sock'};
103              
104 0 0         if (defined $sock) {
105             # A socket was already open
106 0           close $sock;
107             }
108              
109 0           my ($ip,$port) = split /:/, @{$self->{'servers'}}[ $self->{'current_server'} ];
  0            
110              
111 0           my $proto = getprotobyname('tcp');
112 0           socket($sock, PF_INET, SOCK_STREAM, $proto);
113 0           my $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
114              
115             # The following code is borrowed heavily from Cache::Memcached
116              
117 0 0         if ($self->{'connect_timeout'}) {
118 0           IO::Handle::blocking($sock, 0);
119             } else {
120 0           IO::Handle::blocking($sock, 1);
121             }
122              
123 0           my $ret = connect($sock, $sin);
124              
125 0 0 0       if (!$ret && $self->{'connect_timeout'} && $! == EINPROGRESS) {
      0        
126              
127 0           my $win='';
128 0           vec($win, fileno($sock), 1) = 1;
129              
130 0 0         if (select(undef, $win, undef, $self->{'connect_timeout'}) > 0) {
131 0           $ret = connect($sock, $sin);
132             # EISCONN means connected & won't re-connect, so success
133 0 0 0       $ret = 1 if !$ret && $!==EISCONN;
134             }
135             }
136              
137 0 0         unless ($self->{'connect_timeout'}) { # socket was temporarily blocking, now revert
138 0           IO::Handle::blocking($sock, 0);
139             }
140              
141             # from here on, we use non-blocking (async) IO for the duration
142             # of the socket's life
143              
144             # disable buffering
145 0           my $old = select($sock);
146 0           $| = 1;
147 0           select($old);
148              
149 0           $self->{'sock'} = $sock;
150              
151 0 0         $self->next_server(1) unless $ret;
152              
153 0           return $ret;
154             }
155              
156             # based upon _write_and_read() found in Cache::Memcached
157             sub _write_and_read_data {
158 0     0     my ($self, $line, $check_header) = @_;
159 0           my $sock = $self->{'sock'};
160 0           my ($res,$ret,$offset,$toread) = (undef, undef, 0, 0);
161 0           my @return;
162              
163             # default: stats handler
164             $check_header ||= sub {
165 0 0   0     if (m/OK (\d+)/) {
166 0           return $1;
167             } else {
168 0           return 0;
169             }
170 0   0       };
171              
172             # state: 0 - writing, 1 - reading header, 2 - reading data, 3 - done
173 0           my $state = 0; # writing
174              
175             # the bitsets for select
176 0           my ($rin, $rout, $win, $wout);
177 0           my $nfound;
178              
179 0           my $last_state = -1;
180 0 0         local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
181              
182 0 0         IO::Handle::blocking($sock, 1) if (!$self->{'select_timeout'});
183              
184             # select loop
185 0           while (1) {
186 0 0         if ($last_state != $state) {
187 0 0         last if $state == 3; # done
188 0           ($rin, $win) = ('','');
189 0 0 0       vec($rin, fileno($sock), 1) = 1 if $state == 1 || $state == 2; # reading
190 0 0         vec($win, fileno($sock), 1) = 1 if $state == 0; # writing
191 0           $last_state = $state;
192             }
193              
194 0           $nfound = select($rout=$rin, $wout=$win, undef, $self->{'select_timeout'});
195 0 0         last unless $nfound;
196              
197 0 0         if (vec($wout, fileno($sock), 1)) {
198 0           $res = send($sock, $line, $FLAG_NOSIGNAL);
199            
200 0 0 0       next if not defined $res and $! == EWOULDBLOCK;
201              
202 0 0 0       if (!defined $res || $res <= 0) {
203 0           $self->next_server(1); # disconnected, reconnect
204 0           return undef;
205             }
206              
207 0 0         if ($res == length($line)) { # all data sent
208 0           $state = 1; # start reading
209             } else {
210 0           substr($line, 0, $res, ''); # delete the part we sent
211             }
212             }
213              
214 0 0         if (vec($rout, fileno($sock), 1)) {
215              
216 0           $res = sysread($sock, $ret, 255, $offset);
217              
218 0 0 0       next if not defined $res and $! == EWOULDBLOCK;
219              
220 0 0         if ($res <= 0) {
221 0           $self->next_server(1); # disconnected, reconnect
222 0           return undef;
223             }
224              
225 0           $offset += $res; # read $res bytes
226              
227 0 0 0       if ($state == 1 && $ret =~ m/\r\n/) {
228 0           @return = ($check_header->($ret));
229 0 0         return undef unless defined $return[0];
230              
231 0           $state = 2; # read data
232              
233 0           $ret =~ s/.+?\r\n//; # remove header
234 0           $offset = length($ret); # update offset
235              
236 0           $toread = $return[0]; # Number of bytes to read
237             }
238              
239 0 0 0       if ($state == 2 && (($offset - 2) == $toread)) { # $toread = number of bytes to read, minus \r\n
240 0           substr($ret,$offset - 2,2) = '';
241 0           $state = 3;
242             }
243              
244             }
245             }
246              
247 0 0         unless ($state == 3) { # done
248 0           $self->next_server(1); # improperly finished, reconnect
249 0           return undef;
250             }
251              
252 0 0         IO::Handle::blocking($sock, 0) if (!$self->{'select_timeout'});
253              
254 0           return $ret, @return;
255             }
256              
257             # heavily based upon the same function found in Cache::Memcached
258             sub _write_and_read {
259 0     0     my ($self, $line, $check_complete) = @_;
260 0           my $sock = $self->{'sock'};
261 0           my ($res,$ret,$offset) = (undef, undef, 0);
262              
263             $check_complete ||= sub {
264 0     0     return (rindex($ret, "\r\n") + 2 == length($ret));
265 0   0       };
266              
267             # state: 0 - writing, 1 - reading, 2 - done
268 0           my $state = 0; # writing
269              
270             # the bitsets for select
271 0           my ($rin, $rout, $win, $wout);
272 0           my $nfound;
273              
274 0           my $last_state = -1;
275 0 0         local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
276              
277             # select loop
278 0           while (1) {
279 0 0         if ($last_state != $state) {
280 0 0         last if $state == 2; # done
281 0           ($rin, $win) = ('','');
282 0 0         vec($rin, fileno($sock), 1) = 1 if $state == 1; # reading
283 0 0         vec($win, fileno($sock), 1) = 1 if $state == 0; # writing
284 0           $last_state = $state;
285             }
286              
287 0           $nfound = select($rout=$rin, $wout=$win, undef, $self->{'select_timeout'});
288 0 0         last unless $nfound;
289              
290 0 0         if (vec($wout, fileno($sock), 1)) {
291 0           $res = send($sock, $line, $FLAG_NOSIGNAL);
292            
293 0 0 0       next if not defined $res and $! == EWOULDBLOCK;
294              
295 0 0 0       if (!defined $res || $res <= 0) {
296 0           $self->next_server(1); # disconnected, reconnect
297 0           return undef;
298             }
299              
300 0 0         if ($res == length($line)) { # all data sent
301 0           $state = 1; # start reading
302             } else {
303 0           substr($line, 0, $res, ''); # delete the part we sent
304             }
305             }
306              
307 0 0         if (vec($rout, fileno($sock), 1)) {
308 0           $res = sysread($sock, $ret, 255, $offset);
309              
310 0 0 0       next if not defined $res and $! == EWOULDBLOCK;
311              
312 0 0         if ($res <= 0) {
313 0           $self->next_server(1); # disconnected, reconnect
314 0           return undef;
315             }
316              
317 0           $offset += $res; # read $res bytes
318              
319 0 0         $state = 2 if $check_complete->(\$ret); # are we done reading?
320             }
321             }
322              
323 0 0         unless ($state == 2) { # done
324 0           $self->next_server(1); # improperly finished, reconnect
325 0           return undef;
326             }
327              
328 0           $self->{'last_message'} = $ret;
329              
330 0           return $ret;
331             }
332              
333             sub handle_errors ($$$@) {
334 0     0 0   my ($self, $message, $command, @args) = @_;
335              
336             # Try next server if possible
337 0 0         if ($message =~ m/DRAINING/i) {
338 0           $self->next_server(1);
339 0           shift @args;
340 0           return $self->$command(@args);
341             }
342 0           return undef;
343             }
344              
345             sub put {
346 0     0 1   my ($self, $data, $pri, $delay, $ttr) = @_;
347              
348 0   0       $pri ||= 4294967295;
349 0           $pri %= 2**32;
350 0   0       $delay ||= 0;
351 0           $delay = int($delay);
352 0 0         $ttr = defined $ttr ? int($ttr) : 120;
353              
354 0           my $ret = $self->_write_and_read("put $pri $delay $ttr " . length($data) . "\r\n$data\r\n");
355              
356 0 0         return undef unless defined $ret;
357              
358 0 0         $self->next_server if $self->{'auto_next_server'};
359              
360 0 0         if ($ret =~ m/INSERTED (\d+)/) {
361 0           $self->{'last_insert_id'} = $1;
362 0           return 'inserted';
363             }
364 0 0         return 'buried' if $ret =~ m/BURIED/;
365              
366            
367              
368 0 0         $self->warn('Invalid data returned from server') unless $self->handle_errors($ret,'put',@_);
369 0           return undef;
370             }
371              
372             sub stats {
373 0     0 1   my $self = shift;
374 0 0         my $id = defined $_[0] ? ' ' . int(shift()) : '';
375              
376             my ($data, $bytes) = $self->_write_and_read_data("stats$id\r\n", sub {
377 0 0   0     if ($_[0] =~ m/ok (\d+)/i) {
378 0           return ($1);
379             } else {
380 0           return undef;
381             }
382 0           });
383              
384 0           my $result = eval "use YAML; return 1;";
385 0 0         if ($result) {
386 0           return YAML::Load($data);
387             } else {
388 0           $self->warn('YAML module missing');
389 0           return $data;
390             }
391             }
392              
393             sub reserve {
394 0     0 1   my ($self) = @_;
395              
396 0 0         if ($self->{'job_id'}) {
397              
398             # Unfinished job, let someone else have it
399 0           $self->_write_and_read("release " . $self->{'job_id'} . " " . $self->{'job_pri'} . " 0\r\n");
400 0           $self->{'job_id'} = undef;
401 0           $self->{'job_pri'} = undef;
402 0           $self->{'job_data'} = undef;
403              
404             }
405              
406 0           my $old_timeout = $self->{'select_timeout'};
407 0           $self->{'select_timeout'} = $self->{'reserve_timeout'}; # set temporary timeout for reserve-request
408              
409             # Send request
410             my ($data, $bytes, $id, $pri) = $self->_write_and_read_data("reserve\r\n", sub {
411 0 0   0     if ($_[0] =~ m/reserved (\d+) (\d+) (\d+)/i) {
412 0           return ($3,$1,$2); # "bytes" value must be first return-parameter
413             } else {
414 0           return undef;
415             }
416 0           });
417              
418 0 0         return undef unless defined $bytes;
419              
420 0           $self->{'select_timeout'} = $old_timeout;
421              
422 0           $self->{'job_id'} = $id;
423 0           $self->{'job_pri'} = $pri;
424 0           $self->{'job_data'} = $data;
425              
426 0           return $data;
427             }
428              
429             sub release {
430 0     0 1   my ($self, $pri, $delay) = @_;
431              
432 0 0         if ($self->{'job_id'}) {
433 0           $self->warn('no job reserved yet');
434 0           return undef;
435             }
436 0 0 0       my $res = $self->_write_and_read("release " .
437             $self->{'job_id'} . " " .
438             ( ($pri % 2**32) || $self->{'job_pri'} ) . " " . # priority
439             ( defined $delay ? int($delay) : 0 ) . # delay
440             "\r\n");
441              
442 0 0         if ($res =~ m/RELEASED|BURIED/) {
443 0           $self->{'job_id'} = undef;
444 0           $self->{'job_pri'} = undef;
445 0           $self->{'job_data'} = undef;
446              
447 0 0         $self->next_server if $self->{'auto_next_server'};
448              
449 0 0         return 'released' if ($res =~ m/RELEASED/i);
450 0 0         return 'buried' if ($res =~ m/BURIED/i);
451             }
452 0           return undef;
453             }
454              
455              
456             sub delete {
457 0     0 1   my $self = shift;
458              
459 0 0 0       if (!defined $self->{'job_id'} || !$self->{'job_id'}) {
460 0           $self->warn('no job reserved yet');
461 0           return undef;
462             }
463              
464 0           my $res = $self->_write_and_read("delete " . $self->{'job_id'} . "\r\n");
465              
466 0 0         if ($res =~ m/DELETED/) {
467 0           $self->{'job_id'} = undef;
468 0           $self->{'job_pri'} = undef;
469 0           $self->{'job_data'} = undef;
470              
471 0 0         $self->next_server if $self->{'auto_next_server'};
472              
473 0           return 1;
474             }
475 0           return 0;
476             }
477              
478             1;
479             __END__