File Coverage

blib/lib/AnyEvent/MySQL.pm
Criterion Covered Total %
statement 65 618 10.5
branch 0 270 0.0
condition 0 51 0.0
subroutine 22 81 27.1
pod 1 1 100.0
total 88 1021 8.6


line stmt bran cond sub pod time code
1             package AnyEvent::MySQL;
2              
3 1     1   12679 use 5.006;
  1         2  
4 1     1   3 use strict;
  1         1  
  1         18  
5 1     1   2 use warnings;
  1         4  
  1         34  
6              
7             =head1 NAME
8              
9             AnyEvent::MySQL - Pure Perl AnyEvent socket implementation of MySQL client
10              
11             =head1 VERSION
12              
13             Version 1.1.6
14              
15             =cut
16              
17             our $VERSION = '1.001007';
18              
19 1     1   395 use AnyEvent::MySQL::Imp;
  1         2  
  1         108  
20              
21              
22             =head1 SYNOPSIS
23              
24             This package is used in my company since 2012 to today (2014). I think it should be stable.
25             (though some data type fetching through prepared command are not implemented)
26              
27             Please read the test.pl file as a usage example. >w<
28              
29             #!/usr/bin/perl
30              
31             use strict;
32             use warnings;
33              
34             BEGIN {
35             eval {
36             require AE;
37             require Data::Dumper;
38             require Devel::StackTrace;
39             require EV;
40             };
41             if( $@ ) {
42             warn "require module fail: $@";
43             exit;
44             }
45             }
46              
47             $EV::DIED = sub {
48             print "EV::DIED: $@\n";
49             print Devel::StackTrace->new->as_string;
50             };
51              
52             use lib 'lib';
53             use AnyEvent::MySQL;
54              
55             my $end = AE::cv;
56              
57             my $dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { PrintError => 1 }, sub {
58             my($dbh) = @_;
59             if( $dbh ) {
60             warn "Connect success!";
61             $dbh->pre_do("set names latin1");
62             $dbh->pre_do("set names utf8");
63             }
64             else {
65             warn "Connect fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
66             $end->send;
67             }
68             });
69              
70             $dbh->do("select * from t1 where a<=?", {}, 15, sub {
71             my $rv = shift;
72             if( defined($rv) ) {
73             warn "Do success: $rv";
74             }
75             else {
76             warn "Do fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
77             }
78             $end->send;
79             });
80              
81             #$end->recv;
82             my $end2 = AE::cv;
83              
84             #$dbh->prepare("update t1 set a=1 where b=1", sub {
85             #$dbh->prepare("select * from t1", sub {
86             my $sth = $dbh->prepare("select b, a aaa from t1 where a>?", sub {
87             #$dbh->prepare("select * from type_all", sub {
88             warn "prepared!";
89             $end2->send;
90             });
91              
92             #$end2->recv;
93              
94             my $end3 = AE::cv;
95              
96             $sth->execute(1, sub {
97             warn "executed! $_[0]";
98             $end3->send($_[0]);
99             });
100              
101             my $fth = $end3->recv;
102              
103             my $end4 = AE::cv;
104              
105             $fth->bind_col(2, \my $a, sub {
106             warn $_[0];
107             });
108             my $fetch; $fetch = sub {
109             $fth->fetch(sub {
110             if( $_[0] ) {
111             warn "Get! $a";
112             $fetch->();
113             }
114             else {
115             warn "Get End!";
116             undef $fetch;
117             $end4->send;
118             }
119             });
120             }; $fetch->();
121              
122             #$fth->bind_columns(\my($a, $b), sub {
123             # warn $_[0];
124             # warn $AnyEvent::MySQL::errstr;
125             #});
126             #my $fetch; $fetch = sub {
127             # $fth->fetch(sub {
128             # if( $_[0] ) {
129             # warn "Get! ($a, $b)";
130             # $fetch->();
131             # }
132             # else {
133             # undef $fetch;
134             # $end4->send;
135             # }
136             # });
137             #}; $fetch->();
138              
139             #my $fetch; $fetch = sub {
140             # $fth->fetchrow_array(sub {
141             # if( @_ ) {
142             # warn "Get! (@_)";
143             # $fetch->();
144             # }
145             # else {
146             # undef $fetch;
147             # $end4->send;
148             # }
149             # });
150             #}; $fetch->();
151              
152             #my $fetch; $fetch = sub {
153             # $fth->fetchrow_arrayref(sub {
154             # if( $_[0] ) {
155             # warn "Get! (@{$_[0]})";
156             # $fetch->();
157             # }
158             # else {
159             # undef $fetch;
160             # $end4->send;
161             # }
162             # });
163             #}; $fetch->();
164              
165             #my $fetch; $fetch = sub {
166             # $fth->fetchrow_hashref(sub {
167             # if( $_[0] ) {
168             # warn "Get! (@{[%{$_[0]}]})";
169             # $fetch->();
170             # }
171             # else {
172             # undef $fetch;
173             # $end4->send;
174             # }
175             # });
176             #}; $fetch->();
177              
178             $end4->recv;
179              
180             #tcp_connect 0, 3306, sub {
181             # my $fh = shift;
182             # my $hd = AnyEvent::Handle->new( fh => $fh );
183             # AnyEvent::MySQL::Imp::do_auth($hd, 'tiwi', '', sub {
184             # undef $hd;
185             # warn $_[0];
186             # $end->send;
187             # });
188             #};
189              
190             my $end5 = AE::cv;
191              
192             $dbh->selectall_arrayref("select a*2, b from t1 where a<=?", {}, 15, sub {
193             warn "selectall_arrayref";
194             warn Dumper($_[0]);
195             });
196              
197             $dbh->selectall_hashref("select a*2, b from t1", 'b', sub {
198             warn "selectall_hashref";
199             warn Dumper($_[0]);
200             });
201              
202             $dbh->selectall_hashref("select a*2, b from t1", ['b', 'a*2'], sub {
203             warn "selectall_hashref";
204             warn Dumper($_[0]);
205             });
206              
207             $dbh->selectall_hashref("select a*2, b from t1", sub {
208             warn "selectall_hashref";
209             warn Dumper($_[0]);
210             });
211              
212             $dbh->selectcol_arrayref("select a*2, b from t1", { Columns => [1,2,1] }, sub {
213             warn "selectcol_arrayref";
214             warn Dumper($_[0]);
215             });
216              
217             $dbh->selectall_arrayref("select * from t3", sub {
218             warn "selectall_arrayref t3";
219             warn Dumper($_[0]);
220             });
221              
222             $dbh->selectrow_array("select * from t1 where a>? order by a", {}, 2, sub {
223             warn "selectrow_array";
224             warn Dumper(\@_);
225             });
226              
227             $dbh->selectrow_arrayref("select * from t1 where a>? order by a", {}, 2, sub {
228             warn "selectrow_arrayref";
229             warn Dumper($_[0]);
230             });
231              
232             $dbh->selectrow_hashref("select * from t1 where a>? order by a", {}, 2, sub {
233             warn "selectrow_hashref";
234             warn Dumper($_[0]);
235             });
236              
237             my $st = $dbh->prepare("select * from t1 where a>? order by a");
238              
239             $st->execute(2, sub {
240             warn "fetchall_arrayref";
241             warn Dumper($_[0]->fetchall_arrayref());
242             });
243              
244             $st->execute(2, sub {
245             warn "fetchall_hashref(a)";
246             warn Dumper($_[0]->fetchall_hashref('a'));
247             });
248              
249             $st->execute(2, sub {
250             warn "fetchall_hashref";
251             warn Dumper($_[0]->fetchall_hashref());
252             });
253              
254             $st->execute(2, sub {
255             warn "fetchcol_arrayref";
256             warn Dumper($_[0]->fetchcol_arrayref());
257             });
258              
259             $dbh->begin_work( sub {
260             warn "txn begin.. @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
261             } );
262              
263             $dbh->do("update t1 set a=? b=?", {}, 3, 4, sub {
264             warn "error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
265             } );
266              
267             $dbh->do("update t1 set b=b+1", {}, sub {
268             warn "after error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
269             } );
270              
271             $dbh->commit( sub {
272             warn "aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
273             } );
274              
275             $dbh->do("update t1 set b=b+1", {}, sub {
276             warn "after aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
277             $end5->send;
278             } );
279              
280             #my $txh = $dbh->begin_work(sub {
281             # warn "txn begin.. @_";
282             #});
283             #
284             #$dbh->do("insert into t1 values (50,50)", { Tx => $txh }, sub {
285             # warn "insert in txn @_ insertid=".$dbh->last_insert_id;
286             #});
287             #
288             #$txh->rollback(sub {
289             # warn "rollback txn @_";
290             #});
291             #
292             #$dbh->selectall_arrayref("select * from t1", sub {
293             # warn "check rollback txn: ".Dumper($_[0]);
294             #});
295             #
296             #my $txh2 = $dbh->begin_work(sub {
297             # warn "txn2 begin.. @_";
298             #});
299             #
300             #$dbh->do("insert into t1 values (50,50)", { Tx => $txh2 }, sub {
301             # warn "insert in txn2 @_ insertid=".$dbh->last_insert_id;
302             #});
303             #
304             #$txh2->commit(sub {
305             # warn "commit txn2 @_";
306             #});
307             #
308             #$dbh->selectall_arrayref("select * from t1", sub {
309             # warn "check commit txn: ".Dumper($_[0]);
310             #});
311             #
312             #$dbh->do("delete from t1 where a=50", sub {
313             # warn "remove the effect @_";
314             #});
315             #
316             #my $update_st;
317             #
318             #my $txh3; $txh3 = $dbh->begin_work(sub {
319             # warn "txn3 begin.. @_";
320             #});
321             #
322             # $update_st = $dbh->prepare("insert into t1 values (?,?)", sub {
323             # warn "prepare insert @_";
324             # });
325             # $update_st->execute(60, 60, { Tx => $txh3 }, sub {
326             # warn "insert 60 @_";
327             # });
328             #
329             # $dbh->selectall_arrayref("select * from t1", { Tx => $txh3 }, sub {
330             # warn "select in txn3: ".Dumper($_[0]);
331             # });
332             #
333             # $txh3->rollback(sub {
334             # warn "txh3 rollback @_";
335             # });
336             #
337             # $dbh->selectall_arrayref("select * from t1", sub {
338             # warn "select out txn3: ".Dumper($_[0]);
339             # });
340              
341             #$st_all = $dbh->prepare("select `date`, `time`, `datetime`, `timestamp` from all_type", sub {
342             # warn "prepare st_all @_";
343             #});
344             #
345             #$st_all->execute
346              
347             $end5->recv;
348              
349             my $readonly_dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { ReadOnly => 1 }, sub {
350             # ... we can only use "select" and "show" command on this handle
351             });
352              
353             $end->recv;
354              
355             =cut
356              
357       0     sub _empty_cb {}
358              
359             =head2 $dbh = AnyEvent::MySQL->connect($data_source, $username, [$auth, [\%attr,]] $cb->($dbh, 1))
360              
361             =cut
362             sub connect {
363 0     0 1   shift;
364 0           return AnyEvent::MySQL::db->new(@_);
365             }
366              
367             package AnyEvent::MySQL::db;
368              
369 1     1   4 use strict;
  1         1  
  1         15  
