File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 424 510 83.1
branch 147 238 61.7
condition 114 221 51.5
subroutine 67 77 87.0
pod 31 36 86.1
total 783 1082 72.3


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 37     37   8606479 use strict;
  37         267  
  37         1069  
3 37     37   205 use warnings;
  37         78  
  37         1315  
4              
5             our $VERSION = '0.019';
6              
7 37     37   17639 use IO();
  37         26982  
  37         900  
8 37     37   261 use Fcntl();
  37         83  
  37         777  
9 37     37   203 use bytes();
  37         66  
  37         997  
10              
11 37     37   221 use Carp qw/croak confess/;
  37         76  
  37         1950  
12 37     37   239 use Config qw/%Config/;
  37         64  
  37         1821  
13 37     37   217 use List::Util qw/min/;
  37         69  
  37         2616  
14 37     37   229 use Scalar::Util qw/blessed/;
  37         85  
  37         2027  
15              
16 37     37   19512 use Errno qw/EINTR EAGAIN EPIPE/;
  37         52746  
  37         4831  
17             my %RETRY_ERRNO;
18             BEGIN {
19 37     37   211 %RETRY_ERRNO = (EINTR() => 1);
20 37 50       21220 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
21             }
22              
23             BEGIN {
24             # POSIX says writes of 512 or less are atomic, but some platforms allow for
25             # larger ones.
26 37     37   323 require POSIX;
27 37 50 33     520 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  37         219  
28 37         151 *PIPE_BUF = \&POSIX::PIPE_BUF;
29             }
30             else {
31 0         0 *PIPE_BUF = sub() { 512 };
32             }
33              
34 37 50 33     199 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  37         155  
35 37         74 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
36             }
37             else {
38 0         0 *SSIZE_MAX = sub() { 512 };
39             }
40              
41             {
42             # Using the default pipe size as a read size is significantly faster
43             # than a larger value on my test machine.
44 37         74 my $read_size = min(SSIZE_MAX(), 65_536);
  37         270  
45 37         728 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
46             }
47              
48 37         135 my $can_thread = 1;
49 37   33     388 $can_thread &&= $] >= 5.008001;
50 37   33     561 $can_thread &&= $Config{'useithreads'};
51              
52             # Threads are broken on perl 5.10.0 built with gcc 4.8+
53 37 0 33     177 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
54 0         0 my @parts = split /\./, $Config{'gccversion'};
55 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
56             }
57              
58 37   33     104 $can_thread &&= !$INC{'Devel/Cover.pm'};
59              
60 37 50       227 if (!$can_thread) {
    0          
61 37         204 *_get_tid = sub() { 0 };
62             }
63             elsif ($INC{'threads.pm'}) {
64 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
65             }
66             else {
67 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
68             }
69              
70 37 50       282 if ($^O eq 'MSWin32') {
71 0         0 local $@;
72 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
73 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
74 0         0 *IS_WIN32 = sub() { 1 };
75             }
76             else {
77 37         1309 *IS_WIN32 = sub() { 0 };
78             }
79             }
80              
81 37     37   276 use constant READ_SIZE => 'read_size';
  37         107  
  37         3050  
82 37     37   237 use constant RH => 'rh';
  37         69  
  37         2163  
83 37     37   228 use constant WH => 'wh';
  37         138  
  37         2240  
84 37     37   236 use constant EOF => 'eof';
  37         72  
  37         2020  
85 37     37   699 use constant STATE => 'state';
  37         89  
  37         2294  
86 37     37   324 use constant OUT_BUFFER => 'out_buffer';
  37         70  
  37         2062  
87 37     37   227 use constant IN_BUFFER => 'in_buffer';
  37         88  
  37         2006  
88 37     37   223 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  37         132  
  37         2091  
89 37     37   243 use constant READ_BLOCKING => 'read_blocking';
  37         108  
  37         2073  
90 37     37   242 use constant WRITE_BLOCKING => 'write_blocking';
  37         66  
  37         1865  
91 37     37   217 use constant BURST_PREFIX => 'burst_prefix';
  37         58  
  37         1708  
