File Coverage

blib/lib/Async/Queue.pm
Criterion Covered Total %
statement 84 84 100.0
branch 36 36 100.0
condition 26 27 96.3
subroutine 15 15 100.0
pod 5 5 100.0
total 166 167 99.4


line stmt bran cond sub pod time code
1             package Async::Queue;
2              
3 6     6   177506 use 5.006;
  6         25  
  6         253  
4 6     6   42 use strict;
  6         12  
  6         198  
5 6     6   46 use warnings;
  6         17  
  6         191  
6              
7 6     6   37 use Carp;
  6         26  
  6         625  
8 6     6   37 use Scalar::Util qw(looks_like_number);
  6         13  
  6         1680  
9              
10             sub new {
11 71     71 1 76318 my ($class, %options) = @_;
12 71         473 my $self = bless {
13             concurrency => 1,
14             worker => undef,
15             drain => undef,
16             empty => undef,
17             saturated => undef,
18             task_queue => [],
19             running => 0,
20             }, $class;
21 71         425 $self->$_($options{$_}) foreach qw(concurrency worker drain empty saturated);
22 48         184 return $self;
23             }
24              
25             sub _define_hook_accessors {
26 24     24   66 my ($name, %options) = @_;
27 24         32 my $class = __PACKAGE__;
28 24         50 my $fullname = "${class}::$name";
29 6     6   177 no strict 'refs';
  6         16  
  6         6271  
30 24         161 *{$fullname} = sub {
31 2844     2844   29556 my ($self, $v) = @_;
32 2844 100       5968 if(@_ > 1) {
33 303 100 66     926 croak "$name must not be undef." if !defined($v) && !$options{allow_undef};
34 300 100 100     1449 croak "$name must be a coderef" if defined($v) && ref($v) ne 'CODE';
35 260 100       523 croak "You cannot set $name while there is a running task." if $self->running > 0;
36 248         754 $self->{$name} = $v;
37             }
38 2789         9077 return $self->{$name};
39 24         98 };
40             }
41              
42             sub running {
43 7339     7339 1 9691 my ($self) = @_;
44 7339         16261 return $self->{running};
45             }
46              
47             sub concurrency {
48 11267     11267 1 29600 my ($self, $conc) = @_;
49 11267 100       24998 if(@_ > 1) {
50 91 100       231 croak "You cannot set concurrency while there is a running task" if $self->running > 0;
51 88 100       223 $conc = 1 if not defined($conc);
52 88 100       466 croak "concurrency must be a number" if !looks_like_number($conc);
53 86         159 $self->{concurrency} = int($conc);
54             }
55 11262         39062 return $self->{concurrency};
56             }
57              
58             sub length {
59 946     946 1 6831 my ($self) = @_;
60 946         972 return int(@{$self->{task_queue}});
  946         3575  
61             }
62              
63             *waiting = \&length;
64              
65             _define_hook_accessors 'worker';
66             _define_hook_accessors $_, allow_undef => 1 foreach qw(drain empty saturated);
67              
68             sub push {
69 2155     2155 1 41919 my ($self, $task, $cb) = @_;
70 2155 100       4096 if(@_ < 2) {
71 1         20 croak("You must specify something to push.");
72             }
73 2154 100 100     6686 if(defined($cb) && ref($cb) ne 'CODE') {
74 5         58 croak("callback for a task must be a coderef");
75             }
76 2149         2332 push(@{$self->{task_queue}}, [$task, $cb]);
  2149         4916  
77 2149         3888 $self->_shift_run(1);
78 2145         3965 return $self;
79             }
80              
81             sub _shift_run {
82 4280     4280   4855 my ($self, $from_push) = @_;
83 4280 100 100     6666 return if $self->concurrency > 0 && $self->running >= $self->concurrency;
84 2238         2476 my $args_ref = shift(@{$self->{task_queue}});
  2238         4231  
85 2238 100       4657 return if !defined($args_ref);
86 2144         2986 my ($task, $cb) = @$args_ref;
87 2144         2649 $self->{running} += 1;
88 2144 100 100     3514 if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {
      100        
89 26         58 $self->saturated->($self);
90             }
91 2143 100 100     12060 if(@{$self->{task_queue}} == 0 && defined($self->empty)) {
  2143         5642  
92 114         311 $self->empty->($self);
93             }
94 2142         43672 my $sync = 1;
95 2142         4296 my $sync_completed = 0;
96             $self->worker->($task, sub {
97 2132     2132   105999 my (@worker_results) = @_;
98 2132 100       5156 $cb->(@worker_results) if defined($cb);
99 2132         14793 $self->{running} -= 1;
100 2132 100 100     2104 if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {
  2132   100     5810  
101 32         67 $self->drain->($self);
102             }
103 2131 100       15246 if($sync) {
104 2104         4018 $sync_completed = 1;
105             }else {
106 27         59 @_ = ($self);
107 27         95 goto &_shift_run;
108             }
109 2142         9285 }, $self);
110 2140         15369 $sync = 0;
111 2140 100       7329 if($sync_completed) {
112 2104         3490 @_ = ($self);
113 2104         5731 goto &_shift_run;
114             }
115             }
116              
117              
118             =head1 NAME
119              
120             Async::Queue - control concurrency of asynchronous tasks
121              
122             =head1 VERSION
123              
124             Version 0.021
125              
126             =cut
127              
128             our $VERSION = '0.021';
129              
130              
131             =head1 SYNOPSIS
132              
133              
134             use Async::Queue;
135            
136             ## create a queue object with concurrency 2
137             my $q = Async::Queue->new(
138             concurrency => 2, worker => sub {
139             my ($task, $callback) = @_;
140             print "hello $task->{name}\n";
141             $callback->();
142             }
143             );
144            
145             ## assign a callback
146             $q->drain(sub {
147             print "all items have been processed\n";
148             });
149            
150             ## add some items to the queue
151             $q->push({name => 'foo'}, sub {
152             print "finished processing foo\n";
153             });
154             $q->push({name => 'bar'}, sub {
155             print "finished processing bar\n";
156             });
157              
158              
159             =head1 DESCRIPTION
160              
161             L is used to process tasks with the specified concurrency.
162             The tasks given to L are processed in parallel with its worker routine up to the concurrency level.
163             If more tasks arrive at the L object, those tasks will wait for currently running tasks to finish.
164             When a task is finished, one of the waiting tasks starts to be processed in first-in-first-out (FIFO) order.
165              
166             In short, L is a Perl port of the C object of async.js (L).
167              
168             The basic usage of L is as follows:
169              
170             =over
171              
172             =item 1.
173              
174             Create L object with C attribute and optional C attribute.
175             C is a subroutine reference that processes tasks. C is the concurrency level.
176              
177             =item 2.
178              
179             Push tasks to the L object via C method with optional callback functions.
180              
181             The tasks will be processed in FIFO order by the C subroutine.
182             When a task is finished, the callback function, if any, is called with the results.
183              
184              
185             =back
186              
187              
188             =head1 CLASS METHODS
189              
190             =head2 $queue = Async::Queue->new(%attributes);
191              
192             Creates an L object.
193              
194             It takes named arguments to initialize attributes of the L object.
195             See L for the list of the attributes.
196              
197             C attribute is mandatory.
198              
199              
200             =head1 ATTRIBUTES
201              
202             An L object has the following attributes.
203              
204             You can initialize the attributes in C method.
205             You can get and set the attributes of an L object via their accessor methods (See L).
206              
207             Note that you cannot set any attribute listed here while there is a task running in the L object.
208             This is because changing the attributes during task execution is very confusing and leads to unpredictable behavior.
209             So if you want to set an attribute, make sure there is no task running (C method can be useful).
210              
211             =head2 worker (CODE($task, $callback, $queue), mandatory)
212              
213             C attribute is a subroutine reference that processes a task. It must not be C.
214              
215             C subroutine reference takes three arguments, C<$task>, C<$callback> and C<$queue>.
216              
217             C<$task> is the task object the C is supposed to process.
218              
219             C<$callback> is a callback subroutine reference that C must call when the task is finished.
220             C<$callback> can take any list of arguments, which will be passed to the C<$finish_callback> given to the C method
221             (See L).
222              
223             C<$queue> is the L object that holds the worker.
224              
225             So the C attribute is something like:
226              
227             my $q = Async::Queue->new(worker => sub {
228             my ($task, $callback, $queue) = @_;
229             my @results = some_processing($task);
230             $callback->(@results);
231             });
232              
233             You can do asynchonous processing by deferring the call to C<$callback>:
234              
235             my $q = Async::Queue->new(worker => sub {
236             my ($task, $callback, $queue) = @_;
237             some_async_processing($task, on_finish => sub {
238             my @results = @_;
239             $callback->(@results);
240             });
241             });
242              
243              
244             =head2 concurrency (INT, optional, default = 1)
245              
246             C attribute is the maximum number of tasks that can be processed at the same time.
247             It must be an integer number.
248              
249             If C is set to 0 or any negative number, the concurrency level becomes infinite,
250             i.e. pushed tasks are immediately processed no matter how many are already running.
251              
252             If C is set to C (or omitted in C method), it will be 1.
253              
254              
255             =head2 saturated (CODE($queue), optional, default = undef)
256              
257             C attribute is a subroutine reference that is called when the number of running tasks hits C.
258             This means further tasks will wait in the queue.
259              
260             C subroutine reference takes one argument (C<$queue>), which is the L object holding it.
261              
262              
263             =head2 empty (CODE($queue), optional, default = undef)
264              
265             C attribute is a subroutine reference that is called when the last task from the queue is given to the worker.
266             This means there is no task waiting in the L object.
267              
268             If the L object is not saturated, C subroutine is called every time a task is pushed.
269             This is because every pushed task goes into the queue first even if the L object can process the task immediately.
270              
271             C subroutine reference takes one argument (C<$queue>), which is the L object holding it.
272              
273             =head2 drain (CODE($queue), optional, default = undef)
274              
275             C attribute is a subroutine reference that is called when the last task in the L object has finished.
276             This means there is no task running or waiting in the L object.
277              
278             C subroutine reference takes one argument (C<$queue>), which is the C object holding it.
279              
280             =head1 OBJECT METHODS
281              
282             =head2 $queue->push($task, [$finish_callback->(@results)] );
283              
284             Pushes a task into the L object.
285             The argument C<$task> is mandatory, while C<$finish_callback> is optional.
286              
287             C<$task> is a task that the worker will process. It will be given as the C<$task> argument to the C subroutine.
288              
289             C<$finish_callback> is a subroutine reference that will be called when the worker finishes processing the task.
290             The arguments for C<$finish_callback> (C<@results>) are the arguments for the C<$callback> subroutine reference in the C subroutine.
291              
292             C method returns the L object.
293              
294             =head2 $running_num = $queue->running();
295              
296             Returns the number of currently running tasks in the L object.
297              
298             =head2 $waiting_num = $queue->waiting();
299              
300             Returns the number of waiting tasks in the L object.
301              
302             =head2 $waiting_num = $queue->length();
303              
304             Alias for C method. It returns the number of waiting tasks in the L object.
305              
306             =head2 $worker = $queue->worker([$new_worker]);
307              
308             Accessor for the C attribute.
309              
310             =head2 $concurrency = $queue->concurrency([$new_concurrency]);
311              
312             Accessor for the C attribute.
313              
314             =head2 $saturated = $queue->saturated([$new_saturated]);
315              
316             Accessor for the C attribute.
317              
318             =head2 $empty = $queue->empty([$new_empty]);
319              
320             Accessor for the C attribute.
321              
322             =head2 $drain = $queue->drain([$new_drain]);
323              
324             Accessor for the C attribute.
325              
326             =head1 EXAMPLE
327              
328             =head2 Concurrent HTTP downloader
329              
330             use strict;
331             use warnings;
332             use AnyEvent;
333             use AnyEvent::HTTP;
334             use Async::Queue;
335            
336             my $q = Async::Queue->new(concurrency => 3, worker => sub {
337             my ($url, $callback) = @_;
338             print STDERR "Start $url\n";
339             http_get $url, sub {
340             my ($data, $headers) = @_;
341             print STDERR "End $url\n";
342             $callback->($data);
343             };
344             });
345            
346             my @urls = (
347             'http://www.debian.org/',
348             'http://www.ubuntu.com/',
349             'http://fedoraproject.org/',
350             'http://www.opensuse.org/',
351             'http://www.centos.org/',
352             'http://www.slackware.com/',
353             'http://www.gentoo.org/',
354             'http://www.archlinux.org/',
355             'http://trisquel.info/',
356             );
357            
358             my %results = ();
359             my $cv = AnyEvent->condvar;
360             foreach my $url (@urls) {
361             $cv->begin();
362             $q->push($url, sub {
363             my ($data) = @_;
364             $results{$url} = $data;
365             $cv->end();
366             });
367             }
368             $cv->recv;
369            
370             foreach my $key (keys %results) {
371             print STDERR "$key: " . length($results{$key}) . "bytes\n";
372             }
373              
374             This example uses L to send HTTP GET requests for multiple URLs simultaneously.
375             While simultaneous requests dramatically improve efficiency, it may overload the client host
376             and/or the network.
377              
378             This is where L comes in handy. With L you can control the concurrency level
379             of the HTTP sessions (in this case, up to three).
380              
381              
382              
383             =head1 SEE ALSO
384              
385             =over
386              
387             =item L
388              
389             The goal of L is the same as that of L: to control concurrency level of asynchronous tasks.
390             The big difference is that L is a queue of subroutines while L is a queue of tasks (data).
391             In L, worker subroutine is registered with the object in advance.
392             In L, it is workers that are pushed to the queue.
393              
394             You can emulate L with L by pushing subroutine references to it as tasks.
395              
396             =back
397              
398              
399             =head1 AUTHOR
400              
401             Toshio Ito, C<< >>
402              
403             =head1 REPOSITORY
404              
405             L
406              
407             =head1 BUGS
408              
409             Please report any bugs or feature requests to C, or through
410             the web interface at L. I will be notified, and then you'll
411             automatically be notified of progress on your bug as I make changes.
412              
413              
414              
415             =head1 SUPPORT
416              
417             You can find documentation for this module with the perldoc command.
418              
419             perldoc Async::Queue
420              
421              
422             You can also look for information at:
423              
424             =over 4
425              
426             =item * RT: CPAN's request tracker (report bugs here)
427              
428             L
429              
430             =item * AnnoCPAN: Annotated CPAN documentation
431              
432             L
433              
434             =item * CPAN Ratings
435              
436             L
437              
438             =item * Search CPAN
439              
440             L
441              
442             =back
443              
444              
445             =head1 LICENSE AND COPYRIGHT
446              
447             Copyright 2012 Toshio Ito.
448              
449             This program is free software; you can redistribute it and/or modify it
450             under the terms of either: the GNU General Public License as published
451             by the Free Software Foundation; or the Artistic License.
452              
453             See http://dev.perl.org/licenses/ for more information.
454              
455              
456             =cut
457              
458             1; # End of Async::Queue