|  line  | 
 stmt  | 
 bran  | 
 cond  | 
 sub  | 
 pod  | 
 time  | 
 code  | 
| 
1
 | 
  
 
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Lightweight client for the RPC-Switch json-rpc request multiplexer  | 
| 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
3
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # see: RPC::Switch: https://github.com/a6502/rpc-switch   | 
| 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client  | 
| 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 package RPC::Switch::Client::Tiny;  | 
| 
7
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
8
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
2137023
 | 
 use strict;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
230
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
542
 | 
    | 
| 
9
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
100
 | 
 use warnings;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
40
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
595
 | 
    | 
| 
10
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
99
 | 
 use Carp 'croak';  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
60
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1011
 | 
    | 
| 
11
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
800
 | 
 use JSON;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
8369
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
157
 | 
    | 
| 
12
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
11132
 | 
 use IO::Select;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
32675
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
907
 | 
    | 
| 
13
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
15747
 | 
 use IO::Socket::SSL;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1239145
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
201
 | 
    | 
| 
14
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
14035
 | 
 use Time::HiRes qw(time);  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
25596
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
105
 | 
    | 
| 
15
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
11697
 | 
 use RPC::Switch::Client::Tiny::Error;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
41
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
649
 | 
    | 
| 
16
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
7816
 | 
 use RPC::Switch::Client::Tiny::Netstring;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
59
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1150
 | 
    | 
| 
17
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
8309
 | 
 use RPC::Switch::Client::Tiny::Async;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
40
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
647
 | 
    | 
| 
18
 | 
20
 | 
 
 | 
 
 | 
  
20
  
 | 
 
 | 
8060
 | 
 use RPC::Switch::Client::Tiny::SessionCache;  | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
59
 | 
    | 
| 
 
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
109089
 | 
    | 
