File Coverage

blib/lib/Protocol/Gearman/Worker.pm
Criterion Covered Total %
statement 103 103 100.0
branch 4 6 66.6
condition n/a
subroutine 21 21 100.0
pod 2 5 40.0
total 130 135 96.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014,2026 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman::Worker 0.05;
7              
8 3     3   151945 use v5.20;
  3         9  
9 3     3   10 use warnings;
  3         5  
  3         127  
10              
11 3     3   16 use feature qw( postderef signatures );
  3         3  
  3         325  
12 3     3   11 no warnings qw( experimental::postderef experimental::signatures );
  3         3  
  3         78  
13              
14 3     3   12 use base qw( Protocol::Gearman );
  3         4  
  3         1023  
15              
16 3     3   14 use Carp;
  3         4  
  3         2252  
17              
18             =head1 NAME
19              
20             C<Protocol::Gearman::Worker> - implement a Gearman worker
21              
22             =head1 DESCRIPTION
23              
24             =for highlighter language=perl
25              
26             A base class that implements a complete Gearman worker. This abstract class
27             still requires the implementation methods as documented in
28             L<Protocol::Gearman>, but otherwise provides a full set of behaviour useful to
29             Gearman workers.
30              
31             As it is based on L<Future> it is suitable for both synchronous and
32             asynchronous use. When backed by an implementation capable of performing
33             asynchronously, this object fully supports asynchronous Gearman communication.
34             When backed by a synchronous implementation, it will still yield C<Future>
35             instances but the limitations of the synchronous implementation may limit how
36             much concurrency and asynchronous behaviour can be achieved.
37              
38             A simple concrete implementation suitable for synchronous use can be found in
39             L<Net::Gearman::Worker>.
40              
41             =cut
42              
43             =head1 METHODS
44              
45             =cut
46              
47             =head2 can_do
48              
49             $worker->can_do( $name, %opts );
50              
51             Informs the server that the worker can perform a function of the given name.
52              
53             The following named options are recognised:
54              
55             =over 8
56              
57             =item timeout => INT
58              
59             If specified, the function is registered using the C<CAN_DO_TIMEOUT> variant,
60             which sets a timeout on the Gearman server after which the function ought to
61             have completed. The timeout is specified in seconds.
62              
63             =back
64              
65             =cut
66              
67 2         4 sub can_do ( $self, $name, %opts )
  2         3  
68 2     2 1 166023 {
  2         4  
  2         2  
69 2         3 my $timeout = $opts{timeout};
70              
71 2 100       5 if( defined $timeout ) {
72 1         4 $self->send_packet( CAN_DO_TIMEOUT => $name, int $timeout );
73             }
74             else {
75 1         4 $self->send_packet( CAN_DO => $name );
76             }
77             }
78              
79             =head2 grab_job
80              
81             $job = await $worker->grab_job;
82              
83             Returns a future that will eventually yield another job assignment from the
84             server as an instance of a job object; see below.
85              
86             =cut
87              
88             sub grab_job ( $self )
89 3     3 1 9753 {
  3         4  
  3         4  
90 3         16 my $state = $self->gearman_state;
91              
92 3         18 push $state->{gearman_assigns}->@*, my $f = $self->new_future;
93              
94 3         54 $self->send_packet( GRAB_JOB => );
95              
96 3         154 return $f;
97             }
98              
99 3         4 sub on_JOB_ASSIGN ( $self, @args )
100 3     3 0 16 {
  3         6  
  3         14  
101 3         12 my $state = $self->gearman_state;
102              
103 3         12 my $f = shift $state->{gearman_assigns}->@*;
104 3         17 $f->done( Protocol::Gearman::Worker::Job->new( $self, @args ) );
105             }
106              
107             # Manage Gearman's slightly odd sleep/wakeup job request loop
108              
109             sub on_NO_JOB ( $self )
110 1     1 0 10 {
  1         1  
  1         2  
111 1         3 $self->send_packet( PRE_SLEEP => );
112             }
113              
114             sub on_NOOP ( $self )
115 1     1 0 8 {
  1         2  
  1         1  
116 1         3 my $state = $self->gearman_state;
117              
118 1 50       15 $self->send_packet( GRAB_JOB => ) if $state->{gearman_assigns}->@*;
119             }
120              
121             =head2 job_finished
122              
123             $worker->job_finished( $job );
124              
125             Invoked by the C<complete> and C<fail> methods on a job object, after the
126             server has been informed of the final status of the job. By default this
127             method does nothing, but it is provided for subclasses to override, to be
128             informed when a job is finished.
129              
130             =cut
131              
132             sub job_finished ( $self, $ ) { }
133              
134             package # hide from CPAN
135             Protocol::Gearman::Worker::Job;
136              
137             =head1 JOB OBJECTS
138              
139             Objects of this type are returned by the C<grab_job> method. They represent
140             individual job assignments from the server, and can be used to obtain details
141             of the work to perform, and report on its result.
142              
143             =cut
144              
145 3         3 sub new ( $class, $worker, $handle, $func, $arg )
  3         4  
  3         13  
  3         3  
