File Coverage

blib/lib/Paranoid/IO/FileMultiplexer.pm
Criterion Covered Total %
statement 502 539 93.1
branch 127 210 60.4
condition 44 100 44.0
subroutine 46 46 100.0
pod 10 10 100.0
total 729 905 80.5


line stmt bran cond sub pod time code
1             # Paranoid::IO::FileMultiplexer -- File Multiplexer Object
2             #
3             # $Id: lib/Paranoid/IO/FileMultiplexer.pm, 2.09 2021/12/28 15:46:49 acorliss Exp $
4             #
5             # This software is free software. Similar to Perl, you can redistribute it
6             # and/or modify it under the terms of either:
7             #
8             # a) the GNU General Public License
9             # as published by the
10             # Free Software Foundation ; either version 1
11             # , or any later version
12             # , or
13             # b) the Artistic License 2.0
14             # ,
15             #
16             # subject to the following additional term: No trademark rights to
17             # "Paranoid" have been or are conveyed under any of the above licenses.
18             # However, "Paranoid" may be used fairly to describe this unmodified
19             # software, in good faith, but not as a trademark.
20             #
21             # (c) 2005 - 2021, Arthur Corliss (corliss@digitalmages.com)
22             # (tm) 2008 - 2021, Paranoid Inc. (www.paranoid.com)
23             #
24             #####################################################################
25              
26             #####################################################################
27             #
28             # Environment definitions
29             #
30             #####################################################################
31              
32             package Paranoid::IO::FileMultiplexer;
33              
34 11     11   7843 use 5.008;
  11         33  
35              
36 11     11   77 use strict;
  11         33  
  11         242  
37 11     11   44 use warnings;
  11         22  
  11         297  
38 11     11   55 use vars qw($VERSION);
  11         22  
  11         418  
39 11     11   55 use base qw(Exporter);
  11         33  
  11         649  
40 11     11   66 use Paranoid;
  11         22  
  11         429  
41 11     11   6512 use Paranoid::IO qw(:all);
  11         22  
  11         2112  
42 11     11   88 use Paranoid::Debug qw(:all);
  11         33  
  11         1518  
43 11     11   110 use Carp;
  11         22  
  11         583  
44 11     11   66 use Fcntl qw(:DEFAULT :flock :mode :seek);
  11         11  
  11         5071  
45 11     11   6402 use Paranoid::IO::FileMultiplexer::Block::FileHeader;
  11         22  
  11         550  
46 11     11   88 use Paranoid::IO::FileMultiplexer::Block::StreamHeader;
  11         22  
  11         363  
47 11     11   66 use Paranoid::IO::FileMultiplexer::Block::BATHeader;
  11         22  
  11         660  
48              
49             ($VERSION) = ( q$Revision: 2.09 $ =~ /(\d+(?:\.\d+)+)/sm );
50              
51 11     11   66 use constant PIOFMVER => '1.0';
  11         22  
  11         1672  
52 11     11   121 use constant PERMMASK => 0666;
  11         33  
  11         638  
53 11     11   66 use constant DEFBSIZE => 4096;
  11         22  
  11         462  
54              
55 11     11   66 use constant ADDR_BAT => 0;
  11         22  
  11         429  
56 11     11   66 use constant ADDR_BLK => 1;
  11         22  
  11         429  
57 11     11   55 use constant ADDR_OFT => 2;
  11         22  
  11         54857  
