File Coverage

blib/lib/Async/Simple/Pool.pm
Criterion Covered Total %
statement 100 101 99.0
branch 35 38 92.1
condition 8 10 80.0
subroutine 12 12 100.0
pod 6 6 100.0
total 161 167 96.4


line stmt bran cond sub pod time code
# Pool of worker task objects: process() hands queued data items to free tasks and collects their results.
1             package Async::Simple::Pool;
2              
3             =head1 NAME
4              
5             Async::Simple::Pool - Simple manager of asynchronous tasks
6              
7             =head1 SYNOPSIS
8              
9             Simplest way:
10              
11             use Async::Simple::Pool;
12             use Data::Dumper;
13              
14             my $task = sub{
15             my $data = shift;
16              
17             return $data->{i} * 10;
18             };
19              
20             my $data = [ { i => 1 }, { i => 2 }, { i => 3 } ];
21              
22             my $pool = Async::Simple::Pool->new( $task, $data );
23              
24             my $result = $pool->process;
25              
26             say Dumper $result;
27              
28             $VAR1 = [
29             10,
30             20,
31             30
32             ];
33              
34              
35             Some other ways to do $pool->new(), using various param sets.
36              
37             Note: If you pass $data to $pool->new() then all processes will be started immediately.
38             You can call $pool->process after "new" and get your results in no time.
39              
40             my $pool = Async::Simple::Pool->new( $task, $data ); # Simplest way to create a new pool. Results will be available just after "new"
41              
42             my $pool = Async::Simple::Pool->new( %pool_params ); # Creates a new pool. The only one param "task" is required by default.
43              
44             my $pool = Async::Simple::Pool->new( $task, %pool_params ); # "$task" is required, all params are optional
45              
46             my $pool = Async::Simple::Pool->new( $data, %pool_params ); # $data - required, all pool_params except "task" are optional
47              
48             my $pool = Async::Simple::Pool->new( $task, $data, %pool_params ); # $task, $data - required, all pool_params are optional
49              
50              
51             By default "task" is required and must be a CodeRef.
52             For example:
53              
54             $task = sub { my $task_X_data = shift; some useful code; return $task_X_result };
55              
56              
57             $data can be ArrayRef of your tasks params.
58              
59             $data = [ $task_data1, $task_data2, ... ];
60              
61              
62             Also $data can be HashRef of your tasks params.
63             In this case you can pass any scalars as keys of this hash. They will be mirrored into result
64              
65             $data = { task_id1 => $task_data1, task_id2 => $task_data2, ... };
66              
67              
68             $pool->new() creates $pool->tasks_count objects of class $pool->task_class.
69             By default task_class is "Async::Simple::Task::Fork".
70             In this case "$pool->tasks_count" processes will be preforked (10 by default).
71             Each of them starts to wait for data which will be provided by pool later.
72              
73              
74             $pool->process is the main dispatcher of the pool. Its behavior depends on %pool_params.
75             If you pass $data to $pool->process, this data will be added to execution.
76              
77             $results = $pool->process( $data );
78              
79              
80             Type of $result depends on pool params that you pass in $pool->new( %pool_params );
81             By default result is arrayref.
82              
83              
84             You can use these %pool_params:
85              
86             data - ArrayRef/HashRef. A data for tasks, as described above,
87              
88             tasks_count - Integer number of workers. 10 by default.
89              
90             flush_data - 1 (remove) or 0 (keep, default): whether results are removed from the pool after they have been read by $pool->process()
91              
92             result_type - list (list of ready results) / fulllist (list of all results, default) / hash (hash of ready results)
93              
94             break_on - busy (when all workers are busy) / run (all data has been started) / done (all results are ready, default)
95              
96             task_class - see explanation below. For example 'Your::Task::Class';
97              
98             task_params - Any params you wish to pass to each task object to $task->new( %$here ).
99              
100              
101             The last way to use pool is to make your own task class.
102             You can make your own class of task. This class MUST have at least this code:
103              
104             package Your::Task::Class;
105              
106             use parent 'Async::Simple::Task';
107              
108             # Trying to read result.
109             # If result found, call $self->result( $result );
110             # If result is not ready, do nothing
111             sub get {
112             my $self = shift;
113              
114             return unless you have result;
115              
116             # result can be undef; Don't worry, all will be ok!
117             $self->result( $result );
118             };
119              
120             # Just push data to execution in your way
121             sub put {
122             my ( $self, $data ) = @_;
123              
124             $self->clear_answer; # Optional, if you plan to use your package independently of this pool.
125              
126             # Pass your data to your processor here
127             }
128              
129             1;
130              
131              
132             =head1 DESCRIPTION
133              
134             Allows to work with pool of async processes.
135              
136             There are many other similar packages you can find on CPAN: Async::Queue, AnyEvent::FIFO, Task::Queue, Proc::Simple.
137              
138             The main difference of this package is convenience and simplicity of usage.
139              
140              
141             =head1 METHODS
142              
143             $pool->new( various params as described above )
144              
145             $pool->process( $optional_data_ref )
146              
147              
148             =head1 SUPPORT AND DOCUMENTATION
149              
150             After installing, you can find documentation for this module with the
151             perldoc command.
152              
153             perldoc Async::Simple::Task
154              
155             You can also look for information at:
156              
157             RT, CPAN's request tracker (report bugs here)
158             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task
159              
160             AnnoCPAN, Annotated CPAN documentation
161             http://annocpan.org/dist/Async-Simple-Task
162              
163             CPAN Ratings
164             http://cpanratings.perl.org/d/Async-Simple-Task
165              
166             Search CPAN
167             http://search.cpan.org/dist/Async-Simple-Task/
168              
169              
170             =head1 AUTHOR
171              
172             ANTONC <antonc@cpan.org>
173              
174             =head1 LICENSE
175              
176             This program is free software; you can redistribute it and/or modify it
177             under the terms of the Artistic License (2.0). You may obtain a
178             copy of the full license at:
179              
180             L<http://www.perlfoundation.org/artistic_license_2_0>
181              
182             =cut
183              
184              
185             # Async::Queue, AnyEvent::FIFO - very similar to this, but lack syntactic sugar, depend on AnyEvent, and have no prefork or fixed pool
186             # Task::Pool - very similar, uses tasks, results are represented as a stream
187             # Task::Queue - low level code
188             # Proc::Simple - very similar but not flexible enough
189              
190              
191 2     2   247041 use Modern::Perl;
  2         10913  
  2         20  
