File Coverage

blib/lib/AnyEvent/Beanstalk.pm
Criterion Covered Total %
statement 33 293 11.2
branch 0 150 0.0
condition 0 162 0.0
subroutine 11 55 20.0
pod 30 33 90.9
total 74 693 10.6


line stmt bran cond sub pod time code
1             package AnyEvent::Beanstalk;
2             {
3             $AnyEvent::Beanstalk::VERSION = '1.142520';
4             }
5              
6 5     5   123684 use strict;
  5         15  
  5         2065  
7 5     5   44 use warnings;
  5         7  
  5         249  
8              
9 5     5   27 use constant DEBUG => $ENV{AE_BEANSTALK_DEBUG};
  5         18  
  5         456  
10 5     5   32 use Scalar::Util qw(blessed);
  5         10  
  5         1491  
11              
12 5     5   12022 use AnyEvent;
  5         46828  
  5         189  
13 5     5   9120 use AnyEvent::Handle;
  5         200652  
  5         217  
14 5     5   6015 use AnyEvent::Socket;
  5         137487  
  5         1579  
15 5     5   6871 use AnyEvent::Beanstalk::Job;
  5         18  
  5         3533  
16 5     5   3211 use AnyEvent::Beanstalk::Stats;
  5         12  
  5         147  
17              
18 5     5   29 use base qw(Class::Accessor::Fast);
  5         8  
  5         9139  
