line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package JobCenter::Client::Mojo; |
2
|
1
|
|
|
1
|
|
927
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
10
|
|
3
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
our $VERSION = '0.45'; # VERSION |
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
# |
7
|
|
|
|
|
|
|
# Mojo's default reactor uses EV, and EV does not play nice with signals |
8
|
|
|
|
|
|
|
# without some handholding. We either can try to detect EV and do the |
9
|
|
|
|
|
|
|
# handholding, or try to prevent Mojo using EV. |
10
|
|
|
|
|
|
|
# |
11
|
|
|
|
|
|
|
BEGIN {
    # Select the poll based reactor at compile time, but only when the
    # environment does not already name a reactor (any true value wins).
    $ENV{'MOJO_REACTOR'} ||= 'Mojo::Reactor::Poll';
}
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
# more Mojolicious |
16
|
1
|
|
|
1
|
|
541
|
use Mojo::IOLoop; |
|
1
|
|
|
|
|
166154
|
|
|
1
|
|
|
|
|
19
|
|
17
|
1
|
|
|
1
|
|
43
|
use Mojo::IOLoop::Stream; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
6
|
|
18
|
1
|
|
|
1
|
|
543
|
use Mojo::Log; |
|
1
|
|
|
|
|
12685
|
|
|
1
|
|
|
|
|
13
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# standard perl |
21
|
1
|
|
|
1
|
|
45
|
use Carp qw(croak); |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
50
|
|
22
|
1
|
|
|
1
|
|
7
|
use Cwd qw(realpath); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
53
|
|
23
|
1
|
|
|
1
|
|
8
|
use Data::Dumper; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
55
|
|
24
|
1
|
|
|
1
|
|
6
|
use Encode qw(encode_utf8 decode_utf8); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
45
|
|
25
|
1
|
|
|
1
|
|
6
|
use File::Basename; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
57
|
|
26
|
1
|
|
|
1
|
|
6
|
use IO::Handle; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
47
|
|
27
|
1
|
|
|
1
|
|
7
|
use POSIX (); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
15
|
|
28
|
1
|
|
|
1
|
|
784
|
use Storable; |
|
1
|
|
|
|
|
3455
|
|
|
1
|
|
|
|
|
60
|
|
29
|
1
|
|
|
1
|
|
527
|
use Sys::Hostname; |
|
1
|
|
|
|
|
1030
|
|
|
1
|
|
|
|
|
90
|
|
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
# from cpan |
32
|
1
|
|
|
1
|
|
529
|
use JSON::RPC2::TwoWay 0.05; |
|
1
|
|
|
|
|
3979
|
|
|
1
|
|
|
|
|
35
|
|
33
|
|
|
|
|
|
|
# JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here |
34
|
|
|
|
|
|
|
# without adding another dependency |
35
|
1
|
|
|
1
|
|
7
|
use JSON::MaybeXS qw(JSON); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
70
|
|
36
|
1
|
|
|
1
|
|
552
|
use MojoX::NetstringStream 0.06; # for the enhanced close |
|
1
|
|
|
|
|
1367
|
|
|
1
|
|
|
|
|
8
|
|
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# us |
39
|
1
|
|
|
1
|
|
553
|
use JobCenter::Client::Mojo::Steps; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
7
|
|
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
has [qw( |
42
|
|
|
|
|
|
|
actions address auth clientid conn debug ioloop jobs json |
43
|
|
|
|
|
|
|
lastping log method ns ping_timeout port rpc timeout tls token who |
44
|
|
|
|
|
|
|
)]; |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# keep in sync with the jobcenter |
47
|
|
|
|
|
|
|
use constant { |
48
|
1
|
|
|
|
|
8039
|
WORK_OK => 0, # exit codes for work method |
49
|
|
|
|
|
|
|
WORK_PING_TIMEOUT => 92, |
50
|
|
|
|
|
|
|
WORK_CONNECTION_CLOSED => 91, |
51
|
1
|
|
|
1
|
|
62
|
}; |
|
1
|
|
|
|
|
2
|
|
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
# Constructor.
#
# Named arguments read from %args:
#   who, token     required; croaks 'no who?' / 'no token?' when false
#   address        default '127.0.0.1'
#   port           default 6522
#   method         default 'password'
#   timeout        default 60
#   ping_timeout   default 300
#   tls            default 0
#   tls_ca, tls_cert, tls_key   optional, stored as given
#   json           default 1
#   jsonobject     default JSON::MaybeXS->new(utf8 => 1, allow_nonref => 1)
#   log            default Mojo::Log at level 'debug' (debug true) or 'info'
#   debug          see the review note in the body
#   ioloop         default Mojo::IOLoop->singleton
#   autoconnect    default 1
#
# Returns the object. When autoconnect is true, connect() is called first
# and the object is only returned if $self->{auth} is true afterwards;
# otherwise nothing is returned.
sub new {
    my ($class, %args) = @_;
    my $self = $class->SUPER::new();

    my $address = $args{address} // '127.0.0.1';
    my $debug = $args{debug} // 0; # or 1?
    $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
    my $json = $args{json} // 1;
    my $log = $args{log} // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
    my $method = $args{method} // 'password';
    my $port = $args{port} // 6522;
    my $timeout = $args{timeout} // 60;
    my $tls = $args{tls} // 0;
    my $tls_ca = $args{tls_ca};
    my $tls_cert = $args{tls_cert};
    my $tls_key = $args{tls_key};
    my $token = $args{token} or croak 'no token?';
    my $who = $args{who} or croak 'no who?';

    $self->{address} = $address;
    # NOTE(review): the stored debug flag defaults to 1 while the log level
    # above is derived from a default of 0. Kept as-is because connect()
    # hands $self->{debug} to JSON::RPC2::TwoWay; confirm which default is
    # intended before aligning the two.
    $self->{debug} = $args{debug} // 1;
    $self->{jobs} = {};
    $self->{json} = $json;
    # This statement used to end in a comma, silently chaining it to the
    # ping_timeout assignment through the comma operator.
    $self->{jsonobject} = $args{jsonobject}
        // JSON::MaybeXS->new(utf8 => 1, allow_nonref => 1);
    $self->{ping_timeout} = $args{ping_timeout} // 300;
    $self->{log} = $log;
    $self->{method} = $method;
    $self->{port} = $port;
    $self->{timeout} = $timeout;
    $self->{tls} = $tls;
    $self->{tls_ca} = $tls_ca;
    $self->{tls_cert} = $tls_cert;
    $self->{tls_key} = $tls_key;
    $self->{token} = $token;
    $self->{who} = $who;

    my $autoconnect = $args{autoconnect} // 1;
    $self->{autoconnect} = $autoconnect;

    return $self if !$autoconnect;

    $self->connect;

    # connect() leaves a true auth value only after a successful handshake
    return $self if $self->{auth};
    return;
}
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
# Connect to the API and run the handshake.
#
# Resets $self->{auth} and $self->{actions}, registers the rpc handlers
# (greetings, job_done, ping, task_ready), opens a client connection via
# the ioloop and then runs $self->_loop until $self->{auth} becomes
# defined. auth is set to 0 here on connection failure or on timeout
# ($self->{timeout} seconds); rpc_greetings sets it otherwise.
#
# Always returns 1; callers inspect $self->{auth} for the outcome.
sub connect {
    my $self = shift;

    delete $self->ioloop->{__exit__};
    delete $self->{auth};
    $self->{actions} = {};

    # Temporary handler: stop the loop when the connection drops during the
    # handshake. The callback is kept so that only this handler is removed
    # again below and subscribers installed by others stay in place.
    my $on_disconnect = $self->on(disconnect => sub {
        my ($self, $code) = @_;
        #$self->{_exit} = $code;
        $self->ioloop->stop;
    });

    my $rpc = JSON::RPC2::TwoWay->new(
        debug => $self->{debug},
        json => $self->{jsonobject},
    ) or croak 'no rpc?';
    $rpc->register('greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
    $rpc->register('job_done', sub { $self->rpc_job_done(@_) }, notification => 1);
    $rpc->register('ping', sub { $self->rpc_ping(@_) });
    $rpc->register('task_ready', sub { $self->rpc_task_ready(@_) }, notification => 1);

    my $clarg = {
        address => $self->{address},
        port => $self->{port},
        tls => $self->{tls},
    };
    # only pass the tls file options that were actually given
    $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
    $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
    $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};

    my $clientid = $self->ioloop->client(
        $clarg => sub {
        my ($loop, $err, $stream) = @_;
        if ($err) {
            $err =~ s/\n$//s;
            $self->log->info('connection to API failed: ' . $err);
            $self->{auth} = 0; # defined but false: ends the handshake loop
            return;
        }
        my $ns = MojoX::NetstringStream->new(stream => $stream);
        $self->{ns} = $ns;
        my $conn = $rpc->newconnection(
            owner => $self,
            write => sub { $ns->write(@_) },
        );
        $self->{conn} = $conn;
        $ns->on(chunk => sub {
            my ($ns2, $chunk) = @_;
            #say 'got chunk: ', $chunk;
            my @err = $conn->handle($chunk);
            $self->log->debug('chunk handler: ' . join(' ', grep defined, @err)) if @err;
            $ns->close if $err[0];
        });
        $ns->on(close => sub {
            # this cb is called during global destruction, at
            # least on old perls where
            # Mojo::Util::_global_destruction() won't work
            return unless $conn;
            $conn->close;
            $self->log->info('connection to API closed');
            $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo doc
        });
    });

    $self->{rpc} = $rpc;
    $self->{clientid} = $clientid;

    # give up when no greeting arrived within the timeout
    my $tmr = $self->ioloop->timer($self->{timeout} => sub {
        my $loop = shift;
        $self->log->error('timeout waiting for greeting');
        $loop->remove($clientid);
        $self->{auth} = 0;
    });

    $self->log->debug('starting handshake');

    $self->_loop(sub { not defined $self->{auth} });

    $self->log->debug('done with handshake?');

    $self->ioloop->remove($tmr);
    $self->unsubscribe(disconnect => $on_disconnect);
    return 1;
}
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
# Reports whether the client is usable: yields the (false) auth value when
# the handshake did not succeed, otherwise the negation of the ioloop's
# __exit__ marker.
sub is_connected {
    my ($self) = @_;
    my $auth = $self->{auth};
    return $auth unless $auth;
    return !$self->ioloop->{__exit__};
}
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
# Handler for the 'greetings' notification.
#
#   $c  connection object the 'hello' call is made on
#   $i  greeting parameters; 'version' and 'who' are read
#
# Checks the api version, then calls 'hello' with who/method/token and
# records the outcome in $self->{auth}: 1 on success, 0 when hello reports
# failure, '' when any step dies.
sub rpc_greetings {
    my ($self, $c, $i) = @_;
    #$self->ioloop->delay(
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # default to '' so a greeting without these fields produces the
        # same messages without uninitialized-value warnings
        my $version = $i->{version} // '';
        die "wrong api version $version (expected 1.1)" unless $version eq '1.1';
        $self->log->info('got greeting from ' . ($i->{who} // ''));
        $c->call('hello', {who => $self->who, method => $self->method, token => $self->token}, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        my $w;
        #say 'hello returned: ', Dumper(\@_);
        die "hello returned error $e->{message} ($e->{code})" if $e;
        die 'no results from hello?' unless $r;
        ($r, $w) = @$r;
        if ($r) {
            $self->log->info("hello returned: $r, " . ($w // ''));
            $self->{auth} = 1;
        } else {
            $self->log->error('hello failed: ' . ($w // ''));
            $self->{auth} = 0; # defined but false
        }
    }], sub {
        my ($err) = @_;
        $self->log->error('something went wrong in handshake: ' . $err);
        $self->{auth} = ''; # defined but false: ends the loop in connect
    });
}
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
# Handler for the 'job_done' notification.
#
# Looks up (and removes) the result callback stored for $i->{job_id} and
# calls it with the job id and the outargs; the outargs are passed as a
# JSON string when $self->{json} is true, as the decoded structure
# otherwise. Errors thrown by the callback are logged, not propagated.
sub rpc_job_done {
    my ($self, $conn, $i) = @_;
    my $job_id = $i->{job_id};
    my $result = $i->{outargs};
    my $encoded = $self->{jsonobject}->encode($result);
    $result = $encoded if $self->{json};
    my $printable = decode_utf8($encoded); # for debug printing
    my $rescb = delete $self->{jobs}->{$job_id};
    if ($rescb) {
        $self->log->debug("got job_done: for job_id $job_id result: $printable");
        local $@;
        eval { $rescb->($job_id, $result); };
        $self->log->info("got $@ calling result callback") if $@;
    } else {
        $self->log->debug("got job_done for unknown job $job_id result: $printable");
    }
}
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
# Blocking variant of call_nb: takes the same named arguments, installs its
# own cb1/cb2 and runs $self->_loop until one of them signals completion.
#
# Returns ($job_id, $outargs) as last reported by the callbacks.
sub call {
    my $self = shift;
    my %args = @_;
    my ($done, $job_id, $outargs);

    # call callback: only finishes the wait when no job id came back
    my $on_created = sub {
        ($job_id, $outargs) = @_;
        $done++ unless $job_id;
    };
    # result callback: always finishes the wait
    my $on_result = sub {
        ($job_id, $outargs) = @_;
        $done++;
    };
    @args{qw(cb1 cb2)} = ($on_created, $on_result);

    $self->call_nb(%args);

    $self->_loop(sub { !$done });

    return $job_id, $outargs;
}
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
# Non-blocking job creation.
#
# Named arguments:
#   wfname   required workflow name (dies 'no workflowname?')
#   cb1      required; called as ($job_id, $msg) once create_job answered
#   cb2      required; stored in $self->jobs under the job id on success
#   inargs   JSON object string (json mode) or hashref; default '{}'
#   vtag     optional
#   timeout  default $self->timeout * 5
#   clenv, reqauth   optional; JSON object strings (json mode) or hashrefs
#
# Croaks when inargs/clenv/reqauth do not decode to / are not hashrefs.
# Error messages passed to cb1 are wrapped as {error => ...} and JSON
# encoded when $self->{json} is true.
sub call_nb {
    my ($self, %args) = @_;
    my $wfname = $args{wfname} or die 'no workflowname?';
    my $vtag = $args{vtag};
    my $inargs = $args{inargs} // '{}';
    my $callcb = $args{cb1} // die 'no call callback?';
    my $rescb = $args{cb2} // die 'no result callback?';
    my $timeout = $args{timeout} // $self->timeout * 5; # a bit hackish..
    my $reqauth = $args{reqauth};
    my $clenv = $args{clenv};
    my $inargsj;

    if ($self->{json}) {
        # json mode: arguments arrive as JSON text and are decoded here
        $inargsj = $inargs;
        $inargs = $self->{jsonobject}->decode($inargs);
        croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
        if ($clenv) {
            $clenv = $self->{jsonobject}->decode($clenv);
            croak 'clenv is not a json object' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            $reqauth = $self->{jsonobject}->decode($reqauth);
            croak 'reqauth is not a json object' unless ref $reqauth eq 'HASH';
        }
    } else {
        croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
        # test encoding
        $inargsj = $self->{jsonobject}->encode($inargs);
        if ($clenv) {
            croak 'clenv should be a hashref' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
        }
    }

    $inargsj = decode_utf8($inargsj); # only used for the debug line below
    $self->log->debug("calling $wfname with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_job', {
            wfname => $wfname,
            vtag => $vtag,
            inargs => $inargs,
            timeout => $timeout,
            ($clenv ? (clenv => $clenv) : ()),
            ($reqauth ? (reqauth => $reqauth) : ()),
        }, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        my ($job_id, $msg);
        if ($e) {
            # the closing parenthesis was missing from both strings
            $self->log->error("create_job returned error: $e->{message} ($e->{code})");
            $msg = "$e->{message} ($e->{code})";
        } else {
            ($job_id, $msg) = @$r; # fixme: check for arrayref?
            if ($msg) {
                $self->log->error("create_job returned error: $msg");
            } elsif ($job_id) {
                $self->log->debug("create_job returned job_id: $job_id");
                $self->jobs->{$job_id} = $rescb;
            }
        }
        if ($msg) {
            $msg = {error => $msg} unless ref $msg;
            $msg = $self->{jsonobject}->encode($msg) if $self->{json};
        }
        $callcb->($job_id, $msg);
    }], sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in call_nb: $err");
        $err = { error => $err };
        $err = $self->{jsonobject}->encode($err) if $self->{json};
        $callcb->(undef, $err);
    });
}
339
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
# Closes the rpc connection and the netstring stream, then empties the
# object's hash so no state of the old connection remains.
sub close {
    my $self = shift;

    $self->log->debug('closing connection');

    my $conn = $self->conn;
    $conn->close();

    my $ns = $self->ns;
    $ns->close();

    %$self = ();
}
348
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
# Blocking 'find_jobs' call.
#
#   $filter  required; a hashref is JSON encoded first, anything else is
#            sent as given
#
# Returns ($err, @jobs) when the result is an array reference, ($err)
# otherwise. $err is the error message from the API, or the error raised
# while running the steps, and is undefined on success.
sub find_jobs {
    my ($self, $filter) = @_;
    croak('no filter?') unless $filter;

    #print 'filter: ', Dumper($filter);
    $filter = $self->{jsonobject}->encode($filter) if ref $filter eq 'HASH';

    my ($done, $err, $jobs);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call('find_jobs', { filter => $filter }, $steps->next());
    },
    sub {
        #say 'find_jobs call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("find_jobs got error $e->{message} ($e->{code})");
            $err = $e->{message};
            return;
        }
        $jobs = $r;
    }], sub {
        # assign the outer $err (no 'my') so the failure reaches the caller
        ($err) = @_;
        $done++;
        $self->log->error("something went wrong with find_jobs: $err");
    });

    $self->_loop(sub { !$done });

    return $err, @$jobs if ref $jobs eq 'ARRAY';
    return $err;
}
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
# Blocking 'check_if_lock_exists' call.
#
#   $locktype, $lockvalue  both required (croaks when false)
#
# Returns the result of the call, JSON encoded when $self->{json} is true.
# On error the result stays undefined (and is then encoded as such in json
# mode); the error itself is only logged.
sub check_if_lock_exists {
    my ($self, $locktype, $lockvalue) = @_;
    croak('no locktype?') unless $locktype;
    croak('no lockvalue?') unless $lockvalue;

    my ($done, $found);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call(
            'check_if_lock_exists',
            { locktype => $locktype, lockvalue => $lockvalue },
            $steps->next()
        );
    },
    sub {
        #say 'check_if_lock_exists call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("check_if_lock_exists got error $e->{message} ($e->{code})");
            return;
        }
        $found = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        $self->log->error("something went wrong with check_if_lock_exists: $err");
    });

    $self->_loop(sub { !$done });

    $found = $self->{jsonobject}->encode($found) if $self->{json};
    return $found;
}
422
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
# Blocking 'get_api_status' call.
#
#   $what  required (croaks 'no what?' when false)
#
# Returns the result of the call, or the error message when the API
# answered with an error. Nothing is returned when running the steps
# itself failed; that failure is logged.
sub get_api_status {
    my ($self, $what) = @_;
    croak('no what?') unless $what;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('get_api_status', { what => $what }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($d, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("get_api_status got error $e->{message} ($e->{code})");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was ->eror(), which is not a log method and died right here
        $self->log->error("something went wrong with get_api_status: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
453
|
|
|
|
|
|
|
|
454
|
|
|
|
|
|
|
# Blocking variant of get_job_status_nb.
#
#   $job_id  required (croaks 'no job_id?' when false)
#
# Returns the two values handed to the status callback.
sub get_job_status {
    my $self = shift;
    my ($job_id) = @_;
    $job_id or croak('no job_id?');

    my ($done, $job_id2, $outargs);
    # remember what was reported and end the wait
    my $on_status = sub {
        ($job_id2, $outargs) = @_;
        $done++;
        return;
    };
    $self->get_job_status_nb(job_id => $job_id, statuscb => $on_status);
    $self->_loop(sub { !$done });
    return $job_id2, $outargs;
}
470
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
# Non-blocking 'get_job_status' call.
#
# Named arguments:
#   job_id    required
#   statuscb  required coderef; called as ($job_id, $outargs), or as
#             (undef, $error) on failure
#   notifycb  optional coderef; when given, notification is requested and
#             the coderef is stored in $self->jobs for this job id if the
#             answer carried neither a job id nor outargs
#
# In json mode reference outargs are JSON encoded before statuscb is called.
sub get_job_status_nb {
    my ($self, %args) = @_;
    my $job_id = $args{job_id} or
        croak('no job_id?');

    my $statuscb = $args{statuscb};
    ref $statuscb eq 'CODE'
        or croak('statuscb should be a coderef');

    my $notifycb = $args{notifycb};
    croak('notifycb should be a coderef')
        unless !$notifycb or ref $notifycb eq 'CODE';

    my $request = sub {
        my $steps = shift;
        # fixme: check results?
        my %params = (
            job_id => $job_id,
            notify => ($notifycb ? JSON->true : JSON->false),
        );
        $self->conn->call('get_job_status', \%params, $steps->next());
    };

    my $answer = sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        if ($e) {
            $self->log->error("get_job_status got error $e->{message} ($e->{code})");
            $statuscb->(undef, $e->{message});
            return;
        }
        my ($job_id2, $outargs) = @$r;
        # nothing to report yet: keep the notify callback for later
        if ($notifycb and !$job_id2 and !$outargs) {
            $self->jobs->{$job_id} = $notifycb;
        }
        if ($self->{json} and ref $outargs) {
            $outargs = $self->{jsonobject}->encode($outargs);
        }
        $statuscb->($job_id2, $outargs);
        return;
    };

    my $failed = sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in get_job_status_nb: $err");
        $err = { error => $err };
        $err = $self->{jsonobject}->encode($err) if $self->{json};
        $statuscb->(undef, $err);
    };

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [$request, $answer], $failed
    );
}
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
# Blocking ping of the API.
#
#   $timeout  seconds to wait; defaults to $self->timeout
#
# Returns 1 when a reply matching /pong/ arrived without error, nothing
# otherwise (error reply, unexpected reply or timeout).
sub ping {
    my ($self, $timeout) = @_;

    $timeout //= $self->timeout;
    my ($done, $ret);

    # Keep the loop in a lexical: the failure branch below empties %$self,
    # after which $self->ioloop is no longer available for the cleanup.
    my $ioloop = $self->ioloop;
    my $tmr = $ioloop->timer($timeout => sub {
        $done++;
    });

    $self->conn->call('ping', {}, sub {
        my ($e, $r) = @_;
        if (not $e and $r and $r =~ /pong/) {
            $ret = 1;
        } else {
            # NOTE(review): a failed ping wipes the whole object; kept as
            # in the original, presumably to invalidate the client.
            %$self = ();
        }
        $done++;
    });

    $self->_loop(sub { !$done });

    # the timer used to be left on the loop after every answered ping
    $ioloop->remove($tmr);

    return $ret;
}
545
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
# Run the event loop to process work.
#
#   $prepare  when true only the ping watchdog and the disconnect handler
#             are installed and 0 is returned without starting the loop
#
# With a positive ping_timeout a recurring timer checks lastping; when the
# last ping is older than ping_timeout seconds the connection is removed
# and the loop stopped with exit code WORK_PING_TIMEOUT. A 'disconnect'
# event stops the loop with the emitted code.
#
# Returns the ioloop's __exit__ value (WORK_OK unless something set it).
sub work {
    my ($self, $prepare) = @_;

    my $pt = $self->ping_timeout;
    my $tmr;
    $tmr = $self->ioloop->recurring($pt => sub {
        my $ioloop = shift;
        $self->log->debug('in ping_timeout timer: lastping: '
             . ($self->lastping // 0) . ' limit: ' . (time - $pt) );
        return if ($self->lastping // 0) > time - $pt;
        $self->log->error('ping timeout');
        $ioloop->remove($self->clientid);
        $ioloop->remove($tmr);
        $ioloop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
        $ioloop->stop;
    }) if $pt > 0;
    $self->on(disconnect => sub {
        my ($self, $code) = @_;
        $self->ioloop->{__exit__} = $code;
        $self->ioloop->stop;
    });
    return 0 if $prepare;

    $self->ioloop->{__exit__} = WORK_OK;
    $self->log->debug('JobCenter::Client::Mojo starting work');
    # Ask the loop this client actually uses; the class method form only
    # looks at the singleton, which is wrong when a custom ioloop was given.
    $self->ioloop->start unless $self->ioloop->is_running;
    $self->log->debug('JobCenter::Client::Mojo done?');
    $self->ioloop->remove($tmr) if $tmr;

    return $self->ioloop->{__exit__};
}
577
|
|
|
|
|
|
|
|
578
|
|
|
|
|
|
|
# Stops the ioloop after storing $exit as its __exit__ value, which work()
# hands back to its caller.
sub stop {
    my $self = shift;
    my ($exit) = @_;
    my $ioloop = $self->ioloop;
    $ioloop->{__exit__} = $exit;
    $ioloop->stop;
}
583
|
|
|
|
|
|
|
|
584
|
|
|
|
|
|
|
# Blocking 'create_slotgroup' call.
#
#   $name   required slotgroup name (croaks when false)
#   $slots  passed through as given
#
# Returns the result of the call, or the error message when the API
# answered with an error. Nothing is returned when running the steps
# itself failed; that failure is logged.
sub create_slotgroup {
    my ($self, $name, $slots) = @_;
    croak('no slotgroup name?') unless $name;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_slotgroup', { name => $name, slots => $slots }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("create_slotgroup got error $e->{message}");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was ->eror(), which is not a log method and died right here
        $self->log->error("something went wrong with create_slotgroup: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
# Blocking 'announce' of an action this client can perform.
#
# Named arguments:
#   actionname  required
#   cb          required callback for the action
#   mode        'subproc', 'async' or 'sync'; defaults to 'async' when the
#               async argument is true, 'sync' otherwise
#   undocb, addenv, slotgroup, slots, filter   optional
#   workername  defaults to "<who> <hostname> <$0> <pid>"
#
# Croaks on a missing name/callback, an unknown mode or an action that is
# already registered. On success the action is recorded in $self->actions.
# Returns the error message, which is undefined on success.
sub announce {
    my $self = shift;
    my %args = @_;
    my $actionname = $args{actionname} or croak 'no actionname?';
    my $cb = $args{cb} or croak 'no cb?';
    #my $async = $args{async} // 0;
    my $mode = $args{mode};
    $mode //= $args{async} ? 'async' : 'sync';
    croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|sync)$/;
    my $undocb = $args{undocb};
    my $host = hostname;
    my $workername = $args{workername} // "$self->{who} $host $0 $$";

    croak "already have action $actionname" if $self->actions->{$actionname};

    my ($done, $err);

    my $request = sub {
        my $steps = shift;
        my %params = (
            workername => $workername,
            actionname => $actionname,
            slotgroup => $args{slotgroup},
            slots => $args{slots},
            (($args{filter}) ? (filter => $args{filter}) : ()),
        );
        $self->conn->call('announce', \%params, $steps->next());
    };

    my $answer = sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("announce got error: $e->{message}");
            $err = $e->{message};
            return;
        }
        my ($res, $msg) = @$r;
        $self->log->debug("announce got res: $res msg: $msg");
        if ($res) {
            # accepted: remember how to run this action
            $self->actions->{$actionname} = {
                cb => $cb,
                mode => $mode,
                undocb => $undocb,
                addenv => $args{addenv} // 0,
            };
        }
        $err = $msg unless $res;
    };

    my $failed = sub {
        ($err) = @_;
        $done++;
        $self->log->error("something went wrong with announce: $err");
    };

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [$request, $answer], $failed
    );

    $self->_loop(sub { !$done });
    return $err;
}
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
# Handler for the 'ping' rpc: records the current time as lastping and
# answers 'pong!'. The remaining arguments are not used.
sub rpc_ping {
    my $self = shift;
    $self->lastping(time);
    return 'pong!';
}
673
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
# Handles a 'task_ready' message: asks the API for the task with get_task
# and then runs the callback stored for the action, in the action's mode.
#
# Arguments:
#   $c - connection object; used for call('get_task', ...) and for
#        notify('task_done', ...)
#   $i - hashref carrying the keys 'actionname' and 'job_id'
#
# Returns early (empty return) when the action is unknown. Otherwise the
# return value is whatever the steps() call returns; the outcome of the
# task itself is reported with a 'task_done' notification.
sub rpc_task_ready {
    #say 'got task_ready: ', Dumper(\@_);
    my ($self, $c, $i) = @_;
    my $actionname = $i->{actionname};
    my $job_id = $i->{job_id};
    my $action = $self->actions->{$actionname};
    unless ($action) {
        $self->log->info("got task_ready for unknown action $actionname");
        return;
    }

    $self->log->debug("got task_ready for $actionname job_id $job_id calling get_task");
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $c->call('get_task', {actionname => $actionname, job_id => $job_id}, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        #say 'get_task returned: ', Dumper(\@_);
        if ($e) {
            # Fixed: this used to read '$$self->log', which dereferences
            # the object as a scalar reference and dies instead of logging.
            $self->log->debug("got $e->{message} ($e->{code}) calling get_task");
        }
        # No early return on error: a missing result is handled right here.
        unless ($r) {
            $self->log->debug('no task for get_task');
            return;
        }
        my ($cookie, @args);
        ($job_id, $cookie, @args) = @$r;
        unless ($cookie) {
            $self->log->debug('aaah? no cookie? (get_task)');
            return;
        }
        pop @args unless $action->{addenv}; # remove env
        local $@;
        if ($action->{mode} eq 'subproc') {
            # failures while setting up the subprocess are reported as
            # the task result
            eval {
                $self->_subproc($c, $action, $job_id, $cookie, @args);
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'async') {
            # the callback receives a completion callback as its last
            # argument and reports the result through it
            eval {
                $action->{cb}->($job_id, @args, sub {
                    $c->notify('task_done', { cookie => $cookie, outargs => $_[0] });
                });
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'sync') {
            # blocking call; an exception becomes the error result
            my $outargs = eval { $action->{cb}->($job_id, @args) };
            $outargs = { error => $@ } if $@;
            $c->notify('task_done', { cookie => $cookie, outargs => $outargs });
        } else {
            die "unkown mode $action->{mode}";
        }
    }], sub {
        my ($err) = @_;
        $self->log->error("something went wrong with rpc_task_ready: $err");
    });
}
733
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
# Runs the action callback in a forked child process and reports its result
# from the parent with a 'task_done' notification.
#
# Arguments:
#   $c      - connection object; notify('task_done', ...) is called on it
#   $action - hashref; the keys 'cb' and 'undocb' are used here
#   $job_id - passed as first argument to the callbacks
#   $cookie - sent back together with the result
#   @args   - remaining arguments for the callbacks
#
# Dies when the pipe cannot be created or the fork fails.
#
# The child hands its result to the parent as a Storable image over a pipe
# and always ends in POSIX::_exit. The parent collects the data in the
# close handler of the stream that reads from the pipe.
sub _subproc {
    my ($self, $c, $action, $job_id, $cookie, @args) = @_;

    # based on Mojo::IOLoop::Subprocess
    my $ioloop = $self->ioloop;

    # Pipe for subprocess communication
    pipe(my $reader, my $writer) or die "Can't create pipe: $!";

    die "Can't fork: $!" unless defined(my $pid = fork);
    unless ($pid) {# Child
        $self->log->debug("in child $$");
        $ioloop->reset;
        CORE::close $reader; # or we won't get a sigpipe when daddy dies..
        my $undo = 0;
        my $outargs = eval { $action->{cb}->($job_id, @args) };
        if ($@) {
            $outargs = {'error' => $@};
            $undo++;
        } elsif (ref $outargs eq 'HASH' and $outargs->{'error'}) {
            $undo++;
        }
        if ($undo and $action->{undocb}) {
            $self->log->info("undoing for $job_id");
            my $res = eval { $action->{undocb}->($job_id, @args); };
            $res = $@ if $@;
            # how should this look?
            $outargs = {'error' => {
                'msg' => 'undo failure',
                'undo' => $res,
                'olderr' => $outargs->{error},
            }};
            $undo = 0;
        }
        # Storable::freeze croaks on anything that is not a reference (for
        # example when the callback returned undef or a plain string). An
        # exception here must not leave this block: the child would skip
        # the _exit below and continue in the code it inherited from the
        # parent. Send an error result instead; the undo callback is not
        # triggered for this case.
        my $frozen = eval { Storable::freeze($outargs) };
        unless (defined $frozen) {
            my $why = $@ || 'unknown error';
            $frozen = Storable::freeze(
                {'error' => "could not serialize result: $why"}
            );
        }
        # stop ignoring sigpipe
        $SIG{PIPE} = sub { $undo++ };
        # if the parent is gone we get a sigpipe here:
        print $writer $frozen;
        $writer->flush or $undo++;
        CORE::close $writer or $undo++;
        if ($undo and $action->{undocb}) {
            $self->log->info("undoing for $job_id");
            eval { $action->{undocb}->($job_id, @args); };
            # ignore errors because we can't report them back..
        }
        # FIXME: normal exit?
        POSIX::_exit(0);
    }

    # Parent
    my $me = $$;
    CORE::close $writer;
    my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
    $ioloop->stream($stream);
    my $buffer = '';
    $stream->on(read => sub { $buffer .= pop });
    $stream->on(
        close => sub {
            #say "close handler!";
            # only the process that created the stream handles the result
            return unless $$ == $me;
            waitpid $pid, 0;
            my $outargs = eval { Storable::thaw($buffer) };
            $outargs = { error => $@ } if $@;
            if ($outargs and ref $outargs eq 'HASH') {
                $self->log->debug('subprocess results: ' . Dumper($outargs));
                eval { $c->notify(
                    'task_done',
                    { cookie => $cookie, outargs => $outargs }
                ); }; # the connection might be gone?
            } # else?
        }
    );
}
807
|
|
|
|
|
|
|
|
808
|
|
|
|
|
|
|
# tick while Mojo::Reactor is still running and condition callback is true |
809
|
|
|
|
|
|
|
# Drives the reactor one tick at a time while the condition callback keeps
# returning true.
#
# Called as $self->_loop($condition). The reactor is taken from
# $self->ioloop->singleton. Warns when entered again while an earlier call
# has not finished yet, and dies with "unknown reactor: <class>" for any
# reactor other than Mojo::Reactor::EV or Mojo::Reactor::Poll.
sub _loop {
    my ($self, $condition) = @_;

    # nesting depth, shared between all calls
    state $depth;
    warn __PACKAGE__." recursing into IO loop" if $depth++;

    my $reactor = $self->ioloop->singleton->reactor;
    my $class   = ref $reactor;
    my $failure;

    if ($class eq 'Mojo::Reactor::EV') {

        # here the result of one_tick decides whether to continue
        my $active = 1;
        while ($condition->() && $active) {
            $active = $reactor->one_tick;
        }

    } elsif ($class eq 'Mojo::Reactor::Poll') {

        # raise the running counter for the duration of the loop so that
        # is_running can be used as the stop test
        $reactor->{running}++;

        while ($condition->() && $reactor->is_running) {
            $reactor->one_tick;
        }

        # lower the counter again unless it already dropped to zero
        $reactor->{running} &&= $reactor->{running} - 1;

    } else {

        $failure = "unknown reactor: ".$class;

    }

    $depth--;
    die $failure if $failure;
}
837
|
|
|
|
|
|
|
|
838
|
|
|
|
|
|
|
#sub DESTROY { |
839
|
|
|
|
|
|
|
# my $self = shift; |
840
|
|
|
|
|
|
|
# say 'destroying ', $self; |
841
|
|
|
|
|
|
|
#} |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
1; |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
=encoding utf8 |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
=head1 NAME |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
JobCenter::Client::Mojo - JobCenter JSON-RPC 2.0 Api client using Mojo. |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
=head1 SYNOPSIS |
852
|
|
|
|
|
|
|
|
853
|
|
|
|
|
|
|
use JobCenter::Client::Mojo; |
854
|
|
|
|
|
|
|
|
855
|
|
|
|
|
|
|
my $client = JobCenter::Client::Mojo->new( |
856
|
|
|
|
|
|
|
address => ... |
857
|
|
|
|
|
|
|
port => ... |
858
|
|
|
|
|
|
|
who => ... |
859
|
|
|
|
|
|
|
token => ... |
860
|
|
|
|
|
|
|
); |
861
|
|
|
|
|
|
|
|
862
|
|
|
|
|
|
|
my ($job_id, $outargs) = $client->call( |
863
|
|
|
|
|
|
|
wfname => 'test', |
864
|
|
|
|
|
|
|
inargs => { test => 'test' }, |
865
|
|
|
|
|
|
|
); |
866
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
=head1 DESCRIPTION |
868
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
L<JobCenter::Client::Mojo> is a class to build a client to connect to the |
870
|
|
|
|
|
|
|
JSON-RPC 2.0 Api of the L<JobCenter> workflow engine. The client can be |
871
|
|
|
|
|
|
|
used to create and inspect jobs as well as for providing 'worker' services |
872
|
|
|
|
|
|
|
to the JobCenter. |
873
|
|
|
|
|
|
|
|
874
|
|
|
|
|
|
|
=head1 METHODS |
875
|
|
|
|
|
|
|
|
876
|
|
|
|
|
|
|
=head2 new |
877
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
$client = JobCenter::Client::Mojo->new(%arguments); |
879
|
|
|
|
|
|
|
|
880
|
|
|
|
|
|
|
Class method that returns a new JobCenter::Client::Mojo object. |
881
|
|
|
|
|
|
|
|
882
|
|
|
|
|
|
|
Valid arguments are: |
883
|
|
|
|
|
|
|
|
884
|
|
|
|
|
|
|
=over 4 |
885
|
|
|
|
|
|
|
|
886
|
|
|
|
|
|
|
=item - address: address of the Api. |
887
|
|
|
|
|
|
|
|
888
|
|
|
|
|
|
|
(default: 127.0.0.1) |
889
|
|
|
|
|
|
|
|
890
|
|
|
|
|
|
|
=item - port: port of the Api |
891
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
(default 6522) |
893
|
|
|
|
|
|
|
|
894
|
|
|
|
|
|
|
=item - tls: connect using tls |
895
|
|
|
|
|
|
|
|
896
|
|
|
|
|
|
|
(default false) |
897
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
=item - tls_ca: verify server using ca |
899
|
|
|
|
|
|
|
|
900
|
|
|
|
|
|
|
(default undef) |
901
|
|
|
|
|
|
|
|
902
|
|
|
|
|
|
|
=item - tls_key: private client key |
903
|
|
|
|
|
|
|
|
904
|
|
|
|
|
|
|
(default undef) |
905
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
=item - tls_cert: public client certificate |
907
|
|
|
|
|
|
|
|
908
|
|
|
|
|
|
|
(default undef) |
909
|
|
|
|
|
|
|
|
910
|
|
|
|
|
|
|
=item - who: who to authenticate as. |
911
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
(required) |
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
=item - method: how to authenticate. |
915
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
(default: password) |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
=item - token: token to authenticate with. |
919
|
|
|
|
|
|
|
|
920
|
|
|
|
|
|
|
(required) |
921
|
|
|
|
|
|
|
|
922
|
|
|
|
|
|
|
=item - debug: when true prints debugging using L<Mojo::Log> |
923
|
|
|
|
|
|
|
|
924
|
|
|
|
|
|
|
(default: false) |
925
|
|
|
|
|
|
|
|
926
|
|
|
|
|
|
|
=item - ioloop: L<Mojo::IOLoop> object to use |
927
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
(per default the L<Mojo::IOLoop>->singleton object is used) |
929
|
|
|
|
|
|
|
|
930
|
|
|
|
|
|
|
=item - json: flag whether input is json or perl. |
931
|
|
|
|
|
|
|
|
932
|
|
|
|
|
|
|
when true expects the inargs to be valid json, when false a perl hashref is |
933
|
|
|
|
|
|
|
expected and json encoded. (default true) |
934
|
|
|
|
|
|
|
|
935
|
|
|
|
|
|
|
=item - jsonobject: json encoder/decoder object to use |
936
|
|
|
|
|
|
|
|
937
|
|
|
|
|
|
|
(per default a new L<JSON::MaybeXS> object is created) |
938
|
|
|
|
|
|
|
|
939
|
|
|
|
|
|
|
=item - log: L<Mojo::Log> object to use |
940
|
|
|
|
|
|
|
|
941
|
|
|
|
|
|
|
(per default a new L<Mojo::Log> object is created) |
942
|
|
|
|
|
|
|
|
943
|
|
|
|
|
|
|
=item - timeout: how long to wait for Api calls to complete |
944
|
|
|
|
|
|
|
|
945
|
|
|
|
|
|
|
(default 60 seconds) |
946
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
=item - ping_timeout: after this long without a ping from the Api the |
948
|
|
|
|
|
|
|
connection will be closed and the work() method will return |
949
|
|
|
|
|
|
|
|
950
|
|
|
|
|
|
|
(default 5 minutes) |
951
|
|
|
|
|
|
|
|
952
|
|
|
|
|
|
|
=back |
953
|
|
|
|
|
|
|
|
954
|
|
|
|
|
|
|
=head2 call |
955
|
|
|
|
|
|
|
|
956
|
|
|
|
|
|
|
($job_id, $result) = $client->call(%args); |
957
|
|
|
|
|
|
|
|
958
|
|
|
|
|
|
|
Creates a new L<JobCenter> job and waits for the results. Throws an error |
959
|
|
|
|
|
|
|
if something goes wrong immediately. Errors encountered during later |
960
|
|
|
|
|
|
|
processing are returned as a L<JobCenter> error object. |
961
|
|
|
|
|
|
|
|
962
|
|
|
|
|
|
|
Valid arguments are: |
963
|
|
|
|
|
|
|
|
964
|
|
|
|
|
|
|
=over 4 |
965
|
|
|
|
|
|
|
|
966
|
|
|
|
|
|
|
=item - wfname: name of the workflow to call (required) |
967
|
|
|
|
|
|
|
|
968
|
|
|
|
|
|
|
=item - inargs: input arguments for the workflow (if any) |
969
|
|
|
|
|
|
|
|
970
|
|
|
|
|
|
|
=item - vtag: version tag of the workflow to use (optional) |
971
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
=item - timeout: wait this many seconds for the job to finish |
973
|
|
|
|
|
|
|
(optional, defaults to 5 times the Api-call timeout, so default 5 minutes) |
974
|
|
|
|
|
|
|
|
975
|
|
|
|
|
|
|
=item - reqauth: authentication token to be passed on to the authentication |
976
|
|
|
|
|
|
|
module of the API for per job/request authentication. |
977
|
|
|
|
|
|
|
|
978
|
|
|
|
|
|
|
=item - clenv: client environment, made available as part of the job |
979
|
|
|
|
|
|
|
environment and inherited to child jobs. |
980
|
|
|
|
|
|
|
|
981
|
|
|
|
|
|
|
=back |
982
|
|
|
|
|
|
|
|
983
|
|
|
|
|
|
|
=head2 call_nb |
984
|
|
|
|
|
|
|
|
985
|
|
|
|
|
|
|
$job_id = $client->call_nb(%args); |
986
|
|
|
|
|
|
|
|
987
|
|
|
|
|
|
|
Creates a new L<JobCenter> job and calls the provided callback on completion |
988
|
|
|
|
|
|
|
of the job. Throws an error if something goes wrong immediately. Errors |
989
|
|
|
|
|
|
|
encountered during later processing are returned as a L<JobCenter> error |
990
|
|
|
|
|
|
|
object to the callback. |
991
|
|
|
|
|
|
|
|
992
|
|
|
|
|
|
|
Valid arguments are those for L</call> and: |
993
|
|
|
|
|
|
|
|
994
|
|
|
|
|
|
|
=over 4 |
995
|
|
|
|
|
|
|
|
996
|
|
|
|
|
|
|
=item - cb1: coderef to the callback to call on job creation (required) |
997
|
|
|
|
|
|
|
|
998
|
|
|
|
|
|
|
( cb1 => sub { ($job_id, $err) = @_; ... } ) |
999
|
|
|
|
|
|
|
|
1000
|
|
|
|
|
|
|
If job_id is undefined the job was not created, the error is then returned |
1001
|
|
|
|
|
|
|
as the second return value. |
1002
|
|
|
|
|
|
|
|
1003
|
|
|
|
|
|
|
=item - cb2: coderef to the callback to call on job completion (required) |
1004
|
|
|
|
|
|
|
|
1005
|
|
|
|
|
|
|
( cb2 => sub { ($job_id, $outargs) = @_; ... } ) |
1006
|
|
|
|
|
|
|
|
1007
|
|
|
|
|
|
|
=back |
1008
|
|
|
|
|
|
|
|
1009
|
|
|
|
|
|
|
=head2 get_job_status |
1010
|
|
|
|
|
|
|
|
1011
|
|
|
|
|
|
|
($job_id, $result) = $client->get_job_status($job_id); |
1012
|
|
|
|
|
|
|
|
1013
|
|
|
|
|
|
|
Retrieves the status for the given $job_id. If the job_id does not exist |
1014
|
|
|
|
|
|
|
then the returned $job_id will be undefined and $result will be an error |
1015
|
|
|
|
|
|
|
message. If the job has not finished executing then both $job_id and |
1016
|
|
|
|
|
|
|
$result will be undefined. Otherwise the $result will contain the result of |
1017
|
|
|
|
|
|
|
the job. (Which may be a JobCenter error object) |
1018
|
|
|
|
|
|
|
|
1019
|
|
|
|
|
|
|
=head2 get_job_status_nb |
1020
|
|
|
|
|
|
|
|
1021
|
|
|
|
|
|
|
$client->get_job_status_nb(%args); |
1022
|
|
|
|
|
|
|
|
1023
|
|
|
|
|
|
|
Retrieves the status for the given $job_id. |
1024
|
|
|
|
|
|
|
|
1025
|
|
|
|
|
|
|
Valid arguments are: |
1026
|
|
|
|
|
|
|
|
1027
|
|
|
|
|
|
|
=over 4 |
1028
|
|
|
|
|
|
|
|
1029
|
|
|
|
|
|
|
=item - job_id |
1030
|
|
|
|
|
|
|
|
1031
|
|
|
|
|
|
|
=item - statuscb: coderef to the callback for the current status |
1032
|
|
|
|
|
|
|
|
1033
|
|
|
|
|
|
|
( statuscb => sub { ($job_id, $result) = @_; ... } ) |
1034
|
|
|
|
|
|
|
|
1035
|
|
|
|
|
|
|
If the job_id does not exist then the returned $job_id will be undefined |
1036
|
|
|
|
|
|
|
and $result will be an error message. If the job has not finished executing |
1037
|
|
|
|
|
|
|
then both $job_id and $result will be undefined. Otherwise the $result will |
1038
|
|
|
|
|
|
|
contain the result of the job. (Which may be a JobCenter error object) |
1039
|
|
|
|
|
|
|
|
1040
|
|
|
|
|
|
|
=item - notifycb: coderef to the callback for job completion |
1041
|
|
|
|
|
|
|
|
1042
|
|
|
|
|
|
|
( notifycb => sub { ($job_id, $result) = @_; ... } ) |
1043
|
|
|
|
|
|
|
|
1044
|
|
|
|
|
|
|
If the job was still running when the get_job_status_nb call was made then |
1045
|
|
|
|
|
|
|
this callback will be called on completion of the job. |
1046
|
|
|
|
|
|
|
|
1047
|
|
|
|
|
|
|
=back |
1048
|
|
|
|
|
|
|
|
1049
|
|
|
|
|
|
|
=head2 check_if_lock_exists |
1050
|
|
|
|
|
|
|
|
1051
|
|
|
|
|
|
|
$found = $client->check_if_lock_exists($locktype, $lockvalue); |
1052
|
|
|
|
|
|
|
|
1053
|
|
|
|
|
|
|
Checks if a lock with the given locktype and lockvalue exists. (I.e. a job |
1054
|
|
|
|
|
|
|
is currently running that holds that lock.) |
1055
|
|
|
|
|
|
|
|
1056
|
|
|
|
|
|
|
Returns true if the lock exists, null if the locktype does not exist, false |
1057
|
|
|
|
|
|
|
otherwise. |
1058
|
|
|
|
|
|
|
|
1059
|
|
|
|
|
|
|
=head2 find_jobs |
1060
|
|
|
|
|
|
|
|
1061
|
|
|
|
|
|
|
($err, @jobs) = $client->find_jobs({'foo'=>'bar'}); |
1062
|
|
|
|
|
|
|
|
1063
|
|
|
|
|
|
|
Finds all currently running jobs with arguments matching the filter |
1064
|
|
|
|
|
|
|
expression. The expression is evaluated in PostgreSQL using the @> for |
1065
|
|
|
|
|
|
|
jsonb objects, basically this means that you can only do equality tests for |
1066
|
|
|
|
|
|
|
one or more top-level keys. If @jobs is empty $err might contain an error |
1067
|
|
|
|
|
|
|
message. |
1068
|
|
|
|
|
|
|
|
1069
|
|
|
|
|
|
|
=head2 ping |
1070
|
|
|
|
|
|
|
|
1071
|
|
|
|
|
|
|
$status = $client->ping($timeout); |
1072
|
|
|
|
|
|
|
|
1073
|
|
|
|
|
|
|
Tries to ping the JobCenter API. On success return true. On failure returns |
1074
|
|
|
|
|
|
|
the undefined value, after that the client object should be undefined. |
1075
|
|
|
|
|
|
|
|
1076
|
|
|
|
|
|
|
=head2 close |
1077
|
|
|
|
|
|
|
|
1078
|
|
|
|
|
|
|
$client->close() |
1079
|
|
|
|
|
|
|
|
1080
|
|
|
|
|
|
|
Closes the connection to the JobCenter API and tries to de-allocate |
1081
|
|
|
|
|
|
|
everything. Trying to use the client afterwards will produce errors. |
1082
|
|
|
|
|
|
|
|
1083
|
|
|
|
|
|
|
=head2 create_slotgroup |
1084
|
|
|
|
|
|
|
|
1085
|
|
|
|
|
|
|
$client->create_slotgroup($name, $slots) |
1086
|
|
|
|
|
|
|
|
1087
|
|
|
|
|
|
|
A 'slotgroup' is a way of telling the JobCenter API how many tasks the |
1088
|
|
|
|
|
|
|
worker can do at once. The number of slots should be a positive integer. |
1089
|
|
|
|
|
|
|
Slotgroup names starting with a _ (underscore) are reserved for internal |
1090
|
|
|
|
|
|
|
use. Returns an error message when there was an error, the empty string |
1091
|
|
|
|
|
|
|
otherwise. |
1092
|
|
|
|
|
|
|
|
1093
|
|
|
|
|
|
|
=head2 announce |
1094
|
|
|
|
|
|
|
|
1095
|
|
|
|
|
|
|
Announces the capability to do an action to the Api. The provided callback |
1096
|
|
|
|
|
|
|
will be called when there is a task to be performed. Returns an error when |
1097
|
|
|
|
|
|
|
there was a problem announcing the action. |
1098
|
|
|
|
|
|
|
|
1099
|
|
|
|
|
|
|
my $err = $client->announce( |
1100
|
|
|
|
|
|
|
workername => 'me', |
1101
|
|
|
|
|
|
|
actionname => 'do', |
1102
|
|
|
|
|
|
|
slots => 1, |
1103
|
|
|
|
|
|
|
cb => sub { ... }, |
1104
|
|
|
|
|
|
|
); |
1105
|
|
|
|
|
|
|
die "could not announce $actionname?: $err" if $err; |
1106
|
|
|
|
|
|
|
|
1107
|
|
|
|
|
|
|
See L for an example. |
1108
|
|
|
|
|
|
|
|
1109
|
|
|
|
|
|
|
Valid arguments are: |
1110
|
|
|
|
|
|
|
|
1111
|
|
|
|
|
|
|
=over 4 |
1112
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
=item - workername: name of the worker |
1114
|
|
|
|
|
|
|
|
1115
|
|
|
|
|
|
|
(optional, defaults to client->who, processname and processid) |
1116
|
|
|
|
|
|
|
|
1117
|
|
|
|
|
|
|
=item - actionname: name of the action |
1118
|
|
|
|
|
|
|
|
1119
|
|
|
|
|
|
|
(required) |
1120
|
|
|
|
|
|
|
|
1121
|
|
|
|
|
|
|
=item - cb: callback to be called for the action |
1122
|
|
|
|
|
|
|
|
1123
|
|
|
|
|
|
|
(required) |
1124
|
|
|
|
|
|
|
|
1125
|
|
|
|
|
|
|
=item - mode: callback mode |
1126
|
|
|
|
|
|
|
|
1127
|
|
|
|
|
|
|
(optional, default 'sync') |
1128
|
|
|
|
|
|
|
|
1129
|
|
|
|
|
|
|
Possible values: |
1130
|
|
|
|
|
|
|
|
1131
|
|
|
|
|
|
|
=over 8 |
1132
|
|
|
|
|
|
|
|
1133
|
|
|
|
|
|
|
=item - 'sync': simple blocking mode, just return the results from the |
1134
|
|
|
|
|
|
|
callback. Use only for callbacks taking less than (about) a second. |
1135
|
|
|
|
|
|
|
|
1136
|
|
|
|
|
|
|
=item - 'subproc': the simple blocking callback is started in a separate |
1137
|
|
|
|
|
|
|
process. Useful for callbacks that take a long time. |
1138
|
|
|
|
|
|
|
|
1139
|
|
|
|
|
|
|
=item - 'async': the callback gets passed another callback as the last |
1140
|
|
|
|
|
|
|
argument that is to be called on completion of the task. For advanced use |
1141
|
|
|
|
|
|
|
cases where the worker is actually more like a proxy. The (initial) |
1142
|
|
|
|
|
|
|
callback is expected to return soonish to the event loop, after setting up |
1143
|
|
|
|
|
|
|
some Mojo-callbacks. |
1144
|
|
|
|
|
|
|
|
1145
|
|
|
|
|
|
|
=back |
1146
|
|
|
|
|
|
|
|
1147
|
|
|
|
|
|
|
=item - async: backwards compatible way for specifying mode 'async' |
1148
|
|
|
|
|
|
|
|
1149
|
|
|
|
|
|
|
(optional, default false) |
1150
|
|
|
|
|
|
|
|
1151
|
|
|
|
|
|
|
=item - slotgroup: the slotgroup to use for accounting of parallel tasks |
1152
|
|
|
|
|
|
|
|
1153
|
|
|
|
|
|
|
(optional, conflicts with 'slots') |
1154
|
|
|
|
|
|
|
|
1155
|
|
|
|
|
|
|
=item - slots: the amount of tasks the worker is able to process in parallel |
1156
|
|
|
|
|
|
|
for this action. |
1157
|
|
|
|
|
|
|
|
1158
|
|
|
|
|
|
|
(optional, default 1, conflicts with 'slotgroup') |
1159
|
|
|
|
|
|
|
|
1160
|
|
|
|
|
|
|
=item - undocb: a callback that gets called when the original callback |
1161
|
|
|
|
|
|
|
returns an error object or throws an error. |
1162
|
|
|
|
|
|
|
|
1163
|
|
|
|
|
|
|
Called with the same arguments as the original callback. |
1164
|
|
|
|
|
|
|
|
1165
|
|
|
|
|
|
|
(optional, only valid for mode 'subproc') |
1166
|
|
|
|
|
|
|
|
1167
|
|
|
|
|
|
|
=item - filter: only process a subset of the action |
1168
|
|
|
|
|
|
|
|
1169
|
|
|
|
|
|
|
The filter expression allows a worker to specify that it can only do the |
1170
|
|
|
|
|
|
|
actionname for a certain subset of arguments. For example, for a "mkdir" |
1171
|
|
|
|
|
|
|
action the filter expression {'host' => 'example.com'} would mean that this |
1172
|
|
|
|
|
|
|
worker can only do mkdir on host example.com. Filter expressions are limited |
1173
|
|
|
|
|
|
|
to simple equality tests on one or more keys, and only those keys that are |
1174
|
|
|
|
|
|
|
allowed in the action definition. Filtering can be allowed, be mandatory or |
1175
|
|
|
|
|
|
|
be forbidden per action. |
1176
|
|
|
|
|
|
|
|
1177
|
|
|
|
|
|
|
=item - addenv: pass on action environment to the callback |
1178
|
|
|
|
|
|
|
|
1179
|
|
|
|
|
|
|
If the addenv flag is true the action callback will be given one extra |
1180
|
|
|
|
|
|
|
argument containing the action environment as a hashref. In the async |
1181
|
|
|
|
|
|
|
callback mode the environment will be inserted before the result callback. |
1182
|
|
|
|
|
|
|
|
1183
|
|
|
|
|
|
|
=back |
1184
|
|
|
|
|
|
|
|
1185
|
|
|
|
|
|
|
=head2 work |
1186
|
|
|
|
|
|
|
|
1187
|
|
|
|
|
|
|
Starts the L<Mojo::IOLoop>. Returns a non-zero value when the IOLoop was |
1188
|
|
|
|
|
|
|
stopped due to some error condition (like a lost connection or a ping |
1189
|
|
|
|
|
|
|
timeout). |
1190
|
|
|
|
|
|
|
|
1191
|
|
|
|
|
|
|
=head3 Possible work() exit codes |
1192
|
|
|
|
|
|
|
|
1193
|
|
|
|
|
|
|
The JobCenter::Client::Mojo library currently defines the following exit codes: |
1194
|
|
|
|
|
|
|
|
1195
|
|
|
|
|
|
|
WORK_OK |
1196
|
|
|
|
|
|
|
WORK_PING_TIMEOUT |
1197
|
|
|
|
|
|
|
WORK_CONNECTION_CLOSED |
1198
|
|
|
|
|
|
|
|
1199
|
|
|
|
|
|
|
=head2 stop |
1200
|
|
|
|
|
|
|
|
1201
|
|
|
|
|
|
|
$client->stop($exit); |
1202
|
|
|
|
|
|
|
|
1203
|
|
|
|
|
|
|
Makes the work() function exit with the provided exit code. |
1204
|
|
|
|
|
|
|
|
1205
|
|
|
|
|
|
|
=head1 SEE ALSO |
1206
|
|
|
|
|
|
|
|
1207
|
|
|
|
|
|
|
=over 4 |
1208
|
|
|
|
|
|
|
|
1209
|
|
|
|
|
|
|
=item * |
1210
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
L, L, L: the L Web framework |
1212
|
|
|
|
|
|
|
|
1213
|
|
|
|
|
|
|
=item * |
1214
|
|
|
|
|
|
|
|
1215
|
|
|
|
|
|
|
L, L |
1216
|
|
|
|
|
|
|
|
1217
|
|
|
|
|
|
|
=back |
1218
|
|
|
|
|
|
|
|
1219
|
|
|
|
|
|
|
L: JobCenter Orchestration Engine |
1220
|
|
|
|
|
|
|
|
1221
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENT |
1222
|
|
|
|
|
|
|
|
1223
|
|
|
|
|
|
|
This software has been developed with support from L. |
1224
|
|
|
|
|
|
|
In German: Diese Software wurde mit Unterstützung von L entwickelt. |
1225
|
|
|
|
|
|
|
|
1226
|
|
|
|
|
|
|
=head1 THANKS |
1227
|
|
|
|
|
|
|
|
1228
|
|
|
|
|
|
|
Thanks to Eitan Schuler for reporting a bug and providing a pull request. |
1229
|
|
|
|
|
|
|
|
1230
|
|
|
|
|
|
|
=head1 AUTHORS |
1231
|
|
|
|
|
|
|
|
1232
|
|
|
|
|
|
|
=over 4 |
1233
|
|
|
|
|
|
|
|
1234
|
|
|
|
|
|
|
=item * |
1235
|
|
|
|
|
|
|
|
1236
|
|
|
|
|
|
|
Wieger Opmeer |
1237
|
|
|
|
|
|
|
|
1238
|
|
|
|
|
|
|
=back |
1239
|
|
|
|
|
|
|
|
1240
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
1241
|
|
|
|
|
|
|
|
1242
|
|
|
|
|
|
|
This software is copyright (c) 2017 by Wieger Opmeer. |
1243
|
|
|
|
|
|
|
|
1244
|
|
|
|
|
|
|
This is free software; you can redistribute it and/or modify it under |
1245
|
|
|
|
|
|
|
the same terms as the Perl 5 programming language system itself. |
1246
|
|
|
|
|
|
|
|
1247
|
|
|
|
|
|
|
=cut |
1248
|
|
|
|
|
|
|
|
1249
|
|
|
|
|
|
|
1; |