package Parallel::DataPipe;

our $VERSION='0.12';

use 5.8.0;
use strict;
use warnings;
use IO::Select;
use List::Util qw(first min);
use constant PIPE_MAX_CHUNK_SIZE => $^O =~ m{linux|cygwin}? 16*1024 : 1024;
use constant _EOF_ => (-(1<<31));

sub run {
    my $param = {};
    my ($input,$map,$output) = @_;
    if (ref($input) eq 'HASH') {
        $param = $input;
    } else {
        $param = {input=>$input, process=>$map, output=>$output };
    }
    pipeline($param);
}

sub pipeline {
    my $class = shift;
    if (ref($class) eq 'HASH') {
        unshift @_, $class;
        $class = __PACKAGE__;
    }
    my @pipes;
    # init pipes
    my $default_input;
    for my $param (@_) {
        unless (exists $param->{input}) {
            $param->{input} = $default_input or die "You have to specify input for the first pipe";
        }
        my $pipe = $class->new($param);
        if (ref($pipe->{output}) eq 'ARRAY') {
            $default_input = $pipe->{output};
        }
        push @pipes, $pipe;
    }
    run_pipes(0,@pipes);
    my $result = $pipes[$#pipes]->{output};
    # @pipes=() kills parent
    # as well as its implicit destroying
    # destroy pipes one by one if you want to survive!!!
    undef $_ for @pipes;
    return unless defined(wantarray);
    return unless $result;
    return wantarray?@$result:$result;
}

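# drives one pipe and, recursively, the pipes downstream of it:
# keeps this pipe's free processors loaded, collects finished items,
# lets the rest of the chain run, and returns early when an upstream pipe
# is busy while this pipe has spare processors, so upstream can produce
# more input for it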
sub run_pipes {
    my ($prev_busy,$me,@next) = @_;
    my $me_busy = $me->load_data || $me->busy_processors;
    while ($me_busy) {
        $me->receive_and_merge_data;
        $me_busy = $me->load_data || $me->busy_processors;
        my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
        $me_busy ||= $next_busy;
        # get data from pipe if we have free_processors
        return $me_busy if $prev_busy && $me->free_processors;
    }
    return 0;
}

# input_iterator is either an array or a subroutine reference which gets data from a queue or other source and returns it
# if there is no data it returns undef
sub input_iterator {
    my $self = shift;
    $self->{input_iterator}->(@_);
}

sub output_iterator {
    my $self = shift;
    $self->{output_iterator}->(@_);
}

# this is to set/create input iterator
sub set_input_iterator {
    my ($self,$param) = @_;
    my $old_behaviour = $param->{input_iterator};
    my ($input_iterator) = extract_param($param, qw(input_iterator input queue data));
    unless (ref($input_iterator) eq 'CODE') {
        die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
        my $queue = $input_iterator;
        $self->{input} = $queue;
        if ($old_behaviour) {
            my $l = @$queue;
            my $i = 0;
            $input_iterator = sub {$i<$l?$queue->[$i++]:undef};
        } else {
            # this behaviour was introduced in 0.06: the array is treated as a queue
            $input_iterator = sub {$queue?shift(@$queue):undef};
        }
    }
    $self->{input_iterator} = $input_iterator;
}

sub set_output_iterator {
    my ($self,$param) = @_;
    my ($output_iterator) = extract_param($param, qw(merge_data output_iterator output output_queue output_data merge reduce));
    unless (ref($output_iterator) eq 'CODE') {
        my $queue = $output_iterator || [];
        $self->{output} = $queue;
        $output_iterator = sub {push @$queue,$_};
    }
    $self->{output_iterator} = $output_iterator;
}

# loads all free processors with data from input
# returns the number of loaded processors
sub load_data {
    my $self = shift;
    my @free_processors = $self->free_processors;
    my $result = 0;
    for my $processor (@free_processors) {
        my $data = $self->input_iterator;
        # return number of processors loaded
        return $result unless defined($data);
        $result++;
        $self->load_data_processor($data,$processor);
    }
    return $result;
}

# this should work with Windows NT or if user explicitly set that
my $number_of_cpu_cores = $ENV{NUMBER_OF_PROCESSORS};
sub number_of_cpu_cores {
    #$number_of_cpu_cores = $_[0] if @_; # setter
    return $number_of_cpu_cores if $number_of_cpu_cores;
    eval {
        # try unix (linux,cygwin,etc.)
        $number_of_cpu_cores = scalar grep m{^processor\t:\s\d+\s*$},`cat /proc/cpuinfo 2>/dev/null`;
        # try bsd
        ($number_of_cpu_cores) = map m{hw.ncpu:\s+(\d+)},`sysctl -a` unless $number_of_cpu_cores;
    };
    # otherwise fall back to a single core
    return $number_of_cpu_cores || 1;
}

sub freeze {
    my $self = shift;
    $self->{freeze}->(@_);
}

sub thaw {
    my $self = shift;
    $self->{thaw}->(@_);
}

# this inits freeze and thaw with Sereal subroutines if that module is available,
# falling back to their Storable counterparts otherwise
sub init_serializer {
    my ($self,$param) = @_;
    my ($freeze,$thaw) = grep $_ && ref($_) eq 'CODE',map delete $param->{$_},qw(freeze thaw);
    if ($freeze && $thaw) {
        $self->{freeze} = $freeze;
        $self->{thaw} = $thaw;
    } else {
        # try Sereal first
        eval q{
            use Sereal qw(encode_sereal decode_sereal);
            $self->{freeze} = \&encode_sereal;
            $self->{thaw} = \&decode_sereal;
            1;
        }
        or
        eval q{
            use Storable;
            $self->{freeze} = \&Storable::nfreeze;
            $self->{thaw} = \&Storable::thaw;
            1;
        };
    }
}

# this subroutine reads a data item from the pipe.
# a negative size means the payload was frozen, so it is thawed back into a perl reference;
# otherwise the raw scalar is returned as-is.
# it always reads the size of the frozen scalar first, so it knows how many bytes
# to read to feed thaw
sub _get_data {
    my ($self,$fh) = @_;
    my ($data_size,$data);
    $fh->sysread($data_size,4);
    $data_size = unpack("l",$data_size);
    return undef if $data_size == _EOF_; # this is for process_data terminating
    if ($data_size == 0) {
        $data = '';
    } else {
        my $length = abs($data_size);
        my $offset = 0;
        # allocate all the buffer for $data beforehand
        $data = sprintf("%${length}s","");
        while ($offset < $length) {
            my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
            $fh->sysread(my $buf,$chunk_size);
            # use lvalue form of substr to copy data in preallocated buffer
            substr($data,$offset,$chunk_size) = $buf;
            $offset += $chunk_size;
        }
        $data = $self->thaw($data) if $data_size<0;
    }
    return $data;
}

# this subroutine writes a data item to the pipe.
# references are serialized first and their size is written as a negative number;
# plain scalars are written as-is with a positive size.
sub _put_data {
    my ($self,$fh,$data) = @_;
    unless (defined($data)) {
        $fh->syswrite(pack("l", _EOF_));
        return;
    }
    my $length = length($data);
    if (ref($data)) {
        $data = $self->freeze($data);
        $length = -length($data);
    }
    $fh->syswrite(pack("l", $length));
    $length = abs($length);
    my $offset = 0;
    while ($offset < $length) {
        my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
        $fh->syswrite(substr($data,$offset,$chunk_size));
        $offset += $chunk_size;
    }
}

sub _fork_data_processor {
    my ($data_processor_callback, $init_data_processor) = @_;
    # create processor as fork
    my $pid = fork();
    unless (defined $pid) {
        #print "say goodbye - can't fork!\n"; <>;
        die "can't fork!";
    }
    if ($pid == 0) {
        local $SIG{TERM} = sub {
            exit;
        }; # exit silently from data processors
        # data processor is an eternal loop which waits for raw data on the pipe from main
        # data processor is killed when it's not needed anymore by _kill_data_processors
        $init_data_processor->() if ref($init_data_processor) && ref($init_data_processor) eq 'CODE';
        $data_processor_callback->() while (1);
        exit;
    }
    return $pid;
}

sub _create_data_processor {
    my ($self,$process_data_callback, $init_data_processor) = @_;

    # parent <=> child pipes
    my ($parent_read, $parent_write) = pipely();
    my ($child_read, $child_write) = pipely();

    my $data_processor = sub {
        local $_ = $self->_get_data($child_read);
        unless (defined($_)) {
            exit 0;
        }
        $_ = $process_data_callback->($_);
        $self->_put_data($parent_write,$_);
    };

    # return data processor record
    return {
        pid => _fork_data_processor($data_processor,$init_data_processor), # needed to kill processor when there is no more data to process
        child_write => $child_write, # pipe to write raw data from main to data processor
        parent_read => $parent_read, # pipe to read processed data from data processor back to main
    };
}

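# returns the first defined value among the aliased keys, deleting all of them from %$param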
sub extract_param {
    my ($param, @alias) = @_;
    return first {defined($_)} map delete($param->{$_}), @alias;
}

sub create_data_processors {
    my ($self,$param) = @_;
    my $process_data_callback = extract_param($param,qw(process_data process processor map));
    my $init_data_processor = extract_param($param,qw(init_data_processor));
    my $number_of_data_processors = extract_param($param,qw(number_of_data_processors number_of_processors));
    $number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
    die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
    die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
    return [map $self->_create_data_processor($process_data_callback, $init_data_processor, $_), 0..$number_of_data_processors-1];
}

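# stamps the item with a sequential item_number, marks the processor busy
# and writes the item to the processor's pipe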
sub load_data_processor {
    my ($self,$data,$processor) = @_;
    $processor->{item_number} = $self->{item_number}++;
    die "no support of data processing for undef items!" unless defined($data);
    $processor->{busy} = 1;
    $self->_put_data($processor->{child_write},$data);
}

sub busy_processors {
    my $self = shift;
    return grep $_->{busy}, @{$self->{processors}};
}

sub free_processors {
    my $self = shift;
    return grep !$_->{busy}, @{$self->{processors}};
}

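# blocks (via IO::Select) until one of the busy processors returns a result,
# then frees that processor and hands the result to the output iterator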
sub receive_and_merge_data {
    my $self = shift;
    my ($processors,$ready) = @{$self}{qw(processors ready)};
    $self->{ready} = $ready = [] unless $ready;
    @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
    my $fh = shift(@$ready);
    my $processor = first {$_->{parent_read} == $fh} @$processors;
    local $_ = $self->_get_data($fh);
    $processor->{busy} = undef; # make processor free
    $self->output_iterator($_,$processor->{item_number});
}

sub _kill_data_processors {
    my ($self) = @_;
    my $processors = $self->{processors};
    my @pid_to_kill = map $_->{pid}, @$processors;
    my %pid_to_wait = map {$_=>undef} @pid_to_kill;
    # put undef to input of data_processor - they know it's time to exit
    $self->_put_data($_->{child_write}) for @$processors;
    while (@pid_to_kill) {
        my $pid = wait;
        delete $pid_to_wait{$pid};
        @pid_to_kill = keys %pid_to_wait;
    }
}

sub new {
    my ($class, $param) = @_;
    my $self = {mypid=>$$};
    bless $self,$class;
    $self->set_input_iterator($param);
    # item_number for merge implementation
    $self->{item_number} = 0;
    # check if user wants to use alternative serialisation routines
    $self->init_serializer($param);
    # @$processors is array with data processor info
    $self->{processors} = $self->create_data_processors($param);
    # data_merge is sub which merges all processed data inside parent thread
    # it is called each time after process_data returns some new portion of data
    $self->set_output_iterator($param);
    my $not_supported = join ", ", keys %$param;
    die "Parameters are redundant or not supported: ". $not_supported if $not_supported;
    return $self;
}

sub DESTROY {
    my $self = shift;
    return unless $self->{mypid} == $$;
    $self->_kill_data_processors;
    #semctl($self->{sem_id},0,IPC_RMID,0);
}

=begin comment

Why did I copy IO::Pipely::pipely instead of using IO::Pipely qw(pipely)?
1. To avoid depending on the installation of an additional module.
2. I don't know (yet) how to win this race condition:
A) In Makefile.PL I would like to check that fork & pipe work on the platform before creating the Makefile,
but I am not sure it's ok to use pipely to create pipes at that moment.
so
B) to use pipely I have to create the Makefile first.
For now I decided to just copy the pipely code into this module.
Once I know how to win that race condition I will get rid of this code,
use IO::Pipely qw(pipely) instead and
add a dependency on it.

=end comment

=cut

# IO::Pipely is copyright 2000-2012 by Rocco Caputo.
use Symbol qw(gensym);
use IO::Socket qw(
    AF_UNIX
    PF_INET
    PF_UNSPEC
    SOCK_STREAM
    SOL_SOCKET
    SOMAXCONN
    SO_ERROR
    SO_REUSEADDR
    inet_aton
    pack_sockaddr_in
    unpack_sockaddr_in
);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Errno qw(EINPROGRESS EWOULDBLOCK);

my (@oneway_pipe_types, @twoway_pipe_types);
if ($^O eq "MSWin32" or $^O eq "MacOS") {
    @oneway_pipe_types = qw(inet socketpair pipe);
    @twoway_pipe_types = qw(inet socketpair pipe);
}
elsif ($^O eq "cygwin") {
    @oneway_pipe_types = qw(pipe inet socketpair);
    @twoway_pipe_types = qw(inet pipe socketpair);
}
else {
    @oneway_pipe_types = qw(pipe socketpair inet);
    @twoway_pipe_types = qw(socketpair inet pipe);
}

sub pipely {
    my %arg = @_;

    my $conduit_type = delete($arg{type});
    my $debug = delete($arg{debug}) || 0;

    # Generate symbols to be used as filehandles for the pipe's ends.
    #
    # Filehandle autovivification isn't used for portability with older
    # versions of Perl.

    my ($a_read, $b_write) = (gensym(), gensym());

    # Try the specified conduit type only. No fallback.

    if (defined $conduit_type) {
        return ($a_read, $b_write) if _try_oneway_type(
            $conduit_type, $debug, \$a_read, \$b_write
        );
    }

    # Otherwise try all available conduit types until one works.
    # Conduit types that fail are discarded for speed.

    while (my $try_type = $oneway_pipe_types[0]) {
        return ($a_read, $b_write) if _try_oneway_type(
            $try_type, $debug, \$a_read, \$b_write
        );
        shift @oneway_pipe_types;
    }

    # There's no conduit type left. Bummer!

    $debug and warn "nothing worked";
    return;
}

# Try a pipe by type.

sub _try_oneway_type {
    my ($type, $debug, $a_read, $b_write) = @_;

    # Try a pipe().
    if ($type eq "pipe") {
        eval {
            pipe($$a_read, $$b_write) or die "pipe failed: $!";
        };

        # Pipe failed.
        if (length $@) {
            warn "pipe failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a pipe";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # Turn off buffering. POE::Kernel does this for us, but
        # someone might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # Try a UNIX-domain socketpair.
    if ($type eq "socketpair") {
        eval {
            socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
                or die "socketpair failed: $!";
        };

        if (length $@) {
            warn "socketpair failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a UNIX domain socketpair";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # It's one-way, so shut down the unused directions.
        shutdown($$a_read, 1);
        shutdown($$b_write, 0);

        # Turn off buffering. POE::Kernel does this for us, but someone
        # might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # Try a pair of plain INET sockets.
    if ($type eq "inet") {
        eval {
            ($$a_read, $$b_write) = _make_socket();
        };

        if (length $@) {
            warn "make_socket failed: $@" if $debug;
            return;
        }

        $debug and do {
            warn "using a plain INET socket";
            warn "ar($$a_read) bw($$b_write)\n";
        };

        # It's one-way, so shut down the unused directions.
        shutdown($$a_read, 1);
        shutdown($$b_write, 0);

        # Turn off buffering. POE::Kernel does this for us, but someone
        # might want to use the pipe class elsewhere.
        select((select($$b_write), $| = 1)[0]);
        return 1;
    }

    # There's nothing left to try.
    $debug and warn "unknown pipely() socket type ``$type''";
    return;
}

1;

=head1 NAME

Parallel::DataPipe - parallel data processing conveyor

=encoding utf-8

=head1 SYNOPSIS

    use Parallel::DataPipe;
    Parallel::DataPipe::run {
        input => [1..100],
        process => sub { "$_:$$" },
        number_of_data_processors => 100,
        output => sub { print "$_\n" },
    };

=head1 DESCRIPTION

If you have some long running script processing data item by item
(having some data on input and some processed data on output, i.e. aggregation, web crawling, etc.)
you can speed it up 4-20 times using this parallel data pipe conveyor.
Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2, 4, 8, even 24!
And a huge amount of memory: memory is cheap now.
So they are ready for parallel data processing.
With this module there is an easy and flexible way to use that power.

So what are the benefits of this module?

1) because it uses an input iterator it does not have to know all the input data before starting parallel processing

2) because it uses merge_data, processed data is ready for use in the main thread immediately.

1) and 2) remove the memory requirements for storing data items before and after the parallel work, and allow parallelizing the work of collecting, processing and using processed data.

If you don't want to overload your database with multiple simultaneous queries,
you make queries only within the input iterator, then process the data, and then flush it with merge_data.
On the other hand, you usually win if you make queries in process_data and use a lot of data processors,
possibly even more than the number of physical cores, if database queries take a long time and there is only a small amount of processing to do afterwards.

It's no surprise that DB servers usually serve N simultaneous queries faster than N queries one by one.

Make tests and you will know.

To (re)write your script to use all the processing power of your server you have to find out:

1) the method to obtain source/input data. I call it the input iterator. It can be either an array with some identifiers/URLs or a reference to a subroutine which returns the next portion of data, or undef if there is no more data to process.

2) how to process the data, i.e. a method which receives an input item and produces an output item. I call it the process_data subroutine. The good news is that the item which is processed and then returned can be any scalar value in perl, including references to arrays and hashes. It can be everything that Storable can freeze and then thaw.

3) how to use the processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts to a database, send email, etc.

Take into account that 1) and 3) are executed in the main script thread, while all the 2) work is done in parallel forked processes. So for 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. So (still) this approach will benefit you if you know that the bottleneck in your script is the CPU on the processing step. Of course it's not the case for some web crawling tasks unless you do some heavy calculations.

=head1 SUBROUTINES

=head2 run

This subroutine covers the magic of parallelizing data processing.
It receives parameters with these keys via a hash ref.

B<input> - reference to an array or to a subroutine which should return a data item to be processed.
In case of a subroutine it should return undef to signal EOF.
In case of an array it is used as a queue, i.e. shift(@$array) until there is no data item left.
This behaviour has been introduced in 0.06.
Also you can use these aliases:
input_iterator, queue, data

Note: in versions before 0.06 this parameter was called input_iterator, and if it referred to an array the array remained untouched,
while the new behaviour is to treat this parameter like a queue.
0.06 supports the old behaviour only for input_iterator,
while in the future it will behave as a queue to make life easier.

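For example, the input can be a subroutine which lazily reads lines from a file handle. This is a sketch; the file name and C<do_something_cpu_heavy> are illustrative:

    open my $fh, '<', 'urls.txt' or die $!;
    Parallel::DataPipe::run {
        input => sub {
            my $line = <$fh>;
            return undef unless defined $line; # undef signals EOF
            chomp $line;
            return $line;
        },
        process => sub { do_something_cpu_heavy($_) },
        output  => sub { print "$_\n" },
    };
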
B<process> - reference to a subroutine which processes data items. They are passed via the $_ variable.
It should return the processed data. This subroutine is executed in a forked process, so don't
use any shared resources inside it.
You can update the child's state, but it will not affect the parent's state.
Also you can use these aliases:
process_data

The following parameters are optional and have reasonable defaults, so change them only if you know what you are doing.

B<output> - optional. Either a reference to a subroutine or an array which receives the processed data item.
A subroutine can use $_ or $_[0] to access the data item and $_[1] to access its item_number.
This subroutine is executed in the parent thread, so you can rely on changes that it makes.
If you don't specify this parameter, an array with the processed data can be received as the subroutine result.
You can use these aliases for this parameter:
merge_data, merge

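Because results can be merged in any order, $_[1] (the item_number) can be used to put them back into input order. A minimal sketch:

    my @squared;
    Parallel::DataPipe::run {
        input => [1..1000],
        process => sub { $_ * $_ },
        output => sub {
            my ($item,$item_number) = @_;
            $squared[$item_number] = $item; # restore input order
        },
    };
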
B<number_of_data_processors> - (optional) number of parallel data processors. If you don't specify it,
it tries to find out the number of cpu cores
and creates the same number of data processor children.
It looks for the NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT.
If this environment variable is not found it looks at /proc/cpuinfo, which is available under Unix.
It makes sense to set an explicit C<number_of_data_processors>
which is possibly greater than the number of cpu cores
if you are to use all slave DB servers in your environment
and making a query to the DB servers takes more time than processing the returned data.
Otherwise it's optimal to have C<number_of_data_processors> equal to the number of cpu cores.

B<freeze>, B<thaw> - you can use an alternative serializer.
For example, if you know that you are working with an array of words (0..65535) you can use

    freeze => sub {pack('S*',@{$_[0]})},
    thaw   => sub {[unpack('S*',$_[0])]},

which will reduce the number of bytes exchanged between processes.
But do it as the last optimization resort only.
In fact the automatic choice is quite good and efficient.
It uses encode_sereal and decode_sereal if the Sereal module is found.
Otherwise it uses Storable freeze and thaw.

Note: run also has an undocumented prototype for calling (\@\$), i.e.

    my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});

This feature is experimental and can be removed. Use it at your own risk.

=head2 pipeline

pipeline() is a chain of run() calls (parallel data pipes) executed in parallel,
where the input for the next pipe is implicitly taken from the previous one.

    run {input => \@queue, process => \&process, output => \@out}

is the same as

    pipeline {input => \@queue, process => \&process, output => \@out}

But with pipeline you can create a chain of connected pipes and run all of them in parallel,
like it's done in unix with a pipeline of processes.

    pipeline(
        { input => \@queue, process => \&process1},
        { process => \&process2},
        { process => \&process3, output => sub {print "$_\n";} },
    );

And it works like in unix - the input of the next pipe is (implicitly) set to the output of the previous pipe.
You have to specify the input for the first pipe explicitly (see the example of a parallel grep for 'hello' below).

If you don't specify the input for the next pipe it is assumed to be the output of the previous pipe, like in unix.
This assumption, that the input of the next pipe depends on the output of the previous one, is also used by the algorithm
for prioritizing the execution of pipe processors.
As long as the very right (last in the list) pipe has input items to process, it executes its data processors.
If this pipe has a free processor that is not loaded with data, then the processors from the previous pipe are executed
to produce input data for the next pipe.
This is applied recursively to the whole chain of pipes.

Here is a parallel grep implemented in 40 lines of perl code:

    use List::MoreUtils qw(part);
    my @dirs = '.';
    my @files;
    pipeline(
        # this pipe looks (recursively) for all files in specified @dirs
        {
            input => \@dirs,
            process => sub {
                my ($files,$dirs) = part {-d $_ ? 1 : 0} glob("$_/*");
                return [$files||[],$dirs||[]];
            },
            output => sub {
                my ($files,$dirs) = @$_;
                push @dirs,@$dirs; # recursion is here
                push @files,@$files;
            },
        },
        # this pipe greps files for the word hello
        {
            input => \@files,
            process => sub {
                my ($file) = $_;
                open my $fh, '<', $file or return [$file,[]];
                my @lines;
                while (<$fh>) {
                    # line_number : line
                    push @lines,"$.:$_" if m{hello};
                }
                return [$file,\@lines];
            },
            output => sub {
                my ($file,$lines) = @$_;
                # print filename : line_number : line
                print "$file:$_" for @$lines;
            }
        }
    );

=head1 HOW parallel pipe (run) WORKS

1) The main thread (parent) forks C<number_of_data_processors> children for processing data.

2) As soon as data comes from the C<input> iterator it sends it to the next child using
the pipe mechanism.

3) The child processes the data and returns the result back to the parent using a pipe.

4) The parent first fills up all the pipes to the children with data and then
starts to expect processed data on the pipes from the children.

5) If it receives a result from the children it sends the processed data to the C<merge_data> subroutine,
and starts loop 2) again.

6) Loop 2) continues until the input data is exhausted (end of the C<input> array or the C<input> sub returned undef).

7) In the end the parent expects processed data from all busy children and puts the processed data to C<merge_data>.

8) After all the children have sent their processed data they are killed and run returns to the caller.

Note:
If C<input_iterator> or C<process_data> returns a reference, the data is serialized/deserialized before/after the pipe.
That way you have full control over whether data will be serialized on IPC.

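For example, a plain scalar crosses the pipe as-is, while returning a hash ref from C<process> makes the result travel frozen/thawed. A minimal sketch:

    my $results = Parallel::DataPipe::run {
        input => [1..10],
        # a hash ref is frozen before and thawed after the pipe
        process => sub { {value => $_, squared => $_*$_} },
    };
    printf "%d => %d\n", $_->{value}, $_->{squared} for @$results;
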
=head1 SEE ALSO

L<fork|http://perldoc.perl.org/functions/fork.html>

L<subs::parallel>

L<Parallel::Loops>

L<MCE>

L<IO::Pipely> - pipes that work almost everywhere

L<POE> - portable multitasking and networking framework for any event loop

L<forks>

L<threads>

=head1 DEPENDENCIES

Only core modules are used.

If found, the Sereal module is used for serialization instead of Storable, as the former is more efficient.

=head1 BUGS

For all bugs please send an email to okharch@gmail.com.

=head1 SOURCE REPOSITORY

See the git source on github
L<https://github.com/okharch/Parallel-DataPipe>

=head1 COPYRIGHT

Copyright (c) 2013 Oleksandr Kharchenko

All rights reserved. This library is free software; you can redistribute it
and/or modify it under the same terms as Perl itself.

=head1 AUTHOR

  Oleksandr Kharchenko <okharch@gmail.com>

=cut