File Coverage

blib/lib/AnyEvent/WebSocket/Connection.pm
Criterion Covered Total %
statement 138 140 98.5
branch 36 40 90.0
condition 3 3 100.0
subroutine 21 21 100.0
pod 3 4 75.0
total 201 208 96.6


line stmt bran cond sub pod time code
1             package AnyEvent::WebSocket::Connection;
2              
3 14     14   1384635 use strict;
  14         71  
  14         467  
4 14     14   85 use warnings;
  14         32  
  14         388  
5 14     14   3010 use Moo;
  14         59635  
  14         158  
6 14     14   13361 use Protocol::WebSocket::Frame;
  14         789544  
  14         438  
7 14     14   109 use Scalar::Util ();
  14         41  
  14         320  
8 14     14   95 use Encode ();
  14         44  
  14         270  
9 14     14   2467 use AE;
  14         439  
  14         530  
10 14     14   6470 use AnyEvent::WebSocket::Message;
  14         103  
  14         506  
11 14     14   4932 use PerlX::Maybe qw( maybe provided );
  14         26104  
  14         67  
12 14     14   924 use Carp ();
  14         34  
  14         23008  
13              
14             # ABSTRACT: WebSocket connection for AnyEvent
15             our $VERSION = '0.54'; # VERSION
16              
17              
18             has handle => (
19             is => 'ro',
20             required => 1,
21             );
22              
23              
24             has masked => (
25             is => 'ro',
26             default => sub { 0 },
27             );
28              
29              
30             has subprotocol => (
31             is => 'ro',
32             );
33              
34              
35             has max_payload_size => (
36             is => 'ro',
37             );
38              
39              
40             has max_fragments => (
41             is => 'ro',
42             );
43              
44              
45             has close_code => (
46             is => 'rw',
47             );
48              
49              
50             has close_reason => (
51             is => 'rw',
52             );
53              
54              
55             has close_error => (
56             is => 'rw',
57             );
58              
59             foreach my $type (qw( each_message next_message finish parse_error ))
60             {
61             has "_${type}_cb" => (
62             is => 'ro',
63             init_arg => undef,
64             default => sub { [] },
65             );
66             }
67              
68             foreach my $flag (qw( _is_read_open _is_write_open ))
69             {
70             has $flag => (
71             is => 'rw',
72             init_arg => undef,
73             default => sub { 1 },
74             );
75             }
76              
77             has "_is_finished" => (
78             is => 'rw',
79             init_arg => undef,
80             default => sub { 0 },
81             );
82              
83             sub BUILD
84             {
85 69     69 0 544 my $self = shift;
86 69         304 Scalar::Util::weaken $self;
87              
88 69         151 my @temp_messages = ();
89 69         130 my $are_callbacks_supposed_to_be_ready = 0;
90              
91             my $finish = sub {
92 35     35   10782 my(undef, undef, $message) = @_;
93 35         83 my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
94 35 50       182 return if $self->_is_finished;
95             eval
96 35         68 {
97 35         127 $self->_process_message($_) foreach @temp_messages;
98             };
99 35         126 @temp_messages = ();
100 35         107 $self->_is_finished(1);
101 35         177 $self->handle->push_shutdown;
102 35         1434 $self->_is_read_open(0);
103 35         148 $self->_is_write_open(0);
104 35 100       130 $self->close_error($message) if defined $message;
105 35         120 $_->($self, $message) for @{ $self->_finish_cb };
  35         205  
106 69         353 };
107 69         448 $self->handle->on_error($finish);
108 69         585 $self->handle->on_eof($finish);
109              
110 69         751 my $frame = Protocol::WebSocket::Frame->new(
111             maybe max_payload_size => $self->max_payload_size,
112             maybe max_fragments_amount => $self->max_fragments,
113             );
114              
115             my $read_cb = sub {
116 236     236   101483 my ($handle) = @_;
117 236         436 local $@;
118 236         481 my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
119             my $success = eval
120 236         489 {
121 236         935 $frame->append($handle->{rbuf});
122 236         3048 while(defined(my $body = $frame->next_bytes))
123             {
124 105 100       9348 next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
125 104         328 my $message = AnyEvent::WebSocket::Message->new(
126             body => $body,
127             opcode => $frame->opcode,
128             );
129 104 100       15375 if($are_callbacks_supposed_to_be_ready)
130             {
131 61         175 $self->_process_message($message);
132             }
133             else
134             {
135 43         174 push(@temp_messages, $message);
136             }
137             }
138 232         11934 1; # succeed to parse.
139             };
140 236 100       1391 if(!$success)
141             {
142 4         57 $self->_force_shutdown();
143 4         8 $_->($self, $@) for @{ $self->_parse_error_cb };
  4         30  
144             }
145 69         2539 };
146              
147              
148             # Message processing (calling _process_message) is delayed by
149             # $are_callbacks_supposed_to_be_ready flag. This is necessary to
150             # make sure all received messages are delivered to each_message and
151             # next_message callbacks. If there is some data in rbuf, changing
152             # the on_read callback makes the callback fire, but there is of
153             # course no each_message/next_message callback to receive the
154             # message yet. So we put messages to @temp_messages for a
155             # while. After the control is returned to the user, who sets up
156             # each_message/next_message callbacks, @temp_messages are processed.
157              
158             # An alternative approach would be temporarily disabling on_read by
159             # $self->handle->on_read(undef). However, this can cause a weird
160             # situation in TLS mode, because on_eof can fire even if we don't
161             # have any on_read (
162             # https://metacpan.org/pod/AnyEvent::Handle#I-get-different-callback-invocations-in-TLS-mode-Why-cant-I-pause-reading
163             # )
164 69         702 $self->handle->on_read($read_cb);
165 69         2455 my $idle_w; $idle_w = AE::idle sub {
166 57     57   1179 undef $idle_w;
167 57 100       268 if(defined($self))
168             {
169 23         53 my $strong_self = $self;
170 23         47 $are_callbacks_supposed_to_be_ready = 1;
171 23         46 local $@;
172             my $success = eval
173 23         45 {
174 23         164 $self->_process_message($_) foreach @temp_messages;
175 23         797 1;
176             };
177 23         121 @temp_messages = ();
178 23 50       291 if(!$success)
179             {
180 0         0 $self->_force_shutdown();
181             }
182             }
183 69         1125 };
184             }
185              
186             sub _process_message
187             {
188 104     104   306 my ($self, $received_message) = @_;
189 104 100       330 return if !$self->_is_read_open;
190              
191 103 100 100     358 if($received_message->is_text || $received_message->is_binary)
    100          
    50          
192             {
193             # make a copy in order to allow specifying new callbacks inside the
194             # currently executed next_callback itself. otherwise, any next_callback
195             # added inside the currently executed callback would be added to the end
196             # of the array and executed for the currently processed message instead of
197             # actually the next one.
198 77         131 my @next_callbacks = @{ $self->_next_message_cb };
  77         217  
199 77         143 @{ $self->_next_message_cb } = ();
  77         159  
200 77         231 $_->($self, $received_message) for @next_callbacks;
201              
202             # make a copy in case one of the callbacks get
203             # unregistered in the middle of the loop
204 77         3346 my @callbacks = @{ $self->_each_message_cb };
  77         214  
205             $_->($self, $received_message, $self->_cancel_for(each_message => $_) )
206 77         357 for @callbacks;
207             }
208             elsif($received_message->is_close)
209             {
210 25         83 my $body = $received_message->body;
211 25 100       72 if($body)
212             {
213 19         98 my($code, $reason) = unpack 'na*', $body;
214 19         66 $self->close_code($code);
215 19         80 $self->close_reason(Encode::decode('UTF-8', $reason));
216             }
217 25         1498 $self->_is_read_open(0);
218 25         72 $self->close();
219             }
220             elsif($received_message->is_ping)
221             {
222 1         30 $self->send(AnyEvent::WebSocket::Message->new(opcode => 10, body => $received_message->body));
223             }
224             }
225              
226             sub _force_shutdown
227             {
228 4     4   17 my ($self) = @_;
229 4         34 $self->handle->push_shutdown;
230 4         256 $self->_is_write_open(0);
231 4         26 $self->_is_read_open(0);
232             }
233              
234              
235             sub send
236             {
237 131     131 1 28803 my($self, $message) = @_;
238 131         241 my $frame;
239              
240 131 100       546 return $self if !$self->_is_write_open;
241              
242 112 100       315 if(ref $message)
243             {
244 47         273 $frame = Protocol::WebSocket::Frame->new(opcode => $message->opcode, buffer => $message->body, masked => $self->masked, max_payload_size => 0);
245             }
246             else
247             {
248 65         378 $frame = Protocol::WebSocket::Frame->new(buffer => $message, masked => $self->masked, max_payload_size => 0);
249             }
250 112         4119 $self->handle->push_write($frame->to_bytes);
251 112         253264 $self;
252             }
253              
254              
255             sub _cancel_for
256             {
257 136     136   868 my( $self, $event, $handler ) = @_;
258              
259 136         361 my $handler_id = Scalar::Util::refaddr($handler);
260              
261             return sub {
262 2     2   10 my $accessor = "_${event}_cb";
263 2         13 @{ $self->$accessor } = grep { Scalar::Util::refaddr($_) != $handler_id }
  5         14  
264 2         3 @{ $self->$accessor };
  2         6  
265 136         790 };
266             }
267              
268             sub on
269             {
270 85     85 1 39933 my($self, $event, $cb) = @_;
271              
272 85 100       368 if($event eq 'next_message')
    100          
    100          
    50          
273             {
274 32         58 push @{ $self->_next_message_cb }, $cb;
  32         149  
275             }
276             elsif($event eq 'each_message')
277             {
278 16         85 push @{ $self->_each_message_cb }, $cb;
  16         71  
279             }
280             elsif($event eq 'finish')
281             {
282 35         70 push @{ $self->_finish_cb }, $cb;
  35         134  
283             }
284             elsif($event eq 'parse_error')
285             {
286 2         6 push @{ $self->_parse_error_cb }, $cb;
  2         11  
287             }
288             else
289             {
290 0         0 Carp::croak "unrecongized event: $event";
291             }
292              
293 85         326 return $self->_cancel_for($event,$cb);
294             }
295              
296              
297             sub close
298             {
299 40     40 1 810 my($self, $code, $reason) = @_;
300              
301 40 100       191 my $body = pack('n', ($code) ? $code : '1005');
302              
303 40 100       149 $body .= Encode::encode 'UTF-8', $reason if defined $reason;
304              
305 40         1162 $self->send(AnyEvent::WebSocket::Message->new(
306             opcode => 8,
307             body => $body,
308             ));
309 40         283 $self->handle->push_shutdown;
310 40         2250 $self->_is_write_open(0);
311 40         184 $self;
312             }
313              
314             if($] < 5.010)
315             {
316             # This is a workaround for GH#19
317             # https://github.com/plicease/AnyEvent-WebSocket-Client/issues/19
318             # I am not 100% sure about this, but maybe as a trade off it isn't
319             # too bad? The previous workaround was to downgrade to AE 6.x
320             # something. Unfortunately, we now require AE 7.x something for
321             # SSL bug fixes.
322             *DEMOLISH = sub
323             {
324             my($self) = @_;
325             eval { $self->handle->push_shutdown } if $self->_is_write_open;
326             };
327             }
328              
329             1;
330              
331             __END__