File Coverage

blib/lib/Reflexive/Role/TCPServer.pm
Criterion Covered Total %
statement 47 65 72.3
branch 1 2 50.0
condition n/a
subroutine 18 28 64.2
pod 2 2 100.0
total 68 97 70.1


line stmt bran cond sub pod time code
1             package Reflexive::Role::TCPServer;
2             {
3             $Reflexive::Role::TCPServer::VERSION = '1.140030';
4             }
5              
6             #ABSTRACT: Provides a consumable Reflex-based multiplexing TCP server behavior
7              
8 1     1   140858 use Reflex::Role;
  1         754042  
  1         6  
9 1     1   9819 use Moose::Util::TypeConstraints;
  1         4  
  1         12  
10 1     1   4404 use MooseX::Params::Validate;
  1         21565  
  1         8  
11 1     1   1416 use MooseX::Types::Moose(':all');
  1         71781  
  1         11  
12 1     1   12494 use MooseX::Types::Structured(':all');
  1         513650  
  1         11  
13 1     1   790 use IO::Socket::INET;
  1         2  
  1         14  
14 1     1   1956 use POE::Filter::Stream;
  1         77969  
  1         33  
15 1     1   1018 use Reflexive::Stream::Filtering;
  1         2119724  
  1         50  
16 1     1   9 use Reflex::Callbacks('cb_method');
  1         2  
  1         10  
17 1     1   485 use Try::Tiny;
  1         2  
  1         1406  
