File Coverage

blib/lib/AnyEvent/Handle/UDP.pm
Criterion Covered Total %
statement 127 172 73.8
branch 29 70 41.4
condition 8 26 30.7
subroutine 39 48 81.2
pod 22 25 88.0
total 225 341 65.9


line stmt bran cond sub pod time code
1             package AnyEvent::Handle::UDP;
2             $AnyEvent::Handle::UDP::VERSION = '0.049';
3 2     2   151522 use strict;
  2         13  
  2         58  
4 2     2   10 use warnings;
  2         3  
  2         46  
5              
6 2     2   2002 use AnyEvent ();
  2         10996  
  2         51  
7 2     2   1130 use AnyEvent::Util ();
  2         23301  
  2         53  
8 2     2   1152 use AnyEvent::Socket ();
  2         30769  
  2         64  
9              
10 2     2   18 use Carp ();
  2         4  
  2         30  
11 2     2   10 use Errno ();
  2         4  
  2         25  
12 2     2   9 use Scalar::Util ();
  2         4  
  2         26  
13 2     2   8 use Socket ();
  2         4  
  2         24  
14 2     2   1044 use Symbol ();
  2         1548  
  2         599  
15              
16             my $subname = eval { require Sub::Name } ? \&Sub::Name::subname : sub { $_[1] };
17             my %non_fatal = map { ( $_ => 1 ) } Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR;
18              
19             sub new {
20 4     4 1 10952 my ($class, %args) = @_;
21             my $self = bless {
22             on_recv => $args{on_recv} || Carp::croak('on_recv not given'),
23             reuse_addr => exists $args{reuse_addr} ? !!$args{reuse_addr} : 1,
24             receive_size => $args{receive_size} || 1500,
25             family => $args{family} || 0,
26 4 50 33     138 autoflush => $args{autoflush} || 0,
      50        
      50        
      50        
27             buffers => [],
28             }, $class;
29 4         12 $self->{$_} = $args{$_} for grep { exists $args{$_} } qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/;
  20         53  
30 4         36 $self->{$_} = AE::now() for qw/activity ractivity wactivity/;
31              
32 4         20 $self->{fh} = bless Symbol::gensym(), 'IO::Socket';
33 4 100       86 $self->_bind_to($self->{fh}, $args{bind}) if exists $args{bind};
34 4 100       19 $self->_connect_to($self->{fh}, $args{connect}) if exists $args{connect};
35              
36 4         13 $self->$_($args{$_}) for grep { exists $args{$_} } qw/timeout rtimeout wtimeout/;
  12         35  
37              
38 4         15 $self->_drained;
39 4         14 return $self;
40             }
41              
42             sub _insert {
43 36     36   63 my ($name, $sub) = @_;
44 2     2   24 no strict 'refs';
  2         16  
  2         4585  
45 36         148 *{$name} = $subname->($name, $sub);
  36         127  
46             }
47              
48             for my $name (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
49             _insert($name, sub {
50 1     1 1 45 my $self = shift;
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
        1 1    
51 1 50       9 $self->{$name} = shift if @_;
52 1         3 return $self->{$name};
53             });
54             }
55              
56             for my $name (qw/sockname peername/) {
57             _insert($name, sub {
58 2     2 1 10 my $self = shift;
        2 1    
59 2         10 return $self->{fh}->$name;
60             });
61             }
62              
63             sub fh {
64 0     0 1 0 my $self = shift;
65 0         0 return $self->{fh};
66             }
67              
68             sub on_drain {
69 0     0 1 0 my $self = shift;
70 0 0       0 if (@_) {
71 0         0 $self->{on_drain} = shift;
72 0 0       0 $self->_drained if not @{ $self->{buffers} };
  0         0  
73             }
74 0         0 return $self->{on_drain};
75             }
76              
77             sub _drained {
78 4     4   6 my $self = shift;
79 4 50       14 $self->{on_drain}->($self) if defined $self->{on_drain};
80             }
81              
82             for my $dir ('', 'r', 'w') {
83             my $timeout = "${dir}timeout";
84             my $activity = "${dir}activity";
85             my $on_timeout = "on_$timeout";
86             my $timer = "${dir}timer";
87             my $clear_timeout = "clear_$timeout";
88             my $timeout_reset = "${timeout}_reset";
89              
90             my $callback;
91             $callback = sub {
92             my $self = shift;
93             if (not exists $self->{$timeout} or not $self->{fh}) {
94             delete $self->{$timer};
95             return;
96             }
97             my $now = AE::now;
98             my $after = $self->{$activity} + $self->{$timeout} - $now;
99             if ($after <= 0) {
100             $self->{$activity} = $now;
101             my $time = $self->{$on_timeout};
102             my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
103             $time ? $time->($self) : $self->_error->(0, $error);
104             return if not exists $self->{$timeout};
105             }
106             Scalar::Util::weaken($self);
107             return if not $self;
108             $self->{$timer} = AE::timer($after, 0, sub {
109             delete $self->{$timer};
110             $callback->($self);
111             });
112             };
113              
114             _insert($timeout, sub {
115 2     2 1 5 my $self = shift;
        2 1    
        2 1    
116 2 50       6 if (@_) {
117 2         4 my $value = shift;
118 2         6 $self->{$timeout} = $value;
119 2 50       5 if ($value == 0) {
120 0         0 delete $self->{$timer};
121 0         0 delete $self->{$timeout};
122 0         0 return;
123             }
124             else {
125 2         6 $callback->($self);
126             }
127             }
128 2         9 return $self->{$timeout};
129             });
130              
131             _insert($clear_timeout, sub {
132 0     0 0 0 my $self = shift;
        0 0    
        0 0    
133 0         0 delete $self->{$timeout};
134 0         0 return;
135             });
136              
137             _insert($timeout_reset, sub {
138 11     11 1 1641 my $self = shift;
        11 1    
        11 1    
139 11         32 $self->{$activity} = AE::now;
140             });
141             }
142              
143             sub bind_to {
144 0     0 1 0 my ($self, $addr) = @_;
145 0         0 return $self->_bind_to($self->{fh}, $addr);
146             }
147              
148             my $add_reader = sub {
149             my $self = shift;
150             $self->{reader} = AE::io($self->{fh}, 0, sub {
151             while (defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
152             $self->timeout_reset;
153             $self->rtimeout_reset;
154             $self->{on_recv}->($buffer, $self, $addr);
155             }
156             $self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
157             return;
158             });
159             };
160              
161             sub _bind_to {
162 3     3   10 my ($self, $fh, $addr) = @_;
163             my $bind_to = sub {
164 3     3   8 my ($domain, $type, $proto, $sockaddr) = @_;
165 3 50       13 if (!Scalar::Util::openhandle($fh)) {
166 3 50       142 socket $fh, $domain, $type, $proto or redo;
167 3         18 AnyEvent::Util::fh_nonblocking $fh, 1;
168 3 50 50     72 setsockopt $fh, Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 or $self->_error(1, "Couldn't set so_reuseaddr: $!") if $self->{reuse_addr};
169 3         13 $add_reader->($self);
170             }
171 3 50       54 bind $fh, $sockaddr or $self->_error(1, "Could not bind: $!");
172 3         17 };
173 3 50       11 if (ref $addr) {
174 3         5 my ($host, $port) = @{$addr};
  3         8  
175 3         58 _on_addr($self, $fh, $host, $port, $bind_to);
176             }
177             else {
178 0         0 $bind_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr);
179             }
180 3         16 return;
181             }
182              
183             sub connect_to {
184 0     0 1 0 my ($self, $addr) = @_;
185 0         0 return $self->_connect_to($self->{fh}, $addr);
186             }
187              
188             sub _connect_to {
189 1     1   4 my ($self, $fh, $addr) = @_;
190             my $connect_to = sub {
191 1     1   3 my ($domain, $type, $proto, $sockaddr) = @_;
192 1 50       3 if (!Scalar::Util::openhandle($fh)) {
193 1 50       46 socket $fh, $domain, $type, $proto or redo;
194 1         7 AnyEvent::Util::fh_nonblocking $fh, 1;
195 1         10 $add_reader->($self);
196             }
197 1 50       17 connect $fh, $sockaddr or $self->_error(1, "Could not connect: $!");
198 1         5 };
199 1 50       4 if (ref $addr) {
200 0         0 my ($host, $port) = @{$addr};
  0         0  
201 0         0 _on_addr($self, $fh, $host, $port, $connect_to);
202             }
203             else {
204 1         5 $connect_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr);
205             }
206 1         8 return;
207             }
208              
209             my $get_family = sub {
210             my ($self, $fh) = @_;
211             return $self->{family} if !Scalar::Util::openhandle($fh) || !getsockname $fh;
212             my $family = Socket::sockaddr_family(getsockname $fh);
213             return +($family == Socket::AF_INET) ? 4 : ($family == Socket::AF_INET6) ? 6 : $self->{family};
214             };
215              
216             sub _on_addr {
217 3     3   10 my ($self, $fh, $host, $port, $on_success) = @_;
218              
219             AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', $get_family->($self, $fh), Socket::SOCK_DGRAM, sub {
220 3     3   918 my @targets = @_;
221 3         6 while (1) {
222 3 50       11 my $target = shift @targets or $self->_error(1, "Could not resolve $host:$port");
223 3         6 $on_success->(@{$target});
  3         12  
224 3         11 last;
225             }
226 3         11 });
227 3         103 return;
228             }
229              
230             sub _error {
231 0     0   0 my ($self, $fatal, $message) = @_;
232              
233 0 0       0 if (exists $self->{on_error}) {
234 0         0 $self->{on_error}->($self, $fatal, $message);
235 0 0       0 $self->destroy if $fatal;
236             } else {
237 0         0 $self->destroy;
238 0         0 Carp::croak("AnyEvent::Handle::UDP uncaught error: $message");
239             }
240 0         0 return;
241             }
242              
243             sub push_send {
244 2     2 1 641 my ($self, $message, $to, $cv) = @_;
245 2 0       6 $to = AnyEvent::Socket::pack_sockaddr($to->[1], defined $to->[0] ? AnyEvent::Socket::parse_address($to->[0]) : Socket::INADDR_ANY) if ref $to;
    50          
246 2 50 0     6 $cv ||= AnyEvent::CondVar->new if defined wantarray;
247 2 50 33     7 if ($self->{autoflush} and ! @{ $self->{buffers} }) {
  0         0  
248 0         0 my $ret = $self->_send($message, $to, $cv);
249 0 0 0     0 $self->_push_writer($message, $to, $cv) if not defined $ret and $non_fatal{$! + 0};
250 0 0       0 $self->_drained if $ret;
251             }
252             else {
253 2         8 $self->_push_writer($message, $to, $cv);
254             }
255 2         16 return $cv;
256             }
257              
258             sub _send {
259 2     2   5 my ($self, $message, $to, $cv) = @_;
260 2 100       69 my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0;
261 2 0 33     10 $self->_error(1, "Could not send: $!") if not defined $ret and !$non_fatal{$! + 0};
262 2 50       8 if (defined $ret) {
263 2         8 $self->timeout_reset;
264 2         6 $self->wtimeout_reset;
265 2 50       7 $cv->($ret) if defined $cv;
266             }
267 2         5 return $ret;
268             }
269              
270             sub _push_writer {
271 2     2   5 my ($self, $message, $to, $condvar) = @_;
272 2         3 push @{ $self->{buffers} }, [ $message, $to, $condvar ];
  2         8  
273             $self->{writer} ||= AE::io $self->{fh}, 1, sub {
274 2 50   2   62 if (@{ $self->{buffers} }) {
  2         7  
275 2         4 while (my $entry = shift @{ $self->{buffers} }) {
  4         16  
276 2         3 my ($msg, $to, $cv) = @{$entry};
  2         5  
277 2         7 my $ret = $self->_send($msg, $to, $cv);
278 2 50       8 if (not defined $ret) {
279 0 0       0 unshift @{ $self->{buffers} }, $entry if $self->{buffers};
  0         0  
280 0         0 last;
281             }
282             }
283             }
284             else {
285 0         0 $self->_drained;
286             }
287 2 50       4 delete $self->{writer} if not @{ $self->{buffers} };
  2         30  
288 2   33     32 };
289 2         5 return $condvar;
290             }
291              
292             sub destroy {
293 0     0 1   my $self = shift;
294 0           delete @{$self}{qw/reader writer/};
  0            
295 0           close $self->{fh};
296 0           $self->timeout(0);
297 0           $self->rtimeout(0);
298 0           $self->wtimeout(0);
299 0           return;
300             }
301              
302             1;
303              
304             # ABSTRACT: client/server UDP handles for AnyEvent
305              
306             __END__