File Coverage

blib/lib/Test/POE/Server/TCP.pm
Criterion Covered Total %
statement 206 263 78.3
branch 63 116 54.3
condition 6 16 37.5
subroutine 37 46 80.4
pod 18 18 100.0
total 330 459 71.9


line stmt bran cond sub pod time code
1             package Test::POE::Server::TCP;
2             $Test::POE::Server::TCP::VERSION = '1.20';
3             # ABSTRACT: A POE Component providing TCP server services for test cases
4              
5 12     12   1208418 use strict;
  12         22  
  12         319  
6 12     12   43 use warnings;
  12         12  
  12         389  
7 12     12   500 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         30195  
  12         55  
8 12     12   84099 use Socket;
  12         18  
  12         5604  
9 12     12   55 use Carp qw(carp croak);
  12         14  
  12         1378  
10              
11             our $GOT_SOCKET6;
12              
13             BEGIN {
14 12     12   20 eval {
15 12         461 Socket->import(qw(AF_INET6 IN6ADDR_ANY NI_NUMERICHOST NI_NUMERICSERV getnameinfo));
16 12         21 $GOT_SOCKET6 = 1;
17             };
18 12 50       31914 if (!$GOT_SOCKET6) {
19             # provide a dummy subs so code compiles
20 0         0 *AF_INET6 = sub { ~0 };
  0         0  
21 0         0 *IN6ADDR_ANY = sub { ~0 };
  0         0  
22             }
23             }
24              
25             sub spawn {
26 12     12 1 6717 my $package = shift;
27 12         44 my %opts = @_;
28 12         83 $opts{lc $_} = delete $opts{$_} for keys %opts;
29 12         27 my $options = delete $opts{options};
30 12         24 my $self = bless \%opts, $package;
31 12         56 $self->{_prefix} = delete $self->{prefix};
32 12 100       53 $self->{_prefix} = 'testd_' unless defined $self->{_prefix};
33 12 100       90 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
34 12 100       283 $self->{session_id} = POE::Session->create(
35             object_states => [
36             $self => { shutdown => '_shutdown',
37             send_event => '__send_event',
38             send_to_client => '_send_to_client',
39             send_to_all_clients => '_send_to_all_clients',
40             disconnect => '_disconnect',
41             terminate => '_terminate',
42             start_listener => '_start_listener',
43             },
44             $self => [ qw(_start register unregister _accept_client _conn_input _conn_error _conn_flushed _conn_alarm
45             _send_to_client __send_event _disconnect _send_to_all_clients _accept_failed4 _accept_failed6) ],
46             ],
47             heap => $self,
48             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
49             )->ID();
50 12         1107 return $self;
51             }
52              
53             sub session_id {
54 15     15 1 373 return $_[0]->{session_id};
55             }
56              
57             sub pause_listening {
58 1 50   1 1 35 $_[0]->{listener}->pause_accept() if $_[0]->{listener};
59 1 50       43 $_[0]->{listener6}->pause_accept() if $_[0]->{listener6};
60             }
61              
62             sub resume_listening {
63 1 50   1 1 1001718 $_[0]->{listener}->resume_accept() if $_[0]->{listener};
64 1 50       48 $_[0]->{listener6}->resume_accept() if $_[0]->{listener6};
65             }
66              
67             sub getsockname {
68 11 50   11 1 54 return unless $_[0]->{listener};
69 11         39 return $_[0]->{listener}->getsockname();
70             }
71              
72             sub port {
73 11     11 1 6406 my $self = shift;
74 11         48 return ( sockaddr_in( $self->getsockname() ) )[0];
75             }
76              
77             sub getsockname6 {
78 0 0   0 1 0 return unless $_[0]->{listener6};
79 0         0 return $_[0]->{listener6}->getsockname();
80             }
81              
82             sub port6 {
83 0     0 1 0 my $self = shift;
84 0         0 return ( sockaddr_in6( $self->getsockname6() ) )[0];
85             }
86              
87             sub _conn_exists {
88 86     86   86 my ($self,$wheel_id) = @_;
89 86 50 33     402 return 0 unless $wheel_id and defined $self->{clients}->{ $wheel_id };
90 86         211 return 1;
91             }
92              
93             sub shutdown {
94 10     10 1 10044 my $self = shift;
95 10         61 $poe_kernel->call( $self->{session_id}, 'shutdown' );
96             }
97              
98             sub _start {
99 12     12   3153 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
100 12         38 $self->{session_id} = $_[SESSION]->ID();
101 12 50       59 if ( $self->{alias} ) {
102 0         0 $kernel->alias_set( $self->{alias} );
103             }
104             else {
105 12         68 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
106             }
107 12 100       368 if ( $kernel != $sender ) {
108 11         37 my $sender_id = $sender->ID;
109 11         65 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
110 11         27 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
111 11         21 $self->{sessions}->{$sender_id}->{'refcnt'}++;
112 11         34 $kernel->refcount_increment($sender_id, __PACKAGE__);
113 11         211 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
114 11         1035 $kernel->detach_myself();
115             }
116              
117 12         1014 $kernel->call( $self->{session_id}, 'start_listener' );
118 12         74 return;
119             }
120              
121             sub start_listener {
122 0     0 1 0 my $self = shift;
123 0         0 $poe_kernel->call( $self->{session_id}, 'start_listener', @_ );
124             }
125              
126             sub _start_listener {
127 12     12   417 my ($kernel,$self) = @_[KERNEL,OBJECT];
128 12 50       44 return if $self->{listener};
129              
130             $self->{listener} = POE::Wheel::SocketFactory->new(
131             ( defined $self->{address} ? ( BindAddress => $self->{address} ) : () ),
132 12 100       159 ( defined $self->{port} ? ( BindPort => $self->{port} ) : ( BindPort => 0 ) ),
    100          
133             SuccessEvent => '_accept_client',
134             FailureEvent => '_accept_failed4',
135             SocketDomain => AF_INET, # Sets the socket() domain
136             SocketType => SOCK_STREAM, # Sets the socket() type
137             SocketProtocol => 'tcp', # Sets the socket() protocol
138             Reuse => 'on', # Lets the port be reused
139             );
140              
141 12 50       4603 return unless $GOT_SOCKET6;
142              
143             $self->{listener6} = POE::Wheel::SocketFactory->new(
144             #BindAddress => IN6ADDR_ANY,
145 12 100       95 ( defined $self->{port} ? ( BindPort => $self->{port} ) : ( BindPort => 0 ) ),
146             SuccessEvent => '_accept_client',
147             FailureEvent => '_accept_failed6',
148             SocketDomain => AF_INET6, # Sets the socket() domain
149             SocketType => SOCK_STREAM, # Sets the socket() type
150             SocketProtocol => 'tcp', # Sets the socket() protocol
151             Reuse => 'on', # Lets the port be reused
152             );
153              
154 12         2918 return;
155             }
156              
157             sub _accept_client {
158 12     12   8264 my ($kernel,$self,$socket,$listener_id) = @_[KERNEL,OBJECT,ARG0,ARG3];
159              
160 12         233 my (undef,$peeraddr,$peerport) = getnameinfo( CORE::getpeername( $socket ), NI_NUMERICHOST | NI_NUMERICSERV );
161 12         233 my (undef,$sockaddr,$sockport) = getnameinfo( CORE::getsockname( $socket ), NI_NUMERICHOST | NI_NUMERICSERV );
162              
163 12         59 s!^::ffff:!! for ( $sockaddr, $peeraddr );
164              
165             my $wheel = POE::Wheel::ReadWrite->new(
166             Handle => $socket,
167             _get_filters(
168             $self->{filter},
169             $self->{inputfilter},
170             $self->{outputfilter}
171 12         74 ),
172             InputEvent => '_conn_input',
173             ErrorEvent => '_conn_error',
174             FlushedEvent => '_conn_flushed',
175             );
176              
177 12 50       3234 return unless $wheel;
178              
179 12         67 my $id = $wheel->ID();
180 12         117 $self->{clients}->{ $id } =
181             {
182             wheel => $wheel,
183             peeraddr => $peeraddr,
184             peerport => $peerport,
185             sockaddr => $sockaddr,
186             sockport => $sockport,
187             };
188 12         72 $self->_send_event( $self->{_prefix} . 'connected', $id, $peeraddr, $peerport, $sockaddr, $sockport );
189              
190             #$self->{clients}->{ $id }->{alarm} = $kernel->delay_set( '_conn_alarm', $self->{time_out} || 300, $id );
191 12         31 return;
192             }
193              
194             sub client_info {
195 2     2 1 959 my $self = shift;
196 2   50     5 my $id = shift || return;
197 2 50       13 return unless $self->_conn_exists( $id );
198 2         3 my %hash = %{ $self->{clients}->{ $id } };
  2         7  
199 2         3 delete $hash{wheel};
200 2 100       5 return map { $hash{$_} } qw(peeraddr peerport sockaddr sockport) if wantarray;
  4         8  
201 1         2 return \%hash;
202             }
203              
204             sub client_wheel {
205 1     1 1 4 my $self = shift;
206 1   50     2 my $id = shift || return;
207 1 50       2 return unless $self->_conn_exists( $id );
208 1         5 return $self->{clients}->{ $id }->{wheel};
209             }
210              
211             sub _get_filters {
212 12     12   33 my ($client_filter, $client_infilter, $client_outfilter) = @_;
213 12 100 66     100 if (defined $client_infilter or defined $client_outfilter) {
    50          
214             return (
215 1         2 "InputFilter" => _load_filter($client_infilter),
216             "OutputFilter" => _load_filter($client_outfilter)
217             );
218 0 0       0 if (defined $client_filter) {
219 0         0 carp(
220             "Filter ignored with InputFilter or OutputFilter"
221             );
222             }
223             }
224             elsif (defined $client_filter) {
225 0         0 return ( "Filter" => _load_filter($client_filter) );
226             }
227             else {
228 11         97 return ( Filter => POE::Filter::Line->new(), );
229             }
230              
231             }
232              
233             # Get something: either arrayref, ref, or string
234             # Return filter
235             sub _load_filter {
236 2     2   20 my $filter = shift;
237 2 50       11 if (ref ($filter) eq 'ARRAY') {
    50          
238 0         0 my @args = @$filter;
239 0         0 $filter = shift @args;
240 0 0       0 if ( _test_filter($filter) ){
241 0         0 return $filter->new(@args);
242             } else {
243 0         0 return POE::Filter::Line->new(@args);
244             }
245             }
246             elsif (ref $filter) {
247 2         9 return $filter->clone();
248             }
249             else {
250 0 0       0 if ( _test_filter($filter) ) {
251 0         0 return $filter->new();
252             } else {
253 0         0 return POE::Filter::Line->new();
254             }
255             }
256             }
257              
258             # Test if a Filter can be loaded, return sucess or failure
259             sub _test_filter {
260 0     0   0 my $filter = shift;
261 0         0 my $eval = eval {
262 0         0 (my $mod = $filter) =~ s!::!/!g;
263 0         0 require "$mod.pm";
264 0         0 1;
265             };
266 0 0 0     0 if (!$eval and $@) {
267 0         0 carp(
268             "Failed to load [$filter]\n" .
269             "Reason $@\nUsing defualt POE::Filter::Line "
270             );
271 0         0 return 0;
272             }
273 0         0 return 1;
274             }
275              
276             sub _accept_failed4 {
277 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
278 0         0 warn "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
279 0 0       0 delete $self->{listener} if $operation eq 'listen';
280 0         0 $self->_send_event( $self->{_prefix} . 'listener_failed', $operation, $errnum, $errstr );
281 0         0 return;
282             }
283              
284             sub _accept_failed6 {
285 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
286 0         0 warn "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
287 0 0       0 delete $self->{listener6} if $operation eq 'listen';
288 0         0 $self->_send_event( $self->{_prefix} . 'listener_failed', $operation, $errnum, $errstr );
289 0         0 return;
290             }
291              
292             sub disconnect {
293 5     5 1 1450 my $self = shift;
294 5         21 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
295             }
296              
297             sub _disconnect {
298 5     5   184 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
299 5 50       11 return unless $self->_conn_exists( $id );
300 5         8 $self->{clients}->{ $id }->{quit} = 1;
301 5         10 return 1;
302             }
303              
304             sub terminate {
305 4     4 1 1495 my $self = shift;
306 4         15 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
307             }
308              
309             sub _terminate {
310 4     4   163 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
311 4 50       10 return unless $self->_conn_exists( $id );
312 4         25 delete $self->{clients}->{ $id };
313 4         874 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
314 4         9 return 1;
315             }
316              
317             sub _conn_input {
318 18     18   17179 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
319 18 50       41 return unless $self->_conn_exists( $id );
320             #$kernel->delay_adjust( $self->{clients}->{ $id }->{alarm}, $self->{time_out} || 300 );
321 18         68 $self->_send_event( $self->{_prefix} . 'client_input', $id, $input );
322 18         37 return;
323             }
324              
325             sub _conn_error {
326 2     2   2199 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
327 2 50       8 return unless $self->_conn_exists( $id );
328 2         5 my $href = delete $self->{clients}->{ $id };
329 2         9 delete $href->{wheel};
330 2         343 $self->_send_event( $self->{_prefix} . 'disconnected', $id, map { $href->{$_} } qw(peeraddr peerport sockaddr sockport) );
  8         20  
331 2         8 return;
332             }
333              
334             sub _conn_flushed {
335 29     29   9913 my ($self,$id) = @_[OBJECT,ARG0];
336 29 50       59 return unless $self->_conn_exists( $id );
337 29 100       80 if ( $self->{clients}->{ $id }->{BUFFER} ) {
338 6         8 my $item = shift @{ $self->{clients}->{ $id }->{BUFFER} };
  6         18  
339 6 100       15 unless ( $item ) {
340 2         4 delete $self->{clients}->{ $id }->{BUFFER};
341 2         9 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
342 2         6 return;
343             }
344 4         24 $self->{clients}->{ $id }->{wheel}->put($item);
345 4         189 return;
346             }
347 23 100       51 unless ( $self->{clients}->{ $id }->{quit} ) {
348 18         67 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
349 18         43 return;
350             }
351 5         28 delete $self->{clients}->{ $id };
352 5         766 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
353 5         10 return;
354             }
355              
356             sub _conn_alarm {
357 0     0   0 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
358 0 0       0 return unless $self->_conn_exists( $id );
359 0         0 delete $self->{clients}->{ $id };
360 0         0 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
361 0         0 return;
362             }
363              
364             sub _shutdown {
365 14     14   2165 my ($kernel,$self) = @_[KERNEL,OBJECT];
366 14         68 delete $self->{listener};
367 14         1724 delete $self->{listener6};
368 14         1343 delete $self->{clients};
369 14         214 $kernel->alarm_remove_all();
370 14         477 $kernel->alias_remove( $_ ) for $kernel->alias_list();
371 14 50       377 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
372             # $self->_pluggable_destroy();
373 14         470 $self->_unregister_sessions();
374 14         264 return;
375             }
376              
377             sub register {
378 1     1 1 212 my ($kernel, $self, $session, $sender, @events) =
379             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
380              
381 1 50       3 unless (@events) {
382 0         0 warn "register: Not enough arguments";
383 0         0 return;
384             }
385              
386 1         2 my $sender_id = $sender->ID();
387              
388 1         4 foreach (@events) {
389 1 50       4 $_ = $self->{_prefix} . $_ unless /^_/;
390 1         7 $self->{events}->{$_}->{$sender_id} = $sender_id;
391 1         3 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
392 1 50 33     7 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
393 1         3 $kernel->refcount_increment($sender_id, __PACKAGE__);
394             }
395             }
396              
397 1         20 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
398 1         51 return;
399             }
400              
401             sub unregister {
402 2     2 1 1720 my ($kernel, $self, $session, $sender, @events) =
403             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
404              
405 2 50       8 unless (@events) {
406 0         0 warn "unregister: Not enough arguments";
407 0         0 return;
408             }
409              
410 2         11 $self->_unregister($session,$sender,@events);
411 2         4 undef;
412             }
413              
414             sub _unregister {
415 2     2   6 my ($self,$session,$sender) = splice @_,0,3;
416 2         4 my $sender_id = $sender->ID();
417              
418 2         9 foreach (@_) {
419 2 50       21 $_ = $self->{_prefix} . $_ unless /^_/;
420 2         4 my $blah = delete $self->{events}->{$_}->{$sender_id};
421 2 50       5 unless ( $blah ) {
422 0         0 warn "$sender_id hasn't registered for '$_' events\n";
423 0         0 next;
424             }
425 2 50       7 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
426 2         4 delete $self->{sessions}->{$sender_id};
427 2 50       6 unless ($session == $sender) {
428 2         8 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
429             }
430             }
431             }
432 2         69 undef;
433             }
434              
435             sub _unregister_sessions {
436 14     14   24 my $self = shift;
437 14         48 my $testd_id = $self->session_id();
438 14         22 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  14         48  
439 10 50       38 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
440 10         25 delete $self->{sessions}->{$session_id};
441 10 50       48 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
442             unless ( $session_id eq $testd_id );
443             }
444             }
445             }
446              
447             sub __send_event {
448 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
449 0         0 $self->_send_event( $event, @args );
450 0         0 return;
451             }
452              
453             sub _send_event {
454 61     61   69 my $self = shift;
455 61         99 my ($event, @args) = @_;
456 61         63 my $kernel = $POE::Kernel::poe_kernel;
457 61         52 my %sessions;
458              
459 61         53 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  61         182  
  61         255  
460              
461 61         195 $kernel->post( $_ => $event => @args ) for values %sessions;
462 61         3866 undef;
463             }
464              
465             sub send_to_client {
466 25     25 1 7961 my $self = shift;
467 25         78 $poe_kernel->call( $self->{session_id}, '_send_to_client', @_ );
468             }
469              
470             sub _send_to_client {
471 25     25   939 my ($kernel,$self,$id,$output) = @_[KERNEL,OBJECT,ARG0..ARG1];
472 25 50       58 return unless $self->_conn_exists( $id );
473 25 50       54 return unless defined $output;
474              
475 25 100       58 if ( ref $output eq 'ARRAY' ) {
476 2         3 my $temp = [ @{ $output } ];
  2         4  
477 2         2 my $first = shift @{ $temp };
  2         5  
478 2 50       2 $self->{clients}->{ $id }->{BUFFER} = $temp if scalar @{ $temp };
  2         7  
479 2 50       17 $self->{clients}->{ $id }->{wheel}->put($first) if defined $first;
480 2         141 return 1;
481             }
482              
483 23         80 $self->{clients}->{ $id }->{wheel}->put($output);
484 23         1115 return 1;
485             }
486              
487             sub send_to_all_clients {
488 0     0 1 0 my $self = shift;
489 0         0 $poe_kernel->call( $self->{session_id}, '_send_to_all_clients', @_ );
490             }
491              
492             sub _send_to_all_clients {
493 1     1   597 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
494 1 50       4 return unless defined $output;
495 1         1 $self->send_to_client( $_, $output ) for
496 1         18 keys %{ $self->{clients} };
497 1         6 return 1;
498             }
499              
500             q{Putting the test into POE};
501              
502             __END__