| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Parallel::Pipes v1.0.0; |
|
2
|
34
|
|
|
34
|
|
1665783
|
use v5.24; |
|
|
34
|
|
|
|
|
103
|
|
|
3
|
34
|
|
|
34
|
|
172
|
use warnings; |
|
|
34
|
|
|
|
|
117
|
|
|
|
34
|
|
|
|
|
1956
|
|
|
4
|
34
|
|
|
34
|
|
150
|
use experimental qw(lexical_subs signatures); |
|
|
34
|
|
|
|
|
46
|
|
|
|
34
|
|
|
|
|
200
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $TRIAL = 0; |
|
7
|
34
|
|
|
34
|
|
17296
|
use IO::Handle; |
|
|
34
|
|
|
|
|
119469
|
|
|
|
34
|
|
|
|
|
1875
|
|
|
8
|
34
|
|
|
34
|
|
14960
|
use IO::Select; |
|
|
34
|
|
|
|
|
51544
|
|
|
|
34
|
|
|
|
|
2358
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
34
|
|
|
34
|
|
192
|
use constant WIN32 => $^O eq 'MSWin32'; |
|
|
34
|
|
|
|
|
57
|
|
|
|
34
|
|
|
|
|
4111
|
|
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
package Parallel::Pipes::Impl { |
|
13
|
34
|
|
|
34
|
|
19249
|
use Storable (); |
|
|
34
|
|
|
|
|
127051
|
|
|
|
34
|
|
|
|
|
22684
|
|
|
14
|
164
|
|
|
164
|
|
709
|
sub new ($class, %option) { |
|
|
164
|
|
|
|
|
1842
|
|
|
|
164
|
|
|
|
|
13894
|
|
|
|
164
|
|
|
|
|
360
|
|
|
15
|
164
|
50
|
|
|
|
1414
|
my $read_fh = delete $option{read_fh} or die; |
|
16
|
164
|
50
|
|
|
|
1705
|
my $write_fh = delete $option{write_fh} or die; |
|
17
|
164
|
|
|
|
|
12747
|
$write_fh->autoflush(1); |
|
18
|
164
|
|
|
|
|
49505
|
bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class; |
|
19
|
|
|
|
|
|
|
} |
|
20
|
433
|
|
|
433
|
|
819
|
sub read ($self) { |
|
|
433
|
|
|
|
|
954
|
|
|
|
433
|
|
|
|
|
750
|
|
|
21
|
433
|
100
|
|
|
|
2665
|
my $_size = $self->_read(4) or return; |
|
22
|
404
|
|
|
|
|
1901
|
my $size = unpack 'I', $_size; |
|
23
|
404
|
|
|
|
|
3021
|
my $freezed = $self->_read($size); |
|
24
|
404
|
|
|
|
|
1618
|
Storable::thaw($freezed); |
|
25
|
|
|
|
|
|
|
} |
|
26
|
404
|
|
|
404
|
|
10688831
|
sub write ($self, $data) { |
|
|
404
|
|
|
|
|
931
|
|
|
|
404
|
|
|
|
|
635
|
|
|
|
404
|
|
|
|
|
621
|
|
|
27
|
404
|
|
|
|
|
3138
|
my $freezed = Storable::freeze({data => $data}); |
|
28
|
404
|
|
|
|
|
23952
|
my $size = pack 'I', length($freezed); |
|
29
|
404
|
|
|
|
|
2796
|
$self->_write("$size$freezed"); |
|
30
|
|
|
|
|
|
|
} |
|
31
|
837
|
|
|
837
|
|
1611
|
sub _read ($self, $size) { |
|
|
837
|
|
|
|
|
1229
|
|
|
|
837
|
|
|
|
|
1284
|
|
|
|
837
|
|
|
|
|
3106
|
|
|
32
|
837
|
|
|
|
|
1817
|
my $fh = $self->{read_fh}; |
|
33
|
837
|
|
|
|
|
1501
|
my $offset = length $self->{buf}; |
|
34
|
837
|
|
|
|
|
2193
|
while ($offset < $size) { |
|
35
|
436
|
|
|
|
|
595071
|
my $len = sysread $fh, $self->{buf}, 65536, $offset; |
|
36
|
436
|
50
|
|
|
|
2979
|
if (!defined $len) { |
|
|
|
100
|
|
|
|
|
|
|
37
|
0
|
|
|
|
|
0
|
die $!; |
|
38
|
|
|
|
|
|
|
} elsif ($len == 0) { |
|
39
|
29
|
|
|
|
|
291
|
last; |
|
40
|
|
|
|
|
|
|
} else { |
|
41
|
407
|
|
|
|
|
1236
|
$offset += $len; |
|
42
|
|
|
|
|
|
|
} |
|
43
|
|
|
|
|
|
|
} |
|
44
|
837
|
|
|
|
|
5265
|
return substr $self->{buf}, 0, $size, ''; |
|
45
|
|
|
|
|
|
|
} |
|
46
|
404
|
|
|
404
|
|
790
|
sub _write ($self, $data) { |
|
|
404
|
|
|
|
|
676
|
|
|
|
404
|
|
|
|
|
745
|
|
|
|
404
|
|
|
|
|
758
|
|
|
47
|
404
|
|
|
|
|
3199
|
my $fh = $self->{write_fh}; |
|
48
|
404
|
|
|
|
|
673
|
my $size = length $data; |
|
49
|
404
|
|
|
|
|
711
|
my $offset = 0; |
|
50
|
404
|
|
|
|
|
1110
|
while ($size) { |
|
51
|
404
|
|
|
|
|
16652
|
my $len = syswrite $fh, $data, $size, $offset; |
|
52
|
404
|
50
|
|
|
|
2020
|
if (!defined $len) { |
|
|
|
50
|
|
|
|
|
|
|
53
|
0
|
|
|
|
|
0
|
die $!; |
|
54
|
|
|
|
|
|
|
} elsif ($len == 0) { |
|
55
|
0
|
|
|
|
|
0
|
last; |
|
56
|
|
|
|
|
|
|
} else { |
|
57
|
404
|
|
|
|
|
658
|
$size -= $len; |
|
58
|
404
|
|
|
|
|
1106
|
$offset += $len; |
|
59
|
|
|
|
|
|
|
} |
|
60
|
|
|
|
|
|
|
} |
|
61
|
404
|
|
|
|
|
3124
|
$size; |
|
62
|
|
|
|
|
|
|
} |
|
63
|
|
|
|
|
|
|
} |
|
64
|
|
|
|
|
|
|
package Parallel::Pipes::Here { |
|
65
|
|
|
|
|
|
|
our @ISA = qw(Parallel::Pipes::Impl); |
|
66
|
34
|
|
|
34
|
|
325
|
use Carp (); |
|
|
34
|
|
|
|
|
98
|
|
|
|
34
|
|
|
|
|
11850
|
|
|
67
|
135
|
|
|
135
|
|
2256
|
sub new ($class, %option) { |
|
|
135
|
|
|
|
|
1854
|
|
|
|
135
|
|
|
|
|
9957
|
|
|
|
135
|
|
|
|
|
716
|
|
|
68
|
135
|
|
|
|
|
8835
|
$class->SUPER::new(%option, _written => 0); |
|
69
|
|
|
|
|
|
|
} |
|
70
|
1240
|
|
|
1240
|
|
1686
|
sub is_written ($self) { |
|
|
1240
|
|
|
|
|
1435
|
|
|
|
1240
|
|
|
|
|
1425
|
|
|
71
|
1240
|
|
|
|
|
3587
|
$self->{_written} == 1; |
|
72
|
|
|
|
|
|
|
} |
|
73
|
292
|
|
|
292
|
|
585
|
sub read ($self) { |
|
|
292
|
|
|
|
|
391
|
|
|
|
292
|
|
|
|
|
342
|
|
|
74
|
292
|
50
|
|
|
|
596
|
if (!$self->is_written) { |
|
75
|
0
|
|
|
|
|
0
|
Carp::croak("This pipe has not been written; you cannot read it"); |
|
76
|
|
|
|
|
|
|
} |
|
77
|
292
|
|
|
|
|
549
|
$self->{_written}--; |
|
78
|
292
|
50
|
|
|
|
879
|
return unless my $read = $self->SUPER::read; |
|
79
|
292
|
|
|
|
|
7737
|
$read->{data}; |
|
80
|
|
|
|
|
|
|
} |
|
81
|
292
|
|
|
292
|
|
701
|
sub write ($self, $task) { |
|
|
292
|
|
|
|
|
356
|
|
|
|
292
|
|
|
|
|
448
|
|
|
|
292
|
|
|
|
|
566
|
|
|
82
|
292
|
50
|
|
|
|
685
|
if ($self->is_written) { |
|
83
|
0
|
|
|
|
|
0
|
Carp::croak("This pipe has already been written; you must read it first"); |
|
84
|
|
|
|
|
|
|
} |
|
85
|
292
|
|
|
|
|
451
|
$self->{_written}++; |
|
86
|
292
|
|
|
|
|
1210
|
$self->SUPER::write($task); |
|
87
|
|
|
|
|
|
|
} |
|
88
|
|
|
|
|
|
|
} |
|
89
|
|
|
|
|
|
|
package Parallel::Pipes::There { |
|
90
|
|
|
|
|
|
|
our @ISA = qw(Parallel::Pipes::Impl); |
|
91
|
|
|
|
|
|
|
} |
|
92
|
|
|
|
|
|
|
package Parallel::Pipes::Impl::NoFork { |
|
93
|
34
|
|
|
34
|
|
209
|
use Carp (); |
|
|
34
|
|
|
|
|
52
|
|
|
|
34
|
|
|
|
|
53855
|
|
|
94
|
24
|
|
|
24
|
|
50
|
sub new ($class, %option) { |
|
|
24
|
|
|
|
|
45
|
|
|
|
24
|
|
|
|
|
67
|
|
|
|
24
|
|
|
|
|
46
|
|
|
95
|
24
|
|
|
|
|
300
|
bless {%option}, $class; |
|
96
|
|
|
|
|
|
|
} |
|
97
|
2172
|
|
|
2172
|
|
3528
|
sub is_written ($self) { |
|
|
2172
|
|
|
|
|
3201
|
|
|
|
2172
|
|
|
|
|
3253
|
|
|
98
|
2172
|
|
|
|
|
8308
|
exists $self->{_result}; |
|
99
|
|
|
|
|
|
|
} |
|
100
|
714
|
|
|
714
|
|
2377
|
sub read ($self) { |
|
|
714
|
|
|
|
|
1844
|
|
|
|
714
|
|
|
|
|
1152
|
|
|
101
|
714
|
50
|
|
|
|
2032
|
if (!$self->is_written) { |
|
102
|
0
|
|
|
|
|
0
|
Carp::croak("This pipe has not been written; you cannot read it"); |
|
103
|
|
|
|
|
|
|
} |
|
104
|
714
|
|
|
|
|
2905
|
delete $self->{_result}; |
|
105
|
|
|
|
|
|
|
} |
|
106
|
714
|
|
|
714
|
|
1840
|
sub write ($self, $task) { |
|
|
714
|
|
|
|
|
1027
|
|
|
|
714
|
|
|
|
|
967
|
|
|
|
714
|
|
|
|
|
950
|
|
|
107
|
714
|
50
|
|
|
|
1490
|
if ($self->is_written) { |
|
108
|
0
|
|
|
|
|
0
|
Carp::croak("This pipe has already been written; you must read it first"); |
|
109
|
|
|
|
|
|
|
} |
|
110
|
714
|
|
|
|
|
3314
|
my $result = $self->{work}->($task); |
|
111
|
714
|
|
|
|
|
5083227
|
$self->{_result} = $result; |
|
112
|
|
|
|
|
|
|
} |
|
113
|
|
|
|
|
|
|
} |
|
114
|
|
|
|
|
|
|
|
|
115
|
69
|
|
|
69
|
0
|
3580339
|
sub new ($class, $number, $work, $option = undef) { |
|
|
69
|
|
|
|
|
326
|
|
|
|
69
|
|
|
|
|
127
|
|
|
|
69
|
|
|
|
|
138
|
|
|
|
69
|
|
|
|
|
110
|
|
|
|
69
|
|
|
|
|
98
|
|
|
116
|
69
|
|
|
|
|
115
|
if (WIN32 and $number != 1) { |
|
117
|
|
|
|
|
|
|
die "The number of pipes must be 1 under WIN32 environment.\n"; |
|
118
|
|
|
|
|
|
|
} |
|
119
|
69
|
|
100
|
|
|
889
|
my $self = bless { |
|
120
|
|
|
|
|
|
|
work => $work, |
|
121
|
|
|
|
|
|
|
number => $number, |
|
122
|
|
|
|
|
|
|
no_fork => $number == 1, |
|
123
|
|
|
|
|
|
|
pipes => {}, |
|
124
|
|
|
|
|
|
|
option => $option || {}, |
|
125
|
|
|
|
|
|
|
}, $class; |
|
126
|
|
|
|
|
|
|
|
|
127
|
69
|
100
|
|
|
|
258
|
if ($self->no_fork) { |
|
128
|
24
|
|
|
|
|
514
|
$self->{pipes}{-1} = Parallel::Pipes::Impl::NoFork->new(pid => -1, work => $self->{work}); |
|
129
|
|
|
|
|
|
|
} else { |
|
130
|
45
|
|
|
|
|
304
|
$self->_fork for 1 .. $number; |
|
131
|
|
|
|
|
|
|
} |
|
132
|
40
|
|
|
|
|
644
|
$self; |
|
133
|
|
|
|
|
|
|
} |
|
134
|
|
|
|
|
|
|
|
|
135
|
1123
|
|
|
1123
|
0
|
1761
|
sub no_fork ($self) { $self->{no_fork} } |
|
|
1123
|
|
|
|
|
1507
|
|
|
|
1123
|
|
|
|
|
1620
|
|
|
|
1123
|
|
|
|
|
38047
|
|
|
136
|
|
|
|
|
|
|
|
|
137
|
164
|
|
|
164
|
|
472
|
sub _fork ($self) { |
|
|
164
|
|
|
|
|
383
|
|
|
|
164
|
|
|
|
|
196
|
|
|
138
|
164
|
|
|
|
|
344
|
my $work = $self->{work}; |
|
139
|
164
|
|
|
|
|
8648
|
pipe my $read_fh1, my $write_fh1; |
|
140
|
164
|
|
|
|
|
6795
|
pipe my $read_fh2, my $write_fh2; |
|
141
|
164
|
|
|
|
|
297680
|
my $pid = fork; |
|
142
|
164
|
50
|
|
|
|
12250
|
die "fork failed" unless defined $pid; |
|
143
|
164
|
100
|
|
|
|
7384
|
if ($pid == 0) { |
|
144
|
29
|
|
|
|
|
3884
|
srand; |
|
145
|
29
|
|
|
|
|
2605
|
close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes; |
|
|
56
|
|
|
|
|
3928
|
|
|
146
|
29
|
|
|
|
|
3804
|
my $there = Parallel::Pipes::There->new(read_fh => $read_fh2, write_fh => $write_fh1); |
|
147
|
29
|
|
|
|
|
1735
|
while (my $read = $there->read) { |
|
148
|
112
|
|
|
|
|
7742
|
$there->write( $work->($read->{data}) ); |
|
149
|
|
|
|
|
|
|
} |
|
150
|
29
|
|
|
|
|
15714
|
exit; |
|
151
|
|
|
|
|
|
|
} |
|
152
|
135
|
|
|
|
|
19337
|
close $_ for $write_fh1, $read_fh2; |
|
153
|
135
|
|
|
|
|
12888
|
$self->{pipes}{$pid} = Parallel::Pipes::Here->new( |
|
154
|
|
|
|
|
|
|
pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2, |
|
155
|
|
|
|
|
|
|
); |
|
156
|
|
|
|
|
|
|
} |
|
157
|
|
|
|
|
|
|
|
|
158
|
1080
|
|
|
1080
|
0
|
2694
|
sub pipes ($self) { |
|
|
1080
|
|
|
|
|
1796
|
|
|
|
1080
|
|
|
|
|
1796
|
|
|
159
|
1080
|
|
|
|
|
9892
|
map { $self->{pipes}{$_} } sort { $a <=> $b } keys $self->{pipes}->%*; |
|
|
2300
|
|
|
|
|
7665
|
|
|
|
2412
|
|
|
|
|
4752
|
|
|
160
|
|
|
|
|
|
|
} |
|
161
|
|
|
|
|
|
|
|
|
162
|
1015
|
|
|
1015
|
0
|
3988
|
sub is_ready ($self, @args) { |
|
|
1015
|
|
|
|
|
1912
|
|
|
|
1015
|
|
|
|
|
1961
|
|
|
|
1015
|
|
|
|
|
1810
|
|
|
163
|
1015
|
100
|
|
|
|
3876
|
return $self->pipes if $self->no_fork; |
|
164
|
|
|
|
|
|
|
|
|
165
|
277
|
100
|
|
|
|
1165
|
my @pipes = @args ? @args : $self->pipes; |
|
166
|
277
|
100
|
|
|
|
705
|
if (my @ready = grep { $_->{_written} == 0 } @pipes) { |
|
|
1278
|
|
|
|
|
3260
|
|
|
167
|
54
|
|
|
|
|
318
|
return @ready; |
|
168
|
|
|
|
|
|
|
} |
|
169
|
|
|
|
|
|
|
|
|
170
|
223
|
|
|
|
|
357
|
my $select = IO::Select->new(map { $_->{read_fh} } @pipes); |
|
|
1014
|
|
|
|
|
2640
|
|
|
171
|
223
|
|
|
|
|
27728
|
my @ready; |
|
172
|
223
|
100
|
|
|
|
618
|
if (my $tick = $self->{option}{idle_tick}) { |
|
173
|
21
|
|
|
|
|
21
|
while (1) { |
|
174
|
35
|
100
|
|
|
|
231
|
if (my @r = $select->can_read($tick)) { |
|
175
|
21
|
|
|
|
|
1387083
|
@ready = @r; |
|
176
|
21
|
|
|
|
|
43
|
last; |
|
177
|
|
|
|
|
|
|
} |
|
178
|
14
|
|
|
|
|
5614348
|
$self->{option}{idle_work}->(); |
|
179
|
|
|
|
|
|
|
} |
|
180
|
|
|
|
|
|
|
} else { |
|
181
|
202
|
|
|
|
|
515
|
@ready = $select->can_read; |
|
182
|
|
|
|
|
|
|
} |
|
183
|
|
|
|
|
|
|
|
|
184
|
223
|
|
|
|
|
527854
|
my @return; |
|
185
|
223
|
|
|
|
|
766
|
for my $pipe (@pipes) { |
|
186
|
1014
|
100
|
|
|
|
1330
|
if (grep { $pipe->{read_fh} == $_ } @ready) { |
|
|
1313
|
|
|
|
|
4162
|
|
|
187
|
289
|
|
|
|
|
537
|
push @return, $pipe; |
|
188
|
|
|
|
|
|
|
} |
|
189
|
|
|
|
|
|
|
} |
|
190
|
223
|
|
|
|
|
2222
|
return @return; |
|
191
|
|
|
|
|
|
|
} |
|
192
|
|
|
|
|
|
|
|
|
193
|
64
|
|
|
64
|
0
|
434
|
sub is_written ($self) { |
|
|
64
|
|
|
|
|
106
|
|
|
|
64
|
|
|
|
|
92
|
|
|
194
|
64
|
|
|
|
|
157
|
grep { $_->is_written } $self->pipes; |
|
|
270
|
|
|
|
|
417
|
|
|
195
|
|
|
|
|
|
|
} |
|
196
|
|
|
|
|
|
|
|
|
197
|
39
|
|
|
39
|
0
|
44980
|
sub close ($self) { |
|
|
39
|
|
|
|
|
274
|
|
|
|
39
|
|
|
|
|
69
|
|
|
198
|
39
|
100
|
|
|
|
121
|
return if $self->no_fork; |
|
199
|
|
|
|
|
|
|
|
|
200
|
15
|
|
|
|
|
46
|
close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes; |
|
|
75
|
|
|
|
|
1463
|
|
|
201
|
15
|
|
|
|
|
101
|
while ($self->{pipes}->%*) { |
|
202
|
75
|
|
|
|
|
7657607
|
my $pid = wait; |
|
203
|
75
|
50
|
|
|
|
3168
|
if (delete $self->{pipes}{$pid}) { |
|
204
|
|
|
|
|
|
|
# OK |
|
205
|
|
|
|
|
|
|
} else { |
|
206
|
0
|
|
|
|
|
|
warn "wait() unexpectedly returns $pid\n"; |
|
207
|
|
|
|
|
|
|
} |
|
208
|
|
|
|
|
|
|
} |
|
209
|
|
|
|
|
|
|
} |
|
210
|
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
1; |
|
212
|
|
|
|
|
|
|
__END__ |