File Coverage

blib/lib/Gearman/Objects.pm
Criterion Covered Total %
statement 103 109 94.5
branch 36 46 78.2
condition 4 7 57.1
subroutine 23 24 95.8
pod 6 8 75.0
total 172 194 88.6


line stmt bran cond sub pod time code
1             package Gearman::Objects;
2 18     18   1724 use version ();
  18         2314  
  18         729  
3             $Gearman::Objects::VERSION = version->declare("2.002.001"); #TRIAL
4              
5 18     18   75 use strict;
  18         24  
  18         357  
6 18     18   72 use warnings;
  18         20  
  18         500  
7              
8             =head1 NAME
9              
10             Gearman::Objects - a parent class for L and L
11              
12             =head1 METHODS
13              
14             =cut
15              
16 18     18   71 use constant DEFAULT_PORT => 4730;
  18         23  
  18         1326  
17              
18 18     18   86 use Carp ();
  18         40  
  18         333  
19 18     18   4225 use IO::Socket::IP ();
  18         187473  
  18         379  
20 18     18   15112 use IO::Socket::SSL ();
  18         735173  
  18         585  
21 18     18   142 use Socket ();
  18         27  
  18         465  
22 18         120 use List::MoreUtils qw/
23             first_index
24 18     18   11431 /;
  18         153974  
25 18         1812 use Ref::Util qw/
26             is_plain_arrayref
27             is_plain_hashref
28             is_plain_ref
29             is_ref
30 18     18   18955 /;
  18         10197  
31              
32 18         142 use fields qw/
33             debug
34             job_servers
35             js_count
36             prefix
37             sock_cache
38 18     18   3085 /;
  18         7982  
