File Coverage

blib/lib/Net/NATS/Streaming/Client.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package Net::NATS::Streaming::Client;
2 1     1   62241 use strict;
  1         2  
  1         30  
3 1     1   5 use warnings;
  1         1  
  1         42  
4 1     1   6 use Carp 'confess';
  1         2  
  1         56  
5 1     1   5 use Scalar::Util 'blessed';
  1         2  
  1         45  
6 1     1   113 use OSSP::uuid;
  0            
  0            
7             use Net::NATS::Streaming::PB;
8             use base 'Net::NATS::Client';
9             use Class::XSAccessor {
10             accessors => [
11             'cluster_name',
12             'clientID',
13             'connect_request',
14             'connect_response',
15             'cluster_discover_subject'
16             ],
17             };
18              
19             our $VERSION = '0.01';
20             our $DEFAULT_CLUSTER_NAME = 'test-cluster';
21             our $DEFAULT_CLUSTER_DISCOVER_SUBJECT = '_STAN.discover';
22             our $DEFAULT_ACK_WAIT = 30;
23             our $DEFAULT_MAX_INFLIGHT = 1024;
24             our $DEFAULT_CONNECT_TIMEOUT = 2;
25             our $DEFAULT_MAX_PUB_ACKS_INFLIGHT = 16384;
26             sub uuidgen {
27             OSSP::uuid::uuid_create(my $uuid);
28             OSSP::uuid::uuid_make($uuid, OSSP::uuid::UUID_MAKE_V4());
29             OSSP::uuid::uuid_export($uuid, OSSP::uuid::UUID_FMT_STR(), my($str), undef);
30             OSSP::uuid::uuid_destroy($uuid);
31             return $str;
32             }
33              
34             sub connect
35             {
36             my $self = shift;
37             return unless $self->SUPER::connect(@_);
38             my $connect_request = Net::NATS::Streaming::PB::ConnectRequest->new({
39             clientID => $self->clientID//uuidgen , heartbeatInbox => $self->new_inbox
40             });
41             $self->subscribe($connect_request->heartbeatInbox, sub { $self->publish(shift->reply_to, "") });
42             my $connect_response;
43             $self->cluster_discover_subject($DEFAULT_CLUSTER_DISCOVER_SUBJECT) unless defined $self->cluster_discover_subject;
44             $self->cluster_name($DEFAULT_CLUSTER_NAME) unless defined $self->cluster_name;
45              
46             $self->request(
47             $self->cluster_discover_subject.'.'.$self->cluster_name,
48             $connect_request->pack,
49             sub {
50             $connect_response = Net::NATS::Streaming::PB::ConnectResponse->new(shift->data);
51             }
52             );
53             $self->wait_for_op($DEFAULT_CONNECT_TIMEOUT);
54             if(not $connect_response)
55             {
56             confess("Could not connect to streaming NATS server");
57             }
58             $self->connect_request($connect_request);
59             $self->connect_response($connect_response);
60             return 1;
61             }
62              
63             sub subscribe_stream
64             {
65             my ($self, $params, $sub) = @_;
66             my $subscription_request = Net::NATS::Streaming::PB::SubscriptionRequest->new;
67             $subscription_request->copy_from({
68             maxInFlight => $DEFAULT_MAX_INFLIGHT,
69             ackWaitInSecs => $DEFAULT_ACK_WAIT,
70             %{ blessed $params ? $params->to_hashref : $params }
71             });
72             my $inbox = $self->new_inbox();
73             $subscription_request->set_inbox($inbox);
74             $subscription_request->set_clientID($self->connect_request->clientID);
75             my $subscription_response;
76             $self->request(
77             $self->connect_response->subRequests,
78             $subscription_request->pack,
79             sub {
80             $subscription_response = Net::NATS::Streaming::PB::SubscriptionResponse->new(shift->data);
81             }
82             );
83             while($self->wait_for_op)
84             {
85             last if defined $subscription_response;
86             }
87             return $subscription_response->error if $subscription_response->error;
88             my $ackInbox = $subscription_response->ackInbox;
89             my $durableName = $subscription_request->durableName;
90             my $subject = $subscription_request->subject;
91             return $self->subscribe($inbox, sub {
92             return Net::NATS::Streaming::PB::UnsubscribeRequest->new({
93             inbox => $ackInbox,
94             durableName => $durableName,
95             subject => $subject,
96             clientID => $self->connect_request->clientID
97             })->pack unless @_;
98             my $msg = Net::NATS::Streaming::PB::MsgProto->new(shift->data);
99             my $ack = Net::NATS::Streaming::PB::Ack->new({
100             subject => $msg->subject,
101             sequence => $msg->sequence
102             });
103             $sub->($msg);
104             $self->publish($ackInbox, $ack->pack);
105             });
106             }
107              
108             sub unsubscribe_stream
109             {
110             my ($self, $subscription) = @_;
111             $self->publish(
112             $self->connect_response->unsubRequests,
113             $subscription->callback->()
114             );
115             $self->unsubscribe($subscription);
116             }
117              
118             my %pub_ack_handlers;
119             my $default_ack_handler = sub {};
120             sub publish_stream
121             {
122             my ($self, $params, $sub) = @_;
123             $params = blessed $params ? $params->to_hashref : $params;
124             if($sub and not exists $pub_ack_handlers{ $sub })
125             {
126             my $inbox = $self->new_inbox();
127             $self->subscribe($inbox, sub {
128             $sub->(Net::NATS::Streaming::PB::PubAck->new(shift->data));
129             });
130             $pub_ack_handlers{$sub} = $inbox;
131             }
132             elsif(not $sub and not exists $pub_ack_handlers{ $default_ack_handler })
133             {
134             my $inbox = $self->new_inbox();
135             $self->subscribe($inbox, $default_ack_handler);
136             $pub_ack_handlers{$default_ack_handler} = $inbox;
137             }
138             my $pub_msg = Net::NATS::Streaming::PB::PubMsg->new({
139             (exists $params->{guid} ? () : (guid => uuidgen())),
140             reply => $pub_ack_handlers{$sub//$default_ack_handler},
141             clientID => $self->connect_request->clientID,
142             %{ $params },
143             });
144             $self->publish(
145             $self->connect_response->pubPrefix.'.'.$pub_msg->subject,
146             $pub_msg->pack,
147             $sub ? $pub_ack_handlers{$sub} : $pub_ack_handlers{$default_ack_handler}
148             );
149             }
150              
151             sub close_stream
152             {
153             my $self = shift;
154             my $close_response;
155             $self->request(
156             $self->connect_response->closeRequests,
157             Net::NATS::Streaming::PB::CloseRequest->new({
158             clientID => $self->connect_request->clientID
159             })->pack,
160             sub { $close_response = Net::NATS::Streaming::PB::CloseResponse->new(shift->data); }
161             );
162             $self->wait_for_op($DEFAULT_CONNECT_TIMEOUT);
163             return $close_response ? $close_response->error : 'failed to close stream';
164             }
165              
166             1;
167              
168             __END__