File Coverage

blib/lib/App/Basis/Queue.pm
Criterion Covered Total %
statement 41 540 7.5
branch 0 204 0.0
condition 0 54 0.0
subroutine 14 64 21.8
pod 18 19 94.7
total 73 881 8.2


line stmt bran cond sub pod time code
1             # ABSTRACT: Simple database backed FIFO queues
2              
3             =head1 NAME
4              
5             App::Basis::Queue
6              
7             =head1 SYNOPSIS
8              
9             use App::Basis::Queue;
10              
11             my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
12             my $dbh = DBI->connect( $dsn, "", "",
13             { RaiseError => 1, PrintError => 0, } )
14             or die "Could not connect to DB $dsn" ;
15              
16             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
17              
18             # save some application audit data for later processing
19             $queue->add(
20             queue => '/invoice/pay',
21             data => {
22             ip => '12.12.12.12',
23             session_id => 12324324345,
24             client_id => 248296432984,
25             amount => 250.45,
26             reply => '/payments/made'
27             },
28             ) ;
29              
30             # in another process, we want to process that data
31              
32             use App::Basis::Queue;
33              
34             # for the example this will be paying an invoice
35             sub processing_callback {
36             my ( $queue, $qname, $record ) = @_;
37              
38             # call the payment system
39             # pay_money( $record->{client_id}, $record->{amount}) ;
40              
41             # chatter back that the payment has been made, assume it worked
42             $queue->pub( queue => $record->{reply},
43             data => {
44             client_id => $record->{ client_id},
45             success => 1,
46             }
47             ) ;
48             }
49              
50              
51             my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
52             my $dbh = DBI->connect( $dsn, "", "",
53             { RaiseError => 1, PrintError => 0, } )
54             or die "Could not connect to DB $dsn" ;
55             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
56             $queue->process(
57             queue => 'app_start',
58             count => 10,
59             callback => \&processing_callback
60             ) ;
61              
62             # for pubsub we do
63              
64             use App::Basis::Queue;
65              
66             my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
67             my $dbh = DBI->connect( $dsn, "", "",
68             { RaiseError => 1, PrintError => 0, } )
69             or die "Could not connect to DB $dsn" ;
70             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
71             # for a system that wants to know when servers have started
72             $queue->publish( queue => '/chat/helo', data => { host => 'abc', msg => 'helo world' } ) ;
73              
74             # in another process
75              
76             use App::Basis::Queue;
77             my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
78             my $dbh = DBI->connect( $dsn, "", "",
79             { RaiseError => 1, PrintError => 0, } )
80             or die "Could not connect to DB $dsn" ;
81             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
82              
83              
84              
85              
86             =head1 DESCRIPTION
87              
88             Why have another queuing system? Well for me I wanted a queuing system that did not mean
89             I needed to install and maintain another server (ie RabbitMQ). Something that could run
90             against existing DBs (eg PostgreSQL). PGQ was an option, but as it throws away queued items
91             if there is not a listener, then this was useless! Some of the Job/Worker systems required you to create
92             classes and plugins to process the queue. Queue::DBI almost made the grade but only has one queue. Minion
93             maybe could do what was needed but I did not find it in time.
94              
95             I need multiple queues plus, as a new requirement, queue wildcards!
96              
97             So I created this simple/basic system. You need to expire items, clean the queue and do things like that by hand,
98             there is no automation. You process items in the queue in chunks, not via a nice iterator.
99              
100             There is no queue polling per se; you need to process the queue and try again when all are done.
101             There can only be one consumer of a record, which is a good thing. If you cannot process an item it can be marked as
102             failed, to be handled by a cleanup function you will need to create.
103              
104             =head1 NOTES
105              
106             I would use msgpack instead of JSON to store the data, but processing BLOBS in PostgreSQL is tricky.
107              
108             To make the various inserts/queries work faster I cache the prepared statement handles against
109             a key and the fields that are being inserted, this speeds up the inserts roughly by 3x
110              
111             =head1 AUTHOR
112              
113             kmulholland, moodfarm@cpan.org
114              
115             =head1 VERSIONS
116              
117             v0.1 2013-08-02, initial work
118              
119             =head1 TODO
120              
121             Currently the processing functions only process the earliest MAX_PROCESS_ITEMS but
122             by making use of the counter in the info table, then we could process the entire table
123             or at least a much bigger number and do it in chunks of MAX_PROCESS_ITEMS
124              
125             Processing could be by date
126              
127             Add a method to move processed items to queue_name/processed and failures to queue_name/failures or
128             add them to these queues when marking them as processed or failed, will need a number of other methods to
129             be updated but keeps less items in the unprocessed queue
130              
131             =head1 See Also
132              
133             L, L, L
134              
135             =head1 API
136              
137             =cut
138              
139             package App::Basis::Queue;
140             $App::Basis::Queue::VERSION = '000.400.000';
141 2     2   116702 use 5.10.0;
  2         8  
142 2     2   11 use feature 'state';
  2         4  
  2         178  
143 2     2   11 use strict;
  2         8  
  2         38  
144 2     2   15 use warnings;
  2         4  
  2         54  
145 2     2   3479 use Moo;
  2         49591  
  2         15  
146 2     2   7474 use MooX::Types::MooseLike::Base qw/InstanceOf HashRef Str/;
  2         13773  
  2         180  
147 2     2   2184 use JSON;
  2         32570  
  2         11  
148 2     2   1762 use Data::UUID;
  2         1509  
  2         134  
149 2     2   13 use Try::Tiny;
  2         4  
  2         109  
150 2     2   18 use POSIX qw( strftime);
  2         3  
  2         17  
151 2     2   110 use Time::HiRes qw(gettimeofday tv_interval );
  2         4  
  2         15  
152              
153             # use Data::Printer ;
154              
155             # -----------------------------------------------------------------------------
156              
157 2     2   231 use constant MSG_TASK => 'task';
  2         3  
  2         106  
158 2     2   10 use constant MSG_CHATTER => 'chatter';
  2         3  
  2         86  
159 2     2   21 use constant MAX_PROCESS_ITEMS => 100;
  2         4  
  2         17079  
