File Coverage

blib/lib/Net/PSYC/Circuit.pm
Criterion Covered Total %
statement 256 348 73.5
branch 95 170 55.8
condition 40 81 49.3
subroutine 33 44 75.0
pod 0 3 0.0
total 424 646 65.6


line stmt bran cond sub pod time code
1             package Net::PSYC::Circuit;
2              
3             our $VERSION = '0.4';
4              
5 1     1   6 use strict;
  1         2  
  1         43  
6 1     1   5 use Socket qw(SO_KEEPALIVE inet_ntoa);
  1         1  
  1         69  
7 1     1   2376 use IO::Socket::INET;
  1         11148  
  1         9  
8              
9             import Net::PSYC qw( watch add W sendmsg same_host send_mmp parse_uniform BLOCKING makeMSG make_psyc parse_psyc parse_mmp PSYC_PORT PSYCS_PORT register_host register_route make_mmp UNL);
10              
11             sub listen {
                # listen($class, $ip, $port, $options)
                # Opens a listening TCP socket and wraps it in a
                # Net::PSYC::Circuit::L object.
                #   $ip / $port : local address and port; false values are passed to
                #                 IO::Socket::INET as undef (any address / any port).
                #   $options    : stored unchanged under the 'O' key.
                # Returns the blessed listener object. If the socket cannot be
                # created, the value of $! is returned instead of an object, so
                # callers have to check what they got back.
12             # looks funky.. eh? undef makes IO::Socket handle INADDR_ANY properly
13             # whereas '' causes an exception. stupid IO::Socket if you ask me.
14 1     1 0 4 my ($class, $ip, $port, $options) = @_;
15 1 50 50     22 my $socket = IO::Socket::INET->new(
      50        
16             LocalAddr => $ip || undef,
17             # undef == use INADDR_ANY
18             LocalPort => $port || undef,
19             # undef == take any port
20             Proto => 'tcp',
21             Listen => 7,
22             Blocking => BLOCKING() & 2,
23             Timeout => 5,
24             ReuseAddr => 1
25             )
26             or return $!;
                # Fall back to what the socket reports when no ip/port was requested.
                # $_[2] is the same value as $port.
27 1   33     533 my $self = {
      33        
28             'SOCKET' => $socket,
29             'IP' => $ip||$socket->sockhost(),
30             'PORT' => $_[2] || $socket->sockport,
31             'LAST_RECV' => getsockname($socket),
32             'type' => 'c',
33             'O' => $options,
34             };
35 1         67 W1('TCP Listen %s:%s successful.', $self->{'IP'}, $self->{'PORT'});
                # $class is not used for blessing; the listener class is fixed.
36 1         4 bless $self, 'Net::PSYC::Circuit::L';
                # Only hand the object to watch() when bit 2 of BLOCKING() is clear.
37 1 50       4 watch($self) unless BLOCKING() & 2;
38 1         4 return $self;
39             }
40              
41             # new ( \*socket, vars )
42             sub new {
                # new($class, $socket, $vars)
                # Wraps an already created socket in a Net::PSYC::Circuit::C
                # connection object.
                #   $socket : socket object; sockopt() is called on it.
                #   $vars   : hash ref merged last into the object, so its keys
                #             override the defaults below. R_IP and R_PORT are read
                #             from the merged hash right after construction.
                # Side effects visible here: the object is stored in %Net::PSYC::C
                # under "R_IP:R_PORT", registered via register_host/register_route,
                # and logon() is either called directly or scheduled as a 'w' event.
                # Returns the blessed object. $class is ignored for blessing.
43 2     2 0 81 my ($class, $socket, $vars) = @_;
44 2         53 my $self = {
45             'O' => {},
46             'SOCKET' => $socket,
47             'type' => 'c',
48             # These buffers may be moved to a reconnected object.
49             # maybe not the IN-buffer, but right now we cannot do anything
50             # about it. -> later TODO
51             'I_BUFFER' => '',
52             'O_BUFFER' => [],
53             # this one not! (its only used for negotiation and such)
54             'N_BUFFER' => [],
55             'O_COUNT' => 0,
56             'CACHE' => {}, # cache for fragmented data
57             'I_LENGTH' => 0, # whether _length of incomplete
58             # packets exceeds buffer-length
59             'FRAGMENT_COUNT' => 0,
60             'R' => {},
61             'L' => 0,
62             'state_temp' => {},
63             'state' => {},
64             'vars' => {},
65             'error' => 0,
66             %$vars,
67             };
68 2         10 $socket->sockopt( SO_KEEPALIVE(), 1 );
69 2         31 bless $self, 'Net::PSYC::Circuit::C';
70              
                # R_HOST starts out as the plain IP; logon() later replaces it with
                # the result of a reverse lookup when one is available.
71 2         15 $self->{'R_HOST'} = $self->{'R_IP'};
72 2         14 $self->{'peeraddr'} = "psyc://$self->{'R_HOST'}:$self->{'R_PORT'}/";
73 2         9 $Net::PSYC::C{"$self->{'R_IP'}\:$self->{'R_PORT'}"} = $self;
74            
75             # stupid if.
76 2 50       19 register_host($self->{'R_IP'}, $self->{'R_HOST'}) if ($self->{'R_HOST'});
77 2         6 register_host('127.0.0.1', 'localhost');
78             # TRUST is something arbitrary anyway.. there is no problem to wait for
79             # the dns_resolution to set it correctly
                # The callback raises TRUST to 9 when same_host() reports a true result.
80             same_host('127.0.0.1', $self->{'R_IP'},
81             sub {
82 2     2   4 my $result = shift;
83 2 50       11 $self->TRUST(9) if $result;
84 2         23 });
85 2         15 register_route("$self->{'R_HOST'}\:$self->{'R_PORT'}", $self);
86              
87             # if we are an accepted socket, the greeting has already been fired out
88             #
89             # in case we are blocking _and_ mmp modules have been activated we block
90             # here and do negotiation
91             #
92 2 50       7 watch($self) unless (BLOCKING() & 2);
93 2 50       6 if (BLOCKING() & 1) { # blocking writes!
94 0         0 $self->logon();
95             } else {
                # Non-blocking writes: run logon() from a 'w' event on the socket.
96 2     2   13 add($self->{'SOCKET'}, 'w', sub { $self->logon() }, 0);
  2         8  
97             }
98 2         10 return $self;
99             }
100              
101             sub connect {
                # connect($class, $ip, $port)
                # Opens an outgoing TCP connection and wraps it with
                # Net::PSYC::Circuit->new().
                #   $ip   : peer address handed to IO::Socket::INET as PeerAddr.
                #   $port : peer port; a false value selects PSYC_PORT().
                # Returns the connection object, or 0 after logging when the
                # socket could not be created.
102 1     1 0 3 my $class = shift;
103 1         2 my $ip = shift;
104 1   33     5 my $port = shift || PSYC_PORT();
105 1         5 my $socket = IO::Socket::INET->new(Proto => 'tcp',
106             PeerAddr => $ip,
107             Blocking => BLOCKING() & 1,
108             PeerPort => $port );
109             # we need some nonblocking error handling
110 1 50       421 if (!$socket) {
111 0         0 W1('TCP connect to %s:%d failed. (%s)', $ip, $port, $!);
112 0         0 return 0;
113             }
                # Only the remote endpoint is passed on; new() supplies every
                # other default.
114 1         5 my $self = {
115             'R_IP' => $ip,
116             'R_PORT' => $port,
117             };
118 1         8 return Net::PSYC::Circuit->new($socket, $self);
119             }
120              
121             # TCP connection class
122             package Net::PSYC::Circuit::C;
123              
124 1     1   1767 use bytes;
  1         2  
  1         9  
