File Coverage

blib/lib/Net/IMP/Cascade.pm
Criterion Covered Total %
statement 290 426 68.0
branch 150 306 49.0
condition 63 99 63.6
subroutine 24 29 82.7
pod 4 4 100.0
total 531 864 61.4


line stmt bran cond sub pod time code
1 4     4   2927 use strict;
  4         17  
  4         115  
2 4     4   20 use warnings;
  4         7  
  4         148  
3 4     4   2076 no if $] >= 5.017011, warnings => 'experimental::smartmatch';
  4         42  
  4         27  
4              
5             ############################################################################
6             # BEWARE! complex stuff!
7             # to aid with debugging problems:
8             # - switch on debug mode
9             # - to see whats going on in direction dir part p:
10             # grep for '[dir][p]'
11             # - to see whats transfering out of direction dir part p into next part/up:
12             # grep for '[dir][p>'
13             #
14             # basic design
15             # - we do not have lots of member variables, instead we put everything into
16             # new_analyzer as normal variables and declare various $sub = sub ... which
17             # use these variables. Thus the variables get bound once to the sub and we
18             # don't need to access it with $self->{field}... or so all the time
19             # - subs and data structures are described in new_analyzer, the most important
20             # are
21             # - $global_in - this is the sub data method
22             # - $part_in - called from global_in, itself and callbacks to put data
23             # into a specific part. Feeds the data in the associated analyzer
24             # - $imp_callback - callback for the analyzer of a specific part
25              
26             ############################################################################
27              
28             package Net::IMP::Cascade;
29 4     4   409 use base 'Net::IMP::Base';
  4         8  
  4         1590  
30             use fields (
31 4         22 'parts', # analyzer objects
32             # we do everything with closures inside new_analyzer here, so that the
33             # object has only fields for accessing some closures from subs
34             'dataf', # called from sub data
35             'closef', # called from DESTROY
36 4     4   37 );
  4         8  
37              
38 4     4   296 use Net::IMP; # constants
  4         8  
  4         383  
39 4     4   26 use Carp 'croak';
  4         7  
  4         198  
40 4     4   26 use Scalar::Util 'weaken';
  4         7  
  4         221  
41 4     4   1960 use Hash::Util qw(lock_ref_keys);
  4         8919  
  4         29  
42 4     4   368 use Net::IMP::Debug;
  4         9  
  4         29  
43 4     4   2141 use Data::Dumper;
  4         21378  
  4         22175  
