File Coverage

blib/lib/Net/Async/PostgreSQL/Client.pm
Criterion Covered Total %
statement 24 107 22.4
branch 0 20 0.0
condition 0 5 0.0
subroutine 8 29 27.5
pod 8 11 72.7
total 40 172 23.2


line stmt bran cond sub pod time code
1             package Net::Async::PostgreSQL::Client;
2             {
3             $Net::Async::PostgreSQL::Client::VERSION = '0.007';
4             }
5 1     1   1714 use strict;
  1         3  
  1         45  
6 1     1   6 use warnings;
  1         2  
  1         33  
7 1     1   1010 use Protocol::PostgreSQL::Client '0.008';
  1         138430  
  1         38  
8 1     1   15 use parent qw{IO::Async::Protocol::Stream};
  1         2  
  1         7  
9 1     1   8502 use Scalar::Util ();
  1         3  
  1         30  
10              
11             =head1 NAME
12              
13             Net::Async::PostgreSQL - support for the PostgreSQL wire protocol
14              
15             =head1 VERSION
16              
17             version 0.007
18              
19             =head1 SYNOPSIS
20              
21             # Simple queries are performed similar to DBI:
22             $dbh->do(q{insert into something (x,y,z) values (1,2,3)});
23              
24             # These can also use bind variables:
25             $dbh->do(q{insert into something (x,y,z) values (?,?,?)}, undef, 1,2,3);
26              
27             # Prepared statements work the same as DBI by default
28             my $sth = $dbh->prepare(q{select * from table where name = ?});
29             $sth->bind_param(1, 'test');
30             $sth->execute;
31              
32             # ... but have async_ versions for passing handlers:
33             my $sth = $dbh->async_prepare(
34             sql => q{select * from table where name = ?},
35             on_error => sub { warn "failed" }
36             );
37             $sth->async_execute(
38             on_bind_request => sub {
39             return @param;
40             },
41             on_header => sub { ... },
42             on_row => sub { ... },
43             on_error => sub { ... },
44             on_complete => sub { ... },
45             );
46              
47             # And there's a helper method for doing regular queries:
48             $dbh->run_query(
49             sql => q{select * from something where id = ?},
50             parameters => [1],
51             on_row => sub { warn "Had " . $_[1]->{} },
52             on_error => sub { warn "Error encountered" },
53             on_complete => sub { warn "all done" }
54             );
55              
56             =head1 DESCRIPTION
57              
58             The interface is provided by L, which attempts to offer something close to
59             L but with support for event-based request handling.
60              
61             See L for more details.
62              
63             =cut
64              
65 1     1   1026 use Socket qw(SOCK_STREAM);
  1         5937  
  1         271  
