File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 428 514 83.2
branch 150 242 61.9
condition 121 230 52.6
subroutine 67 77 87.0
pod 31 36 86.1
total 797 1099 72.5


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 39     39   9177207 use strict;
  39         313  
  39         1116  
3 39     39   209 use warnings;
  39         78  
  39         1477  
4              
5             our $VERSION = '0.020';
6              
7 39     39   17759 use IO();
  39         30031  
  39         977  
8 39     39   250 use Fcntl();
  39         85  
  39         509  
9 39     39   191 use bytes();
  39         70  
  39         934  
10              
11 39     39   216 use Carp qw/croak confess/;
  39         86  
  39         2230  
12 39     39   305 use Config qw/%Config/;
  39         85  
  39         2079  
13 39     39   230 use List::Util qw/min/;
  39         111  
  39         2809  
14 39     39   339 use Scalar::Util qw/blessed/;
  39         87  
  39         2009  
15              
16 39     39   19178 use Errno qw/EINTR EAGAIN EPIPE/;
  39         57182  
  39         5543  
17             my %RETRY_ERRNO;
18             BEGIN {
19 39     39   226 %RETRY_ERRNO = (EINTR() => 1);
20 39 50       22682 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
21             }
22              
23             BEGIN {
24             # POSIX says writes of 512 or less are atomic, but some platforms allow for
25             # larger ones.
26 39     39   333 require POSIX;
27 39 50 33     584 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  39         407  
28 39         146 *PIPE_BUF = \&POSIX::PIPE_BUF;
29             }
30             else {
31 0         0 *PIPE_BUF = sub() { 512 };
32             }
33              
34 39 50 33     342 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  39         222  
35 39         96 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
36             }
37             else {
38 0         0 *SSIZE_MAX = sub() { 512 };
39             }
40              
41             {
42             # Using the default pipe size as a read size is significantly faster
43             # than a larger value on my test machine.
44 39         135 my $read_size = min(SSIZE_MAX(), 65_536);
  39         315  
45 39         425 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
46             }
47              
48 39         96 my $can_thread = 1;
49 39   33     601 $can_thread &&= $] >= 5.008001;
50 39   33     634 $can_thread &&= $Config{'useithreads'};
51              
52             # Threads are broken on perl 5.10.0 built with gcc 4.8+
53 39 0 33     175 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
54 0         0 my @parts = split /\./, $Config{'gccversion'};
55 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
56             }
57              
58 39   33     188 $can_thread &&= !$INC{'Devel/Cover.pm'};
59              
60 39 50       111 if (!$can_thread) {
    0          
61 39         125 *_get_tid = sub() { 0 };
62             }
63             elsif ($INC{'threads.pm'}) {
64 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
65             }
66             else {
67 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
68             }
69              
70 39 50       161 if ($^O eq 'MSWin32') {
71 0         0 local $@;
72 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
73 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
74 0         0 *IS_WIN32 = sub() { 1 };
75             }
76             else {
77 39         1346 *IS_WIN32 = sub() { 0 };
78             }
79             }
80              
81 39     39   306 use constant READ_SIZE => 'read_size';
  39         108  
  39         2517  
82 39     39   271 use constant RH => 'rh';
  39         80  
  39         2221  
83 39     39   243 use constant WH => 'wh';
  39         148  
  39         2894  
84 39     39   449 use constant EOF => 'eof';
  39         96  
  39         2324  
85 39     39   829 use constant STATE => 'state';
  39         112  
  39         2641  
86 39     39   292 use constant OUT_BUFFER => 'out_buffer';
  39         76  
  39         1989  
87 39     39   278 use constant IN_BUFFER => 'in_buffer';
  39         78  
  39         2014  
88 39     39   247 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  39         83  
  39         1892  
89 39     39   215 use constant READ_BLOCKING => 'read_blocking';
  39         59  
  39         1886  
90 39     39   210 use constant WRITE_BLOCKING => 'write_blocking';
  39         118  
  39         1893  
91 39     39   238 use constant BURST_PREFIX => 'burst_prefix';
  39         72  
  39         1978  
92 39     39   210 use constant BURST_POSTFIX => 'burst_postfix';
  39         263  
  39         2106  