125 1     1   24 use strict;
  1         2  
  1         40  
126              
127 1     1   6 use Socket;
  1         2  
  1         896  
128              
129 1     1   6 use base qw(Net::PSYC::MMP::State Net::PSYC::Hook);
  1         3  
  1         888  
130              
131             import Net::PSYC qw( revoke W UNL sendmsg same_host send_mmp parse_uniform BLOCKING makeMSG make_psyc parse_psyc parse_mmp make_mmp register_route register_host dns_lookup);
132              
133             my $PING_INTERVAL = 77;
134              
135             sub TRUST {
                # TRUST([$level])
                # Combined setter/getter for the connection's trust level.
                # With an argument, stores it in $self->{'TRUST'} first.
                # Returns the stored level, or 3 when the stored value is false
                # (so a stored 0 also reads back as 3).
136 2     2   3 my $self = shift;
137 2 50       9 $self->{'TRUST'} = $_[0] if exists $_[0];
138 2   50     19 return $self->{'TRUST'} || 3;
139             }
140              
141             sub accept_modules {
                # accept_modules($module, $on)
                # Switches local support for an MMP module on for this connection.
                #   $module : module name; '_compress' and '_encrypt' get special
                #             handling below.
                #   $on     : undef or true enables; a defined false value only
                #             logs that removal is not possible.
                # Returns 1 on most paths, or the result of zlib_init_client() for
                # '_compress'. Some paths have no explicit return (an enabled
                # module that is neither '_compress' nor '_encrypt'; the disable
                # path when the module is present in 'O').
142 0     0   0 my $self = shift;
143 0         0 my $module = shift;
144 0         0 my $on = shift;
145            
146             # an undefined $on is treated like 1 (enable)
147 0 0 0     0 if (!defined($on) || $on) {
                # Nothing to do when we already advertise this module.
148 0 0       0 return 1 if $self->accepting_modules($module);
149 0         0 $self->{'O'}->{'_understand_modules'}->{$module} = 1;
150 0         0 $self->fire($self->{'peeraddr'},0,{ '+_understand_modules' => $module });
151             # it is possible that this happens before proper neg? then we are
152             # dead! TODO
153              
                # Stop here unless the remote side has announced the module too.
154 0 0       0 return 1 if (!$self->{'R'}->{'_understand_modules'}->{$module});
155              
156 0 0       0 if ($module eq '_compress') {
    0          
157 0         0 return $self->zlib_init_client();
158             } elsif ($module eq '_encrypt') {
                # NOTE(review): SSL() is not named in this package's import list
                # in the visible source - confirm where it comes from.
159 0 0       0 unless (SSL()) {
160 0         0 W0("The other side offers SSL-encryption. It would be wise to install IO::Socket::SSL (v0.93 or above).");
161 0         0 return 1;
162             }
163            
                # The callback clears 'OK', which makes write() return early
                # until 'OK' is set again.
164             $self->fire($self->{'peeraddr'},0,
165             { '+_using_modules' => '_encrypt' },
166 0     0   0 sub { $self->{'OK'} = 0 },
167 0         0 );
168 0         0 $self->{'SSL_client'} = 1;
169             # TODO . same code as in gotiate(). Think about something else.
170             # plus: in case we have eventing we should use a timer-event to
171             # stop waiting.
172 0         0 return 1;
173             }
174             } else {
175             # may be impossible.
176 0         0 W0('It is impossible to remove the mmp module %s from an established'.
177             ' connection.', $module);
178 0 0       0 return 1 if (!exists $self->{'O'}->{'_understand_modules'}->{$module});
179             }
180             }
181              
182             sub accepting_modules {
                # accepting_modules($module)
                # Returns 1 when $self->{'O'}{'_understand_modules'}{$module} holds
                # a true value, otherwise 0.
183 2     2   4 my $self = shift;
184 2         4 my $module = shift;
185            
186 2 50       12 return 0 unless ($self->{'O'}->{'_understand_modules'}->{$module});
187 2         8 return 1;
188             }
189              
190             # counterparts to understand_ and use_
191             # means that a _using_modules came in
192             sub negotiate {
                # negotiate($module)
                # Called from recv() when a _using_modules value arrives. Records
                # the module under $self->{'R'}{'_using_modules'} and starts the
                # matching initialisation:
                #   '_encrypt'  -> tls_init_client() if 'SSL_client' is set,
                #                  otherwise tls_init_server()
                #   '_compress' -> zlib_init_server()
                # Returns what the chosen init method returns; for any other
                # module name there is no explicit return.
193 2     2   4 my $self = shift;
194 2         144 my $module = shift;
195            
196 2 50       10 unless (exists $self->{'R'}->{'_using_modules'}) {
197 2         9 $self->{'R'}->{'_using_modules'} = {};
198             }
199 2         9 $self->{'R'}->{'_using_modules'}->{$module} = 1;
200            
201 2 50       12 if ($module eq '_encrypt') {
    50          
202 0 0       0 if ($self->{'SSL_client'}) {
203 0         0 return $self->tls_init_client();
204             } else {
205 0         0 return $self->tls_init_server();
206             }
207             } elsif ($module eq '_compress') {
208 2         83 return $self->zlib_init_server();
209             }
210             }
211              
212             sub gotiate {
                # gotiate($module)
                # Records that the remote side understands $module (stored under
                # $self->{'R'}{'_understand_modules'}) and, when we accept the
                # module as well, announces that we start using it.
                # Returns 1 on every path that matches; a module that is accepted
                # but is neither '_encrypt' nor '_compress' has no explicit return.
213 2     2   4 my $self = shift;
214 2         6 my $module = shift;
215              
216 2 50       10 unless (exists $self->{'R'}->{'_understand_modules'}) {
217 0         0 $self->{'R'}->{'_understand_modules'} = {};
218             }
219 2         8 $self->{'R'}->{'_understand_modules'}->{$module} = 1;
220 2 50       10 return 1 unless ($self->accepting_modules($module));
221              
222 2 50       19 if ($module eq '_encrypt') {
    50          
                # NOTE(review): SSL() is not named in this package's import list
                # in the visible source - confirm where it comes from.
223 0 0       0 unless (SSL()) {
224 0         0 W0("The other side offers SSL-encryption. It would be wise to install IO::Socket::SSL (v0.93 or above).");
225 0         0 return 1;
226             }
227            
                # Both variants announce '_encrypt' and clear 'OK' from the
                # callback; they differ in the target and in the variable name
                # ('+_using_modules' vs '_using_modules').
228 0         0 if (Net::PSYC::FORK) {
229             $self->fire($self->{'peeraddr'},0,
230             { '+_using_modules' => '_encrypt' },
231 0     0   0 sub { $self->{'OK'} = 0 }
232             );
233             } else {
234             $self->fire('',0,{ '_using_modules' => '_encrypt' },
235 0     0   0 sub { $self->{'OK'} = 0 }
236 0         0 );
237             }
238 0         0 $self->{'SSL_client'} = 1;
239            
240 0         0 return 1;
241             } elsif ($module eq '_compress') {
242 2         8 Net::PSYC::Event::revoke($self->{'SOCKET'}, 'w');
243 2         10 $self->zlib_init_client();
244 2         10 return 1;
245             }
246             }
247              
248             sub ping_init {
                # ping_init()
                # (Re)schedules the keep-alive ping for this connection; write()
                # calls it after every successful syswrite.
                # Returns 1 without scheduling when 'L' is set or BLOCKING() is
                # true; otherwise the value of the final assignment (the id
                # returned by Net::PSYC::Event::add).
249 21     21   25 my $self = shift;
250             # we are a server or do not have eventing
251 21 100 66     122 return 1 if ($self->{'L'} || BLOCKING());
252            
                # Drop the previously scheduled ping before adding a new one.
253 18 100       77 Net::PSYC::Event::remove($self->{'ping_id'}) if exists $self->{'ping_id'};
                # The ping closure is created once and reused; it writes ".\n"
                # straight to the socket.
254             $self->{'ping_sub'} ||= sub {
255 0     0   0 syswrite($self->{'SOCKET'}, ".\n");
256 18   100     52 };
                # presumably 't' selects a timer event firing after $PING_INTERVAL
                # - confirm against Net::PSYC::Event::add
257 18         55 $self->{'ping_id'} = Net::PSYC::Event::add( $PING_INTERVAL, 't',
258             $self->{'ping_sub'}, 1);
259             }
260              
261 0     0   0 sub tls_init_server { 1 } # stub: does nothing and returns 1
262             sub tls_init_client {
                # tls_init_client()
                # Upgrades the connection's socket with IO::Socket::SSL->start_SSL.
                # Returns 1 when start_SSL does not hand back an object whose ref
                # is exactly 'IO::Socket::SSL'. On success the new socket replaces
                # $self->{'SOCKET'}, 'OK' is set to 1, and there is no explicit
                # return value.
                # NOTE(review): IO::Socket::SSL is not loaded anywhere in the
                # visible source - confirm it is required elsewhere.
263 0     0   0 my $self = shift;
264 0         0 my $t = IO::Socket::SSL->start_SSL($self->{'SOCKET'});
265             # SSL_server => ($self->{'L'}) ? 1 : 0);
266 0 0       0 if (ref $t ne 'IO::Socket::SSL') {
267 0         0 return 1;
268             }
269 0         0 W1('Using encryption to %s.', $self->{'peeraddr'});
270 0         0 $self->{'SOCKET'} = $t;
271 0         0 $self->{'OK'} = 1;
                # With eventing, forget and re-watch the object so the replaced
                # socket is the one being watched, then revoke 'w' on it.
272 0 0       0 unless (BLOCKING()) {
273 0         0 Net::PSYC::Event::forget($self);
274 0         0 Net::PSYC::Event::watch($self);
275 0         0 Net::PSYC::Event::revoke($self->{'SOCKET'}, 'w');
276             }
277             }
278              
279             # the naming of client/server is fucked up. TODO
280             sub zlib_init_server {
                # zlib_init_server()
                # Called from negotiate() for '_compress'. Loads
                # Net::PSYC::MMP::Compress on demand, creates the per-connection
                # compressor if there is none yet, and calls init('decrypt') on it.
                # Always returns 1, including when the module cannot be loaded
                # (in that case the socket is passed to Net::PSYC::shutdown first).
281 2     2   4 my $self = shift;
282 2 50       4 unless (eval{ require Net::PSYC::MMP::Compress }) {
  2         24  
                # NOTE(review): $@ is read only after the shutdown call; if that
                # call runs an eval of its own the message may be lost - confirm.
283 0         0 Net::PSYC::shutdown($self->{'SOCKET'});
284 0         0 W0('Somehow your Compression modules does not work (%s). '.
285             'Shutting down connection.', $@);
286             # shut down.. whatever
287             # TODO switch off _understand_modules _compress
288 0         0 return 1;
289             }
290 2 50       9 unless ($self->{'_compress'}) {
291 0         0 $self->{'_compress'} = new Net::PSYC::MMP::Compress($self);
292             }
293 2         18 $self->{'_compress'}->init('decrypt');
294 2         6 return 1;
295             }
296              
297             sub zlib_init_client {
                # zlib_init_client()
                # Loads Net::PSYC::MMP::Compress on demand, creates the
                # per-connection compressor if needed and fires a _using_modules
                # packet announcing '_compress'. The callback given to fire() calls
                # init('encrypt'); write() invokes such callbacks after the packet
                # has been written to the socket.
                # Returns 1 when the module cannot be loaded; otherwise there is no
                # explicit return (the value is whatever fire() yields).
298 2     2   6 my $self = shift;
299 2 50       6 unless (eval { require Net::PSYC::MMP::Compress }) {
  2         32  
300 0         0 W0('Somehow your Compression modules does not work (%s).', $@);
301 0         0 return 1;
302             }
303              
304 2 50       11 unless ($self->{'_compress'}) {
305 2         297 $self->{'_compress'} = new Net::PSYC::MMP::Compress($self);
306             }
                # The FORK variant targets peeraddr and uses '+_using_modules';
                # the other uses an empty target and '_using_modules'.
307 2         4 if (Net::PSYC::FORK) {
308             $self->fire($self->{'peeraddr'},0,{ '+_using_modules' => '_compress' },
309 0     0   0 sub { $self->{'_compress'}->init('encrypt') });
310             } else {
311             $self->fire('',0,{ '_using_modules' => '_compress' },
312 2     2   27 sub { $self->{'_compress'}->init('encrypt') });
  2         14  
313             }
314             }
315              
316             sub logon {
                # logon()
                # Finishes connection setup once the socket is usable: refreshes
                # the local and remote address fields from the socket, registers
                # host and route, writes the initial ".\n" line, installs the write
                # event handler (non-blocking writes only) and sends the greeting
                # via greet().
                # No explicit return; the value is whatever greet() yields.
317 2     2   3 my $self = shift;
318              
                # From here on 'O' refers to the global option hash %Net::PSYC::O.
319 2         6 $self->{'O'} = \%Net::PSYC::O;
320             # TODO nonblocking dns.
                # Reverse lookup of the peer; falls back to the IP when it fails.
321 2   33     8 $self->{'R_HOST'} = gethostbyaddr($self->{'SOCKET'}->peeraddr(), AF_INET())
322             || $self->{'R_IP'};
323 2         697 $self->{'IP'} = $self->{'SOCKET'}->sockhost();
324 2         82 $self->{'PORT'} = $self->{'SOCKET'}->sockport();
325 2         53 $self->{'R_IP'} = $self->{'SOCKET'}->peerhost();
326 2         65 $self->{'R_PORT'} = $self->{'SOCKET'}->peerport();
327 2         86 $self->{'LAST_RECV'} = $self->{'SOCKET'}->peername();
328 2         30 register_host($self->{'R_IP'}, inet_ntoa($self->{'SOCKET'}->peeraddr()));
329 2         9 register_host('127.0.0.1', $self->{'IP'});
330 2         8 register_route(inet_ntoa($self->{'SOCKET'}->peeraddr()).":$self->{'R_PORT'}", $self);
331              
332 2         11 W1('TCP: Connected with %s:%s', $self->{'R_IP'}, $self->{'R_PORT'});
                # The protocol's opening line; read() checks for it on the peer's
                # first data.
333 2         106 syswrite($self->{'SOCKET'}, ".\n");
334              
335             # I would like to rename OK. it may be necessary to work on STATE-BITS in
336             # the future. for now this is okay TODO
337             #
338             # we allow sending messages before receiving a _notice_circuit_established
339             # in case
340             # - we accept()ed the connection
341             # - we are doing blocking writes _and_ reads
342             # - we are anachronistic and not on tls, zlib or lsd TODO
343 2 50       7 unless (BLOCKING() & 1) {
344 2     22   15 Net::PSYC::Event::add($self->{'SOCKET'}, 'w', sub {$self->write()}, 0);
  22         62  
345             }
                # 'OK' is what write() checks before sending anything.
346 2 100 33     21 if ($self->{'L'} || (BLOCKING() & 1 && BLOCKING() & 2)) {
      66        
347 1         4 $self->{'OK'} = 1;
348             }
349 2         15 $self->greet();
350             }
351              
352             # greet
353             sub greet {
                # greet()
                # Queues the greeting packet(s) via fire(). The variables announce
                # the understood modules, implementation and understood protocols,
                # all read from $self->{'O'}.
                #   FORK     : one '_notice_circuit_established' packet with
                #              '='-prefixed variable names.
                #   non-FORK : '_notice_circuit_established' followed by a
                #              '_status_circuit' packet. Both fire() calls receive
                #              the same $h hash ref.
                # No explicit return; the value is whatever the last fire() yields.
354 2     2   4 my $self = shift;
355              
356             # send _notice_circuit_established. (MMP-neg)
357             # we _could_ send _using_modules here.. but. who cares???
358 2         4 my $h;
359             my $m;
360 2         3 if (Net::PSYC::FORK) {
361             $h = {
362             '=_understand_modules' => [ keys %{$self->{'O'}->{'_understand_modules'}} ],
363             '=_implementation' => $self->{'O'}->{'_implementation'},
364             '=_understand_protocols' => $self->{'O'}->{'_understand_protocols'},
365             };
366             $m = make_psyc('_notice_circuit_established',
367             'Connection to [_source] established!');
368             } else {
369 2         26 $h = {
370 2         4 '_understand_modules' => [ keys %{$self->{'O'}->{'_understand_modules'}} ],
371             '_implementation' => $self->{'O'}->{'_implementation'},
372             '_understand_protocols' => $self->{'O'}->{'_understand_protocols'},
373             };
374 2         9 $m = make_psyc('_notice_circuit_established',
375             'Connection to [_source] established!');
376 2         9 $self->fire($self->{'peeraddr'}, $m, $h);
377             # formally this is wrong, because in !FORK _*_modules are psyc vars. but
378             # since the muve does not give a shit we do neither.
379 2         7 $m = make_psyc('_status_circuit',
380             'I feel good.');
381              
382             }
                # Sends whatever $m holds now: the notice (FORK) or the status
                # packet (non-FORK).
383 2         6 $self->fire($self->{'peeraddr'}, $m, $h);
384             }
385              
386             sub send {
                # send($target, $data, $vars, $prio)
                # Queues a packet on O_BUFFER.
                #   $target : only used in the log line here.
                #   $data   : a scalar payload, or an array ref of fragments.
                #   $vars   : hash ref of packet variables; modified in place
                #             (_source, _counter, _amount_fragments).
                #   $prio   : when true, O_COUNT is pointed at the packet just
                #             queued so that write() picks it next.
                # Returns 0. With BLOCKING() the packet is written immediately;
                # otherwise 'w' is revoked on the socket once 'OK' is set.
387 11     11   20 my ($self, $target, $data, $vars, $prio) = @_;
388              
389 11         34 W2('"%s" -> send(%.10s.., %s)', $self->{'peeraddr'}, $target, $data);
390 11 50 33     59 if (!exists $vars->{"_source"} && exists $self->{'me'}) {
391 0         0 $vars->{"_source"} = $self->{'me'};
392             }
393 11 100       26 if (ref $data eq 'ARRAY') {
                # The condition is hard-wired to true, so the else branch below is
                # dead code.
394 1         2 if (1) { #$self->{'O'}->{'_using_modules'}->{'_fragments'}) {
                # NOTE(review): this reads 'FRAGMENT_COUNTER', while new()
                # initialises 'FRAGMENT_COUNT'; the first _counter sent is
                # therefore undef - confirm which key is intended.
395 1         4 $vars->{'_counter'} = $self->{'FRAGMENT_COUNTER'}++;
396 1         3 $vars->{'_amount_fragments'} = scalar(@$data);
397             } else {
398             # very bad bad idea... better drop the packet
399             $data = [ join('', @$data) ];
400             }
401             } else {
402 10         30 $data = [ $data ];
403             }
404              
                # Queue entry layout: [ fragments, vars, index of next fragment ].
405 11         14 push(@{$self->{'O_BUFFER'}}, [ $data, $vars, 0 ]);
  11         33  
406              
407 11 50       28 $self->{'O_COUNT'} = scalar(@{$self->{'O_BUFFER'}}) - 1 if ($prio);
  0         0  
408            
409 11 50       27 if (BLOCKING()) { # send the packet instantly
    50          
410 0         0 $self->write();
411             } elsif ($self->{'OK'}) {
412 0         0 revoke($self->{'SOCKET'}, 'w');
413             }
414            
415 11         44 return 0;
416             }
417              
418             sub fire {
                # fire($target, $data, $vars, $cb)
                # Queues a packet on N_BUFFER, which write() drains before
                # O_BUFFER.
                #   $target : stored as $vars->{'_target'} when true.
                #   $data   : payload; a false value becomes ''.
                #   $vars   : hash ref of variables (defaults to {}); modified in
                #             place.
                #   $cb     : optional code ref; write() calls it after the packet
                #             has been written.
                # No explicit return value.
419 6     6   13 my $self = shift;
420 6         17 my ($target, $data, $vars, $cb) = @_;
421              
422 6   100     31 $data ||= '';
423 6   50     15 $vars ||= {};
424 6 100       19 $vars->{'_target'} = $target if $target;
425             #unless ($vars->{'_target'}) {
426             # W("fire may not be called without a proper _target",0);
427             # return 0;
428             #}
                # NOTE(review): assigning the result of delete back to the same
                # key looks like it is meant to drop a false/empty _source so the
                # default below applies; it depends on evaluation order - confirm.
429 6   33     52 $vars->{'_source'} ||= delete $vars->{'_source'};
430              
431             #W("'$vars->{'_target'}'->fire('$data', $vars, ".($cb||'undef').")",2);
432              
433 6 100 66     36 if (!exists $vars->{"_source"} && exists $self->{'me'}) {
434 1         3 $vars->{"_source"} = $self->{'me'};
435             }
436              
                # Queue entry layout: [ [payload], vars, fragment index, callback ].
437 6         204 push(@{$self->{'N_BUFFER'}}, [ [ $data ], $vars, 0, $cb ]);
  6         27  
438 6 50       22 if (BLOCKING()) { # send the packet instantly
439 0         0 $self->write();
440             } else {
441 6         21 Net::PSYC::Event::revoke($self->{'SOCKET'}, 'w');
442             }
443             }
444              
445             sub write () {
                # write()
                # Sends at most one fragment of one queued packet per call.
                # N_BUFFER entries (queued by fire) take precedence over O_BUFFER
                # entries (queued by send).
                # Returns 1 in all cases except after the third consecutive
                # syswrite failure, where it returns -1.
                # The empty prototype has no effect on method calls such as
                # $self->write().
446 22     22   30 my $self = shift;
447            
448             # no permission to send packets.. and we are not wierdo enough!
449             # TODO
450 22 100       64 return 1 unless ($self->{'OK'});
451              
452 21         32 my $N = $self->{'N_BUFFER'};
453 21         38 my $O = $self->{'O_BUFFER'};
454 21         26 my ($data, $vars, $count, $cb);
455              
                # Pick the packet to work on; only N_BUFFER entries carry a
                # callback.
456 21 100       59 if (scalar @$N) {
    50          
457 6         8 ($data, $vars, $count, $cb) = @{$N->[0]};
  6         18  
458             } elsif (exists $O->[$self->{'O_COUNT'}]) {
459 15         22 ($data, $vars, $count) = @{$O->[$self->{'O_COUNT'}]};
  15         46  
460             } else {
461 0         0 W2('packets in %p: %d%s', $self, scalar(@$O), "\n");
462 0         0 return 1; # no packets!
463             }
464            
                # Note: this sets _fragment on the queued vars hash itself; the
                # copy is taken further down.
465 21 100       59 $vars->{'_fragment'} = $count if ($vars->{'_amount_fragments'});
466              
467 21         41 my $d = $data->[$count];
468              
469 1     1   64 use Storable qw(dclone);
  1         2  
  1         2036  
                # Work on a deep copy so the triggers below cannot change the
                # queued variables.
470 21         717 $vars = dclone($vars); # but the current design.. TODO
471             # TODO . shutdown connection if trigger fails. its really important for
472             # encryption/decryption
473 21         82 $self->trigger('send', $vars, \$d);
474            
475 21         213 my $m = make_mmp($vars, $d, $self);
476 21         69 $self->trigger('encrypt', \$m);
477              
                # Only an undef result counts as failure; a short write is not
                # detected here.
478 21 50       400 if (!defined(syswrite($self->{'SOCKET'}, $m))) {
479             # put the packet back into the queue
                # (nothing has been removed from the queue at this point, so the
                # packet simply stays where it is)
480            
481 0 0       0 if (++$self->{'error'} >= 3) {
482 0         0 W0('Sending a tcp packet to %s failed for the third time. Closing '.
483             'connection.', $self->{'peeraddr'});
484 0         0 return -1;
485             }
                # NOTE(review): the format has three placeholders but only two
                # arguments are passed (the error text for the second %s seems
                # to be missing) - confirm.
486 0         0 W0('Sending a packet to %s failed (%s). %d more retries.',
487             $self->{'peeraddr'}, $self->{'error'});
488 0         0 return 1;
489             } else {
490 21         108 $self->{'error'} = 0;
491             }
492 21         53 $self->ping_init();
493            
494 21         84 $self->trigger('sent', $vars, \$d);
495 21 100       53 $cb->() if ($cb);
496              
497 21         61 W2('TCP: wrote %d bytes of data to the socket', length($m));
498 21         57 W2('TCP: >>>>>>>> OUTGOING >>>>>>>>\n%s\nTCP: <<<<<<< OUTGOING <<<<<<<\n',
499             $m);
500              
                # Was this the last fragment? Uses _amount_fragments when set,
                # otherwise the number of elements in $data.
501 21 100 66     100 if (($vars->{'_amount_fragments'} || @$data) == $count + 1) {
502             # all fragments of this packet sent
503             # delete it..
504 17 100       37 if (scalar @$N) {
505 6         8 shift @$N;
506             } else {
507 11         12 splice(@{$O}, $self->{'O_COUNT'}, 1);
  11         27  
508             }
509             } else {
510             # fragments of this packet left
511             # increase the fragment-id
                # NOTE(review): this branch always indexes O_BUFFER, even if the
                # packet came from N_BUFFER; fire() queues single-element
                # payloads, so that case presumably does not occur - confirm.
512 4         9 $self->{'O_BUFFER'}->[$self->{'O_COUNT'}]->[2]++;
513             # increase the packet id..
514 4         6 $self->{'O_COUNT'}++;
515             }
                # Wrap around to the first queued packet when the index ran past
                # the end.
516 21 100       85 $self->{'O_COUNT'} = 0 unless ( exists $O->[$self->{'O_COUNT'}] );
517 21 100 100     96 if ( @$N || @$O ) {
                # More to send: recurse right away in blocking/anachronism mode,
                # otherwise revoke 'w' on the socket.
518 18 50 33     48 if (BLOCKING() || $Net::PSYC::ANACHRONISM) { # send the packet
519 0         0 $self->write();
520             } else {
521 18         53 revoke($self->{'SOCKET'}, 'w');
522             }
523             }
524 21         226 return 1;
525             }
526              
527             sub read () {
                # read()
                # Reads up to 4096 bytes from the socket, runs the 'decrypt'
                # trigger over them and appends the result to I_BUFFER. On the
                # first data of a connection it also checks for and strips the
                # leading ".\n" line and remembers the line ending in 'LF'.
                # Returns 1 after data was read; returns nothing when sysread
                # yields a false value (error or end of stream) or when the
                # decrypt trigger fails.
528 7     7   15 my $self = shift;
529 7         11 my ($data, $read);
530            
531             # if you change the buffer-size.. remember to fix buffersize of
532             # MMP::Compress and rest..
533 7         90 $read = sysread($self->{'SOCKET'}, $data, 4096);
534            
535 7 50       21 return if (!$read); # connection lost !?
536             # is there a 'proper' way to find out whether the connection is still alive?
537             # connected() and all the other socket functions do not help in
538             # the nasty cases..
539            
540 7 50       44 unless ($self->trigger('decrypt', \$data)) {
541 0         0 W0('Fatal error during decrypt. Closing connection');
542 0         0 return;
543             }
544            
545 7         31 $$self{'I_BUFFER'} .= $data;
                # An undef $read has already returned above, so this warning
                # cannot trigger.
546 7 50       16 warn $! unless (defined($read));
547 7         16 $self->{'I_LENGTH'} += $read;
548             # open(file, ">>$self->{'HOST'}:$self->{'PORT'}.in");
549             # print file $data;
550             # print file "\n========\n";
551             # close file;
552 7         26 W2('TCP: Read %d bytes from socket.', $read);
553 7         133 W2('TCP: >>>>>>>> INCOMING >>>>>>>>\n%s\nTCP: <<<<<<< INCOMING <<<<<<<',
554             $data);
555            
556 7 100       21 unless ($self->{'LF'}) {
557             # we need to check for a leading ".\n"
558             # this is not the very best solution though..
                # The check only runs once more than two bytes have been counted
                # in I_LENGTH.
559 4 100       12 if ($self->{'I_LENGTH'} > 2) {
560 2 50       17 if ( $self->{'I_BUFFER'} =~ s/^\.(\r?\n)//g ) {
561 2         9 $self->{'LF'} = $1;
562             # remember if the other side uses \n or \r\n
563             # to terminate lines.. we need that for proper
564             # and safe parsing
565             } else {
566 0         0 syswrite($self->{'SOCKET'},
567             make_psyc('_error_syntax_initialization',
568             'The protocol begins with a dot on a line by itself.'));
569 0         0 W0('Closed Connection to %s', $self->{'R_HOST'});
                # NOTE(review): shutdown gets $self here but the socket in
                # zlib_init_server, and execution still falls through to
                # "return 1" below - confirm both are intended.
570 0         0 Net::PSYC::shutdown($self);
571             }
572             }
573             }
574            
575 7         36 return 1;
576             }
577              
578             # return undef if packets are incomplete
579             # return 0 if there maybe/are still packets in the buffer
580             # return the packet
581             sub recv () {
                # recv()
                # Parses one MMP packet from I_BUFFER and applies negotiation,
                # source checks and fragment reassembly.
                # Return values as coded below:
                #   empty return    : no 'LF' seen yet, buffer empty, I_LENGTH
                #                     negative, or parse_mmp gave undef / a
                #                     negative value (stored in I_LENGTH)
                #   (-1, $data)     : parse_mmp returned 0
                #   (-1, message)   : the 'receive' trigger failed
                #   -1              : _target or _context/_source not parsable
                #   0               : packet refused, fragment still incomplete,
                #                     or empty payload
                #   ($vars, $data)  : a complete packet
                # The empty prototype has no effect on method calls.
582 27     27   40 my $self = shift;
583            
584 27 100       77 return unless ($self->{'LF'});
585 25 100 66     143 return if ($self->{'I_LENGTH'} < 0 || '' eq $self->{'I_BUFFER'});
586              
587 21         84 my ($vars, $data) = parse_mmp(\$$self{'I_BUFFER'}, $self->{'LF'}, $self);
588              
589 21 50       51 return if (!defined($vars));
590            
                # A negative result is remembered in I_LENGTH, which makes the
                # guard above return early on later calls while it stays negative.
591 21 50       121 if ($vars < 0) {
592 0         0 $self->{'I_LENGTH'} = $vars;
593 0         0 return;
594             }
595              
596 21 50       49 if ($vars == 0) {
597 0         0 return (-1, $data);
598             }
599              
                # Non-FORK mode: _using_modules arrives as a packet variable,
                # either a single name or an array ref of names.
600 21         29 if (!Net::PSYC::FORK) {
601 21 100       168 if (exists $vars->{'_using_modules'}) {
602              
603 2 50       9 unless (ref $vars->{'_using_modules'} eq 'ARRAY') {
604 2 50       15 $self->negotiate($vars->{'_using_modules'})
605             if $vars->{'_using_modules'};
606             } else {
607 0 0       0 map { $_ && $self->negotiate($_) } @{$vars->{'_using_modules'}};
  0         0  
  0         0  
608             }
609             }
610             }
611              
612 21 50       74 return (-1, "Fatal error during receive.")
613             unless ($self->trigger('receive', $vars, \$data));
614              
                # On a connection we opened ('L' false), the first _target seen
                # becomes our own address ('me').
615 21 100 100     122 unless (exists $self->{'me'} || $self->{'L'} || !exists $vars->{'_target'}) {
      66        
616 1         3 $self->{'me'} = $vars->{'_target'};
617 1         4 my $r = parse_uniform($vars->{'_target'});
618 1 50 33     7 if (ref $r && $r->{'host'}) {
                # A failed lookup is only logged; the callback then registers the
                # host name in place of the IP.
619             dns_lookup($r->{'host'},
620             sub {
621 1     1   1 my $ip = shift;
622 1 50       3 unless ($ip) {
623 0         0 W0('Could not resolve %s.', $r->{'host'});
624             }
625 1         6 W0('%s -> %s', $r->{'host'}, $ip);
626 1   33     7 register_host('127.0.0.1', $ip || $r->{'host'});
627 1         8 });
628             } else {
629 0         0 W0('I cannot parse that target: %s. Closing connection.',
630             $vars->{'_target'});
631 0         0 return -1;
632             }
633             }
634            
                # Merge the connection's 'vars' under the packet's own variables.
                # NOTE(review): each() is used only as an emptiness test; it
                # advances the hash iterator without resetting it, so repeated
                # calls may not all see a true value - confirm.
635 21 50       30 $vars = { %{$self->{'vars'}}, %$vars } if (each %{$self->{'vars'}});
  0         0  
  21         68  
636            
637             # TODO return -1 unless trigger().
638             # TODO we have to check _context for consistency anyway! do that or someone
639             # starts killing perlpsycs
640             # these routing schemes are bogus. i would like to use the new ones. should
641             # be easier to do nonblocking dns then. one big change
642 21 100       55 unless (exists $vars->{'_source'}) {
643 20         48 $vars->{'_source'} = $self->{'peeraddr'};
644             } else {
645 1   33     11 my $h = parse_uniform($vars->{'_context'}||$vars->{'_source'});
646 1 50       7 unless (ref $h) {
647 0   0     0 W0('I cannot parse that uni: %s. Closing connection.',
648             $vars->{'_context'} || $vars->{'_source'} );
649 0         0 return -1;
650             }
651            
                # NOTE(review): same_host() is called with a callback in new()
                # but is used for its return value here - confirm it supports
                # both forms.
652 1 50       8 unless (same_host($h->{'host'}, $self->{'R_IP'})) {
653 0 0       0 if ($self->TRUST < 5) {
654             # just dont relay
655 0         0 W0('TCP: Refused packet from %s. (_source: %s)',
656             $self->{'peeraddr'}, $vars->{'_source'});
657 0         0 return 0;
658             }
659             } else {
660             # we will relay for you in the future
661 1         8 register_route($vars->{'_source'}, $self);
662             }
663             }
664             =onion
665             if (exists $vars->{'_source_relay'} && $self->{'_options'}->{'_accept_modules'} =~ /_onion/ && $self->{'r_options'}->{'_accept_modules'} =~ /_onion/) {
666             register_route($vars->{'_source_relay'}, $self);
667             W("_Onion: Use $self->{'R_IP'} to route $vars->{'_source_relay'}",2);
668             # remember pseudo-address to route packets back!
669             }
670             =cut
671             ####
672             # FRAGMENT
673             # handle fragmented data
674             #if (exists $self->{'O'}->{'_understand_modules'}->{'_fragments'}
675             #&& exists $vars->{'_fragment'}) {
676 21 100       58 if (exists $vars->{'_fragment'}) {
677             # {source} {logical target} {counter} [ {fragment} ]
                # The cache key identifies one fragmented packet; each cache
                # entry is [ bookkeeping hash, list of fragments by index ].
678 5   50     43 my $packet_id = '{'.($vars->{'_source'} || '').
      50        
      50        
679             '}{'.($vars->{'_target'} || '').
680             '}{'.($vars->{'_counter'} || '').'}';
681 5 100       15 if (!exists $self->{'CACHE'}->{$packet_id}) {
682 1         7 $self->{'CACHE'}->{$packet_id} = [
683             {
684             '_totalLength' => $vars->{'_totalLength'},
685             '_amount_fragments' => $vars->{'_amount_fragments'},
686             '_amount' => 0,
687             },
688             []
689             ];
690             }
691 5         11 my $v = $self->{'CACHE'}->{$packet_id}->[0];
692 5         11 my $c = $self->{'CACHE'}->{$packet_id}->[1];
693             # increase the counter
                # A slot already holding a true value is not counted again; a
                # fragment whose data is false ('' or '0') would be.
694 5 50       15 $v->{'_amount'}++ if (!$c->[$vars->{'_fragment'}]);
695             #print STDERR "Fragment: $vars->{'_fragment'} (total: $vars->{'_amount_fragments'}, amount: $v->{'_amount'}, id: '$packet_id')\n";
696            
697 5         12 $c->[$vars->{'_fragment'}] = $data;
                # Completion is judged against the _amount_fragments value of
                # the first fragment that created the cache entry.
698 5 100       12 if ($v->{'_amount'} == $v->{'_amount_fragments'}) {
699 1         5 $data = join('', @$c);
700 1         6 delete $self->{'CACHE'}->{$packet_id};
701 1         5 W1('TCP: Fragmented packet complete! length: %d', length($data));
702             } else {
703 4         27 W1('TCP: Fragmented number %d', int($vars->{'_fragment'}));
704 4         21 return 0;
705             }
706             }
707             ####
708 17 100       142 return 0 if ($data eq '');
709            
710 15         63 W1('TCP[%s] => %s', $vars->{'_source'}, $vars->{'_target'});
                # Hand the receiving connection object on with the packet.
711 15         122 $vars->{'_INTERNAL_origin'} = $self;
712 15         56 return ($vars, $data);
713             }
714              
715             sub DESTROY {
                # Destructor: calls shutdown(0) on the socket when one is stored
                # in the object.
716 0     0   0 my $self = shift;
717 0 0       0 $self->{'SOCKET'}->shutdown(0) if $self->{'SOCKET'};
718             }
719              
720             # TCP listen class
721             package Net::PSYC::Circuit::L;
722              
723 1     1   7 use strict;
  1         2  
  1         212  
