File Coverage

blib/lib/Gearman/Objects.pm
Criterion Covered Total %
statement 104 110 94.5
branch 38 48 79.1
condition 4 7 57.1
subroutine 23 24 95.8
pod 7 9 77.7
total 176 198 88.8


line stmt bran cond sub pod time code
1             package Gearman::Objects;
2 18     18   1051 use version ();
  18         1384  
  18         642  
3             $Gearman::Objects::VERSION = version->declare("2.003_001");
4              
5 18     18   62 use strict;
  18         19  
  18         272  
6 18     18   48 use warnings;
  18         18  
  18         392  
7              
8             =head1 NAME
9              
10             Gearman::Objects - a parent class for L and L
11              
12             =head1 METHODS
13              
14             =cut
15              
16 18     18   54 use constant DEFAULT_PORT => 4730;
  18         19  
  18         1026  
17              
18 18     18   64 use Carp ();
  18         23  
  18         186  
19 18     18   3264 use IO::Socket::IP ();
  18         155612  
  18         294  
20 18     18   12571 use IO::Socket::SSL ();
  18         615196  
  18         466  
21 18     18   121 use Socket ();
  18         27  
  18         347  
22 18         113 use List::MoreUtils qw/
23             first_index
24 18     18   9016 /;
  18         127706  
25              
26 18         135 use fields qw/
27             debug
28             job_servers
29             js_count
30             prefix
31             sock_cache
32 18     18   11062 /;
  18         6278  
