File Coverage

lib/Remote/Perl.pm
Criterion Covered Total %
statement 202 205 98.5
branch 71 84 84.5
condition 43 55 78.1
subroutine 25 28 89.2
pod 5 6 83.3
total 346 378 91.5


line stmt bran cond sub pod time code
1 139     139   15667465 use v5.36;
  137     0   687  
  0            
2             package Remote::Perl;
3              
4 137     135   79226 use autodie qw(open);
  134         2995320  
  133         966  
5 133     132   54281 use List::Util qw(min);
  132         439  
  132         15264  
6 132     132   53991 use IO::Select;
  132         162118  
  132         10077  
7              
8 132     132   106720 use Remote::Perl::Bootstrap qw(bootstrap_payload wait_for_ready);
  132         399  
  132         11253  
9 132     132   67421 use Remote::Perl::ModuleServer;
  132         374  
  132         11504  
10 132         26851 use Remote::Perl::Protocol qw(
11             PROTOCOL_VERSION
12             MSG_HELLO MSG_READY MSG_RUN MSG_DATA MSG_EOF
13             MSG_CREDIT MSG_MOD_REQ MSG_MOD_MISSING MSG_RETURN
14             MSG_SIGNAL MSG_SIGNAL_ACK
15             MSG_ERROR MSG_BYE
16             STREAM_CONTROL STREAM_STDIN STREAM_STDOUT STREAM_STDERR
17             TMPFILE_NONE TMPFILE_AUTO TMPFILE_LINUX TMPFILE_PERL TMPFILE_NAMED
18             FLAGS_WARNINGS
19             encode_message encode_hello encode_credit decode_return
20             encode_run
21 132     132   72037 );
  132         442  
22 132     132   65902 use Remote::Perl::Transport;
  132         428  
  132         434458  
