File Coverage

blib/lib/App/derived.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package App::derived;
2              
3 1     1   23994 use strict;
  1         2  
  1         34  
4 1     1   5 use warnings;
  1         1  
  1         24  
5 1     1   25 use 5.008005;
  1         7  
  1         49  
6 1     1   1278 use File::Temp qw/tempfile/;
  1         29949  
  1         87  
7 1     1   1044 use File::Copy;
  1         2755  
  1         67  
8 1     1   557 use Proclet;
  0            
  0            
9             use JSON ();
10             use Log::Minimal;
11              
12             our $VERSION = '0.10';
13              
14             my $_JSON = JSON->new()
15             ->utf8(1)
16             ->shrink(1)
17             ->space_before(0)
18             ->space_after(0)
19             ->indent(0);
20              
21             sub new {
22             my $class = shift;
23             my %opt = ref $_[0] ? %{$_[0]} : @_;
24             my %args = (
25             proclet => Proclet->new(enable_log_worker => ($ENV{LM_DEBUG} ? 1 : 0)),
26             interval => 10,
27             host => 0,
28             port => 12306,
29             timeout => 10,
30             services => {},
31             %opt
32             );
33             bless \%args, $class;
34             }
35              
36             sub add_service {
37             my $self = shift;
38             my ($key, $cmd) = @_;
39             my ($tmpfh,$tmpfile) = tempfile(UNLINK=>0, EXLOCK=>0);
40             print $tmpfh $_JSON->encode({
41             status=>"INIT",
42             persec => '0E0',
43             latest => '0E0',
44             });
45             close $tmpfh;
46             $self->{services}->{$key} = {
47             cmd => ['bash', '-c', $cmd],
48             file => $tmpfile,
49             prev => undef,
50             };
51             infof("register service: %s", $key);
52             $self->{proclet}->service(
53             code => sub {
54             $0 = "$0 worker $key";
55             $self->worker($key);
56             exit;
57             },
58             tag => $key.'_worker',
59             );
60             }
61              
62             sub add_plugin {
63             my $self = shift;
64             my ( $plugin, $args) = @_;
65              
66             my %args = (
67             %$args,
68             _services => $self->{services},
69             _proclet => $self->{proclet},
70             );
71             infof("register plugin: %s", $plugin);
72             my $instance = $plugin->new(\%args);
73             $instance->init();
74             }
75              
76             sub run {
77             my $self = shift;
78             $self->{proclet}->run;
79             }
80              
81             sub DESTROY {
82             my $self = shift;
83             for my $key ( keys %{$self->{services}} ) {
84             unlink $self->{services}->{$key}->{file};
85             }
86             }
87              
88             sub worker {
89             my ($self, $service_key) = @_;
90             srand();
91             my $service = $self->{services}->{$service_key};
92             my $n = time;
93             $n = $n - ( $n % $self->{interval}) + $self->{interval} + int(rand($self->{interval}));; #next + random
94             my $stop = 1;
95             local $SIG{TERM} = sub { $stop = 0 };
96              
97             while ( $stop ) {
98             my $current = time();
99             while ( $n < $current ) {
100             $n = $n + $self->{interval};
101             }
102             while ( $stop ) {
103             last if time() >= $n;
104             select undef, undef, undef, 0.1 ## no critic;
105             }
106             $n = $n + $self->{interval};
107             local $Log::Minimal::AUTODUMP = 1;
108             debugf("exec command for %s => %s", $service_key, $service);
109             my ($result, $exit_code) = cap_cmd($service->{cmd});
110             debugf("command [%s]: exit_code:%s result:%s", $service_key, $exit_code, $result);
111             if ( ! defined $result ) {
112             atomic_write($service->{file}, {
113             status => "ERROR",
114             persec => undef,
115             latest => undef,
116             raw => undef,
117             exit_code => $exit_code,
118             last_update => time,
119             });
120             next;
121             }
122            
123             my $orig = $result;
124             $result =~ s!^[^0-9]+!!;
125             {
126             no warnings;
127             $result = int($result);
128             }
129             if ( ! defined $service->{prev} ) {
130             $service->{prev} = $result;
131             atomic_write( $service->{file}, {
132             status => "CALCURATE",
133             persec => "E0E",
134             latest => $result,
135             raw => $orig,
136             exit_code => $exit_code,
137             last_update => time,
138             });
139             next;
140             }
141             my $derive = ($result - $service->{prev}) / $self->{interval};
142             atomic_write( $service->{file}, {
143             status => "OK",
144             persec => $derive,
145             latest => $result,
146             raw => $orig,
147             exit_code => $exit_code,
148             last_update => time,
149             });
150             $service->{prev} = $result;
151             }
152             }
153              
154             sub cap_cmd {
155             my ($cmdref) = @_;
156             pipe my $logrh, my $logwh
157             or die "Died: failed to create pipe:$!";
158             my $pid = fork;
159             if ( ! defined $pid ) {
160             die "Died: fork failed: $!";
161             }
162              
163             elsif ( $pid == 0 ) {
164             #child
165             close $logrh;
166             open STDOUT, '>&', $logwh
167             or die "Died: failed to redirect STDOUT";
168             close $logwh;
169             exec @$cmdref;
170             die "Died: exec failed: $!";
171             }
172             close $logwh;
173             my $result;
174             while(<$logrh>){
175             chomp;chomp;
176             $result .= $_;
177             }
178             close $logrh;
179             while (wait == -1) {}
180             my $exit_code = $?;
181             $exit_code = $exit_code >> 8;
182             if ( $exit_code != 0 ) {
183             warnf("Error: command exited with code: $exit_code");
184             }
185             return ($result, $exit_code);
186             }
187              
188             sub atomic_write {
189             my ($writefile, $body) = @_;
190             my ($tmpfh,$tmpfile) = tempfile(UNLINK=>0);
191             print $tmpfh $_JSON->encode($body);
192             close($tmpfh);
193             move( $tmpfile, $writefile);
194             }
195              
196              
197             1;
198             __END__