File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 412 498 82.7
branch 136 228 59.6
condition 115 225 51.1
subroutine 65 75 86.6
pod 30 35 85.7
total 758 1061 71.4


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 36     36   7922426 use strict;
  36         227  
  36         982  
3 36     36   169 use warnings;
  36         72  
  36         1218  
4              
5             our $VERSION = '0.018';
6              
7 36     36   16586 use IO();
  36         25343  
  36         817  
8 36     36   226 use Fcntl();
  36         69  
  36         834  
9 36     36   203 use bytes();
  36         57  
  36         1154  
10              
11 36     36   195 use Carp qw/croak confess/;
  36         71  
  36         1757  
12 36     36   255 use Config qw/%Config/;
  36         77  
  36         1745  
13 36     36   190 use List::Util qw/min/;
  36         71  
  36         2318  
14 36     36   246 use Scalar::Util qw/blessed/;
  36         115  
  36         1623  
15              
16 36     36   18530 use Errno qw/EINTR EAGAIN/;
  36         48716  
  36         4368  
17             my %RETRY_ERRNO;
18             BEGIN {
19 36     36   218 %RETRY_ERRNO = (EINTR() => 1);
20 36 50       19415 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
21             }
22              
23             BEGIN {
24             # POSIX says writes of 512 or less are atomic, but some platforms allow for
25             # larger ones.
26 36     36   303 require POSIX;
27 36 50 33     534 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  36         203  
28 36         114 *PIPE_BUF = \&POSIX::PIPE_BUF;
29             }
30             else {
31 0         0 *PIPE_BUF = sub() { 512 };
32             }
33              
34 36 50 33     185 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  36         137  
35 36         77 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
36             }
37             else {
38 0         0 *SSIZE_MAX = sub() { 512 };
39             }
40              
41             {
42             # Using the default pipe size as a read size is significantly faster
43             # than a larger value on my test machine.
44 36         70 my $read_size = min(SSIZE_MAX(), 65_536);
  36         243  
45 36         307 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
46             }
47              
48 36         79 my $can_thread = 1;
49 36   33     466 $can_thread &&= $] >= 5.008001;
50 36   33     580 $can_thread &&= $Config{'useithreads'};
51              
52             # Threads are broken on perl 5.10.0 built with gcc 4.8+
53 36 0 33     158 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
54 0         0 my @parts = split /\./, $Config{'gccversion'};
55 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
56             }
57              
58 36   33     160 $can_thread &&= !$INC{'Devel/Cover.pm'};
59              
60 36 50       104 if (!$can_thread) {
    0          
61 36         139 *_get_tid = sub() { 0 };
62             }
63             elsif ($INC{'threads.pm'}) {
64 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
65             }
66             else {
67 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
68             }
69              
70 36 50       399 if ($^O eq 'MSWin32') {
71 0         0 local $@;
72 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
73 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
74 0         0 *IS_WIN32 = sub() { 1 };
75             }
76             else {
77 36         1114 *IS_WIN32 = sub() { 0 };
78             }
79             }
80              
81 36     36   305 use constant READ_SIZE => 'read_size';
  36         88  
  36         3368  
82 36     36   265 use constant RH => 'rh';
  36         54  
  36         1942  
83 36     36   196 use constant WH => 'wh';
  36         109  
  36         2078  
84 36     36   270 use constant EOF => 'eof';
  36         62  
  36         1824  
85 36     36   725 use constant STATE => 'state';
  36         128  
  36         2021  
86 36     36   254 use constant OUT_BUFFER => 'out_buffer';
  36         161  
  36         1890  
87 36     36   208 use constant IN_BUFFER => 'in_buffer';
  36         73  
  36         1874  
88 36     36   233 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  36         74  
  36         1991  
89 36     36   210 use constant READ_BLOCKING => 'read_blocking';
  36         74  
  36         1921  
90 36     36   211 use constant WRITE_BLOCKING => 'write_blocking';
  36         95  
  36         1609  
91 36     36   188 use constant BURST_PREFIX => 'burst_prefix';
  36         59  
  36         1587  
