File Coverage

blib/lib/Test/POE/Client/TCP.pm
Criterion Covered Total %
statement 203 278 73.0
branch 65 128 50.7
condition 16 42 38.1
subroutine 30 38 78.9
pod 13 13 100.0
total 327 499 65.5


line stmt bran cond sub pod time code
1             package Test::POE::Client::TCP;
2             $Test::POE::Client::TCP::VERSION = '1.26';
3             #ABSTRACT: A POE Component providing TCP client services for test cases
4              
5 14     14   5180380 use strict;
  14         29  
  14         548  
6 14     14   68 use warnings;
  14         24  
  14         985  
7 14     14   1675 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  14         130382  
  14         137  
8 14     14   288903 use POSIX qw[ETIMEDOUT];
  14         37  
  14         122  
9 14         1249 use Socket qw(AF_INET AF_INET6 SOCK_STREAM inet_ntoa inet_ntop inet_pton
10 14     14   1413 unpack_sockaddr_in unpack_sockaddr_in6);
  14         30  
11 14     14   109 use Carp qw(carp croak);
  14         23  
  14         1470  
12              
13             our $GOT_SSL;
14              
15             BEGIN {
16 14     14   49 eval {
17 14         8107 require POE::Component::SSLify;
18 14         351493 import POE::Component::SSLify qw( Client_SSLify SSLify_ContextCreate );
19 14         61083 $GOT_SSL = 1;
20             };
21             }
22              
23             sub spawn {
24 13     13 1 970170 my $package = shift;
25 13         44 my %opts = @_;
26 13         66 $opts{lc $_} = delete $opts{$_} for keys %opts;
27 13         62 my $options = delete $opts{options};
28 13         33 my $autoconnect = delete $opts{autoconnect};
29 13 50 33     107 if ( $autoconnect and !( $opts{address} and $opts{port} ) ) {
      66        
30 0         0 carp "You must provide both 'address' and 'port' parameters when specifying 'autoconnect'\n";
31 0         0 return;
32             }
33 13         31 my $usessl = delete $opts{usessl};
34 13 50 33     106 if ( $usessl && !$GOT_SSL ) {
35 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
36 0         0 return;
37             }
38 13 50 33     73 delete $opts{timeout} unless $opts{timeout} and $opts{timeout} =~ m!^\d+$!;
39 13         100 my $self = bless \%opts, $package;
40 13         117 $self->{usessl} = $usessl;
41 13         97 $self->{_prefix} = delete $self->{prefix};
42 13 100       60 $self->{_prefix} = 'testc_' unless defined $self->{_prefix};
43 13 100       110 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
44 13 50       407 $self->{session_id} = POE::Session->create(
45             object_states => [
46             $self => { shutdown => '_shutdown',
47             send_event => '__send_event',
48             send_to_server => '_send_to_server',
49             disconnect => '_disconnect',
50             terminate => '_terminate',
51             connect => '_connect',
52             _timeout => '_socket_fail',
53             },
54             $self => [ qw(_start register unregister _socket_up _socket_fail _conn_input _conn_error _conn_flushed _send_to_server __send_event _disconnect) ],
55             ],
56             heap => $self,
57             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
58             args => [ $autoconnect ],
59             )->ID();
60 13         2313 return $self;
61             }
62              
63             sub session_id {
64 14     14 1 790 shift->{session_id};
65             }
66              
67             sub context {
68 0     0 1 0 my $self = shift;
69 0 0       0 return $self->{context} if defined $self->{context};
70 0         0 return;
71             }
72              
73             sub shutdown {
74 13     13 1 23894 $poe_kernel->call( shift->{session_id}, 'shutdown' );
75             }
76              
77             sub server_info {
78 2     2 1 631 my $self = shift;
79 2 50       8 return unless $self->{_server_info};
80 2         3 my @vals = @{ $self->{_server_info} };
  2         7  
81 2 100       10 return @vals if wantarray;
82 1         4 return { map { $_ => shift @vals } qw(peeraddr peerport sockaddr sockport) };
  4         15  
83             }
84              
85             sub connect {
86 0     0 1 0 $poe_kernel->call( shift->{session_id}, 'connect' );
87             }
88              
89             sub wheel {
90 2     2 1 2290 my $self = shift;
91 2 50       12 return unless $self->{socket};
92 2         13 return $self->{socket};
93             }
94              
95             sub alias {
96 0     0 1 0 shift->{alias};
97             }
98              
99             sub _start {
100 13     13   7723 my ($kernel,$self,$sender,$autoconnect) = @_[KERNEL,OBJECT,SENDER,ARG0];
101 13         83 $self->{session_id} = $_[SESSION]->ID();
102              
103 13 50       142 if ( $self->{alias} ) {
104 0         0 $kernel->alias_set( $self->{alias} );
105             }
106             else {
107 13         113 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
108             }
109              
110 13 100       674 if ( $kernel != $sender ) {
111 12         49 my $sender_id = $sender->ID;
112 12         128 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
113 12         61 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
114 12         44 $self->{sessions}->{$sender_id}->{'refcnt'}++;
115 12         77 $kernel->refcount_increment($sender_id, __PACKAGE__);
116 12         449 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
117 12         2042 $kernel->detach_myself();
118             }
119              
120 13 50 66     2200 $kernel->yield( 'connect' ) if $autoconnect and $self->{address} and $self->{port};
      33        
121 13         151 return;
122             }
123              
124             sub _connect {
125 11     11   25897 my ($kernel,$self) = @_[KERNEL,OBJECT];
126 11         28 my $args;
127 11 100       54 if ( ref( $_[ARG0] ) eq 'HASH' ) {
128 10         52 $args = { %{ $_[ARG0] } };
  10         53  
129             }
130             else {
131 1         4 $args = { @_[ARG0..$#_] };
132             }
133 11         26 $args->{lc $_} = delete $args->{$_} for keys %{ $args };
  11         85  
134 11 100 66     100 unless ( $self->{address} and $self->{port} ) {
135 10 50 33     83 unless ( $args->{address} and $args->{port} ) {
136 0         0 carp "You must provide both 'address' and 'port' parameters\n";
137 0         0 return;
138             }
139 10         53 $self->{address} = $args->{address};
140 10         27 $self->{port} = $args->{port};
141             }
142 11         28 my $usessl = delete $args->{usessl};
143 11 50 33     63 if ( $usessl && !$GOT_SSL ) {
144 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
145 0         0 return;
146             }
147              
148 11 50       51 $self->{usessl} = $usessl if defined $usessl;
149              
150 11 50       64 $self->{localaddr} = $args->{localaddr} if $args->{localaddr};
151 11 50       96 $self->{localport} = $args->{localaddr} if $args->{localport};
152              
153 11 50       41 if ( $self->{socket} ) {
154 0         0 carp "Already connected. Disconnect and call 'connect' again\n";
155 0         0 return;
156             }
157              
158 11 50       40 if ( $self->{factory} ) {
159 0         0 carp "Connection already in progress\n";
160 0         0 return;
161             }
162              
163             $self->{domain} =
164 11 100       142 defined( inet_pton( AF_INET6, $self->{address} ) ) ? AF_INET6 : AF_INET;
165             $self->{factory} = POE::Wheel::SocketFactory->new(
166             RemoteAddress => $self->{address},
167             RemotePort => $self->{port},
168             ( defined $self->{address} ? ( BindAddress => $self->{localaddr} ) : () ),
169             ( defined $self->{port} ? ( BindPort => $self->{localport} ) : () ),
170             SuccessEvent => '_socket_up',
171             FailureEvent => '_socket_fail',
172             SocketDomain => $self->{domain}, # Sets the socket() domain
173 11 50       245 SocketType => SOCK_STREAM, # Sets the socket() type
    50          
174             SocketProtocol => 'tcp', # Sets the socket() protocol
175             Reuse => 'yes', # Lets the port be reused
176             );
177              
178 11 50       8903 $kernel->delay( '_timeout', $self->{timeout}, 'connect', ETIMEDOUT, POSIX::strerror( ETIMEDOUT ) ) if $self->{timeout};
179 11         81 return;
180             }
181              
182             sub _socket_up {
183 11     11   15546 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
184 11 100       200 my @sockaddr = ( $self->{domain} == AF_INET6 )
185             ? unpack_sockaddr_in6 ( CORE::getsockname $socket )
186             : unpack_sockaddr_in ( CORE::getsockname $socket );
187 11         81 my $sockaddr = inet_ntop( $self->{domain}, $sockaddr[1] );
188 11         29 my $sockport = $sockaddr[0];
189 11         61 $peeraddr = inet_ntop( $self->{domain}, $peeraddr );
190 11         67 $kernel->delay( '_timeout' );
191              
192 11         887 delete $self->{factory};
193              
194 11 0 33     332 if ( $self->{usessl} && $GOT_SSL ) {
195 0         0 eval {
196 0         0 my $ctx;
197 0 0 0     0 if ($self->{sslctx}) {
    0          
198 0         0 $ctx = $self->{sslctx};
199             }
200             elsif ($self->{sslkey} && $self->{sslcert}) {
201 0         0 $ctx = SSLify_ContextCreate($self->{sslkey}, $self->{sslcert});
202             }
203             else {
204 0         0 $ctx = undef;
205             }
206 0         0 $socket = Client_SSLify( $socket, undef, undef, $ctx );
207 0 0       0 if ( $@ ) {
208 0         0 warn "Couldn't use an SSL socket: $@\n";
209 0         0 $self->{usessl} = 0;
210             }
211             };
212             }
213              
214             $self->{socket} = POE::Wheel::ReadWrite->new(
215             Handle => $socket,
216             _get_filters(
217             $self->{filter},
218             $self->{inputfilter},
219             $self->{outputfilter}
220 11         184 ),
221             InputEvent => '_conn_input',
222             ErrorEvent => '_conn_error',
223             FlushedEvent => '_conn_flushed',
224             );
225              
226 11         5113 $self->{_server_info} = [ $peeraddr, $peerport, $sockaddr, $sockport ];
227              
228 11         89 $self->_send_event( $self->{_prefix} . 'connected', $peeraddr, $peerport, $sockaddr, $sockport );
229 11         51 return;
230             }
231              
232             sub _get_filters {
233 11     11   74 my ($client_filter, $client_infilter, $client_outfilter) = @_;
234 11 100 66     190 if (defined $client_infilter or defined $client_outfilter) {
    50          
235             return (
236 1         4 "InputFilter" => _load_filter($client_infilter),
237             "OutputFilter" => _load_filter($client_outfilter)
238             );
239 0 0       0 if (defined $client_filter) {
240 0         0 carp(
241             "Filter ignored with InputFilter or OutputFilter"
242             );
243             }
244             }
245             elsif (defined $client_filter) {
246 0         0 return ( "Filter" => _load_filter($client_filter) );
247             }
248             else {
249 10         98 return ( Filter => POE::Filter::Line->new(), );
250             }
251              
252             }
253              
254             # Get something: either arrayref, ref, or string
255             # Return filter
256             sub _load_filter {
257 2     2   21 my $filter = shift;
258 2 50       7 if (ref ($filter) eq 'ARRAY') {
    50          
259 0         0 my @args = @$filter;
260 0         0 $filter = shift @args;
261 0 0       0 if ( _test_filter($filter) ){
262 0         0 return $filter->new(@args);
263             } else {
264 0         0 return POE::Filter::Line->new(@args);
265             }
266             }
267             elsif (ref $filter) {
268 2         10 return $filter->clone();
269             }
270             else {
271 0 0       0 if ( _test_filter($filter) ) {
272 0         0 return $filter->new();
273             } else {
274 0         0 return POE::Filter::Line->new();
275             }
276             }
277             }
278              
279             # Test if a Filter can be loaded, return sucess or failure
280             sub _test_filter {
281 0     0   0 my $filter = shift;
282 0         0 my $eval = eval {
283 0         0 (my $mod = $filter) =~ s!::!/!g;
284 0         0 require "$mod.pm";
285 0         0 1;
286             };
287 0 0 0     0 if (!$eval and $@) {
288 0         0 carp(
289             "Failed to load [$filter]\n" .
290             "Reason $@\nUsing defualt POE::Filter::Line "
291             );
292 0         0 return 0;
293             }
294 0         0 return 1;
295             }
296              
297             sub _socket_fail {
298 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
299 0 0       0 carp "Wheel $wheel_id generated $operation error $errnum: $errstr\n" if $self->{debug};
300 0         0 $kernel->delay( '_timeout' );
301 0         0 delete $self->{factory};
302 0         0 $self->_send_event( $self->{_prefix} . 'socket_failed', $operation, $errnum, $errstr );
303 0         0 return;
304             }
305              
306             sub disconnect {
307 0     0 1 0 my $self = shift;
308 0         0 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
309             }
310              
311             sub _disconnect {
312 0     0   0 my ($kernel,$self) = @_[KERNEL,OBJECT];
313 0 0       0 return unless $self->{socket};
314 0         0 $self->{_quit} = 1;
315 0         0 return 1;
316             }
317              
318             sub terminate {
319 8     8 1 9402 my $self = shift;
320 8         59 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
321             }
322              
323             sub _terminate {
324 10     10   4178 my ($kernel,$self) = @_[KERNEL,OBJECT];
325 10 50       176 return unless $self->{socket};
326 10 50       113 if ( $^O =~ /(cygwin|MSWin)/ ) {
327 0         0 $self->{socket}->shutdown_input();
328 0         0 $self->{socket}->shutdown_output();
329             }
330 10         73 delete $self->{socket};
331 10         5165 delete $self->{_server_info};
332 10         98 $self->_send_event( $self->{_prefix} . 'disconnected' );
333 10         43 return 1;
334             }
335              
336             sub _conn_input {
337 19     19   37514 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
338 19         105 $self->_send_event( $self->{_prefix} . 'input', $input );
339 19         96 return;
340             }
341              
342             sub _conn_error {
343 1     1   1996 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
344 1 50       3 return unless $self->{socket};
345 1         4 delete $self->{socket};
346 1         222 delete $self->{_server_info};
347 1         3 $self->_send_event( $self->{_prefix} . 'disconnected' );
348 1         3 return;
349             }
350              
351             sub _conn_flushed {
352 19     19   12665 my ($self,$id) = @_[OBJECT,ARG0];
353 19 50       108 return unless $self->{socket};
354 19 100       67 if ( $self->{BUFFER} ) {
355 6         10 my $item = shift @{ $self->{BUFFER} };
  6         44  
356 6 100       17 unless ( $item ) {
357 2         8 delete $self->{BUFFER};
358 2         32 $self->_send_event( $self->{_prefix} . 'flushed' );
359 2         40 return;
360             }
361 4         20 $self->{socket}->put($item);
362 4         397 return;
363             }
364 13 50       46 unless ( $self->{_quit} ) {
365 13         103 $self->_send_event( $self->{_prefix} . 'flushed' );
366 13         47 return;
367             }
368 0         0 delete $self->{socket};
369 0         0 delete $self->{_server_info};
370 0         0 $self->_send_event( $self->{_prefix} . 'disconnected' );
371 0         0 return;
372             }
373              
374             sub _shutdown {
375 13     13   1166 my ($kernel,$self) = @_[KERNEL,OBJECT];
376 13         38 delete $self->{factory};
377 13         28 delete $self->{socket};
378 13         42 delete $self->{_server_info};
379 13         71 $kernel->alarm_remove_all();
380 13         1017 $kernel->alias_remove( $_ ) for $kernel->alias_list();
381 13 50       633 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
382 13         869 $self->_unregister_sessions();
383 13         626 return;
384             }
385              
386             sub register {
387 1     1 1 282 my ($kernel, $self, $session, $sender, @events) =
388             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
389              
390 1 50       3 unless (@events) {
391 0         0 warn "register: Not enough arguments";
392 0         0 return;
393             }
394              
395 1         3 my $sender_id = $sender->ID();
396              
397 1         4 foreach (@events) {
398 1 50       4 $_ = $self->{_prefix} . $_ unless /^_/;
399 1         3 $self->{events}->{$_}->{$sender_id} = $sender_id;
400 1         3 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
401 1 50 33     18 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
402 1         3 $kernel->refcount_increment($sender_id, __PACKAGE__);
403             }
404             }
405              
406 1         27 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
407 1         63 return;
408             }
409              
410             sub unregister {
411 2     2 1 4008 my ($kernel, $self, $session, $sender, @events) =
412             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
413              
414 2 50       11 unless (@events) {
415 0         0 warn "unregister: Not enough arguments";
416 0         0 return;
417             }
418              
419 2         10 $self->_unregister($session,$sender,@events);
420 2         8 undef;
421             }
422              
423             sub _unregister {
424 2     2   6 my ($self,$session,$sender) = splice @_,0,3;
425 2         8 my $sender_id = $sender->ID();
426              
427 2         12 foreach (@_) {
428 2 50       12 $_ = $self->{_prefix} . $_ unless /^_/;
429 2         6 my $blah = delete $self->{events}->{$_}->{$sender_id};
430 2 50       6 unless ( $blah ) {
431 0         0 warn "$sender_id hasn't registered for '$_' events\n";
432 0         0 next;
433             }
434 2 50       8 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
435 2         8 delete $self->{sessions}->{$sender_id};
436 2 50       7 unless ($session == $sender) {
437 2         23 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
438             }
439             }
440             }
441 2         137 undef;
442             }
443              
444             sub _unregister_sessions {
445 13     13   46 my $self = shift;
446 13         53 my $testd_id = $self->session_id();
447 13         31 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  13         61  
448 11 50       58 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
449 11         42 delete $self->{sessions}->{$session_id};
450 11 50       66 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
451             unless ( $session_id eq $testd_id );
452             }
453             }
454             }
455              
456             sub __send_event {
457 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
458 0         0 $self->_send_event( $event, @args );
459 0         0 return;
460             }
461              
462             #sub send_event {
463             # my $self = shift;
464             # $poe_kernel->post( $self->{session_id}, '__send_event', @_ );
465             #}
466              
467             sub _send_event {
468 56     56   119 my $self = shift;
469 56         169 my ($event, @args) = @_;
470 56         115 my $kernel = $POE::Kernel::poe_kernel;
471 56         100 my %sessions;
472              
473 56         122 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  56         213  
  56         340  
474              
475 56         289 $kernel->post( $_ => $event => @args ) for values %sessions;
476 56         7668 undef;
477             }
478              
479             sub send_to_server {
480 1     1 1 1215 my $self = shift;
481 1         6 $poe_kernel->call( $self->{session_id}, '_send_to_server', @_ );
482             }
483              
484             sub _send_to_server {
485 15     15   19413 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
486 15 50       78 return unless $self->{socket};
487 15 50       46 return unless $output;
488              
489 15 100       81 if ( ref $output eq 'ARRAY' ) {
490 2         4 my $first = shift @{ $output };
  2         6  
491 2 50       4 $self->{BUFFER} = $output if scalar @{ $output };
  2         8  
492 2 50       17 $self->{socket}->put($first) if defined $first;
493 2         275 return 1;
494             }
495              
496 13         81 $self->{socket}->put($output);
497 13         1494 return 1;
498             }
499              
500             q{Putting the test into POE};
501              
502             __END__