File Coverage

blib/lib/IPC/Simple/Channel.pm
Criterion Covered Total %
statement 44 58 75.8
branch 2 2 100.0
condition 3 3 100.0
subroutine 10 14 71.4
pod 0 10 0.0
total 59 87 67.8


line stmt bran cond sub pod time code
1             package IPC::Simple::Channel;
2             $IPC::Simple::Channel::VERSION = '0.08';
3 4     4   28 use strict;
  4         8  
  4         124  
4 4     4   19 use warnings;
  4         8  
  4         105  
5              
6 4     4   17 use AnyEvent qw();
  4         8  
  4         2098  
7              
8             sub new {
9 6     6 0 22 my ($class) = @_;
10              
11 6         102 bless{
12             waiters => [],
13             buffer => [],
14             is_shutdown => 0,
15             }, $class;
16             }
17              
18             sub DESTROY {
19 0     0   0 my $self = shift;
20 0         0 $self->shutdown;
21             }
22              
23             sub shutdown {
24 4     4 0 12 my $self = shift;
25              
26 4         25 $self->{is_shutdown} = 1;
27              
28             # flush any remaining messages that have pending receivers
29 4         41 $self->flush;
30              
31             # send undef to any remaining receivers
32 4         12 $_->send for @{ $self->{waiters} };
  4         22  
33             }
34              
35             sub size {
36 0     0 0 0 my $self = shift;
37 0         0 return scalar @{ $self->{buffer} };
  0         0  
38             }
39              
40             sub put {
41 6     6 0 12 my $self = shift;
42 6         15 push @{ $self->{buffer} }, @_;
  6         14  
43 6         18 $self->flush;
44 6         18 return $self->{size};
45             }
46              
47             sub get {
48 2     2 0 5 my $self = shift;
49 2         10 $self->async->recv;
50             }
51              
52             sub recv {
53 5     5 0 11 my $self = shift;
54 5         17 $self->async->recv;
55             }
56              
57             sub next {
58 0     0 0 0 my $self = shift;
59 0         0 $self->async->recv;
60             }
61              
62             sub async {
63 7     7 0 12 my $self = shift;
64 7         181 my $cv = AnyEvent->condvar;
65              
66 7 100       700 if ($self->{is_shutdown}) {
67 4         7 my $msg = shift @{ $self->{buffer} };
  4         12  
68 4         20 $cv->send($msg);
69 4         44 return $cv;
70             }
71             else {
72 3         14 push @{ $self->{waiters} }, $cv;
  3         20  
73 3         94 $self->flush;
74 3         43 return $cv;
75             }
76             }
77              
78             sub flush {
79 13     13 0 31 my $self = shift;
80 13   100     29 while (@{ $self->{waiters} } && @{ $self->{buffer} }) {
  16         148  
  5         30  
81 3         5 my $cv = shift @{ $self->{waiters} };
  3         7  
82 3         10 $cv->send( shift @{ $self->{buffer} } );
  3         33  
83             }
84             }
85              
86             sub clear {
87 0     0 0   my $self = shift;
88 0           $self->flush;
89 0           my @msgs = @{ $self->{buffer} };
  0            
90 0           $self->{buffer} = [];
91 0           $self->{size} = 0;
92 0           return @msgs;
93             }
94              
95             1;
96              
97             __END__