File Coverage

blib/lib/IPC/MicroSocket/Server.pm
Criterion Covered Total %
statement 70 74 94.5
branch 6 8 75.0
condition n/a
subroutine 16 17 94.1
pod 3 4 75.0
total 95 103 92.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2024-2026 -- leonerd@leonerd.org.uk
5              
6 3     3   628242 use v5.36;
  3         12  
7 3     3   20 use Object::Pad 0.817; # class :abstract
  3         28  
  3         139  
8 3     3   422 use Future::AsyncAwait;
  3         6  
  3         19  
9 3     3   1460 use Sublike::Extended 0.29 'method';
  3         1658  
  3         23  
10              
11 3     3   1277 use IPC::MicroSocket;
  3         8  
  3         568  
12              
13             package IPC::MicroSocket::Server 0.05; # this 'package' statement just to keep CPAN indexers happy
14             class IPC::MicroSocket::Server :abstract;
15              
16 3     3   243 use Carp;
  3         6  
  3         197  
17              
18 3     3   15 use Future::Selector;
  3         4  
  3         5283  
19              
20             =head1 NAME
21              
22             C - server role
23              
24             =head1 SYNOPSIS
25              
26             =for highlighter perl
27              
28             use v5.36;
29             use Future::AsyncAwait;
30             use Object::Pad v0.807;
31             use IPC::MicroSocket::Server;
32              
33             class ExampleServer {
34             apply IPC::MicroSocket::Server;
35              
36             async method on_connection_request ( $conn, $cmd, @args )
37             {
38             say "Connection sends $cmd";
39             return "Response for $cmd";
40             }
41              
42             method on_connection_subscribe {}
43             }
44              
45             await ExampleServer->new_unix( path => "my-app.sock" )
46             ->run;
47              
48             =head1 DESCRIPTION
49              
50             This module provides the server role for L. This is an
51             incomplete role, which requires any class that applies it to provide some
52             methods that contain the actual behaviour for the server.
53              
54             =cut
55              
56             field $fh :param;
57             ADJUST {
58             # Normally these are real filehandles, but in unit tests they're plain
59             # strings. The 'ref' test just stops that from being a problem
60             $fh->blocking( 0 ) if ref $fh;
61             }
62              
63             field $connection_class :param = "IPC::MicroSocket::Server::_Connection";
64              
65 2     2 0 2337 field @clients :reader;
  2         9  
66              
67             =head1 CONSTRUCTOR
68              
69             =cut
70              
71             =head2 new_unix
72              
73             $server = IPC::MicroSocket::Server->new_unix( path => $path, %args );
74              
75             A convenience constructor for creating a new server instance listening on the
76             given UNIX socket path.
77              
78             Note as this is a role, this must be invoked on a class that applies the role
79             and implements the missing methods.
80              
81             Takes the following named arguments:
82              
83             =over 4
84              
85             =item listen => INT
86              
87             Sets the size of the C queue; defaults to 5 if not specified.
88              
89             =back
90              
91             I any other remaining arguments are passed to the instance
92             constructor of the underlying object class.
93              
94             =cut
95              
96 1     1 1 20947 method new_unix :common ( :$path, :$listen //= 5, %rest )
  1         4  
  1         7  
  1         2  
97             {
98 1         9 require IO::Socket::UNIX;
99              
100 1 50       5 my $listensock = IO::Socket::UNIX->new(
101             Local => $path,
102             Listen => $listen,
103             ReuseAddr => 1,
104             ) or croak "Cannot create socket - $@";
105              
106 1         34 return $class->new( fh => $listensock, %rest );
107             }
108              
109             field $selector;
110 2     2   7 method _selector ()
  2         7  
  2         2  
111             {
112 2 100       28 return $selector if $selector;
113              
114 1         18 $selector = Future::Selector->new;
115              
116 2     2   2823 $selector->add(
117             data => "acceptor",
118 2         7 gen => async sub () {
  2         4  
119 2         17 $self->_accepted( await Future::IO->accept( $fh ) );
120 1 50       23 },
121             ) if $fh; # might be false in some unit tests
122              
123 1         9612 return $selector;
124             }
125              
126 1     1   191 async method _accepted ( $clientsock )
  1         5  
  1         2  
  1         3  
127 1         3 {
128 1         20 push @clients, my $client = $connection_class->new(
129             server => $self,
130             fh => $clientsock,
131             );
132              
133 0     0   0 $self->_selector->add(
134             data => $client,
135             f => $client->run
136 0         0 ->on_ready(sub ( $ ) {
137 0         0 @clients = grep { $_ != $client } @clients;
  0         0  
138 1         5 }),
139             );
140             }
141              
142             =head1 METHODS
143              
144             =cut
145              
146             =head2 publish
147              
148             $server->publish( $topic, @args );
149              
150             Sends a C frame to every connected client.
151              
152             Note that this is I an C method; the send future for each client
153             becomes owned by the selector for each connected client instance individually.
154              
155             =cut
156              
157 2     2 1 8533 method publish ( $topic, @args )
  2         12  
  2         5  
  2         5  
  2         5  
158             {
159 2         5 foreach my $client ( @clients ) {
160 2 100       13 $client->is_subscribed( $topic ) and $client->publish( $topic, @args );
161             }
162             }
163              
164             =head2 run
165              
166             await $server->run;
167              
168             Returns a L that represents the indefinite runtime of the server
169             instance.
170              
171             =cut
172              
173 1     1 1 2783 method run ()
  1         6  
  1         2  
174             {
175 1         26 return $self->_selector->run;
176             }
177              
178             =head1 REQUIRED METHODS
179              
180             =cut
181              
182             =head2 on_connection_request
183              
184             @response = await $server->on_connection_request( $conn, $cmd, @args );
185              
186             Invoked on receipt of a C frame from a connected client. It should
187             asynchronously return the response list to be sent back to the client.
188              
189             =cut
190              
191             method on_connection_request;
192              
193             =head2 on_connection_subscribe
194              
195             $server->on_connection_subscribe( $conn, $topic );
196              
197             Invoked on receipt of a C frame from a connected client.
198              
199             =cut
200              
201             method on_connection_subscribe;
202              
203             # The default server connection class
204             class IPC::MicroSocket::Server::_Connection
205             {
206             inherit IPC::MicroSocket::ServerConnection;
207              
208             field $server :param;
209              
210 1     1   3 async method on_request ( $cmd, @args )
  1         3  
  1         3  
  1         3  
  1         2  
211 1         3 {
212 1         6 await $server->on_connection_request( $self, $cmd, @args );
213             }
214              
215 1     1   3 method on_subscribe ( $topic )
  1         3  
  1         2  
  1         2  
216             {
217 1         51 $server->on_connection_subscribe( $self, $topic );
218             }
219             }
220              
221             =head1 AUTHOR
222              
223             Paul Evans
224              
225             =cut
226              
227             0x55AA;