File Coverage

blib/lib/AnyEvent/MySQL.pm
Criterion Covered Total %
statement 65 618 10.5
branch 0 270 0.0
condition 0 51 0.0
subroutine 22 81 27.1
pod 1 1 100.0
total 88 1021 8.6


line stmt bran cond sub pod time code
1             package AnyEvent::MySQL;
2              
3 1     1   13591 use 5.006;
  1         3  
4 1     1   10 use strict;
  1         1  
  1         19  
5 1     1   4 use warnings;
  1         5  
  1         40  
6              
7             =encoding utf8
8              
9             =head1 NAME
10              
11             AnyEvent::MySQL - Pure Perl AnyEvent socket implementation of MySQL client
12              
13             =head1 VERSION
14              
15             Version 1.2.1
16              
17             =cut
18              
19             our $VERSION = '1.002001';
20              
21 1     1   378 use AnyEvent::MySQL::Imp;
  1         5  
  1         210  
22              
23              
24             =head1 SYNOPSIS
25              
26             This package is used in my company since 2012 to today (2017). I think it should be stable.
27             (though some data type fetching through prepared command are not implemented)
28              
29             Please read the test.pl file as a usage example. >w<
30              
31             #!/usr/bin/perl
32              
33             use strict;
34             use warnings;
35              
36             BEGIN {
37             eval {
38             require AE;
39             require Data::Dumper;
40             require Devel::StackTrace;
41             require EV;
42             };
43             if( $@ ) {
44             warn "require module fail: $@";
45             exit;
46             }
47             }
48              
49             $EV::DIED = sub {
50             print "EV::DIED: $@\n";
51             print Devel::StackTrace->new->as_string;
52             };
53              
54             use lib 'lib';
55             use AnyEvent::MySQL;
56              
57             my $end = AE::cv;
58              
59             my $dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { PrintError => 1 }, sub {
60             my($dbh) = @_;
61             if( $dbh ) {
62             warn "Connect success!";
63             $dbh->pre_do("set names latin1");
64             $dbh->pre_do("set names utf8");
65             }
66             else {
67             warn "Connect fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
68             $end->send;
69             }
70             });
71              
72             $dbh->do("select * from t1 where a<=?", {}, 15, sub {
73             my $rv = shift;
74             if( defined($rv) ) {
75             warn "Do success: $rv";
76             }
77             else {
78             warn "Do fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
79             }
80             $end->send;
81             });
82              
83             #$end->recv;
84             my $end2 = AE::cv;
85              
86             #$dbh->prepare("update t1 set a=1 where b=1", sub {
87             #$dbh->prepare("select * from t1", sub {
88             my $sth = $dbh->prepare("select b, a aaa from t1 where a>?", sub {
89             #$dbh->prepare("select * from type_all", sub {
90             warn "prepared!";
91             $end2->send;
92             });
93              
94             #$end2->recv;
95              
96             my $end3 = AE::cv;
97              
98             $sth->execute(1, sub {
99             warn "executed! $_[0]";
100             $end3->send($_[0]);
101             });
102              
103             my $fth = $end3->recv;
104              
105             my $end4 = AE::cv;
106              
107             $fth->bind_col(2, \my $a, sub {
108             warn $_[0];
109             });
110             my $fetch; $fetch = sub {
111             $fth->fetch(sub {
112             if( $_[0] ) {
113             warn "Get! $a";
114             $fetch->();
115             }
116             else {
117             warn "Get End!";
118             undef $fetch;
119             $end4->send;
120             }
121             });
122             }; $fetch->();
123              
124             #$fth->bind_columns(\my($a, $b), sub {
125             # warn $_[0];
126             # warn $AnyEvent::MySQL::errstr;
127             #});
128             #my $fetch; $fetch = sub {
129             # $fth->fetch(sub {
130             # if( $_[0] ) {
131             # warn "Get! ($a, $b)";
132             # $fetch->();
133             # }
134             # else {
135             # undef $fetch;
136             # $end4->send;
137             # }
138             # });
139             #}; $fetch->();
140              
141             #my $fetch; $fetch = sub {
142             # $fth->fetchrow_array(sub {
143             # if( @_ ) {
144             # warn "Get! (@_)";
145             # $fetch->();
146             # }
147             # else {
148             # undef $fetch;
149             # $end4->send;
150             # }
151             # });
152             #}; $fetch->();
153              
154             #my $fetch; $fetch = sub {
155             # $fth->fetchrow_arrayref(sub {
156             # if( $_[0] ) {
157             # warn "Get! (@{$_[0]})";
158             # $fetch->();
159             # }
160             # else {
161             # undef $fetch;
162             # $end4->send;
163             # }
164             # });
165             #}; $fetch->();
166              
167             #my $fetch; $fetch = sub {
168             # $fth->fetchrow_hashref(sub {
169             # if( $_[0] ) {
170             # warn "Get! (@{[%{$_[0]}]})";
171             # $fetch->();
172             # }
173             # else {
174             # undef $fetch;
175             # $end4->send;
176             # }
177             # });
178             #}; $fetch->();
179              
180             $end4->recv;
181              
182             #tcp_connect 0, 3306, sub {
183             # my $fh = shift;
184             # my $hd = AnyEvent::Handle->new( fh => $fh );
185             # AnyEvent::MySQL::Imp::do_auth($hd, 'tiwi', '', sub {
186             # undef $hd;
187             # warn $_[0];
188             # $end->send;
189             # });
190             #};
191              
192             my $end5 = AE::cv;
193              
194             $dbh->selectall_arrayref("select a*2, b from t1 where a<=?", {}, 15, sub {
195             warn "selectall_arrayref";
196             warn Dumper($_[0]);
197             });
198              
199             $dbh->selectall_hashref("select a*2, b from t1", 'b', sub {
200             warn "selectall_hashref";
201             warn Dumper($_[0]);
202             });
203              
204             $dbh->selectall_hashref("select a*2, b from t1", ['b', 'a*2'], sub {
205             warn "selectall_hashref";
206             warn Dumper($_[0]);
207             });
208              
209             $dbh->selectall_hashref("select a*2, b from t1", sub {
210             warn "selectall_hashref";
211             warn Dumper($_[0]);
212             });
213              
214             $dbh->selectcol_arrayref("select a*2, b from t1", { Columns => [1,2,1] }, sub {
215             warn "selectcol_arrayref";
216             warn Dumper($_[0]);
217             });
218              
219             $dbh->selectall_arrayref("select * from t3", sub {
220             warn "selectall_arrayref t3";
221             warn Dumper($_[0]);
222             });
223              
224             $dbh->selectrow_array("select * from t1 where a>? order by a", {}, 2, sub {
225             warn "selectrow_array";
226             warn Dumper(\@_);
227             });
228              
229             $dbh->selectrow_arrayref("select * from t1 where a>? order by a", {}, 2, sub {
230             warn "selectrow_arrayref";
231             warn Dumper($_[0]);
232             });
233              
234             $dbh->selectrow_hashref("select * from t1 where a>? order by a", {}, 2, sub {
235             warn "selectrow_hashref";
236             warn Dumper($_[0]);
237             });
238              
239             my $st = $dbh->prepare("select * from t1 where a>? order by a");
240              
241             $st->execute(2, sub {
242             warn "fetchall_arrayref";
243             warn Dumper($_[0]->fetchall_arrayref());
244             });
245              
246             $st->execute(2, sub {
247             warn "fetchall_hashref(a)";
248             warn Dumper($_[0]->fetchall_hashref('a'));
249             });
250              
251             $st->execute(2, sub {
252             warn "fetchall_hashref";
253             warn Dumper($_[0]->fetchall_hashref());
254             });
255              
256             $st->execute(2, sub {
257             warn "fetchcol_arrayref";
258             warn Dumper($_[0]->fetchcol_arrayref());
259             });
260              
261             $dbh->begin_work( sub {
262             warn "txn begin.. @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
263             } );
264              
265             $dbh->do("update t1 set a=? b=?", {}, 3, 4, sub {
266             warn "error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
267             } );
268              
269             $dbh->do("update t1 set b=b+1", {}, sub {
270             warn "after error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
271             } );
272              
273             $dbh->commit( sub {
274             warn "aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
275             } );
276              
277             $dbh->do("update t1 set b=b+1", {}, sub {
278             warn "after aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
279             $end5->send;
280             } );
281              
282             #my $txh = $dbh->begin_work(sub {
283             # warn "txn begin.. @_";
284             #});
285             #
286             #$dbh->do("insert into t1 values (50,50)", { Tx => $txh }, sub {
287             # warn "insert in txn @_ insertid=".$dbh->last_insert_id;
288             #});
289             #
290             #$txh->rollback(sub {
291             # warn "rollback txn @_";
292             #});
293             #
294             #$dbh->selectall_arrayref("select * from t1", sub {
295             # warn "check rollback txn: ".Dumper($_[0]);
296             #});
297             #
298             #my $txh2 = $dbh->begin_work(sub {
299             # warn "txn2 begin.. @_";
300             #});
301             #
302             #$dbh->do("insert into t1 values (50,50)", { Tx => $txh2 }, sub {
303             # warn "insert in txn2 @_ insertid=".$dbh->last_insert_id;
304             #});
305             #
306             #$txh2->commit(sub {
307             # warn "commit txn2 @_";
308             #});
309             #
310             #$dbh->selectall_arrayref("select * from t1", sub {
311             # warn "check commit txn: ".Dumper($_[0]);
312             #});
313             #
314             #$dbh->do("delete from t1 where a=50", sub {
315             # warn "remove the effect @_";
316             #});
317             #
318             #my $update_st;
319             #
320             #my $txh3; $txh3 = $dbh->begin_work(sub {
321             # warn "txn3 begin.. @_";
322             #});
323             #
324             # $update_st = $dbh->prepare("insert into t1 values (?,?)", sub {
325             # warn "prepare insert @_";
326             # });
327             # $update_st->execute(60, 60, { Tx => $txh3 }, sub {
328             # warn "insert 60 @_";
329             # });
330             #
331             # $dbh->selectall_arrayref("select * from t1", { Tx => $txh3 }, sub {
332             # warn "select in txn3: ".Dumper($_[0]);
333             # });
334             #
335             # $txh3->rollback(sub {
336             # warn "txh3 rollback @_";
337             # });
338             #
339             # $dbh->selectall_arrayref("select * from t1", sub {
340             # warn "select out txn3: ".Dumper($_[0]);
341             # });
342              
343             #$st_all = $dbh->prepare("select `date`, `time`, `datetime`, `timestamp` from all_type", sub {
344             # warn "prepare st_all @_";
345             #});
346             #
347             #$st_all->execute
348              
349             $end5->recv;
350              
351             my $readonly_dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { ReadOnly => 1 }, sub {
352             # ... we can only use "select" and "show" and "set names" command on this handle
353             });
354              
355             $end->recv;
356              
357             =cut
358              
359       0     sub _empty_cb {}
360              
361             =head2 $dbh = AnyEvent::MySQL->connect($data_source, $username, [$auth, [\%attr,]] $cb->($dbh, 1))
362              
363             =cut
364             sub connect {
365 0     0 1   shift;
366 0           return AnyEvent::MySQL::db->new(@_);
367             }
368              
369             package AnyEvent::MySQL::db;
370              
371 1     1   11 use strict;
  1         3  
  1         29  
