File Coverage

blib/lib/Gearman/Server/Listener.pm
Criterion Covered Total %
statement 35 52 67.3
branch 3 12 25.0
condition 1 3 33.3
subroutine 8 10 80.0
pod 2 2 100.0
total 49 79 62.0


line stmt bran cond sub pod time code
1             package Gearman::Server::Listener;
2 3     3   20 use version ();
  3         7  
  3         168  
3             $Gearman::Server::Listener::VERSION = version->declare("1.140_001");
4              
5 3     3   21 use strict;
  3         3  
  3         109  
6 3     3   17 use warnings;
  3         6  
  3         153  
7              
8             =head1 NAME
9              
10             Gearman::Server::Listener - a listener for L
11              
12             =head1 DESCRIPTION
13              
14             Based on L
15              
16             =cut
17              
18 3     3   20 use base 'Danga::Socket';
  3         5  
  3         644  
19 3         28 use fields qw/
20             server
21             accept_per_loop
22 3     3   22 /;
  3         67  
23              
24 3     3   281 use Errno qw(EAGAIN);
  3         6  
  3         268  
25 3         1782 use Socket qw/
26             IPPROTO_TCP
27             TCP_NODELAY
28             SOL_SOCKET
29             SO_ERROR
30 3     3   18 /;
  3         5  
31              
32             =head1 METHODS
33              
34             =cut
35             sub new {
36 7     7 1 14 my Gearman::Server::Listener $self = shift;
37 7         9 my $sock = shift;
38 7         11 my $server = shift;
39              
40 7         23 my %opts = @_;
41              
42 7         12 my $accept_per_loop = delete $opts{accept_per_loop};
43              
44 7 50       28 warn "Extra options passed into new: " . join(', ', keys %opts) . "\n"
45             if keys %opts;
46              
47 7 50 33     24 $accept_per_loop = 10
48             unless defined $accept_per_loop and $accept_per_loop >= 1;
49              
50 7 50       42 $self = fields::new($self) unless ref $self;
51              
52             # make sure provided listening socket is non-blocking
53 7         1352 IO::Handle::blocking($sock, 0);
54              
55 7         51 $self->SUPER::new($sock);
56              
57 7         419 $self->{server} = $server;
58 7         17 $self->{accept_per_loop} = int($accept_per_loop);
59              
60 7         37 $self->watch_read(1);
61              
62 7         174 return $self;
63             } ## end sub new
64              
65             =head2 event_read()
66              
67             wait for connection
68              
69             =cut
70             sub event_read {
71 0     0 1   my Gearman::Server::Listener $self = shift;
72              
73 0           my $listen_sock = $self->sock;
74              
75 0           local $!;
76 0           local $SIG{PIPE} = "IGNORE";
77              
78 0           my $remaining = $self->{accept_per_loop};
79              
80 0           while (my $csock = $listen_sock->accept) {
81 0           IO::Handle::blocking($csock, 0);
82 0 0         setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
83              
84 0           my $server = $self->{server};
85              
86 0           $server->debug(
87             sprintf("Listen child making a Client for %d.", fileno($csock)));
88 0           $server->new_client($csock);
89 0 0         return unless $remaining-- > 0;
90             } ## end while (my $csock = $listen_sock...)
91              
92 0 0         return if $! == EAGAIN;
93              
94 0           warn "Error accepting incoming connection: $!\n";
95              
96 0           $self->watch_read(0);
97              
98             Danga::Socket->AddTimer(
99             .1,
100             sub {
101 0     0     $self->watch_read(1);
102             }
103 0           );
104             } ## end sub event_read
105              
106             1;