File Coverage

blib/lib/Net/Clacks/Client.pm
Criterion Covered Total %
statement 56 767 7.3
branch 0 334 0.0
condition 0 114 0.0
subroutine 19 51 37.2
pod 30 30 100.0
total 105 1296 8.1


line stmt bran cond sub pod time code
1             package Net::Clacks::Client;
2             #---AUTOPRAGMASTART---
3 2     2   549 use v5.36;
  2         4  
4 2     2   13 use strict;
  2         4  
  2         36  
5 2     2   5 use diagnostics;
  2         4  
  2         11  
6 2     2   51 use mro 'c3';
  2         4  
  2         10  
7 2     2   44 use English qw(-no_match_vars);
  2         2  
  2         13  
8 2     2   644 use Carp qw[carp croak confess cluck longmess shortmess];
  2         3  
  2         139  
9             our $VERSION = 35;
10 2     2   8 use autodie qw( close );
  2         3  
  2         13  
11 2     2   599 use Array::Contains;
  2         3  
  2         80  
12 2     2   8 use utf8;
  2         3  
  2         11  
13 2     2   59 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  2         2  
  2         71  
14 2     2   8 use Data::Dumper;
  2         2  
  2         97  
15 2     2   13 use builtin qw[true false is_bool];
  2         4  
  2         53  
16 2     2   6 no warnings qw(experimental::builtin); ## no critic (TestingAndDebugging::ProhibitNoWarnings)
  2         3  
  2         87  
17             #---AUTOPRAGMAEND---
18              
19 2     2   580 use IO::Socket::IP;
  2         51269  
  2         16  
20             #use IO::Socket::UNIX;
21 2     2   1311 use Time::HiRes qw[sleep usleep time];
  2         4  
  2         20  
22 2     2   958 use Sys::Hostname;
  2         1596  
  2         144  
23 2     2   617 use IO::Select;
  2         2205  
  2         108  
24 2     2   1151 use IO::Socket::SSL;
  2         98233  
  2         19  
25 2     2   1391 use MIME::Base64;
  2         1097  
  2         16611  
