File Coverage

blib/lib/IO/Pipely.pm
Criterion Covered Total %
statement 73 179 40.7
branch 25 118 21.1
condition 2 19 10.5
subroutine 12 13 92.3
pod 2 2 100.0
total 114 331 34.4


line stmt bran cond sub pod time code
1             package IO::Pipely;
2             $IO::Pipely::VERSION = '0.006';
3 2     2   209468 use warnings;
  2         22  
  2         80  
4 2     2   11 use strict;
  2         3  
  2         52  
5              
6 2     2   956 use Symbol qw(gensym);
  2         1709  
  2         141  
7 2         25 use IO::Socket qw(
8             AF_UNIX
9             PF_INET
10             PF_UNSPEC
11             SOCK_STREAM
12             SOL_SOCKET
13             SOMAXCONN
14             SO_ERROR
15             SO_REUSEADDR
16             inet_aton
17             pack_sockaddr_in
18             unpack_sockaddr_in
19 2     2   971 );
  2         41319  
20 2     2   550 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  2         5  
  2         108  
21 2     2   12 use Errno qw(EINPROGRESS EWOULDBLOCK);
  2         4  
  2         267  
22              
23             our @EXPORT_OK = qw(pipely socketpairly);
24 2     2   16 use base qw(Exporter);
  2         4  
  2         766  
