File Coverage

blib/lib/AnyEvent/DBI/MySQL.pm
Criterion Covered Total %
statement 62 185 33.5
branch 1 48 2.0
condition 1 37 2.7
subroutine 20 33 60.6
pod 1 1 100.0
total 85 304 27.9


line stmt bran cond sub pod time code
1             package AnyEvent::DBI::MySQL;
2              
3 13     13   349499 use warnings;
  13         32  
  13         441  
4 13     13   68 use strict;
  13         24  
  13         417  
5 13     13   16064 use utf8;
  13         138  
  13         71  
6 13     13   470 use feature ':5.10';
  13         23  
  13         1985  
7 13     13   71 use Carp;
  13         24  
  13         1186  
8              
9 13     13   11179 use version; our $VERSION = qv('1.0.5'); # REMINDER: update Changes
  13         33067  
  13         83  
10              
11             ## no critic(ProhibitMultiplePackages Capitalization ProhibitNoWarnings)
12              
13             # REMINDER: update dependencies in Build.PL
14 13     13   1460 use base qw( DBI );
  13         29  
  13         34364  
15 13     13   304738 use AnyEvent;
  13         41266  
  13         479  
16 13     13   98 use Scalar::Util qw( weaken );
  13         27  
  13         9882  
17              
18             my @DATA;
19             my @NEXT_ID = ();
20             my $NEXT_ID = 0;
21             my $PRIVATE = 'private_' . __PACKAGE__;
22             my $PRIVATE_async = "$PRIVATE/async";
23              
24             # Force connect_cached() but with a unique key in $attr - this guarantees
25             # a cached $dbh will be reused only after the user no longer uses it.
26             # Use {RootClass} instead of $class->connect_cached() because
27             # DBI->connect_cached() will call $class->connect() in turn.
28             sub connect { ## no critic(ProhibitBuiltinHomonyms)
29 2     2 1 5154 my ($class, $dsn, $user, $pass, $attr) = @_;
30 2     0   20 local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0         0  
  0         0  
31 2 50       12 my $id = @NEXT_ID ? pop @NEXT_ID : $NEXT_ID++;
32              
33 2   50     8 $attr //= {};
34 2         6 $attr->{RootClass} = $class;
35 2         7 $attr->{$PRIVATE} = $id;
36 2         19 my $dbh = DBI->connect_cached($dsn, $user, $pass, $attr);
37 0 0         return if !$dbh;
38              
39             # weaken cached $dbh to have DESTROY called when user stop using it
40 0           my $cache = $dbh->{Driver}{CachedKids};
41 0 0         for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
  0            
  0            
42 0           weaken($cache->{$_});
43             }
44              
45             $DATA[ $id ] = {
46             io => AnyEvent->io(
47             fh => $dbh->mysql_fd,
48             poll => 'r',
49             cb => sub {
50 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; warn "$msg\n" };
  0            
  0            
51 0           my $data = $DATA[$id];
52 0           my $cb = delete $data->{cb};
53 0           my $h = delete $data->{h};
54 0           my $args=delete $data->{call_again};
55 0 0 0       if ($cb && $h) {
56 0   0       $cb->( $h->mysql_async_result, $h, $args // ());
57             }
58             },
59 0           ),
60             };
61              
62 0           return $dbh;
63             }
64              
65              
66             package AnyEvent::DBI::MySQL::db;
67 13     13   227 use base qw( DBI::db );
  13         28  
  13         8812  
68 13     13   87 use Carp;
  13         25  
  13         1092  
69 13     13   72 use Scalar::Util qw( weaken );
  13         25  
  13         10920  
