File Coverage

blib/lib/PAGI/SSE.pm
Criterion Covered Total %
statement 290 345 84.0
branch 104 158 65.8
condition 39 77 50.6
subroutine 53 59 89.8
pod 37 41 90.2
total 523 680 76.9


line stmt bran cond sub pod time code
1             package PAGI::SSE;
2 18     18   2158740 use strict;
  18         29  
  18         643  
3 18     18   79 use warnings;
  18         52  
  18         919  
4 18     18   75 use Carp qw(croak);
  18         21  
  18         868  
5 18     18   6022 use Hash::MultiValue;
  18         36248  
  18         921  
6 18     18   1838 use Future::AsyncAwait;
  18         41684  
  18         104  
7 18     18   1023 use Future;
  18         20  
  18         350  
8 18     18   5335 use JSON::MaybeXS ();
  18         135593  
  18         676  
9 18     18   112 use Scalar::Util qw(blessed);
  18         28  
  18         905  
10 18     18   4886 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  18         180383  
  18         90567  
11              
12              
13             sub new {
14 91     91 1 1933765 my ($class, $scope, $receive, $send) = @_;
15              
16 91 100 66     777 croak "PAGI::SSE requires scope hashref"
17             unless $scope && ref($scope) eq 'HASH';
18 90 100 66     585 croak "PAGI::SSE requires receive coderef"
19             unless $receive && ref($receive) eq 'CODE';
20 89 100 66     396 croak "PAGI::SSE requires send coderef"
21             unless $send && ref($send) eq 'CODE';
22             croak "PAGI::SSE requires scope type 'sse', got '$scope->{type}'"
23 88 100 50     343 unless ($scope->{type} // '') eq 'sse';
24              
25             # Return existing SSE object if one was already created for this scope
26             # This ensures consistent state (is_started, is_closed, callbacks) if
27             # multiple code paths create SSE objects from the same scope.
28 87 50       162 return $scope->{'pagi.sse'} if $scope->{'pagi.sse'};
29              
30 87         609 my $self = bless {
31             scope => $scope,
32             receive => $receive,
33             send => $send,
34             _state => 'pending', # pending -> started -> closed
35             _on_close => [],
36             _on_error => [],
37             _disconnect_reason => undef, # Set when disconnect received
38             }, $class;
39              
40             # Cache in scope for reuse (weakened to avoid circular reference leak)
41 87         145 $scope->{'pagi.sse'} = $self;
42 87         127 Scalar::Util::weaken($scope->{'pagi.sse'});
43              
44 87         254 return $self;
45             }
46              
47             # Scope property accessors
48 5     5 1 16 sub scope { shift->{scope} }
49 2   50 2 1 506 sub path { shift->{scope}{path} // '/' }
50 0   0 0 1 0 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} // '/' }
  0   0     0  
51 5   50 5 1 15 sub query_string { shift->{scope}{query_string} // '' }
52              
53             # URL decode helper (handles + as space per application/x-www-form-urlencoded)
54             sub _url_decode {
55 12     12   14 my ($str) = @_;
56 12 50       14 return '' unless defined $str;
57 12         8 $str =~ s/\+/ /g;
58 12         14 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  2         6  
59 12         14 return $str;
60             }
61              
62             # UTF-8 decode helper with strict/lenient mode
63             sub _decode_utf8 {
64 8     8   9 my ($str, $strict) = @_;
65 8 50       8 return '' unless defined $str;
66 8 50       10 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
67 8         4 $flag |= LEAVE_SRC;
68 8         27 return decode('UTF-8', $str, $flag);
69             }
70              
71             # Query params as Hash::MultiValue (cached in scope)
72             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
73             sub query_params {
74 6     6 1 7 my ($self, %opts) = @_;
75 6   50     16 my $strict = delete $opts{strict} // 0;
76 6   100     13 my $raw = delete $opts{raw} // 0;
77 6 50       7 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
78              
79 6 50       11 my $cache_key = $raw ? 'pagi.sse.query.raw' : ($strict ? 'pagi.sse.query.strict' : 'pagi.sse.query');
    100          
80 6 100       15 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
81              
82 4         6 my $qs = $self->query_string;
83 4         4 my @pairs;
84              
85 4         16 for my $part (split /[&;]/, $qs) {
86 6 50       7 next unless length $part;
87 6         13 my ($key, $val) = split /=/, $part, 2;
88 6   50     9 $key //= '';
89 6   50     7 $val //= '';
90              
91             # URL decode (handles + as space)
92 6         8 my $key_decoded = _url_decode($key);
93 6         7 my $val_decoded = _url_decode($val);
94              
95             # UTF-8 decode unless raw mode
96 6 100       10 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
97 6 100       125 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
98              
99 6         82 push @pairs, $key_final, $val_final;
100             }
101              
102 4         13 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
103 4         169 return $self->{scope}{$cache_key};
104             }
105              
106             # Raw query params (no UTF-8 decoding)
107             sub raw_query_params {
108 1     1 1 2 my $self = shift;
109 1         3 return $self->query_params(raw => 1);
110             }
111              
112             # Single query param accessor
113             sub query_param {
114 4     4 1 8 my ($self, $name, %opts) = @_;
115 4         8 return $self->query_params(%opts)->get($name);
116             }
117              
118             # Raw single query param
119             sub raw_query_param {
120 1     1 1 2 my ($self, $name) = @_;
121 1         4 return $self->query_param($name, raw => 1);
122             }
123              
124 1   50 1 1 5 sub scheme { shift->{scope}{scheme} // 'http' }
125 1   50 1 1 6 sub http_version { shift->{scope}{http_version} // '1.1' }
126 0     0 0 0 sub client { shift->{scope}{client} }
127 0     0 0 0 sub server { shift->{scope}{server} }
128              
129              
130             # Application state (injected by PAGI::Lifespan, read-only)
131             sub state {
132 4     4 1 8 my $self = shift;
133 4   100     30 return $self->{scope}{state} // {};
134             }
135              
136             # Path parameter accessors - captured from URL path by router
137             # Stored in scope->{path_params} for router-agnostic access
138             sub path_params {
139 2     2 1 4 my ($self) = @_;
140 2   100     13 return $self->{scope}{path_params} // {};
141             }
142              
143             sub path_param {
144 6     6 1 28 my ($self, $name) = @_;
145 6   100     17 my $params = $self->{scope}{path_params} // {};
146 6         35 return $params->{$name};
147             }
148              
149             # Connection state accessors
150 4     4 1 25 sub connection_state { shift->{_state} }
151              
152             sub is_started {
153 114     114 1 1257 my $self = shift;
154 114         286 return $self->{_state} eq 'started';
155             }
156              
157             sub is_closed {
158 127     127 1 837 my $self = shift;
159 127         502 return $self->{_state} eq 'closed';
160             }
161              
162             # Disconnect reason - why the connection closed
163             # Common values: 'client_closed', 'write_error', 'send_timeout', 'idle_timeout'
164             sub disconnect_reason {
165 0     0 1 0 my $self = shift;
166 0         0 return $self->{_disconnect_reason};
167             }
168              
169             # Internal state setters
170             sub _set_state {
171 54     54   132 my ($self, $state) = @_;
172 54         82 $self->{_state} = $state;
173             }
174              
175             sub _set_closed {
176 24     24   38 my ($self) = @_;
177 24         48 $self->{_state} = 'closed';
178             }
179              
180             # Start the SSE stream
181 47     47 1 524 async sub start {
182 47         73 my ($self, %opts) = @_;
183              
184             # Idempotent - don't start twice
185 47 100 66     79 return $self if $self->is_started || $self->is_closed;
186              
187             my $event = {
188             type => 'sse.start',
189 45   100     226 status => $opts{status} // 200,
190             };
191 45 100       90 $event->{headers} = $opts{headers} if exists $opts{headers};
192              
193 45         96 await $self->{send}->($event);
194 45         1848 $self->_set_state('started');
195              
196 45         254 return $self;
197             }
198              
199             # Enable keepalive - sends sse.keepalive event to server
200             # Server handles the timer; this is loop-agnostic
201 8     8 1 28 async sub keepalive {
202 8         17 my ($self, $interval, $comment) = @_;
203              
204 8         64 await $self->{send}->({
205             type => 'sse.keepalive',
206             interval => $interval // 0,
207             comment => $comment // '',
208             });
209              
210 8         472 return $self;
211             }
212              
213             # Single header lookup (case-insensitive, returns last value)
214             sub header {
215 9     9 1 22 my ($self, $name) = @_;
216 9         18 $name = lc($name);
217 9         9 my $value;
218 9   50     40 for my $pair (@{$self->{scope}{headers} // []}) {
  9         30  
219 15 100       33 if (lc($pair->[0]) eq $name) {
220 6         11 $value = $pair->[1];
221             }
222             }
223 9         42 return $value;
224             }
225              
226             # All headers as Hash::MultiValue (cached in scope)
227             sub headers {
228 2     2 1 3 my $self = shift;
229 2 50       7 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
230              
231 2         3 my @pairs;
232 2   50     2 for my $pair (@{$self->{scope}{headers} // []}) {
  2         8  
233 5         11 push @pairs, lc($pair->[0]), $pair->[1];
234             }
235              
236 2         45 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
237 2         87 return $self->{scope}{'pagi.request.headers'};
238             }
239              
240             # All values for a header
241             sub header_all {
242 2     2 1 4 my ($self, $name) = @_;
243 2         7 return $self->headers->get_all(lc($name));
244             }
245              
246             # Get Last-Event-ID header from client (for reconnection)
247             sub last_event_id {
248 6     6 1 19 my ($self) = @_;
249 6         15 return $self->header('last-event-id');
250             }
251              
252             # Send data-only event
253 10     10 1 76 async sub send {
254 10         12 my ($self, $data) = @_;
255              
256 10 100       13 croak "Cannot send on closed SSE connection" if $self->is_closed;
257              
258             # Auto-start if not started
259 9 100       28 await $self->start unless $self->is_started;
260              
261 9         65 await $self->{send}->({
262             type => 'sse.send',
263             data => $data,
264             });
265              
266 9         196 return $self;
267             }
268              
269             # Send JSON-encoded data
270 5     5 1 61 async sub send_json {
271 5         7 my ($self, $data) = @_;
272              
273 5 50       9 croak "Cannot send on closed SSE connection" if $self->is_closed;
274              
275 5 100       10 await $self->start unless $self->is_started;
276              
277 5         49 my $json = JSON::MaybeXS::encode_json($data);
278              
279 5         14 await $self->{send}->({
280             type => 'sse.send',
281             data => $json,
282             });
283              
284 5         125 return $self;
285             }
286              
287             # Send full SSE event with all fields
288 15     15 1 160 async sub send_event {
289 15         62 my ($self, %opts) = @_;
290              
291 15 50       27 croak "Cannot send on closed SSE connection" if $self->is_closed;
292 15 50       43 croak "send_event requires 'data' parameter" unless exists $opts{data};
293              
294 15 100       29 await $self->start unless $self->is_started;
295              
296             # Auto-encode hashref/arrayref data as JSON
297 15         200 my $data = $opts{data};
298 15 100       32 if (ref $data) {
299 13         68 $data = JSON::MaybeXS::encode_json($data);
300             }
301              
302 15         35 my $event = {
303             type => 'sse.send',
304             data => $data,
305             };
306              
307 15 100       38 $event->{event} = $opts{event} if defined $opts{event};
308 15 100       44 $event->{id} = "$opts{id}" if defined $opts{id};
309 15 100       31 $event->{retry} = int($opts{retry}) if defined $opts{retry};
310              
311 15         26 await $self->{send}->($event);
312              
313 15         361 return $self;
314             }
315              
316             # Safe send - returns bool instead of throwing
317 10     10 1 89 async sub try_send {
318 10         14 my ($self, $data) = @_;
319 10 100       18 return 0 if $self->is_closed;
320              
321 9         7 eval {
322 9 50       14 await $self->start unless $self->is_started;
323 9         24 await $self->{send}->({
324             type => 'sse.send',
325             data => $data,
326             });
327             };
328 9 100       1283 if (my $err = $@) {
329 7         16 $self->_set_closed;
330 7         15 await $self->_trigger_error($err);
331 7         211 return 0;
332             }
333 2         6 return 1;
334             }
335              
336 2     2 1 25 async sub try_send_json {
337 2         4 my ($self, $data) = @_;
338 2 50       4 return 0 if $self->is_closed;
339              
340 2         3 eval {
341 2 50       5 await $self->start unless $self->is_started;
342 2         12 my $json = JSON::MaybeXS::encode_json($data);
343 2         6 await $self->{send}->({
344             type => 'sse.send',
345             data => $json,
346             });
347             };
348 2 50       47 if (my $err = $@) {
349 0         0 $self->_set_closed;
350 0         0 await $self->_trigger_error($err);
351 0         0 return 0;
352             }
353 2         6 return 1;
354             }
355              
356             # Send SSE comment (doesn't trigger onmessage in browser)
357 1     1 0 1 async sub send_comment {
358 1         1 my ($self, $comment) = @_;
359              
360 1 50       2 croak "Cannot send on closed SSE connection" if $self->is_closed;
361              
362 1 50       2 await $self->start unless $self->is_started;
363              
364 1         3 await $self->{send}->({
365             type => 'sse.comment',
366             comment => $comment,
367             });
368              
369 1         22 return $self;
370             }
371              
372 1     1 0 1 async sub try_send_comment {
373 1         2 my ($self, $comment) = @_;
374 1 50       2 return 0 if $self->is_closed;
375              
376 1         1 eval {
377 1 50       2 await $self->start unless $self->is_started;
378 1         2 await $self->{send}->({
379             type => 'sse.comment',
380             comment => $comment,
381             });
382             };
383 1 50       22 if (my $err = $@) {
384 0         0 $self->_set_closed;
385 0         0 await $self->_trigger_error($err);
386 0         0 return 0;
387             }
388 1         3 return 1;
389             }
390              
391 2     2 1 26 async sub try_send_event {
392 2         5 my ($self, %opts) = @_;
393 2 50       5 return 0 if $self->is_closed;
394              
395 2         4 eval {
396 2 50       4 await $self->start unless $self->is_started;
397              
398 2   50     40 my $data = $opts{data} // '';
399 2 50       5 if (ref $data) {
400 0         0 $data = JSON::MaybeXS::encode_json($data);
401             }
402              
403 2         5 my $event = {
404             type => 'sse.send',
405             data => $data,
406             };
407 2 50       7 $event->{event} = $opts{event} if defined $opts{event};
408 2 50       5 $event->{id} = "$opts{id}" if defined $opts{id};
409 2 50       4 $event->{retry} = int($opts{retry}) if defined $opts{retry};
410              
411 2         6 await $self->{send}->($event);
412             };
413 2 50       50 if (my $err = $@) {
414 0         0 $self->_set_closed;
415 0         0 await $self->_trigger_error($err);
416 0         0 return 0;
417             }
418 2         6 return 1;
419             }
420              
421             # Internal: trigger error callbacks
422 7     7   8 async sub _trigger_error {
423 7         10 my ($self, $error) = @_;
424              
425 7         6 for my $cb (@{$self->{_on_error}}) {
  7         16  
426 7         47 eval {
427 7         14 my $r = $cb->($self, $error);
428 6 100 66     76 if (blessed($r) && $r->isa('Future')) {
429 2         6 await $r;
430             }
431             };
432 7 100       41 if ($@) {
433 2         14 warn "PAGI::SSE on_error callback error: $@";
434             }
435             }
436              
437             # If no error handlers registered, warn
438 7 100       16 if (!@{$self->{_on_error}}) {
  7         30  
439 2         20 warn "PAGI::SSE error: $error";
440             }
441             }
442              
443             # Register close callback
444             sub on_close {
445 16     16 1 181 my ($self, $callback) = @_;
446 16         12 push @{$self->{_on_close}}, $callback;
  16         29  
447 16         25 return $self;
448             }
449              
450             # Register error callback
451             sub on_error {
452 8     8 1 48 my ($self, $callback) = @_;
453 8         5 push @{$self->{_on_error}}, $callback;
  8         13  
454 8         10 return $self;
455             }
456              
457             # Generic event dispatcher - dispatches to on_close or on_error
458             sub on {
459 4     4 1 58 my ($self, $event, $callback) = @_;
460 4 100       10 if ($event eq 'close') {
    100          
461 2         5 return $self->on_close($callback);
462             }
463             elsif ($event eq 'error') {
464 1         3 return $self->on_error($callback);
465             }
466             else {
467 1         108 croak "Unknown event type: $event (expected close or error)";
468             }
469             }
470              
471             # Internal: run all on_close callbacks
472 14     14   12 async sub _run_close_callbacks {
473 14         16 my ($self) = @_;
474              
475             # Only run once
476 14 50       23 return if $self->{_close_callbacks_ran};
477 14         14 $self->{_close_callbacks_ran} = 1;
478              
479 14   100     25 my $reason = $self->{_disconnect_reason} // 'unknown';
480              
481 14         11 for my $cb (@{$self->{_on_close}}) {
  14         23  
482 13         12 eval {
483 13         20 my $r = $cb->($self, $reason);
484 13 100 100     70 if (blessed($r) && $r->isa('Future')) {
485 1         3 await $r;
486             }
487             };
488 13 50       36 if ($@) {
489 0         0 warn "PAGI::SSE on_close callback error: $@";
490             }
491             }
492              
493             # Clear all callback arrays to break any closure-based cycles
494 14         37 $self->{_on_close} = [];
495 14         42 $self->{_on_error} = [];
496             }
497              
498             # Close the connection
499             sub close {
500 3     3 1 6 my ($self) = @_;
501              
502 3 50       7 return $self if $self->is_closed;
503              
504 3         6 $self->_set_closed;
505 3         5 $self->_run_close_callbacks->get;
506              
507 3         58 return $self;
508             }
509              
510             # Wait for disconnect
511 12     12 1 74 async sub run {
512 12         13 my ($self) = @_;
513              
514 12 100       20 await $self->start unless $self->is_started;
515              
516 12         42 while (!$self->is_closed) {
517 12         10 my $event = eval { await $self->{receive}->() };
  12         24  
518 12 100       501 if (my $err = $@) {
519 1         10 warn "PAGI::SSE receive error: $err";
520 1         5 last;
521             }
522              
523 11   50     24 my $type = $event->{type} // '';
524              
525 11 50       15 if ($type eq 'sse.disconnect') {
526 11   50     34 $self->{_disconnect_reason} = $event->{reason} // 'client_closed';
527 11         19 $self->_set_closed;
528 11         34 await $self->_run_close_callbacks;
529 11         271 last;
530             }
531             }
532              
533 12         31 return;
534             }
535              
536             # Iterate over items and send events
537 4     4 1 94 async sub each {
538 4         6 my ($self, $source, $callback) = @_;
539              
540 4 100       8 await $self->start unless $self->is_started;
541              
542 4         26 my $index = 0;
543              
544             # Handle arrayref
545 4 100       12 if (ref $source eq 'ARRAY') {
    50          
546 3         7 for my $item (@$source) {
547 7 50       29 last if $self->is_closed;
548              
549 7         12 my $result = await $callback->($item, $index++);
550              
551             # If callback returns a hashref, treat as event spec
552 7 100       255 if (ref $result eq 'HASH') {
553 2         7 await $self->send_event(%$result);
554             }
555             }
556             }
557             # Handle coderef iterator
558             elsif (ref $source eq 'CODE') {
559 1         3 while (!$self->is_closed) {
560 4         6 my $item = $source->();
561 4 100       12 last unless defined $item;
562              
563 3         6 my $result = await $callback->($item, $index++);
564              
565 3 50       118 if (ref $result eq 'HASH') {
566 0         0 await $self->send_event(%$result);
567             }
568             }
569             }
570             else {
571 0         0 croak "each() requires arrayref or coderef, got " . ref($source);
572             }
573              
574 4         32 return $self;
575             }
576              
577             # Periodic callback execution with interval delay
578             # Requires Future::IO to be installed
579 1     1 1 2 async sub every {
580 1         3 my ($self, $interval, $callback) = @_;
581              
582 1 50 33     11 croak "every() requires interval" unless defined $interval && $interval > 0;
583 1 50       5 croak "every() requires callback coderef" unless ref $callback eq 'CODE';
584              
585             # Future::IO is required for every() - fail clearly if not available
586 1 50       3 eval { require Future::IO; 1 }
  1         641  
  1         31154  
587             or croak "every() requires Future::IO to be installed. "
588             . "Install it with: cpanm Future::IO";
589              
590             # Future::IO must be configured with a backend
591 18     18   178 no warnings 'once';
  18         24  
  18         11518  
592 1 50       200 croak "Future::IO backend not configured. Add to your app.pl:\n"
593             . " use Future::IO::Impl::IOAsync;\n"
594             unless $Future::IO::IMPL;
595              
596 0 0         await $self->start unless $self->is_started;
597              
598             # Start background disconnect monitor
599 0           my $disconnect_future = $self->_watch_for_disconnect;
600              
601 0           while (!$self->is_closed) {
602             # Execute the callback
603 0           my $ok = eval { await $callback->(); 1 };
  0            
  0            
604 0 0         unless ($ok) {
605 0           my $err = $@;
606             # Callback failed - connection likely closed or error occurred
607 0           $self->_set_closed;
608 0           await $self->_run_close_callbacks;
609 0           last;
610             }
611              
612             # Race between sleep and disconnect detection
613 0           my $sleep_future = Future::IO->sleep($interval);
614 0           my $winner = await Future->wait_any($sleep_future, $disconnect_future);
615              
616             # If disconnect won, exit the loop
617 0 0         if ($self->is_closed) {
618 0 0 0       $sleep_future->cancel if $sleep_future->can('cancel') && !$sleep_future->is_ready;
619 0           last;
620             }
621             }
622              
623             # Clean up disconnect monitor if still running
624 0 0 0       $disconnect_future->cancel if $disconnect_future->can('cancel') && !$disconnect_future->is_ready;
625              
626 0           return $self;
627             }
628              
629             # Background future that completes when disconnect is detected
630             sub _watch_for_disconnect {
631 0     0     my ($self) = @_;
632              
633 0           my $receive = $self->{receive};
634 0           require Scalar::Util;
635 0           my $weak_self = $self;
636 0           Scalar::Util::weaken($weak_self);
637              
638 0     0     return (async sub {
639 0   0       while ($weak_self && !$weak_self->is_closed) {
640 0           my $event = eval { await $receive->() };
  0            
641 0 0         last unless $event;
642              
643 0   0       my $type = $event->{type} // '';
644 0 0         if ($type eq 'sse.disconnect') {
645 0 0         if ($weak_self) {
646 0   0       $weak_self->{_disconnect_reason} = $event->{reason} // 'client_closed';
647 0           $weak_self->_set_closed;
648 0           await $weak_self->_run_close_callbacks;
649             }
650 0           last;
651             }
652             }
653 0           })->();
654             }
655              
656             1;
657              
658             __END__