File Coverage

blib/lib/Test/POE/Client/TCP.pm
Criterion Covered Total %
statement 199 276 72.1
branch 61 124 49.1
condition 16 42 38.1
subroutine 30 38 78.9
pod 13 13 100.0
total 319 493 64.7


line stmt bran cond sub pod time code
1             package Test::POE::Client::TCP;
2             $Test::POE::Client::TCP::VERSION = '1.22';
3             #ABSTRACT: A POE Component providing TCP client services for test cases
4              
5 12     12   273978 use strict;
  12         42  
  12         258  
6 12     12   43 use warnings;
  12         18  
  12         296  
7 12     12   761 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         60257  
  12         75  
8 12     12   152324 use POSIX qw[ETIMEDOUT];
  12         29  
  12         83  
9 12     12   739 use Socket;
  12         25  
  12         5515  
10 12     12   67 use Carp qw(carp croak);
  12         20  
  12         942  
11              
12             our $GOT_SSL;
13              
14             BEGIN {
15 12     12   35 eval {
16 12         32761 require POE::Component::SSLify;
17 0         0 import POE::Component::SSLify qw( Client_SSLify SSLify_ContextCreate );
18 0         0 $GOT_SSL = 1;
19             };
20             }
21              
22             sub spawn {
23 12     12 1 12662 my $package = shift;
24 12         27 my %opts = @_;
25 12         146 $opts{lc $_} = delete $opts{$_} for keys %opts;
26 12         28 my $options = delete $opts{options};
27 12         17 my $autoconnect = delete $opts{autoconnect};
28 12 50 33     50 if ( $autoconnect and !( $opts{address} and $opts{port} ) ) {
      66        
29 0         0 carp "You must provide both 'address' and 'port' parameters when specifying 'autoconnect'\n";
30 0         0 return;
31             }
32 12         23 my $usessl = delete $opts{usessl};
33 12 50 33     35 if ( $usessl && !$GOT_SSL ) {
34 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
35 0         0 return;
36             }
37 12 50 33     44 delete $opts{timeout} unless $opts{timeout} and $opts{timeout} =~ m!^\d+$!;
38 12         24 my $self = bless \%opts, $package;
39 12         52 $self->{usessl} = $usessl;
40 12         23 $self->{_prefix} = delete $self->{prefix};
41 12 100       51 $self->{_prefix} = 'testc_' unless defined $self->{_prefix};
42 12 100       64 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
43 12 50       312 $self->{session_id} = POE::Session->create(
44             object_states => [
45             $self => { shutdown => '_shutdown',
46             send_event => '__send_event',
47             send_to_server => '_send_to_server',
48             disconnect => '_disconnect',
49             terminate => '_terminate',
50             connect => '_connect',
51             _timeout => '_socket_fail',
52             },
53             $self => [ qw(_start register unregister _socket_up _socket_fail _conn_input _conn_error _conn_flushed _send_to_server __send_event _disconnect) ],
54             ],
55             heap => $self,
56             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
57             args => [ $autoconnect ],
58             )->ID();
59 12         1426 return $self;
60             }
61              
62             sub session_id {
63 13     13 1 688 shift->{session_id};
64             }
65              
66             sub context {
67 0     0 1 0 my $self = shift;
68 0 0       0 return $self->{context} if defined $self->{context};
69 0         0 return;
70             }
71              
72             sub shutdown {
73 12     12 1 11355 $poe_kernel->call( shift->{session_id}, 'shutdown' );
74             }
75              
76             sub server_info {
77 2     2 1 421 my $self = shift;
78 2 50       5 return unless $self->{_server_info};
79 2         2 my @vals = @{ $self->{_server_info} };
  2         3  
80 2 100       6 return @vals if wantarray;
81 1         2 return { map { $_ => shift @vals } qw(peeraddr peerport sockaddr sockport) };
  4         7  
82             }
83              
84             sub connect {
85 0     0 1 0 $poe_kernel->call( shift->{session_id}, 'connect' );
86             }
87              
88             sub wheel {
89 1     1 1 379 my $self = shift;
90 1 50       4 return unless $self->{socket};
91 1         3 return $self->{socket};
92             }
93              
94             sub alias {
95 0     0 1 0 shift->{alias};
96             }
97              
98             sub _start {
99 12     12   4476 my ($kernel,$self,$sender,$autoconnect) = @_[KERNEL,OBJECT,SENDER,ARG0];
100 12         74 $self->{session_id} = $_[SESSION]->ID();
101              
102 12 50       85 if ( $self->{alias} ) {
103 0         0 $kernel->alias_set( $self->{alias} );
104             }
105             else {
106 12         66 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
107             }
108              
109 12 100       559 if ( $kernel != $sender ) {
110 11         35 my $sender_id = $sender->ID;
111 11         93 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
112 11         36 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
113 11         24 $self->{sessions}->{$sender_id}->{'refcnt'}++;
114 11         44 $kernel->refcount_increment($sender_id, __PACKAGE__);
115 11         314 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
116 11         1355 $kernel->detach_myself();
117             }
118              
119 12 50 66     1429 $kernel->yield( 'connect' ) if $autoconnect and $self->{address} and $self->{port};
      33        
120 12         102 return;
121             }
122              
123             sub _connect {
124 10     10   14277 my ($kernel,$self) = @_[KERNEL,OBJECT];
125 10         17 my $args;
126 10 100       31 if ( ref( $_[ARG0] ) eq 'HASH' ) {
127 9         15 $args = { %{ $_[ARG0] } };
  9         59  
128             }
129             else {
130 1         3 $args = { @_[ARG0..$#_] };
131             }
132 10         20 $args->{lc $_} = delete $args->{$_} for keys %{ $args };
  10         88  
133 10 100 66     53 unless ( $self->{address} and $self->{port} ) {
134 9 50 33     58 unless ( $args->{address} and $args->{port} ) {
135 0         0 carp "You must provide both 'address' and 'port' parameters\n";
136 0         0 return;
137             }
138 9         25 $self->{address} = $args->{address};
139 9         17 $self->{port} = $args->{port};
140             }
141 10         15 my $usessl = delete $args->{usessl};
142 10 50 33     31 if ( $usessl && !$GOT_SSL ) {
143 0         0 carp "'usessl' specified but POE::Component::SSLify is not available\n";
144 0         0 return;
145             }
146              
147 10 50       30 $self->{usessl} = $usessl if defined $usessl;
148              
149 10 50       32 $self->{localaddr} = $args->{localaddr} if $args->{localaddr};
150 10 50       51 $self->{localport} = $args->{localaddr} if $args->{localport};
151              
152 10 50       29 if ( $self->{socket} ) {
153 0         0 carp "Already connected. Disconnect and call 'connect' again\n";
154 0         0 return;
155             }
156              
157 10 50       27 if ( $self->{factory} ) {
158 0         0 carp "Connection already in progress\n";
159 0         0 return;
160             }
161              
162             $self->{factory} = POE::Wheel::SocketFactory->new(
163             RemoteAddress => $self->{address},
164             RemotePort => $self->{port},
165             ( defined $self->{address} ? ( BindAddress => $self->{localaddr} ) : () ),
166 10 50       124 ( defined $self->{port} ? ( BindPort => $self->{localport} ) : () ),
    50          
167             SuccessEvent => '_socket_up',
168             FailureEvent => '_socket_fail',
169             SocketDomain => AF_INET, # Sets the socket() domain
170             SocketType => SOCK_STREAM, # Sets the socket() type
171             SocketProtocol => 'tcp', # Sets the socket() protocol
172             Reuse => 'yes', # Lets the port be reused
173             );
174              
175 10 50       5107 $kernel->delay( '_timeout', $self->{timeout}, 'connect', ETIMEDOUT, POSIX::strerror( ETIMEDOUT ) ) if $self->{timeout};
176 10         37 return;
177             }
178              
179             sub _socket_up {
180 10     10   8910 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
181 10         141 my $sockaddr = inet_ntoa( ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[1] );
182 10         81 my $sockport = ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[0];
183 10         40 $peeraddr = inet_ntoa( $peeraddr );
184 10         48 $kernel->delay( '_timeout' );
185              
186 10         541 delete $self->{factory};
187              
188 10 0 33     217 if ( $self->{usessl} && $GOT_SSL ) {
189 0         0 eval {
190 0         0 my $ctx;
191 0 0 0     0 if ($self->{sslctx}) {
    0          
192 0         0 $ctx = $self->{sslctx};
193             }
194             elsif ($self->{sslkey} && $self->{sslcert}) {
195 0         0 $ctx = SSLify_ContextCreate($self->{sslkey}, $self->{sslcert});
196             }
197             else {
198 0         0 $ctx = undef;
199             }
200 0         0 $socket = Client_SSLify( $socket, undef, undef, $ctx );
201 0 0       0 if ( $@ ) {
202 0         0 warn "Couldn't use an SSL socket: $@\n";
203 0         0 $self->{usessl} = 0;
204             }
205             };
206             }
207              
208             $self->{socket} = POE::Wheel::ReadWrite->new(
209             Handle => $socket,
210             _get_filters(
211             $self->{filter},
212             $self->{inputfilter},
213             $self->{outputfilter}
214 10         62 ),
215             InputEvent => '_conn_input',
216             ErrorEvent => '_conn_error',
217             FlushedEvent => '_conn_flushed',
218             );
219              
220 10         2767 $self->{_server_info} = [ $peeraddr, $peerport, $sockaddr, $sockport ];
221              
222 10         58 $self->_send_event( $self->{_prefix} . 'connected', $peeraddr, $peerport, $sockaddr, $sockport );
223 10         31 return;
224             }
225              
226             sub _get_filters {
227 10     10   37 my ($client_filter, $client_infilter, $client_outfilter) = @_;
228 10 100 66     85 if (defined $client_infilter or defined $client_outfilter) {
    50          
229             return (
230 1         3 "InputFilter" => _load_filter($client_infilter),
231             "OutputFilter" => _load_filter($client_outfilter)
232             );
233 0 0       0 if (defined $client_filter) {
234 0         0 carp(
235             "Filter ignored with InputFilter or OutputFilter"
236             );
237             }
238             }
239             elsif (defined $client_filter) {
240 0         0 return ( "Filter" => _load_filter($client_filter) );
241             }
242             else {
243 9         43 return ( Filter => POE::Filter::Line->new(), );
244             }
245              
246             }
247              
248             # Get something: either arrayref, ref, or string
249             # Return filter
250             sub _load_filter {
251 2     2   26 my $filter = shift;
252 2 50       6 if (ref ($filter) eq 'ARRAY') {
    50          
253 0         0 my @args = @$filter;
254 0         0 $filter = shift @args;
255 0 0       0 if ( _test_filter($filter) ){
256 0         0 return $filter->new(@args);
257             } else {
258 0         0 return POE::Filter::Line->new(@args);
259             }
260             }
261             elsif (ref $filter) {
262 2         8 return $filter->clone();
263             }
264             else {
265 0 0       0 if ( _test_filter($filter) ) {
266 0         0 return $filter->new();
267             } else {
268 0         0 return POE::Filter::Line->new();
269             }
270             }
271             }
272              
273             # Test if a Filter can be loaded, return sucess or failure
274             sub _test_filter {
275 0     0   0 my $filter = shift;
276 0         0 my $eval = eval {
277 0         0 (my $mod = $filter) =~ s!::!/!g;
278 0         0 require "$mod.pm";
279 0         0 1;
280             };
281 0 0 0     0 if (!$eval and $@) {
282 0         0 carp(
283             "Failed to load [$filter]\n" .
284             "Reason $@\nUsing defualt POE::Filter::Line "
285             );
286 0         0 return 0;
287             }
288 0         0 return 1;
289             }
290              
291             sub _socket_fail {
292 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
293 0 0       0 carp "Wheel $wheel_id generated $operation error $errnum: $errstr\n" if $self->{debug};
294 0         0 $kernel->delay( '_timeout' );
295 0         0 delete $self->{factory};
296 0         0 $self->_send_event( $self->{_prefix} . 'socket_failed', $operation, $errnum, $errstr );
297 0         0 return;
298             }
299              
300             sub disconnect {
301 0     0 1 0 my $self = shift;
302 0         0 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
303             }
304              
305             sub _disconnect {
306 0     0   0 my ($kernel,$self) = @_[KERNEL,OBJECT];
307 0 0       0 return unless $self->{socket};
308 0         0 $self->{_quit} = 1;
309 0         0 return 1;
310             }
311              
312             sub terminate {
313 7     7 1 3802 my $self = shift;
314 7         37 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
315             }
316              
317             sub _terminate {
318 9     9   1738 my ($kernel,$self) = @_[KERNEL,OBJECT];
319 9 50       34 return unless $self->{socket};
320 9 50       58 if ( $^O =~ /(cygwin|MSWin)/ ) {
321 0         0 $self->{socket}->shutdown_input();
322 0         0 $self->{socket}->shutdown_output();
323             }
324 9         50 delete $self->{socket};
325 9         2330 delete $self->{_server_info};
326 9         39 $self->_send_event( $self->{_prefix} . 'disconnected' );
327 9         25 return 1;
328             }
329              
330             sub _conn_input {
331 18     18   19229 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
332 18         64 $self->_send_event( $self->{_prefix} . 'input', $input );
333 18         38 return;
334             }
335              
336             sub _conn_error {
337 1     1   1158 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
338 1 50       4 return unless $self->{socket};
339 1         3 delete $self->{socket};
340 1         158 delete $self->{_server_info};
341 1         4 $self->_send_event( $self->{_prefix} . 'disconnected' );
342 1         2 return;
343             }
344              
345             sub _conn_flushed {
346 18     18   6357 my ($self,$id) = @_[OBJECT,ARG0];
347 18 50       67 return unless $self->{socket};
348 18 100       41 if ( $self->{BUFFER} ) {
349 6         6 my $item = shift @{ $self->{BUFFER} };
  6         13  
350 6 100       12 unless ( $item ) {
351 2         6 delete $self->{BUFFER};
352 2         10 $self->_send_event( $self->{_prefix} . 'flushed' );
353 2         5 return;
354             }
355 4         23 $self->{socket}->put($item);
356 4         248 return;
357             }
358 12 50       25 unless ( $self->{_quit} ) {
359 12         38 $self->_send_event( $self->{_prefix} . 'flushed' );
360 12         24 return;
361             }
362 0         0 delete $self->{socket};
363 0         0 delete $self->{_server_info};
364 0         0 $self->_send_event( $self->{_prefix} . 'disconnected' );
365 0         0 return;
366             }
367              
368             sub _shutdown {
369 12     12   661 my ($kernel,$self) = @_[KERNEL,OBJECT];
370 12         24 delete $self->{factory};
371 12         21 delete $self->{socket};
372 12         30 delete $self->{_server_info};
373 12         51 $kernel->alarm_remove_all();
374 12         500 $kernel->alias_remove( $_ ) for $kernel->alias_list();
375 12 50       376 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
376 12         603 $self->_unregister_sessions();
377 12         370 return;
378             }
379              
380             sub register {
381 1     1 1 293 my ($kernel, $self, $session, $sender, @events) =
382             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
383              
384 1 50       3 unless (@events) {
385 0         0 warn "register: Not enough arguments";
386 0         0 return;
387             }
388              
389 1         3 my $sender_id = $sender->ID();
390              
391 1         4 foreach (@events) {
392 1 50       4 $_ = $self->{_prefix} . $_ unless /^_/;
393 1         4 $self->{events}->{$_}->{$sender_id} = $sender_id;
394 1         2 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
395 1 50 33     6 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
396 1         2 $kernel->refcount_increment($sender_id, __PACKAGE__);
397             }
398             }
399              
400 1         35 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
401 1         63 return;
402             }
403              
404             sub unregister {
405 2     2 1 2430 my ($kernel, $self, $session, $sender, @events) =
406             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
407              
408 2 50       6 unless (@events) {
409 0         0 warn "unregister: Not enough arguments";
410 0         0 return;
411             }
412              
413 2         5 $self->_unregister($session,$sender,@events);
414 2         4 undef;
415             }
416              
417             sub _unregister {
418 2     2   4 my ($self,$session,$sender) = splice @_,0,3;
419 2         6 my $sender_id = $sender->ID();
420              
421 2         6 foreach (@_) {
422 2 50       7 $_ = $self->{_prefix} . $_ unless /^_/;
423 2         5 my $blah = delete $self->{events}->{$_}->{$sender_id};
424 2 50       4 unless ( $blah ) {
425 0         0 warn "$sender_id hasn't registered for '$_' events\n";
426 0         0 next;
427             }
428 2 50       5 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
429 2         4 delete $self->{sessions}->{$sender_id};
430 2 50       4 unless ($session == $sender) {
431 2         6 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
432             }
433             }
434             }
435 2         93 undef;
436             }
437              
438             sub _unregister_sessions {
439 12     12   34 my $self = shift;
440 12         37 my $testd_id = $self->session_id();
441 12         19 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  12         42  
442 10 50       38 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
443 10         24 delete $self->{sessions}->{$session_id};
444 10 50       65 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
445             unless ( $session_id eq $testd_id );
446             }
447             }
448             }
449              
450             sub __send_event {
451 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
452 0         0 $self->_send_event( $event, @args );
453 0         0 return;
454             }
455              
456             #sub send_event {
457             # my $self = shift;
458             # $poe_kernel->post( $self->{session_id}, '__send_event', @_ );
459             #}
460              
461             sub _send_event {
462 52     52   78 my $self = shift;
463 52         90 my ($event, @args) = @_;
464 52         64 my $kernel = $POE::Kernel::poe_kernel;
465 52         62 my %sessions;
466              
467 52         58 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  52         126  
  52         184  
468              
469 52         156 $kernel->post( $_ => $event => @args ) for values %sessions;
470 52         4199 undef;
471             }
472              
473             sub send_to_server {
474 1     1 1 502 my $self = shift;
475 1         4 $poe_kernel->call( $self->{session_id}, '_send_to_server', @_ );
476             }
477              
478             sub _send_to_server {
479 14     14   10201 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
480 14 50       50 return unless $self->{socket};
481 14 50       30 return unless $output;
482              
483 14 100       41 if ( ref $output eq 'ARRAY' ) {
484 2         3 my $first = shift @{ $output };
  2         5  
485 2 50       2 $self->{BUFFER} = $output if scalar @{ $output };
  2         12  
486 2 50       15 $self->{socket}->put($first) if defined $first;
487 2         165 return 1;
488             }
489              
490 12         51 $self->{socket}->put($output);
491 12         827 return 1;
492             }
493              
494             q{Putting the test into POE};
495              
496             __END__