File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 623 686 90.8
branch 251 346 72.5
condition 183 288 63.5
subroutine 98 103 95.1
pod 41 45 91.1
total 1196 1468 81.4


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 74     74   11326244 use strict;
  74         130  
  74         2160  
3 74     74   481 use warnings;
  74         126  
  74         3635  
4              
5             our $VERSION = '0.031';
6              
7 74     74   24607 use IO();
  74         58924  
  74         1855  
8 74     74   32235 use IO::Handle();
  74         310034  
  74         1607  
9 74     74   408 use Fcntl();
  74         88  
  74         727  
10 74     74   25696 use bytes();
  74         28316  
  74         4512  
11              
12             BEGIN {
13 74 50   74   161 if (eval { require IO::Select; 1 }) {
  74         29150  
  74         94412  
14 74         2034 *HAVE_IO_SELECT = sub() { 1 };
15             }
16             else {
17 0         0 *HAVE_IO_SELECT = sub() { 0 };
18             }
19             }
20              
21 74     74   381 use Carp qw/croak confess/;
  74         83  
  74         3939  
22 74     74   336 use Config qw/%Config/;
  74         87  
  74         2270  
23 74     74   311 use List::Util qw/min/;
  74         88  
  74         3495  
24 74     74   232 use Scalar::Util qw/blessed/;
  74         107  
  74         2760  
25              
26 74     74   28819 use Errno qw/EINTR EAGAIN EPIPE/;
  74         89744  
  74         10894  
27             my (%RETRY_ERRNO, %NONBLOCK_ERRNO);
28             BEGIN {
29 74     74   315 %RETRY_ERRNO = (EINTR() => 1);
30 74 50       1047 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
31              
32             # EWOULDBLOCK == EAGAIN on most platforms, but POSIX allows them to differ.
33 74         327 %NONBLOCK_ERRNO = (EAGAIN() => 1);
34 74 50       32169 $NONBLOCK_ERRNO{Errno->EWOULDBLOCK} = 1 if Errno->can('EWOULDBLOCK');
35             }
36              
37             BEGIN {
38             # POSIX says writes of 512 or less are atomic, but some platforms allow for
39             # larger ones.
40 74     74   31817 require POSIX;
41 74 50 33     403167 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  74         548  
42 74         267 *PIPE_BUF = \&POSIX::PIPE_BUF;
43             }
44             else {
45 0         0 *PIPE_BUF = sub() { 512 };
46             }
47              
48 74 50 33     355 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  74         287  
49 74         110 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
50             }
51             else {
52             # POSIX guarantees SSIZE_MAX is at least 32767 (_POSIX_SSIZE_MAX).
53 0         0 *SSIZE_MAX = sub() { 32767 };
54             }
55              
56             {
57             # Using the default pipe size as a read size is significantly faster
58             # than a larger value on my test machine.
59 74         106 my $read_size = min(SSIZE_MAX(), 65_536);
  74         274  
60 74         839 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
61             }
62              
63 74         154 my $can_thread = 1;
64 74   33     398 $can_thread &&= $] >= 5.008001;
65 74   33     847 $can_thread &&= $Config{'useithreads'};
66              
67             # Threads are broken on perl 5.10.0 built with gcc 4.8+
68 74 0 33     274 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
69 0         0 my @parts = split /\./, $Config{'gccversion'};
70 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
71             }
72              
73 74   33     190 $can_thread &&= !$INC{'Devel/Cover.pm'};
74              
75 74 50       158 if (!$can_thread) {
    0          
76 74         111 *_get_tid = sub() { 0 };
77             }
78             elsif ($INC{'threads.pm'}) {
79 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
80             }
81             else {
82 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
83             }
84              
85 74 50       225 if ($^O eq 'MSWin32') {
86 0         0 local $@;
87 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
88 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
89 0         0 *IS_WIN32 = sub() { 1 };
90             }
91             else {
92 74         2096 *IS_WIN32 = sub() { 0 };
93             }
94             }
95              
96 74     74   392 use constant READ_SIZE => 'read_size';
  74         99  
  74         4800  
97 74     74   306 use constant RH => 'rh';
  74         87  
  74         2800  
98 74     74   270 use constant WH => 'wh';
  74         99  
  74         2141  
99 74     74   209 use constant EOF => 'eof';
  74         80  
  74         1863  
100 74     74   224 use constant STATE => 'state';
  74         89  
  74         1996  
101 74     74   217 use constant OUT_BUFFER => 'out_buffer';
  74         94  
  74         1765  
102 74     74   235 use constant IN_BUFFER => 'in_buffer';
  74         77  
  74         1829  
103 74     74   234 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  74         87  
  74         2198  
104 74     74   449 use constant READ_BLOCKING => 'read_blocking';
  74         383  
  74         2576  
105 74     74   294 use constant WRITE_BLOCKING => 'write_blocking';
  74         88  
  74         2146  
106 74     74   222 use constant BURST_PREFIX => 'burst_prefix';
  74         79  
  74         2116  
107 74     74   217 use constant BURST_POSTFIX => 'burst_postfix';
  74         123  
  74         2439  
108 74     74   304 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  74         89  
  74         2986  
109 74     74   344 use constant MESSAGE_KEY => 'message_key';
  74         73  
  74         2638  
110 74     74   205 use constant MIXED_BUFFER => 'mixed_buffer';
  74         130  
  74         2182  
111 74     74   240 use constant DELIMITER_SIZE => 'delimiter_size';
  74         81  
  74         2630  
