File Coverage

lib/AnyEvent/Connection/Raw.pm
Criterion Covered Total %
statement 67 135 49.6
branch 14 64 21.8
condition 1 8 12.5
subroutine 14 25 56.0
pod 1 9 11.1
total 97 241 40.2


line stmt bran cond sub pod time code
1             package #hide
2             AnyEvent::Connection::Raw;
3              
4 2     2   13 use common::sense 2;m{
  2         68  
  2         18  
5             use strict;
6             use warnings;
7             };
8 2     2   281 use Object::Event 1.21;
  2         40  
  2         193  
9 2     2   13 use base 'Object::Event';
  2         4  
  2         216  
10 2     2   4218 use AnyEvent::Handle;
  2         42496  
  2         117  
11 2     2   1242 use AnyEvent::Connection::Util;
  2         6  
  2         13  
12 2     2   10 use Scalar::Util qw(weaken);
  2         3  
  2         85  
13 2     2   8 use Carp;
  2         4  
  2         15165  
14             # @rewrite s/^# //;
15             # use Devel::Leak::Cb;
16              
17             sub _call_waiting {
18 4     4   6 my $me = shift;
19 4         8 for my $k (keys %{ $me->{waitingcb} }) {
  4         38  
20 1 50       5 warn "call waiting $k with @_" if $me->{debug};
21 1 50       5 if ($me->{waitingcb}{$k}) {
22 1         5 $me->{waitingcb}{$k}->(undef, @_);
23             }
24 1         9731 delete $me->{waitingcb}{$k};
25             }
26             }
27              
28             sub new {
29 4     4 1 12 my $pkg = shift;
30 4         46 my $self = $pkg->SUPER::new(@_);
31 4 50       430 $self->{nl} = "\015\012" unless defined $self->{nl};
32 4 50       16 $self->{debug} = 0 unless defined $self->{debug};
33 4         16 weaken(my $me = $self);
34             # @rewrite s/sub /cb 'conn.cb.eof' /;
35             $self->{cb}{eof} = sub {
36 0 0   0   0 $me or return;
37             #local *__ANON__ = 'conn.cb.eof';
38 0         0 warn "[\U$me->{side}\E] Eof on handle";
39 0         0 delete $me->{h};
40 0         0 $me->event('disconnect');
41 0         0 $me->_call_waiting("EOF from handle");
42 4         37 } ;
43             # @rewrite s/sub /cb 'conn.cb.err' /;
44             $self->{cb}{err} = sub {
45 1 50   1   1569 $me or return;
46             #local *__ANON__ = 'conn.cb.err';
47             #use Carp;Carp::cluck((0+$!).": $!");
48 1         5 my $e = "$!";
49 1 50       7 if ( $me->{destroying} ) {
50 0         0 warn "err on destroy";
51 0         0 $e = "Connection closed";
52             } else {
53             #warn "[\U$me->{side}\E] Error on handle: $e"; # uncomment
54             }
55 1         3 delete $me->{h};
56 1         16 $self->event( disconnect => "Error: $e" );
57 1         76 $me->_call_waiting($e);
58 4         70 };
59 4   50     14 $self->{timeout} ||= 30;
60 4         57 $self->{h} = AnyEvent::Handle->new(
61             fh => $self->{fh},
62             autocork => 1,
63             keepalive => 1,
64             on_eof => $self->{cb}{eof},
65             on_error => $self->{cb}{err},
66             );
67 4         505 $self;
68             }
69              
70             sub destroy {
71 4     4 0 10 my ($self) = @_;
72 4         20 $self->DESTROY;
73 4         32 bless $self, "AnyEvent::Connection::Raw::destroyed";
74             }
75             *close = \&destroy;
76 1     1   4 sub AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
77             sub DESTROY {
78 4     4   98 my $self = shift;
79 4 50       30 warn "(".int($self).") Destroying AE::CNN::Raw" if $self->{debug};
80 4         13 delete $self->{fh};
81 4         31 $self->_call_waiting("destroying connection");
82 4 100       49 $self->{h} and $self->{h}->destroy;
83 4         114 delete $self->{h};
84 4         88 %$self = ();
85 4         12 return;
86             }
87              
88             sub push_write {
89 0     0 0 0 my $self = shift;
90 0 0       0 $self->{h} or return;
91 0         0 for (@_) {
92 0 0 0     0 if (!ref and utf8::is_utf8($_)) {
93 0         0 $_ = $_;
94 0         0 utf8::encode $_;
95             }
96             }
97 0         0 $self->{h}->push_write(@_);
98 0 0       0 warn ">> @_ " if $self->{debug};
99             }
100              
101             sub push_read {
102 0     0 0 0 my $self = shift;
103 0         0 my $cb = pop;
104 0 0       0 $self->{h} or return;
105 0 0       0 $self->{h}->timeout($self->{timeout}) if $self->{timeout};
106             $self->{h}->push_read(@_,sub {
107 0     0   0 shift->timeout(undef); # disable timeout and remove handle from @_
108 0         0 $cb->($self,@_);
109 0         0 undef $cb;
110 0         0 });
111             }
112              
113             sub unshift_read {
114 0     0 0 0 my $self = shift;
115 0 0       0 $self->{h} or return;
116 0         0 $self->{h}->unshift_read(@_);
117             }
118              
119             sub say {
120 0     0 0 0 my $self = shift;
121 0 0       0 $self->{h} or return;
122 0         0 for (@_) {
123 0 0 0     0 if (!ref and utf8::is_utf8($_)) {
124 0         0 $_ = $_;
125 0         0 utf8::encode $_;
126             }
127             }
128 0         0 $self->{h}->push_write("@_$self->{nl}");
129 0 0       0 warn ">> @_ " if $self->{debug};
130 0         0 return;
131             }
132             *reply = \&say;
133              
134             sub recv {
135 0     0 0 0 my ($self,$bytes,%args) = @_;
136 0 0       0 $args{cb} or croak "no cb for recv at @{[ (caller)[1,2] ]}";
  0         0  
137 0 0       0 $self->{h} or return $args{cb}(undef,"Not connected");
138 0 0       0 warn "<+ read $bytes " if $self->{debug};
139 0         0 weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
140             $self->{h}->unshift_read( chunk => $bytes, sub {
141             #local *__ANON__ = 'conn.recv.read';
142             # Also eat CRLF or LF from read buffer
143 0 0   0   0 substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\015";
144 0 0       0 substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\012";
145 0         0 delete $self->{waitingcb}{int $args{cb}};
146 0         0 shift; (delete $args{cb})->(@_);
  0         0  
147 0         0 %args = ();
148 0         0 } );
149             }
150              
151             sub command {
152 1     1 0 7041 my $self = shift;
153 1         4 my $write = shift;
154 1 50       7 if (utf8::is_utf8($write)) {
155 0         0 utf8::encode $write;
156             }
157 1         4 my %args = @_;
158 1 50       6 $args{cb} or croak "no cb for command at @{[ (caller)[1,2] ]}";
  0         0  
159 1 50       4 $self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
160 1         20 weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
161            
162             #my $i if 0;
163             #my $c = ++$i;
164 1 50       6 warn ">> $write " if $self->{debug};
165 1         13 $self->{h}->push_write("$write$self->{nl}");
166             #$self->{h}->timeout( $self->{select_timeout} );
167 1 50       89 warn "{debug};
168             # @rewrite s/sub {/cb 'conn.command.read' {/;
169             $self->{h}->push_read( regex => qr<\015?\012>, sub {
170             #local *__ANON__ = 'conn.command.read';
171 0     0     shift;
172 0           for (@_) {
173 0           chomp;
174 0 0         substr($_,-1,1) = '' if substr($_, -1,1) eq "\015";
175             }
176 0 0         warn "<< @_ " if $self->{debug};
177 0           delete $self->{waitingcb}{int $args{cb}};
178 0           delete($args{cb})->(@_);
179 0           %args = ();
180 0           undef $self;
181 1         31 } );
182             #sub {
183             #$self->{state}{handle}->timeout( 0 ) if $self->_qsize < 1;
184             #diag "<< $c. $write: $_[1] (".$self->_qsize."), timeout ".($self->{state}{handle}->timeout ? 'enabled' : 'disabled');
185             #$cb->(@_);
186             #});
187             }
188              
189             # Serverside feature
190             sub want_command {
191 0     0 0   my $self = shift;
192 0 0         $self->{h} or return;
193             # @rewrite s/sub {/cb 'conn.wand_command.read' {/;
194             $self->{h}->push_read( regex => qr<\015?\012>, sub {
195             #local *__ANON__ = 'conn.want_command.read';
196 0     0     shift;
197 0           for (@_) {
198 0           chomp;
199 0 0         substr($_,-1,1) = '' if substr($_, -1,1) eq "\015";
200             }
201 0           $self->event( command => @_ );
202 0           $self->want_command;
203 0           });
204             }
205              
206             1;