File Coverage

blib/lib/AnyEvent/Twitter/Stream.pm
Criterion Covered Total %
statement 29 145 20.0
branch 0 50 0.0
condition 0 39 0.0
subroutine 10 20 50.0
pod 1 1 100.0
total 40 255 15.6


line stmt bran cond sub pod time code
1             package AnyEvent::Twitter::Stream;
2              
3 3     3   18657 use strict;
  3         3  
  3         61  
4 3     3   78 use 5.008_001;
  3         8  
5             our $VERSION = '0.28';
6              
7 3     3   1689 use AnyEvent;
  3         7519  
  3         75  
8 3     3   1613 use AnyEvent::HTTP;
  3         64221  
  3         201  
9 3     3   23 use AnyEvent::Util;
  3         4  
  3         187  
10 3     3   1321 use MIME::Base64;
  3         1407  
  3         146  
11 3     3   1290 use URI;
  3         13893  
  3         69  
12 3     3   13 use URI::Escape;
  3         3  
  3         131  
13 3     3   11 use Carp;
  3         2  
  3         139  
14 3     3   1657 use Compress::Raw::Zlib;
  3         11363  
  3         4383  
15              
16             our $STREAMING_SERVER = 'stream.twitter.com';
17             our $USERSTREAM_SERVER = 'userstream.twitter.com';
18             our $SITESTREAM_SERVER = 'sitestream.twitter.com';
19             our $PROTOCOL = 'https';
20             our $US_PROTOCOL = 'https'; # for testing
21              
22             my %methods = (
23             filter => [ POST => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/filter.json" } ],
24             sample => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/sample.json" } ],
25             firehose => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/firehose.json" } ],
26             userstream => [ GET => sub { "$US_PROTOCOL://$USERSTREAM_SERVER/1.1/user.json" } ],
27             sitestream => [ GET => sub { "$PROTOCOL://$SITESTREAM_SERVER/1.1/site.json" } ],
28              
29             # DEPRECATED
30             links => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/links.json" } ],
31             retweet => [ GET => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/retweet.json" } ],
32             );
33              
34             sub new {
35 0     0 1   my $class = shift;
36 0           my %args = @_;
37              
38 0           my $username = delete $args{username};
39 0           my $password = delete $args{password};
40 0           my $consumer_key = delete $args{consumer_key};
41 0           my $consumer_secret = delete $args{consumer_secret};
42 0           my $token = delete $args{token};
43 0           my $token_secret = delete $args{token_secret};
44 0           my $method = delete $args{method};
45 0   0 0     my $on_connect = delete $args{on_connect} || sub { };
46 0           my $on_tweet = delete $args{on_tweet};
47 0   0 0     my $on_error = delete $args{on_error} || sub { die @_ };
  0            
48 0   0 0     my $on_eof = delete $args{on_eof} || sub { };
49 0   0 0     my $on_keepalive = delete $args{on_keepalive} || sub { };
50 0           my $on_delete = delete $args{on_delete};
51 0           my $on_friends = delete $args{on_friends};
52 0           my $on_direct_message = delete $args{on_direct_message};
53 0           my $on_event = delete $args{on_event};
54 0           my $timeout = delete $args{timeout};
55              
56 0           my $decode_json;
57 0 0         unless (delete $args{no_decode_json}) {
58 0           require JSON;
59 0           $decode_json = 1;
60             }
61              
62 0           my ($zlib, $_zstatus);
63 0 0         if (delete $args{use_compression}){
64 0           ($zlib, $_zstatus) = Compress::Raw::Zlib::Inflate->new(
65             -LimitOutput => 1,
66             -AppendOutput => 1,
67             -WindowBits => WANT_GZIP_OR_ZLIB,
68             );
69 0 0         die "Can't make inflator: $_zstatus" unless $zlib;
70             }
71              
72 0 0 0       unless ($methods{$method} || exists $args{api_url} ) {
73 0           $on_error->("Method $method not available.");
74 0           return;
75             }
76              
77 0   0       my $uri = URI->new(delete $args{api_url} || $methods{$method}[1]());
78              
79 0           my $request_body;
80 0   0       my $request_method = delete $args{request_method} || $methods{$method}[0] || 'GET';
81 0 0         if ( $request_method eq 'POST' ) {
82 0           $request_body = join '&', map "$_=" . URI::Escape::uri_escape_utf8($args{$_}), keys %args;
83             }else{
84 0           $uri->query_form(%args);
85             }
86              
87 0           my $auth;
88 0 0         if ($consumer_key) {
89 0           eval {require Net::OAuth;};
  0            
90 0 0         die $@ if $@;
91              
92 0 0         my $request = Net::OAuth->request('protected resource')->new(
93             version => '1.0',
94             consumer_key => $consumer_key,
95             consumer_secret => $consumer_secret,
96             token => $token,
97             token_secret => $token_secret,
98             request_method => $request_method,
99             signature_method => 'HMAC-SHA1',
100             timestamp => time,
101             nonce => MIME::Base64::encode( time . $$ . rand ),
102             request_url => $uri,
103             $request_method eq 'POST' ? (extra_params => \%args) : (),
104             );
105 0           $request->sign;
106 0           $auth = $request->to_authorization_header;
107             }else{
108 0           $auth = "Basic ".MIME::Base64::encode("$username:$password", '');
109             }
110              
111 0           my $self = bless {}, $class;
112              
113             {
114 0           Scalar::Util::weaken(my $self = $self);
  0            
115              
116             my $set_timeout = $timeout
117 0     0     ? sub { $self->{timeout} = AE::timer($timeout, 0, sub { $on_error->('timeout') }) }
  0            
118 0 0   0     : sub {};
119              
120             my $on_json_message = sub {
121 0     0     my ($json) = @_;
122              
123             # Twitter stream returns "\x0a\x0d\x0a" if there's no matched tweets in ~30s.
124 0           $set_timeout->();
125 0 0         if ($json !~ /^\s*$/) {
126 0 0         my $tweet = $decode_json ? JSON::decode_json($json) : $json;
127 0 0 0       if ($on_delete && $tweet->{delete} && $tweet->{delete}->{status}) {
    0 0        
    0 0        
    0 0        
      0        
128 0           $on_delete->($tweet->{delete}->{status}->{id}, $tweet->{delete}->{status}->{user_id});
129             }elsif($on_friends && $tweet->{friends}) {
130 0           $on_friends->($tweet->{friends});
131             }elsif($on_direct_message && $tweet->{direct_message}) {
132 0           $on_direct_message->($tweet->{direct_message});
133             }elsif($on_event && $tweet->{event}) {
134 0           $on_event->($tweet);
135             }else{
136 0           $on_tweet->($tweet);
137             }
138             }
139             else {
140 0           $on_keepalive->();
141             }
142 0           };
143              
144 0           $set_timeout->();
145              
146             $self->{connection_guard} = http_request($request_method, $uri,
147             headers => {
148             Accept => '*/*',
149             ( defined $zlib ? ('Accept-Encoding' => 'deflate, gzip') : ()),
150             Authorization => $auth,
151             ($request_method eq 'POST'
152             ? ('Content-Type' => 'application/x-www-form-urlencoded')
153             : ()
154             ),
155             },
156             body => $request_body,
157             on_header => sub {
158 0     0     my($headers) = @_;
159 0 0         if ($headers->{Status} ne '200') {
160 0           $on_error->("$headers->{Status}: $headers->{Reason}");
161 0           return;
162             }
163 0           return 1;
164             },
165             want_body_handle => 1, # for some reason on_body => sub {} doesn't work :/
166             sub {
167 0     0     my ($handle, $headers) = @_;
168              
169 0 0         return unless $handle;
170 0           my $input;
171             my $chunk_reader;
172             $chunk_reader = sub {
173 0           my ($handle, $line) = @_;
174              
175 0 0         $line =~ /^([0-9a-fA-F]+)/ or die 'bad chunk (incorrect length)';
176 0           my $len = hex $1;
177 0           my $chunk_part_reader;
178             $chunk_part_reader = sub {
179 0           my ($handle, $chunk_raw) = @_;
180 0           my $chunk = $chunk_raw;
181 0           $chunk =~ s/\r\n$//;
182 0           $input .= $chunk;
183 0 0         unless ($headers->{'content-encoding'}) {
    0          
184 0           while ($input =~ s/^(.*?)\r\n//) {
185 0           my ($json_raw) = $1;
186 0           $on_json_message->($json_raw);
187             }
188             } elsif ($headers->{'content-encoding'} =~ 'deflate|gzip') {
189 0           my ($message);
190 0   0       do {
191 0           $_zstatus = $zlib->inflate(\$input, \$message);
192 0 0 0       return unless $_zstatus == Z_OK || $_zstatus == Z_BUF_ERROR;
193             } while ( $_zstatus == Z_OK && length $input );
194 0           $on_json_message->($message);
195             } else {
196 0           die "Don't know how to decode $headers->{'content-encoding'}"
197             }
198 0           $handle->push_read(line => $chunk_reader);
199 0           }; # chunk_part_reader
200 0           $handle->push_read(chunk => $len + 2, $chunk_part_reader);
201 0           };
202              
203             my $line_reader = sub {
204 0           my ($handle, $line) = @_;
205              
206 0           $on_json_message->($line);
207 0           };
208              
209             $handle->on_error(sub {
210 0           undef $handle;
211 0           $on_error->($_[2]);
212 0           });
213             $handle->on_eof(sub {
214 0           undef $handle;
215 0           $on_eof->(@_);
216 0           });
217              
218 0 0 0       if (($headers->{'transfer-encoding'} || '') =~ /\bchunked\b/i) {
219             $handle->on_read(sub {
220 0           my ($handle) = @_;
221 0           $handle->push_read(line => $chunk_reader);
222 0           });
223             } else {
224             $handle->on_read(sub {
225 0           my ($handle) = @_;
226 0           $handle->push_read(line => $line_reader);
227 0           });
228             }
229              
230             $self->{guard} = AnyEvent::Util::guard {
231 0 0         $handle->destroy if $handle;
232 0           };
233              
234 0           $on_connect->();
235             }
236 0 0         );
    0          
237             }
238              
239 0           return $self;
240             }
241              
242             1;
243             __END__