112 74     74   310 use constant INVALID_STATE => 'invalid_state';
  74         103  
  74         2382  
113 74     74   279 use constant HIT_EPIPE => 'hit_epipe';
  74         75  
  74         3276  
114 74     74   255 use constant USE_IO_SELECT => 'use_io_select';
  74         103  
  74         2606  
115 74     74   242 use constant COMPRESSION => 'compression';
  74         87  
  74         2619  
116 74     74   254 use constant COMPRESSION_LEVEL => 'compression_level';
  74         77  
  74         1963  
117 74     74   261 use constant COMPRESSION_DICTIONARY => 'compression_dictionary';
  74         150  
  74         2511  
118 74     74   257 use constant COMPRESSION_DICTIONARY_FILE => 'compression_dictionary_file';
  74         95  
  74         2503  
119 74     74   365 use constant KEEP_COMPRESSED => 'keep_compressed';
  74         74  
  74         428889  
120              
121 71     71 1 1736 sub wh { shift->{+WH} }
122 10     10 1 16504 sub rh { shift->{+RH} }
123              
124             sub throw_invalid {
125 37     37 0 454 my $self = shift;
126 37 50 66     255 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
127 37         14393 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
128             }
129              
130             sub read_size {
131 0     0 1 0 my $self = shift;
132 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
133 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
134             }
135              
136             sub use_io_select {
137 168605     168605 1 169810 my $self = shift;
138 168605 100       239388 if (@_) {
139 27 50 100     107 croak "IO::Select is not installed, cannot enable use_io_select" if $_[0] && !HAVE_IO_SELECT;
140 27 100       143 $self->{+USE_IO_SELECT} = $_[0] ? 1 : 0;
141 27 100       51 delete $self->{_select} unless $_[0];
142             }
143 168605         141123 return 0 unless HAVE_IO_SELECT;
144 168605         219245 my $val = $self->{+USE_IO_SELECT};
145 168605 100       324698 return defined($val) ? ($val ? 1 : 0) : IS_WIN32 ? 0 : 1;
    100          
146             }
147              
148 19     19 1 117 sub compression { $_[0]->{+COMPRESSION} }
149 6     6 1 20 sub compression_level { $_[0]->{+COMPRESSION_LEVEL} }
150 10     10 1 52 sub compression_dictionary { $_[0]->{+COMPRESSION_DICTIONARY} }
151 8     8 1 34 sub compression_dictionary_file { $_[0]->{+COMPRESSION_DICTIONARY_FILE} }
152 10 100   10 1 44 sub keep_compressed { $_[0]->{+KEEP_COMPRESSED} ? 1 : 0 }
153              
154             sub fill_buffer {
155 168799     168799 1 165939 my $self = shift;
156              
157 168799 50       267555 $self->throw_invalid() if $self->{+INVALID_STATE};
158              
159 168799 50       289474 my $rh = $self->{+RH} or die "Not a read handle";
160              
161 168799 100       252113 return 0 if $self->{+EOF};
162              
163 168578   100     228578 $self->{+IN_BUFFER_SIZE} //= 0;
164              
165 168578   50     381944 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
166              
167 168578         266716 my $use_select = $self->use_io_select;
168              
169 168578 100       219018 if ($use_select) {
170 144395   66     207340 my $sel = $self->{_select} //= IO::Select->new($rh);
171 144395   100     321789 my $blocking = $self->{+READ_BLOCKING} // 1;
172 144395 100       444851 my @ready = $sel->can_read($blocking ? undef : 0);
173 144395 100       3869304 return 0 unless @ready;
174             }
175 0         0 elsif (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
176             $to_read = min($self->_win32_pipe_ready(), $to_read);
177             }
178              
179 168394 50       261328 return 0 unless $to_read;
180              
181 168394         154522 while (1) {
182 168394         191311 my $rbuff = '';
183 168394         6831495 my $got = sysread($rh, $rbuff, $to_read);
184 168394 100       362372 unless(defined $got) {
185 97 50       1728 return 0 if $NONBLOCK_ERRNO{0 + $!}; # NON-BLOCKING
186 0 0       0 if ($RETRY_ERRNO{0 + $!}) {
187 0 0       0 next unless $use_select; # retry on EINTR in fallback mode
188 0         0 return 0; # IO::Select handles EINTR
189             }
190 0         0 $self->throw_invalid("$!");
191             }
192              
193 168297 100       223581 if ($got) {
194 168248         2178603 $self->{+IN_BUFFER} .= $rbuff;
195 168248         220441 $self->{+IN_BUFFER_SIZE} += $got;
196 168248         322451 return $got;
197             }
198             else {
199 49         432 $self->{+EOF} = 1;
200 49         134 return 0;
201             }
202             }
203              
204 0         0 return 0;
205             }
206              
207 5600779     5600779   7116396 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
208 68     68   161 sub _peek_from_buffer { shift->_from_buffer(@_) }
209              
210             sub _from_buffer {
211 5600847     5600847   5378805 my $self = shift;
212 5600847         7106670 my ($size, %params) = @_;
213              
214 5600847 100 100     12122857 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
215 168181         313239 $self->fill_buffer;
216 168181 100       291662 unless($self->{+IN_BUFFER_SIZE} >= $size) {
217 7 50 33     174 return unless $params{eof_invalid} && $self->{+EOF};
218 0         0 $self->throw_invalid($params{eof_invalid});
219             }
220             }
221              
222 5600840         5186370 my $out;
223              
224 5600840 100       6416953 if ($params{remove}) {
225 5600772         5473124 $self->{+IN_BUFFER_SIZE} -= $size;
226 5600772         9962580 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
227             }
228             else {
229 68         183 $out = substr($self->{+IN_BUFFER}, 0, $size);
230             }
231              
232 5600840         9315648 return $out;
233             }
234              
235             sub _has_dict {
236             return defined($_[0]->{+COMPRESSION_DICTIONARY})
237 100   100 100   365 || defined($_[0]->{+COMPRESSION_DICTIONARY_FILE});
238             }
239              
240             # NOTE: raw zstd dictionaries do not embed a dict-ID, so a mismatched peer
241             # dict will silently decode to garbage rather than fail. Both ends must agree
242             # on byte-identical dictionary content.
243             sub _build_cdict {
244 10     10   10 my $self = shift;
245 10   50     28 my $level = $self->{+COMPRESSION_LEVEL} // 3;
246 10         26 require Compress::Zstd::CompressionDictionary;
247 10 100       20 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
248 4         18 return Compress::Zstd::CompressionDictionary->new_from_file($path, $level);
249             }
250 6         122 return Compress::Zstd::CompressionDictionary->new($self->{+COMPRESSION_DICTIONARY}, $level);
251             }
252              
253             sub _build_ddict {
254 10     10   10 my $self = shift;
255 10         28 require Compress::Zstd::DecompressionDictionary;
256 10 100       24 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
257 4         18 return Compress::Zstd::DecompressionDictionary->new_from_file($path);
258             }
259 6         104 return Compress::Zstd::DecompressionDictionary->new($self->{+COMPRESSION_DICTIONARY});
260             }
261              
262             sub _compress {
263 49     49   105 my ($self, $data) = @_;
264 49 100       106 if ($self->_has_dict) {
265 10         48 require Compress::Zstd::CompressionContext;
266 10   66     228 my $ctx = $self->{_compression_ctx} //= Compress::Zstd::CompressionContext->new;
267 10   33     34 my $cdict = $self->{_compression_cdict} //= $self->_build_cdict;
268 10         744 return $ctx->compress_using_dict($data, $cdict);
269             }
270 39   100     11442 return Compress::Zstd::compress($data, $self->{+COMPRESSION_LEVEL} // 3);
271             }
272              
273             sub _decompress {
274 51     51   1523 my ($self, $data) = @_;
275 51         51 my $out;
276 51 100       111 if ($self->_has_dict) {
277 12         48 require Compress::Zstd::DecompressionContext;
278 12   66     228 my $ctx = $self->{_decompression_ctx} //= Compress::Zstd::DecompressionContext->new;
279 12   66     38 my $ddict = $self->{_decompression_ddict} //= $self->_build_ddict;
280 12         434 $out = $ctx->decompress_using_dict($data, $ddict);
281             }
282             else {
283 39         937 $out = Compress::Zstd::decompress($data);
284             }
285 51 100       140 $self->throw_invalid("zstd decompression failed") unless defined $out;
286 45         90 return $out;
287             }
288              
289             sub eof {
290 521     521 1 8440920 my $self = shift;
291              
292 521 100       1349 $self->throw_invalid() if $self->{+INVALID_STATE};
293              
294 507 100       1376 return 0 if $self->fill_buffer;
295 443 100       1195 return 0 unless $self->{+EOF};
296 211 100       380 return 0 if $self->{+IN_BUFFER_SIZE};
297              
298 151 50       360 if (my $buffer = $self->{+MIXED_BUFFER}) {
299 151 100 100     626 return 0 if defined($buffer->{lines}) && length($buffer->{lines});
300 102 100 100     427 return 0 if defined($buffer->{burst}) && length($buffer->{burst});
301             }
302              
303 101         368 return 1;
304             }
305              
306             sub _fh_mode {
307 8     8   1635 my $self = shift;
308 8         11 my ($fh) = @_;
309              
310 8   50     31 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
311 8 100       20 return '<&' if $mode == Fcntl::O_RDONLY();
312 4 50       12 return '>&' if $mode == Fcntl::O_WRONLY();
313 0         0 return undef;
314             }
315              
316             my %MODE_TO_DIR = (
317             '<&' => RH(),
318             '<&=' => RH(),
319             '>&' => WH(),
320             '>&=' => WH(),
321             );
322             sub _mode_to_dir {
323 24     24   24 my $self = shift;
324 24         29 my ($mode) = @_;
325 24         54 return $MODE_TO_DIR{$mode};
326             }
327              
328             sub _check_params {
329 228     228   740 my ($class, %params) = @_;
330             croak "IO::Select is not installed, cannot enable use_io_select"
331 228 50 100     1330 if $params{+USE_IO_SELECT} && !HAVE_IO_SELECT;
332              
333 228 100       681 if (defined(my $algo = $params{+COMPRESSION})) {
334 41 100       638 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
335             croak "compression => 'zstd' requires Compress::Zstd"
336 37 50       38 unless eval { require Compress::Zstd; 1 };
  37         183  
  37         79  
337             }
338              
339             croak "compression_dictionary and compression_dictionary_file are mutually exclusive"
340 224 100 100     947 if defined($params{+COMPRESSION_DICTIONARY}) && defined($params{+COMPRESSION_DICTIONARY_FILE});
341              
342             croak "compression_dictionary requires compression to be enabled"
343             if (defined($params{+COMPRESSION_DICTIONARY}) || defined($params{+COMPRESSION_DICTIONARY_FILE}))
344 222 100 100     2503 && !defined($params{+COMPRESSION});
      100        
345             }
346              
347             sub read_fifo {
348 22     22 1 47280416 my $class = shift;
349 22         143 my ($fifo, %params) = @_;
350              
351 22         191 $class->_check_params(%params);
352 22 50       657 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
353              
354 22 50       1065 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
355 22         64 binmode($fh);
356              
357 22         208 return $class->_new_from_params(\%params, RH() => $fh);
358             }
359              
360             sub write_fifo {
361 13     13 1 184012 my $class = shift;
362 13         546 my ($fifo, %params) = @_;
363              
364 13         1496 $class->_check_params(%params);
365 13 50       991 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
366              
367 13 50       2537 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
368 13         724 binmode($fh);
369              
370 13         1099 return $class->_new_from_params(\%params, WH() => $fh);
371             }
372              
373             sub from_fh {
374 14     14 1 316645 my $class = shift;
375              
376             # Mode is optional: from_fh($fh, %params) or from_fh($mode, $fh, %params).
377 14         18 my $mode;
378 14 100 66     75 $mode = shift if @_ && !ref($_[0]) && $MODE_TO_DIR{$_[0]};
      66        
379 14         30 my $ifh = shift;
380 14         22 my %params = @_;
381              
382 14         37 $class->_check_params(%params);
383              
384 13 50       62 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
385              
386 13   33     26 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
387 13   33     28 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
388              
389 13 50       154 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
390 13         19 binmode($fh);
391              
392 13         62 return $class->_new_from_params(\%params, $dir => $fh);
393             }
394              
395             sub from_fd {
396 12     12 1 3872 my $class = shift;
397 12         24 my ($mode, $fd, %params) = @_;
398              
399 12         24 $class->_check_params(%params);
400              
401 11   33     17 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
402 11 50       141 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
403              
404 11 50       37 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
405              
406 11         13 binmode($fh);
407 11         35 return $class->_new_from_params(\%params, $dir => $fh);
408             }
409              
410             sub _new_from_params {
411 373     373   1434 my ($class, $params, @handles) = @_;
412              
413 373         577 my $mixed = delete $params->{mixed_data_mode};
414              
415 373         1567 my $self = bless({%$params, @handles}, $class);
416 373 100       1369 $self->set_mixed_data_mode() if $mixed;
417              
418 373         1348 return $self;
419             }
420              
421             sub new {
422 12     12 1 162703 my $class = shift;
423 12         63 my (%params) = @_;
424              
425 12         45 $class->_check_params(%params);
426              
427 4         5 my ($rh, $wh);
428 4 50       165 pipe($rh, $wh) or die "Could not create pipe: $!";
429              
430 4         10 binmode($wh);
431 4         24 binmode($rh);
432              
433 4         26 return $class->_new_from_params(\%params, RH() => $rh, WH() => $wh);
434             }
435              
436             sub pair {
437 155     155 1 52698747 my $class = shift;
438 155         829 my (%params) = @_;
439              
440 155         806 $class->_check_params(%params);
441              
442 155         382 my ($rh, $wh);
443 155 50       5953 pipe($rh, $wh) or die "Could not create pipe: $!";
444              
445 155         418 binmode($wh);
446 155         261 binmode($rh);
447              
448 155         1169 my $r = $class->_new_from_params({%params}, RH() => $rh);
449 155         853 my $w = $class->_new_from_params({%params}, WH() => $wh);
450              
451 155         813 return ($r, $w);
452             }
453              
454             sub set_mixed_data_mode {
455 111     111 1 121 my $self = shift;
456              
457 111 50       245 $self->throw_invalid() if $self->{+INVALID_STATE};
458              
459 111 100       461 $self->read_blocking(0) if $self->{+RH};
460              
461 111   50     384 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
462 111   50     323 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
463 111   50     311 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
464             }
465              
466             sub set_compression {
467 16     16 1 98 my $self = shift;
468 16         45 my ($algo, $level) = @_;
469              
470 16 100       42 if (!defined $algo) {
471 6         10 delete $self->{+COMPRESSION};
472 6         8 delete $self->{+COMPRESSION_LEVEL};
473 6         6 delete $self->{_compression_ctx};
474 6         6 delete $self->{_compression_cdict};
475 6         6 delete $self->{_decompression_ctx};
476 6         8 delete $self->{_decompression_ddict};
477 6         6 delete $self->{_compress_cache};
478 6         8 return;
479             }
480              
481 10 100       329 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
482             croak "compression => 'zstd' requires Compress::Zstd"
483 8 50       20 unless eval { require Compress::Zstd; 1 };
  8         60  
  8         18  
484              
485 8         28 $self->{+COMPRESSION} = $algo;
486             # Omitted $level preserves the previously set level; pass undef to
487             # set_compression(undef) first if a full reset back to the default is desired.
488 8 100       17 $self->{+COMPRESSION_LEVEL} = $level if defined $level;
489              
490             # Cached objects depend on level / dict; force rebuild.
491 8         22 delete $self->{_compression_ctx};
492 8         10 delete $self->{_compression_cdict};
493 8         12 delete $self->{_decompression_ctx};
494 8         6 delete $self->{_decompression_ddict};
495 8         9 delete $self->{_compress_cache};
496              
497 8         15 return;
498             }
499              
500             sub set_compression_dictionary {
501 4     4 1 18 my ($self, $bytes) = @_;
502 4 50       10 if (defined $bytes) {
503             croak "compression_dictionary requires compression to be enabled"
504 4 50       10 unless defined $self->{+COMPRESSION};
505 4         6 $self->{+COMPRESSION_DICTIONARY} = $bytes;
506 4         4 delete $self->{+COMPRESSION_DICTIONARY_FILE};
507             }
508             else {
509 0         0 delete $self->{+COMPRESSION_DICTIONARY};
510             }
511 4         6 delete $self->{_compression_cdict};
512 4         2 delete $self->{_decompression_ddict};
513 4         6 delete $self->{_compress_cache};
514 4         6 return;
515             }
516              
517             sub set_compression_dictionary_file {
518 8     8 1 904 my ($self, $path) = @_;
519 8 100       18 if (defined $path) {
520             croak "compression_dictionary requires compression to be enabled"
521 4 50       8 unless defined $self->{+COMPRESSION};
522 4         6 $self->{+COMPRESSION_DICTIONARY_FILE} = $path;
523 4         6 delete $self->{+COMPRESSION_DICTIONARY};
524             }
525             else {
526 4         6 delete $self->{+COMPRESSION_DICTIONARY_FILE};
527             }
528 8         16 delete $self->{_compression_cdict};
529 8         20 delete $self->{_decompression_ddict};
530 8         12 delete $self->{_compress_cache};
531 8         8 return;
532             }
533              
534             sub set_keep_compressed {
535 4     4 1 8 my ($self, $val) = @_;
536 4 100       12 $self->{+KEEP_COMPRESSED} = $val ? 1 : 0;
537 4         6 return;
538             }
539              
540             sub get_line_burst_or_data {
541 190     190 1 2867284 my $self = shift;
542 190         502 my %params = @_;
543              
544 190   33     800 my $rh = $self->{+RH} // croak "Not a read handle";
545              
546 190   33     802 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
547 190   33     1132 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
548 190   33     1075 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
549              
550 190   100     3294 my $buffer = $self->{+MIXED_BUFFER} //= {
551             lines => '',
552             burst => '',
553             in_burst => 0,
554             in_message => 0,
555             strip_term => 0,
556             };
557              
558 190         595 my $peek;
559              
560 190         369 while (1) {
561             $self->throw_invalid('Incomplete message received before EOF')
562 376 100 66     1201 if $self->eof && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
563              
564 370 100 66     1222 if($buffer->{lines} || length($buffer->{lines})) {
565             # Look for a complete line
566 194         221 my ($line, $term);
567 194         3086 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
568              
569 194 100       735 return (line => "${line}${term}") if $term;
570 142 50 66     591 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
571              
572 118         155 $buffer->{lines} = $line;
573 118 50 66     293 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
574             }
575              
576 294 100       605 if ($buffer->{in_message}) {
577 40         353 my ($id, $message) = $self->_extract_message(one_part_only => 1);
578              
579 40 50       92 unless(defined $id) {
580 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
581 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
582             }
583              
584 40         63 $buffer->{strip_term}++;
585 40         58 $buffer->{in_message} = 0;
586 40 100       87 if (defined $message) {
587 14 100       37 if ($self->{+COMPRESSION}) {
588 4         4 my $compressed = $message;
589 4         12 my $decompressed = $self->_decompress($compressed);
590             return (message => $decompressed, compressed => $compressed)
591 4 100       16 if $self->{+KEEP_COMPRESSED};
592 2         58 return (message => $decompressed);
593             }
594 10         57 return (message => $message);
595             }
596             }
597              
598 280 100       433 if ($buffer->{strip_term}) {
599 38   50     79 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
600              
601 38 50       85 $self->throw_invalid("No message terminator") unless $term eq $postfix;
602 38         57 $buffer->{strip_term}--;
603             }
604              
605 280 100       423 if ($buffer->{in_burst}) {
606 68   50     160 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
607              
608 68 100       161 if ($peek eq $key) {
609 40         171 $self->_get_from_buffer(1); # Strip the key
610 40         58 $buffer->{in_message} = 1;
611 40         71 $buffer->{in_burst} = 0;
612 40         84 next;
613             }
614              
615 28   100     76 $buffer->{burst} //= '';
616 28         60 my ($burst_data, $term);
617 28         507 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
618 28         77 $buffer->{burst} .= $burst_data;
619              
620 28 100       49 if ($term) {
621 20         40 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
622 20         36 $buffer->{in_burst} = 0;
623 20         29 my $compressed = delete $buffer->{burst};
624 20 100       54 if ($self->{+COMPRESSION}) {
625 8         20 my $decompressed = $self->_decompress($compressed);
626             return (burst => $decompressed, compressed => $compressed)
627 8 100       24 if $self->{+KEEP_COMPRESSED};
628 6         31 return (burst => $decompressed);
629             }
630 12         56 return (burst => $compressed);
631             }
632             else {
633 8         28 $self->{+IN_BUFFER_SIZE} = 0;
634             }
635              
636 8 50       76 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
637             }
638              
639 212 100 100     697 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
640 96 100 66     275 return (peek => $peek) if $peek && !$self->{+EOF};
641              
642 92 100       311 return unless $self->{+EOF};
643              
644             # Do at least one more iteration after EOF
645 54 100       211 return if $buffer->{+EOF}++;
646              
647             # But do not try to split the empty buffer
648 30         53 next;
649             }
650              
651             # Look for the start of a burst, anything before a burst is line data
652 116         140 my $linedata;
653 116         5727 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
654 116 50       428 $buffer->{lines} .= $linedata if defined $linedata;
655              
656 116 100       405 if ($buffer->{in_burst}) {
657 68         204 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
658             }
659             else {
660 48         79 $self->{+IN_BUFFER_SIZE} = 0;
661             }
662             }
663             }
664              
665             # This is a heavily modified version of a pattern suggested on stack-overflow
666             # and also used in Win32::PowerShell::IPC.
667             my $peek_named_pipe;
668             sub _win32_pipe_ready {
669 0     0   0 my $self = shift;
670 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
671              
672 0         0 my $buf = "";
673 0         0 my $buflen = 0;
674              
675 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
676             || die "Can't load PeekNamedPipe from kernel32.dll";
677              
678 0         0 my $got = pack('L', 0);
679 0         0 my $avail = pack('L', 0);
680 0         0 my $remain = pack('L', 0);
681              
682 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
683              
684 0 0       0 $self->{+EOF} = 1 if $ret == 0;
685              
686 0         0 return unpack('L', $avail);
687             }
688              
689             my $set_named_pipe_handle_state;
690             sub _win32_set_pipe_state {
691 0     0   0 my $self = shift;
692 0         0 my ($state) = @_;
693 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
694              
695 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
696             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
697              
698             # Block or non-block?
699 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
700              
701 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
702              
703 0         0 return $ret;
704             }
705              
706             sub read_blocking {
707 74     74 0 221 my $self = shift;
708 74 50       282 my $rh = $self->{+RH} or croak "Not a reader";
709              
710 74 50       424 ($self->{+READ_BLOCKING}) = @_ if @_;
711              
712 74         154 unless (IS_WIN32) {
713 74         1112 $rh->blocking(@_);
714             }
715              
716 74         193 return $self->{+READ_BLOCKING};
717             }
718              
719             sub write_blocking {
720 24     24 0 135 my $self = shift;
721 24 50       93 my $wh = $self->{+WH} or croak "Not a writer";
722              
723 24 50       60 return $self->{+WRITE_BLOCKING} unless @_;
724              
725 24         44 my ($val) = @_;
726 24         42 $self->{+WRITE_BLOCKING} = $val;
727              
728 24         26 if (IS_WIN32) {
729             $self->_win32_set_pipe_state(@_) if @_;
730             }
731             else {
732 24         131 my $flags = fcntl($wh, &Fcntl::F_GETFL, 0); # Get the current flags
733 24 50       53 die $! unless defined $flags;
734 24 100       48 if ($val) { $flags &= ~&Fcntl::O_NONBLOCK } # Clear O_NONBLOCK
  5         193  
735 19         57 else { $flags |= &Fcntl::O_NONBLOCK } # Set O_NONBLOCK
736 24 50       207 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!;
737             }
738              
739 24         77 return $self->{+WRITE_BLOCKING};
740             }
741              
742             sub blocking {
743 37     37 1 4992408 my $self = shift;
744              
745 37 100 100     783 if ($self->{+RH} && !$self->{+WH}) {
    100 66        
746 15         176 return $self->read_blocking(@_);
747             }
748             elsif ($self->{+WH} && !$self->{+RH}) {
749 21         102 return $self->write_blocking(@_);
750             }
751              
752 1         3 my $r = $self->read_blocking(@_);
753 1         4 my $w = $self->write_blocking(@_);
754              
755 1 50 33     3 return 1 if $r && $w;
756 1 50 33     13 return 0 if !$r && !$w;
757 0         0 return undef;
758             }
759              
760             sub size {
761 2     2 1 5472 my $self = shift;
762 2 50       6 return unless defined &Fcntl::F_GETPIPE_SZ;
763 2   33     26 my $fh = $self->{+WH} // $self->{+RH};
764 2         11 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
765             }
766              
767             sub resize {
768 10     10 1 22472 my $self = shift;
769 10         169 my ($size) = @_;
770              
771 10 50       177 return unless defined &Fcntl::F_SETPIPE_SZ;
772 10   66     203 my $fh = $self->{+WH} // $self->{+RH};
773              
774             # Force numeric: fcntl(F_SETPIPE_SZ, $string) is interpreted as
775             # buffer-mode and silently fails with EINVAL.
776 10         226 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size + 0);
777             }
778              
779             my $ONE_MB = 1 * 1024 * 1024;
780              
781             sub max_size {
782 3 50   3 1 2142 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
783              
784 3 50       88 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
785 3         68 chomp(my $val = <$max>);
786 3         17 close($max);
787             # Force numeric. <$max> returns a string; passing it to
788             # fcntl(F_SETPIPE_SZ) directly triggers the same EINVAL bug
789             # resize() guards against. Numify here so any caller of
790             # max_size() that hands the result to fcntl gets an int.
791 3   33     23 return ($val + 0) || $ONE_MB;
792             }
793              
794             sub resize_or_max {
795 1     1 1 8 my $self = shift;
796 1         2 my ($size) = @_;
797 1         5 $size = min($size, $self->max_size);
798 1         4 $self->resize($size);
799             }
800              
801             sub is_reader {
802 9     9 1 1278 my $self = shift;
803 9 50 33     54 return 1 if $self->{+RH} && !$self->{+WH};
804 0         0 return undef;
805             }
806              
807             sub is_writer {
808 8     8 1 9 my $self = shift;
809 8 50 33     42 return 1 if $self->{+WH} && !$self->{+RH};
810 0         0 return undef;
811             }
812              
813             sub clone_writer {
814 6     6 1 19 my $self = shift;
815 6         7 my $class = blessed($self);
816 6 50       55 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
817 6         14 return bless({WH() => $fh}, $class);
818             }
819              
820             sub clone_reader {
821 0     0 1 0 my $self = shift;
822 0         0 my $class = blessed($self);
823 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
824 0         0 return bless({RH() => $fh}, $class);
825             }
826              
827             sub writer {
828 0     0 1 0 my $self = shift;
829              
830 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
831              
832 0 0       0 return 1 unless $self->{+RH};
833              
834 0         0 close(delete $self->{+RH});
835 0         0 return 1;
836             }
837              
838             sub reader {
839 3     3 1 18 my $self = shift;
840              
841 3 50       8 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
842              
843 3 50       10 return 1 unless $self->{+WH};
844              
845 3         9 $self->_flush_before_close;
846 3         25 close(delete $self->{+WH});
847 3         13 return 1;
848             }
849              
850             # Buffered bursts from non-blocking writes must not be silently dropped when
851             # the write handle goes away (DESTROY would also croak trying to flush them
852             # without a handle).
853             sub _flush_before_close {
854 45     45   64 my $self = shift;
855 45 50 33     520 return if $self->{+HIT_EPIPE} || $self->{+INVALID_STATE};
856 45 100       499 $self->flush(blocking => 1) if $self->pending_output;
857             }
858              
859             sub close {
860 44     44 1 39835 my $self = shift;
861 44 100       165 if ($self->{+WH}) {
862 42         200 $self->_flush_before_close;
863 42         1062 close(delete $self->{+WH});
864             }
865 44 100       959 close(delete $self->{+RH}) if $self->{+RH};
866 44         105 return;
867             }
868              
869             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
870             my $dsize = PIPE_BUF - $psize;
871              
872             sub delimiter_size {
873 109 50   109 0 3422 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
874 109   100     3146 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
875             }
876              
877             # The typical fits_in_burst() then write_burst() sequence would compress the
878             # same payload twice; remember the last result.
879             sub _compress_cached {
880 13     13   21 my ($self, $data) = @_;
881              
882 13         21 my $cache = $self->{_compress_cache};
883 13 100 100     43 return $cache->[1] if $cache && $cache->[0] eq $data;
884              
885 12         25 my $out = $self->_compress($data);
886 12         55 $self->{_compress_cache} = [$data, $out];
887 12         29 return $out;
888             }
889              
890             sub fits_in_burst {
891 13     13 1 81 my $self = shift;
892 13         27 my ($data) = @_;
893              
894 13 100       35 $data = $self->_compress_cached($data) if $self->{+COMPRESSION};
895              
896 13   100     72 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
897 13 100       87 return undef unless $size <= PIPE_BUF;
898              
899 11         26 return $size;
900             }
901              
902             sub write_burst {
903 19765     19765 1 212824 my $self = shift;
904 19765         19511 my ($data) = @_;
905              
906 19765 100       23420 $data = $self->_compress_cached($data) if $self->{+COMPRESSION};
907              
908 19765   100     23938 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
909 19765 100       47758 return undef unless $size <= PIPE_BUF;
910              
911 19763   100     16404 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  19763         31159  
912 19763         28236 $self->flush();
913              
914 19761         22723 return 1;
915             }
916              
917             sub DESTROY {
918 376     376   18398085 my $self = shift;
919 376         12899 local ($., $@, $!, $^E, $?);
920 376 100 100     4810 return if $self->{+HIT_EPIPE} || $self->{+INVALID_STATE};
921 352 100 100     27888 $self->flush(blocking => 1) if $self->{+WH} && $self->pending_output;
922             }
923              
924             sub pending_output {
925 19941     19941 1 41441 my $self = shift;
926 19941 100       28840 my $buffer = $self->{+OUT_BUFFER} or return 0;
927 19846 100       42942 return 0 unless @$buffer;
928 9         54 return 1;
929             }
930              
931             sub flush {
932 220126     220126 1 295978 my $self = shift;
933 220126         362066 my %params = @_;
934 220126   100     942212 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
935              
936 220126   50     415463 my $buffer = $self->{+OUT_BUFFER} // return;
937              
938 220126         408925 while (@$buffer) {
939 820245         1221744 my $set = $buffer->[0];
940 820245         1802518 my $got = $self->_write_burst(@$set);
941              
942 820239 100 100     1675141 return unless $blocking || defined $got;
943 819985 100       1363758 next unless defined $got;
944              
945 819983         2535055 shift @$buffer;
946             }
947              
948 219866         471041 return;
949             }
950              
951             sub _write_burst {
952 820243     820243   997763 my $self = shift;
953 820243         1415451 my ($data, $size) = @_;
954              
955 820243 50       1702581 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
956              
957 820243 100       1474572 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
958              
959 820241   100     2328479 my $prefix = $self->{+BURST_PREFIX} // '';
960 820241   100     1908383 my $postfix = $self->{+BURST_POSTFIX} // '';
961              
962 820241 100 66     2251533 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
963              
964 820241         924944 my $wrote;
965             SWRITE: {
966 820241         930748 $wrote = syswrite($wh, $data, $size);
  820241         1005200438  
967              
968             # $! is only meaningful when syswrite fails.
969 820241 100       2515785 unless (defined $wrote) {
970 258 100 100     1337 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
971 2         4 $self->{+HIT_EPIPE} = 1;
972 2         4 delete $self->{+OUT_BUFFER};
973 2         263 croak "Disconnected pipe";
974             }
975 256 100 100     1028 return undef if $NONBLOCK_ERRNO{0 + $!} || (IS_WIN32 && $! == 28); # NON-BLOCKING
976 2 50       7 redo SWRITE if $RETRY_ERRNO{0 + $!};
977 2         10 $self->throw_invalid("syswrite failed: $!");
978             }
979              
980 819983 50       1887475 redo SWRITE unless $wrote;
981 819983 50       1763261 last SWRITE if $wrote == $size;
982 0         0 die "partial write: $wrote vs $size: $!";
983             }
984              
985 819983         1952959 return $wrote;
986             }
987              
988             sub _adjusted_dsize {
989 94     94   741 my $self = shift;
990              
991 94 50       1395 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
992              
993 94   100     4222 my $message_key = $self->{+MESSAGE_KEY} // '';
994 94   100     2889 my $prefix = $self->{+BURST_PREFIX} // '';
995 94   100     2800 my $postfix = $self->{+BURST_POSTFIX} // '';
996              
997 94         3265 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
998 94         3034 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
999             }
1000              
1001             sub write_message {
1002 200124     200124 1 4746974 my $self = shift;
1003 200124         363917 my ($data) = @_;
1004              
1005 200124 100       510302 $data = $self->_compress($data) if $self->{+COMPRESSION};
1006              
1007 200124         290836 my $tid = _get_tid();
1008 200124   100     599492 my $message_key = $self->{+MESSAGE_KEY} // '';
1009 200124   66     449361 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
1010 200124         641548 my $dtotal = bytes::length($data);
1011              
1012 200124         988541 my $parts = int($dtotal / $adjusted_dsize);
1013 200124 50       469966 $parts++ if $dtotal % $adjusted_dsize;
1014              
1015 200124         294987 my $id = $parts - 1;
1016              
1017             # Unwinding the loop for a 1-part message for micro-optimization
1018 200124 100       425773 if ($parts == 1) {
1019 77         131 my $bytes = $data;
1020 77         139 my $size = $dtotal;
1021 77         990 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
1022              
1023 77 100 66     860 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
1024              
1025 77   100     530 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  77         550  
1026             }
1027             else {
1028 200047         450297 for (my $part = 0; $part < $parts; $part++) {
1029 800149         1479489 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
1030 800149         4868669 my $size = bytes::length($bytes);
1031              
1032 800149         5056613 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
1033              
1034 800149   100     1347130 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
1035 800149   100     2580008 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  800149         3174029  
1036             }
1037             }
1038              
1039 200124         534418 $self->flush();
1040 200124         917774 return $parts;
1041             }
1042              
1043             sub read_message {
1044 700180     700180 1 565339149 my $self = shift;
1045 700180         1067013 my %params = @_;
1046              
1047 700180         1324294 my ($id, $out) = $self->_extract_message(%params);
1048              
1049 700179 100       1083702 return unless defined $id;
1050              
1051 700173 100       2195711 return $out unless $self->{+COMPRESSION};
1052              
1053 31 100       60 if ($params{debug}) {
1054 4         4 my $compressed = $out->{message};
1055 4         12 $out->{message} = $self->_decompress($compressed);
1056 4 100       12 $out->{compressed} = $compressed if $self->{+KEEP_COMPRESSED};
1057 4         42 return $out;
1058             }
1059              
1060 27         30 my $compressed = $out;
1061 27         50 my $decompressed = $self->_decompress($compressed);
1062              
1063 25 100 100     65 return ($decompressed, $compressed) if $self->{+KEEP_COMPRESSED} && wantarray;
1064 23         126 return $decompressed;
1065             }
1066              
1067             sub _extract_message {
1068 700220     700220   682996 my $self = shift;
1069 700220         827758 my %params = @_;
1070              
1071 700220   100     1522817 my $state = $self->{+STATE} //= {};
1072              
1073 700220         704137 while (1) {
1074 2800354 50       3905235 unless ($state->{key}) {
1075 2800354         3700879 my $key_bytes = $self->_get_from_buffer($psize);
1076 2800354 100 66     6099299 unless (defined($key_bytes) && length($key_bytes)) {
1077             # Leftover bytes smaller than a header at EOF mean a writer
1078             # died mid-message; a plain return here would look like a
1079             # clean EOF and silently drop data.
1080             $self->throw_invalid("EOF inside message header (truncated message)")
1081 7 100 100     43 if $self->{+EOF} && $self->{+IN_BUFFER_SIZE};
1082 6         51 return;
1083             }
1084              
1085 2800347         2657496 my %key;
1086 2800347         6884382 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
1087 2800347         3920449 $state->{key} = \%key;
1088             }
1089              
1090 2800347         2903757 my $key = $state->{key};
1091              
1092 2800347   50     3622874 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
1093              
1094 2800347         3089068 my $id = $key->{id};
1095 2800347         2793039 my $tag = join ':' => @{$key}{qw/pid tid/};
  2800347         5076955  
1096 2800347   100     2741469 push @{$state->{parts}->{$tag} //= []} => $id;
  2800347         6137511  
1097 2800347 100       13990741 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
1098              
1099 2800347         3257129 delete $state->{key};
1100              
1101 2800347 100       3738352 unless ($id == 0) {
1102 2100160 100       2730315 return ($id, undef) if $params{one_part_only};
1103 2100134         3560009 next;
1104             }
1105              
1106 700187         1679886 my $message = delete $state->{buffers}->{$tag};
1107 700187         1000599 my $parts = delete $state->{parts}->{$tag};
1108              
1109 700187 100       3876700 return ($id, $message) unless $params{debug};
1110              
1111             return (
1112             $id,
1113             {
1114             message => $message,
1115             parts => $parts,
1116             pid => $key->{pid},
1117             tid => $key->{tid},
1118             },
1119 13         545 );
1120             }
1121             }
1122              
1123             1;
1124              
1125             __END__