File Coverage

blib/lib/AnyEvent/WebSocket/Connection.pm
Criterion Covered Total %
statement 138 140 98.5
branch 36 40 90.0
condition 3 3 100.0
subroutine 21 21 100.0
pod 3 4 75.0
total 201 208 96.6


line stmt bran cond sub pod time code
1             package AnyEvent::WebSocket::Connection;
2              
3 14     14   1079009 use strict;
  14         57  
  14         366  
4 14     14   65 use warnings;
  14         25  
  14         307  
5 14     14   2254 use Moo;
  14         45340  
  14         69  
6 14     14   10642 use Protocol::WebSocket::Frame;
  14         634204  
  14         341  
7 14     14   93 use Scalar::Util ();
  14         27  
  14         202  
8 14     14   69 use Encode ();
  14         35  
  14         193  
9 14     14   1911 use AE;
  14         314  
  14         382  
10 14     14   5413 use AnyEvent::WebSocket::Message;
  14         37  
  14         436  
11 14     14   3840 use PerlX::Maybe qw( maybe provided );
  14         20448  
  14         58  
12 14     14   796 use Carp ();
  14         27  
  14         17846  
13              
14             # ABSTRACT: WebSocket connection for AnyEvent
15             our $VERSION = '0.53'; # VERSION
16              
17              
18             has handle => (
19             is => 'ro',
20             required => 1,
21             );
22              
23              
24             has masked => (
25             is => 'ro',
26             default => sub { 0 },
27             );
28              
29              
30             has subprotocol => (
31             is => 'ro',
32             );
33              
34              
35             has max_payload_size => (
36             is => 'ro',
37             );
38              
39              
40             has max_fragments => (
41             is => 'ro',
42             );
43              
44              
45             has close_code => (
46             is => 'rw',
47             );
48              
49              
50             has close_reason => (
51             is => 'rw',
52             );
53              
54              
55             has close_error => (
56             is => 'rw',
57             );
58              
59             foreach my $type (qw( each_message next_message finish parse_error ))
60             {
61             has "_${type}_cb" => (
62             is => 'ro',
63             init_arg => undef,
64             default => sub { [] },
65             );
66             }
67              
68             foreach my $flag (qw( _is_read_open _is_write_open ))
69             {
70             has $flag => (
71             is => 'rw',
72             init_arg => undef,
73             default => sub { 1 },
74             );
75             }
76              
77             has "_is_finished" => (
78             is => 'rw',
79             init_arg => undef,
80             default => sub { 0 },
81             );
82              
83             sub BUILD
84             {
85 69     69 0 548 my $self = shift;
86 69         238 Scalar::Util::weaken $self;
87            
88 69         133 my @temp_messages = ();
89 69         116 my $are_callbacks_supposed_to_be_ready = 0;
90            
91             my $finish = sub {
92 35     35   8857 my(undef, undef, $message) = @_;
93 35         87 my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
94 35 50       166 return if $self->_is_finished;
95             eval
96 35         65 {
97 35         117 $self->_process_message($_) foreach @temp_messages;
98             };
99 35         115 @temp_messages = ();
100 35         98 $self->_is_finished(1);
101 35         124 $self->handle->push_shutdown;
102 35         1137 $self->_is_read_open(0);
103 35         87 $self->_is_write_open(0);
104 35 100       107 $self->close_error($message) if defined $message;
105 35         74 $_->($self, $message) for @{ $self->_finish_cb };
  35         186  
106 69         317 };
107 69         403 $self->handle->on_error($finish);
108 69         541 $self->handle->on_eof($finish);
109              
110 69         747 my $frame = Protocol::WebSocket::Frame->new(
111             maybe max_payload_size => $self->max_payload_size,
112             maybe max_fragments_amount => $self->max_fragments,
113             );
114              
115             my $read_cb = sub {
116 236     236   82250 my ($handle) = @_;
117 236         349 local $@;
118 236         350 my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
119             my $success = eval
120 236         394 {
121 236         810 $frame->append($handle->{rbuf});
122 236         2387 while(defined(my $body = $frame->next_bytes))
123             {
124 105 100       12781 next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
125 104         284 my $message = AnyEvent::WebSocket::Message->new(
126             body => $body,
127             opcode => $frame->opcode,
128             );
129 104 100       12470 if($are_callbacks_supposed_to_be_ready)
130             {
131 61         135 $self->_process_message($message);
132             }
133             else
134             {
135 43         170 push(@temp_messages, $message);
136             }
137             }
138 232         9808 1; # succeed to parse.
139             };
140 236 100       1133 if(!$success)
141             {
142 4         56 $self->_force_shutdown();
143 4         8 $_->($self, $@) for @{ $self->_parse_error_cb };
  4         30  
144             }
145 69         2146 };
146              
147              
148             # Message processing (calling _process_message) is delayed by
149             # $are_callbacks_supposed_to_be_ready flag. This is necessary to
150             # make sure all received messages are delivered to each_message and
151             # next_message callbacks. If there is some data in rbuf, changing
152             # the on_read callback makes the callback fire, but there is of
153             # course no each_message/next_message callback to receive the
154             # message yet. So we put messages to @temp_messages for a
155             # while. After the control is returned to the user, who sets up
156             # each_message/next_message callbacks, @temp_messages are processed.
157              
158             # An alternative approach would be temporarily disabling on_read by
159             # $self->handle->on_read(undef). However, this can cause a weird
160             # situation in TLS mode, because on_eof can fire even if we don't
161             # have any on_read (
162             # https://metacpan.org/pod/AnyEvent::Handle#I-get-different-callback-invocations-in-TLS-mode-Why-cant-I-pause-reading
163             # )
164 69         344 $self->handle->on_read($read_cb);
165 69         2226 my $idle_w; $idle_w = AE::idle sub {
166 57     57   876 undef $idle_w;
167 57 100       228 if(defined($self))
168             {
169 23         45 my $strong_self = $self;
170 23         37 $are_callbacks_supposed_to_be_ready = 1;
171 23         40 local $@;
172             my $success = eval
173 23         51 {
174 23         92 $self->_process_message($_) foreach @temp_messages;
175 23         605 1;
176             };
177 23         90 @temp_messages = ();
178 23 50       237 if(!$success)
179             {
180 0         0 $self->_force_shutdown();
181             }
182             }
183 69         978 };
184             }
185              
186             sub _process_message
187             {
188 104     104   265 my ($self, $received_message) = @_;
189 104 100       287 return if !$self->_is_read_open;
190            
191 103 100 100     296 if($received_message->is_text || $received_message->is_binary)
    100          
    50          
192             {
193             # make a copy in order to allow specifying new callbacks inside the
194             # currently executed next_callback itself. otherwise, any next_callback
195             # added inside the currently executed callback would be added to the end
196             # of the array and executed for the currently processed message instead of
197             # actually the next one.
198 77         112 my @next_callbacks = @{ $self->_next_message_cb };
  77         185  
199 77         116 @{ $self->_next_message_cb } = ();
  77         153  
200 77         204 $_->($self, $received_message) for @next_callbacks;
201              
202             # make a copy in case one of the callbacks get
203             # unregistered in the middle of the loop
204 77         3114 my @callbacks = @{ $self->_each_message_cb };
  77         191  
205             $_->($self, $received_message, $self->_cancel_for(each_message => $_) )
206 77         285 for @callbacks;
207             }
208             elsif($received_message->is_close)
209             {
210 25         87 my $body = $received_message->body;
211 25 100       71 if($body)
212             {
213 19         82 my($code, $reason) = unpack 'na*', $body;
214 19         65 $self->close_code($code);
215 19         72 $self->close_reason(Encode::decode('UTF-8', $reason));
216             }
217 25         1530 $self->_is_read_open(0);
218 25         65 $self->close();
219             }
220             elsif($received_message->is_ping)
221             {
222 1         25 $self->send(AnyEvent::WebSocket::Message->new(opcode => 10, body => $received_message->body));
223             }
224             }
225              
226             sub _force_shutdown
227             {
228 4     4   14 my ($self) = @_;
229 4         33 $self->handle->push_shutdown;
230 4         209 $self->_is_write_open(0);
231 4         21 $self->_is_read_open(0);
232             }
233              
234              
235             sub send
236             {
237 131     131 1 24324 my($self, $message) = @_;
238 131         201 my $frame;
239            
240 131 100       406 return $self if !$self->_is_write_open;
241            
242 112 100       271 if(ref $message)
243             {
244 47         248 $frame = Protocol::WebSocket::Frame->new(opcode => $message->opcode, buffer => $message->body, masked => $self->masked, max_payload_size => 0);
245             }
246             else
247             {
248 65         317 $frame = Protocol::WebSocket::Frame->new(buffer => $message, masked => $self->masked, max_payload_size => 0);
249             }
250 112         3510 $self->handle->push_write($frame->to_bytes);
251 112         202486 $self;
252             }
253              
254              
255             sub _cancel_for
256             {
257 136     136   700 my( $self, $event, $handler ) = @_;
258              
259 136         336 my $handler_id = Scalar::Util::refaddr($handler);
260              
261             return sub {
262 2     2   9 my $accessor = "_${event}_cb";
263 2         10 @{ $self->$accessor } = grep { Scalar::Util::refaddr($_) != $handler_id }
  5         12  
264 2         5 @{ $self->$accessor };
  2         6  
265 136         671 };
266             }
267              
268             sub on
269             {
270 85     85 1 33519 my($self, $event, $cb) = @_;
271            
272 85 100       721 if($event eq 'next_message')
    100          
    100          
    50          
273             {
274 32         43 push @{ $self->_next_message_cb }, $cb;
  32         119  
275             }
276             elsif($event eq 'each_message')
277             {
278 16         39 push @{ $self->_each_message_cb }, $cb;
  16         74  
279             }
280             elsif($event eq 'finish')
281             {
282 35         69 push @{ $self->_finish_cb }, $cb;
  35         122  
283             }
284             elsif($event eq 'parse_error')
285             {
286 2         6 push @{ $self->_parse_error_cb }, $cb;
  2         9  
287             }
288             else
289             {
290 0         0 Carp::croak "unrecongized event: $event";
291             }
292              
293 85         265 return $self->_cancel_for($event,$cb);
294             }
295              
296              
297             sub close
298             {
299 40     40 1 664 my($self, $code, $reason) = @_;
300              
301 40 100       155 my $body = pack('n', ($code) ? $code : '1005');
302              
303 40 100       144 $body .= Encode::encode 'UTF-8', $reason if defined $reason;
304              
305 40         1029 $self->send(AnyEvent::WebSocket::Message->new(
306             opcode => 8,
307             body => $body,
308             ));
309 40         245 $self->handle->push_shutdown;
310 40         1812 $self->_is_write_open(0);
311 40         162 $self;
312             }
313              
314             if($] < 5.010)
315             {
316             # This is a workaround for GH#19
317             # https://github.com/plicease/AnyEvent-WebSocket-Client/issues/19
318             # I am not 100% sure about this, but maybe as a trade off it isn't
319             # too bad? The previous workaround was to downgrade to AE 6.x
320             # something. Unfortunately, we now require AE 7.x something for
321             # SSL bug fixes.
322             *DEMOLISH = sub
323             {
324             my($self) = @_;
325             eval { $self->handle->push_shutdown } if $self->_is_write_open;
326             };
327             }
328              
329             1;
330              
331             __END__