| 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 our $VERSION = '1.67';  | 
| 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
22
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub new {  | 
| 
23
 | 
234
 | 
 
 | 
 
 | 
  
234
  
 | 
  
1
  
 | 
415363
 | 
 	my ($class, %args) = @_;  | 
| 
24
 | 
234
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
2090
 | 
 	my $s = $args{sock} or croak __PACKAGE__ . " expects sock";  | 
| 
25
 | 
234
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1125
 | 
 	unless ($^O eq 'MSWin32') { # cpantester: strawberry perl does not support blocking() call  | 
| 
26
 | 
234
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1746
 | 
 		defined(my $b = $s->blocking()) or croak __PACKAGE__ . " bad socket: $!";  | 
| 
27
 | 
234
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
2794
 | 
 		unless ($b) { croak __PACKAGE__ . " nonblocking socket not supported"; }  | 
| 
 
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4142
 | 
    | 
| 
28
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
29
 | 
215
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1025
 | 
 	unless (exists $args{who}) { croak __PACKAGE__ . " expects who"; }  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
30
 | 
215
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3584
 | 
 	my $self = bless {  | 
| 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		%args,  | 
| 
32
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		id        => 1,  # next request id  | 
| 
33
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		state     => '', # last rpcswitch.type  | 
| 
34
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		reqs      => {}, # outstanding requests  | 
| 
35
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		channels  => {}, # open rpcswitch channels  | 
| 
36
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		methods   => {}, # defined worker methods  | 
| 
37
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		announced => {}, # announced worker methods  | 
| 
38
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		msglimit  => 999999, # max netstring size  | 
| 
39
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}, $class;  | 
| 
40
 | 
215
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1284
 | 
 	if (ref($self->{sock}) eq 'IO::Socket::SSL') {  | 
| 
41
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->{auth_method} = 'clientcert' unless exists $self->{auth_method};  | 
| 
42
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->{token} = $self->{who} unless exists $self->{token}; # should be optional for clientcert  | 
| 
43
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} else {  | 
| 
44
 | 
215
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
704
 | 
 		$self->{auth_method} = 'password' unless exists $self->{auth_method};  | 
| 
45
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
46
 | 
215
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
986
 | 
 	$self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1};  | 
| 
47
 | 
215
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1130
 | 
 	return $self;  | 
| 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
50
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_error {  | 
| 
51
 | 
76
 | 
 
 | 
 
 | 
  
76
  
 | 
  
0
  
 | 
1425
 | 
 	return RPC::Switch::Client::Tiny::Error->new(@_);  | 
| 
52
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
53
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
54
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_send {  | 
| 
55
 | 
864
 | 
 
 | 
 
 | 
  
864
  
 | 
  
0
  
 | 
3257
 | 
 	my ($self, $msg) = @_;  | 
| 
56
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3086
 | 
 	my $s = $self->{sock};  | 
| 
57
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3601
 | 
 	my $len = length($s);  | 
| 
58
 | 
864
 | 
  
 50
  
 | 
  
 33
  
 | 
 
 | 
 
 | 
6523
 | 
 	if ($self->{msglimit} && ($len > $self->{msglimit})) {  | 
| 
59
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		warn "rpc_send msglimit exceeded: $len > $self->{msglimit}";  | 
| 
60
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		return;  | 
| 
61
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
62
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4277
 | 
 	$msg->{jsonrpc} = '2.0';  | 
| 
63
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1921
 | 
 	my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});  | 
| 
 
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4847
 | 
    | 
| 
64
 | 
864
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
29755
 | 
 	$self->{trace_cb}->('SND', $msg) if $self->{trace_cb};  | 
| 
65
 | 
864
 | 
 
 | 
 
 | 
 
 | 
 
 | 
11150
 | 
 	return netstring_write($s, $str);  | 
| 
66
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
67
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
68
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_send_req {  | 
| 
69
 | 
472
 | 
 
 | 
 
 | 
  
472
  
 | 
  
0
  
 | 
1950
 | 
 	my ($self, $method, $msg) = @_;  | 
| 
70
 | 
472
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2008
 | 
 	my $id = "$self->{id}"; $self->{id}++;  | 
| 
 
 | 
472
 | 
 
 | 
 
 | 
 
 | 
 
 | 
983
 | 
    | 
| 
71
 | 
472
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1123
 | 
 	$msg->{id} = $id;  | 
| 
72
 | 
472
 | 
 
 | 
 
 | 
 
 | 
 
 | 
935
 | 
 	$msg->{method} = $method;  | 
| 
73
 | 
472
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2178
 | 
 	$self->{reqs}{$id} = $method;  | 
| 
74
 | 
472
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
3732
 | 
 	$self->{state} = $method if $method =~ /^rpcswitch\./;  | 
| 
75
 | 
472
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1414
 | 
 	$self->rpc_send($msg) or return;  | 
| 
76
 | 
467
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1529
 | 
 	return $id;  | 
| 
77
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
78
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
79
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_send_call {  | 
| 
80
 | 
176
 | 
 
 | 
 
 | 
  
176
  
 | 
  
0
  
 | 
1392
 | 
 	my ($self, $method, $params, $reqauth) = @_;  | 
| 
81
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
82
 | 
176
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
462
 | 
 	if (defined $reqauth) { # request authentication  | 
| 
83
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# Without the vcookie the rpcswitch does not validate  | 
| 
84
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# the reqauth parameter.  | 
| 
85
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# The vcookie 'eatme' value is hardcoded in the rpc-switch  | 
| 
86
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# code and is called 'channel information version'.  | 
| 
87
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
88
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		return $self->rpc_send_req($method, {params => $params, rpcswitch => {vcookie => 'eatme', reqauth => $reqauth}});  | 
| 
89
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} else {  | 
| 
90
 | 
176
 | 
 
 | 
 
 | 
 
 | 
 
 | 
657
 | 
 		return $self->rpc_send_req($method, {params => $params});  | 
| 
91
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
92
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
93
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
94
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_decode {  | 
| 
95
 | 
849
 | 
 
 | 
 
 | 
  
849
  
 | 
  
0
  
 | 
2511
 | 
 	my ($self, $msg) = @_;  | 
| 
96
 | 
849
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4466
 | 
 	my ($req, $rsp) = ('', '');  | 
| 
97
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
98
 | 
849
 | 
  
 50
  
 | 
  
 66
  
 | 
 
 | 
 
 | 
5979
 | 
 	unless (($msg->{jsonrpc} eq '2.0') && (exists $msg->{id} || exists $msg->{method})) {  | 
| 
 
 | 
 
 | 
 
 | 
  
 66
  
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
99
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1}));  | 
| 
100
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
101
 | 
849
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
3325
 | 
 	if (exists $msg->{method}) {  | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
102
 | 
420
 | 
 
 | 
 
 | 
 
 | 
 
 | 
968
 | 
 		$req = $msg->{method};  | 
| 
103
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif (!defined $msg->{id}) {  | 
| 
104
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1}));  | 
| 
105
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif (!exists $self->{reqs}{$msg->{id}}) {  | 
| 
106
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1}));  | 
| 
107
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} else {  | 
| 
108
 | 
429
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1396
 | 
 		$rsp = delete $self->{reqs}{$msg->{id}};  | 
| 
109
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
110
 | 
429
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1137
 | 
 		if (exists $msg->{error}) {  | 
| 
111
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $msg->{error}{message}", {code => $msg->{error}{code}});  | 
| 
112
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
113
 | 
429
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
952
 | 
 		if (!exists $msg->{result}) {  | 
| 
114
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing");  | 
| 
115
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
116
 | 
429
 | 
  
 50
  
 | 
  
100
  
 | 
 
 | 
 
 | 
2013
 | 
 		if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) {  | 
| 
 
 | 
 
 | 
 
 | 
  
 66
  
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
117
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}");  | 
| 
118
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
119
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
120
 | 
849
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3619
 | 
 	return ($req, $rsp);  | 
| 
121
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
122
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
123
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_worker_announce {  | 
| 
124
 | 
159
 | 
 
 | 
 
 | 
  
159
  
 | 
  
0
  
 | 
536
 | 
 	my ($self, $workername) = @_;  | 
| 
125
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
126
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# ignore repeated announce request or unfinished withdraw  | 
| 
127
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#  | 
| 
128
 | 
159
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
254
 | 
 	return if (keys %{$self->{announced}});  | 
| 
 
 | 
159
 | 
 
 | 
 
 | 
 
 | 
 
 | 
987
 | 
    | 
| 
129
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
130
 | 
130
 | 
 
 | 
 
 | 
 
 | 
 
 | 
277
 | 
 	foreach my $method (keys %{$self->{methods}}) {  | 
| 
 
 | 
130
 | 
 
 | 
 
 | 
 
 | 
 
 | 
815
 | 
    | 
| 
131
 | 
145
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
471
 | 
 		next if exists $self->{methods}{$method}{id}; # active announce/withdraw request  | 
| 
132
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
133
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
670
 | 
 		my $params = {method => $method, workername => $workername, doc => $self->{methods}{$method}{doc}};  | 
| 
134
 | 
135
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
586
 | 
 		$params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};  | 
| 
135
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1084
 | 
 		my $id = $self->rpc_send_req('rpcswitch.announce', {params => $params});  | 
| 
136
 | 
135
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
676
 | 
 		die rpc_error('io', 'netstring_write') unless defined $id;  | 
| 
137
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
610
 | 
 		$self->{methods}{$method}{id} = $id;  | 
| 
138
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
139
 | 
130
 | 
 
 | 
 
 | 
 
 | 
 
 | 
359
 | 
 	return;  | 
| 
140
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
141
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
142
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_worker_withdraw {  | 
| 
143
 | 
31
 | 
 
 | 
 
 | 
  
31
  
 | 
  
0
  
 | 
172
 | 
 	my ($self) = @_;  | 
| 
144
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
145
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# callers will get code -32006 'opposite end of channel gone'  | 
| 
146
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# errors when the announcement is withdrawn.  | 
| 
147
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#  | 
| 
148
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
55
 | 
 	foreach my $method (keys %{$self->{announced}}) {  | 
| 
 
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
206
 | 
    | 
| 
149
 | 
16
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
120
 | 
 		next if exists $self->{methods}{$method}{id}; # active announce/withdraw request  | 
| 
150
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
151
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
64
 | 
 		my $params = {method => $method};  | 
| 
152
 | 
8
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
120
 | 
 		$params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};  | 
| 
153
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
176
 | 
 		my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => $params});  | 
| 
154
 | 
8
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
208
 | 
 		die rpc_error('io', 'netstring_write') unless defined $id;  | 
| 
155
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
80
 | 
 		$self->{methods}{$method}{id} = $id;  | 
| 
156
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
157
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
109
 | 
 	return;  | 
| 
158
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
159
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
160
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_worker_flowcontrol {  | 
| 
161
 | 
584
 | 
 
 | 
 
 | 
  
584
  
 | 
  
0
  
 | 
2808
 | 
 	my ($self, $workername) = @_;  | 
| 
162
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
163
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# need to be in connected auth state  | 
| 
164
 | 
584
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
5249
 | 
 	return unless ($self->{state} && ($self->{state} ne 'rpcswitch.hello'));  | 
| 
165
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
166
 | 
452
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1221
 | 
 	if ($self->{flowcontrol}) {  | 
| 
167
 | 
102
 | 
 
 | 
 
 | 
 
 | 
 
 | 
275
 | 
 		my $cnt = (scalar keys %{$self->{async}{jobs}}) + (scalar @{$self->{async}{jobqueue}});  | 
| 
 
 | 
102
 | 
 
 | 
 
 | 
 
 | 
 
 | 
278
 | 
    | 
| 
 
 | 
102
 | 
 
 | 
 
 | 
 
 | 
 
 | 
669
 | 
    | 
| 
168
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#printf ">> flow: %d %d %d\n", $cnt, $self->{async}{max_async} * 2, $self->{async}{max_async};  | 
| 
169
 | 
102
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
653
 | 
 		if ($cnt >= $self->{async}{max_async} * 2) {  | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
170
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
279
 | 
 			$self->rpc_worker_withdraw();  | 
| 
171
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} elsif ($cnt < $self->{async}{max_async}) {  | 
| 
172
 | 
44
 | 
 
 | 
 
 | 
 
 | 
 
 | 
320
 | 
 			$self->rpc_worker_announce($workername);  | 
| 
173
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
174
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
175
 | 
452
 | 
 
 | 
 
 | 
 
 | 
 
 | 
830
 | 
 	return;  | 
| 
176
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
177
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
178
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub valid_worker_err {  | 
| 
179
 | 
19
 | 
 
 | 
 
 | 
  
19
  
 | 
  
0
  
 | 
95
 | 
 	my ($err) = @_;  | 
| 
180
 | 
19
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
893
 | 
 	$err = {text => $err} unless ref($err); # convert plain errors  | 
| 
181
 | 
19
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
209
 | 
 	$err->{class} = 'hard' unless exists $err->{class};  | 
| 
182
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
399
 | 
 	return $err;  | 
| 
183
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
184
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
185
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpcswitch_resp {  | 
| 
186
 | 
244
 | 
 
 | 
 
 | 
  
244
  
 | 
  
0
  
 | 
906
 | 
 	my ($rpcswitch) = @_;  | 
| 
187
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
188
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# Just the vcookie & vci-channel parameters are required by  | 
| 
189
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# the rpcswitch. The worker_id field is optional, and might  | 
| 
190
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# be set to the worker_id returned by the announce response.  | 
| 
191
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#  | 
| 
192
 | 
244
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1106
 | 
 	$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}};  | 
| 
193
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}, worker_id => $rpcswitch->{worker_id}};  | 
| 
194
 | 
244
 | 
 
 | 
 
 | 
 
 | 
 
 | 
541
 | 
 	return $rpcswitch;  | 
| 
195
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
196
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
197
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub client {  | 
| 
198
 | 
228
 | 
 
 | 
 
 | 
  
228
  
 | 
  
1
  
 | 
2280
 | 
 	my ($self, $msg, $method, $params, $reqauth) = @_;  | 
| 
199
 | 
228
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1235
 | 
 	my ($req, $rsp) = $self->rpc_decode($msg);  | 
| 
200
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
201
 | 
228
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
2907
 | 
 	if ($req eq 'rpcswitch.greetings') {  | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
202
 | 
38
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
304
 | 
 		my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert  | 
| 
203
 | 
38
 | 
 
 | 
 
 | 
 
 | 
 
 | 
209
 | 
 		my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};  | 
| 
204
 | 
38
 | 
 
 | 
 
 | 
 
 | 
 
 | 
285
 | 
 		$self->rpc_send_req('rpcswitch.hello', {params => $helloparams});  | 
| 
205
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq 'rpcswitch.hello') {  | 
| 
206
 | 
19
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1463
 | 
 		if (!$msg->{result}[0]) {  | 
| 
207
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");  | 
| 
208
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
209
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
931
 | 
 		$self->rpc_send_call($method, $params, $reqauth);  | 
| 
210
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq 'rpcswitch.ping') {  | 
| 
211
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
95
 | 
 		return [$msg->{result}]; # ping complete  | 
| 
212
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq $method) {  | 
| 
213
 | 
133
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
532
 | 
 		if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel  | 
| 
214
 | 
133
 | 
 
 | 
 
 | 
 
 | 
 
 | 
646
 | 
 			$self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone  | 
| 
215
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
216
 | 
133
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1349
 | 
 		if ($msg->{result}[0] eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump)  | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
217
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
76
 | 
 			$self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id  | 
| 
218
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} elsif ($msg->{result}[0] eq 'RES_ERROR') { # worker error  | 
| 
219
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
247
 | 
 			my $e = valid_worker_err($msg->{result}[1]);  | 
| 
220
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
114
 | 
 			die rpc_error('worker', to_json($e), $e);  | 
| 
221
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} elsif ($msg->{result}[0] eq 'RES_OK') {  | 
| 
222
 | 
95
 | 
 
 | 
 
 | 
 
 | 
 
 | 
323
 | 
 			return [@{$msg->{result}}[1..$#{$msg->{result}}]]; # client result[1..$]  | 
| 
 
 | 
95
 | 
 
 | 
 
 | 
 
 | 
 
 | 
532
 | 
    | 
| 
 
 | 
95
 | 
 
 | 
 
 | 
 
 | 
 
 | 
247
 | 
    | 
| 
223
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
224
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($req eq 'rpcswitch.result') {  | 
| 
225
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		my $channel = $msg->{rpcswitch}{vci};  | 
| 
226
 | 
  
0
  
 | 
  
  0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
0
 | 
 		if (($msg->{params}[0] eq 'RES_OK') && ($msg->{params}[1] eq $self->{channels}{$channel})) {  | 
| 
 
 | 
 
 | 
  
  0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
227
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			return [@{$msg->{params}}[2..$#{$msg->{params}}]]; # client result[2..$] (notification)  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
228
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} elsif (($msg->{params}[0] eq 'RES_ERROR') && ($msg->{params}[1] eq $self->{channels}{$channel})) {  | 
| 
229
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			my $e = valid_worker_err($msg->{params}[2]);  | 
| 
230
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('worker', to_json($e), $e);  | 
| 
231
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
232
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]");  | 
| 
233
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($req eq 'rpcswitch.channel_gone') {  | 
| 
234
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
95
 | 
 		my $channel = $msg->{params}{channel};  | 
| 
235
 | 
19
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
133
 | 
 		if (exists $self->{channels}{$channel}) {  | 
| 
236
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
95
 | 
 			my $id = delete $self->{channels}{$channel};  | 
| 
237
 | 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
152
 | 
 			die rpc_error('rpcswitch', "$req for request $id: $channel");  | 
| 
238
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
239
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('rpcswitch', "$req for unknown request: $channel");  | 
| 
240
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} else {  | 
| 
241
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1}));  | 
| 
242
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
243
 | 
76
 | 
 
 | 
 
 | 
 
 | 
 
 | 
361
 | 
 	return;  | 
| 
244
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
245
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
246
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub worker {  | 
| 
247
 | 
621
 | 
 
 | 
 
 | 
  
621
  
 | 
  
1
  
 | 
2000
 | 
 	my ($self, $msg, $workername) = @_;  | 
| 
248
 | 
621
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4344
 | 
 	my ($req, $rsp) = $self->rpc_decode($msg);  | 
| 
249
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
250
 | 
621
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
3064
 | 
 	if ($req eq 'rpcswitch.greetings') {  | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
 
 | 
 
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
251
 | 
115
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1002
 | 
 		my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert  | 
| 
252
 | 
115
 | 
 
 | 
 
 | 
 
 | 
 
 | 
661
 | 
 		my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};  | 
| 
253
 | 
115
 | 
 
 | 
 
 | 
 
 | 
 
 | 
654
 | 
 		$self->rpc_send_req('rpcswitch.hello', {params => $helloparams});  | 
| 
254
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq 'rpcswitch.hello') {  | 
| 
255
 | 
115
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1226
 | 
 		if (!$msg->{result}[0]) {  | 
| 
256
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");  | 
| 
257
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
258
 | 
115
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1486
 | 
 		$self->rpc_worker_announce($workername);  | 
| 
259
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq 'rpcswitch.announce') {  | 
| 
260
 | 
135
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
1021
 | 
 		if (!$msg->{result}[0]) {  | 
| 
261
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");  | 
| 
262
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
263
 | 
135
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1364
 | 
 		my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};  | 
| 
 
 | 
165
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1688
 | 
    | 
| 
 
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
520
 | 
    | 
| 
264
 | 
135
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
485
 | 
 		if (!defined $method) {  | 
| 
265
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]");  | 
| 
266
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
267
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# register announced method  | 
| 
268
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
492
 | 
 		$self->{announced}{$method}{cb} = $self->{methods}{$method}{cb};  | 
| 
269
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
331
 | 
 		$self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id};  | 
| 
270
 | 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
337
 | 
 		delete $self->{methods}{$method}{id};  | 
| 
271
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($req eq 'rpcswitch.ping') {  | 
| 
272
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->rpc_send({id => $msg->{id}, result => 'pong!'});  | 
| 
273
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif (exists $self->{announced}{$req}) {  | 
| 
274
 | 
248
 | 
 
 | 
 
 | 
 
 | 
 
 | 
897
 | 
 		$msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response  | 
| 
275
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
276
 | 
248
 | 
 
 | 
 
 | 
 
 | 
 
 | 
627
 | 
 		$self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone  | 
| 
277
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
278
 | 
248
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
652
 | 
 		if ($self->{async}) { # use async call for forked childs  | 
| 
279
 | 
199
 | 
 
 | 
 
 | 
 
 | 
 
 | 
814
 | 
 			$self->{async}->msg_enqueue($msg);  | 
| 
280
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} else {  | 
| 
281
 | 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
170
 | 
 			my $rpcswitch = rpcswitch_resp($msg->{rpcswitch});  | 
| 
282
 | 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
117
 | 
 			my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) };  | 
| 
 
 | 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
310
 | 
    | 
| 
283
 | 
49
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1119
 | 
 			if ($@) {  | 
| 
284
 | 
15
 | 
 
 | 
 
 | 
 
 | 
 
 | 
195
 | 
 				$self->rpc_send({id => $msg->{id}, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch});  | 
| 
285
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 			} else {  | 
| 
286
 | 
34
 | 
 
 | 
 
 | 
 
 | 
 
 | 
375
 | 
 				$self->rpc_send({id => $msg->{id}, result => ['RES_OK', @resp], rpcswitch => $rpcswitch});  | 
| 
287
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 			}  | 
| 
288
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
289
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($rsp eq 'rpcswitch.withdraw') {  | 
| 
290
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# Note: the rpcswitch sends just a boolean result here  | 
| 
291
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
292
 | 
8
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
96
 | 
 		if (!$msg->{result}) {  | 
| 
293
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "$rsp failed: $msg->{result}");  | 
| 
294
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
295
 | 
8
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
88
 | 
 		my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};  | 
| 
 
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
232
 | 
    | 
| 
 
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
176
 | 
    | 
| 
296
 | 
8
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
120
 | 
 		if (!defined $method) {  | 
| 
297
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}");  | 
| 
298
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
299
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# remove announced method  | 
| 
300
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
104
 | 
 		delete $self->{announced}{$method};  | 
| 
301
 | 
8
 | 
 
 | 
 
 | 
 
 | 
 
 | 
88
 | 
 		delete $self->{methods}{$method}{id};  | 
| 
302
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} elsif ($req eq 'rpcswitch.channel_gone') {  | 
| 
303
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		my $channel = $msg->{params}{channel};  | 
| 
304
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		if ($self->{async}) {  | 
| 
305
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
0
 | 
 			my ($childs, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel });  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
306
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			if (@$msgs) {  | 
| 
307
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 				warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs);  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
308
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 			}  | 
| 
309
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
310
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		if (exists $self->{channels}{$channel}) {  | 
| 
311
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			delete $self->{channels}{$channel};  | 
| 
312
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		} else {  | 
| 
313
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 			warn "worker $req for unknown request: $channel";  | 
| 
314
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
315
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	} else {  | 
| 
316
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		warn "worker unsupported msg: ".to_json($msg, {canonical => 1});  | 
| 
317
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
318
 | 
621
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2195
 | 
 	return;  | 
| 
319
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
320
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
321
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub is_session_req {
 	my ($self, $params) = @_;

 	# Returns the session hash ($params->{session}) of a request that
 	# carries a session id, or nothing (bare return) otherwise.
 	# Always returns nothing when no session cache is configured.
 	#
 	return unless $self->{sessioncache};

 	# json-rpc params may also be positional (array) or absent, and the
 	# session member is client supplied. Only dereference real hashes,
 	# so that a malformed request can't die here under strict refs.
 	#
 	return unless ref($params) eq 'HASH';
 	my $session = $params->{session};
 	return unless (ref($session) eq 'HASH') && exists $session->{id};

 	return $session;
 }
| 
330
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
331
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub is_session_resp {
 	my ($self, $params) = @_;

 	# Returns the set_session value of a successful result of the form
 	# ['RES_OK', <id>, {set_session => ...}], or nothing (bare return)
 	# otherwise. Always returns nothing when no session cache is
 	# configured.
 	#
 	return unless $self->{sessioncache};

 	return unless ref($params) eq 'ARRAY';
 	my ($status, undef, $result) = @$params;
 	return unless defined $status && ($status eq 'RES_OK');

 	# The result is whatever the worker callback returned. It might be
 	# an array ref or another non-hash reference, which must not be
 	# dereferenced as a hash.
 	#
 	return unless (ref($result) eq 'HASH') && exists $result->{set_session};

 	return $result->{set_session};
 }
| 
340
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
341
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub child_handler {
 	my ($self, $wr) = @_;

 	# Request loop of a forked worker child. Reads netstring framed
 	# json requests from $wr, runs the registered method callback and
 	# writes ['RES_OK'|'RES_ERROR', <id>, ...] back on the same handle.
 	#
 	# Returns 0 on success, 1 if the response could not be json
 	# encoded, 2 on a write error and 3 if closing the handle failed.
 	# Dies with a "worker child: ..." message on read or decode errors.
 	#
 	# The child has to explicitly close the ssl-socket without shutdown.
 	# Otherwise the parent will get an EOF.
 	# see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors
 	#
 	my $inherited = $self->{sock};
 	if (ref($inherited) ne 'IO::Socket::SSL') {
 		close($inherited);
 	} else {
 		$inherited->close(SSL_no_shutdown => 1);
 	}

 	# from here on the child only talks to its parent
 	$self->{sock} = $wr;
 	delete $self->{trace_cb};
 	local $SIG{INT} = 'DEFAULT';
 	local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result

 	# When session handling is enabled a child might process
 	# more than one request with the same session_id.
 	#
 	REQUEST: while (1) {
 		my $frame = eval { netstring_read($self->{sock}) };
 		if (!$frame) {
 			my $read_err = $@;
 			last REQUEST unless $read_err; # EOF
 			next REQUEST if $read_err =~ /^EINTR/; # interrupted
 			die "worker child: $read_err";
 		}
 		my $msg = eval { from_json($frame, {%{$self->{json_utf8}}}) };
 		die "worker child: $@" if $@;

 		# The client catches all possible die() calls, so that it is
 		# guaranteed to call exit either from here or from a signal handler.
 		#
 		my $callback = $self->{methods}{$msg->{method}}{cb};
 		my @resp;
 		eval {
 			local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code
 			@resp = $callback->($msg->{params}, $msg->{rpcswitch});
 		};
 		my $failure = $@;
 		my $params = $failure
 			? ['RES_ERROR', $msg->{id}, $failure]
 			: ['RES_OK', $msg->{id}, @resp];

 		my $reply = eval { to_json($params, {%{$self->{json_utf8}}}) };
 		return 1 if $@; # signal die from json encode
 		netstring_write($self->{sock}, $reply)
 			or return 2; # signal socket error

 		# keep serving only while a session is involved
 		next REQUEST if $self->is_session_resp($params);
 		next REQUEST if $self->is_session_req($msg->{params});
 		last REQUEST;
 	}
 	return 3 unless close($self->{sock}); # signal errors like broken pipe
 	return 0;
 }
| 
396
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
397
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _worker_child_write {
 	my ($self, $child, $msg) = @_;

 	# Serializes $msg as canonical json and forwards it as a netstring
 	# to the worker child via $child->{reader}.
 	# Throws rpc_error('io', ...) if the write fails; returns nothing.
 	#
 	my %json_opts = (canonical => 1, %{$self->{json_utf8}});
 	my $payload = to_json($msg, \%json_opts);

 	# forward request to worker child
 	netstring_write($child->{reader}, $payload)
 		or die rpc_error('io', 'netstring_write');
 	return;
 }
| 
405
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
406
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _worker_child_get {
 	my ($self, $msg) = @_;

 	# Picks the child that will process $msg: a cached child bound to
 	# the session (or, if configured, to the user) of the request, or
 	# else a freshly started one from the async pool.
 	#
 	my $cache = $self->{sessioncache};
 	if ($cache) {
 		# First try to reuse child for existing session
 		#
 		my $session_req = $self->is_session_req($msg->{params});
 		if ($session_req) {
 			my $cached = $cache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci});
 			return $cached if $cached;
 		} elsif (my $user_key = $cache->{session_persist_user}) {
 			if (exists $msg->{params}{$user_key}) {
 				my $user = $msg->{params}{$user_key};
 				my $cached = $cache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci});
 				if ($cached) {
 					# reused session will be added after session_resp
 					delete $cached->{session};
 					return $cached;
 				}
 			}
 		}
 	}

 	# nothing to reuse
 	my $fresh = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci});
 	return $fresh;
 }
| 
429
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
430
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _worker_childs_dequeue_and_run {
 	my ($self) = @_;

 	# Drains the async message queue: every dequeued request is handed
 	# to a worker child and acknowledged with RES_WAIT. If no child
 	# could be obtained or the request could not be forwarded, a
 	# RES_ERROR result is sent back instead.
 	#
 	while (my $msg = $self->{async}->msg_dequeue()) {
 		my $id = $msg->{id};
 		my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch});

 		my $child = eval { $self->_worker_child_get($msg) };
 		if (my $spawn_err = $@) {
 			$self->rpc_send({id => $id, result => ['RES_ERROR', $spawn_err], rpcswitch => $rpcswitch_resp});
 			next;
 		}

 		eval { $self->_worker_child_write($child, $msg) };
 		if (my $write_err = $@) {
 			$self->rpc_send({id => $id, result => ['RES_ERROR', $write_err], rpcswitch => $rpcswitch_resp});
 			$self->{async}->child_finish($child, 'error');
 			next;
 		}

 		# request is in flight: acknowledge and track the job
 		$self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp});
 		$self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp});
 	}
 	return;
 }
