File Coverage

blib/lib/Parallel/Benchmark.pm
Criterion Covered Total %
statement 126 132 95.4
branch 13 16 81.2
condition 2 3 66.6
subroutine 25 28 89.2
pod 2 2 100.0
total 168 181 92.8


line stmt bran cond sub pod time code
1             package Parallel::Benchmark;
2 21     21   2777960 use strict;
  21         51  
  21         874  
3 21     21   115 use warnings;
  21         180  
  21         2318  
4             our $VERSION = '0.11';
5              
6 21     21   12466 use Mouse;
  21         778035  
  21         6714  
7 21     21   60082 use Log::Minimal;
  21         892678  
  21         196  
8 21     21   3959 use Time::HiRes qw/ tv_interval gettimeofday /;
  21         55  
  21         218  
9 21     21   15763 use Parallel::ForkManager 1.12;
  21         2208697  
  21         1140  
10 21     21   10443 use Parallel::Scoreboard;
  21         76871  
  21         815  
11 21     21   162 use File::Temp qw/ tempdir /;
  21         54  
  21         1726  
12 21     21   129 use POSIX qw/ SIGUSR1 SIGUSR2 SIGTERM /;
  21         36  
  21         162  
13 21     21   12757 use Try::Tiny;
  21         31093  
  21         1512  
14 21     21   144 use Scalar::Util qw/ blessed /;
  21         38  
  21         31435  
