File Coverage

blib/lib/Gearman/Objects.pm
Criterion Covered Total %
statement 104 110 94.5
branch 38 48 79.1
condition 4 7 57.1
subroutine 23 24 95.8
pod 7 9 77.7
total 176 198 88.8


line stmt bran cond sub pod time code
1             package Gearman::Objects;
2 18     18   1125 use version ();
  18         1385  
  18         626  
3             $Gearman::Objects::VERSION = version->declare("2.003_002");
4              
5 18     18   58 use strict;
  18         22  
  18         292  
6 18     18   53 use warnings;
  18         22  
  18         435  
7              
8             =head1 NAME
9              
10             Gearman::Objects - a parent class for L and L
11              
12             =head1 METHODS
13              
14             =cut
15              
16 18     18   54 use constant DEFAULT_PORT => 4730;
  18         19  
  18         1096  
17              
18 18     18   64 use Carp ();
  18         22  
  18         196  
19 18     18   3873 use IO::Socket::IP ();
  18         149247  
  18         290  
20 18     18   12791 use IO::Socket::SSL ();
  18         627482  
  18         437  
21 18     18   132 use Socket ();
  18         27  
  18         357  
22 18         124 use List::MoreUtils qw/
23             first_index
24 18     18   8923 /;
  18         123699  
25              
26 18         130 use fields qw/
27             debug
28             job_servers
29             js_count
30             prefix
31             sock_cache
32 18     18   10950 /;
  18         6282  