192 2     2   1289 use Moose;
  2         1062581  
  2         21  
193 2     2   18606 use namespace::autoclean;
  2         13899  
  2         8  
194 2     2   194 use Class::Load;
  2         6  
  2         99  
195 2     2   778 use Clone;
  2         5503  
  2         4250  
196              
our $VERSION = '0.13';

=head2 data

You can pass a hashref or an arrayref as data.

When it is an arrayref, each item is passed to a task as its task params.
Internal ids are generated automatically as increasing integers, starting
from 0 for an empty pool.

When it is a hashref, each value is passed to a task as its task params,
and the internal ids are the keys of your hash.

In both cases the data is converted to the internal format:

    {
        id1 => { source => $params1, result => $result1 },
        id2 => { source => $params2, result => $result2 },
        ...
    }

The C<result> key exists only once the corresponding task has returned an answer.

=cut

has data => (
    is      => 'rw',
    isa     => 'HashRef[HashRef]',
    default => sub { return {} },
);


=head2 tasks_count

The number of task objects (workers) to create. Default is 10.

=cut

has tasks_count => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 10,
);


=head2 flush_data

(1|0) - when true, every data item whose result has been returned by
$self->process (or $self->results) is removed from the pool together
with its result. Default is 0.

=cut

has flush_data => (
    is      => 'rw',
    isa     => 'Str',
    default => 0,
);


=head2 result_type

Defines the structure of the results returned by $self->process:
result_type = (hash|list|fulllist). Default is 'fulllist'.

'list' - an arrayref of the ready results only; items whose results are
not ready yet are skipped, so positions do not match the source data.

'fulllist' - an arrayref with one element per data item still tracked by
the pool, following the order and positions of the source data;
unfinished items are undef.

'hash' - a hashref whose keys are the internal ids (generated integer ids
for arrayref data, your keys for hashref data) and whose values are the
ready results.

The value is matched by substring: anything containing 'list' produces an
arrayref, and anything containing 'full' also includes unfinished items
(so 'full_list' works the same as 'fulllist').

