File Coverage

blib/lib/AnyEvent/FIFO.pm
Criterion Covered Total %
statement 53 53 100.0
branch 11 14 78.5
condition 5 10 50.0
subroutine 11 11 100.0
pod 6 6 100.0
total 86 94 91.4


line stmt bran cond sub pod time code
1             package AnyEvent::FIFO;
2 3     3   2163 use strict;
  3         4  
  3         107  
3 3     3   1622 use AnyEvent;
  3         10910  
  3         72  
4 3     3   2870 use AnyEvent::Util ();
  3         46458  
  3         1596  
5              
6             our $VERSION = '0.00003';
7              
8             sub new {
9 2     2 1 1882 my $class = shift;
10 2         11 my $self = {
11             max_active => 1,
12             @_,
13             active => {},
14             events => {},
15             };
16 2   33     88 $self->{cv} ||= AE::cv;
17 2         10852 return bless($self, $class);
18             }
19              
20             sub push {
21 40     40 1 508 my ($self, $slot, $cb, @args) = @_;
22             # the first argument must be the name of the slot or a callback
23 40 100       87 if (ref $slot) {
24 10         12 unshift @args, $cb;
25 10         10 $cb = $slot;
26 10         14 $slot = "__default__";
27             }
28              
29 40         45 push @{$self->{events}->{$slot}}, [$cb, @args];
  40         129  
30 40         131 $self->{cv}->begin();
31              
32             AE::postpone sub {
33 40     40   13830 $self->drain();
34 40         277 };
35             }
36              
37             sub active {
38 40     40 1 54936 my ($self, $slot) = @_;
39 40 100       117 $slot = "__default__" unless(defined($slot));
40 40   50     209 return $self->{active}->{$slot} || 0;
41             }
42              
43             sub waiting {
44 40     40 1 67 my ($self, $slot) = @_;
45 40 100       117 $slot = "__default__" unless(defined($slot));
46 40 50       134 return $self->{events}->{$slot} ? (0 + @{$self->{events}->{$slot}}) : 0;
  40         243  
47             }
48              
49             sub cv {
50 2     2 1 21 my $self = shift;
51 2 50       15 $self->{cv} = $_[0] if(@_);
52 2         21 return $self->{cv};
53             }
54              
55             sub drain {
56 80     80 1 87 my $self = shift;
57              
58 80         85 my @slots = keys %{$self->{events}};
  80         231  
59 80         116 my $dispatched = 1;
60 80         231 while ($dispatched) {
61 100         287 $dispatched = 0;
62 100         130 foreach my $slot (@slots) {
63 240         479 my $events = $self->{events}->{$slot};
64 240 100 50     2876 if ( @$events && ($self->{active}->{$slot} ||= 0) < $self->{max_active} ) {
      66        
65 40         49 $dispatched++;
66 40         71 my $stuff = shift @$events;
67 40         117 my ($cb, @args) = @$stuff;
68 40         70 $self->{active}->{$slot}++;
69             $cb->( AnyEvent::Util::guard {
70 40     40   21540 $self->{active}->{$slot}--;
71 40 50       126 if ($self->{active}->{$slot} <= 0) {
72 40         90 delete $self->{active}->{$slot};
73             }
74 40         170 $self->{cv}->end();
75             AE::postpone sub {
76 40         173 $self->drain();
77 40         426 };
78 40         300 }, @args );
79             }
80             }
81             }
82             }
83              
84             1;
85              
86             __END__