File Coverage

blib/lib/AnyEvent/Beanstalk.pm
Criterion Covered Total %
statement 33 298 11.0
branch 0 156 0.0
condition 0 159 0.0
subroutine 11 55 20.0
pod 30 33 90.9
total 74 701 10.5


line stmt bran cond sub pod time code
1             package AnyEvent::Beanstalk;
2             $AnyEvent::Beanstalk::VERSION = '1.170590';
3 6     6   66431 use strict;
  6         9  
  6         169  
4 6     6   27 use warnings;
  6         7  
  6         236  
5              
6 6     6   25 use constant DEBUG => $ENV{AE_BEANSTALK_DEBUG};
  6         12  
  6         491  
7 6     6   27 use Scalar::Util qw(blessed);
  6         7  
  6         463  
8              
9 6     6   6103 use AnyEvent;
  6         27048  
  6         181  
10 6     6   4244 use AnyEvent::Handle;
  6         97358  
  6         197  
11 6     6   3553 use AnyEvent::Socket;
  6         63611  
  6         663  
12 6     6   2897 use AnyEvent::Beanstalk::Job;
  6         16  
  6         73  
13 6     6   2383 use AnyEvent::Beanstalk::Stats;
  6         11  
  6         174  
14              
15 6     6   30 use base qw(Class::Accessor::Fast);
  6         8  
  6         4524  
