line  
 stmt  
 bran  
 cond  
 sub  
 pod  
 time  
 code  
 
1 
 
  
 
   
 
 
 
 
 
 
 
 
 
 
 
 package Database::Async;  
 
2 
 
 
 
 
 
 
 
 
 
 
 
 
 
 # ABSTRACT: database interface for use with IO::Async  
 
3 
 
2
 
 
 
 
 
  
2
   
 
 
 
171267
 
 use strict;  
 
  
 
2
 
 
 
 
 
 
 
 
 
23
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
61
 
    
 
4 
 
2
 
 
 
 
 
  
2
   
 
 
 
10
 
 use warnings;  
 
  
 
2
 
 
 
 
 
 
 
 
 
4
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
84
 
    
 
5 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
6 
 
 
 
 
 
 
 
 
 
 
 
 
 
 our $VERSION = '0.016';  
 
7 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
8 
 
2
 
 
 
 
 
  
2
   
 
 
 
923
 
 use parent qw(Database::Async::DB IO::Async::Notifier);  
 
  
 
2
 
 
 
 
 
 
 
 
 
628
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
24
 
    
 
9 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
10 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 NAME  
 
11 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
12 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Database::Async - provides a database abstraction layer for L<IO::Async>
 
13 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
14 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 SYNOPSIS  
 
15 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
16 
 
 
 
 
 
 
 
 
 
 
 
 
 
  # Just looking up one thing?  
 
17 
 
 
 
 
 
 
 
 
 
 
 
 
 
  my ($id) = $db->query(  
 
18 
 
 
 
 
 
 
 
 
 
 
 
 
 
   q{select id from some_table where name = ?},  
 
19 
 
 
 
 
 
 
 
 
 
 
 
 
 
   bind => ['some name']  
 
20 
 
 
 
 
 
 
 
 
 
 
 
 
 
  )->single  
 
21 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # This is an example, so we want the result immediately - in  
 
22 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # real async code, you'd rarely call Future->get, but would  
 
23 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # typically use `->then` or `->on_done` instead  
 
24 
 
 
 
 
 
 
 
 
 
 
 
 
 
   ->get;  
 
25 
 
 
 
 
 
 
 
 
 
 
 
 
 
  # or, with Future::AsyncAwait, try:  
 
26 
 
 
 
 
 
 
 
 
 
 
 
 
 
  my ($id) = await $db->query(  
 
27 
 
 
 
 
 
 
 
 
 
 
 
 
 
   q{select id from some_table where name = ?},  
 
28 
 
 
 
 
 
 
 
 
 
 
 
 
 
   bind => ['some name']  
 
29 
 
 
 
 
 
 
 
 
 
 
 
 
 
  )->single;  
 
30 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
31 
 
 
 
 
 
 
 
 
 
 
 
 
 
  # Simple query  
 
32 
 
 
 
 
 
 
 
 
 
 
 
 
 
  $db->query(q{select id, some_data from some_table})  
 
33 
 
 
 
 
 
 
 
 
 
 
 
 
 
     ->row_hashrefs  
 
34 
 
 
 
 
 
 
 
 
 
 
 
 
 
     ->each(sub {  
 
35 
 
 
 
 
 
 
 
 
 
 
 
 
 
         printf "ID %d, data %s\n", $_->{id}, $_->{some_data};  
 
36 
 
 
 
 
 
 
 
 
 
 
 
 
 
     })  
 
37 
 
 
 
 
 
 
 
 
 
 
 
 
 
     # If you want to complete the full query, don't forget to call  
 
38 
 
 
 
 
 
 
 
 
 
 
 
 
 
     # ->get or ->retain here!  
 
39 
 
 
 
 
 
 
 
 
 
 
 
 
 
     ->retain;  
 
40 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
41 
 
 
 
 
 
 
 
 
 
 
 
 
 
  # Transactions  
 
