File Coverage

blib/lib/Mojo/mysql/Database.pm
Criterion Covered Total %
statement 27 133 20.3
branch 0 56 0.0
condition 0 18 0.0
subroutine 9 38 23.6
pod 10 10 100.0
total 46 255 18.0


line stmt bran cond sub pod time code
1             package Mojo::mysql::Database;
2 18     18   117 use Mojo::Base 'Mojo::EventEmitter';
  18         37  
  18         151  
3              
4 18     18   8284 use Carp;
  18         38  
  18         1490  
5 18     18   12966 use Mojo::IOLoop;
  18         5992003  
  18         139  
6 18     18   1092 use Mojo::JSON 'to_json';
  18         60  
  18         1405  
7 18     18   9812 use Mojo::mysql::Results;
  18         58  
  18         124  
8 18     18   9442 use Mojo::mysql::Transaction;
  18         106  
  18         112  
9 18     18   838 use Mojo::Promise;
  18         35  
  18         158  
10 18     18   619 use Mojo::Util 'monkey_patch';
  18         33  
  18         993  
11 18     18   104 use Scalar::Util 'weaken';
  18         32  
  18         48084  
12              
13             has [qw(dbh mysql)];
14             has results_class => 'Mojo::mysql::Results';
15              
16             for my $name (qw(delete insert select update)) {
17             monkey_patch __PACKAGE__, $name, sub {
18 0     0     my $self = shift;
        0      
        0      
        0      
19 0 0         my @cb = ref $_[-1] eq 'CODE' ? pop : ();
20 0           return $self->query($self->mysql->abstract->$name(@_), @cb);
21             };
22             monkey_patch __PACKAGE__, "${name}_p", sub {
23 0     0     my $self = shift;
        0      
        0      
        0      
24 0           return $self->query_p($self->mysql->abstract->$name(@_));
25             };
26             }
27              
28             sub DESTROY {
29 0     0     my $self = shift;
30 0 0         return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
31 0           $self->_cleanup_sth;
32 0 0 0       return unless (my $mysql = $self->mysql) and (my $dbh = $self->dbh);
33 0           $mysql->_enqueue($dbh, $self->{handle});
34             }
35              
36 0 0   0 1   sub backlog { scalar @{shift->{waiting} || []} }
  0            
