File Coverage

blib/lib/Test/POE/Client/TCP.pm
Criterion Covered Total %
statement 199 276 72.1
branch 61 124 49.1
condition 16 42 38.1
subroutine 30 38 78.9
pod 13 13 100.0
total 319 493 64.7


line stmt bran cond sub pod time code
1             package Test::POE::Client::TCP;
2             $Test::POE::Client::TCP::VERSION = '1.20';
3             #ABSTRACT: A POE Component providing TCP client services for test cases
4              
5 12     12   312260 use strict;
  12         37  
  12         306  
6 12     12   54 use warnings;
  12         22  
  12         314  
7 12     12   787 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         57106  
  12         74  
8 12     12   143379 use POSIX qw[ETIMEDOUT];
  12         36  
  12         76  
9 12     12   753 use Socket;
  12         22  
  12         5604  
10 12     12   70 use Carp qw(carp croak);
  12         32  
  12         913  
11              
12             our $GOT_SSL;
13              
14             BEGIN {
15 12     12   42 eval {
16 12         34820 require POE::Component::SSLify;
17 0         0 import POE::Component::SSLify qw( Client_SSLify );
18 0         0 $GOT_SSL = 1;
19             };
20             }
21              
22             sub spawn {
23 12     12 1 12089 my $package = shift;
24 12         30 my %opts = @_;
25 12         119 $opts{lc $_} = delete $opts{$_} for keys %opts;
26 12         21 my $options = delete $opts{options};
27 12         21 my $autoconnect = delete $opts{autoconnect};
28 12 50 33     43 if ( $autoconnect and !( $opts{address} and $opts{port} ) ) {
      66        
29 0         0 carp "You must provide both 'address' and 'port' parameters when specifying 'autoconnect'\n";
30 0         0 return;
31             }
32 12         23 my $usessl = delete $opts{usessl};
33 12 50 33     31 if ( $usessl && !$GOT_SSL ) {
34 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
35 0         0 return;
36             }
37 12 50 33     38 delete $opts{timeout} unless $opts{timeout} and $opts{timeout} =~ m!^\d+$!;
38 12         22 my $self = bless \%opts, $package;
39 12         40 $self->{usessl} = $usessl;
40 12         22 $self->{_prefix} = delete $self->{prefix};
41 12 100       49 $self->{_prefix} = 'testc_' unless defined $self->{_prefix};
42 12 100       63 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
43 12 50       259 $self->{session_id} = POE::Session->create(
44             object_states => [
45             $self => { shutdown => '_shutdown',
46             send_event => '__send_event',
47             send_to_server => '_send_to_server',
48             disconnect => '_disconnect',
49             terminate => '_terminate',
50             connect => '_connect',
51             _timeout => '_socket_fail',
52             },
53             $self => [ qw(_start register unregister _socket_up _socket_fail _conn_input _conn_error _conn_flushed _send_to_server __send_event _disconnect) ],
54             ],
55             heap => $self,
56             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
57             args => [ $autoconnect ],
58             )->ID();
59 12         1378 return $self;
60             }
61              
62             sub session_id {
63 13     13 1 711 shift->{session_id};
64             }
65              
66             sub context {
67 0     0 1 0 my $self = shift;
68 0 0       0 return $self->{context} if defined $self->{context};
69 0         0 return;
70             }
71              
72             sub shutdown {
73 12     12 1 10540 $poe_kernel->call( shift->{session_id}, 'shutdown' );
74             }
75              
76             sub server_info {
77 2     2 1 336 my $self = shift;
78 2 50       6 return unless $self->{_server_info};
79 2         2 my @vals = @{ $self->{_server_info} };
  2         3  
80 2 100       5 return @vals if wantarray;
81 1         2 return { map { $_ => shift @vals } qw(peeraddr peerport sockaddr sockport) };
  4         10  
82             }
83              
84             sub connect {
85 0     0 1 0 $poe_kernel->call( shift->{session_id}, 'connect' );
86             }
87              
88             sub wheel {
89 1     1 1 346 my $self = shift;
90 1 50       9 return unless $self->{socket};
91 1         4 return $self->{socket};
92             }
93              
94             sub alias {
95 0     0 1 0 shift->{alias};
96             }
97              
98             sub _start {
99 12     12   3818 my ($kernel,$self,$sender,$autoconnect) = @_[KERNEL,OBJECT,SENDER,ARG0];
100 12         96 $self->{session_id} = $_[SESSION]->ID();
101              
102 12 50       69 if ( $self->{alias} ) {
103 0         0 $kernel->alias_set( $self->{alias} );
104             }
105             else {
106 12         61 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
107             }
108              
109 12 100       460 if ( $kernel != $sender ) {
110 11         33 my $sender_id = $sender->ID;
111 11         75 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
112 11         35 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
113 11         25 $self->{sessions}->{$sender_id}->{'refcnt'}++;
114 11         43 $kernel->refcount_increment($sender_id, __PACKAGE__);
115 11         291 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
116 11         1262 $kernel->detach_myself();
117             }
118              
119 12 50 66     1355 $kernel->yield( 'connect' ) if $autoconnect and $self->{address} and $self->{port};
      33        
120 12         96 return;
121             }
122              
123             sub _connect {
124 10     10   13346 my ($kernel,$self) = @_[KERNEL,OBJECT];
125 10         14 my $args;
126 10 100       30 if ( ref( $_[ARG0] ) eq 'HASH' ) {
127 9         13 $args = { %{ $_[ARG0] } };
  9         37  
128             }
129             else {
130 1         2 $args = { @_[ARG0..$#_] };
131             }
132 10         18 $args->{lc $_} = delete $args->{$_} for keys %{ $args };
  10         48  
133 10 100 66     48 unless ( $self->{address} and $self->{port} ) {
134 9 50 33     62 unless ( $args->{address} and $args->{port} ) {
135 0         0 carp "You must provide both 'address' and 'port' parameters\n";
136 0         0 return;
137             }
138 9         19 $self->{address} = $args->{address};
139 9         15 $self->{port} = $args->{port};
140             }
141 10         18 my $usessl = delete $args->{usessl};
142 10 50 33     47 if ( $usessl && !$GOT_SSL ) {
143 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
144 0         0 return;
145             }
146              
147 10 50       28 $self->{usessl} = $usessl if defined $usessl;
148              
149 10 50       30 $self->{localaddr} = $args->{localaddr} if $args->{localaddr};
150 10 50       29 $self->{localport} = $args->{localaddr} if $args->{localport};
151              
152 10 50       26 if ( $self->{socket} ) {
153 0         0 carp "Already connected. Disconnect and call 'connect' again\n";
154 0         0 return;
155             }
156              
157 10 50       31 if ( $self->{factory} ) {
158 0         0 carp "Connection already in progress\n";
159 0         0 return;
160             }
161              
162             $self->{factory} = POE::Wheel::SocketFactory->new(
163             RemoteAddress => $self->{address},
164             RemotePort => $self->{port},
165             ( defined $self->{address} ? ( BindAddress => $self->{localaddr} ) : () ),
166 10 50       125 ( defined $self->{port} ? ( BindPort => $self->{localport} ) : () ),
    50          
167             SuccessEvent => '_socket_up',
168             FailureEvent => '_socket_fail',
169             SocketDomain => AF_INET, # Sets the socket() domain
170             SocketType => SOCK_STREAM, # Sets the socket() type
171             SocketProtocol => 'tcp', # Sets the socket() protocol
172             Reuse => 'yes', # Lets the port be reused
173             );
174              
175 10 50       4820 $kernel->delay( '_timeout', $self->{timeout}, 'connect', ETIMEDOUT, POSIX::strerror( ETIMEDOUT ) ) if $self->{timeout};
176 10         34 return;
177             }
178              
179             sub _socket_up {
180 10     10   8930 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
181 10         140 my $sockaddr = inet_ntoa( ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[1] );
182 10         97 my $sockport = ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[0];
183 10         42 $peeraddr = inet_ntoa( $peeraddr );
184 10         59 $kernel->delay( '_timeout' );
185              
186 10         605 delete $self->{factory};
187              
188 10 0 33     234 if ( $self->{usessl} && $GOT_SSL ) {
189 0         0 eval {
190 0         0 my $ctx;
191 0 0 0     0 if ($self->{sslctx}) {
    0          
192 0         0 $ctx = $self->{sslctx};
193             }
194             elsif ($self->{sslkey} && $self->{sslcert}) {
195 0         0 $ctx = SSLify_ContextCreate($self->{sslkey}, $self->{sslcert});
196             }
197             else {
198 0         0 $ctx = undef;
199             }
200 0         0 $socket = Client_SSLify( $socket, undef, undef, $ctx );
201 0 0       0 if ( $@ ) {
202 0         0 warn "Couldn't use an SSL socket: $@\n";
203 0         0 $self->{usessl} = 0;
204             }
205             };
206             }
207              
208             $self->{socket} = POE::Wheel::ReadWrite->new(
209             Handle => $socket,
210             _get_filters(
211             $self->{filter},
212             $self->{inputfilter},
213             $self->{outputfilter}
214 10         51 ),
215             InputEvent => '_conn_input',
216             ErrorEvent => '_conn_error',
217             FlushedEvent => '_conn_flushed',
218             );
219              
220 10         2810 $self->{_server_info} = [ $peeraddr, $peerport, $sockaddr, $sockport ];
221              
222 10         67 $self->_send_event( $self->{_prefix} . 'connected', $peeraddr, $peerport, $sockaddr, $sockport );
223 10         33 return;
224             }
225              
226             sub _get_filters {
227 10     10   45 my ($client_filter, $client_infilter, $client_outfilter) = @_;
228 10 100 66     103 if (defined $client_infilter or defined $client_outfilter) {
    50          
229             return (
230 1         2 "InputFilter" => _load_filter($client_infilter),
231             "OutputFilter" => _load_filter($client_outfilter)
232             );
233 0 0       0 if (defined $client_filter) {
234 0         0 carp(
235             "Filter ignored with InputFilter or OutputFilter"
236             );
237             }
238             }
239             elsif (defined $client_filter) {
240 0         0 return ( "Filter" => _load_filter($client_filter) );
241             }
242             else {
243 9         42 return ( Filter => POE::Filter::Line->new(), );
244             }
245              
246             }
247              
248             # Get something: either arrayref, ref, or string
249             # Return filter
250             sub _load_filter {
251 2     2   29 my $filter = shift;
252 2 50       5 if (ref ($filter) eq 'ARRAY') {
    50          
253 0         0 my @args = @$filter;
254 0         0 $filter = shift @args;
255 0 0       0 if ( _test_filter($filter) ){
256 0         0 return $filter->new(@args);
257             } else {
258 0         0 return POE::Filter::Line->new(@args);
259             }
260             }
261             elsif (ref $filter) {
262 2         8 return $filter->clone();
263             }
264             else {
265 0 0       0 if ( _test_filter($filter) ) {
266 0         0 return $filter->new();
267             } else {
268 0         0 return POE::Filter::Line->new();
269             }
270             }
271             }
272              
273             # Test if a Filter can be loaded, return sucess or failure
274             sub _test_filter {
275 0     0   0 my $filter = shift;
276 0         0 my $eval = eval {
277 0         0 (my $mod = $filter) =~ s!::!/!g;
278 0         0 require "$mod.pm";
279 0         0 1;
280             };
281 0 0 0     0 if (!$eval and $@) {
282 0         0 carp(
283             "Failed to load [$filter]\n" .
284             "Reason $@\nUsing defualt POE::Filter::Line "
285             );
286 0         0 return 0;
287             }
288 0         0 return 1;
289             }
290              
291             sub _socket_fail {
292 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
293 0 0       0 carp "Wheel $wheel_id generated $operation error $errnum: $errstr\n" if $self->{debug};
294 0         0 $kernel->delay( '_timeout' );
295 0         0 delete $self->{factory};
296 0         0 $self->_send_event( $self->{_prefix} . 'socket_failed', $operation, $errnum, $errstr );
297 0         0 return;
298             }
299              
300             sub disconnect {
301 0     0 1 0 my $self = shift;
302 0         0 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
303             }
304              
305             sub _disconnect {
306 0     0   0 my ($kernel,$self) = @_[KERNEL,OBJECT];
307 0 0       0 return unless $self->{socket};
308 0         0 $self->{_quit} = 1;
309 0         0 return 1;
310             }
311              
312             sub terminate {
313 7     7 1 3814 my $self = shift;
314 7         33 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
315             }
316              
317             sub _terminate {
318 9     9   1786 my ($kernel,$self) = @_[KERNEL,OBJECT];
319 9 50       40 return unless $self->{socket};
320 9 50       68 if ( $^O =~ /(cygwin|MSWin)/ ) {
321 0         0 $self->{socket}->shutdown_input();
322 0         0 $self->{socket}->shutdown_output();
323             }
324 9         41 delete $self->{socket};
325 9         2201 delete $self->{_server_info};
326 9         40 $self->_send_event( $self->{_prefix} . 'disconnected' );
327 9         24 return 1;
328             }
329              
330             sub _conn_input {
331 18     18   19906 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
332 18         75 $self->_send_event( $self->{_prefix} . 'input', $input );
333 18         35 return;
334             }
335              
336             sub _conn_error {
337 1     1   1140 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
338 1 50       3 return unless $self->{socket};
339 1         3 delete $self->{socket};
340 1         190 delete $self->{_server_info};
341 1         5 $self->_send_event( $self->{_prefix} . 'disconnected' );
342 1         2 return;
343             }
344              
345             sub _conn_flushed {
346 18     18   6326 my ($self,$id) = @_[OBJECT,ARG0];
347 18 50       58 return unless $self->{socket};
348 18 100       42 if ( $self->{BUFFER} ) {
349 6         4 my $item = shift @{ $self->{BUFFER} };
  6         10  
350 6 100       10 unless ( $item ) {
351 2         15 delete $self->{BUFFER};
352 2         7 $self->_send_event( $self->{_prefix} . 'flushed' );
353 2         5 return;
354             }
355 4         18 $self->{socket}->put($item);
356 4         199 return;
357             }
358 12 50       29 unless ( $self->{_quit} ) {
359 12         50 $self->_send_event( $self->{_prefix} . 'flushed' );
360 12         30 return;
361             }
362 0         0 delete $self->{socket};
363 0         0 delete $self->{_server_info};
364 0         0 $self->_send_event( $self->{_prefix} . 'disconnected' );
365 0         0 return;
366             }
367              
368             sub _shutdown {
369 12     12   660 my ($kernel,$self) = @_[KERNEL,OBJECT];
370 12         20 delete $self->{factory};
371 12         21 delete $self->{socket};
372 12         21 delete $self->{_server_info};
373 12         48 $kernel->alarm_remove_all();
374 12         460 $kernel->alias_remove( $_ ) for $kernel->alias_list();
375 12 50       360 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
376 12         551 $self->_unregister_sessions();
377 12         319 return;
378             }
379              
380             sub register {
381 1     1 1 328 my ($kernel, $self, $session, $sender, @events) =
382             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
383              
384 1 50       3 unless (@events) {
385 0         0 warn "register: Not enough arguments";
386 0         0 return;
387             }
388              
389 1         2 my $sender_id = $sender->ID();
390              
391 1         4 foreach (@events) {
392 1 50       4 $_ = $self->{_prefix} . $_ unless /^_/;
393 1         2 $self->{events}->{$_}->{$sender_id} = $sender_id;
394 1         2 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
395 1 50 33     6 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
396 1         3 $kernel->refcount_increment($sender_id, __PACKAGE__);
397             }
398             }
399              
400 1         42 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
401 1         66 return;
402             }
403              
404             sub unregister {
405 2     2 1 2395 my ($kernel, $self, $session, $sender, @events) =
406             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
407              
408 2 50       7 unless (@events) {
409 0         0 warn "unregister: Not enough arguments";
410 0         0 return;
411             }
412              
413 2         6 $self->_unregister($session,$sender,@events);
414 2         4 undef;
415             }
416              
417             sub _unregister {
418 2     2   4 my ($self,$session,$sender) = splice @_,0,3;
419 2         5 my $sender_id = $sender->ID();
420              
421 2         6 foreach (@_) {
422 2 50       9 $_ = $self->{_prefix} . $_ unless /^_/;
423 2         5 my $blah = delete $self->{events}->{$_}->{$sender_id};
424 2 50       4 unless ( $blah ) {
425 0         0 warn "$sender_id hasn't registered for '$_' events\n";
426 0         0 next;
427             }
428 2 50       15 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
429 2         5 delete $self->{sessions}->{$sender_id};
430 2 50       5 unless ($session == $sender) {
431 2         15 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
432             }
433             }
434             }
435 2         103 undef;
436             }
437              
438             sub _unregister_sessions {
439 12     12   22 my $self = shift;
440 12         30 my $testd_id = $self->session_id();
441 12         26 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  12         40  
442 10 50       35 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
443 10         20 delete $self->{sessions}->{$session_id};
444 10 50       65 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
445             unless ( $session_id eq $testd_id );
446             }
447             }
448             }
449              
450             sub __send_event {
451 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
452 0         0 $self->_send_event( $event, @args );
453 0         0 return;
454             }
455              
456             #sub send_event {
457             # my $self = shift;
458             # $poe_kernel->post( $self->{session_id}, '__send_event', @_ );
459             #}
460              
461             sub _send_event {
462 52     52   82 my $self = shift;
463 52         113 my ($event, @args) = @_;
464 52         67 my $kernel = $POE::Kernel::poe_kernel;
465 52         82 my %sessions;
466              
467 52         65 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  52         157  
  52         204  
468              
469 52         181 $kernel->post( $_ => $event => @args ) for values %sessions;
470 52         4413 undef;
471             }
472              
473             sub send_to_server {
474 1     1 1 508 my $self = shift;
475 1         4 $poe_kernel->call( $self->{session_id}, '_send_to_server', @_ );
476             }
477              
478             sub _send_to_server {
479 14     14   9765 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
480 14 50       51 return unless $self->{socket};
481 14 50       71 return unless $output;
482              
483 14 100       44 if ( ref $output eq 'ARRAY' ) {
484 2         2 my $first = shift @{ $output };
  2         4  
485 2 50       3 $self->{BUFFER} = $output if scalar @{ $output };
  2         5  
486 2 50       12 $self->{socket}->put($first) if defined $first;
487 2         167 return 1;
488             }
489              
490 12         46 $self->{socket}->put($output);
491 12         877 return 1;
492             }
493              
494             q{Putting the test into POE};
495              
496             __END__