92 37     37   218 use constant BURST_POSTFIX => 'burst_postfix';
  37         72  
  37         1642  
93 37     37   198 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  37         77  
  37         1797  
94 37     37   324 use constant MESSAGE_KEY => 'message_key';
  37         101  
  37         2183  
95 37     37   244 use constant MIXED_BUFFER => 'mixed_buffer';
  37         69  
  37         1790  
96 37     37   242 use constant DELIMITER_SIZE => 'delimiter_size';
  37         101  
  37         1927  
97 37     37   221 use constant INVALID_STATE => 'invalid_state';
  37         75  
  37         1725  
98 37     37   220 use constant HIT_EPIPE => 'hit_epipe';
  37         77  
  37         206251  
99              
100 12     12 1 395 sub wh { shift->{+WH} }
101 0     0 1 0 sub rh { shift->{+RH} }
102              
103             sub throw_invalid {
104 6     6 0 16 my $self = shift;
105 6 50 66     89 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
106 6         2265 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
107             }
108              
109             sub read_size {
110 0     0 1 0 my $self = shift;
111 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
112 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
113             }
114              
115             sub fill_buffer {
116 19149     19149 1 28400 my $self = shift;
117              
118 19149 50       40672 $self->throw_invalid() if $self->{+INVALID_STATE};
119              
120 19149 50       40324 my $rh = $self->{+RH} or die "Not a read handle";
121              
122 19149 100       34523 return 0 if $self->{+EOF};
123              
124 19119   100     36027 $self->{+IN_BUFFER_SIZE} //= 0;
125              
126 19119   50     56365 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
127 19119         23606 if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
128             $to_read = min($self->_win32_pipe_ready(), $to_read);
129             }
130              
131 19119 50       34789 return 0 unless $to_read;
132              
133 19119         24783 while(1) {
134 19119         27721 my $rbuff = '';
135 19119         27408921 my $got = sysread($rh, $rbuff, $to_read);
136 19119 100       93717 unless(defined $got) {
137 29 50       585 return 0 if $! == EAGAIN; # NON-BLOCKING
138 0 0       0 next if $RETRY_ERRNO{0 + $!}; # interrupted or something, try again
139 0         0 $self->throw_invalid("$!");
140             }
141              
142 19090 100       44413 if ($got) {
143 19080         297473 $self->{+IN_BUFFER} .= $rbuff;
144 19080         31341 $self->{+IN_BUFFER_SIZE} += $got;
145 19080         56181 return $got;
146             }
147             else {
148 10         117 $self->{+EOF} = 1;
149 10         84 return 0;
150             }
151             }
152              
153 0         0 return 0;
154             }
155              
156 800175     800175   1321231 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
157 18     18   113 sub _peek_from_buffer { shift->_from_buffer(@_) }
158              
159             sub _from_buffer {
160 800193     800193   1054765 my $self = shift;
161 800193         1449774 my ($size, %params) = @_;
162              
163 800193 100 100     2481162 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
164 19066         50768 $self->fill_buffer;
165 19066 100       45720 unless($self->{+IN_BUFFER_SIZE} >= $size) {
166 3 100 66     110 return unless $params{eof_invalid} && $self->{+EOF};
167 2         26 $self->throw_invalid($params{eof_invalid});
168             }
169             }
170              
171 800190         991718 my $out;
172              
173 800190 100       1265904 if ($params{remove}) {
174 800174         1054966 $self->{+IN_BUFFER_SIZE} -= $size;
175 800174         2206627 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
176             }
177             else {
178 16         53 $out = substr($self->{+IN_BUFFER}, 0, $size);
179             }
180              
181 800190         2216415 return $out;
182             }
183              
184             sub eof {
185 41     41 1 1800841 my $self = shift;
186              
187 41 100       204 $self->throw_invalid() if $self->{+INVALID_STATE};
188              
189 38 100       217 return 0 if $self->fill_buffer;
190 35 100       296 return 0 unless $self->{+EOF};
191 21 50       391 return 0 if $self->{+IN_BUFFER_SIZE};
192              
193 21 50       133 if (my $buffer = $self->{+MIXED_BUFFER}) {
194 21 100 66     375 return 0 if $buffer->{lines} || length $buffer->{lines};
195 14 50 33     92 return 0 if $buffer->{burst} || length $buffer->{burst};
196             }
197              
198 14         100 return 1;
199             }
200              
201             sub _fh_mode {
202 4     4   1669 my $self = shift;
203 4         8 my ($fh) = @_;
204              
205 4   50     33 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
206 4 100       18 return '<&' if $mode == Fcntl::O_RDONLY();
207 2 50       11 return '>&' if $mode == Fcntl::O_WRONLY();
208 0         0 return undef;
209             }
210              
211             my %MODE_TO_DIR = (
212             '<&' => RH(),
213             '<&=' => RH(),
214             '>&' => WH(),
215             '>&=' => WH(),
216             );
217             sub _mode_to_dir {
218 9     9   17 my $self = shift;
219 9         13 my ($mode) = @_;
220 9         35 return $MODE_TO_DIR{$mode};
221             }
222              
223             sub read_fifo {
224 7     7 1 26562 my $class = shift;
225 7         28 my ($fifo, %params) = @_;
226              
227 7 50       118 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
228              
229 7 50       301 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
230 7         34 binmode($fh);
231              
232 7         76 return bless({%params, RH() => $fh}, $class);
233             }
234              
235             sub write_fifo {
236 6     6 1 24228 my $class = shift;
237 6         214 my ($fifo, %params) = @_;
238              
239 6 50       432 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
240              
241 6 50       829 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
242 6         94 binmode($fh);
243              
244 6         238 return bless({%params, WH() => $fh}, $class);
245             }
246              
247             sub from_fh {
248 5     5 1 6675 my $class = shift;
249 5         9 my $ifh = pop;
250 5         13 my ($mode) = @_;
251              
252 5 50       46 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
253              
254 5   33     33 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
255 5   33     21 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
256              
257 5 50       131 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
258 5         22 binmode($fh);
259              
260 5         80 return bless({$dir => $fh}, $class);
261             }
262              
263             sub from_fd {
264 4     4 1 3071 my $class = shift;
265 4         9 my ($mode, $fd) = @_;
266              
267 4   33     10 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
268 4 50       87 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
269              
270 4 50       31 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
271              
272 4         10 binmode($fh);
273 4         26 return bless({$dir => $fh}, $class);
274             }
275              
276             sub new {
277 1     1 1 89 my $class = shift;
278 1         3 my (%params) = @_;
279              
280 1         3 my ($rh, $wh);
281 1 50       46 pipe($rh, $wh) or die "Could not create pipe: $!";
282              
283 1         6 binmode($wh);
284 1         2 binmode($rh);
285              
286 1         9 return bless({%params, RH() => $rh, WH() => $wh}, $class);
287             }
288              
289             sub pair {
290 36     36 1 48275 my $class = shift;
291 36         142 my (%params) = @_;
292              
293 36         104 my $mixed = delete $params{mixed_data_mode};
294              
295 36         95 my ($rh, $wh);
296 36 50       1637 pipe($rh, $wh) or die "Could not create pipe: $!";
297              
298 36         195 binmode($wh);
299 36         119 binmode($rh);
300              
301 36         291 my $r = bless({%params, RH() => $rh}, $class);
302 36         181 my $w = bless({%params, WH() => $wh}, $class);
303              
304 36 100       141 if ($mixed) {
305 14         70 $r->set_mixed_data_mode();
306 14         48 $w->set_mixed_data_mode();
307             }
308              
309 36         246 return ($r, $w);
310             }
311              
312             sub set_mixed_data_mode {
313 28     28 1 88 my $self = shift;
314              
315 28 50       115 $self->throw_invalid() if $self->{+INVALID_STATE};
316              
317 28 100       135 $self->read_blocking(0) if $self->{+RH};
318              
319 28   50     257 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
320 28   50     128 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
321 28   50     136 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
322             }
323              
324             sub get_line_burst_or_data {
325 52     52 1 1413679 my $self = shift;
326              
327 52   33     413 my $rh = $self->{+RH} // croak "Not a read handle";
328              
329 52   33     377 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
330 52   33     244 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
331 52   33     357 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
332              
333 52   100     1019 my $buffer = $self->{+MIXED_BUFFER} //= {
334             lines => '',
335             burst => '',
336             in_burst => 0,
337             in_message => 0,
338             do_extra_loop => 0,
339             strip_term => 0,
340             };
341              
342 52         141 while (1) {
343             $self->throw_invalid('Incomplete message received before EOF')
344 102 100 66     721 if $self->{+EOF} && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
345              
346 101 100 66     564 if($buffer->{lines} || length($buffer->{lines})) {
347             # Look for a complete line
348 59         98 my ($line, $term);
349 59         810 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
350              
351 59 100       347 return (line => "${line}${term}") if $term;
352 45 50 66     543 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
353              
354 38         75 $buffer->{lines} = $line;
355             }
356              
357 80 100       207 if ($buffer->{in_message}) {
358 10         100 my ($id, $message) = $self->_extract_message(one_part_only => 1);
359              
360 10 50       94 unless(defined $id) {
361 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
362 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
363             }
364              
365 10         25 $buffer->{strip_term}++;
366 10         42 $buffer->{in_message} = 0;
367 10 100       44 return (message => $message) if defined $message;
368             }
369              
370 77 100       198 if ($buffer->{strip_term}) {
371 10   50     36 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
372              
373 10 50       34 $self->throw_invalid("No message terminator") unless $term eq $postfix;
374 10         35 $buffer->{strip_term}--;
375             }
376              
377 77 100       234 if ($buffer->{in_burst}) {
378 18   50     84 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
379              
380 16 100       71 if ($peek eq $key) {
381 10         135 $self->_get_from_buffer(1); # Strip the key
382 10         15 $buffer->{in_message} = 1;
383 10         35 $buffer->{in_burst} = 0;
384 10         26 next;
385             }
386              
387 6   50     24 $buffer->{burst} //= '';
388 6         18 my ($burst_data, $term);
389 6         188 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
390 6         74 $buffer->{burst} .= $burst_data;
391              
392 6 100       30 if ($term) {
393 4         12 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
394 4         32 $buffer->{in_burst} = 0;
395 4         12 $buffer->{do_extra_loop}++;
396 4         28 return (burst => delete($buffer->{burst}));
397             }
398             else {
399 2         24 $self->{+IN_BUFFER_SIZE} = 0;
400             }
401              
402 2 50       32 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
403             }
404              
405 61 100 100     475 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
406 31 100       162 return unless $self->{+EOF};
407              
408             # Do at least one more iteration after EOF
409 17 100       106 return if $buffer->{+EOF}++;
410              
411             # But do not try to split the empty buffer
412 10         24 next;
413             }
414              
415             # Look for the start of a burst, anything before a burst is line data
416 30         122 my $linedata;
417 30         842 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
418 30 50       160 $buffer->{lines} .= $linedata if defined $linedata;
419              
420 30 100       148 if ($buffer->{in_burst}) {
421 16         69 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
422             }
423             else {
424 14         92 $self->{+IN_BUFFER_SIZE} = 0;
425             }
426             }
427             }
428              
429             sub debug {
430 0     0 0 0 my ($id, $buffer) = @_;
431              
432 0         0 print "---debug $id---\n";
433 0         0 for my $key (sort keys %$buffer) {
434 0   0     0 my $val = $buffer->{$key} // '';
435 0         0 $val =~ s/\x0E/\\x0E/g;
436 0         0 $val =~ s/\x0F/\\x0F/g;
437 0         0 $val =~ s/\x10/\\x10/g;
438 0         0 $val =~ s/\n/\\n/g;
439 0         0 $val =~ s/\r/\\r/g;
440 0         0 print "$key: |$val|\n\n";
441             };
442             }
443              
444             # This is a heavily modified version of a pattern suggested on stack-overflow
445             # and also used in Win32::PowerShell::IPC.
446             my $peek_named_pipe;
447             sub _win32_pipe_ready {
448 0     0   0 my $self = shift;
449 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
450              
451 0         0 my $buf = "";
452 0         0 my $buflen = 0;
453              
454 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
455             || die "Can't load PeekNamedPipe from kernel32.dll";
456              
457 0         0 my $got = pack('L', 0);
458 0         0 my $avail = pack('L', 0);
459 0         0 my $remain = pack('L', 0);
460              
461 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
462              
463 0 0       0 $self->{+EOF} = 1 if $ret == 0;
464              
465 0         0 return unpack('L', $avail);
466             }
467              
468             my $set_named_pipe_handle_state;
469             sub _win32_set_pipe_state {
470 0     0   0 my $self = shift;
471 0         0 my ($state) = @_;
472 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
473              
474 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
475             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
476              
477             # Block or non-block?
478 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
479              
480 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
481              
482 0         0 return $ret;
483             }
484              
485             sub read_blocking {
486 19     19 0 51 my $self = shift;
487 19 50       122 my $rh = $self->{+RH} or croak "Not a reader";
488              
489 19 50       120 ($self->{+READ_BLOCKING}) = @_ if @_;
490              
491 19         46 unless (IS_WIN32) {
492 19         371 $rh->blocking(@_);
493             }
494              
495 19         96 return $self->{+READ_BLOCKING};
496             }
497              
498             sub write_blocking {
499 5     5 0 14 my $self = shift;
500 5 50       20 my $wh = $self->{+WH} or croak "Not a writer";
501              
502 5 50       12 return $self->{+WRITE_BLOCKING} unless @_;
503              
504 5         10 my ($val) = @_;
505 5         15 $self->{+WRITE_BLOCKING} = $val;
506              
507 5         11 if (IS_WIN32) {
508             $self->_win32_set_pipe_state(@_) if @_;
509             }
510             else {
511 5         10 my $flags = 0;
512 5 50       67 fcntl($wh, &Fcntl::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
513 5 100       23 if ($val) { $flags ^= &Fcntl::O_NONBLOCK } # Remove non-blocking
  1         70  
514 4         16 else { $flags |= &Fcntl::O_NONBLOCK } # Add non-blocking to the flags
515 5 50       59 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
516             }
517              
518 5         22 return $self->{+WRITE_BLOCKING};
519             }
520              
521             sub blocking {
522 10     10 1 1079585 my $self = shift;
523              
524 10 100 66     253 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
525 5         58 return $self->read_blocking(@_);
526             }
527             elsif ($self->{+WH} && !$self->{+RH}) {
528 5         28 return $self->write_blocking(@_);
529             }
530              
531 0         0 my $r = $self->read_blocking(@_);
532 0         0 my $w = $self->write_blocking(@_);
533              
534 0 0 0     0 return 1 if $r && $w;
535 0 0 0     0 return 0 if !$r && !$w;
536 0         0 return undef;
537             }
538              
539             sub size {
540 0     0 1 0 my $self = shift;
541 0 0       0 return unless defined &Fcntl::F_GETPIPE_SZ;
542 0   0     0 my $fh = $self->{+WH} // $self->{+RH};
543 0         0 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
544             }
545              
546             sub resize {
547 3     3 1 4260 my $self = shift;
548 3         66 my ($size) = @_;
549              
550 3 50       117 return unless defined &Fcntl::F_SETPIPE_SZ;
551 3   33     84 my $fh = $self->{+WH} // $self->{+RH};
552              
553 3         57 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size);
554             }
555              
556             my $ONE_MB = 1 * 1024 * 1024;
557              
558             sub max_size {
559 0 0   0 1 0 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
560              
561 0 0       0 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
562 0         0 chomp(my $val = <$max>);
563 0         0 close($max);
564 0   0     0 return $val || $ONE_MB;
565             }
566              
567             sub resize_or_max {
568 0     0 1 0 my $self = shift;
569 0         0 my ($size) = @_;
570 0         0 $size = min($size, $self->max_size);
571 0         0 $self->resize($size);
572             }
573              
574             sub is_reader {
575 4     4 1 801 my $self = shift;
576 4 50 33     42 return 1 if $self->{+RH} && !$self->{+WH};
577 0         0 return undef;
578             }
579              
580             sub is_writer {
581 4     4 1 9 my $self = shift;
582 4 50 33     31 return 1 if $self->{+WH} && !$self->{+RH};
583 0         0 return undef;
584             }
585              
586             sub clone_writer {
587 3     3 1 13 my $self = shift;
588 3         12 my $class = blessed($self);
589 3 50       69 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
590 3         20 return bless({WH() => $fh}, $class);
591             }
592              
593             sub clone_reader {
594 0     0 1 0 my $self = shift;
595 0         0 my $class = blessed($self);
596 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
597 0         0 return bless({RH() => $fh}, $class);
598             }
599              
600             sub writer {
601 0     0 1 0 my $self = shift;
602              
603 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
604              
605 0 0       0 return 1 unless $self->{+RH};
606              
607 0         0 close(delete $self->{+RH});
608 0         0 return 1;
609             }
610              
611             sub reader {
612 1     1 1 6 my $self = shift;
613              
614 1 50       4 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
615              
616 1 50       4 return 1 unless $self->{+WH};
617              
618 1         7 close(delete $self->{+WH});
619 1         6 return 1;
620             }
621              
622             sub close {
623 11     11 1 4925 my $self = shift;
624 11 100       399 close(delete $self->{+WH}) if $self->{+WH};
625 11 100       359 close(delete $self->{+RH}) if $self->{+RH};
626 11         56 return;
627             }
628              
629             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
630             my $dsize = PIPE_BUF - $psize;
631              
632             sub delimiter_size {
633 35 50   35 0 5328 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
634 35   100     1273 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
635             }
636              
637             sub fits_in_burst {
638 8     8 1 59 my $self = shift;
639 8         24 my ($data) = @_;
640              
641 8   100     94 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
642 8 100       207 return undef unless $size <= PIPE_BUF;
643              
644 7         47 return $size;
645             }
646              
647             sub write_burst {
648 4     4 1 1890 my $self = shift;
649 4         37 my ($data) = @_;
650              
651 4   100     38 my $size = $self->fits_in_burst($data) // return undef;
652              
653 3   100     12 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  3         47  
654 3         56 $self->flush();
655              
656 3         25 return 1;
657             }
658              
659             sub DESTROY {
660 95     95   12735980 my $self = shift;
661 95 50       448 return if $self->{+HIT_EPIPE};
662 95 50       597 $self->flush(blocking => 1) if $self->pending_output;
663             }
664              
665             sub pending_output {
666 97     97 1 1374 my $self = shift;
667 97 100       7922 my $buffer = $self->{+OUT_BUFFER} or return 0;
668 32 100       3800 return 0 unless @$buffer;
669 1         6 return 1;
670             }
671              
672             sub flush {
673 684254     684254 1 2641416 my $self = shift;
674 684254         1079966 my %params = @_;
675 684254   66     2111012 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
676              
677 684254   50     1209424 my $buffer = $self->{+OUT_BUFFER} // return;
678              
679 684254         1146453 while (@$buffer) {
680 1222954         1700847 my $set = $buffer->[0];
681 1222954         2296685 my $got = $self->_write_burst(@$set);
682              
683 1222952 100 100     3574084 return unless $blocking || defined $got;
684 638729 100       1266297 next unless defined $got;
685              
686 400058         1392845 shift @$buffer;
687             }
688              
689 100029         247754 return;
690             }
691              
692             sub _write_burst {
693 1222953     1222953   1590912 my $self = shift;
694 1222953         2056412 my ($data, $size) = @_;
695              
696 1222953 50       2405800 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
697              
698 1222953 100       2098275 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
699              
700 1222952   100     2929137 my $prefix = $self->{+BURST_PREFIX} // '';
701 1222952   100     2697882 my $postfix = $self->{+BURST_POSTFIX} // '';
702              
703 1222952 100 66     3350229 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
704              
705 1222952         1514259 my $wrote;
706 1222952         1476851 my $loop = 0;
707             SWRITE: {
708 1222952         1530899 $wrote = syswrite($wh, $data, $size);
  1222952         649655003  
709 1222952 100 100     8058423 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
710 1         51 $self->{+HIT_EPIPE} = 1;
711 1         4 delete $self->{+OUT_BUFFER};
712 1         231 croak "Disconnected pipe";
713             }
714 1222951 100 100     4749237 return undef if $! == EAGAIN || (IS_WIN32 && $! == 28); # NON-BLOCKING
715 400058 50 33     1578682 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
716 400058 50       962638 last SWRITE if $wrote == $size;
717 0   0     0 $wrote //= "";
718 0         0 die "$wrote vs $size: $!";
719             }
720              
721 400058         1094580 return $wrote;
722             }
723              
724             sub _adjusted_dsize {
725 32     32   275 my $self = shift;
726              
727 32 50       456 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
728              
729 32   100     1039 my $message_key = $self->{+MESSAGE_KEY} // '';
730 32   100     771 my $prefix = $self->{+BURST_PREFIX} // '';
731 32   100     643 my $postfix = $self->{+BURST_POSTFIX} // '';
732              
733 32         1923 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
734 32         48564 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
735             }
736              
737             sub write_message {
738 100029     100029 1 23556140 my $self = shift;
739 100029         222263 my ($data) = @_;
740              
741 100029         148185 my $tid = _get_tid();
742 100029   100     384417 my $message_key = $self->{+MESSAGE_KEY} // '';
743 100029   66     251773 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
744 100029         290121 my $dtotal = bytes::length($data);
745              
746 100029         491179 my $parts = int($dtotal / $adjusted_dsize);
747 100029 50       261731 $parts++ if $dtotal % $adjusted_dsize;
748              
749 100029         161131 my $id = $parts - 1;
750              
751             # Unwinding the loop for a 1-part message for micro-optimization
752 100029 100       251890 if ($parts == 1) {
753 16         77 my $bytes = $data;
754 16         48 my $size = $dtotal;
755 16         278 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
756              
757 16 50 66     281 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
758              
759 16   100     374 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  16         228  
760             }
761             else {
762 100013         256060 for (my $part = 0; $part < $parts; $part++) {
763 400041         896905 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
764 400041         2456326 my $size = bytes::length($bytes);
765              
766 400041         2410527 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
767              
768 400041   100     812818 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
769 400041   100     1449036 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  400041         1681241  
770             }
771             }
772              
773 100029         286777 $self->flush();
774 100029         414737 return $parts;
775             }
776              
777             sub read_message {
778 100040     100040 1 127356350 my $self = shift;
779 100040         202218 my %params = @_;
780              
781 100040         251368 my ($id, $out) = $self->_extract_message(%params);
782              
783 100040 100       416139 return $out if defined $id;
784 1         19 return;
785             }
786              
787             sub _extract_message {
788 100050     100050   145362 my $self = shift;
789 100050         152464 my %params = @_;
790              
791 100050   100     278875 my $state = $self->{+STATE} //= {};
792              
793 100050         146985 while (1) {
794 400078 50       745397 unless ($state->{key}) {
795 400078 100       749309 my $key_bytes = $self->_get_from_buffer($psize) or return;
796              
797 400077         579377 my %key;
798 400077         1462721 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
799 400077         834247 $state->{key} = \%key;
800             }
801              
802 400077         596724 my $key = $state->{key};
803              
804 400077   50     723284 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
805              
806 400077         617061 my $id = $key->{id};
807 400077         531600 my $tag = join ':' => @{$key}{qw/pid tid/};
  400077         1036818  
808 400077   100     533318 push @{$state->{parts}->{$tag} //= []} => $id;
  400077         1095718  
809 400077 100       2027579 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
810              
811 400077         638485 delete $state->{key};
812              
813 400077 100       715625 unless ($id == 0) {
814 300035 100       537349 return ($id, undef) if $params{one_part_only};
815 300028         637653 next;
816             }
817              
818 100042         228166 my $message = delete $state->{buffers}->{$tag};
819 100042         180710 my $parts = delete $state->{parts}->{$tag};
820              
821 100042 100       705463 return ($id, $message) unless $params{debug};
822              
823             return (
824             $id,
825             {
826             message => $message,
827             parts => $parts,
828             pid => $key->{pid},
829             tid => $key->{tid},
830             },
831 1         53 );
832             }
833             }
834              
835             1;
836              
837             __END__