File Coverage

blib/lib/JobCenter/Client/Mojo.pm
Criterion Covered Total %
statement 55 536 10.2
branch 1 216 0.4
condition 0 97 0.0
subroutine 19 94 20.2
pod 14 20 70.0
total 89 963 9.2


line stmt bran cond sub pod time code
1             package JobCenter::Client::Mojo;
2 1     1   927 use Mojo::Base 'Mojo::EventEmitter';
  1         3  
  1         10  
3              
4             our $VERSION = '0.45'; # VERSION
5              
6             #
7             # Mojo's default reactor uses EV, and EV does not play nice with signals
8             # without some handholding. We either can try to detect EV and do the
9             # handholding, or try to prevent Mojo using EV.
10             #
11             BEGIN {
12 1 50   1   2014 $ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'};
13             }
14              
15             # more Mojolicious
16 1     1   541 use Mojo::IOLoop;
  1         166154  
  1         19  
17 1     1   43 use Mojo::IOLoop::Stream;
  1         3  
  1         6  
18 1     1   543 use Mojo::Log;
  1         12685  
  1         13  
19              
20             # standard perl
21 1     1   45 use Carp qw(croak);
  1         3  
  1         50  
22 1     1   7 use Cwd qw(realpath);
  1         2  
  1         53  
23 1     1   8 use Data::Dumper;
  1         2  
  1         55  
24 1     1   6 use Encode qw(encode_utf8 decode_utf8);
  1         2  
  1         45  
25 1     1   6 use File::Basename;
  1         2  
  1         57  
26 1     1   6 use IO::Handle;
  1         3  
  1         47  
27 1     1   7 use POSIX ();
  1         1  
  1         15  
28 1     1   784 use Storable;
  1         3455  
  1         60  
29 1     1   527 use Sys::Hostname;
  1         1030  
  1         90  
30              
31             # from cpan
32 1     1   529 use JSON::RPC2::TwoWay 0.05;
  1         3979  
  1         35  
33             # JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here
34             # without adding another dependency
35 1     1   7 use JSON::MaybeXS qw(JSON);
  1         2  
  1         70  
36 1     1   552 use MojoX::NetstringStream 0.06; # for the enhanced close
  1         1367  
  1         8  
37              
38             # us
39 1     1   553 use JobCenter::Client::Mojo::Steps;
  1         3  
  1         7  
40              
41             has [qw(
42             actions address auth clientid conn debug ioloop jobs json
43             lastping log method ns ping_timeout port rpc timeout tls token who
44             )];
45              
46             # keep in sync with the jobcenter
47             use constant {
48 1         8039 WORK_OK => 0, # exit codes for work method
49             WORK_PING_TIMEOUT => 92,
50             WORK_CONNECTION_CLOSED => 91,
51 1     1   62 };
  1         2  
