File Coverage

blib/lib/AnyEvent/Gearman/Connection.pm
Criterion Covered Total %
statement 67 77 87.0
branch 9 16 56.2
condition 1 3 33.3
subroutine 14 14 100.0
pod 7 7 100.0
total 98 117 83.7


line stmt bran cond sub pod time code
1             package AnyEvent::Gearman::Connection;
2 6     6   6619 use Any::Moose;
  6         17  
  6         51  
3 6     6   4307 use Scalar::Util 'weaken';
  6         14  
  6         369  
4              
5 6     6   9747 use AnyEvent::Socket;
  6         108802  
  6         1273  
6 6     6   10479 use AnyEvent::Handle;
  6         105381  
  6         1597  
7              
8             has hostspec => (
9             is => 'ro',
10             isa => 'Str',
11             required => 1,
12             );
13              
14             has _host => (
15             is => 'rw',
16             isa => 'Str',
17             );
18              
19             has _port => (
20             is => 'rw',
21             isa => 'Int | Str',
22             );
23              
24             has context => (
25             is => 'rw',
26             isa => 'Object',
27             weak_ref => 1,
28             );
29              
30             has handler => (
31             is => 'rw',
32             isa => 'Maybe[AnyEvent::Handle]',
33             clearer => 'clear_handler',
34             );
35              
36             has on_connect_callbacks => (
37             is => 'rw',
38             isa => 'ArrayRef',
39             default => sub { [] },
40             );
41              
42             has dead_time => (
43             is => 'rw',
44             isa => 'Int',
45             default => 0,
46             );
47              
48             has _need_handle => (
49             is => 'rw',
50             isa => 'ArrayRef',
51             default => sub { [] },
52             );
53              
54             has _job_handles => (
55             is => 'rw',
56             isa => 'HashRef',
57             default => sub { {} },
58             );
59              
60             has _con_guard => (
61             is => 'rw',
62             isa => 'Object',
63             );
64              
65 6     6   115 no Any::Moose;
  6         14  
  6         66  
66              
67             sub BUILD {
68 7     7 1 26 my $self = shift;
69              
70             # parse hostspec
71 7         539 my ($host, $service) = parse_hostport $self->hostspec;
72 7 100       688 unless (defined $host) {
73 2         8 $host = $self->hostspec;
74 2         4 $service = 4730;
75             }
76              
77 7 50 33     256 unless (defined($host) && defined($service)) {
78 0         0 die sprintf('Failed to parse hostspec: "%s"', $self->hostspec);
79             }
80              
81 7         102 $self->_host( $host );
82 7         500 $self->_port( $service );
83             }
84              
85             sub connect {
86 10     10 1 41 my ($self) = @_;
87              
88             # already connected
89 10 50       69 return if $self->handler;
90              
91             my $g = tcp_connect $self->_host, $self->_port, sub {
92 6     6   939 my ($fh) = @_;
93              
94 6 50       27 if ($fh) {
95             my $handle = AnyEvent::Handle->new(
96             fh => $fh,
97 26         203898 on_read => sub { $self->process_packet },
98             on_error => sub {
99 1         18 my @undone = @{ $self->_need_handle },
  1         30  
100 1         254 values %{ $self->_job_handles };
101 1         22 $_->event('on_fail') for @undone;
102              
103 1         223 $self->_need_handle([]);
104 1         14 $self->_job_handles({});
105 1         45 $self->mark_dead;
106             },
107 6         373 );
108 6         1371 $self->handler( $handle );
109 6         26 $_->() for map { $_->[0] } @{ $self->on_connect_callbacks };
  7         50  
  6         32  
110             }
111             else {
112 0         0 warn sprintf("Connection failed: %s", $!);
113 0         0 $self->mark_dead;
114 0         0 $_->() for map { $_->[1] } @{ $self->on_connect_callbacks };
  0         0  
  0         0  
115             }
116              
117 6         356 $self->on_connect_callbacks( [] );
118 10         1863 };
119              
120 10         48731 weaken $self;
121 10         288 $self->_con_guard($g);
122              
123 10         128 $self;
124             }
125              
126             sub connected {
127 28     28 1 266 !!shift->handler;
128             }
129              
130             sub add_on_ready {
131 27     27 1 84 my ($self, $cb, $eb) = @_;
132              
133 27 100       230 if ($self->connected) {
134 17         376 $cb->();
135             }
136             else {
137 10         39 push @{ $self->on_connect_callbacks }, [ $cb, $eb ];
  10         106  
138 10         128 $self->connect;
139             }
140             }
141              
142             sub mark_dead {
143 1     1 1 11 my ($self) = @_;
144 1         18 $self->dead_time( time + 10 );
145 1         67 $self->clear_handler;
146             }
147              
148             sub alive {
149 12     12 1 49 my ($self) = @_;
150 12         212 $self->dead_time <= time;
151             }
152              
153             sub process_packet {
154 26     26 1 74 my $self = shift;
155 26         157 my $handle = $self->handler;
156              
157             $handle->unshift_read(chunk => 4, sub {
158 26 50   26   725 unless ($_[1] eq "\0RES") {
159 0         0 die qq[invalid packet: $_[1]"];
160             }
161              
162             $_[0]->unshift_read( chunk => 8, sub {
163 26         2077 my ($type, $len) = unpack 'NN', $_[1];
164 26         459 my $packet_handler = $self->can("process_packet_$type");
165              
166 26 50       104 unless ($packet_handler) {
167             # Ignore unimplement packet
168 0 0       0 $_[0]->unshift_read( chunk => $len, sub {} ) if $len;
  0         0  
169 0         0 return;
170             }
171              
172 26         513 $packet_handler->( $self, $len );
173 26         288 });
174 26         535 });
175 26         1251 weaken $self;
176             }
177              
178             __PACKAGE__->meta->make_immutable;
179              
180             __END__