File Coverage

blib/lib/Atomic/Pipe.pm
Criterion Covered Total %
statement 591 669 88.3
branch 234 330 70.9
condition 158 277 57.0
subroutine 95 101 94.0
pod 41 46 89.1
total 1119 1423 78.6


line stmt bran cond sub pod time code
1             package Atomic::Pipe;
2 67     67   9926695 use strict;
  67         96  
  67         1856  
3 67     67   632 use warnings;
  67         143  
  67         3203  
4              
5             our $VERSION = '0.029';
6              
7 67     67   23199 use IO();
  67         49282  
  67         1510  
8 67     67   27303 use IO::Handle();
  67         270435  
  67         1503  
9 67     67   369 use Fcntl();
  67         101  
  67         619  
10 67     67   24518 use bytes();
  67         24222  
  67         4256  
11              
12             BEGIN {
13 67 50   67   132 if (eval { require IO::Select; 1 }) {
  67         24609  
  67         85218  
14 67         1956 *HAVE_IO_SELECT = sub() { 1 };
15             }
16             else {
17 0         0 *HAVE_IO_SELECT = sub() { 0 };
18             }
19             }
20              
21 67     67   358 use Carp qw/croak confess/;
  67         112  
  67         3329  
22 67     67   238 use Config qw/%Config/;
  67         89  
  67         1844  
23 67     67   213 use List::Util qw/min/;
  67         90  
  67         3003  
24 67     67   217 use Scalar::Util qw/blessed/;
  67         86  
  67         2056  
25              
26 67     67   24855 use Errno qw/EINTR EAGAIN EPIPE/;
  67         93970  
  67         9116  
27             my %RETRY_ERRNO;
28             BEGIN {
29 67     67   260 %RETRY_ERRNO = (EINTR() => 1);
30 67 50       31066 $RETRY_ERRNO{Errno->ERESTART} = 1 if Errno->can('ERESTART');
31             }
32              
33             BEGIN {
34             # POSIX says writes of 512 or less are atomic, but some platforms allow for
35             # larger ones.
36 67     67   26892 require POSIX;
37 67 50 33     401583 if (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() }) {
  67         671  
38 67         413 *PIPE_BUF = \&POSIX::PIPE_BUF;
39             }
40             else {
41 0         0 *PIPE_BUF = sub() { 512 };
42             }
43              
44 67 50 33     334 if (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() }) {
  67         660  
45 67         142 *SSIZE_MAX = \&POSIX::SSIZE_MAX;
46             }
47             else {
48 0         0 *SSIZE_MAX = sub() { 512 };
49             }
50              
51             {
52             # Using the default pipe size as a read size is significantly faster
53             # than a larger value on my test machine.
54 67         126 my $read_size = min(SSIZE_MAX(), 65_536);
  67         279  
55 67         562 *DEFAULT_READ_SIZE = sub() { $read_size };
  0         0  
56             }
57              
58 67         123 my $can_thread = 1;
59 67   33     350 $can_thread &&= $] >= 5.008001;
60 67   33     766 $can_thread &&= $Config{'useithreads'};
61              
62             # Threads are broken on perl 5.10.0 built with gcc 4.8+
63 67 0 33     419 if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
      33        
      0        
64 0         0 my @parts = split /\./, $Config{'gccversion'};
65 0 0 0     0 $can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
      0        
66             }
67              
68 67   33     296 $can_thread &&= !$INC{'Devel/Cover.pm'};
69              
70 67 50       138 if (!$can_thread) {
    0          
71 67         170 *_get_tid = sub() { 0 };
72             }
73             elsif ($INC{'threads.pm'}) {
74 0         0 *_get_tid = sub() { threads->tid() };
  0         0  
75             }
76             else {
77 0 0       0 *_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
  0         0  
78             }
79              
80 67 50       211 if ($^O eq 'MSWin32') {
81 0         0 local $@;
82 0 0       0 eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
  0         0  
  0         0  
83 0 0       0 eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
  0         0  
  0         0  
84 0         0 *IS_WIN32 = sub() { 1 };
85             }
86             else {
87 67         2207 *IS_WIN32 = sub() { 0 };
88             }
89             }
90              
91 67     67   373 use constant READ_SIZE => 'read_size';
  67         74  
  67         4669  
92 67     67   291 use constant RH => 'rh';
  67         78  
  67         2465  
93 67     67   263 use constant WH => 'wh';
  67         93  
  67         2136  
94 67     67   233 use constant EOF => 'eof';
  67         94  
  67         2026  
95 67     67   241 use constant STATE => 'state';
  67         85  
  67         2429  
96 67     67   257 use constant OUT_BUFFER => 'out_buffer';
  67         82  
  67         2216  
97 67     67   320 use constant IN_BUFFER => 'in_buffer';
  67         82  
  67         2539  
98 67     67   351 use constant IN_BUFFER_SIZE => 'in_buffer_size';
  67         81  
  67         2526  
99 67     67   253 use constant READ_BLOCKING => 'read_blocking';
  67         143  
  67         2507  
100 67     67   870 use constant WRITE_BLOCKING => 'write_blocking';
  67         228  
  67         2194  
101 67     67   332 use constant BURST_PREFIX => 'burst_prefix';
  67         93  
  67         2698  
102 67     67   481 use constant BURST_POSTFIX => 'burst_postfix';
  67         82  
  67         2798  
103 67     67   275 use constant ADJUSTED_DSIZE => 'adjusted_dsize';
  67         83  
  67         3075  
104 67     67   242 use constant MESSAGE_KEY => 'message_key';
  67         81  
  67         2655  
105 67     67   340 use constant MIXED_BUFFER => 'mixed_buffer';
  67         115  
  67         2704  
106 67     67   271 use constant DELIMITER_SIZE => 'delimiter_size';
  67         82  
  67         2371  
107 67     67   244 use constant INVALID_STATE => 'invalid_state';
  67         86  
  67         2194  
108 67     67   239 use constant HIT_EPIPE => 'hit_epipe';
  67         186  
  67         2851  
109 67     67   272 use constant USE_IO_SELECT => 'use_io_select';
  67         58  
  67         2937  
110 67     67   304 use constant COMPRESSION => 'compression';
  67         67  
  67         2501  
