File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 427 514 83.0
branch 150 242 61.9
condition 121 236 51.2
subroutine 67 77 87.0
pod 31 36 86.1
total 796 1105 72.0


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 38     38   6694283 use strict;
  38         80  
  38         1268  
3 38     38   330 use warnings;
  38         83  
  38         2155  
4              
5             our $VERSION = '0.023';
6              
7 38     38   14503 use IO();
  38         32470  
  38         979  
8 38     38   228 use Fcntl();
  38         72  
  38         440  
9 38     38   14166 use bytes();
  38         16016  
  38         970  
10              
11 38     38   262 use Carp qw/croak confess/;
  38         59  
  38         2416  
12 38     38   217 use Config qw/%Config/;
  38         104  
  38         1487  
13 38     38   211 use List::Util qw/min/;
  38         69  
  38         2112  
14 38     38   160 use Scalar::Util qw/blessed/;
  38         63  
  38         1421  
15              
16 38     38   15098 use Errno qw/EINTR EAGAIN EPIPE/;
  38         47390  
  38         5611  
17             my %RETRY_ERRNO;
18             BEGIN {
19 38     38   167 %RETRY_ERRNO = (EINTR() => 1);
20 38 50       20256 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
21             }
22              
23             BEGIN {
24             # POSIX says writes of 512 or less are atomic, but some platforms allow for
25             # larger ones.
26 38     38   17435 require POSIX;
27 38 50 33     217666 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  38         245  
28 38         133 *PIPE_BUF = \&POSIX::PIPE_BUF;
29             }
30             else {
31 0         0 *PIPE_BUF = sub() { 512 };
32             }
33              
34 38 50 33     198 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  38         138  
35 38         76 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
36             }
37             else {
38 0         0 *SSIZE_MAX = sub() { 512 };
39             }
40              
41             {
42             # Using the default pipe size as a read size is significantly faster
43             # than a larger value on my test machine.
44 38         71 my $read_size = min(SSIZE_MAX(), 65_536);
  38         379  
45 38         378 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
46             }
47              
48 38         241 my $can_thread = 1;
49 38   33     221 $can_thread &&= $] >= 5.008001;
50 38   33     466 $can_thread &&= $Config{'useithreads'};
51              
52             # Threads are broken on perl 5.10.0 built with gcc 4.8+
53 38 0 33     160 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
54 0         0 my @parts = split /\./, $Config{'gccversion'};
55 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
56             }
57              
58 38   33     93 $can_thread &&= !$INC{'Devel/Cover.pm'};
59              
60 38 50       90 if (!$can_thread) {
    0          
61 38         62 *_get_tid = sub() { 0 };
62             }
63             elsif ($INC{'threads.pm'}) {
64 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
65             }
66             else {
67 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
68             }
69              
70 38 50       130 if ($^O eq 'MSWin32') {
71 0         0 local $@;
72 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
73 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
74 0         0 *IS_WIN32 = sub() { 1 };
75             }
76             else {
77 38         1189 *IS_WIN32 = sub() { 0 };
78             }
79             }
80              
81 38     38   241 use constant READ_SIZE => 'read_size';
  38         51  
  38         2547  
82 38     38   164 use constant RH => 'rh';
  38         43  
  38         1231  
83 38     38   120 use constant WH => 'wh';
  38         39  
  38         1008  
84 38     38   214 use constant EOF => 'eof';
  38         48  
  38         1114  
85 38     38   142 use constant STATE => 'state';
  38         42  
  38         1001  
86 38     38   103 use constant OUT_BUFFER => 'out_buffer';
  38         46  
  38         980  
87 38     38   128 use constant IN_BUFFER => 'in_buffer';
  38         42  
  38         1664  
88 38     38   158 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  38         38  
  38         1144  
89 38     38   125 use constant READ_BLOCKING => 'read_blocking';
  38         38  
  38         1196  
90 38     38   131 use constant WRITE_BLOCKING => 'write_blocking';
  38         42  
  38         1226  
91 38     38   131 use constant BURST_PREFIX => 'burst_prefix';
  38         50  
  38         1549  
92 38     38   159 use constant BURST_POSTFIX => 'burst_postfix';
  38         228  
  38         1596  
