File Coverage

blib/lib/Async/MergePoint.pm
Criterion Covered Total %
statement 42 42 100.0
branch 19 20 95.0
condition 3 3 100.0
subroutine 7 7 100.0
pod 4 4 100.0
total 75 76 98.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2007-2011 -- leonerd@leonerd.org.uk
5              
6             package Async::MergePoint;
7              
8 3     3   176493 use strict;
  3         7  
  3         111  
9 3     3   18 use warnings;
  3         8  
  3         127  
10              
11             our $VERSION = '0.04';
12              
13 3     3   16 use Carp;
  3         10  
  3         1702  
14              
15             =head1 NAME
16              
17             C - resynchronise diverged control flow
18              
19             =head1 SYNOPSIS
20              
21             use Async::MergePoint;
22              
23             my $merge = Async::MergePoint->new(
24             needs => [ "leaves", "water" ],
25             );
26              
27             my $water;
28             Kettle->boil(
29             on_boiled => sub { $water = shift; $merge->done( "water" ); }
30             );
31              
32             my $tea_leaves;
33             Cupboard->get_tea_leaves(
34             on_fetched => sub { $tea_leaves = shift; $merge->done( "leaves" ); }
35             );
36              
37             $merge->close(
38             on_finished => sub {
39             # Make tea using $water and $tea_leaves
40             }
41             );
42              
43             =head1 DESCRIPTION
44              
45             Often in program logic, multiple different steps need to be taken that are
46             independent of each other, but their total result is needed before the next
47             step can be taken. In synchonous code, the usual approach is to do them
48             sequentially.
49              
50             An asynchronous or event-based program could do this, but if each step
51             involves some IO idle time, better overall performance can often be gained by
52             running the steps in parallel. A C object can then be used
53             to wait for all of the steps to complete, before passing the combined result
54             of each step on to the next stage.
55              
56             A merge point maintains a set of outstanding operations it is waiting on;
57             these are arbitrary string values provided at the object's construction. Each
58             time the C method is called, the named item is marked as being
59             complete. When all of the required items are so marked, the C
60             continuation is invoked.
61              
62             For use cases where code may be split across several different lexical scopes,
63             it may not be convenient or possible to share a lexical variable, to pass on
64             the result of some asynchronous operation. In these cases, when an item is
65             marked as complete a value can also be provided which contains the results of
66             that step. The C callback is passed a hash (in list form, rather
67             than by reference) of the collected item values.
68              
69             This module was originally part of the L distribution, but was
70             removed under the inspiration of Pedro Melo's L distribution,
71             because it doesn't itself contain anything IO-specific.
72              
73             =cut
74              
75             =head1 CONSTRUCTOR
76              
77             =cut
78              
79             =head2 $merge = Async::MergePoint->new( %params )
80              
81             This function returns a new instance of a C object. The
82             C<%params> hash takes the following keys:
83              
84             =over 8
85              
86             =item needs => ARRAY
87              
88             Optional. An array containing unique item names to wait on. The order of this
89             array is not significant.
90              
91             =item on_finished => CODE
92              
93             Optional. CODE reference to the continuation for when the merge point becomes
94             ready. If provided, will be passed to the C method.
95              
96             =back
97              
98             =cut
99              
100             sub new
101             {
102 10     10 1 5562 my $class = shift;
103 10         35 my ( %params ) = @_;
104              
105 10         58 my $self = bless {
106             needs => {},
107             items => {},
108             }, $class;
109              
110 10 100       41 if( $params{needs} ) {
111 8 100       395 ref $params{needs} eq 'ARRAY' or croak "Expected 'needs' to be an ARRAY ref";
112 7         12 $self->needs( @{ $params{needs} } );
  7         31  
113             }
114              
115 9 100       32 if( $params{on_finished} ) {
116 5         17 $self->close( on_finished => $params{on_finished} );
117             }
118              
119 8         27 return $self;
120             }
121              
122             =head1 METHODS
123              
124             =cut
125              
126             =head2 $merge->close( %params )
127              
128             Allows an C continuation to be set if one was not provided to the
129             constructor.
130              
131             =over 8
132              
133             =item on_finished => CODE
134              
135             CODE reference to the continuation for when the merge point becomes ready.
136              
137             =back
138              
139             The C continuation will be called when every key in the C
140             list has been notified by the C method. It will be called as
141              
142             $on_finished->( %items )
143              
144             where the C<%items> hash will contain the item names that were waited on, and
145             the values passed to the C method for each one. Note that this is
146             passed as a list, not as a HASH reference.
147              
148             While this feature can be used to pass data from the component parts back up
149             into the continuation, it may be more direct to use normal lexical variables
150             instead. This method allows the continuation to be placed after the blocks of
151             code that execute the component parts, so it reads downwards, and may make it
152             more readable.
153              
154             =cut
155              
156             sub close
157             {
158 10     10 1 1045 my $self = shift;
159 10         25 my %params = @_;
160              
161 10 100       496 ref $params{on_finished} eq 'CODE' or croak "Expected 'on_finished' to be a CODE ref";
162              
163 9 100       224 $self->{on_finished} and croak "Already have an 'on_finished', can't set another";
164            
165 8         21 $self->{on_finished} = $params{on_finished};
166              
167 8 100       12 if( !keys %{ $self->{needs} } ) {
  8         45  
168             # Execute it now
169 2         4 $self->{on_finished}->( %{$self->{items}} );
  2         10  
170             }
171             }
172              
173             =head2 $merge->needs( @keys )
174              
175             When called on an open MergePoint (i.e. one that does not yet have an
176             C continuation), this method adds extra key names to the set of
177             outstanding names. The order of this list is not significant.
178              
179             This method throws an exception if the MergePoint is already closed.
180              
181             =cut
182              
183             sub needs
184             {
185 9     9 1 738 my $self = shift;
186              
187 9 100       223 $self->{on_finished} and croak "Cannot add extra keys to a closed MergePoint";
188              
189 8         23 foreach my $key ( @_ ) {
190 9 50       30 $self->{needs}{$key} and croak "Already need '$key'";
191 9         42 $self->{needs}{$key}++;
192             }
193             }
194              
195             =head2 $merge->done( $item, $value )
196              
197             This method informs the merge point that the C<$item> is now ready, and
198             passes it a value to store, to be passed into the C continuation.
199             If this call gives the final remaining item being waited for, the
200             C continuation is called within it, and the method will not
201             return until it has completed.
202              
203             =cut
204              
205             sub done
206             {
207 7     7 1 5394 my $self = shift;
208 7         17 my ( $item, $value ) = @_;
209              
210 7 100       2019 exists $self->{needs}->{$item} or croak "$self does not need $item";
211              
212 6         16 delete $self->{needs}->{$item};
213 6         20 $self->{items}->{$item} = $value;
214              
215 6 100 100     8 if( !keys %{ $self->{needs} } and $self->{on_finished} ) {
  6         50  
216 4         10 $self->{on_finished}->( %{$self->{items}} );
  4         25  
217             }
218             }
219              
220             =head1 EXAMPLES
221              
222             =head2 Asynchronous Plugins
223              
224             Consider a program using C to provide a plugin architecture
225             to respond to events, where sometimes the response to an event may require
226             asynchronous work. A C object can be used to coordinate the
227             responses from the plugins to this event.
228              
229             my $merge = Async::MergePoint->new();
230              
231             foreach my $plugin ( $self->plugins ) {
232             $plugin->handle_event( "event", $merge, @args );
233             }
234              
235             $merge->close( on_finished => sub {
236             my %results = @_;
237             print "All plugins have recognised $event\n";
238             } );
239              
240             Each plugin that wishes to handle the event can use its own package name, for
241             example, as its unique key name for the MergePoint. A plugin handling the
242             event synchonously could perform something such as:
243              
244             sub handle_event
245             {
246             my ( $event, $merge, @args ) = @_;
247             ....
248             $merge->needs( __PACKAGE__ );
249             $merge->done( __PACKAGE__ => $result );
250             }
251              
252             Whereas, to handle the event asynchronously the plugin can instead perform:
253              
254             sub handle_event
255             {
256             my ( $event, $merge, @args ) = @_;
257             ....
258             $merge->needs( __PACKAGE__ );
259              
260             sometime_later( sub {
261             $merge->done( __PACKAGE__ => $result );
262             } );
263             }
264              
265             =head1 AUTHOR
266              
267             Paul Evans
268              
269             =cut
270              
271             0x55AA;