33              
34             sub new {
35 21     21 0 7604 my $self = shift;
36 21         32 my (%opts) = @_;
37 21 100       51 unless (ref $self) {
38 10         34 $self = fields::new($self);
39             }
40 21         3128 $self->{job_servers} = [];
41 21         30 $self->{js_count} = 0;
42              
43             $opts{job_servers}
44 21 100       58 && $self->set_job_servers($opts{job_servers});
45              
46 21         73 $self->debug($opts{debug});
47 21         64 $self->prefix($opts{prefix});
48              
49 21         33 $self->{sock_cache} = {};
50              
51 21         37 return $self;
52             } ## end sub new
53              
54             =head2 job_servers([$js])
55              
56             getter/setter
57              
58             C<$js> array reference, hash reference or scalar
59              
60             =cut
61              
62             sub job_servers {
63 9     9 1 1593 my ($self) = shift;
64 9 100       24 (@_) && $self->set_job_servers(@_);
65              
66 9 100       41 return wantarray ? @{ $self->{job_servers} } : $self->{job_servers};
  8         31  
67             } ## end sub job_servers
68              
69             =head2 set_job_servers($js)
70              
71             set job_servers attribute by canonicalized C<$js>_
72              
73             =cut
74              
75             sub set_job_servers {
76 6     6 1 5 my $self = shift;
77 6         17 my $list = $self->canonicalize_job_servers(@_);
78              
79 6         4 $self->{js_count} = scalar @{$list};
  6         13  
80 6         9 return $self->{job_servers} = $list;
81             } ## end sub set_job_servers
82              
83             =head2 canonicalize_job_servers($js)
84              
85             C<$js> a string, hash reference or array reference of aforementioned.
86              
87             Hash reference should contain at least host key.
88              
89             All keys: host, port (4730 on default), use_ssl, key_file, cert_file,
90             ca_certs, socket_cb
91              
92             B [canonicalized list]
93              
94             =cut
95              
96             sub canonicalize_job_servers {
97 11     11 1 827 my ($self) = shift;
98 11         8 my @in;
99              
100 11         14 my $ref = ref($_[0]);
101 11 100       18 if ($ref) {
102 7 100       16 if ($ref eq "ARRAY") {
    50          
103 3         3 @in = @{ $_[0] };
  3         6  
104             }
105             elsif ($ref eq "HASH") {
106 4         6 @in = ($_[0]);
107             }
108             else {
109 0         0 Carp::croak "unsupported argument type ", ref($_[0]);
110             }
111             } ## end if (is_plain_ref($_[0]...))
112             else {
113 4         5 @in = @_;
114             }
115              
116 11         12 my $out = [];
117 11         15 foreach my $i (@in) {
118 11 100       31 if (ref($i)) {
    100          
119 5   50     12 $i->{port} ||= Gearman::Objects::DEFAULT_PORT;
120             } elsif ($i !~ /:/) {
121 2         4 $i .= ':' . Gearman::Objects::DEFAULT_PORT;
122             }
123 11         9 push @{$out}, $i;
  11         18  
124             } ## end foreach (@in)
125 11         36 return $out;
126             } ## end sub canonicalize_job_servers
127              
128             sub debug {
129 30     30 0 534 return shift->_property("debug", @_);
130             }
131              
132             =head2 func($func)
133              
134             B C<< $prefix ? $prefix\t$func : $func >>
135              
136             =cut
137              
138             sub func {
139 6     6 1 8 my ($self, $func) = @_;
140 6         10 my $prefix = $self->prefix;
141 6 100       25 return defined($prefix) ? join("\t", $prefix, $func) : $func;
142             }
143              
144             =head2 prefix([$prefix])
145              
146             getter/setter
147              
148             =cut
149              
150             sub prefix {
151 33     33 1 476 return shift->_property("prefix", @_);
152             }
153              
154             =head2 socket($js, [$timeout])
155              
156             depends on C
157             prepare L
158             or L
159              
160             =over
161              
162             =item
163              
164             C<$host_port> peer address
165              
166             =item
167              
168             C<$timeout> default: 1
169              
170             =back
171              
172             B depends on C IO::Socket::(IP|SSL) on success
173              
174             =cut
175              
176             sub socket {
177 3     3 1 1063 my ($self, $js, $t) = @_;
178 3 50       11 unless (ref($js)) {
179 0         0 my ($h, $p) = ($js =~ /^(.*):(\d+)$/);
180 0         0 $js = { host => $h, port => $p };
181             }
182              
183             my %opts = (
184             PeerPort => $js->{port},
185             PeerHost => $js->{host},
186 3   100     36 Timeout => $t || 1
187             );
188              
189 3         5 my $sc = "IO::Socket::IP";
190 3 100       8 if ($js->{use_ssl}) {
191 2         4 $sc = "IO::Socket::SSL";
192 2         6 for (qw/ key_file cert_file ca_certs /) {
193 6 50       11 $js->{$_} || next;
194 0         0 $opts{ join('_', "SSL", $_) } = $js->{$_};
195             }
196             } ## end if ($js->{use_ssl})
197              
198 3 100       13 $js->{socket_cb} && $js->{socket_cb}->(\%opts);
199              
200 3         34 my $s = $sc->new(%opts);
201 3 100       63596 unless ($s) {
202             $self->debug() && Carp::carp("connection failed error='$@'",
203             $js->{use_ssl}
204 2 0       9 ? ", ssl_error='$IO::Socket::SSL::SSL_ERROR'"
    50          
205             : "");
206             } ## end unless ($s)
207              
208 3         12 return $s;
209             } ## end sub socket
210              
211             =head2 sock_nodelay($sock)
212              
213             set TCP_NODELAY on $sock, die on failure
214              
215             =cut
216              
217             sub sock_nodelay {
218 0     0 1 0 my ($self, $sock) = @_;
219 0 0       0 setsockopt($sock, Socket::IPPROTO_TCP, Socket::TCP_NODELAY, pack("l", 1))
220             or Carp::croak "setsockopt: $!";
221             }
222              
223             # _sock_cache($js, [$sock, $delete])
224             #
225             # B $sock || undef
226             #
227              
228             sub _sock_cache {
229 5     5   784 my ($self, $js, $sock, $delete) = @_;
230 5         11 my $hp = $self->_js_str($js);
231 5 100       16 if ($sock) {
232 3         7 $self->{sock_cache}->{$hp} = $sock;
233             }
234              
235             return $delete
236             ? delete($self->{sock_cache}->{$hp})
237 5 100       22 : $self->{sock_cache}->{$hp};
238             } ## end sub _sock_cache
239              
240             #
241             # _property($name, [$value])
242             # set/get
243             sub _property {
244 63     63   71 my $self = shift;
245 63         40 my $name = shift;
246 63 50       95 $name || return;
247 63 100       90 if (@_) {
248 46         80 $self->{$name} = shift;
249             }
250              
251 63         106 return $self->{$name};
252             } ## end sub _property
253              
254             #
255             #_js_str($js)
256             #
257             # return host:port
258             sub _js_str {
259 10     10   232 my ($self, $js) = @_;
260 10 100       28 return ref($js) eq "HASH" ? join(':', @{$js}{qw/host port/}) : $js;
  6         20  
261             }
262              
263             #
264             # _js($js_str)
265             #
266             # return job_servers item || undef
267             #
268             sub _js {
269 1     1   2 my ($self, $js_str) = @_;
270 1         3 my @s = $self->job_servers();
271 1     1   17 my $i = first_index { $js_str eq $self->_js_str($_) } @s;
  1         3  
272 1 50 33     11 return ($i == -1 || $i > $#s) ? undef : $s[$i];
273             } ## end sub _js
274              
275             1;