File Coverage

blib/lib/POE/Component/Server/Bayeux.pm
Criterion Covered Total %
statement 9 9 100.0
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 12 12 100.0


line stmt bran cond sub pod time code
1             package POE::Component::Server::Bayeux;
2              
3             =head1 NAME
4              
5             POE::Component::Server::Bayeux - Bayeux/cometd server implementation in POE
6              
7             =head1 SYNOPSIS
8              
9             use POE qw(Component::Server::Bayeux);
10              
11             # Create the server, listening on port 8080
12             my $server = POE::Component::Server::Bayeux->spawn(
13             Port => 8080,
14             Alias => 'bayeux_server',
15             );
16              
17             # Create a local client, a reply-bot
18             POE::Session->create(
19             inline_states => {
20             _start => sub {
21             my ($kernel, $heap) = @_[KERNEL, HEAP];
22             $kernel->alias_set('test_local_client');
23              
24             # Subscribe to /chat/demo, assigning a state for events
25             $kernel->post('bayeux_server', 'subscribe', {
26             channel => '/chat/demo',
27             client_id => $heap->{client_id},
28             args => {
29             state => 'subscribe_response',
30             },
31             });
32             },
33             subscribe_response => sub {
34             my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0];
35              
36             # Don't auto-reply to my own messages
37             return if $message->{clientId} eq $heap->{client_id};
38              
39             # Auto-reply to every message posted
40             $kernel->post('bayeux_server', 'publish', {
41             channel => $message->{channel},
42             client_id => $heap->{client_id},
43             data => {
44             user => 'Autobot',
45             chat => "I got your message, ".($message->{data}{user} || 'anon'),
46             },
47             });
48             },
49             },
50             heap => {
51             client_id => 'test_local_client',
52             },
53             );
54              
55             $poe_kernel->run();
56              
57             =head1 DESCRIPTION
58              
59             This module implements the Bayeux Protocol (1.0draft1) from the Dojo Foundation.
60             Also called cometd, Bayeux is a low-latency routing protocol for JSON encoded
61             events between clients and servers in a publish-subscribe model.
62              
63             This is the server implementation. There is also a client found at
64             L. With this server, you can roll out a cometd
65             server and basic HTTP server with POE communication capabilities. It comes bundled
66             with test code that you can run in your browser to test the functionality for a
67             basic chat program.
68              
69             B: This is the first release of this code. Not much testing has been
70             done, so please keep that in mind if you plan on using this for production. It was
71             developed for a production environment that is still being built, so future versions
72             of this code will be released over the next month that will be more feature complete
73             and less prone to errors.
74              
75             =cut
76              
77 3     3   829900 use strict;
  3         6  
  3         166  
78 3     3   17 use warnings;
  3         6  
  3         115  
79              
80 3         19 use POE qw(
81             Component::Server::HTTP
82             Component::Server::Bayeux::Client
83             Component::Server::Bayeux::Request
84 3     3   18 );
  3         11  
