File Coverage

blib/lib/TheSchwartz/JobScheduler.pm
Criterion Covered Total %
statement 179 197 90.8
branch 45 62 72.5
condition 9 18 50.0
subroutine 20 21 95.2
pod 3 3 100.0
total 256 301 85.0


line stmt bran cond sub pod time code
1             package TheSchwartz::JobScheduler;
2             ## no critic (ControlStructures::ProhibitPostfixControls)
3             ## no critic (Subroutines::RequireArgUnpacking)
4             ## no critic (ControlStructures::ProhibitUnlessBlocks)
5              
6 6     6   2404085 use strict;
  6         15  
  6         220  
7 6     6   30 use warnings;
  6         15  
  6         464  
8              
9             # ABSTRACT: Lightweight TheSchwartz job dispatcher with maintained database connections
10              
11             our $VERSION = '0.002'; # VERSION: generated by DZP::OurPkgVersion
12              
13 6     6   37 use Carp;
  6         10  
  6         548  
14 6     6   2251 use English '-no_match_vars';
  6         14001  
  6         39  
15 6     6   6036 use Storable;
  6         27012  
  6         453  
16 6     6   1343 use Module::Load qw( load );
  6         2751  
  6         57  
17 6     6   468 use Scalar::Util qw( refaddr );
  6         13  
  6         323  
18 6     6   810 use Const::Fast;
  6         4467  
  6         48  
19              
20 6     6   1715 use Moo;
  6         15683  
  6         46  
21 6     6   6822 use Log::Any qw( $log ), hooks => { build_context => [ \&_build_context, ], };
  6         18473  
  6         70  
22 6     6   15068 use Log::Any::Adapter::Util;
  6         28  
  6         745  
23              
24             sub _build_context {
25              
26             # my ($level, $category, $data) = @_;
27 0     0   0 my %ctx;
28 0         0 my @caller = Log::Any::Adapter::Util::get_correct_caller();
29 0         0 $ctx{file} = $caller[1];
30 0         0 $ctx{line} = $caller[2];
31 0         0 return %ctx;
32             }
33 6     6   3422 use Carp::Assert;
  6         8150  
  6         40  
34              
35 6     6   4225 use TheSchwartz::JobScheduler::Job;
  6         54  
  6         14426  
