File Coverage

blib/lib/AnyEvent/eris/Server.pm
Criterion Covered Total %
statement 201 297 67.6
branch 29 70 41.4
condition 4 23 17.3
subroutine 34 41 82.9
pod 0 25 0.0
total 268 456 58.7


line stmt bran cond sub pod time code
1             package AnyEvent::eris::Server;
2             # ABSTRACT: eris pub/sub Server
3              
4 12     12   2319787 use strict;
  12         51  
  12         303  
5 12     12   53 use warnings;
  12         20  
  12         315  
6 12     12   58 use Scalar::Util;
  12         19  
  12         420  
7 12     12   4458 use Sys::Hostname;
  12         10308  
  12         543  
8 12     12   648 use AnyEvent::Handle;
  12         21750  
  12         391  
9 12     12   553 use AnyEvent::Socket;
  12         12894  
  12         1124  
10 12     12   4478 use AnyEvent::Graphite;
  12         5720  
  12         41052  
11              
12             my @_STREAM_NAMES = qw(subscription match debug full regex);
13             my %_STREAM_ASSISTERS = (
14             subscription => 'programs',
15             match => 'words',
16             );
17              
18             # Precompiled Regular Expressions
19             my %_PRE = (
20             program => qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
21             );
22              
23             sub _server_error {
24 0     0   0 my ( $self, $err_str, $fatal ) = @_;
25 0         0 my $err_num = $!+0;
26 0         0 AE::log debug => "SERVER ERROR: $err_num, $err_str";
27              
28 0 0       0 $fatal and $self->{'_cv'}->send;
29             }
30              
31             my %client_commands = (
32             fullfeed => qr{^fullfeed},
33             nofullfeed => qr{^nofull(feed)?},
34             subscribe => qr{^sub(?:scribe)?\s(.*)},
35             unsubscribe => qr{^unsub(?:scribe)?\s(.*)},
36             match => qr{^match (.*)},
37             nomatch => qr{^nomatch (.*)},
38             debug => qr{^debug},
39             nobug => qr{^no(de)?bug},
40             regex => qr{^re(?:gex)?\s(.*)},
41             noregex => qr{^nore(gex)?},
42             status => qr{^status},
43             dump => qr{^dump\s(\S+)},
44             quit => qr{(exit|q(uit)?)},
45             );
46              
47             sub handle_subscribe {
48 2     2 0 7 my ( $self, $handle, $SID, $args ) = @_;
49              
50 2         19 $self->remove_stream( $SID, 'full' );
51              
52 2         20 my @programs = map lc, split /[\s,]+/, $args;
53 2         6 foreach my $program (@programs) {
54 10         17 $self->clients->{$SID}{'subscription'}{$program} = 1;
55              
56             # number of registered programs
57 10         20 $self->{'programs'}{$program}++;
58             }
59              
60             $handle->push_write(
61 2         14 'Subscribed to : ' .
62             join( ',', @programs ) .
63             "\n"
64             );
65             }
66              
67             sub handle_unsubscribe {
68 1     1 0 3 my ( $self, $handle, $SID, $args ) = @_;
69              
70 1         10 my @programs = map lc, split /[\s,]+/, $args;
71 1         4 foreach my $program (@programs) {
72 5         9 delete $self->clients->{$SID}{'subscription'}{$program};
73              
74             --$self->{'programs'}{$program} <= 0
75 5 50       12 and delete $self->{'programs'}{$program};
76             }
77              
78 1         3 delete $self->clients->{$SID}{'subscription'};
79              
80 1         7 $handle->push_write(
81             'Subscription removed for : ' .
82             join( ',', @programs ) .
83             "\n"
84             );
85             }
86              
87             sub handle_fullfeed {
88 3     3 0 10 my ( $self, $handle, $SID ) = @_;
89              
90 3         10 $self->remove_all_streams($SID);
91              
92 3         9 $self->clients->{$SID}{'full'} = 1;
93              
94 3         11 $handle->push_write(
95             "Full feed enabled, all other functions disabled.\n"
96             );
97             }
98              
99             sub handle_nofullfeed {
100 1     1 0 3 my ( $self, $handle, $SID ) = @_;
101              
102 1         3 $self->remove_all_streams($SID);
103              
104             # XXX: Not in original implementation
105 1         2 delete $self->clients->{$SID}{'full'};
106              
107 1         4 $handle->push_write("Full feed disabled.\n");
108             }
109              
110             sub handle_match {
111 1     1 0 3 my ( $self, $handle, $SID, $args ) = @_;
112              
113 1         3 $self->remove_stream( $SID, 'full' );
114              
115 1         11 my @words = map lc, split /[\s,]+/, $args;
116 1         4 foreach my $word (@words) {
117 2         6 $self->{'words'}{$word}++;
118 2         3 $self->clients->{$SID}{'match'}{$word} = 1;
119             }
120              
121             $handle->push_write(
122 1         11 'Receiving messages matching : ' .
123             join( ', ', @words ) .
124             "\n"
125             );
126             }
127              
128             sub handle_nomatch {
129 1     1 0 4 my ( $self, $handle, $SID, $args ) = @_;
130              
131 1         8 my @words = map lc, split /[\s,]+/, $args;
132 1         3 foreach my $word (@words) {
133 2         6 delete $self->clients->{$SID}{'match'}{$word};
134              
135             # Remove the word from searching if this was the last client
136             --$self->{'words'}{$word} <= 0
137 2 50       6 and delete $self->{'words'}{$word};
138             }
139              
140             $handle->push_write(
141 1         7 'No longer receiving messages matching : ' .
142             join( ', ', @words ) .
143             "\n"
144             );
145             }
146              
147             sub handle_debug {
148 1     1 0 3 my ( $self, $handle, $SID ) = @_;
149              
150 1         4 $self->remove_stream( $SID, 'full' );
151              
152 1         3 $self->clients->{$SID}{'debug'} = 1;
153 1         6 $handle->push_write("Debugging enabled.\n");
154             }
155              
156             sub handle_nobug {
157 1     1 0 4 my ( $self, $handle, $SID ) = @_;
158              
159 1         8 $self->remove_stream( $SID, 'debug' );
160 1         2 delete $self->clients->{$SID}{'debug'};
161 1         4 $handle->push_write("Debugging disabled.\n");
162             }
163              
164             sub handle_regex {
165 1     1 0 3 my ( $self, $handle, $SID, $args ) = @_;
166              
167             # do not handle a regex if it's already full subscription
168 1 50       2 $self->clients->{$SID}{'full'}
169             and return;
170              
171 1         2 my $regex;
172             eval {
173 1 50 33     12 defined $args && length $args
174             and $regex = qr{$args};
175 1         3 1;
176 1 50       2 } or do {
177 0   0     0 my $error = $@ || 'Zombie error';
178              
179 0         0 $handle->push_write(
180             "Invalid regular expression '$args', see: perldoc perlre\n"
181             );
182              
183 0         0 return;
184             };
185              
186 1         3 $self->clients->{$SID}{'regex'}{$regex} = 1;
187 1         5 $handle->push_write(
188             "Receiving messages matching regex : $args\n"
189             );
190             }
191              
192             sub handle_noregex {
193 1     1 0 2 my ( $self, $handle, $SID ) = @_;
194              
195 1         4 $self->remove_stream( $SID, 'regex' );
196 1         3 delete $self->clients->{$SID}{'regex'};
197 1         4 $handle->push_write("No longer receiving regex-based matches\n");
198             }
199              
200             sub handle_status {
201 1     1 0 3 my ( $self, $handle, $SID ) = @_;
202 1         3 my $clients = $self->clients;
203 1         2 my $client_count = scalar keys %{$clients};
  1         3  
204              
205 1         3 my @details = ();
206 1         3 foreach my $stream (@_STREAM_NAMES) {
207             # add streams from all SIDs
208 5         6 my $stream_count = 0;
209 5         6 my $assist_count = 0;
210 5         6 foreach my $SID ( keys %{$clients} ) {
  5         9  
211 5 100       11 $clients->{$SID}{$stream}
212             and $stream_count++;
213              
214 5         5 my $assist; $assist = $_STREAM_ASSISTERS{$stream}
215 5 100       11 and $assist_count += scalar keys %{ $self->{$assist} || {} };
  2 100       10  
216             }
217              
218 5 100       10 $stream_count == 0
219             and next;
220              
221 1         3 my $single_details = "$stream=$stream_count";
222 1 50       4 $assist_count and $single_details .= ":$assist_count";
223              
224 1         3 push @details, $single_details;
225             }
226              
227 1         3 my $details = join ', ', @details;
228 1         5 $handle->push_write("STATUS[0]: $client_count connections: $details\n");
229             }
230              
231             sub handle_dump {
232 1     1 0 3 my ( $self, $handle, $SID, $type ) = @_;
233 1         3 my $clients = $self->clients;
234              
235             my %dispatch = (
236             assisters => sub {
237 0     0   0 my @details = ();
238 0         0 foreach my $asst ( values %_STREAM_ASSISTERS ) {
239 0 0       0 $self->{$asst} or next;
240 0         0 my @SIDs = grep $clients->{$_}{$asst}, keys %{$clients};
  0         0  
241             push @details,
242 0         0 "$asst -> " . join ',', keys %{ $self->{$asst} };
  0         0  
243             }
244              
245 0         0 return @details;
246             },
247              
248             stats => sub {
249             my @details = map +(
250             "$_ -> $self->{'stats'}{$_}"
251 0     0   0 ), keys %{ $self->{'stats'} };
  0         0  
252              
253 0         0 return @details;
254             },
255              
256             streams => sub {
257 1     1   3 my @details = ();
258              
259 1         2 foreach my $stream (@_STREAM_NAMES) {
260 5         5 my @SIDs;
261 5         6 foreach my $SID ( keys %{$clients} ) {
  5         9  
262 5 100       11 $clients->{$SID}{$stream}
263             or next;
264              
265 1         2 my $stream_data = $clients->{$SID}{$stream};
266             push @SIDs, ref $stream_data eq 'HASH'
267 1 50       5 ? "$SID:" . join ',', keys %{$stream_data}
  0         0  
268             : $SID;
269             }
270              
271 5         13 push @details, "$stream -> " . join '; ', @SIDs;
272             }
273              
274 1         4 return @details;
275             },
276 1         12 );
277              
278 1 50       5 if ( my $cb = $dispatch{$type} ) {
279 1         2 my @msgs = $cb->();
280 1         4 my $msgs = join( "\n", @msgs ) . "\n";
281 1         7 $handle->push_write($msgs);
282             } else {
283 0         0 $handle->push_write("DUMP[-1]: No comprende.\n");
284             }
285             }
286              
287             sub handle_quit {
288 0     0 0 0 my ( $self, $handle, $SID ) = @_;
289 0         0 $handle->push_write('Terminating connection on your request.');
290 0         0 $self->hangup_client($SID);
291 0         0 $self->{'_cv'}->send;
292             }
293              
294             sub hangup_client {
295 0     0 0 0 my ( $self, $SID ) = @_;
296 0         0 delete $self->clients->{$SID};
297 0         0 AE::log debug => "Client Termination Posted: $SID";
298             }
299              
300             sub remove_stream {
301 26     26 0 53 my ( $self, $SID, $stream ) = @_;
302 26         101 AE::log debug => "Removing '$stream' for $SID";
303              
304 26         35021 my $client_streams = delete $self->clients->{$SID}{'streams'}{$stream};
305              
306             # FIXME:
307             # I *think* what this is supposed to do is delete assists
308             # that were registered for this client, which it doesn't
309             # - it deletes global assists instead - this needs to be
310             # looked into
311 26 50       75 if ($client_streams) {
312 0 0       0 if ( my $assist = $_STREAM_ASSISTERS{$stream} ) {
313 0         0 foreach my $key ( keys %{$client_streams} ) {
  0         0  
314             --$self->{'assists'}{$assist}{$key} <= 0
315 0 0       0 and delete $self->{'assists'}{$assist}{$key}
316             }
317             }
318             }
319             }
320              
321             sub remove_all_streams {
322 4     4 0 7 my ( $self, $SID ) = @_;
323 4         12 foreach my $stream (@_STREAM_NAMES) {
324 20         39 $self->remove_stream( $SID, $stream );
325             }
326             }
327              
328             sub new {
329 12     12 0 61734 my $class = shift;
330 12         66 my $hostname = ( split '.', hostname )[0];
331 12         349 my $self = bless {
332             ListenAddress => '127.0.0.1', # "localhost" doesn't work :/
333             ListenPort => 9514,
334             GraphitePort => 2003,
335             GraphitePrefix => 'eris.dispatcher',
336             hostname => $hostname,
337              
338             @_,
339              
340             clients => {},
341             buffers => {},
342             }, $class;
343              
344 12         31 my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};
  12         88  