=cut

has result_type => (
    is      => 'rw',
    isa     => 'Str',
    default => 'fulllist',
);


=head2 break_on

Condition on which $self->process stops waiting and returns, so that you
can do something else before the next check.

'busy' - return after a single pass that collects finished answers and
hands queued data to all free tasks, without waiting.

'run' - return as soon as the last queued data item has been handed to a
task.

'done' - wait until all the tasks have finished their work.

Default is 'done'.

=cut

has break_on => (
    is      => 'rw',
    isa     => 'Str',
    default => 'done',
);


=head2 task_class

Task object class name. Default is 'Async::Simple::Task::Fork'
('Async::Simple::Task::ForkTmpFile' on dos, os2, MSWin32 and NetWare).

=cut

has task_class => (
    is       => 'rw',
    isa      => 'Str',
    required => 1,
    default  => (
        $^O =~ /^(dos|os2|MSWin32|NetWare)$/
            ? 'Async::Simple::Task::ForkTmpFile'
            : 'Async::Simple::Task::Fork'
    ),
);


=head2 task_params

Task init params.

The pool passes them to every task as $task_class->new( %$task_params ).

You can also pass these params directly into the pool constructor: every
constructor param the pool class has no attribute or method for is moved
into task_params automatically.

=cut

has task_params => (
    is  => 'rw',
    isa => 'HashRef',
);


# ArrayRef of task objects; built by make_tasks on first access (lazy).
has tasks => (
    is       => 'ro',
    isa      => 'ArrayRef',
    lazy     => 1,
    required => 1,
    builder  => 'make_tasks',
);


# Internal ids of data that has not been handed to a task yet;
# write_tasks shifts ids off the front as it starts new work.
has queue_keys => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { return [] },
);