66              
67             BEGIN {
68 1     1   3 foreach my $k (qw(
69             send_message
70             has_queued
71             is_authenticated
72             is_first_message
73             initial_request
74             queue
75             send_next_in_queue
76             message
77             debug
78             handle_message
79             message_length
80             simple_query
81             copy_data
82             copy_done
83             backend_state
84             active_statement
85             prepare
86             prepare_async
87             row_description
88             is_ready
89             send_copy_data
90             add_handler_for_event
91             )) {
92 1     1   11 no strict 'refs';
  1         3  
  1         67  
93 22     0   54 *{join '::', __PACKAGE__, $k} = sub { shift->pg->$k(@_) };
  22         1327  
  0            
94             }
95             }
96              
97             =head1 METHODS
98              
99             =cut
100              
101             sub new {
102 0     0 1   my $class = shift;
103 0           my %args = @_;
104              
105             # Clear any options that will cause the parent class to complain
106 0           my $loop = delete $args{loop};
107              
108             # Want the IO::Async::Protocol constructor, so SUPER is good enough for us here
109 0           my $self = $class->SUPER::new;
110             $self->pg->add_handler_for_event(send_request => $self->_capture_weakself(sub {
111 0     0     my ($self) = splice @_, 0, 2; # ignore pg object
112 0           $self->write(@_);
113 0           return 1;
114 0           }));
115 0           $self->configure(%args);
116              
117             # Automatically add to the event loop if we were passed one
118 0 0         $loop->add($self) if $loop;
119 0           return $self;
120             }
121              
122             sub sap {
123 0     0 0   my ($self, $sub) = @_;
124 0           Scalar::Util::weaken $self;
125 0     0     return sub { $self->$sub(@_); };
  0            
126             }
127              
128             =head2 configure
129              
130             Apply callbacks and other parameters, preparing state for event loop start.
131              
132             =cut
133              
134             sub configure {
135 0     0 1   my $self = shift;
136 0           my %args = @_;
137              
138             # Debug flag is used to control the copious amounts of data that we dump out when tracing
139 0 0         $self->{debug} = $args{debug} if exists $args{debug};
140              
141             # %args = $self->pg->configure(%args);
142 0           foreach (qw{host service}) {
143 0 0         $self->{$_} = delete $args{$_} if exists $args{$_};
144             }
145 0           %args = $self->pg->configure(%args);
146 0           $self->SUPER::configure(%args);
147 0           return $self;
148             }
149              
150             sub pg {
151 0     0 0   my $self = shift;
152 0 0         if(@_) {
153 0           $self->{pg} = shift;
154 0           return $self;
155             }
156 0   0       $self->{pg} ||= Protocol::PostgreSQL::Client->new;
157 0           return $self->{pg};
158             }
159              
160             =head2 on_connection_established
161              
162             Prepare and activate a new transport.
163              
164             =cut
165              
166             sub on_connection_established {
167 0     0 1   my $self = shift;
168 0           my $sock = shift;
169 0 0         my $transport = IO::Async::Stream->new(handle => $sock)
170             or die "No transport?";
171 0           $self->configure(transport => $transport);
172 0           $self->debug("Have transport " . $self->transport);
173             }
174              
175             =head2 on_starttls
176              
177             Upgrade the underlying stream to use TLS.
178              
179             =cut
180              
181             sub on_starttls {
182 0     0 1   my $self = shift;
183 0           $self->debug("Upgrading to TLS");
184              
185 0           require IO::Async::SSLStream;
186              
187             $self->SSL_upgrade(
188             on_upgraded => $self->_capture_weakself(sub {
189 0     0     my ($self) = @_;
190 0           $self->debug("TLS upgrade complete");
191              
192 0           $self->{tls_enabled} = 1;
193 0           $self->initial_request;
194             }),
195 0     0     on_error => sub { die "error @_"; }
196 0           );
197             }
198              
199             =head2 connect
200              
201             =cut
202              
203             sub connect {
204 0     0 1   my $self = shift;
205 0           my %args = @_;
206              
207 0           my $on_connected = delete $args{on_connected};
208 0 0         my $host = exists $args{host} ? delete $args{host} : $self->{host};
209             $self->pg->add_handler_for_event(password => sub {
210 0     0     my $self = shift;
211 0           $self->send_message('PasswordMessage', password => $self->{pass});
212 0           return 0; # single-shot event
213 0           });
214             $self->SUPER::connect(
215             %args,
216             service => $args{service} || $self->{service} || 5432,
217             host => $host,
218             socktype => SOCK_STREAM,
219             on_resolve_error => sub {
220 0     0     die "Resolution failed for $host";
221             },
222             on_connect_error => sub {
223 0     0     die "Could not connect to $host";
224             },
225             on_connected => $self->sap(sub {
226 0     0     my $self = shift;
227 0           my ($pg, $sock) = @_;
228 0           $self->pg->initial_request;
229 0 0         $on_connected->($self) if $on_connected;
230             })
231 0   0       );
232             }
233              
234             =head2 on_read
235              
236             Handle read requests by passing full packets back to the protocol handler.
237              
238             =cut
239              
240             sub on_read {
241 0     0 1   my $self = shift;
242 0           my ($buffref, $eof) = @_;
243 0 0         return 0 unless length($$buffref) >= 5;
244 0           my ($code, $size) = unpack('C1N1', $$buffref);
245 0 0         if(length($$buffref) >= $size+1) {
246 0           $self->pg->handle_message(substr $$buffref, 0, $size+1, '');
247 0           return 1;
248             }
249 0           return 0;
250             }
251              
252             =head2 do
253              
254              
255             =cut
256              
257             sub do {
258 0     0 1   my $self = shift;
259 0           my ($sql, $attrib, @param) = @_;
260 0           $self->simple_query($sql);
261 0           return $self;
262             }
263              
264             sub on_password {
265 0     0 0   my $self = shift;
266             }
267              
268             =head2 terminate
269              
270             Sends the Terminate message to the database server and closes the connection for a clean
271             shutdown.
272              
273             =cut
274              
275             sub terminate {
276 0     0 1   my $self = shift;
277 0 0         return unless $self->transport;
278              
279 0           my $transport = $self->transport;
280 0           Scalar::Util::weaken(my $loop = $transport->get_loop);
281             # TODO could just ->close_when_empty?
282             $transport->configure(on_outgoing_empty => $self->_capture_weakself(sub {
283 0     0     my $self = shift;
284 0           $self->close;
285 0           $loop->later($self->_capture_weakself(sub { $self->pg->invoke_event('closed'); }));
  0            
286 0           }));
287 0           $self->send_message('Terminate');
288 0           return $self;
289             }
290              
291             1;
292              
293             __END__