File Coverage

blib/lib/AnyEvent/PacketForwarder.pm
Criterion Covered Total %
statement 18 64 28.1
branch 0 20 0.0
condition 0 8 0.0
subroutine 6 14 42.8
pod 1 2 50.0
total 25 108 23.1


line stmt bran cond sub pod time code
1             package AnyEvent::PacketForwarder;
2              
3 1     1   21153 use strict;
  1         3  
  1         42  
4 1     1   5 use warnings;
  1         2  
  1         84  
5              
6             our $VERSION = '0.01';
7              
8             require Exporter;
9             our @ISA = qw(Exporter);
10             our @EXPORT = qw(packet_forwarder);
11              
12 1     1   1745 use AnyEvent;
  1         6644  
  1         36  
13 1     1   1082 use AnyEvent::PacketReader;
  1         15421  
  1         59  
14 1     1   9 use Errno qw(EPIPE EMSGSIZE EINTR EAGAIN EWOULDBLOCK ENODATA);
  1         2  
  1         59  
15 1     1   6 use Carp;
  1         2  
  1         653  
16             our @CARP_NOT = qw(AnyEvent::PacketReader);
17              
18             our $QUEUE_SIZE = 10;
19              
20             sub packet_forwarder {
21 0     0 1   my $cb = pop;
22 0           my ($in, $out, $templ, $max_load_length, $queue_size) = @_;
23 0   0       $queue_size ||= $QUEUE_SIZE;
24              
25             # data is: 0:reader, 1:out, 2:queue_size, 3:queue, 4:cb, 5:out_watcher
26 0           my $data = [ undef , $out , $queue_size , [] , $cb , undef ];
27 0     0     $data->[0] = packet_reader $in, $templ, $max_load_length, sub { _packet($_[0], $data) };
  0            
28              
29 0           my $obj = \$data;
30 0           bless $obj;
31             }
32              
33             sub _push {
34 0     0     my $data = $_[1];
35             # use Data::Dumper;
36             # print STDERR Data::Dumper->Dump([\@_, $data], [qw(@_ $data)]);
37 0 0         if (length $_[0]) {
38 0           my $queue = $data->[3];
39 0           push @$queue, $_[0];
40 0 0         $data->[0]->pause if @$queue == $data->[2];
41 0   0 0     $data->[5] ||= AE::io $data->[1], 1, sub { _write($data) };
  0            
42             }
43             }
44              
45             sub _packet {
46 0     0     my $data = $_[1];
47 0 0         if (defined $_[0]) {
48             # use Data::Dumper;
49             # print STDERR Data::Dumper->Dump([$data], [qw($data)]);
50 0           $data->[4]->($_[0]);
51 0           _push(@_);
52 0           return;
53             }
54 0           $data->[4]->();
55 0 0         _fatal_write($data, ENODATA) unless defined $data->[5];
56 0           undef $data->[0];
57             }
58              
59             sub _write {
60 0     0     my $data = shift;
61 0           my $queue = $data->[3];
62 0           while (@$queue) {
63 0 0         unless (length $queue->[0]) {
64 0 0         $data->[0]->resume if @$queue == $data->[2];
65 0           shift @$queue;
66 0           next;
67             }
68              
69 0           my $bytes = syswrite($data->[1], $queue->[0]);
70 0 0         if ($bytes) {
    0          
71 0           substr($queue->[0], 0, $bytes, '');
72             }
73             elsif (defined $bytes) {
74 0           _fatal_write($data, EPIPE);
75             }
76             else {
77 0   0       $! == $_ and return for (EINTR, EAGAIN, EWOULDBLOCK);
78 0           _fatal_write($data);
79             }
80 0           return;
81             }
82 0 0         unless (defined $data->[0]) {
83 0           return _fatal_write($data, ENODATA);
84             }
85 0           undef $data->[5];
86             }
87              
88             sub _fatal_write {
89 0     0     my $data = shift;
90 0 0         local $! = shift if @_;
91 0           $data->[4]->(undef, 1);
92             }
93              
94             sub push {
95 0     0 0   my $data = ${shift()};
  0            
96 0           _push($_[0], $data);
97             }
98              
99             1;
100             __END__