File Coverage

blib/lib/AnyEvent/MPRPC/Server.pm
Criterion Covered Total %
statement 33 78 42.3
branch 0 16 0.0
condition 0 5 0.0
subroutine 11 16 68.7
pod 2 2 100.0
total 46 117 39.3


line stmt bran cond sub pod time code
1             package AnyEvent::MPRPC::Server;
2 3     3   20 use strict;
  3         5  
  3         92  
3 3     3   14 use warnings;
  3         5  
  3         63  
4 3     3   2779 use Any::Moose;
  3         118792  
  3         21  
5              
6 3     3   1818 use Carp;
  3         7  
  3         229  
7 3     3   16 use Scalar::Util 'weaken';
  3         6  
  3         180  
8              
9 3     3   4422 use AnyEvent::Handle;
  3         75099  
  3         128  
10 3     3   4076 use AnyEvent::Socket;
  3         56505  
  3         489  
11 3     3   1878 use AnyEvent::MPRPC::CondVar;
  3         268  
  3         93  
12 3     3   2594 use AnyEvent::MessagePack;
  3         12592  
  3         97  
13              
14 3     3   1778 use AnyEvent::MPRPC::Constant;
  3         9  
  3         20  
15              
16             has address => (
17             is => 'ro',
18             isa => 'Maybe[Str]',
19             default => undef,
20             );
21              
22             has port => (
23             is => 'ro',
24             isa => 'Int|Str',
25             required => 1,
26             );
27              
28             has server => (
29             is => 'rw',
30             isa => 'Object',
31             );
32              
33             has on_error => (
34             is => 'rw',
35             isa => 'CodeRef',
36             lazy => 1,
37             default => sub {
38             return sub {
39             my ($handle, $fatal, $message) = @_;
40             carp sprintf "Server got error: %s", $message;
41             };
42             },
43             );
44              
45             has on_eof => (
46             is => 'rw',
47             isa => 'CodeRef',
48             lazy => 1,
49             default => sub {
50             return sub { };
51             },
52             );
53              
54             has on_accept => (
55             is => 'rw',
56             isa => 'CodeRef',
57             lazy => 1,
58             default => sub {
59             return sub { };
60             },
61             );
62              
63             has on_dispatch => (
64             is => 'rw',
65             isa => 'CodeRef',
66             lazy => 1,
67             default => sub {
68             return sub { };
69             },
70             );
71              
72             has handler_options => (
73             is => 'ro',
74             isa => 'HashRef',
75             default => sub { {} },
76             );
77              
78             has _handlers => (
79             is => 'ro',
80             isa => 'ArrayRef',
81             default => sub { [] },
82             );
83              
84             has _callbacks => (
85             is => 'ro',
86             isa => 'HashRef',
87             lazy => 1,
88             default => sub { {} },
89             );
90              
91 3     3   21 no Any::Moose;
  3         4  
  3         27  
92              
93             sub BUILD {
94 0     0 1   my $self = shift;
95              
96             $self->server(tcp_server $self->address, $self->port, sub {
97 0     0     $self->on_accept->(@_);
98              
99 0           my ($fh, $host, $port) = @_;
100 0           my $indicator = "$host:$port";
101              
102             my $handle = AnyEvent::Handle->new(
103             on_error => sub {
104 0           my ($h, $fatal, $msg) = @_;
105 0           $self->on_error->(@_);
106 0           $h->destroy;
107             },
108             on_eof => sub {
109 0           my ($h) = @_;
110             # client disconnected
111 0           $self->on_eof->(@_);
112 0           $h->destroy;
113             },
114 0           %{ $self->handler_options },
  0            
115             fh => $fh,
116             );
117              
118 0           $handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));
119              
120 0           $self->_handlers->[ fileno($fh) ] = $handle;
121 0 0         }) unless defined $self->server;
122 0           weaken $self;
123              
124 0           $self;
125             }
126              
127             sub reg_cb {
128 0     0 1   my ($self, %callbacks) = @_;
129              
130 0           while (my ($method, $callback) = each %callbacks) {
131 0           $self->_callbacks->{ $method } = $callback;
132             }
133             }
134              
135             sub _dispatch_cb {
136 0     0     my ($self, $indicator) = @_;
137              
138 0           weaken $self;
139              
140             return sub {
141 0 0   0     $self || return;
142              
143 0           my ($handle, $request) = @_;
144 0           $self->on_dispatch->($indicator, $handle, $request);
145 0 0         return if $handle->destroyed;
146              
147 0           $handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));
148              
149 0 0 0       return unless $request and ref $request eq 'ARRAY';
150              
151 0           my $target = $self->_callbacks->{ $request->[MP_REQ_METHOD] };
152              
153 0           my $id = $request->[MP_REQ_MSGID];
154 0           $indicator = "$indicator:$id";
155              
156             my $res_cb = sub {
157 0           my $type = shift;
158 0 0         my $result = @_ > 1 ? \@_ : $_[0];
159              
160 0 0         $handle->push_write( msgpack => [
    0          
    0          
161             MP_TYPE_RESPONSE,
162             int($id), # should be IV.
163             $type eq 'error' ? $result : undef,
164             $type eq 'result' ? $result : undef,
165             ]) if $handle;
166 0           };
167 0           weaken $handle;
168              
169 0           my $cv = AnyEvent::MPRPC::CondVar->new;
170             $cv->_cb(
171 0           sub { $res_cb->( result => $_[0]->recv ) },
172 0           sub { $res_cb->( error => $_[0]->recv ) },
173 0           );
174              
175 0   0       $target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) };
  0            
  0            
176 0           $target->( $cv, $request->[MP_REQ_PARAMS] );
177 0           };
178             }
179              
180             __PACKAGE__->meta->make_immutable;
181              
182             __END__