File Coverage

blib/lib/Finance/Bitcoin/Feed/Pusher.pm
Criterion Covered Total %
statement 82 189 43.3
branch 3 62 4.8
condition 0 13 0.0
subroutine 28 58 48.2
pod 0 30 0.0
total 113 352 32.1


line stmt bran cond sub pod time code
1             package Finance::Bitcoin::Feed::Pusher;
2              
3 1     1   395 use strict;
  1         1  
  1         28  
4 1     1   4 use warnings;
  1         1  
  1         32  
5             our $VERSION = '0.01';
6 1     1   4 use feature qw(say);
  1         1  
  1         45  
7              
8 1     1   566 use AnyEvent::Socket;
  1         22180  
  1         180  
9 1     1   732 use AnyEvent::Handle;
  1         5958  
  1         33  
10 1     1   579 use Protocol::WebSocket::Handshake::Client;
  1         26444  
  1         30  
11 1     1   8 use Protocol::WebSocket::Frame;
  1         2  
  1         13  
12              
13 1     1   638 use JSON;
  1         11046  
  1         4  
14 1     1   613 use URI;
  1         3321  
  1         25  
15 1     1   611 use Data::Dumper;
  1         4877  
  1         81  
16              
17 1         69 use constant CHANNELS => qw(
18             order_book
19             live_trades
20 1     1   10 );
  1         1  
21              
22 1     1   4 use constant ATTRIBUTES => qw(protocol app_key channels ssl);
  1         2  
  1         45  
23 1     1   5 use constant PROTOCOL => 6;
  1         0  
  1         36  
24 1     1   4 use constant APP_KEY => 'de504dc5763aeef9ff52';
  1         1  
  1         34  
25              
26             # TODO: test SSL. This didnt seem to work for me last time I set it...
27 1     1   3 use constant SSL => 0;
  1         1  
  1         33  
28              
29             # cleartext...
30 1     1   3 use constant PORT => 80;
  1         1  
  1         45  
31 1     1   6 use constant SCHEME => 'ws';
  1         2  
  1         57  
32 1     1   7 use constant TLS => undef;
  1         1  
  1         63  
33 1     1   7 use constant HOST => 'ws.pusherapp.com';
  1         1  
  1         55  
34              
35             # ssl...
36 1     1   6 use constant SSL_PORT => 443;
  1         2  
  1         61  
37 1     1   30 use constant SSL_SCHEME => 'wws';
  1         3  
  1         71  
38 1     1   7 use constant SSL_TLS => 'connect';
  1         2  
  1         57  
39 1     1   7 use constant SSL_HOST => 'wws.pusherapp.com';
  1         2  
  1         1953  