85             use HTTP::Status; # for RC_OK
86             use Params::Validate qw(CODEREF HASHREF validate validate_with);
87             use POE::Component::Server::Bayeux::Utilities qw(:all);
88              
89             # Logger modules
90             use base qw(Class::Accessor);
91             __PACKAGE__->mk_accessors(qw(logger session));
92             use Log::Log4perl qw(get_logger :levels);
93             use Log::Log4perl::Appender;
94             use Log::Log4perl::Layout;
95              
96             # Basic HTTP server modules
97             use URI;
98              
99             ## Class globals ###
100              
101             our $VERSION = '0.04';
102             our $protocol_version = '1.0';
103             our $supported_connection_types = [ 'long-polling' ];
104              
105             ## Class locals ###
106              
107             our %file_types = (
108             'application/javascript' => [ qr/\.js$/i ],
109             'text/html' => [ qr/\.html?$/i ],
110             'text/css' => [ qr/\.css$/i ],
111             'image/png' => [ qr/\.png$/i ],
112             'image/jpeg' => [ qr/\.jpe?g$/i ],
113             'image/gif' => [ qr/\.gif$/i ],
114             );
115              
116             {
117             package POE::Component::Server::Bayeux::Logger;
118              
119             # Performing a dump of a data structure is a useful thing for debuging,
120             # but in live code you may not need it. This is one way of accomplishing
121             # this.
122            
123             use strict;
124             use warnings;
125             use Carp;
126              
127             __PACKAGE__->mk_wrapped_levels(qw(trace debug info warn error fatal));
128              
129             sub new {
130             my ($class, %self) = @_;
131              
132             croak "Pass 'logger'" unless $self{logger};
133              
134             if (! $self{dumper}) {
135             require JSON::XS;
136             my $json = JSON::XS->new();
137             $json->indent(1);
138             $json->ascii(1);
139             $json->space_after(1);
140             $self{dumper} = sub {
141             $json->encode(@_);
142             };
143             }
144              
145             return bless \%self, $class;
146             }
147              
148             sub mk_wrapped_levels {
149             my ($class, @levels) = @_;
150              
151             no strict 'refs';
152             foreach my $level (@levels) {
153             *{"${class}::$level"} = sub {
154             my ($self, $message, @extra) = @_;
155             my $logger = $self->{logger};
156              
157             # Make sure before creating a dump of the ref's that I have to
158             my $is_method = 'is_' . $level;
159             return unless $logger->$is_method;
160              
161             my $total = $message;
162             if (@extra) {
163             $total .= "\n" unless $total =~ m/\n$/s;
164             $total .= $self->{dumper}->(@extra);
165             }
166              
167             $logger->$level($total);
168             };
169             }
170             }
171             }
172              
173             ## Class methods ###
174              
175             =head1 USAGE
176              
177             =head2 spawn (...)
178              
179             =over 4
180              
181             Create a new Bayeux server. Arguments to this method:
182              
183             =over 4
184              
185             =item I (default: 80)
186              
187             Bind an HTTP server to this port.
188              
189             =item I (default: 'bayeux')
190              
191             The POE session alias for local clients to post to.
192              
193             =item I (default: 0)
194              
195             Allow HTTP-connected clients to publish without handshake.
196              
197             =item I (default: 120)
198              
199             Seconds before an HTTP-connected client is timed out and forced to rehandshake.
200             Clients must not go this long between having a connect open.
201              
202             =item I (default: 10)
203              
204             Maximum number of concurrent connections allowed from a single IP address. Not effective
205             for anything but the bayeux/cometd connections, as the simple HTTP server doesn't support
206             counting concurrent connections.
207              
208             =item I (default: 0)
209              
210             Either 0 or 1, indicates level of logging.
211              
212             =item I (default: undef)
213              
214             If present, opens the file path indicated for logging output.
215              
216             =item I (default: '/var/www')
217              
218             Document root of generic HTTP server for file serving.
219              
220             =item I (default: [ 'index.html' ])
221              
222             Index file (think Apache config).
223              
224             =item I (default: {})
225              
226             Provide a hashref of MIME types and their associated expiry time. Similar to
227             mod_expires 'ExpiresByType $key "access plus $value seconds"'.
228              
229             =item I (default: undef)
230              
231             Provide a subref which will be called with the B and B
232             of any simple HTTP requests before the request is completed. This could allow the code to modify
233             the headers of the response as needed (i.e., path-based expiry time).
234              
235             =item I (default: {})
236              
237             Each key of this hash represents a service channel that will be available. The
238             name of the channel will be '/service/$key', and the handling is dependent on
239             the $value. Provide '_handler' as a fallback handler.
240              
241             If $value is a coderef, the code will be called with a single arg of the message
242             being acted upon. The return value(s) of the coderef will be considered response(s)
243             to be sent back to the client, so return an empty array if you don't want this to
244             happen (if you've added responses by $message->request->add_response()).
245              
246             =item I (defaults: sub {})
247              
248             Coderef to perform authorization checks on messages. Code block is passed two args,
249             the Client, and the Message. If the message should be rejected, the code should set
250             is_error() on the message.
251              
252             One could use this to perform authentication on the 'handshake' message:
253              
254             sub {
255             my ($client, $message) = @_;
256              
257             return unless $message->isa('POE::Component::Server::Bayeux::Message::Meta');
258             return unless $message->type eq 'handshake';
259              
260             my $error;
261              
262             while (1) {
263             if (! $message->ext ||
264             ! (defined $message->ext->{username} && defined $message->ext->{password})) {
265             $error = "Must pass username and password in ext to handshake";
266             last;
267             }
268              
269             my $authenticated = $message->ext->{username} eq 'admin'
270             && $message->ext->{password} eq 'password' ? 1 : 0;
271              
272             if (! $authenticated) {
273             $error = "Invalid username or password";
274             last;
275             }
276              
277             $client->flags->{is_authenticated} = 1;
278             last;
279             }
280              
281             if ($error) {
282             $message->is_error($error);
283             }
284             }
285              
286             =item I (defaults: sub {})
287              
288             Coderef to receive general event notifications from the server. Sends a hashref like so:
289              
290             {
291             event => 'new_connection',
292             client_id => ...,
293             client => ...,
294             message => ...,
295             }
296              
297             See L for more details about every type of event that this will receive.
298              
299             =item I (defaults: {})
300              
301             Additional ContentHandler for L creation. Use this to extend the HTTP server content handling.
302              
303             =back
304              
305             Returns a class object with methods of interest:
306              
307             =over 4
308              
309             =item I
310              
311             Returns the L object used by the server. Use this for unified logging output.
312              
313             =item I
314              
315             The L object returned from an internal create() call.
316              
317             =back
318              
319             =back
320              
321             =cut
322              
323             sub spawn {
324             my $class = shift;
325             my %args = validate(@_, {
326             Port => { default => '80' },
327             Alias => { default => 'bayeux' },
328             AnonPublish => { default => 0 },
329             Debug => { default => 0 },
330             LogFile => { default => '' },
331             # Client must not go 2 minutes without having an outstanding connect
332             ConnectTimeout => { default => 2 * 60 },
333             DocumentRoot => { default => '/var/www' },
334             DirectoryIndex => { default => [ 'index.html' ] },
335             TypeExpires => { default => {} },
336             PostHandle => { default => undef, type => CODEREF },
337             Services => { default => {} },
338             MessageACL => { default => sub {}, type => CODEREF },
339             Callback => { default => sub {}, type => CODEREF },
340             ContentHandler => { default => {}, type => HASHREF },
341             ClientMaxConnections => { default => 10 },
342             Logger => 0,
343             });
344              
345             my $logger = $args{Logger};
346              
347             # Setup logger
348             if (! $logger) {
349             $logger = Log::Log4perl->get_logger('bayeux_server');
350             my $logger_layout = Log::Log4perl::Layout::PatternLayout->new("[\%d] \%p: \%m\%n");
351             $logger->level($args{Debug} ? $DEBUG : $INFO);
352              
353             my $stdout_appender = Log::Log4perl::Appender->new(
354             'Log::Log4perl::Appender::Screen',
355             name => 'screenlog',
356             stderr => 0,
357             );
358             $stdout_appender->layout($logger_layout);
359              
360             $logger->add_appender($stdout_appender);
361              
362             if ($args{LogFile}) {
363             my $file_appender = Log::Log4perl::Appender->new(
364             'Log::Log4perl::Appender::File',
365             name => 'filelog',
366             filename => $args{LogFile},
367             );
368             $file_appender->layout( $logger_layout );
369              
370             $logger->add_appender($file_appender);
371             }
372             }
373              
374             # Wrap the Log4perl logger in my own class
375             $logger = POE::Component::Server::Bayeux::Logger->new(
376             logger => $logger,
377             );
378              
379             # Create HTTP server
380             my $http_aliases = POE::Component::Server::HTTP->new(
381             Port => $args{Port},
382             ContentHandler => {
383             '/cometd' => sub {
384             $poe_kernel->call( $args{Alias}, 'handle_cometd', @_ );
385             },
386             '/' => sub {
387             $poe_kernel->call( $args{Alias}, 'handle_generic', @_ );
388             },
389             %{ $args{ContentHandler} },
390             },
391             );
392              
393             my $self = bless { %args, logger => $logger }, $class;
394              
395             # Create manager session
396             $self->{session} = POE::Session->create(
397             inline_states => {
398             _start => \&manager_start,
399             _stop => \&manager_stop,
400             shutdown => \&manager_shutdown,
401              
402             handle_cometd => \&handle_cometd,
403             handle_generic => \&http_server_generic,
404             delay_request => \&delay_request,
405             complete_request => \&complete_request,
406             check_timeouts => \&check_timeouts,
407            
408             subscribe => \&subscribe,
409             unsubscribe => \&unsubscribe,
410             publish => \&publish,
411              
412             client_push => \&client_push,
413             client_connect => \&client_connect,
414             client_disconnect => \&client_disconnect,
415              
416             delay_sub => \&delay_sub,
417             delay_sub_cb => \&delay_sub_cb,
418             },
419             heap => {
420             args => \%args,
421             manager => $args{Alias},
422             clients => {
423             # example_client_id => {
424             # subscriptions => {
425             # '/chat/demo/not_real' => 1,
426             # },
427             # },
428             },
429             requests => {
430             # example_request_id => 1,
431             },
432             requests_by_ip => {},
433             logger => $logger,
434             http_aliases => $http_aliases,
435             },
436             ($ENV{POE_DEBUG} ? (
437             options => { trace => 1, debug => 1 },
438             ) : ()),
439             );
440              
441             return $self;
442             }
443              
444             ###### POE States ######################
445              
446             =head1 POE STATES
447              
448             Most of the server code is regarding interaction with HTTP-connected clients.
449             For this, see L. It supports locally
450             connected POE sessions, and for this, makes the following states available.
451              
452             These same states are called internally to handle the basic PubSub behavior of
453             the server for all clients, local and HTTP.
454              
455             =cut
456              
457             sub manager_start {
458             my ($kernel, $heap) = @_[KERNEL, HEAP];
459              
460             $kernel->alias_set( $heap->{manager} );
461              
462             $kernel->delay('check_timeouts', 30);
463              
464             $heap->{logger}->info("Bayeux server started. Connect to port $$heap{args}{Port}");
465              
466             if ($ENV{POE_DEBUG}) {
467             $kernel->alias_resolve($heap->{http_aliases}{httpd})->option( trace => 1, debug => 1 );
468             $kernel->alias_resolve($heap->{http_aliases}{tcp})->option( trace => 1, debug => 1 );
469             }
470             }
471              
472             sub manager_stop {
473             my ($kernel, $heap) = @_[KERNEL, HEAP];
474             }
475              
476             sub manager_shutdown {
477             my ($kernel, $heap) = @_[KERNEL, HEAP];
478              
479             $heap->{logger}->info("Shutting down");
480              
481             while (my $request = values %{ $heap->{requests} }) {
482             $request->complete();
483             }
484              
485             $kernel->alarm_remove_all();
486             $kernel->alias_set( $heap->{manager} );
487              
488             $kernel->call( $heap->{http_aliases}{httpd}, 'shutdown' );
489             $kernel->call( $heap->{http_aliases}{tcp}, 'shutdown' );
490             }
491              
492             sub http_server_generic {
493             my ($kernel, $heap, $request, $response) = @_[KERNEL, HEAP, ARG0, ARG1];
494              
495             my $uri = URI->new($request->uri);
496             my $path = $heap->{args}{DocumentRoot} . '/' . $uri->path;
497              
498             # Attempt to find a directory index
499             if (-d $path) {
500             $path .= '/' unless $path =~ m{/$};
501             foreach my $index_name (@{ $heap->{args}{DirectoryIndex} }) {
502             next unless -f $path . $index_name;
503             $path .= $index_name;
504             last;
505             }
506              
507             }
508             if (-d $path) {
509             $response->code(RC_OK);
510             $response->content("Directory listing denied");
511             }
512             elsif (-f $path) {
513             $response->code(RC_OK);
514             open my $in, '<', $path;
515             if (! $in) {
516             $response->content("Unable to open '$path': $!");
517             return RC_OK;
518             }
519              
520             # Find a file type
521             my $type;
522             foreach my $possible_type (keys %file_types) {
523             next unless grep { $path =~ $_ } @{ $file_types{$possible_type} };
524             $type = $possible_type;
525             last;
526             }
527             $type ||= 'text/plain';
528             $response->content_type($type);
529              
530             if (my $whence = $heap->{args}{TypeExpires}{$type}) {
531             $response->expires( time() + $whence );
532             }
533              
534             my $content;
535             {
536             local $/ = undef;
537             $content = <$in>;
538             }
539             close $in;
540             $response->content($content);
541              
542             my $ip = $request->header('X-Forwarded-For') || $request->{connection}{remote_ip};
543             $heap->{logger}->info(sprintf 'Serving %s %s %s', $ip, $uri->path, $response->content_type);
544             }
545             else {
546             $response->code(RC_NOT_FOUND);
547             $response->content("Path '".$uri->path."' not found");
548             }
549              
550             if ($heap->{args}{PostHandle}) {
551             $heap->{args}{PostHandle}($request, $response);
552             }
553              
554             # Ensure no KeepAlive
555             $request->header(Connection => 'close');
556              
557             return RC_OK;
558             }
559              
560             ## Remote clients, long-polling ###
561              
562             sub handle_cometd {
563             my ($kernel, $heap, $request, $response) = @_[KERNEL, HEAP, ARG0, ARG1];
564              
565             # Deny based upon ClientMaxConnections restrictions
566              
567             my $ip = $request->header('X-Forwarded-For') || $request->{connection}{remote_ip};
568             if (! $ip) {
569             $ip = '0.0.0.0';
570             $heap->{logger}->error("No IP found for cometd request");
571             }
572              
573             $heap->{requests_by_ip}{$ip} ||= {};
574             my @request_ids = keys %{ $heap->{requests_by_ip}{$ip} };
575             if (int @request_ids > $heap->{args}{ClientMaxConnections}) {
576             $heap->{logger}->info("Denying $ip; too many connections (".int(@request_ids).")");
577              
578             $response->code(RC_SERVICE_UNAVAILABLE);
579             $response->header( 'Content-Type' => "text/json; charset=utf-8" );
580             $response->content( '{ "error": "Too many connections from your IP", "successful": false }' );
581             return RC_OK;
582             }
583             else {
584             #$heap->{logger}->info("IP $ip has " . int(@request_ids) . " connections");
585             }
586              
587             # Proceed with processing
588              
589             #$heap->{logger}->debug("Handling new cometd request");
590              
591             #$heap->{logger}->debug($request->as_string);
592              
593             my $bayeux_request = POE::Component::Server::Bayeux::Request->new(
594             request => $request,
595             response => $response,
596             server_heap => $heap,
597             ip => $ip,
598             );
599             $bayeux_request->handle();
600              
601             if ($bayeux_request->is_complete) {
602             $heap->{logger}->debug("Immediate remote response:", $bayeux_request->json_response);
603             return RC_OK;
604             }
605             else {
606             $heap->{requests}{ $bayeux_request->id } = $bayeux_request;
607             $heap->{requests_by_ip}{$ip}{ $bayeux_request->id } = $bayeux_request;
608             return RC_WAIT;
609             }
610             }
611              
612             sub delay_request {
613             my ($kernel, $heap, $request_id, $delay) = @_[KERNEL, HEAP, ARG0, ARG1];
614              
615             $heap->{logger}->debug("Delaying $delay to process request $request_id");
616             $kernel->delay_add('complete_request', $delay, $request_id);
617             }
618              
619             sub complete_request {
620             my ($kernel, $heap, $request_id) = @_[KERNEL, HEAP, ARG0];
621              
622             return unless defined $heap->{requests}{$request_id};
623             my $request = delete $heap->{requests}{$request_id};
624              
625             my $ip = $request->ip;
626             if ($heap->{requests_by_ip}{$ip}) {
627             delete $heap->{requests_by_ip}{$ip}{$request_id};
628             if (! keys %{ $heap->{requests_by_ip}{$ip} }) {
629             delete $heap->{requests_by_ip}{$ip};
630             }
631             }
632             else {
633             $heap->{logger}->error("Couldn't find requests by ip ".($ip || 'undef'));
634             }
635              
636             eval {
637             $request->complete();
638             };
639             if ($@) {
640             $heap->{logger}->error("Couldn't complete request $request_id ($@) - mayhap the client went away?");
641             }
642             else {
643             $heap->{logger}->debug("Delayed remote response to request $request_id from $ip:", $request->json_response);
644             }
645             }
646              
647             sub check_timeouts {
648             my ($kernel, $heap) = @_[KERNEL, HEAP];
649              
650             # Setup my next call time
651             $kernel->delay('check_timeouts', 30);
652              
653             foreach my $client_id (keys %{ $heap->{clients} }) {
654             my $client = POE::Component::Server::Bayeux::Client->new(
655             id => $client_id,
656             server_heap => $heap,
657             );
658             $client->check_timeout();
659             if ($client->is_error) {
660             $heap->{logger}->info("Found timeed out client $client_id in check_timeouts()");
661             }
662             }
663             }
664              
665             sub delay_sub {
666             my ($kernel, $heap, $delay_name, $delay_sec, $sub) = @_[KERNEL, HEAP, ARG0 .. $#_];
667              
668             if (my $existing = $heap->{delay_sub}{$delay_name}) {
669             return;
670             }
671             $kernel->delay_add('delay_sub_cb', $delay_sec, $delay_name);
672             $heap->{delay_sub}{$delay_name} = $sub;
673             }
674             sub delay_sub_cb {
675             my ($kernel, $heap, $delay_name) = @_[KERNEL, HEAP, ARG0];
676              
677             my $sub = delete $heap->{delay_sub}{$delay_name};
678             &$sub();
679             }
680              
681             ## Client agnostic, no auth performed ###
682              
683             =head2 subscribe ({...})
684              
685             =over 4
686              
687             Required keys 'channel', 'client_id'. Optional key 'args' (hashref).
688              
689             Subscribes client_id to the channel indicated. If subscribe() is called by
690             another session, it's treated as a non-HTTP request and will not perform
691             authentication on the subscription. Local clients need not handshake or
692             connect.
693              
694             Events published to the subscribed channel are sent to the calling session's
695             method named 'deliver', which can be overrided by the args hashref key 'state'.
696             For example:
697              
698             $kernel->post('bayeux_server', 'subscribe', {
699             channel => '/chat/demo',
700             client_id => 'local_client',
701             args => {
702             state => 'subscribe_events',
703             },
704             });
705              
706             =back
707              
708             =cut
709              
710             sub subscribe {
711             my ($kernel, $heap, $args) = @_[KERNEL, HEAP, ARG0];
712              
713             my @args = %$args;
714             my %args;
715             eval {
716             %args = validate(@args, {
717             channel => 1,
718             client_id => 1,
719             args => { default => {} },
720             });
721             };
722             if ($@) {
723             $heap->{logger}->error("subscribe() invalid call: $@");
724             return;
725             }
726              
727             # If subscribe() was called by another POE session
728             if ($_[SESSION] != $_[SENDER]) {
729             # Create a client, thereby storing the session in the client heap
730             my $client = POE::Component::Server::Bayeux::Client->new(
731             id => $args{client_id},
732             session => $_[SENDER],
733             server_heap => $heap,
734             );
735             }
736              
737             $args{args}{subscribed} = time;
738             $heap->{clients}{ $args->{client_id} }{subscriptions}{ $args->{channel} } = $args{args};
739              
740             $heap->{args}{Callback}->({
741             event => 'subscribe',
742             client_id => $args->{client_id},
743             channel => $args->{channel},
744             });
745             }
746              
747             =head2 unsubscribe ({...})
748              
749             =over 4
750              
751             Required keys 'channel', 'client_id'.
752              
753             Unsubscribes client_id from the channel indicated.
754              
755             =back
756              
757             =cut
758              
759             sub unsubscribe {
760             my ($kernel, $heap, $sender, $args) = @_[KERNEL, HEAP, SENDER, ARG0];
761              
762             my @args = %$args;
763             my %args;
764             eval {
765             %args = validate(@args, {
766             channel => 1,
767             client_id => 1,
768             });
769             };
770             if ($@) {
771             $heap->{logger}->error("unsubscribe() invalid call: $@");
772             return;
773             }
774              
775             my $client_heap = $heap->{clients}{ $args->{client_id} };
776             return unless $client_heap;
777             return unless $client_heap->{subscriptions}{ $args->{channel} };
778             delete $client_heap->{subscriptions}{ $args->{channel} };
779              
780             $heap->{args}{Callback}->({
781             event => 'unsubscribe',
782             client_id => $args->{client_id},
783             channel => $args->{channel},
784             });
785             }
786              
787             =head2 publish ({...})
788              
789             =over 4
790              
791             Required keys 'channel' and 'data'. Optional keys 'client_id', 'id', and 'ext'.
792              
793             Publishes a message to the channel specified. The keys 'client_id', 'id' and
794             'ext' are passed thru, appended to the message sent. For local clients who
795             subscribed from another session, the message is immediately posted to their
796             callback state. For HTTP clients, messages are put into queue and flushed if
797             they have an open /meta/connect.
798              
799             =back
800              
801             =cut
802              
803             sub publish {
804             my ($kernel, $heap, $sender, $args) = @_[KERNEL, HEAP, SENDER, ARG0];
805              
806             my @args = %$args;
807             my %args;
808             eval {
809             %args = validate(@args, {
810             channel => 1,
811             client_id => 0,
812             data => 1,
813             id => 0,
814             ext => 0,
815             timestamp => 0,
816             });
817             };
818             if ($@) {
819             $heap->{logger}->error("publish() invalid call: $@");
820             return;
821             }
822              
823             # Check each subscription, getting list of who to send this to
824              
825             my %send_to_clients;
826             CLIENT:
827             foreach my $client_id (keys %{ $heap->{clients} }) {
828             my $client_heap = $heap->{clients}{$client_id};
829             next unless $client_heap->{subscriptions};
830             foreach my $subscribed (keys %{ $client_heap->{subscriptions} }) {
831             next unless channel_match($args{channel}, $subscribed);
832             my $subscription_args = $client_heap->{subscriptions}{$subscribed};
833             $send_to_clients{ $client_id } = $subscription_args;
834             next CLIENT;
835             }
836             }
837              
838             $heap->{args}{Callback}->({
839             event => 'publish',
840             %args,
841             });
842              
843             my @send_to_clients = keys %send_to_clients;
844             if (! @send_to_clients) {
845             $heap->{logger}->debug("publish('$args{channel}') had no Bayeux subscribers");
846             return;
847             }
848              
849             # Construct deliver packet
850              
851             my %deliver = (
852             map { $_ => $args{$_} }
853             grep { defined $args{$_} }
854             qw(channel data id ext timestamp)
855             );
856             $deliver{clientId} = $args{client_id} if defined $args{client_id};
857              
858             foreach my $client_id (@send_to_clients) {
859             my $client = POE::Component::Server::Bayeux::Client->new(
860             id => $client_id,
861             server_heap => $heap,
862             );
863             next if ! $client || $client->is_error;
864             $client->send_message(\%deliver, $send_to_clients{$client_id});
865             }
866             }
867              
868             =head2 client_push ({...})
869              
870             =cut
871              
872             sub client_push {
873             my ($kernel, $heap, $sender, $args) = @_[KERNEL, HEAP, SENDER, ARG0];
874            
875             # Validate args
876             my @args = %$args;
877             my %args;
878             eval {
879             %args = validate_with(
880             params => \@args,
881             spec => {
882             channel => 1,
883             client_id => 1,
884             },
885             allow_extra => 1,
886             );
887             };
888             if ($@) {
889             $heap->{logger}->error("client_push() invalid call: $@");
890             return;
891             }
892              
893             # Construct the packet
894              
895             my %deliver = (%args);
896             $deliver{clientId} = delete $deliver{client_id};
897              
898             # Find the client and push the packet
899              
900             my $client = POE::Component::Server::Bayeux::Client->new(
901             id => $args{client_id},
902             server_heap => $heap,
903             );
904             if (! $client) {
905             $heap->{logger}->error("client_push() failed: no client found from $args{client_id}");
906             return;
907             }
908             if ($client->is_error) {
909             $heap->{logger}->debug("client_push() failed: client $args{client_id} in error state (".$client->is_error.")");
910             return;
911             }
912             $client->send_message(\%deliver);
913              
914             $heap->{args}{Callback}->({
915             event => 'client_push',
916             %args,
917             });
918             }
919              
920             sub client_connect {
921             my ($kernel, $heap, $sender, $args) = @_[KERNEL, HEAP, SENDER, ARG0];
922            
923             my @args = %$args;
924             my %args;
925             eval {
926             %args = validate(@args, {
927             client_id => 1,
928             ip => 0,
929             session => 0,
930             });
931             };
932             if ($@) {
933             $heap->{logger}->error("client_connect() invalid call: $@");
934             return;
935             }
936              
937             # Nothing to do here; the Client class adds it to my $heap->{clients}
938              
939             $heap->{args}{Callback}->({
940             event => 'client_connect',
941             %args,
942             });
943             }
944              
945             sub client_disconnect {
946             my ($kernel, $heap, $sender, $args) = @_[KERNEL, HEAP, SENDER, ARG0];
947            
948             my @args = %$args;
949             my %args;
950             eval {
951             %args = validate(@args, {
952             client_id => 1,
953             });
954             };
955             if ($@) {
956             $heap->{logger}->error("client_disconnect() invalid call: $@");
957             return;
958             }
959              
960             my $client_heap = $heap->{clients}{ $args{client_id} };
961             return unless $client_heap;
962              
963             foreach my $channel (keys %{ $client_heap->{subscriptions} }) {
964             # Do a call since this needs to happen right now
965             $kernel->call($_[SESSION], 'unsubscribe', {
966             client_id => $args{client_id},
967             channel => $channel,
968             });
969             }
970              
971             delete $heap->{clients}{ $args{client_id} };
972              
973             $heap->{args}{Callback}->({
974             event => 'client_disconnect',
975             %args,
976             });
977             }
978              
979             =head2 Server Callbacks
980              
981             Using the B feature of the server spawning, you can be notified about every significant event on the server. Below describes all the current callback events:
982              
983             =over 4
984              
985             =item I
986              
987             Keys 'client_id' and 'channel'
988              
989             =item I
990              
991             Keys 'client_id' and 'channel'
992              
993             =item I
994              
995             Keys 'channel' and 'data', optional: 'client_id', 'id', 'ext'
996              
997             =item I
998              
999             Keys 'channel' and 'client_id', optional: (any extra). Indicates data was pushed to the client not as a normal request/response or a publish/subscribe (out-of-sequence reply to a /service, for example). Likely only triggered by local sessions.
1000              
1001             =item I
1002              
1003             Keys 'client_id' and either 'ip' or 'session' depending on the type of client.
1004              
1005             =item I
1006              
1007             Key 'client_id'.
1008              
1009             =back
1010              
1011             =head1 TODO
1012              
1013             Lots of stuff.
1014              
1015             The code currently implements only the long-polling transport and doesn't yet
1016             strictly follow all the directives in the protocol document http://svn.xantus.org/shortbus/trunk/bayeux/bayeux.html
1017              
1018             =head1 KNOWN BUGS
1019              
1020             No known bugs, but I'm sure you can find some.
1021              
1022             =head1 SEE ALSO
1023              
1024             L, L
1025              
1026             =head1 COPYRIGHT
1027              
1028             Copyright (c) 2008 Eric Waters and XMission LLC (http://www.xmission.com/).
1029             All rights reserved. This program is free software; you can redistribute it
1030             and/or modify it under the same terms as Perl itself.
1031              
1032             The full text of the license can be found in the LICENSE file included with
1033             this module.
1034              
1035             =head1 AUTHOR
1036              
1037             Eric Waters
1038              
1039             =cut
1040              
1041             1;