| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | # Lightweight client for the RPC-Switch json-rpc request multiplexer | 
| 2 |  |  |  |  |  |  | # | 
| 3 |  |  |  |  |  |  | # see: RPC::Switch: https://github.com/a6502/rpc-switch | 
| 4 |  |  |  |  |  |  | #      RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client | 
| 5 |  |  |  |  |  |  | # | 
| 6 |  |  |  |  |  |  | package RPC::Switch::Client::Tiny; | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 20 |  |  | 20 |  | 2195276 | use strict; | 
|  | 20 |  |  |  |  | 212 |  | 
|  | 20 |  |  |  |  | 562 |  | 
| 9 | 20 |  |  | 20 |  | 119 | use warnings; | 
|  | 20 |  |  |  |  | 40 |  | 
|  | 20 |  |  |  |  | 502 |  | 
| 10 | 20 |  |  | 20 |  | 100 | use Carp 'croak'; | 
|  | 20 |  |  |  |  | 41 |  | 
|  | 20 |  |  |  |  | 1014 |  | 
| 11 | 20 |  |  | 20 |  | 808 | use JSON; | 
|  | 20 |  |  |  |  | 8897 |  | 
|  | 20 |  |  |  |  | 100 |  | 
| 12 | 20 |  |  | 20 |  | 11265 | use IO::Select; | 
|  | 20 |  |  |  |  | 37230 |  | 
|  | 20 |  |  |  |  | 926 |  | 
| 13 | 20 |  |  | 20 |  | 15486 | use IO::Socket::SSL; | 
|  | 20 |  |  |  |  | 1261182 |  | 
|  | 20 |  |  |  |  | 198 |  | 
| 14 | 20 |  |  | 20 |  | 14665 | use Time::HiRes qw(time); | 
|  | 20 |  |  |  |  | 26223 |  | 
|  | 20 |  |  |  |  | 85 |  | 
| 15 | 20 |  |  | 20 |  | 12387 | use RPC::Switch::Client::Tiny::Error; | 
|  | 20 |  |  |  |  | 59 |  | 
|  | 20 |  |  |  |  | 669 |  | 
| 16 | 20 |  |  | 20 |  | 7928 | use RPC::Switch::Client::Tiny::Netstring; | 
|  | 20 |  |  |  |  | 59 |  | 
|  | 20 |  |  |  |  | 1436 |  | 
| 17 | 20 |  |  | 20 |  | 8481 | use RPC::Switch::Client::Tiny::Async; | 
|  | 20 |  |  |  |  | 79 |  | 
|  | 20 |  |  |  |  | 629 |  | 
| 18 | 20 |  |  | 20 |  | 8517 | use RPC::Switch::Client::Tiny::SessionCache; | 
|  | 20 |  |  |  |  | 59 |  | 
|  | 20 |  |  |  |  | 114513 |  | 
| 19 |  |  |  |  |  |  |  | 
| 20 |  |  |  |  |  |  | our $VERSION = '1.66_01'; | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | sub new { | 
| 23 | 234 |  |  | 234 | 1 | 421105 | my ($class, %args) = @_; | 
| 24 | 234 | 50 |  |  |  | 1771 | my $s = $args{sock} or croak __PACKAGE__ . " expects sock"; | 
| 25 | 234 | 50 |  |  |  | 1009 | unless ($^O eq 'MSWin32') { # cpantester: strawberry perl does not support blocking() call | 
| 26 | 234 | 50 |  |  |  | 1351 | defined(my $b = $s->blocking()) or croak __PACKAGE__ . " bad socket: $!"; | 
| 27 | 234 | 100 |  |  |  | 3003 | unless ($b) { croak __PACKAGE__ . " nonblocking socket not supported"; } | 
|  | 19 |  |  |  |  | 5054 |  | 
| 28 |  |  |  |  |  |  | } | 
| 29 | 215 | 50 |  |  |  | 1255 | unless (exists $args{who}) { croak __PACKAGE__ . " expects who"; } | 
|  | 0 |  |  |  |  | 0 |  | 
| 30 | 215 |  |  |  |  | 3685 | my $self = bless { | 
| 31 |  |  |  |  |  |  | %args, | 
| 32 |  |  |  |  |  |  | id        => 1,  # next request id | 
| 33 |  |  |  |  |  |  | state     => '', # last rpcswitch.type | 
| 34 |  |  |  |  |  |  | reqs      => {}, # outstanding requests | 
| 35 |  |  |  |  |  |  | channels  => {}, # open rpcswitch channels | 
| 36 |  |  |  |  |  |  | methods   => {}, # defined worker methods | 
| 37 |  |  |  |  |  |  | announced => {}, # announced worker methods | 
| 38 |  |  |  |  |  |  | msglimit  => 999999, # max netstring size | 
| 39 |  |  |  |  |  |  | }, $class; | 
| 40 | 215 | 50 |  |  |  | 1689 | if (ref($self->{sock}) eq 'IO::Socket::SSL') { | 
| 41 | 0 | 0 |  |  |  | 0 | $self->{auth_method} = 'clientcert' unless exists $self->{auth_method}; | 
| 42 | 0 | 0 |  |  |  | 0 | $self->{token} = $self->{who} unless exists $self->{token}; # should be optional for clientcert | 
| 43 |  |  |  |  |  |  | } else { | 
| 44 | 215 | 50 |  |  |  | 1529 | $self->{auth_method} = 'password' unless exists $self->{auth_method}; | 
| 45 |  |  |  |  |  |  | } | 
| 46 | 215 | 100 |  |  |  | 1090 | $self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1}; | 
| 47 | 215 |  |  |  |  | 1947 | return $self; | 
| 48 |  |  |  |  |  |  | } | 
| 49 |  |  |  |  |  |  |  | 
| 50 |  |  |  |  |  |  | sub rpc_error { | 
| 51 | 76 |  |  | 76 | 0 | 1520 | return RPC::Switch::Client::Tiny::Error->new(@_); | 
| 52 |  |  |  |  |  |  | } | 
| 53 |  |  |  |  |  |  |  | 
| 54 |  |  |  |  |  |  | sub rpc_send { | 
| 55 | 863 |  |  | 863 | 0 | 3389 | my ($self, $msg) = @_; | 
| 56 | 863 |  |  |  |  | 1808 | my $s = $self->{sock}; | 
| 57 | 863 |  |  |  |  | 2582 | my $len = length($s); | 
| 58 | 863 | 50 | 33 |  |  | 8069 | if ($self->{msglimit} && ($len > $self->{msglimit})) { | 
| 59 | 0 |  |  |  |  | 0 | warn "rpc_send msglimit exceeded: $len > $self->{msglimit}"; | 
| 60 | 0 |  |  |  |  | 0 | return; | 
| 61 |  |  |  |  |  |  | } | 
| 62 | 863 |  |  |  |  | 4286 | $msg->{jsonrpc} = '2.0'; | 
| 63 | 863 |  |  |  |  | 1642 | my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}}); | 
|  | 863 |  |  |  |  | 4841 |  | 
| 64 | 863 | 100 |  |  |  | 27876 | $self->{trace_cb}->('SND', $msg) if $self->{trace_cb}; | 
| 65 | 863 |  |  |  |  | 11593 | return netstring_write($s, $str); | 
| 66 |  |  |  |  |  |  | } | 
| 67 |  |  |  |  |  |  |  | 
| 68 |  |  |  |  |  |  | sub rpc_send_req { | 
| 69 | 472 |  |  | 472 | 0 | 1406 | my ($self, $method, $msg) = @_; | 
| 70 | 472 |  |  |  |  | 1860 | my $id = "$self->{id}"; $self->{id}++; | 
|  | 472 |  |  |  |  | 1197 |  | 
| 71 | 472 |  |  |  |  | 1375 | $msg->{id} = $id; | 
| 72 | 472 |  |  |  |  | 1202 | $msg->{method} = $method; | 
| 73 | 472 |  |  |  |  | 2343 | $self->{reqs}{$id} = $method; | 
| 74 | 472 | 100 |  |  |  | 3266 | $self->{state} = $method if $method =~ /^rpcswitch\./; | 
| 75 | 472 | 100 |  |  |  | 1557 | $self->rpc_send($msg) or return; | 
| 76 | 467 |  |  |  |  | 1576 | return $id; | 
| 77 |  |  |  |  |  |  | } | 
| 78 |  |  |  |  |  |  |  | 
| 79 |  |  |  |  |  |  | sub rpc_send_call { | 
| 80 | 176 |  |  | 176 | 0 | 2629 | my ($self, $method, $params, $reqauth) = @_; | 
| 81 |  |  |  |  |  |  |  | 
| 82 | 176 | 50 |  |  |  | 552 | if (defined $reqauth) { # request authentication | 
| 83 |  |  |  |  |  |  | # Without the vcookie the rpcswitch does not validate | 
| 84 |  |  |  |  |  |  | # the reqauth parameter. | 
| 85 |  |  |  |  |  |  | # The vcookie 'eatme' value is hardcoded in the rpc-switch | 
| 86 |  |  |  |  |  |  | # code and is called 'channel information version'. | 
| 87 |  |  |  |  |  |  | # | 
| 88 | 0 |  |  |  |  | 0 | return $self->rpc_send_req($method, {params => $params, rpcswitch => {vcookie => 'eatme', reqauth => $reqauth}}); | 
| 89 |  |  |  |  |  |  | } else { | 
| 90 | 176 |  |  |  |  | 762 | return $self->rpc_send_req($method, {params => $params}); | 
| 91 |  |  |  |  |  |  | } | 
| 92 |  |  |  |  |  |  | } | 
| 93 |  |  |  |  |  |  |  | 
| 94 |  |  |  |  |  |  | sub rpc_decode { | 
| 95 | 849 |  |  | 849 | 0 | 2545 | my ($self, $msg) = @_; | 
| 96 | 849 |  |  |  |  | 4631 | my ($req, $rsp) = ('', ''); | 
| 97 |  |  |  |  |  |  |  | 
| 98 | 849 | 50 | 66 |  |  | 5962 | unless (($msg->{jsonrpc} eq '2.0') && (exists $msg->{id} || exists $msg->{method})) { | 
|  |  |  | 66 |  |  |  |  | 
| 99 | 0 |  |  |  |  | 0 | die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1})); | 
| 100 |  |  |  |  |  |  | } | 
| 101 | 849 | 100 |  |  |  | 3802 | if (exists $msg->{method}) { | 
|  |  | 50 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
| 102 | 420 |  |  |  |  | 1336 | $req = $msg->{method}; | 
| 103 |  |  |  |  |  |  | } elsif (!defined $msg->{id}) { | 
| 104 | 0 |  |  |  |  | 0 | die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1})); | 
| 105 |  |  |  |  |  |  | } elsif (!exists $self->{reqs}{$msg->{id}}) { | 
| 106 | 0 |  |  |  |  | 0 | die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1})); | 
| 107 |  |  |  |  |  |  | } else { | 
| 108 | 429 |  |  |  |  | 1250 | $rsp = delete $self->{reqs}{$msg->{id}}; | 
| 109 |  |  |  |  |  |  |  | 
| 110 | 429 | 50 |  |  |  | 1119 | if (exists $msg->{error}) { | 
| 111 | 0 |  |  |  |  | 0 | die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $msg->{error}{message}", {code => $msg->{error}{code}}); | 
| 112 |  |  |  |  |  |  | } | 
| 113 | 429 | 50 |  |  |  | 1212 | if (!exists $msg->{result}) { | 
| 114 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing"); | 
| 115 |  |  |  |  |  |  | } | 
| 116 | 429 | 50 | 100 |  |  | 1751 | if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) { | 
|  |  |  | 66 |  |  |  |  | 
| 117 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}"); | 
| 118 |  |  |  |  |  |  | } | 
| 119 |  |  |  |  |  |  | } | 
| 120 | 849 |  |  |  |  | 3534 | return ($req, $rsp); | 
| 121 |  |  |  |  |  |  | } | 
| 122 |  |  |  |  |  |  |  | 
| 123 |  |  |  |  |  |  | sub rpc_worker_announce { | 
| 124 | 159 |  |  | 159 | 0 | 735 | my ($self, $workername) = @_; | 
| 125 |  |  |  |  |  |  |  | 
| 126 |  |  |  |  |  |  | # ignore repeated announce request or unfinished withdraw | 
| 127 |  |  |  |  |  |  | # | 
| 128 | 159 | 100 |  |  |  | 395 | return if (keys %{$self->{announced}}); | 
|  | 159 |  |  |  |  | 647 |  | 
| 129 |  |  |  |  |  |  |  | 
| 130 | 130 |  |  |  |  | 311 | foreach my $method (keys %{$self->{methods}}) { | 
|  | 130 |  |  |  |  | 714 |  | 
| 131 | 145 | 100 |  |  |  | 472 | next if exists $self->{methods}{$method}{id}; # active announce/withdraw request | 
| 132 |  |  |  |  |  |  |  | 
| 133 | 135 |  |  |  |  | 628 | my $params = {method => $method, workername => $workername, doc => $self->{methods}{$method}{doc}}; | 
| 134 | 135 | 50 |  |  |  | 437 | $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter}; | 
| 135 | 135 |  |  |  |  | 530 | my $id = $self->rpc_send_req('rpcswitch.announce', {params => $params}); | 
| 136 | 135 | 50 |  |  |  | 639 | die rpc_error('io', 'netstring_write') unless defined $id; | 
| 137 | 135 |  |  |  |  | 620 | $self->{methods}{$method}{id} = $id; | 
| 138 |  |  |  |  |  |  | } | 
| 139 | 130 |  |  |  |  | 329 | return; | 
| 140 |  |  |  |  |  |  | } | 
| 141 |  |  |  |  |  |  |  | 
| 142 |  |  |  |  |  |  | sub rpc_worker_withdraw { | 
| 143 | 31 |  |  | 31 | 0 | 237 | my ($self) = @_; | 
| 144 |  |  |  |  |  |  |  | 
| 145 |  |  |  |  |  |  | # callers will get code -32006 'opposite end of channel gone' | 
| 146 |  |  |  |  |  |  | # errors when the announcement is withdrawn. | 
| 147 |  |  |  |  |  |  | # | 
| 148 | 31 |  |  |  |  | 126 | foreach my $method (keys %{$self->{announced}}) { | 
|  | 31 |  |  |  |  | 238 |  | 
| 149 | 16 | 100 |  |  |  | 144 | next if exists $self->{methods}{$method}{id}; # active announce/withdraw request | 
| 150 |  |  |  |  |  |  |  | 
| 151 | 8 |  |  |  |  | 72 | my $params = {method => $method}; | 
| 152 | 8 | 50 |  |  |  | 144 | $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter}; | 
| 153 | 8 |  |  |  |  | 144 | my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => $params}); | 
| 154 | 8 | 50 |  |  |  | 144 | die rpc_error('io', 'netstring_write') unless defined $id; | 
| 155 | 8 |  |  |  |  | 112 | $self->{methods}{$method}{id} = $id; | 
| 156 |  |  |  |  |  |  | } | 
| 157 | 31 |  |  |  |  | 109 | return; | 
| 158 |  |  |  |  |  |  | } | 
| 159 |  |  |  |  |  |  |  | 
| 160 |  |  |  |  |  |  | sub rpc_worker_flowcontrol { | 
| 161 | 581 |  |  | 581 | 0 | 2479 | my ($self, $workername) = @_; | 
| 162 |  |  |  |  |  |  |  | 
| 163 |  |  |  |  |  |  | # need to be in connected auth state | 
| 164 | 581 | 100 | 100 |  |  | 5193 | return unless ($self->{state} && ($self->{state} ne 'rpcswitch.hello')); | 
| 165 |  |  |  |  |  |  |  | 
| 166 | 449 | 100 |  |  |  | 1157 | if ($self->{flowcontrol}) { | 
| 167 | 102 |  |  |  |  | 272 | my $cnt = (scalar keys %{$self->{async}{jobs}}) + (scalar @{$self->{async}{jobqueue}}); | 
|  | 102 |  |  |  |  | 259 |  | 
|  | 102 |  |  |  |  | 292 |  | 
| 168 |  |  |  |  |  |  | #printf ">> flow: %d %d %d\n", $cnt, $self->{async}{max_async} * 2, $self->{async}{max_async}; | 
| 169 | 102 | 100 |  |  |  | 804 | if ($cnt >= $self->{async}{max_async} * 2) { | 
|  |  | 100 |  |  |  |  |  | 
| 170 | 31 |  |  |  |  | 353 | $self->rpc_worker_withdraw(); | 
| 171 |  |  |  |  |  |  | } elsif ($cnt < $self->{async}{max_async}) { | 
| 172 | 44 |  |  |  |  | 320 | $self->rpc_worker_announce($workername); | 
| 173 |  |  |  |  |  |  | } | 
| 174 |  |  |  |  |  |  | } | 
| 175 | 449 |  |  |  |  | 865 | return; | 
| 176 |  |  |  |  |  |  | } | 
| 177 |  |  |  |  |  |  |  | 
| 178 |  |  |  |  |  |  | sub valid_worker_err { | 
| 179 | 19 |  |  | 19 | 0 | 304 | my ($err) = @_; | 
| 180 | 19 | 50 |  |  |  | 1083 | $err = {text => $err} unless ref($err); # convert plain errors | 
| 181 | 19 | 50 |  |  |  | 133 | $err->{class} = 'hard' unless exists $err->{class}; | 
| 182 | 19 |  |  |  |  | 190 | return $err; | 
| 183 |  |  |  |  |  |  | } | 
| 184 |  |  |  |  |  |  |  | 
| 185 |  |  |  |  |  |  | sub rpcswitch_resp { | 
| 186 | 244 |  |  | 244 | 0 | 515 | my ($rpcswitch) = @_; | 
| 187 |  |  |  |  |  |  |  | 
| 188 |  |  |  |  |  |  | # Just the vcookie & vci-channel parameters are required by | 
| 189 |  |  |  |  |  |  | # the rpcswitch. The worker_id field is optional, and might | 
| 190 |  |  |  |  |  |  | # be set to the worker_id returned by the announce response. | 
| 191 |  |  |  |  |  |  | # | 
| 192 | 244 |  |  |  |  | 1031 | $rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}}; | 
| 193 |  |  |  |  |  |  | #$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}, worker_id => $rpcswitch->{worker_id}}; | 
| 194 | 244 |  |  |  |  | 489 | return $rpcswitch; | 
| 195 |  |  |  |  |  |  | } | 
| 196 |  |  |  |  |  |  |  | 
| 197 |  |  |  |  |  |  | sub client { | 
| 198 | 228 |  |  | 228 | 1 | 3002 | my ($self, $msg, $method, $params, $reqauth) = @_; | 
| 199 | 228 |  |  |  |  | 1064 | my ($req, $rsp) = $self->rpc_decode($msg); | 
| 200 |  |  |  |  |  |  |  | 
| 201 | 228 | 100 |  |  |  | 1615 | if ($req eq 'rpcswitch.greetings') { | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
| 202 | 38 | 50 |  |  |  | 190 | my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert | 
| 203 | 38 |  |  |  |  | 209 | my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}}; | 
| 204 | 38 |  |  |  |  | 456 | $self->rpc_send_req('rpcswitch.hello', {params => $helloparams}); | 
| 205 |  |  |  |  |  |  | } elsif ($rsp eq 'rpcswitch.hello') { | 
| 206 | 19 | 50 |  |  |  | 1976 | if (!$msg->{result}[0]) { | 
| 207 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); | 
| 208 |  |  |  |  |  |  | } | 
| 209 | 19 |  |  |  |  | 874 | $self->rpc_send_call($method, $params, $reqauth); | 
| 210 |  |  |  |  |  |  | } elsif ($rsp eq 'rpcswitch.ping') { | 
| 211 | 19 |  |  |  |  | 266 | return [$msg->{result}]; # ping complete | 
| 212 |  |  |  |  |  |  | } elsif ($rsp eq $method) { | 
| 213 | 133 | 50 |  |  |  | 836 | if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel | 
| 214 | 133 |  |  |  |  | 627 | $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone | 
| 215 |  |  |  |  |  |  | } | 
| 216 | 133 | 100 |  |  |  | 1254 | if ($msg->{result}[0] eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump) | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
| 217 | 19 |  |  |  |  | 361 | $self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id | 
| 218 |  |  |  |  |  |  | } elsif ($msg->{result}[0] eq 'RES_ERROR') { # worker error | 
| 219 | 19 |  |  |  |  | 266 | my $e = valid_worker_err($msg->{result}[1]); | 
| 220 | 19 |  |  |  |  | 247 | die rpc_error('worker', to_json($e), $e); | 
| 221 |  |  |  |  |  |  | } elsif ($msg->{result}[0] eq 'RES_OK') { | 
| 222 | 95 |  |  |  |  | 342 | return [@{$msg->{result}}[1..$#{$msg->{result}}]]; # client result[1..$] | 
|  | 95 |  |  |  |  | 570 |  | 
|  | 95 |  |  |  |  | 380 |  | 
| 223 |  |  |  |  |  |  | } | 
| 224 |  |  |  |  |  |  | } elsif ($req eq 'rpcswitch.result') { | 
| 225 | 0 |  |  |  |  | 0 | my $channel = $msg->{rpcswitch}{vci}; | 
| 226 | 0 | 0 | 0 |  |  | 0 | if (($msg->{params}[0] eq 'RES_OK') && ($msg->{params}[1] eq $self->{channels}{$channel})) { | 
|  |  | 0 | 0 |  |  |  |  | 
| 227 | 0 |  |  |  |  | 0 | return [@{$msg->{params}}[2..$#{$msg->{params}}]]; # client result[2..$] (notification) | 
|  | 0 |  |  |  |  | 0 |  | 
|  | 0 |  |  |  |  | 0 |  | 
| 228 |  |  |  |  |  |  | } elsif (($msg->{params}[0] eq 'RES_ERROR') && ($msg->{params}[1] eq $self->{channels}{$channel})) { | 
| 229 | 0 |  |  |  |  | 0 | my $e = valid_worker_err($msg->{params}[2]); | 
| 230 | 0 |  |  |  |  | 0 | die rpc_error('worker', to_json($e), $e); | 
| 231 |  |  |  |  |  |  | } | 
| 232 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]"); | 
| 233 |  |  |  |  |  |  | } elsif ($req eq 'rpcswitch.channel_gone') { | 
| 234 | 19 |  |  |  |  | 76 | my $channel = $msg->{params}{channel}; | 
| 235 | 19 | 50 |  |  |  | 76 | if (exists $self->{channels}{$channel}) { | 
| 236 | 19 |  |  |  |  | 76 | my $id = delete $self->{channels}{$channel}; | 
| 237 | 19 |  |  |  |  | 133 | die rpc_error('rpcswitch', "$req for request $id: $channel"); | 
| 238 |  |  |  |  |  |  | } | 
| 239 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$req for unknown request: $channel"); | 
| 240 |  |  |  |  |  |  | } else { | 
| 241 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1})); | 
| 242 |  |  |  |  |  |  | } | 
| 243 | 76 |  |  |  |  | 361 | return; | 
| 244 |  |  |  |  |  |  | } | 
| 245 |  |  |  |  |  |  |  | 
| 246 |  |  |  |  |  |  | sub worker { | 
| 247 | 621 |  |  | 621 | 1 | 2187 | my ($self, $msg, $workername) = @_; | 
| 248 | 621 |  |  |  |  | 3866 | my ($req, $rsp) = $self->rpc_decode($msg); | 
| 249 |  |  |  |  |  |  |  | 
| 250 | 621 | 100 |  |  |  | 2876 | if ($req eq 'rpcswitch.greetings') { | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
|  |  | 100 |  |  |  |  |  | 
|  |  | 50 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
| 251 | 115 | 50 |  |  |  | 561 | my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert | 
| 252 | 115 |  |  |  |  | 538 | my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}}; | 
| 253 | 115 |  |  |  |  | 514 | $self->rpc_send_req('rpcswitch.hello', {params => $helloparams}); | 
| 254 |  |  |  |  |  |  | } elsif ($rsp eq 'rpcswitch.hello') { | 
| 255 | 115 | 50 |  |  |  | 1090 | if (!$msg->{result}[0]) { | 
| 256 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); | 
| 257 |  |  |  |  |  |  | } | 
| 258 | 115 |  |  |  |  | 2198 | $self->rpc_worker_announce($workername); | 
| 259 |  |  |  |  |  |  | } elsif ($rsp eq 'rpcswitch.announce') { | 
| 260 | 135 | 50 |  |  |  | 685 | if (!$msg->{result}[0]) { | 
| 261 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]"); | 
| 262 |  |  |  |  |  |  | } | 
| 263 | 135 | 100 |  |  |  | 1297 | my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}}; | 
|  | 165 |  |  |  |  | 2198 |  | 
|  | 135 |  |  |  |  | 466 |  | 
| 264 | 135 | 50 |  |  |  | 519 | if (!defined $method) { | 
| 265 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]"); | 
| 266 |  |  |  |  |  |  | } | 
| 267 |  |  |  |  |  |  | # register announced method | 
| 268 | 135 |  |  |  |  | 463 | $self->{announced}{$method}{cb} = $self->{methods}{$method}{cb}; | 
| 269 | 135 |  |  |  |  | 338 | $self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id}; | 
| 270 | 135 |  |  |  |  | 400 | delete $self->{methods}{$method}{id}; | 
| 271 |  |  |  |  |  |  | } elsif ($req eq 'rpcswitch.ping') { | 
| 272 | 0 |  |  |  |  | 0 | $self->rpc_send({id => $msg->{id}, result => 'pong!'}); | 
| 273 |  |  |  |  |  |  | } elsif (exists $self->{announced}{$req}) { | 
| 274 | 248 |  |  |  |  | 651 | $msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response | 
| 275 |  |  |  |  |  |  |  | 
| 276 | 248 |  |  |  |  | 648 | $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone | 
| 277 |  |  |  |  |  |  |  | 
| 278 | 248 | 100 |  |  |  | 594 | if ($self->{async}) { # use async call for forked childs | 
| 279 | 199 |  |  |  |  | 807 | $self->{async}->msg_enqueue($msg); | 
| 280 |  |  |  |  |  |  | } else { | 
| 281 | 49 |  |  |  |  | 170 | my $rpcswitch = rpcswitch_resp($msg->{rpcswitch}); | 
| 282 | 49 |  |  |  |  | 117 | my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) }; | 
|  | 49 |  |  |  |  | 420 |  | 
| 283 | 49 | 100 |  |  |  | 1139 | if ($@) { | 
| 284 | 15 |  |  |  |  | 255 | $self->rpc_send({id => $msg->{id}, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch}); | 
| 285 |  |  |  |  |  |  | } else { | 
| 286 | 34 |  |  |  |  | 469 | $self->rpc_send({id => $msg->{id}, result => ['RES_OK', @resp], rpcswitch => $rpcswitch}); | 
| 287 |  |  |  |  |  |  | } | 
| 288 |  |  |  |  |  |  | } | 
| 289 |  |  |  |  |  |  | } elsif ($rsp eq 'rpcswitch.withdraw') { | 
| 290 |  |  |  |  |  |  | # Note: the rpcswitch sends just a boolean result here | 
| 291 |  |  |  |  |  |  | # | 
| 292 | 8 | 50 |  |  |  | 184 | if (!$msg->{result}) { | 
| 293 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "$rsp failed: $msg->{result}"); | 
| 294 |  |  |  |  |  |  | } | 
| 295 | 8 | 50 |  |  |  | 104 | my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}}; | 
|  | 8 |  |  |  |  | 176 |  | 
|  | 8 |  |  |  |  | 128 |  | 
| 296 | 8 | 50 |  |  |  | 40 | if (!defined $method) { | 
| 297 | 0 |  |  |  |  | 0 | die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}"); | 
| 298 |  |  |  |  |  |  | } | 
| 299 |  |  |  |  |  |  | # remove announced method | 
| 300 | 8 |  |  |  |  | 112 | delete $self->{announced}{$method}; | 
| 301 | 8 |  |  |  |  | 32 | delete $self->{methods}{$method}{id}; | 
| 302 |  |  |  |  |  |  | } elsif ($req eq 'rpcswitch.channel_gone') { | 
| 303 | 0 |  |  |  |  | 0 | my $channel = $msg->{params}{channel}; | 
| 304 | 0 | 0 |  |  |  | 0 | if ($self->{async}) { | 
| 305 | 0 |  |  | 0 |  | 0 | my ($childs, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel }); | 
|  | 0 |  |  |  |  | 0 |  | 
| 306 | 0 | 0 |  |  |  | 0 | if (@$msgs) { | 
| 307 | 0 |  |  |  |  | 0 | warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs); | 
|  | 0 |  |  |  |  | 0 |  | 
| 308 |  |  |  |  |  |  | } | 
| 309 |  |  |  |  |  |  | } | 
| 310 | 0 | 0 |  |  |  | 0 | if (exists $self->{channels}{$channel}) { | 
| 311 | 0 |  |  |  |  | 0 | delete $self->{channels}{$channel}; | 
| 312 |  |  |  |  |  |  | } else { | 
| 313 | 0 |  |  |  |  | 0 | warn "worker $req for unknown request: $channel"; | 
| 314 |  |  |  |  |  |  | } | 
| 315 |  |  |  |  |  |  | } else { | 
| 316 | 0 |  |  |  |  | 0 | warn "worker unsupported msg: ".to_json($msg, {canonical => 1}); | 
| 317 |  |  |  |  |  |  | } | 
| 318 | 621 |  |  |  |  | 1881 | return; | 
| 319 |  |  |  |  |  |  | } | 
| 320 |  |  |  |  |  |  |  | 
| 321 |  |  |  |  |  |  | sub is_session_req { | 
| 322 | 34 |  |  | 34 | 0 | 171 | my ($self, $params) = @_; | 
| 323 | 34 | 100 |  |  |  | 373 | return unless $self->{sessioncache}; | 
| 324 |  |  |  |  |  |  |  | 
| 325 | 20 | 50 | 33 |  |  | 232 | if (exists $params->{session} && exists $params->{session}{id}) { | 
| 326 | 20 |  |  |  |  | 142 | return $params->{session}; | 
| 327 |  |  |  |  |  |  | } | 
| 328 | 0 |  |  |  |  | 0 | return; | 
| 329 |  |  |  |  |  |  | } | 
| 330 |  |  |  |  |  |  |  | 
| 331 |  |  |  |  |  |  | sub is_session_resp { | 
| 332 | 34 |  |  | 34 | 0 | 127 | my ($self, $params) = @_; | 
| 333 | 34 | 100 |  |  |  | 498 | return unless $self->{sessioncache}; | 
| 334 |  |  |  |  |  |  |  | 
| 335 | 20 | 50 | 33 |  |  | 440 | if ((ref($params) eq 'ARRAY') && ($params->[0] eq 'RES_OK') && ref($params->[2]) && exists $params->[2]->{set_session}) { | 
|  |  |  | 33 |  |  |  |  | 
|  |  |  | 33 |  |  |  |  | 
| 336 | 20 |  |  |  |  | 197 | return $params->[2]->{set_session}; | 
| 337 |  |  |  |  |  |  | } | 
| 338 | 0 |  |  |  |  | 0 | return; | 
| 339 |  |  |  |  |  |  | } | 
| 340 |  |  |  |  |  |  |  | 
| 341 |  |  |  |  |  |  | sub child_handler { | 
| 342 | 18 |  |  | 18 | 0 | 370 | my ($self, $wr) = @_; | 
| 343 |  |  |  |  |  |  |  | 
| 344 |  |  |  |  |  |  | # The child has to explicitly close the ssl-socket without shutdown. | 
| 345 |  |  |  |  |  |  | # Otherwise the parent will get an EOF. | 
| 346 |  |  |  |  |  |  | # see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors | 
| 347 |  |  |  |  |  |  | # | 
| 348 | 18 | 50 |  |  |  | 482 | if (ref($self->{sock}) eq 'IO::Socket::SSL') { | 
| 349 | 0 |  |  |  |  | 0 | $self->{sock}->close(SSL_no_shutdown => 1); | 
| 350 |  |  |  |  |  |  | } else { | 
| 351 | 18 |  |  |  |  | 582 | close($self->{sock}); | 
| 352 |  |  |  |  |  |  | } | 
| 353 | 18 |  |  |  |  | 83 | $self->{sock} = $wr; | 
| 354 | 18 |  |  |  |  | 501 | delete $self->{trace_cb}; | 
| 355 | 18 |  |  |  |  | 810 | local $SIG{INT} = 'DEFAULT'; | 
| 356 | 18 |  |  |  |  | 631 | local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result | 
| 357 |  |  |  |  |  |  |  | 
| 358 |  |  |  |  |  |  | # When session handling is enabled a child might process | 
| 359 |  |  |  |  |  |  | # more than one request with the same session_id. | 
| 360 |  |  |  |  |  |  | # | 
| 361 | 18 |  |  |  |  | 273 | while (1) { | 
| 362 | 24 |  |  |  |  | 946 | my $b = eval { netstring_read($self->{sock}) }; | 
|  | 24 |  |  |  |  | 739 |  | 
| 363 | 24 | 100 |  |  |  | 214 | unless ($b) { | 
| 364 | 4 | 50 | 33 |  |  | 28 | next if ($@ && ($@ =~ /^EINTR/)); # interrupted | 
| 365 | 4 | 50 |  |  |  | 23 | die "worker child: $@" if $@; | 
| 366 | 4 |  |  |  |  | 12 | last; # EOF | 
| 367 |  |  |  |  |  |  | } | 
| 368 | 20 |  |  |  |  | 171 | my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) }; | 
|  | 20 |  |  |  |  | 121 |  | 
|  | 20 |  |  |  |  | 799 |  | 
| 369 | 20 | 50 |  |  |  | 2907 | die "worker child: $@" if $@; | 
| 370 |  |  |  |  |  |  |  | 
| 371 |  |  |  |  |  |  | # The client catches all possible die() calls, so that it is | 
| 372 |  |  |  |  |  |  | # guaranteed to call exit either from here or from a signal handler. | 
| 373 |  |  |  |  |  |  | # | 
| 374 | 20 |  |  |  |  | 232 | my $params; | 
| 375 | 20 |  |  |  |  | 229 | my $callback = $self->{methods}{$msg->{method}}{cb}; | 
| 376 | 20 |  |  |  |  | 173 | my @resp; | 
| 377 | 20 |  |  |  |  | 129 | eval { | 
| 378 | 20 |  |  |  |  | 686 | local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code | 
| 379 | 20 |  |  |  |  | 590 | @resp = $callback->($msg->{params}, $msg->{rpcswitch}); | 
| 380 |  |  |  |  |  |  | }; | 
| 381 | 20 | 50 |  |  |  | 903509 | if (my $err = $@) { | 
| 382 | 0 |  |  |  |  | 0 | $params = ['RES_ERROR', $msg->{id}, $err]; | 
| 383 |  |  |  |  |  |  | } else { | 
| 384 | 20 |  |  |  |  | 150 | $params = ['RES_OK', $msg->{id}, @resp]; | 
| 385 |  |  |  |  |  |  | } | 
| 386 | 20 |  |  |  |  | 82 | $b = eval { to_json($params, {%{$self->{json_utf8}}}) }; | 
|  | 20 |  |  |  |  | 46 |  | 
|  | 20 |  |  |  |  | 345 |  | 
| 387 | 20 | 50 |  |  |  | 1279 | return 1 if $@; # signal die from json encode | 
| 388 | 20 |  |  |  |  | 404 | my $res = netstring_write($self->{sock}, $b); | 
| 389 | 20 | 50 |  |  |  | 538 | return 2 unless $res; # signal socket error | 
| 390 |  |  |  |  |  |  |  | 
| 391 | 20 | 100 | 66 |  |  | 204 | last unless $self->is_session_resp($params) || $self->is_session_req($msg->{params}); | 
| 392 |  |  |  |  |  |  | } | 
| 393 | 18 | 100 |  |  |  | 556 | close($self->{sock}) or return 3; # signal errors like broken pipe | 
| 394 | 17 |  |  |  |  | 487 | return 0; | 
| 395 |  |  |  |  |  |  | } | 
| 396 |  |  |  |  |  |  |  | 
| 397 |  |  |  |  |  |  | sub _worker_child_write { | 
| 398 | 177 |  |  | 177 |  | 706 | my ($self, $child, $msg) = @_; | 
| 399 |  |  |  |  |  |  |  | 
| 400 | 177 |  |  |  |  | 342 | my $b = to_json($msg, {canonical => 1, %{$self->{json_utf8}}}); | 
|  | 177 |  |  |  |  | 6455 |  | 
| 401 | 177 |  |  |  |  | 19927 | my $res = netstring_write($child->{reader}, $b); # forward request to worker child | 
| 402 | 177 | 50 |  |  |  | 928 | die rpc_error('io', 'netstring_write') unless $res; | 
| 403 | 177 |  |  |  |  | 613 | return; | 
| 404 |  |  |  |  |  |  | } | 
| 405 |  |  |  |  |  |  |  | 
| 406 |  |  |  |  |  |  | sub _worker_child_get { | 
| 407 | 195 |  |  | 195 |  | 566 | my ($self, $msg) = @_; | 
| 408 |  |  |  |  |  |  |  | 
| 409 |  |  |  |  |  |  | # First try to reuse child for existing session | 
| 410 |  |  |  |  |  |  | # | 
| 411 | 195 | 100 |  |  |  | 517 | if (my $sessioncache = $self->{sessioncache}) { | 
| 412 | 20 | 50 |  |  |  | 166 | if (my $session_req = $self->is_session_req($msg->{params})) { | 
|  |  | 0 |  |  |  |  |  | 
| 413 | 20 | 100 |  |  |  | 183 | if (my $child = $sessioncache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci})) { | 
| 414 | 6 |  |  |  |  | 18 | return $child; | 
| 415 |  |  |  |  |  |  | } | 
| 416 |  |  |  |  |  |  | } elsif ($sessioncache->{session_persist_user}) { | 
| 417 | 0 | 0 |  |  |  | 0 | if (exists $msg->{params}{$sessioncache->{session_persist_user}}) { | 
| 418 | 0 |  |  |  |  | 0 | my $user = $msg->{params}{$sessioncache->{session_persist_user}}; | 
| 419 | 0 | 0 |  |  |  | 0 | if (my $child = $sessioncache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci})) { | 
| 420 | 0 |  |  |  |  | 0 | delete $child->{session}; # reused session will be added after session_resp | 
| 421 | 0 |  |  |  |  | 0 | return $child; | 
| 422 |  |  |  |  |  |  | } | 
| 423 |  |  |  |  |  |  | } | 
| 424 |  |  |  |  |  |  | } | 
| 425 |  |  |  |  |  |  | } | 
| 426 | 189 |  |  |  |  | 1161 | my $child = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci}); | 
| 427 | 171 |  |  |  |  | 3263 | return $child; | 
| 428 |  |  |  |  |  |  | } | 
| 429 |  |  |  |  |  |  |  | 
| 430 |  |  |  |  |  |  | sub _worker_childs_dequeue_and_run { | 
| 431 | 599 |  |  | 599 |  | 1128 | my ($self) = @_; | 
| 432 |  |  |  |  |  |  |  | 
| 433 | 599 |  |  |  |  | 2641 | while (my $msg = $self->{async}->msg_dequeue()) { | 
| 434 | 195 |  |  |  |  | 482 | my $id = $msg->{id}; | 
| 435 | 195 |  |  |  |  | 760 | my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch}); | 
| 436 |  |  |  |  |  |  |  | 
| 437 | 195 |  |  |  |  | 435 | my $child = eval { $self->_worker_child_get($msg) }; | 
|  | 195 |  |  |  |  | 776 |  | 
| 438 | 177 | 50 |  |  |  | 1010 | if ($@) { | 
| 439 | 0 |  |  |  |  | 0 | $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp}); | 
| 440 |  |  |  |  |  |  | } else { | 
| 441 | 177 |  |  |  |  | 471 | eval { $self->_worker_child_write($child, $msg) }; | 
|  | 177 |  |  |  |  | 1555 |  | 
| 442 | 177 | 50 |  |  |  | 419 | if ($@) { | 
| 443 | 0 |  |  |  |  | 0 | $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp}); | 
| 444 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($child, 'error'); | 
| 445 |  |  |  |  |  |  | } else { | 
| 446 | 177 |  |  |  |  | 7950 | $self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp}); | 
| 447 | 177 |  |  |  |  | 1703 | $self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp}); | 
| 448 |  |  |  |  |  |  | } | 
| 449 |  |  |  |  |  |  | } | 
| 450 |  |  |  |  |  |  | } | 
| 451 | 581 |  |  |  |  | 2218 | return; | 
| 452 |  |  |  |  |  |  | } | 
| 453 |  |  |  |  |  |  |  | 
| 454 |  |  |  |  |  |  | sub _worker_child_read_and_finish { | 
| 455 | 165 |  |  | 165 |  | 443 | my ($self, $child) = @_; | 
| 456 |  |  |  |  |  |  |  | 
| 457 | 165 |  |  |  |  | 231 | my $res; | 
| 458 | 165 |  |  |  |  | 345 | my $b = eval { netstring_read($child->{reader}) }; | 
|  | 165 |  |  |  |  | 942 |  | 
| 459 | 165 | 100 |  |  |  | 493 | unless ($b) { | 
| 460 | 14 | 50 |  |  |  | 210 | my $err = $@ ? $@ : 'EOF'; | 
| 461 | 14 |  |  |  |  | 406 | $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}}); | 
| 462 | 14 |  |  |  |  | 1092 | $self->{async}->child_finish($child, 'error'); | 
| 463 |  |  |  |  |  |  | } else { | 
| 464 | 151 |  |  |  |  | 448 | my $params = eval { from_json($b, {%{$self->{json_utf8}}}) }; | 
|  | 151 |  |  |  |  | 282 |  | 
|  | 151 |  |  |  |  | 1523 |  | 
| 465 | 151 | 50 |  |  |  | 8468 | if ($@) { | 
| 466 | 0 |  |  |  |  | 0 | $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $@], rpcswitch => $child->{rpcswitch}}); | 
| 467 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($child, 'error'); | 
| 468 |  |  |  |  |  |  | } else { | 
| 469 | 151 |  |  |  |  | 1974 | $res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}}); | 
| 470 | 151 | 50 |  |  |  | 717 | unless ($res) { | 
| 471 | 0 |  |  |  |  | 0 | my $err = "result msg limit exceeded: " . length($b); | 
| 472 | 0 |  |  |  |  | 0 | $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}}); | 
| 473 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($child, 'error'); | 
| 474 | 0 |  |  |  |  | 0 | return $res; | 
| 475 |  |  |  |  |  |  | } | 
| 476 |  |  |  |  |  |  |  | 
| 477 | 151 | 100 |  |  |  | 527 | if (my $sessioncache = $self->{sessioncache}) { | 
| 478 | 14 | 50 |  |  |  | 117 | if (my $set_session = $self->is_session_resp($params)) { | 
| 479 | 14 | 100 | 66 |  |  | 163 | if (!exists $child->{session} || ($child->{session}{id} ne $set_session->{id})) { | 
| 480 | 8 |  |  |  |  | 112 | $child->{session} = $sessioncache->session_new($set_session); | 
| 481 | 8 |  |  |  |  | 68 | $sessioncache->expire_insert($child->{session}); | 
| 482 |  |  |  |  |  |  | } | 
| 483 |  |  |  |  |  |  | } | 
| 484 |  |  |  |  |  |  |  | 
| 485 | 14 | 100 |  |  |  | 145 | if ($sessioncache->session_put($child)) { | 
|  |  | 50 |  |  |  |  |  | 
| 486 | 9 |  |  |  |  | 15 | my $cnt = scalar keys %{$sessioncache->{active}}; | 
|  | 9 |  |  |  |  | 132 |  | 
| 487 | 9 | 50 |  |  |  | 36 | if ($cnt > $sessioncache->{max_session}) { | 
| 488 | 0 | 0 |  |  |  | 0 | if ($child = $sessioncache->lru_dequeue()) { | 
| 489 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($child, 'lru'); | 
| 490 |  |  |  |  |  |  | } | 
| 491 |  |  |  |  |  |  | } | 
| 492 | 9 |  |  |  |  | 18 | $child = undef; | 
| 493 |  |  |  |  |  |  | } elsif (my $idle_child = $sessioncache->session_get_per_user_idle($child)) { | 
| 494 |  |  |  |  |  |  | # update idle user session with older session_id | 
| 495 |  |  |  |  |  |  | # | 
| 496 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($idle_child, 'update'); | 
| 497 |  |  |  |  |  |  |  | 
| 498 | 0 | 0 |  |  |  | 0 | if ($sessioncache->session_put($child)) { | 
| 499 | 0 |  |  |  |  | 0 | $child = undef; | 
| 500 |  |  |  |  |  |  | } | 
| 501 |  |  |  |  |  |  | } | 
| 502 |  |  |  |  |  |  | } | 
| 503 | 151 | 100 |  |  |  | 801 | if ($child) { | 
| 504 | 142 |  |  |  |  | 682 | $self->{async}->child_finish($child, 'done'); | 
| 505 |  |  |  |  |  |  | } | 
| 506 |  |  |  |  |  |  | } | 
| 507 |  |  |  |  |  |  | } | 
| 508 | 165 |  |  |  |  | 884 | return $res; | 
| 509 |  |  |  |  |  |  | } | 
| 510 |  |  |  |  |  |  |  | 
| 511 |  |  |  |  |  |  | sub _worker_sessions_expire { | 
| 512 | 599 |  |  | 599 |  | 1113 | my ($self) = @_; | 
| 513 | 599 | 100 |  |  |  | 1817 | return unless $self->{sessioncache}; | 
| 514 |  |  |  |  |  |  |  | 
| 515 |  |  |  |  |  |  | # If a job for the expired session is active, the session | 
| 516 |  |  |  |  |  |  | # will be dropped when session_put() is called after the | 
| 517 |  |  |  |  |  |  | # job completed. | 
| 518 |  |  |  |  |  |  | # | 
| 519 | 57 |  |  |  |  | 266 | while (my $child = $self->{sessioncache}->expired_dequeue()) { | 
| 520 | 0 |  |  |  |  | 0 | $self->{async}->child_finish($child, 'expired'); | 
| 521 |  |  |  |  |  |  | } | 
| 522 | 57 |  |  |  |  | 97 | return; | 
| 523 |  |  |  |  |  |  | } | 
| 524 |  |  |  |  |  |  |  | 
| 525 |  |  |  |  |  |  | sub rpc_timeout { | 
| 526 | 1088 |  |  | 1088 | 0 | 2441 | my ($self, $call_timeout) = @_; | 
| 527 |  |  |  |  |  |  |  | 
| 528 | 1088 | 100 | 66 |  |  | 5267 | if ($call_timeout && (keys %{$self->{reqs}} > 0)) { | 
|  | 19 |  |  |  |  | 418 |  | 
| 529 | 19 |  |  |  |  | 95 | return $call_timeout; # for individual client call | 
| 530 |  |  |  |  |  |  | } | 
| 531 | 1069 |  |  |  |  | 2094 | return $self->{timeout}; | 
| 532 |  |  |  |  |  |  | } | 
| 533 |  |  |  |  |  |  |  | 
| 534 |  |  |  |  |  |  | sub rpc_stopped { | 
| 535 | 1106 |  |  | 1106 | 0 | 2629 | my ($self) = @_; | 
| 536 |  |  |  |  |  |  |  | 
| 537 | 1106 | 50 |  |  |  | 4052 | if ($self->{stop}) { | 
| 538 | 0 | 0 | 0 |  |  | 0 | if (($self->{stop} eq 'withdraw') && (keys %{$self->{announced}})) { | 
|  | 0 | 0 | 0 |  |  | 0 |  | 
|  |  |  | 0 |  |  |  |  | 
| 539 | 0 |  |  |  |  | 0 | return; # wait for withdraw to complete | 
| 540 | 0 |  |  |  |  | 0 | } elsif (($self->{stop} eq 'withdraw') && $self->{async} && (keys %{$self->{async}{jobs}})) { | 
| 541 | 0 |  |  |  |  | 0 | return; # wait for active jobs to complete | 
| 542 |  |  |  |  |  |  | } | 
| 543 | 0 |  |  |  |  | 0 | return 1; | 
| 544 |  |  |  |  |  |  | } | 
| 545 |  |  |  |  |  |  | } | 
| 546 |  |  |  |  |  |  |  | 
| 547 |  |  |  |  |  |  | sub rpc_handler { | 
| 548 | 305 |  |  | 305 | 0 | 1450 | my ($self, $call_timeout, $handler, @handler_params) = @_; | 
| 549 |  |  |  |  |  |  |  | 
| 550 |  |  |  |  |  |  | # returns response or throws rpc_error. | 
| 551 |  |  |  |  |  |  | # returns undef when remote side cleanly closed connection with EOF. | 
| 552 |  |  |  |  |  |  | # | 
| 553 | 305 |  |  |  |  | 1101 | while (!$self->rpc_stopped()) { | 
| 554 | 1106 |  |  |  |  | 2118 | my @pipes = (); | 
| 555 | 1106 | 100 |  |  |  | 2951 | if ($self->{async}) { | 
| 556 | 599 |  |  |  |  | 2181 | $self->_worker_sessions_expire(); | 
| 557 | 599 | 50 |  |  |  | 2381 | $self->_worker_childs_dequeue_and_run() unless $self->{stop}; | 
| 558 | 581 |  |  |  |  | 4131 | $self->{async}->childs_reap(nonblock => 1); | 
| 559 | 581 |  |  |  |  | 3189 | $self->rpc_worker_flowcontrol(@handler_params); | 
| 560 | 581 |  |  |  |  | 949 | @pipes = map { $_->{reader} } values %{$self->{async}{jobs}}; | 
|  | 439 |  |  |  |  | 2680 |  | 
|  | 581 |  |  |  |  | 3579 |  | 
| 561 |  |  |  |  |  |  | } | 
| 562 | 1088 |  |  |  |  | 4924 | my $timeout = $self->rpc_timeout($call_timeout); | 
| 563 |  |  |  |  |  |  |  | 
| 564 | 1088 | 100 | 100 |  |  | 5639 | if ($timeout || @pipes) { | 
| 565 | 437 |  |  |  |  | 9621 | my @ready = IO::Select->new(($self->{sock}, @pipes))->can_read($timeout); | 
| 566 | 437 | 50 | 66 |  |  | 13554854 | next if (@ready == 0) && $!{EINTR}; # $! is not reset on success | 
| 567 | 437 | 100 |  |  |  | 2331 | die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0); | 
| 568 |  |  |  |  |  |  |  | 
| 569 | 418 |  |  |  |  | 3123 | foreach my $fh (@ready) { | 
| 570 | 460 | 50 | 66 |  |  | 3131 | if (($fh != $self->{sock}) && $self->{async}) { | 
| 571 | 165 | 50 |  |  |  | 1468 | unless (exists $self->{async}{jobs}{$fh->fileno}) { | 
| 572 | 0 |  |  |  |  | 0 | die rpc_error('io', "child pipe not found: ". $fh->fileno); | 
| 573 |  |  |  |  |  |  | } | 
| 574 | 165 |  |  |  |  | 2303 | my $child = $self->{async}{jobs}{$fh->fileno}; | 
| 575 | 165 |  |  |  |  | 1580 | $self->{async}->job_rem($child); | 
| 576 | 165 |  |  |  |  | 773 | my $res = $self->_worker_child_read_and_finish($child); | 
| 577 |  |  |  |  |  |  | } | 
| 578 |  |  |  |  |  |  | } | 
| 579 | 418 | 100 |  |  |  | 1023 | next unless grep { $_ == $self->{sock} } @ready; | 
|  | 460 |  |  |  |  | 3591 |  | 
| 580 |  |  |  |  |  |  | } | 
| 581 |  |  |  |  |  |  |  | 
| 582 |  |  |  |  |  |  | # always block on full messages from rpcswitch | 
| 583 | 946 |  |  |  |  | 1795 | my $b = eval { netstring_read($self->{sock}) }; | 
|  | 946 |  |  |  |  | 3402 |  | 
| 584 | 946 | 100 |  |  |  | 3733 | unless ($b) { | 
| 585 | 97 | 50 | 33 |  |  | 330 | next if ($@ && ($@ =~ /^EINTR/)); # check if stopped | 
| 586 | 97 | 50 |  |  |  | 503 | die rpc_error('io', $@) if $@; | 
| 587 | 97 |  |  |  |  | 535 | return; # EOF | 
| 588 |  |  |  |  |  |  | } | 
| 589 | 849 |  |  |  |  | 2375 | my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) }; | 
|  | 849 |  |  |  |  | 1596 |  | 
|  | 849 |  |  |  |  | 5613 |  | 
| 590 | 849 | 100 |  |  |  | 30309 | die rpc_error('jsonrpc', $@) if $@; | 
| 591 | 830 | 100 |  |  |  | 2808 | $self->{trace_cb}->('RCV', $msg) if $self->{trace_cb}; | 
| 592 | 830 |  |  |  |  | 8355 | my $res = eval { $handler->($self, $msg, @handler_params) }; | 
|  | 830 |  |  |  |  | 2988 |  | 
| 593 | 830 | 100 |  |  |  | 2818 | if (my $err = $@) { | 
| 594 | 38 | 50 |  |  |  | 608 | die $err if ref($err); # forward error | 
| 595 | 0 |  |  |  |  | 0 | die rpc_error('io', $err); | 
| 596 |  |  |  |  |  |  | } | 
| 597 | 792 | 100 |  |  |  | 4266 | if ($res) { | 
| 598 | 114 |  |  |  |  | 722 | return $res; | 
| 599 |  |  |  |  |  |  | } | 
| 600 |  |  |  |  |  |  | } | 
| 601 | 0 |  |  |  |  | 0 | return; # STOP is checked by caller | 
| 602 |  |  |  |  |  |  | } | 
| 603 |  |  |  |  |  |  |  | 
| 604 |  |  |  |  |  |  | sub work { | 
| 605 | 134 |  |  | 134 | 1 | 3413 | my ($self, $workername, $methods, $opts) = @_; | 
| 606 |  |  |  |  |  |  |  | 
| 607 |  |  |  |  |  |  | # a write on a shutdown socket should never happen | 
| 608 |  |  |  |  |  |  | # | 
| 609 | 134 |  |  | 0 |  | 3863 | local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 610 |  |  |  |  |  |  |  | 
| 611 | 134 |  |  |  |  | 863 | foreach my $method (keys %$methods) { | 
| 612 | 130 |  |  |  |  | 582 | $self->{methods}{$method}{cb} = $methods->{$method}{cb}; | 
| 613 | 130 | 100 |  |  |  | 483 | $self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {}; | 
| 614 | 130 | 50 |  |  |  | 492 | $self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter}; | 
| 615 |  |  |  |  |  |  | } | 
| 616 | 134 | 100 |  |  |  | 461 | $opts->{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb}; | 
| 617 | 134 | 100 |  |  |  | 1927 | $self->{async} = RPC::Switch::Client::Tiny::Async->new(%$opts) if $opts->{max_async}; | 
| 618 | 134 | 50 |  |  |  | 1389 | $self->{flowcontrol} = $opts->{flowcontrol} if $opts->{flowcontrol}; | 
| 619 | 134 | 100 |  |  |  | 709 | $self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%$opts) if $opts->{max_session}; | 
| 620 | 134 |  |  |  |  | 794 | $self->rpc_handler(0, \&worker, $workername); | 
| 621 |  |  |  |  |  |  |  | 
| 622 | 97 | 50 |  |  |  | 388 | if ($self->{stop}) { | 
| 623 | 0 |  |  |  |  | 0 | $self->rpc_worker_withdraw(); | 
| 624 | 0 |  |  |  |  | 0 | $self->{stop} = 'withdraw'; | 
| 625 |  |  |  |  |  |  |  | 
| 626 |  |  |  |  |  |  | # wait some time for withdraw & active jobs to complete | 
| 627 |  |  |  |  |  |  | # | 
| 628 | 0 |  |  | 0 |  | 0 | local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; }; | 
|  | 0 |  |  |  |  | 0 |  | 
|  | 0 |  |  |  |  | 0 |  | 
| 629 | 0 |  |  |  |  | 0 | alarm($self->{gracetime}); | 
| 630 | 0 |  |  |  |  | 0 | $self->rpc_handler(0, \&worker, $workername); | 
| 631 | 0 |  |  |  |  | 0 | alarm(0); | 
| 632 | 0 |  |  |  |  | 0 | die rpc_error('io', 'STOP'); | 
| 633 |  |  |  |  |  |  | } | 
| 634 | 97 | 100 |  |  |  | 501 | if (my $async = $self->{async}) { | 
| 635 |  |  |  |  |  |  | # drop stored sessions | 
| 636 |  |  |  |  |  |  | # | 
| 637 | 48 | 100 |  |  |  | 441 | if (my $sessioncache = $self->{sessioncache}) { | 
| 638 | 4 |  |  |  |  | 35 | foreach my $session_id (keys %{$sessioncache->{active}}) { | 
|  | 4 |  |  |  |  | 47 |  | 
| 639 | 3 | 50 |  |  |  | 18 | if (my $child = $sessioncache->session_get($session_id)) { | 
| 640 | 3 |  |  |  |  | 15 | $async->child_finish($child, 'idle'); | 
| 641 |  |  |  |  |  |  | } | 
| 642 |  |  |  |  |  |  | } | 
| 643 |  |  |  |  |  |  | } | 
| 644 |  |  |  |  |  |  | # reap remaining childs | 
| 645 |  |  |  |  |  |  | # | 
| 646 | 48 | 50 |  |  |  | 274 | if (keys %{$async->{finished}}) { | 
|  | 48 |  |  |  |  | 403 |  | 
| 647 | 48 |  |  | 0 |  | 2984 | local $SIG{ALRM} = sub { warn "worker child wait timeout\n" }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 648 | 48 |  |  |  |  | 555 | alarm(1); # wait at most for one second | 
| 649 | 48 | 50 |  |  |  | 388 | unless ($async->childs_reap()) { # blocking | 
| 650 | 0 |  |  |  |  | 0 | $async->childs_reap(nonblock => 1); # continue nonblocking after timeout | 
| 651 |  |  |  |  |  |  | } | 
| 652 | 48 |  |  |  |  | 1542 | alarm(0); | 
| 653 |  |  |  |  |  |  | } | 
| 654 |  |  |  |  |  |  |  | 
| 655 |  |  |  |  |  |  | # EOF is only an error here when there are outstanding requests | 
| 656 |  |  |  |  |  |  | # | 
| 657 | 48 |  |  | 0 |  | 1023 | my ($stopped, $msgs) = $async->jobs_terminate('stopped', sub { 1 }); | 
|  | 0 |  |  |  |  | 0 |  | 
| 658 | 48 |  |  |  |  | 233 | my @childs = keys %{$async->{finished}}; | 
|  | 48 |  |  |  |  | 200 |  | 
| 659 |  |  |  |  |  |  |  | 
| 660 | 48 |  |  |  |  | 310 | $async->childs_kill(); # don't wait here | 
| 661 |  |  |  |  |  |  |  | 
| 662 | 48 |  |  |  |  | 195 | $async->{jobqueue} = []; | 
| 663 | 48 |  |  |  |  | 270 | $async->{finished} = {}; | 
| 664 |  |  |  |  |  |  |  | 
| 665 | 48 | 50 |  |  |  | 210 | die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs); | 
| 666 | 48 | 50 |  |  |  | 212 | die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs); | 
| 667 |  |  |  |  |  |  | } | 
| 668 | 97 |  |  |  |  | 2577 | return; | 
| 669 |  |  |  |  |  |  | } | 
| 670 |  |  |  |  |  |  |  | 
| 671 |  |  |  |  |  |  | sub call { | 
| 672 | 171 |  |  | 171 | 1 | 196802 | my ($self, $method, $params, $opts) = @_; | 
| 673 | 171 |  |  |  |  | 589 | my $reqauth = $opts->{reqauth}; | 
| 674 | 171 |  |  |  |  | 361 | my $call_timeout = $opts->{timeout}; | 
| 675 |  |  |  |  |  |  |  | 
| 676 | 171 | 100 |  |  |  | 855 | if ($self->{state} eq 'rpcswitch.hello') { # trigger rpc_send for consecutive requests | 
| 677 | 133 |  |  |  |  | 570 | $self->rpc_send_call($method, $params, $reqauth); | 
| 678 |  |  |  |  |  |  | } | 
| 679 |  |  |  |  |  |  | # EOF is an error here (response missing) | 
| 680 |  |  |  |  |  |  | # | 
| 681 | 171 | 50 |  |  |  | 1197 | my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth) or die rpc_error('io', 'eof'); | 
| 682 | 114 | 100 |  |  |  | 798 | return wantarray() ? @$res : $res->[0]; | 
| 683 |  |  |  |  |  |  | } | 
| 684 |  |  |  |  |  |  |  | 
| 685 |  |  |  |  |  |  | # stop() exits an active $client->work() worker handler. | 
| 686 |  |  |  |  |  |  | # | 
| 687 |  |  |  |  |  |  | # - work() dies with RPC::Switch::Client::Tiny::Error which | 
| 688 |  |  |  |  |  |  | #   might be {type => 'io', message => 'STOP'}, or any other | 
| 689 |  |  |  |  |  |  | #   error if a non-restartable system call was interrupted. | 
| 690 |  |  |  |  |  |  | # | 
| 691 |  |  |  |  |  |  | #   - stop() makes no sense for call() (it has rpc_timeout) | 
| 692 |  |  |  |  |  |  | #   - the only way to call stop is from a signal handler. | 
| 693 |  |  |  |  |  |  | #   - if a signal handler is called, non-restartable perl | 
| 694 |  |  |  |  |  |  | #     system calls are interrupted and return $! == EINTR. | 
| 695 |  |  |  |  |  |  | # | 
| 696 |  |  |  |  |  |  | #   -> this can break an active worker handler and result in | 
| 697 |  |  |  |  |  |  | #      a RES_ERROR-message to the caller if a non-restartable | 
| 698 |  |  |  |  |  |  | #      perl syscall is interrupted. | 
| 699 |  |  |  |  |  |  | #   -> for the async worker mode this should mostly work. | 
| 700 |  |  |  |  |  |  | #      (sysreadfull, rpc print & IO::Select are restartable). | 
| 701 |  |  |  |  |  |  | #   -> stop will just wait for a gracetime (default 2 seconds) | 
| 702 |  |  |  |  |  |  | #      for active jobs to complete. The remaining jobs | 
| 703 |  |  |  |  |  |  | #      are terminated. | 
| 704 |  |  |  |  |  |  | # | 
| 705 |  |  |  |  |  |  | sub stop { | 
| 706 | 0 |  |  | 0 | 0 | 0 | my ($self, $opts) = @_; | 
| 707 | 0 | 0 |  |  |  | 0 | $self->{gracetime} = $opts->{gracetime} ? $opts->{gracetime} : 2; | 
| 708 | 0 |  |  |  |  | 0 | $self->{stop} = 'pending'; | 
| 709 | 0 |  |  |  |  | 0 | return; | 
| 710 |  |  |  |  |  |  | } | 
| 711 |  |  |  |  |  |  |  | 
| 712 |  |  |  |  |  |  | # The perl object destroy order is undefined, so $self->{sock} | 
| 713 |  |  |  |  |  |  | # might already be destroyed and it makes no sense to try to | 
| 714 |  |  |  |  |  |  | # send RES_ERROR messages for remaining childs. | 
| 715 |  |  |  |  |  |  | # see: https://perldoc.perl.org/perlobj#Global-Destruction | 
| 716 |  |  |  |  |  |  | # | 
| 717 |  |  |  |  |  |  | # So just terminate remaining childs, and let init-process | 
| 718 |  |  |  |  |  |  | # collect them instead of calling waitpid() here. | 
| 719 |  |  |  |  |  |  | # | 
| 720 |  |  |  |  |  |  | # TODO: perl will call DESTROY only when the process exits | 
| 721 |  |  |  |  |  |  | #       cleanly or calls exit(). If the process is killed, perl | 
| 722 |  |  |  |  |  |  | #       calls DESTROY only if a handler for the matching signal | 
| 723 |  |  |  |  |  |  | #       is installed, like: $SIG{'TERM'} = sub { exit; }; | 
| 724 |  |  |  |  |  |  | # | 
| 725 |  |  |  |  |  |  | #       -> so does it make sense to support DESTROY at all, | 
| 726 |  |  |  |  |  |  | #          if there are situations when it is not called? | 
| 727 |  |  |  |  |  |  | # | 
| 728 |  |  |  |  |  |  | sub DESTROY { | 
| 729 | 196 |  |  | 196 |  | 3300 | my ($self) = @_; | 
| 730 | 196 | 100 |  |  |  | 1999 | $self->{async}->childs_kill() if $self->{async}; # don't wait here | 
| 731 |  |  |  |  |  |  | } | 
| 732 |  |  |  |  |  |  |  | 
| 733 |  |  |  |  |  |  | 1; | 
| 734 |  |  |  |  |  |  |  | 
| 735 |  |  |  |  |  |  | __END__ |