File Coverage

blib/lib/AnyEvent/Curl/Multi.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package AnyEvent::Curl::Multi;
2              
3 1     1   25054 use common::sense;
  1         11  
  1         5  
4 1     1   64 use base 'Object::Event';
  1         2  
  1         1126  
5 1     1   40397 use Carp qw(croak);
  1         8  
  1         65  
6 1     1   5 use AnyEvent;
  1         2  
  1         31  
7 1     1   473 use WWW::Curl 4.14;
  0            
  0            
8             use WWW::Curl::Easy;
9             use WWW::Curl::Multi;
10             use Scalar::Util qw(refaddr);
11             use HTTP::Response;
12              
our $VERSION = '1.1';

# Probe once at load time for CURLOPT_TIMEOUT_MS, which is only available
# when WWW::Curl is linked against libcurl 7.16.2 or later. Calling a
# constant that WWW::Curl::Easy does not provide dies; the eval turns that
# into a false result instead of aborting the module load.
my $MS_TIMEOUT_SUPPORTED = eval { CURLOPT_TIMEOUT_MS(); 1 } ? 1 : 0;
17              
18             =head1 NAME
19              
20             AnyEvent::Curl::Multi - a fast event-driven HTTP client
21              
22             =head1 SYNOPSIS
23              
24             use AnyEvent;
25             use AnyEvent::Curl::Multi;
26            
27             my $client = AnyEvent::Curl::Multi->new;
28             $client->max_concurrency(10);
29              
30             # Method 1: Object::Event pattern
31             #
32             # Schedule callbacks to be fired when a response is received,
33             # or when an error occurs.
34             $client->reg_cb(response => sub {
35             my ($client, $request, $response, $stats) = @_;
36             # $response is an HTTP::Response object
37             });
38             $client->reg_cb(error => sub {
39             my ($client, $request, $errmsg, $stats) = @_;
40             # ...
41             });
42             my $request = HTTP::Request->new(...);
43             $client->request($request);
44              
45             # Method 2: AnyEvent::CondVar pattern
46             # Do not use this pattern in an existing event loop!
47             my $handle = $client->request($request);
48             eval {
49             my ($response, $stats) = $handle->cv->recv;
50             # $response is an HTTP::Response object
51             # ...
52             };
53             if ($@) {
54             my $errmsg = $@;
55             # ...
56             }
57            
58             =head1 DESCRIPTION
59              
60             This module is an AnyEvent user; you must use and run a supported event loop.
61              
62             AnyEvent::Curl::Multi is an asynchronous, event-driven HTTP client. You can
63             use it to make multiple HTTP requests in parallel using a single process. It
64             uses libcurl for fast performance.
65              
66             =head2 Initializing the client
67              
68             my $client = AnyEvent::Curl::Multi->new;
69              
70             You can specify the maximum number of concurrent requests by setting
71             C<max_concurrency>, e.g.:
72              
73             my $client = AnyEvent::Curl::Multi->new(max_concurrency => 10);
74              
75             You can also set the maximum concurrency after the client has been created:
76              
77             $client->max_concurrency(10);
78              
79             A value of 0 means no limit will be imposed.
80              
81             You can also set global default behaviors for requests:
82              
83             =over
84              
85             =item timeout => PERIOD
86              
87             Specifies a timeout for each request. If your WWW::Curl is linked against
88             libcurl 7.16.2 or later, this value can be specified in fractional seconds (ms
89             resolution). Otherwise, the value must be specified in whole seconds.
90              
91             =item proxy => HOST[:PORT]
92              
93             Specifies a proxy host/port, separated by a colon. (The port number is
94             optional.)
95              
96             =item max_redirects => COUNT
97              
98             Specifies the maximum number of HTTP redirects that will be followed. Set to
99             0 to disable following redirects.
100              
101             =item ipresolve => 4 | 6
102              
103             Specifies which kind of IP address to select when resolving host names. This
104             is only useful when using host names that resolve to both IPv4 and IPv6
105             addresses. The allowed values are 4 (IPv4) or 6 (IPv6). The default is to
106             resolve to all addresses.
107              
108             =back
109              
110             =head2 Issuing requests
111              
112             To dispatch HTTP requests to the client, use the request() method. request()
113             takes an HTTP::Request object as the first argument, and a list of
114             attribute-value pairs as the remaining arguments:
115            
116             $handle = $client->request($request, ...);
117              
118             The following attributes are accepted:
119              
120             =over
121              
122             =item timeout => PERIOD
123              
124             Specifies a timeout for the request. If your WWW::Curl is linked against
125             libcurl 7.16.2 or later, this value can be specified in fractional seconds (ms
126             resolution). Otherwise, the value must be specified in whole seconds.
127              
128             =item proxy => HOST[:PORT]
129              
130             Specifies a proxy host/port, separated by a colon. (The port number is optional.)
131              
132             =item max_redirects => COUNT
133              
134             Specifies the maximum number of HTTP redirects that will be followed. Set to
135             0 to disable following redirects.
136              
137             =back
138            
139             The request() method returns an object of class AnyEvent::Curl::Multi::Handle.
140             This object can be used later to cancel the request; see "Canceling requests",
141             below.
142              
143             Calling $handle->cv() will return an AnyEvent condvar that you can use as usual
144             (e.g., recv() or cb()) to retrieve response results, or that will croak if an
145             error occurs. See L<AnyEvent> for details on condvars.
146              
147             =head2 Callbacks
148              
149             Instead of using condvars, you may register interest in the following events
150             using the client's reg_cb() method (see Object::Event for more details on
151             reg_cb()):
152              
153             =over
154              
155             =item response => $cb->($client, $request, $response, $stats);
156              
157             Fired when a response is received. (This doesn't imply that the response is
158             HTTP OK, so you should examine the response to determine whether there was
159             an HTTP error of some sort.)
160              
161             The arguments sent to your callback will be the client object, the original
162             request (untampered with), the response (as an HTTP::Response object), and a
163             hashref containing some interesting statistics.
164              
165             =item error => $cb->($client, $request, $errmsg, $stats);
166              
167             Fired when an error is received.
168              
169             The arguments sent to your callback will be the client object, the original
170             request (untampered with), the error message, and a hashref containing some
171             interesting statistics. (If the error was other than a timeout, the statistics
172             may be invalid.)
173              
174             =back
175            
176             =cut
177              
# Create a client.
#
# The caller's KEY => VALUE pairs are appended after the defaults below, so
# they override them (max_concurrency, max_redirects, timeout, proxy, debug,
# ipresolve). The combined list is handed to the parent (Object::Event)
# constructor.
#
# Croaks when a fractional timeout is requested but the linked libcurl has
# no millisecond-timeout support.
sub new {
    my ($class, @user_args) = @_;

    my $self = $class->SUPER::new(
        multi_h         => WWW::Curl::Multi->new,
        state           => {},       # refaddr(easy handle) => active request
        timer_w         => undef,    # periodic poll timer (see _dequeue)
        io_w            => {},       # fd => AnyEvent I/O watcher
        queue           => [],       # requests waiting for a free slot
        max_concurrency => 0,        # 0 means unlimited
        max_redirects   => 0,
        timeout         => undef,
        proxy           => undef,
        debug           => undef,
        ipresolve       => undef,
        @user_args,
    );

    my $timeout = $self->{timeout};
    if ($timeout && $timeout != int($timeout) && !$MS_TIMEOUT_SUPPORTED) {
        croak "Subsecond timeout resolution is not supported by your " .
            "libcurl version. Upgrade to 7.16.2 or later.";
    }

    return bless $self, $class;
}
205              
# Submit an HTTP::Request for asynchronous execution.
#
# %opts are per-request attributes (timeout, proxy, max_redirects, debug)
# that are passed through to _gen_easy_h. The request is appended to the
# client's queue and started as soon as the concurrency limit allows.
#
# Returns an AnyEvent::Curl::Multi::Handle. Its cv() is sent the response
# and stats, or croaked with the error message. The handle can also be
# passed to cancel().
# Croaks "Unsupported request type" unless $req is an HTTP::Request object.
sub request {
    my $self = shift;
    my ($req, %opts) = @_;

    # Check blessedness first. Calling ->isa on undef or an unblessed
    # reference would die with a confusing Perl error instead of ours, and a
    # bare class-name string would wrongly pass the isa() test.
    croak "Unsupported request type"
        unless Scalar::Util::blessed($req) && $req->isa("HTTP::Request");

    my $easy_h = $self->_gen_easy_h($req, %opts);

    # libcurl writes the body and headers into these scalars. The handle's
    # address doubles as its id. It is stored as CURLOPT_PRIVATE, matching
    # the key _dequeue uses in $self->{state}.
    my $id = refaddr $easy_h;
    my ($response, $header);
    $easy_h->setopt(CURLOPT_WRITEDATA,   \$response);
    $easy_h->setopt(CURLOPT_WRITEHEADER, \$header);
    $easy_h->setopt(CURLOPT_PRIVATE,     $id);

    my $obj = {
        easy_h   => $easy_h,
        req      => $req,
        response => \$response,
        header   => \$header,
        cv       => AE::cv,
    };

    push @{ $self->{queue} }, $obj;

    $self->_dequeue;

    return bless $obj, 'AnyEvent::Curl::Multi::Handle';
}
240              
# Promote queued requests into the active set while there is spare
# capacity (max_concurrency == 0 means unlimited). Each promoted easy handle
# is registered with the multi handle. Afterwards the 0.5 s polling timer
# that calls _perform is (re)started.
sub _dequeue {
    my ($self) = @_;

    my $pending = $self->{queue};
    my $active  = $self->{state};

    while (@$pending) {
        my $limit = $self->{max_concurrency};
        last unless $limit == 0 || scalar(keys %$active) < $limit;

        my $next = shift @$pending;
        $active->{ refaddr($next->{easy_h}) } = $next;
        $self->{multi_h}->add_handle($next->{easy_h});
    }

    # Fires immediately, then every half second, until _perform drops it.
    $self->{timer_w} = AE::timer(0, 0.5, sub { $self->_perform });
}
258              
# Drive all active transfers forward and dispatch the finished ones.
#
# Invoked by the poll timer set up in _dequeue and by the I/O watchers
# created at the end of this sub. For each transfer that info_read reports
# as finished, it:
#   * removes the transfer from the active set,
#   * on error, croaks the handle's condvar and fires 'error' on the request
#     (if it can('event')) and on the client,
#   * on success, builds an HTTP::Response, sends it on the condvar and
#     fires 'response' on the request (if it can('event')) and on the client,
#   * promotes queued work via _dequeue.
# When nothing is left active, the watchers are dropped. Otherwise the I/O
# watchers are rebuilt for the descriptors libcurl currently reports.
sub _perform {
    my $self = shift;

    $self->{multi_h}->perform;

    while (my ($id, $rv) = $self->{multi_h}->info_read) {
        next unless $id;

        # Free the slot *before* running any user code. A callback that
        # submits a new request then sees the correct concurrency count.
        # A callback that dies can no longer leave a stale entry behind that
        # would keep the poll timer running forever.
        my $state = delete $self->{state}->{$id};
        next unless $state;    # defensive: id not tracked by this client

        my $req    = $state->{req};
        my $easy_h = $state->{easy_h};
        my $stats  = {
            total_time          => $easy_h->getinfo(CURLINFO_TOTAL_TIME),
            dns_time            => $easy_h->getinfo(CURLINFO_NAMELOOKUP_TIME),
            connect_time        => $easy_h->getinfo(CURLINFO_CONNECT_TIME),
            start_transfer_time =>
                $easy_h->getinfo(CURLINFO_STARTTRANSFER_TIME),
            download_bytes      => $easy_h->getinfo(CURLINFO_SIZE_DOWNLOAD),
            upload_bytes        => $easy_h->getinfo(CURLINFO_SIZE_UPLOAD),
        };

        if ($rv) {
            # Error
            my $errmsg = $easy_h->errbuf;
            $state->{cv}->croak($errmsg);
            $req->event('error', $errmsg, $stats)
                if $req->can('event');
            $self->event('error', $req, $errmsg, $stats);
        }
        else {
            my $raw_header = ${ $state->{header} };
            my $body       = ${ $state->{response} };
            $raw_header = '' unless defined $raw_header;
            $body       = '' unless defined $body;

            # libcurl appends subsequent response headers to the buffer when
            # following redirects. Only the last header block belongs to
            # the final response.
            my $last_header = (split /\r?\n\r?\n/, $raw_header)[-1];
            $last_header = '' unless defined $last_header;

            my $response =
                HTTP::Response->parse($last_header . "\n\n" . $body);

            # Record the effective (post-redirect) URL on a clone. The
            # request object handed to callbacks stays untouched, as the
            # documentation promises.
            my $final_req = $req->clone;
            $final_req->uri($easy_h->getinfo(CURLINFO_EFFECTIVE_URL));
            $response->request($final_req);

            $state->{cv}->send($response, $stats);
            $req->event('response', $response, $stats)
                if $req->can('event');
            $self->event('response', $req, $response, $stats);
        }

        $self->_dequeue;
    }

    # Recount here: a user-provided callback may have added new requests.
    if (!scalar keys %{ $self->{state} }) {
        # Nothing left to do - no point keeping the watchers around anymore.
        delete $self->{timer_w};
        delete $self->{io_w};
        return;
    }

    # Replace every I/O watcher with a fresh set matching libcurl's current
    # descriptors. A descriptor that appears in both sets gets only the
    # write watcher, because of the ||= below.
    %{ $self->{io_w} } = ();

    my ($readfds, $writefds) = $self->{multi_h}->fdset;

    foreach my $fd (@$writefds) {
        $self->{io_w}->{$fd} ||= AE::io($fd, 1, sub { $self->_perform });
    }
    foreach my $fd (@$readfds) {
        $self->{io_w}->{$fd} ||= AE::io($fd, 0, sub { $self->_perform });
    }
}
330              
# Build a WWW::Curl::Easy handle for an HTTP::Request.
#
# Per-request %opts (timeout, proxy, max_redirects) take precedence over
# the client-wide defaults in $self whenever they are defined. Verbose
# output is enabled if either the client or the request sets debug.
# ipresolve is taken from the client only.
#
# Croaks if ipresolve is neither 4 nor 6. Also croaks if a fractional
# timeout is requested and the linked libcurl lacks CURLOPT_TIMEOUT_MS.
sub _gen_easy_h {
    my $self = shift;
    my $req  = shift;
    my %opts = @_;

    my $easy_h = WWW::Curl::Easy->new;
    $easy_h->setopt(CURLOPT_URL, $req->uri);

    # Deliberately disabled; see NOTES in the POD.
    $easy_h->setopt(CURLOPT_SSL_VERIFYPEER, 0);
    $easy_h->setopt(CURLOPT_DNS_CACHE_TIMEOUT, 0);

    if (defined $self->{ipresolve}) {
        my $ip_version = int($self->{ipresolve});
        if ($ip_version == 4) {
            $easy_h->setopt(CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
        }
        elsif ($ip_version == 6) {
            $easy_h->setopt(CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V6);
        }
        else {
            croak "Invalid ipresolve setting '$self->{ipresolve}' (must be 4 or 6)";
        }
    }

    my $method = $req->method;
    $easy_h->setopt(CURLOPT_CUSTOMREQUEST, $method);
    # A CUSTOMREQUEST of "HEAD" alone makes libcurl wait for a response body
    # that never arrives. CURLOPT_NOBODY is the supported way to send HEAD.
    $easy_h->setopt(CURLOPT_NOBODY, 1)
        if defined $method && $method eq 'HEAD';

    $easy_h->setopt(CURLOPT_HTTPHEADER,
        [ split "\n", $req->headers->as_string ]);

    my $content = $req->content;
    if (length $content) {
        $easy_h->setopt(CURLOPT_POSTFIELDS, $content);
        $easy_h->setopt(CURLOPT_POSTFIELDSIZE, length $content);
    }

    # Accept gzip or deflate-compressed responses
    $easy_h->setopt(CURLOPT_ENCODING, "");

    $easy_h->setopt(CURLOPT_VERBOSE, 1) if $self->{debug} || $opts{debug};

    my $proxy = defined $opts{proxy} ? $opts{proxy} : $self->{proxy};
    $easy_h->setopt(CURLOPT_PROXY, $proxy) if $proxy;

    my $timeout = defined $opts{timeout} ? $opts{timeout} : $self->{timeout};

    if ($timeout) {
        if ($timeout == int($timeout)) {
            $easy_h->setopt(CURLOPT_TIMEOUT, $timeout);
        }
        else {
            croak "Subsecond timeout resolution is not supported by your " .
                "libcurl version. Upgrade to 7.16.2 or later."
                unless $MS_TIMEOUT_SUPPORTED;
            # Round to whole milliseconds. Never round a positive timeout
            # down to 0, which libcurl treats as "never time out".
            my $ms = int($timeout * 1000 + 0.5);
            $ms = 1 if $ms < 1 && $timeout > 0;
            $easy_h->setopt(CURLOPT_TIMEOUT_MS(), $ms);
        }
    }

    my $max_redirects = defined $opts{max_redirects} ? $opts{max_redirects}
                                                     : $self->{max_redirects};

    if ($max_redirects > 0) {
        $easy_h->setopt(CURLOPT_FOLLOWLOCATION, 1);
        $easy_h->setopt(CURLOPT_MAXREDIRS, $max_redirects);
    }

    return $easy_h;
}
391              
392             =head2 Canceling requests
393              
394             To cancel a request, use the cancel() method:
395              
396             my $handle = $client->request(...);
397              
398             # Later...
399             $client->cancel($handle);
400              
401             =cut
402              
# Cancel a request previously returned by request().
#
# If the request is still waiting in the queue, it is removed so it will
# never be started. If it is active, it is detached from the multi handle
# and its concurrency slot is freed. In either case, queued work is then
# promoted via _dequeue.
# This method does not send or croak the handle's condvar and fires no
# event. Croaks "Missing object" if no handle is given.
sub cancel {
    my $self = shift;
    my $obj  = shift;

    croak "Missing object" unless $obj;

    my $id = refaddr($obj->{easy_h});

    # A queued request has not been handed to libcurl yet. Drop it so that
    # _dequeue cannot start it later.
    @{ $self->{queue} } =
        grep { refaddr($_->{easy_h}) != $id } @{ $self->{queue} };

    # An active request: free its slot and detach it from the multi handle.
    if (delete $self->{state}->{$id}) {
        $self->{multi_h}->remove_handle($obj->{easy_h});
    }

    $self->_dequeue;
}
414              
# Get or set the maximum number of simultaneous transfers (0 = no limit).
# Calling with no argument, or with undef, leaves the current value as is.
# Returns the current (possibly just updated) limit.
sub max_concurrency {
    my ($self, $conc) = @_;
    $self->{max_concurrency} = $conc if defined $conc;
    return $self->{max_concurrency};
}
422              
package AnyEvent::Curl::Multi::Handle;

# Return the AnyEvent condition variable that AnyEvent::Curl::Multi's
# request() created for this request. It is sent ($response, $stats) on
# success or croaked with the error message on failure.
sub cv {
    my ($handle) = @_;
    return $handle->{cv};
}
426              
427             1;
428              
429             =head1 NOTES
430              
431             B<A recent version of libcurl is strongly recommended.> There are some bugs in older
432             versions pertaining to host resolution and accurate timeouts. In addition,
433             subsecond timeouts were not available prior to version 7.16.2.
434              
435             B<libcurl should be built with c-ares support.> Otherwise, the DNS
436             resolution phase that occurs at the beginning of each request will block your
437             program, which could significantly compromise its concurrency. (You can verify
438             whether your libcurl has been built with c-ares support by running C<curl --version>
439             and looking for "AsynchDNS" in the features list.)
440              
441             libcurl's internal hostname resolution cache is disabled by this module (among
442             other problems, it does not honor DNS TTL values). If you need fast hostname
443             resolution, consider installing and configuring a local DNS cache such as BIND
444             or dnscache (part of djbdns).
445              
446             SSL peer verification is disabled. If you consider this a serious problem,
447             please contact the author.
448              
449             =head1 SEE ALSO
450              
451             L<AnyEvent>, L<Object::Event>, L<WWW::Curl>, L<HTTP::Request>,
452             L<HTTP::Response>
453              
454             =head1 AUTHORS AND CONTRIBUTORS
455              
456             Michael S. Fischer released the original version
457             and is the current maintainer.
458              
459             =head1 COPYRIGHT AND LICENSE
460              
461             (C) 2010-2011 Michael S. Fischer.
462             (C) 2010-2011 Yahoo! Inc.
463              
464             This program is free software; you can redistribute it and/or modify it
465             under the terms of either: the GNU General Public License as published
466             by the Free Software Foundation; or the Artistic License.
467              
468             See http://dev.perl.org/licenses/ for more information.
469              
470             =cut
471              
472             __END__