File Coverage

blib/lib/AnyEvent/MtGox/Stream.pm
Criterion Covered Total %
statement 33 90 36.6
branch 0 30 0.0
condition 0 17 0.0
subroutine 11 17 64.7
pod 1 1 100.0
total 45 155 29.0


line stmt bran cond sub pod time code
1             package AnyEvent::MtGox::Stream;
2              
3 1     1   54214 use strict;
  1         4  
  1         41  
4 1     1   6 use warnings;
  1         2  
  1         36  
5              
6 1     1   1868 use AnyEvent;
  1         12878  
  1         82  
7 1     1   1330 use AnyEvent::HTTP qw(http_post);
  1         55043  
  1         113  
8 1     1   11 use AnyEvent::Util qw(guard);
  1         3  
  1         49  
9 1     1   7 use Carp qw(croak);
  1         3  
  1         53  
10 1     1   7 use Errno qw(EPIPE);
  1         1  
  1         42  
11 1     1   1148 use JSON ();
  1         21927  
  1         30  
12 1     1   1162 use Protocol::WebSocket::Frame;
  1         355432  
  1         44  
13 1     1   1564 use Protocol::WebSocket::Handshake::Client;
  1         89308  
  1         32  
14 1     1   12366 use URI;
  1         25380  
  1         5249  
15              
16             our $VERSION = '0.02';
17             $VERSION = eval $VERSION;
18              
19             sub new {
20 0     0 1   my ($class, %params) = @_;
21              
22 0   0       my $secure = $params{secure} || 0;
23 0   0 0     my $on_disconnect = $params{on_disconnect} || sub { croak 'Disconnected' };
  0            
24 0   0 0     my $on_error = $params{on_error} || sub { croak @_ };
  0            
25 0   0 0     my $on_message = $params{on_message} || sub { };
  0            
26              
27 0           my $server = 'socketio.mtgox.com';
28 0           my $handle;
29              
30             # Socket.IO handshake.
31 0           my $uri = URI->new("http://$server/socket.io/1");
32 0 0         $uri->scheme('https') if $secure;
33 0           AE::log debug => "Making Socket.IO handshake to $uri";
34             http_post $uri, undef, sub {
35 0     0     my ($body, $headers) = @_;
36 0 0 0       return $on_error->('Socket.IO handshake failed')
37             unless '200' eq $headers->{Status} and defined $body;
38 0           my ($sid, $heartbeat) = split ':', $body, 3;
39 0           AE::log debug => "Socket.IO handshake succeeded: $sid";
40              
41 0           my $timer;
42             $handle = AnyEvent::Handle->new(
43             connect => [ $uri->host, $uri->port ],
44             tls => $secure ? 'connect' : undef,
45             on_error => sub {
46 0           my ($handle, $fatal, $msg) = @_;
47 0           $handle->destroy;
48 0           undef $timer;
49 0 0         $!{EPIPE} == $! ? $on_disconnect->() : $on_error->($msg);
50             },
51 0 0         );
52              
53             # WebSocket handshake.
54 0           $uri->path("/socket.io/1/websocket/$sid");
55 0 0         $uri->scheme($secure ? 'wss' : 'ws');
56 0           AE::log debug => "Making WebSocket handshake to $uri";
57 0           my $wsh = Protocol::WebSocket::Handshake::Client->new(url => "$uri");
58 0           $handle->push_write($wsh->to_string);
59              
60             $handle->push_read(sub {
61 0 0         $wsh->parse($handle->rbuf)
62             or return 1, $on_error->($wsh->error);
63 0 0         if ($wsh->is_done) {
64 0           AE::log debug => 'WebSocket handshake succeeded';
65 0           return 1;
66             }
67 0           });
68              
69 0           my $frame = Protocol::WebSocket::Frame->new;
70             my $send_message = sub {
71 0           $handle->push_write($frame->new(@_)->to_bytes);
72 0           };
73             my $send_heartbeat = sub {
74 0           AE::log debug => 'Sending heartbeat';
75 0           $send_message->('2::');
76 0           };
77              
78 0 0         $timer = AE::timer $heartbeat - 2, $heartbeat - 2, $send_heartbeat
79             if $heartbeat;
80              
81 0           my $json = JSON->new;
82              
83             $handle->push_read(sub {
84 0           $frame->append($handle->rbuf);
85 0           while (defined(my $msg = $frame->next)) {
86 0           my ($type, $id, $endpoint, $data) = split ':', $msg, 4;
87 0 0 0       if ('4' eq $type and '/mtgox' eq $endpoint) {
    0 0        
    0          
    0          
    0          
88 0 0         if (my $scalar = eval { $json->decode($data) }) {
  0            
89 0           $on_message->($scalar);
90             }
91             }
92             # Respond to heartbeats only if a heartbeat timeout wasn't
93             # given in the handshake.
94             elsif ('2' eq $type and not $heartbeat) {
95 0           $send_heartbeat->();
96             }
97 0           elsif ('1::' eq $msg) { $send_message->('1::/mtgox') }
98 0           elsif ('0' eq $type) { return 1, $on_disconnect->() }
99 0           elsif ('7' eq $type) { return 1, $on_error->($data) }
100              
101             # Unhandled message types:
102             # 3: message
103             # 5: event
104             # 6: ack
105             # 8: noop
106             }
107 0           });
108 0           };
109              
110 0 0         return unless defined wantarray;
111 0     0     return guard { $handle->destroy };
  0            
112             }
113              
114             1;
115              
116             __END__