42 
 
 
 
 
 
 
 
 
 
 
 
 
 
  $db->transaction(sub {  
 
43 
 
 
 
 
 
 
 
 
 
 
 
 
 
   my ($tx) = @_;  
 
44 
 
 
 
 
 
 
 
 
 
 
 
 
 
  })->commit  
 
45 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # This returns a Future, so if you want to wait for it to complete,  
 
46 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # call `->get` (throws an exception if something goes wrong)  
 
47 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # or `->await` (just waits for it to succeed or fail, but ignores  
 
48 
 
 
 
 
 
 
 
 
 
 
 
 
 
   # the result).  
 
49 
 
 
 
 
 
 
 
 
 
 
 
 
 
  ->get;  
 
50 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
51 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 DESCRIPTION  
 
52 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
53 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Database support for L<IO::Async>. This is the base API, see L<Database::Async::Engine>
 
54 
 
 
 
 
 
 
 
 
 
 
 
 
 
 and subclasses for specific database functionality.  
 
55 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
56 
 
 
 
 
 
 
 
 
 
 
 
 
 
 B<This is an early preview release>.
 
57 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
58 
 
 
 
 
 
 
 
 
 
 
 
 
 
 L<DBI> provides a basic API for interacting with a database, but this is
 
59 
 
 
 
 
 
 
 
 
 
 
 
 
 
 very low level and uses a synchronous design. See L<DBIx::Async> if you're
 
60 
 
 
 
 
 
 
 
 
 
 
 
 
 
 familiar with L<DBI> and want an interface that follows it more closely.
 
61 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
62 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Typically a database only allows a single query to run at a time.  
 
63 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Other queries will be queued.  
 
64 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
65 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Set up a pool of connections to provide better parallelism:  
 
66 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
67 
 
 
 
 
 
 
 
 
 
 
 
 
 
     my $dbh = Database::Async->new(  
 
68 
 
 
 
 
 
 
 
 
 
 
 
 
 
         uri  => 'postgresql://write@maindb/dbname?sslmode=require',  
 
69 
 
 
 
 
 
 
 
 
 
 
 
 
 
         pool => {  
 
70 
 
 
 
 
 
 
 
 
 
 
 
 
 
             max => 4,  
 
71 
 
 
 
 
 
 
 
 
 
 
 
 
 
         },  
 
72 
 
 
 
 
 
 
 
 
 
 
 
 
 
     );  
 
73 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
74 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Queries and transactions will then automatically be distributed  
 
75 
 
 
 
 
 
 
 
 
 
 
 
 
 
 among these connections. However, note that:  
 
76 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
77 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =over 4  
 
78 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
79 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * all queries within a transaction will be made on the same connection  
 
80 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
81 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * ordering guarantees are weaker: queries will be started in  
 
82 
 
 
 
 
 
 
 
 
 
 
 
 
 
 order on the next available connection  
 
83 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
84 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =back  
 
85 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
86 
 
 
 
 
 
 
 
 
 
 
 
 
 
 With a single connection, you could expect:  
 
87 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
88 
 
 
 
 
 
 
 
 
 
 
 
 
 
     Future->needs_all(  
 
89 
 
 
 
 
 
 
 
 
 
 
 
 
 
      $dbh->do(q{insert into x ...}),  
 
90 
 
 
 
 
 
 
 
 
 
 
 
 
 
      $dbh->do(q{select from x ...})  
 
91 
 
 
 
 
 
 
 
 
 
 
 
 
 
     );  
 
92 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
93 
 
 
 
 
 
 
 
 
 
 
 
 
 
 to insert the rows first, then return them in the C<select> call. B<With multiple connections, that's not guaranteed>.
 
94 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
95 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 Pool configuration  
 
96 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
97 
 
 
 
 
 
 
 
 
 
 
 
 
 
 The following parameters are currently accepted for defining the pool:  
 
98 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
99 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =over 4  
 
100 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
101 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<min> - minimum number of total connections to maintain, defaults to 0
 
102 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
103 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<max> - maximum permitted active connections, default is 1
 
104 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
105 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<ordering> - how to iterate through the available URIs, options include
 