33              
34             sub new {
35 22     22 0 11104 my $self = shift;
36 22         33 my (%opts) = @_;
37 22 100       53 unless (ref $self) {
38 10         25 $self = fields::new($self);
39             }
40 22         2950 $self->{job_servers} = [];
41 22         34 $self->{js_count} = 0;
42              
43             $opts{job_servers}
44 22 100       59 && $self->set_job_servers($opts{job_servers});
45              
46 22         76 $self->debug($opts{debug});
47 22         74 $self->prefix($opts{prefix});
48              
49 22         37 $self->{sock_cache} = {};
50              
51 22         41 return $self;
52             } ## end sub new
53              
54             =head2 job_servers([$js])
55              
56             getter/setter
57              
58             C<$js> array reference, hash reference or scalar
59              
60             =cut
61              
62             sub job_servers {
63 9     9 1 2048 my ($self) = shift;
64 9 100       31 (@_) && $self->set_job_servers(@_);
65              
66 9 100       25 return wantarray ? @{ $self->{job_servers} } : $self->{job_servers};
  8         36  
67             } ## end sub job_servers
68              
69             =head2 set_job_servers($js)
70              
71             set job_servers attribute by canonicalized C<$js>_
72              
73             =cut
74              
75             sub set_job_servers {
76 6     6 1 8 my $self = shift;
77 6         17 my $list = $self->canonicalize_job_servers(@_);
78              
79 6         8 $self->{js_count} = scalar @{$list};
  6         10  
80 6         11 return $self->{job_servers} = $list;
81             } ## end sub set_job_servers
82              
83             =head2 canonicalize_job_servers($js)
84              
85             C<$js> a string, hash reference or array reference of aforementioned.
86              
87             Hash reference should contain at least host key.
88              
89             All keys: host, port (4730 on default), use_ssl, key_file, cert_file,
90             ca_certs, socket_cb
91              
92             B [canonicalized list]
93              
94             =cut
95              
96             sub canonicalize_job_servers {
97 11     11 1 863 my ($self) = shift;
98 11         11 my @in;
99              
100 11         17 my $ref = ref($_[0]);
101 11 100       20 if ($ref) {
102 7 100       22 if ($ref eq "ARRAY") {
    50          
103 3         5 @in = @{ $_[0] };
  3         8  
104             }
105             elsif ($ref eq "HASH") {
106 4         9 @in = ($_[0]);
107             }
108             else {
109 0         0 Carp::croak "unsupported argument type ", ref($_[0]);
110             }
111             } ## end if (is_plain_ref($_[0]...))
112             else {
113 4         6 @in = @_;
114             }
115              
116 11         14 my $out = [];
117 11         20 foreach my $i (@in) {
118 11 100       26 if (ref($i)) {
    100          
119 5   50     12 $i->{port} ||= Gearman::Objects::DEFAULT_PORT;
120             } elsif ($i !~ /:/) {
121 2         4 $i .= ':' . Gearman::Objects::DEFAULT_PORT;
122             }
123 11         9 push @{$out}, $i;
  11         19  
124             } ## end foreach (@in)
125 11         37 return $out;
126             } ## end sub canonicalize_job_servers
127              
128             sub debug {
129 31     31 0 639 return shift->_property("debug", @_);
130             }
131              
132             =head2 func($func)
133              
134             B C<< $prefix ? $prefix\t$func : $func >>
135              
136             =cut
137              
138             sub func {
139 6     6 1 9 my ($self, $func) = @_;
140 6         10 my $prefix = $self->prefix;
141 6 100       24 return defined($prefix) ? join("\t", $prefix, $func) : $func;
142             }
143              
144             =head2 prefix([$prefix])
145              
146             getter/setter
147              
148             =cut
149              
150             sub prefix {
151 34     34 1 620 return shift->_property("prefix", @_);
152             }
153              
154             =head2 socket($js, [$timeout])
155              
156             depends on C
157             prepare L
158             or L
159              
160             =over
161              
162             =item
163              
164             C<$host_port> peer address
165              
166             =item
167              
168             C<$timeout> default: 1
169              
170             =back
171              
172             B depends on C IO::Socket::(IP|SSL) on success
173              
174             =cut
175              
176             sub socket {
177 3     3 1 1504 my ($self, $js, $t) = @_;
178 3 50       13 unless (ref($js)) {
179 0         0 my ($h, $p) = ($js =~ /^(.*):(\d+)$/);
180 0         0 $js = { host => $h, port => $p };
181             }
182              
183             my %opts = (
184             PeerPort => $js->{port},
185             PeerHost => $js->{host},
186 3   100     27 Timeout => $t || 1
187             );
188              
189 3         4 my $sc = "IO::Socket::IP";
190 3 100       10 if ($js->{use_ssl}) {
191 2         4 $sc = "IO::Socket::SSL";
192 2         6 for (qw/ key_file cert_file ca_certs /) {
193 6 50       13 $js->{$_} || next;
194 0         0 $opts{ join('_', "SSL", $_) } = $js->{$_};
195             }
196             } ## end if ($js->{use_ssl})
197              
198 3 100       14 $js->{socket_cb} && $js->{socket_cb}->(\%opts);
199              
200 3         36 my $s = $sc->new(%opts);
201 3 100       76516 unless ($s) {
202             $self->debug() && Carp::carp("connection failed error='$@'",
203             $js->{use_ssl}
204 2 0       11 ? ", ssl_error='$IO::Socket::SSL::SSL_ERROR'"
    50          
205             : "");
206             } ## end unless ($s)
207              
208 3         17 return $s;
209             } ## end sub socket
210              
211             =head2 sock_nodelay($sock)
212              
213             set TCP_NODELAY on $sock, die on failure
214              
215             =cut
216              
217             sub sock_nodelay {
218 0     0 1 0 my ($self, $sock) = @_;
219 0 0       0 setsockopt($sock, Socket::IPPROTO_TCP, Socket::TCP_NODELAY, pack("l", 1))
220             or Carp::croak "setsockopt: $!";
221             }
222              
223             # _sock_cache($js, [$sock, $delete])
224             #
225             # B $sock || undef
226             #
227              
228             sub _sock_cache {
229 5     5   1386 my ($self, $js, $sock, $delete) = @_;
230 5         12 my $hp = $self->_js_str($js);
231 5 100       17 if ($sock) {
232 3         9 $self->{sock_cache}->{$hp} = $sock;
233             }
234              
235             return $delete
236             ? delete($self->{sock_cache}->{$hp})
237 5 100       26 : $self->{sock_cache}->{$hp};
238             } ## end sub _sock_cache
239              
240             #
241             # _property($name, [$value])
242             # set/get
243             sub _property {
244 65     65   69 my $self = shift;
245 65         54 my $name = shift;
246 65 50       106 $name || return;
247 65 100       92 if (@_) {
248 48         79 $self->{$name} = shift;
249             }
250              
251 65         112 return $self->{$name};
252             } ## end sub _property
253              
254             #
255             #_js_str($js)
256             #
257             # return host:port
258             sub _js_str {
259 10     10   502 my ($self, $js) = @_;
260 10 100       40 return ref($js) eq "HASH" ? join(':', @{$js}{qw/host port/}) : $js;
  6         24  
261             }
262              
263             #
264             # _js($js_str)
265             #
266             # return job_servers item || undef
267             #
268             sub _js {
269 1     1   4 my ($self, $js_str) = @_;
270 1         3 my @s = $self->job_servers();
271 1     1   41 my $i = first_index { $js_str eq $self->_js_str($_) } @s;
  1         5  
272 1 50 33     19 return ($i == -1 || $i > $#s) ? undef : $s[$i];
273             } ## end sub _js
274              
275             1;