File Coverage

blib/lib/Consumer/NonBlock.pm
Criterion Covered Total %
statement 154 175 88.0
branch 50 76 65.7
condition 20 32 62.5
subroutine 23 28 82.1
pod 11 13 84.6
total 258 324 79.6


line stmt bran cond sub pod time code
1             package Consumer::NonBlock;
2 1     1   245280 use strict;
  1         2  
  1         30  
3 1     1   5 use warnings;
  1         2  
  1         52  
4              
5             our $VERSION = '0.002';
6              
7 1     1   572 use IO::Handle;
  1         6034  
  1         65  
8 1     1   12 use File::Spec;
  1         5  
  1         23  
9              
10 1     1   3 use File::Path qw/remove_tree/;
  1         2  
  1         90  
11 1     1   1116 use File::Temp qw/tempdir/;
  1         14218  
  1         110  
12 1     1   10 use Fcntl qw/:flock/;
  1         6  
  1         118  
13 1     1   5 use Carp qw/croak confess/;
  1         1  
  1         44  
14 1     1   5 use Time::HiRes qw/sleep/;
  1         1  
  1         11  
15              
16 1         8 use Object::HashBase qw{
17            
18            
19            
20              
21            
22            
23              
24            
25              
26             +fh
27             +batch
28             +batch_item
29             +buffer
30 1     1   603 };
  1         6911  