370 1     1   2 use warnings;
  1         2  
  1         19  
371              
372 1     1   4 use AE;
  1         1  
  1         12  
373 1     1   3 use AnyEvent::Socket;
  1         1  
  1         74  
374 1     1   4 use AnyEvent::Handle;
  1         1  
  1         16  
375 1     1   3 use Scalar::Util qw(weaken dualvar);
  1         1  
  1         41  
376 1     1   4 use Guard;
  1         1  
  1         42  
377              
378             # connection state
379             use constant {
380 1         63 BUSY_CONN => 1,
381             IDLE_CONN => 2,
382             ZOMBIE_CONN => 3,
383 1     1   4 };
  1         1  
384              
385             # transaction state
386             use constant {
387 1         71 NO_TXN => 1,
388             EMPTY_TXN => 2,
389             CLEAN_TXN => 3,
390             DIRTY_TXN => 4,
391             DEAD_TXN => 5,
392 1     1   3 };
  1         1  
393              
394             # transaction control token
395             use constant {
396 1         68 TXN_TASK => 1,
397             TXN_BEGIN => 2,
398             TXN_COMMIT => 3,
399             TXN_ROLLBACK => 4,
400 1     1   3 };
  1         1  
401              
402             use constant {
403 1         3472 AUTHi => 0,
404             ATTRi => 1,
405             HDi => 2,
406             CONNi => 9,
407             ON_CONNi => 11,
408              
409             CONN_STATEi => 3,
410             TXN_STATEi => 4,
411              
412             TASKi => 5,
413             STi => 6,
414             FALLBACKi => 10,
415              
416             ERRi => 7,
417             ERRSTRi => 8,
418 1     1   3 };
  1         1  