| 
453
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
454
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _worker_child_read_and_finish {
 	my ($self, $child) = @_;

 	# Reads the response of a worker child from $child->{reader},
 	# forwards it as 'rpcswitch.result' and then either parks the child
 	# in the session cache or finishes it.
 	# Returns the result of the last rpc_send() call.
 	#

 	# report an error result for this child and finish it
 	my $fail = sub {
 		my ($err) = @_;
 		my $sent = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
 		$self->{async}->child_finish($child, 'error');
 		return $sent;
 	};

 	my $frame = eval { netstring_read($child->{reader}) };
 	return $fail->($@ ? $@ : 'EOF') unless $frame;

 	my $params = eval { from_json($frame, {%{$self->{json_utf8}}}) };
 	return $fail->($@) if $@;

 	my $res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}});
 	return $fail->("result msg limit exceeded: " . length($frame)) unless $res;

 	if (my $sessioncache = $self->{sessioncache}) {
 		# bind the child to a newly announced (or changed) session
 		my $set_session = $self->is_session_resp($params);
 		if ($set_session && (!exists $child->{session} || ($child->{session}{id} ne $set_session->{id}))) {
 			$child->{session} = $sessioncache->session_new($set_session);
 			$sessioncache->expire_insert($child->{session});
 		}

 		if ($sessioncache->session_put($child)) {
 			# child stays alive in the cache; evict the least recently
 			# used one if the cache grew beyond max_session
 			my $active = keys %{$sessioncache->{active}};
 			if ($active > $sessioncache->{max_session}) {
 				my $evicted = $sessioncache->lru_dequeue();
 				$self->{async}->child_finish($evicted, 'lru') if $evicted;
 			}
 			return $res;
 		}
 		if (my $idle_child = $sessioncache->session_get_per_user_idle($child)) {
 			# update idle user session with older session_id
 			#
 			$self->{async}->child_finish($idle_child, 'update');
 			return $res if $sessioncache->session_put($child);
 		}
 	}

 	# child was not parked in the session cache
 	$self->{async}->child_finish($child, 'done');
 	return $res;
 }
