File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 630 690 91.3
branch 257 348 73.8
condition 188 291 64.6
subroutine 98 103 95.1
pod 41 45 91.1
total 1214 1477 82.1


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 75     75   11772590 use strict;
  75         113  
  75         2186  
3 75     75   423 use warnings;
  75         119  
  75         3492  
4              
5             our $VERSION = '0.032';
6              
7 75     75   87260 use IO();
  75         58076  
  75         1806  
8 75     75   31979 use IO::Handle();
  75         308122  
  75         1573  
9 75     75   462 use Fcntl();
  75         107  
  75         721  
10 75     75   26247 use bytes();
  75         28605  
  75         4503  
11              
12             BEGIN {
13 75 50   75   145 if (eval { require IO::Select; 1 }) {
  75         29060  
  75         97984  
14 75         1978 *HAVE_IO_SELECT = sub() { 1 };
15             }
16             else {
17 0         0 *HAVE_IO_SELECT = sub() { 0 };
18             }
19             }
20              
21 75     75   439 use Carp qw/croak confess/;
  75         76  
  75         3856  
22 75     75   316 use Config qw/%Config/;
  75         102  
  75         2202  
23 75     75   581 use List::Util qw/min/;
  75         129  
  75         3712  
24 75     75   255 use Scalar::Util qw/blessed/;
  75         337  
  75         3191  
25              
26 75     75   29562 use Errno qw/EINTR EAGAIN EPIPE/;
  75         92056  
  75         11000  
27             my (%RETRY_ERRNO, %NONBLOCK_ERRNO);
28             BEGIN {
29 75     75   306 %RETRY_ERRNO = (EINTR() => 1);
30 75 50       1305 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
31              
32             # EWOULDBLOCK == EAGAIN on most platforms, but POSIX allows them to differ.
33 75         172 %NONBLOCK_ERRNO = (EAGAIN() => 1);
34 75 50       32918 $NONBLOCK_ERRNO{Errno->EWOULDBLOCK} = 1 if Errno->can('EWOULDBLOCK');
35             }
36              
37             BEGIN {
38             # POSIX says writes of 512 or less are atomic, but some platforms allow for
39             # larger ones.
40 75     75   32252 require POSIX;
41 75 50 33     409677 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  75         457  
42 75         394 *PIPE_BUF = \&POSIX::PIPE_BUF;
43             }
44             else {
45 0         0 *PIPE_BUF = sub() { 512 };
46             }
47              
48 75 50 33     349 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  75         191  
49 75         149 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
50             }
51             else {
52             # POSIX guarantees SSIZE_MAX is at least 32767 (_POSIX_SSIZE_MAX).
53 0         0 *SSIZE_MAX = sub() { 32767 };
54             }
55              
56             {
57             # Using the default pipe size as a read size is significantly faster
58             # than a larger value on my test machine.
59 75         98 my $read_size = min(SSIZE_MAX(), 65_536);
  75         301  
60 75         949 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
61             }
62              
63 75         129 my $can_thread = 1;
64 75   33     417 $can_thread &&= $] >= 5.008001;
65 75   33     815 $can_thread &&= $Config{'useithreads'};
66              
67             # Threads are broken on perl 5.10.0 built with gcc 4.8+
68 75 0 33     247 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
69 0         0 my @parts = split /\./, $Config{'gccversion'};
70 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
71             }
72              
73 75   33     177 $can_thread &&= !$INC{'Devel/Cover.pm'};
74              
75 75 50       264 if (!$can_thread) {
    0          
76 75         109 *_get_tid = sub() { 0 };
77             }
78             elsif ($INC{'threads.pm'}) {
79 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
80             }
81             else {
82 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
83             }
84              
85 75 50       229 if ($^O eq 'MSWin32') {
86 0         0 local $@;
87 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
88 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
89 0         0 *IS_WIN32 = sub() { 1 };
90             }
91             else {
92 75         1991 *IS_WIN32 = sub() { 0 };
93             }
94             }
95              
96 75     75   406 use constant READ_SIZE => 'read_size';
  75         96  
  75         5343  
97 75     75   304 use constant RH => 'rh';
  75         88  
  75         2423  
98 75     75   261 use constant WH => 'wh';
  75         85  
  75         2003  
99 75     75   237 use constant EOF => 'eof';
  75         87  
  75         2255  
100 75     75   275 use constant STATE => 'state';
  75         109  
  75         2098  
101 75     75   233 use constant OUT_BUFFER => 'out_buffer';
  75         78  
  75         2083  
102 75     75   247 use constant IN_BUFFER => 'in_buffer';
  75         74  
  75         2327  
103 75     75   226 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  75         89  
  75         2054  
104 75     75   468 use constant READ_BLOCKING => 'read_blocking';
  75         451  
  75         2599  
105 75     75   280 use constant WRITE_BLOCKING => 'write_blocking';
  75         95  
  75         2515  
106 75     75   238 use constant BURST_PREFIX => 'burst_prefix';
  75         77  
  75         2326  
107 75     75   396 use constant BURST_POSTFIX => 'burst_postfix';
  75         76  
  75         2795  
108 75     75   346 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  75         90  
  75         2432  
109 75     75   283 use constant MESSAGE_KEY => 'message_key';
  75         101  
  75         2218  
110 75     75   238 use constant MIXED_BUFFER => 'mixed_buffer';
  75         95  
  75         2068  
111 75     75   361 use constant DELIMITER_SIZE => 'delimiter_size';
  75         213  
  75         2807  
112 75     75   245 use constant INVALID_STATE => 'invalid_state';
  75         76  
  75         2435  