111 67     67   272 use constant COMPRESSION_LEVEL => 'compression_level';
  67         104  
  67         2721  
112 67     67   259 use constant COMPRESSION_DICTIONARY => 'compression_dictionary';
  67         270  
  67         2317  
113 67     67   225 use constant COMPRESSION_DICTIONARY_FILE => 'compression_dictionary_file';
  67         100  
  67         2669  
114 67     67   257 use constant KEEP_COMPRESSED => 'keep_compressed';
  67         93  
  67         437437  
115              
116 65     65 1 1297 sub wh { shift->{+WH} }
117 7     7 1 18368 sub rh { shift->{+RH} }
118              
119             sub throw_invalid {
120 34     34 0 76 my $self = shift;
121 34 50 66     306 $self->{+INVALID_STATE} //= @_ ? shift : 'Unknown Error';
122 34         17882 confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
123             }
124              
125             sub read_size {
126 0     0 1 0 my $self = shift;
127 0 0       0 ($self->{+READ_SIZE}) = @_ if @_;
128 0   0     0 return $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();
129             }
130              
131             sub use_io_select {
132 170666     170666 1 183530 my $self = shift;
133 170666 100       252571 if (@_) {
134 27 50 100     113 croak "IO::Select is not installed, cannot enable use_io_select" if $_[0] && !HAVE_IO_SELECT;
135 27 100       133 $self->{+USE_IO_SELECT} = $_[0] ? 1 : 0;
136 27 100       89 delete $self->{_select} unless $_[0];
137             }
138 170666         182733 return 0 unless HAVE_IO_SELECT;
139 170666         212589 my $val = $self->{+USE_IO_SELECT};
140 170666 100       372229 return defined($val) ? ($val ? 1 : 0) : IS_WIN32 ? 0 : 1;
    100          
141             }
142              
143 16     16 1 76 sub compression { $_[0]->{+COMPRESSION} }
144 6     6 1 24 sub compression_level { $_[0]->{+COMPRESSION_LEVEL} }
145 10     10 1 54 sub compression_dictionary { $_[0]->{+COMPRESSION_DICTIONARY} }
146 8     8 1 88 sub compression_dictionary_file { $_[0]->{+COMPRESSION_DICTIONARY_FILE} }
147 10 100   10 1 42 sub keep_compressed { $_[0]->{+KEEP_COMPRESSED} ? 1 : 0 }
148              
149             sub fill_buffer {
150 170857     170857 1 180846 my $self = shift;
151              
152 170857 50       276368 $self->throw_invalid() if $self->{+INVALID_STATE};
153              
154 170857 50       314209 my $rh = $self->{+RH} or die "Not a read handle";
155              
156 170857 100       264582 return 0 if $self->{+EOF};
157              
158 170639   100     243247 $self->{+IN_BUFFER_SIZE} //= 0;
159              
160 170639   50     400704 my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
161              
162 170639         290476 my $use_select = $self->use_io_select;
163              
164 170639 100       279386 if ($use_select) {
165 146406   66     234891 my $sel = $self->{_select} //= IO::Select->new($rh);
166 146406   100     339794 my $blocking = $self->{+READ_BLOCKING} // 1;
167 146406 100       453912 my @ready = $sel->can_read($blocking ? undef : 0);
168 146406 100       4600969 return 0 unless @ready;
169             }
170 0         0 elsif (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
171             $to_read = min($self->_win32_pipe_ready(), $to_read);
172             }
173              
174 170468 50       256465 return 0 unless $to_read;
175              
176 170468         164111 while (1) {
177 170468         189753 my $rbuff = '';
178 170468         8848063 my $got = sysread($rh, $rbuff, $to_read);
179 170468 100       449579 unless(defined $got) {
180 93 50       1410 return 0 if $! == EAGAIN; # NON-BLOCKING
181 0 0       0 if ($RETRY_ERRNO{0 + $!}) {
182 0 0       0 next unless $use_select; # retry on EINTR in fallback mode
183 0         0 return 0; # IO::Select handles EINTR
184             }
185 0         0 $self->throw_invalid("$!");
186             }
187              
188 170375 100       252815 if ($got) {
189 170331         2444575 $self->{+IN_BUFFER} .= $rbuff;
190 170331         247352 $self->{+IN_BUFFER_SIZE} += $got;
191 170331         366467 return $got;
192             }
193             else {
194 44         417 $self->{+EOF} = 1;
195 44         161 return 0;
196             }
197             }
198              
199 0         0 return 0;
200             }
201              
202 5600768     5600768   7662694 sub _get_from_buffer { $_[0]->_from_buffer($_[1], remove => 1) }
203 66     66   191 sub _peek_from_buffer { shift->_from_buffer(@_) }
204              
205             sub _from_buffer {
206 5600834     5600834   5816303 my $self = shift;
207 5600834         7842018 my ($size, %params) = @_;
208              
209 5600834 100 66     13092715 unless ($self->{+IN_BUFFER_SIZE} && $self->{+IN_BUFFER_SIZE} >= $size) {
210 170269         313162 $self->fill_buffer;
211 170269 100       373065 unless($self->{+IN_BUFFER_SIZE} >= $size) {
212 4 50 33     133 return unless $params{eof_invalid} && $self->{+EOF};
213 0         0 $self->throw_invalid($params{eof_invalid});
214             }
215             }
216              
217 5600830         5507249 my $out;
218              
219 5600830 100       6826133 if ($params{remove}) {
220 5600764         5769784 $self->{+IN_BUFFER_SIZE} -= $size;
221 5600764         11236296 $out = substr($self->{+IN_BUFFER}, 0, $size, '');
222             }
223             else {
224 66         202 $out = substr($self->{+IN_BUFFER}, 0, $size);
225             }
226              
227 5600830         11471541 return $out;
228             }
229              
230             sub _has_dict {
231             return defined($_[0]->{+COMPRESSION_DICTIONARY})
232 92   100 92   401 || defined($_[0]->{+COMPRESSION_DICTIONARY_FILE});
233             }
234              
235             # NOTE: raw zstd dictionaries do not embed a dict-ID, so a mismatched peer
236             # dict will silently decode to garbage rather than fail. Both ends must agree
237             # on byte-identical dictionary content.
238             sub _build_cdict {
239 10     10   12 my $self = shift;
240 10   50     30 my $level = $self->{+COMPRESSION_LEVEL} // 3;
241 10         38 require Compress::Zstd::CompressionDictionary;
242 10 100       28 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
243 4         20 return Compress::Zstd::CompressionDictionary->new_from_file($path, $level);
244             }
245 6         146 return Compress::Zstd::CompressionDictionary->new($self->{+COMPRESSION_DICTIONARY}, $level);
246             }
247              
248             sub _build_ddict {
249 10     10   12 my $self = shift;
250 10         32 require Compress::Zstd::DecompressionDictionary;
251 10 100       24 if (defined(my $path = $self->{+COMPRESSION_DICTIONARY_FILE})) {
252 4         20 return Compress::Zstd::DecompressionDictionary->new_from_file($path);
253             }
254 6         82 return Compress::Zstd::DecompressionDictionary->new($self->{+COMPRESSION_DICTIONARY});
255             }
256              
257             sub _compress {
258 45     45   138 my ($self, $data) = @_;
259 45 100       85 if ($self->_has_dict) {
260 10         60 require Compress::Zstd::CompressionContext;
261 10   66     204 my $ctx = $self->{_compression_ctx} //= Compress::Zstd::CompressionContext->new;
262 10   33     38 my $cdict = $self->{_compression_cdict} //= $self->_build_cdict;
263 10         704 return $ctx->compress_using_dict($data, $cdict);
264             }
265 35   100     2562 return Compress::Zstd::compress($data, $self->{+COMPRESSION_LEVEL} // 3);
266             }
267              
268             sub _decompress {
269 47     47   1535 my ($self, $data) = @_;
270 47         54 my $out;
271 47 100       70 if ($self->_has_dict) {
272 12         52 require Compress::Zstd::DecompressionContext;
273 12   66     244 my $ctx = $self->{_decompression_ctx} //= Compress::Zstd::DecompressionContext->new;
274 12   66     46 my $ddict = $self->{_decompression_ddict} //= $self->_build_ddict;
275 12         430 $out = $ctx->decompress_using_dict($data, $ddict);
276             }
277             else {
278 35         861 $out = Compress::Zstd::decompress($data);
279             }
280 47 100       120 $self->throw_invalid("zstd decompression failed") unless defined $out;
281 41         84 return $out;
282             }
283              
284             sub eof {
285 506     506 1 8156783 my $self = shift;
286              
287 506 100       1212 $self->throw_invalid() if $self->{+INVALID_STATE};
288              
289 492 100       1508 return 0 if $self->fill_buffer;
290 426 100       1347 return 0 unless $self->{+EOF};
291 208 100       537 return 0 if $self->{+IN_BUFFER_SIZE};
292              
293 148 50       327 if (my $buffer = $self->{+MIXED_BUFFER}) {
294 148 100 66     888 return 0 if $buffer->{lines} || defined($buffer->{lines}) && length($buffer->{lines});
      66        
295 100 50 66     383 return 0 if $buffer->{burst} || defined($buffer->{lines}) && length($buffer->{burst});
      33        
296             }
297              
298 100         368 return 1;
299             }
300              
301             sub _fh_mode {
302 8     8   1760 my $self = shift;
303 8         10 my ($fh) = @_;
304              
305 8   50     36 my $mode = fcntl($fh, Fcntl::F_GETFL(), 0) // return undef;
306 8 100       31 return '<&' if $mode == Fcntl::O_RDONLY();
307 4 50       16 return '>&' if $mode == Fcntl::O_WRONLY();
308 0         0 return undef;
309             }
310              
311             my %MODE_TO_DIR = (
312             '<&' => RH(),
313             '<&=' => RH(),
314             '>&' => WH(),
315             '>&=' => WH(),
316             );
317             sub _mode_to_dir {
318 18     18   20 my $self = shift;
319 18         21 my ($mode) = @_;
320 18         54 return $MODE_TO_DIR{$mode};
321             }
322              
323             sub _check_params {
324 190     190   737 my ($class, %params) = @_;
325             croak "IO::Select is not installed, cannot enable use_io_select"
326 190 50 100     1177 if $params{+USE_IO_SELECT} && !HAVE_IO_SELECT;
327              
328 190 100       740 if (defined(my $algo = $params{+COMPRESSION})) {
329 34 100       346 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
330             croak "compression => 'zstd' requires Compress::Zstd"
331 32 50       46 unless eval { require Compress::Zstd; 1 };
  32         182  
  32         74  
332             }
333              
334             croak "compression_dictionary and compression_dictionary_file are mutually exclusive"
335 188 100 100     2525 if defined($params{+COMPRESSION_DICTIONARY}) && defined($params{+COMPRESSION_DICTIONARY_FILE});
336              
337             croak "compression_dictionary requires compression to be enabled"
338             if (defined($params{+COMPRESSION_DICTIONARY}) || defined($params{+COMPRESSION_DICTIONARY_FILE}))
339 186 100 100     2760 && !defined($params{+COMPRESSION});
      100        
340             }
341              
342             sub read_fifo {
343 22     22 1 49589941 my $class = shift;
344 22         107 my ($fifo, %params) = @_;
345              
346 22         195 $class->_check_params(%params);
347 22 50       477 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
348              
349 22 50       995 open(my $fh, '+<', $fifo) or die "Could not open fifo ($fifo) for reading: $!";
350 22         110 binmode($fh);
351              
352 22         207 return bless({%params, RH() => $fh}, $class);
353             }
354              
355             sub write_fifo {
356 13     13 1 202829 my $class = shift;
357 13         442 my ($fifo, %params) = @_;
358              
359 13         960 $class->_check_params(%params);
360 13 50       1122 croak "File '$fifo' is not a pipe (-p check)" unless -p $fifo;
361              
362 13 50       2816 open(my $fh, '>', $fifo) or die "Could not open fifo ($fifo) for writing: $!";
363 13         2148 binmode($fh);
364              
365 13         880 return bless({%params, WH() => $fh}, $class);
366             }
367              
368             sub from_fh {
369 10     10 1 318131 my $class = shift;
370 10         14 my $ifh = pop;
371 10         16 my ($mode) = @_;
372              
373 10 50       48 croak "Filehandle is not a pipe (-p check)" unless -p $ifh;
374              
375 10   33     24 $mode //= $class->_fh_mode($ifh) // croak "Could not determine filehandle mode, please specify '>&' or '<&'";
      66        
376 10   33     19 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
377              
378 10 50       108 open(my $fh, $mode, $ifh) or croak "Could not clone ($mode) filehandle: $!";
379 10         14 binmode($fh);
380              
381 10         36 return bless({$dir => $fh}, $class);
382             }
383              
384             sub from_fd {
385 8     8 1 4209 my $class = shift;
386 8         12 my ($mode, $fd) = @_;
387              
388 8   33     12 my $dir = $class->_mode_to_dir($mode) // croak "Invalid mode: $mode";
389 8 50       124 open(my $fh, $mode, $fd) or croak "Could not open ($mode) fd$fd: $!";
390              
391 8 50       31 croak "Filehandle is not a pipe (-p check)" unless -p $fh;
392              
393 8         12 binmode($fh);
394 8         34 return bless({$dir => $fh}, $class);
395             }
396              
397             sub new {
398 10     10 1 169516 my $class = shift;
399 10         79 my (%params) = @_;
400              
401 10         34 $class->_check_params(%params);
402              
403 2         2 my ($rh, $wh);
404 2 50       67 pipe($rh, $wh) or die "Could not create pipe: $!";
405              
406 2         4 binmode($wh);
407 2         3 binmode($rh);
408              
409 2         15 return bless({%params, RH() => $rh, WH() => $wh}, $class);
410             }
411              
412             sub pair {
413 145     145 1 55706364 my $class = shift;
414 145         692 my (%params) = @_;
415              
416 145         941 $class->_check_params(%params);
417              
418 145         310 my $mixed = delete $params{mixed_data_mode};
419              
420 145         250 my ($rh, $wh);
421 145 50       5359 pipe($rh, $wh) or die "Could not create pipe: $!";
422              
423 145         396 binmode($wh);
424 145         248 binmode($rh);
425              
426 145         868 my $r = bless({%params, RH() => $rh}, $class);
427 145         550 my $w = bless({%params, WH() => $wh}, $class);
428              
429 145 100       395 if ($mixed) {
430 52         281 $r->set_mixed_data_mode();
431 52         106 $w->set_mixed_data_mode();
432             }
433              
434 145         733 return ($r, $w);
435             }
436              
437             sub set_mixed_data_mode {
438 104     104 1 167 my $self = shift;
439              
440 104 50       271 $self->throw_invalid() if $self->{+INVALID_STATE};
441              
442 104 100       460 $self->read_blocking(0) if $self->{+RH};
443              
444 104   50     497 $self->{+BURST_PREFIX} //= "\x0E"; # Shift out
445 104   50     438 $self->{+BURST_POSTFIX} //= "\x0F"; # Shift in
446 104   50     320 $self->{+MESSAGE_KEY} //= "\x10"; # Data link escape
447             }
448              
449             sub set_compression {
450 16     16 1 159 my $self = shift;
451 16         48 my ($algo, $level) = @_;
452              
453 16 100       37 if (!defined $algo) {
454 6         10 delete $self->{+COMPRESSION};
455 6         10 delete $self->{+COMPRESSION_LEVEL};
456 6         10 delete $self->{_compression_ctx};
457 6         6 delete $self->{_compression_cdict};
458 6         10 delete $self->{_decompression_ctx};
459 6         10 delete $self->{_decompression_ddict};
460 6         8 return;
461             }
462              
463 10 100       351 croak "Unknown compression algorithm '$algo'" unless $algo eq 'zstd';
464             croak "compression => 'zstd' requires Compress::Zstd"
465 8 50       13 unless eval { require Compress::Zstd; 1 };
  8         65  
  8         20  
466              
467 8         26 $self->{+COMPRESSION} = $algo;
468             # Omitted $level preserves the previously set level; pass undef to
469             # set_compression(undef) first if a full reset back to the default is desired.
470 8 100       18 $self->{+COMPRESSION_LEVEL} = $level if defined $level;
471              
472             # Cached objects depend on level / dict; force rebuild.
473 8         12 delete $self->{_compression_ctx};
474 8         11 delete $self->{_compression_cdict};
475 8         14 delete $self->{_decompression_ctx};
476 8         8 delete $self->{_decompression_ddict};
477              
478 8         14 return;
479             }
480              
481             sub set_compression_dictionary {
482 4     4 1 18 my ($self, $bytes) = @_;
483 4 50       8 if (defined $bytes) {
484             croak "compression_dictionary requires compression to be enabled"
485 4 50       8 unless defined $self->{+COMPRESSION};
486 4         6 $self->{+COMPRESSION_DICTIONARY} = $bytes;
487 4         4 delete $self->{+COMPRESSION_DICTIONARY_FILE};
488             }
489             else {
490 0         0 delete $self->{+COMPRESSION_DICTIONARY};
491             }
492 4         6 delete $self->{_compression_cdict};
493 4         6 delete $self->{_decompression_ddict};
494 4         6 return;
495             }
496              
497             sub set_compression_dictionary_file {
498 8     8 1 926 my ($self, $path) = @_;
499 8 100       14 if (defined $path) {
500             croak "compression_dictionary requires compression to be enabled"
501 4 50       14 unless defined $self->{+COMPRESSION};
502 4         6 $self->{+COMPRESSION_DICTIONARY_FILE} = $path;
503 4         8 delete $self->{+COMPRESSION_DICTIONARY};
504             }
505             else {
506 4         6 delete $self->{+COMPRESSION_DICTIONARY_FILE};
507             }
508 8         12 delete $self->{_compression_cdict};
509 8         20 delete $self->{_decompression_ddict};
510 8         12 return;
511             }
512              
513             sub set_keep_compressed {
514 4     4 1 10 my ($self, $val) = @_;
515 4 100       10 $self->{+KEEP_COMPRESSED} = $val ? 1 : 0;
516 4         6 return;
517             }
518              
519             sub get_line_burst_or_data {
520 188     188 1 2868055 my $self = shift;
521 188         1093 my %params = @_;
522              
523 188   33     738 my $rh = $self->{+RH} // croak "Not a read handle";
524              
525 188   33     929 my $prefix = $self->{+BURST_PREFIX} // croak "missing 'burst_prefix', not in mixed_data_mode";
526 188   33     825 my $postfix = $self->{+BURST_POSTFIX} // croak "missing 'burst_postfix', not in mixed_data_mode";
527 188   33     685 my $key = $self->{+MESSAGE_KEY} // croak "missing 'message_key', not in mixed_data_mode";
528              
529 188   100     3813 my $buffer = $self->{+MIXED_BUFFER} //= {
530             lines => '',
531             burst => '',
532             in_burst => 0,
533             in_message => 0,
534             do_extra_loop => 0,
535             strip_term => 0,
536             };
537              
538 188         364 my $peek;
539              
540 188         283 while (1) {
541             $self->throw_invalid('Incomplete message received before EOF')
542 364 100 66     1241 if $self->eof && (keys(%{$self->{+STATE}->{buffers}}) || keys (%{$self->{+STATE}->{parts}}));
      66        
543              
544 358 100 66     1555 if($buffer->{lines} || length($buffer->{lines})) {
545             # Look for a complete line
546 186         263 my ($line, $term);
547 186         2755 ($line, $term, $buffer->{lines}) = split /(\r?\n|\r\n?)/, $buffer->{lines}, 2;
548              
549 186 100       874 return (line => "${line}${term}") if $term;
550 134 50 66     668 return (line => $line) if $self->{+EOF} && !$self->{+IN_BUFFER_SIZE} && defined($line) && length($line);
      66        
      33        
551              
552 110         142 $buffer->{lines} = $line;
553 110 50 66     240 $peek = $line if $params{peek_line} && defined($line) && length($line);
      66        
554             }
555              
556 282 100       688 if ($buffer->{in_message}) {
557 40         232 my ($id, $message) = $self->_extract_message(one_part_only => 1);
558              
559 40 50       113 unless(defined $id) {
560 0 0 0     0 next unless $self->{+EOF} && !$self->{+IN_BUFFER_SIZE};
561 0         0 $self->throw_invalid('Incomplete burst data received before end of pipe');
562             }
563              
564 40         66 $buffer->{strip_term}++;
565 40         64 $buffer->{in_message} = 0;
566 40 100       96 if (defined $message) {
567 14 100       28 if ($self->{+COMPRESSION}) {
568 4         6 my $compressed = $message;
569 4         8 my $decompressed = $self->_decompress($compressed);
570             return (message => $decompressed, compressed => $compressed)
571 4 100       14 if $self->{+KEEP_COMPRESSED};
572 2         8 return (message => $decompressed);
573             }
574 10         60 return (message => $message);
575             }
576             }
577              
578 268 100       676 if ($buffer->{strip_term}) {
579 38   50     95 my $term = $self->_get_from_buffer(1, eof_invalid => 'EOF before message terminator') // return;
580              
581 38 50       408 $self->throw_invalid("No message terminator") unless $term eq $postfix;
582 38         76 $buffer->{strip_term}--;
583             }
584              
585 268 100       735 if ($buffer->{in_burst}) {
586 66   50     378 my $peek = $self->_peek_from_buffer(1, eof_invalid => 'Incomplete burst data received before end of pipe') // next;
587              
588 66 100       183 if ($peek eq $key) {
589 40         471 $self->_get_from_buffer(1); # Strip the key
590 40         62 $buffer->{in_message} = 1;
591 40         91 $buffer->{in_burst} = 0;
592 40         91 next;
593             }
594              
595 26   50     58 $buffer->{burst} //= '';
596 26         50 my ($burst_data, $term);
597 26         414 ($burst_data, $term, $self->{+IN_BUFFER}) = split /(\Q$postfix\E)/, $self->{+IN_BUFFER}, 2;
598 26         70 $buffer->{burst} .= $burst_data;
599              
600 26 100       58 if ($term) {
601 18         38 $self->{+IN_BUFFER_SIZE} = length($self->{+IN_BUFFER});
602 18         30 $buffer->{in_burst} = 0;
603 18         22 $buffer->{do_extra_loop}++;
604 18         36 my $compressed = delete $buffer->{burst};
605 18 100       98 if ($self->{+COMPRESSION}) {
606 6         12 my $decompressed = $self->_decompress($compressed);
607             return (burst => $decompressed, compressed => $compressed)
608 6 100       24 if $self->{+KEEP_COMPRESSED};
609 4         12 return (burst => $decompressed);
610             }
611 12         124 return (burst => $compressed);
612             }
613             else {
614 8         48 $self->{+IN_BUFFER_SIZE} = 0;
615             }
616              
617 8 50       116 $self->throw_invalid('Incomplete burst data received before end of pipe') if $self->{+EOF};
618             }
619              
620 202 100 66     989 unless ($self->{+IN_BUFFER_SIZE} || $self->fill_buffer()) {
621 96 100 66     303 return (peek => $peek) if $peek && !$self->{+EOF};
622              
623 92 100       371 return unless $self->{+EOF};
624              
625             # Do at least one more iteration after EOF
626 54 100       572 return if $buffer->{+EOF}++;
627              
628             # But do not try to split the empty buffer
629 30         87 next;
630             }
631              
632             # Look for the start of a burst, anything before a burst is line data
633 106         367 my $linedata;
634 106         2685 ($linedata, $buffer->{in_burst}, $self->{+IN_BUFFER}) = split /(\Q$prefix\E)/, $self->{+IN_BUFFER}, 2;
635 106 50       482 $buffer->{lines} .= $linedata if defined $linedata;
636              
637 106 100       254 if ($buffer->{in_burst}) {
638 66         183 $self->{+IN_BUFFER_SIZE} -= length($linedata) + length($buffer->{in_burst});
639             }
640             else {
641 40         83 $self->{+IN_BUFFER_SIZE} = 0;
642             }
643             }
644             }
645              
646             sub debug {
647 0     0 0 0 my ($id, $buffer) = @_;
648              
649 0         0 print "---debug $id---\n";
650 0         0 for my $key (sort keys %$buffer) {
651 0   0     0 my $val = $buffer->{$key} // '';
652 0         0 $val =~ s/\x0E/\\x0E/g;
653 0         0 $val =~ s/\x0F/\\x0F/g;
654 0         0 $val =~ s/\x10/\\x10/g;
655 0         0 $val =~ s/\n/\\n/g;
656 0         0 $val =~ s/\r/\\r/g;
657 0         0 print "$key: |$val|\n\n";
658             };
659             }
660              
661             # This is a heavily modified version of a pattern suggested on stack-overflow
662             # and also used in Win32::PowerShell::IPC.
663             my $peek_named_pipe;
664             sub _win32_pipe_ready {
665 0     0   0 my $self = shift;
666 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+RH}));
667              
668 0         0 my $buf = "";
669 0         0 my $buflen = 0;
670              
671 0   0     0 $peek_named_pipe //= Win32::API->new("kernel32", 'PeekNamedPipe', 'NPIPPP', 'N')
      0        
