File Coverage

blib/lib/Future/Workflow/Pipeline.pm
Criterion Covered Total %
statement 103 103 100.0
branch 19 22 86.3
condition 5 6 83.3
subroutine 19 19 100.0
pod 5 5 100.0
total 151 155 97.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2021-2024 -- leonerd@leonerd.org.uk
5              
6 6     6   1107549 use v5.26;
  6         25  
7 6     6   36 use warnings;
  6         14  
  6         428  
8 6     6   3893 use Object::Pad 0.800;
  6         47670  
  6         463  
9              
10             package Future::Workflow::Pipeline 0.02;
11             class Future::Workflow::Pipeline;
12              
13 6     6   2572 use Carp;
  6         28  
  6         405  
14              
15 6     6   3498 use Future::AsyncAwait;
  6         131303  
  6         34  
16              
17             =head1 NAME
18              
19             C<Future::Workflow::Pipeline> - a pipeline of processing stages
20              
21             =head1 SYNOPSIS
22              
23             =for highlighter language=perl
24              
25             # 1: Make a pipeline
26             my $pipeline = Future::Workflow::Pipeline->new;
27              
28              
29             # 2: Add some stages to it
30            
31             # An async stage; e.g. perform an HTTP fetch
32             my $ua = Net::Future::HTTP->new;
33             $pipeline->append_stage_async( async sub ($url) {
34             return await $ua->GET( $url );
35             });
36              
37             # A synchronous (in-process) stage; e.g. some HTML parsing
38             $pipeline->append_stage_sync( sub ($response) {
39             my $dom = Mojo::DOM->new( $response->decoded_content );
40             return $dom->at('div[id="main"]')->text;
41             });
42              
43             # A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
44             $pipeline->append_stage_detached( sub ($text) {
45             my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);
46              
47             my $best; my $bestscore;
48             while(my @words = $iter->next) {
49             my $str = join "\0", @words;
50             my $score = md5sum( $str );
51             next if defined $bestscore and $score ge $bestscore;
52              
53             $best = $str;
54             $bestscore = $score;
55             }
56              
57             return $best;
58             });
59              
60              
61             # 3: Give it an output
62              
63             # These are alternatives:
64              
65             # An asynchronous output
66             my $dbh = Database::Async->new( ... );
67             $pipeline->set_output_async( async sub ($best) {
68             await $dbh->do('INSERT INTO Results VALUES (?)', $best);
69             });
70              
71             # A synchronous output
72             $pipeline->set_output_sync( sub ($best) {
73             print "MD5 minimized sort order is:\n";
74             print " $_\n" for split m/\0/, $best;
75             });
76              
77              
78             # 4: Now start it running on some input values
79              
80             foreach my $url (slurp_lines("urls.txt")) {
81             await $pipeline->push_input($url);
82             }
83              
84              
85             # 5: Wait for it all to finish
86             await $pipeline->drain;
87              
88             =head1 DESCRIPTION
89              
90             Instances of this class implement a "pipeline", a sequence of data-processing
91             stages. Each stage is represented by a function that is passed a single
92             argument and should return a result. The pipeline itself stores a function
93             that will be passed each eventual result.
94              
95             =head2 Queueing
96              
97             In front of every stage there exists a queue of pending items. If the first
98             stage is currently busy when C<push_input> is called, the item is accepted
99             into its queue instead. Items will be taken from the queue in the order they
100             were pushed when the stage's work function finishes with prior items.
101              
102             If the queue between stages is full, then items will remain pending in prior
103             stages. Ultimately this back-pressure will make its way back to the
104             C<push_input> method at the beginning of the pipeline.
105              
106             =cut
107              
108             =head1 CONSTRUCTOR
109              
110             $pipeline = Future::Workflow::Pipeline->new;
111              
112             The constructor takes no additional parameters.
113              
114             =cut
115              
116             field $_output;
117             field @_stages;
118              
119             =head1 METHODS
120              
121             =cut
122              
123             =head2 set_output
124              
125             $pipeline->set_output( $code );
126              
127             await $code->( $result );
128              
129             Sets the destination output for the pipeline. Each completed work item will be
130             passed to the invoked function, which is expected to return a C<Future>.
131              
132             =cut
133              
134 12     12 1 129 method set_output ( $code )
  12         46  
  12         21  
  12         21  
135             {
136 12         37 $_output = $code;
137 12 50       58 $_stages[-1]->set_output( $_output ) if @_stages;
138             }
139              
140             =head2 set_output_sync
141              
142             $pipeline->set_output_sync( $code );
143              
144             $code->( $result );
145              
146             Similar to L</set_output>, where the output function is called synchronously,
147             returning when it has finished.
148              
149             =cut
150              
151 3     3 1 47 method set_output_sync ( $code )
  3         31  
  3         8  
  3         5  
152             {
153 3     4   40 $self->set_output( async sub ( $result ) { $code->( $result ) } );
  4         1331  
  4         9  
  4         10  
  4         7  
  4         15  
154             }
155              
156             =head2 append_stage
157              
158             $pipeline->append_stage( $code, %args );
159              
160             $result = await $code->( $item );
161              
162             Appends a pipeline stage that is implemented by an asynchronous function. Each
163             work item will be passed in by invoking the function, and it is expected to
164             return a C<Future> which will eventually yield the result of that stage.
165              
166             The following optional named args are recognised:
167              
168             =over 4
169              
170             =item concurrent => NUM
171              
172             Allow this number of outstanding items concurrently.
173              
174             =item max_queue => NUM
175              
176             If defined, no more than this number of items can be enqueued. If undefined,
177             no limit is applied.
178              
179             This value can be zero, which means that any attempts to push more items will
180             remain pending until the work function is free to deal with it; i.e. no
181             queueing will be permitted.
182              
183             =item on_failure => CODE
184              
185             $on_failure->( $f )
186              
187             Provides a callback event function for handling a failure thrown by the stage
188             code. If not provided, the default behaviour is to print the failure message
189             as a warning.
190              
191             Note that this handler cannot turn a failure into a successful result or
192             otherwise resume or change behaviour of the pipeline. For error-correction you
193             will have to handle that inside the stage function code itself. This handler
194             is purely the last stop of error handling, informing the user of an
195             otherwise-unhandled error before ignoring it.
196              
197             =back
198              
199             =cut
200              
201 13     13 1 111 method append_stage ( $code, %args )
  13         33  
  13         27  
  13         28  
  13         51  
