File Coverage

blib/lib/Proclet.pm
Criterion Covered Total %
statement 148 194 76.2
branch 31 62 50.0
condition 11 16 68.7
subroutine 23 26 88.4
pod 2 6 33.3
total 215 304 70.7


line stmt bran cond sub pod time code
1             package Proclet;
2              
3 8     8   236837 use strict;
  8         16  
  8         333  
4 8     8   41 use warnings;
  8         17  
  8         253  
5 8     8   8325 use Parallel::Prefork 0.13;
  8         90534  
  8         89  
6 8     8   446 use Carp;
  8         17  
  8         998  
7 8     8   8789 use Data::Validator;
  8         2408682  
  8         292  
8 8     8   89 use Mouse;
  8         16  
  8         73  
9 8     8   2096 use Mouse::Util::TypeConstraints;
  8         17  
  8         46  
10 8     8   13966 use Log::Minimal env_debug => 'PROCLET_DEBUG';
  8         230939  
  8         61  
11 8     8   9912 use IO::Select;
  8         14412  
  8         368  
12 8     8   123 use Term::ANSIColor;
  8         16  
  8         417  
13 8     8   7197 use File::Which;
  8         8050  
  8         438  
14 8     8   6938 use Time::Crontab;
  8         64285  
  8         268  
15 8     8   8652 use String::ShellQuote;
  8         7609  
  8         6990  
16              
17             subtype 'ServiceProcs'
18             => as 'Int'
19             => where { $_ >= 0 };
20              
21             subtype 'ServicePort'
22             => as 'Int'
23             => where { $_ > 0 && $_ % 1000 == 0 };
24              
25             subtype 'Proclet::Service'
26             => as 'HashRef'
27             => message { "This argument must be String or ArrayRef or CodeRef" };
28             coerce 'Proclet::Service'
29             => from 'ArrayRef' => via {
30             my $command = $_;
31             +{generator => sub {
32             my $notice = shift;
33             my @command = @{$command};
34             my @o_command = @command;
35             my $bash = which("bash");
36             if ( @command == 1 && $bash ) { unshift @command, $bash, "-c" }
37             sub {
38             infof "Exec command: ". shell_quote(@o_command)."\n" if $notice;
39             exec(@command);
40             die $!
41             }
42             }}
43             }
44             => from 'Str' => via {
45             my $o_command = $_;
46             +{generator => sub {
47             my ($notice, $port, $tag) = @_;
48             my $command = $o_command; #copy
49             $command =~ s/\$PORT/$port/g if $port;
50             my @command = ($command);
51             my @o_command = @command;
52             if ( my $bash = which("bash") ) { unshift @command, $bash, "-c" }
53             sub {
54             infof "Exec command: ". shell_quote(@o_command)."\n" if $notice;
55             exec(@command);
56             die $!
57             }
58             }}
59             }
60             => from 'CodeRef' => via {
61             my $command = $_;
62             +{generator => sub {
63             my ($notice, $port, $tag) = @_;
64             sub {
65             infof "Start callback: " . $tag . "\n" if $notice;
66             $command->($port);
67             }
68             }};
69             };
70              
71              
72             subtype 'Proclet::Scheduler'
73             => as 'CodeRef'
74             => message { "This argument must be String or CodeRef" };
75             coerce 'Proclet::Scheduler'
76             => from 'Str' => via {
77             my $str = $_;
78             my $crontab = Time::Crontab->new($str);
79             sub {
80             my $unixtime = shift;
81             $crontab->match($unixtime);
82             }
83             };
84              
85              
86 8     8   260 no Mouse::Util::TypeConstraints;
  8         11  
  8         116  
