File Coverage

blib/lib/Mercury/Controller/PubSub/Cascade.pm
Criterion Covered Total %
statement 37 37 100.0
branch 2 2 100.0
condition n/a
subroutine 8 8 100.0
pod 3 3 100.0
total 50 50 100.0


line stmt bran cond sub pod time code
1             package Mercury::Controller::PubSub::Cascade;
2             our $VERSION = '0.015';
3             # ABSTRACT: Pub/sub controller with a topic hierarchy and cascading
4              
5             #pod =head1 SYNOPSIS
6             #pod
7             #pod # myapp.pl
8             #pod use Mojolicious::Lite;
9             #pod plugin 'Mercury';
10             #pod websocket( '/pub/*topic' )
11             #pod ->to( controller => 'PubSub::Cascade', action => 'publish' );
12             #pod websocket( '/sub/*topic' )
13             #pod ->to( controller => 'PubSub::Cascade', action => 'subscribe' );
14             #pod
15             #pod =head1 DESCRIPTION
16             #pod
17             #pod This controller enables a L<pub/sub pattern|Mercury::Pattern::PubSub> on
18             #pod a pair of endpoints (L</publish> and L</subscribe>).
19             #pod
20             #pod In this variant, topics are organized into a hierarchy. Subscribers can
21             #pod subscribe to a higher branch of the tree to receive messages from all the
22             #pod publishers on lower branches of the tree. So, a subscriber to C</foo>
23             #pod will receive messages sent to C</foo>, C</foo/bar>, and C</foo/bar/baz>.
24             #pod
25             #pod For more information on the pub/sub pattern, see L<Mercury::Pattern::PubSub>.
26             #pod
27             #pod =head1 SEE ALSO
28             #pod
29             #pod =over
30             #pod
31             #pod =item L<Mercury::Pattern::PubSub>
32             #pod
33             #pod =item L<Mercury::Controller::PubSub>
34             #pod
35             #pod =item L<Mercury>
36             #pod
37             #pod =back
38             #pod
39             #pod =cut
40              
41 2     2   4007 use Mojo::Base 'Mojolicious::Controller';
  2         8  
  2         21  
42 2     2   1433 use Mercury::Pattern::PubSub;
  2         8  
  2         25  
