File Coverage

blib/lib/Patro/Server.pm
Criterion Covered Total %
statement 478 674 70.9
branch 269 394 68.2
condition 49 106 46.2
subroutine 44 50 88.0
pod 0 25 0.0
total 840 1249 67.2


line stmt bran cond sub pod time code
1             package Patro::Server;
2 60     60   1186 use strict;
  60         108  
  60         1539  
3 60     60   270 use warnings;
  60         106  
  60         1476  
4 60     60   290 use Carp;
  60         102  
  60         2863  
5 60     60   4097 eval "use Sys::HostAddr";
  0         0  
  0         0  
6 60     60   311 use Socket ();
  60         131  
  60         941  
7 60     60   240 use Scalar::Util 'reftype';
  60         94  
  60         2312  
8 60     60   16707 use POSIX ':sys_wait_h';
  60         266746  
  60         282  
9             require overload;
10              
11             our $threads_avail;
12       2463     *sxdiag = sub {};
13             if ($ENV{PATRO_SERVER_DEBUG}) {
14             *sxdiag = *::xdiag;
15             our $DEBUG = 1;
16             }
17             our $VERSION = '0.13';
18 60     60   91119 our @SERVERS :shared;
  60         52118  
  60         306  
19             our %OPTS = ( # XXX - needs documentation
20             keep_alive => 30,
21             idle_timeout => 30,
22             fincheck_freq => 5,
23             );
24              
25             sub new {
26 77     59 0 20882 my $pkg = shift;
27 59         118 my $opts = shift;
28              
29 59   33     304 my $host = $ENV{HOSTNAME} // qx(hostname) // "localhost";
      50        
30 59 50       218 if ($INC{'Sys/HostAddr.pm'}) {
31 0         0 my $host2 = Sys::HostAddr->new->main_ip;
32 0 50       0 $host = $host2 if $host2;
33             }
34 59         189 chomp($host);
35              
36 59 50       16897 socket(my $socket, Socket::PF_INET(), Socket::SOCK_STREAM(),
37             getprotobyname("tcp")) || croak __PACKAGE__, " socket: $!";
38 59 50       691 setsockopt($socket, Socket::SOL_SOCKET(), Socket::SO_REUSEADDR(),
39             pack("l",1)) || croak __PACKAGE__, " setsockopt: $!";
40 59         6801 my $sockaddr = Socket::pack_sockaddr_in(0, Socket::inet_aton($host));
41 59 50       927 bind($socket, $sockaddr) || croak __PACKAGE__, " bind: $!";
42 59 50       473 listen($socket, Socket::SOMAXCONN()) || croak __PACKAGE__, " listen: $!";
43 59         312 $sockaddr = getsockname($socket);
44 59         305 my ($port,$addr) = Socket::unpack_sockaddr_in($sockaddr);
45              
46             my $meta = {
47             sockaddr => $sockaddr,
48             socket => $socket,
49             host => $host,
50             host2 => Socket::inet_aton($addr),
51             port => $port,
52              
53             creator_pid => $$,
54             creator_tid => $threads_avail && threads->tid,
55             style => $threads_avail ? 'threaded' : 'forked',
56              
57             keep_alive => $OPTS{keep_alive},
58             idle_timeout => $OPTS{idle_timeout},
59 59 50 33     1097923 version => $Patro::Server::VERSION,
60             };
61              
62 59         377 $Patro::SERVER_VERSION = $Patro::Server::VERSION;
63              
64 59         180 my $obj = {};
65 59         145 my @store;
66              
67 59 50       243 if ($threads_avail) {
68 0         0 for (@_) {
69 0         0 local $threads::shared::clone_warn = undef;
70 0         0 eval { $_ = threads::shared::shared_clone($_) };
  0         0  
71 0 0       0 if ($@ =~ /CODE|GLOB/) {
72 0         0 require Patro::LeumJelly;
73 0         0 warn $@;
74 0         0 $threads::shared::clone_warn = 0;
75 0         0 $_ = threads::shared::shared_clone($_);
76             }
77             }
78             }
79 59         318 foreach my $o (@_) {
80 95         217 my ($num,$str);
81             {
82 60     60   23176 no overloading;
  60         106  
  60         2200  
  95         199  
83 60     60   358 no warnings 'portable';
  60         81  
  60         70750  
84 95         259 $str = "$o";
85 95         693 ($num) = $str =~ /x(\w+)/;
86 95         332 $num = hex($num);
87             }
88 95         315 $obj->{$num} = $o;
89 95         418 my $reftype = Scalar::Util::reftype($o);
90 95         232 my $ref = CORE::ref($o);
91 95 50       491 if ($ref eq 'threadsx::shared::code') {
    50          
92 0         0 $ref = $reftype = 'CODE*';
93             } elsif ($ref eq 'threadsx::shared::glob') {
94 0         0 $ref = $reftype = 'GLOB';
95             }
96 95         425 my $store = {
97             ref => $ref,
98             reftype => $reftype,
99             id => $num
100             };
101 95 100       442 if (overload::Overloaded($o)) {
102 12 50 33     753 if ($ref ne 'CODE' && $ref ne 'CODE*' && $ref ne 'GLOB') {
      33        
103 12         42 $store->{overload} = _overloads($o);
104             }
105             }
106 95         3981 push @store, $store;
107             }
108 59         489 my $self = bless {
109             meta => $meta,
110             store => \@store,
111             obj => $obj
112             }, __PACKAGE__;
113 59         349 $self->{config} = $self->config;
114 59         382 $self->start_server;
115 18         186 eval { push @SERVERS, $self };
  18         191  
116 18 50       227 warn $@ if $@;
117 18 50       195 if (@SERVERS == 1) {
118 18         3953 eval q~END {
119             if ($Patro::Server::threads_avail) {
120             $_->detach for threads->list(threads::running);
121             }
122             }~;
123             }
124 18         772 return $self;
125             }
126              
127              
128             sub start_server {
129 59     59 0 125 my $self = shift;
130 59         193 my $meta = $self->{meta};
131 59 50       269 if ($meta->{style} eq 'threaded') {
132 0         0 my $server_thread;
133             $server_thread = threads->create(
134             sub {
135 0     0   0 $SIG{KILL} = sub { exit };
  0         0  
136 0         0 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  0         0  
137 0         0 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
138 0 0       0 if ($self->{meta}{pid_file}) {
139 0         0 open my $fh, '>>', $self->{meta}{pid_file};
140 0         0 flock $fh, 2;
141 0         0 seek $fh, 0, 2;
142 0         0 print {$fh} "$$-", threads->tid, "\n";
  0         0  
143 0         0 close $fh;
144             }
145 0         0 $self->accept_clients;
146 0         0 return;
147 0         0 } );
148 0         0 $self->{meta}{server_thread} = $server_thread;
149 0         0 $self->{meta}{server_pid} = $$;
150 0         0 $self->{meta}{server_tid} = $server_thread->tid;
151             #$server_thread->detach;
152             } else {
153 59         57789 my $pid = CORE::fork();
154 59 50       3112 if (!defined($pid)) {
155 0         0 croak __PACKAGE__, " fork: $!";
156             }
157 59 100       1008 if ($pid == 0) {
158 41 50       816 if ($self->{meta}{pid_file}) {
159 0         0 open my $fh, '>>', $self->{meta}{pid_file};
160 0         0 flock $fh, 2;
161 0         0 seek $fh, 0, 2;
162 0         0 print {$fh} "$$\n";
  0         0  
163 0         0 close $fh;
164             }
165 41         1774 $self->accept_clients;
166 18         3845 exit;
167             }
168 18         665 $self->{meta}{server_pid} = $pid;
169             }
170             }
171              
172             # return list of operators that are overloaded on the given object
173             my @oplist;
174             sub _overloads {
175 16     16   34 my $obj = shift;
176 16 50       34 return unless overload::Overloaded($obj);
177 16 100       581 if (!@oplist) {
178 6         219 @oplist = split ' ',join(' ',values %overload::ops);
179             }
180              
181 16         51 my %ops = map { $_ => 1 } grep { overload::Method($obj,$_) } @oplist;
  607         1514  
  1200         36442  
182              
183             # we also need to account for the operations that are *implicitly*
184             # overloaded.
185              
186             # Many ops can be generated out of 0+, "", or bool
187 16 50 66     158 if ($ops{"0+"} || $ops{'""'} || $ops{bool}) {
      33        
188 16         184 $ops{$_}++ for qw(0+ "" bool int ! qr . x .= x= <> -X);
189             }
190              
191             # assignment ops can be generated from binary ops
192 16         41 foreach my $binop (qw(. x + - * / ** % & | ^ << >> &. |. ^.)) {
193 256 100       479 $ops{$binop . "="}++ if $ops{$binop};
194             }
195              
196             # all comparison ops can be generated from <=> and cmp
197 16 100       69 @ops{qw{< <= > >= == !=}} = (1) x 6 if $ops{"<=>"};
198 16 100       100 @ops{qw(le lt ge gt eq ne)} = (1) x 6 if $ops{cmp};
199              
200 16 100       46 $ops{neg}++ if $ops{"-"};
201 16 100       41 $ops{"--"}++ if $ops{"-="};
202 16 50 66     73 $ops{abs}++ if $ops{"<"} && $ops{neg};
203 16 100       46 $ops{"++"}++ if $ops{"+="};
204              
205             # all ops are overloaded if there is a 'nomethod' specified
206 16 50       42 @ops{@oplist} = (1) x @oplist if $ops{nomethod};
207 16         303 return [keys %ops];
208             }
209              
210             sub config {
211 59     59 0 125 my $self = shift;
212             my $config_data = {
213             host => $self->{meta}{host},
214             port => $self->{meta}{port},
215             store => $self->{store},
216             style => $self->{meta}{style},
217 59         698 version => $Patro::Server::VERSION
218             };
219 59         320 return bless $config_data, 'Patro::Config';
220             }
221              
222             ########################################
223              
224             sub Patro::Config::to_string {
225 9     9   5857 my ($self) = @_;
226 9         339 return Patro::LeumJelly::serialize({%$self});
227             }
228              
229             sub Patro::Config::to_file {
230 2     2   825 my ($self,$file) = @_;
231 2 50       8 if (!$file) {
232             # TODO: select a temp filename
233             }
234 2         5 my $fh;
235 2 50       171 if (!open($fh, '>', $file)) {
236 0         0 croak "Patro::Config::to_file: could not write cfga file '$file': $!";
237             }
238 2         6 print {$fh} $self->to_string;
  2         11  
239 2         212 close $fh;
240 2         12 return $file;
241             }
242              
243             sub Patro::Config::from_string {
244 11     11   33 my ($self, $string) = @_;
245 11         43 my $cfg = Patro::LeumJelly::deserialize($string);
246 11         457 return bless $cfg, 'Patro::Config';
247             }
248              
249             sub Patro::Config::from_file {
250 3     3   8 my ($self, $file) = @_;
251 3 50 33     12 if (!defined($file) && !CORE::ref($self) && $self ne 'Patro::Config') {
      33        
252 0         0 $file = $self;
253             }
254 3         5 my $fh;
255 3 50       62 if (CORE::ref($file) eq 'GLOB') {
    50          
256 0         0 $fh = $file;
257             } elsif (!open $fh, '<' ,$file) {
258 0         0 croak "Patro::Config::fron_file: could not read cfg file '$file': $!";
259             }
260 3         42 my $data = <$fh>;
261 3         17 close $fh;
262 3         16 return Patro::Config->from_string($data);
263             }
264              
265             ########################################
266              
267             sub accept_clients {
268             # accept connection from client
269             # spin off connection to separate thread or process
270             # perform request_response_loop on the client connection
271 41     41 0 344 my $self = shift;
272 41         371 my $meta = $self->{meta};
273              
274 41         575 $meta->{last_connection} = time;
275 41         400 $meta->{finished} = 0;
276              
277 41         455 while (!$meta->{finished}) {
278 84     24   4970 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  24         406  
279 84     0   1047 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
280 84   50     2118 alarm ($OPTS{fincheck_freq} || 5);
281 84         356 my $client;
282 84         330 my $server = $meta->{socket};
283 84         29372141 my $paddr = accept($client,$server);
284 84 100       1513 if (!$paddr) {
285 23 50   60   602 next if $!{EINTR};
  60         15399  
  60         58344  
  60         556  
286 0 0 0     0 next if $!{ECHILD} || $!==10; # !?! why $!{ECHILD} not suff on Lin?
287 0         0 ::xdiag("accept failure, %errno is",\%!);
288 0         0 croak __PACKAGE__, ": accept ", 0+$!," $!";
289             }
290 61         317 $meta->{last_connection} = time;
291              
292 61         1928 $self->start_subserver($client);
293 38         779 $self->watch_for_finishers('MAIN');
294             }
295             }
296              
297             sub start_subserver {
298 61     61 0 338 my ($self,$client) = @_;
299 61 50       633 if ($self->{meta}{style} eq 'forked') {
300 61         54378 my $pid = CORE::fork();
301 61 50       2477 if (!defined($pid)) {
302 0         0 croak __PACKAGE__,": fork after accept $!";
303             }
304 61 100       1123 if ($pid != 0) {
305 38 50       448 if ($self->{meta}{pid_file}) {
306 0         0 open my $fh, '>>', $self->{meta}{pid_file};
307 0         0 flock $fh, 2;
308 0         0 seek $fh, 0, 2;
309 0         0 print {$fh} "$pid\n";
  0         0  
310 0         0 close $fh;
311             }
312 38         1087 $self->{meta}{pids}{$pid}++;
313 38         1004 return;
314             }
315 23         1066 $self->request_response_loop($client);
316 23         6282 exit;
317             } else {
318             my $subthread = threads->create(
319             sub {
320 0     0   0 $self->request_response_loop($client);
321 0         0 threads->self->detach;
322 0         0 return;
323 0         0 } );
324 0 0       0 if ($self->{meta}{pid_file}) {
325 0         0 open my $fh, '>>', $self->{meta}{pid_file};
326 0         0 flock $fh, 2;
327 0         0 seek $fh, 0, 2;
328 0         0 print {$fh} "$$-", $subthread->tid, "\n";
  0         0  
329 0         0 close $fh;
330             }
331 0         0 $self->{meta}{pids}{"$$-" . $subthread->tid}++;
332 0         0 push @{$self->{meta}{subthreads}}, $subthread;
  0         0  
333              
334             # $subthread->detach ?
335              
336 0         0 return;
337             }
338             }
339              
340             sub watch_for_finishers {
341 62     62 0 694 my ($self,$sig) = @_;
342 62         483 alarm 0;
343              
344             # XXX - how do you know when a thread is finished?
345             # what if it is a detached thread?
346              
347 62   66     2218 while ((my $pid = waitpid(-1,WNOHANG())) > 0 && WIFEXITED($?)) {
348 24         341 delete $self->{meta}{pids}{$pid};
349             }
350 62 50       446 if ($self->{meta}{subthreads}) {
351 0         0 my $n = @{$self->{meta}{subthreads}};
  0         0  
352 0         0 my $n1 = threads->list(threads::all());
353 0         0 my $n2 = threads->list(threads::running());
354 0         0 my @joinable = threads->list(threads::joinable());
355 0 0       0 if (@joinable) {
356 0         0 foreach my $subthread (@joinable) {
357             my ($i) = grep {
358 0         0 $self->{meta}{subthreads}{$_} == $subthread
  0         0  
359             } 0 .. $n-1;
360 0 0       0 if (!defined($i)) {
361 0         0 warn "subthread $subthread not found on this server!";
362 0         0 next;
363             }
364 0         0 $self->{meta}{subthreads}[$i]->join;
365 0         0 $self->{meta}{subthreads}[$i] = undef;
366             }
367             $self->{meta}{subthreads} =
368 0         0 [ grep { defined } @{$self->{meta}{subthreads} } ];
  0         0  
  0         0  
369             }
370             }
371 62 100       1196 unless ($self->still_active) {
372 18         82 $self->{meta}{finished}++;
373             }
374 62     0   2156 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
375 62     0   1682 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  0         0  
376 62   50     1917 alarm ($OPTS{fincheck_freq} || 5);
377             }
378              
379             sub still_active {
380 62     62 0 244 my $self = shift;
381 62         235 my $meta = $self->{meta};
382 62 50       366 if (time <= $meta->{keep_alive}) {
383 0         0 return 1;
384             }
385 62 100       481 if (time < $meta->{last_connection} + $meta->{idle_timeout}) {
386 39         343 return 1;
387             }
388 23 100       79 if (keys %{$meta->{pids}}) {
  23         218  
389 5         28 return 1;
390             }
391 18         115 return;
392             }
393              
394             sub request_response_loop {
395 23     23 0 250 my ($self, $client) = @_;
396              
397 23         1540 local $Patro::Server::disconnect = 0;
398 23         920 my $fh0 = select $client;
399 23         620 $| = 1;
400 23         371 select $fh0;
401              
402 23         81756 while (my $req = readline($client)) {
403 473 50       4292 next unless $req =~ /\S/;
404 473         3270 sxdiag("server: got request '$req'");
405 473         1767 my $resp = $self->process_request($req);
406 473         1469 sxdiag("server: response to request is ",$resp);
407 473         1173 $resp = $self->serialize_response($resp);
408 473         1800 sxdiag("server: serialized response to request is ",$resp);
409 473         688 print {$client} $resp,"\n";
  473         20576  
410 473 100       18938743 last if $Patro::Server::disconnect;
411             }
412 23         1487 close $client;
413 23         205 return;
414             }
415              
416             our $SIDES; # for the server to activate or suppress some
417             # side-effects from the lower levels of the
418             # request handler
419              
420             sub process_request {
421 473     473 0 1207 my ($self,$request) = @_;
422 473 50       2414 croak "process_request: expected scalar request" if ref($request);
423              
424 473         1363 $request = Patro::LeumJelly::deserialize($request);
425 473         13271 my $topic = $request->{topic};
426 473 50       1162 if (!defined($topic)) {
427 0         0 return $self->error_response("bad topic in request '$_[1]'");
428             }
429            
430 473         923 my $has_args = $request->{has_args};
431 473         822 my $args = $request->{args};
432 473 100       1066 if ($request->{has_args}) {
433 330         542 local $@;
434             $args = [ map {
435 459 100       1119 if (CORE::ref($_) eq '.Patroon') {
436 5         9 eval { $self->{obj}{$$_} };
  5         19  
437             } else {
438 454         1409 $_
439 330         552 } } @{$request->{args}} ];
  330         927  
440 330 50       852 if ($@) {
441 0         0 return $self->error_response($@);
442             }
443             }
444 473         875 my $id = $request->{id};
445 473         780 my $cmd = $request->{command};
446 473         752 my $ctx = $request->{context};
447 473 100       1261 my @orig_args = $has_args ? @$args : ();
448 473 100       1085 my @orig_refs = $has_args ? \ (@$args) : ();
449 473         1451 my @orig_dump = map Patro::LeumJelly::serialize([$_]), @$args;
450 473         13880 local $! = 0;
451 473         1835 local $? = 0;
452 473         1086 local $SIDES = {};
453 473         749 my @r;
454 473         625 our $DEBUG;
455 473   50     2766 local $DEBUG = $DEBUG || $request->{_debug} || 0;
456              
457 473 100       1763 if ($topic eq 'META') {
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    50          
458 40         235 @r = $self->process_request_META($id,$cmd,$ctx,$has_args,$args);
459             } elsif ($topic eq 'HASH') {
460 61         266 @r = $self->process_request_HASH($id,$cmd,$ctx,$has_args,$args);
461             } elsif ($topic eq 'ARRAY') {
462 240         667 @r = $self->process_request_ARRAY($id,$cmd,$ctx,$has_args,$args);
463             } elsif ($topic eq 'SCALAR') {
464 18         65 @r = $self->process_request_SCALAR($id,$cmd,$ctx,$has_args,$args);
465             } elsif ($topic eq 'METHOD') {
466 31         121 @r = $self->process_request_METHOD($id,$cmd,$ctx,$has_args,$args);
467             } elsif ($topic eq 'CODE') {
468 3         60 @r = $self->process_request_CODE($id,undef,$ctx,$has_args,$args);
469             } elsif ($topic eq 'HANDLE') {
470 60         271 @r = $self->process_request_HANDLE($id,$cmd,$ctx,$has_args,$args);
471             } elsif ($topic eq 'OVERLOAD') {
472 15         47 my $obj = $self->{obj}{$id};
473 15         60 @r = $self->process_request_OVERLOAD($obj,$cmd,$args,$ctx);
474             } elsif ($topic eq 'REF') {
475 5         16 @r = $self->process_request_REF($id,$cmd,$ctx,$has_args,$args);
476             } else {
477 0         0 @r = ();
478 0         0 $@ = __PACKAGE__ . ": unrecognized topic '$topic' in proxy request";
479             }
480 473 100 100     2334 if (@r && CORE::ref($r[0]) eq '.Patroclus') {
481 40         275 return $r[0];
482             }
483 433         1532 my $sides = bless {}, '.Patroclus';
484              
485 433 100       1757 $sides->{errno} = 0 + $! if $!;
486 433 100       1499 $sides->{errno_extended} = $^E if $^E;
487 433 100       1006 $sides->{child_error} = $? if $?;
488 433 100       896 $sides->{error} = $@ if $@;
489 433         1059 $sides->{"x-requestId"} = ++$Patro::Server::requestId;
490              
491             # how to update elements of @_ that have changes?
492             # three implementations below. Pick one.
493             # 1. "side A" - return all elements of @_. You will have to
494             # filter out "Modification of a read-only element attempted ..."
495             # messages
496             # 2. "side B" - do a deep comparison of original and final
497             # elements of @_, return the ones that mismatch I CHOOSE YOU!
498             # 3. original implementation - do shallow comparison of original
499             # and final elements of @_. Insufficient for code that updates
500             # nested data of the inputs
501 433         715 my (@out,@outref);
502              
503             # "sideB" - do a deep compare for all arguments
504 433   100     1917 for (my $j=0; $j<@$args && !$SIDES->{no_out}; $j++) {
505 389         1453 my $dj = Patro::LeumJelly::serialize([$args->[$j]]);
506 389         11699 for (my $i=0; $i<@orig_refs; $i++) {
507 972 100       2628 next if $orig_refs[$i] != \$args->[$j];
508 389 100       1751 if ($orig_dump[$i] ne $dj) {
509 14         53 push @out, $i, $args->[$j];
510             }
511             }
512             }
513 433         875 $sides->{sideB} = 1;
514              
515 433 100       1004 $sides->{out} = \@out if @out;
516 433 50       886 $sides->{outref} = \@outref if @outref;
517 433 100 100     2013 if ($ctx >= 2) {
    100          
518 14         42 return $self->list_response($sides, @r);
519             } elsif ($ctx == 1 && defined $r[0]) {
520 387         1185 my $y = $self->scalar_response($sides, $r[0]);
521             # if ($topic eq 'REF') { ::xdiag("response:",$y) }
522 387         3517 return $y;
523             } else {
524 32         156 return $self->void_response($sides);
525             }
526             }
527              
528             sub process_request_META {
529 40     40 0 147 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
530 40 100       141 if ($cmd eq 'disconnect') {
531 4         15 $Patro::Server::disconnect = 1;
532 4         46 return bless { disconnect_ok => 1 }, '.Patroclus';
533             }
534 36         180 my $obj = $self->{obj}{$id};
535 36 50       230 if ($cmd eq 'ref') {
    50          
    50          
536 0         0 return CORE::ref($obj);
537             } elsif ($cmd eq 'reftype') {
538 0         0 return Scalar::Util::reftype($obj);
539             } elsif ($cmd eq 'destroy') {
540 36         195 delete $self->{obj}{$id};
541 36         80 my @ids = keys %{$self->{obj}};
  36         227  
542 36 100       129 if (@ids == 0) {
543 6         19 $Patro::Server::disconnect = 1;
544 6         52 return bless { disconnect_ok => 1 }, '.Patroclus';
545             } else {
546 30         249 return bless { disconnect_ok => 0,
547             num_reminaing_objs => 0+@ids }, '.Patroclus';
548             }
549             } else {
550 0         0 $@ = "Patro: unsupported meta command '$cmd'";
551 0         0 return;
552             }
553             }
554              
555             sub process_request_HASH {
556 61     61 0 264 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
557 61         168 my $obj = $self->{obj}{$id};
558 61 50       337 if (reftype($obj) ne 'HASH') {
559 0         0 $@ = "Not a HASH reference";
560 0         0 return;
561             # !!! what if '%{}' op is overloaded?
562             }
563 61 100       229 if ($cmd eq 'STORE') {
    100          
    100          
    50          
    0          
    0          
    0          
    0          
564 6         21 my ($key,$val) = @$args;
565 6         20 my $old_val = $obj->{$key};
566 6         72 $obj->{$key} = threads::shared::shared_clone($val);
567 6         24 return $old_val;
568             } elsif ($cmd eq 'FETCH') {
569 45         291 return $obj->{$args->[0]};
570             } elsif ($cmd eq 'DELETE') {
571 2         28 return delete $obj->{$args->[0]};
572             } elsif ($cmd eq 'EXISTS') {
573 8         37 return exists $obj->{$args->[0]};
574             } elsif ($cmd eq 'CLEAR') {
575 0         0 %$obj = ();
576 0         0 return;
577             } elsif ($cmd eq 'FIRSTKEY') {
578 0         0 keys %$obj;
579 0         0 my ($k,$v) = each %$obj;
580 0         0 return $k;
581             } elsif ($cmd eq 'NEXTKEY') {
582 0         0 my ($k,$v) = each %$obj;
583 0         0 return $k;
584             } elsif ($cmd eq 'SCALAR') {
585 0         0 return scalar %$obj;
586             } else {
587 0         0 $@ = "HASH function '$cmd' not recognized";
588 0         0 return;
589             }
590             }
591              
592             sub process_request_ARRAY {
593 240     240 0 596 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
594 240         472 my $obj = $self->{obj}{$id};
595 240 50       809 if (reftype($obj) ne 'ARRAY') {
596 0         0 $@ = "Not an ARRAY ref";
597 0         0 return;
598             }
599 240 100 33     748 if ($cmd eq 'STORE') {
    100          
    100          
    50          
    100          
    100          
    100          
    100          
    50          
    0          
600 6         26 my ($index,$val) = @$args;
601 6         21 my $old_val = $obj->[$index];
602             # ?!!!? does $val have to be shared?
603 6         15 eval { $obj->[$index] = threads::shared::shared_clone($val) };
  6         50  
604 6         72 return $old_val;
605             } elsif ($cmd eq 'FETCH') {
606 174         273 return eval { $obj->[$args->[0]] };
  174         562  
607             } elsif ($cmd eq 'FETCHSIZE') {
608 32         98 return scalar @$obj;
609             } elsif ($cmd eq 'STORESIZE' || $cmd eq 'EXTEND') {
610 0         0 my $n = $#{$obj} = $args->[0]-1;
  0         0  
611 0         0 return $n+1;
612             } elsif ($cmd eq 'SPLICE') {
613 16         52 my ($off,$len,@list) = @$args;
614 16 100       57 if ($off < 0) {
615 3         12 $off += @$obj;
616 3 50       9 if ($off < 0) {
617 0         0 $@ = "Modification of non-createable array value attempted, "
618             . "subscript $off";
619 0         0 return;
620             }
621             }
622 16 100 66     62 if (!defined($len) || $len eq 'undef') {
623 2         4 $len = @{$obj} - $off;
  2         3  
624             }
625 16 100       35 if ($len < 0) {
626 2         4 $len += @{$obj} - $off;
  2         4  
627 2 50       6 if ($len < 0) {
628 0         0 $len = 0;
629             }
630             }
631 16         20 my @val = splice @{$obj}, $off, $len, @list;
  16         88  
632 16         41 $SIDES->{no_out} = 1; # don't try to update @_
633             # SPLICE is the only ARRAY function that doesn't assume scalar context
634 16 100       32 if ($ctx == 1) {
635 6 50       38 return @val > 0 ? $val[-1] : undef;
636             } else {
637 10         31 return @val;
638             }
639             } elsif ($cmd eq 'PUSH') {
640 4         13 return push @{$obj}, map threads::shared::shared_clone($_), @$args;
  4         72  
641             } elsif ($cmd eq 'UNSHIFT') {
642 2         7 return unshift @{$obj}, map threads::shared::shared_clone($_), @$args;
  2         17  
643             } elsif ($cmd eq 'POP') {
644 3         9 return pop @{$obj};
  3         17  
645             } elsif ($cmd eq 'SHIFT') {
646 3         7 return shift @{$obj};
  3         21  
647             } elsif ($cmd eq 'EXISTS') {
648 0         0 return exists $obj->[$args->[0]];
649             } else {
650 0         0 $@ = "tied ARRAY function '$cmd' not recognized";
651 0         0 return;
652             }
653             }
654              
655             sub process_request_SCALAR {
656 18     18 0 56 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
657 18         62 my $obj = $self->{obj}{$id};
658 18 50       99 if (reftype($obj) ne 'SCALAR') {
659 0         0 $@ = "Not a SCALAR reference";
660 0         0 return;
661             }
662 18 100       50 if ($cmd eq 'STORE') {
    50          
663 6         10 my $val = ${$obj};
  6         21  
664 6         75 ${$obj} = threads::shared::shared_clone($args->[0]);
  6         17  
665 6         24 return $val;
666             } elsif ($cmd eq 'FETCH') {
667 12         23 return ${$obj};
  12         54  
668             } else {
669 0         0 $@ = "tied SCALAR function '$cmd' not recognized";
670 0         0 return;
671             }
672             }
673              
674             sub process_request_METHOD {
675 31     31 0 95 my ($self,$id,$command,$context,$has_args,$args) = @_;
676 31         74 my $obj = $self->{obj}{$id};
677 31 50       84 if (!$obj) {
678 0         0 $@ = "Bad object id '$id' in proxy method call";
679 0         0 return;
680             }
681 31         73 my @r;
682 31 100       156 if ($command =~ /::/) {
    100          
683 60     60   130306 no strict 'refs';
  60         108  
  60         87892  
684 1 50       4 if ($context < 2) {
685 1 50       4 @r = scalar eval { $has_args ? &$command($obj,@$args)
  1         16  
686             : &$command($obj) };
687             } else {
688 0 0       0 @r = eval { $has_args ? &$command($obj,@$args)
  0         0  
689             : &$command($obj) };
690             }
691             } elsif ($context < 2) {
692 29 100       44 @r = scalar eval { $has_args ? $obj->$command(@$args)
  29         321  
693             : $obj->$command };
694             } else {
695 1 50       2 @r = eval { $has_args ? $obj->$command(@$args)
  1         8  
696             : $obj->$command };
697             }
698 31         341 return @r;
699             }
700              
701             sub process_request_HANDLE {
702 60     60 0 212 my ($self,$id,$command,$context,$has_args,$args) = @_;
703 60         191 my $obj = $self->{obj}{$id};
704 60 50       182 my $fh = CORE::ref($obj) eq 'threadsx::shared::glob' ? $obj->glob : $obj;
705 60 100 66     736 if ($command eq 'PRINT') {
    100 100        
    50          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    50          
    50          
    100          
    50          
706 6         13 my $z = print {$fh} @$args;
  6         13137  
707 6         71 return $z;
708             } elsif ($command eq 'PRINTF') {
709 1 50       4 if ($has_args) {
710 1         3 my $template = shift @$args;
711 1         3 my $z = printf {$fh} $template, @$args;
  1         18  
712 1         4 return $z;
713             } else {
714             # I don't think we can get here through the proxy
715 0         0 my $z = printf {$fh} "";
  0         0  
716 0         0 return $z;
717             }
718             } elsif ($command eq 'WRITE') {
719 0 0       0 if (@$args < 2) {
720 0         0 return $self->error_response("Not enough arguments for syswrite");
721             }
722 0   0     0 return syswrite($fh, $args->[0],
      0        
723             $args->[1] // undef, $args->[2] // undef);
724             } elsif ($command eq 'READLINE') {
725 13         25 my @val;
726 13 100       42 if ($context > 1) {
727 2         23 my @val = readline($fh);
728 2         10 return @val;
729             } else {
730 11         198 my $val = readline($fh);
731 11         57 return $val;
732             }
733             } elsif ($command eq 'GETC') {
734 5         26 my $ch = getc($fh);
735 5         21 return $ch;
736             } elsif ($command eq 'READ' || $command eq 'READ?' ||
737             $command eq 'SYSREAD') {
738 3         14 local $Patro::read_sysread_flag; # don't clobber
739 3 50       21 if (@$args < 2) {
740             # I don't think we can get here through the proxy
741 0         0 $@ = "Not enough arguments for " . lc($command);
742 0         0 return;
743             }
744 3         14 my (undef, $len, $off) = @$args;
745 3         7 my $z;
746 3 100 33     24 if ($command eq 'SYSREAD' ||
      66        
747             ($command eq 'READ?' && fileno($fh) >= 0)) {
748 1   50     10 $z = sysread $fh, $args->[0], $len, $off || 0;
749             } else {
750             # sysread doesn't work, for example, on file handles opened
751             # from a reference to a scalar
752 2   50     18 $z = read $fh, $args->[0], $len, $off || 0;
753             }
754 3         23 return $z;
755             } elsif ($command eq 'EOF') {
756 2         21 return eof($fh);
757             } elsif ($command eq 'FILENO') {
758 1         3 my $z = fileno($fh);
759 1         3 return $z;
760             } elsif ($command eq 'SEEK') {
761 2 50       14 if (@$args < 2) {
    50          
762 0         0 $@ = "Not enough arguments for seek";
763 0         0 return;
764             } elsif (@$args > 2) {
765 0         0 $@ = "Too many arguments for seek";
766 0         0 return;
767             } else {
768 2         21 my $z = seek $fh, $args->[0], $args->[1];
769 2         9 return $z;
770             }
771             } elsif ($command eq 'TELL') {
772 8         25 my $z = tell($fh);
773 8         26 return $z;
774             } elsif ($command eq 'BINMODE') {
775 3         5 my $z;
776 3 100       10 if (@$args) {
777 2         176 $z = binmode $fh, $args->[0];
778             } else {
779 1         22 $z = binmode $fh;
780             }
781 3         304 return $z;
782             } elsif ($command eq 'CLOSE') {
783 5 50       22 if ($Patro::SECURE) {
784 0         0 $@ = "Patro: insecure CLOSE operation on proxy filehandle";
785 0         0 return;
786             }
787 5         74 my $z = close $fh;
788 5         34 return $z;
789             } elsif ($command eq 'OPEN') {
790 5 50       19 if ($Patro::SECURE) {
791 0         0 $@ = "Patro: insecure OPEN operation on proxy filehandle";
792 0         0 return;
793             }
794 5         18 my $z;
795 5         20 my $mode = shift @$args;
796 5 100       28 if (@$args == 0) {
797 1         53 $z = open $fh, $mode;
798             } else {
799 4         18 my $expr = shift @$args;
800 4 100       22 if (@$args == 0) {
801 3         293 $z = open $fh, $mode, $expr;
802             } else {
803 1         2911 $z = open $fh, $mode, $expr, @$args;
804             }
805             }
806              
807             # it is hard to set autoflush from the proxy.
808             # Since it is usually what you want, let's do it here.
809 5 50       33 if ($z) {
810 5         42 my $fh_sel = select $fh;
811 5         26 $| = 1;
812 5         46 select $fh_sel;
813             }
814 5         38 return $z;
815             }
816             # commands that are not in the tied filehandle
817             elsif ($command eq 'TRUNCATE') {
818 1         13 my $z = truncate $fh, $args->[0];
819 1         7 return $z;
820             } elsif ($command eq 'FCNTL') {
821 0         0 my $z = fcntl $fh, $args->[0], $args->[1];
822 0         0 return $z;
823             } elsif ($command eq 'FLOCK') {
824 0         0 my $z = flock $fh, $args->[0];
825 0         0 return $z;
826             } elsif ($command eq 'STAT') {
827 2 100       10 if ($context < 2) {
828 1         15 return scalar stat $fh;
829             } else {
830 1         7 return stat $fh;
831             }
832             } elsif ($command eq '-X') {
833 3         11 my $key = $args->[0];
834 3         337 return eval "-$key \$fh";
835            
836             } else {
837 0         0 $@ = "tied HANDLE function '$command' not found";
838 0         0 return;
839             }
840             }
841              
842             sub process_request_CODE {
843 3     3 0 16 my ($self,$id,$command_NOTUSED,$context,$has_args,$args) = @_;
844 3         14 my $sub = $self->{obj}{$id};
845 3 50       26 if (CORE::ref($sub) eq 'threadsx::shared::code') {
846 0         0 $sub = $sub->code;
847             }
848 3 50       13 if ($context < 2) {
849 3 100       9 return scalar eval { $has_args ? $sub->(@$args) : $sub->() };
  3         48  
850             } else {
851 0 0       0 return eval { $has_args ? $sub->(@$args) : $sub->() };
  0         0  
852             }
853             }
854              
855             sub process_request_OVERLOAD {
856 15     15 0 41 my ($self,$x,$op,$args,$context) = @_;
857 15 100       55 if ($op eq '@{}') {
    100          
    50          
    50          
858 2         10 return \@$x;
859             } elsif ($op eq '%{}') {
860 1         17 return \%$x;
861             } elsif ($op eq '&{}') {
862 0         0 return \&$x;
863             } elsif ($op eq '${}') {
864 0         0 return \$$x;
865             } # elsif ($op eq '*{}') { return \*$x; }
866 12         24 my ($y,$swap) = @$args;
867 12 50       22 if ($swap) {
868 0         0 ($x,$y) = ($y,$x);
869             }
870 12         27 local $@ = '';
871 12         16 my $z;
872 12 50       44 if ($op =~ /[&|~^][.]=?/) {
873 0         0 $op =~ s/\.//;
874             }
875 12 50 33     217 if ($op eq '-X') {
    50 33        
    50 33        
    50 33        
    50 33        
    50 33        
    100 33        
    50 33        
    100 33        
    50          
876 0         0 $z = eval "-$y \$x";
877             } elsif ($op eq 'neg') {
878 0         0 $z = eval { -$x };
  0         0  
879             } elsif ($op eq '!' || $op eq '~' || $op eq '++' || $op eq '--') {
880 0         0 $z = eval "$op\$x";
881             } elsif ($op eq 'qr') {
882 0         0 $z = eval { qr/$x/ };
  0         0  
883             } elsif ($op eq 'atan2') {
884 0         0 $z = eval { atan2($x,$y) };
  0         0  
885             } elsif ($op eq 'cos' || $op eq 'sin' || $op eq 'exp' || $op eq 'abs' ||
886             $op eq 'int' || $op eq 'sqrt' || $op eq 'log') {
887 0         0 $z = eval "$op(\$x)";
888             } elsif ($op eq 'bool') {
889 3 50       9 $z = eval { $x ? 1 : 0 }; # this isn't right
  3         160  
890             } elsif ($op eq '0+') {
891 0         0 $z = eval "0 + \$x"; # this isn't right, either
892             } elsif ($op eq '""') {
893 2         3 $z = eval { "$x" };
  2         27  
894             } elsif ($op eq '<>') {
895             # always scalar context readline
896 0         0 $z = eval { readline($x) };
  0         0  
897             } else { # binary operator
898 7         423 $z = eval "\$x $op \$y";
899             }
900 12 50       472 if ($@) {
901 0         0 return;
902             }
903 12 50       25 if ($threads_avail) {
904 0         0 $z = threads::shared::shared_clone($z);
905             }
906 12         32 return $z;
907             }
908              
909             sub process_request_REF {
910 5     5 0 14 my ($self,$id,$command,$context,$has_args,$args) = @_;
911 5         13 my $obj = $self->{obj}{$id};
912 5 50       29 if (reftype($obj) ne 'REF') {
913 0         0 $@ = "Not a REF";
914 0         0 return;
915             }
916 5 50       13 if ($command eq 'deref') {
917 5         35 return $$obj;
918             }
919 0         0 $@ = "$command is not an appropriate operation for REF";
920 0         0 return;
921             }
922              
923             ########################################
924              
925             sub void_response {
926 32     32 0 84 my $addl = {};
927 32 50 33     312 if (@_ > 0 && CORE::ref($_[-1]) eq '.Patroclus') {
928 32         101 $addl = pop @_;
929             }
930 32         472 return +{ context => 0, response => undef, %$addl };
931             }
932              
933             sub scalar_response {
934 387     387 0 869 my ($self,$sides,$val) = @_;
935             return +{
936 387         2196 context => 1,
937             response => $val,
938             %$sides
939             };
940             }
941              
942             sub list_response {
943 14     14 0 38 my ($self,$sides,@val) = @_;
944             return +{
945 14         184 context => 2,
946             response => \@val,
947             %$sides
948             };
949             }
950              
951             sub error_response {
952 0     0 0 0 my ($self,@msg) = @_;
953 0         0 return { error => join('', @msg) };
954             }
955              
956             ########################################
957              
958             sub serialize_response {
959 473     473 0 949 my ($self, $resp) = @_;
960 473 100       1163 if ($resp->{context}) {
961 401 100       875 if ($resp->{context} == 1) {
    50          
962 387         915 $resp->{response} = patrol($self,$resp,$resp->{response});
963             } elsif ($resp->{context} == 2) {
964             $resp->{response} = [
965 14         19 map patrol($self,$resp,$_), @{$resp->{response}} ];
  14         64  
966             }
967             }
968              
969 473 100       1154 if ($resp->{out}) {
970 12         19 $resp->{out} = [ map patrol($self,$resp,$_), @{$resp->{out}} ];
  12         47  
971             }
972              
973 473         1372 sxdiag("Server: final response before serialization: ",$resp);
974 473         1022 $resp = Patro::LeumJelly::serialize($resp);
975 473         20471 return $resp;
976             }
977              
978             # we should not send any serialized references back to the client.
979             # replace any references in the response with an
980             # object id.
981             sub patrol {
982 460     460 0 938 my ($self,$resp,$obj) = @_;
983 460 50       1651 sxdiag("patrol: called on: ",defined($obj) ? "$obj" : "");
984 460 100       1251 return $obj unless ref($obj);
985              
986 62 50       186 if ($threads_avail) {
987 0 0       0 if (CORE::ref($obj) eq 'CODE') {
    0          
988 0         0 $obj = threadsx::shared::code->new($obj);
989 0         0 sxdiag("patrol: coderef converted");
990             } elsif (CORE::ref($obj) eq 'GLOB') {
991 0         0 $obj = threadsx::shared::glob->new($obj);
992 0         0 sxdiag("patrol: glob converted");
993             }
994             }
995              
996 62         124 my $id = do {
997 60     60   439 no overloading;
  60         140  
  60         17062  
998 62         177 0 + $obj;
999             };
1000              
1001 62 100       233 if (!$self->{obj}{$id}) {
1002 31         114 $self->{obj}{$id} = $obj;
1003 31         71 my $ref = CORE::ref($obj);
1004 31         65 my $reftype;
1005 31 50       119 if ($ref eq 'threadsx::shared::code') {
    50          
1006 0         0 $ref = 'CODE';
1007 0         0 $reftype = 'CODE';
1008             } elsif ($ref eq 'threadsx::shared::glob') {
1009 0         0 $ref = 'GLOB';
1010 0         0 $reftype = 'GLOB';
1011             } else {
1012 31         140 $reftype = Scalar::Util::reftype($obj);
1013             }
1014 31         159 sxdiag("patrol: ref types for $id are $ref,$reftype");
1015 31         379 $resp->{meta}{$id} = {
1016             id => $id, ref => $ref, reftype => $reftype
1017             };
1018 31 100       217 if (overload::Overloaded($obj)) {
1019 4         235 $resp->{meta}{$id}{overload} = _overloads($obj);
1020             }
1021 31         1770 sxdiag("new response meta: ",$resp->{meta}{$id});
1022             } else {
1023 31         141 sxdiag("id $id has been seen before");
1024             }
1025 62         355 return bless \$id,'.Patrobras';
1026             }
1027              
1028             sub TEST_MODE {
1029 59 50   59 0 171 if ($INC{'perl5db.pl'}) {
1030 0         0 ::xdiag("TEST_MODE adjusted for debugging");
1031 0         0 $OPTS{keep_alive} = 3600;
1032 0         0 $OPTS{fincheck_freq} = 3500;
1033 0         0 $OPTS{idle_timeout} = 3600;
1034 0         0 alarm 9999;
1035 0         0 return;
1036             }
1037 59         101 $OPTS{keep_alive} = 2;
1038 59         82 $OPTS{fincheck_freq} = 2;
1039 59         92 $OPTS{idle_timeout} = 1;
1040 59 50       142 if ($threads_avail) {
1041 0         0 $OPTS{fincheck_freq} = "0 but true";
1042             }
1043             }
1044              
1045             1;
1046              
1047             =head1 NAME
1048              
1049             Patro::Server - remote object server for Patro
1050              
1051             =head1 VERSION
1052              
1053             0.13
1054              
1055             =head1 DESCRIPTION
1056              
1057             A server class for making references available to proxy clients
1058             in the L distribution.
1059             The server handles requests for any references that are being served,
1060             manipulates references on the server, and returns the results of
1061             operations to the proxy objects on the clients.
1062              
1063             =head1 LICENSE AND COPYRIGHT
1064              
1065             MIT License
1066              
1067             Copyright (c) 2017, Marty O'Brien
1068              
1069             Permission is hereby granted, free of charge, to any person obtaining a copy
1070             of this software and associated documentation files (the "Software"), to deal
1071             in the Software without restriction, including without limitation the rights
1072             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
1073             copies of the Software, and to permit persons to whom the Software is
1074             furnished to do so, subject to the following conditions:
1075              
1076             The above copyright notice and this permission notice shall be included in all
1077             copies or substantial portions of the Software.
1078              
1079             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1080             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1081             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1082             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1083             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
1084             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
1085             SOFTWARE.
1086              
1087             =cut