23              
24             our $VERSION = '0.004';
25              
26             # -- Tmpfile strategy mapping --------------------------------------------------
27              
28             my %_TMPFILE_FLAG = (
29             off => TMPFILE_NONE,
30             1 => TMPFILE_AUTO,
31             auto => TMPFILE_AUTO,
32             linux => TMPFILE_LINUX,
33             perl => TMPFILE_PERL,
34             named => TMPFILE_NAMED,
35             );
36              
37 1456     1456   2503 sub _tmpfile_flag($v) {
  1456         2350  
  1456         2964  
38 1456 100       11662 return TMPFILE_NONE unless $v;
39             die "Unknown tmpfile strategy '$v'; valid values: auto, linux, perl, named\n"
40 456 50       2620 unless exists $_TMPFILE_FLAG{$v};
41 456         2757 return $_TMPFILE_FLAG{$v};
42             }
43              
44             # -- Constructor ---------------------------------------------------------------
45              
46 1522     1522 1 282300867 sub new($class, %args) {
  1522         10194  
  1522         16318  
  1522         4506  
47             my $self = bless {
48             cmd => $args{cmd} // die("'cmd' is required\n"),
49             window => $args{window} // 65_536,
50             serve => $args{serve} // 0,
51             tmpfile => $args{tmpfile} // 0,
52             data_warn => $args{data_warn} // 0,
53             _mod_srv => ($args{serve}
54             ? Remote::Perl::ModuleServer->new(
55             inc => $args{inc} // \@INC,
56             serve_filter => $args{serve_filter},
57             )
58 1522 100 50     182250 : undef),
      100        
      100        
      100        
      100        
      50        
59             # set by _connect:
60             _t => undef, # Remote::Perl::Transport
61             _parser => undef, # Remote::Perl::Protocol::Parser
62             _done => 0,
63             _ready => 0,
64             # reset before each run:
65             _on_stdout => undef,
66             _on_stderr => undef,
67             _returned => 0,
68             _return_code => undef,
69             _return_msg => undef,
70             _stdin_credits => 0,
71             _stdin_eof_sent => 0,
72             _no_stdin => 0,
73             _stdin_str => undef, # plain-string stdin buffer; undef = use fh or none
74             }, $class;
75 1522         14966 $self->_connect;
76 1422         35354 return $self;
77             }
78              
79             # -- Internal: connect and handshake -------------------------------------------
80              
81 1522     1522   9381 sub _connect($self) {
  1522         30820  
  1522         2646  
82 1522         27171 my $t = Remote::Perl::Transport->new(cmd => $self->{cmd});
83 1522         11861 $t->connect;
84 1423         34418 $self->{_t} = $t;
85 1423         361400 $self->{_parser} = Remote::Perl::Protocol::Parser->new;
86              
87 1423         94075 $t->write_bytes(bootstrap_payload(serve => $self->{serve}));
88              
89             # Scan for the readiness marker; any leftover bytes belong to the protocol.
90 1423         22595 my $leftover = wait_for_ready($t->out_fh);
91 1423 50       6242 $self->{_parser}->feed($leftover) if length $leftover;
92              
93             $self->_send(MSG_HELLO, STREAM_CONTROL,
94 1423         37746 encode_hello(PROTOCOL_VERSION, $self->{window}));
95              
96 1423     2846   48618 $self->_pump_until(sub { $self->{_ready} });
  2846         27247  
97              
98 1423 100       10283 unless ($self->{_ready}) {
99 1         15 my $err = '';
100 1         11 while ($t->stderr_ready(0.25)) {
101 1         168 my $chunk = $t->read_stderr(4096);
102 1 50 33     15 last unless defined $chunk && length $chunk;
103 1         6 $err .= $chunk;
104             }
105 1         250456 $err =~ s/\s+\z//;
106 1 50       75 my $detail = length($err) ? ": $err" : '';
107 1         43 die "remperl: connection failed$detail\n";
108             }
109             }
110              
111             # -- Internal: wire I/O --------------------------------------------------------
112              
113 7542     7542   13101 sub _send($self, $type, $stream, $body = '') {
  7542         13591  
  7542         12283  
  7542         9480  
  7542         18528  
  7542         10244  
114 7542         36004 $self->{_t}->write_bytes(encode_message($type, $stream, $body));
115             }
116              
117             # -- Internal: event loop ------------------------------------------------------
118              
119             # Drive the event loop until $pred->() returns true or the connection closes.
120             # $stdin_fh: optional filehandle whose data is forwarded to STREAM_STDIN.
121             # Plain-string stdin is handled via _stdin_str / _drain_stdin_str; $stdin_fh
122             # is always undef in that case (strings have no real fd for select).
123 4301     4301   8141 sub _pump_until($self, $pred, $stdin_fh = undef) {
  4301         8323  
  4301         7604  
  4301         8394  
  4301         7457  
124 4301         11254 my $t = $self->{_t};
125 4301         27981 my $out_fd = fileno($t->out_fh);
126 4301 100 50     14838 my $in_fd = defined($stdin_fh) ? (fileno($stdin_fh) // -1) : -1;
127              
128 4301   100     10262 until ($pred->() || $self->{_done}) {
129 8215         27199 my $sel = IO::Select->new($t->out_fh);
130 8215 100 100     698169 if ($in_fd >= 0 && !$self->{_stdin_eof_sent}
      100        
131             && $self->{_stdin_credits} > 0) {
132 28         136 $sel->add($stdin_fh);
133             }
134              
135 8215         34082 for my $fh ($sel->can_read(1)) {
136 8215 100       235485617 if (fileno($fh) == $out_fd) {
    50          
137 8187         56313 my $data = $t->read_bytes(65_536);
138 8187 100       21848 unless (defined $data) { $self->{_done} = 1; last }
  1         17  
  1         15  
139 8186         56437 $self->_dispatch($_) for $self->{_parser}->feed($data);
140             }
141             elsif (fileno($fh) == $in_fd) {
142 28         154 $self->_forward_stdin($stdin_fh);
143             }
144             }
145             }
146             }
147              
148 10427     10427   16681 sub _dispatch($self, $msg) {
  10427         15317  
  10427         15777  
  10427         39866  
149 10427         18884 my ($type, $stream, $body) = @{$msg}{qw(type stream body)};
  10427         34164  
150              
151 10427 100       96996 if ($type == MSG_READY) { $self->{_ready} = 1 }
  1422 100       15992  
    100          
    100          
    100          
    100          
    100          
    100          
    50          
152              
153             elsif ($type == MSG_DATA) {
154 1471 100       6283 if ($stream == STREAM_STDOUT) {
    50          
155 1177   33 0   10295 ($self->{_on_stdout} // sub {})->($body);
156 1177         9575 $self->_send(MSG_CREDIT, STREAM_STDOUT, encode_credit(length($body)));
157             }
158             elsif ($stream == STREAM_STDERR) {
159 294   33 0   1693 ($self->{_on_stderr} // sub {})->($body);
160 294         1564 $self->_send(MSG_CREDIT, STREAM_STDERR, encode_credit(length($body)));
161             }
162             }
163              
164             elsif ($type == MSG_EOF) { } # EOF on stdout/stderr; RETURN signals end-of-run
165              
166             elsif ($type == MSG_CREDIT) {
167 1721 50       6291 if ($stream == STREAM_STDIN) {
168 1721 100 66     26419 if ($self->{_no_stdin} && !$self->{_stdin_eof_sent}) {
169 1424         5936 $self->_send(MSG_EOF, STREAM_STDIN);
170 1424         13103 $self->{_stdin_eof_sent} = 1;
171             }
172             else {
173 297         701 $self->{_stdin_credits} += unpack('N', $body);
174 297 100       1169 $self->_drain_stdin_str if defined $self->{_stdin_str};
175             }
176             }
177             }
178              
179             elsif ($type == MSG_MOD_REQ) {
180             die "remperl: serve => 1 but no ModuleServer configured\n"
181 31 50 66     657 if $self->{serve} && !$self->{_mod_srv};
182 31 100 66     379 my $source = ($self->{serve} && $self->{_mod_srv}) ? $self->{_mod_srv}->find($body) : undef;
183 31 100       133 if (defined $source) {
184 16         66 my $chunk = 65_536;
185 16         255 for (my $off = 0; $off < length($source); $off += $chunk) {
186 16         101 $self->_send(MSG_DATA, $stream, substr($source, $off, $chunk));
187             }
188 16         48 $self->_send(MSG_EOF, $stream);
189             }
190             else {
191 15         142 $self->_send(MSG_MOD_MISSING, $stream);
192             }
193             }
194              
195             elsif ($type == MSG_SIGNAL_ACK) { } # reserved for future unresponsive-remote detection
196              
197             elsif ($type == MSG_RETURN) {
198 1456         9246 ($self->{_return_code}, $self->{_return_msg}) = decode_return($body);
199 1456         19343 $self->{_returned} = 1;
200             }
201              
202 10         1200 elsif ($type == MSG_ERROR) { die "remperl: remote error: $body\n" }
203              
204 1422         11086 elsif ($type == MSG_BYE) { $self->{_done} = 1 }
205             }
206              
207 269     269   338 sub _drain_stdin_str($self) {
  269         337  
  269         283  
208 269   100     1547 while ($self->{_stdin_credits} > 0 && length($self->{_stdin_str})) {
209 260         948 my $take = min(65_536, $self->{_stdin_credits}, length($self->{_stdin_str}));
210 260         1153 $self->_send(MSG_DATA, STREAM_STDIN, substr($self->{_stdin_str}, 0, $take, ''));
211 260         1869 $self->{_stdin_credits} -= $take;
212             }
213 269 100 100     2563 if (!length($self->{_stdin_str}) && !$self->{_stdin_eof_sent}) {
214 9         86 $self->_send(MSG_EOF, STREAM_STDIN);
215 9         84 $self->{_stdin_eof_sent} = 1;
216             }
217             }
218              
219 28     28   81 sub _forward_stdin($self, $stdin_fh) {
  28         59  
  28         76  
  28         58  
220 28         185 my $n = min(65_536, $self->{_stdin_credits});
221 28         66 my $data;
222 28         711 my $bytes = sysread($stdin_fh, $data, $n);
223 28 50       231 if (!defined $bytes) {
    100          
224 0         0 die "sysread stdin: $!\n";
225             }
226             elsif ($bytes == 0) {
227 23         128 $self->_send(MSG_EOF, STREAM_STDIN);
228 23         136 $self->{_stdin_eof_sent} = 1;
229             }
230             else {
231 5         22 $self->_send(MSG_DATA, STREAM_STDIN, $data);
232 5         39 $self->{_stdin_credits} -= $bytes;
233             }
234             }
235              
236             # -- Public API ----------------------------------------------------------------
237              
238             # Run Perl source code on the remote. Returns the exit code (integer).
239             # In list context also returns the error message (empty string on success).
240             #
241             # Options:
242             # on_stdout => sub($chunk) { ... } called for each stdout DATA chunk
243             # on_stderr => sub($chunk) { ... } called for each stderr DATA chunk
244             # stdin => $fh_or_str filehandle or plain string to use as
245             # remote STDIN (omit or pass undef for none)
246             # args => \@args (optional) set remote @ARGV before running
247 1456     1456 1 133368 sub run_code($self, $source, %opts) {
  1456         6935  
  1456         6917  
  1456         4638  
  1456         2216  
248 1456         5549 $self->{_on_stdout} = $opts{on_stdout};
249 1456         3462 $self->{_on_stderr} = $opts{on_stderr};
250 1456         4597 $self->{_returned} = 0;
251 1456         3626 $self->{_return_code} = undef;
252 1456         8339 $self->{_return_msg} = '';
253 1456         3622 $self->{_stdin_credits} = 0;
254 1456         3816 $self->{_stdin_eof_sent} = 0;
255              
256 1456 100       9512 my $tmpfile = exists $opts{tmpfile} ? $opts{tmpfile} : $self->{tmpfile};
257 1456   100     5817 my $warnings = $opts{warnings} // 0;
258 1456 100       9837 my $flags = _tmpfile_flag($tmpfile) | ($warnings ? FLAGS_WARNINGS : 0);
259              
260 1456 100 100     11476 if (!$flags && $self->{data_warn} && $source =~ /^__DATA__(?:\r?\n|$)/m) {
      100        
261 5         110 warn "remperl: script contains __DATA__ but --tmpfile is not set;"
262             . " use --tmpfile for __DATA__ support\n";
263             }
264              
265 1456         2982 my $stdin = $opts{stdin};
266 1456         2498 my $stdin_fh;
267 1456 100       3943 if (!defined $stdin) {
    100          
268 1424         3079 $self->{_no_stdin} = 1;
269 1424         3184 $self->{_stdin_str} = undef;
270             }
271             elsif (!ref($stdin)) {
272             # Plain string: buffer it; credits drive delivery in _dispatch.
273 9         59 $self->{_no_stdin} = 0;
274 9         70 $self->{_stdin_str} = $stdin;
275             }
276             else {
277 23         71 $self->{_no_stdin} = 0;
278 23         72 $self->{_stdin_str} = undef;
279 23         53 $stdin_fh = $stdin;
280             }
281              
282 1456 100       5780 my @argv = $opts{args} ? @{$opts{args}} : ();
  21         71  
283 1456         14130 $self->_send(MSG_RUN, STREAM_CONTROL, encode_run($flags, $source, @argv));
284 1456     6816   22370 $self->_pump_until(sub { $self->{_returned} }, $stdin_fh);
  6816         44891  
285              
286             return wantarray
287             ? ($self->{_return_code}, $self->{_return_msg})
288 1446 100       73063 : $self->{_return_code};
289             }
290              
291             # Read a local file and run its contents as Perl source on the remote.
292             # Accepts the same options as run_code, including tmpfile.
293 841     841 1 1220992 sub run_file($self, $path, %opts) {
  841         2245  
  841         2034  
  841         3329  
  841         1166  
294 841         16254 open(my $fh, '<', $path);
295 841         333444 local $/;
296 841         25420 my $source = <$fh>;
297 841         10679 return $self->run_code($source, %opts);
298             }
299              
300             # Send BYE and wait for the remote to echo it back, then close the transport.
301 1422     1422 1 333745 sub disconnect($self) {
  1422         29906  
  1422         1750  
302 1422 50       4331 return if $self->{_done};
303 1422         3825 eval { $self->_send(MSG_BYE, STREAM_CONTROL) };
  1422         6576  
304 1422 50   2844   92588 $self->_pump_until(sub { $self->{_done} }) unless $self->{_done};
  2844         15002  
305 1422         12021 $self->{_t}->disconnect;
306             }
307              
308 4     4 0 49924 sub pid($self) { $self->{_t}->pid }
  4         16  
  4         8  
  4         60  
309              
310             # Send a signal to the remote script by name (e.g. 'INT', 'TERM').
311             # The remote relay process delivers it to the executor and replies with
312             # MSG_SIGNAL_ACK, which can later be used to detect unresponsive remotes.
313 2     2 1 70 sub send_signal($self, $signame) {
  2         18  
  2         10  
  2         14  
314 2         14 $self->_send(MSG_SIGNAL, STREAM_CONTROL, $signame);
315             }
316              
317 1400     1400   69561 sub DESTROY($self) {
  1400         3309  
  1400         2137  
318 1400 50       74161 eval { $self->disconnect } unless $self->{_done};
  0            
319             }
320              
321             1;
322              
323             __END__