File Coverage

blib/lib/Job/Async/Worker/Redis.pm
Criterion Covered Total %
statement 32 136 23.5
branch 2 32 6.2
condition 0 26 0.0
subroutine 10 27 37.0
pod 7 15 46.6
total 51 236 21.6


line stmt bran cond sub pod time code
1             package Job::Async::Worker::Redis;
2              
3 3     3   162959 use strict;
  3         23  
  3         98  
4 3     3   18 use warnings;
  3         7  
  3         101  
5              
6 3     3   881 use parent qw(Job::Async::Worker);
  3         591  
  3         18  
7              
8             our $VERSION = '0.002'; # VERSION
9              
10             =head1 NAME
11              
12             Job::Async::Worker::Redis - L worker implementation for L
13              
14             =head1 SYNOPSIS
15              
16             =head1 DESCRIPTION
17              
18             =cut
19              
20 3     3   415641 use curry::weak;
  3         7  
  3         75  
21 3     3   15 use Syntax::Keyword::Try;
  3         8  
  3         20  
22              
23 3     3   1030 use Job::Async::Utils;
  3         297047  
  3         121  
24 3     3   27 use Future::Utils qw(repeat);
  3         7  
  3         188  
25 3     3   20 use Log::Any qw($log);
  3         7  
  3         29  
26              
27 3     3   1849 use Net::Async::Redis;
  3         39311  
  3         6887  
28              
29             =head2 incoming_job
30              
31             Source for jobs received from the C<< BRPOP(LPUSH) >> queue wait.
32              
33             =cut
34              
35             sub incoming_job {
36 0     0 1 0 my ($self) = @_;
37 0   0     0 $self->{incoming_job} //= do {
38 0 0       0 die 'needs to be part of a loop' unless $self->loop;
39 0         0 my $src = $self->ryu->source;
40 0         0 $src->each($self->curry::weak::on_job_received);
41 0         0 $src
42             }
43             }
44              
45             =head2 on_job_received
46              
47             Called for each job that's received.
48              
49             =cut
50              
51             sub on_job_received {
52 0     0 1 0 my ($self, $id) = (shift, @$_);
53 0         0 my ($queue) = $self->pending_queues;
54 0         0 local @{$log->{context}}{qw(worker_id job_id queue)} = ($self->id, $id, $queue);
  0         0  
55 0         0 $log->debugf('Received job');
56 0 0       0 if(exists $self->{pending_jobs}{$id}) {
57 0         0 $log->errorf("Already have job %s", $id);
58 0         0 die 'Duplicate job ID';
59             } else {
60 0         0 undef $self->{pending_jobs}{$id};
61             }
62 0         0 my $job_count = 0 + keys %{$self->{pending_jobs}};
  0         0  
63 0         0 $log->tracef("Current job count is %d", $job_count);
64 0         0 $self->trigger;
65             $self->redis->hgetall('job::' . $id)->then(sub {
66 0     0   0 my ($items) = @_;
67             Future::Utils::call {
68 0         0 local @{$log->{context}}{qw(worker_id job_id queue)} = ($self->id, $id, $queue);
  0         0  
69 0         0 my %data = @$items;
70 0         0 $self->{pending_jobs}{$id} = my $job = Job::Async::Job->new(
71             data => Job::Async::Job->structured_data(\%data),
72             id => $id,
73             future => my $f = $self->loop->new_future,
74             );
75 0         0 $log->tracef('Job content is %s', { %$job });
76             $f->on_done(sub {
77 0         0 my ($rslt) = @_;
78 0         0 local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
  0         0  
79 0         0 $log->tracef("Result was %s", $rslt);
80             my $code = sub {
81 0         0 my $tx = shift;
82 0         0 local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
  0         0  
83             try {
84             delete $self->{pending_jobs}{$id};
85             $log->tracef('Removing job from processing queue');
86             return Future->needs_all(
87             $tx->hmset('job::' . $id, result => "$rslt"),
88             $tx->publish('client::' . $data{_reply_to}, $id),
89             $tx->lrem($self->prefixed_queue($self->processing_queue) => 1, $id),
90             )
91 0         0 } catch {
92             $log->errorf("Failed due to %s", $@);
93             return Future->fail($@, redis => $self->id, $id);
94             }
95 0         0 };
96             (
97             $self->use_multi
98             ? $self->redis->multi($code)
99             : $code->($self->redis)
100             )->on_ready($self->curry::weak::trigger)
101 0         0 ->on_fail(sub { warn "failed? " . shift })
102 0 0       0 ->retain;
103 0         0 });
104 0         0 $f->on_ready($self->curry::weak::trigger);
105 0 0       0 if(my $timeout = $self->timeout) {
106             Future->needs_any(
107             $f,
108             $self->loop->timeout_future(after => $timeout)
109             )->on_fail(sub {
110 0         0 local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
  0         0  
111 0 0       0 $log->errorf("Timeout but already completed with %s", $f->state) if $f->is_ready;
112 0         0 $f->fail('timeout')
113 0         0 })->retain;
114             }
115 0         0 $self->jobs->emit($job);
116 0         0 $f
117             }
118 0         0 })->on_fail(sub {
119 0     0   0 $log->errorf("Unable to retrieve hash key for data: %s", join " ", @_)
120 0         0 })->retain
121             }
122              
123 0     0 0 0 sub use_multi { shift->{use_multi} }
124              
125 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
126              
127             =head2 pending_queues
128              
129             Note that L only
130             supports a single queue, and will fail if you attempt to start with multiple
131             queues defined.
132              
133             =cut
134              
135 0   0 0 1 0 sub pending_queues { @{ shift->{pending_queues} ||= [qw(pending)] } }
  0         0  
