File Coverage

blib/lib/AnyEvent/Pg/Pool/Multiserver.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package AnyEvent::Pg::Pool::Multiserver;
2              
3             our $VERSION = '0.2';
4              
5 1     1   796 use strict;
  1         2  
  1         42  
6 1     1   5 use warnings;
  1         2  
  1         35  
7 1     1   5 use utf8;
  1         10  
  1         8  
8 1     1   32 use v5.10;
  1         3  
  1         50  
9              
10 1     1   6 use Carp qw( croak carp );
  1         2  
  1         70  
11 1     1   1505 use AnyEvent;
  1         9956  
  1         48  
12 1     1   433 use AnyEvent::Pg::Pool;
  0            
  0            
13             use Future;
14             use Params::Validate qw( validate_with );
15              
16             use fields qw(
17             pool
18             local
19             );
20              
21             use Class::XSAccessor {
22             getters => {
23             local => 'local'
24             },
25             };
26              
27             sub new {
28             my $class = shift;
29             my $params = {@_};
30              
31             my $self = fields::new( $class );
32              
33             $params = $self->_validate_new( $params );
34              
35             my $pool = {};
36              
37             foreach my $server ( @{ $params->{servers} } ) {
38             my $dbh = AnyEvent::Pg::Pool->new(
39             $server->{conn},
40             connection_retries => 10,
41             connection_delay => 1,
42             size => 4,
43             on_error => sub { carp 'Some error'; },
44             on_transient_error => sub { carp 'Transient error'; },
45             on_connect_error => sub { carp 'Connection error'; },
46             );
47              
48             croak 'server_id must be unique' if $pool->{ $server->{id} };
49             $pool->{ $server->{id} } = { dbh => $dbh, name => $server->{name}, id => $server->{id} };
50             }
51              
52             $self->{pool} = $pool;
53             $self->{local} = $params->{local};
54              
55             return $self;
56             }
57              
58             sub _validate_new {
59             my __PACKAGE__ $self = shift;
60             my $params = shift;
61              
62             $params = validate_with(
63             params => $params,
64             spec => {
65             servers => 1,
66             local => 1,
67             },
68             );
69              
70             return $params;
71             }
72              
73             sub selectall_arrayref {
74             my __PACKAGE__ $self = shift;
75             my $params = {@_};
76              
77             $params = $self->_validate_selectall_arrayref( $params );
78              
79             my @futures = ();
80              
81             my @pool = ();
82             if ( defined $params->{server_id} ) {
83             push @pool, $params->{server_id};
84             }
85             else {
86             @pool = keys %{ $self->{pool} };
87             }
88              
89             foreach my $server_id ( @pool ) {
90             my $server = $self->{pool}{ $server_id };
91              
92             push @futures, $self->_get_future_push_query(
93             query => $params->{query},
94             args => $params->{args},
95             server => $server,
96             cb_server => $params->{cb_server},
97             type => 'selectall_arrayref_slice',
98             );
99             }
100              
101             my $main_future = Future->wait_all( @futures );
102             $main_future->on_done( sub {
103             my @results = ();
104             my @errors = ();
105              
106             foreach my $future ( @futures ) {
107             my ( $server, $result, $error ) = $future->get();
108              
109             if ( !$error ) {
110             push @results, @$result;
111             }
112             else {
113             push @errors, {
114             server_name => $server->{name},
115             server_id => $server->{id},
116             error => $error,
117             };
118             }
119             }
120              
121             $params->{cb}->(
122             scalar( @results ) ? [ @results ] : undef,
123             scalar( @errors ) ? [ @errors ] : undef,
124             );
125              
126             undef $main_future;
127             } );
128              
129             return;
130             }
131              
132             sub _validate_selectall_arrayref {
133             my __PACKAGE__ $self = shift;
134             my $params = shift;
135              
136             $params = validate_with(
137             params => $params,
138             spec => {
139             query => 1,
140             args => 0,
141             cb => 1,
142             server_id => 0,
143             cb_server => 0,
144             },
145             );
146              
147             return $params;
148             }
149              
150             sub _get_future_push_query {
151             my __PACKAGE__ $self = shift;
152             my $params = {@_};
153              
154             my $future = Future->new();
155              
156             my $watcher;
157              
158             $watcher = $params->{server}{dbh}->push_query(
159             query => $params->{query},
160             args => $params->{args},
161             on_error => sub {
162             carp shift;
163             $future->done( $params->{server}, undef, 'Push error' );
164             undef $watcher;
165             },
166             on_result => sub {
167             my $p = shift;
168             my $w = shift;
169             my $res = shift;
170              
171             if ( $res->errorMessage ) {
172             carp $res->errorMessage;
173             $future->done( $params->{server}, undef, $res->errorMessage );
174             undef $watcher;
175             return;
176             }
177              
178             my $result;
179              
180             if ( $params->{type} eq 'selectall_arrayref_slice' ) {
181             $result = _fetchall_arrayref_slice( $params->{server}{id}, $res );
182             }
183             elsif ( $params->{type} eq 'selectrow_hashref' ) {
184             $result = _fetchrow_hashref( $params->{server}{id}, $res );
185             }
186             elsif ( $params->{type} eq 'selectrow_array' ) {
187             $result = _fetchrow_array( $params->{server}{id}, $res );
188             }
189             elsif ( $params->{type} eq 'do' ) {
190             $result = _fetch_do( $params->{server}{id}, $res );
191             }
192              
193             my $cb = sub {
194             $future->done( $params->{server}, $result );
195             undef $watcher;
196             };
197              
198             if ( $params->{cb_server} ) {
199             $params->{cb_server}->( result => $result, cb => $cb );
200             }
201             else {
202             $cb->();
203             }
204             },
205             );
206              
207             return $future;
208             }
209              
210             sub _fetchall_arrayref_slice {
211             my $id = shift;
212             my $res = shift;
213              
214             my $result = [];
215              
216             if ( $res->nRows ) {
217             foreach my $row ( $res->rowsAsHashes ) {
218             $row->{_server_id} = $id;
219             push @$result, $row;
220             }
221             }
222              
223             return $result;
224             }
225              
226             sub _fetchrow_hashref {
227             my $id = shift;
228             my $res = shift;
229              
230             my $result;
231              
232             if ( $res->nRows ) {
233             $result = $res->rowAsHash(0);
234             $result->{_server_id} = $id;
235             }
236              
237             return $result;
238             }
239              
240             sub _fetchrow_array {
241             my $id = shift;
242             my $res = shift;
243              
244             my $result;
245              
246             if ( $res->nRows ) {
247             $result = [ $id, $res->row(0) ];
248             }
249              
250             return $result;
251             }
252              
253             sub _fetch_do {
254             my $id = shift;
255             my $res = shift;
256              
257             # TODO return real result
258              
259             return [ $id, 1 ];
260             }
261              
262             sub selectrow_hashref {
263             my __PACKAGE__ $self = shift;
264             my $params = {@_};
265              
266             $params = $self->_validate_selectrow_hashref( $params );
267              
268             my $future = $self->_get_future_push_query(
269             query => $params->{query},
270             args => $params->{args},
271             server => $self->{pool}{ $params->{server_id} },
272             cb_server => $params->{cb_server},
273             type => 'selectrow_hashref',
274             );
275              
276             $future->on_done( sub {
277             my ( $server, $result, $error ) = $future->get();
278              
279             if ( !$error ) {
280             $params->{cb}->( $result, undef );
281             }
282             else {
283             $params->{cb}->( undef, {
284             server_name => $server->{name},
285             server_id => $server->{id},
286             error => $error,
287             } );
288             }
289              
290             undef $future;
291             } );
292              
293             return;
294             }
295              
296             sub _validate_selectrow_hashref {
297             my __PACKAGE__ $self = shift;
298             my $params = shift;
299              
300             $params = validate_with(
301             params => $params,
302             spec => {
303             query => 1,
304             args => 0,
305             cb => 1,
306             server_id => 1,
307             cb_server => 0,
308             },
309             );
310              
311             return $params;
312             }
313              
314             sub selectrow_array {
315             my __PACKAGE__ $self = shift;
316             my $params = {@_};
317              
318             $params = $self->_validate_selectrow_array( $params );
319              
320             my $future = $self->_get_future_push_query(
321             query => $params->{query},
322             args => $params->{args},
323             server => $self->{pool}{ $params->{server_id} },
324             cb_server => $params->{cb_server},
325             type => 'selectrow_array',
326             );
327              
328             $future->on_done( sub {
329             my ( $server, $result, $error ) = $future->get();
330              
331             if ( !$error ) {
332             $params->{cb}->( $result, undef );
333             }
334             else {
335             $params->{cb}->( undef, {
336             server_name => $server->{name},
337             server_id => $server->{id},
338             error => $error,
339             } );
340             }
341              
342             undef $future;
343             } );
344              
345             return;
346             }
347              
348             sub _validate_selectrow_array {
349             my __PACKAGE__ $self = shift;
350             my $params = shift;
351              
352             $params = validate_with(
353             params => $params,
354             spec => {
355             query => 1,
356             args => 0,
357             cb => 1,
358             server_id => 1,
359             cb_server => 0,
360             },
361             );
362              
363             return $params;
364             }
365              
366             sub do {
367             my __PACKAGE__ $self = shift;
368             my $params = {@_};
369              
370             $params = $self->_validate_do( $params );
371              
372             my $future = $self->_get_future_push_query(
373             query => $params->{query},
374             args => $params->{args},
375             server => $self->{pool}{ $params->{server_id} },
376             cb_server => $params->{cb_server},
377             type => 'do',
378             );
379              
380             $future->on_done( sub {
381             my ( $server, $result, $error ) = $future->get();
382              
383             if ( !$error ) {
384             $params->{cb}->( $result, undef );
385             }
386             else {
387             $params->{cb}->( undef, {
388             server_name => $server->{name},
389             server_id => $server->{id},
390             error => $error,
391             } );
392             }
393              
394             undef $future;
395             } );
396              
397             return;
398             }
399              
400             sub _validate_do {
401             my __PACKAGE__ $self = shift;
402             my $params = shift;
403              
404             $params = validate_with(
405             params => $params,
406             spec => {
407             query => 1,
408             args => 0,
409             cb => 1,
410             server_id => 1,
411             cb_server => 0,
412             },
413             );
414              
415             return $params;
416             }
417              
418             1;
419              
420             =head1 NAME
421              
422             AnyEvent::Pg::Pool::Multiserver - Asyncronious multiserver requests to Postgresql with AnyEvent::Pg
423              
424             =head1 SYNOPSIS
425              
426             my $servers = [
427             {
428             id => 1,
429             name => 'remote 1',
430             conn => 'host=remote1 port=5432 dbname=mydb user=myuser password=mypass',
431             },
432             {
433             id => 2,
434             name => 'remote 2',
435             conn => 'host=remote2 port=5432 dbname=mydb user=myuser password=mypass',
436             },
437             ];
438             my $pool = AnyEvent::Pg::Pool::Multiserver->new( servers => $servers, local => 1 );
439              
440             # multi-server request
441              
442             $pool->selectall_arrayref(
443             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
444             args => [ 1 ],
445             cb => sub {
446             my $results = shift;
447             my $errors = shift;
448              
449             if ( $errors ) {
450             foreach my $srv ( @$errors ) {
451             say "err $srv->{error} with $srv->{server_name} $srv->{server_id}";
452             }
453             }
454              
455             if ( $results ) {
456             foreach my $val ( @$results ) {
457             say "server_id=$val->{_server_id} value=$val->{val}";
458             }
459             }
460             },
461             );
462              
463             # single-server request
464              
465             $pool->selectall_arrayref(
466             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
467             args => [ 1 ],
468             server_id => 1,
469             cb => sub { ... },
470             );
471              
472             # multi-server request with sub-callbacks to some data manipulation
473             # and may be to make another request to current server
474              
475             # main request | server_1 select -> ... select end -> cb_server call -> subrequests to current server | wait both | global callback
476             # | server_2 select -> ... select end -> cb_server call -> subrequests to current server | subrequests |
477              
478             $pool->selectall_arrayref(
479             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
480             args => [ 1 ],
481             cb => sub { ... },
482             cb_server => sub {
483             my $params = { @_ };
484              
485             my $result_of_main_request = $params->{result};
486              
487             # Now we can do some sub-request to current server
488              
489             # And MUST call cb
490             $params->{cb}->();
491             },
492             );
493              
494             # single-server request to select row in arrayref
495              
496             $pool->selectrow_array(
497             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
498             args => [ 1 ],
499             server_id => 1,
500             cb => sub {
501             my $result = shift;
502             my $error = shift;
503              
504             if ( $error ) {
505             say "err $error->{error} with $error->{server_name} $error->{server_id}";
506             }
507              
508             if ( $result ) {
509             say "server_id=$result->[ 0 ] value=$result->[ 1 ]";
510             }
511             },
512             );
513              
514             # single-server request to select row in hashref
515              
516             $pool->selectrow_hashref(
517             query => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
518             args => [ 1 ],
519             server_id => 1,
520             cb => sub {
521             my $result = shift;
522             my $error = shift;
523              
524             if ( $error ) {
525             say "err $error->{error} with $error->{server_name} $error->{server_id}";
526             }
527              
528             if ( $result ) {
529             say "server_id=$result->{_server_id} value=$result->{val}";
530             }
531             },
532             );
533              
534             # single-server request to do something
535              
536             $pool->do(
537             query => 'UPDATE table SET column = 1 WHERE id = $1;',
538             args => [ 1 ],
539             server_id => 1,
540             cb => sub {
541             my $result = shift;
542             my $error = shift;
543              
544             if ( $error ) {
545             say "err $error->{error} with $error->{server_name} $error->{server_id}";
546             }
547              
548             if ( $result ) {
549             say "server_id=$result->[ 0 ] updated=$result->[ 1 ]";
550             }
551             },
552             );
553              
554             # local-server request to do something
555              
556             $pool->do(
557             query => 'UPDATE table SET column = 1 WHERE id = $1;',
558             args => [ 1 ],
559             server_id => $pool->local(),
560             cb => sub { ... },
561             );
562              
563             =head1 DESCRIPTION
564              
565             =head2 selectall_arrayref
566              
567             query and args are the same, that in AnyEvent::Pg
568              
569             Required: query, cb
570             Optional: args, cb_server, server_id
571              
572             =head2 selectrow_array
573              
574             query and args are the same, that in AnyEvent::Pg
575              
576             Required: query, server_id, cb
577             Optional: args, cb_server
578              
579             =head2 selectrow_hashref
580              
581             query and args are the same, that in AnyEvent::Pg
582              
583             Required: query, server_id, cb
584             Optional: args, cb_server
585              
586             =head2 do
587              
588             query and args are the same, that in AnyEvent::Pg
589              
590             Required: query, server_id, cb
591             Optional: args, cb_server
592              
593             =head1 SOURCE AVAILABILITY
594              
595             The source code for this module is available
596             at L
597              
598             =head1 AUTHOR
599              
600             Andrey Kuzmin, Ekak-tus@mail.ruE
601              
602             =head1 COPYRIGHT AND LICENSE
603              
604             Copyright (C) 2014 by Andrey Kuzmin
605              
606             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
607              
608             =cut