File Coverage

blib/lib/Gearman/Starter.pm
Criterion Covered Total %
statement 45 170 26.4
branch 0 50 0.0
condition 0 11 0.0
subroutine 15 33 45.4
pod 0 7 0.0
total 60 271 22.1


line stmt bran cond sub pod time code
1             package Gearman::Starter;
2 3     3   295008 use 5.008001;
  3         13  
  3         130  
3 3     3   16 use strict;
  3         7  
  3         90  
4 3     3   61 use warnings;
  3         22  
  3         158  
5              
6             our $VERSION = "0.03";
7              
8 3     3   2428 use Gearman::Starter::Util;
  3         8  
  3         92  
9              
10 3     3   4713 use Getopt::Long;
  3         54200  
  3         21  
11 3     3   5355 use Pod::Usage qw/pod2usage/;
  3         192934  
  3         393  
12              
13 3     3   2959 use Class::Inspector;
  3         10110  
  3         108  
14 3     3   2548 use Filesys::Notify::Simple;
  3         5952  
  3         81  
15 3     3   2616 use Gearman::Worker;
  3         52724  
  3         90  
16 3     3   2302 use Hash::Rename qw/hash_rename/;
  3         5340  
  3         169  
17 3     3   2506 use Module::Load ();
  3         2846  
  3         52  
18 3     3   19 use IO::Socket::INET;
  3         3  
  3         33  
19 3     3   4343 use Parallel::Prefork;
  3         17510  
  3         35  
20 3     3   2761 use Parallel::Scoreboard;
  3         14218  
  3         223  
21              
22             use Class::Accessor::Lite (
23 3         34 new => 1,
24             ro => [qw/prefix port listen max_workers max_requests_per_child scoreboard_dir on_fail/],
25             rw => [qw/start_time scoreboard jobs/],
26 3     3   27 );
  3         6  
