File Coverage

blib/lib/Promises/Channel.pm
Criterion Covered Total %
statement 70 72 97.2
branch 2 2 100.0
condition 8 12 66.6
subroutine 16 17 94.1
pod 8 11 72.7
total 104 114 91.2


line stmt bran cond sub pod time code
1             package Promises::Channel;
2             # ABSTRACT: a coordination channel implemented with Promises
3             $Promises::Channel::VERSION = '0.01';
4              
5 1     1   225177 use strict;
  1         6  
  1         30  
6 1     1   5 use warnings;
  1         2  
  1         25  
7              
8 1     1   592 use Moo;
  1         11065  
  1         5  
9 1     1   1440 use Promises qw(deferred);
  1         2  
  1         6  
10              
11             extends 'Exporter';
12              
13             our @EXPORT_OK = qw(
14             channel
15             chan
16             );
17              
18              
19              
20             has limit =>
21             is => 'ro',
22             predicate => 'has_limit';
23              
24              
25             has is_shutdown =>
26             is => 'ro',
27             default => 0;
28              
29             has backlog =>
30             is => 'ro',
31             default => sub { [] };
32              
33             has inbox =>
34             is => 'ro',
35             default => sub { [] };
36              
37             has outbox =>
38             is => 'ro',
39             default => sub { [] };
40              
41              
42              
43             sub size {
44 32     32 1 5656 my $self = shift;
45 32         46 scalar @{ $self->inbox };
  32         187  
46             }
47              
48              
49             sub is_full {
50 20     20 1 27 my $self = shift;
51 20   66     56 return $self->has_limit
52             && $self->size == $self->limit;
53             }
54              
55              
56             sub is_empty {
57 1     1 1 3 my $self = shift;
58             return $self->size == 0
59 1   33     3 && !@{ $self->backlog };
60             }
61              
62              
63             sub put {
64 10     10 1 1841 my ($self, $item) = @_;
65 10         22 my $soon = deferred;
66              
67             my $promise = $soon->promise->then(sub {
68 10     10   347 my ($self, $item) = @_;
69 10         101 $self->drain;
70 0         0 return $self;
71 10         114 });
72              
73 10         625 push @{ $self->backlog }, [$item, $soon];
  10         27  
74 10         27 $self->pump;
75 10         24 $soon->promise;
76             }
77              
78              
79             sub get {
80 11     11 1 754 my $self = shift;
81 11         25 my $soon = deferred;
82 11         143 push @{ $self->outbox }, $soon;
  11         28  
83              
84             my $promise = $soon->promise->then(sub {
85 11     11   349 my ($self, $item) = @_;
86 11         23 $self->pump;
87 11         27 return ($self, $item);
88 11         22 });
89              
90 11         739 $self->drain;
91              
92 11         55 return $promise;
93             }
94              
95              
96             sub shutdown {
97 2     2 1 1136 my $self = shift;
98 2         8 $self->{is_shutdown} = 1;
99 2         6 $self->drain;
100 2         5 $self->pump;
101             }
102              
103             sub pump {
104 23     23 0 33 my $self = shift;
105              
106 23   100     27 while (@{ $self->backlog } && !$self->is_full) {
  33         1609  
107 10         21 my ($item, $soon) = @{ shift @{ $self->backlog } };
  10         13  
  10         25  
108 10         16 push @{ $self->inbox }, $item;
  10         31  
109 10         20 $soon->resolve($self->size);
110             }
111             }
112              
113             sub drain {
114 13     13 0 21 my $self = shift;
115              
116 13   66     17 while (@{ $self->inbox } && @{ $self->outbox }) {
  23         3864  
  10         33  
117 10         13 my $soon = shift @{ $self->outbox };
  10         19  
118 10         12 my $msg = shift @{ $self->inbox };
  10         17  
119 10         23 $soon->resolve($self, $msg);
120             }
121              
122 13 100       33 if ($self->is_shutdown) {
123 3         5 while (@{ $self->outbox }) {
  4         47  
124 1         2 my $soon = shift @{ $self->outbox };
  1         3  
125 1         3 $soon->resolve($self, undef);
126             }
127             }
128              
129 13         19 return;
130             }
131              
132             sub DEMOLISH {
133 1     1 0 1687 my $self = shift;
134 1         3 $self->shutdown;
135             }
136              
137              
138              
139 1     1 1 88 sub channel { Promises::Channel->new(@_) }
140 0     0 1   sub chan { Promises::Channel->new(@_) }
141              
142             1;
143              
144             __END__