File Coverage

blib/lib/Net/Async/Beanstalk/Receive.pm
Criterion Covered Total %
statement 26 59 44.0
branch 0 12 0.0
condition 0 4 0.0
subroutine 9 21 42.8
pod n/a
total 35 96 36.4


line stmt bran cond sub pod time code
1             package Net::Async::Beanstalk::Receive;
2              
3             our $VERSION = '0.001';
4             $VERSION = eval $VERSION;
5              
6             =head1 NAME
7              
8             Net::Async::Beanstalk::Receive - Methods to handle responses from beanstalk
9              
10             =head1 DOCUMENTED ELSEWHERE
11              
12             This module's external API is documented in L<Net::Async::Beanstalk>
13              
14             =cut
15              
16 3     3   1777 use v5.10;
  3         10  
17 3     3   12 use Moo::Role;
  3         6  
  3         21  
18 3     3   914 use strictures 2;
  3         24  
  3         126  
19              
20 3     3   678 use Carp;
  3         7  
  3         235  
21 3     3   20 use List::Util qw(any);
  3         6  
  3         182  
22 3     3   16 use MooX::EventHandler;
  3         5  
  3         25  
23 3     3   394 use Net::Async::Beanstalk::Constant qw(:receive :state);
  3         7  
  3         584  
24 3     3   32 use YAML::Any qw(Load);
  3         5  
  3         26  
25 3     3   2870 use namespace::clean;
  3         8  
  3         26  
26              
27             # TODO: Document internal API
28              
29             has_event [
               # Declares one event per name in this list: the raw read event, the
               # unrecognised-response event, a lower-cased "on_<name>" for every key
               # of %RESPONSE, and a lower-cased "on_<name>_final" for every entry of
               # @WITHDATA. The matching _build_on_* subs below supply the handlers
               # (presumably looked up by MooX::EventHandler - confirm against its docs).
30             'on_read',
31             'on_invalid_response',
32             (map { lc "on_$_" } keys %RESPONSE),
33             (map { lc "on_${_}_final" } @WITHDATA),
34             ];
35              
36             sub _build_on_read {
               # Builds the on_read handler. The handler receives ($self, $buf, $eof)
               # where $buf is a reference to the read buffer; whatever it consumes is
               # removed from that buffer in place. It returns 0 when the buffer does
               # not yet hold a complete response line (or data chunk) and 1 after a
               # response has been dispatched. It dies with "Protocol error" when the
               # response word is not a key of %RESPONSE.
37             sub {
38             my $self = shift;
39             my ($buf, $eof) = @_;
40             my $data;
41              
42             my $state = $self->current_command;
43             if (ref $state and $state->[STATE_MOAR]) {
               # A previous response announced a data chunk: STATE_MOAR holds the
               # original response line and the chunk size in characters.
44             my ($had, $size) = @{ $state->[STATE_MOAR] };
45             return 0 unless $$buf =~ s/^(.{$size})$NL//s;
46             push @{ $state->[STATE_RECEIVE] }, $data = $1;
               # Re-dispatch on the remembered response line, now with its data.
47             $buf = $had;
48              
49             } else {
50             # prot.c defines NAME_CHARS as alphanum & -+/;.$_()
51             return 0 unless $$buf =~ s{^([A-Za-z0-9+/;.\$_() -]+)$NL}{};
52             push @{ $state->[STATE_RECEIVE] }, $buf = $1;
53             }
54              
               # From here on $buf is a plain string: "<RESPONSE> <arg> <arg> ...".
55             my ($got, @args) = split / /, $buf;
56              
57             if (not $RESPONSE{$got}) {
58             $self->maybe_invoke_event(on_invalid_response => $got, $buf);
59             # Can't continue; state's all screwey
60             die "Protocol error";
61             } else {
62             $self->_assert_state($got);
               # Test definedness, not truth: a data chunk that is empty or the
               # single character "0" is still a received chunk and must go to the
               # *_final event rather than back to the announcing event.
63             if (defined $data) {
64             $self->maybe_invoke_event(lc "on_${got}_final", @args, $data);
65             } else {
66             $self->maybe_invoke_event(lc "on_$got", @args);
67             }
68             }
69              
70             return 1;
71             }
72             }
73              
74             # Events which are handled in unusual ways
75              
76             # OK returns some YAML data
               # Builds the handler for the data chunk of an OK response. The handler
               # takes ($self, $data), decodes $data with YAML::Any's Load and finishes
               # the current command with the decoded structure flattened into a list:
               # the elements of an array, otherwise the key/value pairs of a hash.
               # NOTE(review): a decoded value that is neither an array nor a hash
               # reference would fail at the %$decoded dereference.
