File Coverage

blib/lib/AnyEvent/DBI/MySQL.pm
Criterion Covered Total %
statement 58 189 30.6
branch 1 52 1.9
condition 1 40 2.5
subroutine 19 32 59.3
pod 1 1 100.0
total 80 314 25.4


line stmt bran cond sub pod time code
1             package AnyEvent::DBI::MySQL;
2 12     12   3540152 use 5.010001;
  12         53  
3 12     12   76 use warnings;
  12         22  
  12         585  
4 12     12   77 use strict;
  12         23  
  12         340  
5 12     12   2185 use utf8;
  12         47  
  12         107  
6 12     12   440 use Carp;
  12         22  
  12         1361  
7              
8             our $VERSION = 'v2.1.0';
9              
10             ## no critic(ProhibitMultiplePackages Capitalization ProhibitNoWarnings)
11              
12 12     12   72 use base qw( DBI );
  12         20  
  12         7450  
13 12     12   66231 use AnyEvent;
  12         37264  
  12         489  
14 12     12   100 use Scalar::Util qw( weaken );
  12         20  
  12         8517  
15              
16             my @DATA;
17             my @NEXT_ID = ();
18             my $NEXT_ID = 0;
19             my $PRIVATE = 'private_' . __PACKAGE__;
20             my $PRIVATE_async = "$PRIVATE/async";
21              
22             # Force connect_cached() but with unique key in $attr - this guarantee
23             # cached $dbh will be reused only after they no longer in use by user.
24             # Use {RootClass} instead of $class->connect_cached() because
25             # DBI->connect_cached() will call $class->connect() in turn.
26             sub connect { ## no critic(ProhibitBuiltinHomonyms)
27 2     2 1 3498 my ($class, $dsn, $user, $pass, $attr) = @_;
28 2     0   17 local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0         0  
  0         0  
29 2 50       10 my $id = @NEXT_ID ? pop @NEXT_ID : $NEXT_ID++;
30              
31 2   50     8 $attr //= {};
32 2         5 $attr->{RootClass} = $class;
33 2         6 $attr->{$PRIVATE} = $id;
34 2         14 my $dbh = DBI->connect_cached($dsn, $user, $pass, $attr);
35 0 0         return if !$dbh;
36              
37             # weaken cached $dbh to have DESTROY called when user stop using it
38 0           my $cache = $dbh->{Driver}{CachedKids};
39 0 0         for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
  0            
  0            
40 0           weaken($cache->{$_});
41             }
42              
43 0           weaken(my $weakdbh = $dbh);
44 0           my $io_cb; $io_cb = sub {
45 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; warn "$msg\n" };
  0            
  0            
46 0           my $data = $DATA[$id];
47 0           my $cb = delete $data->{cb};
48 0           my $h = delete $data->{h};
49 0           my $args=delete $data->{call_again};
50 0 0 0       if ($cb && $h) {
51 0   0       $cb->( $h->mysql_async_result, $h, $args // ());
52             }
53             else {
54 0           $DATA[$id] = {};
55 0 0 0       if ($weakdbh && $weakdbh->{mysql_auto_reconnect}) {
56 0           $weakdbh->ping; # initiate reconnect
57 0 0         if ($weakdbh->ping) { # check is reconnect was successful
58 0           $DATA[ $id ] = {
59             io => AnyEvent->io(
60             fh => $weakdbh->mysql_fd,
61             poll => 'r',
62             cb => $io_cb,
63             ),
64             };
65             }
66             }
67             }
68 0           };
69 0           $DATA[ $id ] = {
70             io => AnyEvent->io(
71             fh => $dbh->mysql_fd,
72             poll => 'r',
73             cb => $io_cb,
74             ),
75             };
76              
77 0           return $dbh;
78             }
79              
80              
81             package AnyEvent::DBI::MySQL::db;
82 12     12   74 use base qw( DBI::db );
  12         19  
  12         5266  
83 12     12   184 use Carp;
  12         21  
  12         822  
84 12     12   59 use Scalar::Util qw( weaken );
  12         20  
  12         8483  
