File Coverage

blib/lib/App/Basis/Queue.pm
Criterion Covered Total %
statement 33 327 10.0
branch 0 84 0.0
condition 0 21 0.0
subroutine 11 49 22.4
pod 13 14 92.8
total 57 495 11.5


line stmt bran cond sub pod time code
1             # ABSTRACT: Simple database backed FIFO queues
2              
3              
4              
5             package App::Basis::Queue;
6             $App::Basis::Queue::VERSION = '0.3';
7 1     1   66223 use 5.10.0;
  1         4  
  1         47  
8 1     1   6 use feature 'state';
  1         2  
  1         81  
9 1     1   5 use strict;
  1         12  
  1         25  
10 1     1   5 use warnings;
  1         1  
  1         32  
11 1     1   775 use Moo;
  1         16864  
  1         5  
12 1     1   2838 use MooX::Types::MooseLike::Base qw/InstanceOf HashRef Str/;
  1         6905  
  1         117  
13 1     1   1131 use JSON;
  1         20836  
  1         8  
14 1     1   1213 use Data::UUID;
  1         982  
  1         82  
15 1     1   8 use Try::Tiny;
  1         1  
  1         64  
16 1     1   5 use POSIX qw( strftime);
  1         2  
  1         11  
17 1     1   67 use Time::HiRes qw(gettimeofday tv_interval );
  1         1  
  1         8  
