| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
# Lightweight client for the RPC-Switch json-rpc request multiplexer |
|
2
|
|
|
|
|
|
|
# |
|
3
|
|
|
|
|
|
|
# see: RPC::Switch: https://github.com/a6502/rpc-switch |
|
4
|
|
|
|
|
|
|
# RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client |
|
5
|
|
|
|
|
|
|
# |
|
6
|
|
|
|
|
|
|
package RPC::Switch::Client::Tiny; |
|
7
|
|
|
|
|
|
|
|
|
8
|
20
|
|
|
20
|
|
2165687
|
use strict; |
|
|
20
|
|
|
|
|
217
|
|
|
|
20
|
|
|
|
|
561
|
|
|
9
|
20
|
|
|
20
|
|
100
|
use warnings; |
|
|
20
|
|
|
|
|
41
|
|
|
|
20
|
|
|
|
|
520
|
|
|
10
|
20
|
|
|
20
|
|
99
|
use Carp 'croak'; |
|
|
20
|
|
|
|
|
40
|
|
|
|
20
|
|
|
|
|
999
|
|
|
11
|
20
|
|
|
20
|
|
858
|
use JSON; |
|
|
20
|
|
|
|
|
8171
|
|
|
|
20
|
|
|
|
|
100
|
|
|
12
|
20
|
|
|
20
|
|
11581
|
use IO::Select; |
|
|
20
|
|
|
|
|
35863
|
|
|
|
20
|
|
|
|
|
1567
|
|
|
13
|
20
|
|
|
20
|
|
16800
|
use IO::Socket::SSL; |
|
|
20
|
|
|
|
|
1259498
|
|
|
|
20
|
|
|
|
|
179
|
|
|
14
|
20
|
|
|
20
|
|
13940
|
use Time::HiRes qw(time); |
|
|
20
|
|
|
|
|
25919
|
|
|
|
20
|
|
|
|
|
105
|
|
|
15
|
20
|
|
|
20
|
|
12689
|
use RPC::Switch::Client::Tiny::Error; |
|
|
20
|
|
|
|
|
41
|
|
|
|
20
|
|
|
|
|
629
|
|
|
16
|
20
|
|
|
20
|
|
7719
|
use RPC::Switch::Client::Tiny::Netstring; |
|
|
20
|
|
|
|
|
40
|
|
|
|
20
|
|
|
|
|
1188
|
|
|
17
|
20
|
|
|
20
|
|
8005
|
use RPC::Switch::Client::Tiny::Async; |
|
|
20
|
|
|
|
|
59
|
|
|
|
20
|
|
|
|
|
704
|
|
|
18
|
20
|
|
|
20
|
|
8307
|
use RPC::Switch::Client::Tiny::SessionCache; |
|
|
20
|
|
|
|
|
40
|
|
|
|
20
|
|
|
|
|
112304
|
|
|
19
|
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
our $VERSION = '1.66'; |
|
21
|
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
sub new { |
|
23
|
234
|
|
|
234
|
1
|
421262
|
my ($class, %args) = @_; |
|
24
|
234
|
50
|
|
|
|
1822
|
my $s = $args{sock} or croak __PACKAGE__ . " expects sock"; |
|
25
|
234
|
50
|
|
|
|
1313
|
unless ($^O eq 'MSWin32') { # cpantester: strawberry perl does not support blocking() call |
|
26
|
234
|
50
|
|
|
|
1333
|
defined(my $b = $s->blocking()) or croak __PACKAGE__ . " bad socket: $!"; |
|
27
|
234
|
100
|
|
|
|
3134
|
unless ($b) { croak __PACKAGE__ . " nonblocking socket not supported"; } |
|
|
19
|
|
|
|
|
3857
|
|
|
28
|
|
|
|
|
|
|
} |
|
29
|
215
|
50
|
|
|
|
646
|
unless (exists $args{who}) { croak __PACKAGE__ . " expects who"; } |
|
|
0
|
|
|
|
|
0
|
|
|
30
|
215
|
|
|
|
|
3875
|
my $self = bless { |
|
31
|
|
|
|
|
|
|
%args, |
|
32
|
|
|
|
|
|
|
id => 1, # next request id |
|
33
|
|
|
|
|
|
|
state => '', # last rpcswitch.type |
|
34
|
|
|
|
|
|
|
reqs => {}, # outstanding requests |
|
35
|
|
|
|
|
|
|
channels => {}, # open rpcswitch channels |
|
36
|
|
|
|
|
|
|
methods => {}, # defined worker methods |
|
37
|
|
|
|
|
|
|
announced => {}, # announced worker methods |
|
38
|
|
|
|
|
|
|
msglimit => 999999, # max netstring size |
|
39
|
|
|
|
|
|
|
}, $class; |
|
40
|
215
|
50
|
|
|
|
1580
|
if (ref($self->{sock}) eq 'IO::Socket::SSL') { |
|
41
|
0
|
0
|
|
|
|
0
|
$self->{auth_method} = 'clientcert' unless exists $self->{auth_method}; |
|
42
|
0
|
0
|
|
|
|
0
|
$self->{token} = $self->{who} unless exists $self->{token}; # should be optional for clientcert |
|
43
|
|
|
|
|
|
|
} else { |
|
44
|
215
|
50
|
|
|
|
1085
|
$self->{auth_method} = 'password' unless exists $self->{auth_method}; |
|
45
|
|
|
|
|
|
|
} |
|
46
|
215
|
100
|
|
|
|
1120
|
$self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1}; |
|
47
|
215
|
|
|
|
|
1050
|
return $self; |
|
48
|
|
|
|
|
|
|
} |
|
49
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
sub rpc_error { |
|
51
|
76
|
|
|
76
|
0
|
1349
|
return RPC::Switch::Client::Tiny::Error->new(@_); |
|
52
|
|
|
|
|
|
|
} |
|
53
|
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
sub rpc_send { |
|
55
|
864
|
|
|
864
|
0
|
3545
|
my ($self, $msg) = @_; |
|
56
|
864
|
|
|
|
|
3627
|
my $s = $self->{sock}; |
|
57
|
864
|
|
|
|
|
3738
|
my $len = length($s); |
|
58
|
864
|
50
|
33
|
|
|
9832
|
if ($self->{msglimit} && ($len > $self->{msglimit})) { |
|
59
|
0
|
|
|
|
|
0
|
warn "rpc_send msglimit exceeded: $len > $self->{msglimit}"; |
|
60
|
0
|
|
|
|
|
0
|
return; |
|
61
|
|
|
|
|
|
|
} |
|
62
|
864
|
|
|
|
|
4349
|
$msg->{jsonrpc} = '2.0'; |
|
63
|
864
|
|
|
|
|
1592
|
my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}}); |
|
|
864
|
|
|
|
|
5326
|
|
|
64
|
864
|
100
|
|
|
|
31099
|
$self->{trace_cb}->('SND', $msg) if $self->{trace_cb}; |
|
65
|
864
|
|
|
|
|
12320
|
return netstring_write($s, $str); |
|
66
|
|
|
|
|
|
|
} |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
sub rpc_send_req { |
|
69
|
472
|
|
|
472
|
0
|
1593
|
my ($self, $method, $msg) = @_; |
|
70
|
472
|
|
|
|
|
1902
|
my $id = "$self->{id}"; $self->{id}++; |
|
|
472
|
|
|
|
|
982
|
|
|
71
|
472
|
|
|
|
|
1142
|
$msg->{id} = $id; |
|
72
|
472
|
|
|
|
|
880
|
$msg->{method} = $method; |
|
73
|
472
|
|
|
|
|
2619
|
$self->{reqs}{$id} = $method; |
|
74
|
472
|
100
|
|
|
|
4318
|
$self->{state} = $method if $method =~ /^rpcswitch\./; |
|
75
|
472
|
100
|
|
|
|
1559
|
$self->rpc_send($msg) or return; |
|
76
|
467
|
|
|
|
|
1452
|
return $id; |
|
77
|
|
|
|
|
|
|
} |
|
78
|
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
sub rpc_send_call { |
|
80
|
176
|
|
|
176
|
0
|
1345
|
my ($self, $method, $params, $reqauth) = @_; |
|
81
|
|
|
|
|
|
|
|
|
82
|
176
|
50
|
|
|
|
557
|
if (defined $reqauth) { # request authentication |
|
83
|
|
|
|
|
|
|
# Without the vcookie the rpcswitch does not validate |
|
84
|
|
|
|
|
|
|
# the reqauth parameter. |
|
85
|
|
|
|
|
|
|
# The vcookie 'eatme' value is hardcoded in the rpc-switch |
|
86
|
|
|
|
|
|
|
# code and is called 'channel information version'. |
|
87
|
|
|
|
|
|
|
# |
|
88
|
0
|
|
|
|
|
0
|
return $self->rpc_send_req($method, {params => $params, rpcswitch => {vcookie => 'eatme', reqauth => $reqauth}}); |
|
89
|
|
|
|
|
|
|
} else { |
|
90
|
176
|
|
|
|
|
752
|
return $self->rpc_send_req($method, {params => $params}); |
|
91
|
|
|
|
|
|
|
} |
|
92
|
|
|
|
|
|
|
} |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
sub rpc_decode { |
|
95
|
849
|
|
|
849
|
0
|
2070
|
my ($self, $msg) = @_; |
|
96
|
849
|
|
|
|
|
4744
|
my ($req, $rsp) = ('', ''); |
|
97
|
|
|
|
|
|
|
|
|
98
|
849
|
50
|
66
|
|
|
6762
|
unless (($msg->{jsonrpc} eq '2.0') && (exists $msg->{id} || exists $msg->{method})) { |
|
|
|
|
66
|
|
|
|
|
|
99
|
0
|
|
|
|
|
0
|
die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1})); |
|
100
|
|
|
|
|
|
|
} |
|
101
|
849
|
100
|
|
|
|
5430
|
if (exists $msg->{method}) { |
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
102
|
420
|
|
|
|
|
1060
|
$req = $msg->{method}; |
|
103
|
|
|
|
|
|
|
} elsif (!defined $msg->{id}) { |
|
104
|
0
|
|
|
|
|
0
|
die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1})); |
|
105
|
|
|
|
|
|
|
} elsif (!exists $self->{reqs}{$msg->{id}}) { |
|
106
|
0
|
|
|
|
|
0
|
die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1})); |
|
107
|
|
|
|
|
|
|
} else { |
|
108
|
429
|
|
|
|
|
1403
|
$rsp = delete $self->{reqs}{$msg->{id}}; |
|
109
|
|
|
|
|
|
|
|
|
110
|
429
|
50
|
|
|
|
1567
|
if (exists $msg->{error}) { |
|
111
|
0
|
|
|
|
|
0
|
die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $msg->{error}{message}", {code => $msg->{error}{code}}); |
|
112
|
|
|
|
|
|
|
} |
|
113
|
429
|
50
|
|
|
|
1124
|
if (!exists $msg->{result}) { |
|
114
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing"); |
|
115
|
|
|
|
|
|
|
} |
|
116
|
429
|
50
|
100
|
|
|
1913
|
if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) { |
|
|
|
|
66
|
|
|
|
|
|
117
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}"); |
|
118
|
|
|
|
|
|
|
} |
|
119
|
|
|
|
|
|
|
} |
|
120
|
849
|
|
|
|
|
4103
|
return ($req, $rsp); |
|
121
|
|
|
|
|
|
|
} |
|
122
|
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
sub rpc_worker_announce { |
|
124
|
159
|
|
|
159
|
0
|
478
|
my ($self, $workername) = @_; |
|
125
|
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
# ignore repeated announce request or unfinished withdraw |
|
127
|
|
|
|
|
|
|
# |
|
128
|
159
|
100
|
|
|
|
240
|
return if (keys %{$self->{announced}}); |
|
|
159
|
|
|
|
|
684
|
|
|
129
|
|
|
|
|
|
|
|
|
130
|
130
|
|
|
|
|
259
|
foreach my $method (keys %{$self->{methods}}) { |
|
|
130
|
|
|
|
|
594
|
|
|
131
|
145
|
100
|
|
|
|
501
|
next if exists $self->{methods}{$method}{id}; # active announce/withdraw request |
|
132
|
|
|
|
|
|
|
|
|
133
|
135
|
|
|
|
|
885
|
my $params = {method => $method, workername => $workername, doc => $self->{methods}{$method}{doc}}; |
|
134
|
135
|
50
|
|
|
|
587
|
$params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter}; |
|
135
|
135
|
|
|
|
|
504
|
my $id = $self->rpc_send_req('rpcswitch.announce', {params => $params}); |
|
136
|
135
|
50
|
|
|
|
701
|
die rpc_error('io', 'netstring_write') unless defined $id; |
|
137
|
135
|
|
|
|
|
705
|
$self->{methods}{$method}{id} = $id; |
|
138
|
|
|
|
|
|
|
} |
|
139
|
130
|
|
|
|
|
341
|
return; |
|
140
|
|
|
|
|
|
|
} |
|
141
|
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
sub rpc_worker_withdraw { |
|
143
|
31
|
|
|
31
|
0
|
110
|
my ($self) = @_; |
|
144
|
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# callers will get code -32006 'opposite end of channel gone' |
|
146
|
|
|
|
|
|
|
# errors when the announcement is withdrawn. |
|
147
|
|
|
|
|
|
|
# |
|
148
|
31
|
|
|
|
|
54
|
foreach my $method (keys %{$self->{announced}}) { |
|
|
31
|
|
|
|
|
262
|
|
|
149
|
16
|
100
|
|
|
|
176
|
next if exists $self->{methods}{$method}{id}; # active announce/withdraw request |
|
150
|
|
|
|
|
|
|
|
|
151
|
8
|
|
|
|
|
40
|
my $params = {method => $method}; |
|
152
|
8
|
50
|
|
|
|
176
|
$params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter}; |
|
153
|
8
|
|
|
|
|
168
|
my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => $params}); |
|
154
|
8
|
50
|
|
|
|
88
|
die rpc_error('io', 'netstring_write') unless defined $id; |
|
155
|
8
|
|
|
|
|
192
|
$self->{methods}{$method}{id} = $id; |
|
156
|
|
|
|
|
|
|
} |
|
157
|
31
|
|
|
|
|
117
|
return; |
|
158
|
|
|
|
|
|
|
} |
|
159
|
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
sub rpc_worker_flowcontrol { |
|
161
|
584
|
|
|
584
|
0
|
2578
|
my ($self, $workername) = @_; |
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
# need to be in connected auth state |
|
164
|
584
|
100
|
100
|
|
|
5023
|
return unless ($self->{state} && ($self->{state} ne 'rpcswitch.hello')); |
|
165
|
|
|
|
|
|
|
|
|
166
|
452
|
100
|
|
|
|
1356
|
if ($self->{flowcontrol}) { |
|
167
|
102
|
|
|
|
|
260
|
my $cnt = (scalar keys %{$self->{async}{jobs}}) + (scalar @{$self->{async}{jobqueue}}); |
|
|
102
|
|
|
|
|
354
|
|
|
|
102
|
|
|
|
|
240
|
|
|
168
|
|
|
|
|
|
|
#printf ">> flow: %d %d %d\n", $cnt, $self->{async}{max_async} * 2, $self->{async}{max_async}; |
|
169
|
102
|
100
|
|
|
|
472
|
if ($cnt >= $self->{async}{max_async} * 2) { |
|
|
|
100
|
|
|
|
|
|
|
170
|
31
|
|
|
|
|
229
|
$self->rpc_worker_withdraw(); |
|
171
|
|
|
|
|
|
|
} elsif ($cnt < $self->{async}{max_async}) { |
|
172
|
44
|
|
|
|
|
236
|
$self->rpc_worker_announce($workername); |
|
173
|
|
|
|
|
|
|
} |
|
174
|
|
|
|
|
|
|
} |
|
175
|
452
|
|
|
|
|
947
|
return; |
|
176
|
|
|
|
|
|
|
} |
|
177
|
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
sub valid_worker_err { |
|
179
|
19
|
|
|
19
|
0
|
114
|
my ($err) = @_; |
|
180
|
19
|
50
|
|
|
|
532
|
$err = {text => $err} unless ref($err); # convert plain errors |
|
181
|
19
|
50
|
|
|
|
228
|
$err->{class} = 'hard' unless exists $err->{class}; |
|
182
|
19
|
|
|
|
|
57
|
return $err; |
|
183
|
|
|
|
|
|
|
} |
|
184
|
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
sub rpcswitch_resp { |
|
186
|
244
|
|
|
244
|
0
|
702
|
my ($rpcswitch) = @_; |
|
187
|
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
# Just the vcookie & vci-channel parameters are required by |
|
189
|
|
|
|
|
|
|
# the rpcswitch. The worker_id field is optional, and might |
|
190
|
|
|
|
|
|
|
# be set to the worker_id returned by the announce response. |
|
191
|
|
|
|
|
|
|
# |
|
192
|
244
|
|
|
|
|
923
|
$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}}; |
|
193
|
|
|
|
|
|
|
#$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}, worker_id => $rpcswitch->{worker_id}}; |
|
194
|
244
|
|
|
|
|
558
|
return $rpcswitch; |
|
195
|
|
|
|
|
|
|
} |
|
196
|
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
sub client { |
|
198
|
228
|
|
|
228
|
1
|
2071
|
my ($self, $msg, $method, $params, $reqauth) = @_; |
|
199
|
228
|
|
|
|
|
798
|
my ($req, $rsp) = $self->rpc_decode($msg); |
|
200
|
|
|
|
|
|
|
|
|
201
|
228
|
100
|
|
|
|
1520
|
if ($req eq 'rpcswitch.greetings') { |
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
202
|
38
|
50
|
|
|
|
380
|
my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert |
|
203
|
38
|
|
|
|
|
228
|
my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}}; |
|
204
|
38
|
|
|
|
|
304
|
$self->rpc_send_req('rpcswitch.hello', {params => $helloparams}); |
|
205
|
|
|
|
|
|
|
} elsif ($rsp eq 'rpcswitch.hello') { |
|
206
|
19
|
50
|
|
|
|
1596
|
if (!$msg->{result}[0]) { |
|
207
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); |
|
208
|
|
|
|
|
|
|
} |
|
209
|
19
|
|
|
|
|
703
|
$self->rpc_send_call($method, $params, $reqauth); |
|
210
|
|
|
|
|
|
|
} elsif ($rsp eq 'rpcswitch.ping') { |
|
211
|
19
|
|
|
|
|
76
|
return [$msg->{result}]; # ping complete |
|
212
|
|
|
|
|
|
|
} elsif ($rsp eq $method) { |
|
213
|
133
|
50
|
|
|
|
760
|
if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel |
|
214
|
133
|
|
|
|
|
703
|
$self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone |
|
215
|
|
|
|
|
|
|
} |
|
216
|
133
|
100
|
|
|
|
1406
|
if ($msg->{result}[0] eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump) |
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
217
|
19
|
|
|
|
|
361
|
$self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id |
|
218
|
|
|
|
|
|
|
} elsif ($msg->{result}[0] eq 'RES_ERROR') { # worker error |
|
219
|
19
|
|
|
|
|
380
|
my $e = valid_worker_err($msg->{result}[1]); |
|
220
|
19
|
|
|
|
|
285
|
die rpc_error('worker', to_json($e), $e); |
|
221
|
|
|
|
|
|
|
} elsif ($msg->{result}[0] eq 'RES_OK') { |
|
222
|
95
|
|
|
|
|
722
|
return [@{$msg->{result}}[1..$#{$msg->{result}}]]; # client result[1..$] |
|
|
95
|
|
|
|
|
551
|
|
|
|
95
|
|
|
|
|
266
|
|
|
223
|
|
|
|
|
|
|
} |
|
224
|
|
|
|
|
|
|
} elsif ($req eq 'rpcswitch.result') { |
|
225
|
0
|
|
|
|
|
0
|
my $channel = $msg->{rpcswitch}{vci}; |
|
226
|
0
|
0
|
0
|
|
|
0
|
if (($msg->{params}[0] eq 'RES_OK') && ($msg->{params}[1] eq $self->{channels}{$channel})) { |
|
|
|
0
|
0
|
|
|
|
|
|
227
|
0
|
|
|
|
|
0
|
return [@{$msg->{params}}[2..$#{$msg->{params}}]]; # client result[2..$] (notification) |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
228
|
|
|
|
|
|
|
} elsif (($msg->{params}[0] eq 'RES_ERROR') && ($msg->{params}[1] eq $self->{channels}{$channel})) { |
|
229
|
0
|
|
|
|
|
0
|
my $e = valid_worker_err($msg->{params}[2]); |
|
230
|
0
|
|
|
|
|
0
|
die rpc_error('worker', to_json($e), $e); |
|
231
|
|
|
|
|
|
|
} |
|
232
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]"); |
|
233
|
|
|
|
|
|
|
} elsif ($req eq 'rpcswitch.channel_gone') { |
|
234
|
19
|
|
|
|
|
209
|
my $channel = $msg->{params}{channel}; |
|
235
|
19
|
50
|
|
|
|
114
|
if (exists $self->{channels}{$channel}) { |
|
236
|
19
|
|
|
|
|
95
|
my $id = delete $self->{channels}{$channel}; |
|
237
|
19
|
|
|
|
|
133
|
die rpc_error('rpcswitch', "$req for request $id: $channel"); |
|
238
|
|
|
|
|
|
|
} |
|
239
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$req for unknown request: $channel"); |
|
240
|
|
|
|
|
|
|
} else { |
|
241
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1})); |
|
242
|
|
|
|
|
|
|
} |
|
243
|
76
|
|
|
|
|
456
|
return; |
|
244
|
|
|
|
|
|
|
} |
|
245
|
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
sub worker { |
|
247
|
621
|
|
|
621
|
1
|
1814
|
my ($self, $msg, $workername) = @_; |
|
248
|
621
|
|
|
|
|
2880
|
my ($req, $rsp) = $self->rpc_decode($msg); |
|
249
|
|
|
|
|
|
|
|
|
250
|
621
|
100
|
|
|
|
3115
|
if ($req eq 'rpcswitch.greetings') { |
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
251
|
115
|
50
|
|
|
|
571
|
my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert |
|
252
|
115
|
|
|
|
|
520
|
my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}}; |
|
253
|
115
|
|
|
|
|
499
|
$self->rpc_send_req('rpcswitch.hello', {params => $helloparams}); |
|
254
|
|
|
|
|
|
|
} elsif ($rsp eq 'rpcswitch.hello') { |
|
255
|
115
|
50
|
|
|
|
1199
|
if (!$msg->{result}[0]) { |
|
256
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); |
|
257
|
|
|
|
|
|
|
} |
|
258
|
115
|
|
|
|
|
2081
|
$self->rpc_worker_announce($workername); |
|
259
|
|
|
|
|
|
|
} elsif ($rsp eq 'rpcswitch.announce') { |
|
260
|
135
|
50
|
|
|
|
686
|
if (!$msg->{result}[0]) { |
|
261
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); |
|
262
|
|
|
|
|
|
|
} |
|
263
|
135
|
100
|
|
|
|
1381
|
my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}}; |
|
|
165
|
|
|
|
|
1622
|
|
|
|
135
|
|
|
|
|
1138
|
|
|
264
|
135
|
50
|
|
|
|
571
|
if (!defined $method) { |
|
265
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]"); |
|
266
|
|
|
|
|
|
|
} |
|
267
|
|
|
|
|
|
|
# register announced method |
|
268
|
135
|
|
|
|
|
477
|
$self->{announced}{$method}{cb} = $self->{methods}{$method}{cb}; |
|
269
|
135
|
|
|
|
|
389
|
$self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id}; |
|
270
|
135
|
|
|
|
|
351
|
delete $self->{methods}{$method}{id}; |
|
271
|
|
|
|
|
|
|
} elsif ($req eq 'rpcswitch.ping') { |
|
272
|
0
|
|
|
|
|
0
|
$self->rpc_send({id => $msg->{id}, result => 'pong!'}); |
|
273
|
|
|
|
|
|
|
} elsif (exists $self->{announced}{$req}) { |
|
274
|
248
|
|
|
|
|
625
|
$msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response |
|
275
|
|
|
|
|
|
|
|
|
276
|
248
|
|
|
|
|
654
|
$self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone |
|
277
|
|
|
|
|
|
|
|
|
278
|
248
|
100
|
|
|
|
680
|
if ($self->{async}) { # use async call for forked childs |
|
279
|
199
|
|
|
|
|
1365
|
$self->{async}->msg_enqueue($msg); |
|
280
|
|
|
|
|
|
|
} else { |
|
281
|
49
|
|
|
|
|
170
|
my $rpcswitch = rpcswitch_resp($msg->{rpcswitch}); |
|
282
|
49
|
|
|
|
|
326
|
my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) }; |
|
|
49
|
|
|
|
|
310
|
|
|
283
|
49
|
100
|
|
|
|
1210
|
if ($@) { |
|
284
|
15
|
|
|
|
|
705
|
$self->rpc_send({id => $msg->{id}, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch}); |
|
285
|
|
|
|
|
|
|
} else { |
|
286
|
34
|
|
|
|
|
503
|
$self->rpc_send({id => $msg->{id}, result => ['RES_OK', @resp], rpcswitch => $rpcswitch}); |
|
287
|
|
|
|
|
|
|
} |
|
288
|
|
|
|
|
|
|
} |
|
289
|
|
|
|
|
|
|
} elsif ($rsp eq 'rpcswitch.withdraw') { |
|
290
|
|
|
|
|
|
|
# Note: the rpcswitch sends just a boolean result here |
|
291
|
|
|
|
|
|
|
# |
|
292
|
8
|
50
|
|
|
|
72
|
if (!$msg->{result}) { |
|
293
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "$rsp failed: $msg->{result}"); |
|
294
|
|
|
|
|
|
|
} |
|
295
|
8
|
50
|
|
|
|
24
|
my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}}; |
|
|
8
|
|
|
|
|
352
|
|
|
|
8
|
|
|
|
|
32
|
|
|
296
|
8
|
50
|
|
|
|
48
|
if (!defined $method) { |
|
297
|
0
|
|
|
|
|
0
|
die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}"); |
|
298
|
|
|
|
|
|
|
} |
|
299
|
|
|
|
|
|
|
# remove announced method |
|
300
|
8
|
|
|
|
|
312
|
delete $self->{announced}{$method}; |
|
301
|
8
|
|
|
|
|
96
|
delete $self->{methods}{$method}{id}; |
|
302
|
|
|
|
|
|
|
} elsif ($req eq 'rpcswitch.channel_gone') { |
|
303
|
0
|
|
|
|
|
0
|
my $channel = $msg->{params}{channel}; |
|
304
|
0
|
0
|
|
|
|
0
|
if ($self->{async}) { |
|
305
|
0
|
|
|
0
|
|
0
|
my ($childs, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel }); |
|
|
0
|
|
|
|
|
0
|
|
|
306
|
0
|
0
|
|
|
|
0
|
if (@$msgs) { |
|
307
|
0
|
|
|
|
|
0
|
warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs); |
|
|
0
|
|
|
|
|
0
|
|
|
308
|
|
|
|
|
|
|
} |
|
309
|
|
|
|
|
|
|
} |
|
310
|
0
|
0
|
|
|
|
0
|
if (exists $self->{channels}{$channel}) { |
|
311
|
0
|
|
|
|
|
0
|
delete $self->{channels}{$channel}; |
|
312
|
|
|
|
|
|
|
} else { |
|
313
|
0
|
|
|
|
|
0
|
warn "worker $req for unknown request: $channel"; |
|
314
|
|
|
|
|
|
|
} |
|
315
|
|
|
|
|
|
|
} else { |
|
316
|
0
|
|
|
|
|
0
|
warn "worker unsupported msg: ".to_json($msg, {canonical => 1}); |
|
317
|
|
|
|
|
|
|
} |
|
318
|
621
|
|
|
|
|
2425
|
return; |
|
319
|
|
|
|
|
|
|
} |
|
320
|
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
sub is_session_req { |
|
322
|
34
|
|
|
34
|
0
|
128
|
my ($self, $params) = @_; |
|
323
|
34
|
100
|
|
|
|
319
|
return unless $self->{sessioncache}; |
|
324
|
|
|
|
|
|
|
|
|
325
|
20
|
50
|
33
|
|
|
251
|
if (exists $params->{session} && exists $params->{session}{id}) { |
|
326
|
20
|
|
|
|
|
97
|
return $params->{session}; |
|
327
|
|
|
|
|
|
|
} |
|
328
|
0
|
|
|
|
|
0
|
return; |
|
329
|
|
|
|
|
|
|
} |
|
330
|
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
sub is_session_resp { |
|
332
|
34
|
|
|
34
|
0
|
395
|
my ($self, $params) = @_; |
|
333
|
34
|
100
|
|
|
|
1027
|
return unless $self->{sessioncache}; |
|
334
|
|
|
|
|
|
|
|
|
335
|
20
|
50
|
33
|
|
|
512
|
if ((ref($params) eq 'ARRAY') && ($params->[0] eq 'RES_OK') && ref($params->[2]) && exists $params->[2]->{set_session}) { |
|
|
|
|
33
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
336
|
20
|
|
|
|
|
152
|
return $params->[2]->{set_session}; |
|
337
|
|
|
|
|
|
|
} |
|
338
|
0
|
|
|
|
|
0
|
return; |
|
339
|
|
|
|
|
|
|
} |
|
340
|
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
sub child_handler { |
|
342
|
18
|
|
|
18
|
0
|
264
|
my ($self, $wr) = @_; |
|
343
|
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
# The child has to explicitly close the ssl-socket without shutdown. |
|
345
|
|
|
|
|
|
|
# Otherwise the parent will get an EOF. |
|
346
|
|
|
|
|
|
|
# see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors |
|
347
|
|
|
|
|
|
|
# |
|
348
|
18
|
50
|
|
|
|
555
|
if (ref($self->{sock}) eq 'IO::Socket::SSL') { |
|
349
|
0
|
|
|
|
|
0
|
$self->{sock}->close(SSL_no_shutdown => 1); |
|
350
|
|
|
|
|
|
|
} else { |
|
351
|
18
|
|
|
|
|
690
|
close($self->{sock}); |
|
352
|
|
|
|
|
|
|
} |
|
353
|
18
|
|
|
|
|
219
|
$self->{sock} = $wr; |
|
354
|
18
|
|
|
|
|
137
|
delete $self->{trace_cb}; |
|
355
|
18
|
|
|
|
|
687
|
local $SIG{INT} = 'DEFAULT'; |
|
356
|
18
|
|
|
|
|
542
|
local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
# When session handling is enabled a child might process |
|
359
|
|
|
|
|
|
|
# more than one request with the same session_id. |
|
360
|
|
|
|
|
|
|
# |
|
361
|
18
|
|
|
|
|
668
|
while (1) { |
|
362
|
24
|
|
|
|
|
345
|
my $b = eval { netstring_read($self->{sock}) }; |
|
|
24
|
|
|
|
|
1230
|
|
|
363
|
24
|
100
|
|
|
|
228
|
unless ($b) { |
|
364
|
4
|
50
|
33
|
|
|
64
|
next if ($@ && ($@ =~ /^EINTR/)); # interrupted |
|
365
|
4
|
50
|
|
|
|
45
|
die "worker child: $@" if $@; |
|
366
|
4
|
|
|
|
|
11
|
last; # EOF |
|
367
|
|
|
|
|
|
|
} |
|
368
|
20
|
|
|
|
|
425
|
my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) }; |
|
|
20
|
|
|
|
|
202
|
|
|
|
20
|
|
|
|
|
864
|
|
|
369
|
20
|
50
|
|
|
|
3291
|
die "worker child: $@" if $@; |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
# The client catches all possible die() calls, so that it is |
|
372
|
|
|
|
|
|
|
# guaranteed to call exit either from here or from a signal handler. |
|
373
|
|
|
|
|
|
|
# |
|
374
|
20
|
|
|
|
|
270
|
my $params; |
|
375
|
20
|
|
|
|
|
318
|
my $callback = $self->{methods}{$msg->{method}}{cb}; |
|
376
|
20
|
|
|
|
|
120
|
my @resp; |
|
377
|
20
|
|
|
|
|
165
|
eval { |
|
378
|
20
|
|
|
|
|
483
|
local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code |
|
379
|
20
|
|
|
|
|
579
|
@resp = $callback->($msg->{params}, $msg->{rpcswitch}); |
|
380
|
|
|
|
|
|
|
}; |
|
381
|
20
|
50
|
|
|
|
904196
|
if (my $err = $@) { |
|
382
|
0
|
|
|
|
|
0
|
$params = ['RES_ERROR', $msg->{id}, $err]; |
|
383
|
|
|
|
|
|
|
} else { |
|
384
|
20
|
|
|
|
|
183
|
$params = ['RES_OK', $msg->{id}, @resp]; |
|
385
|
|
|
|
|
|
|
} |
|
386
|
20
|
|
|
|
|
214
|
$b = eval { to_json($params, {%{$self->{json_utf8}}}) }; |
|
|
20
|
|
|
|
|
435
|
|
|
|
20
|
|
|
|
|
355
|
|
|
387
|
20
|
50
|
|
|
|
1348
|
return 1 if $@; # signal die from json encode |
|
388
|
20
|
|
|
|
|
353
|
my $res = netstring_write($self->{sock}, $b); |
|
389
|
20
|
50
|
|
|
|
149
|
return 2 unless $res; # signal socket error |
|
390
|
|
|
|
|
|
|
|
|
391
|
20
|
100
|
66
|
|
|
140
|
last unless $self->is_session_resp($params) || $self->is_session_req($msg->{params}); |
|
392
|
|
|
|
|
|
|
} |
|
393
|
18
|
100
|
|
|
|
595
|
close($self->{sock}) or return 3; # signal errors like broken pipe |
|
394
|
17
|
|
|
|
|
469
|
return 0; |
|
395
|
|
|
|
|
|
|
} |
|
396
|
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
sub _worker_child_write { |
|
398
|
177
|
|
|
177
|
|
769
|
my ($self, $child, $msg) = @_; |
|
399
|
|
|
|
|
|
|
|
|
400
|
177
|
|
|
|
|
447
|
my $b = to_json($msg, {canonical => 1, %{$self->{json_utf8}}}); |
|
|
177
|
|
|
|
|
7210
|
|
|
401
|
177
|
|
|
|
|
21890
|
my $res = netstring_write($child->{reader}, $b); # forward request to worker child |
|
402
|
177
|
50
|
|
|
|
777
|
die rpc_error('io', 'netstring_write') unless $res; |
|
403
|
177
|
|
|
|
|
689
|
return; |
|
404
|
|
|
|
|
|
|
} |
|
405
|
|
|
|
|
|
|
|
|
406
|
|
|
|
|
|
|
sub _worker_child_get { |
|
407
|
195
|
|
|
195
|
|
408
|
my ($self, $msg) = @_; |
|
408
|
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
# First try to reuse child for existing session |
|
410
|
|
|
|
|
|
|
# |
|
411
|
195
|
100
|
|
|
|
571
|
if (my $sessioncache = $self->{sessioncache}) { |
|
412
|
20
|
50
|
|
|
|
106
|
if (my $session_req = $self->is_session_req($msg->{params})) { |
|
|
|
0
|
|
|
|
|
|
|
413
|
20
|
100
|
|
|
|
173
|
if (my $child = $sessioncache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci})) { |
|
414
|
6
|
|
|
|
|
18
|
return $child; |
|
415
|
|
|
|
|
|
|
} |
|
416
|
|
|
|
|
|
|
} elsif ($sessioncache->{session_persist_user}) { |
|
417
|
0
|
0
|
|
|
|
0
|
if (exists $msg->{params}{$sessioncache->{session_persist_user}}) { |
|
418
|
0
|
|
|
|
|
0
|
my $user = $msg->{params}{$sessioncache->{session_persist_user}}; |
|
419
|
0
|
0
|
|
|
|
0
|
if (my $child = $sessioncache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci})) { |
|
420
|
0
|
|
|
|
|
0
|
delete $child->{session}; # reused session will be added after session_resp |
|
421
|
0
|
|
|
|
|
0
|
return $child; |
|
422
|
|
|
|
|
|
|
} |
|
423
|
|
|
|
|
|
|
} |
|
424
|
|
|
|
|
|
|
} |
|
425
|
|
|
|
|
|
|
} |
|
426
|
189
|
|
|
|
|
740
|
my $child = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci}); |
|
427
|
171
|
|
|
|
|
4868
|
return $child; |
|
428
|
|
|
|
|
|
|
} |
|
429
|
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
sub _worker_childs_dequeue_and_run { |
|
431
|
602
|
|
|
602
|
|
1175
|
my ($self) = @_; |
|
432
|
|
|
|
|
|
|
|
|
433
|
602
|
|
|
|
|
2305
|
while (my $msg = $self->{async}->msg_dequeue()) { |
|
434
|
195
|
|
|
|
|
504
|
my $id = $msg->{id}; |
|
435
|
195
|
|
|
|
|
531
|
my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch}); |
|
436
|
|
|
|
|
|
|
|
|
437
|
195
|
|
|
|
|
320
|
my $child = eval { $self->_worker_child_get($msg) }; |
|
|
195
|
|
|
|
|
601
|
|
|
438
|
177
|
50
|
|
|
|
840
|
if ($@) { |
|
439
|
0
|
|
|
|
|
0
|
$self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp}); |
|
440
|
|
|
|
|
|
|
} else { |
|
441
|
177
|
|
|
|
|
405
|
eval { $self->_worker_child_write($child, $msg) }; |
|
|
177
|
|
|
|
|
2372
|
|
|
442
|
177
|
50
|
|
|
|
759
|
if ($@) { |
|
443
|
0
|
|
|
|
|
0
|
$self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp}); |
|
444
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($child, 'error'); |
|
445
|
|
|
|
|
|
|
} else { |
|
446
|
177
|
|
|
|
|
10004
|
$self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp}); |
|
447
|
177
|
|
|
|
|
3304
|
$self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp}); |
|
448
|
|
|
|
|
|
|
} |
|
449
|
|
|
|
|
|
|
} |
|
450
|
|
|
|
|
|
|
} |
|
451
|
584
|
|
|
|
|
1101
|
return; |
|
452
|
|
|
|
|
|
|
} |
|
453
|
|
|
|
|
|
|
|
|
454
|
|
|
|
|
|
|
sub _worker_child_read_and_finish { |
|
455
|
166
|
|
|
166
|
|
681
|
my ($self, $child) = @_; |
|
456
|
|
|
|
|
|
|
|
|
457
|
166
|
|
|
|
|
532
|
my $res; |
|
458
|
166
|
|
|
|
|
320
|
my $b = eval { netstring_read($child->{reader}) }; |
|
|
166
|
|
|
|
|
1357
|
|
|
459
|
166
|
100
|
|
|
|
586
|
unless ($b) { |
|
460
|
14
|
50
|
|
|
|
280
|
my $err = $@ ? $@ : 'EOF'; |
|
461
|
14
|
|
|
|
|
448
|
$res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}}); |
|
462
|
14
|
|
|
|
|
966
|
$self->{async}->child_finish($child, 'error'); |
|
463
|
|
|
|
|
|
|
} else { |
|
464
|
152
|
|
|
|
|
275
|
my $params = eval { from_json($b, {%{$self->{json_utf8}}}) }; |
|
|
152
|
|
|
|
|
281
|
|
|
|
152
|
|
|
|
|
1193
|
|
|
465
|
152
|
50
|
|
|
|
7864
|
if ($@) { |
|
466
|
0
|
|
|
|
|
0
|
$res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $@], rpcswitch => $child->{rpcswitch}}); |
|
467
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($child, 'error'); |
|
468
|
|
|
|
|
|
|
} else { |
|
469
|
152
|
|
|
|
|
1869
|
$res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}}); |
|
470
|
152
|
50
|
|
|
|
790
|
unless ($res) { |
|
471
|
0
|
|
|
|
|
0
|
my $err = "result msg limit exceeded: " . length($b); |
|
472
|
0
|
|
|
|
|
0
|
$res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}}); |
|
473
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($child, 'error'); |
|
474
|
0
|
|
|
|
|
0
|
return $res; |
|
475
|
|
|
|
|
|
|
} |
|
476
|
|
|
|
|
|
|
|
|
477
|
152
|
100
|
|
|
|
556
|
if (my $sessioncache = $self->{sessioncache}) { |
|
478
|
14
|
50
|
|
|
|
91
|
if (my $set_session = $self->is_session_resp($params)) { |
|
479
|
14
|
100
|
66
|
|
|
126
|
if (!exists $child->{session} || ($child->{session}{id} ne $set_session->{id})) { |
|
480
|
8
|
|
|
|
|
77
|
$child->{session} = $sessioncache->session_new($set_session); |
|
481
|
8
|
|
|
|
|
89
|
$sessioncache->expire_insert($child->{session}); |
|
482
|
|
|
|
|
|
|
} |
|
483
|
|
|
|
|
|
|
} |
|
484
|
|
|
|
|
|
|
|
|
485
|
14
|
100
|
|
|
|
56
|
if ($sessioncache->session_put($child)) { |
|
|
|
50
|
|
|
|
|
|
|
486
|
9
|
|
|
|
|
27
|
my $cnt = scalar keys %{$sessioncache->{active}}; |
|
|
9
|
|
|
|
|
30
|
|
|
487
|
9
|
50
|
|
|
|
24
|
if ($cnt > $sessioncache->{max_session}) { |
|
488
|
0
|
0
|
|
|
|
0
|
if ($child = $sessioncache->lru_dequeue()) { |
|
489
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($child, 'lru'); |
|
490
|
|
|
|
|
|
|
} |
|
491
|
|
|
|
|
|
|
} |
|
492
|
9
|
|
|
|
|
21
|
$child = undef; |
|
493
|
|
|
|
|
|
|
} elsif (my $idle_child = $sessioncache->session_get_per_user_idle($child)) { |
|
494
|
|
|
|
|
|
|
# update idle user session with older session_id |
|
495
|
|
|
|
|
|
|
# |
|
496
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($idle_child, 'update'); |
|
497
|
|
|
|
|
|
|
|
|
498
|
0
|
0
|
|
|
|
0
|
if ($sessioncache->session_put($child)) { |
|
499
|
0
|
|
|
|
|
0
|
$child = undef; |
|
500
|
|
|
|
|
|
|
} |
|
501
|
|
|
|
|
|
|
} |
|
502
|
|
|
|
|
|
|
} |
|
503
|
152
|
100
|
|
|
|
511
|
if ($child) { |
|
504
|
143
|
|
|
|
|
558
|
$self->{async}->child_finish($child, 'done'); |
|
505
|
|
|
|
|
|
|
} |
|
506
|
|
|
|
|
|
|
} |
|
507
|
|
|
|
|
|
|
} |
|
508
|
166
|
|
|
|
|
1011
|
return $res; |
|
509
|
|
|
|
|
|
|
} |
|
510
|
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
sub _worker_sessions_expire { |
|
512
|
602
|
|
|
602
|
|
1216
|
my ($self) = @_; |
|
513
|
602
|
100
|
|
|
|
1588
|
return unless $self->{sessioncache}; |
|
514
|
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
# If a job for the expired session is active, the session |
|
516
|
|
|
|
|
|
|
# will be dropped when session_put() is called after the |
|
517
|
|
|
|
|
|
|
# job completed. |
|
518
|
|
|
|
|
|
|
# |
|
519
|
60
|
|
|
|
|
241
|
while (my $child = $self->{sessioncache}->expired_dequeue()) { |
|
520
|
0
|
|
|
|
|
0
|
$self->{async}->child_finish($child, 'expired'); |
|
521
|
|
|
|
|
|
|
} |
|
522
|
60
|
|
|
|
|
113
|
return; |
|
523
|
|
|
|
|
|
|
} |
|
524
|
|
|
|
|
|
|
|
|
525
|
|
|
|
|
|
|
sub rpc_timeout { |
|
526
|
1091
|
|
|
1091
|
0
|
2353
|
my ($self, $call_timeout) = @_; |
|
527
|
|
|
|
|
|
|
|
|
528
|
1091
|
100
|
66
|
|
|
3293
|
if ($call_timeout && (keys %{$self->{reqs}} > 0)) { |
|
|
19
|
|
|
|
|
285
|
|
|
529
|
19
|
|
|
|
|
190
|
return $call_timeout; # for individual client call |
|
530
|
|
|
|
|
|
|
} |
|
531
|
1072
|
|
|
|
|
2207
|
return $self->{timeout}; |
|
532
|
|
|
|
|
|
|
} |
|
533
|
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
sub rpc_stopped { |
|
535
|
1109
|
|
|
1109
|
0
|
3284
|
my ($self) = @_; |
|
536
|
|
|
|
|
|
|
|
|
537
|
1109
|
50
|
|
|
|
4598
|
if ($self->{stop}) { |
|
538
|
0
|
0
|
0
|
|
|
0
|
if (($self->{stop} eq 'withdraw') && (keys %{$self->{announced}})) { |
|
|
0
|
0
|
0
|
|
|
0
|
|
|
|
|
|
0
|
|
|
|
|
|
539
|
0
|
|
|
|
|
0
|
return; # wait for withdraw to complete |
|
540
|
0
|
|
|
|
|
0
|
} elsif (($self->{stop} eq 'withdraw') && $self->{async} && (keys %{$self->{async}{jobs}})) { |
|
541
|
0
|
|
|
|
|
0
|
return; # wait for active jobs to complete |
|
542
|
|
|
|
|
|
|
} |
|
543
|
0
|
|
|
|
|
0
|
return 1; |
|
544
|
|
|
|
|
|
|
} |
|
545
|
|
|
|
|
|
|
} |
|
546
|
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
sub rpc_handler { |
|
548
|
305
|
|
|
305
|
0
|
1188
|
my ($self, $call_timeout, $handler, @handler_params) = @_; |
|
549
|
|
|
|
|
|
|
|
|
550
|
|
|
|
|
|
|
# returns response or throws rpc_error. |
|
551
|
|
|
|
|
|
|
# returns undef when remote side cleanly closed connection with EOF. |
|
552
|
|
|
|
|
|
|
# |
|
553
|
305
|
|
|
|
|
947
|
while (!$self->rpc_stopped()) { |
|
554
|
1109
|
|
|
|
|
2457
|
my @pipes = (); |
|
555
|
1109
|
100
|
|
|
|
2742
|
if ($self->{async}) { |
|
556
|
602
|
|
|
|
|
2139
|
$self->_worker_sessions_expire(); |
|
557
|
602
|
50
|
|
|
|
3799
|
$self->_worker_childs_dequeue_and_run() unless $self->{stop}; |
|
558
|
584
|
|
|
|
|
5159
|
$self->{async}->childs_reap(nonblock => 1); |
|
559
|
584
|
|
|
|
|
4484
|
$self->rpc_worker_flowcontrol(@handler_params); |
|
560
|
584
|
|
|
|
|
957
|
@pipes = map { $_->{reader} } values %{$self->{async}{jobs}}; |
|
|
428
|
|
|
|
|
1525
|
|
|
|
584
|
|
|
|
|
3674
|
|
|
561
|
|
|
|
|
|
|
} |
|
562
|
1091
|
|
|
|
|
5442
|
my $timeout = $self->rpc_timeout($call_timeout); |
|
563
|
|
|
|
|
|
|
|
|
564
|
1091
|
100
|
100
|
|
|
5990
|
if ($timeout || @pipes) { |
|
565
|
440
|
|
|
|
|
9262
|
my @ready = IO::Select->new(($self->{sock}, @pipes))->can_read($timeout); |
|
566
|
440
|
50
|
66
|
|
|
13615112
|
next if (@ready == 0) && $!{EINTR}; # $! is not reset on success |
|
567
|
440
|
100
|
|
|
|
2144
|
die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0); |
|
568
|
|
|
|
|
|
|
|
|
569
|
421
|
|
|
|
|
2248
|
foreach my $fh (@ready) { |
|
570
|
461
|
50
|
66
|
|
|
4488
|
if (($fh != $self->{sock}) && $self->{async}) { |
|
571
|
166
|
50
|
|
|
|
1051
|
unless (exists $self->{async}{jobs}{$fh->fileno}) { |
|
572
|
0
|
|
|
|
|
0
|
die rpc_error('io', "child pipe not found: ". $fh->fileno); |
|
573
|
|
|
|
|
|
|
} |
|
574
|
166
|
|
|
|
|
1690
|
my $child = $self->{async}{jobs}{$fh->fileno}; |
|
575
|
166
|
|
|
|
|
1373
|
$self->{async}->job_rem($child); |
|
576
|
166
|
|
|
|
|
2912
|
my $res = $self->_worker_child_read_and_finish($child); |
|
577
|
|
|
|
|
|
|
} |
|
578
|
|
|
|
|
|
|
} |
|
579
|
421
|
100
|
|
|
|
1022
|
next unless grep { $_ == $self->{sock} } @ready; |
|
|
461
|
|
|
|
|
3819
|
|
|
580
|
|
|
|
|
|
|
} |
|
581
|
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
# always block on full messages from rpcswitch |
|
583
|
946
|
|
|
|
|
1789
|
my $b = eval { netstring_read($self->{sock}) }; |
|
|
946
|
|
|
|
|
3591
|
|
|
584
|
946
|
100
|
|
|
|
3475
|
unless ($b) { |
|
585
|
97
|
50
|
33
|
|
|
420
|
next if ($@ && ($@ =~ /^EINTR/)); # check if stopped |
|
586
|
97
|
50
|
|
|
|
242
|
die rpc_error('io', $@) if $@; |
|
587
|
97
|
|
|
|
|
297
|
return; # EOF |
|
588
|
|
|
|
|
|
|
} |
|
589
|
849
|
|
|
|
|
1359
|
my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) }; |
|
|
849
|
|
|
|
|
1419
|
|
|
|
849
|
|
|
|
|
6132
|
|
|
590
|
849
|
100
|
|
|
|
33228
|
die rpc_error('jsonrpc', $@) if $@; |
|
591
|
830
|
100
|
|
|
|
2471
|
$self->{trace_cb}->('RCV', $msg) if $self->{trace_cb}; |
|
592
|
830
|
|
|
|
|
8028
|
my $res = eval { $handler->($self, $msg, @handler_params) }; |
|
|
830
|
|
|
|
|
3564
|
|
|
593
|
830
|
100
|
|
|
|
2503
|
if (my $err = $@) { |
|
594
|
38
|
50
|
|
|
|
1501
|
die $err if ref($err); # forward error |
|
595
|
0
|
|
|
|
|
0
|
die rpc_error('io', $err); |
|
596
|
|
|
|
|
|
|
} |
|
597
|
792
|
100
|
|
|
|
3823
|
if ($res) { |
|
598
|
114
|
|
|
|
|
684
|
return $res; |
|
599
|
|
|
|
|
|
|
} |
|
600
|
|
|
|
|
|
|
} |
|
601
|
0
|
|
|
|
|
0
|
return; # STOP is checked by caller |
|
602
|
|
|
|
|
|
|
} |
|
603
|
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
sub work { |
|
605
|
134
|
|
|
134
|
1
|
3339
|
my ($self, $workername, $methods, $opts) = @_; |
|
606
|
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
# a write on a shutdown socket should never happen |
|
608
|
|
|
|
|
|
|
# |
|
609
|
134
|
|
|
0
|
|
4014
|
local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" }; |
|
|
0
|
|
|
|
|
0
|
|
|
610
|
|
|
|
|
|
|
|
|
611
|
134
|
|
|
|
|
913
|
foreach my $method (keys %$methods) { |
|
612
|
130
|
|
|
|
|
558
|
$self->{methods}{$method}{cb} = $methods->{$method}{cb}; |
|
613
|
130
|
100
|
|
|
|
612
|
$self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {}; |
|
614
|
130
|
50
|
|
|
|
613
|
$self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter}; |
|
615
|
|
|
|
|
|
|
} |
|
616
|
134
|
100
|
|
|
|
578
|
$opts->{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb}; |
|
617
|
134
|
100
|
|
|
|
2238
|
$self->{async} = RPC::Switch::Client::Tiny::Async->new(%$opts) if $opts->{max_async}; |
|
618
|
134
|
50
|
|
|
|
644
|
$self->{flowcontrol} = $opts->{flowcontrol} if $opts->{flowcontrol}; |
|
619
|
134
|
100
|
|
|
|
708
|
$self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%$opts) if $opts->{max_session}; |
|
620
|
134
|
|
|
|
|
792
|
$self->rpc_handler(0, \&worker, $workername); |
|
621
|
|
|
|
|
|
|
|
|
622
|
97
|
50
|
|
|
|
724
|
if ($self->{stop}) { |
|
623
|
0
|
|
|
|
|
0
|
$self->rpc_worker_withdraw(); |
|
624
|
0
|
|
|
|
|
0
|
$self->{stop} = 'withdraw'; |
|
625
|
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
# wait some time for withdraw & active jobs to complete |
|
627
|
|
|
|
|
|
|
# |
|
628
|
0
|
|
|
0
|
|
0
|
local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; }; |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
629
|
0
|
|
|
|
|
0
|
alarm($self->{gracetime}); |
|
630
|
0
|
|
|
|
|
0
|
$self->rpc_handler(0, \&worker, $workername); |
|
631
|
0
|
|
|
|
|
0
|
alarm(0); |
|
632
|
0
|
|
|
|
|
0
|
die rpc_error('io', 'STOP'); |
|
633
|
|
|
|
|
|
|
} |
|
634
|
97
|
100
|
|
|
|
382
|
if (my $async = $self->{async}) { |
|
635
|
|
|
|
|
|
|
# drop stored sessions |
|
636
|
|
|
|
|
|
|
# |
|
637
|
48
|
100
|
|
|
|
507
|
if (my $sessioncache = $self->{sessioncache}) { |
|
638
|
4
|
|
|
|
|
12
|
foreach my $session_id (keys %{$sessioncache->{active}}) { |
|
|
4
|
|
|
|
|
47
|
|
|
639
|
3
|
50
|
|
|
|
33
|
if (my $child = $sessioncache->session_get($session_id)) { |
|
640
|
3
|
|
|
|
|
27
|
$async->child_finish($child, 'idle'); |
|
641
|
|
|
|
|
|
|
} |
|
642
|
|
|
|
|
|
|
} |
|
643
|
|
|
|
|
|
|
} |
|
644
|
|
|
|
|
|
|
# reap remaining childs |
|
645
|
|
|
|
|
|
|
# |
|
646
|
48
|
50
|
|
|
|
172
|
if (keys %{$async->{finished}}) { |
|
|
48
|
|
|
|
|
291
|
|
|
647
|
48
|
|
|
0
|
|
2840
|
local $SIG{ALRM} = sub { warn "worker child wait timeout\n" }; |
|
|
0
|
|
|
|
|
0
|
|
|
648
|
48
|
|
|
|
|
700
|
alarm(1); # wait at most for one second |
|
649
|
48
|
50
|
|
|
|
344
|
unless ($async->childs_reap()) { # blocking |
|
650
|
0
|
|
|
|
|
0
|
$async->childs_reap(nonblock => 1); # continue nonblocking after timeout |
|
651
|
|
|
|
|
|
|
} |
|
652
|
48
|
|
|
|
|
1987
|
alarm(0); |
|
653
|
|
|
|
|
|
|
} |
|
654
|
|
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
# EOF is only an error here when there are outstanding requests |
|
656
|
|
|
|
|
|
|
# |
|
657
|
48
|
|
|
0
|
|
1002
|
my ($stopped, $msgs) = $async->jobs_terminate('stopped', sub { 1 }); |
|
|
0
|
|
|
|
|
0
|
|
|
658
|
48
|
|
|
|
|
236
|
my @childs = keys %{$async->{finished}}; |
|
|
48
|
|
|
|
|
198
|
|
|
659
|
|
|
|
|
|
|
|
|
660
|
48
|
|
|
|
|
448
|
$async->childs_kill(); # don't wait here |
|
661
|
|
|
|
|
|
|
|
|
662
|
48
|
|
|
|
|
337
|
$async->{jobqueue} = []; |
|
663
|
48
|
|
|
|
|
241
|
$async->{finished} = {}; |
|
664
|
|
|
|
|
|
|
|
|
665
|
48
|
50
|
|
|
|
199
|
die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs); |
|
666
|
48
|
50
|
|
|
|
245
|
die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs); |
|
667
|
|
|
|
|
|
|
} |
|
668
|
97
|
|
|
|
|
3217
|
return; |
|
669
|
|
|
|
|
|
|
} |
|
670
|
|
|
|
|
|
|
|
|
671
|
|
|
|
|
|
|
sub call { |
|
672
|
171
|
|
|
171
|
1
|
181868
|
my ($self, $method, $params, $opts) = @_; |
|
673
|
171
|
|
|
|
|
456
|
my $reqauth = $opts->{reqauth}; |
|
674
|
171
|
|
|
|
|
323
|
my $call_timeout = $opts->{timeout}; |
|
675
|
|
|
|
|
|
|
|
|
676
|
171
|
100
|
|
|
|
608
|
if ($self->{state} eq 'rpcswitch.hello') { # trigger rpc_send for consecutive requests |
|
677
|
133
|
|
|
|
|
779
|
$self->rpc_send_call($method, $params, $reqauth); |
|
678
|
|
|
|
|
|
|
} |
|
679
|
|
|
|
|
|
|
# EOF is an error here (response missing) |
|
680
|
|
|
|
|
|
|
# |
|
681
|
171
|
50
|
|
|
|
1254
|
my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth) or die rpc_error('io', 'eof'); |
|
682
|
114
|
100
|
|
|
|
779
|
return wantarray() ? @$res : $res->[0]; |
|
683
|
|
|
|
|
|
|
} |
|
684
|
|
|
|
|
|
|
|
|
685
|
|
|
|
|
|
|
# stop() exits an active $client->work() worker handler. |
|
686
|
|
|
|
|
|
|
# |
|
687
|
|
|
|
|
|
|
# - work() dies with RPC::Switch::Client::Tiny::Error which |
|
688
|
|
|
|
|
|
|
# might be {type => 'io', message => 'STOP'}, or any other |
|
689
|
|
|
|
|
|
|
# error if a non-restartable system call was interrupted. |
|
690
|
|
|
|
|
|
|
# |
|
691
|
|
|
|
|
|
|
# - stop() makes no sense for call() (it has rpc_timeout) |
|
692
|
|
|
|
|
|
|
# - the only way to call stop is from a signal handler. |
|
693
|
|
|
|
|
|
|
# - if a signal handler is called, non-restartavle perl |
|
694
|
|
|
|
|
|
|
# system call are interrupted and return $! == EINTR. |
|
695
|
|
|
|
|
|
|
# |
|
696
|
|
|
|
|
|
|
# -> this can break an active worker handler and result in |
|
697
|
|
|
|
|
|
|
# a RES_ERROR-message to the caller if a non-restartable |
|
698
|
|
|
|
|
|
|
# perl syscall is interrupted. |
|
699
|
|
|
|
|
|
|
# -> for the async worker mode this should mostly work. |
|
700
|
|
|
|
|
|
|
# (sysreadfull, rpc print & IO::Select are restartable). |
|
701
|
|
|
|
|
|
|
# -> stop will just wait for a gracetime of 2 seconds |
|
702
|
|
|
|
|
|
|
# for active jobs to complete. The remaining jobs |
|
703
|
|
|
|
|
|
|
# are terminated. |
|
704
|
|
|
|
|
|
|
# |
|
705
|
|
|
|
|
|
|
sub stop { |
|
706
|
0
|
|
|
0
|
0
|
0
|
my ($self, $opts) = @_; |
|
707
|
0
|
0
|
|
|
|
0
|
$self->{gracetime} = $opts->{gracetime} ? $opts->{gracetime} : 2; |
|
708
|
0
|
|
|
|
|
0
|
$self->{stop} = 'pending'; |
|
709
|
0
|
|
|
|
|
0
|
return; |
|
710
|
|
|
|
|
|
|
} |
|
711
|
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
# The perl object destroy order is undefined, so $self->{sock} |
|
713
|
|
|
|
|
|
|
# might already be destroyed and it makes no sense to try to |
|
714
|
|
|
|
|
|
|
# send RES_ERROR messages for remaining childs. |
|
715
|
|
|
|
|
|
|
# see: https://perldoc.perl.org/perlobj#Global-Destruction |
|
716
|
|
|
|
|
|
|
# |
|
717
|
|
|
|
|
|
|
# So just terminate remaining childs, and let init-process |
|
718
|
|
|
|
|
|
|
# collect them instead of calling waitpid() here. |
|
719
|
|
|
|
|
|
|
# |
|
720
|
|
|
|
|
|
|
# TODO: perl will call DESTROY only when the process exits |
|
721
|
|
|
|
|
|
|
# cleanly or calls exit(). If the process is killed, perl |
|
722
|
|
|
|
|
|
|
# calls DESTROY only if a handler for the matching signal |
|
723
|
|
|
|
|
|
|
# is installed, like: $SIG{'TERM'} = sub { exit; }; |
|
724
|
|
|
|
|
|
|
# |
|
725
|
|
|
|
|
|
|
# -> so does it make sense to support DESTROY at all, |
|
726
|
|
|
|
|
|
|
# if there are situations when it is not called? |
|
727
|
|
|
|
|
|
|
# |
|
728
|
|
|
|
|
|
|
sub DESTROY { |
|
729
|
196
|
|
|
196
|
|
3569
|
my ($self) = @_; |
|
730
|
196
|
100
|
|
|
|
1966
|
$self->{async}->childs_kill() if $self->{async}; # don't wait here |
|
731
|
|
|
|
|
|
|
} |
|
732
|
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
1; |
|
734
|
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
__END__ |