File Coverage

blib/lib/MCE/Util.pm
Criterion Covered Total %
statement 105 216 48.6
branch 38 164 23.1
condition 11 30 36.6
subroutine 17 23 73.9
pod 1 1 100.0
total 172 434 39.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Utility functions.
4             ##
5             ###############################################################################
6              
7             package MCE::Util;
8              
9 107     107   836 use strict;
  107         203  
  107         3517  
10 107     107   545 use warnings;
  107         214  
  107         3422  
11              
12 107     107   520 no warnings qw( threads recursion uninitialized numeric );
  107         1470  
  107         7045  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17              
18 107     107   13891 use IO::Handle ();
  107         141611  
  107         2538  
19 107     107   13242 use Socket qw( AF_UNIX SOL_SOCKET SO_SNDBUF SO_RCVBUF );
  107         82963  
  107         8737  
20 107     107   5964 use Time::HiRes qw( sleep time );
  107         13531  
  107         756  
21 107     107   63304 use Errno ();
  107         151589  
  107         4219  
22 107     107   773 use base qw( Exporter );
  107         230  
  107         25989  
23              
24             my ($_is_winenv, $_zero_bytes, %_sock_ready);
25              
26             BEGIN {
27 107 50   107   1693 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
28 107         300418 $_zero_bytes = pack('L', 0);
29             }
30              
31             sub CLONE {
32 0     0   0 %_sock_ready = ();
33             }
34              
35             our $LF = "\012"; Internals::SvREADONLY($LF, 1);
36              
37             our @EXPORT_OK = qw( $LF get_ncpu );
38             our %EXPORT_TAGS = ( all => \@EXPORT_OK );
39              
40             ###############################################################################
41             ## ----------------------------------------------------------------------------
42             ## The get_ncpu subroutine, largely adopted from Test::Smoke::Util.pm,
43             ## returns the number of logical (online/active/enabled) CPU cores;
44             ## never smaller than one.
45             ##
46             ## A warning is emitted to STDERR when it cannot recognize the operating
47             ## system or the external command failed.
48             ##
49             ###############################################################################
50              
51             my $g_ncpu;
52              
53             sub get_ncpu {
54 66 100   66 1 235 return $g_ncpu if (defined $g_ncpu);
55              
56 61         651 local $ENV{PATH} = "/usr/sbin:/sbin:/usr/bin:/bin:$ENV{PATH}";
57 61         301 $ENV{PATH} =~ /(.*)/; $ENV{PATH} = $1; ## Remove tainted'ness
  61         288  
58              
59 61         160 my $ncpu = 1;
60              
61             OS_CHECK: {
62 61         112 local $_ = lc $^O;
  61         308  
63              
64 61 50       345 /linux/ && do {
65 61         138 my ( $count, $fh );
66 61 50       5106 if ( open $fh, '<', '/proc/stat' ) {
67 61         4998 $count = grep { /^cpu\d/ } <$fh>;
  1464         3147  
68 61         1008 close $fh;
69             }
70 61 50       334 $ncpu = $count if $count;
71 61         431 last OS_CHECK;
72             };
73              
74 0 0       0 /bsd|darwin|dragonfly/ && do {
75 0         0 chomp( my @output = `sysctl -n hw.ncpu 2>/dev/null` );
76 0 0       0 $ncpu = $output[0] if @output;
77 0         0 last OS_CHECK;
78             };
79              
80 0 0       0 /aix/ && do {
81 0         0 my @output = `lparstat -i 2>/dev/null | grep "^Online Virtual CPUs"`;
82 0 0       0 if ( @output ) {
83 0         0 $output[0] =~ /(\d+)\n$/;
84 0 0       0 $ncpu = $1 if $1;
85             }
86 0 0       0 if ( !$ncpu ) {
87 0         0 @output = `pmcycles -m 2>/dev/null`;
88 0 0       0 if ( @output ) {
89 0         0 $ncpu = scalar @output;
90             } else {
91 0         0 @output = `lsdev -Cc processor -S Available 2>/dev/null`;
92 0 0       0 $ncpu = scalar @output if @output;
93             }
94             }
95 0         0 last OS_CHECK;
96             };
97              
98 0 0       0 /gnu/ && do {
99 0         0 chomp( my @output = `nproc 2>/dev/null` );
100 0 0       0 $ncpu = $output[0] if @output;
101 0         0 last OS_CHECK;
102             };
103              
104 0 0       0 /haiku/ && do {
105 0         0 my @output = `sysinfo -cpu 2>/dev/null | grep "^CPU #"`;
106 0 0       0 $ncpu = scalar @output if @output;
107 0         0 last OS_CHECK;
108             };
109              
110 0 0       0 /hp-?ux/ && do {
111 0         0 my $count = grep { /^processor/ } `ioscan -fkC processor 2>/dev/null`;
  0         0  
112 0 0       0 $ncpu = $count if $count;
113 0         0 last OS_CHECK;
114             };
115              
116 0 0       0 /irix/ && do {
117 0         0 my @out = grep { /\s+processors?$/i } `hinv -c processor 2>/dev/null`;
  0         0  
118 0 0       0 $ncpu = (split ' ', $out[0])[0] if @out;
119 0         0 last OS_CHECK;
120             };
121              
122 0 0       0 /osf|solaris|sunos|svr5|sco/ && do {
123 0 0       0 if (-x '/usr/sbin/psrinfo') {
124 0         0 my $count = grep { /on-?line/ } `psrinfo 2>/dev/null`;
  0         0  
125 0 0       0 $ncpu = $count if $count;
126             }
127             else {
128 0         0 my @output = grep { /^NumCPU = \d+/ } `uname -X 2>/dev/null`;
  0         0  
129 0 0       0 $ncpu = (split ' ', $output[0])[2] if @output;
130             }
131 0         0 last OS_CHECK;
132             };
133              
134 0 0       0 /mswin|mingw|msys|cygwin/ && do {
135 0 0       0 if (exists $ENV{NUMBER_OF_PROCESSORS}) {
136 0         0 $ncpu = $ENV{NUMBER_OF_PROCESSORS};
137             }
138 0         0 last OS_CHECK;
139             };
140              
141 0         0 warn "MCE::Util::get_ncpu: command failed or unknown operating system\n";
142             }
143              
144 61 50 33     533 $ncpu = 1 if (!$ncpu || $ncpu < 1);
145              
146 61         588 return $g_ncpu = $ncpu;
147             }
148              
149             ###############################################################################
150             ## ----------------------------------------------------------------------------
151             ## Private methods for pipes and sockets.
152             ##
153             ###############################################################################
154              
155             sub _destroy_pipes {
156 193     193   1427 my ($_obj, @_params) = @_;
157 193         1521 local ($!,$?); local $SIG{__DIE__};
  193         980  
158              
159 193         911 for my $_p (@_params) {
160 386 50       1561 next unless (defined $_obj->{$_p});
161              
162 386 50       1648 if (ref $_obj->{$_p} eq 'ARRAY') {
163 0         0 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  0         0  
164 0 0       0 next unless (defined $_obj->{$_p}[$_i]);
165 0 0       0 close $_obj->{$_p}[$_i] if (fileno $_obj->{$_p}[$_i]);
166 0         0 undef $_obj->{$_p}[$_i];
167             }
168             }
169             else {
170 386 50       7554 close $_obj->{$_p} if (fileno $_obj->{$_p});
171 386         3134 undef $_obj->{$_p};
172             }
173             }
174              
175 193         1506 return;
176             }
177              
178             sub _destroy_socks {
179 110     110   2620 my ($_obj, @_params) = @_;
180 110         1798 local ($!,$?,$@); local $SIG{__DIE__};
  110         1113  
181              
182 110         1078 for my $_p (@_params) {
183 772 100       4203 next unless (defined $_obj->{$_p});
184              
185 606 100       3002 if (ref $_obj->{$_p} eq 'ARRAY') {
186 154         564 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  154         994  
187 478 50       1875 next unless (defined $_obj->{$_p}[$_i]);
188 478 50       2341 if (fileno $_obj->{$_p}[$_i]) {
189 478 50       1316 syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
190 478         35153 eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) };
191 478         12402 close $_obj->{$_p}[$_i];
192             }
193 478         3373 undef $_obj->{$_p}[$_i];
194             }
195             }
196             else {
197 452 50       2404 if (fileno $_obj->{$_p}) {
198 452 50       1334 syswrite($_obj->{$_p}, '0') if $_is_winenv;
199 452         42048 eval q{ CORE::shutdown($_obj->{$_p}, 2) };
200 452         12831 close $_obj->{$_p};
201             }
202 452         4383 undef $_obj->{$_p};
203             }
204             }
205              
206 110         1323 return;
207             }
208              
209             sub _pipe_pair {
210 431     431   1638 my ($_obj, $_r_sock, $_w_sock, $_i) = @_;
211 431         1803 local $!;
212              
213 431 50       1255 if (defined $_i) {
214             # remove tainted'ness
215 0         0 ($_i) = $_i =~ /(.*)/;
216 0 0       0 pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) or die "pipe: $!\n";
217 0         0 $_obj->{$_w_sock}[$_i]->autoflush(1);
218             }
219             else {
220 431 50       19785 pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) or die "pipe: $!\n";
221 431         3665 $_obj->{$_w_sock}->autoflush(1);
222             }
223              
224 431         24564 return;
225             }
226              
227             sub _sock_pair {
228 1011     1011   3589 my ($_obj, $_r_sock, $_w_sock, $_i, $_seq) = @_;
229 1011         1681 my $_size = 16384; local ($!, $@);
  1011         3114  
230              
231 1011 100       2528 if (defined $_i) {
232             # remove tainted'ness
233 517         3992 ($_i) = $_i =~ /(.*)/;
234              
235 517 100 66     25112 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
236             socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
237 391 50       21976 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
238 0 0       0 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
239             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
240             };
241             }
242             else {
243 126 50       7329 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
244             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
245             }
246              
247 517 50 33     5661 if ($^O ne 'aix' && $^O ne 'linux') {
248 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
249 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
250 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
251 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
252             }
253              
254 517         2746 $_obj->{$_r_sock}[$_i]->autoflush(1);
255 517         21222 $_obj->{$_w_sock}[$_i]->autoflush(1);
256             }
257             else {
258 494 100 66     29102 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
259             socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
260 465 50       29703 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
261 0 0       0 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
262             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
263             };
264             }
265             else {
266 29 50       2223 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
267             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
268             }
269              
270 494 50 33     5481 if ($^O ne 'aix' && $^O ne 'linux') {
271 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
272 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
273 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
274 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
275             }
276              
277 494         3303 $_obj->{$_r_sock}->autoflush(1);
278 494         22227 $_obj->{$_w_sock}->autoflush(1);
279             }
280              
281 1011         31572 return;
282             }
283              
284             sub _sock_ready {
285 0     0   0 my ($_socket, $_timeout) = @_;
286 0 0 0     0 return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1;
287              
288 0         0 my ($_val_bytes, $_delay, $_start) = (pack('L', 0), 0, time);
289              
290 0 0       0 if (!defined $_timeout) {
291 0         0 $_sock_ready{"$_socket"}++;
292             }
293             else {
294 0 0       0 $_timeout = undef if $_timeout < 0;
295 0 0       0 $_timeout += $_start if $_timeout;
296             }
297              
298 0         0 while (1) {
299             # MSWin32 FIONREAD - from winsock2.h macro
300 0         0 ioctl($_socket, 0x4004667f, $_val_bytes);
301              
302 0 0       0 return '' if $_val_bytes ne $_zero_bytes;
303 0 0 0     0 return 1 if $_timeout && time > $_timeout;
304              
305             # delay after a while to not consume a CPU core
306 0 0       0 sleep(0.015), next if $_delay;
307 0 0       0 $_delay = 1 if time - $_start > 0.030;
308             }
309             }
310              
311             sub _sock_ready_w {
312 0     0   0 my ($_socket) = @_;
313 0 0       0 return if $_sock_ready{"${_socket}_w"} > 1;
314              
315 0         0 my $_vec = '';
316 0         0 $_sock_ready{"${_socket}_w"}++;
317              
318 0         0 while (1) {
319 0         0 vec($_vec, fileno($_socket), 1) = 1;
320 0 0       0 return if select(undef, $_vec, undef, 0) > 0;
321 0         0 sleep 0.045;
322             }
323              
324 0         0 return;
325             }
326              
327             sub _sysread {
328             ( @_ == 3
329             ? CORE::sysread($_[0], $_[1], $_[2])
330             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
331             )
332 1278 50   1278   793442 or do {
    50          
333 0 0       0 goto \&_sysread if ($! == Errno::EINTR());
334             };
335             }
336              
337             sub _sysread2 {
338 0     0   0 my ($_bytes, $_delay, $_start);
339             # called by MCE/Core/Manager.pm
340              
341             SYSREAD: $_bytes = ( @_ == 3
342             ? CORE::sysread($_[0], $_[1], $_[2])
343             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
344             )
345 0 0       0 or do {
    0          
346 0 0       0 unless ( defined $_bytes ) {
347 0 0       0 goto SYSREAD if ($! == Errno::EINTR());
348              
349             # non-blocking operation could not be completed
350 0 0 0     0 if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) {
351 0 0       0 sleep(0.015), goto SYSREAD if $_delay;
352              
353             # delay after a while to not consume a CPU core
354 0 0       0 $_start = time unless $_start;
355 0 0       0 $_delay = 1 if time - $_start > 0.030;
356              
357 0         0 goto SYSREAD;
358             }
359             }
360             };
361              
362 0         0 return $_bytes;
363             }
364              
365             sub _nonblocking {
366 1194 50   1194   4849 if ($^O eq 'MSWin32') {
367             # MSWin32 FIONBIO - from winsock2.h macro
368 0 0       0 my $nonblocking = $_[1] ? pack('L', 1) : pack('L', 0);
369 0         0 ioctl($_[0], 0x8004667e, $nonblocking);
370             }
371             else {
372 1194 100       11845 $_[0]->blocking( $_[1] ? 0 : 1 );
373             }
374              
375 1194         4060 return;
376             }
377              
378             ###############################################################################
379             ## ----------------------------------------------------------------------------
380             ## Private methods, providing high-resolution time, for MCE->yield,
381             ## MCE::Child->yield, and MCE::Hobo->yield.
382             ##
383             ###############################################################################
384              
385             ## Use monotonic clock if available.
386              
387 107         352 use constant CLOCK_MONOTONIC => eval {
388 107         1142 Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() );
389 107         36765 1;
390 107     107   946 };
  107         281  
391              
392             sub _sleep {
393 0     0     my ( $seconds ) = @_;
394 0 0         return if ( $seconds < 0 );
395              
396 0 0         if ( $INC{'Coro/AnyEvent.pm'} ) {
    0          
    0          
397 0           Coro::AnyEvent::sleep( $seconds );
398             }
399             elsif ( &Time::HiRes::d_nanosleep ) {
400 0           Time::HiRes::nanosleep( $seconds * 1e9 );
401             }
402             elsif ( &Time::HiRes::d_usleep ) {
403 0           Time::HiRes::usleep( $seconds * 1e6 );
404             }
405             else {
406 0           Time::HiRes::sleep( $seconds );
407             }
408              
409 0           return;
410             }
411              
412             sub _time {
413 0     0     return ( CLOCK_MONOTONIC )
414             ? Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() )
415             : Time::HiRes::time();
416             }
417              
418             1;
419              
420             __END__