27              
28             sub reload {
29 0     0 0   my $self = shift;
30 0 0         @{$self->{Reload} || []};
  0            
31             }
32              
33             sub servers {
34 0     0 0   my $self = shift;
35 0 0 0       @{ ref $self->{server} ? $self->{server} : [$self->{server} || ()] };
  0            
36             }
37              
38             sub modules {
39 0     0 0   my $self = shift;
40 0 0 0       @{ ref $self->{module} ? $self->{module} : [$self->{module} || ()] };
  0            
41             }
42              
43 0   0 0 0   sub pid {shift->{pid} ||= []}
44              
45             sub parse_options {
46 0     0 0   my ($class, @argv) = @_;
47              
48 0           my $p = Getopt::Long::Parser->new(
49             config => [qw/default posix_default no_ignore_case auto_help pass_through/]
50             );
51 0           my %opt = (
52             'max-workers' => 10,
53             'max-requests-per-child' => 100,
54             'listen' => '0.0.0.0',
55             );
56 0 0         $p->getoptionsfromarray(\@argv, \%opt, qw/
57             server|s=s@
58             prefix=s
59             max-workers=i
60             max-requests-per-child=i
61             scoreboard-dir=s
62             listen=s
63             port=i
64             Reload|R=s@
65             /) or pod2usage;
66 0 0 0       pod2usage unless $opt{server} && @{$opt{server}};
  0            
67              
68 0           while (@argv) {
69 0           my $mod = shift @argv;
70 0 0         last if $mod eq '--';
71 0           push @{ $opt{module} }, $mod;
  0            
72             }
73 0     0     hash_rename %opt, code => sub {tr/-/_/};
  0            
74              
75 0           (\%opt, \@argv);
76             }
77              
78             sub new_with_options {
79 0     0 0   my ($class, @argv) = @_;
80 0           my ($opt,) = $class->parse_options(@argv);
81 0           $class->new($opt);
82             }
83              
84             sub run {
85 0     0 0   my $self = shift;
86              
87 0 0         if ($self->reload) {
88 0           $self->_launch_watcher;
89             }
90              
91 0           $self->start_time(time);
92              
93 0 0         if ( $self->scoreboard_dir ) {
94 0           $self->scoreboard(Parallel::Scoreboard->new(
95             base_dir => $self->scoreboard_dir,
96             ));
97             }
98              
99 0 0         if ( defined $self->port ) {
100 0           my $pid = $self->_launch_monitor_socket;
101 0           push @{$self->pid}, $pid;
  0            
102             }
103 0           $self->jobs($self->_load_jobs);
104              
105 0           $self->_run;
106             }
107              
108             sub _load_jobs {
109 0     0     my $self = shift;
110 0           my %jobs;
111 0           for my $klass ($self->modules) {
112 0           Module::Load::load($klass);
113 0           my @jobs = grep /^job_/, @{Class::Inspector->functions($klass)};
  0            
114 0           for my $job (@jobs) {
115 0           (my $job_name = $job) =~ s/^job_//; # Sledgeish dispatching
116 0           $jobs{$job_name} = $klass->can($job);
117             }
118             }
119 0           \%jobs;
120             }
121              
122             sub _run {
123 0     0     my $self = shift;
124              
125 0           my $pm = Parallel::Prefork->new({
126             max_workers => $self->max_workers,
127             trap_signals => {
128             TERM => 'TERM',
129             HUP => 'TERM',
130             USR1 => undef,
131             }
132             });
133              
134 0           while ( $pm->signal_received ne 'TERM' ) {
135 0 0         $pm->start and next;
136              
137 0           $0 = "$0 (worker)";
138 0           my $counter = 0;
139 0     0     $SIG{TERM} = sub { $counter = $self->max_requests_per_child };
  0            
140              
141             # Gearman::Worker isn't fork-safe
142 0           my $worker = Gearman::Worker->new;
143 0           $worker->job_servers($self->servers);
144 0 0         $worker->prefix($self->prefix) if $self->prefix;
145 0           my %jobs = %{$self->jobs};
  0            
146 0           for my $job_name (keys %jobs) {
147 0           $worker->register_function($job_name, $jobs{$job_name});
148             }
149              
150 0 0         if ( $self->scoreboard ) {
151 0           $self->scoreboard->update('. 0');
152             }
153              
154             $worker->work(
155             on_start => sub {
156 0     0     $counter++;
157 0 0         $self->scoreboard && $self->scoreboard->update( sprintf "%s %s %s", 'A', $counter, shift);
158             },
159             on_complete => sub {
160 0 0   0     $self->scoreboard && $self->scoreboard->update( sprintf "%s %s", '_', $counter );
161             },
162             ($self->on_fail ? (on_fail => $self->on_fail) : ()),
163             stop_if => sub {
164 0     0     $counter >= $self->max_requests_per_child;
165             }
166 0 0         );
167 0           $pm->finish;
168             }
169              
170 0           $pm->wait_all_children;
171              
172 0           for my $pid ( @{ $self->pid } ) {
  0            
173 0 0         next unless $pid;
174 0           kill 'TERM', $pid;
175 0           waitpid( $pid, 0 );
176             }
177             }
178              
179             sub _launch_watcher {
180 0     0     my $self = shift;
181 0           while ( 1 ) {
182 0           my $pid = fork;
183 0 0         die "fork failed: $!" unless defined $pid;
184 0 0         if ( $pid ) {
185             #main process
186 0           my $watcher = Filesys::Notify::Simple->new([$self->reload, $0]);
187 0           warn "Watching @{[$self->reload]} for file updates.\n";
  0            
188 0           NOTIFY: while ( 1 ) {
189 0           my @restart;
190             # this is blocking
191             $watcher->wait(sub {
192 0     0     my @events = @_;
193 0           @events = grep {
194 0           $_->{path} !~ m![/\\][\._]|\.bak$|~$!
195             } @events;
196 0 0         return unless @events;
197 0           @restart = @events;
198 0           });
199 0 0         next NOTIFY unless @restart;
200 0           for my $ev (@restart) {
201 0           warn "-- $ev->{path} updated.\n";
202             }
203 0           warn "Killing the existing worker (pid:$pid)\n";
204 0           kill 'TERM', $pid;
205 0           waitpid( $pid, 0 );
206 0           warn "Successfully killed! Restarting the new worker process.\n";
207 0           last NOTIFY;
208             }
209             }
210             else {
211             # child process
212 0           return;
213             }
214             }
215             }
216              
217             sub _launch_monitor_socket {
218 0     0     my $self = shift;
219              
220 0           my $sock = IO::Socket::INET->new(
221             Listen => 5,
222             LocalAddr => $self->listen,
223             LocalPort => $self->port,
224             Proto => 'tcp',
225             Reuse => 1,
226             );
227 0 0         die $! unless $sock;
228              
229 0           my $pid = fork;
230 0 0         die "fork failed: $!" unless defined $pid;
231              
232 0 0         if ( $pid ) {
233             #main process
234 0           return $pid;
235             }
236             else {
237             # status worker
238 0           $0 = "$0 (status worker)";
239 0     0     local $SIG{TERM} = sub { exit(0) };
  0            
240 0           while ( 1 ) {
241 0           my $client = $sock->accept();
242 0           my $system_info = 'gearman_servers: ' . join ",", $self->servers;
243 0 0         $system_info .= ' prefix: ' . $self->prefix if $self->prefix;
244 0           $system_info .= ' class: ' . join ",", $self->modules;
245 0           my $uptime = time - $self->start_time;
246 0           print $client <<"EOF";
247             System: $system_info
248             Uptime: $uptime
249             EOF
250 0 0         if ( $self->scoreboard ) {
251 0           my $output = Gearman::Starter::Util::display_scoreboard($self->scoreboard);
252 0           print $client $output;
253             }
254             else {
255 0           print $client "ERROR: scoreboard is disabled\n";
256             }
257 0           $client->close;
258             }
259             }
260             }
261              
262              
263             1;
264             __END__