File Coverage

blib/lib/MCE/Util.pm
Criterion Covered Total %
statement 105 219 47.9
branch 38 166 22.8
condition 9 24 37.5
subroutine 17 23 73.9
pod 1 1 100.0
total 170 433 39.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Utility functions.
4             ##
5             ###############################################################################
6              
7             package MCE::Util;
8              
9 107     107   741 use strict;
  107         193  
  107         3303  
10 107     107   567 use warnings;
  107         217  
  107         3076  
11              
12 107     107   483 no warnings qw( threads recursion uninitialized numeric );
  107         975  
  107         6030  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17              
18 107     107   12933 use IO::Handle ();
  107         141673  
  107         2559  
19 107     107   13144 use Socket qw( AF_UNIX SOL_SOCKET SO_SNDBUF SO_RCVBUF );
  107         83352  
  107         8465  
20 107     107   5991 use Time::HiRes qw( sleep time );
  107         13640  
  107         584  
21 107     107   59496 use Errno ();
  107         141174  
  107         3447  
22 107     107   851 use base qw( Exporter );
  107         194  
  107         23550  
23              
24             my ($_is_winenv, $_zero_bytes, %_sock_ready);
25              
26             BEGIN {
27 107 50   107   1336 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
28 107         295807 $_zero_bytes = pack('L', 0);
29             }
30              
31             sub CLONE {
32 0     0   0 %_sock_ready = ();
33             }
34              
35             our $LF = "\012"; Internals::SvREADONLY($LF, 1);
36              
37             our @EXPORT_OK = qw( $LF get_ncpu );
38             our %EXPORT_TAGS = ( all => \@EXPORT_OK );
39              
40             ###############################################################################
41             ## ----------------------------------------------------------------------------
42             ## The get_ncpu subroutine, largely adopted from Test::Smoke::Util.pm,
43             ## returns the number of logical (online/active/enabled) CPU cores;
44             ## never smaller than one.
45             ##
46             ## A warning is emitted to STDERR when it cannot recognize the operating
47             ## system or the external command failed.
48             ##
49             ###############################################################################
50              
51             my $g_ncpu;
52              
53             sub get_ncpu {
54 66 100   66 1 233 return $g_ncpu if (defined $g_ncpu);
55              
56 61         618 local $ENV{PATH} = "/usr/sbin:/sbin:/usr/bin:/bin:$ENV{PATH}";
57 61         289 $ENV{PATH} =~ /(.*)/; $ENV{PATH} = $1; ## Remove tainted'ness
  61         288  
58              
59 61         133 my $ncpu = 1;
60              
61             OS_CHECK: {
62 61         108 local $_ = lc $^O;
  61         274  
63              
64 61 50       548 /linux|android/ && do {
65 61         127 my ( $count, $fh );
66 61 50       4369 if ( open $fh, '<', '/proc/stat' ) {
    0          
67 61         4283 $count = grep { /^cpu\d/ } <$fh>;
  1464         2963  
68 61         850 close $fh;
69             }
70             elsif ( open $fh, '<', '/proc/cpuinfo' ) {
71 0         0 $count = grep { /^processor/ } <$fh>;
  0         0  
72 0         0 close $fh;
73             }
74 61 50       333 $ncpu = $count if $count;
75 61         489 last OS_CHECK;
76             };
77              
78 0 0       0 /bsd|darwin|dragonfly/ && do {
79 0         0 chomp( my @output = `sysctl -n hw.ncpu 2>/dev/null` );
80 0 0       0 $ncpu = $output[0] if @output;
81 0         0 last OS_CHECK;
82             };
83              
84 0 0       0 /aix/ && do {
85 0         0 my @output = `lparstat -i 2>/dev/null | grep "^Online Virtual CPUs"`;
86 0 0       0 if ( @output ) {
87 0         0 $output[0] =~ /(\d+)\n$/;
88 0 0       0 $ncpu = $1 if $1;
89             }
90 0 0       0 if ( !$ncpu ) {
91 0         0 @output = `pmcycles -m 2>/dev/null`;
92 0 0       0 if ( @output ) {
93 0         0 $ncpu = scalar @output;
94             } else {
95 0         0 @output = `lsdev -Cc processor -S Available 2>/dev/null`;
96 0 0       0 $ncpu = scalar @output if @output;
97             }
98             }
99 0         0 last OS_CHECK;
100             };
101              
102 0 0       0 /gnu/ && do {
103 0         0 chomp( my @output = `nproc 2>/dev/null` );
104 0 0       0 $ncpu = $output[0] if @output;
105 0         0 last OS_CHECK;
106             };
107              
108 0 0       0 /haiku/ && do {
109 0         0 my @output = `sysinfo -cpu 2>/dev/null | grep "^CPU #"`;
110 0 0       0 $ncpu = scalar @output if @output;
111 0         0 last OS_CHECK;
112             };
113              
114 0 0       0 /hp-?ux/ && do {
115 0         0 my $count = grep { /^processor/ } `ioscan -fkC processor 2>/dev/null`;
  0         0  
116 0 0       0 $ncpu = $count if $count;
117 0         0 last OS_CHECK;
118             };
119              
120 0 0       0 /irix/ && do {
121 0         0 my @out = grep { /\s+processors?$/i } `hinv -c processor 2>/dev/null`;
  0         0  
122 0 0       0 $ncpu = (split ' ', $out[0])[0] if @out;
123 0         0 last OS_CHECK;
124             };
125              
126 0 0       0 /osf|solaris|sunos|svr5|sco/ && do {
127 0 0       0 if (-x '/usr/sbin/psrinfo') {
128 0         0 my $count = grep { /on-?line/ } `psrinfo 2>/dev/null`;
  0         0  
129 0 0       0 $ncpu = $count if $count;
130             }
131             else {
132 0         0 my @output = grep { /^NumCPU = \d+/ } `uname -X 2>/dev/null`;
  0         0  
133 0 0       0 $ncpu = (split ' ', $output[0])[2] if @output;
134             }
135 0         0 last OS_CHECK;
136             };
137              
138 0 0       0 /mswin|mingw|msys|cygwin/ && do {
139 0 0       0 if (exists $ENV{NUMBER_OF_PROCESSORS}) {
140 0         0 $ncpu = $ENV{NUMBER_OF_PROCESSORS};
141             }
142 0         0 last OS_CHECK;
143             };
144              
145 0         0 warn "MCE::Util::get_ncpu: command failed or unknown operating system\n";
146             }
147              
148 61 50 33     509 $ncpu = 1 if (!$ncpu || $ncpu < 1);
149              
150 61         455 return $g_ncpu = $ncpu;
151             }
152              
153             ###############################################################################
154             ## ----------------------------------------------------------------------------
155             ## Private methods for pipes and sockets.
156             ##
157             ###############################################################################
158              
159             sub _destroy_pipes {
160 193     193   1086 my ($_obj, @_params) = @_;
161 193         916 local ($!,$?); local $SIG{__DIE__};
  193         674  
162              
163 193         782 for my $_p (@_params) {
164 386 50       1155 next unless (defined $_obj->{$_p});
165              
166 386 50       1298 if (ref $_obj->{$_p} eq 'ARRAY') {
167 0         0 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  0         0  
168 0 0       0 next unless (defined $_obj->{$_p}[$_i]);
169 0 0       0 close $_obj->{$_p}[$_i] if (fileno $_obj->{$_p}[$_i]);
170 0         0 undef $_obj->{$_p}[$_i];
171             }
172             }
173             else {
174 386 50       5711 close $_obj->{$_p} if (fileno $_obj->{$_p});
175 386         2391 undef $_obj->{$_p};
176             }
177             }
178              
179 193         1229 return;
180             }
181              
182             sub _destroy_socks {
183 110     110   1922 my ($_obj, @_params) = @_;
184 110         1258 local ($!,$?,$@); local $SIG{__DIE__};
  110         814  
185              
186 110         708 for my $_p (@_params) {
187 772 100       3029 next unless (defined $_obj->{$_p});
188              
189 606 100       2586 if (ref $_obj->{$_p} eq 'ARRAY') {
190 154         398 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  154         671  
191 478 50       1467 next unless (defined $_obj->{$_p}[$_i]);
192 478 50       1659 if (fileno $_obj->{$_p}[$_i]) {
193 478 50       923 syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
194 478         25488 eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) };
195 478         8242 close $_obj->{$_p}[$_i];
196             }
197 478         2848 undef $_obj->{$_p}[$_i];
198             }
199             }
200             else {
201 452 50       1797 if (fileno $_obj->{$_p}) {
202 452 50       1075 syswrite($_obj->{$_p}, '0') if $_is_winenv;
203 452         32816 eval q{ CORE::shutdown($_obj->{$_p}, 2) };
204 452         9875 close $_obj->{$_p};
205             }
206 452         3688 undef $_obj->{$_p};
207             }
208             }
209              
210 110         959 return;
211             }
212              
213             sub _pipe_pair {
214 431     431   1422 my ($_obj, $_r_sock, $_w_sock, $_i) = @_;
215 431         1573 local $!;
216              
217 431 50       1025 if (defined $_i) {
218             # remove tainted'ness
219 0         0 ($_i) = $_i =~ /(.*)/;
220 0 0       0 pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) or die "pipe: $!\n";
221 0         0 $_obj->{$_w_sock}[$_i]->autoflush(1);
222             }
223             else {
224 431 50       17291 pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) or die "pipe: $!\n";
225 431         3206 $_obj->{$_w_sock}->autoflush(1);
226             }
227              
228 431         21824 return;
229             }
230              
231             sub _sock_pair {
232 1011     1011   2977 my ($_obj, $_r_sock, $_w_sock, $_i, $_seq) = @_;
233 1011         1454 my $_size = 16384; local ($!, $@);
  1011         2725  
234              
235 1011 100       2425 if (defined $_i) {
236             # remove tainted'ness
237 517         3043 ($_i) = $_i =~ /(.*)/;
238              
239 517 100 66     20787 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
240             socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
241 391 50       18059 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
242 0 0       0 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
243             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
244             };
245             }
246             else {
247 126 50       6590 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
248             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
249             }
250              
251 517 50       5804 if ($^O !~ /aix|linux|android/) {
252 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
253 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
254 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
255 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
256             }
257              
258 517         2588 $_obj->{$_r_sock}[$_i]->autoflush(1);
259 517         18809 $_obj->{$_w_sock}[$_i]->autoflush(1);
260             }
261             else {
262 494 100 66     26078 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
263             socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
264 465 50       25422 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
265 0 0       0 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
266             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
267             };
268             }
269             else {
270 29 50       1944 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
271             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
272             }
273              
274 494 50       6193 if ($^O !~ /aix|linux|android/) {
275 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
276 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
277 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
278 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
279             }
280              
281 494         2782 $_obj->{$_r_sock}->autoflush(1);
282 494         20655 $_obj->{$_w_sock}->autoflush(1);
283             }
284              
285 1011         28632 return;
286             }
287              
288             sub _sock_ready {
289 0     0   0 my ($_socket, $_timeout) = @_;
290 0 0 0     0 return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1;
291              
292 0         0 my ($_val_bytes, $_delay, $_start) = (pack('L', 0), 0, time);
293              
294 0 0       0 if (!defined $_timeout) {
295 0         0 $_sock_ready{"$_socket"}++;
296             }
297             else {
298 0 0       0 $_timeout = undef if $_timeout < 0;
299 0 0       0 $_timeout += $_start if $_timeout;
300             }
301              
302 0         0 while (1) {
303             # MSWin32 FIONREAD - from winsock2.h macro
304 0         0 ioctl($_socket, 0x4004667f, $_val_bytes);
305              
306 0 0       0 return '' if $_val_bytes ne $_zero_bytes;
307 0 0 0     0 return 1 if $_timeout && time > $_timeout;
308              
309             # delay after a while to not consume a CPU core
310 0 0       0 sleep(0.015), next if $_delay;
311 0 0       0 $_delay = 1 if time - $_start > 0.030;
312             }
313             }
314              
315             sub _sock_ready_w {
316 0     0   0 my ($_socket) = @_;
317 0 0       0 return if $_sock_ready{"${_socket}_w"} > 1;
318              
319 0         0 my $_vec = '';
320 0         0 $_sock_ready{"${_socket}_w"}++;
321              
322 0         0 while (1) {
323 0         0 vec($_vec, fileno($_socket), 1) = 1;
324 0 0       0 return if select(undef, $_vec, undef, 0) > 0;
325 0         0 sleep 0.045;
326             }
327              
328 0         0 return;
329             }
330              
331             sub _sysread {
332             ( @_ == 3
333             ? CORE::sysread($_[0], $_[1], $_[2])
334             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
335             )
336 1283 50   1283   627572 or do {
    50          
337 0 0       0 goto \&_sysread if ($! == Errno::EINTR());
338             };
339             }
340              
341             sub _sysread2 {
342 0     0   0 my ($_bytes, $_delay, $_start);
343             # called by MCE/Core/Manager.pm
344              
345             SYSREAD: $_bytes = ( @_ == 3
346             ? CORE::sysread($_[0], $_[1], $_[2])
347             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
348             )
349 0 0       0 or do {
    0          
350 0 0       0 unless ( defined $_bytes ) {
351 0 0       0 goto SYSREAD if ($! == Errno::EINTR());
352              
353             # non-blocking operation could not be completed
354 0 0 0     0 if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) {
355 0 0       0 sleep(0.015), goto SYSREAD if $_delay;
356              
357             # delay after a while to not consume a CPU core
358 0 0       0 $_start = time unless $_start;
359 0 0       0 $_delay = 1 if time - $_start > 0.030;
360              
361 0         0 goto SYSREAD;
362             }
363             }
364             };
365              
366 0         0 return $_bytes;
367             }
368              
369             sub _nonblocking {
370 1208 50   1208   4765 if ($^O eq 'MSWin32') {
371             # MSWin32 FIONBIO - from winsock2.h macro
372 0 0       0 my $nonblocking = $_[1] ? pack('L', 1) : pack('L', 0);
373 0         0 ioctl($_[0], 0x8004667e, $nonblocking);
374             }
375             else {
376 1208 100       11659 $_[0]->blocking( $_[1] ? 0 : 1 );
377             }
378              
379 1208         3585 return;
380             }
381              
382             ###############################################################################
383             ## ----------------------------------------------------------------------------
384             ## Private methods, providing high-resolution time, for MCE->yield,
385             ## MCE::Child->yield, and MCE::Hobo->yield.
386             ##
387             ###############################################################################
388              
389             ## Use monotonic clock if available.
390              
391 107         319 use constant CLOCK_MONOTONIC => eval {
392 107         815 Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() );
393 107         33695 1;
394 107     107   938 };
  107         239  
395              
396             sub _sleep {
397 0     0     my ( $seconds ) = @_;
398 0 0         return if ( $seconds < 0 );
399              
400 0 0         if ( $INC{'Coro/AnyEvent.pm'} ) {
    0          
    0          
401 0           Coro::AnyEvent::sleep( $seconds );
402             }
403             elsif ( &Time::HiRes::d_nanosleep ) {
404 0           Time::HiRes::nanosleep( $seconds * 1e9 );
405             }
406             elsif ( &Time::HiRes::d_usleep ) {
407 0           Time::HiRes::usleep( $seconds * 1e6 );
408             }
409             else {
410 0           Time::HiRes::sleep( $seconds );
411             }
412              
413 0           return;
414             }
415              
416             sub _time {
417 0     0     return ( CLOCK_MONOTONIC )
418             ? Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() )
419             : Time::HiRes::time();
420             }
421              
422             1;
423              
424             __END__