419              
420             sub _push_task {
421 0     0     my($dbh, $task) = @_;
422 0           push @{$dbh->{_}[TASKi]}, $task;
  0            
423 0 0         _process_task($dbh) if( $dbh->{_}[CONN_STATEi]==IDLE_CONN );
424             }
425              
426             sub _unshift_task {
427 0     0     my($dbh, $task) = @_;
428 0           unshift @{$dbh->{_}[TASKi]}, $task;
  0            
429             }
430              
431             sub _report_error {
432 0     0     my($dbh, $method, $error_num, $error_str) = @_;
433              
434 0           $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = $error_num;
435 0           $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = $error_str;
436 0 0         warn "$dbh $method failed: $error_str ($error_num)\n" if( $dbh->{_}[ATTRi]{PrintError} );
437              
438 0 0         $dbh->{_}[TXN_STATEi] = DEAD_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
439             }
440              
441             sub _reconnect {
442 0     0     my $dbh = shift;
443 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
444 0           my $retry; $retry = AE::timer .1, 0, sub {
445 0     0     undef $retry;
446 0           _connect($dbh);
447 0           };
448             }
449              
450             sub _connect {
451 0     0     my $dbh = shift;
452 0   0       my $cb = $dbh->{_}[ON_CONNi] || \&AnyEvent::MySQL::_empty_cb;
453 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
454              
455 0           my $param = $dbh->{Name};
456 0           my $database;
457 0 0         if( index($param, '=')>=0 ) {
458             $param = {
459 0           map { split /=/, $_, 2 } split /;/, $param
  0            
460             };
461 0 0         if( $param->{host} =~ /(.*):(.*)/ ) {
462 0           $param->{host} = $1;
463 0           $param->{port} = $2;
464             }
465             }
466             else {
467 0           $param = { database => $param };
468             }
469              
470 0   0       $param->{port} ||= 3306;
471              
472 0 0 0       if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
473 0   0       my $sock = $param->{mysql_socket} || `mysql_config --socket`;
474 0 0         if( !$sock ) {
475 0           _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
476 0           $cb->();
477 0           return;
478             }
479 0           $param->{host} = '/unix';
480 0           $param->{port} = $sock;
481             }
482              
483 0           warn "Connecting to $param->{host}:$param->{port} ...";
484 0           weaken( my $wdbh = $dbh );
485             $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
486 0     0     my $fh = shift;
487 0 0         if( !$fh ) {
488 0           warn "Connect to $param->{host}:$param->{port} fail: $! retry later.";
489 0           undef $wdbh->{_}[CONNi];
490              
491 0           _reconnect($wdbh);
492 0           return;
493             }
494 0           warn "Connected ($param->{host}:$param->{port})";
495              
496             $wdbh->{_}[HDi] = AnyEvent::Handle->new(
497             fh => $fh,
498             on_error => sub {
499 0 0         return if !$wdbh;
500              
501 0           my $wwdbh = $wdbh;
502 0 0         if( $_[1] ) {
503 0           warn "Disconnected from $param->{host}:$param->{port} by $_[2] reconnect later.";
504 0           undef $wwdbh->{_}[HDi];
505 0           undef $wwdbh->{_}[CONNi];
506 0           $wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
507 0           _report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
508 0 0         if( $wwdbh->{_}[FALLBACKi] ) {
509 0           $wwdbh->{_}[FALLBACKi]();
510             }
511             }
512             },
513 0           );
514              
515             AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
516 0           my($success, $err_num_and_msg, $thread_id) = @_;
517 0 0         return if !$wdbh;
518 0 0         if( $success ) {
519 0           $wdbh->{mysql_thread_id} = $thread_id;
520             $cb->($wdbh, guard {
521 0 0         _process_task($wdbh) if $wdbh;
522 0           });
523             }
524             else {
525 0           warn "MySQL auth error: $err_num_and_msg retry later.";
526 0           undef $wdbh->{_}[HDi];
527 0           undef $wdbh->{_}[CONNi];
528 0 0         _reconnect($wdbh) if $wdbh;
529             }
530 0           });
531 0           });
532             }
533              
534             sub _process_task {
535 0     0     my $dbh = shift;
536 0           $dbh->{_}[CONN_STATEi] = IDLE_CONN;
537 0           $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
538 0           $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
539 0           $dbh->{_}[FALLBACKi] = undef;
540 0           weaken( my $wdbh = $dbh );
541              
542 0 0         if( !$dbh->{_}[HDi] ) {
543 0           _reconnect($dbh);
544 0           return;
545             }
546              
547 0           my $task = shift @{$dbh->{_}[TASKi]};
  0            
548 0 0         return if( !$task );
549              
550             my $next = sub {
551 0 0   0     _process_task($wdbh) if $wdbh;
552 0           };
553              
554             $dbh->{_}[FALLBACKi] = sub {
555 0     0     undef $dbh->{_}[FALLBACKi];
556 0 0 0       if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
557 0           ++$task->[3];
558 0           warn "redo the task later.. ($task->[3])";
559 0           unshift @{$dbh->{_}[TASKi]}, $task;
  0            
560             }
561             else {
562 0           $task->[2]();
563             }
564 0           _reconnect($dbh);
565 0           };
566 0 0         if( $task->[0]==TXN_TASK ) {
    0          
    0          
    0          
567 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
568 0           _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
569 0           $task->[2]();
570 0           _process_task($dbh);
571             }
572             else {
573 0 0         $dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
574 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
575             $task->[1](sub {
576 0 0 0 0     if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
577 0           _rollback($dbh, $next);
578             }
579             else {
580 0           $next->();
581             }
582 0           });
583             }
584             }
585             elsif( $task->[0]==TXN_BEGIN ) {
586 0 0         if( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
    0          
587 0           $dbh->{_}[TXN_STATEi] = DEAD_TXN;
588 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
589 0           $task->[1]($next);
590             }
591             elsif( $dbh->{_}[TXN_STATEi]==EMPTY_TXN ) {
592 0           $task->[2]($next);
593 0           _process_task($dbh);
594             }
595             else {
596 0           warn "It's in a transaction already.. Abort the old one and begin the new one.";
597 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
598             _rollback($dbh, sub {
599 0     0     $task->[1]($next);
600 0           });
601             }
602             }
603             elsif( $task->[0]==TXN_COMMIT ) {
604 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
    0          
605 0           _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
606 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
607 0           $task->[2]();
608 0           _process_task($dbh);
609             }
610             elsif( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
611 0           $task->[2]();
612 0           _process_task($dbh);
613             }
614             else {
615 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
616 0           $task->[1]($next);
617             }
618             }
619             elsif( $task->[0]==TXN_ROLLBACK ) {
620 0 0         if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
    0          
621 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
622 0           $task->[2](1);
623 0           _process_task($dbh);
624             }
625             elsif( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
626 0           $task->[2]();
627 0           _process_task($dbh);
628             }
629             else {
630 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
631 0           $task->[1]($next);
632             }
633             }
634             else {
635 0           warn "Never be here";
636             }
637             }
638              
639             sub _text_prepare {
640 0     0     my $statement = shift;
641 0           $statement =~ s(\?){
642 0           my $value = shift;
643 0 0         if( defined($value) ) {
644 0           $value =~ s/\\/\\\\/g;
645 0           $value =~ s/'/\\'/g;
646 0           "'$value'";
647             }
648             else {
649 0           'NULL';
650             }
651             }ge;
652 0           return $statement;
653             }
654              
655             =head2 $dbh = AnyEvent::MySQL::db->new($dsn, $username, [$auth, [\%attr,]] [$cb->($dbh, $next_guard)])
656              
657             $cb will be called when each time the db connection is connected, reconnected,
658             or tried but failed.
659              
660             If failed, the $dbh in the $cb's args will be undef.
661              
662             You can do some connection initialization here, such as
663             set names utf8;
664              
665             But you should NOT rely on this for work flow control,
666             cause the reconnection can occur anytime.
667              
668             =cut
669             sub new {
670 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
671 0           my($class, $dsn, $username, $auth, $attr) = @_;
672              
673 0           my $dbh = bless { _ => [] }, $class;
674 0 0         if( $dsn =~ /^DBI:mysql:(.*)$/ ) {
675 0           $dbh->{Name} = $1;
676             }
677             else {
678 0           die "invalid dsn format";
679             }
680 0           $dbh->{Username} = $username;
681 0           $dbh->{_}[AUTHi] = $auth;
682 0 0         $dbh->{_}[ATTRi] = +{ Verbose => 1, %{ $attr || {} } };
  0            
683 0           $dbh->{_}[CONN_STATEi] = BUSY_CONN;
684 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
685 0           $dbh->{_}[TASKi] = [];
686 0           $dbh->{_}[ON_CONNi] = $cb;
687              
688 0           _connect($dbh);
689              
690 0           return $dbh;
691             }
692              
693             =head2 $error_num = $dbh->err
694              
695             =cut
696             sub err {
697 0     0     return $_[0]{_}[ERRi];
698             }
699              
700             =head2 $error_str = $dbh->errstr
701              
702             =cut
703             sub errstr {
704 0     0     return $_[0]{_}[ERRSTRi];
705             }
706              
707             =head2 $rv = $dbh->last_insert_id
708              
709             Non-blocking get the value immediately
710              
711             =cut
712             sub last_insert_id {
713 0     0     $_[0]{mysql_insertid};
714             }
715              
716             sub _do {
717 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
718 0           my($rev_dir, $dbh, $statement, $attr, @bind_values) = @_;
719              
720 0 0 0       if( $dbh->{_}[ATTRi]{ReadOnly} && $statement !~ /^\s*(?:show|select)/i ){
721 0           _report_error($dbh, 'do', 1227, 'unable to perform write queries on a ReadOnly handle');
722 0           $cb->();
723 0           return;
724             }
725              
726             my @args = ($dbh, [TXN_TASK, sub {
727 0     0     my $next_act = shift;
728 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
729             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
730 0           eval {
731 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
732 0           $dbh->{mysql_insertid} = $_[2];
733 0           $cb->($_[1]);
734             }
735             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
736 0           _report_error($dbh, 'do', $_[1], $_[3]);
737 0           $cb->();
738             }
739             else {
740 0           $cb->(0+@{$_[2]});
  0            
741             }
742             };
743 0           $next_act->();
744 0           });
745 0           }, $cb, 0]);
746              
747 0 0         if( $rev_dir ) {
748 0           _unshift_task(@args);
749             }
750             else {
751 0           _push_task(@args);
752             }
753             }
754              
755             =head2 $dbh->do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])
756              
757             =cut
758             sub do {
759 0     0     unshift @_, 0;
760 0           &_do;
761             }
762              
763             =head2 $dbh->pre_do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])
764              
765             This method is like $dbh->do except that $dbh->pre_do will unshift
766             job into the queue instead of push.
767              
768             This method is for the initializing actions in the AnyEvent::MySQL->connect's callback
769              
770             =cut
771             sub pre_do {
772 0     0     unshift @_, 1;
773 0           &_do;
774             }
775              
776             =head2 $dbh->selectall_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))
777              
778             =cut
779             sub selectall_arrayref {
780 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
781 0           my($dbh, $statement, $attr, @bind_values) = @_;
782              
783             _push_task($dbh, [TXN_TASK, sub {
784 0     0     my $next_act = shift;
785 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
786             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
787 0           eval {
788 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
789 0           $dbh->{mysql_insertid} = $_[2];
790 0           $cb->([]);
791             }
792             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
793 0           _report_error($dbh, 'selectall_arrayref', $_[1], $_[3]);
794 0           $cb->();
795             }
796             else {
797 0           $cb->($_[2]);
798             }
799             };
800 0           $next_act->();
801 0           });
802 0           }, $cb, 0]);
803             }
804              
805              
806             =head2 $dbh->selectall_hashref($statement, [$key_field|\@key_field], [\%attr, [@bind_values,]] $cb->($hash_ref))
807              
808             =cut
809              
810             sub selectall_hashref {
811 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
812 0           my($dbh, $statement, $key_field) = splice @_, 0, 3;
813              
814 0           my @key_field;
815 0 0         if( ref($key_field) eq 'ARRAY' ) {
    0          
    0          
816 0           @key_field = @$key_field;
817             }
818             elsif( ref($key_field) eq 'HASH' ) {
819 0           unshift @_, $key_field;
820 0           @key_field = ();
821             }
822             elsif( defined($key_field) ) {
823 0           @key_field = ($key_field);
824             }
825             else {
826 0           @key_field = ();
827             }
828              
829 0           my($attr, @bind_values) = @_;
830              
831             _push_task($dbh, [TXN_TASK, sub {
832 0     0     my $next_act = shift;
833 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
834             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
835 0           eval {
836 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
837 0           $dbh->{mysql_insertid} = $_[2];
838 0 0         if( @key_field ) {
839 0           $cb->({});
840             }
841             else {
842 0           $cb->([]);
843             }
844             }
845             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
846 0           _report_error($dbh, 'selectall_hashref', $_[1], $_[3]);
847 0           $cb->();
848             }
849             else {
850 0           my $res;
851 0 0         if( @key_field ) {
852 0           $res = {};
853             }
854             else {
855 0           $res = [];
856             }
857 0           for(my $i=$#{$_[2]}; $i>=0; --$i) {
  0            
858 0           my %record;
859 0           for(my $j=$#{$_[2][$i]}; $j>=0; --$j) {
  0            
860 0           $record{$_[1][$j][4]} = $_[2][$i][$j];
861             }
862 0 0         if( @key_field ) {
863 0           my $h = $res;
864 0           for(@key_field[0..$#key_field-1]) {
865 0   0       $h->{$record{$_}} ||= {};
866 0           $h = $h->{$record{$_}};
867             }
868 0           $h->{$record{$key_field[-1]}} = \%record;
869             }
870             else {
871 0           push @$res, \%record;
872             }
873             }
874 0           $cb->($res);
875             }
876             };
877 0           $next_act->();
878 0           });
879 0           }, $cb, 0]);
880             }
881              
882             =head2 $dbh->selectcol_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))
883              
884             =cut
885             sub selectcol_arrayref {
886 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
887 0           my($dbh, $statement, $attr, @bind_values) = @_;
888 0   0       $attr ||= {};
889 0 0         my @columns = map { $_-1 } @{ $attr->{Columns} || [1] };
  0            
  0            
890              
891             _push_task($dbh, [TXN_TASK, sub {
892 0     0     my $next_act = shift;
893 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
894             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
895 0           eval {
896 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
897 0           $dbh->{mysql_insertid} = $_[2];
898 0           $cb->([]);
899             }
900             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
901 0           _report_error($dbh, 'selectcol_arrayref', $_[1], $_[3]);
902 0           $cb->();
903             }
904             else {
905             my @res = map {
906 0           my $r = $_;
907 0           map { $r->[$_] } @columns
  0            
908 0           } @{$_[2]};
  0            
909 0           $cb->(\@res);
910             }
911             };
912 0           $next_act->();
913 0           });
914 0           }, $cb, 0]);
915             }
916              
917             =head2 $dbh->selectrow_array($statement, [\%attr, [@bind_values,]], $cb->(@row_ary))
918              
919             =cut
920             sub selectrow_array {
921 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
922 0           my($dbh, $statement, $attr, @bind_values) = @_;
923              
924             _push_task($dbh, [TXN_TASK, sub {
925 0     0     my $next_act = shift;
926 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
927             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
928 0           eval {
929 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
930 0           $dbh->{mysql_insertid} = $_[2];
931 0           $cb->();
932             }
933             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
934 0           _report_error($dbh, 'selectrow_array', $_[1], $_[3]);
935 0           $cb->();
936             }
937             else {
938 0 0         $cb->($_[2][0] ? @{$_[2][0]} : ());
  0            
939             }
940             };
941 0           $next_act->();
942 0           });
943 0           }, $cb, 0]);
944             }
945              
946             =head2 $dbh->selectrow_arrayref($statement, [\%attr, [@bind_values,]], $cb->($ary_ref))
947              
948             =cut
949             sub selectrow_arrayref {
950 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
951 0           my($dbh, $statement, $attr, @bind_values) = @_;
952              
953             _push_task($dbh, [TXN_TASK, sub {
954 0     0     my $next_act = shift;
955 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
956             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
957 0           eval {
958 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
959 0           $dbh->{mysql_insertid} = $_[2];
960 0           $cb->(undef);
961             }
962             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
963 0           _report_error($dbh, 'selectrow_arrayref', $_[1], $_[3]);
964 0           $cb->(undef);
965             }
966             else {
967 0           $cb->($_[2][0]);
968             }
969             };
970 0           $next_act->();
971 0           });
972 0           }, $cb, 0]);
973             }
974              
975             =head2 $dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))
976              
977             =cut
978             sub selectrow_hashref {
979 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
980 0           my($dbh, $statement, $attr, @bind_values) = @_;
981              
982             _push_task($dbh, [TXN_TASK, sub {
983 0     0     my $next_act = shift;
984 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
985             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
986 0           eval {
987 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
988 0           $dbh->{mysql_insertid} = $_[2];
989 0           $cb->(undef);
990             }
991             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
992 0           _report_error($dbh, 'selectrow_hashref', $_[1], $_[3]);
993 0           $cb->(undef);
994             }
995             else {
996 0 0         if( $_[2][0] ) {
997 0           my %record;
998 0           for(my $j=$#{$_[2][0]}; $j>=0; --$j) {
  0            
999 0           $record{$_[1][$j][4]} = $_[2][0][$j];
1000             }
1001 0           $cb->(\%record);
1002             }
1003             else {
1004 0           $cb->(undef);
1005             }
1006             }
1007             };
1008 0           $next_act->();
1009 0           });
1010 0           }, $cb, 0]);
1011             }
1012              
1013             =head2 $sth = $dbh->prepare($statement, [$cb->($sth)])
1014              
1015             $cb will be called each time when this statement is prepared
1016             (or re-prepared when the db connection is reconnected)
1017              
1018             if the preparation is not success,
1019             the $sth in the $cb's arg will be undef.
1020              
1021             So you should NOT rely on this for work flow controlling.
1022              
1023             =cut
1024             sub prepare {
1025 0     0     my $dbh = $_[0];
1026              
1027 0           my $sth = AnyEvent::MySQL::st->new(@_);
1028 0           push @{$dbh->{_}[STi]}, $sth;
  0            
1029 0           weaken($dbh->{_}[STi][-1]);
1030 0           return $sth;
1031             }
1032              
1033             =head2 $dbh->begin_work([$cb->($rv)])
1034              
1035             =cut
1036             sub begin_work {
1037 0     0     my $dbh = shift;
1038 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1039              
1040             _push_task($dbh, [TXN_BEGIN, sub {
1041 0     0     my $next_act = shift;
1042 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'begin');
1043             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1044 0           eval {
1045 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1046 0           $dbh->{_}[TXN_STATEi] = EMPTY_TXN;
1047 0           $cb->(1);
1048             }
1049             else {
1050 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1051 0           _report_error($dbh, 'begin_work', $_[1], $_[3]);
1052             }
1053             else {
1054 0           _report_error($dbh, 'begin_work', 2000, "Unexpected result: $_[0]");
1055             }
1056 0           $cb->();
1057             }
1058             };
1059 0           $next_act->();
1060 0           });
1061 0           }, $cb, 0]);
1062             }
1063              
1064             =head2 $dbh->commit([$cb->($rv)])
1065              
1066             =cut
1067             sub commit {
1068 0     0     my $dbh = shift;
1069 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1070              
1071             _push_task($dbh, [TXN_COMMIT, sub {
1072 0     0     my $next_act = shift;
1073              
1074 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'commit');
1075             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1076 0           eval {
1077 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1078 0           $dbh->{_}[TXN_STATEi] = NO_TXN;
1079 0           $cb->(1);
1080             }
1081             else {
1082 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1083 0           _report_error($dbh, 'commit', $_[1], $_[3]);
1084             }
1085             else {
1086 0           _report_error($dbh, 'commit', 2000, "Unexpected result: $_[0]");
1087             }
1088 0           $cb->();
1089             }
1090             };
1091 0           $next_act->();
1092 0           });
1093 0           }, $cb, 0]);
1094             }
1095              
1096             =head2 $dbh->rollback([$cb->($rv)])
1097              
1098             =cut
1099             sub rollback {
1100 0     0     my $dbh = shift;
1101 0   0       my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
1102              
1103             _push_task($dbh, [TXN_ROLLBACK, sub {
1104 0     0     my $next_act = shift;
1105              
1106             _rollback($dbh, $next_act, sub {
1107 0 0         $dbh->{_}[TXN_STATEi] = NO_TXN if( $_[0] );
1108 0           &$cb;
1109 0           });
1110 0           }, $cb, 0]);
1111             }
1112             sub _rollback {
1113 0     0     my($dbh, $next_act, $cb) = @_;
1114              
1115 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'rollback');
1116             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1117 0     0     eval {
1118 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
1119 0 0         $cb->(1) if $cb;
1120             }
1121             else {
1122 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1123 0           _report_error($dbh, 'rollback', $_[1], $_[3]);
1124             }
1125             else {
1126 0           _report_error($dbh, 'rollback', 2000, "Unexpected result: $_[0]");
1127             }
1128 0 0         $cb->() if $cb;
1129             }
1130             };
1131 0           $next_act->();
1132 0           });
1133             }
1134              
1135             =head2 $dbh->ping(sub {my $alive = shift;});
1136              
1137             =cut
1138              
1139             sub ping {
1140 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1141 0           my ($dbh) = @_;
1142              
1143             _push_task($dbh, [TXN_TASK, sub {
1144 0     0     my $next_act = shift;
1145 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_PING);
1146             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
1147 0           eval {
1148 0 0         if ($_[0]==AnyEvent::MySQL::Imp::RES_OK) {
1149 0           $cb->(1);
1150             }
1151             else {
1152 0           $cb->(0);
1153             }
1154             };
1155 0           $next_act->();
1156 0           });
1157 0           }, $cb, 0]);
1158             }
1159              
1160             package AnyEvent::MySQL::st;
1161              
1162 1     1   5 use strict;
  1         1  
  1         19  
