File Coverage

blib/lib/AnyEvent/DBI.pm
Criterion Covered Total %
statement 24 149 16.1
branch 0 46 0.0
condition 0 9 0.0
subroutine 8 30 26.6
pod 10 11 90.9
total 42 245 17.1


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::DBI - asynchronous DBI access
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::DBI;
8              
9             my $cv = AnyEvent->condvar;
10              
11             my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12              
13             $dbh->exec ("select * from test where num=?", 10, sub {
14             my ($dbh, $rows, $rv) = @_;
15              
16             $#_ or die "failure: $@";
17              
18             print "@$_\n"
19             for @$rows;
20              
21             $cv->broadcast;
22             });
23              
24             # asynchronously do sth. else here
25              
26             $cv->wait;
27              
28             =head1 DESCRIPTION
29              
30             This module is an L<AnyEvent> user, you need to make sure that you use and
31             run a supported event loop.
32              
33             This module implements asynchronous DBI access by forking or executing
34             separate "DBI-Server" processes and sending them requests.
35              
36             It means that you can run DBI requests in parallel to other tasks.
37              
38             With DBD::mysql, the overhead for very simple statements
39             ("select 0") is somewhere around 50% compared to an explicit
40             prepare_cached/execute/fetchrow_arrayref/finish combination. With
41             DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42              
43             =head2 ERROR HANDLING
44              
45             This module defines a number of functions that accept a callback
46             argument. All callbacks used by this module get their AnyEvent::DBI handle
47             object passed as first argument.
48              
49             If the request was successful, then there will be more arguments,
50             otherwise there will only be the C<$dbh> argument and C<$@> contains an
51             error message.
52              
53             A convenient way to check whether an error occurred is to check C<$#_> -
54             if that is true, then the function was successful, otherwise there was an
55             error.
56              
57             =cut
58              
59             package AnyEvent::DBI;
60              
61 1     1   762 use common::sense;
  1         12  
  1         4  
62              
63 1     1   50 use Carp;
  1         2  
  1         92  
64 1     1   395 use Convert::Scalar ();
  1         380  
  1         18  
65 1     1   496 use AnyEvent::Fork ();
  1         19725  
  1         26  
66 1     1   393 use CBOR::XS ();
  1         4670  
  1         33  
67              
68 1     1   6 use AnyEvent ();
  1         2  
  1         14  
69 1     1   5 use AnyEvent::Util ();
  1         2  
  1         11  
70              
71 1     1   4 use Errno ();
  1         2  
  1         1439  
