File Coverage

blib/lib/Protocol/Gearman.pm
Criterion Covered Total %
statement 127 127 100.0
branch 22 32 68.7
condition 6 15 40.0
subroutine 22 22 100.0
pod 14 15 93.3
total 191 211 90.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014,2026 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman 0.05;
7              
8 9     9   898138 use v5.20;
  9         24  
9 9     9   34 use warnings;
  9         11  
  9         376  
10              
11 9     9   38 use feature qw( postderef signatures );
  9         12  
  9         1071  
12 9     9   36 no warnings qw( experimental::postderef experimental::signatures );
  9         36  
  9         245  
13              
14 9     9   29 use Carp;
  9         16  
  9         506  
15 9     9   37 use Scalar::Util qw( reftype );
  9         11  
  9         2467  
16              
17             =head1 NAME
18              
19             C<Protocol::Gearman> - abstract base class for both client and worker
20              
21             =head1 DESCRIPTION
22              
23             =for highlighter language=perl
24              
25             This base class is used by both L<Protocol::Gearman::Client> and
26             L<Protocol::Gearman::Worker>. It shouldn't be used directly by end-user
27             implementations. It is documented here largely to explain what methods an end
28             implementation needs to provide in order to create a Gearman client or worker.
29              
30             For implementing a Gearman client or worker, see the modules
31              
32             =over 2
33              
34             =item *
35              
36             L<Protocol::Gearman::Client>
37              
38             =item *
39              
40             L<Protocol::Gearman::Worker>
41              
42             =back
43              
44             For a simple synchronous Gearman client or worker module for use during
45             testing or similar, see
46              
47             =over 2
48              
49             =item *
50              
51             L<Net::Gearman::Client>
52              
53             =item *
54              
55             L<Net::Gearman::Worker>
56              
57             =back
58              
59             =cut
60              
61             =head1 REQUIRED METHODS
62              
63             The implementation should provide the following methods:
64              
65             =cut
66              
67             =head2 new_future
68              
69             $f = $gearman->new_future;
70              
71             Return a new L<Future> subclass instance, for request methods to use. This
72             instance should support awaiting appropriately.
73              
74             =cut
75              
76 1         2 sub new_future ( $self, @args )
77 1     1 1 405 {
  1         1  
  1         1  
78 1 50 33     7 reftype $self eq "HASH" and ref( my $code = $self->{gearman_method_new_future} ) eq "CODE" or
79             croak "Can't locate object method \"new_future\" via package ".ref($self).", or it is not a prototypical object";
80              
81 1         4 $code->( $self, @args );
82             }
83              
84             =head2 send
85              
86             $gearman->send( $bytes );
87              
88             Send the given bytes to the server.
89              
90             =cut
91              
92 1         1 sub send ( $self, @args )
93 1     1 1 1 {
  1         2  
  1         1  
94 1 50 33     6 reftype $self eq "HASH" and ref( my $code = $self->{gearman_method_send} ) eq "CODE" or
95             croak "Can't locate object method \"send\" via package ".ref($self).", or it is not a prototypical object";
96              
97 1         3 $code->( $self, @args );
98             }
99              
100             =head2 gearman_state
101              
102             $h = $gearman->gearman_state;
103              
104             Return a HASH reference for the Gearman-related code to store its state on.
105             If not implemented, a default method will be provided which uses C<$gearman>
106             itself, for the common case of HASH-based objects. All the Gearman-related
107             state will be stored in keys whose names are prefixed by C<gearman_>, to avoid
108             clashes with other object state.
109              
110             =cut
111              
112 25     25 1 27 sub gearman_state ( $self ) { $self }
  25         43  
  25         27  
  25         38  
113              
114             # These are used internally but not exported
115             use constant {
116 9         12958 MAGIC_REQUEST => "\0REQ",
117             MAGIC_RESPONSE => "\0RES",
118 9     9   44 };
  9         27  
