File Coverage

blib/lib/Broker/Async.pm
Criterion Covered Total %
statement 61 61 100.0
branch 11 14 78.5
condition 1 3 33.3
subroutine 13 13 100.0
pod 1 6 16.6
total 87 97 89.6


line stmt bran cond sub pod time code
1             package Broker::Async;
2 6     6   4477 use strict;
  6         12  
  6         214  
3 6     6   33 use warnings;
  6         9  
  6         208  
4 6     6   3155 use Broker::Async::Worker;
  6         27  
  6         275  
5 6     6   57 use Carp;
  6         68  
  6         627  
6 6     6   73 use Scalar::Util qw( blessed weaken );
  6         12  
  6         746  
7              
8             =head1 NAME
9              
10             Broker::Async - broker tasks for multiple workers
11              
12             =for html
13              
14             =head1 SYNOPSIS
15              
16             my @workers;
17             for my $uri (@uris) {
18             my $client = SomeClient->new($uri);
19             push @workers, sub { $client->request(@_) };
20             }
21              
22             my $broker = Broker::Async->new(workers => \@workers);
23             for my $future (map $broker->do($_), @requests) {
24             my $result = $future->get;
25             ...
26             }
27              
28             =head1 DESCRIPTION
29              
30             This module brokers tasks for multiple asynchronous workers. A worker can be any code reference that returns a L<Future>, representing work awaiting completion.
31              
32             Some common use cases include throttling asynchronous requests to a server, or delegating tasks to a limited number of processes.
33              
34             =cut
35              
36             our $VERSION = "0.0.5"; # __VERSION__
37              
38             =head1 ATTRIBUTES
39              
40             =head2 workers
41              
42             An array ref of workers used for handling tasks.
43             Can be a code reference, a hash ref of L<Broker::Async::Worker> arguments, or a L<Broker::Async::Worker> object.
44             Every invocation of a worker must return a L<Future> object.
45              
46             Under the hood, code and hash references are simply used to instantiate a L<Broker::Async::Worker> object.
47             See L<Broker::Async::Worker> for more documentation about how these parameters are used.
48              
49             =cut
50              
51             use Class::Tiny qw( workers ), {
52 5         42 queue => sub { [] },
53 6     6   62 };
  6         10  
  6         48  
54              
55             =head1 METHODS
56              
57             =head2 new
58              
59             my $broker = Broker::Async->new(
60             workers => [ sub { ... }, ... ],
61             );
62              
63             =cut
64              
65             sub active {
66 130     130 0 4759 my ($self) = @_;
67 130         4652 return grep { $_->active } @{ $self->workers };
  1045         38556  
  130         47846  
68             }
69              
70             sub available {
71 408     408 0 16080 my ($self) = @_;
72 408         6626 return grep { $_->available } @{ $self->workers };
  3740         200045  
  408         69548  
73             }
74              
75             sub BUILD {
76 10     10 0 79699 my ($self) = @_;
77 10         27 for my $name (qw( workers )) {
78 10 50       308 croak "$name attribute required" unless defined $self->$name;
79             }
80              
81 10         240 my $workers = $self->workers;
82 10 50       73 croak "workers attribute must be an array ref: received $workers"
83             unless ref($workers) eq 'ARRAY';
84              
85 10         35 for (my $i = 0; $i < @$workers; $i++) {
86 21         128 my $worker = $workers->[$i];
87              
88 21         33 my $type = ref($worker);
89 21 100       49 if ($type eq 'CODE') {
    100          
90 17         74 $workers->[$i] = Broker::Async::Worker->new({code => $worker});
91             } elsif ($type eq 'HASH') {
92 3         14 $workers->[$i] = Broker::Async::Worker->new($worker);
93             }
94             }
95             }
96              
97             =head2 do
98              
99             my $future = $broker->do(@args);
100              
101             Queue the invocation of a worker with @args.
102             @args can be any data structure, and is passed as is to a worker code ref.
103             Returns a L<Future> object that resolves when the work is done.
104              
105             There is no guarantee when a worker will be called; that depends on when a worker becomes available.
106             However, calls are guaranteed to be invoked in the order they are seen by $broker->do.
107              
108             =cut
109              
110              
111             sub do {
112 130     130 1 39308 my ($self, @args) = @_;
113              
114             # enforces consistent order of task execution
115             # makes sure current task is only started if nothing else is queued
116 130         4930 $self->process_queue;
117              
118 130         5309 my $future;
119 130 100       4865 if (my @active_futures = map $_->active, $self->active) {
    50          
120             # generate future from an existing future
121             # see Future::_new_convergent
122 119         16734 my $_future = $active_futures[0];
123 119   33     4576 ref($_) eq "Future" or $_future = $_, last for @active_futures;
124              
125 119         3534 $future = $_future->new;
126 119         13189 push @{ $self->queue }, {args => \@args, future => $future};
  119         33457  
127             } elsif (my ($available_worker) = $self->available) {
128             # should only be here if there's nothing active and nothing queued
129             # so start the task and return it's future
130 11         7553 $future = $self->do_worker($available_worker, @args);
131             }
132              
133             # start any recently queued tasks, if there are available workers
134 130         92591 $self->process_queue;
135 130         120997 return $future;
136             }
137              
138             sub do_worker {
139 130     130 0 4672 weaken(my $self = shift);
140 130         4653 my ($worker, @args) = @_;
141              
142             return $worker->do(@args)->on_ready(sub{
143             # queue next task
144 126     126   68162 $self->process_queue;
145 130         4743 });
146             }
147              
148             sub process_queue {
149 386     386 0 14692 weaken(my $self = shift);
150 386         143705 my $queue = $self->queue;
151              
152 386         59611 while (@$queue) {
153 397 100       11093 my ($worker) = $self->available or last;
154 119         15373 my $task = shift @$queue;
155              
156 119         6113 $self->do_worker($worker, @{$task->{args}})
157 119         3064 ->on_ready($task->{future});
158             }
159             }
160              
161             =head1 AUTHOR
162              
163             Mark Flickinger E<lt>maf@cpan.orgE<gt>
164              
165             =head1 LICENSE
166              
167             This software is licensed under the same terms as Perl itself.
168              
169             =cut
170              
171             1;