93 38     38   146 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  38         44  
  38         1107  
94 38     38   120 use constant MESSAGE_KEY => 'message_key';
  38         50  
  38         1258  
95 38     38   115 use constant MIXED_BUFFER => 'mixed_buffer';
  38         37  
  38         1307  
96 38     38   123 use constant DELIMITER_SIZE => 'delimiter_size';
  38         39  
  38         1124  
97 38     38   147 use constant INVALID_STATE => 'invalid_state';
  38         39  
  38         1245  
98 38     38   133 use constant HIT_EPIPE => 'hit_epipe';
  38         37  
  38         174812  
99              
100 14     14 1 320 sub wh { shift->{+WH} }
101 0     0 1 0 sub rh { shift->{+RH} }
102              
103             sub throw_invalid {
104 6     6 0 22 my $self = shift;
105 6 50 66     150 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
106 6         2810 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
107             }
108              
109             sub read_size {
110 0     0 1 0 my $self = shift;
111 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
112 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
113             }
114              
115             sub fill_buffer {
116 24218     24218 1 27456 my $self = shift;
117              
118 24218 50       46253 $self->throw_invalid() if $self->{+INVALID_STATE};
119              
120 24218 50       50951 my $rh = $self->{+RH} or die "Not a read handle";
121              
122 24218 100       49347 return 0 if $self->{+EOF};
123              
124 24162   100     42971 $self->{+IN_BUFFER_SIZE} //= 0;
125              
126 24162   50     68075 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
127 24162         26324 if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
128             $to_read = min($self->_win32_pipe_ready(), $to_read);
129             }
130              
131 24162 50       41827 return 0 unless $to_read;
132              
133 24162         28532 while(1) {
134 24162         32408 my $rbuff = '';
135 24162         23186074 my $got = sysread($rh, $rbuff, $to_read);
136 24162 100       69396 unless(defined $got) {
137 93 50       1213 return 0 if $! == EAGAIN; # NON-BLOCKING
138 0 0       0 next if $RETRY_ERRNO{0 + $!}; # interrupted or something, try again
139 0         0 $self->throw_invalid("$!");
140             }
141              
142 24069 100       42017 if ($got) {
143 24058         329779 $self->{+IN_BUFFER} .= $rbuff;
144 24058         33907 $self->{+IN_BUFFER_SIZE} += $got;
145 24058         52491 return $got;
146             }
147             else {
148 11         104 $self->{+EOF} = 1;
149 11         66 return 0;
150             }
151             }
152              
153 0         0 return 0;
154             }
155              
156 800175     800175   1231910 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
157 16     16   127 sub _peek_from_buffer { shift->_from_buffer(@_) }
158              
159             sub _from_buffer {
160 800191     800191   909981 my $self = shift;
161 800191         1250994 my ($size, %params) = @_;
162              
163 800191 100 66     2127200 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
164 24041         53214 $self->fill_buffer;
165 24041 100       55229 unless($self->{+IN_BUFFER_SIZE} >= $size) {
166 1 50 33     8 return unless $params{eof_invalid} && $self->{+EOF};
167 0         0 $self->throw_invalid($params{eof_invalid});
168             }
169             }
170              
171 800190         835240 my $out;
172              
173 800190 100       1078935 if ($params{remove}) {
174 800174         897130 $self->{+IN_BUFFER_SIZE} -= $size;
175 800174         1873107 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
176             }
177             else {
178 16         56 $out = substr($self->{+IN_BUFFER}, 0, $size);
179             }
180              
181 800190         1811512 return $out;
182             }
183              
184             sub eof {
185 148     148 1 2422045 my $self = shift;
186              
187 148 100       430 $self->throw_invalid() if $self->{+INVALID_STATE};
188              
189 145 100       489 return 0 if $self->fill_buffer;
190 127 100       486 return 0 unless $self->{+EOF};
191 52 100       137 return 0 if $self->{+IN_BUFFER_SIZE};
192              
193 45 50       124 if (my $buffer = $self->{+MIXED_BUFFER}) {
194 45 100 66     356 return 0 if $buffer->{lines} || defined($buffer->{lines}) && length($buffer->{lines});
      66        
195 30 50 66     192 return 0 if $buffer->{burst} || defined($buffer->{lines}) && length($buffer->{burst});
      33        
196             }
197              
198 30         136 return 1;
199             }
200              
201             sub _fh_mode {
202 4     4   1154 my $self = shift;
203 4         5 my ($fh) = @_;
204              
205 4   50     18 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
206 4 100       13 return '<&' if $mode == Fcntl::O_RDONLY();
207 2 50       8 return '>&' if $mode == Fcntl::O_WRONLY();
208 0         0 return undef;
209             }
210              
211             my %MODE_TO_DIR = (
212             '<&' => RH(),
213             '<&=' => RH(),
214             '>&' => WH(),
215             '>&=' => WH(),
216             );
217             sub _mode_to_dir {
218 9     9   12 my $self = shift;
219 9         11 my ($mode) = @_;
220 9         31 return $MODE_TO_DIR{$mode};
221             }
222              
223             sub read_fifo {
224 7     7 1 1418855 my $class = shift;
225 7         23 my ($fifo, %params) = @_;
226              
227 7 50       104 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
228              
229 7 50       254 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
230 7         22 binmode($fh);
231              
232 7         58 return bless({%params, RH() => $fh}, $class);
233             }
234              
235             sub write_fifo {
236 6     6 1 28261 my $class = shift;
237 6         209 my ($fifo, %params) = @_;
238              
239 6 50       723 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
240              
241 6 50       1421 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
242 6         102 binmode($fh);
243              
244 6         392 return bless({%params, WH() => $fh}, $class);
245             }
246              
247             sub from_fh {
248 5     5 1 352330 my $class = shift;
249 5         7 my $ifh = pop;
250 5         9 my ($mode) = @_;
251              
252 5 50       34 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
253              
254 5   33     13 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
255 5   33     14 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
256              
257 5 50       70 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
258 5         9 binmode($fh);
259              
260 5         22 return bless({$dir => $fh}, $class);
261             }
262              
263             sub from_fd {
264 4     4 1 2256 my $class = shift;
265 4         5 my ($mode, $fd) = @_;
266              
267 4   33     6 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
268 4 50       50 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
269              
270 4 50       14 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
271              
272 4         6 binmode($fh);
273 4         14 return bless({$dir => $fh}, $class);
274             }
275              
276             sub new {
277 1     1 1 143989 my $class = shift;
278 1         3 my (%params) = @_;
279              
280 1         2 my ($rh, $wh);
281 1 50       45 pipe($rh, $wh) or die "Could not create pipe: $!";
282              
283 1         3 binmode($wh);
284 1         1 binmode($rh);
285              
286 1         6 return bless({%params, RH() => $rh, WH() => $wh}, $class);
287             }
288              
289             sub pair {
290 37     37 1 6148359 my $class = shift;
291 37         180 my (%params) = @_;
292              
293 37         141 my $mixed = delete $params{mixed_data_mode};
294              
295 37         96 my ($rh, $wh);
296 37 50       1707 pipe($rh, $wh) or die "Could not create pipe: $!";
297              
298 37         155 binmode($wh);
299 37         84 binmode($rh);
300              
301 37         327 my $r = bless({%params, RH() => $rh}, $class);
302 37         157 my $w = bless({%params, WH() => $wh}, $class);
303              
304 37 100       148 if ($mixed) {
305 15         78 $r->set_mixed_data_mode();
306 15         47 $w->set_mixed_data_mode();
307             }
308              
309 37         288 return ($r, $w);
310             }
311              
312             sub set_mixed_data_mode {
313 30     30 1 53 my $self = shift;
314              
315 30 50       135 $self->throw_invalid() if $self->{+INVALID_STATE};
316              
317 30 100       206 $self->read_blocking(0) if $self->{+RH};
318              
319 30   50     181 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
320 30   50     283 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
321 30   50     283 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
322             }
323              
324             sub get_line_burst_or_data {
325 56     56 1 1453425 my $self = shift;
326 56         194 my %params = @_;
327              
328 56   33     383 my $rh = $self->{+RH} // croak "Not a read handle";
329              
330 56   33     510 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
331 56   33     306 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
332 56   33     214 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
333              
334 56   100     1299 my $buffer = $self->{+MIXED_BUFFER} //= {
335             lines => '',
336             burst => '',
337             in_burst => 0,
338             in_message => 0,
339             do_extra_loop => 0,
340             strip_term => 0,
341             };
342              
343 56         146 my $peek;
344              
345 56         82 while (1) {
346             $self->throw_invalid('Incomplete message received before EOF')
347 105 100 66     627 if $self->eof && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
348              
349 104 100 66     2496 if($buffer->{lines} || length($buffer->{lines})) {
350             # Look for a complete line
351 63         96 my ($line, $term);
352 63         1432 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
353              
354 63 100       449 return (line => "${line}${term}") if $term;
355 49 50 66     422 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
356              
357 41         73 $buffer->{lines} = $line;
358 41 50 66     124 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
359             }
360              
361 82 100       202 if ($buffer->{in_message}) {
362 10         112 my ($id, $message) = $self->_extract_message(one_part_only => 1);
363              
364 10 50       56 unless(defined $id) {
365 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
366 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
367             }
368              
369 10         23 $buffer->{strip_term}++;
370 10         22 $buffer->{in_message} = 0;
371 10 100       82 return (message => $message) if defined $message;
372             }
373              
374 79 100       181 if ($buffer->{strip_term}) {
375 10   50     41 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
376              
377 10 50       41 $self->throw_invalid("No message terminator") unless $term eq $postfix;
378 10         22 $buffer->{strip_term}--;
379             }
380              
381 79 100       359 if ($buffer->{in_burst}) {
382 16   50     175 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
383              
384 16 100       54 if ($peek eq $key) {
385 10         80 $self->_get_from_buffer(1); # Strip the key
386 10         19 $buffer->{in_message} = 1;
387 10         37 $buffer->{in_burst} = 0;
388 10         32 next;
389             }
390              
391 6   50     62 $buffer->{burst} //= '';
392 6         18 my ($burst_data, $term);
393 6         202 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
394 6         26 $buffer->{burst} .= $burst_data;
395              
396 6 100       20 if ($term) {
397 4         16 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
398 4         12 $buffer->{in_burst} = 0;
399 4         12 $buffer->{do_extra_loop}++;
400 4         96 return (burst => delete($buffer->{burst}));
401             }
402             else {
403 2         16 $self->{+IN_BUFFER_SIZE} = 0;
404             }
405              
406 2 50       32 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
407             }
408              
409 63 100 66     317 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
410 32 100 66     113 return (peek => $peek) if $peek && !$self->{+EOF};
411              
412 30 100       169 return unless $self->{+EOF};
413              
414             # Do at least one more iteration after EOF
415 15 100       176 return if $buffer->{+EOF}++;
416              
417             # But do not try to split the empty buffer
418 8         36 next;
419             }
420              
421             # Look for the start of a burst, anything before a burst is line data
422 31         117 my $linedata;
423 31         1071 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
424 31 50       190 $buffer->{lines} .= $linedata if defined $linedata;
425              
426 31 100       80 if ($buffer->{in_burst}) {
427 16         57 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
428             }
429             else {
430 15         37 $self->{+IN_BUFFER_SIZE} = 0;
431             }
432             }
433             }
434              
435             sub debug {
436 0     0 0 0 my ($id, $buffer) = @_;
437              
438 0         0 print "---debug $id---\n";
439 0         0 for my $key (sort keys %$buffer) {
440 0   0     0 my $val = $buffer->{$key} // '';
441 0         0 $val =~ s/\x0E/\\x0E/g;
442 0         0 $val =~ s/\x0F/\\x0F/g;
443 0         0 $val =~ s/\x10/\\x10/g;
444 0         0 $val =~ s/\n/\\n/g;
445 0         0 $val =~ s/\r/\\r/g;
446 0         0 print "$key: |$val|\n\n";
447             };
448             }
449              
450             # This is a heavily modified version of a pattern suggested on stack-overflow
451             # and also used in Win32::PowerShell::IPC.
452             my $peek_named_pipe;
453             sub _win32_pipe_ready {
454 0     0   0 my $self = shift;
455 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
456              
457 0         0 my $buf = "";
458 0         0 my $buflen = 0;
459              
460 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
461             || die "Can't load PeekNamedPipe from kernel32.dll";
462              
463 0         0 my $got = pack('L', 0);
464 0         0 my $avail = pack('L', 0);
465 0         0 my $remain = pack('L', 0);
466              
467 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
468              
469 0 0       0 $self->{+EOF} = 1 if $ret == 0;
470              
471 0         0 return unpack('L', $avail);
472             }
473              
474             my $set_named_pipe_handle_state;
475             sub _win32_set_pipe_state {
476 0     0   0 my $self = shift;
477 0         0 my ($state) = @_;
478 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
479              
480 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
481             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
482              
483             # Block or non-block?
484 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
485              
486 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
487              
488 0         0 return $ret;
489             }
490              
491             sub read_blocking {
492 20     20 0 49 my $self = shift;
493 20 50       108 my $rh = $self->{+RH} or croak "Not a reader";
494              
495 20 50       169 ($self->{+READ_BLOCKING}) = @_ if @_;
496              
497 20         97 unless (IS_WIN32) {
498 20         569 $rh->blocking(@_);
499             }
500              
501 20         59 return $self->{+READ_BLOCKING};
502             }
503              
504             sub write_blocking {
505 5     5 0 10 my $self = shift;
506 5 50       16 my $wh = $self->{+WH} or croak "Not a writer";
507              
508 5 50       10 return $self->{+WRITE_BLOCKING} unless @_;
509              
510 5         6 my ($val) = @_;
511 5         10 $self->{+WRITE_BLOCKING} = $val;
512              
513 5         12 if (IS_WIN32) {
514             $self->_win32_set_pipe_state(@_) if @_;
515             }
516             else {
517 5         10 my $flags = 0;
518 5 50       40 fcntl($wh, &Fcntl::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
519 5 100       20 if ($val) { $flags ^= &Fcntl::O_NONBLOCK } # Remove non-blocking
  1         78  
520 4         12 else { $flags |= &Fcntl::O_NONBLOCK } # Add non-blocking to the flags
521 5 50       57 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
522             }
523              
524 5         11 return $self->{+WRITE_BLOCKING};
525             }
526              
527             sub blocking {
528 10     10 1 950244 my $self = shift;
529              
530 10 100 66     245 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
531 5         36 return $self->read_blocking(@_);
532             }
533             elsif ($self->{+WH} && !$self->{+RH}) {
534 5         32 return $self->write_blocking(@_);
535             }
536              
537 0         0 my $r = $self->read_blocking(@_);
538 0         0 my $w = $self->write_blocking(@_);
539              
540 0 0 0     0 return 1 if $r && $w;
541 0 0 0     0 return 0 if !$r && !$w;
542 0         0 return undef;
543             }
544              
545             sub size {
546 0     0 1 0 my $self = shift;
547 0 0       0 return unless defined &Fcntl::F_GETPIPE_SZ;
548 0   0     0 my $fh = $self->{+WH} // $self->{+RH};
549 0         0 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
550             }
551              
552             sub resize {
553 3     3 1 8370 my $self = shift;
554 3         45 my ($size) = @_;
555              
556 3 50       75 return unless defined &Fcntl::F_SETPIPE_SZ;
557 3   33     126 my $fh = $self->{+WH} // $self->{+RH};
558              
559 3         66 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size);
560             }
561              
562             my $ONE_MB = 1 * 1024 * 1024;
563              
564             sub max_size {
565 0 0   0 1 0 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
566              
567 0 0       0 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
568 0         0 chomp(my $val = <$max>);
569 0         0 close($max);
570 0   0     0 return $val || $ONE_MB;
571             }
572              
573             sub resize_or_max {
574 0     0 1 0 my $self = shift;
575 0         0 my ($size) = @_;
576 0         0 $size = min($size, $self->max_size);
577 0         0 $self->resize($size);
578             }
579              
580             sub is_reader {
581 4     4 1 649 my $self = shift;
582 4 50 33     56 return 1 if $self->{+RH} && !$self->{+WH};
583 0         0 return undef;
584             }
585              
586             sub is_writer {
587 4     4 1 6 my $self = shift;
588 4 50 33     24 return 1 if $self->{+WH} && !$self->{+RH};
589 0         0 return undef;
590             }
591              
592             sub clone_writer {
593 3     3 1 11 my $self = shift;
594 3         5 my $class = blessed($self);
595 3 50       31 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
596 3         7 return bless({WH() => $fh}, $class);
597             }
598              
599             sub clone_reader {
600 0     0 1 0 my $self = shift;
601 0         0 my $class = blessed($self);
602 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
603 0         0 return bless({RH() => $fh}, $class);
604             }
605              
606             sub writer {
607 0     0 1 0 my $self = shift;
608              
609 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
610              
611 0 0       0 return 1 unless $self->{+RH};
612              
613 0         0 close(delete $self->{+RH});
614 0         0 return 1;
615             }
616              
617             sub reader {
618 1     1 1 5 my $self = shift;
619              
620 1 50       3 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
621              
622 1 50       2 return 1 unless $self->{+WH};
623              
624 1         5 close(delete $self->{+WH});
625 1         3 return 1;
626             }
627              
628             sub close {
629 12     12 1 9454 my $self = shift;
630 12 100       353 close(delete $self->{+WH}) if $self->{+WH};
631 12 100       381 close(delete $self->{+RH}) if $self->{+RH};
632 12         51 return;
633             }
634              
635             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
636             my $dsize = PIPE_BUF - $psize;
637              
638             sub delimiter_size {
639 35 50   35 0 1474 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
640 35   100     1249 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
641             }
642              
643             sub fits_in_burst {
644 8     8 1 36 my $self = shift;
645 8         22 my ($data) = @_;
646              
647 8   100     75 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
648 8 100       88 return undef unless $size <= PIPE_BUF;
649              
650 7         31 return $size;
651             }
652              
653             sub write_burst {
654 4     4 1 3249 my $self = shift;
655 4         23 my ($data) = @_;
656              
657 4   100     26 my $size = $self->fits_in_burst($data) // return undef;
658              
659 3   100     38 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  3         50  
660 3         49 $self->flush();
661              
662 3         30 return 1;
663             }
664              
665             sub DESTROY {
666 97     97   12796561 my $self = shift;
667 97 50       478 return if $self->{+HIT_EPIPE};
668 97 50       449 $self->flush(blocking => 1) if $self->pending_output;
669             }
670              
671             sub pending_output {
672 99     99 1 1117 my $self = shift;
673 99 100       10730 my $buffer = $self->{+OUT_BUFFER} or return 0;
674 32 100       8635 return 0 unless @$buffer;
675 1         6 return 1;
676             }
677              
678             sub flush {
679 1092533     1092533 1 2517656 my $self = shift;
680 1092533         1166022 my %params = @_;
681 1092533   66     2225420 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
682              
683 1092533   50     1391414 my $buffer = $self->{+OUT_BUFFER} // return;
684              
685 1092533         1298478 while (@$buffer) {
686 1822777         1893472 my $set = $buffer->[0];
687 1822777         2363248 my $got = $self->_write_burst(@$set);
688              
689 1822775 100 100     3446000 return unless $blocking || defined $got;
690 830273 100       1263810 next unless defined $got;
691              
692 400058         1212474 shift @$buffer;
693             }
694              
695 100029         228887 return;
696             }
697              
698             sub _write_burst {
699 1822776     1822776   1757861 my $self = shift;
700 1822776         2140604 my ($data, $size) = @_;
701              
702 1822776 50       2554771 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
703              
704 1822776 100       2355121 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
705              
706 1822775   100     3275146 my $prefix = $self->{+BURST_PREFIX} // '';
707 1822775   100     3063743 my $postfix = $self->{+BURST_POSTFIX} // '';
708              
709 1822775 100 66     3619760 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
710              
711 1822775         1681257 my $wrote;
712 1822775         1668867 my $loop = 0;
713             SWRITE: {
714 1822775         1636922 $wrote = syswrite($wh, $data, $size);
  1822775         511570258  
715 1822775 100 100     5811649 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
716 1         11 $self->{+HIT_EPIPE} = 1;
717 1         1 delete $self->{+OUT_BUFFER};
718 1         234 croak "Disconnected pipe";
719             }
720 1822774 100 100     4374935 return undef if $! == EAGAIN || (IS_WIN32 && $! == 28); # NON-BLOCKING
721 400058 50 33     1472266 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
722 400058 50       868219 last SWRITE if $wrote == $size;
723 0   0     0 $wrote //= "";
724 0         0 die "$wrote vs $size: $!";
725             }
726              
727 400058         964385 return $wrote;
728             }
729              
730             sub _adjusted_dsize {
731 32     32   327 my $self = shift;
732              
733 32 50       2032 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
734              
735 32   100     2241 my $message_key = $self->{+MESSAGE_KEY} // '';
736 32   100     1310 my $prefix = $self->{+BURST_PREFIX} // '';
737 32   100     720 my $postfix = $self->{+BURST_POSTFIX} // '';
738              
739 32         1447 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
740 32         1446 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
741             }
742              
743             sub write_message {
744 100029     100029 1 23382081 my $self = shift;
745 100029         223042 my ($data) = @_;
746              
747 100029         173527 my $tid = _get_tid();
748 100029   100     411824 my $message_key = $self->{+MESSAGE_KEY} // '';
749 100029   66     251679 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
750 100029         335832 my $dtotal = bytes::length($data);
751              
752 100029         498492 my $parts = int($dtotal / $adjusted_dsize);
753 100029 50       238360 $parts++ if $dtotal % $adjusted_dsize;
754              
755 100029         148878 my $id = $parts - 1;
756              
757             # Unwinding the loop for a 1-part message for micro-optimization
758 100029 100       230287 if ($parts == 1) {
759 16         80 my $bytes = $data;
760 16         58 my $size = $dtotal;
761 16         354 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
762              
763 16 50 66     335 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
764              
765 16   100     261 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  16         320  
766             }
767             else {
768 100013         248965 for (my $part = 0; $part < $parts; $part++) {
769 400041         768850 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
770 400041         2104447 my $size = bytes::length($bytes);
771              
772 400041         2713843 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
773              
774 400041   100     689818 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
775 400041   100     1378088 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  400041         1495093  
776             }
777             }
778              
779 100029         297337 $self->flush();
780 100029         453240 return $parts;
781             }
782              
783             sub read_message {
784 100040     100040 1 104188698 my $self = shift;
785 100040         179750 my %params = @_;
786              
787 100040         229716 my ($id, $out) = $self->_extract_message(%params);
788              
789 100040 100       375825 return $out if defined $id;
790 1         11 return;
791             }
792              
793             sub _extract_message {
794 100050     100050   130250 my $self = shift;
795 100050         137408 my %params = @_;
796              
797 100050   100     282076 my $state = $self->{+STATE} //= {};
798              
799 100050         136800 while (1) {
800 400078 50       730111 unless ($state->{key}) {
801 400078 100       720814 my $key_bytes = $self->_get_from_buffer($psize) or return;
802              
803 400077         520203 my %key;
804 400077         1246401 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
805 400077         678361 $state->{key} = \%key;
806             }
807              
808 400077         498223 my $key = $state->{key};
809              
810 400077   50     649511 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
811              
812 400077         503510 my $id = $key->{id};
813 400077         455752 my $tag = join ':' => @{$key}{qw/pid tid/};
  400077         951492  
814 400077   100     467386 push @{$state->{parts}->{$tag} //= []} => $id;
  400077         985200  
815 400077 100       1737458 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
816              
817 400077         520269 delete $state->{key};
818              
819 400077 100       623135 unless ($id == 0) {
820 300035 100       451389 return ($id, undef) if $params{one_part_only};
821 300028         637085 next;
822             }
823              
824 100042         219385 my $message = delete $state->{buffers}->{$tag};
825 100042         154448 my $parts = delete $state->{parts}->{$tag};
826              
827 100042 100       596463 return ($id, $message) unless $params{debug};
828              
829             return (
830             $id,
831             {
832             message => $message,
833             parts => $parts,
834             pid => $key->{pid},
835             tid => $key->{tid},
836             },
837 1         68 );
838             }
839             }
840              
841             1;
842              
843             __END__