File Coverage

blib/lib/PAGI/WebSocket.pm
Criterion Covered Total %
statement 366 366 100.0
branch 129 140 92.1
condition 65 79 82.2
subroutine 73 73 100.0
pod 58 58 100.0
total 691 716 96.5


line stmt bran cond sub pod time code
1             package PAGI::WebSocket;
2             $PAGI::WebSocket::VERSION = '0.002001';
3 24     24   2805028 use strict;
  24         42  
  24         932  
4 24     24   107 use warnings;
  24         50  
  24         1215  
5 24     24   102 use Carp qw(croak);
  24         104  
  24         1287  
6 24     24   6861 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  24         249861  
  24         2146  
7 24     24   8861 use Hash::MultiValue;
  24         58274  
  24         1358  
8 24     24   2052 use Future::AsyncAwait;
  24         80008  
  24         193  
9 24     24   1258 use Future;
  24         99  
  24         560  
10 24     24   6192 use JSON::MaybeXS ();
  24         156137  
  24         922  
11 24     24   121 use Scalar::Util qw(blessed);
  24         35  
  24         153720  
12              
13              
14             sub new {
15 162     162 1 2739033 my ($class, $scope, $receive, $send) = @_;
16              
17 162 100 100     1228 croak "PAGI::WebSocket requires scope hashref"
18             unless $scope && ref($scope) eq 'HASH';
19 160 100 100     752 croak "PAGI::WebSocket requires receive coderef"
20             unless $receive && ref($receive) eq 'CODE';
21 158 100 100     652 croak "PAGI::WebSocket requires send coderef"
22             unless $send && ref($send) eq 'CODE';
23             croak "PAGI::WebSocket requires scope type 'websocket', got '$scope->{type}'"
24 156 100 50     857 unless ($scope->{type} // '') eq 'websocket';
25              
26             # Return existing WebSocket object if one was already created for this scope
27             # This ensures consistent state (is_connected, is_closed, callbacks) if
28             # multiple code paths create WebSocket objects from the same scope.
29 155 50       356 return $scope->{'pagi.websocket'} if $scope->{'pagi.websocket'};
30              
31 155         1105 my $self = bless {
32             scope => $scope,
33             receive => $receive,
34             send => $send,
35             _state => 'connecting', # connecting -> connected -> closed
36             _close_code => undef,
37             _close_reason => undef,
38             _on_close => [],
39             _on_error => [],
40             _on_message => [],
41             }, $class;
42              
43             # Cache in scope for reuse (weakened to avoid circular reference leak)
44 155         300 $scope->{'pagi.websocket'} = $self;
45 155         276 Scalar::Util::weaken($scope->{'pagi.websocket'});
46              
47 155         468 return $self;
48             }
49              
50             # Scope property accessors
51 4     4 1 16 sub scope { shift->{scope} }
52 2     2 1 520 sub path { shift->{scope}{path} }
53 2   66 2 1 24 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} }
  2         14  
