line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
|
2
|
|
|
|
|
|
|
package RPC::ToWorker; |
3
|
|
|
|
|
|
|
|
4
|
1
|
|
|
1
|
|
247587
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
89
|
|
5
|
1
|
|
|
1
|
|
6
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
59
|
|
6
|
|
|
|
|
|
|
require Exporter; |
7
|
1
|
|
|
1
|
|
1002
|
use File::Slurp::Remote::BrokenDNS qw($myfqdn %fqdnify); |
|
1
|
|
|
|
|
12370
|
|
|
1
|
|
|
|
|
158
|
|
8
|
1
|
|
|
1
|
|
9
|
use Tie::Function::Examples qw(%q_perl); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
86
|
|
9
|
1
|
|
|
1
|
|
1701
|
use IO::Event; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
use IO::Event::Callback; |
11
|
|
|
|
|
|
|
use Eval::LineNumbers qw(eval_line_numbers); |
12
|
|
|
|
|
|
|
use Carp qw(confess); |
13
|
|
|
|
|
|
|
use IO::Handle; |
14
|
|
|
|
|
|
|
use Socket; |
15
|
|
|
|
|
|
|
use IO::Event::Callback; |
16
|
|
|
|
|
|
|
use Proc::Parallel::RemoteKiller; |
17
|
|
|
|
|
|
|
use Scalar::Util qw(refaddr weaken); |
18
|
|
|
|
|
|
|
use Time::HiRes qw(time); |
19
|
|
|
|
|
|
|
require POSIX; |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
our $VERSION = 0.601; |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
our @EXPORT = qw(do_remote_job); |
24
|
|
|
|
|
|
|
our @ISA = qw(Exporter); |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
our $command = 'perl'; |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
our $max_retry = 10; |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
my $timer_interval = 5; |
31
|
|
|
|
|
|
|
my $reconnect_timeout = 7200; |
32
|
|
|
|
|
|
|
my $listen_port = 28328; |
33
|
|
|
|
|
|
|
my $listener; |
34
|
|
|
|
|
|
|
my $poll_interval = 15; |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
our $remote_killer; |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
my %waiting; |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
our $debug = 0; |
41
|
|
|
|
|
|
|
our $debug_create = 0; |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
my %forced_polling; |
44
|
|
|
|
|
|
|
my $last_poll = 0; |
45
|
|
|
|
|
|
|
our $doing_force_poll = 1; |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
sub force_poll |
48
|
|
|
|
|
|
|
{ |
49
|
|
|
|
|
|
|
# work around a bug in IO::Event or maybe Event |
50
|
|
|
|
|
|
|
return if $doing_force_poll; |
51
|
|
|
|
|
|
|
return $last_poll + $poll_interval < time; |
52
|
|
|
|
|
|
|
local $doing_force_poll = 1; |
53
|
|
|
|
|
|
|
$last_poll = time; |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
print STDERR "--------------------------------- forced poll start ----------------------------\n"; |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
for my $a (keys %forced_polling) { |
58
|
|
|
|
|
|
|
my $ioe = $forced_polling{$a}; |
59
|
|
|
|
|
|
|
if ($ioe) { |
60
|
|
|
|
|
|
|
$ioe->ie_input(); |
61
|
|
|
|
|
|
|
} else { |
62
|
|
|
|
|
|
|
delete $forced_polling{$a}; |
63
|
|
|
|
|
|
|
} |
64
|
|
|
|
|
|
|
} |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
print STDERR "--------------------------------- forced poll end ------------------------------\n"; |
67
|
|
|
|
|
|
|
} |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
sub do_remote_job |
70
|
|
|
|
|
|
|
{ |
71
|
|
|
|
|
|
|
my (%params) = @_; |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
$params{can_retry} = 1 unless defined $params{can_retry}; |
74
|
|
|
|
|
|
|
my $can_retry = $params{can_retry}; |
75
|
|
|
|
|
|
|
my $host = $params{host}; |
76
|
|
|
|
|
|
|
my $when_done = $params{when_done} || confess "when_done is a required parameter"; |
77
|
|
|
|
|
|
|
my $data = $params{data}; |
78
|
|
|
|
|
|
|
my $chdir = $params{chdir} ||= '.'; |
79
|
|
|
|
|
|
|
my $eval = $params{eval}; |
80
|
|
|
|
|
|
|
my $desc = $params{desc} ||= "job on $host"; |
81
|
|
|
|
|
|
|
my $prefix = $params{prefix} ||= "$host:"; |
82
|
|
|
|
|
|
|
my $preload = $params{preload} ||= []; |
83
|
|
|
|
|
|
|
my $prequel = $params{prequel} ||= ''; |
84
|
|
|
|
|
|
|
my $alldone = bless { %params }, 'RPC::ToWorker::AllDone'; |
85
|
|
|
|
|
|
|
my $status = $params{status} ||= sub { 0; }; |
86
|
|
|
|
|
|
|
$params{failure} ||= sub { |
87
|
|
|
|
|
|
|
print STDERR "DIE DIE DIE DIE DIE: $desc: @_"; |
88
|
|
|
|
|
|
|
# exit 1; hangs! |
89
|
|
|
|
|
|
|
POSIX::_exit(1); |
90
|
|
|
|
|
|
|
}; |
91
|
|
|
|
|
|
|
my $died_at = $params{died_at} ||= $params{failure}; |
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
$params{alldone} = $alldone; |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
$preload = [ split(' ', $preload) ] unless ref $preload; |
96
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
while(! $listener) { |
98
|
|
|
|
|
|
|
$listener = IO::Event::Socket::INET->new( |
99
|
|
|
|
|
|
|
Listen => 100, |
100
|
|
|
|
|
|
|
Proto => 'tcp', |
101
|
|
|
|
|
|
|
LocalPort => ++$listen_port, |
102
|
|
|
|
|
|
|
); |
103
|
|
|
|
|
|
|
unless ($listener) { |
104
|
|
|
|
|
|
|
warn "# Cannot listen on port $listen_port: $!"; |
105
|
|
|
|
|
|
|
redo; |
106
|
|
|
|
|
|
|
} |
107
|
|
|
|
|
|
|
my $timer = IO::Event->timer( |
108
|
|
|
|
|
|
|
interval => $timer_interval, |
109
|
|
|
|
|
|
|
cb => sub { |
110
|
|
|
|
|
|
|
for my $e (keys %waiting) { |
111
|
|
|
|
|
|
|
my $r = $waiting{$e}; |
112
|
|
|
|
|
|
|
next if time < $r->{start_time} + $reconnect_timeout; |
113
|
|
|
|
|
|
|
next if $r->{alldone}{failed}; |
114
|
|
|
|
|
|
|
if ($r->{alldone}{compile_finished} && $can_retry && ! $r->{alldone}{master_go} && $can_retry < $max_retry) { |
115
|
|
|
|
|
|
|
$r->{alldone}->{retrying} = 1; |
116
|
|
|
|
|
|
|
$r->{alldone}{failed} = "Timed out, retrying"; |
117
|
|
|
|
|
|
|
my %new = %$r; |
118
|
|
|
|
|
|
|
delete $new{alldone}; |
119
|
|
|
|
|
|
|
$new{can_retry}++; |
120
|
|
|
|
|
|
|
$new{desc} = "RETRY$new{can_retry} $new{desc}"; |
121
|
|
|
|
|
|
|
do_remote_job(%new); |
122
|
|
|
|
|
|
|
print STDERR "RETRYING REMOTE JOB $desc\n"; |
123
|
|
|
|
|
|
|
} else { |
124
|
|
|
|
|
|
|
$r->{failure}->("Timed out waiting for job $desc on $host to connect to $listen_port for cookie $e"); |
125
|
|
|
|
|
|
|
} |
126
|
|
|
|
|
|
|
} |
127
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
128
|
|
|
|
|
|
|
}, |
129
|
|
|
|
|
|
|
); |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
# $listener->event->prio(1); |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
$remote_killer = Proc::Parallel::RemoteKiller->new(); |
134
|
|
|
|
|
|
|
} |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
my $slavefh = new IO::Handle; |
137
|
|
|
|
|
|
|
my $parentfh = new IO::Handle; |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
socketpair($slavefh, $parentfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC) |
140
|
|
|
|
|
|
|
or die "socketpair: $!"; |
141
|
|
|
|
|
|
|
my $pid = fork(); |
142
|
|
|
|
|
|
|
my $slave; |
143
|
|
|
|
|
|
|
if ($pid) { |
144
|
|
|
|
|
|
|
# parent |
145
|
|
|
|
|
|
|
$parentfh->close(); |
146
|
|
|
|
|
|
|
$slavefh->blocking(0); |
147
|
|
|
|
|
|
|
$slavefh->autoflush(1); |
148
|
|
|
|
|
|
|
$slave = IO::Event::Callback->new($slavefh, |
149
|
|
|
|
|
|
|
werror => sub { |
150
|
|
|
|
|
|
|
$params{failure}->("Could not write to stdin for $desc: $!"); |
151
|
|
|
|
|
|
|
$alldone->{failed}->("Could not write to stdin for $desc: $!"); |
152
|
|
|
|
|
|
|
}, |
153
|
|
|
|
|
|
|
input => sub { |
154
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
155
|
|
|
|
|
|
|
while (<$ioe>) { |
156
|
|
|
|
|
|
|
if (/^SLAVE PID=(\d+)\n/) { |
157
|
|
|
|
|
|
|
$remote_killer->note($host, $1); |
158
|
|
|
|
|
|
|
$params{pid} = $1; |
159
|
|
|
|
|
|
|
$alldone->{slavepid} = $1; |
160
|
|
|
|
|
|
|
next; |
161
|
|
|
|
|
|
|
} elsif (/^compile finished\./) { |
162
|
|
|
|
|
|
|
$alldone->{compile_finished} = 1; |
163
|
|
|
|
|
|
|
next; |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
if ($params{output_handler}) { |
166
|
|
|
|
|
|
|
$params{output_handler}->($_); |
167
|
|
|
|
|
|
|
} else { |
168
|
|
|
|
|
|
|
print STDERR "$prefix SSH/ERROR: $_"; |
169
|
|
|
|
|
|
|
} |
170
|
|
|
|
|
|
|
} |
171
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
172
|
|
|
|
|
|
|
}, |
173
|
|
|
|
|
|
|
eof => sub { |
174
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
175
|
|
|
|
|
|
|
$ioe->close(); |
176
|
|
|
|
|
|
|
$params{output_handler}->("EOF on ssh ($desc)\n") if $debug_create; |
177
|
|
|
|
|
|
|
}, |
178
|
|
|
|
|
|
|
); |
179
|
|
|
|
|
|
|
$params{output_handler}->("startup ($desc)\n") if $debug_create; |
180
|
|
|
|
|
|
|
} elsif (defined $pid) { |
181
|
|
|
|
|
|
|
# child |
182
|
|
|
|
|
|
|
$slavefh->close(); |
183
|
|
|
|
|
|
|
$parentfh->autoflush(1); |
184
|
|
|
|
|
|
|
$parentfh->blocking(0); |
185
|
|
|
|
|
|
|
print $parentfh "Foo!\n" if $debug; |
186
|
|
|
|
|
|
|
open STDIN, "<&", \$parentfh or die "dup onto STDIN: $!"; |
187
|
|
|
|
|
|
|
open STDOUT, ">&", \$parentfh or die "dup onto STDOUT: $!"; |
188
|
|
|
|
|
|
|
open STDERR, ">&", \$parentfh or die "dup onto STDERR: $!"; |
189
|
|
|
|
|
|
|
if (0 && $fqdnify{$host} eq $myfqdn) { # XXX why is this not reliable? |
190
|
|
|
|
|
|
|
exec $command |
191
|
|
|
|
|
|
|
or die "exec $command: $!"; |
192
|
|
|
|
|
|
|
} else { |
193
|
|
|
|
|
|
|
exec 'ssh', $host, '-o', 'StrictHostKeyChecking=no', '-o', 'BatchMode=yes', $command, |
194
|
|
|
|
|
|
|
or do { |
195
|
|
|
|
|
|
|
$params{failure}->("exec ssh $host $command: $!"); |
196
|
|
|
|
|
|
|
return; |
197
|
|
|
|
|
|
|
}; |
198
|
|
|
|
|
|
|
} |
199
|
|
|
|
|
|
|
} else { |
200
|
|
|
|
|
|
|
die "cannot fork: $!"; |
201
|
|
|
|
|
|
|
} |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
my $cookie; |
204
|
|
|
|
|
|
|
do { |
205
|
|
|
|
|
|
|
$cookie = "C".rand(100000000); |
206
|
|
|
|
|
|
|
} while defined $waiting{"$cookie MASTER"}; |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
$waiting{"$cookie MASTER"} = bless { |
209
|
|
|
|
|
|
|
slave => $slave, |
210
|
|
|
|
|
|
|
start_time => time, |
211
|
|
|
|
|
|
|
%params, |
212
|
|
|
|
|
|
|
}, 'RPC::ToWorker::Master'; |
213
|
|
|
|
|
|
|
|
214
|
|
|
|
|
|
|
$waiting{"$cookie OUTPUT"} = bless { |
215
|
|
|
|
|
|
|
slave => $slave, |
216
|
|
|
|
|
|
|
start_time => time, |
217
|
|
|
|
|
|
|
%params, |
218
|
|
|
|
|
|
|
}, 'RPC::ToWorker::Output'; |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
my $stream = ''; |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
if ($params{stream_in} || $params{stream_out}) { |
223
|
|
|
|
|
|
|
$stream = eval_line_numbers(<
|
224
|
|
|
|
|
|
|
my \$stream = new IO::Socket::INET ( |
225
|
|
|
|
|
|
|
PeerAddr => '$myfqdn:$listen_port', |
226
|
|
|
|
|
|
|
Proto => 'tcp', |
227
|
|
|
|
|
|
|
); |
228
|
|
|
|
|
|
|
die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$stream; |
229
|
|
|
|
|
|
|
\$stream->autoflush(1); |
230
|
|
|
|
|
|
|
print \$stream "$cookie STREAM\\n" |
231
|
|
|
|
|
|
|
or die; |
232
|
|
|
|
|
|
|
END_STREAM |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
$waiting{"$cookie STREAM"} = bless { |
235
|
|
|
|
|
|
|
slave => $slave, |
236
|
|
|
|
|
|
|
start_time => time, |
237
|
|
|
|
|
|
|
%params, |
238
|
|
|
|
|
|
|
}, 'RPC::ToWorker::Stream'; |
239
|
|
|
|
|
|
|
} |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
my $pre = ''; |
242
|
|
|
|
|
|
|
$pre .= "use $_; " for @$preload; |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
my $p5lib = $ENV{PERL5LIB} || ''; |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
my $av0 = "slave for $myfqdn:$$ - $desc: "; |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
print $slave eval_line_numbers(<
|
249
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'starting'; |
250
|
|
|
|
|
|
|
use strict; |
251
|
|
|
|
|
|
|
use warnings; |
252
|
|
|
|
|
|
|
BEGIN { |
253
|
|
|
|
|
|
|
print "SLAVE PID=\$\$\\n"; |
254
|
|
|
|
|
|
|
chdir($q_perl{$chdir}) or die "cannot chdir to $chdir on $host: \$!"; |
255
|
|
|
|
|
|
|
unshift(\@INC, split(':', $q_perl{$p5lib})); |
256
|
|
|
|
|
|
|
} |
257
|
|
|
|
|
|
|
END_SLAVE0 |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
print $slave $prequel; |
260
|
|
|
|
|
|
|
print $slave "\n"; |
261
|
|
|
|
|
|
|
print $slave eval_line_numbers(<
|
262
|
|
|
|
|
|
|
use IO::Socket::INET; |
263
|
|
|
|
|
|
|
use Storable qw(freeze thaw); |
264
|
|
|
|
|
|
|
$pre |
265
|
|
|
|
|
|
|
END_SLAVE1 |
266
|
|
|
|
|
|
|
print $slave eval_line_numbers(<
|
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
if ($debug) { |
269
|
|
|
|
|
|
|
open(DEBUG, ">&STDERR") or die "dup STDERR: $!"; |
270
|
|
|
|
|
|
|
print STDERR "Dup to DEBUG should have worked\\n"; |
271
|
|
|
|
|
|
|
select(DEBUG); |
272
|
|
|
|
|
|
|
\$| = 1; |
273
|
|
|
|
|
|
|
select(STDOUT); |
274
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
275
|
|
|
|
|
|
|
} |
276
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'redirecting STDOUT'; |
278
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
my \$output = new IO::Socket::INET ( |
280
|
|
|
|
|
|
|
PeerAddr => '$myfqdn:$listen_port', |
281
|
|
|
|
|
|
|
Proto => 'tcp', |
282
|
|
|
|
|
|
|
); |
283
|
|
|
|
|
|
|
die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$output; |
284
|
|
|
|
|
|
|
\$output->autoflush(1); |
285
|
|
|
|
|
|
|
print \$output "$cookie OUTPUT\\n"; |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
if ($debug) { |
288
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
289
|
|
|
|
|
|
|
print STDERR "Connected for Output\\n"; |
290
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
291
|
|
|
|
|
|
|
print "Output connected\\n"; |
292
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
293
|
|
|
|
|
|
|
print \$output "test foo\\n"; |
294
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
295
|
|
|
|
|
|
|
} |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
open STDOUT, ">&", \$output or die "dup to STDOUT: \$!"; |
298
|
|
|
|
|
|
|
select STDOUT; |
299
|
|
|
|
|
|
|
\$| = 1; |
300
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
if ($debug) { |
302
|
|
|
|
|
|
|
print STDERR "stderr test\\n"; |
303
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__; |
304
|
|
|
|
|
|
|
} |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'connecting STREAM'; |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
$stream |
309
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'setting up MASTER'; |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
my \$master = new IO::Socket::INET ( |
313
|
|
|
|
|
|
|
PeerAddr => '$myfqdn:$listen_port', |
314
|
|
|
|
|
|
|
Proto => 'tcp', |
315
|
|
|
|
|
|
|
); |
316
|
|
|
|
|
|
|
die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$master; |
317
|
|
|
|
|
|
|
\$master->autoflush(1); |
318
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
319
|
|
|
|
|
|
|
print \$master "$cookie MASTER\\n" |
320
|
|
|
|
|
|
|
or die; |
321
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'looking for "go" from master'; |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
my \$go = <\$master>; |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
chomp(\$go); |
328
|
|
|
|
|
|
|
exit 1 if \$go eq 'suicide'; |
329
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
\$go =~ /go (\\d+)/; |
331
|
|
|
|
|
|
|
my \$amt = \$1; |
332
|
|
|
|
|
|
|
die unless \$amt; |
333
|
|
|
|
|
|
|
|
334
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'downloading initial data'; |
335
|
|
|
|
|
|
|
|
336
|
|
|
|
|
|
|
my \$buf = ''; |
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
while (length(\$buf) < \$amt) { |
339
|
|
|
|
|
|
|
read(\$master, \$buf, \$amt - length(\$buf), length(\$buf)) or die; |
340
|
|
|
|
|
|
|
} |
341
|
|
|
|
|
|
|
|
342
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
343
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'reconstituting initial data'; |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
my \$data = \${thaw(\$buf)}; |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
\$RPC::ToWorker::Callback::master = # suppress used-once warning |
349
|
|
|
|
|
|
|
\$RPC::ToWorker::Callback::master = \$master; |
350
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
352
|
|
|
|
|
|
|
END_SLAVE2 |
353
|
|
|
|
|
|
|
print $slave eval_line_numbers(<
|
354
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'RUNNING'; |
356
|
|
|
|
|
|
|
my \@r; |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
359
|
|
|
|
|
|
|
eval { |
360
|
|
|
|
|
|
|
sub slave_eval { |
361
|
|
|
|
|
|
|
$eval |
362
|
|
|
|
|
|
|
} |
363
|
|
|
|
|
|
|
\@r = slave_eval(\$data); |
364
|
|
|
|
|
|
|
}; |
365
|
|
|
|
|
|
|
printf DEBUG "debug test %d\\n", __LINE__ if $debug; |
366
|
|
|
|
|
|
|
|
367
|
|
|
|
|
|
|
if (\$\@) { |
368
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'returning failure'; |
369
|
|
|
|
|
|
|
print STDERR \$\@; |
370
|
|
|
|
|
|
|
my \$err = freeze(\\\$\@); |
371
|
|
|
|
|
|
|
printf \$master "DATA %d RETURN_ERROR\\n%s", length(\$err), \$err; |
372
|
|
|
|
|
|
|
# exit 1; hangs |
373
|
|
|
|
|
|
|
POSIX::_exit(1); |
374
|
|
|
|
|
|
|
} |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'returning results'; |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
my \$ret = freeze(\\\@r); |
379
|
|
|
|
|
|
|
printf \$master "DATA %d RETURN_VALUES\\n%s", length(\$ret), \$ret; |
380
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
\$0 = $q_perl{$av0} . 'exiting'; |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
exit; |
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
BEGIN { print STDERR "compile finished.\\n" } |
386
|
|
|
|
|
|
|
END_SLAVE3 |
387
|
|
|
|
|
|
|
shutdown($slavefh, 1); # done writing |
388
|
|
|
|
|
|
|
} |
389
|
|
|
|
|
|
|
|
390
|
|
|
|
|
|
|
sub ie_connection |
391
|
|
|
|
|
|
|
{ |
392
|
|
|
|
|
|
|
my ($pkg, $ioe) = @_; |
393
|
|
|
|
|
|
|
print STDERR "# GOT CONNECTION\n" if $RPC::ToWorker::debug; |
394
|
|
|
|
|
|
|
my $newfh = $ioe->accept(); |
395
|
|
|
|
|
|
|
# $newfh->event->prio(1); |
396
|
|
|
|
|
|
|
$forced_polling{refaddr($newfh)} = $newfh; |
397
|
|
|
|
|
|
|
weaken($forced_polling{refaddr($newfh)}); |
398
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
sub ie_input |
402
|
|
|
|
|
|
|
{ |
403
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
404
|
|
|
|
|
|
|
my $cookie = <$ioe>; |
405
|
|
|
|
|
|
|
return unless $cookie; |
406
|
|
|
|
|
|
|
chomp($cookie); |
407
|
|
|
|
|
|
|
print STDERR "# GOT COOKIE $cookie\n" if $debug; |
408
|
|
|
|
|
|
|
unless ($waiting{$cookie}) { |
409
|
|
|
|
|
|
|
warn "Unknown cookie '$cookie'"; |
410
|
|
|
|
|
|
|
next; |
411
|
|
|
|
|
|
|
} |
412
|
|
|
|
|
|
|
$ioe->handler($waiting{$cookie}); |
413
|
|
|
|
|
|
|
# $ioe->event->prio(4); |
414
|
|
|
|
|
|
|
my $o = $waiting{$cookie}; |
415
|
|
|
|
|
|
|
$o->{output_handler}->(sprintf("using fd %d for $cookie (%s)\n", $ioe->fileno, $o->{desc})) if $RPC::ToWorker::debug_create; |
416
|
|
|
|
|
|
|
$waiting{$cookie}->send_initial_data($ioe); |
417
|
|
|
|
|
|
|
delete $waiting{$cookie}; |
418
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
419
|
|
|
|
|
|
|
} |
420
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
sub ie_eof |
422
|
|
|
|
|
|
|
{ |
423
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
424
|
|
|
|
|
|
|
$ioe->close(); |
425
|
|
|
|
|
|
|
} |
426
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
package RPC::ToWorker::Master; |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
# |
431
|
|
|
|
|
|
|
# This is on the master |
432
|
|
|
|
|
|
|
# |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
use strict; |
435
|
|
|
|
|
|
|
use warnings; |
436
|
|
|
|
|
|
|
use Storable qw(freeze thaw); |
437
|
|
|
|
|
|
|
use Module::Load qw(load); |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
sub send_initial_data |
440
|
|
|
|
|
|
|
{ |
441
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
if ($self->{alldone}{retrying}) { |
444
|
|
|
|
|
|
|
print $ioe "suicide\n"; |
445
|
|
|
|
|
|
|
} else { |
446
|
|
|
|
|
|
|
$self->{alldone}{master_go} = 1; |
447
|
|
|
|
|
|
|
|
448
|
|
|
|
|
|
|
my $id = freeze(\($self->{data} || undef)); |
449
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
printf $ioe "go %d\n", length($id); # don't suicide |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
print $ioe $id; |
453
|
|
|
|
|
|
|
print STDERR "# DATA SENT\n" if $RPC::ToWorker::debug; |
454
|
|
|
|
|
|
|
} |
455
|
|
|
|
|
|
|
} |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
sub ie_input |
458
|
|
|
|
|
|
|
{ |
459
|
|
|
|
|
|
|
my ($self, $ioe, $ibr) = @_; |
460
|
|
|
|
|
|
|
$self->{output_handler}->("control socket input ready ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
461
|
|
|
|
|
|
|
while ($$ibr =~ /\A(DATA (\d+) ([^\n]+)\n)/ && length($$ibr) - length($1) >= $2) { |
462
|
|
|
|
|
|
|
my ($header, $dsize, $control) = ($1, $2, $3); |
463
|
|
|
|
|
|
|
my $data = thaw(substr($$ibr, length($header), $dsize)); |
464
|
|
|
|
|
|
|
substr($$ibr, 0, length($header) + $dsize, ''); |
465
|
|
|
|
|
|
|
if ($control =~ /^RETURN_VALUES$/) { |
466
|
|
|
|
|
|
|
$self->{output_handler}->("return values sent - $dsize ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
467
|
|
|
|
|
|
|
eval { |
468
|
|
|
|
|
|
|
$self->{when_done}->(@$data); |
469
|
|
|
|
|
|
|
}; |
470
|
|
|
|
|
|
|
$self->{failure}->("when done for $self->{desc}: $@") if $@; |
471
|
|
|
|
|
|
|
$self->{return_values_sent} = 1; |
472
|
|
|
|
|
|
|
$ioe->close(); |
473
|
|
|
|
|
|
|
} elsif ($control =~ /^RETURN_ERROR$/) { |
474
|
|
|
|
|
|
|
my $error = $$data; |
475
|
|
|
|
|
|
|
$self->{failure}->("SLAVE FAILURE: $error"); |
476
|
|
|
|
|
|
|
$self->{alldone}{failured} = "SLAVE FAILURE: $error"; |
477
|
|
|
|
|
|
|
$self->{output_handler}->("return error ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
478
|
|
|
|
|
|
|
} elsif ($control =~ /^CALL (\S+) with (.*?) after loading (.*)/) { |
479
|
|
|
|
|
|
|
my ($func, $with, $mods) = ($1, $2, $3); |
480
|
|
|
|
|
|
|
for my $mod (split(' ', $mods)) { |
481
|
|
|
|
|
|
|
load $mod; |
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
for my $item (split(' ',$with)) { |
484
|
|
|
|
|
|
|
push(@$data, $item); |
485
|
|
|
|
|
|
|
push(@$data, $self->{local_data}{$item}); |
486
|
|
|
|
|
|
|
} |
487
|
|
|
|
|
|
|
my @ret; |
488
|
|
|
|
|
|
|
eval { |
489
|
|
|
|
|
|
|
no strict 'refs'; |
490
|
|
|
|
|
|
|
@ret = &{$func}(@$data); |
491
|
|
|
|
|
|
|
}; |
492
|
|
|
|
|
|
|
$self->{failure}->("call to $func on behalf of $self->{desc}: $@") if $@; |
493
|
|
|
|
|
|
|
my $ret = freeze(\@ret); |
494
|
|
|
|
|
|
|
printf $ioe "DATA %d DONE_RESPONSE\n%s", length($ret), $ret or die; |
495
|
|
|
|
|
|
|
} else { |
496
|
|
|
|
|
|
|
$self->{failure}->("SLAVE FAILURE: could not parse input from slave"); |
497
|
|
|
|
|
|
|
$self->{alldone}{failured} = "SLAVE FAILURE: could not parse input from slave"; |
498
|
|
|
|
|
|
|
$self->{output_handler}->("return parse error ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
499
|
|
|
|
|
|
|
} |
500
|
|
|
|
|
|
|
} |
501
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
502
|
|
|
|
|
|
|
} |
503
|
|
|
|
|
|
|
|
504
|
|
|
|
|
|
|
sub ie_werror |
505
|
|
|
|
|
|
|
{ |
506
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
507
|
|
|
|
|
|
|
return if $self->{alldone}{retrying}; |
508
|
|
|
|
|
|
|
IO::Event->timer( |
509
|
|
|
|
|
|
|
after => 5, |
510
|
|
|
|
|
|
|
cb => sub { $self->{failure}->("Could not write to control socket for: $self->{desc}") }, |
511
|
|
|
|
|
|
|
); |
512
|
|
|
|
|
|
|
print STDERR "Failed: Could not write to control socket for job: $self->{desc}, will suicide soon, after queued output has chance to print\n"; |
513
|
|
|
|
|
|
|
$self->{alldone}{failured} = "Could not write to control socket for job: $self->{desc}"; |
514
|
|
|
|
|
|
|
$self->{output_handler}->("Write error on control socket ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
515
|
|
|
|
|
|
|
} |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
sub ie_eof |
518
|
|
|
|
|
|
|
{ |
519
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
$self->{output_handler}->("EOF on control socket ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
522
|
|
|
|
|
|
|
$ioe->close(); |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
return if $self->{return_values_sent}; |
525
|
|
|
|
|
|
|
return if $self->{alldone}{retrying}; |
526
|
|
|
|
|
|
|
|
527
|
|
|
|
|
|
|
IO::Event->timer( |
528
|
|
|
|
|
|
|
after => 5, |
529
|
|
|
|
|
|
|
cb => sub { $self->{failure}->("No return values from remote job: $self->{desc}") }, |
530
|
|
|
|
|
|
|
); |
531
|
|
|
|
|
|
|
print STDERR "Failed: no return values from remote job: $self->{desc}, will suicide soon, after queued output has chance to print\n"; |
532
|
|
|
|
|
|
|
$self->{alldone}{failured} = "No return values from remote job: $self->{desc}"; |
533
|
|
|
|
|
|
|
} |
534
|
|
|
|
|
|
|
|
535
|
|
|
|
|
|
|
package RPC::ToWorker::Output; |
536
|
|
|
|
|
|
|
|
537
|
|
|
|
|
|
|
use strict; |
538
|
|
|
|
|
|
|
use warnings; |
539
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
sub send_initial_data |
541
|
|
|
|
|
|
|
{ |
542
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
543
|
|
|
|
|
|
|
shutdown($ioe->filehandle(), 1); # we don't write to this one |
544
|
|
|
|
|
|
|
# $ioe->event->prio(6); |
545
|
|
|
|
|
|
|
} |
546
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
sub ie_input |
548
|
|
|
|
|
|
|
{ |
549
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
550
|
|
|
|
|
|
|
while (<$ioe>) { |
551
|
|
|
|
|
|
|
next if /ssh_exchange_identification: Connection closed by remote host/; |
552
|
|
|
|
|
|
|
if ($self->{output_handler}) { |
553
|
|
|
|
|
|
|
$self->{output_handler}->($_) |
554
|
|
|
|
|
|
|
} else { |
555
|
|
|
|
|
|
|
print STDERR "$self->{prefix} OUTPUT: $_"; |
556
|
|
|
|
|
|
|
} |
557
|
|
|
|
|
|
|
} |
558
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
559
|
|
|
|
|
|
|
} |
560
|
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
sub ie_eof |
562
|
|
|
|
|
|
|
{ |
563
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
564
|
|
|
|
|
|
|
$ioe->close(); |
565
|
|
|
|
|
|
|
$self->{output_handler}->("EOF on output socket ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
566
|
|
|
|
|
|
|
} |
567
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
package RPC::ToWorker::Stream; |
569
|
|
|
|
|
|
|
|
570
|
|
|
|
|
|
|
use strict; |
571
|
|
|
|
|
|
|
use warnings; |
572
|
|
|
|
|
|
|
use Storable qw(freeze thaw); |
573
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
sub send_initial_data |
575
|
|
|
|
|
|
|
{ |
576
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
577
|
|
|
|
|
|
|
$self->{stream_werror} ||= sub { |
578
|
|
|
|
|
|
|
my ($self, $ioe) = @_; |
579
|
|
|
|
|
|
|
IO::Event::unloop_all(); |
580
|
|
|
|
|
|
|
die "Write error sending data to $self->{desc}: $!"; |
581
|
|
|
|
|
|
|
}; |
582
|
|
|
|
|
|
|
for my $h (@IO::Event::Callback::handlers, 'setup') { |
583
|
|
|
|
|
|
|
$self->{"stream_$h"} ||= sub {}; |
584
|
|
|
|
|
|
|
} |
585
|
|
|
|
|
|
|
$self->{'stream_setup'}->($self, $ioe); |
586
|
|
|
|
|
|
|
# $ioe->event->prio(5); |
587
|
|
|
|
|
|
|
} |
588
|
|
|
|
|
|
|
|
589
|
|
|
|
|
|
|
sub ie_input { $_[0]->{'stream_input'}->(@_) }; |
590
|
|
|
|
|
|
|
sub ie_connection { $_[0]->{'stream_connection'}->(@_) }; |
591
|
|
|
|
|
|
|
sub ie_read_ready { $_[0]->{'stream_read_ready'}->(@_) }; |
592
|
|
|
|
|
|
|
sub ie_werror { $_[0]->{'stream_werror'}->(@_) }; |
593
|
|
|
|
|
|
|
sub ie_eof { $_[0]->{'stream_eof'}->(@_) }; |
594
|
|
|
|
|
|
|
sub ie_output { $_[0]->{'stream_output'}->(@_) }; |
595
|
|
|
|
|
|
|
sub ie_outputdone { $_[0]->{'stream_outputdone'}->(@_) }; |
596
|
|
|
|
|
|
|
sub ie_connected { $_[0]->{'stream_connected'}->(@_) }; |
597
|
|
|
|
|
|
|
sub ie_connect_failed { $_[0]->{'stream_connect_failed'}->(@_)}; |
598
|
|
|
|
|
|
|
sub ie_died { $_[0]->{'stream_died'}->(@_) }; |
599
|
|
|
|
|
|
|
sub ie_timer { $_[0]->{'stream_timer'}->(@_) }; |
600
|
|
|
|
|
|
|
sub ie_exception { $_[0]->{'stream_exception'}->(@_) }; |
601
|
|
|
|
|
|
|
sub ie_outputoverflow { $_[0]->{'stream_outputoverflow'}->(@_)}; |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
package RPC::ToWorker::AllDone; |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
use strict; |
606
|
|
|
|
|
|
|
use warnings; |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
sub DESTROY |
609
|
|
|
|
|
|
|
{ |
610
|
|
|
|
|
|
|
my ($self) = @_; |
611
|
|
|
|
|
|
|
$self->{failure}->($self->{failed}) if $self->{failed} && ! $self->{retrying}; |
612
|
|
|
|
|
|
|
$RPC::ToWorker::remote_killer->forget($self->{host}, $self->{slavepid}) |
613
|
|
|
|
|
|
|
if $self->{slavepid}; |
614
|
|
|
|
|
|
|
$self->{all_done}->() if $self->{all_done}; |
615
|
|
|
|
|
|
|
$RPC::ToWorker::remote_killer->forget($self->{host}, $self->{pid}) |
616
|
|
|
|
|
|
|
if $self->{pid}; |
617
|
|
|
|
|
|
|
RPC::ToWorker::force_poll(); |
618
|
|
|
|
|
|
|
$self->{output_handler}->("Alldone on ($self->{desc})\n") if $RPC::ToWorker::debug_create; |
619
|
|
|
|
|
|
|
} |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
1; |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
__END__ |