File Coverage

blib/lib/Protocol/HTTP2/Stream.pm
Criterion Covered Total %
statement 184 221 83.2
branch 84 128 65.6
condition 15 27 55.5
subroutine 27 30 90.0
pod 0 24 0.0
total 310 430 72.0


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Stream;
2 10     10   45 use strict;
  10         13  
  10         343  
3 10     10   41 use warnings;
  10         15  
  10         293  
4 10         3271 use Protocol::HTTP2::Constants qw(:states :endpoints :settings :frame_types
5 10     10   41 :limits :errors);
  10         13  
6 10     10   57 use Protocol::HTTP2::HeaderCompression qw( headers_decode );
  10         13  
  10         570  
7 10     10   49 use Protocol::HTTP2::Trace qw(tracer);
  10         15  
  10         21782  
8              
9             # Streams related part of Protocol::HTTP2::Conntection
10              
11             sub new_stream {
12 16     16 0 2047 my $self = shift;
13 16 50       1997 return undef if $self->goaway;
14              
15 16 50       2741 $self->{last_stream} += 2
    50          
16             if exists $self->{streams}->{ $self->{type} == CLIENT ? 1 : 2 };
17 16         2128 $self->{streams}->{ $self->{last_stream} } = {
18             'state' => IDLE,
19             'weight' => DEFAULT_WEIGHT,
20             'stream_dep' => 0,
21             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
22             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
23             };
24 16         4048 return $self->{last_stream};
25             }
26              
27             sub new_peer_stream {
28 18     18 0 2651 my $self = shift;
29 18         2634 my $stream_id = shift;
30 18 50 33     2912 if ( $stream_id < $self->{last_peer_stream}
    50          
31             || ( $stream_id % 2 ) == ( $self->{type} == CLIENT ) ? 1 : 0
32             || $self->goaway )
33             {
34 0         0 tracer->error("Peer send invalid stream id: $stream_id\n");
35 0         0 $self->error(PROTOCOL_ERROR);
36 0         0 return undef;
37             }
38 18         2646 $self->{last_peer_stream} = $stream_id;
39 18 50       2657 if ( $self->dec_setting(SETTINGS_MAX_CONCURRENT_STREAMS) <=
40             $self->{active_peer_streams} )
41             {
42 0         0 tracer->warning("SETTINGS_MAX_CONCURRENT_STREAMS exceeded\n");
43 0         0 $self->stream_error( $stream_id, REFUSED_STREAM );
44 0         0 return undef;
45             }
46 18         2189 $self->{active_peer_streams}++;
47 18         2177 tracer->debug("Active streams: $self->{active_peer_streams}");
48 18         2196 $self->{streams}->{$stream_id} = {
49             'state' => IDLE,
50             'weight' => DEFAULT_WEIGHT,
51             'stream_dep' => 0,
52             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
53             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
54             };
55 18 100       2228 $self->{on_new_peer_stream}->($stream_id)
56             if exists $self->{on_new_peer_stream};
57              
58 18         4182 return $self->{last_peer_stream};
59             }
60              
61             sub stream {
62 61     61 0 9403 my ( $self, $stream_id ) = @_;
63 61 100       12053 return undef unless exists $self->{streams}->{$stream_id};
64              
65 50         13894 $self->{streams}->{$stream_id};
66             }
67              
68             # stream_state ( $self, $stream_id, $new_state?, $pending? )
69              
70             sub stream_state {
71 51     51 0 8521 my $self = shift;
72 51         8597 my $stream_id = shift;
73 51 50       8482 return undef unless exists $self->{streams}->{$stream_id};
74 51         8506 my $s = $self->{streams}->{$stream_id};
75              
76 51 50       8781 if (@_) {
77 51         8142 my ( $new_state, $pending ) = @_;
78              
79 51 100       8566 if ($pending) {
80 1         3 $self->stream_pending_state( $stream_id, $new_state );
81             }
82             else {
83 50 50       8265 $self->{on_change_state}->( $stream_id, $s->{state}, $new_state )
84             if exists $self->{on_change_state};
85              
86 50         8143 $s->{state} = $new_state;
87              
88             # Exec callbacks for new state
89 50 100 100     8356 if ( exists $s->{cb} && exists $s->{cb}->{ $s->{state} } ) {
90 22         4242 for my $cb ( @{ $s->{cb}->{ $s->{state} } } ) {
  22         8093  
91 22         4127 $cb->();
92             }
93             }
94              
95             # Cleanup
96 50 100       22041 if ( $new_state == CLOSED ) {
97 22 100 66     4061 $self->{active_peer_streams}--
98             if $self->{active_peer_streams}
99             && ( ( $stream_id % 2 ) ^ ( $self->{type} == CLIENT ) );
100 22         4084 tracer->info(
101             "Active streams: $self->{active_peer_streams} $stream_id");
102 22         4160 for my $key ( keys %$s ) {
103 190 100       34693 next if grep { $key eq $_ } (
  950         371972  
104             qw(state weight stream_dep
105             fcw_recv fcw_send )
106             );
107 80         22440 delete $s->{$key};
108             }
109             }
110             }
111             }
112              
113 51         63616 $s->{state};
114             }
115              
116             sub stream_pending_state {
117 106     106 0 18779 my $self = shift;
118 106         18698 my $stream_id = shift;
119 106 50       19001 return undef unless exists $self->{streams}->{$stream_id};
120 106         19136 my $s = $self->{streams}->{$stream_id};
121 106 100       19024 if (@_) {
122 2         2 $s->{pending_state} = shift;
123 2 100       6 $self->{pending_stream} =
124             defined $s->{pending_state} ? $stream_id : undef;
125             }
126 106         37399 $s->{pending_state};
127             }
128              
129             sub stream_promised_sid {
130 104     104 0 18080 my $self = shift;
131 104         18407 my $stream_id = shift;
132 104 50       18118 return undef unless exists $self->{streams}->{$stream_id};
133 104         18008 my $s = $self->{streams}->{$stream_id};
134 104 50       17990 $s->{promised_sid} = shift if @_;
135 104         36430 $s->{promised_sid};
136             }
137              
138             sub stream_cb {
139 23     23 0 4195 my ( $self, $stream_id, $state, $cb ) = @_;
140              
141 23 50       4171 return undef unless exists $self->{streams}->{$stream_id};
142              
143 23         4102 push @{ $self->{streams}->{$stream_id}->{cb}->{$state} }, $cb;
  23         15747  
144             }
145              
146             sub stream_frame_cb {
147 10     10 0 2130 my ( $self, $stream_id, $frame, $cb ) = @_;
148              
149 10 50       2341 return undef unless exists $self->{streams}->{$stream_id};
150              
151 10         2126 push @{ $self->{streams}->{$stream_id}->{frame_cb}->{$frame} }, $cb;
  10         6317  
152             }
153              
154             sub stream_data {
155 36     36 0 6187 my $self = shift;
156 36         6307 my $stream_id = shift;
157 36 50       6248 return undef unless exists $self->{streams}->{$stream_id};
158 36         6179 my $s = $self->{streams}->{$stream_id};
159              
160 36 100       6419 if (@_) {
161              
162             # Exec callbacks for data
163 17 100 66     3136 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&DATA} ) {
164 5         1022 for my $cb ( @{ $s->{frame_cb}->{&DATA} } ) {
  5         2001  
165 5         1172 $cb->( $_[0] );
166             }
167             }
168             else {
169 12         6030 $s->{data} .= shift;
170             }
171             }
172              
173 36         12120 $s->{data};
174             }
175              
176             # Header Block -- The entire set of encoded header field representations
177             sub stream_header_block {
178 24     24 0 4075 my $self = shift;
179 24         4197 my $stream_id = shift;
180 24 50       4107 return undef unless exists $self->{streams}->{$stream_id};
181 24         4124 my $s = $self->{streams}->{$stream_id};
182              
183 24 50       4323 $s->{header_block} .= shift if @_;
184              
185 24         8622 $s->{header_block};
186             }
187              
188             sub stream_headers {
189 22     22 0 4193 my $self = shift;
190 22         4367 my $stream_id = shift;
191 22 50       4330 return undef unless exists $self->{streams}->{$stream_id};
192 22 50       4213 $self->{streams}->{$stream_id}->{headers} = shift if @_;
193 22         9091 $self->{streams}->{$stream_id}->{headers};
194             }
195              
196             sub stream_pp_headers {
197 0     0 0 0 my $self = shift;
198 0         0 my $stream_id = shift;
199 0 0       0 return undef unless exists $self->{streams}->{$stream_id};
200 0         0 $self->{streams}->{$stream_id}->{pp_headers};
201             }
202              
203             # Explicit content-length in headers
204             sub stream_length {
205 4     4 0 5 my $self = shift;
206 4         7 my $stream_id = shift;
207 4 50       15 return undef unless exists $self->{streams}->{$stream_id};
208 4 100       16 $self->{streams}->{$stream_id}->{length} = shift if @_;
209 4         13 $self->{streams}->{$stream_id}->{length};
210             }
211              
212             sub stream_headers_done {
213 24     24 0 4254 my $self = shift;
214 24         4110 my $stream_id = shift;
215 24 50       4093 return undef unless exists $self->{streams}->{$stream_id};
216 24         4076 my $s = $self->{streams}->{$stream_id};
217              
218 24         4112 my $res =
219             headers_decode( $self, \$s->{header_block}, 0,
220             length $s->{header_block}, $stream_id );
221              
222 24         4077 tracer->debug("Headers done for stream $stream_id\n");
223              
224 24 50       4082 return undef unless defined $res;
225              
226             # Clear header_block
227 24         4054 $s->{header_block} = '';
228              
229 24         4198 my $eh = $self->decode_context->{emitted_headers};
230 24   66     4105 my $is_response = $self->{type} == CLIENT && !$s->{promised_sid};
231              
232             return undef
233 24 50       4106 unless $self->validate_headers( $eh, $stream_id, $is_response );
234              
235 24 50       4216 if ( $s->{promised_sid} ) {
236 0         0 $self->{streams}->{ $s->{promised_sid} }->{pp_headers} = $eh;
237             }
238             else {
239 24         8082 $s->{headers} = $eh;
240             }
241              
242             # Exec callbacks for headers
243 24 100 66     4209 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&HEADERS} ) {
244 5         1100 for my $cb ( @{ $s->{frame_cb}->{&HEADERS} } ) {
  5         2096  
245 5         1046 $cb->($eh);
246             }
247             }
248              
249             # Clear emitted headers
250 24         4240 $self->decode_context->{emitted_headers} = [];
251              
252 24         8281 return 1;
253             }
254              
255             sub validate_headers {
256 24     24 0 4092 my ( $self, $headers, $stream_id, $is_response ) = @_;
257 24         4018 my $pseudo_flag = 1;
258 24         3961 my %pseudo_hash = ();
259 24 100       4063 my @h = $is_response ? (qw(:status)) : (
260             qw(:method :scheme :authority
261             :path)
262             );
263 24         4135 for my $i ( 0 .. @$headers / 2 - 1 ) {
264 110         18059 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] );
265 110 100       18076 if ( $h =~ /^\:/ ) {
266 60 50       9871 if ( !$pseudo_flag ) {
  204 50       67108  
    50          
267 0         0 tracer->warning(
268             "pseudo-header <$h> appears after a regular header");
269 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
270 0         0 return undef;
271             }
272             elsif ( !grep { $_ eq $h } @h ) {
273 0         0 tracer->warning("invalid pseudo-header <$h>");
274 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
275 0         0 return undef;
276             }
277             elsif ( exists $pseudo_hash{$h} ) {
278 0         0 tracer->warning("repeated pseudo-header <$h>");
279 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
280 0         0 return undef;
281             }
282              
283 60         9946 $pseudo_hash{$h} = $v;
284 60         19681 next;
285             }
286              
287 50 100       8092 $pseudo_flag = 0 if $pseudo_flag;
288              
289 50 50 33     12772 if ( $h eq 'connection' ) {
    50          
    100          
290 0         0 tracer->warning("connection header are not valid in http/2");
291 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
292 0         0 return undef;
293             }
294             elsif ( $h eq 'te' && $v ne 'trailers' ) {
295 0         0 tracer->warning("TE header can contain only value 'trailers'");
296 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
297 0         0 return undef;
298             }
299             elsif ( $h eq 'content-length' ) {
300 2         20 $self->stream_length( $stream_id, $v );
301             }
302             }
303              
304 24         4295 for my $h (@h) {
305 60 50       20916 next if exists $pseudo_hash{$h};
306              
307 0         0 tracer->warning("missed mandatory pseudo-header $h");
308 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
309 0         0 return undef;
310             }
311              
312 24         8152 1;
313             }
314              
315             # RST_STREAM for stream errors
316             sub stream_error {
317 5     5 0 1117 my ( $self, $stream_id, $error ) = @_;
318 5         1503 $self->enqueue( RST_STREAM, 0, $stream_id, $error );
319             }
320              
321             # Flow control windown of stream
322             sub _stream_fcw {
323 62     62   12157 my $dir = shift;
324 62         12067 my $self = shift;
325 62         11760 my $stream_id = shift;
326 62 50       12177 return undef unless exists $self->{streams}->{$stream_id};
327 62         11801 my $s = $self->{streams}->{$stream_id};
328              
329 62 100       11719 if (@_) {
330 42         8156 $s->{$dir} += shift;
331 42         8202 tracer->debug( "Stream $stream_id $dir now is " . $s->{$dir} . "\n" );
332             }
333 62         35327 $s->{$dir};
334             }
335              
336             sub stream_fcw_send {
337 40     40 0 7767 _stream_fcw( 'fcw_send', @_ );
338             }
339              
340             sub stream_fcw_recv {
341 22     22 0 4066 _stream_fcw( 'fcw_recv', @_ );
342             }
343              
344             sub stream_fcw_update {
345 0     0 0 0 my ( $self, $stream_id ) = @_;
346              
347             # TODO: check size of data of stream in memory
348 0         0 tracer->debug("update fcw recv of stream $stream_id\n");
349 0         0 $self->stream_fcw_recv( $stream_id, DEFAULT_INITIAL_WINDOW_SIZE );
350 0         0 $self->enqueue( WINDOW_UPDATE, 0, $stream_id, DEFAULT_INITIAL_WINDOW_SIZE );
351             }
352              
353             sub stream_blocked_data {
354 20     20 0 4075 my $self = shift;
355 20         3963 my $stream_id = shift;
356 20 50       4181 my $s = $self->{streams}->{$stream_id} or return undef;
357              
358 20 50       4126 $s->{blocked_data} = shift if @_;
359 20         8215 $s->{blocked_data};
360             }
361              
362             sub stream_send_blocked {
363 0     0 0 0 my ( $self, $stream_id ) = @_;
364 0 0       0 my $s = $self->{streams}->{$stream_id} or return undef;
365              
366 0 0 0     0 if ( length( $s->{blocked_data} )
367             && $self->stream_fcw_send($stream_id) != 0 )
368             {
369 0         0 $self->send_data($stream_id);
370             }
371             }
372              
373             sub stream_weight {
374 2     2 0 3 my ( $self, $stream_id, $weight ) = @_;
375 2 50       5 return undef unless exists $self->{streams}->{$stream_id};
376 2         3 my $s = $self->{streams}->{$stream_id};
377              
378 2 50       5 $s->{weight} = $weight if defined $weight;
379 2         3 $s->{weight};
380             }
381              
382             sub stream_end {
383 25     25 0 5088 my ( $self, $stream_id, $end_flag ) = @_;
384 25 50       5202 return undef unless exists $self->{streams}->{$stream_id};
385 25         5139 my $s = $self->{streams}->{$stream_id};
386              
387 25 100       4984 $s->{end} = $end_flag if defined $end_flag;
388 25         10267 $s->{end};
389             }
390              
391             sub stream_reprio {
392 7     7 0 12 my ( $self, $stream_id, $exclusive, $stream_dep ) = @_;
393 7 50       12 return undef unless exists $self->{streams}->{$stream_id};
394 7         8 my $s = $self->{streams};
395              
396 7 100       13 if ( $s->{$stream_id}->{stream_dep} != $stream_dep ) {
397              
398             # check if new stream_dep is stream child
399 4 50       8 if ( $stream_dep != 0 ) {
400 4         3 my $sid = $stream_dep;
401 4         12 while ( $sid = $s->{$sid}->{stream_dep} ) {
402 5 100       13 next unless $sid == $stream_id;
403              
404             # Child take my stream dep
405 1         3 $s->{$stream_dep}->{stream_dep} =
406             $s->{$stream_id}->{stream_dep};
407 1         2 last;
408             }
409             }
410              
411             # Set new stream dep
412 4         5 $s->{$stream_id}->{stream_dep} = $stream_dep;
413             }
414              
415 7 100       12 if ($exclusive) {
416              
417             # move all siblings to childs
418 1         3 for my $sid ( keys %$s ) {
419             next
420 3 100 66     17 if $s->{$sid}->{stream_dep} != $stream_dep
421             || $sid == $stream_id;
422              
423 2         5 $s->{$sid}->{stream_dep} = $stream_id;
424             }
425             }
426              
427 7         20 return 1;
428             }
429              
430             1;