54 19   100 19 1 68 sub query_string { shift->{scope}{query_string} // '' }
55 2   100 2 1 12 sub scheme { shift->{scope}{scheme} // 'ws' }
56 3   100 3 1 19 sub http_version { shift->{scope}{http_version} // '1.1' }
57 3   100 3 1 19 sub subprotocols { shift->{scope}{subprotocols} // [] }
58 1     1 1 5 sub client { shift->{scope}{client} }
59 1     1 1 5 sub server { shift->{scope}{server} }
60              
61              
62             # Application state (injected by PAGI::Lifespan, read-only)
63             sub state {
64 6     6 1 14 my $self = shift;
65 6   100     41 return $self->{scope}{state} // {};
66             }
67              
68             # Path parameter accessors - captured from URL path by router
69             # Stored in scope->{path_params} for router-agnostic access
70             sub path_params {
71 2     2 1 8 my ($self) = @_;
72 2   100     10 return $self->{scope}{path_params} // {};
73             }
74              
75             sub path_param {
76 6     6 1 28 my ($self, $name) = @_;
77 6   100     18 my $params = $self->{scope}{path_params} // {};
78 6         20 return $params->{$name};
79             }
80              
81             # Internal: URL decode a string (handles + as space)
82             sub _url_decode {
83 58     58   64 my ($str) = @_;
84 58 50       61 return '' unless defined $str;
85 58         69 $str =~ s/\+/ /g;
86 58         78 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  18         48  
87 58         72 return $str;
88             }
89              
90             # Internal: Decode UTF-8 with replacement or croak in strict mode
91             sub _decode_utf8 {
92 50     50   138 my ($str, $strict) = @_;
93 50 50       56 return '' unless defined $str;
94 50 100       55 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
95 50         54 $flag |= LEAVE_SRC;
96 50         138 return decode('UTF-8', $str, $flag);
97             }
98              
99             # Query params as Hash::MultiValue (cached in scope)
100             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
101             sub query_params {
102 32     32 1 278 my ($self, %opts) = @_;
103 32   100     86 my $strict = delete $opts{strict} // 0;
104 32   100     65 my $raw = delete $opts{raw} // 0;
105 32 100       220 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
106              
107 31 100       53 my $cache_key = $raw ? 'pagi.websocket.query.raw' : ($strict ? 'pagi.websocket.query.strict' : 'pagi.websocket.query');
    100          
108 31 100       91 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
109              
110 17         37 my $qs = $self->query_string;
111 17         20 my @pairs;
112              
113 17         65 for my $part (split /[&;]/, $qs) {
114 29 50       46 next unless length $part;
115 29         52 my ($key, $val) = split /=/, $part, 2;
116 29   50     61 $key //= '';
117 29   100     41 $val //= '';
118              
119             # URL decode (handles + as space)
120 29         83 my $key_decoded = _url_decode($key);
121 29         33 my $val_decoded = _url_decode($val);
122              
123             # UTF-8 decode unless raw mode
124 29 100       47 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
125 29 100       699 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
126              
127 28         490 push @pairs, $key_final, $val_final;
128             }
129              
130 16         56 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
131 16         590 return $self->{scope}{$cache_key};
132             }
133              
134             # Raw query params (no UTF-8 decoding)
135             sub raw_query_params {
136 2     2 1 344 my $self = shift;
137 2         6 return $self->query_params(raw => 1);
138             }
139              
140             # Shortcut for single query param
141             sub query {
142 23     23 1 3312 my ($self, $name, %opts) = @_;
143 23         47 return $self->query_params(%opts)->get($name);
144             }
145              
146             # Raw single query param
147             sub raw_query {
148 2     2 1 284 my ($self, $name) = @_;
149 2         6 return $self->query($name, raw => 1);
150             }
151              
152             # Single header lookup (case-insensitive, returns last value)
153             sub header {
154 4     4 1 10 my ($self, $name) = @_;
155 4         7 $name = lc($name);
156 4         5 my $value;
157 4   50     4 for my $pair (@{$self->{scope}{headers} // []}) {
  4         29  
158 20 100       27 if (lc($pair->[0]) eq $name) {
159 4         5 $value = $pair->[1];
160             }
161             }
162 4         13 return $value;
163             }
164              
165             # All headers as Hash::MultiValue (cached in scope)
166             sub headers {
167 3     3 1 576 my $self = shift;
168 3 100       15 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
169              
170 2         3 my @pairs;
171 2   50     2 for my $pair (@{$self->{scope}{headers} // []}) {
  2         6  
172 8         17 push @pairs, lc($pair->[0]), $pair->[1];
173             }
174              
175 2         12 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
176 2         98 return $self->{scope}{'pagi.request.headers'};
177             }
178              
179             # All values for a header
180             sub header_all {
181 2     2 1 5 my ($self, $name) = @_;
182 2         6 return $self->headers->get_all(lc($name));
183             }
184              
185             # State accessors
186 4     4 1 21 sub connection_state { shift->{_state} }
187              
188             sub is_connected {
189 17     17 1 979 my $self = shift;
190 17         80 return $self->{_state} eq 'connected';
191             }
192              
193             sub is_closed {
194 144     144 1 7738 my $self = shift;
195 144         1096 return $self->{_state} eq 'closed';
196             }
197              
198 55     55 1 987 sub close_code { shift->{_close_code} }
199 52     52 1 97 sub close_reason { shift->{_close_reason} }
200              
201             # Outbound flow-control introspection (delegates to the pagi.transport handle)
202             sub buffered_amount {
203 2     2 1 12 my $self = shift;
204 2         10 my $t = $self->{scope}{'pagi.transport'};
205 2 100       10 return 0 unless $t;
206 1         6 return $t->buffered_amount;
207             }
208              
209             sub high_water_mark {
210 2     2 1 589 my $self = shift;
211 2         7 my $t = $self->{scope}{'pagi.transport'};
212 2 100       11 return undef unless $t;
213 1         4 return $t->high_water_mark;
214             }
215              
216             sub low_water_mark {
217 2     2 1 542 my $self = shift;
218 2         8 my $t = $self->{scope}{'pagi.transport'};
219 2 100       10 return undef unless $t;
220 1         6 return $t->low_water_mark;
221             }
222              
223             sub on_high_water {
224 2     2 1 46 my ($self, $cb) = @_;
225 2         7 my $t = $self->{scope}{'pagi.transport'};
226 2 100 66     24 $t->on_high_water($cb) if $t && $t->can('on_high_water');
227 2         16 return $self;
228             }
229              
230             sub on_drain {
231 2     2 1 655 my ($self, $cb) = @_;
232 2         6 my $t = $self->{scope}{'pagi.transport'};
233 2 100 66     22 $t->on_drain($cb) if $t && $t->can('on_drain');
234 2         14 return $self;
235             }
236              
237             sub is_writable {
238 3     3 1 7 my $self = shift;
239 3         8 my $t = $self->{scope}{'pagi.transport'};
240 3 100       11 return 1 unless $t;
241 2         9 my $high = $t->high_water_mark;
242 2 50       10 return 1 unless defined $high;
243 2 100       35 return $t->buffered_amount < $high ? 1 : 0;
244             }
245              
246             # Internal state setters
247             sub _set_state {
248 104     104   228 my ($self, $state) = @_;
249 104         198 $self->{_state} = $state;
250             }
251              
252             sub _set_closed {
253 48     48   115 my ($self, $code, $reason) = @_;
254 48         81 $self->{_state} = 'closed';
255 48   100     157 $self->{_close_code} = $code // 1005;
256 48   100     153 $self->{_close_reason} = $reason // '';
257             }
258              
259             # Register callback to run on disconnect/close
260             sub on_close {
261 17     17 1 295 my ($self, $callback) = @_;
262 17         17 push @{$self->{_on_close}}, $callback;
  17         27  
263 17         26 return $self;
264             }
265              
266             # Internal: run all on_close callbacks exactly once
267 43     43   51 async sub _run_close_callbacks {
268 43         63 my ($self) = @_;
269              
270             # Only run once
271 43 50       92 return if $self->{_close_callbacks_ran};
272 43         69 $self->{_close_callbacks_ran} = 1;
273              
274 43         129 my $code = $self->close_code;
275 43         82 my $reason = $self->close_reason;
276              
277 43         49 for my $cb (@{$self->{_on_close}}) {
  43         100  
278 15         20 eval {
279 15         25 my $r = $cb->($code, $reason);
280             # Only await if callback returns a Future
281 15 100 100     244 if (blessed($r) && $r->isa('Future')) {
282 10         17 await $r;
283             }
284             };
285 15 100       139 if ($@) {
286 1         11 warn "PAGI::WebSocket on_close callback error: $@";
287             }
288             }
289              
290             # Clear all callback arrays to break any closure-based cycles
291 43         94 $self->{_on_close} = [];
292 43         70 $self->{_on_error} = [];
293 43         198 $self->{_on_message} = [];
294             }
295              
296             # Register callback to run on errors
297             sub on_error {
298 10     10 1 177 my ($self, $callback) = @_;
299 10         12 push @{$self->{_on_error}}, $callback;
  10         20  
300 10         16 return $self;
301             }
302              
303             # Register callback to run on message receive
304             sub on_message {
305 12     12 1 208 my ($self, $callback) = @_;
306 12         15 push @{$self->{_on_message}}, $callback;
  12         29  
307 12         24 return $self;
308             }
309              
310             # Generic event registration (Socket.IO style)
311             sub on {
312 8     8 1 505 my ($self, $event, $callback) = @_;
313              
314 8 100       29 if ($event eq 'message') {
    100          
    100          
315 3         38 return $self->on_message($callback);
316             }
317             elsif ($event eq 'close') {
318 2         7 return $self->on_close($callback);
319             }
320             elsif ($event eq 'error') {
321 2         5 return $self->on_error($callback);
322             }
323             else {
324 1         214 croak "Unknown event type: $event (expected message, close, or error)";
325             }
326             }
327              
328             # Internal: trigger error callbacks
329 7     7   87 async sub _trigger_error {
330 7         16 my ($self, $error) = @_;
331              
332 7         8 for my $cb (@{$self->{_on_error}}) {
  7         17  
333 8         13 eval {
334 8         16 my $r = $cb->($error);
335 8 100 66     79 if (blessed($r) && $r->isa('Future')) {
336 2         4 await $r;
337             }
338             };
339 8 100       44 if ($@) {
340 1         9 warn "PAGI::WebSocket on_error callback error: $@";
341             }
342             }
343              
344             # If no error handlers registered, warn
345 7 100       9 if (!@{$self->{_on_error}}) {
  7         38  
346 1         19 warn "PAGI::WebSocket error: $error";
347             }
348             }
349              
350             # Accept the WebSocket connection
351 95     95 1 521 async sub accept {
352 95         154 my ($self, %opts) = @_;
353              
354 95         200 my $event = {
355             type => 'websocket.accept',
356             };
357 95 100       219 $event->{subprotocol} = $opts{subprotocol} if exists $opts{subprotocol};
358 95 100       181 $event->{headers} = $opts{headers} if exists $opts{headers};
359              
360 95         257 await $self->{send}->($event);
361 95         4883 $self->_set_state('connected');
362              
363 95         524 return $self;
364             }
365              
366             # Close the WebSocket connection
367 19     19 1 574 async sub close {
368 19         63 my ($self, $code, $reason) = @_;
369              
370             # Idempotent - don't send close twice
371 19 100       49 return if $self->is_closed;
372              
373 16   100     140 $code //= 1000;
374 16   100     64 $reason //= '';
375              
376 16         78 await $self->{send}->({
377             type => 'websocket.close',
378             code => $code,
379             reason => $reason,
380             });
381              
382 16         631 $self->_set_closed($code, $reason);
383 16         45 await $self->_run_close_callbacks;
384              
385 16         631 return $self;
386             }
387              
388             # Whether the server advertised the WebSocket denial-response extension.
389             # See L.
390             sub supports_denial_response {
391 7     7 1 15 my $self = shift;
392 7 100       31 return $self->{scope}{extensions}{'websocket.http.response'} ? 1 : 0;
393             }
394              
395             # Reject the handshake with a custom HTTP response (status/headers/body) instead
396             # of the bare 403. Falls back to a plain close when the server does not advertise
397             # the extension. Valid only before accept.
398             # See L.
399 4     4 1 16 async sub deny {
400 4         11 my ($self, %opts) = @_;
401              
402 4   50     9 my $status = $opts{status} // 403;
403 4   100     12 my $headers = $opts{headers} // [];
404 4 100       12 my $body = defined $opts{body} ? $opts{body} : '';
405              
406 4 100       10 if (!$self->supports_denial_response) {
407 2         10 await $self->{send}->({ type => 'websocket.close', code => 1008, reason => '' });
408 2         70 $self->_set_closed(1008, '');
409 2         9 return $self;
410             }
411              
412             await $self->{send}->({
413             type => 'websocket.http.response.start',
414             status => $status,
415             headers => $headers,
416 2         9 });
417 2         168 await $self->{send}->({
418             type => 'websocket.http.response.body',
419             body => $body,
420             more => 0,
421             });
422              
423             # An HTTP denial sends a response, not a WebSocket close frame — there is no
424             # RFC6455 close code, so mark closed without recording one (close_code stays
425             # undef). The bare-403 fallback above DID send a real close frame and keeps
426             # its 1008 via _set_closed.
427 2         46 $self->{_state} = 'closed';
428 2         17 return $self;
429             }
430              
431             # Send text message
432 7     7 1 157 async sub send_text {
433 7         15 my ($self, $text) = @_;
434              
435 7 100       16 croak "Cannot send on closed WebSocket" if $self->is_closed;
436              
437 6         56 await $self->{send}->({
438             type => 'websocket.send',
439             text => $text,
440             });
441              
442 6         168 return $self;
443             }
444              
445             # Send binary message
446 3     3 1 496 async sub send_bytes {
447 3         6 my ($self, $bytes) = @_;
448              
449 3 100       9 croak "Cannot send on closed WebSocket" if $self->is_closed;
450              
451 2         7 await $self->{send}->({
452             type => 'websocket.send',
453             bytes => $bytes,
454             });
455              
456 2         49 return $self;
457             }
458              
459             # Send JSON-encoded message
460 6     6 1 831 async sub send_json {
461 6         42 my ($self, $data) = @_;
462              
463 6 100       17 croak "Cannot send on closed WebSocket" if $self->is_closed;
464              
465 5         39 my $json = JSON::MaybeXS::encode_json($data);
466              
467 5         25 await $self->{send}->({
468             type => 'websocket.send',
469             text => $json,
470             });
471              
472 5         201 return $self;
473             }
474              
475             # Safe send methods - return bool instead of throwing
476              
477 8     8 1 169 async sub try_send_text {
478 8         20 my ($self, $text) = @_;
479 8 100       21 return 0 if $self->is_closed;
480              
481 6         9 eval {
482 6         28 await $self->{send}->({
483             type => 'websocket.send',
484             text => $text,
485             });
486             };
487             # A failed send is not a disconnect (a send after close is a silent no-op per
488             # spec), so return false per the try_* contract without fabricating a 1006
489             # close or mutating connection state.
490 6 100       468 return 0 if $@;
491 4         13 return 1;
492             }
493              
494 7     7 1 127 async sub try_send_bytes {
495 7         13 my ($self, $bytes) = @_;
496 7 100       17 return 0 if $self->is_closed;
497              
498 6         10 eval {
499 6         26 await $self->{send}->({
500             type => 'websocket.send',
501             bytes => $bytes,
502             });
503             };
504             # A failed send is not a disconnect (a send after close is a silent no-op per
505             # spec), so return false per the try_* contract without fabricating a 1006
506             # close or mutating connection state.
507 6 100       416 return 0 if $@;
508 4         11 return 1;
509             }
510              
511 7     7 1 147 async sub try_send_json {
512 7         11 my ($self, $data) = @_;
513 7 100       29 return 0 if $self->is_closed;
514              
515 6         38 my $json = JSON::MaybeXS::encode_json($data);
516 6         10 eval {
517 6         22 await $self->{send}->({
518             type => 'websocket.send',
519             text => $json,
520             });
521             };
522             # A failed send is not a disconnect (a send after close is a silent no-op per
523             # spec), so return false per the try_* contract without fabricating a 1006
524             # close or mutating connection state.
525 6 100       370 return 0 if $@;
526 4         15 return 1;
527             }
528              
529             # Silent send methods - no-op when closed
530              
531 3     3 1 86 async sub send_text_if_connected {
532 3         9 my ($self, $text) = @_;
533 3 100       8 return unless $self->is_connected;
534 2         6 await $self->try_send_text($text);
535 2         85 return;
536             }
537              
538 3     3 1 48 async sub send_bytes_if_connected {
539 3         6 my ($self, $bytes) = @_;
540 3 100       7 return unless $self->is_connected;
541 2         8 await $self->try_send_bytes($bytes);
542 2         44 return;
543             }
544              
545 3     3 1 95 async sub send_json_if_connected {
546 3         8 my ($self, $data) = @_;
547 3 100       9 return unless $self->is_connected;
548 2         8 await $self->try_send_json($data);
549 2         115 return;
550             }
551              
552             # Receive methods
553              
554 69     69 1 309 async sub receive {
555 69         88 my ($self) = @_;
556              
557 69 100       116 return undef if $self->is_closed;
558              
559 64         77 while (1) {
560 99         178 my $event = await $self->{receive}->();
561              
562 98 100 66     3055 if (!defined($event) || $event->{type} eq 'websocket.disconnect') {
563             # 1005 = No Status Rcvd (RFC 6455)
564 27   100     69 my $code = $event->{code} // 1005;
565 27   100     147 my $reason = $event->{reason} // '';
566 27         62 $self->_set_closed($code, $reason);
567 27         52 await $self->_run_close_callbacks;
568 27         693 return undef;
569             }
570              
571             # websocket.connect is a handshake event, not application data, so it is
572             # filtered out of the message stream here. PAGI's handshake contract: the
573             # server sends websocket.connect and waits for the app's reply; the app
574             # replies by sending accept()/close(). The app does not need to consume
575             # the connect event itself — this filter makes a stray one a no-op rather
576             # than surfacing it as a message. See accept() for the contract.
577 71 100       209 next if $event->{type} eq 'websocket.connect';
578              
579 36         120 return $event;
580             }
581             }
582              
583 28     28 1 1029 async sub receive_text {
584 28         46 my ($self) = @_;
585              
586 28         37 while (1) {
587 30         86 my $event = await $self->receive;
588 30 100       896 return undef unless $event;
589              
590             # Skip non-receive events and binary frames
591 20 50       66 next unless $event->{type} eq 'websocket.receive';
592 20 100       42 next unless exists $event->{text};
593              
594 18         52 return $event->{text};
595             }
596             }
597              
598 5     5 1 609 async sub receive_bytes {
599 5         9 my ($self) = @_;
600              
601 5         6 while (1) {
602 5         13 my $event = await $self->receive;
603 5 100       158 return undef unless $event;
604              
605             # Skip non-receive events and text frames
606 3 50       12 next unless $event->{type} eq 'websocket.receive';
607 3 50       7 next unless exists $event->{bytes};
608              
609 3         10 return $event->{bytes};
610             }
611             }
612              
613 4     4 1 586 async sub receive_json {
614 4         8 my ($self) = @_;
615              
616 4         12 my $text = await $self->receive_text;
617 4 100       188 return undef unless defined $text;
618              
619 3         73 return JSON::MaybeXS::decode_json($text);
620             }
621              
622             # Iteration helpers
623              
624 2     2 1 32 async sub each_message {
625 2         4 my ($self, $callback) = @_;
626              
627 2         7 while (my $event = await $self->receive) {
628 5 50       179 next unless $event->{type} eq 'websocket.receive';
629 5         8 await $callback->($event);
630             }
631              
632 2         43 return;
633             }
634              
635 6     6 1 164 async sub each_text {
636 6         12 my ($self, $callback) = @_;
637              
638 6         21 while (my $text = await $self->receive_text) {
639 9         243 await $callback->($text);
640             }
641              
642 5         106 return;
643             }
644              
645 1     1 1 2 async sub each_bytes {
646 1         2 my ($self, $callback) = @_;
647              
648 1         2 while (my $bytes = await $self->receive_bytes) {
649 1         25 await $callback->($bytes);
650             }
651              
652 1         23 return;
653             }
654              
655 3     3 1 50 async sub each_json {
656 3         8 my ($self, $callback) = @_;
657              
658 3         5 while (1) {
659 6         134 my $text = await $self->receive_text;
660 6 100       153 last unless defined $text;
661              
662 3         20 my $data = JSON::MaybeXS::decode_json($text);
663 3         6 await $callback->($data);
664             }
665              
666 3         48 return;
667             }
668              
669             # Callback-based event loop (alternative to each_* iteration)
670 9     9 1 48 async sub run {
671 9         21 my ($self) = @_;
672              
673 9         25 while (1) {
674 16         81 my $event = eval { await $self->receive };
  16         42  
675 16 100       783 if (my $err = $@) {
676 1         16 warn "PAGI::WebSocket receive error: $err";
677 1         9 last;
678             }
679 15 100       31 last unless $event;
680              
681 7 50       17 next unless $event->{type} eq 'websocket.receive';
682              
683 7   33     15 my $data = $event->{text} // $event->{bytes};
684              
685 7         10 for my $cb (@{$self->{_on_message}}) {
  7         14  
686 8         9 eval {
687 8         15 my $r = $cb->($data, $event);
688             # Await if callback returns a Future
689 5 100 66     49 if (blessed($r) && $r->isa('Future')) {
690 1         3 await $r;
691             }
692             };
693 8 100       44 if (my $err = $@) {
694 3         8 await $self->_trigger_error($err);
695             }
696             }
697             }
698              
699 9         25 return;
700             }
701              
702             # Keepalive support using WebSocket protocol-level ping/pong (RFC 6455)
703             # Sends websocket.keepalive event to server - loop-agnostic, server handles timers
704 6     6 1 19 async sub keepalive {
705 6         11 my ($self, $interval, $timeout) = @_;
706              
707 6   50     15 $interval //= 0;
708              
709 6         12 my $event = {
710             type => 'websocket.keepalive',
711             interval => $interval,
712             };
713 6 100       47 $event->{timeout} = $timeout if defined $timeout;
714              
715 6         17 await $self->{send}->($event);
716              
717 6         277 return $self;
718             }
719              
720             1;
721              
722             __END__