31              
32 1     1 1 18 sub weaken { $_[0]->{+IS_WEAK} = 1 };
33              
34             sub init {
35 4     4 0 96 my $self = shift;
36              
37 4 50 66     26 croak "Must be either a reader or a writer" unless $self->{+IS_READER} || $self->{+IS_WRITER};
38 4 50 66     31 croak "Must only be a reader or a writer, not both" if $self->{+IS_READER} && $self->{+IS_WRITER};
39              
40 4 50       24 my $dir = $self->{+DIR} or croak "'dir' is a required attribute";
41 4 50       60 croak "Invalid directory '$dir'" unless -d $dir;
42              
43 4 100       13 if ($self->{+IS_WRITER}) {
44 2   50     13 my $data = $self->_update(mode => '+>', open => 1, batch_size => $self->{+BATCH_SIZE} // 100);
45 2   33     9 $self->{+BATCH_SIZE} //= $data->{batch_size};
46             }
47             else {
48 2         4 my $data = $self->_update();
49 2   33     7 $self->{+BATCH_SIZE} //= $data->{batch_size};
50 2 50       16 croak "Could not find batch size" unless defined $self->{+BATCH_SIZE};
51             }
52              
53 4         8 delete $self->{+FH};
54              
55 4         6 $self->{+BATCH} = 0;
56 4         14 $self->{+BATCH_ITEM} = 0;
57             }
58              
59             sub pair {
60 2     2 1 203276 my $class = shift;
61 2         9 my (%params) = @_;
62              
63 2         5 my $dir;
64 2 50 33     17 my $base = $params{base_dir} // ($params{shm} ? '/dev/shm' : ());
65 2 50       6 if ($base) {
66 0 0       0 croak "base dir '$base' is not valid" unless -d $base;
67 0         0 $dir = tempdir("ConsumerNonBlock-$$-XXXXXX", CLEANUP => 0, DIR => $base);
68             }
69             else {
70 2         51 $dir = tempdir("ConsumerNonBlock-$$-XXXXXX", CLEANUP => 0, TMPDIR => 1);
71             }
72              
73 2   50     8138 my $batch_size = $params{+BATCH_SIZE} // 100;
74              
75 2         5 $params{+BATCH_SIZE} = $batch_size;
76 2         6 $params{+DIR} = $dir;
77              
78 2         24 my $writer = $class->new(%params, is_writer => 1);
79 2         30 my $reader = $class->new(%params, is_reader => 1);
80              
81 2         22 return ($reader, $writer);
82             }
83              
84             sub reader {
85 0     0 1 0 my $class = shift;
86 0         0 my ($dir, %params) = @_;
87 0         0 return $class->new(%params, DIR() => $dir, is_reader => 1);
88             }
89              
90             sub reader_from_env {
91 0     0 1 0 my $class = shift;
92 0 0       0 my $dir = $ENV{CONSUMER_NONBLOCK_DIR} or croak 'The $CONSUMER_NONBLOCK_DIR env var is not set';
93 0         0 $class->reader($dir, @_);
94             }
95              
96             sub writer {
97 0     0 1 0 my $class = shift;
98 0         0 my ($dir, %params) = @_;
99 0         0 return $class->new(batch_size => 100, %params, DIR() => $dir, is_writer => 1);
100             }
101              
102             sub _data_file {
103 56     56   165 my $self_or_class = shift;
104 56         251 my ($dir) = @_;
105              
106 56 50       191 unless ($dir) {
107 56 50       226 croak "Cannot call data_file() on a class, must use an instance or pass in a directory"
108             unless ref $self_or_class;
109              
110 56         403 $dir = $self_or_class->dir;
111             }
112              
113 56         2012 return File::Spec->catfile($dir, 'data');
114             }
115              
116             sub _update {
117 56     56   150 my $self = shift;
118 56         213 my %data = @_;
119 56         139 my $write = @_;
120              
121 56         215 my $df = $self->_data_file;
122              
123 56   100     375 my $mode = delete($data{mode}) || '+<';
124              
125 56 50       4715 open(my $fh, $mode, $df) or confess "Could not open file '$df': $!";
126 56 100       960 flock($fh, $write ? LOCK_EX : LOCK_SH) or confess "Could not lock file '$df': $!";
    50          
127              
128 56         1061 while (my $line = <$fh>) {
129 108         1214 my ($key, $val) = ($line =~ m/^([^:]+):(.+)$/);
130 108 50       303 next unless $key;
131 108   100     1325 $data{$key} //= $val;
132             }
133              
134 56 100       186 if ($write) {
135 4         21 seek($fh, 0, 0);
136 4         36 print $fh "$_\:$data{$_}\n" for keys %data;
137             }
138              
139 56         595 $fh->flush();
140              
141 56 50       954 flock($fh, LOCK_UN) or confess "Could not unlock file '$df': $!";
142 56         809 close($fh);
143              
144 56         520 return \%data;
145             }
146              
147             sub set_env_var {
148 0     0 1 0 my $self = shift;
149 0         0 $ENV{CONSUMER_NONBLOCK_DIR} = $self->{+DIR};
150             }
151              
152             sub write {
153 0     0 1 0 my $self = shift;
154 0         0 my $count = 0;
155              
156 0         0 for my $item (@_) {
157 0 0       0 next unless defined $item;
158              
159 0         0 for my $line (split /\n/, $item) {
160 0         0 $count++;
161 0         0 $self->write_line($line);
162             }
163             }
164              
165 0         0 return $count;
166             }
167              
168             sub _check_batch_boundary {
169 77     77   273 my $self = shift;
170 77         585 my (%params) = @_;
171              
172 77 100       558 return if $self->{+BATCH_ITEM} < $self->{+BATCH_SIZE};
173              
174 4         95 delete $self->{+FH};
175 4         8 $self->{+BATCH_ITEM} = 0;
176              
177             unlink(File::Spec->catfile($self->{+DIR}, $self->{+BATCH}))
178 4 100       254 if $params{delete};
179              
180 4         12 $self->{+BATCH} += 1;
181              
182 4         9 return;
183             }
184              
185             sub _batch_fh {
186 77     77   156 my $self = shift;
187 77         205 my ($mode) = @_;
188              
189 77 100       491 return $self->{+FH} if $self->{+FH};
190              
191 8         85 my $file = File::Spec->catfile($self->{+DIR}, $self->{+BATCH});
192 8         15 my $fh;
193              
194             # Only open it if it exists, or we are creating it.
195 8 50 66     217 return unless -e $file || $mode eq '>';
196              
197 8         405 open($fh, $mode, $file);
198 8         37 return $self->{+FH} = $fh;
199             }
200              
201             sub write_raw {
202 3     3 0 1452 my $self = shift;
203 3         7 my ($raw) = @_;
204              
205 3 50       13 croak "No data to write" unless defined $raw;
206              
207 3         31 $self->_check_batch_boundary();
208 3         9 my $fh = $self->_batch_fh('>');
209              
210 3         24 print $fh $raw;
211 3         144 $fh->flush();
212 3         11 $self->{+BATCH_ITEM}++;
213              
214 3         11 return;
215             }
216              
217             sub write_line {
218 12     12 1 33 my $self = shift;
219 12         15 my ($line) = @_;
220              
221 12 50       27 croak "No line to write" unless defined $line;
222              
223 12         10 chomp($line);
224              
225 12         24 $self->_check_batch_boundary();
226 12         21 my $fh = $self->_batch_fh('>');
227              
228 12         36 print $fh $line, "\n";
229 12         12 $self->{+BATCH_ITEM}++;
230              
231 12         36 return;
232             }
233              
234             sub read_line {
235 15     15 1 79 my $self = shift;
236              
237 15         32 my $buffer = \$self->{+BUFFER};
238              
239 15         23 my $loop = 0;
240 15         18 while (1) {
241             # There must be a better way, maybe INotify?
242 63 100       976853 sleep(0.02) if $loop;
243 63   100     736 $loop ||= 1;
244              
245 62         857 $self->_check_batch_boundary(delete => 1);
246              
247 62         126 my ($fh, $line);
248              
249 62 50       797 if ($fh = $self->_batch_fh('<')) {
250 62 100       666 seek($fh, 0, 1) if $fh->eof;
251 62         2898 $line = <$fh>;
252 62 100       322 if (defined $line) {
253 14 100       34 unless (chomp($line)) {
254 1 50       9 $$buffer = defined($$buffer) ? $$buffer . $line : $line;
255 1         3 next;
256             }
257             }
258             }
259              
260 61 100 66     419 if ($fh && defined $line) {
261 13         20 $self->{+BATCH_ITEM}++;
262 13 100       26 if ($$buffer) {
263 1         4 delete $self->{+BUFFER};
264 1 50       15 return $$buffer . $line if $$buffer;
265             }
266              
267 12         38 return $line;
268             }
269             else {
270 48         281 my $data = $self->_update();
271 48 100       357 next if $data->{open};
272              
273 1         81 unlink(File::Spec->catfile($self->{+DIR}, $self->{+BATCH}));
274              
275 1         9 return undef;
276             }
277             }
278             }
279              
280             sub read_lines {
281 1     1 1 1007 my $self = shift;
282 1         3 my @out;
283              
284 1         3 while (my $line = $self->read_line) {
285 6         15 push @out => $line;
286             }
287              
288 1         8 return @out;
289             }
290              
291             sub DESTROY {
292 5     5   3568 my $self = shift;
293              
294 5 100       29 return if $self->is_weak;
295              
296 4 100       16 if ($self->{+IS_WRITER}) {
297 2         12 $self->_update(open => 0);
298 2         79 return;
299             }
300              
301 2 50       8 if ($self->{+IS_READER}) {
302 2         2136 remove_tree($self->{+DIR}, {safe => 1, keep_root => 0});
303 2         332 return;
304             }
305             }
306              
307 1     1 1 5 sub close { $_[0] = undef }
308              
309             1;
310              
311             __END__