40              
41             sub VERBOSE {
42 0   0 0 0 0 return $ENV{DEBUG} || 0;
43             }
44              
45             sub DEBUG {
46 0   0 0 0 0 return $ENV{DEBUG} || 0;
47             }
48              
49             # THESE two methods: trade() and order_book() are the main methods you will want to move and rewrite into your own module.
50             # within these subroutines you will have access to the json response in a hash format.
51             sub trade {
52 0     0 0 0 my $self = shift;
53 0         0 my $data = shift;
54 0         0 warn Data::Dumper->Dump([$data]);
55 0         0 warn "*** I am the default trade()... you should overwride this method in your own package\n";
56             }
57              
58             sub order_book {
59 0     0 0 0 my $self = shift;
60 0         0 my $data = shift;
61 0         0 warn Data::Dumper->Dump([$data]);
62 0         0 warn "** I am the default order_book()... you should overwride this method in your own package\n";
63             }
64              
65             # end the methods you should definately override.
66              
67             # This module is meant to be used as a base for your own module.
68             # Your own module will decide what to do with the incoming message through the
69             # trade() and order_book() routines.
70             #
71             # You should look at "test.pl" to see a basic example.
72              
73 1     1 0 9 sub new { (bless {} => shift)->init(@_) }
74              
75             sub init {
76 1     1 0 1 my $self = shift;
77 1         3 my %args = @_;
78 1         9 foreach my $attribute ($self->attributes) {
79 4 100       14 $self->$attribute($args{$attribute}) if exists $args{$attribute};
80             }
81 1         3 return $self;
82             }
83              
84             sub setup {
85 0     0 0 0 my $self = shift;
86 0 0       0 $self->channels([CHANNELS]) unless $self->channels;
87 0 0       0 $self->protocol(PROTOCOL) unless $self->protocol;
88 0 0       0 $self->app_key(APP_KEY) unless $self->app_key;
89 0 0       0 $self->ssl(SSL) unless $self->ssl;
90             }
91              
92             sub go {
93 0     0 0 0 my $self = shift;
94 0         0 $self->setup;
95 0         0 $self->handle;
96 0         0 $self->wait;
97             }
98              
99             sub handle {
100 0     0 0 0 my $self = shift;
101 0         0 $self->client(Protocol::WebSocket::Handshake::Client->new(url => $self->uri->as_string));
102 0         0 $self->frame(Protocol::WebSocket::Frame->new);
103 0         0 $self->{handle} = AnyEvent::Handle->new(
104             connect => [$self->host, $self->port],
105             tls => $self->tls,
106             tls_ctx => {verify => 0},
107             keepalive => 1,
108             wtimeout => 50,
109             on_connect => $self->on_connect,
110             on_read => $self->on_read,
111             on_wtimeout => $self->on_wtimeout,
112             on_error => $self->on_error,
113             on_eof => $self->on_eof,
114             );
115             }
116              
117             sub on_read {
118 0     0 0 0 my $self = shift;
119             return sub {
120 0     0   0 my $handle = shift;
121 0         0 my $chunk = $handle->{rbuf};
122 0         0 $handle->{rbuf} = undef;
123 0 0       0 if (!$self->client->is_done) {
124 0         0 $self->client->parse($chunk);
125             }
126              
127 0         0 $self->frame->append($chunk);
128 0 0       0 if ($self->frame->is_ping()) {
129 0         0 $handle->push_write(
130             $self->frame->new(
131             buffer => '',
132             type => 'pong'
133             )->to_bytes
134             );
135             }
136 0         0 while (my $msg = $self->frame->next) {
137 0         0 my $d;
138 0 0       0 eval { $d = $self->json->decode($msg); } or do {
  0         0  
139 0         0 my $e = $@;
140 0         0 warn $self->now . ' - error: ' . $e;
141 0         0 next;
142             };
143              
144 0 0       0 if ($d->{event} eq 'pusher:connection_established') {
    0          
    0          
    0          
145 0 0       0 say $self->now . ' - subscribing to events' if VERBOSE;
146 0         0 foreach my $channel (@{$self->channels}) {
  0         0  
147 0 0       0 say $self->now . ' - requesting channel: ' . $channel
148             if VERBOSE;
149 0         0 $handle->push_write(
150             $self->frame->new(
151             $self->json->encode({
152             event => 'pusher:subscribe',
153             data => {
154             channel => $channel,
155             },
156             })
157             )->to_bytes
158             );
159             }
160             } elsif ($d->{event} eq 'pusher_internal:subscription_succeeded') {
161 0 0       0 printf("%s - subscribed to channel: %s\n", $self->now, $d->{channel})
162             if VERBOSE;
163             }
164              
165             elsif ($d->{event} eq 'trade') {
166 0 0       0 printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)})
  0         0  
167             if VERBOSE;
168 0 0       0 if ($d->{channel} eq 'live_trades') {
169 0         0 my $data = $self->json->decode($d->{data});
170 0         0 $self->trade($data);
171             } else {
172 0 0       0 printf "%s - got event: %s", $self->now, Dumper $d
173             if VERBOSE;
174             }
175             } elsif ($d->{event} eq 'data') {
176 0 0       0 printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)})
  0         0  
177             if VERBOSE;
178 0 0       0 if ($d->{channel} eq 'order_book') {
179 0         0 my $data = $self->json->decode($d->{data});
180 0         0 $self->order_book($data);
181             } else {
182 0 0       0 printf '%s - got event: %s', $self->now, Dumper $d
183             if VERBOSE;
184             }
185             }
186              
187             else {
188 0 0       0 printf '%s - got event: %s', $self->now, Dumper $d if VERBOSE;
189             }
190              
191             }
192             }
193 0         0 }
194              
195             sub on_connect {
196 0     0 0 0 my $self = shift;
197             return sub {
198 0     0   0 my $handle = shift;
199 0 0       0 say $self->now . ' - connected to pusher' if VERBOSE;
200 0         0 $handle->push_write($self->client->to_string);
201             }
202 0         0 }
203              
204             sub on_wtimeout {
205 0     0 0 0 my $self = shift;
206             return sub {
207 0     0   0 my $handle = shift;
208 0         0 $handle->push_write(
209             $self->frame->new(
210             buffer => '',
211             type => 'ping'
212             )->to_bytes
213             );
214             }
215 0         0 }
216              
217             sub on_error {
218 0     0 0 0 my $self = shift;
219             return sub {
220 0     0   0 my ($handle, $fatal, $msg) = @_;
221 0 0 0     0 warn $self->now . " - fatal($fatal): $msg" if VERBOSE or DEBUG;
222 0         0 $handle->destroy;
223 0         0 $self->setup;
224             }
225 0         0 }
226              
227             sub on_eof {
228 0     0 0 0 my $self = shift;
229             return sub {
230 0     0   0 my $handle = shift;
231 0 0 0     0 warn $self->now . " - lost connection, reconnecting"
232             if VERBOSE or DEBUG;
233 0         0 $self->setup;
234             }
235 0         0 }
236              
237 1     1 0 3 sub attributes { ATTRIBUTES }
238 0     0 0 0 sub wait { AnyEvent->condvar->wait }
239 0   0 0 0 0 sub json { shift->{json} ||= JSON->new }
240 0 0   0 0 0 sub host { shift->ssl ? SSL_HOST : HOST }
241 0 0   0 0 0 sub port { shift->ssl ? SSL_PORT : PORT }
242 0 0   0 0 0 sub tls { shift->ssl ? SSL_TLS : TLS }
243 0 0   0 0 0 sub scheme { shift->ssl ? SSL_SCHEME : SCHEME }
244 0     0 0 0 sub client { my $self = shift; $self->get_set(@_) }
  0         0  
