File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 427 514 83.0
branch 150 242 61.9
condition 121 236 51.2
subroutine 67 77 87.0
pod 31 36 86.1
total 796 1105 72.0


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 38     38   7726977 use strict;
  38         104  
  38         1443  
3 38     38   446 use warnings;
  38         77  
  38         2591  
4              
5             our $VERSION = '0.022';
6              
7 38     38   15623 use IO();
  38         37217  
  38         1441  
8 38     38   283 use Fcntl();
  38         86  
  38         696  
9 38     38   16331 use bytes();
  38         19085  
  38         1150  
10              
11 38     38   227 use Carp qw/croak confess/;
  38         68  
  38         2472  
12 38     38   217 use Config qw/%Config/;
  38         255  
  38         1566  
13 38     38   226 use List::Util qw/min/;
  38         183  
  38         2612  
14 38     38   210 use Scalar::Util qw/blessed/;
  38         95  
  38         1971  
15              
16 38     38   17837 use Errno qw/EINTR EAGAIN EPIPE/;
  38         65918  
  38         7985  
17             my %RETRY_ERRNO;
18             BEGIN {
19 38     38   224 %RETRY_ERRNO = (EINTR() => 1);
20 38 50       24145 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
21             }
22              
23             BEGIN {
24             # POSIX says writes of 512 or less are atomic, but some platforms allow for
25             # larger ones.
26 38     38   20602 require POSIX;
27 38 50 33     283438 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  38         330  
28 38         198 *PIPE_BUF = \&POSIX::PIPE_BUF;
29             }
30             else {
31 0         0 *PIPE_BUF = sub() { 512 };
32             }
33              
34 38 50 33     229 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  38         176  
35 38         105 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
36             }
37             else {
38 0         0 *SSIZE_MAX = sub() { 512 };
39             }
40              
41             {
42             # Using the default pipe size as a read size is significantly faster
43             # than a larger value on my test machine.
44 38         76 my $read_size = min(SSIZE_MAX(), 65_536);
  38         501  
45 38         700 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
46             }
47              
48 38         161 my $can_thread = 1;
49 38   33     354 $can_thread &&= $] >= 5.008001;
50 38   33     591 $can_thread &&= $Config{'useithreads'};
51              
52             # Threads are broken on perl 5.10.0 built with gcc 4.8+
53 38 0 33     227 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
54 0         0 my @parts = split /\./, $Config{'gccversion'};
55 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
56             }
57              
58 38   33     116 $can_thread &&= !$INC{'Devel/Cover.pm'};
59              
60 38 50       116 if (!$can_thread) {
    0          
61 38         119 *_get_tid = sub() { 0 };
62             }
63             elsif ($INC{'threads.pm'}) {
64 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
65             }
66             else {
67 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
68             }
69              
70 38 50       159 if ($^O eq 'MSWin32') {
71 0         0 local $@;
72 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
73 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
74 0         0 *IS_WIN32 = sub() { 1 };
75             }
76             else {
77 38         1384 *IS_WIN32 = sub() { 0 };
78             }
79             }
80              
81 38     38   305 use constant READ_SIZE => 'read_size';
  38         68  
  38         3262  
82 38     38   216 use constant RH => 'rh';
  38         62  
  38         1809  
83 38     38   178 use constant WH => 'wh';
  38         53  
  38         1519  
84 38     38   170 use constant EOF => 'eof';
  38         64  
  38         1559  
85 38     38   184 use constant STATE => 'state';
  38         64  
  38         1565  
86 38     38   165 use constant OUT_BUFFER => 'out_buffer';
  38         63  
  38         1478  
87 38     38   175 use constant IN_BUFFER => 'in_buffer';
  38         61  
  38         1857  
88 38     38   282 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  38         64  
  38         1817  
89 38     38   176 use constant READ_BLOCKING => 'read_blocking';
  38         64  
  38         1460  
90 38     38   161 use constant WRITE_BLOCKING => 'write_blocking';
  38         50  
  38         1455  
91 38     38   166 use constant BURST_PREFIX => 'burst_prefix';
  38         65  
  38         1949  
92 38     38   355 use constant BURST_POSTFIX => 'burst_postfix';
  38         449  
  38         2180  