37              
38             sub begin {
39 0     0 1   my $self = shift;
40 0           my $tx = Mojo::mysql::Transaction->new(db => $self);
41 0           weaken $tx->{db};
42 0           return $tx;
43             }
44              
45             sub disconnect {
46 0     0 1   my $self = shift;
47 0           $self->_cleanup_sth;
48 0           $self->_unwatch;
49 0           $self->dbh->disconnect;
50             }
51              
52 0     0 1   sub pid { shift->_dbh_attr('mysql_thread_id') }
53              
54 0     0 1   sub ping { shift->dbh->ping }
55              
56             sub query {
57 0     0 1   my ($self, $query) = (shift, shift);
58 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
59              
60             # Blocking
61 0 0         unless ($cb) {
62 0 0         Carp::confess('Cannot perform blocking query, while waiting for async response') if $self->backlog;
63 0           my $sth = $self->dbh->prepare($query);
64 0     0     local $sth->{HandleError} = sub { $_[0] = Carp::shortmess($_[0]); 0 };
  0            
  0            
65 0           _bind_params($sth, @_);
66 0           my $rv = $sth->execute;
67 0           my $res = $self->results_class->new(db => $self, is_blocking => 1, sth => $sth);
68 0 0 0       $res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
69 0           return $res;
70             }
71              
72             # Non-blocking
73 0           push @{$self->{waiting}}, {args => [@_], err => Carp::shortmess('__MSG__'), cb => $cb, query => $query};
  0            
74 0           $self->$_ for qw(_next _watch);
75 0           return $self;
76             }
77              
78             sub query_p {
79 0     0 1   my $self = shift;
80 0           my $promise = Mojo::Promise->new;
81 0 0   0     $self->query(@_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
  0            
82 0           return $promise;
83             }
84              
85 0     0 1   sub quote { shift->dbh->quote(shift) }
86              
87 0     0 1   sub quote_id { shift->dbh->quote_identifier(shift) }
88              
89             sub tables {
90 0     0 1   shift->query('show tables')->arrays->reduce(sub { push @$a, $b->[0]; $a }, []);
  0     0      
  0            
91             }
92              
93             sub _bind_params {
94 0     0     my $sth = shift;
95 0           for my $i (0 .. $#_) {
96 0           my $param = $_[$i];
97 0           my %attrs;
98 0 0         if (ref $param eq 'HASH') {
99 0 0 0       if (exists $param->{json}) {
    0          
100 0           $param = to_json $param->{json};
101             }
102             elsif (exists $param->{type} && exists $param->{value}) {
103 0           ($param, $attrs{TYPE}) = @$param{qw(value type)};
104             }
105             }
106              
107 0           $sth->bind_param($i + 1, $param, \%attrs);
108             }
109 0           return $sth;
110             }
111              
112             sub _cleanup_sth {
113 0     0     my $self = shift;
114 0           delete $self->{done_sth};
115 0 0         $_->{cb}($self, 'Premature connection close', undef) for @{delete $self->{waiting} || []};
  0            
116             }
117              
118             sub _dbh_attr {
119 0     0     my $self = shift;
120 0 0         my $dbh = ref $self ? $self->dbh : shift;
121 0           my $name = shift;
122 0           $name =~ s!^mysql!{lc $dbh->{Driver}{Name}}!e;
  0            
  0            
123 0 0         return $dbh->{$name} = shift if @_;
124 0           return $dbh->{$name};
125             }
126              
127             sub _next {
128 0     0     my $self = shift;
129              
130 0 0         return unless my $next = $self->{waiting}[0];
131 0 0         return if $next->{sth};
132              
133 0           my $dbh = $self->dbh;
134 0 0         my $flag = lc $dbh->{Driver}{Name} eq 'mariadb' ? 'mariadb_async' : 'async';
135 0           my $sth = $next->{sth} = $self->dbh->prepare($next->{query}, {$flag => 1});
136 0           _bind_params($sth, @{$next->{args}});
  0            
137 0           $sth->execute;
138             }
139              
140             sub _unwatch {
141 0 0   0     Mojo::IOLoop->singleton->reactor->remove(delete $_[0]->{handle}) if $_[0]->{handle};
142             }
143              
144             sub _watch {
145 0     0     my $self = shift;
146 0 0         return if $self->{handle};
147              
148 0           my $dbh = $self->dbh;
149 0           my $driver = lc $dbh->{Driver}{Name};
150 0           my $ready_method = "${driver}_async_ready";
151 0           my $result_method = "${driver}_async_result";
152 0 0         my $fd = $driver eq 'mariadb' ? $dbh->mariadb_sockfd : $dbh->mysql_fd;
153 0 0         open $self->{handle}, '<&', $fd or die "Could not dup $driver fd: $!";
154             Mojo::IOLoop->singleton->reactor->io(
155             $self->{handle} => sub {
156 0 0   0     return unless my $waiting = $self->{waiting};
157 0 0 0       return unless @$waiting and $waiting->[0]{sth} and $waiting->[0]{sth}->$ready_method;
      0        
158 0           my ($cb, $err, $sth) = @{shift @$waiting}{qw(cb err sth)};
  0            
159              
160             # Do not raise exceptions inside the event loop
161 0           my $rv = do { local $sth->{RaiseError} = 0; $sth->$result_method };
  0            
  0            
162 0           my $res = $self->results_class->new(db => $self, sth => $sth);
163              
164 0 0         $err = undef if defined $rv;
165 0 0         $err =~ s!\b__MSG__\b!{$dbh->errstr}!e if defined $err;
  0            
  0            
166 0 0 0       $res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
167              
168 0           $self->$cb($err, $res);
169 0           $self->_next;
170 0 0         $self->_unwatch unless $self->backlog;
171             }
172 0           )->watch($self->{handle}, 1, 0);
173             }
174              
175             1;
176              
177             =encoding utf8
178              
179             =head1 NAME
180              
181             Mojo::mysql::Database - Database
182              
183             =head1 SYNOPSIS
184              
185             use Mojo::mysql::Database;
186              
187             my $db = Mojo::mysql::Database->new(mysql => $mysql, dbh => $dbh);
188              
189             =head1 DESCRIPTION
190              
191             L is a container for database handles used by L.
192              
193             =head1 ATTRIBUTES
194              
195             L implements the following attributes.
196              
197             =head2 dbh
198              
199             my $dbh = $db->dbh;
200             $db = $db->dbh(DBI->new);
201              
202             Database handle used for all queries.
203              
204             =head2 mysql
205              
206             my $mysql = $db->mysql;
207             $db = $db->mysql(Mojo::mysql->new);
208              
209             L object this database belongs to.
210              
211             =head2 results_class
212              
213             $class = $db->results_class;
214             $db = $db->results_class("MyApp::Results");
215              
216             Class to be used by L, defaults to L. Note that
217             this class needs to have already been loaded before L is called.
218              
219             =head1 METHODS
220              
221             L inherits all methods from L and
222             implements the following new ones.
223              
224             =head2 backlog
225              
226             my $num = $db->backlog;
227              
228             Number of waiting non-blocking queries.
229              
230             =head2 begin
231              
232             my $tx = $db->begin;
233              
234             Begin transaction and return L object, which will
235             automatically roll back the transaction unless
236             L has been called before it is destroyed.
237              
238             # Add names in a transaction
239             eval {
240             my $tx = $db->begin;
241             $db->query('insert into names values (?)', 'Baerbel');
242             $db->query('insert into names values (?)', 'Wolfgang');
243             $tx->commit;
244             };
245             say $@ if $@;
246              
247             =head2 delete
248              
249             my $results = $db->delete($table, \%where);
250              
251             Generate a C statement with L (usually an
252             L object) and execute it with L. You can also append a
253             callback to perform operations non-blocking.
254              
255             $db->delete(some_table => sub {
256             my ($db, $err, $results) = @_;
257             ...
258             });
259             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
260              
261             =head2 delete_p
262              
263             my $promise = $db->delete_p($table, \%where, \%options);
264              
265             Same as L, but performs all operations non-blocking and returns a
266             L object instead of accepting a callback.
267              
268             $db->delete_p('some_table')->then(sub {
269             my $results = shift;
270             ...
271             })->catch(sub {
272             my $err = shift;
273             ...
274             })->wait;
275              
276             =head2 disconnect
277              
278             $db->disconnect;
279              
280             Disconnect database handle and prevent it from getting cached again.
281              
282             =head2 insert
283              
284             my $results = $db->insert($table, \@values || \%fieldvals, \%options);
285              
286             Generate an C statement with L (usually an
287             L object) and execute it with L. You can also append a
288             callback to perform operations non-blocking.
289              
290             $db->insert(some_table => {foo => 'bar'} => sub {
291             my ($db, $err, $results) = @_;
292             ...
293             });
294             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
295              
296             =head2 insert_p
297              
298             my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
299              
300             Same as L, but performs all operations non-blocking and returns a
301             L object instead of accepting a callback.
302              
303             $db->insert_p(some_table => {foo => 'bar'})->then(sub {
304             my $results = shift;
305             ...
306             })->catch(sub {
307             my $err = shift;
308             ...
309             })->wait;
310              
311             =head2 pid
312              
313             my $pid = $db->pid;
314              
315             Return the connection id of the backend server process.
316              
317             =head2 ping
318              
319             my $bool = $db->ping;
320              
321             Check database connection.
322              
323             =head2 query
324              
325             my $results = $db->query('select * from foo');
326             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
327             my $results = $db->query('insert into foo values (?)', {json => {bar => 'baz'}});
328             my $results = $db->query('insert into foo values (?)', {type => SQL_INTEGER, value => 42});
329              
330             Execute a blocking statement and return a L object with the
331             results. You can also append a callback to perform operation non-blocking.
332              
333             $db->query('select * from foo' => sub {
334             my ($db, $err, $results) = @_;
335             ...
336             });
337             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
338              
339             Hash reference arguments containing a value named C, will be encoded to
340             JSON text with L. To accomplish the reverse, you can use
341             the method L, which automatically decodes data back
342             to Perl data structures.
343              
344             $db->query('insert into foo values (x) values (?)', {json => {bar => 'baz'}});
345             $db->query('select * from foo')->expand->hash->{x}{bar}; # baz
346              
347             Hash reference arguments containing values named C and C can be
348             used to bind specific L data types (see L) to
349             placeholders. This is needed to pass binary data in parameters; see
350             L for more information.
351              
352             # Insert binary data
353             use DBI ':sql_types';
354             $db->query('insert into bar values (?)', {type => SQL_BLOB, value => $bytes});
355              
356             =head2 query_p
357              
358             my $promise = $db->query_p('select * from foo');
359              
360             Same as L, but performs all operations non-blocking and returns a
361             L object instead of accepting a callback.
362              
363             $db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
364             my $results = shift;
365             ...
366             })->catch(sub {
367             my $err = shift;
368             ...
369             })->wait;
370              
371             =head2 quote
372              
373             my $escaped = $db->quote($str);
374              
375             Quote a string literal for use as a literal value in an SQL statement.
376              
377             =head2 quote_id
378              
379             my $escaped = $db->quote_id($id);
380              
381             Quote an identifier (table name etc.) for use in an SQL statement.
382              
383             =head2 select
384              
385             my $results = $db->select($source, $fields, $where, $order);
386              
387             Generate a C
388             L object) and execute it with L. You can also append a
389             callback to perform operations non-blocking.
390              
391             $db->select(some_table => ['foo'] => sub {
392             my ($db, $err, $results) = @_;
393             ...
394             });
395             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
396              
397             =head2 select_p
398              
399             my $promise = $db->select_p($source, $fields, $where, $order);
400              
401             Same as L, but performs all operations non-blocking and returns a
402             L object instead of accepting a callback.
403              
404             $db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
405             my $results = shift;
406             ...
407             })->catch(sub {
408             my $err = shift;
409             ...
410             })->wait;
411              
412             =head2 update
413              
414             my $results = $db->update($table, \%fieldvals, \%where);
415              
416             Generate an C statement with L (usually an
417             L object) and execute it with L. You can also append a
418             callback to perform operations non-blocking.
419              
420             $db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
421             my ($db, $err, $results) = @_;
422             ...
423             });
424             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
425              
426             =head2 update_p
427              
428             my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
429              
430             Same as L, but performs all operations non-blocking and returns a
431             L object instead of accepting a callback.
432              
433             $db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
434             my $results = shift;
435             ...
436             })->catch(sub {
437             my $err = shift;
438             ...
439             })->wait;
440              
441             =head2 tables
442              
443             my $tables = $db->tables;
444              
445             Return an array reference with table names for this database.
446              
447             =head1 SEE ALSO
448              
449             L.
450              
451             =cut