372 1     1   6 use warnings;
  1         3  
  1         38  
373              
374 1     1   7 use AE;
  1         3  
  1         22  
375 1     1   5 use AnyEvent::Socket;
  1         4  
  1         121  
376 1     1   8 use AnyEvent::Handle;
  1         3  
  1         37  
377 1     1   8 use Scalar::Util qw(weaken dualvar);
  1         2  
  1         69  
378 1     1   10 use Guard;
  1         3  
  1         76  
379              
380             # connection state
381             use constant {
382 1         120 BUSY_CONN => 1,
383             IDLE_CONN => 2,
384             ZOMBIE_CONN => 3,
385 1     1   9 };
  1         2  
386              
387             # transaction state
388             use constant {
389 1         116 NO_TXN => 1,
390             EMPTY_TXN => 2,
391             CLEAN_TXN => 3,
392             DIRTY_TXN => 4,
393             DEAD_TXN => 5,
394 1     1   10 };
  1         3  
395              
396             # transaction control token
397             use constant {
398 1         114 TXN_TASK => 1,
399             TXN_BEGIN => 2,
400             TXN_COMMIT => 3,
401             TXN_ROLLBACK => 4,
402 1     1   9 };
  1         3  
403              
404             use constant {
405 1         6407 AUTHi => 0,
406             ATTRi => 1,
407             HDi => 2,
408             CONNi => 9,
409             ON_CONNi => 11,
410              
411             CONN_STATEi => 3,
412             TXN_STATEi => 4,
413              
414             TASKi => 5,
415             STi => 6,
416             FALLBACKi => 10,
417              
418             ERRi => 7,
419             ERRSTRi => 8,
420 1     1   9 };
  1         3  
