File Coverage

blib/lib/Async/Util.pm
Criterion Covered Total %
statement 166 174 95.4
branch 38 66 57.5
condition 12 24 50.0
subroutine 20 20 100.0
pod 3 3 100.0
total 239 287 83.2


line stmt bran cond sub pod time code
1             package Async::Util;
2             BEGIN {
3 1     1   37618 $Async::Util::VERSION = '0.01';
4             }
5             # ABSTRACT: utilities for doing common async operations
6 1     1   9 use strict;
  1         2  
  1         29  
7 1     1   5 use warnings;
  1         2  
  1         44  
8 1     1   20 use v5.10;
  1         4  
  1         66  
9 1     1   4 no warnings 'recursion';
  1         2  
  1         37  
10 1     1   5 use Carp;
  1         2  
  1         58  
11 1     1   5 use Exporter;
  1         2  
  1         40  
12 1     1   5 use Scalar::Util qw(weaken);
  1         2  
  1         1695  
13              
14             our @ISA = qw(Exporter);
15             our @EXPORT_OK = qw(amap azipmap achain);
16             my $DEFAULT_AT_A_TIME = 100;
17              
18             sub amap {
19 2     2 1 6352 my (%args) = @_;
20              
21 2 100 66     18 return _amap_ignore(%args) if exists $args{output} && !$args{output};
22 1         6 return _amap(%args);
23             }
24              
25             sub _amap {
26 1     1   3 my (%args) = @_;
27              
28 1         3 my $action = $args{action};
29 1         2 my $inputs = $args{inputs};
30 1         3 my $cb = $args{cb};
31 1   33     8 my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;
32              
33 1 50       5 croak q/Argument 'inputs' is required/ if !defined $inputs;
34 1 50       4 croak q/Argument 'action' is required/ if !defined $action;
35 1 50       4 croak q/Argument 'cb' is required/ if !defined $cb;
36              
37 1 50       4 croak q/Argument 'inputs' must be an ArrayRef/ if ref $inputs ne 'ARRAY';
38 1 50       8 croak q/Argument 'action' must be a CodeRef/ if ref $action ne 'CODE';
39 1 50       3 croak q/Argument 'cb' must be a CodeRef/ if ref $cb ne 'CODE';
40              
41 1         2 my $inflight = 0;
42 1         2 my $cb_count = 0;
43 1         1 my $input_index = 0;
44 1         2 my $outputs = [];
45 1         2 my $any_err = 0;
46 1         1 my $after_work;
47              
48             my $run = sub {
49              
50 3   66 3   10 while ($inflight < $at_a_time && $input_index <= $#{ $inputs }) {
  6         25  
51              
52 3         6 $inflight++;
53              
54 3         9 my $index = $input_index;
55 3         5 my $input = $inputs->[ $index ];
56 3         3 $input_index++;
57              
58             my $after_work_wrapper = sub {
59 3         50 my ($res, $err) = @_;
60 3         3 my $i = $index;
61 3         11 $after_work->($res, $err, $i);
62 3         15 };
63              
64 3         12 $action->($input, $after_work_wrapper);
65              
66 3         106 weaken $after_work_wrapper;
67             }
68              
69 1         5 };
70              
71             $after_work = sub {
72 3     3   6 my ($output, $err, $index) = @_;
73              
74 3         3 $cb_count++;
75 3         4 $inflight--;
76              
77 3 50       8 return if $any_err;
78              
79 3 50       35 if ($err) {
80 0         0 $any_err = 1;
81 0         0 return $cb->(undef, $err);
82             }
83              
84             # store the output
85 3         5 $outputs->[$index] = $output;
86              
87 3 100       4 return $cb->($outputs) if $cb_count == @{ $inputs };
  3         11  
88              
89 2         7 $run->();
90 1         5 };
91              
92 1         3 $run->();
93 1         6 weaken $run;
94              
95 1         27 return;
96             }
97              
98             sub _amap_ignore {
99 1     1   3 my (%args) = @_;
100              
101 1         2 my $action = $args{action};
102 1         1 my $inputs = $args{inputs};
103 1         1 my $cb = $args{cb};
104 1   33     5 my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;
105              
106 1 50       2 croak q/Argument 'inputs' is required/ if !defined $inputs;
107 1 50       4 croak q/Argument 'action' is required/ if !defined $action;
108 1 50       2 croak q/Argument 'cb' is required/ if !defined $cb;
109              
110 1 50       3 croak q/Argument 'inputs' must be an ArrayRef/ if ref $inputs ne 'ARRAY';
111 1 50       3 croak q/Argument 'action' must be a CodeRef/ if ref $action ne 'CODE';
112 1 50       3 croak q/Argument 'cb' must be a CodeRef/ if ref $cb ne 'CODE';
113              
114 1         1 my $inflight = 0;
115 1         1 my $cb_count = 0;
116 1         2 my $input_index = 0;
117 1         0 my $any_err = 0;
118 1         1 my $after_work;
119              
120             my $run = sub {
121              
122 3   66 3   7 while ($inflight < $at_a_time && $input_index <= $#{ $inputs }) {
  6         35  
123              
124 3         4 $inflight++;
125              
126 3         3 my $index = $input_index;
127 3         4 my $input = $inputs->[ $index ];
128 3         3 $input_index++;
129              
130 3         7 $action->($input, $after_work);
131             }
132 1         4 };
133              
134             $after_work = sub {
135 3     3   20 my (undef, $err) = @_;
136              
137 3         4 $cb_count++;
138 3         2 $inflight--;
139              
140 3 50       11 return if $any_err;
141              
142 3 50       5 if ($err) {
143 0         0 $any_err = 1;
144 0         0 return $cb->(undef, $err);
145             }
146              
147 3 100       4 return $cb->() if $cb_count == @{ $inputs };
  3         10  
148              
149 2         7 $run->();
150 1         4 };
151              
152 1         3 $run->();
153              
154 1         8 weaken $after_work;
155              
156 1         8 return;
157             }
158              
159             sub azipmap {
160 1     1 1 776 my (%args) = @_;
161              
162 1         2 my $actions = $args{actions};
163 1         2 my $inputs = $args{inputs};
164 1         1 my $cb = $args{cb};
165 1   33     6 my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;
166              
167 1 50       3 croak q/Argument 'inputs' is required/ if !defined $inputs;
168 1 50       3 croak q/Argument 'actions' is required/ if !defined $actions;
169 1 50       3 croak q/Argument 'cb' is required/ if !defined $cb;
170              
171 1 50       3 croak q/Argument 'actions' must be an ArrayRef/ if ref $actions ne 'ARRAY';
172 1 50       3 croak q/Argument 'cb' must be a CodeRef/ if ref $cb ne 'CODE';
173              
174 1   33     3 $inputs //= map { undef } 1..@{ $actions };
  0         0  
  0         0  
175              
176 1         1 my $inflight = 0;
177 1         2 my $cb_count = 0;
178 1         2 my $work_idx = 0;
179 1         1 my $outputs = [];
180 1         2 my $any_err = 0;
181 1         1 my $after_work;
182              
183             my $run = sub {
184              
185 3   66 3   9 while ($inflight < $at_a_time && $work_idx <= $#{ $actions }) {
  6         23  
186              
187 3         3 $inflight++;
188              
189 3         3 my $index = $work_idx;
190 3         5 my $action = $actions->[ $index ];
191 3         3 my $input = $inputs->[ $index ];
192 3         3 $work_idx++;
193              
194             my $after_work_wrapper = sub {
195 3         10 my $i = $index;
196 3         9 $after_work->($_[0], $_[1], $i);
197 3         8 };
198              
199 3         7 $action->($input, $after_work_wrapper);
200              
201 3         25 weaken $after_work_wrapper;
202             }
203 1         5 };
204              
205             $after_work = sub {
206 3     3   4 my ($output, $err, $index) = @_;
207              
208 3         3 $cb_count++;
209 3         2 $inflight--;
210              
211 3 50       6 return if $any_err;
212              
213 3 50       6 if ($err) {
214 0         0 $any_err = 1;
215 0         0 $cb->(undef, $err);
216             }
217              
218 3         4 $outputs->[$index] = $output;
219              
220 3 100       3 return $cb->($outputs) if $cb_count == @{ $actions };
  3         8  
221              
222 2         8 $run->();
223 1         5 };
224              
225 1         4 $run->();
226 1         8 weaken $run;
227              
228 1         8 return;
229             }
230              
231             sub achain {
232 1     1 1 524 my (%args) = @_;
233              
234 1         3 my $input = $args{input};
235 1         2 my $cb = $args{cb};
236 1         1 my $steps = $args{steps};
237              
238 1 50       4 croak q/Argument 'cb' is required/ if !defined $cb;
239 1 50       3 croak q/Argument 'steps' is required/ if !defined $steps;
240              
241 1 50       3 croak q/Argument 'cb' must be a CodeRef/ if ref $cb ne 'CODE';
242 1 50       3 croak q/Argument 'steps' must be an ArrayRef/ if ref $steps ne 'ARRAY';
243              
244 1         2 my $run; $run = sub {
245 3     3   2288 my ($result, $err) = @_;
246              
247 3 50       7 return $cb->(undef, $err) if $err;
248              
249 3         3 my $next_cb = shift @{ $steps };
  3         5  
250              
251 3 100       11 return $cb->($result) if !defined $next_cb;
252              
253 2         5 $next_cb->($result, $run);
254 1         4 };
255              
256 1         2 $run->($input);
257 1         33 weaken $run;
258              
259 1         3 return;
260             }
261              
262             1;
263              
264             __END__