File Coverage

blib/lib/PAGI/SSE.pm
Criterion Covered Total %
statement 324 379 85.4
branch 124 178 69.6
condition 49 94 52.1
subroutine 60 66 90.9
pod 44 48 91.6
total 601 765 78.5


line stmt bran cond sub pod time code
1             package PAGI::SSE;
2             $PAGI::SSE::VERSION = '0.002001';
3 24     24   2557670 use strict;
  24         40  
  24         810  
4 24     24   102 use warnings;
  24         51  
  24         1124  
5 24     24   111 use Carp qw(croak);
  24         27  
  24         1101  
6 24     24   7711 use Hash::MultiValue;
  24         48018  
  24         1221  
7 24     24   1779 use Future::AsyncAwait;
  24         39246  
  24         125  
8 24     24   1060 use Future;
  24         28  
  24         413  
9 24     24   6828 use JSON::MaybeXS ();
  24         179420  
  24         927  
10 24     24   164 use Scalar::Util qw(blessed);
  24         35  
  24         1265  
11 24     24   6901 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  24         262783  
  24         138874  
12              
13              
14             sub new {
15 108     108 1 2504034 my ($class, $scope, $receive, $send) = @_;
16              
17 108 100 66     1055 croak "PAGI::SSE requires scope hashref"
18             unless $scope && ref($scope) eq 'HASH';
19 107 100 66     629 croak "PAGI::SSE requires receive coderef"
20             unless $receive && ref($receive) eq 'CODE';
21 106 100 66     546 croak "PAGI::SSE requires send coderef"
22             unless $send && ref($send) eq 'CODE';
23             croak "PAGI::SSE requires scope type 'sse', got '$scope->{type}'"
24 105 100 50     499 unless ($scope->{type} // '') eq 'sse';
25              
26             # Return existing SSE object if one was already created for this scope
27             # This ensures consistent state (is_started, is_closed, callbacks) if
28             # multiple code paths create SSE objects from the same scope.
29 104 100       254 return $scope->{'pagi.sse'} if $scope->{'pagi.sse'};
30              
31 103         684 my $self = bless {
32             scope => $scope,
33             receive => $receive,
34             send => $send,
35             _state => 'pending', # pending -> started -> closed
36             _on_close => [],
37             _on_error => [],
38             _disconnect_reason => undef, # Set when disconnect received
39             }, $class;
40              
41             # Cache in scope for reuse (weakened to avoid circular reference leak)
42 103         221 $scope->{'pagi.sse'} = $self;
43 103         175 Scalar::Util::weaken($scope->{'pagi.sse'});
44              
45 103         383 return $self;
46             }
47              
48             # Scope property accessors
49 5     5 1 22 sub scope { shift->{scope} }
50 2   50 2 1 556 sub path { shift->{scope}{path} // '/' }
51 0   0 0 1 0 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} // '/' }
  0   0     0  
52 5   50 5 1 25 sub query_string { shift->{scope}{query_string} // '' }
53              
54             # URL decode helper (handles + as space per application/x-www-form-urlencoded)
55             sub _url_decode {
56 12     12   20 my ($str) = @_;
57 12 50       25 return '' unless defined $str;
58 12         23 $str =~ s/\+/ /g;
59 12         24 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  2         12  
60 12         28 return $str;
61             }
62              
63             # UTF-8 decode helper with strict/lenient mode
64             sub _decode_utf8 {
65 8     8   18 my ($str, $strict) = @_;
66 8 50       19 return '' unless defined $str;
67 8 50       18 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
68 8         10 $flag |= LEAVE_SRC;
69 8         44 return decode('UTF-8', $str, $flag);
70             }
71              
72             # Query params as Hash::MultiValue (cached in scope)
73             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
74             sub query_params {
75 6     6 1 12 my ($self, %opts) = @_;
76 6   50     22 my $strict = delete $opts{strict} // 0;
77 6   100     18 my $raw = delete $opts{raw} // 0;
78 6 50       14 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
79              
80 6 50       14 my $cache_key = $raw ? 'pagi.sse.query.raw' : ($strict ? 'pagi.sse.query.strict' : 'pagi.sse.query');
    100          
81 6 100       21 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
82              
83 4         12 my $qs = $self->query_string;
84 4         6 my @pairs;
85              
86 4         24 for my $part (split /[&;]/, $qs) {
87 6 50       14 next unless length $part;
88 6         20 my ($key, $val) = split /=/, $part, 2;
89 6   50     17 $key //= '';
90 6   50     14 $val //= '';
91              
92             # URL decode (handles + as space)
93 6         14 my $key_decoded = _url_decode($key);
94 6         13 my $val_decoded = _url_decode($val);
95              
96             # UTF-8 decode unless raw mode
97 6 100       17 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
98 6 100       207 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
99              
100 6         155 push @pairs, $key_final, $val_final;
101             }
102              
103 4         24 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
104 4         264 return $self->{scope}{$cache_key};
105             }
106              
107             # Raw query params (no UTF-8 decoding)
108             sub raw_query_params {
109 1     1 1 2 my $self = shift;
110 1         3 return $self->query_params(raw => 1);
111             }
112              
113             # Single query param accessor
114             sub query_param {
115 4     4 1 12 my ($self, $name, %opts) = @_;
116 4         13 return $self->query_params(%opts)->get($name);
117             }
118              
119             # Raw single query param
120             sub raw_query_param {
121 1     1 1 81 my ($self, $name) = @_;
122 1         7 return $self->query_param($name, raw => 1);
123             }
124              
125 1   50 1 1 10 sub scheme { shift->{scope}{scheme} // 'http' }
126 1   50 1 1 10 sub http_version { shift->{scope}{http_version} // '1.1' }
127 0     0 0 0 sub client { shift->{scope}{client} }
128 0     0 0 0 sub server { shift->{scope}{server} }
129              
130              
131             # Application state (injected by PAGI::Lifespan, read-only)
132             sub state {
133 4     4 1 12 my $self = shift;
134 4   100     36 return $self->{scope}{state} // {};
135             }
136              
137             # Path parameter accessors - captured from URL path by router
138             # Stored in scope->{path_params} for router-agnostic access
139             sub path_params {
140 2     2 1 4 my ($self) = @_;
141 2   100     13 return $self->{scope}{path_params} // {};
142             }
143              
144             sub path_param {
145 6     6 1 26 my ($self, $name) = @_;
146 6   100     20 my $params = $self->{scope}{path_params} // {};
147 6         24 return $params->{$name};
148             }
149              
150             # Connection state accessors
151 4     4 1 31 sub connection_state { shift->{_state} }
152              
153             sub is_started {
154 126     126 1 2840 my $self = shift;
155 126         459 return $self->{_state} eq 'started';
156             }
157              
158             sub is_closed {
159 158     158 1 960 my $self = shift;
160 158         616 return $self->{_state} eq 'closed';
161             }
162              
163             # True while the stream is live (started and not yet closed/disconnected).
164             # Mirrors PAGI::WebSocket->is_connected (state eq 'connected'): an SSE scope
165             # carries no pagi.connection (spec: N/A for sse), so connection liveness is
166             # derived from this object's own state machine, not the base Context method.
167             sub is_connected {
168 9     9 1 50 my $self = shift;
169 9         35 return $self->{_state} eq 'started';
170             }
171              
172             # Disconnect reason - why the connection closed
173             # Common values: 'client_closed', 'write_error', 'send_timeout', 'idle_timeout'
174             sub disconnect_reason {
175 0     0 1 0 my $self = shift;
176 0         0 return $self->{_disconnect_reason};
177             }
178              
179             # Outbound flow-control introspection (delegates to the pagi.transport handle)
180             sub buffered_amount {
181 2     2 1 13 my $self = shift;
182 2         11 my $t = $self->{scope}{'pagi.transport'};
183 2 100       11 return 0 unless $t;
184 1         5 return $t->buffered_amount;
185             }
186              
187             sub high_water_mark {
188 2     2 1 545 my $self = shift;
189 2         7 my $t = $self->{scope}{'pagi.transport'};
190 2 100       10 return undef unless $t;
191 1         5 return $t->high_water_mark;
192             }
193              
194             sub low_water_mark {
195 2     2 1 573 my $self = shift;
196 2         7 my $t = $self->{scope}{'pagi.transport'};
197 2 100       11 return undef unless $t;
198 1         5 return $t->low_water_mark;
199             }
200              
201             sub on_high_water {
202 2     2 1 36 my ($self, $cb) = @_;
203 2         6 my $t = $self->{scope}{'pagi.transport'};
204 2 100 66     21 $t->on_high_water($cb) if $t && $t->can('on_high_water');
205 2         15 return $self;
206             }
207              
208             sub on_drain {
209 2     2 1 661 my ($self, $cb) = @_;
210 2         6 my $t = $self->{scope}{'pagi.transport'};
211 2 100 66     33 $t->on_drain($cb) if $t && $t->can('on_drain');
212 2         12 return $self;
213             }
214              
215             sub is_writable {
216 3     3 1 5 my $self = shift;
217 3         5 my $t = $self->{scope}{'pagi.transport'};
218 3 100       9 return 1 unless $t;
219 2         4 my $high = $t->high_water_mark;
220 2 50       7 return 1 unless defined $high;
221 2 100       4 return $t->buffered_amount < $high ? 1 : 0;
222             }
223              
224             # Internal state setters
225             sub _set_state {
226 63     63   247 my ($self, $state) = @_;
227 63         112 $self->{_state} = $state;
228             }
229              
230             sub _set_closed {
231 32     32   66 my ($self) = @_;
232 32         52 $self->{_state} = 'closed';
233              
234             # Wake a parked run() (which races this future against $receive) so a close
235             # from deep in a helper actually ends the stream.
236             $self->{_closed_future}->done
237 32 100 100     159 if $self->{_closed_future} && !$self->{_closed_future}->is_ready;
238             }
239              
240             # Start the SSE stream
241 56     56 1 765 async sub start {
242 56         227 my ($self, %opts) = @_;
243              
244             # Idempotent - don't start twice
245 56 100 66     121 return $self if $self->is_started || $self->is_closed;
246              
247             my $event = {
248             type => 'sse.start',
249 54   100     345 status => $opts{status} // 200,
250             };
251 54 100       107 $event->{headers} = $opts{headers} if exists $opts{headers};
252              
253 54         143 await $self->{send}->($event);
254 54         2900 $self->_set_state('started');
255              
256 54         297 return $self;
257             }
258              
259             # Enable keepalive - sends sse.keepalive event to server
260             # Server handles the timer; this is loop-agnostic
261 8     8 1 21 async sub keepalive {
262 8         10 my ($self, $interval, $comment) = @_;
263              
264 8         50 await $self->{send}->({
265             type => 'sse.keepalive',
266             interval => $interval // 0,
267             comment => $comment // '',
268             });
269              
270 8         344 return $self;
271             }
272              
273             # Single header lookup (case-insensitive, returns last value)
274             sub header {
275 9     9 1 26 my ($self, $name) = @_;
276 9         22 $name = lc($name);
277 9         12 my $value;
278 9   50     11 for my $pair (@{$self->{scope}{headers} // []}) {
  9         130  
279 15 100       46 if (lc($pair->[0]) eq $name) {
280 6         13 $value = $pair->[1];
281             }
282             }
283 9         50 return $value;
284             }
285              
286             # All headers as Hash::MultiValue (cached in scope)
287             sub headers {
288 2     2 1 6 my $self = shift;
289 2 50       9 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
290              
291 2         5 my @pairs;
292 2   50     4 for my $pair (@{$self->{scope}{headers} // []}) {
  2         10  
293 5         19 push @pairs, lc($pair->[0]), $pair->[1];
294             }
295              
296 2         15 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
297 2         143 return $self->{scope}{'pagi.request.headers'};
298             }
299              
300             # All values for a header
301             sub header_all {
302 2     2 1 7 my ($self, $name) = @_;
303 2         9 return $self->headers->get_all(lc($name));
304             }
305              
306             # Get Last-Event-ID header from client (for reconnection)
307             sub last_event_id {
308 6     6 1 20 my ($self) = @_;
309 6         17 return $self->header('last-event-id');
310             }
311              
312             # Send data-only event
313 11     11 1 86 async sub send {
314 11         86 my ($self, $data) = @_;
315              
316 11 100       18 croak "Cannot send on closed SSE connection" if $self->is_closed;
317              
318             # Auto-start if not started
319 10 100       17 await $self->start unless $self->is_started;
320              
321 10         74 await $self->{send}->({
322             type => 'sse.send',
323             data => $data,
324             });
325              
326 10         249 return $self;
327             }
328              
329             # Send JSON-encoded data
330 5     5 1 89 async sub send_json {
331 5         10 my ($self, $data) = @_;
332              
333 5 50       10 croak "Cannot send on closed SSE connection" if $self->is_closed;
334              
335 5 100       21 await $self->start unless $self->is_started;
336              
337 5         51 my $json = JSON::MaybeXS::encode_json($data);
338              
339 5         16 await $self->{send}->({
340             type => 'sse.send',
341             data => $json,
342             });
343              
344 5         117 return $self;
345             }
346              
347             # Send full SSE event with all fields
348 15     15 1 167 async sub send_event {
349 15         63 my ($self, %opts) = @_;
350              
351 15 50       35 croak "Cannot send on closed SSE connection" if $self->is_closed;
352 15 50       30 croak "send_event requires 'data' parameter" unless exists $opts{data};
353              
354 15 100       25 await $self->start unless $self->is_started;
355              
356             # Auto-encode hashref/arrayref data as JSON
357 15         178 my $data = $opts{data};
358 15 100       29 if (ref $data) {
359 13         89 $data = JSON::MaybeXS::encode_json($data);
360             }
361              
362 15         34 my $event = {
363             type => 'sse.send',
364             data => $data,
365             };
366              
367 15 100       40 $event->{event} = $opts{event} if defined $opts{event};
368 15 100       40 $event->{id} = "$opts{id}" if defined $opts{id};
369 15 100       28 $event->{retry} = int($opts{retry}) if defined $opts{retry};
370              
371 15         29 await $self->{send}->($event);
372              
373 15         348 return $self;
374             }
375              
376             # Safe send - returns bool instead of throwing
377 10     10 1 77 async sub try_send {
378 10         14 my ($self, $data) = @_;
379 10 100       21 return 0 if $self->is_closed;
380              
381 9         10 eval {
382 9 50       91 await $self->start unless $self->is_started;
383 9         32 await $self->{send}->({
384             type => 'sse.send',
385             data => $data,
386             });
387             };
388 9 100       1331 if (my $err = $@) {
389 7         19 $self->_set_closed;
390 7         16 await $self->_trigger_error($err);
391 7         234 return 0;
392             }
393 2         7 return 1;
394             }
395              
396 2     2 1 27 async sub try_send_json {
397 2         3 my ($self, $data) = @_;
398 2 50       4 return 0 if $self->is_closed;
399              
400 2         3 eval {
401 2 50       4 await $self->start unless $self->is_started;
402 2         11 my $json = JSON::MaybeXS::encode_json($data);
403 2         8 await $self->{send}->({
404             type => 'sse.send',
405             data => $json,
406             });
407             };
408 2 50       49 if (my $err = $@) {
409 0         0 $self->_set_closed;
410 0         0 await $self->_trigger_error($err);
411 0         0 return 0;
412             }
413 2         6 return 1;
414             }
415              
416             # Send SSE comment (doesn't trigger onmessage in browser)
417 1     1 0 1 async sub send_comment {
418 1         2 my ($self, $comment) = @_;
419              
420 1 50       3 croak "Cannot send on closed SSE connection" if $self->is_closed;
421              
422 1 50       1 await $self->start unless $self->is_started;
423              
424 1         3 await $self->{send}->({
425             type => 'sse.comment',
426             comment => $comment,
427             });
428              
429 1         23 return $self;
430             }
431              
432 1     1 0 1 async sub try_send_comment {
433 1         3 my ($self, $comment) = @_;
434 1 50       2 return 0 if $self->is_closed;
435              
436 1         2 eval {
437 1 50       1 await $self->start unless $self->is_started;
438 1         3 await $self->{send}->({
439             type => 'sse.comment',
440             comment => $comment,
441             });
442             };
443 1 50       30 if (my $err = $@) {
444 0         0 $self->_set_closed;
445 0         0 await $self->_trigger_error($err);
446 0         0 return 0;
447             }
448 1         3 return 1;
449             }
450              
451 2     2 1 24 async sub try_send_event {
452 2         7 my ($self, %opts) = @_;
453 2 50       5 return 0 if $self->is_closed;
454              
455 2         2 eval {
456 2 50       4 await $self->start unless $self->is_started;
457              
458 2   50     6 my $data = $opts{data} // '';
459 2 50       4 if (ref $data) {
460 0         0 $data = JSON::MaybeXS::encode_json($data);
461             }
462              
463 2         6 my $event = {
464             type => 'sse.send',
465             data => $data,
466             };
467 2 50       8 $event->{event} = $opts{event} if defined $opts{event};
468 2 50       4 $event->{id} = "$opts{id}" if defined $opts{id};
469 2 50       4 $event->{retry} = int($opts{retry}) if defined $opts{retry};
470              
471 2         12 await $self->{send}->($event);
472             };
473 2 50       47 if (my $err = $@) {
474 0         0 $self->_set_closed;
475 0         0 await $self->_trigger_error($err);
476 0         0 return 0;
477             }
478 2         6 return 1;
479             }
480              
481             # Internal: trigger error callbacks
482 7     7   18 async sub _trigger_error {
483 7         10 my ($self, $error) = @_;
484              
485 7         4 for my $cb (@{$self->{_on_error}}) {
  7         15  
486 7         15 eval {
487 7         13 my $r = $cb->($self, $error);
488 6 100 66     74 if (blessed($r) && $r->isa('Future')) {
489 2         7 await $r;
490             }
491             };
492 7 100       41 if ($@) {
493 2         16 warn "PAGI::SSE on_error callback error: $@";
494             }
495             }
496              
497             # If no error handlers registered, warn
498 7 100       7 if (!@{$self->{_on_error}}) {
  7         30  
499 2         17 warn "PAGI::SSE error: $error";
500             }
501             }
502              
503             # Register close callback
504             sub on_close {
505 19     19 1 522 my ($self, $callback) = @_;
506 19         28 push @{$self->{_on_close}}, $callback;
  19         46  
507 19         42 return $self;
508             }
509              
510             # Register error callback
511             sub on_error {
512 8     8 1 51 my ($self, $callback) = @_;
513 8         10 push @{$self->{_on_error}}, $callback;
  8         12  
514 8         13 return $self;
515             }
516              
517             # Generic event dispatcher - dispatches to on_close or on_error
518             sub on {
519 4     4 1 89 my ($self, $event, $callback) = @_;
520 4 100       16 if ($event eq 'close') {
    100          
521 2         6 return $self->on_close($callback);
522             }
523             elsif ($event eq 'error') {
524 1         4 return $self->on_error($callback);
525             }
526             else {
527 1         154 croak "Unknown event type: $event (expected close or error)";
528             }
529             }
530              
531             # Internal: run all on_close callbacks
532 22     22   41 async sub _run_close_callbacks {
533 22         30 my ($self) = @_;
534              
535             # Only run once
536 22 50       51 return if $self->{_close_callbacks_ran};
537 22         37 $self->{_close_callbacks_ran} = 1;
538              
539 22   50     64 my $reason = $self->{_disconnect_reason} // 'unknown';
540              
541 22         24 for my $cb (@{$self->{_on_close}}) {
  22         47  
542 16         19 eval {
543 16         37 my $r = $cb->($self, $reason);
544 16 100 100     247 if (blessed($r) && $r->isa('Future')) {
545 3         11 await $r;
546             }
547             };
548 16 50       1265 if ($@) {
549 0         0 warn "PAGI::SSE on_close callback error: $@";
550             }
551             }
552              
553             # Clear all callback arrays to break any closure-based cycles
554 22         74 $self->{_on_close} = [];
555 22         100 $self->{_on_error} = [];
556             }
557              
558             # Close the connection
559 11     11 1 904 async sub close {
560 11         30 my ($self, %opts) = @_;
561              
562 11 50       27 return $self if $self->is_closed;
563              
564             # Record the reason so on_close callbacks (and the server access log) see it.
565 11   100     89 $self->{_disconnect_reason} //= $opts{reason} // 'app_closed';
      33        
566              
567             # Tell the server to end the stream now (sse.close). The reason is
568             # server-side only and is never written to the wire.
569             await $self->{send}->({
570             type => 'sse.close',
571 11         62 (defined $opts{reason} ? (reason => $opts{reason}) : ()),
572             });
573              
574 11         448 $self->_set_closed;
575 11         99 await $self->_run_close_callbacks;
576              
577 11         514 return $self;
578             }
579              
580             # Wait for disconnect
581 13     13 1 128 async sub run {
582 13         21 my ($self) = @_;
583              
584 13 100       25 await $self->start unless $self->is_started;
585              
586             # Race incoming events against an explicit close() (which resolves this
587             # future via _set_closed), so run() returns when the stream is closed from
588             # anywhere -- not only when the client disconnects.
589 13   33     117 $self->{_closed_future} //= Future->new;
590              
591 13         103 while (!$self->is_closed) {
592 13         23 my $event = eval { await Future->wait_any($self->{receive}->(), $self->{_closed_future}) };
  13         47  
593 13 100       2889 if (my $err = $@) {
594 1         12 warn "PAGI::SSE receive error: $err";
595 1         18 last;
596             }
597              
598 12 100       24 last if $self->is_closed; # woken by close()
599              
600 11   50     26 my $type = $event->{type} // '';
601              
602 11 50       31 if ($type eq 'sse.disconnect') {
603 11   50     38 $self->{_disconnect_reason} = $event->{reason} // 'client_closed';
604 11         25 $self->_set_closed;
605 11         63 await $self->_run_close_callbacks;
606 11         287 last;
607             }
608             }
609              
610 13         50 return;
611             }
612              
613             # Iterate over items and send events
614 4     4 1 99 async sub each {
615 4         8 my ($self, $source, $callback) = @_;
616              
617 4 100       9 await $self->start unless $self->is_started;
618              
619 4         43 my $index = 0;
620              
621             # Handle arrayref
622 4 100       14 if (ref $source eq 'ARRAY') {
    50          
623 3         5 for my $item (@$source) {
624 7 50       47 last if $self->is_closed;
625              
626 7         15 my $result = await $callback->($item, $index++);
627              
628             # If callback returns a hashref, treat as event spec
629 7 100       297 if (ref $result eq 'HASH') {
630 2         6 await $self->send_event(%$result);
631             }
632             }
633             }
634             # Handle coderef iterator
635             elsif (ref $source eq 'CODE') {
636 1         3 while (!$self->is_closed) {
637 4         5 my $item = $source->();
638 4 100       15 last unless defined $item;
639              
640 3         5 my $result = await $callback->($item, $index++);
641              
642 3 50       127 if (ref $result eq 'HASH') {
643 0         0 await $self->send_event(%$result);
644             }
645             }
646             }
647             else {
648 0         0 croak "each() requires arrayref or coderef, got " . ref($source);
649             }
650              
651 4         33 return $self;
652             }
653              
654             # Periodic callback execution with interval delay
655             # Requires Future::IO to be installed
656 1     1 1 2 async sub every {
657 1         1 my ($self, $interval, $callback) = @_;
658              
659 1 50 33     116 croak "every() requires interval" unless defined $interval && $interval > 0;
660 1 50       6 croak "every() requires callback coderef" unless ref $callback eq 'CODE';
661              
662             # Future::IO is required for every() - fail clearly if not available
663 1 50       2 eval { require Future::IO; 1 }
  1         478  
  1         26340  
664             or croak "every() requires Future::IO to be installed. "
665             . "Install it with: cpanm Future::IO";
666              
667             # Future::IO must be configured with a backend
668 24     24   210 no warnings 'once';
  24         31  
  24         15480  
669 1 50       207 croak "Future::IO backend not configured. Add to your app.pl:\n"
670             . " use Future::IO::Impl::IOAsync;\n"
671             unless $Future::IO::IMPL;
672              
673 0 0         await $self->start unless $self->is_started;
674              
675             # Start background disconnect monitor
676 0           my $disconnect_future = $self->_watch_for_disconnect;
677              
678 0           while (!$self->is_closed) {
679             # Execute the callback
680 0           my $ok = eval { await $callback->(); 1 };
  0            
  0            
681 0 0         unless ($ok) {
682 0           my $err = $@;
683             # Callback failed - connection likely closed or error occurred
684 0           $self->_set_closed;
685 0           await $self->_run_close_callbacks;
686 0           last;
687             }
688              
689             # Race between sleep and disconnect detection
690 0           my $sleep_future = Future::IO->sleep($interval);
691 0           my $winner = await Future->wait_any($sleep_future, $disconnect_future);
692              
693             # If disconnect won, exit the loop
694 0 0         if ($self->is_closed) {
695 0 0 0       $sleep_future->cancel if $sleep_future->can('cancel') && !$sleep_future->is_ready;
696 0           last;
697             }
698             }
699              
700             # Clean up disconnect monitor if still running
701 0 0 0       $disconnect_future->cancel if $disconnect_future->can('cancel') && !$disconnect_future->is_ready;
702              
703 0           return $self;
704             }
705              
706             # Background future that completes when disconnect is detected
707             sub _watch_for_disconnect {
708 0     0     my ($self) = @_;
709              
710 0           my $receive = $self->{receive};
711 0           require Scalar::Util;
712 0           my $weak_self = $self;
713 0           Scalar::Util::weaken($weak_self);
714              
715 0     0     return (async sub {
716 0   0       while ($weak_self && !$weak_self->is_closed) {
717 0           my $event = eval { await $receive->() };
  0            
718 0 0         last unless $event;
719              
720 0   0       my $type = $event->{type} // '';
721 0 0         if ($type eq 'sse.disconnect') {
722 0 0         if ($weak_self) {
723 0   0       $weak_self->{_disconnect_reason} = $event->{reason} // 'client_closed';
724 0           $weak_self->_set_closed;
725 0           await $weak_self->_run_close_callbacks;
726             }
727 0           last;
728             }
729             }
730 0           })->();
731             }
732              
733             1;
734              
735             __END__