245 0     0 0 0 sub frame { my $self = shift; $self->get_set(@_) }
  0         0  
246 1     1 0 2 sub channels { my $self = shift; $self->get_set(@_) }
  1         5  
247 0     0 0 0 sub protocol { my $self = shift; $self->get_set(@_) }
  0         0  
248 0     0 0 0 sub app_key { my $self = shift; $self->get_set(@_) }
  0         0  
249 0     0 0 0 sub ssl { my $self = shift; $self->get_set(@_) }
  0         0  
250              
251             sub now {
252 0     0 0 0 sprintf '%4d-%02d-%02d %02d:%02d:%02d', (localtime(time))[5] + 1900, (localtime(time))[4, 3, 2, 1, 0];
253             }
254              
255             sub get_set {
256 1     1 0 2 my $self = shift;
257 1         6 my $attribute = ((caller(1))[3] =~ /::(\w+)$/)[0];
258 1 50       34 $self->{$attribute} = shift if scalar @_;
259 1         3 return $self->{$attribute};
260             }
261              
262             sub uri {
263 0     0 0   my $self = shift;
264 0 0         unless ($self->{uri}) {
265 0           my $uri = URI->new;
266 0           $uri->scheme('http');
267 0           $uri->host($self->host);
268 0           $uri->path(sprintf '/app/%s' => $self->app_key);
269 0           $uri->query_form(protocol => $self->protocol);
270 0           $uri->scheme($self->scheme);
271 0           $self->{uri} = $uri;
272             }
273 0           return $self->{uri};
274             }
275              
276             1;
277              
278             __END__
279              
280             # Below is stub documentation for your module. You'd better edit it!
281              
282             =head1 NAME
283              
284             This module is extracted from Finance::BitStamp::Socket v0.01.
285             BitStamp::Socket - Perl extension for connecting to the BitStamp exchange
286             socket through the Pusher service.
287              
288             =head1 SYNOPSIS
289              
290             # this will dump the socket messages to the terminal...
291              
292             use BitStamp::Socket;
293             BitStamp::Socket->new->go;
294              
295             ... or just type this at the command prompt:
296              
297             $ perl -e 'use base qw(BitStamp::Socket); main->new->go'
298              
299             =======================
300             But instead do this:
301             =======================
302              
303             use base qw(BitStamp::Socket);
304             main->new->go;
305            
306             sub order_book {
307             my $self = shift;
308             my $data = shift;
309             # I just got new order book socket data
310             # ... your code goes here ... #
311             }
312              
313             sub trade {
314             my $self = shift;
315             my $data = shift;
316             # I just got new trade socket data
317             # ... your code goes here ... #
318             }
319              
320              
321             =head1 DESCRIPTION
322              
323             This module is extracted from Finance::BitStamp::Socket v0.01. Please refer to
324             L<Finance::BitStamp::Socket>
325              
326             I cannot close its constant VERBOSE, so I copied it to my package directly.
327              
328             The BitStamp socket is the fastest any most bandwidth efficient way
329             to maintain your own up to date tracking of all trades and market
330             changes.
331              
332             This module will save you some time since the connection and
333             communication negotiations are done for you. All you need to do
334             is write the code to handle the messages. For example: to store
335             into a database.
336              
337              
338             =head1 SEE ALSO
339              
340             AnyEvent::Socket, AnyEvent::Handle
341              
342             =head1 AUTHOR
343              
344             Jeff Anderson, E<lt>peawormsworth@gmail.comE<gt>
345              
346             =head1 COPYRIGHT AND LICENSE
347              
348             Copyright (C) 2014 by Jeff Anderson
349              
350             This library is free software; you can redistribute it and/or modify
351             it under the same terms as Perl itself, either Perl version 5.14.2 or,
352             at your option, any later version of Perl 5 you may have available.
353              
354              
355             =cut
356