72              
73             our $VERSION = '3.02';
74              
75             =head2 METHODS
76              
77             =over 4
78              
79             =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80              
81             Returns a database handle for the given database. Each database handle
82             has an associated server process that executes statements in order. If
83             you want to run more than one statement in parallel, you need to create
84             additional database handles.
85              
86             The advantage of this approach is that transactions work as state is
87             preserved.
88              
89             Example:
90              
91             $dbh = new AnyEvent::DBI
92             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93              
94             Additional key-value pairs can be used to adjust behaviour:
95              
96             =over 4
97              
98             =item on_error => $callback->($dbh, $filename, $line, $fatal)
99              
100             When an error occurs, then this callback will be invoked. On entry, C<$@>
101             is set to the error message. C<$filename> and C<$line> is where the
102             original request was submitted.
103              
104             If the fatal argument is true then the database connection is shut down
105             and your database handle becomes invalid. In addition to invoking the
106             C<on_error> callback, all of your queued request callbacks are called
107             with only the C<$dbh> argument.
108              
109             If omitted, then C<die> will be called on any errors, fatal or not.
110              
111             =item on_connect => $callback->($dbh[, $success])
112              
113             If you supply an C<on_connect> callback, then this callback will be
114             invoked after the database connect attempt. If the connection succeeds,
115             C<$success> is true, otherwise it is missing and C<$@> contains the
116             C<$DBI::errstr>.
117              
118             Regardless of whether C<on_connect> is supplied, connect errors will result in
119             C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120             connection errors are considered fatal. The client will C<die> and the C<on_error>
121             callback will be called with C<$fatal> true.
122              
123             When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
124             will not C<die>. You still cannot, however, use the C<$dbh> object you
125             received from C<new> to make requests.
126              
127             =item fork_template => $AnyEvent::Fork-object
128              
129             C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130             slave, which in turn either C<exec>'s a new process (similar to the old
131             C<exec_server> constructor argument) or uses a process forked early (see
132             L<AnyEvent::Fork::Early>).
133              
134             With this argument you can provide your own fork template. This can be
135             useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136             memory (and speed up startup) by not having to load C<AnyEvent::DBI::Slave> again
137             and again into your child processes:
138              
139             my $template = AnyEvent::Fork
140             ->new # create new template
141             ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142              
143             for (...) {
144             $dbh = new AnyEvent::DBI ...
145             fork_template => $template;
146              
147             =item timeout => seconds
148              
149             If you supply a timeout parameter (fractional values are supported), then
150             a timer is started any time the DBI handle expects a response from the
151             server. This includes connection setup as well as requests made to the
152             backend. The timeout spans the duration from the moment the first data
153             is written (or queued to be written) until all expected responses are
154             returned, but is postponed for "timeout" seconds each time more data is
155             returned from the server. If the timer ever goes off then a fatal error is
156             generated. If you have an C<on_error> handler installed, then it will be
157             called, otherwise your program will die().
158              
159             When altering your databases with timeouts it is wise to use
160             transactions. If you quit due to timeout while performing insert, update
161             or schema-altering commands you can end up not knowing if the action was
162             submitted to the database, complicating recovery.
163              
164             Timeout errors are always fatal.
165              
166             =back
167              
168             Any additional key-value pairs will be rolled into a hash reference
169             and passed as the final argument to the C<< DBI->connect (...) >>
170             call. For example, to suppress errors on STDERR and send them instead to an
171             AnyEvent::Handle you could do:
172              
173             $dbh = new AnyEvent::DBI
174             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175             PrintError => 0,
176             on_error => sub {
177             $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178             };
179              
180             =cut
181              
182             sub new {
183 0     0 1   my ($class, $dbi, $user, $pass, %arg) = @_;
184              
185             # we use our own socketpair, so we always have a socket
186             # available, even before the forked process exsist.
187             # this is mostly done so this module is compatible
188             # to versions of itself older than 3.0.
189 0 0         my ($client, $server) = AnyEvent::Util::portable_socketpair
190             or croak "unable to create AnyEvent::DBI communications pipe: $!";
191              
192 0           my $fork = delete $arg{fork_template};
193              
194 0           my %dbi_args = %arg;
195 0           delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
196              
197 0           my $self = bless \%arg, $class;
198              
199 0           $self->{fh} = $client;
200              
201 0           my $rbuf;
202 0           my @caller = (caller)[1,2]; # the "default" caller
203              
204 0 0         $fork = $fork ? $fork->fork : AnyEvent::Fork->new
    0          
205             or croak "fork: $!";
206              
207 0           $fork->require ("AnyEvent::DBI::Slave");
208 0           $fork->send_arg ($VERSION);
209 0           $fork->send_fh ($server);
210              
211             # we don't rely on the callback, because we use our own
212             # socketpair, for better or worse.
213 0     0     $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
214              
215             {
216 0           Convert::Scalar::weaken (my $self = $self);
  0            
217              
218 0           my $cbor = new CBOR::XS;
219              
220             $self->{rw} = AE::io $client, 0, sub {
221 0     0     my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
222              
223 0 0         if ($len > 0) {
    0          
    0          
224             # we received data, so reset the timer
225 0           $self->{last_activity} = AE::now;
226              
227 0           for my $res ($cbor->incr_parse_multiple ($rbuf)) {
228 0 0         last unless $self;
229              
230 0           my $req = shift @{ $self->{queue} };
  0            
231              
232 0 0         if (defined $res->[0]) {
233 0           $res->[0] = $self;
234 0           $req->[0](@$res);
235             } else {
236 0           my $cb = shift @$req;
237 0           local $@ = $res->[1];
238 0           $cb->($self);
239 0 0         $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
240             if $self; # cb() could have deleted it
241             }
242              
243             # no more queued requests, so become idle
244 0 0 0       if ($self && !@{ $self->{queue} }) {
  0            
245 0           undef $self->{last_activity};
246 0           $self->{tw_cb}->();
247             }
248             }
249              
250             } elsif (defined $len) {
251             # todo, caller?
252 0           $self->_error ("unexpected eof", @caller, 1);
253             } elsif ($! != Errno::EAGAIN) {
254             # todo, caller?
255 0           $self->_error ("read error: $!", @caller, 1);
256             }
257 0           };
258              
259             $self->{tw_cb} = sub {
260 0 0 0 0     if ($self->{timeout} && $self->{last_activity}) {
261 0 0         if (AE::now > $self->{last_activity} + $self->{timeout}) {
262             # we did time out
263 0           my $req = $self->{queue}[0];
264 0           $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
265             } else {
266             # we need to re-set the timeout watcher
267             $self->{tw} = AE::timer
268             $self->{last_activity} + $self->{timeout} - AE::now,
269             0,
270             $self->{tw_cb},
271 0           ;
272             }
273             } else {
274             # no timeout check wanted, or idle
275 0           undef $self->{tw};
276             }
277 0           };
278              
279             $self->{ww_cb} = sub {
280 0     0     $self->{last_activity} = AE::now;
281              
282             my $len = syswrite $client, $self->{wbuf}
283 0 0         or return delete $self->{ww};
284              
285 0           substr $self->{wbuf}, 0, $len, "";
286 0           };
287             }
288              
289             $self->_req (
290             sub {
291 0 0   0     return unless $self;
292 0           $self->{child_pid} = $_[1];
293             },
294 0           (caller)[1,2],
295             "req_pid"
296             );
297              
298             $self->_req (
299             sub {
300 0 0   0     return unless $self;
301 0 0         &{ $self->{on_connect} } if $self->{on_connect};
  0            
302             },
303 0           (caller)[1,2],
304             req_open => $dbi, $user, $pass, %dbi_args
305             );
306              
307 0           $self
308             }
309              
310             sub _server_pid {
311             shift->{child_pid}
312 0     0     }
313              
314             sub kill_child {
315 0     0 0   my $self = shift;
316              
317 0 0         if (my $pid = delete $self->{child_pid}) {
318             # kill and reap process
319 0           my $kid_watcher; $kid_watcher = AE::child $pid, sub {
320 0     0     undef $kid_watcher;
321 0           };
322 0           kill TERM => $pid;
323             }
324              
325 0           delete $self->{rw};
326 0           delete $self->{ww};
327 0           delete $self->{tw};
328 0           close delete $self->{fh};
329             }
330              
331             sub DESTROY {
332 0     0     shift->kill_child;
333             }
334              
335             sub _error {
336 0     0     my ($self, $error, $filename, $line, $fatal) = @_;
337              
338 0 0         if ($fatal) {
339 0           delete $self->{tw};
340 0           delete $self->{rw};
341 0           delete $self->{ww};
342 0           delete $self->{fh};
343              
344             # for fatal errors call all enqueued callbacks with error
345 0           while (my $req = shift @{$self->{queue}}) {
  0            
346 0           local $@ = $error;
347 0           $req->[0]->($self);
348             }
349 0           $self->kill_child;
350             }
351              
352 0           local $@ = $error;
353              
354 0 0         if ($self->{on_error}) {
355 0           $self->{on_error}($self, $filename, $line, $fatal)
356             } else {
357 0           die "$error at $filename, line $line\n";
358             }
359             }
360              
361             =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
362              
363             Sets (or clears, with C<undef>) the C<on_error> handler.
364              
365             =cut
366              
367             sub on_error {
368 0     0 1   $_[0]{on_error} = $_[1];
369             }
370              
371             =item $dbh->timeout ($seconds)
372              
373             Sets (or clears, with C<undef>) the database timeout. Useful to extend the
374             timeout when you are about to make a really long query.
375              
376             =cut
377              
378             sub timeout {
379 0     0 1   my ($self, $timeout) = @_;
380              
381 0           $self->{timeout} = $timeout;
382              
383             # reschedule timer if one was running
384 0           $self->{tw_cb}->();
385             }
386              
387             sub _req {
388 0     0     my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
389              
390 0 0         unless ($self->{fh}) {
391 0           local $@ = my $err = 'no database connection';
392 0           $cb->($self);
393 0           $self->_error ($err, $filename, $line, 1);
394 0           return;
395             }
396              
397 0           push @{ $self->{queue} }, [$cb, $filename, $line];
  0            
398              
399             # re-start timeout if necessary
400 0 0 0       if ($self->{timeout} && !$self->{tw}) {
401 0           $self->{last_activity} = AE::now;
402 0           $self->{tw_cb}->();
403             }
404              
405 0           $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
406              
407 0 0         unless ($self->{ww}) {
408 0           my $len = syswrite $self->{fh}, $self->{wbuf};
409 0           substr $self->{wbuf}, 0, $len, "";
410              
411             # still any left? then install a write watcher
412             $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
413 0 0         if length $self->{wbuf};
414             }
415             }
416              
417             =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
418              
419             An accessor for the database handle attributes, such as C<AutoCommit>,
420             C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
421             (which might be C<undef>), then the given attribute will be set to that
422             value.
423              
424             The callback will be passed the database handle and the attribute's value
425             if successful.
426              
427             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
428             will be passed and C<$@> contains the error message.
429              
430             =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
431              
432             Executes the given SQL statement with placeholders replaced by
433             C<@args>. The statement will be prepared and cached on the server side, so
434             using placeholders is extremely important.
435              
436             The callback will be called with a weakened AnyEvent::DBI object as the
437             first argument and the result of C<fetchall_arrayref> (or C<undef>
438             if the statement wasn't a select statement) as the second argument.
439              
440             Third argument is the return value from the C<< DBI->execute >> method
441             call.
442              
443             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
444             will be passed and C<$@> contains the error message.
445              
446             =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
447              
448             An accessor for the statement attributes of the most recently executed
449             statement, such as C<NAME> or C<TYPE>.
450              
451             The callback will be passed the database handle and the attribute's value
452             if successful.
453              
454             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
455             will be passed and C<$@> contains the error message.
456              
457             =item $dbh->begin_work ($cb->($dbh[, $rc]))
458              
459             =item $dbh->commit ($cb->($dbh[, $rc]))
460              
461             =item $dbh->rollback ($cb->($dbh[, $rc]))
462              
463             The begin_work, commit, and rollback methods expose the equivalent
464             transaction control method of the DBI driver. On success, C<$rc> is true.
465              
466             If an error occurs and the C<on_error> callback returns, then only C<$dbh>
467             will be passed and C<$@> contains the error message.
468              
469             =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
470              
471             This gives access to database driver private methods. Because they
472             are not standard you cannot always depend on the value of C<$rc> or
473             C<$dbi_err>. Check the documentation for your specific driver/function
474             combination to see what it returns.
475              
476             Note that the first argument will be eval'ed to produce the argument list to
477             the func() method. This must be done because the serialization protocol
478             between the AnyEvent::DBI server process and your program does not support the
479             passage of closures.
480              
481             Here's an example to extend the query language in SQLite so it supports an
482             instr() function:
483              
484             $cv = AnyEvent->condvar;
485             $dbh->func (
486             q{
487             instr => 2, sub {
488             my ($string, $search) = @_;
489             return index $string, $search;
490             },
491             },
492             create_function => sub {
493             return $cv->send ($@)
494             unless $#_;
495             $cv->send (undef, @_[1,2,3]);
496             }
497             );
498              
499             my ($err,$rc,$errcode,$errstr) = $cv->recv;
500              
501             die $err if defined $err;
502             die "EVAL failed: $errstr"
503             if $errcode;
504              
505             # otherwise, we can ignore $rc and $errcode for this particular func
506              
507             =cut
508              
509             for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
510 0     0 1   eval 'sub ' . $cmd_name . '{
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0     0 1    
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
511             my $cb = pop;
512             splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
513             &_req
514             }';
515             }
516              
517             =back
518              
519             =head1 SEE ALSO
520              
521             L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
522              
523             =head1 AUTHOR AND CONTACT
524              
525             Marc Lehmann (current maintainer)
526             http://home.schmorp.de/
527              
528             Adam Rosenstein
529             http://www.redcondor.com/
530              
531             =cut
532              
533             1