File Coverage

blib/lib/AnyEvent/MPRPC/Client.pm
Criterion Covered Total %
statement 72 81 88.8
branch 16 24 66.6
condition 3 3 100.0
subroutine 17 18 94.4
pod 2 2 100.0
total 110 128 85.9


line stmt bran cond sub pod time code
1             package AnyEvent::MPRPC::Client;
2 3     3   17 use strict;
  3         7  
  3         102  
3 3     3   16 use warnings;
  3         6  
  3         109  
4 3     3   15 use Any::Moose;
  3         5  
  3         20  
5              
6 3     3   1575 use Carp;
  3         6  
  3         266  
7 3     3   17 use Scalar::Util 'weaken';
  3         6  
  3         126  
8              
9 3     3   16 use AnyEvent;
  3         4  
  3         94  
10 3     3   26 use AnyEvent::Socket;
  3         4  
  3         372  
11 3     3   16 use AnyEvent::Handle;
  3         4  
  3         71  
12 3     3   15 use AnyEvent::MessagePack;
  3         5  
  3         58  
13 3     3   16 use AnyEvent::MPRPC::Constant;
  3         5  
  3         20  
14              
15             has host => (
16             is => 'ro',
17             isa => 'Str',
18             required => 1,
19             );
20              
21             has port => (
22             is => 'ro',
23             isa => 'Int|Str',
24             required => 1,
25             );
26              
27             has connect_timeout => (
28             is => 'ro',
29             isa => 'Int|Str',
30             );
31              
32             has handler => (
33             is => 'rw',
34             isa => 'AnyEvent::Handle',
35             );
36              
37             has on_error => (
38             is => 'rw',
39             isa => 'CodeRef',
40             lazy => 1,
41             default => sub {
42             return sub {
43             my ($handle, $fatal, $message) = @_;
44             croak sprintf "Client got error: %s", $message;
45             };
46             },
47             );
48              
49             has handler_options => (
50             is => 'ro',
51             isa => 'HashRef',
52             default => sub { {} },
53             );
54              
55             has _request_pool => (
56             is => 'ro',
57             isa => 'ArrayRef',
58             lazy => 1,
59             default => sub { [] },
60             );
61              
62             has _next_id => (
63             is => 'ro',
64             isa => 'CodeRef',
65             lazy => 1,
66             default => sub {
67             my $id = 0;
68             sub { ++$id };
69             },
70             );
71              
72             has _callbacks => (
73             is => 'ro',
74             isa => 'HashRef',
75             lazy => 1,
76             default => sub { +{} },
77             );
78              
79             has _connection_guard => (
80             is => 'rw',
81             isa => 'Object',
82             );
83              
84             has 'before_connect' => (
85             is => 'ro',
86             isa => 'CodeRef',
87             );
88              
89             has 'after_connect' => (
90             is => 'ro',
91             isa => 'CodeRef',
92             );
93              
94             # depreciated!
95             has 'on_connect' => (
96             is => 'ro',
97             isa => 'CodeRef',
98             );
99              
100 3     3   18 no Any::Moose;
  3         3  
  3         18  
101              
102             sub BUILD {
103 2     2 1 12 my $self = shift;
104              
105 2 100   0   41 my $after_connect = $self->after_connect ? sub { $self->after_connect->($self, @_) } : undef;
  0         0  
106             my $guard = tcp_connect $self->host, $self->port, sub {
107 2 50   2   5527 my $fh = shift
108             or return
109             $self->on_error->(
110             undef, 1,
111             "Failed to connect $self->{host}:$self->{port}: $!",
112             );
113 2         6 my($host, $port, $retry) = @_;
114 2 100       23 $self->after_connect
115             and $self->after_connect->($self, $fh, $host, $port, $retry);
116              
117             my $handle = AnyEvent::Handle->new(
118             on_error => sub {
119 0         0 my ($h, $fatal, $msg) = @_;
120 0         0 $self->on_error->(@_);
121 0         0 $h->destroy;
122             },
123 2         24 %{ $self->handler_options },
  2         51  
124             fh => $fh,
125             );
126              
127 2         566 $handle->unshift_read(msgpack => $self->_handle_response_cb);
128              
129 2         465 while (my $pooled = shift @{ $self->_request_pool }) {
  3         390  
130 1         26 $handle->push_write( msgpack => $pooled );
131             }
132              
133 2         426 $self->handler( $handle );
134             }, sub {
135 2     2   17960 my $connect_timeout;
136              
137 2 100       30 $self->before_connect
138             and $self->before_connect->($self, @_);
139              
140             # on_conect is depreciated!
141 2 50       42 $self->on_connect
142             and $connect_timeout = $self->on_connect->($self, @_);
143              
144             # For backward compatibility, if connect_timeout option isn't specifed
145             # use return value of on_connect callback as connect timeout seconds.
146 2 50       15 $self->connect_timeout
147             and $connect_timeout = $self->connect_timeout;
148              
149 2         10 return $connect_timeout;
150 2         120 };
151 2         1716 weaken $self;
152              
153 2         56 $self->_connection_guard($guard);
154             }
155              
156             sub call {
157 4     4 1 5016 my ($self, $method) = (shift, shift);
158 4 100 100     61 my $param = (@_ == 1 && ref $_[0] eq "ARRAY") ? $_[0] : [@_];
159              
160 4         33 my $msgid = $self->_next_id->();
161              
162 4         14 my $request = [
163             MP_TYPE_REQUEST,
164             int($msgid), # should be IV
165             $method,
166             $param,
167             ];
168              
169 4 100       18 if ($self->handler) {
170 3         24 $self->handler->push_write( msgpack => $request );
171             }
172             else {
173 1         3 push @{ $self->_request_pool }, $request;
  1         13  
174             }
175              
176             # $msgid is stringified, but $request->{MP_RES_MSGID] is still IV
177 4         2834 $self->_callbacks->{ $msgid } = AnyEvent->condvar;
178             }
179              
180             sub _handle_response_cb {
181 6     6   15 my $self = shift;
182              
183 6         20 weaken $self;
184              
185             return sub {
186 4 50   4   1296 $self || return;
187              
188 4         7 my ($handle, $res) = @_;
189              
190 4         22 my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };
191              
192 4 50       17 if (my $error = $res->[MP_RES_ERROR]) {
193 0 0       0 if ($d) {
194 0         0 $d->croak($error);
195             } else {
196 0         0 Carp::croak($error);
197             }
198             }
199              
200 4         84 $handle->unshift_read(msgpack => $self->_handle_response_cb);
201              
202 4 50       120 if ($d) {
203 4         20 $d->send($res->[MP_RES_RESULT]);
204             } else {
205 0           warn q/Invalid response from server/;
206 0           return;
207             }
208 6         100 };
209             }
210              
211             __PACKAGE__->meta->make_immutable;
212              
213             __END__