421              
422             sub _push_task {
423 0     0     my($dbh, $task) = @_;
424 0           push @{$dbh->{_}[TASKi]}, $task;
  0            
425 0 0         _process_task($dbh) if( $dbh->{_}[CONN_STATEi]==IDLE_CONN );
426             }
427              
428             sub _unshift_task {
429 0     0     my($dbh, $task) = @_;
430 0           unshift @{$dbh->{_}[TASKi]}, $task;
  0            
431             }
432              
433             sub _report_error {
434 0     0     my($dbh, $method, $error_num, $error_str) = @_;
435              
436 0           $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = $error_num;
437 0           $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = $error_str;
438 0 0         warn "$dbh $method failed: $error_str ($error_num)\n" if( $dbh->{_}[ATTRi]{PrintError} );
439              
440 0 0         $dbh->{_}[TXN_STATEi] = DEAD_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
441             }
442              
443             sub _reconnect {
444 0     0     my $dbh = shift;
445 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
446 0           my $retry; $retry = AE::timer .1, 0, sub {
447 0     0     undef $retry;
448 0           _connect($dbh);
449 0           };
450             }
451              
452             sub _connect {
453 0     0     my $dbh = shift;
454 0   0       my $cb = $dbh->{_}[ON_CONNi] || \&AnyEvent::MySQL::_empty_cb;
455 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
456              
457 0           my $param = $dbh->{Name};
458 0           my $database;
459 0 0         if( index($param, '=')>=0 ) {
460             $param = {
461 0           map { split /=/, $_, 2 } split /;/, $param
  0            
462             };
463 0 0         if( $param->{host} =~ /(.*):(.*)/ ) {
464 0           $param->{host} = $1;
465 0           $param->{port} = $2;
466             }
467             }
468             else {
469 0           $param = { database => $param };
470             }
471              
472 0   0       $param->{port} ||= 3306;
473              
474 0 0 0       if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
475 0   0       my $sock = $param->{mysql_socket} || `mysql_config --socket`;
476 0 0         if( !$sock ) {
477 0           _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
478 0           $cb->();
479 0           return;
480             }
481 0           $param->{host} = '/unix';
482 0           $param->{port} = $sock;
483             }
484              
485 0           warn "Connecting to $param->{host}:$param->{port} ...";
486 0           weaken( my $wdbh = $dbh );
487             $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
488 0     0     my $fh = shift;
489 0 0         if( !$fh ) {
490 0           warn "Connect to $param->{host}:$param->{port} fail: $! retry later.";
491 0           undef $wdbh->{_}[CONNi];
492              
493 0           _reconnect($wdbh);
494 0           return;
495             }
496 0           warn "Connected ($param->{host}:$param->{port})";
497              
498             $wdbh->{_}[HDi] = AnyEvent::Handle->new(
499             fh => $fh,
500             on_error => sub {
501 0 0         return if !$wdbh;
502              
503 0           my $wwdbh = $wdbh;
504 0 0         if( $_[1] ) {
505 0           warn "Disconnected from $param->{host}:$param->{port} by $_[2] reconnect later.";
506 0           undef $wwdbh->{_}[HDi];
507 0           undef $wwdbh->{_}[CONNi];
508 0           $wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
509 0           _report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
510 0 0         if( $wwdbh->{_}[FALLBACKi] ) {
511 0           $wwdbh->{_}[FALLBACKi]();
512             }
513             }
514             },
515 0           );
516              
517             AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
518 0           my($success, $err_num_and_msg, $thread_id) = @_;
519 0 0         return if !$wdbh;
520 0 0         if( $success ) {
521 0           $wdbh->{mysql_thread_id} = $thread_id;
522             $cb->($wdbh, guard {
523 0 0         _process_task($wdbh) if $wdbh;
524 0           });
525             }
526             else {
527 0           warn "MySQL auth error: $err_num_and_msg retry later.";
528 0           undef $wdbh->{_}[HDi];
529 0           undef $wdbh->{_}[CONNi];
530 0 0         _reconnect($wdbh) if $wdbh;
531             }
532 0           });
533 0           });
534             }
535              
536             sub _process_task {
537 0     0     my $dbh = shift;
538 0           $dbh->{_}[CONN_STATEi] = IDLE_CONN;
539 0           $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
540 0           $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
541 0           $dbh->{_}[FALLBACKi] = undef;
542 0           weaken( my $wdbh = $dbh );
543              
544 0 0         if( !$dbh->{_}[HDi] ) {
545 0           _reconnect($dbh);
546 0           return;
547             }
548              
549 0           my $task = shift @{$dbh->{_}[TASKi]};
  0            
550 0 0         return if( !$task );
551              
552             my $next = sub {
553 0 0   0     _process_task($wdbh) if $wdbh;
554 0           };
555              
556             $dbh->{_}[FALLBACKi] = sub {
557 0     0     undef $dbh->{_}[FALLBACKi];
558 0 0 0       if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
559 0           ++$task->[3];
560 0           warn "redo the task later.. ($task->[3])";
561 0           unshift @{$dbh->{_}[TASKi]}, $task;
  0            
562             }
563             else {
564 0           $task->[2]();
565             }
566 0           _reconnect($dbh);
567 0           };
568 0 0         if( $task->[0]==TXN_TASK ) {
    0          
    0          
    0          
569 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
570 0           _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
571 0           $task->[2]();
572 0           _process_task($dbh);
573             }
574             else {
575 0 0         $dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
576 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
577             $task->[1](sub {
578 0 0 0 0     if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
579 0           _rollback($dbh, $next);
580             }
581             else {
582 0           $next->();
583             }
584 0           });
585             }
586             }
587             elsif( $task->[0]==TXN_BEGIN ) {
588 0 0         if( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
    0          
589 0           $dbh->{_}[TXN_STATEi] = DEAD_TXN;
590 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
591 0           $task->[1]($next);
592             }
593             elsif( $dbh->{_}[TXN_STATEi]==EMPTY_TXN ) {
594 0           $task->[2]($next);
595 0           _process_task($dbh);
596             }
597             else {
598 0           warn "It's in a transaction already.. Abort the old one and begin the new one.";
599 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
600             _rollback($dbh, sub {
601 0     0     $task->[1]($next);
602 0           });
603             }
604             }
605             elsif( $task->[0]==TXN_COMMIT ) {
606 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
    0          
607 0           _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
608 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
609 0           $task->[2]();
610 0           _process_task($dbh);
611             }
612             elsif( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
613 0           $task->[2]();
614 0           _process_task($dbh);
615             }
616             else {
617 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
618 0           $task->[1]($next);
619             }
620             }
621             elsif( $task->[0]==TXN_ROLLBACK ) {
622 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
    0          
623 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
624 0           $task->[2](1);
625 0           _process_task($dbh);
626             }
627             elsif( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
628 0           $task->[2]();
629 0           _process_task($dbh);
630             }
631             else {
632 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
633 0           $task->[1]($next);
634             }
635             }
636             else {
637 0           warn "Never be here";
638             }
639             }
640              
641             sub _text_prepare {
642 0     0     my $statement = shift;
643 0           $statement =~ s(\?){
644 0           my $value = shift;
645 0 0         if( defined($value) ) {
646 0           $value =~ s/\\/\\\\/g;
647 0           $value =~ s/'/\\'/g;
648 0           "'$value'";
649             }
650             else {
651 0           'NULL';
652             }
653             }ge;
654 0           return $statement;
655             }
656              
657             =head2 $dbh = AnyEvent::MySQL::db->new($dsn, $username, [$auth, [\%attr,]] [$cb->($dbh, $next_guard)])
658              
659             $cb will be called when each time the db connection is connected, reconnected,
660             or tried but failed.
661              
662             If failed, the $dbh in the $cb's args will be undef.
663              
664             You can do some connection initialization here, such as
665             set names utf8;
666              
667             But you should NOT rely on this for work flow control,
668             cause the reconnection can occur anytime.
669              
670             =cut
671             sub new {
672 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
673 0           my($class, $dsn, $username, $auth, $attr) = @_;
674              
675 0           my $dbh = bless { _ => [] }, $class;
676 0 0         if( $dsn =~ /^DBI:mysql:(.*)$/ ) {
677 0           $dbh->{Name} = $1;
678             }
679             else {
680 0           die "invalid dsn format";
681             }
682 0           $dbh->{Username} = $username;
683 0           $dbh->{_}[AUTHi] = $auth;
684 0 0         $dbh->{_}[ATTRi] = +{ Verbose => 1, %{ $attr || {} } };
  0            
685 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
686 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
687 0           $dbh->{_}[TASKi] = [];
688 0           $dbh->{_}[ON_CONNi] = $cb;
689              
690 0           _connect($dbh);
691              
692 0           return $dbh;
693             }
694              
695             =head2 $error_num = $dbh->err
696              
697             =cut
698             sub err {
699 0     0     return $_[0]{_}[ERRi];
700             }
701              
702             =head2 $error_str = $dbh->errstr
703              
704             =cut
705             sub errstr {
706 0     0     return $_[0]{_}[ERRSTRi];
707             }
708              
709             =head2 $rv = $dbh->last_insert_id
710              
711             Non-blocking get the value immediately
712              
713             =cut
714             sub last_insert_id {
715 0     0     $_[0]{mysql_insertid};
716             }
717              
718             sub _do {
719 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
720 0           my($rev_dir, $dbh, $statement, $attr, @bind_values) = @_;
721              
722 0 0 0       if( $dbh->{_}[ATTRi]{ReadOnly} && $statement !~ /^\s*(?:show|select|set\s+names)\s+/i ){
723 0           _report_error($dbh, 'do', 1227, 'unable to perform write queries on a ReadOnly handle');
724 0           $cb->();
725 0           return;
726             }
727              
728             my @args = ($dbh, [TXN_TASK, sub {
729 0     0     my $next_act = shift;
730 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
731             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
732 0           eval {
733 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
734 0           $dbh->{mysql_insertid} = $_[2];
735 0           $cb->($_[1]);
736             }
737             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
738 0           _report_error($dbh, 'do', $_[1], $_[3]);
739 0           $cb->();
740             }
741             else {
742 0           $cb->(0+@{$_[2]});
  0            
743             }
744             };
745 0           $next_act->();
746 0           });
747 0           }, $cb, 0]);
748              
749 0 0         if( $rev_dir ) {
750 0           _unshift_task(@args);
751             }
752             else {
753 0           _push_task(@args);
754             }
755             }
756              
757             =head2 $dbh->do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])
758              
759             =cut
760             sub do {
761 0     0     unshift @_, 0;
762 0           &_do;
763             }
764              
765             =head2 $dbh->pre_do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])
766              
767             This method is like $dbh->do except that $dbh->pre_do will unshift
768             job into the queue instead of push.
769              
770             This method is for the initializing actions in the AnyEvent::MySQL->connect's callback
771              
772             =cut
773             sub pre_do {
774 0     0     unshift @_, 1;
775 0           &_do;
776             }
777              
778             =head2 $dbh->selectall_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))
779              
780             =cut
781             sub selectall_arrayref {
782 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
783 0           my($dbh, $statement, $attr, @bind_values) = @_;
784              
785             _push_task($dbh, [TXN_TASK, sub {
786 0     0     my $next_act = shift;
787 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
788             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
789 0           eval {
790 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
791 0           $dbh->{mysql_insertid} = $_[2];
792 0           $cb->([]);
793             }
794             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
795 0           _report_error($dbh, 'selectall_arrayref', $_[1], $_[3]);
796 0           $cb->();
797             }
798             else {
799 0           $cb->($_[2]);
800             }
801             };
802 0           $next_act->();
803 0           });
804 0           }, $cb, 0]);
805             }
806              
807              
808             =head2 $dbh->selectall_hashref($statement, [$key_field|\@key_field], [\%attr, [@bind_values,]] $cb->($hash_ref))
809              
810             =cut
811              
812             sub selectall_hashref {
813 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
814 0           my($dbh, $statement, $key_field) = splice @_, 0, 3;
815              
816 0           my @key_field;
817 0 0         if( ref($key_field) eq 'ARRAY' ) {
    0          
    0          
818 0           @key_field = @$key_field;
819             }
820             elsif( ref($key_field) eq 'HASH' ) {
821 0           unshift @_, $key_field;
822 0           @key_field = ();
823             }
824             elsif( defined($key_field) ) {
825 0           @key_field = ($key_field);
826             }
827             else {
828 0           @key_field = ();
829             }
830              
831 0           my($attr, @bind_values) = @_;
832              
833             _push_task($dbh, [TXN_TASK, sub {
834 0     0     my $next_act = shift;
835 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
836             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
837 0           eval {
838 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
839 0           $dbh->{mysql_insertid} = $_[2];
840 0 0         if( @key_field ) {
841 0           $cb->({});
842             }
843             else {
844 0           $cb->([]);
845             }
846             }
847             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
848 0           _report_error($dbh, 'selectall_hashref', $_[1], $_[3]);
849 0           $cb->();
850             }
851             else {
852 0           my $res;
853 0 0         if( @key_field ) {
854 0           $res = {};
855             }
856             else {
857 0           $res = [];
858             }
859 0           for(my $i=$#{$_[2]}; $i>=0; --$i) {
  0            
860 0           my %record;
861 0           for(my $j=$#{$_[2][$i]}; $j>=0; --$j) {
  0            
862 0           $record{$_[1][$j][4]} = $_[2][$i][$j];
863             }
864 0 0         if( @key_field ) {
865 0           my $h = $res;
866 0           for(@key_field[0..$#key_field-1]) {
867 0   0       $h->{$record{$_}} ||= {};
868 0           $h = $h->{$record{$_}};
869             }
870 0           $h->{$record{$key_field[-1]}} = \%record;
871             }
872             else {
873 0           push @$res, \%record;
874             }
875             }
876 0           $cb->($res);
877             }
878             };
879 0           $next_act->();
880 0           });
881 0           }, $cb, 0]);
882             }
883              
884             =head2 $dbh->selectcol_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))
885              
886             =cut
887             sub selectcol_arrayref {
888 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
889 0           my($dbh, $statement, $attr, @bind_values) = @_;
890 0   0       $attr ||= {};
891 0 0         my @columns = map { $_-1 } @{ $attr->{Columns} || [1] };
  0            
  0            
892              
893             _push_task($dbh, [TXN_TASK, sub {
894 0     0     my $next_act = shift;
895 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
896             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
897 0           eval {
898 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
899 0           $dbh->{mysql_insertid} = $_[2];
900 0           $cb->([]);
901             }
902             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
903 0           _report_error($dbh, 'selectcol_arrayref', $_[1], $_[3]);
904 0           $cb->();
905             }
906             else {
907             my @res = map {
908 0           my $r = $_;
909 0           map { $r->[$_] } @columns
  0            
910 0           } @{$_[2]};
  0            
911 0           $cb->(\@res);
912             }
913             };
914 0           $next_act->();
915 0           });
916 0           }, $cb, 0]);
917             }
918              
919             =head2 $dbh->selectrow_array($statement, [\%attr, [@bind_values,]], $cb->(@row_ary))
920              
921             =cut
922             sub selectrow_array {
923 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
924 0           my($dbh, $statement, $attr, @bind_values) = @_;
925              
926             _push_task($dbh, [TXN_TASK, sub {
927 0     0     my $next_act = shift;
928 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
929             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
930 0           eval {
931 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
932 0           $dbh->{mysql_insertid} = $_[2];
933 0           $cb->();
934             }
935             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
936 0           _report_error($dbh, 'selectrow_array', $_[1], $_[3]);
937 0           $cb->();
938             }
939             else {
940 0 0         $cb->($_[2][0] ? @{$_[2][0]} : ());
  0            
941             }
942             };
943 0           $next_act->();
944 0           });
945 0           }, $cb, 0]);
946             }
947              
948             =head2 $dbh->selectrow_arrayref($statement, [\%attr, [@bind_values,]], $cb->($ary_ref))
949              
950             =cut
951             sub selectrow_arrayref {
952 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
953 0           my($dbh, $statement, $attr, @bind_values) = @_;
954              
955             _push_task($dbh, [TXN_TASK, sub {
956 0     0     my $next_act = shift;
957 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
958             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
959 0           eval {
960 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
961 0           $dbh->{mysql_insertid} = $_[2];
962 0           $cb->(undef);
963             }
964             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
965 0           _report_error($dbh, 'selectrow_arrayref', $_[1], $_[3]);
966 0           $cb->(undef);
967             }
968             else {
969 0           $cb->($_[2][0]);
970             }
971             };
972 0           $next_act->();
973 0           });
974 0           }, $cb, 0]);
975             }
976              
977             =head2 $dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))
978              
979             =cut
980             sub selectrow_hashref {
981 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
982 0           my($dbh, $statement, $attr, @bind_values) = @_;
983              
984             _push_task($dbh, [TXN_TASK, sub {
985 0     0     my $next_act = shift;
986 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
987             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
988 0           eval {
989 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
990 0           $dbh->{mysql_insertid} = $_[2];
991 0           $cb->(undef);
992             }
993             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
994 0           _report_error($dbh, 'selectrow_hashref', $_[1], $_[3]);
995 0           $cb->(undef);
996             }
997             else {
998 0 0         if( $_[2][0] ) {
999 0           my %record;
1000 0           for(my $j=$#{$_[2][0]}; $j>=0; --$j) {
  0            
1001 0           $record{$_[1][$j][4]} = $_[2][0][$j];
1002             }
1003 0           $cb->(\%record);
1004             }
1005             else {
1006 0           $cb->(undef);
1007             }
1008             }
1009             };
1010 0           $next_act->();
1011 0           });
1012 0           }, $cb, 0]);
1013             }
1014              
1015             =head2 $sth = $dbh->prepare($statement, [$cb->($sth)])
1016              
1017             $cb will be called each time when this statement is prepared
1018             (or re-prepared when the db connection is reconnected)
1019              
1020             if the preparation is not success,
1021             the $sth in the $cb's arg will be undef.
1022              
1023             So you should NOT rely on this for work flow controlling.
1024              
1025             =cut
1026             sub prepare {
1027 0     0     my $dbh = $_[0];
1028              
1029 0           my $sth = AnyEvent::MySQL::st->new(@_);
1030 0           push @{$dbh->{_}[STi]}, $sth;
  0            
1031 0           weaken($dbh->{_}[STi][-1]);
1032 0           return $sth;
1033             }
1034              
1035             =head2 $dbh->begin_work([$cb->($rv)])
1036              
1037             =cut
1038             sub begin_work {
1039 0     0     my $dbh = shift;
1040 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1041              
1042             _push_task($dbh, [TXN_BEGIN, sub {
1043 0     0     my $next_act = shift;
1044 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'begin');
1045             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1046 0           eval {
1047 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1048 0           $dbh->{_}[TXN_STATEi] = EMPTY_TXN;
1049 0           $cb->(1);
1050             }
1051             else {
1052 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1053 0           _report_error($dbh, 'begin_work', $_[1], $_[3]);
1054             }
1055             else {
1056 0           _report_error($dbh, 'begin_work', 2000, "Unexpected result: $_[0]");
1057             }
1058 0           $cb->();
1059             }
1060             };
1061 0           $next_act->();
1062 0           });
1063 0           }, $cb, 0]);
1064             }
1065              
1066             =head2 $dbh->commit([$cb->($rv)])
1067              
1068             =cut
1069             sub commit {
1070 0     0     my $dbh = shift;
1071 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1072              
1073             _push_task($dbh, [TXN_COMMIT, sub {
1074 0     0     my $next_act = shift;
1075              
1076 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'commit');
1077             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1078 0           eval {
1079 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1080 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
1081 0           $cb->(1);
1082             }
1083             else {
1084 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1085 0           _report_error($dbh, 'commit', $_[1], $_[3]);
1086             }
1087             else {
1088 0           _report_error($dbh, 'commit', 2000, "Unexpected result: $_[0]");
1089             }
1090 0           $cb->();
1091             }
1092             };
1093 0           $next_act->();
1094 0           });
1095 0           }, $cb, 0]);
1096             }
1097              
1098             =head2 $dbh->rollback([$cb->($rv)])
1099              
1100             =cut
1101             sub rollback {
1102 0     0     my $dbh = shift;
1103 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1104              
1105             _push_task($dbh, [TXN_ROLLBACK, sub {
1106 0     0     my $next_act = shift;
1107              
1108             _rollback($dbh, $next_act, sub {
1109 0 0         $dbh->{_}[TXN_STATEi] = NO_TXN if( $_[0] );
1110 0           &$cb;
1111 0           });
1112 0           }, $cb, 0]);
1113             }
1114             sub _rollback {
1115 0     0     my($dbh, $next_act, $cb) = @_;
1116              
1117 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'rollback');
1118             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1119 0     0     eval {
1120 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1121 0 0         $cb->(1) if $cb;
1122             }
1123             else {
1124 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1125 0           _report_error($dbh, 'rollback', $_[1], $_[3]);
1126             }
1127             else {
1128 0           _report_error($dbh, 'rollback', 2000, "Unexpected result: $_[0]");
1129             }
1130 0 0         $cb->() if $cb;
1131             }
1132             };
1133 0           $next_act->();
1134 0           });
1135             }
1136              
1137             =head2 $dbh->ping(sub {my $alive = shift;});
1138              
1139             =cut
1140              
1141             sub ping {
1142 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1143 0           my ($dbh) = @_;
1144              
1145             _push_task($dbh, [TXN_TASK, sub {
1146 0     0     my $next_act = shift;
1147 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_PING);
1148             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1149 0           eval {
1150 0 0         if ($_[0]==AnyEvent::MySQL::Imp::RES_OK) {
1151 0           $cb->(1);
1152             }
1153             else {
1154 0           $cb->(0);
1155             }
1156             };
1157 0           $next_act->();
1158 0           });
1159 0           }, $cb, 0]);
1160             }
1161              
1162             package AnyEvent::MySQL::st;
1163              
1164 1     1   12 use strict;
  1         3  
  1         27  
