line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# Lightweight client for the RPC-Switch json-rpc request multiplexer |
2
|
|
|
|
|
|
|
# |
3
|
|
|
|
|
|
|
# see: RPC::Switch: https://github.com/a6502/rpc-switch |
4
|
|
|
|
|
|
|
# RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client |
5
|
|
|
|
|
|
|
# |
6
|
|
|
|
|
|
|
package RPC::Switch::Client::Tiny; |
7
|
|
|
|
|
|
|
|
8
|
20
|
|
|
20
|
|
2165687
|
use strict; |
|
20
|
|
|
|
|
217
|
|
|
20
|
|
|
|
|
561
|
|
9
|
20
|
|
|
20
|
|
100
|
use warnings; |
|
20
|
|
|
|
|
41
|
|
|
20
|
|
|
|
|
520
|
|
10
|
20
|
|
|
20
|
|
99
|
use Carp 'croak'; |
|
20
|
|
|
|
|
40
|
|
|
20
|
|
|
|
|
999
|
|
11
|
20
|
|
|
20
|
|
858
|
use JSON; |
|
20
|
|
|
|
|
8171
|
|
|
20
|
|
|
|
|
100
|
|
12
|
20
|
|
|
20
|
|
11581
|
use IO::Select; |
|
20
|
|
|
|
|
35863
|
|
|
20
|
|
|
|
|
1567
|
|
13
|
20
|
|
|
20
|
|
16800
|
use IO::Socket::SSL; |
|
20
|
|
|
|
|
1259498
|
|
|
20
|
|
|
|
|
179
|
|
14
|
20
|
|
|
20
|
|
13940
|
use Time::HiRes qw(time); |
|
20
|
|
|
|
|
25919
|
|
|
20
|
|
|
|
|
105
|
|
15
|
20
|
|
|
20
|
|
12689
|
use RPC::Switch::Client::Tiny::Error; |
|
20
|
|
|
|
|
41
|
|
|
20
|
|
|
|
|
629
|
|
16
|
20
|
|
|
20
|
|
7719
|
use RPC::Switch::Client::Tiny::Netstring; |
|
20
|
|
|
|
|
40
|
|
|
20
|
|
|
|
|
1188
|
|
17
|
20
|
|
|
20
|
|
8005
|
use RPC::Switch::Client::Tiny::Async; |
|
20
|
|
|
|
|
59
|
|
|
20
|
|
|
|
|
704
|
|
18
|
20
|
|
|
20
|
|
8307
|
use RPC::Switch::Client::Tiny::SessionCache; |
|
20
|
|
|
|
|
40
|
|
|
20
|
|
|
|
|
112304
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
our $VERSION = '1.66'; |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
# Construct a client object around an already connected socket.
#
# Named arguments are copied into the object; the internal bookkeeping
# fields listed below always start from their initial values.
#   sock - connected socket handle (required, must be blocking)
#   who  - identity sent with the rpcswitch.hello request (required)
#   auth_method, token, client_encoding_utf8 - optional
#
# Croaks when sock or who is missing, when blocking() fails on the
# socket, or when the socket is in nonblocking mode.
sub new {
    my ($class, %args) = @_;

    my $sock = $args{sock};
    croak __PACKAGE__ . " expects sock" unless $sock;

    # cpantester: strawberry perl does not support blocking() call
    if ($^O ne 'MSWin32') {
        my $is_blocking = $sock->blocking();
        croak __PACKAGE__ . " bad socket: $!" unless defined $is_blocking;
        croak __PACKAGE__ . " nonblocking socket not supported" unless $is_blocking;
    }
    croak __PACKAGE__ . " expects who" unless exists $args{who};

    # Internal state; listed after %args so it wins over caller keys.
    my %internal = (
        id        => 1,      # next request id
        state     => '',     # last rpcswitch.type
        reqs      => {},     # outstanding requests
        channels  => {},     # open rpcswitch channels
        methods   => {},     # defined worker methods
        announced => {},     # announced worker methods
        msglimit  => 999999, # max netstring size
    );
    my $self = bless {%args, %internal}, $class;

    # Pick the default authentication method from the socket class.
    my $is_ssl = ref($self->{sock}) eq 'IO::Socket::SSL';
    if (!exists $self->{auth_method}) {
        $self->{auth_method} = $is_ssl ? 'clientcert' : 'password';
    }
    if ($is_ssl && !exists $self->{token}) {
        $self->{token} = $self->{who}; # should be optional for clientcert
    }

    # Extra JSON options merged into every encode/decode call.
    $self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1};
    return $self;
}
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
# Build an error object; every argument is handed on unchanged to
# RPC::Switch::Client::Tiny::Error->new. Callers in this file pass a
# type tag, a message text and optionally a hash with extra fields.
sub rpc_error {
    my @error_args = @_;
    return RPC::Switch::Client::Tiny::Error->new(@error_args);
}
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
# Encode a message as JSON and write it to the socket as a netstring.
#
# Sets $msg->{jsonrpc} to '2.0' on the passed hash.
# Returns the result of netstring_write(), or nothing (after a warning)
# when the encoded message is longer than $self->{msglimit}.
sub rpc_send {
    my ($self, $msg) = @_;

    $msg->{jsonrpc} = '2.0';
    my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});

    # The limit applies to the encoded payload, so it can only be
    # checked after encoding. (Measuring the socket handle instead
    # would never trip the limit.)
    my $len = length($str);
    if ($self->{msglimit} && ($len > $self->{msglimit})) {
        warn "rpc_send msglimit exceeded: $len > $self->{msglimit}";
        return;
    }

    # Only messages that are actually sent are traced.
    $self->{trace_cb}->('SND', $msg) if $self->{trace_cb};
    return netstring_write($self->{sock}, $str);
}
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
# Send a request and remember it as outstanding.
#
# Assigns the next request id, stores id and method in $msg, records
# the method under that id in $self->{reqs}, and tracks the last
# rpcswitch.* method in $self->{state}.
# Returns the request id (a string), or nothing when rpc_send() failed.
sub rpc_send_req {
    my ($self, $method, $msg) = @_;

    my $id = "$self->{id}";
    $self->{id}++;

    $msg->{id} = $id;
    $msg->{method} = $method;
    $self->{reqs}{$id} = $method;
    $self->{state} = $method if $method =~ /^rpcswitch\./;

    unless ($self->rpc_send($msg)) {
        # Nothing was sent, so no response can ever arrive for this
        # id; do not leave it behind as an outstanding request.
        delete $self->{reqs}{$id};
        return;
    }
    return $id;
}
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
# Send a method call, optionally with request authentication data.
# Returns whatever rpc_send_req() returns.
sub rpc_send_call {
    my ($self, $method, $params, $reqauth) = @_;

    my %request = (params => $params);
    if (defined $reqauth) { # request authentication
        # Without the vcookie the rpcswitch does not validate
        # the reqauth parameter.
        # The vcookie 'eatme' value is hardcoded in the rpc-switch
        # code and is called 'channel information version'.
        #
        $request{rpcswitch} = {vcookie => 'eatme', reqauth => $reqauth};
    }
    return $self->rpc_send_req($method, \%request);
}
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
# Classify a decoded json-rpc message.
#
# Returns the list ($req, $rsp): $req is the method name for an
# incoming request or notification, $rsp is the method name of the
# outstanding request that a response belongs to; the other one is ''.
# A matched response is removed from $self->{reqs}.
#
# Dies with an rpc_error for malformed messages, unknown response ids,
# error responses and responses with a missing or unexpected result.
sub rpc_decode {
    my ($self, $msg) = @_;
    my ($req, $rsp) = ('', '');

    # A valid JSON document need not be an object; reject anything
    # else here instead of failing on the hash lookups below.
    unless (ref($msg) eq 'HASH') {
        die rpc_error('jsonrpc', "bad json-rpc: message is not an object");
    }

    my $version_ok = defined($msg->{jsonrpc}) && ($msg->{jsonrpc} eq '2.0');
    unless ($version_ok && (exists $msg->{id} || exists $msg->{method})) {
        die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1}));
    }
    if (exists $msg->{method}) {
        $req = $msg->{method};
    } elsif (!defined $msg->{id}) {
        die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1}));
    } elsif (!exists $self->{reqs}{$msg->{id}}) {
        die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1}));
    } else {
        $rsp = delete $self->{reqs}{$msg->{id}};

        if (exists $msg->{error}) {
            # Tolerate an error member that is not an object.
            my $error = $msg->{error};
            my ($text, $code) = (ref($error) eq 'HASH')
                ? ($error->{message}, $error->{code})
                : ($error, undef);
            $text = '' unless defined $text;
            die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $text", {code => $code});
        }
        if (!exists $msg->{result}) {
            die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing");
        }
        # Only ping and withdraw responses may carry a non-array result.
        if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) {
            die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}");
        }
    }
    return ($req, $rsp);
}
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
# Send an rpcswitch.announce request for every defined worker method
# that has no announce/withdraw request in flight, and remember the
# request id in the method entry.
# Does nothing while any method is still registered as announced.
# Dies with an 'io' rpc_error when a request could not be sent.
sub rpc_worker_announce {
    my ($self, $workername) = @_;

    # ignore repeated announce request or unfinished withdraw
    #
    if (keys %{$self->{announced}}) {
        return;
    }

    for my $name (keys %{$self->{methods}}) {
        # active announce/withdraw request
        next if exists $self->{methods}{$name}{id};

        my %params = (
            method     => $name,
            workername => $workername,
            doc        => $self->{methods}{$name}{doc},
        );
        if (exists $self->{methods}{$name}{filter}) {
            $params{filter} = $self->{methods}{$name}{filter};
        }

        my $id = $self->rpc_send_req('rpcswitch.announce', {params => \%params});
        defined $id or die rpc_error('io', 'netstring_write');
        $self->{methods}{$name}{id} = $id;
    }
    return;
}
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
# Send an rpcswitch.withdraw request for every announced method that
# has no announce/withdraw request in flight, and remember the request
# id in the method entry.
# Dies with an 'io' rpc_error when a request could not be sent.
sub rpc_worker_withdraw {
    my ($self) = @_;

    # callers will get code -32006 'opposite end of channel gone'
    # errors when the announcement is withdrawn.
    #
    for my $name (keys %{$self->{announced}}) {
        # active announce/withdraw request
        next if exists $self->{methods}{$name}{id};

        my %params = (method => $name);
        if (exists $self->{methods}{$name}{filter}) {
            $params{filter} = $self->{methods}{$name}{filter};
        }

        my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => \%params});
        defined $id or die rpc_error('io', 'netstring_write');
        $self->{methods}{$name}{id} = $id;
    }
    return;
}
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
# Withdraw or re-announce the worker methods depending on the number
# of running plus queued async jobs, when $self->{flowcontrol} is set:
# at twice max_async or more the methods are withdrawn, below
# max_async they are announced again.
sub rpc_worker_flowcontrol {
    my ($self, $workername) = @_;

    # need to be in connected auth state
    if (!$self->{state} || ($self->{state} eq 'rpcswitch.hello')) {
        return;
    }
    return unless $self->{flowcontrol};

    my $running = scalar keys %{$self->{async}{jobs}};
    my $queued  = scalar @{$self->{async}{jobqueue}};
    my $load    = $running + $queued;
    #printf ">> flow: %d %d %d\n", $load, $self->{async}{max_async} * 2, $self->{async}{max_async};

    if ($load >= $self->{async}{max_async} * 2) {
        $self->rpc_worker_withdraw();
    }
    elsif ($load < $self->{async}{max_async}) {
        $self->rpc_worker_announce($workername);
    }
    return;
}
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
# Normalize a worker error: a plain (non-reference) error becomes
# {text => $err}, and a missing 'class' field is set to 'hard'.
# A passed hash is modified in place and returned.
sub valid_worker_err {
    my ($err) = @_;

    if (!ref($err)) {
        $err = {text => $err}; # convert plain errors
    }
    if (!exists $err->{class}) {
        $err->{class} = 'hard';
    }
    return $err;
}
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
# Build the rpcswitch member for a response from the rpcswitch member
# of the request: a new hash with just the vcookie and vci fields.
sub rpcswitch_resp {
    my ($rpcswitch) = @_;

    # Just the vcookie & vci-channel parameters are required by
    # the rpcswitch. The worker_id field is optional, and might
    # be set to the worker_id returned by the announce response.
    #
    my %resp = (
        vcookie => $rpcswitch->{vcookie},
        vci     => $rpcswitch->{vci},
        #worker_id => $rpcswitch->{worker_id},
    );
    return \%resp;
}
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
# Client side message handler: process one decoded message while
# waiting for the result of a call to $method.
#
# Returns an array reference with the call result once it is complete
# (for a ping: the ping result wrapped in an array), and nothing while
# more messages are needed. Dies with an rpc_error on worker errors,
# failed authentication, a gone channel or unsupported messages.
sub client {
    my ($self, $msg, $method, $params, $reqauth) = @_;
    my ($req, $rsp) = $self->rpc_decode($msg);

    # Greeting from the rpcswitch: answer with the hello request.
    if ($req eq 'rpcswitch.greetings') {
        my %hello = (who => $self->{who});
        $hello{token} = $self->{token} if $self->{token}; # should be optional for clientcert
        $hello{method} = $self->{auth_method};
        $self->rpc_send_req('rpcswitch.hello', {params => \%hello});
        return;
    }

    # Authenticated: now the actual call can be sent.
    if ($rsp eq 'rpcswitch.hello') {
        unless ($msg->{result}[0]) {
            die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
        }
        $self->rpc_send_call($method, $params, $reqauth);
        return;
    }

    if ($rsp eq 'rpcswitch.ping') {
        return [$msg->{result}]; # ping complete
    }

    # Direct response to the call.
    if ($rsp eq $method) {
        if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel
            $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone
        }
        my $status = $msg->{result}[0];
        if ($status eq 'RES_OK') {
            my @values = @{$msg->{result}};
            shift @values;
            return \@values; # client result[1..$]
        }
        if ($status eq 'RES_ERROR') { # worker error
            my $e = valid_worker_err($msg->{result}[1]);
            die rpc_error('worker', to_json($e), $e);
        }
        if ($status eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump)
            $self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id
        }
        return;
    }

    # Deferred result of an async worker.
    if ($req eq 'rpcswitch.result') {
        my $channel = $msg->{rpcswitch}{vci};
        my $status = $msg->{params}[0];
        if (($status eq 'RES_OK') || ($status eq 'RES_ERROR')) {
            # the result has to belong to the job id stored for the channel
            if ($msg->{params}[1] eq $self->{channels}{$channel}) {
                if ($status eq 'RES_OK') {
                    my @values = @{$msg->{params}};
                    splice(@values, 0, 2);
                    return \@values; # client result[2..$] (notification)
                }
                my $e = valid_worker_err($msg->{params}[2]);
                die rpc_error('worker', to_json($e), $e);
            }
        }
        die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]");
    }

    if ($req eq 'rpcswitch.channel_gone') {
        my $channel = $msg->{params}{channel};
        unless (exists $self->{channels}{$channel}) {
            die rpc_error('rpcswitch', "$req for unknown request: $channel");
        }
        my $id = delete $self->{channels}{$channel};
        die rpc_error('rpcswitch', "$req for request $id: $channel");
    }

    die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1}));
}
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
# Worker side message handler: process one decoded message.
#
# Handles the greeting/hello handshake, announce and withdraw
# responses, ping requests, calls of announced methods (queued for a
# child when $self->{async} is set, otherwise run directly) and
# channel_gone notifications. Always returns nothing; dies with an
# rpc_error on failed or unknown rpcswitch responses.
sub worker {
    my ($self, $msg, $workername) = @_;
    my ($req, $rsp) = $self->rpc_decode($msg);

    # Name of the method whose pending announce/withdraw request id
    # matches the id of this response (undef if there is none).
    my $pending_method = sub {
        my ($name) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};
        return $name;
    };

    if ($req eq 'rpcswitch.greetings') {
        my %hello = (who => $self->{who});
        $hello{token} = $self->{token} if $self->{token}; # should be optional for clientcert
        $hello{method} = $self->{auth_method};
        $self->rpc_send_req('rpcswitch.hello', {params => \%hello});
        return;
    }

    if ($rsp eq 'rpcswitch.hello') {
        unless ($msg->{result}[0]) {
            die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
        }
        $self->rpc_worker_announce($workername);
        return;
    }

    if ($rsp eq 'rpcswitch.announce') {
        unless ($msg->{result}[0]) {
            die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
        }
        my $method = $pending_method->();
        unless (defined $method) {
            die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]");
        }
        # register announced method
        $self->{announced}{$method}{cb} = $self->{methods}{$method}{cb};
        $self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id};
        delete $self->{methods}{$method}{id};
        return;
    }

    if ($req eq 'rpcswitch.ping') {
        $self->rpc_send({id => $msg->{id}, result => 'pong!'});
        return;
    }

    if (exists $self->{announced}{$req}) {
        $msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response

        $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone

        if ($self->{async}) { # use async call for forked childs
            $self->{async}->msg_enqueue($msg);
            return;
        }

        # No async handling: run the callback right here.
        my $rpcswitch = rpcswitch_resp($msg->{rpcswitch});
        my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) };
        my $result = $@ ? ['RES_ERROR', $@] : ['RES_OK', @resp];
        $self->rpc_send({id => $msg->{id}, result => $result, rpcswitch => $rpcswitch});
        return;
    }

    if ($rsp eq 'rpcswitch.withdraw') {
        # Note: the rpcswitch sends just a boolean result here
        #
        unless ($msg->{result}) {
            die rpc_error('rpcswitch', "$rsp failed: $msg->{result}");
        }
        my $method = $pending_method->();
        unless (defined $method) {
            die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}");
        }
        # remove announced method
        delete $self->{announced}{$method};
        delete $self->{methods}{$method}{id};
        return;
    }

    if ($req eq 'rpcswitch.channel_gone') {
        my $channel = $msg->{params}{channel};
        if ($self->{async}) {
            my (undef, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel });
            if (@$msgs) {
                warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs);
            }
        }
        if (exists $self->{channels}{$channel}) {
            delete $self->{channels}{$channel};
        } else {
            warn "worker $req for unknown request: $channel";
        }
        return;
    }

    warn "worker unsupported msg: ".to_json($msg, {canonical => 1});
    return;
}
320
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
# Return the session hash of a request when session handling is
# enabled ($self->{sessioncache} is set) and the request parameters
# are a hash with a 'session' hash that has an 'id' field.
# Returns nothing otherwise.
sub is_session_req {
    my ($self, $params) = @_;
    return unless $self->{sessioncache};

    # Request parameters come from the peer and may be an array or a
    # plain value; only look inside real hashes.
    return unless ref($params) eq 'HASH';
    my $session = $params->{session};
    return unless (ref($session) eq 'HASH') && exists $session->{id};

    return $session;
}
330
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
# Return the 'set_session' value of a result when session handling is
# enabled ($self->{sessioncache} is set) and the result has the form
# ['RES_OK', <id>, {set_session => ...}, ...].
# Returns nothing otherwise.
sub is_session_resp {
    my ($self, $params) = @_;
    return unless $self->{sessioncache};

    return unless ref($params) eq 'ARRAY';
    my ($status, $first) = @{$params}[0, 2];
    return unless defined($status) && ($status eq 'RES_OK');

    # The first result value is produced by the worker callback and
    # may be any reference; only a hash can carry set_session.
    return unless (ref($first) eq 'HASH') && exists $first->{set_session};

    return $first->{set_session};
}
340
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
# Request loop of a worker child process.
#
# Closes the inherited rpcswitch socket, then reads requests from $wr,
# runs the matching method callback and writes the result back to $wr.
# Without session handling one request is processed; a session request
# or response keeps the child reading further requests.
#
# Returns 0 on success, 1 when the result could not be json encoded,
# 2 when the result could not be written, 3 when closing $wr failed.
# Dies on read or decode errors of a request.
sub child_handler {
    my ($self, $wr) = @_;

    # The child has to explicitly close the ssl-socket without shutdown.
    # Otherwise the parent will get an EOF.
    # see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors
    #
    if (ref($self->{sock}) eq 'IO::Socket::SSL') {
        $self->{sock}->close(SSL_no_shutdown => 1);
    } else {
        close($self->{sock});
    }
    # From here on all traffic goes over the handle to the parent.
    $self->{sock} = $wr;
    delete $self->{trace_cb};
    local $SIG{INT} = 'DEFAULT';
    local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result

    # When session handling is enabled a child might process
    # more than one request with the same session_id.
    #
    while (1) {
        my $frame = eval { netstring_read($self->{sock}) };
        if (!$frame) {
            last unless $@;         # EOF
            next if $@ =~ /^EINTR/; # interrupted
            die "worker child: $@";
        }
        my $msg = eval { from_json($frame, {%{$self->{json_utf8}}}) };
        die "worker child: $@" if $@;

        # The client catches all possible die() calls, so that it is
        # guaranteed to call exit either from here or from a signal handler.
        #
        my $callback = $self->{methods}{$msg->{method}}{cb};
        my @resp;
        eval {
            local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code
            @resp = $callback->($msg->{params}, $msg->{rpcswitch});
        };
        my $err = $@;
        my $params = $err
            ? ['RES_ERROR', $msg->{id}, $err]
            : ['RES_OK', $msg->{id}, @resp];

        my $encoded = eval { to_json($params, {%{$self->{json_utf8}}}) };
        return 1 if $@; # signal die from json encode
        netstring_write($self->{sock}, $encoded)
            or return 2; # signal socket error

        my $keep_going = $self->is_session_resp($params) || $self->is_session_req($msg->{params});
        last unless $keep_going;
    }
    close($self->{sock}) or return 3; # signal errors like broken pipe
    return 0;
}
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
# Encode a request and write it as a netstring to the child's handle
# ($child->{reader}). Dies with an 'io' rpc_error when the write fails.
sub _worker_child_write {
    my ($self, $child, $msg) = @_;

    my %json_opts = (canonical => 1, %{$self->{json_utf8}});
    my $payload = to_json($msg, \%json_opts);

    # forward request to worker child
    my $written = netstring_write($child->{reader}, $payload);
    $written or die rpc_error('io', 'netstring_write');
    return;
}
405
|
|
|
|
|
|
|
|
406
|
|
|
|
|
|
|
# Return the child that will process a request: a cached session child
# when session handling is enabled and one matches, otherwise a newly
# started child.
sub _worker_child_get {
    my ($self, $msg) = @_;

    # First try to reuse child for existing session
    #
    my $cache = $self->{sessioncache};
    if ($cache) {
        my $session_req = $self->is_session_req($msg->{params});
        if ($session_req) {
            my $child = $cache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci});
            return $child if $child;
        }
        elsif ($cache->{session_persist_user} && exists $msg->{params}{$cache->{session_persist_user}}) {
            # No session in the request: look for a child of the same user.
            my $user = $msg->{params}{$cache->{session_persist_user}};
            my $child = $cache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci});
            if ($child) {
                delete $child->{session}; # reused session will be added after session_resp
                return $child;
            }
        }
    }

    my $started = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci});
    return $started;
}
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
# Hand every dequeued request to a worker child.
#
# For each message a child is obtained and the request is forwarded to
# it; the caller then gets a RES_WAIT response and the job is
# registered. When getting the child or forwarding fails, a RES_ERROR
# response with the error is sent instead.
sub _worker_childs_dequeue_and_run {
    my ($self) = @_;

    while (my $msg = $self->{async}->msg_dequeue()) {
        my $id = $msg->{id};
        my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch});

        my $child = eval { $self->_worker_child_get($msg) };
        if ($@) {
            $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
            next;
        }

        eval { $self->_worker_child_write($child, $msg) };
        if ($@) {
            $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
            $self->{async}->child_finish($child, 'error');
            next;
        }

        # The request is on its way; the result follows asynchronously.
        $self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp});
        $self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp});
    }
    return;
}
453
|
|
|
|
|
|
|
|
454
|
|
|
|
|
|
|
# Read the result of a worker child, forward it to the rpcswitch as an
# rpcswitch.result message and finish the child or keep it for its
# session.
#
# Read errors, EOF, undecodable results and results rejected by
# rpc_send() are reported as RES_ERROR and finish the child with
# 'error'. Returns the result of the last rpc_send() call.
sub _worker_child_read_and_finish {
    my ($self, $child) = @_;

    my $raw = eval { netstring_read($child->{reader}) };
    if (!$raw) {
        my $err = $@ ? $@ : 'EOF';
        my $sent = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
        $self->{async}->child_finish($child, 'error');
        return $sent;
    }

    my $params = eval { from_json($raw, {%{$self->{json_utf8}}}) };
    if ($@) {
        my $sent = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $@], rpcswitch => $child->{rpcswitch}});
        $self->{async}->child_finish($child, 'error');
        return $sent;
    }

    my $res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}});
    if (!$res) {
        my $err = "result msg limit exceeded: " . length($raw);
        $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
        $self->{async}->child_finish($child, 'error');
        return $res;
    }

    # The child is finished below unless the session cache takes it.
    my $to_finish = $child;
    my $cache = $self->{sessioncache};
    if ($cache) {
        my $set_session = $self->is_session_resp($params);
        if ($set_session) {
            my $is_new = !exists $child->{session} || ($child->{session}{id} ne $set_session->{id});
            if ($is_new) {
                $child->{session} = $cache->session_new($set_session);
                $cache->expire_insert($child->{session});
            }
        }

        if ($cache->session_put($child)) {
            $to_finish = undef;
            my $cnt = scalar keys %{$cache->{active}};
            if ($cnt > $cache->{max_session}) {
                my $evicted = $cache->lru_dequeue();
                $self->{async}->child_finish($evicted, 'lru') if $evicted;
            }
        }
        elsif (my $idle_child = $cache->session_get_per_user_idle($child)) {
            # update idle user session with older session_id
            #
            $self->{async}->child_finish($idle_child, 'update');

            $to_finish = undef if $cache->session_put($child);
        }
    }
    $self->{async}->child_finish($to_finish, 'done') if $to_finish;
    return $res;
}
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
# Finish all children whose cached session expired. Does nothing when
# session handling is not enabled.
sub _worker_sessions_expire {
    my ($self) = @_;
    my $cache = $self->{sessioncache} or return;

    # If a job for the expired session is active, the session
    # will be dropped when session_put() is called after the
    # job completed.
    #
    while (my $expired = $cache->expired_dequeue()) {
        $self->{async}->child_finish($expired, 'expired');
    }
    return;
}
524
|
|
|
|
|
|
|
|
525
|
|
|
|
|
|
|
# Select the receive timeout: the timeout of the individual call while
# requests are outstanding, otherwise the general $self->{timeout}.
sub rpc_timeout {
    my ($self, $call_timeout) = @_;

    return $self->{timeout} unless $call_timeout;

    my $outstanding = scalar keys %{$self->{reqs}};
    if ($outstanding > 0) {
        return $call_timeout; # for individual client call
    }
    return $self->{timeout};
}
533
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
# Tell whether the message loop has to stop.
#
# Returns 1 when $self->{stop} is set, except for the 'withdraw' stop
# mode while methods are still announced or async jobs are still
# active. Returns nothing (false) in every other case.
sub rpc_stopped {
    my ($self) = @_;

    # Explicit false result instead of falling off the end of the sub.
    return unless $self->{stop};

    if ($self->{stop} eq 'withdraw') {
        # wait for withdraw to complete
        return if keys %{$self->{announced}};
        # wait for active jobs to complete
        return if $self->{async} && (keys %{$self->{async}{jobs}});
    }
    return 1;
}
546
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
# Receive loop used by call() and work().
#
# Parameters:
#   $call_timeout   - optional per-call timeout, passed on to rpc_timeout()
#   $handler        - code ref invoked as $handler->($self, $msg, @handler_params)
#   @handler_params - extra arguments forwarded to the handler and to
#                     rpc_worker_flowcontrol()
#
# Returns the first true value returned by $handler.
# Returns nothing when the remote side cleanly closed the connection
# with EOF, or when rpc_stopped() reports a stop (checked by the caller).
# Throws an rpc_error on receive timeout, io or json decoding errors;
# reference-type errors raised by the handler are forwarded unchanged.
#
sub rpc_handler {
    my ($self, $call_timeout, $handler, @handler_params) = @_;

    while (!$self->rpc_stopped()) {
        my @pipes = ();
        if ($self->{async}) {
            $self->_worker_sessions_expire();
            $self->_worker_childs_dequeue_and_run() unless $self->{stop};
            $self->{async}->childs_reap(nonblock => 1);
            $self->rpc_worker_flowcontrol(@handler_params);
            @pipes = map { $_->{reader} } values %{$self->{async}{jobs}};
        }
        my $timeout = $self->rpc_timeout($call_timeout);

        if ($timeout || @pipes) {
            my $select = IO::Select->new(($self->{sock}, @pipes));

            # $! is not reset on success, so clear it here: otherwise a
            # stale EINTR from an earlier call would turn a real timeout
            # into an endless retry.
            $! = 0;
            my @ready = $select->can_read($timeout);
            next if (@ready == 0) && $!{EINTR}; # interrupted: check if stopped
            die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0);

            # collect results from child pipes that became readable
            foreach my $fh (@ready) {
                next unless ($fh != $self->{sock}) && $self->{async};

                unless (exists $self->{async}{jobs}{$fh->fileno}) {
                    die rpc_error('io', "child pipe not found: ". $fh->fileno);
                }
                my $child = $self->{async}{jobs}{$fh->fileno};
                $self->{async}->job_rem($child);
                $self->_worker_child_read_and_finish($child);
            }
            next unless grep { $_ == $self->{sock} } @ready;
        }

        # always block on full messages from rpcswitch
        my $frame = eval { netstring_read($self->{sock}) };
        unless ($frame) {
            next if ($@ && ($@ =~ /^EINTR/)); # check if stopped
            die rpc_error('io', $@) if $@;
            return; # EOF
        }
        my $msg = eval { from_json($frame, {%{$self->{json_utf8}}}) };
        die rpc_error('jsonrpc', $@) if $@;
        $self->{trace_cb}->('RCV', $msg) if $self->{trace_cb};

        my $res = eval { $handler->($self, $msg, @handler_params) };
        if (my $err = $@) {
            die $err if ref($err); # forward error
            die rpc_error('io', $err);
        }
        return $res if $res;
    }
    return; # STOP is checked by caller
}
603
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
# Run the worker loop until the connection is closed or stop() is requested.
#
# Parameters:
#   $workername - forwarded to the worker handler via rpc_handler()
#   $methods    - hash ref: method name => { cb => ..., doc => ..., filter => ... }
#   $opts       - optional hash ref; max_async, flowcontrol and max_session
#                 are evaluated here, the complete set is passed to the
#                 Async and SessionCache constructors. The caller's hash
#                 is not modified.
#
# Returns nothing on a clean EOF.
# Throws rpc_error('io', 'STOP') after a requested stop, and an 'io'
# rpc_error if EOF was seen while jobs were still active or queued.
#
sub work {
    my ($self, $workername, $methods, $opts) = @_;

    # a write on a shutdown socket should never happen
    #
    local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" };

    foreach my $method (keys %$methods) {
        $self->{methods}{$method}{cb} = $methods->{$method}{cb};
        $self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {};
        $self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter};
    }

    # work on a private copy so that the caller's options stay untouched
    #
    my %opts = $opts ? %$opts : ();
    $opts{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb};
    $self->{async} = RPC::Switch::Client::Tiny::Async->new(%opts) if $opts{max_async};
    $self->{flowcontrol} = $opts{flowcontrol} if $opts{flowcontrol};
    $self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%opts) if $opts{max_session};
    $self->rpc_handler(0, \&worker, $workername);

    if ($self->{stop}) {
        $self->rpc_worker_withdraw();
        $self->{stop} = 'withdraw';

        # wait some time for withdraw & active jobs to complete
        #
        local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; };
        alarm($self->{gracetime});

        # always disarm the alarm, even if the handler dies: the local
        # ALRM handler is gone after unwinding and a pending SIGALRM
        # would hit the previous (possibly default) disposition.
        my $ok = eval { $self->rpc_handler(0, \&worker, $workername); 1 };
        my $err = $@;
        alarm(0);
        die $err unless $ok;
        die rpc_error('io', 'STOP');
    }
    if (my $async = $self->{async}) {
        # drop stored sessions
        #
        if (my $sessioncache = $self->{sessioncache}) {
            foreach my $session_id (keys %{$sessioncache->{active}}) {
                if (my $child = $sessioncache->session_get($session_id)) {
                    $async->child_finish($child, 'idle');
                }
            }
        }
        # reap remaining children
        #
        if (keys %{$async->{finished}}) {
            local $SIG{ALRM} = sub { warn "worker child wait timeout\n" };
            alarm(1); # wait at most for one second
            my $ok = eval {
                unless ($async->childs_reap()) { # blocking
                    $async->childs_reap(nonblock => 1); # continue nonblocking after timeout
                }
                1;
            };
            my $err = $@;
            alarm(0); # disarm before a possible rethrow
            die $err unless $ok;
        }

        # EOF is only an error here when there are outstanding requests
        #
        my (undef, $msgs) = $async->jobs_terminate('stopped', sub { 1 });
        my @childs = keys %{$async->{finished}};

        $async->childs_kill(); # don't wait here

        $async->{jobqueue} = [];
        $async->{finished} = {};

        die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs);
        die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs);
    }
    return;
}
670
|
|
|
|
|
|
|
|
671
|
|
|
|
|
|
|
# Issue a single rpc call and wait for its response.
#
# Parameters:
#   $method, $params - forwarded to rpc_send_call() and to the client handler
#   $opts            - optional hash ref; {reqauth} and {timeout} are used
#
# Returns the elements of the response array ref in list context and
# its first element in scalar context.
# Throws rpc_error('io', 'eof') when rpc_handler() returns no response.
#
sub call {
    my ($self, $method, $params, $opts) = @_;
    my $call_timeout = $opts->{timeout};
    my $reqauth = $opts->{reqauth};

    # trigger rpc_send for consecutive requests
    $self->rpc_send_call($method, $params, $reqauth)
        if $self->{state} eq 'rpcswitch.hello';

    # EOF is an error here (response missing)
    #
    my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth);
    die rpc_error('io', 'eof') unless $res;

    return $res->[0] unless wantarray();
    return @$res;
}
684
|
|
|
|
|
|
|
|
685
|
|
|
|
|
|
|
# stop() exits an active $client->work() worker handler. |
686
|
|
|
|
|
|
|
# |
687
|
|
|
|
|
|
|
# - work() dies with RPC::Switch::Client::Tiny::Error which |
688
|
|
|
|
|
|
|
# might be {type => 'io', message => 'STOP'}, or any other |
689
|
|
|
|
|
|
|
# error if a non-restartable system call was interrupted. |
690
|
|
|
|
|
|
|
# |
691
|
|
|
|
|
|
|
# - stop() makes no sense for call() (it has rpc_timeout) |
692
|
|
|
|
|
|
|
# - the only way to call stop is from a signal handler. |
693
|
|
|
|
|
|
|
# - if a signal handler is called, non-restartable perl |
694
|
|
|
|
|
|
|
# system calls are interrupted and return $! == EINTR. |
695
|
|
|
|
|
|
|
# |
696
|
|
|
|
|
|
|
# -> this can break an active worker handler and result in |
697
|
|
|
|
|
|
|
# a RES_ERROR-message to the caller if a non-restartable |
698
|
|
|
|
|
|
|
# perl syscall is interrupted. |
699
|
|
|
|
|
|
|
# -> for the async worker mode this should mostly work. |
700
|
|
|
|
|
|
|
# (sysreadfull, rpc print & IO::Select are restartable). |
701
|
|
|
|
|
|
|
# -> stop will just wait for a gracetime of 2 seconds |
702
|
|
|
|
|
|
|
# for active jobs to complete. The remaining jobs |
703
|
|
|
|
|
|
|
# are terminated. |
704
|
|
|
|
|
|
|
# |
705
|
|
|
|
|
|
|
# Request termination of an active work() loop.
#
# Stores the gracetime ($opts->{gracetime} when true, otherwise 2) in
# $self->{gracetime} and sets $self->{stop} to 'pending'.
# Returns nothing.
#
sub stop {
    my ($self, $opts) = @_;

    $self->{gracetime} = $opts->{gracetime} || 2;
    $self->{stop} = 'pending';

    return;
}
711
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
# The perl object destroy order is undefined, so $self->{sock} |
713
|
|
|
|
|
|
|
# might already be destroyed and it makes no sense to try to |
714
|
|
|
|
|
|
|
# send RES_ERROR messages for remaining childs. |
715
|
|
|
|
|
|
|
# see: https://perldoc.perl.org/perlobj#Global-Destruction |
716
|
|
|
|
|
|
|
# |
717
|
|
|
|
|
|
|
# So just terminate remaining childs, and let init-process |
718
|
|
|
|
|
|
|
# collect them instead of calling waitpid() here. |
719
|
|
|
|
|
|
|
# |
720
|
|
|
|
|
|
|
# TODO: perl will call DESTROY only when the process exits |
721
|
|
|
|
|
|
|
# cleanly or calls exit(). If the process is killed, perl |
722
|
|
|
|
|
|
|
# calls DESTROY only if a handler for the matching signal |
723
|
|
|
|
|
|
|
# is installed, like: $SIG{'TERM'} = sub { exit; }; |
724
|
|
|
|
|
|
|
# |
725
|
|
|
|
|
|
|
# -> so does it make sense to support DESTROY at all, |
726
|
|
|
|
|
|
|
# if there are situations when it is not called? |
727
|
|
|
|
|
|
|
# |
728
|
|
|
|
|
|
|
# Destructor: calls childs_kill() on the async handler, if one is set.
#
sub DESTROY {
    my ($self) = @_;

    # A destructor can run at any time (e.g. while an exception is
    # unwinding), so keep the global status variables of the
    # surrounding code intact.
    local ($., $@, $!, $^E, $?);

    $self->{async}->childs_kill() if $self->{async}; # don't wait here
    return;
}
732
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
1; |
734
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
__END__ |