File Coverage

blib/lib/Net/STOMP/Client/Wrapper.pm
Criterion Covered Total %
statement 18 94 19.1
branch 0 50 0.0
condition 0 7 0.0
subroutine 6 25 24.0
pod 19 19 100.0
total 43 195 22.0


line stmt bran cond sub pod time code
1             package Net::STOMP::Client::Wrapper;
2 1     1   109344 use strict;
  1         2  
  1         36  
3 1     1   42 use warnings;
  1         3  
  1         58  
4 1     1   17 use base qw{Package::New};
  1         8  
  1         645  
5 1     1   1139 use Net::STOMP::Client;
  1         112680  
  1         38  
6 1     1   672 use Net::RabbitMQ::Management::API;
  1         82739  
  1         36  
7 1     1   424 use URL::Encode qw{url_encode};
  1         3939  
  1         914  
8              
9             our $VERSION = '0.03';
10              
11             =head1 NAME
12              
13             Net::STOMP::Client::Wrapper - Stomp Client and RabbitMQ Management API wrapper
14              
15             =head1 SYNOPSIS
16              
17             Producer
18              
19             use Net::STOMP::Client::Wrapper;
20             my $wrapper = Net::STOMP::Client::Wrapper->new(queue_name=>"my_queue"); #ISA Net::STOMP::Client::Wrapper
21             my $stomp = $wrapper->stomp_connect; #ISA Net::STOMP::Client connected
22             $wrapper->send(body=>"my_payload");
23              
24             Consumer
25              
26             use Net::STOMP::Client::Wrapper;
27             my $wrapper = Net::STOMP::Client::Wrapper->new(queue_name=>"my_queue"); #ISA Net::STOMP::Client::Wrapper
28             my $stomp = $wrapper->stomp_connect_subscribe; #ISA Net::STOMP::Client subscribed to queue
29             $stomp->wait_for_frames(callback => \&queue_callback);
30              
31             Monitor
32              
33             use Net::STOMP::Client::Wrapper;
34             my $wrapper = Net::STOMP::Client::Wrapper->new(queue_name=>"my_queue"); #ISA Net::STOMP::Client::Wrapper
35             my $result = $wrapper->management_api_get_queue; #ISA Net::RabbitMQ::Management::API::Result
36             my $content = $result->content; #ISA HASH
37             my $consumers = $content->{'consumers'} || 0;
38             my $messages = $content->{'messages'} || 0;
39             printf "Consumers: %s, Messages: %s\n", $consumers, $messages;
40              
41             Super Class
42              
43             package My::Wrapper;
44             use base qw{Net::STOMP::Client::Wrapper};
45             sub host {"my_host"};
46             sub queue_name {"my_queue"};
47              
48             =head1 DESCRIPTION
49              
50             Net::STOMP::Client::Wrapper is a wrapper of L and L with sane defaults.
51              
52             This package is a wrapper for my typical use case which is a single RabbitMQ server with the Stomp and Management API plugins enabled and a single queue_name.
53              
54             sudo yum install rabbitmq-server
55             sudo /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp
56             sudo /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
57             sudo systemctl enable rabbitmq-server
58             sudo systemctl start rabbitmq-server
59              
60             =head1 Properties
61              
62             =head2 host
63              
64             Default: 127.0.0.1
65              
66             =cut
67              
68             sub host {
69 0     0 1   my $self = shift;
70 0 0         $self->{'host'} = shift if @_;
71 0 0         $self->{'host'} = '127.0.0.1' unless $self->{'host'};
72 0           return $self->{'host'};
73             }
74              
75             =head2 port
76              
77             Default: 61613
78              
79             =cut
80              
81             sub port {
82 0     0 1   my $self = shift;
83 0 0         $self->{'port'} = shift if @_;
84 0 0         $self->{'port'} = '61613' unless $self->{'port'};
85 0           return $self->{'port'};
86             }
87              
88             =head2 login
89              
90             Default: guest
91              
92             =cut
93              
94             sub login {
95 0     0 1   my $self = shift;
96 0 0         $self->{'login'} = shift if @_;
97 0 0         $self->{'login'} = 'guest' unless $self->{'login'};
98 0           return $self->{'login'};
99             }
100              
101             =head2 passcode
102              
103             Default: guest
104              
105             =cut
106              
107             sub passcode {
108 0     0 1   my $self = shift;
109 0 0         $self->{'passcode'} = shift if @_;
110 0 0         $self->{'passcode'} = 'guest' unless $self->{'passcode'};
111 0           return $self->{'passcode'};
112             }
113              
114             =head2 vhost, vhost_url_encoded
115              
116             Default: /
117              
118             =cut
119              
120             sub vhost {
121 0     0 1   my $self = shift;
122 0 0         $self->{'vhost'} = shift if @_;
123 0 0         $self->{'vhost'} = '/' unless $self->{'vhost'};
124 0           return $self->{'vhost'};
125             }
126              
127 0     0 1   sub vhost_url_encoded {url_encode(shift->vhost)};
128              
129             =head2 queue_name, destination
130              
131             Returns the short queue_name or the formatted destination.
132              
133             $wrapper->queue_name("my_queue")
134             my $queue_name = $wrapper->queue_name;
135             my $destination = $wrapper->destination; #ISA string formatted as "/queue/{queue_name}"
136              
137             Default: ''
138              
139             =cut
140              
141             sub queue_name {
142 0     0 1   my $self = shift;
143 0 0         $self->{'queue_name'} = shift if @_;
144 0 0         $self->{'queue_name'} = '' unless defined $self->{'queue_name'};
145 0           return $self->{'queue_name'};
146             }
147              
148 0     0 1   sub destination {join('/', '/queue', shift->queue_name)};
149              
150             =head2 subscribe_id
151              
152             Default: {uuid}
153              
154             =cut
155              
156             sub subscribe_id {
157 0     0 1   my $self = shift;
158 0 0         $self->{'subscribe_id'} = shift if @_;
159 0 0         $self->{'subscribe_id'} = $self->stomp->uuid unless $self->{'subscribe_id'};
160 0           return $self->{'subscribe_id'};
161             }
162              
163             =head2 subscribe_ack
164              
165             Default: client
166              
167             =cut
168              
169             sub subscribe_ack {
170 0     0 1   my $self = shift;
171 0 0         $self->{'subscribe_ack'} = shift if @_;
172 0 0         $self->{'subscribe_ack'} = 'client' unless $self->{'subscribe_ack'};
173 0           return $self->{'subscribe_ack'};
174             }
175              
176             =head2 subscribe_prefetch_count
177              
178             Default: 1
179              
180             =cut
181              
182             sub subscribe_prefetch_count {
183 0     0 1   my $self = shift;
184 0 0         $self->{'subscribe_prefetch_count'} = shift if @_;
185 0 0         $self->{'subscribe_prefetch_count'} = 1 unless $self->{'subscribe_prefetch_count'};
186 0           return $self->{'subscribe_prefetch_count'};
187             }
188              
189             our $MANAGEMENT_API_PROTOCOL = 'http';
190             our $MANAGEMENT_API_PORT = '15672';
191             our $MANAGEMENT_API_PATH = '/api';
192              
193             =head2 management_api_url
194              
195             Default: http://{host}:15672/api
196              
197             =cut
198              
199             sub management_api_url {
200 0     0 1   my $self = shift;
201 0 0         $self->{'management_api_url'} = shift if @_;
202 0 0         $self->{'management_api_url'} = sprintf('%s://%s:%s%s', $MANAGEMENT_API_PROTOCOL, $self->host, $MANAGEMENT_API_PORT, $MANAGEMENT_API_PATH) unless $self->{'management_api_url'};
203 0           return $self->{'management_api_url'};
204             }
205              
206             =head1 Methods
207              
208             =head2 send
209              
210             Wrapper around `stomp->send` with default destination
211              
212             $wrapper->send(body=>"my_string"); #destination is defaulted to $wrapper->destination;
213             $wrapper->send(destination=>"/queue/another_queue", body=>"my_string");
214              
215             Note: stomp must be connected before calling send.
216              
217             =cut
218              
219             sub send {
220 0     0 1   my $self = shift;
221 0           my %data = @_;
222 0   0       $data{'destination'} ||= $self->destination;
223 0           return $self->stomp->send(%data);
224             }
225              
226             =head2 management_api_get_queue
227              
228             Returns a L object
229              
230             =cut
231              
232             sub management_api_get_queue {
233 0     0 1   my $self = shift;
234 0           return $self->management_api->get_queue(name => $self->queue_name, vhost => $self->vhost_url_encoded); #ISA Net::RabbitMQ::Management::API::Result
235             }
236              
237             =head1 Object Accessors
238              
239             =head2 stomp_connect_subscribe
240              
241             Returns a L object connection and subscribed to the configured queue
242              
243             my $stomp = $wrapper->stomp_connect_subscribe;
244              
245             Limitations: Only Call once!
246              
247             =cut
248              
249             sub stomp_connect_subscribe {
250 0     0 1   my $self = shift;
251 0           my $stomp = $self->stomp_connect;
252 0 0         die("Error: queue_name required") unless $self->queue_name;
253 0           my %data = (
254             'destination' => $self->destination,
255             'id' => $self->subscribe_id,
256             'ack' => $self->subscribe_ack,
257             'prefetch-count' => $self->subscribe_prefetch_count,
258             );
259 0           $stomp->subscribe(%data);
260 0   0       my $subscriptions = $self->{'__subscriptions'} ||= []; #cache for stomp_disconnect
261 0           push @$subscriptions, \%data;
262 0           return $stomp;
263             }
264              
265             =head2 stomp_connect
266              
267             Returns a connected L object.
268              
269             my $stomp = $wrapper->stomp_connect;
270              
271             Limitations: Only Call once!
272              
273             =cut
274              
275             sub stomp_connect {
276 0     0 1   my $self = shift;
277 0           my $stomp = $self->stomp;
278 0           $stomp->connect(login => $self->login, passcode => $self->passcode, host => $self->vhost);
279 0           return $stomp;
280             }
281              
282             =head2 stomp_disconnect
283              
284             Unsubscribes to any subscriptions and disconnects stomp client.
285              
286             =cut
287              
288             sub stomp_disconnect {
289 0     0 1   my $self = shift;
290 0           my $stomp = $self->stomp;
291 0   0       my $subscriptions = $self->{'__subscriptions'} || [];
292 0           while (@$subscriptions) {
293 0           my $subscription = pop @$subscriptions;
294 0           my $id = $subscription->{'id'};
295 0           $stomp->unsubscribe(id => $id);
296             }
297 0           return $stomp->disconnect;
298             }
299              
300             =head2 stomp
301              
302             Returns the cached L object
303              
304             =cut
305              
306             sub stomp {
307 0     0 1   my $self = shift;
308 0 0         $self->{'stomp'} = shift if @_;
309 0 0         $self->{'stomp'} = Net::STOMP::Client->new(host => $self->host, port => $self->port) unless $self->{'stomp'};
310 0           return $self->{'stomp'};
311             }
312              
313             =head2 management_api
314              
315             Returns a L object
316              
317             =cut
318              
319             sub management_api {
320 0     0 1   my $self = shift;
321 0 0         $self->{'management_api'} = shift if @_;
322 0 0         $self->{'management_api'} = Net::RabbitMQ::Management::API->new(url => $self->management_api_url) unless $self->{'management_api'};
323 0           return $self->{'management_api'};
324             }
325              
326             =head1 SEE ALSO
327              
328             L, L
329              
330             =head1 AUTHOR
331              
332             Michael R. Davis
333              
334             =head1 COPYRIGHT AND LICENSE
335              
336             Copyright (C) 2025 by Michael Davis
337              
338             LICENSE: MIT
339              
340             =cut
341              
342             1;