43              
44             #pod =method publish
45             #pod
46             #pod $app->routes->websocket( '/pub/*topic' )
47             #pod ->to( controller => 'PubSub::Cascade', action => 'publish' );
48             #pod
49             #pod Controller action to connect a websocket as a publisher. A publish
50             #pod client sends messages through the socket. The message will be sent to
51             #pod all of the connected subscribers for the topic and all parent topics.
52             #pod
53             #pod This endpoint requires a C<topic> in the stash.
54             #pod
55             #pod =cut
56              
57             sub publish {
               # Websocket action: register this connection as a publisher on the
               # topic found in the stash, and relay every message it sends to the
               # topic's ancestors.
58 4     4 1 3365 my ( $c ) = @_;
59 4         31 my $topic = $c->stash( 'topic' );
               # Get (or lazily create) the pattern object for exactly this topic.
60 4         84 my $pattern = $c->_pattern( $topic );
               # NOTE(review): presumably add_publisher makes the pattern deliver
               # this socket's messages to the topic's own subscribers -- confirm
               # in Mercury::Pattern::PubSub.
61 4         23 $pattern->add_publisher( $c->tx );
62              
63             # Send messages to parent topics
64             $c->tx->on( message => sub {
65 3     3   22 my ( $tx, $msg ) = @_;
               # _send_message only targets ancestor topics, never $topic itself.
66 3         15 $c->_send_message( $topic, $msg );
67 4         17 } );
68              
               # 101 is HTTP "Switching Protocols"; presumably this completes the
               # websocket handshake -- confirm against the Mojolicious docs.
69 4         78 $c->rendered( 101 );
70             }
71              
72             #pod =method subscribe
73             #pod
74             #pod $app->routes->websocket( '/sub/*topic' )
75             #pod ->to( controller => 'PubSub::Cascade', action => 'subscribe' );
76             #pod
77             #pod Controller action to connect a websocket as a subscriber. A subscriber
78             #pod will receive every message sent by publishers to the current topic and
79             #pod any child topics.
80             #pod
81             #pod This endpoint requires a C<topic> in the stash.
82             #pod
83             #pod =cut
84              
85             sub subscribe {
               # Websocket action: register this connection as a subscriber on the
               # pattern object for the topic found in the stash.
86 5     5 1 7011 my ( $c ) = @_;
               # _pattern creates the pattern object if this topic has none yet.
87 5         31 my $pattern = $c->_pattern( $c->stash( 'topic' ) );
88 5         25 $pattern->add_subscriber( $c->tx );
               # 101 is HTTP "Switching Protocols"; presumably this completes the
               # websocket handshake -- confirm against the Mojolicious docs.
89 5         28 $c->rendered( 101 );
90             }
91              
92             #pod =method post
93             #pod
94             #pod Post a new message to the given topic without subscribing or
95             #pod establishing a WebSocket connection. This allows new messages to be
96             #pod pushed by any HTTP client.
97             #pod
98             #pod =cut
99              
100             sub post {
                # Plain HTTP action: publish the request body as a message on the
                # stashed topic and its ancestors, then reply with an empty 200.
101 2     2 1 5709 my ( $c ) = @_;
102 2         13 my $topic = $c->stash( 'topic' );
103 2         436 my $pattern = $c->_pattern( $topic );
                # Deliver to the topic itself first ...
104 2         19 $pattern->send_message( $c->req->body );
                # ... then to its ancestor topics; _send_message skips $topic itself,
                # so the two calls do not overlap.
105 2         15 $c->_send_message( $topic, $c->req->body );
106 2         23 $c->render(
107             status => 200,
108             text => '',
109             );
110             }
111              
112             #=method _pattern
113             #
114             # my $pattern = $c->_pattern( $topic );
115             #
116             # Get or create the L<Mercury::Pattern::PubSub> object for the given
117             # topic.
118             #
119             #=cut
120              
121             sub _pattern {
                # Return the pattern object stored under the 'PubSub::Cascade'
                # namespace for $topic, creating and storing a new
                # Mercury::Pattern::PubSub if the lookup returns a false value.
122 11     11   114 my ( $c, $topic ) = @_;
                # Two-argument form: look up an existing pattern.
123 11         136 my $pattern = $c->mercury->pattern( 'PubSub::Cascade' => $topic );
124 11 100       151 if ( !$pattern ) {
125 3         26 $pattern = Mercury::Pattern::PubSub->new;
                # Three-argument form: store the new pattern so later lookups for
                # this topic return the same object.
126 3         38 $c->mercury->pattern( 'PubSub::Cascade' => $topic => $pattern );
127             }
128 11         44 return $pattern;
129             }
130              
131             #=method _send_message
132             #
133             # $c->_send_message( $topic, $msg );
134             #
135             # Send the given message out on all the appropriate topics. This handles
136             # the "Cascade" part.
137             #=cut
138              
139             sub _send_message {
                # Send $msg to every existing pattern object registered for an
                # ancestor of $topic. The topic itself is NOT included; callers
                # handle delivery to it separately.
140 5     5   96 my ( $c, $topic, $msg ) = @_;
141 5         37 my @parts = split m{/}, $topic;
                # The chain below reads bottom-to-top: index range -> parent topic
                # strings -> pattern lookups -> only the lookups that were defined.
                # Lookups use the two-argument form, so no pattern is created here.
142             my @patterns =
143             # Only pattern objects that have been created
144 2         34 grep { defined }
145             # Change topics into pattern objects
146 2         24 map { $c->mercury->pattern( 'PubSub::Cascade' => $_ ) }
147             # Build parent topics
148 5         35 map { join '/', @parts[0..$_] }
  2         19  
                # Stops at $#parts-1 so the full topic is excluded; a single-segment
                # topic yields an empty range and therefore no parents.
149             0..$#parts-1;
150 5         28 $_->send_message( $msg ) for @patterns;
151             }
152              
153             1;
154              
155             __END__