File Coverage

blib/lib/AnyEvent/Handle/UDP.pm
Criterion Covered Total %
statement 132 178 74.1
branch 30 72 41.6
condition 8 26 30.7
subroutine 41 50 82.0
pod 22 25 88.0
total 233 351 66.3


line stmt bran cond sub pod time code
1             package AnyEvent::Handle::UDP;
2             $AnyEvent::Handle::UDP::VERSION = '0.048';
3 2     2   29031 use strict;
  2         2  
  2         44  
4 2     2   6 use warnings;
  2         2  
  2         36  
5              
6 2     2   1726 use AnyEvent ();
  2         7563  
  2         40  
7 2     2   933 use AnyEvent::Util ();
  2         17548  
  2         39  
8 2     2   1006 use AnyEvent::Socket ();
  2         19880  
  2         48  
9              
10 2     2   11 use Carp ();
  2         2  
  2         22  
11 2     2   6 use Errno ();
  2         2  
  2         18  
12 2     2   5 use Scalar::Util ();
  2         2  
  2         18  
13 2     2   5 use Socket ();
  2         2  
  2         18  
14 2     2   824 use Symbol ();
  2         1218  
  2         102  
15              
16             BEGIN {
17 2 50   2   3 *subname = eval { require Sub::Name } ? \&Sub::Name::subname : sub { $_[1] };
  2         382  
  0         0  
18             }
19 2     2   1237 use namespace::clean;
  2         19909  
  2         7  