119              
120             my %CONSTANTS = (
121             TYPE_CAN_DO => 1,
122             TYPE_CANT_DO => 2,
123             TYPE_RESET_ABILITIES => 3,
124             TYPE_PRE_SLEEP => 4,
125             TYPE_NOOP => 6,
126             TYPE_SUBMIT_JOB => 7,
127             TYPE_JOB_CREATED => 8,
128             TYPE_GRAB_JOB => 9,
129             TYPE_NO_JOB => 10,
130             TYPE_JOB_ASSIGN => 11,
131             TYPE_WORK_STATUS => 12,
132             TYPE_WORK_COMPLETE => 13,
133             TYPE_WORK_FAIL => 14,
134             TYPE_GET_STATUS => 15,
135             TYPE_ECHO_REQ => 16,
136             TYPE_ECHO_RES => 17,
137             TYPE_SUBMIT_JOB_BG => 18,
138             TYPE_ERROR => 19,
139             TYPE_STATUS_RES => 20,
140             TYPE_SUBMIT_JOB_HIGH => 21,
141             TYPE_SET_CLIENT_ID => 22,
142             TYPE_CAN_DO_TIMEOUT => 23,
143             TYPE_ALL_YOURS => 24,
144             TYPE_WORK_EXCEPTION => 25,
145             TYPE_OPTION_REQ => 26,
146             TYPE_OPTION_RES => 27,
147             TYPE_WORK_DATA => 28,
148             TYPE_WORK_WARNING => 29,
149             TYPE_GRAB_JOB_UNIQ => 30,
150             TYPE_JOB_ASSIGN_UNIQ => 31,
151             TYPE_SUBMIT_JOB_HIGH_BG => 32,
152             TYPE_SUBMIT_JOB_LOW => 33,
153             TYPE_SUBMIT_JOB_LOW_BG => 34,
154             );
155              
156             require constant;
157             constant->import( $_, $CONSTANTS{$_} ) for keys %CONSTANTS;
158              
159             =head1 INTERNAL METHODS
160              
161             These methods are provided for the client and worker subclasses to use; it is
162             unlikely these will be of interest to other users but they are documented here
163             for completeness.
164              
165             =cut
166              
167             # All Gearman packet bodies follow a standard format, of a fixed number of
168             # string arguments (given by the packet type), separated by a single NUL byte.
169             # All but the final argument may not contain embedded NULs.
170              
171             my %TYPENAMES = map { m/^TYPE_(.*)$/ ? ( $CONSTANTS{$_} => $1 ) : () } keys %CONSTANTS;
172              
173             my %ARGS_FOR_TYPE = (
174             # In order from doc/PROTOCOL
175             # common
176             ECHO_REQ => 1,
177             ECHO_RES => 1,
178             ERROR => 2,
179             # client->server
180             SUBMIT_JOB => 3,
181             SUBMIT_JOB_BG => 3,
182             SUBMIT_JOB_HIGH => 3,
183             SUBMIT_JOB_HIGH_BG => 3,
184             SUBMIT_JOB_LOW => 3,
185             SUBMIT_JOB_LOW_BG => 3,
186             GET_STATUS => 1,
187             OPTION_REQ => 1,
188             # server->client
189             JOB_CREATED => 1,
190             STATUS_RES => 5,
191             OPTION_RES => 1,
192             # worker->server
193             CAN_DO => 1,
194             CAN_DO_TIMEOUT => 2,
195             CANT_DO => 1,
196             RESET_ABILITIES => 0,
197             PRE_SLEEP => 0,
198             GRAB_JOB => 0,
199             GRAB_JOB_UNIQ => 0,
200             WORK_DATA => 2,
201             WORK_WARNING => 2,
202             WORK_STATUS => 3,
203             WORK_COMPLETE => 2,
204             WORK_FAIL => 1,
205             WORK_EXCEPTION => 2,
206             SET_CLIENT_ID => 1,
207             ALL_YOURS => 0,
208             # server->worker
209             NOOP => 0,
210             NO_JOB => 0,
211             JOB_ASSIGN => 3,
212             JOB_ASSIGN_UNIQ => 4,
213             );
214              
215             =head2 pack_packet
216              
217             ( $type, $body ) = $gearman->pack_packet( $name, @args );
218              
219             Given a name of a packet type (specified as a string as the name of one of the
220             C<TYPE_*> constants, without the leading C<TYPE_> prefix; case insignificant)
221             returns the type value and the arguments for the packet packed into a body
222             string. This is intended for passing directly into C or
223             C:
224              
225             send_packet $fh, pack_packet( SUBMIT_JOB => $func, $id, $arg );
226              
227             =cut
228              
229 11         22 sub pack_packet ( $, $typename, @args )
230 11     11 1 141551 {
  11         13  
  11         13  
231 11 100       243 my $typefn = __PACKAGE__->can( "TYPE_\U$typename" ) or
232             croak "Unrecognised packet type '$typename'";
233              
234 10         61 my $n_args = $ARGS_FOR_TYPE{uc $typename};
235              
236 10 100       112 @args == $n_args or croak "Expected '\U$typename\E' to take $n_args args";
237             $args[$_] =~ m/\0/ and croak "Non-final argument [$_] of '\U$typename\E' cannot contain a \\0"
238 9   66     132 for 0 .. $n_args-2;
239              
240 8         19 my $type = $typefn->();
241 8         38 return ( $type, join "\0", @args );
242             }
243              
244             =head2 unpack_packet
245              
246             ( $name, @args ) = $gearman->unpack_packet( $type, $body );
247              
248             Given a type code and body string, returns the type name and unpacked
249             arguments from the body. This function is the reverse of C<pack_packet> and is
250             intended to be used on the result of C or C:
251              
252             The returned C<$name> will always be a fully-capitalised type name, as one of
253             the C<TYPE_*> constants without the leading C<TYPE_> prefix.
254              
255             This is intended for a C<given/when> control block, or dynamic method
256             dispatch:
257              
258             my ( $name, @args ) = unpack_packet( recv_packet $fh );
259              
260             $self->${\"handle_$name"}( @args );
261              
262             =cut
263              
264 9         11 sub unpack_packet ( $, $type, $body )
265 9     9 1 1091 {
  9         14  
  9         9  
266 9 100       133 my $typename = $TYPENAMES{$type} or
267             croak "Unrecognised packet type $type";
268              
269 8         18 my $n_args = $ARGS_FOR_TYPE{$typename};
270              
271 8 50       23 return ( $typename ) if $n_args == 0;
272 8         50 return ( $typename, split m/\0/, $body, $n_args );
273             }
274              
275             =head2 parse_packet_from_string
276              
277             ( $name, @args ) = $gearman->parse_packet_from_string( $bytes );
278              
279             Attempts to parse a complete message packet from the given byte string. If it
280             succeeds, it returns the type name and arguments. If it fails it returns an
281             empty list.
282              
283             If successful, it will remove the bytes of the packet from the C<$bytes>
284             scalar, which must therefore be mutable.
285              
286             If the byte string begins with some bytes that are not recognised as the
287             Gearman packet magic for a response, the function will immediately throw an
288             exception before modifying the string.
289              
290             =cut
291              
292             # hard to do $_[0] mutation with a signature
293             sub parse_packet_from_string
294             {
295 10     10 1 163981 my $self = shift;
296              
297 10 100       35 return unless length $_[0] >= 4;
298 7 100       181 croak "Expected to find 'RES' magic in packet" unless
299             unpack( "a4", $_[0] ) eq MAGIC_RESPONSE;
300              
301 6 50       21 return unless length $_[0] >= 12;
302              
303 6         27 my $bodylen = unpack( "x8 N", $_[0] );
304 6 50       19 return unless length $_[0] >= 12 + $bodylen;
305              
306             # Now committed to extracting it
307 6         24 my ( $type ) = unpack( "x4 N x4", substr $_[0], 0, 12, "" );
308 6         11 my $body = substr $_[0], 0, $bodylen, "";
309              
310 6         25 return $self->unpack_packet( $type, $body );
311             }
312              
313             =head2 recv_packet_from_fh
314              
315             ( $name, @args ) = $gearman->recv_packet_from_fh( $fh );
316              
317             Attempts to read a complete packet from the given filehandle, blocking until
318             it is available. The results are undefined if this function is called on a
319             non-blocking filehandle.
320              
321             If an IO error happens, an exception is thrown. If the first four bytes read
322             are not recognised as the Gearman packet magic for a response, the function
323             will immediately throw an exception. If either of these conditions happen, the
324             filehandle should be considered no longer valid and should be closed.
325              
326             =cut
327              
328 2         5 sub recv_packet_from_fh ( $self, $fh )
329 2     2 1 1080 {
  2         3  
  2         2  
330 2 50       8 $fh->read( my $magic, 4 ) or croak "Cannot read header - $!";
331 2 100       118 croak "Expected to find 'RES' magic in packet" unless
332             $magic eq MAGIC_RESPONSE;
333              
334 1 50       6 $fh->read( my $header, 8 ) or croak "Cannot read header - $!";
335 1         8 my ( $type, $bodylen ) = unpack( "N N", $header );
336              
337 1         1 my $body = "";
338 1 50 33     4 $fh->read( $body, $bodylen ) or croak "Cannot read body - $!" if $bodylen;
339              
340 1         6 return $self->unpack_packet( $type, $body );
341             }
342              
343             =head2 build_packet_to_string
344              
345             $bytes = $gearman->build_packet_to_string( $name, @args );
346              
347             Returns a byte string containing a complete packet with the given fields.
348              
349             =cut
350              
351 7         12 sub build_packet_to_string ( $self, $name, @args )
  7         8  
