File Coverage

lib/AnyEvent/DBD/Pg.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package AnyEvent::DBD::Pg;
2              
3 1     1   32132 use 5.008008; # don't use old crap without utf8
  1         4  
  1         46  
4 1     1   5 use common::sense 3;m{
  1         16  
  1         7  
5             use strict;
6             use warnings;
7             }x;
8 1     1   70 use Scalar::Util 'weaken';
  1         6  
  1         112  
9 1     1   4 use Carp;
  1         1  
  1         121  
10 1     1   2757 use DBI;
  1         20681  
  1         77  
11 1     1   603 use DBD::Pg ':async';
  0            
  0            
12             use AE 5;
13             use Time::HiRes 'time';
14              
15             =head1 NAME
16              
17             AnyEvent::DBD::Pg - AnyEvent interface to DBD::Pg's async interface
18              
19             =cut
20              
21             our $VERSION = '0.03';
22              
23             =head1 SYNOPSIS
24              
25             use AnyEvent::DBD::Pg;
26            
27             my $adb = AnyEvent::DBD::Pg->new('dbi:Pg:dbname=test', user => 'pass', {
28             pg_enable_utf8 => 1,
29             pg_server_prepare => 0,
30             quote_char => '"',
31             name_sep => ".",
32             }, debug => 1);
33            
34             $adb->queue_size( 4 );
35             $adb->debug( 1 );
36            
37             $adb->connect;
38            
39             $adb->selectcol_arrayref("select pg_sleep( 0.1 ), 1", { Columns => [ 1 ] }, sub {
40             my $rc = shift or return warn;
41             my $res = shift;
42             warn "Got <$adb->{qd}> = $rc / @{$res}";
43             $adb->selectrow_hashref("select data,* from tx limit 2", {}, sub {
44             my $rc = shift or return warn;
45             warn "Got $adb->{qd} = $rc [@_]";
46             });
47             });
48            
49             $adb->execute("update tx set data = data;",sub {
50             my $rc = shift or return warn;
51             warn "Got exec: $rc";
52             #my $st = shift;
53             #$st->finish;
54             });
55            
56             $adb->execute("select from 1",sub {
57             shift or return warn;
58             warn "Got $adb->{qd} = @_";
59             });
60            
61             $adb->selectrow_array("select pg_sleep( 0.1 ), 2", {}, sub {
62             shift or return warn;
63             warn "Got $adb->{qd} = [@_]";
64             $adb->selectrow_hashref("select * from tx limit 1", {}, sub {
65             warn "Got $adb->{qd} = [@_]";
66             $adb->execute("select * from tx", sub {
67             my $rc = shift or return warn;
68             my $st = shift;
69             while(my $row = $st->fetchrow_hashref) { warn "$row->{id}"; }
70             $st->finish;
71             exit;
72             });
73             });
74             });
75             AE::cv->recv;
76              
77             =cut
78              
79             sub new {
80             my ($pkg,$dsn,$user,$pass,$args,@args) = @_;
81             $args ||= {};
82             my $self = bless {@args},$pkg;
83             $self->{cnn} = [$dsn,$user,$pass,$args];
84             $self->{queue_size} = 2;
85             $self->{queue} = [];
86             #$self->{current};
87             #$self->connect;
88             $self->{querynum} = 0;
89             $self->{queuetime} = 0;
90             $self->{querytime} = 0;
91             $self;
92             }
93              
94             BEGIN {
95             for my $method (qw( cnn queue_size debug )) {
96             *$method = sub { @_ > 1 ? $_[0]->{$method} = $_[1] : $_[0]->{$method} }
97             }
98             }
99              
100             sub connect {
101             my $self = shift;
102             my ($dsn,$user,$pass,$args) = @{ $self->{cnn} };
103             local $args->{RaiseError} = 0;
104             local $args->{PrintError} = 0;
105            
106             # TODO: it we have opened, for ex, 1,3,5 fds, then we got 2 for f1, 4 for dbi, 6 for f2, which is wrong;
107            
108             open my $fn1, '>','/dev/null';
109             open my $fn2, '>','/dev/null';
110             open my $fn3, '>','/dev/null';
111             my $candidate = fileno($fn2);
112             my $next = fileno($fn3);
113             close $fn2;
114             close $fn3;
115             if( $self->{db} = DBI->connect($dsn,$user,$pass,$args) ) {
116             open my $fn3, '>','/dev/null';
117             if (fileno $fn3 == $next) {
118             $self->{fh} = $candidate;
119             } else {
120             die sprintf "Bad descriptor definition implementation: got too many fds: [ %d -> %d -> %d <> %d -> ? -> %d ]\n",
121             fileno($fn1), $candidate,$next, fileno($fn3);
122             }
123             warn "Connection to $dsn established\n" if $self->{debug} > 2;
124             $self->{lasttry} = undef;
125             $self->{gone} = undef;
126             return $self->{db}->ping;
127             } else {
128             $self->{gone} = time unless defined $self->{gone};
129             $self->{lasttry} = time;
130             warn "Connection to $dsn failed: ".DBI->errstr;
131             return 0;
132             }
133             }
134              
135             our %METHOD = (
136             selectrow_array => 'fetchrow_array',
137             selectrow_arrayref => 'fetchrow_arrayref',
138             selectrow_hashref => 'fetchrow_hashref',
139             selectall_arrayref => 'fetchall_arrayref',
140             selectall_hashref => 'fetchall_hashref',
141             selectcol_arrayref => sub {
142             my ($st,$args) = @_;
143             $st->fetchall_arrayref($args->{Columns});
144             },
145             execute => sub { $_[0]; }, # just pass the $st
146             );
147              
148             sub DESTROY {}
149              
150             sub _dequeue {
151             my $self = shift;
152             if ($self->{db}->{pg_async_status} == 1 ) {
153             warn "Can't dequeue, while processing query ($self->{current}[0])";
154             return;
155             }
156             #warn "Run dequeue with status=$self->{db}->{pg_async_status}";
157             return $self->{current} = undef unless @{ $self->{queue} };
158             my $next = shift @{ $self->{queue} };
159             my $at = shift @$next;
160             $self->{queuetime} += time - $at;
161             my $method = shift @$next;
162             local $self->{queuing} = 0;
163             $self->$method(@$next);
164             }
165              
166             our $AUTOLOAD;
167             sub AUTOLOAD {
168             my ($method) = $AUTOLOAD =~ /([^:]+)$/;
169             my $self = shift;
170             die sprintf qq{Can't locate autoloaded object method "%s" (%s) via package "%s" at %s line %s.\n}, $method, $AUTOLOAD, ref $self, (caller)[1,2]
171             unless exists $METHOD{$method};
172             my $fetchmethod = $METHOD{$method};
173             defined $fetchmethod or croak "Method $method not implemented yet";
174             ref (my $cb = pop) eq 'CODE' or croak "need callback";
175             if ($self->{db}->{pg_async_status} == 1 or $self->{current} ) {
176             if ( @{ $self->{queue} } >= $self->{queue_size} - 1 ) {
177             my $c = 1;
178             my $counter = ++$self->{querynum};
179             local $@ = "Query $_[0] run out of queue size $self->{queue_size}";
180             printf STDERR "\e[036;1mQ$counter\e[0m. [\e[03${c};1m%0.4fs\e[0m] < \e[03${c};1m%s\e[0m > ".("\e[031;1mQuery run out of queue size\e[0m")."\n", 0 , $_[0];
181             return $cb->();
182             } else {
183             warn "Query $_[0] pushed to queue\n" if $self->{debug} > 1;
184             push @{ $self->{queue} }, [time(), $method, @_,$cb];
185             return;
186             }
187             }
188             my $query = shift;
189             my $args = shift || {};
190             $args->{pg_async} = PG_ASYNC;
191             my $counter = ++$self->{querynum};
192             warn "prepare call <$query>( @_ ), async status = ".$self->{db}->{pg_async_status} if $self->{debug} > 2;
193             $self->{current} = [$query,@_];
194             $self->{current_start} = time();
195            
196             weaken $self;
197             $self or return;
198             my ($st,$w,$t,$check);
199             my @watchers;
200             push @watchers, sub {
201             $self and $st or warn("no self"), @watchers = (), return 1;
202             #warn "check status=$self->{db}->{pg_async_status}\n";
203             if($self->{db}->{pg_async_status} and $st->pg_ready()) {
204             undef $w;
205             local $@;
206             my $res = $st->pg_result;
207             my $run = time - $self->{current_start};
208             $self->{querytime} += $run;
209             my ($diag,$DIE);
210             if ($self->{debug}) {
211             $diag = $self->{current}[0];
212             my @bind = @{ $self->{current} };
213             shift @bind;
214             $diag =~ s{\?}{ "'".shift(@bind)."'" }sge;
215             } else {
216             $diag = $self->{current}[0];
217             }
218             if (!$res) {
219             $DIE = $self->{db}->errstr;
220             }
221             local $self->{qd} = $diag;
222             if ($self->{debug}) {
223             my $c = $run < 0.01 ? '2' : $run < 0.1 ? '3' : '1';
224             my $x = $DIE ? '1' : '6';
225             printf STDERR "\e[036;1mQ$counter\e[0m. [\e[03${c};1m%0.4fs\e[0m] < \e[03${x};1m%s\e[0m > ".($DIE ? "\e[031;1m$DIE\e[0m" : '')."\n", $run , $diag;
226             }
227             local $self->{queuing} = @{ $self->{queue} };
228             if ($res) {
229             if (ref $fetchmethod) {
230             $cb->($res, $st->$fetchmethod($args));
231             } else {
232             $cb->($res, $st->$fetchmethod);
233            
234             # my @res = $st->$fetchmethod;
235             # undef $st;
236             # $self->_dequeue();
237             # $cb->($res, @res);
238             }
239             undef $st;
240             undef $self->{current};
241             $self->_dequeue();
242             @watchers = ();
243             } else {
244             local $@ = $DIE;
245             #warn "st failed: $@";
246             $st->finish;
247             $cb->();
248             undef $st;
249             undef $self->{current};
250             @watchers = ();
251             $self->_dequeue();
252             }
253             return 1;
254             }
255             return 0;
256             #undef $w;
257             };
258             $st = $self->{db}->prepare($query,$args)
259             and $st->execute(@_)
260             or return do{
261             undef $st;
262             @watchers = ();
263            
264             local $@ = $self->{db}->errstr;
265             warn;
266             $cb->();
267            
268             $self->_dequeue;
269             };
270             # At all we don't need timers for the work, but if we have some bugs, it will help us to find them
271             push @watchers, AE::timer 1,1, $watchers[0];
272             push @watchers, AE::io $self->{fh}, 0, $watchers[0];
273             $watchers[0]() and return;
274             return;
275             }
276              
277             =head1 METHODS
278              
279             =over 4
280              
281             =item connect()
282              
283             Establish connection to database
284              
285             =item selectrow_array( $query, [\%args], $cb->( $rc, ... ))
286              
287             Execute PG_ASYNC prepare, than push result of C into callback
288              
289             =item selectrow_arrayref( $query, [\%args], $cb->( $rc, \@row ))
290              
291             Execute PG_ASYNC prepare, than push result of C into callback
292              
293             =item selectrow_hashref( $query, [\%args], $cb->( $rc, \%row ))
294              
295             Execute PG_ASYNC prepare, than push result of C into callback
296              
297             =item selectall_arrayref( $query, [\%args], $cb->( $rc, \@rows ))
298              
299             Execute PG_ASYNC prepare, than push result of C into callback
300              
301             =item selectall_hashref( $query, [\%args], $cb->( $rc, \@rows ))
302              
303             Execute PG_ASYNC prepare, than push result of C into callback
304              
305             =item selectcol_arrayref( $query, { Columns => [...], ... }, $cb->( $rc, \@rows ))
306              
307             Execute PG_ASYNC prepare, than push result of C into callback
308              
309             =item execute( $query, [\%args], $cb->( $rc, $sth ))
310              
311             Execute PG_ASYNC prepare, than invoke callback, pushing resulting sth to it.
312              
313             B: result already passed as first argument
314              
315             =back
316              
317             =head1 AUTHOR
318              
319             Mons Anderson, C<< >>
320              
321             =head1 LICENSE AND COPYRIGHT
322              
323             Copyright 2010 Mons Anderson.
324              
325             This program is free software; you can redistribute it and/or modify it
326             under the terms of either: the GNU General Public License as published
327             by the Free Software Foundation; or the Artistic License.
328              
329             =cut
330              
331             1;