58              
59             #####################################################################
60             #
61             # Module code follows
62             #
63             #####################################################################
64              
65             sub new {
66              
67             # Purpose: Creates a PIOFM object for manipulation
68             # Returns: Object reference or undef
69             # Usage: $obj = Paranoid::IO::FileMultiplexer->new(
70             # file => $fn,
71             # readOnly => 0,
72             # perms => $perms,
73             # blockSize => $bsize,
74             # );
75              
76 154     154 1 6402 my $class = shift;
77 154         572 my %args = @_;
78 154         2288 my $self = {
79             file => undef,
80             readOnly => 0,
81             perms => PERMMASK ^ umask,
82             header => undef,
83             streams => {},
84             streamPos => {},
85             blockSize => DEFBSIZE,
86             corrupted => 0,
87             %args
88             };
89              
90             pdebug( 'entering w/f: %s bs: %s p: %s ro: %s',
91 154         913 PDLEVEL1, @args{qw(file blockSize perms readOnly)} );
92 154         440 pIn();
93              
94 154         297 bless $self, $class;
95              
96             # Mandatory file name required
97             $self = undef
98 154 100 100     902 unless defined $args{file} and length $args{file};
99              
100 154 100       385 if ( defined $self ) {
101              
102             # Enable the lock stack
103 132         341 PIOLOCKSTACK = 1;
104              
105             # Attempt to open the file
106 132 50       297 if ( $$self{ro} ) {
107 0 0       0 $self = undef unless $self->_oldFile;
108             } else {
109 132 100 100     407 $self = undef unless $self->_newFile or $self->_oldFile;
110             }
111              
112             } else {
113 22         77 pdebug( 'invalid file name: %s', PDLEVEL1, $args{file} );
114             }
115              
116 154         440 pOut();
117 154         385 pdebug( 'leaving w/%s', PDLEVEL1, $self );
118              
119 154         847 return $self;
120             }
121              
122             sub _newFile {
123              
124             # Purpose: Attempts to open the file as a new file
125             # Returns: Boolean
126             # Usage: $rv = $obj->_newFile;
127              
128 132     132   209 my $self = shift;
129 132         275 my $file = $$self{file};
130 132         198 my $bsize = $$self{blockSize};
131 132         286 my $rv = 0;
132 132         209 my $header;
133              
134 132         495 pdebug( 'entering', PDLEVEL2 );
135 132         352 pIn();
136              
137 132 50       451 if ( !$$self{readOnly} ) {
138              
139             # Allocate the header object (it will fail on invalid block sizes)
140 132         649 $header =
141             Paranoid::IO::FileMultiplexer::Block::FileHeader->new( $file,
142             $bsize );
143 132 100       396 if ( defined $header ) {
144              
145             # Open the file exclusively and get an flock
146 99         407 $rv = popen( $file, O_CREAT | O_RDWR | O_EXCL, $$self{perms} );
147 99 100       275 if ($rv) {
148              
149             # Lock file
150 77         275 pflock( $file, LOCK_EX );
151              
152             # Allocate the block and write the initial signature
153 77 50       561 $rv = $header->allocate and $header->writeSig;
154 77 50       242 $$self{header} = $header if $rv;
155              
156             # Release the lock
157 77         242 pflock( $file, LOCK_UN );
158             }
159             }
160             } else {
161 0         0 pdebug( 'cannot create a new file in readOnly mode', PDLEVEL1 );
162             }
163              
164 132         374 pOut();
165 132         495 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
166              
167 132         748 return $rv;
168             }
169              
170             sub _oldFile {
171              
172             # Purpose: Attempts to open the file as a new file
173             # Returns: Boolean
174             # Usage: $rv = $obj->_newFile;
175              
176 55     55   99 my $self = shift;
177 55         143 my $file = $$self{file};
178 55         88 my $bsize = $$self{blockSize};
179 55         88 my $rv = 0;
180 55         88 my $header;
181              
182 55         143 pdebug( 'entering', PDLEVEL2 );
183 55         143 pIn();
184              
185             # Allocate the header object (it will fail on invalid block sizes)
186 55         253 $header = Paranoid::IO::FileMultiplexer::Block::FileHeader->new( $file,
187             $bsize );
188 55 100       198 if ( defined $header ) {
189              
190             # Open the file exclusively and get an flock
191             $rv = popen( $file, ( $$self{readOnly} ? O_RDONLY : O_RDWR ),
192 22 50       121 $$self{perms} );
193 22 50       110 if ($rv) {
194              
195             # Lock file
196 22         88 pflock( $file, LOCK_SH );
197              
198             # Read an existing signature
199 22   66     99 $rv = $header->readSig && $header->readStreams;
200 22 100       66 if ($rv) {
201 11         33 $$self{header} = $header;
202 11         44 $$self{blockSize} = $header->blockSize;
203             }
204              
205             # Release the lock
206 22         99 pflock( $file, LOCK_UN );
207             }
208             }
209              
210 55         165 pOut();
211 55         220 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
212              
213 55         341 return $rv;
214             }
215              
216             sub header {
217              
218             # Purpose: Returns a reference to the header object
219             # Returns: Ref
220             # Usage: $header = $obj->header;
221              
222 363     363 1 129679 my $self = shift;
223 363         2475 return $$self{header};
224             }
225              
226             sub _reload {
227              
228             # Purpose: Reloads the file header information and purges the stream
229             # cache
230             # Returns: Boolean
231             # Usage: $rv = $obj->_reload;
232              
233 8     8   35 my $self = shift;
234 8         37 my $file = $$self{file};
235 8         80 my $header = $$self{header};
236 8         29 my $rv = 1;
237              
238 8         91 pdebug( 'entering', PDLEVEL4 );
239 8         38 pIn();
240              
241 8 50       77 if ( pflock( $file, LOCK_SH ) ) {
242 8 50 33     169 if ( $header->readSig && $header->readStreams ) {
243 8         116 $$self{streams} = {};
244             } else {
245 0         0 $$self{corrupt} = 1;
246 0         0 $rv = 0;
247             }
248 8         47 pflock( $file, LOCK_UN );
249             }
250              
251 8         41 pOut();
252 8         31 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
253              
254 8         20 return $rv;
255             }
256              
257             sub _getStream {
258              
259             # Purpose: Retrieves or creates a stream header object
260             # Returns: Ref
261             # Usage: $ref = $obj->_getStream($name);
262              
263 2948     2948   5226 my $self = shift;
264 2948         4404 my $sname = shift;
265 2948         6060 my $header = $$self{header};
266 2948         4511 my $file = $$self{file};
267 2948         5012 my ( $rv, %streams, $stream );
268              
269 2948         7879 pdebug( 'entering w/%s', PDLEVEL2, $sname );
270 2948         7638 pIn();
271              
272 2948 50 33     13237 if ( defined $sname and length $sname ) {
273              
274             # Reload if header fails validation
275 2948 100       11815 $self->_reload unless $header->validateBlocks;
276              
277             # Create the stream object if we don't have one cached
278 2948 100       10422 unless ( exists $$self{streams}{$sname} ) {
279 34         179 %streams = $header->streams;
280 34 50       151 if ( exists $streams{$sname} ) {
281             $stream =
282             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new(
283 34         192 $$self{file}, $streams{$sname}, $header->blockSize,
284             $sname );
285 34 50       144 if ( pflock( $file, LOCK_SH ) ) {
286 34 50 33     396 $$self{streams}{$sname} = $stream
287             if $stream->readSig
288             and $stream->readBATs;
289 34         164 pflock( $file, LOCK_UN );
290             }
291 34 50       154 unless ( exists $$self{streams}{$sname} ) {
292 0         0 pdebug( 'stream \'%s\' failed consistency checks',
293             PDLEVEL1, $sname );
294 0         0 $$self{corrupt} = 1;
295             }
296             } else {
297 0         0 pdebug( 'attempted to access a non-existent stream (%s)',
298             PDLEVEL1, $sname );
299             }
300             }
301              
302             # Retrieve a reference to the stream object
303             $stream =
304             exists $$self{streams}{$sname}
305 2948 50       8805 ? $$self{streams}{$sname}
306             : undef;
307              
308             # Reload stream signature if EOS has changed outside of this process
309 2948 50       6572 if ( defined $stream ) {
310 2948 50       10554 unless ( $stream->validateEOS ) {
311 0 0 0     0 unless ( $stream->readSig and $stream->readBATs ) {
312 0         0 $stream = undef;
313 0         0 pdebug( 'stream \'%s\' failed consistency checks',
314             PDLEVEL1, $sname );
315 0         0 $$self{corrupt} = 1;
316             }
317             }
318              
319             # Return the stream reference
320 2948         5842 $rv = $stream;
321             }
322             }
323              
324 2948         7030 pOut();
325 2948         6873 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
326              
327 2948         8960 return $rv;
328             }
329              
330             sub _getBAT {
331              
332             # Purpose: Returns a BAT which has been loaded and validated
333             # Returns: Ref
334             # Usage: $ref = $obj->_getBAT($sname, $seq);
335              
336 1215     1215   2072 my $self = shift;
337 1215         2455 my $sname = shift;
338 1215         2320 my $seq = shift;
339 1215         2424 my $file = $$self{file};
340 1215         2354 my ( $rv, $stream, @bats, $bat );
341              
342 1215         3522 pdebug( 'entering w/(%s)(%s)', PDLEVEL4, $sname, $seq );
343 1215         3442 pIn();
344              
345 1215         3010 $stream = $self->_getStream($sname);
346 1215 50       3096 if ( defined $stream ) {
347              
348             # Get the list of BATs
349 1215         3821 @bats = $stream->bats;
350              
351 1215 50       4827 if ( $seq <= $#bats ) {
352             $bat = Paranoid::IO::FileMultiplexer::Block::BATHeader->new(
353 1215         7378 $$self{file}, $bats[$seq], $$self{blockSize}, $sname, $seq );
354 1215 50       3615 if ( pflock( $file, LOCK_SH ) ) {
355 1215 50 33     6102 $rv = $bat
      33        
356             if defined $bat
357             and $bat->readSig
358             and $bat->readData;
359 1215         4739 pflock( $file, LOCK_UN );
360             }
361 1215 50       3935 pdebug( 'BAT %s for stream \'%s\' failed consistency checks',
362             PDLEVEL1, $seq, $sname )
363             unless $rv;
364             }
365             }
366              
367 1215         3316 pOut();
368 1215         3074 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
369              
370 1215         4054 return $rv;
371             }
372              
373             sub _chkData {
374              
375             # Purpose: Checks that a data block appears to be present
376             # Returns: Boolean
377             # Usage: $rv = $obj->_chkData;
378              
379 22     22   55 my $self = shift;
380 22         55 my $bn = shift;
381 22         66 my $file = $$self{file};
382 22         66 my $bsize = $$self{blockSize};
383 22         55 my ( $rv, $block, $raw );
384              
385 22         88 pdebug( 'entering w/%s', PDLEVEL4, $bn );
386 22         154 pIn();
387              
388 22         99 $block = Paranoid::IO::FileMultiplexer::Block->new( $file, $bn, $bsize );
389 22   33     176 $rv = ( defined $block and $block->bread( \$raw, 0, 1 ) == 1 );
390              
391 22 50       99 unless ($rv) {
392 0         0 pdebug( 'data block list at dn %s but cannot be read', PDLEVEL1,
393             $bn );
394 0         0 $rv = 0;
395 0         0 $$self{corrupted} = 1;
396             }
397              
398 22         88 pOut();
399 22         77 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
400              
401 22         165 return $rv;
402             }
403              
404             sub _chkBAT {
405              
406             # Purpose: Checks that a BAT appears consistent
407             # Returns: Boolean
408             # Usage: $rv = $obj->_chkBAT($bn, $snmae, $seq);
409              
410 22     22   55 my $self = shift;
411 22         44 my $bn = shift;
412 22         55 my $sname = shift;
413 22         33 my $seq = shift;
414 22         44 my $file = $$self{file};
415 22         44 my $bsize = $$self{blockSize};
416 22         55 my ( $rv, $block, @data );
417              
418 22         66 pdebug( 'entering w/%s', PDLEVEL4, $bn );
419 22         77 pIn();
420              
421 22         121 $block = Paranoid::IO::FileMultiplexer::Block::BATHeader->new( $file, $bn,
422             $bsize, $sname, $seq );
423 22   33     143 $rv = ( defined $block and $block->readSig and $block->readData );
424              
425 22 50       132 unless ($rv) {
426 0         0 pdebug( 'BAT at bn %s fails consistency checks', PDLEVEL1, $bn );
427 0         0 $rv = 0;
428 0         0 $$self{corrupted} = 1;
429             }
430              
431 22 50       88 if ($rv) {
432 22         99 @data = $block->dataBlocks;
433 22 50       77 foreach (@data) { $rv = 0 unless $self->_chkData($_) }
  22         99  
434             }
435              
436 22         55 pOut();
437 22         55 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
438              
439 22         165 return $rv;
440             }
441              
442             sub _chkStream {
443              
444             # Purpose: Checks that a stream appears consistent
445             # Returns: Boolean
446             # Usage: $rv = $obj->_chkStream($bn, $sname);
447              
448 44     44   77 my $self = shift;
449 44         121 my $bn = shift;
450 44         66 my $sname = shift;
451 44         121 my $file = $$self{file};
452 44         110 my $bsize = $$self{blockSize};
453 44         77 my ( $rv, $i, $block, @bats );
454              
455 44         154 pdebug( 'entering w/%s', PDLEVEL4, $bn );
456 44         506 pIn();
457              
458 44         176 $block =
459             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new( $file, $bn,
460             $bsize, $sname );
461 44   66     286 $rv = ( defined $block and $block->readSig and $block->readBATs );
462              
463 44 100       176 unless ($rv) {
464 11         66 pdebug( 'Stream at bn %s (%s) fails consistency checks',
465             PDLEVEL1, $bn, $sname, $sname, $sname );
466 11         33 $rv = 0;
467 11         22 $$self{corrupted} = 1;
468             }
469              
470 44 100       121 if ($rv) {
471 33         143 @bats = $block->bats;
472 33         88 $i = 0;
473 33         77 foreach (@bats) {
474 22 50       110 $rv = 0 unless $self->_chkBAT( $_, $sname, $i );
475 22         55 $i++;
476             }
477             }
478              
479 44         132 pOut();
480 44         121 pdebug( 'leaving w/rv: %s', PDLEVEL4, $rv );
481              
482 44         330 return $rv;
483             }
484              
485             sub chkConsistency {
486              
487             # Purpose: Checks the file for consistency
488             # Returns: Boolean
489             # Usage: $rv = $obj->chkConsistency;
490              
491 22     22 1 88 my $self = shift;
492 22         55 my $file = $$self{file};
493 22         55 my $header = $$self{header};
494 22         88 my $bsize = $$self{blockSize};
495 22         55 my $rv = 1;
496 22         33 my %streams;
497              
498 22         99 pdebug( 'entering', PDLEVEL1 );
499 22         77 pIn();
500              
501             # TODO: There is one major flaw in this consistency check, in that is
502             # TODO: possible to list a header block as a data block in a BAT.
503             # TODO: Writes to said block will obviously introduce consistency errors
504             # TODO: and corruption in the future. Depending on the size of the file,
505             # TODO: however, doing an exhaustive search on all data blocks and making
506             # TODO: sure they're not in use as a header block could be memory
507             # TODO: intensive. We might have to bite the bullet, though.
508             #
509             # Possible solution (which isn't perfect): look for signatures and see if
510             # they load error free. I.e., any block that starts with PIOFM. If we've
511             # already passed the rest of the consistency checks, anything pointing to
512             # what looks like a header block, but doesn't pass consistency checks, we
513             # really don't care about. We might warn if it does pass, though, and
514             # then brute-force check each data block number against a full list of
515             # stream/BAT block numbers.
516              
517             # Apply a read lock for the duration
518 22 50       88 if ( pflock( $file, LOCK_SH ) ) {
519              
520             # Check header
521 22 50 33     121 if ( $header->readSig && $header->readStreams ) {
522              
523             # Check streams
524 22         418 %streams = $header->streams;
525 22         143 foreach ( sort keys %streams ) {
526 44 100       176 $rv = 0 unless $self->_chkStream( $streams{$_}, $_ );
527             }
528              
529             } else {
530 0         0 pdebug( 'file header failed consistency checks', PDLEVEL1 );
531 0         0 $$self{corrupted} = 1;
532 0         0 $rv = 0;
533             }
534              
535 22         110 pflock( $file, LOCK_UN );
536              
537             } else {
538 0         0 pdebug( 'failed to get a read lock', PDLEVEL1 );
539 0         0 $rv = 0;
540             }
541              
542 22 100       88 if ($rv) {
543 11         33 $$self{corrupted} = 0;
544             } else {
545 11         33 $$self{corrupted} = 1;
546 11         44 pdebug( 'error - setting corrupted flag to true', PDLEVEL1 );
547             }
548              
549 22         99 pOut();
550 22         88 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
551              
552 22         209 return $rv;
553             }
554              
555             sub _addBlock {
556              
557             # Purpose: Adds a data block to the file and updates the file header
558             # Returns: Integer (block number of new block)
559             # Usage: $bn = $self->_addBlock;
560              
561 114     114   306 my $self = shift;
562 114         264 my $header = $$self{header};
563 114         218 my ( $rv, $bn, $data );
564              
565 114         401 pdebug( 'entering', PDLEVEL2 );
566 114         363 pIn();
567              
568 114         317 $bn = $header->blocks;
569             $data =
570             Paranoid::IO::FileMultiplexer::Block->new( $$self{file}, $bn,
571 114         634 $$self{blockSize} );
572 114 50 33     642 $rv = $bn if defined $data and $data->allocate and $header->incrBlocks;
      33        
573              
574 114         421 pOut();
575 114         318 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
576              
577 114         727 return $rv;
578             }
579              
580             sub _addBAT {
581              
582             # Purpose: Adds a BAT to the file and updates the file header, and calls
583             # _addBlock
584             # Returns: Integer (block number of new BAT)
585             # Usage: $bn = $self->_addBAT($sname, $seq);
586              
587 44     44   143 my $self = shift;
588 44         132 my $sname = shift;
589 44         77 my $seq = shift;
590 44         99 my $header = $$self{header};
591 44         99 my ( $rv, $bn, $bat );
592              
593 44         143 pdebug( 'entering', PDLEVEL2 );
594 44         143 pIn();
595              
596 44         187 $bn = $header->blocks;
597             $bat =
598             Paranoid::IO::FileMultiplexer::Block::BATHeader->new( $$self{file},
599 44         1199 $bn, $$self{blockSize}, $sname, $seq );
600 44 50 33     341 $rv = $bn
      33        
      33        
601             if defined $bat
602             and $bat->allocate
603             and $bat->writeSig
604             and $header->incrBlocks;
605              
606 44 50       385 $bat->addData( $self->_addBlock ) if defined $rv;
607              
608 44         165 pOut();
609 44         143 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
610              
611 44         429 return $rv;
612             }
613              
614             sub _addStream {
615              
616             # Purpose: Adds a Stream to the file and updates the file header, and calls
617             # _addBAT
618             # Returns: Integer (block number of new stream)
619             # Usage: $bn = $self->_addStream($sname);
620              
621 33     33   88 my $self = shift;
622 33         66 my $sname = shift;
623 33         99 my $header = $$self{header};
624 33         77 my ( $rv, $bn, $stream );
625              
626 33         110 pdebug( 'entering', PDLEVEL2 );
627 33         99 pIn();
628              
629 33         110 $bn = $header->blocks;
630             $stream =
631             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new( $$self{file},
632 33         286 $bn, $$self{blockSize}, $sname );
633 33 50 33     363 $rv = $bn
      33        
      33        
634             if defined $stream
635             and $stream->allocate
636             and $stream->writeSig
637             and $header->incrBlocks;
638              
639 33 50       264 $stream->addBAT( $self->_addBAT( $sname, 0 ) ) if defined $rv;
640              
641 33         143 pOut();
642 33         110 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
643              
644 33         198 return $rv;
645             }
646              
647             sub addStream {
648              
649             # Purpose: Adds the requested stream
650             # Returns: Boolean
651             # Usage: $rv = $obj->addStream($name);
652              
653 44     44 1 220 my $self = shift;
654 44         110 my $sname = shift;
655 44         99 my $file = $$self{file};
656 44         121 my $header = $$self{header};
657 44   66     231 my $bypass = $$self{readOnly} || $$self{corrupted};
658 44         99 my $rv = 0;
659              
660 44         165 pdebug( 'entering w/(%s)', PDLEVEL1, $sname );
661 44         154 pIn();
662              
663 44 100       154 unless ($bypass) {
664              
665             # Get an exclusive lock
666 33 50       143 if ( pflock( $file, LOCK_EX ) ) {
667              
668             # Validate file header block count
669 33         88 $rv = 1;
670 33 50       176 $rv = $self->_reload unless $header->validateBlocks;
671              
672             # Add the stream
673 33 50 50     198 $rv = $header->addStream( $sname, $header->blocks )
674             and $self->_addStream($sname)
675             if $rv;
676              
677             # Release the lock
678 33         143 pflock( $file, LOCK_UN );
679              
680             } else {
681 0         0 pdebug( 'failed to get an exclusive lock', PDLEVEL1 );
682             }
683             }
684              
685 44         143 pOut();
686 44         132 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
687              
688 44         231 return $rv;
689             }
690              
691             sub _calcAddr {
692              
693             # Purpose: Calculates the (BAT, Data, offset) address of the stream
694             # position
695             # Returns: Array (BAT #, Data #, offset)
696             # Usage: @addr = $self->_calcAddr($pos);
697              
698 1408     1408   3070 my $self = shift;
699 1408         3465 my $pos = shift;
700 1408         3191 my $bsize = $$self{blockSize};
701 1408         2614 my ( @rv, $bat, $max );
702              
703 1408 100       4477 if ( $pos < $bsize ) {
704 549         1754 @rv = ( 0, 0, $pos );
705             } else {
706              
707             $bat = Paranoid::IO::FileMultiplexer::Block::BATHeader->new(
708 859         3878 $$self{file}, 0, $bsize );
709 859 50       2179 if ( defined $bat ) {
710 859         2235 $max = $bat->maxData;
711              
712 859         3838 $rv[ADDR_BAT] = int( $pos / ( $max * $bsize ) );
713 859         3170 $rv[ADDR_BLK] =
714             int( ( $pos - ( $rv[ADDR_BAT] * $max * $bsize ) ) / $bsize );
715 859         3736 $rv[ADDR_OFT] = $pos -
716             ( $rv[ADDR_BAT] * $max * $bsize + $rv[ADDR_BLK] * $bsize );
717              
718             }
719             }
720              
721 1408         13901 return @rv;
722             }
723              
724             sub strmSeek {
725              
726             # Purpose: Updates the stream cursor position
727             # Returns: Integer/undef on error
728             # Usage: $rv = $obj->_strmSeek($sname, $pos, $whence);
729              
730 1702     1702 1 2652 my $self = shift;
731 1702         2469 my $sname = shift;
732 1702         3094 my $pos = shift;
733 1702         2749 my $whence = shift;
734 1702         2381 my $cur = 0;
735 1702         2324 my $rv = 1;
736              
737 1702         5197 pdebug( 'entering w/(%s)(%s)(%s)', PDLEVEL2, $sname, $pos, $whence );
738 1702         4037 pIn();
739              
740 1702 50       3693 $whence = SEEK_SET unless defined $whence;
741 1702 50       3779 $pos = 0 unless defined $whence;
742              
743 1702 100       3264 if ( $whence == SEEK_SET ) {
744 1180         3295 $$self{streamPos}{$sname} = $pos;
745             } else {
746 522 50       2042 $cur = $$self{streamPos}{$sname} if exists $$self{streamPos}{$sname};
747              
748 522 50       2173 if ( $whence == SEEK_CUR ) {
    50          
749 0         0 $cur += $pos;
750             } elsif ( $whence == SEEK_END ) {
751 522         2090 $cur = $$self{streams}{$sname}->eos + $pos;
752             } else {
753 0         0 pdebug( 'invalid value for whence in seek (%s)',
754             PDLEVEL1, $whence );
755 0         0 $rv = undef;
756             }
757 522         1656 $$self{streamPos}{$sname} = $cur;
758             }
759 1702 50       5241 $$self{streamPos}{$sname} = 0 if $$self{streamPos}{$sname} < 0;
760              
761 1702 50       5210 $rv = $$self{streamPos}{$sname} if defined $rv;
762 1702 100       4320 $rv = '0 but true' if $rv == 0;
763              
764 1702         4423 pOut();
765 1702         3568 pdebug( 'leaving w/rv: %s', PDLEVEL2, $rv );
766              
767 1702         5815 return $rv;
768             }
769              
770             sub strmTell {
771              
772             # Purpose: Returns the current stream cursor position
773             # Returns: Integer
774             # Usage: $rv = $obj->_strmTell($sname);
775              
776 1156     1156 1 2045 my $self = shift;
777 1156         1872 my $sname = shift;
778 1156         1516 my $rv;
779              
780 1156 100       3565 $$self{streamPos}{$sname} = 0 unless exists $$self{streamPos}{$sname};
781              
782 1156         4237 return $$self{streamPos}{$sname};
783             }
784              
785             sub _growStream {
786              
787             # Purpose: Grows the stream as needed to accomodate the upcoming write
788             # based on the address of the write's starting position
789             # Returns: Boolean/Integer (bn of last block added)
790             # Usage: $rv = $obj->_growStream($sname, @addr);
791              
792 577     577   1282 my $self = shift;
793 577         997 my $sname = shift;
794 577         1562 my @addr = @_;
795 577         1271 my $file = $$self{file};
796 577         981 my $rv = 1;
797 577         1139 my ( $max, $stream, $bat, @bats, @blocks );
798              
799 577         1733 pdebug( 'entering w/(%s)(%s, %s, %s)', PDLEVEL3, $sname, @addr );
800 577         1551 pIn();
801              
802             # Get the stream and list of bats
803 577         2002 $stream = $self->_getStream($sname);
804 577         2425 @bats = $stream->bats;
805              
806             # Start padding BATs
807 577         2739 while ( $#bats <= $addr[ADDR_BAT] ) {
808              
809             # Add a BAT
810 577 100       1956 if ( $#bats < $addr[ADDR_BAT] ) {
811              
812             # Only add a BAT if we're still below the BAT address
813 11         77 $rv = $self->_addBAT( $sname, scalar @bats );
814 11 50       66 if ($rv) {
815 11         77 $stream->addBAT($rv);
816 11         55 @bats = $stream->bats;
817             } else {
818 0         0 last;
819             }
820             }
821              
822             # Add data blocks as needed
823 577         2570 $bat = $self->_getBAT( $sname, $#bats );
824 577         2221 @blocks = $bat->dataBlocks;
825 577 50       3979 while (
826             $#bats == $addr[ADDR_BAT]
827             ? $#blocks < $addr[ADDR_BLK]
828             : !$bat->full
829             ) {
830              
831 70         327 $rv = $self->_addBlock;
832 70 50       272 if ($rv) {
833 70         334 $bat->addData($rv);
834 70         244 @blocks = $bat->dataBlocks;
835             } else {
836 0         0 last;
837             }
838             }
839              
840 577 50       2541 last if $#bats == $addr[ADDR_BAT];
841             }
842              
843 577 50       1954 pdebug( 'failed to grow the stream (%s)', PDLEVEL1, $sname ) unless $rv;
844              
845 577         1742 pOut();
846 577         1561 pdebug( 'leaving w/rv: %s', PDLEVEL3, $rv );
847              
848 577         5182 return $rv;
849             }
850              
851             sub _strmWrite {
852              
853             # Purpose: Writes to the specified stream
854             # Returns: Integer/undef (bytes written/error)
855             # Usage: $bytes = $obj->_strmWrite($sname, $content);
856              
857 555     555   895 my $self = shift;
858 555         973 my $sname = shift;
859 555         1103 my $content = shift;
860 555         1295 my $file = $$self{file};
861 555         7966 my $bsize = $$self{blockSize};
862 555         1761 my ( $rv, $stream, $bat, $block, $pos );
863 555         0 my ( @addr, @blocks, $bn, $blkLeft, $offset, $clength, $chunk, $bw );
864              
865 555 50       2097 pdebug(
866             'entering w/(%s)(%s)',
867             PDLEVEL2, $sname,
868 555         3569 ( defined $content ? "@{[ length $content ]} bytes" : $content ),
869             );
870 555         1854 pIn();
871              
872 555 50       1648 if ( pflock( $file, LOCK_EX ) ) {
873              
874 555         1675 $stream = $self->_getStream($sname);
875 555 50 33     5443 if ( defined $stream and defined $content and length $content ) {
      33        
876              
877             # Get the current position
878 555         1884 $pos = $self->strmTell($sname);
879              
880             # Get the address
881 555         3389 @addr = $self->_calcAddr( $pos + length $content );
882              
883             # Allocate blocks as needed
884 555 50       2301 if ( $self->_growStream( $sname, @addr ) ) {
885 555         2299 @addr = $self->_calcAddr($pos);
886              
887             # Get the specified BAT and data block
888 555         2249 $bat = $self->_getBAT( $sname, $addr[ADDR_BAT] );
889 555         2409 @blocks = $bat->dataBlocks;
890              
891             # Get the specified block
892 555         3471 $block =
893             Paranoid::IO::FileMultiplexer::Block->new( $file,
894             $blocks[ $addr[ADDR_BLK] ], $bsize );
895              
896 555 50 33     3067 if ( defined $bat and defined $block ) {
897              
898             # Start writing
899 555         1291 $offset = $rv = 0;
900 555         2181 while ( $rv < length $content ) {
901              
902             # We need to know how much room is left in the block
903 625         1765 $blkLeft = $bsize - $addr[ADDR_OFT];
904              
905             # We need to know if the remaining content will fit in
906             # that block
907 625         1762 $clength = length($content) - $offset;
908 625 100       2332 $chunk = $clength <= $blkLeft ? $clength : $blkLeft;
909              
910             # Write the chunk
911 625         2782 $bw =
912             $block->bwrite( $content, $addr[ADDR_OFT], $chunk,
913             $offset );
914 625         1459 $rv += $bw;
915 625         1233 $offset += $bw;
916 625         1435 $pos += $bw;
917              
918             # Exit if we couldn't write the full chunk
919 625 50       1976 unless ( $bw == $chunk ) {
920 0         0 pdebug(
921             'failed to write entire contents: %s bytes',
922             PDLEVEL1, $rv );
923 0         0 last;
924             }
925              
926             # Get the next block if we have bytes left
927 625 100       3301 if ( $rv < length $content ) {
928 70         284 @addr = $self->_calcAddr($pos);
929 70 50       338 unless ( $bat->sequence == $addr[ADDR_BAT] ) {
930 0         0 $bat =
931             $self->_getBAT( $sname, $addr[ADDR_BAT] );
932 0         0 @blocks = $bat->dataBlocks;
933             }
934              
935             # Get the specified block
936             $block =
937 70         330 Paranoid::IO::FileMultiplexer::Block->new(
938             $file, $blocks[ $addr[ADDR_BLK] ], $bsize );
939             }
940             }
941             }
942              
943             # Update stream position and EOS
944 555 50       1560 if ($rv) {
945 555         2761 $self->strmSeek( $sname, $pos, SEEK_SET );
946 555 100       2896 $stream->writeEOS($pos) if $stream->eos < $pos;
947             }
948              
949             }
950              
951             }
952 555         1925 pflock( $file, LOCK_UN );
953             }
954              
955 555         1751 pOut();
956 555         1732 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
957              
958 555         6879 return $rv;
959             }
960              
961             sub strmWrite {
962              
963             # Purpose: Calls _strmWrite after making sure the file can be written to
964             # Returns: Integer/undef
965             # Usage: $bw = $obj->strmWrite($sname, $content);
966              
967 555     555 1 1355 my $self = shift;
968 555         1570 my @args = @_;
969 555   33     3194 my $bypass = $$self{readOnly} || $$self{corrupted};
970              
971 555 50       1505 pdebug( 'can\'t write to files that are corrupted or read-only',
972             PDLEVEL1 )
973             if $bypass;
974              
975 555 50       2490 return $bypass ? undef : $self->_strmWrite(@args);
976             }
977              
978             sub _strmRead {
979              
980             # Purpose: Reads from the specified stream
981             # Returns: Integer/undef (bytes read/error)
982             # Usage: $bytes = $obj->_strmRead($sname, $content, $bytes);
983              
984 68     68   115 my $self = shift;
985 68         136 my $sname = shift;
986 68         115 my $cref = shift;
987 68   50     193 my $btr = shift || 0;
988 68         164 my $file = $$self{file};
989 68         175 my $bsize = $$self{blockSize};
990 68         130 my $rv = 0;
991 68         228 my ( $stream, $pos, $eos, @addr, $content );
992 68         0 my ( $bat, @blocks, $block, $ctr, $br, $offset );
993              
994 68         240 pdebug( 'entering w/(%s)(%s)(%s)', PDLEVEL2, $sname, $cref, $btr );
995 68         238 pIn();
996              
997 68 50       245 if ( pflock( $file, LOCK_SH ) ) {
998              
999 68         228 $stream = $self->_getStream($sname);
1000 68 50 33     633 if ( defined $stream and defined $cref and ref $cref eq 'SCALAR' ) {
      33        
1001              
1002             # Get the current position
1003 68         241 $pos = $self->strmTell($sname);
1004              
1005             # Get the address
1006 68         223 @addr = $self->_calcAddr($pos);
1007              
1008             # Get the End Of Stream position
1009 68         262 $eos = $stream->eos;
1010              
1011             # Start reading
1012 68         217 $$cref = '';
1013 68   100     451 while ( $pos < $eos and $rv < $btr ) {
1014              
1015             # Get the specified BAT
1016 83         344 $bat = $self->_getBAT( $sname, $addr[ADDR_BAT] );
1017 83 50       269 if ( defined $bat ) {
1018              
1019             # Get the specified data block
1020 83         276 @blocks = $bat->dataBlocks;
1021 83         459 $block =
1022             Paranoid::IO::FileMultiplexer::Block->new( $file,
1023             $blocks[ $addr[ADDR_BLK] ], $bsize );
1024 83 50       258 if ( defined $block ) {
1025              
1026             # Take and early out if pos equals eos
1027 83 50       260 last unless $pos < $eos;
1028              
1029             # Figure out how much of the block we have left to
1030             # read
1031 83         320 $ctr = $bsize - $addr[ADDR_OFT];
1032              
1033             # Reduce it if the read finishes in this block
1034 83 100       272 $ctr = $btr - $rv if $ctr > $btr - $rv;
1035              
1036             # Reduce it further if EOS is even closer
1037 83 100       314 $ctr = $eos - $pos if $ctr > $eos - $pos;
1038              
1039             # Read the chunk
1040 83         342 $br =
1041             $block->bread( \$content, $addr[ADDR_OFT], $ctr );
1042 83         214 $rv += $br;
1043 83         191 $pos += $br;
1044 83         304 @addr = $self->_calcAddr($pos);
1045 83         1563 $$cref .= $content;
1046              
1047 83 50       727 unless ( $br == $ctr ) {
1048 0         0 pdebug(
1049             'failed to read entire chunk: %s/%s bytes',
1050             PDLEVEL1, $br, $ctr );
1051 0         0 last;
1052             }
1053              
1054             }
1055             }
1056             }
1057              
1058             # Update stream pointer
1059 68         296 $self->strmSeek( $sname, $pos, SEEK_SET );
1060              
1061             } else {
1062 0 0       0 if ( defined $stream ) {
1063 0         0 pdebug( 'invalid value passed for the content reference: %s',
1064             PDLEVEL1, $cref );
1065 0         0 $rv = undef;
1066             }
1067             }
1068              
1069 68         228 pflock( $file, LOCK_UN );
1070             }
1071              
1072 68         226 pOut();
1073 68         183 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
1074              
1075 68         887 return $rv;
1076             }
1077              
1078             sub strmRead {
1079              
1080             # Purpose: Calls _strmRead after making sure the file can be read from
1081             # Returns: Integer/undef
1082             # Usage: $br = $obj->strmRead($stream, \$content, $bytes);
1083              
1084 68     68 1 244 my $self = shift;
1085 68         264 my @args = @_;
1086 68         180 my $bypass = $$self{corrupted};
1087              
1088 68 50       246 pdebug( 'can\'t read from files that are corrupted', PDLEVEL1 )
1089             if $bypass;
1090              
1091 68 50       337 return $bypass ? undef : $self->_strmRead(@args);
1092             }
1093              
1094             sub strmAppend {
1095              
1096             # Purpose: Seeks to the end of the stream and writes new content there
1097             # Returns: Integer/undef (bytes written/error)
1098             # Usage: $bytes = $obj->_strmAppend($sname, $content);
1099              
1100 511     511 1 198769 my $self = shift;
1101 511         1388 my $sname = shift;
1102 511         1101 my $content = shift;
1103 511         1437 my $file = $$self{file};
1104 511         1143 my ( $rv, $stream, $pos );
1105              
1106 511 50       1947 pdebug( 'entering w/(%s)(%s)',
1107             PDLEVEL1, $sname,
1108 511         3992 ( defined $content ? "@{[ length $content ]} bytes" : $content ) );
1109 511         2036 pIn();
1110              
1111 511 50       2061 if ( pflock( $file, LOCK_EX ) ) {
1112 511         2389 $stream = $self->_getStream($sname);
1113 511 50       1850 if ( defined $stream ) {
1114 511         2103 $pos = $self->strmTell($sname);
1115 511 50       2316 if ( $self->strmSeek( $sname, 0, SEEK_END ) ) {
1116 511         2023 $rv = $self->strmWrite( $sname, $content );
1117 511         2519 $self->strmSeek( $sname, $pos, SEEK_SET );
1118             }
1119             }
1120             }
1121              
1122 511         2054 pOut();
1123 511         1417 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
1124              
1125 511         2482 return $rv;
1126             }
1127              
1128             sub _strmTruncate {
1129              
1130             # Purpose: Truncates the stream to the specified length. This will zero
1131             # out any data written past the new EOS.
1132             # Returns: Boolean
1133             # Usage: $rv = $obj->_strmTruncate($sname, $neos);
1134              
1135 11     11   33 my $self = shift;
1136 11         22 my $sname = shift;
1137 11         22 my $neos = shift;
1138 11         33 my $file = $$self{file};
1139 11         33 my ( $rv, $stream, $eos, $zeroes, $zl );
1140              
1141 11         55 pdebug( 'entering w/(%s)(%s)', PDLEVEL1, $sname, $neos );
1142 11         44 pIn();
1143              
1144 11 50       44 if ( pflock( $file, LOCK_EX ) ) {
1145 11         55 $stream = $self->_getStream($sname);
1146 11 50       55 if ( defined $stream ) {
1147 11         55 $eos = $stream->eos;
1148              
1149 11 50       66 if ( $neos < $eos ) {
1150              
1151             # Zero out old data beyond the new EOS
1152 11         33 $zl = $eos - $neos;
1153 11         484 $zeroes = pack "x$zl";
1154 11 50 33     77 $rv =
1155             $self->strmSeek( $sname, $neos, SEEK_SET )
1156             and $self->strmWrite( $sname, $zeroes )
1157             and $stream->writeEOS($neos);
1158             }
1159             }
1160             }
1161              
1162 11         77 pOut();
1163 11         44 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
1164              
1165 11         121 return $rv;
1166             }
1167              
1168             sub strmTruncate {
1169              
1170             # Purpose: Calls _strmTruncate after making sure the file can be written to
1171             # Returns: Integer/undef
1172             # Usage: $bw = $obj->strmTruncate($sname, $neos);
1173              
1174 11     11 1 44 my $self = shift;
1175 11         44 my @args = @_;
1176 11   33     88 my $bypass = $$self{readOnly} || $$self{corrupted};
1177              
1178 11 50       33 pdebug( 'can\'t write to files that are corrupted or read-only',
1179             PDLEVEL1 )
1180             if $bypass;
1181              
1182 11 50       66 return $bypass ? undef : $self->_strmTruncate(@args);
1183             }
1184              
1185             sub DESTROY {
1186              
1187 154     154   12055 my $self = shift;
1188              
1189             pclose( $$self{file} )
1190 154 100 100     1352 if defined $$self{file} and length $$self{file};
1191              
1192 154         4559 return 1;
1193             }
1194              
1195             1;
1196              
1197             __END__