File Coverage

blib/lib/Protocol/Gearman/Client.pm
Criterion Covered Total %
statement 117 117 100.0
branch 12 18 66.6
condition 7 14 50.0
subroutine 18 18 100.0
pod 2 10 20.0
total 156 177 88.1


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014,2026 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman::Client 0.05;
7              
8 3     3   152791 use v5.20;
  3         9  
9 3     3   11 use warnings;
  3         25  
  3         134  
10              
11 3     3   15 use feature qw( postderef signatures );
  3         3  
  3         323  
12 3     3   11 no warnings qw( experimental::postderef experimental::signatures );
  3         5  
  3         105  
13              
14 3     3   18 use base qw( Protocol::Gearman );
  3         6  
  3         990  
15              
16 3     3   20 use Carp;
  3         5  
  3         168  
17              
18 3     3   1461 use Struct::Dumb;
  3         19371  
  3         25  
19              
20             struct Job => [qw( f on_data on_warning on_status exception )];
21              
22             =head1 NAME
23              
24             C<Protocol::Gearman::Client> - implement a Gearman client
25              
26             =head1 DESCRIPTION
27              
28             =for highlighter language=perl
29              
30             A base class that implements a complete Gearman client. This abstract class
31             still requires the implementation methods as documented in
32             L<Protocol::Gearman>, but otherwise provides a full set of behaviour useful to
33             Gearman clients.
34              
35             As it is based on L<Future> it is suitable for both synchronous and
36             asynchronous use. When backed by an implementation capable of performing
37             asynchronously, this object fully supports asynchronous Gearman communication.
38             When backed by a synchronous implementation, it will still yield C<Future>
39             instances but the limitations of the synchronous implementation may limit how
40             much concurrency and asynchronous behaviour can be achieved.
41              
42             A simple concrete implementation suitable for synchronous use can be found in
43             L<Net::Gearman::Client>.
44              
45             =cut
46              
47             =head1 METHODS
48              
49             =cut
50              
51             =head2 option_request
52              
53             await $client->option_request( $option );
54              
55             Requests that the Gearman server enable the named option for this connection.
56              
57             The following options are defined by Gearman:
58              
59             =over 8
60              
61             =item * C<exceptions>
62              
63             Enables the use of the C<WORK_EXCEPTION> packet; meaning the client is happy
64             to receive them.
65              
66             =back
67              
68             =cut
69              
70 1         2 sub option_request ( $self, $option )
71 1     1 1 174985 {
  1         2  
  1         2  
72 1         7 my $state = $self->gearman_state;
73              
74 1         3 my $request_f = $self->new_future;
75 1         23 $state->{gearman_option_reqs}{$option} = $request_f;
76              
77 1         3 $self->send_packet( OPTION_REQ => $option );
78              
79 1         67 return $request_f;
80             }
81              
82 1         2 sub on_OPTION_RES ( $self, $option )
83 1     1 0 5928 {
  1         1  
  1         1  
84 1         4 my $state = $self->gearman_state;
85 1         11 ( delete $state->{gearman_option_reqs}{$option} )->done();
86             }
87              
88             =head2 submit_job
89              
90             $result = await $client->submit_job( %args );
91              
92             Submits a job request to the Gearman server, and returns a future that will
93             eventually yield the result of the job or its failure.
94              
95             Takes the following required arguments:
96              
97             =over 8
98              
99             =item func => STRING
100              
101             The name of the function to call
102              
103             =item arg => STRING
104              
105             An opaque bytestring containing the argument data for the function. Its exact
106             format should be specified by the registered function.
107              
108             =back
109              
110             Takes the following optional arguments:
111              
112             =over 8
113              
114             =item background => BOOL
115              
116             If true, the job is submitted as a background request. Such a request will not
117             yield any status or completion information from the server. Once submitted the
118             server will not communicate about it further. In this case, the returned
119             future will complete with an empty result as soon as the job is accepted by
120             the server.
121              
122             =item priority => "high" | "low" | ""
123              
124             Alters the job priority on the server. If present, must be either C<"high">,
125             C<"low"> or the empty string.
126              
127             =item on_data => CODE
128              
129             Invoked on receipt of more incremental data from the worker.
130              
131             $on_data->( $data )
132              
133             =item on_warning => CODE
134              
135             Invoked on receipt of a warning from the worker.
136              
137             $on_warning->( $warning )
138              
139             =item on_status => CODE
140              
141             Invoked on a status update from the worker.
142              
143             $on_status->( $numerator, $denominator )
144              
145             =back
146              
147             =cut
148              
149 5         7 sub submit_job ( $self, %args )
150 5     5 1 8892 {
  5         19  
  5         7  
151 5   33     35 my $func = $args{func} // croak "Required 'func' is missing in submit_job";
152 5   33     13 my $arg = $args{arg} // croak "Required 'arg' is missing in submit_job";
153              
154 5         10 my $bg = !!$args{background};
155              
156 5   100     19 my $prio = $args{priority} // "";
157 5 50 66     17 $prio eq "" or $prio eq "high" or $prio eq "low" or
      33        
158             croak "Unrecognised 'priority' of '$prio'";
159              
160 5         27 my $state = $self->gearman_state;
161              
162 5         25 my $submit_f = $self->new_future;
163 5         76 push $state->{gearman_submits}->@*, $submit_f;
164              
165 5         23 my $f = $self->new_future;
166 5     5   198 $submit_f->on_done( sub ( $job_handle ) {
  5         8  
  5         6  
167 5 100       12 if( $bg ) {
168 1         4 $f->done;
169             }
170             else {
171             $state->{gearman_job}{$job_handle} = Job(
172             $f, $args{on_data}, $args{on_warning}, $args{on_status}, undef
173 4         30 );
174             }
175 5         50 });
176              
177 5         101 my $type = "SUBMIT_JOB";
178              
179 5 50       16 $type .= "_LOW" if $prio eq "low";
180 5 100       16 $type .= "_HIGH" if $prio eq "high";
181              
182 5 100       9 $type .= "_BG" if $bg;
183              
184 5         27 $self->send_packet( $type => $func, $state->{gearman_next_id}++, $arg );
185              
186 5         219 return $f;
187             }
188              
189 5         7 sub on_JOB_CREATED ( $self, $job_handle )
190 5     5 0 1498 {
  5         6  
  5         6  
191 5         12 my $state = $self->gearman_state;
192              
193 5         9 my $f = shift $state->{gearman_submits}->@*;
194 5         19 $f->done( $job_handle );
195             }
196              
197 1         8 sub on_WORK_DATA ( $self, $job_handle, $data )
  1         1  
