File Coverage

blib/lib/Mojo/Redis/Processor.pm
Criterion Covered Total %
statement 53 114 46.4
branch 21 38 55.2
condition 0 6 0.0
subroutine 12 26 46.1
pod 5 5 100.0
total 91 189 48.1


line stmt bran cond sub pod time code
1             package Mojo::Redis::Processor;
2 1     1   96601 use strict;
  1         2  
  1         26  
3 1     1   4 use warnings;
  1         1  
  1         21  
4              
5 1     1   6 use Carp;
  1         1  
  1         54  
6 1     1   497 use Array::Utils qw (array_minus);
  1         332  
  1         60  
7 1     1   5 use Digest::MD5 qw(md5_hex);
  1         1  
  1         47  
8 1     1   4 use Time::HiRes qw(usleep);
  1         2  
  1         8  
9 1     1   682 use Mojo::Redis2;
  1         198239  
  1         13  
10 1     1   744 use RedisDB;
  1         27583  
  1         45  
11 1     1   796 use JSON::XS qw(encode_json decode_json);
  1         5015  
  1         1107  
12              
13             =head1 NAME
14              
15             Mojo::Redis::Processor - Encapsulates the process for a Mojo app to send an expensive job to a daemon using Redis underneath and Redis SET NX and Redis Pub/Sub.
16              
17             =cut
18              
19             our $VERSION = '0.07';
20              
21             =head1 DESCRIPTION
22              
23             This module is specialized to help a Mojo app to send an expensive job request to be processed in parallel in a separete daemon. Communication is handled through Redis.
24              
25             This is specialized for processing tasks that can be common between different running Mojo children. Race condition between children to add a new tasks is handle by Redis SET NX capability.
26              
27             =head1 Example
28              
29             Mojo app which wants to send data and get stream of processed results will look like:
30              
31             use Mojo::Redis::Processor;
32             use Mojolicious::Lite;
33              
34             my $rp = Mojo::Redis::Processor->new({
35             data => 'Data',
36             trigger => 'R_25',
37             });
38              
39             $rp->send();
40             my $redis_channel = $rp->on_processed(
41             sub {
42             my ($message, $channel) = @_;
43             print "Got a new result [$message]\n";
44             });
45              
46             app->start;
47              
48             Try it like:
49              
50             $ perl -Ilib ws.pl daemon
51              
52              
53             Processor daemon code will look like:
54              
55             use Mojo::Redis::Processor;
56             use Parallel::ForkManager;
57              
58             use constant MAX_WORKERS => 1;
59              
60             $pm = new Parallel::ForkManager(MAX_WORKERS);
61              
62             while (1) {
63             my $pid = $pm->start and next;
64              
65             my $rp = Mojo::Redis::Processor->new;
66              
67             $next = $rp->next();
68             if ($next) {
69             print "next job started [$next].\n";
70              
71             $redis_channel = $rp->on_trigger(
72             sub {
73             my $payload = shift;
74             print "processing payload\n";
75             return rand(100);
76             });
77             print "Job done, exiting the child!\n";
78             } else {
79             print "no job found\n";
80             sleep 1;
81             }
82             $pm->finish;
83             }
84              
85             Try it like:
86              
87             $ perl -Ilib daemon.pl
88              
89             Daemon needs to pick a forking method and also handle ide processes and timeouts.
90              
91             =cut
92              
93             =head1 METHODS
94              
95             =cut
96              
97             my @ALLOWED = qw(data trigger redis_read redis_write read_conn write_conn daemon_conn prefix expire usleep retry);
98              
99             =head2 C<< new(%Options) >>
100              
101             This will instantiate the object for both reqeust sender and processor. Type depends on options which are passed.
102              
103             =over
104              
105             =item B
106              
107             Data for processing that we pass to the $pricer code.
108              
109             =item B
110              
111             Trigger will be a redis channel that will trigger call of pricer code.
112              
113             =item B
114              
115             Data for processing that we pass to the $pricer code.
116              
117             =item B
118              
119             Redis URL for read and write. Write means there is a central and replicated redis. redis_write will default to redis_read if it is not passed.
120              
121             =item B
122              
123             Setting redis connections directly. daemon_conn is used to wait for trigger.
124              
125             =item B
126              
127             Key prefix that is used in redis. If it is not set it will default to "Redis::Processor::".
128              
129             =item B
130              
131             Expire time that client will set after receiving new price from price processor. Price process will continue to price as long as someone is extending this expiry.
132              
133             =item B
134              
135             Sleep time if there was no job available.
136              
137             =item B
138              
139             Retry time to wait for new job become available. If no job become available next() will return empty.
140              
141             =back
142              
143             This will new the thing.
144              
145             =cut
146              
147             sub new { ## no critic (ArgUnpacking)
148 6     6 1 1775 my $class = shift;
149 6 100       17 my $self = ref $_[0] ? $_[0] : {@_};
150              
151 6         7 my @REQUIRED = qw();
152 6 100       15 if (exists $self->{data}) {
153 3         5 @REQUIRED = qw(data trigger);
154             }
155              
156 6         7 my @missing = grep { !$self->{$_} } @REQUIRED;
  6         10  
157 6 100       19 croak "Error, missing parameters: " . join(',', @missing) if @missing;
158              
159 5         13 my @passed = keys %$self;
160 5         12 my @invalid = array_minus(@passed, @ALLOWED);
161 5 100       145 croak "Error, invalid parameters:" . join(',', @invalid) if @invalid;
162              
163 4         4 bless $self, $class;
164 4         12 $self->_initialize();
165 4         22 return $self;
166             }
167              
168             sub _initialize {
169 4     4   3 my $self = shift;
170 4 100       13 $self->{prefix} = 'Redis::Processor::' if !exists $self->{prefix};
171 4 100       7 $self->{expire} = 60 if !exists $self->{expire};
172 4 100       9 $self->{usleep} = 10 if !exists $self->{usleep};
173 4 100       8 $self->{redis_read} = 'redis://127.0.0.1:6379/0' if !exists $self->{redis_read};
174 4 100       7 $self->{redis_write} = $self->{redis_read} if !exists $self->{redis_write};
175 4 100       8 $self->{retry} = 1 if !exists $self->{retry};
176              
177 4         10 $self->{_job_counter} = $self->{prefix} . 'job';
178 4         5 $self->{_worker_counter} = $self->{prefix} . 'worker';
179 4         5 return;
180             }
181              
182             sub _job_load {
183 0     0   0 my $self = shift;
184 0         0 my $job = shift;
185 0         0 return $self->{prefix} . 'load::' . $job;
186             }
187              
188             sub _unique {
189 0     0   0 my $self = shift;
190 0         0 return $self->{prefix} . md5_hex($self->_payload);
191             }
192              
193             sub _payload {
194 0     0   0 my $self = shift;
195 0         0 return JSON::XS::encode_json([$self->{data}, $self->{trigger}]);
196             }
197              
198             sub _processed_channel {
199 0     0   0 my $self = shift;
200 0         0 return $self->_unique;
201             }
202              
203             sub _read {
204 0     0   0 my $self = shift;
205 0 0       0 $self->{read_conn} = Mojo::Redis2->new(url => $self->{redis_read}) if !$self->{read_conn};
206              
207 0         0 return $self->{read_conn};
208             }
209              
210             sub _write {
211 1     1   5 my $self = shift;
212              
213 1 50       12 $self->{write_conn} = RedisDB->new(url => $self->{redis_write}) if !$self->{write_conn};
214 0           return $self->{write_conn};
215             }
216              
217             sub _daemon_redis {
218 0     0     my $self = shift;
219              
220 0 0         $self->{daemon_conn} = RedisDB->new(url => $self->{redis_write}) if !$self->{daemon_conn};
221 0           return $self->{daemon_conn};
222             }
223              
224             =head2 C<< send() >>
225              
226             Will send the Mojo app data processing request. This is mainly a queueing job. Job will expire if no worker take it in time. If more than one app try to register the same job Redis SET NX will only assign one of them to proceed.
227              
228             =cut
229              
230             sub send { ## no critic (ProhibitBuiltinHomonyms)
231 0     0 1   my $self = shift;
232              
233             # race for setting a unique key
234 0 0         if ($self->_write->setnx($self->_unique, 1)) {
235             # if successful first set the key TTL. It must go away if no worker took the job.
236 0           $self->_write->expire($self->_unique, $self->{expire});
237              
238 0           my $job = $self->_write->incr($self->{_job_counter});
239 0           $self->_write->set($self->_job_load($job), $self->_payload);
240 0           $self->_write->expire($self->_job_load($job), $self->{expire});
241             }
242 0           return;
243             }
244              
245             =head2 C<< on_processed($code) >>
246              
247             Mojo app will call this to register a code reference that will be triggered everytime there is a result. Results will be triggered and published based on trigger option.
248              
249             =cut
250              
251             sub on_processed {
252 0     0 1   my $self = shift;
253 0           my $code = shift;
254              
255             $self->_read->on(
256             message => sub {
257 0     0     my (undef, $msg, $channel) = @_;
258 0           $code->($msg, $channel);
259 0           });
260 0           $self->_read->subscribe([$self->_processed_channel]);
261 0           return;
262             }
263              
264             =head2 C<< next() >>
265              
266             Daemon will call this to start the next job. If it return empty it meam there was no job found after "retry".
267              
268             =cut
269              
270             sub next { ## no critic (ProhibitBuiltinHomonyms)
271 0     0 1   my $self = shift;
272              
273 0           my $last_job = $self->_read->get($self->{_job_counter});
274 0           my $last_worker = $self->_read->get($self->{_worker_counter});
275              
276 0 0 0       return if (!$last_job || ($last_worker && $last_job <= $last_worker));
      0        
277              
278 0           my $next = $self->_write->incr($self->{_worker_counter});
279 0           my $payload;
280              
281 0           for (my $i = 0; $i < $self->{retry}; $i++) {
282 0 0         last if $payload = $self->_read->get($self->_job_load($next));
283 0           usleep($self->{usleep});
284             }
285 0 0         return if not $payload;
286              
287 0           my $tmp = JSON::XS::decode_json($payload);
288              
289 0           $self->{data} = $tmp->[0];
290 0           $self->{trigger} = $tmp->[1];
291              
292 0           return $next;
293             }
294              
295             sub _expired {
296 0     0     my $self = shift;
297              
298 0 0         return 1 if $self->_read->ttl($self->_unique) <= 0;
299 0           return;
300             }
301              
302             =head2 C<< on_trigger() >>
303              
304             Daemon will call this to register a processor code reference that will be called everytime trigger happens.
305             The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system.
306             on_trigger will exit the loop when there is no more subscriber to the channel.
307              
308             =cut
309              
310             sub on_trigger {
311 0     0 1   my $self = shift;
312 0           my $pricer = shift;
313              
314             $self->_daemon_redis->subscription_loop(
315             default_callback => sub {
316 0     0     my $c = shift;
317 0           my $count = $self->_publish($pricer->($self->{data}));
318 0           $self->_write->expire($self->_unique, $self->{expire});
319 0 0         if ($count == 0) {
320 0           $c->unsubscribe();
321 0           $self->_write->del($self->_unique);
322             }
323             },
324 0           subscribe => [$self->{trigger}]);
325 0           return;
326             }
327              
328             sub _publish {
329 0     0     my $self = shift;
330 0           my $result = shift;
331              
332 0           $self->_write->publish($self->_processed_channel, $result);
333 0           return;
334             }
335              
336             =head1 AUTHOR
337              
338             Binary.com, C<< >>
339              
340             =head1 BUGS
341              
342             Please report any bugs or feature requests to C, or through
343             the web interface at L. I will be notified, and then you'll
344             automatically be notified of progress on your bug as I make changes.
345              
346              
347              
348              
349             =head1 SUPPORT
350              
351             You can find documentation for this module with the perldoc command.
352              
353             perldoc Mojo::Redis::Processor
354              
355              
356             You can also look for information at:
357              
358             =over 4
359              
360             =item * RT: CPAN's request tracker (report bugs here)
361              
362             L
363              
364             =item * AnnoCPAN: Annotated CPAN documentation
365              
366             L
367              
368             =item * CPAN Ratings
369              
370             L
371              
372             =item * Search CPAN
373              
374             L
375              
376             =back
377              
378              
379             =head1 ACKNOWLEDGEMENTS
380              
381              
382              
383             =cut
384              
385             1;