File Coverage

blib/lib/AnyEvent/PacketReader.pm
Criterion Covered Total %
statement 24 124 19.3
branch 0 76 0.0
condition 0 16 0.0
subroutine 8 16 50.0
pod 1 3 33.3
total 33 235 14.0


line stmt bran cond sub pod time code
1             package AnyEvent::PacketReader;
2              
3             our $VERSION = '0.01';
4              
5 1     1   18998 use strict;
  1         1  
  1         31  
6 1     1   5 use warnings;
  1         3  
  1         27  
7 1     1   18 use 5.010;
  1         8  
  1         74  
8              
9             require Exporter;
10             our @ISA = qw(Exporter);
11             our @EXPORT = qw(packet_reader);
12              
13 1     1   1347 use AnyEvent;
  1         4955  
  1         24  
14 1     1   5 use Carp;
  1         1  
  1         81  
15 1     1   819 use Errno qw(EPIPE EBADMSG EMSGSIZE EINTR EAGAIN EWOULDBLOCK);
  1         1315  
  1         272  
16              
17             our $MAX_TOTAL_LENGTH = 1e6;
18              
19             our $debug;
20              
21             my %header_length = ( n => 2,
22             v => 2,
23             N => 4,
24             V => 4,
25             W => 1,
26             S => 2,
27             L => 4,
28             Q => 8 );
29              
30             for my $dir (qw(> <)) {
31             for my $t (qw(S L Q)) {
32             $header_length{"$t$dir"} = $header_length{$t};
33             }
34             }
35              
36             my %short_templ = map { $_ => $_ } keys %header_length;
37             my %load_offset = %header_length;
38             my $good_packers = join '', keys %header_length;
39              
40 1     1   878 use Data::Dumper;
  1         8466  
  1         680  