1163 1     1   3 use warnings;
  1         1  
  1         22  
1164              
1165 1     1   15 use Scalar::Util qw(weaken);
  1         1  
  1         48  
1166              
1167             use constant {
1168 1         474 DBHi => 0,
1169             IDi => 1,
1170             PARAMi => 2,
1171             FIELDi => 3,
1172             STATEMENTi => 4,
1173 1     1   3 };
  1         1  
1174              
1175             =head2 $sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])
1176              
1177             =cut
1178             sub new {
1179 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1180 0           my($class, $dbh, $statement) = @_;
1181 0           my $sth = bless [], $class;
1182 0           $sth->[DBHi] = $dbh;
1183 0           $sth->[STATEMENTi] = $statement;
1184              
1185 0           return $sth;
1186             }
1187              
1188             =head2 $sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])
1189              
1190             =cut
1191             sub execute {
1192 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
1193 0 0         my $attr = ref($_[-1]) eq 'HASH' ? pop : {};
1194 0           my($sth, @bind_values) = @_;
1195 0           my $dbh = $sth->[DBHi];
1196              
1197              
1198             AnyEvent::MySQL::db::_push_task($dbh, [AnyEvent::MySQL::db::TXN_TASK, sub {
1199 0     0     my $next_act = shift;
1200              
1201             my $execute = sub {
1202 0           AnyEvent::MySQL::Imp::do_execute_param($dbh->{_}[AnyEvent::MySQL::db::HDi], $sth->[IDi], \@bind_values, $sth->[PARAMi]);
1203             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], execute => 1, sub {
1204 0           eval {
1205 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
    0          
    0          
1206 0           $cb->($_[1]);
1207             }
1208             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_RESULT ) {
1209 0           $cb->(AnyEvent::MySQL::ft->new($sth->[FIELDi], $_[2]));
1210             }
1211             elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1212 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
1213 0           $cb->();
1214             }
1215             else {
1216 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unknown response: $_[0]");
1217 0           $cb->();
1218             }
1219             };
1220 0           $next_act->();
1221 0           });
1222 0           };
1223              
1224 0 0         if( $sth->[IDi] ) {
1225 0           $execute->();
1226             }
1227             else {
1228 0           AnyEvent::MySQL::Imp::send_packet($dbh->{_}[AnyEvent::MySQL::db::HDi], 0, AnyEvent::MySQL::Imp::COM_STMT_PREPARE, $sth->[STATEMENTi]);
1229             AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], prepare => 1, sub {
1230 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_PREPARE ) {
1231 0           $sth->[IDi] = $_[1];
1232 0           $sth->[PARAMi] = $_[2];
1233 0           $sth->[FIELDi] = $_[3];
1234              
1235 0           $execute->();
1236             }
1237             else {
1238 0 0         if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
1239 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
1240 0           $cb->();
1241             }
1242             else {
1243 0           AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unexpected response: $_[0]");
1244 0           $cb->();
1245             }
1246             }
1247 0           });
1248             }
1249 0           }, $cb, 0]);
1250             }
1251              
1252             package AnyEvent::MySQL::ft;
1253              
1254 1     1   4 use strict;
  1         1  
  1         13  
