File Coverage

blib/lib/Parallel/Pipes.pm
Criterion Covered Total %
statement 196 204 96.0
branch 34 46 73.9
condition 2 2 100.0
subroutine 29 29 100.0
pod 0 6 0.0
total 261 287 90.9


line stmt bran cond sub pod time code
1             package Parallel::Pipes v1.0.0;
2 34     34   1665783 use v5.24;
  34         103  
3 34     34   172 use warnings;
  34         117  
  34         1956  
4 34     34   150 use experimental qw(lexical_subs signatures);
  34         46  
  34         200  
5              
6             our $TRIAL = 0;
7 34     34   17296 use IO::Handle;
  34         119469  
  34         1875  
8 34     34   14960 use IO::Select;
  34         51544  
  34         2358  
9              
10 34     34   192 use constant WIN32 => $^O eq 'MSWin32';
  34         57  
  34         4111  
11              
12             package Parallel::Pipes::Impl {
13 34     34   19249 use Storable ();
  34         127051  
  34         22684  
14 164     164   709 sub new ($class, %option) {
  164         1842  
  164         13894  
  164         360  
15 164 50       1414 my $read_fh = delete $option{read_fh} or die;
16 164 50       1705 my $write_fh = delete $option{write_fh} or die;
17 164         12747 $write_fh->autoflush(1);
18 164         49505 bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class;
19             }
20 433     433   819 sub read ($self) {
  433         954  
  433         750  
21 433 100       2665 my $_size = $self->_read(4) or return;
22 404         1901 my $size = unpack 'I', $_size;
23 404         3021 my $freezed = $self->_read($size);
24 404         1618 Storable::thaw($freezed);
25             }
26 404     404   10688831 sub write ($self, $data) {
  404         931  
  404         635  
  404         621  
27 404         3138 my $freezed = Storable::freeze({data => $data});
28 404         23952 my $size = pack 'I', length($freezed);
29 404         2796 $self->_write("$size$freezed");
30             }
31 837     837   1611 sub _read ($self, $size) {
  837         1229  
  837         1284  
  837         3106  
32 837         1817 my $fh = $self->{read_fh};
33 837         1501 my $offset = length $self->{buf};
34 837         2193 while ($offset < $size) {
35 436         595071 my $len = sysread $fh, $self->{buf}, 65536, $offset;
36 436 50       2979 if (!defined $len) {
    100          
37 0         0 die $!;
38             } elsif ($len == 0) {
39 29         291 last;
40             } else {
41 407         1236 $offset += $len;
42             }
43             }
44 837         5265 return substr $self->{buf}, 0, $size, '';
45             }
46 404     404   790 sub _write ($self, $data) {
  404         676  
  404         745  
  404         758  
47 404         3199 my $fh = $self->{write_fh};
48 404         673 my $size = length $data;
49 404         711 my $offset = 0;
50 404         1110 while ($size) {
51 404         16652 my $len = syswrite $fh, $data, $size, $offset;
52 404 50       2020 if (!defined $len) {
    50          
53 0         0 die $!;
54             } elsif ($len == 0) {
55 0         0 last;
56             } else {
57 404         658 $size -= $len;
58 404         1106 $offset += $len;
59             }
60             }
61 404         3124 $size;
62             }
63             }
64             package Parallel::Pipes::Here {
65             our @ISA = qw(Parallel::Pipes::Impl);
66 34     34   325 use Carp ();
  34         98  
  34         11850  
67 135     135   2256 sub new ($class, %option) {
  135         1854  
  135         9957  
  135         716  
68 135         8835 $class->SUPER::new(%option, _written => 0);
69             }
70 1240     1240   1686 sub is_written ($self) {
  1240         1435  
  1240         1425  
71 1240         3587 $self->{_written} == 1;
72             }
73 292     292   585 sub read ($self) {
  292         391  
  292         342  
74 292 50       596 if (!$self->is_written) {
75 0         0 Carp::croak("This pipe has not been written; you cannot read it");
76             }
77 292         549 $self->{_written}--;
78 292 50       879 return unless my $read = $self->SUPER::read;
79 292         7737 $read->{data};
80             }
81 292     292   701 sub write ($self, $task) {
  292         356  
  292         448  
  292         566  
82 292 50       685 if ($self->is_written) {
83 0         0 Carp::croak("This pipe has already been written; you must read it first");
84             }
85 292         451 $self->{_written}++;
86 292         1210 $self->SUPER::write($task);
87             }
88             }
89             package Parallel::Pipes::There {
90             our @ISA = qw(Parallel::Pipes::Impl);
91             }
92             package Parallel::Pipes::Impl::NoFork {
93 34     34   209 use Carp ();
  34         52  
  34         53855  
94 24     24   50 sub new ($class, %option) {
  24         45  
  24         67  
  24         46  
95 24         300 bless {%option}, $class;
96             }
97 2172     2172   3528 sub is_written ($self) {
  2172         3201  
  2172         3253  
98 2172         8308 exists $self->{_result};
99             }
100 714     714   2377 sub read ($self) {
  714         1844  
  714         1152  
101 714 50       2032 if (!$self->is_written) {
102 0         0 Carp::croak("This pipe has not been written; you cannot read it");
103             }
104 714         2905 delete $self->{_result};
105             }
106 714     714   1840 sub write ($self, $task) {
  714         1027  
  714         967  
  714         950  
107 714 50       1490 if ($self->is_written) {
108 0         0 Carp::croak("This pipe has already been written; you must read it first");
109             }
110 714         3314 my $result = $self->{work}->($task);
111 714         5083227 $self->{_result} = $result;
112             }
113             }
114              
115 69     69 0 3580339 sub new ($class, $number, $work, $option = undef) {
  69         326  
  69         127  
  69         138  
  69         110  
  69         98  
116 69         115 if (WIN32 and $number != 1) {
117             die "The number of pipes must be 1 under WIN32 environment.\n";
118             }
119 69   100     889 my $self = bless {
120             work => $work,
121             number => $number,
122             no_fork => $number == 1,
123             pipes => {},
124             option => $option || {},
125             }, $class;
126              
127 69 100       258 if ($self->no_fork) {
128 24         514 $self->{pipes}{-1} = Parallel::Pipes::Impl::NoFork->new(pid => -1, work => $self->{work});
129             } else {
130 45         304 $self->_fork for 1 .. $number;
131             }
132 40         644 $self;
133             }
134              
135 1123     1123 0 1761 sub no_fork ($self) { $self->{no_fork} }
  1123         1507  
  1123         1620  
  1123         38047  
