File Coverage

blib/lib/AnyEvent/Pg/Pool/Multiserver.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package AnyEvent::Pg::Pool::Multiserver;
2              
3             our $VERSION = '0.4';
4              
5 1     1   408 use strict;
  1         1  
  1         26  
6 1     1   4 use warnings;
  1         0  
  1         19  
7 1     1   3 use utf8;
  1         6  
  1         4  
8 1     1   17 use v5.10;
  1         2  
  1         28  
9              
10 1     1   3 use Carp qw( croak carp );
  1         1  
  1         44  
11 1     1   784 use AnyEvent;
  1         3896  
  1         37  
12 1     1   241 use AnyEvent::Pg::Pool;
  0            
  0            
13             use Future;
14             use Params::Validate qw( validate_with );
15              
16             use fields qw(
17             pool
18             local
19             );
20              
21             use Class::XSAccessor {
22             getters => {
23             local => 'local'
24             },
25             };
26              
27             sub new {
28             my $class = shift;
29             my $params = {@_};
30              
31             my $self = fields::new( $class );
32              
33             $params = $self->_validate_new( $params );
34              
35             my $pool = {};
36              
37             foreach my $server ( @{ $params->{servers} } ) {
38             my $dbh = AnyEvent::Pg::Pool->new(
39             $server->{conn},
40             connection_retries => 10,
41             connection_delay => 1,
42             size => 4,
43             on_error => sub { carp 'Some error'; },
44             on_transient_error => sub { carp 'Transient error'; },
45             on_connect_error => sub { carp 'Connection error'; },
46             );
47              
48             croak 'server_id must be unique' if $pool->{ $server->{id} };
49             $pool->{ $server->{id} } = { dbh => $dbh, name => $server->{name}, id => $server->{id} };
50             }
51              
52             $self->{pool} = $pool;
53             $self->{local} = $params->{local};
54              
55             return $self;
56             }
57              
58             sub _validate_new {
59             my __PACKAGE__ $self = shift;
60             my $params = shift;
61              
62             $params = validate_with(
63             params => $params,
64             spec => {
65             servers => 1,
66             local => 1,
67             },
68             );
69              
70             return $params;
71             }
72              
73             sub selectall_arrayref {
74             my __PACKAGE__ $self = shift;
75             my $params = {@_};
76              
77             $params = $self->_validate_selectall_arrayref( $params );
78              
79             my @futures = ();
80              
81             my @pool = ();
82             if ( defined $params->{server_id} ) {
83             push @pool, $params->{server_id};
84             }
85             else {
86             @pool = keys %{ $self->{pool} };
87             }
88              
89             foreach my $server_id ( @pool ) {
90             my $server = $self->{pool}{ $server_id };
91              
92             push @futures, $self->_get_future_push_query(
93             query => $params->{query},
94             args => $params->{args},
95             server => $server,
96             cb_server => $params->{cb_server},
97             type => 'selectall_arrayref_slice',
98             );
99             }
100              
101             my $main_future = Future->wait_all( @futures );
102             $main_future->on_done( sub {
103             my @results = ();
104             my @errors = ();
105              
106             foreach my $future ( @futures ) {
107             my ( $server, $result, $error ) = $future->get();
108              
109             if ( !$error ) {
110             push @results, @$result;
111             }
112             else {
113             push @errors, {
114             server_name => $server->{name},
115             server_id => $server->{id},
116             error => $error,
117             };
118             }
119             }
120              
121             $params->{cb}->(
122             scalar( @results ) ? [ @results ] : undef,
123             scalar( @errors ) ? [ @errors ] : undef,
124             );
125              
126             undef $main_future;
127             } );
128              
129             return;
130             }
131              
132             sub _validate_selectall_arrayref {
133             my __PACKAGE__ $self = shift;
134             my $params = shift;
135              
136             $params = validate_with(
137             params => $params,
138             spec => {
139             query => 1,
140             args => 0,
141             cb => 1,
142             server_id => 0,
143             cb_server => 0,
144             },
145             );
146              
147             return $params;
148             }
149              
150             sub _get_future_push_query {
151             my __PACKAGE__ $self = shift;
152             my $params = {@_};
153              
154             my $future = Future->new();
155              
156             my $watcher;
157              
158             $watcher = $params->{server}{dbh}->push_query(
159             query => $params->{query},
160             args => $params->{args},
161             on_error => sub {
162             carp shift;
163             $future->done( $params->{server}, undef, 'Push error' );
164             undef $watcher;
165             },
166             on_result => sub {
167             my $p = shift;
168             my $w = shift;
169             my $res = shift;
170              
171             if ( $res->errorMessage ) {
172             carp $res->errorMessage;
173             $future->done( $params->{server}, undef, $res->errorMessage );
174             undef $watcher;
175             return;
176             }
177              
178             my $result;
179              
180             if ( $params->{type} eq 'selectall_arrayref_slice' ) {
181             $result = _fetchall_arrayref_slice( $params->{server}{id}, $res );
182             }
183             elsif ( $params->{type} eq 'selectrow_hashref' ) {
184             $result = _fetchrow_hashref( $params->{server}{id}, $res );
185             }
186             elsif ( $params->{type} eq 'selectrow_array' ) {
187             $result = _fetchrow_array( $params->{server}{id}, $res );
188             }
189             elsif ( $params->{type} eq 'do' ) {
190             $result = _fetch_do( $params->{server}{id}, $res );
191             }
192              
193             my $cb = sub {
194             $future->done( $params->{server}, $result );
195             undef $watcher;
196             };
197              
198             if ( $params->{cb_server} ) {
199             $params->{cb_server}->( result => $result, cb => $cb );
200             }
201             else {
202             $cb->();
203             }
204             },
205             );
206              
207             return $future;
208             }
209              
210             sub _fetchall_arrayref_slice {
211             my $id = shift;
212             my $res = shift;
213              
214             my $result = [];
215              
216             if ( $res->nRows ) {
217             foreach my $row ( $res->rowsAsHashes ) {
218             $row->{_server_id} = $id;
219             push @$result, $row;
220             }
221             }
222              
223             return $result;
224             }
225              
226             sub _fetchrow_hashref {
227             my $id = shift;
228             my $res = shift;
229              
230             my $result;
231              
232             if ( $res->nRows ) {
233             $result = $res->rowAsHash(0);
234             $result->{_server_id} = $id;
235             }
236              
237             return $result;
238             }
239              
240             sub _fetchrow_array {
241             my $id = shift;
242             my $res = shift;
243              
244             my $result;
245              
246             if ( $res->nRows ) {
247             $result = [ $id, $res->row(0) ];
248             }
249              
250             return $result;
251             }
252              
253             sub _fetch_do {
254             my $id = shift;
255             my $res = shift;
256              
257             my $result;
258              
259             if ( $res->cmdRows ) {
260             $result = [ $id, $res->cmdRows ];
261             }
262              
263             return $result;
264             }
265              
266             sub selectrow_hashref {
267             my __PACKAGE__ $self = shift;
268             my $params = {@_};
269              
270             $params = $self->_validate_selectrow_hashref( $params );
271              
272             my $future = $self->_get_future_push_query(
273             query => $params->{query},
274             args => $params->{args},
275             server => $self->{pool}{ $params->{server_id} },
276             cb_server => $params->{cb_server},
277             type => 'selectrow_hashref',
278             );
279              
280             $future->on_done( sub {
281             my ( $server, $result, $error ) = $future->get();
282              
283             if ( !$error ) {
284             $params->{cb}->( $result, undef );
285             }
286             else {
287             $params->{cb}->( undef, {
288             server_name => $server->{name},
289             server_id => $server->{id},
290             error => $error,
291             } );
292             }
293              
294             undef $future;
295             } );
296              
297             return;
298             }
299              
300             sub _validate_selectrow_hashref {
301             my __PACKAGE__ $self = shift;
302             my $params = shift;
303              
304             $params = validate_with(
305             params => $params,
306             spec => {
307             query => 1,
308             args => 0,
309             cb => 1,
310             server_id => 1,
311             cb_server => 0,
312             },
313             );
314              
315             return $params;
316             }
317              
318             sub selectrow_array {
319             my __PACKAGE__ $self = shift;
320             my $params = {@_};
321              
322             $params = $self->_validate_selectrow_array( $params );
323              
324             my $future = $self->_get_future_push_query(
325             query => $params->{query},
326             args => $params->{args},
327             server => $self->{pool}{ $params->{server_id} },
328             cb_server => $params->{cb_server},
329             type => 'selectrow_array',
330             );
331              
332             $future->on_done( sub {
333             my ( $server, $result, $error ) = $future->get();
334              
335             if ( !$error ) {
336             $params->{cb}->( $result, undef );
337             }
338             else {
339             $params->{cb}->( undef, {
340             server_name => $server->{name},
341             server_id => $server->{id},
342             error => $error,
343             } );
344             }
345              
346             undef $future;
347             } );
348              
349             return;
350             }
351              
352             sub _validate_selectrow_array {
353             my __PACKAGE__ $self = shift;
354             my $params = shift;
355              
356             $params = validate_with(
357             params => $params,
358             spec => {
359             query => 1,
360             args => 0,
361             cb => 1,
362             server_id => 1,
363             cb_server => 0,
364             },
365             );
366              
367             return $params;
368             }
369              
370             sub do {
371             my __PACKAGE__ $self = shift;
372             my $params = {@_};
373              
374             $params = $self->_validate_do( $params );
375              
376             my $future = $self->_get_future_push_query(
377             query => $params->{query},
378             args => $params->{args},
379             server => $self->{pool}{ $params->{server_id} },
380             cb_server => $params->{cb_server},
381             type => 'do',
382             );
383              
384             $future->on_done( sub {
385             my ( $server, $result, $error ) = $future->get();
386              
387             if ( !$error ) {
388             $params->{cb}->( $result, undef );
389             }
390             else {
391             $params->{cb}->( undef, {
392             server_name => $server->{name},
393             server_id => $server->{id},
394             error => $error,
395             } );
396             }
397              
398             undef $future;
399             } );
400              
401             return;
402             }
403              
404             sub _validate_do {
405             my __PACKAGE__ $self = shift;
406             my $params = shift;
407              
408             $params = validate_with(
409             params => $params,
410             spec => {
411             query => 1,
412             args => 0,
413             cb => 1,
414             server_id => 1,
415             cb_server => 0,
416             },
417             );
418              
419             return $params;
420             }
421              
422             1;
423              
424             =head1 NAME
425              
426             AnyEvent::Pg::Pool::Multiserver - Asyncronious multiserver requests to Postgresql with AnyEvent::Pg
427              
428             =head1 SYNOPSIS
429              
430             my $servers = [
431             {
432             id => 1,
433             name => 'remote 1',
434             conn => 'host=remote1 port=5432 dbname=mydb user=myuser password=mypass',
435             },
436             {
437             id => 2,
438             name => 'remote 2',
439             conn => 'host=remote2 port=5432 dbname=mydb user=myuser password=mypass',
440             },
441             ];
442             my $pool = AnyEvent::Pg::Pool::Multiserver->new( servers => $servers, local => 1 );
443              
444             # multi-server request
445              
446             $pool->selectall_arrayref(
447             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
448             args => [ 1 ],
449             cb => sub {
450             my $results = shift;
451             my $errors = shift;
452              
453             if ( $errors ) {
454             foreach my $srv ( @$errors ) {
455             say "err $srv->{error} with $srv->{server_name} $srv->{server_id}";
456             }
457             }
458              
459             if ( $results ) {
460             foreach my $val ( @$results ) {
461             say "server_id=$val->{_server_id} value=$val->{val}";
462             }
463             }
464             },
465             );
466              
467             # single-server request
468              
469             $pool->selectall_arrayref(
470             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
471             args => [ 1 ],
472             server_id => 1,
473             cb => sub { ... },
474             );
475              
476             # multi-server request with sub-callbacks to some data manipulation
477             # and may be to make another request to current server
478              
479             # main request | server_1 select -> ... select end -> cb_server call -> subrequests to current server | wait both | global callback
480             # | server_2 select -> ... select end -> cb_server call -> subrequests to current server | subrequests |
481              
482             $pool->selectall_arrayref(
483             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
484             args => [ 1 ],
485             cb => sub { ... },
486             cb_server => sub {
487             my $params = { @_ };
488              
489             my $result_of_main_request = $params->{result};
490              
491             # Now we can do some sub-request to current server
492              
493             # And MUST call cb
494             $params->{cb}->();
495             },
496             );
497              
498             # single-server request to select row in arrayref
499              
500             $pool->selectrow_array(
501             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
502             args => [ 1 ],
503             server_id => 1,
504             cb => sub {
505             my $result = shift;
506             my $error = shift;
507              
508             if ( $error ) {
509             say "err $error->{error} with $error->{server_name} $error->{server_id}";
510             }
511              
512             if ( $result ) {
513             say "server_id=$result->[ 0 ] value=$result->[ 1 ]";
514             }
515             },
516             );
517              
518             # single-server request to select row in hashref
519              
520             $pool->selectrow_hashref(
521             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
522             args => [ 1 ],
523             server_id => 1,
524             cb => sub {
525             my $result = shift;
526             my $error = shift;
527              
528             if ( $error ) {
529             say "err $error->{error} with $error->{server_name} $error->{server_id}";
530             }
531              
532             if ( $result ) {
533             say "server_id=$result->{_server_id} value=$result->{val}";
534             }
535             },
536             );
537              
538             # single-server request to do something
539              
540             $pool->do(
541             query => 'UPDATE table SET column = 1 WHERE id = $1;',
542             args => [ 1 ],
543             server_id => 1,
544             cb => sub {
545             my $result = shift;
546             my $error = shift;
547              
548             if ( $error ) {
549             say "err $error->{error} with $error->{server_name} $error->{server_id}";
550             }
551              
552             if ( $result ) {
553             say "server_id=$result->[ 0 ] updated=$result->[ 1 ]";
554             }
555             },
556             );
557              
558             # local-server request to do something
559              
560             $pool->do(
561             query => 'UPDATE table SET column = 1 WHERE id = $1;',
562             args => [ 1 ],
563             server_id => $pool->local(),
564             cb => sub { ... },
565             );
566              
567             =head1 DESCRIPTION
568              
569             =head2 selectall_arrayref
570              
571             query and args are the same, that in AnyEvent::Pg
572              
573             Required: query, cb
574             Optional: args, cb_server, server_id
575              
576             =head2 selectrow_array
577              
578             query and args are the same, that in AnyEvent::Pg
579              
580             Required: query, server_id, cb
581             Optional: args, cb_server
582              
583             =head2 selectrow_hashref
584              
585             query and args are the same, that in AnyEvent::Pg
586              
587             Required: query, server_id, cb
588             Optional: args, cb_server
589              
590             =head2 do
591              
592             query and args are the same, that in AnyEvent::Pg
593              
594             Required: query, server_id, cb
595             Optional: args, cb_server
596              
597             =head1 SOURCE AVAILABILITY
598              
599             The source code for this module is available
600             at L
601              
602             =head1 AUTHOR
603              
604             Andrey Kuzmin, Ekak-tus@mail.ruE
605              
606             =head1 COPYRIGHT AND LICENSE
607              
608             Copyright (C) 2014 by Andrey Kuzmin
609              
610             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
611              
612             =cut