File Coverage

blib/lib/Queue/Q/DistFIFO.pm
Criterion Covered Total %
statement 114 117 97.4
branch 19 26 73.0
condition 11 17 64.7
subroutine 19 19 100.0
pod 0 12 0.0
total 163 191 85.3


line stmt bran cond sub pod time code
1             package Queue::Q::DistFIFO;
2 3     3   1723 use strict;
  3         5  
  3         75  
3 3     3   13 use warnings;
  3         6  
  3         78  
4 3     3   15 use Carp qw(croak);
  3         17  
  3         125  
5              
6 3     3   14 use List::Util ();
  3         5  
  3         54  
7 3     3   15 use Scalar::Util qw(refaddr blessed);
  3         3  
  3         260  
8              
9             use Class::XSAccessor {
10 3         23 getters => [qw(shards next_shard)],
11 3     3   16 };
  3         4  
12              
13             sub new {
14 2     2 0 6084 my $class = shift;
15 2         8 my $self = bless({
16             @_,
17             next_shard => 0,
18             } => $class);
19              
20 2 50 33     17 if (not defined $self->{shards}
      33        
21             or not ref($self->{shards}) eq 'ARRAY'
22 2         9 or not @{$self->{shards}})
23             {
24 0         0 croak("Need 'shards' parameter being an array of shards");
25             }
26              
27 2         5 $self->{shards_order} = [ List::Util::shuffle( @{$self->shards} ) ];
  2         41  
28              
29 2         5 return $self;
30             }
31              
32             sub _next_shard {
33 378     378   437 my $self = shift;
34 378         540 my $ns = $self->{next_shard};
35 378         450 my $so = $self->{shards_order};
36 378 100       417 if ($ns > $#{$so}) {
  378         849  
37 74         104 $ns = $self->{next_shard} = 0;
38             }
39 378         547 ++$self->{next_shard};
40 378         889 return $so->[$ns];
41             }
42              
43             sub enqueue_item {
44 4     4 0 387 my $self = shift;
45 4 50       12 croak("Need exactly one item to enqeue")
46             if not @_ == 1;
47 4         11 return $self->_next_shard->enqueue_item($_[0]);
48             }
49              
50             sub enqueue_items {
51 14     14 0 723 my $self = shift;
52 14 50       30 return if not @_;
53 14         13 my @rv;
54 14         39 push @rv, $self->_next_shard->enqueue_item($_) for @_;
55 14         32 return @rv;
56             }
57              
58             sub enqueue_items_strict_ordering {
59 6     6 0 369 my $self = shift;
60 6 50       16 return if not @_;
61 6         14 my $shard = $self->_next_shard;
62 6         21 return $shard->enqueue_items(@_);
63             }
64              
65             sub claim_item {
66 66     66 0 8726 my $self = shift;
67             # FIXME very inefficient!
68 66         126 my $shard = $self->_next_shard;
69 66         129 my $first_shard_addr = refaddr($shard);
70 66         76 my $class;
71 66         70 while (1) {
72 120         300 my $item = $shard->claim_item;
73 120 100       253 if (defined $item) {
74 64 100 66     327 $item->{_shard} = $shard
75             if blessed($item)
76             and $item->isa('Queue::Q::ClaimFIFO::Item');
77 64         160 return $item;
78             }
79 56         97 $shard = $self->_next_shard;
80 56 100       153 return undef if refaddr($shard) == $first_shard_addr;
81             }
82             }
83              
84             sub claim_items {
85 6     6 0 1065 my ($self, $n) = @_;
86 6   100     20 $n ||= 1;
87              
88 6         13 my $nshards = $self->num_shards;
89 6         15 my $at_a_time = int( $n / $nshards );
90 6         8 my $left_over = $n % $nshards;
91 6         15 my @shard_items = (($at_a_time) x $nshards);
92 6         21 ++$shard_items[$_] for 0 .. ($left_over-1);
93              
94 6         6 my @elem;
95              
96 6         13 my $shard = $self->_next_shard;
97 6         38 my $first_shard_addr = refaddr($shard);
98 6         8 my $i = 0;
99 6         8 my $nmissing = 0;
100 6         7 while (1) {
101 18         21 my $thisn = $shard_items[$i];
102 18         53 my @items = $shard->claim_items($thisn);
103 18         28 $shard_items[$i] -= scalar @items;
104 18         29 $nmissing += $shard_items[$i];
105             @items = map {
106 18 100 66     26 $_->{_shard} = $shard
  22         111  
107             if blessed($_)
108             and $_->isa('Queue::Q::ClaimFIFO::Item');
109 22         49 $_
110             } @items;
111 18         28 push @elem, @items;
112 18         34 $shard = $self->_next_shard;
113 18 100 100     128 last if scalar(@elem) == $n
114             or refaddr($shard) == $first_shard_addr;
115 12         19 ++$i;
116             }
117              
118             # Fall back to naive mode - this could be done much
119             # better by redistributing the remaining items to the
120             # shards that had data... FIXME
121 6         14 for (1 .. $nmissing) {
122 2         5 my $item = $self->claim_item;
123 2 50       7 last if not defined $item;
124 0         0 push @elem, $item;
125             }
126              
127 6         30 return @elem;
128             }
129              
130             sub flush_queue {
131 4     4 0 620 my $self = shift;
132 4         14 my $shards = $self->{shards};
133 4         11 for my $i (0..$#$shards) {
134 20         57 $shards->[$i]->flush_queue;
135             }
136 4         10 return();
137             }
138              
139             sub queue_length {
140 22     22 0 114 my $self = shift;
141 22         30 my $shards = $self->{shards};
142 22         28 my $len = 0;
143 22         47 for my $i (0..$#$shards) {
144 110         291 $len += $shards->[$i]->queue_length;
145             }
146 22         85 return $len;
147             }
148              
149             sub claimed_count {
150 10     10 0 15 my $self = shift;
151 10         12 my $shards = $self->{shards};
152 10         20 my $ccount = 0;
153 10         27 for my $i (0..$#$shards) {
154 50         64 my $shard = $shards->[$i];
155 50         120 my $meth = $shard->can("claimed_count");
156 50 50       106 if (not $meth) {
157 0         0 Carp::croak("Shard $i does not support claimed count. Is it of type NaiveFIFO?");
158             }
159 50         112 $ccount += $meth->($shard);
160             }
161 10         48 return $ccount;
162             }
163              
164             sub mark_item_as_done {
165 43     43 0 9402 my $self = shift;
166 43         48 my $item = shift;
167 43         67 my $shard = delete $item->{_shard};
168 43 50       100 die "Need item's shard to mark it as done! "
169             . "Or was this item previously marked as done?" if not $shard;
170 43         106 $shard->mark_item_as_done($item);
171             }
172              
173             sub mark_items_as_done {
174 1     1 0 1048 my $self = shift;
175 1         5 $self->mark_item_as_done($_) for @_;
176             }
177              
178             sub num_shards {
179 6     6 0 11 my $self = shift;
180 6         6 return scalar(@{ $self->{shards} });
  6         14  
181             }
182              
183             1;