File Coverage

blib/lib/MR/IProto/Server/Connection.pm
Criterion Covered Total %
statement 12 81 14.8
branch 0 30 0.0
condition n/a
subroutine 4 16 25.0
pod 2 3 66.6
total 18 130 13.8


line stmt bran cond sub pod time code
1             package MR::IProto::Server::Connection;
2              
3             =head1 NAME
4              
5             =head1 DESCRIPTION
6              
7             =cut
8              
9 1     1   7 use Mouse;
  1         2  
  1         10  
10 1     1   426 use AnyEvent::DNS;
  1         2  
  1         29  
11 1     1   5 use Scalar::Util qw/weaken/;
  1         2  
  1         1051  
12              
13             with 'MR::IProto::Role::Debuggable';
14              
15             has handler => (
16             is => 'ro',
17             isa => 'CodeRef',
18             required => 1,
19             );
20              
21             has on_accept => (
22             is => 'ro',
23             isa => 'CodeRef',
24             );
25              
26             has on_close => (
27             is => 'ro',
28             isa => 'CodeRef',
29             );
30              
31             has on_error => (
32             is => 'ro',
33             isa => 'CodeRef',
34             );
35              
36             has fh => (
37             is => 'ro',
38             isa => 'FileHandle',
39             required => 1,
40             );
41              
42             has host => (
43             is => 'ro',
44             isa => 'Str',
45             required => 1,
46             );
47              
48             has port => (
49             is => 'ro',
50             isa => 'Int',
51             required => 1,
52             );
53              
54             has hostname => (
55             is => 'ro',
56             isa => 'Str',
57             writer => '_hostname',
58             lazy_build => 1,
59             );
60              
61             has _handle => (
62             is => 'ro',
63             isa => 'AnyEvent::Handle',
64             lazy_build => 1,
65             );
66              
67             has _recv_header => (
68             is => 'ro',
69             isa => 'CodeRef',
70             lazy_build => 1,
71             );
72              
73             sub BUILD {
74 0     0 1   my ($self) = @_;
75 0 0         $self->_debug(sprintf "Connection accepted") if $self->debug >= 1;
76 0           weaken($self);
77             AnyEvent::DNS::reverse_verify $self->host, sub {
78 0     0     my ($hostname) = @_;
79 0 0         if ($hostname) {
80 0           $self->_hostname($hostname);
81 0 0         $self->_debug(sprintf "%s resolved as %s", $self->host, $hostname) if $self->debug >= 4;
82             } else {
83 0           $self->_hostname($self->host);
84 0           $self->_debug(sprintf "Can't resolve %s", $self->host);
85             }
86 0           $self->_handle;
87 0 0         $self->on_accept->($self) if $self->on_accept;
88 0           return;
89 0           };
90 0           return;
91             }
92              
93             sub DEMOLISH {
94 0     0 1   my ($self) = @_;
95 0 0         $self->_debug(sprintf "Object for connection was destroyed\n") if $self->debug >= 1;
96 0           return;
97             }
98              
99             sub close {
100 0     0 0   my ($self) = @_;
101 0 0         $self->on_close->($self) if $self->on_close;
102 0           return;
103             }
104              
105             sub _build_hostname {
106 0     0     my ($self) = @_;
107 0           return $self->host;
108             }
109              
110             sub _build__handle {
111 0     0     my ($self) = @_;
112 0           weaken($self);
113 0           my $peername = join ':', $self->host, $self->port;
114             return AnyEvent::Handle->new(
115             fh => $self->fh,
116             peername => $peername,
117             on_read => sub {
118 0     0     my ($handle) = @_;
119 0           $handle->unshift_read( chunk => 12, $self->_recv_header );
120 0           return;
121             },
122             on_eof => sub {
123 0     0     my ($handle) = @_;
124 0 0         $self->_debug("Connection closed by foreign host\n") if $self->debug >= 1;
125 0           $handle->destroy();
126 0 0         $self->on_close->($self) if $self->on_close;
127 0           return;
128             },
129             on_error => sub {
130 0     0     my ($handle, $fatal, $message) = @_;
131 0           $handle->destroy();
132 0 0         if ($self->on_error) {
133 0 0         $self->_debug("error: $message\n") if $self->debug >= 1;
134 0           $self->on_error->($self, $message);
135             } else {
136 0           $self->_debug("error: $message\n");
137             }
138 0 0         $self->on_close->($self) if $self->on_close;
139 0           return;
140             }
141 0           );
142 0           return;
143             }
144              
145             sub _build__recv_header {
146 0     0     my ($self) = @_;
147 0           weaken($self);
148 0           my $handler = $self->handler;
149             return sub {
150 0     0     my ($handle, $data) = @_;
151 0 0         $self->_debug_dump('recv header: ', $data) if $self->debug >= 5;
152 0           my ($cmd, $length, $sync) = unpack 'L3', $data;
153             $handle->unshift_read(
154             chunk => $length,
155             sub {
156 0           my ($handle, $data) = @_;
157 0 0         $self->_debug_dump('recv payload: ', $data) if $self->debug >= 5;
158 0           my $result;
159 0 0         if (eval { $result = $handler->($self, $cmd, $data); 1 }) {
  0            
  0            
160 0           my $header = pack 'L3', $cmd, length $result, $sync;
161 0 0         if ($self->debug >= 6) {
162 0           $self->_debug_dump('send header: ', $header);
163 0           $self->_debug_dump('send payload: ', $result);
164             }
165 0           $handle->push_write($header . $result);
166             } else {
167 0           warn $@;
168 0           $self->_debug("Failed to handle cmd=$cmd\n");
169             }
170 0           return;
171             }
172 0           );
173 0           return;
174 0           };
175             }
176              
177             sub _debug {
178 0     0     my ($self, $msg) = @_;
179 0           $self->debug_cb->( sprintf "%s(%s:%d): %s", $self->hostname, $self->host, $self->port, $msg );
180 0           return;
181             }
182              
183 1     1   6 no Mouse;
  1         2  
  1         6  
184             __PACKAGE__->meta->make_immutable();
185              
186             1;