| 
510
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
511
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _worker_sessions_expire {
 	my ($self) = @_;

 	# Finishes every child whose cached session has expired.
 	# Does nothing when no session cache is configured.
 	#
 	my $sessioncache = $self->{sessioncache}
 		or return;

 	# If a job for the expired session is active, the session
 	# will be dropped when session_put() is called after the
 	# job completed.
 	#
 	while (1) {
 		my $child = $sessioncache->expired_dequeue();
 		last unless $child;
 		$self->{async}->child_finish($child, 'expired');
 	}
 	return;
 }
| 
524
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
525
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_timeout {
 	my ($self, $call_timeout) = @_;

 	# Selects the receive timeout: the per-call timeout applies only
 	# while requests are outstanding in $self->{reqs}; in every other
 	# case the configured $self->{timeout} is returned.
 	#
 	return $self->{timeout} unless $call_timeout;

 	my $pending = keys %{$self->{reqs}};
 	return ($pending > 0)
 		? $call_timeout # for individual client call
 		: $self->{timeout};
 }
| 
533
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
534
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_stopped {
 	my ($self) = @_;

 	# Returns 1 when the handler loop should terminate, and nothing
 	# (bare return) otherwise.
 	#
 	# Every "not stopped" path returns explicitly, so the result never
 	# depends on the implicit value of the last evaluated expression.
 	#
 	return unless $self->{stop};

 	if ($self->{stop} eq 'withdraw') {
 		# wait for withdraw to complete
 		return if keys %{$self->{announced}};

 		# wait for active jobs to complete
 		return if $self->{async} && keys %{$self->{async}{jobs}};
 	}
 	return 1;
 }
