File Coverage

blib/lib/DBIx/TxnPool.pm
Criterion Covered Total %
statement 24 139 17.2
branch 0 44 0.0
condition 0 19 0.0
subroutine 8 38 21.0
pod 8 13 61.5
total 40 253 15.8


line stmt bran cond sub pod time code
1             package DBIx::TxnPool;
2              
3 1     1   21781 use strict;
  1         2  
  1         26  
4 1     1   5 use warnings;
  1         2  
  1         33  
5 1     1   5 use Exporter 5.57 qw( import );
  1         27  
  1         40  
6              
7 1     1   826 use Try::Tiny;
  1         1525  
  1         54  
8 1     1   829 use Signal::Mask;
  1         14101  
  1         31  
9 1     1   6 use Carp qw( confess );
  1         2  
  1         98  
10              
11             our $VERSION = 0.11;
12             our $BlockSignals = [ qw( TERM INT ) ];
13             our @EXPORT = qw( txn_item txn_post_item txn_commit txn_sort );
14              
15             # It's better to look for the "try restarting transaction" string
16             # because sometime may be happens other error: Lock wait timeout exceeded
17 1     1   5 use constant DEADLOCK_REGEXP => qr/try restarting transaction/o;
  1         1  
  1         1083  
18              
19             sub new {
20 0     0 0   my ( $class, %args ) = @_;
21              
22             confess( "The dbh should be defined" )
23 0 0         unless $args{dbh};
24              
25 0   0       $args{size} ||= 100;
26 0   0       $args{block_signals} ||= $BlockSignals;
27 0   0       $args{max_repeated_deadlocks} ||= 5;
28 0           $args{_amnt_nested_signals} = 0;
29 0           $args{_saved_signal_masks} = {};
30 0           $args{pool} = [];
31 0           $args{amount_deadlocks} = 0;
32              
33 0   0       bless \%args, ref $class || $class;
34             }
35              
36             sub DESTROY {
37 0     0     local $@;
38              
39             $_[0]->finish
40 0 0         unless ( $_[0]->{repeated_deadlocks} >= $_[0]->{max_repeated_deadlocks} );
41             }
42              
43             sub txn_item (&@) {
44 0     0 1   __PACKAGE__->new( %{ __make_chain( 'item_callback', @_ ) } );
  0            
45             }
46              
47             sub txn_post_item (&@) {
48 0     0 1   __make_chain( 'post_item_callback', @_ );
49             }
50              
51             sub txn_commit (&@) {
52 0     0 1   __make_chain( 'commit_callback', @_ );
53             }
54              
55             sub txn_sort (&@) {
56 0     0 1   my $ret = __make_chain( 'sort_callback', @_ );
57 0           $ret->{sort_callback_package} = caller;
58 0           $ret;
59             }
60              
61             sub __make_chain {
62 0     0     my $cb_name = shift;
63 0           my $cb_func = shift;
64 0           my $ret;
65              
66 0 0         ( $ret = ref $_[0] eq 'HASH' ? $_[0] : { @_ } )->{ $cb_name } = $cb_func;
67 0           $ret;
68             }
69              
70 0     0 1   sub dbh { $_[0]->{dbh} }
71              
72             sub add {
73 0     0 1   my ( $self, $data ) = @_;
74              
75 0           $self->{repeated_deadlocks} = 0;
76              
77             confess "assert: _amnt_nested_signals is not zero!"
78 0 0         if $self->{_amnt_nested_signals};
79              
80             try {
81 0     0     push @{ $self->{pool} }, $data;
  0            
82              
83 0 0         if ( ! $self->{sort_callback} ) {
84 0           $self->start_txn;
85             $self->_safe_signals( sub {
86 0           local $_ = $data;
87 0           $self->{item_callback}->( $self, $data );
88 0           } );
89             }
90             }
91             catch {
92 0     0     $self->_check_deadlock( $_ );
93 0           };
94              
95             $self->finish
96 0 0         if ( @{ $self->{pool} } >= $self->{size} );
  0            
97              
98             confess "assert: _amnt_nested_signals is not zero!"
99 0 0         if $self->{_amnt_nested_signals};
100             }
101              
102             sub _check_deadlock {
103 0     0     my ( $self, $error ) = @_;
104              
105 0           my $dbi_error = $DBI::err;
106              
107 0           $self->rollback_txn;
108              
109             # For example codes: https://dev.mysql.com/doc/refman/5.5/en/error-messages-server.html
110             # MySQL codes 1213 & 1205 are reasons to redo transaction again
111             # For other SQL engines i don't know codes - patches are walcome! ;-) [https://github.com/Perlover/DBIx-TxnPool]
112 0 0 0       if ( defined $dbi_error && ( $dbi_error == 1213 || $dbi_error == 1205 ) ) {
      0        
113 0           $self->{amount_deadlocks}++;
114 0 0         if ( $self->{repeated_deadlocks} >= $self->{max_repeated_deadlocks} ) {
115 0           $self->{pool} = []; # If DESTROY calls finish() there will not problems
116 0           confess( "limit ($self->{repeated_deadlocks}) of deadlock resolvings" )
117             }
118             else {
119 0           $self->play_pool;
120             }
121             } else {
122             # Fatal error - may be bad SQL statement - finish
123 0           $self->{pool} = []; # If DESTROY calls finish() there will not problems
124 0           confess( "error in item callback ($error)" );
125             }
126             }
127              
128             sub play_pool {
129 0     0 0   my $self = shift;
130              
131 0           $self->start_txn;
132              
133             $self->_safe_signals( sub {
134 0     0     select( undef, undef, undef, 0.5 * ++$self->{repeated_deadlocks} );
135 0           } );
136              
137             try {
138 0     0     foreach my $data ( @{ $self->{pool} } ) {
  0            
139             $self->_safe_signals( sub {
140 0           local $_ = $data;
141 0           $self->{item_callback}->( $self, $data );
142 0           } );
143             }
144             }
145             catch {
146 0     0     $self->_check_deadlock( $_ );
147 0           };
148             }
149              
150             sub finish {
151 0     0 1   my $self = shift;
152              
153             confess "assert: _amnt_nested_signals is not zero!"
154 0 0         if $self->{_amnt_nested_signals};
155              
156 0 0 0       if ( $self->{sort_callback} && @{ $self->{pool} } ) {
  0            
157 1     1   6 no strict 'refs';
  1         2  
  1         767  
158 0           local *a = *{"$self->{sort_callback_package}\::a"};
  0            
159 0           local *b = *{"$self->{sort_callback_package}\::b"};
  0            
160              
161 0           $self->{pool} = [ sort { $self->{sort_callback}->() } ( @{ $self->{pool} } ) ];
  0            
  0            
162              
163 0           $self->play_pool;
164             }
165              
166             try {
167 0     0     $self->{repeated_deadlocks} = 0;
168 0           $self->commit_txn;
169             }
170             catch {
171 0     0     $self->_check_deadlock( $_ );
172 0           };
173              
174 0 0         if ( exists $self->{post_item_callback} ) {
175 0           foreach my $data ( @{ $self->{pool} } ) {
  0            
176 0           local $_ = $data;
177 0           $self->{post_item_callback}->( $self, $data );
178             }
179             }
180              
181 0           $self->{pool} = [];
182              
183             confess "assert: _amnt_nested_signals is not zero!"
184 0 0         if $self->{_amnt_nested_signals};
185             }
186              
187             sub start_txn {
188 0     0 0   my $self = shift;
189              
190 0 0         if ( ! $self->{in_txn} ) {
191             $self->_safe_signals( sub {
192 0 0   0     $self->{dbh}->begin_work or confess 'DBI error: ' . $self->{dbh}->errstr;
193 0           $self->{in_txn} = 1;
194 0           } );
195             }
196             }
197              
198             sub rollback_txn {
199 0     0 0   my $self = shift;
200              
201 0 0         if ( $self->{in_txn} ) {
202             $self->_safe_signals( sub {
203 0 0   0     $self->{dbh}->rollback or confess 'DBI error: ' . $self->{dbh}->errstr;
204 0           $self->{in_txn} = undef;
205 0           } );
206             }
207             }
208              
209             sub commit_txn {
210 0     0 0   my $self = shift;
211              
212 0 0         if ( $self->{in_txn} ) {
213             $self->_safe_signals( sub {
214 0 0   0     $self->{dbh}->commit or confess 'DBI error: ' . $self->{dbh}->errstr;
215 0           $self->{in_txn} = undef;
216 0           } );
217             $self->{commit_callback}->( $self )
218 0 0         if exists $self->{commit_callback};
219             }
220             }
221              
222 0     0 1   sub amount_deadlocks { $_[0]->{amount_deadlocks} }
223              
224             sub _safe_signals {
225 0     0     my ( $self, $code ) = @_;
226              
227 0 0         if ( ! $self->{_amnt_nested_signals}++ ) {
228 0           for ( @{ $self->{block_signals} } ) {
  0            
229 0           $self->{_saved_signal_masks}{ $_ } = $Signal::Mask{ $_ };
230 0           $Signal::Mask{ $_ } = 1;
231             }
232             }
233             try {
234 0     0     $code->();
235             }
236             catch {
237 0     0     die $_;
238             }
239             finally {
240 0 0   0     if ( ! --$self->{_amnt_nested_signals} ) {
241 0           for ( @{ $self->{block_signals} } ) {
  0            
242 0           $Signal::Mask{ $_ } = delete $self->{_saved_signal_masks}{ $_ };
243             }
244             }
245 0           };
246             }
247              
248             1;
249              
250             __END__