File Coverage

blib/lib/IO/Async/Listener.pm
Criterion Covered Total %
statement 96 106 90.5
branch 38 58 65.5
condition 1 6 16.6
subroutine 21 22 95.4
pod 7 8 87.5
total 163 200 81.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2024 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Listener 0.805;
7              
8 4     4   20699 use v5.14;
  4         18  
9 4     4   24 use warnings;
  4         82  
  4         293  
10 4     4   26 use base qw( IO::Async::Handle );
  4         7  
  4         2127  
11              
12 4     4   30 use IO::Async::Handle;
  4         7  
  4         109  
13 4     4   17 use IO::Async::OS;
  4         9  
  4         121  
14              
15 4     4   17 use Future 0.33; # ->catch
  4         55  
  4         116  
16              
17 4     4   17 use Errno qw( EAGAIN EWOULDBLOCK );
  4         8  
  4         359  
18              
19 4     4   21 use Socket qw( sockaddr_family SOL_SOCKET SO_ACCEPTCONN SO_TYPE );
  4         6  
  4         281  
20              
21 4     4   19 use Carp;
  4         8  
  4         5840  
22              
23             =head1 NAME
24              
25             C - listen on network sockets for incoming connections
26              
27             =head1 SYNOPSIS
28              
29             =for highlighter language=perl
30              
31             use Future::AsyncAwait;
32             use IO::Async::Listener;
33              
34             use IO::Async::Loop;
35             my $loop = IO::Async::Loop->new;
36              
37             my $listener = IO::Async::Listener->new(
38             on_stream => sub {
39             my ( undef, $stream ) = @_;
40              
41             $stream->configure(
42             on_read => sub {
43             my ( $self, $buffref, $eof ) = @_;
44             $self->write( $$buffref );
45             $$buffref = "";
46             return 0;
47             },
48             );
49              
50             $loop->add( $stream );
51             },
52             );
53              
54             $loop->add( $listener );
55              
56             await $listener->listen(
57             service => "echo",
58             socktype => 'stream',
59             );
60              
61             $loop->run;
62              
63             This object can also be used indirectly via an L:
64              
65             use IO::Async::Stream;
66              
67             use IO::Async::Loop;
68             my $loop = IO::Async::Loop->new;
69              
70             await $loop->listen(
71             service => "echo",
72             socktype => 'stream',
73              
74             on_stream => sub {
75             ...
76             },
77             );
78              
79             $loop->run;
80              
81             =head1 DESCRIPTION
82              
83             This subclass of L adds behaviour which watches a socket in
84             listening mode, to accept incoming connections on them.
85              
86             A Listener can be constructed and given a existing socket in listening mode.
87             Alternatively, the Listener can construct a socket by calling the C
88             method. Either a list of addresses can be provided, or a service name can be
89             looked up using the underlying loop's C method.
90              
91             =cut
92              
93             =head1 EVENTS
94              
95             The following events are invoked, either using subclass methods or CODE
96             references in parameters:
97              
98             =head2 on_accept $clientsocket | $handle
99              
100             Invoked whenever a new client connects to the socket.
101              
102             If neither C nor C parameters are set, this
103             will be invoked with the new client socket directly. If a handle constructor
104             or class are set, this will be invoked with the newly-constructed handle,
105             having the new socket already configured onto it.
106              
107             =head2 on_stream $stream
108              
109             An alternative to C, this is passed an instance of
110             L when a new client connects. This is provided as a
111             convenience for the common case that a Stream object is required as the
112             transport for a Protocol object.
113              
114             This is now vaguely deprecated in favour of using C with a handle
115             constructor or class.
116              
117             =head2 on_socket $socket
118              
119             Similar to C, but constructs an instance of L.
120             This is most useful for C or C sockets.
121              
122             This is now vaguely deprecated in favour of using C with a handle
123             constructor or class.
124              
125             =head2 on_accept_error $socket, $errno
126              
127             Optional. Invoked if the C syscall indicates an error (other than
128             C or C). If not provided, failures of C will
129             be passed to the main C handler.
130              
131             =cut
132              
133             =head1 PARAMETERS
134              
135             The following named parameters may be passed to C or C:
136              
137             =head2 on_accept => CODE
138              
139             =head2 on_stream => CODE
140              
141             =head2 on_socket => CODE
142              
143             CODE reference for the event handlers. Because of the mutually-exclusive
144             nature of their behaviour, only one of these may be set at a time. Setting one
145             will remove the other two.
146              
147             =head2 handle => IO
148              
149             The IO handle containing an existing listen-mode socket.
150              
151             =head2 handle_constructor => CODE
152              
153             Optional. If defined, gives a CODE reference to be invoked every time a new
154             client socket is accepted from the listening socket. It is passed the listener
155             object itself, and is expected to return a new instance of
156             L or a subclass, used to wrap the new client socket.
157              
158             $handle = $handle_constructor->( $listener );
159              
160             This can also be given as a subclass method
161              
162             $handle = $listener->handle_constructor();
163              
164             =head2 handle_class => STRING
165              
166             Optional. If defined and C isn't, then new wrapper handles
167             are constructed by invoking the C method on the given class name, passing
168             in no additional parameters.
169              
170             $handle = $handle_class->new();
171              
172             This can also be given as a subclass method
173              
174             $handle = $listener->handle_class->new;
175              
176             =head2 acceptor => STRING|CODE
177              
178             Optional. If defined, gives the name of a method or a CODE reference to use to
179             implement the actual accept behaviour. This will be invoked as:
180              
181             ( $accepted ) = await $listener->acceptor( $socket );
182              
183             ( $handle ) = await $listener->acceptor( $socket, handle => $handle );
184              
185             It is invoked with the listening socket as its its argument, and optionally
186             an L instance as a named parameter, and is expected to
187             return a C that will eventually yield the newly-accepted socket or
188             handle instance, if such was provided.
189              
190             =cut
191              
192             sub _init
193             {
194 12     12   21 my $self = shift;
195 12         57 $self->SUPER::_init( @_ );
196              
197 12         57 $self->{acceptor} = "_accept";
198             }
199              
200             my @acceptor_events = qw( on_accept on_stream on_socket );
201              
202             sub configure
203             {
204 18     18 1 961 my $self = shift;
205 18         67 my %params = @_;
206              
207 18 100       112 if( grep exists $params{$_}, @acceptor_events ) {
208 11 50       59 grep( defined $_, @params{@acceptor_events} ) <= 1 or
209             croak "Can only set at most one of 'on_accept', 'on_stream' or 'on_socket'";
210              
211             # Don't exists-test, so we'll clear the other two
212 11         58 $self->{$_} = delete $params{$_} for @acceptor_events;
213             }
214              
215 18 50       76 croak "Cannot set 'on_read_ready' on a Listener" if exists $params{on_read_ready};
216              
217 18 100       64 if( defined $params{handle} ) {
    100          
218 10         22 my $handle = delete $params{handle};
219             # Sanity check it - it may be a bare GLOB ref, not an IO::Socket-derived handle
220 10 50       109 defined getsockname( $handle ) or croak "IO handle $handle does not have a sockname";
221              
222             # So now we know it's at least some kind of socket. Is it listening?
223             # SO_ACCEPTCONN would tell us, but not all OSes implement it. Since it's
224             # only a best-effort sanity check, we won't mind if the OS doesn't.
225 10         82 my $acceptconn = getsockopt( $handle, SOL_SOCKET, SO_ACCEPTCONN );
226 10 50 33     92 !defined $acceptconn or unpack( "I", $acceptconn ) or croak "Socket is not accepting connections";
227              
228             # This is a bit naughty but hopefully nobody will mind...
229 10 50       61 bless $handle, "IO::Socket" if ref( $handle ) eq "GLOB";
230              
231 10         57 $self->SUPER::configure( read_handle => $handle );
232             }
233             elsif( exists $params{handle} ) {
234 1         4 delete $params{handle};
235              
236 1         6 $self->SUPER::configure( read_handle => undef );
237             }
238              
239 18 50       74 unless( grep $self->can_event( $_ ), @acceptor_events ) {
240 0         0 croak "Expected to be able to 'on_accept', 'on_stream' or 'on_socket'";
241             }
242              
243 18         42 foreach (qw( acceptor handle_constructor handle_class )) {
244 54 100       161 $self->{$_} = delete $params{$_} if exists $params{$_};
245             }
246              
247 18 50       65 if( keys %params ) {
248 0         0 croak "Cannot pass though configuration keys to underlying Handle - " . join( ", ", keys %params );
249             }
250             }
251              
252             sub on_read_ready
253             {
254 10     10 1 20 my $self = shift;
255              
256 10         43 my $socket = $self->read_handle;
257              
258 10         23 my $on_done;
259             my %acceptor_params;
260              
261 10 100       38 if( $on_done = $self->can_event( "on_stream" ) ) {
    100          
    50          
262             # TODO: It doesn't make sense to put a SOCK_DGRAM in an
263             # IO::Async::Stream but currently we don't detect this
264 1         25 require IO::Async::Stream;
265 1         25 $acceptor_params{handle} = IO::Async::Stream->new;
266             }
267             elsif( $on_done = $self->can_event( "on_socket" ) ) {
268 1         1006 require IO::Async::Socket;
269 1         17 $acceptor_params{handle} = IO::Async::Socket->new;
270             }
271             # on_accept needs to be last in case of multiple layers of subclassing
272             elsif( $on_done = $self->can_event( "on_accept" ) ) {
273 8         16 my $handle;
274              
275             # Test both params before moving on to either method
276 8 100       119 if( my $constructor = $self->{handle_constructor} ) {
    50          
    100          
    50          
277 2         10 $handle = $self->{handle_constructor}->( $self );
278             }
279             elsif( my $class = $self->{handle_class} ) {
280 0         0 $handle = $class->new;
281             }
282             elsif( $self->can( "handle_constructor" ) ) {
283 1         5 $handle = $self->handle_constructor;
284             }
285             elsif( $self->can( "handle_class" ) ) {
286 0         0 $handle = $self->handle_class->new;
287             }
288              
289 8 100       29 $acceptor_params{handle} = $handle if $handle;
290             }
291             else {
292 0         0 die "ARG! Missing on_accept,on_stream,on_socket!";
293             }
294              
295 10         81 my $acceptor = $self->acceptor;
296             my $f = $self->$acceptor( $socket, %acceptor_params )->on_done( sub {
297 10 50   10   653 my ( $result ) = @_ or return; # false-alarm
298 10         42 $on_done->( $self, $result );
299             })->catch( accept => sub {
300 0     0   0 my ( $message, $name, @args ) = @_;
301 0         0 my ( $socket, $dollarbang ) = @args;
302 0 0       0 $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ) or
303             $self->invoke_error( "accept() failed - $dollarbang", accept => $socket, $dollarbang );
304 10         64 });
305              
306             # TODO: Consider if this wants a more fine-grained place to report
307             # non-accept() failures (such as SSL) to
308 10         1731 $self->adopt_future( $f );
309             }
310              
311             sub _accept
312             {
313 10     10   20 my $self = shift;
314 10         24 my ( $listen_sock, %params ) = @_;
315              
316 10         57 my $accepted = $listen_sock->accept;
317              
318 10 50 0     1823 if( defined $accepted ) {
    0          
319 10         61 $accepted->blocking( 0 );
320 10 100       169 if( my $handle = $params{handle} ) {
321 5         37 $handle->set_handle( $accepted );
322 5         35 return Future->done( $handle );
323             }
324             else {
325 5         53 return Future->done( $accepted );
326             }
327             }
328             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
329 0         0 return Future->done;
330             }
331             else {
332 0         0 return Future->fail( "Cannot accept() - $!", accept => $listen_sock, $! );
333             }
334             }
335              
336             =head1 METHODS
337              
338             The following methods documented in C expressions return L
339             instances.
340              
341             =cut
342              
343             =head2 acceptor
344              
345             $acceptor = $listener->acceptor;
346              
347             Returns the currently-set C method name or code reference. This may
348             be of interest to Loop C extension methods that wish to extend or wrap
349             it.
350              
351             =cut
352              
353             sub acceptor
354             {
355 10     10 1 21 my $self = shift;
356 10         29 return $self->{acceptor};
357             }
358              
359             sub is_listening
360             {
361 3     3 0 9989 my $self = shift;
362              
363 3         12 return ( defined $self->sockname );
364             }
365              
366             =head2 sockname
367              
368             $name = $listener->sockname;
369              
370             Returns the C of the underlying listening socket
371              
372             =cut
373              
374             sub sockname
375             {
376 7     7 1 804 my $self = shift;
377              
378 7 100       27 my $handle = $self->read_handle or return undef;
379 6         33 return $handle->sockname;
380             }
381              
382             =head2 family
383              
384             $family = $listener->family;
385              
386             Returns the socket address family of the underlying listening socket
387              
388             =cut
389              
390             sub family
391             {
392 2     2 1 2824 my $self = shift;
393              
394 2 50       8 my $sockname = $self->sockname or return undef;
395 2         48 return sockaddr_family( $sockname );
396             }
397              
398             =head2 socktype
399              
400             $socktype = $listener->socktype;
401              
402             Returns the socket type of the underlying listening socket
403              
404             =cut
405              
406             sub socktype
407             {
408 2     2 1 6 my $self = shift;
409              
410 2 50       9 my $handle = $self->read_handle or return undef;
411 2         52 return $handle->sockopt(SO_TYPE);
412             }
413              
414             =head2 listen
415              
416             await $listener->listen( %params );
417              
418             This method sets up a listening socket and arranges for the acceptor callback
419             to be invoked each time a new connection is accepted on the socket.
420              
421             Most parameters given to this method are passed into the C method of
422             the L object. In addition, the following arguments are also
423             recognised directly:
424              
425             =over 8
426              
427             =item on_listen => CODE
428              
429             Optional. A callback that is invoked when the listening socket is ready.
430             Similar to that on the underlying loop method, except it is passed the
431             listener object itself.
432              
433             $on_listen->( $listener );
434              
435             =back
436              
437             =cut
438              
439             sub listen
440             {
441 1     1 1 2 my $self = shift;
442 1         5 my ( %params ) = @_;
443              
444 1         4 my $loop = $self->loop;
445 1 50       5 defined $loop or croak "Cannot listen when not a member of a Loop"; # TODO: defer?
446              
447 1 50       5 if( my $on_listen = delete $params{on_listen} ) {
448 1     1   5 $params{on_listen} = sub { $on_listen->( $self ) };
  1         5  
449             }
450              
451 1         15 $loop->listen( listener => $self, %params );
452             }
453              
454             =head1 EXAMPLES
455              
456             =head2 Listening on UNIX Sockets
457              
458             The C argument can be passed an existing socket already in listening
459             mode, making it possible to listen on other types of socket such as UNIX
460             sockets.
461              
462             use IO::Async::Listener;
463             use IO::Socket::UNIX;
464              
465             use IO::Async::Loop;
466             my $loop = IO::Async::Loop->new;
467              
468             my $listener = IO::Async::Listener->new(
469             on_stream => sub {
470             my ( undef, $stream ) = @_;
471              
472             $stream->configure(
473             on_read => sub {
474             my ( $self, $buffref, $eof ) = @_;
475             $self->write( $$buffref );
476             $$buffref = "";
477             return 0;
478             },
479             );
480              
481             $loop->add( $stream );
482             },
483             );
484              
485             $loop->add( $listener );
486              
487             my $socket = IO::Socket::UNIX->new(
488             Local => "echo.sock",
489             Listen => 1,
490             ) or die "Cannot make UNIX socket - $!\n";
491              
492             $listener->listen(
493             handle => $socket,
494             );
495              
496             $loop->run;
497              
498             =head2 Passing Plain Socket Addresses
499              
500             The C or C parameters should contain a definition of a plain
501             socket address in a form that the L C
502             method can use.
503              
504             This example shows how to listen on TCP port 8001 on address 10.0.0.1:
505              
506             $listener->listen(
507             addr => {
508             family => "inet",
509             socktype => "stream",
510             port => 8001,
511             ip => "10.0.0.1",
512             },
513             ...
514             );
515              
516             This example shows another way to listen on a UNIX socket, similar to the
517             earlier example:
518              
519             $listener->listen(
520             addr => {
521             family => "unix",
522             socktype => "stream",
523             path => "echo.sock",
524             },
525             ...
526             );
527              
528             =head2 Using A Kernel-Assigned Port Number
529              
530             Rather than picking a specific port number, is it possible to ask the kernel
531             to assign one arbitrarily that is currently free. This can be done by
532             requesting port number 0 (which is actually the default if no port number is
533             otherwise specified). To determine which port number the kernel actually
534             picked, inspect the C accessor on the actual socket filehandle.
535              
536             Either use the L returned by the C method:
537              
538             $listener->listen(
539             addr => { family => "inet" },
540             )->on_done( sub {
541             my ( $listener ) = @_;
542             my $socket = $listener->read_handle;
543              
544             say "Now listening on port ", $socket->sockport;
545             });
546              
547             Or pass an C continuation:
548              
549             $listener->listen(
550             addr => { family => "inet" },
551              
552             on_listen => sub {
553             my ( $listener ) = @_;
554             my $socket = $listener->read_handle;
555              
556             say "Now listening on port ", $socket->sockport;
557             },
558             );
559              
560             =head1 AUTHOR
561              
562             Paul Evans
563              
564             =cut
565              
566             0x55AA;