| 
546
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
547
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub rpc_handler {
 	my ($self, $call_timeout, $handler, @handler_params) = @_;

 	# Main receive loop: waits for messages from the rpcswitch socket
 	# (and, in async mode, for responses on the worker child pipes) and
 	# passes every decoded message to $handler->($self, $msg,
 	# @handler_params) until the handler returns a true value.
 	#
 	# returns response or throws rpc_error.
 	# returns undef when remote side cleanly closed connection with EOF.
 	#
 	while (!$self->rpc_stopped()) {
 		my @pipes = ();
 		if ($self->{async}) {
 			$self->_worker_sessions_expire();
 			$self->_worker_childs_dequeue_and_run() unless $self->{stop};
 			$self->{async}->childs_reap(nonblock => 1);
 			$self->rpc_worker_flowcontrol(@handler_params);
 			@pipes = map { $_->{reader} } values %{$self->{async}{jobs}};
 		}
 		my $timeout = $self->rpc_timeout($call_timeout);

 		if ($timeout || @pipes) {
 			# $! is not reset on success or on a plain timeout. Clear
 			# it first, so that a stale EINTR from an earlier syscall
 			# can't turn a real receive timeout into a silent retry.
 			#
 			$! = 0;
 			my @ready = IO::Select->new(($self->{sock}, @pipes))->can_read($timeout);
 			next if (@ready == 0) && $!{EINTR}; # interrupted, check if stopped
 			die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0);

 			# collect responses from worker childs
 			foreach my $fh (@ready) {
 				next unless ($fh != $self->{sock}) && $self->{async};

 				my $fd = $fh->fileno;
 				unless (exists $self->{async}{jobs}{$fd}) {
 					die rpc_error('io', "child pipe not found: ". $fd);
 				}
 				my $child = $self->{async}{jobs}{$fd};
 				$self->{async}->job_rem($child);
 				$self->_worker_child_read_and_finish($child);
 			}
 			next unless grep { $_ == $self->{sock} } @ready;
 		}

 		# always block on full messages from rpcswitch
 		my $b = eval { netstring_read($self->{sock}) };
 		unless ($b) {
 			next if ($@ && ($@ =~ /^EINTR/)); # check if stopped
 			die rpc_error('io', $@) if $@;
 			return; # EOF
 		}
 		my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) };
 		die rpc_error('jsonrpc', $@) if $@;
 		$self->{trace_cb}->('RCV', $msg) if $self->{trace_cb};

 		my $res = eval { $handler->($self, $msg, @handler_params) };
 		if (my $err = $@) {
 			die $err if ref($err); # forward error
 			die rpc_error('io', $err); # wrap plain die messages
 		}
 		if ($res) {
 			return $res;
 		}
 	}
 	return; # STOP is checked by caller
 }
