File Coverage

blib/lib/AnyEvent/DBI.pm
Criterion Covered Total %
statement 24 150 16.0
branch 0 46 0.0
condition 0 9 0.0
subroutine 8 30 26.6
pod 10 11 90.9
total 42 246 17.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::DBI - asynchronous DBI access
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::DBI;
8              
9             my $cv = AnyEvent->condvar;
10              
11             my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12              
13             $dbh->exec ("select * from test where num=?", 10, sub {
14             my ($dbh, $rows, $rv) = @_;
15              
16             $#_ or die "failure: $@";
17              
18             print "@$_\n"
19             for @$rows;
20              
21             $cv->broadcast;
22             });
23              
24             # asynchronously do sth. else here
25              
26             $cv->wait;
27              
28             =head1 DESCRIPTION
29              
30             This module is an L<AnyEvent> user, you need to make sure that you use and
31             run a supported event loop.
32              
33             This module implements asynchronous DBI access by forking or executing
34             separate "DBI-Server" processes and sending them requests.
35              
36             It means that you can run DBI requests in parallel to other tasks.
37              
38             With DBD::mysql, the overhead for very simple statements
39             ("select 0") is somewhere around 50% compared to an explicit
40             prepare_cached/execute/fetchrow_arrayref/finish combination. With
41             DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42              
43             =head2 ERROR HANDLING
44              
45             This module defines a number of functions that accept a callback
46             argument. All callbacks used by this module get their AnyEvent::DBI handle
47             object passed as first argument.
48              
49             If the request was successful, then there will be more arguments,
50             otherwise there will only be the C<$dbh> argument and C<$@> contains an
51             error message.
52              
53             A convenient way to check whether an error occurred is to check C<$#_> -
54             if that is true, then the function was successful, otherwise there was an
55             error.
56              
57             =cut
58              
59             package AnyEvent::DBI;
60              
61 1     1   795 use common::sense;
  1         10  
  1         4  
62              
63 1     1   44 use Carp;
  1         2  
  1         64  
64 1     1   440 use Convert::Scalar ();
  1         451  
  1         17  
65 1     1   486 use AnyEvent::Fork ();
  1         17948  
  1         25  
66 1     1   494 use CBOR::XS ();
  1         4395  
  1         19  
67              
68 1     1   6 use AnyEvent ();
  1         1  
  1         11  
69 1     1   4 use AnyEvent::Util ();
  1         1  
  1         10  
70              
71 1     1   3 use Errno ();
  1         2  
  1         1455  
