File Coverage

blib/lib/Jonk.pm
Criterion Covered Total %
statement 15 132 11.3
branch 0 30 0.0
condition 0 29 0.0
subroutine 5 32 15.6
pod 5 5 100.0
total 25 228 10.9


line stmt bran cond sub pod time code
1             package Jonk;
2 1     1   52891 use strict;
  1         3  
  1         50  
3 1     1   7 use warnings;
  1         3  
  1         157  
4 1     1   749 use Jonk::Job;
  1         3  
  1         32  
5 1     1   1399 use Try::Tiny;
  1         3206  
  1         77  
6 1     1   9 use Carp ();
  1         2  
  1         3044  
7              
8             our $VERSION = '0.10_02';
9              
10             sub new {
11 0     0 1   my ($class, $dbh, $opts) = @_;
12              
13 0 0         unless ($dbh) {
14 0           Carp::croak('missing job queue database handle.');
15             }
16              
17 0           my $functions = _parse_functions($opts);
18 0   0       my $table_name = $opts->{table_name} || 'job';
19 0           my $driver = _verify_driver($dbh);
20              
21 0           bless {
22             dbh => $dbh,
23             table_name => $table_name,
24             functions => $functions,
25             driver => $driver,
26             has_func => scalar(keys %{$functions}) ? 1 : 0,
27              
28             _errstr => undef,
29              
30             insert_time_callback => ($opts->{insert_time_callback}||sub{
31 0     0     my ( $sec, $min, $hour, $mday, $mon, $year, undef, undef, undef ) = localtime(time);
32 0           return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
33 0           }),
34              
35             insert_query => sprintf(
36             'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,?,0,?)'
37             ,$table_name
38             ),
39              
40             grab_query => sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $table_name),
41              
42             lookup_job_query => sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ? AND run_after <= ?', $table_name),
43              
44             find_job_query => sprintf(
45             'SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
46 0 0 0       $table_name, (join(', ', map { "'$_'" } keys %{$functions})), ($opts->{job_find_size} || 50)
  0   0        
47             ),
48              
49             delete_query => sprintf('DELETE FROM %s WHERE id = ?', $table_name),
50              
51             failed_query => sprintf('UPDATE %s SET retry_cnt = retry_cnt + 1, run_after = ?, grabbed_until = 0, priority = ? WHERE id = ?', $table_name),
52              
53             unixtime_query => _settled_unixtime_query($driver),
54              
55             }, $class;
56             }
57              
58             sub _parse_functions {
59 0     0     my $opts = shift;
60              
61 0   0       my $functions = $opts->{functions} || [];
62 0   0       my $default_grab_for = $opts->{default_grab_for} || (60*60);
63              
64 0           my $funcs = +{};
65 0           for (my $i = 0; $i < @{$functions}; $i++) {
  0            
66 0           my $func = $functions->[$i];
67              
68 0           my $value;
69 0 0         if (not defined $functions->[$i+1]) {$i++ }
  0 0          
  0            
70             elsif (ref $functions->[$i+1]) {$value = $functions->[++$i]}
71              
72 0   0       $value->{grab_for} ||= $default_grab_for;
73 0   0 0     $value->{serializer} ||= ($opts->{default_serializer} || sub {$_[0]});
  0   0        
74 0   0 0     $value->{deserializer} ||= ($opts->{default_deserializer} || sub {$_[0]});
  0   0        
75              
76 0           $funcs->{$func} = $value;
77             }
78 0           $funcs;
79             }
80              
81             sub _verify_driver {
82 0     0     my $dbh = shift;
83 0           my $driver = $dbh->{Driver}{Name};
84 0 0         $driver =~ /(mysql|SQLite|Pg)/ ? $driver : Carp::croak('Jonk support only mysql,SQLite,Pg');
85             }
86              
87             sub _settled_unixtime_query {
88 0     0     my $driver = shift;
89              
90 0 0         if ($driver eq 'Pg') {
    0          
91 0           return "SELECT TRUNC(EXTRACT('epoch' from NOW()))";
92             } elsif ($driver eq 'mysql') {
93 0           return 'SELECT UNIX_TIMESTAMP()';
94             }
95             }
96              
97 0     0 1   sub errstr {$_[0]->{_errstr}}
98              
99             sub insert {
100 0     0 1   my ($self, $func, $arg, $opt) = @_;
101              
102 0           my $job_id;
103             try {
104 0     0     $self->{_errstr} = undef;
105 0           local $self->{dbh}->{RaiseError} = 1;
106 0           local $self->{dbh}->{PrintError} = 0;
107              
108 0   0       my $serializer = $self->{functions}->{$func}->{serializer} || sub {$_[0]};
109 0           my $sth = $self->{dbh}->prepare_cached($self->{insert_query});
110 0           $sth->bind_param(1, $func);
111 0           $sth->bind_param(2, $serializer->($arg), _bind_param_attr($self->{driver}));
112 0           $sth->bind_param(3, $self->{insert_time_callback}->());
113 0   0       $sth->bind_param(4, $opt->{run_after}||0);
114 0   0       $sth->bind_param(5, $opt->{priority} ||0);
115 0           $sth->execute();
116              
117 0           $job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
118 0           $sth->finish;
119             } catch {
120 0     0     $self->{_errstr} = "can't insert for job queue database: $_"
121 0           };
122              
123 0           $job_id;
124             }
125              
126             sub _bind_param_attr {
127 0     0     my $driver = shift;
128              
129 0 0         if ( $driver eq 'Pg' ) {
    0          
130 0           return { pg_type => DBD::Pg::PG_BYTEA() };
131             } elsif ( $driver eq 'SQLite' ) {
132 0           return DBI::SQL_BLOB();
133             }
134 0           return;
135             }
136              
137             sub _server_unixitime {
138 0     0     my $self = shift;
139 0 0         return time() if $self->{driver} eq 'SQLite';
140 0           $self->{dbh}->selectrow_array($self->{unixtime_query});
141             }
142              
143             sub _grab_job {
144 0     0     my ($self, $callback, $opt) = @_;
145              
146 0           my $job;
147             try {
148 0     0     $self->{_errstr} = undef;
149 0           local $self->{dbh}->{RaiseError} = 1;
150 0           local $self->{dbh}->{PrintError} = 0;
151              
152 0           my $time = $self->_server_unixitime;
153 0           my $sth = $callback->($time);
154              
155 0           while (my $row = $sth->fetchrow_hashref) {
156 0           $job = $self->_grab_a_job($row, $time, $opt);
157 0 0         last if $job;
158             }
159              
160 0           $sth->finish;
161             } catch {
162 0     0     $self->{_errstr} = "can't grab job from job queue database: $_";
163 0           };
164              
165 0           $job;
166              
167             }
168              
169             sub _grab_a_job {
170 0     0     my ($self, $row, $time) = @_;
171              
172 0           my $sth = $self->{dbh}->prepare_cached($self->{grab_query});
173 0           $sth->execute(
174             ($time + ($self->{functions}->{$row->{func}}->{grab_for})),
175             $row->{id},
176             $row->{grabbed_until}
177             );
178 0           my $grabbed = $sth->rows;
179 0           $sth->finish;
180 0 0         $grabbed ? Jonk::Job->new($self => $row) : undef;
181             }
182              
183             sub lookup_job {
184 0     0 1   my ($self, $job_id) = @_;
185              
186             $self->_grab_job(
187             sub {
188 0     0     my $time = shift;
189 0           my $sth = $self->{dbh}->prepare_cached($self->{lookup_job_query});
190 0           $sth->execute($job_id, $time, $time);
191 0           $sth;
192             }
193 0           );
194             }
195              
196             sub find_job {
197 0     0 1   my ($self, $opts) = @_;
198              
199 0 0         unless ($self->{has_func}) {
200 0           Carp::croak('missin find_job functions.');
201             }
202              
203             $self->_grab_job(
204             sub {
205 0     0     my $time = shift;
206 0           my $sth = $self->{dbh}->prepare_cached($self->{find_job_query});
207 0           $sth->execute($time, $time);
208 0           $sth;
209             }
210 0           );
211             }
212              
213             sub _delete {
214 0     0     my ($self, $job_id) = @_;
215              
216             try {
217 0     0     my $sth = $self->{dbh}->prepare_cached($self->{delete_query});
218 0           $sth->execute($job_id);
219 0           $sth->finish;
220 0           return $sth->rows;
221             } catch {
222 0     0     $self->{_errstr} = "can't dequeue job from job queue database: $_";
223 0           return;
224 0           };
225             }
226              
227             sub _failed {
228 0     0     my ($self, $job_id, $opt) = @_;
229              
230 0 0         my $retry_delay = $self->_server_unixitime + (defined($opt->{retry_delay}) ? $opt->{retry_delay} : 60);
231 0 0         my $priority = (defined($opt->{priority}) ? $opt->{priority} : 0);
232              
233             try {
234 0     0     my $sth = $self->{dbh}->prepare_cached($self->{failed_query});
235 0           $sth->execute($retry_delay, $priority, $job_id);
236 0           $sth->finish;
237 0           return $sth->rows;
238             } catch {
239 0     0     warn 'ababaaaba';
240 0           $self->{_errstr} = "can't update job from job queue database: $_";
241 0           return;
242 0           };
243             }
244              
245             1;
246              
247             __END__