52              
53             sub new {
54 0     0 1   my ($class, %args) = @_;
55 0           my $self = $class->SUPER::new();
56              
57 0   0       my $address = $args{address} // '127.0.0.1';
58 0   0       my $debug = $args{debug} // 0; # or 1?
59 0   0       $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
60 0   0       my $json = $args{json} // 1;
61 0 0 0       my $log = $args{log} // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
62 0   0       my $method = $args{method} // 'password';
63 0   0       my $port = $args{port} // 6522;
64 0   0       my $timeout = $args{timeout} // 60;
65 0   0       my $tls = $args{tls} // 0;
66 0           my $tls_ca = $args{tls_ca};
67 0           my $tls_cert = $args{tls_cert};
68 0           my $tls_key = $args{tls_key};
69 0 0         my $token = $args{token} or croak 'no token?';
70 0 0         my $who = $args{who} or croak 'no who?';
71              
72 0           $self->{address} = $address;
73 0   0       $self->{debug} = $args{debug} // 1;
74 0           $self->{jobs} = {};
75 0           $self->{json} = $json;
76             $self->{jsonobject} = $args{jsonobject} //
77             JSON::MaybeXS->new(utf8 => 1, allow_nonref => 1),
78 0   0       $self->{ping_timeout} = $args{ping_timeout} // 300;
      0        
79 0           $self->{log} = $log;
80 0           $self->{method} = $method;
81 0           $self->{port} = $port;
82 0           $self->{timeout} = $timeout;
83 0           $self->{tls} = $tls;
84 0           $self->{tls_ca} = $tls_ca;
85 0           $self->{tls_cert} = $tls_cert;
86 0           $self->{tls_key} = $tls_key;
87 0           $self->{token} = $token;
88 0           $self->{who} = $who;
89 0   0       $self->{autoconnect} = ($args{autoconnect} //= 1);
90              
91 0 0         return $self if !$args{autoconnect};
92              
93 0           $self->connect;
94              
95 0 0         return $self if $self->{auth};
96 0           return;
97             }
98              
99             sub connect {
100 0     0 1   my $self = shift;
101              
102 0           delete $self->ioloop->{__exit__};
103 0           delete $self->{auth};
104 0           $self->{actions} = {};
105              
106             $self->on(disconnect => sub {
107 0     0     my ($self, $code) = @_;
108             #$self->{_exit} = $code;
109 0           $self->ioloop->stop;
110 0           });
111              
112             my $rpc = JSON::RPC2::TwoWay->new(
113             debug => $self->{debug},
114             json => $self->{jsonobject},
115 0 0         ) or croak 'no rpc?';
116 0     0     $rpc->register('greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
  0            
117 0     0     $rpc->register('job_done', sub { $self->rpc_job_done(@_) }, notification => 1);
  0            
118 0     0     $rpc->register('ping', sub { $self->rpc_ping(@_) });
  0            
119 0     0     $rpc->register('task_ready', sub { $self->rpc_task_ready(@_) }, notification => 1);
  0            
120              
121             my $clarg = {
122             address => $self->{address},
123             port => $self->{port},
124             tls => $self->{tls},
125 0           };
126 0 0         $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
127 0 0         $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
128 0 0         $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};
129              
130             my $clientid = $self->ioloop->client(
131             $clarg => sub {
132 0     0     my ($loop, $err, $stream) = @_;
133 0 0         if ($err) {
134 0           $err =~ s/\n$//s;
135 0           $self->log->info('connection to API failed: ' . $err);
136 0           $self->{auth} = 0;
137 0           return;
138             }
139 0           my $ns = MojoX::NetstringStream->new(stream => $stream);
140 0           $self->{ns} = $ns;
141             my $conn = $rpc->newconnection(
142             owner => $self,
143 0           write => sub { $ns->write(@_) },
144 0           );
145 0           $self->{conn} = $conn;
146             $ns->on(chunk => sub {
147 0           my ($ns2, $chunk) = @_;
148             #say 'got chunk: ', $chunk;
149 0           my @err = $conn->handle($chunk);
150 0 0         $self->log->debug('chunk handler: ' . join(' ', grep defined, @err)) if @err;
151 0 0         $ns->close if $err[0];
152 0           });
153             $ns->on(close => sub {
154             # this cb is called during global destruction, at
155             # least on old perls where
156             # Mojo::Util::_global_destruction() won't work
157 0 0         return unless $conn;
158 0           $conn->close;
159 0           $self->log->info('connection to API closed');
160 0           $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo doc
161 0           });
162 0           });
163              
164 0           $self->{rpc} = $rpc;
165 0           $self->{clientid} = $clientid;
166              
167             # handle timeout?
168             my $tmr = $self->ioloop->timer($self->{timeout} => sub {
169 0     0     my $loop = shift;
170 0           $self->log->error('timeout wating for greeting');
171 0           $loop->remove($clientid);
172 0           $self->{auth} = 0;
173 0           });
174              
175 0           $self->log->debug('starting handshake');
176              
177 0     0     $self->_loop(sub { not defined $self->{auth} });
  0            
178              
179 0           $self->log->debug('done with handhake?');
180              
181 0           $self->ioloop->remove($tmr);
182 0           $self->unsubscribe('disconnect');
183 0           1;
184             }
185              
186             sub is_connected {
187 0     0 0   my $self = shift;
188 0   0       return $self->{auth} && !$self->ioloop->{__exit__};
189             }
190              
191             sub rpc_greetings {
192 0     0 0   my ($self, $c, $i) = @_;
193             #$self->ioloop->delay(
194             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
195             sub {
196 0     0     my $steps = shift;
197 0 0         die "wrong api version $i->{version} (expected 1.1)" unless $i->{version} eq '1.1';
198 0           $self->log->info('got greeting from ' . $i->{who});
199 0           $c->call('hello', {who => $self->who, method => $self->method, token => $self->token}, $steps->next());
200             },
201             sub {
202 0     0     my ($steps, $e, $r) = @_;
203 0           my $w;
204             #say 'hello returned: ', Dumper(\@_);
205 0 0         die "hello returned error $e->{message} ($e->{code})" if $e;
206 0 0         die 'no results from hello?' unless $r;
207 0           ($r, $w) = @$r;
208 0 0         if ($r) {
209 0           $self->log->info("hello returned: $r, $w");
210 0           $self->{auth} = 1;
211             } else {
212 0   0       $self->log->error('hello failed: ' . ($w // ''));
213 0           $self->{auth} = 0; # defined but false
214             }
215             }], sub {
216 0     0     my ($err) = @_;
217 0           $self->log->error('something went wrong in handshake: ' . $err);
218 0           $self->{auth} = '';
219 0           });
220             }
221              
222             sub rpc_job_done {
223 0     0 0   my ($self, $conn, $i) = @_;
224 0           my $job_id = $i->{job_id};
225 0           my $outargs = $i->{outargs};
226 0           my $outargsj = $self->{jsonobject}->encode($outargs);
227 0 0         $outargs = $outargsj if $self->{json};
228 0           $outargsj = decode_utf8($outargsj); # for debug printing
229 0           my $rescb = delete $self->{jobs}->{$job_id};
230 0 0         if ($rescb) {
231 0           $self->log->debug("got job_done: for job_id $job_id result: $outargsj");
232 0           local $@;
233 0           eval {
234 0           $rescb->($job_id, $outargs);
235             };
236 0 0         $self->log->info("got $@ calling result callback") if $@;
237             } else {
238 0           $self->log->debug("got job_done for unknown job $job_id result: $outargsj");
239             }
240             }
241              
242             sub call {
243 0     0 1   my ($self, %args) = @_;
244 0           my ($done, $job_id, $outargs);
245             $args{cb1} = sub {
246 0     0     ($job_id, $outargs) = @_;
247 0 0         $done++ unless $job_id;
248 0           };
249             $args{cb2} = sub {
250 0     0     ($job_id, $outargs) = @_;
251 0           $done++;
252 0           };
253 0           $self->call_nb(%args);
254              
255 0     0     $self->_loop(sub { !$done });
  0            
256              
257 0           return $job_id, $outargs;
258             }
259              
260             sub call_nb {
261 0     0 1   my ($self, %args) = @_;
262 0 0         my $wfname = $args{wfname} or die 'no workflowname?';
263 0           my $vtag = $args{vtag};
264 0   0       my $inargs = $args{inargs} // '{}';
265 0   0       my $callcb = $args{cb1} // die 'no call callback?';
266 0   0       my $rescb = $args{cb2} // die 'no result callback?';
267 0   0       my $timeout = $args{timeout} // $self->timeout * 5; # a bit hackish..
268 0           my $reqauth = $args{reqauth};
269 0           my $clenv = $args{clenv};
270 0           my $inargsj;
271              
272 0 0         if ($self->{json}) {
273 0           $inargsj = $inargs;
274 0           $inargs = $self->{jsonobject}->decode($inargs);
275 0 0         croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
276 0 0         if ($clenv) {
277 0           $clenv = $self->{jsonobject}->decode($clenv);
278 0 0         croak 'clenv is not a json object' unless ref $clenv eq 'HASH';
279             }
280 0 0         if ($reqauth) {
281 0           $reqauth = $self->{jsonobject}->decode($reqauth);
282 0 0         croak 'reqauth is not a json object' unless ref $reqauth eq 'HASH';
283             }
284             } else {
285 0 0         croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
286             # test encoding
287 0           $inargsj = $self->{jsonobject}->encode($inargs);
288 0 0         if ($clenv) {
289 0 0         croak 'clenv should be a hashref' unless ref $clenv eq 'HASH';
290             }
291 0 0         if ($reqauth) {
292 0 0         croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
293             }
294             }
295              
296 0           $inargsj = decode_utf8($inargsj);
297 0 0         $self->log->debug("calling $wfname with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));
298              
299             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
300             sub {
301 0     0     my $steps = shift;
302 0 0         $self->conn->call('create_job', {
    0          
303             wfname => $wfname,
304             vtag => $vtag,
305             inargs => $inargs,
306             timeout => $timeout,
307             ($clenv ? (clenv => $clenv) : ()),
308             ($reqauth ? (reqauth => $reqauth) : ()),
309             }, $steps->next());
310             },
311             sub {
312 0     0     my ($steps, $e, $r) = @_;
313 0           my ($job_id, $msg);
314 0 0         if ($e) {
315 0           $self->log->error("create_job returned error: $e->{message} ($e->{code}");
316 0           $msg = "$e->{message} ($e->{code}"
317             } else {
318 0           ($job_id, $msg) = @$r; # fixme: check for arrayref?
319 0 0         if ($msg) {
    0          
320 0           $self->log->error("create_job returned error: $msg");
321             } elsif ($job_id) {
322 0           $self->log->debug("create_job returned job_id: $job_id");
323 0           $self->jobs->{$job_id} = $rescb;
324             }
325             }
326 0 0         if ($msg) {
327 0 0         $msg = {error => $msg} unless ref $msg;
328 0 0         $msg = $self->{jsonobject}->encode($msg) if $self->{json};
329             }
330 0           $callcb->($job_id, $msg);
331             }], sub {
332 0     0     my ($err) = @_;
333 0           $self->log->error("Something went wrong in call_nb: $err");
334 0           $err = { error => $err };
335 0 0         $err = $self->{jsonobject}->encode($err) if $self->{json};
336 0           $callcb->(undef, $err);
337 0           });
338             }
339              
340             sub close {
341 0     0 1   my ($self) = @_;
342              
343 0           $self->log->debug('closing connection');
344 0           $self->conn->close();
345 0           $self->ns->close();
346 0           %$self = ();
347             }
348              
349             sub find_jobs {
350 0     0 1   my ($self, $filter) = @_;
351 0 0         croak('no filter?') unless $filter;
352              
353             #print 'filter: ', Dumper($filter);
354 0 0         $filter = $self->{jsonobject}->encode($filter) if ref $filter eq 'HASH';
355              
356 0           my ($done, $err, $jobs);
357             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
358             sub {
359 0     0     my $steps = shift;
360             # fixme: check results?
361 0           $self->conn->call('find_jobs', { filter => $filter }, $steps->next());
362             },
363             sub {
364             #say 'find_jobs call returned: ', Dumper(\@_);
365 0     0     my ($steps, $e, $r) = @_;
366 0           $done++; # received something so done waiting
367 0 0         if ($e) {
368 0           $self->log->error("find_jobs got error $e->{message} ($e->{code})");
369 0           $err = $e->{message};
370 0           return;
371             }
372 0           $jobs = $r;
373             }], sub {
374 0     0     my ($err) = @_;
375 0           $done++;
376 0           $self->log->error("something went wrong with get_job_status: $err");
377 0           });
378              
379 0     0     $self->_loop(sub { !$done });
  0            
