File Coverage

blib/lib/Forks/Queue/File.pm
Criterion Covered Total %
statement 676 855 79.0
branch 221 396 55.8
condition 101 216 46.7
subroutine 76 81 93.8
pod 15 19 78.9
total 1089 1567 69.5


line stmt bran cond sub pod time code
1             package Forks::Queue::File;
2 100     100   1234237 use strict;
  100         244  
  100         3281  
3 100     100   617 use warnings;
  100         219  
  100         2970  
4 100     100   498 use Carp;
  100         207  
  100         6004  
5 100     100   68277 use JSON;
  100         836021  
  100         593  
6 100     100   14156 use Time::HiRes;
  100         2882  
  100         530  
7 100     100   7251 use base 'Forks::Queue';
  100         223  
  100         34725  
8 100     100   1902 use 5.010; # sorry, v5.08. I love the // //= operators too much
  100         405  
9              
10             our $VERSION = '0.15';
11             our $DEBUG;
12             *DEBUG = \$Forks::Queue::DEBUG;
13              
14             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
15              
16              
17             # prefer functional JSON calls because we still want to use JSON
18             # during global destruction, and a JSON object might not be available
19             # then
20             sub jsonize {
21 4050     4050 0 20049 JSON::to_json($_[0], { allow_nonref=>1, ascii=>1 } );
22             }
23              
24             sub dejsonize {
25 16666     16666 0 86436 JSON::from_json($_[0], { allow_nonref => 1, ascii => 1 } );
26             }
27              
28             # if we exercise firm control over line endings,
29             # we won't have any DOS vs Unix vs Mac fights.
30 100     100   713 use constant EOL => "\x{0a}";
  100         217  
  100         85281  