146 3     3   5 {
  3         4  
  3         4  
147 3         26 return bless {
148             worker => $worker,
149             handle => $handle,
150             func => $func,
151             arg => $arg,
152             }, $class;
153             }
154              
155             =head2 worker
156              
157             $worker = $job->worker;
158              
159             Returns the C<Protocol::Gearman::Worker> object the job was received by.
160              
161             =head2 handle
162              
163             $handle = $job->handle;
164              
165             Returns the job handle assigned by the server. Most implementations should not
166             need to use this directly.
167              
168             =head2 func
169              
170             =head2 arg
171              
172             $func = $job->func;
173              
174             $arg = $job->arg;
175              
176             The function name and opaque argument data bytes sent by the requesting
177             client.
178              
179             =cut
180              
181 11     11   12 sub worker ( $self ) { $self->{worker} }
  11         11  
  11         12  
  11         32  
182 9     9   337 sub handle ( $self ) { $self->{handle} }
  9         8  
  9         9  
  9         27  
183 2     2   219 sub func ( $self ) { $self->{func} }
  2         4  
  2         2  
  2         11  
184 2     2   3 sub arg ( $self ) { $self->{arg} }
  2         4  
  2         3  
  2         5  
185              
186             =head2 data
187              
188             $job->data( $data );
189              
190             Sends more data back to the client. Intended for long-running jobs with
191             incremental output.
192              
193             =cut
194              
195 1         1 sub data ( $self, $data )
196 1     1   9 {
  1         1  
  1         2  
197 1         2 $self->worker->send_packet( WORK_DATA => $self->handle, $data );
198             }
199              
200             =head2 warning
201              
202             $job->warning( $warning );
203              
204             Sends a warning to the client.
205              
206             =cut
207              
208 1         1 sub warning ( $self, $warning )
209 1     1   7 {
  1         2  
  1         1  
210 1         2 $self->worker->send_packet( WORK_WARNING => $self->handle, $warning );
211             }
212              
213             =head2 status
214              
215             $job->status( $numerator, $denominator );
216              
217             Sets the current progress of the job.
218              
219             =cut
220              
221 2         3 sub status ( $self, $num, $denom )
  2         2  
222 2     2   259 {
  2         3  
  2         3  
223 2         4 $self->worker->send_packet( WORK_STATUS => $self->handle, $num, $denom );
224             }
225              
226             =head2 complete
227              
228             $job->complete( $result );
229              
230             Informs the server that the job is now complete, and sets its result.
231              
232             =cut
233              
234 2         3 sub complete ( $self, $result )
235 2     2   617 {
  2         4  
  2         2  
236 2         23 $self->worker->send_packet( WORK_COMPLETE => $self->handle, $result );
237 2         87 $self->worker->job_finished( $self );
238             }
239              
240             =head2 fail
241              
242             $job->fail( $exception );
243              
244             Informs the server that the job has failed.
245              
246             Optionally an exception value can be supplied; if given this will be sent to
247             the server using a C<WORK_EXCEPTION> message. Note that not all clients will
248             receive this; it is an optional feature.
249              
250             =cut
251              
252 1         2 sub fail ( $self, $exception )
253 1     1   12 {
  1         2  
  1         2  
254 1 50       3 if( defined $exception ) {
255 1         6 $self->worker->send_packet( WORK_EXCEPTION => $self->handle, $exception );
256             }
257              
258 1         7 $self->worker->send_packet( WORK_FAIL => $self->handle );
259 1         15 $self->worker->job_finished( $self );
260             }
261              
262             =head1 AUTHOR
263              
264             Paul Evans <leonerd@leonerd.org.uk>
265              
266             =cut
267              
268             0x55AA;