16              
17             __PACKAGE__->mk_accessors(
18             qw< decoder delay encoder on_error on_connect priority server socket ttr > ##
19             );
20              
21             my $YAML_CLASS = do {
22             local ($SIG{__DIE__}, $SIG{__WARN__});
23             eval { require YAML::XS } ? 'YAML::XS'
24             : eval { require YAML::Syck } ? 'YAML::Syck'
25             : eval { require YAML } ? 'YAML'
26             : die $@;
27             };
28             my $YAML_LOAD = $YAML_CLASS->can('Load');
29             my $YAML_DUMP = $YAML_CLASS->can('Dump');
30              
31              
32             sub new {
33 0     0 1   my $proto = shift;
34 0           my %arg = @_;
35              
36             bless(
37             { delay => $arg{delay} || 0,
38             ttr => $arg{ttr} || 120,
39             priority => $arg{priority} || 10_000,
40             encoder => $arg{encoder} || $YAML_DUMP,
41             decoder => $arg{decoder} || $YAML_LOAD,
42             server => $arg{server} || undef,
43             debug => $arg{debug} || 0,
44             on_error => $arg{on_error} || undef,
45             on_connect => $arg{on_connect} || undef,
46             },
47 0   0       ref($proto) || $proto
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
48             );
49             }
50              
51              
52             sub run_cmd {
53 0     0 0   my $self = shift;
54              
55 0 0         $self->{_cmd_cb} or return $self->connect(@_);
56 0           $self->{_cmd_cb}->(@_);
57             }
58              
59              
60 0     0 1   sub quit { shift->disconnect }
61 0 0   0 1   sub reserve_pending { shift->{_reserve_pending} || 0 }
62              
63              
64             sub disconnect {
65 0     0 1   my $self = shift;
66 0           my $condvar = delete $self->{_condvar};
67 0           delete @{$self}{grep {/^_[a-z]/} keys %$self};
  0            
  0            
68 0 0         if ($condvar) {
69 0           $_->send for values %$condvar;
70             }
71 0           return;
72             }
73              
74              
75             sub _error {
76 0     0     my $self = shift;
77 0           $self->disconnect;
78 0   0 0     ($self->on_error || sub { die @_ })->(@_);
  0            
79             }
80              
81              
82             sub reconnect {
83 0     0 1   my $self = shift;
84              
85 0   0       my $using = $self->{__using} || 'default';
86             $self->use(
87             $using,
88             sub {
89 0 0 0 0     $self->_error("Can't use '$using'") unless @_ and $_[0] eq 'USING';
90             }
91 0           );
92              
93 0   0       my $watching = $self->{__watching} || {default => 1};
94             $self->watch_only(
95             keys %$watching,
96             sub {
97 0 0 0 0     $self->_error("Error watching tubes") unless @_ and $_[0] eq 'WATCHING';
98             }
99 0           );
100             }
101              
102             my %EXPECT = qw(
103             put INSERTED
104             use USING
105             reserve RESERVED
106             reserve-with-timeout RESERVED
107             delete DELETED
108             release RELEASED
109             bury BURIED
110             touch TOUCHED
111             watch WATCHING
112             ignore WATCHING
113             peek FOUND
114             peek-ready FOUND
115             peek-delayed FOUND
116             peek-buried FOUND
117             kick KICKED
118             kick-job KICKED
119             stats-job OK
120             stats-tube OK
121             stats OK
122             list-tubes OK
123             list-tube-used USING
124             list-tubes-watched OK
125             pause-tube PAUSED
126             );
127              
128             sub connect {
129 0     0 1   my $self = shift;
130              
131 0           my $cv;
132 0 0         if (@_) {
133 0           $cv = AE::cv;
134 0           $self->{_condvar}{$cv} = $cv;
135 0           push @{$self->{_connect_queue}}, [@_, $cv];
  0            
136             }
137              
138 0 0         return $cv if $self->{_sock};
139              
140 0   0       my ($host, $port) = parse_hostport($self->server || '127.0.0.1', 11300);
141             $self->{_sock} = tcp_connect $host, $port, sub {
142 0     0     $self->server("$host\:$port");
143 0 0         my $fh = shift
144             or return $self->_error("Can't connect to beanstalk server: $!");
145              
146 0           $self->{__using} = 'default';
147 0           $self->{__watching} = {default => 1};
148 0           my $on_connect = $self->on_connect;
149 0 0         $on_connect->() if $on_connect;
150 0           $self->{_socket} = $fh;
151              
152             my $hd = AnyEvent::Handle->new(
153             fh => $fh,
154 0           on_error => sub { $_[0]->destroy; $self->_error($_[2]) },
  0            
155 0           on_eof => sub { $_[0]->destroy; $self->_error("EOF") },
  0            
156 0           );
157              
158             $self->{_cmd_cb} = sub {
159 0           my $command = lc shift;
160              
161 0           my ($cv, $cb);
162             {
163 6     6   32 no warnings;
  6         5  
  6         16283  
  0            
164 0 0 0       $cv = pop if @_ && blessed($_[-1]) eq 'AnyEvent::CondVar';
165 0 0 0       $cb = pop if @_ && ref $_[-1] eq 'CODE';
166             }
167              
168 0 0         my $value = $command eq 'put' ? pop(@_) . "\015\012" : '';
169 0           my @argv = @_;
170 0           my $send = join(" ", $command, @argv) . "\015\012" . $value;
171              
172 0           warn "Sending [$send]\n" if DEBUG;
173              
174 0   0       $cv ||= AE::cv;
175 0           $self->{_condvar}{$cv} = $cv;
176             $cv->cb(
177             sub {
178 0           my $cv = shift;
179 0           my @res = $cv->recv;
180 0           $cb->(@res);
181             }
182 0 0         ) if $cb;
183              
184 0 0         $self->{_reserve_pending}++ if $command =~ /^reserve/;
185              
186 0           $hd->push_write($send);
187             $hd->push_read(
188             line => sub {
189 0           my ($hd, $result) = @_;
190 0           warn "got line <$result> for command [$send]\n" if DEBUG;
191 0           my @resp = split(/\s+/, $result);
192 0           my $resp = uc shift @resp;
193              
194 0 0         $self->{_reserve_pending}-- if $command =~ /^reserve/;
195              
196 0 0         unless ($resp eq $EXPECT{$command}) {
197 0           delete $self->{_condvar}{$cv};
198 0           $cv->send(undef, $result);
199 0           return;
200             }
201              
202 0 0         if ($resp =~ /^ (?: RESERVED | FOUND ) $/x) {
    0          
    0          
    0          
    0          
203 0           my ($id, $bytes) = @resp;
204             $hd->unshift_read(
205             chunk => $bytes + 2,
206             sub {
207 0           my ($hd, $chunk) = @_;
208 0           my $job = AnyEvent::Beanstalk::Job->new(
209             id => $id,
210             client => $self,
211             data => substr($chunk, 0, -2),
212             );
213 0           delete $self->{_condvar}{$cv};
214 0           $cv->send($job, $result);
215             }
216 0           );
217             }
218             elsif ($resp =~ /^ (?: INSERTED | BURIED ) $/x) {
219 0           my $id = shift @resp;
220 0 0         my $job = AnyEvent::Beanstalk::Job->new(
221             id => $id,
222             client => $self,
223             data => substr($value, 0, -2),
224             buried => $resp eq 'BURIED' ? 1 : 0,
225             );
226 0           delete $self->{_condvar}{$cv};
227 0           $cv->send($job, $result);
228             }
229             elsif ($resp eq 'OK') {
230 0           my $bytes = shift @resp;
231             $hd->unshift_read(
232             chunk => $bytes + 2,
233             sub {
234 0           my ($hd, $chunk) = @_;
235 0           warn "got '$chunk'\n" if DEBUG;
236 0           my $yaml = $YAML_LOAD->($chunk);
237 0           delete $self->{_condvar}{$cv};
238 0 0         $yaml = AnyEvent::Beanstalk::Stats->new($yaml) if $command =~ /^stats/;
239 0           $cv->send($yaml,$result);
240             }
241 0           );
242             }
243             elsif($resp =~ /^ (?: RELEASED | TOUCHED ) $/x) {
244 0           my ($id, $pri, $delay) = @argv;
245 0           delete $self->{_condvar}{$cv};
246 0 0         my $job = AnyEvent::Beanstalk::Job->new(
247             id => $id,
248             client => $self,
249             ( $command eq 'release'
250             ? (
251             priority => $pri,
252             delay => $delay,
253             )
254             : ()
255             )
256             );
257 0           $cv->send($job, $result);
258             }
259             elsif($resp =~ /^ (?: USING | WATCHING ) $/x) {
260 0           delete $self->{_condvar}{$cv};
261 0           my $retval = shift @resp;
262 0           $cv->send($retval, $result);
263             }
264             else {
265 0           delete $self->{_condvar}{$cv};
266 0           $cv->send(1, $result);
267             }
268             }
269 0           );
270              
271 0           return $cv;
272 0           };
273              
274 0 0         for my $queue (@{$self->{_connect_queue} || []}) {
  0            
275 0           $self->{_cmd_cb}->(@$queue);
276             }
277 0           delete $self->{_connect_queue};
278 0           };
279              
280 0           return $cv;
281             }
282              
283              
284             sub watch_only {
285 0     0 1   my $self = shift;
286 0 0 0       my $cb = pop if @_ and ref($_[-1]) eq 'CODE';
287 0           my $cv = AE::cv;
288 0           $self->{_condvar}{$cv} = $cv;
289              
290             $cv->cb(
291             sub {
292 0     0     my $cv = shift;
293 0           my @res = $cv->recv;
294 0           $cb->(@res);
295             }
296 0 0         ) if $cb;
297              
298 0 0         unless (@_) {
299 0           delete $self->{_condvar}{$cv};
300 0           $cv->send(undef, 'NOT_IGNORED');
301 0           return $cv;
302             }
303              
304 0           my %tubes = map { ($_ => 1) } @_;
  0            
305             my $done = sub {
306 0     0     delete $self->{_condvar}{$cv};
307 0           $cv->send(@_);
308 0           };
309             $self->list_tubes_watched(
310             sub {
311 0     0     my ($tubes,$r) = @_;
312 0 0 0       return $done->(@_) unless $r and $r =~ /^OK\b/;
313 0           my $w = $self->{__watching} = {};
314 0           foreach my $t (@$tubes) {
315 0 0         $tubes{$t} = 0 unless delete $tubes{$t};
316 0           $w->{$t}++;
317             }
318 0 0         unless (keys %tubes) { # nothing to do
319 0           my $ts = scalar @$tubes;
320 0           $done->($ts, "WATCHING $ts");
321             }
322 0           my @err; # first error
323 0           foreach my $t (sort { $tubes{$b} <=> $tubes{$a} } keys %tubes) {
  0            
324 0 0         my $cmd = $tubes{$t} ? 'watch' : 'ignore';
325             $self->run_cmd(
326             $cmd, $t,
327             sub {
328 0 0 0       if ($_[1] and $_[1] =~ /^WATCHING\b/) {
329 0 0         $tubes{$t} ? $w->{$t}++ : delete $w->{$t};
330             } else {
331 0 0         @err = @_ unless @err;
332             }
333 0           delete $tubes{$t};
334 0 0         return $done->(@err ? @err : @_)
    0          
335             unless keys %tubes;
336             }
337 0           );
338             }
339             }
340 0           );
341              
342 0           $cv;
343             }
344              
345              
346             sub put {
347 0     0 1   my $self = shift;
348 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
349 0   0       my $opt = shift || {};
350              
351 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
352 0 0         my $ttr = exists $opt->{ttr} ? $opt->{ttr} : $self->ttr;
353 0 0         my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
354             my $data =
355             exists $opt->{data} ? $opt->{data}
356             : exists $opt->{encode} ? $self->encoder->($opt->{encode})
357 0 0         : '';
    0          
358              
359 0   0       $pri = int($pri || 0) || 1;
360 0   0       $ttr = int($ttr || 0) || 1;
361 0   0       $delay = int($delay || 0) || 0;
362 0 0         $data = '' unless defined $data;
363              
364 0 0         utf8::encode($data) if utf8::is_utf8($data); # need bytes
365              
366 0           $self->run_cmd('put' => $pri, $delay, $ttr, length($data), $data, @cb);
367             }
368              
369              
370             sub stats {
371 0     0 1   my $self = shift;
372 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
373 0           $self->run_cmd('stats' => @cb);
374             }
375              
376              
377             sub stats_tube {
378 0     0 1   my $self = shift;
379 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
380 0           my $tube = shift;
381 0           $self->run_cmd('stats-tube' => $tube, @cb);
382             }
383              
384              
385             sub stats_job {
386 0     0 1   my $self = shift;
387 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
388 0   0       my $id = shift || 0;
389 0           $self->run_cmd('stats-job' => $id, @cb);
390             }
391              
392              
393             sub kick {
394 0     0 1   my $self = shift;
395 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
396 0   0       my $bound = shift || 1;
397 0           $self->run_cmd('kick' => $bound, @cb);
398             }
399              
400              
401             sub kick_job {
402 0     0 1   my $self = shift;
403 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
404 0   0       my $id = shift || 0;
405 0           $self->run_cmd('kick-job' => $id, @cb);
406             }
407              
408             sub use {
409 0     0 1   my $self = shift;
410 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
411 0           my $tube = shift;
412             $self->run_cmd(
413             'use' => $tube,
414             sub {
415 0 0 0 0     $self->{__using} = $_[0] if @_ and $_[1] =~ /^USING\b/;
416 0 0         $cb[0]->(@_) if @cb;
417             }
418 0           );
419             }
420              
421              
422             sub reserve {
423 0     0 1   my $self = shift;
424 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
425 0           my $timeout = shift;
426              
427 0 0         my @cmd = defined($timeout) ? ('reserve-with-timeout' => $timeout) : "reserve";
428 0           $self->run_cmd(@cmd, @cb);
429             }
430              
431              
432             sub delete {
433 0     0 1   my $self = shift;
434 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
435 0   0       my $id = shift || 0;
436 0           $self->run_cmd('delete' => $id, @cb);
437             }
438              
439              
440             sub touch {
441 0     0 1   my $self = shift;
442 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
443 0   0       my $id = shift || 0;
444 0           $self->run_cmd('touch' => $id, @cb);
445             }
446              
447              
448             sub release {
449 0     0 1   my $self = shift;
450 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
451 0   0       my $id = shift || 0;
452 0   0       my $opt = shift || {};
453              
454 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
455 0 0         my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
456              
457 0           $self->run_cmd('release' => $id, $pri, $delay, @cb);
458             }
459              
460              
461             sub bury {
462 0     0 1   my $self = shift;
463 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
464 0           my $id = shift;
465 0   0       my $opt = shift || {};
466              
467 0 0         my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
468              
469 0           $self->run_cmd('bury' => $id, $pri, @cb);
470             }
471              
472              
473             sub watch {
474 0     0 1   my $self = shift;
475 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
476 0           my $tube = shift;
477              
478             $self->run_cmd(
479             'watch' => $tube,
480             sub {
481 0 0 0 0     $self->{__watching}{$tube} = 1 if @_ and $_[1] =~ /^WATCHING\b/;
482 0 0         $cb[0]->(@_) if @cb;
483             }
484 0           );
485             }
486              
487              
488             sub ignore {
489 0     0 1   my $self = shift;
490 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
491 0           my $tube = shift;
492              
493             $self->run_cmd(
494             'ignore' => $tube,
495             sub {
496 0 0 0 0     delete $self->{__watching}{$tube} if @_ and $_[1] =~ /^WATCHING\b/;
497 0 0         $cb[0]->(@_) if @cb;
498             }
499 0           );
500             }
501              
502              
503             sub peek {
504 0     0 1   my $self = shift;
505 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
506 0           my $id = shift;
507              
508 0           $self->run_cmd('peek' => $id, @cb);
509             }
510              
511              
512             sub peek_ready {
513 0     0 1   my $self = shift;
514 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
515 0           my $id = shift;
516              
517 0           $self->run_cmd('peek-ready' => @cb);
518             }
519              
520              
521             sub peek_delayed {
522 0     0 1   my $self = shift;
523 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
524 0           my $id = shift;
525              
526 0           $self->run_cmd('peek-delayed' => @cb);
527             }
528              
529              
530             sub peek_buried {
531 0     0 1   my $self = shift;
532 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
533 0           my $id = shift;
534              
535 0           $self->run_cmd('peek-buried' => @cb);
536             }
537              
538              
539             sub list_tubes {
540 0     0 1   my $self = shift;
541 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
542 0           $self->run_cmd('list-tubes' => @cb);
543             }
544              
545              
546             sub list_tube_used {
547 0     0 1   my $self = shift;
548 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
549 0           $self->run_cmd('list-tube-used' => @cb);
550             }
551              
552              
553             sub list_tubes_watched {
554 0     0 1   my $self = shift;
555 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
556 0           $self->run_cmd('list-tubes-watched' => @cb);
557             }
558              
559              
560             sub pause_tube {
561 0     0 1   my $self = shift;
562 0 0 0       my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
563 0           my $tube = shift;
564 0   0       my $delay = shift || 0;
565 0           $self->run_cmd('pause-tube' => $tube, $delay, @cb);
566             }
567              
568              
569             sub watching {
570 0     0 0   my $self = shift;
571 0 0         return unless $self->{_sock};
572 0 0         my $watching = $self->{__watching} or return;
573 0           return keys %$watching;
574             }
575              
576              
577             sub using {
578 0     0 0   my $self = shift;
579 0           return $self->{__using};
580             }
581              
582              
583             sub sync {
584 0     0 1   my $self = shift;
585              
586 0   0       while ($self->{_condvar} and my ($cv) = values %{$self->{_condvar}}) {
  0            
587 0           $cv->recv;
588             }
589             }
590              
591             1;
592             __END__