line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package JobCenter::Client::Mojo; |
2
|
1
|
|
|
1
|
|
852
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
8
|
|
3
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
our $VERSION = '0.44'; # VERSION |
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
# |
7
|
|
|
|
|
|
|
# Mojo's default reactor uses EV, and EV does not play nice with signals |
8
|
|
|
|
|
|
|
# without some handholding. We either can try to detect EV and do the |
9
|
|
|
|
|
|
|
# handholding, or try to prevent Mojo using EV. |
10
|
|
|
|
|
|
|
# |
11
|
|
|
|
|
|
|
BEGIN { |
12
|
1
|
50
|
|
1
|
|
1931
|
$ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'}; |
13
|
|
|
|
|
|
|
} |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
# more Mojolicious |
16
|
1
|
|
|
1
|
|
524
|
use Mojo::IOLoop; |
|
1
|
|
|
|
|
161177
|
|
|
1
|
|
|
|
|
16
|
|
17
|
1
|
|
|
1
|
|
43
|
use Mojo::IOLoop::Stream; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
6
|
|
18
|
1
|
|
|
1
|
|
527
|
use Mojo::Log; |
|
1
|
|
|
|
|
12503
|
|
|
1
|
|
|
|
|
10
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# standard perl |
21
|
1
|
|
|
1
|
|
41
|
use Carp qw(croak); |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
48
|
|
22
|
1
|
|
|
1
|
|
6
|
use Cwd qw(realpath); |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
51
|
|
23
|
1
|
|
|
1
|
|
7
|
use Data::Dumper; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
51
|
|
24
|
1
|
|
|
1
|
|
7
|
use Encode qw(encode_utf8 decode_utf8); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
43
|
|
25
|
1
|
|
|
1
|
|
6
|
use File::Basename; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
56
|
|
26
|
1
|
|
|
1
|
|
5
|
use IO::Handle; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
47
|
|
27
|
1
|
|
|
1
|
|
6
|
use POSIX (); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
15
|
|
28
|
1
|
|
|
1
|
|
790
|
use Storable; |
|
1
|
|
|
|
|
3357
|
|
|
1
|
|
|
|
|
59
|
|
29
|
1
|
|
|
1
|
|
555
|
use Sys::Hostname; |
|
1
|
|
|
|
|
1018
|
|
|
1
|
|
|
|
|
73
|
|
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
# from cpan |
32
|
1
|
|
|
1
|
|
509
|
use JSON::RPC2::TwoWay 0.05; |
|
1
|
|
|
|
|
3871
|
|
|
1
|
|
|
|
|
31
|
|
33
|
|
|
|
|
|
|
# JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here |
34
|
|
|
|
|
|
|
# without adding another dependency |
35
|
1
|
|
|
1
|
|
6
|
use JSON::MaybeXS qw(); |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
21
|
|
36
|
1
|
|
|
1
|
|
537
|
use MojoX::NetstringStream 0.06; # for the enhanced close |
|
1
|
|
|
|
|
1237
|
|
|
1
|
|
|
|
|
8
|
|
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# us |
39
|
1
|
|
|
1
|
|
602
|
use JobCenter::Client::Mojo::Steps; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
8
|
|
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
has [qw( |
42
|
|
|
|
|
|
|
actions address auth clientid conn debug ioloop jobs json |
43
|
|
|
|
|
|
|
lastping log method ns ping_timeout port rpc timeout tls token who |
44
|
|
|
|
|
|
|
)]; |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# keep in sync with the jobcenter |
47
|
|
|
|
|
|
|
use constant { |
48
|
1
|
|
|
|
|
8219
|
WORK_OK => 0, # exit codes for work method |
49
|
|
|
|
|
|
|
WORK_PING_TIMEOUT => 92, |
50
|
|
|
|
|
|
|
WORK_CONNECTION_CLOSED => 91, |
51
|
1
|
|
|
1
|
|
92
|
}; |
|
1
|
|
|
|
|
1
|
|
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
# Constructor.
#
# Named arguments (read from %args):
#   who, token    - required; croaks with 'no who?' / 'no token?' when false
#   address       - default '127.0.0.1'
#   port          - default 6522
#   method        - default 'password'
#   timeout       - default 60
#   ping_timeout  - default 300
#   tls           - default 0; tls_ca, tls_cert, tls_key are stored as given
#   json          - default 1
#   jsonobject    - default JSON::MaybeXS->new(utf8 => 1)
#   log           - default Mojo::Log, level 'debug' when debug is true,
#                   'info' otherwise
#   ioloop        - default Mojo::IOLoop->singleton
#   debug         - see the NOTE(review) below
#   autoconnect   - default 1
#
# Returns the object. With autoconnect enabled connect() is called first and
# the object is only returned when $self->{auth} ended up true; otherwise
# nothing is returned.
sub new {
    my ($class, %args) = @_;
    my $self = $class->SUPER::new();

    my $address = $args{address} // '127.0.0.1';
    my $debug = $args{debug} // 0; # or 1?
    $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
    my $json = $args{json} // 1;
    my $log = $args{log} // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
    my $method = $args{method} // 'password';
    my $port = $args{port} // 6522;
    my $timeout = $args{timeout} // 60;
    my $tls = $args{tls} // 0;
    my $tls_ca = $args{tls_ca};
    my $tls_cert = $args{tls_cert};
    my $tls_key = $args{tls_key};
    my $token = $args{token} or croak 'no token?';
    my $who = $args{who} or croak 'no who?';

    $self->{address} = $address;
    # NOTE(review): the stored flag defaults to 1 while the log level above
    # treats a missing debug argument as 0 -- confirm which default is intended.
    $self->{debug} = $args{debug} // 1;
    $self->{jobs} = {};
    $self->{json} = $json;
    # this statement used to end in a comma, which silently chained it to the
    # next assignment through the comma operator
    $self->{jsonobject} = $args{jsonobject} // JSON::MaybeXS->new(utf8 => 1);
    $self->{ping_timeout} = $args{ping_timeout} // 300;
    $self->{log} = $log;
    $self->{method} = $method;
    $self->{port} = $port;
    $self->{timeout} = $timeout;
    $self->{tls} = $tls;
    $self->{tls_ca} = $tls_ca;
    $self->{tls_cert} = $tls_cert;
    $self->{tls_key} = $tls_key;
    $self->{token} = $token;
    $self->{who} = $who;
    $self->{autoconnect} = ($args{autoconnect} //= 1);

    return $self if !$args{autoconnect};

    $self->connect;

    # connect() leaves a true value in {auth} only after a successful hello
    return $self if $self->{auth};
    return;
}
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
# Open the connection to the API and wait for the handshake to finish.
#
# Resets the per-connection state ({auth}, {actions} and the ioloop's
# __exit__ marker), builds a JSON::RPC2::TwoWay instance with handlers for
# 'greetings', 'job_done', 'ping' and 'task_ready', and asks the ioloop for a
# client connection. Incoming netstring chunks are fed to the rpc connection.
# A timer of {timeout} seconds removes the client and sets {auth} to 0 when
# it fires.
#
# Blocks in $self->_loop until {auth} becomes defined (_loop is defined
# elsewhere in this package). Always returns 1; callers inspect {auth} (or
# is_connected) for the actual outcome.
sub connect {
    my $self = shift;

    delete $self->ioloop->{__exit__};
    delete $self->{auth};
    $self->{actions} = {};

    # temporary handler so that a disconnect during the handshake stops the loop
    $self->on(disconnect => sub {
        my ($self, $code) = @_;
        #$self->{_exit} = $code;
        $self->ioloop->stop;
    });

    my $rpc = JSON::RPC2::TwoWay->new(
        debug => $self->{debug},
        json => $self->{jsonobject},
    ) or croak 'no rpc?';
    $rpc->register('greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
    $rpc->register('job_done', sub { $self->rpc_job_done(@_) }, notification => 1);
    $rpc->register('ping', sub { $self->rpc_ping(@_) });
    $rpc->register('task_ready', sub { $self->rpc_task_ready(@_) }, notification => 1);

    my $clarg = {
        address => $self->{address},
        port => $self->{port},
        tls => $self->{tls},
    };
    # only pass the tls file arguments that were actually given
    $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
    $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
    $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};

    my $clientid = $self->ioloop->client(
        $clarg => sub {
        my ($loop, $err, $stream) = @_;
        if ($err) {
            $err =~ s/\n$//s;
            $self->log->info('connection to API failed: ' . $err);
            $self->{auth} = 0; # defined but false: ends the wait below
            return;
        }
        my $ns = MojoX::NetstringStream->new(stream => $stream);
        $self->{ns} = $ns;
        my $conn = $rpc->newconnection(
            owner => $self,
            write => sub { $ns->write(@_) },
        );
        $self->{conn} = $conn;
        $ns->on(chunk => sub {
            my ($ns2, $chunk) = @_;
            #say 'got chunk: ', $chunk;
            my @err = $conn->handle($chunk);
            $self->log->debug('chunk handler: ' . join(' ', grep defined, @err)) if @err;
            $ns->close if $err[0];
        });
        $ns->on(close => sub {
            # this cb is called during global destruction, at
            # least on old perls where
            # Mojo::Util::_global_destruction() won't work
            return unless $conn;
            $conn->close;
            $self->log->info('connection to API closed');
            $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo doc
        });
    });

    $self->{rpc} = $rpc;
    $self->{clientid} = $clientid;

    # handle timeout?
    my $tmr = $self->ioloop->timer($self->{timeout} => sub {
        my $loop = shift;
        $self->log->error('timeout waiting for greeting');
        $loop->remove($clientid);
        $self->{auth} = 0;
    });

    $self->log->debug('starting handshake');

    # wait until the handshake (or a failure) has given {auth} a value
    $self->_loop(sub { not defined $self->{auth} });

    $self->log->debug('done with handshake?');

    $self->ioloop->remove($tmr);
    $self->unsubscribe('disconnect');
    1;
}
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
# True when the handshake left a true {auth} and the ioloop carries no
# __exit__ marker. A false {auth} is returned as-is without touching the
# ioloop.
sub is_connected {
    my ($self) = @_;
    my $authenticated = $self->{auth};
    return $authenticated unless $authenticated;
    return !$self->ioloop->{__exit__};
}
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
# Handler for the 'greetings' notification: performs the hello handshake.
#
# $c is the rpc connection the notification arrived on, $i the notification
# parameters (expects the keys 'version' and 'who').
#
# Outcome is recorded in $self->{auth}: 1 when hello succeeded, 0 when hello
# was refused, '' when the final callback of the step runner is invoked with
# an error (presumably when one of the steps dies -- the runner is
# JobCenter::Client::Mojo::Steps, which is not visible here).
sub rpc_greetings {
    my ($self, $c, $i) = @_;
    #$self->ioloop->delay(
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # guard against a greeting without version/who so that the failure
        # is reported cleanly instead of with uninitialized-value warnings
        my $version = $i->{version} // '';
        die "wrong api version $version (expected 1.1)" unless $version eq '1.1';
        $self->log->info('got greeting from ' . ($i->{who} // ''));
        $c->call('hello', {who => $self->who, method => $self->method, token => $self->token}, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        my $w;
        #say 'hello returned: ', Dumper(\@_);
        die "hello returned error $e->{message} ($e->{code})" if $e;
        die 'no results from hello?' unless $r;
        ($r, $w) = @$r;
        if ($r) {
            # $w may be absent; same guard as in the failure branch
            $self->log->info("hello returned: $r, " . ($w // ''));
            $self->{auth} = 1;
        } else {
            $self->log->error('hello failed: ' . ($w // ''));
            $self->{auth} = 0; # defined but false
        }
    }], sub {
        my ($err) = @_;
        $self->log->error('something went wrong in handshake: ' . $err);
        $self->{auth} = '';
    });
}
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
# Handler for the 'job_done' notification.
#
# Looks up (and removes) the result callback stored under the job id in
# $self->{jobs} and calls it with the job id and the out-arguments. The
# out-arguments are handed over as encoded JSON when $self->{json} is true,
# as the decoded structure otherwise. Exceptions from the callback are caught
# and logged.
sub rpc_job_done {
    my ($self, $conn, $i) = @_;
    my $job_id = $i->{job_id};
    my $result = $i->{outargs};

    # always encode: the text form is needed for the debug output anyway
    my $encoded = $self->{jsonobject}->encode($result);
    if ($self->{json}) {
        $result = $encoded;
    }
    my $printable = decode_utf8($encoded); # for debug printing

    my $callback = delete $self->{jobs}->{$job_id};
    unless ($callback) {
        return $self->log->debug("got job_done for unknown job $job_id result: $printable");
    }

    $self->log->debug("got job_done: for job_id $job_id result: $printable");
    local $@;
    eval {
        $callback->($job_id, $result);
    };
    $self->log->info("got $@ calling result callback") if $@;
}
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
# Blocking variant of call_nb.
#
# Takes the same named arguments as call_nb, but supplies its own cb1/cb2
# (any passed in are overwritten). Waits in $self->_loop until either the
# creation callback reported no job id or the result callback fired, then
# returns the list (job_id, outargs) as last seen by those callbacks.
sub call {
    my ($self, %args) = @_;
    my ($finished, $id, $result);

    # result callback: the job is done, stop waiting
    my $on_result = sub {
        ($id, $result) = @_;
        $finished++;
    };
    # creation callback: only stop waiting when no job was created
    my $on_created = sub {
        ($id, $result) = @_;
        $finished++ unless $id;
    };

    $args{cb1} = $on_created;
    $args{cb2} = $on_result;
    $self->call_nb(%args);

    $self->_loop(sub { !$finished });

    return ($id, $result);
}
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
# Create a job for a workflow without blocking.
#
# Named arguments:
#   wfname   - required workflow name (dies when missing)
#   cb1      - required; called as cb1(job_id, msg) once create_job answered
#              (job_id is undef and msg describes the error on failure)
#   cb2      - required; stored in $self->jobs under the job id, to be called
#              when the job result arrives
#   inargs   - JSON text when $self->{json} is true, a hashref otherwise;
#              defaults to an empty object in either mode
#   vtag     - optional, passed through
#   timeout  - default 5 * $self->timeout
#   clenv, reqauth - optional; JSON text in json mode, hashrefs otherwise
#
# Croaks when inargs/clenv/reqauth do not decode to / are not hashes.
sub call_nb {
    my ($self, %args) = @_;
    my $wfname = $args{wfname} or die 'no workflowname?';
    my $vtag = $args{vtag};
    # the default has to match the mode: JSON text in json mode, a hashref
    # otherwise (a '{}' string would always fail the hashref check below)
    my $inargs = $args{inargs} // ($self->{json} ? '{}' : {});
    my $callcb = $args{cb1} // die 'no call callback?';
    my $rescb = $args{cb2} // die 'no result callback?';
    my $timeout = $args{timeout} // $self->timeout * 5; # a bit hackish..
    my $reqauth = $args{reqauth};
    my $clenv = $args{clenv};
    my $inargsj;

    if ($self->{json}) {
        $inargsj = $inargs;
        $inargs = $self->{jsonobject}->decode($inargs);
        croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
        # decode through the configured json object: no decode_json function
        # is imported into this package
        if ($clenv) {
            $clenv = $self->{jsonobject}->decode($clenv);
            croak 'clenv is not a json object' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            $reqauth = $self->{jsonobject}->decode($reqauth);
            croak 'reqauth is not a json object' unless ref $reqauth eq 'HASH';
        }
    } else {
        croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
        # test encoding
        $inargsj = $self->{jsonobject}->encode($inargs);
        if ($clenv) {
            croak 'clenv should be a hashref' unless ref $clenv eq 'HASH';
        }
        if ($reqauth) {
            croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
        }
    }

    $inargsj = decode_utf8($inargsj); # text form, only used for the debug line
    $self->log->debug("calling $wfname with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_job', {
            wfname => $wfname,
            vtag => $vtag,
            inargs => $inargs,
            timeout => $timeout,
            ($clenv ? (clenv => $clenv) : ()),
            ($reqauth ? (reqauth => $reqauth) : ()),
        }, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        my ($job_id, $msg);
        if ($e) {
            $self->log->error("create_job returned error: $e->{message} ($e->{code})");
            $msg = "$e->{message} ($e->{code})";
        } else {
            ($job_id, $msg) = @$r; # fixme: check for arrayref?
            if ($msg) {
                $self->log->error("create_job returned error: $msg");
            } elsif ($job_id) {
                $self->log->debug("create_job returned job_id: $job_id");
                # remember the result callback until job_done arrives
                $self->jobs->{$job_id} = $rescb;
            }
        }
        if ($msg) {
            # hand errors over in the same form as results
            $msg = {error => $msg} unless ref $msg;
            $msg = $self->{jsonobject}->encode($msg) if $self->{json};
        }
        $callcb->($job_id, $msg);
    }], sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in call_nb: $err");
        $err = { error => $err };
        $err = $self->{jsonobject}->encode($err) if $self->{json};
        $callcb->(undef, $err);
    });
}
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
# Close the rpc connection and the netstring stream, then wipe the object.
#
# Safe to call on an object that never connected or was already wiped
# (ping() empties the object when a ping fails): missing log, conn or ns
# attributes are skipped instead of causing a method call on undef.
sub close {
    my ($self) = @_;

    my $log = $self->log;
    $log->debug('closing connection') if $log;

    my $conn = $self->conn;
    $conn->close() if $conn;

    my $ns = $self->ns;
    $ns->close() if $ns;

    %$self = ();
}
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
# Blocking lookup of jobs matching a filter.
#
# $filter is required (croaks when false); a plain hashref is encoded with
# the configured json object first, anything else is sent as given.
#
# Returns the list (error, @jobs): error is undef on success, the error
# message of the rpc call, or the error handed to the failure callback of the
# step runner. Jobs are only appended when the call returned an arrayref.
sub find_jobs {
    my ($self, $filter) = @_;
    croak('no filter?') unless $filter;

    #print 'filter: ', Dumper($filter);
    $filter = $self->{jsonobject}->encode($filter) if ref $filter eq 'HASH';

    my ($done, $err, $jobs);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call('find_jobs', { filter => $filter }, $steps->next());
    },
    sub {
        #say 'find_jobs call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("find_jobs got error $e->{message} ($e->{code})");
            $err = $e->{message};
            return;
        }
        $jobs = $r;
    }], sub {
        # assign to the outer $err (no 'my') so the caller gets to see it
        ($err) = @_;
        $done++;
        $self->log->error("something went wrong with find_jobs: $err");
    });

    $self->_loop(sub { !$done });

    return $err, @$jobs if ref $jobs eq 'ARRAY';
    return $err;
}
383
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
# Blocking 'get_api_status' call.
#
# $what is required (croaks when false) and sent as the 'what' parameter.
# Returns the result of the call, or the error message when the call
# returned an error. When the failure callback of the step runner fires the
# error is logged and the return value stays undef.
sub get_api_status {
    my ($self, $what) = @_;
    croak('no what?') unless $what;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('get_api_status', { what => $what }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($d, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("get_api_status got error $e->{message} ($e->{code})");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was misspelled 'eror', which died inside this handler
        $self->log->error("something went wrong with get_api_status: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
414
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
# Blocking variant of get_job_status_nb.
#
# $job_id is required (croaks when false). Waits in $self->_loop until the
# status callback fired and returns the list (job_id, outargs) it received.
sub get_job_status {
    my ($self, $job_id) = @_;
    croak('no job_id?') unless $job_id;

    my ($finished, $seen_id, $result);
    my $on_status = sub {
        ($seen_id, $result) = @_;
        $finished++;
        return;
    };

    $self->get_job_status_nb(
        job_id => $job_id,
        statuscb => $on_status,
    );
    $self->_loop(sub { !$finished });

    return ($seen_id, $result);
}
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
# Ask for the status of a job without blocking.
#
# Named arguments:
#   job_id   - required (croaks when false)
#   statuscb - required coderef, called as statuscb(job_id, outargs); on an
#              error job_id is undef and the second argument describes it
#   notifycb - optional coderef; when given the call asks for notification
#              and, if the answer carries neither job id nor outargs, the
#              coderef is stored in $self->jobs under the job id
#
# outargs is encoded with the configured json object when $self->{json} is
# true and outargs is a reference.
sub get_job_status_nb {
    my ($self, %args) = @_;
    my $job_id = $args{job_id} or
        croak('no job_id?');

    my $statuscb = $args{statuscb};
    croak('statuscb should be a coderef')
        if ref $statuscb ne 'CODE';

    my $notifycb = $args{notifycb};
    croak('notifycb should be a coderef')
        if $notifycb and ref $notifycb ne 'CODE';

    # JSON::MaybeXS is loaded without imports, so there is no JSON function
    # or class in this package; ask JSON::MaybeXS for the backend class
    my $json_class = JSON::MaybeXS::JSON();

    #my ($done, $job_id2, $outargs);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        # fixme: check results?
        $self->conn->call(
            'get_job_status', {
                job_id => $job_id,
                notify => ($notifycb ? $json_class->true : $json_class->false),
            }, $steps->next()
        );
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        #$self->log->debug("get_job_satus_nb got job_id: $res msg: $msg");
        if ($e) {
            $self->log->error("get_job_status got error $e->{message} ($e->{code})");
            $statuscb->(undef, $e->{message});
            return;
        }
        my ($job_id2, $outargs) = @$r;
        if ($notifycb and !$job_id2 and !$outargs) {
            # nothing to report yet: park the notify callback for job_done
            $self->jobs->{$job_id} = $notifycb;
        }
        $outargs = $self->{jsonobject}->encode($outargs) if $self->{json} and ref $outargs;
        $statuscb->($job_id2, $outargs);
        return;
    }], sub {
        my ($err) = @_;
        $self->log->error("Something went wrong in get_job_status_nb: $err");
        $err = { error => $err };
        $err = $self->{jsonobject}->encode($err) if $self->{json};
        $statuscb->(undef, $err);
    });
}
481
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
# Blocking ping of the API.
#
# $timeout defaults to $self->timeout. Sends a 'ping' call and waits in
# $self->_loop until either the answer arrived or the timeout timer fired.
#
# Returns 1 when the answer matched /pong/, undef otherwise (including on
# timeout). NOTE: an error or unexpected answer empties the whole object.
sub ping {
    my ($self, $timeout) = @_;

    $timeout //= $self->timeout;
    my ($done, $ret);

    # keep our own reference: the object may be emptied in the callback below
    my $ioloop = $self->ioloop;
    my $tmr = $ioloop->timer($timeout => sub {
        $done++;
    });

    $self->conn->call('ping', {}, sub {
        my ($e, $r) = @_;
        if (not $e and $r and $r =~ /pong/) {
            $ret = 1;
        } else {
            %$self = ();
        }
        $done++;
    });

    $self->_loop(sub { !$done });

    # do not leave the timeout timer armed once the wait is over
    $ioloop->remove($tmr);

    return $ret;
}
506
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
# Run the ioloop to process work until it is stopped.
#
# Installs a recurring timer (only when ping_timeout > 0) that stops the loop
# with WORK_PING_TIMEOUT once the last ping is older than ping_timeout
# seconds, and a disconnect handler that stops the loop with the code it was
# given. With a true $prepare only these handlers are installed and 0 is
# returned. Otherwise the loop is started and the value left in the ioloop's
# __exit__ slot is returned afterwards.
sub work {
    my ($self, $prepare) = @_;

    my $ping_timeout = $self->ping_timeout;
    my $watchdog;
    if ($ping_timeout > 0) {
        $watchdog = $self->ioloop->recurring($ping_timeout => sub {
            my ($loop) = @_;
            $self->log->debug('in ping_timeout timer: lastping: '
                . ($self->lastping // 0) . ' limit: ' . (time - $ping_timeout) );
            # a recent enough ping means the connection is still alive
            return if ($self->lastping // 0) > time - $ping_timeout;
            $self->log->error('ping timeout');
            $loop->remove($self->clientid);
            $loop->remove($watchdog);
            $loop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
            $loop->stop;
        });
    }

    $self->on(disconnect => sub {
        my ($client, $code) = @_;
        $client->ioloop->{__exit__} = $code;
        $client->ioloop->stop;
    });

    return 0 if $prepare;

    $self->ioloop->{__exit__} = WORK_OK;
    $self->log->debug('JobCenter::Client::Mojo starting work');
    # NOTE(review): this checks the Mojo::IOLoop singleton, not $self->ioloop
    # -- confirm that is intended when a custom ioloop was passed to new()
    unless (Mojo::IOLoop->is_running) {
        $self->ioloop->start;
    }
    $self->log->debug('JobCenter::Client::Mojo done?');
    if ($watchdog) {
        $self->ioloop->remove($watchdog);
    }

    return $self->ioloop->{__exit__};
}
538
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
# Stop the ioloop, recording $exit in its __exit__ slot first (this is the
# value work() returns).
sub stop {
    my $self = shift;
    my ($exit) = @_;

    my $loop = $self->ioloop;
    $loop->{__exit__} = $exit;
    $loop->stop;
}
544
|
|
|
|
|
|
|
|
545
|
|
|
|
|
|
|
# Blocking 'create_slotgroup' call.
#
# $name is required (croaks when false); $slots is passed through as given.
# Returns the result of the call, or the error message when the call
# returned an error. When the failure callback of the step runner fires the
# error is logged and the return value stays undef.
sub create_slotgroup {
    my ($self, $name, $slots) = @_;
    croak('no slotgroup name?') unless $name;

    my ($done, $result);
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $self->conn->call('create_slotgroup', { name => $name, slots => $slots }, $steps->next());
    },
    sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $done++; # received something so done waiting
        if ($e) {
            $self->log->error("create_slotgroup got error $e->{message}");
            $result = $e->{message};
            return;
        }
        $result = $r;
    }], sub {
        my ($err) = @_;
        $done++;
        # was misspelled 'eror', which died inside this handler
        $self->log->error("something went wrong with create_slotgroup: $err");
    });

    $self->_loop(sub { !$done });

    return $result;
}
575
|
|
|
|
|
|
|
|
576
|
|
|
|
|
|
|
# Announce an action to the API and register its callback (blocking).
#
# Named arguments:
#   actionname - required (croaks when false)
#   cb         - required (croaks when false)
#   mode       - 'subproc', 'async' or 'sync'; defaults to 'async' when the
#                async argument is true, 'sync' otherwise
#   undocb, addenv, slotgroup, slots, filter - optional
#   workername - defaults to "<who> <hostname> <$0> <pid>"
#
# Croaks on an unknown mode or when the action was announced before.
# Returns undef on success, otherwise an error message.
sub announce {
    my ($self, %args) = @_;
    my $actionname = $args{actionname} or croak 'no actionname?';
    my $cb = $args{cb} or croak 'no cb?';
    #my $async = $args{async} // 0;
    my $mode = $args{mode};
    $mode //= $args{async} ? 'async' : 'sync';
    croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|sync)$/;
    my $undocb = $args{undocb};
    my $host = hostname;
    my $workername = $args{workername} // "$self->{who} $host $0 $$";

    croak "already have action $actionname" if $self->actions->{$actionname};

    my ($finished, $err);

    my $send_announce = sub {
        my ($steps) = @_;
        my %request = (
            workername => $workername,
            actionname => $actionname,
            slotgroup => $args{slotgroup},
            slots => $args{slots},
        );
        # the filter is only sent along when one was given
        $request{filter} = $args{filter} if $args{filter};
        $self->conn->call('announce', \%request, $steps->next());
    };

    my $handle_answer = sub {
        #say 'call returned: ', Dumper(\@_);
        my ($steps, $e, $r) = @_;
        $finished++; # received something so done waiting
        if ($e) {
            $self->log->error("announce got error: $e->{message}");
            $err = $e->{message};
            return;
        }
        my ($res, $msg) = @$r;
        $self->log->debug("announce got res: $res msg: $msg");
        if ($res) {
            # accepted: remember how to run this action
            $self->actions->{$actionname} = {
                cb => $cb,
                mode => $mode,
                undocb => $undocb,
                addenv => $args{addenv} // 0,
            };
        } else {
            $err = $msg;
        }
    };

    my $on_failure = sub {
        ($err) = @_;
        $finished++;
        $self->log->error("something went wrong with announce: $err");
    };

    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps(
        [$send_announce, $handle_answer], $on_failure);

    $self->_loop(sub { !$finished });
    return $err;
}
628
|
|
|
|
|
|
|
|
629
|
|
|
|
|
|
|
# Handler for the 'ping' call from the API: records the current time as
# lastping (work() compares against it) and answers 'pong!'.
# The connection, parameters and rpc callback arguments are not used.
sub rpc_ping {
    my $self = shift;
    my $now = time();
    $self->lastping($now);
    return 'pong!';
}
634
|
|
|
|
|
|
|
|
635
|
|
|
|
|
|
|
# rpc_task_ready: handler for a 'task_ready' notification.
#
# Arguments:
#   $self - the client object
#   $c    - the connection object; used for call('get_task', ...) and
#           notify('task_done', ...)
#   $i    - hashref with (at least) the keys 'actionname' and 'job_id'
#
# Looks up the announced action for $i->{actionname}; unknown actions are
# logged and ignored. Otherwise 'get_task' is called on the connection and,
# when a task (job_id, cookie, args...) comes back, the action callback is
# run according to the action's mode:
#
#   subproc - delegated to $self->_subproc
#   async   - the callback gets an extra coderef as last argument, which
#             sends 'task_done' with its first argument as outargs
#   sync    - the callback's return value is sent as outargs right away
#
# In every mode an exception thrown by the callback is reported back as
# outargs { error => $@ }. Nothing useful is returned.
sub rpc_task_ready {
    #say 'got task_ready: ', Dumper(\@_);
    my ($self, $c, $i) = @_;
    my $actionname = $i->{actionname};
    my $job_id     = $i->{job_id};
    my $action     = $self->actions->{$actionname};
    unless ($action) {
        $self->log->info("got task_ready for unknown action $actionname");
        return;
    }

    $self->log->debug("got task_ready for $actionname job_id $job_id calling get_task");
    JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
    sub {
        my $steps = shift;
        $c->call('get_task', {actionname => $actionname, job_id => $job_id}, $steps->next());
    },
    sub {
        my ($steps, $e, $r) = @_;
        #say 'get_task returned: ', Dumper(\@_);
        if ($e) {
            # was "$$self->log", which dies with "Not a SCALAR reference"
            # instead of logging the get_task error
            $self->log->debug("got $e->{message} ($e->{code}) calling get_task");
        }
        unless ($r) {
            $self->log->debug('no task for get_task');
            return;
        }
        my ($cookie, @args);
        ($job_id, $cookie, @args) = @$r;
        unless ($cookie) {
            $self->log->debug('aaah? no cookie? (get_task)');
            return;
        }
        # the last element of @args is the environment; only pass it on
        # when the action was announced with addenv
        pop @args unless $action->{addenv}; # remove env
        local $@;
        if ($action->{mode} eq 'subproc') {
            eval {
                $self->_subproc($c, $action, $job_id, $cookie, @args);
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'async') {
            eval {
                $action->{cb}->($job_id, @args, sub {
                    $c->notify('task_done', { cookie => $cookie, outargs => $_[0] });
                });
            };
            $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
        } elsif ($action->{mode} eq 'sync') {
            my $outargs = eval { $action->{cb}->($job_id, @args) };
            $outargs = { error => $@ } if $@;
            $c->notify('task_done', { cookie => $cookie, outargs => $outargs });
        } else {
            die "unkown mode $action->{mode}";
        }
    }], sub {
        # final callback of the step chain: receives the error, if any
        my ($err) = @_;
        $self->log->error("something went wrong with rpc_task_ready: $err");
    });
}
694
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
# _subproc: run an action callback in a forked child process.
#
# Arguments:
#   $self   - the client object
#   $c      - connection object; the parent sends 'task_done' on it
#   $action - action hashref; uses the keys 'cb' and (optionally) 'undocb'
#   $job_id - id of the job the task belongs to
#   $cookie - task cookie, echoed back in the 'task_done' notification
#   @args   - arguments passed to the callback(s) after $job_id
#
# The child runs $action->{cb}, serialises the result with Storable and
# writes it to a pipe, then exits with POSIX::_exit. The parent reads the
# pipe through a Mojo::IOLoop::Stream and, when the pipe is closed, reaps
# the child and notifies 'task_done' with the thawed result if that is a
# hashref.
#
# Dies (in the parent) when the pipe or the fork cannot be created.
sub _subproc {
    my ($self, $c, $action, $job_id, $cookie, @args) = @_;

    # based on Mojo::IOLoop::Subprocess
    my $ioloop = $self->ioloop;

    # Pipe for subprocess communication
    pipe(my $reader, my $writer) or die "Can't create pipe: $!";

    die "Can't fork: $!" unless defined(my $pid = fork);
    unless ($pid) {# Child
        $self->log->debug("in child $$");
        $ioloop->reset;
        CORE::close $reader; # or we won't get a sigpipe when daddy dies..
        my $undo = 0;
        my $outargs = eval { $action->{cb}->($job_id, @args) };
        if ($@) {
            $outargs = {'error' => $@};
            $undo++;
        } elsif (ref $outargs eq 'HASH' and $outargs->{'error'}) {
            $undo++;
        }
        if ($undo and $action->{undocb}) {
            $self->log->info("undoing for $job_id");
            my $res = eval { $action->{undocb}->($job_id, @args); };
            $res = $@ if $@;
            # how should this look?
            # NOTE: the result is replaced by this structure whenever the
            # undo callback ran, whether or not the undo itself failed.
            $outargs = {'error' => {
                'msg' => 'undo failure',
                'undo' => $res,
                'olderr' => $outargs->{error},
            }};
            $undo = 0;
        }
        # Storable::freeze croaks on anything that is not a reference (for
        # example when the callback returned undef or a plain scalar). An
        # exception here would escape _subproc inside the child, which
        # would then never reach POSIX::_exit below. Report the problem as
        # an error result instead.
        my $frozen = eval { Storable::freeze($outargs) };
        unless (defined $frozen) {
            $frozen = Storable::freeze(
                {'error' => "could not serialize task result: $@"}
            );
        }
        # stop ignoring sigpipe
        $SIG{PIPE} = sub { $undo++ };
        # if the parent is gone we get a sigpipe here:
        print $writer $frozen;
        $writer->flush or $undo++;
        CORE::close $writer or $undo++;
        # the result could not be delivered: undo, if we still have to
        if ($undo and $action->{undocb}) {
            $self->log->info("undoing for $job_id");
            eval { $action->{undocb}->($job_id, @args); };
            # ignore errors because we can't report them back..
        }
        # FIXME: normal exit?
        POSIX::_exit(0);
    }

    # Parent
    my $me = $$;
    CORE::close $writer;
    my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
    $ioloop->stream($stream);
    my $buffer = '';
    # collect everything the child writes
    $stream->on(read => sub { $buffer .= pop });
    $stream->on(
        close => sub {
            #say "close handler!";
            # only act in the process that created the stream
            return unless $$ == $me;
            waitpid $pid, 0;
            my $outargs = eval { Storable::thaw($buffer) };
            $outargs = { error => $@ } if $@;
            if ($outargs and ref $outargs eq 'HASH') {
                $self->log->debug('subprocess results: ' . Dumper($outargs));
                eval { $c->notify(
                    'task_done',
                    { cookie => $cookie, outargs => $outargs }
                ); }; # the connection might be gone?
            } # else?
        }
    );
}
768
|
|
|
|
|
|
|
|
769
|
|
|
|
|
|
|
# tick while Mojo::Reactor is still running and condition callback is true
#
# Arguments:
#   $self      - the client object; the reactor is taken from
#                $self->ioloop->singleton->reactor
#   $condition - coderef, evaluated before every tick; ticking stops as
#                soon as it returns false
#
# Warns when called while another _loop call is still active. Dies with
# "unknown reactor: <class>" for reactors other than Mojo::Reactor::EV and
# Mojo::Reactor::Poll. An exception thrown while ticking is re-thrown after
# the bookkeeping (recursion counter, Poll 'running' counter) has been
# restored, so a failed call does not cause false recursion warnings later.
sub _loop {
    my ($self, $condition) = @_;

    state $looping = 0;
    warn __PACKAGE__." recursing into IO loop" if $looping;

    my $reactor = $self->ioloop->singleton->reactor;
    my $type    = ref $reactor;
    my $is_poll = $type eq 'Mojo::Reactor::Poll';
    die "unknown reactor: $type"
        unless $is_poll or $type eq 'Mojo::Reactor::EV';

    $looping++;
    # the Poll branch tests is_running, so mark the reactor as running
    # for the duration of this call
    $reactor->{running}++ if $is_poll;

    my $ok = eval {
        if ($is_poll) {
            $reactor->one_tick while $condition->() && $reactor->is_running;
        } else {
            # for EV, keep ticking as long as one_tick returns true
            my $active = 1;
            $active = $reactor->one_tick while $condition->() && $active;
        }
        1;
    };
    my $error = $@;

    # restore the bookkeeping, also when ticking threw an exception
    $reactor->{running} &&= $reactor->{running} - 1 if $is_poll;
    $looping--;

    die $error unless $ok;
    return;
}
798
|
|
|
|
|
|
|
|
799
|
|
|
|
|
|
|
#sub DESTROY { |
800
|
|
|
|
|
|
|
# my $self = shift; |
801
|
|
|
|
|
|
|
# say 'destroying ', $self; |
802
|
|
|
|
|
|
|
#} |
803
|
|
|
|
|
|
|
|
804
|
|
|
|
|
|
|
1; |
805
|
|
|
|
|
|
|
|
806
|
|
|
|
|
|
|
=encoding utf8 |
807
|
|
|
|
|
|
|
|
808
|
|
|
|
|
|
|
=head1 NAME |
809
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
JobCenter::Client::Mojo - JobCenter JSON-RPC 2.0 Api client using Mojo. |
811
|
|
|
|
|
|
|
|
812
|
|
|
|
|
|
|
=head1 SYNOPSIS |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
use JobCenter::Client::Mojo; |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
my $client = JobCenter::Client::Mojo->new( |
817
|
|
|
|
|
|
|
address => ... |
818
|
|
|
|
|
|
|
port => ... |
819
|
|
|
|
|
|
|
who => ... |
820
|
|
|
|
|
|
|
token => ... |
821
|
|
|
|
|
|
|
); |
822
|
|
|
|
|
|
|
|
823
|
|
|
|
|
|
|
my ($job_id, $outargs) = $client->call( |
824
|
|
|
|
|
|
|
wfname => 'test', |
825
|
|
|
|
|
|
|
inargs => { test => 'test' }, |
826
|
|
|
|
|
|
|
); |
827
|
|
|
|
|
|
|
|
828
|
|
|
|
|
|
|
=head1 DESCRIPTION |
829
|
|
|
|
|
|
|
|
830
|
|
|
|
|
|
|
L<JobCenter::Client::Mojo> is a class to build a client to connect to the |
831
|
|
|
|
|
|
|
JSON-RPC 2.0 Api of the L<JobCenter> workflow engine. The client can be |
832
|
|
|
|
|
|
|
used to create and inspect jobs as well as for providing 'worker' services |
833
|
|
|
|
|
|
|
to the JobCenter. |
834
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
=head1 METHODS |
836
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
=head2 new |
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
$client = JobCenter::Client::Mojo->new(%arguments); |
840
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
Class method that returns a new JobCenter::Client::Mojo object. |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
Valid arguments are: |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
=over 4 |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
=item - address: address of the Api. |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
(default: 127.0.0.1) |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
=item - port: port of the Api |
852
|
|
|
|
|
|
|
|
853
|
|
|
|
|
|
|
(default 6522) |
854
|
|
|
|
|
|
|
|
855
|
|
|
|
|
|
|
=item - tls: connect using tls |
856
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
(default false) |
858
|
|
|
|
|
|
|
|
859
|
|
|
|
|
|
|
=item - tls_ca: verify server using ca |
860
|
|
|
|
|
|
|
|
861
|
|
|
|
|
|
|
(default undef) |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
=item - tls_key: private client key |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
(default undef) |
866
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
=item - tls_cert: public client certificate |
868
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
(default undef) |
870
|
|
|
|
|
|
|
|
871
|
|
|
|
|
|
|
=item - who: who to authenticate as. |
872
|
|
|
|
|
|
|
|
873
|
|
|
|
|
|
|
(required) |
874
|
|
|
|
|
|
|
|
875
|
|
|
|
|
|
|
=item - method: how to authenticate. |
876
|
|
|
|
|
|
|
|
877
|
|
|
|
|
|
|
(default: password) |
878
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
=item - token: token to authenticate with. |
880
|
|
|
|
|
|
|
|
881
|
|
|
|
|
|
|
(required) |
882
|
|
|
|
|
|
|
|
883
|
|
|
|
|
|
|
=item - debug: when true prints debugging using L |
884
|
|
|
|
|
|
|
|
885
|
|
|
|
|
|
|
(default: false) |
886
|
|
|
|
|
|
|
|
887
|
|
|
|
|
|
|
=item - ioloop: L<Mojo::IOLoop> object to use |
888
|
|
|
|
|
|
|
|
889
|
|
|
|
|
|
|
(per default the L<Mojo::IOLoop>->singleton object is used) |
890
|
|
|
|
|
|
|
|
891
|
|
|
|
|
|
|
=item - json: flag whether input is json or perl. |
892
|
|
|
|
|
|
|
|
893
|
|
|
|
|
|
|
when true expects the inargs to be valid json, when false a perl hashref is |
894
|
|
|
|
|
|
|
expected and json encoded. (default true) |
895
|
|
|
|
|
|
|
|
896
|
|
|
|
|
|
|
=item - jsonobject: json encoder/decoder object to use |
897
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
(per default a new L object is created) |
899
|
|
|
|
|
|
|
|
900
|
|
|
|
|
|
|
=item - log: L<Mojo::Log> object to use |
901
|
|
|
|
|
|
|
|
902
|
|
|
|
|
|
|
(per default a new L<Mojo::Log> object is created) |
903
|
|
|
|
|
|
|
|
904
|
|
|
|
|
|
|
=item - timeout: how long to wait for Api calls to complete |
905
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
(default 60 seconds) |
907
|
|
|
|
|
|
|
|
908
|
|
|
|
|
|
|
=item - ping_timeout: after this long without a ping from the Api the |
909
|
|
|
|
|
|
|
connection will be closed and the work() method will return |
910
|
|
|
|
|
|
|
|
911
|
|
|
|
|
|
|
(default 5 minutes) |
912
|
|
|
|
|
|
|
|
913
|
|
|
|
|
|
|
=back |
914
|
|
|
|
|
|
|
|
915
|
|
|
|
|
|
|
=head2 call |
916
|
|
|
|
|
|
|
|
917
|
|
|
|
|
|
|
($job_id, $result) = $client->call(%args); |
918
|
|
|
|
|
|
|
|
919
|
|
|
|
|
|
|
Creates a new L job and waits for the results. Throws an error |
920
|
|
|
|
|
|
|
if something goes wrong immediately. Errors encountered during later |
921
|
|
|
|
|
|
|
processing are returned as a L error object. |
922
|
|
|
|
|
|
|
|
923
|
|
|
|
|
|
|
Valid arguments are: |
924
|
|
|
|
|
|
|
|
925
|
|
|
|
|
|
|
=over 4 |
926
|
|
|
|
|
|
|
|
927
|
|
|
|
|
|
|
=item - wfname: name of the workflow to call (required) |
928
|
|
|
|
|
|
|
|
929
|
|
|
|
|
|
|
=item - inargs: input arguments for the workflow (if any) |
930
|
|
|
|
|
|
|
|
931
|
|
|
|
|
|
|
=item - vtag: version tag of the workflow to use (optional) |
932
|
|
|
|
|
|
|
|
933
|
|
|
|
|
|
|
=item - timeout: wait this many seconds for the job to finish |
934
|
|
|
|
|
|
|
(optional, defaults to 5 times the Api-call timeout, so default 5 minutes) |
935
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
=item - reqauth: authentication token to be passed on to the authentication |
937
|
|
|
|
|
|
|
module of the API for per job/request authentication. |
938
|
|
|
|
|
|
|
|
939
|
|
|
|
|
|
|
=item - clenv: client environment, made available as part of the job |
940
|
|
|
|
|
|
|
environment and inherited to child jobs. |
941
|
|
|
|
|
|
|
|
942
|
|
|
|
|
|
|
=back |
943
|
|
|
|
|
|
|
|
944
|
|
|
|
|
|
|
=head2 call_nb |
945
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
$job_id = $client->call_nb(%args); |
947
|
|
|
|
|
|
|
|
948
|
|
|
|
|
|
|
Creates a new L<JobCenter> job and calls the provided callback on completion |
949
|
|
|
|
|
|
|
of the job. Throws an error if something goes wrong immediately. Errors |
950
|
|
|
|
|
|
|
encountered during later processing are returned as a L error |
951
|
|
|
|
|
|
|
object to the callback. |
952
|
|
|
|
|
|
|
|
953
|
|
|
|
|
|
|
Valid arguments are those for L and: |
954
|
|
|
|
|
|
|
|
955
|
|
|
|
|
|
|
=over 4 |
956
|
|
|
|
|
|
|
|
957
|
|
|
|
|
|
|
=item - cb1: coderef to the callback to call on job creation (required) |
958
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
( cb1 => sub { ($job_id, $err) = @_; ... } ) |
960
|
|
|
|
|
|
|
|
961
|
|
|
|
|
|
|
If job_id is undefined the job was not created, the error is then returned |
962
|
|
|
|
|
|
|
as the second return value. |
963
|
|
|
|
|
|
|
|
964
|
|
|
|
|
|
|
=item - cb2: coderef to the callback to call on job completion (required) |
965
|
|
|
|
|
|
|
|
966
|
|
|
|
|
|
|
( cb2 => sub { ($job_id, $outargs) = @_; ... } ) |
967
|
|
|
|
|
|
|
|
968
|
|
|
|
|
|
|
=back |
969
|
|
|
|
|
|
|
|
970
|
|
|
|
|
|
|
=head2 get_job_status |
971
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
($job_id, $result) = $client->get_job_status($job_id); |
973
|
|
|
|
|
|
|
|
974
|
|
|
|
|
|
|
Retrieves the status for the given $job_id. If the job_id does not exist |
975
|
|
|
|
|
|
|
then the returned $job_id will be undefined and $result will be an error |
976
|
|
|
|
|
|
|
message. If the job has not finished executing then both $job_id and |
977
|
|
|
|
|
|
|
$result will be undefined. Otherwise the $result will contain the result of |
978
|
|
|
|
|
|
|
the job. (Which may be a JobCenter error object) |
979
|
|
|
|
|
|
|
|
980
|
|
|
|
|
|
|
=head2 get_job_status_nb |
981
|
|
|
|
|
|
|
|
982
|
|
|
|
|
|
|
$client->get_job_status_nb(%args); |
983
|
|
|
|
|
|
|
|
984
|
|
|
|
|
|
|
Retrieves the status for the given $job_id. |
985
|
|
|
|
|
|
|
|
986
|
|
|
|
|
|
|
Valid arguments are: |
987
|
|
|
|
|
|
|
|
988
|
|
|
|
|
|
|
=over 4 |
989
|
|
|
|
|
|
|
|
990
|
|
|
|
|
|
|
=item - job_id |
991
|
|
|
|
|
|
|
|
992
|
|
|
|
|
|
|
=item - statuscb: coderef to the callback for the current status |
993
|
|
|
|
|
|
|
|
994
|
|
|
|
|
|
|
( statuscb => sub { ($job_id, $result) = @_; ... } ) |
995
|
|
|
|
|
|
|
|
996
|
|
|
|
|
|
|
If the job_id does not exist then the returned $job_id will be undefined |
997
|
|
|
|
|
|
|
and $result will be an error message. If the job has not finished executing |
998
|
|
|
|
|
|
|
then both $job_id and $result will be undefined. Otherwise the $result will |
999
|
|
|
|
|
|
|
contain the result of the job. (Which may be a JobCenter error object) |
1000
|
|
|
|
|
|
|
|
1001
|
|
|
|
|
|
|
=item - notifycb: coderef to the callback for job completion |
1002
|
|
|
|
|
|
|
|
1003
|
|
|
|
|
|
|
( notifycb => sub { ($job_id, $result) = @_; ... } ) |
1004
|
|
|
|
|
|
|
|
1005
|
|
|
|
|
|
|
If the job was still running when the get_job_status_nb call was made then |
1006
|
|
|
|
|
|
|
this callback will be called on completion of the job. |
1007
|
|
|
|
|
|
|
|
1008
|
|
|
|
|
|
|
=back |
1009
|
|
|
|
|
|
|
|
1010
|
|
|
|
|
|
|
=head2 find_jobs |
1011
|
|
|
|
|
|
|
|
1012
|
|
|
|
|
|
|
($err, @jobs) = $client->find_jobs({'foo'=>'bar'}); |
1013
|
|
|
|
|
|
|
|
1014
|
|
|
|
|
|
|
Finds all currently running jobs with arguments matching the filter |
1015
|
|
|
|
|
|
|
expression. The expression is evaluated in PostgreSQL using the @> for |
1016
|
|
|
|
|
|
|
jsonb objects, basically this means that you can only do equality tests for |
1017
|
|
|
|
|
|
|
one or more top-level keys. If @jobs is empty $err might contain an error |
1018
|
|
|
|
|
|
|
message. |
1019
|
|
|
|
|
|
|
|
1020
|
|
|
|
|
|
|
=head2 ping |
1021
|
|
|
|
|
|
|
|
1022
|
|
|
|
|
|
|
$status = $client->ping($timeout); |
1023
|
|
|
|
|
|
|
|
1024
|
|
|
|
|
|
|
Tries to ping the JobCenter API. On success return true. On failure returns |
1025
|
|
|
|
|
|
|
the undefined value, after that the client object should be undefined. |
1026
|
|
|
|
|
|
|
|
1027
|
|
|
|
|
|
|
=head2 close |
1028
|
|
|
|
|
|
|
|
1029
|
|
|
|
|
|
|
$client->close() |
1030
|
|
|
|
|
|
|
|
1031
|
|
|
|
|
|
|
Closes the connection to the JobCenter API and tries to de-allocate |
1032
|
|
|
|
|
|
|
everything. Trying to use the client afterwards will produce errors. |
1033
|
|
|
|
|
|
|
|
1034
|
|
|
|
|
|
|
=head2 create_slotgroup |
1035
|
|
|
|
|
|
|
|
1036
|
|
|
|
|
|
|
$client->create_slotgroup($name, $slots) |
1037
|
|
|
|
|
|
|
|
1038
|
|
|
|
|
|
|
A 'slotgroup' is a way of telling the JobCenter API how many tasks the |
1039
|
|
|
|
|
|
|
worker can do at once. The number of slots should be a positive integer. |
1040
|
|
|
|
|
|
|
Slotgroups names starting with a _ (underscore) are reserved for internal |
1041
|
|
|
|
|
|
|
use. Returns an error message when there was an error, the empty string |
1042
|
|
|
|
|
|
|
otherwise. |
1043
|
|
|
|
|
|
|
|
1044
|
|
|
|
|
|
|
=head2 announce |
1045
|
|
|
|
|
|
|
|
1046
|
|
|
|
|
|
|
Announces the capability to do an action to the Api. The provided callback |
1047
|
|
|
|
|
|
|
will be called when there is a task to be performed. Returns an error when |
1048
|
|
|
|
|
|
|
there was a problem announcing the action. |
1049
|
|
|
|
|
|
|
|
1050
|
|
|
|
|
|
|
my $err = $client->announce( |
1051
|
|
|
|
|
|
|
workername => 'me', |
1052
|
|
|
|
|
|
|
actionname => 'do', |
1053
|
|
|
|
|
|
|
slots => 1, |
1054
|
|
|
|
|
|
|
cb => sub { ... }, |
1055
|
|
|
|
|
|
|
); |
1056
|
|
|
|
|
|
|
die "could not announce $actionname?: $err" if $err; |
1057
|
|
|
|
|
|
|
|
1058
|
|
|
|
|
|
|
See L for an example. |
1059
|
|
|
|
|
|
|
|
1060
|
|
|
|
|
|
|
Valid arguments are: |
1061
|
|
|
|
|
|
|
|
1062
|
|
|
|
|
|
|
=over 4 |
1063
|
|
|
|
|
|
|
|
1064
|
|
|
|
|
|
|
=item - workername: name of the worker |
1065
|
|
|
|
|
|
|
|
1066
|
|
|
|
|
|
|
(optional, defaults to client->who, processname and processid) |
1067
|
|
|
|
|
|
|
|
1068
|
|
|
|
|
|
|
=item - actionname: name of the action |
1069
|
|
|
|
|
|
|
|
1070
|
|
|
|
|
|
|
(required) |
1071
|
|
|
|
|
|
|
|
1072
|
|
|
|
|
|
|
=item - cb: callback to be called for the action |
1073
|
|
|
|
|
|
|
|
1074
|
|
|
|
|
|
|
(required) |
1075
|
|
|
|
|
|
|
|
1076
|
|
|
|
|
|
|
=item - mode: callback mode |
1077
|
|
|
|
|
|
|
|
1078
|
|
|
|
|
|
|
(optional, default 'sync') |
1079
|
|
|
|
|
|
|
|
1080
|
|
|
|
|
|
|
Possible values: |
1081
|
|
|
|
|
|
|
|
1082
|
|
|
|
|
|
|
=over 8 |
1083
|
|
|
|
|
|
|
|
1084
|
|
|
|
|
|
|
=item - 'sync': simple blocking mode, just return the results from the |
1085
|
|
|
|
|
|
|
callback. Use only for callbacks taking less than (about) a second. |
1086
|
|
|
|
|
|
|
|
1087
|
|
|
|
|
|
|
=item - 'subproc': the simple blocking callback is started in a separate |
1088
|
|
|
|
|
|
|
process. Useful for callbacks that take a long time. |
1089
|
|
|
|
|
|
|
|
1090
|
|
|
|
|
|
|
=item - 'async': the callback gets passed another callback as the last |
1091
|
|
|
|
|
|
|
argument that is to be called on completion of the task. For advanced use |
1092
|
|
|
|
|
|
|
cases where the worker is actually more like a proxy. The (initial) |
1093
|
|
|
|
|
|
|
callback is expected to return soonish to the event loop, after setting up |
1094
|
|
|
|
|
|
|
some Mojo-callbacks. |
1095
|
|
|
|
|
|
|
|
1096
|
|
|
|
|
|
|
=back |
1097
|
|
|
|
|
|
|
|
1098
|
|
|
|
|
|
|
=item - async: backwards compatible way for specifying mode 'async' |
1099
|
|
|
|
|
|
|
|
1100
|
|
|
|
|
|
|
(optional, default false) |
1101
|
|
|
|
|
|
|
|
1102
|
|
|
|
|
|
|
=item - slotgroup: the slotgroup to use for accounting of parallel tasks |
1103
|
|
|
|
|
|
|
|
1104
|
|
|
|
|
|
|
(optional, conflicts with 'slots') |
1105
|
|
|
|
|
|
|
|
1106
|
|
|
|
|
|
|
=item - slots: the amount of tasks the worker is able to process in parallel |
1107
|
|
|
|
|
|
|
for this action. |
1108
|
|
|
|
|
|
|
|
1109
|
|
|
|
|
|
|
(optional, default 1, conflicts with 'slotgroup') |
1110
|
|
|
|
|
|
|
|
1111
|
|
|
|
|
|
|
=item - undocb: a callback that gets called when the original callback |
1112
|
|
|
|
|
|
|
returns an error object or throws an error. |
1113
|
|
|
|
|
|
|
|
1114
|
|
|
|
|
|
|
Called with the same arguments as the original callback. |
1115
|
|
|
|
|
|
|
|
1116
|
|
|
|
|
|
|
(optional, only valid for mode 'subproc') |
1117
|
|
|
|
|
|
|
|
1118
|
|
|
|
|
|
|
=item - filter: only process a subset of the action |
1119
|
|
|
|
|
|
|
|
1120
|
|
|
|
|
|
|
The filter expression allows a worker to specify that it can only do the |
1121
|
|
|
|
|
|
|
actionname for a certain subset of arguments. For example, for a "mkdir" |
1122
|
|
|
|
|
|
|
action the filter expression {'host' => 'example.com'} would mean that this |
1123
|
|
|
|
|
|
|
worker can only do mkdir on host example.com. Filter expressions are limited |
1124
|
|
|
|
|
|
|
to simple equality tests on one or more keys, and only those keys that are |
1125
|
|
|
|
|
|
|
allowed in the action definition. Filtering can be allowed, be mandatory or |
1126
|
|
|
|
|
|
|
be forbidden per action. |
1127
|
|
|
|
|
|
|
|
1128
|
|
|
|
|
|
|
=item - addenv: pass on action environment to the callback |
1129
|
|
|
|
|
|
|
|
1130
|
|
|
|
|
|
|
If the addenv flag is true the action callback will be given one extra |
1131
|
|
|
|
|
|
|
argument containing the action environment as a hashref. In the async |
1132
|
|
|
|
|
|
|
callback mode the environment will be inserted before the result callback. |
1133
|
|
|
|
|
|
|
|
1134
|
|
|
|
|
|
|
=back |
1135
|
|
|
|
|
|
|
|
1136
|
|
|
|
|
|
|
=head2 work |
1137
|
|
|
|
|
|
|
|
1138
|
|
|
|
|
|
|
Starts the L<Mojo::IOLoop>. Returns a non-zero value when the IOLoop was |
1139
|
|
|
|
|
|
|
stopped due to some error condition (like a lost connection or a ping |
1140
|
|
|
|
|
|
|
timeout). |
1141
|
|
|
|
|
|
|
|
1142
|
|
|
|
|
|
|
=head3 Possible work() exit codes |
1143
|
|
|
|
|
|
|
|
1144
|
|
|
|
|
|
|
The JobCenter::Client::Mojo library currently defines the following exit codes: |
1145
|
|
|
|
|
|
|
|
1146
|
|
|
|
|
|
|
WORK_OK |
1147
|
|
|
|
|
|
|
WORK_PING_TIMEOUT |
1148
|
|
|
|
|
|
|
WORK_CONNECTION_CLOSED |
1149
|
|
|
|
|
|
|
|
1150
|
|
|
|
|
|
|
=head2 stop |
1151
|
|
|
|
|
|
|
|
1152
|
|
|
|
|
|
|
$client->stop($exit); |
1153
|
|
|
|
|
|
|
|
1154
|
|
|
|
|
|
|
Makes the work() function exit with the provided exit code. |
1155
|
|
|
|
|
|
|
|
1156
|
|
|
|
|
|
|
=head1 SEE ALSO |
1157
|
|
|
|
|
|
|
|
1158
|
|
|
|
|
|
|
=over 4 |
1159
|
|
|
|
|
|
|
|
1160
|
|
|
|
|
|
|
=item * |
1161
|
|
|
|
|
|
|
|
1162
|
|
|
|
|
|
|
L, L, L: the L Web framework |
1163
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
=item * |
1165
|
|
|
|
|
|
|
|
1166
|
|
|
|
|
|
|
L, L |
1167
|
|
|
|
|
|
|
|
1168
|
|
|
|
|
|
|
=back |
1169
|
|
|
|
|
|
|
|
1170
|
|
|
|
|
|
|
L: JobCenter Orchestration Engine |
1171
|
|
|
|
|
|
|
|
1172
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENT |
1173
|
|
|
|
|
|
|
|
1174
|
|
|
|
|
|
|
This software has been developed with support from L. |
1175
|
|
|
|
|
|
|
In German: Diese Software wurde mit Unterstützung von L entwickelt. |
1176
|
|
|
|
|
|
|
|
1177
|
|
|
|
|
|
|
=head1 THANKS |
1178
|
|
|
|
|
|
|
|
1179
|
|
|
|
|
|
|
Thanks to Eitan Schuler for reporting a bug and providing a pull request. |
1180
|
|
|
|
|
|
|
|
1181
|
|
|
|
|
|
|
=head1 AUTHORS |
1182
|
|
|
|
|
|
|
|
1183
|
|
|
|
|
|
|
=over 4 |
1184
|
|
|
|
|
|
|
|
1185
|
|
|
|
|
|
|
=item * |
1186
|
|
|
|
|
|
|
|
1187
|
|
|
|
|
|
|
Wieger Opmeer |
1188
|
|
|
|
|
|
|
|
1189
|
|
|
|
|
|
|
=back |
1190
|
|
|
|
|
|
|
|
1191
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
1192
|
|
|
|
|
|
|
|
1193
|
|
|
|
|
|
|
This software is copyright (c) 2017 by Wieger Opmeer. |
1194
|
|
|
|
|
|
|
|
1195
|
|
|
|
|
|
|
This is free software; you can redistribute it and/or modify it under |
1196
|
|
|
|
|
|
|
the same terms as the Perl 5 programming language system itself. |
1197
|
|
|
|
|
|
|
|
1198
|
|
|
|
|
|
|
=cut |
1199
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
1; |