File Coverage

blib/lib/Parallel/ForkManager/Scaled.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Parallel::ForkManager::Scaled;
2 8     8   216831 use Moo;
  8         38571  
  8         43  
3 8     8   8575 use namespace::clean;
  8         65061  
  8         26  
4 8     8   2314 use Unix::Statgrab;
  0            
  0            
5             use List::Util qw( min max );
6             use Storable qw( freeze thaw );
7              
8             use 5.010;
9              
our $VERSION = '0.17';

extends 'Parallel::ForkManager';

#
# Hard limits on the number of processes. Both are built on first use:
# the minimum defaults to 1 and the maximum to twice the CPU count
# (see the _build_hard_*_procs builders).
#
has [qw( hard_min_procs hard_max_procs )] => (
    is      => 'rw',
    lazy    => 1,
    builder => 1,
);

#
# Soft limits, the range max_procs is actually constrained to. They
# default to the hard limits and their triggers keep them from moving
# outside the hard limits.
#
has [qw( soft_min_procs soft_max_procs )] => (
    is      => 'rw',
    lazy    => 1,
    builder => 1,
    trigger => 1,
);

# Starting value for max_procs, applied (after clamping) in BUILD().
has initial_procs => (
    is => 'lazy',
);

# Minimum value of (time - last_update) before start() re-evaluates
# the process count.
has update_frequency => (
    is      => 'rw',
    default => 1,
);

# Desired idle percentage and the tolerance band around it; start()
# compares the measured idle value against target -/+ threshold.
has idle_target => (
    is      => 'rw',
    default => 0,
);
has idle_threshold => (
    is      => 'rw',
    default => 1,
);

# Optional hook, invoked from start() as $hook->($self, $prev_max_procs)
# when it holds a CODE reference.
has run_on_update => (
    is        => 'rw',
    clearer   => 1,
    predicate => 1,
);

# Timestamp of the last successful statistics refresh.
has last_update => (
    is      => 'rwp',
    default => sub { time },
);

#
# Unix::Statgrab objects: the latest CPU percentages (delegates idle),
# the host information (delegates ncpus) and the raw CPU counters that
# the next percentage calculation is diffed against.
#
has _stats_pct => (
    is        => 'rw',
    clearer   => 1,
    predicate => 1,
    handles   => [qw( idle )],
);
has _host_info => (
    is        => 'rw',
    clearer   => 1,
    predicate => 1,
    lazy      => 1,
    builder   => 1,
    handles   => [qw( ncpus )],
);
has _last_stats => (
    is        => 'rw',
    clearer   => 1,
    predicate => 1,
    default   => sub { get_cpu_stats() },
);

# Names of the attributes that are stripped around Storable freeze/thaw.
has __unstorable => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { [qw( _stats_pct _host_info _last_stats )] },
);
30              
#
# Build the argument list handed to the (non-Moo) parent constructor.
#
# Accepts either a flat key/value list or a single hash reference.
# The parent always receives a placeholder process count of 1, which
# BUILD() replaces later, followed by the tempdir option when one was
# supplied.
#
# Once Parallel::ForkManager has converted to Moo (in development)
# this will no longer be necessary. Probably. :)
#
sub FOREIGNBUILDARGS {
    my $class = shift;

    my $opts = (scalar(@_) > 1) ? { @_ } : $_[0];

    # placeholder max_procs, will get changed later in BUILD()
    my @parent_args = (1);

    if (defined $opts->{tempdir}) {
        push @parent_args, $opts->{tempdir};
    }

    return @parent_args;
}
46              
#
# Post-construction setup: replace the placeholder process count with
# initial_procs clamped to the soft limits, then take a first reading
# of the CPU statistics.
#
sub BUILD {
    my ($self) = @_;

    my $ceiling = $self->soft_max_procs;
    my $floor   = $self->soft_min_procs;
    my $wanted  = $self->initial_procs;

    $self->set_max_procs(min($ceiling, max($floor, $wanted)));

    return $self->update_stats_pct;
}
52              
# Default hard minimum: a single process.
sub _build_hard_min_procs {
    return 1;
}