15              
16             has benchmark => (
17             is => "rw",
18             isa => "CodeRef",
19             default => sub { sub { return 1 } },
20             );
21              
22             has setup => (
23             is => "rw",
24             isa => "CodeRef",
25             default => sub { sub { } },
26             );
27              
28             has teardown => (
29             is => "rw",
30             isa => "CodeRef",
31             default => sub { sub { } },
32             );
33              
34             has time => (
35             is => "rw",
36             isa => "Int",
37             default => 3,
38             );
39              
40             has concurrency => (
41             is => "rw",
42             isa => "Int",
43             default => 1,
44             );
45              
46             has debug => (
47             is => "rw",
48             isa => "Bool",
49             default => 0,
50             trigger => sub {
51             my ($self, $val) = @_;
52             $ENV{LM_DEBUG} = $val;
53             },
54             );
55              
56             has stash => (
57             is => "rw",
58             isa => "HashRef",
59             default => sub { +{} },
60             );
61              
62             has scoreboard => (
63             is => "rw",
64             default => sub {
65             my $dir = tempdir( CLEANUP => 1 );
66             Parallel::Scoreboard->new( base_dir => $dir );
67             },
68             );
69              
70             sub run {
71 20     20 1 224 my $self = shift;
72              
73 20 50       345 local $Log::Minimal::COLOR = 1
74             if -t *STDERR; ## no critic
75             local $Log::Minimal::PRINT = sub {
76 81     81   39293 my ( $time, $type, $message, $trace) = @_;
77 81         17458 warn "$time [$$] [$type] $message\n";
78 20         258 };
79              
80 20         268 infof "starting benchmark: concurrency: %d, time: %d",
81             $self->concurrency, $self->time;
82              
83 20         436 my $pm = Parallel::ForkManager->new( $self->concurrency );
84 20         108993 $pm->set_waitpid_blocking_sleep(0); # true blocking calls enabled
85 20         115 my $result = {
86             score => 0,
87             elapsed => 0,
88             stashes => {},
89             };
90             $pm->run_on_finish(
91             sub {
92 15     15   3346125 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
93 15 50       128 if (defined $data) {
94 15         78 $result->{score} += $data->[1];
95 15         76 $result->{elapsed} += $data->[2];
96 15         259 $result->{stashes}->{ $data->[0] } = $data->[3];
97             }
98             }
99 20         241 );
100 20         226 my $pids = {};
101             local $SIG{INT} = $SIG{TERM} = sub {
102 0     0   0 infof "terminating benchmark processes...";
103 0         0 kill SIGTERM, keys %$pids;
104 0         0 $pm->wait_all_children;
105 0         0 exit;
106 20         704 };
107              
108             CHILD:
109 20         200 for my $n ( 1 .. $self->concurrency ) {
110 46         534 my $pid = $pm->start;
111 46 100       185382 if ($pid) {
112             # parent
113 31         918 $pids->{$pid} = 1;
114 31         752 next CHILD;
115             }
116             else {
117             # child
118 15     0   8369 local $SIG{INT} = $SIG{TERM} = sub { exit };
  0         0  
119 15         1464 debugf "spwan child process[%d]", $n;
120 15         698 my $r = $self->_run_on_child($n);
121 15         763 $pm->finish(0, $r);
122 0         0 exit;
123             }
124             }
125              
126 5         644 $self->_wait_for_finish_setup($pids);
127              
128 5         443 kill SIGUSR1, keys %$pids;
129 5         92 my $start = [gettimeofday];
130             try {
131             my $teardown = sub {
132 4         9664375 alarm 0;
133 4         247 kill SIGUSR2, keys %$pids;
134 4         209 $pm->wait_all_children;
135 4         242 die;
136 5     5   465 };
137 5         130 local $SIG{INT} = $teardown;
138 5         111 local $SIG{ALRM} = $teardown;
139 5         172 alarm $self->time;
140 5         315 $pm->wait_all_children;
141 1         65 alarm 0;
142 5         174 };
143              
144 5         240 $result->{elapsed} = tv_interval($start);
145              
146             infof "done benchmark: score %s, elapsed %.3f sec = %.3f / sec",
147             $result->{score},
148             $result->{elapsed},
149             $result->{score} / $result->{elapsed},
150 5         220 ;
151 5         261 $result;
152             }
153              
154             sub _run_on_child {
155 15     15   288 my $self = shift;
156 15         219 my $n = shift;
157              
158 15         154 my $r = [ $n, 0, 0, {} ];
159             try {
160 15     15   10829 $self->scoreboard->update("setup_start");
161 15         36405 $self->setup->( $self, $n );
162 14         4003607 $self->scoreboard->update("setup_done");
163 14         1582 $r = $self->_run_benchmark_on_child($n);
164 13         651 $self->teardown->( $self, $n );
165             }
166             catch {
167 3     3   2000316 my $e = $_;
168 3         85 critf "benchmark process[%d] died: %s", $n, $e;
169 15         4529 };
170 15         548 return $r;
171             }
172              
173             sub _wait_for_finish_setup {
174 5     5   181 my $self = shift;
175 5         115 my $pids = shift;
176 5         84 while (1) {
177 9         9003676 sleep 1;
178 9         304 debugf "waiting for all children finish setup()";
179 9         326 my $stats = $self->scoreboard->read_all();
180 9         9227 my $done = 0;
181 9         165 for my $pid (keys %$pids) {
182 29 100       238 if (my $s = $stats->{$pid}) {
    50          
183 28 100       180 $done++ if $s eq "setup_done";
184             }
185             elsif ( kill(0, $pid) == 1 ) {
186             # maybe died...
187 1         6 delete $pids->{$pid};
188             }
189             }
190 9 100       132 last if $done == keys %$pids;
191             }
192             }
193              
194             sub _run_benchmark_on_child {
195 14     14   1625 my $self = shift;
196 14         86 my $n = shift;
197              
198 14         105 my ($wait, $run) = (1, 1);
199 14     14   730 local $SIG{USR1} = sub { $wait = 0 };
  14         323  
200 14     11   5088 local $SIG{USR2} = sub { $run = 0 };
  11         31029128  
201 14     0   960 local $SIG{INT} = sub {};
202              
203 14         21929315 sleep 1 while $wait;
204              
205 14         524 debugf "starting benchmark process[%d]", $n;
206              
207 14         710 my $benchmark = $self->benchmark;
208 14         537 my $score = 0;
209 14         356 my $start = [gettimeofday];
210              
211             try {
212 14     14   1986 $score += $benchmark->( $self, $n ) while $run;
213             }
214             catch {
215 3     3   91 my $e = $_;
216 3         18 my $class = blessed $e;
217 3 100 66     119 if ( $class && $class eq __PACKAGE__ . "::HaltedException" ) {
218 2         47 infof "benchmark process[%d] halted: %s", $n, $$e;
219             }
220             else {
221 1         54 die $e;
222             }
223 14         971 };
224              
225 13         2927 my $elapsed = tv_interval($start);
226              
227 13         674 debugf "done benchmark process[%d]: score %s, elapsed %.3f sec.",
228             $n, $score, $elapsed;
229              
230 13         791 return [ $n, $score, $elapsed, $self->stash ];
231             }
232              
233             sub halt {
234 2     2 1 153 my $self = shift;
235 2         35 my $msg = shift;
236 2         132 die bless \$msg, __PACKAGE__ . "::HaltedException";
237             }
238              
239             1;
240             __END__