File Coverage

blib/lib/Net/Async/Statsd/Server.pm
Criterion Covered Total %
statement 53 56 94.6
branch 5 8 62.5
condition 6 12 50.0
subroutine 17 19 89.4
pod 10 10 100.0
total 91 105 86.6


line stmt bran cond sub pod time code
1             package Net::Async::Statsd::Server;
2             $Net::Async::Statsd::Server::VERSION = '0.005';
3 2     2   55669 use strict;
  2         3  
  2         45  
4 2     2   7 use warnings;
  2         1  
  2         43  
5              
6 2     2   364 use parent qw(IO::Async::Notifier);
  2         213  
  2         8  
7              
8             =head1 NAME
9              
10             Net::Async::Statsd::Server - asynchronous server for Etsy's statsd protocol
11              
12             =head1 VERSION
13              
14             version 0.004
15              
16             =head1 SYNOPSIS
17              
18             use Future;
19             use IO::Async::Loop;
20             use Net::Async::Statsd::Server;
21             my $loop = IO::Async::Loop->new;
22             $loop->add(my $statsd = Net::Async::Statsd::Server->new(
23             port => 3001,
24             ));
25             $statsd->bus->subscribe_to_event(
26             count => sub {
27             my ($ev, $k, $delta, $type) = @_;
28             }
29             );
30              
31             =head1 DESCRIPTION
32              
33             Provides an asynchronous server for the statsd API.
34              
35             =cut
36              
37 2     2   11485 use curry;
  2         300  
  2         48  
38 2     2   10 use Socket qw(SOCK_DGRAM);
  2         2  
  2         100  
39 2     2   15 use IO::Socket::IP;
  2         2  
  2         15  
40 2     2   1774 use IO::Async::Socket;
  2         9497  
  2         62  
41              
42 2     2   740 use Net::Async::Statsd::Bus;
  2         4  
  2         1027  
43              
44             =head1 METHODS
45              
46             All public methods return a L indicating when the write has completed.
47             Since writes are UDP packets, there is no guarantee that the remote will
48             receive the value, so this is mostly intended as a way to detect when
49             statsd writes are slow.
50              
51             =cut
52              
53             =head2 host
54              
55             Which host to listen on. Probably want '0.0.0.0' (set via L)
56             here if you want to listen on all addresses.
57              
58             =cut
59              
60 0     0 1 0 sub host { shift->{host} }
61              
62             =head2 port
63              
64             The UDP port we'll accept traffic on. Use L to set it.
65              
66             =cut
67              
68 8     8 1 4637 sub port { shift->{port} }
69              
70             =head2 configure
71              
72             Used for setting values.
73              
74             =cut
75              
76             sub configure {
77 2     2 1 6892 my ($self, %args) = @_;
78 2         8 for (qw(port host)) {
79 4 100       21 $self->{$_} = delete $args{$_} if exists $args{$_};
80             }
81 2         14 $self->SUPER::configure(%args);
82             }
83              
84             =head2 listening
85              
86             Resolves with the port number when the UDP server is listening.
87              
88             =cut
89              
90             sub listening {
91 2     2 1 1868 my ($self) = @_;
92 2   33     15 $self->{listening} ||= do {
93 2         6 $self->listen
94             }
95             }
96              
97             =head2 listen
98              
99             Establishes the underlying UDP socket.
100              
101             =cut
102              
103             sub listen {
104 2     2 1 3 my ($self) = @_;
105              
106 2         7 my $f = $self->loop->new_future;
107 2 50 50     1614 my $sock = IO::Socket::IP->new(
108             Proto => 'udp',
109             ReuseAddr => 1,
110             Type => SOCK_DGRAM,
111             LocalPort => $self->port // 0,
112             Listen => $self->listen_backlog,
113             Blocking => 0,
114             ) or die "No bind: $@\n";
115 2         1311 $self->{port} = $sock->sockport;
116 2         109 my $ias = IO::Async::Socket->new(
117             handle => $sock,
118             on_recv => $self->curry::on_recv,
119             on_recv_error => $self->curry::on_recv_error,
120             );
121 2         373 $self->add_child($ias);
122 2         308 $f->done($self->port);
123             }
124              
125             =head2 bus
126              
127             Returns the L instance for this server.
128              
129             This object exists purely for the purpose of dispatching events.
130              
131             =cut
132              
133 12   66 12 1 847 sub bus { shift->{bus} ||= Net::Async::Statsd::Bus->new }
134              
135             =head2 listen_backlog
136              
137             Default listen backlog. Immutable, set to 4096 for no particular reason.
138              
139             =cut
140              
141 2     2 1 17 sub listen_backlog { 4096 }
142              
143             {
144             my %type = (
145             ms => 'timing',
146             c => 'count',
147             g => 'gauge',
148             );
149              
150             =head2 type_for_char
151              
152             Badly-named lookup method - returns the type matching the given characters.
153              
154             =cut
155              
156             sub type_for_char {
157 6     6 1 9 my ($self, $char) = @_;
158 6 50       14 die "no character?" unless defined $char;
159 6         20 return $type{$char};
160             }
161             }
162              
163             =head2 on_recv
164              
165             Called if we receive data.
166              
167             =cut
168              
169             sub on_recv {
170 6     6 1 2124 my ($self, undef, $dgram, $addr) = @_;
171             $self->loop->resolver->getnameinfo(
172             addr => $addr,
173             numeric => 1,
174             dgram => 1,
175             )->on_done(sub {
176 6     6   25415 my ($host, $port) = @_;
177 6         29 $self->debug_printf("UDP packet received from %s", join ':', $host, $port);
178 6 50       54 my ($k, $v, $type_char, $rate) = $dgram =~ /^([^:]+):([^|]+)\|([^|]+)(?:\|\@(.+))?/ or warn "Invalid dgram: $dgram";
179 6   50     24 $rate ||= 1;
180 6   50     13 my $type = $self->type_for_char($type_char) // 'unknown';
181 6         15 $self->bus->invoke_event(
182             $type => ($k, $v, $rate, $host, $port)
183             );
184 6         5147 $self->debug_printf(
185             "dgram %s from %s: %s => %s (%s)",
186             $dgram,
187             join(':', $host, $port),
188             $k,
189             $v,
190             $type
191             );
192 6         17 });
193             }
194              
195             =head2 on_recv_error
196              
197             Called if we had an error while receiving.
198              
199             =cut
200              
201             sub on_recv_error {
202 0     0 1   my ($self, undef, $err) = @_;
203 0           $self->debug_printf("UDP packet receive error: %s", $err);
204             }
205              
206             1;
207              
208             __END__