File Coverage

blib/lib/Forks/Queue/Shmem.pm
Criterion Covered Total %
statement 70 72 97.2
branch 17 26 65.3
condition 8 16 50.0
subroutine 7 7 100.0
pod 1 1 100.0
total 103 122 84.4


line stmt bran cond sub pod time code
1             package Forks::Queue::Shmem;
2 56     56   270028 use base 'Forks::Queue::File';
  56         217  
  56         12586  
3 56     56   385 use strict;
  56         159  
  56         1315  
4 56     56   330 use warnings;
  56         128  
  56         1391  
5 56     56   434 use Carp;
  56         128  
  56         49583  
6              
7             our $VERSION = '0.15';
8             our $DEV_SHM = "/dev/shm";
9             our $DEBUG;
10             *DEBUG = \$Forks::Queue::DEBUG;
11              
12             sub new {
13 43     43 1 2585 my $class = shift;
14 43         369 my %opts = (%Forks::Queue::OPTS, @_);
15              
16 43 50       1147 if (! -d $DEV_SHM) {
17 0         0 croak "\$DEV_SHM not set to a valid shared memory virtual filesystem";
18             }
19              
20 43 100       443 if ($opts{file}) {
21 22   33     258 $opts{loc} //= $opts{file};
22 22         107 $opts{loc} =~ s{.*/(.)}{$1};
23 22         55 $opts{loc} =~ s{/+$}{};
24 22         119 $opts{file} = "$DEV_SHM/" . $opts{loc};
25             } else {
26 21         190 $opts{file} = _impute_file();
27             }
28              
29 43         385 $opts{lock} = $opts{file} . ".lock";
30 43   50     235 $opts{limit} //= -1;
31 43   50     196 $opts{on_limit} //= 'fail';
32 43   50     168 $opts{style} //= 'fifo';
33 43         130 my $list = delete $opts{list};
34              
35 43   50     649 $opts{_header_size} //= 2048;
36 43         126 $opts{_end} = 0; # whether "end" has been called for this obj
37 43         126 $opts{_pos} = 0; # "cursor", index of next item to shift out
38 43         121 $opts{_tell} = $opts{_header_size}; # file position of cursor
39              
40 43         139 $opts{_count} = 0; # index of next item to be appended
41 43         301 $opts{_pids} = { Forks::Queue::File::_PID() => 'P' };
42             # $opts{_pids} = { $$ => 'P' };
43 43         292 $opts{_qid} = Forks::Queue::Util::QID();
44              
45             # how often to refactor the queue file. use small values to keep file
46             # sizes small and large values to improve performance
47 43   50     484 $opts{_maintenance_freq} //= 32;
48              
49             open my $fh5, '>>', $opts{lock}
50 43 50       3885 or die "Forks::Queue::Shmem: ",
51             "failed to create lock file '$opts{lock}': $!";
52 43 50       643 close $fh5 or die;
53              
54 43         887 my $self = bless { %opts }, $class;
55              
56 43 100 66     517 if ($opts{join} && -f $opts{file}) {
57 2         19 $DB::single = 1;
58 2 50       86 open my $fh6, '+<', $opts{file} or die;
59 2         21 $self->{_fh} = *$fh6;
60 2         14 my $fhx = select $fh6; $| = 1; select $fhx;
  2         14  
  2         13  
61 2     2   27 Forks::Queue::File::_SYNC { $self->_read_header } $self;
  2         38  
62             } else {
63 41 100       733 if (-f $opts{file}) {
64 2         1326 carp "Forks::Queue: Queue file $opts{file} already exists. ",
65             "Expect trouble if another process created this file.";
66             }
67 41 50       1722 open my $fh8, '>>', $opts{file} or croak(
68             "Forks::Queue: could not create queue file $opts{file}: $!");
69 41 50       608 close $fh8 or croak(
70             "Forks::Queue: bizarre error closing queue file $opts{file} $!");
71              
72 41 50       1984 open my $fh7, '+<', $opts{file} or croak(
73             "Forks::Queue: error re-opening queue file $opts{file} $!");
74              
75 41         315 my $fx = select $fh7;
76 41         231 $| = 1;
77 41         215 select $fx;
78              
79 41         777 $self->{_fh} = *$fh7;
80 41         395 seek $fh7, 0, 0;
81              
82 41         261 $self->{_locked}++;
83 41         748 $self->_write_header;
84 41         494 $self->{_locked}--;
85 41 50       297 if (tell($fh7) < $self->{_header_size}) {
86 41         754 print $fh7 "\0" x ($self->{_header_size} - tell($fh7));
87             }
88             }
89 43 100       299 if (defined($list)) {
90 5 50       23 if (ref($list) eq 'ARRAY') {
91 5         68 $self->push( @$list );
92             } else {
93 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
94             }
95             }
96              
97 43         429 return $self;
98             }
99              
100             my $id = 0;
101             sub _impute_file {
102 21     21   195 my $base = $0;
103 21         423 $base =~ s{.*[/\\](.)}{$1};
104 21         223 $base =~ s{[/\\]$}{};
105 21         62 $id++;
106 21         318 return "$DEV_SHM/shmq-$$-$id-$base";
107             }
108              
109             1;
110              
111             =head1 NAME
112              
113             Forks::Queue::Shmem - Forks::Queue implementation using shared memory
114              
115             =head1 SYNOPSIS
116              
117             use Forks::Queue::Shmem;
118             $q = Forks::Queue::Shmem->new;
119              
120             use Forks::Queue;
121             $q = Forks::Queue->new( impl => 'Shmem', ... );
122              
123             =head1 VERSION
124              
125             0.15
126              
127             =head1 DESCRIPTION
128              
129             Shared memory implementation of L<Forks::Queue>.
130             Only available on systems that have a C</dev/shm> virtual filesystem.
131              
132             A shared memory implementation is appropriate for programs that
133             rapidly update the queue but are not likely to let the size of data
134             in the queue exceed the available memory on the host machine.
135             Use L<Forks::Queue::File> if you demand high capacity for your queue.
136              
137             See L<Forks::Queue> for the public API to this class.
138              
139             =head2 Constructor options
140              
141             In addition to the standard options described in
142             L<Forks::Queue>, the
143             C<Forks::Queue::Shmem> constructor also recognizes some
144             additional options:
145              
146             =over 4
147              
148             =item * file
149              
150             The name of the file to hold the queue data. An absolute
151             pathname should not be provided here. The virtual queue file
152             will reside under the shared memory virtual filesystem
153             (probably C</dev/shm>) on your system, if it exists.
154              
155             =item * style
156              
157             =item * limit
158              
159             =item * on_limit
160              
161             =item * join
162              
163             =item * persist
164              
165             See L<Forks::Queue> for descriptions of these options.
166              
167             =item * debug
168              
169             Boolean value to enable or disable debugging on this queue,
170             overriding the value in C<$Forks::Queue::DEBUG>.
171              
172             =back
173              
174             =head1 LICENSE AND COPYRIGHT
175              
176             Copyright (c) 2017-2019, Marty O'Brien.
177              
178             This library is free software; you can redistribute it and/or modify
179             it under the same terms as Perl itself, either Perl version 5.10.1 or,
180             at your option, any later version of Perl 5 you may have available.
181              
182             See http://dev.perl.org/licenses/ for more information.
183              
184             =cut