# Default hard maximum: twice the CPU count reported by ncpus, with an
# undefined count treated as 1.
sub _build_hard_max_procs {
    my ($self) = @_;
    my $cpus = $self->ncpus;
    return (defined $cpus ? $cpus : 1) * 2;
}

# The soft limits start out equal to the hard limits.
sub _build_soft_min_procs {
    my ($self) = @_;
    return $self->hard_min_procs;
}

sub _build_soft_max_procs {
    my ($self) = @_;
    return $self->hard_max_procs;
}

# Host information comes straight from Unix::Statgrab.
sub _build__host_info {
    return get_host_info();
}
58              
#
# Pick a value half way between our soft min and max.
#
# The half-range is added to soft_min_procs (not hard_min_procs) so the
# result always lies inside the soft range, even when the soft minimum
# has been raised above the hard minimum.
#
sub _build_initial_procs {
    my $self = shift;

    my $soft_min = $self->soft_min_procs;
    my $soft_max = $self->soft_max_procs;

    return $soft_min + int(($soft_max - $soft_min) / 2);
}
64              
#
# Trigger for soft_min_procs: a value below hard_min_procs is replaced
# by hard_min_procs.
#
sub _trigger_soft_min_procs {
    my ($self, $newval) = @_;

    my $floor = $self->hard_min_procs;

    if ($newval < $floor) {
        $self->soft_min_procs($floor);
    }
}
72              
#
# Trigger for soft_max_procs: a value above hard_max_procs is replaced
# by hard_max_procs.
#
sub _trigger_soft_max_procs {
    my ($self, $newval) = @_;

    my $ceiling = $self->hard_max_procs;

    if ($newval > $ceiling) {
        $self->soft_max_procs($ceiling);
    }
}
80              
#
# Take a new CPU sample and refresh the cached percentages.
#
# The new counters are diffed against _last_stats and converted to
# percentages. On success _stats_pct, _last_stats and last_update are
# all replaced. When the idle percentage comes back as NaN (not enough
# time has elapsed to get a difference) the sample is discarded, unless
# no percentages have been stored yet, in which case it is kept so that
# _stats_pct is set.
#
sub update_stats_pct {
    my $self = shift;

    my $stats = get_cpu_stats();
    my $pcts  = $stats->get_cpu_stats_diff($self->_last_stats)->get_cpu_percents;

    # NaN is the only value that is not numerically equal to itself.
    # Comparing the stringified value against 'NaN' is not portable:
    # older perls print whatever the C library produces ("nan", "-nan").
    my $idle = $pcts->idle;
    return if $self->_stats_pct && $idle != $idle;

    $self->_stats_pct($pcts);
    $self->_last_stats($stats);
    $self->_set_last_update(time);
}
95              
#
# Runs before every start(): re-evaluate max_procs from the current
# idle percentage, at most once per update_frequency.
#
# The idle value is compared against a band of idle_target -/+
# idle_threshold (limited to 0..100). At or above the band, with every
# process slot in use, max_procs is adjusted up; at or below the band
# it is adjusted down. Afterwards the run_on_update hook, if it is a
# CODE reference, is called with the object and the previous max_procs.
#
before start => sub {
    my ($self) = @_;

    my $since_update = time - $self->last_update;
    return if $since_update < $self->update_frequency;

    $self->update_stats_pct;

    my $target     = $self->idle_target;
    my $tolerance  = $self->idle_threshold;
    my $lower_edge = max(0,   $target - $tolerance);
    my $upper_edge = min(100, $target + $tolerance);

    #
    # idle may be NaN when too little time separates the first call to
    # update_stats_pct from this one. Both comparisons are then false
    # and max_procs is left alone.
    #
    my $idle = $self->idle;
    my $proposed;

    if ($idle >= $upper_edge && $self->running_procs >= $self->max_procs) {
        $proposed = $self->adjust_up;
    }
    elsif ($idle <= $lower_edge) {
        $proposed = $self->adjust_down;
    }

    # Captured before the change so the hook can see the old value.
    my $previous = $self->max_procs;

    if ($proposed) {
        $self->set_max_procs($proposed);
    }

    my $hook = $self->run_on_update;
    if ($hook && ref($hook) eq 'CODE') {
        $hook->($self, $previous);
    }
};
130              
#
# Wrap set_max_procs so the requested value is always forced into the
# range soft_min_procs .. soft_max_procs before the parent sees it.
#
around set_max_procs => sub {
    my ($orig, $self, $new_val) = @_;

    my $ceiling = $self->soft_max_procs;
    my $floor   = $self->soft_min_procs;
    my $clamped = min($ceiling, max($floor, $new_val));

    return $orig->($self, $clamped);
};
142              
#
# Return a one-line summary of the current state: idle percentage,
# number of running processes, previous and current max_procs, and the
# soft and hard limits.
#
# Takes an optional previous max_procs value; when it is undefined the
# current max_procs is reported in its place.
#
sub stats {
    my ($self, $prev_procs) = @_;
    $prev_procs //= $self->max_procs;

    my $format =
        "%5.1f id %3d run %3d omax %3d nmax %3d smin %3d smax %3d hmin %3d hmax";

    my @fields = (
        $self->idle,
        scalar($self->running_procs),
        $prev_procs,
        $self->max_procs,
        $self->soft_min_procs,
        $self->soft_max_procs,
        $self->hard_min_procs,
        $self->hard_max_procs,
    );

    return sprintf($format, @fields);
}
159              
#
# Write the stats() summary line to STDERR. Any arguments are passed
# through to stats().
#
sub dump_stats {
    my ($self, @args) = @_;

    my $line = $self->stats(@args);

    print STDERR "$line\n";
}
164              
#
# Raise soft_max_procs, never beyond hard_max_procs, and return the
# result of the setter call.
#
# The size of the step is the room left between max_procs and
# hard_max_procs, scaled by how far the idle percentage is above
# idle_target, with a minimum step of 1. The more idle the system is,
# the faster the soft maximum grows; close to the target it only
# creeps up.
#
sub adjust_soft_max {
    my $self = shift;

    my $ceiling  = $self->hard_max_procs;
    my $current  = $self->soft_max_procs;
    my $headroom = $ceiling - $self->max_procs;
    my $surplus  = $self->idle - $self->idle_target;

    my $step = max(1, int($headroom * $surplus / 100));

    return $self->soft_max_procs(min($ceiling, $current + $step));
}
187              
#
# Recompute soft_min_procs because the system is too busy, and return
# the result of the setter call.
#
# The new value is hard_min_procs plus the distance between max_procs
# and hard_min_procs scaled by how far the idle percentage is below
# idle_target. A negative offset counts as 0, so the result is never
# lower than hard_min_procs.
#
sub adjust_soft_min {
    my $self = shift;

    my $floor   = $self->hard_min_procs;
    my $span    = $self->max_procs - $floor;
    my $deficit = $self->idle_target - $self->idle;

    my $offset = max(0, int($span * $deficit / 100));

    return $self->soft_min_procs(max($floor, $floor + $offset));
}
204              
#
# Work out a larger max_procs and return it; the caller applies it.
#
# The new value lies half way between the current max_procs and the
# soft maximum, and is always at least one higher than the current
# value. When max_procs has already reached the soft maximum, the soft
# maximum is raised first via adjust_soft_max.
#
# The soft minimum is moved to the current max_procs: that many
# processes were not enough to reach the idle target, so we should not
# drop below it again (although we still can if we really need to).
#
sub adjust_up {
    my $self    = shift;
    my $current = $self->max_procs;

    my $ceiling;
    if ($current >= $self->soft_max_procs) {
        $ceiling = $self->adjust_soft_max;
    }
    else {
        $ceiling = $self->soft_max_procs;
    }

    $self->soft_min_procs($current);

    my $step = max(1, int(($ceiling - $current) / 2));

    return $current + $step;
}
225              
#
# Work out a smaller max_procs and return it; the caller applies it.
#
# The new value lies half way between the soft minimum and the current
# max_procs. When max_procs is already at or below the soft minimum,
# the soft minimum is recomputed first via adjust_soft_min. The soft
# maximum is moved to the current max_procs.
#
# Returns undef, without touching the soft maximum, when the current
# value is not above the minimum.
#
sub adjust_down {
    my $self    = shift;
    my $current = $self->max_procs;

    my $floor;
    if ($current <= $self->soft_min_procs) {
        $floor = $self->adjust_soft_min;
    }
    else {
        $floor = $self->soft_min_procs;
    }

    # Shouldn't happen, but test for it anyway
    if (!($current > $floor)) {
        return undef;
    }

    $self->soft_max_procs($current);

    return $floor + int(($current - $floor) / 2);
}
240              
241              
#
# Storable hook: serialise the object without the parts that cannot be
# stored.
#
# libstatgrab doesn't like freeze/thaw (saw assertion errors from
# vector.c), so the attributes that hold Unix::Statgrab objects (listed
# in __unstorable) are removed before freezing and put back afterwards.
#
# freeze/thaw can't handle CODE references either, so the run_on_update
# hook is removed and put back in the same way. There will still be
# problems with the underlying Parallel::ForkManager hooks but those
# are not dealt with here.
#
# Only attributes that are actually set are touched, so an attribute
# that was unset before the call is still unset afterwards and a lazy
# attribute is not built just to be thrown away. The object is restored
# even when freeze() fails; the error is then re-thrown.
#
# Returns the frozen image, or nothing when cloning or when called
# re-entrantly from the nested freeze() below (Storable then falls back
# to its default serialisation).
#
sub STORABLE_freeze {
    my ($self, $cloning) = @_;
    state $storing = 0;
    return if $cloning || $storing;

    my %saved;

    # The attribute names start with an underscore, so Moo names their
    # predicates and clearers _has_<name> and _clear_<name>.
    for my $attr (@{ $self->__unstorable }) {
        my $predicate = "_has$attr";
        next unless $self->$predicate;

        my $clearer = "_clear$attr";
        $saved{$attr} = $self->$attr;
        $self->$clearer;
    }

    if ($self->has_run_on_update) {
        $saved{run_on_update} = $self->run_on_update;
        $self->clear_run_on_update;
    }

    $storing = 1;
    my $frozen = eval { freeze($self) };
    my $error  = $@;
    $storing = 0;

    # Every saved key is also the name of a read/write accessor.
    $self->$_($saved{$_}) for keys %saved;

    die $error unless defined $frozen;

    return $frozen;
}
276              
#
# Storable hook: rebuild the object from the image produced by
# STORABLE_freeze.
#
# The Unix::Statgrab attributes (listed in __unstorable) were not
# stored, so they are cleared here rather than left holding whatever
# the image contains. The run_on_update hook is cleared as well; if
# one was set before freezing it is lost, and there is nothing to be
# done about that.
#
# Does nothing when cloning or when called re-entrantly. Dies when the
# image cannot be thawed into a reference.
#
sub STORABLE_thaw {
    my ($self, $cloning, $data) = @_;
    state $thawing = 0;

    return if $cloning || $thawing;

    # The guard is reset before any error is raised, so one bad image
    # cannot disable the hook for the rest of the process.
    $thawing = 1;
    my $restored = eval { thaw($data) };
    my $error    = $@;
    $thawing = 0;

    die($error || "STORABLE_thaw: unable to thaw object data\n")
        unless ref $restored;

    %$self = %$restored;

    # The attribute names start with an underscore, so Moo names their
    # clearers _clear_<name>.
    for my $attr (@{ $self->__unstorable }) {
        my $clearer = "_clear$attr";
        $self->$clearer;
    }

    # And this non-hidden code ref
    $self->clear_run_on_update;

    return;
}
301              
302             1;
303              
304             __END__