136              
137             =head2 processing_queue
138              
139             =cut
140              
141 0   0 0 1 0 sub processing_queue { shift->{processing_queue} //= 'processing' }
142              
143             =head2 start
144              
145             =cut
146              
147             sub start {
148 0     0 1 0 my ($self) = @_;
149              
150 0         0 $self->trigger;
151             }
152              
153             =head2 stop
154              
155             Requests to stop processing.
156              
157             Returns a future which will complete when all currently-processing jobs have
158             finished.
159              
160             =cut
161              
162             sub stop {
163 0     0 1 0 my ($self) = @_;
164 0   0     0 $self->{stopping_future} ||= $self->loop->new_future;
165 0         0 my $pending = 0 + keys %{$self->{pending_jobs}};
  0         0  
166 0 0 0     0 if(!$pending && $self->{awaiting_job}) {
167             # This will ->cancel a Net::Async::Redis future. Currently that's just
168             # ignored to no great effect, but it would be nice sometime to do
169             # something useful with that.
170 0         0 $self->{awaiting_job}->cancel;
171 0         0 $self->{stopping_future}->done;
172             }
173             # else, either a job is being processed, or there are pending ones.
174             # sub trigger will recheck
175 0         0 return $self->{stopping_future};
176             }
177              
178             sub queue_redis {
179 0     0 0 0 my ($self) = @_;
180 0 0       0 unless($self->{queue_redis}) {
181             $self->add_child(
182 0         0 $self->{queue_redis} = Net::Async::Redis->new(
183             uri => $self->uri,
184             )
185             );
186 0         0 $self->{queue_redis}->connect;
187             }
188 0         0 return $self->{queue_redis};
189             }
190              
191             sub redis {
192 0     0 0 0 my ($self) = @_;
193 0 0       0 unless($self->{redis}) {
194             $self->add_child(
195 0         0 $self->{redis} = Net::Async::Redis->new(
196             uri => $self->uri,
197             )
198             );
199 0         0 $self->{redis}->connect;
200             }
201 0         0 return $self->{redis};
202             }
203              
204             sub prefixed_queue {
205 0     0 0 0 my ($self, $q) = @_;
206 0 0       0 return $q unless length(my $prefix = $self->prefix);
207 0         0 return join '::', $self->prefix, $q;
208             }
209              
210             sub trigger {
211 0     0 0 0 my ($self) = @_;
212 0         0 local @{$log->{context}}{qw(worker_id queue)} = ($self->id, my ($queue) = $self->pending_queues);
  0         0  
213 0         0 my $pending = 0 + keys %{$self->{pending_jobs}};
  0         0  
214 0         0 $log->tracef('Trigger called with %d pending tasks, %d max', $pending, $self->max_concurrent_jobs);
215 0 0       0 return if $pending >= $self->max_concurrent_jobs;
216 0 0 0     0 if(!$pending and $self->{stopping_future}) {
217 0         0 $self->{stopping_future}->done;
218 0         0 return;
219             }
220 0 0       0 $log->tracef("Start awaiting task") unless $self->{awaiting_job};
221             $self->{awaiting_job} //= $self->queue_redis->brpoplpush(
222             $self->prefixed_queue($queue) => $self->prefixed_queue($self->processing_queue), 0
223             )->on_ready(sub {
224 0     0   0 my $f = shift;
225 0         0 delete $self->{awaiting_job};
226 0         0 local @{$log->{context}}{qw(worker_id queue)} = ($self->id, $queue);
  0         0  
227 0         0 $log->tracef('Had task from queue, pending now %d', 0 + keys %{$self->{pending_jobs}});
  0         0  
228             try {
229             my ($id, $queue, @details) = $f->get;
230             $queue //= $queue;
231             $self->incoming_job->emit([ $id, $queue ]);
232 0         0 } catch {
233             $log->errorf("Failed to retrieve and process job: %s", $@);
234             }
235 0         0 $self->loop->later($self->curry::weak::trigger);
236 0   0     0 });
237 0         0 return;
238             }
239              
240 0   0 0 0 0 sub max_concurrent_jobs { shift->{max_concurrent_jobs} //= 1 }
241              
242 0     0 0 0 sub uri { shift->{uri} }
243              
244             sub configure {
245 1     1 1 803 my ($self, %args) = @_;
246 1         4 for my $k (qw(uri max_concurrent_jobs prefix mode processing_queue use_multi)) {
247 6 50       15 $self->{$k} = delete $args{$k} if exists $args{$k};
248             }
249              
250 1 50       4 if(exists $args{pending_queues}) {
251 0 0       0 if(my $queues = $args{pending_queues}) {
252 0 0 0     0 die 'Only a single queue is supported in reliable mode' if $self->mode eq 'reliable' and @$queues > 1;
253 0         0 $self->{pending_queues} = $queues;
254             } else {
255             delete $self->{pending_queues}
256 0         0 }
257             }
258 1         6 return $self->next::method(%args);
259             }
260              
261             1;
262              
263             =head1 AUTHOR
264              
265             Tom Molesworth
266              
267             =head1 LICENSE
268              
269             Copyright Tom Molesworth 2016-2019. Licensed under the same terms as Perl itself.
270