File Coverage

blib/lib/PAGI/WebSocket.pm
Criterion Covered Total %
statement 331 331 100.0
branch 108 118 91.5
condition 58 69 84.0
subroutine 65 65 100.0
pod 50 50 100.0
total 612 633 96.6


line stmt bran cond sub pod time code
1             package PAGI::WebSocket;
2 20     20   1987565 use strict;
  20         30  
  20         687  
3 20     20   79 use warnings;
  20         33  
  20         925  
4 20     20   75 use Carp qw(croak);
  20         25  
  20         932  
5 20     20   4930 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  20         168672  
  20         1574  
6 20     20   7094 use Hash::MultiValue;
  20         45500  
  20         1036  
7 20     20   1640 use Future::AsyncAwait;
  20         66319  
  20         140  
8 20     20   918 use Future;
  20         27  
  20         388  
9 20     20   4924 use JSON::MaybeXS ();
  20         120611  
  20         695  
10 20     20   103 use Scalar::Util qw(blessed);
  20         26  
  20         102107  
11              
12              
13             sub new {
14 147     147 1 2008415 my ($class, $scope, $receive, $send) = @_;
15              
16 147 100 100     893 croak "PAGI::WebSocket requires scope hashref"
17             unless $scope && ref($scope) eq 'HASH';
18 145 100 100     625 croak "PAGI::WebSocket requires receive coderef"
19             unless $receive && ref($receive) eq 'CODE';
20 143 100 100     554 croak "PAGI::WebSocket requires send coderef"
21             unless $send && ref($send) eq 'CODE';
22             croak "PAGI::WebSocket requires scope type 'websocket', got '$scope->{type}'"
23 141 100 50     596 unless ($scope->{type} // '') eq 'websocket';
24              
25             # Return existing WebSocket object if one was already created for this scope
26             # This ensures consistent state (is_connected, is_closed, callbacks) if
27             # multiple code paths create WebSocket objects from the same scope.
28 140 50       306 return $scope->{'pagi.websocket'} if $scope->{'pagi.websocket'};
29              
30 140         964 my $self = bless {
31             scope => $scope,
32             receive => $receive,
33             send => $send,
34             _state => 'connecting', # connecting -> connected -> closed
35             _close_code => undef,
36             _close_reason => undef,
37             _on_close => [],
38             _on_error => [],
39             _on_message => [],
40             }, $class;
41              
42             # Cache in scope for reuse (weakened to avoid circular reference leak)
43 140         299 $scope->{'pagi.websocket'} = $self;
44 140         211 Scalar::Util::weaken($scope->{'pagi.websocket'});
45              
46 140         457 return $self;
47             }
48              
49             # Scope property accessors
50 4     4 1 84 sub scope { shift->{scope} }
51 2     2 1 564 sub path { shift->{scope}{path} }
52 2   66 2 1 5 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} }
  2         13  
