File Coverage

blib/lib/MCE/Util.pm
Criterion Covered Total %
statement 105 219 47.9
branch 38 166 22.8
condition 9 24 37.5
subroutine 17 23 73.9
pod 1 1 100.0
total 170 433 39.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Utility functions.
4             ##
5             ###############################################################################
6              
7             package MCE::Util;
8              
9 113     113   715 use strict;
  113         219  
  113         4246  
10 113     113   537 use warnings;
  113         177  
  113         7419  
11              
12 113     113   584 no warnings qw( threads recursion uninitialized numeric );
  113         221  
  113         7401  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17              
18 113     113   11315 use IO::Handle ();
  113         122442  
  113         2858  
19 113     113   10518 use Socket qw( AF_UNIX SOL_SOCKET SO_SNDBUF SO_RCVBUF );
  113         112649  
  113         10024  
20 113     113   654 use Time::HiRes qw( sleep time );
  113         163  
  113         738  
21 113     113   58806 use Errno ();
  113         207135  
  113         3878  
22 113     113   716 use base qw( Exporter );
  113         168  
  113         27303  
23              
24             my ($_is_winenv, $_zero_bytes, %_sock_ready);
25              
26             BEGIN {
27 113 50   113   1046 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
28 113         379850 $_zero_bytes = pack('L', 0);
29             }
30              
31             sub CLONE {
32 0     0   0 %_sock_ready = ();
33             }
34              
35             our $LF = "\012"; Internals::SvREADONLY($LF, 1);
36              
37             our @EXPORT_OK = qw( $LF get_ncpu );
38             our %EXPORT_TAGS = ( all => \@EXPORT_OK );
39              
40             ###############################################################################
41             ## ----------------------------------------------------------------------------
42             ## The get_ncpu subroutine, largely adopted from Test::Smoke::Util.pm,
43             ## returns the number of logical (online/active/enabled) CPU cores;
44             ## never smaller than one.
45             ##
46             ## A warning is emitted to STDERR when it cannot recognize the operating
47             ## system or the external command failed.
48             ##
49             ###############################################################################
50              
51             my $g_ncpu;
52              
53             sub get_ncpu {
54 75 100   75 1 385 return $g_ncpu if (defined $g_ncpu);
55              
56 70         1192 local $ENV{PATH} = "/usr/sbin:/sbin:/usr/bin:/bin:$ENV{PATH}";
57 70         465 $ENV{PATH} =~ /(.*)/; $ENV{PATH} = $1; ## Remove tainted'ness
  70         418  
58              
59 70         169 my $ncpu = 1;
60              
61             OS_CHECK: {
62 70         123 local $_ = lc $^O;
  70         335  
63              
64 70 50       689 /linux|android/ && do {
65 70         162 my ( $count, $fh );
66 70 50       6082 if ( open $fh, '<', '/proc/stat' ) {
    0          
67 70         8342 $count = grep { /^cpu\d/ } <$fh>;
  1470         3329  
68 70         1050 close $fh;
69             }
70             elsif ( open $fh, '<', '/proc/cpuinfo' ) {
71 0         0 $count = grep { /^processor/ } <$fh>;
  0         0  
72 0         0 close $fh;
73             }
74 70 50       354 $ncpu = $count if $count;
75 70         526 last OS_CHECK;
76             };
77              
78 0 0       0 /bsd|darwin|dragonfly/ && do {
79 0         0 chomp( my @output = `sysctl -n hw.ncpu 2>/dev/null` );
80 0 0       0 $ncpu = $output[0] if @output;
81 0         0 last OS_CHECK;
82             };
83              
84 0 0       0 /aix/ && do {
85 0         0 my @output = `lparstat -i 2>/dev/null | grep "^Online Virtual CPUs"`;
86 0 0       0 if ( @output ) {
87 0         0 $output[0] =~ /(\d+)\n$/;
88 0 0       0 $ncpu = $1 if $1;
89             }
90 0 0       0 if ( !$ncpu ) {
91 0         0 @output = `pmcycles -m 2>/dev/null`;
92 0 0       0 if ( @output ) {
93 0         0 $ncpu = scalar @output;
94             } else {
95 0         0 @output = `lsdev -Cc processor -S Available 2>/dev/null`;
96 0 0       0 $ncpu = scalar @output if @output;
97             }
98             }
99 0         0 last OS_CHECK;
100             };
101              
102 0 0       0 /gnu/ && do {
103 0         0 chomp( my @output = `nproc 2>/dev/null` );
104 0 0       0 $ncpu = $output[0] if @output;
105 0         0 last OS_CHECK;
106             };
107              
108 0 0       0 /haiku/ && do {
109 0         0 my @output = `sysinfo -cpu 2>/dev/null | grep "^CPU #"`;
110 0 0       0 $ncpu = scalar @output if @output;
111 0         0 last OS_CHECK;
112             };
113              
114 0 0       0 /hp-?ux/ && do {
115 0         0 my $count = grep { /^processor/ } `ioscan -fkC processor 2>/dev/null`;
  0         0  
116 0 0       0 $ncpu = $count if $count;
117 0         0 last OS_CHECK;
118             };
119              
120 0 0       0 /irix/ && do {
121 0         0 my @out = grep { /\s+processors?$/i } `hinv -c processor 2>/dev/null`;
  0         0  
122 0 0       0 $ncpu = (split ' ', $out[0])[0] if @out;
123 0         0 last OS_CHECK;
124             };
125              
126 0 0       0 /osf|solaris|sunos|svr5|sco/ && do {
127 0 0       0 if (-x '/usr/sbin/psrinfo') {
128 0         0 my $count = grep { /on-?line/ } `psrinfo 2>/dev/null`;
  0         0  
129 0 0       0 $ncpu = $count if $count;
130             }
131             else {
132 0         0 my @output = grep { /^NumCPU = \d+/ } `uname -X 2>/dev/null`;
  0         0  
133 0 0       0 $ncpu = (split ' ', $output[0])[2] if @output;
134             }
135 0         0 last OS_CHECK;
136             };
137              
138 0 0       0 /mswin|mingw|msys|cygwin/ && do {
139 0 0       0 if (exists $ENV{NUMBER_OF_PROCESSORS}) {
140 0         0 $ncpu = $ENV{NUMBER_OF_PROCESSORS};
141             }
142 0         0 last OS_CHECK;
143             };
144              
145 0         0 warn "MCE::Util::get_ncpu: command failed or unknown operating system\n";
146             }
147              
148 70 50 33     536 $ncpu = 1 if (!$ncpu || $ncpu < 1);
149              
150 70         646 return $g_ncpu = $ncpu;
151             }
152              
153             ###############################################################################
154             ## ----------------------------------------------------------------------------
155             ## Private methods for pipes and sockets.
156             ##
157             ###############################################################################
158              
159             sub _destroy_pipes {
160 211     211   1033 my ($_obj, @_params) = @_;
161 211         815 local ($!,$?); local $SIG{__DIE__};
  211         925  
162              
163 211         1448 for my $_p (@_params) {
164 422 50       21239 next unless (defined $_obj->{$_p});
165              
166 422 50       984 if (ref $_obj->{$_p} eq 'ARRAY') {
167 0         0 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  0         0  
168 0 0       0 next unless (defined $_obj->{$_p}[$_i]);
169 0 0       0 close $_obj->{$_p}[$_i] if (fileno $_obj->{$_p}[$_i]);
170 0         0 undef $_obj->{$_p}[$_i];
171             }
172             }
173             else {
174 422 50       5548 close $_obj->{$_p} if (fileno $_obj->{$_p});
175 422         2048 undef $_obj->{$_p};
176             }
177             }
178              
179 211         1142 return;
180             }
181              
182             sub _destroy_socks {
183 110     110   1855 my ($_obj, @_params) = @_;
184 110         1087 local ($!,$?,$@); local $SIG{__DIE__};
  110         5058  
185              
186 110         696 for my $_p (@_params) {
187 814 100       2968 next unless (defined $_obj->{$_p});
188              
189 642 100       3222 if (ref $_obj->{$_p} eq 'ARRAY') {
190 166         335 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  166         1121  
191 538 50       1416 next unless (defined $_obj->{$_p}[$_i]);
192 538 50       1428 if (fileno $_obj->{$_p}[$_i]) {
193 538 50       1135 syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
194 538         32020 eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) };
195 538         10267 close $_obj->{$_p}[$_i];
196             }
197 538         2762 undef $_obj->{$_p}[$_i];
198             }
199             }
200             else {
201 476 50       1542 if (fileno $_obj->{$_p}) {
202 476 50       1096 syswrite($_obj->{$_p}, '0') if $_is_winenv;
203 476         36311 eval q{ CORE::shutdown($_obj->{$_p}, 2) };
204 476         13418 close $_obj->{$_p};
205             }
206 476         2872 undef $_obj->{$_p};
207             }
208             }
209              
210 110         1112 return;
211             }
212              
213             sub _pipe_pair {
214 473     473   1264 my ($_obj, $_r_sock, $_w_sock, $_i) = @_;
215 473         1572 local $!;
216              
217 473 50       909 if (defined $_i) {
218             # remove tainted'ness
219 0         0 ($_i) = $_i =~ /(.*)/;
220 0 0       0 pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) or die "pipe: $!\n";
221 0         0 $_obj->{$_w_sock}[$_i]->autoflush(1);
222             }
223             else {
224 473 50       19282 pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) or die "pipe: $!\n";
225 473         3341 $_obj->{$_w_sock}->autoflush(1);
226             }
227              
228 473         22446 return;
229             }
230              
231             sub _sock_pair {
232 1111     1111   3428 my ($_obj, $_r_sock, $_w_sock, $_i, $_seq) = @_;
233 1111         1491 my $_size = 16384; local ($!, $@);
  1111         2808  
234              
235 1111 100       2816 if (defined $_i) {
236             # remove tainted'ness
237 587         2816 ($_i) = $_i =~ /(.*)/;
238              
239 587 100 66     26964 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
240             socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
241 447 50       25774 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
242 0 0       0 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
243             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
244             };
245             }
246             else {
247 140 50       7345 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
248             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
249             }
250              
251 587 50       5701 if ($^O !~ /aix|linux|android/) {
252 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
253 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
254 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
255 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
256             }
257              
258 587         2776 $_obj->{$_r_sock}[$_i]->autoflush(1);
259 587         21965 $_obj->{$_w_sock}[$_i]->autoflush(1);
260             }
261             else {
262 524 100 66     31115 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
263             socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
264 501 50       28505 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
265 0 0       0 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
266             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
267             };
268             }
269             else {
270 23 50       1666 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
271             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
272             }
273              
274 524 50       5602 if ($^O !~ /aix|linux|android/) {
275 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
276 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
277 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
278 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
279             }
280              
281 524         2885 $_obj->{$_r_sock}->autoflush(1);
282 524         19628 $_obj->{$_w_sock}->autoflush(1);
283             }
284              
285 1111         30886 return;
286             }
287              
288             sub _sock_ready {
289 0     0   0 my ($_socket, $_timeout) = @_;
290 0 0 0     0 return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1;
291              
292 0         0 my ($_val_bytes, $_delay, $_start) = (pack('L', 0), 0, time);
293              
294 0 0       0 if (!defined $_timeout) {
295 0         0 $_sock_ready{"$_socket"}++;
296             }
297             else {
298 0 0       0 $_timeout = undef if $_timeout < 0;
299 0 0       0 $_timeout += $_start if $_timeout;
300             }
301              
302 0         0 while (1) {
303             # MSWin32 FIONREAD - from winsock2.h macro
304 0         0 ioctl($_socket, 0x4004667f, $_val_bytes);
305              
306 0 0       0 return '' if $_val_bytes ne $_zero_bytes;
307 0 0 0     0 return 1 if $_timeout && time > $_timeout;
308              
309             # delay after a while to not consume a CPU core
310 0 0       0 sleep(0.015), next if $_delay;
311 0 0       0 $_delay = 1 if time - $_start > 0.030;
312             }
313             }
314              
315             sub _sock_ready_w {
316 0     0   0 my ($_socket) = @_;
317 0 0       0 return if $_sock_ready{"${_socket}_w"} > 1;
318              
319 0         0 my $_vec = '';
320 0         0 $_sock_ready{"${_socket}_w"}++;
321              
322 0         0 while (1) {
323 0         0 vec($_vec, fileno($_socket), 1) = 1;
324 0 0       0 return if select(undef, $_vec, undef, 0) > 0;
325 0         0 sleep 0.045;
326             }
327              
328 0         0 return;
329             }
330              
331             sub _sysread {
332             ( @_ == 3
333             ? CORE::sysread($_[0], $_[1], $_[2])
334             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
335             )
336 1321 50   1321   1214216 or do {
    50          
337 0 0       0 goto \&_sysread if ($! == Errno::EINTR());
338             };
339             }
340              
341             sub _sysread2 {
342 0     0   0 my ($_bytes, $_delay, $_start);
343             # called by MCE/Core/Manager.pm
344              
345             SYSREAD: $_bytes = ( @_ == 3
346             ? CORE::sysread($_[0], $_[1], $_[2])
347             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
348             )
349 0 0       0 or do {
    0          
350 0 0       0 unless ( defined $_bytes ) {
351 0 0       0 goto SYSREAD if ($! == Errno::EINTR());
352              
353             # non-blocking operation could not be completed
354 0 0 0     0 if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) {
355 0 0       0 sleep(0.015), goto SYSREAD if $_delay;
356              
357             # delay after a while to not consume a CPU core
358 0 0       0 $_start = time unless $_start;
359 0 0       0 $_delay = 1 if time - $_start > 0.030;
360              
361 0         0 goto SYSREAD;
362             }
363             }
364             };
365              
366 0         0 return $_bytes;
367             }
368              
369             sub _nonblocking {
370 2216 50   2216   8233 if ($^O eq 'MSWin32') {
371             # MSWin32 FIONBIO - from winsock2.h macro
372 0 0       0 my $nonblocking = $_[1] ? pack('L', 1) : pack('L', 0);
373 0         0 ioctl($_[0], 0x8004667e, $nonblocking);
374             }
375             else {
376 2216 100       20955 $_[0]->blocking( $_[1] ? 0 : 1 );
377             }
378              
379 2216         4542 return;
380             }
381              
382             ###############################################################################
383             ## ----------------------------------------------------------------------------
384             ## Private methods, providing high-resolution time, for MCE->yield,
385             ## MCE::Child->yield, and MCE::Hobo->yield.
386             ##
387             ###############################################################################
388              
389             ## Use monotonic clock if available.
390              
391 113         377 use constant CLOCK_MONOTONIC => eval {
392 113         1491 Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() );
393 113         43897 1;
394 113     113   1081 };
  113         1156  