160              
161             # -----------------------------------------------------------------------------
162             ## class initialisation
163             ## instantiation variables
164             # -----------------------------------------------------------------------------
165              
166             has 'dbh' => (
167             is => 'ro',
168             isa => InstanceOf ['DBI::db']
169             );
170              
171             has 'prefix' => (
172             is => 'ro',
173             isa => Str,
174             default => sub { 'qsdb'; },
175             );
176              
177             has 'debug' => (
178             is => 'rw',
179             default => sub { 0; },
180             writer => 'set_debug'
181             );
182              
183             has 'skip_table_check' => (
184             is => 'ro',
185             default => sub { 0; },
186             );
187              
188             has 'subscriptions' => (
189             is => 'ro',
190             init_arg => 0,
191             default => sub { {} },
192             );
193              
194             # this is the number of events listened to
195             has 'ev_count' => (
196             is => 'ro',
197             init_arg => 0,
198             default => sub { {} },
199             );
200              
201             # when listening for chatter events we will wait for this many seconds
202             # before trying again
203             has 'listen_delay' => (
204             is => 'ro',
205             default => sub {1},
206             );
207              
208             # -----------------------------------------------------------------------------
209             # once the class in instanciated then we need to ensure that we have the
210             # tables created
211              
212             =head2 new
213              
214             Create a new instance of a queue
215              
216             prefix - set a prefix name of the tables, allows you to have dev/test/live versions in the same database
217             debug - set basic STDERR debugging on or off
218             skip_table_check - don't check to see if the tables need creating
219              
220             my $queue = App::Basis::Queue->new( dbh => $dbh ) ;
221              
222             =cut
223              
224             sub BUILD {
225 0     0 0   my $self = shift;
226              
227 0           $self->_set_db_type( $self->{dbh}->{Driver}->{Name} );
228 0 0         die("Valid Database connection required") if ( !$self->_db_type() );
229              
230             # if we are using sqlite then we need to set a pragma to allow
231             # cascading deletes on FOREIGN keys
232 0 0         if ( $self->_db_type() eq 'SQLite' ) {
233 0           $self->{dbh}->do("PRAGMA foreign_keys = ON");
234             }
235              
236             # ensure we have the tables created (if wanted)
237 0 0         $self->_create_tables() if ( !$self->skip_table_check );
238              
239             # get the first list of queues we have
240 0           $self->list_queues();
241             }
242              
243             # -----------------------------------------------------------------------------
244             # TODO: add a DEMOLISH method to clean up unprocessed items when the object
245             # handle goes out of scope
246              
247             # -----------------------------------------------------------------------------
248             ## class private variables
249             # -----------------------------------------------------------------------------
250              
251             has _queue_list => (
252             is => 'rwp', # like ro, but creates _set_queue_list too
253             lazy => 1,
254             default => sub { {} },
255             writer => '_set_queue_list',
256             init_arg => undef # dont allow setting in constructor ;
257             );
258              
259             has _db_type => (
260             is => 'rwp', # like ro, but creates _set_queue_list too
261             lazy => 1,
262             default => sub {''},
263             writer => '_set_db_type',
264             init_arg => undef # dont allow setting in constructor ;
265             );
266              
267             has _processor => (
268             is => 'ro',
269             lazy => 1,
270             default => sub {
271             my $hostname = `hostname`;
272             $hostname =~ s/\s//g;
273             $hostname . "::$ENV{USER}" . "::" . $$;
274             },
275             init_arg => undef # dont allow setting in constructor ;
276             );
277              
278             # -----------------------------------------------------------------------------
279             ## class private methods
280             # -----------------------------------------------------------------------------
281              
282             sub _debug {
283 0     0     my $self = shift;
284              
285 0 0         return if ( !$self->{debug} );
286              
287 0           my $msg = shift;
288 0           $msg =~ s/^/ /gsm;
289              
290 0           say STDERR $msg;
291             }
292              
293             # -----------------------------------------------------------------------------
294             sub _build_sql_stmt {
295 0     0     my ( $query, $p ) = @_;
296 0 0         our @params = $p ? @$p : ();
297 0           $query =~ s/\s+$//;
298 0 0         $query .= ' ;' if ( $query !~ /;$/ );
299              
300             # make sure we represent NULL properly, do quoting - only basic, it's only for debug
301 0           our $i = 0;
302             {
303              
304 0           sub _repl {
305 0     0     my $out = 'NULL';
306              
307             # quote strings, leave numbers untouched, not doing floats
308 0 0         if ( defined $params[$i] ) {
309 0 0         $out = $params[$i] =~ /^\d+$/ ? $params[$i] : "'$params[$i]'";
310             }
311 0           $i++;
312              
313 0           return $out;
314             }
315 0 0 0       $query =~ s/\?/_repl/gex if ( @params && scalar(@params) );
  0            
316             }
317              
318 0           return $query;
319             }
320              
321             # -----------------------------------------------------------------------------
322             sub _query_db {
323 0     0     state $sth_map = {};
324 0           my $self = shift;
325 0           my ( $query, $p, $no_results ) = @_;
326 0 0         my @params = $p ? @$p : ();
327 0           my %result;
328              
329 0           $query =~ s/\s+$//;
330 0 0         $query .= ' ;' if ( $query !~ /;$/ );
331              
332 0 0         if ( $self->{debug} ) {
333              
334 0           $self->_debug(
335             "ACTUAL QUERY: $query\nQUERY PARAMS: " . to_json( \@params ) );
336 0           my $sql = _build_sql_stmt( $query, $p );
337 0           $self->_debug( 'BUILT QUERY : ' . $sql . "\n" );
338             }
339              
340             try {
341 0     0     my $sth;
342              
343             # key based on query and fields we are using
344 0           my $key = "$query." . join( '.', @params );
345 0 0         if ( $sth_map->{$key} ) {
346 0           $sth = $sth_map->{$key};
347             }
348             else {
349              
350 0           $sth = $self->{dbh}->prepare($query);
351              
352             # save the handle for next time
353 0           $sth_map->{$key} = $sth;
354              
355             }
356 0           my $rv = $sth->execute(@params);
357 0 0         if ( !$no_results ) {
358              
359             # so as to get an array of hashes
360 0           $result{rows} = $sth->fetchall_arrayref( {} );
361 0           $result{row_count} = scalar( @{ $result{rows} } );
  0            
362 0           $result{success} = 1;
363              
364             $self->_debug(
365 0           'QUERY RESPONSE: ' . to_json( $result{rows} ) . "\n" );
366             }
367             else {
368 0 0         if ($rv) {
369 0           $result{row_count} = $sth->rows;
370 0           $result{success} = 1;
371             }
372             }
373              
374             }
375             catch {
376             $result{error}
377 0     0     = "Failed to prepare/execute query: $query\nparams: "
378             . to_json($p)
379             . "\nerror: $@\n";
380              
381             # $self->_debug( $result{error} );
382 0           };
383 0           return \%result;
384             }
385              
386             # -----------------------------------------------------------------------------
387             sub _update_db {
388 0     0     my $self = shift;
389 0           my ( $table, $query, $params ) = @_;
390              
391 0           $query = "UPDATE $table $query";
392              
393 0           my $resp = $self->_query_db( $query, $params, 1 );
394              
395 0           return $resp;
396             }
397              
398             # -----------------------------------------------------------------------------
399             # we will hold onto statement handles to speed up inserts
400              
401             sub _insert_db {
402 0     0     state $sth_map = {};
403 0           my $self = shift;
404 0           my ( $table, $f, $p ) = @_;
405 0 0         my @params = $p ? @$p : ();
406              
407             # key based on table and fields we are inserting
408 0           my $key = "$table." . join( '.', @$f );
409 0           my ( $query, $sql, $sth );
410              
411 0 0         if ( $sth_map->{$key} ) {
412 0           $sth = $sth_map->{$key};
413             }
414             else {
415             $query
416             = "INSERT INTO $table ("
417             . join( ',', @$f )
418             . ") values ("
419 0           . join( ',', map {'?'} @$f ) . ") ;";
  0            
420              
421 0           $self->_debug($query);
422 0           $sth = $self->{dbh}->prepare($query);
423              
424             # cache the handle for next time
425 0           $sth_map->{$key} = $sth;
426             }
427 0           my $rv = $sth->execute(@params);
428              
429 0           return { row_count => $rv, error => 0 };
430             }
431              
432             # -----------------------------------------------------------------------------
433              
434             sub _delete_db_record {
435 0     0     my $self = shift;
436 0           my ( $table, $q, $v ) = @_;
437 0           my $query = "DELETE FROM $table $q ;";
438              
439             # run the delete and don't fetch results
440 0           my $resp = $self->_query_db( $query, $v, 1 );
441 0           return $resp;
442             }
443              
444             # -----------------------------------------------------------------------------
445             # as all the indexes are constructed the same, let's have a helper
446             sub _create_index_str {
447 0     0     my ( $table, $field ) = @_;
448              
449 0           return sprintf( "CREATE INDEX %s_%s_idx on %s(%s) ;",
450             $table, $field, $table, $field );
451             }
452              
453             # -----------------------------------------------------------------------------
454             sub _create_sqlite_table {
455 0     0     my $self = shift;
456 0           my ($table) = @_;
457 0           $self->_debug("Creating SQLite tables");
458              
459             # set WAL mode rather than the default DELETE as its faster
460 0     0     try { $self->{dbh}->do("PRAGMA journal mode = WAL;"); } catch {};
  0            
461              
462 0           my $sql = "CREATE TABLE $table (
463             counter INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
464             id VARCHAR(128) NOT NULL UNIQUE,
465             queue_name VARCHAR(128) NOT NULL,
466             msg_type VARCHAR(8),
467             persist BOOLEAN DEFAULT 0,
468             added TIMESTAMP DEFAULT current_timestamp,
469             processed BOOLEAN DEFAULT 0,
470             processor VARCHAR(128),
471             process_start TIMESTAMP,
472             processing_time FLOAT,
473             process_failure SMALLINT DEFAULT 0,
474             data TEXT ) ;";
475              
476 0           $self->_debug($sql);
477 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
478              
479             }
480              
481             # -----------------------------------------------------------------------------
482             sub _create_postgres_table {
483 0     0     my $self = shift;
484 0           my ($table) = @_;
485 0           $self->_debug("Creating PostgreSQL tables");
486              
487             # big/serial creates an auto incrementing column in PostgreSQL
488 0           my $sql = "CREATE TABLE $table (
489             counter BIGSERIAL PRIMARY KEY UNIQUE,
490             id VARCHAR(128) NOT NULL UNIQUE,
491             queue_name VARCHAR(128) NOT NULL,
492             msg_type VARCHAR(8),
493             persist BOOLEAN DEFAULT 0,
494             added TIMESTAMP WITH TIME ZONE DEFAULT now(),
495             processed SMALLINT DEFAULT 0,
496             processor VARCHAR(128),
497             process_start TIMESTAMP,
498             processing_time FLOAT,
499             process_failure SMALLINT DEFAULT 0,
500             data TEXT ) ;";
501              
502 0           $self->_debug($sql);
503 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
504             }
505              
506             # -----------------------------------------------------------------------------
507             sub _create_mysql_table {
508 0     0     my $self = shift;
509 0           my ($table) = @_;
510 0           $self->_debug("Creating MySQL tables");
511              
512 0           my $sql = "CREATE TABLE $table (
513             counter INT NOT NULL PRIMARY KEY AUTO_INCREMENT UNIQUE,
514             id VARCHAR(128) NOT NULL UNIQUE,
515             queue_name VARCHAR(128) NOT NULL,
516             msg_type VARCHAR(8),
517             persist BOOLEAN DEFAULT 0,
518             added TIMESTAMP DEFAULT current_timestamp,
519             processed SMALLINT DEFAULT 0,
520             processor VARCHAR(128),
521             process_start TIMESTAMP,
522             processing_time FLOAT,
523             process_failure SMALLINT DEFAULT 0,
524             data TEXT ) ;";
525              
526 0           $self->_debug($sql);
527 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
528              
529             }
530              
531             # -----------------------------------------------------------------------------
532             # create all the tables and indexes
533             sub _create_tables {
534 0     0     my $self = shift;
535 0           my $sql;
536 0           my $table = $self->{prefix} . '_queue';
537              
538             # as the checking for tables and indexes is fraught with issues
539             # over multiple databases its easier to not print the errors and
540             # catch the creation failures and ignore them!
541 0           my $p = $self->{dbh}->{PrintError};
542 0           $self->{dbh}->{PrintError} = 0;
543              
544             # I am assuming either table does not exist then nor does the
545             # other and we should create both
546 0 0         if ( $self->_db_type() eq 'SQLite' ) {
    0          
    0          
547 0           $self->_create_sqlite_table($table);
548             }
549             elsif ( $self->_db_type() eq 'Pg' ) {
550 0           $self->_create_postgres_table($table);
551             }
552             elsif ( $self->_db_type() eq 'mysql' ) {
553 0           $self->_create_mysql_table($table);
554             }
555             else {
556 0           die "Unhandled database type " . $self->_db_type();
557             }
558              
559 0           foreach my $field (
560             qw/counter id added queue_name msg_type persist processed process_failure/
561             )
562             {
563 0           my $sql = _create_index_str( $table, $field );
564              
565 0           $self->_debug($sql);
566 0     0     try { $self->{dbh}->do($sql); } catch {};
  0            
567             }
568              
569             # restore the PrintError setting
570 0           $self->{dbh}->{PrintError} = $p;
571             }
572              
573             # -----------------------------------------------------------------------------
574             # _add
575             # Add some data into a named queue. Could be a task or a chatter mesg
576             # * This does not handle wildcard queues *
577              
578             sub _add {
579 0     0     state $uuid = Data::UUID->new();
580 0           my $self = shift;
581 0 0         my $params = @_ % 2 ? shift : {@_};
582              
583 0 0         if ( ref($params) ne 'HASH' ) {
584 0           warn "_add accepts a hash or a hashref of parameters";
585 0           return 0;
586             }
587              
588             # to keep what was here before the change to the parameters
589 0           my $qname = $params->{queue};
590 0           my $msg_type = $params->{type};
591 0           my $persist = $params->{persist};
592 0           my $data = $params->{data};
593              
594 0 0         if ( ref($data) ne 'HASH' ) {
595 0           warn "_add data parameter must be a hashref";
596 0           return 0;
597             }
598 0           my $status = 0;
599 0           my $resp;
600 0 0 0       if ( !$qname || !$data ) {
601 0           my $err = "Missing queue name or data";
602 0           $self->_debug($err);
603 0           warn $err;
604 0           return $status;
605             }
606 0 0         if ( $qname =~ /\*/ ) {
607 0           my $err = "Bad queue name, cannot contain '*'";
608 0           $self->_debug($err);
609 0           warn $err;
610 0           return $status;
611             }
612              
613             try {
614 0     0     my $json_str = encode_json($data);
615              
616             # we manage the id's for the queue entries as we cannot depend
617             # on a common SQL method of adding a record and getting its uniq ID back
618              
619 0           my $message_id = $uuid->create_b64();
620             $resp = $self->_insert_db(
621 0           $self->{prefix} . '_queue',
622             [qw(id queue_name added data msg_type persist)],
623             [ $message_id, $qname,
624             strftime( "%Y-%m-%d %H:%M:%S", localtime() ),
625             $json_str, $msg_type, $persist
626             ]
627             );
628              
629 0 0         $status = $message_id if ( !$resp->{error} );
630             }
631             catch {
632 0     0     my $e = $@;
633 0           warn $e;
634 0           };
635              
636 0           return $status;
637             }
638              
639             # -----------------------------------------------------------------------------
640              
641             =head2 add
642              
643             Add task data into a named queue. This creates a 'task' that needs to be processed.
644              
645             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
646              
647             # save some application audit data
648             $queue->add(
649             queue => 'app_start',
650             data => {
651             ip => '12.12.12.12', session_id => 12324324345, client_id => 248296432984,
652             appid => 2, app_name => 'twitter'
653             },
654             ) ;
655              
656             * This does not handle wildcard queues *
657              
658             =head3 queue
659              
660             name of the queue
661              
662             =head3 data
663              
664             data to store against the queue, must be a hashref
665              
666             =cut
667              
668             sub add {
669 0     0 1   my $self = shift;
670 0 0         my $params = @_ % 2 ? shift : {@_};
671              
672 0 0         if ( ref($params) ne 'HASH' ) {
673 0           warn "add accepts a hash or a hashref of parameters";
674 0           return 0;
675             }
676 0           $params->{type} = MSG_TASK;
677 0           $params->{persist} = 0;
678              
679 0           return $self->_add($params);
680             }
681              
682             # -----------------------------------------------------------------------------
683             # try and find a match for the qname, replace SQL wildcard with perl ones
684              
685             sub _valid_qname {
686 0     0     my $self = shift;
687 0           my ($qname) = @_;
688              
689             # update queue list
690 0           $self->list_queues();
691              
692 0           $qname =~ s/%/*/g;
693 0 0         my $wild = ( $qname =~ /\*/ ) ? 1 : 0;
694              
695 0           my $match = 0;
696 0           foreach my $q ( keys %{ $self->{_queue_list} } ) {
  0            
697 0 0 0       if ( ( $wild && $q =~ $qname ) || $self->{_queue_list}->{$qname} ) {
      0        
698 0           $match++;
699 0           last;
700             }
701             }
702              
703 0           return $match;
704             }
705              
706             # -----------------------------------------------------------------------------
707              
708             =head2 process
709              
710             process up to 100 tasks from the named queue(s)
711              
712             a reference to the queue object is passed to the callback along with the name of
713             the queue and the record that is to be processed.
714              
715             If the callback returns a non-zero value then the record will be marked as processed.
716             If the callback returns a zero value, then the processing is assumed to have failed
717             and the failure count will be incremented by 1. If the failure count matches our
718             maximum allowed limit then the item will not be available for any further processing.
719              
720             sub processing_callback {
721             my ( $queue, $qname, $record ) = @_;
722              
723             return 1;
724             }
725              
726             $queue->process(
727             queue => 'queue_name',
728             count => 5,
729             callback => \&processing_callback
730             ) ;
731              
732             qname can contain wildcards and all matching queues will be scanned
733              
734             # add things to different queues, but with a common root
735             $queue->add( queue => '/celestial/stars', data => { list => [ "sun", "alpha centauri"]}) ;
736             $queue->add( queue => '/celestial/planets', data => { list => [ "earth", "pluto", "mars"]}) ;
737              
738             # process all the 'celestial' bodies queues
739             $queue->process( queue => '/celestial/*', count => 5, callback => \&processing_callback) ;
740              
741             =cut
742              
# Claim up to `count` unprocessed task records from the matching queue(s)
# and hand each one to the callback.
#
# Parameters (hash or hashref):
#   queue    - queue name, '*' acts as a wildcard
#   count    - maximum number of records to handle, defaults to 1 and is
#              capped at MAX_PROCESS_ITEMS
#   callback - code ref called as callback( $self, $qname, $record ), where
#              $qname is the queue pattern after '*' has become '%'.
#              A true return marks the record processed, a false return
#              marks it failed, an exception releases the record again.
#
# Returns the number of records marked as processed.
# Dies when no callback code ref is supplied.
sub process {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "process accepts a hash or a hashref of parameters";
        return 0;
    }

    my $processed_count = 0;
    my $qname           = $params->{queue};

    # if the queue does not exist
    return 0 if ( !$self->_valid_qname($qname) );

    # switch to SQL wildcard
    $qname =~ s/\*/%/g;

    $params->{count} ||= 1;
    die __PACKAGE__ . " process requires a callback function"
        if ( !$params->{callback} || ref( $params->{callback} ) ne 'CODE' );

    if ( $params->{count} > MAX_PROCESS_ITEMS ) {
        warn "Reducing process count from $params->{count} to "
            . MAX_PROCESS_ITEMS;
        $params->{count} = MAX_PROCESS_ITEMS;
    }

    my $table = $self->{prefix} . "_queue";

    # get list of IDs we can process, as SQLite has an issue
    # with ORDER BY and LIMIT in an UPDATE call we have to do things
    # in 2 stages: pick the candidate ids, then stamp them as ours
    my $sql = sprintf(
        "SELECT id FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND process_failure = 0
            AND msg_type = ?
            ORDER BY added ASC
            LIMIT ?;", $self->{prefix}
    );
    my $candidates
        = $self->_query_db( $sql, [ $qname, MSG_TASK, $params->{count} ] );
    my @ids = map { $_->{id} } @{ $candidates->{rows} || [] };

    # if there are no items to update, return
    return $processed_count if ( !@ids );

    # mark items that I am going to process, ids are bound as parameters
    # rather than being interpolated into the SQL
    my $placeholders = join( ',', ('?') x @ids );
    my $update       = "SET processor=?
        WHERE id IN ( $placeholders ) AND processed = 0 ;";
    my $resp = $self->_update_db( $table, $update,
        [ $self->_processor(), @ids ] );
    return $processed_count if ( !$resp->{row_count} );

    # refetch the list to find out which ones we are going to process,
    # in case another system was doing things at the same time
    $sql = sprintf(
        "SELECT * FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND processor = ?
            AND process_failure = 0
            AND msg_type = ?
            ORDER BY added ASC
            LIMIT ?;", $self->{prefix}
    );
    my $claimed = $self->_query_db( $sql,
        [ $qname, $self->_processor(), MSG_TASK, $params->{count} ] );

    foreach my $row ( @{ $claimed->{rows} || [] } ) {

        # unpack the data
        $row->{data} = decode_json( $row->{data} );
        my $state   = 0;
        my $invalid = 0;
        my $start   = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
        my $st      = [gettimeofday];
        try {
            $state = $params->{callback}->( $self, $qname, $row );
        }
        catch {
            # inside a Try::Tiny catch block the error is in $_, not $@
            warn "process: error in callback $_";
            $invalid++;
        };
        my $elapsed = tv_interval($st);

        if ($invalid) {

            # if the callback was invalid then we should not mark this
            # as a process failure, just clear the processor
            $self->_update_db( $table,
                "SET processor=? WHERE id = ? AND processed = 0 ;",
                [ '', $row->{id} ] );
        }
        elsif ($state) {

            # show we have processed it
            $self->_update_db(
                $table,
                "SET processed=1, process_start=?, processing_time=? WHERE id = ? AND processed = 0 ;",
                [ $start, $elapsed, $row->{id} ]
            );
            $processed_count++;
        }
        else {
            # mark the failure
            $self->_update_db(
                $table,
                "SET process_failure=1, processing_time=? WHERE id = ? AND processed = 0 ;",
                [ $elapsed, $row->{id} ]
            );
        }
    }

    return $processed_count;
}
863              
864             # -----------------------------------------------------------------------------
865              
866             =head2 process_failures
867              
868             process up to 100 tasks from the queue
869             a reference to the queue object is passed to the callback along with the name of the queue
870             and the record that is to be processed. As these are failures we are not interested
871             in the return value of the callback function.
872              
873             sub processing_failure_callback {
874             my ( $queue, $qname, $record ) = @_;
875              
876             # items before 2013 were completely wrong so we can delete
877             if( $record->{added} < '2013-01-01') {
878             $queue->delete_record( $record) ;
879             } else {
880             # failures in 2013 was down to a bad processing function
881             $queue->reset_record( $record) ;
882             }
883             }
884              
885             $queue->process_failures(
886             queue => 'queue_name',
887             count => 5,
888             callback => \&processing_failure_callback
889             ) ;
890              
891             # again we can use wildcards here for queue names
892              
893             # add things to different queues, but with a common root
894             $queue->add( queue => '/celestial/stars', data => { list => [ "sun", "alpha centauri"]}) ;
895             $queue->add( queue => '/celestial/planets', data => { list => [ "moon", "pluto", "mars"]}) ;
896             # process, obviously 'moon' will fail our planet processing
897             $queue->process(
898             queue => 'queue_name',
899             count => 5,
900             callback => \&processing_callback
901             ) ;
902              
903             # process all the 'celestial' bodies queues for failures - probably will just have the moon in it
904             $queue->process_failures(
905             queue => '/celestial/*',
906             count => 5,
907             callback => \&processing_failure_callback
908             ) ;
909              
910             =cut
911              
# Hand up to `count` failed task records from the matching queue(s) to the
# callback so it can delete or reset them.
#
# Parameters (hash or hashref):
#   queue    - queue name, '*' acts as a wildcard
#   count    - maximum number of records to handle, defaults to 1 and is
#              capped at MAX_PROCESS_ITEMS
#   callback - code ref called as callback( $self, $qname, $record ), where
#              $qname is the queue pattern after '*' has become '%'.
#              Its return value is ignored.
#
# Returns the number of records whose callback ran without throwing.
# Dies when no callback code ref is supplied.
sub process_failures {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "process_failures accepts a hash or a hashref of parameters";
        return 0;
    }

    my $processed_count = 0;
    my $qname           = $params->{queue};

    # if the queue does not exist; checked before the wildcard switch,
    # the same order as process()
    return 0 if ( !$self->_valid_qname($qname) );

    # switch to SQL wildcard
    $qname =~ s/\*/%/g;

    $params->{count} ||= 1;
    die __PACKAGE__ . " process_failures requires a callback function"
        if ( !$params->{callback} || ref( $params->{callback} ) ne 'CODE' );

    if ( $params->{count} > MAX_PROCESS_ITEMS ) {
        warn "Reducing process_failures count from $params->{count} to "
            . MAX_PROCESS_ITEMS;
        $params->{count} = MAX_PROCESS_ITEMS;
    }

    # get list of IDs we can process
    my $sql = sprintf(
        "SELECT id FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND process_failure = 1
            AND msg_type = ?
            ORDER BY added ASC
            LIMIT ?;", $self->{prefix}
    );
    my $candidates
        = $self->_query_db( $sql, [ $qname, MSG_TASK, $params->{count} ] );
    my @ids = map { $_->{id} } @{ $candidates->{rows} || [] };

    # if there are no items to update, return
    return $processed_count if ( !@ids );

    # mark items that I am going to process, ids are bound as parameters
    # rather than being interpolated into the SQL
    my $placeholders = join( ',', ('?') x @ids );
    my $update       = "SET processor=?
        WHERE id IN ( $placeholders ) AND processed = 0 ;";
    my $resp = $self->_update_db( $self->{prefix} . "_queue",
        $update, [ $self->_processor(), @ids ] );
    return $processed_count if ( !$resp->{row_count} );

    # refetch the list to find out which ones we are going to process,
    # in case another system was doing things at the same time
    $sql = sprintf(
        "SELECT * FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND processor = ?
            AND process_failure = 1
            AND msg_type = ?
            ORDER BY added ASC
            LIMIT ?;", $self->{prefix}
    );
    my $claimed = $self->_query_db( $sql,
        [ $qname, $self->_processor(), MSG_TASK, $params->{count} ] );

    foreach my $row ( @{ $claimed->{rows} || [] } ) {

        # unpack the data
        $row->{data} = decode_json( $row->{data} );

        # we don't do anything else with the record, we assume that the
        # callback function will have done something like delete it or
        # re-write it; we only count the callbacks that completed
        try {
            $params->{callback}->( $self, $qname, $row );
            $processed_count++;
        }
        catch {
            # inside a Try::Tiny catch block the error is in $_, not $@
            warn "process_failures: error in callback $_";
        };
    }

    return $processed_count;
}
1002              
1003             # -----------------------------------------------------------------------------
1004              
1005             =head2 queue_size
1006              
1007             get the count of unprocessed TASK items in the queue
1008              
1009             my $count = $queue->queue_size( queue => 'queue_name') ;
1010             say "there are $count unprocessed items in the queue" ;
1011              
1012             # queue size can manage wildcards
1013             $queue->queue_size( queue => '/celestial/*') ;
1014              
1015             =cut
1016              
# Count the unprocessed, non-failed task records in the queue(s) matching
# `queue` ('*' acts as a wildcard). Accepts a hash or a hashref of
# parameters and returns the count, or 0 when the parameters are unusable
# or the query returns no rows.
sub queue_size {
    my ( $self, @args ) = @_;
    my $params = ( @args % 2 ) ? $args[0] : {@args};

    unless ( ref($params) eq 'HASH' ) {
        warn "queue_size accepts a hash or a hashref of parameters";
        return 0;
    }

    # the LIKE pattern uses the SQL wildcard in place of '*'
    my $pattern = $params->{queue};
    $pattern =~ s/\*/%/g;

    my $count_sql = sprintf(
        "SELECT count(*) as count FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND process_failure = 0
            AND msg_type = ? ;", $self->{prefix}
    );
    my $found = $self->_query_db( $count_sql, [ $pattern, MSG_TASK ] );

    if ( $found->{row_count} ) {
        return $found->{rows}->[0]->{count};
    }
    return 0;
}
1041              
1042             # -----------------------------------------------------------------------------
1043              
1044             =head2 list_queues
1045              
1046             obtains a list of all the queues used by this database
1047              
1048             my $qlist = $queue->list_queues() ;
1049             foreach my $q (@$qlist) {
1050             say $q ;
1051             }
1052              
1053             =cut
1054              
# Fetch the distinct queue names from the queue table, hand them to
# _set_queue_list as a { name => 1 } hashref and return them as an
# arrayref. A query error yields an empty list.
sub list_queues {
    my ($self) = @_;

    my $query = sprintf( 'SELECT DISTINCT queue_name FROM %s_queue;',
        $self->{prefix} );
    my $result = $self->_query_db($query);

    my %ques;
    unless ( $result->{error} ) {
        my @names = map { $_->{queue_name} } @{ $result->{rows} };
        @ques{@names} = (1) x @names;
    }

    $self->_set_queue_list( \%ques );

    return [ keys %ques ];
}
1072              
1073             # -----------------------------------------------------------------------------
1074              
1075             =head2 stats
1076              
1077             obtains stats about the task data in the queue, this may be time/processor intensive
1078             so use with care!
1079              
1080             provides counts of unprocessed, processed, failures
1081             max_process_failure, avg_process_failure, earliest_added, latest_added,
1082             min_data_size, max_data_size, avg_data_size, total_records
1083             avg_elapsed, max_elapsed, min_elapsed
1084              
1085             my $stats = $queue->stats( queue => 'queue_name') ;
1086             say "processed $stats->{processed}, failures $stats->{failures}, unprocessed $stats->{unprocessed}" ;
1087              
1088             # for all matching wildcard queues
1089             my $all_stats = $queue->stats( queue => '/celestial/*') ;
1090              
1091             =cut
1092              
# Gather statistics about the task records in the queue(s) matching `queue`
# ('*' or '%' acts as a wildcard).
#
# Accepts a hash or a hashref of parameters. Returns a hashref holding the
# unprocessed / processed / failures counts, total_records, and the min,
# max and avg of process_failure, data size and processing time, plus
# earliest_added and latest_added. Entries with no value are set to '0'.
# Returns an empty hashref when the parameters are unusable.
sub stats {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "stats accepts a hash or a hashref of parameters";
        return {};
    }
    my $qname = $params->{queue} // '';
    my %all_stats = ();

    # update queue list
    $self->list_queues();

    # the aggregate query needs the SQL wildcard ...
    ( my $like = $qname ) =~ s/\*/%/g;

    # ... while individual queue names are matched against an anchored
    # regex in which only the wildcard characters are special
    my $name_match
        = join( '.*', map { quotemeta } split( /[*%]/, $qname, -1 ) );
    my $qname_re = qr/\A$name_match\z/s;

    # work through all the queues and only count those that match our qname
    foreach my $q ( keys %{ $self->{_queue_list} } ) {
        next if ( !$self->_valid_qname($q) );
        next if ( $q !~ $qname_re );

        $all_stats{unprocessed} += $self->queue_size( queue => $q );

        my $sql = sprintf(
            "SELECT count(*) as count
                FROM %s_queue
                WHERE queue_name = ?
                AND msg_type = ?
                AND processed = 1 ;", $self->{prefix}
        );
        my $resp = $self->_query_db( $sql, [ $q, MSG_TASK ] );
        $all_stats{processed} += $resp->{rows}->[0]->{count} || 0;

        $sql = sprintf(
            "SELECT count(*) as count FROM %s_queue
                WHERE queue_name = ?
                AND processed = 0
                AND msg_type = ?
                AND process_failure = 1 ;", $self->{prefix}
        );
        $resp = $self->_query_db( $sql, [ $q, MSG_TASK ] );
        $all_stats{failures} += $resp->{rows}->[0]->{count} || 0;
    }

    # get all the stats for all matching queues
    my $sql = sprintf(
        "SELECT
            min(process_failure) as min_process_failure,
            max(process_failure) as max_process_failure,
            avg(process_failure) as avg_process_failure,
            min(added) as earliest_added,
            max(added) as latest_added,
            min( length(data)) as min_data_size,
            max( length(data)) as max_data_size,
            avg( length(data)) as avg_data_size,
            min( processing_time) as min_elapsed,
            max( processing_time) as max_elapsed,
            avg( processing_time) as avg_elapsed
            FROM %s_queue
            WHERE queue_name LIKE ?
            AND msg_type = ? ;", $self->{prefix}
    );
    my $resp = $self->_query_db( $sql, [ $like, MSG_TASK ] );

    foreach my $k ( keys %{ $resp->{rows}->[0] } ) {
        if ( $k =~ /_added/ ) {

            # datetime values are copied, adding them makes no sense
            $all_stats{$k} = $resp->{rows}->[0]->{$k};
        }
        else {
            $all_stats{$k} += $resp->{rows}->[0]->{$k} || "0";
        }
    }

    # number of records in the table
    $all_stats{total_records}
        = ( $all_stats{processed} // 0 )
        + ( $all_stats{unprocessed} // 0 )
        + ( $all_stats{failures} // 0 );
    $all_stats{total_records} ||= '0';

    # make sure these things have a zero value so calculations don't fail
    foreach my $f (
        qw( unprocessed processed failures
        min_process_failure max_process_failure avg_process_failure
        earliest_added latest_added
        min_data_size max_data_size avg_data_size
        min_elapsed max_elapsed avg_elapsed
        total_records )
        )
    {
        $all_stats{$f} ||= '0';
    }

    return \%all_stats;
}
1188              
1189             # -----------------------------------------------------------------------------
1190              
1191             =head2 delete_record
1192              
1193             delete a single task record from the queue
1194             requires a data record which contains information we will use to determine the record
1195              
1196             may be used in processing callback functions
1197              
1198             sub processing_callback {
1199             my ( $queue, $qname, $record ) = @_;
1200              
1201             # lets remove records before 2013
1202             if( $record->{added} < '2013-01-01') {
1203             $queue->delete_record( $record) ;
1204             }
1205             return 1 ;
1206             }
1207              
1208             * This does not handle wildcard queues *
1209              
1210             =cut
1211              
# Delete one task record, identified by the id and queue_name fields of the
# record hashref passed in. Returns the row_count reported by
# _delete_db_record.
sub delete_record {
    my ( $self, $data ) = @_;

    my $table = $self->{prefix} . "_queue";
    my @bind  = ( $data->{id}, $data->{queue_name}, MSG_TASK );
    my $where = "WHERE id = ?
        AND queue_name = ?
        AND msg_type = ?";

    my $deleted = $self->_delete_db_record( $table, $where, \@bind );

    return $deleted->{row_count};
}
1224              
1225             # -----------------------------------------------------------------------------
1226              
1227             =head2 reset_record
1228              
1229             clear failure flag from a failed task record
1230             requires a data record which contains information we will use to determine the record
1231              
1232             may be used in processing callback functions
1233              
1234             sub processing_callback {
1235             my ( $queue, $qname, $record ) = @_;
1236              
1237             # allow partially failed (and failed) records to be processed
1238             if( $record->{process_failure}) {
1239             $queue->reset_record( $record) ;
1240             }
1241             return 1 ;
1242             }
1243              
1244             * This does not handle wildcard queues *
1245              
1246             =cut
1247              
# Clear the failure flag on one unprocessed, failed task record, identified
# by the id and queue_name fields of the record hashref passed in.
# Returns the row_count reported by _update_db.
sub reset_record {
    my ( $self, $data ) = @_;

    my $table = $self->{prefix} . "_queue";
    my @bind  = ( $data->{id}, $data->{queue_name}, MSG_TASK );
    my $reset = "SET process_failure=0
        WHERE id = ?
        AND queue_name = ?
        AND processed=0
        AND process_failure > 0
        AND msg_type = ?";

    my $updated = $self->_update_db( $table, $reset, \@bind );

    return $updated->{row_count};
}
1263              
1264             # -----------------------------------------------------------------------------
1265              
1266             =head2 publish
1267              
1268             Publish some chatter data into a named queue.
1269              
1270             arguments
1271              
1272             queue - the name of the queue to publish a chatter to
1273             data - hashref of data to be stored
1274              
1275             optional arguments
1276              
1277             persist - 0|1 flag that this message is to be the most recent persistent one
1278              
1279             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
1280              
1281             # keep track of a bit of info
1282             $queue->publish( queue => 'app_log',
1283             data => {
1284             ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
1285             appid => 2, app_name => 'twitter'
1286             }
1287             ) ;
1288              
1289             * This does not handle wildcard queues *
1290              
1291             =cut
1292              
# Publish a chatter message: tags the parameters as MSG_CHATTER, normalises
# the persist flag and passes them to _add.
#
# Accepts a hash or a hashref of parameters (queue, data, optional persist).
# Returns whatever _add returns, or 0 when the parameters are unusable.
sub publish {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "publish accepts a hash or a hashref of parameters";
        return 0;
    }
    $params->{type} = MSG_CHATTER;

    # make sure this is a zero or one value; test truth rather than
    # definedness so that an explicit persist => 0 stays off
    $params->{persist} = $params->{persist} ? 1 : 0;

    return $self->_add($params);
}
1308              
1309             # -----------------------------------------------------------------------------
1310             # find the most recent persistent item
1311             # queue is the only parameter, returns arrayref of items
1312              
# Find the most recent persistent chatter item for each queue matching
# `queue` ('*' acts as a wildcard).
# Accepts a hash or a hashref of parameters; returns an arrayref of records
# with their data field JSON-decoded, empty when nothing matches.
sub _recent_persist {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "_recent_persist accepts a hash or a hashref of parameters";
        return [];
    }
    my $qname = $params->{queue};
    my @data;

    # if the queue does not exist
    return [] if ( !$self->_valid_qname($qname) );

    # switch to SQL wildcard
    $qname =~ s/\*/%/g;

    # find the most recent persistent items for each matching queue: keep a
    # row only when no newer persistent row exists for the same queue.
    # NOTE(review): GROUP BY is kept so that rows sharing the same 'added'
    # value still collapse to one per queue, as before - confirm this is
    # acceptable on the databases in use
    my $sql = sprintf(
        "SELECT * FROM %s_queue a
            WHERE a.queue_name LIKE ?
            AND a.msg_type = ?
            AND a.persist = ?
            AND NOT EXISTS ( SELECT 1 FROM %s_queue b
                WHERE b.queue_name = a.queue_name
                AND b.msg_type = a.msg_type
                AND b.persist = a.persist
                AND b.added > a.added
                )
            GROUP BY queue_name
            ORDER BY queue_name;", $self->{prefix}, $self->{prefix}
    );

    # there should only be one persist item
    my $result = $self->_query_db( $sql, [ $qname, MSG_CHATTER, 1 ] );

    foreach my $row ( @{ $result->{rows} } ) {
        $row->{data} = decode_json( $row->{data} );    # unpack the data
        push @data, $row;
    }

    return \@data;
}
1356              
1357             # -----------------------------------------------------------------------------
# get chatter data for the queue(s) matching `queue`, ordered by counter
# parameters (hash or hashref):
#   queue   - queue name, '*' acts as a wildcard
#   counter - when true, fetch the items with a counter above this value
#   after   - otherwise, fetch the items added at or after this value
# returns arrayref of items with their data field JSON-decoded