# Internal ids of all data tracked by the pool; results() walks this list
# to build its return value and shrinks it when flush_data is set.
has all_keys => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { return [] },
);
351              
352              
353             =head2 new( some various params )
354              
355             some ways to call it:
356              
357             my $pool = Async::Simple::Pool->new( $task ); # Process should be started below.
358              
359             my $pool = Async::Simple::Pool->new( $task, \@data ); # Process will be started inside new.
360              
361             my $pool = Async::Simple::Pool->new( \@data, task => $task ); # Process will be started inside new.
362              
363             my $results = Async::Simple::Pool->new( $task, \@data )->results; # Just do everything and give me my results!
364              
365             my $pool = Async::Simple::Pool->new( task => $task ); # Minimal init with hash of params, all by default, process should be started manually below
366              
367              
368             Full list of params for the default task type (Async::Simple::Task::Fork) with default values.
369              
370             my $pp = Async::Simple::Pool->new(
371             tasks_count => 10,
372             break_on => 'done', # [ 'busy', 'run', 'done' ]
373             data => \@data,
374             task_class => 'Async::Simple::Task::Fork',
375             task_params => { # Can be placed into pool params directly
376             task => $task,
377             timeout => 0.01,
378             },
379             );
380              
381             It is a good idea to run new() before gathering all this huge amount of data, and run $pool->process separately:
382              
383             my $pool = Async::Simple::Pool->new( $task );
384              
385             <collecting all your data after forking>
386              
387             my $results = $pool->process( \@your_data );
388              
389             =cut
390              
391              
# Constructor sugar. Accepts, in this order:
#   - an optional coderef: the task,
#   - an optional data ref (arrayref or hashref),
#   - key/value pool params (task params may be mixed in, see below).
# Data from both the positional argument and the "data" param is converted
# to the internal format, and every param the pool class has no method for
# is moved into task_params.
around BUILDARGS => sub {
    my ( $orig, $class, @params ) = @_;

    my ( $task, $data );
    $task = shift @params if ref $params[0] eq 'CODE';
    $data = shift @params if ref $params[0];

    my %params = @params;

    # Work on a copy so the caller's task_params hash is not modified below.
    $params{task_params} = { %{ $params{task_params} // {} } };

    my ( @new_keys, $keys );

    if ( $data ) {
        ( $data, $keys ) = _conv_data_to_internal( {}, $data );
        push @new_keys, @$keys;
    }

    # Named "data" is merged after positional data. Start from an empty hash
    # when there was no positional data: dereferencing undef would die for
    # hashref data.
    if ( $params{data} ) {
        ( $data, $keys ) = _conv_data_to_internal( $data // {}, $params{data} );
        push @new_keys, @$keys;
    }

    $params{task_params}{task} //= $task;
    $params{data}       = $data if $data;
    $params{queue_keys} = \@new_keys;
    # Independent copy: queue_keys is consumed as tasks start, all_keys is not.
    $params{all_keys}   = Clone::clone( \@new_keys );

    # Anything the pool class has no method for is a task param. Always
    # remove it from the pool params; an explicit task_params value wins.
    for my $name ( grep { !$class->can($_) } keys %params ) {
        my $value = delete $params{$name};
        $params{task_params}{$name} //= $value;
    }

    return $class->$orig( %params );
};
429              
430              
431             =head2 BUILD
432              
433             Internal. Overridden init: loads the task class, validates task params and starts processing if data was passed to new().
434              
435             =cut
436              
sub BUILD {
    my ( $self ) = @_;

    my $task_class = $self->task_class;

    # The task class must be loaded before we can ask it which params it
    # supports or build task objects from it.
    Class::Load::load_class( $task_class );

    my @bad_task_param_names = grep { !$task_class->can($_) } keys %{ $self->task_params // {} };

    if ( @bad_task_param_names ) {
        die 'Unknown params found: (' . join( ', ', @bad_task_param_names ) . ' )';
    }

    # Build (prefork) the workers now, as the documentation promises. The
    # "tasks" attribute is lazy, so without this the workers would only be
    # created on the first call to process().
    $self->tasks;

    # Data given to the constructor is processed immediately.
    $self->process if %{ $self->data };

    return;
}
454              
455              
456             =head2 process
457              
458             Main dispatcher of child tasks
459              
460             - writes data to tasks
461              
462             - checks for results
463              
464              
465             We don't care about all internal fails, dying or hang ons of your tasks.
466              
467             If your task can do something bad, please write workaround for this case inside your "sub".
468              
469             Will be called inside new() in case you pass data there.
470              
471             =cut
472              
sub process {
    my ( $self, $new_data ) = @_;

    # Optional extra data: merge it into the pool and queue it behind any
    # work that is already waiting.
    if ( $new_data ) {
        my ( $merged, $added_keys ) = _conv_data_to_internal( $self->data, $new_data );
        $self->data( $merged );
        push @{ $self->queue_keys }, @$added_keys;
        push @{ $self->all_keys },   @$added_keys;
    }

    my $mode           = $self->break_on;
    my $stop_when_busy = $mode eq 'busy';
    my $stop_when_run  = $mode eq 'run';

    # Poll loop: collect finished answers, then hand queued data to free
    # tasks, until the break_on condition is met.
    # NOTE(review): there is no sleep between passes here; pacing presumably
    # comes from the task classes' get() - confirm.
    POLL: while ( 1 ) {
        my $busy_count = grep { $_->has_id } @{ $self->tasks };
        $self->read_tasks if $busy_count;
        $self->write_tasks;

        last POLL if $stop_when_busy;

        # Some data has not been handed to a task yet.
        next POLL if @{ $self->queue_keys };

        last POLL if $stop_when_run;

        # Everything has been started; keep polling while any task is busy.
        last POLL unless grep { $_->has_id } @{ $self->tasks };
    }

    return $self->results;
}
506              
507              
508             =head2 results
509              
510             Internal.
511             Returns all results gathered so far, shaped according to result_type:
512             an arrayref for 'list'/'fulllist' (the default is 'fulllist'),
513             or a hashref keyed by the internal data ids for 'hash'.
514              
515             =cut
516              
sub results {
    my ( $self ) = @_;

    my $data  = $self->data;
    my $type  = $self->result_type;
    my $flush = $self->flush_data;

    # Substring match, so both 'fulllist' and 'full_list' are accepted.
    my $is_list = $type =~ /list/;
    my $is_full = $type =~ /full/;

    my $results = $is_list ? [] : {};

    for my $key ( @{ $self->all_keys } ) {
        # Read through a local hash so that an id missing from $data is not
        # autovivified back into the pool.
        my $item       = $data->{$key} // {};
        my $has_result = exists $item->{result};

        # Non-"full" result types only report ready results.
        next if !$is_full && !$has_result;

        if ( $is_list ) {
            push @$results, $item->{result};
        }
        else {
            $results->{$key} = $item->{result};
        }

        delete $data->{$key} if $flush && $has_result;
    }

    # Drop flushed ids but keep the remaining ones in their original order.
    # Rebuilding from "keys %$data" would scramble later list results.
    $self->all_keys( [ grep { exists $data->{$_} } @{ $self->all_keys } ] ) if $flush;

    return $results;
}
546              
547              
548             =head2 make_tasks
549              
550             Internal.
551             All tasks are created here.
552             Called from constructor.
553              
554             =cut
555              
sub make_tasks {
    my $self = shift;

    my $class = $self->task_class;

    # One worker per slot, each constructed from the same task params.
    my @workers = map { $class->new( %{ $self->task_params } ) } 1 .. $self->tasks_count;

    return \@workers;
}
568              
569              
570             =head2 read_tasks
571              
572             Internal.
573             Reads busy tasks.
574              
575             =cut
576              
sub read_tasks {
    my $self = shift;

    # Only tasks that currently hold a data id can have an answer pending.
    my @busy = grep { $_->has_id } @{ $self->tasks };
    return unless @busy;

    my $data = $self->data;

    for my $worker ( @busy ) {
        # Reset first, so has_answer reflects only this poll.
        $worker->clear_answer;
        $worker->get;

        if ( $worker->has_answer ) {
            # Store the answer under the data id and free the worker.
            $data->{ $worker->id }{result} = $worker->answer;
            $worker->clear_id;
        }
    }

    return;
}
594              
595              
596             =head2 write_tasks
597              
598             Internal.
599             Writes to free tasks.
600              
601             =cut
602              
sub write_tasks {
    my $self = shift;

    # Only tasks without a data id are free to take new work.
    my @idle = grep { !$_->has_id } @{ $self->tasks };
    return unless @idle;

    my $data  = $self->data;
    my $queue = $self->queue_keys;

    for my $worker ( @idle ) {
        my $key = shift @$queue;

        # Queue exhausted: remaining workers stay idle.
        last unless defined $key;

        $worker->id( $key );

        # Just in case the task class did not reset its previous answer.
        $worker->clear_answer;

        $worker->put( $data->{$key}{source} );
    }

    return;
}
624              
625              
626             =head2 _conv_data_to_internal
627              
628             Internal.
629             Converts source data ( hashref or arrayref ) to internal representation ( hashref ).
630              
631             =cut
632              
# Converts ARRAY or HASH source data into the internal
# { id => { source => $item } } format and merges it with $int_data.
# Returns ( $merged_hashref, \@new_ids ). Other ref types (or non-refs)
# add nothing.
sub _conv_data_to_internal {
    my ( $int_data, $data ) = @_;

    # Callers may pass undef for "no existing data".
    $int_data //= {};

    my @new_keys;
    my %new_data;

    # $pool->new( coderef, [ @data ], %params );
    if ( ref $data eq 'ARRAY' ) {

        # Continue numbering after the largest integer id already present.
        # Use a defined-check (not ||): an existing max id of 0 must not be
        # mistaken for "no ids", or id 0 would be reused and overwritten.
        my ( $max_id ) = sort { $b <=> $a } grep { /\A[0-9]+\z/ } keys %$int_data;
        my $first_id = defined $max_id ? $max_id + 1 : 0;

        for my $offset ( 0 .. $#{$data} ) {
            my $id = $first_id + $offset;
            push @new_keys, $id;
            $new_data{$id} = { source => $data->[$offset] };
        }
    }
    # $pool->new( coderef, { %data }, %params );
    elsif ( ref $data eq 'HASH' ) {
        @new_keys = keys %$data;
        %new_data = map { ( $_ => { source => $data->{$_} } ) } @new_keys;
    }

    return { %$int_data, %new_data }, \@new_keys;
}
657              
658              
# Finalize the Moose class: the constructor and accessors are inlined and
# the class can no longer be altered.
__PACKAGE__->meta->make_immutable;

# A module must return a true value when loaded.
1;