39              
40             sub new {
41 24     24 0 12459 my $self = shift;
42 24         50 my (%opts) = @_;
43 24 100       90 unless (is_ref($self)) {
44 10         28 $self = fields::new($self);
45             }
46 24         3788 $self->{job_servers} = [];
47 24         42 $self->{js_count} = 0;
48              
49             $opts{job_servers}
50 24 100       78 && $self->set_job_servers($opts{job_servers});
51              
52 24         104 $self->debug($opts{debug});
53 24         102 $self->prefix($opts{prefix});
54              
55 24         47 $self->{sock_cache} = {};
56              
57 24         58 return $self;
58             } ## end sub new
59              
60             =head2 job_servers([$js])
61              
62             getter/setter
63              
64             C<$js> array reference or scalar
65              
66             =cut
67              
68             sub job_servers {
69 10     10 1 2452 my ($self) = shift;
70 10 100       77 (@_) && $self->set_job_servers(@_);
71              
72 10 100       29 return wantarray ? @{ $self->{job_servers} } : $self->{job_servers};
  9         40  
73             } ## end sub job_servers
74              
75             =head2 set_job_servers($js)
76              
77             set job_servers attribute by canonicalized C<$js>_
78              
79             =cut
80              
81             sub set_job_servers {
82 7     7 1 10 my $self = shift;
83 7         21 my $list = $self->canonicalize_job_servers(@_);
84              
85 7         8 $self->{js_count} = scalar @{$list};
  7         13  
86 7         12 return $self->{job_servers} = $list;
87             } ## end sub set_job_servers
88              
89             =head2 canonicalize_job_servers($js)
90              
91             C<$js> a string, hash reference or array reference of aforementioned.
92              
93             Hash reference should contain at least host key.
94              
95             All keys: host, port (4730 on default), use_ssl, key_file, cert_file,
96             ca_certs, socket_cb
97              
98             B [canonicalized list]
99              
100             =cut
101              
102             sub canonicalize_job_servers {
103 12     12 1 1066 my ($self) = shift;
104 12         17 my @in;
105 12 100       30 if (is_plain_ref($_[0])) {
106 8 100       21 if (is_plain_arrayref($_[0])) {
    50          
107 4         6 @in = @{ $_[0] };
  4         9  
108             }
109             elsif (is_plain_hashref($_[0])) {
110 4         9 @in = ($_[0]);
111             }
112             else {
113 0         0 Carp::croak "unsupported argument type ", ref($_[0]);
114             }
115             } ## end if (is_plain_ref($_[0]...))
116             else {
117 4         7 @in = @_;
118             }
119              
120 12         18 my $out = [];
121 12         26 foreach my $i (@in) {
122 11 100       34 if (is_ref($i)) {
    100          
123 5   50     12 $i->{port} ||= Gearman::Objects::DEFAULT_PORT;
124             }
125             elsif ($i !~ /:/) {
126 2         4 $i .= ':' . Gearman::Objects::DEFAULT_PORT;
127             }
128 11         10 push @{$out}, $i;
  11         22  
129             } ## end foreach my $i (@in)
130              
131 12         41 return $out;
132             } ## end sub canonicalize_job_servers
133              
134             sub debug {
135 33     33 0 798 return shift->_property("debug", @_);
136             }
137              
138             =head2 prefix([$prefix])
139              
140             getter/setter
141              
142             =cut
143              
144             sub prefix {
145 32     32 1 761 return shift->_property("prefix", @_);
146             }
147              
148             =head2 socket($js, [$timeout])
149              
150             depends on C
151             prepare L
152             or L
153              
154             =over
155              
156             =item
157              
158             C<$host_port> peer address
159              
160             =item
161              
162             C<$timeout> default: 1
163              
164             =back
165              
166             B depends on C IO::Socket::(IP|SSL) on success
167              
168             =cut
169              
170             sub socket {
171 3     3 1 1907 my ($self, $js, $t) = @_;
172 3 50       19 unless (is_ref($js)) {
173 0         0 my ($h, $p) = ($js =~ /^(.*):(\d+)$/);
174 0         0 $js = { host => $h, port => $p };
175             }
176              
177             my %opts = (
178             PeerPort => $js->{port},
179             PeerHost => $js->{host},
180 3   100     31 Timeout => $t || 1
181             );
182              
183 3         8 my $sc = "IO::Socket::IP";
184 3 100       10 if ($js->{use_ssl}) {
185 2         3 $sc = "IO::Socket::SSL";
186 2         5 for (qw/ key_file cert_file ca_certs /) {
187 6 50       17 $js->{$_} || next;
188 0         0 $opts{ join('_', "SSL", $_) } = $js->{$_};
189             }
190             } ## end if ($js->{use_ssl})
191              
192 3 100       15 $js->{socket_cb} && $js->{socket_cb}->(\%opts);
193              
194 3         44 my $s = $sc->new(%opts);
195 3 100       79044 unless ($s) {
196             $self->debug() && Carp::carp("connection failed error='$@'",
197             $js->{use_ssl}
198 2 0       10 ? ", ssl_error='$IO::Socket::SSL::SSL_ERROR'"
    50          
199             : "");
200             } ## end unless ($s)
201              
202 3         16 return $s;
203             } ## end sub socket
204              
205             =head2 sock_nodelay($sock)
206              
207             set TCP_NODELAY on $sock, die on failure
208              
209             =cut
210              
211             sub sock_nodelay {
212 0     0 1 0 my ($self, $sock) = @_;
213 0 0       0 setsockopt($sock, Socket::IPPROTO_TCP, Socket::TCP_NODELAY, pack("l", 1))
214             or Carp::croak "setsockopt: $!";
215             }
216              
217             # _sock_cache($js, [$sock, $delete])
218             #
219             # B $sock || undef
220             #
221              
222             sub _sock_cache {
223 5     5   1309 my ($self, $js, $sock, $delete) = @_;
224 5         11 my $hp = $self->_js_str($js);
225 5 100       15 if ($sock) {
226 3         8 $self->{sock_cache}->{$hp} = $sock;
227             }
228              
229             return $delete
230             ? delete($self->{sock_cache}->{$hp})
231 5 100       30 : $self->{sock_cache}->{$hp};
232             } ## end sub _sock_cache
233              
234             #
235             # _property($name, [$value])
236             # set/get
237             sub _property {
238 65     65   97 my $self = shift;
239 65         64 my $name = shift;
240 65 50       123 $name || return;
241 65 100       118 if (@_) {
242 52         107 $self->{$name} = shift;
243             }
244              
245 65         130 return $self->{$name};
246             } ## end sub _property
247              
248             #
249             #_js_str($js)
250             #
251             # return host:port
252             sub _js_str {
253 10     10   371 my ($self, $js) = @_;
254 10 100       33 return is_plain_hashref($js) ? join(':', @{$js}{qw/host port/}) : $js;
  6         25  
255             }
256              
257             #
258             # _js($js_str)
259             #
260             # return job_servers item || undef
261             #
262             sub _js {
263 1     1   2 my ($self, $js_str) = @_;
264 1         4 my @s = $self->job_servers();
265 1     1   17 my $i = first_index { $js_str eq $self->_js_str($_) } @s;
  1         4  
266 1 50 33     14 return ($i == -1 || $i > $#s) ? undef : $s[$i];
267             } ## end sub _js
268              
269             1;