36              
37             const my $DEFAULT_OPTION_UNIQKEY => 'no_check';
38              
39             has databases => (
40             is => 'ro',
41             required => 1,
42             );
43              
44             has dbh_callback => (
45             is => 'ro',
46             required => 0,
47             );
48              
49             has _funcmap => (
50             is => 'ro',
51             default => sub { {}; },
52             );
53              
54             has opts => (
55             is => 'ro',
56             default => sub {
57             return { handle_uniqkey => $DEFAULT_OPTION_UNIQKEY, };
58             },
59             );
60              
61             sub insert {
62 14     14 1 2157 my ( $self, %args ) = @_;
63              
64             # use Data::Dumper;
65             # warn Dumper \%args;
66              
67 14 50       95 croak q{No job} unless exists $args{'job'};
68 14         56 my $job = $args{'job'};
69 14 50       102 croak q{Argument job not TheSchwartz::JobScheduler::Job} unless $job->isa('TheSchwartz::JobScheduler::Job');
70              
71             # my $databases = $args{'databases'};
72 14         135 my $databases = $self->databases;
73 14 100       172 $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg;
74              
75             # $log->debugf( 'TheSchwartz::JobScheduler::insert(): databases: %s', $databases );
76 14         1280 foreach my $database_id ( keys %{$databases} ) {
  14         62  
77              
78             # $log->debugf( 'TheSchwartz::JobScheduler::insert(): db: %s', $db );
79              
80 14 100       76 my $dbh_callback = exists $args{'dbh_callback'} ? $args{'dbh_callback'} : $self->dbh_callback;
81 14         66 my $dbh = _get_dbh( $database_id, $dbh_callback );
82              
83             # $log->debugf( 'TheSchwartz::JobScheduler::insert(): dbh: %s', $dbh );
84 14   50     103 my $prefix = $databases->{$database_id}->{'prefix'} // q{};
85              
86             # $log->debugf( 'TheSchwartz::JobScheduler::insert(): prefix: %s', $prefix );
87              
88 14         32 my $jobid;
89              
90             # local $EVAL_ERROR = undef;
91             # my $r = eval {
92             # $job->funcid( $self->funcname_to_id( $database, $job->funcname ) );
93 14         98 $job->funcid( $self->funcname_to_id( $dbh, $prefix, $job->funcname ) );
94 14         68 $job->insert_time(time);
95              
96 14 100 100     121 if ( $job->uniqkey && $self->opts->{'handle_uniqkey'} eq 'acknowledge' ) {
97 2         7 my $row = $job->as_hashref;
98 2         7 my @query_cols = qw( jobid );
99 2         6 my @where_cols = qw( funcid uniqkey );
100 2         7 my $sql = sprintf 'SELECT %s FROM %sjob WHERE funcid = ? AND uniqkey = ?', ( join q{, }, @query_cols ), $prefix;
101              
102 2         14 my $sth = $dbh->prepare_cached($sql);
103 2         206 my $i = 1;
104 2         7 for my $where_col (@where_cols) {
105 4         19 $sth->bind_param( $i++, $row->{$where_col}, _bind_param_attr( $dbh, $prefix, $where_col ), );
106             }
107 2         112 $sth->execute();
108              
109             # Strange if there would be more than one entry!
110 2         7 my @job_ids;
111 2         32 while ( my $ref = $sth->fetchrow_arrayref ) {
112 2         14 push @job_ids, $ref->[0];
113             }
114              
115             # $sth->finish;
116             # https://metacpan.org/pod/DBI#finish
117             # Indicate that no more data will be fetched from this statement
118             # handle before it is either executed again or destroyed.
119             # You almost certainly do not need to call this method.
120             # Adding calls to finish after loop that fetches all rows
121             # is a common mistake, don't do it, it can mask genuine
122             # problems like uncaught fetch errors.
123 2 50       289 return $job_ids[0] if (@job_ids);
124             }
125              
126 12         69 my $row = $job->as_hashref;
127 12         29 my @col = keys %{$row};
  12         66  
128 12         119 my $sql = sprintf 'INSERT INTO %sjob (%s) VALUES (%s)', $prefix, ( join q{, }, @col ), ( join q{, }, (q{?}) x @col );
129              
130 12         170 my $sth = $dbh->prepare_cached($sql);
131 12         1840 my $i = 1;
132 12         37 for my $col (@col) {
133 67         234 $sth->bind_param( $i++, $row->{$col}, _bind_param_attr( $dbh, $prefix, $col ), );
134             }
135 12         218287 $sth->execute();
136              
137 11         178 $jobid = _insert_id( $dbh, $prefix, $sth, "${prefix}job", 'jobid' );
138              
139 11         145 $log->debugf( 'TheSchwartz::JobScheduler::insert() jobid: %s', $jobid );
140 11 50       2034 return $jobid if defined $jobid;
141             }
142              
143 0         0 return;
144             }
145              
146             sub funcname_to_id {
147 27     27 1 3753 my ( $self, $dbh, $prefix, $funcname ) = @_;
148 27         195 $log->debugf( 'TheSchwartz::JobScheduler::funcname_to_id(%s, %s, %s)', $dbh, $prefix, $funcname );
149              
150             # my ( $dbh, $prefix ) = ($database->dbh, $database->prefix);
151              
152 27         153 my $dbid = refaddr $dbh;
153 27 100       169 unless ( exists $self->_funcmap->{$dbid} ) {
154 20         229 my $sth = $dbh->prepare_cached("SELECT funcid, funcname FROM ${prefix}funcmap");
155 20         4159 $sth->execute;
156 20         417 while ( my $row = $sth->fetchrow_arrayref ) {
157 16         266 $self->_funcmap->{$dbid}{ $row->[1] } = $row->[0];
158             }
159              
160             # $sth->finish;
161             # See above
162             }
163              
164 27 100       175 unless ( exists $self->_funcmap->{$dbid}{$funcname} ) {
165             ## This might fail in a race condition since funcname is UNIQUE
166 12         115 my $sth = $dbh->prepare_cached("INSERT INTO ${prefix}funcmap (funcname) VALUES (?)");
167 12         1123 local $EVAL_ERROR = undef;
168 12         37 my $r = eval { $sth->execute($funcname) };
  12         195350  
169 12 100       154 if ( !$r ) {
170 1         3 my $error = $EVAL_ERROR;
171 1         28 $log->warn( ' Unable to insert the funcname \'%s\'. Error: %s', $funcname, $error );
172             }
173              
174 12         114 my $id = _insert_id( $dbh, $prefix, $sth, "${prefix}funcmap", 'funcid' );
175              
176             ## If we got an exception, try to load the record again
177 12 100       57 if ($EVAL_ERROR) {
178 1         8 $sth = $dbh->prepare_cached("SELECT funcid FROM ${prefix}funcmap WHERE funcname = ?");
179 1         176 $sth->execute($funcname);
180 1 50       21 $id = $sth->fetchrow_arrayref->[0]
181             or croak "Can't find or create funcname $funcname: $EVAL_ERROR";
182             }
183              
184 12         145 $self->_funcmap->{$dbid}{$funcname} = $id;
185             }
186              
187 27         267 $log->debugf( 'TheSchwartz::JobScheduler::funcname_to_id(): %s', $self->_funcmap->{$dbid}{$funcname} );
188 27         248 return $self->_funcmap->{$dbid}{$funcname};
189             }
190              
191             sub _insert_id {
192 23     23   144 my ( $dbh, $prefix, $sth, $table, $col ) = @_;
193              
194             # my ( $dbh, $prefix ) = ($database->dbh, $database->prefix);
195              
196 23         1107 my $driver = $dbh->{Driver}{Name};
197 23 50       237 if ( $driver eq 'mysql' ) {
    50          
    50          
198 0         0 return $dbh->{mysql_insertid};
199             }
200             elsif ( $driver eq 'Pg' ) {
201 0         0 return $dbh->last_insert_id( undef, undef, undef, undef, { sequence => ( join q{_}, $table, $col, 'seq' ) } );
202             }
203             elsif ( $driver eq 'SQLite' ) {
204 23         347 return $dbh->func('last_insert_rowid');
205             }
206             else {
207 0         0 croak "Don't know how to get last insert id for $driver";
208             }
209             }
210              
211             sub list_jobs {
212 6     6 1 2835 my ( $self, %args ) = @_;
213 6         44 $log->debugf( 'TheSchwartz::JobScheduler::list_jobs(%s)', \%args );
214 6 50       47 croak q{No search_params} unless exists $args{'search_params'};
215 6         19 my $search_params = $args{'search_params'};
216              
217 6 50       24 croak q{No funcname} unless exists $search_params->{funcname};
218              
219 6         18 my @options;
220             push @options,
221             {
222             key => 'run_after',
223             op => '<=',
224             value => $search_params->{run_after}
225 6 50       22 } if exists $search_params->{run_after};
226             push @options,
227             {
228             key => 'grabbed_until',
229             op => '<=',
230             value => $search_params->{grabbed_until}
231             }
232 6 50       27 if exists $search_params->{grabbed_until};
233              
234 6 50       37 if ( $search_params->{coalesce} ) {
235 0   0     0 $search_params->{coalesce_op} ||= q{=};
236             push @options,
237             {
238             key => 'coalesce',
239             op => $search_params->{coalesce_op},
240             value => $search_params->{coalesce}
241 0         0 };
242             }
243              
244 6         15 my @jobs;
245 6         41 my $databases = $self->databases;
246 6         15 foreach my $database_id ( keys %{$databases} ) {
  6         30  
247 10 100       88 my $dbh_callback = exists $args{'dbh_callback'} ? $args{'dbh_callback'} : $self->dbh_callback;
248 10         40 my $dbh = _get_dbh( $database_id, $dbh_callback );
249 10   50     77 my $prefix = $databases->{$database_id}->{'prefix'} // q{};
250              
251 10         27 local $EVAL_ERROR = undef;
252 10         24 my $r = eval {
253 10         56 my $funcid = $self->funcname_to_id( $dbh, $prefix, $search_params->{funcname} );
254              
255 10         67 my $sql = "SELECT * FROM ${prefix}job WHERE funcid = ?";
256 10         49 my @value = ($funcid);
257 10         38 for (@options) {
258 0         0 $sql .= " AND $_->{key} $_->{op} ?";
259 0         0 push @value, $_->{value};
260             }
261              
262 10         93 my $sth = $dbh->prepare_cached($sql);
263 10         2184 $sth->execute(@value);
264 10         557 while ( my $ref = $sth->fetchrow_hashref ) {
265 9         59 $log->debugf( 'TheSchwartz::JobScheduler::list_jobs(): fetch:ref: %s', $ref );
266              
267             # my $job_fields = Storable::dclone( $ref );
268             # $job->search_params( Storable::nfreeze( $job->search_params ) ) if ref $job->search_params;
269 9         78 my $arg_tmp = _cond_thaw( $ref->{'arg'} );
270 9         434 my $job = TheSchwartz::JobScheduler::Job->new($ref);
271 9         150 $job->arg($arg_tmp);
272 9         296 push @jobs, $job;
273             }
274 10         56 1;
275             };
276 10 50       996 if ( !$r ) {
277 0         0 my $error = $EVAL_ERROR;
278 0         0 $log->warn( ' Unable to fetch jobs for funcname \'%s\' (id: %s). Error: %s', $search_params->{funcname}, $error );
279             }
280             }
281              
282 6         77 $log->debugf( 'TheSchwartz::JobScheduler::list_jobs(): %s', \@jobs );
283 6         65 return @jobs;
284             }
285              
286             sub _bind_param_attr {
287 71     71   168 my ( $dbh, $prefix, $col ) = @_;
288              
289             # my ( $dbh, $prefix ) = ( $database->dbh, $database->prefix );
290              
291 71 100       461 return if $col ne 'arg';
292              
293 12         179 my $driver = $dbh->{Driver}{Name};
294 12 50       87 if ( $driver eq 'Pg' ) {
    50          
295 0         0 return { pg_type => DBD::Pg::PG_BYTEA() };
296             }
297             elsif ( $driver eq 'SQLite' ) {
298 12         121 return DBI::SQL_BLOB();
299             }
300 0         0 return;
301             }
302              
303             # Shamelessly copied from TheSchwartz::Job
304             # Perl::Critic applied
305             sub _cond_thaw {
306 9     9   36 my $data = shift;
307              
308 9         88 my $magic = eval { Storable::read_magic($data); };
  9         53  
309 9 100 66     677 if ( $magic
      33        
      33        
310             && $magic->{major}
311             && $magic->{major} >= 2
312             && $magic->{major} <= 5 ) ## no critic (ValuesAndExpressions::ProhibitMagicNumbers)
313             {
314 8         21 my $thawed = eval { Storable::thaw($data) };
  8         34  
315 8 50       397 if ($@) { ## no critic (Variables::ProhibitPunctuationVars)
316              
317             # false alarm... looked like a Storable, but wasn't.
318 0         0 return $data;
319             }
320 8         42 return $thawed;
321             }
322             else {
323 1         6 return $data;
324             }
325             }
326              
327             sub _get_dbh {
328 31     31   164545 my ( $database_id, $dbh_callback ) = @_;
329              
330             # my $cb = $dbh_callback;
331 31         111 my $dbh;
332 31 100       103 if ( ref $dbh_callback eq 'CODE' ) {
333 19         36 $dbh = &{$dbh_callback}($database_id);
  19         79  
334             }
335             else {
336 12         19 my $dbh_callback_code = $dbh_callback;
337 12         116 my ( $module, $creator ) = split qr/\-\>/msx, $dbh_callback_code;
338 12         37 local $EVAL_ERROR = undef;
339 12         21 my $r = eval { load $module };
  12         51  
340 12 100       3568 if ($EVAL_ERROR) {
341 1         150 croak q{Cannot load dbh_callback module '}, $module, q{'};
342             }
343 11         15 my $callback;
344 11         18 local $EVAL_ERROR = undef;
345 11         22 $r = eval { $callback = $module->$creator() };
  11         130  
346 11 100       5228 if ($EVAL_ERROR) {
347 1         94 croak q{Cannot instantiate dbh_callback module '}, "$module->$creator", q{'};
348             }
349 10         20 local $EVAL_ERROR = undef;
350 10         19 $r = eval { $dbh = $callback->dbh($database_id) };
  10         51  
351 10 100       2864 if ($EVAL_ERROR) {
352 1         78 croak q{Cannot get dbh from callback '}, "$module->$creator->dbh( $database_id )", q{'};
353             }
354             }
355              
356 28         18096 return $dbh;
357             }
358              
359             1;
360              
361             __END__
362              
363             =pod
364              
365             =encoding UTF-8
366              
367             =head1 NAME
368              
369             TheSchwartz::JobScheduler - Lightweight TheSchwartz job dispatcher with maintained database connections
370              
371             =head1 VERSION
372              
373             version 0.002
374              
375             =head1 SYNOPSIS
376              
377             use TheSchwartz::JobScheduler;
378             my @databases = (
379             { id => 'db_1', prefix => 'theschwartz_schema.', },
380             { id => 'db_2', prefix => 'theschwartz_schema.', },
381             );
382             use Database::ManagedHandle;
383             sub get_dbh {
384             my ($db_id) = @_;
385             my $mh1 = Database::ManagedHandle->instance;
386             return $mh1->dbh( $db_id );
387             }
388              
389             my $client = TheSchwartz::JobScheduler->new(
390             databases => \@databases,
391             dbh_callback => \&get_dbh,
392             );
393             my $job_id = $client->insert(
394             job => TheSchwartz::JobScheduler::Job->new(
395             funcname => 'fetch',
396             arg => {type => 'site', url => 'https://example.com/'},
397             ),
398             );
399              
400             my $job1 = TheSchwartz::JobScheduler::Job->new;
401             $job1->funcname("WorkerName");
402             $job1->arg({ foo => "bar" });
403             $job1->uniqkey("uniqkey");
404             $job1->run_after( time + 60 );
405             $client->insert( job => $job1 );
406             my $job2 = TheSchwartz::JobScheduler::Job->new(
407             funcname => 'WorkerName',
408             arg => { foo => 'baz' },
409             );
410             $client->insert( job => $job2 );
411              
412             my @jobs = $client->list_jobs( search_params => { funcname => 'funcname' }, );
413             for my $job (@jobs) {
414             print $job->jobid;
415             }
416              
417             =head1 DESCRIPTION
418              
419             TheSchwartz::JobScheduler is an interface to insert a new job into
420             TheSchwartz job queue (maintained by a database).
421              
422             The rationale behind this module is using it in a long running web service,
423             for instance, in L<Dancer2>. Because the database connections cannot
424             be relied to stay open indefinitely, we get a new database handle
425             for each operation.
426              
427             This module is solely created for the purpose of injecting a new job
428             from web servers without loading additional TheSchwartz and
429             Data::ObjectDriver modules onto your system. Your TheSchwartz job worker
430             processes will still need to be implemented using the full featured
431             L<TheSchwartz::Worker> module.
432              
433             =head2 Configuration: Databases and Their Handles
434              
435             L<TheSchwartz> can use several different databases simultaneously,
436             for instance, to share load and distribute jobs safely to only
437             those workers who could, in turn, demand restricted access.
438             This makes TheSchwartz very decentralized.
439              
440             If your setup is reasonably simple, for instance, a webapp,
441             e.g. L<Dancer2>, and L<TheSchwartz>
442             as a worker system executing long running tasks which would
443             disrupt the webapp, then perhaps you only use one database.
444             In that case, you can consider using the same database handle
445             in both webapp and TheSchwartz. If you use database transactions
446             to ensure an atomized commit, you can involve TheSchwartz::JobScheduler
447             in the same transaction. If your transaction fails after worker task
448             is inserted, then also the worker task gets cancelled (rollbacked).
449              
450             If, however, your TheSchwartz system is complex or otherwise separate
451             from the systems which create the tasks, or you simply use more than one
452             database in TheSchwartz, you cannot share your other database handles
453             with TheSchwartz::JobScheduler. Scheduler might need to access all databases
454             in sequence to place the task in the right one. Besides this,
455             TheSchwartz::JobScheduler is prepared for the possibility of one or more
456             databases being off-line. It loops through all the databases
457             until it gets a working database handle.
458              
459             Database handles are provided by the calling program.
460             This allows the caller to use any available system to provide
461             the handles. If TheSchwartz::JobScheduler receives an C<undef>
462             instead of a database handle, it tries the next database.
463             If there is no working database handles, it croaks.
464              
465             Database configuration does not need database addresses, dns:s
466             or usernames and passwords. Because TheSchwartz::JobScheduler
467             gets the database handle from outside, it only needs to know
468             a database id to separate between databases and a possible
469             prefix for each database. Prefix is prepended to every
470             database table and sequence name. If your database uses a different schema
471             than the default one for TheSchwartz tables, use C<prefix>
472             to solve this.
473              
474             my %dbs = (
475             db_1 => [ 'dbi:SQLite:...', undef, undef, {} ],
476             db_2 => [ 'dbi:SQLite:...', undef, undef, {} ],
477             );
478             sub get_dbh {
479             my ($id) = @_;
480             my @connection_info = @{ $dbs->{ $id } };
481             return DBI->connect( @connection_info );
482             };
483             my %databases = (
484             db_1 => { prefix => 'theschwartz_schema.', dbh_callback => \&get_dbh, },
485             db_2 => { prefix => 'another_schema.', dbh_callback => \&get_dbh, },
486             );
487             use TheSchwartz::JobScheduler;
488             my $scheduler = TheSchwartz::JobScheduler->new(
489             databases => \%databases,
490             dbh_callback => \&get_dbh,
491             );
492              
493             In the following example the calling
494             program is using L<Database::ManagedHandle>, a module
495             which makes certain that a database handle is always usable.
496              
497             # First create a Database::ManagedHandle config class
498             # See Database::ManagedHandle for instructions
499             # Then just use it:
500             my %databases = (
501             db_1 => {
502             prefix => 'theschwartz_schema.',
503             },
504             db_2 => {
505             {
506             prefix => 'another_schema.',
507             },
508             );
509             use TheSchwartz::JobScheduler;
510             my $scheduler = TheSchwartz::JobScheduler->new(
511             databases => \%databases,
512             dbh_callback => 'Database::ManagedHandle->instance',
513             );
514              
515             =head2 DBH Callback
516              
517             The item C<dbh_callback> can be either a CODE reference, i.e. a subroutine,
518             or a string which when executed with C<eval> will produce an object.
519             This object must have at least one method: C<dbh()>. This method,
520             when called, must return either a C<DBI::db> object (such as created by
521             C<< DBI->connect >>, or an C<undef>.
522              
523             use TheSchwartz::JobScheduler;
524             my $scheduler = TheSchwartz::JobScheduler->new(
525             databases => \%databases,
526             dbh_callback => 'Database::ManagedHandle->instance',
527             );
528              
529             You can specify C<dbh_callback> either when creating the client object
530             or when calling C<insert()> or C<list_jobs()>. If you are using
531             TheSchwartz::JobScheduler as part of another system, for example,
532             a web service, you will probably want to share one opened database handle
533             because that will allow you to include TheSchwartz::JobScheduler into
534             a transaction.
535              
536             my %databases = (
537             db_1 => { prefix => 'theschwartz_schema.', },
538             db_2 => { prefix => 'another_schema.', },
539             );
540             sub get_dbh {
541             my ($id) = @_;
542             my @connection_info = @{ $databases{ $id } };
543             return DBI->connect( @connection_info );
544             };
545             use TheSchwartz::JobScheduler;
546             my $scheduler = TheSchwartz::JobScheduler->new(
547             databases => \%databases,
548             );
549             my $job = TheSchwartz::JobScheduler::Job->new(
550             funcname => 'my_func',
551             );
552             $scheduler->insert(
553             job => $job,
554             dbh_callback => $get_dbh,
555             );
556              
557             =head2 Uniqkey
558              
559             The C<uniqkey> field is an arbitrary string identifier
560             used to prevent applications from posting duplicate jobs.
561             At most one with the same uniqkey value can be posted
562             to a single TheSchwartz database.
563              
564             There are, however, valid situations when inserting the same
565             job and uniqkey would make sense. For instance, in a case
566             when several different actions one after another but independent of each other
567             would result in the same job being required to run.
568              
569             Note, the job arguments do not enter into the uniqueness consideration,
570             only job name and unique key (C<funcid> and C<uniqkey> fields).
571              
572             Depending on the database and whether uniqueness is protected with
573             database constraints, such as primary keys, trying to insert another
574             job with the same C<uniqkey> can cause an error, the previous row being
575             rewritten with new content and new arguments, or another row being created.
576              
577             User can choose how to deal with this situation.
578             When instantiating C<TheSchwartz::JobScheduler>, user can define
579             the additional option C<handle_uniqkey> with any of the
580             following values:
581              
582             =over 8
583              
584             =item B<no_check>
585              
586             This option does not do any checking on the condition. If the database
587             is configured to not allow an insert operation, it will throw
588             an exception. User must be prepared for this, for instance,
589             by enclosing the operation in C<eval>.
590              
591             This is the default setting.
592              
593             =item B<overwrite>
594              
595             Update the fields C<arg>, C<insert_time>, C<run_after>, C<grabbed_until>,
596             C<priority> and C<coalesce>, and return the existing entry's C<jobid>.
597             This setting will create a slight overhead.
598              
599             Not yet implemented.
600              
601             =item B<acknowledge>
602              
603             If there is already a matching entry (C<funcid> and C<uniqkey> fields),
604             no change will be made. The C<jobid> of the existing entry will be returned.
605             This setting will create a slight overhead.
606              
607             =back
608              
609             B<N.B. This option is used only when TheSchwartz::JobScheduler::Job has
610             set the field C<uniqkey>.
611             If you don't use uniqkey, this problem will never arise.>
612              
613             B<N.B.2. Using either c<overwrite> or C<acknowledge> is the recommended
614             value. Only in situations which require extreme throughput, should you
615             consider other alternatives for this problem.>
616              
617             # Depending on the database table settings,
618             # this will either throw an exception or
619             # it will pass and result with invalid table data.
620             my $scheduler = TheSchwartz::JobScheduler->new(
621             databases => \%databases,
622             dbh_callback => 'Database::ManagedHandle->instance',
623             opts => {
624             handle_uniqkey => 'no_check',
625             },
626             );
627             my $job = TheSchwartz::JobScheduler::Job->new(
628             funcname => 'Test::uniqkey',
629             arg => { an_item => 'value A' },
630             uniqkey => 'UNIQUE_STR_A',
631             );
632             $scheduler->insert( $job );
633             $job = TheSchwartz::JobScheduler::Job->new(
634             funcname => 'Test::uniqkey',
635             arg => { an_item => 'value B' },
636             uniqkey => 'UNIQUE_STR_A',
637             );
638             $scheduler->insert( $job );
639              
640             =head2 Logging
641              
642             TheSchwartz::JobScheduler uses the excellent L<Log::Any> to produce logging messages.
643              
644             The easiest way to get the logging messages printed is to add the following line
645             in the preamble of your program:
646              
647             use Log::Any::Adapter ('Stdout', log_level => 'debug' );
648              
649             Alternative, you can do this on the command line:
650              
651             perl '-MLog::Any::Adapter(Stdout, log_level=>trace)'
652              
653             =head2 databases
654              
655             The databases used by TheSchwartz.
656              
657             Please see above L</"Configuration: Databases and Their Handles">.
658              
659             =head2 dbh_callback
660              
661             Callback for TheSchwartz::JobScheduler to get a database handle.
662              
663             Please see above L</"Configuration: Databases and Their Handles">.
664              
665             =head2 opts
666              
667             Additional options for controlling other features, including uniqkey.
668              
669             Please see above L<Uniqkey>.
670              
671             Example:
672              
673             my $scheduler = TheSchwartz::JobScheduler->new(
674             databases => \@databases,
675             dbh_callback => 'Database::ManagedHandle->instance',
676             opts => {
677             handle_uniqkey => 'no_check',
678             },
679             );
680              
681             =head2 insert
682              
683             Return a list of active jobs collected from all accessible databases.
684             Create a job.
685              
686             Parameters: job (TheSchwartz::JobScheduler::Job)
687              
688             my @jobs = $client->insert(
689             job => TheSchwartz::JobScheduler::Job->new(
690             funcname => 'fetch',
691             arg => {type => 'site', url => 'https://example.com/'},
692             ),
693             );
694              
695             =head2 funcname_to_id
696              
697             Fetch function id from database. If not exists, then insert.
698              
699             =head2 list_jobs
700              
701             Return a list of active jobs collected from all accessible databases.
702              
703             Parameters: A hash containing named parameters.
704              
705             my @jobs = $client->list_jobs(
706             search_params => { funcname => 'fetch_webpage'},
707             );
708              
709             =begin Pod::Coverage
710              
711              
712              
713              
714             =end Pod::Coverage
715              
716             =for stopwords TheSchwartz DBI Uniqkey uniqkey webapp
717              
718             =head1 THANKS
719              
720             This module is very much inspired by L<TheSchwartz::Simple>.
721              
722             =head1 SEE ALSO
723              
724             =over 8
725              
726             =item L<TheSchwartz>
727              
728             =item L<TheSchwartz::Simple>
729              
730             =back
731              
732             =head1 AUTHOR
733              
734             Mikko Koivunalho <mikkoi@cpan.org>
735              
736             =head1 COPYRIGHT AND LICENSE
737              
738             This software is copyright (c) 2023 by Mikko Koivunalho.
739              
740             This is free software; you can redistribute it and/or modify it under
741             the same terms as the Perl 5 programming language system itself.
742              
743             =cut