File Coverage

blib/lib/IO/Lambda/HTTP.pm
Criterion Covered Total %
statement 160 281 56.9
branch 45 168 26.7
condition 16 63 25.4
subroutine 26 33 78.7
pod 2 15 13.3
total 249 560 44.4


line stmt bran cond sub pod time code
1             # $Id: HTTP.pm,v 1.59 2012/01/13 06:16:28 dk Exp $
2             package IO::Lambda::HTTP;
3 1     1   1115 use vars qw(@ISA @EXPORT_OK $DEBUG);
  1         2  
  1         87  
4             @ISA = qw(Exporter);
5             @EXPORT_OK = qw(http_request);
6              
7             our $DEBUG = $IO::Lambda::DEBUG{http} || 0;
8              
9 1     1   6 use strict;
  1         1  
  1         22  
10 1     1   5 use warnings;
  1         2  
  1         23  
11 1     1   939 use Socket;
  1         3896  
  1         681  
12 1     1   6 use Exporter;
  1         1  
  1         31  
13 1     1   786 use IO::Socket;
  1         20412  
  1         5  
14 1     1   4644 use HTTP::Response;
  1         8290  
  1         37  
15 1     1   9 use IO::Lambda qw(:lambda :stream);
  1         1  
  1         285  
16 1     1   710 use IO::Lambda::Socket qw(connect);
  1         3  
  1         59  
17 1     1   5 use Time::HiRes qw(time);
  1         2  
  1         5  