106 
 
 
 
 
 
 
 
 
 
 
 
 
 
 C<random> and C<serial> (default, round-robin behaviour).
 
107 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
108 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<backoff> - algorithm for managing connection timeouts or failures. The default
 
109 
 
 
 
 
 
 
 
 
 
 
 
 
 
 is an exponential backoff with 10ms initial delay, 30s maximum, resetting on successful  
 
110 
 
 
 
 
 
 
 
 
 
 
 
 
 
 connection.  
 
111 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
112 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =back  
 
113 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
114 
 
 
 
 
 
 
 
 
 
 
 
 
 
 See L<Database::Async::Pool> for more details.
 
115 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
116 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 DBI  
 
117 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
118 
 
 
 
 
 
 
 
 
 
 
 
 
 
 The interface is not the same as L<DBI>, but here are some approximate equivalents for
 
119 
 
 
 
 
 
 
 
 
 
 
 
 
 
 common patterns:  
 
120 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
121 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head3 selectall_hashref  
 
122 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
123 
 
 
 
 
 
 
 
 
 
 
 
 
 
 In L<DBI>:
 
124 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
125 
 
 
 
 
 
 
 
 
 
 
 
 
 
  print $_->{id} . "\n" for  
 
126 
 
 
 
 
 
 
 
 
 
 
 
 
 
   $dbh->selectall_hashref(  
 
127 
 
 
 
 
 
 
 
 
 
 
 
 
 
    q{select * from something where id = ?},  
 
128 
 
 
 
 
 
 
 
 
 
 
 
 
 
    undef,  
 
129 
 
 
 
 
 
 
 
 
 
 
 
 
 
    $id  
 
130 
 
 
 
 
 
 
 
 
 
 
 
 
 
   )->@*;  
 
131 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
132 
 
 
 
 
 
 
 
 
 
 
 
 
 
 In L<Database::Async>:
 
133 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
134 
 
 
 
 
 
 
 
 
 
 
 
 
 
  print $_->{id} . "\n" for  
 
135 
 
 
 
 
 
 
 
 
 
 
 
 
 
   $db->query(  
 
136 
 
 
 
 
 
 
 
 
 
 
 
 
 
    q{select * from something where id = ?},  
 
137 
 
 
 
 
 
 
 
 
 
 
 
 
 
    bind => [  
 
138 
 
 
 
 
 
 
 
 
 
 
 
 
 
     $id  
 
139 
 
 
 
 
 
 
 
 
 
 
 
 
 
    ])->row_hashrefs  
 
140 
 
 
 
 
 
 
 
 
 
 
 
 
 
      ->as_arrayref  
 
141 
 
 
 
 
 
 
 
 
 
 
 
 
 
      ->@*  
 
142 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
143 
 
 
 
 
 
 
 
 
 
 
 
 
 
 In L<DBI>:
 
144 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
145 
 
 
 
 
 
 
 
 
 
 
 
 
 
  my $sth = $dbh->prepare(q{select * from something where id = ?});  
 
146 
 
 
 
 
 
 
 
 
 
 
 
 
 
  for my $id (1, 2, 3) {  
 
147 
 
 
 
 
 
 
 
 
 
 
 
 
 
   $sth->bind(0, $id, 'bigint');  
 
148 
 
 
 
 
 
 
 
 
 
 
 
 
 
   $sth->execute;  
 
149 
 
 
 
 
 
 
 
 
 
 
 
 
 
   while(my $row = $sth->fetchrow_hashref) {  
 
150 
 
 
 
 
 
 
 
 
 
 
 
 
 
    print $row->{name} . "\n";  
 
151 
 
 
 
 
 
 
 
 
 
 
 
 
 
   }  
 
152 
 
 
 
 
 
 
 
 
 
 
 
 
 
  }  
 
153 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
154 
 
 
 
 
 
 
 
 
 
 
 
 
 
 In L<Database::Async>:
 
155 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
156 
 
 
 
 
 
 
 
 
 
 
 
 
 
  my $sth = $db->prepare(q{select * from something where id = ?});  
 
