package Parallel::DataPipe;

our $VERSION='0.11';
use 5.8.0;
use strict;
use warnings;
use IO::Select;
use List::Util qw(first min);
use constant PIPE_MAX_CHUNK_SIZE => $^O =~ m{linux|cygwin}? 16*1024 : 1024;
use constant _EOF_ => (-(1<<31));

sub run {
    my $param = {};
    my ($input,$map,$output) = @_;
    if (ref($input) eq 'HASH') {
        $param = $input;
    } else {
        $param = {input=>$input, process=>$map, output=>$output };
    }
    pipeline($param);
}

sub pipeline {
    my $class=shift;
    if (ref($class) eq 'HASH') {
        unshift @_, $class;
        $class = __PACKAGE__;
    }
    my @pipes;
    # init pipes
    my $default_input;
    for my $param (@_) {
        unless (exists $param->{input}) {
            $param->{input} = $default_input or die "You have to specify input for the first pipe";
        }
        my $pipe = $class->new($param);
        if (ref($pipe->{output}) eq 'ARRAY') {
            $default_input = $pipe->{output};
        }
        push @pipes, $pipe;
    }
    run_pipes(0,@pipes);
    my $result = $pipes[$#pipes]->{output};
    # @pipes=() kills parent
    # as well as its implicit destroying
    # destroy pipes one by one if you want to survive!!!
    undef $_ for @pipes;
    return unless defined(wantarray);
    return unless $result;
    return wantarray?@$result:$result;
}
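
# run_pipes pumps a chain of pipes: it keeps this pipe's processors loaded,
# recursively gives the downstream pipes a chance to run, and returns to the
# upstream caller as soon as this pipe has free processors to load, so data
# flows through the whole chain in parallel.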
sub run_pipes {
    my ($prev_busy,$me,@next) = @_;
    my $me_busy = $me->load_data || $me->busy_processors;
    while ($me_busy) {
        $me->receive_and_merge_data;
        $me_busy = $me->load_data || $me->busy_processors;
        my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
        $me_busy ||= $next_busy;
        # get data from pipe if we have free_processors
        return $me_busy if $prev_busy && $me->free_processors;
    }
    return 0;
}

# input_iterator is either an array or a subroutine reference which gets data
# from a queue or some other source and returns it;
# if there is no data it returns undef
sub input_iterator {
    my $self = shift;
    $self->{input_iterator}->(@_);
}

sub output_iterator {
    my $self = shift;
    $self->{output_iterator}->(@_);
}

# this is to set/create input iterator
sub set_input_iterator {
    my ($self,$param) = @_;
    my $old_behaviour = $param->{input_iterator};
    my ($input_iterator) = extract_param($param, qw(input_iterator input queue data));
    unless (ref($input_iterator) eq 'CODE') {
        die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
        my $queue = $input_iterator;
        $self->{input} = $queue;
        if ($old_behaviour) {
            my $l = @$queue;
            my $i = 0;
            $input_iterator = sub {$i<$l?$queue->[$i++]:undef};
        } else {
            # this behaviour is introduced with 0.06
            $input_iterator = sub {$queue?shift(@$queue):undef};
        }
    }
    $self->{input_iterator} = $input_iterator;
}

sub set_output_iterator {
    my ($self,$param) = @_;
    my ($output_iterator) = extract_param($param, qw(merge_data output_iterator output output_queue output_data merge reduce));
    unless (ref($output_iterator) eq 'CODE') {
        my $queue = $output_iterator || [];
        $self->{output} = $queue;
        $output_iterator = sub {push @$queue,$_};
    }
    $self->{output_iterator} = $output_iterator;
}

# loads all free processors with data from input
# returns the number of loaded processors
sub load_data {
    my $self = shift;
    my @free_processors = $self->free_processors;
    my $result = 0;
    for my $processor (@free_processors) {
        my $data = $self->input_iterator;
        # return number of processors loaded
        return $result unless defined($data);
        $result++;
        $self->load_data_processor($data,$processor);
    }
    return $result;
}

# this should work under Windows NT, or if the user explicitly set NUMBER_OF_PROCESSORS
my $number_of_cpu_cores = $ENV{NUMBER_OF_PROCESSORS};
sub number_of_cpu_cores {
    #$number_of_cpu_cores = $_[0] if @_; # setter
    return $number_of_cpu_cores if $number_of_cpu_cores;
    eval {
        # try unix (linux,cygwin,etc.)
        $number_of_cpu_cores = scalar grep m{^processor\t:\s\d+\s*$},`cat /proc/cpuinfo 2>/dev/null`;
        # try bsd
        ($number_of_cpu_cores) = map m{hw.ncpu:\s+(\d+)},`sysctl -a` unless $number_of_cpu_cores;
    };
    # otherwise fall back to 1
    return $number_of_cpu_cores || 1;
}

sub freeze {
    my $self = shift;
    $self->{freeze}->(@_);
}

sub thaw {
    my $self = shift;
    $self->{thaw}->(@_);
}

# this inits freeze and thaw with Storable subroutines and tries to replace them with Sereal counterparts
sub init_serializer {
    my ($self,$param) = @_;
    my ($freeze,$thaw) = grep $_ && ref($_) eq 'CODE',map delete $param->{$_},qw(freeze thaw);
    if ($freeze && $thaw) {
        $self->{freeze} = $freeze;
        $self->{thaw} = $thaw;
    } else {
        # try Sereal first
        eval q{
            use Sereal qw(encode_sereal decode_sereal);
            $self->{freeze} = \&encode_sereal;
            $self->{thaw} = \&decode_sereal;
            1;
        }
        or
        eval q{
            use Storable;
            $self->{freeze} = \&Storable::nfreeze;
            $self->{thaw} = \&Storable::thaw;
            1;
        };
    }
}
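
# Pipe protocol used by _get_data/_put_data below: each message is a 4-byte
# "l"-packed size header followed by the payload, transferred in chunks of at
# most PIPE_MAX_CHUNK_SIZE bytes. A header equal to the _EOF_ sentinel (with
# no payload) tells a data processor to exit.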

# this subroutine reads data from the pipe: a negative size header means the
# payload is frozen and must be thawed into a perl reference, a positive one
# means a plain scalar.
# it always reads the payload size first, so it knows how many bytes to read
# before feeding thaw.
sub _get_data {
    my ($self,$fh) = @_;
    my ($data_size,$data);
    $fh->sysread($data_size,4);
    $data_size = unpack("l",$data_size);
    return undef if $data_size == _EOF_; # this is for process_data terminating
    if ($data_size == 0) {
        $data = '';
    } else {
        my $length = abs($data_size);
        my $offset = 0;
        # allocate the whole buffer for $data beforehand
        $data = sprintf("%${length}s","");
        while ($offset < $length) {
            my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
            $fh->sysread(my $buf,$chunk_size);
            # use lvalue form of substr to copy data into the preallocated buffer
            substr($data,$offset,$chunk_size) = $buf;
            $offset += $chunk_size;
        }
        $data = $self->thaw($data) if $data_size<0;
    }
    return $data;
}

# this subroutine writes data to the pipe. references are serialized (frozen)
# first and marked with a negative size header; plain scalars are written
# as-is with a positive size header.
sub _put_data {
    my ($self,$fh,$data) = @_;
    unless (defined($data)) {
        $fh->syswrite(pack("l", _EOF_));
        return;
    }
    my $length = length($data);
    if (ref($data)) {
        $data = $self->freeze($data);
        $length = -length($data);
    }
    $fh->syswrite(pack("l", $length));
    $length = abs($length);
    my $offset = 0;
    while ($offset < $length) {
        my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
        $fh->syswrite(substr($data,$offset,$chunk_size));
        $offset += $chunk_size;
    }
}

sub _fork_data_processor {
    my ($data_processor_callback) = @_;
    # create processor as fork
    my $pid = fork();
    unless (defined $pid) {
        #print "say goodbye - can't fork!\n"; <>;
        die "can't fork!";
    }
    if ($pid == 0) {
        local $SIG{TERM} = sub {
            exit;
        }; # exit silently from data processors
        # the data processor is an eternal loop which waits for raw data on the pipe from main
        # it is killed when it's not needed anymore by _kill_data_processors
        $data_processor_callback->() while (1);
        exit;
    }
    return $pid;
}

sub _create_data_processor {
    my ($self,$process_data_callback) = @_;

    # parent <=> child pipes
    my ($parent_read, $parent_write) = pipely();
    my ($child_read, $child_write) = pipely();

    my $data_processor = sub {
        local $_ = $self->_get_data($child_read);
        unless (defined($_)) {
            exit 0;
        }
        $_ = $process_data_callback->($_);
        $self->_put_data($parent_write,$_);
    };

    # return data processor record
    return {
        pid => _fork_data_processor($data_processor), # needed to kill processor when there is no more data to process
        child_write => $child_write, # pipe to write raw data from main to data processor
        parent_read => $parent_read, # pipe to read processed data from data processor back in main
    };
}

sub extract_param {
    my ($param, @alias) = @_;
    return first {defined($_)} map delete($param->{$_}), @alias;
}

sub create_data_processors {
    my ($self,$param) = @_;
    my $process_data_callback = extract_param($param,qw(process_data process processor map));
    my $number_of_data_processors = extract_param($param,qw(number_of_data_processors number_of_processors));
    $number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
    die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
    die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
    return [map $self->_create_data_processor($process_data_callback,$_), 0..$number_of_data_processors-1];
}

sub load_data_processor {
    my ($self,$data,$processor) = @_;
    $processor->{item_number} = $self->{item_number}++;
    die "no support of data processing for undef items!" unless defined($data);
    $processor->{busy} = 1;
    $self->_put_data($processor->{child_write},$data);
}
sub busy_processors {
    my $self = shift;
    return grep $_->{busy}, @{$self->{processors}};
}

sub free_processors {
    my $self = shift;
    return grep !$_->{busy}, @{$self->{processors}};
}
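
# receive_and_merge_data blocks (via IO::Select) until at least one busy
# processor has written its result, reads one result per call, marks that
# processor free and hands the result to the output iterator together with
# its item_number.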
sub receive_and_merge_data {
    my $self = shift;
    my ($processors,$ready) = @{$self}{qw(processors ready)};
    $self->{ready} = $ready = [] unless $ready;
    @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
    my $fh = shift(@$ready);
    my $processor = first {$_->{parent_read} == $fh} @$processors;
    local $_ = $self->_get_data($fh);
    $processor->{busy} = undef; # make processor free
    $self->output_iterator($_,$processor->{item_number});
}

sub _kill_data_processors {
    my ($self) = @_;
    my $processors = $self->{processors};
    my @pid_to_kill = map $_->{pid}, @$processors;
    my %pid_to_wait = map {$_=>undef} @pid_to_kill;
    # put undef to input of data_processor - they know it's time to exit
    $self->_put_data($_->{child_write}) for @$processors;
    while (@pid_to_kill) {
        my $pid = wait;
        delete $pid_to_wait{$pid};
        @pid_to_kill = keys %pid_to_wait;
    }
}

sub new {
    my ($class, $param) = @_;
    my $self = {mypid=>$$};
    bless $self,$class;
    $self->set_input_iterator($param);
    # item_number for merge implementation
    $self->{item_number} = 0;
    # check if the user wants to use alternative serialisation routines
    $self->init_serializer($param);
    # @$processors is an array with data processor info
    $self->{processors} = $self->create_data_processors($param);
    # data_merge is a sub which merges all processed data inside the parent thread;
    # it is called each time process_data returns some new portion of data
    $self->set_output_iterator($param);
    my $not_supported = join ", ", keys %$param;
    die "Parameters are redundant or not supported:". $not_supported if $not_supported;
    return $self;
}

sub DESTROY {
    my $self = shift;
    return unless $self->{mypid} == $$;
    $self->_kill_data_processors;
    #semctl($self->{sem_id},0,IPC_RMID,0);
}

=begin comment

Why did I copy IO::Pipely::pipely instead of using IO::Pipely qw(pipely)?
1. To avoid depending on the installation of an additional module.
2. I don't know (yet) how to win this race condition:
 A) In Makefile.PL I would check whether fork & pipe work on the platform before creating the Makefile.
    But I am not sure it's ok to use pipely to create the pipes at that moment.
 so
 B) to use pipely I have to create the Makefile first.
For now I decided to just copy the pipely code into this module.
Once I know how to win that race condition I will get rid of this code,
use IO::Pipely qw(pipely) instead, and
add a dependency on it.

=end comment

=cut

# IO::Pipely is copyright 2000-2012 by Rocco Caputo.
use Symbol qw(gensym);
use IO::Socket qw(
    AF_UNIX
    PF_INET
    PF_UNSPEC
    SOCK_STREAM
    SOL_SOCKET
    SOMAXCONN
    SO_ERROR
    SO_REUSEADDR
    inet_aton
    pack_sockaddr_in
    unpack_sockaddr_in
);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Errno qw(EINPROGRESS EWOULDBLOCK);

my (@oneway_pipe_types, @twoway_pipe_types);
if ($^O eq "MSWin32" or $^O eq "MacOS") {
    @oneway_pipe_types = qw(inet socketpair pipe);
    @twoway_pipe_types = qw(inet socketpair pipe);
}
elsif ($^O eq "cygwin") {
    @oneway_pipe_types = qw(pipe inet socketpair);
    @twoway_pipe_types = qw(inet pipe socketpair);
}
else {
    @oneway_pipe_types = qw(pipe socketpair inet);
    @twoway_pipe_types = qw(socketpair inet pipe);
}

sub pipely {
    my %arg = @_;

    my $conduit_type = delete($arg{type});
    my $debug = delete($arg{debug}) || 0;

    # Generate symbols to be used as filehandles for the pipe's ends.
    #
    # Filehandle autovivification isn't used for portability with older
    # versions of Perl.

    my ($a_read, $b_write) = (gensym(), gensym());

    # Try the specified conduit type only. No fallback.

    if (defined $conduit_type) {
        return ($a_read, $b_write) if _try_oneway_type(
            $conduit_type, $debug, \$a_read, \$b_write
        );
    }

    # Otherwise try all available conduit types until one works.
    # Conduit types that fail are discarded for speed.

    while (my $try_type = $oneway_pipe_types[0]) {
        return ($a_read, $b_write) if _try_oneway_type(
            $try_type, $debug, \$a_read, \$b_write
        );
        shift @oneway_pipe_types;
    }

    # There's no conduit type left. Bummer!

    $debug and warn "nothing worked";
    return;
}

# Try a pipe by type.

sub _try_oneway_type {
    my ($type, $debug, $a_read, $b_write) = @_;

    # Try a pipe().
    if ($type eq "pipe") {
        eval {
            pipe($$a_read, $$b_write) or die "pipe failed: $!";
        };

        # Pipe failed.
        if (length $@) {
            warn "pipe failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a pipe";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # Turn off buffering. POE::Kernel does this for us, but
        # someone might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # Try a UNIX-domain socketpair.
    if ($type eq "socketpair") {
        eval {
            socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
                or die "socketpair failed: $!";
        };

        if (length $@) {
            warn "socketpair failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a UNIX domain socketpair";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # It's one-way, so shut down the unused directions.
        shutdown($$a_read, 1);
        shutdown($$b_write, 0);

        # Turn off buffering. POE::Kernel does this for us, but someone
        # might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # Try a pair of plain INET sockets.
    if ($type eq "inet") {
        eval {
            ($$a_read, $$b_write) = _make_socket();
        };

        if (length $@) {
            warn "make_socket failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a plain INET socket";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # It's one-way, so shut down the unused directions.
        shutdown($$a_read, 1);
        shutdown($$b_write, 0);

        # Turn off buffering. POE::Kernel does this for us, but someone
        # might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # There's nothing left to try.
    $debug and warn "unknown pipely() socket type ``$type''";
    return;
}

1;

=head1 NAME

C<Parallel::DataPipe> - parallel data processing conveyor

=encoding utf-8

=head1 SYNOPSIS

    use Parallel::DataPipe;
    Parallel::DataPipe::run {
        input => [1..100],
        process => sub { "$_:$$" },
        number_of_data_processors => 100,
        output => sub { print "$_\n" },
    };

=head1 DESCRIPTION

If you have a long-running script that processes data item by item
(taking some data on input and producing some processed data on output, e.g. aggregation, web crawling, etc.),
you can speed it up 4-20 times using this parallel data pipe conveyor.
Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2, 4, 8, even 24!
And a huge amount of memory: memory is cheap now.
So they are ready for parallel data processing.
With this module there is an easy and flexible way to use that power.

So what are the benefits of this module?

1) Because it uses an input iterator, it does not have to know all the input data before starting parallel processing.

2) Because it uses merge_data, processed data is ready for use in the main thread immediately.

1) and 2) remove the memory requirements for storing data items before and after the parallel work, and allow the collection, processing and use of processed data to be parallelized.

If you don't want to overload your database with multiple simultaneous queries,
you make queries only within the input iterator, then process_data, and then flush the results with merge_data.
On the other hand, you usually win if you make queries in process_data and use a lot of data processors,
possibly even more than the number of physical cores, if database queries take a long time and the returned data needs only a small amount of processing.

It's no surprise that DB servers usually serve N simultaneous queries faster than N queries one by one.

Run your own tests and you will know.

To (re)write your script to use all the processing power of your server you have to find out:

1) the method to obtain source/input data. I call it the input iterator. It can be either an array with some identifiers/URLs, or a reference to a subroutine which returns the next portion of data or undef if there is no more data to process.

2) how to process the data, i.e. a method which receives an input item and produces an output item. I call it the process_data subroutine. The good news is that the item which is processed and then returned can be any scalar value in perl, including references to arrays and hashes. It can be anything that Storable can freeze and then thaw.

3) how to use the processed data. I call it merge_data. In the example above it just prints the item, but you could do buffered inserts into a database, send email, etc.

Take into account that 1) and 3) are executed in the main script thread, while all the 2) work is done in parallel forked processes. So for 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. So (still) this approach pays off only if you know that the bottleneck in your script is the CPU in the processing step. Of course that's not the case for some web-crawling tasks, unless you do some heavy calculations.
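
For example, a web-crawling flavour of the synopsis might look like this. This is a minimal sketch only: C<fetch_url> is a hypothetical helper standing in for the real work, not part of this module.

    # sketch: fetch in parallel children, aggregate in the parent
    my %size_of;
    Parallel::DataPipe::run {
        input   => [ @urls ],                             # assumed list of URLs
        process => sub { [ $_, length fetch_url($_) ] },  # runs in forked children
        output  => sub { my ($url,$size) = @$_; $size_of{$url} = $size },
    };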

=head1 SUBROUTINES

=head2 run

This subroutine covers the magic of parallelizing data processing.
It receives parameters with these keys via a hash ref.

B<input> - reference to an array or to a subroutine which should return a data item to be processed.
In case of a subroutine it should return undef to signal EOF.
In case of an array it is used as a queue, i.e. shift(@$array) until there are no more data items.
This behaviour was introduced in 0.06.
Also you can use these aliases:
input_iterator, queue, data

Note: in versions before 0.06 this parameter was input_iterator, and if it referred to an array the array remained untouched,
while the new behaviour is to treat this parameter like a queue.
0.06 supports the old behaviour only for input_iterator,
while in the future it will behave as a queue to make life easier.
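
Because the input iterator can be a code reference, you can stream items without materializing them all up front. A minimal sketch (the file-reading logic here is illustrative, not part of the module):

    open my $fh, '<', 'items.txt' or die $!;
    Parallel::DataPipe::run {
        input   => sub { my $line = <$fh>; defined($line) ? $line : undef },
        process => sub { length($_) },        # executed in forked children
        output  => sub { print "len: $_\n" }, # executed in the parent
    };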

B<process> - reference to a subroutine which processes data items. They are passed via the $_ variable.
It should return the processed data. This subroutine is executed in a forked process, so don't
use any shared resources inside it.
Also you can update the child's state, but it will not affect the parent's state.
Also you can use these aliases:
process_data

The parameters below are optional and have reasonable defaults, so change them only if you know what you are doing.

B<output> - optional. Either a reference to a subroutine or an array which receives the processed data item.
The subroutine can use $_ or $_[0] to access the data item and $_[1] to access the item_number.
This subroutine is executed in the parent thread, so you can rely on changes that it makes.
If you don't specify this parameter, an array with the processed data can be received as the subroutine result.
You can use these aliases for this parameter:
merge_data, merge
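
Since children may finish out of order, the item_number in $_[1] lets you restore the original input order when merging. A minimal sketch:

    my @ordered;
    Parallel::DataPipe::run {
        input   => [ 1 .. 10 ],
        process => sub { $_ * $_ },
        output  => sub { my ($item,$n) = @_; $ordered[$n] = $item },
    };
    print "@ordered\n"; # squares of 1..10, in input order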

B<number_of_data_processors> - (optional) number of parallel data processors. If you don't specify it,
it tries to find out the number of cpu cores
and creates the same number of data processor children.
It looks for the NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT.
If this environment variable is not found it looks at /proc/cpuinfo, which is available under Unix.
It makes sense to set an explicit C<number_of_data_processors>,
possibly greater than the number of cpu cores,
if you are going to use all the slave DB servers in your environment
and querying the DB servers takes more time than processing the returned data.
Otherwise it's optimal to have C<number_of_data_processors> equal to the number of cpu cores.
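
For example, for IO-bound work you might deliberately oversubscribe the cores (a sketch; C<query_db> is a hypothetical helper):

    Parallel::DataPipe::run {
        input   => [ @ids ],              # assumed list of record ids
        process => sub { query_db($_) },  # hypothetical, mostly waits on the DB server
        output  => sub { },               # results discarded in this sketch
        number_of_data_processors => 16,  # more workers than cpu cores
    };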

B<freeze>, B<thaw> - you can use an alternative serializer.
For example, if you know that you are working with arrays of words (0..65535) you can use

    freeze => sub {pack('S*',@{$_[0]})},
    thaw   => sub {[unpack('S*',$_[0])]},

which will reduce the number of bytes exchanged between processes.
But do it only as a last-resort optimization.
In fact the automatic choice is quite good and efficient.
It uses encode_sereal and decode_sereal if the Sereal module is found.
Otherwise it uses Storable freeze and thaw.
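
Wired into run, such a serializer might look like this (a minimal sketch for items that are array refs of 16-bit integers):

    Parallel::DataPipe::run {
        input   => [ [1,2,3], [4,5,6] ],
        process => sub { [ map { $_ * 2 } @$_ ] },
        output  => sub { print "@$_\n" },
        freeze  => sub { pack 'S*', @{$_[0]} },
        thaw    => sub { [ unpack 'S*', $_[0] ] },
    };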

Note: run also has an undocumented prototype for calling it as (\@\$), i.e.

    my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});

This feature is experimental and can be removed. Use it at your own risk.

=head2 pipeline

pipeline() is a chain of run() calls (parallel data pipes) executed in parallel,
where the input for the next pipe is implicitly taken from the output of the previous one.

    run {input => \@queue, process => \&process, output => \@out}

is the same as

    pipeline {input => \@queue, process => \&process, output => \@out}

But with pipeline you can create a chain of connected pipes and run all of them in parallel,
like it's done in unix with a pipeline of processes:

    pipeline(
        { input => \@queue, process => \&process1},
        { process => \&process2},
        { process => \&process3, output => sub {print "$_\n";} },
    );

And it works like in unix - the input of the next pipe is (implicitly) set to the output of the previous pipe.
You have to specify input for the first pipe explicitly (see the example of parallel grep 'hello' below).

If you don't specify input for the next pipe it is assumed to be the output of the previous pipe, as in unix.
This assumption, that the input of the next pipe depends on the output of the previous one, also drives the algorithm
for prioritizing the execution of pipe processors:
as long as the rightmost (last in the list) pipe has input items to process, it executes its data processors.
If that pipe has a free processor that is not loaded with data, then the processors of the previous pipe are executed
to produce input data for the next pipe.
This is applied recursively along the whole chain of pipes.
Here is a parallel grep implemented in about 40 lines of perl code:

    use List::MoreUtils qw(part);
    my @dirs = '.';
    my @files;
    pipeline(
        # this pipe looks (recursively) for all files in the specified @dirs
        {
            input => \@dirs,
            process => sub {
                my ($files,$dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
                return [$files||[],$dirs||[]];
            },
            output => sub {
                my ($files,$dirs) = @$_;
                push @dirs,@$dirs; # recursion is here
                push @files,@$files;
            },
        },
        # this pipe greps files for the word hello
        {
            input => \@files,
            process => sub {
                my ($file) = $_;
                open my $fh, '<', $file or return [$file,[]];
                my @lines;
                while (<$fh>) {
                    # line_number : line
                    push @lines,"$.:$_" if m{hello};
                }
                return [$file,\@lines];
            },
            output => sub {
                my ($file,$lines) = @$_;
                # print filename, line_number, line
                print "$file:$_" for @$lines;
            }
        }
    );

=head1 HOW parallel pipe (run) WORKS

1) The main thread (parent) forks C<number_of_data_processors> children for processing data.

2) As soon as data comes from the C<input> iterator, the parent sends it to the next free child using
the pipe mechanism.

3) The child processes the data and returns the result back to the parent using the pipe.

4) The parent first fills up all the pipes to the children with data and then
starts to expect processed data on the pipes from the children.

5) When it receives a result from a child, it sends the processed data to the C<output> subroutine,
and starts loop 2) again.

6) Loop 2) continues until the input data is exhausted (end of the C<input> array, or the C<input> sub returned undef).

7) In the end the parent collects processed data from all still-busy children and puts the processed data to C<output>.

8) After all the children have sent their processed data they are killed and run returns to the caller.

Note:
If C<input> or C<process> returns a reference, the data is serialized/deserialized before/after the pipe.
That way you have full control over whether data will be serialized for IPC.
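
For example, returning a reference from C<process> just works: it is frozen in the child and thawed in the parent. A minimal sketch:

    Parallel::DataPipe::run {
        input   => [ 1 .. 5 ],
        process => sub { +{ n => $_, square => $_ * $_ } }, # hash ref crosses the pipe serialized
        output  => sub { print "$_->{n} => $_->{square}\n" },
    };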

=head1 SEE ALSO

L

L

L

L

L<IO::Pipely> - pipes that work almost everywhere

L<POE> - portable multitasking and networking framework for any event loop

L

L

=head1 DEPENDENCIES

Only core modules are used.

If found, the Sereal module is used for serialization instead of Storable, as the former is more efficient.

=head1 BUGS

For all bugs please send an email to okharch@gmail.com.

=head1 SOURCE REPOSITORY

See the git source on github
L

=head1 COPYRIGHT

Copyright (c) 2013 Oleksandr Kharchenko

All rights reserved. This library is free software; you can redistribute it
and/or modify it under the same terms as Perl itself.

=head1 AUTHOR

Oleksandr Kharchenko

=cut