File Coverage

blib/lib/Test/POE/Client/TCP.pm
Criterion Covered Total %
statement 199 271 73.4
branch 61 120 50.8
condition 16 39 41.0
subroutine 30 38 78.9
pod 13 13 100.0
total 319 481 66.3


line stmt bran cond sub pod time code
1             package Test::POE::Client::TCP;
2             $Test::POE::Client::TCP::VERSION = '1.18';
3             #ABSTRACT: A POE Component providing TCP client services for test cases
4              
5 12     12   365290 use strict;
  12         46  
  12         327  
6 12     12   60 use warnings;
  12         25  
  12         378  
7 12     12   909 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         77751  
  12         90  
8 12     12   177819 use POSIX qw[ETIMEDOUT];
  12         36  
  12         103  
9 12     12   959 use Socket;
  12         36  
  12         7056  
10 12     12   95 use Carp qw(carp croak);
  12         25  
  12         1165  
11              
12             our $GOT_SSL;
13              
14             BEGIN {
15 12     12   43 eval {
16 12         41062 require POE::Component::SSLify;
17 0         0 import POE::Component::SSLify qw( Client_SSLify );
18 0         0 $GOT_SSL = 1;
19             };
20             }
21              
22             sub spawn {
23 12     12 1 14685 my $package = shift;
24 12         40 my %opts = @_;
25 12         174 $opts{lc $_} = delete $opts{$_} for keys %opts;
26 12         31 my $options = delete $opts{options};
27 12         28 my $autoconnect = delete $opts{autoconnect};
28 12 50 33     61 if ( $autoconnect and !( $opts{address} and $opts{port} ) ) {
      66        
29 0         0 carp "You must provide both 'address' and 'port' parameters when specifying 'autoconnect'\n";
30 0         0 return;
31             }
32 12         25 my $usessl = delete $opts{usessl};
33 12 50 33     46 if ( $usessl && !$GOT_SSL ) {
34 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
35 0         0 return;
36             }
37 12 50 33     53 delete $opts{timeout} unless $opts{timeout} and $opts{timeout} =~ m!^\d+$!;
38 12         33 my $self = bless \%opts, $package;
39 12         64 $self->{usessl} = $usessl;
40 12         31 $self->{_prefix} = delete $self->{prefix};
41 12 100       65 $self->{_prefix} = 'testc_' unless defined $self->{_prefix};
42 12 100       73 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
43 12 50       343 $self->{session_id} = POE::Session->create(
44             object_states => [
45             $self => { shutdown => '_shutdown',
46             send_event => '__send_event',
47             send_to_server => '_send_to_server',
48             disconnect => '_disconnect',
49             terminate => '_terminate',
50             connect => '_connect',
51             _timeout => '_socket_fail',
52             },
53             $self => [ qw(_start register unregister _socket_up _socket_fail _conn_input _conn_error _conn_flushed _send_to_server __send_event _disconnect) ],
54             ],
55             heap => $self,
56             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
57             args => [ $autoconnect ],
58             )->ID();
59 12         1704 return $self;
60             }
61              
62             sub session_id {
63 13     13 1 993 shift->{session_id};
64             }
65              
66             sub context {
67 0     0 1 0 my $self = shift;
68 0 0       0 return $self->{context} if defined $self->{context};
69 0         0 return;
70             }
71              
72             sub shutdown {
73 12     12 1 12443 $poe_kernel->call( shift->{session_id}, 'shutdown' );
74             }
75              
76             sub server_info {
77 2     2 1 661 my $self = shift;
78 2 50       9 return unless $self->{_server_info};
79 2         4 my @vals = @{ $self->{_server_info} };
  2         8  
80 2 100       10 return @vals if wantarray;
81 1         5 return { map { $_ => shift @vals } qw(peeraddr peerport sockaddr sockport) };
  4         15  
82             }
83              
84             sub connect {
85 0     0 1 0 $poe_kernel->call( shift->{session_id}, 'connect' );
86             }
87              
88             sub wheel {
89 1     1 1 498 my $self = shift;
90 1 50       6 return unless $self->{socket};
91 1         7 return $self->{socket};
92             }
93              
94             sub alias {
95 0     0 1 0 shift->{alias};
96             }
97              
98             sub _start {
99 12     12   4851 my ($kernel,$self,$sender,$autoconnect) = @_[KERNEL,OBJECT,SENDER,ARG0];
100 12         77 $self->{session_id} = $_[SESSION]->ID();
101              
102 12 50       83 if ( $self->{alias} ) {
103 0         0 $kernel->alias_set( $self->{alias} );
104             }
105             else {
106 12         58 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
107             }
108              
109 12 100       569 if ( $kernel != $sender ) {
110 11         38 my $sender_id = $sender->ID;
111 11         83 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
112 11         37 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
113 11         40 $self->{sessions}->{$sender_id}->{'refcnt'}++;
114 11         47 $kernel->refcount_increment($sender_id, __PACKAGE__);
115 11         332 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
116 11         1470 $kernel->detach_myself();
117             }
118              
119 12 50 66     1640 $kernel->yield( 'connect' ) if $autoconnect and $self->{address} and $self->{port};
      33        
120 12         112 return;
121             }
122              
123             sub _connect {
124 10     10   16541 my ($kernel,$self) = @_[KERNEL,OBJECT];
125 10         20 my $args;
126 10 100       40 if ( ref( $_[ARG0] ) eq 'HASH' ) {
127 9         19 $args = { %{ $_[ARG0] } };
  9         41  
128             }
129             else {
130 1         2 $args = { @_[ARG0..$#_] };
131             }
132 10         23 $args->{lc $_} = delete $args->{$_} for keys %{ $args };
  10         63  
133 10 100 66     60 unless ( $self->{address} and $self->{port} ) {
134 9 50 33     79 unless ( $args->{address} and $args->{port} ) {
135 0         0 carp "You must provide both 'address' and 'port' parameters\n";
136 0         0 return;
137             }
138 9         36 $self->{address} = $args->{address};
139 9         23 $self->{port} = $args->{port};
140             }
141 10         20 my $usessl = delete $args->{usessl};
142 10 50 33     61 if ( $usessl && !$GOT_SSL ) {
143 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
144 0         0 return;
145             }
146              
147 10 50       32 $self->{usessl} = $usessl if defined $usessl;
148              
149 10 50       28 $self->{localaddr} = $args->{localaddr} if $args->{localaddr};
150 10 50       38 $self->{localport} = $args->{localaddr} if $args->{localport};
151              
152 10 50       41 if ( $self->{socket} ) {
153 0         0 carp "Already connected. Disconnect and call 'connect' again\n";
154 0         0 return;
155             }
156              
157 10 50       33 if ( $self->{factory} ) {
158 0         0 carp "Connection already in progress\n";
159 0         0 return;
160             }
161              
162             $self->{factory} = POE::Wheel::SocketFactory->new(
163             RemoteAddress => $self->{address},
164             RemotePort => $self->{port},
165             ( defined $self->{address} ? ( BindAddress => $self->{localaddr} ) : () ),
166 10 50       157 ( defined $self->{port} ? ( BindPort => $self->{localport} ) : () ),
    50          
167             SuccessEvent => '_socket_up',
168             FailureEvent => '_socket_fail',
169             SocketDomain => AF_INET, # Sets the socket() domain
170             SocketType => SOCK_STREAM, # Sets the socket() type
171             SocketProtocol => 'tcp', # Sets the socket() protocol
172             Reuse => 'yes', # Lets the port be reused
173             );
174              
175 10 50       5775 $kernel->delay( '_timeout', $self->{timeout}, 'connect', ETIMEDOUT, POSIX::strerror( ETIMEDOUT ) ) if $self->{timeout};
176 10         47 return;
177             }
178              
179             sub _socket_up {
180 10     10   10144 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
181 10         170 my $sockaddr = inet_ntoa( ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[1] );
182 10         100 my $sockport = ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[0];
183 10         58 $peeraddr = inet_ntoa( $peeraddr );
184 10         54 $kernel->delay( '_timeout' );
185              
186 10         634 delete $self->{factory};
187              
188 10 0 33     267 if ( $self->{usessl} && $GOT_SSL ) {
189 0         0 eval {
190 0         0 $socket = Client_SSLify( $socket );
191 0 0       0 if ( $@ ) {
192 0         0 warn "Couldn't use an SSL socket: $@\n";
193 0         0 $self->{usessl} = 0;
194             }
195             };
196             }
197              
198             $self->{socket} = POE::Wheel::ReadWrite->new(
199             Handle => $socket,
200             _get_filters(
201             $self->{filter},
202             $self->{inputfilter},
203             $self->{outputfilter}
204 10         66 ),
205             InputEvent => '_conn_input',
206             ErrorEvent => '_conn_error',
207             FlushedEvent => '_conn_flushed',
208             );
209              
210 10         3108 $self->{_server_info} = [ $peeraddr, $peerport, $sockaddr, $sockport ];
211              
212 10         76 $self->_send_event( $self->{_prefix} . 'connected', $peeraddr, $peerport, $sockaddr, $sockport );
213 10         31 return;
214             }
215              
216             sub _get_filters {
217 10     10   45 my ($client_filter, $client_infilter, $client_outfilter) = @_;
218 10 100 66     84 if (defined $client_infilter or defined $client_outfilter) {
    50          
219             return (
220 1         4 "InputFilter" => _load_filter($client_infilter),
221             "OutputFilter" => _load_filter($client_outfilter)
222             );
223 0 0       0 if (defined $client_filter) {
224 0         0 carp(
225             "Filter ignored with InputFilter or OutputFilter"
226             );
227             }
228             }
229             elsif (defined $client_filter) {
230 0         0 return ( "Filter" => _load_filter($client_filter) );
231             }
232             else {
233 9         58 return ( Filter => POE::Filter::Line->new(), );
234             }
235              
236             }
237              
238             # Get something: either arrayref, ref, or string
239             # Return filter
240             sub _load_filter {
241 2     2   28 my $filter = shift;
242 2 50       7 if (ref ($filter) eq 'ARRAY') {
    50          
243 0         0 my @args = @$filter;
244 0         0 $filter = shift @args;
245 0 0       0 if ( _test_filter($filter) ){
246 0         0 return $filter->new(@args);
247             } else {
248 0         0 return POE::Filter::Line->new(@args);
249             }
250             }
251             elsif (ref $filter) {
252 2         11 return $filter->clone();
253             }
254             else {
255 0 0       0 if ( _test_filter($filter) ) {
256 0         0 return $filter->new();
257             } else {
258 0         0 return POE::Filter::Line->new();
259             }
260             }
261             }
262              
263             # Test if a Filter can be loaded, return sucess or failure
264             sub _test_filter {
265 0     0   0 my $filter = shift;
266 0         0 my $eval = eval {
267 0         0 (my $mod = $filter) =~ s!::!/!g;
268 0         0 require "$mod.pm";
269 0         0 1;
270             };
271 0 0 0     0 if (!$eval and $@) {
272 0         0 carp(
273             "Failed to load [$filter]\n" .
274             "Reason $@\nUsing defualt POE::Filter::Line "
275             );
276 0         0 return 0;
277             }
278 0         0 return 1;
279             }
280              
281             sub _socket_fail {
282 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
283 0 0       0 carp "Wheel $wheel_id generated $operation error $errnum: $errstr\n" if $self->{debug};
284 0         0 $kernel->delay( '_timeout' );
285 0         0 delete $self->{factory};
286 0         0 $self->_send_event( $self->{_prefix} . 'socket_failed', $operation, $errnum, $errstr );
287 0         0 return;
288             }
289              
290             sub disconnect {
291 0     0 1 0 my $self = shift;
292 0         0 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
293             }
294              
295             sub _disconnect {
296 0     0   0 my ($kernel,$self) = @_[KERNEL,OBJECT];
297 0 0       0 return unless $self->{socket};
298 0         0 $self->{_quit} = 1;
299 0         0 return 1;
300             }
301              
302             sub terminate {
303 7     7 1 4020 my $self = shift;
304 7         41 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
305             }
306              
307             sub _terminate {
308 9     9   2073 my ($kernel,$self) = @_[KERNEL,OBJECT];
309 9 50       69 return unless $self->{socket};
310 9 50       78 if ( $^O =~ /(cygwin|MSWin)/ ) {
311 0         0 $self->{socket}->shutdown_input();
312 0         0 $self->{socket}->shutdown_output();
313             }
314 9         58 delete $self->{socket};
315 9         2945 delete $self->{_server_info};
316 9         50 $self->_send_event( $self->{_prefix} . 'disconnected' );
317 9         28 return 1;
318             }
319              
320             sub _conn_input {
321 18     18   21291 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
322 18         82 $self->_send_event( $self->{_prefix} . 'input', $input );
323 18         48 return;
324             }
325              
326             sub _conn_error {
327 1     1   1143 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
328 1 50       4 return unless $self->{socket};
329 1         4 delete $self->{socket};
330 1         165 delete $self->{_server_info};
331 1         4 $self->_send_event( $self->{_prefix} . 'disconnected' );
332 1         2 return;
333             }
334              
335             sub _conn_flushed {
336 18     18   7600 my ($self,$id) = @_[OBJECT,ARG0];
337 18 50       64 return unless $self->{socket};
338 18 100       47 if ( $self->{BUFFER} ) {
339 6         10 my $item = shift @{ $self->{BUFFER} };
  6         13  
340 6 100       23 unless ( $item ) {
341 2         7 delete $self->{BUFFER};
342 2         14 $self->_send_event( $self->{_prefix} . 'flushed' );
343 2         7 return;
344             }
345 4         16 $self->{socket}->put($item);
346 4         274 return;
347             }
348 12 50       34 unless ( $self->{_quit} ) {
349 12         50 $self->_send_event( $self->{_prefix} . 'flushed' );
350 12         33 return;
351             }
352 0         0 delete $self->{socket};
353 0         0 delete $self->{_server_info};
354 0         0 $self->_send_event( $self->{_prefix} . 'disconnected' );
355 0         0 return;
356             }
357              
358             sub _shutdown {
359 12     12   776 my ($kernel,$self) = @_[KERNEL,OBJECT];
360 12         30 delete $self->{factory};
361 12         27 delete $self->{socket};
362 12         24 delete $self->{_server_info};
363 12         61 $kernel->alarm_remove_all();
364 12         600 $kernel->alias_remove( $_ ) for $kernel->alias_list();
365 12 50       445 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
366 12         681 $self->_unregister_sessions();
367 12         378 return;
368             }
369              
370             sub register {
371 1     1 1 429 my ($kernel, $self, $session, $sender, @events) =
372             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
373              
374 1 50       3 unless (@events) {
375 0         0 warn "register: Not enough arguments";
376 0         0 return;
377             }
378              
379 1         4 my $sender_id = $sender->ID();
380              
381 1         5 foreach (@events) {
382 1 50       5 $_ = $self->{_prefix} . $_ unless /^_/;
383 1         4 $self->{events}->{$_}->{$sender_id} = $sender_id;
384 1         4 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
385 1 50 33     7 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
386 1         4 $kernel->refcount_increment($sender_id, __PACKAGE__);
387             }
388             }
389              
390 1         58 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
391 1         101 return;
392             }
393              
394             sub unregister {
395 2     2 1 3050 my ($kernel, $self, $session, $sender, @events) =
396             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
397              
398 2 50       11 unless (@events) {
399 0         0 warn "unregister: Not enough arguments";
400 0         0 return;
401             }
402              
403 2         11 $self->_unregister($session,$sender,@events);
404 2         6 undef;
405             }
406              
407             sub _unregister {
408 2     2   7 my ($self,$session,$sender) = splice @_,0,3;
409 2         9 my $sender_id = $sender->ID();
410              
411 2         13 foreach (@_) {
412 2 50       13 $_ = $self->{_prefix} . $_ unless /^_/;
413 2         8 my $blah = delete $self->{events}->{$_}->{$sender_id};
414 2 50       7 unless ( $blah ) {
415 0         0 warn "$sender_id hasn't registered for '$_' events\n";
416 0         0 next;
417             }
418 2 50       10 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
419 2         6 delete $self->{sessions}->{$sender_id};
420 2 50       11 unless ($session == $sender) {
421 2         10 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
422             }
423             }
424             }
425 2         174 undef;
426             }
427              
428             sub _unregister_sessions {
429 12     12   26 my $self = shift;
430 12         49 my $testd_id = $self->session_id();
431 12         35 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  12         51  
432 10 50       49 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
433 10         26 delete $self->{sessions}->{$session_id};
434 10 50       95 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
435             unless ( $session_id eq $testd_id );
436             }
437             }
438             }
439              
440             sub __send_event {
441 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
442 0         0 $self->_send_event( $event, @args );
443 0         0 return;
444             }
445              
446             #sub send_event {
447             # my $self = shift;
448             # $poe_kernel->post( $self->{session_id}, '__send_event', @_ );
449             #}
450              
451             sub _send_event {
452 52     52   86 my $self = shift;
453 52         111 my ($event, @args) = @_;
454 52         85 my $kernel = $POE::Kernel::poe_kernel;
455 52         72 my %sessions;
456              
457 52         74 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  52         158  
  52         246  
458              
459 52         196 $kernel->post( $_ => $event => @args ) for values %sessions;
460 52         4790 undef;
461             }
462              
463             sub send_to_server {
464 1     1 1 581 my $self = shift;
465 1         5 $poe_kernel->call( $self->{session_id}, '_send_to_server', @_ );
466             }
467              
468             sub _send_to_server {
469 14     14   11272 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
470 14 50       85 return unless $self->{socket};
471 14 50       50 return unless $output;
472              
473 14 100       45 if ( ref $output eq 'ARRAY' ) {
474 2         4 my $first = shift @{ $output };
  2         6  
475 2 50       4 $self->{BUFFER} = $output if scalar @{ $output };
  2         10  
476 2 50       27 $self->{socket}->put($first) if defined $first;
477 2         221 return 1;
478             }
479              
480 12         56 $self->{socket}->put($output);
481 12         1037 return 1;
482             }
483              
484             q{Putting the test into POE};
485              
486             __END__