1255 1     1   3 use warnings;
  1         3  
  1         24  
1256              
1257             use constant {
1258 1         968 DATAi => 0,
1259             BINDi => 1,
1260             FIELDi => 2,
1261 1     1   3 };
  1         1  
1262              
1263             =head2 $fth = AnyEvent::MySQL::ft->new(\@data_set)
1264              
1265             =cut
1266             sub new {
1267 0     0     my($class, $field_set, $data_set) = @_;
1268              
1269 0           my $fth = bless [], $class;
1270 0           $fth->[FIELDi] = $field_set;
1271 0           $fth->[DATAi] = $data_set;
1272              
1273 0           return $fth;
1274             }
1275              
1276             =head2 $rc = $fth->bind_columns(@list_of_refs_to_vars_to_bind, [$cb->($rc)])
1277              
1278             =cut
1279             sub bind_columns {
1280 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1281 0           my $fth = shift;
1282 0           my @list_of_refs_to_vars_to_bind = @_;
1283              
1284 0 0         if( !@{$fth->[DATAi]} ) {
  0 0          
1285 0 0         $cb->(1) if $cb;
1286 0           return 1;
1287             }
1288 0           elsif( @{$fth->[DATAi][0]} == @list_of_refs_to_vars_to_bind ) {
1289 0           $fth->[BINDi] = \@list_of_refs_to_vars_to_bind;
1290 0 0         $cb->(1) if $cb;
1291 0           return 1;
1292             }
1293             else {
1294 0 0         $cb->() if $cb;
1295 0           return;
1296             }
1297             }
1298              
1299             =head2 $rc = $fth->bind_col($col_num, \$col_variable, [$cb->($rc)])
1300              
1301             =cut
1302             sub bind_col {
1303 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1304 0           my($fth, $col_num, $col_ref) = @_;
1305              
1306 0 0 0       if( !@{$fth->[DATAi]} ) {
  0 0          
1307 0 0         $cb->(1) if $cb;
1308 0           return 1;
1309             }
1310 0           elsif( 0<=$col_num && $col_num<=$#{$fth->[DATAi][0]} ) {
1311 0           $fth->[BINDi][$col_num] = $col_ref;
1312 0 0         $cb->(1) if $cb;
1313 0           return 1;
1314             }
1315             else {
1316 0 0         $cb->() if $cb;
1317 0           return;
1318             }
1319             }
1320              
1321             =head2 $rv = $fth->fetch([$cb->($rv)])
1322              
1323             =cut
1324             sub fetch {
1325 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1326 0           my $fth = shift;
1327              
1328 0 0 0       if( $fth->[BINDi] && $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0   0        
1329 0           my $bind = $fth->[BINDi];
1330 0           my $row = shift @{$fth->[DATAi]};
  0            
1331 0           for(my $i=0; $i<@$row; ++$i) {
1332 0 0         ${$bind->[$i]} = $row->[$i] if $bind->[$i];
  0            
1333             }
1334 0 0         $cb->(1) if $cb;
1335 0           return 1;
1336             }
1337             else {
1338 0 0         $cb->() if $cb;
1339 0           return;
1340             }
1341             }
1342              
1343             =head2 @row_ary = $fth->fetchrow_array([$cb->(@row_ary)])
1344              
1345             =cut
1346             sub fetchrow_array {
1347 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1348 0           my $fth = shift;
1349              
1350 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1351 0           my $row = shift @{$fth->[DATAi]};
  0            
1352 0 0         $cb->(@$row) if $cb;
1353 0 0         return @$row if defined wantarray;
1354             }
1355             else {
1356 0 0         $cb->() if $cb;
1357 0           return ();
1358             }
1359             }
1360              
1361             =head2 $ary_ref = $fth->fetchrow_arrayref([$cb->($ary_ref)])
1362              
1363             =cut
1364             sub fetchrow_arrayref {
1365 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1366 0           my $fth = shift;
1367              
1368 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1369 0           my $row = shift @{$fth->[DATAi]};
  0            
1370 0 0         $cb->($row) if $cb;
1371 0           return $row;
1372             }
1373             else {
1374 0 0         $cb->() if $cb;
1375 0           return;
1376             }
1377             }
1378              
1379             =head2 $hash_ref = $fth->fetchrow_hashref([$cb->($hash_ref)])
1380              
1381             =cut
1382             sub fetchrow_hashref {
1383 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1384 0           my $fth = shift;
1385              
1386 0 0 0       if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
  0            
1387 0           my $field = $fth->[FIELDi];
1388 0           my $hash = {};
1389 0           my $row = shift @{$fth->[DATAi]};
  0            
1390 0           for(my $i=0; $i<@$row; ++$i) {
1391 0           $hash->{$field->[$i][4]} = $row->[$i];
1392             }
1393 0 0         $cb->($hash) if $cb;
1394 0           return $hash;
1395             }
1396             else {
1397 0 0         $cb->() if $cb;
1398 0           return;
1399             }
1400             }
1401              
1402             =head2 $ary_ref = $fth->fetchall_arrayref([$cb->($ary_ref)])
1403              
1404             =cut
1405             sub fetchall_arrayref {
1406 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1407 0           my $fth = shift;
1408              
1409 0 0         if( $fth->[DATAi] ) {
1410 0           my $all = delete $fth->[DATAi];
1411 0 0         $cb->($all) if $cb;
1412 0           return $all;
1413             }
1414             else {
1415 0 0         $cb->() if $cb;
1416 0           return;
1417             }
1418             }
1419              
1420             =head2 $hash_ref = $fth->fetchall_hashref([($key_field|\@key_field),] [$cb->($hash_ref)])
1421              
1422             =cut
1423             sub fetchall_hashref {
1424 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1425 0           my($fth, $key_field) = @_;
1426              
1427 0           my @key_field;
1428 0 0         if( ref($key_field) eq 'ARRAY' ) {
    0          
1429 0           @key_field = @$key_field;
1430             }
1431             elsif( defined($key_field) ) {
1432 0           @key_field = ($key_field);
1433             }
1434             else {
1435 0           @key_field = ();
1436             }
1437              
1438 0 0         if( $fth->[DATAi] ) {
1439 0           my $field = $fth->[FIELDi];
1440              
1441 0           my $res;
1442 0 0         if( @key_field ) {
1443 0           $res = {};
1444             }
1445             else {
1446 0           $res = [];
1447             }
1448              
1449 0           while( @{$fth->[DATAi]} ) {
  0            
1450 0           my $row = shift @{$fth->[DATAi]};
  0            
1451 0           my %record;
1452 0           for(my $i=0; $i<@$row; ++$i) {
1453 0           $record{$field->[$i][4]} = $row->[$i];
1454             }
1455 0 0         if( @key_field ) {
1456 0           my $h = $res;
1457 0           for(@key_field[0..$#key_field-1]) {
1458 0   0       $h->{$record{$_}} ||= {};
1459 0           $h = $h->{$record{$_}};
1460             }
1461 0           $h->{$record{$key_field[-1]}} = \%record;
1462             }
1463             else {
1464 0           push @$res, \%record;
1465             }
1466             }
1467 0           delete $fth->[DATAi];
1468 0 0         $cb->($res) if $cb;
1469 0           return $res;
1470             }
1471             else {
1472 0 0         $cb->() if $cb;
1473 0           return;
1474             }
1475             }
1476              
1477             =head2 $ary_ref = $fth->fetchcol_arrayref([\%attr], [$cb->($ary_ref)])
1478              
1479             =cut
1480             sub fetchcol_arrayref {
1481 0 0   0     my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
1482 0           my($fth, $attr) = @_;
1483 0   0       $attr ||= {};
1484 0 0         my @columns = map { $_-1 } @{ $attr->{Columns} || [1] };
  0            
  0            
1485              
1486 0 0         if( $fth->[DATAi] ) {
1487             my @res = map {
1488 0           my $r = $_;
1489 0           map { $r->[$_] } @columns
  0            
1490 0           } @{ delete $fth->[DATAi] };
  0            
1491 0 0         $cb->(\@res) if $cb;
1492 0           return \@res;
1493             }
1494             else {
1495 0 0         $cb->() if $cb;
1496 0           return;
1497             }
1498             }
1499              
1500             =head1 AUTHOR
1501              
1502             Cindy Wang (CindyLinz)
1503              
1504             =head1 BUGS
1505              
1506             Please report any bugs or feature requests to C.
1507             I will be notified, and then you'll
1508             automatically be notified of progress on your bug as I make changes.
1509              
1510              
1511              
1512              
1513             =head1 SUPPORT
1514              
1515             You can find documentation for this module with the perldoc command.
1516              
1517             perldoc AnyEvent::MySQL
1518              
1519              
1520             You can also look for information at:
1521              
1522             =over 4
1523              
1524             =item * github
1525              
1526             L
1527              
1528             =item * Search CPAN
1529              
1530             L
1531              
1532             =back
1533              
1534              
1535             =head1 LICENSE AND COPYRIGHT
1536              
1537             Copyright 2011-2015 Cindy Wang (CindyLinz).
1538              
1539             This program is free software; you can redistribute it and/or modify it
1540             under the terms of either: the GNU General Public License as published
1541             by the Free Software Foundation; or the Artistic License.
1542              
1543             See http://dev.perl.org/licenses/ for more information.
1544              
1545             =head1 CONTRIBUTOR
1546              
1547             Dmitriy Shamatrin (justnoxx@github)
1548              
1549             clking (clking@github)
1550              
1551             =cut
1552              
1553             1; # End of AnyEvent::MySQL