File Coverage

blib/lib/Async/Simple/Pool.pm
Criterion Covered Total %
statement 124 147 84.3
branch 49 84 58.3
condition 8 10 80.0
subroutine 14 15 93.3
pod 8 8 100.0
total 203 264 76.8


line stmt bran cond sub pod time code
1             package Async::Simple::Pool;
2              
3             =head1 NAME
4              
5             Async::Simple::Pool - Simple manager of asyncronous tasks
6              
7             =head1 SYNOPSIS
8              
9             Simplest way:
10              
11             use Async::Simple::Pool;
12             use Data::Dumper;
13              
14             my $task = sub{
15             my $data = shift;
16              
17             return $data->{i} * 10;
18             };
19              
20             my $data = [ { i => 1 }, { i => 2 }, { i => 3 } ];
21              
22             my $pool = Async::Simple::Pool->new( $task, $data );
23              
24             my $result = $pool->process;
25              
26             say Dumper $result;
27              
28             $VAR1 = [
29             10,
30             20,
31             30
32             ];
33              
34              
35             Some other ways to do $pool->new(), using various param sets.
36              
37             Note: If you pass $data to $pool->new() then all processes will be started immediately.
38             You can call $pool->process after "new" and get your results in no time.
39              
40             my $pool = Async::Simple::Pool->new( $task, $data ); # Simplest way to create a new pool. Results will be available just after "new"
41              
42             my $pool = Async::Simple::Pool->new( %pool_params ); # Creates a new pool. The only one param "task" is required by default.
43              
44             my $pool = Async::Simple::Pool->new( $task, %pool_params ); # "$task" is required, al params are optional
45              
46             my $pool = Async::Simple::Pool->new( $data, %pool_params ); # $data - required, all pool_params except "task" are optional
47              
48             my $pool = Async::Simple::Pool->new( $task, $data, %pool_params ); # $task, $data - required, all pool_params are optional
49              
50              
51             By default "task" is required and must be a CodeRef.
52             For example:
53              
54             $task = sub { my $task_X_data = shift; some useful code; return $task_X_result };
55              
56              
57             $data can be ArrayRef of your tasks params.
58              
59             $data = [ $task_data1, $task_data2, ... ];
60              
61              
62             Also $data can be HashRef of your tasks params.
63             In this case you can pass any scalars as keys of this hash. They will be mirrored into result
64              
65             $data = { task_id1 => $task_data1, task_id2 => $task_data2, ... };
66              
67              
68             The "pool->new()" creates "$pool->tasks_count" count of "$pool->task_class objects".
69             By default task_class is "Async::Simple::Task::Fork".
70             In this case "$pool->tasks_count" processes will be preforked (10 by default).
71             Each of them starts to wait for data which will be provided by pool later.
72              
73              
74             This is the main dispatcher of pool. It behavior depends on %pool_params.
75             If you pass $data to $pool->process, this data will be added to execution.
76              
77             $results = $pool->process( $data );
78              
79              
80             Type of $result depends on pool params that you pass in $pool->new( %pool_params );
81             By default result is arrayref.
82              
83              
84             You can use these %pool_params:
85              
86             data - ArrayRef/HashRef. A data for tasks, as described above,
87              
88             tasks_count - Integer number of workers. 10 by default.
89              
90             flush_data - 1 - remove or 0 - don't remove results from pool, when they has been readed by $pool->process()
91              
92             result_type - list (list of ready results) / full_list (list of all results) / hash (hash of ready results)
93              
94             break_on - busy (when all workers are busy) / run(all data is executing) / done (all result are ready)
95              
96             task_class - see explanation below. For example 'Your::Task::Class';
97              
98             task_params - Any params you wish to pass to each task object to $task->new( %$here ).
99              
100              
101             The last way to use pool is to make your own task class.
102             You can make your own class of task. This class MUST has at least this code:
103              
104             package Your::Task::Class;
105              
106             use parent 'Async::Simple::Task';
107              
108             # Trying to read result.
109             # If result found, call $self->result( $result );
110             # If result is not ready, do nothing
111             sub get {
112             my $self = shift;
113              
114             return unless you have result;
115              
116             # result can be undef; Don't worry, all will be ok!
117             $self->result( $result );
118             };
119              
120             # Just push data to execution in your way
121             sub put {
122             my ( $self, $data ) = @_;
123              
124             $self->clear_answer; # Optional, if you plan to use your package regardlessly from this pool.
125              
126             # Pass your data to your processor here
127             }
128              
129             1;
130              
131              
132             =head1 DESCRIPTION
133              
134             Allows to work with pool of async processes.
135              
136             There are many other similar packages you can find on CPAN: Async::Queue, Anyevent::FIFO, Task::Queue, Proc::Simple.
137              
138             The main difference of this package is convenience and simplicity of usage.
139              
140              
141             =head1 METHODS
142              
143             $pool->new( various params as described above )
144              
145             $pool->process( $optional_data_ref )
146              
147              
148             =head1 SUPPORT AND DOCUMENTATION
149              
150             After installing, you can find documentation for this module with the
151             perldoc command.
152              
153             perldoc Async::Simple::Task
154              
155             You can also look for information at:
156              
157             RT, CPAN's request tracker (report bugs here)
158             http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task
159              
160             AnnoCPAN, Annotated CPAN documentation
161             http://annocpan.org/dist/Async-Simple-Task
162              
163             CPAN Ratings
164             http://cpanratings.perl.org/d/Async-Simple-Task
165              
166             Search CPAN
167             http://search.cpan.org/dist/Async-Simple-Task/
168              
169              
170             =head1 AUTHOR
171              
172             ANTONC <antonc@cpan.org>
173              
174             =head1 LICENSE
175              
176             This program is free software; you can redistribute it and/or modify it
177             under the terms of the the Artistic License (2.0). You may obtain a
178             copy of the full license at:
179              
180             L<http://www.perlfoundation.org/artistic_license_2_0>
181              
182             =cut
183              
184              
185             # Async::Queue, Anyevent::FIFO - very similar to this, but have no enough sugar, has Anyevent dependence, has no prefork and fixed pool
186             # Task::Pool - wery similar, uses tasks, results represented as a spream
187             # Task::Queue - low level code
188             # Proc::Simple - wery similar byt not flexible enough
189              
190              
191 5     5   574739 use Modern::Perl;
  5         10346  
  5         47  
