File Coverage

blib/lib/Net/IMP/Cascade.pm
Criterion Covered Total %
statement 290 426 68.0
branch 150 306 49.0
condition 63 99 63.6
subroutine 24 29 82.7
pod 4 4 100.0
total 531 864 61.4


line stmt bran cond sub pod time code
1 4     4   2183 use strict;
  4         9  
  4         92  
2 4     4   16 use warnings;
  4         7  
  4         113  
3 4     4   1482 no if $] >= 5.017011, warnings => 'experimental::smartmatch';
  4         33  
  4         20  
4              
5             ############################################################################
6             # BEWARE! complex stuff!
7             # to aid with debugging problems:
8             # - switch on debug mode
9             # - to see whats going on in direction dir part p:
10             # grep for '[dir][p]'
11             # - to see whats transfering out of direction dir part p into next part/up:
12             # grep for '[dir][p>'
13             #
14             # basic design
15             # - we do not have lots of member variables, instead we put everything into
16             # new_analyzer as normal variables and declare various $sub = sub ... which
17             # use these variables. Thus the variables get bound once to the sub and we
18             # don't need to access it with $self->{field}... or so all the time
19             # - subs and data structures are described in new_analyzer, the most important
20             # are
21             # - $global_in - this is the sub data method
22             # - $part_in - called from global_in, itself and callbacks to put data
23             # into a specific part. Feeds the data in the associated analyzer
24             # - $imp_callback - callback for the analyzer of a specific part
25              
26             ############################################################################
27              
28             package Net::IMP::Cascade;
29 4     4   344 use base 'Net::IMP::Base';
  4         11  
  4         1201  
30             use fields (
31 4         19 'parts', # analyzer objects
32             # we do everything with closures inside new_analyzer here, so that the
33             # object has only fields for accessing some closures from subs
34             'dataf', # called from sub data
35             'closef', # called from DESTROY
36 4     4   25 );
  4         8  
37              
38 4     4   238 use Net::IMP; # constants
  4         8  
  4         274  
39 4     4   20 use Carp 'croak';
  4         7  
  4         163  
40 4     4   22 use Scalar::Util 'weaken';
  4         8  
  4         152  
41 4     4   1425 use Hash::Util qw(lock_ref_keys);
  4         6132  
  4         24  
42 4     4   292 use Net::IMP::Debug;
  4         9  
  4         26  
43 4     4   1655 use Data::Dumper;
  4         23782  
  4         15866  