157 
 
 
 
 
 
 
 
 
 
 
 
 
 
  (Future::Utils::fmap_void  {  
 
158 
 
 
 
 
 
 
 
 
 
 
 
 
 
   my ($id) = @_;  
 
159 
 
 
 
 
 
 
 
 
 
 
 
 
 
   $sth->bind(0, $id, 'bigint')  
 
160 
 
 
 
 
 
 
 
 
 
 
 
 
 
    ->then(sub { $sth->execute })  
 
161 
 
 
 
 
 
 
 
 
 
 
 
 
 
    ->then(sub {  
 
162 
 
 
 
 
 
 
 
 
 
 
 
 
 
     $sth->row_hashrefs  
 
163 
 
 
 
 
 
 
 
 
 
 
 
 
 
      ->each(sub {  
 
164 
 
 
 
 
 
 
 
 
 
 
 
 
 
       print $_->{name} . "\n";  
 
165 
 
 
 
 
 
 
 
 
 
 
 
 
 
      })->completed  
 
166 
 
 
 
 
 
 
 
 
 
 
 
 
 
    })  
 
167 
 
 
 
 
 
 
 
 
 
 
 
 
 
  } foreach => [1, 2, 3 ])->get;  
 
168 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
169 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
170 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
171 
 
2
 
 
 
 
 
  
2
   
 
 
 
9074
 
 use mro;  
 
  
 
2
 
 
 
 
 
 
 
 
 
4
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
19
 
    
 
172 
 
2
 
 
 
 
 
  
2
   
 
 
 
1036
 
 no indirect;  
 
  
 
2
 
 
 
 
 
 
 
 
 
2245
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
10
 
    
 
173 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
174 
 
2
 
 
 
 
 
  
2
   
 
 
 
613
 
 use Future::AsyncAwait;  
 
  
 
2
 
 
 
 
 
 
 
 
 
3235
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
12
 
    
 
175 
 
2
 
 
 
 
 
  
2
   
 
 
 
1141
 
 use Syntax::Keyword::Try;  
 
  
 
2
 
 
 
 
 
 
 
 
 
2121
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
12
 
    
 
176 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
177 
 
2
 
 
 
 
 
  
2
   
 
 
 
1296
 
 use URI;  
 
  
 
2
 
 
 
 
 
 
 
 
 
9998
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
80
 
    
 
178 
 
2
 
 
 
 
 
  
2
   
 
 
 
996
 
 use URI::db;  
 
  
 
2
 
 
 
 
 
 
 
 
 
24307
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
79
 
    
 
179 
 
2
 
 
 
 
 
  
2
   
 
 
 
1040
 
 use Module::Load ();  
 
  
 
2
 
 
 
 
 
 
 
 
 
2445
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
67
 
    
 
180 
 
2
 
 
 
 
 
  
2
   
 
 
 
16
 
 use Scalar::Util qw(blessed);  
 
  
 
2
 
 
 
 
 
 
 
 
 
6
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
117
 
    
 
181 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
182 
 
2
 
 
 
 
 
  
2
   
 
 
 
872
 
 use Database::Async::Engine;  
 
  
 
2
 
 
 
 
 
 
 
 
 
6
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
72
 
    
 
183 
 
2
 
 
 
 
 
  
2
   
 
 
 
871
 
 use Database::Async::Pool;  
 
  
 
2
 
 
 
 
 
 
 
 
 
8
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
83
 
    
 
184 
 
2
 
 
 
 
 
  
2
   
 
 
 
1007
 
 use Database::Async::Query;  
 
  
 
2
 
 
 
 
 
 
 
 
 
39
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
96
 
    
 
185 
 
2
 
 
 
 
 
  
2
   
 
 
 
16
 
 use Database::Async::StatementHandle;  
 
  
 
2
 
 
 
 
 
 
 
 
 
5
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
57
 
    
 
186 
 
2
 
 
 
 
 
  
2
   
 
 
 
1056
 
 use Database::Async::Transaction;  
 
  
 
