File Coverage

blib/lib/PAGI/WebSocket.pm
Criterion Covered Total %
statement 315 321 98.1
branch 101 112 90.1
condition 57 68 83.8
subroutine 65 66 98.4
pod 51 51 100.0
total 589 618 95.3


line stmt bran cond sub pod time code
1             package PAGI::WebSocket;
2 17     17   2034540 use strict;
  17         26  
  17         582  
3 17     17   74 use warnings;
  17         42  
  17         844  
4 17     17   77 use Carp qw(croak);
  17         23  
  17         893  
5 17     17   4804 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  17         168835  
  17         1284  
6 17     17   5561 use Hash::MultiValue;
  17         34337  
  17         828  
7 17     17   1737 use Future::AsyncAwait;
  17         77140  
  17         120  
8 17     17   883 use Future;
  17         44  
  17         492  
9 17     17   5432 use JSON::MaybeXS ();
  17         108943  
  17         624  
10 17     17   86 use Scalar::Util qw(blessed);
  17         19  
  17         85570  
11              
12              
13             sub new {
14 105     105 1 1922709 my ($class, $scope, $receive, $send) = @_;
15              
16 105 100 100     673 croak "PAGI::WebSocket requires scope hashref"
17             unless $scope && ref($scope) eq 'HASH';
18 103 100 100     469 croak "PAGI::WebSocket requires receive coderef"
19             unless $receive && ref($receive) eq 'CODE';
20 101 100 100     436 croak "PAGI::WebSocket requires send coderef"
21             unless $send && ref($send) eq 'CODE';
22             croak "PAGI::WebSocket requires scope type 'websocket', got '$scope->{type}'"
23 99 100 50     449 unless ($scope->{type} // '') eq 'websocket';
24              
25             # Return existing WebSocket object if one was already created for this scope
26             # This ensures consistent state (is_connected, is_closed, callbacks) if
27             # multiple code paths create WebSocket objects from the same scope.
28 98 50       203 return $scope->{'pagi.websocket'} if $scope->{'pagi.websocket'};
29              
30 98         650 my $self = bless {
31             scope => $scope,
32             receive => $receive,
33             send => $send,
34             _state => 'connecting', # connecting -> connected -> closed
35             _close_code => undef,
36             _close_reason => undef,
37             _on_close => [],
38             _on_error => [],
39             _on_message => [],
40             }, $class;
41              
42             # Cache in scope for reuse (weakened to avoid circular reference leak)
43 98         164 $scope->{'pagi.websocket'} = $self;
44 98         154 Scalar::Util::weaken($scope->{'pagi.websocket'});
45              
46 98         209 return $self;
47             }
48              
49             # Scope property accessors
50 1     1 1 4 sub scope { shift->{scope} }
51 1     1 1 8 sub path { shift->{scope}{path} }
52 2   66 2 1 5 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} }
  2         12  
