File Coverage

blib/lib/Proc/Forkmap.pm
Criterion Covered Total %
statement 66 118 55.9
branch 14 56 25.0
condition 5 15 33.3
subroutine 7 10 70.0
pod 2 2 100.0
total 94 201 46.7


line stmt bran cond sub pod time code
1             package Proc::Forkmap;
2 5     5   816687 use IO::Select;
  5         11946  
  5         333  
3 5     5   3085 use POSIX ":sys_wait_h";
  5         52833  
  5         49  
4 5     5   15521 use File::Temp qw(tempfile);
  5         156050  
  5         593  
5 5     5   45 use strict;
  5         26  
  5         214  
6 5     5   38 use warnings;
  5         10  
  5         353  
7 5     5   71 use v5.10;
  5         16  
8             require Exporter;
9              
10             our @ISA = qw(Exporter);
11             our @EXPORT = qw/
12             forkmap_settings
13             forkmap
14             /;
15              
16             our $VERSION = '0.2305';
17              
18             my $MAX_PROCS = 4;
19             my $TIMEOUT = 0;
20             my $IPC = 1;
21             my $TEMPFILE_DIR;
22              
23             sub forkmap_settings {
24 0     0 1 0 my %opts = @_;
25 0 0       0 $MAX_PROCS = $opts{MAX_PROCS} if exists $opts{MAX_PROCS};
26 0 0       0 $TIMEOUT = $opts{TIMEOUT} if exists $opts{TIMEOUT};
27 0 0       0 $IPC = $opts{IPC} if exists $opts{IPC};
28 0 0       0 $TEMPFILE_DIR = $opts{TEMPFILE_DIR} if exists $opts{TEMPFILE_DIR};
29             }
30              
31             sub forkmap (&@) {
32 4     4 1 942012 my $fn = shift;
33 4 50       28 my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
34 4         12 my @args = @_;
35              
36 4 50       16 unless ($IPC) {
37 0         0 my $procs = 0;
38 0         0 my $i = 0;
39              
40 0   0     0 while ($i <= $#args || $procs > 0) {
41 0   0     0 while ($procs < $MAX_PROCS && $i <= $#args) {
42 0         0 my $idx = $i;
43 0         0 my $pid = fork();
44 0 0       0 die "bad fork: $!" unless defined $pid;
45 0 0       0 if ($pid == 0) {
46 0 0       0 if ($TIMEOUT) {
47 0     0   0 local $SIG{ALRM} = sub { die "timeout: pid [$$]" };
  0         0  
48 0         0 alarm $TIMEOUT;
49 0         0 eval {
50 0         0 local $_ = $args[$idx];
51 0         0 $fn->($_);
52 0         0 alarm 0;
53             };
54 0 0       0 if ($@) {
55 0         0 die "$@";
56             }
57             } else {
58 0         0 local $_ = $args[$idx];
59 0         0 $fn->($_);
60             }
61 0 0       0 $cb->($idx) if $cb;
62 0         0 exit 0;
63             }
64 0         0 $procs++;
65 0         0 $i++;
66             }
67              
68 0         0 while (waitpid(-1, WNOHANG) > 0) {
69 0         0 $procs--;
70             }
71 0 0 0     0 select undef, undef, undef, 0.1 if $procs >= $MAX_PROCS || $i > $#args;
72             }
73 0         0 return;
74             }
75              
76 4         48 my $sel = IO::Select->new();
77 4         64 my @res;
78 4         8 my $procs = 0;
79 4         8 my $i = 0;
80              
81 4   100     24 while ($i <= $#args || $procs > 0) {
82 25082   66     91621 while ($procs < $MAX_PROCS && $i <= $#args) {
83 9 50       683 pipe(my $r, my $w) or die "bad pipe: $!";
84 9         80 my $idx = $i;
85 9         31587 my $pid = fork();
86 9 50       905 die "bad fork: $!" unless defined $pid;
87 9 100       590 if ($pid == 0) {
88 3         435 close $r;
89 3         402 my ($fh, $fname) = tempfile(DIR => $TEMPFILE_DIR, UNLINK => 0);
90 3 50       48984 if ($TIMEOUT) {
91 0     0   0 local $SIG{ALRM} = sub { die "timeout: pid [$$] index [$idx]" };
  0         0  
92 0         0 alarm $TIMEOUT;
93 0         0 eval {
94 0         0 local $_ = $args[$idx];
95 0         0 my $t = $fn->($_);
96 0 0       0 print $fh $t if defined $t;
97 0         0 alarm 0;
98             };
99 0 0       0 if ($@) {
100 0 0       0 close $fh if defined fileno $fh;
101 0 0       0 unlink $fname if defined $fname;
102 0         0 die "$@";
103             }
104             } else {
105 3         75 eval {
106 3         48 local $_ = $args[$idx];
107 3         231 my $t = $fn->($_);
108 3 50       123 print $fh $t if defined $t;
109             };
110 3 50       70 if ($@) {
111 0 0       0 close $fh if defined fileno $fh;
112 0 0       0 unlink $fname if defined $fname;
113 0         0 die "$@";
114             }
115             }
116 3         370 close $fh;
117 3         94 print $w "$idx:$fname\n";
118 3         74 close $w;
119 3         1637 exit 0;
120             }
121 6         773 close $w;
122 6         815 $sel->add($r);
123 6         2049 $procs++;
124 6         570 $i++;
125             }
126              
127 25079         63891 for my $fh ($sel->can_read(0.1)) {
128 6         9047 my $line = <$fh>;
129 6 100       58 if (defined $line) {
130 3         19 chomp $line;
131 3         114 my ($idx, $fname) = split /:/, $line, 2;
132 3 50       207 open my $f, "<", $fname or do {
133 0         0 warn "bad file [$fname]: $!";
134 0         0 $res[$idx] = undef;
135 0         0 next;
136             };
137 3         25 my $d = do {local $/; <$f>};
  3         65  
  3         144  
138 3         67 close $f;
139 3         14427 unlink $fname;
140              
141 3 50       87 if ($cb) {
142 0         0 $cb->($idx, $d);
143             } else {
144 3         47 $res[$idx] = $d;
145             }
146             } else {
147 3         22 $sel->remove($fh);
148 3         338 close $fh;
149             }
150             }
151 25079         347985 while (waitpid(-1, WNOHANG) > 0) {
152 3         36 $procs--;
153             }
154             }
155              
156 1 50       94 return $cb ? undef : @res;
157             }
158              
159             1;
160              
161             __END__