2
 
 
 
 
 
 
 
 
 
6
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
81
 
    
 
187 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
188 
 
2
 
 
 
 
 
  
2
   
 
 
 
13
 
 use Log::Any qw($log);  
 
  
 
2
 
 
 
 
 
 
 
 
 
4
 
    
 
  
 
2
 
 
 
 
 
 
 
 
 
8
 
    
 
189 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
190 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 METHODS  
 
191 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
192 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
193 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
194 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 transaction  
 
195 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
196 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Resolves to a L<Future> which will yield a L<Database::Async::Transaction>
 
197 
 
 
 
 
 
 
 
 
 
 
 
 
 
 instance once ready.  
 
198 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
199 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
200 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
201 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
0
 
 async sub transaction {
     # Start a new transaction on this database.
     #
     # Any extra arguments are forwarded to the
     # Database::Async::Transaction constructor after the `database`
     # parameter. Resolves to the transaction object once its ->begin
     # step has completed.
     my ($self, @args) = @_;

     my $txn = Database::Async::Transaction->new(
         database => $self,
         @args
     );

     # Track the transaction in our registry, but only weakly: the caller
     # owns the transaction, and we must not keep it alive ourselves.
     my $registry = $self->{transactions} //= [];
     push @$registry, $txn;
     Scalar::Util::weaken($registry->[-1]);

     await $txn->begin;
     return $txn;
 }
 
213 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
214 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 txn  
 
215 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
216 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Executes code within a transaction. This is meant as a shorter form of  
 
217 
 
 
 
 
 
 
 
 
 
 
 
 
 
 the common idiom  
 
218 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
219 
 
 
 
 
 
 
 
 
 
 
 
 
 
  $db->transaction  
 
220 
 
 
 
 
 
 
 
 
 
 
 
 
 
     ->then(sub {  
 
221 
 
 
 
 
 
 
 
 
 
 
 
 
 
      my ($txn) = @_;  
 
222 
 
 
 
 
 
 
 
 
 
 
 
 
 
      Future->call($code)  
 
223 
 
 
 
 
 
 
 
 
 
 
 
 
 
       ->then(sub {  
 
224 
 
 
 
 
 
 
 
 
 
 
 
 
 
        $txn->commit  
 
225 
 
 
 
 
 
 
 
 
 
 
 
 
 
       })->on_fail(sub {  
 
226 
 
 
 
 
 
 
 
 
 
 
 
 
 
        $txn->rollback  
 
227 
 
 
 
 
 
 
 
 
 
 
 
 
 
       });  
 
228 
 
 
 
 
 
 
 
 
 
 
 
 
 
     })  
 
229 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
230 
 
 
 
 
 
 
 
 
 
 
 
 
 
 The code must return a L<Future>, and the transaction will only be committed
 
231 
 
 
 
 
 
 
 
 
 
 
 
 
 
 if that L<Future> resolves cleanly.
 
232 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
233 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Returns a L<Future> which resolves once the transaction is committed.
 
234 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
235 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
236 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
237 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
0
 
 async sub txn {
     # Run $code inside a freshly-started transaction.
     #
     # $code is invoked via Future->call with the transaction followed by
     # any extra @args. If it resolves, the transaction is committed
     # and the values it produced are returned. On any failure we attempt a
     # rollback and then rethrow the original exception.
     my ($self, $code, @args) = @_;
     my $txn = await $self->transaction;
     try {
         my @result = await Future->call($code, $txn, @args);
         await $txn->commit;
         return @result;
     } catch {
         # Capture the failure first: the nested try below would clobber $@.
         my $failure = $@;
         try {
             await $txn->rollback;
         } catch {
             # A failed rollback is only logged; the original error is
             # what gets propagated.
             $log->warnf("exception %s in rollback", $@);
         }
         die $failure;
     }
 }
 
256 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
257 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 METHODS - Internal  
 
258 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
259 
 
 
 
 
 
 
 
 
 
 
 
 
 
 You're welcome to call these, but they're mostly intended  
 
