File Coverage

blib/lib/Queue/Q/DistFIFO.pm
Criterion Covered Total %
statement 114 117 97.4
branch 19 26 73.0
condition 11 17 64.7
subroutine 19 19 100.0
pod 0 12 0.0
total 163 191 85.3


line stmt bran cond sub pod time code
1             package Queue::Q::DistFIFO;
2 3     3   1786 use strict;
  3         8  
  3         77  
3 3     3   14 use warnings;
  3         5  
  3         82  
4 3     3   15 use Carp qw(croak);
  3         20  
  3         198  
5              
6 3     3   15 use List::Util ();
  3         5  
  3         61  
7 3     3   16 use Scalar::Util qw(refaddr blessed);
  3         5  
  3         179  
8              
9             use Class::XSAccessor {
10 3         25 getters => [qw(shards next_shard)],
11 3     3   15 };
  3         5  
12              
13             sub new {
14 2     2 0 13104 my $class = shift;
15 2         15 my $self = bless({
16             @_,
17             next_shard => 0,
18             } => $class);
19              
20 2 50 33     32 if (not defined $self->{shards}
      33        
21             or not ref($self->{shards}) eq 'ARRAY'
22 2         12 or not @{$self->{shards}})
23             {
24 0         0 croak("Need 'shards' parameter being an array of shards");
25             }
26              
27 2         6 $self->{shards_order} = [ List::Util::shuffle( @{$self->shards} ) ];
  2         67  
28              
29 2         9 return $self;
30             }
31              
32             sub _next_shard {
33 378     378   560 my $self = shift;
34 378         562 my $ns = $self->{next_shard};
35 378         561 my $so = $self->{shards_order};
36 378 100       474 if ($ns > $#{$so}) {
  378         926  
37 74         121 $ns = $self->{next_shard} = 0;
38             }
39 378         578 ++$self->{next_shard};
40 378         1024 return $so->[$ns];
41             }
42              
43             sub enqueue_item {
44 4     4 0 1027 my $self = shift;
45 4 50       20 croak("Need exactly one item to enqeue")
46             if not @_ == 1;
47 4         15 return $self->_next_shard->enqueue_item($_[0]);
48             }
49              
50             sub enqueue_items {
51 14     14 0 1063 my $self = shift;
52 14 50       33 return if not @_;
53 14         18 my @rv;
54 14         44 push @rv, $self->_next_shard->enqueue_item($_) for @_;
55 14         36 return @rv;
56             }
57              
58             sub enqueue_items_strict_ordering {
59 6     6 0 640 my $self = shift;
60 6 50       18 return if not @_;
61 6         16 my $shard = $self->_next_shard;
62 6         25 return $shard->enqueue_items(@_);
63             }
64              
65             sub claim_item {
66 66     66 0 15847 my $self = shift;
67             # FIXME very inefficient!
68 66         155 my $shard = $self->_next_shard;
69 66         158 my $first_shard_addr = refaddr($shard);
70 66         83 my $class;
71 66         90 while (1) {
72 120         350 my $item = $shard->claim_item;
73 120 100       298 if (defined $item) {
74 64 100 66     418 $item->{_shard} = $shard
75             if blessed($item)
76             and $item->isa('Queue::Q::ClaimFIFO::Item');
77 64         254 return $item;
78             }
79 56         116 $shard = $self->_next_shard;
80 56 100       178 return undef if refaddr($shard) == $first_shard_addr;
81             }
82             }
83              
84             sub claim_items {
85 6     6 0 1607 my ($self, $n) = @_;
86 6   100     25 $n ||= 1;
87              
88 6         15 my $nshards = $self->num_shards;
89 6         19 my $at_a_time = int( $n / $nshards );
90 6         12 my $left_over = $n % $nshards;
91 6         19 my @shard_items = (($at_a_time) x $nshards);
92 6         27 ++$shard_items[$_] for 0 .. ($left_over-1);
93              
94 6         8 my @elem;
95              
96 6         14 my $shard = $self->_next_shard;
97 6         49 my $first_shard_addr = refaddr($shard);
98 6         9 my $i = 0;
99 6         9 my $nmissing = 0;
100 6         8 while (1) {
101 18         29 my $thisn = $shard_items[$i];
102 18         63 my @items = $shard->claim_items($thisn);
103 18         30 $shard_items[$i] -= scalar @items;
104 18         28 $nmissing += $shard_items[$i];
105             @items = map {
106 18 100 66     32 $_->{_shard} = $shard
  22         125  
107             if blessed($_)
108             and $_->isa('Queue::Q::ClaimFIFO::Item');
109 22         60 $_
110             } @items;
111 18         33 push @elem, @items;
112 18         40 $shard = $self->_next_shard;
113 18 100 100     101 last if scalar(@elem) == $n
114             or refaddr($shard) == $first_shard_addr;
115 12         23 ++$i;
116             }
117              
118             # Fall back to naive mode - this could be done much
119             # better by redistributing the remaining items to the
120             # shards that had data... FIXME
121 6         17 for (1 .. $nmissing) {
122 2         8 my $item = $self->claim_item;
123 2 50       8 last if not defined $item;
124 0         0 push @elem, $item;
125             }
126              
127 6         41 return @elem;
128             }
129              
130             sub flush_queue {
131 4     4 0 1255 my $self = shift;
132 4         12 my $shards = $self->{shards};
133 4         22 for my $i (0..$#$shards) {
134 20         74 $shards->[$i]->flush_queue;
135             }
136 4         12 return();
137             }
138              
139             sub queue_length {
140 22     22 0 153 my $self = shift;
141 22         47 my $shards = $self->{shards};
142 22         30 my $len = 0;
143 22         69 for my $i (0..$#$shards) {
144 110         318 $len += $shards->[$i]->queue_length;
145             }
146 22         117 return $len;
147             }
148              
149             sub claimed_count {
150 10     10 0 22 my $self = shift;
151 10         20 my $shards = $self->{shards};
152 10         19 my $ccount = 0;
153 10         37 for my $i (0..$#$shards) {
154 50         80 my $shard = $shards->[$i];
155 50         163 my $meth = $shard->can("claimed_count");
156 50 50       124 if (not $meth) {
157 0         0 Carp::croak("Shard $i does not support claimed count. Is it of type NaiveFIFO?");
158             }
159 50         134 $ccount += $meth->($shard);
160             }
161 10         67 return $ccount;
162             }
163              
164             sub mark_item_as_done {
165 43     43 0 15619 my $self = shift;
166 43         66 my $item = shift;
167 43         79 my $shard = delete $item->{_shard};
168 43 50       113 die "Need item's shard to mark it as done! "
169             . "Or was this item previously marked as done?" if not $shard;
170 43         132 $shard->mark_item_as_done($item);
171             }
172              
173             sub mark_items_as_done {
174 1     1 0 1367 my $self = shift;
175 1         8 $self->mark_item_as_done($_) for @_;
176             }
177              
178             sub num_shards {
179 6     6 0 11 my $self = shift;
180 6         8 return scalar(@{ $self->{shards} });
  6         15  
181             }
182              
183             1;