File Coverage

blib/lib/Mojolicious/Plugin/Multiplex/Multiplexer.pm
Criterion Covered Total %
statement 47 47 100.0
branch 24 26 92.3
condition 6 7 85.7
subroutine 12 12 100.0
pod 4 4 100.0
total 93 96 96.8


line stmt bran cond sub pod time code
1             package Mojolicious::Plugin::Multiplex::Multiplexer;
2              
3 1     1   16 use Mojo::Base 'Mojo::EventEmitter';
  1         2  
  1         6  
4              
5 1     1   148 use Carp ();
  1         2  
  1         14  
6 1     1   6 use Scalar::Util ();
  1         2  
  1         284  
7              
8             has tx => sub { Carp::croak 'tx is required' };
9              
10             my %map = (
11             sub => 'subscribe',
12             msg => 'message',
13             uns => 'unsubscribe',
14             err => 'error',
15             sta => 'status',
16             );
17              
18             sub new {
19 3     3 1 82 my $self = shift->SUPER::new(@_);
20 3         31 my $tx = $self->tx;
21 3 50       23 return undef unless $tx->is_websocket;
22 3         23 Scalar::Util::weaken $self->{tx};
23              
24             $tx->on(text => sub {
25 9     9   26057 my ($tx, $bytes) = @_;
26 9         18 my %message;
27 9         111 @message{qw/type topic payload/} = split /,/, $bytes, 3;
28              
29 9         30 my $e = $map{$message{type}};
30 9         26 my @args = ($message{topic});
31              
32 9 100       45 if (! defined $e) {
    100          
    100          
    100          
33 1         3 $e = 'error';
34 1         6 push @args, {
35             error => 'Message type not understood',
36             message => \%message,
37             };
38             } elsif ($e eq 'error') {
39 1         5 push @args, {
40             error => 'Client error',
41             message => \%message,
42             };
43             } elsif ($e eq 'message') {
44 1         4 push @args, $message{payload};
45             } elsif ($e eq 'status') {
46 1     1   8 no warnings 'uninitialized';
  1         2  
  1         515  
47 4         10 my $s = $message{payload};
48 4 100 66     26 push @args, $s eq 'true' ? 1 :
    100          
    100          
49             $s eq 'false' ? 0 :
50             ! (defined $s || length $s) ? undef :
51             {error => 'Status payload not understood', message => \%message};
52 4 100       14 $e = 'error' if ref $args[-1];
53             }
54              
55 9         32 $self->emit($e, @args);
56 3         29 });
57              
58 3     3   34 $tx->on(finish => sub { $self->emit(finish => @_) });
  3         10940  
59              
60 3         30 return $self;
61             }
62              
63             sub send_status {
64 3     3 1 11078 my ($self, $topic, $payload, $cb) = @_;
65 3 100       15 $payload = defined($payload) ? $payload ? ',true' : ',false' : '';
    100          
66 3         13 $self->_send("sta,$topic$payload", $cb);
67             }
68              
69             sub send {
70 6     6 1 16912 my ($self, $topic, $payload, $cb) = @_;
71 6   100     27 $payload //= '';
72 6         27 $self->_send("msg,$topic,$payload", $cb);
73             }
74              
75             sub send_error {
76 4     4 1 12865 my ($self, $topic, $payload, $cb) = @_;
77 4   100     20 $payload //= '';
78 4         19 $self->_send("err,$topic,$payload", $cb);
79             }
80              
81             sub _send {
82 13     13   29 my ($self, $msg, $cb) = @_;
83 13 50       41 return unless my $tx = $self->tx;
84 13 100   1   109 $tx->send($msg, $cb ? sub { $self->$cb() } : ());
  1         242  
85             }
86              
87             1;
88              
89             =head1 NAME
90              
91             Mojolicious::Plugin::Multiplex::Multiplexer - Dispatcher class for multiplexing websockets
92              
93             =head1 SYNOPSIS
94              
95             # a simple single-threaded message relay example
96              
97             use Mojolicious::Plugin::Multiplex::Multiplexer;
98             my $multiplex = Mojolicious::Plugin::Multiplex::Multiplexer->new(tx => $tx);
99              
100             my %topics;
101             $multiplex->on(message => sub {
102             my ($multiplex, $topic, $payload) = @_;
103             return unless my $cb = $topics{$topic};
104             $multiplex->$cb($topic, $payload);
105             });
106              
107             $multiplex->on(subscribe => sub {
108             my ($multiplex, $topic) = @_;
109             $topics{$topic} = sub { shift->send(@_) };
110             $multiplex->send_status($topic, 1);
111             });
112              
113             $multiplex->on(unsubscribe => sub {
114             my ($multiplex, $topic) = @_;
115             delete $topics{$topic};
116             $multiplex->send_status($topic, 0);
117             });
118              
119             =head1 DESCRIPTION
120              
121             This class sends and receives messages over a L<Mojo::Transaction::WebSocket> using a variant of the sockjs websocket multiplex protocol.
122             This variant defines five message types: C<sub>, C<msg>, C<uns>, C<err>, and C<sta>.
123             Further, each message is assigned to a topic (channel) which is used to separate messages by subscribed listener.
124             Note that though the protocol defines an error message, the C<error> event is also emitted on other errors; in the case of an error message the error string will be C<Client error>.
125              
126             =head1 PLEASE NOTE
127              
128             This class is rather low level and is useful for writing bindings to backend message services like brokers.
129             Notice that it does not store any connection state information either, which would be the responsibility of the consuming module or script.
130             An example is given in the distribution for using this class with L to relay JSON messages between web-based chat clients.
131              
132             As this module is low level it does no character encoding or decoding.
133             If a topic or payload contains non ascii characters it must be manually encoded or decoded as necessary.
134             Note further that topics cannot contain a comma due to the limitations of the simple protocol.
135              
136             =head1 EVENTS
137              
138             Inherits all of the events from L<Mojo::EventEmitter> and implements the following new ones.
139              
140             =head2 subscribe / unsubscribe
141              
142             $multiplex->on(subscribe => sub { my ($multiplex, $topic) = @_; ... });
143              
144             Emitted with a topic when the client expresses an interest in subscribing to or leaving the given topic.
145              
146             A server should respond to this message event with a L</send_status> reply indicating the new subscription state.
147              
148             =head2 message
149              
150             $multiplex->on(message => sub { my ($multiplex, $topic, $payload) = @_; ... });
151              
152             Emitted when a message is received from the client.
153             It is passed the topic and the payload in original encoded form (bytes).
154              
155             =head2 status
156              
157             $multiplex->on(status => sub { my ($multiplex, $topic, $payload) = @_; ... });
158              
159             Emitted when a client attempts to indicate its own subscription status of a topic (rare) or else requests the subscription status for a given topic (proposed usage).
160             Emitted with a topic name and either true or false (but defined) value when indicating the state or undefined when requesting a state.
161              
162             The server may reply to these requests but none is required.
163             For agreement with an indicated state or sending the requested current state, use L</send_status>.
164             For disagreeing with the indicated state, an error should be sent with L</send_error>.
165              
166             =head2 error
167              
168             $multiplex->on(error => sub { my ($multiplex, $topic, $error) = @_; ... });
169              
170             Emitted when a client sends a message which is not understood, or on other errors.
171             Fatal if not handled.
172             Passed the topic and an error data structure.
173             This structure contains an C<error> key which defines the error and a C<message> key which contains the raw parsed message.
174              
175             =head2 finish
176              
177             $multiplex->on(finish => sub { my ($multiplex, $tx, $code, $reason) = @_; ... });
178              
179             Emitted when the websocket connection is finished.
180             This event is proxied from the transaction for convenience.
181              
182             =head1 ATTRIBUTES
183              
184             Inherits all of the attributes from L<Mojo::EventEmitter> and implements the following new ones.
185              
186             =head2 tx
187              
188             The transaction associated with the websocket.
189             This should be an instance of L<Mojo::Transaction::WebSocket>.
190              
191             =head1 METHODS
192              
193             Inherits all of the methods from L<Mojo::EventEmitter> and implements the following new ones.
194              
195             =head2 send_status
196              
197             $multiplex->send_status($topic, $state, $cb);
198              
199             Send the current state of a topic subscription (as in the response to the L<subscribe|/"subscribe / unsubscribe"> and L<unsubscribe|/"subscribe / unsubscribe"> events) or request the client's subscription state (rare).
200             Takes a topic, a state, and an optional drain callback.
201             The state variable will be coerced from any true, false or undefined values, where true and false values indicate subscribed or not subscribed respectively.
202             Undefined prompts the client to respond with its own notion of the subscription state, which is a rare thing for a server to do; additionally the client may not implement the response to the request.
203              
204             =head2 send
205              
206             $multiplex->send($topic, $payload, $cb);
207              
208             Send a message to the client on the given topic.
209             Takes a topic, a payload, and an optional drain callback.
210             As mentioned above, neither the topic name nor the payload is encoded before sending, so be sure to do so manually if necessary.
211              
212             =head2 send_error
213              
214             $multiplex->send_error($topic, $payload, $cb);
215              
216             Send an error message to the client on the given topic.
217             Takes a topic, a payload, and an optional drain callback.
218              
219             =head1 PROTOCOL
220              
221             The original protocol (extended below) was proposed by SockJS and can be found at L<https://github.com/sockjs/websocket-multiplex>.
222              
223             The underlying protocol is quite simple. Each message is a string consisting of three comma separated parts: type, topic and payload. There are five valid message types:
224              
225             sub - expresses a will to subscribe to a given topic.
226             msg - a message with payload is being sent on a topic.
227             uns - a will to unsubscribe from a topic.
228             err - a message with a payload that is emitted as an error
229             sta - a status message stating whether the other party believes you are subscribed to the topic
230             the payload is either "true" or "false"
231             this message is sent as a response to sub/uns messages
232             (TODO: consider empty status message to request the status from the other party?)
233              
234