113 75     75   251 use constant HIT_EPIPE => 'hit_epipe';
  75         93  
  75         2961  
114 75     75   278 use constant USE_IO_SELECT => 'use_io_select';
  75         104  
  75         2587  
115 75     75   248 use constant COMPRESSION => 'compression';
  75         109  
  75         2835  
116 75     75   247 use constant COMPRESSION_LEVEL => 'compression_level';
  75         98  
  75         2101  
117 75     75   315 use constant COMPRESSION_DICTIONARY => 'compression_dictionary';
  75         202  
  75         2335  
118 75     75   249 use constant COMPRESSION_DICTIONARY_FILE => 'compression_dictionary_file';
  75         107  
  75         2297  
119 75     75   243 use constant KEEP_COMPRESSED => 'keep_compressed';
  75         78  
  75         437525  
120              
121 76     76 1 2944 sub wh { shift->{+WH} }
122 12     12 1 17950 sub rh { shift->{+RH} }
123              
124             sub throw_invalid {
125 39     39 0 86 my $self = shift;
126 39 50 66     289 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
127 39         15669 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
128             }
129              
130             sub read_size {
131 0     0 1 0 my $self = shift;
132 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
133 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
134             }
135              
136             sub use_io_select {
137 168534     168534 1 168013 my $self = shift;
138 168534 100       236518 if (@_) {
139 27 50 100     114 croak "IO::Select is not installed, cannot enable use_io_select" if $_[0] && !HAVE_IO_SELECT;
140 27 100       124 $self->{+USE_IO_SELECT} = $_[0] ? 1 : 0;
141 27 100       73 delete $self->{_select} unless $_[0];
142             }
143 168534         148407 return 0 unless HAVE_IO_SELECT;
144 168534         202687 my $val = $self->{+USE_IO_SELECT};
145 168534 100       345184 return defined($val) ? ($val ? 1 : 0) : IS_WIN32 ? 0 : 1;
    100          
146             }
147              
148 19     19 1 95 sub compression { $_[0]->{+COMPRESSION} }
149 6     6 1 34 sub compression_level { $_[0]->{+COMPRESSION_LEVEL} }
150 10     10 1 48 sub compression_dictionary { $_[0]->{+COMPRESSION_DICTIONARY} }
151 8     8 1 34 sub compression_dictionary_file { $_[0]->{+COMPRESSION_DICTIONARY_FILE} }
152 10 100   10 1 44 sub keep_compressed { $_[0]->{+KEEP_COMPRESSED} ? 1 : 0 }
153              
154             sub fill_buffer {
155 168730     168730 1 152488 my $self = shift;
156              
157 168730 50       281816 $self->throw_invalid() if $self->{+INVALID_STATE};
158              
159 168730 50       294838 my $rh = $self->{+RH} or die "Not a read handle";
160              
161 168730 100       263443 return 0 if $self->{+EOF};
162              
163 168507   100     254534 $self->{+IN_BUFFER_SIZE} //= 0;
164              
165 168507   50     377708 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
166              
167 168507         290224 my $use_select = $self->use_io_select;
168              
169 168507 100       218511 if ($use_select) {
170 144341   66     229979 my $sel = $self->{_select} //= IO::Select->new($rh);
171 144341   100     310253 my $blocking = $self->{+READ_BLOCKING} // 1;
172 144341 100       457820 my @ready = $sel->can_read($blocking ? undef : 0);
173 144341 100       4030027 return 0 unless @ready;
174             }
175 0         0 elsif (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
176             $to_read = min($self->_win32_pipe_ready(), $to_read);
177             }
178              
179 168317 50       251739 return 0 unless $to_read;
180              
181 168317         182701 while (1) {
182 168317         209289 my $rbuff = '';
183 168317         7321750 my $got = sysread($rh, $rbuff, $to_read);
184 168317 100       399578 unless(defined $got) {
185 93 50       1421 return 0 if $NONBLOCK_ERRNO{0 + $!}; # NON-BLOCKING
186 0 0       0 if ($RETRY_ERRNO{0 + $!}) {
187 0 0       0 next unless $use_select; # retry on EINTR in fallback mode
188 0         0 return 0; # IO::Select handles EINTR
189             }
190 0         0 $self->throw_invalid("$!");
191             }
192              
193 168224 100       220456 if ($got) {
194 168173         2199970 $self->{+IN_BUFFER} .= $rbuff;
195 168173         232317 $self->{+IN_BUFFER_SIZE} += $got;
196 168173         321523 return $got;
197             }
198             else {
199 51         397 $self->{+EOF} = 1;
200 51         178 return 0;
201             }
202             }
203              
204 0         0 return 0;
205             }
206              
207             # Must forward %params: callers rely on eof_invalid to turn a truncated
208             # message at EOF into an exception instead of a clean-looking EOF.
209 5600788     5600788   5573387 sub _get_from_buffer { my $self = shift; $self->_from_buffer(@_, remove => 1) }
  5600788         7856107  
