File Coverage

lib/AnyEvent/Tools/Buffer.pm
Criterion Covered Total %
statement 114 143 79.7
branch 44 70 62.8
condition 7 16 43.7
subroutine 18 18 100.0
pod 0 9 0.0
total 183 256 71.4


line stmt bran cond sub pod time code
1 1     1   5 use utf8;
  1         2  
  1         8  
2 1     1   22 use strict;
  1         1  
  1         30  
3 1     1   5 use warnings;
  1         2  
  1         54  
4              
5             package AnyEvent::Tools::Buffer;
6 1     1   5 use AnyEvent::AggressiveIdle qw(aggressive_idle stop_aggressive_idle);
  1         2  
  1         58  
7 1     1   4 use AnyEvent::Util;
  1         2  
  1         61  
8 1     1   5 use Carp;
  1         1  
  1         1363  
9              
10             sub new
11             {
12 4     4 0 7 my $class = shift;
13 4 50       17 croak "usage: buffer(on_flush => sub { ... }, ...)" if @_ % 2;
14              
15              
16 4         15 my (%opts) = @_;
17              
18 4   33     46 my $self = bless {
19             queue => [],
20             exists => {},
21             timer => undef,
22             lock => 0,
23             do_flush => 0,
24             unique_cb => undef,
25             } => ref($class) || $class;
26              
27 4         14 $self->on_flush($opts{on_flush});
28 4   100     20 $self->size($opts{size} || 0);
29 4   100     17 $self->interval($opts{interval} || 0);
30 4         14 $self->unique_cb($opts{unique_cb});
31              
32 4         15 return $self;
33             }
34              
35             sub interval
36             {
37 314     314 0 417 my ($self, $ival) = @_;
38 314 100       1950 return $self->{interval} if @_ == 1;
39 4 100       8 undef $self->{timer} unless $ival;
40 4         6 return $self->{interval} = $ival;
41             }
42              
43             sub on_flush
44             {
45 4     4 0 6 my ($self, $cb) = @_;
46 4 50 33     25 croak "callback must be CODEREF" if $cb and 'CODE' ne ref $cb;
47 4         45 return $self->{on_flush} = $cb;
48             }
49              
50             sub unique_cb
51             {
52 4     4 0 9 my ($self, $cb) = @_;
53              
54             # disable unique checking
55 4 100       8 unless($cb) {
56 3         4 $self->{exists} = {};
57 3         6 return $self->{unique_cb} = $cb;
58             }
59              
60 1 50       5 croak "unique_cb must be CODEREF" unless 'CODE' eq ref $cb;
61 1         4 $self->flush;
62 1         3 return $self->{unique_cb} = $cb;
63             }
64              
65             sub push :method
66             {
67 63846     63846 0 2255205 my ($self, @data) = @_;
68 63846 50       128003 if (@data) {
69 63846 100       126822 if ($self->{unique_cb}) {
70 63708         117717 while(@data) {
71 63708         81304 my $add = shift @data;
72 63708         148216 my $key = $self->{unique_cb}->($add);
73 63708 50 33     376437 croak "unique_cb must return defined SCALAR"
74             if ref $key or !defined($key);
75 63708 100       250986 next if exists $self->{exists}{$key};
76 1001         2367 $self->{exists}{$key} = 1;
77 1001         1093 push @{ $self->{queue} }, $add;
  1001         3584  
78             }
79             } else {
80 138         1892 push @{ $self->{queue} }, @data;
  138         243  
81             }
82             }
83              
84 63846         130299 $self->_check_buffer;
85 63846         253451 return;
86             }
87              
88             sub unshift :method
89             {
90 12962     12962 0 402752 my ($self, @data) = @_;
91 12962 50       25515 if (@data) {
92 12962 50       22045 if ($self->{unique_cb}) {
93 0         0 while(@data) {
94 0         0 my $add = pop @data;
95 0         0 my $key = $self->{unique_cb}->($add);
96 0 0 0     0 croak "unique_cb must return defined SCALAR"
97             if ref $key or !defined($key);
98 0 0       0 next if exists $self->{exists}{$key};
99 0         0 $_++ for values %{ $self->{exists} };
  0         0  
100 0         0 $self->{exists}{$key} = 1;
101 0         0 unshift @{ $self->{queue} }, $add;
  0         0  
102             }
103             } else {
104 12962         12159 unshift @{ $self->{queue} }, @data;
  12962         25722  
105             }
106             }
107              
108 12962         28565 $self->_check_buffer;
109 12962         49846 return;
110             }
111              
112             sub unshift_back
113             {
114 1     1 0 22 my ($self, $data) = @_;
115 1 50       4 croak "Guard has already been destroyed" unless $self->{lock};
116 1 50       4 unless ($self->{unique_cb}) {
117 1         1 unshift @{ $self->{queue} }, @$data;
  1         3  
118 1         2 return;
119             }
120              
121 0         0 my @buffer;
122 0         0 $self->{exists} = {};
123 0         0 for (@$data, @{ $self->{queue} }) {
  0         0  
124 0         0 my $key = $self->{unique_cb}->($_);
125 0 0       0 next if exists $self->{exists}{$key};
126 0         0 $self->{exists}{$key} = 1;
127 0         0 push @buffer, $_;
128             }
129 0         0 $self->{queue} = \@buffer;
130 0         0 return;
131             }
132              
133              
134             sub size
135             {
136 76926     76926 0 88604 my ($self, $value) = @_;
137 76926 100       275224 return $self->{size} if @_ == 1;
138 4         9 $self->{size} = $value;
139 4         11 $self->_check_buffer;
140 4         5 return $self->{size};
141             }
142              
143              
144             sub flush
145             {
146 132     132 0 413 my ($self) = @_;
147 132 100       162 return unless @{ $self->{queue} };
  132         689  
148 130 50       362 return unless $self->{on_flush};
149 130 50       496 if ($self->{lock}) {
150 0         0 $self->{do_flush} = 1;
151 0         0 return;
152             }
153 130         225 undef $self->{timer};
154 130         12603 my $queue = $self->{queue};
155 130         265 $self->{queue} = [];
156 130         412 $self->{exists} = {};
157             my $guard = guard sub {
158              
159 130 50   130   510 return unless $self; # it can be destroyed
160              
161 130         357 $self->{lock} = 0;
162              
163 130 50       524 if ($self->{do_flush}) {
164 0         0 $self->{do_flush} = 0;
165 0 0       0 return unless @{ $self->{queue} };
  0         0  
166             aggressive_idle sub { # avoid recursion
167 0         0 stop_aggressive_idle $_[0];
168 0 0       0 $self->flush if $self;
169 0         0 };
170 0         0 return;
171             }
172 130 50       1239 return unless $self;
173 130 100       188 return unless @{ $self->{queue} };
  130         2896  
174             aggressive_idle sub { # avoid recursion
175 1         16 stop_aggressive_idle $_[0];
176 1 50       10 $self->_check_buffer if $self; # can be destroyed again
177 1         5 };
178 1         15 return;
179 130         1667 };
180 130         245 $self->{lock} = 1;
181 130         877 $self->{on_flush}->($guard, $queue);
182 130         1598 return;
183             }
184              
185              
186             sub _check_buffer
187             {
188 76813     76813   93340 my ($self) = @_;
189              
190 76813 100       155561 return if $self->{lock};
191 76801 50       154966 return unless $self->{on_flush};
192              
193 76801 100       85619 unless (@{ $self->{queue} }) {
  76801         173103  
194 5         7 undef $self->{timer};
195 5         9 return;
196             }
197              
198 76796 100       144898 if ($self->size) {
199 126 100       111 if (@{ $self->{queue} } >= $self->size) {
  126         241  
200 26         43 $self->flush;
201 26         41 return;
202             }
203             }
204              
205 76770 100       192317 return if $self->{timer};
206 205 100       538 return unless $self->interval;
207 105     104   235 $self->{timer} = AE::timer $self->interval, 0 => sub { $self->flush };
  104         5763  
208 105         187 return;
209             }
210              
211             1;