260 
 
 
 
 
 
 
 
 
 
 
 
 
 
 for internal usage, and the API B<may> change in future versions.
 
261 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
262 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
263 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
264 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 uri  
 
265 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
266 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Returns the configured L<URI> for populating database instances.
 
267 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
268 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
269 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
270 
 
3
 
 
 
 
 
  
3
   
 
  
1
   
 
96
 
 sub uri {
     # Accessor for the `uri` slot, which is populated by configure().
     my ($self) = @_;
     return $self->{uri};
 }
 
271 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
272 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 pool  
 
273 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
274 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Returns the L<Database::Async::Pool> instance.
 
275 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
276 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
277 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
278 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub pool {
     # Return the connection pool, building one from pool_args() the first
     # time it is requested.
     my ($self) = @_;
     return $self->{pool} if defined $self->{pool};
     return $self->{pool} = Database::Async::Pool->new(
         $self->pool_args
     );
 }
 
284 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
285 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 pool_args  
 
286 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
287 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Returns a list of standard pool constructor arguments.  
 
288 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
289 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
290 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
291 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub pool_args {
     # Build the default constructor arguments for Database::Async::Pool:
     #
     #   request_engine - callback that asks this database for a new engine
     #   uri            - the configured URI
     #
     # The callback must not keep this object alive, so it closes over a
     # weakened copy of $self and returns nothing once the object has
     # gone. This is done with Scalar::Util directly rather than through
     # curry::weak, because this file never loads the `curry` module itself.
     my ($self) = @_;

     my $weak_self = $self;
     Scalar::Util::weaken($weak_self) if Scalar::Util::blessed($weak_self);
     my $request_engine = sub {
         return unless $weak_self;
         $weak_self->request_engine(@_);
     };

     return (
         request_engine => $request_engine,
         uri            => $self->uri,
     );
 }
 
298 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
299 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 configure  
 
300 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
301 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Applies configuration, see L<IO::Async::Notifier> for details.
 
302 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
303 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Supports the following named parameters:  
 
304 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
305 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =over 4  
 
306 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
307 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<uri> - the endpoint to use when connecting a new engine instance
 
308 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
309 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<engine> - the parameters to pass when instantiating a new L<Database::Async::Engine>
 
310 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
311 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<pool> - parameters for setting up the pool, or a L<Database::Async::Pool> instance
 
312 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
313 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =item * C<encoding> - default encoding to apply to parameters, queries and results, defaults to C<binary>
 
314 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
315 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =back  
 
316 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
317 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
318 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
319 
 
 
 
 
 
 
 
 
 
 
 
 
 
 # Aliases accepted for the `encoding` parameter, keyed by lowercased name.
 # Anything not listed here is stored as given.
 my %encoding_map = (
     'utf8'    => 'UTF-8',
     'utf-8'   => 'UTF-8',
     'unicode' => 'UTF-8',
 );

 sub configure {
     # Apply named configuration parameters to this instance.
     #
     # Recognised keys are removed from %args and stored on the object:
     #
     #   encoding - stored as `encoding`, after alias normalisation
     #   uri      - stringified and wrapped in a URI object
     #   engine   - stored as `engine_parameters`
     #   type     - stored as `type`
     #   pool     - a blessed object is used as-is; a hashref is merged
     #              over pool_args() to construct a Database::Async::Pool
     #
     # Whatever remains is handed to the next configure() in the MRO.
     my ($self, %args) = @_;

     if(my $encoding = delete $args{encoding}) {
         # Alias lookup is case-insensitive, so 'UTF8', 'Utf-8' and
         # 'Unicode' all normalise the same way.
         $self->{encoding} = $encoding_map{lc $encoding} // $encoding;
     }

     if(my $uri = delete $args{uri}) {
         # This could be any type of object. We make
         # the assumption here that it safely serialises
         # to a standard URI. Some of the database
         # engines provide such a standard (e.g. PostgreSQL).
         # Others may not...
         $self->{uri} = URI->new("$uri");
     }

     $self->{engine_parameters} = delete $args{engine}
         if exists $args{engine};
     $self->{type} = delete $args{type}
         if exists $args{type};

     if(my $pool = delete $args{pool}) {
         if(blessed($pool)) {
             $self->{pool} = $pool;
         } else {
             # Fail with a clear message instead of a bare dereference
             # error when given something that is not a hashref.
             die 'pool parameter must be a hashref or a Database::Async::Pool instance'
                 unless ref($pool) eq 'HASH';
             $self->{pool} = Database::Async::Pool->new(
                 $self->pool_args,
                 %$pool,
             );
         }
     }

     return $self->next::method(%args);
 }
 
