File Coverage

blib/lib/Test/POE/Server/TCP.pm
Criterion Covered Total %
statement 199 243 81.8
branch 57 102 55.8
condition 6 16 37.5
subroutine 36 42 85.7
pod 16 16 100.0
total 314 419 74.9


line stmt bran cond sub pod time code
1             package Test::POE::Server::TCP;
2             $Test::POE::Server::TCP::VERSION = '1.18';
3             # ABSTRACT: A POE Component providing TCP server services for test cases
4              
5 12     12   2224487 use strict;
  12         27  
  12         510  
6 12     12   65 use warnings;
  12         22  
  12         478  
7 12     12   557 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         44489  
  12         71  
8 12     12   172467 use Socket;
  12         23  
  12         7966  
9 12     12   255 use Carp qw(carp croak);
  12         33  
  12         40655  
10              
11             sub spawn {
12 12     12 1 17374 my $package = shift;
13 12         79 my %opts = @_;
14 12         150 $opts{lc $_} = delete $opts{$_} for keys %opts;
15 12         45 my $options = delete $opts{options};
16 12         55 my $self = bless \%opts, $package;
17 12         110 $self->{_prefix} = delete $self->{prefix};
18 12 100       87 $self->{_prefix} = 'testd_' unless defined $self->{_prefix};
19 12 100       116 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
20 12 100       388 $self->{session_id} = POE::Session->create(
21             object_states => [
22             $self => { shutdown => '_shutdown',
23             send_event => '__send_event',
24             send_to_client => '_send_to_client',
25             send_to_all_clients => '_send_to_all_clients',
26             disconnect => '_disconnect',
27             terminate => '_terminate',
28             start_listener => '_start_listener',
29             },
30             $self => [ qw(_start register unregister _accept_client _accept_failed _conn_input _conn_error _conn_flushed _conn_alarm _send_to_client __send_event _disconnect _send_to_all_clients) ],
31             ],
32             heap => $self,
33             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
34             )->ID();
35 12         1767 return $self;
36             }
37              
38             sub session_id {
39 15     15 1 911 return $_[0]->{session_id};
40             }
41              
42             sub pause_listening {
43 1 50   1 1 47 return unless $_[0]->{listener};
44 1         8 $_[0]->{listener}->pause_accept();
45             }
46              
47             sub resume_listening {
48 1 50   1 1 1001727 return unless $_[0]->{listener};
49 1         11 $_[0]->{listener}->resume_accept();
50             }
51              
52             sub getsockname {
53 11 50   11 1 63 return unless $_[0]->{listener};
54 11         72 return $_[0]->{listener}->getsockname();
55             }
56              
57             sub port {
58 11     11 1 10993 my $self = shift;
59 11         68 return ( sockaddr_in( $self->getsockname() ) )[0];
60             }
61              
62             sub _conn_exists {
63 86     86   124 my ($self,$wheel_id) = @_;
64 86 50 33     535 return 0 unless $wheel_id and defined $self->{clients}->{ $wheel_id };
65 86         270 return 1;
66             }
67              
68             sub shutdown {
69 10     10 1 16065 my $self = shift;
70 10         121 $poe_kernel->call( $self->{session_id}, 'shutdown' );
71             }
72              
73             sub _start {
74 12     12   5343 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
75 12         82 $self->{session_id} = $_[SESSION]->ID();
76 12 50       107 if ( $self->{alias} ) {
77 0         0 $kernel->alias_set( $self->{alias} );
78             }
79             else {
80 12         85 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
81             }
82 12 100       654 if ( $kernel != $sender ) {
83 11         58 my $sender_id = $sender->ID;
84 11         122 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
85 11         58 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
86 11         39 $self->{sessions}->{$sender_id}->{'refcnt'}++;
87 11         46 $kernel->refcount_increment($sender_id, __PACKAGE__);
88 11         381 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
89 11         1571 $kernel->detach_myself();
90             }
91              
92 12         1529 $kernel->call( $self->{session_id}, 'start_listener' );
93 12         118 return;
94             }
95              
96             sub start_listener {
97 0     0 1 0 my $self = shift;
98 0         0 $poe_kernel->call( $self->{session_id}, 'start_listener', @_ );
99             }
100              
101             sub _start_listener {
102 12     12   642 my ($kernel,$self) = @_[KERNEL,OBJECT];
103 12 50       76 return if $self->{listener};
104              
105 12 100       239 $self->{listener} = POE::Wheel::SocketFactory->new(
    100          
106             ( defined $self->{address} ? ( BindAddress => $self->{address} ) : () ),
107             ( defined $self->{port} ? ( BindPort => $self->{port} ) : ( BindPort => 0 ) ),
108             SuccessEvent => '_accept_client',
109             FailureEvent => '_accept_failed',
110             SocketDomain => AF_INET, # Sets the socket() domain
111             SocketType => SOCK_STREAM, # Sets the socket() type
112             SocketProtocol => 'tcp', # Sets the socket() protocol
113             Reuse => 'on', # Lets the port be reused
114             );
115              
116 12         7135 return;
117             }
118              
119             sub _accept_client {
120 12     12   13070 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
121 12         203 my $sockaddr = inet_ntoa( ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[1] );
122 12         142 my $sockport = ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[0];
123 12         64 $peeraddr = inet_ntoa( $peeraddr );
124              
125 12         111 my $wheel = POE::Wheel::ReadWrite->new(
126             Handle => $socket,
127             _get_filters(
128             $self->{filter},
129             $self->{inputfilter},
130             $self->{outputfilter}
131             ),
132             InputEvent => '_conn_input',
133             ErrorEvent => '_conn_error',
134             FlushedEvent => '_conn_flushed',
135             );
136              
137 12 50       5586 return unless $wheel;
138            
139 12         270 my $id = $wheel->ID();
140 12         193 $self->{clients}->{ $id } =
141             {
142             wheel => $wheel,
143             peeraddr => $peeraddr,
144             peerport => $peerport,
145             sockaddr => $sockaddr,
146             sockport => $sockport,
147             };
148 12         80 $self->_send_event( $self->{_prefix} . 'connected', $id, $peeraddr, $peerport, $sockaddr, $sockport );
149              
150             #$self->{clients}->{ $id }->{alarm} = $kernel->delay_set( '_conn_alarm', $self->{time_out} || 300, $id );
151 12         54 return;
152             }
153              
154             sub client_info {
155 2     2 1 1456 my $self = shift;
156 2   50     7 my $id = shift || return;
157 2 50       7 return unless $self->_conn_exists( $id );
158 2         3 my %hash = %{ $self->{clients}->{ $id } };
  2         12  
159 2         6 delete $hash{wheel};
160 2 100       7 return map { $hash{$_} } qw(peeraddr peerport sockaddr sockport) if wantarray;
  4         12  
161 1         5 return \%hash;
162             }
163              
164             sub client_wheel {
165 1     1 1 6 my $self = shift;
166 1   50     4 my $id = shift || return;
167 1 50       3 return unless $self->_conn_exists( $id );
168 1         13 return $self->{clients}->{ $id }->{wheel};
169             }
170              
171             sub _get_filters {
172 12     12   96 my ($client_filter, $client_infilter, $client_outfilter) = @_;
173 12 100 66     138 if (defined $client_infilter or defined $client_outfilter) {
    50          
174             return (
175 1         8 "InputFilter" => _load_filter($client_infilter),
176             "OutputFilter" => _load_filter($client_outfilter)
177             );
178 0 0       0 if (defined $client_filter) {
179 0         0 carp(
180             "Filter ignored with InputFilter or OutputFilter"
181             );
182             }
183             }
184             elsif (defined $client_filter) {
185 0         0 return ( "Filter" => _load_filter($client_filter) );
186             }
187             else {
188 11         120 return ( Filter => POE::Filter::Line->new(), );
189             }
190              
191             }
192              
193             # Get something: either arrayref, ref, or string
194             # Return filter
195             sub _load_filter {
196 2     2   23 my $filter = shift;
197 2 50       24 if (ref ($filter) eq 'ARRAY') {
    50          
198 0         0 my @args = @$filter;
199 0         0 $filter = shift @args;
200 0 0       0 if ( _test_filter($filter) ){
201 0         0 return $filter->new(@args);
202             } else {
203 0         0 return POE::Filter::Line->new(@args);
204             }
205             }
206             elsif (ref $filter) {
207 2         11 return $filter->clone();
208             }
209             else {
210 0 0       0 if ( _test_filter($filter) ) {
211 0         0 return $filter->new();
212             } else {
213 0         0 return POE::Filter::Line->new();
214             }
215             }
216             }
217              
218             # Test if a Filter can be loaded, return sucess or failure
219             sub _test_filter {
220 0     0   0 my $filter = shift;
221 0         0 my $eval = eval {
222 0         0 (my $mod = $filter) =~ s!::!/!g;
223 0         0 require "$mod.pm";
224 0         0 1;
225             };
226 0 0 0     0 if (!$eval and $@) {
227 0         0 carp(
228             "Failed to load [$filter]\n" .
229             "Reason $@\nUsing defualt POE::Filter::Line "
230             );
231 0         0 return 0;
232             }
233 0         0 return 1;
234             }
235              
236             sub _accept_failed {
237 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
238 0         0 warn "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
239 0 0       0 delete $self->{listener} if $operation eq 'listen';
240 0         0 $self->_send_event( $self->{_prefix} . 'listener_failed', $operation, $errnum, $errstr );
241 0         0 return;
242             }
243              
244             sub disconnect {
245 5     5 1 3115 my $self = shift;
246 5         36 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
247             }
248              
249             sub _disconnect {
250 5     5   311 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
251 5 50       16 return unless $self->_conn_exists( $id );
252 5         17 $self->{clients}->{ $id }->{quit} = 1;
253 5         17 return 1;
254             }
255              
256             sub terminate {
257 4     4 1 1923 my $self = shift;
258 4         30 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
259             }
260              
261             sub _terminate {
262 4     4   262 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
263 4 50       24 return unless $self->_conn_exists( $id );
264 4         43 delete $self->{clients}->{ $id };
265 4         1693 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
266 4         17 return 1;
267             }
268              
269             sub _conn_input {
270 18     18   24119 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
271 18 50       61 return unless $self->_conn_exists( $id );
272             #$kernel->delay_adjust( $self->{clients}->{ $id }->{alarm}, $self->{time_out} || 300 );
273 18         87 $self->_send_event( $self->{_prefix} . 'client_input', $id, $input );
274 18         56 return;
275             }
276              
277             sub _conn_error {
278 2     2   2483 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
279 2 50       8 return unless $self->_conn_exists( $id );
280 2         8 my $href = delete $self->{clients}->{ $id };
281 2         12 delete $href->{wheel};
282 2         425 $self->_send_event( $self->{_prefix} . 'disconnected', $id, map { $href->{$_} } qw(peeraddr peerport sockaddr sockport) );
  8         21  
283 2         9 return;
284             }
285              
286             sub _conn_flushed {
287 29     29   19951 my ($self,$id) = @_[OBJECT,ARG0];
288 29 50       96 return unless $self->_conn_exists( $id );
289 29 100       121 if ( $self->{clients}->{ $id }->{BUFFER} ) {
290 6         9 my $item = shift @{ $self->{clients}->{ $id }->{BUFFER} };
  6         18  
291 6 100       18 unless ( $item ) {
292 2         10 delete $self->{clients}->{ $id }->{BUFFER};
293 2         17 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
294 2         9 return;
295             }
296 4         25 $self->{clients}->{ $id }->{wheel}->put($item);
297 4         291 return;
298             }
299 23 100       88 unless ( $self->{clients}->{ $id }->{quit} ) {
300 18         95 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
301 18         55 return;
302             }
303 5         50 delete $self->{clients}->{ $id };
304 5         1417 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
305 5         17 return;
306             }
307              
308             sub _conn_alarm {
309 0     0   0 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
310 0 0       0 return unless $self->_conn_exists( $id );
311 0         0 delete $self->{clients}->{ $id };
312 0         0 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
313 0         0 return;
314             }
315              
316             sub _shutdown {
317 14     14   2952 my ($kernel,$self) = @_[KERNEL,OBJECT];
318 14         123 delete $self->{listener};
319 14         3011 delete $self->{clients};
320 14         358 $kernel->alarm_remove_all();
321 14         752 $kernel->alias_remove( $_ ) for $kernel->alias_list();
322 14 50       706 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
323             # $self->_pluggable_destroy();
324 14         707 $self->_unregister_sessions();
325 14         384 return;
326             }
327              
328             sub register {
329 1     1 1 286 my ($kernel, $self, $session, $sender, @events) =
330             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
331              
332 1 50       3 unless (@events) {
333 0         0 warn "register: Not enough arguments";
334 0         0 return;
335             }
336              
337 1         3 my $sender_id = $sender->ID();
338              
339 1         4 foreach (@events) {
340 1 50       5 $_ = $self->{_prefix} . $_ unless /^_/;
341 1         4 $self->{events}->{$_}->{$sender_id} = $sender_id;
342 1         3 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
343 1 50 33     9 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
344 1         3 $kernel->refcount_increment($sender_id, __PACKAGE__);
345             }
346             }
347              
348 1         29 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
349 1         54 return;
350             }
351              
352             sub unregister {
353 2     2 1 3649 my ($kernel, $self, $session, $sender, @events) =
354             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
355              
356 2 50       11 unless (@events) {
357 0         0 warn "unregister: Not enough arguments";
358 0         0 return;
359             }
360              
361 2         13 $self->_unregister($session,$sender,@events);
362 2         10 undef;
363             }
364              
365             sub _unregister {
366 2     2   7 my ($self,$session,$sender) = splice @_,0,3;
367 2         9 my $sender_id = $sender->ID();
368              
369 2         14 foreach (@_) {
370 2 50       15 $_ = $self->{_prefix} . $_ unless /^_/;
371 2         39 my $blah = delete $self->{events}->{$_}->{$sender_id};
372 2 50       11 unless ( $blah ) {
373 0         0 warn "$sender_id hasn't registered for '$_' events\n";
374 0         0 next;
375             }
376 2 50       11 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
377 2         8 delete $self->{sessions}->{$sender_id};
378 2 50       8 unless ($session == $sender) {
379 2         14 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
380             }
381             }
382             }
383 2         149 undef;
384             }
385              
386             sub _unregister_sessions {
387 14     14   63 my $self = shift;
388 14         70 my $testd_id = $self->session_id();
389 14         29 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  14         90  
390 10 50       95 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
391 10         39 delete $self->{sessions}->{$session_id};
392 10 50       69 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
393             unless ( $session_id eq $testd_id );
394             }
395             }
396             }
397              
398             sub __send_event {
399 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
400 0         0 $self->_send_event( $event, @args );
401 0         0 return;
402             }
403              
404             sub _send_event {
405 61     61   104 my $self = shift;
406 61         147 my ($event, @args) = @_;
407 61         79 my $kernel = $POE::Kernel::poe_kernel;
408 61         70 my %sessions;
409              
410 61         61 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  61         277  
  61         365  
411              
412 61         328 $kernel->post( $_ => $event => @args ) for values %sessions;
413 61         5941 undef;
414             }
415              
416             sub send_to_client {
417 25     25 1 13430 my $self = shift;
418 25         114 $poe_kernel->call( $self->{session_id}, '_send_to_client', @_ );
419             }
420              
421             sub _send_to_client {
422 25     25   1569 my ($kernel,$self,$id,$output) = @_[KERNEL,OBJECT,ARG0..ARG1];
423 25 50       86 return unless $self->_conn_exists( $id );
424 25 50       87 return unless defined $output;
425              
426 25 100       76 if ( ref $output eq 'ARRAY' ) {
427 2         4 my $temp = [ @{ $output } ];
  2         19  
428 2         4 my $first = shift @{ $temp };
  2         5  
429 2 50       4 $self->{clients}->{ $id }->{BUFFER} = $temp if scalar @{ $temp };
  2         18  
430 2 50       30 $self->{clients}->{ $id }->{wheel}->put($first) if defined $first;
431 2         270 return 1;
432             }
433              
434 23         126 $self->{clients}->{ $id }->{wheel}->put($output);
435 23         1730 return 1;
436             }
437              
438             sub send_to_all_clients {
439 0     0 1 0 my $self = shift;
440 0         0 $poe_kernel->call( $self->{session_id}, '_send_to_all_clients', @_ );
441             }
442              
443             sub _send_to_all_clients {
444 1     1   747 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
445 1 50       13 return unless defined $output;
446 1         3 $self->send_to_client( $_, $output ) for
  1         10  
447             keys %{ $self->{clients} };
448 1         12 return 1;
449             }
450              
451             q{Putting the test into POE};
452              
453             __END__