44              
45             my %rtypes_implemented_myself = map { $_ => 1 } (
46             IMP_PASS,
47             IMP_PREPASS,
48             IMP_REPLACE,
49             IMP_REPLACE_LATER,
50             IMP_DENY,
51             IMP_DROP,
52             #IMP_TOSENDER, # not supported yet
53             IMP_LOG,
54             IMP_ACCTFIELD,
55             IMP_FATAL,
56             );
57              
58             sub get_interface {
59 0     0 1 0 my Net::IMP::Cascade $factory = shift;
60 0         0 my $parts = $factory->{factory_args}{parts};
61              
62             # collect interfaces by part
63 0         0 my @if4part;
64 0         0 for my $p ( @$parts ) {
65 0         0 my @if;
66 0         0 for my $if ( $p->get_interface(@_)) {
67             # $if should require only return types I support
68             push @if,$if
69 0 0       0 if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
  0         0  
  0         0  
70             }
71 0 0       0 @if or return; # nothing in common
72 0         0 push @if4part,\@if
73             }
74              
75             # find interfaces which are supported by all parts
76 0         0 my @common;
77 0         0 for( my $i=0;$i<@if4part;$i++ ) {
78 0         0 for my $if_i ( @{ $if4part[$i] } ) {
  0         0  
79 0         0 my ($in_i,$out_i) = @$if_i;
80             # check if $if_i matches at least on interface description in
81             # all other parts, e.g. $if_i is same or included in $if_k
82             # - data type/proto: $in_k should be undef or same as $in_i
83             # - return types: $out_i should include $out_k
84 0         0 for( my $k=0;$k<@if4part;$k++ ) {
85 0 0       0 next if $i == $k; # same
86 0         0 for my $if_k ( @{ $if4part[$k] } ) {
  0         0  
87 0         0 my ($in_k,$out_k) = @$if_k;
88             # should be same data type or $in_k undef
89 0 0 0     0 next if $in_k and ( ! $in_i or $in_k != $in_i );
      0        
90             # $out_i should include all of $out_k
91 0         0 my %out_k = map { $_ => 1 } @$out_k;
  0         0  
92 0         0 delete @out_k{ @$out_i };
93 0 0       0 next if %out_k; # some in k are not in i
94              
95             # junction if i and k
96 0         0 push @common,[ $in_k,$out_i ];
97             }
98             }
99             }
100             }
101              
102             # remove duplicates from match
103 0         0 my (@uniq,%m);
104 0         0 for( @common ) {
105 0   0     0 my $key = ( $_->[0] // '' )."\0".join("\0",sort @{$_->[1]});
  0         0  
106 0 0       0 push @uniq,$_ if ! $m{$key}++;
107             }
108 0         0 return @uniq;
109             }
110              
111             sub set_interface {
112 0     0 1 0 my Net::IMP::Cascade $factory = shift;
113 0         0 my @if = @_;
114 0         0 my $parts = $factory->{factory_args}{parts};
115              
116 0         0 my @new_parts;
117 0         0 for(my $i=0;$i<@$parts;$i++) {
118 0 0       0 my $np = $parts->[$i]->set_interface(@if)
119             or return; # cannot use interface
120 0 0       0 $np == $parts->[$i] and next; # no change of part
121 0         0 $new_parts[$i] = $np; # got new factory for part
122             }
123              
124 0 0       0 return $factory if ! @new_parts; # interface supported by original factory
125              
126             # some parts changed, create new factory for this cascade
127 0         0 for(my $i=0;$i<@$parts;$i++) {
128 0   0     0 $new_parts[$i] ||= $parts->[$i]; # copy parts which did not change
129             }
130              
131 0         0 return ref($factory)->new_factory( parts => \@new_parts );
132             }
133              
134             sub new_analyzer {
135 11     11 1 79 my ($factory,%args) = @_;
136              
137 11         34 my $p = $factory->{factory_args}{parts};
138 11         65 my $self = $factory->SUPER::new_analyzer(%args);
139 11         28 my @imp = map { $_->new_analyzer(%args) } @$p;
  21         76  
140              
141             # $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
142             # if part is optimized away due to IMP_PASS with IMP_MAXOFFSET
143             # $parts[$dir][$pi] contains instead an integer for adjustments
144             # from this part
145 11         38 my @parts;
146              
147             # pause/continue handling
148             # maintains pause status per part
149             my @pause;
150              
151             # to make sure we don't leak due to cross-references
152 11         50 weaken( my $wself = $self );
153              
154             my $new_buf = sub {
155 160     160   1192 lock_ref_keys( my $buf = {
156             start => 0, # start of buf relativ to part
157             end => 0, # end of buf relativ to part
158             data => '', # data or undef for replace_later
159             dtype => 0, # data type
160             rtype => IMP_PASS, # IMP_PASS < IMP_PREPASS < IMP_REPLACE
161             gap => 0, # size of gap before buf?
162             gstart => 0, # start of buf relativ to cascade
163             gend => 0, # end of buf relativ to cascade
164             eof => 0 # flag if last buf in this direction
165             });
166 160 100       2549 %$buf = ( %$buf, @_ ) if @_;
167 160         613 return $buf;
168 11         75 };
169              
170             my $new_part = sub {
171 42     42   89 lock_ref_keys( my $p = {
172             ibuf => [ &$new_buf ], # buffers, at least one
173             pass => 0, # can pass up to ..
174             prepass => 0, # can prepass up to ..
175             replace_later => 0, # will replace_later up to ..
176             adjust => 0, # total local adjustments from forwarded bufs
177             });
178 42         410 return $p;
179 11         55 };
180              
181             # initialize @parts
182 11         49 for( my $i=0;$i<@imp;$i++ ) {
183 21         45 $parts[0][$i] = $new_part->(); # client -> server, flow 0>1>2>..
184 21         48 $parts[1][$#imp-$i] = $new_part->(); # server -> client, flow 9>8>7>..
185             }
186              
187             my $dump_bufs = sub {
188 0     0   0 my $bufs = shift;
189 0         0 my @out;
190 0 0       0 for my $i (@_ ? @_: 0..$#$bufs) {
191 0         0 my $buf = $bufs->[$i];
192 0 0       0 my $str = ! defined( $buf->{data} ) ? '' : do {
193 0         0 local $_ = $buf->{data};
194 0 0       0 $_ = substr($_,0,27).'...' if length($_)>30;
195 0         0 s{([\\\n\r\t[:^print:]])}{ sprintf("\\%03o",ord($1)) }esg;
  0         0  
196 0         0 $_
197             };
198             push @out, sprintf("#%02d %d..%d%s%s%s %s %s [%d,%d] '%s'",
199             $i,
200             $buf->{start},$buf->{end}, $buf->{eof} ? '$':'',
201             $buf->{gap} ? " +$buf->{gap}":"",
202             defined($buf->{data}) ? '':' RL',
203             $buf->{dtype},$buf->{rtype},
204             $buf->{gstart},$buf->{gend},
205 0 0       0 $str
    0          
    0          
206             );
207             }
208 0         0 return join("\n",@out);
209 11         85 };
210             my $dump_parts = sub {
211 0     0   0 my $dir = shift;
212 0         0 my $out = '';
213 0 0       0 for my $pi (@_ ? @_ : 0..$#imp) {
214 0         0 my $part = $parts[$dir][$pi];
215 0 0       0 if ( ! $part ) {
216 0         0 $out .= "part[$dir][$pi] - skip\n";
217 0         0 next;
218             }
219             $out .= sprintf("part[%d][%d] p|pp|rl=%d|%d|%d ibuf:\n",
220 0         0 $dir,$pi,$part->{pass},$part->{prepass},$part->{replace_later});
221 0         0 my $ib = $part->{ibuf};
222 0         0 $out .= $dump_bufs->( $part->{ibuf});
223             }
224 0         0 return $out;
225 11         87 };
226              
227             my $split_buf = sub {
228 58     58   118 my ($ibuf,$i,$fwd) = @_;
229 58         106 my $buf = $ibuf->[$i];
230 58 50       125 die "no split for packet types" if $buf->{dtype}>0;
231              
232             my $buf_before = $new_buf->(
233             %$buf,
234             eof => 0,
235             end => $buf->{start} + $fwd, # adjust end
236             defined($buf->{data})
237 58 50       374 ? ( data => substr($buf->{data},0,$fwd,'') ) # real data
238             : (), # replacement promise
239             );
240             # gap in buf_before
241 58         131 $buf->{gap} = 0;
242 58         99 $buf->{start} = $buf_before->{end};
243              
244             # if buf was not changed gend..gstart should reflect the
245             # original length of the data
246 58 100       125 if ( $buf->{rtype} != IMP_REPLACE ) {
247 54         102 $buf_before->{gend} = ( $buf->{gstart} += $fwd );
248             } else {
249             # split gstart..gend into full|0 per convention
250 4         9 $buf->{gstart} = $buf->{gend};
251             }
252              
253             # put buf_before before buf in ibuf
254 58         156 splice(@$ibuf,$i,0,$buf_before);
255 11         70 };
256              
257 11         92 my $fwd_collect; # collect bufs which can be forwarded
258             my $fwd_up; # collect what can be passed up
259 11         0 my $exec_fwd; # do the collected forwarding to next part or up
260              
261 11         0 my $global_in; # function where data gets fed into from outside (sub data)
262 11         0 my $part_in; # internal feed into each part
263              
264 11         0 my $imp_callback; # synchronization wrapper around callback for analyzers
265 11         0 my $_imp_callback; # real callback for the analyzers
266              
267             # pass passable bufs in part starting with ibuf[i]
268             # returns all bufs which can be passed and strips them from part.ibuf
269             $fwd_collect = sub {
270 148     148   362 my ($dir,$pi,$i,$r_passed) = @_;
271 148         227 my $part = $parts[$dir][$pi];
272 148         209 my $ibuf = $part->{ibuf};
273 148 50       271 $DEBUG && debug(
274             "fwd_collect[$dir][$pi]: p=$part->{pass} pp=$part->{prepass} "
275             .$dump_bufs->($ibuf));
276 148         190 my @fwd;
277 148         241 for my $pp (qw(pass prepass)) {
278 296 100       663 my $pass = $part->{$pp} or next;
279 71         179 for( ;$i<@$ibuf;$i++ ) {
280 174         305 my $buf = $ibuf->[$i];
281 174 50       333 last if ! $buf->{dtype}; # dummy buf
282 174 100 100     520 if ( $pass != IMP_MAXOFFSET and $buf->{start} >= $pass ) {
283 43 50       83 $DEBUG && debug(
284             "fwd_collect[$dir][$pi]: reset $pp due to start[$i]($buf->{start})>=$pp($pass)");
285 43         385 $part->{$pp} = 0;
286 43         87 last;
287             }
288             die "cannot pass bufs with replace_later"
289 131 50       241 if ! defined $buf->{data};
290 131 100 100     354 if ( $pass == IMP_MAXOFFSET or $buf->{end} <= $pass ) {
291             # whole buffer can be passed
292 95 50       167 $DEBUG && debug(
293             "fwd_collect[$dir][$pi]: pass whole buffer[$i] $buf->{start}..$buf->{end}");
294             $buf->{rtype} = IMP_PREPASS if $pp eq 'prepass'
295 95 100 100     208 and $buf->{rtype} == IMP_PASS;
296 95         187 push @fwd,[ $pi,$dir,$buf ];
297              
298             # r_passed is set from part_in to track position if data
299             # are passed. In case of prepass we don't pass data but
300             # only put them into fwd
301 95 100 66     206 next if $r_passed && $pp eq 'prepass';
302              
303             # track what got passed for part_in
304 91 50       157 $$r_passed = $buf->{end} if $r_passed;
305              
306             # remove passed data from ibuf, if ! r_passed also prepassed
307             # data (called from imp_callback)
308 91         129 shift(@$ibuf);
309 91         128 $i--;
310              
311 91 100       236 if ( ! @$ibuf ) {
312 24 100 66     78 if ( $part->{pass} == IMP_MAXOFFSET || $buf->{eof} ) {
313             # part done, skip it in the future
314 22         65 push @fwd,[$pi,$dir,undef]; # buf = undef is special
315             }
316             # insert dummy
317             @$ibuf = $new_buf->(
318             start => $buf->{end},
319             end => $buf->{end},
320             gstart => $buf->{gend},
321             gend => $buf->{gend},
322             # keep type for streaming data
323 24 50       98 $buf->{dtype} < 0 ? ( dtype => $buf->{dtype} ):(),
324             );
325 24         51 last;
326             }
327              
328             } else {
329             # only part of buffer can be passed
330             # split buffer and re-enter loop, this will foreward the
331             # first part and keep the later part
332 36 50       69 $DEBUG && debug(
333             "fwd_collect[$dir][$pi]: need to split buffer[$i]: $buf->{start}..$pass..$buf->{end}");
334 36         106 $split_buf->($ibuf,$i,$pass - $buf->{start});
335 36         69 redo; # don't increase $i!
336             }
337             }
338             }
339 148         430 return @fwd;
340 11         156 };
341              
342             $fwd_up = sub {
343 71     71   134 my ($dir,$buf) = @_;
344 71 100 66     228 if ( $buf->{gstart} == $buf->{gend} && ! $buf->{gap}
      100        
345             && $buf->{rtype} ~~ [ IMP_PASS, IMP_PREPASS ]) {
346             # don't repeat last (pre)pass because of empty buffer
347 3         12 return;
348             }
349              
350             return [
351             $buf->{rtype},
352             $dir,
353             $buf->{gend},
354 68 100       275 ($buf->{rtype} == IMP_REPLACE) ? ( $buf->{data} ):()
355             ];
356 11         85 };
357              
358             $exec_fwd = sub {
359 70     70   140 my @fwd = @_;
360 70 100       222 if (@fwd>1) {
361             $DEBUG && debug("trying to merge\n".join("\n", map {
362 50 0       99 ! defined $_->[0]
  0 0       0  
    50          
363             ? ""
364             : "fwd[$_->[1]][$_->[0]] " .
365             ( $_->[2] ? $dump_bufs->([$_->[2]]) : '')
366             } @fwd));
367             # try to compress
368 50         77 my ($lpi,$ldir,$lbuf);
369 50         121 for( my $i=0;$i<@fwd;$i++ ) {
370 146 100 66     521 if ( ! defined $fwd[$i][0] || ! defined $fwd[$i][2]) {
371 22         34 $lpi = undef;
372 22         55 next;
373             }
374 124 50 66     439 if ( ! defined $lpi
      66        
375             or $lpi != $fwd[$i][0]
376             or $ldir != $fwd[$i][1] ) {
377 50         72 ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
  50         109  
378 50         121 next;
379             }
380              
381 74         107 my $buf = $fwd[$i][2];
382              
383 74 100 33     484 if ( not $buf->{gap}
      66        
      100        
      66        
384             and $buf->{dtype} < 0
385             and $buf->{start} == $lbuf->{end}
386             and $buf->{rtype} == $lbuf->{rtype}
387             and $buf->{dtype} == $lbuf->{dtype}
388             ) {
389 28 100       69 if ( $buf->{rtype} == IMP_REPLACE ) {
390 17 100 100     95 if ( $lbuf->{gend} == $buf->{gend} ) {
    100          
391             # same global end, merge data
392 2         6 $lbuf->{data} .= $buf->{data};
393             } elsif ( $buf->{data} ne '' or $lbuf->{data} ne '' ) {
394             # either one not empty, no merge
395 11         27 next;
396             }
397             } else {
398             # unchanged, append
399 11         26 $lbuf->{data} .= $buf->{data};
400             }
401 17 50       47 $DEBUG && debug("merge bufs ".$dump_bufs->([$lbuf,$buf]));
402 17         45 $lbuf->{gend} = $buf->{gend};
403 17         31 $lbuf->{end} = $buf->{end};
404 17         36 $lbuf->{eof} = $buf->{eof};
405 17         50 splice(@fwd,$i,1,());
406 17         25 $i--;
407 17         38 next;
408              
409             } else {
410 46         73 ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
  46         91  
411 46         101 next;
412             }
413             }
414             }
415 70         178 while ( my $fwd = shift(@fwd)) {
416 171         570 my $npi = my $pi = shift(@$fwd);
417 171 50       313 if ( ! defined $npi ) {
418             # propagate prepared IMP callback
419 0         0 $wself->run_callback($fwd);
420 0         0 next;
421             }
422              
423 171         279 my ($dir,$buf) = @$fwd;
424              
425 171 100       284 if ( $buf ) {
426 127         156 my $np;
427 127         178 my $adjust = 0;
428 127         157 while (1) {
429 127 100       210 $npi += $dir?-1:+1;
430 127 100 100     391 last if $npi<0 or $npi>=@imp;
431 58 50       173 last if ref( $np = $parts[$dir][$npi] );
432 0         0 $adjust += $np;
433 0 0       0 $DEBUG && debug("skipping pi=$npi");
434             }
435              
436 127 100       291 if ( $buf->{eof} ) {
437             # add pass infinite to fwd to propagate eof
438 22         124 push @fwd,[ $pi,$dir,undef ];
439             }
440 127 100       217 if ( $np ) {
441             # feed into next part
442 58         169 my $nib = $np->{ibuf};
443             # adjust start,end based on end of npi and gap
444 58         145 $buf->{start} = $nib->[-1]{end} + $buf->{gap} + $adjust;
445 58         108 $buf->{end} = $buf->{start} + length($buf->{data});
446 58 50       103 $DEBUG && debug(
447             "fwd_next[$dir][$pi>$npi] ".$dump_bufs->([$buf]));
448 58         114 $part_in->($npi,$dir,$buf);
449             } else {
450             # output from cascade
451 69 100       141 my $cb = $fwd_up->($dir,$buf) or next;
452 68 50       143 $DEBUG && debug(
453             "fwd_up[$dir][$pi>>] ".$dump_bufs->([$buf]));
454 68         197 $wself->run_callback($cb);
455             }
456              
457             # special - part is done with IMP_PASS IMP_MAXOFFSET
458             } else {
459             # skip if we had a pass infinite already
460 44 100       216 next if ! ref $parts[$dir][$pi];
461              
462 24         74 $parts[$dir][$pi] = $parts[$dir][$pi]->{adjust};
463 24 100       39 if ( grep { ref($_) } @{ $parts[$dir] } ) {
  54         139  
  24         64  
464             # we have other unfinished parts, skip only this part
465 12 50       67 $DEBUG && debug(
466             "part[$dir][$pi>$npi] will be skipped in future, adjust=$parts[$dir][$pi]");
467             } else {
468             # everything can be skipped
469 12 50       35 $DEBUG && debug(
470             "part[$dir][$pi>>] all parts will be skipped in future");
471             # pass rest
472 12         56 $wself->run_callback([ IMP_PASS,$dir,IMP_MAXOFFSET ]);
473             }
474             }
475             }
476 11         173 };
477              
478             # the data function
479             # called from sub data on new data and from $process when data are finished
480             # in on part and should be transferred into the next part
481             # $pi - index into parts
482             # $dir - direction (e.g. target part is $parts[$dir][$pi])
483             # $buf - new buffer from $new_buf->() which might be merged with existing
484             $part_in = sub {
485 87     87   168 my ($pi,$dir,$buf) = @_;
486 87 50       166 $DEBUG && debug( "part_in[$dir][$pi]: ".$dump_bufs->([$buf]));
487              
488 87         140 my $part = $parts[$dir][$pi];
489 87         144 my $ibuf = $part->{ibuf};
490 87         120 my $lbuf = $ibuf->[-1];
491 87         132 my $lend = $lbuf->{end};
492              
493             # some sanity checks
494 87         120 if(1) {
495             die "data after eof [$dir][$pi] ".$dump_bufs->([$lbuf,$buf])
496 87 50       158 if $lbuf->{eof};
497 87 50       231 if ( $buf->{start} != $lend ) {
    50          
498 0 0       0 if ( $buf->{start} < $lend ) {
    0          
499 0         0 die "overlapping data off($buf->{start})
500             } elsif ( ! $buf->{gap} ) {
501 0         0 die "gap should be set because off($buf->{start})>last.end($lend) in part[$dir][$pi]"
502             }
503             } elsif ( $buf->{gap} ) {
504 0         0 die "gap specified even if off($buf->{start}) == last.end"
505             }
506 87 50       181 $part->{pass} == IMP_MAXOFFSET and die
507             "pass infinite should have been optimized by removing part[$dir][$pi]";
508             }
509              
510             # add data to buf
511 87 100 100     547 if ( $lbuf->{data} eq '' and $lbuf->{rtype} == IMP_PASS ) {
    100 66        
      100        
      66        
      66        
      66        
512             # empty dummy buffer
513 26 50       61 $DEBUG && debug("part_in[$dir][$pi]: replace dummy buffer");
514 26 50       78 @$ibuf == 1 or die "empty dummy buffer should only be at beginning";
515 26         56 @$ibuf = $buf;
516              
517             } elsif ( ! $buf->{gap}
518             and $buf->{data} eq ''
519             and $buf->{rtype} == $lbuf->{rtype}
520             and $buf->{dtype} == $lbuf->{dtype}
521             and $buf->{dtype} < 0
522             and ! $buf->{eof}
523             ) {
524             # just update eof,[g]end of lbuf
525 6 50       17 $DEBUG && debug(
526             "part_in[$dir][$pi]: set lbuf end=$buf->{end} gend=$buf->{gend}");
527 6         10 $lbuf->{end} = $buf->{end};
528 6         13 $lbuf->{gend} = $buf->{gend};
529             # nothing to do with these empty data
530 6 50       13 $DEBUG && debug("part_in[$dir][$pi] nothing to do on empty buffer");
531 6         29 return;
532              
533             } else {
534             # add new buf
535 55 50       114 $DEBUG && debug("part_in[$dir][$pi]: add new buffer");
536 55         106 push @$ibuf,$buf;
537             }
538              
539             # determine what can be forwarded immediatly
540 81         203 my @fwd = $fwd_collect->($dir,$pi,$#$ibuf,\$lend);
541              
542 81 100       227 if ( $buf->{eof} ? $lend <= $buf->{end} : $lend < $buf->{end} ) {
    100          
543             # send new data to the analyzer
544 80         145 my $rl = $part->{replace_later};
545 80         500 for(@$ibuf) {
546 173 100       505 next if $_->{start} < $lend;
547             die "last_end should be on buffer boundary"
548 84 50       197 if $_->{start} > $lend;
549 84         127 $lend = $_->{end};
550             $DEBUG && debug(
551             "analyzer[$dir][$pi] << %d bytes %s \@%d%s -> last_end=%d",
552             $_->{end} - $_->{start},
553             $_->{dtype},
554 84 0       140 $_->{start},$_->{gap} ? "(+$_->{gap})":'',
    50          
555             $lend
556             );
557             $imp[$pi]->data($dir,
558             $_->{data},
559             $_->{gap} ? $_->{start}:0,
560             $_->{dtype}
561 84 50       367 );
562             $imp[$pi]->data($dir,'',0, $_->{dtype})
563 84 100 100     328 if $buf->{eof} && $_->{data} ne '';
564 84 50       221 $rl or next;
565 0 0 0     0 if ( $rl == IMP_MAXOFFSET or $rl>= $lend ) {
566 0         0 $buf->{data} = undef;
567             } else {
568 0         0 $rl = $part->{replace_later} = 0; # reset
569             }
570             }
571             } else {
572 1 50       34 $DEBUG && debug(
573             "nothing to analyze[$dir][$pi]: last_end($lend) < end($buf->{end})");
574             }
575              
576             # forward data which can be (pre)passed
577 81 100       354 $exec_fwd->(@fwd) if @fwd;
578 11         179 };
579              
580             $_imp_callback = sub {
581 75     75   119 my $pi = shift;
582              
583 75         105 my @fwd;
584 75         135 for my $rv (@_) {
585 107         176 my $rtype = shift(@$rv);
586              
587 107 50       677 if ( $rtype ~~ [ IMP_FATAL, IMP_DENY, IMP_DROP, IMP_ACCTFIELD ]) {
    50          
    50          
    50          
    100          
    50          
    0          
588 0 0       0 $DEBUG && debug("callback[.][$pi] $rtype @$rv");
589 0         0 $wself->run_callback([ $rtype, @$rv ]);
590              
591             } elsif ( $rtype == IMP_LOG ) {
592 0         0 my ($dir,$offset,$len,$level,$msg,@extmsg) = @$rv;
593 0 0       0 $DEBUG && debug(
594             "callback[$dir][$pi] $rtype '$msg' off=$offset len=$len lvl=$level");
595             # approximate offset to real position
596 0         0 my $newoff = 0;
597 0         0 my $part = $parts[$dir][$pi];
598 0         0 for ( @{$part->{ibuf}} ) {
  0         0  
599 0 0       0 if ( $_->{start} <= $offset ) {
600             $offset = ( $_->{rtype} == IMP_REPLACE )
601             ? $_->{gstart}
602 0 0       0 : $_->{gstart} - $_->{start} + $offset;
603             } else {
604             last
605 0         0 }
606             }
607 0         0 $wself->run_callback([ IMP_LOG,$dir,$offset,$len,$level,$msg,@extmsg ]);
608              
609             } elsif ( $rtype == IMP_PAUSE ) {
610 0         0 my $dir = shift;
611 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype");
612 0 0       0 next if $pause[$pi];
613 0         0 $pause[$dir][$pi] = 1;
614 0 0       0 $wself->run_callback([ IMP_PAUSE ]) if grep { $_ } @pause > 1;
  0         0  
615              
616             } elsif ( $rtype == IMP_CONTINUE ) {
617 0         0 my $dir = shift;
618 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype");
619 0         0 delete $pause[$dir][$pi];
620             $wself->run_callback([ IMP_CONTINUE ])
621 0 0       0 if not grep { $_ } @{$pause[$dir]};
  0         0  
  0         0  
622              
623             } elsif ( $rtype ~~ [ IMP_PASS, IMP_PREPASS ] ) {
624 76         149 my ($dir,$offset) = @$rv;
625 76 50       157 $DEBUG && debug("callback[$dir][$pi] $rtype $offset");
626 76 100       195 ref(my $part = $parts[$dir][$pi]) or next; # part skippable?
627 69 100       173 if ( $rtype == IMP_PASS ) {
628 65 50       152 next if $part->{pass} == IMP_MAXOFFSET; # no change
629 65 100       157 if ( $offset == IMP_MAXOFFSET ) {
    50          
630 22         37 $part->{pass} = IMP_MAXOFFSET;
631 22         39 $part->{prepass} = 0; # pass >= prepass
632             } elsif ( $offset > $part->{pass} ) {
633 43         73 $part->{pass} = $offset;
634 43 50 33     169 if ( $part->{prepass} != IMP_MAXOFFSET
635             and $part->{prepass} <= $offset ) {
636 43         92 $part->{prepass} = 0; # pass >= prepass
637             }
638             } else {
639 0         0 next; # no change
640             }
641             } else { # IMP_PREPASS
642 4 100       12 next if $part->{prepass} == IMP_MAXOFFSET; # no change
643 2 50       4 if ( $offset == IMP_MAXOFFSET ) {
    0          
644 2         4 $part->{prepass} = IMP_MAXOFFSET;
645             } elsif ( $offset > $part->{prepass} ) {
646 0         0 $part->{prepass} = $offset;
647             } else {
648 0         0 next; # no change
649             }
650             }
651              
652             # pass/prepass got updated, so we might pass some more data
653 67         163 push @fwd, $fwd_collect->($dir,$pi,0);
654              
655             } elsif ( $rtype == IMP_REPLACE ) {
656 31         77 my ($dir,$offset,$newdata) = @$rv;
657 31 50       62 $DEBUG && debug(
658             "callback[$dir][$pi] $rtype $dir $offset len=%d",
659             length($newdata));
660 31 50       86 ref(my $part = $parts[$dir][$pi])
661             or die "called replace for passed data";
662 31         118 my $ibuf = $part->{ibuf};
663              
664             # sanity checks
665 31 50       68 die "called replace although pass=IMP_MAXOFFSET" if ! $part;
666 31 50       64 die "no replace with IMP_MAXOFFSET" if $offset == IMP_MAXOFFSET;
667             die "called replace for already passed data"
668 31 50       93 if $ibuf->[0]{start} > $offset;
669              
670 31         72 while (@$ibuf) {
671 71         108 my $buf = $ibuf->[0];
672 71 100       138 if ( $offset >= $buf->{end} ) {
673             # replace complete buffer
674 49 50       103 $DEBUG && debug(
675             "replace complete buf $buf->{start}..$buf->{end}");
676 49 100 66     222 if ( ! defined($buf->{data})
677             or $buf->{data} ne $newdata ) {
678 48         91 $buf->{rtype} = IMP_REPLACE;
679 48         80 $buf->{data} = $newdata;
680             $part->{adjust} +=
681 48         107 length($newdata) - $buf->{end} + $buf->{start};
682 48         73 $newdata = ''; # in the next buffers replace with ''
683             }
684 49         95 push @fwd,[ $pi,$dir,$buf ];
685 49         80 shift(@$ibuf);
686 49 100       116 if ( ! @$ibuf ) {
687             # all bufs eaten
688             die "called replace for future data"
689 5 50       17 if $buf->{end}<$offset;
690             @$ibuf = $new_buf->( %$buf,
691             data => '',
692             start => $buf->{end},
693             end => $buf->{end},
694             gstart => $buf->{gend},
695             # packet types cannot get partial replacement
696             # at end
697 5 50       49 $buf->{dtype} > 0 ? ( dtype => 0 ):()
698             );
699             # remove eof from buf in @fwd because we added
700             # new one
701 5         14 $fwd[-1][2]{eof} = 0;
702 5         16 last;
703             }
704 44 100       159 last if $buf->{end} == $offset;
705             } else {
706             # split buffer and replace first part
707 22 50       57 $DEBUG && debug(
708             "replace - split buf $buf->{start}..$offset..$buf->{end}");
709 22         75 $split_buf->($ibuf,0,$offset-$buf->{start});
710 22         36 redo;
711             }
712             }
713              
714             } elsif ( $rtype == IMP_REPLACE_LATER ) {
715 0         0 my ($dir,$offset) = @$rv;
716 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype $offset");
717 0 0       0 ref(my $part = $parts[$dir][$pi])
718             or die "called replace for passed data";
719 0         0 my $ibuf = $part->{ibuf};
720 0 0       0 $_->{replace_later} == IMP_MAXOFFSET and next; # no change
721              
722             # sanity checks
723 0 0       0 die "called replace_later although pass=IMP_MAX_OFFSET"
724             if ! $part;
725             die "called replace for already passed data" if
726             $offset != IMP_MAXOFFSET and
727 0 0 0     0 $ibuf->[0]{start} > $offset;
728              
729 0 0       0 if ( $offset == IMP_MAXOFFSET ) {
    0          
730 0         0 $_->{replace_later} = IMP_MAXOFFSET;
731             # change all to replace_later
732 0         0 $_->{data} = undef for(@$ibuf);
733 0         0 next;
734             } elsif ( $offset <= $part->{replace_later} ) {
735             # no change
736             } else {
737 0         0 $part->{replace_later} = $offset;
738 0         0 for(@$ibuf) {
739 0 0       0 defined($_->{data}) or next; # already replace_later
740 0 0       0 my $len = length($_->{data}) or last; # dummy buffer
741 0 0       0 if ( $_->{start} + $len <= $offset ) {
742 0         0 $_->{data} = undef;
743             } else {
744 0         0 $part->{replace_later} = 0;
745 0         0 last;
746             }
747             }
748             }
749             } else {
750 0 0       0 $DEBUG && debug("callback[.][$pi] $rtype @$rv");
751 0         0 die "don't know how to handle rtype $rtype";
752             }
753             }
754              
755             # pass to next part/output
756 75 100       223 $exec_fwd->(@fwd) if @fwd;
757 11         234 };
758              
759             # While we are in $part_in function we will only spool callbacks and process
760             # them at the end. Otherwise $dataf might cause call of callback which then
761             # causes call of dataf etc - which makes debugging a nightmare.
762              
763 11         33 my $collect_callbacks;
764             $global_in = sub {
765 31     31   73 my ($dir,$data,$offset,$dtype) = @_;
766              
767 31   100     212 my %buf = (
768             data => $data,
769             dtype => $dtype // IMP_DATA_STREAM,
770             rtype => IMP_PASS,
771             eof => $data eq '',
772             );
773              
774 31         55 my $adjust = 0;
775 31 100       68 my $pi = $dir ? $#imp:0; # enter into first or last part
776 31         54 my $np;
777 31         49 while (1) {
778 34 100       100 ref( $np = $parts[$dir][$pi] ) and last;
779 5         7 $adjust += $np;
780 5 50       8 $pi += $dir?-1:1;
781 5 100 66     31 if ( $pi<0 or $pi>$#imp ) {
782 2 50       4 $DEBUG && debug("all skipped");
783 2 50       8 if ( my $cb = $fwd_up->($dir,$new_buf->(%buf))) {
784 0         0 $self->run_callback($cb);
785             }
786 2         7 return;
787             }
788             }
789              
790 29 50       81 return if ! ref $np; # got IMP_PASS IMP_MAXOFFSET for all
791              
792 29         64 my $ibuf_end = $np->{ibuf}[-1]{gend};
793 29 50       83 if ( ! $offset ) {
    0          
    0          
794             # no gap between data
795 29         67 $buf{gstart} = $ibuf_end;
796             } elsif ( $offset < $ibuf_end ) {
797 0         0 die "overlapping data";
798             } elsif ( $offset > $ibuf_end ) {
799             # gap between data
800 0         0 $buf{gstart} = $offset;
801 0         0 $buf{gap} = $offset - $ibuf_end;
802             } else {
803             # there was no need for giving offset
804 0         0 $buf{gstart} = $ibuf_end;
805             }
806 29         72 $buf{gend} = $buf{gstart} + length($data);
807 29         64 $buf{start} = $buf{gstart} + $adjust;
808 29         63 $buf{end} = $buf{gend} + $adjust;
809              
810 29   50     148 $collect_callbacks ||= [];
811 29         129 $part_in->( $pi,$dir, $new_buf->(%buf));
812              
813 29         96 while ( my $cb = shift(@$collect_callbacks)) {
814 75         304 $_imp_callback->(@$cb);
815             }
816             $collect_callbacks = undef
817 11         117 };
  29         317  
818              
819             # wrapper which spools callbacks if within dataf
820             $imp_callback = sub {
821 75 50   75   146 if ( $collect_callbacks ) {
822             # only spool and execute later
823 75         160 push @$collect_callbacks, [ @_ ];
824 75         208 return;
825             }
826 0         0 return $_imp_callback->(@_)
827 11         75 };
828              
829             # setup callbacks
830 11         128 $imp[$_]->set_callback( $imp_callback,$_ ) for (0..$#imp);
831              
832             # make some closures available within methods
833 11         30 $self->{dataf} = $global_in;
834             $self->{closef} = sub {
835 0     0   0 $global_in = $part_in = $imp_callback = $_imp_callback = undef;
836 0         0 @parts = ();
837 11         78 };
838 11         275 return $self;
839             }
840              
841             sub data {
842 31     31 1 88 my $self = shift;
843 31         79 $self->{dataf}(@_);
844             }
845              
846             sub DESTROY {
847 11     11   4928 my $closef = shift->{closef};
848 11 50       94 $closef->() if $closef;
849             }
850              
851              
852             1;
853              
854             __END__