File Coverage

blib/lib/HTTP/Async.pm
Criterion Covered Total %
statement 309 336 91.9
branch 82 116 70.6
condition 48 80 60.0
subroutine 46 53 86.7
pod 27 27 100.0
total 512 612 83.6


line stmt bran cond sub pod time code
1 33     33   944506 use strict;
  33         47  
  33         799  
2 33     33   105 use warnings;
  33         37  
  33         1299  
3              
4             package HTTP::Async;
5              
6             our $VERSION = '0.33';
7              
8 33     33   115 use Carp;
  33         45  
  33         1594  
9 33     33   16175 use Data::Dumper;
  33         151325  
  33         1639  
10 33     33   13274 use HTTP::Response;
  33         224255  
  33         797  
11 33     33   14138 use IO::Select;
  33         36825  
  33         1388  
12 33     33   13271 use Net::HTTP::NB;
  33         987612  
  33         286  
13 33     33   12961 use Net::HTTP;
  33         45  
  33         202  
14 33     33   20158 use URI;
  33         56  
  33         673  
15 33     33   13937 use Time::HiRes qw( time sleep );
  33         28842  
  33         123  
16              
17             =head1 NAME
18              
19             HTTP::Async - process multiple HTTP requests in parallel without blocking.
20              
21             =head1 SYNOPSIS
22              
23             Create an object and add some requests to it:
24              
25             use HTTP::Async;
26             my $async = HTTP::Async->new;
27            
28             # create some requests and add them to the queue.
29             $async->add( HTTP::Request->new( GET => 'http://www.perl.org/' ) );
30             $async->add( HTTP::Request->new( GET => 'http://www.ecclestoad.co.uk/' ) );
31              
32             and then EITHER process the responses as they come back:
33              
34             while ( my $response = $async->wait_for_next_response ) {
35             # Do some processing with $response
36             }
37            
38             OR do something else if there is no response ready:
39            
40             while ( $async->not_empty ) {
41             if ( my $response = $async->next_response ) {
42             # deal with $response
43             } else {
44             # do something else
45             }
46             }
47              
48             OR just use the async object to fetch stuff in the background and deal with
49             the responses at the end.
50              
51             # Do some long code...
52             for ( 1 .. 100 ) {
53             some_function();
54             $async->poke; # lets it check for incoming data.
55             }
56              
57             while ( my $response = $async->wait_for_next_response ) {
58             # Do some processing with $response
59             }
60              
61             =head1 DESCRIPTION
62              
63             Although using the conventional C<LWP::UserAgent> is fast and easy it does
64             have some drawbacks - the code execution blocks until the request has been
65             completed and it is only possible to process one request at a time.
66             C<HTTP::Async> attempts to address these limitations.
67              
68             It gives you an 'Async' object that you can add requests to, and then get the
69             responses off as they finish. The actual sending and receiving of the requests
70             is abstracted. As soon as you add a request it is transmitted; if there are
71             too many requests in progress at the moment, it is queued instead. There is no
72             concept of starting or stopping - it runs continuously.
73              
74             Whilst it is waiting to receive data it returns control to the code that
75             called it, meaning that you can carry out processing whilst fetching data from
76             the network. All without forking or threading - it is actually done using
77             C<select> (via L<IO::Select>).
78              
79             =head1 Default settings:
80              
81             There are a number of default settings that should be suitable for most uses.
82             However in some circumstances you might wish to change these.
83              
84             slots: 20
85             timeout: 180 (seconds)
86             max_request_time: 300 (seconds)
87             max_redirect: 7
88             poll_interval: 0.05 (seconds)
89             proxy_host: ''
90             proxy_port: ''
91             local_addr: ''
92             local_port: ''
93             ssl_options: {}
94             cookie_jar: undef
95             peer_addr: ''
96              
97             If C<cookie_jar> is defined, it is expected to be an object similar to C<HTTP::Cookies>, with C<extract_cookies> and C<add_cookie_header> methods.
98              
99             The option max_redirects has been renamed to max_redirect to be consistent with LWP::UserAgent. The old name max_redirects is still accepted by C<new>, but not by C<add_with_opts>, and there is no max_redirects accessor method.
100              
101            
102             =head1 METHODS
103              
104             =head2 new
105              
106             my $async = HTTP::Async->new( %args );
107              
108             Creates a new HTTP::Async object and sets it up. Variations from the default
109             can be set by passing them in as C<%args>.
110              
111             =cut
112              
113             sub new {
114 30     30 1 378341 my $class = shift;
115 30         812 my $self = bless {
116              
117             opts => {
118             slots => 20,
119             max_redirect => 7,
120             timeout => 180,
121             max_request_time => 300,
122             poll_interval => 0.05,
123             cookie_jar => undef,
124             },
125              
126             id_opts => {},
127              
128             to_send => [],
129             in_progress => {},
130             to_return => [],
131              
132             current_id => 0,
133             fileno_to_id => {},
134             }, $class;
135              
136 30         185 $self->_init(@_);
137              
138 29         68 return $self;
139             }
140              
141             sub _init {
142 36     36   74 my $self = shift;
143 36         93 my %args = @_;
144 36         177 $self->_set_opt( $_ => $args{$_} ) for sort keys %args;
145 35         70 return $self;
146             }
147              
148 39     39   355 sub _next_id { return ++$_[0]->{current_id} }
149              
150             =head2 slots, timeout, max_request_time, poll_interval, max_redirect, proxy_host, proxy_port, local_addr, local_port, ssl_options, cookie_jar, peer_addr
151              
152             $old_value = $async->slots;
153             $new_value = $async->slots( $new_value );
154              
155             Getters/setters for the C<$async> object's config settings. Timeout is for
156             inactivity and is in seconds.
157              
158             Slots is the maximum number of parallel requests to make.
159              
160             =cut
161              
162             my %GET_SET_KEYS = map { $_ => 1 } qw( slots poll_interval
163             timeout max_request_time max_redirect
164             proxy_host proxy_port local_addr local_port ssl_options cookie_jar peer_addr);
165              
166             sub _add_get_set_key {
167 6     6   5 my $class = shift;
168 6         6 my $key = shift;
169 6         12 $GET_SET_KEYS{$key} = 1;
170             }
171              
172             my %KEY_ALIASES = ( max_redirects => 'max_redirect' );
173              
174             sub _get_opt {
175 1754     1754   1898 my $self = shift;
176 1754         2010 my $key = shift;
177 1754         1627 my $id = shift;
178              
179 1754 50       3366 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
180              
181 1754 50       3215 die "$key not valid for _get_opt" unless $GET_SET_KEYS{$key};
182              
183             # If there is an option set for this id then use that, otherwise fall back
184             # to the defaults.
185             return $self->{id_opts}{$id}{$key}
186 1754 100 100     5175 if $id && defined $self->{id_opts}{$id}{$key};
187              
188 1747         34838237 return $self->{opts}{$key};
189              
190             }
191              
192             sub _set_opt {
193 24     24   80 my $self = shift;
194 24         38 my $key = shift;
195              
196 24 100       75 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
197              
198 24 100       69 die "$key not valid for _set_opt" unless $GET_SET_KEYS{$key};
199 23 50       108 $self->{opts}{$key} = shift if @_;
200 23         73 return $self->{opts}{$key};
201             }
202              
203             foreach my $key ( keys %GET_SET_KEYS ) {
204 1 50   1 1 107 eval "
  1 0   0 1 16  
  0 0   0 1 0  
  0 100   18 1 0  
  0 100   25 1 0  
  0 0   0 1 0  
  18 100   657 1 4835  
  18 0   0 1 82  
  25 0   0 1 39  
  25 50   39 1 102  
  0 0   0 1 0  
  0 100   4 1 0  
  657         1088  
  657         3262  
  0         0  
  0         0  
  0         0  
  0         0  
  39         68  
  39         226  
  0         0  
  0         0  
  4         3126  
  4         34  
205             sub $key {
206             my \$self = shift;
207             return scalar \@_
208             ? \$self->_set_opt( '$key', \@_ )
209             : \$self->_get_opt( '$key' );
210             }
211             ";
212             }
213              
214             =head2 add
215              
216             my @ids = $async->add(@requests);
217             my $first_id = $async->add(@requests);
218              
219             Adds requests to the queues. Each request is given an unique integer id (for
220             this C<$async>) that can be used to track the requests if needed. If called in
221             list context an array of ids is returned, in scalar context the id of the
222             first request added is returned.
223              
224             =cut
225              
226             sub add {
227 29     29 1 235404 my $self = shift;
228 29         68 my @returns = ();
229              
230 29         159 foreach my $req (@_) {
231 36         155 push @returns, $self->add_with_opts( $req, {} );
232             }
233              
234 29 50       228 return wantarray ? @returns : $returns[0];
235             }
236              
237             =head2 add_with_opts
238              
239             my $id = $async->add_with_opts( $request, \%opts );
240              
241             This method lets you add a single request to the queue with options that
242             differ from the defaults. For example you might wish to set a longer timeout
243             or to use a specific proxy. Returns the id of the request.
244              
245             The method croaks when passed an invalid option.
246              
247             =cut
248              
249             sub add_with_opts {
250 33     33 1 8582 my $self = shift;
251 33         44 my $req = shift;
252 33         41 my $opts = shift;
253              
254 33         43 for my $key (keys %{$opts}) {
  33         136  
255 7 100       158 croak "$key not valid for add_with_opts" unless $GET_SET_KEYS{$key};
256             }
257              
258 32         109 my $id = $self->_next_id;
259              
260 32         42 push @{ $$self{to_send} }, [ $req, $id ];
  32         98  
261 32         199 $self->{id_opts}{$id} = $opts;
262 32         75 $self->poke;
263              
264 32         96 return $id;
265             }
266              
267             =head2 poke
268              
269             $async->poke;
270              
271             At fairly frequent intervals some housekeeping needs to be performed - such as
272             reading received data and starting new requests. Calling C<poke> lets the
273             object do this and then return quickly. Usually you will not need to use this
274             as most other methods do it for you.
275              
276             You should use C<poke> if your code is spending time elsewhere (i.e. not using
277             the async object) to allow it to keep the data flowing over the network. If it
278             is not used then the buffers may fill up and completed responses will not be
279             replaced with new requests.
280              
281             =cut
282              
283             sub poke {
284 8307     8307 1 10511 my $self = shift;
285              
286 8307         10033 $self->_process_in_progress;
287 8307         10060 $self->_process_to_send;
288              
289 8307         7263 return 1;
290             }
291              
292             =head2 next_response
293              
294             my $response = $async->next_response;
295             my ( $response, $id ) = $async->next_response;
296              
297             Returns the next response (as a L object) that is waiting, or
298             returns undef if there is none. In list context it returns a (response, id)
299             pair, or an empty list if none. Does not wait for a response so returns very
300             quickly.
301              
302             =cut
303              
304             sub next_response {
305 11     11 1 16 my $self = shift;
306 11         88 return $self->_next_response(0);
307             }
308              
309             =head2 wait_for_next_response
310              
311             my $response = $async->wait_for_next_response( 3.5 );
312             my ( $response, $id ) = $async->wait_for_next_response( 3.5 );
313              
314             As C but only returns if there is a next response or the time
315             in seconds passed in has elapsed. If no time is given then it blocks. Whilst
316             waiting it checks the queues every c seconds. The times can be
317             fractional seconds.
318              
319             =cut
320              
321             sub wait_for_next_response {
322 31     31 1 6242 my $self = shift;
323 31         41 my $wait_for = shift;
324              
325 31 100       705 $wait_for = $self->max_request_time
326             if !defined $wait_for;
327              
328 31         96 return $self->_next_response($wait_for);
329             }
330              
331             sub _next_response {
332 42     42   51 my $self = shift;
333 42   100     133 my $wait_for = shift || 0;
334 42         138 my $end_time = time + $wait_for;
335 42         51 my $resp_and_id = undef;
336              
337 42         98 while ( !$self->empty ) {
338 694         808 $resp_and_id = shift @{ $$self{to_return} };
  694         1282  
339              
340             # last if we have a response or we have run out of time.
341             last
342 694 100 100     3849 if $resp_and_id
343             || time > $end_time;
344              
345             # sleep for the default sleep time.
346             # warn "sleeping for " . $self->poll_interval;
347 654         30802 sleep $self->poll_interval;
348             }
349              
350             # If there is no result return false.
351 42 100       158 return unless $resp_and_id;
352              
353             # We have a response - delete the options for it from the store.
354 33         94 delete $self->{id_opts}{ $resp_and_id->[1] };
355              
356             # If we have a result return list or response depending on
357             # context.
358             return wantarray
359 33 50       197 ? @$resp_and_id
360             : $resp_and_id->[0];
361             }
362              
363             =head2 to_send_count
364              
365             my $pending = $async->to_send_count;
366              
367             Returns the number of items which have been added but have not yet started being processed.
368              
369             =cut
370              
371             sub to_send_count {
372 542     542 1 753 my $self = shift;
373 542         1502 $self->poke;
374 542         502 return scalar @{ $self->{to_send} };
  542         2118  
375             }
376              
377             =head2 to_return_count
378              
379             my $completed = $async->to_return_count;
380              
381             Returns the number of items which have completed transferring, and are waiting to be returned by next_response().
382              
383             =cut
384              
385             sub to_return_count {
386 3767     3767 1 2788 my $self = shift;
387 3767         3732 $self->poke;
388 3767         2575 return scalar @{ $self->{to_return} };
  3767         4490  
389             }
390              
391             =head2 in_progress_count
392              
393             my $running = $async->in_progress_count;
394              
395             Returns the number of items which are currently being processed asynchronously.
396              
397             =cut
398              
399             sub in_progress_count {
400 744     744 1 798 my $self = shift;
401 744         1261 $self->poke;
402 744         692 return scalar keys %{ $self->{in_progress} };
  744         2495  
403             }
404              
405             =head2 total_count
406              
407             my $total = $async->total_count;
408              
409             Returns the sum of the to_send_count, in_progress_count and to_return_count.
410              
411             This should be the total number of items which have been added that have not yet been returned by next_response().
412              
413             =cut
414              
415             sub total_count {
416 741     741 1 2229 my $self = shift;
417              
418 741         2703 my $count = 0 #
419             + $self->to_send_count #
420             + $self->in_progress_count #
421             + $self->to_return_count;
422              
423 741         2882 return $count;
424             }
425              
426             =head2 info
427              
428             print $async->info;
429              
430             Returns a one-line string describing the current state: the number of requests waiting to be sent, in progress, and waiting to be returned.
431              
432             =cut
433              
434             sub info {
435 0     0 1 0 my $self = shift;
436              
437 0         0 return sprintf(
438             "HTTP::Async status: %4u,%4u,%4u (send, progress, return)\n",
439             $self->to_send_count, #
440             $self->in_progress_count, #
441             $self->to_return_count
442             );
443             }
444              
445             =head2 remove
446              
447             $async->remove($id);
448             my $success = $async->remove($id);
449              
450             Removes the item with the given id no matter which state it is currently in. Returns true if an item is removed, and false otherwise.
451              
452             =cut
453              
454             sub remove {
455 8     8 1 1093 my $self = shift;
456 8         8 my $id = shift;
457              
458 8         13 my $hashref = delete $self->{in_progress}{$id};
459 8 100       21 if (!$hashref) {
460 5         20 for my $list ('to_send', 'to_return') {
461 10         8 my ($r_and_id) = grep { $_->[1] eq $id } @{ $self->{$list} };
  3         11  
  10         24  
462 10         7 $hashref = $r_and_id->[0];
463 10 100       24 if ($hashref) {
464 3         9 @{ $self->{$list} }
465 3         3 = grep { $_->[1] ne $id } @{ $self->{$list} };
  3         6  
  3         5  
466             }
467             }
468             }
469 8 100       26 return if !$hashref;
470              
471 6         8 my $s = $hashref->{handle};
472 6         11 $self->_io_select->remove($s);
473 6         137 delete $self->{id_opts}{$id};
474              
475 6         183 return 1;
476             }
477              
478             =head2 remove_all
479              
480             $async->remove_all;
481             my $success = $async->remove_all;
482              
483             Removes all items no matter what states they are currently in. Returns true if any items are removed, and false otherwise.
484              
485             =cut
486              
487             sub remove_all {
488 2     2 1 2 my $self = shift;
489 2 100       5 return if $self->empty;
490              
491             my @ids = (
492 0         0 (map { $_->[1] } @{ $self->{to_send} }),
  1         6  
493 1         3 (keys %{ $self->{in_progress} }),
494 1         4 (map { $_->[1] } @{ $self->{to_return} }),
  1         26  
  1         2  
495             );
496              
497 1         7 for my $id (@ids) {
498 2         5 $self->remove($id);
499             }
500              
501 1         5 return 1;
502             }
503              
504             =head2 empty, not_empty
505              
506             while ( $async->not_empty ) { ...; }
507             while (1) { ...; last if $async->empty; }
508              
509             Returns true or false depending on whether there are requests or responses
510             still on the object.
511              
512             =cut
513              
514             sub empty {
515 701     701 1 2324 my $self = shift;
516 701 100       2543 return $self->total_count ? 0 : 1;
517             }
518              
519             sub not_empty {
520 3     3 1 2751 my $self = shift;
521 3         8 return !$self->empty;
522             }
523              
524             =head2 DESTROY
525              
526             The destroy method warns (via C<carp>) if an object is destroyed but is not
527             empty. This is to help with debugging.
528              
529             =cut
530              
531             sub DESTROY {
532 30     30   9364814 my $self = shift;
533 30         55 my $class = ref $self;
534              
535 30 50       95 carp "$class object destroyed but still in use"
536             if $self->total_count;
537              
538             carp "$class INTERNAL ERROR: 'id_opts' not empty"
539 30 50       37 if scalar keys %{ $self->{id_opts} };
  30         125  
540              
541 30         989 return;
542             }
543              
544             # Go through all the values on the select list and check to see if
545             # they have been fully received yet.
546              
547             sub _process_in_progress {
548 8307     8307   5696 my $self = shift;
549 8307         8319 my %seen_ids = ();
550              
551             HANDLE:
552 8307         9448 foreach my $s ( $self->_io_select->can_read(0) ) {
553              
554             # Get the id and add it to the hash of seen ids so we don't check it
555             # later for errors.
556 232   50     6274 my $id = $self->{fileno_to_id}{ $s->fileno }
557             || die "INTERNAL ERROR: could not got id for fileno";
558 232         1778 $seen_ids{$id}++;
559              
560 232         334 my $hashref = $$self{in_progress}{$id};
561 232   100     656 my $tmp = $hashref->{tmp} ||= {};
562              
563             # warn Dumper $hashref;
564              
565             # Check that we have not timed-out.
566 232 100 66     1479 if ( time > $hashref->{timeout_at}
567             || time > $hashref->{finish_by} )
568             {
569              
570             # warn sprintf "Timeout: %.3f > %.3f", #
571             # time, $hashref->{timeout_at};
572              
573             $self->_add_error_response_to_return(
574             id => $id,
575             code => 504,
576             request => $hashref->{request},
577             previous => $hashref->{previous},
578 1         11 content => 'Timed out',
579             );
580              
581 1         6 $self->_io_select->remove($s);
582 1         50 delete $$self{fileno_to_id}{ $s->fileno };
583 1         16 next HANDLE;
584             }
585              
586             # If there is a code then read the body.
587 231 100       484 if ( $$tmp{code} ) {
588 169         292 my $buf;
589 169         499 my $n = $s->read_entity_body( $buf, 1024 * 16 ); # 16kB
590 169 100       9067 $$tmp{is_complete} = 1 unless $n;
591 169         379 $$tmp{content} .= $buf;
592              
593             # warn "Received " . length( $buf ) ;
594              
595             # warn $buf;
596             }
597              
598             # If no code try to read the headers.
599             else {
600 62         388 $s->flush;
601              
602 62         81 my ( $code, $message, %headers );
603              
604 62         101 eval {
605 62         282 ( $code, $message, %headers ) =
606             $s->read_response_headers( laxed => 1, junk_out => [] );
607             };
608              
609 62 100       13486 if ($@) {
610             $self->_add_error_response_to_return(
611             'code' => 504,
612             'content' => $@,
613             'id' => $id,
614             'request' => $hashref->{request},
615             'previous' => $hashref->{previous}
616 1         8 );
617 1         3 $self->_io_select->remove($s);
618 1         36 delete $$self{fileno_to_id}{ $s->fileno };
619 1         8 next HANDLE;
620             }
621              
622 61 50       159 if ($code) {
623              
624             # warn "Got headers: $code $message " . time;
625              
626 61         155 $$tmp{code} = $code;
627 61         131 $$tmp{message} = $message;
628 61         163 my @headers_array = map { $_, $headers{$_} } keys %headers;
  233         361  
629 61         215 $$tmp{headers} = \@headers_array;
630              
631             }
632             }
633              
634             # Reset the timeout.
635 230         683 $hashref->{timeout_at} = time + $self->_get_opt( 'timeout', $id );
636             # warn "recieved - timeout set to '$hashref->{timeout_at}'";
637              
638             # If the message is complete then create a request and add it
639             # to 'to_return';
640 230 100       561 if ( $$tmp{is_complete} ) {
641 59         138 delete $$self{fileno_to_id}{ $s->fileno };
642 59         307 $self->_io_select->remove($s);
643              
644             # warn Dumper $$hashref{content};
645              
646             my $response = HTTP::Response->new(
647 59         2359 @$tmp{ 'code', 'message', 'headers', 'content' } );
648              
649 59         8564 $response->request( $hashref->{request} );
650 59 100       435 $response->previous( $hashref->{previous} ) if $hashref->{previous};
651              
652             # Deal with cookies
653 59         231 my $jar = $self->_get_opt('cookie_jar', $id);
654 59 100       126 if ($jar) {
655 1         14 $jar->extract_cookies($response);
656             }
657              
658             # If it was a redirect and there are still redirects left
659             # create a new request and unshift it onto the 'to_send'
660             # array.
661             # Only redirect GET and HEAD as per RFC 2616.
662             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
663 59         837 my $code = $response->code;
664 59         352 my $get_or_head = $response->request->method =~ m{^(?:GET|HEAD)$};
665              
666 59 100 100     932 if (
      33        
      66        
      66        
      66        
667             $response->is_redirect # is a redirect
668             && $hashref->{redirects_left} > 0 # and we still want to follow
669             && ($get_or_head || $code !~ m{^30[127]$}) # must be GET or HEAD if it's 301, 302 or 307
670             && $code != 304 # not a 'not modified' reponse
671             && $code != 305 # not a 'use proxy' response
672             )
673             {
674              
675 30         379 $hashref->{redirects_left}--;
676              
677 30         73 my $loc = $response->header('Location');
678 30         904 my $uri = $response->request->uri;
679              
680 30 50 33     302 warn "Problem: " . Dumper( { loc => $loc, uri => $uri } )
      33        
      33        
681             unless $uri && ref $uri && $loc && !ref $loc;
682              
683 30         342 my $url = _make_url_absolute( url => $loc, ref => $uri );
684              
685 30         4605 my $request = $response->request->clone;
686 30         2792 $request->uri($url);
687              
688             # These headers should never be forwarded
689 30         1446 $request->remove_header('Host', 'Cookie');
690              
691             # Don't leak private information.
692             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec15.html#sec15.1.3
693 30 0 33     562 if ($request->header('Referer') &&
      33        
694             $hashref->{request}->uri->scheme eq 'https' &&
695             $request->uri->scheme eq 'http') {
696              
697 0         0 $request->remove_header('Referer');
698             }
699              
700             # See Other should use GET
701 30 50 33     725 if ($code == 303 && !$get_or_head) {
702 0         0 $request->method('GET');
703 0         0 $request->content('');
704 0         0 $request->remove_content_headers;
705             }
706              
707 30         83 $self->_send_request( [ $request, $id ] );
708 30         61 $hashref->{previous} = $response;
709             }
710             else {
711 29         481 $self->_add_to_return_queue( [ $response, $id ] );
712 29         83 delete $$self{in_progress}{$id};
713             }
714              
715 59         311 delete $hashref->{tmp};
716             }
717             }
718              
719             # warn Dumper(
720             # {
721             # in_progress => $self->{in_progress},
722             # seen_ids => \%seen_ids,
723             # }
724             # );
725              
726 8307         50828 foreach my $id ( keys %{ $self->{in_progress} } ) {
  8307         13594  
727              
728             # skip this one if it was processed above.
729 7522 100       10063 next if $seen_ids{$id};
730              
731 7321         5800 my $hashref = $self->{in_progress}{$id};
732              
733 7321 100 100     30126 if ( time > $hashref->{timeout_at}
734             || time > $hashref->{finish_by} )
735             {
736              
737             # warn Dumper( { hashref => $hashref, now => time } );
738              
739             # we have a request that has timed out - handle it
740             $self->_add_error_response_to_return(
741             id => $id,
742             code => 504,
743             request => $hashref->{request},
744             previous => $hashref->{previous},
745 3         32 content => 'Timed out',
746             );
747              
748 3         8 my $s = $hashref->{handle};
749 3         9 $self->_io_select->remove($s);
750 3         146 delete $$self{fileno_to_id}{ $s->fileno };
751             }
752             }
753              
754 8307         8669 return 1;
755             }
756              
757             sub _add_to_return_queue {
758 36     36   52 my $self = shift;
759 36         67 my $req_and_id = shift;
760 36         47 push @{ $$self{to_return} }, $req_and_id;
  36         94  
761 36         91 return 1;
762             }
763              
764             # Add all the items waiting to be sent to 'to_send' up to the 'slots'
765             # limit.
766              
767             sub _process_to_send {
768 8307     8307   6083 my $self = shift;
769              
770 8307   66     5760 while ( scalar @{ $$self{to_send} }
  8346         17030  
771 39         174 && $self->slots > scalar keys %{ $$self{in_progress} } )
772             {
773 39         180 $self->_send_request( shift @{ $$self{to_send} } );
  39         142  
774             }
775              
776 8307         6524 return 1;
777             }
778              
779             sub _send_request {
780 69     69   78 my $self = shift;
781 69         60 my $r_and_id = shift;
782 69         92 my ( $request, $id ) = @$r_and_id;
783              
784 69         182 my $uri = URI->new( $request->uri );
785              
786 69         4250 my %args = ();
787              
788             # Get cookies from jar if one exists
789 69         156 my $jar = $self->_get_opt('cookie_jar', $id);
790 69 100       136 if ($jar) {
791 1         10 $jar->add_cookie_header($request);
792             }
793              
794             # We need to use a different request_uri for proxied requests. Decide to use
795             # this if a proxy port or host is set.
796             #
797             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
798 69         608 $args{Host} = $uri->host;
799 69         2084 $args{PeerAddr} = $self->_get_opt( 'proxy_host', $id );
800 69         126 $args{PeerPort} = $self->_get_opt( 'proxy_port', $id );
801 69         123 $args{LocalAddr} = $self->_get_opt('local_addr', $id );
802 69         119 $args{LocalPort} = $self->_get_opt('local_port', $id );
803              
804             # https://rt.cpan.org/Public/Bug/Display.html?id=33071
805 69         134 $args{Timeout} = $self->_get_opt( 'timeout', $id);
806              
807             # ACF - Pass ssl_options through
808 69         116 $args{ssl_opts} = $self->_get_opt( 'ssl_options', $id);
809              
810             my $request_is_to_proxy =
811             ( $args{PeerAddr} || $args{PeerPort} ) # if either are set...
812 69 100 66     295 ? 1 # ...then we are a proxy request
813             : 0; # ...otherwise not
814              
815             # If we did not get a setting from the proxy then use the uri values.
816              
817 69   66     257 $args{PeerAddr} ||= $uri->host;
818 69         1134 my $peer_address = $self->_get_opt('peer_addr', $id );
819 69 100       128 if($peer_address) {
820 1         2 $args{PeerAddr} = $peer_address;
821             }
822              
823 69   66     300 $args{PeerPort} ||= $uri->port;
824              
825 69         1182 my $net_http_class = 'Net::HTTP::NB';
826 69 50 33     231 if ($uri->scheme and $uri->scheme eq 'https' and not $request_is_to_proxy) {
    50 33        
      33        
      33        
827 0         0 $net_http_class = 'Net::HTTPS::NB';
828 0         0 eval {
829 0         0 require Net::HTTPS::NB;
830 0         0 Net::HTTPS::NB->import();
831             };
832 0 0       0 die "$net_http_class must be installed for https support" if $@;
833              
834             # Add SSL options, if any, to args
835 0         0 my $ssl_options = $self->_get_opt('ssl_options');
836 0 0       0 @args{ keys %$ssl_options } = values %$ssl_options if $ssl_options;
837             }
838             elsif($uri->scheme and $uri->scheme eq 'https' and $request_is_to_proxy) {
839             # We are making an HTTPS request through an HTTP proxy such as squid.
840             # The proxy will handle the HTTPS, we need to connect to it via HTTP
841             # and then make a request where the https is clear from the scheme...
842             $args{Host} = sprintf(
843             '%s:%s',
844 0         0 delete @args{'PeerAddr', 'PeerPort'}
845             );
846             }
847 69         3033 my $s = eval { $net_http_class->new(%args) };
  69         783  
848              
849             # We could not create a request - fake up a 503 response with
850             # error as content.
851 69 100       123060 if ( !$s ) {
852              
853             $self->_add_error_response_to_return(
854             id => $id,
855             code => 503,
856             request => $request,
857             previous => $$self{in_progress}{$id}{previous},
858 2         32 content => $@,
859             );
860              
861 2         49 return 1;
862             }
863              
864 67         82 my %headers;
865 67         371 for my $key ($request->{_headers}->header_field_names) {
866 1         27 $headers{$key} = $request->header($key);
867             }
868              
869             # Decide what to use as the request_uri
870 67 100       905 my $request_uri = $request_is_to_proxy # is this a proxy request....
871             ? $uri->as_string # ... if so use full url
872             : _strip_host_from_uri($uri); # ...else strip off scheme, host and port
873              
874 67 50       214 croak "Could not write request to $uri '$!'"
875             unless $s->write_request( $request->method, $request_uri, %headers,
876             $request->content );
877              
878 67         15268 $self->_io_select->add($s);
879              
880 67         2205 my $time = time;
881 67   100     319 my $entry = $$self{in_progress}{$id} ||= {};
882              
883 67         221 $$self{fileno_to_id}{ $s->fileno } = $id;
884              
885 67         387 $entry->{request} = $request;
886 67         96 $entry->{started_at} = $time;
887              
888            
889 67         128 $entry->{timeout_at} = $time + $self->_get_opt( 'timeout', $id );
890             # warn "sent - timeout set to '$entry->{timeout_at}'";
891              
892 67         127 $entry->{finish_by} = $time + $self->_get_opt( 'max_request_time', $id );
893 67         138 $entry->{handle} = $s;
894              
895             $entry->{redirects_left} = $self->_get_opt( 'max_redirect', $id )
896 67 100       189 unless exists $entry->{redirects_left};
897              
898 67         253 return 1;
899             }
900              
901             sub _strip_host_from_uri {
902 69     69   8112 my $uri = shift;
903              
904 69         182 my $scheme_and_auth = quotemeta( $uri->scheme . '://' . $uri->authority );
905 69         1719 my $url = $uri->as_string;
906              
907 69         701 $url =~ s/^$scheme_and_auth//;
908 69 100       1336 $url = "/$url" unless $url =~ m{^/};
909              
910 69         126 return $url;
911             }
912              
913             sub _io_select {
914 8444     8444   5722 my $self = shift;
915 8444   66     26558 return $$self{io_select} ||= IO::Select->new();
916             }
917              
918             sub _make_url_absolute {
919 34     34   7496 my %args = @_;
920              
921 34         36 my $in = $args{url};
922 34         27 my $ref = $args{ref};
923              
924 34         113 return URI->new_abs($in, $ref)->as_string;
925             }
926              
927             sub _add_error_response_to_return {
928 7     7   15 my $self = shift;
929 7         111 my %args = @_;
930              
931 33     33   81855 use HTTP::Status;
  33         55  
  33         10584  
932              
933             my $response =
934             HTTP::Response->new( $args{code}, status_message( $args{code} ),
935 7         63 undef, $args{content} );
936              
937 7         675 $response->request( $args{request} );
938 7 50       69 $response->previous( $args{previous} ) if $args{previous};
939              
940 7         86 $self->_add_to_return_queue( [ $response, $args{id} ] );
941 7         19 delete $$self{in_progress}{ $args{id} };
942              
943 7         18 return $response;
944              
945             }
946              
947             =head1 SEE ALSO
948              
949             L - a polite form of this module. Slows the scraping down
950             by domain so that the remote server is not overloaded.
951              
952             =head1 GOTCHAS
953              
954             The responses may not come back in the same order as the requests were made.
955             For https requests to work, you must have L installed.
956              
957             =head1 THANKS
958              
959             Egor Egorov contributed patches for proxies, catching connections that die
960             before headers sent and more.
961              
962             Tomohiro Ikebe from livedoor.jp submitted patches (and a test) to properly
963             handle 304 responses.
964              
965             Naveed Massjouni for adding the https handling code.
966              
967             Alex Balhatchet for adding the https + proxy handling code, and for making the
968             tests run ok in parallel.
969              
970             Josef Toman for fixing two bugs, one related to header handling and another
971             related to producing an absolute URL correctly.
972              
973             Github user 'c00ler-' for adding LocalAddr and LocalPort support.
974              
975             rt.cpan.org user 'Florian (fschlich)' for typo in documentation.
976              
977             Heikki Vatiainen for the ssl_options support patch.
978              
979             Daniel Lintott of the Debian Perl Group for pointing out a test failure when
980             using a very recent version of HTTP::Server::Simple to implement
981             t/TestServer.pm
982              
983             =head1 BUGS AND REPO
984              
985             Please submit all bugs, patches etc on github
986              
987             L
988              
989             =head1 AUTHOR
990              
991             Edmund von der Burg C<< >>.
992              
993             L
994              
995             =head1 LICENCE AND COPYRIGHT
996              
997             Copyright (c) 2006, Edmund von der Burg C<< >>.
998             All rights reserved.
999              
1000             This module is free software; you can redistribute it and/or modify it under
1001             the same terms as Perl itself.
1002              
1003             =head1 DISCLAIMER OF WARRANTY
1004              
1005             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE
1006             SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE
1007             STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE
1008             SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED,
1009             INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
1010             FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND
1011             PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE,
1012             YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.
1013              
1014             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY
1015             COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE
1016             SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES,
1017             INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
1018             OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED TO
1019             LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
1020             THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER
1021             SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
1022             POSSIBILITY OF SUCH DAMAGES.
1023              
1024             =cut
1025              
1026             1;
1027