File Coverage

blib/lib/Schedule/LongSteps/Storage/DBIxClass.pm
Criterion Covered Total %
statement 15 27 55.5
branch 0 2 0.0
condition 0 2 0.0
subroutine 5 9 55.5
pod 1 1 100.0
total 21 41 51.2


line stmt bran cond sub pod time code
1             package Schedule::LongSteps::Storage::DBIxClass;
2             $Schedule::LongSteps::Storage::DBIxClass::VERSION = '0.022';
3 1     1   7 use Moose;
  1         2  
  1         6  
4             extends qw/Schedule::LongSteps::Storage/;
5              
6 1     1   6350 use DateTime;
  1         406891  
  1         49  
7 1     1   8 use Log::Any qw/$log/;
  1         1  
  1         10  
8 1     1   710 use Scope::Guard;
  1         368  
  1         38  
9 1     1   323 use Action::Retry;
  1         32898  
  1         625  
10              
11             has 'schema' => ( is => 'ro', isa => 'DBIx::Class::Schema', required => 1);
12             has 'resultset_name' => ( is => 'ro', isa => 'Str', required => 1);
13              
14             has 'limit_per_tick' => ( is => 'ro', isa => 'Int', default => 50 );
15              
16             sub _get_resultset{
17 0     0     my ($self) = @_;
18 0           return $self->schema()->resultset($self->resultset_name());
19             }
20              
21             around [ 'prepare_due_processes', 'create_process' ] => sub{
22             my ($orig, $self, @rest ) = @_;
23              
24             # Transfer the current autocommit nature of the DBH
25             # as a transation might have been created on this DBH outside
26             # of this schema. A transaction on DBI sets AutoCommit to false
27             # on the DBH. transaction_depth is just a boolean on the storage.
28              
29             # First restore transaction depth as it was.
30             my $pre_transaction_depth = $self->schema()->storage()->transaction_depth();
31             my $guard = Scope::Guard->new(
32             sub{
33             $log->trace("Restoring transaction_depth = $pre_transaction_depth");
34             $self->schema()->storage()->transaction_depth( $pre_transaction_depth );
35             });
36              
37             my $current_transaction_depth = $self->schema()->storage()->dbh()->{AutoCommit} ? 0 : 1;
38             $log->trace("Setting transaction_depth as NOT dbh AutoCommit = ".$current_transaction_depth);
39             $self->schema()->storage()->transaction_depth( $current_transaction_depth );
40             return $self->$orig( @rest );
41             };
42              
43              
44             =head1 NAME
45              
46             Schedule::LongSteps::Storage::DBIxClass - DBIx::Class based storage.
47              
48             =head1 SYNOPSIS
49              
50             First instantiate a storage with your L<DBIx::Class::Schema> and the name
51             of the resultset that represent the stored process:
52              
53             my $storage = Schedule::LongSteps::Storage::DBIxClass->new({
54             schema => $dbic_schema,
55             resultset_name => 'LongstepsProcess'
56             });
57              
58             Then build and use a L<Schedule::LongSteps> object:
59              
60             my $long_steps = Schedule::LongSteps->new({ storage => $storage });
61              
62             ...
63              
64             =head1 ATTRIBUTES
65              
66             =over
67              
68             =item schema
69              
70             You DBIx::Class::Schema. Mandatory.
71              
72             =item resultset_name
73              
74             The name of the resultset holding the processes in your Schema. See section 'RESULTSET REQUIREMENTS'. Mandatory.
75              
76             =item limit_per_tick
77              
78             The maximum number of processes that will actually run each time you
79             call $longsteps->run_due_processes(). Use that to control how long it takes to run
80             a single call to $longsteps->run_due_processes().
81              
82             Note that you can have an arbitrary number of processes all doing $longsteps->run_due_processes() AT THE SAME TIME.
83              
84             This will ensure that no process step is run more than one time.
85              
86             Default to 50.
87              
88             =back
89              
90             =head1 RESULTSET REQUIREMENTS
91              
92             The resultset to use with this storage MUST contain the following columns, constraints and indices:
93              
94             =over
95              
96             =item id
97              
98             A unique primary key auto incrementable identifier
99              
100             =item process_class
101              
102             A VARCHAR long enough to hold your L<Schedule::LongSteps::Process> class names. NOT NULL.
103              
104             =item what
105              
106             A VARCHAR long enough to hold the name of one of your steps. Can be NULL.
107              
108             =item status
109              
110             A VARCHAR(50) NOT NULL, defaults to 'pending'
111              
112             =item run_at
113              
114             A Datetime (or timestamp with timezone in PgSQL). Will hold a UTC Timezoned date of the next run. Default to NULL.
115              
116             Please index this so it is fast to select a range.
117              
118             =item run_id
119              
120             A CHAR or VARCHAR (at least 36). Default to NULL.
121              
122             Please index this so it is fast to select rows with a matching run_id
123              
124             =item state
125              
126             A Reasonably long TEXT field (or JSON field in supporting databases) capable of holding
127             a JSON dump of pure Perl data. NOT NULL.
128              
129             You HAVE to implement inflating and deflating yourself. See L<DBIx::Class::InflateColumn::Serializer::JSON>
130             or similar techniques.
131              
132             See t/fullblown.t for a full blown working example.
133              
134             =item error
135              
136             A reasonably long TEXT field capable of holding a full stack trace in case something goes wrong. Defaults to NULL.
137              
138             =back
139              
140             =cut
141              
142             =head2 prepare_due_processes
143              
144             See L<Schedule::LongSteps::Storage::DBIxClass>
145              
146             =cut
147              
148             sub prepare_due_processes{
149             my ($self) = @_;
150              
151             my $now = DateTime->now();
152             my $rs = $self->_get_resultset();
153             my $dtf = $self->schema()->storage()->datetime_parser();
154              
155             my $uuid = $self->uuid()->create_str();
156             $log->info("Creating batch ID $uuid");
157              
158              
159             # Note that we do not use the SELECT FOR UPDATE technique here.
160             # Instead this generates a single UPDATE statement like this one:
161             # UPDATE longsteps_process SET run_id = ?, status = ? WHERE ( id IN ( SELECT me.id FROM longsteps_process me WHERE ( ( run_at <= ? AND run_id IS NULL ) ) LIMIT ? ) )
162             my $stuff = sub{
163             $rs->search({
164             run_at => { '<=' => $dtf->format_datetime( $now ) },
165             run_id => undef,
166             }, {
167             rows => $self->limit_per_tick(),
168             } )
169             ->update({
170             run_id => $uuid,
171             status => 'running'
172             });
173             };
174             $stuff->();
175              
176             # And return them as individual results.
177             return $rs->search({
178             run_id => $uuid,
179             })->all();
180             }
181              
182             =head2 create_process
183              
184             See L<Schedule::LongSteps::Storage>
185              
186             This override adds retrying in case of deadlock detection.
187              
188             =cut
189              
190             sub create_process{
191             my ($self, $process_properties) = @_;
192             return $self->_retry_transaction(
193             sub{
194             return $self->_get_resultset()->create($process_properties);
195             });
196             }
197              
198             =head2 find_process
199              
200             See L<Schedule::LongSteps::Storage>
201              
202             =cut
203              
204             sub find_process{
205 0     0 1   my ($self, $process_id) = @_;
206 0           return $self->_get_resultset()->find({ id => $process_id });
207             }
208              
209             =head2 update_process
210              
211             Overrides L<Schedule::LongSteps::Storage#update_process> to add
212             some retrying in case of DB deadlock detection.
213              
214             =cut
215              
216             override 'update_process' => sub{
217             my ($self, $process, $properties) = @_;
218             return $self->_retry_transaction( sub{
219             $log->trace("Attempting to update process ".$process->id());
220             $process->update( $properties );
221             } );
222             };
223              
224             sub _retry_transaction{
225 0     0     my ($self, $code) = @_;
226              
227             my $retry = Action::Retry->new(
228             attempt_code => $code,
229             retry_if_code => sub{
230 0     0     my $exception = $_[0];
231             # The driver tells us to retry the transaction.
232             # For instance: https://dev.mysql.com/doc/refman/5.6/en/innodb-deadlocks-handling.html
233              
234             # Note that if this is false, then the Action::Retry code
235             # will set $@ to the last error and return whatever the code has returned (most
236             # probably undef in this case.
237             # This is managed by the error testing after the call to 'run'
238 0   0       return !! ( ( $exception || '' ) =~ m/try restarting transaction/ );
239             },
240 0           strategy => 'Fibonacci',
241             );
242 0           my $ret = $retry->run();
243 0 0         if( my $err = $@ ){
244 0           confess($err);
245             }
246 0           return $ret;
247             }
248              
249             __PACKAGE__->meta->make_immutable();