File Coverage

blib/lib/Mercury/Pattern/PubSub.pm
Criterion Covered Total %
statement 38 38 100.0
branch 4 4 100.0
condition n/a
subroutine 9 9 100.0
pod 5 5 100.0
total 56 56 100.0


line stmt bran cond sub pod time code
1             package Mercury::Pattern::PubSub;
2             our $VERSION = '0.016';
3             # ABSTRACT: Manage a pub/sub pattern for a single topic
4              
5             #pod =head1 SYNOPSIS
6             #pod
7             #pod # Connect the publisher
8             #pod my $pub_ua = Mojo::UserAgent->new;
9             #pod my $pub_tx = $ua->websocket( '/pub/foo' );
10             #pod
11             #pod # Connect the subscriber socket
12             #pod my $sub_ua = Mojo::UserAgent->new;
13             #pod my $sub_tx = $ua->websocket( '/sub/foo' );
14             #pod
15             #pod # Connect the two sockets using pub/sub
16             #pod my $pattern = Mercury::Pattern::PubSub->new;
17             #pod $pattern->add_publisher( $pub_tx );
18             #pod $pattern->add_subscriber( $sub_tx );
19             #pod
20             #pod # Send a message
21             #pod $sub_tx->on( message => sub {
22             #pod my ( $tx, $msg ) = @_;
23             #pod print $msg; # Hello, World!
24             #pod } );
25             #pod $pub_tx->send( 'Hello, World!' );
26             #pod
27             #pod =head1 DESCRIPTION
28             #pod
29             #pod This pattern connects publishers, which send messages, to subscribers,
30             #pod which recieve messages. Each message sent by a publisher will be
31             #pod received by all connected subscribers. This pattern is useful for
32             #pod sending notification events and logging.
33             #pod
34             #pod =head1 SEE ALSO
35             #pod
36             #pod =over
37             #pod
38             #pod =item L
39             #pod
40             #pod =item L
41             #pod
42             #pod =back
43             #pod
44             #pod =cut
45              
46 3     3   16 use Mojo::Base '-base';
  3         6  
  3         16  
47              
48             #pod =attr subscribers
49             #pod
50             #pod Arrayref of connected websockets ready to receive messages
51             #pod
52             #pod =cut
53              
54             has subscribers => sub { [] };
55              
56             #pod =attr publishers
57             #pod
58             #pod Arrayref of connected websockets ready to publish messages
59             #pod
60             #pod =cut
61              
62             has publishers => sub { [] };
63              
64             #pod =method add_subscriber
65             #pod
66             #pod $pat->add_subscriber( $tx );
67             #pod
68             #pod Add the connection as a subscriber. Subscribers will receive all messages
69             #pod sent by publishers.
70             #pod
71             #pod =cut
72              
73             sub add_subscriber {
74 7     7 1 32 my ( $self, $tx ) = @_;
75             $tx->on( finish => sub {
76 7     7   4609 my ( $tx ) = @_;
77 7         24 $self->remove_subscriber( $tx );
78 7         54 } );
79 7         39 push @{ $self->subscribers }, $tx;
  7         25  
80 7         33 return;
81             }
82              
83             #pod =method remove_subscriber
84             #pod
85             #pod $pat->remove_subscriber( $tx );
86             #pod
87             #pod Remove a subscriber. Called automatically when a subscriber socket is
88             #pod closed.
89             #pod
90             #pod =cut
91              
92             sub remove_subscriber {
93 7     7 1 15 my ( $self, $tx ) = @_;
94 7         10 my @subs = @{ $self->subscribers };
  7         17  
95 7         49 for my $i ( 0.. $#subs ) {
96 11 100       38 if ( $subs[$i] eq $tx ) {
97 7         14 splice @subs, $i, 1;
98 7         19 return;
99             }
100             }
101             }
102              
103             #pod =method add_publisher
104             #pod
105             #pod $pat->add_publisher( $tx );
106             #pod
107             #pod Add a publisher to this topic. Publishers send messages to all
108             #pod subscribers.
109             #pod
110             #pod =cut
111              
112             sub add_publisher {
113 5     5 1 31 my ( $self, $tx ) = @_;
114             $tx->on( message => sub {
115 4     4   16383 my ( $tx, $msg ) = @_;
116 4         25 $self->send_message( $msg );
117 5         51 } );
118             $tx->on( finish => sub {
119 5     5   4100 my ( $tx ) = @_;
120 5         16 $self->remove_publisher( $tx );
121 5         51 } );
122 5         24 push @{ $self->publishers }, $tx;
  5         18  
123 5         15 return;
124             }
125              
126             #pod =method remove_publisher
127             #pod
128             #pod $pat->remove_publisher( $tx );
129             #pod
130             #pod Remove a publisher from the list. Called automatically when the
131             #pod publisher socket is closed.
132             #pod
133             #pod =cut
134              
135             sub remove_publisher {
136 5     5 1 12 my ( $self, $tx ) = @_;
137 5         24 my @pubs = @{ $self->publishers };
  5         13  
138 5         38 for my $i ( 0.. $#pubs ) {
139 6 100       26 if ( $pubs[$i] eq $tx ) {
140 5         11 splice @pubs, $i, 1;
141 5         16 return;
142             }
143             }
144             }
145              
146             #pod =method send_message
147             #pod
148             #pod $pat->send_message( $message );
149             #pod
150             #pod Send a message to all subscribers.
151             #pod
152             #pod =cut
153              
154             sub send_message {
155 8     8 1 74 my ( $self, $message ) = @_;
156 8         16 $_->send( $message ) for @{ $self->subscribers };
  8         22  
157 8         2093 return;
158             }
159              
160             1;
161              
162             __END__