53 15   100 15 1 44 sub query_string { shift->{scope}{query_string} // '' }
54 2   100 2 1 12 sub scheme { shift->{scope}{scheme} // 'ws' }
55 2   100 2 1 11 sub http_version { shift->{scope}{http_version} // '1.1' }
56 2   100 2 1 14 sub subprotocols { shift->{scope}{subprotocols} // [] }
57 1     1 1 3 sub client { shift->{scope}{client} }
58 1     1 1 6 sub server { shift->{scope}{server} }
59              
60             # Per-connection storage - lives in scope, shared across Request/Response/WebSocket/SSE
61             # See PAGI::Request for detailed design notes on why stash is scope-based.
62             sub stash {
63 12     12 1 132 my $self = shift;
64 12   100     67 return $self->{scope}{'pagi.stash'} //= {};
65             }
66              
67             # Application state (injected by PAGI::Lifespan, read-only)
68             sub state {
69 7     7 1 17 my $self = shift;
70 7   100     44 return $self->{scope}{state} // {};
71             }
72              
73             # Path parameter accessors - captured from URL path by router
74             # Stored in scope->{path_params} for router-agnostic access
75             sub path_params {
76 2     2 1 9 my ($self) = @_;
77 2   100     8 return $self->{scope}{path_params} // {};
78             }
79              
80             sub path_param {
81 5     5 1 26 my ($self, $name) = @_;
82 5   100     16 my $params = $self->{scope}{path_params} // {};
83 5         20 return $params->{$name};
84             }
85              
86             # Internal: URL decode a string (handles + as space)
87             sub _url_decode {
88 46     46   71 my ($str) = @_;
89 46 50       55 return '' unless defined $str;
90 46         55 $str =~ s/\+/ /g;
91 46         60 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  16         38  
92 46         66 return $str;
93             }
94              
95             # Internal: Decode UTF-8 with replacement or croak in strict mode
96             sub _decode_utf8 {
97 42     42   54 my ($str, $strict) = @_;
98 42 50       51 return '' unless defined $str;
99 42 100       45 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
100 42         40 $flag |= LEAVE_SRC;
101 42         123 return decode('UTF-8', $str, $flag);
102             }
103              
104             # Query params as Hash::MultiValue (cached in scope)
105             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
106             sub query_params {
107 26     26 1 280 my ($self, %opts) = @_;
108 26   100     74 my $strict = delete $opts{strict} // 0;
109 26   100     49 my $raw = delete $opts{raw} // 0;
110 26 100       228 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
111              
112 25 100       47 my $cache_key = $raw ? 'pagi.websocket.query.raw' : ($strict ? 'pagi.websocket.query.strict' : 'pagi.websocket.query');
    100          
113 25 100       82 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
114              
115 13         22 my $qs = $self->query_string;
116 13         15 my @pairs;
117              
118 13         53 for my $part (split /[&;]/, $qs) {
119 23 50       32 next unless length $part;
120 23         42 my ($key, $val) = split /=/, $part, 2;
121 23   50     34 $key //= '';
122 23   100     35 $val //= '';
123              
124             # URL decode (handles + as space)
125 23         29 my $key_decoded = _url_decode($key);
126 23         31 my $val_decoded = _url_decode($val);
127              
128             # UTF-8 decode unless raw mode
129 23 100       39 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
130 23 100       615 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
131              
132 22         382 push @pairs, $key_final, $val_final;
133             }
134              
135 12         37 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
136 12         439 return $self->{scope}{$cache_key};
137             }
138              
139             # Raw query params (no UTF-8 decoding)
140             sub raw_query_params {
141 1     1 1 303 my $self = shift;
142 1         3 return $self->query_params(raw => 1);
143             }
144              
145             # Shortcut for single query param
146             sub query {
147 19     19 1 3216 my ($self, $name, %opts) = @_;
148 19         42 return $self->query_params(%opts)->get($name);
149             }
150              
151             # Raw single query param
152             sub raw_query {
153 1     1 1 292 my ($self, $name) = @_;
154 1         2 return $self->query($name, raw => 1);
155             }
156              
157             # Single header lookup (case-insensitive, returns last value)
158             sub header {
159 4     4 1 12 my ($self, $name) = @_;
160 4         6 $name = lc($name);
161 4         4 my $value;
162 4   50     3 for my $pair (@{$self->{scope}{headers} // []}) {
  4         10  
163 20 100       29 if (lc($pair->[0]) eq $name) {
164 4         5 $value = $pair->[1];
165             }
166             }
167 4         26 return $value;
168             }
169              
170             # All headers as Hash::MultiValue (cached in scope)
171             sub headers {
172 2     2 1 544 my $self = shift;
173 2 100       9 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
174              
175 1         1 my @pairs;
176 1   50     2 for my $pair (@{$self->{scope}{headers} // []}) {
  1         3  
177 5         10 push @pairs, lc($pair->[0]), $pair->[1];
178             }
179              
180 1         8 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
181 1         58 return $self->{scope}{'pagi.request.headers'};
182             }
183              
184             # All values for a header
185             sub header_all {
186 1     1 1 2 my ($self, $name) = @_;
187 1         4 return $self->headers->get_all(lc($name));
188             }
189              
190             # State accessors
191 4     4 1 35 sub connection_state { shift->{_state} }
192              
193             sub is_connected {
194 10     10 1 1040 my $self = shift;
195 10         40 return $self->{_state} eq 'connected';
196             }
197              
198             sub is_closed {
199 102     102 1 2907 my $self = shift;
200 102         714 return $self->{_state} eq 'closed';
201             }
202              
203 38     38 1 668 sub close_code { shift->{_close_code} }
204 38     38 1 60 sub close_reason { shift->{_close_reason} }
205              
206             # Internal state setters
207             sub _set_state {
208 71     71   120 my ($self, $state) = @_;
209 71         109 $self->{_state} = $state;
210             }
211              
212             sub _set_closed {
213 37     37   72 my ($self, $code, $reason) = @_;
214 37         54 $self->{_state} = 'closed';
215 37   100     140 $self->{_close_code} = $code // 1005;
216 37   100     96 $self->{_close_reason} = $reason // '';
217             }
218              
219             # Register callback to run on disconnect/close
220             sub on_close {
221 15     15 1 229 my ($self, $callback) = @_;
222 15         17 push @{$self->{_on_close}}, $callback;
  15         21  
223 15         24 return $self;
224             }
225              
226             # Internal: run all on_close callbacks exactly once
227 32     32   33 async sub _run_close_callbacks {
228 32         52 my ($self) = @_;
229              
230             # Only run once
231 32 50       61 return if $self->{_close_callbacks_ran};
232 32         47 $self->{_close_callbacks_ran} = 1;
233              
234 32         64 my $code = $self->close_code;
235 32         54 my $reason = $self->close_reason;
236              
237 32         38 for my $cb (@{$self->{_on_close}}) {
  32         109  
238 13         15 eval {
239 13         21 my $r = $cb->($code, $reason);
240             # Only await if callback returns a Future
241 13 100 66     259 if (blessed($r) && $r->isa('Future')) {
242 11         22 await $r;
243             }
244             };
245 13 100       172 if ($@) {
246 1         61 warn "PAGI::WebSocket on_close callback error: $@";
247             }
248             }
249             }
250              
251             # Register callback to run on errors
252             sub on_error {
253 6     6 1 89 my ($self, $callback) = @_;
254 6         8 push @{$self->{_on_error}}, $callback;
  6         9  
255 6         11 return $self;
256             }
257              
258             # Register callback to run on message receive
259             sub on_message {
260 9     9 1 148 my ($self, $callback) = @_;
261 9         12 push @{$self->{_on_message}}, $callback;
  9         14  
262 9         17 return $self;
263             }
264              
265             # Generic event registration (Socket.IO style)
266             sub on {
267 8     8 1 472 my ($self, $event, $callback) = @_;
268              
269 8 100       25 if ($event eq 'message') {
    100          
    100          
270 3         7 return $self->on_message($callback);
271             }
272             elsif ($event eq 'close') {
273 2         6 return $self->on_close($callback);
274             }
275             elsif ($event eq 'error') {
276 2         6 return $self->on_error($callback);
277             }
278             else {
279 1         176 croak "Unknown event type: $event (expected message, close, or error)";
280             }
281             }
282              
283             # Internal: trigger error callbacks
284             sub _trigger_error {
285 5     5   44 my ($self, $error) = @_;
286              
287 5         6 for my $cb (@{$self->{_on_error}}) {
  5         9  
288 5         6 eval { $cb->($error) };
  5         9  
289 5 50       28 if ($@) {
290 0         0 warn "PAGI::WebSocket on_error callback error: $@";
291             }
292             }
293              
294             # If no error handlers registered, warn
295 5 100       23 if (!@{$self->{_on_error}}) {
  5         40  
296 1         14 warn "PAGI::WebSocket error: $error";
297             }
298             }
299              
300             # Accept the WebSocket connection
301 66     66 1 261 async sub accept {
302 66         192 my ($self, %opts) = @_;
303              
304 66         109 my $event = {
305             type => 'websocket.accept',
306             };
307 66 100       116 $event->{subprotocol} = $opts{subprotocol} if exists $opts{subprotocol};
308 66 100       111 $event->{headers} = $opts{headers} if exists $opts{headers};
309              
310 66         177 await $self->{send}->($event);
311 66         2829 $self->_set_state('connected');
312              
313 66         306 return $self;
314             }
315              
316             # Close the WebSocket connection
317 15     15 1 371 async sub close {
318 15         27 my ($self, $code, $reason) = @_;
319              
320             # Idempotent - don't send close twice
321 15 100       25 return if $self->is_closed;
322              
323 12   100     43 $code //= 1000;
324 12   100     32 $reason //= '';
325              
326 12         40 await $self->{send}->({
327             type => 'websocket.close',
328             code => $code,
329             reason => $reason,
330             });
331              
332 12         335 $self->_set_closed($code, $reason);
333 12         26 await $self->_run_close_callbacks;
334              
335 12         314 return $self;
336             }
337              
338             # Send text message
339 6     6 1 117 async sub send_text {
340 6         12 my ($self, $text) = @_;
341              
342 6 100       14 croak "Cannot send on closed WebSocket" if $self->is_closed;
343              
344 5         20 await $self->{send}->({
345             type => 'websocket.send',
346             text => $text,
347             });
348              
349 5         230 return $self;
350             }
351              
352             # Send binary message
353 2     2 1 450 async sub send_bytes {
354 2         4 my ($self, $bytes) = @_;
355              
356 2 100       4 croak "Cannot send on closed WebSocket" if $self->is_closed;
357              
358 1         3 await $self->{send}->({
359             type => 'websocket.send',
360             bytes => $bytes,
361             });
362              
363 1         23 return $self;
364             }
365              
366             # Send JSON-encoded message
367 6     6 1 472 async sub send_json {
368 6         9 my ($self, $data) = @_;
369              
370 6 100       13 croak "Cannot send on closed WebSocket" if $self->is_closed;
371              
372 5         83 my $json = JSON::MaybeXS::encode_json($data);
373              
374 5         20 await $self->{send}->({
375             type => 'websocket.send',
376             text => $json,
377             });
378              
379 5         150 return $self;
380             }
381              
382             # Safe send methods - return bool instead of throwing
383              
384 4     4 1 105 async sub try_send_text {
385 4         6 my ($self, $text) = @_;
386 4 100       7 return 0 if $self->is_closed;
387              
388 3         3 eval {
389 3         12 await $self->{send}->({
390             type => 'websocket.send',
391             text => $text,
392             });
393             };
394 3 100       307 if ($@) {
395 1         6 $self->_set_closed(1006, 'Connection lost');
396 1         5 return 0;
397             }
398 2         6 return 1;
399             }
400              
401 4     4 1 75 async sub try_send_bytes {
402 4         5 my ($self, $bytes) = @_;
403 4 100       6 return 0 if $self->is_closed;
404              
405 3         3 eval {
406 3         10 await $self->{send}->({
407             type => 'websocket.send',
408             bytes => $bytes,
409             });
410             };
411 3 100       223 if ($@) {
412 1         5 $self->_set_closed(1006, 'Connection lost');
413 1         4 return 0;
414             }
415 2         5 return 1;
416             }
417              
418 4     4 1 77 async sub try_send_json {
419 4         5 my ($self, $data) = @_;
420 4 100       7 return 0 if $self->is_closed;
421              
422 3         19 my $json = JSON::MaybeXS::encode_json($data);
423 3         3 eval {
424 3         10 await $self->{send}->({
425             type => 'websocket.send',
426             text => $json,
427             });
428             };
429 3 100       193 if ($@) {
430 1         5 $self->_set_closed(1006, 'Connection lost');
431 1         4 return 0;
432             }
433 2         5 return 1;
434             }
435              
436             # Silent send methods - no-op when closed
437              
438 2     2 1 50 async sub send_text_if_connected {
439 2         3 my ($self, $text) = @_;
440 2 100       5 return unless $self->is_connected;
441 1         38 await $self->try_send_text($text);
442 1         22 return;
443             }
444              
445 2     2 1 48 async sub send_bytes_if_connected {
446 2         3 my ($self, $bytes) = @_;
447 2 100       4 return unless $self->is_connected;
448 1         3 await $self->try_send_bytes($bytes);
449 1         22 return;
450             }
451              
452 2     2 1 50 async sub send_json_if_connected {
453 2         4 my ($self, $data) = @_;
454 2 100       4 return unless $self->is_connected;
455 1         3 await $self->try_send_json($data);
456 1         22 return;
457             }
458              
459             # Receive methods
460              
461 52     52 1 228 async sub receive {
462 52         58 my ($self) = @_;
463              
464 52 100       149 return undef if $self->is_closed;
465              
466 47         58 while (1) {
467 74         115 my $event = await $self->{receive}->();
468              
469 74 100 66     2098 if (!defined($event) || $event->{type} eq 'websocket.disconnect') {
470             # 1005 = No Status Rcvd (RFC 6455)
471 20   100     61 my $code = $event->{code} // 1005;
472 20   100     56 my $reason = $event->{reason} // '';
473 20         53 $self->_set_closed($code, $reason);
474 20         43 await $self->_run_close_callbacks;
475 20         502 return undef;
476             }
477              
478             # Skip connect events - they're handled by accept()
479 54 100       98 next if $event->{type} eq 'websocket.connect';
480              
481 27         67 return $event;
482             }
483             }
484              
485 25     25 1 675 async sub receive_text {
486 25         36 my ($self) = @_;
487              
488 25         19 while (1) {
489 27         49 my $event = await $self->receive;
490 27 100       862 return undef unless $event;
491              
492             # Skip non-receive events and binary frames
493 17 50       73 next unless $event->{type} eq 'websocket.receive';
494 17 100       26 next unless exists $event->{text};
495              
496 15         39 return $event->{text};
497             }
498             }
499              
500 2     2 1 311 async sub receive_bytes {
501 2         3 my ($self) = @_;
502              
503 2         2 while (1) {
504 2         4 my $event = await $self->receive;
505 2 100       49 return undef unless $event;
506              
507             # Skip non-receive events and text frames
508 1 50       3 next unless $event->{type} eq 'websocket.receive';
509 1 50       3 next unless exists $event->{bytes};
510              
511 1         3 return $event->{bytes};
512             }
513             }
514              
515 3     3 1 341 async sub receive_json {
516 3         4 my ($self) = @_;
517              
518 3         5 my $text = await $self->receive_text;
519 3 100       62 return undef unless defined $text;
520              
521 2         29 return JSON::MaybeXS::decode_json($text);
522             }
523              
524             # Iteration helpers
525              
526 1     1 1 48 async sub each_message {
527 1         2 my ($self, $callback) = @_;
528              
529 1         3 while (my $event = await $self->receive) {
530 3 50       59 next unless $event->{type} eq 'websocket.receive';
531 3         5 await $callback->($event);
532             }
533              
534 1         21 return;
535             }
536              
537 6     6 1 160 async sub each_text {
538 6         11 my ($self, $callback) = @_;
539              
540 6         20 while (my $text = await $self->receive_text) {
541 8         235 await $callback->($text);
542             }
543              
544 5         154 return;
545             }
546              
547 0     0 1 0 async sub each_bytes {
548 0         0 my ($self, $callback) = @_;
549              
550 0         0 while (my $bytes = await $self->receive_bytes) {
551 0         0 await $callback->($bytes);
552             }
553              
554 0         0 return;
555             }
556              
557 3     3 1 73 async sub each_json {
558 3         6 my ($self, $callback) = @_;
559              
560 3         6 while (1) {
561 6         103 my $text = await $self->receive_text;
562 6 100       253 last unless defined $text;
563              
564 3         23 my $data = JSON::MaybeXS::decode_json($text);
565 3         9 await $callback->($data);
566             }
567              
568 3         10 return;
569             }
570              
571             # Callback-based event loop (alternative to each_* iteration)
572 5     5 1 16 async sub run {
573 5         6 my ($self) = @_;
574              
575 5         9 while (my $event = await $self->receive) {
576 5 50       108 next unless $event->{type} eq 'websocket.receive';
577              
578 5   33     11 my $data = $event->{text} // $event->{bytes};
579              
580 5         4 for my $cb (@{$self->{_on_message}}) {
  5         8  
581 6         5 eval {
582 6         11 my $r = $cb->($data, $event);
583             # Await if callback returns a Future
584 5 100 66     43 if (blessed($r) && $r->isa('Future')) {
585 1         2 await $r;
586             }
587             };
588 6 100       33 if ($@) {
589 1         3 $self->_trigger_error($@);
590             }
591             }
592             }
593              
594 5         109 return;
595             }
596              
597             # Keepalive support using WebSocket protocol-level ping/pong (RFC 6455)
598             # Sends websocket.keepalive event to server - loop-agnostic, server handles timers
599 5     5 1 15 async sub keepalive {
600 5         8 my ($self, $interval, $timeout) = @_;
601              
602 5   50     9 $interval //= 0;
603              
604 5         13 my $event = {
605             type => 'websocket.keepalive',
606             interval => $interval,
607             };
608 5 100       32 $event->{timeout} = $timeout if defined $timeout;
609              
610 5         9 await $self->{send}->($event);
611              
612 5         227 return $self;
613             }
614              
615             1;
616              
617             __END__