File Coverage

blib/lib/JobCenter/Client/Mojo.pm
Criterion Covered Total %
statement 58 546 10.6
branch 1 222 0.4
condition 0 97 0.0
subroutine 20 96 20.8
pod 14 20 70.0
total 93 981 9.4


line stmt bran cond sub pod time code
1             package JobCenter::Client::Mojo;
2 1     1   344362 use Mojo::Base 'Mojo::EventEmitter';
  1         3  
  1         10  
3              
4             our $VERSION = '0.47'; # VERSION
5              
6             #
7             # Mojo's default reactor uses EV, and EV does not play nice with signals
8             # without some handholding. We either can try to detect EV and do the
9             # handholding, or try to prevent Mojo using EV.
10             #
11             BEGIN {
12 1 50   1   2222 $ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'};
13             }
14              
15             # more Mojolicious
16 1     1   621 use Mojo::IOLoop;
  1         369931  
  1         6  
17 1     1   65 use Mojo::IOLoop::Stream;
  1         2  
  1         8  
18 1     1   643 use Mojo::Log;
  1         18257  
  1         11  
19              
20             # standard perl
21 1     1   68 use Carp qw(croak);
  1         2  
  1         70  
22 1     1   4 use Cwd qw(realpath);
  1         1  
  1         43  
23 1     1   4 use Data::Dumper;
  1         2  
  1         51  
24 1     1   4 use Encode qw(encode_utf8 decode_utf8);
  1         2  
  1         41  
25 1     1   4 use File::Basename;
  1         1  
  1         62  
26 1     1   5 use IO::Handle;
  1         10  
  1         59  
27 1     1   8 use POSIX ();
  1         3  
  1         41  
28 1     1   23 use Scalar::Util qw(weaken);
  1         2  
  1         67  
29 1     1   5 use Storable;
  1         1  
  1         47  
30 1     1   689 use Sys::Hostname;
  1         1659  
  1         90  
31              
32             # from cpan
33 1     1   622 use JSON::RPC2::TwoWay 0.07;
  1         9443  
  1         45  
34             # JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here
35             # without adding another dependency
36 1     1   10 use JSON::MaybeXS qw(JSON);
  1         4  
  1         84  
37 1     1   581 use MojoX::NetstringStream 0.06; # for the enhanced close
  1         1810  
  1         11  
38              
39             # us
40 1     1   646 use JobCenter::Client::Mojo::Steps;
  1         3  
  1         6  
41              
# Accessors generated through Mojo::Base's has.
has [
    qw(actions address auth clientid conn debug ioloop jobs json lastping),
    qw(log method ns ping_timeout port rpc timeout tls token who),
];