345 12         73 Scalar::Util::weaken( my $inner_self = $self );
346              
347             $self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
348 10 50   10   2755 my ($fh) = @_
349             or return $inner_self->_server_error($!);
350              
351 10         19 my $handle; $handle = AnyEvent::Handle->new(
352             fh => $fh,
353             on_error => sub {
354 0         0 my ( $hdl, $fatal, $msg ) = @_;
355 0         0 my $SID = $inner_self->_session_id($hdl);
356 0         0 $inner_self->hangup_client($SID);
357 0         0 $inner_self->_server_error( $msg, $fatal );
358 0         0 $hdl->destroy;
359             },
360              
361             on_eof => sub {
362 0         0 my ($hdl) = @_;
363 0         0 my $SID = $inner_self->_session_id($hdl);
364 0         0 $inner_self->hangup_client($SID);
365 0         0 $hdl->destroy;
366 0         0 AE::log debug => "SERVER, client $SID disconnected.";
367             },
368              
369             on_read => sub {
370 15         3193 my ($hdl) = @_;
371 15         44 chomp( my $line = delete $hdl->{'rbuf'} );
372 15         44 my $SID = $inner_self->_session_id($hdl);
373              
374 15         87 foreach my $command ( keys %client_commands ) {
375 124         177 my $regex = $client_commands{$command};
376 124 100       443 if ( my ($args) = ( $line =~ /$regex/i ) ) {
377 15         37 my $method = "handle_$command";
378 15         85 return $inner_self->$method( $hdl, $SID, $args );
379             }
380             }
381              
382 0         0 $hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
383             },
384 10         124 );
385              
386 10         803 my $SID = $inner_self->_session_id($handle);
387 10         92 $handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
388 10         862 $inner_self->register_client( $SID, $handle );
389 12   33     173 };
390              
391             $self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
392 4     4   345503 $inner_self->flush_client;
393 12         6455 };
394              
395             $self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
396 11     11   1563 $inner_self->stats;
397 12         100 };
398              
399             # Statistics Tracking
400 12 50       56 $self->{'config'}{'GraphiteHost'}
401             and $self->graphite_connect;
402              
403 12         61 return $self;
404             }
405              
406             sub flush_client {
407 4     4 0 18 my $self = shift;
408 4         16 my $clients = $self->{'clients'};
409 4         10 my $buffers = $self->{'buffers'};
410              
411 4         10 foreach my $SID ( keys %{$buffers} ) {
  4         57  
412 2         9 my $msgs = $buffers->{$SID};
413 2 50       4 @{$msgs} > 0 or next;
  2         22  
414              
415             # write the messages to the SID
416 2         6 my $msgs_str = join "\n", @{$msgs};
  2         13  
417              
418 2         25 $clients->{$SID}{'handle'}->push_write("$msgs_str\n");
419 2         334 $buffers->{$SID} = [];
420             }
421             }
422              
423             sub graphite_connect {
424 0     0 0 0 my $self = shift;
425              
426             eval {
427             $self->{'_graphite'} = AnyEvent::Graphite->new(
428             host => $self->{'config'}{'GraphiteHost'},
429 0         0 port => $self->{'config'}{'GraphitePort'},
430             );
431              
432 0         0 1;
433 0 0       0 } or do {
434 0   0     0 my $error = $@ || 'Zombie error';
435 0         0 AE::log debug => "Graphite server setup failed: $error";
436             }
437             }
438              
439             sub stats {
440 11     11 0 25 my $self = shift;
441              
442 11 50       43 if ( ! exists $self->{'stats'} ) {
443 11         87 $self->{'stats'} = {
444             map +( $_ => 0 ), qw<
445             received received_bytes dispatched dispatched _bytes
446             >
447             };
448              
449 11         54 return;
450             }
451              
452 0         0 my $stats = delete $self->{'stats'};
453              
454 0 0       0 if ( $self->{'_graphite'} ) {
455 0         0 my $time = AE::now;
456 0         0 foreach my $stat ( keys %{$stats}) {
  0         0  
457             my $metric = join '.', $self->{'config'}{'GraphitePrefix'},
458 0         0 $self->{'hostname'},
459             $stat;
460             eval {
461 0         0 $self->{'_graphite'}->send($metric, $stats->{$stat}, $time);
462 0         0 1;
463 0 0       0 } or do {
464 0   0     0 my $error = $@ || 'Zombie error';
465 0         0 AE::log debug => 'Error sending statistics, reconnecting.';
466 0         0 $self->graphite_connect;
467 0         0 last;
468             }
469             }
470             }
471              
472             AE::log debug => 'STATS: ' .
473 0         0 join ', ', map "$_:$stats->{$_}", keys %{$stats};
  0         0  
474             }
475              
476             sub run {
477 11     11 0 4735 my $self = shift;
478 11   33     89 $self->{'_cv'} = shift || AE::cv;
479 11         68 $self->{'_cv'}->recv;
480             }
481              
482             sub clients {
483 87     87 0 7579 my $self = shift;
484 87   50     372 $self->{'clients'} ||= {};
485             }
486              
487             sub register_client {
488 10     10 0 34 my ( $self, $SID, $handle ) = @_;
489              
490 10         78 $self->clients->{$SID} = { handle => $handle };
491             }
492              
493             sub dispatch_message {
494 1     1 0 41287 my ( $self, $msg ) = @_;
495 1         9 $self->_dispatch_messages( [$msg] );
496             }
497              
498             sub dispatch_messages {
499 0     0 0 0 my ( $self, $msgs ) = @_;
500 0         0 $self->_dispatch_messages( [ split /\n/, $msgs ] );
501             }
502              
503             sub _dispatch_messages {
504 1     1   5 my ( $self, $msgs ) = @_;
505              
506 1         3 my $clients = $self->{'clients'};
507 1         4 my $buffers = $self->{'buffers'};
508 1         2 my $dispatched = 0;
509 1         2 my $bytes = 0;
510              
511             # Handle fullfeeds
512 1         3 foreach my $SID ( keys %{$clients} ) {
  1         21  
513 1         6 push @{ $buffers->{$SID} }, @{$msgs};
  1         5  
  1         5  
514 1         3 $dispatched += scalar @{$msgs};
  1         4  
515 1         2 $bytes += length $_ for @{$msgs};
  1         4  
516             }
517              
518 1         3 foreach my $msg ( @{$msgs} ) {
  1         3  
519             # Grab statitics;
520 1         4 $self->{'stats'}{'received'}++;
521 1         3 $self->{'stats'}{'received_bytes'} += length $msg;
522              
523             # Program based subscriptions
524 1 50       16 if ( my ($program) = map lc, ( $msg =~ $_PRE{'program'} ) ) {
525             # remove the sub process and PID from the program
526 0         0 $program =~ s/\(.*//g;
527 0         0 $program =~ s/\[.*//g;
528              
529 0 0 0     0 if ( exists $self->{'programs'}{$program} && $self->{'programs'}{$program} > 0 ) {
530 0         0 foreach my $SID ( keys %{$clients} ) {
  0         0  
531 0 0       0 exists $clients->{$SID}{'subscription'}{$program}
532             or next;
533              
534 0         0 push @{ $buffers->{$SID} }, $msg;
  0         0  
535 0         0 $dispatched++;
536 0         0 $bytes += length $msg;
537             }
538             }
539             }
540              
541             # Match based subscriptions
542 1 50       3 if ( keys %{ $self->{'words'} } ) {
  1         5  
543 0         0 foreach my $word ( keys %{ $self->{'words'} } ) {
  0         0  
544 0 0       0 if ( index ( $msg, $word ) != -1 ) {
545 0         0 foreach my $SID ( keys %{$clients} ) {
  0         0  
546 0 0       0 exists $clients->{$SID}{'match'}{$word}
547             or next;
548              
549 0         0 push @{ $buffers->{$SID} }, $msg;
  0         0  
550 0         0 $dispatched++;
551 0         0 $bytes += length $msg;
552             }
553             }
554             }
555             }
556              
557             # Regex based subscriptions
558 1 50       3 if ( keys %{ $self->{'regex'} } ) {
  1         7  
559 0         0 my %hit = ();
560 0         0 foreach my $SID ( keys %{$clients} ) {
  0         0  
561 0         0 foreach my $re ( keys %{ $clients->{$SID}{'regex'} } ) {
  0         0  
562 0 0 0     0 if ( $hit{$re} || $msg =~ /$re/ ) {
563 0         0 $hit{$re} = 1;
564 0         0 push @{ $buffers->{$SID} }, $msg;
  0         0  
565 0         0 $dispatched++;
566 0         0 $bytes += length $msg;
567             }
568             }
569             }
570             }
571             }
572              
573             # Report statistics for dispatched messages
574 1 50       5 if ( $dispatched > 0 ) {
575 1         3 $self->{'stats'}{'dispatched'} += $dispatched;
576 1         18 $self->{'stats'}{'dispatched_bytes'} += $bytes;
577             }
578             }
579              
580             sub _session_id {
581 25     25   52 my ( $self, $handle ) = @_;
582             # AnyEvent::Handle=HASH(0x1bb30f0)
583 25         163 "$handle" =~ /\D0x([a-fA-F0-9]+)/;
584 25         81 return $1;
585             }
586              
587             1;
588              
589             __END__
590              
591             =pod
592              
593             =head1 DESCRIPTION
594              
595             L<AnyEvent::eris::Server> is an L<AnyEvent> version of
596             L<POE::Component::Server::eris> - a simple pub/sub implementation,
597             written by Brad Lhotsky.
598              
599             Since I don't actually have any use for it right now, it's not
600             actively maintained. Might as well release it. If you're interested in
601             taking over it, just let me know.
602              
603             For now the documentation is sparse but the tests should be clear
604             enough to assist in understanding it.