44              
45             my %rtypes_implemented_myself = map { $_ => 1 } (
46             IMP_PASS,
47             IMP_PREPASS,
48             IMP_REPLACE,
49             IMP_REPLACE_LATER,
50             IMP_DENY,
51             IMP_DROP,
52             #IMP_TOSENDER, # not supported yet
53             IMP_LOG,
54             IMP_ACCTFIELD,
55             IMP_FATAL,
56             );
57              
58             sub get_interface {
59 0     0 1 0 my Net::IMP::Cascade $factory = shift;
60 0         0 my $parts = $factory->{factory_args}{parts};
61              
62             # collect interfaces by part
63 0         0 my @if4part;
64 0         0 for my $p ( @$parts ) {
65 0         0 my @if;
66 0         0 for my $if ( $p->get_interface(@_)) {
67             # $if should require only return types I support
68             push @if,$if
69 0 0       0 if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
  0         0  
  0         0  
70             }
71 0 0       0 @if or return; # nothing in common
72 0         0 push @if4part,\@if
73             }
74              
75             # find interfaces which are supported by all parts
76 0         0 my @common;
77 0         0 for( my $i=0;$i<@if4part;$i++ ) {
78 0         0 for my $if_i ( @{ $if4part[$i] } ) {
  0         0  
79 0         0 my ($in_i,$out_i) = @$if_i;
80             # check if $if_i matches at least on interface description in
81             # all other parts, e.g. $if_i is same or included in $if_k
82             # - data type/proto: $in_k should be undef or same as $in_i
83             # - return types: $out_i should include $out_k
84 0         0 for( my $k=0;$k<@if4part;$k++ ) {
85 0 0       0 next if $i == $k; # same
86 0         0 for my $if_k ( @{ $if4part[$k] } ) {
  0         0  
87 0         0 my ($in_k,$out_k) = @$if_k;
88             # should be same data type or $in_k undef
89 0 0 0     0 next if $in_k and ( ! $in_i or $in_k != $in_i );
      0        
90             # $out_i should include all of $out_k
91 0         0 my %out_k = map { $_ => 1 } @$out_k;
  0         0  
92 0         0 delete @out_k{ @$out_i };
93 0 0       0 next if %out_k; # some in k are not in i
94              
95             # junction if i and k
96 0         0 push @common,[ $in_k,$out_i ];
97             }
98             }
99             }
100             }
101              
102             # remove duplicates from match
103 0         0 my (@uniq,%m);
104 0         0 for( @common ) {
105 0   0     0 my $key = ( $_->[0] // '' )."\0".join("\0",sort @{$_->[1]});
  0         0  
106 0 0       0 push @uniq,$_ if ! $m{$key}++;
107             }
108 0         0 return @uniq;
109             }
110              
111             sub set_interface {
112 0     0 1 0 my Net::IMP::Cascade $factory = shift;
113 0         0 my @if = @_;
114 0         0 my $parts = $factory->{factory_args}{parts};
115              
116 0         0 my @new_parts;
117 0         0 for(my $i=0;$i<@$parts;$i++) {
118 0 0       0 my $np = $parts->[$i]->set_interface(@if)
119             or return; # cannot use interface
120 0 0       0 $np == $parts->[$i] and next; # no change of part
121 0         0 $new_parts[$i] = $np; # got new factory for part
122             }
123              
124 0 0       0 return $factory if ! @new_parts; # interface supported by original factory
125              
126             # some parts changed, create new factory for this cascade
127 0         0 for(my $i=0;$i<@$parts;$i++) {
128 0   0     0 $new_parts[$i] ||= $parts->[$i]; # copy parts which did not change
129             }
130              
131 0         0 return ref($factory)->new_factory( parts => \@new_parts );
132             }
133              
134             sub new_analyzer {
135 11     11 1 67 my ($factory,%args) = @_;
136              
137 11         24 my $p = $factory->{factory_args}{parts};
138 11         40 my $self = $factory->SUPER::new_analyzer(%args);
139 11         26 my @imp = map { $_->new_analyzer(%args) } @$p;
  21         71  
140              
141             # $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
142             # if part is optimized away due to IMP_PASS with IMP_MAXOFFSET
143             # $parts[$dir][$pi] contains instead an integer for adjustments
144             # from this part
145 11         34 my @parts;
146              
147             # pause/continue handling
148             # maintains pause status per part
149             my @pause;
150              
151             # to make sure we don't leak due to cross-references
152 11         47 weaken( my $wself = $self );
153              
154             my $new_buf = sub {
155 160     160   934 lock_ref_keys( my $buf = {
156             start => 0, # start of buf relativ to part
157             end => 0, # end of buf relativ to part
158             data => '', # data or undef for replace_later
159             dtype => 0, # data type
160             rtype => IMP_PASS, # IMP_PASS < IMP_PREPASS < IMP_REPLACE
161             gap => 0, # size of gap before buf?
162             gstart => 0, # start of buf relativ to cascade
163             gend => 0, # end of buf relativ to cascade
164             eof => 0 # flag if last buf in this direction
165             });
166 160 100       2187 %$buf = ( %$buf, @_ ) if @_;
167 160         611 return $buf;
168 11         44 };
169              
170             my $new_part = sub {
171 42     42   85 lock_ref_keys( my $p = {
172             ibuf => [ &$new_buf ], # buffers, at least one
173             pass => 0, # can pass up to ..
174             prepass => 0, # can prepass up to ..
175             replace_later => 0, # will replace_later up to ..
176             adjust => 0, # total local adjustments from forwarded bufs
177             });
178 42         355 return $p;
179 11         28 };
180              
181             # initialize @parts
182 11         38 for( my $i=0;$i<@imp;$i++ ) {
183 21         40 $parts[0][$i] = $new_part->(); # client -> server, flow 0>1>2>..
184 21         41 $parts[1][$#imp-$i] = $new_part->(); # server -> client, flow 9>8>7>..
185             }
186              
187             my $dump_bufs = sub {
188 0     0   0 my $bufs = shift;
189 0         0 my @out;
190 0 0       0 for my $i (@_ ? @_: 0..$#$bufs) {
191 0         0 my $buf = $bufs->[$i];
192 0 0       0 my $str = ! defined( $buf->{data} ) ? '' : do {
193 0         0 local $_ = $buf->{data};
194 0 0       0 $_ = substr($_,0,27).'...' if length($_)>30;
195 0         0 s{([\\\n\r\t[^:print:]])}{ sprintf("\\%03o",ord($1)) }esg;
  0         0  
196 0         0 $_
197             };
198             push @out, sprintf("#%02d %d..%d%s%s%s %s %s [%d,%d] '%s'",
199             $i,
200             $buf->{start},$buf->{end}, $buf->{eof} ? '$':'',
201             $buf->{gap} ? " +$buf->{gap}":"",
202             defined($buf->{data}) ? '':' RL',
203             $buf->{dtype},$buf->{rtype},
204             $buf->{gstart},$buf->{gend},
205 0 0       0 $str
    0          
    0          
206             );
207             }
208 0         0 return join("\n",@out);
209 11         49 };
210             my $dump_parts = sub {
211 0     0   0 my $dir = shift;
212 0         0 my $out = '';
213 0 0       0 for my $pi (@_ ? @_ : 0..$#imp) {
214 0         0 my $part = $parts[$dir][$pi];
215 0 0       0 if ( ! $part ) {
216 0         0 $out .= "part[$dir][$pi] - skip\n";
217 0         0 next;
218             }
219             $out .= sprintf("part[%d][%d] p|pp|rl=%d|%d|%d ibuf:\n",
220 0         0 $dir,$pi,$part->{pass},$part->{prepass},$part->{replace_later});
221 0         0 my $ib = $part->{ibuf};
222 0         0 $out .= $dump_bufs->( $part->{ibuf});
223             }
224 0         0 return $out;
225 11         36 };
226              
227             my $split_buf = sub {
228 58     58   108 my ($ibuf,$i,$fwd) = @_;
229 58         117 my $buf = $ibuf->[$i];
230 58 50       145 die "no split for packet types" if $buf->{dtype}>0;
231              
232             my $buf_before = $new_buf->(
233             %$buf,
234             eof => 0,
235             end => $buf->{start} + $fwd, # adjust end
236             defined($buf->{data})
237 58 50       335 ? ( data => substr($buf->{data},0,$fwd,'') ) # real data
238             : (), # replacement promise
239             );
240             # gap in buf_before
241 58         125 $buf->{gap} = 0;
242 58         95 $buf->{start} = $buf_before->{end};
243              
244             # if buf was not changed gend..gstart should reflect the
245             # original length of the data
246 58 100       122 if ( $buf->{rtype} != IMP_REPLACE ) {
247 54         95 $buf_before->{gend} = ( $buf->{gstart} += $fwd );
248             } else {
249             # split gstart..gend into full|0 per convention
250 4         10 $buf->{gstart} = $buf->{gend};
251             }
252              
253             # put buf_before before buf in ibuf
254 58         139 splice(@$ibuf,$i,0,$buf_before);
255 11         48 };
256              
257 11         73 my $fwd_collect; # collect bufs which can be forwarded
258             my $fwd_up; # collect what can be passed up
259 11         0 my $exec_fwd; # do the collected forwarding to next part or up
260              
261 11         0 my $global_in; # function where data gets fed into from outside (sub data)
262 11         0 my $part_in; # internal feed into each part
263              
264 11         0 my $imp_callback; # synchronization wrapper around callback for analyzers
265 11         0 my $_imp_callback; # real callback for the analyzers
266              
267             # pass passable bufs in part starting with ibuf[i]
268             # returns all bufs which can be passed and strips them from part.ibuf
269             $fwd_collect = sub {
270 148     148   314 my ($dir,$pi,$i,$r_passed) = @_;
271 148         242 my $part = $parts[$dir][$pi];
272 148         220 my $ibuf = $part->{ibuf};
273 148 50       303 $DEBUG && debug(
274             "fwd_collect[$dir][$pi]: p=$part->{pass} pp=$part->{prepass} "
275             .$dump_bufs->($ibuf));
276 148         208 my @fwd;
277 148         268 for my $pp (qw(pass prepass)) {
278 296 100       733 my $pass = $part->{$pp} or next;
279 71         161 for( ;$i<@$ibuf;$i++ ) {
280 174         279 my $buf = $ibuf->[$i];
281 174 50       363 last if ! $buf->{dtype}; # dummy buf
282 174 100 100     583 if ( $pass != IMP_MAXOFFSET and $buf->{start} >= $pass ) {
283 43 50       97 $DEBUG && debug(
284             "fwd_collect[$dir][$pi]: reset $pp due to start[$i]($buf->{start})>=$pp($pass)");
285 43         135 $part->{$pp} = 0;
286 43         80 last;
287             }
288             die "cannot pass bufs with replace_later"
289 131 50       278 if ! defined $buf->{data};
290 131 100 100     424 if ( $pass == IMP_MAXOFFSET or $buf->{end} <= $pass ) {
291             # whole buffer can be passed
292 95 50       197 $DEBUG && debug(
293             "fwd_collect[$dir][$pi]: pass whole buffer[$i] $buf->{start}..$buf->{end}");
294             $buf->{rtype} = IMP_PREPASS if $pp eq 'prepass'
295 95 100 100     257 and $buf->{rtype} == IMP_PASS;
296 95         202 push @fwd,[ $pi,$dir,$buf ];
297              
298             # r_passed is set from part_in to track position if data
299             # are passed. In case of prepass we don't pass data but
300             # only put them into fwd
301 95 100 66     250 next if $r_passed && $pp eq 'prepass';
302              
303             # track what got passed for part_in
304 91 50       202 $$r_passed = $buf->{end} if $r_passed;
305              
306             # remove passed data from ibuf, if ! r_passed also prepassed
307             # data (called from imp_callback)
308 91         134 shift(@$ibuf);
309 91         138 $i--;
310              
311 91 100       252 if ( ! @$ibuf ) {
312 24 100 66     82 if ( $part->{pass} == IMP_MAXOFFSET || $buf->{eof} ) {
313             # part done, skip it in the future
314 22         46 push @fwd,[$pi,$dir,undef]; # buf = undef is special
315             }
316             # insert dummy
317             @$ibuf = $new_buf->(
318             start => $buf->{end},
319             end => $buf->{end},
320             gstart => $buf->{gend},
321             gend => $buf->{gend},
322             # keep type for streaming data
323 24 50       82 $buf->{dtype} < 0 ? ( dtype => $buf->{dtype} ):(),
324             );
325 24         57 last;
326             }
327              
328             } else {
329             # only part of buffer can be passed
330             # split buffer and re-enter loop, this will foreward the
331             # first part and keep the later part
332 36 50       78 $DEBUG && debug(
333             "fwd_collect[$dir][$pi]: need to split buffer[$i]: $buf->{start}..$pass..$buf->{end}");
334 36         111 $split_buf->($ibuf,$i,$pass - $buf->{start});
335 36         68 redo; # don't increase $i!
336             }
337             }
338             }
339 148         386 return @fwd;
340 11         89 };
341              
342             $fwd_up = sub {
343 71     71   119 my ($dir,$buf) = @_;
344 71 100 66     247 if ( $buf->{gstart} == $buf->{gend} && ! $buf->{gap}
      100        
345             && $buf->{rtype} ~~ [ IMP_PASS, IMP_PREPASS ]) {
346             # don't repeat last (pre)pass because of empty buffer
347 3         11 return;
348             }
349              
350             return [
351             $buf->{rtype},
352             $dir,
353             $buf->{gend},
354 68 100       281 ($buf->{rtype} == IMP_REPLACE) ? ( $buf->{data} ):()
355             ];
356 11         36 };
357              
358             $exec_fwd = sub {
359 70     70   158 my @fwd = @_;
360 70 100       173 if (@fwd>1) {
361             $DEBUG && debug("trying to merge\n".join("\n", map {
362 50 0       155 ! defined $_->[0]
  0 0       0  
    50          
363             ? ""
364             : "fwd[$_->[1]][$_->[0]] " .
365             ( $_->[2] ? $dump_bufs->([$_->[2]]) : '')
366             } @fwd));
367             # try to compress
368 50         78 my ($lpi,$ldir,$lbuf);
369 50         121 for( my $i=0;$i<@fwd;$i++ ) {
370 146 100 66     569 if ( ! defined $fwd[$i][0] || ! defined $fwd[$i][2]) {
371 22         34 $lpi = undef;
372 22         56 next;
373             }
374 124 50 66     523 if ( ! defined $lpi
      66        
375             or $lpi != $fwd[$i][0]
376             or $ldir != $fwd[$i][1] ) {
377 50         74 ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
  50         100  
378 50         115 next;
379             }
380              
381 74         114 my $buf = $fwd[$i][2];
382              
383 74 100 33     606 if ( not $buf->{gap}
      66        
      100        
      66        
384             and $buf->{dtype} < 0
385             and $buf->{start} == $lbuf->{end}
386             and $buf->{rtype} == $lbuf->{rtype}
387             and $buf->{dtype} == $lbuf->{dtype}
388             ) {
389 28 100       80 if ( $buf->{rtype} == IMP_REPLACE ) {
390 17 100 100     96 if ( $lbuf->{gend} == $buf->{gend} ) {
    100          
391             # same global end, merge data
392 2         8 $lbuf->{data} .= $buf->{data};
393             } elsif ( $buf->{data} ne '' or $lbuf->{data} ne '' ) {
394             # either one not empty, no merge
395 11         36 next;
396             }
397             } else {
398             # unchanged, append
399 11         18 $lbuf->{data} .= $buf->{data};
400             }
401 17 50       43 $DEBUG && debug("merge bufs ".$dump_bufs->([$lbuf,$buf]));
402 17         34 $lbuf->{gend} = $buf->{gend};
403 17         27 $lbuf->{end} = $buf->{end};
404 17         29 $lbuf->{eof} = $buf->{eof};
405 17         30 splice(@fwd,$i,1,());
406 17         30 $i--;
407 17         42 next;
408              
409             } else {
410 46         69 ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
  46         83  
411 46         114 next;
412             }
413             }
414             }
415 70         182 while ( my $fwd = shift(@fwd)) {
416 171         542 my $npi = my $pi = shift(@$fwd);
417 171 50       368 if ( ! defined $npi ) {
418             # propagate prepared IMP callback
419 0         0 $wself->run_callback($fwd);
420 0         0 next;
421             }
422              
423 171         283 my ($dir,$buf) = @$fwd;
424              
425 171 100       316 if ( $buf ) {
426 127         167 my $np;
427 127         169 my $adjust = 0;
428 127         175 while (1) {
429 127 100       233 $npi += $dir?-1:+1;
430 127 100 100     483 last if $npi<0 or $npi>=@imp;
431 58 50       166 last if ref( $np = $parts[$dir][$npi] );
432 0         0 $adjust += $np;
433 0 0       0 $DEBUG && debug("skipping pi=$npi");
434             }
435              
436 127 100       279 if ( $buf->{eof} ) {
437             # add pass infinite to fwd to propagate eof
438 22         46 push @fwd,[ $pi,$dir,undef ];
439             }
440 127 100       245 if ( $np ) {
441             # feed into next part
442 58         85 my $nib = $np->{ibuf};
443             # adjust start,end based on end of npi and gap
444 58         123 $buf->{start} = $nib->[-1]{end} + $buf->{gap} + $adjust;
445 58         108 $buf->{end} = $buf->{start} + length($buf->{data});
446 58 50       121 $DEBUG && debug(
447             "fwd_next[$dir][$pi>$npi] ".$dump_bufs->([$buf]));
448 58         114 $part_in->($npi,$dir,$buf);
449             } else {
450             # output from cascade
451 69 100       141 my $cb = $fwd_up->($dir,$buf) or next;
452 68 50       148 $DEBUG && debug(
453             "fwd_up[$dir][$pi>>] ".$dump_bufs->([$buf]));
454 68         211 $wself->run_callback($cb);
455             }
456              
457             # special - part is done with IMP_PASS IMP_MAXOFFSET
458             } else {
459             # skip if we had a pass infinite already
460 44 100       203 next if ! ref $parts[$dir][$pi];
461              
462 24         41 $parts[$dir][$pi] = $parts[$dir][$pi]->{adjust};
463 24 100       59 if ( grep { ref($_) } @{ $parts[$dir] } ) {
  54         135  
  24         61  
464             # we have other unfinished parts, skip only this part
465 12 50       58 $DEBUG && debug(
466             "part[$dir][$pi>$npi] will be skipped in future, adjust=$parts[$dir][$pi]");
467             } else {
468             # everything can be skipped
469 12 50       28 $DEBUG && debug(
470             "part[$dir][$pi>>] all parts will be skipped in future");
471             # pass rest
472 12         39 $wself->run_callback([ IMP_PASS,$dir,IMP_MAXOFFSET ]);
473             }
474             }
475             }
476 11         77 };
477              
478             # the data function
479             # called from sub data on new data and from $process when data are finished
480             # in on part and should be transferred into the next part
481             # $pi - index into parts
482             # $dir - direction (e.g. target part is $parts[$dir][$pi])
483             # $buf - new buffer from $new_buf->() which might be merged with existing
484             $part_in = sub {
485 87     87   152 my ($pi,$dir,$buf) = @_;
486 87 50       183 $DEBUG && debug( "part_in[$dir][$pi]: ".$dump_bufs->([$buf]));
487              
488 87         142 my $part = $parts[$dir][$pi];
489 87         129 my $ibuf = $part->{ibuf};
490 87         129 my $lbuf = $ibuf->[-1];
491 87         127 my $lend = $lbuf->{end};
492              
493             # some sanity checks
494 87         118 if(1) {
495             die "data after eof [$dir][$pi] ".$dump_bufs->([$lbuf,$buf])
496 87 50       174 if $lbuf->{eof};
497 87 50       243 if ( $buf->{start} != $lend ) {
    50          
498 0 0       0 if ( $buf->{start} < $lend ) {
    0          
499 0         0 die "overlapping data off($buf->{start})
500             } elsif ( ! $buf->{gap} ) {
501 0         0 die "gap should be set because off($buf->{start})>last.end($lend) in part[$dir][$pi]"
502             }
503             } elsif ( $buf->{gap} ) {
504 0         0 die "gap specified even if off($buf->{start}) == last.end"
505             }
506 87 50       185 $part->{pass} == IMP_MAXOFFSET and die
507             "pass infinite should have been optimized by removing part[$dir][$pi]";
508             }
509              
510             # add data to buf
511 87 100 100     627 if ( $lbuf->{data} eq '' and $lbuf->{rtype} == IMP_PASS ) {
    100 66        
      100        
      66        
      66        
      66        
512             # empty dummy buffer
513 26 50       58 $DEBUG && debug("part_in[$dir][$pi]: replace dummy buffer");
514 26 50       65 @$ibuf == 1 or die "empty dummy buffer should only be at beginning";
515 26         54 @$ibuf = $buf;
516              
517             } elsif ( ! $buf->{gap}
518             and $buf->{data} eq ''
519             and $buf->{rtype} == $lbuf->{rtype}
520             and $buf->{dtype} == $lbuf->{dtype}
521             and $buf->{dtype} < 0
522             and ! $buf->{eof}
523             ) {
524             # just update eof,[g]end of lbuf
525 6 50       20 $DEBUG && debug(
526             "part_in[$dir][$pi]: set lbuf end=$buf->{end} gend=$buf->{gend}");
527 6         14 $lbuf->{end} = $buf->{end};
528 6         9 $lbuf->{gend} = $buf->{gend};
529             # nothing to do with these empty data
530 6 50       17 $DEBUG && debug("part_in[$dir][$pi] nothing to do on empty buffer");
531 6         34 return;
532              
533             } else {
534             # add new buf
535 55 50       116 $DEBUG && debug("part_in[$dir][$pi]: add new buffer");
536 55         112 push @$ibuf,$buf;
537             }
538              
539             # determine what can be forwarded immediatly
540 81         199 my @fwd = $fwd_collect->($dir,$pi,$#$ibuf,\$lend);
541              
542 81 100       225 if ( $buf->{eof} ? $lend <= $buf->{end} : $lend < $buf->{end} ) {
    100          
543             # send new data to the analyzer
544 80         130 my $rl = $part->{replace_later};
545 80         141 for(@$ibuf) {
546 173 100       371 next if $_->{start} < $lend;
547             die "last_end should be on buffer boundary"
548 84 50       174 if $_->{start} > $lend;
549 84         126 $lend = $_->{end};
550             $DEBUG && debug(
551             "analyzer[$dir][$pi] << %d bytes %s \@%d%s -> last_end=%d",
552             $_->{end} - $_->{start},
553             $_->{dtype},
554 84 0       174 $_->{start},$_->{gap} ? "(+$_->{gap})":'',
    50          
555             $lend
556             );
557             $imp[$pi]->data($dir,
558             $_->{data},
559             $_->{gap} ? $_->{start}:0,
560             $_->{dtype}
561 84 50       345 );
562             $imp[$pi]->data($dir,'',0, $_->{dtype})
563 84 100 100     335 if $buf->{eof} && $_->{data} ne '';
564 84 50       254 $rl or next;
565 0 0 0     0 if ( $rl == IMP_MAXOFFSET or $rl>= $lend ) {
566 0         0 $buf->{data} = undef;
567             } else {
568 0         0 $rl = $part->{replace_later} = 0; # reset
569             }
570             }
571             } else {
572 1 50       3 $DEBUG && debug(
573             "nothing to analyze[$dir][$pi]: last_end($lend) < end($buf->{end})");
574             }
575              
576             # forward data which can be (pre)passed
577 81 100       381 $exec_fwd->(@fwd) if @fwd;
578 11         93 };
579              
580             $_imp_callback = sub {
581 75     75   111 my $pi = shift;
582              
583 75         104 my @fwd;
584 75         141 for my $rv (@_) {
585 107         173 my $rtype = shift(@$rv);
586              
587 107 50       667 if ( $rtype ~~ [ IMP_FATAL, IMP_DENY, IMP_DROP, IMP_ACCTFIELD ]) {
    50          
    50          
    50          
    100          
    50          
    0          
588 0 0       0 $DEBUG && debug("callback[.][$pi] $rtype @$rv");
589 0         0 $wself->run_callback([ $rtype, @$rv ]);
590              
591             } elsif ( $rtype == IMP_LOG ) {
592 0         0 my ($dir,$offset,$len,$level,$msg,@extmsg) = @$rv;
593 0 0       0 $DEBUG && debug(
594             "callback[$dir][$pi] $rtype '$msg' off=$offset len=$len lvl=$level");
595             # approximate offset to real position
596 0         0 my $newoff = 0;
597 0         0 my $part = $parts[$dir][$pi];
598 0         0 for ( @{$part->{ibuf}} ) {
  0         0  
599 0 0       0 if ( $_->{start} <= $offset ) {
600             $offset = ( $_->{rtype} == IMP_REPLACE )
601             ? $_->{gstart}
602 0 0       0 : $_->{gstart} - $_->{start} + $offset;
603             } else {
604             last
605 0         0 }
606             }
607 0         0 $wself->run_callback([ IMP_LOG,$dir,$offset,$len,$level,$msg,@extmsg ]);
608              
609             } elsif ( $rtype == IMP_PAUSE ) {
610 0         0 my $dir = shift;
611 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype");
612 0 0       0 next if $pause[$pi];
613 0         0 $pause[$dir][$pi] = 1;
614 0 0       0 $wself->run_callback([ IMP_PAUSE ]) if grep { $_ } @pause > 1;
  0         0  
615              
616             } elsif ( $rtype == IMP_CONTINUE ) {
617 0         0 my $dir = shift;
618 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype");
619 0         0 delete $pause[$dir][$pi];
620             $wself->run_callback([ IMP_CONTINUE ])
621 0 0       0 if not grep { $_ } @{$pause[$dir]};
  0         0  
  0         0  
622              
623             } elsif ( $rtype ~~ [ IMP_PASS, IMP_PREPASS ] ) {
624 76         148 my ($dir,$offset) = @$rv;
625 76 50       159 $DEBUG && debug("callback[$dir][$pi] $rtype $offset");
626 76 100       199 ref(my $part = $parts[$dir][$pi]) or next; # part skippable?
627 69 100       126 if ( $rtype == IMP_PASS ) {
628 65 50       156 next if $part->{pass} == IMP_MAXOFFSET; # no change
629 65 100       153 if ( $offset == IMP_MAXOFFSET ) {
    50          
630 22         35 $part->{pass} = IMP_MAXOFFSET;
631 22         39 $part->{prepass} = 0; # pass >= prepass
632             } elsif ( $offset > $part->{pass} ) {
633 43         65 $part->{pass} = $offset;
634 43 50 33     178 if ( $part->{prepass} != IMP_MAXOFFSET
635             and $part->{prepass} <= $offset ) {
636 43         75 $part->{prepass} = 0; # pass >= prepass
637             }
638             } else {
639 0         0 next; # no change
640             }
641             } else { # IMP_PREPASS
642 4 100       17 next if $part->{prepass} == IMP_MAXOFFSET; # no change
643 2 50       7 if ( $offset == IMP_MAXOFFSET ) {
    0          
644 2         6 $part->{prepass} = IMP_MAXOFFSET;
645             } elsif ( $offset > $part->{prepass} ) {
646 0         0 $part->{prepass} = $offset;
647             } else {
648 0         0 next; # no change
649             }
650             }
651              
652             # pass/prepass got updated, so we might pass some more data
653 67         143 push @fwd, $fwd_collect->($dir,$pi,0);
654              
655             } elsif ( $rtype == IMP_REPLACE ) {
656 31         71 my ($dir,$offset,$newdata) = @$rv;
657 31 50       75 $DEBUG && debug(
658             "callback[$dir][$pi] $rtype $dir $offset len=%d",
659             length($newdata));
660 31 50       92 ref(my $part = $parts[$dir][$pi])
661             or die "called replace for passed data";
662 31         53 my $ibuf = $part->{ibuf};
663              
664             # sanity checks
665 31 50       70 die "called replace although pass=IMP_MAXOFFSET" if ! $part;
666 31 50       74 die "no replace with IMP_MAXOFFSET" if $offset == IMP_MAXOFFSET;
667             die "called replace for already passed data"
668 31 50       82 if $ibuf->[0]{start} > $offset;
669              
670 31         77 while (@$ibuf) {
671 71         111 my $buf = $ibuf->[0];
672 71 100       144 if ( $offset >= $buf->{end} ) {
673             # replace complete buffer
674 49 50       104 $DEBUG && debug(
675             "replace complete buf $buf->{start}..$buf->{end}");
676 49 100 66     227 if ( ! defined($buf->{data})
677             or $buf->{data} ne $newdata ) {
678 48         82 $buf->{rtype} = IMP_REPLACE;
679 48         84 $buf->{data} = $newdata;
680             $part->{adjust} +=
681 48         113 length($newdata) - $buf->{end} + $buf->{start};
682 48         80 $newdata = ''; # in the next buffers replace with ''
683             }
684 49         101 push @fwd,[ $pi,$dir,$buf ];
685 49         83 shift(@$ibuf);
686 49 100       115 if ( ! @$ibuf ) {
687             # all bufs eaten
688             die "called replace for future data"
689 5 50       20 if $buf->{end}<$offset;
690             @$ibuf = $new_buf->( %$buf,
691             data => '',
692             start => $buf->{end},
693             end => $buf->{end},
694             gstart => $buf->{gend},
695             # packet types cannot get partial replacement
696             # at end
697 5 50       45 $buf->{dtype} > 0 ? ( dtype => 0 ):()
698             );
699             # remove eof from buf in @fwd because we added
700             # new one
701 5         16 $fwd[-1][2]{eof} = 0;
702 5         40 last;
703             }
704 44 100       162 last if $buf->{end} == $offset;
705             } else {
706             # split buffer and replace first part
707 22 50       56 $DEBUG && debug(
708             "replace - split buf $buf->{start}..$offset..$buf->{end}");
709 22         59 $split_buf->($ibuf,0,$offset-$buf->{start});
710 22         53 redo;
711             }
712             }
713              
714             } elsif ( $rtype == IMP_REPLACE_LATER ) {
715 0         0 my ($dir,$offset) = @$rv;
716 0 0       0 $DEBUG && debug("callback[$dir][$pi] $rtype $offset");
717 0 0       0 ref(my $part = $parts[$dir][$pi])
718             or die "called replace for passed data";
719 0         0 my $ibuf = $part->{ibuf};
720 0 0       0 $_->{replace_later} == IMP_MAXOFFSET and next; # no change
721              
722             # sanity checks
723 0 0       0 die "called replace_later although pass=IMP_MAX_OFFSET"
724             if ! $part;
725             die "called replace for already passed data" if
726             $offset != IMP_MAXOFFSET and
727 0 0 0     0 $ibuf->[0]{start} > $offset;
728              
729 0 0       0 if ( $offset == IMP_MAXOFFSET ) {
    0          
730 0         0 $_->{replace_later} = IMP_MAXOFFSET;
731             # change all to replace_later
732 0         0 $_->{data} = undef for(@$ibuf);
733 0         0 next;
734             } elsif ( $offset <= $part->{replace_later} ) {
735             # no change
736             } else {
737 0         0 $part->{replace_later} = $offset;
738 0         0 for(@$ibuf) {
739 0 0       0 defined($_->{data}) or next; # already replace_later
740 0 0       0 my $len = length($_->{data}) or last; # dummy buffer
741 0 0       0 if ( $_->{start} + $len <= $offset ) {
742 0         0 $_->{data} = undef;
743             } else {
744 0         0 $part->{replace_later} = 0;
745 0         0 last;
746             }
747             }
748             }
749             } else {
750 0 0       0 $DEBUG && debug("callback[.][$pi] $rtype @$rv");
751 0         0 die "don't know how to handle rtype $rtype";
752             }
753             }
754              
755             # pass to next part/output
756 75 100       223 $exec_fwd->(@fwd) if @fwd;
757 11         133 };
758              
759             # While we are in $part_in function we will only spool callbacks and process
760             # them at the end. Otherwise $dataf might cause call of callback which then
761             # causes call of dataf etc - which makes debugging a nightmare.
762              
763 11         23 my $collect_callbacks;
764             $global_in = sub {
765 31     31   72 my ($dir,$data,$offset,$dtype) = @_;
766              
767 31   100     175 my %buf = (
768             data => $data,
769             dtype => $dtype // IMP_DATA_STREAM,
770             rtype => IMP_PASS,
771             eof => $data eq '',
772             );
773              
774 31         74 my $adjust = 0;
775 31 100       66 my $pi = $dir ? $#imp:0; # enter into first or last part
776 31         43 my $np;
777 31         43 while (1) {
778 34 100       100 ref( $np = $parts[$dir][$pi] ) and last;
779 5         9 $adjust += $np;
780 5 50       11 $pi += $dir?-1:1;
781 5 100 66     24 if ( $pi<0 or $pi>$#imp ) {
782 2 50       7 $DEBUG && debug("all skipped");
783 2 50       50 if ( my $cb = $fwd_up->($dir,$new_buf->(%buf))) {
784 0         0 $self->run_callback($cb);
785             }
786 2         9 return;
787             }
788             }
789              
790 29 50       68 return if ! ref $np; # got IMP_PASS IMP_MAXOFFSET for all
791              
792 29         62 my $ibuf_end = $np->{ibuf}[-1]{gend};
793 29 50       61 if ( ! $offset ) {
    0          
    0          
794             # no gap between data
795 29         60 $buf{gstart} = $ibuf_end;
796             } elsif ( $offset < $ibuf_end ) {
797 0         0 die "overlapping data";
798             } elsif ( $offset > $ibuf_end ) {
799             # gap between data
800 0         0 $buf{gstart} = $offset;
801 0         0 $buf{gap} = $offset - $ibuf_end;
802             } else {
803             # there was no need for giving offset
804 0         0 $buf{gstart} = $ibuf_end;
805             }
806 29         67 $buf{gend} = $buf{gstart} + length($data);
807 29         52 $buf{start} = $buf{gstart} + $adjust;
808 29         61 $buf{end} = $buf{gend} + $adjust;
809              
810 29   50     129 $collect_callbacks ||= [];
811 29         104 $part_in->( $pi,$dir, $new_buf->(%buf));
812              
813 29         88 while ( my $cb = shift(@$collect_callbacks)) {
814 75         321 $_imp_callback->(@$cb);
815             }
816             $collect_callbacks = undef
817 11         57 };
  29         273  
818              
819             # wrapper which spools callbacks if within dataf
820             $imp_callback = sub {
821 75 50   75   158 if ( $collect_callbacks ) {
822             # only spool and execute later
823 75         156 push @$collect_callbacks, [ @_ ];
824 75         202 return;
825             }
826 0         0 return $_imp_callback->(@_)
827 11         36 };
828              
829             # setup callbacks
830 11         77 $imp[$_]->set_callback( $imp_callback,$_ ) for (0..$#imp);
831              
832             # make some closures available within methods
833 11         29 $self->{dataf} = $global_in;
834             $self->{closef} = sub {
835 0     0   0 $global_in = $part_in = $imp_callback = $_imp_callback = undef;
836 0         0 @parts = ();
837 11         49 };
838 11         151 return $self;
839             }
840              
841             sub data {
842 31     31 1 80 my $self = shift;
843 31         74 $self->{dataf}(@_);
844             }
845              
846             sub DESTROY {
847 11     11   3263 my $closef = shift->{closef};
848 11 50       68 $closef->() if $closef;
849             }
850              
851              
852             1;
853              
854             __END__