359 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
360 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 sub encoding {
     # Accessor for the `encoding` slot set by configure().
     my ($self) = @_;
     return $self->{encoding};
 }
 
361 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
362 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 ryu  
 
363 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
364 
 
 
 
 
 
 
 
 
 
 
 
 
 
 A L<Ryu::Async> instance, used for requesting sources, sinks and timers.
 
365 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
366 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
367 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
368 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub ryu {
     # Return the Ryu::Async helper, creating it and attaching it as a
     # child of this object on first use.
     my ($self) = @_;
     $self->{ryu} //= do {
         # Nothing in this file loads Ryu::Async, so load it here on
         # demand rather than relying on another module having done so.
         Module::Load::load('Ryu::Async') unless 'Ryu::Async'->can('new');
         $self->add_child(
             my $ryu = Ryu::Async->new
         );
         $ryu
     }
 }
 
377 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
378 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 new_source  
 
379 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
380 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Instantiates a new L<Ryu::Source>.
 
381 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
382 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
383 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
384 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 sub new_source {
     # Ask the Ryu helper for a new source.
     my ($self) = @_;
     return $self->ryu->source;
 }
 
385 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
386 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 new_sink  
 
387 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
388 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Instantiates a new L<Ryu::Sink>.
 
389 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
390 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
391 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
392 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 sub new_sink {
     # Ask the Ryu helper for a new sink.
     my ($self) = @_;
     return $self->ryu->sink;
 }
 
393 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
394 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 new_future  
 
395 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
396 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Instantiates a new L<Future>.
 
397 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
398 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
399 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
400 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 sub new_future {
     # Ask our event loop for a new future.
     my ($self) = @_;
     return $self->loop->new_future;
 }
 
401 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
402 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head1 METHODS - Internal, engine-related  
 
403 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
404 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
405 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
406 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 request_engine  
 
407 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
408 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Attempts to instantiate and connect to a new L<Database::Async::Engine>
 
409 
 
 
 
 
 
 
 
 
 
 
 
 
 
 subclass. Returns a L<Future> which should resolve to a new
 
410 
 
 
 
 
 
 
 
 
 
 
 
 
 
 L<Database::Async::Engine> instance when ready to use.
 
411 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
412 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
413 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
414 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 async sub request_engine {
     # Create a new engine via engine_instance() and resolve to whatever
     # its ->connect future yields.
     my $self = shift;

     $log->tracef('Requesting new engine');
     my $fresh_engine = $self->engine_instance;

     $log->tracef('Connecting');
     return await $fresh_engine->connect;
 }
 
421 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
422 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 engine_instance  
 
423 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
424 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Loads the appropriate engine class and attaches to the loop.  
 