20             my %non_fatal = map { ( $_ => 1 ) } Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR;
21              
22             sub new {
23 4     4 1 6284 my ($class, %args) = @_;
24             my $self = bless {
25             on_recv => $args{on_recv} || Carp::croak('on_recv not given'),
26             reuse_addr => exists $args{reuse_addr} ? !!$args{reuse_addr} : 1,
27             receive_size => $args{receive_size} || 1500,
28             family => $args{family} || 0,
29 4 50 33     113 autoflush => $args{autoflush} || 0,
      50        
      50        
      50        
30             buffers => [],
31             }, $class;
32 4         7 $self->{$_} = $args{$_} for grep { exists $args{$_} } qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/;
  20         34  
33 4         36 $self->{$_} = AE::now() for qw/activity ractivity wactivity/;
34              
35 4         11 $self->{fh} = bless Symbol::gensym(), 'IO::Socket';
36 4 100       58 $self->_bind_to($self->{fh}, $args{bind}) if exists $args{bind};
37 4 100       13 $self->_connect_to($self->{fh}, $args{connect}) if exists $args{connect};
38              
39 4         7 $self->$_($args{$_}) for grep { exists $args{$_} } qw/timeout rtimeout wtimeout/;
  12         25  
40              
41 4         8 $self->_drained;
42 4         9 return $self;
43             }
44              
45             sub _insert {
46 36     36   34 my ($name, $sub) = @_;
47 2     2   678 no strict 'refs';
  2         2  
  2         3174  
48 36         82 *{$name} = subname $name, $sub;
  36         108  
49             }
50              
51             for my $name (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
52             _insert($name, sub {
53 1     1 1 37 my $self = shift;
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
54 1 50       5 $self->{$name} = shift if @_;
55 1         6 return $self->{$name};
56             });
57             }
58              
59             for my $name (qw/sockname peername/) {
60             _insert($name, sub {
61 2     2 1 7 my $self = shift;
        2 1    
62 2         6 return $self->{fh}->$name;
63             });
64             }
65              
66             sub fh {
67 0     0 1 0 my $self = shift;
68 0         0 return $self->{fh};
69             }
70              
71             sub on_drain {
72 0     0 1 0 my $self = shift;
73 0 0       0 if (@_) {
74 0         0 $self->{on_drain} = shift;
75 0 0       0 $self->_drained if not @{ $self->{buffers} };
  0         0  
76             }
77 0         0 return $self->{on_drain};
78             }
79              
80             sub _drained {
81 4     4   6 my $self = shift;
82 4 50       12 $self->{on_drain}->($self) if defined $self->{on_drain};
83             }
84              
85             for my $dir ('', 'r', 'w') {
86             my $timeout = "${dir}timeout";
87             my $activity = "${dir}activity";
88             my $on_timeout = "on_$timeout";
89             my $timer = "${dir}timer";
90             my $clear_timeout = "clear_$timeout";
91             my $timeout_reset = "${timeout}_reset";
92              
93             my $callback;
94             $callback = sub {
95             my $self = shift;
96             if (not exists $self->{$timeout} or not $self->{fh}) {
97             delete $self->{$timer};
98             return;
99             }
100             my $now = AE::now;
101             my $after = $self->{$activity} + $self->{$timeout} - $now;
102             if ($after <= 0) {
103             $self->{$activity} = $now;
104             my $time = $self->{$on_timeout};
105             my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
106             $time ? $time->($self) : $self->_error->(0, $error);
107             return if not exists $self->{$timeout};
108             }
109             Scalar::Util::weaken($self);
110             return if not $self;
111             $self->{$timer} = AE::timer($after, 0, sub {
112             delete $self->{$timer};
113             $callback->($self);
114             });
115             };
116              
117             _insert($timeout, sub {
118 2     2 1 2 my $self = shift;
        2 1    
        2 1    
119 2 50       5 if (@_) {
120 2         2 my $value = shift;
121 2         4 $self->{$timeout} = $value;
122 2 50       4 if ($value == 0) {
123 0         0 delete $self->{$timer};
124 0         0 delete $self->{$timeout};
125 0         0 return;
126             }
127             else {
128 2         5 $callback->($self);
129             }
130             }
131 2         5 return $self->{$timeout};
132             });
133              
134             _insert($clear_timeout, sub {
135 0     0 0 0 my $self = shift;
        0 0    
        0 0    
136 0         0 delete $self->{$timeout};
137 0         0 return;
138             });
139              
140             _insert($timeout_reset, sub {
141 11     11 1 1362 my $self = shift;
        11 1    
        11 1    
142 11         21 $self->{$activity} = AE::now;
143             });
144             }
145              
146             sub bind_to {
147 0     0 1 0 my ($self, $addr) = @_;
148 0         0 return $self->_bind_to($self->{fh}, $addr);
149             }
150              
151             my $add_reader = sub {
152             my $self = shift;
153             $self->{reader} = AE::io($self->{fh}, 0, sub {
154             while (defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
155             $self->timeout_reset;
156             $self->rtimeout_reset;
157             $self->{on_recv}->($buffer, $self, $addr);
158             }
159             $self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
160             return;
161             });
162             };
163              
164             sub _bind_to {
165 3     3   4 my ($self, $fh, $addr) = @_;
166             my $bind_to = sub {
167 3     3   6 my ($domain, $type, $proto, $sockaddr) = @_;
168 3 50       10 if (!Scalar::Util::openhandle($fh)) {
169 3 50       78 socket $fh, $domain, $type, $proto or redo;
170 3         10 AnyEvent::Util::fh_nonblocking $fh, 1;
171 3 50 50     39 setsockopt $fh, Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 or $self->_error(1, "Couldn't set so_reuseaddr: $!") if $self->{reuse_addr};
172 3         11 $add_reader->($self);
173             }
174 3 50       38 bind $fh, $sockaddr or $self->_error(1, "Could not bind: $!");
175 3         21 };
176 3 50       9 if (ref $addr) {
177 3         2 my ($host, $port) = @{$addr};
  3         6  
178 3         7 _on_addr($self, $fh, $host, $port, $bind_to);
179             }
180             else {
181 0         0 $bind_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr);
182             }
183 3         26 return;
184             }
185              
186             sub connect_to {
187 0     0 1 0 my ($self, $addr) = @_;
188 0         0 return $self->_connect_to($self->{fh}, $addr);
189             }
190              
191             sub _connect_to {
192 1     1   2 my ($self, $fh, $addr) = @_;
193             my $connect_to = sub {
194 1     1   2 my ($domain, $type, $proto, $sockaddr) = @_;
195 1 50       2 if (!Scalar::Util::openhandle($fh)) {
196 1 50       13 socket $fh, $domain, $type, $proto or redo;
197 1         3 AnyEvent::Util::fh_nonblocking $fh, 1;
198 1         4 $add_reader->($self);
199             }
200 1 50       9 connect $fh, $sockaddr or $self->_error(1, "Could not connect: $!");
201 1         3 };
202 1 50       3 if (ref $addr) {
203 0         0 my ($host, $port) = @{$addr};
  0         0  
204 0         0 _on_addr($self, $fh, $host, $port, $connect_to);
205             }
206             else {
207 1         3 $connect_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr);
208             }
209 1         4 return;
210             }
211              
212             my $get_family = sub {
213             my ($self, $fh) = @_;
214             return $self->{family} if !Scalar::Util::openhandle($fh) || !getsockname $fh;
215             my $family = Socket::sockaddr_family(getsockname $fh);
216             return +($family == Socket::AF_INET) ? 4 : ($family == Socket::AF_INET6) ? 6 : $self->{family};
217             };
218              
219             sub _on_addr {
220 3     3   5 my ($self, $fh, $host, $port, $on_success) = @_;
221              
222             AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', $get_family->($self, $fh), Socket::SOCK_DGRAM, sub {
223 3     3   644 my @targets = @_;
224 3         3 while (1) {
225 3 50       10 my $target = shift @targets or $self->_error(1, "Could not resolve $host:$port");
226 3         5 $on_success->(@{$target});
  3         7  
227 3         9 last;
228             }
229 3         6 });
230 3         75 return;
231             }
232              
233             sub _error {
234 0     0   0 my ($self, $fatal, $message) = @_;
235              
236 0 0       0 if (exists $self->{on_error}) {
237 0         0 $self->{on_error}->($self, $fatal, $message);
238 0 0       0 $self->destroy if $fatal;
239             } else {
240 0         0 $self->destroy;
241 0         0 Carp::croak("AnyEvent::Handle::UDP uncaught error: $message");
242             }
243 0         0 return;
244             }
245              
246             sub push_send {
247 2     2 1 388 my ($self, $message, $to, $cv) = @_;
248 2 0       4 $to = AnyEvent::Socket::pack_sockaddr($to->[1], defined $to->[0] ? AnyEvent::Socket::parse_address($to->[0]) : Socket::INADDR_ANY) if ref $to;
    50          
249 2 50 0     5 $cv ||= AnyEvent::CondVar->new if defined wantarray;
250 2 50 33     7 if ($self->{autoflush} and ! @{ $self->{buffers} }) {
  0         0  
251 0         0 my $ret = $self->_send($message, $to, $cv);
252 0 0 0     0 $self->_push_writer($message, $to, $cv) if not defined $ret and $non_fatal{$! + 0};
253 0 0       0 $self->_drained if $ret;
254             }
255             else {
256 2         4 $self->_push_writer($message, $to, $cv);
257             }
258 2         7 return $cv;
259             }
260              
261             sub _send {
262 2     2   3 my ($self, $message, $to, $cv) = @_;
263 2 100       43 my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0;
264 2 0 33     6 $self->_error(1, "Could not send: $!") if not defined $ret and !$non_fatal{$! + 0};
265 2 50       4 if (defined $ret) {
266 2         5 $self->timeout_reset;
267 2         4 $self->wtimeout_reset;
268 2 50       5 $cv->($ret) if defined $cv;
269             }
270 2         2 return $ret;
271             }
272              
273             sub _push_writer {
274 2     2   3 my ($self, $message, $to, $condvar) = @_;
275 2         2 push @{ $self->{buffers} }, [ $message, $to, $condvar ];
  2         5  
276             $self->{writer} ||= AE::io $self->{fh}, 1, sub {
277 2 50   2   20 if (@{ $self->{buffers} }) {
  2         6  
278 2         2 while (my $entry = shift @{ $self->{buffers} }) {
  4         11  
279 2         1 my ($msg, $to, $cv) = @{$entry};
  2         4  
280 2         3 my $ret = $self->_send($msg, $to, $cv);
281 2 50       5 if (not defined $ret) {
282 0 0       0 unshift @{ $self->{buffers} }, $entry if $self->{buffers};
  0         0  
283 0         0 last;
284             }
285             }
286             }
287             else {
288 0         0 $self->_drained;
289             }
290 2 50       3 delete $self->{writer} if not @{ $self->{buffers} };
  2         26  
291 2   33     22 };
292 2         4 return $condvar;
293             }
294              
295             sub destroy {
296 0     0 1   my $self = shift;
297 0           delete @{$self}{qw/reader writer/};
  0            
298 0           close $self->{fh};
299 0           $self->timeout(0);
300 0           $self->rtimeout(0);
301 0           $self->wtimeout(0);
302 0           return;
303             }
304              
305             1;
306              
307             # ABSTRACT: client/server UDP handles for AnyEvent
308              
309             __END__