File Coverage

blib/lib/Clutch/Client.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Clutch::Client;
2 5     5   579077 use strict;
  5         12  
  5         168  
3 5     5   25 use warnings;
  5         9  
  5         128  
4 5     5   2335 use Clutch::Utils;
  0            
  0            
5             use IO::Select;
6             use Carp ();
7              
8             sub new {
9             my $class = shift;
10             my %args = @_ == 1 ? %{$_[0]} : @_;
11              
12             Carp::croak "Mandatory parameter 'servers'" unless $args{servers};
13              
14             my $rr = delete $args{rr} || 'Clutch::Client::RR';
15              
16             bless {
17             servers => undef,
18             timeout => 10,
19             rr => $rr->new($args{servers}),
20             %args,
21             }, $class;
22             }
23              
24             sub request_background {
25             my ($self, $function, $args) = @_;
26             $self->_request('request_background', $function, $args);
27             }
28              
29             sub request {
30             my ($self, $function, $args) = @_;
31             $self->_request('request', $function, $args);
32             }
33              
34             sub _request {
35             my ($self, $cmd_name, $function, $args) = @_;
36              
37             my $server = $self->{rr}->next;
38             my $sock = Clutch::Utils::new_client($server);
39              
40             my $msg = Clutch::Utils::make_request($cmd_name, $function, $args);
41             Clutch::Utils::write_all($sock, $msg, $self->{timeout}, $self);
42              
43             my $buf='';
44             while (1) {
45             my $rlen = Clutch::Utils::read_timeout(
46             $sock, \$buf, $MAX_REQUEST_SIZE - length($buf), length($buf), $self->{timeout}, $self
47             ) or return;
48              
49             Clutch::Utils::verify_buffer($buf) and last;
50             }
51             $sock->close();
52             return $buf ? Clutch::Utils::json->decode($buf) : undef;
53             }
54              
55             sub request_multi {
56             my ($self, $args) = @_;
57             $self->_verify_multi_args($args);
58             $self->_request_multi('request', $args);
59             }
60              
61             sub request_background_multi {
62             my ($self, $args) = @_;
63             $self->_verify_multi_args($args);
64             $self->_request_multi('request_background', $args);
65             }
66              
67             sub _verify_multi_args {
68             my ($self, $args) = @_;
69              
70             for my $arg (@$args) {
71             if ($arg->{function} eq '') {
72             Carp::croak "there is no function to the argument of multi_request";
73             }
74             }
75             }
76              
77             sub _request_multi {
78             my ($self, $cmd_name, $args) = @_;
79              
80             my $request_count = scalar(@$args);
81             my $is = IO::Select->new;
82              
83             my %sockets_map;
84             for my $i (0 .. ($request_count - 1)) {
85             my $server = $self->{rr}->next;
86             my $sock = Clutch::Utils::new_client($server);
87             $is->add($sock);
88             $sockets_map{$sock}=$i;
89              
90             my $msg = Clutch::Utils::make_request($cmd_name, $args->[$i]->{function}, ($args->[$i]->{args}||''));
91             Clutch::Utils::write_all($sock, $msg, $self->{timeout}, $self);
92             }
93              
94             my @res;
95             while ($request_count) {
96             if (my @ready = $is->can_read($self->{timeout})) {
97             for my $sock (@ready) {
98             my $buf='';
99             while (1) {
100             my $rlen = Clutch::Utils::read_timeout(
101             $sock, \$buf, $MAX_REQUEST_SIZE - length($buf), length($buf), $self->{timeout}, $self
102             ) or return;
103              
104             Clutch::Utils::verify_buffer($buf) and last;
105             }
106             my $idx = $sockets_map{$sock};
107              
108             $request_count--;
109             $is->remove($sock);
110             $sock->close();
111              
112             $res[$idx] = $buf ? Clutch::Utils::json->decode($buf) : undef;
113             }
114             }
115             }
116             wantarray ? @res : \@res;
117             }
118              
119             package
120             Clutch::Client::RR;
121              
122             sub new {
123             my ($class, $servers) = @_;
124             bless +{
125             servers => $servers,
126             }, $class;
127             }
128              
129             sub next {
130             my $self = shift;
131             push(@{$self->{servers}}, shift(@{$self->{servers}}));
132             $self->{servers}[0];
133             }
134              
135             1;
136              
137             __END__