41             $SIG{INT} = sub {
42             print Data::Dumper->Dump([\%short_templ, \%header_length, \%load_offset], [qw(%short_templ %header_length %load_offset)]);
43             exit 1;
44             };
45              
46             sub packet_reader {
47 0     0 1   my $cb = pop;
48 0           my ($fh, $templ, $max_total_length) = @_;
49 0 0 0       croak 'Usage: packet_reader($fh, [$templ, [$max_total_length,]] $callback)'
50             unless defined $fh and defined $cb;
51              
52 0   0       $max_total_length ||= $MAX_TOTAL_LENGTH;
53 0           my ($header_length, $load_offset, $short_templ);
54              
55 0 0         if (defined $templ) {
56 0 0         unless (defined($short_templ = $short_templ{$templ})) {
57 0 0         $debug and warn "PR: examining template '$templ'\n";
58 0           my $load_offset;
59 0 0         if ($templ =~ /^(x+)(\d*)/g) {
    0          
60 0 0         $header_length = length($1) + (length $2 ? $2 - 1 : 0);
61             }
62             elsif ($templ =~ /^\@!(\d*)/g) {
63 0 0         $header_length = (length $1 ? $1 : 1);
64             }
65             else {
66 0           $header_length = 0;
67             }
68              
69 0 0         $templ =~ /\G([$good_packers][<>]?)/go
70             or croak "bad header template '$templ'";
71              
72 0   0       $header_length += ($header_length{$1} // die "Internal error: \$header_length{$1} is not defined");
73              
74 0           $short_templ = substr $templ, 0, pos($templ);
75              
76              
77 0 0         if ($templ =~ /\G\@!(\d*)/g) {
78 0 0         $load_offset = (length $1 ? $1 : 1);
79             }
80             else {
81 0           $load_offset = $header_length;
82 0 0         if ($templ =~ /\G(x+)(\d*)/g) {
83 0 0         $load_offset += length $1 + (length $2 ? $2 - 1 : 0);
84             }
85             }
86              
87 0 0         $templ =~ /\G$/g or croak "bad header template '$templ'";
88              
89 0           $short_templ{$templ} = $short_templ;
90 0           $header_length{$templ} = $header_length;
91 0           $load_offset{$templ} = $load_offset;
92 0 0         $debug and warn "PR: template '$templ' examined, header_length: $header_length, load_offset: $load_offset\n";
93             }
94              
95 0           $header_length = $header_length{$templ};
96 0           $load_offset = $load_offset{$templ};
97              
98             }
99             else {
100 0 0         $debug and warn "PR: defaulting to template 'N'\n";
101 0           $templ = 'N';
102 0           $header_length = 4;
103 0           $load_offset = 4;
104             }
105              
106             # data is: 0:buffer, 1:fh, 2:watcher, 3:header_length, 4:total_length, 5:short_templ, 6:max_total_length, 7:cb, 8:load_offset
107 0           my $data = ['' , $fh , undef , $header_length , undef , $short_templ , $max_total_length , $cb , $load_offset ];
108 0           my $obj = \$data;
109 0           bless $obj;
110 0           $obj->resume;
111 0           $obj;
112             }
113              
114             sub pause {
115 0     0 0   my $data = ${shift()};
  0            
116 0           $data->[2] = undef;
117             }
118              
119             sub resume {
120 0     0 0   my $data = ${shift()};
  0            
121 0 0         if (defined(my $fh = $data->[1])) {
122 0     0     $data->[2] = AE::io $fh, 0, sub { _read($data) };
  0            
123             }
124             }
125              
126             sub DESTROY {
127 0     0     my $obj = shift;
128 0 0 0       $debug and warn "PR: watcher is gone, aborting read\n" if ${$obj}->[3];
  0            
129 0           @{$$obj} = ();
  0            
130             }
131              
132             sub _hexdump {
133 0     0     local ($!, $@);
134 1     1   6 no warnings qw(uninitialized);
  1         1  
  1         750  
135 0           while ($_[0] =~ /(.{1,32})/smg) {
136 0           my $line = $1;
137 0           my @c= (( map { sprintf "%02x",$_ } unpack('C*', $line)),
  0            
138             ((" ") x 32))[0..31];
139 0 0         $line=~s/(.)/ my $c=$1; unpack("c",$c)>=32 ? $c : '.' /egms;
  0            
  0            
140 0           print STDERR "$_[1] ", join(" ", @c, '|', $line), "\n";
141             }
142 0           print STDERR "\n";
143              
144             }
145              
146             sub _read {
147 0     0     my $data = shift;
148 0   0       my $length = $data->[4] || $data->[3];
149 0           my $offset = length $data->[0];
150 0           my $remaining = $length - $offset;
151 0           my $bytes = sysread($data->[1], $data->[0], $remaining, $offset);
152 0 0         if ($bytes) {
    0          
153 0 0         $debug and warn "PR: $bytes bytes read\n";
154 0 0         if (length $data->[0] == $length) {
155 0 0         unless (defined $data->[4]) {
156 0           my $load_length = unpack $data->[5], $data->[0];
157 0 0         unless (defined $load_length) {
158 0 0         $debug and warn "PR: unable to extract size field from header\n";
159 0           return _fatal($data, EBADMSG);
160             }
161 0           my $total_length = $data->[8] + $load_length;
162 0 0         $debug and warn "PR: reading full packet ".
163             "(load length: $load_length, total: $total_length, current: $length)\n";
164              
165 0 0         if ($total_length > $data->[6]) {
166 0 0         $debug and warn "PR: received packet is too long\n";
167 0           return _fatal($data, EMSGSIZE)
168             }
169 0 0         if ($length < $total_length) {
170 0           $data->[4] = $total_length;
171 0           return;
172             }
173             # else, the packet is done
174 0 0         if ($length > $total_length) {
175 0 0         $debug and warn "PR: header length ($length) > total length ($total_length)\n";
176 0           return _fatal($data, EBADMSG);
177             }
178             }
179              
180 0 0         $debug and warn "PR: packet read, invoking callback\n";
181 0           $data->[7]->($data->[0]);
182             # somebody may have taken a reference to the buffer so we start clean:
183 0           @$data = ('', @$data[1..3], undef, @$data[5..$#$data]);
184 0 0         $debug and warn "PR: waiting for a new packet\n";
185             }
186             }
187             elsif (defined $bytes) {
188 0 0         $debug and warn "PR: EOF!\n";
189 0           return _fatal($data, EPIPE);
190             }
191             else {
192 0 0         $debug and warn "PR: sysread failed: $!\n";
193 0   0       $! == $_ and return for (EINTR, EAGAIN, EWOULDBLOCK);
194 0           return _fatal($data);
195             }
196             }
197              
198             sub _fatal {
199 0     0     my $data = shift;
200 0 0         local $! = shift if @_;
201 0 0         if ($debug) {
202 0           warn "PR: fatal error: $!\n";
203 0           _hexdump($data->[0], 'pkt:');
204             }
205 0           $data->[7]->();
206 0           @$data = (); # release watcher;
207             }
208              
209             1;
210             __END__