25              
26             # The order of pipe primitives depends on the platform.
27              
28             # It's not always safe to assume that a function can be used if it's
29             # present.
30              
31             my (@oneway_pipe_types, @twoway_pipe_types);
32             if ($^O eq "MSWin32" or $^O eq "MacOS") {
33             @oneway_pipe_types = qw(inet socketpair pipe);
34             @twoway_pipe_types = qw(inet socketpair pipe);
35             }
36             elsif ($^O eq "cygwin") {
37             @oneway_pipe_types = qw(pipe inet socketpair);
38             @twoway_pipe_types = qw(inet pipe socketpair);
39             }
40             else {
41             @oneway_pipe_types = qw(pipe socketpair inet);
42             @twoway_pipe_types = qw(socketpair inet pipe);
43             }
44              
45             # Provide dummy constants so things at least compile. These constants
46             # aren't used if we're RUNNING_IN_HELL, but Perl needs to see them.
47              
48             BEGIN {
49             # older perls than 5.10 needs a kick in the arse to AUTOLOAD the constant...
50 2 50   2   17 eval "F_GETFL" if $] < 5.010;
51              
52 2 50       3426 if ( ! defined &Fcntl::F_GETFL ) {
53 0 0       0 if ( ! defined prototype "F_GETFL" ) {
54 0         0 *F_GETFL = sub { 0 };
  0         0  
55 0         0 *F_SETFL = sub { 0 };
  0         0  
56             } else {
57 0         0 *F_GETFL = sub () { 0 };
58 0         0 *F_SETFL = sub () { 0 };
59             }
60             }
61             }
62              
63             # Make a socket. This is a homebrew socketpair() for systems that
64             # don't support it. The things I must do to make Windows happy.
65              
66             sub _make_socket {
67              
68             ### Server side.
69              
70 0     0   0 my $acceptor = gensym();
71 0         0 my $accepted = gensym();
72              
73 0 0       0 my $tcp = getprotobyname('tcp') or die "getprotobyname: $!";
74 0 0       0 socket( $acceptor, PF_INET, SOCK_STREAM, $tcp ) or die "socket: $!";
75              
76 0 0       0 setsockopt( $acceptor, SOL_SOCKET, SO_REUSEADDR, 1) or die "reuse: $!";
77              
78 0 0       0 my $server_addr = inet_aton('127.0.0.1') or die "inet_aton: $!";
79 0 0       0 $server_addr = pack_sockaddr_in(0, $server_addr)
80             or die "sockaddr_in: $!";
81              
82 0 0       0 bind( $acceptor, $server_addr ) or die "bind: $!";
83              
84 0         0 $acceptor->blocking(0);
85              
86 0         0 $server_addr = getsockname($acceptor);
87              
88 0 0       0 listen( $acceptor, SOMAXCONN ) or die "listen: $!";
89              
90             ### Client side.
91              
92 0         0 my $connector = gensym();
93              
94 0 0       0 socket( $connector, PF_INET, SOCK_STREAM, $tcp ) or die "socket: $!";
95              
96 0         0 $connector->blocking(0);
97              
98 0 0       0 unless (connect( $connector, $server_addr )) {
99 0 0 0     0 die "connect: $!" if $! and ($! != EINPROGRESS) and ($! != EWOULDBLOCK);
      0        
100             }
101              
102 0         0 my $connector_address = getsockname($connector);
103 0         0 my ($connector_port, $connector_addr) =
104             unpack_sockaddr_in($connector_address);
105              
106             ### Loop around 'til it's all done. I thought I was done writing
107             ### select loops. Damnit.
108              
109 0         0 my $in_read = '';
110 0         0 my $in_write = '';
111              
112 0         0 vec( $in_read, fileno($acceptor), 1 ) = 1;
113 0         0 vec( $in_write, fileno($connector), 1 ) = 1;
114              
115 0         0 my $done = 0;
116 0         0 while ($done != 0x11) {
117 0         0 my $hits = select( my $out_read = $in_read,
118             my $out_write = $in_write,
119             undef,
120             5
121             );
122 0 0       0 unless ($hits) {
123 0 0 0     0 next if ($! and ($! == EINPROGRESS) or ($! == EWOULDBLOCK));
      0        
124 0 0       0 die "select: $!" unless $hits;
125             }
126              
127             # Accept happened.
128 0 0       0 if (vec($out_read, fileno($acceptor), 1)) {
129 0         0 my $peer = accept($accepted, $acceptor);
130 0         0 my ($peer_port, $peer_addr) = unpack_sockaddr_in($peer);
131              
132 0 0 0     0 if ( $peer_port == $connector_port and
133             $peer_addr eq $connector_addr
134             ) {
135 0         0 vec($in_read, fileno($acceptor), 1) = 0;
136 0         0 $done |= 0x10;
137             }
138             }
139              
140             # Connect happened.
141 0 0       0 if (vec($out_write, fileno($connector), 1)) {
142 0         0 $! = unpack('i', getsockopt($connector, SOL_SOCKET, SO_ERROR));
143 0 0       0 die "connect: $!" if $!;
144              
145 0         0 vec($in_write, fileno($connector), 1) = 0;
146 0         0 $done |= 0x01;
147             }
148             }
149              
150             # Turn blocking back on, damnit.
151 0         0 $accepted->blocking(1);
152 0         0 $connector->blocking(1);
153              
154 0         0 return ($accepted, $connector);
155             }
156              
157             sub pipely {
158 2     2 1 782 my %arg = @_;
159              
160 2         6 my $conduit_type = delete($arg{type});
161 2   50     13 my $debug = delete($arg{debug}) || 0;
162              
163             # Generate symbols to be used as filehandles for the pipe's ends.
164             #
165             # Filehandle autovivification isn't used for portability with older
166             # versions of Perl.
167              
168 2         11 my ($a_read, $b_write) = (gensym(), gensym());
169              
170             # Try the specified conduit type only. No fallback.
171              
172 2 50       49 if (defined $conduit_type) {
173 2 50       6 return ($a_read, $b_write) if _try_oneway_type(
174             $conduit_type, $debug, \$a_read, \$b_write
175             );
176             }
177              
178             # Otherwise try all available conduit types until one works.
179             # Conduit types that fail are discarded for speed.
180              
181 0         0 while (my $try_type = $oneway_pipe_types[0]) {
182 0 0       0 return ($a_read, $b_write) if _try_oneway_type(
183             $try_type, $debug, \$a_read, \$b_write
184             );
185 0         0 shift @oneway_pipe_types;
186             }
187              
188             # There's no conduit type left. Bummer!
189              
190 0 0       0 $debug and warn "nothing worked";
191 0         0 return;
192             }
193              
194             sub socketpairly {
195 2     2 1 1144 my %arg = @_;
196              
197 2         6 my $conduit_type = delete($arg{type});
198 2   50     13 my $debug = delete($arg{debug}) || 0;
199              
200             # Generate symbols to be used as filehandles for the pipe's ends.
201             #
202             # Filehandle autovivification isn't used for portability with older
203             # versions of Perl.
204              
205 2         12 my ($a_read, $a_write) = (gensym(), gensym());
206 2         47 my ($b_read, $b_write) = (gensym(), gensym());
207              
208 2 50       35 if (defined $conduit_type) {
209 2 50       8 return ($a_read, $a_write, $b_read, $b_write) if _try_twoway_type(
210             $conduit_type, $debug,
211             \$a_read, \$a_write,
212             \$b_read, \$b_write
213             );
214             }
215              
216 0         0 while (my $try_type = $twoway_pipe_types[0]) {
217 0 0       0 return ($a_read, $a_write, $b_read, $b_write) if _try_twoway_type(
218             $try_type, $debug,
219             \$a_read, \$a_write,
220             \$b_read, \$b_write
221             );
222 0         0 shift @oneway_pipe_types;
223             }
224              
225             # There's no conduit type left. Bummer!
226              
227 0 0       0 $debug and warn "nothing worked";
228 0         0 return;
229             }
230              
231             # Try a pipe by type.
232              
233             sub _try_oneway_type {
234 2     2   5 my ($type, $debug, $a_read, $b_write) = @_;
235              
236             # Try a pipe().
237 2 100       20 if ($type eq "pipe") {
238 1         2 eval {
239 1 50       41 pipe($$a_read, $$b_write) or die "pipe failed: $!";
240             };
241              
242             # Pipe failed.
243 1 50       5 if (length $@) {
244 0 0       0 warn "pipe failed: $@" if $debug;
245 0         0 return;
246             }
247              
248 1 50       4 $debug and do {
249 0         0 warn "using a pipe";
250 0         0 warn "ar($$a_read) bw($$b_write)\n";
251             };
252              
253             # Turn off buffering. POE::Kernel does this for us, but
254             # someone might want to use the pipe class elsewhere.
255 1         9 select((select($$b_write), $| = 1)[0]);
256 1         8 return 1;
257             }
258              
259             # Try a UNIX-domain socketpair.
260 1 50       4 if ($type eq "socketpair") {
261 1         3 eval {
262 1 50       87 socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
263             or die "socketpair failed: $!";
264             };
265              
266 1 50       5 if (length $@) {
267 0 0       0 warn "socketpair failed: $@" if $debug;
268 0         0 return;
269             }
270              
271 1 50       4 $debug and do {
272 0         0 warn "using a UNIX domain socketpair";
273 0         0 warn "ar($$a_read) bw($$b_write)\n";
274             };
275              
276             # It's one-way, so shut down the unused directions.
277 1         12 shutdown($$a_read, 1);
278 1         7 shutdown($$b_write, 0);
279              
280             # Turn off buffering. POE::Kernel does this for us, but someone
281             # might want to use the pipe class elsewhere.
282 1         9 select((select($$b_write), $| = 1)[0]);
283 1         8 return 1;
284             }
285              
286             # Try a pair of plain INET sockets.
287 0 0       0 if ($type eq "inet") {
288 0         0 eval {
289 0         0 ($$a_read, $$b_write) = _make_socket();
290             };
291              
292 0 0       0 if (length $@) {
293 0 0       0 warn "make_socket failed: $@" if $debug;
294 0         0 return;
295             }
296              
297 0 0       0 $debug and do {
298 0         0 warn "using a plain INET socket";
299 0         0 warn "ar($$a_read) bw($$b_write)\n";
300             };
301              
302             # It's one-way, so shut down the unused directions.
303 0         0 shutdown($$a_read, 1);
304 0         0 shutdown($$b_write, 0);
305              
306             # Turn off buffering. POE::Kernel does this for us, but someone
307             # might want to use the pipe class elsewhere.
308 0         0 select((select($$b_write), $| = 1)[0]);
309 0         0 return 1;
310             }
311              
312             # There's nothing left to try.
313 0 0       0 $debug and warn "unknown pipely() socket type ``$type''";
314 0         0 return;
315             }
316              
317             # Try a pipe by type.
318              
319             sub _try_twoway_type {
320 2     2   18 my ($type, $debug, $a_read, $a_write, $b_read, $b_write) = @_;
321              
322             # Try a socketpair().
323 2 100       8 if ($type eq "socketpair") {
324 1         2 eval {
325 1 50       63 socketpair($$a_read, $$b_read, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
326             or die "socketpair 1 failed: $!";
327             };
328              
329             # Socketpair failed.
330 1 50       5 if (length $@) {
331 0 0       0 warn "socketpair failed: $@" if $debug;
332 0         0 return;
333             }
334              
335 1 50       3 $debug and do {
336 0         0 warn "using UNIX domain socketpairs";
337 0         0 warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
338             };
339              
340             # It's two-way, so each reader is also a writer.
341 1         4 $$a_write = $$a_read;
342 1         2 $$b_write = $$b_read;
343              
344             # Turn off buffering. POE::Kernel does this for us, but someone
345             # might want to use the pipe class elsewhere.
346 1         7 select((select($$a_write), $| = 1)[0]);
347 1         4 select((select($$b_write), $| = 1)[0]);
348 1         8 return 1;
349             }
350              
351             # Try a couple pipe() calls.
352 1 50       5 if ($type eq "pipe") {
353 1         2 eval {
354 1 50       40 pipe($$a_read, $$b_write) or die "pipe 1 failed: $!";
355 1 50       27 pipe($$b_read, $$a_write) or die "pipe 2 failed: $!";
356             };
357              
358             # Pipe failed.
359 1 50       5 if (length $@) {
360 0 0       0 warn "pipe failed: $@" if $debug;
361 0         0 return;
362             }
363              
364 1 50       3 $debug and do {
365 0         0 warn "using a pipe";
366 0         0 warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
367             };
368              
369             # Turn off buffering. POE::Kernel does this for us, but someone
370             # might want to use the pipe class elsewhere.
371 1         10 select((select($$a_write), $| = 1)[0]);
372 1         5 select((select($$b_write), $| = 1)[0]);
373 1         8 return 1;
374             }
375              
376             # Try a pair of plain INET sockets.
377 0 0         if ($type eq "inet") {
378 0           eval {
379 0           ($$a_read, $$b_read) = _make_socket();
380             };
381              
382             # Sockets failed.
383 0 0         if (length $@) {
384 0 0         warn "make_socket failed: $@" if $debug;
385 0           return;
386             }
387              
388 0 0         $debug and do {
389 0           warn "using a plain INET socket";
390 0           warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
391             };
392              
393 0           $$a_write = $$a_read;
394 0           $$b_write = $$b_read;
395              
396             # Turn off buffering. POE::Kernel does this for us, but someone
397             # might want to use the pipe class elsewhere.
398 0           select((select($$a_write), $| = 1)[0]);
399 0           select((select($$b_write), $| = 1)[0]);
400 0           return 1;
401             }
402              
403 0 0         $debug and warn "unknown pipely(2) socket type ``$type''";
404 0           return;
405             }
406              
407             1;
408              
409             __END__