File Coverage

lib/Finance/Alpaca/DataStream.pm
Criterion Covered Total %
statement 57 72 79.1
branch 11 18 61.1
condition n/a
subroutine 13 15 86.6
pod 2 3 66.6
total 83 108 76.8


line stmt bran cond sub pod time code
1             package Finance::Alpaca::DataStream 0.9904 {
2 17     17   108 use strictures 2;
  17         137  
  17         669  
3 17     17   3509 use Moo;
  17         32  
  17         120  
4 17     17   5443 use feature 'signatures';
  17         39  
  17         1782  
5 17     17   101 no warnings 'experimental::signatures';
  17         37  
  17         871  
6 17     17   98 use Types::Standard qw[ArrayRef CodeRef Dict Enum InstanceOf Str];
  17         45  
  17         170  
7 17     17   20684 use Mojo::Promise;
  17         35  
  17         289  
8             #
9 17     17   521 use lib './lib/';
  17         29  
  17         88  
10 17     17   8029 use Finance::Alpaca::Struct::Bar qw[to_Bar Bar];
  17         50  
  17         207  
11 17     17   18263 use Finance::Alpaca::Struct::Trade qw[to_Trade Trade];
  17         55  
  17         227  
12 17     17   16512 use Finance::Alpaca::Struct::Quote qw[to_Quote Quote];
  17         47  
  17         170  
13             #
14             has source => ( is => 'ro', isa => Enum [ 'iex', 'sip' ], required => 1, default => 'iex' );
15             has tx => ( is => 'rwp', isa => InstanceOf ['Mojo::Transaction::WebSocket'], predicate => 1 );
16             has cb => ( is => 'ro', isa => CodeRef, required => 1 );
17             has subscriptions => (
18             is => 'rwp',
19             isa => Dict [
20             bars => ArrayRef [Str], dailyBars => ArrayRef [Str], quotes => ArrayRef [Str],
21             trades => ArrayRef [Str]
22             ],
23             default => sub { { bars => [], quotes => [], trades => [] } },
24             lazy => 1
25             );
26              
27 1     1 0 14 sub authorize ( $s, $ua, $keys ) {
  1         2  
  1         2  
  1         2  
  1         1  
28 1         3 $ua->websocket_p( 'wss://stream.data.alpaca.markets/v2/'
29             . $s->source => { 'Sec-WebSocket-Extensions' => 'permessage-deflate' } )->then(
30 1     1   2 sub ($tx) {
  1         353743  
31 1         8 my $promise = Mojo::Promise->new;
32 1         76 $s->_set_tx($tx);
33              
34             #$tx->on( finish => sub { $promise->resolve } );
35             # my $promise = Mojo::Promise->new;
36             #$tx->on( finish => sub { $promise->resolve } );
37             $tx->on(
38 4         9 json => sub ( $tx, $msgs ) {
39 4         10 for my $msg (@$msgs) {
40              
41 4 100       36 if ( $msg->{T} eq 'success' ) {
    50          
    50          
    100          
    50          
    50          
42 2 100       10 if ( $msg->{msg} eq 'connected' ) { # Send auth
    50          
43 1         12 $tx->send(
44             {
45             json => {
46             action => 'auth',
47             key => $keys->[0],
48             secret => $keys->[1]
49             }
50             }
51             );
52             }
53             elsif ( $msg->{msg} eq 'authenticated' ) {
54 1         9 $promise->resolve;
55             }
56             }
57             elsif ( $msg->{T} eq 'error' ) {
58 0         0 $s->cb->($msg);
59 0 0       0 if ( $msg->{code} eq 406 ) { # Already connected; ignore
60              
61             # Send auth
62 0         0 $tx->send(
63             {
64             json => {
65             action => 'auth',
66             key => $s->keys->[0],
67             secret => $s->keys->[1]
68             }
69             }
70             );
71             }
72             }
73             elsif ( $msg->{T} eq 't' ) {
74 0         0 $s->cb->( to_Trade($msg) );
75             }
76             elsif ( $msg->{T} eq 'q' ) {
77 1         8 $s->cb->( to_Quote($msg) );
78             }
79             elsif ( $msg->{T} eq 'b' ) {
80 0         0 $s->cb->( to_Bar($msg) );
81             }
82             elsif ( $msg->{T} eq 'subscription' ) {
83 1         3 delete $msg->{T};
84 1         41 $s->_set_subscriptions($msg);
85             }
86             else {
87             #warn 'unknown data';
88             #...;
89 0         0 $s->cb->($msg);
90             }
91             }
92              
93             #$tx->finish;
94             }
95 1         49 );
96 1         7 return $promise;
97             }
98 0         0 )->catch(
99 0     0   0 sub ($err) {
  0         0  
100 0         0 warn "WebSocket error: $err";
101             }
102 1         10 );
103             }
104              
105 1     1 1 33 sub subscribe ( $s, %params ) {
  1         2  
  1         4  
  1         2  
106 1         14 $s->tx->send( { json => { action => 'subscribe', %params } } );
107             }
108              
109 0     0 1   sub unsubscribe ( $s, %params ) {
  0            
  0            
  0            
110 0           $s->tx->send( { json => { action => 'unsubscribe', %params } } );
111             }
112             }
113             1;
114             __END__