| 
603
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
604
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub work {  | 
| 
605
 | 
134
 | 
 
 | 
 
 | 
  
134
  
 | 
  
1
  
 | 
3141
 | 
 	my ($self, $workername, $methods, $opts) = @_;  | 
| 
606
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
607
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# a write on a shutdown socket should never happen  | 
| 
608
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#  | 
| 
609
 | 
134
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
3736
 | 
 	local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" };  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
610
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
611
 | 
134
 | 
 
 | 
 
 | 
 
 | 
 
 | 
811
 | 
 	foreach my $method (keys %$methods) {  | 
| 
612
 | 
130
 | 
 
 | 
 
 | 
 
 | 
 
 | 
747
 | 
 		$self->{methods}{$method}{cb} = $methods->{$method}{cb};  | 
| 
613
 | 
130
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
525
 | 
 		$self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {};  | 
| 
614
 | 
130
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
516
 | 
 		$self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter};  | 
| 
615
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
616
 | 
134
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
524
 | 
 	$opts->{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb};  | 
| 
617
 | 
134
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
1653
 | 
 	$self->{async} = RPC::Switch::Client::Tiny::Async->new(%$opts) if $opts->{max_async};  | 
| 
618
 | 
134
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
782
 | 
 	$self->{flowcontrol} = $opts->{flowcontrol} if $opts->{flowcontrol};  | 
| 
619
 | 
134
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
699
 | 
 	$self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%$opts) if $opts->{max_session};  | 