18              
19             my $MAX_PROCESS_ITEMS = 100;
20              
21             # -----------------------------------------------------------------------------
22             ## class initialisation
23             ## instancation variables
24             # -----------------------------------------------------------------------------
25              
26             has 'dbh' => (
27             is => 'ro',
28             isa => InstanceOf ['DBI::db']
29             );
30              
31             has 'prefix' => (
32             is => 'ro',
33             isa => Str,
34             default => sub { 'qsdb'; },
35             );
36              
37             has 'debug' => (
38             is => 'rw',
39             default => sub { 0; },
40             writer => 'set_debug'
41             );
42              
43             has 'skip_table_check' => (
44             is => 'ro',
45             default => sub { 0; },
46             ) ;
47              
48             # -----------------------------------------------------------------------------
49             # once the class in instanciated then we need to ensure that we have the
50             # tables created
51              
52              
53             sub BUILD {
54 0     0 0   my $self = shift;
55              
56 0           $self->_set_db_type( $self->{dbh}->{Driver}->{Name} );
57 0 0         die("Valid Database connection required") if ( !$self->_db_type() );
58              
59             # if we are using sqlite then we need to set a pragma to allow
60             # cascading deletes on FOREIGN keys
61 0 0         if ( $self->_db_type() eq 'SQLite' ) {
62 0           $self->{dbh}->do("PRAGMA foreign_keys = ON");
63             }
64              
65             # ensue we have the tables created (if wanted)
66 0 0         $self->_create_tables() if( !$self->skip_table_check);
67              
68             # get the first list of queues we have
69 0           $self->list_queues();
70             }
71              
72             # -----------------------------------------------------------------------------
73             ## class private variables
74             # -----------------------------------------------------------------------------
75              
76             has _queue_list => (
77             is => 'rwp', # like ro, but creates _set_queue_list too
78             lazy => 1,
79             default => sub { {} },
80             writer => '_set_queue_list',
81             init_arg => undef # dont allow setting in constructor ;
82             );
83              
84             has _db_type => (
85             is => 'rwp', # like ro, but creates _set_queue_list too
86             lazy => 1,
87             default => sub {''},
88             writer => '_set_db_type',
89             init_arg => undef # dont allow setting in constructor ;
90             );
91              
92             has _processor => (
93             is => 'ro',
94             lazy => 1,
95             default => sub { my $hostname = `hostname`; $hostname =~ s/\s//g; $hostname . "::$ENV{USER}" . "::" . $$ },
96             init_arg => undef # dont allow setting in constructor ;
97             );
98              
99             # -----------------------------------------------------------------------------
100             ## class private methods
101             # -----------------------------------------------------------------------------
102              
103             sub _debug {
104 0     0     my $self = shift;
105              
106 0 0         return if ( !$self->{debug} );
107              
108 0           my $msg = shift;
109 0           $msg =~ s/^/ /gsm;
110              
111 0           say STDERR $msg;
112             }
113              
114             # -----------------------------------------------------------------------------
115             sub _build_sql_stmt {
116 0     0     my ( $query, $p ) = @_;
117 0 0         our @params = $p ? @$p : ();
118 0           $query =~ s/\s+$//;
119 0 0         $query .= ' ;' if ( $query !~ /;$/ );
120              
121             # make sure we repesent NULL properly, do quoting - only basic its only for debug
122 0           our $i = 0;
123             {
124              
125 0           sub _repl {
126 0     0     my $out = 'NULL';
127              
128             # quote strings, leave numbers untouched, not doing floats
129 0 0         if ( defined $params[$i] ) {
130 0 0         $out = $params[$i] =~ /^\d+$/ ? $params[$i] : "'$params[$i]'";
131             }
132 0           $i++;
133              
134 0           return $out;
135             }
136 0 0 0       $query =~ s/\?/_repl/gex if ( @params && scalar(@params) );
  0            
137             }
138              
139 0           return $query;
140             }
141              
142             # -----------------------------------------------------------------------------
143             sub _query_db {
144 0     0     state $sth_map = {};
145 0           my $self = shift;
146 0           my ( $query, $p, $no_results ) = @_;
147 0 0         my @params = $p ? @$p : ();
148 0           my %result;
149              
150 0           $query =~ s/\s+$//;
151 0 0         $query .= ' ;' if ( $query !~ /;$/ );
152              
153 0 0         if ( $self->{debug} ) {
154              
155 0           $self->_debug( "ACTUAL QUERY: $query\nQUERY PARAMS: " . to_json(@params) );
156 0           my $sql = _build_sql_stmt( $query, $p );
157 0           $self->_debug( 'BUILT QUERY : ' . $sql . "\n" );
158             }
159              
160             try {
161 0     0     my $sth;
162              
163             # key based on query and fields we are using
164 0           my $key = "$query." . join( '.', @params );
165 0 0         if ( $sth_map->{$key} ) {
166 0           $sth = $sth_map->{$key};
167             }
168             else {
169              
170 0           $sth = $self->{dbh}->prepare($query);
171              
172             # save the handle for next time
173 0           $sth_map->{$key} = $sth;
174              
175             }
176 0           my $rv = $sth->execute(@params);
177 0 0         if ( !$no_results ) {
178              
179             # so as to get an array of hashes
180 0           $result{rows} = $sth->fetchall_arrayref( {} );
181 0           $result{row_count} = scalar( @{ $result{rows} } );
  0            
182 0           $result{success} = 1;
183              
184 0           $self->_debug( 'QUERY RESPONSE: ' . to_json( $result{rows} ) . "\n" );
185             }
186             else {
187 0 0         if ($rv) {
188 0           $result{row_count} = $sth->rows;
189 0           $result{success} = 1;
190             }
191             }
192              
193             }
194             catch {
195 0     0     $result{error} = "Failed to prepare/execute query: $query\nparams: " . to_json($p) . "\nerror: $@\n";
196              
197             # $self->_debug( $result{error} );
198 0           };
199 0           return \%result;
200             }
201              
202             # -----------------------------------------------------------------------------
203             sub _update_db {
204 0     0     my $self = shift;
205 0           my ( $table, $query, $params ) = @_;
206              
207 0           $query = "UPDATE $table $query";
208              
209 0           my $resp = $self->_query_db( $query, $params, 1 );
210              
211 0           return $resp;
212             }
213              
214             # -----------------------------------------------------------------------------
215             # we will hold onto statement handles to speed up inserts
216              
217             sub _insert_db {
218 0     0     state $sth_map = {};
219 0           my $self = shift;
220 0           my ( $table, $f, $p ) = @_;
221 0 0         my @params = $p ? @$p : ();
222              
223             # key based on table and fields we are inserting
224 0           my $key = "$table." . join( '.', @$f );
225 0           my ( $query, $sql, $sth );
226              
227 0 0         if ( $sth_map->{$key} ) {
228 0           $sth = $sth_map->{$key};
229             }
230             else {
231 0           $query = "INSERT INTO $table (" . join( ',', @$f ) . ") values (" . join( ',', map {'?'} @$f ) . ") ;";
  0            
232              
233 0           $self->_debug($query);
234 0           $sth = $self->{dbh}->prepare($query);
235              
236             # cache the handle for next time
237 0           $sth_map->{$key} = $sth;
238             }
239 0           my $rv = $sth->execute(@params);
240              
241 0           return { row_count => $rv, error => 0 };
242             }
243              
244             # -----------------------------------------------------------------------------
245              
246             sub _delete_db_record {
247 0     0     my $self = shift;
248 0           my ( $table, $q, $v ) = @_;
249 0           my $query = "DELETE FROM $table $q ;";
250              
251             # run the delte and don't fetch results
252 0           my $resp = $self->_query_db( $query, $v, 1 );
253 0           return $resp;
254             }
255              
256             # -----------------------------------------------------------------------------
257             # as all the indexes are constructued the same, lets have a helper
258             sub _create_index_str {
259 0     0     my ( $table, $field ) = @_;
260              
261 0           return sprintf( "CREATE INDEX %s_%s_idx on %s(%s) ;", $table, $field, $table, $field );
262             }
263              
264             # -----------------------------------------------------------------------------
265             sub _create_sqlite_table {
266 0     0     my $self = shift;
267 0           my $table = shift;
268 0           $self->_debug("Creating SQLite tables");
269              
270 0           my $sql = "CREATE TABLE $table (
271             counter INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
272             id VARCHAR(128) NOT NULL UNIQUE,
273             queue_name VARCHAR(128) NOT NULL,
274             added TIMESTAMP DEFAULT current_timestamp,
275             processed BOOLEAN DEFAULT 0,
276             processor VARCHAR(128),
277             process_start TIMESTAMP,
278             processing_time FLOAT,
279             process_failure SMALLINT DEFAULT 0,
280             data TEXT ) ;";
281              
282 0           $self->_debug($sql);
283 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
  0            
284             }
285              
286             # -----------------------------------------------------------------------------
287             sub _create_postgres_table {
288 0     0     my $self = shift;
289 0           my $table = shift;
290 0           $self->_debug("Creating PostgreSQL tables");
291              
292             # big/serial creates an auto incrementing column in PostgreSQL
293 0           my $sql = "CREATE TABLE $table (
294             counter BIGSERIAL PRIMARY KEY UNIQUE,
295             id VARCHAR(128) NOT NULL UNIQUE,
296             queue_name VARCHAR(128) NOT NULL,
297             added TIMESTAMP WITH TIME ZONE DEFAULT now(),
298             processed SMALLINT DEFAULT 0,
299             processor VARCHAR(128),
300             process_start TIMESTAMP,
301             processing_time FLOAT,
302             process_failure SMALLINT DEFAULT 0,
303             data TEXT ) ;";
304              
305 0           $self->_debug($sql);
306 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
  0            
