File Coverage

blib/lib/Mojo/MySQL/Database.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Mojo::MySQL::Database;
2 3     3   15 use Mojo::Base 'Mojo::EventEmitter';
  3         5  
  3         27  
3              
4 3     3   81076 use DBD::mysql;
  0            
  0            
5             use IO::Handle;
6             use Mojo::IOLoop;
7             use Mojo::MySQL::Results;
8              
9             has [qw(dbh mysql)];
10             has max_statements => 10;
11              
12             sub DESTROY {
13             my $self = shift;
14             if ((my $dbh = $self->dbh) && (my $mysql = $self->mysql)) { $mysql->_enqueue($dbh) }
15             }
16              
17             sub backlog { scalar @{shift->{waiting} || []} }
18              
19             sub begin { shift->_dbh(begin_work => @_) }
20              
21             sub commit { shift->dbh->commit }
22              
23             sub disconnect {
24             my $self = shift;
25             $self->_unwatch;
26             $self->dbh->disconnect;
27             }
28              
29             sub do { shift->_dbh(do => @_) }
30              
31             sub ping { shift->dbh->ping }
32              
33             sub query {
34             my ($self, $query) = (shift, shift);
35             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
36              
37             # Blocking
38             unless ($cb) {
39             my $sth = $self->_dequeue(0, $query);
40             $sth->execute(@_);
41             return Mojo::MySQL::Results->new(db => $self, sth => $sth);
42             }
43              
44             # Non-blocking
45             push @{$self->{waiting}}, {args => [@_], cb => $cb, query => $query};
46             $self->$_ for qw(_next _watch);
47             }
48              
49             sub rollback { shift->dbh->rollback }
50              
51             sub _dbh {
52             my ($self, $method) = (shift, shift);
53             $self->dbh->$method(@_);
54             return $self;
55             }
56              
57             sub _dequeue {
58             my ($self, $async, $query) = @_;
59              
60             my $queue = $self->{queue} ||= [];
61             for (my $i = 0; $i <= $#$queue; $i++) {
62             my $sth = $queue->[$i];
63             return splice @$queue, $i, 1 if !(!$sth->{async} ^ !$async) && $sth->{Statement} eq $query;
64             }
65              
66             return $self->dbh->prepare($query, $async ? {async => 1} : ());
67             }
68              
69             sub _enqueue {
70             my ($self, $sth) = @_;
71             push @{$self->{queue}}, $sth;
72             shift @{$self->{queue}} while @{$self->{queue}} > $self->max_statements;
73             }
74              
75             sub _next {
76             my $self = shift;
77              
78             return unless my $next = $self->{waiting}[0];
79             return if $next->{sth};
80              
81             my $sth = $next->{sth} = $self->_dequeue(1, $next->{query});
82             $sth->execute(@{$next->{args}});
83             }
84              
85             sub _unwatch {
86             my $self = shift;
87             return unless delete $self->{watching};
88             Mojo::IOLoop->singleton->reactor->remove($self->{handle});
89             }
90              
91             sub _watch {
92             my $self = shift;
93              
94             return if $self->{watching} || $self->{watching}++;
95              
96             my $dbh = $self->dbh;
97             $self->{handle} ||= do {
98             open my $FH, '<&', $dbh->mysql_fd or die "Dup mysql_fd: $!";
99             $FH;
100             };
101             Mojo::IOLoop->singleton->reactor->io(
102             $self->{handle} => sub {
103             my $reactor = shift;
104              
105             return unless my $waiting = $self->{waiting};
106             return unless @$waiting and $waiting->[0]{sth} and $waiting->[0]{sth}->mysql_async_ready;
107             my ($sth, $cb) = @{shift @$waiting}{qw(sth cb)};
108              
109             # Do not raise exceptions inside the event loop
110             my $result = do { local $sth->{RaiseError} = 0; $sth->mysql_async_result; };
111             my $err = defined $result ? undef : $dbh->errstr;
112              
113             $self->$cb($err, Mojo::MySQL::Results->new(db => $self, sth => $sth));
114             $self->_next;
115             $self->_unwatch unless $self->backlog;
116             }
117             )->watch($self->{handle}, 1, 0);
118             }
119              
120             1;
121              
122             =encoding utf8
123              
124             =head1 NAME
125              
126             Mojo::MySQL::Database - Database
127              
128             =head1 SYNOPSIS
129              
130             use Mojo::MySQL::Database;
131              
132             my $db = Mojo::MySQL::Database->new(mysql => $mysql, dbh => $dbh);
133              
134             =head1 DESCRIPTION
135              
136             L is a container for database handles used by L.
137              
138             =head1 ATTRIBUTES
139              
140             L implements the following attributes.
141              
142             =head2 dbh
143              
144             my $dbh = $db->dbh;
145             $db = $db->dbh(DBI->new);
146              
147             Database handle used for all queries.
148              
149             =head2 mysql
150              
151             my $mysql = $db->mysql;
152             $db = $db->mysql(Mojo::MySQL->new);
153              
154             L object this database belongs to.
155              
156             =head2 max_statements
157              
158             my $max = $db->max_statements;
159             $db = $db->max_statements(5);
160              
161             Maximum number of statement handles to cache for future queries, defaults to
162             C<10>.
163              
164             =head1 METHODS
165              
166             L inherits all methods from L and
167             implements the following new ones.
168              
169             =head2 backlog
170              
171             my $num = $db->backlog;
172              
173             Number of waiting non-blocking queries.
174              
175             =head2 begin
176              
177             $db = $db->begin;
178              
179             Begin transaction.
180              
181             =head2 commit
182              
183             $db->commit;
184              
185             Commit transaction.
186              
187             =head2 disconnect
188              
189             $db->disconnect;
190              
191             Disconnect database handle and prevent it from getting cached again.
192              
193             =head2 do
194              
195             $db = $db->do('create table foo (bar varchar(255))');
196              
197             Execute a statement and discard its result.
198              
199             =head2 ping
200              
201             my $bool = $db->ping;
202              
203             Check database connection.
204              
205             =head2 query
206              
207             my $results = $db->query('select * from foo');
208             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
209              
210             Execute a statement and return a L object with the results.
211             The statement handle will be automatically cached again when that object is
212             destroyed, so future queries can reuse it to increase performance. You can
213             also append a callback to perform operation non-blocking.
214              
215             $db->query('select * from foo' => sub {
216             my ($db, $err, $results) = @_;
217             ...
218             });
219             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
220              
221             =head2 rollback
222              
223             $db->rollback;
224              
225             Rollback transaction.
226              
227             =head1 SEE ALSO
228              
229             L, L, L.
230              
231             =cut