93 39     39   239 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  39         151  
  39         2125  
94 39     39   276 use constant MESSAGE_KEY => 'message_key';
  39         95  
  39         2055  
95 39     39   271 use constant MIXED_BUFFER => 'mixed_buffer';
  39         83  
  39         1978  
96 39     39   235 use constant DELIMITER_SIZE => 'delimiter_size';
  39         113  
  39         2022  
97 39     39   247 use constant INVALID_STATE => 'invalid_state';
  39         105  
  39         1874  
98 39     39   220 use constant HIT_EPIPE => 'hit_epipe';
  39         70  
  39         221308  
99              
100 14     14 1 447 sub wh { shift->{+WH} }
101 0     0 1 0 sub rh { shift->{+RH} }
102              
103             sub throw_invalid {
104 6     6 0 34 my $self = shift;
105 6 50 66     96 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
106 6         2170 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
107             }
108              
109             sub read_size {
110 0     0 1 0 my $self = shift;
111 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
112 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
113             }
114              
115             sub fill_buffer {
116 19101     19101 1 26878 my $self = shift;
117              
118 19101 50       41031 $self->throw_invalid() if $self->{+INVALID_STATE};
119              
120 19101 50       39516 my $rh = $self->{+RH} or die "Not a read handle";
121              
122 19101 100       38507 return 0 if $self->{+EOF};
123              
124 19070   100     37883 $self->{+IN_BUFFER_SIZE} //= 0;
125              
126 19070   50     57130 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
127 19070         23181 if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
128             $to_read = min($self->_win32_pipe_ready(), $to_read);
129             }
130              
131 19070 50       36174 return 0 unless $to_read;
132              
133 19070         25826 while(1) {
134 19070         27767 my $rbuff = '';
135 19070         27247712 my $got = sysread($rh, $rbuff, $to_read);
136 19070 100       87777 unless(defined $got) {
137 33 50       789 return 0 if $! == EAGAIN; # NON-BLOCKING
138 0 0       0 next if $RETRY_ERRNO{0 + $!}; # interrupted or something, try again
139 0         0 $self->throw_invalid("$!");
140             }
141              
142 19037 100       41925 if ($got) {
143 19026         321830 $self->{+IN_BUFFER} .= $rbuff;
144 19026         29696 $self->{+IN_BUFFER_SIZE} += $got;
145 19026         57903 return $got;
146             }
147             else {
148 11         233 $self->{+EOF} = 1;
149 11         149 return 0;
150             }
151             }
152              
153 0         0 return 0;
154             }
155              
156 800175     800175   1326581 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
157 18     18   106 sub _peek_from_buffer { shift->_from_buffer(@_) }
158              
159             sub _from_buffer {
160 800193     800193   1040650 my $self = shift;
161 800193         1465825 my ($size, %params) = @_;
162              
163 800193 100 100     2515907 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
164 19011         44655 $self->fill_buffer;
165 19011 100       46621 unless($self->{+IN_BUFFER_SIZE} >= $size) {
166 3 100 66     98 return unless $params{eof_invalid} && $self->{+EOF};
167 2         36 $self->throw_invalid($params{eof_invalid});
168             }
169             }
170              
171 800190         1016182 my $out;
172              
173 800190 100       1315100 if ($params{remove}) {
174 800174         1037640 $self->{+IN_BUFFER_SIZE} -= $size;
175 800174         2278170 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
176             }
177             else {
178 16         82 $out = substr($self->{+IN_BUFFER}, 0, $size);
179             }
180              
181 800190         2244725 return $out;
182             }
183              
184             sub eof {
185 43     43 1 1992570 my $self = shift;
186              
187 43 100       364 $self->throw_invalid() if $self->{+INVALID_STATE};
188              
189 40 100       262 return 0 if $self->fill_buffer;
190 37 100       390 return 0 unless $self->{+EOF};
191 22 50       106 return 0 if $self->{+IN_BUFFER_SIZE};
192              
193 22 50       157 if (my $buffer = $self->{+MIXED_BUFFER}) {
194 22 100 66     349 return 0 if $buffer->{lines} || length $buffer->{lines};
195 15 50 33     158 return 0 if $buffer->{burst} || length $buffer->{burst};
196             }
197              
198 15         195 return 1;
199             }
200              
201             sub _fh_mode {
202 4     4   2947 my $self = shift;
203 4         11 my ($fh) = @_;
204              
205 4   50     33 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
206 4 100       22 return '<&' if $mode == Fcntl::O_RDONLY();
207 2 50       10 return '>&' if $mode == Fcntl::O_WRONLY();
208 0         0 return undef;
209             }
210              
211             my %MODE_TO_DIR = (
212             '<&' => RH(),
213             '<&=' => RH(),
214             '>&' => WH(),
215             '>&=' => WH(),
216             );
217             sub _mode_to_dir {
218 9     9   14 my $self = shift;
219 9         24 my ($mode) = @_;
220 9         37 return $MODE_TO_DIR{$mode};
221             }
222              
223             sub read_fifo {
224 7     7 1 24691 my $class = shift;
225 7         31 my ($fifo, %params) = @_;
226              
227 7 50       105 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
228              
229 7 50       298 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
230 7         41 binmode($fh);
231              
232 7         81 return bless({%params, RH() => $fh}, $class);
233             }
234              
235             sub write_fifo {
236 6     6 1 20497 my $class = shift;
237 6         160 my ($fifo, %params) = @_;
238              
239 6 50       459 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
240              
241 6 50       809 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
242 6         86 binmode($fh);
243              
244 6         340 return bless({%params, WH() => $fh}, $class);
245             }
246              
247             sub from_fh {
248 5     5 1 6991 my $class = shift;
249 5         10 my $ifh = pop;
250 5         11 my ($mode) = @_;
251              
252 5 50       45 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
253              
254 5   33     29 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
255 5   33     24 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
256              
257 5 50       110 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
258 5         16 binmode($fh);
259              
260 5         29 return bless({$dir => $fh}, $class);
261             }
262              
263             sub from_fd {
264 4     4 1 4446 my $class = shift;
265 4         11 my ($mode, $fd) = @_;
266              
267 4   33     10 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
268 4 50       87 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
269              
270 4 50       28 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
271              
272 4         13 binmode($fh);
273 4         28 return bless({$dir => $fh}, $class);
274             }
275              
276             sub new {
277 1     1 1 86 my $class = shift;
278 1         3 my (%params) = @_;
279              
280 1         3 my ($rh, $wh);
281 1 50       45 pipe($rh, $wh) or die "Could not create pipe: $!";
282              
283 1         4 binmode($wh);
284 1         3 binmode($rh);
285              
286 1         10 return bless({%params, RH() => $rh, WH() => $wh}, $class);
287             }
288              
289             sub pair {
290 38     38 1 61888 my $class = shift;
291 38         190 my (%params) = @_;
292              
293 38         120 my $mixed = delete $params{mixed_data_mode};
294              
295 38         112 my ($rh, $wh);
296 38 50       2259 pipe($rh, $wh) or die "Could not create pipe: $!";
297              
298 38         246 binmode($wh);
299 38         179 binmode($rh);
300              
301 38         338 my $r = bless({%params, RH() => $rh}, $class);
302 38         226 my $w = bless({%params, WH() => $wh}, $class);
303              
304 38 100       171 if ($mixed) {
305 16         84 $r->set_mixed_data_mode();
306 16         45 $w->set_mixed_data_mode();
307             }
308              
309 38         391 return ($r, $w);
310             }
311              
312             sub set_mixed_data_mode {
313 32     32 1 75 my $self = shift;
314              
315 32 50       123 $self->throw_invalid() if $self->{+INVALID_STATE};
316              
317 32 100       210 $self->read_blocking(0) if $self->{+RH};
318              
319 32   50     302 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
320 32   50     159 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
321 32   50     198 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
322             }
323              
324             sub get_line_burst_or_data {
325 56     56 1 2011863 my $self = shift;
326 56         418 my %params = @_;
327              
328 56   33     497 my $rh = $self->{+RH} // croak "Not a read handle";
329              
330 56   33     537 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
331 56   33     321 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
332 56   33     387 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
333              
334 56   100     1092 my $buffer = $self->{+MIXED_BUFFER} //= {
335             lines => '',
336             burst => '',
337             in_burst => 0,
338             in_message => 0,
339             do_extra_loop => 0,
340             strip_term => 0,
341             };
342              
343 56         229 my $peek;
344              
345 56         164 while (1) {
346             $self->throw_invalid('Incomplete message received before EOF')
347 108 100 66     676 if $self->{+EOF} && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
348              
349 107 100 66     719 if($buffer->{lines} || length($buffer->{lines})) {
350             # Look for a complete line
351 64         145 my ($line, $term);
352 64         1066 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
353              
354 64 100       334 return (line => "${line}${term}") if $term;
355 50 50 66     729 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
356              
357 42         95 $buffer->{lines} = $line;
358 42 50 66     189 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
359             }
360              
361 85 100       414 if ($buffer->{in_message}) {
362 10         220 my ($id, $message) = $self->_extract_message(one_part_only => 1);
363              
364 10 50       65 unless(defined $id) {
365 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
366 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
367             }
368              
369 10         46 $buffer->{strip_term}++;
370 10         30 $buffer->{in_message} = 0;
371 10 100       109 return (message => $message) if defined $message;
372             }
373              
374 82 100       279 if ($buffer->{strip_term}) {
375 10   50     81 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
376              
377 10 50       75 $self->throw_invalid("No message terminator") unless $term eq $postfix;
378 10         45 $buffer->{strip_term}--;
379             }
380              
381 82 100       303 if ($buffer->{in_burst}) {
382 18   50     156 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
383              
384 16 100       76 if ($peek eq $key) {
385 10         214 $self->_get_from_buffer(1); # Strip the key
386 10         30 $buffer->{in_message} = 1;
387 10         58 $buffer->{in_burst} = 0;
388 10         71 next;
389             }
390              
391 6   50     28 $buffer->{burst} //= '';
392 6         16 my ($burst_data, $term);
393 6         208 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
394 6         28 $buffer->{burst} .= $burst_data;
395              
396 6 100       32 if ($term) {
397 4         12 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
398 4         12 $buffer->{in_burst} = 0;
399 4         12 $buffer->{do_extra_loop}++;
400 4         40 return (burst => delete($buffer->{burst}));
401             }
402             else {
403 2         24 $self->{+IN_BUFFER_SIZE} = 0;
404             }
405              
406 2 50       54 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
407             }
408              
409 66 100 100     845 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
410 35 100 100     194 return (peek => $peek) if $peek && !$self->{+EOF};
411              
412 33 100       194 return unless $self->{+EOF};
413              
414             # Do at least one more iteration after EOF
415 18 100       226 return if $buffer->{+EOF}++;
416              
417             # But do not try to split the empty buffer
418 11         66 next;
419             }
420              
421             # Look for the start of a burst, anything before a burst is line data
422 31         92 my $linedata;
423 31         1061 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
424 31 50       204 $buffer->{lines} .= $linedata if defined $linedata;
425              
426 31 100       176 if ($buffer->{in_burst}) {
427 16         90 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
428             }
429             else {
430 15         77 $self->{+IN_BUFFER_SIZE} = 0;
431             }
432             }
433             }
434              
435             sub debug {
436 0     0 0 0 my ($id, $buffer) = @_;
437              
438 0         0 print "---debug $id---\n";
439 0         0 for my $key (sort keys %$buffer) {
440 0   0     0 my $val = $buffer->{$key} // '';
441 0         0 $val =~ s/\x0E/\\x0E/g;
442 0         0 $val =~ s/\x0F/\\x0F/g;
443 0         0 $val =~ s/\x10/\\x10/g;
444 0         0 $val =~ s/\n/\\n/g;
445 0         0 $val =~ s/\r/\\r/g;
446 0         0 print "$key: |$val|\n\n";
447             };
448             }
449              
450             # This is a heavily modified version of a pattern suggested on stack-overflow
451             # and also used in Win32::PowerShell::IPC.
452             my $peek_named_pipe;
453             sub _win32_pipe_ready {
454 0     0   0 my $self = shift;
455 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
456              
457 0         0 my $buf = "";
458 0         0 my $buflen = 0;
459              
460 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
461             || die "Can't load PeekNamedPipe from kernel32.dll";
462              
463 0         0 my $got = pack('L', 0);
464 0         0 my $avail = pack('L', 0);
465 0         0 my $remain = pack('L', 0);
466              
467 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
468              
469 0 0       0 $self->{+EOF} = 1 if $ret == 0;
470              
471 0         0 return unpack('L', $avail);
472             }
473              
474             my $set_named_pipe_handle_state;
475             sub _win32_set_pipe_state {
476 0     0   0 my $self = shift;
477 0         0 my ($state) = @_;
478 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
479              
480 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
481             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
482              
483             # Block or non-block?
484 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
485              
486 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
487              
488 0         0 return $ret;
489             }
490              
491             sub read_blocking {
492 21     21 0 237 my $self = shift;
493 21 50       139 my $rh = $self->{+RH} or croak "Not a reader";
494              
495 21 50       153 ($self->{+READ_BLOCKING}) = @_ if @_;
496              
497 21         69 unless (IS_WIN32) {
498 21         460 $rh->blocking(@_);
499             }
500              
501 21         98 return $self->{+READ_BLOCKING};
502             }
503              
504             sub write_blocking {
505 5     5 0 11 my $self = shift;
506 5 50       34 my $wh = $self->{+WH} or croak "Not a writer";
507              
508 5 50       16 return $self->{+WRITE_BLOCKING} unless @_;
509              
510 5         22 my ($val) = @_;
511 5         11 $self->{+WRITE_BLOCKING} = $val;
512              
513 5         10 if (IS_WIN32) {
514             $self->_win32_set_pipe_state(@_) if @_;
515             }
516             else {
517 5         11 my $flags = 0;
518 5 50       67 fcntl($wh, &Fcntl::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
519 5 100       20 if ($val) { $flags ^= &Fcntl::O_NONBLOCK } # Remove non-blocking
  1         73  
520 4         12 else { $flags |= &Fcntl::O_NONBLOCK } # Add non-blocking to the flags
521 5 50       61 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
522             }
523              
524 5         25 return $self->{+WRITE_BLOCKING};
525             }
526              
527             sub blocking {
528 10     10 1 1216068 my $self = shift;
529              
530 10 100 66     226 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
531 5         57 return $self->read_blocking(@_);
532             }
533             elsif ($self->{+WH} && !$self->{+RH}) {
534 5         31 return $self->write_blocking(@_);
535             }
536              
537 0         0 my $r = $self->read_blocking(@_);
538 0         0 my $w = $self->write_blocking(@_);
539              
540 0 0 0     0 return 1 if $r && $w;
541 0 0 0     0 return 0 if !$r && !$w;
542 0         0 return undef;
543             }
544              
545             sub size {
546 0     0 1 0 my $self = shift;
547 0 0       0 return unless defined &Fcntl::F_GETPIPE_SZ;
548 0   0     0 my $fh = $self->{+WH} // $self->{+RH};
549 0         0 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
550             }
551              
552             sub resize {
553 3     3 1 5949 my $self = shift;
554 3         60 my ($size) = @_;
555              
556 3 50       102 return unless defined &Fcntl::F_SETPIPE_SZ;
557 3   33     75 my $fh = $self->{+WH} // $self->{+RH};
558              
559 3         84 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size);
560             }
561              
562             my $ONE_MB = 1 * 1024 * 1024;
563              
564             sub max_size {
565 0 0   0 1 0 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
566              
567 0 0       0 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
568 0         0 chomp(my $val = <$max>);
569 0         0 close($max);
570 0   0     0 return $val || $ONE_MB;
571             }
572              
573             sub resize_or_max {
574 0     0 1 0 my $self = shift;
575 0         0 my ($size) = @_;
576 0         0 $size = min($size, $self->max_size);
577 0         0 $self->resize($size);
578             }
579              
580             sub is_reader {
581 4     4 1 824 my $self = shift;
582 4 50 33     43 return 1 if $self->{+RH} && !$self->{+WH};
583 0         0 return undef;
584             }
585              
586             sub is_writer {
587 4     4 1 8 my $self = shift;
588 4 50 33     31 return 1 if $self->{+WH} && !$self->{+RH};
589 0         0 return undef;
590             }
591              
592             sub clone_writer {
593 3     3 1 15 my $self = shift;
594 3         14 my $class = blessed($self);
595 3 50       64 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
596 3         19 return bless({WH() => $fh}, $class);
597             }
598              
599             sub clone_reader {
600 0     0 1 0 my $self = shift;
601 0         0 my $class = blessed($self);
602 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
603 0         0 return bless({RH() => $fh}, $class);
604             }
605              
606             sub writer {
607 0     0 1 0 my $self = shift;
608              
609 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
610              
611 0 0       0 return 1 unless $self->{+RH};
612              
613 0         0 close(delete $self->{+RH});
614 0         0 return 1;
615             }
616              
617             sub reader {
618 1     1 1 5 my $self = shift;
619              
620 1 50       5 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
621              
622 1 50       3 return 1 unless $self->{+WH};
623              
624 1         8 close(delete $self->{+WH});
625 1         5 return 1;
626             }
627              
628             sub close {
629 12     12 1 8012 my $self = shift;
630 12 100       363 close(delete $self->{+WH}) if $self->{+WH};
631 12 100       292 close(delete $self->{+RH}) if $self->{+RH};
632 12         68 return;
633             }
634              
635             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
636             my $dsize = PIPE_BUF - $psize;
637              
638             sub delimiter_size {
639 35 50   35 0 4655 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
640 35   100     1040 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
641             }
642              
643             sub fits_in_burst {
644 8     8 1 52 my $self = shift;
645 8         25 my ($data) = @_;
646              
647 8   100     109 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
648 8 100       155 return undef unless $size <= PIPE_BUF;
649              
650 7         42 return $size;
651             }
652              
653             sub write_burst {
654 4     4 1 2757 my $self = shift;
655 4         38 my ($data) = @_;
656              
657 4   100     44 my $size = $self->fits_in_burst($data) // return undef;
658              
659 3   100     12 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  3         46  
660 3         29 $self->flush();
661              
662 3         13 return 1;
663             }
664              
665             sub DESTROY {
666 99     99   12983822 my $self = shift;
667 99 50       633 return if $self->{+HIT_EPIPE};
668 99 50       648 $self->flush(blocking => 1) if $self->pending_output;
669             }
670              
671             sub pending_output {
672 101     101 1 1475 my $self = shift;
673 101 100       9194 my $buffer = $self->{+OUT_BUFFER} or return 0;
674 32 100       4703 return 0 unless @$buffer;
675 1         5 return 1;
676             }
677              
678             sub flush {
679 688457     688457 1 2757703 my $self = shift;
680 688457         1060322 my %params = @_;
681 688457   66     2150549 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
682              
683 688457   50     1275112 my $buffer = $self->{+OUT_BUFFER} // return;
684              
685 688457         1188325 while (@$buffer) {
686 1241538         1896658 my $set = $buffer->[0];
687 1241538         2408689 my $got = $self->_write_burst(@$set);
688              
689 1241536 100 100     3788506 return unless $blocking || defined $got;
690 653110 100       1339554 next unless defined $got;
691              
692 400058         1466698 shift @$buffer;
693             }
694              
695 100029         267900 return;
696             }
697              
698             sub _write_burst {
699 1241537     1241537   1648335 my $self = shift;
700 1241537         2150195 my ($data, $size) = @_;
701              
702 1241537 50       2489498 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
703              
704 1241537 100       2132202 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
705              
706 1241536   100     3118072 my $prefix = $self->{+BURST_PREFIX} // '';
707 1241536   100     2838422 my $postfix = $self->{+BURST_POSTFIX} // '';
708              
709 1241536 100 66     3495470 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
710              
711 1241536         1528078 my $wrote;
712 1241536         1574973 my $loop = 0;
713             SWRITE: {
714 1241536         1576382 $wrote = syswrite($wh, $data, $size);
  1241536         679963628  
715 1241536 100 100     7900265 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
716 1         49 $self->{+HIT_EPIPE} = 1;
717 1         3 delete $self->{+OUT_BUFFER};
718 1         186 croak "Disconnected pipe";
719             }
720 1241535 100 100     4750588 return undef if $! == EAGAIN || (IS_WIN32 && $! == 28); # NON-BLOCKING
721 400058 50 33     1815666 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
722 400058 50       1092103 last SWRITE if $wrote == $size;
723 0   0     0 $wrote //= "";
724 0         0 die "$wrote vs $size: $!";
725             }
726              
727 400058         1184390 return $wrote;
728             }
729              
730             sub _adjusted_dsize {
731 32     32   457 my $self = shift;
732              
733 32 50       503 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
734              
735 32   100     1124 my $message_key = $self->{+MESSAGE_KEY} // '';
736 32   100     955 my $prefix = $self->{+BURST_PREFIX} // '';
737 32   100     670 my $postfix = $self->{+BURST_POSTFIX} // '';
738              
739 32         1568 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
740 32         51728 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
741             }
742              
743             sub write_message {
744 100029     100029 1 23719335 my $self = shift;
745 100029         234667 my ($data) = @_;
746              
747 100029         155111 my $tid = _get_tid();
748 100029   100     371420 my $message_key = $self->{+MESSAGE_KEY} // '';
749 100029   66     278379 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
750 100029         303379 my $dtotal = bytes::length($data);
751              
752 100029         532196 my $parts = int($dtotal / $adjusted_dsize);
753 100029 50       285712 $parts++ if $dtotal % $adjusted_dsize;
754              
755 100029         173938 my $id = $parts - 1;
756              
757             # Unwinding the loop for a 1-part message for micro-optimization
758 100029 100       261329 if ($parts == 1) {
759 16         88 my $bytes = $data;
760 16         46 my $size = $dtotal;
761 16         325 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
762              
763 16 50 66     432 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
764              
765 16   100     250 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  16         206  
766             }
767             else {
768 100013         261306 for (my $part = 0; $part < $parts; $part++) {
769 400041         993162 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
770 400041         2666964 my $size = bytes::length($bytes);
771              
772 400041         2610942 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
773              
774 400041   100     873277 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
775 400041   100     1552453 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  400041         1780545  
776             }
777             }
778              
779 100029         297720 $self->flush();
780 100029         408017 return $parts;
781             }
782              
783             sub read_message {
784 100040     100040 1 130534583 my $self = shift;
785 100040         201954 my %params = @_;
786              
787 100040         235117 my ($id, $out) = $self->_extract_message(%params);
788              
789 100040 100       425614 return $out if defined $id;
790 1         21 return;
791             }
792              
793             sub _extract_message {
794 100050     100050   142303 my $self = shift;
795 100050         157514 my %params = @_;
796              
797 100050   100     282725 my $state = $self->{+STATE} //= {};
798              
799 100050         146592 while (1) {
800 400078 50       761167 unless ($state->{key}) {
801 400078 100       751691 my $key_bytes = $self->_get_from_buffer($psize) or return;
802              
803 400077         605422 my %key;
804 400077         1422569 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
805 400077         810985 $state->{key} = \%key;
806             }
807              
808 400077         581646 my $key = $state->{key};
809              
810 400077   50     713026 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
811              
812 400077         610753 my $id = $key->{id};
813 400077         525466 my $tag = join ':' => @{$key}{qw/pid tid/};
  400077         1025651  
814 400077   100     536876 push @{$state->{parts}->{$tag} //= []} => $id;
  400077         1113781  
815 400077 100       2049715 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
816              
817 400077         636372 delete $state->{key};
818              
819 400077 100       729643 unless ($id == 0) {
820 300035 100       546362 return ($id, undef) if $params{one_part_only};
821 300028         662673 next;
822             }
823              
824 100042         222033 my $message = delete $state->{buffers}->{$tag};
825 100042         176555 my $parts = delete $state->{parts}->{$tag};
826              
827 100042 100       729333 return ($id, $message) unless $params{debug};
828              
829             return (
830             $id,
831             {
832             message => $message,
833             parts => $parts,
834             pid => $key->{pid},
835             tid => $key->{tid},
836             },
837 1         55 );
838             }
839             }
840              
841             1;
842              
843             __END__