File Coverage

blib/lib/Alvis/Saa.pm
Criterion Covered Total %
statement 26 350 7.4
branch 1 110 0.9
condition 1 30 3.3
subroutine 7 22 31.8
pod 10 16 62.5
total 45 528 8.5


line stmt bran cond sub pod time code
1             package Alvis::Saa;
2              
3             $Alvis::Saa::VERSION = '0.2';
4              
5 1     1   24317 use strict;
  1         2  
  1         37  
6              
7 1     1   567 use Alvis::Tana;
  1         3  
  1         26  
8              
9             # use Data::Dumper;
10 1     1   3225 use Sys::Hostname;
  1         1480  
  1         58  
11 1     1   1062 use IO::Socket;
  1         30381  
  1         6  
12 1     1   2444 use IO::Select;
  1         1805  
  1         65  
13 1     1   7 use Fcntl;
  1         2  
  1         4024  
14              
15             my $LOCALADDR_PREFIX = "/var/tmp/searchrpc_localsoc_";
16             my $debug = 0;
17              
18             ######################################################################
19             #
20             # Public methods
21             #
22             ###################################################################
23              
24             sub new
25             {
26 1     1 1 14 my ($this) = @_;
27 1   33     11 my $class = ref($this) || $this;
28              
29 1         7 my $my_addr = gethostbyname(hostname());
30 1 50       880 if(!defined($my_addr))
31             {
32 0         0 return undef;
33             }
34              
35             $this = {
36 1         16 'servs' => {},
37             'serv_sel' => IO::Select->new(),
38             'conns' => {},
39             'conn_sel' => IO::Select->new(),
40             'ip_clis' => {},
41             'my_addr' => $my_addr,
42             'err' => '',
43             'queue' => [],
44             };
45              
46 1         30 bless $this, $class;
47              
48 1         27 $SIG{'PIPE'} = 'IGNORE';
49              
50 1         4 return $this;
51             }
52              
53             sub err
54             {
55 0     0 1   my $this = shift;
56              
57 0           return $this->{'err'};
58             }
59              
60             # 'auto_arb' => bool # Autoread arb messages?
61             # 'callback' => [func, params]
62             sub listen
63             {
64 0     0 1   my $this = shift;
65 0           my $port = shift;
66              
67 0           my %par = @_;
68              
69 0 0         if(exists($this->{'servs'}->{$port}))
70             {
71 0           $this->{'err'} = "Already listening";
72 0           return 0;
73             }
74            
75 0           my $serv =
76             {
77             'port' => $port,
78             'auto_arb' => 0,
79             };
80              
81 0 0         if(exists($par{'callback'}))
82             {
83 0           $serv->{'callback'} = $par{'callback'};
84             }
85              
86 0 0         if(exists($par{'auto_arb'}))
87             {
88 0           $serv->{'auto_arb'} = $par{'auto_arb'};
89             }
90              
91 0           my $inet_sock = IO::Socket::INET->new(LocalPort => $port,
92             Type => SOCK_STREAM,
93             Reuse => 1,
94             Listen => 10);
95 0 0         if(!defined($inet_sock))
96             {
97 0           $this->{'err'} = "$@";
98 0           return 0;
99             }
100              
101             # print STDERR "Soketti on $LOCALADDR_PREFIX$port\n";
102 0           unlink "$LOCALADDR_PREFIX$port";
103 0           my $unix_sock = IO::Socket::UNIX->new(Local => "$LOCALADDR_PREFIX$port",
104             Type => SOCK_STREAM,
105             Listen => 10);
106 0 0         if(!defined($unix_sock))
107             {
108 0           $this->{'err'} = "$@";
109 0           close($inet_sock);
110 0           return 0;
111             }
112              
113 0           binmode($inet_sock, ":raw");
114 0           binmode($unix_sock, ":raw");
115              
116 0           $serv->{'inet_sock'} = $inet_sock;
117 0           $serv->{'unix_sock'} = $unix_sock;
118              
119 0           $this->{'servs'}->{$port} = $serv;
120 0           $this->{'serv_sel'}->add($inet_sock);
121 0           $this->{'serv_sel'}->add($unix_sock);
122              
123 0           return 1;
124             }
125              
126             sub connected
127             {
128 0     0 1   my $this = shift;
129 0           my $host = shift;
130 0           my $port = shift;
131              
132 0           return(exists($this->{'conns'}->{"${host}_$port"}));
133             }
134              
135             sub disconnect_all
136             {
137 0     0 1   my $this = shift;
138              
139 0           foreach (keys(%{$this->{'conns'}}))
  0            
140             {
141 0           my $conn = $this->{'conns'}->{$_}->{'conn'};
142 0           $this->{'conn_sel'}->remove($conn);
143 0           delete($this->{'conns'}->{"$_"});
144              
145 0           shutdown($conn, 2);
146 0           close($conn);
147             }
148              
149 0           return 1;
150             }
151              
152             sub disconnect
153             {
154 0     0 1   my $this = shift;
155 0           my $host = shift;
156 0           my $port = shift;
157              
158 0 0         if(!exists($this->{'conns'}->{"${host}_$port"}))
159             {
160 0           $this->{'err'} = "Not connected";
161 0           return 0;
162             }
163              
164 0           my $conn = $this->{'conns'}->{"${host}_$port"}->{'conn'};
165 0           $this->{'conn_sel'}->remove($conn);
166 0           delete($this->{'conns'}->{"${host}_$port"});
167              
168 0           shutdown($conn, 2);
169 0           close($conn);
170              
171 0           return 1;
172             }
173              
174              
175             sub unlisten
176             {
177 0     0 1   my $this = shift;
178 0           my $port = shift;
179              
180 0 0         if(!exists($this->{'servs'}->{$port}))
181             {
182 0           $this->{'err'} = "Not connected";
183 0           return 0;
184             }
185              
186 0           my $serv = $this->{'servs'}->{$port};
187 0           $this->{'serv_sel'}->remove($serv->{'unix_sock'});
188 0           $this->{'serv_sel'}->remove($serv->{'inet_sock'});
189 0           shutdown($serv->{'unix_sock'}, 2);
190 0           shutdown($serv->{'inet_sock'}, 2);
191 0           close($serv->{'unix_sock'});
192 0           close($serv->{'inet_sock'});
193 0           unlink("$LOCALADDR_PREFIX$port");
194 0           delete($this->{'servs'}->{$port});
195              
196 0           return 1;
197             }
198              
199             sub connect
200             {
201 0     0 1   my $this = shift;
202 0           my $host = shift;
203 0           my $port = shift;
204              
205 0 0         if(exists($this->{'conns'}->{"${host}_$port"}))
206             {
207 0           $this->{'err'} = "Already connected";
208 0           return 0;
209             }
210              
211 0           my $cn =
212             {
213             'host' => $host,
214             'port' => $port,
215             'auto_arb' => 1,
216             };
217              
218 0           my $addr = gethostbyname($host);
219 0           my $conn = undef;
220             # local socket handling is fundamentally broken, a saa-redesign is needed
221             # if($this->{'my_addr'} eq $addr) # try domain socket first
222             # {
223             # $conn = IO::Socket::UNIX->new(Peer => "$LOCALADDR_PREFIX$port",
224             # Type => SOCK_STREAM,
225             # Timeout => 10);
226             # }
227 0 0         if(!defined($conn))
228             {
229             # $debug && print STDERR "Saa::connect(): domain socket $LOCALADDR_PREFIX$port failed with $!, trying inet\n";
230 0 0         if(!($conn = IO::Socket::INET->new(PeerAddr => $host,
231             PeerPort => $port,
232             Proto => "tcp",
233             Type => SOCK_STREAM)))
234             {
235 0 0         $debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
236 0           $this->{'err'} = "$@";
237 0           return 0;
238             }
239             }
240             else
241             {
242 0 0         $debug && print STDERR "Saa::connect(): Successfully opened localsoc!\n";
243             }
244              
245 0           binmode($conn, ":raw");
246              
247 0           $cn->{'conn'} = $conn;
248 0           $this->{'conn_sel'}->add($conn);
249 0           $this->{'conns'}->{"${host}_$port"} = $cn;
250              
251 0           return 1;
252             }
253              
254             # 'auto_arb' => bool
255             sub conn_set
256             {
257 0     0 0   my $this = shift;
258 0           my $host = shift;
259 0           my $port = shift;
260              
261 0           my %par = @_;
262              
263 0           my $c = "${host}_$port";
264 0 0         if(!exists($this->{'conns'}->{$c}))
265             {
266 0           $this->{'err'} = "No such connection.";
267 0           return 0;
268             }
269              
270 0           for(keys(%par))
271             {
272 0           $this->{'conns'}->{$c}->{$_} = $par{$_};
273             }
274            
275 0           return 1;
276             }
277              
278              
279             # 'tag' => client name for the msg
280             # 'arb' => scalar data or func(tag) that returs scalar or undef on end-of-data
281             # 'arb_name' => scalar
282             sub queue
283             {
284 0     0 1   my $this = shift;
285 0           my $host = shift;
286 0           my $port = shift;
287 0           my $msg = shift;
288              
289 0           my %par = @_;
290              
291 0           my $q_elem = {
292             'host' => $host,
293             'port' => $port,
294             'msg' => $msg
295             };
296              
297 0 0         if(exists($par{'arb'}))
298             {
299 0           $q_elem->{'arb'} = $par{'arb'};
300 0           $q_elem->{'arb_name'} = $par{'arb_name'};
301             }
302              
303 0 0         if(exists($par{'tag'}))
304             {
305 0           $q_elem->{'tag'} = $par{'tag'};
306             }
307              
308             # print STDERR "scheduled req: " . Dumper($q_elem);
309 0           push(@{$this->{'queue'}}, $q_elem);
  0            
310              
311 0           return 1;
312             }
313              
314             sub process_accept
315             {
316 0     0 0   my $this = shift;
317 0           my $timeout = shift;
318              
319 0           $timeout=10;
320              
321 0           my @servs = keys(%{$this->{'servs'}});
  0            
322 0           my @reads = $this->{'serv_sel'}->can_read($timeout);
323            
324             # print "Riidit: " . Dumper(\@reads) . "\n";
325 0           my $conn;
326 0           foreach $conn (@reads)
327             {
328 0           my $serv;
329 0           my $found = 0;
330 0           for(@servs)
331             {
332 0 0 0       if($this->{'servs'}->{$_}->{'inet_sock'} == $conn ||
333             $this->{'servs'}->{$_}->{'unix_sock'} == $conn)
334             {
335 0           $serv = $this->{'servs'}->{$_};
336 0           $found = 1;
337 0           last;
338             }
339             }
340              
341 0           my $client = $conn->accept();
342             # print "Conn " . Dumper($conn);
343 0 0         if(!defined($client))
344             {
345             # print STDERR "PRKL: $!\n";
346 0           next;
347             }
348              
349 0           my $str_ip;
350             my $port;
351             #for some reason sockdomain returns undef
352             # if(AF_INET == $client->sockdomain)
353             # {
354 0           my $sockaddr = $client->peername();
355 0           my $iaddr;
356 0           ($port, $iaddr) = sockaddr_in($sockaddr);
357 0           $str_ip = inet_ntoa($iaddr);
358             # print STDERR "Saa: accept found port $port and ip $str_ip\n";
359             # }
360             # else # AF_UNIX
361             # {
362             # my $sn = $client->sockname();
363             # $sn =~ /$LOCALADDR_PREFIX([0-9]+)/;
364             # $port = $1;
365             # $str_ip = inet_ntoa($this->{'my_addr'});
366             # $debug && print STDERR "Saa::process_accept(): AF_UNIX connection with ip $str_ip port $port\n";
367             # }
368              
369 0           my $cn =
370             {
371             'host' => $str_ip,
372             'port' => $port,
373             'conn' => $client,
374             'lport' => $serv->{'port'},
375             };
376              
377 0 0         $serv->{'auto_arb'} && ($cn->{'auto_arb'} = $serv->{'auto_arb'});
378 0 0         $serv->{'callback'} && ($cn->{'callback'} = $serv->{'callback'});
379              
380 0           $this->{'conns'}->{"${str_ip}_$port"} = $cn;
381 0           $this->{'conn_sel'}->add($client);
382             }
383              
384 0           return (1, 0);
385             }
386              
387             sub process_write
388             {
389 0     0 0   my $this = shift;
390 0           my $sent = shift;
391              
392 0           my $q = $this->{'queue'};
393 0           my %banned = (); # makes sure the order of messages for the same connection is kept
394              
395 0           my $offset = 0;
396 0           while($offset < scalar(@$q))
397             {
398 0           my $qe = $q->[$offset];
399              
400             # ensure connection
401 0 0         if(! $this->connected($qe->{'host'}, $qe->{'port'}))
402             {
403             # print STDERR "Write connects to $qe->{'host'} $qe->{'port'}\n";
404 0 0         if(!$this->connect($qe->{'host'}, $qe->{'port'}))
405             {
406             # print STDERR "Write is not connected to $qe->{'host'} $qe->{'port'}: $@\n";
407 0           $this->{'err'} = $@;
408 0           $qe->{'status'} = "failed";
409 0           shift(@$q);
410 0           push(@$sent, $qe);
411              
412 0           return (0, scalar(@$q));
413             }
414             }
415            
416 0           my $connstr = $qe->{'host'} . "_" . $qe->{'port'};
417 0           my $conn = $this->{'conns'}->{$connstr}->{'conn'};
418              
419             #see writability if not known
420 0 0         if($banned{$connstr})
421             {
422 0           $offset++;
423 0           next; # earlier message in q already unsent to this connstr
424             }
425            
426 0 0         if(scalar(IO::Select->new($conn)->can_write(0)))
427             {
428 0           my $ok;
429 0 0         if(defined($qe->{'arb_name'}))
430             {
431             # print STDERR "Saa: writing arb " . $qe->{'arb_name'} . " " . Dumper($qe->{'msg'});
432 0           $ok = Alvis::Tana::write($conn, $qe->{'msg'}, $qe->{'arb_name'});
433             }
434             else
435             {
436             # print STDERR "Saa: writing fix " . Dumper($qe->{'msg'});
437 0           $ok = Alvis::Tana::write($conn, $qe->{'msg'}, undef);
438             }
439 0 0         if(!$ok)
440             {
441 0           $qe->{'status'} = "failed";
442 0           $this->{'err'} = Alvis::Tana::error($conn);
443 0           $this->disconnect($qe->{'host'}, $qe->{'port'});
444 0           shift(@$q);
445 0           push(@$sent, $qe);
446            
447 0           return (0, $offset < scalar(@$q));
448             }
449            
450 0 0         if(defined($qe->{'arb_name'}))
451             {
452 0           $ok = 1;
453 0           my $func = undef;
454 0           my @param;
455 0 0         if(!ref($qe->{'arb'})) #scalar
    0          
456             {
457             # print STDERR "S: writing scalar arb " . $qe->{'arb'} . "\n";
458             # print STDERR Dumper($qe->{'arb'});
459 0           $ok = Alvis::Tana::write_arb($conn, $qe->{'arb'}, 1);
460             }
461             elsif(ref($qe->{'arb'}) eq 'ARRAY')
462             {
463             # print STDERR "S: arb-callback with params\n";
464 0           @param = @{$qe->{'arb'}};
  0            
465 0           $func = shift(@param);
466             }
467             else
468             {
469             # print STDERR "S: arb-callback without params\n";
470 0           $func = $qe->{'arb'};
471 0           @param = ();
472             }
473            
474 0 0         if(defined($func))
475             {
476             # print STDERR "S: writing arb from func...\n";
477 0           my $end = 0;
478 0           my $arb;
479 0   0       while($ok && !$end)
480             {
481 0           ($arb, $end) = $func->($this, $qe->{'tag'}, $qe->{'host'}, $qe->{'port'}, @param);
482 0           $ok = Alvis::Tana::write_arb($conn, $arb, $end);
483             }
484            
485 0 0         if(!$ok)
486             {
487 0           $qe->{'status'} = "failed";
488 0           $this->{'err'} = Alvis::Tana::error($conn);
489 0           $this->disconnect($qe->{'host'}, $qe->{'port'});
490 0           shift(@$q);
491 0           push(@$sent, $qe);
492            
493 0           return (0, $offset < scalar(@$q));
494             }
495             }
496             }
497 0           $qe->{'status'} = "ok";
498 0           shift(@$q);
499 0           push(@$sent, $qe);
500             }
501             }
502              
503 0           return(1, $offset < scalar(@$q));
504             }
505              
506             sub process_read
507             {
508 0     0 0   my $this = shift;
509 0           my $received = shift;
510 0           my $timeout = shift;
511              
512 0           my $pending = 0;
513              
514             # if(rand(100000) < 10)
515             # {
516             # my $ch = $this->{'conns'};
517             # print STDERR "Saa::process_read() does can_read for " . scalar(keys(%$ch)) . " sockets\n";
518             # }
519 0           my @conns = $this->{'conn_sel'}->can_read(0);
520 0           my @cns = keys(%{$this->{'conns'}});
  0            
521              
522             # print STDERR "read conns: " . scalar(@conns) . Dumper(@conns);
523            
524 0           my $conn;
525 0           foreach $conn (@conns)
526             {
527 0           my $cn;
528             my $cnkey;
529 0           my $found = 0;
530 0           for (@cns)
531             {
532 0 0         if($this->{'conns'}->{$_}->{'conn'} == $conn)
533             {
534 0           $cn = $this->{'conns'}->{$_};
535 0           $cnkey = $_;
536 0           $found = 1;
537 0           last;
538             }
539             }
540              
541 0           my $arb_type = 0;
542             # print STDERR "saa: reading msg from " . $conns[$i] . " / " . fileno($conns[$i]) . "\n";
543 0           my $msg = Alvis::Tana::read($conn, \$arb_type);
544              
545             # warn "Saa process_read(): Alvis::Tana::read() gave msg",Dumper($msg);
546              
547 0 0         if(!defined($msg))
548             {
549 0           $this->{'err'} = Alvis::Tana::error($conn);
550 0 0         if(scalar(@conns) > scalar(@$received))
551             {
552 0           $pending = 1;
553             }
554            
555 0           $this->disconnect($cnkey);
556 0           next;
557             }
558 0           my $entry =
559             {
560             'msg' => $msg,
561             'type' => 'fix',
562             'host' => $cn->{'host'},
563             'port' => $cn->{'port'},
564             'conn' => $conn,
565             };
566 0 0         if(defined($arb_type))
567             {
568 0           $entry->{'type'} = 'arb';
569 0           $entry->{'arb_name'} = $arb_type;
570 0 0 0       if(exists($cn->{'auto_arb'}) && ($cn->{'auto_arb'}))
571             {
572 0           my $eom = 0;
573 0           my $arb = '';
574 0           while(!$eom)
575             {
576 0           my $ext = Alvis::Tana::read_arb($entry->{'conn'}, 1024000, \$eom);
577 0 0         if(!defined($ext))
578             {
579 0           $this->{'err'} = "Error auto-reading arb: " . Alvis::Tana::error($entry->{'conn'});
580 0 0         if(scalar(@conns) > scalar(@$received))
581             {
582 0           $pending = 1;
583             }
584 0           return (0, $pending);
585             }
586            
587 0           $arb .= $ext;
588             }
589            
590 0           $entry->{'arb'} = $arb;
591             }
592             }
593            
594 0 0         if($cn->{'callback'})
595             {
596 0           my $cb = $cn->{'callback'};
597 0           my $func = undef;
598 0           my @param = ();
599            
600 0 0         $debug && print STDERR "Callback = ", ref($cb), "\n";
601 0 0         if(ref($cb) eq 'CODE')
602             {
603 0           $func = $cb;
604             }
605             else
606             {
607 0           @param = @$cb;
608 0           $func = shift(@param);
609             }
610 0 0         $debug && print STDERR "Func cb ref = ", ref($func), "\n";
611 0           $func->($this, $entry, @param);
612             }
613             else
614             {
615 0           push(@$received, $entry);
616             }
617             }
618              
619 0           return (1, 0);
620             }
621              
622              
623             sub process
624             {
625 0     0 1   my $this = shift;
626 0           my $timeout = shift;
627              
628 0           $timeout=10;
629              
630 0           my $received = [];
631 0           my $sent = [];
632              
633             #cleanup
634 0           for (keys(%{$this->{'conns'}}))
  0            
635             {
636 0           my $c = $this->{'conns'}->{$_}->{'conn'};
637 0 0         if((!$c->connected()))
638             {
639 0           print STDERR "Reaping $_\n";
640 0           my($host, $port) = split("_", $_);
641 0           $this->disconnect($host, $port);
642             }
643             }
644              
645             # read from conns
646             # print STDERR "*saa read\n";
647 0           my ($ok, $pending) = $this->process_read($received, $timeout);
648 0 0         if(!$ok)
649             {
650 0           print STDERR "read sanoi nok\n";
651 0           return (0, $sent, $received, $pending);
652             }
653              
654             # print STDERR "*saa write\n";
655             # write queue
656 0 0         if($pending)
657             {
658 0           ($ok, undef) = $this->process_write($sent, $timeout);
659             }
660             else
661             {
662 0           ($ok, $pending) = $this->process_write($sent, $timeout);
663             }
664 0 0         if(!$ok)
665             {
666             # print STDERR "write sanoi nok\n";
667 0           return (0, $sent, $received, $pending);
668             }
669              
670             # print STDERR "*saa accept\n";
671             # accept
672 0 0         if($pending)
673             {
674 0           ($ok, undef) = $this->process_accept($timeout);
675             }
676             else
677             {
678 0           ($ok, $pending) = $this->process_accept($timeout);
679             }
680              
681 0           return ($ok, $sent, $received, $pending);
682             }
683              
684             sub tana_msg_reply
685             {
686 0     0 0   my ($saa, $msg, $host, $port, $wait) = @_;
687              
688 0           my $giveup_time = time() + $wait;
689 0           my $done = 0;
690 0           my $reply = undef;
691 0           my $ok = 1;
692             do
693 0   0       {
694 0 0         if(!$saa->queue($host, $port, $msg))
695             {
696 0           return (0, "Saa::queue() failed: " . $saa->{'err'});
697             }
698              
699 0           my $received = [];
700 0           $ok = 1;
701 0   0       while(scalar(@$received) < 1 && (time() < $giveup_time) && $ok)
      0        
702             {
703 0           ($ok, undef, $received, undef) = $saa->process(0.1);
704 0 0         if(!$ok)
705             {
706 0           return (0, "Saa::process() failed: " . $saa->{'err'});
707             }
708             }
709 0 0         if(scalar(@$received) > 0)
710             {
711 0           $reply = $received->[0]->{'msg'};
712 0           $done = 1;
713             }
714             } while((!$done) && (time() < $giveup_time));
715              
716 0 0         if(!$done)
717             {
718 0           return (0, "Timeout.");
719             }
720              
721 0           return ($ok, $reply);
722             }
723              
724             sub tana_msg_send
725             {
726 0     0 0   my ($saa, $msg, $host, $port, $wait) = @_;
727              
728 0           my $giveup_time = time() + $wait;
729 0           my $done = 0;
730 0           my $stat = undef;
731 0           my $sent = [];
732 0           my $ok = 1;
733             do
734 0   0       {
735 0 0         if(!$saa->queue($host, $port, $msg))
736             {
737 0           return (0, "Saa::queue() failed: " . $saa->{'err'});
738             }
739 0           $ok = 1;
740 0           $sent = [];
741 0   0       while(scalar(@$sent) < 1 && (time() < $giveup_time) && $ok)
      0        
742             {
743 0           ($ok, $sent, undef, undef) = $saa->process(0.1);
744 0 0         if(!$ok)
745             {
746 0           return (0, "Saa::process() failed: " . $saa->{'err'});
747             }
748             }
749 0 0         if(scalar(@$sent) > 0)
750             {
751 0           $done = 1;
752             }
753             } while((!$done) && (time() < $giveup_time));
754              
755 0 0         if(!$done)
756             {
757 0           return (0, "Timeout.");
758             }
759              
760 0           return ($ok, $sent->[0]->{'status'});
761             }
762              
763              
764              
765             1;
766              
767             __END__