202             {
203 13 100       46 my $old_tail = @_stages ? $_stages[-1] : undef;
204              
205 13         173 push @_stages, my $new_tail = Future::Workflow::Pipeline::_Stage->new(
206             code => $code,
207             %args,
208             );
209 13 50       73 $new_tail->set_output( $_output ) if $_output;
210              
211 4     4   830 $old_tail->set_output( async sub ( $item ) {
  4         7  
  4         10  
  4         24  
212 4         21 await $new_tail->push_input( $item );
213 13 100       87 } ) if $old_tail;
214             }
215              
216             =head2 append_stage_sync
217              
218             $pipeline->append_stage_sync( $code, %args );
219              
220             $result = $code->( $item );
221              
222             Similar to L</append_stage>, where the stage function is called synchronously,
223             returning its result immediately.
224              
225             Because of this, the C<concurrent> named parameter is not permitted.
226              
227             =cut
228              
229 2     2 1 25 method append_stage_sync ( $code, %args )
  2         7  
  2         3  
  2         5  
  2         4  
230             {
231             defined $args{concurrent} and
232 2 50       9 croak "->append_stage_sync does not permit the 'concurrent' parameter";
233              
234 2         4 return $self->append_stage(
235 2     2   16 async sub ( $item ) { return $code->( $item ) },
  2         4  
  2         5  
  2         3  
  2         10  
236             %args,
237             );
238             }
239              
240             =head2 push_input
241              
242             await $pipeline->push_input( $item );
243              
244             Adds a new work item into the pipeline, which will pass through each of the
245             stages and eventually invoke the output function.
246              
247             =cut
248              
249 25     25 1 3086 async method push_input ( $item )
  25         79  
  25         49  
  25         43  
250 25         53 {
251             # TODO: this feels like a weird specialcase for no stages
252 25 100       65 if( @_stages ) {
253 21         82 await $_stages[0]->push_input( $item );
254             }
255             else {
256 4         14 await $_output->( $item );
257             }
258             }
259              
260             class Future::Workflow::Pipeline::_Stage :strict(params) {
261              
262 6     6   11029 use Future;
  6         12  
  6         10992  
263              
264             field $_code :param;
265 16     16   40 field $_output :writer;
  16         41  
266              
267             field $_on_failure :param = sub ( $f ) {
268             warn "Pipeline stage failed: ", scalar $f->failure;
269             };
270              
271             # $_concurrent == maximum size of @_work_f
272             field $_concurrent :param = 1;
273             field @_work_f;
274              
275             # $_max_queue == maximum size of @_queue, or undef for unbounded
276             field $_max_queue :param = undef;
277             field @_queue;
278              
279             field @_awaiting_input;
280              
281 23     23   72 async method _do ( $item )
  23         53  
  23         43  
  23         34  
282 23         44 {
283 23         74 await $_output->( await $_code->( $item ) );
284             }
285              
286 23     23   44 method _schedule ( $item, $i )
  23         79  
  23         42  
  23         37  
  23         40  
287             {
288 23         84 my $f = $_work_f[$i] = $self->_do( $item );
289 21     21   27822 $f->on_ready( sub ( $f ) {
  21         43  
  21         112  
290 21 100       81 $_on_failure->( $f ) if $f->is_failed;
291              
292 21 100       1291 if( @_queue ) {
293 5         50 $self->_schedule( shift @_queue, $i );
294 5 100       156 ( shift @_awaiting_input )->done if @_awaiting_input;
295             }
296             else {
297 16         49 undef $_work_f[$i];
298             }
299 23         3089 } );
300             }
301              
302 25     25   50 async method push_input ( $item )
  25         84  
  25         51  
  25         57  
303 25         48 {
304 25         66 my $i;
305             defined $_work_f[$_] or ( $i = $_ ), last
306 25   66     176 for 0 .. $_concurrent-1;
307              
308 25 100       66 if( defined $i ) {
309 18         105 $self->_schedule( $item, $i );
310             }
311             else {
312 7 100 100     34 if( defined $_max_queue and @_queue >= $_max_queue ) {
313             # TODO: Maybe we should clone one of the work futures?
314 3         14 push @_awaiting_input, my $enqueue_f = Future->new;
315 3         39 await $enqueue_f;
316             }
317 5         170 push @_queue, $item;
318             }
319             }
320             }
321              
322             =head1 UNSOLVED QUESTIONS
323              
324             =over 4
325              
326             =item *
327              
328             Is each work item represented by some object that gets passed around?
329              
330             Can we store context, maybe in a hash or somesuch, that each stage can inspect
331             and append more things into? It might be useful to remember at least the
332             initial URLs by the time we generate the outputs
333              
334             =item *
335              
336             Tuning parameters. In particular, being able to at least set overall
337             concurrency of C<async> and C<detached> stages, the detachment model of the
338             C<detached> stages (threads vs. forks), inter-stage buffering?
339              
340             =back
341              
342             =cut
343              
344             =head1 AUTHOR
345              
346             Paul Evans <leonerd@leonerd.org.uk>
347              
348             =cut
349              
350             0x55AA;