18              
19              
20              
21             attribute_parameter 'reflex_stream_class' => 'Reflexive::Stream::Filtering';
22              
23              
24             attribute_parameter 'input_filter_class' => 'POE::Filter::Stream';
25              
26              
27             parameter input_filter_args =>
28             (
29             isa => HashRef,
30             default => sub { {} },
31             );
32              
33              
34             attribute_parameter 'output_filter_class' => 'POE::Filter::Stream';
35              
36              
37             parameter output_filter_args =>
38             (
39             isa => HashRef,
40             default => sub { {} },
41             );
42              
43             role
44             {
45             my $p = shift;
46             my $reflex_stream_class = $p->reflex_stream_class;
47             my $input_filter_class = $p->input_filter_class;
48             my $output_filter_class = $p->output_filter_class;
49             my %input_filter_args = %{$p->input_filter_args};
50             my %output_filter_args = %{$p->output_filter_args};
51              
52              
53             requires qw/on_socket_data/;
54              
55              
56             has port =>
57             (
58             is => 'ro',
59             isa => Int,
60             default => 5000,
61             writer => '_set_port',
62             );
63              
64              
65             has host =>
66             (
67             is => 'ro',
68             isa => Str,
69             default => '0.0.0.0',
70             writer => '_set_host',
71             );
72              
73            
74             has listener =>
75             (
76             is => 'ro',
77             isa => FileHandle,
78             lazy => 1,
79             clearer => 'clear_listener',
80             predicate => 'has_listener',
81             builder => '_build_listener',
82             );
83              
84              
85             has listener_active =>
86             (
87             is => 'rw',
88             isa => Bool,
89             default => 1,
90             );
91              
92              
93             has sockets =>
94             (
95             is => 'ro',
96             isa => HashRef,
97             traits => ['Hash'],
98             default => sub { {} },
99             clearer => '_clear_sockets',
100             handles =>
101             {
102             '_set_socket' => 'set',
103             '_delete_socket' => 'delete',
104             '_count_sockets' => 'count',
105             '_all_sockets' => 'values',
106             }
107             );
108              
109             # unfortunate Moose bug that attribute delegates are not instantiated
110             # in a role until composition time. The workaround is to define stubs
111             # in the role and when the attribute fully instantiated, the delegates
112             # are installed and take over
113              
114 0     0   0 sub _set_socket {}
115 0     0   0 sub _delete_socket {}
116 0     0   0 sub _count_sockets {}
117 0     0   0 sub _clear_sockets {}
118 0     0   0 sub _all_sockets {}
119 0     0 1 0 sub listener {}
120 0     0 1 0 sub listener_active {}
121              
122              
123             method _build_listener => sub
124             {
125 1     1   2 my $self = shift;
126 1         32 my $listener = IO::Socket::INET->new
127             (
128             Listen => 5,
129             LocalAddr => $self->host,
130             LocalPort => $self->port,
131             Proto => 'tcp',
132             );
133              
134 1 50       276 unless($listener)
135             {
136 0         0 Carp::confess "Unable to bind to ${\$self->host}:${\$self->port}";
  0         0  
  0         0  
137             }
138              
139 1         46 return $listener;
140             };
141              
142              
143             method _build_socket => sub
144             {
145 1     1   3850 my ($self, $handle) = pos_validated_list
146             (
147             \@_,
148             { does => 'Reflexive::Role::TCPServer' },
149             { isa => FileHandle },
150             );
151              
152 1         2171 return $reflex_stream_class->new
153             (
154             handle => $handle,
155             input_filter => $input_filter_class->new(%input_filter_args),
156             output_filter => $output_filter_class->new(%output_filter_args),
157             );
158              
159             };
160              
161              
162             method try_listener_build => sub
163             {
164 1     1   229 my $self = shift;
165              
166             try
167             {
168 1     1   79 $self->listener();
169             }
170             catch
171             {
172 0     0   0 $self->on_listener_error
173             (
174             Reflex::Event::Error->new(
175             _emitters => [$self],
176             string => "$!",
177             number => ($! + 0),
178             function => 'bind',
179             )
180             );
181             }
182 1         16 };
183              
184              
185 1     1   28 method BUILD => sub { };
186              
187             # slight timing bug with regard to Reflex::Role::Readable
188             # we need to make sure the listening socket is created before
189             # it is fed to the underlying POE mechanism hence why before
190             # is used instead of after
191              
192             before BUILD => sub
193             {
194             my $self = shift;
195             # start listening
196             $self->try_listener_build();
197             };
198              
199             after BUILD => sub
200             {
201             my $self = shift;
202             $self->watch
203             (
204             $self,
205             'socket_stop' => cb_method($self, 'on_socket_stop'),
206             'socket_error' => cb_method($self, 'on_socket_error'),
207             'socket_data' => cb_method($self, 'on_socket_data'),
208             );
209             };
210              
211              
212              
213             method on_listener_accept => sub
214             {
215 1     1   8166 my ($self, $args) = pos_validated_list
216             (
217             \@_,
218             { does => 'Reflexive::Role::TCPServer' },
219             { isa => 'Reflex::Event::Socket' },
220             );
221              
222 1         1139 $self->store_socket($self->_build_socket($args->handle()));
223              
224             };
225              
226              
227             method on_listener_error => sub
228             {
229 0     0   0 my ($self, $args) = pos_validated_list
230             (
231             \@_,
232             { does => 'Reflexive::Role::TCPServer' },
233             { isa => 'Reflex::Event::Error' },
234             );
235              
236 0         0 die "Failed to ${\$args->function}. " .
  0         0  
237 0         0 "Error Code: ${\$args->number} " .
238 0         0 "Error Message: ${\$args->string}";
239             };
240              
241              
242             method on_socket_stop => sub
243             {
244 1     1   697 my ($self, $args) = pos_validated_list
245             (
246             \@_,
247             { does => 'Reflexive::Role::TCPServer' },
248             { isa => 'Reflex::Event' },
249             );
250              
251 1         1069 $self->remove_socket($args->get_first_emitter());
252             };
253              
254              
255             method on_socket_error => sub
256             {
257 0     0   0 my ($self, $args) = pos_validated_list
258             (
259             \@_,
260             { does => 'Reflexive::Role::TCPServer' },
261             { isa => 'Reflex::Event::Error' },
262             );
263              
264 0         0 $self->remove_socket($args->get_first_emitter());
265             };
266              
267              
268             method shutdown => sub
269             {
270 1     1   17062 my $self = shift;
271 1         9 $self->stop_listening();
272 1         184 $_->stopped() for $self->_all_sockets();
273             };
274              
275             with 'Reflex::Role::Accepting' =>
276             {
277             att_active => 'listener_active',
278             att_listener => 'listener',
279             method_pause => 'pause_listening',
280             method_resume => 'resume_listening',
281             method_stop => 'stop_listening',
282             };
283              
284             with 'Reflexive::Role::Collective' =>
285             {
286             stored_constraint => role_type('Reflex::Role::Collectible'),
287             watched_events =>
288             [
289             [ stopped => ['emit_socket_stop', 'socket_stop' ] ],
290             [ error => ['emit_socket_error', 'socket_error'] ],
291             [ data => ['emit_socket_data', 'socket_data' ] ],
292             ],
293             method_remember => 'store_socket',
294             method_forget => 'remove_socket',
295             method_clear_objects => '_clear_sockets',
296             method_count_objects => '_count_sockets',
297             method_add_object => '_set_socket',
298             method_del_object => '_delete_socket',
299             };
300              
301             };
302              
303             1;
304              
305              
306             =pod
307              
308             =head1 NAME
309              
310             Reflexive::Role::TCPServer - Provides a consumable Reflex-based multiplexing TCP server behavior
311              
312             =head1 VERSION
313              
314             version 1.140030
315              
316             =head1 SYNOPSIS
317              
318             {
319             package MyTCPServer;
320             use Moose;
321             use MooseX::Types::Moose(':all');
322             use MooseX::Types::Structured(':all');
323             use MooseX::Params::Validate;
324              
325             extends 'Reflex::Base';
326              
327             sub on_socket_data
328             {
329             my ($self, $args) = pos_validated_list
330             (
331             \@_,
332             { isa => 'MyTCPServer' },
333             { isa => 'Reflexive::Event::Data' },
334             );
335             my $data = $args->data;
336             my $socket = $args->get_first_emitter();
337             warn "Received data ($data) from socket ($socket)";
338             chomp($data);
339             # look at Reflex::Role::Streaming for what methods are available
340             $socket->put(reverse($data)."\n");
341             }
342              
343             with 'Reflexive::Role::TCPServer';
344             }
345              
346             my $server = MyTCPServer->new();
347             $server->run_all();
348              
349             =head1 DESCRIPTION
350              
351             Reflexive::Role::TCPServer provides a multiplexing TCP server behavior for
352             consuming classes. It does this by being an amalgamation of other Reflex and
353             Reflexive roles such as L<Reflex::Role::Accepting> and
354             L<Reflexive::Role::Collective>. The only required method to be implemented by
355             the consumer is L</on_socket_data> which is called when sockets receive data.
356              
357             See the eg directory in the shipped distribution for an example that is more
358             detailed than the synopsis.
359              
360             =head1 ROLE_PARAMETERS
361              
362             =head2 reflex_stream_class
363              
364             This is the name of the class to use when constructing a stream. It should
365             conform to (or better, subclass) L<Reflexive::Stream::Filtering>. By default,
366             L<Reflexive::Stream::Filtering> objects are instantiated
367              
368             =head2 input_filter_class
369              
370             This is the name of the class to use when constructing an input filter for each
371             socket that is accepted. It defaults to L<POE::Filter::Stream>.
372              
373             Please see L<Reflexive::Stream::Filtering> for more information on how
374             filtering occurs on data.
375              
376             =head2 input_filter_args
377              
378             If the input filter class takes any arguments during construction, put them
379             here as a HashRef
380              
381             =head2 output_filter_class
382              
383             This is the name of the class to use when constructing an output filter for each
384             socket that is accepted. It defaults to L<POE::Filter::Stream>.
385              
386             Please see L<Reflexive::Stream::Filtering> for more information on how
387             filtering occurs on data.
388              
389             =head2 output_filter_args
390              
391             If the output filter class takes any arguments during construction, put them
392             here as a HashRef
393              
394             =head1 ROLE_REQUIRES
395              
396             =head2 on_socket_data
397              
398             (Reflexive::Event::Data)
399              
400             This role requires the method on_socket_data to be implemented in the consuming
401             class.
402              
403             The only argument to this method will be a L<Reflexive::Event::Data> object.
404             The socket that generated the event will be available via
405             L<Reflex::Event/get_first_emitter>. The filtered data will be available via
406             L<Reflexive::Event::Data/data>
407              
408             =head1 PUBLIC_ATTRIBUTES
409              
410             =head2 port
411              
412             is: ro, isa: Int, default: 5000, writer: _set_port
413              
414             port holds the particular TCP port number to use when listening for
415             connections. It defaults to 5000 for no real particular reason, other than
416             to make it easier to use this role in the PSGI space.
417              
418             =head2 host
419              
420             is: ro, isa: Str, default: '0.0.0.0', writer: _set_host
421              
422             host holds the address to use when setting up the listening socket. It defaults
423             to 0.0.0.0 (which means all available interfaces/addresses).
424              
425             =head1 PROTECTED_ATTRIBUTES
426              
427             =head2 listener
428              
429             is: ro, isa: FileHandle, lazy: 1
430             clearer: clear_listener
431             predicate: has_listener
432             builder: _build_listener
433              
434             listener holds the listening socket from which to accept connections. Ideally,
435             this attribute shouldn't be touched in consuming classes
436              
437             =head2 listener_active
438              
439             is: ro, isa: Bool, default: 1
440              
441             listener_active determines the default state of the listener socket upon
442             creation of the object. It defaults to true which means that when the object is
443             built and handed off to POE, it will immediately select() over it. If this
444             behavior is not desired, simply set this to false at construction.
445              
446             =head2 sockets
447              
448             is: ro, isa: HashRef, traits: Hash
449             clearer: _clear_sockets
450             handles:
451             '_set_socket' => 'set',
452             '_delete_socket' => 'delete',
453             '_count_sockets' => 'count',
454             '_all_sockets' => 'values',
455              
456             sockets stores the complete, accepted connections from clients.
457              
458             sockets is really only for low-level access and the facilities from the
459             consumed L<Reflexive::Role::Collective> should be used to store/remove clients.
460              
461             =head1 PUBLIC_METHODS
462              
463             =head2 try_listener_build
464              
465             try_listener_build is the method called when the object is first instantiated
466             to attempt to bind a listening socket. It wraps construction of the
467             L</listener> attribute inside a try/catch block. If it fails the
468             L</on_listener_error> callback is fired to allow for retrying the binding.
469              
470             =head2 shutdown
471              
472             shutdown will stop the listening socket forcibly stop all active sockets.
473              
474             This will allow the event loop to terminate.
475              
476             =head1 PROTECTED_METHODS
477              
478             =head2 _build_listener
479              
480             _build_listener takes the L</host> and L</port> attributes and builds a
481             listening socket using L<IO::Socket::INET>. If it is unable to bind to the
482             host/port combination, it will confess.
483              
484             =head2 _build_socket
485              
486             (FileHandle)
487              
488             _build_socket is called when the listener_accept event fires. The raw socket,
489             and the filters constructed from the L</input_filter_class> and
490             L</output_filter_class> parameters are passed to the constructor for
491             L<Reflexive::Stream::Filtering> and returned.
492              
493             =head2 BUILD
494              
495             BUILD is advised in a couple of different ways to ensure proper operation:
496              
497             1) before BUILD is used to attempt to build the listener socket prior to
498             L<Reflex::Role::Readable> attempts to use the socket. This allows for the
499             capture of exceptions on binding if they occur.
500              
501             2) after BUILD is used to watch the events that this role emits.
502              
503             =head2 on_listener_accept
504              
505             (Reflex::Event::Socket)
506              
507             on_listener_accept is the callback method called when a socket connection has
508             been accepted. It calls L</_build_socket> and stores the result using
509             L<Reflexive::Role::Collective/remember> which is named "store_socket" in this
510             role.
511              
512             =head2 on_listener_error
513              
514             (Reflex::Event::Error)
515              
516             on_listener_error is the callback called when there is an error on the
517             listening socket.
518              
519             =head2 on_socket_stop
520              
521             (Reflex::Event)
522              
523             on_socket_stop is the callback method fired when sockets close. It calls
524             L<Reflexive::Role::Collective/forget>, which is named "remove_socket" in this
525             role, to no longer store the socket. The socket that sent the event will be
526             the first emitter.
527              
528             =head2 on_socket_error
529              
530             (Reflex::Event::Error)
531              
532             on_socket_error is the callback fired when a socket encounters an error. The
533             socket that sent the event will be the first emitter. This method merely
534             unstores the socket.
535              
536             =head1 AUTHOR
537              
538             Nicholas R. Perez <nperez@cpan.org>
539              
540             =head1 COPYRIGHT AND LICENSE
541              
542             This software is copyright (c) 2013 by Nicholas R. Perez <nperez@cpan.org>.
543              
544             This is free software; you can redistribute it and/or modify it under
545             the same terms as the Perl 5 programming language system itself.
546              
547             =cut
548              
549              
550             __END__