395              
396             sub _sleep {
397 0     0     my ( $seconds ) = @_;
398 0 0         return if ( $seconds < 0 );
399              
400 0 0         if ( $INC{'Coro/AnyEvent.pm'} ) {
    0          
    0          
401 0           Coro::AnyEvent::sleep( $seconds );
402             }
403             elsif ( &Time::HiRes::d_nanosleep ) {
404 0           Time::HiRes::nanosleep( $seconds * 1e9 );
405             }
406             elsif ( &Time::HiRes::d_usleep ) {
407 0           Time::HiRes::usleep( $seconds * 1e6 );
408             }
409             else {
410 0           Time::HiRes::sleep( $seconds );
411             }
412              
413 0           return;
414             }
415              
416             sub _time {
417 0     0     return ( CLOCK_MONOTONIC )
418             ? Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() )
419             : Time::HiRes::time();
420             }
421              
422             1;
423              
424             __END__
425              
426             ###############################################################################
427             ## ----------------------------------------------------------------------------
428             ## Module usage.
429             ##
430             ###############################################################################
431              
432             =head1 NAME
433              
434             MCE::Util - Utility functions
435              
436             =head1 VERSION
437              
438             This document describes MCE::Util version 1.902
439              
440             =head1 SYNOPSIS
441              
442             use MCE::Util;
443              
444             =head1 DESCRIPTION
445              
446             A utility module for MCE. Nothing is exported by default. Exportable is
447             get_ncpu.
448              
449             =head2 get_ncpu()
450              
451             Returns the number of logical (online/active/enabled) CPU cores; never smaller
452             than one.
453              
454             my $ncpu = MCE::Util::get_ncpu();
455              
456             Specifying 'auto' for max_workers calls MCE::Util::get_ncpu automatically.
457             MCE 1.521 sets an upper-limit when specifying 'auto'. The reason is mainly
458             to safeguard apps from spawning 100 workers on a box having 100 cores.
459             This is important for apps which are IO-bound.
460              
461             use MCE;
462              
463             ## 'Auto' is the total # of logical cores (lcores) (8 maximum, MCE 1.521).
464             ## The computed value will not exceed the # of logical cores on the box.
465              
466             my $mce = MCE->new(
467              
468             max_workers => 'auto', ## 1 on HW with 1-lcores; 2 on 2-lcores
469             max_workers => 16, ## 16 on HW with 4-lcores; 16 on 32-lcores
470              
471             max_workers => 'auto', ## 4 on HW with 4-lcores; 8 on 16-lcores
472             max_workers => 'auto*1.5', ## 4 on HW with 4-lcores; 12 on 16-lcores
473             max_workers => 'auto*2.0', ## 4 on HW with 4-lcores; 16 on 16-lcores
474             max_workers => 'auto/2.0', ## 2 on HW with 4-lcores; 4 on 16-lcores
475             max_workers => 'auto+3', ## 4 on HW with 4-lcores; 11 on 16-lcores
476             max_workers => 'auto-1', ## 3 on HW with 4-lcores; 7 on 16-lcores
477              
478             max_workers => MCE::Util::get_ncpu, ## run on all lcores
479             );
480              
481             In summary:
482              
483             1. Auto has an upper-limit of 8 in MCE 1.521 (# of lcores, 8 maximum)
484             2. Math may be applied with auto (*/+-) to change the upper limit
485             3. The computed value for auto will not exceed the total # of lcores
486             4. One can specify max_workers explicitly to a hard value
487             5. MCE::Util::get_ncpu returns the actual # of lcores
488              
489             =head1 ACKNOWLEDGMENTS
490              
491             The portable code for detecting the number of processors was adopted from
492             L<Test::Smoke::SysInfo>.
493              
494             =head1 INDEX
495              
496             L<MCE|MCE>, L<MCE::Core>
497              
498             =head1 AUTHOR
499              
500             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
501              
502             =cut
503