sub _recent_chatter {
    my $self = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "_recent_chatter accepts a hash or a hashref of parameters";
        return [];
    }

    my $qname = $params->{queue};
    my @data;

    # if the queue does not exist
    return [] if ( !$self->_valid_qname($qname) );

    # switch to SQL wildcard
    $qname =~ s/\*/%/g;

    my $result;
    my $sql;

    if ( $params->{counter} ) {

        # every item newer than the counter is wanted, so there is no
        # GROUP BY here, it would reduce each queue to a single row
        $sql = sprintf(
            "SELECT * FROM %s_queue
                WHERE queue_name LIKE ?
                AND msg_type = ?
                AND counter > ?
                ORDER BY counter;", $self->{prefix}
        );

        $result = $self->_query_db( $sql,
            [ $qname, MSG_CHATTER, $params->{counter} ] );

    }
    else {
        # check by date
        $sql = sprintf(
            "SELECT * FROM %s_queue
                WHERE queue_name LIKE ?
                AND msg_type = ?
                AND added >= ?
                ORDER BY counter;", $self->{prefix}
        );

        $result = $self->_query_db( $sql,
            [ $qname, MSG_CHATTER, $params->{after} // 0 ] );
    }

    foreach my $row ( @{ $result->{rows} } ) {
        $row->{data} = decode_json( $row->{data} );    # unpack the data
        push @data, $row;
    }

    return \@data;
}
1418              
1419             # -----------------------------------------------------------------------------
1420              
1421             =head2 subscribe
1422              
1423             Subscribe to a named queue with a callback.
1424              
1425             arguments
1426              
1427             queue - the name of the queue to listen to, wildcards allowed
1428             callback - function to handle any matched events
1429              
1430             optional arguments
1431              
1432             after - unix time after which to listen for events, defaults to now,
1433             if set will skip persistent item checks
1434             persist - include the most recent persistent item, if using a wild card, this
1435             will match all the queues and could find multiple persistent items
1436              
1437             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
1438              
1439             # keep track of a bit of info
1440             $queue->subscribe( queue => 'app_logs/*', callback => \&handler) ;
1441             $queue->listen() ;
1442              
1443             =cut
1444              
# Register a callback for chatter on a queue (wildcards allowed).
#
# Accepts a hash or a hashref of parameters: queue and callback are
# required, after and persist are optional. `after` may be a unix time or a
# 'YYYY-MM-DD HH:MM:SS' string, anything else is replaced by the current
# time with a warning; supplying a true `after` switches persist off.
# Warns and returns 0 when the parameters are unusable, otherwise returns
# the stored subscription hashref.
sub subscribe {
    my ( $self, @args ) = @_;
    my $params = ( @args % 2 ) ? $args[0] : {@args};

    unless ( ref($params) eq 'HASH' ) {
        warn "subscribe accepts a hash or a hashref of parameters";
        return 0;
    }

    unless ( $params->{queue} ) {
        warn "subscribe needs a queue name to listen to";
        return 0;
    }

    unless ( ref( $params->{callback} ) eq 'CODE' ) {
        warn "subscribe needs a callback handler to send events to";
        return 0;
    }

    # we cannot get recent persist item if they want to check after a date
    $params->{persist} = 0 if ( $params->{after} );

    # normalise the start point into a datetime string
    my $dt_format = "%Y-%m-%d %H:%M:%S";
    my $after     = $params->{after};
    if ( !defined $after ) {
        $after = strftime( $dt_format, localtime() );
    }
    elsif ( $after =~ /^\d+$/ ) {
        $after = strftime( $dt_format, localtime($after) );
    }
    elsif ( $after !~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/ ) {
        warn(
            "this does not look like a datetime value I can use: '$after'"
        );
        $after = strftime( $dt_format, localtime() );
    }
    $params->{after} = $after;

    # add to our current subscriptions
    my %subscription = (
        callback => $params->{callback},
        after    => $params->{after},      # when do we want events from
        persist  => $params->{persist},
        ev_count => 0,
        counter  => 0,
    );

    return $self->{subscriptions}->{ $params->{queue} } = \%subscription;
}
1496              
1497             # -----------------------------------------------------------------------------
1498              
1499             =head2 listen
1500              
1501             Listen to all subscribed channels. Loops forever unless told to stop.
1502             If there are any persistent messages, this will be passed to the callbacks first.
1503              
1504             optional arguments
1505              
1506             events - minimum number of events to listen for, stop after this many,
1507             may stop after more - this is across ALL the subscriptions
1508             datetime - unix epoch time when to stop listening, ie based on time()
1509              
1510             returns
1511             number of chatter events actually passed to ALL the handlers
1512              
1513             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
1514             $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
1515             $queue->listen() ; # listening forever
1516              
1517             =cut
1518              
# listen - poll every subscribed queue and pass matching records to the
# subscription callbacks.
#
# Accepts a hash or a hashref of optional parameters:
#   events   - stop once at least this many events have been delivered across
#              ALL subscriptions; a polling pass is always completed, so more
#              than this may be delivered
#   datetime - unix epoch time (as returned by time()) after which to stop
#
# Returns the total number of events passed to the callbacks. Returns 0, with
# a warning, if the parameters are not a hash or nothing has been subscribed.
#
# Callbacks are invoked as $callback->( $self, $qmatch, $row ).
sub listen {
    my $self   = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "listen accepts a hash or a hashref of parameters";
        return 0;
    }

    if ( !keys %{ $self->{subscriptions} } ) {
        warn "you have not subscribed to any queues";
        return 0;
    }

    $self->{ev_count} = 0;

    # reset the per-subscription bookkeeping before we listen
    foreach my $subs ( values %{ $self->{subscriptions} } ) {
        $subs->{counter}  = 0;
        $subs->{ev_count} = 0;
    }

    # the first pass only replays persistent records (for subscriptions that
    # asked for them), every later pass polls for new chatter
    my $started = 0;
    while (1) {

        foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
            my $subs = $self->{subscriptions}->{$qmatch};

            # a callback earlier in this pass may have unsubscribed this queue
            next if ( !$subs );

            # we may not want the most recent persistent record
            next if ( !$started && !$subs->{persist} );

            my $items;
            if ( !$started ) {
                $items = $self->_recent_persist( queue => $qmatch );
            }
            else {
                $items = $self->_recent_chatter(
                    queue   => $qmatch,
                    after   => $subs->{after},
                    counter => $subs->{counter},
                );
            }

            foreach my $row ( @{ $items || [] } ) {

                $subs->{ev_count}++;    # count matches for this queue
                $self->{ev_count}++;    # and overall
                try {
                    # qmatch is the name of the queue matcher
                    $subs->{callback}->( $self, $qmatch, $row );

                    # only move the high-water marks on once the callback has
                    # completed without dying
                    if ( $row->{added} gt $subs->{after} ) {
                        $subs->{after} = $row->{added};
                    }
                    if ( $row->{counter} > $subs->{counter} ) {
                        $subs->{counter} = $row->{counter};
                    }
                }
                catch {
                    # Try::Tiny hands the error to the catch block in $_,
                    # $@ does not hold it here
                    warn "listen: error in callback $_";
                };
            }
        }
        $started = 1;

        # every queue has been unsubscribed, nothing can ever arrive again
        last if ( !keys %{ $self->{subscriptions} } );

        # "events" is the number of events to stop after, so >= not >
        last
            if ( $params->{events}
            && $self->{ev_count} >= $params->{events} );
        last
            if ( $params->{datetime}
            && time() > $params->{datetime} );

        # wait a bit to allow the queues to fillup
        sleep( $self->{listen_delay} );
    }

    return $self->{ev_count};
}
1598              
1599             # -----------------------------------------------------------------------------
1600              
1601             =head2 unsubscribe
1602              
1603             Unsubscribe from a named queue.
1604              
1605             sub handler {
1606             state $counter = 0 ;
1607             my $q = shift ; # we get the queue object
1608             # the queue matcher that triggered and the matching record
1609             my ($qmatch, $record) = @_ ;
1610              
1611             # we are only interested in 10 messages
1612             if( ++$counter > 10) {
1613             $q->unsubscribe( queue => $qmatch) ;
1614             } else {
1615             say Dumper( $record) ;
1616             }
1617             }
1618              
1619             my $queue = App::Basis::Queue->new( dbh => $dbh) ;
1620             $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
1621             $queue->listen() ;
1622              
1623             =cut
1624              
# unsubscribe - drop the subscription registered under a queue matcher.
#
# Accepts a hash or a hashref with:
#   queue - the queue matcher that was given to subscribe
#
# Warns and returns 0 when the parameters are not a hash. Nothing happens when
# no queue is given, and it is not an error if the queue was never subscribed.
sub unsubscribe {
    my ( $self, @args ) = @_;
    my $params = @args % 2 ? shift @args : {@args};

    if ( ref($params) ne 'HASH' ) {
        warn "unsubscribe accepts a hash or a hashref of parameters";
        return 0;
    }

    my $name = $params->{queue};

    # does not matter if the queue name does not exist!
    delete $self->{subscriptions}->{$name} if ($name);
}
1640              
1641             # -----------------------------------------------------------------------------
1642              
1643             =head2 purge_tasks
1644              
1645             purge will remove all processed task items and failures (process_failure >= 5).
1646             These are completely removed from the database
1647              
1648             my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
1649             $queue->purge_tasks( queue => 'queue_name') ;
1650             my $after = $queue->stats( queue => 'queue_name') ;
1651              
1652             say "removed " .( $before->{total_records} - $after->{total_records}) ;
1653              
1654              
1655             before is optional and will default to 'now'
1656              
1657             =cut
1658              
# purge_tasks - permanently delete processed and failed task records.
#
# Accepts a hash or a hashref with:
#   queue  - queue name, '*' is translated to the SQL wildcard '%'
#   before - optional; only records added at or before this moment are
#            removed. Either a unix epoch or 'YYYY-MM-DD HH:MM:SS'; anything
#            else warns and falls back to now. Defaults to now.
#
# Returns the row_count reported by _delete_db_record, or 0 (with a warning)
# if the parameters are not a hash or no queue was given.
sub purge_tasks {
    my $self   = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "purge_tasks accepts a hash or a hashref of parameters";
        return 0;
    }

    my $qname = $params->{queue};
    if ( !defined $qname ) {
        warn "purge_tasks needs a queue to purge";
        return 0;
    }

    # SQL wildcard replace
    $qname =~ s/\*/%/g;

    if ( !defined $params->{before} ) {
        $params->{before} = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
    }
    elsif ( $params->{before} =~ /^\d+$/ ) {
        $params->{before}
            = strftime( "%Y-%m-%d %H:%M:%S", localtime( $params->{before} ) );
    }
    elsif ( $params->{before} !~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/ ) {
        warn(
            "this does not look like a datetime value I can use: '$params->{before}'"
        );
        $params->{before} = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
    }

    # AND binds tighter than OR, so the processed/failed alternative must be
    # bracketed; otherwise the queue, type and date filters each apply to only
    # one half of the condition
    my $sql = "WHERE queue_name LIKE ?
        AND ( processed = 1 OR process_failure = 1 )
        AND msg_type = ?
        AND added <= ?";

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK, $params->{before} ] );

    # return the number of items deleted
    return $resp->{row_count};
}
1699              
1700             # -----------------------------------------------------------------------------
1701              
1702             =head2 purge_chatter
1703              
1704             purge will remove all chatter messages.
1705             These are completely removed from the database
1706              
1707             my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;
1708              
1709             say "removed $del messages" ;
1710              
1711             before is optional and will default to 'now'
1712              
1713             =cut
1714              
# purge_chatter - permanently delete chatter records.
#
# Accepts a hash or a hashref with:
#   queue   - queue name, '*' is translated to the SQL wildcard '%'
#   counter - optional; when given, records with counter <= this value are
#             removed and 'before' is ignored
#   before  - optional; only records added at or before this moment are
#             removed. Either a unix epoch or 'YYYY-MM-DD HH:MM:SS'; anything
#             else warns and falls back to now. Defaults to now.
#
# Returns the row_count reported by _delete_db_record, or 0 (with a warning)
# if the parameters are not a hash or no queue was given.
#
# NOTE(review): the WHERE clause keeps the original processed/process_failure
# restriction - confirm whether chatter records ever have those flags set.
sub purge_chatter {
    my $self   = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "purge_chatter accepts a hash or a hashref of parameters";
        return 0;
    }

    my $qname = $params->{queue};
    if ( !defined $qname ) {
        warn "purge_chatter needs a queue to purge";
        return 0;
    }

    # SQL wildcard replace
    $qname =~ s/\*/%/g;

    # the statement and its arguments are chosen in one place, after any
    # defaulting, so the values sent to the database are the final ones
    my ( $sql, $sql_args );

    if ( defined $params->{counter} ) {
        $sql = "WHERE queue_name LIKE ?
            AND ( processed = 1 OR process_failure = 1 )
            AND msg_type = ?
            AND counter <= ?";
        $sql_args = [ $qname, MSG_CHATTER, $params->{counter} ];
    }
    else {
        if ( !defined $params->{before} ) {
            $params->{before} = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
        }
        elsif ( $params->{before} =~ /^\d+$/ ) {
            $params->{before} = strftime( "%Y-%m-%d %H:%M:%S",
                localtime( $params->{before} ) );
        }
        elsif ( $params->{before} !~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/ )
        {
            warn(
                "this does not look like a datetime value I can use: '$params->{before}'"
            );
            $params->{before} = strftime( "%Y-%m-%d %H:%M:%S", localtime() );
        }

        # AND binds tighter than OR, hence the brackets
        $sql = "WHERE queue_name LIKE ?
            AND ( processed = 1 OR process_failure = 1 )
            AND msg_type = ?
            AND added <= ?";
        $sql_args = [ $qname, MSG_CHATTER, $params->{before} ];
    }

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, $sql_args );

    # return the number of items deleted
    return $resp->{row_count};
}
1768              
1769             # -----------------------------------------------------------------------------
1770              
1771             =head2 remove_queue
1772              
1773             remove a queue and all of its records (task and chatter)
1774              
1775             $queue->remove_queue( queue => 'queue_name') ;
1776             my $after = $queue->list_queues() ;
1777             # convert list into a hash for easier checking
1778             my %a = map { $_ => 1} @after ;
1779             say "queue removed" if( !$q->{queue_name}) ;
1780              
1781             * This does not handle wildcard queues *
1782              
1783             =cut
1784              
# remove_queue - delete every record stored under a queue name.
#
# Accepts a hash or a hashref with:
#   queue - queue name; any '*' is rewritten to '%' before it is used in the
#           LIKE comparison
#
# Returns the 'success' field of the _delete_db_record response, or 0 (with a
# warning) when the parameters are not a hash.
#
# NOTE(review): the POD says wildcard queues are not handled, yet '*' is
# translated to the SQL wildcard here - confirm which is intended.
sub remove_queue {
    my ( $self, @args ) = @_;
    my $params = @args % 2 ? shift @args : {@args};

    unless ( ref($params) eq 'HASH' ) {
        warn "remove_queue accepts a hash or a hashref of parameters";
        return 0;
    }

    # SQL wildcard replace
    my $pattern = $params->{queue};
    $pattern =~ s/\*/%/g;

    my $table  = $self->{prefix} . "_queue";
    my $result = $self->_delete_db_record( $table, "WHERE queue_name LIKE ?",
        [$pattern] );

    return $result->{success};
}
1803              
1804             # -----------------------------------------------------------------------------
1805              
1806             =head2 reset_failures
1807              
1808             clear any process_failure values from all unprocessed task items
1809              
1810             my $before = $queue->stats( queue => 'queue_name') ;
1811             $queue->reset_failures( queue => 'queue_name') ;
1812             my $after = $queue->stats( queue => 'queue_name') ;
1813              
1814             say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;
1815              
1816             =cut
1817              
# reset_failures - clear the failure flag on task records so that they can be
# picked up again.
#
# Accepts a hash or a hashref with:
#   queue - queue name, '*' is translated to the SQL wildcard '%'
#
# Returns the number of rows reported as updated by _update_db (0 when none),
# or 0 with a warning when the parameters are not a hash.
sub reset_failures {
    my ( $self, @args ) = @_;
    my $params = @args % 2 ? shift @args : {@args};

    unless ( ref($params) eq 'HASH' ) {
        warn "reset_failures accepts a hash or a hashref of parameters";
        return 0;
    }

    # SQL wildcard replace
    my $pattern = $params->{queue};
    $pattern =~ s/\*/%/g;

    my $update = "SET process_failure=0"
        . " WHERE queue_name LIKE ?
        AND process_failure = 1
        AND msg_type = ?";

    my $table  = $self->{prefix} . "_queue";
    my $result = $self->_update_db( $table, $update, [ $pattern, MSG_TASK ] );

    return $result->{row_count} || 0;
}
1841              
1842             # -----------------------------------------------------------------------------
1843              
1844             =head2 remove_failures
1845              
1846             permanently delete task failures from the database
1847              
1848             $queue->remove_failures( queue => 'queue_name') ;
1849             my $stats = $queue->stats( queue => 'queue_name') ;
1850             say "failures left " .( $stats->{failures}) ;
1851              
1852             =cut
1853              
# remove_failures - permanently delete failed task records.
#
# Accepts a hash or a hashref with:
#   queue - queue name, '*' is translated to the SQL wildcard '%'. When it is
#           omitted every queue is matched, which is what this method did
#           before it honoured the queue parameter.
#
# Returns the row_count reported by _delete_db_record, or 0 (with a warning)
# when the parameters are not a hash.
sub remove_failures {
    my $self   = shift;
    my $params = @_ % 2 ? shift : {@_};

    if ( ref($params) ne 'HASH' ) {
        warn "remove_failures accepts a hash or a hashref of parameters";
        return 0;
    }

    # no queue means all queues
    my $qname = defined $params->{queue} ? $params->{queue} : '*';

    # SQL wildcard replace
    $qname =~ s/\*/%/g;

    # the queue name has to be part of the condition, otherwise failures are
    # removed from every queue whatever the caller asked for
    my $sql = "WHERE queue_name LIKE ?
        AND process_failure = 1
        AND msg_type = ?";
    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK ] );

    return $resp->{row_count};
}
1874              
1875             # -----------------------------------------------------------------------------
1876              
1877             =head2 remove_tables
1878              
1879             If you never need to use the database again, it can be completely removed
1880              
1881             $queue->remove_tables() ;
1882              
1883             =cut
1884              
# remove_tables - drop the queue table for this object's prefix.
#
# Takes no parameters. The statement is passed to _debug and then executed on
# the object's database handle; the result of that do() call is returned.
sub remove_tables {
    my ($self) = @_;

    my $drop = sprintf( 'DROP TABLE %s_queue;', $self->{prefix} );
    $self->_debug($drop);

    return $self->{dbh}->do($drop);
}
1892              
1893             # -----------------------------------------------------------------------------
1894              
1895             1;