File Coverage

blib/lib/DBIx/BatchChunker.pm
Criterion Covered Total %
statement 349 372 93.8
branch 156 192 81.2
condition 52 79 65.8
subroutine 45 47 95.7
pod 3 4 75.0
total 605 694 87.1


line stmt bran cond sub pod time code
1             package DBIx::BatchChunker;
2              
3             our $AUTHORITY = 'cpan:GSG';
4             # ABSTRACT: Run large database changes safely
5 8     8   1763058 use version;
  8         15186  
  8         56  
6             our $VERSION = 'v1.0.2'; # VERSION
7              
8 8     8   8876 use Moo;
  8         76845  
  8         58  
9 8     8   24342 use MooX::StrictConstructor;
  8         99535  
  8         64  
10              
11 8     8   291895 use CLDR::Number;
  8         745548  
  8         504  
12              
13 8     8   8475 use Types::Standard qw( Str Bool Undef ArrayRef HashRef CodeRef InstanceOf Tuple Maybe Optional slurpy );
  8         986855  
  8         112  
14 8     8   43977 use Types::Numbers qw( NumRange UnsignedInt PerlSafeInt PositiveInt PositiveOrZeroNum );
  8         3128351  
  8         140  
15 8     8   33088 use Type::Utils;
  8         81496  
  8         130  
16              
17 8     8   16451 use List::Util 1.33 (qw( min max sum any first )); # has any/all/etc.
  8         225  
  8         918  
18 8     8   57 use Math::BigInt upgrade => 'Math::BigFloat';
  8         18  
  8         166  
19 8     8   734 use Math::BigFloat;
  8         20  
  8         102  
20 8     8   935 use POSIX qw( ceil );
  8         17  
  8         97  
21 8     8   675 use Scalar::Util qw( blessed weaken );
  8         15  
  8         455  
22 8     8   7016 use Term::ProgressBar 2.14; # with silent option
  8         763144  
  8         485  
23 8     8   859 use Time::HiRes qw( time sleep );
  8         1722  
  8         83  
24              
25 8     8   8905 use DBIx::BatchChunker::LoopState;
  8         48  
  8         535  
26              
27             # Don't export the above, but don't conflict with StrictConstructor, either
28 8     8   82 use namespace::clean -except => [qw< new meta >];
  8         15  
  8         113  
29              
30             # This is now an unused, dummy variable
31             our $DB_MAX_ID = ~0;
32              
33             #pod =encoding utf8
34             #pod
35             #pod =head1 SYNOPSIS
36             #pod
37             #pod use DBIx::BatchChunker;
38             #pod
39             #pod my $account_rs = $schema->resultset('Account')->search({
40             #pod account_type => 'deprecated',
41             #pod });
42             #pod
43             #pod my %params = (
44             #pod chunk_size => 5000,
45             #pod target_time => 5,
46             #pod
47             #pod rs => $account_rs,
48             #pod id_name => 'account_id',
49             #pod
50             #pod coderef => sub { $_[1]->delete },
51             #pod sleep => 1,
52             #pod verbose => 1,
53             #pod
54             #pod progress_name => 'Deleting deprecated accounts',
55             #pod process_past_max => 1,
56             #pod );
57             #pod
58             #pod # EITHER:
59             #pod # 1) Automatically construct and execute the changes:
60             #pod
61             #pod DBIx::BatchChunker->construct_and_execute(%params);
62             #pod
63             #pod # OR
64             #pod # 2) Manually construct and execute the changes:
65             #pod
66             #pod my $batch_chunker = DBIx::BatchChunker->new(%params);
67             #pod
68             #pod $batch_chunker->calculate_ranges;
69             #pod $batch_chunker->execute;
70             #pod
71             #pod =head1 DESCRIPTION
72             #pod
73             #pod This utility class is for running a large batch of DB changes in a manner that doesn't
74             #pod cause huge locks, outages, and missed transactions. It's highly flexible to allow for
75             #pod many different kinds of change operations, and dynamically adjusts chunks to its
76             #pod workload.
77             #pod
78             #pod It works by splitting up DB operations into smaller chunks within a loop. These chunks
79             #pod are transactionalized, either naturally as single-operation bulk work or by the loop
80             #pod itself. The full range is calculated beforehand to get the right start/end points.
81             #pod A L<progress bar|/Progress Bar Attributes> will be created to let the deployer know the
82             #pod processing status.
83             #pod
84             #pod There are two ways to use this class: call the automatic constructor and executor
85             #pod (L</construct_and_execute>) or manually construct the object and call its methods. See
86             #pod L</SYNOPSIS> for examples of both.
87             #pod
88             #pod B<DISCLAIMER:> You should not rely on this class to magically fix any and all locking
89             #pod problems the DB might experience just because it's being used. Thorough testing and
90             #pod best practices are still required.
91             #pod
92             #pod =head2 Processing Modes
93             #pod
94             #pod This class has several different modes of operation, depending on what was passed to
95             #pod the constructor:
96             #pod
97             #pod =head3 DBIC Processing
98             #pod
99             #pod If both L</rs> and L</coderef> are passed, a chunk ResultSet is built from the base
100             #pod ResultSet, to add in a C<BETWEEN> clause, and the new ResultSet is passed into the
101             #pod coderef. The coderef should run some sort of active ResultSet operation from there.
102             #pod
103             #pod An L</id_name> should be provided, but if it is missing it will be looked up based on
104             #pod the primary key of the ResultSource.
105             #pod
106             #pod If L</single_rows> is also enabled, then each chunk is wrapped in a transaction and the
107             #pod coderef is called for each row in the chunk. In this case, the coderef is passed a
108             #pod Result object instead of the chunk ResultSet.
109             #pod
110             #pod Note that whether L</single_rows> is enabled or not, the coderef execution is encapsulated
111             #pod in DBIC's retry logic, so any failures will re-connect and retry the coderef. Because of
112             #pod this, any changes you make within the coderef should be idempotent, or should at least be
113             #pod able to skip over any already-processed rows.
114             #pod
115             #pod =head3 Active DBI Processing
116             #pod
117             #pod If an L</stmt> (DBI statement handle args) is passed without a L</coderef>, the statement
118             #pod handle is merely executed on each iteration with the start and end IDs. It is assumed
119             #pod that the SQL for the statement handle contains exactly two placeholders for a C<BETWEEN>
120             #pod clause. For example:
121             #pod
122             #pod my $update_stmt = q{
123             #pod UPDATE
124             #pod accounts a
125             #pod JOIN account_updates au USING (account_id)
126             #pod SET
127             #pod a.time_stamp = au.time_stamp
128             #pod WHERE
129             #pod a.account_id BETWEEN ? AND ? AND
130             #pod a.time_stamp != au.time_stamp
131             #pod });
132             #pod
133             #pod The C<BETWEEN> clause should, of course, match the IDs being used in the loop.
134             #pod
135             #pod The statement is run with L for retry protection. Therefore, the
136             #pod statement should also be idempotent.
137             #pod
138             #pod =head3 Query DBI Processing
139             #pod
140             #pod If both a L and a L are passed, the statement handle is prepared and
141             #pod executed. Like the L mode, the SQL for the statement should
142             #pod contain exactly two placeholders for a C clause. Then the C<$sth> is passed to
143             #pod the coderef. It's up to the coderef to extract data from the executed statement handle,
144             #pod and do something with it.
145             #pod
146             #pod If C is enabled, each chunk is wrapped in a transaction and the coderef is
147             #pod called for each row in the chunk. In this case, the coderef is passed a hashref of the
148             #pod row instead of the executed C<$sth>, with lowercase alias names used as keys.
149             #pod
150             #pod Note that in both cases, the coderef execution is encapsulated in a L
151             #pod call to either C or C (using L), so any failures will
152             #pod re-connect and retry the coderef. Because of this, any changes you make within the
153             #pod coderef should be idempotent, or should at least be able to skip over any
154             #pod already-processed rows.
155             #pod
156             #pod =head3 DIY Processing
157             #pod
158             #pod If a L is passed but neither a C nor a C are passed, then the
159             #pod multiplier loop does not touch the database. The coderef is merely passed the start and
160             #pod end IDs for each chunk. It is expected that the coderef will run through all database
161             #pod operations using those start and end points.
162             #pod
163             #pod It's still valid to include L, L, and/or L in the
164             #pod constructor to enable features like L or
165             #pod L.
166             #pod
167             #pod If you're not going to include any min/max statements for L, you will
168             #pod need to set L and L yourself, either in the constructor or before the
169             #pod L call. Using L is also not an option in this case, as
170             #pod this tries to call L without a way to do so.
171             #pod
172             #pod =head3 TL;DR Version
173             #pod
174             #pod $stmt = Active DBI Processing
175             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
176             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
177             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
178             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
179             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
180             #pod
181             #pod =head1 ATTRIBUTES
182             #pod
183             #pod See the L section for more in-depth descriptions of these attributes and their
184             #pod usage.
185             #pod
186             #pod =head2 DBIC Processing Attributes
187             #pod
188             #pod =head3 rs
189             #pod
190             #pod A L. This is used by all methods as the base ResultSet onto which
191             #pod the DB changes will be applied. Required for DBIC processing.
192             #pod
193             #pod =cut
194              
195             has rs => (
196             is => 'ro',
197             isa => InstanceOf['DBIx::Class::ResultSet'],
198             required => 0,
199             );
200              
201             #pod =head3 rsc
202             #pod
203             #pod A L. This is only used to override L for min/max
204             #pod calculations. Optional.
205             #pod
206             #pod =cut
207              
208             has rsc => (
209             is => 'ro',
210             isa => InstanceOf['DBIx::Class::ResultSetColumn'],
211             required => 0,
212             );
213              
214             #pod =head3 count_rs
215             #pod
216             #pod A L, only used to override L for row counting calculations.
217             #pod For 99.9% of cases, you do not need to set this. Though, it could be used for the rare
218             #pod case where the original ResultSet would run into indexing problems with its row counting
219             #pod statement and needs something broader to compensate.
220             #pod
221             #pod B Do not set this unless you know what you're doing. Having a different C
222             #pod ResultSet from the base ResultSet means that the row counts to size up the chunk workload
223             #pod will be different from the workload itself. If the row counts are too high, you may end
224             #pod up with workloads that are too quick and L may compensate
225             #pod with an overly large chunk size, anyway. If the row counts are too low, you risk having
226             #pod an oversized chunk that gets processed and locks rows for too long, and chunk resizing may
227             #pod even skip blocks that it thinks have no rows to process.
228             #pod
229             #pod =cut
230              
231             has count_rs => (
232             is => 'ro',
233             isa => InstanceOf['DBIx::Class::ResultSet'],
234             required => 0,
235             );
236              
237             #pod =head3 dbic_retry_opts
238             #pod
239             #pod A hashref of DBIC retry options. These options control how retry protection works within
240             #pod DBIC. So far, there are two supported options:
241             #pod
242             #pod max_attempts = Number of times to retry
243             #pod retry_handler = Coderef that returns true to continue to retry or false to re-throw
244             #pod the last exception
245             #pod
246             #pod The default is to let the DBIC storage engine handle its own protection, which will retry
247             #pod once if the DB connection was disconnected. If you specify any options, even a blank
248             #pod hashref, BatchChunker will fill in a default C of 10, and an always-true
249             #pod C. This is similar to L's defaults.
250             #pod
251             #pod Under the hood, these are options that are passed to the as-yet-undocumented
252             #pod L. The C has access to the same
253             #pod BlockRunner object (passed as its only argument) and its methods/accessors, such as C,
254             #pod C, and C.
255             #pod
256             #pod =cut
257              
258             has dbic_retry_opts => (
259             is => 'ro',
260             isa => HashRef,
261             required => 0,
262             predicate => '_has_dbic_retry_opts',
263             );
264              
265             sub _dbic_block_runner {
                # Runs $coderef against the DBIC storage object, with or without custom retry
                # protection.
                #
                # Arguments:
                #   $method  - the string 'txn' requests a transaction-wrapped run; any other
                #              value requests a plain (non-transactional) run
                #   $coderef - the code to execute
                #
                # Returns whatever the underlying txn_do / dbh_do / BlockRunner->run call returns.
                #
                # Without dbic_retry_opts, the call is delegated straight to the storage object's
                # own txn_do or dbh_do.  With dbic_retry_opts (even an empty hashref), a
                # BlockRunner is built with a default of 10 attempts and an always-true retry
                # handler; the caller's options may override those two defaults, but not the
                # storage/wrap_txn settings, which are listed last and therefore win.
266 854     854   6449 my ($self, $method, $coderef) = @_;
267              
268 854         3500 my $storage = $self->dbic_storage;
269              
270             # Block running disabled
271 854 100       4818 unless ($self->_has_dbic_retry_opts) {
272 682 100       3458 return $storage->txn_do($coderef) if $method eq 'txn';
273 604         6983 return $storage->dbh_do($coderef);
274             }
275              
276             # A very light wrapper around BlockRunner. No need to load BlockRunner, since DBIC
277             # loads it in before us if we're using this method.
278             DBIx::Class::Storage::BlockRunner->new(
279             # in case they are not defined with a custom dbic_retry_opts
280             max_attempts => 10,
281 106     106   122496 retry_handler => sub { 1 },
282              
283             # never overrides the important ones below
284 172 100       1009 %{ $self->dbic_retry_opts },
  172         8654  
285              
286             storage => $storage,
287             wrap_txn => ($method eq 'txn' ? 1 : 0),
288             )->run($coderef);
289             }
290              
291             #pod =head2 DBI Processing Attributes
292             #pod
293             #pod =head3 dbi_connector
294             #pod
295             #pod A L object. Instead of L statement handles, this is the
296             #pod recommended way for BatchChunker to interface with the DBI, as it handles retries on
297             #pod failures. The connection mode used is whatever default is set within the object.
298             #pod
299             #pod Required for DBI Processing, unless L is specified.
300             #pod
301             #pod =cut
302              
303             has dbi_connector => (
304             is => 'ro',
305             isa => InstanceOf['DBIx::Connector::Retry'],
306             required => 0,
307             );
308              
309             #pod =head3 dbic_storage
310             #pod
311             #pod A DBIC storage object, as an alternative for L. There may be times when
312             #pod you want to run plain DBI statements, but are still using DBIC. In these cases, you
313             #pod don't have to create a L object to run those statements.
314             #pod
315             #pod This uses a BlockRunner object for retry protection, so the options in
316             #pod L would apply here.
317             #pod
318             #pod Required for DBI Processing, unless L is specified.
319             #pod
320             #pod =cut
321              
322             has dbic_storage => (
323             is => 'ro',
324             isa => InstanceOf['DBIx::Class::Storage::DBI'],
325             required => 0,
326             );
327              
328             #pod =head3 min_stmt
329             #pod
330             #pod =head3 max_stmt
331             #pod
332             #pod SQL statement strings or an arrayref of parameters for L.
333             #pod
334             #pod When executed, these statements should each return a single value, either the minimum or
335             #pod maximum ID that will be affected by the DB changes. These are used by
336             #pod L. Required if using either type of DBI Processing.
337             #pod
338             #pod =cut
339              
340             my $SQLStringOrSTHArgs_type = Type::Utils::declare(
341             name => 'SQLStringOrSTHArgs',
342             # Allow an SQL string, an optional hashref/undef, and any number of strings/undefs
343             parent => Tuple->parameterize(Str, Optional[Maybe[HashRef]], slurpy ArrayRef[Maybe[Str]]),
344             coercion => sub { $_ = [ $_ ] if Str->check($_); $_ },
345             message => sub { 'Must be either an SQL string or an arrayref of parameters for $sth creation (SQL + hashref/undef + binds)' },
346             );
347              
348             has min_stmt => (
349             is => 'ro',
350             isa => $SQLStringOrSTHArgs_type,
351             required => 0,
352             coerce => 1,
353             );
354              
355             has max_stmt => (
356             is => 'ro',
357             isa => $SQLStringOrSTHArgs_type,
358             required => 0,
359             coerce => 1,
360             );
361              
362             #pod =head3 stmt
363             #pod
364             #pod A SQL statement string or an arrayref of parameters for L + binds.
365             #pod
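366             #pod If using L</Active DBI Processing> (no coderef), this is a L<do-able|DBI/do> statement
367             #pod (usually DML like C<INSERT/UPDATE/DELETE>). If using L</Query DBI Processing> (with
368             #pod coderef), this is a passive DQL (C<SELECT>) statement.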
369             #pod
370             #pod In either case, the statement should contain C placeholders, which will be
371             #pod executed with the start/end ID points. If there are already bind placeholders in the
372             #pod arrayref, then make sure the C bind points are last on the list.
373             #pod
374             #pod Required for DBI Processing.
375             #pod
376             #pod =cut
377              
378             has stmt => (
379             is => 'ro',
380             isa => $SQLStringOrSTHArgs_type,
381             required => 0,
382             coerce => 1,
383             );
384              
385             #pod =head3 count_stmt
386             #pod
387             #pod A C<SELECT COUNT> SQL statement string or an arrayref of parameters for
388             #pod L<DBI/selectrow_array>.
389             #pod
390             #pod Like L, it should contain C placeholders. In fact, the SQL should look
391             #pod exactly like the L query, except with C instead of the column list.
392             #pod
393             #pod Used only for L. Optional, but recommended for
394             #pod L.
395             #pod
396             #pod =cut
397              
398             has count_stmt => (
399             is => 'ro',
400             isa => $SQLStringOrSTHArgs_type,
401             required => 0,
402             coerce => 1,
403             );
404              
405             #pod =head2 Progress Bar Attributes
406             #pod
407             #pod =head3 progress_bar
408             #pod
409             #pod The progress bar used for all methods. This can be specified right before the method
410             #pod call to override the default used for that method. Unlike most attributes, this one
411             #pod is read-write, so it can be switched on-the-fly.
412             #pod
413             #pod Don't forget to remove or switch to a different progress bar if you want to use a
414             #pod different one for another method:
415             #pod
416             #pod $batch_chunker->progress_bar( $calc_pb );
417             #pod $batch_chunker->calculate_ranges;
418             #pod $batch_chunker->progress_bar( $loop_pb );
419             #pod $batch_chunker->execute;
420             #pod
421             #pod All of this is optional. If the progress bar isn't specified, the method will create
422             #pod a default one. If the terminal isn't interactive, the default L will
423             #pod be set to C to naturally skip the output.
424             #pod
425             #pod =cut
426              
427             has progress_bar => (
428             is => 'rw',
429             isa => InstanceOf['Term::ProgressBar'],
430             );
431              
432             #pod =head3 progress_name
433             #pod
434             #pod A string used by L to assist in creating a progress bar. Ignored if
435             #pod L is already specified.
436             #pod
437             #pod This is the preferred way of customizing the progress bar without having to create one
438             #pod from scratch.
439             #pod
440             #pod =cut
441              
442             has progress_name => (
443             is => 'rw',
444             isa => Str,
445             required => 0,
446             lazy => 1,
447             default => sub {
448             my $rs = shift->rs;
449             'Processing'.(defined $rs ? ' '.$rs->result_source->name : '');
450             },
451             );
452              
453             #pod =head3 cldr
454             #pod
455             #pod A L object. English speakers that use a typical C<1,234.56> format would
456             #pod probably want to leave it at the default. Otherwise, you should provide your own.
457             #pod
458             #pod =cut
459              
460             has cldr => (
461             is => 'rw',
462             isa => InstanceOf['CLDR::Number'],
463             required => 0,
464             lazy => 1,
465             default => sub { CLDR::Number->new(locale => 'en') },
466             );
467              
468             #pod =head3 verbose
469             #pod
470             #pod Boolean. By default, this is on, which displays timing stats on each chunk, as well as
471             #pod total numbers. This is still subject to non-interactivity checks from L.
472             #pod
473             #pod (This was previously defaulted to off, and called C, prior to v1.0.0.)
474             #pod
475             #pod =for Pod::Coverage debug
476             #pod
477             #pod =cut
478              
479             has verbose => (
480             is => 'rw',
481             isa => Bool,
482             required => 0,
483             default => 1,
484             );
485              
486             # Backwards-compatibility
487             *debug = \&verbose;
488              
489             #pod =head2 Common Attributes
490             #pod
491             #pod =head3 id_name
492             #pod
493             #pod The column name used as the iterator in the processing loops. This should be a primary
494             #pod key or integer-based (indexed) key, tied to the L.
495             #pod
496             #pod Optional. Used mainly in DBIC processing. If not specified, it will look up
497             #pod the first primary key column from L and use that.
498             #pod
499             #pod This can still be specified for other processing modes to use in progress bars.
500             #pod
501             #pod =cut
502              
503             has id_name => (
504             is => 'rw',
505             isa => Str,
506             required => 0,
507             trigger => \&_fix_id_name,
508             );
509              
510             sub _fix_id_name {
                # Trigger for the id_name attribute: qualifies a bare column name with the
                # ResultSet's current source alias ("alias.column").
                #
                # Nothing is changed when the name is empty/false, already contains a dot, or no
                # rs is defined.  Writing the qualified name back through the accessor fires this
                # trigger again; the dot check is what makes that second call return immediately.
511 68     68   9258 my ($self, $id_name) = @_;
512 68 100 100     8658 return if !$id_name || $id_name =~ /\./ || !defined $self->rs; # prevent an infinite trigger loop
      100        
513 14         86 $self->id_name( $self->rs->current_source_alias.".$id_name" );
514             }
515              
516             #pod =head3 coderef
517             #pod
518             #pod The coderef that will be called either on each chunk or each row, depending on how
519             #pod L is set. The first input is always the BatchChunker object. The rest
520             #pod vary depending on the processing mode:
521             #pod
522             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
523             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
524             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
525             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
526             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
527             #pod
528             #pod The loop does not monitor the return values from the coderef.
529             #pod
530             #pod Required for all processing modes except L.
531             #pod
532             #pod =cut
533              
534             has coderef => (
535             is => 'ro',
536             isa => CodeRef,
537             required => 0,
538             );
539              
540             #pod =head3 chunk_size
541             #pod
542             #pod The amount of rows to be processed in each loop.
543             #pod
544             #pod This figure should be sized to keep per-chunk processing time at around 5 seconds. If
545             #pod this is too large, rows may lock for too long. If it's too small, processing may be
546             #pod unnecessarily slow.
547             #pod
548             #pod Default is 1 row, which is only appropriate if L (on by default) is
549             #pod enabled. This will cause the processing to slowly ramp up to the target time as
550             #pod BatchChunker gathers more data.
551             #pod
552             #pod Otherwise, if you are using static chunk sizes with C<target_time> turned off, figure out
553             #pod the right chunk size with a few test runs and set it here.
554             #pod
555             #pod (This was previously defaulted to 1000 rows, prior to v1.0.0.)
556             #pod
557             #pod =cut
558              
559             has chunk_size => (
560             is => 'rw',
561             isa => PositiveInt,
562             required => 0,
563             default => 1,
564             );
565              
566             #pod =head3 target_time
567             #pod
568             #pod The target runtime (in seconds) that chunk processing should strive to achieve, not
569             #pod including L. If the chunk processing times are too high or too low, this will
570             #pod dynamically adjust L to try to match the target.
571             #pod
572             #pod BatchChunker will still use the initial C, and it will need at least one
573             #pod chunk processed, before it makes adjustments. If the starting chunk size is grossly
574             #pod inaccurate to the workload, you could end up with several chunks in the beginning causing
575             #pod long-lasting locks before the runtime targeting reduces them down to a reasonable size.
576             #pod
577             #pod (Chunk size reductions are prioritized before increases, so it should re-size as soon as
578             #pod it finds the problem. But, one bad chunk could be all it takes to cause an outage.)
579             #pod
580             #pod Default is 5 seconds. Set this to zero to turn off runtime targeting. (This was
581             #pod previously defaulted to off prior to v0.92, and set to 15 in v0.92.)
582             #pod
583             #pod =cut
584              
585             has target_time => (
586             is => 'ro',
587             isa => PositiveOrZeroNum,
588             required => 0,
589             default => 5,
590             );
591              
592             #pod =head3 sleep
593             #pod
594             #pod The number of seconds to sleep after each chunk. It uses L's version, so
595             #pod fractional numbers are allowed.
596             #pod
597             #pod Default is 0.5 seconds, which is fine for most operations. You can likely get away with
598             #pod zero for smaller operations, but test it out first. If processing is going to take up a
599             #pod lot of disk I/O, you may want to consider a higher setting. If the database server
600             #pod spends too much time on processing, the replicas may have a hard time keeping up with
601             #pod standard load.
602             #pod
603             #pod This will increase the overall processing time of the loop, so try to find a balance
604             #pod between the two.
605             #pod
606             #pod (This was previously defaulted to 0 seconds, prior to v1.0.0.)
607             #pod
608             #pod =cut
609              
610             has 'sleep' => (
611             is => 'ro',
612             isa => PositiveOrZeroNum,
613             required => 0,
614             default => 0.5,
615             );
616              
617             #pod =head3 max_runtime
618             #pod
619             #pod The number of seconds that the entire process is allowed to run. If you have a
620             #pod long-running I operation that you don't want to run for days, you can set
621             #pod this attribute, execute the operation, and run it again at a later date. The L
622             #pod will be set to the last-used ID, so the operation can be continued with another
623             #pod L call. Or you can use this to figure out if it finished or not.
624             #pod
625             #pod Turned off by default. If you use this, you should add in extra multiplier operations to
626             #pod separate out the time math, like C<6 * 60 * 60> for 6 hours.
627             #pod
628             #pod =cut
629              
630             has max_runtime => (
631             is => 'ro',
632             isa => PositiveOrZeroNum,
633             required => 0,
634             default => 0,
635             );
636              
637             #pod =head3 process_past_max
638             #pod
639             #pod Boolean that controls whether to check past the L during the loop. If the loop
640             #pod hits the end point, it will run another maximum ID check in the DB, and adjust C
641             #pod accordingly. If it somehow cannot run a DB check (no L or L available,
642             #pod for example), the last chunk will just be one at the end of C<< max_id + chunk_size >>.
643             #pod
644             #pod This is useful if the entire table is expected to be processed, and you don't want to
645             #pod miss any new rows that come up between L</calculate_ranges> and the end of the loop.
646             #pod
647             #pod Turned off by default.
648             #pod
649             #pod =cut
650              
651             has process_past_max => (
                # Read-only, optional boolean flag; off (0) by default.
652             is => 'ro',
653             isa => Bool,
654             required => 0,
655             default => 0,
656             );
657              
658             #pod =head3 single_rows
659             #pod
660             #pod Boolean that controls whether single rows are passed to the L</coderef> or the chunk's
661             #pod ResultSets/statement handle is passed.
662             #pod
663             #pod Since running single-row operations in a DB is painfully slow (compared to bulk
664             #pod operations), this also controls whether the entire set of coderefs are encapsulated into
665             #pod a DB transaction. Transactionalizing the entire chunk brings the speed, and atomicity,
666             #pod back to what a bulk operation would be. (Bulk operations are still faster, but you can't
667             #pod do anything you want in a single DML statement.)
668             #pod
669             #pod Used only by L and L.
670             #pod
671             #pod =cut
672              
673             has single_rows => (
                # Read-only, optional boolean flag; off (0) by default. When true, the block
                # processor feeds rows to the coderef one at a time inside a transaction.
674             is => 'ro',
675             isa => Bool,
676             required => 0,
677             default => 0,
678             );
679              
680             #pod =head3 min_chunk_percent
681             #pod
682             #pod The minimum row count, as a percentage of L</chunk_size>. This value is actually
683             #pod expressed in decimal form, i.e.: between 0 and 1.
684             #pod
685             #pod This value will be used to determine when to process, skip, or expand a block, based on
686             #pod a count query. The default is C<0.5> or 50%, which means that it will try to expand the
687             #pod block to a larger size if the row count is less than 50% of the chunk size. Zero-sized
688             #pod blocks will be skipped entirely.
689             #pod
690             #pod This "chunk resizing" is useful for large regions of the table that have been deleted, or
691             #pod when the incrementing ID has large gaps in it for other reasons. Wasting time on
692             #pod numerical gaps that span millions can slow down the processing considerably, especially
693             #pod if L is enabled.
694             #pod
695             #pod If this needs to be disabled, set this to 0. The maximum chunk percentage does not have
696             #pod a setting and is hard-coded at C<< 100% + min_chunk_percent >>.
697             #pod
698             #pod If DBIC processing isn't used, L</count_stmt> is also required to enable chunk resizing.
699             #pod
700             #pod =cut
701              
702             has min_chunk_percent => (
                # Read-only, optional attribute; defaults to 0.5. The type is declared inline:
                # a NumRange limited to 0..1, named 'PositiveZeroToOneNum', with its own
                # validation failure message.
703             is => 'ro',
704             isa => Type::Utils::declare(
705             name => 'PositiveZeroToOneNum',
706             parent => NumRange->parameterize(0, 1),
707             message => sub { 'Must be a number between 0 and 1' },
708             ),
709             required => 0,
710             default => 0.5,
711             );
712              
713             #pod =head3 min_id
714             #pod
715             #pod =head3 max_id
716             #pod
717             #pod Used by L</execute> to figure out the main start and end points. Calculated by
718             #pod L</calculate_ranges>.
719             #pod
720             #pod Manually setting this is not recommended, as each database is different and the
721             #pod information may have changed between the DB change development and deployment. Instead,
722             #pod use L</calculate_ranges> to fill in these values right before running the loop.
723             #pod
724             #pod When the operation is finished, C<min_id> will be set to the last processed ID, just in
725             #pod case it was stopped early and needs to be restarted (eg: L</max_runtime> is set).
726             #pod Alternately, you can run L</calculate_ranges> again to confirm from the database.
727             #pod
728             #pod =cut
729              
730             has min_id => (
                # Read-write lower bound of the ID range; unsigned integer, no default.
                # Written by calculate_ranges, and by execute with the last processed end ID.
731             is => 'rw',
732             isa => UnsignedInt,
733             );
734              
735             has max_id => (
                # Read-write upper bound of the ID range; unsigned integer, no default.
                # Written by calculate_ranges.
736             is => 'rw',
737             isa => UnsignedInt,
738             );
739              
740             # Big number handling
741             has _use_bignums => (
                # Private read-write boolean flag, off by default. Writing to it fires the
                # _upgrade_attrs_to_bigint trigger.
742             is => 'rw',
743             isa => Bool,
744             default => 0,
745             trigger => \&_upgrade_attrs_to_bigint,
746             );
747              
                # Attribute names inspected by _check_bignums and converted by
                # _upgrade_attrs_to_bigint: the BC list is read from this object, the LS list
                # is read from the loop_state object.
748             my @BIGNUM_BC_ATTRS = (qw< chunk_size min_id max_id >);
749             my @BIGNUM_LS_ATTRS = (qw< start end prev_end multiplier_range multiplier_step chunk_size chunk_count >);
750              
751             sub _check_bignums {
                # Decides whether big-number handling is needed and turns it on if so.
                #
                # Arguments: the invocant, followed by any number of extra values to inspect
                # (undefined values are ignored).
                # Returns: 1 if _use_bignums was already on; otherwise a true value only when
                # some inspected value is a blessed object or fails the PerlSafeInt check.
                # Side effect: sets _use_bignums(1) in that case, which fires that attribute's
                # trigger.
752 839     839   3342 my ($self) = shift;
753 839 100       36885 return 1 if $self->_use_bignums; # already checked these
754              
755             # Auto-set _use_bignums if we detect that we need it
756 658         7399 my $set_bignums = 0;
757              
758             # If other values are passed, check those, too
                # (@_ now holds only the extra values, because $self was shifted off above)
759 658         3057 foreach my $val (@_) {
760 621 100       4972 next unless defined $val;
761 454 50 33     4249 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
762             }
763              
764             # Check BatchChunker attributes
765 658         21770 foreach my $attr (@BIGNUM_BC_ATTRS) {
766 1974         89610 my $val = $self->$attr();
767 1974 100       43972 next unless defined $val;
768 1844 100 66     14626 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
769             }
770              
771             # Check LoopState attributes
                # (only when a loop_state object is currently set)
772 658 100       30594 if (my $ls = $self->loop_state) {
773 563         5465 foreach my $attr (@BIGNUM_LS_ATTRS) {
774 3941         187781 my $val = $ls->$attr();
775 3941 100       44144 next unless defined $val;
776 3321 100 66     11761 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
777             }
778             }
779              
780 658 100       2861 $self->_use_bignums(1) if $set_bignums;
781 658         3059 return $set_bignums;
782             }
783              
784             sub _upgrade_attrs_to_bigint {
                # Trigger for _use_bignums: converts the tracked numeric attributes to
                # Math::BigInt objects.
                #
                # Arguments: the invocant and the new flag value. Does nothing when the flag
                # is false. Undefined and already-blessed values are left as they are.
785 5     5   185 my ($self, $is_on) = @_;
786 5 50       20 return unless $is_on;
787              
788             # Fix BatchChunker attributes
789 5         24 foreach my $attr (@BIGNUM_BC_ATTRS) {
790 15         13212 my $val = $self->$attr();
791 15 100       124 next unless defined $val; # nothing to upgrade
792 11 100       46 next if blessed $val; # already upgraded
793 9         131 $self->$attr( Math::BigInt->new($val) );
794             }
795              
796             # Fix LoopState attributes
                # (skipped entirely when no loop_state object is set)
797 5         4090 my $ls = $self->loop_state;
798 5 100       112 return unless $ls;
799 3         11 foreach my $attr (@BIGNUM_LS_ATTRS) {
800 21         29549 my $val = $ls->$attr();
801 21 100       233 next unless defined $val; # nothing to upgrade
802 15 50       42 next if blessed $val; # already upgraded
803 15         62 $ls->$attr( Math::BigInt->new($val) );
804             }
805             }
806              
807             #pod =head3 loop_state
808             #pod
809             #pod A L<DBIx::BatchChunker::LoopState> object designed to hold variables during the
810             #pod processing loop. The object will be cleared out after use. Most of the complexity is
811             #pod needed for chunk resizing.
812             #pod
813             #pod =cut
814              
815             has loop_state => (
                # Read-write holder for a DBIx::BatchChunker::LoopState object. It cannot be
                # passed to the constructor (init_arg is undef) and is removed with
                # clear_loop_state.
816             is => 'rw',
817             isa => InstanceOf['DBIx::BatchChunker::LoopState'],
818             required => 0,
819             init_arg => undef,
820             clearer => 'clear_loop_state',
821             );
822              
823             # Backwards-compatibility
                # (the old private name _loop_state is an alias for the same accessor)
824             *_loop_state = \&loop_state;
825              
826             #pod =for Pod::Coverage BUILDARGS BUILD
827             #pod
828             #pod =cut
829              
830             around BUILDARGS => sub {
                # Constructor-argument preprocessing. Accepts either a single hashref or a
                # flat key/value list, fills in arguments that can be derived from others,
                # converts the legacy sth/*_sth options into stmt/*_stmt plus a connector
                # object, dies when a required combination of arguments is missing, and then
                # passes the adjusted arguments on to the wrapped BUILDARGS.
831             my $next = shift;
832             my $class = shift;
833              
834             my %args = @_ == 1 ? %{ $_[0] } : @_;
835              
836             # debug -> verbose
                # NOTE(review): //= only evaluates the delete when verbose is undefined, so if
                # both verbose and debug are passed, the debug key stays in %args. Confirm
                # whether that is acceptable under MooX::StrictConstructor.
837             $args{verbose} //= delete $args{debug} if exists $args{debug};
838              
839             # Auto-building of rsc and id_name can be a weird dependency dance, so it's better to
840             # handle it here.
                # Three cases: rsc without id_name (read the name from the rsc), rs plus
                # id_name (build the rsc), or rs alone (use the first primary key column).
841             my ($rsc, $rs, $id_name) = @args{qw< rsc rs id_name >};
842             if (defined $rsc && !$id_name) {
843             $args{id_name} = $rsc->{_as};
844             }
845             elsif (!defined $rsc && $id_name && defined $rs) {
846             $args{rsc} = $rs->get_column( $args{id_name} );
847             }
848             elsif (!defined $rsc && !$id_name && defined $rs) {
849             $args{id_name} = ($rs->result_source->primary_columns)[0];
850             $args{rsc} = $rs->get_column( $args{id_name} );
851             }
852             $rsc = $args{rsc};
853              
854             # Default count_rs is just $rs
855             $args{count_rs} //= $rs if defined $rs;
856              
857             # Auto-add dbic_storage, if available
858             if (!defined $args{dbic_storage} && (defined $rs || defined $rsc)) {
859             $args{dbic_storage} = defined $rs ? $rs->result_source->storage : $rsc->_resultset->result_source->storage;
860             }
861              
862             # Find something to use as a dbi_connector, if it doesn't already exist
                # @new_attrs holds the same names with the trailing "sth" replaced by "stmt".
                # $example_key is the first legacy option that was passed with a true value.
863             my @old_attrs = qw< sth min_sth max_sth count_sth >;
864             my @new_attrs = map { my $k = $_; $k =~ s/sth$/stmt/; $k } @old_attrs;
865             my $example_key = first { $args{$_} } @old_attrs;
866             if ($example_key && !defined $args{dbi_connector}) {
867             warn join "\n",
868             'The sth/*_sth options are now considered legacy usage in DBIx::BatchChunker. Because there is no',
869             'way to re-acquire the password, any attempt to reconnect will fail. Please use dbi_connector and',
870             'stmt/*_stmt instead for reconnection support.',
871             ''
872             ;
873              
874             # NOTE: There was a way to monkey-patch _connect to use $dbh->clone, but I've considered it
875             # too intrusive of a solution to use. Better to demand that the user switch to the new
876             # attributes, but have something that still works in most cases.
877              
878             # Attempt to build some sort of Connector object
                # The database handle is taken from the legacy statement handle, and the DSN
                # and username are rebuilt from that handle's own attributes.
879             require DBIx::Connector::Retry;
880             my $dbh = $args{$example_key}->{Database};
881              
882             my $conn = DBIx::Connector::Retry->new(
883             connect_info => [
884             join(':', 'dbi', $dbh->{Driver}{Name}, $dbh->{Name}),
885             $dbh->{Username},
886             '', # XXX: Can't acquire the password
887             # Sane %attr defaults on the off-chance that it actually re-connects
888             { AutoCommit => 1, RaiseError => 1 },
889             ],
890              
891             # Do not disconnect on DESTROY. The $dbh might still be used post-run.
892             disconnect_on_destroy => 0,
893             );
894              
895             # Pretend $conn->_connect was called and store our pre-existing $dbh
                # (writes directly into the connector's internal hash keys; the thread ID is
                # only recorded when threads.pm has been loaded)
896             $conn->{_pid} = $$;
897             $conn->{_tid} = threads->tid if $INC{'threads.pm'};
898             $conn->{_dbh} = $dbh;
899             $conn->driver;
900              
901             $args{dbi_connector} = $conn;
902             }
903              
904             # Handle legacy options for sth/*_sth
                # Each legacy handle is removed from %args and replaced by a one-element
                # arrayref holding its SQL text, unless the new-style option was already given.
905             foreach my $old_attr (grep { $args{$_} } @old_attrs) {
906             my $new_attr = $old_attr;
907             $new_attr =~ s/sth$/stmt/;
908              
909             my $sth = delete $args{$old_attr};
910             $args{$new_attr} ||= [ $sth->{Statement} ];
911             }
912              
913             # Now check to make sure dbi_connector is available for DBI processing
                # Dies only when some *_stmt option is set while neither dbi_connector nor
                # dbic_storage is defined.
914             die 'DBI processing requires a dbi_connector or dbic_storage attribute!' if (
915             !(defined $args{dbi_connector} || defined $args{dbic_storage}) &&
916             (defined first { $args{$_} } @new_attrs)
917             );
918              
919             # Other sanity checks
920             die 'Range calculations require one of these attr sets: rsc, rs, or dbi_connector|dbic_storage + min_stmt + max_stmt' unless (
921             defined $args{rsc} ||
922             (defined $args{min_stmt} && defined $args{max_stmt}) ||
923             (!defined $args{dbi_connector} && !defined $args{dbic_storage} && defined $args{coderef}) # DIY mode is exempt
924             );
925              
                # NOTE(review): the "rs + coderef" clause is already covered by the plain
                # coderef clause below it, so this passes whenever stmt or coderef is true.
926             die 'Block execution requires one of these attr sets: dbi_connector|dbic_storage + stmt, rs + coderef, or coderef' unless (
927             $args{stmt} ||
928             (defined $args{rs} && $args{coderef}) ||
929             $args{coderef}
930             );
931              
                # Warn when target_time was explicitly passed as 0 and no true chunk_size
                # was passed.
932             if (exists $args{target_time} && $args{target_time} == 0 && !$args{chunk_size}) {
933             warn join "\n",
934             'Dynamic chunk resizing is turned off and the chunk_size is still set to its default of 1.',
935             'This is probably not desirable, and you should find an appropriate static chunk size for',
936             'your workload.',
937             ''
938             ;
939             }
940              
941             $class->$next( %args );
942             };
943              
944             sub BUILD {
                # Post-construction hook: passes the current id_name through _fix_id_name,
                # then runs the bignum check on the freshly built object.
945 35     35 0 12583 my $self = shift;
946             # Make sure id_name gets fixed at the right time
947 35         1160 $self->_fix_id_name( $self->id_name );
948 35         148 $self->_check_bignums;
949             }
950              
951             #pod =head1 CONSTRUCTORS
952             #pod
953             #pod See L for information on what can be passed into these constructors.
954             #pod
955             #pod =head2 new
956             #pod
957             #pod my $batch_chunker = DBIx::BatchChunker->new(...);
958             #pod
959             #pod A standard object constructor. If you use this constructor, you will need to
960             #pod manually call L</calculate_ranges> and L</execute> to execute the DB changes.
961             #pod
962             #pod =head2 construct_and_execute
963             #pod
964             #pod my $batch_chunker = DBIx::BatchChunker->construct_and_execute(...);
965             #pod
966             #pod Constructs a DBIx::BatchChunker object and automatically calls
967             #pod L</calculate_ranges> and L</execute> on it. Anything passed to this method will be passed
968             #pod through to the constructor.
969             #pod
970             #pod Returns the constructed object, post-execution. This is typically only useful if you want
971             #pod to inspect the attributes after the process has finished. Otherwise, it's safe to just
972             #pod ignore the return and throw away the object immediately.
973             #pod
974             #pod =cut
975              
976             sub construct_and_execute {
                # Convenience constructor: builds the object from the given arguments, runs
                # calculate_ranges and then execute on it, and returns the object.
                # The return value of calculate_ranges is not checked here.
977 2     2 1 10228 my $class = shift;
978 2         70 my $db_change = $class->new(@_);
979              
980 1         6 $db_change->calculate_ranges;
981 1         6 $db_change->execute;
982              
983 1         92 return $db_change;
984             }
985              
986             #pod =head1 METHODS
987             #pod
988             #pod =head2 calculate_ranges
989             #pod
990             #pod my $batch_chunker = DBIx::BatchChunker->new(
991             #pod rsc => $account_rsc, # a ResultSetColumn
992             #pod ### OR ###
993             #pod rs => $account_rs, # a ResultSet
994             #pod id_name => 'account_id', # can be looked up if not provided
995             #pod ### OR ###
996             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
997             #pod min_stmt => $min_stmt, # a SQL statement or DBI $sth args
998             #pod max_stmt => $max_stmt, # ditto
999             #pod
1000             #pod ### Optional but recommended ###
1001             #pod id_name => 'account_id', # will also be added into the progress bar title
1002             #pod chunk_size => 20_000, # default is 1
1003             #pod
1004             #pod ### Optional ###
1005             #pod progress_bar => $progress, # defaults to a 2-count 'Calculating ranges' bar
1006             #pod
1007             #pod # ...other attributes for execute...
1008             #pod );
1009             #pod
1010             #pod my $has_data_to_process = $batch_chunker->calculate_ranges;
1011             #pod
1012             #pod Given a L</rsc>, L</rs>, or L</min_stmt>/L</max_stmt> statement
1013             #pod argument set, this method calculates the min/max IDs of those objects. It fills in the
1014             #pod L</min_id> and L</max_id> attributes, based on the ID data, and then returns 1.
1015             #pod
1016             #pod If either of the min/max statements don't return any ID data, this method will return 0.
1017             #pod
1018             #pod =cut
1019              
1020             sub calculate_ranges {
                 # Looks up the minimum and maximum ID and stores them in min_id / max_id.
                 #
                 # The lookup uses the first available source: rsc, then dbic_storage with
                 # min_stmt/max_stmt, then dbi_connector with min_stmt/max_stmt.
                 # Returns 0 when either lookup yields an undefined value, and 1 once both
                 # attributes have been set.
1021 31     31 1 14446 my $self = shift;
1022              
                 # Strip a leading "word." qualifier from the ID name for the progress bar title
1023 31   100     891 my $column_name = $self->id_name || '';
1024 31         420 $column_name =~ s/^\w+\.//;
1025              
                 # Use the supplied progress bar, or build a two-step one that stays silent
                 # unless both STDERR and STDIN are terminals
1026 31   33     791 my $progress = $self->progress_bar || Term::ProgressBar->new({
1027             name => 'Calculating ranges'.($column_name ? " for $column_name" : ''),
1028             count => 2,
1029             ETA => 'linear',
1030             silent => !(-t *STDERR && -t *STDIN), # STDERR is what {fh} is set to use
1031             });
1032              
1033             # Actually run the statements
1034 31         111924 my ($min_id, $max_id);
1035 31 100       404 if ($self->rsc) {
    100          
1036             $self->_dbic_block_runner( run => sub {
1037             # In case the sub is retried
1038 17     17   6287 $progress->update(0);
1039              
1040 17         1242 $min_id = $self->rsc->min;
1041 17         84599 $progress->update(1);
1042              
1043 17         1297 $max_id = $self->rsc->max;
1044 17         57414 $progress->update(2);
1045 17         175 });
1046             }
1047             elsif ($self->dbic_storage) {
1048             $self->_dbic_block_runner( run => sub {
1049 5     5   1625 my $dbh = $self->dbic_storage->dbh;
1050              
1051             # In case the sub is retried
1052 5         6140 $progress->update(0);
1053              
1054 5         310 ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
  5         112  
1055 5         1198 $progress->update(1);
1056              
1057 5         262 ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
  5         54  
1058 5         1541 $progress->update(2);
1059 5         47 });
1060             }
1061             else {
1062             $self->dbi_connector->run(sub {
                 # The connector passes the database handle in $_
1063 9     9   2574 my $dbh = $_;
1064              
1065             # In case the sub is retried
1066 9         57 $progress->update(0);
1067              
1068 9         583 ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
  9         196  
1069 9         2753 $progress->update(1);
1070              
1071 9         761 ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
  9         137  
1072 9         2814 $progress->update(2);
1073 9         474 });
1074             }
1075              
1076             # Set the ranges and return
1077 31 50 33     4981 return 0 unless defined $min_id && defined $max_id;
1078              
1079             # This would be the primary spot where we notice we need to upgrade, so check the values before
1080             # we attempt to mangle them.
                 # Big-number mode converts through Math::BigFloat and then as_int; otherwise
                 # a plain int() truncation is used.
1081 31 100       179 if ($self->_check_bignums($min_id, $max_id)) {
1082 2         56 $min_id = Math::BigFloat->new($min_id)->as_int;
1083 2         1628 $max_id = Math::BigFloat->new($max_id)->as_int;
1084             }
1085             else {
1086 29         85 $min_id = int $min_id;
1087 29         77 $max_id = int $max_id;
1088             }
1089              
1090 31         2243 $self->min_id($min_id);
1091 31         4590 $self->max_id($max_id);
1092              
1093 31         4073 return 1;
1094             }
1095              
1096             #pod =head2 execute
1097             #pod
1098             #pod my $batch_chunker = DBIx::BatchChunker->new(
1099             #pod # ...other attributes for calculate_ranges...
1100             #pod
1101             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1102             #pod stmt => $do_stmt, # INSERT/UPDATE/DELETE $stmt with BETWEEN placeholders
1103             #pod ### OR ###
1104             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1105             #pod stmt => $select_stmt, # SELECT $stmt with BETWEEN placeholders
1106             #pod count_stmt => $count_stmt, # SELECT COUNT $stmt to be used for min_chunk_percent; optional
1107             #pod coderef => $coderef, # called code that does the actual work
1108             #pod ### OR ###
1109             #pod rs => $account_rs, # base ResultSet, which gets filtered with -between later on
1110             #pod id_name => 'account_id', # can be looked up if not provided
1111             #pod coderef => $coderef, # called code that does the actual work
1112             #pod ### OR ###
1113             #pod coderef => $coderef, # DIY database work; just pass the $start/$end IDs
1114             #pod
1115             #pod ### Optional but recommended ###
1116             #pod sleep => 0.25, # number of seconds to sleep each chunk; defaults to 0.5
1117             #pod process_past_max => 1, # use this if processing the whole table
1118             #pod single_rows => 1, # does $coderef get a single $row or the whole $chunk_rs / $stmt
1119             #pod min_chunk_percent => 0.25, # minimum row count of chunk size percentage; defaults to 0.5 (or 50%)
1120             #pod target_time => 5, # target runtime for dynamic chunk size scaling; default is 5 seconds
1121             #pod max_runtime => 12 * 60 * 60, # stop processing after 12 hours
1122             #pod
1123             #pod progress_name => 'Updating Accounts', # easier than creating your own progress_bar
1124             #pod
1125             #pod ### Optional ###
1126             #pod progress_bar => $progress, # defaults to "Processing $source_name" bar
1127             #pod verbose => 1, # displays timing stats on each chunk
1128             #pod );
1129             #pod
1130             #pod $batch_chunker->execute if $batch_chunker->calculate_ranges;
1131             #pod
1132             #pod Applies the configured DB changes in chunks. Runs through the loop, processing a
1133             #pod statement handle, ResultSet, and/or coderef as it goes. Each loop iteration processes a
1134             #pod chunk of work, determined by L</chunk_size>.
1135             #pod
1136             #pod The L</calculate_ranges> method should be run first to fill in L</min_id> and L</max_id>.
1137             #pod If either of these are missing, the function will assume L</calculate_ranges> couldn't
1138             #pod find them and warn about it.
1139             #pod
1140             #pod More details can be found in the L and L sections.
1141             #pod
1142             #pod =cut
1143              
1144             sub execute {
                 # Runs the chunked processing loop over the ID range min_id..max_id.
                 #
                 # Returns early, after a progress message, when min_id or max_id is undefined
                 # or the computed ID count is not true. Otherwise it loops chunk by chunk
                 # until the range is exhausted or max_runtime is exceeded, then stores the
                 # last processed end ID in min_id and clears the loop state.
1145 33     33 1 40451 my $self = shift;
1146 33         150 $self->_check_bignums;
1147              
                 # Size of the ID range, inclusive of both ends; stays undefined when either
                 # bound is missing
1148 33         121 my $count;
1149 33 100 66     674 if (defined $self->min_id && defined $self->max_id) {
1150 32         1614 $count = $self->max_id - $self->min_id + 1;
1151             }
1152              
1153             # Fire up the progress bar
                 # (a supplied progress_bar wins; the built one is silent unless both STDERR
                 # and STDIN are terminals)
1154 33   33     2605 my $progress = $self->progress_bar || Term::ProgressBar->new({
1155             name => $self->progress_name,
1156             count => $count || 1,
1157             ETA => 'linear',
1158             silent => !(-t *STDERR && -t *STDIN), # STDERR is what {fh} is set to use
1159             });
1160              
1161 33 100       112590 unless ($count) {
1162 1         7 $progress->message('No chunks; nothing to process...');
1163 1         65 return;
1164             }
1165              
                 # In verbose mode, report the chunk and row totals through the CLDR
                 # decimal formatter
1166 32 100       1020 if ($self->verbose) {
1167             $progress->message(
1168             sprintf "(%s total chunks; %s total rows)",
1169 1         56 map { $self->cldr->decimal_formatter->format($_) } ( ceil($count / $self->chunk_size), $count)
  2         64780  
1170             );
1171             }
1172              
1173             # Loop state setup
                 # (any previous loop state is discarded before a fresh one is created)
1174 32         3901 $self->clear_loop_state;
1175 32         5045 my $ls = $self->loop_state( DBIx::BatchChunker::LoopState->new({
1176             batch_chunker => $self,
1177             progress_bar => $progress,
1178             }) );
1179              
1180             # Da loop
1181 32   100     3583 while (
      100        
1182             !defined $ls->prev_end || # first chunk
1183             defined $ls->start || # still in the middle of chunk resizing
1184             $ls->prev_end < $self->max_id # processed chunk, but still more to go
1185             ) {
1186 735         248901 $ls->multiplier_range($ls->multiplier_range + $ls->multiplier_step);
1187              
1188             # this could be already set, if chunk resizing
                 # NOTE(review): the fallback reads min_id from the loop state object rather
                 # than from $self - confirm that LoopState provides a min_id method.
1189 735 0       405952 $ls->start(
    50          
    100          
1190             defined $ls->prev_end ? $ls->prev_end + 1 :
1191             defined $ls->min_id ? $ls->min_id :
1192             1
1193             ) unless defined $ls->start;
1194              
1195 735         270643 $ls->end(
1196             min(
1197             $ls->start + ceil($ls->multiplier_range * $ls->chunk_size) - 1, # ceil, because multiplier_* could be fractional
1198             $self->max_id, # ensure we never exceed max_id
1199             )
1200             );
1201 735         708591 $ls->chunk_count(undef);
1202              
1203             # Early loop exit because of maximum run time
                 # (only checked when max_runtime is true, i.e. non-zero)
1204 735 100 100     43287 if ($self->max_runtime && time - $ls->total_timer > $self->max_runtime) {
1205 1         10 $progress->message('Ran past the maximum run time');
1206 1         67 last;
1207             }
1208              
1209 734 50       4339 next unless $self->_process_past_max_checker;
1210              
1211             # The actual DB processing
                 # (a false return restarts the loop without the bookkeeping below)
1212 734 100       6915 next unless $self->_process_block;
1213              
1214             # Record the time quickly
1215 551         75846 $ls->prev_runtime(time - $ls->chunk_timer);
1216              
1217             # Give the DB a little bit of breathing room
1218 551 50       241662333 sleep $self->sleep if $self->sleep;
1219              
1220 551         22055 $self->_print_chunk_status('processed');
1221 551         14818 $self->_increment_progress;
1222 551         70817 $self->_runtime_checker;
1223              
1224             # End-of-loop activities (skipped by early next)
1225 551         6055 $ls->_reset_chunk_state;
1226             }
1227              
1228             # Re-set min_id and clear the loop state
1229 32 50       17720 $self->min_id( $ls->prev_end ) if defined $ls->prev_end;
1230 32         17120 $self->clear_loop_state;
1231              
1232             # Keep the finished time from the progress bar, in case there are other loops or output
1233 32 50       1228 unless ($progress->silent) {
1234 0         0 $progress->update( $progress->target );
1235 0         0 print "\n";
1236             }
1237             }
1238              
1239             #pod =head1 PRIVATE METHODS
1240             #pod
1241             #pod =head2 _process_block
1242             #pod
1243             #pod Runs the DB work and passes it to the coderef. Its return value determines whether the
1244             #pod block should be processed or not.
1245             #pod
1246             #pod =cut
1247              
1248             sub _process_block {
1249 734     734   2592 my ($self) = @_;
1250              
1251 734         17529 my $ls = $self->loop_state;
1252 734         8623 my $conn = $self->dbi_connector;
1253 734         5377 my $coderef = $self->coderef;
1254 734         3123 my $rs = $self->rs;
1255 734   66     7734 my $count_rs = $self->count_rs // $rs;
1256              
1257             # Figure out if the row count is worth the work
1258 734         1646 my $chunk_rs;
1259 734         2582 my $count_stmt = $self->count_stmt;
1260 734         1797 my $chunk_count;
1261 734 100 100     5538 if ($count_stmt && defined $self->dbic_storage) {
    100          
    100          
1262             $self->_dbic_block_runner( run => sub {
1263 97 100   97   31269 $chunk_count = $self->dbic_storage->dbh->selectrow_array(
1264             @$count_stmt,
1265             (@$count_stmt == 1 ? undef : ()),
1266             $ls->start, $ls->end,
1267             );
1268 97         1062 });
1269             }
1270             elsif ($count_stmt) {
1271             $chunk_count = $conn->run(sub {
1272 110 100   110   36317 $_->selectrow_array(
1273             @$count_stmt,
1274             (@$count_stmt == 1 ? undef : ()),
1275             $ls->start, $ls->end,
1276             );
1277 110         4382 });
1278             }
1279             elsif (defined $count_rs) {
1280 360         9676 $chunk_rs = $count_rs->search({
1281             $self->id_name => { -between => [$ls->start, $ls->end] },
1282             });
1283              
1284             $self->_dbic_block_runner( run => sub {
1285 360     360   191825 $chunk_count = $chunk_rs->count;
1286 360         313176 });
1287             }
1288              
1289 734 100       13437160 $chunk_count = Math::BigInt->new($chunk_count) if $self->_check_bignums($chunk_count);
1290 734         56043 $ls->chunk_count($chunk_count);
1291              
1292 734 100       291470 return unless $self->_chunk_count_checker;
1293              
1294             # NOTE: Try to minimize the amount of closures by using $self as much as possible
1295             # inside coderefs.
1296              
1297             # Do the work
1298 551 100 66     4998 if (my $stmt = $self->stmt) {
    100          
1299             ### Statement handle
1300 218 100       1547 my @prepare_args = @$stmt > 2 ? @$stmt[0..1] : @$stmt;
1301 218 100       6004 my @execute_args = (
1302             (@$stmt > 2 ? @$stmt[2..$#$stmt] : ()),
1303             $ls->start, $ls->end,
1304             );
1305              
1306 218 100 66     9498 if ($self->single_rows && $coderef) {
1307             # Transactional work
1308 69 100       356 if ($self->dbic_storage) {
1309             $self->_dbic_block_runner( txn => sub {
1310 50     50   30309 $self->loop_state->_mark_chunk_timer; # reset timer on retries
1311              
1312 50         2080 my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
1313 50         25783 $sth->execute(@execute_args);
1314              
1315 50         2740 while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
  73         62392  
1316 45         425 });
1317             }
1318             else {
1319             $conn->txn(sub {
1320 87     87   56319 $self->loop_state->_mark_chunk_timer; # reset timer on retries
1321              
1322 87         4775 my $sth = $_->prepare(@prepare_args);
1323 87         13693 $sth->execute(@execute_args);
1324              
1325 29         6928 while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
  22         197  
1326 24         5954 });
1327             }
1328             }
1329             else {
1330             # Bulk work (or DML)
1331 149 100       810 if ($self->dbic_storage) {
1332             $self->_dbic_block_runner( run => sub {
1333 101     101   67273 $self->loop_state->_mark_chunk_timer; # reset timer on retries
1334              
1335 101         13369 my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
1336 101         95131 $sth->execute(@execute_args);
1337              
1338 53 100       2140 $self->coderef->($self, $sth) if $self->coderef;
1339 53         660 });
1340             }
1341             else {
1342             $conn->run(sub {
1343 144     144   90461 $self->loop_state->_mark_chunk_timer; # reset timer on retries
1344              
1345 144         9805 my $sth = $_->prepare(@prepare_args);
1346 144         41011 $sth->execute(@execute_args);
1347              
1348 96 100       13716 $self->coderef->($self, $sth) if $self->coderef;
1349 96         4047 });
1350             }
1351             }
1352             }
1353             elsif (defined $rs && $coderef) {
1354             ### ResultSet with coderef
1355              
1356 271 100       1459 if ($self->single_rows) {
1357             # Transactional work
1358             $self->_dbic_block_runner( txn => sub {
1359             # reset timer/$rs on retries
1360 86     86   44422 $self->loop_state->_mark_chunk_timer;
1361 86         9306 $chunk_rs->reset;
1362              
1363 86         33017 while (my $row = $chunk_rs->next) { $self->coderef->($self, $row) }
  76         195038  
1364 81         798 });
1365             }
1366             else {
1367             # Bulk work
1368             $self->_dbic_block_runner( run => sub {
1369             # reset timer/$rs on retries
1370 238     238   100515 $self->loop_state->_mark_chunk_timer;
1371 238         10778 $chunk_rs->reset;
1372              
1373 238         72516 $self->coderef->($self, $chunk_rs);
1374 190         1928 });
1375             }
1376             }
1377             else {
1378             ### Something a bit more free-form
1379              
1380 62         1776 $self->$coderef($ls->start, $ls->end);
1381             }
1382              
1383 551         20428244 return 1;
1384             }
1385              
1386             #pod =head2 _process_past_max_checker
1387             #pod
1388             #pod Checks to make sure the current endpoint is actually the end, by checking the database.
1389             #pod Its return value determines whether the block should be processed or not.
1390             #pod
1391             #pod See L</process_past_max>.
1392             #pod
1393             #pod =cut
1394              
sub _process_past_max_checker {
    my ($self) = @_;

    # Re-verifies the max ID once the current chunk reaches the known maximum.
    # Returns 1 when the chunk should still be processed, and 0 when the re-check
    # finds no max ID at all (in which case prev_check is set to 'no max').

    my $ls  = $self->loop_state;
    my $bar = $ls->progress_bar;

    # Only relevant when the feature is on and this chunk touches the known max
    return 1 unless $self->process_past_max && $ls->end >= $self->max_id;

    # DIY mode without a max_stmt: there is no query to re-check with, so just
    # stretch the endpoint one chunk_size past the known max
    if (!defined $self->rsc && !$self->max_stmt) {
        $ls->end( $self->max_id + $ls->chunk_size );
        return 1;
    }

    $bar->message('Reached end; re-checking max ID') if $self->verbose;

    # Fetch a fresh MAX from whichever data source is configured
    my $fresh_max;
    my $rsc = $self->rsc;
    if (defined $rsc) {
        $self->_dbic_block_runner( run => sub {
            $fresh_max = $rsc->max;
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            my $dbh = $self->dbic_storage->dbh;
            ($fresh_max) = $dbh->selectrow_array( @{ $self->max_stmt } );
        });
    }
    else {
        # The list assignment takes the first column of whatever the block returns
        ($fresh_max) = $self->dbi_connector->run(sub {
            $_->selectrow_array( @{ $self->max_stmt } );
        });
    }

    # The re-check query should not count against the chunk's runtime
    $ls->_mark_chunk_timer;

    # Upgrade to a big number if the checker says so
    if ($self->_check_bignums($fresh_max)) {
        $fresh_max = Math::BigInt->new($fresh_max);
    }

    # Nothing came back: stop at the known max and tell the caller to skip
    if (!$fresh_max || $fresh_max eq '0E0') {
        $bar->message('No max ID found; nothing left to process...') if $self->verbose;
        $ls->end( $self->max_id );

        $ls->prev_check('no max');
        return 0;
    }

    my $known_max = $self->max_id;
    if ($fresh_max > $known_max) {
        # The table grew: adopt the new max and re-target the progress bar
        $bar->message( sprintf('New max ID set from %s to %s', $known_max, $fresh_max) ) if $self->verbose;
        $self->max_id($fresh_max);
        $bar->target( $fresh_max - $self->min_id + 1 );
        $bar->update( $bar->last_update );
    }
    elsif ($fresh_max == $known_max) {
        $bar->message( sprintf('Found max ID %s; same as end', $fresh_max) ) if $self->verbose;
    }
    else {
        # Lower than what we already knew: keep the existing max
        $bar->message( sprintf('Found max ID %s; ignoring...', $fresh_max) ) if $self->verbose;
    }

    # Clamp the endpoint against the (possibly updated) max_id
    $ls->end( min($ls->end, $self->max_id) );

    return 1;
}
1462              
1463             #pod =head2 _chunk_count_checker
1464             #pod
1465             #pod Checks the chunk count to make sure it's properly sized. If not, it will try to shrink
1466             #pod or expand the current chunk (in C<chunk_size> increments) as necessary. Its return value
1467             #pod determines whether the block should be processed or not.
1468             #pod
1469             #pod See L</min_chunk_percent>.
1470             #pod
1471             #pod This is not to be confused with the L</_runtime_checker>, which adjusts C<chunk_size>
1472             #pod after processing, based on previous run times.
1473             #pod
1474             #pod =cut
1475              
sub _chunk_count_checker {
    my ($self) = @_;

    # Judges the row count of the current chunk against chunk_size.  Returns 1
    # when the chunk should be processed as it stands, and 0 when it was skipped
    # or its multiplier was adjusted instead.  Every exit records a short reason
    # label through $ls->prev_check.

    my $ls  = $self->loop_state;
    my $bar = $ls->progress_bar;

    my $min_percent = $self->min_chunk_percent;
    my $rows        = $ls->chunk_count;

    # Chunk sizing is effectively off (or there is no count to judge by)
    if ($min_percent <= 0 || !defined $rows) {
        $ls->prev_check('disabled');
        return 1;
    }

    my $fill_ratio = $rows / $ls->chunk_size;
    my $count_secs = time() - $ls->chunk_timer;  # at this point, only the COUNT has been timed
    $ls->checked_count( $ls->checked_count + 1 );

    # No rows: skip the block entirely and double the stepping
    if ($rows == 0 && $min_percent > 0) {
        $self->_print_chunk_status('skipped');

        $self->_increment_progress;

        my $doubled_step = $ls->multiplier_step * 2;  # read before the reset below
        $ls->_reset_chunk_state;
        $ls->multiplier_step($doubled_step);

        $ls->prev_check('skipped rows');
        return 0;
    }

    # Down to a single ID: it has to be processed, since it cannot be bisected
    if ($ls->end - $ls->start <= 0) {
        $ls->prev_check('at a single ID');

        # Complain, because this can be dangerous with a wild enough Row:ID ratio
        if ($rows > 1) {
            $bar->message('WARNING: Processing a single ID with many rows attached because resizing cannot proceed any further.');
            $bar->message('Consider flipping the relationship so that IDs and row counts are 1:1.');
        }

        return 1;
    }

    # Two different reasons lead to the same "shrink" exit
    my $shrink_reason;
    if ($fill_ratio > 1 + $min_percent) {
        $shrink_reason = 'too many rows';
    }
    elsif ($self->target_time && $count_secs > $self->target_time * 1.05) {
        # A slow COUNT is a rare failure, so print a warning first
        my $cldr    = $self->cldr;
        my $integer = $cldr->decimal_formatter;
        my $decimal = $cldr->decimal_formatter(
            minimum_fraction_digits => 2,
            maximum_fraction_digits => 2,
        );
        $bar->message( sprintf(
            'WARNING: COUNT statement was too slow; took %5s sec to return %s rows.',
            $decimal->format($count_secs),
            $integer->format($rows)
        ) );

        $shrink_reason = 'COUNT too slow';
    }

    if (defined $shrink_reason) {
        $self->_print_chunk_status('shrunk');
        $ls->_mark_chunk_timer;
        $ls->_decrease_multiplier;
        $ls->prev_check($shrink_reason);
        return 0;
    }

    # Everything above outranks the "stop re-checking" exits below: better to
    # have too few rows than too many.  The single ID check is what keeps the
    # shrinking from looping forever.

    # Checked too many times: just process it
    if ($ls->checked_count > 10) {
        $ls->prev_check('too many checks');
        return 1;
    }

    # At the end: just process it
    if ($ls->end >= $self->max_id) {
        $ls->prev_check('at max_id');
        return 1;
    }

    # Too few rows: keep the start ID and step up the multiplier
    if ($fill_ratio < $min_percent) {
        $self->_print_chunk_status('expanded');
        $ls->_mark_chunk_timer;
        $ls->_increase_multiplier;
        $ls->prev_check('too few rows');
        return 0;
    }

    $ls->prev_check('nothing wrong');
    return 1;
}
1572              
1573             #pod =head2 _runtime_checker
1574             #pod
1575             #pod Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
1576             #pod necessary.
1577             #pod
1578             #pod See L</target_time>.
1579             #pod
1580             #pod =cut
1581              
sub _runtime_checker {
    my ($self) = @_;

    # Records the runtime of the chunk that was just processed and, based on the
    # last (up to five) timings, raises or lowers the loop state's chunk_size to
    # steer towards target_time.  Returns 1 if chunk_size was changed; otherwise
    # returns nothing.

    my $ls = $self->loop_state;
    return unless $self->target_time;
    return unless $ls->chunk_size && $ls->prev_runtime;  # prevent DIV/0

    my $timings = $ls->last_timings;

    my $new_timing = {
        runtime     => $ls->prev_runtime,
        chunk_count => $ls->chunk_count || $ls->chunk_size,  # no count: treat the chunk as full
    };
    $new_timing->{chunk_per} = $new_timing->{chunk_count} / $ls->chunk_size;

    # Rowtime: a measure of how much of the chunk_size actually impacted the runtime
    $new_timing->{rowtime} = $new_timing->{runtime} / $new_timing->{chunk_per};

    # Store the last five processing times
    push @$timings, $new_timing;
    shift @$timings if @$timings > 5;

    # Figure out the averages and adjustment factor
    my $ttl           = scalar @$timings;
    my $avg_rowtime   = sum(map { $_->{rowtime} } @$timings) / $ttl;
    my $adjust_factor = $self->target_time / $avg_rowtime;

    my $new_target_chunk_size = $ls->chunk_size;
    my $adjective;
    if ($adjust_factor > 1.05) {
        # Too fast: Raise the chunk size

        return unless $ttl >= 5;  # must have a full set of timings
        return if any { $_->{runtime} >= $self->target_time } @$timings;  # must ALL have low runtimes

        $new_target_chunk_size *= min(2, $adjust_factor);  # never more than double
        $adjective = 'fast';
    }
    elsif ($adjust_factor < 0.95) {
        # Too slow: Lower the chunk size

        return unless $ls->prev_runtime > $self->target_time;  # last runtime must actually be too high

        $new_target_chunk_size *=
            ($ls->prev_runtime < $self->target_time * 3) ?
            max(0.5, $adjust_factor) :  # never less than half...
            $adjust_factor              # ...unless the last runtime was waaaay off
        ;
        $new_target_chunk_size = 1 if $new_target_chunk_size < 1;
        $adjective = 'slow';
    }

    $new_target_chunk_size = int $new_target_chunk_size;
    return if $new_target_chunk_size == $ls->chunk_size;  # either nothing changed or it's too miniscule
    return if $new_target_chunk_size < 1;

    # Print out a processing line, if enabled
    if ($self->verbose) {
        # CLDR number formatters
        my $integer = $self->cldr->decimal_formatter;
        my $percent = $self->cldr->percent_formatter;

        # Go through the progress_bar accessor, like every other method here,
        # rather than reaching into the loop state's hash directly
        $ls->progress_bar->message( sprintf(
            "Processing too %s, avg %4s of target time, adjusting chunk size from %s to %s",
            $adjective,
            $percent->format( 1 / $adjust_factor ),
            $integer->format( $ls->chunk_size ),
            $integer->format( $new_target_chunk_size ),
        ) );
    }

    # Change it!
    $ls->chunk_size($new_target_chunk_size);

    # Never snowball too quickly: a raise has to earn a fresh set of timings.
    # ($adjective stays undef if neither branch above fired, hence the guard.)
    $ls->_reset_last_timings if defined $adjective && $adjective eq 'fast';
    return 1;
}
1657              
1658             #pod =head2 _increment_progress
1659             #pod
1660             #pod Increments the progress bar.
1661             #pod
1662             #pod =cut
1663              
sub _increment_progress {
    my ($self) = @_;

    # Moves the progress bar to the position matching the current chunk's end ID
    # (counted from min_id), returning whatever the bar's update call returns.

    my $ls     = $self->loop_state;
    my $bar    = $ls->progress_bar;
    my $end_id = $ls->end;

    my $ids_done = $end_id - $self->min_id + 1;

    # Past the known max: push the target out so the bar isn't at 100% yet
    if ($end_id > $self->max_id) {
        $bar->target($ids_done + 1);
    }

    return $bar->update($ids_done);
}
1673              
1674             #pod =head2 _print_chunk_status
1675             #pod
1676             #pod Prints out a standard chunk status line, if L</verbose> is enabled. What it prints is
1677             #pod generally uniform, but it depends on the processing action. Most of the data is
1678             #pod pulled from L</loop_state>.
1679             #pod
1680             #pod =cut
1681              
sub _print_chunk_status {
    my ($self, $action) = @_;

    # Prints one status line for the current chunk to the progress bar, but only
    # when verbose is on.  $action is the word shown in the line ('processed',
    # 'skipped', 'shrunk', 'expanded'); only 'processed' adds runtime details.
    # Returns the result of the progress bar's message call, or nothing when
    # verbose is off.

    return unless $self->verbose;

    my $ls    = $self->loop_state;
    my $sleep = $self->sleep || 0;

    # CLDR number formatters
    my $integer = $self->cldr->decimal_formatter;
    my $percent = $self->cldr->percent_formatter;
    my $decimal = $self->cldr->decimal_formatter(
        minimum_fraction_digits => 2,
        maximum_fraction_digits => 2,
    );

    # One predicate drives both the layout choice and the whitespace squeeze
    # below, so that an ID of exactly 1,000,000,000 is treated the same way in
    # both places.
    my $fits_padded_layout = $ls->start < 1_000_000_000 && $ls->end < 1_000_000_000;

    my $message;
    if ($fits_padded_layout) {
        $message = sprintf(
            'IDs %6u to %6u %9s, %9s rows found',
            $ls->start, $ls->end, $action,
            $integer->format( $ls->chunk_count ),
        );
    }
    else {
        # Large IDs: plain %s formatting, no column padding
        $message = sprintf(
            'IDs %s to %s %s, %s rows found',
            $ls->start, $ls->end, $action,
            $ls->chunk_count,
        );
    }

    # Only show the fill percentage when there are rows to speak of
    $message .= sprintf(
        ' (%4s of chunk size)',
        $percent->format( $ls->chunk_count / $ls->chunk_size ),
    ) if $ls->chunk_count;

    if ($action eq 'processed') {
        $message .= $sleep ?
            sprintf(
                ', %5s+%s sec runtime+sleep',
                $decimal->format( $ls->prev_runtime ),
                $decimal->format( $sleep )
            ) :
            sprintf(
                ', %5s sec runtime',
                $decimal->format( $ls->prev_runtime ),
            )
        ;
    }

    # Reduce spacing if the numbers are too large for the padded layout
    unless ($fits_padded_layout) {
        $message =~ s/\s+/ /g;
        $message =~ s/\(\s+/\(/g;
    }

    return $ls->progress_bar->message($message);
}
1740              
1741             #pod =head1 CAVEATS
1742             #pod
1743             #pod =head2 Big Number Support
1744             #pod
1745             #pod If the module detects that the ID numbers are no longer safe for standard Perl NV
1746             #pod storage, it will automatically switch to using L<Math::BigInt> and L<Math::BigFloat> for
1747             #pod big number support. If any blessed numbers are already being used to define the
1748             #pod attributes, this will also switch on the support.
1749             #pod
1750             #pod =head2 String-based IDs
1751             #pod
1752             #pod If you're working with C<VARCHAR> types or other string-based IDs to represent integers,
1753             #pod these may be subject to whatever string-based comparison rules your RDBMS uses when
1754             #pod calculating with C<MIN>/C<MAX> or using C<BETWEEN>. Row counting and chunk size scaling
1755             #pod will try to compensate, but will be mixing string-based comparisons from the RDBMS and
1756             #pod Perl-based integer math.
1757             #pod
1758             #pod Using the C<CAST> function may help, but it may also cause critical indexes to be
1759             #pod ignored, especially if the function is used on the left-hand side against the column.
1760             #pod Strings with the exact same length may be safe from comparison weirdness, but YMMV.
1761             #pod
1762             #pod Non-integer inputs from ID columns, such as GUIDs or other alphanumeric strings, are not
1763             #pod currently supported. They would have to be converted to integers via SQL, and doing so
1764             #pod may run into a similar risk of having your RDBMS ignore indexes.
1765             #pod
1766             #pod =head1 SEE ALSO
1767             #pod
1768             #pod L, L, L
1769             #pod
1770             #pod =cut
1771              
1772             1;
1773              
1774             __END__