File Coverage

blib/lib/Gearman/Server/Listener.pm
Criterion Covered Total %
statement 21 52 40.3
branch 0 12 0.0
condition 0 3 0.0
subroutine 7 10 70.0
pod 2 2 100.0
total 30 79 37.9


line stmt bran cond sub pod time code
1             package Gearman::Server::Listener;
2 1     1   3 use version;
  1         1  
  1         5  
3             $Gearman::Server::Listener::VERSION = qv("v1.130.1");
4              
5 1     1   61 use strict;
  1         1  
  1         15  
6 1     1   3 use warnings;
  1         1  
  1         20  
7              
8 1     1   4 use base 'Danga::Socket';
  1         1  
  1         88  
9 1         4 use fields qw/
10             server
11             accept_per_loop
12 1     1   4 /;
  1         1  
13              
14 1     1   37 use Errno qw(EAGAIN);
  1         1  
  1         36  
15 1         297 use Socket qw/
16             IPPROTO_TCP
17             TCP_NODELAY
18             SOL_SOCKET
19             SO_ERROR
20 1     1   3 /;
  1         1  
21              
22             sub new {
23 0     0 1   my Gearman::Server::Listener $self = shift;
24 0           my $sock = shift;
25 0           my $server = shift;
26              
27 0           my %opts = @_;
28              
29 0           my $accept_per_loop = delete $opts{accept_per_loop};
30              
31 0 0         warn "Extra options passed into new: " . join(', ', keys %opts) . "\n"
32             if keys %opts;
33              
34 0 0 0       $accept_per_loop = 10
35             unless defined $accept_per_loop and $accept_per_loop >= 1;
36              
37 0 0         $self = fields::new($self) unless ref $self;
38              
39             # make sure provided listening socket is non-blocking
40 0           IO::Handle::blocking($sock, 0);
41              
42 0           $self->SUPER::new($sock);
43              
44 0           $self->{server} = $server;
45 0           $self->{accept_per_loop} = int($accept_per_loop);
46              
47 0           $self->watch_read(1);
48              
49 0           return $self;
50             } ## end sub new
51              
52             sub event_read {
53 0     0 1   my Gearman::Server::Listener $self = shift;
54              
55 0           my $listen_sock = $self->sock;
56              
57 0           local $!;
58 0           local $SIG{PIPE} = "IGNORE";
59              
60 0           my $remaining = $self->{accept_per_loop};
61              
62 0           while (my $csock = $listen_sock->accept) {
63 0           IO::Handle::blocking($csock, 0);
64 0 0         setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
65              
66 0           my $server = $self->{server};
67              
68 0           $server->debug(
69             sprintf("Listen child making a Client for %d.", fileno($csock)));
70 0           $server->new_client($csock);
71 0 0         return unless $remaining-- > 0;
72             } ## end while (my $csock = $listen_sock...)
73              
74 0 0         return if $! == EAGAIN;
75              
76 0           warn "Error accepting incoming connection: $!\n";
77              
78 0           $self->watch_read(0);
79              
80             Danga::Socket->AddTimer(
81             .1,
82             sub {
83 0     0     $self->watch_read(1);
84             }
85 0           );
86             } ## end sub event_read
87              
88             1;