1165 1     1   8 use warnings;
  1         3  
  1         50  
1166              
1167 1     1   31 use Scalar::Util qw(weaken);
  1         4  
  1         131  
1168              
1169             use constant {
1170 1         825 DBHi => 0,
1171             IDi => 1,
1172             PARAMi => 2,
1173             FIELDi => 3,
1174             STATEMENTi => 4,
1175 1     1   10 };
  1         3  
1176              
1177             =head2 $sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])
1178              
1179             =cut
1180             sub new {
1181 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1182 0           my($class, $dbh, $statement) = @_;
1183 0           my $sth = bless [], $class;
1184 0           $sth->[DBHi] = $dbh;
1185 0           $sth->[STATEMENTi] = $statement;
1186              
1187 0           return $sth;
1188             }
1189              
1190             =head2 $sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])
1191              
1192             =cut
1193             sub execute {
1194 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1195 0 0         my $attr = ref($_[-1]) eq 'HASH' ? pop : {};
1196 0           my($sth, @bind_values) = @_;
1197 0           my $dbh = $sth->[DBHi];
1198              
1199              
1200             AnyEvent::MySQL::db::_push_task($dbh, [AnyEvent::MySQL::db::TXN_TASK, sub {
1201 0     0     my $next_act = shift;
1202              
1203             my $execute = sub {
1204 0           AnyEvent::MySQL::Imp::do_execute_param($dbh->{_}[AnyEvent::MySQL::db::HDi], $sth->[IDi], \@bind_values, $sth->[PARAMi]);
1205             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], execute => 1, sub {
1206 0           eval {
1207 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
    0          
1208 0           $cb->($_[1]);
1209             }
1210             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_RESULT ) {
1211 0           $cb->(AnyEvent::MySQL::ft->new($sth->[FIELDi], $_[2]));
1212             }
1213             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1214 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
1215 0           $cb->();
1216             }
1217             else {
1218 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unknown response: $_[0]");
1219 0           $cb->();
1220             }
1221             };
1222 0           $next_act->();
1223 0           });
1224 0           };
1225              
1226 0 0         if( $sth->[IDi] ) {
1227 0           $execute->();
1228             }
1229             else {
1230 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[AnyEvent::MySQL::db::HDi], 0, AnyEvent::MySQL::Imp::COM_STMT_PREPARE, $sth->[STATEMENTi]);
1231             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], prepare => 1, sub {
1232 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_PREPARE ) {
1233 0           $sth->[IDi] = $_[1];
1234 0           $sth->[PARAMi] = $_[2];
1235 0           $sth->[FIELDi] = $_[3];
1236              
1237 0           $execute->();
1238             }
1239             else {
1240 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1241 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
1242 0           $cb->();
1243             }
1244             else {
1245 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unexpected response: $_[0]");
1246 0           $cb->();
1247             }
1248             }
1249 0           });
1250             }
1251 0           }, $cb, 0]);
1252             }
1253              
1254             package AnyEvent::MySQL::ft;
1255              
1256 1     1   11 use strict;
  1         2  
  1         36  