18              
19             sub http_request(&)
20             {
21 0     0 1 0 __PACKAGE__-> new(context)->
22             condition(shift, \&http_request, 'http_request')
23             }
24              
25             sub new
26             {
27 4     4 1 9541 my ( $class, $req, %options) = @_;
28              
29 4         10 my $self = bless {}, $class;
30              
31 4 50       13 $self-> {deadline} = $options{timeout} + time if defined $options{timeout};
32 4 50       12 $self-> {deadline} = $options{deadline} if defined $options{deadline};
33 4 50       20 $self-> {max_redirect} = defined($options{max_redirect}) ? $options{max_redirect} : 7;
34              
35 4         9 delete @options{qw(deadline timeout max_redirect)};
36 4         11 $self-> {$_} = $options{$_} for keys %options;
37              
38 4         7 my %headers;
39 4         13 $headers{'User-Agent'} = "perl/IO-Lambda-HTTP v$IO::Lambda::VERSION";
40              
41 4 50       12 if ( $self-> {keep_alive}) {
42 0 0       0 unless ( $self-> {conn_cache}) {
43 0         0 require LWP::ConnCache;
44 0         0 $self-> {conn_cache} = LWP::ConnCache-> new;
45             }
46 0 0       0 unless ( $req-> protocol) {
47 0         0 $req-> protocol('HTTP/1.1');
48             }
49 0         0 $headers{Host} = $req-> uri-> host;
50 0         0 $headers{Connection} = 'Keep-Alive';
51 0         0 $headers{'Keep-Alive'} = 300;
52             }
53              
54 4 50       10 require IO::Lambda::DNS if $self-> {async_dns};
55            
56 4         20 my $h = $req-> headers;
57 4         35 while ( my ($k, $v) = each %headers) {
58 4 50       13 $h-> header($k, $v) unless defined $h-> header($k);
59             }
60              
61 4         288 return $self-> handle_redirect( $req);
62             }
63              
64             # HTTP::Response features methods base() and request() that we need to set as well
65             sub finalize_response
66             {
67 4     4 0 8 my ( $self, $req, $response) = @_;
68 4         16 $response-> request($req);
69 4         43 return $response;
70             }
71              
72             # reissue the request, if necessary, because of 30X or 401 errors
73             sub handle_redirect
74             {
75 4     4 0 9 my ( $self, $req) = @_;
76            
77 4         6 my $was_redirected = 0;
78 4         6 my $was_failed_auth = 0;
79              
80 4         7 my $auth = $self-> {auth};
81              
82             lambda {
83 8     8   13 my $method;
84 8 50       16 if ( $auth) {
85             # create fake response for protocol initiation, -- but just once
86 0         0 my $x = HTTP::Response-> new;
87 0         0 $x-> headers-> header('WWW-Authenticate', split(',', $auth));
88 0         0 $method = $self-> get_authenticator( $req, $x);
89 0         0 undef $auth;
90             }
91 8   33     38 context $method || $self-> handle_connection( $req);
92             tail {
93             # request is finished
94 8         15 my $response = shift;
95 8 50       27 return $response unless ref($response);
96              
97 8 100 33     28 if ( $response-> code =~ /^3/) {
    50 33        
      33        
98 4         85 $was_failed_auth = 0;
99             return 'too many redirects'
100 4 50       16 if ++$was_redirected > $self-> {max_redirect};
101              
102 4         13 my $location = $response-> header('Location');
103 4 50       165 return $response unless defined $location;
104 4         19 $req-> uri( URI-> new_abs( $location, $req-> uri));
105 4         619 $req-> headers-> header( Host => $req-> uri-> host);
106              
107 4 50       285 warn "redirect to " . $req-> uri . "\n" if $DEBUG;
108              
109 4         14 this-> start;
110             } elsif (
111             not($was_failed_auth) and
112             $response-> code eq '401' and
113             defined($self-> {username}) and
114             defined($self-> {password})
115             ) {
116 0         0 $was_failed_auth++;
117 0         0 $method = $self-> get_authenticator( $req, $response);
118 0         0 context $method;
119             return $method ? tail {
120 0         0 my $r = shift;
121 0 0       0 return $r unless $r;
122              
123             # start from beginning, from handle_connection;
124 0         0 this-> start;
125 0 0       0 } : $self-> finalize_response($req, $response);
126             } else {
127 4         119 return $self-> finalize_response($req, $response);
128             }
129 4         30 }};
  8         60  
130             }
131              
132             # if request needs authentication, and we can do something about it, create
133             # a lambda that handles the authentication
134             sub get_authenticator
135             {
136 0     0 0 0 my ( $self, $req, $response) = @_;
137              
138             # supports authentication?
139 0         0 my %auth;
140 0         0 for my $auth ( $response-> header('WWW-Authenticate')) {
141 0         0 $auth =~ s/\s.*$//;
142 0         0 $auth{$auth}++;
143             }
144              
145             my %preferred = defined($self-> {preferred_auth}) ? (
146             ref($self-> {preferred_auth}) ?
147 0         0 %{ $self-> {preferred_auth} } :
148 0 0       0 ( $self-> {preferred_auth} => 1 )
    0          
149             ) : ();
150            
151             my @auth = sort {
152 0   0     0 ($preferred{$b} || 0) <=> ($preferred{$a} || 0)
      0        
153             } grep {
154 0 0       0 not exists($preferred{$_}) or $preferred{$_} >= 0;
  0         0  
155             } keys %auth;
156              
157 0         0 my $compilation_errors = '';
158 0         0 for my $auth ( @auth) {
159 0 0       0 if ( $auth eq 'Basic') {
160             # always
161 0 0       0 warn "trying basic authentication\n" if $DEBUG;
162 0         0 $req-> authorization_basic( $self-> {username}, $self-> {password});
163 0         0 return $self-> handle_connection( $req);
164             }
165              
166 0         0 eval { require "IO/Lambda/HTTP/Authen/$auth.pm" };
  0         0  
167 0 0 0     0 $compilation_errors .= "$@\n"
168             if $@ and ($@ !~ m{^Can't locate IO/Lambda/HTTP/Authen/$auth});
169 0 0       0 next if $@;
170            
171 0         0 my $lambda = "IO::Lambda::HTTP::Authen::$auth"->
172             authenticate( $self, $req, $response);
173 0 0 0     0 warn "trying authentication with '$auth'\n" if $DEBUG and $lambda;
174 0 0       0 return $lambda if $lambda;
175             }
176            
177             # XXX Propagate compilation errors as http errors. Doubtful.
178 0 0   0   0 return lambda { $compilation_errors } if length $compilation_errors;
  0         0  
179              
180 0         0 return undef;
181             }
182              
183              
184             # get scheme and eventually load module
185             my $got_https;
186             sub prepare_transport
187             {
188 8     8 0 16 my ( $self, $req) = @_;
189 8         23 my $scheme = $req-> uri-> scheme;
190              
191 8 50       181 unless ( defined $scheme) {
    50          
    50          
192 0         0 return "bad URI: " . $req-> uri-> as_string;
193             } elsif ( $scheme eq 'https') {
194 0 0       0 unless ( $got_https) {
195 0         0 eval { require IO::Lambda::HTTP::HTTPS; };
  0         0  
196 0 0       0 return "https not supported: $@" if $@;
197 0         0 $got_https++;
198             }
199 0         0 $self-> {reader} = IO::Lambda::HTTP::HTTPS::https_reader();
200 0         0 $self-> {writer} = \&IO::Lambda::HTTP::HTTPS::https_writer;
201 0 0       0 warn "https enabled\n" if $DEBUG;
202             } elsif ( $scheme ne 'http') {
203 0         0 return "bad URI scheme: $scheme";
204             } else {
205 8         16 $self-> {reader} = undef;
206 8         15 $self-> {writer} = undef;
207             }
208              
209 8         16 return;
210             }
211              
212             # returns static lambda that reads from socket until a condition (see sysreader) is satisfied
213             sub http_read
214             {
215 24     24 0 47 my ( $self, $cond) = @_;
216 24         143 return $self-> {reader}, $self-> {socket}, \ $self-> {buf}, $cond, $self-> {deadline};
217             }
218              
219             # read from socket until a condition (see sysreader) is satisfied
220             # after this call no communication should happen
221             sub http_tail
222             {
223 8     8 0 99 my ( $self, $cond) = @_;
224 8         25 context $self-> http_read($cond);
225 8         30 &tail();
226             }
227              
228             sub socket
229             {
230 8     8 0 14 my ( $self, $host, $port) = @_;
231              
232 8         70 my $sock = IO::Socket::INET-> new(
233             PeerAddr => $host,
234             PeerPort => $port,
235             Proto => 'tcp',
236             Blocking => 0,
237             );
238 8 50       24040 return $sock, ( $sock ? undef : "connect: $!");
239             }
240              
241             # Connect to the remote, wait for protocol to finish, and
242             # close the connection if needed. Returns HTTP::Response object on success
243             sub handle_connection
244             {
245 8     8 0 15 my ( $self, $req) = @_;
246            
247 8         10 my ( $host, $port);
248 8 50       19 if ( defined( $self-> {proxy})) {
249 0 0       0 if ( ref($self->{proxy})) {
250 0     0   0 return lambda { "'proxy' option must be a non-empty array" } if
251             ref($self->{proxy}) ne 'ARRAY' or
252 0 0 0     0 not @{$self->{proxy}};
  0         0  
253 0         0 ($host, $port) = @{$self->{proxy}};
  0         0  
254             } else {
255 0         0 $host = $self-> {proxy};
256             }
257 0   0     0 $port ||= $req-> uri-> port;
258             } else {
259 8         28 ( $host, $port) = ( $req-> uri-> host, $req-> uri-> port);
260             }
261              
262             # have a chance to load eventual modules early
263 8         625 my $err = $self-> prepare_transport( $req);
264 8 50   0   23 return lambda { $err } if defined $err;
  0         0  
265              
266             lambda {
267             # resolve hostname
268 8 50 33 8   35 if (
269             $self-> {async_dns} and
270             $host !~ /^(\d{1,3}\.){3}(\d{1,3})$/
271             ) {
272             context $host,
273 0   0     0 timeout => ($self-> {deadline} || $IO::Lambda::DNS::TIMEOUT);
274 0 0       0 warn "resolving $host\n" if $DEBUG;
275             return IO::Lambda::DNS::dns( sub {
276 0         0 $host = shift;
277 0 0       0 return $host unless $host =~ /^\d/; # error
278 0 0       0 warn "resolved to $host\n" if $DEBUG;
279 0         0 return this-> start; # restart the lambda with different $host
280 0         0 });
281             }
282              
283 8         14 delete $self-> {close_connection};
284 8 50 50     35 $self-> {close_connection}++
285             if ( $req-> header('Connection') || '') =~ /^close/i;
286            
287             # got cached socket?
288 8         388 my ( $sock, $cached);
289 8         15 my $cc = $self-> {conn_cache};
290              
291 8 50       19 if ( $cc) {
292 0         0 $sock = $cc-> withdraw( __PACKAGE__, "$host:$port");
293 0 0       0 if ( $sock) {
294 0         0 my $err = unpack('i', getsockopt( $sock, SOL_SOCKET, SO_ERROR));
295 0 0       0 $err ? undef $sock : $cached++;
296 0 0       0 warn "reused socket is ".($err ? "bad" : "ok")."\n" if $DEBUG;
    0          
297             }
298             }
299              
300             # connect
301 8         9 my $err;
302 8 50 33     28 warn "connecting\n" if $DEBUG and not($sock);
303 8 50       33 ( $sock, $err) = $self-> socket( $host, $port) unless $sock;
304 8 50       32 return $err unless $sock;
305 8         48 context( $sock, $self-> {deadline});
306              
307             connect {
308 8 50       20 return shift if @_;
309             # connected
310              
311 8         20 $self-> {socket} = $sock;
312 8         30 $self-> {reader} = readbuf ( $self-> {reader});
313 8 50       24 $self-> {writer} = $self-> {writer}-> ($cached) if $self-> {writer};
314 8         30 $self-> {writer} = writebuf( $self-> {writer});
315              
316 8         27 context $self-> handle_request( $req);
317             autocatch tail {
318 8         16 my $response = shift;
319            
320             # put back the connection, if possible
321 8 50 33     24 if ( $cc and not $self-> {close_connection}) {
322 0         0 my $err = unpack('i', getsockopt( $sock, SOL_SOCKET, SO_ERROR));
323 0 0 0     0 warn "deposited socket back\n" if $DEBUG and not($err);
324 0 0       0 $cc-> deposit( __PACKAGE__, "$host:$port", $sock)
325             unless $err;
326             }
327            
328 8 0 33     19 warn "connection:close\n" if $DEBUG and $self-> {close_connection};
329              
330 8         12 delete @{$self}{qw(close_connection socket buf writer reader)};
  8         55  
331            
332 8         25 return $response;
333 8         78 }}}
  8         44  
334 8         55 }
335              
336             # Execute single http request over an established connection.
337             # Returns either a HTTP::Response object, or an error string
338             sub handle_request
339             {
340 8     8 0 16 my ( $self, $req) = @_;
341              
342             lambda {
343 8     8   20 $self-> {buf} = '';
344              
345 8         22 context $self-> handle_request_in_buffer( $req);
346              
347 8 50       21 if ( $DEBUG) {
348 0         0 warn "request sent\n";
349 0 0       0 warn $req-> as_string . "\n" if $DEBUG > 1;
350             }
351             tail {
352 8         20 my ( undef, $error) = @_; # readbuf style
353 8 50       25 if ( $DEBUG) {
354 0         0 warn "got response\n";
355 0 0       0 warn (( $error ? $error : $self-> {buf}) . "\n") if $DEBUG > 1;
    0          
356             }
357 8 50       50 return defined($error) ? $error : $self-> parse( \ $self-> {buf} );
358 8         51 }}
359 8         38 }
360              
361             # Execute single http request over an established connection.
362             # Returns 2 parameters, readbuf-style, where actually only the 2nd matters,
363             # and signals error if defined. 2 parameters are there for readbuf() compatibility,
364             # so that the protocol handler can easily fall back to readbuf() itself.
365             sub handle_request_in_buffer
366             {
367 8     8 0 18 my ( $self, $req) = @_;
368              
369 8         31 my $method = $req-> method;
370              
371             # fixup path - otherwise LWP generates request as GET http://hostname/uri HTTP/1.1
372             # which not all servers understand
373 8         96 my ($req_line, $save_uri);
374 8 50 50     46 if (!$self-> {proxy} && ( $req-> protocol || '') =~ /http\/1.\d/i) {
      33        
375 0         0 $save_uri = $req-> uri;
376              
377 0         0 my $fullpath = $save_uri-> path;
378 0 0       0 $fullpath = "/$fullpath" unless $fullpath =~ m[^/];
379 0         0 $req-> uri( $fullpath);
380             }
381 8         125 $req_line = $req-> as_string("\x0d\x0a");
382 8 50       828 $req-> uri($save_uri) if defined $save_uri;
383              
384             lambda {
385             # send request
386             context
387             $self-> {writer},
388             $self-> {socket}, \ $req_line,
389 8     8   45 undef, 0, $self-> {deadline};
390             state write => tail {
391 8         13 my ( $bytes_written, $error) = @_;
392 8 50       15 return undef, $error if $error;
393              
394 8         38 context $self-> {socket}, $self-> {deadline};
395             readable {
396             # request sent, now wait for data
397 8 50       26 return undef, 'timeout' unless shift;
398            
399             # read first line
400 8         88 context $self-> http_read(qr/^.*?\n/);
401             state head => tail {
402 8         15 my $line = shift;
403 8 50       22 unless ( defined $line) {
404 0         0 my $error = shift;
405             # remote closed connection and content is single-line HTTP/1.0
406 0 0       0 return (undef, $error) if $error ne 'eof';
407 0         0 return (undef, undef);
408             }
409              
410             # no headers?
411 8 50       60 return $self-> http_tail
412             unless $line =~ /^HTTP\/[\.\d]+\s+\d{3}\s+/i;
413              
414             # got some headers
415 8         41 context $self-> http_read( qr/^.*?\r?\n\r?\n/s);
416             state body => tail {
417 8         17 $line = shift;
418 8 50       21 return undef, shift unless defined $line;
419              
420 8         75 my $headers = HTTP::Response-> parse( $line);
421              
422             # Connection: close
423 8   50     4498 my $c = lc( $headers-> header('Connection') || '');
424 8         423 $self-> {close_connection} = $c =~ /^close\s*$/i;
425              
426 8 50       24 return 1 if $method eq 'HEAD';
427              
428 8         35 return $self-> http_read_body( length $line, $headers);
429 8         32 }}}}}
  8         53  
  8         76  
  8         27  
430 8         41 }
431              
432             # have headers, read body
433             sub http_read_body
434             {
435 8     8 0 20 my ( $self, $offset, $headers) = @_;
436              
437             # have Content-Length? read that many bytes then
438 8         28 my $l = $headers-> header('Content-Length');
439 8 100 66     414 return $self-> http_tail( $1 + $offset )
440             if defined ($l) and $l =~ /^(\d+)\s*$/;
441              
442             # have 'Transfer-Encoding: chunked' ? read the chunks
443 4   50     16 my $te = lc( $headers-> header('Transfer-Encoding') || '');
444             return $self-> http_read_chunked($offset)
445 4 50       186 if $self-> {chunked} = $te =~ /^chunked\s*$/i;
446              
447             # just read as much as possible then -- however with considerations;
448             # we can't do that if server keeps connection open, otherwise we'll hang
449              
450             # http/1.0 and less doesn't implement open connections
451 4 50 33     15 return $self-> http_tail if
452             $headers-> protocol =~ /^HTTP\/(\d+\.\d+)/ and
453             $1 < 1.1;
454              
455             # server wants to close the connection
456             return $self-> http_tail if
457 0 0       0 $self-> {close_connection};
458             }
459              
460             # read sequence of TE chunks
461             sub http_read_chunked
462             {
463 0     0 0 0 my ( $self, $offset) = @_;
464              
465 0         0 my ( @frame, @ctx);
466              
467             # read chunk size
468 0         0 pos( $self-> {buf} ) = $offset;
469 0         0 context @ctx = $self-> http_read( qr/\G[^\r\n]+\r?\n/i);
470             state size => tail {
471 0     0   0 my $line = shift;
472 0 0       0 return undef, shift unless defined $line; # got error
473              
474             # advance
475 0         0 substr( $self-> {buf}, $offset, length($line), '');
476 0         0 pos( $self-> {buf} ) = $offset;
477 0         0 $line =~ s/\r?\n//;
478 0 0       0 return undef, "protocol error: chunk size error"
479             unless $line =~ /^[\da-f]+$/;
480 0         0 my $size = hex $line;
481 0 0       0 warn "reading chunk $size bytes\n" if $DEBUG;
482 0 0       0 return 1 unless $size;
483 0         0 $size += 2; # CRLF
484            
485             # save this lambda frame
486 0         0 @frame = restartable;
487              
488             # read the chunk itself
489 0         0 context $self-> http_read( $offset + $size);
490             state chunk => tail {
491 0 0       0 unless ( shift ) {
492 0         0 undef @frame; # break circular reference
493 0         0 return undef, shift;
494             }
495              
496 0         0 $offset += $size - 2;
497 0         0 substr( $self->{buf}, $offset, 2, '' ); # remove CRLF
498 0         0 pos( $self-> {buf} ) = $offset;
499 0 0       0 warn "chunk $size bytes ok\n" if $DEBUG;
500              
501 0         0 context @ctx;
502 0         0 again( @frame);
503 0         0 undef @frame; # break circular reference
504 0         0 }};
  0         0  
505             }
506              
507              
508             sub parse
509             {
510 8     8 0 18 my ( $self, $buf_ptr) = @_;
511 8 50       118 return HTTP::Response-> parse( $$buf_ptr) if $$buf_ptr =~ /^(HTTP\S+)\s+(\d{3})\s+/i;
512 0           return HTTP::Response-> new( '000', '', undef, $$buf_ptr);
513             }
514              
515             1;
516              
517             __DATA__