53 19   100 19 1 70 sub query_string { shift->{scope}{query_string} // '' }
54 2   100 2 1 13 sub scheme { shift->{scope}{scheme} // 'ws' }
55 3   100 3 1 21 sub http_version { shift->{scope}{http_version} // '1.1' }
56 3   100 3 1 30 sub subprotocols { shift->{scope}{subprotocols} // [] }
57 1     1 1 4 sub client { shift->{scope}{client} }
58 1     1 1 5 sub server { shift->{scope}{server} }
59              
60              
61             # Application state (injected by PAGI::Lifespan, read-only)
62             sub state {
63 6     6 1 13 my $self = shift;
64 6   100     35 return $self->{scope}{state} // {};
65             }
66              
67             # Path parameter accessors - captured from URL path by router
68             # Stored in scope->{path_params} for router-agnostic access
69             sub path_params {
70 2     2 1 7 my ($self) = @_;
71 2   100     10 return $self->{scope}{path_params} // {};
72             }
73              
74             sub path_param {
75 6     6 1 28 my ($self, $name) = @_;
76 6   100     117 my $params = $self->{scope}{path_params} // {};
77 6         22 return $params->{$name};
78             }
79              
80             # Internal: URL decode a string (handles + as space)
81             sub _url_decode {
82 58     58   99 my ($str) = @_;
83 58 50       107 return '' unless defined $str;
84 58         103 $str =~ s/\+/ /g;
85 58         115 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  18         75  
86 58         121 return $str;
87             }
88              
89             # Internal: Decode UTF-8 with replacement or croak in strict mode
90             sub _decode_utf8 {
91 50     50   97 my ($str, $strict) = @_;
92 50 50       96 return '' unless defined $str;
93 50 100       97 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
94 50         129 $flag |= LEAVE_SRC;
95 50         221 return decode('UTF-8', $str, $flag);
96             }
97              
98             # Query params as Hash::MultiValue (cached in scope)
99             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
100             sub query_params {
101 32     32 1 419 my ($self, %opts) = @_;
102 32   100     106 my $strict = delete $opts{strict} // 0;
103 32   100     84 my $raw = delete $opts{raw} // 0;
104 32 100       240 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
105              
106 31 100       74 my $cache_key = $raw ? 'pagi.websocket.query.raw' : ($strict ? 'pagi.websocket.query.strict' : 'pagi.websocket.query');
    100          
107 31 100       150 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
108              
109 17         50 my $qs = $self->query_string;
110 17         28 my @pairs;
111              
112 17         97 for my $part (split /[&;]/, $qs) {
113 29 50       69 next unless length $part;
114 29         125 my ($key, $val) = split /=/, $part, 2;
115 29   50     73 $key //= '';
116 29   100     54 $val //= '';
117              
118             # URL decode (handles + as space)
119 29         63 my $key_decoded = _url_decode($key);
120 29         52 my $val_decoded = _url_decode($val);
121              
122             # UTF-8 decode unless raw mode
123 29 100       147 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
124 29 100       1071 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
125              
126 28         846 push @pairs, $key_final, $val_final;
127             }
128              
129 16         85 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
130 16         862 return $self->{scope}{$cache_key};
131             }
132              
133             # Raw query params (no UTF-8 decoding)
134             sub raw_query_params {
135 2     2 1 452 my $self = shift;
136 2         6 return $self->query_params(raw => 1);
137             }
138              
139             # Shortcut for single query param
140             sub query {
141 23     23 1 3805 my ($self, $name, %opts) = @_;
142 23         115 return $self->query_params(%opts)->get($name);
143             }
144              
145             # Raw single query param
146             sub raw_query {
147 2     2 1 291 my ($self, $name) = @_;
148 2         7 return $self->query($name, raw => 1);
149             }
150              
151             # Single header lookup (case-insensitive, returns last value)
152             sub header {
153 4     4 1 9 my ($self, $name) = @_;
154 4         7 $name = lc($name);
155 4         5 my $value;
156 4   50     4 for my $pair (@{$self->{scope}{headers} // []}) {
  4         11  
157 20 100       29 if (lc($pair->[0]) eq $name) {
158 4         22 $value = $pair->[1];
159             }
160             }
161 4         14 return $value;
162             }
163              
164             # All headers as Hash::MultiValue (cached in scope)
165             sub headers {
166 3     3 1 549 my $self = shift;
167 3 100       17 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
168              
169 2         3 my @pairs;
170 2   50     4 for my $pair (@{$self->{scope}{headers} // []}) {
  2         9  
171 8         21 push @pairs, lc($pair->[0]), $pair->[1];
172             }
173              
174 2         15 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
175 2         164 return $self->{scope}{'pagi.request.headers'};
176             }
177              
178             # All values for a header
179             sub header_all {
180 2     2 1 7 my ($self, $name) = @_;
181 2         7 return $self->headers->get_all(lc($name));
182             }
183              
184             # State accessors
185 4     4 1 22 sub connection_state { shift->{_state} }
186              
187             sub is_connected {
188 17     17 1 1091 my $self = shift;
189 17         72 return $self->{_state} eq 'connected';
190             }
191              
192             sub is_closed {
193 143     143 1 3804 my $self = shift;
194 143         842 return $self->{_state} eq 'closed';
195             }
196              
197 53     53 1 1206 sub close_code { shift->{_close_code} }
198 53     53 1 113 sub close_reason { shift->{_close_reason} }
199              
200             # Internal state setters
201             sub _set_state {
202 104     104   249 my ($self, $state) = @_;
203 104         180 $self->{_state} = $state;
204             }
205              
206             sub _set_closed {
207 51     51   88 my ($self, $code, $reason) = @_;
208 51         81 $self->{_state} = 'closed';
209 51   100     101 $self->{_close_code} = $code // 1005;
210 51   100     197 $self->{_close_reason} = $reason // '';
211             }
212              
213             # Register callback to run on disconnect/close
214             sub on_close {
215 18     18 1 314 my ($self, $callback) = @_;
216 18         19 push @{$self->{_on_close}}, $callback;
  18         38  
217 18         27 return $self;
218             }
219              
220             # Internal: run all on_close callbacks exactly once
221 46     46   89 async sub _run_close_callbacks {
222 46         62 my ($self) = @_;
223              
224             # Only run once
225 46 50       91 return if $self->{_close_callbacks_ran};
226 46         63 $self->{_close_callbacks_ran} = 1;
227              
228 46         91 my $code = $self->close_code;
229 46         83 my $reason = $self->close_reason;
230              
231 46         57 for my $cb (@{$self->{_on_close}}) {
  46         101  
232 16         18 eval {
233 16         27 my $r = $cb->($code, $reason);
234             # Only await if callback returns a Future
235 16 100 100     256 if (blessed($r) && $r->isa('Future')) {
236 11         18 await $r;
237             }
238             };
239 16 100       138 if ($@) {
240 1         12 warn "PAGI::WebSocket on_close callback error: $@";
241             }
242             }
243              
244             # Clear all callback arrays to break any closure-based cycles
245 46         99 $self->{_on_close} = [];
246 46         73 $self->{_on_error} = [];
247 46         175 $self->{_on_message} = [];
248             }
249              
250             # Register callback to run on errors
251             sub on_error {
252 10     10 1 127 my ($self, $callback) = @_;
253 10         10 push @{$self->{_on_error}}, $callback;
  10         12  
254 10         12 return $self;
255             }
256              
257             # Register callback to run on message receive
258             sub on_message {
259 12     12 1 149 my ($self, $callback) = @_;
260 12         13 push @{$self->{_on_message}}, $callback;
  12         15  
261 12         22 return $self;
262             }
263              
264             # Generic event registration (Socket.IO style)
265             sub on {
266 8     8 1 300 my ($self, $event, $callback) = @_;
267              
268 8 100       19 if ($event eq 'message') {
    100          
    100          
269 3         4 return $self->on_message($callback);
270             }
271             elsif ($event eq 'close') {
272 2         4 return $self->on_close($callback);
273             }
274             elsif ($event eq 'error') {
275 2         3 return $self->on_error($callback);
276             }
277             else {
278 1         166 croak "Unknown event type: $event (expected message, close, or error)";
279             }
280             }
281              
282             # Internal: trigger error callbacks
283 7     7   71 async sub _trigger_error {
284 7         9 my ($self, $error) = @_;
285              
286 7         9 for my $cb (@{$self->{_on_error}}) {
  7         13  
287 8         13 eval {
288 8         12 my $r = $cb->($error);
289 8 100 66     71 if (blessed($r) && $r->isa('Future')) {
290 2         4 await $r;
291             }
292             };
293 8 100       44 if ($@) {
294 1         36 warn "PAGI::WebSocket on_error callback error: $@";
295             }
296             }
297              
298             # If no error handlers registered, warn
299 7 100       7 if (!@{$self->{_on_error}}) {
  7         28  
300 1         13 warn "PAGI::WebSocket error: $error";
301             }
302             }
303              
304             # Accept the WebSocket connection
305 98     98 1 626 async sub accept {
306 98         132 my ($self, %opts) = @_;
307              
308 98         161 my $event = {
309             type => 'websocket.accept',
310             };
311 98 100       224 $event->{subprotocol} = $opts{subprotocol} if exists $opts{subprotocol};
312 98 100       157 $event->{headers} = $opts{headers} if exists $opts{headers};
313              
314 98         228 await $self->{send}->($event);
315 98         4119 $self->_set_state('connected');
316              
317 98         463 return $self;
318             }
319              
320             # Close the WebSocket connection
321 19     19 1 422 async sub close {
322 19         33 my ($self, $code, $reason) = @_;
323              
324             # Idempotent - don't send close twice
325 19 100       43 return if $self->is_closed;
326              
327 16   100     62 $code //= 1000;
328 16   100     47 $reason //= '';
329              
330 16         59 await $self->{send}->({
331             type => 'websocket.close',
332             code => $code,
333             reason => $reason,
334             });
335              
336 16         457 $self->_set_closed($code, $reason);
337 16         67 await $self->_run_close_callbacks;
338              
339 16         537 return $self;
340             }
341              
342             # Send text message
343 9     9 1 148 async sub send_text {
344 9         18 my ($self, $text) = @_;
345              
346 9 100       29 croak "Cannot send on closed WebSocket" if $self->is_closed;
347              
348 8         31 await $self->{send}->({
349             type => 'websocket.send',
350             text => $text,
351             });
352              
353 8         266 return $self;
354             }
355              
356             # Send binary message
357 3     3 1 713 async sub send_bytes {
358 3         6 my ($self, $bytes) = @_;
359              
360 3 100       6 croak "Cannot send on closed WebSocket" if $self->is_closed;
361              
362 2         7 await $self->{send}->({
363             type => 'websocket.send',
364             bytes => $bytes,
365             });
366              
367 2         49 return $self;
368             }
369              
370             # Send JSON-encoded message
371 7     7 1 757 async sub send_json {
372 7         11 my ($self, $data) = @_;
373              
374 7 100       31 croak "Cannot send on closed WebSocket" if $self->is_closed;
375              
376 6         40 my $json = JSON::MaybeXS::encode_json($data);
377              
378 6         20 await $self->{send}->({
379             type => 'websocket.send',
380             text => $json,
381             });
382              
383 6         149 return $self;
384             }
385              
386             # Safe send methods - return bool instead of throwing
387              
388 6     6 1 83 async sub try_send_text {
389 6         8 my ($self, $text) = @_;
390 6 100       12 return 0 if $self->is_closed;
391              
392 5         5 eval {
393 5         14 await $self->{send}->({
394             type => 'websocket.send',
395             text => $text,
396             });
397             };
398 5 100       365 if ($@) {
399 1         7 $self->_set_closed(1006, 'Connection lost');
400 1         5 return 0;
401             }
402 4         11 return 1;
403             }
404              
405 6     6 1 76 async sub try_send_bytes {
406 6         7 my ($self, $bytes) = @_;
407 6 100       10 return 0 if $self->is_closed;
408              
409 5         7 eval {
410 5         14 await $self->{send}->({
411             type => 'websocket.send',
412             bytes => $bytes,
413             });
414             };
415 5 100       283 if ($@) {
416 1         5 $self->_set_closed(1006, 'Connection lost');
417 1         4 return 0;
418             }
419 4         10 return 1;
420             }
421              
422 6     6 1 81 async sub try_send_json {
423 6         8 my ($self, $data) = @_;
424 6 100       11 return 0 if $self->is_closed;
425              
426 5         24 my $json = JSON::MaybeXS::encode_json($data);
427 5         7 eval {
428 5         12 await $self->{send}->({
429             type => 'websocket.send',
430             text => $json,
431             });
432             };
433 5 100       244 if ($@) {
434 1         4 $self->_set_closed(1006, 'Connection lost');
435 1         5 return 0;
436             }
437 4         11 return 1;
438             }
439              
440             # Silent send methods - no-op when closed
441              
442 3     3 1 49 async sub send_text_if_connected {
443 3         40 my ($self, $text) = @_;
444 3 100       9 return unless $self->is_connected;
445 2         6 await $self->try_send_text($text);
446 2         43 return;
447             }
448              
449 3     3 1 49 async sub send_bytes_if_connected {
450 3         5 my ($self, $bytes) = @_;
451 3 100       6 return unless $self->is_connected;
452 2         6 await $self->try_send_bytes($bytes);
453 2         42 return;
454             }
455              
456 3     3 1 51 async sub send_json_if_connected {
457 3         5 my ($self, $data) = @_;
458 3 100       6 return unless $self->is_connected;
459 2         6 await $self->try_send_json($data);
460 2         59 return;
461             }
462              
463             # Receive methods
464              
465 76     76 1 326 async sub receive {
466 76         87 my ($self) = @_;
467              
468 76 100       132 return undef if $self->is_closed;
469              
470 71         80 while (1) {
471 109         186 my $event = await $self->{receive}->();
472              
473 108 100 66     3138 if (!defined($event) || $event->{type} eq 'websocket.disconnect') {
474             # 1005 = No Status Rcvd (RFC 6455)
475 30   100     75 my $code = $event->{code} // 1005;
476 30   100     156 my $reason = $event->{reason} // '';
477 30         77 $self->_set_closed($code, $reason);
478 30         58 await $self->_run_close_callbacks;
479 30         801 return undef;
480             }
481              
482             # Skip connect events - they're handled by accept()
483 78 100       172 next if $event->{type} eq 'websocket.connect';
484              
485 40         116 return $event;
486             }
487             }
488              
489 35     35 1 816 async sub receive_text {
490 35         45 my ($self) = @_;
491              
492 35         33 while (1) {
493 37         64 my $event = await $self->receive;
494 37 100       1101 return undef unless $event;
495              
496             # Skip non-receive events and binary frames
497 24 50       52 next unless $event->{type} eq 'websocket.receive';
498 24 100       40 next unless exists $event->{text};
499              
500 22         62 return $event->{text};
501             }
502             }
503              
504 5     5 1 333 async sub receive_bytes {
505 5         9 my ($self) = @_;
506              
507 5         7 while (1) {
508 5         10 my $event = await $self->receive;
509 5 100       165 return undef unless $event;
510              
511             # Skip non-receive events and text frames
512 3 50       10 next unless $event->{type} eq 'websocket.receive';
513 3 50       7 next unless exists $event->{bytes};
514              
515 3         13 return $event->{bytes};
516             }
517             }
518              
519 4     4 1 392 async sub receive_json {
520 4         6 my ($self) = @_;
521              
522 4         10 my $text = await $self->receive_text;
523 4 100       111 return undef unless defined $text;
524              
525 3         41 return JSON::MaybeXS::decode_json($text);
526             }
527              
528             # Iteration helpers
529              
530 2     2 1 36 async sub each_message {
531 2         4 my ($self, $callback) = @_;
532              
533 2         9 while (my $event = await $self->receive) {
534 5 50       138 next unless $event->{type} eq 'websocket.receive';
535 5         17 await $callback->($event);
536             }
537              
538 2         62 return;
539             }
540              
541 8     8 1 135 async sub each_text {
542 8         20 my ($self, $callback) = @_;
543              
544 8         27 while (my $text = await $self->receive_text) {
545 12         369 await $callback->($text);
546             }
547              
548 7         214 return;
549             }
550              
551 1     1 1 2 async sub each_bytes {
552 1         2 my ($self, $callback) = @_;
553              
554 1         4 while (my $bytes = await $self->receive_bytes) {
555 1         38 await $callback->($bytes);
556             }
557              
558 1         69 return;
559             }
560              
561 4     4 1 66 async sub each_json {
562 4         10 my ($self, $callback) = @_;
563              
564 4         6 while (1) {
565 8         131 my $text = await $self->receive_text;
566 8 100       245 last unless defined $text;
567              
568 4         26 my $data = JSON::MaybeXS::decode_json($text);
569 4         9 await $callback->($data);
570             }
571              
572 4         11 return;
573             }
574              
575             # Callback-based event loop (alternative to each_* iteration)
576 9     9 1 44 async sub run {
577 9         14 my ($self) = @_;
578              
579 9         9 while (1) {
580 16         84 my $event = eval { await $self->receive };
  16         35  
581 16 100       762 if (my $err = $@) {
582 1         17 warn "PAGI::WebSocket receive error: $err";
583 1         8 last;
584             }
585 15 100       29 last unless $event;
586              
587 7 50       11 next unless $event->{type} eq 'websocket.receive';
588              
589 7   33     15 my $data = $event->{text} // $event->{bytes};
590              
591 7         6 for my $cb (@{$self->{_on_message}}) {
  7         11  
592 8         7 eval {
593 8         15 my $r = $cb->($data, $event);
594             # Await if callback returns a Future
595 5 100 66     41 if (blessed($r) && $r->isa('Future')) {
596 1         3 await $r;
597             }
598             };
599 8 100       40 if (my $err = $@) {
600 3         7 await $self->_trigger_error($err);
601             }
602             }
603             }
604              
605 9         23 return;
606             }
607              
608             # Keepalive support using WebSocket protocol-level ping/pong (RFC 6455)
609             # Sends websocket.keepalive event to server - loop-agnostic, server handles timers
610 6     6 1 17 async sub keepalive {
611 6         13 my ($self, $interval, $timeout) = @_;
612              
613 6   50     13 $interval //= 0;
614              
615 6         13 my $event = {
616             type => 'websocket.keepalive',
617             interval => $interval,
618             };
619 6 100       16 $event->{timeout} = $timeout if defined $timeout;
620              
621 6         14 await $self->{send}->($event);
622              
623 6         280 return $self;
624             }
625              
626             1;
627              
628             __END__