File Coverage

blib/lib/Forks/Queue/SQLite.pm
Criterion Covered Total %
statement 462 619 74.6
branch 175 286 61.1
condition 92 164 56.1
subroutine 51 55 92.7
pod 15 18 83.3
total 795 1142 69.6


line stmt bran cond sub pod time code
1             package Forks::Queue::SQLite;
2 61     61   2288502 use strict;
  61         169  
  61         1963  
3 61     61   363 use warnings;
  61         157  
  61         1860  
4 61     61   310 use Carp;
  61         140  
  61         3011  
5 61     61   13355 use JSON;
  61         169710  
  61         564  
6 61     61   7773 use DBI;
  61         140  
  61         2307  
7 61     61   419 use DBD::SQLite;
  61         151  
  61         1695  
8 61     61   350 use Time::HiRes 'time';
  61         135  
  61         498  
9 61     61   9258 use base 'Forks::Queue';
  61         169  
  61         14921  
10 61     61   1292 use 5.010; # implementation contains // //= operators
  61         220  
11              
12             our $VERSION = '0.15';
13             our ($DEBUG,$XDEBUG);
14             *DEBUG = \$Forks::Queue::DEBUG;
15             *XDEBUG = \$Forks::Queue::XDEBUG;
16              
17             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
18              
19             our $jsonizer = JSON->new->allow_nonref(1)->ascii(1);
20              
21             sub new {
22 38     38 1 3480 my $class = shift;
23 38         348 my %opts = (%Forks::Queue::OPTS, @_);
24 38   33     694 my $_DEBUG = $DEBUG || $opts{debug};
25              
26 38 50 66     295 if ($opts{join} && !$opts{db_file}) {
27 0         0 croak "Forks::Queue::SQLite: db_file opt required with join";
28             }
29 38 50 66     339 if ($opts{file} && !$opts{db_file}) {
30 0         0 carp "file => passed to Forks::Queue::SQLite constructor! ",
31             "You probably meant db_file => ... !";
32             }
33 38   66     539 $opts{db_file} //= _impute_file(); # $opts{file} = $opts{db_file};
34 38   50     290 $opts{limit} //= -1;
35 38   50     340 $opts{on_limit} //= 'fail';
36 38   50     230 $opts{style} //= 'fifo';
37 38         122 my $list = delete $opts{list};
38              
39 38 100 100     1767 if (!$opts{join} && -f $opts{db_file}) {
40 2         1242 carp "Forks::Queue: sqlite db file $opts{db_file} already exists!";
41             }
42              
43 38   66     335 my $exists = $opts{join} && -f $opts{db_file};
44 38         360 $opts{_pid} = [ $$, TID() ];
45             # process id is tied to database handle. If process id doesn't match
46             # $self->{_pid}, we must open a new process id.
47              
48 38         421 my $self = bless { %opts }, $class;
49              
50 38 100       211 if (!$exists) {
51             # sometimes unlink doesn't work -- just on NTFS or on other
52             # systems, too?
53 36         407 my $z1 = -f $opts{db_file};
54 36         528 my $z2 = unlink $opts{db_file};
55             $_DEBUG && print STDERR "Creating new database at ",
56 36 50       377 $opts{db_file}," (unlink=$z1 ^ $z2)\n";
57             my $dbh = DBI->connect("dbi:SQLite:dbname=" . $opts{db_file},
58 36         1191 "", "");
59 36         88120 $self->{_dbh} = $opts{_dbh} = $dbh;
60 36 50 66     315 if ($z1 && !$z2) {
61             # file is not fresh. Reinitialize.
62 0         0 my $z3 = $dbh->do("DROP TABLE the_queue");
63 0         0 my $z4 = $dbh->do("DROP TABLE pids");
64 0         0 my $z5 = $dbh->do("DROP TABLE status");
65             $_DEBUG && print STDERR "Could not erase existing db file ",
66 0 0       0 $opts{db_file}, " for reuse ... reinitialized $z3 $z4 $z5\n";
67             }
68 36 50       128 if (!eval { $self->_init }) {
  36         277  
69 0         0 carp __PACKAGE__, ": db initialization failed";
70 0         0 return;
71             }
72             } else {
73 2 50       13 $_DEBUG && print STDERR "Using existing database at ",$opts{db_file},"\n";
74 2         26 $self->_dbh;
75             }
76 38 100       292 if (defined($list)) {
77 5 50       154 if (ref($list) eq 'ARRAY') {
78 5         89 $self->push( @$list );
79             } else {
80 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
81             }
82             }
83              
84 38         665 return $self;
85             }
86              
87             # wrapper for database operations I expect to succeed, but may fail with
88             # intermittent synchronization issues ("attempt to write to a readonly
89             # database...") on perl v5.10 and v5.12. Pausing and retrying the operation
90             # generally fixes these issues.
91             sub _try {
92 24378     24378   52766 my ($count, $code) = @_;
93 24378 50       55708 $count = 1 if $] >= 5.014;
94 24378         39703 my $z = $code->();
95 24378         122402 my ($f0,$f1) = (1,1);
96 24378         63529 while (!$z) {
97 1 50       9 last if --$count <= 0;
98 0         0 ($f0,$f1)=($f1,$f0+$f1);
99 0         0 my (undef,undef,$lcaller) = caller(0);
100 0 0       0 $DEBUG && print STDERR "retry after ${f0}s: $lcaller\a\n";
101 0         0 sleep $f0;
102 0         0 $z = $code->();
103             }
104 24378         62759 return $z;
105             }
106              
107             sub _init {
108 36     36   148 my $self = shift;
109 36         115 my $dbh = $self->{_dbh};
110              
111 36         617 my $z1 = $dbh->do("CREATE TABLE the_queue (
112             timestamp decimal(27,15), batchid mediumint,
113             item text)");
114 36 50       537202 if (!$z1) {
115 0         0 carp __PACKAGE__, ": error creating init table";
116 0         0 return;
117             }
118              
119 36         576 my $z2 = $dbh->do("CREATE TABLE pids (pid mediumint,tid mediumint)");
120 36 50       428525 if (!$z2) {
121 0         0 carp __PACKAGE__, ": error creating init table";
122 0         0 return;
123             }
124              
125 36         731 my $sth = $dbh->prepare("INSERT INTO pids VALUES (?,?)");
126 36         4903 my $z3 = $sth->execute(@{$self->{_pid}});
  36         376059  
127 36 50       480 if (!$z3) {
128 0         0 carp __PACKAGE__, ": error adding process id to tracker";
129 0         0 return;
130             }
131              
132 36         542 my $z4 = $dbh->do("CREATE TABLE status(key text,value text)");
133 36 50       369041 if (!$z4) {
134 0         0 carp __PACKAGE__, ": error creating init table";
135 0         0 return;
136             }
137              
138 36         644 $self->_status("db_file", $self->{db_file});
139 36         206 $self->_status("owner", "@{$self->{_pid}}");
  36         753  
140 36         413 $self->_status("style", $self->{style});
141 36         556 $self->_status("limit", $self->{limit});
142 36         356 $self->_status("on_limit", $self->{on_limit});
143 36         699 return 1;
144             }
145              
146 28268 50   28268 0 74331 sub TID { $INC{'threads.pm'} ? threads->tid : 0 }
147              
148             sub _dbh {
149 28069     28069   38777 my $self = shift;
150 28069         48684 my $tid = TID();
151 28069 100 100     179348 if ($self->{_dbh} && $$ == $self->{_pid}[0] && $tid == $self->{_pid}[1]) {
      66        
152 28055         193295 return $self->{_dbh};
153             }
154 14 50       2947 if (Forks::Queue::__inGD()) {
155             # database already destroyed? Don't try to recreate in GD.
156 0         0 return;
157             }
158              
159 14         289 $self->{_pid} = [$$,$tid];
160             $self->{_dbh} =
161 14         680 DBI->connect("dbi:SQLite:dbname=".$self->{db_file},"","");
162 14         18098 $self->{_dbh}{AutoCommit} = 1;
163 14 100       232 if (!$self->{_DESTROY}) {
164 12         596 $self->{_dbh}->begin_work;
165 12         646 $self->{_dbh}->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid");
166 12         959764 $self->{_dbh}->do("INSERT INTO pids VALUES ($$,$tid)");
167 12         227814 $self->{_dbh}->commit;
168 12         434 $self->{style} = $self->_status("style");
169 12         132 $self->{limit} = $self->_status("limit");
170 12         175 $self->{on_limit} = $self->_status("on_limit");
171             }
172 14         140 return $self->{_dbh};
173             }
174              
175             # DESTROY responsibilities for F::Q::SQLite:
176             # remove pid+tid from pids table
177             # disconnect from database
178             # delete database file if last pid/tid and !persist
179             sub DESTROY {
180 34     34   22123773 my $self = shift;
181 34         283 $self->{_DESTROY}++;
182 34   33     624 my $_DEBUG = $self->{debug} // $DEBUG;
183 34         145 my $dbh;
184 34 50       2639 if (Forks::Queue::__inGD()) {
185 0 0       0 return unless eval { $dbh = $self->_dbh; 1 };
  0         0  
  0         0  
186             } else {
187 34         215 $dbh = $self->_dbh;
188             }
189 34 50       251 my $tid = $self->{_pid} ? $self->{_pid}[1] : TID();
190 34         147 my $t = [[-1]];
191 34   33     445 my $pid_rm = $dbh && eval {
192             $dbh->{PrintWarn} = # suppress "attempt to write ..."
193             $dbh->{PrintError} = 0; # warnings, particularly on 5.010, 5.012
194             $dbh->begin_work;
195              
196             my $z1 = _try(3, sub {
197 34     34   678 $dbh->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid") } );
198              
199             if ($z1) {
200             my $sth = $dbh->prepare("SELECT COUNT(*) FROM pids");
201             my $z2 = $sth->execute;
202             $t = $sth->fetchall_arrayref;
203             } else {
204             $_DEBUG && print STDERR "$$ DESTROY: DELETE FROM pids failed\n";
205             $t = [[-2]];
206             }
207             $dbh->commit;
208             $_DEBUG and print STDERR "$$ DESTROY npids=$t->[0][0]\n";
209             1;
210             };
211 34 50       209 $dbh && eval { $dbh->disconnect };
  34         3485  
212 34 100 33     4862 if ($t && $t->[0] && $t->[0][0] == 0) {
      66        
213 23 50       114 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
214 23 100       287 if (!$self->{persist}) {
215 22         22004313 sleep 1;
216 22         8638 unlink $self->{db_file};
217             }
218             } else {
219             }
220             }
221              
222             sub _status {
223             # if transactions are desired, they must be provided by the caller
224 22405     22405   32181 my $self = shift;
225 22405         46715 my $dbh = $self->_dbh;
226 22405 0 33     50643 return if !$dbh && $self->{_DESTROY};
227 22405 100       44017 if (@_ == 1) {
    50          
228 22197         107806 my $sth = $dbh->prepare("SELECT value FROM status WHERE key=?");
229 22197 0 33     1354201 if (!$sth && $self->{_DESTROY}) {
230 0         0 warn "prepare failed in global destruction: $$";
231 0         0 return;
232             }
233              
234 22197         38655 my $key = $_[0];
235 22197     22197   112993 my $z = _try( 3, sub { $sth->execute($key) } );
  22197         1259441  
236              
237 22197 50       83095 if (!$z) {
238 0         0 carp __PACKAGE__, ": lookup on status key '$_[0]' failed";
239 0         0 return;
240             }
241 22197         116256 my $t = $sth->fetchall_arrayref;
242 22197 100       57321 if (@$t == 0) {
243 22126         326375 return; # no value
244             }
245 71         1649 return $t->[0][0];
246             } elsif (@_ == 2) {
247 208         1048 my ($key,$value) = @_;
248 208         1981 my $sth1 = $dbh->prepare("DELETE FROM status WHERE key=?");
249 208         21290 my $sth2 = $dbh->prepare("INSERT INTO status VALUES(?,?)");
250              
251 208     208   13622 my $z1 = _try( 3, sub { $sth1->execute($key) } );
  208         23759  
252 208   33 208   2623 my $z2 = $z1 && _try( 5, sub { $sth2->execute($key,$value) } );
  208         2013206  
253              
254 208   33     7634 return $z1 && $z2;
255             } else {
256 0         0 croak "Forks::Queue::SQLite: wrong number of args to _status call";
257             }
258 0         0 return;
259             }
260              
261             sub end {
262 8     8 1 10002261 my $self = shift;
263 8         86 my $dbh = $self->_dbh;
264              
265 8         45 my $end = $self->_end;
266 8 50       127 if ($end) {
267 0         0 carp "Forks::Queue: end() called from $$, ",
268             "previously called from $end";
269             }
270              
271 8 50       64 if (!$end) {
272 8         81 $dbh->begin_work;
273 8         260 $self->_status("end",$$);
274 8         109298 $dbh->commit;
275             }
276 8         140 $self->_notify;
277 8         45 return;
278             }
279              
280             sub _end {
281 22157     22157   37238 my $self = shift;
282 22157   100     84126 return $self->{_end} ||= $self->_status("end");
283             # XXX - can end condition be cleared? Not yet, but when it can,
284             # this code will have to change
285             }
286              
287              
288             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
289              
290             sub Forks::Queue::SQLite::MagicLimit::TIESCALAR {
291 4     4   63 my ($pkg,$obj) = @_;
292 4         81 return bless \$obj,$pkg;
293             }
294              
295             sub Forks::Queue::SQLite::MagicLimit::FETCH {
296 82 50   82   548 $XDEBUG && print STDERR "MagicLimit::FETCH => ",${$_[0]}->{limit},"\n";
  0         0  
297 82         133 return ${$_[0]}->{limit};
  82         1070  
298             }
299              
300             sub Forks::Queue::SQLite::MagicLimit::STORE {
301 4     4   16 my ($tie,$val) = @_;
302 4 50       43 $XDEBUG && print STDERR "MagicLimit::STORE => $val\n";
303 4         39 my $queue = $$tie;
304 4         18 my $oldval = $queue->{limit};
305 4         33 $queue->{limit} = $val;
306              
307 4         17 my $dbh = $queue->_dbh;
308 4         50 $dbh->begin_work;
309 4         77 $queue->_status("limit",$val);
310 4         43625 $dbh->commit;
311 4         64 return $oldval;
312             }
313              
314             sub limit :lvalue {
315 36     36 1 1000786 my $self = shift;
316 36 100       564 if (!$self->{_limit_magic}) {
317 4         233 tie $self->{_limit_magic}, 'Forks::Queue::SQLite::MagicLimit', $self;
318 4 50       38 $XDEBUG && print STDERR "tied \$self->\{_limit_magic\}\n";
319             }
320 36 100       210 if (@_) {
321 10         76 $self->_dbh->begin_work;
322 10 50       225 $XDEBUG && print STDERR "setting _limit_magic to $_[0]\n";
323 10         101 $self->_status("limit", shift);
324 10 100       63 if (@_) {
325 6 50       42 $XDEBUG && print STDERR "setting on_limit to $_[0]\n";
326 6         123 $self->_status("on_limit", $self->{on_limit} = $_[0]);
327             }
328 10         54 $self->_dbh->commit;
329             } else {
330 26         191 $self->{limit} = $self->_status("limit");
331 26 50       111 $XDEBUG && print STDERR "updating {limit} to $self->{limit}\n";
332             }
333 36         391 return $self->{_limit_magic};
334             }
335              
336             sub status {
337 178     178 1 20008669 my $self = shift;
338 178         746 my $dbh = $self->_dbh;
339 178         790 my $status = {};
340 178         2162 my $sth = $dbh->prepare("SELECT key,value FROM status");
341 178         51303 my $z = $sth->execute;
342 178         5956 my $tt = $sth->fetchall_arrayref;
343 178         1083 foreach my $t (@$tt) {
344 903         3333 $status->{$t->[0]} = $t->[1];
345             }
346 178         745 $status->{avail} = $self->_avail; # update {count}, {avail}
347 178   100     1413 $status->{end} //= 0;
348 178         2187 return $status;
349             }
350              
351             sub _avail {
352             # if transactions are needed, set them up in the caller
353 3660     3660   8723 my ($self,$dbh) = @_;
354 3660   33     13081 $dbh ||= $self->_dbh;
355 3660 50       7813 return unless $dbh;
356 3660         19213 my $sth = $dbh->prepare("SELECT COUNT(*) FROM the_queue");
357 3660 50       205014 return unless $sth;
358 3660         186294 my $z = $sth->execute;
359 3660         52178 my $tt = $sth->fetchall_arrayref;
360 3660         52966 return $self->{avail} = $tt->[0][0];
361             }
362              
363             sub _maintain {
364 0     0   0 my ($self) = @_;
365 0         0 return;
366             }
367              
368             sub push {
369 140     140 1 928 my ($self,@items) = @_;
370 140         1939 $self->_push(+1,@items);
371             }
372              
373             sub enqueue {
374 2     2 1 1498 my ($self,@items) = @_;
375 2         6 my $tfactor = +1;
376 2         6 my (@deferred_items,$failed_items);
377 2         4 my $pushed = 0;
378 2   33     16 my $_DEBUG = $self->{debug} // $DEBUG;
379              
380 2 50       27 if ($self->_end) {
381             carp "Forks::Queue: put call from process $$ ",
382 0         0 "after end call from process " . $self->{_end};
383 0         0 return 0;
384             }
385              
386 2         7 my $limit = $self->{limit};
387 2 50       8 $limit = 9E9 if $self->{limit} <= 0;
388 2         8 my $dbh = $self->_dbh;
389              
390 2         63 $dbh->begin_work;
391 2         60 my $stamp = Time::HiRes::time;
392 2         22 my $id = $self->_batch_id($stamp,$dbh);
393             # For Thread::queue compatibility, enqueue puts all items on
394             # the queue without blocking if there is even one free space,
395 2 50 33     32 if (@items && $self->_avail < $limit) {
396 2         23 foreach my $item (@items) {
397 8         31 $self->_add($item, $stamp, $id++);
398 8         19 $pushed++;
399             }
400 2         6 @items = ();
401             }
402 2         17925 $dbh->commit;
403 2 50       26 if (@items > 0) {
404 0         0 @deferred_items = @items;
405 0         0 $failed_items = @deferred_items;
406             }
407 2 50       22 $self->_notify if $pushed;
408              
409 2 50       8 if ($failed_items) {
410 0 0       0 if ($self->{on_limit} eq 'fail') {
411 0         0 carp "Forks::Queue: queue buffer is full ",
412             "and $failed_items items were not added";
413             } else {
414 0 0       0 $_DEBUG && print STDERR "$$ $failed_items on enqueue. ",
415             "Waiting for capacity\n";
416 0         0 $self->_wait_for_capacity;
417 0 0       0 $_DEBUG && print STDERR "$$ got some capacity\n";
418 0         0 $pushed += $self->enqueue(@deferred_items);
419             }
420             }
421 2         15 return $pushed;
422             }
423              
424             sub unshift {
425 0     0 1 0 my ($self,@items) = @_;
426 0         0 $self->_push(-1,@items);
427             }
428              
429             sub _add {
430             # do not use transactions here!
431             # if they are needed, call begin_work/commit from the caller
432 1308     1308   2953 my ($self,$item,$timestamp,$id) = @_;
433 1308         7443 my $jitem = $jsonizer->encode($item);
434 1308         3117 my $dbh = $self->_dbh;
435 1308         5189 my $sth = $dbh->prepare("INSERT INTO the_queue VALUES(?,?,?)");
436 1308     1308   67985 my $z = _try(3, sub { $sth->execute($timestamp, $id, $jitem) } );
  1308         46269  
437 1308         14413 return $z;
438             }
439              
440             sub _push {
441 148     148   900 my ($self,$tfactor,@items) = @_;
442              
443 148         548 my (@deferred_items,$failed_items);
444 148         420 my $pushed = 0;
445 148   33     1512 my $_DEBUG = $self->{debug} // $DEBUG;
446              
447 148 100       788 if ($self->_end) {
448             carp "Forks::Queue: put call from process $$ ",
449 2         484 "after end call from process " . $self->{_end};
450 2         240 return 0;
451             }
452              
453 146         586 my $limit = $self->{limit};
454 146 100       625 $limit = 9E9 if $self->{limit} <= 0;
455              
456 146         467 my $dbh = $self->_dbh;
457            
458              
459 146         2097 $dbh->begin_work;
460 146         3366 my $stamp = Time::HiRes::time;
461 146         914 my $id = $self->_batch_id($stamp,$dbh);
462 146   100     1481 while (@items && $self->_avail < $limit) {
463 1276         3150 my $item = shift @items;
464 1276         3984 $self->_add($item, $stamp, $id++);
465 1276         5722 $pushed++;
466             }
467 146         1556443 $dbh->commit;
468 146 100       1696 if (@items > 0) {
469 21         136 @deferred_items = @items;
470 21         76 $failed_items = @deferred_items;
471             }
472 146 100       1776 $self->_notify if $pushed;
473              
474 146 100       526 if ($failed_items) {
475 21 100       181 if ($self->{on_limit} eq 'fail') {
476 13         5607 carp "Forks::Queue: queue buffer is full ",
477             "and $failed_items items were not added";
478             } else {
479 8 50       214 $_DEBUG && print STDERR "$$ $failed_items on put. ",
480             "Waiting for capacity\n";
481 8         152 $self->_wait_for_capacity;
482 8 50       52 $_DEBUG && print STDERR "$$ got some capacity\n";
483 8         187 $pushed += $self->_push($tfactor,@deferred_items);
484             }
485             }
486 146         4961 return $pushed;
487             }
488              
489             sub _wait_for_item {
490 7     7   37 my $self = shift;
491 7         19 my $ready = 0;
492 7         26 do {
493 2121   100     9873 $ready = $self->_avail || $self->_end || $self->_expired;
494 2121 100 50     14383220 sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
495             } while !$ready;
496 7         34 return $self->{avail};
497             }
498              
499             sub _wait_for_capacity {
500 8     8   56 my $self = shift;
501 8 50       252 if ($self->{limit} <= 0) {
502 0         0 return 9E9;
503             }
504 8         47 my $ready = 0;
505 8 50       59 my $count = @_ ? shift : 1;
506 8         87 while (!$ready) {
507 16 100       243 last if $self->_avail + $count <= $self->{limit};
508 8 50       140 last if $self->_end;
509 8   50     16001765 sleep($Forks::Queue::SLEEP_INTERVAL || 1);
510             }
511 8         69 return $self->{avail} + $count <= $self->{limit};
512             }
513              
514             sub _batch_id {
515 148     148   512 my ($self,$stamp,$dbh) = @_;
516 148   33     434 $dbh ||= $self->_dbh;
517 148         767 my $sth = $dbh->prepare("SELECT MAX(batchid) FROM the_queue WHERE timestamp=?");
518 148         24301 my $z = $sth->execute($stamp);
519 148         1912 my $tt = $sth->fetchall_arrayref;
520 148 50       692 if (@$tt == 0) {
521 0         0 return 0;
522             } else {
523 148         2138 return $tt->[0][0];
524             }
525             }
526              
527             sub dequeue {
528 8     8 1 3362 my $self = shift;
529 8 50       48 Forks::Queue::_validate_input($_[0], 'count', 1) if @_;
530 3   50     20 my $count = $_[0] || 1;
531 3 50 33     13 if ($self->limit > 0 && $count > $self->limit) {
532 0         0 croak "dequeue: exceeds queue size limit";
533             }
534 3 50       15 if ($self->{style} ne 'lifo') {
535 3 50       21 return @_ ? $self->_retrieve(-1,1,2,0,$_[0])
536             : $self->_retrieve(-1,1,2,0);
537             } else {
538 0 0       0 return @_ ? $self->_retrieve(+1,1,2,0,$_[0])
539             : $self->_retrieve(+1,1,2,0);
540             }
541             }
542              
543             sub shift :method {
544 37     37 1 1000335 my $self = shift;
545             # purge, block
546 37 100       480 return @_ ? $self->_retrieve(-1,1,1,0,$_[0]) : $self->_retrieve(-1,1,1,0);
547             }
548              
549             sub pop {
550 9     9 1 2729 my $self = shift;
551 9 100       43 Forks::Queue::_validate_input($_[0], 'index', 1) if @_;
552             # purge, block
553 9   100     62 my @popped = $self->_retrieve(+1,1,1,0,$_[0] // 1);
554 9 100       72 return @_ ? reverse(@popped) : $popped[0];
555             }
556              
557             sub shift_nb {
558 1     1 1 7 my $self = shift;
559             # purge, no block
560 1 50       4 return @_ ? $self->_retrieve(-1,1,0,0,$_[0]) : $self->_retrieve(-1,1,0,0);
561             }
562              
563             sub pop_nb {
564 2     2 1 15 my $self = shift;
565             # purge, no block
566 2 50       27 my @popped = @_
567             ? $self->_retrieve(+1,1,0,0,$_[0]) : $self->_retrieve(+1,1,0,0);
568 2 50       13 return @_ ? @popped : $popped[0];
569 0         0 return @popped;
570             }
571              
572             sub extract {
573 26     26 1 15332 my $self = shift;
574 26 100       184 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
575 23   100     89 my $index = shift || 0;
576 23 100       96 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
577 18   100     90 my $count = $_[0] // 1;
578 18         43 my $reverse = 0;
579              
580 18         33 my $tfactor = -1;
581 18 100       68 if ($self->{style} eq 'lifo') {
582 9         18 $tfactor = 1;
583 9         20 $reverse = 1;
584             }
585 18 50       54 if ($count <= 0) {
586 0         0 carp "Forks::Queue::extract: count must be positive";
587 0         0 return;
588             }
589 18 100       49 if ($index < 0) {
590 8 50       29 if ($index + $count > 0) {
591 0         0 $count = -$index;
592             }
593 8         27 $index = -$index - 1;
594 8         17 $index -= $count - 1;
595              
596 8         32 $tfactor *= -1;
597 8         21 $reverse = !$reverse;
598             }
599             # purge, no block
600 18         178 my @items = $self->_retrieve( $tfactor, 1, 0, $index, $index+$count);
601 18 100       71 if ($reverse) {
602 9         30 @items = reverse(@items);
603             }
604 18 100 66     155 return @_ ? @items : $items[0] // ();
605             }
606              
607             sub insert {
608 14     14 1 2621 my ($self, $pos, @items) = @_;
609 14         59 Forks::Queue::_validate_input($pos,'index');
610 10         15 my (@deferred_items);
611 10   33     62 my $_DEBUG = $self->{debug} // $DEBUG;
612 10         20 my $inserted = 0;
613 10 50       31 if ($self->_end) {
614             carp "Forks::Queue: insert call from process $$ ",
615 0         0 "after end call from process " . $self->{_end} . "\n";
616 0         0 return 0;
617             }
618              
619 10         25 my $limit = $self->{limit};
620 10 50       28 $limit = 9E9 if $self->{limit} <= 0;
621              
622 10 100       31 if ($pos >= $self->_avail) {
623 2 50       12 if ($self->{on_limit} eq 'tq-compat') {
624 0         0 my $limit = $self->{limit};
625 0         0 $self->{limit} = 0;
626 0         0 my $enq = $self->enqueue(@items);
627 0         0 $self->{limit} = $limit;
628 0         0 return $enq;
629             } else {
630 2         45 return $self->put(@items);
631             }
632             }
633 8 100       28 if ($pos <= -$self->_avail) {
634             #return $self->unshift(@items);
635 2         6 $pos = 0;
636             }
637 8 100       22 if ($pos < 0) {
638 2         8 $pos += $self->_avail;
639             }
640              
641             # find timestamps for items $pos and $pos+1
642             # choose 0+@items intermediate timestamps
643             # if $pos+1 is undef, use current time as timestamp
644             # as in the _push function, add items
645 8         23 my $dbh = $self->_dbh;
646 8         39 my $sths = $dbh->prepare(
647             "SELECT timestamp,batchid FROM the_queue ORDER BY timestamp,batchid LIMIT ?");
648 8         498 $dbh->begin_work;
649 8         919 my $z = $sths->execute($pos+1);
650 8         144 my $tt = $sths->fetchall_arrayref;
651 8         42 $DB::single = 1;
652 8         18 my ($t1,$t2,$b1,$b2);
653 8 50       24 if (@$tt > 0) {
654 8         19 $t2 = $tt->[-1][0];
655 8         13 $b2 = $tt->[-1][1];
656             } else {
657 0         0 $t2 = Time::HiRes::time();
658 0         0 $b2 = 0;
659             }
660 8 50       25 if (@$tt == $pos) {
    100          
661 0         0 $t1 = $t2;
662 0         0 $b1 = $b2;
663 0         0 $b2 = 0;
664 0 0       0 if ($t2 < 0) {
665 0         0 $t2 = -Time::HiRes::time();
666             } else {
667 0         0 $t2 = Time::HiRes::time();
668             }
669             } elsif ($pos == 0) {
670 2         9 $t1 = $t2 - 100000;
671 2         5 $b1 = 0;
672             } else {
673 6         12 $t1 = $tt->[-2][0];
674 6         11 $b1 = $tt->[-2][1];
675             }
676              
677 8         14 my ($t3,$b3);
678 8 100       21 if ($t1 == $t2) {
679 6         32 my $sthr = $dbh->prepare("UPDATE the_queue SET batchid=batchid+?
680             WHERE timestamp=? AND batchid>=?");
681 6         1335 $sthr->execute(0+@items,$t1,$b2);
682 6         29 $t3 = $t1;
683 6         77 $b3 = $b1+1;
684             } else {
685 2         8 $t3 = ($t1 + $t2) / 2;
686 2         5 $b3 = 0;
687 2 50       9 if ($t3 == $t1) {
688 0         0 $b3 = $b1+1;
689             }
690             }
691 8 50       47 if ($self->{on_limit} eq "tq-compat") {
692 0         0 for my $item (@items) {
693 0         0 $self->_add($item,$t3,$b3);
694 0         0 $inserted++;
695 0         0 $b3++;
696             }
697 0         0 @items = ();
698             } else {
699 8   100     60 while (@items && $self->_avail < $limit) {
700 24         57 my $item = shift @items;
701 24     24   180 _try(3, sub { $self->_add($item,$t3,$b3) });
  24         76  
702 24         71 $inserted++;
703 24         383 $b3++;
704             }
705             }
706 8         83373 $dbh->commit;
707 8 100       84 if (@items > 0) {
708 2         16 @deferred_items = @items;
709             }
710 8 100       32 if (@deferred_items) {
711 2 50       18 if ($self->{on_limit} eq 'fail') {
712 2         838 carp "Forks::Queue: queue buffer is full and ",
713             0+@deferred_items," items were not inserted";
714             } else {
715 0 0       0 $_DEBUG && print STDERR "$$ ",0+@deferred_items, " on insert. ",
716             "Waiting for capacity\n";
717 0         0 $self->_wait_for_capacity;
718 0 0       0 $_DEBUG && print STDERR "$$ got some capacity\n";
719 0         0 $inserted += $self->insert($pos+$inserted,@deferred_items);
720             }
721             }
722 8 50       376 $self->_notify if $inserted;
723 8         111 return $inserted;
724             }
725              
726             sub _retrieve {
727 117     117   292 my $self = shift;
728 117         279 my $tfactor = shift;
729             # tfactor = -1: select newest items first
730             # tfactor = +1: select oldest items first
731 117         236 my $purge = shift;
732             # purge = 0: do not delete items that we retrieve
733             # purge = 1: delete items that we retrieve
734 117         261 my $block = shift;
735             # block = 0: no block if queue is empty
736             # block = 1: block only if queue is empty
737             # block = 2: block if full request can not be fulfilled
738 117         249 my $lo = shift;
739 117 100       451 my $hi = @_ ? $_[0] : $lo+1;
740 117 50       390 return if $hi <= $lo;
741              
742             # attempt to retrieve items $lo .. $hi and return them
743             # retrieved items are removed from the queue if $purge is set
744             # get newest items first if $tfactor > 0, oldest first if $tfactor < 0
745             # only block while
746             # $block is set
747             # zero items have been found
748              
749 117 50 66     475 if ($lo > 0 && $block) {
750 0         0 carp __PACKAGE__, ": _retrieve() didn't expect block=$block and lo=$lo";
751 0         0 $block = 0;
752             }
753              
754 117 100       511 my $order = $tfactor > 0
755             ? "timestamp DESC,batchid DESC" : "timestamp,batchid";
756 117         484 my $dbh = $self->_dbh;
757 117         1398 my $sths = $dbh->prepare(
758             "SELECT item,batchid,timestamp FROM the_queue
759             ORDER BY $order LIMIT ?");
760 117   66     11984 my $sthd = $purge && $dbh->prepare(
761             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND batchid=?");
762 117         5247 my @return;
763 117 50       404 if (!$sths) {
764 0         0 warn "prepare queue SELECT statement failed: $dbh->errstr";
765             }
766              
767 117         452 while (@return <= 0) {
768 19936 50       51920 my $limit = $hi - @return + ($lo < 0 ? $lo : 0);
769 19936         83425 $dbh->begin_work;
770 19936   33     1631432 my $z = $sths && $sths->execute($limit);
771 19936   33     332724 my $tt = $sths && $sths->fetchall_arrayref;
772 19936 50 33     58587 if ($lo < 0 && -$lo > @$tt) {
773 0         0 $hi += (@$tt - $lo);
774 0         0 $lo += (@$tt - $lo);
775             }
776 19936 100 66     133496 if (!$tt || @$tt == 0) {
    100 66        
    100 100        
777 14         481 $dbh->rollback;
778 14 100       82 if ($block) {
779 7         76 $self->_wait_for_item;
780 7         35 next;
781             } else {
782 7         185 return;
783             }
784             } elsif ($block > 1 && $lo == 0 && @$tt < $hi) {
785             # not enough items on queue to satisfy request
786 19812         479076 $dbh->rollback;
787 19812         100667 next;
788             } elsif (@$tt <= $lo) {
789             # not enough items on queue to satisfy request
790 6         286 $dbh->rollback;
791 6         225 return;
792             }
793 104 100       335 $hi = @$tt if $hi > @$tt;
794              
795 104         629 foreach my $itt ($lo .. $hi-1) {
796 443 50       1245 if (!defined($tt->[$itt])) {
797 0         0 warn "\nResult $itt from $lo .. $hi-1 is undefined!";
798             }
799 443         675 my ($item,$bid,$timestamp) = @{$tt->[$itt]};
  443         1098  
800 443         2752 CORE::push @return, $jsonizer->decode($item);
801 443 100       1045 if ($purge) {
802              
803 399     399   2026 my $zd = _try(4, sub { $sthd->execute($item,$timestamp,$bid)} );
  399         18034  
804 399 50       1808 if (!$zd) {
805 0         0 warn "Forks::Queue::SQLite: ",
806             "purge failed: $item,$timestamp,$bid";
807             }
808             }
809             }
810 104         684172 $dbh->commit;
811             } continue {
812 19923 100       48796 if ($block) {
813 19864 100 100     58704 if ($self->_end || $self->_expired) {
814 29         192 $block = 0;
815             }
816             }
817             }
818 104 100 33     2830 return @_ ? @return : $return[0] // ();
819             }
820              
821              
822              
823             sub _pop {
824 0     0   0 my $self = shift;
825 0         0 my $tfactor = shift;
826 0         0 my $purge = shift;
827 0         0 my $block = shift;
828 0         0 my $wantarray = shift;
829 0         0 my ($count) = @_;
830 0   0     0 $count ||= 1;
831              
832 0         0 my $order = "timestamp,batchid";
833 0 0       0 if ($tfactor > 0) {
834 0         0 $order = "timestamp DESC,batchid DESC";
835             }
836 0         0 my $dbh = $self->_dbh;
837 0         0 my $sths = $dbh->prepare(
838             "SELECT item,timestamp,pid FROM the_queue ORDER BY $order LIMIT ?");
839 0         0 my $sthd = $dbh->prepare(
840             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND pid=?");
841 0         0 my @return = ();
842 0         0 while (@return == 0) {
843 0         0 my $limit = $count - @return;
844 0         0 my $z = $sths->execute($limit);
845 0         0 my $tt = $sths->fetchall_arrayref;
846 0 0       0 if (@$tt == 0) {
847 0 0 0     0 if ($block && $self->_wait_for_item) {
848 0         0 next;
849             } else {
850 0         0 last;
851             }
852             }
853 0         0 foreach my $t (@$tt) {
854 0         0 my ($item,$bid,$timestamp) = @$t;
855 0         0 CORE::push @return, $jsonizer->decode($item);
856 0 0       0 if ($purge) {
857 0         0 $dbh->begin_work;
858 0         0 my $zd = $sthd->execute($item,$timestamp,$bid);
859 0 0       0 if (!$zd) {
860 0         0 carp "purge failed: $item,$timestamp,$bid\n";
861             }
862 0         0 $dbh->commit;
863             }
864             }
865             }
866 0 0       0 return $wantarray ? @return : $return[0];
867             }
868              
869             sub clear {
870 16     16 1 5936 my $self = shift;
871 16         73 my $dbh = $self->_dbh;
872 16         169 $dbh->begin_work;
873 16         535 $dbh->do("DELETE FROM the_queue");
874 16         190559 $dbh->commit;
875             }
876              
877             sub peek_front {
878 35     35 0 742 my $self = shift;
879 35         73 my ($index) = @_;
880 35   100     116 $index ||= 0;
881 35 100       86 if ($index < 0) {
882 10         36 return $self->peek_back(-$index - 1);
883             }
884             # no purge, no block, always retrieve a single item
885 25         96 return $self->_retrieve(-1,0,0,$index);
886             }
887              
888             sub peek_back {
889 27     27 0 65 my $self = shift;
890 27         50 my ($index) = @_;
891 27   100     107 $index ||= 0;
892 27 100       70 if ($index < 0) {
893 5         21 return $self->peek_front(-$index - 1);
894             }
895             # no purge, no block, always retrieve a single item
896 22         64 return $self->_retrieve(+1,0,0,$index);
897             }
898              
899             sub _notify {
900 161 50   161   862 return unless $Forks::Queue::NOTIFY_OK;
901              
902 161         469 my $self = shift;
903 161         650 my $dbh = $self->_dbh;
904 161         1810 my $sth = $dbh->prepare("SELECT pid,tid FROM pids");
905 161         28590 my $z = $sth->execute;
906 161         4763 my $pt = $sth->fetchall_arrayref;
907 161         1105 my @pids = map { $_->[0] } grep { $_->[0] != $$ } @$pt;
  85         442  
  246         1393  
908 161 100       647 if (@pids) {
909 64 50 33     476 ($self->{debug} // $DEBUG) && print STDERR "$$ notify: @pids\n";
910 64         3725 kill 'IO', @pids;
911             }
912 161 100       590 my @tids = map { $_->[1] } grep { $_->[0] == $$ && $_->[1] != TID() } @$pt;
  0         0  
  246         1591  
913 161 50       3108 if (@tids) {
914 0         0 foreach my $tid (@tids) {
915 0         0 my $thr = threads->object($tid);
916 0 0       0 $thr && $thr->kill('IO');
917             }
918             }
919             }
920              
921             my $id = 0;
922             sub _impute_file {
923 17     17   131 my $base = $0;
924 17         162 $base =~ s{.*[/\\](.)}{$1};
925 17         92 $base =~ s{[/\\]$}{};
926 17         57 $id++;
927 17         50 my @candidates;
928 17 50       272 if ($^O eq 'MSWin32') {
929 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
930             } else {
931 17         158 @candidates = qw(/tmp /var/tmp);
932             }
933 17         240 for my $candidate ($ENV{FORKS_QUEUE_DIR},
934             $ENV{TMPDIR}, $ENV{TEMP},
935             $ENV{TMP}, @candidates,
936             $ENV{HOME}, ".") {
937 85 50 66     1203 if (defined($candidate) && $candidate ne '' &&
      66        
      33        
      33        
938             -d $candidate && -w _ && -x _) {
939 17         276 return $candidate . "/fq-$$-$id-$base.sql3";
940             }
941             }
942 0           my $file = "./fq-$$-$id-$base.sql3";
943 0           carp __PACKAGE__, ": queue db file $file might not be a good location!";
944 0           return $file;
945             }
946              
947             sub _DUMP {
948 0     0     my ($self,$fh_dump) = @_;
949 0           my $dbh = $self->_dbh;
950 0   0       $fh_dump ||= *STDERR;
951              
952 0           my $sth = $dbh->prepare("SELECT * FROM pids");
953 0           my $z = $sth->execute;
954 0           print {$fh_dump} "\n\n=== pids ===\n------------\n";
  0            
955 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
956 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
957             }
958              
959 0           $sth = $dbh->prepare("SELECT * FROM status");
960 0           $z = $sth->execute;
961 0           print {$fh_dump} "\n\n=== status ===\n--------------\n";
  0            
962 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
963 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
964             }
965              
966 0           $sth = $dbh->prepare("SELECT * FROM the_queue");
967 0           $z = $sth->execute;
968 0           print {$fh_dump} "\n\n=== queue ===\n-------------\n";
  0            
969 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
970 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
971             }
972 0           print {$fh_dump} "\n\n";
  0            
973             }
974              
975             1;
976              
977             =head1 NAME
978              
979             Forks::Queue::SQLite - SQLite-based implementation of Forks::Queue
980              
981             =head1 VERSION
982              
983             0.15
984              
985             =head1 SYNOPSIS
986              
987             my $q = Forks::Queue->new( impl => 'SQLite', db_file => "queue-file" );
988             $q->put( "job1" );
989             $q->put( { name => "job2", task => "do something", data => [42,19] } );
990             ...
991             $q->end;
992             for my $w (1 .. $num_workers) {
993             if (fork() == 0) {
994             my $task;
995             while (defined($task = $q->get)) {
996             ... perform task in child ...
997             }
998             exit;
999             }
1000             }
1001              
1002             =head1 DESCRIPTION
1003              
1004             SQLite-based implementation of L.
1005             It requires the C libraries and the L
1006             Perl module.
1007              
1008             =head1 METHODS
1009              
1010             See L for an overview of the methods supported by
1011             this C implementation.
1012              
1013             =head2 new
1014              
1015             =head2 $queue = Forks::Queue::SQLite->new( %opts )
1016              
1017             =head2 $queue = Forks::Queue->new( impl => 'SQLite', %opts )
1018              
1019             The C constructor recognized the following
1020             configuration options.
1021              
1022             =over 4
1023              
1024             =item * db_file
1025              
1026             The name of the file to use to store queue data and metadata.
1027             If omitted, a temporary filename is chosen.
1028              
1029             =item * style
1030              
1031             =item * limit
1032              
1033             =item * on_limit
1034              
1035             =item * join
1036              
1037             =item * persist
1038              
1039             See L for descriptions of these options.
1040              
1041             =back
1042              
1043             =head1 LICENSE AND COPYRIGHT
1044              
1045             Copyright (c) 2017-2019, Marty O'Brien.
1046              
1047             This library is free software; you can redistribute it and/or modify
1048             it under the same terms as Perl itself, either Perl version 5.10.1 or,
1049             at your option, any later version of Perl 5 you may have available.
1050              
1051             See http://dev.perl.org/licenses/ for more information.
1052              
1053             =cut