File Coverage

blib/lib/Mojo/mysql/Database.pm
Criterion Covered Total %
statement 27 132 20.4
branch 0 54 0.0
condition 0 18 0.0
subroutine 9 38 23.6
pod 10 10 100.0
total 46 252 18.2


line stmt bran cond sub pod time code
1             package Mojo::mysql::Database;
2 18     18   132 use Mojo::Base 'Mojo::EventEmitter';
  18         38  
  18         157  
3              
4 18     18   3364 use Carp;
  18         42  
  18         1100  
5 18     18   6798 use Mojo::IOLoop;
  18         2206924  
  18         101  
6 18     18   847 use Mojo::JSON 'to_json';
  18         45  
  18         1071  
7 18     18   8420 use Mojo::mysql::Results;
  18         51  
  18         136  
8 18     18   8202 use Mojo::mysql::Transaction;
  18         53  
  18         151  
9 18     18   619 use Mojo::Promise;
  18         40  
  18         91  
10 18     18   511 use Mojo::Util 'monkey_patch';
  18         51  
  18         824  
11 18     18   135 use Scalar::Util 'weaken';
  18         34  
  18         41018  
12              
13             has [qw(dbh mysql)];
14             has results_class => 'Mojo::mysql::Results';
15              
16             for my $name (qw(delete insert select update)) {
17             monkey_patch __PACKAGE__, $name, sub {
18 0     0     my $self = shift;
        0      
        0      
        0      
19 0 0         my @cb = ref $_[-1] eq 'CODE' ? pop : ();
20 0           return $self->query($self->mysql->abstract->$name(@_), @cb);
21             };
22             monkey_patch __PACKAGE__, "${name}_p", sub {
23 0     0     my $self = shift;
        0      
        0      
        0      
24 0           return $self->query_p($self->mysql->abstract->$name(@_));
25             };
26             }
27              
28             sub DESTROY {
29 0     0     my $self = shift;
30 0           $self->_cleanup_sth;
31 0 0 0       return unless (my $mysql = $self->mysql) and (my $dbh = $self->dbh);
32 0           $mysql->_enqueue($dbh, $self->{handle});
33             }
34              
35 0 0   0 1   sub backlog { scalar @{shift->{waiting} || []} }
  0            