307             }
308              
309             # -----------------------------------------------------------------------------
310             sub _create_mysql_table {
311 0     0     my $self = shift;
312 0           my $table = shift;
313 0           $self->_debug("Creating MySQL tables");
314              
315 0           my $sql = "CREATE TABLE $table (
316             counter INT NOT NULL PRIMARY KEY AUTO_INCREMENT UNIQUE,
317             id VARCHAR(128) NOT NULL UNIQUE,
318             queue_name VARCHAR(128) NOT NULL,
319             added TIMESTAMP DEFAULT current_timestamp,
320             processed SMALLINT DEFAULT 0,
321             processor VARCHAR(128),
322             process_start TIMESTAMP,
323             processing_time FLOAT,
324             process_failure SMALLINT DEFAULT 0,
325             data TEXT ) ;";
326              
327 0           $self->_debug($sql);
328 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
  0            
329              
330             }
331              
332             # -----------------------------------------------------------------------------
333             # create all the tables and indexes
334             sub _create_tables {
335 0     0     my $self = shift;
336 0           my $sql;
337 0           my $table = $self->{prefix} . '_queue';
338              
339             # as the checking for tables and indexes is fraught with issues over multiple
340             # databases its easier to not print the errors and catch the creation failures
341             # and ignore them!
342 0           my $p = $self->{dbh}->{PrintError};
343 0           $self->{dbh}->{PrintError} = 0;
344              
345             # I am assuming either table does not exist then nor does the other and we should
346             # create both
347 0 0         if ( $self->_db_type() eq 'SQLite' ) {
    0          
    0          
348 0           $self->_create_sqlite_table($table);
349             }
350             elsif ( $self->_db_type() eq 'Pg' ) {
351 0           $self->_create_postgres_table($table);
352             }
353             elsif ( $self->_db_type() eq 'mysql' ) {
354 0           $self->_create_mysql_table($table);
355             }
356             else {
357 0           die "Unhandled database type " . $self->_db_type();
358             }
359              
360 0           foreach my $field (qw/counter id added queue_name processed process_failure/) {
361 0           my $sql = _create_index_str( $table, $field );
362              
363 0           $self->_debug($sql);
364 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
  0            
365             }
366              
367             # restore the PrintError setting
368 0           $self->{dbh}->{PrintError} = $p;
369             }
370              
371             # -----------------------------------------------------------------------------
372              
373              
374             sub add {
375 0     0 1   state $uuid = Data::UUID->new();
376 0           my $self = shift;
377 0           my $qname = shift;
378 0 0         my $data = @_ % 2 ? shift : {@_};
379              
380 0 0         if ( ref($data) ne 'HASH' ) {
381 0           die "add accepts a hash or a hashref of parameters";
382             }
383 0           my $status = 0;
384 0           my $resp;
385 0 0 0       if ( !$qname || !$data ) {
386 0           my $err = "Missing queue name or data";
387 0           $self->_debug($err);
388 0           warn $err;
389 0           return $status;
390             }
391              
392             try {
393 0     0     my $json_str = encode_json($data);
394              
395             # we manage the id's for the queue entries as we cannot depend on a common SQL method of adding
396             # a record and getting its uniq ID back
397             #my $uuid = Data::UUID->new();
398 0           my $message_id = $uuid->create_b64();
399 0           $resp = $self->_insert_db(
400             $self->{prefix} . '_queue',
401             [qw(id queue_name added data)],
402             [ $message_id, $qname, strftime( "%Y-%m-%d %H:%M:%S", localtime() ), $json_str ]
403             );
404              
405 0 0         $status = $message_id if ( !$resp->{error} );
406             }
407             catch {
408 0     0     my $e = $@;
409 0           warn $e;
410 0           };
411              
412 0           return $status;
413             }
414              
415             # -----------------------------------------------------------------------------
416              
417              
418             sub process {
419 0     0 1   my $self = shift;
420 0           my $qname = shift;
421 0 0         my $params = @_ % 2 ? die "method: process - Odd number of values passed where even is expected.\n" : {@_};
422 0           my $processed_count = 0;
423              
424             # update queue list
425 0           $self->list_queues();
426              
427             # if the queue does not exist
428 0 0         return 0 if ( !$self->{_queue_list}->{$qname} );
429              
430 0   0       $params->{count} ||= 1;
431 0 0 0       die __PACKAGE__ . " process requires a callback function" if ( !$params->{callback} || ref( $params->{callback} ) ne 'CODE' );
432              
433 0 0         if ( $params->{count} > $MAX_PROCESS_ITEMS ) {
434 0           warn "Reducing process count from $params->{count} to $MAX_PROCESS_ITEMS";
435 0           $params->{count} = $MAX_PROCESS_ITEMS;
436             }
437              
438             # get list of IDs we can process, as SQLite has an issue with ORDER BY and LIMIT in an UPDATE call
439             # so we have to do things in 2 stages, which means it is not easy to mark lots of records to be processed
440             # but that its possibly a good thing
441 0           my $sql = sprintf(
442             "SELECT id FROM %s_queue
443             WHERE queue_name = ?
444             AND processed = 0
445             AND process_failure = 0
446             ORDER BY added ASC
447             LIMIT ?;", $self->{prefix}
448             );
449 0           my $ids = $self->_query_db( $sql, [ $qname, $params->{count} ] );
450 0           my @t;
451 0           foreach my $row ( @{ $ids->{rows} } ) {
  0            
452 0           push @t, "'$row->{id}'";
453             }
454              
455             # if there are no items to update, return
456 0 0         return $processed_count if ( !scalar(@t) );
457 0           my $id_list = join( ',', @t );
458              
459             # mark items that I am going to process
460 0           my $update = "SET processor=?
461             WHERE id IN ( $id_list) AND processed = 0 ;";
462 0           my $resp = $self->_update_db( $self->{prefix} . "_queue", $update, [ $self->_processor() ] );
463 0 0         return $processed_count if ( !$resp->{row_count} );
464              
465             # refetch the list to find out which ones we are going to process, incase another system was doing things
466             # at the same time
467 0           $sql = sprintf(
468             "SELECT * FROM %s_queue
469             WHERE queue_name = ?
470             AND processed = 0
471             AND processor = ?
472             AND process_failure = 0
473             ORDER BY added ASC
474             LIMIT ?;", $self->{prefix}
475             );
476 0           my $info = $self->_query_db( $sql, [ $qname, $self->_processor(), $params->{count} ] );
477              
478 0           foreach my $row ( @{ $info->{rows} } ) {
  0            
479              
480             # unpack the data
481 0           $row->{data} = decode_json( $row->{data} );
482 0           my $state = 0;
483 0           my $start = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
484 0           my $st = [gettimeofday] ;
485 0           my $invalid = 0;
486 0           my $elapsed ;
487             try {
488 0     0     $state = $params->{callback}->( $self, $qname, $row );
489             }
490             catch {
491 0     0     warn "invalid callback $@";
492 0           $invalid++;
493 0           };
494 0           $elapsed = tv_interval ( $st);
495              
496 0 0         if ($invalid) {
    0          
497              
498             # if the callback was invalid then we should not mark this as a process failure
499             # just clear the processor
500 0           $update = "SET processor=?, WHERE id = ? AND processed = 0 ;";
501 0           $info = $self->_update_db( $self->{prefix} . "_queue", $update, [ '', $row->{id} ] );
502             }
503             elsif ($state) {
504             # show we have processed it
505 0           $update = "SET processed=1, process_start=?, processing_time=? WHERE id = ? AND processed = 0 ;";
506 0           $info = $self->_update_db( $self->{prefix} . "_queue", $update, [ $start, $elapsed, $row->{id} ] );
507 0           $processed_count++;
508             }
509             else {
510             # mark the failure
511 0           $update = "SET process_failure=1, processing_time=? WHERE id = ? AND processed = 0 ;";
512 0           $info = $self->_update_db( $self->{prefix} . "_queue", $update, [ $elapsed, $row->{id} ] );
513             }
514             }
515              
516 0           return $processed_count;
517             }
518              
519             # -----------------------------------------------------------------------------
520              
521              
522             sub process_failures {
523 0     0 1   my $self = shift;
524 0           my $qname = shift;
525 0 0         my $params = @_ % 2 ? die "method: process - Odd number of values passed where even is expected.\n" : {@_};
526 0           my $processed_count = 0;
527              
528             # update queue list
529 0           $self->list_queues();
530              
531             # if the queue does not exist
532 0 0         return 0 if ( !$self->{_queue_list}->{$qname} );
533              
534 0   0       $params->{count} ||= 1;
535 0 0 0       die __PACKAGE__ . " process requires a callback function" if ( !$params->{callback} || ref( $params->{callback} ) ne 'CODE' );
536              
537 0 0         if ( $params->{count} > $MAX_PROCESS_ITEMS ) {
538 0           warn "Reducing process count from $params->{count} to $MAX_PROCESS_ITEMS";
539 0           $params->{count} = $MAX_PROCESS_ITEMS;
540             }
541              
542             # get list of IDs we can process
543 0           my $sql = sprintf(
544             "SELECT id FROM %s_queue
545             WHERE queue_name = ?
546             AND processed = 0
547             AND process_failure = 1
548             ORDER BY added ASC
549             LIMIT ?;", $self->{prefix}
550             );
551 0           my $ids = $self->_query_db( $sql, [ $qname, $params->{count} ] );
552 0           my @t;
553 0           foreach my $row ( @{ $ids->{rows} } ) {
  0            
554 0           push @t, "'$row->{id}'";
555             }
556              
557             # if there are no items to update, return
558 0 0         return $processed_count if ( !scalar(@t) );
559 0           my $id_list = join( ',', @t );
560              
561             # mark items that I am going to process
562 0           my $update = "SET processor=?
563             WHERE id IN ( $id_list) AND processed = 0 ;";
564 0           my $resp = $self->_update_db( $self->{prefix} . "_queue", $update, [ $self->_processor() ] );
565 0 0         return $processed_count if ( !$resp->{row_count} );
566              
567             # refetch the list to find out which ones we are going to process, incase another system was doing things
568             # at the same time
569 0           $sql = sprintf(
570             "SELECT * FROM %s_queue
571             WHERE queue_name = ?
572             AND processed = 0
573             AND processor = ?
574             AND process_failure = 1
575             ORDER BY added ASC
576             LIMIT ?;", $self->{prefix}
577             );
578 0           my $info = $self->_query_db( $sql, [ $qname, $self->_processor(), $params->{count} ] );
579              
580 0           foreach my $row ( @{ $info->{rows} } ) {
  0            
581              
582             # unpack the data
583 0           $row->{data} = decode_json( $row->{data} );
584              
585 0           my $state = 0;
586             try {
587 0     0     $state = $params->{callback}->( $self, $qname, $row );
588             }
589             catch {
590 0     0     warn "invalid callback $@";
591 0           };
592              
593             # we don't do anything else with the record, we assume that the callback
594             # function will have done something like delete it or re-write it
595             }
596              
597 0           return $processed_count;
598             }
599              
600             # -----------------------------------------------------------------------------
601              
602              
603             sub queue_size {
604 0     0 1   my $self = shift;
605 0           my $qname = shift;
606              
607             # # update queue list
608             # $self->list_queues();
609              
610             # # if the queue does not exist then it must be empty!
611             # return 0 if ( !$self->{_queue_list}->{$qname} );
612              
613 0           my $sql = sprintf(
614             "SELECT count(*) as count FROM %s_queue
615             WHERE queue_name = ?
616             AND processed = 0
617             AND process_failure = 0 ;", $self->{prefix}
618             );
619 0           my $resp = $self->_query_db( $sql, [ $qname ] );
620              
621 0 0         return $resp->{row_count} ? $resp->{rows}->[0]->{count} : 0;
622             }
623              
624             # -----------------------------------------------------------------------------
625              
626              
627             sub list_queues {
628 0     0 1   my $self = shift;
629 0           my %ques;
630              
631 0           my $result = $self->_query_db( sprintf( 'SELECT DISTINCT queue_name FROM %s_queue;', $self->{prefix} ) );
632              
633 0 0         if ( !$result->{error} ) {
634 0           %ques = map { $_->{queue_name} => 1 } @{ $result->{rows} };
  0            
  0            
635             }
636              
637 0           $self->_set_queue_list( \%ques );
638              
639 0           return [ keys %ques ];
640             }
641              
642             # -----------------------------------------------------------------------------
643              
644              
645             sub stats {
646 0     0 1   my $self = shift;
647 0           my $qname = shift;
648 0           my %all_stats = ();
649              
650             # update queue list
651 0           $self->list_queues();
652              
653             # queue_size also calls list_queues, so we don't need to do it!
654 0           $all_stats{unprocessed} = $self->queue_size($qname);
655              
656             # if the queue does not exist then it must be empty!
657 0 0         if ( $self->{_queue_list}->{$qname} ) {
658              
659 0           $all_stats{unprocessed} = $self->queue_size($qname);
660              
661 0           my $sql = sprintf(
662             "SELECT count(*) as count FROM %s_queue
663             WHERE queue_name = ?
664             AND processed = 1 ;", $self->{prefix}
665             );
666 0           my $resp = $self->_query_db( $sql, [$qname] );
667 0           $all_stats{processed} = $resp->{rows}->[0]->{count};
668              
669 0           $sql = sprintf(
670             "SELECT count(*) as count FROM %s_queue
671             WHERE queue_name = ?
672             AND processed = 0
673             AND process_failure = 1 ;", $self->{prefix}
674             );
675 0           $resp = $self->_query_db( $sql, [ $qname] );
676 0           $all_stats{failures} = $resp->{rows}->[0]->{count};
677              
678 0           $sql = sprintf(
679             "SELECT
680             min(process_failure) as min_process_failure,
681             max(process_failure) as max_process_failure,
682             avg(process_failure) as avg_process_failure,
683             min(added) as earliest_added,
684             max(added) as latest_added,
685             min( length(data)) as min_data_size,
686             max( length(data)) as max_data_size,
687             avg( length(data)) as avg_data_size,
688             min( processing_time) as min_elapsed,
689             max( processing_time) as max_elapsed,
690             avg( processing_time) as avg_elapsed
691             FROM %s_queue
692             WHERE queue_name = ?;", $self->{prefix}
693             );
694 0           $resp = $self->_query_db( $sql, [$qname] );
695              
696 0           foreach my $k ( keys %{ $resp->{rows}->[0] } ) {
  0            
697 0   0       $all_stats{$k} = $resp->{rows}->[0]->{$k} || "0";
698             }
699              
700             # number of records in the table
701 0           $all_stats{total_records} = $all_stats{processed} + $all_stats{unprocessed} + $all_stats{failures};
702 0   0       $all_stats{total_records} ||= '0';
703              
704             }
705              
706             # make sure hse things have a zero value so calculations don't fail
707 0           foreach my $f (
708             qw( unprocessed processed failures
709             max process_failure avg process_failure earliest_added latest_added
710             min_data_size max_data_size avg_data_size total_records
711             total_records min_proc max_proc avg_proc)
712             )
713             {
714 0   0       $all_stats{$f} ||= '0';
715             }
716              
717 0           return \%all_stats;
718             }
719              
720             # -----------------------------------------------------------------------------
721              
722              
723             sub delete_record {
724 0     0 1   my $self = shift;
725 0           my $data = shift;
726              
727 0           my $sql = sprintf( "WHERE id = ? AND queue_name = ?", $self->{prefix} );
728 0           my $resp = $self->_delete_db_record( $self->{prefix} . "_queue", $sql, [ $data->{id}, $data->{queue_name} ] );
729              
730 0           return $resp->{row_count};
731             }
732              
733             # -----------------------------------------------------------------------------
734              
735              
736             sub reset_record {
737 0     0 1   my $self = shift;
738 0           my $data = shift;
739              
740 0           my $sql = "SET process_failure=0 WHERE id = ? AND queue_name = ? AND processed=0 AND process_failure > 0";
741 0           my $resp = $self->_update_db( $self->{prefix} . "_queue", $sql, [ $data->{id}, $data->{queue_name} ] );
742              
743 0           return $resp->{row_count};
744             }
745              
746             # -----------------------------------------------------------------------------
747              
748              
749             sub purge_queue {
750 0     0 1   my $self = shift;
751 0           my $qname = shift;
752              
753             # # update queue list
754             # $self->list_queues();
755              
756             # # if the queue does not exist then we cannot purge it!
757             # return 0 if ( !$self->{_queue_list}->{$qname} );
758              
759 0           my $sql = "WHERE processed=1 OR process_failure = 1";
760 0           my $resp = $self->_delete_db_record( $self->{prefix} . "_queue", $sql );
761              
762 0           return $resp->{row_count};
763             }
764              
765             # -----------------------------------------------------------------------------
766              
767              
768             sub remove_queue {
769 0     0 1   my $self = shift;
770 0           my $qname = shift;
771              
772             # # update queue list
773             # $self->list_queues();
774              
775             # # if the queue does not exist
776             # return 0 if ( !$self->{_queue_list}->{$qname} );
777              
778 0           my $resp = $self->_delete_db_record( $self->{prefix} . "_queue", "WHERE queue_name=?", [$qname] );
779 0           return $resp->{success};
780             }
781              
782             # -----------------------------------------------------------------------------
783              
784              
785             sub reset_failures {
786 0     0 1   my $self = shift;
787 0           my $qname = shift;
788              
789             # # update queue list
790             # $self->list_queues();
791              
792             # # if the queue does not exist
793             # return 0 if ( !$self->{_queue_list}->{$qname} );
794              
795 0           my $sql = "SET process_failure=0";
796 0           $sql .= sprintf( " WHERE queue_name = ? AND process_failure = 1", $self->{prefix} );
797 0           my $resp = $self->_update_db( $self->{prefix} . "_queue", $sql, [ $qname ] );
798              
799 0 0         return $resp->{row_count} ? $resp->{row_count} : 0 ;
800             }
801              
802             # -----------------------------------------------------------------------------
803              
804              
805             sub remove_failues {
806 0     0 1   my $self = shift;
807 0           my $qname = shift;
808              
809 0           my $sql = sprintf( "WHERE process_failure = 1", $self->{prefix} );
810 0           my $resp = $self->_delete_db_record( $self->{prefix} . "_queue", $sql );
811              
812 0           return $resp->{row_count};
813             }
814              
815             # -----------------------------------------------------------------------------
816              
817              
818             sub remove_tables {
819 0     0 1   my $self = shift;
820              
821 0           my $sql = sprintf( 'DROP TABLE %s_queue;', $self->{prefix} );
822 0           $self->_debug($sql);
823 0           $self->{dbh}->do($sql);
824             }
825              
826             # -----------------------------------------------------------------------------
827              
828             1;
829              
830             __END__