| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package Deeme::Worker; | 
| 2 | 1 |  |  | 1 |  | 421 | use Deeme::Obj "Deeme"; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 5 |  | 
| 3 | 1 |  | 50 | 1 |  | 6 | use constant DEBUG => $ENV{DEEME_DEBUG} || 0; | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 556 |  | 
| 4 |  |  |  |  |  |  |  | 
| 5 |  |  |  |  |  |  | sub dequeue_event { | 
| 6 | 1 |  |  | 1 | 1 | 3 | my ( $self, $name ) = ( shift, shift ); | 
| 7 | 1 | 50 |  |  |  | 26 | if ( my $s = $self->backend->events_get($name) ) { | 
| 8 | 1 |  |  |  |  | 1 | warn | 
| 9 |  |  |  |  |  |  | "Worker -- dequeue $name in @{[blessed $self]} (@{[scalar @$s]})\n" | 
| 10 |  |  |  |  |  |  | if DEBUG; | 
| 11 | 1 |  |  |  |  | 24 | my @onces = $self->backend->events_onces($name); | 
| 12 | 1 |  |  |  |  | 3 | my $i     = 0; | 
| 13 | 1 |  |  |  |  | 2 | for my $cb (@$s) { | 
| 14 | 2 | 50 | 50 |  |  | 13 | ( $onces[$i] == 1 ) | 
| 15 |  |  |  |  |  |  | ? ( splice( @onces, $i, 1 ) | 
| 16 |  |  |  |  |  |  | and $self->_unsubscribe_index( $name => $i ) ) | 
| 17 |  |  |  |  |  |  | : $i++; | 
| 18 | 2 |  |  |  |  | 8 | push( | 
| 19 | 2 |  |  |  |  | 5 | @{ $self->{'queue'} }, | 
| 20 |  |  |  |  |  |  | Deeme::Job->new( | 
| 21 |  |  |  |  |  |  | deeme => $self, | 
| 22 |  |  |  |  |  |  | cb    => $cb | 
| 23 |  |  |  |  |  |  | ) | 
| 24 |  |  |  |  |  |  | ); | 
| 25 |  |  |  |  |  |  | } | 
| 26 |  |  |  |  |  |  | } | 
| 27 | 1 |  |  |  |  | 3 | return @{ $self->{'queue'} }; | 
|  | 1 |  |  |  |  | 5 |  | 
| 28 |  |  |  |  |  |  | } | 
| 29 |  |  |  |  |  |  |  | 
| 30 |  |  |  |  |  |  | sub dequeue { | 
| 31 | 10 |  |  | 10 | 1 | 9927 | my $self = shift; | 
| 32 | 10 |  |  |  |  | 10 | my $name = shift; | 
| 33 | 10 | 100 |  |  |  | 210 | if ( my $s = $self->backend->events_get($name) ) { | 
| 34 | 5 |  |  |  |  | 6 | warn | 
| 35 |  |  |  |  |  |  | "Worker -- dequeue $name in @{[blessed $self]} safely (@{[scalar @$s]})\n" | 
| 36 |  |  |  |  |  |  | if DEBUG; | 
| 37 | 5 |  |  |  |  | 21 | my $cb = Deeme::Job->new( | 
| 38 |  |  |  |  |  |  | deeme => $self, | 
| 39 |  |  |  |  |  |  | cb    => @$s[0] | 
| 40 |  |  |  |  |  |  | ); | 
| 41 | 5 |  |  |  |  | 19 | $self->_unsubscribe_index( $name, 0 ); | 
| 42 | 5 |  |  |  |  | 8 | push( @{ $self->{'queue'} }, $cb ); | 
|  | 5 |  |  |  |  | 12 |  | 
| 43 |  |  |  |  |  |  | } | 
| 44 | 10 |  |  |  |  | 13 | return @{ $self->{'queue'} }[0]; | 
|  | 10 |  |  |  |  | 53 |  | 
| 45 |  |  |  |  |  |  | } | 
| 46 |  |  |  |  |  |  |  | 
| 47 |  |  |  |  |  |  | sub process { | 
| 48 | 3 |  |  | 3 | 1 | 14 | my $self = shift; | 
| 49 | 3 |  |  |  |  | 9 | return @{ $self->{'queue'} } > 0 | 
|  | 3 |  |  |  |  | 11 |  | 
| 50 | 3 | 50 |  |  |  | 3 | ? @{ $self->{'queue'} }[0]->process(@_) | 
| 51 |  |  |  |  |  |  | : undef; | 
| 52 |  |  |  |  |  |  | } | 
| 53 |  |  |  |  |  |  |  | 
| 54 |  |  |  |  |  |  | sub process_all { | 
| 55 | 1 |  |  | 1 | 1 | 3 | my $self = shift; | 
| 56 | 1 |  |  |  |  | 2 | my @args = @_; | 
| 57 | 1 |  |  |  |  | 2 | my @returns; | 
| 58 | 1 |  |  |  |  | 3 | while ( my $job = shift @{ $self->{'queue'} } ) { | 
|  | 3 |  |  |  |  | 11 |  | 
| 59 | 2 |  |  |  |  | 7 | push( @returns, $job->process(@args) ); | 
| 60 |  |  |  |  |  |  | } | 
| 61 | 1 |  |  |  |  | 3 | return @returns; | 
| 62 |  |  |  |  |  |  | } | 
| 63 |  |  |  |  |  |  |  | 
| 64 | 7 |  |  | 7 | 1 | 440 | sub add { return shift->once(@_) } | 
| 65 |  |  |  |  |  |  | 1; | 
| 66 |  |  |  |  |  |  | __END__ |