File Coverage

blib/lib/Gearman/Glutch.pm
Criterion Covered Total %
statement 63 95 66.3
branch 2 16 12.5
condition 1 6 16.6
subroutine 17 21 80.9
pod 3 6 50.0
total 86 144 59.7


line stmt bran cond sub pod time code
1             package Gearman::Glutch;
2 3     3   84837 use strict;
  3         10  
  3         119  
3 3     3   15 use warnings;
  3         9  
  3         97  
4 3     3   890 use utf8;
  3         16  
  3         18  
5 3     3   2543 use parent qw/Class::Accessor::Fast/;
  3         1226  
  3         16  
6              
7 3     3   14131 use 5.010001;
  3         14  
  3         174  
8             our $VERSION = '0.02';
9              
10 3     3   3107 use POSIX ":sys_wait_h";
  3         23437  
  3         23  
11              
12 3     3   6978 use Gearman::Worker;
  3         193860  
  3         245  
13 3     3   3409 use Gearman::Server;
  3         176301  
  3         134  
14 3     3   39 use Danga::Socket;
  3         8  
  3         3379  
15              
16             __PACKAGE__->mk_accessors(qw/server port max_workers max_reqs_per_child on_spawn_child on_complete/);
17              
18             sub new {
19 1     1 1 2727 my $class = shift;
20 1 50       75 my %args = @_==1 ? %{$_[0]} : @_;
  0         0  
21 1   33     43 my $port = $args{port} || Carp::croak "Missing mandatory parameter: port";
22 1         68 return bless {
23             server => Gearman::Server->new(),
24             functions => [],
25             max_workers => 1,
26             %args
27             }, $class;
28             }
29              
30             sub register_function {
31 1     1 1 12361 my $self = shift;
32 1         4 push @{$self->{functions}}, \@_;
  1         20  
33             }
34              
35             sub run {
36 1     1 1 17 my $self = shift;
37              
38 1         48 local $SIG{PIPE} = 'IGNORE'; # handled manually
39              
40 1         29 my $ssock = $self->server->create_listening_sock($self->port);
41 1         778 $self->{ssock} = $ssock;
42              
43             # great graceful shutdown support
44 1         24 $Gearmand::graceful_shutdown = 0;
45             local *Gearmand::shutdown_graceful = sub {
46 0 0   0   0 return if $Gearmand::graceful_shutdown;
47 0         0 my $ofds = Danga::Socket->OtherFds;
48 0         0 delete $ofds->{fileno($ssock)};
49 0         0 $ssock->close;
50 0         0 $Gearmand::graceful_shutdown = 1;
51 0 0       0 exit 0 unless $self->server->jobs_outstanding;
52 1         32 };
53              
54             # spawn workers
55 1         12 for my $i (1..$self->max_workers) {
56 1         24 $self->spawn_worker();
57             }
58              
59             local $SIG{CHLD} = sub {
60 0     0   0 my $kid;
61 0         0 do {
62 0         0 $kid = waitpid(-1, WNOHANG);
63 0 0       0 if ($kid) {
64 0         0 delete $self->{pids}->{$kid};
65             }
66             } while $kid > 0;
67              
68 0         0 while (0+(keys %{$self->{pids}}) < $self->max_workers) {
  0         0  
69 0         0 $self->spawn_worker();
70             }
71 1         53 };
72              
73             $SIG{INT} = $SIG{TERM} = sub {
74 1     1   40002 $self->shutdown();
75 1         215 exit 0;
76 1         26 };
77              
78             # run main loop
79 1         45 Danga::Socket->EventLoop();
80             }
81              
82             sub spawn_worker {
83 1     1 0 2 my $self = shift;
84              
85             my $pid = $self->server->start_worker(sub {
86 0     0   0 close $self->{ssock};
87 0         0 $self->{child}++;
88              
89 0         0 my $worker = Gearman::Worker->new();
90 0         0 for my $func (@{$self->{functions}}) {
  0         0  
91 0         0 $worker->register_function(@$func);
92             }
93 0         0 my $i=0;
94             $worker->work(
95             stop_if => sub {
96 0 0 0     0 if (defined($self->max_reqs_per_child) && $i >= $self->max_reqs_per_child) {
97 0         0 1;
98             } else {
99 0         0 0
100             }
101             },
102             on_complete => sub {
103 0 0       0 $self->on_complete->(@_) if $self->on_complete;
104 0         0 ++$i;
105             },
106 0         0 );
107 0         0 exit 0;
108 1         380 });
109 1 50       11207 $self->on_spawn_child->($pid) if $self->on_spawn_child;
110 1         58 $self->{pids}->{$pid}++;
111 1         40 return $pid;
112             }
113              
114             sub DESTROY {
115 0     0   0 my $self = shift;
116 0 0       0 return if $self->{child};
117 0         0 $self->shutdown();
118             }
119              
120             sub shutdown :method {
121 1     1 0 19 my $self = shift;
122              
123 1         12 $self->signal_all_children('TERM');
124 1         22 $self->_wait_all_children();
125             }
126              
127             sub signal_all_children {
128 1     1 0 19 my ( $self, $sig ) = @_;
129 1         13 for my $pid ( sort keys %{ $self->{pids} } ) {
  1         190  
130 1         68 kill $sig, $pid;
131             }
132             }
133              
134             sub _wait_all_children {
135 1     1   3 my $self = shift;
136              
137 1         7 my $kid;
138 1         4 do {
139 1         54 $kid = waitpid(-1, WNOHANG);
140             } while $kid > 0;
141             }
142              
143             1;
144              
145             __END__