File Coverage

blib/lib/Deeme/Worker.pm
Criterion Covered Total %
statement 39 39 100.0
branch 5 8 62.5
condition 2 4 50.0
subroutine 7 7 100.0
pod 5 5 100.0
total 58 63 92.0


line stmt bran cond sub pod time code
1             package Deeme::Worker;
2 1     1   421 use Deeme::Obj "Deeme";
  1         2  
  1         5  
3 1   50 1   6 use constant DEBUG => $ENV{DEEME_DEBUG} || 0;
  1         1  
  1         556  
4              
5             sub dequeue_event {
6 1     1 1 3 my ( $self, $name ) = ( shift, shift );
7 1 50       26 if ( my $s = $self->backend->events_get($name) ) {
8 1         1 warn
9             "Worker -- dequeue $name in @{[blessed $self]} (@{[scalar @$s]})\n"
10             if DEBUG;
11 1         24 my @onces = $self->backend->events_onces($name);
12 1         3 my $i = 0;
13 1         2 for my $cb (@$s) {
14 2 50 50     13 ( $onces[$i] == 1 )
15             ? ( splice( @onces, $i, 1 )
16             and $self->_unsubscribe_index( $name => $i ) )
17             : $i++;
18 2         8 push(
19 2         5 @{ $self->{'queue'} },
20             Deeme::Job->new(
21             deeme => $self,
22             cb => $cb
23             )
24             );
25             }
26             }
27 1         3 return @{ $self->{'queue'} };
  1         5  
28             }
29              
30             sub dequeue {
31 10     10 1 9927 my $self = shift;
32 10         10 my $name = shift;
33 10 100       210 if ( my $s = $self->backend->events_get($name) ) {
34 5         6 warn
35             "Worker -- dequeue $name in @{[blessed $self]} safely (@{[scalar @$s]})\n"
36             if DEBUG;
37 5         21 my $cb = Deeme::Job->new(
38             deeme => $self,
39             cb => @$s[0]
40             );
41 5         19 $self->_unsubscribe_index( $name, 0 );
42 5         8 push( @{ $self->{'queue'} }, $cb );
  5         12  
43             }
44 10         13 return @{ $self->{'queue'} }[0];
  10         53  
45             }
46              
47             sub process {
48 3     3 1 14 my $self = shift;
49 3         9 return @{ $self->{'queue'} } > 0
  3         11  
50 3 50       3 ? @{ $self->{'queue'} }[0]->process(@_)
51             : undef;
52             }
53              
54             sub process_all {
55 1     1 1 3 my $self = shift;
56 1         2 my @args = @_;
57 1         2 my @returns;
58 1         3 while ( my $job = shift @{ $self->{'queue'} } ) {
  3         11  
59 2         7 push( @returns, $job->process(@args) );
60             }
61 1         3 return @returns;
62             }
63              
64 7     7 1 440 sub add { return shift->once(@_) }
65             1;
66             __END__