| 
620
 | 
134
 | 
 
 | 
 
 | 
 
 | 
 
 | 
719
 | 
 	$self->rpc_handler(0, \&worker, $workername);  | 
| 
621
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
622
 | 
97
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
433
 | 
 	if ($self->{stop}) {  | 
| 
623
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->rpc_worker_withdraw();  | 
| 
624
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->{stop} = 'withdraw';  | 
| 
625
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
626
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# wait some time for withdraw & active jobs to complete  | 
| 
627
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
628
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
0
 | 
 		local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; };  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
629
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		alarm($self->{gracetime});  | 
| 
630
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		$self->rpc_handler(0, \&worker, $workername);  | 
| 
631
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		alarm(0);  | 
| 
632
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 		die rpc_error('io', 'STOP');  | 
| 
633
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
634
 | 
97
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
314
 | 
 	if (my $async = $self->{async}) {  | 
| 
635
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# drop stored sessions  | 
| 
636
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
637
 | 
48
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
384
 | 
 		if (my $sessioncache = $self->{sessioncache}) {  | 
| 
638
 | 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
10
 | 
 			foreach my $session_id (keys %{$sessioncache->{active}}) {  | 
| 
 
 | 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
51
 | 
    | 
| 
639
 | 
3
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
18
 | 
 				if (my $child = $sessioncache->session_get($session_id)) {  | 
| 
640
 | 
3
 | 
 
 | 
 
 | 
 
 | 
 
 | 
15
 | 
 					$async->child_finish($child, 'idle');  | 
| 
641
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 				}  | 
| 
642
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 			}  | 
| 
643
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
644
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# reap remaining childs  | 
| 
645
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
646
 | 
48
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
216
 | 
 		if (keys %{$async->{finished}}) {  | 
| 
 
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
338
 | 
    | 
| 
647
 | 
48
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
3100
 | 
 			local $SIG{ALRM} = sub { warn "worker child wait timeout\n" };  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
648
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
642
 | 
 			alarm(1); # wait at most for one second  | 
| 
649
 | 
48
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
382
 | 
 			unless ($async->childs_reap()) { # blocking  | 
| 
650
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 				$async->childs_reap(nonblock => 1); # continue nonblocking after timeout  | 
| 
651
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 			}  | 
| 
652
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1624
 | 
 			alarm(0);  | 
| 
653
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		}  | 
| 
654
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
655
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		# EOF is only an error here when there are outstanding requests  | 
| 
656
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 		#  | 
| 
657
 | 
48
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
762
 | 
 		my ($stopped, $msgs) = $async->jobs_terminate('stopped', sub { 1 });  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
658
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
245
 | 
 		my @childs = keys %{$async->{finished}};  | 
| 
 
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
148
 | 
    | 
| 
659
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
660
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
411
 | 
 		$async->childs_kill(); # don't wait here  | 
| 
661
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
662
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
213
 | 
 		$async->{jobqueue} = [];  | 
| 
663
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
230
 | 
 		$async->{finished} = {};  | 
| 
664
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
665
 | 
48
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
212
 | 
 		die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs);  | 
| 
666
 | 
48
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
197
 | 
 		die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs);  | 