352 7     7 1 5968 {
  7         30  
  7         13  
353 7         27 my ( $type, $body ) = $self->pack_packet( $name, @args );
354              
355 7         70 return pack "a4 N N a*", MAGIC_REQUEST, $type, length $body, $body;
356             }
357              
358             =head2 send_packet_to_fh
359              
360             $gearman->send_packet_to_fh( $fh, $name, @args );
361              
362             Sends a complete packet to the given filehandle. If an IO error happens, an
363             exception is thrown.
364              
365             =cut
366              
367 1         2 sub send_packet_to_fh ( $self, $fh, $name, @args )
  1         2  
  1         1  
368 1     1 1 284 {
  1         1  
  1         2  
369 1 50       3 $fh->print( $self->build_packet_to_string( $name, @args ) ) or croak "Cannot send packet - $!";
370             }
371              
372             =head2 send_packet
373              
374             $gearman->send_packet( $typename, @args );
375              
376             Packs a packet from a list of arguments then sends it; a combination of
377             C<pack_packet> and C<build_packet_to_string>. Uses the implementation's C<send> method.
378              
379             =cut
380              
381 5         7 sub send_packet ( $self, $type, @args )
  5         20  
382 5     5 1 147176 {
  5         29  
  5         6  
383 5         37 $self->send( $self->build_packet_to_string( $type, @args ) );
384             }
385              
386             =head2 on_recv
387              
388             $gearman->on_recv( $buffer );
389              
390             The implementation should call this method when more bytes of data have been
391             received. It parses and unpacks packets from the buffer, then dispatches to
392             the appropriately named C<on_*> method. A combination of C<parse_packet_from_string> and
393             C<unpack_packet>.
394              
395             The C<$buffer> scalar may be modified; if it still contains bytes left over
396             after the call these should be preserved by the implementation for the next
397             time it is called.
398              
399             =cut
400              
401             # hard to do $_[0] mutation with a signature
402             sub on_recv
403             {
404 4     4 1 998 my $self = shift;
405              
406 4         25 while( my ( $type, @args ) = $self->parse_packet_from_string( $_[0] ) ) {
407 5         8 my $method = "on_$type";
408 5         28 $self->$method( @args );
409             }
410             }
411              
412             *on_read = \&on_recv;
413              
414             =head2 on_ERROR
415              
416             $gearman->on_ERROR( $name, $message );
417              
418             Default handler for the C<TYPE_ERROR> packet. This method should be overridden
419             by subclasses to change the behaviour.
420              
421             =cut
422              
423 1         2 sub on_ERROR ( $self, $name, $message )
  1         10  
