File Coverage

blib/lib/AnyEvent/eris/Client.pm
Criterion Covered Total %
statement 45 100 45.0
branch 6 30 20.0
condition 1 10 10.0
subroutine 12 18 66.6
pod 0 5 0.0
total 64 163 39.2


line stmt bran cond sub pod time code
1             package AnyEvent::eris::Client;
2             # ABSTRACT: eris pub/sub Client
3              
4 12     12   147248 use strict;
  12         23  
  12         313  
5 12     12   52 use warnings;
  12         20  
  12         256  
6 12     12   51 use Carp;
  12         24  
  12         545  
7 12     12   59 use AnyEvent;
  12         19  
  12         181  
8 12     12   625 use AnyEvent::Handle;
  12         16355  
  12         231  
9 12     12   626 use AnyEvent::Socket;
  12         12129  
  12         1045  
10 12     12   76 use List::Util;
  12         62  
  12         578  
11 12     12   64 use Scalar::Util;
  12         20  
  12         439  
12 12     12   5315 use Parse::Syslog::Line 'parse_syslog_line';
  12         195010  
  12         9984  
13              
14             # we recognize these
15             my @PROTOCOL_LINE_PREFIXES = (
16             'Subscribe to :',
17             'Receiving ',
18             'Full feed enabled',
19             'EHLO Streamer',
20             );
21              
22             sub new {
23 3     3 0 8996 my ( $class, %opts ) = @_;
24              
25 3         19 my $self = bless {
26             RemoteAddress => '127.0.0.1',
27             RemotePort => 9514,
28             ReturnType => 'hash',
29             Subscribe => undef,
30             Match => undef,
31             MessageHandler => undef,
32             %opts,
33             }, $class;
34              
35 3 100       13 $opts{'MessageHandler'}
36             or AE::log fatal => 'You must provide a MessageHandler';
37              
38 2 100       9 ref $opts{'MessageHandler'} eq 'CODE'
39             or AE::log fatal => 'You need to specify a subroutine reference to the \'MessageHandler\' parameter.';
40              
41 1         18 $self->_connect;
42              
43 1         12 return $self;
44             }
45              
46             sub _connect {
47 1     1   1 my $self = shift;
48              
49 1         8 my $block = $self->{'ReturnType'} eq 'block';
50 1 50       3 my $separator = $block ? "\n" : '';
51 1         3 my ( $addr, $port ) = @{$self}{qw<RemoteAddress RemotePort>};
  1         3  
52              
53             # FIXME: TODO item for this
54             # in second thought, this should just be removed because
55             # it's meant for internal manual buffering, which we don't need
56 1 50       14 $block
57             and AE::log fatal => 'Block option not supported';
58              
59 1         5 Scalar::Util::weaken( my $inner_self = $self );
60              
61             $self->{'_client'} ||= tcp_connect $addr, $port, sub {
62 0 0   0   0 my ($fh) = @_
63             or AE::log fatal => "Connect failed: $!";
64              
65 0         0 my $hdl; $hdl = AnyEvent::Handle->new(
66             fh => $fh,
67             on_error => sub {
68 0         0 AE::log error => $_[2];
69 0         0 $_[0]->destroy;
70             $inner_self->{'_reconnect_timer'} = AE::timer 10, 0, sub {
71 0         0 undef $inner_self->{'_reconnect_timer'};
72 0         0 $inner_self->_connect;
73 0         0 };
74             },
75              
76 0         0 on_eof => sub { $hdl->destroy; AE::log info => 'Done.' },
  0         0  
77              
78             on_read => sub {
79             $hdl->push_read (line => sub {
80 0         0 my ($hdl, $line) = @_;
81              
82             List::Util::first {
83 0         0 substr( $line, 0, length $_ ) eq $_
84 0 0       0 } @PROTOCOL_LINE_PREFIXES and return;
85              
86 0         0 $inner_self->handle_message( $line, $hdl );
87 0         0 });
88             },
89 0         0 );
90              
91 0         0 $inner_self->{'buffer'} = '';
92              
93             # FIXME: should this really be in a timer?
94             # all the actions relating to a socket are deferred anyway
95             $inner_self->{'_setup_pipe_timer'} = AE::timer 0, 0, sub {
96 0         0 undef $inner_self->{'_setup_pipe_timer'};
97 0         0 $inner_self->setup_pipe($hdl);
98 0         0 };
99 1   33     12 };
100              
101 1         4480 return $self;
102             }
103              
104             sub setup_pipe {
105 0     0 0   my ( $self, $handle ) = @_;
106              
107             # Parse for Subscriptions or Matches
108 0           my %data;
109 0           foreach my $target (qw(Subscribe Match)) {
110 0 0         if ( defined $self->{$target} ) {
111             my @data = ref $self->{$target} eq 'ARRAY'
112 0           ? @{ $self->{$target} }
113 0 0         : $self->{$target};
114              
115 0 0         @data = map lc, @data if $target eq 'Subscribe';
116 0 0         next unless scalar @data > 0;
117 0           $data{$target} = \@data;
118             }
119             }
120              
121             # Check to make sure we're doing something
122 0 0         keys %data
123             or AE::log fatal => 'Must specify a subscription or a match parameters!';
124              
125             # Send the Subscription
126 0           foreach my $target ( sort keys %data ) {
127 0           my $subname = 'do_' . lc $target;
128 0           $self->$subname( $handle, $data{$target} );
129             }
130             }
131              
132             sub do_subscribe {
133 0     0 0   my ( $self, $handle, $subs ) = @_;
134              
135 0 0   0     if ( List::Util::first { $_ eq 'fullfeed' } @{$subs} ) {
  0            
  0            
136 0           $handle->push_write("fullfeed\n");
137             } else {
138             $handle->push_write(
139             'sub ' .
140 0           join( ', ', @{$subs} ) .
  0            
141             "\n"
142             );
143             }
144             }
145              
146             sub do_match {
147 0     0 0   my ( $self, $handle, $matches ) = @_;
148             $handle->push_write(
149             'match ' .
150 0           join( ', ', @{$matches} ) .
  0            
151             "\n"
152             );
153             }
154              
155             sub handle_message {
156 0     0 0   my ( $self, $line, $handle ) = @_;
157              
158 0           my $msg;
159             my $success = eval {
160 12     12   116 no warnings;
  12         28  
  12         1785  
161 0           $msg = parse_syslog_line($line);
162 0           1;
163 0 0         } or do {
164 0   0       my $error = $@ || 'Zombie error';
165 0           AE::log error => "Could not parse line: $line ($error)\n";
166             };
167              
168 0 0 0       $success && $msg or return;
169              
170             # Try the Message Handler, eventually we can do statistics here.
171             eval {
172 0           $self->{'MessageHandler'}->($msg);
173 0           1;
174 0 0         } or do {
175 0   0       my $error = $@ || 'Zombie error';
176 0           AE::log error => "MessageHandler failed: $error";
177             };
178             }
179              
180             1;
181              
182             __END__
183              
184             =pod
185              
186             =head1 DESCRIPTION
187              
188             L<AnyEvent::eris::Client> is an L<AnyEvent> version of
189             L<POE::Component::Client::eris> - a simple pub/sub implementation,
190             written by Brad Lhotsky.
191              
192             Since I don't actually have any use for it right now, it's not
193             actively maintained. Might as well release it. If you're interested in
194             taking over it, just let me know.
195              
196             For now the documentation is sparse but the tests should be clear
197             enough to assist in understanding it.