380              
381 0 0         return $err, @$jobs if ref $jobs eq 'ARRAY';
382 0           return $err;
383             }
384              
385             sub check_if_lock_exists {
386 0     0 1   my ($self, $locktype, $lockvalue) = @_;
387 0 0         croak('no locktype?') unless $locktype;
388 0 0         croak('no lockvalue?') unless $lockvalue;
389              
390 0           my ($done, $err, $found);
391             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
392             sub {
393 0     0     my $steps = shift;
394             # fixme: check results?
395 0           $self->conn->call(
396             'check_if_lock_exists',
397             { locktype => $locktype, lockvalue => $lockvalue },
398             $steps->next()
399             );
400             },
401             sub {
402             #say 'find_jobs call returned: ', Dumper(\@_);
403 0     0     my ($steps, $e, $r) = @_;
404 0           $done++; # received something so done waiting
405 0 0         if ($e) {
406 0           $self->log->error("find_jobs got error $e->{message} ($e->{code})");
407 0           $err = $e->{message};
408 0           return;
409             }
410 0           $found = $r;
411             }], sub {
412 0     0     my ($err) = @_;
413 0           $done++;
414 0           $self->log->error("something went wrong with check_if_lock_exists: $err");
415 0           });
416              
417 0     0     $self->_loop(sub { !$done });
  0            
