|  line  | 
 stmt  | 
 bran  | 
 cond  | 
 sub  | 
 pod  | 
 time  | 
 code  | 
| 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 package Async::Queue;  | 
| 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
3
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
177506
 | 
 use 5.006;  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
25
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
253
 | 
    | 
| 
4
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
42
 | 
 use strict;  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
12
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
198
 | 
    | 
| 
5
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
46
 | 
 use warnings;  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
17
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
191
 | 
    | 
| 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
7
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
37
 | 
 use Carp;  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
26
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
625
 | 
    | 
| 
8
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
37
 | 
 use Scalar::Util qw(looks_like_number);  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
13
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1680
 | 
    | 
| 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub new {  | 
| 
11
 | 
71
 | 
 
 | 
 
 | 
  
71
  
 | 
  
1
  
 | 
76318
 | 
     my ($class, %options) = @_;  | 
| 
12
 | 
71
 | 
 
 | 
 
 | 
 
 | 
 
 | 
473
 | 
     my $self = bless {  | 
| 
13
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         concurrency => 1,  | 
| 
14
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         worker => undef,  | 
| 
15
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         drain => undef,  | 
| 
16
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         empty => undef,  | 
| 
17
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         saturated => undef,  | 
| 
18
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         task_queue => [],  | 
| 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         running => 0,  | 
| 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }, $class;  | 
| 
21
 | 
71
 | 
 
 | 
 
 | 
 
 | 
 
 | 
425
 | 
     $self->$_($options{$_}) foreach qw(concurrency worker drain empty saturated);  | 
| 
22
 | 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
184
 | 
     return $self;  | 
| 
23
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
25
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _define_hook_accessors {  | 
| 
26
 | 
24
 | 
 
 | 
 
 | 
  
24
  
 | 
 
 | 
66
 | 
     my ($name, %options) = @_;  | 
| 
27
 | 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
32
 | 
     my $class = __PACKAGE__;  | 
| 
28
 | 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
50
 | 
     my $fullname = "${class}::$name";  | 
| 
29
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
 
 | 
177
 | 
     no strict 'refs';  | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
16
 | 
    | 
| 
 
 | 
6
 | 
 
 | 
 
 | 
 
 | 
 
 | 
6271
 | 
    | 
| 
30
 | 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
161
 | 
     *{$fullname} = sub {  | 
| 
31
 | 
2844
 | 
 
 | 
 
 | 
  
2844
  
 | 
 
 | 
29556
 | 
         my ($self, $v) = @_;  | 
| 
32
 | 
2844
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
5968
 | 
         if(@_ > 1) {  | 
| 
33
 | 
303
 | 
  
100
  
 | 
  
 66
  
 | 
 
 | 
 
 | 
926
 | 
             croak "$name must not be undef." if !defined($v) && !$options{allow_undef};  | 
| 
34
 | 
300
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
1449
 | 
             croak "$name must be a coderef" if defined($v) && ref($v) ne 'CODE';  | 
| 
35
 | 
260
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
523
 | 
             croak "You cannot set $name while there is a running task." if $self->running > 0;  | 
| 
36
 | 
248
 | 
 
 | 
 
 | 
 
 | 
 
 | 
754
 | 
             $self->{$name} = $v;  | 
| 
37
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
38
 | 
2789
 | 
 
 | 
 
 | 
 
 | 
 
 | 
9077
 | 
         return $self->{$name};  | 
| 
39
 | 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
98
 | 
     };  | 
| 
40
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
41
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
42
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub running {  | 
| 
43
 | 
7339
 | 
 
 | 
 
 | 
  
7339
  
 | 
  
1
  
 | 
9691
 | 
     my ($self) = @_;  | 
| 
44
 | 
7339
 | 
 
 | 
 
 | 
 
 | 
 
 | 
16261
 | 
     return $self->{running};  | 
| 
45
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
46
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
47
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub concurrency {  | 
| 
48
 | 
11267
 | 
 
 | 
 
 | 
  
11267
  
 | 
  
1
  
 | 
29600
 | 
     my ($self, $conc) = @_;  | 
| 
49
 | 
11267
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
24998
 | 
     if(@_ > 1) {  | 
| 
50
 | 
91
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
231
 | 
         croak "You cannot set concurrency while there is a running task" if $self->running > 0;  | 
| 
51
 | 
88
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
223
 | 
         $conc = 1 if not defined($conc);  | 
| 
52
 | 
88
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
466
 | 
         croak "concurrency must be a number" if !looks_like_number($conc);  | 
| 
53
 | 
86
 | 
 
 | 
 
 | 
 
 | 
 
 | 
159
 | 
         $self->{concurrency} = int($conc);  | 
| 
54
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
55
 | 
11262
 | 
 
 | 
 
 | 
 
 | 
 
 | 
39062
 | 
     return $self->{concurrency};  | 
| 
56
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
57
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
58
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub length {  | 
| 
59
 | 
946
 | 
 
 | 
 
 | 
  
946
  
 | 
  
1
  
 | 
6831
 | 
     my ($self) = @_;  | 
| 
60
 | 
946
 | 
 
 | 
 
 | 
 
 | 
 
 | 
972
 | 
     return int(@{$self->{task_queue}});  | 
| 
 
 | 
946
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3575
 | 
    | 
| 
61
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
62
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
63
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 *waiting = \&length;  | 
| 
64
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
65
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 _define_hook_accessors 'worker';  | 
| 
66
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 _define_hook_accessors $_, allow_undef => 1 foreach qw(drain empty saturated);  | 
| 
67
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
68
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub push {  | 
| 
69
 | 
2155
 | 
 
 | 
 
 | 
  
2155
  
 | 
  
1
  
 | 
41919
 | 
     my ($self, $task, $cb) = @_;  | 
| 
70
 | 
2155
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
4096
 | 
     if(@_ < 2) {  | 
| 
71
 | 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
20
 | 
         croak("You must specify something to push.");  | 
| 
72
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
73
 | 
2154
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
6686
 | 
     if(defined($cb) && ref($cb) ne 'CODE') {  | 
| 
74
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
58
 | 
         croak("callback for a task must be a coderef");  | 
| 
75
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
76
 | 
2149
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2332
 | 
     push(@{$self->{task_queue}}, [$task, $cb]);  | 
| 
 
 | 
2149
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4916
 | 
    | 
| 
77
 | 
2149
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3888
 | 
     $self->_shift_run(1);  | 
| 
78
 | 
2145
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3965
 | 
     return $self;  | 
| 
79
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
80
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
81
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub _shift_run {  | 
| 
82
 | 
4280
 | 
 
 | 
 
 | 
  
4280
  
 | 
 
 | 
4855
 | 
     my ($self, $from_push) = @_;  | 
| 
83
 | 
4280
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
6666
 | 
     return if $self->concurrency > 0 && $self->running >= $self->concurrency;  | 
| 
84
 | 
2238
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2476
 | 
     my $args_ref = shift(@{$self->{task_queue}});  | 
| 
 
 | 
2238
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4231
 | 
    | 
| 
85
 | 
2238
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
4657
 | 
     return if !defined($args_ref);  | 
| 
86
 | 
2144
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2986
 | 
     my ($task, $cb) = @$args_ref;  | 
| 
87
 | 
2144
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2649
 | 
     $self->{running} += 1;  | 
| 
88
 | 
2144
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
3514
 | 
     if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {  | 
| 
 
 | 
 
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
89
 | 
26
 | 
 
 | 
 
 | 
 
 | 
 
 | 
58
 | 
         $self->saturated->($self);  | 
| 
90
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
91
 | 
2143
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
12060
 | 
     if(@{$self->{task_queue}} == 0 && defined($self->empty)) {  | 
| 
 
 | 
2143
 | 
 
 | 
 
 | 
 
 | 
 
 | 
5642
 | 
    | 
| 
92
 | 
114
 | 
 
 | 
 
 | 
 
 | 
 
 | 
311
 | 
         $self->empty->($self);  | 
| 
93
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
94
 | 
2142
 | 
 
 | 
 
 | 
 
 | 
 
 | 
43672
 | 
     my $sync = 1;  | 
| 
95
 | 
2142
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4296
 | 
     my $sync_completed = 0;  | 
| 
96
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $self->worker->($task, sub {  | 
| 
97
 | 
2132
 | 
 
 | 
 
 | 
  
2132
  
 | 
 
 | 
105999
 | 
         my (@worker_results) = @_;  | 
| 
98
 | 
2132
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
5156
 | 
         $cb->(@worker_results) if defined($cb);  | 
| 
99
 | 
2132
 | 
 
 | 
 
 | 
 
 | 
 
 | 
14793
 | 
         $self->{running} -= 1;  | 
| 
100
 | 
2132
 | 
  
100
  
 | 
  
100
  
 | 
 
 | 
 
 | 
2104
 | 
         if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {  | 
| 
 
 | 
2132
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
5810
 | 
    | 
| 
101
 | 
32
 | 
 
 | 
 
 | 
 
 | 
 
 | 
67
 | 
             $self->drain->($self);  | 
| 
102
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
103
 | 
2131
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
15246
 | 
         if($sync) {  | 
| 
104
 | 
2104
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4018
 | 
             $sync_completed = 1;  | 
| 
105
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }else {  | 
| 
106
 | 
27
 | 
 
 | 
 
 | 
 
 | 
 
 | 
59
 | 
             @_ = ($self);  | 
| 
107
 | 
27
 | 
 
 | 
 
 | 
 
 | 
 
 | 
95
 | 
             goto &_shift_run;  | 
| 
108
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
109
 | 
2142
 | 
 
 | 
 
 | 
 
 | 
 
 | 
9285
 | 
     }, $self);  | 
| 
110
 | 
2140
 | 
 
 | 
 
 | 
 
 | 
 
 | 
15369
 | 
     $sync = 0;  | 
| 
111
 | 
2140
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
7329
 | 
     if($sync_completed) {  | 
| 
112
 | 
2104
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3490
 | 
         @_ = ($self);  | 
| 
113
 | 
2104
 | 
 
 | 
 
 | 
 
 | 
 
 | 
5731
 | 
         goto &_shift_run;  | 
| 
114
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
115
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
116
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
117
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
118
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 NAME  | 
| 
119
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
120
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Async::Queue - control concurrency of asynchronous tasks  | 
| 
121
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
122
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 VERSION  | 
| 
123
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
124
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Version 0.021  | 
| 
125
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
126
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =cut  | 
| 
127
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
128
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 our $VERSION = '0.021';  | 
| 
129
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
130
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
131
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 SYNOPSIS  | 
| 
132
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
133
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
134
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use Async::Queue;  | 
| 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
136
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     ## create a queue object with concurrency 2  | 
| 
137
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my $q = Async::Queue->new(  | 
| 
138
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         concurrency => 2, worker => sub {  | 
| 
139
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             my ($task, $callback) = @_;  | 
| 
140
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             print "hello $task->{name}\n";  | 
| 
141
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $callback->();  | 
| 
142
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
143
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     );  | 
| 
144
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
145
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     ## assign a callback  | 
| 
146
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $q->drain(sub {  | 
| 
147
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         print "all items have been processed\n";  | 
| 
148
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
149
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
150
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     ## add some items to the queue  | 
| 
151
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $q->push({name => 'foo'}, sub {  | 
| 
152
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         print "finished processing foo\n";  | 
| 
153
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
154
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $q->push({name => 'bar'}, sub {  | 
| 
155
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         print "finished processing bar\n";  | 
| 
156
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
157
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
158
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
159
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 DESCRIPTION  | 
| 
160
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
161
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L is used to process tasks with the specified concurrency.  | 
| 
162
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The tasks given to L are processed in parallel with its worker routine up to the concurrency level.  | 
| 
163
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 If more tasks arrive at the L object, those tasks will wait for currently running tasks to finish.  | 
| 
164
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 When a task is finished, one of the waiting tasks starts to be processed in first-in-first-out (FIFO) order.  | 
| 
165
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
166
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 In short, L is a Perl port of the C object of async.js (L).  | 
| 
167
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
168
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The basic usage of L is as follows:  | 
| 
169
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
170
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =over  | 
| 
171
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
172
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item 1.  | 
| 
173
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
174
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Create L object with C attribute and optional C attribute.  | 
| 
175
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C is a subroutine reference that processes tasks. C is the concurrency level.  | 
| 
176
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
177
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item 2.  | 
| 
178
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
179
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Push tasks to the L object via C method with optional callback functions.  | 
| 
180
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
181
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The tasks will be processed in FIFO order by the C subroutine.  | 
| 
182
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 When a task is finished, the callback function, if any, is called with the results.  | 
| 
183
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
184
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
185
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =back  | 
| 
186
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
187
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
188
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 CLASS METHODS  | 
| 
189
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
190
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $queue = Async::Queue->new(%attributes);  | 
| 
191
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
192
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Creates an L object.  | 
| 
193
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
194
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 It takes named arguments to initialize attributes of the L object.  | 
| 
195
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 See L for the list of the attributes.  | 
| 
196
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
197
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is mandatory.  | 
| 
198
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
199
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
200
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 ATTRIBUTES  | 
| 
201
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
202
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 An L object has the following attributes.  | 
| 
203
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
204
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can initialize the attributes in C method.  | 
| 
205
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can get and set the attributes of an L object via their accessor methods (See L"OBJECT METHODS">).  | 
| 
206
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
207
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Note that you cannot set any attribute listed here while there is a task running in the L object.  | 
| 
208
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This is because changing the attributes during task execution is very confusing and leads to unpredictable behavior.  | 
| 
209
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 So if you want to set an attribute, make sure there is no task running (C method can be useful).  | 
| 
210
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
211
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 worker (CODE($task, $callback, $queue), mandatory)  | 
| 
212
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
213
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is a subroutine reference that processes a task. It must not be C.  | 
| 
214
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
215
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C subroutine reference takes three arguments, C<$task>, C<$callback> and C<$queue>.  | 
| 
216
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
217
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$task> is the task object the C is supposed to process.  | 
| 
218
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
219
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$callback> is a callback subroutine reference that C must call when the task is finished.  | 
| 
220
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$callback> can take any list of arguments, which will be passed to the C<$finish_callback> given to the C method  | 
| 
221
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 (See L"OBJECT METHODS">).  | 
| 
222
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
223
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$queue> is the L object that holds the worker.  | 
| 
224
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
225
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 So the C attribute is something like:  | 
| 
226
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
227
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my $q = Async::Queue->new(worker => sub {  | 
| 
228
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         my ($task, $callback, $queue) = @_;  | 
| 
229
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         my @results = some_processing($task);  | 
| 
230
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         $callback->(@results);  | 
| 
231
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
232
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
233
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can do asynchonous processing by deferring the call to C<$callback>:  | 
| 
234
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
235
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my $q = Async::Queue->new(worker => sub {  | 
| 
236
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         my ($task, $callback, $queue) = @_;  | 
| 
237
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         some_async_processing($task, on_finish => sub {  | 
| 
238
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             my @results = @_;  | 
| 
239
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $callback->(@results);  | 
| 
240
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         });  | 
| 
241
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
242
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
244
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 concurrency (INT, optional, default = 1)  | 
| 
245
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
246
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is the maximum number of tasks that can be processed at the same time.  | 
| 
247
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 It must be an integer number.  | 
| 
248
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
249
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 If C is set to 0 or any negative number, the concurrency level becomes infinite,  | 
| 
250
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 i.e. pushed tasks are immediately processed no matter how many are already running.  | 
| 
251
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
252
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 If C is set to C (or omitted in C method), it will be 1.  | 
| 
253
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
254
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
255
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 saturated (CODE($queue), optional, default = undef)  | 
| 
256
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
257
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is a subroutine reference that is called when the number of running tasks hits C.  | 
| 
258
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This means further tasks will wait in the queue.  | 
| 
259
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
260
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C subroutine reference takes one argument (C<$queue>), which is the L object holding it.  | 
| 
261
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
262
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
263
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 empty (CODE($queue), optional, default = undef)  | 
| 
264
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
265
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is a subroutine reference that is called when the last task from the queue is given to the worker.  | 
| 
266
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This means there is no task waiting in the L object.  | 
| 
267
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
268
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 If the L object is not saturated, C subroutine is called every time a task is pushed.  | 
| 
269
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This is because every pushed task goes into the queue first even if the L object can process the task immediately.  | 
| 
270
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
271
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C subroutine reference takes one argument (C<$queue>), which is the L object holding it.  | 
| 
272
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
273
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 drain (CODE($queue), optional, default = undef)  | 
| 
274
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
275
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C attribute is a subroutine reference that is called when the last task in the L object has finished.  | 
| 
276
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This means there is no task running or waiting in the L object.  | 
| 
277
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
278
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C subroutine reference takes one argument (C<$queue>), which is the C object holding it.  | 
| 
279
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
280
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 OBJECT METHODS  | 
| 
281
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
282
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $queue->push($task, [$finish_callback->(@results)] );  | 
| 
283
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
284
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Pushes a task into the L object.  | 
| 
285
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The argument C<$task> is mandatory, while C<$finish_callback> is optional.  | 
| 
286
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
287
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$task> is a task that the worker will process. It will be given as the C<$task> argument to the C subroutine.  | 
| 
288
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
289
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C<$finish_callback> is a subroutine reference that will be called when the worker finishes processing the task.  | 
| 
290
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The arguments for C<$finish_callback> (C<@results>) are the arguments for the C<$callback> subroutine reference in the C subroutine.  | 
| 
291
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
292
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 C method returns the L object.  | 
| 
293
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
294
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $running_num = $queue->running();  | 
| 
295
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
296
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Returns the number of currently running tasks in the L object.  | 
| 
297
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
298
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $waiting_num = $queue->waiting();  | 
| 
299
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
300
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Returns the number of waiting tasks in the L object.  | 
| 
301
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
302
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $waiting_num = $queue->length();  | 
| 
303
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
304
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Alias for C method. It returns the number of waiting tasks in the L object.  | 
| 
305
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
306
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $worker = $queue->worker([$new_worker]);  | 
| 
307
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
308
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Accessor for the C attribute.  | 
| 
309
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
310
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $concurrency = $queue->concurrency([$new_concurrency]);  | 
| 
311
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
312
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Accessor for the C attribute.  | 
| 
313
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
314
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $saturated = $queue->saturated([$new_saturated]);  | 
| 
315
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
316
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Accessor for the C attribute.  | 
| 
317
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
318
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $empty = $queue->empty([$new_empty]);  | 
| 
319
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
320
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Accessor for the C attribute.  | 
| 
321
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
322
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 $drain = $queue->drain([$new_drain]);  | 
| 
323
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
324
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Accessor for the C attribute.  | 
| 
325
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
326
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 EXAMPLE  | 
| 
327
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
328
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head2 Concurrent HTTP downloader  | 
| 
329
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
330
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use strict;  | 
| 
331
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use warnings;  | 
| 
332
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use AnyEvent;  | 
| 
333
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use AnyEvent::HTTP;  | 
| 
334
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     use Async::Queue;  | 
| 
335
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
336
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my $q = Async::Queue->new(concurrency => 3, worker => sub {  | 
| 
337
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         my ($url, $callback) = @_;  | 
| 
338
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         print STDERR "Start $url\n";  | 
| 
339
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         http_get $url, sub {  | 
| 
340
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             my ($data, $headers) = @_;  | 
| 
341
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             print STDERR "End $url\n";  | 
| 
342
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $callback->($data);  | 
| 
343
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         };  | 
| 
344
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     });  | 
| 
345
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
346
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my @urls = (  | 
| 
347
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.debian.org/',  | 
| 
348
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.ubuntu.com/',  | 
| 
349
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://fedoraproject.org/',  | 
| 
350
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.opensuse.org/',  | 
| 
351
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.centos.org/',  | 
| 
352
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.slackware.com/',  | 
| 
353
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.gentoo.org/',  | 
| 
354
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://www.archlinux.org/',  | 
| 
355
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         'http://trisquel.info/',  | 
| 
356
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     );  | 
| 
357
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
358
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my %results = ();  | 
| 
359
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     my $cv = AnyEvent->condvar;  | 
| 
360
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     foreach my $url (@urls) {  | 
| 
361
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         $cv->begin();  | 
| 
362
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         $q->push($url, sub {  | 
| 
363
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             my ($data) = @_;  | 
| 
364
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $results{$url} = $data;  | 
| 
365
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $cv->end();  | 
| 
366
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         });  | 
| 
367
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
368
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $cv->recv;  | 
| 
369
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
       | 
| 
370
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     foreach my $key (keys %results) {  | 
| 
371
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         print STDERR "$key: " . length($results{$key}) . "bytes\n";  | 
| 
372
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
373
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
374
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This example uses L to send HTTP GET requests for multiple URLs simultaneously.  | 
| 
375
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 While simultaneous requests dramatically improve efficiency, it may overload the client host  | 
| 
376
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 and/or the network.  | 
| 
377
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
378
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This is where L comes in handy. With L you can control the concurrency level  | 
| 
379
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 of the HTTP sessions (in this case, up to three).  | 
| 
380
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
381
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
382
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
383
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 SEE ALSO  | 
| 
384
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
385
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =over  | 
| 
386
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
387
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item L  | 
| 
388
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
389
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The goal of L is the same as that of L: to control concurrency level of asynchronous tasks.  | 
| 
390
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 The big difference is that L is a queue of subroutines while L is a queue of tasks (data).  | 
| 
391
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 In L, worker subroutine is registered with the object in advance.  | 
| 
392
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 In L, it is workers that are pushed to the queue.  | 
| 
393
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
394
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can emulate L with L by pushing subroutine references to it as tasks.  | 
| 
395
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
396
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =back  | 
| 
397
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
398
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
399
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 AUTHOR  | 
| 
400
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
401
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Toshio Ito, C<<  >>  | 
| 
402
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
403
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 REPOSITORY  | 
| 
404
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
405
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L  | 
| 
406
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
407
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 BUGS  | 
| 
408
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
409
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Please report any bugs or feature requests to C, or through  | 
| 
410
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 the web interface at L.  I will be notified, and then you'll  | 
| 
411
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 automatically be notified of progress on your bug as I make changes.  | 
| 
412
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
413
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
414
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
415
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 SUPPORT  | 
| 
416
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
417
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can find documentation for this module with the perldoc command.  | 
| 
418
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
419
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     perldoc Async::Queue  | 
| 
420
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
421
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
422
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 You can also look for information at:  | 
| 
423
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
424
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =over 4  | 
| 
425
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
426
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item * RT: CPAN's request tracker (report bugs here)  | 
| 
427
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
428
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L  | 
| 
429
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
430
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item * AnnoCPAN: Annotated CPAN documentation  | 
| 
431
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
432
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L  | 
| 
433
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
434
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item * CPAN Ratings  | 
| 
435
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
436
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L  | 
| 
437
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
438
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =item * Search CPAN  | 
| 
439
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
440
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 L  | 
| 
441
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
442
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =back  | 
| 
443
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
444
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
445
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =head1 LICENSE AND COPYRIGHT  | 
| 
446
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
447
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 Copyright 2012 Toshio Ito.  | 
| 
448
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
449
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 This program is free software; you can redistribute it and/or modify it  | 
| 
450
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 under the terms of either: the GNU General Public License as published  | 
| 
451
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 by the Free Software Foundation; or the Artistic License.  | 
| 
452
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
453
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 See http://dev.perl.org/licenses/ for more information.  | 
| 
454
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
455
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
456
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 =cut  | 
| 
457
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
458
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 1; # End of Async::Queue  |