File Coverage

blib/lib/POE/Component/Server/JSONUnix.pm
Criterion Covered Total %
statement 69 206 33.5
branch 9 74 12.1
condition 4 27 14.8
subroutine 22 43 51.1
pod 4 4 100.0
total 108 354 30.5


line stmt bran cond sub pod time code
1             package POE::Component::Server::JSONUnix;
2              
3 4     4   455991 use strict;
  4         14  
  4         128  
4 4     4   17 use warnings;
  4         4  
  4         194  
5 4     4   85 use v5.10;
  4         13  
6 4     4   19 use mro;
  4         5  
  4         26  
7              
8 4     4   97 use Carp qw(carp croak);
  4         6  
  4         204  
9 4     4   1402 use Socket qw(PF_UNIX SOCK_STREAM);
  4         9982  
  4         530  
10 4     4   1048 use IO::Socket::UNIX ();
  4         45530  
  4         65  
11 4     4   1121 use JSON::MaybeXS ();
  4         31638  
  4         97  
12              
13 4         23 use POE qw(
14             Wheel::SocketFactory
15             Wheel::ReadWrite
16             Filter::Line
17             Driver::SysRW
18 4     4   1577 );
  4         42380  
19              
20             our $VERSION = '0.0.1';
21              
22             =head1 NAME
23              
24             POE::Component::Server::JSONUnix - pluggable JSON-over-Unix-socket server for POE
25              
26             =head1 SYNOPSIS
27              
28             use POE;
29             use POE::Component::Server::JSONUnix;
30              
31             my $server = POE::Component::Server::JSONUnix->spawn(
32             socket_path => '/tmp/app.sock',
33             socket_mode => 0600,
34             commands => {
35             echo => sub {
36             my ($server, $request, $ctx) = @_;
37             return { echoed => $request->{args} };
38             },
39             },
40             );
41              
42             # Add more commands at any time.
43             $server->register(
44             add => sub {
45             my ($server, $request, $ctx) = @_;
46             my $sum = 0;
47             $sum += $_ for @{ $request->{args}{numbers} // [] };
48             return { sum => $sum };
49             },
50             );
51              
52             $poe_kernel->run;
53              
54             =head1 DESCRIPTION
55              
56             This module is a small, event-driven server that listens on a Unix domain
57             socket and speaks a simple JSON request/response protocol. It is built on
58             L and is designed to be extended: the set of commands it understands is a
59             plain dispatch table you can add to at construction time, at run time, or by
60             subclassing.
61              
62             It is suitable as a local control or RPC endpoint for a daemon -- the sort of
63             thing you talk to from a command-line tool, a cron job, or another process on
64             the same host.
65              
66             =head1 PROTOCOL
67              
68             The framing is newline-delimited JSON: each message is a single JSON object on
69             its own line, terminated by C<\n>. (Pretty-printed, multi-line JSON is not
70             supported by the default filter; see L.)
71              
72             A request looks like:
73              
74             {"command":"add","args":{"numbers":[1,2,3]},"id":7}
75              
76             =over 4
77              
78             =item *
79              
80             C (required) -- the name of the command to run. C is accepted as
81             an alias.
82              
83             =item *
84              
85             C (optional) -- an arbitrary payload passed straight through to the
86             handler.
87              
88             =item *
89              
90             C (optional) -- an opaque value echoed back in the response so asynchronous
91             clients can correlate replies with requests.
92              
93             =back
94              
95             A successful response:
96              
97             {"id":7,"status":"ok","result":{"sum":6}}
98              
99             An error response:
100              
101             {"id":7,"status":"error","error":"unknown command: subtract"}
102              
103             Malformed JSON, a non-object request, a missing command, an unknown command, or
104             a handler that dies all produce an C response rather than disturbing the
105             server or other clients.
106              
107             =head1 CONSTRUCTOR
108              
109             =head2 spawn
110              
111             my $server = POE::Component::Server::JSONUnix->spawn(%args);
112              
113             Creates the server's POE session and returns the server object. Recognised
114             arguments:
115              
116             =over 4
117              
118             =item *
119              
120             C (required) -- filesystem path of the Unix domain socket to listen
121             on. If a stale socket file is present it is removed; if another process is
122             actively listening there, C dies rather than clobber it.
123              
124             =item *
125              
126             C -- hash reference of C<< name => \&handler >> pairs to register. See
127             L.
128              
129             =item *
130              
131             C -- if set (e.g. C<0600>), C the socket to these
132             permissions after binding. Unix socket permissions govern who may connect, so
133             setting this is recommended.
134              
135             =item *
136              
137             C -- POE session alias. Defaults to C. Set this if you
138             run more than one server in a single process.
139              
140             =item *
141              
142             C -- whether to remove a stale (not-in-use) socket file on
143             startup. Defaults to true.
144              
145             =item *
146              
147             C -- code reference called as
148             C<< $cb->($operation, $errnum, $errstr [, $wheel_id]) >> on listen and
149             connection I/O errors. Normal client disconnects are not reported.
150              
151             =back
152              
153             =cut
154              
155             sub spawn {
156 0     0 1 0 my ( $class, %args ) = @_;
157              
158             my $path = delete $args{socket_path}
159 0 0       0 or croak "spawn() requires a 'socket_path' argument";
160              
161             my $self = bless {
162             socket_path => $path,
163             alias => ( delete $args{alias} ) // 'json_unix_server',
164             socket_mode => delete $args{socket_mode}, # e.g. 0600
165             unlink_existing => ( delete $args{unlink_existing} ) // 1,
166             on_error => delete $args{on_error}, # coderef (optional)
167 0   0     0 commands => {},
      0        
168             clients => {},
169             json => JSON::MaybeXS->new(
170             utf8 => 1,
171             canonical => 1,
172             allow_nonref => 0,
173             ),
174             }, $class;
175              
176             # Precedence (later overrides earlier): built-ins < cmd_* methods < arg.
177 0         0 $self->_register_builtins;
178 0         0 $self->_register_cmd_methods;
179              
180 0 0       0 if ( defined( my $cmds = delete $args{commands} ) ) {
181 0 0       0 croak "'commands' must be a hash reference"
182             unless ref $cmds eq 'HASH';
183 0         0 $self->register(%$cmds);
184             }
185              
186             # Fail fast and synchronously on a busy or unusable socket path, so the
187             # caller of spawn() gets the error rather than a dead session later.
188 0         0 $self->_prepare_socket_path;
189              
190 0         0 POE::Session->create(
191             object_states => [
192             $self => {
193             _start => '_poe_start',
194             _stop => '_poe_stop',
195             shutdown => '_poe_shutdown',
196             register_command => '_poe_register_command',
197             got_connection => '_poe_got_connection',
198             listen_error => '_poe_listen_error',
199             client_input => '_poe_client_input',
200             client_error => '_poe_client_error',
201             client_flushed => '_poe_client_flushed',
202             },
203             ],
204             );
205              
206 0         0 return $self;
207             } ## end sub spawn
208              
209             #--- command registration --------------------------------------------------
210              
211             =head1 METHODS
212              
213             =head2 register
214              
215             $server->register(name => \&handler, ...);
216              
217             Add or replace commands. Returns the server object. Croaks if a handler is not a
218             code reference.
219              
220             =cut
221              
222             # register(name => \&handler, ...) — add or replace commands at any time.
223             sub register {
224 4     4 1 144159 my ( $self, %cmds ) = @_;
225 4         14 for my $name ( sort keys %cmds ) {
226 5         7 my $code = $cmds{$name};
227 5 100       182 croak "Handler for command '$name' must be a code reference"
228             unless ref $code eq 'CODE';
229 4         12 $self->{commands}{$name} = $code;
230             }
231 3         8 return $self;
232             } ## end sub register
233              
234             =head2 command_names
235              
236             my $names = $server->command_names; # array reference, sorted
237              
238             The names of all currently registered commands. (Also available to clients as
239             the built-in C command.)
240              
241             =cut
242              
243 2     2 1 11 sub command_names { return [ sort keys %{ $_[0]->{commands} } ] }
  2         15  
244              
245             sub _register_builtins {
246 0     0   0 my ($self) = @_;
247             $self->register(
248             ping => sub {
249 0     0   0 my ( $server, $req, $ctx ) = @_;
250 0         0 return { pong => 1, time => time() };
251             },
252             commands => sub {
253 0     0   0 my ( $server, $req, $ctx ) = @_;
254 0         0 return { commands => $server->command_names };
255             },
256 0         0 );
257 0         0 return;
258             } ## end sub _register_builtins
259              
260             # Discover cmd_ methods anywhere in the class hierarchy so a server can
261             # be built simply by subclassing this module and adding methods.
262             sub _register_cmd_methods {
263 1     1   543 my ($self) = @_;
264 1         2 my %names;
265 4     4   198711 no strict 'refs';
  4         8  
  4         8051  
266 1         1 for my $pkg ( @{ mro::get_linear_isa( ref $self ) } ) {
  1         6  
267 2         2 for my $sym ( keys %{"${pkg}::"} ) {
  2         20  
268 58 100       85 $names{$1} = 1 if $sym =~ /\Acmd_(.+)\z/;
269             }
270             }
271 1         2 for my $name ( keys %names ) {
272 2         3 my $method = "cmd_$name";
273 2 50       9 next unless $self->can($method);
274             $self->{commands}{$name} = sub {
275 1     1   276 my ( $server, $req, $ctx ) = @_;
276 1         5 return $server->$method( $req, $ctx );
277 2         11 };
278             }
279 1         3 return;
280             } ## end sub _register_cmd_methods
281              
282             =head2 shutdown
283              
284             $server->shutdown;
285              
286             Stop accepting connections, close all clients, remove the socket file, and let
287             the session end.
288              
289             =cut
290              
291             sub shutdown {
292 0     0 1 0 my ($self) = @_;
293 0         0 $poe_kernel->post( $self->{alias}, 'shutdown' );
294 0         0 return;
295             }
296              
297             #--- socket setup ----------------------------------------------------------
298              
299             sub _prepare_socket_path {
300 0     0   0 my ($self) = @_;
301 0         0 my $path = $self->{socket_path};
302              
303 0 0       0 return unless -e $path;
304              
305 0 0       0 croak "socket_path '$path' exists and is not a socket"
306             unless -S $path;
307              
308             # If something is actively listening there, refuse rather than clobber it.
309 0         0 my $probe = IO::Socket::UNIX->new( Type => SOCK_STREAM, Peer => $path );
310 0 0       0 if ($probe) {
311 0         0 close $probe;
312 0         0 croak "Another server is already listening on '$path'";
313             }
314              
315             croak "Stale socket '$path' present but unlink_existing is disabled"
316 0 0       0 unless $self->{unlink_existing};
317              
318 0 0       0 unlink $path
319             or croak "Could not remove stale socket '$path': $!";
320 0         0 return;
321             } ## end sub _prepare_socket_path
322              
323             #--- POE: session lifecycle ------------------------------------------------
324              
325             sub _poe_start {
326 0     0   0 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
327              
328 0         0 $kernel->alias_set( $self->{alias} );
329              
330             $self->{listener} = POE::Wheel::SocketFactory->new(
331             SocketDomain => PF_UNIX,
332             SocketType => SOCK_STREAM,
333             BindAddress => $self->{socket_path},
334 0         0 SuccessEvent => 'got_connection',
335             FailureEvent => 'listen_error',
336             );
337              
338 0 0       0 if ( defined $self->{socket_mode} ) {
339             chmod $self->{socket_mode}, $self->{socket_path}
340 0 0       0 or carp "chmod on '$self->{socket_path}' failed: $!";
341             }
342 0         0 return;
343             } ## end sub _poe_start
344              
345             sub _poe_stop {
346 0     0   0 my ($self) = $_[OBJECT];
347 0         0 $self->_remove_socket_file;
348 0         0 return;
349             }
350              
351             sub _poe_shutdown {
352 0     0   0 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
353 0         0 delete $self->{listener}; # stop accepting new connections
354 0         0 %{ $self->{clients} } = (); # drop wheels -> close client sockets
  0         0  
355 0         0 $kernel->alias_remove( $self->{alias} );
356 0         0 $self->_remove_socket_file;
357 0         0 return;
358             }
359              
360             sub _poe_register_command {
361 0     0   0 my ( $self, $name, $code ) = @_[ OBJECT, ARG0, ARG1 ];
362 0         0 eval { $self->register( $name => $code ) };
  0         0  
363 0 0       0 carp "register_command failed: $@" if $@;
364 0         0 return;
365             }
366              
367             sub _remove_socket_file {
368 0     0   0 my ($self) = @_;
369 0         0 my $path = $self->{socket_path};
370 0 0 0     0 unlink $path if defined $path && -S $path;
371 0         0 return;
372             }
373              
374             #--- POE: listener events --------------------------------------------------
375              
376             sub _poe_listen_error {
377 0     0   0 my ( $self, $op, $errnum, $errstr ) = @_[ OBJECT, ARG0, ARG1, ARG2 ];
378 0         0 carp "listen error during $op: $errstr ($errnum)";
379 0 0       0 $self->{on_error}->( "listen:$op", $errnum, $errstr ) if $self->{on_error};
380 0         0 delete $self->{listener};
381 0         0 return;
382             }
383              
384             sub _poe_got_connection {
385 0     0   0 my ( $self, $socket ) = @_[ OBJECT, ARG0 ];
386              
387 0         0 my $wheel = POE::Wheel::ReadWrite->new(
388             Handle => $socket,
389             Driver => POE::Driver::SysRW->new,
390             Filter => POE::Filter::Line->new( Literal => "\n" ),
391             InputEvent => 'client_input',
392             ErrorEvent => 'client_error',
393             FlushedEvent => 'client_flushed',
394             );
395              
396 0         0 $self->{clients}{ $wheel->ID } = {
397             wheel => $wheel,
398             close_after_flush => 0,
399             };
400 0         0 return;
401             } ## end sub _poe_got_connection
402              
403             #--- POE: per-client events ------------------------------------------------
404              
405             sub _poe_client_input {
406 0     0   0 my ( $self, $line, $id ) = @_[ OBJECT, ARG0, ARG1 ];
407              
408 0 0       0 return unless $self->{clients}{$id};
409 0 0 0     0 return unless defined $line && $line =~ /\S/; # ignore blank keepalives
410              
411 0         0 my $request;
412 0 0       0 unless ( eval { $request = $self->{json}->decode($line); 1 } ) {
  0         0  
  0         0  
413 0         0 $self->_send(
414             $id,
415             {
416             status => 'error',
417             error => 'invalid JSON: ' . _clean_err($@),
418             }
419             );
420 0         0 return;
421             } ## end unless ( eval { $request = $self->{json}->decode...})
422              
423 0 0       0 unless ( ref $request eq 'HASH' ) {
424 0         0 $self->_send(
425             $id,
426             {
427             status => 'error',
428             error => 'request must be a JSON object',
429             }
430             );
431 0         0 return;
432             } ## end unless ( ref $request eq 'HASH' )
433              
434 0         0 my $req_id = $request->{id};
435 0   0     0 my $cmd_name = $request->{command} // $request->{cmd};
436              
437 0         0 my $ctx = POE::Component::Server::JSONUnix::Context->_new(
438             server => $self,
439             wheel_id => $id,
440             req_id => $req_id,
441             command => $cmd_name,
442             request => $request,
443             );
444              
445 0 0       0 unless ( defined $cmd_name ) {
446 0         0 $ctx->error("missing 'command' field");
447 0         0 return;
448             }
449              
450 0         0 my $handler = $self->{commands}{$cmd_name};
451 0 0       0 unless ($handler) {
452 0         0 $ctx->error("unknown command: $cmd_name");
453 0         0 return;
454             }
455              
456 0         0 my @ret = eval { $handler->( $self, $request, $ctx ) };
  0         0  
457 0 0       0 if ( my $err = $@ ) {
458 0 0       0 unless ( $ctx->responded ) {
459 0 0       0 if ( ref $err eq 'HASH' ) {
460 0         0 $ctx->respond( { status => 'error', %$err } );
461             } else {
462 0         0 $ctx->error( _clean_err($err) );
463             }
464             }
465 0         0 return;
466             } ## end if ( my $err = $@ )
467              
468 0 0       0 return if $ctx->responded; # handler already answered (any path)
469 0 0       0 return unless defined $ret[0]; # undef return => async; answers later
470 0         0 $ctx->respond_result( $ret[0] ); # sync return => wrap as {ok, result}
471 0         0 return;
472             } ## end sub _poe_client_input
473              
474             sub _poe_client_error {
475 0     0   0 my ( $self, $op, $errnum, $errstr, $id ) = @_[ OBJECT, ARG0, ARG1, ARG2, ARG3 ];
476              
477             # operation 'read' with errnum 0 is a normal EOF (client hung up).
478 0 0 0     0 if ( $self->{on_error} && !( $op eq 'read' && $errnum == 0 ) ) {
      0        
479 0         0 eval { $self->{on_error}->( $op, $errnum, $errstr, $id ) };
  0         0  
480             }
481 0         0 $self->_close_client($id);
482 0         0 return;
483             } ## end sub _poe_client_error
484              
485             sub _poe_client_flushed {
486 0     0   0 my ( $self, $id ) = @_[ OBJECT, ARG0 ];
487 0 0       0 my $client = $self->{clients}{$id} or return;
488 0 0       0 $self->_close_client($id) if $client->{close_after_flush};
489 0         0 return;
490             }
491              
492             #--- sending / closing -----------------------------------------------------
493              
494             sub _send {
495 0     0   0 my ( $self, $id, $data ) = @_;
496 0 0       0 my $client = $self->{clients}{$id} or return;
497 0 0       0 my $wheel = $client->{wheel} or return;
498              
499 0         0 my $json = eval { $self->{json}->encode($data) };
  0         0  
500 0 0       0 unless ( defined $json ) {
501             $json = $self->{json}->encode(
502             {
503 0         0 status => 'error',
504             error => 'internal error: response could not be serialised',
505             }
506             );
507             }
508 0         0 $wheel->put($json); # Filter::Line appends the trailing newline
509 0         0 return;
510             } ## end sub _send
511              
512             sub _close_client {
513 0     0   0 my ( $self, $id ) = @_;
514 0         0 delete $self->{clients}{$id}; # destroying the wheel closes the socket
515 0         0 return;
516             }
517              
518             sub _close_client_after_flush {
519 0     0   0 my ( $self, $id ) = @_;
520 0 0       0 my $client = $self->{clients}{$id} or return;
521 0         0 my $wheel = $client->{wheel};
522 0 0 0     0 if ( $wheel && $wheel->get_driver_out_octets ) {
523 0         0 $client->{close_after_flush} = 1;
524             } else {
525 0         0 $self->_close_client($id);
526             }
527 0         0 return;
528             } ## end sub _close_client_after_flush
529              
530             sub _clean_err {
531 0     0   0 my ($msg) = @_;
532 0         0 $msg = "$msg";
533 0         0 $msg =~ s/\s+at \S+ line \d+\.?.*//s; # strip "at file line N ..."
534 0         0 $msg =~ s/\s+\z//;
535 0         0 return $msg;
536             }
537              
538             #===========================================================================
539             package POE::Component::Server::JSONUnix::Context;
540             #===========================================================================
541             # Handed to every command handler. A handler can answer synchronously by
542             # returning a value, or asynchronously by stashing $ctx and calling one of
543             # these methods later (e.g. after a timer fires or a backend request returns).
544              
545             our $VERSION = '0.01';
546              
547             sub _new {
548 5     5   155445 my ( $class, %a ) = @_;
549             return bless {
550             server => $a{server},
551             wheel_id => $a{wheel_id},
552             req_id => $a{req_id},
553             command => $a{command},
554             request => $a{request},
555 5         26 responded => 0,
556             }, $class;
557             } ## end sub _new
558              
559 1     1   5 sub request { $_[0]{request} }
560 1     1   8 sub command { $_[0]{command} }
561 1     1   4 sub id { $_[0]{req_id} }
562 1     1   3030 sub responded { $_[0]{responded} }
563              
564             # Send a full response envelope. status defaults to 'ok'; the request id, if
565             # present, is echoed back automatically. Only the first call has any effect.
566             sub respond {
567 5     5   16 my ( $self, $envelope ) = @_;
568 5 100       38 return if $self->{responded};
569 4         8 $self->{responded} = 1;
570              
571 4         10 my %out = %$envelope;
572 4   100     14 $out{status} //= 'ok';
573             $out{id} = $self->{req_id}
574 4 100 66     25 if defined $self->{req_id} && !exists $out{id};
575              
576 4         12 $self->{server}->_send( $self->{wheel_id}, \%out );
577 4         31 return;
578             } ## end sub respond
579              
580             # Convenience: respond with {status:ok, result:}.
581             sub respond_result {
582 1     1   5 my ( $self, $result ) = @_;
583 1         5 return $self->respond( { status => 'ok', result => $result } );
584             }
585              
586             # Convenience: respond with {status:error, error:, ...extra}.
587             sub error {
588 1     1   5 my ( $self, $message, %extra ) = @_;
589 1         5 return $self->respond( { status => 'error', error => $message, %extra } );
590             }
591              
592             # Close this client's connection once any queued output has been flushed.
593             sub close {
594 0     0     my ($self) = @_;
595 0           $self->{server}->_close_client_after_flush( $self->{wheel_id} );
596 0           return;
597             }
598              
599             1;
600              
601             __END__