line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Async::Queue; |
2
|
|
|
|
|
|
|
|
3
|
6
|
|
|
6
|
|
177506
|
use 5.006; |
|
6
|
|
|
|
|
25
|
|
|
6
|
|
|
|
|
253
|
|
4
|
6
|
|
|
6
|
|
42
|
use strict; |
|
6
|
|
|
|
|
12
|
|
|
6
|
|
|
|
|
198
|
|
5
|
6
|
|
|
6
|
|
46
|
use warnings; |
|
6
|
|
|
|
|
17
|
|
|
6
|
|
|
|
|
191
|
|
6
|
|
|
|
|
|
|
|
7
|
6
|
|
|
6
|
|
37
|
use Carp; |
|
6
|
|
|
|
|
26
|
|
|
6
|
|
|
|
|
625
|
|
8
|
6
|
|
|
6
|
|
37
|
use Scalar::Util qw(looks_like_number); |
|
6
|
|
|
|
|
13
|
|
|
6
|
|
|
|
|
1680
|
|
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
sub new { |
11
|
71
|
|
|
71
|
1
|
76318
|
my ($class, %options) = @_; |
12
|
71
|
|
|
|
|
473
|
my $self = bless { |
13
|
|
|
|
|
|
|
concurrency => 1, |
14
|
|
|
|
|
|
|
worker => undef, |
15
|
|
|
|
|
|
|
drain => undef, |
16
|
|
|
|
|
|
|
empty => undef, |
17
|
|
|
|
|
|
|
saturated => undef, |
18
|
|
|
|
|
|
|
task_queue => [], |
19
|
|
|
|
|
|
|
running => 0, |
20
|
|
|
|
|
|
|
}, $class; |
21
|
71
|
|
|
|
|
425
|
$self->$_($options{$_}) foreach qw(concurrency worker drain empty saturated); |
22
|
48
|
|
|
|
|
184
|
return $self; |
23
|
|
|
|
|
|
|
} |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
sub _define_hook_accessors { |
26
|
24
|
|
|
24
|
|
66
|
my ($name, %options) = @_; |
27
|
24
|
|
|
|
|
32
|
my $class = __PACKAGE__; |
28
|
24
|
|
|
|
|
50
|
my $fullname = "${class}::$name"; |
29
|
6
|
|
|
6
|
|
177
|
no strict 'refs'; |
|
6
|
|
|
|
|
16
|
|
|
6
|
|
|
|
|
6271
|
|
30
|
24
|
|
|
|
|
161
|
*{$fullname} = sub { |
31
|
2844
|
|
|
2844
|
|
29556
|
my ($self, $v) = @_; |
32
|
2844
|
100
|
|
|
|
5968
|
if(@_ > 1) { |
33
|
303
|
100
|
66
|
|
|
926
|
croak "$name must not be undef." if !defined($v) && !$options{allow_undef}; |
34
|
300
|
100
|
100
|
|
|
1449
|
croak "$name must be a coderef" if defined($v) && ref($v) ne 'CODE'; |
35
|
260
|
100
|
|
|
|
523
|
croak "You cannot set $name while there is a running task." if $self->running > 0; |
36
|
248
|
|
|
|
|
754
|
$self->{$name} = $v; |
37
|
|
|
|
|
|
|
} |
38
|
2789
|
|
|
|
|
9077
|
return $self->{$name}; |
39
|
24
|
|
|
|
|
98
|
}; |
40
|
|
|
|
|
|
|
} |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
sub running { |
43
|
7339
|
|
|
7339
|
1
|
9691
|
my ($self) = @_; |
44
|
7339
|
|
|
|
|
16261
|
return $self->{running}; |
45
|
|
|
|
|
|
|
} |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
sub concurrency { |
48
|
11267
|
|
|
11267
|
1
|
29600
|
my ($self, $conc) = @_; |
49
|
11267
|
100
|
|
|
|
24998
|
if(@_ > 1) { |
50
|
91
|
100
|
|
|
|
231
|
croak "You cannot set concurrency while there is a running task" if $self->running > 0; |
51
|
88
|
100
|
|
|
|
223
|
$conc = 1 if not defined($conc); |
52
|
88
|
100
|
|
|
|
466
|
croak "concurrency must be a number" if !looks_like_number($conc); |
53
|
86
|
|
|
|
|
159
|
$self->{concurrency} = int($conc); |
54
|
|
|
|
|
|
|
} |
55
|
11262
|
|
|
|
|
39062
|
return $self->{concurrency}; |
56
|
|
|
|
|
|
|
} |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
sub length { |
59
|
946
|
|
|
946
|
1
|
6831
|
my ($self) = @_; |
60
|
946
|
|
|
|
|
972
|
return int(@{$self->{task_queue}}); |
|
946
|
|
|
|
|
3575
|
|
61
|
|
|
|
|
|
|
} |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
*waiting = \&length; |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
_define_hook_accessors 'worker'; |
66
|
|
|
|
|
|
|
_define_hook_accessors $_, allow_undef => 1 foreach qw(drain empty saturated); |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
sub push { |
69
|
2155
|
|
|
2155
|
1
|
41919
|
my ($self, $task, $cb) = @_; |
70
|
2155
|
100
|
|
|
|
4096
|
if(@_ < 2) { |
71
|
1
|
|
|
|
|
20
|
croak("You must specify something to push."); |
72
|
|
|
|
|
|
|
} |
73
|
2154
|
100
|
100
|
|
|
6686
|
if(defined($cb) && ref($cb) ne 'CODE') { |
74
|
5
|
|
|
|
|
58
|
croak("callback for a task must be a coderef"); |
75
|
|
|
|
|
|
|
} |
76
|
2149
|
|
|
|
|
2332
|
push(@{$self->{task_queue}}, [$task, $cb]); |
|
2149
|
|
|
|
|
4916
|
|
77
|
2149
|
|
|
|
|
3888
|
$self->_shift_run(1); |
78
|
2145
|
|
|
|
|
3965
|
return $self; |
79
|
|
|
|
|
|
|
} |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
sub _shift_run { |
82
|
4280
|
|
|
4280
|
|
4855
|
my ($self, $from_push) = @_; |
83
|
4280
|
100
|
100
|
|
|
6666
|
return if $self->concurrency > 0 && $self->running >= $self->concurrency; |
84
|
2238
|
|
|
|
|
2476
|
my $args_ref = shift(@{$self->{task_queue}}); |
|
2238
|
|
|
|
|
4231
|
|
85
|
2238
|
100
|
|
|
|
4657
|
return if !defined($args_ref); |
86
|
2144
|
|
|
|
|
2986
|
my ($task, $cb) = @$args_ref; |
87
|
2144
|
|
|
|
|
2649
|
$self->{running} += 1; |
88
|
2144
|
100
|
100
|
|
|
3514
|
if($self->running == $self->concurrency && $from_push && defined($self->saturated)) { |
|
|
|
100
|
|
|
|
|
89
|
26
|
|
|
|
|
58
|
$self->saturated->($self); |
90
|
|
|
|
|
|
|
} |
91
|
2143
|
100
|
100
|
|
|
12060
|
if(@{$self->{task_queue}} == 0 && defined($self->empty)) { |
|
2143
|
|
|
|
|
5642
|
|
92
|
114
|
|
|
|
|
311
|
$self->empty->($self); |
93
|
|
|
|
|
|
|
} |
94
|
2142
|
|
|
|
|
43672
|
my $sync = 1; |
95
|
2142
|
|
|
|
|
4296
|
my $sync_completed = 0; |
96
|
|
|
|
|
|
|
$self->worker->($task, sub { |
97
|
2132
|
|
|
2132
|
|
105999
|
my (@worker_results) = @_; |
98
|
2132
|
100
|
|
|
|
5156
|
$cb->(@worker_results) if defined($cb); |
99
|
2132
|
|
|
|
|
14793
|
$self->{running} -= 1; |
100
|
2132
|
100
|
100
|
|
|
2104
|
if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) { |
|
2132
|
|
100
|
|
|
5810
|
|
101
|
32
|
|
|
|
|
67
|
$self->drain->($self); |
102
|
|
|
|
|
|
|
} |
103
|
2131
|
100
|
|
|
|
15246
|
if($sync) { |
104
|
2104
|
|
|
|
|
4018
|
$sync_completed = 1; |
105
|
|
|
|
|
|
|
}else { |
106
|
27
|
|
|
|
|
59
|
@_ = ($self); |
107
|
27
|
|
|
|
|
95
|
goto &_shift_run; |
108
|
|
|
|
|
|
|
} |
109
|
2142
|
|
|
|
|
9285
|
}, $self); |
110
|
2140
|
|
|
|
|
15369
|
$sync = 0; |
111
|
2140
|
100
|
|
|
|
7329
|
if($sync_completed) { |
112
|
2104
|
|
|
|
|
3490
|
@_ = ($self); |
113
|
2104
|
|
|
|
|
5731
|
goto &_shift_run; |
114
|
|
|
|
|
|
|
} |
115
|
|
|
|
|
|
|
} |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
=head1 NAME |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
Async::Queue - control concurrency of asynchronous tasks |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
=head1 VERSION |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
Version 0.021 |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
=cut |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
our $VERSION = '0.021'; |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
=head1 SYNOPSIS |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
use Async::Queue; |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
## create a queue object with concurrency 2 |
137
|
|
|
|
|
|
|
my $q = Async::Queue->new( |
138
|
|
|
|
|
|
|
concurrency => 2, worker => sub { |
139
|
|
|
|
|
|
|
my ($task, $callback) = @_; |
140
|
|
|
|
|
|
|
print "hello $task->{name}\n"; |
141
|
|
|
|
|
|
|
$callback->(); |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
); |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
## assign a callback |
146
|
|
|
|
|
|
|
$q->drain(sub { |
147
|
|
|
|
|
|
|
print "all items have been processed\n"; |
148
|
|
|
|
|
|
|
}); |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
## add some items to the queue |
151
|
|
|
|
|
|
|
$q->push({name => 'foo'}, sub { |
152
|
|
|
|
|
|
|
print "finished processing foo\n"; |
153
|
|
|
|
|
|
|
}); |
154
|
|
|
|
|
|
|
$q->push({name => 'bar'}, sub { |
155
|
|
|
|
|
|
|
print "finished processing bar\n"; |
156
|
|
|
|
|
|
|
}); |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
=head1 DESCRIPTION |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
L<Async::Queue> is used to process tasks with the specified concurrency. |
162
|
|
|
|
|
|
|
The tasks given to L<Async::Queue> are processed in parallel with its worker routine up to the concurrency level. |
163
|
|
|
|
|
|
|
If more tasks arrive at the L<Async::Queue> object, those tasks will wait for currently running tasks to finish. |
164
|
|
|
|
|
|
|
When a task is finished, one of the waiting tasks starts to be processed in first-in-first-out (FIFO) order. |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
In short, L<Async::Queue> is a Perl port of the C<queue> object of async.js (L<https://github.com/caolan/async>). |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
The basic usage of L<Async::Queue> is as follows: |
169
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
=over |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
=item 1. |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
Create L<Async::Queue> object with C<worker> attribute and optional C<concurrency> attribute. |
175
|
|
|
|
|
|
|
C<worker> is a subroutine reference that processes tasks. C<concurrency> is the concurrency level. |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
=item 2. |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
Push tasks to the L<Async::Queue> object via C<push()> method with optional callback functions. |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
The tasks will be processed in FIFO order by the C<worker> subroutine. |
182
|
|
|
|
|
|
|
When a task is finished, the callback function, if any, is called with the results. |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
=back |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=head1 CLASS METHODS |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
=head2 $queue = Async::Queue->new(%attributes); |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
Creates an L<Async::Queue> object. |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
It takes named arguments to initialize attributes of the L<Async::Queue> object. |
195
|
|
|
|
|
|
|
See L</ATTRIBUTES> for the list of the attributes. |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
C<worker> attribute is mandatory. |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
An L<Async::Queue> object has the following attributes. |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
You can initialize the attributes in C<new()> method. |
205
|
|
|
|
|
|
|
You can get and set the attributes of an L<Async::Queue> object via their accessor methods (See L</"OBJECT METHODS">). |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
Note that you cannot set any attribute listed here while there is a task running in the L<Async::Queue> object. |
208
|
|
|
|
|
|
|
This is because changing the attributes during task execution is very confusing and leads to unpredictable behavior. |
209
|
|
|
|
|
|
|
So if you want to set an attribute, make sure there is no task running (C<running()> method can be useful). |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
=head2 worker (CODE($task, $callback, $queue), mandatory) |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
C<worker> attribute is a subroutine reference that processes a task. It must not be C<undef>. |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
C<worker> subroutine reference takes three arguments, C<$task>, C<$callback> and C<$queue>. |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
C<$task> is the task object the C<worker> is supposed to process. |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
C<$callback> is a callback subroutine reference that C<worker> must call when the task is finished. |
220
|
|
|
|
|
|
|
C<$callback> can take any list of arguments, which will be passed to the C<$finish_callback> given to the C<push()> method |
221
|
|
|
|
|
|
|
(See L</"OBJECT METHODS">). |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
C<$queue> is the L<Async::Queue> object that holds the worker. |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
So the C<worker> attribute is something like: |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
my $q = Async::Queue->new(worker => sub { |
228
|
|
|
|
|
|
|
my ($task, $callback, $queue) = @_; |
229
|
|
|
|
|
|
|
my @results = some_processing($task); |
230
|
|
|
|
|
|
|
$callback->(@results); |
231
|
|
|
|
|
|
|
}); |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
You can do asynchronous processing by deferring the call to C<$callback>: |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
my $q = Async::Queue->new(worker => sub { |
236
|
|
|
|
|
|
|
my ($task, $callback, $queue) = @_; |
237
|
|
|
|
|
|
|
some_async_processing($task, on_finish => sub { |
238
|
|
|
|
|
|
|
my @results = @_; |
239
|
|
|
|
|
|
|
$callback->(@results); |
240
|
|
|
|
|
|
|
}); |
241
|
|
|
|
|
|
|
}); |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
=head2 concurrency (INT, optional, default = 1) |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
C<concurrency> attribute is the maximum number of tasks that can be processed at the same time. |
247
|
|
|
|
|
|
|
It must be an integer number. |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
If C<concurrency> is set to 0 or any negative number, the concurrency level becomes infinite, |
250
|
|
|
|
|
|
|
i.e. pushed tasks are immediately processed no matter how many are already running. |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
If C<concurrency> is set to C<undef> (or omitted in C<new()> method), it will be 1. |
253
|
|
|
|
|
|
|
|
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
=head2 saturated (CODE($queue), optional, default = undef) |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
C<saturated> attribute is a subroutine reference that is called when the number of running tasks hits C<concurrency>. |
258
|
|
|
|
|
|
|
This means further tasks will wait in the queue. |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
C<saturated> subroutine reference takes one argument (C<$queue>), which is the L<Async::Queue> object holding it. |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=head2 empty (CODE($queue), optional, default = undef) |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
C<empty> attribute is a subroutine reference that is called when the last task from the queue is given to the worker. |
266
|
|
|
|
|
|
|
This means there is no task waiting in the L<Async::Queue> object. |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
If the L<Async::Queue> object is not saturated, C<empty> subroutine is called every time a task is pushed. |
269
|
|
|
|
|
|
|
This is because every pushed task goes into the queue first even if the L<Async::Queue> object can process the task immediately. |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
C<empty> subroutine reference takes one argument (C<$queue>), which is the L<Async::Queue> object holding it. |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
=head2 drain (CODE($queue), optional, default = undef) |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
C<drain> attribute is a subroutine reference that is called when the last task in the L<Async::Queue> object has finished. |
276
|
|
|
|
|
|
|
This means there is no task running or waiting in the L<Async::Queue> object. |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
C<drain> subroutine reference takes one argument (C<$queue>), which is the C<Async::Queue> object holding it. |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
=head1 OBJECT METHODS |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
=head2 $queue->push($task, [$finish_callback->(@results)] ); |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
Pushes a task into the L<Async::Queue> object. |
285
|
|
|
|
|
|
|
The argument C<$task> is mandatory, while C<$finish_callback> is optional. |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
C<$task> is a task that the worker will process. It will be given as the C<$task> argument to the C<worker> subroutine. |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
C<$finish_callback> is a subroutine reference that will be called when the worker finishes processing the task. |
290
|
|
|
|
|
|
|
The arguments for C<$finish_callback> (C<@results>) are the arguments for the C<$callback> subroutine reference in the C<worker> subroutine. |
291
|
|
|
|
|
|
|
|
292
|
|
|
|
|
|
|
C<push()> method returns the L<Async::Queue> object. |
293
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
=head2 $running_num = $queue->running(); |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
Returns the number of currently running tasks in the L<Async::Queue> object. |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
=head2 $waiting_num = $queue->waiting(); |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
Returns the number of waiting tasks in the L<Async::Queue> object. |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
=head2 $waiting_num = $queue->length(); |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
Alias for C<waiting()> method. It returns the number of waiting tasks in the L<Async::Queue> object. |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
=head2 $worker = $queue->worker([$new_worker]); |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
Accessor for the C<worker> attribute. |
309
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
=head2 $concurrency = $queue->concurrency([$new_concurrency]); |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
Accessor for the C<concurrency> attribute. |
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
=head2 $saturated = $queue->saturated([$new_saturated]); |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
Accessor for the C<saturated> attribute. |
317
|
|
|
|
|
|
|
|
318
|
|
|
|
|
|
|
=head2 $empty = $queue->empty([$new_empty]); |
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
Accessor for the C<empty> attribute. |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
=head2 $drain = $queue->drain([$new_drain]); |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
Accessor for the C<drain> attribute. |
325
|
|
|
|
|
|
|
|
326
|
|
|
|
|
|
|
=head1 EXAMPLE |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
=head2 Concurrent HTTP downloader |
329
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
use strict; |
331
|
|
|
|
|
|
|
use warnings; |
332
|
|
|
|
|
|
|
use AnyEvent; |
333
|
|
|
|
|
|
|
use AnyEvent::HTTP; |
334
|
|
|
|
|
|
|
use Async::Queue; |
335
|
|
|
|
|
|
|
|
336
|
|
|
|
|
|
|
my $q = Async::Queue->new(concurrency => 3, worker => sub { |
337
|
|
|
|
|
|
|
my ($url, $callback) = @_; |
338
|
|
|
|
|
|
|
print STDERR "Start $url\n"; |
339
|
|
|
|
|
|
|
http_get $url, sub { |
340
|
|
|
|
|
|
|
my ($data, $headers) = @_; |
341
|
|
|
|
|
|
|
print STDERR "End $url\n"; |
342
|
|
|
|
|
|
|
$callback->($data); |
343
|
|
|
|
|
|
|
}; |
344
|
|
|
|
|
|
|
}); |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
my @urls = ( |
347
|
|
|
|
|
|
|
'http://www.debian.org/', |
348
|
|
|
|
|
|
|
'http://www.ubuntu.com/', |
349
|
|
|
|
|
|
|
'http://fedoraproject.org/', |
350
|
|
|
|
|
|
|
'http://www.opensuse.org/', |
351
|
|
|
|
|
|
|
'http://www.centos.org/', |
352
|
|
|
|
|
|
|
'http://www.slackware.com/', |
353
|
|
|
|
|
|
|
'http://www.gentoo.org/', |
354
|
|
|
|
|
|
|
'http://www.archlinux.org/', |
355
|
|
|
|
|
|
|
'http://trisquel.info/', |
356
|
|
|
|
|
|
|
); |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
my %results = (); |
359
|
|
|
|
|
|
|
my $cv = AnyEvent->condvar; |
360
|
|
|
|
|
|
|
foreach my $url (@urls) { |
361
|
|
|
|
|
|
|
$cv->begin(); |
362
|
|
|
|
|
|
|
$q->push($url, sub { |
363
|
|
|
|
|
|
|
my ($data) = @_; |
364
|
|
|
|
|
|
|
$results{$url} = $data; |
365
|
|
|
|
|
|
|
$cv->end(); |
366
|
|
|
|
|
|
|
}); |
367
|
|
|
|
|
|
|
} |
368
|
|
|
|
|
|
|
$cv->recv; |
369
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
foreach my $key (keys %results) { |
371
|
|
|
|
|
|
|
print STDERR "$key: " . length($results{$key}) . "bytes\n"; |
372
|
|
|
|
|
|
|
} |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
This example uses L<AnyEvent::HTTP> to send HTTP GET requests for multiple URLs simultaneously. |
375
|
|
|
|
|
|
|
While simultaneous requests dramatically improve efficiency, it may overload the client host |
376
|
|
|
|
|
|
|
and/or the network. |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
This is where L<Async::Queue> comes in handy. With L<Async::Queue> you can control the concurrency level |
379
|
|
|
|
|
|
|
of the HTTP sessions (in this case, up to three). |
380
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=head1 SEE ALSO |
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
=over |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
=item L |
388
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
The goal of L is the same as that of L: to control concurrency level of asynchronous tasks. |
390
|
|
|
|
|
|
|
The big difference is that L is a queue of subroutines while L is a queue of tasks (data). |
391
|
|
|
|
|
|
|
In L, worker subroutine is registered with the object in advance. |
392
|
|
|
|
|
|
|
In L, it is workers that are pushed to the queue. |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
You can emulate L with L by pushing subroutine references to it as tasks. |
395
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=back |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
|
399
|
|
|
|
|
|
|
=head1 AUTHOR |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
Toshio Ito, C<< >> |
402
|
|
|
|
|
|
|
|
403
|
|
|
|
|
|
|
=head1 REPOSITORY |
404
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
L |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
=head1 BUGS |
408
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
Please report any bugs or feature requests to C, or through |
410
|
|
|
|
|
|
|
the web interface at L. I will be notified, and then you'll |
411
|
|
|
|
|
|
|
automatically be notified of progress on your bug as I make changes. |
412
|
|
|
|
|
|
|
|
413
|
|
|
|
|
|
|
|
414
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
=head1 SUPPORT |
416
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
You can find documentation for this module with the perldoc command. |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
perldoc Async::Queue |
420
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
|
422
|
|
|
|
|
|
|
You can also look for information at: |
423
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
=over 4 |
425
|
|
|
|
|
|
|
|
426
|
|
|
|
|
|
|
=item * RT: CPAN's request tracker (report bugs here) |
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
L |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
=item * AnnoCPAN: Annotated CPAN documentation |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
L |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
=item * CPAN Ratings |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
L |
437
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
=item * Search CPAN |
439
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
L |
441
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
=back |
443
|
|
|
|
|
|
|
|
444
|
|
|
|
|
|
|
|
445
|
|
|
|
|
|
|
=head1 LICENSE AND COPYRIGHT |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
Copyright 2012 Toshio Ito. |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
450
|
|
|
|
|
|
|
under the terms of either: the GNU General Public License as published |
451
|
|
|
|
|
|
|
by the Free Software Foundation; or the Artistic License. |
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
See http://dev.perl.org/licenses/ for more information. |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
|
456
|
|
|
|
|
|
|
=cut |
457
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
1; # End of Async::Queue |