424 1     1 1 1 {
  1         2  
  1         1  
425 1         7 die "Received Gearman error '$name' (\"$message\")\n";
426             }
427              
428             =head2 echo_request
429              
430             $payload = await $gearman->echo_request( $payload );
431              
432             Sends an C<ECHO_REQ> packet to the Gearman server, and returns a future that
433             will eventually yield the payload when the server responds.
434              
435             =cut
436              
437 1         1 sub echo_request ( $self, $payload )
438 1     1 1 380 {
  1         2  
  1         2  
439 1         4 my $state = $self->gearman_state;
440              
441 1         6 push $state->{gearman_echos}->@*, my $f = $self->new_future;
442              
443 1         21 $self->send_packet( ECHO_REQ => $payload );
444              
445 1         43 return $f;
446             }
447              
448 1         2 sub on_ECHO_RES ( $self, $payload )
449 1     1 0 559 {
  1         1  
  1         1  
450 1         3 my $state = $self->gearman_state;
451              
452 1         8 ( shift $state->{gearman_echos}->@* )->done( $payload );
453             }
454              
455             =head1 PROTOTYPICAL OBJECTS
456              
457             An alternative option to subclassing to provide the missing methods, is to use
458             C (or rather, one of the client or worker subclasses) as a
459             prototypical object, passing in CODE references for the missing methods to a
460             special constructor that creates a concrete object.
461              
462             This may be more convenient to use in smaller one-shot cases (like unit tests
463             or small scripts) instead of creating a subclass.
464              
465             my $socket = ...;
466              
467             my $client = Protocol::Gearman::Client->new_prototype(
468             send => sub { $socket->print( $_[1] ); },
469             new_future => sub { My::Future::Subclass->new },
470             );
471              
472             =head2 new_prototype
473              
474             $gearman = Protocol::Gearman->new_prototype( %methods )
475              
476             Returns a new prototypical object constructed using the given methods. The
477             named arguments must give values for the C<send> and C<new_future> methods.
478              
479             =cut
480              
481 1         2 sub new_prototype ( $class, %methods )
482 1     1 1 166153 {
  1         3  
  1         1  
483 1         3 my $self = bless {}, $class;
484              
485 1         2 foreach (qw( send new_future )) {
486 2 50 33     11 defined $methods{$_} and ref $methods{$_} eq "CODE" or
487             croak "Expected to receive a CODE reference for '$_'";
488              
489 2         9 $self->{"gearman_method_$_"} = $methods{$_};
490             }
491              
492 1         3 return $self;
493             }
494              
495             =head1 AUTHOR
496              
497             Paul Evans
498              
499             =cut
500              
501             0x55AA;