1257 1     1   8 use warnings;
  1         3  
  1         47  
1258              
1259             use constant {
1260 1         1690 DATAi => 0,
1261             BINDi => 1,
1262             FIELDi => 2,
1263 1     1   8 };
  1         3  
1264              
1265             =head2 $fth = AnyEvent::MySQL::ft->new(\@data_set)
1266              
1267             =cut
1268             sub new {
1269 0     0     my($class, $field_set, $data_set) = @_;
1270              
1271 0           my $fth = bless [], $class;
1272 0           $fth->[FIELDi] = $field_set;
1273 0           $fth->[DATAi] = $data_set;
1274              
1275 0           return $fth;
1276             }
1277              
1278             =head2 $rc = $fth->bind_columns(@list_of_refs_to_vars_to_bind, [$cb->($rc)])
1279              
1280             =cut
1281             sub bind_columns {
1282 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1283 0           my $fth = shift;
1284 0           my @list_of_refs_to_vars_to_bind = @_;
1285              
1286 0 0         if( !@{$fth->[DATAi]} ) {
  0 0          
1287 0 0         $cb->(1) if $cb;
1288 0           return 1;
1289             }
1290 0           elsif( @{$fth->[DATAi][0]} == @list_of_refs_to_vars_to_bind ) {
1291 0           $fth->[BINDi] = \@list_of_refs_to_vars_to_bind;
1292 0 0         $cb->(1) if $cb;
1293 0           return 1;
1294             }
1295             else {
1296 0 0         $cb->() if $cb;
1297 0           return;
1298             }
1299             }
1300              
1301             =head2 $rc = $fth->bind_col($col_num, \$col_variable, [$cb->($rc)])
1302              
1303             =cut
1304             sub bind_col {
1305 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1306 0           my($fth, $col_num, $col_ref) = @_;
1307              
1308 0 0 0       if( !@{$fth->[DATAi]} ) {
  0 0          
1309 0 0         $cb->(1) if $cb;
1310 0           return 1;
1311             }
1312 0           elsif( 0<=$col_num && $col_num<=$#{$fth->[DATAi][0]} ) {
1313 0           $fth->[BINDi][$col_num] = $col_ref;
1314 0 0         $cb->(1) if $cb;
1315 0           return 1;
1316             }
1317             else {
1318 0 0         $cb->() if $cb;
1319 0           return;
1320             }
1321             }
1322              
1323             =head2 $rv = $fth->fetch([$cb->($rv)])
1324              
1325             =cut
1326             sub fetch {
1327 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1328 0           my $fth = shift;
1329              
1330 0 0 0       if( $fth->[BINDi] && $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0   0        
1331 0           my $bind = $fth->[BINDi];
1332 0           my $row = shift @{$fth->[DATAi]};
  0            
1333 0           for(my $i=0; $i<@$row; ++$i) {
1334 0 0         ${$bind->[$i]} = $row->[$i] if $bind->[$i];
  0            
1335             }
1336 0 0         $cb->(1) if $cb;
1337 0           return 1;
1338             }
1339             else {
1340 0 0         $cb->() if $cb;
1341 0           return;
1342             }
1343             }
1344              
1345             =head2 @row_ary = $fth->fetchrow_array([$cb->(@row_ary)])
1346              
1347             =cut
1348             sub fetchrow_array {
1349 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1350 0           my $fth = shift;
1351              
1352 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1353 0           my $row = shift @{$fth->[DATAi]};
  0            
1354 0 0         $cb->(@$row) if $cb;
1355 0 0         return @$row if defined wantarray;
1356             }
1357             else {
1358 0 0         $cb->() if $cb;
1359 0           return ();
1360             }
1361             }
1362              
1363             =head2 $ary_ref = $fth->fetchrow_arrayref([$cb->($ary_ref)])
1364              
1365             =cut
1366             sub fetchrow_arrayref {
1367 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1368 0           my $fth = shift;
1369              
1370 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1371 0           my $row = shift @{$fth->[DATAi]};
  0            
1372 0 0         $cb->($row) if $cb;
1373 0           return $row;
1374             }
1375             else {
1376 0 0         $cb->() if $cb;
1377 0           return;
1378             }
1379             }
1380              
1381             =head2 $hash_ref = $fth->fetchrow_hashref([$cb->($hash_ref)])
1382              
1383             =cut
1384             sub fetchrow_hashref {
1385 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1386 0           my $fth = shift;
1387              
1388 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1389 0           my $field = $fth->[FIELDi];
1390 0           my $hash = {};
1391 0           my $row = shift @{$fth->[DATAi]};
  0            
1392 0           for(my $i=0; $i<@$row; ++$i) {
1393 0           $hash->{$field->[$i][4]} = $row->[$i];
1394             }
1395 0 0         $cb->($hash) if $cb;
1396 0           return $hash;
1397             }
1398             else {
1399 0 0         $cb->() if $cb;
1400 0           return;
1401             }
1402             }
1403              
1404             =head2 $ary_ref = $fth->fetchall_arrayref([$cb->($ary_ref)])
1405              
1406             =cut
1407             sub fetchall_arrayref {
1408 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1409 0           my $fth = shift;
1410              
1411 0 0         if( $fth->[DATAi] ) {
1412 0           my $all = delete $fth->[DATAi];
1413 0 0         $cb->($all) if $cb;
1414 0           return $all;
1415             }
1416             else {
1417 0 0         $cb->() if $cb;
1418 0           return;
1419             }
1420             }
1421              
1422             =head2 $hash_ref = $fth->fetchall_hashref([($key_field|\@key_field),] [$cb->($hash_ref)])
1423              
1424             =cut
1425             sub fetchall_hashref {
1426 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1427 0           my($fth, $key_field) = @_;
1428              
1429 0           my @key_field;
1430 0 0         if( ref($key_field) eq 'ARRAY' ) {
    0          
1431 0           @key_field = @$key_field;
1432             }
1433             elsif( defined($key_field) ) {
1434 0           @key_field = ($key_field);
1435             }
1436             else {
1437 0           @key_field = ();
1438             }
1439              
1440 0 0         if( $fth->[DATAi] ) {
1441 0           my $field = $fth->[FIELDi];
1442              
1443 0           my $res;
1444 0 0         if( @key_field ) {
1445 0           $res = {};
1446             }
1447             else {
1448 0           $res = [];
1449             }
1450              
1451 0           while( @{$fth->[DATAi]} ) {
  0            
1452 0           my $row = shift @{$fth->[DATAi]};
  0            
1453 0           my %record;
1454 0           for(my $i=0; $i<@$row; ++$i) {
1455 0           $record{$field->[$i][4]} = $row->[$i];
1456             }
1457 0 0         if( @key_field ) {
1458 0           my $h = $res;
1459 0           for(@key_field[0..$#key_field-1]) {
1460 0   0       $h->{$record{$_}} ||= {};
1461 0           $h = $h->{$record{$_}};
1462             }
1463 0           $h->{$record{$key_field[-1]}} = \%record;
1464             }
1465             else {
1466 0           push @$res, \%record;
1467             }
1468             }
1469 0           delete $fth->[DATAi];
1470 0 0         $cb->($res) if $cb;
1471 0           return $res;
1472             }
1473             else {
1474 0 0         $cb->() if $cb;
1475 0           return;
1476             }
1477             }
1478              
1479             =head2 $ary_ref = $fth->fetchcol_arrayref([\%attr], [$cb->($ary_ref)])
1480              
1481             =cut
1482             sub fetchcol_arrayref {
1483 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1484 0           my($fth, $attr) = @_;
1485 0   0       $attr ||= {};
1486 0 0         my @columns = map { $_-1 } @{ $attr->{Columns} || [1] };
  0            
  0            
1487              
1488 0 0         if( $fth->[DATAi] ) {
1489             my @res = map {
1490 0           my $r = $_;
1491 0           map { $r->[$_] } @columns
  0            
1492 0           } @{ delete $fth->[DATAi] };
  0            
1493 0 0         $cb->(\@res) if $cb;
1494 0           return \@res;
1495             }
1496             else {
1497 0 0         $cb->() if $cb;
1498 0           return;
1499             }
1500             }
1501              
1502             =head1 AUTHOR
1503              
1504             Cindy Wang (CindyLinz)
1505              
1506             =head1 CONTRIBUTOR
1507              
1508             Dmitriy Shamatrin L
1509              
1510             clking L
1511              
1512             =head1 BUGS
1513              
1514             Please report any bugs or feature requests to C.
1515             I will be notified, and then you'll
1516             automatically be notified of progress on your bug as I make changes.
1517              
1518              
1519              
1520              
1521             =head1 SUPPORT
1522              
1523             You can find documentation for this module with the perldoc command.
1524              
1525             perldoc AnyEvent::MySQL
1526              
1527              
1528             You can also look for information at:
1529              
1530             =over 4
1531              
1532             =item * github
1533              
1534             L
1535              
1536             =item * Search CPAN
1537              
1538             L
1539              
1540             =back
1541              
1542              
1543             =head1 LICENSE AND COPYRIGHT
1544              
1545             Copyright 2011-2015 Cindy Wang (CindyLinz).
1546              
1547             This program is free software; you can redistribute it and/or modify it
1548             under the terms of either: the GNU General Public License as published
1549             by the Free Software Foundation; or the Artistic License.
1550              
1551             See http://dev.perl.org/licenses/ for more information.
1552              
1553             =cut
1554              
1555             1; # End of AnyEvent::MySQL