File Coverage

blib/lib/IPC/MicroSocket.pm
Criterion Covered Total %
statement 78 80 97.5
branch 6 10 60.0
condition n/a
subroutine 16 16 100.0
pod n/a
total 100 106 94.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2024-2026 -- leonerd@leonerd.org.uk
5              
6 6     6   825389 use v5.36;
  6         22  
7 6     6   581 use Object::Pad 0.817; # class :abstract
  6         7275  
  6         239  
8 6     6   1276 use Future::AsyncAwait;
  6         26854  
  6         39  
9 6     6   3122 use Syntax::Keyword::Match;
  6         19390  
  6         41  
10              
11             package IPC::MicroSocket 0.05;
12              
13             =head1 NAME
14              
15             C - minimal request/response or pub/sub mechanism
16              
17             =head1 DESCRIPTION
18              
19             This distribution provides two main modules for implementing servers or
20             clients that communicate over a local UNIX socket, to exchange messages. Each
21             client connects to one server, and a server supports multiple connected
22             clients.
23              
24             =over 4
25              
26             =item *
27              
28             To implement a client, see L.
29              
30             =item *
31              
32             To implement a server, see L.
33              
34             =back
35              
36             =head1 MESSAGES
37              
38             There are two supported kinds of message flows:
39              
40             =head2 Request/Response
41              
42             A client sends a request message to the server, which consists of a command
43             name and a list of arguments. The server eventually sends a response to it,
44             which contains a list of values. Responses are not necessarily delivered in
45             the requested order; servers are permitted to respond asynchronously. Requests
46             may also fail, sending a different kind of failure response to the client
47             instead.
48              
49             =head2 Publish/Subscribe
50              
51             A client subscribes to a given topic string on the server. The server can emit
52             messages to all the clients that subscribe to a particular topic.
53              
54             =head1 DATA ENCODING
55              
56             All transmitted strings are purely bytes. If you need to transmit Unicode
57             text, you must encode/decode it. If you need to send data structures that are
58             not plain byte strings, you must serialise/deserialise them.
59              
60             =cut
61              
62             class IPC::MicroSocket::Connection :abstract
63             {
64 6     6   5969 use Future::Buffer;
  6         18787  
  6         468  
65 6     6   1512 use Future::IO 0.17;
  6         76919  
  6         333  
66 6     6   2686 use Future::Selector 0.02; # ->run_until_ready
  6         24333  
  6         6651  
67              
68             field $fh :param;
69             ADJUST {
70             # Normally these are real filehandles, but in unit tests they're plain
71             # strings. The 'ref' test just stops that from being a problem
72             $fh->blocking( 0 ) if ref $fh;
73             }
74              
75             # A message is a sigil (U8), argc (U8), args (argc * U32+bytes)
76              
77 4     4   10991 async method _recv ()
  4         15  
  4         7  
78 4         9 {
79 13     13   422 my $buffer = Future::Buffer->new(
80 13         114 fill => sub () { Future::IO->read( $fh, 256 ) },
  13         23  
81 4         41 );
82              
83 4         47 MESSAGE: while(1) {
84 13 50       7058 defined( my $sigil = await $buffer->read_exactly( 1 ) )
85             or last MESSAGE;
86              
87 9 50       46755 defined( my $argc = unpack "C", await $buffer->read_exactly( 1 ) )
88             or last MESSAGE;
89              
90 9         1306 my @args;
91 9         41 foreach my $i ( 1 .. $argc ) {
92 19 50       716 defined( my $len = unpack "L>", await $buffer->read_exactly( 4 ) )
93             or last MESSAGE;
94              
95 19         1535 push @args, await $buffer->read_exactly( $len );
96             }
97              
98 9         660 $self->on_recv( $sigil, @args );
99             }
100             }
101              
102             method on_recv ( $sigil, @args );
103              
104 9     9   1104 async method send ( $sigil, @args )
  9         36  
  9         21  
  9         25  
  9         21  
105 9         29 {
106             await Future::IO->write_exactly( $fh,
107             join "",
108             $sigil,
109             pack( "C", scalar @args ),
110 9         58 map { pack( "L>", length ) . $_ } @args
111             );
112             }
113             }
114              
115             # Message sigils
116             # '(' $tag $func @args -- request
117             # ')' $tag @args -- response OK
118             # '#' $tag @args -- response fail
119             # '+' $topic -- subscribe
120             # '!' $topic @args -- publish
121              
122             class IPC::MicroSocket::ServerConnection :abstract
123             {
124             inherit IPC::MicroSocket::Connection;
125              
126             method on_request;
127              
128             method on_subscribe ( $topic );
129              
130             field %subscribed_topics;
131 3     3   3606 method is_subscribed ( $topic ) { return $subscribed_topics{ $topic }; }
  3         11  
  3         7  
  3         6  
  3         36  
132              
133             field $selector = Future::Selector->new;
134              
135 2     2   14 async method run ()
  2         7  
  2         3  
136 2         11 {
137 2         15 await $selector->run_until_ready( $self->_recv );
138 0         0 return;
139             }
140              
141 5     5   14 method on_recv ( $sigil, @args )
  5         28  
  5         13  
  5         16  
  5         13  
142             {
143             match( $sigil : eq ) {
144             case( "(" ) {
145 3         7 my $tag = shift @args;
146 2         7 $selector->add(
147             data => undef,
148             f => $self->on_request( @args )
149             ->then(
150             # done
151 2     2   24 sub ( @result ) { $self->send( ")", $tag, @result ) },
  2         602  
  2         6  
152             # fail
153 1     1   7 sub ( $err, @ ) { $self->send( "#", $tag, $err ) },
  1         269  
  1         3  
  1         2  
154 3         23 ),
155             );
156             }
157             case( "+" ) {
158 2         6 my $topic = shift @args;
159 2         11 $subscribed_topics{ $topic } = 1;
160 2         18 $self->on_subscribe( $topic );
161             }
162              
163 5 100       30 default {
    50          
164 0         0 warn "TODO: Unrecognised sigil $sigil\n";
165             }
166             }
167             }
168              
169 2     2   5 method publish ( $topic, @args )
  2         9  
  2         5  
  2         33  
  2         6  
170             {
171 2         11 $selector->add(
172             data => undef,
173             f => $self->send( "!", $topic, @args ),
174             );
175             }
176             }
177              
178             =head1 FAQs
179              
180             =head2 Why not ZeroMQ?
181              
182             I found ZeroMQ to be a lot of effort to use from Perl, and most critically it
183             does not appear to support both request/response and publish/subscribe message
184             flows to share the same UNIX socket. To support that in ZeroMQ it would appear
185             to be necessary to create two separate endpoints, one for each kind of message
186             flow.
187              
188             =head2 Why not JSON/YAML/your-favourite-serialisation?
189              
190             I mostly built this for a few very-small use-cases involving simple byte
191             strings or plain ASCII text, for which the overhead of JSON, YAML, or other
192             kinds of serialisation would be unnecessary. As the presented message
193             semantics are just opaque byte buffers, you are free to layer on top whatever
194             kind of message serialisation you wish.
195              
196             =head2 Why not IO::Async/Mojo/your-favourite-event-system?
197              
198             I wanted to use this distribution as an exercise in writing "pure"
199             L-driven event logic, as an experiment to test out L
200             and other related design shapes.
201              
202             =head1 TODO
203              
204             There are a number of additional features that this module I support.
205             Each will be considered if a use-case arises. Each would add extra code and
206             possible dependencies, and take away from the "micro" nature of the module, so
207             each would have to be considered on individual merit.
208              
209             =over 4
210              
211             =item *
212              
213             Configurations for encoding and serialisation of arguments.
214              
215             =item *
216              
217             Unsubscribe from individual topics by request.
218              
219             =item *
220              
221             Helper methods for other socket types, such as TCP sockets.
222              
223             =item *
224              
225             Flexible matching of subscription topics; such as string prefixes or delimited
226             component paths.
227              
228             =item *
229              
230             Other kinds of message flows, such as server-buffered streams with atomic
231             catchup-and-subscribe semantics ensuring clients receive all the buffer.
232              
233             =back
234              
235             =head1 AUTHOR
236              
237             Paul Evans
238              
239             =cut
240              
241             0x55AA;