418              
419 0 0         $found = $self->{jsonobject}->encode($found) if $self->{json};
420 0           return $found;
421             }
422              
423             sub get_api_status {
424 0     0 0   my ($self, $what) = @_;
425 0 0         croak('no what?') unless $what;
426              
427 0           my ($done, $result);
428             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
429             sub {
430 0     0     my $steps = shift;
431 0           $self->conn->call('get_api_status', { what => $what }, $steps->next());
432             },
433             sub {
434             #say 'call returned: ', Dumper(\@_);
435 0     0     my ($d, $e, $r) = @_;
436 0           $done++; # received something so done waiting
437 0 0         if ($e) {
438 0           $self->log->error("get_api_status got error $e->{message} ($e->{code})");
439 0           $result = $e->{message};
440 0           return;
441             }
442 0           $result = $r;
443             }], sub {
444 0     0     my ($err) = @_;
445 0           $done++;
446 0           $self->log->eror("something went wrong with get_api_status: $err");
447 0           });
448              
449 0     0     $self->_loop(sub { !$done });
  0            
450              
451 0           return $result;
452             }
453              
454             sub get_job_status {
455 0     0 1   my ($self, $job_id) = @_;
456 0 0         croak('no job_id?') unless $job_id;
457              
458 0           my ($done, $job_id2, $outargs);
459             $self->get_job_status_nb(
460             job_id => $job_id,
461             statuscb => sub {
462 0     0     ($job_id2, $outargs) = @_;
463 0           $done++;
464 0           return;
465             },
466 0           );
467 0     0     $self->_loop(sub { !$done });
  0            
468 0           return $job_id2, $outargs;
469             }
470              
471             sub get_job_status_nb {
472 0     0 1   my ($self, %args) = @_;
473             my $job_id = $args{job_id} or
474 0 0         croak('no job_id?');
475              
476 0           my $statuscb = $args{statuscb};
477 0 0         croak('statuscb should be a coderef')
478             if ref $statuscb ne 'CODE';
479              
480 0           my $notifycb = $args{notifycb};
481 0 0 0       croak('notifycb should be a coderef')
482             if $notifycb and ref $notifycb ne 'CODE';
483              
484             #my ($done, $job_id2, $outargs);
485             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
486             sub {
487 0     0     my $steps = shift;
488             # fixme: check results?
489 0 0         $self->conn->call(
490             'get_job_status', {
491             job_id => $job_id,
492             notify => ($notifycb ? JSON->true : JSON->false),
493             }, $steps->next()
494             );
495             },
496             sub {
497             #say 'call returned: ', Dumper(\@_);
498 0     0     my ($steps, $e, $r) = @_;
499             #$self->log->debug("get_job_satus_nb got job_id: $res msg: $msg");
500 0 0         if ($e) {
501 0           $self->log->error("get_job_status got error $e->{message} ($e->{code})");
502 0           $statuscb->(undef, $e->{message});
503 0           return;
504             }
505 0           my ($job_id2, $outargs) = @$r;
506 0 0 0       if ($notifycb and !$job_id2 and !$outargs) {
      0        
507 0           $self->jobs->{$job_id} = $notifycb;
508             }
509 0 0 0       $outargs = $self->{jsonobject}->encode($outargs) if $self->{json} and ref $outargs;
510 0           $statuscb->($job_id2, $outargs);
511 0           return;
512             }], sub {
513 0     0     my ($err) = @_;
514 0           $self->log->error("Something went wrong in get_job_status_nb: $err");
515 0           $err = { error => $err };
516 0 0         $err = $self->{jsonobject}->encode($err) if $self->{json};
517 0           $statuscb->(undef, $err);
518 0           });
519             }
520              
521             sub ping {
522 0     0 1   my ($self, $timeout) = @_;
523              
524 0   0       $timeout //= $self->timeout;
525 0           my ($done, $ret);
526              
527             $self->ioloop->timer($timeout => sub {
528 0     0     $done++;
529 0           });
530              
531             $self->conn->call('ping', {}, sub {
532 0     0     my ($e, $r) = @_;
533 0 0 0       if (not $e and $r and $r =~ /pong/) {
      0        
534 0           $ret = 1;
535             } else {
536 0           %$self = ();
537             }
538 0           $done++;
539 0           });
540              
541 0     0     $self->_loop(sub { !$done });
  0            
542              
543 0           return $ret;
544             }
545              
546             sub work {
547 0     0 1   my ($self, $prepare) = @_;
548              
549 0           my $pt = $self->ping_timeout;
550 0           my $tmr;
551             $tmr = $self->ioloop->recurring($pt => sub {
552 0     0     my $ioloop = shift;
553 0   0       $self->log->debug('in ping_timeout timer: lastping: '
554             . ($self->lastping // 0) . ' limit: ' . (time - $pt) );
555 0 0 0       return if ($self->lastping // 0) > time - $pt;
556 0           $self->log->error('ping timeout');
557 0           $ioloop->remove($self->clientid);
558 0           $ioloop->remove($tmr);
559 0           $ioloop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
560 0           $ioloop->stop;
561 0 0         }) if $pt > 0;
562             $self->on(disconnect => sub {
563 0     0     my ($self, $code) = @_;
564 0           $self->ioloop->{__exit__} = $code;
565 0           $self->ioloop->stop;
566 0           });
567 0 0         return 0 if $prepare;
568              
569 0           $self->ioloop->{__exit__} = WORK_OK;
570 0           $self->log->debug('JobCenter::Client::Mojo starting work');
571 0 0         $self->ioloop->start unless Mojo::IOLoop->is_running;
572 0           $self->log->debug('JobCenter::Client::Mojo done?');
573 0 0         $self->ioloop->remove($tmr) if $tmr;
574              
575 0           return $self->ioloop->{__exit__};
576             }
577              
578             sub stop {
579 0     0 1   my ($self, $exit) = @_;
580 0           $self->ioloop->{__exit__} = $exit;
581 0           $self->ioloop->stop;
582             }
583              
584             sub create_slotgroup {
585 0     0 1   my ($self, $name, $slots) = @_;
586 0 0         croak('no slotgroup name?') unless $name;
587              
588 0           my ($done, $result);
589             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
590             sub {
591 0     0     my $steps = shift;
592 0           $self->conn->call('create_slotgroup', { name => $name, slots => $slots }, $steps->next());
593             },
594             sub {
595             #say 'call returned: ', Dumper(\@_);
596 0     0     my ($steps, $e, $r) = @_;
597 0           $done++; # received something so done waiting
598 0 0         if ($e) {
599 0           $self->log->error("create_slotgroup got error $e->{message}");
600 0           $result = $e->{message};
601 0           return;
602             }
603 0           $result = $r;
604             }], sub {
605 0     0     my ($err) = @_;
606 0           $done++;
607 0           $self->log->eror("something went wrong with create_slotgroup: $err");
608 0           });
609              
610 0     0     $self->_loop(sub { !$done });
  0            
611              
612 0           return $result;
613             }
614              
615             sub announce {
616 0     0 1   my ($self, %args) = @_;
617 0 0         my $actionname = $args{actionname} or croak 'no actionname?';
618 0 0         my $cb = $args{cb} or croak 'no cb?';
619             #my $async = $args{async} // 0;
620 0 0 0       my $mode = $args{mode} // (($args{async}) ? 'async' : 'sync');
621 0 0         croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|sync)$/;
622 0           my $undocb = $args{undocb};
623 0           my $host = hostname;
624 0   0       my $workername = $args{workername} // "$self->{who} $host $0 $$";
625              
626 0 0         croak "already have action $actionname" if $self->actions->{$actionname};
627              
628 0           my ($done, $err);
629             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
630             sub {
631 0     0     my $steps = shift;
632             $self->conn->call('announce', {
633             workername => $workername,
634             actionname => $actionname,
635             slotgroup => $args{slotgroup},
636             slots => $args{slots},
637 0 0         (($args{filter}) ? (filter => $args{filter}) : ()),
638             }, $steps->next());
639             },
640             sub {
641             #say 'call returned: ', Dumper(\@_);
642 0     0     my ($steps, $e, $r) = @_;
643 0           $done++; # received something so done waiting
644 0 0         if ($e) {
645 0           $self->log->error("announce got error: $e->{message}");
646 0           $err = $e->{message};
647 0           return;
648             }
649 0           my ($res, $msg) = @$r;
650 0           $self->log->debug("announce got res: $res msg: $msg");
651             $self->actions->{$actionname} = {
652             cb => $cb,
653             mode => $mode,
654             undocb => $undocb,
655 0 0 0       addenv => $args{addenv} // 0,
656             } if $res;
657 0 0         $err = $msg unless $res;
658             }], sub {
659 0     0     ($err) = @_;
660 0           $done++;
661 0           $self->log->error("something went wrong with announce: $err");
662 0           });
663              
664 0     0     $self->_loop(sub { !$done });
  0            