136              
137 164     164   472 sub _fork ($self) {
  164         383  
  164         196  
138 164         344 my $work = $self->{work};
139 164         8648 pipe my $read_fh1, my $write_fh1;
140 164         6795 pipe my $read_fh2, my $write_fh2;
141 164         297680 my $pid = fork;
142 164 50       12250 die "fork failed" unless defined $pid;
143 164 100       7384 if ($pid == 0) {
144 29         3884 srand;
145 29         2605 close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes;
  56         3928  
146 29         3804 my $there = Parallel::Pipes::There->new(read_fh => $read_fh2, write_fh => $write_fh1);
147 29         1735 while (my $read = $there->read) {
148 112         7742 $there->write( $work->($read->{data}) );
149             }
150 29         15714 exit;
151             }
152 135         19337 close $_ for $write_fh1, $read_fh2;
153 135         12888 $self->{pipes}{$pid} = Parallel::Pipes::Here->new(
154             pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2,
155             );
156             }
157              
158 1080     1080 0 2694 sub pipes ($self) {
  1080         1796  
  1080         1796  
159 1080         9892 map { $self->{pipes}{$_} } sort { $a <=> $b } keys $self->{pipes}->%*;
  2300         7665  
  2412         4752  
160             }
161              
162 1015     1015 0 3988 sub is_ready ($self, @args) {
  1015         1912  
  1015         1961  
  1015         1810  
163 1015 100       3876 return $self->pipes if $self->no_fork;
164              
165 277 100       1165 my @pipes = @args ? @args : $self->pipes;
166 277 100       705 if (my @ready = grep { $_->{_written} == 0 } @pipes) {
  1278         3260  
167 54         318 return @ready;
168             }
169              
170 223         357 my $select = IO::Select->new(map { $_->{read_fh} } @pipes);
  1014         2640  
171 223         27728 my @ready;
172 223 100       618 if (my $tick = $self->{option}{idle_tick}) {
173 21         21 while (1) {
174 35 100       231 if (my @r = $select->can_read($tick)) {
175 21         1387083 @ready = @r;
176 21         43 last;
177             }
178 14         5614348 $self->{option}{idle_work}->();
179             }
180             } else {
181 202         515 @ready = $select->can_read;
182             }
183              
184 223         527854 my @return;
185 223         766 for my $pipe (@pipes) {
186 1014 100       1330 if (grep { $pipe->{read_fh} == $_ } @ready) {
  1313         4162  
187 289         537 push @return, $pipe;
188             }
189             }
190 223         2222 return @return;
191             }
192              
193 64     64 0 434 sub is_written ($self) {
  64         106  
  64         92  
194 64         157 grep { $_->is_written } $self->pipes;
  270         417  
195             }
196              
197 39     39 0 44980 sub close ($self) {
  39         274  
  39         69  
198 39 100       121 return if $self->no_fork;
199              
200 15         46 close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes;
  75         1463  
201 15         101 while ($self->{pipes}->%*) {
202 75         7657607 my $pid = wait;
203 75 50       3168 if (delete $self->{pipes}{$pid}) {
204             # OK
205             } else {
206 0           warn "wait() unexpectedly returns $pid\n";
207             }
208             }
209             }
210              
211             1;
212             __END__