724              
725             import Net::PSYC qw(W0);
726              
727             sub read () {
                # read() on the listener
                # Accepts one pending connection and wraps it in a connection
                # object via Net::PSYC::Circuit->new(), marking it with 'L' => 1
                # and passing the peer's host and port. Returns 1.
                # The new object is not returned; new() registers it itself.
                # NOTE(review): the result of accept() is not checked; if it
                # returns undef, the peerhost() call below dies.
728 1     1   2 my $self = shift;
729 1         18 my $socket = $self->{'SOCKET'}->accept();
730 1         252 my $obj = Net::PSYC::Circuit->new($socket, {
731             'L' => 1,
732             'R_IP' => $socket->peerhost(),
733             'R_PORT' => $socket->peerport(),
734             });
735 1         8 return 1;
736             }
737              
738 1     1   2 sub recv () { } # a listener never yields packets; empty body
739              
740             sub send {
                # Sending through a listener is not supported; this only logs
                # the message below via W0.
741 0     0     W0("\nTCP: I am listening, not sending! Dont use me that way!");
742             }
743              
744             sub TRUST {
                # A listener has no trust level; this only logs the message
                # below via W0 and returns whatever W0 returns.
745 0     0     W0("\nTCP: Dont TRUST() me, I'm only listening.");
746             }
747              
748             1;