File Coverage

blib/lib/Tak/ConnectionReceiver.pm
Criterion Covered Total %
statement 12 35 34.2
branch 0 4 0.0
condition n/a
subroutine 4 14 28.5
pod 0 7 0.0
total 16 60 26.6


line stmt bran cond sub pod time code
1             package Tak::ConnectionReceiver;
2              
3 1     1   910 use Tak::Request;
  1         1  
  1         29  
4 1     1   5 use Scalar::Util qw(weaken);
  1         2  
  1         47  
5 1     1   849 use Log::Contextual qw(:log);
  1         39875  
  1         5  
6 1     1   10558 use Moo;
  1         2  
  1         5  
7              
8             with 'Tak::Role::Service';
9              
10             has requests => (is => 'ro', default => sub { {} });
11              
12             has channel => (is => 'ro', required => 1);
13              
14             has service => (is => 'ro', required => 1);
15              
16             has on_close => (is => 'ro', required => 1);
17              
18             sub BUILD {
19 0     0 0   weaken(my $self = shift);
20 0           my $channel = $self->channel;
21             Tak->loop->watch_io(
22             handle => $channel->read_fh,
23             on_read_ready => sub {
24 0     0     $channel->read_messages(sub { $self->receive(@_) });
  0            
25             }
26 0           );
27             }
28              
29             sub DEMOLISH {
30 0     0 0   Tak->loop->unwatch_io(
31             handle => $_[0]->channel->read_fh,
32             on_read_ready => 1,
33             );
34             }
35              
36             sub receive_request {
37 0     0 0   my ($self, $tag, $meta, @payload) = @_;
38 0           my $channel = $self->channel;
39 0 0         unless (ref($meta) eq 'HASH') {
40 0           $channel->write_message(mistake => $tag => 'meta value not a hashref');
41 0           return;
42             }
43             my $req = Tak::Request->new(
44             ($meta->{progress}
45 0     0     ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
46             : ()),
47 0     0     on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
48 0 0         );
49 0           $self->service->start_request($req => @payload);
50             }
51              
52             sub receive_progress {
53 0     0 0   my ($self, $tag, @payload) = @_;
54 0           $self->requests->{$tag}->progress(@payload);
55             }
56              
57             sub receive_result {
58 0     0 0   my ($self, $tag, @payload) = @_;
59 0           (delete $self->requests->{$tag})->result(@payload);
60             }
61              
62             sub receive_message {
63 0     0 0   my ($self, @payload) = @_;
64 0           $self->service->receive(@payload);
65             }
66              
67             sub receive_close {
68 0     0 0   my ($self, @payload) = @_;
69 0           $self->on_close->(@payload);
70             }
71              
72             1;