File Coverage

blib/lib/File/Queue.pm
Criterion Covered Total %
statement 12 97 12.3
branch 0 44 0.0
condition 0 6 0.0
subroutine 4 11 36.3
pod 7 7 100.0
total 23 165 13.9


line stmt bran cond sub pod time code
1             package File::Queue;
2              
3 1     1   30581 use strict;
  1         3  
  1         49  
4 1     1   1017 use IO::File;
  1         22522  
  1         262  
5 1     1   10 use Fcntl 'SEEK_END', 'SEEK_SET', 'O_CREAT', 'O_RDWR';
  1         9  
  1         199  
6 1     1   6 use Carp qw(carp croak);
  1         2  
  1         2620  
7              
8             our $VERSION = '1.01';
9              
10             sub new
11             {
12 0     0 1   my $class = shift;
13 0           my $mi = $class . '->new()';
14              
15 0 0         croak "$mi requires an even number of parameters" if (@_ & 1);
16 0           my %params = @_;
17              
18             # convert to lower case
19 0           while( my($key, $val) = each %params)
20             {
21 0           delete $params{$key};
22 0           $params{ lc($key) } = $val;
23             }
24              
25 0 0         croak "$mi needs an File parameter" unless exists $params{file};
26 0           my $queue_file = delete $params{file};
27 0           my $idx_file = $queue_file . '.idx';
28 0           $queue_file .= '.dat';
29              
30 0           my $self;
31 0   0       my $mode = delete $params{mode} || '0600';
32 0   0       $self->{block_size} = delete $params{blocksize} || 64;
33 0   0       $self->{seperator} = delete $params{seperator} || "\n";
34 0           $self->{sep_length} = length $self->{seperator};
35              
36 0 0         croak "Seperator length cannot be greater than BlockSize" if ($self->{sep_length} > $self->{block_size});
37              
38 0           $self->{queue_file} = $queue_file;
39 0           $self->{idx_file} = $idx_file;
40              
41 0 0         $self->{queue} = new IO::File $queue_file, O_CREAT | O_RDWR, $mode or croak $!;
42 0 0         $self->{idx} = new IO::File $idx_file, O_CREAT | O_RDWR, $mode or croak $!;
43              
44             ### Default ptr to 0, replace it with value in idx file if one exists
45 0           $self->{idx}->sysseek(0, SEEK_SET);
46 0           $self->{idx}->sysread($self->{ptr}, 1024);
47 0 0         $self->{ptr} = '0' unless $self->{ptr};
48            
49 0 0         if($self->{ptr} > -s $queue_file)
50             {
51 0           carp "Ptr is greater than queue file size, resetting ptr to '0'";
52              
53 0 0         $self->{idx}->truncate(0) or croak "Could not truncate idx: $!";
54 0           $self->{idx}->sysseek(0, SEEK_SET);
55 0 0         $self->{idx}->syswrite('0') or croak "Could not syswrite to idx: $!";
56             }
57              
58 0           bless $self, $class;
59 0           return $self;
60             }
61              
62             sub enq
63             {
64 0     0 1   my ($self, $element) = @_;
65              
66 0           $self->{queue}->sysseek(0, SEEK_END);
67              
68 0 0         if(ref $element)
69             {
70 0           croak 'Cannot handle references';
71             }
72              
73 0 0         if($element =~ s/$self->{seperator}//g)
74             {
75 0           carp "Removed illegal seperator(s) from $element";
76             }
77              
78 0 0         $self->{queue}->syswrite("$element$self->{seperator}") or croak "Could not syswrite to queue: $!";
79             }
80              
81             sub deq
82             {
83 0     0 1   my $self = shift;
84 0           my $element;
85              
86 0           $self->{queue}->sysseek($self->{ptr}, SEEK_SET);
87              
88 0           my $i;
89 0           while($self->{queue}->sysread($_, $self->{block_size}))
90             {
91              
92 0           $i = index($_, $self->{seperator});
93 0 0         if($i != -1)
94             {
95 0           $element .= substr($_, 0, $i);
96 0           $self->{ptr} += $i + $self->{sep_length};
97 0           $self->{queue}->sysseek($self->{ptr}, SEEK_SET);
98              
99 0           last;
100             }
101             else
102             {
103             ## If seperator isn't found, go back 'sep_length' spaces to ensure we don't miss it between reads
104 0           $element .= substr($_, 0, -$self->{sep_length}, '');
105 0           $self->{ptr} += $self->{block_size} - $self->{sep_length};
106 0           $self->{queue}->sysseek($self->{ptr}, SEEK_SET);
107             }
108             }
109              
110             ## If queue seek pointer is at the EOF, truncate the queue file
111 0 0         if($self->{queue}->sysread($_, 1) == 0)
112             {
113 0 0         $self->{queue}->truncate(0) or croak "Could not truncate queue: $!";
114 0           $self->{queue}->sysseek($self->{ptr} = 0, SEEK_SET);
115             }
116              
117             ## Set idx file contents to point to the current seek position in queue file
118 0 0         $self->{idx}->truncate(0) or croak "Could not truncate idx: $!";
119 0           $self->{idx}->sysseek(0, SEEK_SET);
120 0 0         $self->{idx}->syswrite($self->{ptr}) or croak "Could not syswrite to idx: $!";
121              
122 0           return $element;
123             }
124              
125             sub peek
126             {
127 0     0 1   my ($self, $count) = @_;
128 0 0         croak "Invalid argument to peek ($count)" unless $count > 0;
129              
130 0           my $elements;
131              
132 0           $self->{queue}->sysseek($self->{ptr}, SEEK_SET);
133              
134 0           my (@items, $remainder);
135             GATHER:
136 0           while($self->{queue}->sysread($_, $self->{block_size}))
137             {
138 0 0         if(defined $remainder)
139             {
140 0           $_ = $remainder . $_;
141             }
142              
143 0           @items = split /$self->{seperator}/, $_, -1;
144 0           $remainder = pop @items;
145              
146 0           foreach (@items)
147             {
148 0           push @$elements, $_;
149 0 0         last GATHER if $count == @$elements;
150             }
151             }
152              
153 0           return $elements;
154             }
155              
156             sub reset
157             {
158 0     0 1   my $self = shift;
159              
160 0 0         $self->{idx}->truncate(0) or croak "Could not truncate idx: $!";
161 0           $self->{idx}->sysseek(0, SEEK_SET);
162 0 0         $self->{idx}->syswrite('0') or croak "Could not syswrite to idx: $!";
163              
164 0           $self->{queue}->sysseek($self->{ptr} = 0, SEEK_SET);
165             }
166              
167             sub close
168             {
169 0     0 1   my $self = shift;
170              
171 0           $self->{idx}->close();
172 0           $self->{queue}->close();
173             }
174              
175             sub delete
176             {
177 0     0 1   my $self = shift;
178              
179 0           $self->close();
180              
181 0           unlink $self->{queue_file};
182 0           unlink $self->{idx_file};
183             }
184              
185             1;