File Coverage

blib/lib/Parallel/Pipes.pm
Criterion Covered Total %
statement 145 153 94.7
branch 34 46 73.9
condition 2 2 100.0
subroutine 29 29 100.0
pod 0 6 0.0
total 210 236 88.9


line stmt bran cond sub pod time code
1             package Parallel::Pipes;
2 34     34   2242915 use 5.008001;
  34         137  
3 34     34   191 use strict;
  34         134  
  34         1022  
4 34     34   262 use warnings;
  34         85  
  34         2068  
5 34     34   16515 use IO::Handle;
  34         193994  
  34         2967  
6 34     34   17473 use IO::Select;
  34         69124  
  34         3139  
7              
8 34     34   332 use constant WIN32 => $^O eq 'MSWin32';
  34         79  
  34         4580  
9              
10             our $VERSION = '0.201';
11              
12             {
13             package Parallel::Pipes::Impl;
14 34     34   23874 use Storable ();
  34         166479  
  34         19506  
15             sub new {
16 164     164   5044 my ($class, %option) = @_;
17 164 50       2234 my $read_fh = delete $option{read_fh} or die;
18 164 50       1256 my $write_fh = delete $option{write_fh} or die;
19 164         13111 $write_fh->autoflush(1);
20 164         58487 bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class;
21             }
22             sub read :method {
23 433     433   1451 my $self = shift;
24 433 100       2189 my $_size = $self->_read(4) or return;
25 404         1725 my $size = unpack 'I', $_size;
26 404         1599 my $freezed = $self->_read($size);
27 404         2168 Storable::thaw($freezed);
28             }
29             sub write :method {
30 404     404   10656666 my ($self, $data) = @_;
31 404         3856 my $freezed = Storable::freeze({data => $data});
32 404         27386 my $size = pack 'I', length($freezed);
33 404         3388 $self->_write("$size$freezed");
34             }
35             sub _read {
36 837     837   2310 my ($self, $size) = @_;
37 837         2332 my $fh = $self->{read_fh};
38 837         1882 my $offset = length $self->{buf};
39 837         3483 while ($offset < $size) {
40 436         577298 my $len = sysread $fh, $self->{buf}, 65536, $offset;
41 436 50       3306 if (!defined $len) {
    100          
42 0         0 die $!;
43             } elsif ($len == 0) {
44 29         349 last;
45             } else {
46 407         1505 $offset += $len;
47             }
48             }
49 837         5614 return substr $self->{buf}, 0, $size, '';
50             }
51             sub _write {
52 404     404   1900 my ($self, $data) = @_;
53 404         1080 my $fh = $self->{write_fh};
54 404         879 my $size = length $data;
55 404         776 my $offset = 0;
56 404         1723 while ($size) {
57 404         22534 my $len = syswrite $fh, $data, $size, $offset;
58 404 50       2301 if (!defined $len) {
    50          
59 0         0 die $!;
60             } elsif ($len == 0) {
61 0         0 last;
62             } else {
63 404         909 $size -= $len;
64 404         1399 $offset += $len;
65             }
66             }
67 404         3212 $size;
68             }
69             }
70             {
71             package Parallel::Pipes::Here;
72             our @ISA = qw(Parallel::Pipes::Impl);
73 34     34   348 use Carp ();
  34         102  
  34         11159  
74             sub new {
75 135     135   12764 my ($class, %option) = @_;
76 135         8764 $class->SUPER::new(%option, _written => 0);
77             }
78             sub is_written {
79 1249     1249   2664 my $self = shift;
80 1249         4323 $self->{_written} == 1;
81             }
82             sub read :method {
83 292     292   1116 my $self = shift;
84 292 50       723 if (!$self->is_written) {
85 0         0 Carp::croak("This pipe has not been written; you cannot read it");
86             }
87 292         697 $self->{_written}--;
88 292 50       1305 return unless my $read = $self->SUPER::read;
89 292         9847 $read->{data};
90             }
91             sub write :method {
92 292     292   1412 my ($self, $task) = @_;
93 292 50       657 if ($self->is_written) {
94 0         0 Carp::croak("This pipe has already been written; you must read it first");
95             }
96 292         602 $self->{_written}++;
97 292         1396 $self->SUPER::write($task);
98             }
99             }
100             {
101             package Parallel::Pipes::There;
102             our @ISA = qw(Parallel::Pipes::Impl);
103             }
104             {
105             package Parallel::Pipes::Impl::NoFork;
106 34     34   270 use Carp ();
  34         68  
  34         46076  
107             sub new {
108 24     24   67 my ($class, %option) = @_;
109 24         146 bless {%option}, $class;
110             }
111             sub is_written {
112 2172     2172   3717 my $self = shift;
113 2172         8173 exists $self->{_result};
114             }
115             sub read :method {
116 714     714   1903 my $self = shift;
117 714 50       1727 if (!$self->is_written) {
118 0         0 Carp::croak("This pipe has not been written; you cannot read it");
119             }
120 714         2522 delete $self->{_result};
121             }
122             sub write :method {
123 714     714   2283 my ($self, $task) = @_;
124 714 50       1371 if ($self->is_written) {
125 0         0 Carp::croak("This pipe has already been written; you must read it first");
126             }
127 714         2063 my $result = $self->{work}->($task);
128 714         4875561 $self->{_result} = $result;
129             }
130             }
131              
132             sub new {
133 69     69 0 5070175 my ($class, $number, $work, $option) = @_;
134 69         142 if (WIN32 and $number != 1) {
135             die "The number of pipes must be 1 under WIN32 environment.\n";
136             }
137 69   100     810 my $self = bless {
138             work => $work,
139             number => $number,
140             no_fork => $number == 1,
141             pipes => {},
142             option => $option || {},
143             }, $class;
144              
145 69 100       287 if ($self->no_fork) {
146 24         252 $self->{pipes}{-1} = Parallel::Pipes::Impl::NoFork->new(pid => -1, work => $self->{work});
147             } else {
148 45         303 $self->_fork for 1 .. $number;
149             }
150 40         897 $self;
151             }
152              
153 1127     1127 0 7038 sub no_fork { shift->{no_fork} }
154              
155             sub _fork {
156 164     164   603 my $self = shift;
157 164         502 my $work = $self->{work};
158 164         10483 pipe my $read_fh1, my $write_fh1;
159 164         12481 pipe my $read_fh2, my $write_fh2;
160 164         320396 my $pid = fork;
161 164 50       12631 die "fork failed" unless defined $pid;
162 164 100       5520 if ($pid == 0) {
163 29         4675 srand;
164 29         3159 close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes;
  56         4437  
165 29         3734 my $there = Parallel::Pipes::There->new(read_fh => $read_fh2, write_fh => $write_fh1);
166 29         1670 while (my $read = $there->read) {
167 112         8546 $there->write( $work->($read->{data}) );
168             }
169 29         13439 exit;
170             }
171 135         19124 close $_ for $write_fh1, $read_fh2;
172 135         12975 $self->{pipes}{$pid} = Parallel::Pipes::Here->new(
173             pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2,
174             );
175             }
176              
177             sub pipes {
178 1084     1084 0 2191 my $self = shift;
179 1084         2144 map { $self->{pipes}{$_} } sort { $a <=> $b } keys %{$self->{pipes}};
  2320         8296  
  2112         5815  
  1084         9743  
180             }
181              
182             sub is_ready {
183 1019     1019 0 3649 my $self = shift;
184 1019 100       2842 return $self->pipes if $self->no_fork;
185              
186 281 100       1606 my @pipes = @_ ? @_ : $self->pipes;
187 281 100       871 if (my @ready = grep { $_->{_written} == 0 } @pipes) {
  1298         3578  
188 57         424 return @ready;
189             }
190              
191 224         444 my $select = IO::Select->new(map { $_->{read_fh} } @pipes);
  1019         3776  
192 224         34951 my @ready;
193 224 100       821 if (my $tick = $self->{option}{idle_tick}) {
194 14         27 while (1) {
195 28 100       232 if (my @r = $select->can_read($tick)) {
196 14         1395679 @ready = @r;
197 14         48 last;
198             }
199 14         5607510 $self->{option}{idle_work}->();
200             }
201             } else {
202 210         708 @ready = $select->can_read;
203             }
204              
205 224         492137 my @return;
206 224         981 for my $pipe (@pipes) {
207 1019 100       1994 if (grep { $pipe->{read_fh} == $_ } @ready) {
  1320         5200  
208 289         658 push @return, $pipe;
209             }
210             }
211 224         2451 return @return;
212             }
213              
214             sub is_written {
215 66     66 0 291 my $self = shift;
216 66         209 grep { $_->is_written } $self->pipes;
  280         593  
217             }
218              
219             sub close :method {
220 39     39 0 55652 my $self = shift;
221 39 100       128 return if $self->no_fork;
222              
223 15         59 close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes;
  75         1692  
224 15         94 while (%{$self->{pipes}}) {
  90         890  
225 75         8075975 my $pid = wait;
226 75 50       3099 if (delete $self->{pipes}{$pid}) {
227             # OK
228             } else {
229 0         0 warn "wait() unexpectedly returns $pid\n";
230             }
231             }
232             }
233              
234             1;
235             __END__