line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# $Id: Iterator.pm 2746 2007-10-19 16:36:50Z andy $ |
2
|
|
|
|
|
|
|
package Parallel::Iterator; |
3
|
|
|
|
|
|
|
|
4
|
9
|
|
|
9
|
|
308797
|
use warnings; |
|
9
|
|
|
|
|
24
|
|
|
9
|
|
|
|
|
307
|
|
5
|
9
|
|
|
9
|
|
51
|
use strict; |
|
9
|
|
|
|
|
20
|
|
|
9
|
|
|
|
|
415
|
|
6
|
9
|
|
|
9
|
|
51
|
use Carp; |
|
9
|
|
|
|
|
21
|
|
|
9
|
|
|
|
|
800
|
|
7
|
9
|
|
|
9
|
|
12174
|
use Storable qw( store_fd fd_retrieve dclone ); |
|
9
|
|
|
|
|
42315
|
|
|
9
|
|
|
|
|
830
|
|
8
|
9
|
|
|
9
|
|
11574
|
use IO::Handle; |
|
9
|
|
|
|
|
69931
|
|
|
9
|
|
|
|
|
467
|
|
9
|
9
|
|
|
9
|
|
10493
|
use IO::Select; |
|
9
|
|
|
|
|
17291
|
|
|
9
|
|
|
|
|
436
|
|
10
|
9
|
|
|
9
|
|
65
|
use Config; |
|
9
|
|
|
|
|
16
|
|
|
9
|
|
|
|
|
575
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
require 5.008; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $VERSION = '1.00'; |
15
|
9
|
|
|
9
|
|
60
|
use base qw( Exporter ); |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
1328
|
|
16
|
|
|
|
|
|
|
our @EXPORT_OK = qw( iterate iterate_as_array iterate_as_hash ); |
17
|
|
|
|
|
|
|
|
18
|
9
|
|
|
9
|
|
55
|
# True (1) when $^O names a Win32 perl, false (0) otherwise.
#
# The match is reduced to an explicit 1/0 and uses a non-capturing group:
# 'use constant' evaluates its value in list context, where a capturing
# match yields its capture list (or an empty list on failure) rather than
# a boolean.
use constant IS_WIN32 => ( $^O =~ /^(?:MS)?Win32$/ ? 1 : 0 );
|
9
|
|
|
|
|
17
|
|
|
9
|
|
|
|
|
7665
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# Default value for every option accepted by iterate(). iterate() also
# uses the keys of this hash as the list of recognised option names.
my %DEFAULTS = (

    # Batching is off unless the caller asks for it.
    adaptive => 0,
    batch    => 1,

    # Error / warning behaviour.
    nowarn  => 0,
    onerror => 'die',

    # Ten workers where fork is usable, otherwise single process mode.
    workers => ( ( $Config{d_fork} && !IS_WIN32 ) ? 10 : 0 ),
);
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
=head1 NAME |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
Parallel::Iterator - Simple parallel execution |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
=head1 VERSION |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
This document describes Parallel::Iterator version 1.00 |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
=head1 SYNOPSIS |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
use Parallel::Iterator qw( iterate ); |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# A very expensive way to double 100 numbers... |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
my @nums = ( 1 .. 100 ); |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
my $iter = iterate( sub { |
45
|
|
|
|
|
|
|
my ( $id, $job ) = @_; |
46
|
|
|
|
|
|
|
return $job * 2; |
47
|
|
|
|
|
|
|
}, \@nums ); |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
my @out = (); |
50
|
|
|
|
|
|
|
while ( my ( $index, $value ) = $iter->() ) { |
51
|
|
|
|
|
|
|
$out[$index] = $value; |
52
|
|
|
|
|
|
|
} |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=head1 DESCRIPTION |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
The C<map> function applies a user supplied transformation function to |
57
|
|
|
|
|
|
|
each element in a list, returning a new list containing the |
58
|
|
|
|
|
|
|
transformed elements. |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
This module provides a 'parallel map'. Multiple worker processes are |
61
|
|
|
|
|
|
|
forked so that many instances of the transformation function may be |
62
|
|
|
|
|
|
|
executed simultaneously. |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
For time consuming operations, particularly operations that spend most |
65
|
|
|
|
|
|
|
of their time waiting for I/O, this is a big performance win. It also |
66
|
|
|
|
|
|
|
provides a simple idiom to make effective use of multi CPU systems. |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
There is, however, a considerable overhead associated with forking, so |
69
|
|
|
|
|
|
|
the example in the synopsis (doubling a list of numbers) is I<not> a |
70
|
|
|
|
|
|
|
sensible use of this module. |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
=head2 Example |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
Imagine you have an array of URLs to fetch: |
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
my @urls = qw( |
77
|
|
|
|
|
|
|
http://google.com/ |
78
|
|
|
|
|
|
|
http://hexten.net/ |
79
|
|
|
|
|
|
|
http://search.cpan.org/ |
80
|
|
|
|
|
|
|
... and lots more ... |
81
|
|
|
|
|
|
|
); |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
Write a function that retrieves a URL and returns its contents or undef |
84
|
|
|
|
|
|
|
if it can't be fetched: |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
sub fetch { |
87
|
|
|
|
|
|
|
my ( $id, $url ) = @_; |
88
|
|
|
|
|
|
|
my $resp = $ua->get($url); |
89
|
|
|
|
|
|
|
return unless $resp->is_success; |
90
|
|
|
|
|
|
|
return $resp->content; |
91
|
|
|
|
|
|
|
}; |
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
Now write a function to synthesize a special kind of iterator: |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub list_iter { |
96
|
|
|
|
|
|
|
my @ar = @_; |
97
|
|
|
|
|
|
|
my $pos = 0; |
98
|
|
|
|
|
|
|
return sub { |
99
|
|
|
|
|
|
|
return if $pos >= @ar; |
100
|
|
|
|
|
|
|
my @r = ( $pos, $ar[$pos] ); # Note: returns ( index, value ) |
101
|
|
|
|
|
|
|
$pos++; |
102
|
|
|
|
|
|
|
return @r; |
103
|
|
|
|
|
|
|
}; |
104
|
|
|
|
|
|
|
} |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
The returned iterator will return each element of the array in turn and |
107
|
|
|
|
|
|
|
then undef. Actually it returns both the index I<and> the value of each |
108
|
|
|
|
|
|
|
element in the array. Because multiple instances of the transformation |
109
|
|
|
|
|
|
|
function execute in parallel the results won't necessarily come back in |
110
|
|
|
|
|
|
|
order. The array index will later allow us to put completed items in the |
111
|
|
|
|
|
|
|
correct place in an output array. |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
Get an iterator for the list of URLs: |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
my $url_iter = list_iter( @urls ); |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
Then wrap it in another iterator which will return the transformed results: |
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
my $page_iter = iterate( \&fetch, $url_iter ); |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
Finally loop over the returned iterator storing results: |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
my @out = ( ); |
124
|
|
|
|
|
|
|
while ( my ( $index, $value ) = $page_iter->() ) { |
125
|
|
|
|
|
|
|
$out[$index] = $value; |
126
|
|
|
|
|
|
|
} |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
Behind the scenes your program forked into ten (by default) instances of |
129
|
|
|
|
|
|
|
itself and executed the page requests in parallel. |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
=head2 Simpler interfaces |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
Having to construct an iterator is a pain so C<iterate> is smart enough |
134
|
|
|
|
|
|
|
to do that for you. Instead of passing an iterator just pass a reference |
135
|
|
|
|
|
|
|
to the array: |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
my $page_iter = iterate( \&fetch, \@urls ); |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
If you pass a hash reference the iterator you get back will return key, |
140
|
|
|
|
|
|
|
value pairs: |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
my $some_iter = iterate( \&fetch, \%some_hash ); |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
If the returned iterator is inconvenient you can get back a hash or |
145
|
|
|
|
|
|
|
array instead: |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
my @done = iterate_as_array( \&fetch, \@urls ); |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
my %done = iterate_as_hash( \&worker, \%jobs ); |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
=head2 How It Works |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
The current process is forked once for each worker. Each forked child is |
154
|
|
|
|
|
|
|
connected to the parent by a pair of pipes. The child's STDIN, STDOUT |
155
|
|
|
|
|
|
|
and STDERR are unaffected. |
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
Input values are serialised (using Storable) and passed to the workers. |
158
|
|
|
|
|
|
|
Completed work items are serialised and returned. |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
=head2 Caveats |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
Parallel::Iterator is designed to be simple to use - but the underlying |
163
|
|
|
|
|
|
|
forking of the main process can cause mystifying problems unless you |
164
|
|
|
|
|
|
|
have an understanding of what is going on behind the scenes. |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
=head3 Worker execution environment |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
All code apart from the worker subroutine executes in the parent process |
169
|
|
|
|
|
|
|
as normal. The worker executes in a forked instance of the parent |
170
|
|
|
|
|
|
|
process. That means that things like this won't work as expected: |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
my %tally = (); |
173
|
|
|
|
|
|
|
my @r = iterate_as_array( sub { |
174
|
|
|
|
|
|
|
my ($id, $name) = @_; |
175
|
|
|
|
|
|
|
$tally{$name}++; # might not do what you think it does |
176
|
|
|
|
|
|
|
return reverse $name; |
177
|
|
|
|
|
|
|
}, \@names ); |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
# Now print out the tally... |
180
|
|
|
|
|
|
|
while ( my ( $name, $count ) = each %tally ) { |
181
|
|
|
|
|
|
|
printf("%5d : %s\n", $count, $name); |
182
|
|
|
|
|
|
|
} |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
Because the worker is a closure it can see the C<%tally> hash from its |
185
|
|
|
|
|
|
|
enclosing scope; but because it's running in a forked clone of the parent |
186
|
|
|
|
|
|
|
process it modifies its own copy of C<%tally> rather than the copy for |
187
|
|
|
|
|
|
|
the parent process. |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
That means that after the job terminates the C<%tally> in the parent |
190
|
|
|
|
|
|
|
process will be empty. |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
In general you should avoid side effects in your worker subroutines. |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=head3 Serialization |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
Values are serialised using L<Storable> to pass to the worker subroutine |
197
|
|
|
|
|
|
|
and results from the worker are again serialised before being passed |
198
|
|
|
|
|
|
|
back. Be careful what your values refer to: everything has to be |
199
|
|
|
|
|
|
|
serialised. If there's an indirect way to reach a large object graph |
200
|
|
|
|
|
|
|
Storable will find it and performance will suffer. |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
To find out how large your serialised values are serialise one of them |
203
|
|
|
|
|
|
|
and check its size: |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
use Storable qw( freeze ); |
206
|
|
|
|
|
|
|
my $serialized = freeze $some_obj; |
207
|
|
|
|
|
|
|
print length($serialized), " bytes\n"; |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
In your tests you may wish to guard against the possibility of a change |
210
|
|
|
|
|
|
|
to the structure of your values resulting in a sudden increase in |
211
|
|
|
|
|
|
|
serialized size: |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
ok length(freeze $some_obj) < 1000, "Object too bulky?"; |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
See the documentation for L<Storable> for other caveats. |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
=head3 Performance |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
Process forking is expensive. Only use Parallel::Iterator in cases where: |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
=over |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
=item the worker waits for I/O |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
The case of fetching web pages is a good example of this. Fetching a |
226
|
|
|
|
|
|
|
page with LWP::UserAgent may take as long as a few seconds but probably |
227
|
|
|
|
|
|
|
consumes only a few milliseconds of processor time. Running many |
228
|
|
|
|
|
|
|
requests in parallel is a huge win - but be kind to the server you're |
229
|
|
|
|
|
|
|
talking to: don't launch a lot of parallel requests unless it's your |
230
|
|
|
|
|
|
|
server or you know it can handle the load. |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
=item the worker is CPU intensive and you have multiple cores / CPUs |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
If the worker is doing an expensive calculation you can parallelise that |
235
|
|
|
|
|
|
|
across multiple CPU cores. Benchmark first though. There's a |
236
|
|
|
|
|
|
|
considerable overhead associated with Parallel::Iterator; unless your |
237
|
|
|
|
|
|
|
calculations are time consuming that overhead will dwarf whatever time |
238
|
|
|
|
|
|
|
they take. |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=back |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
=head1 INTERFACE |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
=head2 C<< iterate( [ $options ], $worker, $iterator ) >> |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
Get an iterator that applies the supplied transformation function to |
247
|
|
|
|
|
|
|
each value returned by the input iterator. |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
Instead of an iterator you may pass an array or hash reference and |
250
|
|
|
|
|
|
|
C<iterate> will convert it internally into a suitable iterator. |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
If you are doing this you may wish to investigate C<iterate_as_hash> and |
253
|
|
|
|
|
|
|
C<iterate_as_array>. |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
=head3 Options |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
A reference to a hash of options may be supplied as the first argument. |
258
|
|
|
|
|
|
|
The following options are supported: |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
=over |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
=item C<workers> |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
The number of concurrent processes to launch. Set this to 0 to disable |
265
|
|
|
|
|
|
|
forking. Defaults to 10 on systems that support fork and 0 (disable |
266
|
|
|
|
|
|
|
forking) on those that do not. |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
=item C<nowarn> |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
Normally C<iterate> will issue a warning and fall back to single process |
271
|
|
|
|
|
|
|
mode on systems on which fork is not available. This option suppresses |
272
|
|
|
|
|
|
|
that warning. |
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
=item C<batch> |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
Ordinarily items are passed to the worker one at a time. If you are |
277
|
|
|
|
|
|
|
processing a large number of items it may be more efficient to process |
278
|
|
|
|
|
|
|
them in batches. Specify the batch size using this option. |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
Batching is transparent from the caller's perspective. Internally it |
281
|
|
|
|
|
|
|
modifies the iterators and worker (by wrapping them in additional |
282
|
|
|
|
|
|
|
closures) so that they pack, process and unpack chunks of work. |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
=item C<adaptive> |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
Extending the idea of batching a number of work items to amortize the |
287
|
|
|
|
|
|
|
overhead of passing work to and from parallel workers you may also ask |
288
|
|
|
|
|
|
|
C<iterate> to heuristically determine the batch size by setting the |
289
|
|
|
|
|
|
|
C<adaptive> option to a numeric value. |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
The batch size will be computed as |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
<number of items seen so far> / <number of workers> / <adaptive> |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
A larger value for C<adaptive> will reduce the rate at which the batch |
296
|
|
|
|
|
|
|
size increases. Good values tend to be in the range 1 to 2. |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
You can also specify lower and, optionally, upper bounds on the batch |
299
|
|
|
|
|
|
|
size by passing a reference to an array containing ( lower bound, |
300
|
|
|
|
|
|
|
growth ratio, upper bound ). The upper bound may be omitted. |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
my $iter = iterate( |
303
|
|
|
|
|
|
|
{ adaptive => [ 5, 2, 100 ] }, |
304
|
|
|
|
|
|
|
$worker, \@stuff ); |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
=item C<onerror> |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
The action to take when an error is thrown in the iterator. Possible |
309
|
|
|
|
|
|
|
values are 'die', 'warn' or a reference to a subroutine that will be |
310
|
|
|
|
|
|
|
called with the index of the job that threw the exception and the value |
311
|
|
|
|
|
|
|
of C<$@> thrown. |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
iterate( { |
314
|
|
|
|
|
|
|
onerror => sub { |
315
|
|
|
|
|
|
|
my ($id, $err) = @_; |
316
|
|
|
|
|
|
|
$self->log( "Error for index $id: $err" ); |
317
|
|
|
|
|
|
|
} }, |
318
|
|
|
|
|
|
|
$worker, |
319
|
|
|
|
|
|
|
\@jobs |
320
|
|
|
|
|
|
|
); |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
The default is 'die'. |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
=back |
325
|
|
|
|
|
|
|
|
326
|
|
|
|
|
|
|
=cut |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
# Normalise the "iterator" argument of iterate() into a code ref.
#
# Accepts an array ref, a hash ref or a code ref:
#
#   * array ref - returns an iterator yielding ( index, value ) pairs in
#     index order, working from a copy of the array;
#   * hash ref  - returns an iterator yielding ( key, value ) pairs,
#     working from a copy of the hash;
#   * code ref  - returned unchanged.
#
# The generated iterators return an empty list once exhausted. Anything
# else croaks.
sub _massage_iterator {
    my $source = shift;
    my $kind   = ref $source;

    # Already an iterator: nothing to do.
    return $source if 'CODE' eq $kind;

    if ( 'ARRAY' eq $kind ) {
        my @items  = @$source;
        my $cursor = 0;
        return sub {
            return unless $cursor < @items;
            my $index = $cursor++;
            return ( $index, $items[$index] );
        };
    }

    if ( 'HASH' eq $kind ) {
        my %snapshot = %$source;
        my @pending  = keys %snapshot;
        return sub {
            return unless @pending;
            my $key = shift @pending;
            return ( $key, $snapshot{$key} );
        };
    }

    croak "Iterator must be a code, array or hash ref";
}
356
|
|
|
|
|
|
|
|
357
|
|
|
|
|
|
|
# Build the single process result iterator.
#
# Takes the options hash ref (only 'onerror' is used), the worker code ref
# and the input iterator. Returns a code ref; each call pulls jobs from the
# input iterator until one completes without throwing and returns its
# ( id, result ) pair. An empty list is returned once the input iterator is
# exhausted. A job whose worker throws is passed to the onerror handler as
# ( id, error ); if the handler returns, that job is skipped.
sub _nonfork {
    my ( $options, $worker, $iter ) = @_;

    return sub {
        while ( my @job = $iter->() ) {
            my ( $id, $work ) = @job;

            # Deep copy reference payloads so that we have the same
            # semantics as the forked version.
            $work = dclone( $work ) if defined $work && ref $work;

            my $result = eval { $worker->( $id, $work ) };
            my $err    = $@;
            return ( $id, $result ) unless $err;

            $options->{onerror}->( $id, $err );
        }

        # Input exhausted.
        return;
    };
}
381
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
# Does this sub look a bit long to you? :) |
383
|
|
|
|
|
|
|
# Build the forking result iterator.
#
# Takes the options hash ref ('workers' and 'onerror' are used), the worker
# code ref and the input iterator. Returns a code ref that yields one
# ( id, result ) list per call and an empty list once all work is done and
# the workers have been reaped.
#
# Workers are forked on demand, up to $options->{workers}. Each worker is
# joined to the parent by two pipes over which jobs and results are passed
# with _put_obj / _get_obj; an undef object marks end of stream in either
# direction. A worker answers every job with [ 'R', id, result ] or
# [ 'E', id, error ]; errors are handed to $options->{onerror}.
#
# Each IO::Select entry is [ read handle, write handle, eof flag ]; the
# write handle is set to undef once end of stream has been sent.
#
# Croaks if a pipe or a worker process can't be created. An exception
# raised while collecting results (for example by the onerror handler) is
# rethrown after the workers have been told to finish and waited for.
sub _fork {
    my ( $options, $worker, $iter ) = @_;

    my @workers      = ();
    my @result_queue = ();
    my $select       = IO::Select->new;

    return sub {
      LOOP: {
            # Make new workers
            while ( @workers < $options->{workers}
                && ( my @next = $iter->() ) )
            {

                my ( $my_rdr, $my_wtr, $child_rdr, $child_wtr )
                  = map IO::Handle->new, 1 .. 4;

                pipe $child_rdr, $my_wtr
                  or croak "Can't open write pipe ($!)\n";

                pipe $my_rdr, $child_wtr
                  or croak "Can't open read pipe ($!)\n";

                # fork returns undef on failure. That must not be
                # mistaken for the 0 returned in the child, otherwise the
                # parent would start running the worker loop itself.
                my $pid = fork;
                croak "Can't fork ($!)\n" unless defined $pid;

                if ( $pid ) {
                    # Parent
                    close $_ for $child_rdr, $child_wtr;
                    push @workers, $pid;
                    $select->add( [ $my_rdr, $my_wtr, 0 ] );
                    _put_obj( \@next, $my_wtr );
                }
                else {
                    # Child
                    close $_ for $my_rdr, $my_wtr;

                    # Don't execute any END blocks
                    use POSIX '_exit';
                    eval q{END { _exit 0 }};

                    # Worker loop
                    while ( defined( my $job = _get_obj( $child_rdr ) ) ) {
                        my $result = eval { $worker->( @$job ) };
                        my $err = $@;
                        _put_obj(
                            [
                                $err
                                ? ( 'E', $job->[0], $err )
                                : ( 'R', $job->[0], $result )
                            ],
                            $child_wtr
                        );
                    }

                    # End of stream
                    _put_obj( undef, $child_wtr );
                    close $_ for $child_rdr, $child_wtr;

                    # We use CORE::exit for MP compatibility
                    CORE::exit;
                }
            }

            return @{ shift @result_queue } if @result_queue;
            if ( $select->count ) {
                eval {
                    my @rdr = $select->can_read;

                    # Anybody got completed work?
                    for my $r ( @rdr ) {
                        my ( $rh, $wh, $eof ) = @$r;
                        if ( defined( my $results = _get_obj( $rh ) ) ) {
                            my $type = shift @$results;
                            if ( $type eq 'R' ) {
                                push @result_queue, $results;
                            }
                            elsif ( $type eq 'E' ) {
                                $options->{onerror}->( @$results );
                            }
                            else {
                                die "Bad result type: $type";
                            }

                            # We operate a strict one in, one out policy
                            # - which avoids deadlocks. Having received
                            # the previous result send a new work value.
                            unless ( $eof ) {
                                if ( my @next = $iter->() ) {
                                    _put_obj( \@next, $wh );
                                }
                                else {
                                    _put_obj( undef, $wh );
                                    close $wh;
                                    @{$r}[ 1, 2 ] = ( undef, 1 );
                                }
                            }
                        }
                        else {
                            # Worker sent end of stream: forget about it.
                            $select->remove( $r );
                            close $rh;
                        }
                    }
                };

                if ( my $err = $@ ) {
                    # Finish all the workers. Workers that have already
                    # been sent end of stream have no write handle left
                    # and are skipped. The notification is best effort:
                    # a failure here must not replace the error that is
                    # about to be rethrown.
                    for my $h ( $select->handles ) {
                        my $wh = $h->[1];
                        next unless defined $wh;
                        eval { _put_obj( undef, $wh ) };
                        close $wh;
                        @{$h}[ 1, 2 ] = ( undef, 1 );
                    }

                    # And wait for them to exit
                    waitpid( $_, 0 ) for @workers;

                    # Rethrow
                    die $err;
                }

                redo LOOP;
            }

            # Nothing queued and no live workers: reap and signal the end.
            waitpid( $_, 0 ) for @workers;
            return;
        }
    };
}
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
# Wrap an input iterator so that it hands out work in batches.
#
# Takes the underlying iterator (a code ref returning one job per call and
# an empty list when exhausted) and the options hash ref. Returns a code
# ref that returns ( 0, \@chunk ) where @chunk holds array refs of the
# lists returned by the underlying iterator, or an empty list when no more
# jobs are available.
#
# With a true 'adaptive' option the chunk size is recomputed on every call
# as <items handed out so far> / <workers> / <ratio>, clamped to the lower
# bound and, when given, the upper bound. 'adaptive' is either the ratio
# itself or an array ref [ lower bound, ratio, upper bound ]. Otherwise the
# fixed 'batch' option is used as the chunk size.
#
# Croaks if the adaptive growth ratio is missing or zero; previously that
# only surfaced as a division by zero when the iterator was first called.
sub _batch_input_iter {
    my ( $code, $options ) = @_;

    # Number of items handed out so far (drives the adaptive batch size).
    my $count = 0;

    # Gather up to $limit jobs into a single chunk.
    my $take = sub {
        my $limit = shift;
        my @chunk = ();
        while ( @chunk < $limit ) {
            my @next = $code->() or last;
            push @chunk, \@next;
        }
        $count += @chunk;
        return @chunk ? ( 0, \@chunk ) : ();
    };

    my $adapt = $options->{adaptive};
    unless ( $adapt ) {
        my $batch = $options->{batch};
        return sub { $take->( $batch ) };
    }

    my $workers = $options->{workers} || 1;

    # A bare number is shorthand for [ 1, ratio, no upper bound ].
    $adapt = [ 1, $adapt, undef ]
      unless 'ARRAY' eq ref $adapt;

    my ( $min, $ratio, $max ) = @$adapt;
    $min = 1 unless defined $min && $min > 1;

    croak "adaptive growth ratio must be a non-zero number"
      unless defined $ratio && $ratio != 0;

    return sub {
        # Adapt batch size
        my $batch = $count / $workers / $ratio;
        $batch = $min if $batch < $min;
        $batch = $max if defined $max && $batch > $max;

        return $take->( $batch );
    };
}
543
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
# Undo batching on the result side.
#
# Takes an iterator that returns ( id, \@chunk ) per call, where @chunk is
# a list of array refs, and returns an iterator that returns the contents
# of one of those array refs per call, in order. An empty list is returned
# once the wrapped iterator is exhausted. Empty chunks are skipped.
sub _batch_output_iter {
    my $code  = shift;
    my @queue = ();

    return sub {
        # Refill from the wrapped iterator until there is something to
        # hand out or it runs dry.
        until ( @queue ) {
            my @next = $code->() or return;
            @queue = @{ $next[1] };
        }
        return @{ shift @queue };
    };
}
560
|
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
# Wrap a worker so that it processes a whole chunk of jobs per call.
#
# The returned code ref ignores its first argument and treats the second as
# an array ref of jobs, each itself an array ref. The wrapped worker is
# called in scalar context with the contents of each job in turn and its
# return value is stored in slot 1 of that job. The same (modified) chunk
# is returned.
sub _batch_worker {
    my $code = shift;

    return sub {
        my $chunk = $_[1];

        foreach my $job ( @{$chunk} ) {
            my $outcome = $code->( @{$job} );
            $job->[1] = $outcome;
        }

        return $chunk;
    };
}
571
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
# Public entry point: iterate( [ \%options ], $worker, $iterator ).
#
# Merges the optional leading options hash ref over %DEFAULTS, validates
# the arguments and returns a result iterator (a code ref) that applies
# $worker to each job produced by $iterator.
#
# $iterator may be a code ref, an array ref or a hash ref (see
# _massage_iterator). 'onerror' may be 'die', 'warn' or a code ref that is
# called with ( id, error ).
#
# Croaks on a wrong argument count, unknown options, a worker that isn't a
# code ref, an unsupported iterator or a bad 'onerror' value.
sub iterate {
    my %options = ( %DEFAULTS, %{ 'HASH' eq ref $_[0] ? shift : {} } );

    croak "iterate takes 2 or 3 args" unless @_ == 2;

    my @bad_opt = grep { !exists $DEFAULTS{$_} } keys %options;
    croak "Unknown option(s): ", join( ', ', sort @bad_opt ), "\n"
      if @bad_opt;

    my $worker = shift;
    croak "Worker must be a coderef"
      unless 'CODE' eq ref $worker;

    my $iter = _massage_iterator( shift );

    # Handlers for the named onerror actions. Each is called with
    # ( id, error ); the id is discarded.
    my %named_handler = (
        die  => sub { shift; die shift },
        warn => sub { shift; warn shift },
    );

    if ( defined $options{onerror}
        && $options{onerror} =~ /^(die|warn)$/ )
    {
        $options{onerror} = $named_handler{$1};
    }

    croak "onerror option must be 'die', 'warn' or a code reference"
      unless 'CODE' eq ref $options{onerror};

    if ( $options{workers} > 0 && $DEFAULTS{workers} == 0 ) {
        warn "Fork not available; falling back to single process mode\n"
          unless $options{nowarn};
        $options{workers} = 0;
    }

    # Anything that isn't a positive worker count means single process
    # mode. The forking iterator with no workers would silently produce
    # no results at all.
    my $factory = $options{workers} > 0 ? \&_fork : \&_nonfork;

    if ( $options{batch} > 1 || $options{adaptive} ) {
        # Batching: wrap the input iterator, the worker and the result
        # iterator so that chunks of jobs are processed at a time.
        return _batch_output_iter(
            $factory->(
                \%options,
                _batch_worker( $worker ),
                _batch_input_iter( $iter, \%options )
            )
        );
    }

    # OK. Ready. Let's do it.
    return $factory->( \%options, $worker, $iter );
}
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
=head2 C<< iterate_as_array >> |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
As C<iterate> but instead of returning an iterator returns an array |
620
|
|
|
|
|
|
|
containing the collected output from the iterator. In a scalar context |
621
|
|
|
|
|
|
|
returns a reference to the same array. |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
For this to work properly the input iterator must return (index, value) |
624
|
|
|
|
|
|
|
pairs. This allows the results to be placed in the correct slots in the |
625
|
|
|
|
|
|
|
output array. The simplest way to do this is to pass an array reference |
626
|
|
|
|
|
|
|
as the input iterator: |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
my @output = iterate_as_array( \&some_handler, \@input ); |
629
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
=cut |
631
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
# As iterate() but collects the results into an array.
#
# Arguments are passed straight through to iterate(). Each ( index, value )
# pair produced by the result iterator is stored at $out[index]. Returns
# the array in list context and a reference to it otherwise.
sub iterate_as_array {
    my $next_result = iterate( @_ );

    my @slots = ();
    while ( my @pair = $next_result->() ) {
        my ( $index, $value ) = @pair;
        $slots[$index] = $value;
    }

    return \@slots unless wantarray;
    return @slots;
}
640
|
|
|
|
|
|
|
|
641
|
|
|
|
|
|
|
=head2 C<< iterate_as_hash >> |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
As C<iterate> but instead of returning an iterator returns a hash |
644
|
|
|
|
|
|
|
containing the collected output from the iterator. In a scalar context |
645
|
|
|
|
|
|
|
returns a reference to the same hash. |
646
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
For this to work properly the input iterator must return (key, value) |
648
|
|
|
|
|
|
|
pairs. This allows the results to be placed in the correct slots in the |
649
|
|
|
|
|
|
|
output hash. The simplest way to do this is to pass a hash reference as |
650
|
|
|
|
|
|
|
the input iterator: |
651
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
my %output = iterate_as_hash( \&some_handler, \%input ); |
653
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
=cut |
655
|
|
|
|
|
|
|
|
656
|
|
|
|
|
|
|
# As iterate() but collects the results into a hash.
#
# Arguments are passed straight through to iterate(). Each ( key, value )
# pair produced by the result iterator is stored as $out{key}. Returns the
# hash in list context and a reference to it otherwise.
sub iterate_as_hash {
    my $next_result = iterate( @_ );

    my %collected = ();
    while ( my @pair = $next_result->() ) {
        my ( $key, $value ) = @pair;
        $collected{$key} = $value;
    }

    return \%collected unless wantarray;
    return %collected;
}
664
|
|
|
|
|
|
|
|
665
|
|
|
|
|
|
|
# Read one object written by _put_obj from a file handle.
#
# The object travels wrapped in a one element array; the wrapper is removed
# here. Returns undef if the wrapped value was undef or if nothing could be
# retrieved.
sub _get_obj {
    my $fd = shift;

    my $envelope = fd_retrieve( $fd );

    return defined $envelope ? $envelope->[0] : undef;
}
670
|
|
|
|
|
|
|
|
671
|
|
|
|
|
|
|
# Write one object to a file handle for _get_obj to read.
#
# The object is wrapped in a one element array so that undef can be sent
# too, serialised onto the handle with Storable and the handle flushed.
# Returns the result of the flush.
sub _put_obj {
    my ( $obj, $fd ) = @_;

    my @envelope = ( $obj );
    store_fd( \@envelope, $fd );

    return $fd->flush;
}
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
1; |
678
|
|
|
|
|
|
|
__END__ |