665 0           return $err;
666             }
667              
668             sub rpc_ping {
669 0     0 0   my ($self, $c, $i, $rpccb) = @_;
670 0           $self->lastping(time());
671 0           return 'pong!';
672             }
673              
674             sub rpc_task_ready {
675             #say 'got task_ready: ', Dumper(\@_);
676 0     0 0   my ($self, $c, $i) = @_;
677 0           my $actionname = $i->{actionname};
678 0           my $job_id = $i->{job_id};
679 0           my $action = $self->actions->{$actionname};
680 0 0         unless ($action) {
681 0           $self->log->info("got task_ready for unknown action $actionname");
682 0           return;
683             }
684              
685 0           $self->log->debug("got task_ready for $actionname job_id $job_id calling get_task");
686             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
687             sub {
688 0     0     my $steps = shift;
689 0           $c->call('get_task', {actionname => $actionname, job_id => $job_id}, $steps->next());
690             },
691             sub {
692 0     0     my ($steps, $e, $r) = @_;
693             #say 'get_task returned: ', Dumper(\@_);
694 0 0         if ($e) {
695 0           $$self->log->debug("got $e->{message} ($e->{code}) calling get_task");
696             }
697 0 0         unless ($r) {
698 0           $self->log->debug('no task for get_task');
699 0           return;
700             }
701 0           my ($cookie, @args);
702 0           ($job_id, $cookie, @args) = @$r;
703 0 0         unless ($cookie) {
704 0           $self->log->debug('aaah? no cookie? (get_task)');
705 0           return;
706             }
707 0 0         pop @args unless $action->{addenv}; # remove env
708 0           local $@;
709 0 0         if ($action->{mode} eq 'subproc') {
    0          
    0          
710 0           eval {
711 0           $self->_subproc($c, $action, $job_id, $cookie, @args);
712             };
713 0 0         $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
714             } elsif ($action->{mode} eq 'async') {
715 0           eval {
716             $action->{cb}->($job_id, @args, sub {
717 0           $c->notify('task_done', { cookie => $cookie, outargs => $_[0] });
718 0           });
719             };
720 0 0         $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
721             } elsif ($action->{mode} eq 'sync') {
722 0           my $outargs = eval { $action->{cb}->($job_id, @args) };
  0            
723 0 0         $outargs = { error => $@ } if $@;
724 0           $c->notify('task_done', { cookie => $cookie, outargs => $outargs });
725             } else {
726 0           die "unkown mode $action->{mode}";
727             }
728             }], sub {
729 0     0     my ($err) = @_;
730 0           $self->log->error("something went wrong with rpc_task_ready: $err");
731 0           });
732             }
733              
734             sub _subproc {
735 0     0     my ($self, $c, $action, $job_id, $cookie, @args) = @_;
736              
737             # based on Mojo::IOLoop::Subprocess
738 0           my $ioloop = $self->ioloop;
739              
740             # Pipe for subprocess communication
741 0 0         pipe(my $reader, my $writer) or die "Can't create pipe: $!";
742              
743 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
744 0 0         unless ($pid) {# Child
745 0           $self->log->debug("in child $$");;
746 0           $ioloop->reset;
747 0           CORE::close $reader; # or we won't get a sigpipe when daddy dies..
748 0           my $undo = 0;
749 0           my $outargs = eval { $action->{cb}->($job_id, @args) };
  0            
750 0 0 0       if ($@) {
    0          
751 0           $outargs = {'error' => $@};
752 0           $undo++;
753             } elsif (ref $outargs eq 'HASH' and $outargs->{'error'}) {
754 0           $undo++;
755             }
756 0 0 0       if ($undo and $action->{undocb}) {
757 0           $self->log->info("undoing for $job_id");;
758 0           my $res = eval { $action->{undocb}->($job_id, @args); };
  0            
759 0 0         $res = $@ if $@;
760             # how should this look?
761             $outargs = {'error' => {
762             'msg' => 'undo failure',
763             'undo' => $res,
764             'olderr' => $outargs->{error},
765 0           }};
766 0           $undo = 0;
767             }
768             # stop ignoring sigpipe
769 0     0     $SIG{PIPE} = sub { $undo++ };
  0            
770             # if the parent is gone we get a sigpipe here:
771 0           print $writer Storable::freeze($outargs);
772 0 0         $writer->flush or $undo++;
773 0 0         CORE::close $writer or $undo++;
774 0 0 0       if ($undo and $action->{undocb}) {
775 0           $self->log->info("undoing for $job_id");;
776 0           eval { $action->{undocb}->($job_id, @args); };
  0            
777             # ignore errors because we can't report them back..
778             }
779             # FIXME: normal exit?
780 0           POSIX::_exit(0);
781             }
782              
783             # Parent
784 0           my $me = $$;
785 0           CORE::close $writer;
786 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
787 0           $ioloop->stream($stream);
788 0           my $buffer = '';
789 0     0     $stream->on(read => sub { $buffer .= pop });
  0            
790             $stream->on(
791             close => sub {
792             #say "close handler!";
793 0 0   0     return unless $$ == $me;
794 0           waitpid $pid, 0;
795 0           my $outargs = eval { Storable::thaw($buffer) };
  0            
796 0 0         $outargs = { error => $@ } if $@;
797 0 0 0       if ($outargs and ref $outargs eq 'HASH') {
798 0           $self->log->debug('subprocess results: ' . Dumper($outargs));
799 0           eval { $c->notify(
  0            
800             'task_done',
801             { cookie => $cookie, outargs => $outargs }
802             ); }; # the connection might be gone?
803             } # else?
804             }
805 0           );
806             }
807              
808             # tick while Mojo::Reactor is still running and condition callback is true
809             sub _loop {
810 0 0   0     warn __PACKAGE__." recursing into IO loop" if state $looping++;
811              
812 0           my $reactor = $_[0]->ioloop->singleton->reactor;
813 0           my $err;
814              
815 0 0         if (ref $reactor eq 'Mojo::Reactor::EV') {
    0          
816              
817 0           my $active = 1;
818              
819 0   0       $active = $reactor->one_tick while $_[1]->() && $active;
820              
821             } elsif (ref $reactor eq 'Mojo::Reactor::Poll') {
822              
823 0           $reactor->{running}++;
824              
825 0   0       $reactor->one_tick while $_[1]->() && $reactor->is_running;
826              
827 0   0       $reactor->{running} &&= $reactor->{running} - 1;
828              
829             } else {
830              
831 0           $err = "unknown reactor: ".ref $reactor;
832             }
833              
834 0           $looping--;
835 0 0         die $err if $err;
836             }
837              
838             #sub DESTROY {
839             # my $self = shift;
840             # say 'destroying ', $self;
841             #}
842              
843             1;
844              
845             =encoding utf8
846              
847             =head1 NAME
848              
849             JobCenter::Client::Mojo - JobCenter JSON-RPC 2.0 Api client using Mojo.
850              
851             =head1 SYNOPSIS
852              
853             use JobCenter::Client::Mojo;
854              
855             my $client = JobCenter::Client::Mojo->new(
856             address => ...
857             port => ...
858             who => ...
859             token => ...
860             );
861              
862             my ($job_id, $outargs) = $client->call(
863             wfname => 'test',
864             inargs => { test => 'test' },
865             );
866              
867             =head1 DESCRIPTION
868              
869             L is a class to build a client to connect to the
870             JSON-RPC 2.0 Api of the L workflow engine. The client can be
871             used to create and inspect jobs as well as for providing 'worker' services
872             to the JobCenter.
873              
874             =head1 METHODS
875              
876             =head2 new
877              
878             $client = JobCenter::Client::Mojo->new(%arguments);
879              
880             Class method that returns a new JobCenter::Client::Mojo object.
881              
882             Valid arguments are:
883              
884             =over 4
885              
886             =item - address: address of the Api.
887              
888             (default: 127.0.0.1)
889              
890             =item - port: port of the Api
891              
892             (default 6522)
893              
894             =item - tls: connect using tls
895              
896             (default false)
897              
898             =item - tls_ca: verify server using ca
899              
900             (default undef)
901              
902             =item - tls_key: private client key
903              
904             (default undef)
905              
906             =item - tls_ca: public client certificate
907              
908             (default undef)
909              
910             =item - who: who to authenticate as.
911              
912             (required)
913              
914             =item - method: how to authenticate.
915              
916             (default: password)
917              
918             =item - token: token to authenticate with.
919              
920             (required)
921              
922             =item - debug: when true prints debugging using L
923              
924             (default: false)
925              
926             =item - ioloop: L object to use
927              
928             (per default the L->singleton object is used)
929              
930             =item - json: flag wether input is json or perl.
931              
932             when true expects the inargs to be valid json, when false a perl hashref is
933             expected and json encoded. (default true)
934              
935             =item - jsonobject: json encoder/decoder object to use
936              
937             (per default a new L object is created)
938              
939             =item - log: L object to use
940              
941             (per default a new L object is created)
942              
943             =item - timeout: how long to wait for Api calls to complete
944              
945             (default 60 seconds)
946              
947             =item - ping_timeout: after this long without a ping from the Api the
948             connection will be closed and the work() method will return
949              
950             (default 5 minutes)
951              
952             =back
953              
954             =head2 call
955              
956             ($job_id, $result) = $client->call(%args);
957              
958             Creates a new L job and waits for the results. Throws an error
959             if somethings goes wrong immediately. Errors encountered during later
960             processing are returned as a L error object.
961              
962             Valid arguments are:
963              
964             =over 4
965              
966             =item - wfname: name of the workflow to call (required)
967              
968             =item - inargs: input arguments for the workflow (if any)
969              
970             =item - vtag: version tag of the workflow to use (optional)
971              
972             =item - timeout: wait this many seconds for the job to finish
973             (optional, defaults to 5 times the Api-call timeout, so default 5 minutes)
974              
975             =item - reqauth: authentication token to be passed on to the authentication
976             module of the API for per job/request authentication.
977              
978             =item - clenv: client environment, made available as part of the job
979             environment and inherited to child jobs.
980              
981             =back
982              
983             =head2 call_nb
984              
985             $job_id = $client->call_nb(%args);
986              
987             Creates a new L job and call the provided callback on completion
988             of the job. Throws an error if somethings goes wrong immediately. Errors
989             encountered during later processing are returned as a L error
990             object to the callback.
991              
992             Valid arguments are those for L and:
993              
994             =over 4
995              
996             =item - cb1: coderef to the callback to call on job creation (requird)
997              
998             ( cb1 => sub { ($job_id, $err) = @_; ... } )
999              
1000             If job_id is undefined the job was not created, the error is then returned
1001             as the second return value.
1002              
1003             =item - cb2: coderef to the callback to call on job completion (requird)
1004              
1005             ( cb2 => sub { ($job_id, $outargs) = @_; ... } )
1006              
1007             =back
1008              
1009             =head2 get_job_status
1010              
1011             ($job_id, $result) = $client->get_job_status($job_id);
1012              
1013             Retrieves the status for the given $job_id. If the job_id does not exist
1014             then the returned $job_id will be undefined and $result will be an error
1015             message. If the job has not finished executing then both $job_id and
1016             $result will be undefined. Otherwise the $result will contain the result of
1017             the job. (Which may be a JobCenter error object)
1018              
1019             =head2 get_job_status_nb
1020              
1021             $client->get_job_status_nb(%args);
1022              
1023             Retrieves the status for the given $job_id.
1024              
1025             Valid arguments are:
1026              
1027             =over 4
1028              
1029             =item - job_id
1030              
1031             =item - statuscb: coderef to the callback for the current status
1032              
1033             ( statuscb => sub { ($job_id, $result) = @_; ... } )
1034              
1035             If the job_id does not exist then the returned $job_id will be undefined
1036             and $result will be an error message. If the job has not finished executing
1037             then both $job_id and $result will be undefined. Otherwise the $result will
1038             contain the result of the job. (Which may be a JobCenter error object)
1039              
1040             =item - notifycb: coderef to the callback for job completion
1041              
1042             ( statuscb => sub { ($job_id, $result) = @_; ... } )
1043              
1044             If the job was still running when the get_job_status_nb call was made then
1045             this callback will be called on completion of the job.
1046              
1047             =back
1048              
1049             =head2 check_if_lock_exists
1050              
1051             $found = $client->find_jobs($locktype, $lockvalue);
1052              
1053             Checks if a lock with the given locktype and lockvalue exists. (I.e. a job
1054             is currently running that holds that lock.
1055              
1056             Returns true if the lock exists, null if the locktype does not exist, false
1057             otherwise.
1058              
1059             =head2 find_jobs
1060              
1061             ($err, @jobs) = $client->find_jobs({'foo'=>'bar'});
1062              
1063             Finds all currently running jobs with arguments matching the filter
1064             expression. The expression is evaluated in PostgreSQL using the @> for
1065             jsonb objects, basically this means that you can only do equality tests for
1066             one or more top-level keys. If @jobs is empty $err might contain an error
1067             message.
1068              
1069             =head2 ping
1070              
1071             $status = $client->ping($timeout);
1072              
1073             Tries to ping the JobCenter API. On success return true. On failure returns
1074             the undefined value, after that the client object should be undefined.
1075              
1076             =head2 close
1077              
1078             $client->close()
1079              
1080             Closes the connection to the JobCenter API and tries to de-allocate
1081             everything. Trying to use the client afterwards will produce errors.
1082              
1083             =head2 create_slotgroup
1084              
1085             $client->create_slotgroup($name, $slots)
1086              
1087             A 'slotgroup' is a way of telling the JobCenter API how many taskss the
1088             worker can do at once. The number of slots should be a positive integer.
1089             Slotgroups names starting with a _ (underscore) are reserved for internal
1090             use. Returns a error message when then was an error, the empty string
1091             otherwise.
1092              
1093             =head2 announce
1094              
1095             Announces the capability to do an action to the Api. The provided callback
1096             will be called when there is a task to be performed. Returns an error when
1097             there was a problem announcing the action.
1098              
1099             my $err = $client->announce(
1100             workername => 'me',
1101             actionname => 'do',
1102             slots => 1
1103             cb => sub { ... },
1104             );
1105             die "could not announce $actionname?: $err" if $err;
1106              
1107             See L for an example.
1108              
1109             Valid arguments are:
1110              
1111             =over 4
1112              
1113             =item - workername: name of the worker
1114              
1115             (optional, defaults to client->who, processname and processid)
1116              
1117             =item - actionname: name of the action
1118              
1119             (required)
1120              
1121             =item - cb: callback to be called for the action
1122              
1123             (required)
1124              
1125             =item - mode: callback mode
1126              
1127             (optional, default 'sync')
1128              
1129             Possible values:
1130              
1131             =over 8
1132              
1133             =item - 'sync': simple blocking mode, just return the results from the
1134             callback. Use only for callbacks taking less than (about) a second.
1135              
1136             =item - 'subproc': the simple blocking callback is started in a seperate
1137             process. Useful for callbacks that take a long time.
1138              
1139             =item - 'async': the callback gets passed another callback as the last
1140             argument that is to be called on completion of the task. For advanced use
1141             cases where the worker is actually more like a proxy. The (initial)
1142             callback is expected to return soonish to the event loop, after setting up
1143             some Mojo-callbacks.
1144              
1145             =back
1146              
1147             =item - async: backwards compatible way for specifying mode 'async'
1148              
1149             (optional, default false)
1150              
1151             =item - slotgroup: the slotgroup to use for accounting of parrallel tasks
1152              
1153             (optional, conflicts with 'slots')
1154              
1155             =item - slots: the amount of tasks the worker is able to process in parallel
1156             for this action.
1157              
1158             (optional, default 1, conflicts with 'slotgroup')
1159              
1160             =item - undocb: a callback that gets called when the original callback
1161             returns an error object or throws an error.
1162              
1163             Called with the same arguments as the original callback.
1164              
1165             (optional, only valid for mode 'subproc')
1166              
1167             =item - filter: only process a subset of the action
1168              
1169             The filter expression allows a worker to specify that it can only do the
1170             actionname for a certain subset of arguments. For example, for a "mkdir"
1171             action the filter expression {'host' => 'example.com'} would mean that this
1172             worker can only do mkdir on host example.com. Filter expressions are limited
1173             to simple equality tests on one or more keys, and only those keys that are
1174             allowed in the action definition. Filtering can be allowed, be mandatory or
1175             be forbidden per action.
1176              
1177             =item - addenv: pass on action enviroment to the callback
1178              
1179             If the addenv flag is true the action callback will be given one extra
1180             argument containing the action environment as a hashref. In the async
1181             callback mode the environment will be inserted before the result callback.
1182              
1183             =back
1184              
1185             =head2 work
1186              
1187             Starts the L. Returns a non-zero value when the IOLoop was
1188             stopped due to some error condition (like a lost connection or a ping
1189             timeout).
1190              
1191             =head3 Possible work() exit codes
1192              
1193             The JobCenter::Client::Mojo library currently defines the following exit codes:
1194              
1195             WORK_OK
1196             WORK_PING_TIMEOUT
1197             WORK_CONNECTION_CLOSED
1198              
1199             =head2 stop
1200              
1201             $client->stop($exit);
1202              
1203             Makes the work() function exit with the provided exit code.
1204              
1205             =head1 SEE ALSO
1206              
1207             =over 4
1208              
1209             =item *
1210              
1211             L, L, L: the L Web framework
1212              
1213             =item *
1214              
1215             L, L
1216              
1217             =back
1218              
1219             L: JobCenter Orchestration Engine
1220              
1221             =head1 ACKNOWLEDGEMENT
1222              
1223             This software has been developed with support from L.
1224             In German: Diese Software wurde mit Unterstützung von L entwickelt.
1225              
1226             =head1 THANKS
1227              
1228             Thanks to Eitan Schuler for reporting a bug and providing a pull request.
1229              
1230             =head1 AUTHORS
1231              
1232             =over 4
1233              
1234             =item *
1235              
1236             Wieger Opmeer
1237              
1238             =back
1239              
1240             =head1 COPYRIGHT AND LICENSE
1241              
1242             This software is copyright (c) 2017 by Wieger Opmeer.
1243              
1244             This is free software; you can redistribute it and/or modify it under
1245             the same terms as the Perl 5 programming language system itself.
1246              
1247             =cut
1248              
1249             1;