File Coverage

blib/lib/Parallel/DataPipe.pm
Criterion Covered Total %
statement 235 279 84.2
branch 63 112 56.2
condition 19 30 63.3
subroutine 44 46 95.6
pod 2 20 10.0
total 363 487 74.5


line stmt bran cond sub pod time code
1             package Parallel::DataPipe;
2              
3             our $VERSION='0.11';
4 152     152   3095560 use 5.8.0;
  152         608  
  152         11936  
5 136     136   1784 use strict;
  136         492  
  136         22068  
6 133     133   1700 use warnings;
  133         972  
  133         13477  
7 129     129   148292 use IO::Select;
  129         194765  
  129         21386  
8 125     125   1396 use List::Util qw(first min);
  125         283  
  125         25792  
9 109 50   109   1518 use constant PIPE_MAX_CHUNK_SIZE => $^O =~ m{linux|cygwin}? 16*1024 : 1024;
  109         303  
  109         14476  
10 92     92   460 use constant _EOF_ => (-(1<<31));
  92         184  
  92         315652  
11              
12             sub run {
13 350     350 1 4924126 my $param = {};
14 350         1515 my ($input,$map,$output) = @_;
15 350 100       4160 if (ref($input) eq 'HASH') {
16 258         696 $param = $input;
17             } else {
18 92         552 $param = {input=>$input, process=>$map, output=>$output };
19             }
20 350         2008 pipeline($param);
21             }
22              
23             sub pipeline {
24 383     383 1 64468 my $class=shift;
25 383 100       2464 if (ref($class) eq 'HASH') {
26 350         1008 unshift @_, $class;
27 350         1164 $class = __PACKAGE__;
28             }
29 383         1054 my @pipes;
30             # init pipes
31             my $default_input;
32 383         884 for my $param (@_) {
33 400 100       1933 unless (exists $param->{input}) {
34 17 50       561 $param->{input} = $default_input or die "You have to specify input for the first pipe";
35             }
36 400         5517 my $pipe = $class->new($param);
37 309 100       3116 if (ref($pipe->{output}) eq 'ARRAY') {
38 175         448 $default_input = $pipe->{output};
39             }
40 309         7048 push @pipes, $pipe;
41             }
42 292         3339 run_pipes(0,@pipes);
43 292         1024 my $result = $pipes[$#pipes]->{output};
44             # @pipes=() kills parent
45             # as well as its implicit destroying
46             # destroy pipes one by one if you want to survive!!!
47 292         3531 undef $_ for @pipes;
48 292 100       2542 return unless defined(wantarray);
49 158 50       549 return unless $result;
50 158 50       167250 return wantarray?@$result:$result;
51             }
52              
53             sub run_pipes {
54 792     792 0 1769 my ($prev_busy,$me,@next) = @_;
55 792   33     2084 my $me_busy = $me->load_data || $me->busy_processors;
56 792         31178 while ($me_busy) {
57 502204         1292345 $me->receive_and_merge_data;
58 502204   100     1277163 $me_busy = $me->load_data || $me->busy_processors;
59 502204   66     1482899 my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
60 502204   66     901552 $me_busy ||= $next_busy;
61             # get data from pipe if we have free_processors
62 502204 100 66     1653492 return $me_busy if $prev_busy && $me->free_processors;
63             }
64 293         660 return 0;
65             }
66              
67             # input_iterator is either array or subroutine reference which get's data from queue or other way and returns it
68             # if there is no data it returns undef
69             sub input_iterator {
70 506123     506123 0 635037 my $self = shift;
71 506123         986814 $self->{input_iterator}->(@_);
72             }
73              
74             sub output_iterator {
75 502204     502204 0 600675 my $self = shift;
76 502204         1093476 $self->{output_iterator}->(@_);
77             }
78              
79             # this is to set/create input iterator
80             sub set_input_iterator {
81 400     400 0 804 my ($self,$param) = @_;
82 400         1424 my $old_behaviour = $param->{input_iterator};
83 400         2780 my ($input_iterator) = extract_param($param, qw(input_iterator input queue data));
84 400 100       2996 unless (ref($input_iterator) eq 'CODE') {
85 319 50       1209 die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
86 319         377 my $queue = $input_iterator;
87 319         1016 $self->{input} = $queue;
88 319 50       689 if ($old_behaviour) {
89 0         0 my $l = @$queue;
90 0         0 my $i = 0;
91 0 0   0   0 $input_iterator = sub {$i<$l?$queue->[$i++]:undef};
  0         0  
92             } else {
93             # this behaviour is introduced with 0.06
94 319 50   505572   3529 $input_iterator = sub {$queue?shift(@$queue):undef};
  505572         1751109  
95             }
96             }
97 400         1762 $self->{input_iterator} = $input_iterator;
98             }
99              
100             sub set_output_iterator {
101 309     309 0 1503 my ($self,$param) = @_;
102 309         8517 my ($output_iterator) = extract_param($param, qw(merge_data output_iterator output output_queue output_data merge reduce));
103 309 100       4229 unless (ref($output_iterator) eq 'CODE') {
104 175   50     4830 my $queue = $output_iterator || [];
105 175         2484 $self->{output} = $queue;
106 175     441908   2325 $output_iterator = sub {push @$queue,$_};
  441908         1560542  
107             }
108 309         2681 $self->{output_iterator} = $output_iterator;
109             }
110              
111             # loads all free processor with data from input
112             # return the number of loaded processors
113             sub load_data {
114 502996     502996 0 611443 my $self = shift;
115 502996         1317860 my @free_processors = $self->free_processors;
116 502996         748752 my $result = 0;
117 502996         840293 for my $processor (@free_processors) {
118 506123         1397194 my $data = $self->input_iterator;
119             # return number of processors loaded
120 506123 100       1166341 return $result unless defined($data);
121 502204         496774 $result++;
122 502204         944112 $self->load_data_processor($data,$processor);
123             }
124 499077         2257665 return $result;
125             }
126              
127             # this should work with Windows NT or if user explicitly set that
128             my $number_of_cpu_cores = $ENV{NUMBER_OF_PROCESSORS};
129             sub number_of_cpu_cores {
130             #$number_of_cpu_cores = $_[0] if @_; # setter
131 278 100   278 0 1181 return $number_of_cpu_cores if $number_of_cpu_cores;
132 92         184 eval {
133             # try unix (linux,cygwin,etc.)
134 92         842444 $number_of_cpu_cores = scalar grep m{^processor\t:\s\d+\s*$},`cat /proc/cpuinfo 2>/dev/null`;
135             # try bsd
136 92 50       8556 ($number_of_cpu_cores) = map m{hw.ncpu:\s+(\d+)},`sysctl -a` unless $number_of_cpu_cores;
137             };
138             # otherwise it sets number_of_cpu_cores to 2
139 92   50     1380 return $number_of_cpu_cores || 1;
140             }
141              
142             sub freeze {
143 62058     62058 0 84141 my $self = shift;
144 62058         340545 $self->{freeze}->(@_);
145             }
146              
147             sub thaw {
148 62074     62074 0 69377 my $self = shift;
149 62074         938962 $self->{thaw}->(@_);
150             }
151              
152             # this inits freeze and thaw with Storable subroutines and try to replace them with Sereal counterparts
153             sub init_serializer {
154 400     400 0 1331 my ($self,$param) = @_;
155 400   66     4229 my ($freeze,$thaw) = grep $_ && ref($_) eq 'CODE',map delete $param->{$_},qw(freeze thaw);
156 400 100 66     2747 if ($freeze && $thaw) {
157 76         304 $self->{freeze} = $freeze;
158 76         228 $self->{thaw} = $thaw;
159             } else {
160             # try cereal
161 92 50   92   80224 eval q{
  92         58236  
  92         5796  
  324         82542  
162             use Sereal qw(encode_sereal decode_sereal);
163             $self->{freeze} = \&encode_sereal;
164             $self->{thaw} = \&decode_sereal;
165             1;
166             }
167             or
168             eval q{
169             use Storable;
170             $self->{freeze} = \&Storable::nfreeze;
171             $self->{thaw} = \&Storable::thaw;
172             1;
173             };
174              
175             }
176             }
177              
178              
179             # this subroutine reads data from pipe and converts it to perl reference
180             # or scalar - if size is negative
181             # it always expects size of frozen scalar so it know how many it should read
182             # to feed thaw
183             sub _get_data {
184 514317     514317   797477 my ($self,$fh) = @_;
185 514317         730455 my ($data_size,$data);
186 514317         1549965 $fh->sysread($data_size,4);
187 514317         537913060 $data_size = unpack("l",$data_size);
188 514317 100       1229405 return undef if $data_size == _EOF_; # this if for process_data terminating
189 514226 50       868330 if ($data_size == 0) {
190 0         0 $data = '';
191             } else {
192 514226         599182 my $length = abs($data_size);
193 514226         559415 my $offset = 0;
194             # allocate all the buffer for $data beforehand
195 514226         1773728 $data = sprintf("%${length}s","");
196 514226         1341652 while ($offset < $length) {
197 523954         1406490 my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
198 523954         1692368 $fh->sysread(my $buf,$chunk_size);
199             # use lvalue form of substr to copy data in preallocated buffer
200 523954         6900526 substr($data,$offset,$chunk_size) = $buf;
201 523954         2065259 $offset += $chunk_size;
202             }
203 514226 100       1296676 $data = $self->thaw($data) if $data_size<0;
204             }
205 514226         2327374 return $data;
206             }
207              
208             # this subroutine serialize data reference. otherwise
209             # it puts negative size of scalar and scalar itself to pipe.
210             sub _put_data {
211 515186     515186   787630 my ($self,$fh,$data) = @_;
212 515186 100       1161484 unless (defined($data)) {
213 960         4620 $fh->syswrite(pack("l", _EOF_));
214 960         2003040 return;
215             }
216 514226         846479 my $length = length($data);
217 514226 100       1212156 if (ref($data)) {
218 62058         143545 $data = $self->freeze($data);
219 62058         1483980 $length = -length($data);
220             }
221 514226         2532638 $fh->syswrite(pack("l", $length));
222 514226         45794584 $length = abs($length);
223 514226         666345 my $offset = 0;
224 514226         1797779 while ($offset < $length) {
225 523442         1210388 my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
226 523442         2150181 $fh->syswrite(substr($data,$offset,$chunk_size));
227 523442         34894645 $offset += $chunk_size;
228             }
229             }
230              
231             sub _fork_data_processor {
232 4277     4277   6316 my ($data_processor_callback) = @_;
233             # create processor as fork
234 4277         5967016 my $pid = fork();
235 4277 50       97103 unless (defined $pid) {
236             #print "say goodbye - can't fork!\n"; <>;
237 0         0 die "can't fork!";
238             }
239 4277 100       21117 if ($pid == 0) {
240             local $SIG{TERM} = sub {
241 0     0   0 exit;
242 91         41129 }; # exit silently from data processors
243             # data processor is eternal loop which wait for raw data on pipe from main
244             # data processor is killed when it's not needed anymore by _kill_data_processors
245 91         5133 $data_processor_callback->() while (1);
246 0         0 exit;
247             }
248 4186         1156062 return $pid;
249             }
250              
251             sub _create_data_processor {
252 4277     4277   8542 my ($self,$process_data_callback) = @_;
253              
254             # parent <=> child pipes
255 4277         88985 my ($parent_read, $parent_write) = pipely();
256 4277         9341 my ($child_read, $child_write) = pipely();
257              
258             my $data_processor = sub {
259 12113     12113   53902 local $_ = $self->_get_data($child_read);
260 12113 100       91819 unless (defined($_)) {
261 91         329622 exit 0;
262             }
263 12022         99122 $_ = $process_data_callback->($_);
264 12022         129774 $self->_put_data($parent_write,$_);
265 4277         25835 };
266              
267             # return data processor record
268             return {
269 4277         31600 pid => _fork_data_processor($data_processor), # needed to kill processor when there is no more data to process
270             child_write => $child_write, # pipe to write raw data from main to data processor
271             parent_read => $parent_read, # pipe to write raw data from main to data processor
272             };
273             }
274              
275             sub extract_param {
276 1509     1509 0 17234 my ($param, @alias) = @_;
277 1509     3676   29478 return first {defined($_)} map delete($param->{$_}), @alias;
  3676         10085  
278             }
279              
280             sub create_data_processors {
281 400     400 0 860 my ($self,$param) = @_;
282 400         1082 my $process_data_callback = extract_param($param,qw(process_data process processor map));
283 400         1757 my $number_of_data_processors = extract_param($param,qw(number_of_data_processors number_of_processors));
284 400 100       4303 $number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
285 400 50       4446 die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
286 400 50       1878 die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
287 400         6087 return [map $self->_create_data_processor($process_data_callback,$_), 0..$number_of_data_processors-1];
288             }
289              
290             sub load_data_processor {
291 502204     502204 0 824219 my ($self,$data,$processor) = @_;
292 502204         880228 $processor->{item_number} = $self->{item_number}++;
293 502204 50       849151 die "no support of data processing for undef items!" unless defined($data);
294 502204         712020 $processor->{busy} = 1;
295 502204         1623248 $self->_put_data($processor->{child_write},$data);
296             }
297              
298             sub busy_processors {
299 3343     3343 0 4126 my $self = shift;
300 3343         3863 return grep $_->{busy}, @{$self->{processors}};
  3343         23621  
301             }
302              
303             sub free_processors {
304 503495     503495 0 559785 my $self = shift;
305 503495         556777 return grep !$_->{busy}, @{$self->{processors}};
  503495         4070020  
306             }
307              
308             sub receive_and_merge_data {
309 502204     502204 0 625483 my $self = shift;
310 502204         700138 my ($processors,$ready) = @{$self}{qw(processors ready)};
  502204         1488246  
311 502204 100       1194173 $self->{ready} = $ready = [] unless $ready;
312 502204 100 66     2673571 @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
313 502204         16238600 my $fh = shift(@$ready);
314 502204     4128754   2960240 my $processor = first {$_->{parent_read} == $fh} @$processors;
  4128754         7556372  
315 502204         1944527 local $_ = $self->_get_data($fh);
316 502204         937312 $processor->{busy} = undef; # make processor free
317 502204         1282555 $self->output_iterator($_,$processor->{item_number});
318             }
319              
320             sub _kill_data_processors {
321 60     60   300 my ($self) = @_;
322 60         600 my $processors = $self->{processors};
323 60         660 my @pid_to_kill = map $_->{pid}, @$processors;
324 60         900 my %pid_to_wait = map {$_=>undef} @pid_to_kill;
  960         3180  
325             # put undef to input of data_processor - they know it's time to exit
326 60         420 $self->_put_data($_->{child_write}) for @$processors;
327 60         1200 while (@pid_to_kill) {
328 960         68783640 my $pid = wait;
329 960         8160 delete $pid_to_wait{$pid};
330 960         39840 @pid_to_kill = keys %pid_to_wait;
331             }
332             }
333              
334             sub new {
335 400     400 0 707 my ($class, $param) = @_;
336 400         7521 my $self = {mypid=>$$};
337 400         2253 bless $self,$class;
338 400         2171 $self->set_input_iterator($param);
339             # item_number for merge implementation
340 400         3061 $self->{item_number} = 0;
341             # check if user want to use alternative serialisation routines
342 400         3092 $self->init_serializer($param);
343             # @$processors is array with data processor info
344 400         2338 $self->{processors} = $self->create_data_processors($param);
345             # data_merge is sub which merge all processed data inside parent thread
346             # it is called each time after process_data returns some new portion of data
347 309         10242 $self->set_output_iterator($param);
348 309         8205 my $not_supported = join ", ", keys %$param;
349 309 50       1168 die "Parameters are redundant or not supported:". $not_supported if $not_supported;
350 309         7532 return $self;
351             }
352              
353             sub DESTROY {
354 76     76   1368 my $self = shift;
355 76 100       40821 return unless $self->{mypid} == $$;
356 60         1200 $self->_kill_data_processors;
357             #semctl($self->{sem_id},0,IPC_RMID,0);
358             }
359              
360             =begin comment
361              
362             Why I copied IO::Pipely::pipely instead of use IO::Pipely qw(pipely)?
363             1. Do not depend on installation of additional module
364             2. I don't know (yet) how to win race condition:
365             A) In Makefile.PL I would to check if fork & pipe works on the platform before creating Makefile.
366             But I am not sure if it's ok that at that moment I can use pipely to create pipes.
367             so
368             B) to use pipely I have to create makefile
369             For now I decided just copy code for pipely into this module.
370             Then if I know how do win that race condition I will get rid of this code and
371             will use IO::Pipely qw(pipely) instead and
372             will add dependency on it.
373              
374             =end comment
375              
376             =cut
377              
378             # IO::Pipely is copyright 2000-2012 by Rocco Caputo.
379 92     92   99084 use Symbol qw(gensym);
  92         95680  
  92         8740  