36              
37             sub begin {
38 0     0 1   my $self = shift;
39 0           my $tx = Mojo::mysql::Transaction->new(db => $self);
40 0           weaken $tx->{db};
41 0           return $tx;
42             }
43              
44             sub disconnect {
45 0     0 1   my $self = shift;
46 0           $self->_cleanup_sth;
47 0           $self->_unwatch;
48 0           $self->dbh->disconnect;
49             }
50              
51 0     0 1   sub pid { shift->_dbh_attr('mysql_thread_id') }
52              
53 0     0 1   sub ping { shift->dbh->ping }
54              
55             sub query {
56 0     0 1   my ($self, $query) = (shift, shift);
57 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
58              
59             # Blocking
60 0 0         unless ($cb) {
61 0 0         Carp::confess('Cannot perform blocking query, while waiting for async response') if $self->backlog;
62 0           my $sth = $self->dbh->prepare($query);
63 0     0     local $sth->{HandleError} = sub { $_[0] = Carp::shortmess($_[0]); 0 };
  0            
  0            
64 0           _bind_params($sth, @_);
65 0           my $rv = $sth->execute;
66 0           my $res = $self->results_class->new(db => $self, is_blocking => 1, sth => $sth);
67 0 0 0       $res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
68 0           return $res;
69             }
70              
71             # Non-blocking
72 0           push @{$self->{waiting}}, {args => [@_], err => Carp::shortmess('__MSG__'), cb => $cb, query => $query};
  0            
73 0           $self->$_ for qw(_next _watch);
74 0           return $self;
75             }
76              
77             sub query_p {
78 0     0 1   my $self = shift;
79 0           my $promise = Mojo::Promise->new;
80 0 0   0     $self->query(@_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
  0            
81 0           return $promise;
82             }
83              
84 0     0 1   sub quote { shift->dbh->quote(shift) }
85              
86 0     0 1   sub quote_id { shift->dbh->quote_identifier(shift) }
87              
88             sub tables {
89 0     0 1   shift->query('show tables')->arrays->reduce(sub { push @$a, $b->[0]; $a }, []);
  0     0      
  0            
90             }
91              
92             sub _bind_params {
93 0     0     my $sth = shift;
94 0           for my $i (0 .. $#_) {
95 0           my $param = $_[$i];
96 0           my %attrs;
97 0 0         if (ref $param eq 'HASH') {
98 0 0 0       if (exists $param->{json}) {
    0          
99 0           $param = to_json $param->{json};
100             }
101             elsif (exists $param->{type} && exists $param->{value}) {
102 0           ($param, $attrs{TYPE}) = @$param{qw(value type)};
103             }
104             }
105              
106 0           $sth->bind_param($i + 1, $param, \%attrs);
107             }
108 0           return $sth;
109             }
110              
111             sub _cleanup_sth {
112 0     0     my $self = shift;
113 0           delete $self->{done_sth};
114 0 0         $_->{cb}($self, 'Premature connection close', undef) for @{delete $self->{waiting} || []};
  0            
115             }
116              
117             sub _dbh_attr {
118 0     0     my $self = shift;
119 0 0         my $dbh = ref $self ? $self->dbh : shift;
120 0           my $name = shift;
121 0           $name =~ s!^mysql!{lc $dbh->{Driver}{Name}}!e;
  0            
  0            
122 0 0         return $dbh->{$name} = shift if @_;
123 0           return $dbh->{$name};
124             }
125              
126             sub _next {
127 0     0     my $self = shift;
128              
129 0 0         return unless my $next = $self->{waiting}[0];
130 0 0         return if $next->{sth};
131              
132 0           my $dbh = $self->dbh;
133 0 0         my $flag = lc $dbh->{Driver}{Name} eq 'mariadb' ? 'mariadb_async' : 'async';
134 0           my $sth = $next->{sth} = $self->dbh->prepare($next->{query}, {$flag => 1});
135 0           _bind_params($sth, @{$next->{args}});
  0            
136 0           $sth->execute;
137             }
138              
139             sub _unwatch {
140 0 0   0     Mojo::IOLoop->singleton->reactor->remove(delete $_[0]->{handle}) if $_[0]->{handle};
141             }
142              
143             sub _watch {
144 0     0     my $self = shift;
145 0 0         return if $self->{handle};
146              
147 0           my $dbh = $self->dbh;
148 0           my $driver = lc $dbh->{Driver}{Name};
149 0           my $ready_method = "${driver}_async_ready";
150 0           my $result_method = "${driver}_async_result";
151 0 0         my $fd = $driver eq 'mariadb' ? $dbh->mariadb_sockfd : $dbh->mysql_fd;
152 0 0         open $self->{handle}, '<&', $fd or die "Could not dup $driver fd: $!";
153             Mojo::IOLoop->singleton->reactor->io(
154             $self->{handle} => sub {
155 0 0   0     return unless my $waiting = $self->{waiting};
156 0 0 0       return unless @$waiting and $waiting->[0]{sth} and $waiting->[0]{sth}->$ready_method;
      0        
157 0           my ($cb, $err, $sth) = @{shift @$waiting}{qw(cb err sth)};
  0            
158              
159             # Do not raise exceptions inside the event loop
160 0           my $rv = do { local $sth->{RaiseError} = 0; $sth->$result_method };
  0            
  0            
161 0           my $res = $self->results_class->new(db => $self, sth => $sth);
162              
163 0 0         $err = undef if defined $rv;
164 0 0         $err =~ s!\b__MSG__\b!{$dbh->errstr}!e if defined $err;
  0            
  0            
165 0 0 0       $res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
166              
167 0           $self->$cb($err, $res);
168 0           $self->_next;
169 0 0         $self->_unwatch unless $self->backlog;
170             }
171 0           )->watch($self->{handle}, 1, 0);
172             }
173              
174             1;
175              
176             =encoding utf8
177              
178             =head1 NAME
179              
180             Mojo::mysql::Database - Database
181              
182             =head1 SYNOPSIS
183              
184             use Mojo::mysql::Database;
185              
186             my $db = Mojo::mysql::Database->new(mysql => $mysql, dbh => $dbh);
187              
188             =head1 DESCRIPTION
189              
190             L is a container for database handles used by L.
191              
192             =head1 ATTRIBUTES
193              
194             L implements the following attributes.
195              
196             =head2 dbh
197              
198             my $dbh = $db->dbh;
199             $db = $db->dbh(DBI->new);
200              
201             Database handle used for all queries.
202              
203             =head2 mysql
204              
205             my $mysql = $db->mysql;
206             $db = $db->mysql(Mojo::mysql->new);
207              
208             L object this database belongs to.
209              
210             =head2 results_class
211              
212             $class = $db->results_class;
213             $db = $db->results_class("MyApp::Results");
214              
215             Class to be used by L, defaults to L. Note that
216             this class needs to have already been loaded before L is called.
217              
218             =head1 METHODS
219              
220             L inherits all methods from L and
221             implements the following new ones.
222              
223             =head2 backlog
224              
225             my $num = $db->backlog;
226              
227             Number of waiting non-blocking queries.
228              
229             =head2 begin
230              
231             my $tx = $db->begin;
232              
233             Begin transaction and return L object, which will
234             automatically roll back the transaction unless
235             L has been called before it is destroyed.
236              
237             # Add names in a transaction
238             eval {
239             my $tx = $db->begin;
240             $db->query('insert into names values (?)', 'Baerbel');
241             $db->query('insert into names values (?)', 'Wolfgang');
242             $tx->commit;
243             };
244             say $@ if $@;
245              
246             =head2 delete
247              
248             my $results = $db->delete($table, \%where);
249              
250             Generate a C statement with L (usually an
251             L object) and execute it with L. You can also append a
252             callback to perform operations non-blocking.
253              
254             $db->delete(some_table => sub {
255             my ($db, $err, $results) = @_;
256             ...
257             });
258             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
259              
260             =head2 delete_p
261              
262             my $promise = $db->delete_p($table, \%where, \%options);
263              
264             Same as L, but performs all operations non-blocking and returns a
265             L object instead of accepting a callback.
266              
267             $db->delete_p('some_table')->then(sub {
268             my $results = shift;
269             ...
270             })->catch(sub {
271             my $err = shift;
272             ...
273             })->wait;
274              
275             =head2 disconnect
276              
277             $db->disconnect;
278              
279             Disconnect database handle and prevent it from getting cached again.
280              
281             =head2 insert
282              
283             my $results = $db->insert($table, \@values || \%fieldvals, \%options);
284              
285             Generate an C statement with L (usually an
286             L object) and execute it with L. You can also append a
287             callback to perform operations non-blocking.
288              
289             $db->insert(some_table => {foo => 'bar'} => sub {
290             my ($db, $err, $results) = @_;
291             ...
292             });
293             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
294              
295             =head2 insert_p
296              
297             my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
298              
299             Same as L, but performs all operations non-blocking and returns a
300             L object instead of accepting a callback.
301              
302             $db->insert_p(some_table => {foo => 'bar'})->then(sub {
303             my $results = shift;
304             ...
305             })->catch(sub {
306             my $err = shift;
307             ...
308             })->wait;
309              
310             =head2 pid
311              
312             my $pid = $db->pid;
313              
314             Return the connection id of the backend server process.
315              
316             =head2 ping
317              
318             my $bool = $db->ping;
319              
320             Check database connection.
321              
322             =head2 query
323              
324             my $results = $db->query('select * from foo');
325             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
326             my $results = $db->query('insert into foo values (?)', {json => {bar => 'baz'}});
327             my $results = $db->query('insert into foo values (?)', {type => SQL_INTEGER, value => 42});
328              
329             Execute a blocking statement and return a L object with the
330             results. You can also append a callback to perform operation non-blocking.
331              
332             $db->query('select * from foo' => sub {
333             my ($db, $err, $results) = @_;
334             ...
335             });
336             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
337              
338             Hash reference arguments containing a value named C, will be encoded to
339             JSON text with L. To accomplish the reverse, you can use
340             the method L, which automatically decodes data back
341             to Perl data structures.
342              
343             $db->query('insert into foo values (x) values (?)', {json => {bar => 'baz'}});
344             $db->query('select * from foo')->expand->hash->{x}{bar}; # baz
345              
346             Hash reference arguments containing values named C and C can be
347             used to bind specific L data types (see L) to
348             placeholders. This is needed to pass binary data in parameters; see
349             L for more information.
350              
351             # Insert binary data
352             use DBI ':sql_types';
353             $db->query('insert into bar values (?)', {type => SQL_BLOB, value => $bytes});
354              
355             =head2 query_p
356              
357             my $promise = $db->query_p('select * from foo');
358              
359             Same as L, but performs all operations non-blocking and returns a
360             L object instead of accepting a callback.
361              
362             $db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
363             my $results = shift;
364             ...
365             })->catch(sub {
366             my $err = shift;
367             ...
368             })->wait;
369              
370             =head2 quote
371              
372             my $escaped = $db->quote($str);
373              
374             Quote a string literal for use as a literal value in an SQL statement.
375              
376             =head2 quote_id
377              
378             my $escaped = $db->quote_id($id);
379              
380             Quote an identifier (table name etc.) for use in an SQL statement.
381              
382             =head2 select
383              
384             my $results = $db->select($source, $fields, $where, $order);
385              
386             Generate a C
387             L object) and execute it with L. You can also append a
388             callback to perform operations non-blocking.
389              
390             $db->select(some_table => ['foo'] => sub {
391             my ($db, $err, $results) = @_;
392             ...
393             });
394             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
395              
396             =head2 select_p
397              
398             my $promise = $db->select_p($source, $fields, $where, $order);
399              
400             Same as L, but performs all operations non-blocking and returns a
401             L object instead of accepting a callback.
402              
403             $db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
404             my $results = shift;
405             ...
406             })->catch(sub {
407             my $err = shift;
408             ...
409             })->wait;
410              
411             =head2 update
412              
413             my $results = $db->update($table, \%fieldvals, \%where);
414              
415             Generate an C statement with L (usually an
416             L object) and execute it with L. You can also append a
417             callback to perform operations non-blocking.
418              
419             $db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
420             my ($db, $err, $results) = @_;
421             ...
422             });
423             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
424              
425             =head2 update_p
426              
427             my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
428              
429             Same as L, but performs all operations non-blocking and returns a
430             L object instead of accepting a callback.
431              
432             $db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
433             my $results = shift;
434             ...
435             })->catch(sub {
436             my $err = shift;
437             ...
438             })->wait;
439              
440             =head2 tables
441              
442             my $tables = $db->tables;
443              
444             Return an array reference with table names for this database.
445              
446             =head1 SEE ALSO
447              
448             L.
449              
450             =cut