210 71     71   194 sub _peek_from_buffer { shift->_from_buffer(@_) }
211              
212             sub _from_buffer {
213 5600859     5600859   5344245 my $self = shift;
214 5600859         8933024 my ($size, %params) = @_;
215              
216 5600859 100 100     13005382 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
217 168105         318486 $self->fill_buffer;
218 168105 100       298098 unless($self->{+IN_BUFFER_SIZE} >= $size) {
219 10 100 100     114 return unless $params{eof_invalid} && $self->{+EOF};
220 2         6 $self->throw_invalid($params{eof_invalid});
221             }
222             }
223              
224 5600849         5253088 my $out;
225              
226 5600849 100       6698435 if ($params{remove}) {
227 5600778         5416940 $self->{+IN_BUFFER_SIZE} -= $size;
228 5600778         10408924 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
229             }
230             else {
231 71         190 $out = substr($self->{+IN_BUFFER}, 0, $size);
232             }
233              
234 5600849         10456623 return $out;
235             }
236              
237             sub _has_dict {
238             return defined($_[0]->{+COMPRESSION_DICTIONARY})
239 100   100 100   420 || defined($_[0]->{+COMPRESSION_DICTIONARY_FILE});
240             }
241              
242             # NOTE: raw zstd dictionaries do not embed a dict-ID, so a mismatched peer
243             # dict will silently decode to garbage rather than fail. Both ends must agree
244             # on byte-identical dictionary content.
245             sub _build_cdict {
246 10     10   16 my $self = shift;
247 10   50     22 my $level = $self->{+COMPRESSION_LEVEL} // 3;
248 10         30 require Compress::Zstd::CompressionDictionary;
249 10 100       22 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
250 4         22 return Compress::Zstd::CompressionDictionary->new_from_file($path, $level);
251             }
252 6         96 return Compress::Zstd::CompressionDictionary->new($self->{+COMPRESSION_DICTIONARY}, $level);
253             }
254              
255             sub _build_ddict {
256 10     10   12 my $self = shift;
257 10         26 require Compress::Zstd::DecompressionDictionary;
258 10 100       20 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
259 4         20 return Compress::Zstd::DecompressionDictionary->new_from_file($path);
260             }
261 6         116 return Compress::Zstd::DecompressionDictionary->new($self->{+COMPRESSION_DICTIONARY});
262             }
263              
264             sub _compress {
265 49     49   118 my ($self, $data) = @_;
266 49 100       136 if ($self->_has_dict) {
267 10         50 require Compress::Zstd::CompressionContext;
268 10   66     172 my $ctx = $self->{_compression_ctx} //= Compress::Zstd::CompressionContext->new;
269 10   33     34 my $cdict = $self->{_compression_cdict} //= $self->_build_cdict;
270 10         762 return $ctx->compress_using_dict($data, $cdict);
271             }
272 39   100     3094 return Compress::Zstd::compress($data, $self->{+COMPRESSION_LEVEL} // 3);
273             }
274              
275             sub _decompress {
276 51     51   1477 my ($self, $data) = @_;
277 51         75 my $out;
278 51 100       73 if ($self->_has_dict) {
279 12         52 require Compress::Zstd::DecompressionContext;
280 12   66     210 my $ctx = $self->{_decompression_ctx} //= Compress::Zstd::DecompressionContext->new;
281 12   66     36 my $ddict = $self->{_decompression_ddict} //= $self->_build_ddict;
282 12         470 $out = $ctx->decompress_using_dict($data, $ddict);
283             }
284             else {
285 39         1029 $out = Compress::Zstd::decompress($data);
286             }
287 51 100       132 $self->throw_invalid("zstd decompression failed") unless defined $out;
288 45         91 return $out;
289             }
290              
291             sub eof {
292 530     530 1 8212852 my $self = shift;
293              
294 530 100       1257 $self->throw_invalid() if $self->{+INVALID_STATE};
295              
296 516 100       1602 return 0 if $self->fill_buffer;
297 444 100       1492 return 0 unless $self->{+EOF};
298 213 100       412 return 0 if $self->{+IN_BUFFER_SIZE};
299              
300 151 50       236 if (my $buffer = $self->{+MIXED_BUFFER}) {
301 151 100 100     650 return 0 if defined($buffer->{lines}) && length($buffer->{lines});
302 102 100 100     342 return 0 if defined($buffer->{burst}) && length($buffer->{burst});
303             }
304              
305 101         341 return 1;
306             }
307              
308             sub _fh_mode {
309 8     8   1691 my $self = shift;
310 8         10 my ($fh) = @_;
311              
312 8   50     76 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
313 8 100       27 return '<&' if $mode == Fcntl::O_RDONLY();
314 4 50       12 return '>&' if $mode == Fcntl::O_WRONLY();
315 0         0 return undef;
316             }
317              
318             my %MODE_TO_DIR = (
319             '<&' => RH(),
320             '<&=' => RH(),
321             '>&' => WH(),
322             '>&=' => WH(),
323             );
324             sub _mode_to_dir {
325 24     24   27 my $self = shift;
326 24         32 my ($mode) = @_;
327 24         56 return $MODE_TO_DIR{$mode};
328             }
329              
330             sub _check_params {
331 233     233   645 my ($class, %params) = @_;
332             croak "IO::Select is not installed, cannot enable use_io_select"
333 233 50 100     1560 if $params{+USE_IO_SELECT} && !HAVE_IO_SELECT;
334              
335 233 100       681 if (defined(my $algo = $params{+COMPRESSION})) {
336 41 100       616 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
337             croak "compression => 'zstd' requires Compress::Zstd"
338 37 50       39 unless eval { require Compress::Zstd; 1 };
  37         181  
  37         111  
339             }
340              
341             croak "compression_dictionary and compression_dictionary_file are mutually exclusive"
342 229 100 100     1267 if defined($params{+COMPRESSION_DICTIONARY}) && defined($params{+COMPRESSION_DICTIONARY_FILE});
343              
344             croak "compression_dictionary requires compression to be enabled"
345             if (defined($params{+COMPRESSION_DICTIONARY}) || defined($params{+COMPRESSION_DICTIONARY_FILE}))
346 227 100 100     2344 && !defined($params{+COMPRESSION});
      100        
347             }
348              
349             sub read_fifo {
350 22     22 1 48445317 my $class = shift;
351 22         93 my ($fifo, %params) = @_;
352              
353 22         228 $class->_check_params(%params);
354 22 50       494 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
355              
356 22 50       1059 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
357 22         91 binmode($fh);
358              
359 22         262 return $class->_new_from_params(\%params, RH() => $fh);
360             }
361              
362             sub write_fifo {
363 13     13 1 203942 my $class = shift;
364 13         411 my ($fifo, %params) = @_;
365              
366 13         1124 $class->_check_params(%params);
367 13 50       1707 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
368              
369 13 50       3142 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
370 13         1375 binmode($fh);
371              
372 13         717 return $class->_new_from_params(\%params, WH() => $fh);
373             }
374              
375             sub from_fh {
376 14     14 1 338645 my $class = shift;
377              
378             # Mode is optional: from_fh($fh, %params) or from_fh($mode, $fh, %params).
379 14         19 my $mode;
380 14 100 66     82 $mode = shift if @_ && !ref($_[0]) && $MODE_TO_DIR{$_[0]};
      66        
381 14         18 my $ifh = shift;
382 14         43 my %params = @_;
383              
384 14         36 $class->_check_params(%params);
385              
386 13 50       80 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
387              
388 13   33     44 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
389 13   33     33 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
390              
391 13 50       166 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
392 13         22 binmode($fh);
393              
394 13         43 return $class->_new_from_params(\%params, $dir => $fh);
395             }
396              
397             sub from_fd {
398 12     12 1 3981 my $class = shift;
399 12         25 my ($mode, $fd, %params) = @_;
400              
401 12         27 $class->_check_params(%params);
402              
403 11   33     18 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
404 11 50       145 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
405              
406 11 50       37 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
407              
408 11         17 binmode($fh);
409 11         42 return $class->_new_from_params(\%params, $dir => $fh);
410             }
411              
412             sub _new_from_params {
413 383     383   1668 my ($class, $params, @handles) = @_;
414              
415 383         689 my $mixed = delete $params->{mixed_data_mode};
416              
417 383         1452 my $self = bless({%$params, @handles}, $class);
418 383 100       980 $self->set_mixed_data_mode() if $mixed;
419              
420 383         1182 return $self;
421             }
422              
423             sub new {
424 12     12 1 164815 my $class = shift;
425 12         30 my (%params) = @_;
426              
427 12         40 $class->_check_params(%params);
428              
429 4         5 my ($rh, $wh);
430 4 50       139 pipe($rh, $wh) or die "Could not create pipe: $!";
431              
432 4         27 binmode($wh);
433 4         6 binmode($rh);
434              
435 4         23 return $class->_new_from_params(\%params, RH() => $rh, WH() => $wh);
436             }
437              
438             sub pair {
439 160     160 1 54172084 my $class = shift;
440 160         736 my (%params) = @_;
441              
442 160         897 $class->_check_params(%params);
443              
444 160         254 my ($rh, $wh);
445 160 50       5553 pipe($rh, $wh) or die "Could not create pipe: $!";
446              
447 160         467 binmode($wh);
448 160         231 binmode($rh);
449              
450 160         1345 my $r = $class->_new_from_params({%params}, RH() => $rh);
451 160         741 my $w = $class->_new_from_params({%params}, WH() => $wh);
452              
453 160         646 return ($r, $w);
454             }
455              
456             sub set_mixed_data_mode {
457 119     119 1 190 my $self = shift;
458              
459 119 50       247 $self->throw_invalid() if $self->{+INVALID_STATE};
460              
461 119 100       389 $self->read_blocking(0) if $self->{+RH};
462              
463 119   50     430 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
464 119   50     347 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
465 119   50     334 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
466             }
467              
468             sub set_compression {
469 16     16 1 107 my $self = shift;
470 16         64 my ($algo, $level) = @_;
471              
472 16 100       46 if (!defined $algo) {
473 6         10 delete $self->{+COMPRESSION};
474 6         8 delete $self->{+COMPRESSION_LEVEL};
475 6         6 delete $self->{_compression_ctx};
476 6         8 delete $self->{_compression_cdict};
477 6         4 delete $self->{_decompression_ctx};
478 6         8 delete $self->{_decompression_ddict};
479 6         8 delete $self->{_compress_cache};
480 6         10 return;
481             }
482              
483 10 100       269 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
484             croak "compression => 'zstd' requires Compress::Zstd"
485 8 50       35 unless eval { require Compress::Zstd; 1 };
  8         48  
  8         12  
486              
487 8         26 $self->{+COMPRESSION} = $algo;
488             # Omitted $level preserves the previously set level; pass undef to
489             # set_compression(undef) first if a full reset back to the default is desired.
490 8 100       24 $self->{+COMPRESSION_LEVEL} = $level if defined $level;
491              
492             # Cached objects depend on level / dict; force rebuild.
493 8         11 delete $self->{_compression_ctx};
494 8         10 delete $self->{_compression_cdict};
495 8         8 delete $self->{_decompression_ctx};
496 8         9 delete $self->{_decompression_ddict};
497 8         8 delete $self->{_compress_cache};
498              
499 8         14 return;
500             }
501              
502             sub set_compression_dictionary {
503 4     4 1 16 my ($self, $bytes) = @_;
504 4 50       10 if (defined $bytes) {
505             croak "compression_dictionary requires compression to be enabled"
506 4 50       8 unless defined $self->{+COMPRESSION};
507 4         6 $self->{+COMPRESSION_DICTIONARY} = $bytes;
508 4         6 delete $self->{+COMPRESSION_DICTIONARY_FILE};
509             }
510             else {
511 0         0 delete $self->{+COMPRESSION_DICTIONARY};
512             }
513 4         6 delete $self->{_compression_cdict};
514 4         4 delete $self->{_decompression_ddict};
515 4         4 delete $self->{_compress_cache};
516 4         6 return;
517             }
518              
519             sub set_compression_dictionary_file {
520 8     8 1 910 my ($self, $path) = @_;
521 8 100       14 if (defined $path) {
522             croak "compression_dictionary requires compression to be enabled"
523 4 50       10 unless defined $self->{+COMPRESSION};
524 4         8 $self->{+COMPRESSION_DICTIONARY_FILE} = $path;
525 4         6 delete $self->{+COMPRESSION_DICTIONARY};
526             }
527             else {
528 4         6 delete $self->{+COMPRESSION_DICTIONARY_FILE};
529             }
530 8         16 delete $self->{_compression_cdict};
531 8         20 delete $self->{_decompression_ddict};
532 8         8 delete $self->{_compress_cache};
533 8         12 return;
534             }
535              
536             sub set_keep_compressed {
537 4     4 1 10 my ($self, $val) = @_;
538 4 100       10 $self->{+KEEP_COMPRESSED} = $val ? 1 : 0;
539 4         6 return;
540             }
541              
542             sub get_line_burst_or_data {
543 194     194 1 2860560 my $self = shift;
544 194         613 my %params = @_;
545              
546 194   33     1000 my $rh = $self->{+RH} // croak "Not a read handle";
547              
548 194   33     1178 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
549 194   33     907 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
550 194   33     812 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
551              
552 194   100     4021 my $buffer = $self->{+MIXED_BUFFER} //= {
553             lines => '',
554             burst => '',
555             in_burst => 0,
556             in_message => 0,
557             strip_term => 0,
558             };
559              
560 194         353 my $peek;
561              
562 194         265 while (1) {
563             $self->throw_invalid('Incomplete message received before EOF')
564 385 100 66     1120 if $self->eof && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
565              
566 379 100 66     1315 if($buffer->{lines} || length($buffer->{lines})) {
567             # Look for a complete line
568 194         254 my ($line, $term);
569 194         2098 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
570              
571 194 100       636 return (line => "${line}${term}") if $term;
572 142 50 66     707 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
573              
574 118         153 $buffer->{lines} = $line;
575 118 50 66     226 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
576             }
577              
578 303 100       440 if ($buffer->{in_message}) {
579 43         261 my ($id, $message) = $self->_extract_message(one_part_only => 1);
580              
581 42 100       120 unless(defined $id) {
582             $self->throw_invalid('Incomplete burst data received before end of pipe')
583 1 50 33     4 if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
584              
585 1         1 my $before = $self->{+IN_BUFFER_SIZE};
586 1         3 $self->fill_buffer;
587 1 50 33     55 next if $self->{+EOF} || $self->{+IN_BUFFER_SIZE} > $before;
588              
589             # Not EOF and no new bytes arrived: another pass cannot make
590             # progress. Return empty so a non-blocking caller can wait and
591             # retry instead of spinning inside this call.
592 1         6 return;
593             }
594              
595 41         52 $buffer->{strip_term}++;
596 41         58 $buffer->{in_message} = 0;
597 41 100       89 if (defined $message) {
598 15 100       51 if ($self->{+COMPRESSION}) {
599 4         6 my $compressed = $message;
600 4         6 my $decompressed = $self->_decompress($compressed);
601             return (message => $decompressed, compressed => $compressed)
602 4 100       16 if $self->{+KEEP_COMPRESSED};
603 2         6 return (message => $decompressed);
604             }
605 11         55 return (message => $message);
606             }
607             }
608              
609 286 100       438 if ($buffer->{strip_term}) {
610 38   50     77 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
611              
612 38 50       77 $self->throw_invalid("No message terminator") unless $term eq $postfix;
613 38         47 $buffer->{strip_term}--;
614             }
615              
616 286 100       569 if ($buffer->{in_burst}) {
617 71   50     217 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
618              
619 71 100       159 if ($peek eq $key) {
620 42         210 $self->_get_from_buffer(1); # Strip the key
621 42         65 $buffer->{in_message} = 1;
622 42         81 $buffer->{in_burst} = 0;
623 42         84 next;
624             }
625              
626 29   100     311 $buffer->{burst} //= '';
627 29         50 my ($burst_data, $term);
628 29         541 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
629 29         76 $buffer->{burst} .= $burst_data;
630              
631 29 100       65 if ($term) {
632 20         51 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
633 20         28 $buffer->{in_burst} = 0;
634 20         47 my $compressed = delete $buffer->{burst};
635 20 100       46 if ($self->{+COMPRESSION}) {
636 8         17 my $decompressed = $self->_decompress($compressed);
637             return (burst => $decompressed, compressed => $compressed)
638 8 100       23 if $self->{+KEEP_COMPRESSED};
639 6         24 return (burst => $decompressed);
640             }
641 12         60 return (burst => $compressed);
642             }
643             else {
644 9         38 $self->{+IN_BUFFER_SIZE} = 0;
645             }
646              
647 9 100       83 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
648             }
649              
650 216 100 66     810 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
651 97 100 66     237 return (peek => $peek) if $peek && !$self->{+EOF};
652              
653 93 100       355 return unless $self->{+EOF};
654              
655             # Do at least one more iteration after EOF
656 54 100       245 return if $buffer->{+EOF}++;
657              
658             # But do not try to split the empty buffer
659 30         55 next;
660             }
661              
662             # Look for the start of a burst, anything before a burst is line data
663 119         326 my $linedata;
664 119         3701 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
665 119 50       416 $buffer->{lines} .= $linedata if defined $linedata;
666              
667 119 100       220 if ($buffer->{in_burst}) {
668 71         153 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
669             }
670             else {
671 48         89 $self->{+IN_BUFFER_SIZE} = 0;
672             }
673             }
674             }
675              
676             # This is a heavily modified version of a pattern suggested on stack-overflow
677             # and also used in Win32::PowerShell::IPC.
678             my $peek_named_pipe;
679             sub _win32_pipe_ready {
680 0     0   0 my $self = shift;
681 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
682              
683 0         0 my $buf = "";
684 0         0 my $buflen = 0;
685              
686 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
687             || die "Can't load PeekNamedPipe from kernel32.dll";
688              
689 0         0 my $got = pack('L', 0);
690 0         0 my $avail = pack('L', 0);
691 0         0 my $remain = pack('L', 0);
692              
693 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
694              
695 0 0       0 $self->{+EOF} = 1 if $ret == 0;
696              
697 0         0 return unpack('L', $avail);
698             }
699              
700             my $set_named_pipe_handle_state;
701             sub _win32_set_pipe_state {
702 0     0   0 my $self = shift;
703 0         0 my ($state) = @_;
704 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
705              
706 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
707             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
708              
709             # Block or non-block?
710 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
711              
712 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
713              
714 0         0 return $ret;
715             }
716              
717             sub read_blocking {
718 81     81 0 251 my $self = shift;
719 81 50       320 my $rh = $self->{+RH} or croak "Not a reader";
720              
721 81 50       437 ($self->{+READ_BLOCKING}) = @_ if @_;
722              
723 81         292 unless (IS_WIN32) {
724 81         1099 $rh->blocking(@_);
725             }
726              
727 81         156 return $self->{+READ_BLOCKING};
728             }
729              
730             sub write_blocking {
731 24     24 0 56 my $self = shift;
732 24 50       117 my $wh = $self->{+WH} or croak "Not a writer";
733              
734 24 50       142 return $self->{+WRITE_BLOCKING} unless @_;
735              
736 24         50 my ($val) = @_;
737 24         43 $self->{+WRITE_BLOCKING} = $val;
738              
739 24         36 if (IS_WIN32) {
740             $self->_win32_set_pipe_state(@_) if @_;
741             }
742             else {
743 24         147 my $flags = fcntl($wh, &Fcntl::F_GETFL, 0); # Get the current flags
744 24 50       45 die $! unless defined $flags;
745 24 100       70 if ($val) { $flags &= ~&Fcntl::O_NONBLOCK } # Clear O_NONBLOCK
  5         147  
746 19         50 else { $flags |= &Fcntl::O_NONBLOCK } # Set O_NONBLOCK
747 24 50       250 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!;
748             }
749              
750 24         73 return $self->{+WRITE_BLOCKING};
751             }
752              
753             sub blocking {
754 40     40 1 5273230 my $self = shift;
755              
756 40 100 100     803 if ($self->{+RH} && !$self->{+WH}) {
    100 66        
757 18         307 return $self->read_blocking(@_);
758             }
759             elsif ($self->{+WH} && !$self->{+RH}) {
760 21         103 return $self->write_blocking(@_);
761             }
762              
763 1         4 my $r = $self->read_blocking(@_);
764 1         3 my $w = $self->write_blocking(@_);
765              
766 1 50 33     3 return 1 if $r && $w;
767 1 50 33     4 return 0 if !$r && !$w;
768 0         0 return undef;
769             }
770              
771             sub size {
772 2     2 1 5646 my $self = shift;
773 2 50       5 return unless defined &Fcntl::F_GETPIPE_SZ;
774 2   33     12 my $fh = $self->{+WH} // $self->{+RH};
775 2         9 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
776             }
777              
778             sub resize {
779 10     10 1 26680 my $self = shift;
780 10         149 my ($size) = @_;
781              
782 10 50       197 return unless defined &Fcntl::F_SETPIPE_SZ;
783 10   66     279 my $fh = $self->{+WH} // $self->{+RH};
784              
785             # Force numeric: fcntl(F_SETPIPE_SZ, $string) is interpreted as
786             # buffer-mode and silently fails with EINVAL.
787 10         514 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size + 0);
788             }
789              
790             my $ONE_MB = 1 * 1024 * 1024;
791              
792             sub max_size {
793 3 50   3 1 508 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
794              
795 3 50       69 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
796 3         98 chomp(my $val = <$max>);
797 3         18 close($max);
798             # Force numeric. <$max> returns a string; passing it to
799             # fcntl(F_SETPIPE_SZ) directly triggers the same EINVAL bug
800             # resize() guards against. Numify here so any caller of
801             # max_size() that hands the result to fcntl gets an int.
802 3   33     34 return ($val + 0) || $ONE_MB;
803             }
804              
805             sub resize_or_max {
806 1     1 1 6 my $self = shift;
807 1         3 my ($size) = @_;
808 1         3 $size = min($size, $self->max_size);
809 1         4 $self->resize($size);
810             }
811              
812             sub is_reader {
813 9     9 1 1295 my $self = shift;
814 9 50 33     63 return 1 if $self->{+RH} && !$self->{+WH};
815 0         0 return undef;
816             }
817              
818             sub is_writer {
819 8     8 1 10 my $self = shift;
820 8 50 33     61 return 1 if $self->{+WH} && !$self->{+RH};
821 0         0 return undef;
822             }
823              
824             sub clone_writer {
825 6     6 1 17 my $self = shift;
826 6         8 my $class = blessed($self);
827 6 50       55 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
828 6         21 return bless({WH() => $fh}, $class);
829             }
830              
831             sub clone_reader {
832 0     0 1 0 my $self = shift;
833 0         0 my $class = blessed($self);
834 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
835 0         0 return bless({RH() => $fh}, $class);
836             }
837              
838             sub writer {
839 0     0 1 0 my $self = shift;
840              
841 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
842              
843 0 0       0 return 1 unless $self->{+RH};
844              
845 0         0 close(delete $self->{+RH});
846 0         0 return 1;
847             }
848              
849             sub reader {
850 3     3 1 23 my $self = shift;
851              
852 3 50       12 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
853              
854 3 50       11 return 1 unless $self->{+WH};
855              
856 3         10 $self->_flush_before_close;
857 3         31 close(delete $self->{+WH});
858 3         40 return 1;
859             }
860              
861             # Buffered bursts from non-blocking writes must not be silently dropped when
862             # the write handle goes away (DESTROY would also croak trying to flush them
863             # without a handle).
864             sub _flush_before_close {
865 48     48   89 my $self = shift;
866 48 50 33     516 return if $self->{+HIT_EPIPE} || $self->{+INVALID_STATE};
867 48 100       405 $self->flush(blocking => 1) if $self->pending_output;
868             }
869              
870             sub close {
871 47     47 1 42382 my $self = shift;
872 47 100       169 if ($self->{+WH}) {
873 45         207 $self->_flush_before_close;
874 45         1174 close(delete $self->{+WH});
875             }
876 47 100       1053 close(delete $self->{+RH}) if $self->{+RH};
877 47         225 return;
878             }
879              
880             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
881             my $dsize = PIPE_BUF - $psize;
882              
883             sub delimiter_size {
884 110 50   110 0 2719 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
885 110   100     3118 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
886             }
887              
888             # The typical fits_in_burst() then write_burst() sequence would compress the
889             # same payload twice; remember the last result.
890             sub _compress_cached {
891 13     13   21 my ($self, $data) = @_;
892              
893 13         17 my $cache = $self->{_compress_cache};
894 13 100 100     44 return $cache->[1] if $cache && $cache->[0] eq $data;
895              
896 12         24 my $out = $self->_compress($data);
897 12         87 $self->{_compress_cache} = [$data, $out];
898 12         33 return $out;
899             }
900              
901             sub fits_in_burst {
902 13     13 1 80 my $self = shift;
903 13         22 my ($data) = @_;
904              
905 13 100       37 $data = $self->_compress_cached($data) if $self->{+COMPRESSION};
906              
907 13   100     37 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
908 13 100       84 return undef unless $size <= PIPE_BUF;
909              
910 11         29 return $size;
911             }
912              
913             sub write_burst {
914 19765     19765 1 206210 my $self = shift;
915 19765         19714 my ($data) = @_;
916              
917 19765 100       23480 $data = $self->_compress_cached($data) if $self->{+COMPRESSION};
918              
919 19765   100     21798 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
920 19765 100       46673 return undef unless $size <= PIPE_BUF;
921              
922 19763   100     16462 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  19763         30633  
923 19763         27456 $self->flush();
924              
925 19761         20640 return 1;
926             }
927              
928             sub DESTROY {
929 386     386   19617187 my $self = shift;
930 386         11277 local ($., $@, $!, $^E, $?);
931 386 100 100     5069 return if $self->{+HIT_EPIPE} || $self->{+INVALID_STATE};
932 360 100 100     25517 $self->flush(blocking => 1) if $self->{+WH} && $self->pending_output;
933             }
934              
935             sub pending_output {
936 19946     19946 1 40725 my $self = shift;
937 19946 100       28186 my $buffer = $self->{+OUT_BUFFER} or return 0;
938 19847 100       45887 return 0 unless @$buffer;
939 9         54 return 1;
940             }
941              
942             sub flush {
943 220106     220106 1 307071 my $self = shift;
944 220106         415950 my %params = @_;
945 220106   100     1028046 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
946              
947 220106   50     460334 my $buffer = $self->{+OUT_BUFFER} // return;
948              
949 220106         409489 while (@$buffer) {
950 820225         1588095 my $set = $buffer->[0];
951 820225         1839333 my $got = $self->_write_burst(@$set);
952              
953 820219 100 100     1745524 return unless $blocking || defined $got;
954 819986 100       1369677 next unless defined $got;
955              
956 819984         2764324 shift @$buffer;
957             }
958              
959 219867         540628 return;
960             }
961              
962             sub _write_burst {
963 820223     820223   1095679 my $self = shift;
964 820223         1531260 my ($data, $size) = @_;
965              
966 820223 50       1800794 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
967              
968 820223 100       1987645 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
969              
970 820221   100     2477515 my $prefix = $self->{+BURST_PREFIX} // '';
971 820221   100     1950746 my $postfix = $self->{+BURST_POSTFIX} // '';
972              
973 820221 100 66     2317957 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
974              
975 820221         983600 my $wrote;
976             SWRITE: {
977 820221         968201 $wrote = syswrite($wh, $data, $size);
  820221         1053533669  
978              
979             # $! is only meaningful when syswrite fails.
980 820221 100       2624147 unless (defined $wrote) {
981 237 100 100     1543 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
982 2         3 $self->{+HIT_EPIPE} = 1;
983 2         2 delete $self->{+OUT_BUFFER};
984 2         258 croak "Disconnected pipe";
985             }
986 235 100 100     1000 return undef if $NONBLOCK_ERRNO{0 + $!} || (IS_WIN32 && $! == 28); # NON-BLOCKING
987 2 50       8 redo SWRITE if $RETRY_ERRNO{0 + $!};
988 2         26 $self->throw_invalid("syswrite failed: $!");
989             }
990              
991 819984 50       1470180 redo SWRITE unless $wrote;
992 819984 50       1843599 last SWRITE if $wrote == $size;
993 0         0 die "partial write: $wrote vs $size: $!";
994             }
995              
996 819984         2043959 return $wrote;
997             }
998              
999             sub _adjusted_dsize {
1000 95     95   512 my $self = shift;
1001              
1002 95 50       1703 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
1003              
1004 95   100     3568 my $message_key = $self->{+MESSAGE_KEY} // '';
1005 95   100     2678 my $prefix = $self->{+BURST_PREFIX} // '';
1006 95   100     2156 my $postfix = $self->{+BURST_POSTFIX} // '';
1007              
1008 95         4388 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
1009 95         2793 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
1010             }
1011              
1012             sub write_message {
1013 200125     200125 1 5057172 my $self = shift;
1014 200125         394897 my ($data) = @_;
1015              
1016 200125 100       488488 $data = $self->_compress($data) if $self->{+COMPRESSION};
1017              
1018 200125         296916 my $tid = _get_tid();
1019 200125   100     700939 my $message_key = $self->{+MESSAGE_KEY} // '';
1020 200125   66     460707 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
1021 200125         643007 my $dtotal = bytes::length($data);
1022              
1023 200125         1089944 my $parts = int($dtotal / $adjusted_dsize);
1024 200125 50       516580 $parts++ if $dtotal % $adjusted_dsize;
1025              
1026 200125         300639 my $id = $parts - 1;
1027              
1028             # Unwinding the loop for a 1-part message for micro-optimization
1029 200125 100       437148 if ($parts == 1) {
1030 78         201 my $bytes = $data;
1031 78         175 my $size = $dtotal;
1032 78         959 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
1033              
1034 78 100 100     1110 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
1035              
1036 78   100     823 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  78         632  
1037             }
1038             else {
1039 200047         516717 for (my $part = 0; $part < $parts; $part++) {
1040 800149         1616707 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
1041 800149         4927874 my $size = bytes::length($bytes);
1042              
1043 800149         5481152 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
1044              
1045 800149   100     1444241 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
1046 800149   100     2866473 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  800149         3439064  
1047             }
1048             }
1049              
1050 200125         624640 $self->flush();
1051 200125         916453 return $parts;
1052             }
1053              
1054             sub read_message {
1055 700181     700181 1 583508355 my $self = shift;
1056 700181         1119465 my %params = @_;
1057              
1058 700181         1422078 my ($id, $out) = $self->_extract_message(%params);
1059              
1060 700179 100       1197138 return unless defined $id;
1061              
1062 700173 100       2262368 return $out unless $self->{+COMPRESSION};
1063              
1064 31 100       46 if ($params{debug}) {
1065 4         6 my $compressed = $out->{message};
1066 4         10 $out->{message} = $self->_decompress($compressed);
1067 4 100       38 $out->{compressed} = $compressed if $self->{+KEEP_COMPRESSED};
1068 4         14 return $out;
1069             }
1070              
1071 27         55 my $compressed = $out;
1072 27         58 my $decompressed = $self->_decompress($compressed);
1073              
1074 25 100 100     59 return ($decompressed, $compressed) if $self->{+KEEP_COMPRESSED} && wantarray;
1075 23         124 return $decompressed;
1076             }
1077              
1078             sub _extract_message {
1079 700224     700224   819712 my $self = shift;
1080 700224         792140 my %params = @_;
1081              
1082 700224   100     1788201 my $state = $self->{+STATE} //= {};
1083              
1084 700224         776112 while (1) {
1085 2800358 100       4180105 unless ($state->{key}) {
1086 2800357         3975983 my $key_bytes = $self->_get_from_buffer($psize);
1087 2800357 100 66     6285830 unless (defined($key_bytes) && length($key_bytes)) {
1088             # Leftover bytes smaller than a header at EOF mean a writer
1089             # died mid-message; a plain return here would look like a
1090             # clean EOF and silently drop data.
1091             $self->throw_invalid("EOF inside message header (truncated message)")
1092 7 100 100     57 if $self->{+EOF} && $self->{+IN_BUFFER_SIZE};
1093 6         14 return;
1094             }
1095              
1096 2800350         2881179 my %key;
1097 2800350         7614039 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
1098 2800350         4140742 $state->{key} = \%key;
1099             }
1100              
1101 2800351         3030967 my $key = $state->{key};
1102              
1103 2800351   100     3890464 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
1104              
1105 2800348         3138290 my $id = $key->{id};
1106 2800348         2857314 my $tag = join ':' => @{$key}{qw/pid tid/};
  2800348         5489944  
1107 2800348   100     2782824 push @{$state->{parts}->{$tag} //= []} => $id;
  2800348         6204987  
1108 2800348 100       15114556 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
1109              
1110 2800348         3329969 delete $state->{key};
1111              
1112 2800348 100       3838813 unless ($id == 0) {
1113 2100160 100       2896598 return ($id, undef) if $params{one_part_only};
1114 2100134         3880129 next;
1115             }
1116              
1117 700188         1742425 my $message = delete $state->{buffers}->{$tag};
1118 700188         997120 my $parts = delete $state->{parts}->{$tag};
1119              
1120 700188 100       4058026 return ($id, $message) unless $params{debug};
1121              
1122             return (
1123             $id,
1124             {
1125             message => $message,
1126             parts => $parts,
1127             pid => $key->{pid},
1128             tid => $key->{tid},
1129             },
1130 13         380 );
1131             }
1132             }
1133              
1134             1;
1135              
1136             __END__