672             || die "Can't load PeekNamedPipe from kernel32.dll";
673              
674 0         0 my $got = pack('L', 0);
675 0         0 my $avail = pack('L', 0);
676 0         0 my $remain = pack('L', 0);
677              
678 0         0 my $ret = $peek_named_pipe->Call($wh, $buf, $buflen, $got, $avail, $remain);
679              
680 0 0       0 $self->{+EOF} = 1 if $ret == 0;
681              
682 0         0 return unpack('L', $avail);
683             }
684              
685             my $set_named_pipe_handle_state;
686             sub _win32_set_pipe_state {
687 0     0   0 my $self = shift;
688 0         0 my ($state) = @_;
689 0         0 my $wh = Win32API::File::FdGetOsFHandle(fileno($self->{+WH}));
690              
691 0   0     0 $set_named_pipe_handle_state //= Win32::API->new("kernel32", 'SetNamedPipeHandleState', 'NPPP', 'N')
      0        
692             || die "Can't load SetNamedPipeHandleState from kernel32.dll";
693              
694             # Block or non-block?
695 0 0       0 my $lpmode = $state ? pack('L', 0x00000000) : pack('L', 0x00000001);
696              
697 0         0 my $ret = $set_named_pipe_handle_state->Call($wh, $lpmode, +0, +0);
698              
699 0         0 return $ret;
700             }
701              
702             sub read_blocking {
703 67     67 0 147 my $self = shift;
704 67 50       476 my $rh = $self->{+RH} or croak "Not a reader";
705              
706 67 50       525 ($self->{+READ_BLOCKING}) = @_ if @_;
707              
708 67         108 unless (IS_WIN32) {
709 67         1025 $rh->blocking(@_);
710             }
711              
712 67         200 return $self->{+READ_BLOCKING};
713             }
714              
715             sub write_blocking {
716 21     21 0 33 my $self = shift;
717 21 50       153 my $wh = $self->{+WH} or croak "Not a writer";
718              
719 21 50       47 return $self->{+WRITE_BLOCKING} unless @_;
720              
721 21         48 my ($val) = @_;
722 21         38 $self->{+WRITE_BLOCKING} = $val;
723              
724 21         22 if (IS_WIN32) {
725             $self->_win32_set_pipe_state(@_) if @_;
726             }
727             else {
728 21         241 my $flags = fcntl($wh, &Fcntl::F_GETFL, 0); # Get the current flags
729 21 50       44 die $! unless defined $flags;
730 21 100       270 if ($val) { $flags &= ~&Fcntl::O_NONBLOCK } # Clear O_NONBLOCK
  5         104  
731 16         47 else { $flags |= &Fcntl::O_NONBLOCK } # Set O_NONBLOCK
732 21 50       185 fcntl($wh, &Fcntl::F_SETFL, $flags) || die $!;
733             }
734              
735 21         62 return $self->{+WRITE_BLOCKING};
736             }
737              
738             sub blocking {
739 36     36 1 5234494 my $self = shift;
740              
741 36 100 66     1006 if ($self->{+RH} && !$self->{+WH}) {
    50 33        
742 15         394 return $self->read_blocking(@_);
743             }
744             elsif ($self->{+WH} && !$self->{+RH}) {
745 21         67 return $self->write_blocking(@_);
746             }
747              
748 0         0 my $r = $self->read_blocking(@_);
749 0         0 my $w = $self->write_blocking(@_);
750              
751 0 0 0     0 return 1 if $r && $w;
752 0 0 0     0 return 0 if !$r && !$w;
753 0         0 return undef;
754             }
755              
756             sub size {
757 2     2 1 5564 my $self = shift;
758 2 50       6 return unless defined &Fcntl::F_GETPIPE_SZ;
759 2   33     13 my $fh = $self->{+WH} // $self->{+RH};
760 2         25 fcntl($fh, Fcntl::F_GETPIPE_SZ(), 0);
761             }
762              
763             sub resize {
764 10     10 1 25514 my $self = shift;
765 10         344 my ($size) = @_;
766              
767 10 50       162 return unless defined &Fcntl::F_SETPIPE_SZ;
768 10   66     169 my $fh = $self->{+WH} // $self->{+RH};
769              
770             # Force numeric: fcntl(F_SETPIPE_SZ, $string) is interpreted as
771             # buffer-mode and silently fails with EINVAL.
772 10         138 fcntl($fh, Fcntl::F_SETPIPE_SZ(), $size + 0);
773             }
774              
775             my $ONE_MB = 1 * 1024 * 1024;
776              
777             sub max_size {
778 3 50   3 1 588 return $ONE_MB unless -e '/proc/sys/fs/pipe-max-size';
779              
780 3 50       67 open(my $max, '<', '/proc/sys/fs/pipe-max-size') or return $ONE_MB;
781 3         59 chomp(my $val = <$max>);
782 3         18 close($max);
783             # Force numeric. <$max> returns a string; passing it to
784             # fcntl(F_SETPIPE_SZ) directly triggers the same EINVAL bug
785             # resize() guards against. Numify here so any caller of
786             # max_size() that hands the result to fcntl gets an int.
787 3   33     22 return ($val + 0) || $ONE_MB;
788             }
789              
790             sub resize_or_max {
791 1     1 1 7 my $self = shift;
792 1         3 my ($size) = @_;
793 1         3 $size = min($size, $self->max_size);
794 1         4 $self->resize($size);
795             }
796              
797             sub is_reader {
798 8     8 1 1277 my $self = shift;
799 8 50 33     54 return 1 if $self->{+RH} && !$self->{+WH};
800 0         0 return undef;
801             }
802              
803             sub is_writer {
804 8     8 1 13 my $self = shift;
805 8 50 33     57 return 1 if $self->{+WH} && !$self->{+RH};
806 0         0 return undef;
807             }
808              
809             sub clone_writer {
810 6     6 1 17 my $self = shift;
811 6         8 my $class = blessed($self);
812 6 50       53 open(my $fh, '>&:raw', $self->{+WH}) or die "Could not clone filehandle: $!";
813 6         15 return bless({WH() => $fh}, $class);
814             }
815              
816             sub clone_reader {
817 0     0 1 0 my $self = shift;
818 0         0 my $class = blessed($self);
819 0 0       0 open(my $fh, '<&:raw', $self->{+RH}) or die "Could not clone filehandle: $!";
820 0         0 return bless({RH() => $fh}, $class);
821             }
822              
823             sub writer {
824 0     0 1 0 my $self = shift;
825              
826 0 0       0 croak "pipe was set to reader, cannot set to writer" unless $self->{+WH};
827              
828 0 0       0 return 1 unless $self->{+RH};
829              
830 0         0 close(delete $self->{+RH});
831 0         0 return 1;
832             }
833              
834             sub reader {
835 2     2 1 9 my $self = shift;
836              
837 2 50       4 croak "pipe was set to writer, cannot set to reader" unless $self->{+RH};
838              
839 2 50       5 return 1 unless $self->{+WH};
840              
841 2         8 close(delete $self->{+WH});
842 2         7 return 1;
843             }
844              
845             sub close {
846 40     40 1 31268 my $self = shift;
847 40 100       1019 close(delete $self->{+WH}) if $self->{+WH};
848 40 100       1066 close(delete $self->{+RH}) if $self->{+RH};
849 40         104 return;
850             }
851              
852             my $psize = 16; # 32bit pid, 32bit tid, 32 bit size, 32 bit int part id;
853             my $dsize = PIPE_BUF - $psize;
854              
855             sub delimiter_size {
856 100 50   100 0 3139 return $_[0]->{+DELIMITER_SIZE} if defined $_[0]->{+DELIMITER_SIZE};
857 100   100     2962 return $_[0]->{+DELIMITER_SIZE} //= bytes::length($_[0]->{+BURST_PREFIX} // '') + bytes::length($_[0]->{+BURST_POSTFIX} // '');
      100        
      66        
858             }
859              
860             sub fits_in_burst {
861 12     12 1 70 my $self = shift;
862 12         20 my ($data) = @_;
863              
864 12 100       29 $data = $self->_compress($data) if $self->{+COMPRESSION};
865              
866 12   66     51 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
867 12 100       133 return undef unless $size <= PIPE_BUF;
868              
869 10         25 return $size;
870             }
871              
872             sub write_burst {
873 14     14 1 6398 my $self = shift;
874 14         91 my ($data) = @_;
875              
876 14 100       145 $data = $self->_compress($data) if $self->{+COMPRESSION};
877              
878             # Intentionally not delegating to fits_in_burst() — that would compress twice.
879 14   100     157 my $size = bytes::length($data) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
880 14 100       160 return undef unless $size <= PIPE_BUF;
881              
882 12   100     26 push @{$self->{+OUT_BUFFER} //= []} => [$data, $size];
  12         181  
883 12         48 $self->flush();
884              
885 12         37 return 1;
886             }
887              
888             sub DESTROY {
889 347     347   20843955 my $self = shift;
890 347 100       2174 return if $self->{+HIT_EPIPE};
891 345 100       1386 $self->flush(blocking => 1) if $self->pending_output;
892             }
893              
894             sub pending_output {
895 349     349 1 2348 my $self = shift;
896 349 100       31414 my $buffer = $self->{+OUT_BUFFER} or return 0;
897 89 100       19376 return 0 unless @$buffer;
898 3         23 return 1;
899             }
900              
901             sub flush {
902 200491     200491 1 305009 my $self = shift;
903 200491         385945 my %params = @_;
904 200491   100     1003277 my $blocking = $params{blocking} // $self->{+WRITE_BLOCKING} // 1;
      100        
905              
906 200491   50     463745 my $buffer = $self->{+OUT_BUFFER} // return;
907              
908 200491         429408 while (@$buffer) {
909 800611         1274906 my $set = $buffer->[0];
910 800611         1756117 my $got = $self->_write_burst(@$set);
911              
912 800607 100 100     1810269 return unless $blocking || defined $got;
913 800233 100       1494070 next unless defined $got;
914              
915 800230         2680172 shift @$buffer;
916             }
917              
918 200113         481853 return;
919             }
920              
921             sub _write_burst {
922 800609     800609   1150852 my $self = shift;
923 800609         1504959 my ($data, $size) = @_;
924              
925 800609 50       1869710 my $wh = $self->{+WH} or croak "Cannot call write on a pipe reader";
926              
927 800609 100       1586692 croak "Disconnected pipe" if $self->{+HIT_EPIPE};
928              
929 800607   100     2254661 my $prefix = $self->{+BURST_PREFIX} // '';
930 800607   100     2052152 my $postfix = $self->{+BURST_POSTFIX} // '';
931              
932 800607 100 66     2491111 $data = "${prefix}${data}${postfix}" if length($prefix) || length($postfix);
933              
934 800607         1064447 my $wrote;
935 800607         1037875 my $loop = 0;
936             SWRITE: {
937 800607         1063339 $wrote = syswrite($wh, $data, $size);
  800607         1029510988  
938 800607 100 100     4488161 if ($! == EPIPE || (IS_WIN32 && $! == 22)) {
939 2         20 $self->{+HIT_EPIPE} = 1;
940 2         3 delete $self->{+OUT_BUFFER};
941 2         254 croak "Disconnected pipe";
942             }
943 800605 100 100     2693384 return undef if $! == EAGAIN || (IS_WIN32 && $! == 28); # NON-BLOCKING
944 800230 50 33     3532594 redo SWRITE if !$wrote || $RETRY_ERRNO{0 + $!};
945 800230 50       1904923 last SWRITE if $wrote == $size;
946 0   0     0 $wrote //= "";
947 0         0 die "$wrote vs $size: $!";
948             }
949              
950 800230         2597279 return $wrote;
951             }
952              
953             sub _adjusted_dsize {
954 90     90   428 my $self = shift;
955              
956 90 50       1510 return $self->{+ADJUSTED_DSIZE} if defined $self->{+ADJUSTED_DSIZE};
957              
958 90   100     4595 my $message_key = $self->{+MESSAGE_KEY} // '';
959 90   100     2542 my $prefix = $self->{+BURST_PREFIX} // '';
960 90   100     1947 my $postfix = $self->{+BURST_POSTFIX} // '';
961              
962 90         3153 my $fix_size = bytes::length($prefix) + bytes::length($postfix) + bytes::length($message_key);
963 90         2234 return $self->{+ADJUSTED_DSIZE} = $dsize - $fix_size;
964             }
965              
966             sub write_message {
967 200120     200120 1 5346367 my $self = shift;
968 200120         419357 my ($data) = @_;
969              
970 200120 100       534479 $data = $self->_compress($data) if $self->{+COMPRESSION};
971              
972 200120         309177 my $tid = _get_tid();
973 200120   100     687928 my $message_key = $self->{+MESSAGE_KEY} // '';
974 200120   66     502722 my $adjusted_dsize = $self->{+ADJUSTED_DSIZE} // $self->_adjusted_dsize;
975 200120         657238 my $dtotal = bytes::length($data);
976              
977 200120         1008836 my $parts = int($dtotal / $adjusted_dsize);
978 200120 50       534730 $parts++ if $dtotal % $adjusted_dsize;
979              
980 200120         350119 my $id = $parts - 1;
981              
982             # Unwinding the loop for a 1-part message for micro-optimization
983 200120 100       470994 if ($parts == 1) {
984 73         150 my $bytes = $data;
985 73         113 my $size = $dtotal;
986 73         969 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
987              
988 73 100 66     815 my $out_size = $dtotal + ($self->{+DELIMITER_SIZE} // $self->delimiter_size) + $psize + ($message_key ? 1 : 0);
989              
990 73   100     906 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  73         714  
991             }
992             else {
993 200047         529041 for (my $part = 0; $part < $parts; $part++) {
994 800149         1700941 my $bytes = bytes::substr($data, $part * $adjusted_dsize, $adjusted_dsize);
995 800149         5720861 my $size = bytes::length($bytes);
996              
997 800149         5674438 my $out = $message_key . pack("l2L2", $$, $tid, $id--, $size) . $bytes;
998              
999 800149   100     1445380 my $out_size = bytes::length($out) + ($self->{+DELIMITER_SIZE} // $self->delimiter_size);
1000 800149   100     2956834 push @{$self->{+OUT_BUFFER} //= []} => [$out, $out_size];
  800149         3765563  
1001             }
1002             }
1003              
1004 200120         660281 $self->flush();
1005 200120         890344 return $parts;
1006             }
1007              
1008             sub read_message {
1009 700173     700173 1 601925420 my $self = shift;
1010 700173         1145865 my %params = @_;
1011              
1012 700173         1498751 my ($id, $out) = $self->_extract_message(%params);
1013              
1014 700173 100       1284385 return unless defined $id;
1015              
1016 700169 100       2411155 return $out unless $self->{+COMPRESSION};
1017              
1018 29 100       60 if ($params{debug}) {
1019 4         6 my $compressed = $out->{message};
1020 4         8 $out->{message} = $self->_decompress($compressed);
1021 4 100       12 $out->{compressed} = $compressed if $self->{+KEEP_COMPRESSED};
1022 4         8 return $out;
1023             }
1024              
1025 25         28 my $compressed = $out;
1026 25         42 my $decompressed = $self->_decompress($compressed);
1027              
1028 23 100 100     68 return ($decompressed, $compressed) if $self->{+KEEP_COMPRESSED} && wantarray;
1029 21         143 return $decompressed;
1030             }
1031              
1032             sub _extract_message {
1033 700213     700213   760429 my $self = shift;
1034 700213         848805 my %params = @_;
1035              
1036 700213   100     1591232 my $state = $self->{+STATE} //= {};
1037              
1038 700213         773263 while (1) {
1039 2800347 50       4307630 unless ($state->{key}) {
1040 2800347 100       4276423 my $key_bytes = $self->_get_from_buffer($psize) or return;
1041              
1042 2800343         3378848 my %key;
1043 2800343         7778875 @key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
1044 2800343         4230424 $state->{key} = \%key;
1045             }
1046              
1047 2800343         3160455 my $key = $state->{key};
1048              
1049 2800343   50     3967792 my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
1050              
1051 2800343         3352852 my $id = $key->{id};
1052 2800343         2961566 my $tag = join ':' => @{$key}{qw/pid tid/};
  2800343         5747596  
1053 2800343   100     3057716 push @{$state->{parts}->{$tag} //= []} => $id;
  2800343         6577233  
1054 2800343 100       14496480 $state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
1055              
1056 2800343         3567623 delete $state->{key};
1057              
1058 2800343 100       4141958 unless ($id == 0) {
1059 2100160 100       2904889 return ($id, undef) if $params{one_part_only};
1060 2100134         4150696 next;
1061             }
1062              
1063 700183         1737014 my $message = delete $state->{buffers}->{$tag};
1064 700183         1052549 my $parts = delete $state->{parts}->{$tag};
1065              
1066 700183 100       4020471 return ($id, $message) unless $params{debug};
1067              
1068             return (
1069             $id,
1070             {
1071             message => $message,
1072             parts => $parts,
1073             pid => $key->{pid},
1074             tid => $key->{tid},
1075             },
1076 13         408 );
1077             }
1078             }
1079              
1080             1;
1081              
1082             __END__