File Coverage

lib/AnyEvent/Connection.pm
Criterion Covered Total %
statement 91 128 71.0
branch 17 40 42.5
condition 7 12 58.3
subroutine 21 31 67.7
pod 7 9 77.7
total 143 220 65.0


line stmt bran cond sub pod time code
1             package AnyEvent::Connection;
2              
3 2     2   94366 use common::sense 2;m{
  2         61  
  2         19  
4             use strict;
5             use warnings;
6             };
7 2     2   2829 use Object::Event 1.21;
  2         162032  
  2         82  
8 2     2   29 use base 'Object::Event';
  2         10  
  2         218  
9              
10 2     2   14 use AnyEvent 5;
  2         39  
  2         48  
11 2     2   2481 use AnyEvent::Socket;
  2         83922  
  2         597  
12              
13 2     2   34 use Carp;
  2         4  
  2         176  
14              
15 2     2   14 use Scalar::Util qw(weaken);
  2         4  
  2         223  
16 2     2   2097 use AnyEvent::Connection::Raw;
  2         7  
  2         103  
17 2     2   23 use AnyEvent::Connection::Util;
  2         5  
  2         14  
18             # @rewrite s/^# //; # Development hacks, see L
19             # use Devel::Leak::Cb;
20              
21             =head1 NAME
22              
23             AnyEvent::Connection - Base class for tcp connectful clients
24              
25             =cut
26              
27             our $VERSION = '0.06';
28              
29             =head1 SYNOPSIS
30              
31             package MyTCPClient;
32             use base 'AnyEvent::Connection';
33              
34             package main;
35             my $client = MyTCPClient->new(
36             host => 'localhost',
37             port => 12345,
38             );
39             $client->reg_cb(
40             connected => sub {
41             my ($client,$connection,$host,$port) = @_;
42             # ...
43             $client->after(
44             $interval, sub {
45             # Called after interval, if connection still alive
46             }
47             );
48             }
49             connfail = sub {
50             my ($client,$reason) = @_;
51             # ...
52             },
53             disconnect => sub {
54             my ($client,$reason) = @_;
55             },
56             error => sub {
57             my ($client,$error) = @_;
58             # Called in error conditions for callbackless methods
59             },
60             );
61             $client->connect;
62              
63             =head1 EVENTS
64              
65             =over 4
66              
67             =item connected ($connobject, $host, $port)
68              
69             Called when client get connected.
70              
71             =item connfail
72              
73             Called, when client fails to connect
74              
75             =item disconnect
76              
77             Called whenever client disconnects
78              
79             =item error
80              
81             Called in error conditions for callbackless methods (for ex: when calling push_write on non-connected client)
82              
83             =back
84              
85             =head1 OPTIONS
86              
87             =over 4
88              
89             =item host
90              
91             Host to connect to
92              
93             =item port
94              
95             Port to connect to
96              
97             =item timeout [ = 3 ]
98              
99             Connect/read/write timeout in seconds
100              
101             =item reconnect [ = 1 ]
102              
103             If true, automatically reconnect after disconnect/connfail after delay $reconnect seconds
104              
105             =item rawcon [ = AnyEvent::Connection::Raw ]
106              
107             Class that implements low-level connection
108              
109             =back
110              
111             =head1 OPERATION METHODS
112              
113             =over 4
114              
115             =item new
116              
117             Cleates connection object (see OPTIONS)
118              
119             =item connect
120              
121             Begin connection
122              
123             =item disconnect ($reason)
124              
125             Close current connection. reason is optional
126              
127             =item reconnect
128              
129              
130             Close current connection and establish a new one
131              
132             =item after($interval, $cb->())
133              
134             Helper method. AE::timer(after), associated with current connection
135              
136             Will be destroyed if connection is destroyed, so no timer invocation after connection destruction.
137              
138             =item periodic($interval, $cb->())
139              
140             Helper method. AE::timer(periodic), associated with current connection
141              
142             Will be destroyed if connection is destroyed, so no timer invocation after connection destruction.
143              
144             =item periodic_stop()
145              
146             If called within periodic callback, periodic will be stopped.
147              
148             my $count;
149             $client->periodic(1,sub {
150             $client->periodic_stop if ++$count > 10;
151             });
152            
153             # callback will be called only 10 times;
154              
155             =item destroy
156              
157             Close connection, destroy all associated objects and timers, clean self
158              
159             =back
160              
161             =head1 CONNECT METHODS
162              
163             When connected, there are some methods, that proxied to raw connection or to AE::Handle
164              
165              
166             =over 4
167              
168             =item push_write
169              
170             See AE::Handle::push_write
171              
172             =item push_read
173              
174             See AE::Handle::push_read
175              
176             =item unshift_read
177              
178             See AE::Handle::unshift_read
179              
180             =item say
181              
182             Same as push_write + newline
183              
184             =item reply
185              
186             Same as push_write + newline
187              
188             =back
189              
190             For next methods there is a feature.
191             Callback will be called in any way, either by successful processing or by error or object destruction
192              
193             =over 4
194              
195             =item recv($bytes, %args, cb => $cb->())
196              
197             Similar to
198              
199             $fh->push_read(chunk => $bytes, $cb->());
200              
201             =item command($data, %args, cb => $cb->());
202              
203             Similar to
204              
205             $fh->push_write($data);
206             $fh->push_read(line => $cb->());
207              
208             =back
209              
210             =cut
211              
212             sub new {
213 1     1 1 7420 my $self = shift->SUPER::new(@_);
214 1         1313 $self->init(@_);
215 1         3 return $self;
216             }
217              
218             sub init {
219 1     1 0 11 my $self = shift;
220 1   50     40 $self->{debug} ||= 0;
221 1         2 $self->{connected} = 0;
222 1         8 $self->{connecting} = 0;
223 1 50       55 $self->{reconnect} = 1 unless defined $self->{reconnect};
224 1   50     34 $self->{timeout} ||= 3;
225 1         13 $self->{timers} = {};
226 1   50     20 $self->{rawcon} ||= 'AnyEvent::Connection::Raw';
227             #warn "Init $self";
228             }
229              
230             #sub connected {
231             # warn "Connected";
232             # shift->event(connected => ());
233             #}
234              
235             sub connect {
236 5     5 1 4084 my $self = shift;
237 5 100       37 $self->{connecting} and return;
238 4         10 $self->{connecting} = 1;
239 4         36 weaken $self;
240 4 50 66     47 croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
241 4         24 $self->{type} = 'client';
242            
243 4 50       21 warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
244             # @rewrite s/sub {/cb connect {/;
245             $self->{_}{con}{cb} = sub {
246 4     4   5265 pop;
247 4         69 delete $self->{_}{con};
248 4 50       52 if (my $fh = shift) {
249 4 50       28 warn "Connected @_" if $self->{debug};
250 4         91 $self->{con} = $self->{rawcon}->new(
251             fh => $fh,
252             timeout => $self->{timeout},
253             debug => $self->{debug},
254             );
255             $self->{con}->reg_cb(
256             disconnect => sub {
257 1 50       41 warn "Disconnected $self->{host}:$self->{port} @_" if $self->{debug};
258 1         8 $self->disconnect(@_);
259 1         58 $self->_reconnect_after();
260             },
261 4         76 );
262 4         490 $self->{connected} = 1;
263             #warn "Send connected event";
264 4         42 $self->event(connected => $self->{con}, @_);
265             } else {
266 0 0       0 warn "Not connected $self->{host}:$self->{port}: $!" if $self->{debug};
267 0         0 $self->event(connfail => "$!");
268 0         0 $self->_reconnect_after();
269             }
270 4         66 };
271 4     4   37 $self->{_}{con}{pre} = sub { $self->{timeout} };
  4         1810  
272 4         56 $self->{_}{con}{grd} =
273             AnyEvent::Socket::tcp_connect
274             $self->{host}, $self->{port},
275             $self->{_}{con}{cb}, $self->{_}{con}{pre}
276             ;
277             }
278              
279             sub accept {
280 0     0 0 0 croak "Not implemented yet";
281             }
282              
283              
284             sub _reconnect_after {
285 1     1   7 weaken( my $self = shift );
286 1 50       7 $self->{reconnect} or return $self->{connecting} = 0;
287             $self->{timers}{reconnect} = AnyEvent->timer(
288             after => $self->{reconnect},
289             cb => sub {
290 1 50   1   91046 $self or return;
291 1         15 delete $self->{timers}{reconnect};
292 1         5 $self->{connecting} = 0;
293 1         10 $self->connect;
294             }
295 1         46 );
296             }
297              
298             sub periodic_stop;
299             sub periodic {
300 0     0 1 0 weaken( my $self = shift );
301 0         0 my $interval = shift;
302 0         0 my $cb = shift;
303             #warn "Create periodic $interval";
304             $self->{timers}{int $cb} = AnyEvent->timer(
305             after => $interval,
306             interval => $interval,
307             cb => sub {
308             local *periodic_stop = sub {
309 0         0 warn "Stopping periodic ".int $cb;
310 0         0 delete $self->{timers}{int $cb}; undef $cb
  0         0  
311 0     0   0 };
312 0 0       0 $self or return;
313 0         0 $cb->();
314             },
315 0         0 );
316             defined wantarray and return AnyEvent::Util::guard(sub {
317 0     0   0 delete $self->{timers}{int $cb};
318 0         0 undef $cb;
319 0 0       0 });
320 0         0 return;
321             }
322              
323             sub after {
324 0     0 1 0 weaken( my $self = shift );
325 0         0 my $interval = shift;
326 0         0 my $cb = shift;
327             #warn "Create after $interval";
328             $self->{timers}{int $cb} = AnyEvent->timer(
329             after => $interval,
330             cb => sub {
331 0 0   0   0 $self or return;
332 0         0 delete $self->{timers}{int $cb};
333 0         0 $cb->();
334 0         0 undef $cb;
335             },
336 0         0 );
337             defined wantarray and return AnyEvent::Util::guard(sub {
338 0     0   0 delete $self->{timers}{int $cb};
339 0         0 undef $cb;
340 0 0       0 });
341 0         0 return;
342             }
343              
344             sub reconnect {
345 1     1 1 7916 my $self = shift;
346 1         15 $self->disconnect;
347 1         7 $self->connect;
348             }
349              
350             sub disconnect {
351 5     5 1 15287 my $self = shift;
352             #$self->{con} or return;
353             #warn "Disconnecting $self->{connected} || $self->{connecting} || $self->{reconnect} by @{[ (caller)[1,2] ]}";
354 5 50       35 ref $self->{con} eq 'HASH' and warn dumper($self->{con});
355 5 100       19 $self->{con} and eval{ $self->{con}->close; };
  4         30  
356 5 50       18 warn if $@;
357 5         16 delete $self->{con};
358 5   66     34 my $wascon = $self->{connected} || $self->{connecting};
359 5         12 $self->{connected} = 0;
360 5         13 $self->{connecting} = 0;
361             #$self->{reconnect} = 0;
362 5         16 delete $self->{timers}{reconnect};
363 5 100       53 $self->event('disconnect',@_) if $wascon;
364 5         11577 return;
365             }
366              
367 0     0   0 sub AnyEvent::Connection::destroyed::AUTOLOAD {}
368              
369             sub destroy {
370 0     0 1 0 my ($self) = @_;
371 0         0 $self->DESTROY;
372 0         0 bless $self, "AnyEvent::Connection::destroyed";
373             }
374              
375             sub DESTROY {
376 1     1   267 my $self = shift;
377 1 50       8 warn "(".int($self).") Destroying AE::CNN" if $self->{debug};
378 1         5 $self->disconnect;
379 1         63 %$self = ();
380             }
381              
382             BEGIN {
383 2     2   18 no strict 'refs';
  2         9  
  2         323  
384 2     2   6 for my $m (qw(push_write push_read unshift_read say reply recv command want_command)) {
385             *$m = sub {
386 0     0   0 my $self = shift;
387 0 0       0 $self->{connected} or return $self->event( error => "Not connected for $m" );
388 0         0 $self->{con}->$m(@_);
389 16         208 };
390             }
391             }
392              
393             =head1 AUTHOR
394              
395             Mons Anderson, C<< >>
396              
397             =head1 BUGS
398              
399             Please report any bugs or feature requests to C, or through
400             the web interface at L. I will be notified, and then you'll
401             automatically be notified of progress on your bug as I make changes.
402              
403              
404              
405              
406             =head1 SUPPORT
407              
408             You can find documentation for this module with the perldoc command.
409              
410             perldoc AnyEvent::Connection
411              
412              
413             You can also look for information at:
414              
415             =over 4
416              
417             =item * RT: CPAN's request tracker
418              
419             L
420              
421             =item * AnnoCPAN: Annotated CPAN documentation
422              
423             L
424              
425             =item * CPAN Ratings
426              
427             L
428              
429             =item * Search CPAN
430              
431             L
432              
433             =back
434              
435              
436             =head1 ACKNOWLEDGEMENTS
437              
438              
439             =head1 COPYRIGHT & LICENSE
440              
441             Copyright 2009 Mons Anderson, all rights reserved.
442              
443             This program is free software; you can redistribute it and/or modify it
444             under the same terms as Perl itself.
445              
446              
447             =cut
448              
449             1; # End of AnyEvent::Connection