line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Parallel::Fork::BossWorker; |
2
|
|
|
|
|
|
|
# |
3
|
|
|
|
|
|
|
# $Id: BossWorker.pm 11 2011-07-16 15:49:45Z twilde $ |
4
|
|
|
|
|
|
|
# |
5
|
|
|
|
|
|
|
|
6
|
6
|
|
|
6
|
|
149496
|
use 5.008008; |
|
6
|
|
|
|
|
24
|
|
|
6
|
|
|
|
|
228
|
|
7
|
6
|
|
|
6
|
|
36
|
use strict; |
|
6
|
|
|
|
|
6
|
|
|
6
|
|
|
|
|
258
|
|
8
|
6
|
|
|
6
|
|
30
|
use warnings; |
|
6
|
|
|
|
|
66
|
|
|
6
|
|
|
|
|
156
|
|
9
|
6
|
|
|
6
|
|
30
|
use Carp; |
|
6
|
|
|
|
|
12
|
|
|
6
|
|
|
|
|
630
|
|
10
|
6
|
|
|
6
|
|
7290
|
use Data::Dumper qw(Dumper); |
|
6
|
|
|
|
|
73500
|
|
|
6
|
|
|
|
|
498
|
|
11
|
6
|
|
|
6
|
|
8244
|
use IO::Handle; |
|
6
|
|
|
|
|
49020
|
|
|
6
|
|
|
|
|
282
|
|
12
|
6
|
|
|
6
|
|
6006
|
use IO::Select; |
|
6
|
|
|
|
|
13086
|
|
|
6
|
|
|
|
|
7452
|
|
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
# Perl module variables |
15
|
|
|
|
|
|
|
our @ISA = qw(); |
16
|
|
|
|
|
|
|
our $VERSION = '0.05'; |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Constructor. Takes a flat list of name => value options:
#   work_handler   - required; invoked in a worker for every work item
#   result_handler - optional; invoked in the boss with each worker result
#   worker_count   - number of workers to fork (10 when omitted or false)
#   global_timeout - seconds a single work item may run; 0 (the default)
#                    leaves the alarm disabled
#   msg_delimiter  - byte sequence terminating each message on the pipes
#                    ("\0\0\0" when omitted or false)
# Returns the new object; croaks when work_handler is not defined.
sub new {
    my ($class, %values) = @_;

    # Resolve the delimiter first because its length is cached alongside it
    my $delimiter = $values{msg_delimiter} ? $values{msg_delimiter} : "\0\0\0";

    my $self = bless {
        result_handler       => ($values{result_handler} ? $values{result_handler} : undef),
        worker_count         => ($values{worker_count}   ? $values{worker_count}   : 10),
        global_timeout       => ($values{global_timeout} ? $values{global_timeout} : 0),
        work_handler         => $values{work_handler},
        work_queue           => [],
        msg_delimiter        => $delimiter,
        msg_delimiter_length => length($delimiter),
        select               => IO::Select->new(),
    }, (ref($class) || $class);

    # Nothing can be processed without a work handler
    croak("Parameters \`work_handler' is required.")
        unless defined $self->{work_handler};

    return $self;
}
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
# Queue one unit of work for a later call to process().
#
# Parameters:
#   $work - the work item; process() serializes it and a worker passes it
#           to work_handler.
# Returns the number of items waiting in the queue after the addition.
# Croaks when $work is undefined: the worker loop cannot tell an undefined
# item apart from end-of-input, so it would end that worker instead of
# being processed.
#
# No prototype is declared on purpose; Perl ignores prototypes on method
# calls, so one here would only mislead.
sub add_work {
    my ($self, $work) = @_;

    croak("add_work() requires a defined work item")
        if not defined $work;

    # New items go on the front and process() pops from the back, so work
    # is handed out in the order it was added.
    return unshift(@{ $self->{work_queue} }, $work);
}
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
# Run the whole job: fork the workers, hand every queued work item to a
# worker and pass each worker result to result_handler (when one is set).
#
# Takes no parameters besides the object; extra arguments are ignored.
# Returns nothing useful.
# Croaks when a worker dies or when anything else fails during the run.
sub process {
    my $self = shift;

    # Loaded here because only this method needs WNOHANG
    require POSIX;

    eval {

        # PIDs of workers that were told to shut down and must still be reaped
        my @retired;

        # If a worker dies, there's a problem.  Only workers that are still
        # supposed to be alive are polled, and never in a blocking way, so
        # the handler neither hangs nor swallows the exit status of children
        # that do not belong to this object.
        local $SIG{CHLD} = sub {
            local ($!, $?);    # do not leak waitpid() side effects into interrupted code
            foreach my $pid (keys %{ $self->{workers} || {} }) {
                if (waitpid($pid, POSIX::WNOHANG()) > 0) {
                    confess("Worker $pid died.");
                }
            }
        };

        # Start the workers
        $self->start();

        # Read from the workers, loop until they all shut down
        while (%{ $self->{workers} || {} }) {
            while (my @ready = $self->{select}->can_read()) {
                foreach my $fh (@ready) {
                    my $result = $self->receive($fh);

                    # A live worker only closes its pipe by dying, so treat
                    # end-of-file as fatal rather than looping on a worker
                    # that will never answer again.
                    if (!$result) {
                        $self->{select}->remove($fh);
                        print STDERR "$fh got eof\n";
                        my ($dead) = grep { $self->{from_worker}->{$_} == $fh }
                                     keys %{ $self->{from_worker} || {} };
                        confess(
                            defined $dead
                                ? "Worker $dead died."
                                : "A worker closed its pipe unexpectedly."
                        );
                    }

                    # Process the result handler.
                    # NOTE: results that are false (undef, 0, '') are not
                    # passed on; the first message of a worker has no data.
                    if ($result->{data} && defined $self->{result_handler}) {
                        $self->{result_handler}->($result->{data});
                    }

                    my $pid       = $result->{pid};
                    my $to_worker = $self->{workers}->{$pid};

                    # If there's still work to be done, send it to the
                    # worker, otherwise shut it down
                    if (@{ $self->{work_queue} }) {
                        $self->send($to_worker, pop(@{ $self->{work_queue} }));
                    } else {
                        $self->{select}->remove($fh);

                        # Forget the worker before closing its pipe so its
                        # exit is not mistaken for a crash.
                        delete($self->{workers}->{$pid});
                        delete($self->{from_worker}->{$pid});
                        close($to_worker);
                        close($fh);
                        push(@retired, $pid);
                    }
                }
            }
        }

        # Wait for our children so the process table won't fill up
        waitpid($_, 0) foreach @retired;
    };

    if ($@) {
        croak($@);
    }

    return;
}
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
# Fork worker_count worker processes, each connected to the boss by two
# pipes (boss -> worker and worker -> boss).
#
# In the boss, the write end towards each worker is stored in
# $self->{workers}{$pid}, the read end in $self->{from_worker}{$pid}, and
# the read end is registered with $self->{select}.
# In each child the method never returns: the child announces itself to
# the boss, runs worker() and exits.
#
# Confesses when a pipe cannot be created or fork() fails.
sub start {
    my $self = shift();

    # Create the workers
    foreach my $n (1 .. $self->{worker_count}) {

        # Open the pipes for the worker
        pipe(my $from_boss, my $to_worker)
            or confess("Failed to create pipe: $!");
        pipe(my $from_worker, my $to_boss)
            or confess("Failed to create pipe: $!");

        # Fork off a worker
        my $pid = fork();

        # fork() returns undef on failure; test that first, because undef
        # compares numerically equal to 0 and would otherwise be taken for
        # the child branch.
        if (not defined $pid) {
            confess("Failed to fork: $!");

        } elsif ($pid) {

            # Boss
            $self->{workers}->{$pid}     = $to_worker;
            $self->{from_worker}->{$pid} = $from_worker;
            $self->{select}->add($from_worker);

            # Close unused pipes
            close($to_boss);
            close($from_boss);

        } else {

            # Worker

            # Close unused pipes
            close($from_worker);
            close($to_worker);

            # Drop the pipe ends inherited from the boss that belong to
            # previously forked workers; while this process keeps them open
            # those workers cannot see end-of-file on their input.
            foreach my $inherited (
                values %{ $self->{workers}     || {} },
                values %{ $self->{from_worker} || {} },
            ) {
                close($inherited);
            }
            $self->{workers}     = {};
            $self->{from_worker} = {};

            # Setup communication pipes
            $self->{to_boss} = $to_boss;
            open(STDIN, '<', '/dev/null');    # best effort, failure is not fatal

            # Send the initial request
            $self->send($to_boss, { pid => $$ });

            # Start processing
            $self->worker($from_boss);

            # When the worker subroutine completes, exit
            exit;
        }
    }
}
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
# Main loop of a forked worker: read work items from the boss, run
# work_handler on each one and send the result back on $self->{to_boss}.
#
# Parameters:
#   $from_boss - file handle on which the boss sends work items.
# Returns when receive() yields undef (end-of-file on $from_boss).
# Croaks when work_handler dies or runs longer than global_timeout seconds
# (a global_timeout of 0 disables the alarm).
#
# No prototype is declared; Perl ignores prototypes on method calls.
sub worker {
    my ($self, $from_boss) = @_;

    # Read instructions from the boss.  Definedness is tested rather than
    # truth so that work items such as 0 or '' are processed instead of
    # being mistaken for end-of-file.
    while (defined(my $instructions = $self->receive($from_boss))) {

        # If the handler's children die, that's not our business
        $SIG{CHLD} = 'IGNORE';

        # Execute the handler with the given instructions
        my ($result, $succeeded, $error);
        {
            # Handle alarms
            local $SIG{ALRM} = sub {
                die "Work handler timed out."
            };

            $succeeded = eval {

                # Set alarm
                alarm($self->{global_timeout});

                # Execute the handler and get its result
                if (defined $self->{work_handler}) {
                    $result = $self->{work_handler}->($instructions);
                }

                1;
            };
            $error = $@;

            # Disable the alarm on every path, including when the handler died
            alarm(0);
        }

        # A failing handler is fatal for this worker
        if (!$succeeded) {
            croak("Worker $$ error: $error");
        }

        # Send the result to the boss
        $self->send($self->{to_boss}, { pid => $$, data => $result });
    }
}
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
# Read one message from a pipe and rebuild the data structure it carries.
#
# Parameters:
#   $fh - file handle to read from.
# Returns the deserialized value, or undef when the handle is at
# end-of-file before any byte of a message was read.
# Confesses when a message ends without the delimiter or cannot be
# evaluated.
#
# NOTE(security): the payload is run through string eval.  Only feed this
# method data written by send() on pipes owned by this object.
#
# No prototype is declared; Perl ignores prototypes on method calls.
sub receive {
    my ($self, $fh) = @_;

    # Get a value from the file handle: one record up to and including the
    # message delimiter
    my $value;
    {
        local $/ = $self->{msg_delimiter};
        $value = readline($fh);

        # End of stream
        return undef if not defined $value;

        # chomp() strips the delimiter; when nothing was stripped the stream
        # ended in the middle of a message.
        if (!chomp($value)) {
            print STDERR "Value: '$value'\n" if $ENV{PFBW_DEBUG};
            confess("Failed to deserialize data: message truncated before delimiter");
        }
    }

    # Deserialize the data.  The text consists of an assignment to $VAR1,
    # possibly followed by fix-up statements, so the result is taken from
    # $VAR1 and not from the value of the last statement.
    our $VAR1;
    local $VAR1;
    {
        no strict;
        no warnings;
        eval($value);
    }

    if ($@) {
        print STDERR "Value: '$value'\n" if $ENV{PFBW_DEBUG};
        confess("Failed to deserialize data: $@");
    }

    my $data = $VAR1;
    return $data;
}
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
# Serialize a value with Data::Dumper and write it, followed by the message
# delimiter, to a file handle.
#
# Parameters:
#   $fh    - file handle to write to.
#   $value - data to transmit.
# Returns the (true) result of flushing the handle.
# Confesses when the message cannot be written or flushed.
#
# Call this as a method: "send" is also a Perl builtin, so a plain function
# call would not reach this sub.  No prototype is declared because Perl
# ignores prototypes on method calls.
sub send {
    my ($self, $fh, $value) = @_;

    # Serialization settings
    local $Data::Dumper::Deepcopy = 1;
    local $Data::Dumper::Indent   = 0;
    local $Data::Dumper::Purity   = 1;

    # Emit control and wide characters as escapes so the payload never
    # contains raw NUL bytes that could be mistaken for the default
    # delimiter.
    local $Data::Dumper::Useqq = 1;

    # Print the value to the file handle
    print {$fh} Dumper($value) . $self->{msg_delimiter}
        or confess("Failed to send message: $!");

    # Force the file handle to flush
    my $flushed = $fh->flush()
        or confess("Failed to flush message: $!");

    return $flushed;
}
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
1; |
249
|
|
|
|
|
|
|
__END__ |