File Coverage

blib/lib/Coro/PrioChannel.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Coro::PrioChannel;
2             {
3             $Coro::PrioChannel::VERSION = '0.005';
4             }
5 2     2   69682 use strict;
  2         5  
  2         93  
6 2     2   11 use warnings;
  2         4  
  2         355  
7              
8             # ABSTRACT: Priority message queues for Coro
9              
10              
11 2     2   7087 use Coro qw(:prio);
  0            
  0            
12             use Coro::Semaphore ();
13              
14             use List::Util qw(first sum);
15             use AnyEvent ();
16              
17             sub SGET() { 0 }
18             sub SPUT() { 1 }
19             sub REPRIO() { 2 }
20             sub NEXTCHECK() { 3 }
21             sub DATA() { 4 }
22             sub MAX() { PRIO_MAX - PRIO_MIN + DATA + 1 }
23              
24              
25             sub new {
26             # we cheat, just like Coro::Channel.
27             bless [
28             (Coro::Semaphore::_alloc 0), # counts data
29             (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
30             $_[2], # reprioritization check time
31             (defined $_[2] ? (AnyEvent->now + $_[2]) : undef), # last reprioritization check
32             [], # initially empty
33             ]
34             }
35              
36              
37             sub _put {
38             my $after = (AnyEvent->now + $_[0]->[REPRIO]) if defined $_[0]->[REPRIO];
39             push @{$_[0][DATA + ($_[2]||PRIO_NORMAL) - PRIO_MIN()]}, [$_[1], $after];
40             }
41              
42             sub put {
43             $_[0]->reprioritize;
44             _put @_;
45             Coro::Semaphore::up $_[0][SGET];
46             Coro::Semaphore::down $_[0][SPUT];
47             }
48              
49              
50             sub get {
51             Coro::Semaphore::down $_[0][SGET];
52             Coro::Semaphore::up $_[0][SPUT];
53              
54             my $a = first { $_ && scalar @$_ } reverse @{$_[0]}[DATA..MAX];
55              
56             ref $a ? shift(@$a)->[0] : undef;
57             }
58              
59              
60             sub reprioritize {
61             return unless defined $_[0]->[REPRIO];
62              
63             my $now = AnyEvent->now;
64             return unless $_[0]->[NEXTCHECK] <= $now;
65              
66             my $q = $_[0];
67             foreach my $pri (PRIO_MIN .. PRIO_HIGH) {
68             my $next_pri = $pri + 1;
69             my $idx = DATA + $pri - PRIO_MIN;
70             my @keep;
71              
72             foreach my $item (@{$q->[$idx]}) {
73             if ($item->[1] <= $now) {
74             _put $q, $item->[0], $next_pri;
75             } else {
76             push @keep, $item;
77             }
78             }
79              
80             $q->[$idx] = \@keep;
81             }
82            
83             $q->[NEXTCHECK] = $now + $q->[REPRIO];
84             return;
85             }
86              
87              
88             sub shutdown {
89             Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
90             }
91              
92              
93             sub size {
94             my $min = @_ > 1 ? $_[1] - PRIO_MIN + DATA : DATA;
95             sum map { $_ ? scalar @$_ : 0 } @{$_[0]}[$min..MAX];
96             }
97              
98              
99             1;
100              
101             __END__