31             # Anything that can't be a valid JSON substring is ok to use here
32              
33             sub _lock {
34             # a file based queue generally lends itself to file based
35             # advisory locking, though it doesn't work on Solaris with threads.
36             # The generic _lock and _unlock functions can support other
37             # schemes.
38              
39            
40 15659     15659   25780 my $self = shift;
41 15659 50       31911 if ($self->{_locked}) {
42 0         0 Carp::cluck "$$ acquiring lock but already have lock";
43 0         0 return;
44             }
45 15659   33     43640 my $_DEBUG = $self->{debug} // $DEBUG;
46              
47 15659         91653 local $! = 0;
48 15659 100       44637 if ($self->{_lockdir}) {
    50          
49 256         1226 my $z = Dir::Flock::lock($self->{_lockdir});
50 256 50       786 $_DEBUG && print STDERR ">> flock_dir lock by ".
51             _PID() . " z=$z \!$=$!\n";
52 256 0 33     756 if (!$z && !$self->{_DESTROY}) {
53 0         0 carp "Forks::Queue: lock queue by flock_dir failed: $!";
54             }
55 256         731 $self->{__locked} = $z;
56             } elsif ($self->{lock}) {
57             # file-based advisory file locking with flock
58             # Doesn't work across threads in Solaris, since fcntl implementation
59             # passes the process id but not the thread id to the locking
60             # functions.
61            
62 15403         583613 open my $lockfh, ">>", $self->{lock};
63 15403         307608 my $z = flock $lockfh, 2;
64 15403   66     53794 while (!$z && $Forks::Queue::NOTIFY_OK && $!{EINTR}) {
      33        
65             # SIGIO can interrupt flock
66 129         34828 $z = flock $lockfh, 2;
67             }
68 15403         34447 $self->{lockfh} = $lockfh;
69 15403 50       28368 $_DEBUG && print STDERR ">> flock lock by " . _PID() . "\n";
70 15403         38635 $self->{__locked} = $z;
71             }
72 15659         61171 $self->{_locked} = 1;
73             }
74              
75             sub _unlock {
76 15659     15659   25515 my $self = shift;
77 15659         23492 $self->{_locked} = 0;
78 15659   33     45307 my $_DEBUG = $self->{debug} // $DEBUG;
79 15659         76246 local $! = 0;
80 15659 100       45797 if ($self->{_lockdir}) {
    50          
81 256         1158 my $z = Dir::Flock::unlock($self->{_lockdir});
82 256 50       694 $_DEBUG && print STDERR "<< flock_dir unlock by " . _PID() . " z=$z\n";
83             } elsif ($self->{lockfh}) {
84 15403         208926 my $z = close delete $self->{lockfh};
85 15403 50       75011 $_DEBUG && print STDERR "<< flock unlock by " . _PID() . " z=$z\n";
86             }
87 15659         52047 return;
88             }
89              
90              
91              
92              
93             # execute a block of code in a way where only one
94             # thread/process can be executing code for this queue
95             sub _SYNC (&$) {
96 15623     15623   33441 my ($block,$self) = @_;
97 15623 50       449249 return if Forks::Queue::__inGD();
98 15623   33     63465 my $_DEBUG = $self->{debug} // $DEBUG;
99              
100             # _lock can fail if queue object is being DESTROYed.
101 15623         48069 $self->_lock;
102 15623         43404 my $result = $block->($self);
103 15623 50 33     65142 $self->_unlock if $self->{__locked} || !$self->{_DESTROY};
104 15623         42720 return $result;
105             }
106              
107             sub _SYNCWA (&$) { # wantarray version of _SYNC
108 36     36   68 my ($block,$self) = @_;
109 36   33     124 my $_DEBUG = $self->{debug} // $DEBUG;
110              
111 36         97 $self->_lock;
112 36         112 my @result = $block->($self);
113 36         147 $self->_unlock;
114 36         74 return @result;
115             }
116              
117             sub _PID {
118 1942 50   1942   17248 $INC{'threads.pm'} ? join("-", $$, threads->tid) : $$
119             }
120              
121             sub new {
122 104     104 1 10003 my $class = shift;
123 104         1169 my %opts = (%Forks::Queue::OPTS, @_);
124              
125 104         424 ${^_nfs} = 0;
126 104   100     1021 $opts{file} //= _impute_file();
127 93         2368 $opts{lock} = $opts{file} . ".lock";
128 93         886 my $list = delete $opts{list};
129              
130             # my $fh;
131              
132 93   50     2957 $opts{_header_size} //= 2048;
133 93         495 $opts{_end} = 0; # whether "end" has been called for this obj
134 93         447 $opts{_pos} = 0; # "cursor", index of next item to shift out
135 93         822 $opts{_tell} = $opts{_header_size}; # file position of cursor
136              
137 93         496 $opts{_count} = 0; # index of next item to be appended
138 93         1579 $opts{_pids} = { _PID() => 'P' };
139 93         1057 $opts{_version} = $Forks::Queue::VERSION;
140 93         1720 $opts{_qid} = Forks::Queue::Util::QID();
141              
142             # how often to refactor the queue file. use small values to keep file
143             # sizes small and large values to improve performance
144 93   50     1670 $opts{_maintenance_freq} //= 128;
145              
146              
147            
148              
149 93 50       29033 open my $fh1, '>>', $opts{lock} or die;
150 93 50       2321 close $fh1 or die;
151              
152 93         5002 my $self = bless { %opts }, $class;
153              
154             # Normal flock can not be used with multi-threaded solaris or aix
155             # and may be flaky with files on NFS directories.
156 93 50 33     3933 if ($^O eq 'solaris' || $^O eq 'aix') {
    50          
    50          
157 0   0     0 $opts{dflock} //= 1;
158             } elsif (${^_nfs}) {
159 0   0     0 $opts{dflock} //= 1;
160             } elsif (Forks::Queue::Util::__is_nfs( $opts{file} )) {
161 0   0     0 $opts{dflock} //= 1;
162             }
163              
164 67 100       1979 if ($opts{dflock}) {
165             # Dir::Flock (included in this distribution) provides a safer
166             # (if more cumbersome) advisory locking method to synchronize
167             # the queue.
168 100     100   787 no warnings 'numeric';
  100         202  
  100         76175  
169 11         9505 require Dir::Flock;
170 11         89 $Dir::Flock::HEARTBEAT_CHECK = 5;
171 11         39 $Dir::Flock::PAUSE_LENGTH = 0.01;
172             }
173              
174              
175 67 100 66     1993 if ($opts{join} && -f $opts{file}) {
176 2         105 $DB::single = 1;
177 2 50       388 open my $fh2, '+<', $opts{file} or die;
178 2         72 $self->{_fh} = *$fh2;
179 2         63 my $fhx = select $fh2; $| = 1; select $fhx;
  2         57  
  2         61  
180 2     2   152 _SYNC { $self->_read_header } $self;
  2         89  
181             } else {
182 65 100       2806 if (-f $opts{file}) {
183 4         4548 carp "Forks::Queue: Queue file $opts{file} already exists. ",
184             "Expect trouble if another process created this file.";
185 4         344 my $z = unlink $opts{file};
186             }
187 65 50       11629 open my $fh3, '>', $opts{file} or die;
188 65 50       2077 close $fh3 or die;
189              
190 65 50       3847 open my $fh4, '+<', $opts{file} or die;
191 65         1762 my $fhx = select $fh4; $| = 1; select $fhx;
  65         861  
  65         1217  
192 65         4061 $self->{_fh} = *$fh4;
193 65         872 seek $fh4, 0, 0;
194 65 100       860 if ($opts{dflock}) {
195 11         232 $self->{_lockdir} = Dir::Flock::getDir($opts{lock},$opts{persist});
196             }
197              
198 65         804 $self->{_locked}++;
199 65         2003 $self->_write_header;
200 65         526 $self->{_locked}--;
201 65 50       531 if (tell($fh4) < $self->{_header_size}) {
202 65         2206 print $fh4 "\0" x ($self->{_header_size} - tell($fh4));
203             }
204             }
205 67 100       528 if (defined($list)) {
206 6 50       59 if (ref($list) eq 'ARRAY') {
207 6         307 $self->push( @$list );
208             } else {
209 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
210             }
211             }
212 67         7959 return $self;
213             }
214              
215              
216             sub DESTROY {
217 85     85   31745132 my $self = shift;
218 85         727 my $pid = _PID();
219 85   33     1302 my $_DEBUG = $self->{debug} // $DEBUG;
220 85 50       521 $_DEBUG && print STDERR "$pid DESTROY called\n";
221 85         393 $self->{_DESTROY}++;
222 85 50       3350 if (Forks::Queue::__inGD()) {
223 0         0 $self->{_locked} = -1;
224 0 0       0 if (my $h = $self->_read_header) {
225 0 0       0 $_DEBUG && print STDERR "$pid DESTROY header at GD: $h\n";
226 0         0 my $role = delete $self->{_pids}{$pid};
227 0 0 0     0 if ($role && $role eq 'P') {
228 0         0 $self->{_pids} = {};
229 0 0       0 $_DEBUG && print STDERR "$pid DESTROY role=P\n";
230 0         0 $self->_write_header;
231             }
232             }
233 0         0 delete $self->{_locked};
234             } else {
235 85         270 eval {
236             _SYNC {
237 85 50   85   512 if ($self->_read_header) {
238             $_DEBUG and print STDERR
239             "$pid DESTROY: pids at destruction: ",
240 85 50       342 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
241 85         349 delete $self->{_pids}{$pid};
242 85         563 $self->_write_header;
243 85 50       601 $_DEBUG and print STDERR "$pid DESTROY header updated.\n";
244             } else {
245 0 0       0 $_DEBUG and print STDERR
246             "$$ DESTROY: header not available\n";
247             }
248 85         1809 } $self;
249 85 50       998 $_DEBUG && print STDERR
250             "$pid DESTROY final header read complete\n";
251             };
252 85 50       374 if ($@) {
253 0 0       0 if ($@ !~ /malformed JSON ...* at character offset 0/) {
    0          
254 100     100   2785 use Data::Dumper;
  100         20507  
  100         39646  
255 0         0 print STDERR Dumper($@,$self);
256             } elsif ($_DEBUG) {
257 0         0 print STDERR "$pid DESTROY error reading header: $@";
258             }
259             }
260             }
261 85 50       1330 $self->{_fh} && close $self->{_fh};
262             $_DEBUG and print STDERR "$pid DESTROY: remaining pids: ",
263 85 50       499 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
264 85 100 66     994 if ($self->{_pids} && 0 == keys %{$self->{_pids}} && !$self->{persist}) {
  85   100     8710  
265 49 50       242 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
266 49         141 my $u2 = -1;
267 49         2800 my $u1 = unlink $self->{lock};
268 49 50       2768 $u2 = unlink $self->{file} unless $self->{persist};
269 49 50       349 $_DEBUG and print STDERR
270             "$$ DESTROY unlink results $u1/$u2 $self->{lock} $self->{file}\n";
271 49 50       1728 $_DEBUG and print STDERR
272             "$$ DESTROY: unlink time " . Time::HiRes::time . "\n";
273             }
274             }
275              
276             # the key to a shared file acting as a queue is the header,
277             # which holds the queue metadata like the file position of
278             # the current front and back of the queue, and the identifiers
279             # of processes that are using the queue.
280             #
281             # this function should only be called from inside a _SYNC block.
282              
283             sub _read_header {
284 15601     15601   31535 my ($self) = @_;
285 15601 50       33917 Carp::cluck "unsafe _read_header" unless $self->{_locked};
286 15601         57078 local $/ = EOL;
287 15601         38097 my $_DEBUG = $self->_debug;
288 15601         27014 my $h = "";
289 15601 100       32728 if ($self->{_DESTROY}) {
290 100     100   847 no warnings 'closed';
  100         234  
  100         49788  
291 85         1276 seek $self->{_fh}, 0, 0;
292 85   50     2188 $h = readline($self->{_fh}) // "";
293 85 50       420 $_DEBUG && print STDERR
294             "$$ Read ",length($h)," bytes from header during DESTROY\n";
295             } else {
296 15516         65211 local $! = 0;
297 15516 50       194578 if (seek $self->{_fh}, 0, 0) {
298 15516         216875 $h = readline($self->{_fh});
299             } else {
300 0         0 Carp::cluck "_read_header: invalid seek $!";
301 0         0 return;
302             }
303             }
304 15601 50       45929 if (!$h) {
305 0 0       0 if ($self->{_DESTROY}) {
306 0 0       0 $_DEBUG && print STDERR "$$ in DESTROY and header not found\n";
307 0         0 return;
308             }
309 0         0 Carp::cluck "_read_header: header not found";
310             }
311 15601         26892 chomp($h);
312 15601         32892 $h = dejsonize($h);
313 15601         554346 $self->{_pos} = $h->{index};
314 15601         26343 $self->{_end} = $h->{end};
315 15601         24014 $self->{_tell} = $h->{tell};
316 15601         22936 $self->{_count} = $h->{count};
317 15601         23111 $self->{_header_size} = $h->{headerSize};
318 15601         22598 $self->{_maintenance_freq} = $h->{maintFreq};
319 15601         24224 $self->{_version} = $h->{version};
320 15601         30404 $self->{_pids} = $h->{pids};
321 15601   100     54774 $self->{_lockdir} = $h->{lockdir} || undef;
322 15601 50       35118 $self->{limit} = $h->{limit} if $h->{limit};
323              
324 15601 50       28362 $_DEBUG && print STDERR "$$ read header\n";
325              
326 15601         32604 $h->{avail} = $self->{_avail} = $h->{count} - $h->{index}; # not written
327 15601         54925 return $h;
328             }
329              
330             sub _write_header {
331 1005     1005   12989 my ($self) = @_;
332 1005 50       3277 Carp::cluck "unsafe _write_header" unless $self->{_locked};
333 1005   33     5842 my $_DEBUG = $self->{debug} // $DEBUG;
334             my $header = { index => $self->{_pos}, end => $self->{_end},
335             tell => $self->{_tell}, count => $self->{_count},
336             limit => $self->{limit},
337             pids => $self->{_pids},
338             qid => $self->{_qid},
339             headerSize => $self->{_header_size},
340             maintFreq => $self->{_maintenance_freq},
341             ($self->{_lockdir} ? (lockdir => $self->{_lockdir}) : ()),
342 1005 100       14216 version => $self->{_version} };
343              
344 1005         4146 my $headerstr = jsonize($header);
345 1005         44425 while (length($headerstr) >= $self->{_header_size}) {
346 0         0 $self->_increase_header_size(length($headerstr) + 32);
347 0         0 $header->{tell} = $self->{_tell};
348 0         0 $headerstr = jsonize($header);
349             }
350              
351 1005         2641 eval {
352 100     100   790 no warnings;
  100         217  
  100         693103  
353 1005         10754 seek $self->{_fh}, 0, 0;
354 1005         2762 print {$self->{_fh}} $headerstr,EOL;
  1005         22037  
355 1005 50       9573 $_DEBUG && print STDERR "$$ updated header $headerstr\n";
356             };
357             }
358              
359             sub _notify {
360 660 50   660   2077 return unless $Forks::Queue::NOTIFY_OK;
361              
362 660         1224 my $self = shift;
363 660   33     3282 my $_DEBUG = $self->{debug} // $DEBUG;
364 660     660   4951 _SYNC { $self->_read_header } $self;
  660         2297  
365 660         2724 my @ids = keys %{$self->{_pids}};
  660         3840  
366 660         1619 my (@pids,@tids);
367 660         1633 my $me = _PID();
368 660 50       1864 $_DEBUG && print STDERR "$$ _notify \$me=$me \@ids=@ids\n";
369 660         2440 foreach my $id (@ids) {
370 1242         5807 my ($p,$t) = split /-/,$id;
371 1242 50       2981 if (!$p) {
372 0         0 ($p,$t) = (-$t,0);
373             }
374 1242 100 33     5773 if ($p != $$) {
    50          
375 582         1996 push @pids, $p;
376             } elsif (defined($t) && $id ne $me) {
377 0         0 push @tids, $t;
378             }
379             }
380 660 50       1869 if (@tids) {
381 0 0       0 $_DEBUG && print STDERR "$$ notify: tid @tids\n";
382 0         0 foreach my $tid (@tids) {
383 0         0 my $thr = threads->object($tid);
384 0 0       0 if ($thr) {
    0          
385 0         0 my $z7;
386 0 0 0     0 $thr && ($z7 = $thr->kill('IO')) &&
      0        
387             $_DEBUG && print STDERR
388             "$$ _notify to tid $$-$tid \$z7=$z7\n";
389 0 0       0 if ($tid ne $tids[-1]) {
390             #Time::HiRes::sleep 0.25;
391             }
392              
393             # $thr->kill is not reliable?
394            
395             } elsif ($tid == 0) {
396 0 0       0 $_DEBUG && print STDERR "$$ _notify SIGIO to tid main\n";
397 0         0 kill 'IO', $$;
398             } else {
399 0 0       0 $_DEBUG && print STDERR
400             "$$ _notify failed to SIGIO tid $tid\n";
401             }
402             }
403             }
404 660 100       2011 if (@pids) {
405 405 50       1056 $_DEBUG && print STDERR "$$ _notify to pids @pids\n";
406 405         23894 kill 'IO', @pids;
407             }
408             }
409              
410             sub clear {
411 62     62 1 16271 my $self = shift;
412 62 50       122 if (! eval { $self->_check_pid; 1 } ) {
  62         254  
  62         209  
413 0         0 carp("File::Queue::clear operation failed: $@");
414 0         0 return;
415             }
416             _SYNC {
417 62     62   301 $self->_read_header;
418 62         207 $self->{_pos} = 0;
419 62         138 $self->{_tell} = $self->{_header_size};
420 62         109 $self->{_count} = 0;
421 62         2033 truncate $self->{_fh}, $self->{_tell};
422 62         345 $self->_write_header;
423 62         690 } $self;
424             }
425              
426             sub end {
427 19     19 1 31686729 my ($self) = @_;
428 19 50       80 if (! eval { $self->_check_pid; 1 } ) {
  19         138  
  19         191  
429 0         0 carp "Forks::Queue::end operation failed: $@";
430 0         0 return;
431             }
432             _SYNC {
433 19     19   139 $self->_read_header;
434 19 50       112 if ($self->{_end}) {
435 0         0 carp "Forks::Queue: end() called from $$, ",
436             "previously called from $self->{_end}";
437             } else {
438 19         77 $self->{_end} = _PID();
439             }
440 19         100 $self->_write_header;
441 19         698 } $self;
442 19         263 $self->_notify;
443 19         88 return;
444             }
445              
446             sub status {
447 361     361 1 61030621 my ($self) = @_;
448 361     361   4498 my $status = _SYNC { $self->_read_header } $self;
  361         2021  
449 361         3206 $status->{file} = $self->{file};
450 361         5021 $status->{filesize} = -s $self->{_fh};
451 361         1274 $status->{end} = $self->{_end};
452 361         1635 return $status;
453             }
454              
455             sub _check_pid {
456 986     986   2604 my ($self) = @_;
457 986   33     7445 my $_DEBUG = $self->{debug} // $DEBUG;
458 986 100       5802 if (!defined $self->{_pids}{_PID()}) {
459 28 50       1069 if ($Forks::Queue::NOTIFY_OK) {
460 28 50       553 if (_PID() =~ /.-[1-9]/) {
461             # SIGIO can't be reliably passed to threads, so can't
462             # rely on long sleep command being interrupted
463 0         0 $Forks::Queue::SLEEP_INTERVAL = 1;
464             }
465 28     223   2387 $SIG{IO} = sub { };
466             }
467 28         4500 my $ostatus = open $self->{_fh}, '+<', $self->{file};
468 28         590 for (1..5) {
469 28 50       571 last if $ostatus;
470 0         0 sleep int(sqrt($_));
471 0         0 $ostatus = open $self->{_fh}, '+<', $self->{file};
472             }
473 28 50       432 if (!$ostatus) {
474 0         0 Carp::confess("Forks::Queue::check_pid: ",
475             "Could not open $self->{file} after 5 tries: $!");
476 0         0 return;
477             }
478 28 50       415 if ($self->{_locked}) {
479 0 0       0 $_DEBUG && print STDERR
480             "Forks::Queue: $$ new pid update header\n";
481 0         0 $self->{_pids}{_PID()} = 'C';
482 0         0 $self->_write_header;
483 0         0 return;
484             } else {
485 28 50       307 $_DEBUG and print STDERR "Forks::Queue: $$ new pid sync\n";
486             _SYNC {
487 28     28   799 $self->_read_header;
488 28         185 $self->{_pids}{_PID()} = 'C';
489 28         836 $self->_write_header;
490 28         1019 } $self;
491 28         201 return;
492             }
493             }
494 958         2729 return;
495             }
496              
497             sub _increase_header_size {
498 0     0   0 my ($self,$min_size) = @_;
499             # assumes $self has been updated by $self->_read_header recently
500 0 0       0 return if $min_size <= $self->{_header_size};
501              
502 0         0 local $/ = EOL;
503 0         0 my $delta = $min_size - $self->{_header_size};
504 0         0 seek $self->{_fh}, $self->{_header_size}, 0;
505 0         0 my @data = readline($self->{_fh});
506 0         0 seek $self->{_fh}, 0, 0;
507 0         0 print {$self->{_fh}} "\0" x $min_size;
  0         0  
508 0         0 print {$self->{_fh}} @data;
  0         0  
509 0         0 $self->{_header_size} = $min_size;
510 0         0 $self->{_tell} += $delta;
511 0         0 return;
512             }
513              
514             sub _maintain {
515 5     5   15 my ($self) = @_;
516             # assumes $self has been updated by $self->_read_header recently
517              
518 5         16 my $delta = $self->{_tell} - $self->{_header_size};
519 5 50       16 return if $delta == 0;
520 5         22 local $/ = EOL;
521 5         62 seek $self->{_fh}, $self->{_tell}, 0;
522 5         183 my @data = readline($self->{_fh});
523 5         52 seek $self->{_fh}, $self->{_header_size}, 0;
524 5         17 print {$self->{_fh}} @data;
  5         76  
525 5         109 truncate $self->{_fh}, tell($self->{_fh});
526              
527 5         20 $self->{_avail} = $self->{_count} = @data;
528 5         10 $self->{_pos} = 0;
529 5         11 $self->{_tell} = $self->{_header_size};
530 5         34 return;
531             }
532              
533             sub push {
534 475     475 1 2594 my ($self,@items) = @_;
535 475 50       1720 if (! eval { $self->_check_pid; 1 } ) {
  475         3239  
  475         1992  
536 0         0 carp "Forks::Queue::put call from process $$ failed: $@";
537 0         0 return;
538             }
539              
540 475         1083 my (@deferred_items,$failed_items);
541 475         1022 my $pushed = 0;
542             _SYNC {
543 475     355   2756 $self->_read_header;
544 475 100       1745 if ($self->{_end}) {
545             carp "Forks::Queue: put call from process $$ ",
546 5         1801 "after end call from process ", $self->{_end}, "!";
547 5         691 return 0;
548             }
549              
550             # put: add whatever items there is room for
551             # enqueue: add all items if there is room for one item
552 470 100       1494 if ($self->{limit} > 0) {
553 159 100       579 if ($Forks::Queue::File::_ENQUEUE) {
554 4 50       19 if ($self->{_avail} >= $self->{limit}) {
555 0         0 $failed_items = @deferred_items = @items;
556 0         0 @items = ();
557             }
558             } else {
559 155         610 $failed_items = $self->{_avail} + @items - $self->{limit};
560 155 100       483 if ($failed_items > 0) {
561 120         745 @deferred_items = splice @items, -$failed_items;
562 120 100       458 if (@items == 0) {
563 4         10 return;
564             }
565             } else {
566 35         94 $failed_items = 0;
567             }
568             }
569             }
570              
571 466 50       1475 if (@items > 0) {
572 466         6442 seek $self->{_fh}, 0, 2;
573 466 50       2629 if (tell($self->{_fh}) < $self->{_tell}) {
574 0         0 Carp::cluck "funny seek";
575 0         0 seek $self->{_fh}, $self->{_tell}, 0;
576             }
577 466         1397 foreach my $item (@items) {
578 2981         6257 my $json = jsonize($item);
579 2981         65797 print {$self->{_fh}} $json,EOL;
  2981         36874  
580 2981         11813 $self->{_count}++;
581 2981         4421 $self->{_avail}++;
582 2981         4726 $pushed++;
583 2981 50       7293 $self->_debug && print STDERR
584             "$$ put item [$json] $pushed/",0+@items,"\n";
585             }
586             }
587 466         1842 $self->_write_header;
588 475         6831 } $self;
589 475 50 66     8092 if ($pushed && $self->_debug) {
590 0         0 print STDERR "_notify from push(\$pushed=$pushed)\n";
591             }
592 475 100       3003 $self->_notify if $pushed;
593              
594 475 100       2012 if ($failed_items) {
595 120 100       486 if ($self->{on_limit} eq 'fail') {
596 49         15919 carp "Forks::Queue: queue buffer is full ",
597             "and $failed_items items were not added";
598             } else {
599 71 50       306 $self->_debug && print STDERR
600             "$$ $failed_items on put. Waiting for capacity\n";
601 71         679 $self->_wait_for_capacity;
602 71 50       320 $self->_debug && print STDERR "$$ got some capacity\n";
603 71         1237 return $pushed + $self->push(@deferred_items);
604             }
605             }
606 404         9038 return $pushed;
607             }
608              
609             sub enqueue {
610 4     4 1 3195 undef $Forks::Queue::File::_ENQUEUE;
611 4         20 local $Forks::Queue::File::_ENQUEUE = 1;
612 4         30 return Forks::Queue::File::push(@_);
613             }
614              
615             sub unshift {
616 0     0 1 0 my ($self,@items) = @_;
617 0         0 return $self->insert(0, @items);
618             }
619              
620             sub _SLEEP {
621 13205     13205   19089 my $self = shift;
622             # my $tid = threads->self;
623 13205   50     111091526 my $n = sleep($Forks::Queue::SLEEP_INTERVAL || 1);
624             #Carp::cluck("LONG SLEEP \$n=$n") if $n > 10;
625 13205         69592 return $n;
626             }
627              
628             sub _wait_for_item {
629 99     99   323 my ($self) = @_;
630 99         260 my $ready = 0;
631 99         238 do {
632 13180     13180   86364 _SYNC { $self->_read_header } $self;
  13180         31447  
633 13180   100     94233 $ready = $self->{_avail} || $self->{_end} || $self->_expired;
634 13180 100       29228 if (!$ready) {
635 13081         24735 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL||1)
636             }
637             } while !$ready;
638 99         436 return $self->{_avail};
639             }
640              
641             sub _wait_for_capacity {
642 71     71   326 my ($self) = @_;
643 71         183 my $ready = 0;
644 71         151 do {
645 142 50       758 if ($self->{limit} <= 0) {
646 0         0 $ready = 1;
647             } else {
648 142     142   1571 _SYNC { $self->_read_header } $self;
  142         710  
649 142   66     1381 $ready = $self->{_avail} < $self->{limit} && !$self->{_end};
650 142 100       526 if (!$ready) {
651 71         257 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
652             }
653             }
654             } while !$ready;
655 71         310 return $self->{_avail} < $self->{limit};
656             }
657              
658             sub dequeue {
659 25     25 1 7099 my $self = shift;
660 25 50       150 Forks::Queue::_validate_input($_[0],'count',1) if @_;
661 15 50       98 if ($self->{style} ne 'lifo') {
662 15 50       150 return @_ ? $self->_dequeue_front(@_) : $self->_dequeue_front;
663             } else {
664 0 0       0 return @_ ? $self->_dequeue_back(@_) : $self->_dequeue_back;
665             }
666             }
667              
668             sub _dequeue_back {
669 0     0   0 my $self = shift;
670 0 0 0     0 my $count = @_ ? $_[0] // 1 : 1;
671 0 0       0 if (! eval { $self->_check_pid; 1 } ) {
  0         0  
  0         0  
672 0         0 carp "Forks::Queue::pop operation failed: $@";
673 0         0 return;
674             }
675 0 0 0     0 if ($self->limit > 0 && $count > $self->limit) {
676             # error message compatible with Thread::Queue
677 0         0 croak "dequeue: 'count' argument ($count) exceeds queue size limit (",
678             $self->limit, ")";
679             }
680 0         0 my @return;
681 0         0 local $/ = EOL;
682 0         0 while (@return == 0) {
683             _SYNC {
684 0 0 0 0   0 return if $self->{_avail} < $count && !$self->{_end};
685 0         0 seek $self->{_fh}, $self->{_tell}, 0;
686 0         0 my $avail = $self->{_avail};
687 0         0 while ($avail > $count) {
688 0         0 scalar readline($self->{_fh});
689 0         0 $avail--;
690             }
691 0         0 my $spot = tell $self->{_fh};
692 0         0 @return = map dejsonize($_), readline($self->{_fh});
693 0         0 truncate $self->{_fh}, $spot;
694 0         0 $self->{_count} -= @return;
695 0         0 $self->_write_header;
696 0         0 } $self;
697 0 0 0     0 last if @return || $self->{_end} || $self->_expired;
      0        
698 0         0 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
699             }
700 0 0       0 $self->_notify if @return;
701 0 0 0     0 if ($self->_expired && @return == 0) {
702 0 0       0 return @_ ? $self->pop_nb(@_) : $self->pop_nb;
703             }
704 0 0 0     0 return @_ ? @return : $return[0] // ();
705             }
706              
707             sub _dequeue_front {
708 15     15   46 my $self = shift;
709 15 50 50     59 my $count = @_ ? $_[0] // 1 : 1;
710 15 50       40 if (! eval { $self->_check_pid; 1 } ) {
  15         70  
  15         66  
711 0         0 carp "Forks::Queue::shift operation failed: $@";
712 0         0 return;
713             }
714 15 50 33     388 if ($self->limit > 0 && $count > $self->limit) {
715             # error message compatible with Thread::Queue
716 0         0 croak "dequeue: 'count' argument ($count) exceeds queue size limit (",
717             $self->limit, ")";
718             }
719 15         35 my @return;
720 15         58 local $/ = EOL;
721 15         50 while (@return == 0) {
722             _SYNC {
723 62     62   496 $self->_read_header;
724 62 100 66     691 return if $self->{_avail} < $count && !$self->{_end};
725 10         148 seek $self->{_fh}, $self->{_tell}, 0;
726 10   66     99 while (@return < $count && $self->{_avail} > 0) {
727 35         186 my $item = readline($self->{_fh});
728 35 50       107 if (!defined($item)) {
729 0         0 $self->_write_header;
730 0         0 return;
731             }
732 35         62 chomp($item);
733 35         48 eval {
734 35         81 CORE::push @return, dejsonize($item);
735             };
736 35 50       784 if ($@) {
737 0         0 $self->_write_header;
738 0         0 die "JSON was \"$item\", error was $@";
739             }
740 35         75 $self->{_pos}++;
741 35         78 $self->{_tell} = tell $self->{_fh};
742 35         119 $self->{_avail}--;
743             }
744 10 50 33     86 if ($self->{_maintenance_freq} &&
745             $self->{_pos} >= $self->{_maintenance_freq}) {
746              
747 0         0 $self->_maintain;
748             }
749 10         47 $self->_write_header;
750 62         1804 } $self;
751 62 100 66     1429 last if @return || $self->{_end} || $self->_expired;
      66        
752 47         243 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
753             }
754 15 100       88 $self->_notify if @return;
755 15 100 66     117 if ($self->_expired && @return == 0) {
756 5 50       204 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
757             }
758 10 50 0     163 return @_ ? @return : $return[0] // ();
759             }
760              
761             sub shift :method {
762 99     99 1 2001071 my ($self,$count) = @_;
763 99   100     924 $count ||= 1;
764 99 50       345 if (! eval { $self->_check_pid; 1 } ) {
  99         674  
  99         398  
765 0         0 carp "Forks::Queue::shift method failed: $@";
766 0         0 return;
767             }
768              
769 99         305 my @return;
770 99         475 while (@return == 0) {
771 99         268 my $h;
772 99 100       819 return if !$self->_wait_for_item;
773 90         407 local $/ = EOL;
774             _SYNC {
775 90     90   638 $self->_read_header;
776              
777 90         1354 seek $self->{_fh}, $self->{_tell}, 0;
778 90   100     1112 while (@return < $count && $self->{_avail} > 0) {
779 728         2273 my $item = readline($self->{_fh});
780 728 50       1571 if (defined($item)) {
781 728         1144 chomp($item);
782 728         1021 eval {
783 728         1319 CORE::push @return, dejsonize($item);
784             };
785 728 50       14872 if ($@) {
786 0         0 $self->_write_header;
787 0         0 die "JSON was \"$item\", error was $@";
788             }
789 728         1167 $self->{_pos}++;
790 728         1373 $self->{_tell} = tell $self->{_fh};
791 728         2468 $self->{_avail}--;
792             }
793             }
794 90 100 66     643 if ($self->{_maintenance_freq} &&
795             $self->{_pos} >= $self->{_maintenance_freq}) {
796              
797 5         59 $self->_maintain;
798             }
799 90         427 $self->_write_header;
800 90         1310 } $self;
801             }
802 90 50       830 $self->_notify if @return;
803 90 100 100     1008 if (!wantarray && @_ < 2) {
804 48   33     475 return $return[0] // ();
805             } else {
806 42         400 return @return;
807             }
808             }
809              
810             sub shift_nb {
811 7     7 1 110 my ($self,$count) = @_;
812 7   100     66 $count ||= 1;
813 7 50       50 if (! eval { $self->_check_pid; 1 } ) {
  7         53  
  7         53  
814 0         0 carp "Forks::Queue::shift operation failed: $@";
815 0         0 return;
816             }
817              
818 7         25 my @return;
819             my $h;
820             #return if !$self->_wait_for_item;
821 7         48 local $/ = EOL;
822             _SYNC {
823 7     7   38 $self->_read_header;
824              
825 7         137 seek $self->{_fh}, $self->{_tell}, 0;
826 7   66     176 while (@return < $count && $self->{_avail} > 0) {
827 20         92 my $item = readline($self->{_fh});
828 20 50       59 if (!defined($item)) {
829 0         0 $self->_write_header;
830 0         0 return;
831             }
832 20         33 chomp($item);
833 20         37 eval {
834 20         37 CORE::push @return, dejsonize($item);
835             };
836 20 50       507 if ($@) {
837 0         0 die "JSON was \"$item\", error was $@";
838             }
839 20         33 $self->{_pos}++;
840 20         53 $self->{_tell} = tell $self->{_fh};
841 20         74 $self->{_avail}--;
842             }
843 7 50 33     113 if ($self->{_maintenance_freq} &&
844             $self->{_pos} >= $self->{_maintenance_freq}) {
845              
846 0         0 $self->_maintain;
847             }
848 7         46 $self->_write_header;
849 7         14 return;
850 7         189 } $self;
851 7 100       113 $self->_notify if @return;
852 7 100 66     52 if (!wantarray && @_ < 2) {
853 2   33     33 return $return[0] // ();
854             } else {
855 5         77 return @return;
856             }
857             }
858              
859             sub peek_front {
860 71     71 0 1479 my ($self, $index) = @_;
861 71   100     223 $index ||= 0;
862 71 100       140 if ($index < 0) {
863 20         58 return $self->peek_back(-$index - 1);
864             }
865 51 50       91 if (! eval { $self->_check_pid; 1 } ) {
  51         137  
  51         113  
866 0         0 carp "Forks::Queue::peek operation failed: $@";
867 0         0 return;
868             }
869 51         77 my @return;
870 51         162 local $/ = EOL;
871              
872 51         83 my $h;
873 51     51   242 _SYNC { $self->_read_header } $self;
  51         153  
874 51 100       231 return if $self->{_avail} <= $index;
875              
876             _SYNC {
877 49     49   154 $self->_read_header;
878              
879 49         548 seek $self->{_fh}, $self->{_tell}, 0;
880 49         112 my $item;
881 49         154 while ($index-- >= 0) {
882 351         821 $item = readline($self->{_fh});
883 351 50       714 if (!defined($item)) {
884 0         0 return;
885             }
886             }
887 49         88 chomp($item);
888              
889 49         110 CORE::push @return, dejsonize($item);
890 49         335 } $self;
891 49         451 return $return[0];
892             }
893              
894             sub peek_back {
895 58     58 0 147 my ($self, $index) = @_;
896 58   100     200 $index ||= 0;
897 58 100       132 if ($index < 0) {
898 10         27 return $self->peek_front(-$index - 1);
899             }
900 48 50       64 if (! eval { $self->_check_pid; 1 } ) {
  48         127  
  48         122  
901 0         0 carp "Forks::Queue::peek operation failed: $@";
902 0         0 return;
903             }
904 48         93 my $count = $index + 1;
905 48         166 local $/ = EOL;
906 48         85 my @return;
907              
908             my $h;
909             _SYNC {
910 48     48   142 $self->_read_header;
911 48 100       131 return if $self->{_avail} <= $index;
912              
913 43         534 seek $self->{_fh}, $self->{_tell}, 0;
914 43         155 my $pos = $self->{_pos};
915 43         133 while ($pos + $count < $self->{_count}) {
916 528         1084 scalar readline($self->{_fh});
917 528         897 $pos++;
918             }
919 43         202 my $item = readline($self->{_fh});
920 43         198 chomp($item);
921 43         93 @return = dejsonize($item);
922 48         315 } $self;
923 48         454 return $return[0];
924             }
925              
926             sub extract {
927 52     52 1 30655 my $self = shift;
928 52 100       243 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
929 46   100     150 my $index = shift || 0;
930 46 100       134 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
931            
932 36   100     92 my $count = $_[0] // 1;
933             # my $count = @_ ? shift : 1;
934 36 100       80 if ($self->{style} eq 'lifo') {
935 18         23 $index = -1 - $index;
936 18         32 $index -= $count - 1;
937             }
938 36         129 local $/ = EOL;
939 36         56 my @return;
940             _SYNCWA {
941 36     36   102 $self->_read_header;
942 36         77 my $n = $self->{_avail};
943 36 50       79 if ($count <= 0) {
944 0         0 carp "Forks::Queue::extract: count must be positive";
945 0         0 return;
946             }
947 36 100       70 if ($index < 0) {
948 18         34 $index = $index + $n;
949 18 100       35 if ($index < 0) {
950 8         14 $count += $index;
951 8         13 $index = 0;
952             }
953             }
954 36 100 100     130 if ($count <= 0 || $index >= $n) {
955 8         21 return;
956             }
957 28 100       68 if ($index + $count >= $n) {
958 6         12 $count = $n - $index;
959             }
960              
961 28         339 seek $self->{_fh}, $self->{_tell}, 0;
962 28         471 scalar readline($self->{_fh}) for 0..$index-1; # skip
963 28         84 my $save = tell $self->{_fh};
964             @return = map {
965 28         82 my $item = readline($self->{_fh});
  84         1060  
966 84         122 chomp($item);
967 84         108 $self->{_avail}--;
968 84         98 $self->{_count}--;
969 84         137 dejsonize($item);
970             } 1..$count;
971 28         971 my @buffer = readline($self->{_fh});
972 28         226 seek $self->{_fh}, $save, 0;
973 28         61 print {$self->{_fh}} @buffer;
  28         389  
974 28         575 truncate $self->{_fh}, tell $self->{_fh};
975 28         116 $self->_write_header;
976 36         260 } $self;
977 36 100       448 $self->_notify if @return;
978 36 100 66     239 return @_ ? @return : $return[0] // ();
979             }
980              
981             sub insert {
982 28     28 1 5196 my ($self, $pos, @items) = @_;
983 28         128 Forks::Queue::_validate_input( $pos, 'index' );
984 20 50       36 if (! eval { $self->_check_pid; 1 } ) {
  20         53  
  20         39  
985 0         0 carp "Forks::Queue::insert operation failed: $@";
986 0         0 return;
987             }
988 20         72 local $/ = EOL;
989 20         31 my $nitems = @items;
990 20         32 my (@deferred_items, $failed_items);
991 20         33 my $inserted = 0;
992             _SYNC {
993 20     20   54 $self->_read_header;
994 20 50       43 if ($self->{_end}) {
995             carp "Forks::Queue::insert call from process $$ ",
996 0         0 "after end call from process ", $self->{_end}, "!";
997 0         0 return 0;
998             }
999 20 50 33     90 if ($self->{on_limit} ne "tq-compat" && $self->{limit} > 0) {
1000 20         68 my $failed_items = $self->{_avail} + @items - $self->{limit};
1001 20 100       43 if ($failed_items > 0) {
1002 4         13 @deferred_items = splice @items, -$failed_items;
1003 4 50       12 if (@items == 0) {
1004 0         0 return;
1005             }
1006             } else {
1007 16         27 $failed_items = 0;
1008             }
1009             }
1010              
1011 20 100       36 if ($pos < 0) {
1012 8         12 $pos += $self->{_avail};
1013             }
1014 20 100       59 if ($pos >= $self->{_avail}) {
1015             # insert at end of queue (append)
1016 4         45 seek $self->{_fh}, 0, 2;
1017 4 50       20 if (tell($self->{_fh}) < $self->{_tell}) {
1018 0         0 Carp::cluck("funny seek");
1019 0         0 seek $self->{_fh}, $self->{_tell}, 0;
1020             }
1021 4         9 foreach my $item (@items) {
1022 16         22 print {$self->{_fh}} jsonize($item),EOL;
  16         50  
1023 16         456 $self->{_count}++;
1024 16         25 $self->{_avail}++;
1025 16         20 $inserted++;
1026 16 50       33 $self->_debug && print STDERR
1027             "$$ insert item $inserted/",0+@items,"\n";
1028             }
1029 4         13 $self->_write_header;
1030 4         10 return;
1031             }
1032 16 100       28 if ($pos < 0) {
1033 4         6 $pos = 0;
1034             }
1035 16         182 seek $self->{_fh}, $self->{_tell}, 0;
1036 16         94 while ($pos > 0) {
1037 64         167 scalar readline($self->{_fh});
1038 64         153 $pos--;
1039             }
1040 16         36 my $save = tell($self->{_fh});
1041 16         185 my @buffer = readline($self->{_fh});
1042 16         128 seek $self->{_fh}, $save, 0;
1043 16         42 foreach my $item (@items) {
1044 48         64 print {$self->{_fh}} jsonize($item),EOL;
  48         162  
1045 48         1484 $self->{_count}++;
1046 48         68 $self->{_avail}++;
1047 48         60 $inserted++;
1048 48 50       116 $self->_debug && print STDERR
1049             "$$ insert item $inserted/",0+@items,"\n";
1050             }
1051 16         23 print {$self->{_fh}} @buffer;
  16         218  
1052 16         90 $self->_write_header;
1053 20         216 } $self;
1054 20 50       279 if ($failed_items) {
1055 0 0       0 if ($self->{on_limit} eq 'fail') {
1056 0         0 carp "Forks::Queue: queue buffer is full ",
1057             "and $failed_items items were not inserted";
1058             } else {
1059 0 0       0 $self->_debug && print STDERR
1060             "$$ $failed_items on insert. Waiting for capacity\n";
1061 0         0 $self->_wait_for_capacity;
1062 0 0       0 $self->_debug && print STDERR "$$ got some capacity\n";
1063 0         0 return $inserted + $self->insert($pos+$inserted, @deferred_items);
1064             }
1065             }
1066 20 50       84 $self->_notify if $inserted;
1067 20         118 return $inserted;
1068             }
1069              
1070             sub pop {
1071 22     22 1 5893 my ($self,$count) = @_;
1072 22   100     140 $count ||= 1;
1073 22 50       51 if (! eval { $self->_check_pid; 1 } ) {
  22         92  
  22         92  
1074 0         0 carp "Forks::Queue::pop operation failed: $@";
1075 0         0 return;
1076             }
1077 22         97 local $/ = EOL;
1078 22         56 my @return;
1079 22         69 while (@return == 0) {
1080 22         41 my $h;
1081             do {
1082 28     28   364 _SYNC { $self->_read_header } $self;
  28         124  
1083             } while (!$self->{_avail} && !$self->{_end} &&
1084 22   66     31 1 + _SLEEP($self)); #sleep($Forks::Queue::SLEEP_INTERVAL || 1));
      66        
1085              
1086 22 50 66     119 return if $self->{_end} && !$self->{_avail};
1087              
1088             _SYNC {
1089 22     22   90 $self->_read_header;
1090 22         308 seek $self->{_fh}, $self->{_tell}, 0;
1091 22 100       116 if ($self->{_avail} <= $count) {
1092 5         107 my @items = readline($self->{_fh});
1093 5         24 chomp(@items);
1094 5         19 @return = map dejsonize($_), @items;
1095 5         213 truncate $self->{_fh}, $self->{_tell};
1096 5         34 $self->{_count} -= @items;
1097             } else {
1098 17         39 my $pos = $self->{_pos};
1099 17         63 while ($pos + $count < $self->{_count}) {
1100 136         334 scalar readline($self->{_fh});
1101 136         296 $pos++;
1102             }
1103 17         48 my $eof = tell $self->{_fh};
1104 17         176 my @items = readline($self->{_fh});
1105 17         488 truncate $self->{_fh}, $eof;
1106 17         71 $self->{_count} -= @items;
1107 17         48 chomp(@items);
1108 17         70 @return = map dejsonize($_), @items;
1109             }
1110 22         575 $self->_write_header;
1111 22         267 } $self;
1112             }
1113 22 50       133 $self->_notify if @return;
1114 22 100 66     120 if (!wantarray && @_ < 2) {
1115 2         19 return $return[0];
1116             } else {
1117 20         121 return @return;
1118             }
1119             }
1120              
1121             sub pop_nb {
1122 4     4 1 31 my ($self,$count) = @_;
1123 4   50     72 $count ||= 1;
1124 4 50       6 if (! eval { $self->_check_pid; 1 } ) {
  4         65  
  4         71  
1125 0         0 carp "Forks::Queue::pop operation failed: $@";
1126 0         0 return;
1127             }
1128 4         75 local $/ = EOL;
1129 4         12 my @return;
1130             my $h;
1131 4     4   104 _SYNC { $self->_read_header } $self;
  4         37  
1132 4 50 33     23 return if $self->{_end} && !$self->{_avail};
1133              
1134             _SYNC {
1135 4     4   25 $self->_read_header;
1136              
1137 4         50 seek $self->{_fh}, $self->{_tell}, 0;
1138 4 50       31 if ($self->{_avail} <= $count) {
1139 4         41 my @items = readline($self->{_fh});
1140 4         35 chomp(@items);
1141 4         9 @return = map dejsonize($_), @items;
1142 4         99 truncate $self->{_fh}, $self->{_tell};
1143 4         16 $self->{_count} -= @items;
1144 4         32 $self->_write_header;
1145 4         11 return;
1146             }
1147              
1148 0         0 my $pos = $self->{_pos};
1149 0         0 while ($pos + $count < $self->{_count}) {
1150 0         0 scalar readline($self->{_fh});
1151 0         0 $pos++;
1152             }
1153 0         0 my $eof = tell $self->{_fh};
1154 0         0 my @items = readline($self->{_fh});
1155 0         0 truncate $self->{_fh}, $eof;
1156 0         0 $self->{_count} -= @items;
1157 0         0 chomp(@items);
1158 0         0 @return = map dejsonize($_), @items;
1159 0         0 $self->_write_header;
1160 0         0 return;
1161 4         66 } $self;
1162 4 50       51 $self->_notify if @return;
1163 4 50 33     64 if (!wantarray && @_ < 2) {
1164 4         38 return $return[0];
1165             } else {
1166 0         0 return @return;
1167             }
1168             }
1169              
1170             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
1171              
1172             sub Forks::Queue::File::MagicLimit::TIESCALAR {
1173 20     20   214 my ($pkg,$obj) = @_;
1174 20         266 return bless \$obj,$pkg;
1175             }
1176              
1177             sub Forks::Queue::File::MagicLimit::FETCH {
1178 379     379   2061 return ${$_[0]}->{limit};
  379         2714  
1179             }
1180              
1181             sub Forks::Queue::File::MagicLimit::STORE {
1182 15     15   61 my ($tie,$val) = @_;
1183 15         142 my $queue = $$tie;
1184 15         57 my $oldval = $queue->{limit};
1185 15         38 $queue->{limit} = $val;
1186 15     15   196 _SYNC { $queue->_write_header } $queue;
  15         52  
1187 15         1277 return $oldval;
1188             }
1189              
1190             sub limit :lvalue {
1191 164     164 1 9003693 my $self = shift;
1192 164 50       1023 if (! eval { $self->_check_pid; 1 } ) {
  164         1611  
  164         595  
1193 0         0 carp "Forke::Queue::limit operation failed: $@";
1194 0         0 return;
1195             }
1196 164 100       1546 if (!$self->{_limit_magic}) {
1197 20         840 tie $self->{_limit_magic},'Forks::Queue::File::MagicLimit', $self;
1198             }
1199 164     164   2337 _SYNC { $self->_read_header } $self;
  164         908  
1200 164 100       1074 if (@_) {
1201 43         133 $self->{limit} = shift @_;
1202 43 100       141 if (@_) {
1203 28         201 $self->{on_limit} = shift @_;
1204             }
1205 43     43   535 _SYNC { $self->_write_header } $self;
  43         268  
1206             }
1207 164         1276 return $self->{_limit_magic};
1208             }
1209              
1210             sub _debug {
1211 19254   33 19254   65087 shift->{debug} // $Forks::Queue::DEBUG;
1212             }
1213              
1214             sub _DUMP {
1215 0     0   0 my ($self,$fh_dump) = @_;
1216 0   0     0 $fh_dump ||= *STDERR;
1217 0         0 open my $fh_qdata, '<', $self->{file};
1218 0         0 print {$fh_dump} <$fh_qdata>;
  0         0  
1219 0         0 close $fh_qdata;
1220             }
1221              
1222             my $id = 0;
1223             sub _impute_file {
1224 51     51   336 my $base = $0;
1225 51         544 $base =~ s{.*[/\\](.)}{$1};
1226 51         286 $base =~ s{[/\\]$}{};
1227 51         156 $id++;
1228 51         161 my $file;
1229             my @candidates;
1230 51 50       515 if ($^O eq 'MSWin32') {
1231 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
1232             } else {
1233 51         297 @candidates = qw(/tmp /var/tmp);
1234             }
1235              
1236             # try hard to avoid using an NFS drive
1237 51         617 for my $candidate ($ENV{FORKS_QUEUE_DIR},
1238             $ENV{TMPDIR}, $ENV{TEMP},
1239             $ENV{TMP}, @candidates,
1240             $ENV{HOME}, ".") {
1241 255 100       629 next if !defined($candidate);
1242 51 50 33     2145 if (-d $candidate && -w _ && -x _) {
      33        
1243 51   33     1003 $file //= "$candidate/.fq-$$-$id-base";
1244 51 50       376 next if Forks::Queue::Util::__is_nfs($candidate);
1245 40         895 ${^_nfs} = 0;
1246 40         4565 return "$candidate/.fq-$$-$id-$base";
1247             }
1248             }
1249              
1250 0           ${^_nfs} = 1;
1251 0           carp "Forks::Queue::File: queue file $file might be on an NFS filesystem!";
1252 0           return $file;
1253             }
1254              
1255             1;
1256              
1257             =head1 NAME
1258              
1259             Forks::Queue::File - file-based implementation of Forks::Queue
1260              
1261             =head1 VERSION
1262              
1263             0.15
1264              
1265             =head1 SYNOPSIS
1266              
1267             my $q = Forks::Queue::File->new( file => "queue-file" );
1268             $q->put( "job1" );
1269             $q->put( { name => "job2", task => "do something", data => [42,19] } );
1270             ...
1271             $q->end;
1272             for my $w (1 .. $num_workers) {
1273             if (fork() == 0) {
1274             my $task;
1275             while (defined($task = $q->get)) {
1276             ... perform task in child ...
1277             }
1278             exit;
1279             }
1280             }
1281              
1282             =head1 METHODS
1283              
1284             See L for an overview of the methods supported by
1285             this C implementation.
1286              
1287             =head2 new
1288              
1289             =head2 $queue = Forks::Queue::File->new( %opts )
1290              
1291             =head2 $queue = Forks::Queue->new( impl => 'File', %opts )
1292              
1293             The C constructor recognized the following configuration
1294             options.
1295              
1296             =over 4
1297              
1298             =item * file
1299              
1300             The name of the file to use to score queue data and metadata.
1301             If omitted, a temporary filename is chosen.
1302              
1303             It is strongly recommended not to use a file that would reside on an
1304             NFS filesystem, since these filesystems have notorious difficulty
1305             with synchronizing files across processes.
1306              
1307             =item * style
1308              
1309             =item * limit
1310              
1311             =item * on_limit
1312              
1313             =item * join
1314              
1315             =item * persist
1316              
1317             See L for descriptions of these options.
1318              
1319             =item * debug
1320              
1321             Boolean value to enable or disable debugging on this queue,
1322             overriding the value in C<$Forks::Queue::DEBUG>.
1323              
1324             =item * dflock
1325              
1326             Boolean value to enable directory-based alternative to flock
1327             for synchronization of the queue across processeses. The module
1328             will often be able to guess whether this flag should be
1329             set by default, but it should be used explicitly in some cases
1330             such as sharing a queue over processes on different hosts
1331             accessing a shared, networked filesystem.
1332              
1333             =back
1334              
1335             =head1 BUGS AND LIMITATIONS
1336              
1337             As with anything that requires C, you should avoid allowing the
1338             queue file to reside on an NFS drive.
1339              
1340             =head1 LICENSE AND COPYRIGHT
1341              
1342             Copyright (c) 2017-2019, Marty O'Brien.
1343              
1344             This library is free software; you can redistribute it and/or modify
1345             it under the same terms as Perl itself, either Perl version 5.10.1 or,
1346             at your option, any later version of Perl 5 you may have available.
1347              
1348             See http://dev.perl.org/licenses/ for more information.
1349              
1350             =cut