| 
667
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
668
 | 
97
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2603
 | 
 	return;  | 
| 
669
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
670
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
671
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub call {  | 
| 
672
 | 
171
 | 
 
 | 
 
 | 
  
171
  
 | 
  
1
  
 | 
168359
 | 
 	my ($self, $method, $params, $opts) = @_;  | 
| 
673
 | 
171
 | 
 
 | 
 
 | 
 
 | 
 
 | 
627
 | 
 	my $reqauth = $opts->{reqauth};  | 
| 
674
 | 
171
 | 
 
 | 
 
 | 
 
 | 
 
 | 
304
 | 
 	my $call_timeout = $opts->{timeout};  | 
| 
675
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
676
 | 
171
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
760
 | 
 	if ($self->{state} eq 'rpcswitch.hello') { # trigger rpc_send for consecutive requests  | 
| 
677
 | 
133
 | 
 
 | 
 
 | 
 
 | 
 
 | 
798
 | 
 		$self->rpc_send_call($method, $params, $reqauth);  | 
| 
678
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	}  | 
| 
679
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	# EOF is an error here (response missing)  | 
| 
680
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 	#  | 
| 
681
 | 
171
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
988
 | 
 	my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth) or die rpc_error('io', 'eof');  | 
| 
682
 | 
114
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
570
 | 
 	return wantarray() ? @$res : $res->[0];  | 
| 
683
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
684
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
685
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # stop() exits an active $client->work() worker handler.  | 
| 
686
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
687
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # - work() dies with RPC::Switch::Client::Tiny::Error which  | 
| 
688
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   might be {type => 'io', message => 'STOP'}, or any other  | 
| 
689
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   error if a non-restartable system call was interrupted.  | 
| 
690
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
691
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   - stop() makes no sense for call() (it has rpc_timeout)  | 
| 
692
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   - the only way to call stop is from a signal handler.  | 
| 
693
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   - if a signal handler is called, non-restartable perl  | 
| 
694
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #     system calls are interrupted and return $! == EINTR.  | 
| 
695
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
696
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   -> this can break an active worker handler and result in  | 
| 
697
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      a RES_ERROR-message to the caller if a non-restartable  | 
| 
698
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      perl syscall is interrupted.  | 
| 
699
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   -> for the async worker mode this should mostly work.  | 
| 
700
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      (sysreadfull, rpc print & IO::Select are restartable).  | 
| 
701
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   -> stop will just wait for a gracetime (default 2 seconds)  | 
| 
702
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      for active jobs to complete. The remaining jobs  | 
| 
703
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #      are terminated.  | 
| 
704
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
705
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub stop {  | 
| 
706
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
  
0
  
 | 
0
 | 
 	my ($self, $opts) = @_;  | 
| 
707
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 	$self->{gracetime} = $opts->{gracetime} ? $opts->{gracetime} : 2;  | 
| 
708
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 	$self->{stop} = 'pending';  | 
| 
709
 | 
0
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
 	return;  | 
| 
710
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
711
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
712
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # The perl object destroy order is undefined, so $self->{sock}  | 
| 
713
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # might already be destroyed and it makes no sense to try to  | 
| 
714
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # send RES_ERROR messages for remaining childs.  | 
| 
715
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # see: https://perldoc.perl.org/perlobj#Global-Destruction  | 
| 
716
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
717
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # So just terminate remaining childs, and let init-process   | 
| 
718
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # collect them instead of calling waitpid() here.  | 
| 
719
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
720
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # TODO: perl will call DESTROY only when the process exits  | 
| 
721
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #       cleanly or calls exit(). If the process is killed, perl  | 
| 
722
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #       calls DESTROY only if a handler for the matching signal  | 
| 
723
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #       is installed, like: $SIG{'TERM'} = sub { exit; };  | 
| 
724
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
725
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #       -> so does it make sense to support DESTROY at all,  | 
| 
726
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #          if there are situations when it is not called?  | 
| 
727
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #  | 
| 
728
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub DESTROY {  | 
| 
729
 | 
196
 | 
 
 | 
 
 | 
  
196
  
 | 
 
 | 
2820
 | 
 	my ($self) = @_;  | 
| 
730
 | 
196
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
2684
 | 
 	$self->{async}->childs_kill() if $self->{async}; # don't wait here  | 
| 
731
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
732
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
733
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 1;  | 
| 
734
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
735
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 __END__  |