File Coverage

blib/lib/AnyEvent/FDpasser.pm
Criterion Covered Total %
statement 153 200 76.5
branch 64 128 50.0
condition 7 25 28.0
subroutine 24 27 88.8
pod 9 14 64.2
total 257 394 65.2


line stmt bran cond sub pod time code
1             package AnyEvent::FDpasser;
2              
3 17     17   659909 use common::sense;
  17         42  
  17         124  
4              
5             our $VERSION = '0.3.0';
6              
7             require XSLoader;
8             XSLoader::load('AnyEvent::FDpasser', $VERSION);
9              
10 17     17   1687 use Carp;
  17         38  
  17         1334  
11 17     17   100 use Errno;
  17         36  
  17         713  
12 17     17   97 use POSIX; ## Uses POSIX::pipe/dup so we get accurate $!
  17         26  
  17         131  
13 17     17   99150 use Socket qw/AF_UNIX SOCK_STREAM SOL_SOCKET AF_UNSPEC SO_REUSEADDR/;
  17         93360  
  17         5720  
14              
15 17     17   167 use AnyEvent;
  17         36  
  17         567  
16 17     17   18753 use AnyEvent::Util;
  17         116068  
  17         78429  
17              
18              
19             sub new {
20 18     18 1 15642 my ($class, %arg) = @_;
21 18         169 my $self = bless {}, $class;
22              
23 18         312 $self->{on_error} = $arg{on_error};
24              
25 18         124 $self->{obuf} = [];
26 18         106 $self->{ibuf} = [];
27              
28 18 100       157 if (ref $arg{fh} eq 'ARRAY') {
29 2 50       4 die "too many elements in fh array" if scalar @{$arg{fh}} > 2;
  2         10  
30 2         12 $self->{fh} = $arg{fh}->[0];
31 2         6 $self->{fh_pair} = $arg{fh}->[1];
32             } else {
33 16 100       125 if (!defined $arg{fh}) {
34 10         50 ($self->{fh}, $self->{fh_pair}) = fdpasser_socketpair();
35             } else {
36 6         34 $self->{fh} = $arg{fh};
37             }
38             }
39              
40 18 50       120 unless ($arg{dont_set_nonblocking}) {
41 18         224 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
42 18 100       307 AnyEvent::Util::fh_nonblocking $self->{fh_pair}, 1
43             if exists $self->{fh_pair};
44             }
45              
46 18         245 $self->setup_fh_duped;
47              
48 18         70 return $self;
49             }
50              
51              
52              
53              
54             sub i_am_parent {
55 6     6 1 8809 my ($self) = @_;
56              
57 6 50       562 die "i_am_parent only applicable when socketpair used" if !defined $self->{fh_pair};
58 6 50       181 die "passer object is in error_state: $self->{error_state}" if exists $self->{error_state};
59              
60 6         231 close($self->{fh_pair});
61 6         365 delete $self->{fh_pair};
62             }
63              
64             sub i_am_child {
65 6     6 1 10455 my ($self) = @_;
66              
67 6 50       514 die "i_am_child only applicable when socketpair used" if !defined $self->{fh_pair};
68 6 50       235 die "passer object is in error_state: $self->{error_state}" if exists $self->{error_state};
69              
70 6         750 close($self->{fh});
71 6         165 $self->{fh} = $self->{fh_pair};
72 6         607 delete $self->{fh_pair};
73             }
74              
75              
76              
77              
78             sub push_send_fh {
79 17     17 1 9449 my ($self, $fh_to_send, $cb) = @_;
80              
81 17 50       560 die "passer object is in error_state: $self->{error_state}" if exists $self->{error_state};
82 17 50       416 die "must call i_am_parent or i_am_child" if exists $self->{fh_pair};
83              
84 17   100 11   374 $cb ||= sub {};
  11         31  
85              
86 17         47 push @{$self->{obuf}}, [$fh_to_send, $cb];
  17         217  
87              
88 17         162 $self->try_to_send;
89             }
90              
91              
92             sub push_recv_fh {
93 15     15 1 23223 my ($self, $cb) = @_;
94              
95 15 50       325 die "passer object is in error_state: $self->{error_state}" if exists $self->{error_state};
96 15 50       498 die "must call i_am_parent or i_am_child" if exists $self->{fh_pair};
97              
98 15         62 push @{$self->{ibuf}}, $cb;
  15         109  
99              
100 15         172 $self->try_to_recv;
101             }
102              
103              
104              
105              
106             sub try_to_send {
107 34     34 0 408 my ($self) = @_;
108              
109 34 50       139 return unless $self->{fh};
110 34 100       220 return unless @{$self->{obuf}};
  34         549  
111 21 100       161 return if defined $self->{owatcher};
112 17 50       355 return if defined $self->{full_descriptor_table_state};
113              
114             $self->{owatcher} = AE::io $self->{fh}, 1, sub {
115              
116 17     17   4698 my $fh_to_send = shift @{$self->{obuf}};
  17         56  
117              
118 17         7421 my $rv = send_fd(fileno($self->{fh}), fileno($fh_to_send->[0]));
119              
120 17 50       205 if ($rv < 0) {
    50          
121 0 0 0     0 if ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}) {
      0        
122             ## Spurious ready notification or signal: put fh back on queue
123 0         0 unshift @{$self->{obuf}}, $fh_to_send;
  0         0  
124             } else {
125             ## Unknown error
126 0         0 $self->error($!);
127             }
128             } elsif ($rv == 0) {
129 0         0 $self->error('sendmsg wrote 0 bytes');
130             } else {
131 17         147 $fh_to_send->[1]->();
132             ## Don't do a close($fh_to_send->[0]) because the program may wish to keep it alive
133 17         366 undef $fh_to_send;
134 17         369 $self->{owatcher} = undef;
135 17         289 $self->try_to_send;
136             }
137              
138 17         1716 };
139             }
140              
141              
142             sub try_to_recv {
143 29     29 0 123 my ($self) = @_;
144              
145 29 100       94 return unless @{$self->{ibuf}};
  29         899  
146 18 100       145 return if defined $self->{iwatcher};
147 15 50       93 return if defined $self->{full_descriptor_table_state};
148              
149             $self->{iwatcher} = AE::io $self->{fh}, 0, sub {
150              
151 15     15   34436 my $cb = shift @{$self->{ibuf}};
  15         68  
152              
153 15         243 POSIX::close($self->{fh_duped});
154 15         46 delete $self->{fh_duped};
155              
156             ## Race condition: If another thread or a signal handler creates a new descriptor at this
157             ## exact point in time, it could cause the descriptor table to fill up and the following
158             ## to error.
159              
160 15         479 my $rv = recv_fd(fileno($self->{fh}));
161              
162 15 100       208 if ($rv == -1) {
    50          
    50          
163 1 50 33     51 if ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}) {
    50 33        
      33        
      33        
164             ## Spurious ready notification or signal: put the cb back on the queue
165 0         0 unshift @{$self->{ibuf}}, $cb;
  0         0  
166             } elsif ($!{EMSGSIZE} || $!{EMFILE} || $!{ENFILE}) {
167             ## File descriptor table is full. This should be very unlikely given the close+duping
168             ## technique used to detect this. In this case the descriptor stream may be
169             ## desynchronised and we must shutdown the passer.
170              
171 0         0 my $err = $!;
172              
173 0         0 carp "AnyEvent::FDpasser - file descriptor table full, closing passer: $!";
174              
175 0         0 $self->error($err);
176             } else {
177             ## Unknown error
178 1         196 $self->error($!);
179             }
180             } elsif ($rv == -2) {
181 0         0 $self->error("cmsg truncated");
182             } elsif ($rv == 0) {
183             ## Orderly shutdown
184 0         0 $self->error(undef);
185             } else {
186 14         736 open(my $new_fh, '+<&=', $rv);
187 14         54 $self->{iwatcher} = undef;
188 14         100 $cb->($new_fh);
189 14         8808 $self->try_to_recv;
190             }
191 15         1571 };
192              
193 15         8894 $self->setup_fh_duped;
194             }
195              
196              
197              
198              
199              
200             sub _convert_fh_to_fd {
201 0     0   0 my $fh = shift;
202 0 0       0 $fh = fileno($fh) unless $fh =~ /^\d+$/;
203 0         0 return $fh;
204             }
205              
206             sub fdpasser_socketpair {
207 15     15 1 689 my ($s1, $s2);
208              
209 15 50       191 if ($^O eq 'MSWin32') {
    50          
210 0         0 die "AnyEvent::FDpasser does not support windows";
211             } elsif (fdpasser_mode() == 2) {
212 0         0 pipe $s1, $s2;
213 0 0       0 die "can't pipe: $!" unless $s1;
214             } else {
215 15         942 socketpair $s1, $s2, AF_UNIX, SOCK_STREAM, AF_UNSPEC;
216 15 50       75 die "can't make socketpair: $!" unless $s2;
217             }
218              
219 15         109 return ($s1, $s2);
220             }
221              
222             sub fdpasser_server {
223 1     1 1 1564 my ($path, $backlog) = @_;
224              
225 1   50     173 $backlog ||= 10;
226              
227 1         20 my $fh;
228              
229 1 50       84 if ($^O eq 'MSWin32') {
    50          
230 0         0 die "AnyEvent::FDpasser does not support windows";
231             } elsif (fdpasser_mode() == 2) {
232 0         0 my $fd = _fdpasser_server($path);
233 0 0       0 die "unable to _fdpasser_server($path) : $!" if $fd < 0;
234 0 0       0 open($fh, '+<&=', $fd) || die "couldn't open";
235             } else {
236 1 50       137 socket($fh, AF_UNIX, SOCK_STREAM, AF_UNSPEC) || die "Unable to create AF_UNIX socket: $!";
237 1 50       83 setsockopt($fh, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) || die "Unable to setsockopt(SO_REUSEADDR): $!";
238 1         75 unlink($path);
239 1 50       48 bind($fh, Socket::sockaddr_un($path)) || die "Unable to bind AF_UNIX socket to $path : $!";
240 1 50       119 listen($fh, $backlog) || die "Unable to listen on $path : $!";
241             }
242              
243 1         25 return $fh;
244             }
245              
246             sub fdpasser_accept {
247 1     1 1 263483 my ($listener_fh) = @_;
248              
249 1         2 my $passer_fh;
250              
251 1 50       16 if ($^O eq 'MSWin32') {
    50          
252 0         0 die "AnyEvent::FDpasser does not support windows";
253             } elsif (fdpasser_mode() == 2) {
254 0         0 my $fd = _fdpasser_accept(fileno($listener_fh));
255 0 0       0 die "unable to _fdpasser_accept($listener_fh) : $!" if $fd < 0;
256 0 0       0 open($passer_fh, '+<&=', $fd) || die "couldn't open";
257             } else {
258 1         61 accept($passer_fh, $listener_fh);
259             }
260              
261 1         7 return $passer_fh;
262             }
263              
264             sub fdpasser_connect {
265 1     1 1 264905 my ($path) = @_;
266              
267 1         7 my $fh;
268              
269 1 50       96 if ($^O eq 'MSWin32') {
    50          
270 0         0 die "AnyEvent::FDpasser does not support windows";
271             } elsif (fdpasser_mode() == 2) {
272 0         0 my $fd = _fdpasser_connect($path);
273 0 0       0 die "unable to _fdpasser_connect($path) : $!" if $fd < 0;
274 0 0       0 open($fh, '+<&=', $fd) || die "couldn't open";
275             } else {
276 1 50       126 socket($fh, AF_UNIX, SOCK_STREAM, AF_UNSPEC) || die "Unable to create AF_UNIX socket: $!";
277 1 50       28 connect($fh, Socket::sockaddr_un($path)) || die "Unable to connect AF_UNIX socket to $path : $!";
278             }
279              
280 1         223 return $fh;
281             }
282              
283              
284             sub error {
285 1     1 0 4 my ($self, $err) = @_;
286              
287 1         2 my $on_error = $self->{on_error};
288 1         25 close($self->{fh});
289 1 50       5 close($self->{fh_pair}) if exists $self->{fh_pair};
290              
291 1 50       6 if (exists $self->{fh_duped}) {
292 0         0 POSIX::close($self->{fh_duped});
293 0         0 delete $self->{fh_duped};
294             }
295 1 50       5 if (exists $self->{fh_duped_orig}) {
296 1         27 POSIX::close($self->{fh_duped_orig});
297 1         7 delete $self->{fh_duped_orig};
298             }
299              
300 1         8 delete $self->{$_} foreach (qw/owatcher iwatcher obuf ibuf fh fh_pair fh_duped fh_duped_orig on_error/);
301              
302 1         25 $self->{error_state} = $err;
303              
304             {
305 1         2 local $@ = $err;
  1         11  
306 1 50       6 $on_error->() if $on_error;
307             }
308             }
309              
310              
311             sub DESTROY {
312 18     18   4435 my ($self) = @_;
313              
314 18 100       365 if (exists $self->{fh_duped}) {
315 6         119 POSIX::close($self->{fh_duped});
316 6         29 delete $self->{fh_duped};
317             }
318 18 100       369 if (exists $self->{fh_duped_orig}) {
319 17         206 POSIX::close($self->{fh_duped_orig});
320 17         3617 delete $self->{fh_duped_orig};
321             }
322             }
323              
324              
325             sub setup_fh_duped {
326 33     33 0 110 my ($self) = @_;
327              
328 33 100       481 return if exists $self->{fh_duped};
329              
330 21 100       113 if (!exists $self->{fh_duped_orig}) {
331 18         376 my ($r, $w) = POSIX::pipe();
332 18 50       82 die "can't call pipe: $!" unless defined $r;
333 18         1619 POSIX::close($w);
334 18         103 $self->{fh_duped_orig} = $r;
335             }
336              
337 21         207 $self->{fh_duped} = POSIX::dup($self->{fh_duped_orig});
338              
339 21 50       271 if (!defined $self->{fh_duped}) {
340 0           delete $self->{fh_duped};
341 0 0 0       if ($!{EMFILE} || $!{ENFILE}) {
342             ## Descriptor table full: have to make sure not to call recvmsg now
343 0           $self->enter_full_descriptor_table_state;
344             } else {
345 0           die "unable to dup descriptor for reason other than full descriptor table: $!";
346             }
347             }
348             }
349              
350             sub enter_full_descriptor_table_state {
351 0     0 0   my ($self) = @_;
352              
353 0 0         return if $self->{full_descriptor_table_state};
354              
355 0           $self->{full_descriptor_table_state} = 1;
356              
357 0           undef $self->{iwatcher};
358              
359 0           my $watcher; $watcher = AE::timer 0.05, 0.5, sub {
360 0     0     $self->setup_fh_duped;
361 0 0         if (exists $self->{fh_duped}) {
362 0           undef $watcher;
363 0           delete $self->{full_descriptor_table_state};
364 0           $self->try_to_recv;
365             }
366 0           };
367             }
368              
369              
370             1;
371              
372             __END__