File Coverage

blib/lib/DBD/Gofer/Transport/pipeone.pm
Criterion Covered Total %
statement 98 115 85.2
branch 22 46 47.8
condition 4 15 26.6
subroutine 17 21 80.9
pod 0 7 0.0
total 141 204 69.1


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::pipeone;
2              
3             # $Id: pipeone.pm 10087 2007-10-16 12:42:37Z Tim $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 8     8   56 use strict;
  8         29  
  8         262  
11 8     8   40 use warnings;
  8         17  
  8         286  
12              
13 8     8   41 use Carp;
  8         14  
  8         585  
14 8     8   51 use Fcntl;
  8         18  
  8         1689  
15 8     8   3325 use IO::Select;
  8         11953  
  8         390  
16 8     8   3353 use IPC::Open3 qw(open3);
  8         19754  
  8         426  
17 8     8   64 use Symbol qw(gensym);
  8         20  
  8         293  
18              
19 8     8   46 use base qw(DBD::Gofer::Transport::Base);
  8         17  
  8         10881  
20              
21             our $VERSION = "0.010088";
22              
23             __PACKAGE__->mk_accessors(qw(
24             connection_info
25             go_perl
26             ));
27              
28              
29             sub new {
30 336     336 0 1426 my ($self, $args) = @_;
31 336   33     1820 $args->{go_perl} ||= do {
32 0 0       0 ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
33             };
34 336 50       1440 if (not ref $args->{go_perl}) {
35             # user can override the perl to be used, either with an array ref
36             # containing the command name and args to use, or with a string
37             # (ie via the DSN) in which case, to enable args to be passed,
38             # we split on two or more consecutive spaces (otherwise the path
39             # to perl couldn't contain a space itself).
40 336         3809 $args->{go_perl} = [ split /\s{2,}/, $args->{go_perl} ];
41             }
42 336         2684 return $self->SUPER::new($args);
43             }
44              
45              
46             # nonblock($fh) puts filehandle into nonblocking mode
47             sub nonblock {
48 776     776 0 2634 my $fh = shift;
49 776 50       8321 my $flags = fcntl($fh, F_GETFL, 0)
50             or croak "Can't get flags for filehandle $fh: $!";
51 776 50       9278 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
52             or croak "Can't make filehandle $fh nonblocking: $!";
53             }
54              
55              
56             sub start_pipe_command {
57 388     388 0 1472 my ($self, $cmd) = @_;
58 388 50       2014 $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
59              
60             # if it's important that the subprocess uses the same
61             # (versions of) modules as us then the caller should
62             # set PERL5LIB itself.
63              
64             # limit various forms of insanity, for now
65 388         2950 local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
66 388         2716 local $ENV{DBI_AUTOPROXY};
67 388         1940 local $ENV{DBI_PROFILE};
68              
69 388         4619 my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
70 388 50       32796 my $pid = open3($wfh, $rfh, $efh, @$cmd)
71             or die "error starting @$cmd: $!\n";
72 388 50       2003550 if ($self->trace) {
73 0         0 $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d r%d e%d, ppid=$$}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
74             }
75 388         3310 nonblock($rfh);
76 388         2263 nonblock($efh);
77 388         13844 my $ios = IO::Select->new($rfh, $efh);
78              
79             return {
80 388         94383 cmd=>$cmd,
81             pid=>$pid,
82             wfh=>$wfh, rfh=>$rfh, efh=>$efh,
83             ios=>$ios,
84             };
85             }
86              
87              
88             sub cmd_as_string {
89 0     0 0 0 my $self = shift;
90             # XXX meant to return a properly shell-escaped string suitable for system
91             # but its only for debugging so that can wait
92 0         0 my $connection_info = $self->connection_info;
93 0 0       0 return join " ", map { (m/^[-:\w]*$/) ? $_ : "'$_'" } @{$connection_info->{cmd}};
  0         0  
  0         0  
94             }
95              
96              
97             sub transmit_request_by_transport {
98 380     380 0 1512 my ($self, $request) = @_;
99              
100 380         3515 my $frozen_request = $self->freeze_request($request);
101              
102 380         1229 my $cmd = [ @{$self->go_perl}, qw(-MDBI::Gofer::Transport::pipeone -e run_one_stdio)];
  380         1930  
103 380         2316 my $info = $self->start_pipe_command($cmd);
104              
105 380         2330 my $wfh = delete $info->{wfh};
106             # send frozen request
107 380         4665 local $\;
108 380 50       8109 print $wfh $frozen_request
109             or warn "error writing to @$cmd: $!\n";
110             # indicate that there's no more
111 380 50       5206 close $wfh
112             or die "error closing pipe to @$cmd: $!\n";
113              
114 380         5012 $self->connection_info( $info );
115 380         23969 return;
116             }
117              
118              
119             sub read_response_from_fh {
120 760     760 0 2701 my ($self, $fh_actions) = @_;
121 760         2989 my $trace = $self->trace;
122              
123 760   50     2879 my $info = $self->connection_info || die;
124 760         1875 my ($ios) = @{$info}{qw(ios)};
  760         2049  
125 760         1805 my $errors = 0;
126 760         1512 my $complete;
127              
128 760 50       5188 die "No handles to read response from" unless $ios->count;
129              
130 760         6921 while ($ios->count) {
131 1149         12604 my @readable = $ios->can_read();
132 1149         64218362 for my $fh (@readable) {
133 1520         4137 local $_;
134 1520   50     10202 my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
135 1520         27030 my $rv = sysread($fh, $_='', 1024*31); # to fit in 32KB slab
136 1520 100       6910 unless ($rv) { # error (undef) or end of file (0)
137 760         1766 my $action;
138 760 50       2684 unless (defined $rv) { # was an error
139 0 0       0 $self->trace_msg("error on handle $fh: $!\n") if $trace >= 4;
140 0   0     0 $action = $actions->{error} || $actions->{eof};
141 0         0 ++$errors;
142             # XXX an error may be a permenent condition of the handle
143             # if so we'll loop here - not good
144             }
145             else {
146 760         2538 $action = $actions->{eof};
147 760 50       2891 $self->trace_msg("eof on handle $fh\n") if $trace >= 4;
148             }
149 760 50       3656 if ($action->($fh)) {
150 760 50       2666 $self->trace_msg("removing $fh from handle set\n") if $trace >= 4;
151 760         5345 $ios->remove($fh);
152             }
153 760         49739 next;
154             }
155             # action returns true if the response is now complete
156             # (we finish all handles
157 760 100       4997 $actions->{read}->($fh) && ++$complete;
158             }
159 1149 100       7663 last if $complete;
160             }
161 760         4391 return $errors;
162             }
163              
164              
165             sub receive_response_by_transport {
166 380     380 0 1411 my $self = shift;
167              
168 380   50     1844 my $info = $self->connection_info || die;
169 380         1388 my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
  380         1993  
170              
171 380         1251 my $frozen_response;
172             my $stderr_msg;
173              
174             $self->read_response_from_fh( {
175             $efh => {
176 0     0   0 error => sub { warn "error reading response stderr: $!"; 1 },
  0         0  
177 380     380   1034 eof => sub { warn "eof on stderr" if 0; 1 },
  380         1388  
178 0     0   0 read => sub { $stderr_msg .= $_; 0 },
  0         0  
179             },
180             $rfh => {
181 0     0   0 error => sub { warn "error reading response: $!"; 1 },
  0         0  
182 380     380   944 eof => sub { warn "eof on stdout" if 0; 1 },
  380         1714  
183 380     380   2808 read => sub { $frozen_response .= $_; 0 },
  380         3101  
184             },
185 380         26772 });
186              
187 380 50       24070 waitpid $info->{pid}, 0
188             or warn "waitpid: $!"; # XXX do something more useful?
189              
190 380 50       2151 die ref($self)." command (@$cmd) failed: $stderr_msg"
191             if not $frozen_response; # no output on stdout at all
192              
193             # XXX need to be able to detect and deal with corruption
194 380         6647 my $response = $self->thaw_response($frozen_response);
195              
196 380 50       1834 if ($stderr_msg) {
197             # add stderr messages as warnings (for PrintWarn)
198 0 0 0     0 $response->add_err(0, $stderr_msg, undef, $self->trace)
199             # but ignore warning from old version of blib
200             unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
201             }
202              
203 380         2697 return $response;
204             }
205              
206              
207             1;
208              
209             __END__