File Coverage

blib/lib/Net/Async/Beanstalk/Send.pm
Criterion Covered Total %
statement 18 53 33.9
branch 0 20 0.0
condition 0 18 0.0
subroutine 6 14 42.8
pod 0 3 0.0
total 24 108 22.2


line stmt bran cond sub pod time code
1             package Net::Async::Beanstalk::Send;
2              
3             our $VERSION = '0.001';
4             $VERSION = eval $VERSION;
5              
6             =head1 NAME
7              
8             Net::Async::Beanstalk::Send - Methods to send commands to beanstalk
9              
10             =head1 DOCUMENTED ELSEWHERE
11              
12             This module's external API is documented in L<Net::Async::Beanstalk>.
13              
14             =cut
15              
16 3     3   1863 use Moo::Role;
  3         6  
  3         21  
17 3     3   949 use strictures 2;
  3         31  
  3         118  
18              
19 3     3   588 use Carp;
  3         5  
  3         214  
20 3     3   18 use Net::Async::Beanstalk::Constant qw(:send :state);
  3         6  
  3         526  
21 3     3   23 use namespace::clean;
  3         5  
  3         29  
22              
23             # TODO: Document internal API
24              
# Serialise the command returned by current_command and write it out.
#
# Does nothing when count_commands reports fewer than one pending command.
# Otherwise the command's words (everything from index STATE_COMMAND to the
# end of the command record) are joined with single spaces, terminated with
# $NL, remembered in the record's STATE_SEND slot and handed to write(). The
# future returned by write() is passed to adopt_future.
sub _next {
    my ($self) = @_;

    $self->count_commands >= 1
        or return;

    my $record = $self->current_command;
    my @words  = @{$record}[ STATE_COMMAND .. $#{$record} ];
    my $wire   = join(' ', @words) . $NL;

    # Keep the exact text that goes out alongside the command it belongs to.
    $record->[STATE_SEND] = $wire;

    $self->adopt_future($self->write($wire));
}
32              
33             # TODO: Warn if reserves are pending
34             # TODO: Don't send if not connected
# Queue a command (its name followed by its arguments) for transmission.
#
# Croaks with "Invalid command" unless the name is a key of %COMMAND. A new
# future is created from the loop and stored at the front of the command
# record; when that future becomes ready, _next is invoked. The record is
# padded with undef up to index STATE_COMMAND, where the command words
# begin. If this is the only queued command it is sent straight away.
#
# Returns the future in scalar or list context. In void context ->get is
# called on the future instead.
sub _send {
    my ($self, @request) = @_;
    my $verb = $request[0];

    exists $COMMAND{$verb}
        or croak "Invalid command";

    my $future = $self->loop->new_future;
    $future->on_ready(sub { $self->_next });

    my @padding = (undef) x (STATE_COMMAND - 1);
    $self->_push_command([ $future, @padding, @request ]);

    # Nothing else is in flight, so start transmitting immediately.
    if ($self->count_commands == 1) {
        $self->_next;
    }

    return $future if defined wantarray;
    return $future->get;
}
45              
# Wrap the decoder attribute so that data is UTF-8 decoded first.
#
# The returned code reference stringifies its first argument, decodes that
# copy in place with utf8::decode and then either returns the decoded text
# as it is (when a true $no_decode was given to the accessor) or passes it
# on to the wrapped decoder.
around decoder => sub {
    my ($orig, $self, $no_decode) = @_;
    my $wrapped = $self->$orig();

    return sub {
        my $text = '' . $_[0];
        utf8::decode($text);

        return $text if $no_decode;
        return $wrapped->($text);
    };
};
57              
# Wrap the encoder attribute so that the result is always a byte string.
#
# Plain (non-reference) values and objects which overload stringification
# are stringified directly; any other reference is handed, together with
# the remaining arguments, to the wrapped encoder. The resulting string is
# UTF-8 encoded when it carries Perl's internal UTF-8 flag.
around encoder => sub {
    my ($orig, $self) = @_;
    my $wrapped = $self->$orig();

    return sub {
        my $value;
        if (!ref($_[0]) || overload::Method($_[0], '""')) {
            $value = $_[0];
        }
        else {
            $value = $wrapped->(@_);
        }

        my $bytes = '' . $value;
        if (utf8::is_utf8($bytes)) {
            utf8::encode($bytes);
        }
        return $bytes;
    };
};
70              
# Queue a "put" command carrying a job.
#
# Called as $self->put($data, %options) or $self->put(%options). An odd
# number of arguments means the first one is the job data; it is passed
# through the encoder.
#
# Options:
#   priority - defaults to $self->default_priority when absent or undef
#   delay    - defaults to $self->default_delay when absent or undef
#   ttr      - defaults to $self->default_ttr when absent or false
#   raw_data - job body used exactly as given (not passed to the encoder)
#   data     - job body passed through the encoder
#
# The three numeric arguments are truncated with int() before being sent.
# The job is sent as its length, $NL and then the job body itself.
#
# Croaks with 'Too much job data' when more than one source of job data is
# supplied, and with 'Unknown options to "put": ...' when unrecognised
# option names remain. Returns whatever _send returns.
sub put {
    my $self = shift;

    # A leading positional argument is the (not yet encoded) job body.
    my $job;
    $job = $self->encoder->(shift) if @_ % 2;
    my %opt = @_;

    my @args = (
        # Defined-or, not ||: an explicit 0 is a legitimate priority (the
        # most urgent one) and a legitimate delay, so it must not be
        # silently replaced by the configured default.
        delete($opt{priority}) // $self->default_priority,
        delete($opt{delay})    // $self->default_delay,
        # ttr keeps ||: the beanstalk protocol does not honour a ttr of 0
        # (the server raises it to 1), so a false value continues to select
        # the configured default exactly as before.
        delete($opt{ttr})      || $self->default_ttr,
    );

    $job //= delete $opt{raw_data} // $self->encoder->(delete $opt{data});

    # Whatever job-data key is still present was supplied in addition to the
    # one that has just been used.
    croak 'Too much job data' if exists $opt{raw_data} or exists $opt{data};
    croak 'Unknown options to "put": ' . join ', ', sort keys %opt if scalar keys %opt;

    $self->_send(put => (map int, @args), length($job) . $NL . $job);
}
86              
87             # TODO: raw/decode on other job-fetching commands
# Queue a "reserve" command, or "reserve-with-timeout" when a defined
# timeout option is given.
#
# Options:
#   timeout - when defined, sent as the argument of reserve-with-timeout
#   raw     - when true, return the result of _send untouched
#   asis    - handed to the decoder accessor as its "no decode" flag
#
# Without "raw" the future from _send is chained: on success the second
# result value is run through the decoder and the other values are passed
# along unchanged. Croaks with 'Unknown options to "reserve": ...' when any
# other option name is supplied.
sub reserve {
    my ($self, %opt) = @_;
    my ($no_decode, $as_raw, $timeout) = delete @opt{qw(asis raw timeout)};

    # TODO: Warn about illogical combinations?
    if (my @unknown = sort keys %opt) {
        croak 'Unknown options to "reserve": ' . join ', ', @unknown;
    }

    my @command = ('reserve');
    @command = ('reserve-with-timeout', $timeout) if defined $timeout;

    return $self->_send(@command) if $as_raw;

    my $reserved = $self->_send(@command);
    return $reserved->then(sub {
        my ($first, $payload, @rest) = @_;
        Future->done($first, $self->decoder($no_decode)->($payload), @rest);
    });
}
103              
# Convenience form of reserve: the first argument is used as the "timeout"
# option and any further arguments are passed on as additional options.
sub reserve_with_timeout {
    my ($self, $timeout, @options) = @_;
    return $self->reserve(timeout => $timeout, @options);
}
105              
106             # TODO: Make void context useful
# Install a method for every key of %COMMAND which does not have one yet.
#
# The method name is the command name with each "-" turned into "_". A
# name this package can already resolve (such as the hand-written subs in
# this file) is left alone. Each generated method passes the command name
# and its own arguments to _send.
COMMAND:
foreach my $wire_name (keys %COMMAND) {
    (my $method = $wire_name) =~ tr/-/_/;
    next COMMAND if __PACKAGE__->can($method);

    # Symbolic glob assignment is what installs the sub under its name.
    no strict 'refs';
    *{$method} = sub {
        my ($self, @args) = @_;
        $self->_send($wire_name => @args);
    };
}
113              
114             1;