72              
73             our $VERSION = '3.04';
74              
75             =head2 METHODS
76              
77             =over 4
78              
79             =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80              
81             Returns a database handle for the given database. Each database handle
82             has an associated server process that executes statements in order. If
83             you want to run more than one statement in parallel, you need to create
84             additional database handles.
85              
86             The advantage of this approach is that transactions work as state is
87             preserved.
88              
89             Example:
90              
91             $dbh = new AnyEvent::DBI
92             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93              
94             Additional key-value pairs can be used to adjust behaviour:
95              
96             =over 4
97              
98             =item on_error => $callback->($dbh, $filename, $line, $fatal)
99              
100             When an error occurs, then this callback will be invoked. On entry, C<$@>
101             is set to the error message. C<$filename> and C<$line> is where the
102             original request was submitted.
103              
104             If the fatal argument is true then the database connection is shut down
105             and your database handle becomes invalid. In addition to invoking the
106             C<on_error> callback, all of your queued request callbacks are called
107             with only the C<$dbh> argument.
108              
109             If omitted, then C<die> will be called on any errors, fatal or not.
110              
111             =item on_connect => $callback->($dbh[, $success])
112              
113             If you supply an C<on_connect> callback, then this callback will be
114             invoked after the database connect attempt. If the connection succeeds,
115             C<$success> is true, otherwise it is missing and C<$@> contains the
116             C<$DBI::errstr>.
117              
118             Regardless of whether C<on_connect> is supplied, connect errors will result in
119             C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120             connection errors are considered fatal. The client will C<die> and the C<on_error>
121             callback will be called with C<$fatal> true.
122              
123             When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
124             will not C<die>. You still cannot, however, use the C<$dbh> object you
125             received from C<new> to make requests.
126              
127             =item fork_template => $AnyEvent::Fork-object
128              
129             C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130             slave, which in turn either C<exec>'s a new process (similar to the old
131             C<exec_server> constructor argument) or uses a process forked early (see
132             L<AnyEvent::Fork::Early>).
133              
134             With this argument you can provide your own fork template. This can be
135             useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136             memory (and speed up startup) by not having to load C<AnyEvent::DBI::Slave> again
137             and again into your child processes:
138              
139             my $template = AnyEvent::Fork
140             ->new # create new template
141             ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142              
143             for (...) {
144             $dbh = new AnyEvent::DBI ...
145             fork_template => $template;
146              
147             =item timeout => seconds
148              
149             If you supply a timeout parameter (fractional values are supported), then
150             a timer is started any time the DBI handle expects a response from the
151             server. This includes connection setup as well as requests made to the
152             backend. The timeout spans the duration from the moment the first data
153             is written (or queued to be written) until all expected responses are
154             returned, but is postponed for "timeout" seconds each time more data is
155             returned from the server. If the timer ever goes off then a fatal error is
156             generated. If you have an C<on_error> handler installed, then it will be
157             called, otherwise your program will die().
158              
159             When altering your databases with timeouts it is wise to use
160             transactions. If you quit due to timeout while performing insert, update
161             or schema-altering commands you can end up not knowing if the action was
162             submitted to the database, complicating recovery.
163              
164             Timeout errors are always fatal.
165              
166             =back
167              
168             Any additional key-value pairs will be rolled into a hash reference
169             and passed as the final argument to the C<< DBI->connect (...) >>
170             call. For example, to suppress errors on STDERR and send them instead to an
171             AnyEvent::Handle you could do:
172              
173             $dbh = new AnyEvent::DBI
174             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175             PrintError => 0,
176             on_error => sub {
177             $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178             };
179              
180             =cut
181              
182             sub new {
183 0     0 1   my ($class, $dbi, $user, $pass, %arg) = @_;
184              
185             # we use our own socketpair, so we always have a socket
186             # available, even before the forked process exsist.
187             # this is mostly done so this module is compatible
188             # to versions of itself older than 3.0.
189 0 0         my ($client, $server) = AnyEvent::Util::portable_socketpair
190             or croak "unable to create AnyEvent::DBI communications pipe: $!";
191              
192 0           AnyEvent::fh_unblock $client;
193              
194 0           my $fork = delete $arg{fork_template};
195              
196 0           my %dbi_args = %arg;
197 0           delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
198              
199 0           my $self = bless \%arg, $class;
200              
201 0           $self->{fh} = $client;
202              
203 0           my $rbuf;
204 0           my @caller = (caller)[1,2]; # the "default" caller
205              
206 0 0         $fork = $fork ? $fork->fork : AnyEvent::Fork->new
    0          
207             or croak "fork: $!";
208              
209 0           $fork->require ("AnyEvent::DBI::Slave");
210 0           $fork->send_arg ($VERSION);
211 0           $fork->send_fh ($server);
212              
213             # we don't rely on the callback, because we use our own
214             # socketpair, for better or worse.
215 0     0     $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216              
217             {
218 0           Convert::Scalar::weaken (my $self = $self);
  0            
219              
220 0           my $cbor = new CBOR::XS;
221              
222             $self->{rw} = AE::io $client, 0, sub {
223 0     0     my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
224              
225 0 0         if ($len > 0) {
    0          
    0          
226             # we received data, so reset the timer
227 0           $self->{last_activity} = AE::now;
228              
229 0           for my $res ($cbor->incr_parse_multiple ($rbuf)) {
230 0 0         last unless $self;
231              
232 0           my $req = shift @{ $self->{queue} };
  0            
233              
234 0 0         if (defined $res->[0]) {
235 0           $res->[0] = $self;
236 0           $req->[0](@$res);
237             } else {
238 0           my $cb = shift @$req;
239 0           local $@ = $res->[1];
240 0           $cb->($self);
241 0 0         $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
242             if $self; # cb() could have deleted it
243             }
244              
245             # no more queued requests, so become idle
246 0 0 0       if ($self && !@{ $self->{queue} }) {
  0            
247 0           undef $self->{last_activity};
248 0           $self->{tw_cb}->();
249             }
250             }
251              
252             } elsif (defined $len) {
253             # todo, caller?
254 0           $self->_error ("unexpected eof", @caller, 1);
255             } elsif ($! != Errno::EAGAIN) {
256             # todo, caller?
257 0           $self->_error ("read error: $!", @caller, 1);
258             }
259 0           };
260              
261             $self->{tw_cb} = sub {
262 0 0 0 0     if ($self->{timeout} && $self->{last_activity}) {
263 0 0         if (AE::now > $self->{last_activity} + $self->{timeout}) {
264             # we did time out
265 0           my $req = $self->{queue}[0];
266 0           $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
267             } else {
268             # we need to re-set the timeout watcher
269             $self->{tw} = AE::timer
270             $self->{last_activity} + $self->{timeout} - AE::now,
271             0,
272             $self->{tw_cb},
273 0           ;
274             }
275             } else {
276             # no timeout check wanted, or idle
277 0           undef $self->{tw};
278             }
279 0           };
280              
281             $self->{ww_cb} = sub {
282 0     0     $self->{last_activity} = AE::now;
283              
284             my $len = syswrite $client, $self->{wbuf}
285 0 0         or return delete $self->{ww};
286              
287 0           substr $self->{wbuf}, 0, $len, "";
288 0           };
289             }
290              
291             $self->_req (
292             sub {
293 0 0   0     return unless $self;
294 0           $self->{child_pid} = $_[1];
295             },
296 0           (caller)[1,2],
297             "req_pid"
298             );
299              
300             $self->_req (
301             sub {
302 0 0   0     return unless $self;
303 0 0         &{ $self->{on_connect} } if $self->{on_connect};
  0            
304             },
305 0           (caller)[1,2],
306             req_open => $dbi, $user, $pass, %dbi_args
307             );
308              
309 0           $self
310             }
311              
312             sub _server_pid {
313             shift->{child_pid}
314 0     0     }
315              
316             sub kill_child {
317 0     0 0   my $self = shift;
318              
319 0 0         if (my $pid = delete $self->{child_pid}) {
320             # kill and reap process
321 0           my $kid_watcher; $kid_watcher = AE::child $pid, sub {
322 0     0     undef $kid_watcher;
323 0           };
324 0           kill TERM => $pid;
325             }
326              
327 0           delete $self->{rw};
328 0           delete $self->{ww};
329 0           delete $self->{tw};
330 0           close delete $self->{fh};
331             }
332              
333             sub DESTROY {
334 0     0     shift->kill_child;
335             }
336              
337             sub _error {
338 0     0     my ($self, $error, $filename, $line, $fatal) = @_;
339              
340 0 0         if ($fatal) {
341 0           delete $self->{tw};
342 0           delete $self->{rw};
343 0           delete $self->{ww};
344 0           delete $self->{fh};
345              
346             # for fatal errors call all enqueued callbacks with error
347 0           while (my $req = shift @{$self->{queue}}) {
  0            
348 0           local $@ = $error;
349 0           $req->[0]->($self);
350             }
351 0           $self->kill_child;
352             }
353              
354 0           local $@ = $error;
355              
356 0 0         if ($self->{on_error}) {
357 0           $self->{on_error}($self, $filename, $line, $fatal)
358             } else {
359 0           die "$error at $filename, line $line\n";
360             }
361             }
362              
363             =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
364              
365             Sets (or clears, with C<undef>) the C<on_error> handler.
366              
367             =cut
368              
369             sub on_error {
370 0     0 1   $_[0]{on_error} = $_[1];
371             }
372              
373             =item $dbh->timeout ($seconds)
374              
375             Sets (or clears, with C<undef>) the database timeout. Useful to extend the
376             timeout when you are about to make a really long query.
377              
378             =cut
379              
380             sub timeout {
381 0     0 1   my ($self, $timeout) = @_;
382              
383 0           $self->{timeout} = $timeout;
384              
385             # reschedule timer if one was running
386 0           $self->{tw_cb}->();
387             }
388              
389             sub _req {
390 0     0     my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
391              
392 0 0         unless ($self->{fh}) {
393 0           local $@ = my $err = 'no database connection';
394 0           $cb->($self);
395 0           $self->_error ($err, $filename, $line, 1);
396 0           return;
397             }
398              
399 0           push @{ $self->{queue} }, [$cb, $filename, $line];
  0            
400              
401             # re-start timeout if necessary
402 0 0 0       if ($self->{timeout} && !$self->{tw}) {
403 0           $self->{last_activity} = AE::now;
404 0           $self->{tw_cb}->();
405             }
406              
407 0           $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
408              
409 0 0         unless ($self->{ww}) {
410 0           my $len = syswrite $self->{fh}, $self->{wbuf};
411 0           substr $self->{wbuf}, 0, $len, "";
412              
413             # still any left? then install a write watcher
414             $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
415 0 0         if length $self->{wbuf};
416             }
417             }
418              
419             =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420              
421             An accessor for the database handle attributes, such as C<AutoCommit>,
422             C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423             (which might be C<undef>), then the given attribute will be set to that
424             value.
425              
426             The callback will be passed the database handle and the attribute's value
427             if successful.
428              
429             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430             will be passed and C<$@> contains the error message.
431              
432             =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
433              
434             Executes the given SQL statement with placeholders replaced by
435             C<@args>. The statement will be prepared and cached on the server side, so
436             using placeholders is extremely important.
437              
438             The callback will be called with a weakened AnyEvent::DBI object as the
439             first argument and the result of C<fetchall_arrayref> (or C<undef>
440             if the statement wasn't a select statement) as the second argument.
441              
442             Third argument is the return value from the C<< DBI->execute >> method
443             call.
444              
445             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
446             will be passed and C<$@> contains the error message.
447              
448             =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
449              
450             An accessor for the statement attributes of the most recently executed
451             statement, such as C<NAME> or C<TYPE>.
452              
453             The callback will be passed the database handle and the attribute's value
454             if successful.
455              
456             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
457             will be passed and C<$@> contains the error message.
458              
459             =item $dbh->begin_work ($cb->($dbh[, $rc]))
460              
461             =item $dbh->commit ($cb->($dbh[, $rc]))
462              
463             =item $dbh->rollback ($cb->($dbh[, $rc]))
464              
465             The begin_work, commit, and rollback methods expose the equivalent
466             transaction control method of the DBI driver. On success, C<$rc> is true.
467              
468             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
469             will be passed and C<$@> contains the error message.
470              
471             =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
472              
473             This gives access to database driver private methods. Because they
474             are not standard you cannot always depend on the value of C<$rc> or
475             C<$dbi_err>. Check the documentation for your specific driver/function
476             combination to see what it returns.
477              
478             Note that the first argument will be eval'ed to produce the argument list to
479             the func() method. This must be done because the serialization protocol
480             between the AnyEvent::DBI server process and your program does not support the
481             passage of closures.
482              
483             Here's an example to extend the query language in SQLite so it supports an
484             instr() function:
485              
486             $cv = AnyEvent->condvar;
487             $dbh->func (
488             q{
489             instr => 2, sub {
490             my ($string, $search) = @_;
491             return index $string, $search;
492             },
493             },
494             create_function => sub {
495             return $cv->send ($@)
496             unless $#_;
497             $cv->send (undef, @_[1,2,3]);
498             }
499             );
500              
501             my ($err,$rc,$errcode,$errstr) = $cv->recv;
502              
503             die $err if defined $err;
504             die "EVAL failed: $errstr"
505             if $errcode;
506              
507             # otherwise, we can ignore $rc and $errcode for this particular func
508              
509             =cut
510              
511             for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
512 0     0 1   eval 'sub ' . $cmd_name . '{
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
513             my $cb = pop;
514             splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
515             &_req
516             }';
517             }
518              
519             =back
520              
521             =head1 SEE ALSO
522              
523             L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
524              
525             =head1 AUTHOR AND CONTACT
526              
527             Marc Lehmann (current maintainer)
528             http://home.schmorp.de/
529              
530             Adam Rosenstein
531             http://www.redcondor.com/
532              
533             =cut
534              
535             1