85              
86             my $GLOBAL_DESTRUCT = 0;
87 12     12   18608 END { $GLOBAL_DESTRUCT = 1; }
88              
89             sub DESTROY {
90 0     0     my ($dbh) = @_;
91              
92 0 0         if ($GLOBAL_DESTRUCT) {
93 0           return $dbh->SUPER::DESTROY();
94             }
95              
96 0           $DATA[ $dbh->{$PRIVATE} ] = {};
97 0           push @NEXT_ID, $dbh->{$PRIVATE};
98 0 0         if (!$dbh->{Active}) {
99 0           $dbh->SUPER::DESTROY();
100             }
101             else {
102             # un-weaken cached $dbh to keep it for next connect_cached()
103 0           my $cache = $dbh->{Driver}{CachedKids};
104 0 0         for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
  0            
  0            
105 0           $cache->{$_} = $dbh;
106             }
107             }
108 0           return;
109             }
110              
111             sub do { ## no critic(ProhibitBuiltinHomonyms)
112 0     0     my ($dbh, @args) = @_;
113 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
114 0           my $ref = ref $args[-1];
115 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
116 0           my $data = $DATA[ $dbh->{$PRIVATE} ];
117 0 0         if ($data->{cb}) {
118 0           croak q{can't make more than one asynchronous query simultaneously};
119             }
120 0           $data->{cb} = pop @args;
121 0           $data->{h} = $dbh;
122 0           weaken($data->{h});
123 0   0       $args[1] //= {};
124 0   0       $args[1]->{async} //= 1;
125 0 0         if (!$args[1]->{async}) {
126 0           my $cb = delete $data->{cb};
127 0           my $h = delete $data->{h};
128 0           $cb->( $dbh->SUPER::do(@args), $h );
129 0           return;
130             }
131             }
132             else {
133 0   0       $args[1] //= {};
134 0 0         if ($args[1]->{async}) {
135 0           croak q{callback required};
136             }
137             }
138 0           return $dbh->SUPER::do(@args);
139             }
140              
141             sub prepare {
142 0     0     my ($dbh, @args) = @_;
143 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
144 0   0       $args[1] //= {};
145 0   0       $args[1]->{async} //= 1;
146 0 0         my $sth = $dbh->SUPER::prepare(@args) or return;
147 0           $sth->{$PRIVATE} = $dbh->{$PRIVATE};
148 0           $sth->{$PRIVATE_async} = $args[1]->{async};
149 0           return $sth;
150             }
151              
152             { # replace C implementations in Driver.xst because it doesn't play nicely with DBI subclassing
153 12     12   92 no warnings 'redefine';
  12         22  
  12         1953  
154             *DBI::db::selectrow_array = \&DBD::_::db::selectrow_array;
155             *DBI::db::selectrow_arrayref= \&DBD::_::db::selectrow_arrayref;
156             *DBI::db::selectall_arrayref= \&DBD::_::db::selectall_arrayref;
157             }
158              
159             my @methods = qw(
160             selectcol_arrayref
161             selectrow_hashref
162             selectall_hashref
163             selectrow_array
164             selectrow_arrayref
165             selectall_arrayref
166             );
167             for (@methods) {
168             my $method = $_;
169             my $super = "SUPER::$method";
170 12     12   66 no strict 'refs';
  12         18  
  12         5914  
171             *{$method} = sub {
172 0     0     my ($dbh, @args) = @_;
173 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
174              
175 0 0         my $attr_idx = $method eq 'selectall_hashref' ? 2 : 1;
176 0           my $ref = ref $args[$attr_idx];
177 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
178 0           splice @args, $attr_idx, 0, {};
179             } else {
180 0   0       $args[$attr_idx] //= {};
181             }
182              
183 0           $ref = ref $args[-1];
184 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
185 0           my $data = $DATA[ $dbh->{$PRIVATE} ];
186 0   0       $args[$attr_idx]->{async} //= 1;
187 0           my $cb = $args[-1];
188             # The select*() functions should be called twice:
189             # - first time they'll do only prepare() and execute()
190             # * we should return false from execute() to interrupt them
191             # after execute(), before they'll start fetching data
192             # * we shouldn't weaken {h} because their $sth will be
193             # destroyed when they will be interrupted
194             # - second time they'll do only data fetching:
195             # * they should get ready $sth instead of query param,
196             # so they'll skip prepare()
197             # * this $sth should be AnyEvent::DBI::MySQL::st::ready,
198             # so they'll skip execute()
199 0           $data->{call_again} = [@args[1 .. $#args-1]];
200 0           weaken($dbh);
201             $args[-1] = sub {
202 0     0     my (undef, $sth, $args) = @_;
203 0 0         return if !$dbh;
204 0 0         if ($dbh->err) {
205 0           $cb->();
206             }
207             else {
208 0           bless $sth, 'AnyEvent::DBI::MySQL::st::ready';
209 0           $cb->( $dbh->$super($sth, @{$args}) );
  0            
210             }
211 0           };
212 0 0         if (!$args[$attr_idx]->{async}) {
213 0           delete $data->{call_again};
214 0           $cb->( $dbh->$super(@args[0 .. $#args-1]) );
215 0           return;
216             }
217             }
218             else {
219 0 0         if ($args[$attr_idx]->{async}) {
220 0           croak q{callback required};
221             } else {
222 0           $args[$attr_idx]->{async} = 0;
223             }
224             }
225              
226 0           return $dbh->$super(@args);
227             };
228             }
229              
230              
231             package AnyEvent::DBI::MySQL::st;
232 12     12   79 use base qw( DBI::st );
  12         23  
  12         4301  
233 12     12   75 use Carp;
  12         20  
  12         803  
234 12     12   73 use Scalar::Util qw( weaken );
  12         32  
  12         4721  
235              
236             sub execute {
237 0     0     my ($sth, @args) = @_;
238 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
239 0           my $data = $DATA[ $sth->{$PRIVATE} ];
240 0           my $ref = ref $args[-1];
241 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
    0          
242 0 0         if ($data->{cb}) {
243 0           croak q{can't make more than one asynchronous query simultaneously};
244             }
245 0           $data->{cb} = pop @args;
246 0           $data->{h} = $sth;
247 0 0         if (!$sth->{$PRIVATE_async}) {
248 0           my $cb = delete $data->{cb};
249 0           my $h = delete $data->{h};
250 0           $cb->( $sth->SUPER::execute(@args), $h );
251 0           return;
252             }
253 0           $sth->SUPER::execute(@args);
254 0 0         if ($sth->err) { # execute failed, I/O won't happens
255 0           my $cb = delete $data->{cb};
256 0           my $h = delete $data->{h};
257 0           my $args=delete $data->{call_again};
258 0   0       $cb->( undef, $h, $args // () );
259             }
260 0           return;
261             }
262             elsif ($sth->{$PRIVATE_async}) {
263 0           croak q{callback required};
264             }
265 0           return $sth->SUPER::execute(@args);
266             }
267              
268              
269             package AnyEvent::DBI::MySQL::st::ready;
270 12     12   72 use base qw( DBI::st );
  12         16  
  12         3935  
271 0     0     sub execute { return '0E0' };
272              
273              
274             1; # Magic true value required at end of module
275             __END__