File Coverage

blib/lib/Net/NATS/Streaming/Client.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Net::NATS::Streaming::Client;
2 1     1   51495 use strict;
  1         2  
  1         26  
3 1     1   4 use warnings;
  1         3  
  1         24  
4 1     1   5 use Scalar::Util 'blessed';
  1         2  
  1         36  
5 1     1   214 use UUID;
  0            
  0            
6             use Net::NATS::Streaming::PB;
7             use base 'Net::NATS::Client';
8             use Class::XSAccessor {
9             accessors => [
10             'clusterID',
11             'clientID',
12             'connect_request',
13             'connect_response',
14             'cluster_discover_subject',
15             'heartbeats_seen',
16             'heartbeat_subscription'
17             ],
18             };
19              
20             our $VERSION = '0.03';
21             our $CLUSTER_ID = 'test-cluster';
22             our $CLUSTER_DISCOVER_SUBJECT = '_STAN.discover';
23             our $ACK_WAIT = 30;
24             our $MAX_INFLIGHT = 1024;
25             our $CONNECT_TIMEOUT = 2;
26             our $MAX_PUB_ACKS_INFLIGHT = 16384;
27             our $WAIT_FOR_OP_TIMEOUT = 120;
28             our $PUB_NOACK = 0;
29              
30             sub uuidgen
31             {
32             UUID::generate(my $uuid);
33             UUID::unparse($uuid, my $string);
34             return $string;
35             }
36              
37             sub connect
38             {
39             my $self = shift;
40             return unless $self->SUPER::connect(@_);
41             return $self->connect_stream;
42             }
43              
44             sub connect_stream
45             {
46             my $self = shift;
47             $self->heartbeats_seen(0);
48             my $connect_request = Net::NATS::Streaming::PB::ConnectRequest->new({
49             clientID => $self->clientID//uuidgen , heartbeatInbox => $self->new_inbox
50             });
51             my $connect_response;
52             $self->cluster_discover_subject($CLUSTER_DISCOVER_SUBJECT) unless defined $self->cluster_discover_subject;
53             $self->clusterID($CLUSTER_ID) unless defined $self->clusterID;
54              
55             $self->request(
56             $self->cluster_discover_subject.'.'.$self->clusterID,
57             $connect_request->pack,
58             sub {
59             $connect_response = Net::NATS::Streaming::PB::ConnectResponse->new(shift->data);
60             }
61             );
62             $self->wait_for_op($CONNECT_TIMEOUT);
63             if(not $connect_response)
64             {
65             return;
66             }
67             $self->heartbeat_subscription($self->subscribe(
68             $connect_request->heartbeatInbox,
69             sub {
70             $self->heartbeats_seen($self->heartbeats_seen + 1);
71             $self->publish(shift->reply_to, "");
72             }
73             ));
74             $self->connect_request($connect_request);
75             $self->connect_response($connect_response);
76             return 1;
77             }
78              
79             sub _handle_subscription_response
80             {
81             my ($self, $subject, $request) = @_;
82             my $subscription_response;
83             $self->request(
84             $subject,
85             $request->pack,
86             sub {
87             $subscription_response = Net::NATS::Streaming::PB::SubscriptionResponse->new(shift->data);
88             }
89             );
90             my $heartbeats_seen = $self->heartbeats_seen;
91             $self->run(sub { return 1 if defined $subscription_response or ($self->heartbeats_seen > $heartbeats_seen + 1) });
92             if(not $subscription_response or $subscription_response->error)
93             {
94             return $subscription_response ? $subscription_response->error : 'did not receive response from server';
95             }
96             return $subscription_response;
97             }
98              
99             sub subscribe_channel
100             {
101             my ($self, $params, $sub, $manual_ack) = @_;
102             my $subscription_request = Net::NATS::Streaming::PB::SubscriptionRequest->new;
103             $subscription_request->copy_from({
104             maxInFlight => $MAX_INFLIGHT,
105             ackWaitInSecs => $ACK_WAIT,
106             startPosition => exists $params->{durableName}
107             ? Net::NATS::Streaming::PB::StartPosition::LastReceived
108             : Net::NATS::Streaming::PB::StartPosition::NewOnly,
109             %{ blessed $params ? $params->to_hashref : $params }
110             });
111             my $inbox = $self->new_inbox();
112             $subscription_request->set_inbox($inbox);
113             $subscription_request->set_clientID($self->connect_request->clientID);
114             my $subscription_response = $self->_handle_subscription_response($self->connect_response->subRequests, $subscription_request);
115             return $subscription_response if not blessed $subscription_response;
116             my $ackInbox = $subscription_response->ackInbox;
117             my $durableName = $subscription_request->durableName;
118             my $subject = $subscription_request->subject;
119             return $self->subscribe($inbox, sub {
120             return Net::NATS::Streaming::PB::UnsubscribeRequest->new({
121             inbox => $ackInbox,
122             durableName => $durableName,
123             subject => $subject,
124             clientID => $self->connect_request->clientID
125             }) unless @_;
126             my $msg = Net::NATS::Streaming::PB::MsgProto->new(shift->data);
127             my $ack = Net::NATS::Streaming::PB::Ack->new({
128             subject => $msg->subject,
129             sequence => $msg->sequence
130             });
131             $sub->($msg, $ackInbox);
132             $self->publish($ackInbox, $ack->pack) if not $manual_ack;
133             });
134             }
135              
136             sub ack_msg
137             {
138             my ($self, $msg, $ackInbox) = @_;
139             my $ack = Net::NATS::Streaming::PB::Ack->new({
140             subject => $msg->subject,
141             sequence => $msg->sequence
142             });
143             $self->publish($ackInbox, $ack->pack);
144             }
145              
146             sub unsubscribe_channel
147             {
148             my ($self, $subscription) = @_;
149             my $unsubscribe_request = $subscription->callback->();
150             $self->unsubscribe($subscription);
151             return $self->_handle_subscription_response($self->connect_response->unsubRequests, $unsubscribe_request);
152             }
153              
154             my %pub_ack_handlers;
155             our %guids_in_flight;
156             my $default_ack_handler = sub {
157             my $PubAck = Net::NATS::Streaming::PB::PubAck->new(shift->data);
158             if($PubAck->error)
159             {
160             $guids_in_flight{ $PubAck->guid } = $PubAck->error;
161             return;
162             }
163             delete $guids_in_flight{ $PubAck->guid };
164             };
165              
166             sub publish_channel
167             {
168             my ($self, $params, $sub) = @_;
169             my $pub_msg = Net::NATS::Streaming::PB::PubMsg->new({
170             (exists $params->{guid} ? () : (guid => uuidgen())),
171             clientID => $self->connect_request->clientID,
172             %{ blessed $params ? $params->to_hashref : $params },
173             });
174             if($PUB_NOACK)
175             {
176             return $self->publish(
177             $self->connect_response->pubPrefix.'.'.$pub_msg->subject,
178             $pub_msg->pack
179             );
180             }
181             if($sub and not exists $pub_ack_handlers{ $sub })
182             {
183             my $inbox = $self->new_inbox();
184             my $subsciption = $self->subscribe($inbox, sub {
185             my $PubAck = Net::NATS::Streaming::PB::PubAck->new(shift->data);
186             $sub->($PubAck);
187             delete $guids_in_flight{ $PubAck->guid } if not $PubAck->error;
188             });
189             $pub_ack_handlers{$sub} = [$subsciption, $inbox];
190             }
191             elsif(not $sub and not exists $pub_ack_handlers{ $default_ack_handler })
192             {
193             my $inbox = $self->new_inbox();
194             my $subsciption = $self->subscribe($inbox, $default_ack_handler);
195             $pub_ack_handlers{$default_ack_handler} = [$subsciption, $inbox];
196             }
197             if(keys %guids_in_flight > $MAX_PUB_ACKS_INFLIGHT)
198             {
199             return "publish_channel error: too many outstanding Acks";
200             }
201             else
202             {
203             $guids_in_flight{ $pub_msg->guid } = 'no ack';
204             }
205             $self->publish(
206             $self->connect_response->pubPrefix.'.'.$pub_msg->subject,
207             $pub_msg->pack,
208             $sub ? $pub_ack_handlers{$sub}[1] : $pub_ack_handlers{$default_ack_handler}[1]
209             );
210             if(not $sub)
211             {
212             my $heartbeats_seen = $self->heartbeats_seen;
213             $self->run(sub {
214             return 1 if not exists $guids_in_flight{ $pub_msg->guid } or $guids_in_flight{ $pub_msg->guid } ne 'no ack';
215             return 1 if $self->heartbeats_seen > $heartbeats_seen + 1;
216             });
217             if(exists $guids_in_flight{ $pub_msg->guid })
218             {
219             return "publish_channel error: can't publish "
220             .$pub_msg->guid
221             .' :'
222             .$guids_in_flight{ $pub_msg->guid };
223             }
224             }
225             return;
226             }
227              
228             sub close_stream
229             {
230             my $self = shift;
231             my $close_response;
232             $self->request(
233             $self->connect_response->closeRequests,
234             Net::NATS::Streaming::PB::CloseRequest->new({
235             clientID => $self->connect_request->clientID
236             })->pack,
237             sub { $close_response = Net::NATS::Streaming::PB::CloseResponse->new(shift->data); }
238             );
239             while($self->wait_for_op($CONNECT_TIMEOUT))
240             {
241             last if defined $close_response;
242             }
243             for my $sub (keys %pub_ack_handlers)
244             {
245             $pub_ack_handlers{$sub}[0]->unsubscribe;
246             delete $pub_ack_handlers{$sub};
247             }
248             $self->heartbeat_subscription->unsubscribe;
249             $self->heartbeat_subscription(undef);
250             return $close_response ? $close_response->error : 'failed to close stream';
251             }
252              
253             sub close
254             {
255             my $self = shift;
256             my $ret = $self->close_stream;
257             $self->SUPER::close();
258             return $ret;
259             }
260              
261             sub run
262             {
263             my ($self, $sub, $timeout) = @_;
264             while($self->wait_for_op($timeout//$WAIT_FOR_OP_TIMEOUT))
265             {
266             last if $sub and $sub->();
267             }
268             }
269              
270             1;
271              
272             __END__