26              
27 0     0 1   sub new($class, $server, $port, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
28 0           my $self = bless {}, $class;
29              
30 0 0 0       if(!defined($server) || !length($server)) {
31 0           croak("server not defined!");
32             }
33 0 0 0       if(!defined($port) || !length($port)) {
34 0           croak("port not defined!");
35             }
36 0 0 0       if(!defined($username) || !length($username)) {
37 0           croak("username not defined!");
38             }
39 0 0 0       if(!defined($password) || !length($password)) {
40 0           croak("password not defined!");
41             }
42 0 0 0       if(!defined($clientname) || !length($clientname)) {
43 0           croak("clientname not defined!");
44             }
45              
46 0           $self->{server} = $server;
47 0           $self->{port} = $port;
48              
49 0           $self->init($username, $password, $clientname, $iscaching);
50              
51 0           return $self;
52             }
53              
54 0     0 1   sub newSocket($class, $socketpath, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
55 0           my $self = bless {}, $class;
56              
57 0 0 0       if(!defined($socketpath) || !length($socketpath)) {
58 0           croak("socketpath not defined!");
59             }
60 0 0 0       if(!defined($username) || !length($username)) {
61 0           croak("username not defined!");
62             }
63 0 0 0       if(!defined($password) || !length($password)) {
64 0           croak("password not defined!");
65             }
66 0 0 0       if(!defined($clientname) || !length($clientname)) {
67 0           croak("clientname not defined!");
68             }
69              
70 0           my $udsloaded = 0;
71 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
72 0           require IO::Socket::UNIX;
73 0           $udsloaded = 1;
74             };
75 0 0         if(!$udsloaded) {
76 0           croak("Specified a unix domain socket, but i couldn't load IO::Socket::UNIX!");
77             }
78              
79 0           $self->{socketpath} = $socketpath;
80              
81 0           $self->init($username, $password, $clientname, $iscaching);
82              
83 0           return $self;
84             }
85              
86 0     0 1   sub init($self, $username, $password, $clientname, $iscaching) {
  0            
  0            
  0            
  0            
  0            
  0            
87 0 0 0       if(!defined($username) || $username eq '') {
88 0           croak("Username not defined!");
89             }
90 0 0 0       if(!defined($password) || $password eq '') {
91 0           croak("Password not defined!");
92             }
93              
94 0 0 0       if(!defined($clientname || $clientname eq '')) {
95 0           croak("Clientname not defined!");
96             }
97 0           $self->{clientname} = $clientname;
98              
99 0           $self->{authtoken} = encode_base64($username, '') . ':' . encode_base64($password, '');
100              
101 0 0         if(!defined($iscaching)) {
102 0           $iscaching = 0;
103             }
104 0           $self->{iscaching} = $iscaching;
105              
106 0 0         if($self->{iscaching}) {
107 0           $self->{cache} = {};
108             }
109              
110 0           $self->{needreconnect} = 1;
111 0           $self->{inlines} = [];
112 0           $self->{firstconnect} = 1;
113              
114 0           $self->{memcached_compatibility} = 0;
115              
116             $self->{remembrancenames} = [
117 0           'Ivy Bdubs',
118             'Terry Pratchett',
119             'Sven Guckes',
120             'Sheila', # faithful four-legged family member of @NightStorm_KPC
121             ];
122 0           $self->{remembranceinterval} = 3600; # One hour
123 0           $self->{nextremembrance} = time + $self->{remembranceinterval};
124              
125 0           $self->reconnect();
126              
127 0           return;
128             }
129              
130 0     0 1   sub reconnect($self) {
  0            
  0            
131             # Clean up old selector before deleting socket
132 0 0 0       if(defined($self->{selector}) && defined($self->{socket})) {
133 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
134 0           $self->{selector}->remove($self->{socket});
135             };
136             }
137 0 0         if(defined($self->{socket})) {
138 0           delete $self->{socket};
139             }
140 0           undef $self->{selector};
141              
142 0 0         if(!$self->{firstconnect}) {
143             # Not our first connection (=real reconnect).
144             # wait a short random time before reconnecting. In case all
145             # clients got disconnected, we want to avoid having all clients reconnect
146             # at the exact same time
147 0           my $waittime = rand(4000)/1000;
148 0           sleep($waittime);
149             }
150              
151 0           my $socket;
152 0 0 0       if(defined($self->{server}) && defined($self->{port})) {
    0          
153             $socket = IO::Socket::IP->new(
154             PeerHost => $self->{server},
155             PeerPort => $self->{port},
156 0 0         Type => SOCK_STREAM,
157             ) or croak("Failed to connect to Clacks TCP message service: $ERRNO");
158             } elsif(defined($self->{socketpath})) {
159             $socket = IO::Socket::UNIX->new(
160             Peer => $self->{socketpath},
161 0 0         Type => SOCK_STREAM,
162             ) or croak("Failed to connect to Clacks Unix Domain Socket message service: $ERRNO");
163             } else {
164 0           croak("Neither TCP nor Unix domain socket specified. Don't know where to connect to.");
165             }
166              
167             #binmode($socket, ':bytes');
168 0           $socket->blocking(0);
169              
170              
171 0 0         if(ref $socket ne 'IO::Socket::UNIX') {
172             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
173             # There is simply no point in running it over a local socket.
174 0 0         IO::Socket::SSL->start_SSL($socket,
175             SSL_verify_mode => SSL_VERIFY_NONE,
176             ) or croak("Can't use SSL: " . $SSL_ERROR);
177             }
178              
179 0           $self->{socket} = $socket;
180 0           $self->{selector} = IO::Select->new($self->{socket});
181 0           $self->{failcount} = 0;
182 0           $self->{lastping} = time;
183 0           $self->{inbuffer} = '';
184 0           $self->{incharbuffer} = [];
185 0           $self->{outbuffer} = '';
186 0           $self->{serverinfo} = 'UNKNOWN';
187 0           $self->{needreconnect} = 0;
188 0           $self->{firstline} = 1;
189 0           $self->{headertimeout} = time + 15;
190              
191             # Do *not* nuke "inlines" array, since it may hold "QUIT" messages that the client wants to handle, for example, to re-issue
192             # "LISTEN" commands.
193             # $self->{inlines} = ();
194              
195 0 0         if($self->{firstconnect}) {
196 0           $self->{firstconnect} = 0;
197             } else {
198 0           push @{$self->{inlines}}, "RECONNECTED";
  0            
199             }
200              
201             # Startup "handshake". As everything else, this is asyncronous, both server and
202             # client send their respective version strings and then wait to recieve their counterparts
203             # Also, this part is REQUIRED, just to make sure we actually speek to CLACKS protocol
204 0           $self->{outbuffer} .= 'CLACKS ' . $self->{clientname} . "\r\n";
205 0           $self->{outbuffer} .= 'OVERHEAD A ' . $self->{authtoken} . "\r\n";
206 0           $self->doNetwork();
207              
208 0           return;
209             }
210              
211 0     0 1   sub activate_memcached_compat($self) {
  0            
  0            
212 0           $self->{memcached_compatibility} = 1;
213 0           return;
214             }
215              
216 0     0 1   sub getRawSocket($self) {
  0            
  0            
217 0 0         if($self->{needreconnect}) {
218 0           $self->reconnect();
219             }
220              
221 0           return $self->{socket};
222             }
223              
224 0     0 1   sub doNetwork($self, $readtimeout = 0) {
  0            
  0            
  0            
225 0 0         if(!defined($readtimeout)) {
226             # Don't wait
227 0           $readtimeout = 0;
228             }
229              
230             # Negative read timeout means "send only"
231              
232 0 0         if($self->{needreconnect}) {
233 0           $self->reconnect();
234             }
235              
236 0 0 0       if($self->{nextremembrance} && time > $self->{nextremembrance}) {
237             # A person is not dead while their name is still spoken.
238 0           $self->{nextremembrance} = time + $self->{remembranceinterval} + int(rand($self->{remembranceinterval} / 10));
239 0           my $neverforget = $self->{remembrancenames}->[rand @{$self->{remembrancenames}}];
  0            
240 0           $self->{outbuffer} .= 'OVERHEAD GNU ' . $neverforget . "\r\n";
241             }
242              
243             # doNetwork interleaves handling incoming and outgoing traffic.
244             # This is only relevant on slow links.
245             #
246             # It returns even if the outgoing or incoming buffers are not empty
247             # (meaning that partially buffered data can exists). This way we use the
248             # available bandwidth without blocking unduly the application (we assume it's a realtime
249             # application with multiple things going on at the same time)-
250             #
251             # The downside of this is that doNetwork() needs to be called on a regular basis and sending
252             # and recieving might be delayed until the next cycle. This delay can be minimized by simply
253             # not transfering huge values over clacks, but instead using it the way it was intended to be used:
254             # Small variables can be SET directly by clacks, huge datasets should be stored in the
255             # database and the recievers only NOTIFY'd that a change has taken place.
256             #
257             # The big exception here is the here is the ClacksCache part of the story. These functions
258             # call doNetwork() in a loop until the outbuffer is empty. And depending on requirement, they
259             # KEEP on calling doNetwork() until the answer is recieved. This makes ClacksCache functions
260             # syncronous and causes some delay in the calling function. But doing these asyncronous will
261             # cause more headaches, maybe even leading up to insanity. Believe me, i tried, but those pink
262             # elephants stomping about my rubber-padded room are such a distraction...
263              
264 0           my $workCount = 0;
265              
266 0 0         if(length($self->{outbuffer})) {
267 0           my $brokenpipe = 0;
268 0           my $writeok = 0;
269 0           my $written;
270 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
271 0     0     local $SIG{PIPE} = sub { $brokenpipe = 1; };
  0            
272 0           $written = syswrite($self->{socket}, $self->{outbuffer});
273 0           $writeok = 1;
274             };
275              
276 0 0 0       if($brokenpipe || !$writeok) {
277 0           $self->{needreconnect} = 1;
278 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
279 0           return;
280             }
281              
282 0 0 0       if(defined($written) && $written) {
283 0           $workCount += $written;
284 0 0         if(length($self->{outbuffer}) == $written) {
285 0           $self->{outbuffer} = '';
286             } else {
287 0           $self->{outbuffer} = substr($self->{outbuffer}, $written);
288             }
289             }
290             }
291              
292 0 0         if($readtimeout < 0) {
293 0           return 1;
294             }
295            
296              
297             {
298 0           my @temp;
  0            
299 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
300 0           @temp = $self->{selector}->can_read($readtimeout);
301             };
302 0 0         if(scalar @temp == 0) {
303             # Timeout
304 0           return $workCount;
305             }
306             }
307              
308 0           my $totalread = 0;
309 0           while(1) {
310 0           my $buf;
311 0           my $readok = 0;
312 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
313 0           sysread($self->{socket}, $buf, 10_000); # Read in at most 10kB at once
314 0           $readok = 1;
315             };
316 0 0         if(!$readok) {
317 0           $self->{needreconnect} = 1;
318 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
319 0           return;
320             }
321 0 0 0       if(defined($buf) && length($buf)) {
322 0           $totalread += length($buf);
323             #print STDERR "+ $buf\n--\n";
324 0           push @{$self->{incharbuffer}}, split//, $buf;
  0            
325 0           next;
326             }
327 0           last;
328             }
329            
330             # Check if we actually got data after checking with can_read() first
331 0 0         if($totalread) {
332 0           $self->{failcount} = 0;
333             } else {
334             # This should normally not happen, but thanks to SSL, it sometimes does
335             # We ignore single instances of those but disconnect if many happen in a row
336 0           $self->{failcount}++;
337 0           sleep(0.05);
338            
339 0 0         if($self->{failcount} > 5) {
340 0           $self->{needreconnect} = 1;
341 0           return;
342             }
343             }
344 0           while(@{$self->{incharbuffer}}) {
  0            
345 0           my $char = shift @{$self->{incharbuffer}};
  0            
346 0           $workCount++;
347 0 0         if($char eq "\r") {
    0          
348 0           next;
349             } elsif($char eq "\n") {
350 0 0         if($self->{inbuffer} eq 'NOP') { # Just drop "No OPerations" packets, only used by server to
351             # verify that the connection is still active
352             #$self->{firstline}
353 0           $self->{inbuffer} = '';
354 0           next;
355             }
356              
357 0 0         if($self->{firstline}) {
358 0 0         if($self->{inbuffer} !~ /^CLACKS\ /) {
359             # Whoops, not a clacks server or something gone wrong with the protocol
360 0           $self->{needreconnect} = 1;
361 0           return 0;
362             } else {
363 0           $self->{firstline} = 0;
364             }
365             }
366              
367 0           push @{$self->{inlines}}, $self->{inbuffer};
  0            
368 0           $self->{inbuffer} = '';
369             } else {
370 0           $self->{inbuffer} .= $char;
371             }
372             }
373              
374 0 0 0       if($self->{firstline} && $self->{headertimeout} < time) {
375 0           $self->{needreconnect} = 1;
376 0           return 0;
377             }
378              
379 0           return $workCount;
380             }
381              
382             my %overheadflags = (
383             A => "auth_token", # Authentication token
384             O => "auth_ok", # Authentication OK
385             F => "auth_failed", # Authentication FAILED
386              
387             E => 'error_message', # Server to client error message
388              
389             C => "close_all_connections",
390             D => "discard_message",
391             G => "forward_message",
392             I => "set_interclacks_mode", # value: true/false, disables 'G' and 'U'
393             M => "informal_message", # informal message
394             N => "no_logging",
395             S => "shutdown_service", # value: positive number (number in seconds before shutdown). If interclacks clients are present, should be high
396             # enough to flush all buffers to them
397             U => "return_to_sender",
398             Z => "no_flags", # Only sent when no other flags are set
399             );
400              
401 0     0 1   sub getNext($self) {
  0            
  0            
402             # Recieve next incoming message (if any)
403              
404             restartgetnext:
405 0           my $line = shift @{$self->{inlines}};
  0            
406              
407 0 0         if(!defined($line)) {
408 0           return;
409             }
410              
411 0           my %data;
412             #print STDERR "> $line\n";
413 0 0         if($line =~ /^NOTIFY\ (.+)/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
414 0           %data = (
415             type => 'notify',
416             name => $1,
417             );
418             } elsif($line =~ /^SET\ (.+?)\=(.*)/) {
419 0           %data = (
420             type => 'set',
421             name => $1,
422             data => $2,
423             );
424             } elsif($line =~ /^CLACKS\ (.+)/) {
425 0           %data = (
426             type => 'serverinfo',
427             data => $1,
428             );
429             } elsif($line =~ /^DEBUG\ (.+?)\=(.*)/) {
430 0           %data = (
431             type => 'debug',
432             host => $1,
433             command => $2,
434             );
435             } elsif($line =~ /^QUIT/) {
436 0           %data = (
437             type => 'disconnect',
438             data => 'quit',
439             );
440 0           $self->{needreconnect} = 1;
441             } elsif($line =~ /^TIMEOUT/) {
442 0           %data = (
443             type => 'disconnect',
444             data => 'timeout',
445             );
446 0           $self->{needreconnect} = 1;
447             } elsif($line =~ /^RECONNECTED/) {
448 0           %data = (
449             type => 'reconnected',
450             data => 'send your LISTEN requests again',
451             );
452             } elsif($line =~ /^OVERHEAD\ (.+?)\ (.+)/) {
453             # Minimal handling of OVERHEAD flags
454 0           my ($flags, $value) = ($1, $2);
455 0           my @flagparts = split//, $flags;
456 0           my %parsedflags;
457 0           foreach my $key (sort keys %overheadflags) {
458 0 0         if(contains($key, \@flagparts)) {
459 0           $parsedflags{$overheadflags{$key}} = 1;
460             } else {
461 0           $parsedflags{$overheadflags{$key}} = 0;
462             }
463             }
464              
465 0 0         if($parsedflags{auth_ok}) {
    0          
    0          
    0          
466             #print STDERR "Clacks AUTH OK\n";
467 0           goto restartgetnext; # try the next message
468             } elsif($parsedflags{error_message}) {
469 0           %data = (
470             type => 'error_message',
471             data => $value,
472             );
473             } elsif($parsedflags{auth_failed}) {
474 0           croak("Clacks Authentication failed!");
475             } elsif($parsedflags{informal_message}) {
476 0 0         if($parsedflags{forward_message}) {
477 0           %data = (
478             type => 'informal',
479             data => $value,
480             );
481             }
482 0 0         if($parsedflags{return_to_sender}) {
483 0           my $uturn = 'OVERHEAD M';
484 0 0         if($parsedflags{no_logging}) {
485 0           $uturn .= 'N';
486             }
487 0           $uturn .= ' ' . $value;
488 0           $self->{outbuffer} .= $uturn;
489             }
490             }
491              
492             } else {
493             # UNKNOWN, ignore
494 0           goto restartgetnext; # try the next message
495             }
496              
497 0 0         if(!defined($data{type})) {
498 0           return;
499             }
500              
501 0           $data{rawline} = $line;
502              
503 0           return \%data;
504             }
505              
506              
507 0     0 1   sub ping($self) {
  0            
  0            
508 0 0         if($self->{lastping} < (time - 120)) {
509             # Only send a ping every 120 seconds or less
510 0           $self->{outbuffer} .= "PING\r\n";
511 0           $self->{lastping} = time;
512             }
513              
514 0           return;
515             }
516              
517 0     0 1   sub disablePing($self) {
  0            
  0            
518 0           $self->{outbuffer} .= "NOPING\r\n";
519              
520 0           return;
521             }
522              
523              
524 0     0 1   sub notify($self, $varname) {
  0            
  0            
  0            
525 0 0 0       if(!defined($varname) || !length($varname)) {
526 0           carp("varname not defined!");
527 0           return;
528             }
529              
530 0 0         if($self->{needreconnect}) {
531 0           $self->reconnect;
532             }
533              
534 0           $self->{outbuffer} .= "NOTIFY $varname\r\n";
535              
536 0 0         if($self->{memcached_compatibility}) {
537 0           while(1) {
538 0           $self->doNetwork();
539 0 0         if($self->{needreconnect}) {
540             # Nothing we can do, really...
541 0           return;
542             }
543 0 0         last if(!length($self->{outbuffer}));
544 0           usleep(1000);
545             }
546 0           $self->autohandle_messages();
547             }
548              
549 0           return;
550             }
551              
552 0     0 1   sub set($self, $varname, $value, $forcesend = 0) { ## no critic (NamingConventions::ProhibitAmbiguousNames)
  0            
  0            
  0            
  0            
  0            
553 0 0 0       if(!defined($varname) || !length($varname)) {
554 0           carp("varname not defined!");
555 0           return;
556             }
557 0 0         if(!defined($value)) {
558 0           carp("value not defined!");
559 0           return;
560             }
561              
562 0 0         if(!defined($forcesend)) {
563 0           $forcesend = 0;
564             }
565              
566 0 0         if($self->{needreconnect}) {
567 0           $self->reconnect;
568             }
569              
570             # Handle caching to lower output volumne
571 0 0 0       if($self->{iscaching} && !$forcesend && defined($self->{cache}->{$varname}) && $self->{cache}->{$varname} eq $value) {
      0        
      0        
572             # Already the same value send
573 0           return;
574             }
575              
576 0 0         if($self->{iscaching}) {
577 0           $self->{cache}->{$varname} = $value;
578             }
579              
580 0           $self->{outbuffer} .= "SET $varname=$value\r\n";
581              
582 0 0         if($self->{memcached_compatibility}) {
583 0           while(1) {
584 0           $self->doNetwork();
585 0 0         if($self->{needreconnect}) {
586             # Nothing we can do, really...
587 0           return;
588             }
589 0 0         last if(!length($self->{outbuffer}));
590 0           usleep(1000);
591             }
592              
593 0           $self->autohandle_messages();
594             }
595              
596 0           return;
597             }
598              
599 0     0 1   sub listen($self, $varname) { ## no critic (Subroutines::ProhibitBuiltinHomonyms)
  0            
  0            
  0            
600 0 0 0       if(!defined($varname) || !length($varname)) {
601 0           carp("varname not defined!");
602 0           return;
603             }
604              
605 0 0         if($self->{needreconnect}) {
606 0           $self->reconnect;
607             }
608              
609 0           $self->{outbuffer} .= "LISTEN $varname\r\n";
610              
611 0           return;
612             }
613              
614 0     0 1   sub unlisten($self, $varname) {
  0            
  0            
  0            
615 0 0 0       if(!defined($varname) || !length($varname)) {
616 0           carp("varname not defined!");
617 0           return;
618             }
619              
620 0 0         if($self->{needreconnect}) {
621 0           $self->reconnect;
622             }
623              
624 0           $self->{outbuffer} .= "UNLISTEN $varname\r\n";
625              
626 0           return;
627             }
628              
629 0     0 1   sub setMonitormode($self, $active) {
  0            
  0            
  0            
630 0 0         if($self->{needreconnect}) {
631 0           $self->reconnect;
632             }
633              
634 0 0 0       if(!defined($active) || !$active) {
635 0           $self->{outbuffer} .= "UNMONITOR\r\n";
636             } else {
637 0           $self->{outbuffer} .= "MONITOR\r\n";
638             }
639              
640 0           return;
641             }
642              
643 0     0 1   sub getServerinfo($self) {
  0            
  0            
644 0           return $self->{serverinfo};
645             }
646              
647             # ---------------- ClackCache handling --------------------
648             # ClacksCache handling always implies doNetwork()
649             # Also, we do NOT use the caching system used for SET
650 0     0 1   sub store($self, $varname, $value) {
  0            
  0            
  0            
  0            
651 0 0 0       if(!defined($varname) || !length($varname)) {
652 0           carp("varname not defined!");
653 0           return;
654             }
655 0 0         if(!defined($value)) {
656 0           carp("value not defined!");
657 0           return;
658             }
659              
660 0 0         if($self->{needreconnect}) {
661 0           $self->reconnect;
662             }
663              
664 0           $self->{outbuffer} .= "STORE $varname=$value\r\n";
665 0           while(1) {
666 0           $self->doNetwork();
667 0 0         if($self->{needreconnect}) {
668             # Nothing we can do, really...
669 0           return;
670             }
671 0 0         last if(!length($self->{outbuffer}));
672 0           usleep(1000);
673             }
674              
675 0 0         if($self->{memcached_compatibility}) {
676 0           $self->autohandle_messages();
677             }
678              
679 0           return;
680             }
681              
682 0     0 1   sub retrieve($self, $varname) {
  0            
  0            
  0            
683 0 0 0       if(!defined($varname) || !length($varname)) {
684 0           carp("varname not defined!");
685 0           return;
686             }
687              
688 0           my $value;
689              
690 0 0         if($self->{needreconnect}) {
691 0           $self->reconnect;
692             }
693              
694 0           $self->{outbuffer} .= "RETRIEVE $varname\r\n";
695              
696             # Make sure we send everything
697 0           while(1) {
698 0           $self->doNetwork();
699 0 0         if($self->{needreconnect}) {
700             # Nothing we can do, really...
701 0           return;
702             }
703 0 0         last if(!length($self->{outbuffer}));
704             }
705              
706             # Now, wait for the answer
707 0           my $answerline;
708 0           while(1) {
709 0           $self->doNetwork(0.5);
710 0 0         if($self->{needreconnect}) {
711             # Nothing we can do, really...
712 0           return;
713             }
714 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
715 0 0 0       if($self->{inlines}->[$i] =~ /^RETRIEVED\ $varname/ || $self->{inlines}->[$i] =~ /^NOTRETRIEVED\ $varname/) {
716             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
717 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
718 0           last;
719             }
720             }
721 0 0         last if(defined($answerline));
722             }
723              
724 0 0         if($answerline =~ /^RETRIEVED\ (.+?)\=(.*)/) {
725 0           my ($key, $val) = ($1, $2);
726 0 0         if($key ne $varname) {
727             #print STDERR "Retrieved clacks key $key does not match requested varname $varname!\n";
728 0           return;
729             }
730 0           return $val;
731             }
732              
733             # No matching key
734 0           return;
735             }
736              
737 0     0 1   sub remove($self, $varname) {
  0            
  0            
  0            
738 0 0 0       if(!defined($varname) || !length($varname)) {
739 0           carp("varname not defined!");
740 0           return;
741             }
742              
743 0 0         if($self->{needreconnect}) {
744 0           $self->reconnect;
745             }
746              
747 0           $self->{outbuffer} .= "REMOVE $varname\r\n";
748 0           while(1) {
749 0           $self->doNetwork();
750 0 0         if($self->{needreconnect}) {
751             # Nothing we can do, really...
752 0           return;
753             }
754 0 0         last if(!length($self->{outbuffer}));
755 0           usleep(1000);
756             }
757              
758 0 0         if($self->{memcached_compatibility}) {
759 0           $self->autohandle_messages();
760             }
761              
762 0           return;
763             }
764              
765 0     0 1   sub increment($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
766 0 0 0       if(!defined($varname) || !length($varname)) {
767 0           carp("varname not defined!");
768 0           return;
769             }
770              
771 0 0         if($self->{needreconnect}) {
772 0           $self->reconnect;
773             }
774              
775 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
776 0           $self->{outbuffer} .= "INCREMENT $varname\r\n";
777             } else {
778 0           $stepsize = 0 + $stepsize;
779 0           $self->{outbuffer} .= "INCREMENT $varname=$stepsize\r\n";
780             }
781              
782 0           while(1) {
783 0           $self->doNetwork();
784 0 0         if($self->{needreconnect}) {
785             # Nothing we can do, really...
786 0           return;
787             }
788 0 0         last if(!length($self->{outbuffer}));
789 0           usleep(1000);
790             }
791              
792 0 0         if($self->{memcached_compatibility}) {
793 0           $self->autohandle_messages();
794             }
795              
796 0           return;
797             }
798              
799 0     0 1   sub decrement($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
800 0 0 0       if(!defined($varname) || !length($varname)) {
801 0           carp("varname not defined!");
802 0           return;
803             }
804              
805 0 0         if($self->{needreconnect}) {
806 0           $self->reconnect;
807             }
808              
809 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
810 0           $self->{outbuffer} .= "DECREMENT $varname\r\n";
811             } else {
812 0           $stepsize = 0 + $stepsize;
813 0           $self->{outbuffer} .= "DECREMENT $varname=$stepsize\r\n";
814             }
815 0           while(1) {
816 0           $self->doNetwork();
817 0 0         if($self->{needreconnect}) {
818             # Nothing we can do, really...
819 0           return;
820             }
821 0 0         last if(!length($self->{outbuffer}));
822 0           usleep(1000);
823             }
824              
825 0 0         if($self->{memcached_compatibility}) {
826 0           $self->autohandle_messages();
827             }
828              
829 0           return;
830             }
831              
832 0     0 1   sub clearcache($self) {
  0            
  0            
833 0 0         if($self->{needreconnect}) {
834 0           $self->reconnect;
835             }
836              
837 0           $self->{outbuffer} .= "CLEARCACHE\r\n";
838 0           while(1) {
839 0           $self->doNetwork();
840 0 0         if($self->{needreconnect}) {
841             # Nothing we can do, really...
842 0           return;
843             }
844 0 0         last if(!length($self->{outbuffer}));
845 0           usleep(1000);
846             }
847              
848 0 0         if($self->{memcached_compatibility}) {
849 0           $self->autohandle_messages();
850             }
851              
852 0           return;
853             }
854              
855 0     0 1   sub keylist($self) {
  0            
  0            
856 0 0         if($self->{needreconnect}) {
857 0           $self->reconnect;
858             }
859              
860 0           my $value;
861              
862 0           $self->{outbuffer} .= "KEYLIST\r\n";
863              
864             # Make sure we send everything
865 0           while(1) {
866 0           $self->doNetwork();
867 0 0         if($self->{needreconnect}) {
868             # Nothing we can do, really...
869 0           return;
870             }
871 0 0         last if(!length($self->{outbuffer}));
872 0           usleep(1000);
873             }
874              
875             # Now, wait for the answer
876 0           my $liststartfound = 0;
877 0           my $listendfound = 0;
878 0           while(1) {
879 0           $self->doNetwork(0.5);
880 0 0         if($self->{needreconnect}) {
881             # Nothing we can do, really...
882 0           return;
883             }
884 0           $liststartfound = 0;
885 0           $listendfound = 0;
886 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
887 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTSTART/) {
888 0           $liststartfound = 1;
889 0           next;
890             }
891 0 0         next unless($liststartfound);
892 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTEND/) {
893 0           $listendfound = 1;
894 0           last;
895             }
896             }
897 0 0         last if($listendfound);
898             }
899              
900             # Now, grab the keys from inlines buffer
901 0           my @keys;
902 0           my $idx = 0;
903 0           my $listfound = 0;
904 0           while($idx < scalar @{$self->{inlines}}) {
  0            
905 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTSTART/) {
906             # Just remove this line
907 0           splice @{$self->{inlines}}, $idx, 1;
  0            
908 0           $listfound = 1;
909 0           next;
910             }
911              
912 0 0         if(!$listfound) {
913 0           $idx++;
914 0           next;
915             }
916              
917 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTEND/) {
918             # End of list
919 0           last;
920             }
921              
922 0 0         if($self->{inlines}->[$idx] =~ /^KEY\ (.+)/) {
923 0           push @keys, $1;
924             # Don't increment $idx, but the rest of the array one element down
925 0           splice @{$self->{inlines}}, $idx, 1;
  0            
926             } else {
927 0           $idx++;
928             }
929             }
930              
931 0 0         if($self->{memcached_compatibility}) {
932 0           $self->autohandle_messages();
933             }
934              
935 0           return @keys;
936             }
937              
938 0     0 1   sub clientlist($self) {
  0            
  0            
939 0 0         if($self->{needreconnect}) {
940 0           $self->reconnect;
941             }
942              
943 0           my $value;
944              
945 0           $self->{outbuffer} .= "CLIENTLIST\r\n";
946              
947             # Make sure we send everything
948 0           while(1) {
949 0           $self->doNetwork();
950 0 0         if($self->{needreconnect}) {
951             # Nothing we can do, really...
952 0           return;
953             }
954 0 0         last if(!length($self->{outbuffer}));
955 0           usleep(1000);
956             }
957              
958             # Now, wait for the answer
959 0           my $liststartfound = 0;
960 0           my $listendfound = 0;
961 0           while(1) {
962 0           $self->doNetwork(0.5);
963 0 0         if($self->{needreconnect}) {
964             # Nothing we can do, really...
965 0           return;
966             }
967 0           $liststartfound = 0;
968 0           $listendfound = 0;
969 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
970 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTSTART/) {
971 0           $liststartfound = 1;
972 0           next;
973             }
974 0 0         next unless($liststartfound);
975 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTEND/) {
976 0           $listendfound = 1;
977 0           last;
978             }
979             }
980 0 0         last if($listendfound);
981             }
982              
983             # Now, grab the keys from inlines buffer
984 0           my @keys;
985 0           my $idx = 0;
986 0           my $listfound = 0;
987 0           while($idx < scalar @{$self->{inlines}}) {
  0            
988 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTSTART/) {
989             # Just remove this line
990 0           splice @{$self->{inlines}}, $idx, 1;
  0            
991 0           $listfound = 1;
992 0           next;
993             }
994              
995 0 0         if(!$listfound) {
996 0           $idx++;
997 0           next;
998             }
999              
1000 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTEND/) {
1001             # End of list
1002 0           last;
1003             }
1004              
1005 0 0         if($self->{inlines}->[$idx] =~ /^CLIENT\ (.+)/) {
1006 0           my @parts = split/\;/, $1;
1007 0           my %data;
1008 0           foreach my $part (@parts) {
1009 0           my ($datakey, $datavalue) = split/\=/, $part, 2;
1010             #$datakey = lc $datakey;
1011 0           $data{lc $datakey} = $datavalue;
1012             }
1013 0           push @keys, \%data;
1014             # Don't increment $idx, but the rest of the array one element down
1015 0           splice @{$self->{inlines}}, $idx, 1;
  0            
1016             } else {
1017 0           $idx++;
1018             }
1019             }
1020              
1021 0 0         if($self->{memcached_compatibility}) {
1022 0           $self->autohandle_messages();
1023             }
1024              
1025 0           return @keys;
1026             }
1027              
1028 0     0 1   sub flush($self, $flushid = '') {
  0            
  0            
  0            
1029 0 0 0       if(!defined($flushid) || $flushid eq '') {
1030 0           $flushid = 'AUTO' . int(rand(1_000_000)) . int(rand(1_000_000));
1031             }
1032              
1033 0 0         if($self->{needreconnect}) {
1034 0           $self->reconnect;
1035             }
1036              
1037 0           $self->{outbuffer} .= "FLUSH $flushid\r\n";
1038              
1039             # Make sure we send everything
1040 0           while(1) {
1041 0           $self->doNetwork();
1042 0 0         if($self->{needreconnect}) {
1043             # Nothing we can do, really...
1044 0           return;
1045             }
1046 0 0         last if(!length($self->{outbuffer}));
1047             }
1048              
1049             # Now, wait for the answer
1050 0           my $answerline;
1051 0           while(1) {
1052 0           $self->doNetwork(0.5);
1053 0 0         if($self->{needreconnect}) {
1054             # Nothing we can do, really...
1055 0           return;
1056             }
1057 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
1058 0 0         if($self->{inlines}->[$i] =~ /^FLUSHED\ $flushid/) {
1059             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
1060 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
1061 0           last;
1062             }
1063             }
1064 0 0         last if(defined($answerline));
1065             }
1066              
1067 0           return;
1068             }
1069              
1070              
1071 0     0 1   sub autohandle_messages($self) {
  0            
  0            
1072 0           $self->doNetwork();
1073              
1074 0           while((my $line = $self->getNext())) {
1075 0 0         if($line->{type} eq 'disconnect') {
1076 0           $self->{needreconnect} = 1;
1077             }
1078             }
1079              
1080 0           return;
1081             }
1082              
1083             # ---------------- ClackCache handling --------------------
1084              
1085 0     0 1   sub sendRawCommand($self, $command) {
  0            
  0            
  0            
1086 0 0 0       if(!defined($command) || !length($command)) {
1087 0           carp("command not defined!");
1088 0           return;
1089             }
1090              
1091 0 0         if($self->{needreconnect}) {
1092 0           $self->reconnect;
1093             }
1094              
1095 0           $self->{outbuffer} .= $command . "\r\n";
1096              
1097 0           return;
1098             }
1099              
1100             # setAndStore combines the SET and STORE command into the SETANDSTORE server command. This is mostly done
1101             # for optimizing interclacks connections
1102             # Other clients will only get a SET notification, but the server also runs a STORE operation
1103 0     0 1   sub setAndStore($self, $varname, $value, $forcesend = 0) {
  0            
  0            
  0            
  0            
  0            
1104 0 0 0       if(!defined($varname) || !length($varname)) {
1105 0           carp("varname not defined!");
1106 0           return;
1107             }
1108              
1109 0 0         if(!defined($value)) {
1110 0           carp("value not defined!");
1111 0           return;
1112             }
1113              
1114 0 0         if(!defined($forcesend)) {
1115 0           $forcesend = 0;
1116             }
1117              
1118 0           $self->{outbuffer} .= "SETANDSTORE $varname=$value\r\n";
1119              
1120 0 0         if($self->{memcached_compatibility}) {
1121 0           while(1) {
1122 0           $self->doNetwork();
1123 0 0         if($self->{needreconnect}) {
1124             # Nothing we can do, really...
1125 0           return;
1126             }
1127 0 0         last if(!length($self->{outbuffer}));
1128 0           usleep(1000);
1129             }
1130              
1131 0           $self->autohandle_messages();
1132             }
1133              
1134 0           return;
1135             }
1136              
1137 0     0 1   sub fastdisconnect($self) {
  0            
  0            
1138 0 0         if(defined($self->{socket})) {
1139 0           delete $self->{socket};
1140             }
1141 0           $self->{needreconnect} = 1;
1142 0           return;
1143             }
1144              
1145 0     0 1   sub disconnect($self) {
  0            
  0            
1146 0 0         if($self->{needreconnect}) {
1147             # We are not connected, just do nothing
1148 0           return;
1149             }
1150              
1151 0           $self->flush();
1152 0           $self->{outbuffer} .= "QUIT\r\n";
1153 0           my $endtime = time + 0.5; # Wait a maximum of half a second to send
1154 0           while(1) {
1155 0 0         last if(time > $endtime);
1156 0           my $xstart = time;
1157 0           $self->doNetwork(-1);
1158 0           my $xend = time;
1159 0           my $timetaken = $xend - $xstart;
1160 0 0         if($timetaken > 1) {
1161             #print STDERR "\n §§§§§§§§§§§§§§§§§§§§§§§§§§§§§ doNetwork() took ", $timetaken, " seconds\n";
1162             }
1163 0 0         last if(!length($self->{outbuffer}));
1164 0           sleep(0.02);
1165             }
1166 0           sleep(0.2); # Wait for the OS to flush the socket
1167              
1168 0           delete $self->{socket};
1169 0           $self->{needreconnect} = 1;
1170              
1171 0           return;
1172             }
1173              
1174 0     0     sub DESTROY($self) {
  0            
  0            
1175             # Try to disconnect cleanly, but socket might already be DESTROYed, so catch any errors
1176 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
1177 0           $self->disconnect();
1178             };
1179              
1180 0           return;
1181             }
1182              
1183             1;
1184             __END__