77             sub _build_on_ok_final {
78             sub {
79             # TODO: Alert when the result of list-tubes-watching or
80             # whatever-finds-out-used-tube are called and differ from values
81             # cached in using/watching
82             my $self = shift;
83             my ($data) = @_;
84             my $decoded = Load($data);
85             my @decoded = ref $decoded eq 'ARRAY' ? @$decoded : %$decoded;
86             # Assert \@decoded {=} $self->watching if $source eq 'list-tubes-watched' ?
87             $self->finish_command(@decoded);
88             }
89             }
90              
91             # using and watching events change or assert which tubes are active
               # Builds the USING handler. The handler takes ($self, $tube). For a
               # 'list-tube-used' command it just finishes the command with the tube
               # name; for a 'use' command it first records the tube via _set_using.
               # NOTE(review): for any other current command the handler does nothing
               # and the command is not finished here.
92             sub _build_on_using {
93             sub {
94             my $self = shift;
95             my $state = $self->current_command;
96             my ($tube) = @_;
97             if ($state->[STATE_COMMAND] eq 'list-tube-used') {
98             # Assert $tube eq $self->using ?
99             $self->finish_command($tube);
100             } elsif ($state->[STATE_COMMAND] eq 'use') {
101             $self->_set_using($tube);
102             $self->finish_command($tube);
103             }
104             }
105             }
106              
107             sub _build_on_watching {
                # Builds the WATCHING handler. The handler takes ($self, $count); the
                # tube name is read from the current command's STATE_DATUM slot. An
                # 'ignore' command removes the tube from the _watching hash, a 'watch'
                # command adds it; both then finish the command with ($tube, $count).
                # NOTE(review): any other current command is left unfinished here.
108             sub {
109             my $self = shift;
110             my $state = $self->current_command;
111             my $tube = $state->[STATE_DATUM];
112             my ($count) = @_;
113             if ($state->[STATE_COMMAND] eq 'ignore') {
114             delete $self->_watching->{$tube};
115             $self->finish_command($tube, $count);
116             } elsif ($state->[STATE_COMMAND] eq 'watch') {
117             $self->_watching->{$tube} = 1;
118             $self->finish_command($tube, $count);
119             }
120             }
121             }
122              
123             # Repetetive responses; First define some magic ...
124              
125             # ... with 1 way to wait,
                # _makesub_wantmoar($command) returns a handler for a response that
                # announces a following data chunk. The handler takes its last argument
                # as the chunk size and stores [ "<$command> <remaining args>", $size ] in
                # the current command's STATE_MOAR slot, which is what the on_read
                # handler checks before reading the chunk. The callers in this file pass
                # the response word ('FOUND', 'OK', 'RESERVED') as $command.
126             sub _makesub_wantmoar {
127 0     0     my ($command) = @_;
128             sub {
129 0     0     my $self = shift;
130 0           my $size = pop;
131 0           $self->current_command->[STATE_MOAR] = [join(' ', $command, @_), $size];
132 0           };
133             }
134              
135             # ... 3 ways to fail,
                # _makesub_fail([$how,] $category, $message [, $start]) returns a handler
                # that reports a failure. $how is a bit mask of ERROR_FAIL and
                # ERROR_EVENT and is only taken from the argument list when four or more
                # arguments are given; otherwise it defaults to ERROR_FAIL. With
                # ERROR_FAIL set the handler calls fail_command, with ERROR_EVENT set it
                # calls invoke_error. $message may contain the literal placeholder
                # "$datum", which is replaced for fail_command by the handler's first
                # argument, else the command's STATE_DATUM, else ''.