380 92         552 use IO::Socket qw(
381             AF_UNIX
382             PF_INET
383             PF_UNSPEC
384             SOCK_STREAM
385             SOL_SOCKET
386             SOMAXCONN
387             SO_ERROR
388             SO_REUSEADDR
389             inet_aton
390             pack_sockaddr_in
391             unpack_sockaddr_in
392 92     92   95588 );
  92         3073260  
393 92     92   29900 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  92         184  
  92         4508  
394 92     92   276 use Errno qw(EINPROGRESS EWOULDBLOCK);
  92         184  
  92         98072  
395              
396             my (@oneway_pipe_types, @twoway_pipe_types);
397             if ($^O eq "MSWin32" or $^O eq "MacOS") {
398             @oneway_pipe_types = qw(inet socketpair pipe);
399             @twoway_pipe_types = qw(inet socketpair pipe);
400             }
401             elsif ($^O eq "cygwin") {
402             @oneway_pipe_types = qw(pipe inet socketpair);
403             @twoway_pipe_types = qw(inet pipe socketpair);
404             }
405             else {
406             @oneway_pipe_types = qw(pipe socketpair inet);
407             @twoway_pipe_types = qw(socketpair inet pipe);
408             }
409              
410             sub pipely {
411 8554     8554 0 17110 my %arg = @_;
412              
413 8554         45563 my $conduit_type = delete($arg{type});
414 8554   50     142561 my $debug = delete($arg{debug}) || 0;
415              
416             # Generate symbols to be used as filehandles for the pipe's ends.
417             #
418             # Filehandle autovivification isn't used for portability with older
419             # versions of Perl.
420              
421 8554         168374 my ($a_read, $b_write) = (gensym(), gensym());
422              
423             # Try the specified conduit type only. No fallback.
424              
425 8554 50       720938 if (defined $conduit_type) {
426 0 0       0 return ($a_read, $b_write) if _try_oneway_type(
427             $conduit_type, $debug, \$a_read, \$b_write
428             );
429             }
430              
431             # Otherwise try all available conduit types until one works.
432             # Conduit types that fail are discarded for speed.
433              
434 8554         53578 while (my $try_type = $oneway_pipe_types[0]) {
435 8554 50       76338 return ($a_read, $b_write) if _try_oneway_type(
436             $try_type, $debug, \$a_read, \$b_write
437             );
438 0         0 shift @oneway_pipe_types;
439             }
440              
441             # There's no conduit type left. Bummer!
442              
443 0 0       0 $debug and warn "nothing worked";
444 0         0 return;
445             }
446              
447             # Try a pipe by type.
448              
449             sub _try_oneway_type {
450 8554     8554   41371 my ($type, $debug, $a_read, $b_write) = @_;
451              
452             # Try a pipe().
453 8554 50       20035 if ($type eq "pipe") {
454 8554         76816 eval {
455 8554 50       382603 pipe($$a_read, $$b_write) or die "pipe failed: $!";
456             };
457              
458             # Pipe failed.
459 8554 50       21733 if (length $@) {
460 0 0       0 warn "pipe failed: $@" if $debug;
461 0         0 return;
462             }
463              
464 8554 50       15077 $debug and do {
465 0         0 warn "using a pipe";
466 0         0 warn "ar($$a_read) bw($$b_write)\n";
467             };
468              
469             # Turn off buffering. POE::Kernel does this for us, but
470             # someone might want to use the pipe class elsewhere.
471 8554         221457 select((select($$b_write), $| = 1)[0]);
472 8554         73105 return 1;
473             }
474              
475             # Try a UNIX-domain socketpair.
476 0 0         if ($type eq "socketpair") {
477 0           eval {
478 0 0         socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
479             or die "socketpair failed: $!";
480             };
481              
482 0 0         if (length $@) {
483 0 0         warn "socketpair failed: $@" if $debug;
484 0           return;
485             }
486              
487 0 0         $debug and do {
488 0           warn "using a UNIX domain socketpair";
489 0           warn "ar($$a_read) bw($$b_write)\n";
490             };
491              
492             # It's one-way, so shut down the unused directions.
493 0           shutdown($$a_read, 1);
494 0           shutdown($$b_write, 0);
495              
496             # Turn off buffering. POE::Kernel does this for us, but someone
497             # might want to use the pipe class elsewhere.
498 0           select((select($$b_write), $| = 1)[0]);
499 0           return 1;
500             }
501              
502             # Try a pair of plain INET sockets.
503 0 0         if ($type eq "inet") {
504 0           eval {
505 0           ($$a_read, $$b_write) = _make_socket();
506             };
507              
508 0 0         if (length $@) {
509 0 0         warn "make_socket failed: $@" if $debug;
510 0           return;
511             }
512              
513 0 0         $debug and do {
514 0           warn "using a plain INET socket";
515 0           warn "ar($$a_read) bw($$b_write)\n";
516             };
517              
518             # It's one-way, so shut down the unused directions.
519 0           shutdown($$a_read, 1);
520 0           shutdown($$b_write, 0);
521              
522             # Turn off buffering. POE::Kernel does this for us, but someone
523             # might want to use the pipe class elsewhere.
524 0           select((select($$b_write), $| = 1)[0]);
525 0           return 1;
526             }
527              
528             # There's nothing left to try.
529 0 0         $debug and warn "unknown pipely() socket type ``$type''";
530 0           return;
531             }
532              
533              
534             1;
535              
536             =head1 NAME
537              
Parallel::DataPipe - parallel data processing conveyor
539              
540             =encoding utf-8
541              
542             =head1 SYNOPSIS
543              
544             use Parallel::DataPipe;
545             Parallel::DataPipe::run {
546             input => [1..100],
547             process => sub { "$_:$$" },
548             number_of_data_processors => 100,
549             output => sub { print "$_\n" },
550             };
551              
552              
553             =head1 DESCRIPTION
554              
555              
556             If you have some long running script processing data item by item
557             (having on input some data and having on output some processed data i.e. aggregation, webcrawling,etc)
you can speed it up 4-20 times using parallel datapipe conveyor.
Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2,4,8, even 24!
560             And huge amount of memory: memory is cheap now.
561             So they are ready for parallel data processing.
562             With this script there is an easy and flexible way to use that power.
563              
564             So what are the benefits of this module?
565              
566             1) because it uses input_iterator it does not have to know all input data before starting parallel processing
567              
568             2) because it uses merge_data processed data is ready for using in main thread immediately.
569              
570             1) and 2) remove requirements for memory which is needed to store data items before and after parallel work. and allows parallelize work on collecting, processing and using processed data.
571              
572             If you don't want to overload your database with multiple simultaneous queries
573             you make queries only within input_iterator and then process_data and then flush it with merge_data.
On the other hand you usually win if you make queries in process_data and use a lot of data processors.
Possibly even more than physical cores if database queries take a long time and then only a small amount of time is needed to process the result.
576              
It's no surprise that DB servers usually serve N queries simultaneously faster than N queries one by one.
578              
579             Make tests and you will know.
580              
581             To (re)write your script for using all processing power of your server you have to find out:
582              
1) the method to obtain source/input data. I call it input iterator. It can be either an array with some identifiers/urls or a reference to a subroutine which returns the next portion of data or undef if there is no more data to process.
584              
585             2) how to process data i.e. method which receives input item and produce output item. I call it process_data subroutine. The good news is that item which is processed and then returned can be any scalar value in perl, including references to array and hashes. It can be everything that Storable can freeze and then thaw.
586              
587             3) how to use processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts to database, send email, etc.
588              
Take into account that 1) and 3) are executed in the main script thread. While all 2) work is done in parallel forked threads. So for 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. So (still) this approach will benefit you if you know that the bottleneck in your script is CPU on the processing step. Of course it's not the case for some web crawling tasks unless you do some heavy calculations.
590              
591             =head1 SUBROUTINES
592              
593             =head2 run
594              
595             This is subroutine which covers magic of parallelizing data processing.
It receives parameters with these keys via hash ref.
597              
B<input> - reference to array or subroutine which should return data item to be processed.
599             in case of subroutine it should return undef to signal EOF.
600             In case of array it uses it as queue, i.e. shift(@$array) until there is no data item,
601             This behaviour has been introduced in 0.06.
602             Also you can use these aliases:
603             input_iterator, queue, data
604              
Note: in versions before 0.06 it was input_iterator and if it referred to an array it remained untouched.
606             while new behaviour is to treat this parameter like a queue.
607             0.06 support old behaviour only for input_iterator,
608             while in the future it will behave as a queue to make life easier
609              
B<process> - reference to a subroutine which processes data items. They are passed via the $_ variable.
611             Then it should return processed data. this subroutine is executed in forked process so don't
612             use any shared resources inside it.
613             Also you can update children state, but it will not affect parent state.
614             Also you can use these aliases:
615             process_data
616              
These parameters are optional and have reasonable defaults, so change them only if you know what you are doing.
618              
B<output> - optional. either a reference to a subroutine or an array which receives processed data items.
620             subroutine can use $_ or $_[0] to access data item and $_[1] to access item_number.
621             this subroutine is executed in parent thread, so you can rely on changes that it made.
622             if you don't specify this parameter array with processed data can be received as a subroutine result.
You can use these aliases for this parameter:
624             merge_data, merge
625              
B<number_of_data_processors> - (optional) number of parallel data processors. If you don't specify it,
627             it tries to find out a number of cpu cores
628             and create the same number of data processor children.
629             It looks for NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT.
If this environment variable is not found it looks to /proc/cpuinfo which is available under Unix env.
It makes sense to have an explicit C<number_of_data_processors>
which possibly is greater than the number of cpu cores
if you are to use all slave DB servers in your environment
and making a query to DB servers takes more time than processing the returned data.
Otherwise it's optimal to have C<number_of_data_processors> equal to the number of cpu cores.
636              
637             B<freeze>, B<thaw> - you can use an alternative serializer.
638             For example if you know that you are working with an array of words (0..65535) you can use
639             freeze => sub {pack('S*',@{$_[0]})} and thaw => sub {[unpack('S*',$_[0])]}
640             which will reduce the amount of bytes exchanged between processes.
641             But do it as the last optimization resort only.
642             In fact the automatic choice is quite good and efficient.
643             It uses encode_sereal and decode_sereal if the Sereal module is found.
644             Otherwise it uses Storable freeze and thaw.
645              
646             Note: run has also undocumented prototype for calling (\@\$) i.e.
647              
648             my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});
649              
650             This feature is experimental and can be removed. Use it at your own risk.
651              
652             =head2 pipeline
653              
654             pipeline() is a chain of run() (parallel data pipes) executed in parallel
655             and input for next pipe is implicitly got from previous one.
656              
657             run {input => \@queue, process => \&process, output => \@out}
658              
659             is the same as
660              
661             pipeline {input => \@queue, process => \&process, output => \@out}
662              
663             But with pipeline you can create chain of connected pipes and run all of them in parallel
664             like it's done in unix with processes pipeline.
665              
666             pipeline(
667             { input => \@queue, process => \&process1},
668             { process => \&process2},
669             { process => \&process3, output => sub {print "$_\n";} },
670             );
671              
672             And it works like in unix - input of next pipe is (implicitly) set to output from previous pipe.
673             You have to specify input for the first pipe explicitly (see example of parallel grep 'hello' below ).
674              
675             If you don't specify input for next pipe it is assumed that it is output from previous pipe like in unix.
676             Also this assumption that input of next pipe depends on output of previous is applied for algorithm
677             on prioritizing of execution of pipe processors.
678             As long as the very right (last in list) pipe has input items to process it executes its data processors.
679             If this pipe has free processor that is not loaded with data then the processors from previous pipe are executed
680             to produce an input data for next pipe.
681             This is recursively applied for all chain of pipes.
682              
683             Here is parallel grep implemented in 40 lines of perl code:
684              
685             use List::MoreUtils qw(part);
686             my @dirs = '.';
687             my @files;
688             pipeline(
689             # this pipe looks (recursively) for all files in specified @dirs
690             {
691             input => \@dirs,
692             process => sub {
693             my ($files,$dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
694             return [$files,$dirs];
695             },
696             output => sub {
697             my ($files,$dirs) = @$_;
698             push @dirs,@$dirs;# recursion is here
699             push @files,@$files;
700             },
701             },
702             # this pipe grep files for word hello
703             {
704             input => \@files,
705             process => sub {
706             my ($file) = $_;
707             open my $fh, '<', $file or die "open $file: $!";
708             my @lines;
709             while (<$fh>) {
710             # line_number : line
711             push @lines,"$.:$_" if m{hello};
712             }
713             return [$file,\@lines];
714             },
715             output => sub {
716             my ($file,$lines) = @$_;
717             # print filename, line_number , line
718             print "$file:$_" for @$lines;
719             }
720             }
721             );
722              
723             =head1 HOW parallel pipe (run) WORKS
724              
725             1) Main thread (parent) forks C<number_of_data_processors> children for processing data.
726              
727             2) As soon as data comes from C<input> it sends it to the next child using
728             the pipe mechanism.
729              
730             3) Child processes data and returns result back to parent using pipe.
731              
732             4) Parent firstly fills up all the pipes to children with data and then
733             starts to expect processed data on pipes from children.
734              
735             5) If it receives a result from children it sends the processed data to the C<output> subroutine,
736             and starts loop 2) again.
737              
738             6) loop 2) continues until input data is ended (end of the C<input> array or the C<input> sub returned undef).
739              
740             7) In the end the parent expects processed data from all busy children and puts the processed data to C<output>
741              
742             8) After all the children have sent their processed data they are killed and run returns to the caller.
743              
744             Note:
745             If C<process> (or C<process_data>) returns a reference, it serializes/deserializes the data before/after the pipe.
746             That way you have full control whether data will be serialized on IPC.
747              
748             =head1 SEE ALSO
749              
750             L
751              
752             L
753              
754             L
755              
756             L
757              
758             L<IO::Pipely> - pipes that work almost everywhere
759              
760             L<POE> - portable multitasking and networking framework for any event loop
761              
762             L
763              
764             L
765              
766             =head1 DEPENDENCIES
767              
768             Only core modules are used.
769              
770             If found, it uses the Sereal module for serialization instead of Storable, as the former is more efficient.
771              
772             =head1 BUGS
773              
774             For all bugs please send an email to okharch@gmail.com.
775              
776             =head1 SOURCE REPOSITORY
777              
778             See the git source on github
779             L<https://github.com/okharch/Parallel-DataPipe>
780              
781             =head1 COPYRIGHT
782              
783             Copyright (c) 2013 Oleksandr Kharchenko
784              
785             All rights reserved. This library is free software; you can redistribute it
786             and/or modify it under the same terms as Perl itself.
787              
788             =head1 AUTHOR
789              
790             Oleksandr Kharchenko
791              
792             =cut