70              
71             my $GLOBAL_DESTRUCT = 0;
72 13     13   74057 END { $GLOBAL_DESTRUCT = 1; }
73              
74             sub DESTROY {
75 0     0     my ($dbh) = @_;
76              
77 0 0         if ($GLOBAL_DESTRUCT) {
78 0           return $dbh->SUPER::DESTROY();
79             }
80              
81 0           $DATA[ $dbh->{$PRIVATE} ] = {};
82 0           push @NEXT_ID, $dbh->{$PRIVATE};
83 0 0         if (!$dbh->{Active}) {
84 0           $dbh->SUPER::DESTROY();
85             }
86             else {
87             # un-weaken cached $dbh to keep it for next connect_cached()
88 0           my $cache = $dbh->{Driver}{CachedKids};
89 0 0         for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
  0            
  0            
90 0           $cache->{$_} = $dbh;
91             }
92             }
93 0           return;
94             }
95              
96             sub do { ## no critic(ProhibitBuiltinHomonyms)
97 0     0     my ($dbh, @args) = @_;
98 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
99 0           my $ref = ref $args[-1];
100 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
101 0           my $data = $DATA[ $dbh->{$PRIVATE} ];
102 0 0         if ($data->{cb}) {
103 0           croak q{can't make more than one asynchronous query simultaneously};
104             }
105 0           $data->{cb} = pop @args;
106 0           $data->{h} = $dbh;
107 0           weaken($data->{h});
108 0   0       $args[1] //= {};
109 0   0       $args[1]->{async} //= 1;
110 0 0         if (!$args[1]->{async}) {
111 0           my $cb = delete $data->{cb};
112 0           my $h = delete $data->{h};
113 0           $cb->( $dbh->SUPER::do(@args), $h );
114 0           return;
115             }
116             }
117             else {
118 0   0       $args[1] //= {};
119 0 0         if ($args[1]->{async}) {
120 0           croak q{callback required};
121             }
122             }
123 0           return $dbh->SUPER::do(@args);
124             }
125              
126             sub prepare {
127 0     0     my ($dbh, @args) = @_;
128 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
129 0   0       $args[1] //= {};
130 0   0       $args[1]->{async} //= 1;
131 0 0         my $sth = $dbh->SUPER::prepare(@args) or return;
132 0           $sth->{$PRIVATE} = $dbh->{$PRIVATE};
133 0           $sth->{$PRIVATE_async} = $args[1]->{async};
134 0           return $sth;
135             }
136              
137             { # replace C implementations in Driver.xst because it doesn't play nicely with DBI subclassing
138 13     13   86 no warnings 'redefine';
  13         25  
  13         2856  
139             *DBI::db::selectrow_array = \&DBD::_::db::selectrow_array;
140             *DBI::db::selectrow_arrayref= \&DBD::_::db::selectrow_arrayref;
141             *DBI::db::selectall_arrayref= \&DBD::_::db::selectall_arrayref;
142             }
143              
144             my @methods = qw(
145             selectcol_arrayref
146             selectrow_hashref
147             selectall_hashref
148             selectrow_array
149             selectrow_arrayref
150             selectall_arrayref
151             );
152             for (@methods) {
153             my $method = $_;
154             my $super = "SUPER::$method";
155 13     13   104 no strict 'refs';
  13         20  
  13         8941  
156             *{$method} = sub {
157 0     0     my ($dbh, @args) = @_;
158 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
159              
160 0 0         my $attr_idx = $method eq 'selectall_hashref' ? 2 : 1;
161 0           my $ref = ref $args[$attr_idx];
162 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
163 0           splice @args, $attr_idx, 0, {};
164             } else {
165 0   0       $args[$attr_idx] //= {};
166             }
167              
168 0           $ref = ref $args[-1];
169 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
170 0           my $data = $DATA[ $dbh->{$PRIVATE} ];
171 0   0       $args[$attr_idx]->{async} //= 1;
172 0           my $cb = $args[-1];
173             # The select*() functions should be called twice:
174             # - first time they'll do only prepare() and execute()
175             # * we should return false from execute() to interrupt them
176             # after execute(), before they'll start fetching data
177             # * we shouldn't weaken {h} because their $sth will be
178             # destroyed when they will be interrupted
179             # - second time they'll do only data fetching:
180             # * they should get ready $sth instead of query param,
181             # so they'll skip prepare()
182             # * this $sth should be AnyEvent::DBI::MySQL::st::ready,
183             # so they'll skip execute()
184 0           $data->{call_again} = [@args[1 .. $#args-1]];
185 0           weaken($dbh);
186             $args[-1] = sub {
187 0     0     my (undef, $sth, $args) = @_;
188 0 0         return if !$dbh;
189 0 0         if ($dbh->err) {
190 0           $cb->();
191             }
192             else {
193 0           bless $sth, 'AnyEvent::DBI::MySQL::st::ready';
194 0           $cb->( $dbh->$super($sth, @{$args}) );
  0            
195             }
196 0           };
197 0 0         if (!$args[$attr_idx]->{async}) {
198 0           delete $data->{call_again};
199 0           $cb->( $dbh->$super(@args[0 .. $#args-1]) );
200 0           return;
201             }
202             }
203             else {
204 0 0         if ($args[$attr_idx]->{async}) {
205 0           croak q{callback required};
206             } else {
207 0           $args[$attr_idx]->{async} = 0;
208             }
209             }
210              
211 0           return $dbh->$super(@args);
212             };
213             }
214              
215              
216             package AnyEvent::DBI::MySQL::st;
217 13     13   82 use base qw( DBI::st );
  13         26  
  13         8415  
218 13     13   83 use Carp;
  13         24  
  13         843  
219 13     13   67 use Scalar::Util qw( weaken );
  13         23  
  13         6168  
220              
221             sub execute {
222 0     0     my ($sth, @args) = @_;
223 0     0     local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
  0            
  0            
224 0           my $data = $DATA[ $sth->{$PRIVATE} ];
225 0           my $ref = ref $args[-1];
226 0 0 0       if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
    0          
227 0 0         if ($data->{cb}) {
228 0           croak q{can't make more than one asynchronous query simultaneously};
229             }
230 0           $data->{cb} = pop @args;
231 0           $data->{h} = $sth;
232 0 0         if (!$sth->{$PRIVATE_async}) {
233 0           my $cb = delete $data->{cb};
234 0           my $h = delete $data->{h};
235 0           $cb->( $sth->SUPER::execute(@args), $h );
236 0           return;
237             }
238 0           $sth->SUPER::execute(@args);
239 0 0         if ($sth->err) { # execute failed, I/O won't happens
240 0           my $cb = delete $data->{cb};
241 0           my $h = delete $data->{h};
242 0           my $args=delete $data->{call_again};
243 0   0       $cb->( undef, $h, $args // () );
244             }
245 0           return;
246             }
247             elsif ($sth->{$PRIVATE_async}) {
248 0           croak q{callback required};
249             }
250 0           return $sth->SUPER::execute(@args);
251             }
252              
253              
254             package AnyEvent::DBI::MySQL::st::ready;
255 13     13   84 use base qw( DBI::st );
  13         24  
  13         7276  
256 0     0     sub execute { return '0E0' };
257              
258              
259             1; # Magic true value required at end of module
260             __END__
261              
262             =encoding utf8
263              
264             =head1 NAME
265              
266             AnyEvent::DBI::MySQL - Asynchronous MySQL queries
267              
268              
269             =head1 SYNOPSIS
270              
271             use AnyEvent::DBI::MySQL;
272              
273             # get cached but not in use $dbh
274             $dbh = AnyEvent::DBI::MySQL->connect(…);
275              
276             # async
277             $dbh->do(…, sub { my ($rv, $dbh) = @_; … });
278             $sth = $dbh->prepare(…);
279             $sth->execute(…, sub { my ($rv, $sth) = @_; … });
280             $dbh->selectall_arrayref(…, sub { my ($ary_ref) = @_; … });
281             $dbh->selectall_hashref(…, sub { my ($hash_ref) = @_; … });
282             $dbh->selectcol_arrayref(…, sub { my ($ary_ref) = @_; … });
283             $dbh->selectrow_array(…, sub { my (@row_ary) = @_; … });
284             $dbh->selectrow_arrayref(…, sub { my ($ary_ref) = @_; … });
285             $dbh->selectrow_hashref(…, sub { my ($hash_ref) = @_; … });
286              
287             # sync
288             $rv = $dbh->do('…');
289             $dbh->do('…', {async=>0}, sub { my ($rv, $dbh) = @_; … });
290              
291              
292             =head1 DESCRIPTION
293              
294             This module is an L<AnyEvent> user; you need to make sure that you use and
295             run a supported event loop.
296              
297             This module implements asynchronous MySQL queries using the
298             L<DBD::mysql/"ASYNCHRONOUS QUERIES"> feature. Unlike L<AnyEvent::DBI>, it
299             doesn't spawn any processes.
300              
301             You shouldn't use C<< {RaiseError=>1} >> with this module; instead, check
302             the returned values in your callback to detect errors. This is because with
303             C<< {RaiseError=>1} >> an exception will be thrown B<instead> of calling your
304             callback function, which isn't what you want in most cases.
305              
306              
307             =head1 INTERFACE
308              
309             The API is trivial: use it just like the usual DBI, but instead of expecting
310             a return value from functions which may block, add one extra parameter: a
311             callback. That callback will be called with the usual return value of the
312             method as its params (the only exception is an extra $dbh/$sth param in do()
313             and execute(), provided for convenience).
314              
315             =head2 SYNCHRONOUS QUERIES
316              
317             In most cases, to make a usual synchronous query it's enough not to provide
318             a callback - use standard DBI params and it will work just like usual DBI.
319             The only exception is the prepare()/execute() pair: you should use the
320             C<< {async=>0} >> attribute for prepare() to have a synchronous execute().
321              
322             For convenience, you can quickly turn an asynchronous query into a synchronous
323             one by adding the C<< {async=>0} >> attribute - you don't have to rewrite code
324             to remove the callback function. In this case your callback will be called
325             immediately after executing this synchronous query.
326              
327             =over
328              
329             =item connect(…)
330              
331             L<DBD::mysql> supports only a single asynchronous query per MySQL connection.
332             To make it easier to overcome this limitation, the provided connect()
333             constructor works using DBI->connect_cached() under the hood, but it reuses
334             only an inactive $dbh - i.e. one which you don't use anymore. So, connect()
335             is guaranteed not to return a $dbh which is already in use in your code.
336             For example, in a FastCGI or Mojolicious app you can safely use connect() to
337             get your own $dbh for each incoming connection; after you send the response and
338             close this connection, that $dbh should automatically go out of scope and
339             become inactive (you can force this with C<$dbh=undef;>); after that, this
340             $dbh may be returned by connect() when handling the next incoming request.
341             As a result you should automatically get a pool of connected $dbh whose size
342             should match the peak number of simultaneously handled CGI requests.
343             You can flush that $dbh cache as documented by L<DBI> at any time.
344              
345             NOTE: To implement this caching behavior, this module catches DESTROY() for
346             $dbh and, instead of destroying it (and calling $dbh->disconnect()), makes it
347             available in the cache for the next connect() call. So, if you need to call
348             $dbh->disconnect() - do it manually and don't expect it to happen
349             automatically on $dbh DESTROY(), as it does in DBI.
350              
351             Also, the usual limitations for cached connections apply, as documented by
352             L<DBI> (read: don't change $dbh configuration).
353              
354             =item $dbh->do(…, sub { my ($rv, $dbh) = @_; … });
355              
356             =item $sth->execute(…, sub { my ($rv, $sth) = @_; … });
357              
358             =item $dbh->selectall_arrayref(…, sub { my ($ary_ref) = @_; … });
359              
360             =item $dbh->selectall_hashref(…, sub { my ($hash_ref) = @_; … });
361              
362             =item $dbh->selectcol_arrayref(…, sub { my ($ary_ref) = @_; … });
363              
364             =item $dbh->selectrow_array(…, sub { my (@row_ary) = @_; … });
365              
366             =item $dbh->selectrow_arrayref(…, sub { my ($ary_ref) = @_; … });
367              
368             =item $dbh->selectrow_hashref(…, sub { my ($hash_ref) = @_; … });
369              
370             =back
371              
372              
373             =head1 BUGS AND LIMITATIONS
374              
375             No bugs have been reported.
376              
377             These DBI methods are not supported yet (i.e. they work as usual - in
378             blocking mode), mostly because they internally run several queries and
379             should be completely rewritten to support non-blocking mode.
380              
381             NOTE: You have to provide the C<< {async=>0} >> attribute to prepare() before
382             using execute_array() or execute_for_fetch().
383              
384             $sth->execute_array(…)
385             $sth->execute_for_fetch(…)
386             $dbh->table_info(…)
387             $dbh->column_info(…)
388             $dbh->primary_key_info(…)
389             $dbh->foreign_key_info(…)
390             $dbh->statistics_info(…)
391             $dbh->primary_key(…)
392             $dbh->tables(…)
393              
394              
395             =head1 SUPPORT
396              
397             Please report any bugs or feature requests through the web interface at
398             L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=AnyEvent-DBI-MySQL>.
399             I will be notified, and then you'll automatically be notified of progress
400             on your bug as I make changes.
401              
402             You can also look for information at:
403              
404             =over
405              
406             =item * RT: CPAN's request tracker
407              
408             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=AnyEvent-DBI-MySQL>
409              
410             =item * AnnoCPAN: Annotated CPAN documentation
411              
412             L<http://annocpan.org/dist/AnyEvent-DBI-MySQL>
413              
414             =item * CPAN Ratings
415              
416             L<http://cpanratings.perl.org/d/AnyEvent-DBI-MySQL>
417              
418             =item * Search CPAN
419              
420             L<http://search.cpan.org/dist/AnyEvent-DBI-MySQL/>
421              
422             =back
423              
424              
425             =head1 SEE ALSO
426              
427             L<AnyEvent>, L<DBI>, L<AnyEvent::DBI>
428              
429              
430             =head1 AUTHOR
431              
432             Alex Efros C<< <powerman@cpan.org> >>
433              
434              
435             =head1 LICENSE AND COPYRIGHT
436              
437             Copyright 2013 Alex Efros <powerman@cpan.org>.
438              
439             This program is distributed under the MIT (X11) License:
440             L<http://www.opensource.org/licenses/mit-license.php>
441              
442             Permission is hereby granted, free of charge, to any person
443             obtaining a copy of this software and associated documentation
444             files (the "Software"), to deal in the Software without
445             restriction, including without limitation the rights to use,
446             copy, modify, merge, publish, distribute, sublicense, and/or sell
447             copies of the Software, and to permit persons to whom the
448             Software is furnished to do so, subject to the following
449             conditions:
450              
451             The above copyright notice and this permission notice shall be
452             included in all copies or substantial portions of the Software.
453              
454             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
455             EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
456             OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
457             NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
458             HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
459             WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
460             FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
461             OTHER DEALINGS IN THE SOFTWARE.
462