File Coverage

blib/lib/Net/Clacks/Server.pm
Criterion Covered Total %
statement 79 841 9.3
branch 0 366 0.0
condition 0 121 0.0
subroutine 26 42 61.9
pod 4 9 44.4
total 109 1379 7.9


line stmt bran cond sub pod time code
1             package Net::Clacks::Server;
2             #---AUTOPRAGMASTART---
3 1     1   25 use 5.020;
  1         3  
4 1     1   4 use strict;
  1         2  
  1         17  
5 1     1   4 use warnings;
  1         1  
  1         24  
6 1     1   4 use diagnostics;
  1         1  
  1         6  
7 1     1   26 use mro 'c3';
  1         1  
  1         6  
8 1     1   20 use English;
  1         1  
  1         15  
9 1     1   342 use Carp;
  1         2  
  1         48  
10             our $VERSION = 22;
11 1     1   5 use autodie qw( close );
  1         2  
  1         6  
12 1     1   298 use Array::Contains;
  1         2  
  1         52  
13 1     1   6 use utf8;
  1         2  
  1         6  
14 1     1   34 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  1         1  
  1         51  
15             #---AUTOPRAGMAEND---
16              
17 1     1   674 use XML::Simple;
  1         7513  
  1         5  
18 1     1   61 use Time::HiRes qw(sleep usleep time);
  1         3  
  1         6  
19 1     1   112 use Sys::Hostname;
  1         1  
  1         33  
20 1     1   5 use Errno;
  1         2  
  1         25  
21 1     1   4 use IO::Socket::IP;
  1         2  
  1         7  
22 1     1   454 use IO::Select;
  1         1  
  1         49  
23 1     1   5 use IO::Socket::SSL;
  1         2  
  1         5  
24 1     1   581 use YAML::Syck;
  1         1641  
  1         47  
25 1     1   6 use MIME::Base64;
  1         2  
  1         33  
26 1     1   411 use File::Copy;
  1         3774  
  1         45  
27 1     1   492 use Data::Dumper;
  1         5125  
  1         47  
28              
29             # For turning off SSL session cache
30 1     1   470 use Readonly;
  1         3310  
  1         90  
