File Coverage

blib/lib/Async/Selector/Aggregator.pm
Criterion Covered Total %
statement 36 36 100.0
branch 8 8 100.0
condition 12 15 80.0
subroutine 9 9 100.0
pod 5 5 100.0
total 70 73 95.8


line stmt bran cond sub pod time code
1             package Async::Selector::Aggregator;
2 2     2   32058 use strict;
  2         4  
  2         87  
3 2     2   11 use warnings;
  2         5  
  2         1205  
4 2     2   17 use Scalar::Util qw(blessed);
  2         5  
  2         116  
5 2     2   11 use Carp;
  2         4  
  2         1125  
6              
7             sub new {
8 53     53 1 47431 my ($class) = @_;
9 53         313 my $self = bless {
10             is_active => 1,
11             watchers => [],
12             }, $class;
13 53         186 return $self;
14             }
15              
16             sub add {
17 98     98 1 1849 my ($self, $watcher) = @_;
18 98 100 66     1221 if(!defined($watcher) || !blessed($watcher) || !$watcher->can('active') || !$watcher->can('cancel')) {
      66        
      66        
19 7         79 croak('watcher must be either Async::Selector::Watcher or Async::Selector::Aggregator');
20             }
21 91 100       332 if($self eq $watcher) {
22 2         45 croak('you cannot add the aggregator itself.');
23             }
24 89         215 my $w_active = $watcher->active;
25 89         290 my $s_active = $self->active;
26 89 100 100     563 if($w_active && !$s_active) {
    100 100        
27 12         39 $watcher->cancel();
28             }elsif(!$w_active && $s_active) {
29 5         15 $self->cancel();
30             }
31 89         102 push(@{$self->{watchers}}, $watcher);
  89         297  
32             }
33              
34             sub watchers {
35 66     66 1 129 my ($self) = @_;
36 66         70 return @{$self->{watchers}};
  66         387  
37             }
38              
39             sub active {
40 208     208 1 15030 my ($self) = @_;
41 208         641 return $self->{is_active};
42             }
43              
44             sub cancel {
45 46     46 1 171 my ($self) = @_;
46 46         86 $self->{is_active} = 0;
47 46         138 foreach my $w ($self->watchers) {
48 68         184 $w->cancel();
49             }
50             }
51              
52             our $VERSION = '1.02';
53              
54             1;
55              
56             =pod
57              
58             =head1 NAME
59              
60             Async::Selector::Aggregator - aggregator of watchers and other aggregators
61              
62             =head1 VERSION
63              
64             1.02
65              
66             =head1 SYNOPSIS
67              
68              
69             use Async::Selector;
70             use Async::Selector::Aggregator;
71            
72             ## Setup resources with 3 selectors, each of which registers 'resource'
73             my %resources = (
74             a => { val => 0, selector => Async::Selector->new },
75             b => { val => 0, selector => Async::Selector->new },
76             c => { val => 0, selector => Async::Selector->new },
77             );
78             foreach my $res (values %resources) {
79             $res->{selector}->register(resource => sub {
80             my ($threshold) = @_;
81             return $res->{val} >= $threshold ? $res->{val} : undef;
82             });
83             }
84            
85             ## Aggregate 3 selectors into one. Resource names are now ('a', 'b', 'c')
86             sub aggregate_watch {
87             my $callback = pop;
88             my %watch_spec = @_;
89             my $aggregator = Async::Selector::Aggregator->new();
90             foreach my $key (keys %watch_spec) {
91             my $watcher = $resources{$key}{selector}->watch(
92             resource => $watch_spec{$key}, sub {
93             my ($w, %res) = @_;
94             $callback->($aggregator, $key => $res{resource});
95             }
96             );
97             $aggregator->add($watcher);
98             last if !$aggregator->active;
99             }
100             return $aggregator;
101             }
102            
103             ## Treat 3 selectors like a single selector almost transparently.
104             ## $w and $watcher are actually an Async::Selector::Aggregator.
105             my $watcher = aggregate_watch(a => 3, b => 0, sub {
106             my ($w, %res) = @_;
107             handle_a($res{a}) if exists $res{a};
108             handle_b($res{b}) if exists $res{b};
109             $w->cancel;
110             });
111            
112             ## In this case, the callback is called immediately and $w->cancel is called.
113            
114             $watcher->active; ## => false
115              
116              
117             =head1 DESCRIPTION
118              
119             L is an object that keeps L objects and/or other aggregator objects
120             and treats them as a single watcher.
121             Using L, you can ensure that a certain set of watchers are always cancelled at the same time.
122             This is useful when you use multiple Ls and treat them as a single selector.
123              
124             Watchers and aggregators kept in an L are in one of the two states;
125             they are all active or they are all inactive.
126             No intermediate state is possible unless you call C method on individual watchers.
127             You should not C individual watchers once you aggregate them into an L object.
128              
129              
130             =head1 CLASS METHODS
131              
132             =head2 $aggregator = Async::Selector::Aggregator->new()
133              
134             Creates a new L object. It takes no argument.
135              
136             A newly created aggregator is active.
137              
138             =head1 OBJECT METHODS
139              
140             =head2 $aggregator->add($watcher)
141              
142             Adds the given C<$watcher> to the C<$aggregator>.
143             The C<$watcher> may be an L object or an L object.
144              
145             If C<$aggregator> is active and C<$watcher> is inactive, C<< $aggregator->cancel() >> is automatically called.
146             If C<$aggregator> is inactive and C<$watcher> is active, C<< $watcher->cancel() >> is automatically called.
147             This is because all watchers in the C<$aggregator> must share the same state.
148              
149             If C<$watcher> is the same instance as C<$aggregator>, it croaks.
150              
151             =head2 @watchers = $aggregator->watchers()
152              
153             Returns the list of all watchers kept in the C<$aggregator>.
154              
155             =head2 $is_active = $aggregator->active()
156              
157             Returns true if the C<$aggregator> is active. Returns false otherwise.
158              
159              
160             =head2 $aggregator->cancel()
161              
162             Cancels the C<$aggregator>, that is, changes its state into inactive.
163             This method also cancels all watchers kept in the C<$aggregator>.
164              
165              
166             =head1 AUTHOR
167              
168             Toshio Ito C<< >>
169              
170             =cut