File Coverage

blib/lib/PAGI/SSE.pm
Criterion Covered Total %
statement 316 371 85.1
branch 119 174 68.3
condition 43 83 51.8
subroutine 59 65 90.7
pod 43 47 91.4
total 580 740 78.3


line stmt bran cond sub pod time code
1             package PAGI::SSE;
2             $PAGI::SSE::VERSION = '0.002000';
3 20     20   2067892 use strict;
  20         60  
  20         731  
4 20     20   82 use warnings;
  20         68  
  20         959  
5 20     20   107 use Carp qw(croak);
  20         28  
  20         1154  
6 20     20   6121 use Hash::MultiValue;
  20         38882  
  20         992  
7 20     20   1785 use Future::AsyncAwait;
  20         41103  
  20         119  
8 20     20   992 use Future;
  20         39  
  20         370  
9 20     20   5568 use JSON::MaybeXS ();
  20         144802  
  20         805  
10 20     20   134 use Scalar::Util qw(blessed);
  20         31  
  20         1172  
11 20     20   5154 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  20         183979  
  20         110818  
12              
13              
14             sub new {
15 98     98 1 1930159 my ($class, $scope, $receive, $send) = @_;
16              
17 98 100 66     892 croak "PAGI::SSE requires scope hashref"
18             unless $scope && ref($scope) eq 'HASH';
19 97 100 66     528 croak "PAGI::SSE requires receive coderef"
20             unless $receive && ref($receive) eq 'CODE';
21 96 100 66     484 croak "PAGI::SSE requires send coderef"
22             unless $send && ref($send) eq 'CODE';
23             croak "PAGI::SSE requires scope type 'sse', got '$scope->{type}'"
24 95 100 50     431 unless ($scope->{type} // '') eq 'sse';
25              
26             # Return existing SSE object if one was already created for this scope
27             # This ensures consistent state (is_started, is_closed, callbacks) if
28             # multiple code paths create SSE objects from the same scope.
29 94 50       211 return $scope->{'pagi.sse'} if $scope->{'pagi.sse'};
30              
31 94         554 my $self = bless {
32             scope => $scope,
33             receive => $receive,
34             send => $send,
35             _state => 'pending', # pending -> started -> closed
36             _on_close => [],
37             _on_error => [],
38             _disconnect_reason => undef, # Set when disconnect received
39             }, $class;
40              
41             # Cache in scope for reuse (weakened to avoid circular reference leak)
42 94         289 $scope->{'pagi.sse'} = $self;
43 94         155 Scalar::Util::weaken($scope->{'pagi.sse'});
44              
45 94         327 return $self;
46             }
47              
48             # Scope property accessors
49 5     5 1 17 sub scope { shift->{scope} }
50 2   50 2 1 621 sub path { shift->{scope}{path} // '/' }
51 0   0 0 1 0 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} // '/' }
  0   0     0  
52 5   50 5 1 17 sub query_string { shift->{scope}{query_string} // '' }
53              
54             # URL decode helper (handles + as space per application/x-www-form-urlencoded)
55             sub _url_decode {
56 12     12   13 my ($str) = @_;
57 12 50       14 return '' unless defined $str;
58 12         12 $str =~ s/\+/ /g;
59 12         14 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  2         7  
60 12         13 return $str;
61             }
62              
63             # UTF-8 decode helper with strict/lenient mode
64             sub _decode_utf8 {
65 8     8   10 my ($str, $strict) = @_;
66 8 50       10 return '' unless defined $str;
67 8 50       9 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
68 8         8 $flag |= LEAVE_SRC;
69 8         27 return decode('UTF-8', $str, $flag);
70             }
71              
72             # Query params as Hash::MultiValue (cached in scope)
73             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
74             sub query_params {
75 6     6 1 8 my ($self, %opts) = @_;
76 6   50     14 my $strict = delete $opts{strict} // 0;
77 6   100     13 my $raw = delete $opts{raw} // 0;
78 6 50       9 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
79              
80 6 50       11 my $cache_key = $raw ? 'pagi.sse.query.raw' : ($strict ? 'pagi.sse.query.strict' : 'pagi.sse.query');
    100          
81 6 100       15 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
82              
83 4         7 my $qs = $self->query_string;
84 4         4 my @pairs;
85              
86 4         17 for my $part (split /[&;]/, $qs) {
87 6 50       9 next unless length $part;
88 6         11 my ($key, $val) = split /=/, $part, 2;
89 6   50     11 $key //= '';
90 6   50     8 $val //= '';
91              
92             # URL decode (handles + as space)
93 6         8 my $key_decoded = _url_decode($key);
94 6         7 my $val_decoded = _url_decode($val);
95              
96             # UTF-8 decode unless raw mode
97 6 100       9 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
98 6 100       150 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
99              
100 6         86 push @pairs, $key_final, $val_final;
101             }
102              
103 4         15 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
104 4         156 return $self->{scope}{$cache_key};
105             }
106              
107             # Raw query params (no UTF-8 decoding)
108             sub raw_query_params {
109 1     1 1 1 my $self = shift;
110 1         28 return $self->query_params(raw => 1);
111             }
112              
113             # Single query param accessor
114             sub query_param {
115 4     4 1 28 my ($self, $name, %opts) = @_;
116 4         11 return $self->query_params(%opts)->get($name);
117             }
118              
119             # Raw single query param
120             sub raw_query_param {
121 1     1 1 3 my ($self, $name) = @_;
122 1         2 return $self->query_param($name, raw => 1);
123             }
124              
125 1   50 1 1 5 sub scheme { shift->{scope}{scheme} // 'http' }
126 1   50 1 1 7 sub http_version { shift->{scope}{http_version} // '1.1' }
127 0     0 0 0 sub client { shift->{scope}{client} }
128 0     0 0 0 sub server { shift->{scope}{server} }
129              
130              
131             # Application state (injected by PAGI::Lifespan, read-only)
132             sub state {
133 4     4 1 10 my $self = shift;
134 4   100     52 return $self->{scope}{state} // {};
135             }
136              
137             # Path parameter accessors - captured from URL path by router
138             # Stored in scope->{path_params} for router-agnostic access
139             sub path_params {
140 2     2 1 4 my ($self) = @_;
141 2   100     32 return $self->{scope}{path_params} // {};
142             }
143              
144             sub path_param {
145 6     6 1 27 my ($self, $name) = @_;
146 6   100     18 my $params = $self->{scope}{path_params} // {};
147 6         23 return $params->{$name};
148             }
149              
150             # Connection state accessors
151 4     4 1 30 sub connection_state { shift->{_state} }
152              
153             sub is_started {
154 114     114 1 1249 my $self = shift;
155 114         318 return $self->{_state} eq 'started';
156             }
157              
158             sub is_closed {
159 127     127 1 795 my $self = shift;
160 127         482 return $self->{_state} eq 'closed';
161             }
162              
163             # Disconnect reason - why the connection closed
164             # Common values: 'client_closed', 'write_error', 'send_timeout', 'idle_timeout'
165             sub disconnect_reason {
166 0     0 1 0 my $self = shift;
167 0         0 return $self->{_disconnect_reason};
168             }
169              
170             # Outbound flow-control introspection (delegates to the pagi.transport handle)
171             sub buffered_amount {
172 2     2 1 8 my $self = shift;
173 2         6 my $t = $self->{scope}{'pagi.transport'};
174 2 100       7 return 0 unless $t;
175 1         3 return $t->buffered_amount;
176             }
177              
178             sub high_water_mark {
179 2     2 1 341 my $self = shift;
180 2         4 my $t = $self->{scope}{'pagi.transport'};
181 2 100       7 return undef unless $t;
182 1         4 return $t->high_water_mark;
183             }
184              
185             sub low_water_mark {
186 2     2 1 309 my $self = shift;
187 2         5 my $t = $self->{scope}{'pagi.transport'};
188 2 100       5 return undef unless $t;
189 1         4 return $t->low_water_mark;
190             }
191              
192             sub on_high_water {
193 2     2 1 29 my ($self, $cb) = @_;
194 2         4 my $t = $self->{scope}{'pagi.transport'};
195 2 100 66     13 $t->on_high_water($cb) if $t && $t->can('on_high_water');
196 2         9 return $self;
197             }
198              
199             sub on_drain {
200 2     2 1 374 my ($self, $cb) = @_;
201 2         5 my $t = $self->{scope}{'pagi.transport'};
202 2 100 66     13 $t->on_drain($cb) if $t && $t->can('on_drain');
203 2         9 return $self;
204             }
205              
206             sub is_writable {
207 3     3 1 5 my $self = shift;
208 3         7 my $t = $self->{scope}{'pagi.transport'};
209 3 100       17 return 1 unless $t;
210 2         5 my $high = $t->high_water_mark;
211 2 50       6 return 1 unless defined $high;
212 2 100       5 return $t->buffered_amount < $high ? 1 : 0;
213             }
214              
215             # Internal state setters
216             sub _set_state {
217 54     54   106 my ($self, $state) = @_;
218 54         85 $self->{_state} = $state;
219             }
220              
221             sub _set_closed {
222 24     24   53 my ($self) = @_;
223 24         39 $self->{_state} = 'closed';
224             }
225              
226             # Start the SSE stream
227 47     47 1 668 async sub start {
228 47         63 my ($self, %opts) = @_;
229              
230             # Idempotent - don't start twice
231 47 100 66     94 return $self if $self->is_started || $self->is_closed;
232              
233             my $event = {
234             type => 'sse.start',
235 45   100     190 status => $opts{status} // 200,
236             };
237 45 100       81 $event->{headers} = $opts{headers} if exists $opts{headers};
238              
239 45         106 await $self->{send}->($event);
240 45         1995 $self->_set_state('started');
241              
242 45         203 return $self;
243             }
244              
245             # Enable keepalive - sends sse.keepalive event to server
246             # Server handles the timer; this is loop-agnostic
247 8     8 1 22 async sub keepalive {
248 8         14 my ($self, $interval, $comment) = @_;
249              
250 8         58 await $self->{send}->({
251             type => 'sse.keepalive',
252             interval => $interval // 0,
253             comment => $comment // '',
254             });
255              
256 8         371 return $self;
257             }
258              
259             # Single header lookup (case-insensitive, returns last value)
260             sub header {
261 9     9 1 26 my ($self, $name) = @_;
262 9         25 $name = lc($name);
263 9         53 my $value;
264 9   50     15 for my $pair (@{$self->{scope}{headers} // []}) {
  9         39  
265 15 100       47 if (lc($pair->[0]) eq $name) {
266 6         12 $value = $pair->[1];
267             }
268             }
269 9         49 return $value;
270             }
271              
272             # All headers as Hash::MultiValue (cached in scope)
273             sub headers {
274 2     2 1 3 my $self = shift;
275 2 50       7 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
276              
277 2         3 my @pairs;
278 2   50     2 for my $pair (@{$self->{scope}{headers} // []}) {
  2         7  
279 5         11 push @pairs, lc($pair->[0]), $pair->[1];
280             }
281              
282 2         13 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
283 2         87 return $self->{scope}{'pagi.request.headers'};
284             }
285              
286             # All values for a header
287             sub header_all {
288 2     2 1 5 my ($self, $name) = @_;
289 2         7 return $self->headers->get_all(lc($name));
290             }
291              
292             # Get Last-Event-ID header from client (for reconnection)
293             sub last_event_id {
294 6     6 1 24 my ($self) = @_;
295 6         17 return $self->header('last-event-id');
296             }
297              
298             # Send data-only event
299 10     10 1 73 async sub send {
300 10         13 my ($self, $data) = @_;
301              
302 10 100       16 croak "Cannot send on closed SSE connection" if $self->is_closed;
303              
304             # Auto-start if not started
305 9 100       13 await $self->start unless $self->is_started;
306              
307 9         66 await $self->{send}->({
308             type => 'sse.send',
309             data => $data,
310             });
311              
312 9         190 return $self;
313             }
314              
315             # Send JSON-encoded data
316 5     5 1 54 async sub send_json {
317 5         8 my ($self, $data) = @_;
318              
319 5 50       9 croak "Cannot send on closed SSE connection" if $self->is_closed;
320              
321 5 100       7 await $self->start unless $self->is_started;
322              
323 5         52 my $json = JSON::MaybeXS::encode_json($data);
324              
325 5         27 await $self->{send}->({
326             type => 'sse.send',
327             data => $json,
328             });
329              
330 5         112 return $self;
331             }
332              
333             # Send full SSE event with all fields
334 15     15 1 189 async sub send_event {
335 15         74 my ($self, %opts) = @_;
336              
337 15 50       34 croak "Cannot send on closed SSE connection" if $self->is_closed;
338 15 50       37 croak "send_event requires 'data' parameter" unless exists $opts{data};
339              
340 15 100       25 await $self->start unless $self->is_started;
341              
342             # Auto-encode hashref/arrayref data as JSON
343 15         180 my $data = $opts{data};
344 15 100       33 if (ref $data) {
345 13         109 $data = JSON::MaybeXS::encode_json($data);
346             }
347              
348 15         34 my $event = {
349             type => 'sse.send',
350             data => $data,
351             };
352              
353 15 100       43 $event->{event} = $opts{event} if defined $opts{event};
354 15 100       39 $event->{id} = "$opts{id}" if defined $opts{id};
355 15 100       32 $event->{retry} = int($opts{retry}) if defined $opts{retry};
356              
357 15         58 await $self->{send}->($event);
358              
359 15         396 return $self;
360             }
361              
362             # Safe send - returns bool instead of throwing
363 10     10 1 78 async sub try_send {
364 10         17 my ($self, $data) = @_;
365 10 100       17 return 0 if $self->is_closed;
366              
367 9         9 eval {
368 9 50       14 await $self->start unless $self->is_started;
369 9         31 await $self->{send}->({
370             type => 'sse.send',
371             data => $data,
372             });
373             };
374 9 100       1411 if (my $err = $@) {
375 7         17 $self->_set_closed;
376 7         18 await $self->_trigger_error($err);
377 7         249 return 0;
378             }
379 2         6 return 1;
380             }
381              
382 2     2 1 26 async sub try_send_json {
383 2         3 my ($self, $data) = @_;
384 2 50       5 return 0 if $self->is_closed;
385              
386 2         3 eval {
387 2 50       4 await $self->start unless $self->is_started;
388 2         12 my $json = JSON::MaybeXS::encode_json($data);
389 2         7 await $self->{send}->({
390             type => 'sse.send',
391             data => $json,
392             });
393             };
394 2 50       48 if (my $err = $@) {
395 0         0 $self->_set_closed;
396 0         0 await $self->_trigger_error($err);
397 0         0 return 0;
398             }
399 2         6 return 1;
400             }
401              
402             # Send SSE comment (doesn't trigger onmessage in browser)
403 1     1 0 2 async sub send_comment {
404 1         2 my ($self, $comment) = @_;
405              
406 1 50       2 croak "Cannot send on closed SSE connection" if $self->is_closed;
407              
408 1 50       2 await $self->start unless $self->is_started;
409              
410 1         4 await $self->{send}->({
411             type => 'sse.comment',
412             comment => $comment,
413             });
414              
415 1         22 return $self;
416             }
417              
418 1     1 0 2 async sub try_send_comment {
419 1         2 my ($self, $comment) = @_;
420 1 50       2 return 0 if $self->is_closed;
421              
422 1         2 eval {
423 1 50       2 await $self->start unless $self->is_started;
424 1         4 await $self->{send}->({
425             type => 'sse.comment',
426             comment => $comment,
427             });
428             };
429 1 50       22 if (my $err = $@) {
430 0         0 $self->_set_closed;
431 0         0 await $self->_trigger_error($err);
432 0         0 return 0;
433             }
434 1         3 return 1;
435             }
436              
437 2     2 1 26 async sub try_send_event {
438 2         7 my ($self, %opts) = @_;
439 2 50       6 return 0 if $self->is_closed;
440              
441 2         3 eval {
442 2 50       4 await $self->start unless $self->is_started;
443              
444 2   50     6 my $data = $opts{data} // '';
445 2 50       4 if (ref $data) {
446 0         0 $data = JSON::MaybeXS::encode_json($data);
447             }
448              
449 2         5 my $event = {
450             type => 'sse.send',
451             data => $data,
452             };
453 2 50       6 $event->{event} = $opts{event} if defined $opts{event};
454 2 50       11 $event->{id} = "$opts{id}" if defined $opts{id};
455 2 50       5 $event->{retry} = int($opts{retry}) if defined $opts{retry};
456              
457 2         5 await $self->{send}->($event);
458             };
459 2 50       49 if (my $err = $@) {
460 0         0 $self->_set_closed;
461 0         0 await $self->_trigger_error($err);
462 0         0 return 0;
463             }
464 2         5 return 1;
465             }
466              
467             # Internal: trigger error callbacks
468 7     7   10 async sub _trigger_error {
469 7         11 my ($self, $error) = @_;
470              
471 7         15 for my $cb (@{$self->{_on_error}}) {
  7         56  
472 7         16 eval {
473 7         15 my $r = $cb->($self, $error);
474 6 100 66     81 if (blessed($r) && $r->isa('Future')) {
475 2         5 await $r;
476             }
477             };
478 7 100       47 if ($@) {
479 2         48 warn "PAGI::SSE on_error callback error: $@";
480             }
481             }
482              
483             # If no error handlers registered, warn
484 7 100       8 if (!@{$self->{_on_error}}) {
  7         33  
485 2         20 warn "PAGI::SSE error: $error";
486             }
487             }
488              
489             # Register close callback
490             sub on_close {
491 16     16 1 261 my ($self, $callback) = @_;
492 16         21 push @{$self->{_on_close}}, $callback;
  16         36  
493 16         32 return $self;
494             }
495              
496             # Register error callback
497             sub on_error {
498 8     8 1 81 my ($self, $callback) = @_;
499 8         8 push @{$self->{_on_error}}, $callback;
  8         15  
500 8         13 return $self;
501             }
502              
503             # Generic event dispatcher - dispatches to on_close or on_error
504             sub on {
505 4     4 1 103 my ($self, $event, $callback) = @_;
506 4 100       17 if ($event eq 'close') {
    100          
507 2         5 return $self->on_close($callback);
508             }
509             elsif ($event eq 'error') {
510 1         5 return $self->on_error($callback);
511             }
512             else {
513 1         165 croak "Unknown event type: $event (expected close or error)";
514             }
515             }
516              
517             # Internal: run all on_close callbacks
518 14     14   17 async sub _run_close_callbacks {
519 14         16 my ($self) = @_;
520              
521             # Only run once
522 14 50       29 return if $self->{_close_callbacks_ran};
523 14         24 $self->{_close_callbacks_ran} = 1;
524              
525 14   100     33 my $reason = $self->{_disconnect_reason} // 'unknown';
526              
527 14         17 for my $cb (@{$self->{_on_close}}) {
  14         30  
528 13         16 eval {
529 13         27 my $r = $cb->($self, $reason);
530 13 100 100     133 if (blessed($r) && $r->isa('Future')) {
531 1         5 await $r;
532             }
533             };
534 13 50       48 if ($@) {
535 0         0 warn "PAGI::SSE on_close callback error: $@";
536             }
537             }
538              
539             # Clear all callback arrays to break any closure-based cycles
540 14         58 $self->{_on_close} = [];
541 14         54 $self->{_on_error} = [];
542             }
543              
544             # Close the connection
545             sub close {
546 3     3 1 9 my ($self) = @_;
547              
548 3 50       11 return $self if $self->is_closed;
549              
550 3         8 $self->_set_closed;
551 3         8 $self->_run_close_callbacks->get;
552              
553 3         71 return $self;
554             }
555              
556             # Wait for disconnect
557 12     12 1 78 async sub run {
558 12         20 my ($self) = @_;
559              
560 12 100       24 await $self->start unless $self->is_started;
561              
562 12         43 while (!$self->is_closed) {
563 12         15 my $event = eval { await $self->{receive}->() };
  12         26  
564 12 100       672 if (my $err = $@) {
565 1         10 warn "PAGI::SSE receive error: $err";
566 1         6 last;
567             }
568              
569 11   50     25 my $type = $event->{type} // '';
570              
571 11 50       28 if ($type eq 'sse.disconnect') {
572 11   50     39 $self->{_disconnect_reason} = $event->{reason} // 'client_closed';
573 11         24 $self->_set_closed;
574 11         23 await $self->_run_close_callbacks;
575 11         337 last;
576             }
577             }
578              
579 12         36 return;
580             }
581              
582             # Iterate over items and send events
583 4     4 1 107 async sub each {
584 4         6 my ($self, $source, $callback) = @_;
585              
586 4 100       7 await $self->start unless $self->is_started;
587              
588 4         26 my $index = 0;
589              
590             # Handle arrayref
591 4 100       13 if (ref $source eq 'ARRAY') {
    50          
592 3         6 for my $item (@$source) {
593 7 50       30 last if $self->is_closed;
594              
595 7         14 my $result = await $callback->($item, $index++);
596              
597             # If callback returns a hashref, treat as event spec
598 7 100       253 if (ref $result eq 'HASH') {
599 2         29 await $self->send_event(%$result);
600             }
601             }
602             }
603             # Handle coderef iterator
604             elsif (ref $source eq 'CODE') {
605 1         2 while (!$self->is_closed) {
606 4         6 my $item = $source->();
607 4 100       13 last unless defined $item;
608              
609 3         5 my $result = await $callback->($item, $index++);
610              
611 3 50       156 if (ref $result eq 'HASH') {
612 0         0 await $self->send_event(%$result);
613             }
614             }
615             }
616             else {
617 0         0 croak "each() requires arrayref or coderef, got " . ref($source);
618             }
619              
620 4         26 return $self;
621             }
622              
623             # Periodic callback execution with interval delay
624             # Requires Future::IO to be installed
625 1     1 1 3 async sub every {
626 1         2 my ($self, $interval, $callback) = @_;
627              
628 1 50 33     7 croak "every() requires interval" unless defined $interval && $interval > 0;
629 1 50       5 croak "every() requires callback coderef" unless ref $callback eq 'CODE';
630              
631             # Future::IO is required for every() - fail clearly if not available
632 1 50       2 eval { require Future::IO; 1 }
  1         731  
  1         29414  
633             or croak "every() requires Future::IO to be installed. "
634             . "Install it with: cpanm Future::IO";
635              
636             # Future::IO must be configured with a backend
637 20     20   177 no warnings 'once';
  20         30  
  20         13693  
638 1 50       226 croak "Future::IO backend not configured. Add to your app.pl:\n"
639             . " use Future::IO::Impl::IOAsync;\n"
640             unless $Future::IO::IMPL;
641              
642 0 0         await $self->start unless $self->is_started;
643              
644             # Start background disconnect monitor
645 0           my $disconnect_future = $self->_watch_for_disconnect;
646              
647 0           while (!$self->is_closed) {
648             # Execute the callback
649 0           my $ok = eval { await $callback->(); 1 };
  0            
  0            
650 0 0         unless ($ok) {
651 0           my $err = $@;
652             # Callback failed - connection likely closed or error occurred
653 0           $self->_set_closed;
654 0           await $self->_run_close_callbacks;
655 0           last;
656             }
657              
658             # Race between sleep and disconnect detection
659 0           my $sleep_future = Future::IO->sleep($interval);
660 0           my $winner = await Future->wait_any($sleep_future, $disconnect_future);
661              
662             # If disconnect won, exit the loop
663 0 0         if ($self->is_closed) {
664 0 0 0       $sleep_future->cancel if $sleep_future->can('cancel') && !$sleep_future->is_ready;
665 0           last;
666             }
667             }
668              
669             # Clean up disconnect monitor if still running
670 0 0 0       $disconnect_future->cancel if $disconnect_future->can('cancel') && !$disconnect_future->is_ready;
671              
672 0           return $self;
673             }
674              
675             # Background future that completes when disconnect is detected
676             sub _watch_for_disconnect {
677 0     0     my ($self) = @_;
678              
679 0           my $receive = $self->{receive};
680 0           require Scalar::Util;
681 0           my $weak_self = $self;
682 0           Scalar::Util::weaken($weak_self);
683              
684 0     0     return (async sub {
685 0   0       while ($weak_self && !$weak_self->is_closed) {
686 0           my $event = eval { await $receive->() };
  0            
687 0 0         last unless $event;
688              
689 0   0       my $type = $event->{type} // '';
690 0 0         if ($type eq 'sse.disconnect') {
691 0 0         if ($weak_self) {
692 0   0       $weak_self->{_disconnect_reason} = $event->{reason} // 'client_closed';
693 0           $weak_self->_set_closed;
694 0           await $weak_self->_run_close_callbacks;
695             }
696 0           last;
697             }
698             }
699 0           })->();
700             }
701              
702             1;
703              
704             __END__