198 1     1 0 26 {
  1         2  
  1         2  
199 1         2 my $state = $self->gearman_state;
200              
201 1         2 my $job = $state->{gearman_job}{$job_handle};
202              
203 1 50       14 $job->on_data->( $data ) if $job->on_data;
204             }
205              
206 1         1 sub on_WORK_WARNING ( $self, $job_handle, $warning )
  1         2  
207 1     1 0 23 {
  1         6  
  1         2  
208 1         2 my $state = $self->gearman_state;
209              
210 1         2 my $job = $state->{gearman_job}{$job_handle};
211              
212 1 50       14 $job->on_warning->( $warning ) if $job->on_warning;
213             }
214              
215 2         2 sub on_WORK_STATUS ( $self, $job_handle, $num, $denom )
  2         3  
  2         3  
216 2     2 0 104 {
  2         2  
  2         3  
217 2         3 my $state = $self->gearman_state;
218              
219 2         3 my $job = $state->{gearman_job}{$job_handle};
220              
221 2 50       26 $job->on_status->( $num, $denom ) if $job->on_status;
222             }
223              
224 3         5 sub on_WORK_COMPLETE ( $self, $job_handle, $result )
  3         10  
225 3     3 0 83 {
  3         4  
  3         3  
226 3         7 my $state = $self->gearman_state;
227              
228 3         6 my $job = delete $state->{gearman_job}{$job_handle};
229              
230 3         60 $job->f->done( $result );
231             }
232              
233 1         1 sub on_WORK_EXCEPTION ( $self, $job_handle, $exception )
  1         2  
234 1     1 0 65 {
  1         1  
  1         1  
235 1         3 my $state = $self->gearman_state;
236              
237 1         2 my $job = $state->{gearman_job}{$job_handle};
238 1         32 $job->exception = $exception;
239             }
240              
241 1         2 sub on_WORK_FAIL ( $self, $job_handle )
242 1     1 0 9 {
  1         1  
  1         1  
243 1         3 my $state = $self->gearman_state;
244              
245 1         2 my $job = delete $state->{gearman_job}{$job_handle};
246              
247 1         27 my $exception = $job->exception;
248 1 50       15 $job->f->fail( "Work failed", gearman => ( defined $exception ? ( $exception ) : () ) );
249             }
250              
251             =head1 AUTHOR
252              
253             Paul Evans <leonerd@leonerd.org.uk>
254              
255             =cut
256              
257             0x55AA;