File Coverage

blib/lib/AnyEvent/Handle/UDP.pm
Criterion Covered Total %
statement 131 181 72.3
branch 30 74 40.5
condition 11 32 34.3
subroutine 41 50 82.0
pod 22 25 88.0
total 235 362 64.9


line stmt bran cond sub pod time code
1             package AnyEvent::Handle::UDP;
2             $AnyEvent::Handle::UDP::VERSION = '0.050';
3 2     2   143733 use strict;
  2         14  
  2         59  
4 2     2   11 use warnings;
  2         4  
  2         47  
5              
6 2     2   2055 use AnyEvent ();
  2         11118  
  2         54  
7 2     2   1082 use AnyEvent::Util ();
  2         23816  
  2         53  
8 2     2   1179 use AnyEvent::Socket ();
  2         30512  
  2         64  
9              
10 2     2   16 use Carp ();
  2         5  
  2         29  
11 2     2   11 use Errno ();
  2         4  
  2         27  
12 2     2   10 use Scalar::Util ();
  2         4  
  2         29  
13 2     2   10 use Socket ();
  2         4  
  2         24  
14 2     2   1097 use Symbol ();
  2         1551  
  2         784  
15              
16             my $subname = eval { require Sub::Name } ? \&Sub::Name::subname : sub { $_[1] };
17             my %non_fatal = map { ( $_ => 1 ) } Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR;
18              
19             sub new {
20 4     4 1 10739 my ($class, %args) = @_;
21             my $self = bless {
22             on_recv => $args{on_recv} || Carp::croak('on_recv not given'),
23             reuse_addr => exists $args{reuse_addr} ? !!$args{reuse_addr} : 1,
24             receive_size => $args{receive_size} || 1500,
25             family => $args{family} || 0,
26             autoflush => $args{autoflush} || 0,
27       3     on_bind => $args{on_bind} || sub {},
28       1     on_connect => $args{on_connect} || sub {},
29 4 50 33     171 fh => $args{fh} || bless(Symbol::gensym(), 'IO::Socket'),
      50        
      50        
      50        
      50        
      50        
      50        
30             buffers => [],
31             }, $class;
32 4         110 $self->{$_} = $args{$_} for grep { exists $args{$_} } qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/;
  20         52  
33 4         36 $self->{$_} = AE::now() for qw/activity ractivity wactivity/;
34              
35 4 100       21 $self->_bind_to($self->{fh}, $args{bind}) if exists $args{bind};
36 4 100       18 $self->_connect_to($self->{fh}, $args{connect}) if exists $args{connect};
37              
38 4         11 $self->$_($args{$_}) for grep { exists $args{$_} } qw/timeout rtimeout wtimeout/;
  12         35  
39              
40 4         15 $self->_drained;
41 4         16 return $self;
42             }
43              
44             sub _insert {
45 36     36   65 my ($name, $sub) = @_;
46 2     2   20 no strict 'refs';
  2         4  
  2         4832  
47 36         149 *{$name} = $subname->($name, $sub);
  36         146  
48             }
49              
50             for my $name (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
51             _insert($name, sub {
52 1     1 1 42 my $self = shift;
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
53 1 50       8 $self->{$name} = shift if @_;
54 1         2 return $self->{$name};
55             });
56             }
57              
58             for my $name (qw/sockname peername/) {
59             _insert($name, sub {
60 2     2 1 10 my $self = shift;
        2 1    
61 2         10 return $self->{fh}->$name;
62             });
63             }
64              
65             sub fh {
66 0     0 1 0 my $self = shift;
67 0         0 return $self->{fh};
68             }
69              
70             sub on_drain {
71 0     0 1 0 my $self = shift;
72 0 0       0 if (@_) {
73 0         0 $self->{on_drain} = shift;
74 0 0       0 $self->_drained if not @{ $self->{buffers} };
  0         0  
75             }
76 0         0 return $self->{on_drain};
77             }
78              
79             sub _drained {
80 4     4   8 my $self = shift;
81 4 50       14 $self->{on_drain}->($self) if defined $self->{on_drain};
82             }
83              
84             for my $dir ('', 'r', 'w') {
85             my $timeout = "${dir}timeout";
86             my $activity = "${dir}activity";
87             my $on_timeout = "on_$timeout";
88             my $timer = "${dir}timer";
89             my $clear_timeout = "clear_$timeout";
90             my $timeout_reset = "${timeout}_reset";
91              
92             my $callback;
93             $callback = sub {
94             my $self = shift;
95             if (not exists $self->{$timeout} or not $self->{fh}) {
96             delete $self->{$timer};
97             return;
98             }
99             my $now = AE::now;
100             my $after = $self->{$activity} + $self->{$timeout} - $now;
101             if ($after <= 0) {
102             $self->{$activity} = $now;
103             my $time = $self->{$on_timeout};
104             my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
105             $time ? $time->($self) : $self->_error(0, $error);
106             return if not exists $self->{$timeout};
107             }
108             Scalar::Util::weaken($self);
109             return if not $self;
110             $self->{$timer} = AE::timer($after, 0, sub {
111             delete $self->{$timer};
112             $callback->($self);
113             });
114             };
115              
116             _insert($timeout, sub {
117 2     2 1 4 my $self = shift;
        2 1    
        2 1    
118 2 50       7 if (@_) {
119 2         3 my $value = shift;
120 2         7 $self->{$timeout} = $value;
121 2 50       6 if ($value == 0) {
122 0         0 delete $self->{$timer};
123 0         0 delete $self->{$timeout};
124 0         0 return;
125             }
126             else {
127 2         5 $callback->($self);
128             }
129             }
130 2         9 return $self->{$timeout};
131             });
132              
133             _insert($clear_timeout, sub {
134 0     0 0 0 my $self = shift;
        0 0    
        0 0    
135 0         0 delete $self->{$timeout};
136 0         0 return;
137             });
138              
139             _insert($timeout_reset, sub {
140 11     11 1 1522 my $self = shift;
        11 1    
        11 1    
141 11         33 $self->{$activity} = AE::now;
142             });
143             }
144              
145             sub bind_to {
146 0     0 1 0 my ($self, $addr) = @_;
147 0         0 return $self->_bind_to($self->{fh}, $addr);
148             }
149              
150             my $add_reader = sub {
151             my $self = shift;
152             $self->{reader} = AE::io($self->{fh}, 0, sub {
153             while (exists $self->{reader} and defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
154             $self->timeout_reset;
155             $self->rtimeout_reset;
156             $self->{on_recv}->($buffer, $self, $addr);
157             }
158             $self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
159             return;
160             });
161             };
162              
163             sub _bind_to {
164 3     3   9 my ($self, $fh, $addr) = @_;
165             my $bind_to = sub {
166 3     3   8 my ($domain, $type, $proto, $sockaddr) = @_;
167 3 50       11 if (!Scalar::Util::openhandle($fh)) {
168 3 50       123 socket $fh, $domain, $type, $proto or die "Could not create socket: $!";
169 3         22 AnyEvent::Util::fh_nonblocking $fh, 1;
170 3 50 50     73 setsockopt $fh, Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 or die "Couldn't set so_reuseaddr: $!" if $self->{reuse_addr};
171 3         13 $add_reader->($self);
172             }
173 3 50       66 if (bind $fh, $sockaddr) {
174 3         11 $self->{on_bind}->();
175             }
176             else {
177 0         0 die "Could not bind: $!";
178             }
179 3         14 };
180 3 50       11 if (ref $addr) {
181 3         6 my ($host, $port) = @{$addr};
  3         8  
182 3         9 _on_addr($self, $fh, $host, $port, $bind_to);
183             }
184             else {
185 0 0       0 eval { $bind_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr); 1 }
  0         0  
  0         0  
186             or $self->_error(1, $@);
187             }
188 3         19 return;
189             }
190              
191             sub connect_to {
192 0     0 1 0 my ($self, $addr) = @_;
193 0         0 return $self->_connect_to($self->{fh}, $addr);
194             }
195              
196             sub _connect_to {
197 1     1   4 my ($self, $fh, $addr) = @_;
198             my $connect_to = sub {
199 1     1   3 my ($domain, $type, $proto, $sockaddr) = @_;
200 1 50       5 if (!Scalar::Util::openhandle($fh)) {
201 1 50       30 socket $fh, $domain, $type, $proto or die "Could not create socket: $!";
202 1         5 AnyEvent::Util::fh_nonblocking $fh, 1;
203 1         10 $add_reader->($self);
204             }
205 1 50       15 if (connect $fh, $sockaddr) {
206 1         4 $self->{on_connect}->();
207             }
208             else {
209 0         0 die "Could not connect: $!";
210             }
211 1         5 };
212 1 50       3 if (ref $addr) {
213 0         0 my ($host, $port) = @{$addr};
  0         0  
214 0         0 _on_addr($self, $fh, $host, $port, $connect_to);
215             }
216             else {
217 1 50       3 eval { $connect_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr); 1 }
  1         5  
  1         4  
218             or $self->_error(1, $@);
219             }
220 1         7 return;
221             }
222              
223             my $get_family = sub {
224             my ($self, $fh) = @_;
225             return $self->{family} if !Scalar::Util::openhandle($fh) || !getsockname $fh;
226             my $family = Socket::sockaddr_family(getsockname $fh);
227             return +($family == Socket::AF_INET) ? 4 : ($family == Socket::AF_INET6) ? 6 : $self->{family};
228             };
229              
230             sub _on_addr {
231 3     3   8 my ($self, $fh, $host, $port, $on_success) = @_;
232              
233             AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', $get_family->($self, $fh), Socket::SOCK_DGRAM, sub {
234 3     3   889 my @targets = @_;
235 3         11 while (@targets) {
236 3         7 my $target = shift @targets;
237 3 50       7 eval { $on_success->(@{$target}); 1 } and return;
  3         5  
  3         10  
  3         17  
238             }
239 0         0 $self->_error(1, "Could not resolve $host:$port")
240 3         9 });
241 3         112 return;
242             }
243              
244             sub _error {
245 0     0   0 my ($self, $fatal, $message) = @_;
246              
247 0 0       0 if (exists $self->{on_error}) {
248 0         0 $self->{on_error}->($self, $fatal, $message);
249 0 0       0 $self->destroy if $fatal;
250             } else {
251 0         0 $self->destroy;
252 0         0 Carp::croak("AnyEvent::Handle::UDP uncaught error: $message");
253             }
254 0         0 return;
255             }
256              
257             sub push_send {
258 2     2 1 602 my ($self, $message, $to, $cv) = @_;
259 2 0       8 $to = AnyEvent::Socket::pack_sockaddr($to->[1], defined $to->[0] ? AnyEvent::Socket::parse_address($to->[0]) : Socket::INADDR_ANY) if ref $to;
    50          
260 2 50 0     6 $cv ||= AnyEvent::CondVar->new if defined wantarray;
261 2 50 33     8 if ($self->{autoflush} and ! @{ $self->{buffers} }) {
  0         0  
262 0         0 my $ret = $self->_send($message, $to, $cv);
263 0 0 0     0 $self->_push_writer($message, $to, $cv) if not defined $ret and $non_fatal{$! + 0};
264 0 0       0 $self->_drained if $ret;
265             }
266             else {
267 2         6 $self->_push_writer($message, $to, $cv);
268             }
269 2         16 return $cv;
270             }
271              
272             sub _send {
273 2     2   5 my ($self, $message, $to, $cv) = @_;
274 2 100       67 my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0;
275 2 0 33     10 $self->_error(1, "Could not send: $!") if not defined $ret and !$non_fatal{$! + 0};
276 2 50       13 if (defined $ret) {
277 2         8 $self->timeout_reset;
278 2         7 $self->wtimeout_reset;
279 2 50       5 $cv->($ret) if defined $cv;
280             }
281 2         4 return $ret;
282             }
283              
284             sub _push_writer {
285 2     2   5 my ($self, $message, $to, $condvar) = @_;
286 2         4 push @{ $self->{buffers} }, [ $message, $to, $condvar ];
  2         8  
287             $self->{writer} ||= AE::io $self->{fh}, 1, sub {
288 2 50   2   49 if (@{ $self->{buffers} }) {
  2         8  
289 2         4 while (my $entry = shift @{ $self->{buffers} }) {
  4         12  
290 2         4 my ($msg, $to, $cv) = @{$entry};
  2         5  
291 2         7 my $ret = $self->_send($msg, $to, $cv);
292 2 50       9 if (not defined $ret) {
293 0 0       0 unshift @{ $self->{buffers} }, $entry if $self->{buffers};
  0         0  
294 0         0 last;
295             }
296             }
297             }
298             else {
299 0         0 $self->_drained;
300             }
301 2 50       4 delete $self->{writer} if not @{ $self->{buffers} };
  2         31  
302 2   33     47 };
303 2         6 return $condvar;
304             }
305              
306             sub destroy {
307 0     0 1   my $self = shift;
308 0           delete @{$self}{qw/reader writer/};
  0            
309 0           close $self->{fh};
310 0           $self->timeout(0);
311 0           $self->rtimeout(0);
312 0           $self->wtimeout(0);
313 0           return;
314             }
315              
316             1;
317              
318             # ABSTRACT: client/server UDP handles for AnyEvent
319              
320             __END__