File Coverage

blib/lib/Parallel/Pipes/App.pm
Criterion Covered Total %
statement 63 64 98.4
branch 22 32 68.7
condition n/a
subroutine 8 8 100.0
pod 2 2 100.0
total 95 106 89.6


line stmt bran cond sub pod time code
1             package Parallel::Pipes::App;
2 22     22   285450 use strict;
  22         55  
  22         803  
3 22     22   110 use warnings;
  22         44  
  22         1243  
4              
5 22     22   4180 use Parallel::Pipes;
  22         110  
  22         18216  
6              
7             our $VERSION = '0.201';
8              
9 681 50   681   3136 sub _min { $_[0] < $_[1] ? $_[0] : $_[1] }
10              
11             sub run {
12 41     41 1 168274 my ($class, %argv) = @_;
13              
14 41 50       155 my $work = $argv{work} or die "need 'work' argument\n";
15 41 50       128 my $num = $argv{num} or die "need 'num' argument\n";
16 41 50       140 my $tasks = $argv{tasks} or die "need 'tasks' argument\n";
17 41         76 my $before_work = $argv{before_work};
18 41         71 my $after_work = $argv{after_work};
19 41         73 my $init_work = $argv{init_work};
20 41         53 my $idle_tick = $argv{idle_tick};
21 41         88 my $idle_work = $argv{idle_work};
22              
23 41 100       419 my $pipes = Parallel::Pipes->new(
24             $num,
25             $work,
26             $idle_tick ? { idle_tick => $idle_tick, idle_work => $idle_work } : (),
27             );
28 26 100       142 $init_work->($pipes) if $init_work;
29 26         2243 while (1) {
30 736         4419 my @ready = $pipes->is_ready;
31 736 100       1684 if (my @written = grep { $_->is_written } @ready) {
  879         3704  
32 681         1531 for my $written (@written) {
33 718         2661 my $result = $written->read;
34 718 50       3795 $after_work->($result, $written) if $after_work;
35             }
36             }
37 736 100       5140 if (@$tasks) {
38 681         1028 my $min = _min $#{$tasks}, $#ready;
  681         3141  
39 681         2425 for my $i (0 .. $min) {
40 750         2205 my $task = shift @$tasks;
41 750 100       1563 $before_work->($task, $ready[$i]) if $before_work;
42 750         4269 $ready[$i]->write($task);
43             }
44             } else {
45 55 100       352 if (@ready == $num) {
46 26         90 last;
47             } else {
48 29 50       165 if (my @written = $pipes->is_written) {
49 29         209 my @ready = $pipes->is_ready(@written);
50 29         188 for my $written (@ready) {
51 32         3671 my $result = $written->read;
52 32 50       182 $after_work->($result, $written) if $after_work;
53             }
54             } else {
55 0         0 die "unexpected";
56             }
57             }
58             }
59             }
60 26         458 $pipes->close;
61 26         523 1;
62             }
63              
64             sub map :method {
65 22     22 1 3703832 my ($class, %argv) = @_;
66              
67 22 50       88 my $orig_num = $argv{num} or die "need 'num' argument\n";
68 22 50       143 my $orig_tasks = $argv{tasks} or die "need 'tasks' argument\n";
69 22 50       77 my $orig_work = $argv{work} or die "need 'work' argument\n";
70              
71 22         44 my @task = map { [$_, $orig_tasks->[$_]] } 0..$#{$orig_tasks};
  682         836  
  22         132  
72             my $work = sub {
73 372     372   568 my ($index, $task) = @{$_[0]};
  372         869  
74 372         1125 my $result = $orig_work->($task);
75 372         3828790 [$index, $result];
76 22         88 };
77 22         44 my @result;
78             my $after_work = sub {
79 527     527   933 my ($index, $result) = @{$_[0]};
  527         1562  
80 527         2015 $result[$index] = $result;
81 22         99 };
82 22         121 $class->run(num => $orig_num, work => $work, tasks => \@task, after_work => $after_work);
83 17         506 @result;
84             }
85              
86             1;
87             __END__