92 36     36   199 use constant BURST_POSTFIX => 'burst_postfix';
  36         67  
  36         1521  
93 36     36   186 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  36         54  
  36         1693  
94 36     36   211 use constant MESSAGE_KEY => 'message_key';
  36         169  
  36         1900  
95 36     36   218 use constant MIXED_BUFFER => 'mixed_buffer';
  36         71  
  36         1847  
96 36     36   220 use constant DELIMITER_SIZE => 'delimiter_size';
  36         99  
  36         1921  
97 36     36   210 use constant INVALID_STATE => 'invalid_state';
  36         73  
  36         188519  
98              
99 12     12 1 290 sub wh { shift->{+WH} }
100 0     0 1 0 sub rh { shift->{+RH} }
101              
102             sub throw_invalid {
103 6     6 0 27 my $self = shift;
104 6 50 66     81 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
105 6         1874 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
106             }
107              
108             sub read_size {
109 0     0 1 0 my $self = shift;
110 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
111 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
112             }
113              
114             sub fill_buffer {
115 19067     19067 1 26158 my $self = shift;
116              
117 19067 50       41512 $self->throw_invalid() if $self->{+INVALID_STATE};
118              
119 19067 50       42883 my $rh = $self->{+RH} or die "Not a read handle";
120              
121 19067 100       37011 return 0 if $self->{+EOF};
122              
123 19037   100     38459 $self->{+IN_BUFFER_SIZE} //= 0;
124              
125 19037   50     55560 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
126 19037         23172 if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
127             $to_read = min($self->_win32_pipe_ready(), $to_read);
128             }
129              
130 19037 50       38439 return 0 unless $to_read;
131              
132 19037         26204 while(1) {
133 19037         28210 my $rbuff = '';
134 19037         26341256 my $got = sysread($rh, $rbuff, $to_read);
135 19037 100       97450 unless(defined $got) {
136 29 50       494 return 0 if $! == EAGAIN; # NON-BLOCKING
137 0 0       0 next if $RETRY_ERRNO{0 + $!}; # interrupted or something, try again
138 0         0 $self->throw_invalid("$!");
139             }
140              
141 19008 100       42198 if ($got) {
142 18998         306424 $self->{+IN_BUFFER} .= $rbuff;
143 18998         31650 $self->{+IN_BUFFER_SIZE} += $got;
144 18998         60643 return $got;
145             }
146             else {
147 10         124 $self->{+EOF} = 1;
148 10         79 return 0;
149             }
150             }
151              
152 0         0 return 0;
153             }
154              
155 800175     800175   1349513 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
156 18     18   74 sub _peek_from_buffer { shift->_from_buffer(@_) }
157              
158             sub _from_buffer {
159 800193     800193   1046225 my $self = shift;
160 800193         1462351 my ($size, %params) = @_;
161              
162 800193 100 100     2623084 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
163 18984         48371 $self->fill_buffer;
164 18984 100       49938 unless($self->{+IN_BUFFER_SIZE} >= $size) {
165 3 100 66     86 return unless $params{eof_invalid} && $self->{+EOF};
166 2         38 $self->throw_invalid($params{eof_invalid});
167             }
168             }
169              
170 800190         1034184 my $out;
171              
172 800190 100       1297776 if ($params{remove}) {
173 800174         1083527 $self->{+IN_BUFFER_SIZE} -= $size;
174 800174         2275093 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
175             }
176             else {
177 16         62 $out = substr($self->{+IN_BUFFER}, 0, $size);
178             }
179              
180 800190         2261959 return $out;
181             }
182              
183             sub eof {
184 41     41 1 1476377 my $self = shift;
185              
186 41 100       178 $self->throw_invalid() if $self->{+INVALID_STATE};
187              
188 38 100       174 return 0 if $self->fill_buffer;
189 35 100       287 return 0 unless $self->{+EOF};
190 21 50       70 return 0 if $self->{+IN_BUFFER_SIZE};
191              
192 21 50       98 if (my $buffer = $self->{+MIXED_BUFFER}) {
193 21 100 66     178 return 0 if $buffer->{lines} || length $buffer->{lines};
194 14 50 33     142 return 0 if $buffer->{burst} || length $buffer->{burst};
195             }
196              
197 14         178 return 1;
198             }
199              
200             sub _fh_mode {
201 4     4   1677 my $self = shift;
202 4         9 my ($fh) = @_;
203              
204 4   50     31 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
205 4 100       20 return '<&' if $mode == Fcntl::O_RDONLY();
206 2 50       12 return '>&' if $mode == Fcntl::O_WRONLY();
207 0         0 return undef;
208             }
209              
210             my %MODE_TO_DIR = (
211             '<&' => RH(),
212             '<&=' => RH(),
213             '>&' => WH(),
214             '>&=' => WH(),
215             );
216             sub _mode_to_dir {
217 9     9   20 my $self = shift;
218 9         16 my ($mode) = @_;
219 9         32 return $MODE_TO_DIR{$mode};
220             }
221              
222             sub read_fifo {
223 7     7 1 24870 my $class = shift;
224 7         28 my ($fifo, %params) = @_;
225              
226 7 50       119 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
227              
228 7 50       295 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
229 7         28 binmode($fh);
230              
231 7         64 return bless({%params, RH() => $fh}, $class);
232             }
233              
234             sub write_fifo {
235 6     6 1 24489 my $class = shift;
236 6         178 my ($fifo, %params) = @_;
237              
238 6 50       521 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
239              
240 6 50       893 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
241 6         183 binmode($fh);
242              
243 6         252 return bless({%params, WH() => $fh}, $class);
244             }
245              
246             sub from_fh {
247 5     5 1 7036 my $class = shift;
248 5         10 my $ifh = pop;
249 5         14 my ($mode) = @_;
250              
251 5 50       47 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
252              
253 5   33     23 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
254 5   33     21 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
255              
256 5 50       115 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
257 5         18 binmode($fh);
258              
259 5         37 return bless({$dir => $fh}, $class);
260             }
261              
262             sub from_fd {
263 4     4 1 3127 my $class = shift;
264 4         9 my ($mode, $fd) = @_;
265              
266 4   33     8 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
267 4 50       83 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
268              
269 4 50       30 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
270              
271 4         11 binmode($fh);
272 4         25 return bless({$dir => $fh}, $class);
273             }
274              
275             sub new {
276 1     1 1 99 my $class = shift;
277 1         3 my (%params) = @_;
278              
279 1         2 my ($rh, $wh);
280 1 50       49 pipe($rh, $wh) or die "Could not create pipe: $!";
281              
282 1         4 binmode($wh);
283 1         3 binmode($rh);
284              
285 1         9 return bless({%params, RH() => $rh, WH() => $wh}, $class);
286             }
287              
288             sub pair {
289 35     35 1 51229 my $class = shift;
290 35         142 my (%params) = @_;
291              
292 35         82 my $mixed = delete $params{mixed_data_mode};
293              
294 35         93 my ($rh, $wh);
295 35 50       1466 pipe($rh, $wh) or die "Could not create pipe: $!";
296              
297 35         165 binmode($wh);
298 35         78 binmode($rh);
299              
300 35         256 my $r = bless({%params, RH() => $rh}, $class);
301 35         145 my $w = bless({%params, WH() => $wh}, $class);
302              
303 35 100       125 if ($mixed) {
304 14         67 $r->set_mixed_data_mode();
305 14         46 $w->set_mixed_data_mode();
306             }
307              
308 35         222 return ($r, $w);
309             }
310              
311             sub set_mixed_data_mode {
312 28     28 1 47 my $self = shift;
313              
314 28 50       178 $self->throw_invalid() if $self->{+INVALID_STATE};
315              
316 28 100       157 $self->read_blocking(0) if $self->{+RH};
317              
318 28   50     212 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
319 28   50     160 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
320 28   50     107 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
321             }
322              
323             sub get_line_burst_or_data {
324 52     52 1 1411043 my $self = shift;
325              
326 52   33     291 my $rh = $self->{+RH} // croak "Not a read handle";
327              
328 52   33     352 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
329 52   33     302 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
330 52   33     233 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
331              
332 52   100     1096 my $buffer = $self->{+MIXED_BUFFER} //= {
333             lines => '',
334             burst => '',
335             in_burst => 0,
336             in_message => 0,
337             do_extra_loop => 0,
338             strip_term => 0,
339             };
340              
341 52         145 while (1) {
342             $self->throw_invalid('Incomplete message received before EOF')
343 102 100 66     523 if $self->{+EOF} && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
344              
345 101 100 66     481 if($buffer->{lines} || length($buffer->{lines})) {
346             # Look for a complete line
347 59         112 my ($line, $term);
348 59         707 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
349              
350 59 100       259 return (line => "${line}${term}") if $term;
351 45 50 66     245 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
352              
353 38         83 $buffer->{lines} = $line;
354             }
355              
356 80 100       155 if ($buffer->{in_message}) {
357 10         105 my ($id, $message) = $self->_extract_message(one_part_only => 1);
358              
359 10 50       39 unless(defined $id) {
360 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
361 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
362             }
363              
364 10         26 $buffer->{strip_term}++;
365 10         27 $buffer->{in_message} = 0;
366 10 100       36 return (message => $message) if defined $message;
367             }
368              
369 77 100       206 if ($buffer->{strip_term}) {
370 10   50     27 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
371              
372 10 50       31 $self->throw_invalid("No message terminator") unless $term eq $postfix;
373 10         13 $buffer->{strip_term}--;
374             }
375              
376 77 100       152 if ($buffer->{in_burst}) {
377 18   50     71 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
378              
379 16 100       98 if ($peek eq $key) {
380 10         100 $self->_get_from_buffer(1); # Strip the key
381 10         32 $buffer->{in_message} = 1;
382 10         48 $buffer->{in_burst} = 0;
383 10         27 next;
384             }
385              
386 6   50     40 $buffer->{burst} //= '';
387 6         16 my ($burst_data, $term);
388 6         226 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
389 6         26 $buffer->{burst} .= $burst_data;
390              
391 6 100       28 if ($term) {
392 4         8 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
393 4         8 $buffer->{in_burst} = 0;
394 4         8 $buffer->{do_extra_loop}++;
395 4         28 return (burst => delete($buffer->{burst}));
396             }
397             else {
398 2         26 $self->{+IN_BUFFER_SIZE} = 0;
399             }
400              
401 2 50       52 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
402             }
403              
404 61 100 100     532 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
405 31 100       158 return unless $self->{+EOF};
406              
407             # Do at least one more iteration after EOF
408 17 100       114 return if $buffer->{+EOF}++;
409              
410             # But do not try to split the empty buffer
411 10         27 next;
412             }
413              
414             # Look for the start of a burst, anything before a burst is line data
415 30         141 my $linedata;
416 30         647 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
417 30 50       125 $buffer->{lines} .= $linedata if defined $linedata;
418              
419 30 100       78 if ($buffer->{in_burst}) {
420 16         53 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
421             }
422             else {
423 14         28 $self->{+IN_BUFFER_SIZE} = 0;
424             }
425             }
426             }
427              
428             sub debug {
429 0     0 0 0 my ($id, $buffer) = @_;
430              
431 0         0 print "---debug $id---\n";
432 0         0 for my $key (sort keys %$buffer) {
433 0   0     0 my $val = $buffer->{$key} // '';
434 0         0 $val =~ s/\x0E/\\x0E/g;
435 0         0 $val =~ s/\x0F/\\x0F/g;
436 0         0 $val =~ s/\x10/\\x10/g;
437 0         0 $val =~ s/\n/\\n/g;
438 0         0 $val =~ s/\r/\\r/g;
439 0         0 print "$key: |$val|\n\n";
440             };
441             }
442              
443             # This is a heavily modified version of a pattern suggested on stack-overflow
444             # and also used in Win32::PowerShell::IPC.
445             my $peek_named_pipe;
446             sub _win32_pipe_ready {
447 0     0   0 my $self = shift;
448 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
449              
450 0         0 my $buf = "";
451 0         0 my $buflen = 0;
452              
453 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
454             || die "Can't load PeekNamedPipe from kernel32.dll";
455              
456 0         0 my $got = pack('L', 0);
457 0         0 my $avail = pack('L', 0);
458 0         0 my $remain = pack('L', 0);
459              
460 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
461              
462 0 0       0 $self->{+EOF} = 1 if $ret == 0;
463              
464 0         0 return unpack('L', $avail);
465             }
466              
467             my $set_named_pipe_handle_state;
468             sub _win32_set_pipe_state {
469 0     0   0 my $self = shift;
470 0         0 my ($state) = @_;
471 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
472              
473 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
474             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
475              
476             # Block or non-block?
477 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
478              
479 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
480              
481 0         0 return $ret;
482             }
483              
484             sub read_blocking {
485 19     19 0 49 my $self = shift;
486 19 50       93 my $rh = $self->{+RH} or croak "Not a reader";
487              
488 19 50       124 ($self->{+READ_BLOCKING}) = @_ if @_;
489              
490 19         36 unless (IS_WIN32) {
491 19         366 $rh->blocking(@_);
492             }
493              
494 19         97 return $self->{+READ_BLOCKING};
495             }
496              
497             sub write_blocking {
498 5     5 0 12 my $self = shift;
499 5 50       21 my $wh = $self->{+WH} or croak "Not a writer";
500              
501 5 50       17 return $self->{+WRITE_BLOCKING} unless @_;
502              
503 5         10 my ($val) = @_;
504 5         15 $self->{+WRITE_BLOCKING} = $val;
505              
506 5         10 if (IS_WIN32) {
507             $self->_win32_set_pipe_state(@_) if @_;
508             }
509             else {
510 5         10 my $flags = 0;
511 5 50       56 fcntl($wh, &Fcntl::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
512 5 100       22 if ($val) { $flags ^= &Fcntl::O_NONBLOCK } # Remove non-blocking
  1         77  
513 4         12 else { $flags |= &Fcntl::O_NONBLOCK } # Add non-blocking to the flags
514 5 50       59 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
515             }
516              
517 5         34 return $self->{+WRITE_BLOCKING};
518             }
519              
520             sub blocking {
521 10     10 1 1093996 my $self = shift;
522              
523 10 100 66     281 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
524 5         56 return $self->read_blocking(@_);
525             }
526             elsif ($self->{+WH} && !$self->{+RH}) {
527 5         38 return $self->write_blocking(@_);
528             }
529              
530 0         0 my $r = $self->read_blocking(@_);
531 0         0 my $w = $self->write_blocking(@_);
532              
533 0 0 0     0 return 1 if $r && $w;
534 0 0 0     0 return 0 if !$r && !$w;
535 0         0 return undef;
536             }
537              
538             sub size {
539 0     0 1 0 my $self = shift;
540 0 0       0 return unless defined &Fcntl::F_GETPIPE_SZ;
541 0   0     0 my $fh = $self->{+WH} // $self->{+RH};
542 0         0 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
543             }
544              
545             sub resize {
546 3     3 1 4446 my $self = shift;
547 3         54 my ($size) = @_;
548              
549 3 50       150 return unless defined &Fcntl::F_SETPIPE_SZ;
550 3   33     111 my $fh = $self->{+WH} // $self->{+RH};
551              
552 3         57 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size);
553             }
554              
555             my $ONE_MB = 1 * 1024 * 1024;
556              
557             sub max_size {
558 0 0   0 1 0 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
559              
560 0 0       0 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
561 0         0 chomp(my $val = <$max>);
562 0         0 close($max);
563 0   0     0 return $val || $ONE_MB;
564             }
565              
566             sub resize_or_max {
567 0     0 1 0 my $self = shift;
568 0         0 my ($size) = @_;
569 0         0 $size = min($size, $self->max_size);
570 0         0 $self->resize($size);
571             }
572              
573             sub is_reader {
574 4     4 1 855 my $self = shift;
575 4 50 33     43 return 1 if $self->{+RH} && !$self->{+WH};
576 0         0 return undef;
577             }
578              
579             sub is_writer {
580 4     4 1 10 my $self = shift;
581 4 50 33     28 return 1 if $self->{+WH} && !$self->{+RH};
582 0         0 return undef;
583             }
584              
585             sub clone_writer {
586 3     3 1 18 my $self = shift;
587 3         15 my $class = blessed($self);
588 3 50       66 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
589 3         22 return bless({WH() => $fh}, $class);
590             }
591              
592             sub clone_reader {
593 0     0 1 0 my $self = shift;
594 0         0 my $class = blessed($self);
595 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
596 0         0 return bless({RH() => $fh}, $class);
597             }
598              
599             sub writer {
600 0     0 1 0 my $self = shift;
601              
602 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
603              
604 0 0       0 return 1 unless $self->{+RH};
605              
606 0         0 close(delete $self->{+RH});
607 0         0 return 1;
608             }
609              
610             sub reader {
611 1     1 1 5 my $self = shift;
612              
613 1 50       4 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
614              
615 1 50       4 return 1 unless $self->{+WH};
616              
617 1         7 close(delete $self->{+WH});
618 1         6 return 1;
619             }
620              
621             sub close {
622 10     10 1 3977 my $self = shift;
623 10 50       342 close(delete $self->{+WH}) if $self->{+WH};
624 10 50       190 close(delete $self->{+RH}) if $self->{+RH};
625 10         23 return;
626             }
627              
628             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
629             my $dsize = PIPE_BUF - $psize;
630              
631             sub delimiter_size {
632 34 50   34 0 3898 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
633 34   100     1302 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
634             }
635              
636             sub fits_in_burst {
637 4     4 1 13 my $self = shift;
638 4         23 my ($data) = @_;
639              
640 4   100     77 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
641 4 100       88 return undef unless $size <= PIPE_BUF;
642              
643 3         19 return $size;
644             }
645              
646             sub write_burst {
647 4     4 1 1859 my $self = shift;
648 4         25 my ($data) = @_;
649              
650 4   100     41 my $size = $self->fits_in_burst($data) // return undef;
651              
652 3   100     14 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  3         43  
653 3         90 $self->flush();
654              
655 3         16 return 1;
656             }
657              
658             sub DESTROY {
659 94     94   12615215 my $self = shift;
660 94 50 66     8538 $self->flush(blocking => 1) if $self->{+OUT_BUFFER} && @{$self->{+OUT_BUFFER}};
  31         4143  
661             }
662              
663             sub flush {
664 714156     714156 1 2941896 my $self = shift;
665 714156         1131550 my %params = @_;
666 714156   66     2257954 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
667              
668 714156   50     1332088 my $buffer = $self->{+OUT_BUFFER} // return;
669              
670 714156         1247717 while (@$buffer) {
671 1298764         1948615 my $set = $buffer->[0];
672 1298764         2497900 my $got = $self->_write_burst(@$set);
673              
674 1298764 100 100     4165795 return unless $blocking || defined $got;
675 684637 100       1392960 next unless defined $got;
676              
677 400058         1430548 shift @$buffer;
678             }
679              
680 100029         252302 return;
681             }
682              
683             sub _write_burst {
684 1298763     1298763   1651566 my $self = shift;
685 1298763         2247240 my ($data, $size) = @_;
686              
687 1298763 50       2639087 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
688              
689 1298763   100     3213405 my $prefix = $self->{+BURST_PREFIX} // '';
690 1298763   100     3102762 my $postfix = $self->{+BURST_POSTFIX} // '';
691              
692 1298763 100 66     3605396 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
693              
694 1298763         1542138 my $wrote;
695 1298763         1638199 my $loop = 0;
696             SWRITE: {
697 1298763         1587595 $wrote = syswrite($wh, $data, $size);
  1298763         683248336  
698 1298763 100 50     9209601 return undef if $! == EAGAIN || ($! == 28 && IS_WIN32); # NON-BLOCKING
      66        
699 400058 50 33     1912529 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
700 400058 50       1009858 last SWRITE if $wrote == $size;
701 0   0     0 $wrote //= "";
702 0         0 die "$wrote vs $size: $!";
703             }
704              
705 400058         1119549 return $wrote;
706             }
707              
708             sub _adjusted_dsize {
709 32     32   264 my $self = shift;
710              
711 32 50       341 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
712              
713 32   100     1057 my $message_key = $self->{+MESSAGE_KEY} // '';
714 32   100     928 my $prefix = $self->{+BURST_PREFIX} // '';
715 32   100     729 my $postfix = $self->{+BURST_POSTFIX} // '';
716              
717 32         1726 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
718 32         47879 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
719             }
720              
721             sub write_message {
722 100029     100029 1 23619632 my $self = shift;
723 100029         224352 my ($data) = @_;
724              
725 100029         156320 my $tid = _get_tid();
726 100029   100     358182 my $message_key = $self->{+MESSAGE_KEY} // '';
727 100029   66     260441 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
728 100029         300414 my $dtotal = bytes::length($data);
729              
730 100029         533908 my $parts = int($dtotal / $adjusted_dsize);
731 100029 50       254858 $parts++ if $dtotal % $adjusted_dsize;
732              
733 100029         174517 my $id = $parts - 1;
734              
735             # Unwinding the loop for a 1-part message for micro-optimization
736 100029 100       247365 if ($parts == 1) {
737 16         52 my $bytes = $data;
738 16         59 my $size = $dtotal;
739 16         208 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
740              
741 16 50 66     252 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
742              
743 16   100     179 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  16         271  
744             }
745             else {
746 100013         251130 for (my $part = 0; $part < $parts; $part++) {
747 400041         933217 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
748 400041         2604944 my $size = bytes::length($bytes);
749              
750 400041         2404254 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
751              
752 400041   100     806858 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
753 400041   100     1516193 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  400041         1765746  
754             }
755             }
756              
757 100029         298595 $self->flush();
758 100029         406004 return $parts;
759             }
760              
761             sub read_message {
762 100040     100040 1 129080544 my $self = shift;
763 100040         198442 my %params = @_;
764              
765 100040         242797 my ($id, $out) = $self->_extract_message(%params);
766              
767 100040 100       425532 return $out if defined $id;
768 1         12 return;
769             }
770              
771             sub _extract_message {
772 100050     100050   145969 my $self = shift;
773 100050         159660 my %params = @_;
774              
775 100050   100     295882 my $state = $self->{+STATE} //= {};
776              
777 100050         148379 while (1) {
778 400078 50       749415 unless ($state->{key}) {
779 400078 100       767149 my $key_bytes = $self->_get_from_buffer($psize) or return;
780              
781 400077         573322 my %key;
782 400077         1439224 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
783 400077         819922 $state->{key} = \%key;
784             }
785              
786 400077         605112 my $key = $state->{key};
787              
788 400077   50     740404 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
789              
790 400077         601858 my $id = $key->{id};
791 400077         542790 my $tag = join ':' => @{$key}{qw/pid tid/};
  400077         1038847  
792 400077   100     536065 push @{$state->{parts}->{$tag} //= []} => $id;
  400077         1120325  
793 400077 100       2055546 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
794              
795 400077         644553 delete $state->{key};
796              
797 400077 100       754649 unless ($id == 0) {
798 300035 100       538690 return ($id, undef) if $params{one_part_only};
799 300028         674161 next;
800             }
801              
802 100042         229326 my $message = delete $state->{buffers}->{$tag};
803 100042         184060 my $parts = delete $state->{parts}->{$tag};
804              
805 100042 100       721339 return ($id, $message) unless $params{debug};
806              
807             return (
808             $id,
809             {
810             message => $message,
811             parts => $parts,
812             pid => $key->{pid},
813             tid => $key->{tid},
814             },
815 1         46 );
816             }
817             }
818              
819             1;
820              
821             __END__