File Coverage

blib/lib/IO/Async/FileStream.pm
Criterion Covered Total %
statement 102 107 95.3
branch 32 46 69.5
condition 8 16 50.0
subroutine 16 18 88.8
pod 4 7 57.1
total 162 194 83.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2024 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::FileStream 0.805;
7              
8 2     2   1483 use v5.14;
  2         9  
9 2     2   12 use warnings;
  2         3  
  2         210  
10              
11 2     2   15 use base qw( IO::Async::Stream );
  2         5  
  2         855  
12              
13 2     2   598 use IO::Async::File;
  2         9  
  2         139  
14              
15 2     2   18 use Carp;
  2         4  
  2         167  
16 2     2   16 use Fcntl qw( SEEK_SET SEEK_CUR );
  2         4  
  2         3042  
17              
18             =head1 NAME
19              
C<IO::Async::FileStream> - read the tail of a file
21              
22             =head1 SYNOPSIS
23              
24             =for highlighter language=perl
25              
26             use IO::Async::FileStream;
27              
28             use IO::Async::Loop;
29             my $loop = IO::Async::Loop->new;
30              
31             open my $logh, "<", "var/logs/daemon.log" or
32             die "Cannot open logfile - $!";
33              
34             my $filestream = IO::Async::FileStream->new(
35             read_handle => $logh,
36              
37             on_initial => sub {
38             my ( $self ) = @_;
39             $self->seek_to_last( "\n" );
40             },
41              
42             on_read => sub {
43             my ( $self, $buffref ) = @_;
44              
45             while( $$buffref =~ s/^(.*\n)// ) {
46             print "Received a line $1";
47             }
48              
49             return 0;
50             },
51             );
52              
53             $loop->add( $filestream );
54              
55             $loop->run;
56              
57             =head1 DESCRIPTION
58              
This subclass of L<IO::Async::Stream> allows reading the end of a regular file
which is being appended to by some other process. It invokes the C<on_read>
event when more data has been added to the file.
62              
This class provides an API identical to L<IO::Async::Stream> when given a
C<read_handle>; it should be treated similarly. In particular, it can be given
an C<on_read> handler, or subclassed to provide an C<on_read> method, or even
used as the C<transport> for an L<IO::Async::Protocol::Stream> object.
67              
68             It will not support writing.
69              
To watch a file, directory, or other filesystem entity for updates of other
properties, such as C<mtime>, see also L<IO::Async::File>.
72              
73             =cut
74              
75             =head1 EVENTS
76              
77             The following events are invoked, either using subclass methods or CODE
78             references in parameters.
79              
Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the
events supported by C<Stream> relating to the read handle are supported here.
This is not a full list; see also the documentation relating to
L<IO::Async::Stream>.
84              
85             =head2 $ret = on_read \$buffer, $eof
86              
87             Invoked when more data is available in the internal receiving buffer.
88              
89             Note that C<$eof> only indicates that all the data currently available in the
file has now been read; in contrast to a regular L<IO::Async::Stream>, this
91             object will not stop watching after this condition. Instead, it will continue
92             watching the file for updates.
93              
94             =head2 on_truncated
95              
96             Invoked when the file size shrinks. If this happens, it is presumed that the
97             file content has been replaced. Reading will then commence from the start of
98             the file.
99              
100             =head2 on_initial $size
101              
102             Invoked the first time the file is looked at. It is passed the initial size of
the file. The code implementing this method can use the C<seek> or
C<seek_to_last> methods to set the initial read position in the file to skip
105             over some initial content.
106              
107             This method may be useful to skip initial content in the file, if the object
108             should only respond to new content added after it was created.
109              
110             =cut
111              
# Object initialisation hook.
#
# Runs the parent initialiser, then:
#   * forces close_on_read_eof off, because reaching EOF on a file that is
#     still being appended to must not end the stream;
#   * clears the remembered size, so that configure() invokes on_initial the
#     first time a read handle is available;
#   * creates the IO::Async::File child whose devino/size callbacks drive all
#     subsequent reading.
sub _init
{
   my ( $self, $params ) = @_;

   $self->SUPER::_init( $params );

   $params->{close_on_read_eof} = 0;
   $self->{last_size} = undef;

   my $watcher = IO::Async::File->new(
      on_devino_changed => $self->_replace_weakself( 'on_devino_changed' ),
      on_size_changed   => $self->_replace_weakself( 'on_size_changed' ),
   );
   $self->{file} = $watcher;
   $self->add_child( $watcher );
}
128              
129             =head1 PARAMETERS
130              
The following named parameters may be passed to C<new> or C<configure>, in
addition to the parameters relating to reading supported by
L<IO::Async::Stream>.
134              
135             =head2 filename => STRING
136              
137             Optional. If supplied, watches the named file rather than the filehandle given
in C<read_handle>. The file will be opened by the constructor, and then
139             watched for renames. If the file is renamed, the new filename is opened and
140             tracked similarly after closing the previous file.
141              
142             =head2 interval => NUM
143              
Optional. The interval in seconds to poll the filehandle using C<stat(2)>
145             looking for size changes. A default of 2 seconds will be applied if not
146             defined.
147              
148             =cut
149              
# Accepts the IO::Async::Stream read-side parameters plus on_truncated,
# on_initial, interval and filename. All input sources (a filename, or a
# handle under either the 'handle' or 'read_handle' key) are handed to the
# IO::Async::File child, and the handle it reports becomes our read_handle.
# Croaks if a write_handle is supplied. The first time a read handle is
# present, records its size and fires on_initial with running_initial set, so
# that seek()/seek_to_last() are permitted.
sub configure
{
   my ( $self, %args ) = @_;

   # Event handlers owned by this subclass are stored directly on the object.
   for my $event (qw( on_truncated on_initial )) {
      next unless exists $args{$event};
      $self->{$event} = delete $args{$event};
   }

   # The poll interval belongs to the file watcher, not to the stream.
   if( exists $args{interval} ) {
      $self->{file}->configure( interval => delete $args{interval} );
   }

   if( exists $args{filename} ) {
      $self->{file}->configure( filename => delete $args{filename} );
      $args{read_handle} = $self->{file}->handle;
   }
   elsif( exists $args{handle} || exists $args{read_handle} ) {
      # read_handle is only consumed when 'handle' is absent or undef.
      my $fh = delete $args{handle};
      $fh //= delete $args{read_handle};

      $self->{file}->configure( handle => $fh );
      $args{read_handle} = $self->{file}->handle;
   }

   croak "Cannot have a write_handle in a ".ref($self) if defined $args{write_handle};

   $self->SUPER::configure( %args );

   if( $self->read_handle and !defined $self->{last_size} ) {
      my @st = stat( $self->read_handle );
      my $size = $st[7];

      $self->{last_size} = $size;

      # seek()/seek_to_last() check this flag; it is restored on scope exit.
      local $self->{running_initial} = 1;
      $self->maybe_invoke_event( on_initial => $size );
   }
}
186              
187             =head1 METHODS
188              
189             =cut
190              
191             # Replace IO::Async::Handle's implementation
# Replaces IO::Async::Handle's implementation. Instead of registering the read
# handle with the loop, this starts the IO::Async::File poller when reading is
# wanted (unless it is already running) and stops it otherwise.
sub _watch_read
{
   my ( $self, $want ) = @_;
   my $watcher = $self->{file};

   if( !$want ) {
      $watcher->stop;
   }
   else {
      $watcher->start if !$watcher->is_running;
   }
}
204              
# A FileStream is read-only; any request to watch for writability croaks.
sub _watch_write
{
   my ( $self, $want ) = @_;

   $want and croak "Cannot _watch_write in " . ref($self);
}
212              
# Callback from the IO::Async::File child (wired up in _init) when the
# watched file's dev/ino changes. The invocant comes via _replace_weakself
# and may be undef, in which case nothing happens. Otherwise it flags the
# rename, so that read_more() switches to the new handle once the old one is
# drained, then reads the remaining tail of the old file.
sub on_devino_changed
{
   my $self = shift;
   return if !$self;

   $self->{renamed} = 1;
   $self->debug_printf( "read tail of old file" );
   $self->read_more;
}
221              
# Callback from the IO::Async::File child when the file size changes. The
# invocant may be undef (it comes via _replace_weakself), in which case
# nothing happens. A size smaller than the last one seen is treated as
# truncation: on_truncated fires and reading restarts from offset 0. The new
# size is then recorded and read_more() is called to pick up any new data.
sub on_size_changed
{
   my $self = shift;
   return if !$self;

   my $size = shift;

   my $shrank = $size < $self->{last_size};
   if( $shrank ) {
      $self->maybe_invoke_event( on_truncated => );
      $self->{last_pos} = 0;
   }

   $self->{last_size} = $size;

   $self->debug_printf( "read_more" );
   $self->read_more;
}
237              
# Reads whatever new data the file holds.
#
# Restores the saved read offset (if any), calls on_read_ready to read from
# the handle, then saves the resulting offset. If that offset is still short of
# the last known file size, another pass is queued on the loop. Otherwise, if
# a rename was flagged by on_devino_changed, it switches to the handle that the
# IO::Async::File child now reports, resetting the size and (when a position
# was recorded) firing on_truncated and queueing a read from offset 0.
sub read_more
{
   my $self = shift;

   if( defined $self->{last_pos} ) {
      sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET );
   }

   $self->on_read_ready;

   # A zero-distance SEEK_CUR sysseek acts as systell.
   my $pos = sysseek( $self->read_handle, 0, SEEK_CUR );
   $self->{last_pos} = $pos;

   if( $pos < $self->{last_size} ) {
      $self->loop->later( sub { $self->read_more } );
   }
   elsif( $self->{renamed} ) {
      $self->debug_printf( "reopening for rename" );

      $self->{last_size} = 0;

      # NOTE(review): sysseek reports offset 0 as the string "0 but true", so
      # this test is true whenever the sysseek above succeeded, including at
      # offset 0. It is false only if that sysseek failed.
      if( $pos ) {
         $self->maybe_invoke_event( on_truncated => );
         $self->{last_pos} = 0;
         $self->loop->later( sub { $self->read_more } );
      }

      my $replacement = $self->{file}->handle;
      $self->configure( read_handle => $replacement );
      undef $self->{renamed};
   }
}
266              
# Writing is not supported on a FileStream. This emits a warning from the
# caller's perspective (carp) and writes nothing.
sub write
{
   my ( $self ) = @_;
   carp "Cannot ->write from a " . ref($self);
}
271              
272             =head2 seek
273              
274             $filestream->seek( $offset, $whence );
275              
Callable only during the C<on_initial> event. Moves the read position in the
filehandle to the given offset. C<$whence> is interpreted as for C<sysseek>,
should be either C<SEEK_SET>, C<SEEK_CUR> or C<SEEK_END>. Will be set to
C<SEEK_SET> if not provided.
280              
281             Normally this would be used to seek to the end of the file, for example
282              
283             on_initial => sub {
284             my ( $self, $filesize ) = @_;
285             $self->seek( $filesize );
286             }
287              
288             =cut
289              
# Moves the read handle's position. This is only permitted while on_initial is
# running; otherwise it croaks. $whence defaults to SEEK_SET when not supplied.
# Returns whatever sysseek() returns.
sub seek
{
   my ( $self, $offset, $whence ) = @_;

   croak "Cannot ->seek except during on_initial"
      unless $self->{running_initial};

   return sysseek( $self->read_handle, $offset, $whence // SEEK_SET );
}
301              
302             =head2 seek_to_last
303              
304             $success = $filestream->seek_to_last( $str_pattern, %opts );
305              
Callable only during the C<on_initial> event. Attempts to move the read
307             position in the filehandle to just after the last occurrence of a given match.
308             C<$str_pattern> may be a literal string or regexp pattern.
309              
310             Returns a true value if the seek was successful, or false if not. Takes the
311             following named arguments:
312              
313             =over 8
314              
315             =item blocksize => INT
316              
317             Optional. Read the file in blocks of this size. Will take a default of 8KiB if
318             not defined.
319              
320             =item horizon => INT
321              
322             Optional. Give up looking for a match after this number of bytes. Will take a
323             default value of 4 times the blocksize if not defined.
324              
325             To force it to always search through the entire file contents, set this
326             explicitly to C<0>.
327              
328             =back
329              
330             Because regular file reading happens synchronously, this entire method
331             operates entirely synchronously. If the file is very large, it may take a
332             while to read back through the entire contents. While this is happening no
333             other events can be invoked in the process.
334              
335             When looking for a string or regexp match, this method appends the
336             previously-read buffer to each block read from the file, in case a match
becomes split across two reads. If C<blocksize> is reduced to a very small
338             value, take care to ensure it isn't so small that a match may not be noticed.
339              
340             This is most likely useful for seeking after the last complete line in a
341             line-based log file, to commence reading from the end, while still managing to
342             capture any partial content that isn't yet a complete line.
343              
344             on_initial => sub {
345             my $self = shift;
346             $self->seek_to_last( "\n" );
347             }
348              
349             =cut
350              
# Searches backwards from the end of the file (the size recorded in last_size)
# for the last occurrence of $str_pattern and seeks to just after it. The
# pattern is a literal string, or a regexp if given a reference. Only
# callable while on_initial is running; otherwise it croaks.
#
# Options:
#   blocksize => INT   size of each backwards read (default 8192)
#   horizon   => INT   stop searching this many bytes before the end
#                      (default 4 * blocksize; 0 means search the whole file)
#
# Returns 1 after seeking past a match. Returns 0 after seeking to the horizon
# (offset 0 when unlimited) if no match was found, or if a block could not be
# read in full.
sub seek_to_last
{
   my $self = shift;
   my ( $str_pattern, %opts ) = @_;

   $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial";

   my $offset = $self->{last_size};

   my $blocksize = $opts{blocksize} || 8192;

   $opts{horizon} //= $blocksize * 4;
   my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0;
   $horizon = 0 if $horizon < 0;

   my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/;

   # $prev holds the block read on the previous iteration, i.e. the bytes that
   # immediately follow the current block in the file. It is appended so that
   # a match straddling two blocks is still found.
   my $prev = "";
   while( $offset > $horizon ) {
      my $len = $blocksize;
      $len = $offset if $len > $offset;
      $offset -= $len;

      sysseek( $self->read_handle, $offset, SEEK_SET );

      # Read exactly this block's $len bytes. Reading a full $blocksize when
      # $len is shorter (the last block, at the start of the file) would pull
      # in bytes already held in $prev. ( $buffer . $prev ) would then no
      # longer mirror the file: a match could be found across the spurious
      # join, or $offset + $+[0] could point at the wrong place.
      my $got = sysread( $self->read_handle, my $buffer, $len );

      # A failed or short read (e.g. the file shrank after last_size was
      # taken) would likewise desynchronise buffer indices from file offsets.
      last if !defined $got or $got < $len;

      # TODO: If $str_pattern is a plain string this could be more efficient
      # using rindex
      if( () = ( $buffer . $prev ) =~ m/$re/sg ) {
         # $+[0] will be end of last match
         my $pos = $offset + $+[0];
         $self->seek( $pos );
         return 1;
      }

      $prev = $buffer;
   }

   $self->seek( $horizon );
   return 0;
}
392              
393             =head1 TODO
394              
395             =over 4
396              
397             =item *
398              
Move the actual file update watching code into L<IO::Async::Loop>, possibly as
a new watch/unwatch method pair C<watch_file>.
401              
402             =item *
403              
Consider if a construction-time parameter of C<seek_to_end> or C<seek_to_last>
might be neater than a small code block in C<on_initial>, if that turns out to
406             be the only or most common form of use.
407              
408             =back
409              
410             =cut
411              
412             =head1 AUTHOR
413              
Paul Evans <leonerd@leonerd.org.uk>
415              
416             =cut
417              
# Module files loaded via use/require must end by evaluating to a true value;
# this non-zero constant serves that purpose.
418             0x55AA;