19              
20             __PACKAGE__->mk_accessors(
21             qw< decoder delay encoder on_error on_connect priority server socket ttr > ##
22             );
23              
24             my $YAML_CLASS = do {
25             local ($SIG{__DIE__}, $SIG{__WARN__});
26             eval { require YAML::XS } ? 'YAML::XS'
27             : eval { require YAML::Syck } ? 'YAML::Syck'
28             : eval { require YAML } ? 'YAML'
29             : die $@;
30             };
31             my $YAML_LOAD = $YAML_CLASS->can('Load');
32             my $YAML_DUMP = $YAML_CLASS->can('Dump');
33              
34              
35             sub new {
36 0     0 1   my $proto = shift;
37 0           my %arg = @_;
38              
39 0   0       bless(
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
40             { delay => $arg{delay} || 0,
41             ttr => $arg{ttr} || 120,
42             priority => $arg{priority} || 10_000,
43             encoder => $arg{encoder} || $YAML_DUMP,
44             decoder => $arg{decoder} || $YAML_LOAD,
45             server => $arg{server} || undef,
46             debug => $arg{debug} || 0,
47             on_error => $arg{on_error} || undef,
48             on_connect => $arg{on_connect} || undef,
49             },
50             ref($proto) || $proto
51             );
52             }
53              
54              
55             sub run_cmd {
56 0     0 0   my $self = shift;
57              
58 0 0         $self->{_cmd_cb} or return $self->connect(@_);
59 0           $self->{_cmd_cb}->(@_);
60             }
61              
62              
63 0     0 1   sub quit { shift->disconnect }
64 0 0   0 1   sub reserve_pending { shift->{_reserve_pending} || 0 }
65              
66              
67             sub disconnect {
68 0     0 1   my $self = shift;
69 0           my $condvar = delete $self->{_condvar};
70 0           delete @{$self}{grep {/^_[a-z]/} keys %$self};
  0            
  0            
71 0 0         if ($condvar) {
72 0           $_->send for values %$condvar;
73             }
74 0           return;
75             }
76              
77              
78             sub _error {
79 0     0     my $self = shift;
80 0           $self->disconnect;
81 0   0 0     ($self->on_error || sub { die @_ })->(@_);
  0            
82             }
83              
84              
85             sub reconnect {
86 0     0 1   my $self = shift;
87              
88 0   0       my $using = $self->{__using} || 'default';
89             $self->use(
90             $using,
91             sub {
92 0 0 0 0     $self->_error("Can't use '$using'") unless @_ and $_[0] eq 'USING';
93             }
94 0           );
95              
96 0   0       my $watching = $self->{__watching} || {default => 1};
97             $self->watch_only(
98             keys %$watching,
99             sub {
100 0 0 0 0     $self->_error("Error watching tubes") unless @_ and $_[0] eq 'WATCHING';
101             }
102 0           );
103             }
104              
105             my %EXPECT = qw(
106             put INSERTED
107             use USING
108             reserve RESERVED
109             reserve-with-timeout RESERVED
110             delete DELETED
111             release RELEASED
112             bury BURIED
113             touch TOUCHED
114             watch WATCHING
115             ignore WATCHING
116             peek FOUND
117             peek-ready FOUND
118             peek-delayed FOUND
119             peek-buried FOUND
120             kick KICKED
121             kick-job KICKED
122             stats-job OK
123             stats-tube OK
124             stats OK
125             list-tubes OK
126             list-tube-used USING
127             list-tubes-watched OK
128             pause-tube PAUSED
129             );
130              
131             sub connect {
132 0     0 1   my $self = shift;
133              
134 0           my $cv;
135 0 0         if (@_) {
136 0           $cv = AE::cv;
137 0           $self->{_condvar}{$cv} = $cv;
138 0           push @{$self->{_connect_queue}}, [@_, $cv];
  0            
139             }
140              
141 0 0         return $cv if $self->{_sock};
142              
143 0   0       my ($host, $port) = parse_hostport($self->server || '127.0.0.1', 11300);
144             $self->{_sock} = tcp_connect $host, $port, sub {
145 0     0     $self->server("$host\:$port");
146 0 0         my $fh = shift
147             or return $self->_error("Can't connect to beanstalk server: $!");
148              
149 0           $self->{__using} = 'default';
150 0           $self->{__watching} = {default => 1};
151 0           my $on_connect = $self->on_connect;
152 0 0         $on_connect->() if $on_connect;
153 0           $self->{_socket} = $fh;
154              
155             my $hd = AnyEvent::Handle->new(
156             fh => $fh,
157 0           on_error => sub { $_[0]->destroy; $self->_error($_[2]) },
  0            
158 0           on_eof => sub { $_[0]->destroy; $self->_error("EOF") },
  0            
159 0           );
160              
161             $self->{_cmd_cb} = sub {
162 0           my $command = lc shift;
163              
164 0           my ($cv, $cb);
165             {
166 5     5   37 no warnings;
  5         9  
  5         27680  
  0            
167 0 0 0       $cv = pop if @_ && blessed($_[-1]) eq 'AnyEvent::CondVar';
168 0 0 0       $cb = pop if @_ && ref $_[-1] eq 'CODE';
169             }
170              
171 0 0         my $value = $command eq 'put' ? pop(@_) . "\015\012" : '';
172 0           my @argv = @_;
173 0           my $send = join(" ", $command, @argv) . "\015\012" . $value;
174              
175 0           warn "Sending [$send]\n" if DEBUG;
176              
177 0   0       $cv ||= AE::cv;
178 0           $self->{_condvar}{$cv} = $cv;
179             $cv->cb(
180             sub {
181 0           my $cv = shift;
182 0           my @res = $cv->recv;
183 0           $cb->(@res);
184             }
185 0 0         ) if $cb;
186              
187 0 0         $self->{_reserve_pending}++ if $command =~ /^reserve/;
188              
189 0           $hd->push_write($send);
190             $hd->push_read(
191             line => sub {
192 0           my ($hd, $result) = @_;
193 0           warn "got line <$result> for command [$send]\n" if DEBUG;
194 0           my @resp = split(/\s+/, $result);
195 0           my $resp = uc shift @resp;
196              
197 0 0         $self->{_reserve_pending}-- if $command =~ /^reserve/;
198              
199 0 0         unless ($resp eq $EXPECT{$command}) {
200 0           delete $self->{_condvar}{$cv};
201 0           $cv->send(undef, $result);
202 0           return;
203             }
204              
205 0 0         if ($resp =~ /^ (?: RESERVED | FOUND ) $/x) {
    0          
    0          
    0          
    0          
206 0           my ($id, $bytes) = @resp;
207             $hd->unshift_read(
208             chunk => $bytes + 2,
209             sub {
210 0           my ($hd, $chunk) = @_;
211 0           my $job = AnyEvent::Beanstalk::Job->new(
212             id => $id,
213             client => $self,
214             data => substr($chunk, 0, -2),
215             );
216 0           delete $self->{_condvar}{$cv};
217 0           $cv->send($job, $result);
218             }
219 0           );
220             }
221             elsif ($resp =~ /^ (?: INSERTED | BURIED ) $/x) {
222 0           my $id = shift @resp;
223 0 0         my $job = AnyEvent::Beanstalk::Job->new(
224             id => $id,
225             client => $self,
226             data => substr($value, 0, -2),
227             buried => $resp eq 'BURIED' ? 1 : 0,
228             );
229 0           delete $self->{_condvar}{$cv};
230 0           $cv->send($job, $result);
231             }
232             elsif ($resp eq 'OK') {
233 0           my $bytes = shift @resp;
234             $hd->unshift_read(
235             chunk => $bytes + 2,
236             sub {
237 0           my ($hd, $chunk) = @_;
238 0           warn "got '$chunk'\n" if DEBUG;
239 0           my $yaml = $YAML_LOAD->($chunk);
240 0           delete $self->{_condvar}{$cv};
241 0 0         $yaml = AnyEvent::Beanstalk::Stats->new($yaml) if $command =~ /^stats/;
242 0           $cv->send($yaml,$result);
243             }
244 0           );
245             }
246             elsif($resp =~ /^ (?: RELEASED | TOUCHED ) $/x) {
247 0           my ($id, $pri, $delay) = @argv;
248 0           delete $self->{_condvar}{$cv};
249 0 0         my $job = AnyEvent::Beanstalk::Job->new(
250             id => $id,
251             client => $self,
252             ( $command eq 'release'
253             ? (
254             priority => $pri,
255             delay => $delay,
256             )
257             : ()
258             )
259             );
260 0           $cv->send($job, $result);
261             }
262             elsif($resp =~ /^ (?: USING | WATCHING ) $/x) {
263 0           delete $self->{_condvar}{$cv};
264 0           my $retval = shift @resp;
265 0           $cv->send($retval, $result);
266             }
267             else {
268 0           delete $self->{_condvar}{$cv};
269 0           $cv->send(1, $result);
270             }
271             }
272 0           );
273              
274 0           return $cv;
275 0           };
276              
277 0 0         for my $queue (@{$self->{_connect_queue} || []}) {
  0            
278 0           $self->{_cmd_cb}->(@$queue);
279             }
280 0           delete $self->{_connect_queue};
281 0           };
282              
283 0           return $cv;
284             }
285              
286              
287             sub watch_only {
288 0     0 1   my $self = shift;
289 0 0 0       my $cb = pop if @_ and ref($_[-1]) eq 'CODE';
290 0           my $cv = AE::cv;
291 0           $self->{_condvar}{$cv} = $cv;
292              
293             $cv->cb(
294             sub {
295 0     0     my $cv = shift;
296 0           my @res = $cv->recv;
297 0           $cb->(@res);
298             }
299 0 0         ) if $cb;
300              
301 0 0         unless (@_) {
302 0           delete $self->{_condvar}{$cv};
303 0           $cv->send('NOT_IGNORED');
304 0           return $cv;
305             }
306              
307 0           my %tubes = map { ($_ => 1) } @_;
  0            
308             my $done = sub {
309 0     0     %tubes = ();
310 0           delete $self->{_condvar}{$cv};
311 0           $cv->send(@_);
312 0           };
313             $self->list_tubes_watched(
314             sub {
315 0     0     my ($tubes,$r) = @_;
316 0 0 0       return $done->(@_) unless $r and $r =~ /^OK\b/;
317 0           foreach my $t (@$tubes) {
318 0 0         $tubes{$t} = 0 unless delete $tubes{$t};
319             }
320 0 0         $done->() if !keys %tubes;
321 0           foreach my $t (sort { $tubes{$b} <=> $tubes{$a} } keys %tubes) {
  0            
322 0 0         my $cmd = $tubes{$t} ? 'watch' : 'ignore';
323             $self->run_cmd(
324             $cmd, $t,
325             sub {
326 0           my ($r) = @_;
327 0 0         return unless keys %tubes;
328 0           delete $tubes{$t};
329 0 0 0       return $done->(@_)
      0        
330             if !@_
331             or $r ne 'WATCHING'
332             or !keys %tubes;
333             }
334 0           );
335             }
336             }
337 0           );
338              
339 0           $cv;
340             }
341              
342              
343             sub put {
344 0     0 1   my $self = shift;
345 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
346 0   0       my $opt = shift || {};
347              
348 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
349 0 0         my $ttr = exists $opt->{ttr} ? $opt->{ttr} : $self->ttr;
350 0 0         my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
351 0 0         my $data =
    0          
352             exists $opt->{data} ? $opt->{data}
353             : exists $opt->{encode} ? $self->encoder->($opt->{encode})
354             : '';
355              
356 0   0       $pri = int($pri || 0) || 1;
357 0   0       $ttr = int($ttr || 0) || 1;
358 0   0       $delay = int($delay || 0) || 0;
359 0 0         $data = '' unless defined $data;
360              
361 0 0         utf8::encode($data) if utf8::is_utf8($data); # need bytes
362              
363 0           $self->run_cmd('put' => $pri, $delay, $ttr, length($data), $data, @cb);
364             }
365              
366              
367             sub stats {
368 0     0 1   my $self = shift;
369 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
370 0           $self->run_cmd('stats' => @cb);
371             }
372              
373              
374             sub stats_tube {
375 0     0 1   my $self = shift;
376 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
377 0           my $tube = shift;
378 0           $self->run_cmd('stats-tube' => $tube, @cb);
379             }
380              
381              
382             sub stats_job {
383 0     0 1   my $self = shift;
384 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
385 0   0       my $id = shift || 0;
386 0           $self->run_cmd('stats-job' => $id, @cb);
387             }
388              
389              
390             sub kick {
391 0     0 1   my $self = shift;
392 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
393 0   0       my $bound = shift || 1;
394 0           $self->run_cmd('kick' => $bound, @cb);
395             }
396              
397              
398             sub kick_job {
399 0     0 1   my $self = shift;
400 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
401 0   0       my $id = shift || 0;
402 0           $self->run_cmd('kick-job' => $id, @cb);
403             }
404              
405             sub use {
406 0     0 1   my $self = shift;
407 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
408 0           my $tube = shift;
409             $self->run_cmd(
410             'use' => $tube,
411             sub {
412 0 0 0 0     $self->{__using} = $_[0] if @_ and $_[1] =~ /^USING/;
413 0 0         $cb[0]->(@_) if @cb;
414             }
415 0           );
416             }
417              
418              
419             sub reserve {
420 0     0 1   my $self = shift;
421 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
422 0           my $timeout = shift;
423              
424 0 0         my @cmd = defined($timeout) ? ('reserve-with-timeout' => $timeout) : "reserve";
425 0           $self->run_cmd(@cmd, @cb);
426             }
427              
428              
429             sub delete {
430 0     0 1   my $self = shift;
431 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
432 0   0       my $id = shift || 0;
433 0           $self->run_cmd('delete' => $id, @cb);
434             }
435              
436              
437             sub touch {
438 0     0 1   my $self = shift;
439 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
440 0   0       my $id = shift || 0;
441 0           $self->run_cmd('touch' => $id, @cb);
442             }
443              
444              
445             sub release {
446 0     0 1   my $self = shift;
447 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
448 0   0       my $id = shift || 0;
449 0   0       my $opt = shift || {};
450              
451 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
452 0 0         my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
453              
454 0           $self->run_cmd('release' => $id, $pri, $delay, @cb);
455             }
456              
457              
458             sub bury {
459 0     0 1   my $self = shift;
460 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
461 0           my $id = shift;
462 0   0       my $opt = shift || {};
463              
464 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
465              
466 0           $self->run_cmd('bury' => $id, $pri, @cb);
467             }
468              
469              
470             sub watch {
471 0     0 1   my $self = shift;
472 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
473 0           my $tube = shift;
474              
475             $self->run_cmd(
476             'watch' => $tube,
477             sub {
478 0 0 0 0     $self->{__watching}{$tube} = 1 if @_ and $_[1] =~ /^WATCHING/;
479 0 0         $cb[0]->(@_) if @cb;
480             }
481 0           );
482             }
483              
484              
485             sub ignore {
486 0     0 1   my $self = shift;
487 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
488 0           my $tube = shift;
489              
490             $self->run_cmd(
491             'ignore' => $tube,
492             sub {
493 0 0 0 0     delete $self->{__watching}{$tube} if @_ and $_[1] =~ /^WATCHING/;
494 0 0         $cb[0]->(@_) if @cb;
495             }
496 0           );
497             }
498              
499              
500             sub peek {
501 0     0 1   my $self = shift;
502 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
503 0           my $id = shift;
504              
505 0           $self->run_cmd('peek' => $id, @cb);
506             }
507              
508              
509             sub peek_ready {
510 0     0 1   my $self = shift;
511 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
512 0           my $id = shift;
513              
514 0           $self->run_cmd('peek-ready' => @cb);
515             }
516              
517              
518             sub peek_delayed {
519 0     0 1   my $self = shift;
520 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
521 0           my $id = shift;
522              
523 0           $self->run_cmd('peek-delayed' => @cb);
524             }
525              
526              
527             sub peek_buried {
528 0     0 1   my $self = shift;
529 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
530 0           my $id = shift;
531              
532 0           $self->run_cmd('peek-buried' => @cb);
533             }
534              
535              
536             sub list_tubes {
537 0     0 1   my $self = shift;
538 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
539 0           $self->run_cmd('list-tubes' => @cb);
540             }
541              
542              
543             sub list_tube_used {
544 0     0 1   my $self = shift;
545 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
546 0           $self->run_cmd('list-tube-used' => @cb);
547             }
548              
549              
550             sub list_tubes_watched {
551 0     0 1   my $self = shift;
552 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
553 0           $self->run_cmd('list-tubes-watched' => @cb);
554             }
555              
556              
557             sub pause_tube {
558 0     0 1   my $self = shift;
559 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
560 0           my $tube = shift;
561 0   0       my $delay = shift || 0;
562 0           $self->run_cmd('pause-tube' => $tube, $delay, @cb);
563             }
564              
565              
566             sub watching {
567 0     0 0   my $self = shift;
568 0 0         return unless $self->{_sock};
569 0 0         my $watching = $self->{__watching} or return;
570 0           return keys %$watching;
571             }
572              
573              
574             sub using {
575 0     0 0   my $self = shift;
576 0           return $self->{__using};
577             }
578              
579              
580             sub sync {
581 0     0 1   my $self = shift;
582              
583 0   0       while ($self->{_condvar} and my ($cv) = values %{$self->{_condvar}}) {
  0            
584 0           $cv->recv;
585             }
586             }
587              
588             1;
589             __END__