File Coverage

blib/lib/Minion/Notifier.pm
Criterion Covered Total %
statement 29 32 90.6
branch n/a
condition n/a
subroutine 8 9 88.8
pod 4 4 100.0
total 41 45 91.1


line stmt bran cond sub pod time code
1             package Minion::Notifier;
2              
3 1     1   6 use Mojo::Base 'Mojo::EventEmitter';
  1         2  
  1         6  
4              
5             our $VERSION = '0.06';
6             $VERSION = eval $VERSION;
7              
8             has minion => sub { die 'A Minion instance is required' };
9              
10             has transport => sub { Minion::Notifier::Transport->new };
11              
12 0     0 1 0 sub app { shift->minion->app }
13              
14             sub emit_event {
15 3     3 1 6 my ($self, $id, $event) = @_;
16 3         19 $self->emit(job => $id, $event);
17 3         2264 $self->emit("job:$id" => $id, $event);
18 3         22 $self->emit($event => $id);
19             }
20              
21             sub setup_listener {
22 1     1 1 2 my $self = shift;
23              
24             $self->minion->on(enqueue => sub {
25 1     1   4991682 my ($minion, $id) = @_;
26 1         7 $self->transport->send($id, 'enqueue');
27 1         4 });
28              
29             $self->transport->on(notified => sub {
30 3     3   25 my ($transport, $id, $event) = @_;
31 3         15 $self->emit_event($id, $event);
32 1         24 });
33              
34 1         17 $self->transport->listen;
35              
36 1         1172 return $self;
37             }
38              
39             sub setup_worker {
40 1     1 1 2 my $self = shift;
41              
42             my $dequeue = sub {
43 1     1   1826 my ($worker, $job) = @_;
44 1         6 my $id = $job->id;
45 1         13 $self->transport->send($id, 'dequeue');
46 1         713 $job->on(start => sub { $self->transport->_start });
  0         0  
47 1         9 $job->on(finished => sub { $self->transport->send($id, 'finished') });
  1         17181  
48 1         7 $job->on(failed => sub { $self->transport->send($id, 'failed') });
  0         0  
49 1         6 };
50              
51             $self->minion->on(worker => sub {
52 1     1   125 my ($minion, $worker) = @_;
53 1         8 $worker->on(dequeue => $dequeue);
54 1         4 });
55              
56 1         33 return $self
57             }
58              
59             1;
60              
61             =head1 NAME
62              
63             Minion::Notifier - Notify listeners when a Minion task has completed
64              
65             =head1 SYNOPSIS
66              
67             use Mojolicious::Lite;
68              
69             plugin Minion => { Pg => 'postgresql://...'};
70              
71             plugin 'Minion::Notifier';
72              
73             app->minion->add_task( doit => sub { ... } );
74              
75             any '/doit' => sub {
76             my $c = shift;
77             my $id = $c->minion->enqueue(doit => [...]);
78             $c->minion_notifier->on(job => sub {
79             my ($notifier, $job_id, $message) = @_;
80             return unless $job_id eq $id;
81             $c->render( text => "job $id: $message" );
82             });
83             };
84              
85             =head1 DESCRIPTION
86              
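87             Although L<Minion> is a highly capable job queue, it does not natively have a mechanism to notify listeners when a job has finished or failed.
88             L<Minion::Notifier> provides this feature using pluggable L<Transport|Minion::Notifier::Transport> backends.
89             Currently supported are L<Postgres|Minion::Notifier::Transport::Pg>, L<Redis|Minion::Notifier::Transport::Redis>, and L<WebSocket|Minion::Notifier::Transport::WebSocket>.
90             Postgres support requires L<Mojo::Pg> and Redis requires L<Mojo::Redis2>.
91             WebSockets are native to Mojolicious but you need a broker to manage the connections; L<Mercury> is the author's suggested WebSocket message broker.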
92              
93             Note that this is an early release and the mechanism for loading plugins, especially third-party plugins, is likely to change.
94             Also note that due to the use of messaging buses, the order of events is not guaranteed especially on very fast jobs.
95              
96             =head1 EVENTS
97              
98             L<Minion::Notifier> inherits all events from L<Mojo::EventEmitter> and emits the following new ones.
99              
100             =head2 enqueue
101              
102             $notifier->on(enqueue => sub { my ($notifier, $job_id) = @_; ... });
103              
104             Emitted whenever any job is enqueued (typically having a state of "inactive").
105             Note that the event is not repeated as an argument, though this is subject to change.
106              
107             =head2 dequeue
108              
109             $notifier->on(dequeue => sub { my ($notifier, $job_id) = @_; ... });
110              
111             Emitted whenever any job is dequeued for processing (typically having a state of "active").
112             Note that the event is not repeated as an argument, though this is subject to change.
113              
114             =head2 job
115              
116             $notifier->on(job => sub { my ($notifier, $job_id, $event) = @_; ... });
117              
118             Emitted on any event from the backend for all jobs.
119             The events are currently "enqueue", "dequeue", "finished", and "failed".
120              
121             =head2 job:$id
122              
123             $notifier->on("job:1234" => sub { my ($notifier, $job_id, $event) = @_; ... });
124              
125             Emitted on any message from the backend for specific jobs.
126             Note that the id is still passed so that you may reuse callbacks if desired.
127             The events are currently "enqueue", "dequeue", "finished", and "failed".
128              
129             Users of this event are encouraged to carefully consider what race conditions may exist in the act of subscribing to it.
130             For example, C<enqueue> will emit the "enqueue" event before it even returns the job's id.
131             For this reason, this event is discouraged and may be deprecated/removed in a future release.
132              
133             =head2 finished
134              
135             $notifier->on(finished => sub { my ($notifier, $job_id) = @_; ... });
136              
137             Emitted whenever any job reaches a state of "finished".
138             Note that the event is not repeated as an argument, though this is subject to change.
139              
140             =head2 failed
141              
142             $notifier->on(failed => sub { my ($notifier, $job_id) = @_; ... });
143              
144             Emitted whenever any job reaches a state of "failed".
145             Note that the event is not repeated as an argument, though this is subject to change.
146              
147             =head1 ATTRIBUTES
148              
149             L<Minion::Notifier> inherits all of the attributes from L<Mojo::EventEmitter> and implements the following new ones.
150              
151             =head2 minion
152              
153             The L<Minion> instance to listen to.
154             Note that this attribute is used to gain access to the L<"application instance"|/"app">.
155              
156             =head2 transport
157              
158             An instance of L<Minion::Notifier::Transport> or more likely a subclass thereof.
159             This is used to moderate the communication between processes and even hosts.
160              
161             =head1 METHODS
162              
163             L<Minion::Notifier> inherits all of the methods from L<Mojo::EventEmitter> and implements the following new ones.
164              
165             =head2 app
166              
167             A shortcut for C<< $notifier->minion->app >>.
168              
169             =head2 emit_event
170              
171             A low level method used to emit the batch of events related to received minion events.
172              
173             =head2 setup_listener
174              
175             Setup the linkages that allow for notifications to be received.
176             This is called automatically by L<Mojolicious::Plugin::Minion::Notifier> once the ioloop has started.
177              
178             =head2 setup_worker
179              
180             Setup the linkages that cause the jobs to send notifications when reaching "finished" or "failed" states.
181             This is called automatically by L<Mojolicious::Plugin::Minion::Notifier>.
182              
183             =head1 FUTURE WORK
184              
185             =over
186              
187             =item *
188              
189             Document all included classes (hey this is a preview release!)
190              
191             =item *
192              
193             Improve backend loader mechanism
194              
195             =item *
196              
197             Investigate timeout behavior for the various transport backends
198              
199             =back
200              
201             =head1 SEE ALSO
202              
203             =over
204              
205             =item *
206              
207             L<Mojolicious> - Real-time web framework
208              
209             =item *
210              
211             L<Minion> - The L<Mojolicious> job queue
212              
213             =item *
214              
215             L<Mercury> - A lightweight message broker using L<Mojolicious>' WebSockets for transport
216              
217             =back
218              
219             =head1 SOURCE REPOSITORY
220              
221             L<http://github.com/jberger/Minion-Notifier>
222              
223             =head1 AUTHOR
224              
225             Joel Berger, E<lt>joel.a.berger@gmail.comE<gt>
226              
227             =head1 COPYRIGHT AND LICENSE
228              
229             Copyright (C) 2015 by Joel Berger
230              
231             This library is free software; you can redistribute it and/or modify
232             it under the same terms as Perl itself.
233              
234             =cut