File Coverage

blib/lib/IPC/MicroSocket/Client.pm
Criterion Covered Total %
statement 65 82 79.2
branch 10 18 55.5
condition n/a
subroutine 12 14 85.7
pod 4 5 80.0
total 91 119 76.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2024-2026 -- leonerd@leonerd.org.uk
5              
6 3     3   253288 use v5.36;
  3         7  
7 3     3   20 use Object::Pad 0.807;
  3         19  
  3         102  
8 3     3   267 use Future::AsyncAwait;
  3         6  
  3         37  
9 3     3   681 use Sublike::Extended 0.29 'sub';
  3         696  
  3         13  
10 3     3   673 use Syntax::Keyword::Match;
  3         2979  
  3         27  
11              
12 3     3   743 use IPC::MicroSocket;
  3         4  
  3         471  
13              
14             class IPC::MicroSocket::Client 0.05;
15              
16 3     3   220 use Carp;
  3         4  
  3         162  
17              
18 3     3   12 use Future::Selector;
  3         3  
  3         4521  
19              
20             =head1 NAME
21              
22             C - client connector
23              
24             =head1 SYNOPSIS
25              
26             =for highlighter perl
27              
28             use v5.36;
29             use Future::AsyncAwait;
30             use IPC::MicroSocket::Client;
31              
32             my $client = IPC::MicroSocket::Client->new_unix( path => "my-app.sock" );
33              
34             say await $client->request( "PING" );
35              
36             =head1 DESCRIPTION
37              
38             This module provides the client connector class for L.
39              
40             =cut
41              
42             inherit IPC::MicroSocket::Connection;
43              
44             field %resp_f_by_tag;
45             field %subscribes_by_topic;
46              
47             =head1 CONSTRUCTOR
48              
49             =cut
50              
51             =head2 new_unix
52              
53             $client = IPC::MicroSocket::Client->new_unix( path => $path );
54              
55             A convenience constructor for connecting a new client instance to a given
56             UNIX socket path.
57              
58             =cut
59              
60             # class method
61 0     0 1 0 sub new_unix ( $class, :$path )
  0         0  
  0         0  
  0         0  
62             {
63 0         0 require IO::Socket::UNIX;
64              
65 0 0       0 my $sock = IO::Socket::UNIX->new( Peer => $path ) or
66             croak "Cannot connect to server - $@";
67              
68 0         0 return $class->new( fh => $sock );
69             }
70              
71             =head2 new_inprocess
72              
73             $client = IPC::MicroSocket::Client->new_inprocess( $server );
74              
75             I
76              
77             Creates a new client instance connected to an existing
78             L instance within the same process. This allows
79             operation of code that would normally use a separately-connected client to
80             run from within the same process as the server.
81              
82             I the implementation uses a C socketpair to transport
83             bytes as if an external socket was connected, but a later version of this
84             module may make use of some other, more efficient internal mechanism instead.
85              
86             =cut
87              
88             # class method
89 0     0 1 0 sub new_inprocess ( $class, $server )
  0         0  
  0         0  
  0         0  
90             {
91 0         0 require IO::Socket::UNIX;
92              
93 0 0       0 my ( $sock_for_server, $sock_for_client ) = IO::Socket::UNIX->socketpair(
94             Socket::AF_UNIX, Socket::SOCK_STREAM, 0,
95             ) or croak "Cannot socketpair() - $!";
96              
97 0         0 my $client = $class->new( fh => $sock_for_client );
98              
99 0         0 $server->_accepted( $sock_for_server );
100              
101 0         0 return $client;
102             }
103              
104 3     3 0 10 method on_recv ( $sigil, @args )
  3         14  
  3         9  
  3         10  
  3         4  
105             {
106             match( $sigil : eq ) {
107             case( ")" ) {
108 1         3 my $tag = shift @args;
109 1 50       44 my $f = delete $resp_f_by_tag{ $tag } or return;
110 1         7 $f->done( @args );
111             }
112             case( "#" ) {
113 1         4 my $tag = shift @args;
114 1 50       6 my $f = delete $resp_f_by_tag{ $tag } or return;
115 1         11 $f->fail( $args[0], slurm => @args[1..$#args] );
116             }
117             case( "!" ) {
118 1         3 my $topic = shift @args;
119             $subscribes_by_topic{ $topic } and
120 1 50       10 $subscribes_by_topic{ $topic }->( @args );
121             }
122 3 100       21 default {
    100          
    50          
123 0         0 warn "Unrecognised message sigil $sigil\n";
124             }
125             }
126             }
127              
128             field $selector;
129 3     3   9 method _selector ()
  3         16  
  3         5  
130             {
131 3 100       20 return $selector if $selector;
132              
133 1         19 $selector = Future::Selector->new;
134 1         27 $selector->add(
135             data => "runloop",
136             f => $self->_recv,
137             );
138              
139 1         2011 return $selector;
140             }
141              
142             =head1 METHODS
143              
144             =cut
145              
146             =head2 request
147              
148             @response = await $client->request( @args );
149              
150             Sends a C frame with the given arguments, waiting for a response. The
151             returned future will complete with its C frame.
152              
153             =cut
154              
155             field $last_tag = 0;
156 2     2 1 8963 async method request ( @args )
  2         10  
  2         37  
  2         26  
157 2         6 {
158 2         16 my $tag = pack "C", ( $last_tag += 1 ) %= 256;
159              
160 2         21 await $self->send( '(', $tag, @args );
161              
162 2         12857 my $f = ( $resp_f_by_tag{ $tag } = Future->new );
163              
164 2         29 my $s = $self->_selector;
165 2         10 await $s->run_until_ready( $f );
166 1         466 return await $f;
167             }
168              
169             =head2 subscribe
170              
171             await $client->subscribe( $topic, $on_recv );
172              
173             $on_recv->( @args );
174              
175             Sends a C frame for the given topic name, then waits indefinitely
176             for C frames that match it. Each received frame will invoke the
177             C<$on_recv> callback.
178              
179             Note that the L returned by this method should not complete in normal
180             circumstances but will remain pending forever.
181              
182             =cut
183              
184 1     1 1 7538 async method subscribe ( $topic, $on_recv )
  1         7  
  1         3  
  1         3  
  1         2  
185 1         4 {
186 1         7 await $self->send( '+', $topic );
187              
188 1         2522 $subscribes_by_topic{ $topic } = $on_recv;
189              
190 1         7 my $s = $self->_selector;
191 1         7 await $s->select while 1;
192             }
193              
194             =head1 AUTHOR
195              
196             Paul Evans
197              
198             =cut
199              
200             0x55AA;