# Exit codes for the work method; keep in sync with the jobcenter.
use constant WORK_OK                => 0;
use constant WORK_PING_TIMEOUT      => 92;
use constant WORK_CONNECTION_CLOSED => 91;
53              
# Construct a client and, unless autoconnect is disabled, connect right away.
#
# Named arguments read from %args (defaults in parentheses):
#   address ('127.0.0.1'), port (6522), timeout (60), ping_timeout (300),
#   tls (0), tls_ca, tls_cert, tls_key, method ('password'), json (1),
#   debug, ioloop (Mojo::IOLoop->singleton), log (a new Mojo::Log),
#   jsonobject (a new JSON::MaybeXS object), autoconnect (1),
#   token and who (both required).
#
# Croaks when token or who is missing. Returns the object when autoconnect
# is false; otherwise returns the object only if $self->{auth} is true after
# connect, and an empty return otherwise.
sub new {
    my ($class, %args) = @_;
    my $self = $class->SUPER::new();

    my $address = $args{address} // '127.0.0.1';
    my $debug = $args{debug} // 0; # or 1?
    $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
    my $json = $args{json} // 1;
    my $log = $args{log} // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
    my $method = $args{method} // 'password';
    my $port = $args{port} // 6522;
    my $timeout = $args{timeout} // 60;
    my $tls = $args{tls} // 0;
    my $tls_ca = $args{tls_ca};
    my $tls_cert = $args{tls_cert};
    my $tls_key = $args{tls_key};
    my $token = $args{token} or croak 'no token?';
    my $who = $args{who} or croak 'no who?';

    $self->{address} = $address;
    # NOTE(review): the stored flag defaults to 1 while the log level above
    # is derived from a default of 0 -- confirm which default is intended.
    $self->{debug} = $args{debug} // 1;
    $self->{jobs} = {};
    $self->{json} = $json;
    # two separate statements: the original chained these with a stray comma
    $self->{jsonobject} = $args{jsonobject}
        // JSON::MaybeXS->new(utf8 => 1, allow_nonref => 1);
    $self->{ping_timeout} = $args{ping_timeout} // 300;
    $self->{log} = $log;
    $self->{method} = $method;
    $self->{port} = $port;
    $self->{timeout} = $timeout;
    $self->{tls} = $tls;
    $self->{tls_ca} = $tls_ca;
    $self->{tls_cert} = $tls_cert;
    $self->{tls_key} = $tls_key;
    $self->{token} = $token;
    $self->{who} = $who;
    $self->{autoconnect} = ($args{autoconnect} //= 1);

    return $self if !$args{autoconnect};

    $self->connect;

    # only hand out the object when the handshake authenticated us
    return $self if $self->{auth};
    return;
}
99              
# Open the connection to the API and run the handshake.
#
# Resets the auth flag, the registered actions and the ioloop exit marker,
# registers the rpc handlers (greetings, job_done, ping, task_ready), opens
# the client socket and then loops until $self->{auth} becomes defined:
# set by the greeting handshake, by a connection error, or by the timeout
# timer. Always returns 1; callers inspect $self->{auth} for the outcome.
sub connect {
    my $self = shift;

    delete $self->ioloop->{__exit__};
    delete $self->{auth};
    $self->{actions} = {};

    # temporary handler for the handshake; unsubscribed again below
    $self->on(disconnect => sub {
        my ($self, $code) = @_;
        #$self->{_exit} = $code;
        $self->ioloop->stop;
    });

    # debug callback for the rpc layer; holds only a weak reference to us
    my $debug = do {
        weaken(my $self = $self);
        $self->{debug} ? sub {
            if ($self->{log}) {
                $self->log->debug(@_);
            } else {
                warn join(' ', @_)."\n";
            }
        } : undef
    };

    my $rpc = JSON::RPC2::TwoWay->new(
        debug => $debug,
        json => $self->{jsonobject},
    ) or croak 'no rpc?';
    $rpc->register('greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
    $rpc->register('job_done', sub { $self->rpc_job_done(@_) }, notification => 1);
    $rpc->register('ping', sub { $self->rpc_ping(@_) });
    $rpc->register('task_ready', sub { $self->rpc_task_ready(@_) }, notification => 1);

    my $clarg = {
        address => $self->{address},
        port => $self->{port},
        tls => $self->{tls},
    };
    # only pass the tls options that were actually configured
    $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
    $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
    $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};

    my $clientid = $self->ioloop->client(
        $clarg => sub {
            my ($loop, $err, $stream) = @_;
            if ($err) {
                $err =~ s/\n$//s;
                $self->log->info('connection to API failed: ' . $err);
                $self->{auth} = 0; # defined but false: ends the handshake loop
                return;
            }
            my $ns = MojoX::NetstringStream->new(stream => $stream);
            $self->{ns} = $ns;
            my $conn = $rpc->newconnection(
                owner => $self,
                write => sub { $ns->write(@_) },
            );
            $self->{conn} = $conn;
            $ns->on(chunk => sub {
                my ($ns2, $chunk) = @_;
                #say 'got chunk: ', $chunk;
                my @err = $conn->handle($chunk);
                $self->log->debug('chunk handler: ' . join(' ', grep defined, @err)) if @err;
                $ns->close if $err[0];
            });
            $ns->on(close => sub {
                # this cb is called during global destruction, at
                # least on old perls where
                # Mojo::Util::_global_destruction() won't work
                return unless $conn;
                $conn->close;
                $self->log->info('connection to API closed');
                $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo doc
            });
        });

    $self->{rpc} = $rpc;
    $self->{clientid} = $clientid;

    # give up when no greeting arrives within the configured timeout
    my $tmr = $self->ioloop->timer($self->{timeout} => sub {
        my $loop = shift;
        $self->log->error('timeout waiting for greeting');
        $loop->remove($clientid);
        $self->{auth} = 0;
    });

    $self->log->debug('starting handshake');

    $self->_loop(sub { not defined $self->{auth} });

    $self->log->debug('done with handshake?');

    $self->ioloop->remove($tmr);
    $self->unsubscribe('disconnect');
    1;
}
197              
# True when the handshake authenticated us and the ioloop carries no exit
# marker. Yields the (false) auth value itself when not authenticated.
sub is_connected {
    my ($client) = @_;
    my $authed = $client->{auth};
    return $authed unless $authed;
    return !$client->ioloop->{__exit__};
}
202              
# Handler for the 'greetings' notification: checks the api version, sends
# 'hello' with our credentials and records the outcome in $self->{auth}
# (1 on success, 0 when hello is refused, '' when a step died).
sub rpc_greetings {
    my ($self, $c, $i) = @_;

    my $send_hello = sub {
        my ($steps) = @_;
        if ($i->{version} ne '1.1') {
            die "wrong api version $i->{version} (expected 1.1)";
        }
        $self->log->info('got greeting from ' . $i->{who});
        my %credentials = (
            who    => $self->who,
            method => $self->method,
            token  => $self->token,
        );
        $c->call('hello', \%credentials, $steps->next());
    };

    my $check_hello = sub {
        my ($steps, $e, $r) = @_;
        #say 'hello returned: ', Dumper(\@_);
        if ($e) {
            die "hello returned error $e->{message} ($e->{code})";
        }
        die 'no results from hello?' if !$r;
        my ($ok, $w) = @$r;
        if (!$ok) {
            $self->log->error('hello failed: ' . ($w // ''));
            $self->{auth} = 0; # defined but false
        } else {
            $self->log->info("hello returned: $ok, $w");
            $self->{auth} = 1;
        }
    };

    my $handshake_failed = sub {
        my ($err) = @_;
        $self->log->error('something went wrong in handshake: ' . $err);
        $self->{auth} = '';
    };

    #$self->ioloop->delay(
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [ $send_hello, $check_hello ],
        $handshake_failed,
    );
}
233              
# Handler for the 'job_done' notification: hands the job's outargs to the
# result callback stored for that job_id (and forgets the callback).
# In json mode the callback receives the encoded JSON text, otherwise the
# data structure itself. Exceptions from the callback are logged, not raised.
sub rpc_job_done {
    my ($self, $conn, $i) = @_;
    my ($job_id, $result) = @{$i}{qw(job_id outargs)};

    my $encoded = $self->{jsonobject}->encode($result);
    if ($self->{json}) {
        $result = $encoded;
    }
    my $printable = decode_utf8($encoded); # for debug printing

    my $on_result = delete $self->{jobs}->{$job_id};
    if (!$on_result) {
        $self->log->debug("got job_done for unknown job $job_id result: $printable");
    } else {
        $self->log->debug("got job_done: for job_id $job_id result: $printable");
        local $@;
        eval {
            $on_result->($job_id, $result);
        };
        if ($@) {
            $self->log->info("got $@ calling result callback");
        }
    }
}
253              
# Blocking variant of call_nb: starts the job and loops until either job
# creation failed (no job_id in the first callback) or the result callback
# fired. Returns ($job_id, $outargs) as last reported by a callback.
sub call {
    my ($self, %args) = @_;
    my ($finished, $job_id, $outargs);

    my $on_created = sub {
        ($job_id, $outargs) = @_;
        # without a job_id there will never be a result to wait for
        $finished++ if !$job_id;
    };
    my $on_result = sub {
        ($job_id, $outargs) = @_;
        $finished++;
    };

    my %nb_args = (%args, cb1 => $on_created, cb2 => $on_result);
    $self->call_nb(%nb_args);

    $self->_loop(sub { !$finished });

    return $job_id, $outargs;
}
271              
# Non-blocking job creation.
#
# Named arguments:
#   wfname  - workflow name (required, dies when missing)
#   vtag    - optional version tag
#   inargs  - job arguments: JSON text in json mode, a hashref otherwise;
#             defaults to an empty object in the representation of the mode
#   cb1     - called as ($job_id, $msg) once create_job answered (required)
#   cb2     - stored as result callback for the created job (required)
#   timeout - defaults to five times the client timeout
#   clenv, reqauth - optional; JSON text in json mode, hashrefs otherwise
#
# Croaks when inargs, clenv or reqauth are not (JSON) objects. Errors are
# reported through cb1 as (undef, $error); in json mode the error is
# JSON-encoded.
sub call_nb {
    my ($self, %args) = @_;
    my $wfname = $args{wfname} or die 'no workflowname?';
    my $vtag = $args{vtag};
    # the default has to match the mode: a JSON string is not a valid
    # inargs value when json mode is off
    my $inargs = $args{inargs} // ($self->{json} ? '{}' : {});
    my $callcb = $args{cb1} // die 'no call callback?';
    my $rescb = $args{cb2} // die 'no result callback?';
    my $timeout = $args{timeout} // $self->timeout * 5; # a bit hackish..
    my $reqauth = $args{reqauth};
    my $clenv = $args{clenv};
    my $inargsj;

    if ($self->{json}) {
        $inargsj = $inargs;
        $inargs = $self->{jsonobject}->decode($inargs);
        croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
        if ($clenv) {
            $clenv = $self->{jsonobject}->decode($clenv);
            croak 'clenv is not a json object' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            $reqauth = $self->{jsonobject}->decode($reqauth);
            croak 'reqauth is not a json object' unless ref $reqauth eq 'HASH';
        }
    } else {
        croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
        # test encoding
        $inargsj = $self->{jsonobject}->encode($inargs);
        if ($clenv) {
            croak 'clenv should be a hashref' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
        }
    }

    $inargsj = decode_utf8($inargsj);
    $self->log->debug("calling $wfname with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_job', {
            wfname => $wfname,
            vtag => $vtag,
            inargs => $inargs,
            timeout => $timeout,
            ($clenv ? (clenv => $clenv) : ()),
            ($reqauth ? (reqauth => $reqauth) : ()),
        }, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        my ($job_id, $msg);
        if ($e) {
            $self->log->error("create_job returned error: $e->{message} ($e->{code})");
            $msg = "$e->{message} ($e->{code})"
        } else {
            ($job_id, $msg) = @$r; # fixme: check for arrayref?
            if ($msg) {
                $self->log->error("create_job returned error: $msg");
            } elsif ($job_id) {
                $self->log->debug("create_job returned job_id: $job_id");
                # remembered so rpc_job_done can deliver the result
                $self->jobs->{$job_id} = $rescb;
            }
        }
        if ($msg) {
            $msg = {error => $msg} unless ref $msg;
            $msg = $self->{jsonobject}->encode($msg) if $self->{json};
        }
        $callcb->($job_id, $msg);
    }], sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in call_nb: $err");
        $err = { error => $err };
        $err = $self->{jsonobject}->encode($err) if $self->{json};
        $callcb->(undef, $err);
    });
}
351              
# Close the rpc connection and the underlying stream, then wipe all state
# from the object. Safe to call on a client that never connected or that
# was closed before: parts that are missing are simply skipped.
sub close {
    my ($self) = @_;

    my ($log, $conn, $ns) = ($self->log, $self->conn, $self->ns);
    $log->debug('closing connection') if $log;
    $conn->close() if $conn;
    $ns->close() if $ns;
    %$self = ();
}
360              
# Blocking lookup of jobs matching $filter.
#
# $filter is required (croaks otherwise); a hashref is JSON-encoded before
# it is sent, anything else is passed on unchanged.
# Returns ($err, @jobs) when the reply is an arrayref, otherwise just $err.
# $err is the rpc error message, or the exception text when a step failed.
sub find_jobs {
    my ($self, $filter) = @_;
    croak('no filter?') unless $filter;

    #print 'filter: ', Dumper($filter);
    $filter = $self->{jsonobject}->encode($filter) if ref $filter eq 'HASH';

    my ($done, $err, $jobs);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call('find_jobs', { filter => $filter }, $steps->next());
    },
    sub {
        #say 'find_jobs call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("find_jobs got error $e->{message} ($e->{code})");
            $err = $e->{message};
            return;
        }
        $jobs = $r;
    }], sub {
        # assign to the outer $err (no 'my') so the caller gets the failure
        ($err) = @_;
        $done++;
        $self->log->error("something went wrong with find_jobs: $err");
    });

    $self->_loop(sub { !$done });

    return $err, @$jobs if ref $jobs eq 'ARRAY';
    return $err;
}
396              
# Blocking check whether a lock of $locktype with $lockvalue exists.
#
# Both arguments are required (croaks otherwise). Returns the reply of the
# 'check_if_lock_exists' rpc call, JSON-encoded in json mode. On an rpc
# error or a failed step the error is logged and the reply stays undefined.
sub check_if_lock_exists {
    my ($self, $locktype, $lockvalue) = @_;
    croak('no locktype?') unless $locktype;
    croak('no lockvalue?') unless $lockvalue;

    my ($done, $found);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call(
            'check_if_lock_exists',
            { locktype => $locktype, lockvalue => $lockvalue },
            $steps->next()
        );
    },
    sub {
        #say 'check_if_lock_exists call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("check_if_lock_exists got error $e->{message} ($e->{code})");
            return;
        }
        $found = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        $self->log->error("something went wrong with check_if_lock_exists: $err");
    });

    $self->_loop(sub { !$done });

    $found = $self->{jsonobject}->encode($found) if $self->{json};
    return $found;
}
433              
# Blocking 'get_api_status' rpc call.
#
# $what is required (croaks otherwise). Returns the reply of the call, or
# the rpc error message when the call returned an error. When a step fails
# the failure is logged and the result stays undefined.
sub get_api_status {
    my ($self, $what) = @_;
    croak('no what?') unless $what;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('get_api_status', { what => $what }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($d, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("get_api_status got error $e->{message} ($e->{code})");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was misspelled 'eror', which made this failure handler die itself
        $self->log->error("something went wrong with get_api_status: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
464              
# Blocking variant of get_job_status_nb: loops until the status callback
# fired and returns the two values it received.
# Croaks when no job_id is given.
sub get_job_status {
    my ($self, $job_id) = @_;
    $job_id or croak('no job_id?');

    my ($finished, $reported_id, $outargs);
    my $on_status = sub {
        ($reported_id, $outargs) = @_;
        $finished++;
        return;
    };

    $self->get_job_status_nb(job_id => $job_id, statuscb => $on_status);
    $self->_loop(sub { !$finished });

    return $reported_id, $outargs;
}
481              
# Non-blocking job status lookup.
#
# Named arguments:
#   job_id   - required
#   statuscb - coderef (required), called as ($job_id, $outargs), or as
#              (undef, $error) on failure
#   notifycb - optional coderef; stored as result callback for the job when
#              the reply carries neither a job_id nor outargs
# In json mode structured outargs and errors are JSON-encoded first.
sub get_job_status_nb {
    my ($self, %args) = @_;

    my $job_id = $args{job_id};
    croak('no job_id?') unless $job_id;

    my $statuscb = $args{statuscb};
    croak('statuscb should be a coderef') unless ref $statuscb eq 'CODE';

    my $notifycb = $args{notifycb};
    if ($notifycb and ref $notifycb ne 'CODE') {
        croak('notifycb should be a coderef');
    }

    # wrap an error text and hand it to the status callback
    my $report_failure = sub {
        my ($text) = @_;
        my $failure = { error => $text };
        $failure = $self->{jsonobject}->encode($failure) if $self->{json};
        $statuscb->(undef, $failure);
    };

    my $ask = sub {
        my ($steps) = @_;
        # fixme: check results?
        my %query = (
            job_id => $job_id,
            notify => ($notifycb ? JSON->true : JSON->false),
        );
        $self->conn->call('get_job_status', \%query, $steps->next());
    };

    my $answer = sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        if ($e) {
            $self->log->error("get_job_status got error $e->{message} ($e->{code})");
            $report_failure->("$e->{message} ($e->{code})");
            return;
        }
        my ($reported_id, $outargs) = @$r;
        if ($notifycb and !$reported_id and !$outargs) {
            $self->jobs->{$job_id} = $notifycb;
        }
        if ($self->{json} and ref $outargs) {
            $outargs = $self->{jsonobject}->encode($outargs);
        }
        $statuscb->($reported_id, $outargs);
        return;
    };

    my $failed = sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in get_job_status_nb: $err");
        $report_failure->($err);
    };

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [ $ask, $answer ],
        $failed,
    );
}
533              
# Blocking ping of the API.
#
# $timeout defaults to the client timeout. Returns 1 when a reply matching
# /pong/ arrived, undef on timeout or on a bad reply. On a bad reply all
# state is wiped from the object.
sub ping {
    my ($self, $timeout) = @_;

    $timeout //= $self->timeout;
    # taken up front: the object may be emptied by the reply handler below
    my $ioloop = $self->ioloop;
    my ($done, $ret);

    my $tmr = $ioloop->timer($timeout => sub {
        $done++;
    });

    $self->conn->call('ping', {}, sub {
        my ($e, $r) = @_;
        if (not $e and $r and $r =~ /pong/) {
            $ret = 1;
        } else {
            %$self = ();
        }
        $done++;
    });

    $self->_loop(sub { !$done });

    # drop the timeout timer so it does not linger on the loop after a reply
    $ioloop->remove($tmr);

    return $ret;
}
558              
# Run the ioloop to process work.
#
# Installs a recurring ping watchdog (when ping_timeout is positive) and a
# disconnect handler, both of which store an exit code on the ioloop and
# stop it. With a true $prepare only this setup is done and 0 is returned.
# Otherwise the loop is started (unless Mojo::IOLoop is already running)
# and the exit code stored on the ioloop is returned afterwards.
sub work {
    my ($self, $prepare) = @_;

    my $limit = $self->ping_timeout;
    my $watchdog;
    if ($limit > 0) {
        $watchdog = $self->ioloop->recurring($limit => sub {
            my ($loop) = @_;
            $self->log->debug('in ping_timeout timer: lastping: '
                . ($self->lastping // 0) . ' limit: ' . (time - $limit) );
            # a recent enough ping means all is well
            unless (($self->lastping // 0) > time - $limit) {
                $self->log->error('ping timeout');
                $loop->remove($self->clientid);
                $loop->remove($watchdog);
                $loop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
                $loop->stop;
            }
        });
    }

    $self->on(disconnect => sub {
        my ($client, $code) = @_;
        $client->ioloop->{__exit__} = $code;
        $client->ioloop->stop;
    });

    if ($prepare) {
        return 0;
    }

    $self->ioloop->{__exit__} = WORK_OK;
    $self->log->debug('JobCenter::Client::Mojo starting work');
    if (!Mojo::IOLoop->is_running) {
        $self->ioloop->start;
    }
    $self->log->debug('JobCenter::Client::Mojo done?');
    if ($watchdog) {
        $self->ioloop->remove($watchdog);
    }

    return $self->ioloop->{__exit__};
}
590              
# Store $exit as exit code on the ioloop and stop the loop.
sub stop {
    my $self = shift;
    my ($exit) = @_;
    my $loop = $self->ioloop;
    $loop->{__exit__} = $exit;
    return $loop->stop;
}
596              
# Blocking 'create_slotgroup' rpc call.
#
# $name is required (croaks otherwise); $slots is passed on as given.
# Returns the reply of the call, or the rpc error message when the call
# returned an error. When a step fails the failure is logged and the
# result stays undefined.
sub create_slotgroup {
    my ($self, $name, $slots) = @_;
    croak('no slotgroup name?') unless $name;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_slotgroup', { name => $name, slots => $slots }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("create_slotgroup got error $e->{message}");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was misspelled 'eror', which made this failure handler die itself
        $self->log->error("something went wrong with create_slotgroup: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
627              
# Announce an action to the API and, on success, register its callback.
#
# Named arguments:
#   actionname - required
#   cb         - required callback for tasks of this action
#   mode       - 'subproc', 'async' or 'sync'; when absent it is 'async'
#                if the async argument is true, else 'sync'
#   undocb, addenv, workername, slotgroup, slots, filter - optional
# Croaks on missing arguments, an unknown mode or an already announced
# action. Blocks until the call finished; returns the error (rpc error
# message, refusal message or exception text), which is undefined when the
# announcement was accepted.
sub announce {
    my ($self, %args) = @_;

    my $actionname = $args{actionname};
    croak 'no actionname?' unless $actionname;
    my $cb = $args{cb};
    croak 'no cb?' unless $cb;
    #my $async = $args{async} // 0;
    my $mode = $args{mode};
    if (!defined $mode) {
        $mode = $args{async} ? 'async' : 'sync';
    }
    if ($mode !~ /^(subproc|async|sync)$/) {
        croak "unknown callback mode $mode";
    }
    my $undocb = $args{undocb};
    my $host = hostname;
    my $workername = $args{workername} // "$self->{who} $host $0 $$";

    if ($self->actions->{$actionname}) {
        croak "already have action $actionname";
    }

    my ($finished, $err);

    my $send = sub {
        my ($steps) = @_;
        my %request = (
            workername => $workername,
            actionname => $actionname,
            slotgroup => $args{slotgroup},
            slots => $args{slots},
        );
        $request{filter} = $args{filter} if $args{filter};
        $self->conn->call('announce', \%request, $steps->next());
    };

    my $receive = sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $finished++; # received something so done waiting
        if ($e) {
            $self->log->error("announce got error: $e->{message}");
            $err = $e->{message};
            return;
        }
        my ($accepted, $text) = @$r;
        $self->log->debug("announce got res: $accepted msg: $text");
        if ($accepted) {
            $self->actions->{$actionname} = {
                cb => $cb,
                mode => $mode,
                undocb => $undocb,
                addenv => $args{addenv} // 0,
            };
        } else {
            $err = $text;
        }
    };

    my $failed = sub {
        ($err) = @_;
        $finished++;
        $self->log->error("something went wrong with announce: $err");
    };

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [ $send, $receive ],
        $failed,
    );

    $self->_loop(sub { !$finished });
    return $err;
}
680              
# Handler for the 'ping' rpc call: records the time of the ping in
# lastping and answers with 'pong!'. The remaining arguments are unused.
sub rpc_ping {
    my $self = shift;
    my $now = time();
    $self->lastping($now);
    return 'pong!';
}
686              
# Handler for the 'task_ready' notification: fetches the task with
# 'get_task', runs the callback registered for the action in the action's
# mode (subproc, async or sync) and reports the outcome with a 'task_done'
# notification. Exceptions from the callback are reported as
# { error => ... } outargs. Notifications for unknown actions are logged
# and ignored.
sub rpc_task_ready {
    #say 'got task_ready: ', Dumper(\@_);
    my ($self, $c, $i) = @_;
    my $actionname = $i->{actionname};
    my $job_id = $i->{job_id};
    my $action = $self->actions->{$actionname};
    unless ($action) {
        $self->log->info("got task_ready for unknown action $actionname");
        return;
    }

    $self->log->debug("got task_ready for $actionname job_id $job_id calling get_task");
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $c->call('get_task', {actionname => $actionname, job_id => $job_id}, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        #say 'get_task returned: ', Dumper(\@_);
        if ($e) {
            # was '$$self', which dies because the object is not a scalar ref
            $self->log->debug("got $e->{message} ($e->{code}) calling get_task");
        }
        unless ($r) {
            $self->log->debug('no task for get_task');
            return;
        }
        my ($cookie, @args);
        ($job_id, $cookie, @args) = @$r;
        unless ($cookie) {
            $self->log->debug('aaah? no cookie? (get_task)');
            return;
        }
        pop @args unless $action->{addenv}; # remove env
        local $@;
        if ($action->{mode} eq 'subproc') {
            eval {
                $self->_subproc($c, $action, $job_id, $cookie, @args);
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'async') {
            # the callback gets a closure to report its result later
            eval {
                $action->{cb}->($job_id, @args, sub {
                    $c->notify('task_done', { cookie => $cookie, outargs => $_[0] });
                });
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'sync') {
            my $outargs = eval { $action->{cb}->($job_id, @args) };
            $outargs = { error => $@ } if $@;
            $c->notify('task_done', { cookie => $cookie, outargs => $outargs });
        } else {
            die "unkown mode $action->{mode}";
        }
    }], sub {
        my ($err) = @_;
        $self->log->error("something went wrong with rpc_task_ready: $err");
    });
}
746              
747             sub _subproc {
748 0     0     my ($self, $c, $action, $job_id, $cookie, @args) = @_;
749              
750             # based on Mojo::IOLoop::Subprocess
751 0           my $ioloop = $self->ioloop;
752              
753             # Pipe for subprocess communication
754 0 0         pipe(my $reader, my $writer) or die "Can't create pipe: $!";
755              
756 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
757 0 0         unless ($pid) {# Child
758 0           $self->log->debug("in child $$");;
759 0           $ioloop->reset;
760 0           CORE::close $reader; # or we won't get a sigpipe when daddy dies..
761 0           my $undo = 0;
762 0           my $outargs = eval { $action->{cb}->($job_id, @args) };
  0            
763 0 0 0       if ($@) {
    0          
764 0           $outargs = {'error' => $@};
765 0           $undo++;
766             } elsif (ref $outargs eq 'HASH' and $outargs->{'error'}) {
767 0           $undo++;
768             }
769 0 0 0       if ($undo and $action->{undocb}) {
770 0           $self->log->info("undoing for $job_id");;
771 0           my $res = eval { $action->{undocb}->($job_id, @args); };
  0            
772 0 0         $res = $@ if $@;
773             # how should this look?
774             $outargs = {'error' => {
775             'msg' => 'undo failure',
776             'undo' => $res,
777             'olderr' => $outargs->{error},
778 0           }};
779 0           $undo = 0;
780             }
781             # stop ignoring sigpipe
782 0     0     $SIG{PIPE} = sub { $undo++ };
  0            
783             # if the parent is gone we get a sigpipe here:
784 0           print $writer Storable::freeze($outargs);
785 0 0         $writer->flush or $undo++;
786 0 0         CORE::close $writer or $undo++;
787 0 0 0       if ($undo and $action->{undocb}) {
788 0           $self->log->info("undoing for $job_id");;
789 0           eval { $action->{undocb}->($job_id, @args); };
  0            
790             # ignore errors because we can't report them back..
791             }
792             # FIXME: normal exit?
793 0           POSIX::_exit(0);
794             }
795              
796             # Parent
797 0           my $me = $$;
798 0           CORE::close $writer;
799 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
800 0           $ioloop->stream($stream);
801 0           my $buffer = '';
802 0     0     $stream->on(read => sub { $buffer .= pop });
  0            
803             $stream->on(
804             close => sub {
805             #say "close handler!";
806 0 0   0     return unless $$ == $me;
807 0           waitpid $pid, 0;
808 0           my $outargs = eval { Storable::thaw($buffer) };
  0            
809 0 0         $outargs = { error => $@ } if $@;
810 0 0 0       if ($outargs and ref $outargs eq 'HASH') {
811 0           $self->log->debug('subprocess results: ' . Dumper($outargs));
812 0           eval { $c->notify(
  0            
813             'task_done',
814             { cookie => $cookie, outargs => $outargs }
815             ); }; # the connection might be gone?
816             } # else?
817             }
818 0           );
819             }
820              
821             # tick while Mojo::Reactor is still running and condition callback is true
822             sub _loop {
823 0 0   0     warn __PACKAGE__." recursing into IO loop" if state $looping++;
824              
825 0           my $reactor = $_[0]->ioloop->singleton->reactor;
826 0           my $err;
827              
828 0 0         if (ref $reactor eq 'Mojo::Reactor::EV') {
    0          
829              
830 0           my $active = 1;
831              
832 0   0       $active = $reactor->one_tick while $_[1]->() && $active;
833              
834             } elsif (ref $reactor eq 'Mojo::Reactor::Poll') {
835              
836 0           $reactor->{running}++;
837              
838 0   0       $reactor->one_tick while $_[1]->() && $reactor->is_running;
839              
840 0   0       $reactor->{running} &&= $reactor->{running} - 1;
841              
842             } else {
843              
844 0           $err = "unknown reactor: ".ref $reactor;
845             }
846              
847 0           $looping--;
848 0 0         die $err if $err;
849             }
850              
851             #sub DESTROY {
852             # my $self = shift;
853             # say 'destroying ', $self;
854             #}
855              
856             1;
857              
858             =encoding utf8
859              
860             =head1 NAME
861              
862             JobCenter::Client::Mojo - JobCenter JSON-RPC 2.0 Api client using Mojo.
863              
864             =head1 SYNOPSIS
865              
866             use JobCenter::Client::Mojo;
867              
868             my $client = JobCenter::Client::Mojo->new(
869             address => ...
870             port => ...
871             who => ...
872             token => ...
873             );
874              
875             my ($job_id, $outargs) = $client->call(
876             wfname => 'test',
877             inargs => { test => 'test' },
878             );
879              
880             =head1 DESCRIPTION
881              
882             L<JobCenter::Client::Mojo> is a class to build a client to connect to the
883             JSON-RPC 2.0 Api of the L<JobCenter> workflow engine. The client can be
884             used to create and inspect jobs as well as for providing 'worker' services
885             to the JobCenter.
886              
887             =head1 METHODS
888              
889             =head2 new
890              
891             $client = JobCenter::Client::Mojo->new(%arguments);
892              
893             Class method that returns a new JobCenter::Client::Mojo object.
894              
895             Valid arguments are:
896              
897             =over 4
898              
899             =item - address: address of the Api.
900              
901             (default: 127.0.0.1)
902              
903             =item - port: port of the Api
904              
905             (default 6522)
906              
907             =item - tls: connect using tls
908              
909             (default false)
910              
911             =item - tls_ca: verify server using ca
912              
913             (default undef)
914              
915             =item - tls_key: private client key
916              
917             (default undef)
918              
919             =item - tls_cert: public client certificate
920              
921             (default undef)
922              
923             =item - who: who to authenticate as.
924              
925             (required)
926              
927             =item - method: how to authenticate.
928              
929             (default: password)
930              
931             =item - token: token to authenticate with.
932              
933             (required)
934              
935             =item - debug: when true prints debugging using L<Mojo::Log>
936              
937             (default: false)
938              
939             =item - ioloop: L<Mojo::IOLoop> object to use
940              
941             (per default the L<Mojo::IOLoop>->singleton object is used)
942              
943             =item - json: flag whether input is json or perl.
944              
945             when true expects the inargs to be valid json, when false a perl hashref is
946             expected and json encoded. (default true)
947              
948             =item - jsonobject: json encoder/decoder object to use
949              
950             (per default a new L<JSON::MaybeXS> object is created)
951              
952             =item - log: L<Mojo::Log> object to use
953              
954             (per default a new L<Mojo::Log> object is created)
955              
956             =item - timeout: how long to wait for Api calls to complete
957              
958             (default 60 seconds)
959              
960             =item - ping_timeout: after this long without a ping from the Api the
961             connection will be closed and the work() method will return
962              
963             (default 5 minutes)
964              
965             =back
966              
967             =head2 call
968              
969             ($job_id, $result) = $client->call(%args);
970              
971             Creates a new L<JobCenter> job and waits for the results. Throws an error
972             if something goes wrong immediately. Errors encountered during later
973             processing are returned as a L<JobCenter> error object.
974              
975             Valid arguments are:
976              
977             =over 4
978              
979             =item - wfname: name of the workflow to call (required)
980              
981             =item - inargs: input arguments for the workflow (if any)
982              
983             =item - vtag: version tag of the workflow to use (optional)
984              
985             =item - timeout: wait this many seconds for the job to finish
986             (optional, defaults to 5 times the Api-call timeout, so default 5 minutes)
987              
988             =item - reqauth: authentication token to be passed on to the authentication
989             module of the API for per job/request authentication.
990              
991             =item - clenv: client environment, made available as part of the job
992             environment and inherited by child jobs.
993              
994             =back
995              
996             =head2 call_nb
997              
998             $job_id = $client->call_nb(%args);
999              
1000             Creates a new L<JobCenter> job and calls the provided callback on completion
1001             of the job. Throws an error if something goes wrong immediately. Errors
1002             encountered during later processing are returned as a L<JobCenter> error
1003             object to the callback.
1004              
1005             Valid arguments are those for L</call> and:
1006              
1007             =over 4
1008              
1009             =item - cb1: coderef to the callback to call on job creation (required)
1010              
1011             ( cb1 => sub { ($job_id, $err) = @_; ... } )
1012              
1013             If job_id is undefined the job was not created, the error is then returned
1014             as the second return value.
1015              
1016             =item - cb2: coderef to the callback to call on job completion (required)
1017              
1018             ( cb2 => sub { ($job_id, $outargs) = @_; ... } )
1019              
1020             =back
1021              
1022             =head2 get_job_status
1023              
1024             ($job_id, $result) = $client->get_job_status($job_id);
1025              
1026             Retrieves the status for the given $job_id. If the job_id does not exist
1027             then the returned $job_id will be undefined and $result will be an error
1028             message. If the job has not finished executing then both $job_id and
1029             $result will be undefined. Otherwise the $result will contain the result of
1030             the job. (Which may be a JobCenter error object)
1031              
1032             =head2 get_job_status_nb
1033              
1034             $client->get_job_status_nb(%args);
1035              
1036             Retrieves the status for the given $job_id.
1037              
1038             Valid arguments are:
1039              
1040             =over 4
1041              
1042             =item - job_id
1043              
1044             =item - statuscb: coderef to the callback for the current status
1045              
1046             ( statuscb => sub { ($job_id, $result) = @_; ... } )
1047              
1048             If the job_id does not exist then the returned $job_id will be undefined
1049             and $result will be an error message. If the job has not finished executing
1050             then both $job_id and $result will be undefined. Otherwise the $result will
1051             contain the result of the job. (Which may be a JobCenter error object)
1052              
1053             =item - notifycb: coderef to the callback for job completion
1054              
1055             ( notifycb => sub { ($job_id, $result) = @_; ... } )
1056              
1057             If the job was still running when the get_job_status_nb call was made then
1058             this callback will be called on completion of the job.
1059              
1060             =back
1061              
1062             =head2 check_if_lock_exists
1063              
1064             $found = $client->check_if_lock_exists($locktype, $lockvalue);
1065              
1066             Checks if a lock with the given locktype and lockvalue exists. (I.e. a job
1067             is currently running that holds that lock.)
1068              
1069             Returns true if the lock exists, null if the locktype does not exist, false
1070             otherwise.
1071              
1072             =head2 find_jobs
1073              
1074             ($err, @jobs) = $client->find_jobs({'foo'=>'bar'});
1075              
1076             Finds all currently running jobs with arguments matching the filter
1077             expression. The expression is evaluated in PostgreSQL using the @> operator for
1078             jsonb objects, basically this means that you can only do equality tests for
1079             one or more top-level keys. If @jobs is empty $err might contain an error
1080             message.
1081              
1082             =head2 ping
1083              
1084             $status = $client->ping($timeout);
1085              
1086             Tries to ping the JobCenter API. On success returns true. On failure returns
1087             the undefined value, after that the client object should be undefined.
1088              
1089             =head2 close
1090              
1091             $client->close()
1092              
1093             Closes the connection to the JobCenter API and tries to de-allocate
1094             everything. Trying to use the client afterwards will produce errors.
1095              
1096             =head2 create_slotgroup
1097              
1098             $client->create_slotgroup($name, $slots)
1099              
1100             A 'slotgroup' is a way of telling the JobCenter API how many tasks the
1101             worker can do at once. The number of slots should be a positive integer.
1102             Slotgroup names starting with a _ (underscore) are reserved for internal
1103             use. Returns an error message when there was an error, the empty string
1104             otherwise.
1105              
1106             =head2 announce
1107              
1108             Announces the capability to do an action to the Api. The provided callback
1109             will be called when there is a task to be performed. Returns an error when
1110             there was a problem announcing the action.
1111              
1112             my $err = $client->announce(
1113             workername => 'me',
1114             actionname => 'do',
1115             slots => 1,
1116             cb => sub { ... },
1117             );
1118             die "could not announce $actionname?: $err" if $err;
1119              
1120             See L for an example.
1121              
1122             Valid arguments are:
1123              
1124             =over 4
1125              
1126             =item - workername: name of the worker
1127              
1128             (optional, defaults to client->who, processname and processid)
1129              
1130             =item - actionname: name of the action
1131              
1132             (required)
1133              
1134             =item - cb: callback to be called for the action
1135              
1136             (required)
1137              
1138             =item - mode: callback mode
1139              
1140             (optional, default 'sync')
1141              
1142             Possible values:
1143              
1144             =over 8
1145              
1146             =item - 'sync': simple blocking mode, just return the results from the
1147             callback. Use only for callbacks taking less than (about) a second.
1148              
1149             =item - 'subproc': the simple blocking callback is started in a separate
1150             process. Useful for callbacks that take a long time.
1151              
1152             =item - 'async': the callback gets passed another callback as the last
1153             argument that is to be called on completion of the task. For advanced use
1154             cases where the worker is actually more like a proxy. The (initial)
1155             callback is expected to return soonish to the event loop, after setting up
1156             some Mojo-callbacks.
1157              
1158             =back
1159              
1160             =item - async: backwards compatible way for specifying mode 'async'
1161              
1162             (optional, default false)
1163              
1164             =item - slotgroup: the slotgroup to use for accounting of parallel tasks
1165              
1166             (optional, conflicts with 'slots')
1167              
1168             =item - slots: the amount of tasks the worker is able to process in parallel
1169             for this action.
1170              
1171             (optional, default 1, conflicts with 'slotgroup')
1172              
1173             =item - undocb: a callback that gets called when the original callback
1174             returns an error object or throws an error.
1175              
1176             Called with the same arguments as the original callback.
1177              
1178             (optional, only valid for mode 'subproc')
1179              
1180             =item - filter: only process a subset of the action
1181              
1182             The filter expression allows a worker to specify that it can only do the
1183             actionname for a certain subset of arguments. For example, for a "mkdir"
1184             action the filter expression {'host' => 'example.com'} would mean that this
1185             worker can only do mkdir on host example.com. Filter expressions are limited
1186             to simple equality tests on one or more keys, and only those keys that are
1187             allowed in the action definition. Filtering can be allowed, be mandatory or
1188             be forbidden per action.
1189              
1190             =item - addenv: pass on action environment to the callback
1191              
1192             If the addenv flag is true the action callback will be given one extra
1193             argument containing the action environment as a hashref. In the async
1194             callback mode the environment will be inserted before the result callback.
1195              
1196             =back
1197              
1198             =head2 work
1199              
1200             Starts the L<Mojo::IOLoop>. Returns a non-zero value when the IOLoop was
1201             stopped due to some error condition (like a lost connection or a ping
1202             timeout).
1203              
1204             =head3 Possible work() exit codes
1205              
1206             The JobCenter::Client::Mojo library currently defines the following exit codes:
1207              
1208             WORK_OK
1209             WORK_PING_TIMEOUT
1210             WORK_CONNECTION_CLOSED
1211              
1212             =head2 stop
1213              
1214             $client->stop($exit);
1215              
1216             Makes the work() function exit with the provided exit code.
1217              
1218             =head1 SEE ALSO
1219              
1220             =over 4
1221              
1222             =item *
1223              
1224             L<Mojo::IOLoop>, L<Mojo::IOLoop::Stream>, L<http://mojolicious.org>: the L<Mojolicious> Web framework
1225              
1226             =item *
1227              
1228             L, L
1229              
1230             =back
1231              
1232             L: JobCenter Orchestration Engine
1233              
1234             =head1 ACKNOWLEDGEMENT
1235              
1236             This software has been developed with support from L.
1237             In German: Diese Software wurde mit Unterstützung von L entwickelt.
1238              
1239             =head1 THANKS
1240              
1241             Thanks to Eitan Schuler for reporting a bug and providing a pull request.
1242              
1243             =head1 AUTHORS
1244              
1245             =over 4
1246              
1247             =item *
1248              
1249             Wieger Opmeer
1250              
1251             =back
1252              
1253             =head1 COPYRIGHT AND LICENSE
1254              
1255             This software is copyright (C) 2017-2023 by Wieger Opmeer.
1256              
1257             This is free software; you can redistribute it and/or modify it under
1258             the same terms as the Perl 5 programming language system itself.
1259              
1260             =cut
1261              
1262             1;