136              
137             sub _makesub_fail {
138 0 0   0     my $how = @_ >= 4 ? shift : ERROR_FAIL;
139 0           my ($category, $message, $start) = @_; # backwards
140             sub {
141 0     0     my $state = $_[0]->current_command;
                # $start is undef for the two-argument callers; comparing undef with ==
                # raises an "uninitialized" warning, which strictures makes fatal.
142 0 0         $state = $_[0]->_command_stack if defined($start) && $start == 0;
                # NOTE(review): this shifts exactly one element off @$state whatever
                # $start is, and @nothing is never read - confirm the intent.
143 0   0       my @nothing = (shift @$state) x ($start // STATE_DATUM);
144 0   0       my $datum = $_[1] || $state->[STATE_DATUM] || '';
                # The messages are prose, not Perl source, so a string eval of them
                # cannot compile and yields undef; substitute the placeholder instead.
145 0 0         $how & ERROR_FAIL && $_[0]->fail_command($message =~ s/\$datum/$datum/gr, $category, @$state);
146 0 0         $how & ERROR_EVENT && $_[0]->invoke_error( $message, $category, @$state);
147             }
148 0           }
149 0     0     sub _makesub_error { _makesub_fail(ERROR_EVENT, @_) } # ($category, $message, $start): raise the error event only, via _makesub_fail
150 0     0     sub _makesub_hard { _makesub_fail(ERROR_EVENT | ERROR_FAIL, @_) } # ($category, $message, $start): raise the error event and fail the command, via _makesub_fail
151              
152             # ... 2 ways to be done with some data,
                # _makesub_done returns a handler which finishes the current command
                # with every argument the handler received after the object itself.
153 0     0     sub _makesub_done { sub { $_[0]->finish_command(@_[1..$#_]) } }
  0     0      
                # _makesub_done_datum returns a handler which ignores its arguments and
                # finishes the current command with that command's STATE_DATUM value.
154 0     0     sub _makesub_done_datum { sub { $_[0]->finish_command($_[0]->current_command->[STATE_DATUM]) } }
  0     0      
155              
156             # ... and 1 way to do some combination,
                # _makesub_multi(%respond) returns a handler for a response word that can
                # answer several commands. %respond maps a command name to an array
                # reference whose first element is an action ('finish', 'fail' or
                # 'finish_with') followed by that action's arguments. The handler looks
                # up the current command's STATE_COMMAND in %respond and calls the
                # selected method on $self.
                # NOTE(review): in the 'finish_with' branch the method name is 'finish'
                # (no "_command" suffix) and the arguments are wrapped in one array
                # reference, unlike the other branch - confirm against finish_command.
                # NOTE(review): a 'finish_with' entry without an index list (element 1)
                # dereferences an undefined value - confirm this is handled.
                # NOTE(review): a command with no entry in %respond is not handled.
157             sub _makesub_multi {
158 0     0     my %respond = @_;
159             sub {
160 0     0     my $self = shift;
161 0           my $state = $self->current_command;
162 0           my $datum = $_[0];
                # The loop runs once and only serves to alias $_ to the command name.
163 0           for ($state->[STATE_COMMAND]) {
164             my ($sub, @args) = $respond{$_}[0] eq 'finish_with'
165             # There's only actually two of these so a bit of hard-coding is fine
166 0           ? (finish => [ $state->[STATE_DATUM], map {$_[$_]} @{ $respond{$_}[1] } ])
  0            
167 0 0         : ("$respond{$_}[0]_command" => @{$respond{$_}}[1..$#{$respond{$_}}]);
  0            
  0            
                # NOTE(review): $_ is the command name here, not the action, so this
                # condition compares the command name with 'fail'; $args[0] is the
                # category string in the 'fail' entries visible in this file.
168 0 0         $args[0] = eval $args[0] if $_ eq 'fail';
169 0           $self->$sub(@args);
170             }
171 0           };
172             }
173              
174             # ... then sprinkle it on the protocol.
175              
176             # Lots of ways to not be found
                # Builds the NOT_FOUND handler: a _makesub_multi table with one 'fail'
                # entry per command that can receive this response. Job commands share
                # a message built from the past participle of the command verb, except
                # stats-job which has its own wording.
177             sub _build_on_not_found {
178             _makesub_multi(
179             ( map { my $verb = $_ =~ s/-job//r;
180             my $message; # Isn't English fun?
                # Inflect $verb (the command with any "-job" suffix removed), not $_,
                # so that kick-job becomes "kicked" and stats-job reaches its branch.
181             if ($verb eq 'bury') { $verb = 'buried';
182             } elsif ($verb =~ /e$/) { $verb .= 'd';
183             } elsif ($verb =~ /[kh]$/) { $verb .= 'ed';
184             } elsif ($verb eq 'stats') { $message = "Statistics were not found for the job: \$datum not found";
185             } $_ => [ fail => 'beanstalk-job' => $message // "The job could not be $verb: \$datum not found" ];
186             } qw(bury delete touch kick-job release stats-job) ),
187             peek => [ fail => 'beanstalk-peek' => "The job could not be peeked at: \$datum not found" ],
                # Each entry must start with the action name, like every other entry.
188             ( map { my $adj = $_ =~ s/peek-//r; $_=> [ fail => 'beanstalk-peek' => "The next $adj job could not be peeked at: None found" ],
189             } qw(peek-buried peek-delayed peek-ready) ),
190             'pause-tube' => [ fail => 'beanstalk-tube' => "The tube could not be paused: \$datum not found" ],
191             'stats-tube' => [ fail => 'beanstalk-tube' => "Statistics were not found for the tube: \$datum not found" ],
192             ) }
193              
194             # Two other responses from multiple commands
                # BURIED: success for a 'bury' command (finished with no extra
                # arguments), failure for 'put' and 'release'.
195             sub _build_on_buried { _makesub_multi(
196             bury => ['finish' ],
197             put => [ fail => 'beanstalk-put' => "Job was inserted but buried (out of memory): ID \$datum" ],
198             release => [ fail => 'beanstalk-job' => "The job could not be released (out of memory): ID \$datum" ],
199             ) }
200             sub _build_on_kicked { _makesub_multi(
                # KICKED answers both 'kick' and 'kick-job'. The 'kick' entry lists
                # handler-argument index 0 to pass along with the command's datum.
                # NOTE(review): the 'kick-job' entry gives no index list - confirm
                # _makesub_multi copes with the missing second element.
201             kick => [ finish_with => [0] ],
202             'kick-job' => ['finish_with' ],
203             ) }
204              
205             # 4 hard errors
                # Each builder passes (category, message, start) to its factory, where
                # start is a STATE_* index or 0. Two are built with _makesub_hard and two
                # with _makesub_error.
206             # This module sent something badly-formed
207             sub _build_on_bad_format { _makesub_hard ('beanstalk-internal' => "Protocol error: Bad format", STATE_SEND) }
208             # The server sent something unexpected.
209             sub _build_on_invalid_response{ _makesub_error('beanstalk-server' => "Protocol error: Unknown response", STATE_RECEIVE) }
210             # This should never happen; perhaps a version mismatch?
211             sub _build_on_unknown_command { _makesub_hard ('beanstalk-internal' => "Protocol error: Unknown command", STATE_COMMAND) }
212             # Something broke
213             sub _build_on_internal_error { _makesub_error('beanstalk-server' => "Protocol error: Internal error", 0) }
214              
215             # Everything else is boring
                # One builder per remaining response, each returning a handler from one
                # of the factories above: _makesub_fail (category => message) for
                # failures, _makesub_done / _makesub_done_datum for successes and
                # _makesub_wantmoar for responses that announce a data chunk.
                # NOTE(review): on_found_final uses _makesub_done_datum, which ignores the
                # handler's arguments, whereas on_reserved_final uses _makesub_done -
                # confirm the received data is meant to be dropped for FOUND.
216              
217             sub _build_on_deadline_soon { _makesub_fail ('beanstalk-reserve' => "No job was reserved: Deadline soon") }
218             sub _build_on_deleted { _makesub_done_datum() }
219             sub _build_on_draining { _makesub_fail ('beanstalk-put' => "Job was not inserted: Server is draining") }
220             sub _build_on_expected_crlf { _makesub_fail ('beanstalk-put' => "Protocol error: Expected cr+lf") }
221             sub _build_on_found { _makesub_wantmoar('FOUND') }
222             sub _build_on_found_final { _makesub_done_datum() }
223             sub _build_on_inserted { _makesub_done() }
224             sub _build_on_job_too_big { _makesub_fail ('beanstalk-put' => "Invalid job: too big") }
225             sub _build_on_not_ignored { _makesub_fail ('beanstalk-tube' => "The last tube cannot be ignored: \$datum") }
226             sub _build_on_ok { _makesub_wantmoar('OK') }
227             sub _build_on_out_of_memory { _makesub_fail ('beanstalk-server' => "Protocol error: Out of memory") }
228             sub _build_on_paused { _makesub_done_datum() }
229             sub _build_on_released { _makesub_done_datum() }
230             sub _build_on_reserved { _makesub_wantmoar('RESERVED') }
231             sub _build_on_reserved_final { _makesub_done() }
232             sub _build_on_timed_out { _makesub_fail ('beanstalk-reserve' => "No job was reserved: Timed out") }
233             sub _build_on_touched { _makesub_done_datum() }
234              
235             1;