87              
88             our $VERSION = '0.35';
89              
90             has '_services' => (
91             is => 'ro',
92             isa => 'ArrayRef',
93             default => sub { [] },
94             );
95              
96             has 'spawn_interval' => (
97             is => 'ro',
98             isa => 'Int',
99             default => 0,
100             );
101              
102             has 'err_respawn_interval' => (
103             is => 'ro',
104             isa => 'Int',
105             default => 1,
106             );
107              
108             has 'color' => (
109             is => 'rw',
110             isa => 'Bool',
111             default => 0,
112             );
113              
114             has 'logger' => (
115             is => 'ro',
116             isa => 'CodeRef',
117             required => 0,
118             );
119              
120             has 'enable_log_worker' => (
121             is => 'ro',
122             isa => 'Int',
123             default => 1,
124             );
125              
126             has 'exec_notice' => (
127             is => 'ro',
128             isa => 'Int',
129             default => 1,
130             );
131              
132             # for Procfile port assignment
133             has '_base_port' => (
134             is => 'ro',
135             isa => 'ServicePort',
136             required => 0,
137             );
138              
139             my $rule = Data::Validator->new(
140             code => { isa => 'Proclet::Service', coerce => 1 },
141             worker => { isa => 'ServiceProcs', default => 1 },
142             tag => { isa => 'Str', optional => 1 },
143             every => { isa => 'Proclet::Scheduler', coerce => 1, optional => 1 },
144             )->with('Method');
145              
146             our @COLORS = qw/green magenta blue yellow cyan/;
147              
148             sub service {
149 7     7 1 2274 my($self, $args) = $rule->validate(@_);
150 7   100     532 $self->{service_num} ||= 0;
151 7         12 $self->{service_num}++;
152 7   100     170 $self->{tags} ||= {};
153 7 50 33     383 my $tag = ( exists $args->{tag} && defined $args->{tag} ) ? $args->{tag} : $self->{service_num};
154 7 50       115 die "tag: $tag is already exists" if exists $self->{tags}->{$tag};
155 7         67 $self->{tags}->{$tag} = 1;
156 7         21 my $port = 0;
157 7 50       93 if ( $self->{_base_port} ) {
158 0         0 $port = $self->{_base_port};
159 0         0 $self->{_base_port} += 100;
160             }
161 7 50 33     110 my $cron = ( exists $args->{every} && defined $args->{every} ) ? $args->{every} : '';
162 7         17 push @{$self->_services}, {
  7         135  
163             code => $args->{code},
164             worker => $args->{worker},
165             tag => $tag,
166             start_port => $port,
167             color => $COLORS[ $self->{service_num} % @COLORS ],
168             cron => $cron
169             };
170             }
171              
172             my $LOGGER = '__log__';
173              
174             sub run {
175 4     4 1 48 my $self = shift;
176              
177 4         25 my %services;
178             my @services;
179 4         11 for my $service ( @{$self->_services} ) {
  4         31  
180 7         21 my $worker = $service->{worker};
181 7         49 for ( my $i = 1; $i <= $worker; $i++ ) {
182 16         92 my $sid = $service->{tag} . '.' . $i;
183 16         417 $services{$sid} = { %$service };
184 16         62 my $port = $services{$sid}{start_port} + $i - 1;
185 16         60 my $code_generator = $services{$sid}{code}{generator};
186 16         111 my $code = $code_generator->($self->exec_notice, $port, $service->{tag});
187 16 50       78 if ( $services{$sid}{cron} ) {
188 0         0 $code = $self->cron_worker($code,$services{$sid}{cron});
189             }
190 16         33 $services{$sid}{code} = $code;
191 16 100       85 if ( $self->enable_log_worker ) {
192 15         36 $services{$sid}->{pipe} = $self->create_pipe;
193             };
194 16         127 push @services, $sid;
195             }
196             }
197 4 50       18 croak('no services exists') if ! @services;
198              
199 4 100       29 if ( $self->enable_log_worker ) {
200 3         30 $services{$LOGGER} = {
201             code => $self->log_worker(),
202             };
203 3         41 push @services, $LOGGER;
204             }
205              
206 4         25 my $next;
207             my %pid2service;
208 0         0 my %running;
209 0         0 my $wait_all_children;
210             my $pm = Parallel::Prefork->new({
211             spawn_interval => $self->spawn_interval,
212             err_respawn_interval => $self->err_respawn_interval,
213             max_workers => scalar @services,
214             trap_signals => {
215             TERM => 'TERM',
216             HUP => 'TERM',
217             INT => 'INT',
218             },
219             on_child_reap => sub {
220 12     12   5322304 my ( $pm, $exit_pid, $status ) = @_;
221 12         58 local $Log::Minimal::AUTODUMP = 1;
222 12 50       291 debugf "[Proclet] on child reap: exit_pid => %s status => %s, service => %s",
223             $exit_pid, $status, exists $pid2service{$exit_pid} ? $pid2service{$exit_pid} : 'undefined';
224 12 50       368 if ( exists $pid2service{$exit_pid} ) {
225 12         54 my $sid = $pid2service{$exit_pid};
226 12         69 delete $running{$sid};
227 12         130 delete $pid2service{$exit_pid};
228 12 50       63 if ( $wait_all_children ) {
229 12 100 66     244 if ( scalar keys %running == 1 && exists $running{$LOGGER} ) {
230 2         76 kill 'TERM', $running{$LOGGER};
231 2 100       1402367 sleep(1) && kill 'TERM', $running{$LOGGER}; #safe
232             }
233             }
234             }
235 12         223 debugf "[Proclet] on_child_reap: running => %s", \%running;
236             },
237              
238             before_fork => sub {
239 13     13   4008067 local $Log::Minimal::AUTODUMP = 1;
240 13         137 debugf "[Proclet] before_fork: running => %s", \%running;
241 13         328 my $pm = shift;
242 13 100 100     3294761 if ( $self->enable_log_worker && ! exists $running{$LOGGER} ) {
243 3         7 $next = $LOGGER;
244             }
245             else {
246 10         112 for my $sid ( @services ) {
247 26 100       386 if ( ! exists $running{$sid} ) {
248 10         66 $next = $sid;
249 10         52 last;
250             }
251             }
252             }
253 13         66 debugf "[Proclet] before_fork: next => %s", $next;
254             },
255             after_fork => sub {
256 12     12   455494 my ($pm, $pid) = @_;
257 12         319 local $Log::Minimal::AUTODUMP = 1;
258 12 50       94 if ( defined $next ) {
259 12         404 debugf "[Proclet] child start: sid =>%s", $next;
260 12         1106 $pid2service{$pid} = $next;
261 12         263 $running{$next} = $pid;
262             }
263             else {
264 0         0 debugf "[Proclet] child start but next is undefined";
265             }
266 12         83 $next = undef;
267             },
268 4         523 });
269              
270 4         1741 while ($pm->signal_received !~ m!^(?:TERM|INT)$! ) {
271             $pm->start( sub {
272 1 50   1   3874 if ( defined $next ) {
273 1         80 my $service = delete $services{$next};
274 1 50       55 if ( ! $self->enable_log_worker ) {
    50          
275 0         0 $service->{code}->();
276             }
277             elsif ( $service->{pipe} ) {
278 0         0 undef %services;
279 0         0 my $logwh = $service->{pipe}->[1];
280 0         0 close $service->{pipe}->[0];
281 0 0       0 open STDOUT, '>&', $logwh
282             or die "Died: failed to redirect STDOUT";
283 0 0       0 open STDERR, '>&', $logwh
284             or die "Died: failed to redirect STDERR";
285 0         0 $service->{code}->();
286             }
287             else {
288             # logworker
289 1         39 $service->{code}->(\%services);
290             }
291             }
292             else {
293 0         0 local $Log::Minimal::AUTODUMP = 1;
294 0         0 debugf "[Proclet] child (pid=>%s) start but next is undefined",$$;
295             }
296 4         150 });
297             }
298 3         15817769 $wait_all_children = 1;
299 3         73 $pm->wait_all_children();
300             }
301              
302             sub create_pipe {
303 15     15 0 40 my $self = shift;
304 15 50       1316 pipe my $logrh, my $logwh
305             or die "Died: failed to create pipe:$!";
306 15         104 return [$logrh, $logwh];
307             }
308              
309             sub log_worker {
310 3     3 0 27 my $self = shift;
311             sub {
312 1     1   23 local $Log::Minimal::AUTODUMP = 1;
313 1         7 my $services = shift;
314 1         8 my %fileno2sid;
315 1         222 my $s = IO::Select->new();
316 1         60 debugf "[Proclet] start log worker";
317 1         46 my $maxlen = 0;
318 1         29 for my $sid ( keys %$services ) {
319 5         94 close $services->{$sid}->{pipe}->[1];
320 5         15 my $rh = $services->{$sid}->{pipe}->[0];
321 5         85 $fileno2sid{fileno($rh)} = $sid;
322 5         52 $s->add($rh);
323 5 100       404 $maxlen = length($sid) if length($sid) > $maxlen;
324             }
325 1 50       13 $maxlen = 10 if $maxlen < 10;
326 1         6 my $loop = 0;
327             local $SIG{TERM} = $SIG{INT} = sub {
328 2         696949 $loop++;
329 1         180 };
330 1         7 while ( $loop < 2 ) {
331 18         1240 my @ready = $s->can_read(1);
332 18         13211287 foreach my $fh ( @ready ) {
333 6         600 my $sid = $fileno2sid{fileno($fh)};
334 6         428 my @lt = localtime;
335 6         118 sysread($fh, my $buf, 65536);
336 6         135 for my $log ( split /\r?\n/, $buf ) {
337 10         503 my $prefix = sprintf('%02d:%02d:%02d %-'.$maxlen.'s |',$lt[2],$lt[1],$lt[0], $sid);
338 10 50       64 $prefix = colored( $prefix, $services->{$sid}->{color} ) if $self->color;
339 10         23 chomp $log;
340 10         25 chomp $log;
341 10 50       767436 if ( $self->logger ) {
342 10         114 $self->logger->($prefix . ' ' . $log . "\n");
343             } else {
344 0         0 warn $prefix . ' ' . $log . "\n";
345             }
346             }
347             }
348             }
349 1         20 debugf "[Proclet] finished log worker";
350 3         107 };
351             }
352              
353             sub current_min {
354 0     0 0   my $time = time;
355 0           $time = $time - ($time % 60);
356 0           $time;
357             }
358              
359             sub cron_worker {
360 0     0 0   my ($self,$code,$cron) = @_;
361             sub {
362 0     0     debugf "[Proclet] start cron worker";
363 0           my $live = 1;
364 0           local $SIG{TERM} = sub { $live = 0 };
  0            
365 0           local $SIG{CHLD} = 'IGNORE';
366 0           my $prev = current_min();
367 0           select undef, undef, undef, 1; ## no critic;
368 0           while ( $live ) {
369 0           my $now;
370 0           while ( $live ) {
371 0           $now = current_min();
372 0 0         last if $now != $prev;
373 0           select undef, undef, undef, 1; ## no critic;
374             }
375 0 0         last unless $live;
376 0           $prev = $now;
377 0           debugf "[Proclet] check cron";
378 0 0         next unless $cron->($now);
379 0           debugf "[Proclet] cron match and start child worker!";
380 0           my $pid = fork;
381 0 0         if ( ! defined $pid ) {
    0          
382 0           die "Died: fork failed: $!";
383             }
384             elsif ( $pid == 0 ) {
385             #child
386 0           local $SIG{TERM} = 'DEFAULT';
387 0           local $SIG{CHLD} = 'DEFAULT';
388 0           $code->();
389 0           exit 0;
390             }
391             #parent
392             }
393 0           };
394             }
395              
396             __PACKAGE__->meta->make_immutable();
397             1;
398             __END__