File Coverage

blib/lib/GearmanX/Starter.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package GearmanX::Starter;
2              
3 1     1   25967 use strict;
  1         2  
  1         44  
4 1     1   6 use warnings;
  1         3  
  1         35  
5              
6 1     1   504 use Gearman::XS qw(:constants);
  0            
  0            
7             use Gearman::XS::Worker;
8             use Perl::Unsafe::Signals;
9             use POSIX;
10              
11             our $VERSION = '0.05';
12              
13             our $WORKER;
14              
15             our $QUIT;
16              
17             sub new {
18             my $class = shift;
19              
20             bless {}, $class;
21             }
22              
23             sub start {
24             my $self = shift;
25              
26             my $args = shift;
27              
28             my $worker_name = $args->{name} || die "Need name for worker";
29              
30             my $init_func = $args->{init_func};
31              
32             my $logger = $args->{logger};
33              
34             my $servers = $args->{servers} || [[]];
35              
36             my $sigterm = $args->{sigterm} || [ 'TERM' ];
37             my $sleep = $args->{sleep_and_retry} || 0;
38              
39             $logger->info("Forking daemon for $worker_name") if $logger;
40              
41             _Init() and return 1;
42              
43             my $critical;
44             for my $sig (@$sigterm) {
45             $SIG{$sig} = sub {
46             $QUIT++;
47             die "GearmanXQuitLoop\n" if !$critical
48             };
49             }
50             $init_func->() if $init_func;
51              
52             $logger->info("Creating $worker_name worker") if $logger;
53             $0 = $worker_name;
54              
55             $WORKER = $args->{worker} || Gearman::XS::Worker->new;
56              
57             for my $server (@$servers) {
58             if ($WORKER->add_server(@$server) != GEARMAN_SUCCESS) {
59             $logger->logdie("Unable to add job server [@$server] to worker $worker_name: " . $WORKER->error)
60             if $logger;
61             }
62             }
63              
64             # Wrap each function in another function that flags
65             # that we're in a critical section of code
66             $logger->info("Adding functions to $worker_name worker") if $logger;
67             my $func_list = $args->{func_list} || [];
68             if ( !exists($args->{dereg_func}) || $args->{dereg_func} ) {
69             my $dereg_fn_name = $args->{dereg_func} || "dereg:$$";
70             $dereg_fn_name =~ s/%PID%/$$/;
71             push @$func_list, [ $dereg_fn_name, \&_unregister ];
72             }
73             for my $fun (@$func_list) {
74             my ($name, $f, $dont_wrap, $options) = @$fun;
75             my $wrapper = $dont_wrap ? $f : sub { $critical = 1; goto $f };
76             my $ret2 = $WORKER->add_function($name, 0, $wrapper, $options);
77             if ($ret2 != GEARMAN_SUCCESS) {
78             $logger->logdie("Failed to register callback function ($name) for worker $worker_name:" . $WORKER->error)
79             if $logger;
80             }
81             }
82              
83             $logger->info("Starting $worker_name loop") if $logger;
84             my $error_method = $sleep ? 'logwarn' : 'logdie';
85             while (1) {
86             my $res = eval {
87             $critical = 0;
88             my $ret;
89             UNSAFE_SIGNALS { $ret = $WORKER->work };
90             if ($ret != GEARMAN_SUCCESS) {
91             $logger->$error_method('Failed to initiate waiting for a job: '. $WORKER->error)
92             if $logger;
93             sleep $sleep;
94             }
95             1;
96             };
97             if ( !$res && $@ !~ /GearmanXQuitLoop/ ) {
98             $logger->logdie("Error running loop for worker $worker_name [$@]:".$WORKER->error)
99             if $logger;
100             }
101             last if $QUIT;
102             }
103             $logger->info("Exiting $worker_name")
104             if $logger;
105             exit 0;
106             }
107              
108             # Daemon code co-opted from Proc::Daemon
109             sub _Fork {
110             my $self = shift;
111              
112             my $pid;
113             if (defined($pid = fork)) {
114             return $pid;
115             } else {
116             die "Can't fork: $!";
117             }
118             }
119              
120             sub _OpenMax {
121             my $openmax = POSIX::sysconf( &POSIX::_SC_OPEN_MAX );
122             (!defined($openmax) || $openmax < 0) ? 64 : $openmax;
123             }
124              
125             # Daemonize this process
126             sub _Init {
127             my $sess_id;
128              
129             _Fork() and return 1;
130              
131             die "Unable to detach from controlling terminal"
132             unless $sess_id = POSIX::setsid();
133              
134             $SIG{'HUP'} = 'IGNORE';
135              
136             _Fork() and exit 0;
137              
138             ## Change working directory
139             chdir "/";
140             ## Clear file creation mask
141             umask 0;
142             ## Close open file descriptors
143             foreach my $i (0 .. _OpenMax()) { POSIX::close($i); }
144              
145             ## Reopen stderr, stdout, stdin to /dev/null
146             open(STDIN, "+>/dev/null");
147             open(STDOUT, "+>&STDIN");
148             open(STDERR, "+>&STDIN");
149              
150             return 0;
151             }
152              
153             sub _unregister {
154             my $job = shift;
155              
156             $WORKER->unregister($job->workload);
157              
158             return "1";
159             }
160              
161             1;
162              
163             __END__