File Coverage

lib/PAGI/App/WebSocket/Broadcast.pm
Criterion Covered Total %
statement 21 64 32.8
branch 1 24 4.1
condition 3 14 21.4
subroutine 6 8 75.0
pod 2 4 50.0
total 33 114 28.9


line stmt bran cond sub pod time code
1             package PAGI::App::WebSocket::Broadcast;
2              
3 1     1   493 use strict;
  1         3  
  1         30  
4 1     1   2 use warnings;
  1         1  
  1         34  
5 1     1   3 use Future::AsyncAwait;
  1         1  
  1         6  
6              
7             =head1 NAME
8              
9             PAGI::App::WebSocket::Broadcast - Pub/sub WebSocket broadcast
10              
11             =head1 SYNOPSIS
12              
13             use PAGI::App::WebSocket::Broadcast;
14              
15             my $app = PAGI::App::WebSocket::Broadcast->new->to_app;
16              
17             =cut
18              
19             # Shared state for all connections
20             my %channels; # channel => { clients => { id => send_cb } }
21             my $next_id = 1;
22              
23             sub new {
24 1     1 0 1095 my ($class, %args) = @_;
25              
26             return bless {
27             default_channel => $args{channel} // 'default',
28 1   50     12 echo_self => $args{echo_self} // 0,
      50        
29             }, $class;
30             }
31              
32             sub to_app {
33 1     1 0 1 my ($self) = @_;
34              
35 1         5 my $default_channel = $self->{default_channel};
36 1         3 my $echo_self = $self->{echo_self};
37              
38 0     0   0 return async sub {
39 0         0 my ($scope, $receive, $send) = @_;
40 0 0       0 die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'websocket';
41              
42             # Accept the connection
43 0         0 await $send->({ type => 'websocket.accept' });
44              
45 0         0 my $client_id = $next_id++;
46 0         0 my $channel = $default_channel;
47              
48             # Register client
49 0   0     0 $channels{$channel} //= { clients => {} };
50 0         0 $channels{$channel}{clients}{$client_id} = $send;
51              
52 0         0 eval {
53 0         0 while (1) {
54 0         0 my $event = await $receive->();
55              
56 0 0       0 if ($event->{type} eq 'websocket.receive') {
    0          
57 0   0     0 my $message = $event->{text} // $event->{bytes};
58 0         0 my $is_text = exists $event->{text};
59              
60             # Broadcast to all clients in the channel
61 0         0 my $clients = $channels{$channel}{clients};
62 0         0 for my $id (keys %$clients) {
63 0 0 0     0 next if $id eq $client_id && !$echo_self;
64 0         0 my $client_send = $clients->{$id};
65 0         0 eval {
66 0 0       0 if ($is_text) {
67 0         0 await $client_send->({
68             type => 'websocket.send',
69             text => $message,
70             });
71             } else {
72 0         0 await $client_send->({
73             type => 'websocket.send',
74             bytes => $message,
75             });
76             }
77             };
78             # Remove dead clients
79 0 0       0 if ($@) {
80 0         0 delete $clients->{$id};
81             }
82             }
83             } elsif ($event->{type} eq 'websocket.disconnect') {
84 0         0 last;
85             }
86             }
87             };
88              
89             # Cleanup
90 0         0 delete $channels{$channel}{clients}{$client_id};
91 0 0       0 delete $channels{$channel} if !keys %{$channels{$channel}{clients}};
  0         0  
92 1         7 };
93             }
94              
95             # Class method to broadcast to a channel
96             sub broadcast {
97 0     0 1 0 my ($class, $channel, $message, %opts) = @_;
98              
99 0 0       0 return unless $channels{$channel};
100              
101 0         0 my $is_text = !$opts{binary};
102 0         0 my $clients = $channels{$channel}{clients};
103              
104 0         0 for my $id (keys %$clients) {
105 0         0 my $send = $clients->{$id};
106 0         0 eval {
107 0 0       0 if ($is_text) {
108 0         0 $send->({ type => 'websocket.send', text => $message });
109             } else {
110 0         0 $send->({ type => 'websocket.send', bytes => $message });
111             }
112             };
113 0 0       0 delete $clients->{$id} if $@;
114             }
115             }
116              
117             # Get connected client count
118             sub client_count {
119 1     1 1 3193 my ($class, $channel) = @_;
120 1   50     7 $channel //= undef;
121              
122 1 50       3 if ($channel) {
123 0 0       0 return 0 unless $channels{$channel};
124 0         0 return scalar keys %{$channels{$channel}{clients}};
  0         0  
125             }
126 1         2 my $total = 0;
127 1         3 $total += scalar keys %{$_->{clients}} for values %channels;
  0         0  
128 1         2 return $total;
129             }
130              
131             1;
132              
133             __END__