31             Readonly my $SSL_SESS_CACHE_OFF => 0x0000;
32              
33             my %overheadflags = (
34             A => "auth_token", # Authentication token
35             O => "auth_ok", # Authentication OK
36             F => "auth_failed", # Authentication FAILED
37              
38             E => 'error_message', # Server to client error message
39              
40             C => "close_all_connections",
41             D => "discard_message",
42             G => "forward_message",
43             I => "set_interclacks_mode", # value: true/false, disables 'G' and 'U'
44             L => "lock_for_sync", # value: true/false, only available in interclacks client mode
45             M => "informal_message", # informal message, no further operation on it
46             N => "no_logging",
47             S => "shutdown_service", # value: positive number (number in seconds before shutdown). If interclacks clients are present, should be high
48             # enough to flush all buffers to them
49              
50             T => 'timestamp', # Used before KEYSYNC to compensate for time drift between different systems
51             U => "return_to_sender",
52             Z => "no_flags", # Only sent when no other flags are set
53             );
54              
55             BEGIN {
56             {
57             # We need to add some extra functions to IO::Socket::SSL so we can track the client ID
58             # on both TCP and Unix Domain Sockets
59 1     1   7 no strict 'refs'; ## no critic (TestingAndDebugging::ProhibitNoStrict)
  1     1   2  
  1         110  
  1         3  
60 1         6 *{"IO::Socket::SSL::_setClientID"} = sub {
61 0     0   0 my ($self, $cid) = @_;
62            
63 0         0 ${*$self}{'__client_id'} = $cid; ## no critic (References::ProhibitDoubleSigils)
  0         0  
64 0         0 return;
65 1         3 };
66            
67 1         1037 *{"IO::Socket::SSL::_getClientID"} = sub {
68 0     0   0 my ($self) = @_;
69            
70 0   0     0 return ${*$self}{'__client_id'} || ''; ## no critic (References::ProhibitDoubleSigils)
71 1         3 };
72              
73             }
74            
75             }
76              
77             sub new {
78 0     0 1   my ($class, $isDebugging, $configfile) = @_;
79              
80 0           my $self = bless {}, $class;
81              
82 0           $self->{isDebugging} = $isDebugging;
83 0           $self->{configfile} = $configfile;
84              
85 0           $self->{timeoffset} = 0;
86              
87 0 0         if(defined($ENV{CLACKS_SIMULATED_TIME_OFFSET})) {
88 0           $self->{timeoffset} = 0 + $ENV{CLACKS_SIMULATED_TIME_OFFSET};
89 0           print "****** RUNNING WITH A SIMULATED TIME OFFSET OF ", $self->{timeoffset}, " seconds ******\n";
90             }
91              
92 0           $self->{clackscache} = {};
93 0           $self->{clackscachetime} = {};
94 0           $self->{clackscacheaccesstime} = {};
95              
96 0           return $self;
97             }
98              
99             sub init {
100 0     0 1   my ($self) = @_;
101              
102              
103 0           my @paths;
104 0 0         if(defined($ENV{'PC_CONFIG_PATHS'})) {
105 0           push @paths, split/\:/, $ENV{'PC_CONFIG_PATHS'};
106 0           print "Found config paths:\n", Dumper(\@paths), " \n";
107             } else {
108 0           print("PC_CONFIG_PATHS undefined, falling back to legacy mode\n");
109 0           @paths = ('', 'configs/');
110             }
111              
112 0           my $filedata;
113 0           my $fname = $self->{configfile};
114 0           foreach my $path (@paths) {
115 0 0 0       if($path ne '' && $path !~ /\/$/) {
116 0           $path .= '/';
117             }
118 0           my $fullfname = $path . $fname;
119 0 0         next unless (-f $fullfname);
120 0           print " Loading config file $fullfname\n";
121              
122 0           $filedata = slurpBinFile($fullfname);
123              
124 0           foreach my $varname (keys %ENV) {
125 0 0         next unless $varname =~ /^PC\_/;
126              
127 0           my $newval = $ENV{$varname};
128 0           $filedata =~ s/$varname/$newval/g;
129             }
130              
131 0           last;
132             }
133              
134 0 0 0       if(!defined($filedata) || $filedata eq "") {
135 0           croak("Can't load config file: Not found or empty!");
136             }
137              
138 0           print "------- Parsing config file $fname ------\n";
139 0           my $config = XMLin($filedata, ForceArray => [ 'ip', 'socket' ]);
140              
141 0           my $hname = hostname;
142              
143             # Copy hostname-specific stuff to root if it exists
144 0 0         if(defined($config->{hosts}->{$hname})) {
145 0           foreach my $key (keys %{$config->{hosts}->{$hname}}) {
  0            
146 0           $config->{$key} = $config->{hosts}->{$hname}->{$key};
147             }
148             }
149              
150 0           $self->{config} = $config;
151              
152 0 0         if(!defined($self->{config}->{throttle}->{maxsleep})) {
153 0           $self->{config}->{throttle}->{maxsleep} = 100;
154             }
155 0 0         if(!defined($self->{config}->{throttle}->{step})) {
156 0           $self->{config}->{throttle}->{step} = 10;
157             }
158              
159 0           $self->{usleep} = 0;
160              
161 0 0 0       if(!defined($self->{config}->{ssl}) ||
      0        
162             !defined($self->{config}->{ssl}->{cert}) ||
163             !defined($self->{config}->{ssl}->{key})) {
164 0           croak("Missing or incomplete SSL config!");
165             }
166 0 0         if(!-f $self->{config}->{ssl}->{cert}) {
167 0           croak("SSL cert file " . $self->{config}->{ssl}->{cert} . " not found!");
168             }
169 0 0         if(!-f $self->{config}->{ssl}->{key}) {
170 0           croak("SSL key file " . $self->{config}->{ssl}->{key} . " not found!");
171             }
172              
173 0 0         if(!defined($self->{config}->{username})) {
174 0           croak("Username not defined!");
175             }
176 0 0         if(!defined($self->{config}->{password})) {
177 0           croak("Password not defined!");
178             }
179 0           $self->{authtoken} = encode_base64($self->{config}->{username}, '') . ':' . encode_base64($self->{config}->{password}, '');
180              
181 0 0         if(defined($self->{config}->{persistancefile})) {
182 0           $self->{persistance} = 1;
183             } else {
184 0           $self->{persistance} = 0;
185             }
186              
187 0 0         if(!defined($self->{config}->{persistanceinterval})) {
188 0           $self->{persistanceinterval} = 10;
189             } else {
190 0           $self->{persistanceinterval} = $self->{config}->{persistanceinterval};
191             }
192              
193 0 0         if(!defined($self->{config}->{interclacksreconnecttimeout})) {
194 0           $self->{config}->{interclacksreconnecttimeout} = 30;
195             }
196              
197 0 0         if(!defined($self->{config}->{authtimeout})) {
198 0           $self->{config}->{authtimeout} = 15;
199             }
200              
201 0 0         if(!defined($self->{config}->{deletedcachetime})) {
202 0           $self->{config}->{deletedcachetime} = 60 * 60; # 1 hour
203             }
204 0 0         if(!defined($self->{config}->{stalecachetime})) {
205 0           $self->{config}->{stalecachetime} = 60 * 60 * 24; # 1 day
206             }
207              
208 0           my @tcpsockets;
209              
210 0 0         if(defined($config->{ip})) {
211 0           foreach my $ip (@{$config->{ip}}) {
  0            
212             my $tcp = IO::Socket::IP->new(
213             LocalHost => $ip,
214             LocalPort => $config->{port},
215 0 0         Listen => 1,
216             Blocking => 0,
217             ReuseAddr => 1,
218             Proto => 'tcp',
219             ) or croak($ERRNO);
220             #binmode($tcp, ':bytes');
221 0           push @tcpsockets, $tcp;
222 0           print "Listening on $ip:", $config->{port}, "/tcp\n";
223             }
224             }
225              
226 0 0 0       if(defined($config->{socket}) || defined($self->{config}->{master}->{socket})) {
227 0           my $udsloaded = 0;
228 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
229 0           require IO::Socket::UNIX;
230 0           $udsloaded = 1;
231             };
232 0 0         if(!$udsloaded) {
233 0           croak("Specified a unix domain socket, but i couldn't load IO::Socket::UNIX!");
234             }
235              
236             # Add the ClientID stuff to Unix domain sockets as well. We don't do this in the BEGIN{} block
237             # since we are not yet sure we are going to load IO::Socket::UNIX in the first place
238             {
239 1     1   8 no strict 'refs'; ## no critic (TestingAndDebugging::ProhibitNoStrict)
  1         3  
  1         8509  
  0            
240 0           *{"IO::Socket::UNIX::_setClientID"} = sub {
241 0     0     my ($self, $cid) = @_;
242            
243 0           ${*$self}{'__client_id'} = $cid; ## no critic (References::ProhibitDoubleSigils)
  0            
244 0           return;
245 0           };
246            
247 0           *{"IO::Socket::UNIX::_getClientID"} = sub {
248 0     0     my ($self) = @_;
249            
250 0   0       return ${*$self}{'__client_id'} || ''; ## no critic (References::ProhibitDoubleSigils)
251 0           };
252             }
253             }
254              
255 0 0         if(defined($config->{socket})) {
256 0           foreach my $socket (@{$config->{socket}}) {
  0            
257 0 0         if(-S $socket) {
258 0           print "Removing old unix domain socket file $socket\n";
259 0           unlink $socket;
260             }
261 0 0         my $tcp = IO::Socket::UNIX->new(
262             Type => SOCK_STREAM(),
263             Local => $socket,
264             Listen => 1,
265             #Blocking => 0,
266             ) or croak($ERRNO);
267 0           $tcp->blocking(0);
268             #binmode($tcp, ':bytes');
269 0           push @tcpsockets, $tcp;
270 0           print "Listening on Unix domain socket $socket\n";
271              
272 0 0 0       if(defined($config->{socketchmod}) && $config->{socketchmod} ne '') {
273 0           my $cmd = 'chmod ' . $config->{socketchmod} . ' ' . $socket;
274 0           print $cmd, "\n";
275 0           `$cmd`;
276             }
277             }
278             }
279              
280 0           $self->{tcpsockets} = \@tcpsockets;
281              
282              
283 0           print "Ready.\n";
284              
285              
286 0           return;
287             }
288              
289             sub loadPersistanceFile {
290 0     0 0   my ($self, $fname) = @_;
291              
292 0           my %clackscache;
293             my %clackscachetime;
294 0           my %clackscacheaccesstime;
295              
296 0 0         if(open(my $ifh, '<', $fname)) {
297 0           my $line = <$ifh>;
298 0           my $timestampline = <$ifh>;
299 0           my $accesstimeline = <$ifh>;
300 0           my $endline = <$ifh>;
301 0           my $needupgrade = 0;
302 0           close $ifh;
303              
304 0           chomp $line;
305 0           chomp $timestampline;
306 0           chomp $accesstimeline;
307              
308 0 0 0       if(!defined($endline) && $accesstimeline eq 'ENDBYTES') {
309 0           $endline = 'ENDBYTES';
310 0           $accesstimeline = '';
311 0           $needupgrade = 1;
312             } else {
313 0           chomp $endline;
314             }
315              
316 0 0 0       if(!defined($line) || !defined($timestampline) || $endline ne 'ENDBYTES') {
      0        
317 0           carp("Invalid persistance file " . $fname . "! File is incomplete!");
318 0           return; # Fail
319             }
320              
321 0           my $loadok = 0;
322              
323 0 0         if($line ne '') {
324 0           eval {
325 0           $line = decode_base64($line);
326 0           $line = Load($line);
327 0           $loadok = 1;
328             };
329 0 0         if(!$loadok) {
330 0           carp("Invalid persistance file " . $fname . "! Failed to decode data line!");
331 0           return; # Fail
332             }
333             }
334 0           %clackscache = %{$line};
  0            
335              
336             # Mark all data as current as a fallback
337 0           my $now = $self->getTime();
338 0           foreach my $key (keys %clackscache) {
339 0           $clackscachetime{$key} = $now;
340             }
341              
342 0 0         if($timestampline ne '') {
343 0           $loadok = 0;
344 0           eval {
345 0           $timestampline = decode_base64($timestampline);
346 0           $timestampline = Load($timestampline);
347 0           $loadok = 1;
348             };
349 0 0         if(!$loadok) {
350 0           carp("Invalid persistance file " . $fname . "! Failed to decode timestamp line, using current time!");
351 0           return; # Fail
352             } else {
353 0           my %clackstemp = %{$timestampline};
  0            
354 0           foreach my $key (keys %clackstemp) {
355 0           $clackscachetime{$key} = $clackstemp{$key};
356             }
357             }
358             }
359              
360 0 0         if($needupgrade) {
    0          
361 0           print "Pre-Version 22 persistance file detected. Upgrading automatically.\n";
362 0           foreach my $key (keys %clackscache) {
363 0           $clackscacheaccesstime{$key} = $now;
364             }
365             } elsif($accesstimeline ne '') {
366 0           $loadok = 0;
367 0           eval {
368 0           $accesstimeline = decode_base64($accesstimeline);
369 0           $accesstimeline = Load($accesstimeline);
370 0           $loadok = 1;
371             };
372 0 0         if(!$loadok) {
373 0           carp("Invalid persistance file " . $fname . "! Failed to decode timestamp line, using current time!");
374 0           return; # Fail
375             } else {
376 0           %clackscacheaccesstime = %{$accesstimeline};
  0            
377             }
378             }
379             } else {
380             # Fail
381 0           return;
382             }
383              
384 0           return \%clackscache, \%clackscachetime, \%clackscacheaccesstime;
385             }
386              
387              
388             sub run { ## no critic (Subroutines::ProhibitExcessComplexity)
389 0     0 1   my ($self) = @_;
390              
391 0           my $savecache = 0;
392 0           my $lastsavecache = 0;
393              
394             # Let STDOUT/STDERR settle down first
395 0           sleep(0.1);
396              
397             # Need to ignore SIGPIPE, this can screw us over in certain circumstances
398             # while writing to the network. We can only detect certain types of disconnects
399             # after writing to the socket, but we will get a SIGPIPE if we try. So we just
400             # ignore the signal and carry on as usual...
401 0           $SIG{PIPE} = 'IGNORE';
402              
403 0           my @toremove;
404             my @outbox;
405 0           my %clients;
406              
407 0           my $shutdowntime;
408 0           my $selector = IO::Select->new();
409 0           my $interclackslock = 0;
410 0           my $nextinterclackscheck = 0;
411              
412 0           my $keepRunning = 1;
413 0     0     $SIG{INT} = sub { $keepRunning = 0; };
  0            
414 0     0     $SIG{TERM} = sub { $keepRunning = 0; };
  0            
415              
416             # Restore persistance file if required
417 0 0         if($self->{persistance}) {
418 0           my $previousfname = $self->{config}->{persistancefile} . '_bck';
419 0           my $tempfname = $self->{config}->{persistancefile} . '_';
420 0           my $loadok = 0;
421 0 0         if(-f $self->{config}->{persistancefile}) {
422 0           print "Trying to load persistance file ", $self->{config}->{persistancefile}, "\n";
423 0           my ($cc, $cct, $cca) = $self->loadPersistanceFile($self->{config}->{persistancefile});
424 0 0 0       if(defined($cc) && ref $cc eq 'HASH') {
425 0           $self->{clackscache} = $cc;
426 0           $self->{clackscachetime} = $cct;
427 0           $self->{clackscacheaccesstime} = $cca;
428 0           $savecache = 1; # Force saving a new persistance file
429 0           $loadok = 1;
430             }
431             }
432              
433 0 0 0       if(!$loadok && -f $previousfname) {
434 0           print "Trying to load backup (previous) persistance file ", $previousfname, "\n";
435 0           my ($cc, $cct, $cca) = $self->loadPersistanceFile($previousfname);
436 0 0 0       if(defined($cc) && ref $cc eq 'HASH') {
437 0           $self->{clackscache} = $cc;
438 0           $self->{clackscachetime} = $cct;
439 0           $self->{clackscacheaccesstime} = $cca;
440 0           $savecache = 2; # Force saving a new persistance file plus a new backup
441 0           $loadok = 1;
442             }
443             }
444 0 0 0       if(!$loadok && -f $tempfname) {
445 0           print "Oh no. As a final, desperate solution, trying to load a 'temporary file while saving' persistance file ", $tempfname, "\n";
446 0           my ($cc, $cct, $cca) = $self->loadPersistanceFile($tempfname);
447 0 0 0       if(defined($cc) && ref $cc eq 'HASH') {
448 0           $self->{clackscache} = $cc;
449 0           $self->{clackscachetime} = $cct;
450 0           $self->{clackscacheaccesstime} = $cca;
451 0           $savecache = 2; # Force saving a new persistance file plus a new backup
452 0           $loadok = 1;
453             }
454             }
455              
456 0 0         if(!$loadok) {
457 0           print "Sorry, no valid persistance file found. Starting server 'blankety-blank'\n";
458 0           $savecache = 2;
459             } else {
460 0           print "Persistance file loaded\n";
461             }
462             }
463              
464 0           while($keepRunning) {
465 0           my $workCount = 0;
466              
467             # Check for shutdown time
468 0 0 0       if($shutdowntime && $shutdowntime < time) {
469 0           print STDERR "Shutdown time has arrived!\n";
470 0           $keepRunning = 0;
471             }
472              
473 0           my $now = $self->getTime();
474 0 0 0       if($savecache && $now > ($lastsavecache + $self->{persistanceinterval})) {
475 0           $lastsavecache = $now;
476 0           $self->savePersistanceFile($savecache);
477 0           $savecache = 0;
478             }
479              
480             # We are in client mode. We need to add an interclacks link
481 0 0 0       if(defined($self->{config}->{master}->{socket}) || defined($self->{config}->{master}->{ip})) {
482 0           my $mcid;
483 0 0         if(defined($self->{config}->{master}->{socket})) {
484 0           $mcid = 'unixdomainsocket:interclacksmaster';
485             } else {
486 0           $mcid = $self->{config}->{master}->{ip}->[0] . ':' . $self->{config}->{master}->{port};
487             }
488 0 0 0       if(!defined($clients{$mcid}) && $nextinterclackscheck < $now) {
489 0           $nextinterclackscheck = $now + $self->{config}->{interclacksreconnecttimeout} + int(rand(10));
490              
491 0           print "Connect to master\n";
492 0           my $msocket;
493              
494 0 0         if(defined($self->{config}->{master}->{socket})) {
495             $msocket = IO::Socket::UNIX->new(
496 0           Peer => $self->{config}->{master}->{socket}->[0],
497             Type => SOCK_STREAM,
498             );
499             } else {
500             $msocket = IO::Socket::IP->new(
501             PeerHost => $self->{config}->{master}->{ip}->[0],
502             PeerPort => $self->{config}->{master}->{port},
503 0           Type => SOCK_STREAM,
504             Timeout => 5,
505             );
506             }
507 0 0         if(!defined($msocket)) {
508 0           print STDERR "Can't connect to MASTER via interclacks!\n";
509             } else {
510 0           print "connected to master\n";
511              
512 0 0         if(ref $msocket ne 'IO::Socket::UNIX') {
513             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
514             # There is simply no point in running it over a local socket.
515 0           my $encrypted = IO::Socket::SSL->start_SSL($msocket,
516             SSL_verify_mode => SSL_VERIFY_NONE,
517             );
518 0 0         if(!$encrypted) {
519 0           print "startSSL failed: ", $SSL_ERROR, "\n";
520 0           next;
521             }
522             }
523              
524 0           $msocket->blocking(0);
525             #binmode($msocket, ':bytes');
526             my %tmp = (
527             buffer => '',
528             charbuffer => [],
529             listening => {},
530             socket => $msocket,
531             lastping => $now,
532             mirror => 0,
533             outbuffer => "CLACKS PageCamel $VERSION in interclacks client mode\r\n" . # Tell the server we are using PageCamel Interclacks...
534             "OVERHEAD A " . $self->{authtoken} . "\r\n" . # ...send Auth token
535             "OVERHEAD I 1\r\n", # ...and turn interclacks master mode ON on remote side
536             clientinfo => 'Interclacks link',
537             client_timeoffset => 0,
538             interclacks => 1,
539             interclacksclient => 1,
540             lastinterclacksping => $now,
541             lastmessage => $now,
542             authtimeout => $now + $self->{config}->{authtimeout},
543 0           authok => 0,
544             failcount => 0,
545             outmessages => [],
546             inmessages => [],
547             messagedelay => 0,
548             inmessagedelay => 0,
549             outmessagedelay => 0,
550             );
551              
552 0 0         if(defined($self->{config}->{master}->{ip})) {
553 0           $tmp{host} = $self->{config}->{master}->{ip}->[0];
554 0           $tmp{port} = $self->{config}->{master}->{port};
555             }
556 0           $clients{$mcid} = \%tmp;
557 0           $msocket->_setClientID($mcid);
558 0           $selector->add($msocket);
559              
560 0           $workCount++;
561             }
562             }
563             }
564              
565 0           foreach my $tcpsocket (@{$self->{tcpsockets}}) {
  0            
566 0           my $clientsocket = $tcpsocket->accept;
567 0 0         if(defined($clientsocket)) {
568 0           $clientsocket->blocking(0);
569 0           my ($cid, $chost, $cport);
570 0 0         if(ref $tcpsocket eq 'IO::Socket::UNIX') {
571 0           $chost = 'unixdomainsocket';
572 0           $cport = $now . ':' . int(rand(1_000_000));
573             } else {
574 0           ($chost, $cport) = ($clientsocket->peerhost, $clientsocket->peerport);
575             }
576 0           print "Got a new client $chost:$cport!\n";
577 0           $cid = "$chost:$cport";
578 0           foreach my $debugcid (keys %clients) {
579 0 0         if($clients{$debugcid}->{mirror}) {
580 0           $clients{$debugcid}->{outbuffer} .= "DEBUG CONNECTED=" . $cid . "\r\n";
581             }
582             }
583              
584 0 0         if(ref $clientsocket ne 'IO::Socket::UNIX') {
585             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
586             # There is simply no point in running it over a local socket.
587             my $encrypted = IO::Socket::SSL->start_SSL($clientsocket,
588             SSL_server => 1,
589             SSL_cert_file => $self->{config}->{ssl}->{cert},
590             SSL_key_file => $self->{config}->{ssl}->{key},
591             SSL_cipher_list => 'ALL:!ADH:!RC4:+HIGH:+MEDIUM:!LOW:!SSLv2:!SSLv3!EXPORT',
592             SSL_create_ctx_callback => sub {
593 0     0     my $ctx = shift;
594              
595             # Enable workarounds for broken clients
596 0           Net::SSLeay::CTX_set_options($ctx, &Net::SSLeay::OP_ALL); ## no critic (Subroutines::ProhibitAmpersandSigils)
597              
598             # Disable session resumption completely
599 0           Net::SSLeay::CTX_set_session_cache_mode($ctx, $SSL_SESS_CACHE_OFF);
600              
601             # Disable session tickets
602 0           Net::SSLeay::CTX_set_options($ctx, &Net::SSLeay::OP_NO_TICKET); ## no critic (Subroutines::ProhibitAmpersandSigils)
603             },
604 0           );
605 0 0         if(!$encrypted) {
606 0           print "startSSL failed: ", $SSL_ERROR, "\n";
607 0           next;
608             }
609             }
610              
611 0           $clientsocket->blocking(0);
612             #binmode($clientsocket, ':bytes');
613             #$clientsocket->{clacks_cid} = $cid;
614             my %tmp = (
615             buffer => '',
616             charbuffer => [],
617             listening => {},
618             socket => $clientsocket,
619             lastping => $now,
620             mirror => 0,
621             outbuffer => "CLACKS PageCamel $VERSION\r\n" .
622             "OVERHEAD M Authentication required\r\n", # Informal message
623             clientinfo => 'UNKNOWN',
624             client_timeoffset => 0,
625             host => $chost,
626             port => $cport,
627             interclacks => 0,
628             interclacksclient => 0,
629             lastinterclacksping => 0,
630             lastmessage => $now,
631             authtimeout => $now + $self->{config}->{authtimeout},
632 0           authok => 0,
633             failcount => 0,
634             outmessages => [],
635             inmessages => [],
636             inmessagedelay => 0,
637             outmessagedelay => 0,
638             );
639 0           if(0 && $self->{isDebugging}) {
640             $tmp{authok} = 1;
641             $tmp{outbuffer} .= "OVERHEAD M debugmode_auth_not_really_required\r\n"
642             }
643 0           $clients{$cid} = \%tmp;
644 0           $clientsocket->_setClientID($cid);
645 0           $selector->add($clientsocket);
646 0           $workCount++;
647             }
648             }
649              
650             # Check if there are any clients to disconnect...
651              
652 0           my $pingtime = $now - $self->{config}->{pingtimeout};
653 0           my $interclackspingtime = $now - $self->{config}->{interclackspingtimeout};
654 0           my $interclackspinginterval = $now - int($self->{config}->{interclackspingtimeout} / 3);
655 0           foreach my $cid (keys %clients) {
656 0 0         if(!$clients{$cid}->{socket}->connected) {
657 0           push @toremove, $cid;
658 0           next;
659             }
660 0 0         if(!$clients{$cid}->{interclacks}) {
661 0 0 0       if($clients{$cid}->{lastping} > 0 && $clients{$cid}->{lastping} < $pingtime) {
662 0           $self->evalsyswrite($clients{$cid}->{socket}, "\r\nTIMEOUT\r\n");
663 0           push @toremove, $cid;
664 0           next;
665             }
666             } else {
667 0 0         if($clients{$cid}->{lastping} < $interclackspingtime) {
668 0           $self->evalsyswrite($clients{$cid}->{socket}, "\r\nTIMEOUT\r\n");
669 0           push @toremove, $cid;
670 0           next;
671             }
672             }
673              
674 0 0 0       if($clients{$cid}->{interclacks} && $clients{$cid}->{lastinterclacksping} < $interclackspinginterval) {
675 0           $clients{$cid}->{lastinterclacksping} = $now;
676 0           $clients{$cid}->{outbuffer} .= "PING\r\n";
677             }
678              
679 0 0 0       if(!$clients{$cid}->{authok} && $clients{$cid}->{authtimeout} < $now) {
680             # Authentication timeout!
681 0           push @toremove, $cid;
682             }
683             }
684              
685             # ...and disconnect them
686 0           while((my $cid = shift @toremove)) {
687             # In some circumstances, there may be multiple @toremove entries for the same client. Ignore them...
688 0 0         if(defined($clients{$cid})) {
689 0           print "Removing client $cid\n";
690 0           foreach my $debugcid (keys %clients) {
691 0 0         if($clients{$debugcid}->{mirror}) {
692 0           $clients{$debugcid}->{outbuffer} .= "DEBUG DISCONNECTED=" . $cid . "\r\n";
693             }
694             }
695              
696 0 0 0       if($clients{$cid}->{interclacksclient} && $interclackslock) {
697 0           print "...this one is interclacks master and has us locked - UNLOCKING mid-sync!\n";
698 0           $interclackslock = 0;
699             }
700              
701 0           $selector->remove($clients{$cid}->{socket});
702 0           delete $clients{$cid};
703             }
704              
705 0           $workCount++;
706             }
707              
708 0 0         if(!(scalar keys %clients)) {
709             # No clients to handle, let's sleep and try again later
710 0           sleep(0.1);
711 0           next;
712             }
713              
714              
715 0           my $hasoutbufferwork = 0;
716 0           foreach my $cid (keys %clients) {
717 0 0         if(length($clients{$cid}->{buffer}) > 0) {
718             # Found some work to do
719 0           $hasoutbufferwork = 1;
720 0           last;
721             }
722             }
723 0           my $selecttimeout = 0.5; # Half a second
724 0 0         if($hasoutbufferwork) {
725 0           $selecttimeout = 0.05;
726             }
727              
728 0           my @inclients = $selector->can_read($selecttimeout);
729 0           foreach my $clientsocket (@inclients) {
730 0           my $cid = $clientsocket->_getClientID();
731              
732 0           my $totalread = 0;
733 0           my $readchunksleft = 3;
734 0           while(1) {
735 0           my $rawbuffer;
736 0           my $readok = 0;
737 0           eval {
738 0           sysread($clients{$cid}->{socket}, $rawbuffer, 1_000_000); # Read at most 1 Meg at a time
739 0           $readok = 1;
740             };
741 0 0         if(!$readok) {
742 0           push @toremove, $cid;
743 0           last;
744             }
745 0 0 0       if(defined($rawbuffer) && length($rawbuffer)) {
746 0           $totalread += length($rawbuffer);
747 0           push @{$clients{$cid}->{charbuffer}}, split//, $rawbuffer;
  0            
748 0           $readchunksleft--;
749 0 0         if(!$readchunksleft) {
750 0           last;
751             }
752 0           next;
753             }
754 0           last;
755             }
756            
757             # Check if we could read data from a socket that was marked as readable.
758             # Thanks to SSL, this might ocxasionally fail. Don't bail out at the first
759             # error, only if multiple happen one after the other
760 0 0         if($totalread) {
761 0           $clients{$cid}->{failcount} = 0;
762             } else {
763 0           $clients{$cid}->{failcount}++;
764            
765 0 0         if($clients{$cid}->{failcount} > 5) {
766             # Socket was active multiple times but delivered no data?
767             # EOF, maybe, possible, perhaps?
768 0           push @toremove, $cid;
769             }
770             }
771             }
772              
773 0           foreach my $cid (keys %clients) {
774 0           while(@{$clients{$cid}->{charbuffer}}) {
  0            
775 0           my $buf = shift @{$clients{$cid}->{charbuffer}};
  0            
776              
777 0           $workCount++;
778 0 0         if($buf eq "\r") {
    0          
779 0           next;
780             } elsif($buf eq "\n") {
781 0 0         next if($clients{$cid}->{buffer} eq ''); # Empty lines
782              
783             my %inmsg = (
784             message => $clients{$cid}->{buffer},
785             releasetime => $now + $clients{$cid}->{inmessagedelay},
786 0           );
787 0           push @{$clients{$cid}->{inmessages}}, \%inmsg;
  0            
788 0           $clients{$cid}->{buffer} = '';
789             } else {
790 0           $clients{$cid}->{buffer} .= $buf;
791             }
792             }
793              
794 0 0 0       if($interclackslock && !$clients{$cid}->{interclacksclient}) {
795             # We are locked into interclacks sync lock, but this is not the connection to master,
796             # so we don't handle the input buffer for this client at the moment.
797 0           next;
798             }
799              
800              
801             # ******************************************************************************
802             # ******************************************************************************
803             # ******************************************************************************
804             # ******************************************************************************
805             # ******************************************************************************
806             # ******************************************************************************
807             # ******************************************************************************
808 0           while(scalar @{$clients{$cid}->{inmessages}}) {
  0            
809 0 0         last if($clients{$cid}->{inmessages}->[0]->{releasetime} > $now);
810 0           my $inmsgtmp = shift @{$clients{$cid}->{inmessages}};
  0            
811 0           my $inmsg = $inmsgtmp->{message};
812              
813             # Handle CLACKS identification header
814 0 0         if($inmsg =~ /^CLACKS\ (.+)/) {
815 0           $clients{$cid}->{clientinfo} = $1;
816 0           $clients{$cid}->{clientinfo} =~ s/\;/\_/g;
817 0           print "Client at ", $cid, " identified as ", $clients{$cid}->{clientinfo}, "\n";
818 0           next;
819             }
820              
821 0           my $nodebug = 0;
822 0           my $sendinterclacks = 1;
823 0           my $discardafterlogging = 0;
824             # Handle OVERHEAD messages before logging (for handling 'N' flag correctly)
825 0 0         if($inmsg =~ /^OVERHEAD\ (.+?)\ (.+)/) {
826 0           my ($flags, $value) = ($1, $2);
827 0           $sendinterclacks = 0;
828 0           my @flagparts = split//, $flags;
829 0           my %parsedflags;
830             my %newflags;
831 0           foreach my $key (sort keys %overheadflags) {
832 0 0         if(contains($key, \@flagparts)) {
833 0           $parsedflags{$overheadflags{$key}} = 1;
834 0           $newflags{$overheadflags{$key}} = 1;
835             } else {
836 0           $parsedflags{$overheadflags{$key}} = 0;
837 0           $newflags{$overheadflags{$key}} = 0;
838             }
839             }
840              
841 0 0         if($parsedflags{auth_token}) {
842 0 0         if($value eq $self->{authtoken}) {
843 0           $clients{$cid}->{authok} = 1;
844             #$clients{$cid}->{outbuffer} .= "OVERHEAD O Welcome!\r\n";
845 0           push @{$clients{$cid}->{outmessages}}, {releasetime => $now + $clients{$cid}->{outmessagedelay}, message => 'OVERHEAD O Welcome!'};
  0            
846             } else {
847 0           $clients{$cid}->{authok} = 0;
848             #$clients{$cid}->{outbuffer} .= "OVERHEAD F Login failed!\r\n";
849 0           push @{$clients{$cid}->{outmessages}}, {releasetime => $now + $clients{$cid}->{outmessagedelay}, message => 'OVERHEAD F Login failed!'};
  0            
850 0           push @{$clients{$cid}->{outmessages}}, {releasetime => $now + $clients{$cid}->{outmessagedelay}, message => 'EXIT'};
  0            
851 0           push @toremove, $cid; # Disconnect the client
852 0           last;
853             }
854             }
855              
856             # Ignore other command when not authenticated
857 0 0         if(!$clients{$cid}->{authok}) {
858 0           next;
859             }
860              
861 0 0         if($parsedflags{timestamp}) {
862 0           $now = $self->getTime(); # Make sure we are at the "latest" $now. This is one of the very few critical sections
863 0           $clients{$cid}->{client_timeoffset} = $now - $value;
864 0           print "**** CLIENT TIME OFFSET: ", $clients{$cid}->{client_timeoffset}, "\n";
865 0           next;
866             }
867              
868 0 0 0       if($parsedflags{lock_for_sync} && $clients{$cid}->{interclacksclient}) {
869 0 0         if($value) {
870 0           print "Interclacks sync lock ON.\n";
871 0           $interclackslock = 1;
872             } else {
873 0           print "Interclacks sync lock OFF.\n";
874 0           $interclackslock = 0;
875              
876             # Send server our keys AFTER we got everything FROM the server (e.g. after unlock)
877 0           $clients{$cid}->{outbuffer} .= "OVERHEAD T " . $self->getTime() . "\r\n"; # Send local time to server for offset calculation
878 0           foreach my $ckey (sort keys %{$self->{clackscache}}) {
  0            
879 0           $clients{$cid}->{outbuffer} .= "KEYSYNC " . $self->{clackscachetime}->{$ckey} . " " . $self->{clackscacheaccesstime}->{$ckey} . " U $ckey=" . $self->{clackscache}->{$ckey} . "\r\n";
880             }
881 0           foreach my $ckey (sort keys %{$self->{clackscachetime}}) {
  0            
882 0 0         next if(defined($self->{clackscache}->{$ckey}));
883 0           $clients{$cid}->{outbuffer} .= "KEYSYNC " . $self->{clackscachetime}->{$ckey} . " 0 D $ckey=REMOVED\r\n";
884             }
885             }
886 0           $parsedflags{forward_message} = 0; # Don't forward
887 0           $newflags{return_to_sender} = 0; # Don't return to sender
888             }
889              
890 0 0 0       if($parsedflags{close_all_connections} && $value) {
891 0           foreach my $closecid (keys %clients) {
892 0 0 0       if($clients{$closecid}->{interclacks} && $parsedflags{forward_message}) {
893 0           $self->evalsyswrite($clients{$closecid}->{socket}, "\r\nOVERHEAD GC 1\r\n");
894             }
895 0           $self->evalsyswrite($clients{$closecid}->{socket}, "\r\nQUIT\r\n");
896 0           push @toremove, $closecid;
897             }
898 0           $parsedflags{forward_message} = 0; # Already forwarded where needed
899             }
900              
901 0 0         if($parsedflags{shutdown_service}) {
902 0           $value = 0 + $value;
903 0 0         if($value > 0) {
904 0           $shutdowntime = $value + $now;
905 0           print STDERR "Shutting down in $value seconds\n";
906             }
907             }
908 0 0         if($parsedflags{discard_message}) {
909 0           $discardafterlogging = 1;
910             }
911 0 0         if($parsedflags{no_logging}) {
912 0           $nodebug = 1;
913             }
914              
915 0 0         if($parsedflags{error_message}) {
916 0           print STDERR 'ERROR from ', $cid, ': ', $value, "\n";
917             }
918              
919 0 0         if($parsedflags{set_interclacks_mode}) {
920 0           $newflags{forward_message} = 0;
921 0           $newflags{return_to_sender} = 0;
922              
923 0 0         if($value) {
924 0           $clients{$cid}->{interclacks} = 1;
925 0           $clients{$cid}->{lastping} = $now;
926              
927              
928             $clients{$cid}->{outbuffer} .= "CLACKS PageCamel $VERSION in interclacks master mode\r\n" . # Tell client we are in interclacks master mode
929             "OVERHEAD M Authentication required\r\n" . # Informal message
930 0           "OVERHEAD A " . $self->{authtoken} . "\r\n" . # ...and send Auth token...
931             "OVERHEAD L 1\r\n" . # ...and lock client for sync
932             "OVERHEAD T " . time . "\r\n"; # ... and send local timestamp
933              
934             # Make sure our new interclacks client has an *exact* copy of our buffer
935             #$clients{$cid}->{outbuffer} .= "CLEARCACHE\r\n";
936 0           foreach my $ckey (sort keys %{$self->{clackscache}}) {
  0            
937 0           $clients{$cid}->{outbuffer} .= "KEYSYNC " . $self->{clackscachetime}->{$ckey} . " " . $self->{clackscacheaccesstime}->{$ckey} . " U $ckey=" . $self->{clackscache}->{$ckey} . "\r\n";
938             }
939 0           foreach my $ckey (sort keys %{$self->{clackscachetime}}) {
  0            
940 0 0         next if(defined($self->{clackscache}->{$ckey}));
941 0           $clients{$cid}->{outbuffer} .= "KEYSYNC " . $self->{clackscachetime}->{$ckey} . " 0 D $ckey=REMOVED\r\n";
942             }
943 0           $clients{$cid}->{outbuffer} .= "OVERHEAD L 0\r\n"; # unlock client after sync
944 0           $clients{$cid}->{outbuffer} .= "PING\r\n";
945 0           $clients{$cid}->{lastinterclacksping} = $now;
946             } else {
947 0           $clients{$cid}->{interclacks} = 0;
948 0           $clients{$cid}->{lastping} = $now;
949             }
950             }
951              
952 0           my $newflagstring = '';
953 0           $newflags{return_to_sender} = 0;
954              
955 0           foreach my $key (sort keys %overheadflags) {
956 0 0         next if($key eq 'Z');
957 0 0         if($newflags{$overheadflags{$key}}) {
958 0           $newflagstring .= $key;
959             }
960             }
961 0 0         if($newflagstring eq '') {
962 0           $newflagstring = 'Z';
963             }
964              
965 0 0         if($parsedflags{forward_message}) {
966 0           foreach my $overheadcid (keys %clients) {
967 0 0 0       next if($cid eq $overheadcid && !$parsedflags{return_to_sender});
968              
969 0           $clients{$overheadcid}->{outbuffer} .= "OVERHEAD $newflagstring $value\r\n";
970             }
971             }
972             }
973              
974             # Ignore other command when not authenticated
975 0 0         if(!$clients{$cid}->{authok}) {
976 0           next;
977             }
978              
979 0 0         if(!$nodebug) {
980             # Add ALL incoming messages as debug-type messages to the outbox
981 0           my %tmp = (
982             sender => $cid,
983             type => 'DEBUG',
984             data => $inmsg,
985             );
986              
987 0           push @outbox, \%tmp;
988             }
989              
990 0 0         if($discardafterlogging) {
991 0           next;
992             }
993              
994              
995 0 0 0       if($inmsg =~ /^OVERHEAD\ /) { ## no critic (ControlStructures::ProhibitCascadingIfElse)
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
996             # Already handled
997             } elsif($inmsg =~ /^LISTEN\ (.*)/) {
998 0           $clients{$cid}->{listening}->{$1} = 1;
999 0           $sendinterclacks = 0;
1000             } elsif($inmsg =~ /^UNLISTEN\ (.*)/) {
1001 0           delete $clients{$cid}->{listening}->{$1};
1002 0           $sendinterclacks = 0;
1003             } elsif($inmsg =~ /^MONITOR/) {
1004 0           $clients{$cid}->{mirror} = 1;
1005 0           $sendinterclacks = 0;
1006             } elsif($inmsg =~ /^UNMONITOR/) {
1007 0           $clients{$cid}->{mirror} = 0;
1008 0           $sendinterclacks = 0;
1009             } elsif($inmsg =~ /^QUIT/) {
1010 0           print STDERR "Client disconnected cleanly!\n";
1011 0           push @toremove, $cid;
1012 0           $sendinterclacks = 0;
1013             } elsif($inmsg =~ /^TIMEOUT/ && $clients{$cid}->{interclacks}) {
1014 0           print STDERR "Ooops, didn't send timely PINGS through interclacks link!\n";
1015 0           push @toremove, $cid;
1016 0           $sendinterclacks = 0;
1017             } elsif($inmsg =~ /^PING/) {
1018 0           $clients{$cid}->{lastping} = $now;
1019 0           $sendinterclacks = 0;
1020             } elsif($inmsg =~ /^NOPING/) {
1021             # Disable PING check until next PING recieved
1022 0           $clients{$cid}->{lastping} = 0;
1023 0           $sendinterclacks = 0;
1024             } elsif($inmsg =~ /^NOTIFY\ (.*)/) {
1025 0           my %tmp = (
1026             sender => $cid,
1027             type => 'NOTIFY',
1028             name => $1,
1029             );
1030 0           push @outbox, \%tmp;
1031             } elsif($inmsg =~ /^SET\ (.+?)\=(.*)/) {
1032 0           my %tmp = (
1033             sender => $cid,
1034             type => 'SET',
1035             name => $1,
1036             value => $2,
1037             );
1038 0           push @outbox, \%tmp;
1039             } elsif($inmsg =~ /^KEYSYNC\ (.+?)\ (.+?)\ (.+?)\ (.+?)\=(.*)/) {
1040             #print "***** ", $inmsg, "\n";
1041 0           my ($ctimestamp, $atimestamp, $cmode, $ckey, $cval) = ($1, $2, $3, $4, $5);
1042 0           $clients{$cid}->{lastping} = $now; # KEYSYNC acts as a PING as well
1043              
1044 0           $ctimestamp += $clients{$cid}->{client_timeoffset}; # Take client time offset into account
1045 0 0         if($atimestamp) {
1046 0           $atimestamp += $clients{$cid}->{client_timeoffset}; # Take client time offset into account
1047             }
1048              
1049 0 0         if(!defined($self->{clackscachetime}->{$ckey})) {
1050 0           $self->{clackscachetime}->{$ckey} = 0;
1051             }
1052 0 0 0       if(!defined($self->{clackscachetime}->{$ckey}) || $ctimestamp > $self->{clackscachetime}->{$ckey}) {
1053             # If *we* have the older entry (or none at all), *only* then work on the keysync command
1054 0 0         if($cmode eq "U") { # "Update"
1055 0           $self->{clackscache}->{$ckey} = $cval;
1056 0           $self->{clackscachetime}->{$ckey} = $ctimestamp;
1057 0           $self->{clackscacheaccesstime}->{$ckey} = $atimestamp;
1058             } else { # REMOVE request from server
1059 0           $self->{clackscachetime}->{$ckey} = $ctimestamp;
1060 0 0         if(defined($self->{clackscache}->{$ckey})) {
1061 0           delete $self->{clackscache}->{$ckey};
1062             }
1063 0 0         if(defined($self->{clackscacheaccesstime}->{$ckey})) {
1064 0           delete $self->{clackscacheaccesstime}->{$ckey};
1065             }
1066             }
1067             }
1068              
1069 0           $savecache = 1;
1070 0           $sendinterclacks = 1;
1071             } elsif($inmsg =~ /^STORE\ (.+?)\=(.*)/) {
1072 0           $self->{clackscache}->{$1} = $2;
1073 0           $self->{clackscachetime}->{$1} = $now;
1074 0           $self->{clackscacheaccesstime}->{$1} = $now;
1075 0           $savecache = 1;
1076             } elsif($inmsg =~ /^SETANDSTORE\ (.+?)\=(.*)/) {
1077 0           my %tmp = (
1078             sender => $cid,
1079             type => 'SETANDSTORE',
1080             name => $1,
1081             value => $2,
1082             );
1083 0           push @outbox, \%tmp;
1084 0           $self->{clackscache}->{$tmp{name}} = $tmp{value};
1085 0           $self->{clackscachetime}->{$tmp{name}} = $now;
1086 0           $self->{clackscacheaccesstime}->{$tmp{name}} = $now;
1087 0           $savecache = 1;
1088             } elsif($inmsg =~ /^RETRIEVE\ (.+)/) {
1089             #$clients{$cid}->{outbuffer} .= "SET ". $line->{name} . "=" . $line->{value} . "\r\n";
1090 0           my $ckey = $1;
1091 0 0         if(defined($self->{clackscache}->{$ckey})) {
1092 0           $clients{$cid}->{outbuffer} .= "RETRIEVED $ckey=" . $self->{clackscache}->{$ckey} . "\r\n";
1093 0           $self->{clackscacheaccesstime}->{$ckey} = $now;
1094 0           $savecache = 1;
1095             } else {
1096 0           $clients{$cid}->{outbuffer} .= "NOTRETRIEVED $ckey\r\n";
1097             }
1098 0           $sendinterclacks = 0;
1099             } elsif($inmsg =~ /^REMOVE\ (.+)/) {
1100 0           my $ckey = $1;
1101 0 0         if(defined($self->{clackscache}->{$ckey})) {
1102 0           delete $self->{clackscache}->{$ckey};
1103 0           $self->{clackscachetime}->{$ckey} = $now;
1104             }
1105 0 0         if(defined($self->{clackscacheaccesstime}->{$ckey})) {
1106 0           delete $self->{clackscacheaccesstime}->{$ckey};
1107             }
1108 0           $savecache = 1;
1109             } elsif($inmsg =~ /^INCREMENT\ (.+)/) {
1110 0           my $ckey = $1;
1111 0           my $cval = 1;
1112 0 0         if($ckey =~ /(.+)\=(.+)/) {
1113 0           ($ckey, $cval) = ($1, $2);
1114 0           $cval = 0 + $cval;
1115             }
1116 0 0         if(defined($self->{clackscache}->{$ckey})) {
1117 0           $self->{clackscache}->{$ckey} += $cval;
1118             } else {
1119 0           $self->{clackscache}->{$ckey} = $cval;
1120             }
1121 0           $self->{clackscachetime}->{$ckey} = $now;
1122 0           $self->{clackscacheaccesstime}->{$ckey} = $now;
1123 0           $savecache = 1;
1124             } elsif($inmsg =~ /^DECREMENT\ (.+)/) {
1125 0           my $ckey = $1;
1126 0           my $cval = 1;
1127 0 0         if($ckey =~ /(.+)\=(.+)/) {
1128 0           ($ckey, $cval) = ($1, $2);
1129 0           $cval = 0 + $cval;
1130             }
1131 0 0         if(defined($self->{clackscache}->{$ckey})) {
1132 0           $self->{clackscache}->{$ckey} -= $cval;
1133             } else {
1134 0           $self->{clackscache}->{$ckey} = 0 - $cval;
1135             }
1136 0           $self->{clackscachetime}->{$ckey} = $now;
1137 0           $self->{clackscacheaccesstime}->{$ckey} = $now;
1138 0           $savecache = 1;
1139             } elsif($inmsg =~ /^KEYLIST/) {
1140 0           $clients{$cid}->{outbuffer} .= "KEYLISTSTART\r\n";
1141 0           foreach my $ckey (sort keys %{$self->{clackscache}}) {
  0            
1142 0           $clients{$cid}->{outbuffer} .= "KEY $ckey\r\n";
1143             }
1144 0           $clients{$cid}->{outbuffer} .= "KEYLISTEND\r\n";
1145 0           $sendinterclacks = 0;
1146             } elsif($inmsg =~ /^CLEARCACHE/) {
1147 0           %{$self->{clackscache}} = ();
  0            
1148 0           %{$self->{clackscachetime}} = ();
  0            
1149 0           %{$self->{clackscacheaccesstime}} = ();
  0            
1150 0           $savecache = 1;
1151              
1152             # local managment commands
1153             } elsif($inmsg =~ /^CLIENTLIST/) {
1154 0           $clients{$cid}->{outbuffer} .= "CLIENTLISTSTART\r\n";
1155 0           foreach my $lmccid (sort keys %clients) {
1156             $clients{$cid}->{outbuffer} .= "CLIENT CID=$lmccid;" .
1157             "HOST=" . $clients{$lmccid}->{host} . ";" .
1158             "PORT=" . $clients{$lmccid}->{port} . ";" .
1159             "CLIENTINFO=" . $clients{$lmccid}->{clientinfo} . ";" .
1160             "OUTBUFFER_LENGTH=" . length($clients{$lmccid}->{outbuffer}) . ";" .
1161             "INBUFFER_LENGTH=" . length($clients{$lmccid}->{buffer}) . ";" .
1162             "INTERCLACKS=" . $clients{$lmccid}->{interclacks} . ";" .
1163             "MONITOR=" . $clients{$lmccid}->{mirror} . ";" .
1164             "LASTPING=" . $clients{$lmccid}->{lastping} . ";" .
1165 0           "LASTINTERCLACKSPING=" . $clients{$lmccid}->{lastinterclacksping} . ";" .
1166             "\r\n";
1167             }
1168 0           $clients{$cid}->{outbuffer} .= "CLIENTLISTEND\r\n";
1169 0           $sendinterclacks = 0;
1170             } elsif($inmsg =~ /^CLIENTDISCONNECT\ (.+)/) {
1171 0           my $lmccid = $1;
1172 0 0         if(defined($clients{$lmccid})) {
1173             # Try to notify the client (may or may not work);
1174 0           $self->evalsyswrite($clients{$lmccid}->{socket}, "\r\nQUIT\r\n");
1175 0           push @toremove, $lmccid;
1176             }
1177 0           $sendinterclacks = 0;
1178             } elsif($inmsg =~ /^FLUSH\ (.+)/) {
1179 0           my $retid = $1;
1180 0           $clients{$cid}->{outbuffer} .= "FLUSHED $retid\r\n";
1181 0           $sendinterclacks = 0;
1182             } else {
1183 0           print STDERR "ERROR Unknown_command ", $inmsg, "\r\n";
1184 0           $sendinterclacks = 0;
1185 0           $clients{$cid}->{outbuffer} .= "OVERHEAD E unknown_command " . $inmsg . "\r\n";
1186             }
1187              
1188             # forward interclacks messages
1189 0 0         if($sendinterclacks) {
1190 0           foreach my $interclackscid (keys %clients) {
1191 0 0 0       if($cid eq $interclackscid || !$clients{$interclackscid}->{interclacks}) {
1192 0           next;
1193             }
1194 0           $clients{$interclackscid}->{outbuffer} .= $inmsg . "\r\n";
1195             }
1196             }
1197              
1198             }
1199              
1200             }
1201              
1202             # clean up very old "deleted" entries
1203 0           my $stillvalidtime = $now - $self->{config}->{deletedcachetime};
1204 0           foreach my $key (keys %{$self->{clackscachetime}}) {
  0            
1205 0 0         next if($self->{clackscachetime}->{$key} > $stillvalidtime);
1206 0 0         if(defined($self->{clackscache}->{$key})) { # Still has data? Fix clackscachetime entry
1207 0           $self->{clackscachetime}->{$key} = $now;
1208             }
1209 0           delete $self->{clackscachetime}->{$key};
1210 0           $savecache = 1;
1211             }
1212              
1213             # Clean up (forget) stale cached entries
1214 0           $stillvalidtime = $now - $self->{config}->{stalecachetime};
1215 0           foreach my $key (keys %{$self->{clackscacheaccesstime}}) {
  0            
1216 0 0         next if($self->{clackscacheaccesstime}->{$key} > $stillvalidtime);
1217 0           delete $self->{clackscacheaccesstime}->{$key};
1218 0 0         if(defined($self->{clackscache})) {
1219 0           delete $self->{clackscache}->{$key};
1220             }
1221 0 0         if(defined($self->{clackscachetime})) {
1222 0           delete $self->{clackscachetime}->{$key};
1223             }
1224              
1225 0           my %tmp = (
1226             sender => 'SERVERCACHE',
1227             type => 'DEBUG',
1228             data => 'FORGET=' . $key,
1229             );
1230              
1231 0           push @outbox, \%tmp;
1232             }
1233              
1234              
1235             # Outbox contains the messages that have to be forwarded to the clients when listening (or when the connection is in interclacks mode)
1236             # We iterate over the outbox and put those messages into the output buffers of the corresponding client connection
1237 0           while((my $line = shift @outbox)) {
1238 0           $workCount++;
1239 0           foreach my $cid (keys %clients) {
1240 0 0 0       if($line->{type} eq 'DEBUG' && $clients{$cid}->{mirror}) {
1241 0           $clients{$cid}->{outbuffer} .= "DEBUG " . $line->{sender} . "=". $line->{data} . "\r\n";
1242             }
1243              
1244 0 0         if($cid eq $line->{sender}) {
1245 0           next;
1246             }
1247              
1248 0 0 0       if($line->{type} ne 'DEBUG' && defined($clients{$cid}->{listening}->{$line->{name}})) {
1249             # Just buffer in the clients outbuffers
1250 0 0         if($line->{type} eq 'NOTIFY') {
    0          
    0          
1251 0           $clients{$cid}->{outbuffer} .= "NOTIFY ". $line->{name} . "\r\n";
1252             } elsif($line->{type} eq 'SET') {
1253 0           $clients{$cid}->{outbuffer} .= "SET ". $line->{name} . "=" . $line->{value} . "\r\n";
1254             } elsif($line->{type} eq 'SETANDSTORE') {
1255             # We forward SETANDSTORE as such only over interclacks connections. Basic clients don't have a cache,
1256             # so we only send a SET command
1257 0 0         if($clients{$cid}->{interclacks}) {
1258 0           $clients{$cid}->{outbuffer} .= "SETANDSTORE ". $line->{name} . "=" . $line->{value} . "\r\n";
1259             } else {
1260 0           $clients{$cid}->{outbuffer} .= "SET ". $line->{name} . "=" . $line->{value} . "\r\n";
1261             }
1262             }
1263             }
1264             }
1265             }
1266              
1267              
1268             # Push all messages that can be released at this time into the corresponding char based output for each client
1269 0           foreach my $cid (keys %clients) {
1270 0           while(scalar @{$clients{$cid}->{outmessages}}) {
  0            
1271 0 0         last if($clients{$cid}->{outmessages}->[0]->{releasetime} > $now);
1272              
1273 0           my $outmsg = shift @{$clients{$cid}->{outmessages}};
  0            
1274 0 0         if($outmsg->{message} eq 'EXIT') {
1275 0           push @toremove, $cid; # Disconnect the client
1276             } else {
1277 0           $clients{$cid}->{outbuffer} .= $outmsg->{message} . "\r\n";
1278             }
1279             }
1280             }
1281              
1282             # ******************************************************************************
1283             # ******************************************************************************
1284             # ******************************************************************************
1285             # ******************************************************************************
1286             # ******************************************************************************
1287             # ******************************************************************************
1288             # ******************************************************************************
1289              
1290             # Send as much as possible
1291 0           foreach my $cid (keys %clients) {
1292 0 0         if(length($clients{$cid}->{outbuffer})) {
    0          
1293 0           $clients{$cid}->{lastmessage} = $now;
1294             } elsif(($clients{$cid}->{lastmessage} + 60) < $now) {
1295 0           $clients{$cid}->{lastmessage} = $now;
1296 0           $clients{$cid}->{outbuffer} .= "NOP\r\n"; # send "No OPerations" command, just to
1297             # check if socket is still open
1298             }
1299              
1300 0 0         next if(!length($clients{$cid}->{outbuffer}));
1301              
1302             # Output bandwidth-limited stuff, in as big chunks as possible
1303 0           my $written;
1304 0           $workCount++;
1305 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
1306 0           $written = syswrite($clients{$cid}->{socket}, $clients{$cid}->{outbuffer});
1307             };
1308 0 0         if($EVAL_ERROR) {
1309 0           print STDERR "Write error: $EVAL_ERROR\n";
1310 0           push @toremove, $cid;
1311 0           next;
1312             }
1313 0 0 0       if(!$clients{$cid}->{socket}->opened || $clients{$cid}->{socket}->error || ($ERRNO ne '' && !$ERRNO{EWOULDBLOCK})) {
      0        
      0        
1314 0           print STDERR "webPrint write failure: $ERRNO\n";
1315 0           push @toremove, $cid;
1316 0           next;
1317             }
1318              
1319 0 0 0       if(defined($written) && $written) {
1320 0 0         if(length($clients{$cid}->{outbuffer}) == $written) {
1321 0           $clients{$cid}->{outbuffer} = '';
1322             } else {
1323 0           $clients{$cid}->{outbuffer} = substr($clients{$cid}->{outbuffer}, $written);
1324             }
1325             }
1326             }
1327              
1328 0 0         if($workCount) {
    0          
1329 0           $self->{usleep} = 0;
1330             } elsif($self->{usleep} < $self->{config}->{throttle}->{maxsleep}) {
1331 0           $self->{usleep} += $self->{config}->{throttle}->{step};
1332             }
1333 0 0         if($self->{usleep}) {
1334 0           sleep($self->{usleep} / 1000);
1335             }
1336             }
1337              
1338 0           print "Shutting down...\n";
1339              
1340             # Make sure we save the latest version of the persistance file
1341 0           $self->savePersistanceFile($savecache);
1342              
1343 0           sleep(0.5);
1344 0           foreach my $cid (keys %clients) {
1345 0           print "Removing client $cid\n";
1346             # Try to notify the client (may or may not work);
1347 0           $self->evalsyswrite($clients{$cid}->{socket}, "\r\nQUIT\r\n");
1348              
1349 0           delete $clients{$cid};
1350             }
1351 0           print "All clients removed\n";
1352              
1353              
1354 0           return;
1355             }
# Write the in-memory cache to the persistance file named by
# $self->{config}->{persistancefile}.
#
# The three hashes $self->{clackscache}, $self->{clackscachetime} and
# $self->{clackscacheaccesstime} are each serialized with Dump(), base64-encoded
# onto a single line and written in that order, followed by an "ENDBYTES" line.
# The data goes to a temporary file ("<persistancefile>_") first and is only moved
# over the real file after it was written successfully, so a failed write never
# replaces the existing persistance file.
#
# Parameters:
#   $savecache - 1: normal save; copy the current persistance file to
#                   "<persistancefile>_bck" before it is replaced
#                2: copy the freshly written persistance file to the backup
#                   name afterwards
#                anything else (including undef): write without touching the backup
#
# Does nothing when $self->{persistance} is false. Failures to open, write or
# move the temporary file are reported on STDERR and abort the save. Always
# returns nothing.
sub savePersistanceFile {
    my ($self, $savecache) = @_;

    if(!$self->{persistance}) {
        return;
    }

    # A missing mode means "no backup handling"; avoids undef warnings in the comparisons below
    $savecache //= 0;

    print "Saving persistance file\n";
    my $line = Dump($self->{clackscache});
    $line = encode_base64($line, '');
    my $timestampline = Dump($self->{clackscachetime});
    $timestampline = encode_base64($timestampline, '');
    my $accesstimeline = Dump($self->{clackscacheaccesstime});
    $accesstimeline = encode_base64($accesstimeline, '');

    my $persistancefname = $self->{config}->{persistancefile};
    my $tempfname = $persistancefname . '_';
    my $backfname = $persistancefname . '_bck';
    if($savecache == 1) {
        # Normal savecache operation only. Best effort: the copy result is deliberately not checked
        copy($persistancefname, $backfname);
    }

    my $ofh;
    if(!open($ofh, '>', $tempfname)) {
        # Without a freshly written temp file there is nothing valid to promote. Moving a
        # leftover temp file from an earlier run would clobber the good persistance file.
        print STDERR "Failed to open $tempfname for writing: $ERRNO\n";
        return;
    }

    my $printok = print {$ofh} $line, "\n", $timestampline, "\n", $accesstimeline, "\n", "ENDBYTES\n";
    my $printerror = "$ERRNO"; # remember the reason before close() can overwrite it
    close $ofh;
    if(!$printok) {
        print STDERR "Failed to write $tempfname: $printerror\n";
        unlink $tempfname; # don't leave a partial file behind
        return;
    }

    if(!move($tempfname, $persistancefname)) {
        print STDERR "Failed to move $tempfname to $persistancefname: $ERRNO\n";
        return;
    }

    if($savecache == 2) {
        # Need to make sure we have a valid backup file, since we had a general problem while loading
        copy($persistancefname, $backfname);
    }

    return;
}
1394              
# Follow a chain of scalar references down to the first value that is not
# itself a reference to a scalar or to another reference.
#
# Parameters:
#   $val - a plain value or a (possibly nested) SCALAR/REF reference.
#
# Returns the empty list/undef when called with an undefined value, otherwise
# the innermost value (which is undef if the chain ends in an undefined scalar).
sub deref {
    my ($self, $val) = @_;

    if(!defined($val)) {
        return;
    }

    my $kind = ref($val);
    while($kind eq 'SCALAR' || $kind eq 'REF') {
        $val = ${$val};

        # Chain ended in an undefined scalar, nothing left to follow
        if(!defined($val)) {
            last;
        }
        $kind = ref($val);
    }

    return $val;
}
1407              
# syswrite() wrapper that never lets an exception escape.
#
# Parameters:
#   $socket - handle to write to
#   $buffer - data to write
#
# Returns 0 without writing if the buffer is empty, -1 if the write threw an
# exception (the error is printed to STDERR), otherwise whatever syswrite()
# itself returned.
sub evalsyswrite {
    my ($self, $socket, $buffer) = @_;

    if(!length($buffer)) {
        return 0;
    }

    my ($completed, $count) = (0, 0);
    eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
        $count = syswrite($socket, $buffer);
        # Only reached when syswrite() did not die
        $completed = 1;
    };

    if(!$completed || $EVAL_ERROR) {
        print STDERR "Write error: $EVAL_ERROR\n";
        return -1;
    }

    return $count;
}
1426              
# Current time as seen by this server: the system clock plus the offset
# stored in $self->{timeoffset}.
sub getTime {
    my ($self) = @_;

    return time() + $self->{timeoffset};
}
1434              
# Read a whole file into a single scalar, byte for byte.
#
# Note: this is a plain function, not a method; the only argument is the
# file name. Croaks with the system error if the file cannot be opened.
sub slurpBinFile {
    my ($fname) = @_;

    open(my $fh, "<", $fname) or croak($ERRNO);

    # We need both binmode *and* a completely disabled input record separator
    # to work around the multiple idiosyncrasies of Perl on Windows
    binmode($fh);
    my $data = do {
        local $INPUT_RECORD_SEPARATOR = undef;
        <$fh>;
    };
    close($fh);

    return $data;
}
1449              
1450              
1451              
1452             1;
1453             __END__