192 5     5   3000 use Moose;
  5         2159915  
  5         41  
193 5     5   45185 use namespace::autoclean;
  5         31267  
  5         20  
194 5     5   344 use Class::Load;
  5         12  
  5         151  
195 5     5   1277 use Clone;
  5         8908  
  5         217  
196 5     5   2122 use JSON::XS;
  5         17868  
  5         10232  
197              
198             our $VERSION = '0.18';
199              
200             =head2 data
201              
202             You can pass hashref or arrayref as data
203              
204             When it is array, then each item of it will be passed to task as task params
205             ids for internal format will be generated automatically by increasing from 0
206              
207             When is is hashref, then each value of hash will be passed to task as task params
208             ids for internal format will be the same as in your hash
209              
210             In both cases it converts to internal format:
211              
212             { id => { source => paramref1, result => if_processed1 }, { source => paramref2, result => if_processed2 }, ... };
213              
214             =cut
215              
216             has data => (
217             is => 'rw',
218             isa => 'HashRef[HashRef]',
219             default => sub { return {} },
220             );
221              
222              
223             =head2 tasks_count
224              
225             tasks_count - an integer number of tasks that will be created (defailt is 10).
226              
227             =cut
228              
229             has tasks_count => (
230             is => 'ro',
231             isa => 'Int',
232             required => 1,
233             default => 10,
234             );
235              
236              
237             =head2 flush_data
238              
239             flush_data - (1|0) - remove used data and results after is has been readed in $self->process;
240              
241             =cut
242              
243             has flush_data => (
244             is => 'rw',
245             isa => 'Str',
246             default => 0,
247             );
248              
249              
250             =head2 result_type
251              
252             defines structure of results. results_type = (hash|list|fulllist)
253              
254             when 'list' - returns all results as list without placing them to the order of source data
255              
256             when 'fulllist' - returns all results as list with the full accordance to the source data order and positions
257              
258             when 'hash' - resurns hash, where index is the position of corresponding source data item and value - result
259              
260             =cut
261              
262             has result_type => (
263             is => 'rw',
264             isa => 'Str',
265             default => 'fulllist',
266             );
267              
268              
269             =head2 break_on
270              
271             Condition of stopping waiting for results and do something other before next check.
272              
273             'busy' = $self->process will exit after filling all the tasks with tasks, without any checks
274              
275             'run' = $self->process will end straight after the last task started
276              
277             'done' = $self->process will wait until all the tasks have finished their work
278              
279             Default is 'done'
280              
281             =cut
282              
283             has break_on => (
284             is => 'rw',
285             isa => 'Str',
286             default => 'done',
287             );
288              
289              
290             =head2 task_class
291              
292             Task object class name. Default is 'Async::Simple::Fork'.
293              
294             =cut
295              
296             has task_class => (
297             is => 'rw',
298             isa => 'Str',
299             required => 1,
300             default => (
301             $^O =~ /^(dos|os2|MSWin32|NetWare)$/
302             ? 'Async::Simple::Task::ForkTmpFile'
303             : 'Async::Simple::Task::Fork'
304             ),
305             );
306              
307              
308             =head2 task_params
309              
310             Task init params.
311              
312             Pool will push all these params to task->new( here ).
313              
314             You can pass all these params directly into pool constructor.
315             In this case task_params will be separated by magic;)
316              
317             =cut
318              
319             has task_params => (
320             is => 'rw',
321             isa => 'HashRef',
322             );
323              
324              
325             =head2 logger
326              
327             Something that can write your logs
328             It can be one of types: CodeRef, FileHandle, Str, Int
329              
330             In case of CodeRef, we will call it with one param = 'text to log'
331             In case of FileHandle, we will try to write in it
332             In case of Str, we try to interprete it as file_name and write into it
333             Also you can pass 'stdout' or 'stderr' as string
334              
335             By default logger is undefined, so nobody writes nothing to nowhere
336              
337             =cut
338              
339             has logger => (
340             is => 'rw',
341             isa => 'CodeRef | FileHandle | Str',
342             );
343              
344              
345              
346             # Tasks - ArrayRef of task objects
347             has tasks => (
348             is => 'ro',
349             isa => 'ArrayRef',
350             lazy => 1,
351             required => 1,
352             builder => 'make_tasks',
353             );
354              
355              
356             # List of all internal keys of data
357             # desceases when a new process is added
358             has queue_keys => (
359             is => 'rw',
360             isa => 'ArrayRef',
361             default => sub { return [] },
362             );
363              
364              
365             # List of all internal keys of data
366             # desceases when we ask for result vith flush_data is setted
367             has all_keys => (
368             is => 'rw',
369             isa => 'ArrayRef',
370             default => sub { return [] },
371             );
372              
373              
374             =head2 new( some various params )
375              
376             some ways to call it:
377              
378             my $pool = Async::Simple::Pool->new( $task ); # Process should be started below.
379              
380             my $pool = Async::Simple::Pool->new( $task, \@data ); # Process will be started inside new.
381              
382             my $pool = Async::Simple::Pool->new( \@data, task => $task ); # Process will be started inside new.
383              
384             my $results = Async::Simple::Pool->new( $task, \@data )->results; # Just do everything and give me my results!
385              
386             my $pool = Async::Simple::Pool->new( task => $task ); # Minimal init with hash of params, all by default, process sould be started manually below
387              
388              
389             full list of params for default task type (Async::Simple::Fork) with default values.
390              
391             my $pp = Async::Simple::Pool->new(
392             tasks_count => 10,
393             break_on => 'done', # [ 'busy', 'run', 'done' ]
394             data => \@data,
395             task_class => 'Async::Simple::Fork',
396             task_params => { # Can be placed into pool params directly
397             task => $task,
398             timeout => 0.01,
399             },
400             );
401              
402             It is a good idea to run new() before gathering all this huge amount of data, and run $pool->process separately:
403              
404             my $pool = Async::Simple::Pool->new( $task );
405              
406             <collecting all your data after forking>
407              
408             my $results = $pool->process( \@your_data );
409              
410             =cut
411              
412              
413             # params parsing, some sugar
414             around BUILDARGS => sub {
415             my ( $orig, $class, @params ) = @_;
416              
417             my ( $task, $data );
418             $task = shift @params if ref $params[0] eq 'CODE';
419             $data = shift @params if ref $params[0];
420              
421             my %params = @params;
422              
423             # Hack for earlier logginf
424             bless( Clone::clone( \%params ), $class )->log( 'INIT: Received params', \%params );
425              
426             my ( @new_keys, $keys );
427              
428             if ( $data ) {
429             ( $data, $keys ) = _conv_data_to_internal( {}, $data );
430             push @new_keys, @$keys;
431             };
432              
433             if ( $params{data} ) {
434             ( $data, $keys ) = _conv_data_to_internal( $data, $params{data} );
435             push @new_keys, @$keys;
436             };
437              
438             $params{task_params}{task} //= $task;
439             $params{data} = $data if $data;
440             $params{queue_keys} = \@new_keys;
441             $params{all_keys} = Clone::clone \@new_keys;
442              
443             my @task_param_names = grep !$class->can($_), keys %params;
444              
445             for ( @task_param_names ) {
446             $params{task_params}{$_} //= delete $params{$_};
447             };
448              
449             my $i = 0;
450              
451             # Hack for earlier logginf
452             bless( Clone::clone( \%params ), $class )->log( 'INIT: Parsed params', \%params );
453              
454             return $class->$orig( %params );
455             };
456              
457              
458             =head2 BUILD
459              
460             Internal. Overrided init for magic with params.
461              
462             =cut
463              
464             sub BUILD {
465 20     20 1 102 my ( $self ) = @_;
466              
467 20         769 my $task_class = $self->task_class;
468              
469 20         81 $self->log( 'BUILD: task class', $task_class );
470              
471 20         217 Class::Load::load_class( $task_class );
472              
473 20   50     2934 my @bad_task_param_names = grep !$task_class->can($_), keys %{ $self->task_params // {} };
  20         712  
474              
475 20 50       97 if ( scalar @bad_task_param_names ) {
476 0         0 $self->log( 'BUILD: bad_task_param_names', \@bad_task_param_names );
477 0         0 die 'Unknown params found: (' . join( ', ', @bad_task_param_names ) . ' )';
478             };
479              
480 20 100       55 if ( scalar keys %{ $self->data } ) {
  20         674  
481 15         67 $self->log( 'BUILD', '$self->process called' );
482 15         61 $self->process;
483             }
484             };
485              
486              
487             =head2 process
488              
489             Main dispatcher of child tasks
490              
491             - writes data to tasks
492              
493             - checks for results
494              
495              
496             We don't care about all internal fails, dying or hang ons of your tasks.
497              
498             If your task can do something bad, please write workaround for this case inside your "sub".
499              
500             Will be called inside new() in case you pass data there.
501              
502             =cut
503              
504             sub process {
505 46     46 1 180034604 my ( $self, $new_data ) = @_;
506              
507 46 100       208 if ( $new_data ) {
508 4 50       130 $self->log( 'PROCESS: new data received', $new_data ) if $self->logger;
509              
510 4         112 my ( $data, $keys ) = _conv_data_to_internal( $self->data, $new_data );
511              
512 4 50       129 $self->log( 'PROCESS: new data parsed', $data ) if $self->logger;
513              
514 4         127 $self->data( $data );
515 4         8 push @{ $self->queue_keys }, @$keys;
  4         152  
516 4         8 push @{ $self->all_keys }, @$keys;
  4         106  
517             };
518              
519 46         1857 my $break_on_busy = $self->break_on eq 'busy';
520 46         1437 my $break_on_run = $self->break_on eq 'run';
521              
522 46         117 while( 1 ) {
523 207 50       9627 $self->log( 'PROCESS', 'internal cycle unless exit condition' ) if $self->logger;
524              
525 207 100       839 $self->read_tasks() if grep $_->has_id, @{ $self->tasks };
  207         9181  
526 207         1510 $self->write_tasks();
527              
528 207 100       12152 if ( $break_on_busy ) {
529 20 50       808 $self->log( 'PROCESS', 'internal cycle exit: all threads are busy' ) if $self->logger;
530 20         68 last;
531             }
532              
533             # Has not started data
534 187 100       430 next if scalar @{ $self->queue_keys };
  187         8258  
535              
536 81 100       504 if ( $break_on_run ) {
537 2 50       68 $self->log( 'PROCESS', 'internal cycle exit: all tasks are started' ) if $self->logger;
538 2         6 last;
539             }
540              
541             # Has unprocessed data
542 79 100       181 next if grep $_->has_id, @{ $self->tasks };
  79         3421  
543              
544 24 50       756 $self->log( 'PROCESS', 'internal cycle exit: all tasks done' ) if $self->logger;
545 24         58 last;
546             };
547              
548 46 50       1485 $self->log( 'PROCESS: finished', $self->results ) if $self->logger;
549              
550 46         204 return $self->results;
551             };
552              
553              
554             =head2 results
555              
556             Internal.
557             Returns all results that already gathered
558             by default returns hash, where keys equal to indexes of source data list
559             and values are the results for data at these indexes.
560              
561             =cut
562              
563             sub results {
564 46     46 1 158 my ( $self ) = @_;
565              
566 46         1533 my $data = $self->data;
567              
568 46         1878 my $is_list = $self->result_type =~ /list/;
569 46         1469 my $is_full = $self->result_type =~ /full/;
570              
571 46 100       395 my $results = $is_list ? [] : {};
572              
573 46         125 for ( @{ $self->all_keys } ) {
  46         1881  
574 741         3091 my $result = $data->{$_}->{result};
575 741         1587 my $has_result = exists $data->{$_}->{result};
576              
577 741 100 100     2304 next if !$is_full && !$has_result;
578              
579             $is_list
580             ? ( push @$results, $result )
581 591 100       1945 : ( $results->{$_} = $result );
582              
583 591 100 66     19094 if ( $self->flush_data && $has_result ) {
584 60         281 delete $data->{$_};
585             };
586             };
587              
588 46 100       1889 $self->all_keys( [ keys %$data ] ) if $self->flush_data;
589              
590 46         976 return $results;
591             }
592              
593              
594             =head2 make_tasks
595              
596             Internal.
597             All tasks are created here.
598             Called from constructor.
599              
600             =cut
601              
602             sub make_tasks {
603 19     19 1 56 my ( $self ) = @_;
604              
605 19         59 my @tasks;
606 19         629 my $task_class = $self->task_class;
607              
608 19         707 for( 1 .. $self->tasks_count ) {
609 185         606 my $task = $task_class->new( %{ $self->task_params } );
  185         7978  
610 185         2608 push @tasks, $task;
611              
612 185 50       9956 $self->log( 'NEW THREAD ADDED', { ref $task => {%$task} } ) if $self->logger;
613             };
614              
615 19         1162 return \@tasks;
616             };
617              
618              
619             =head2 read_tasks
620              
621             Internal.
622             Reads busy tasks.
623              
624             =cut
625              
626             sub read_tasks {
627 175     175 1 713 my ( $self ) = @_;
628              
629 175 50       513 my @busy_tasks = grep $_->has_id, @{ $self->tasks } or return;
  175         30126  
630              
631 175 50       7548 $self->log( 'READ TASKS', { busy_tasks_found => scalar @busy_tasks } ) if $self->logger;
632              
633 175         6852 my $data = $self->data;
634              
635 175         1157 for my $task ( @busy_tasks ) {
636 1695         91842 $task->clear_answer;
637 1695         13050 $task->get();
638              
639 1695 100       138023 unless ( $task->has_answer ) {
640 1412 50       79646 $self->log( 'READ TASKS NO ANSWER', { id => $task->id } ) if $self->logger;
641 1412         8520 next;
642             };
643              
644 283 50       25101 $self->log( 'READ TASKS GOT ANSWER', { id => $task->id, answer => $task->answer } ) if $self->logger;
645              
646 283         8665 $data->{ $task->id }->{result} = $task->answer;
647 283         9602 $task->clear_id;
648             };
649             };
650              
651              
652             =head2 write_tasks
653              
654             Internal.
655             Writes to free tasks.
656              
657             =cut
658              
659             sub write_tasks {
660 207     207 1 709 my ( $self ) = @_;
661              
662 207 100       686 my @free_tasks = grep !$_->has_id, @{ $self->tasks } or return;
  207         14069  
663              
664 67 50       2221 $self->log( 'WRITE TASKS', { free_tasks_found => scalar @free_tasks } ) if $self->logger;
665              
666 67         2551 my $data = $self->data;
667              
668 67         293 for my $task ( @free_tasks ) {
669              
670 338         70608 my $pointer = shift @{ $self->queue_keys };
  338         13887  
671              
672 338 100       1148 return unless defined $pointer;
673              
674 303 50       10089 $self->log( 'WRITE TASKS: TASK ADDED', { id => $pointer, data => $data->{ $pointer }->{source} } ) if $self->logger;
675              
676 303         10704 $task->id( $pointer );
677              
678             # just in case, if somebody did not care for this in "task" package
679 303         11825 $task->clear_answer;
680              
681 303         1495 $task->put( $data->{ $pointer }->{source} );
682             };
683             };
684              
685              
686             =head2 _conv_data_to_internal
687              
688             Internal.
689             Converts source data ( hashref or arrayref ) to internal representation ( hashref ).
690              
691             =cut
692              
693             sub _conv_data_to_internal {
694 19     19   56 my ( $int_data, $data ) = @_;
695              
696 19         47 my @new_keys;
697             my %new_data;
698              
699             # $pool->new( coderef, [ @data ], %params );
700 19 100       107 if ( ref $data eq 'ARRAY' ) {
    50          
701              
702             # Gets max integer index in existing source data indexes,
703 15   100     339 my $i = ( [ sort { $a <=> $b } grep /^\d+$/, keys %$int_data ]->[-1] || -1 ) + 1;
704              
705 15         198 push @new_keys, $_ for $i..$i+@$data-1;
706 15         53 %new_data = map { $_, { source => $data->[$_-$i] } } @new_keys;
  300         1195  
707              
708             }
709             # $pool->new( coderef, { %data }, %params );
710             elsif ( ref $data eq 'HASH' ) {
711 4         31 @new_keys = keys %$data;
712 4         11 %new_data = map { $_, { source => $data->{$_} } } @new_keys;
  13         56  
713             };
714              
715 19         353 return { %$int_data, %new_data }, \@new_keys;
716             };
717              
718              
719             =head2 fmt_log_text
720              
721             Internal.
722             Adding extra data to logging text
723             yyyy-mm-dd hh:mm:ss (Program)[pid]: $text
724              
725             =cut
726              
727             sub fmt_log_text {
728 0     0 1 0 my ( $self, $action, $text ) = @_;
729              
730 0 0       0 unless ( defined $text ) {
731 0         0 $text = $action;
732 0         0 $action = 'DEFAULT';
733             };
734              
735 0         0 my ( $ss, $mi, $hh, $dd, $mm, $yyyy ) = localtime();
736 0         0 $yyyy += 1900;
737 0         0 my $date_time = sprintf '%04d-%02d-%02d %02d:%02d:%02d', $yyyy, $mm, $dd, $hh, $mi, $ss;
738              
739 0 0       0 if ( ref $text ) {
740 0         0 $text = JSON::XS->new->allow_unknown->allow_blessed->encode( $text );
741             }
742              
743 0         0 return sprintf "%s\t%s\t%s\t%s\t%s", $date_time, $$, $0, $action, $text;
744             }
745              
746              
747             =head2 log
748              
749             Internal.
750             Writes pool log
751              
752             =cut
753              
754             sub log {
755 75     75 1 347 my ( $self, $action, $text ) = @_;
756              
757             # No logger - no problems
758 75 50       2559 my $logger = $self->logger or return;
759              
760 0           my $log_text = $self->fmt_log_text( $action, $text );
761              
762 0 0         if ( ref $logger eq 'CODE' ) {
    0          
    0          
    0          
763 0           $logger->( $log_text );
764             }
765             elsif ( ref $logger eq 'GLOB' ) {
766 0 0         die "logger file $logger not found" unless -f $logger;
767 0 0         die "logger file $logger not found" unless -w $logger;
768 0           say $logger $log_text;
769             }
770             elsif ( $logger =~ /^stdout$/ai ) {
771 0           say STDOUT $log_text;
772             }
773             elsif ( $logger =~ /^stderr$/ai ) {
774 0           say STDERR $log_text;
775             }
776             else {
777 0 0         open ( my $f, '>>', $logger ) or die 'can`t open log file ' . $logger;
778 0           $self->logger( $f );
779 0           $self->log( $text );
780             }
781             };
782              
783             __PACKAGE__->meta->make_immutable;