425 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
426 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
427 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
428 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub engine_instance {
     # Construct a new engine object and attach it as a child.
     #
     # The engine class is found by looking up the database type in
     # %Database::Async::Engine::ENGINE_MAP. The type is the explicit
     # `type` setting when present, otherwise the scheme of the
     # configured URI. The class is loaded if it does not already provide
     # `new`.
     #
     # Dies if the type is unknown or cannot be determined, or if an
     # encoding is configured but the engine class has no `encoding` method.
     #
     # Returns the new engine instance.
     my ($self) = @_;
     my $uri = $self->uri;

     # The URI is optional (see the defined() check below), so only ask
     # it for a scheme when it actually exists.
     my $type = $self->{type} // (defined($uri) ? $uri->scheme : undef);
     my $engine_class = defined($type)
         ? $Database::Async::Engine::ENGINE_MAP{$type}
         : undef;
     die 'unknown database type ' . ($type // '(undefined)')
         unless $engine_class;

     Module::Load::load($engine_class) unless $engine_class->can('new');
     $log->tracef('Instantiating new %s', $engine_class);

     # Later keys win, so `uri` and `db` always override anything supplied
     # through the user's engine parameters.
     my %param = (
         %{$self->{engine_parameters} || {}},
         (defined($uri) ? (uri => $uri) : ()),
         db => $self,
     );

     # Only recent engine versions support this parameter
     if(my $encoding = $self->encoding) {
         # If we're given this parameter, let's not ignore it silently
         die 'Database engine ' . $engine_class . ' does not support encoding parameter, try upgrading that module from CPAN or remove the encoding configuration in Database::Async'
             unless $engine_class->can('encoding');
         $param{encoding} = $encoding;
     }

     my $engine = $engine_class->new(%param);
     $self->add_child($engine);
     return $engine;
 }
 
457 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
458 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 engine_ready  
 
459 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
460 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Called by L<Database::Async::Engine> instances when the engine is
 
461 
 
 
 
 
 
 
 
 
 
 
 
 
 
 ready for queries.  
 
462 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
463 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
464 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
465 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub engine_ready {
     # Hand an engine to the pool's ready queue.
     my ($self, $engine) = @_;
     my $pool = $self->pool;
     return $pool->queue_ready_engine($engine);
 }
 
469 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
470 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub engine_disconnected {
     # Tell the pool to unregister an engine.
     my ($self, $engine) = @_;
     my $pool = $self->pool;
     return $pool->unregister_engine($engine);
 }
 
474 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
475 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
0
   
 
 
 
 sub db {
     # Returns the invocant itself.
     my ($self) = @_;
     return $self;
 }
 
476 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
477 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =head2 queue_query  
 
478 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
479 
 
 
 
 
 
 
 
 
 
 
 
 
 
 Assign the given query to the next available engine instance.  
 
480 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
481 
 
 
 
 
 
 
 
 
 
 
 
 
 
 =cut  
 
482 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
483 
 
  
0
   
 
 
 
 
 
  
0
   
 
  
1
   
 
 
 
 async sub queue_query {
     # Wait for the pool to provide an engine, then pass the query to
     # that engine's handle_query and resolve to its result.
     my $self  = shift;
     my $query = shift;

     $log->tracef('Queuing query %s', $query);
     my $next_engine = await $self->pool->next_engine;

     $log->tracef('Query %s about to run on %s', $query, $next_engine);
     return await $next_engine->handle_query($query);
 }
 
490 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
491 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub diagnostics {
     # Placeholder: only unpacks its invocant and does nothing else.
     # NOTE(review): there is no explicit return, so the result is
     # whatever the list assignment below evaluates to - presumably
     # unintentional; confirm before relying on it.
     my ($self) = @_;
 }
 
494 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
495 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub notification {
     # Log a notification from an engine and emit its payload on the
     # source registered for that channel.
     my ($self, $engine, $channel, $data) = @_;
     $log->tracef('Database notifies us via %s of %s', $channel, $data);
     my $source = $self->notification_source($channel);
     return $source->emit($data);
 }
 
500 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
501 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sub notification_source {
     # Return the source for the named notification channel, creating
     # and caching a new one via new_source() on first request.
     my ($self, $name) = @_;
     my $sources = $self->{notification_source} //= {};
     $sources->{$name} = $self->new_source
         unless defined $sources->{$name};
     return $sources->{$name};
 }
 
505 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
506 
 
 
 
 
 
 
 
 
 
 
 
 
 
 1;  
 
507 
 
 
 
 
 
 
 
 
 
 
 
 
 
    
 
508 
 
 
 
 
 
 
 
 
 
 
 
 
 
 __END__