93 38     38   186 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  38         110  
  38         1611  
94 38     38   168 use constant MESSAGE_KEY => 'message_key';
  38         76  
  38         1554  
95 38     38   176 use constant MIXED_BUFFER => 'mixed_buffer';
  38         57  
  38         2116  
96 38     38   178 use constant DELIMITER_SIZE => 'delimiter_size';
  38         54  
  38         1587  
97 38     38   264 use constant INVALID_STATE => 'invalid_state';
  38         125  
  38         1888  
98 38     38   184 use constant HIT_EPIPE => 'hit_epipe';
  38         52  
  38         232845  
99              
100 13     13 1 344 sub wh { shift->{+WH} }
101 0     0 1 0 sub rh { shift->{+RH} }
102              
103             sub throw_invalid {
104 6     6 0 16 my $self = shift;
105 6 50 66     106 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
106 6         3375 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
107             }
108              
109             sub read_size {
110 0     0 1 0 my $self = shift;
111 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
112 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
113             }
114              
115             sub fill_buffer {
116 25318     25318 1 54857 my $self = shift;
117              
118 25318 50       87452 $self->throw_invalid() if $self->{+INVALID_STATE};
119              
120 25318 50       92270 my $rh = $self->{+RH} or die "Not a read handle";
121              
122 25318 100       92806 return 0 if $self->{+EOF};
123              
124 25262   100     106570 $self->{+IN_BUFFER_SIZE} //= 0;
125              
126 25262   50     124970 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
127 25262         38940 if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
128             $to_read = min($self->_win32_pipe_ready(), $to_read);
129             }
130              
131 25262 50       79701 return 0 unless $to_read;
132              
133 25262         47657 while(1) {
134 25262         55235 my $rbuff = '';
135 25262         27407094 my $got = sysread($rh, $rbuff, $to_read);
136 25262 100       141754 unless(defined $got) {
137 81 50       1062 return 0 if $! == EAGAIN; # NON-BLOCKING
138 0 0       0 next if $RETRY_ERRNO{0 + $!}; # interrupted or something, try again
139 0         0 $self->throw_invalid("$!");
140             }
141              
142 25181 100       77220 if ($got) {
143 25170         427844 $self->{+IN_BUFFER} .= $rbuff;
144 25170         73957 $self->{+IN_BUFFER_SIZE} += $got;
145 25170         95842 return $got;
146             }
147             else {
148 11         110 $self->{+EOF} = 1;
149 11         63 return 0;
150             }
151             }
152              
153 0         0 return 0;
154             }
155              
156 800175     800175   1634309 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
157 16     16   69 sub _peek_from_buffer { shift->_from_buffer(@_) }
158              
159             sub _from_buffer {
160 800191     800191   1194974 my $self = shift;
161 800191         1784995 my ($size, %params) = @_;
162              
163 800191 100 66     3095790 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
164 25149         112400 $self->fill_buffer;
165 25149 100       113312 unless($self->{+IN_BUFFER_SIZE} >= $size) {
166 1 50 33     42 return unless $params{eof_invalid} && $self->{+EOF};
167 0         0 $self->throw_invalid($params{eof_invalid});
168             }
169             }
170              
171 800190         1148494 my $out;
172              
173 800190 100       1511234 if ($params{remove}) {
174 800174         1201495 $self->{+IN_BUFFER_SIZE} -= $size;
175 800174         2777873 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
176             }
177             else {
178 16         49 $out = substr($self->{+IN_BUFFER}, 0, $size);
179             }
180              
181 800190         2592128 return $out;
182             }
183              
184             sub eof {
185 144     144 1 2562210 my $self = shift;
186              
187 144 100       653 $self->throw_invalid() if $self->{+INVALID_STATE};
188              
189 141 100       496 return 0 if $self->fill_buffer;
190 119 100       616 return 0 unless $self->{+EOF};
191 52 100       175 return 0 if $self->{+IN_BUFFER_SIZE};
192              
193 45 50       133 if (my $buffer = $self->{+MIXED_BUFFER}) {
194 45 100 66     25272 return 0 if $buffer->{lines} || defined($buffer->{lines}) && length($buffer->{lines});
      66        
195 30 50 66     161 return 0 if $buffer->{burst} || defined($buffer->{lines}) && length($buffer->{burst});
      33        
196             }
197              
198 30         183 return 1;
199             }
200              
201             sub _fh_mode {
202 4     4   1138 my $self = shift;
203 4         6 my ($fh) = @_;
204              
205 4   50     17 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
206 4 100       13 return '<&' if $mode == Fcntl::O_RDONLY();
207 2 50       6 return '>&' if $mode == Fcntl::O_WRONLY();
208 0         0 return undef;
209             }
210              
211             my %MODE_TO_DIR = (
212             '<&' => RH(),
213             '<&=' => RH(),
214             '>&' => WH(),
215             '>&=' => WH(),
216             );
217             sub _mode_to_dir {
218 9     9   9 my $self = shift;
219 9         13 my ($mode) = @_;
220 9         29 return $MODE_TO_DIR{$mode};
221             }
222              
223             sub read_fifo {
224 7     7 1 2341852 my $class = shift;
225 7         35 my ($fifo, %params) = @_;
226              
227 7 50       281 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
228              
229 7 50       346 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
230 7         40 binmode($fh);
231              
232 7         82 return bless({%params, RH() => $fh}, $class);
233             }
234              
235             sub write_fifo {
236 6     6 1 38241 my $class = shift;
237 6         151 my ($fifo, %params) = @_;
238              
239 6 50       816 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
240              
241 6 50       1593 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
242 6         169 binmode($fh);
243              
244 6         526 return bless({%params, WH() => $fh}, $class);
245             }
246              
247             sub from_fh {
248 5     5 1 464947 my $class = shift;
249 5         9 my $ifh = pop;
250 5         7 my ($mode) = @_;
251              
252 5 50       28 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
253              
254 5   33     37 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
255 5   33     13 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
256              
257 5 50       64 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
258 5         10 binmode($fh);
259              
260 5         20 return bless({$dir => $fh}, $class);
261             }
262              
263             sub from_fd {
264 4     4 1 2226 my $class = shift;
265 4         8 my ($mode, $fd) = @_;
266              
267 4   33     7 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
268 4 50       56 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
269              
270 4 50       15 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
271              
272 4         5 binmode($fh);
273 4         13 return bless({$dir => $fh}, $class);
274             }
275              
276             sub new {
277 1     1 1 183991 my $class = shift;
278 1         3 my (%params) = @_;
279              
280 1         3 my ($rh, $wh);
281 1 50       65 pipe($rh, $wh) or die "Could not create pipe: $!";
282              
283 1         5 binmode($wh);
284 1         3 binmode($rh);
285              
286 1         12 return bless({%params, RH() => $rh, WH() => $wh}, $class);
287             }
288              
289             sub pair {
290 37     37 1 6967550 my $class = shift;
291 37         164 my (%params) = @_;
292              
293 37         384 my $mixed = delete $params{mixed_data_mode};
294              
295 37         109 my ($rh, $wh);
296 37 50       9091 pipe($rh, $wh) or die "Could not create pipe: $!";
297              
298 37         203 binmode($wh);
299 37         83 binmode($rh);
300              
301 37         294 my $r = bless({%params, RH() => $rh}, $class);
302 37         157 my $w = bless({%params, WH() => $wh}, $class);
303              
304 37 100       151 if ($mixed) {
305 15         85 $r->set_mixed_data_mode();
306 15         52 $w->set_mixed_data_mode();
307             }
308              
309 37         280 return ($r, $w);
310             }
311              
312             sub set_mixed_data_mode {
313 30     30 1 63 my $self = shift;
314              
315 30 50       132 $self->throw_invalid() if $self->{+INVALID_STATE};
316              
317 30 100       222 $self->read_blocking(0) if $self->{+RH};
318              
319 30   50     261 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
320 30   50     147 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
321 30   50     130 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
322             }
323              
324             sub get_line_burst_or_data {
325 52     52 1 625421 my $self = shift;
326 52         200 my %params = @_;
327              
328 52   33     360 my $rh = $self->{+RH} // croak "Not a read handle";
329              
330 52   33     496 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
331 52   33     329 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
332 52   33     293 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
333              
334 52   100     1474 my $buffer = $self->{+MIXED_BUFFER} //= {
335             lines => '',
336             burst => '',
337             in_burst => 0,
338             in_message => 0,
339             do_extra_loop => 0,
340             strip_term => 0,
341             };
342              
343 52         113 my $peek;
344              
345 52         219 while (1) {
346             $self->throw_invalid('Incomplete message received before EOF')
347 101 100 66     587 if $self->eof && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
348              
349 100 100 66     556 if($buffer->{lines} || length($buffer->{lines})) {
350             # Look for a complete line
351 59         113 my ($line, $term);
352 59         1307 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
353              
354 59 100       399 return (line => "${line}${term}") if $term;
355 45 50 66     413 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
356              
357 37         59 $buffer->{lines} = $line;
358 37 50 66     115 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
359             }
360              
361 78 100       231 if ($buffer->{in_message}) {
362 10         61 my ($id, $message) = $self->_extract_message(one_part_only => 1);
363              
364 10 50       30 unless(defined $id) {
365 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
366 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
367             }
368              
369 10         17 $buffer->{strip_term}++;
370 10         18 $buffer->{in_message} = 0;
371 10 100       39 return (message => $message) if defined $message;
372             }
373              
374 75 100       566 if ($buffer->{strip_term}) {
375 10   50     18 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
376              
377 10 50       30 $self->throw_invalid("No message terminator") unless $term eq $postfix;
378 10         14 $buffer->{strip_term}--;
379             }
380              
381 75 100       273 if ($buffer->{in_burst}) {
382 16   50     115 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
383              
384 16 100       100 if ($peek eq $key) {
385 10         64 $self->_get_from_buffer(1); # Strip the key
386 10         18 $buffer->{in_message} = 1;
387 10         20 $buffer->{in_burst} = 0;
388 10         23 next;
389             }
390              
391 6   50     32 $buffer->{burst} //= '';
392 6         12 my ($burst_data, $term);
393 6         236 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
394 6         30 $buffer->{burst} .= $burst_data;
395              
396 6 100       22 if ($term) {
397 4         12 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
398 4         12 $buffer->{in_burst} = 0;
399 4         60 $buffer->{do_extra_loop}++;
400 4         36 return (burst => delete($buffer->{burst}));
401             }
402             else {
403 2         14 $self->{+IN_BUFFER_SIZE} = 0;
404             }
405              
406 2 50       36 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
407             }
408              
409 59 100 66     349 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
410 28 100 66     101 return (peek => $peek) if $peek && !$self->{+EOF};
411              
412 26 100       144 return unless $self->{+EOF};
413              
414             # Do at least one more iteration after EOF
415 15 100       121 return if $buffer->{+EOF}++;
416              
417             # But do not try to split the empty buffer
418 8         27 next;
419             }
420              
421             # Look for the start of a burst, anything before a burst is line data
422 31         45 my $linedata;
423 31         1226 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
424 31 50       211 $buffer->{lines} .= $linedata if defined $linedata;
425              
426 31 100       104 if ($buffer->{in_burst}) {
427 16         44 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
428             }
429             else {
430 15         53 $self->{+IN_BUFFER_SIZE} = 0;
431             }
432             }
433             }
434              
435             sub debug {
436 0     0 0 0 my ($id, $buffer) = @_;
437              
438 0         0 print "---debug $id---\n";
439 0         0 for my $key (sort keys %$buffer) {
440 0   0     0 my $val = $buffer->{$key} // '';
441 0         0 $val =~ s/\x0E/\\x0E/g;
442 0         0 $val =~ s/\x0F/\\x0F/g;
443 0         0 $val =~ s/\x10/\\x10/g;
444 0         0 $val =~ s/\n/\\n/g;
445 0         0 $val =~ s/\r/\\r/g;
446 0         0 print "$key: |$val|\n\n";
447             };
448             }
449              
450             # This is a heavily modified version of a pattern suggested on stack-overflow
451             # and also used in Win32::PowerShell::IPC.
452             my $peek_named_pipe;
453             sub _win32_pipe_ready {
454 0     0   0 my $self = shift;
455 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
456              
457 0         0 my $buf = "";
458 0         0 my $buflen = 0;
459              
460 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
461             || die "Can't load PeekNamedPipe from kernel32.dll";
462              
463 0         0 my $got = pack('L', 0);
464 0         0 my $avail = pack('L', 0);
465 0         0 my $remain = pack('L', 0);
466              
467 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
468              
469 0 0       0 $self->{+EOF} = 1 if $ret == 0;
470              
471 0         0 return unpack('L', $avail);
472             }
473              
474             my $set_named_pipe_handle_state;
475             sub _win32_set_pipe_state {
476 0     0   0 my $self = shift;
477 0         0 my ($state) = @_;
478 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
479              
480 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
481             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
482              
483             # Block or non-block?
484 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
485              
486 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
487              
488 0         0 return $ret;
489             }
490              
491             sub read_blocking {
492 20     20 0 55 my $self = shift;
493 20 50       140 my $rh = $self->{+RH} or croak "Not a reader";
494              
495 20 50       138 ($self->{+READ_BLOCKING}) = @_ if @_;
496              
497 20         234 unless (IS_WIN32) {
498 20         385 $rh->blocking(@_);
499             }
500              
501 20         64 return $self->{+READ_BLOCKING};
502             }
503              
504             sub write_blocking {
505 5     5 0 11 my $self = shift;
506 5 50       16 my $wh = $self->{+WH} or croak "Not a writer";
507              
508 5 50       15 return $self->{+WRITE_BLOCKING} unless @_;
509              
510 5         11 my ($val) = @_;
511 5         7 $self->{+WRITE_BLOCKING} = $val;
512              
513 5         7 if (IS_WIN32) {
514             $self->_win32_set_pipe_state(@_) if @_;
515             }
516             else {
517 5         10 my $flags = 0;
518 5 50       35 fcntl($wh, &Fcntl::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
519 5 100       24 if ($val) { $flags ^= &Fcntl::O_NONBLOCK } # Remove non-blocking
  1         101  
520 4         12 else { $flags |= &Fcntl::O_NONBLOCK } # Add non-blocking to the flags
521 5 50       51 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
522             }
523              
524 5         14 return $self->{+WRITE_BLOCKING};
525             }
526              
527             sub blocking {
528 10     10 1 1305724 my $self = shift;
529              
530 10 100 66     243 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
531 5         89 return $self->read_blocking(@_);
532             }
533             elsif ($self->{+WH} && !$self->{+RH}) {
534 5         148 return $self->write_blocking(@_);
535             }
536              
537 0         0 my $r = $self->read_blocking(@_);
538 0         0 my $w = $self->write_blocking(@_);
539              
540 0 0 0     0 return 1 if $r && $w;
541 0 0 0     0 return 0 if !$r && !$w;
542 0         0 return undef;
543             }
544              
545             sub size {
546 0     0 1 0 my $self = shift;
547 0 0       0 return unless defined &Fcntl::F_GETPIPE_SZ;
548 0   0     0 my $fh = $self->{+WH} // $self->{+RH};
549 0         0 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
550             }
551              
552             sub resize {
553 3     3 1 7809 my $self = shift;
554 3         69 my ($size) = @_;
555              
556 3 50       162 return unless defined &Fcntl::F_SETPIPE_SZ;
557 3   33     51 my $fh = $self->{+WH} // $self->{+RH};
558              
559 3         66 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size);
560             }
561              
562             my $ONE_MB = 1 * 1024 * 1024;
563              
564             sub max_size {
565 0 0   0 1 0 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
566              
567 0 0       0 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
568 0         0 chomp(my $val = <$max>);
569 0         0 close($max);
570 0   0     0 return $val || $ONE_MB;
571             }
572              
573             sub resize_or_max {
574 0     0 1 0 my $self = shift;
575 0         0 my ($size) = @_;
576 0         0 $size = min($size, $self->max_size);
577 0         0 $self->resize($size);
578             }
579              
580             sub is_reader {
581 4     4 1 633 my $self = shift;
582 4 50 33     36 return 1 if $self->{+RH} && !$self->{+WH};
583 0         0 return undef;
584             }
585              
586             sub is_writer {
587 4     4 1 6 my $self = shift;
588 4 50 33     21 return 1 if $self->{+WH} && !$self->{+RH};
589 0         0 return undef;
590             }
591              
592             sub clone_writer {
593 3     3 1 17 my $self = shift;
594 3         6 my $class = blessed($self);
595 3 50       69 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
596 3         17 return bless({WH() => $fh}, $class);
597             }
598              
599             sub clone_reader {
600 0     0 1 0 my $self = shift;
601 0         0 my $class = blessed($self);
602 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
603 0         0 return bless({RH() => $fh}, $class);
604             }
605              
606             sub writer {
607 0     0 1 0 my $self = shift;
608              
609 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
610              
611 0 0       0 return 1 unless $self->{+RH};
612              
613 0         0 close(delete $self->{+RH});
614 0         0 return 1;
615             }
616              
617             sub reader {
618 1     1 1 7 my $self = shift;
619              
620 1 50       5 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
621              
622 1 50       6 return 1 unless $self->{+WH};
623              
624 1         7 close(delete $self->{+WH});
625 1         6 return 1;
626             }
627              
628             sub close {
629 12     12 1 11131 my $self = shift;
630 12 100       447 close(delete $self->{+WH}) if $self->{+WH};
631 12 100       400 close(delete $self->{+RH}) if $self->{+RH};
632 12         65 return;
633             }
634              
635             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
636             my $dsize = PIPE_BUF - $psize;
637              
638             sub delimiter_size {
639 35 50   35 0 1397 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
640 35   100     1428 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
641             }
642              
643             sub fits_in_burst {
644 8     8 1 35 my $self = shift;
645 8         19 my ($data) = @_;
646              
647 8   100     95 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
648 8 100       142 return undef unless $size <= PIPE_BUF;
649              
650 7         18 return $size;
651             }
652              
653             sub write_burst {
654 4     4 1 3615 my $self = shift;
655 4         33 my ($data) = @_;
656              
657 4   100     38 my $size = $self->fits_in_burst($data) // return undef;
658              
659 3   100     12 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  3         44  
660 3         33 $self->flush();
661              
662 3         15 return 1;
663             }
664              
665             sub DESTROY {
666 97     97   13769439 my $self = shift;
667 97 50       623 return if $self->{+HIT_EPIPE};
668 97 50       601 $self->flush(blocking => 1) if $self->pending_output;
669             }
670              
671             sub pending_output {
672 99     99 1 1551 my $self = shift;
673 99 100       11246 my $buffer = $self->{+OUT_BUFFER} or return 0;
674 32 100       9422 return 0 unless @$buffer;
675 1         7 return 1;
676             }
677              
678             sub flush {
679 692345     692345 1 2631380 my $self = shift;
680 692345         1240032 my %params = @_;
681 692345   66     2498508 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
682              
683 692345   50     1421457 my $buffer = $self->{+OUT_BUFFER} // return;
684              
685 692345         1335693 while (@$buffer) {
686 1246395         2030393 my $set = $buffer->[0];
687 1246395         2735742 my $got = $self->_write_burst(@$set);
688              
689 1246393 100 100     3898411 return unless $blocking || defined $got;
690 654079 100       1590644 next unless defined $got;
691              
692 400058         1778522 shift @$buffer;
693             }
694              
695 100029         322639 return;
696             }
697              
698             sub _write_burst {
699 1246394     1246394   2052285 my $self = shift;
700 1246394         2440147 my ($data, $size) = @_;
701              
702 1246394 50       3012123 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
703              
704 1246394 100       2575245 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
705              
706 1246393   100     3771597 my $prefix = $self->{+BURST_PREFIX} // '';
707 1246393   100     3479136 my $postfix = $self->{+BURST_POSTFIX} // '';
708              
709 1246393 100 66     4179834 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
710              
711 1246393         1801935 my $wrote;
712 1246393         1855312 my $loop = 0;
713             SWRITE: {
714 1246393         1753414 $wrote = syswrite($wh, $data, $size);
  1246393         817936173  
715 1246393 100 100     7268544 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
716 1         19 $self->{+HIT_EPIPE} = 1;
717 1         2 delete $self->{+OUT_BUFFER};
718 1         257 croak "Disconnected pipe";
719             }
720 1246392 100 100     5411870 return undef if $! == EAGAIN || (IS_WIN32 && $! == 28); # NON-BLOCKING
721 400058 50 33     2256001 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
722 400058 50       1322542 last SWRITE if $wrote == $size;
723 0   0     0 $wrote //= "";
724 0         0 die "$wrote vs $size: $!";
725             }
726              
727 400058         1445428 return $wrote;
728             }
729              
730             sub _adjusted_dsize {
731 32     32   1129 my $self = shift;
732              
733 32 50       847 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
734              
735 32   100     2736 my $message_key = $self->{+MESSAGE_KEY} // '';
736 32   100     1197 my $prefix = $self->{+BURST_PREFIX} // '';
737 32   100     1414 my $postfix = $self->{+BURST_POSTFIX} // '';
738              
739 32         1474 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
740 32         1167 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
741             }
742              
743             sub write_message {
744 100029     100029 1 24691253 my $self = shift;
745 100029         334000 my ($data) = @_;
746              
747 100029         236733 my $tid = _get_tid();
748 100029   100     582695 my $message_key = $self->{+MESSAGE_KEY} // '';
749 100029   66     420795 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
750 100029         527275 my $dtotal = bytes::length($data);
751              
752 100029         732786 my $parts = int($dtotal / $adjusted_dsize);
753 100029 50       401828 $parts++ if $dtotal % $adjusted_dsize;
754              
755 100029         234230 my $id = $parts - 1;
756              
757             # Unwinding the loop for a 1-part message for micro-optimization
758 100029 100       318141 if ($parts == 1) {
759 16         56 my $bytes = $data;
760 16         73 my $size = $dtotal;
761 16         313 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
762              
763 16 50 66     369 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
764              
765 16   100     294 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  16         229  
766             }
767             else {
768 100013         378431 for (my $part = 0; $part < $parts; $part++) {
769 400041         1181699 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
770 400041         3042733 my $size = bytes::length($bytes);
771              
772 400041         3970974 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
773              
774 400041   100     1024085 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
775 400041   100     2131434 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  400041         2267985  
776             }
777             }
778              
779 100029         407504 $self->flush();
780 100029         704039 return $parts;
781             }
782              
783             sub read_message {
784 100040     100040 1 153743926 my $self = shift;
785 100040         254435 my %params = @_;
786              
787 100040         292344 my ($id, $out) = $self->_extract_message(%params);
788              
789 100040 100       527878 return $out if defined $id;
790 1         14 return;
791             }
792              
793             sub _extract_message {
794 100050     100050   177711 my $self = shift;
795 100050         180445 my %params = @_;
796              
797 100050   100     372824 my $state = $self->{+STATE} //= {};
798              
799 100050         160999 while (1) {
800 400078 50       986038 unless ($state->{key}) {
801 400078 100       904040 my $key_bytes = $self->_get_from_buffer($psize) or return;
802              
803 400077         674440 my %key;
804 400077         1743512 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
805 400077         989447 $state->{key} = \%key;
806             }
807              
808 400077         647676 my $key = $state->{key};
809              
810 400077   50     850091 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
811              
812 400077         722176 my $id = $key->{id};
813 400077         667076 my $tag = join ':' => @{$key}{qw/pid tid/};
  400077         1302130  
814 400077   100     614974 push @{$state->{parts}->{$tag} //= []} => $id;
  400077         1448717  
815 400077 100       2288282 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
816              
817 400077         742066 delete $state->{key};
818              
819 400077 100       808644 unless ($id == 0) {
820 300035 100       631996 return ($id, undef) if $params{one_part_only};
821 300028         866053 next;
822             }
823              
824 100042         273919 my $message = delete $state->{buffers}->{$tag};
825 100042         235426 my $parts = delete $state->{parts}->{$tag};
826              
827 100042 100       787075 return ($id, $message) unless $params{debug};
828              
829             return (
830             $id,
831             {
832             message => $message,
833             parts => $parts,
834             pid => $key->{pid},
835             tid => $key->{tid},
836             },
837 1         81 );
838             }
839             }
840              
841             1;
842              
843             __END__