File Coverage

blib/lib/IPC/Simple/Channel.pm
Criterion Covered Total %
statement 44 58 75.8
branch 2 2 100.0
condition 3 3 100.0
subroutine 10 14 71.4
pod 0 10 0.0
total 59 87 67.8


line stmt bran cond sub pod time code
1             package IPC::Simple::Channel;
2             $IPC::Simple::Channel::VERSION = '0.09';
3 4     4   29 use strict;
  4         8  
  4         118  
4 4     4   19 use warnings;
  4         17  
  4         98  
5              
6 4     4   20 use AnyEvent qw();
  4         9  
  4         2281  
7              
8             sub new {
9 6     6 0 30 my ($class) = @_;
10              
11 6         109 bless{
12             waiters => [],
13             buffer => [],
14             is_shutdown => 0,
15             }, $class;
16             }
17              
18             sub DESTROY {
19 0     0   0 my $self = shift;
20 0         0 $self->shutdown;
21             }
22              
23             sub shutdown {
24 4     4 0 12 my $self = shift;
25              
26 4         17 $self->{is_shutdown} = 1;
27              
28             # flush any remaining messages that have pending receivers
29 4         37 $self->flush;
30              
31             # send undef to any remaining receivers
32 4         12 $_->send for @{ $self->{waiters} };
  4         22  
33             }
34              
35             sub size {
36 0     0 0 0 my $self = shift;
37 0         0 return scalar @{ $self->{buffer} };
  0         0  
38             }
39              
40             sub put {
41 6     6 0 13 my $self = shift;
42 6         7 push @{ $self->{buffer} }, @_;
  6         20  
43 6         25 $self->flush;
44 6         15 return $self->{size};
45             }
46              
47             sub get {
48 2     2 0 5 my $self = shift;
49 2         6 $self->async->recv;
50             }
51              
52             sub recv {
53 5     5 0 26 my $self = shift;
54 5         25 $self->async->recv;
55             }
56              
57             sub next {
58 0     0 0 0 my $self = shift;
59 0         0 $self->async->recv;
60             }
61              
62             sub async {
63 7     7 0 12 my $self = shift;
64 7         183 my $cv = AnyEvent->condvar;
65              
66 7 100       709 if ($self->{is_shutdown}) {
67 4         5 my $msg = shift @{ $self->{buffer} };
  4         10  
68 4         11 $cv->send($msg);
69 4         43 return $cv;
70             }
71             else {
72 3         15 push @{ $self->{waiters} }, $cv;
  3         15  
73 3         94 $self->flush;
74 3         44 return $cv;
75             }
76             }
77              
78             sub flush {
79 13     13 0 26 my $self = shift;
80 13   100     25 while (@{ $self->{waiters} } && @{ $self->{buffer} }) {
  16         128  
  5         32  
81 3         4 my $cv = shift @{ $self->{waiters} };
  3         6  
82 3         7 $cv->send( shift @{ $self->{buffer} } );
  3         31  
83             }
84             }
85              
86             sub clear {
87 0     0 0   my $self = shift;
88 0           $self->flush;
89 0           my @msgs = @{ $self->{buffer} };
  0            
90 0           $self->{buffer} = [];
91 0           $self->{size} = 0;
92 0           return @msgs;
93             }
94              
95             1;
96              
97             __END__