File Coverage

blib/lib/Protocol/SPDY/Stream.pm
Criterion Covered Total %
statement 76 147 51.7
branch 19 62 30.6
condition 6 19 31.5
subroutine 26 46 56.5
pod 37 37 100.0
total 164 311 52.7


line stmt bran cond sub pod time code
1             package Protocol::SPDY::Stream;
2             $Protocol::SPDY::Stream::VERSION = '1.001';
3 4     4   456 use strict;
  4         5  
  4         135  
4 4     4   18 use warnings;
  4         5  
  4         117  
5 4     4   438 use parent qw(Mixin::Event::Dispatch);
  4         272  
  4         15  
6              
7             =head1 NAME
8              
9             Protocol::SPDY::Stream - single stream representation within a L connection
10              
11             =head1 VERSION
12              
13             version 1.001
14              
15             =head1 SYNOPSIS
16              
17             # You'd likely be using a subclass or other container here instead
18             my $spdy = Protocol::SPDY->new;
19             # Create initial stream - this example is for an HTTP request
20             my $stream = $spdy->create_frame(
21             # 0 is the default, use 1 if you don't want anything back from the
22             # other side, for example server push
23             unidirectional => 0,
24             # Set to 1 if we're not expecting to send any further frames on this stream
25             # - a GET request with no additional headers for example
26             fin => 0,
27             # Normally headers are provided as an arrayref to preserve order,
28             # but for convenience you could use a hashref instead
29             headers => [
30             ':method' => 'PUT',
31             ':path:' => '/some/path?some=param',
32             ':version' => 'HTTP/1.1',
33             ':host' => 'localhost:1234',
34             ':scheme' => 'https',
35             ]
36             );
37             # Update the headers - regular HTTP allows trailing headers, with SPDY
38             # you can send additional headers at any time
39             $stream->headers(
40             # There's more to come
41             fin => 0,
42             # Again, arrayref or hashref are allowed here
43             headers => [
44             'content-length' => 5,
45             ]
46             );
47             # Normally scalar (byte) data here, although scalar ref (\'something')
48             # and Future are also allowed
49             $stream->send_data('hello');
50             # as a scalar ref:
51             # $stream->send_data(\(my $buffer = "some data"));
52             # as a Future:
53             # $stream->send_data(my $f = Future->new);
54             # $f->done('the data you expected');
55             # If you want to cancel the stream at any time, use ->reset
56             $stream->reset('CANCEL'); # or STREAM_CANCEL if you've imported the constants
57             # Normally you'd indicate finished by marking a data packet as the final one:
58             $stream->send_data('', fin => 1);
59             # ... and an empty data packet should also be fine:
60             # $stream->send_data('', fin => 1);
61              
62             =head1 DESCRIPTION
63              
64             =head2 HTTP semantics
65              
66             Each stream corresponds to a single HTTP request/response exchange. The request
67             is contained within the SYN_STREAM frame, with optional additional HEADERS
68             after the initial stream creation, and the response will be in the SYN_REPLY,
69             which must at least include the C<:status> and C<:version> headers (so
70             the SYN_REPLY must contain the C<200 OK> response, you can't send that in
71             a later HEADERS packet).
72              
73             =head2 Window handling
74              
75             Each outgoing data frame will decrement the window size; a data frame
76             can only be sent if the data length is less than or equal to the remaining
77             window size. Sending will thus be paused if the window size is insufficient;
78             note that it may be possible for the window size to be less than zero.
79              
80             * Each frame we receive and process will trigger a window update response.
81             This applies to data frames only; windowing does not apply to control frames.
82             If we have several frames queued up for processing, we will defer the window
83             update until we know the total buffer space freed by processing those frames.
84             * Each data frame we send will cause an equivalent reduction in our window
85             size
86              
87             * Extract all frames from buffer
88             * For each frame:
89             * If we have a stream ID for the frame, pass it to that stream
90             * Stream processing for new data
91             * Calculate total from all new data frames
92             * Send window update if required
93              
94             =head2 Error handling
95              
96             There are two main types of error case: stream-level errors, which can
97             be handled by closing that stream, or connection-level errors, where
98             things have gone so badly wrong that the entire connection needs to be
99             dropped.
100              
101             Stream-level errors are handled by RST_STREAM frames.
102              
103             Connection-level errors are typically cases where framing has gone out
104             of sync (compression failures, incorrect packet lengths, etc.) and
105             these are handled by sending a single GOAWAY frame then closing the
106             connection immediately.
107              
108             =head2 Server push support
109              
110             The server can push additional streams to the client to avoid the unnecessary
111             extra SYN_STREAM request/response cycle for additional resources that the server
112             knows will be needed to fulfull the main request.
113              
114             A server push response is requested with L - this example involves
115             a single associated stream:
116              
117             try {
118             my $assoc = $stream->push_stream;
119             $assoc->closed->on_ready(sub {
120             # Associated stream completed or failed - either way,
121             # we can now start sending the main data
122             $stream->send_data($html);
123             })->on_fail(sub {
124             # The other side might already have the data or not
125             # support server push, so don't panic if our associated
126             # stream closes before we expected it
127             warn "Associated stream was rejected";
128             });
129             } catch {
130             # We'll get an exception if we tried to push data on a stream
131             # we'd already marked as FIN on our side.
132             warn "Our code is broken";
133             $stream->connection->goaway;
134             };
135              
136             You can then send that stream using L as usual:
137              
138             $assoc->start(
139             headers => {
140             ':scheme' => 'https',
141             ':host' => 'localhost',
142             ':path' => '/image/logo.png',
143             }
144             );
145              
146             Note that associated streams can only be initiated before the
147             main stream is in FIN state.
148              
149             Generally it's safest to create all the associated streams immediately
150             after the initial SYN_STREAM request has been received from the client,
151             since that will pass enough information back that the client will know
152             how to start arranging the responses for caching. You should then be
153             able to send data on the streams as and when it becomes available. The
154             L C method may be useful here.
155              
156             Attempting to initiate server-pushed streams after sending content is
157             liable to hit race conditions - see section 3.3.1 in the SPDY spec.
158              
159             =cut
160              
161 4     4   16794 use Protocol::SPDY::Constants ':all';
  4         7  
  4         653  
162 4     4   19 use Scalar::Util ();
  4         4  
  4         112  
163              
164             use overload
165             '""' => 'to_string',
166 58     58   349 bool => sub { 1 },
167 4     4   1014 fallback => 1;
  4         777  
  4         31  
168              
169             =head1 METHODS
170              
171             =cut
172              
173             =head2 new
174              
175             Instantiates a new stream. Expects the following named parameters:
176              
177             =over 4
178              
179             =item * connection - the L subclass which is
180             managing this side of the connection
181              
182             =item * stream_id - the ID to use for this stream
183              
184             =item * version - SPDY version, usually 3
185              
186             =back
187              
188             =cut
189              
190             sub new {
191 9     9 1 23 my $class = shift;
192 9         29 my %args = @_;
193 9         21 my $fin = delete $args{fin};
194 9         16 my $uni = delete $args{uni};
195 9         48 my $self = bless {
196             %args,
197             from_us => 1,
198             }, $class;
199 9 50       239 $self->{transfer_window} = $self->initial_window_size unless exists $self->{transfer_window};
200 9         35 Scalar::Util::weaken($self->{connection});
201 9 50       23 $self->finished->done if $fin;
202 9 50       21 $self->remote_finished->done if $uni;
203 9         30 $self;
204             }
205              
206             =head2 new_from_syn
207              
208             Constructs a new instance from a L
209             frame object.
210              
211             =cut
212              
213             sub new_from_syn {
214 0     0 1 0 my $class = shift;
215 0         0 my $frame = shift;
216 0         0 my %args = @_;
217 0         0 my $self = bless {
218             id => $frame->stream_id,
219             version => $frame->version,
220             connection => $args{connection},
221             from_us => 0,
222             }, $class;
223 0         0 Scalar::Util::weaken($self->{connection});
224 0         0 $self->update_received_headers_from($frame);
225              
226             # Check whether we were expecting any more data
227 0 0       0 $self->remote_finished->done if $frame->fin;
228 0 0       0 $self->finished->done if $frame->uni;
229 0 0       0 if(my $parent_id = $frame->associated_stream_id) {
230             # We've received a unidirectional frame from the other
231             # side, this means it's server-push stream.
232 0         0 $self->{associated_stream_id} = $parent_id;
233 0 0       0 die "not unidirectional?" unless $frame->uni;
234 0 0       0 $self->associated_stream->invoke_event(push => $self) if $self->associated_stream;
235 0         0 $self->accepted->done;
236             }
237 0         0 $self;
238             }
239              
240             =head2 update_received_headers_from
241              
242             Updates L from the given frame.
243              
244             =cut
245              
246             sub update_received_headers_from {
247 8     8 1 9 my $self = shift;
248 8         13 my $frame = shift;
249 8         71 my $hdr = $frame->headers_as_simple_hashref;
250 8         28 $self->{received_headers}{$_} = $hdr->{$_} for keys %$hdr;
251 8         17 $self
252             }
253              
254             =head2 from_us
255              
256             Returns true if we initiated this stream.
257              
258             =cut
259              
260 2 50   2 1 12 sub from_us { shift->{from_us} ? 1 : 0 }
261              
262             =head2 id
263              
264             Returns the ID for this stream.
265              
266             =cut
267              
268 60     60 1 2054 sub id { shift->{id} }
269              
270             =head2 seen_reply
271              
272             Returns true if we have seen a reply for this stream yet.
273              
274             =cut
275              
276 14 100   14 1 85 sub seen_reply { shift->{seen_reply} ? 1 : 0 }
277              
278             =head2 connection
279              
280             Returns the L instance which owns us.
281              
282             =cut
283              
284 10     10 1 45 sub connection { shift->{connection} }
285              
286             =head2 priority
287              
288             Returns the priority for this stream (0-7).
289              
290             =cut
291              
292 8     8 1 25 sub priority { shift->{version} }
293              
294             =head2 version
295              
296             Returns the SPDY version for this stream (probably 3).
297              
298             =cut
299              
300 8     8 1 59 sub version { shift->{version} }
301              
302             =head2 syn_frame
303              
304             Generates a SYN_STREAM frame for starting this stream.
305              
306             =cut
307              
308             sub syn_frame {
309 8     8 1 12 my $self = shift;
310 8         15 my %args = @_;
311 8   50     48 $args{headers} ||= [];
312 8         38 Protocol::SPDY::Frame::Control::SYN_STREAM->new(
313             %args,
314             associated_stream_id => $self->associated_stream_id,
315             stream_id => $self->id,
316             priority => $self->priority,
317             slot => 0,
318             version => $self->version,
319             );
320             }
321              
322             =head2 sent_header
323              
324             Returns the given header from our recorded list of sent headers
325              
326             =cut
327              
328 0     0 1 0 sub sent_header { $_[0]->{sent_headers}{$_[1]} }
329              
330             =head2 sent_headers
331              
332             Returns the hashref of all sent headers. Please don't change the value, it
333             might break something: changing this will B send any updates to the
334             other side.
335              
336             =cut
337              
338 0     0 1 0 sub sent_headers { $_[0]->{sent_headers} }
339              
340             =head2 received_header
341              
342             Returns the given header from our recorded list of received headers.
343              
344             =cut
345              
346 2     2 1 716 sub received_header { $_[0]->{received_headers}{$_[1]} }
347              
348             =head2 received_headers
349              
350             Returns the hashref of all received headers.
351              
352             =cut
353              
354 0     0 1 0 sub received_headers { $_[0]->{received_headers} }
355              
356             =head2 handle_frame
357              
358             Attempt to handle the given frame.
359              
360             =cut
361              
362             sub handle_frame {
363 10     10 1 12 my $self = shift;
364 10         11 my $frame = shift;
365              
366 10 50       44 if($frame->is_data) {
    50          
    100          
    100          
    50          
    0          
367 0         0 my $len = length($frame->payload);
368 0         0 $self->invoke_event(data => $frame->payload);
369 0         0 $self->queue_window_update($len);
370             } elsif($frame->type_name eq 'WINDOW_UPDATE') {
371 0         0 my $delta = $frame->window_delta;
372 0         0 $self->{transfer_window} += $delta;
373 0         0 $self->invoke_event(transfer_window => $self->transfer_window, $delta);
374             } elsif($frame->type_name eq 'RST_STREAM') {
375 2 50       7 return $self->accepted->fail($frame->status_code_as_text) if $self->from_us;
376 0         0 $self->closed->fail($frame->status_code_as_text);
377             } elsif($frame->type_name eq 'SYN_REPLY') {
378 6 50       13 die "SYN_REPLY on a stream which has already been refused or replied" if $self->accepted->is_ready;
379 6         37 $self->update_received_headers_from($frame);
380 6         12 $self->accepted->done;
381 6         2090 $self->replied->done;
382             } elsif($frame->type_name eq 'HEADERS') {
383 2 50       7 die "HEADERS on a stream which has not yet seen a reply" unless $self->accepted->is_ready;
384 2         26 $self->update_received_headers_from($frame);
385 2         10 $self->invoke_event(headers => $frame);
386             } elsif($frame->type_name eq 'SYN_STREAM') {
387 0         0 die "SYN_STREAM on an existing stream";
388             } else {
389 0         0 die "what is $frame ?";
390             }
391              
392 8 50       2168 if($frame->fin) {
393 0 0       0 die "Duplicate FIN received" if $self->remote_fin;
394 0         0 $self->remote_finished->done;
395             }
396             }
397              
398             =head2 send_window_update
399              
400             Send out any pending window updates.
401              
402             =cut
403              
404             sub send_window_update {
405 0     0 1 0 my $self = shift;
406 0 0       0 return unless my $delta = delete $self->{pending_update};
407 0         0 $self->window_update(window_delta => $delta);
408 0         0 $self
409             }
410              
411             =head2 queue_window_update
412              
413             Request a window update due to data frame processing.
414              
415             =cut
416              
417             sub queue_window_update {
418 0     0 1 0 my $self = shift;
419 0         0 my $len = shift;
420 0 0       0 if(exists $self->{pending_update}) {
421 0         0 $self->{pending_update} += $len;
422             } else {
423 0         0 $self->{pending_update} = $len;
424 0         0 $self->connection->batch->on_done($self->curry::send_window_update);
425             }
426 0         0 $self
427             }
428              
429             =head2 queue_frame
430              
431             Asks our connection object to queue the given frame instance.
432              
433             =cut
434              
435             sub queue_frame {
436 10     10 1 14 my $self = shift;
437 10         15 my $frame = shift;
438 10 50       46 $self->finished->done if $frame->fin;
439 10         22 $self->connection->queue_frame($frame);
440             }
441              
442             =head2 start
443              
444             Start this stream off by sending a SYN_STREAM frame.
445              
446             =cut
447              
448             sub start {
449 8     8 1 16 my $self = shift;
450 8         29 $self->queue_frame($self->syn_frame(@_));
451 8         81 $self
452             }
453              
454             =head2 reply
455              
456             Sends a reply to the stream instantiation request.
457              
458             =cut
459              
460             sub reply {
461 0     0 1 0 my $self = shift;
462 0         0 my %args = @_;
463 0         0 my $flags = 0;
464 0 0       0 $flags |= FLAG_FIN if $args{fin};
465 0 0       0 $self->queue_frame(
466             Protocol::SPDY::Frame::Control::SYN_REPLY->new(
467             stream_id => $self->id,
468             version => $self->version,
469             headers => $args{headers},
470             fin => ($args{fin} ? 1 : 0),
471             )
472             );
473             }
474              
475             =head2 reset
476              
477             Sends a reset request for this frame.
478              
479             =cut
480              
481             sub reset {
482 0     0 1 0 my $self = shift;
483 0         0 my $status = shift;
484 0         0 $self->queue_frame(
485             Protocol::SPDY::Frame::Control::RST_STREAM->new(
486             stream_id => $self->id,
487             status => $status,
488             )
489             );
490             }
491              
492             =head2 push_stream
493              
494             Creates and returns a new C stream.
495              
496             Note that a pushed stream starts with a B< SYN_STREAM > frame but with
497             headers that are usually found in a B< SYN_REPLY > frame.
498              
499             =cut
500              
501             sub push_stream {
502 0     0 1 0 my $self = shift;
503 0 0       0 die "This stream is in FIN state" if $self->finished->is_ready;
504              
505 0         0 $self->connection->create_stream(
506             uni => 1,
507             fin => 0,
508             associated_stream_id => $self->id,
509             );
510             }
511              
512             =head2 headers
513              
514             Send out headers for this frame.
515              
516             =cut
517              
518             sub headers {
519 0     0 1 0 my $self = shift;
520 0         0 my %args = @_;
521 0         0 $self->queue_frame(
522             Protocol::SPDY::Frame::Control::HEADERS->new(
523             %args,
524             stream_id => $self->id,
525             version => $self->version,
526             )
527             );
528             }
529              
530             =head2 window_update
531              
532             Update information on the current window progress.
533              
534             =cut
535              
536             sub window_update {
537 0     0 1 0 my $self = shift;
538 0         0 my %args = @_;
539 0 0       0 die "No window_delta" unless defined $args{window_delta};
540 0         0 $self->queue_frame(
541             Protocol::SPDY::Frame::Control::WINDOW_UPDATE->new(
542             %args,
543             stream_id => $self->id,
544             version => $self->version,
545             )
546             );
547             }
548              
549             =head2 send_data
550              
551             Sends a data packet.
552              
553             =cut
554              
555             sub send_data {
556 2     2 1 55 my $self = shift;
557 2         4 my $data = shift;
558 2         4 my %args = @_;
559 2         7 $self->queue_frame(
560             Protocol::SPDY::Frame::Data->new(
561             %args,
562             stream_id => $self->id,
563             payload => $data,
564             )
565             );
566 2         39 $self
567             }
568              
569             =head1 METHODS - Accessors
570              
571             These provide read-only access to various pieces of state information.
572              
573             =head2 associated_stream_id
574              
575             Which stream we're associated to. Returns 0 if there isn't one.
576              
577             =cut
578              
579 8 50   8 1 51 sub associated_stream_id { shift->{associated_stream_id} || 0 }
580              
581             =head2 associated_stream
582              
583             The L for the associated stream
584             (the "parent" stream to this one, if it exists). Returns undef
585             if not found.
586              
587             =cut
588              
589             sub associated_stream {
590 0     0 1 0 my $self = shift;
591 0         0 $self->connection->stream_by_id($self->associated_stream_id)
592             }
593              
594             =head2 remote_fin
595              
596             Returns true if the remote has sent us a FIN (half-closed state).
597              
598             =cut
599              
600 0 0   0 1 0 sub remote_fin { shift->{remote_fin} ? 1 : 0 }
601              
602             =head2 local_fin
603              
604             Returns true if we have sent FIN to the remote (half-closed state).
605              
606             =cut
607              
608 0 0   0 1 0 sub local_fin { shift->{local_fin} ? 1 : 0 }
609              
610             =head2 initial_window_size
611              
612             Initial window size. Default is 64KB for a new stream.
613              
614             =cut
615              
616 9   50 9 1 67 sub initial_window_size { shift->{initial_window_size} // 65536 }
617              
618             =head2 transfer_window
619              
620             Remaining bytes in the current transfer window.
621              
622             =cut
623              
624 0     0 1 0 sub transfer_window { shift->{transfer_window} }
625              
626             =head2 to_string
627              
628             String representation of this stream, for debugging.
629              
630             =cut
631              
632             sub to_string {
633 0     0 1 0 my $self = shift;
634 0         0 'SPDY:Stream ID ' . $self->id
635             }
636              
637             =head1 METHODS - Futures
638              
639             The following L-returning methods are available. Attach events using
640             C, C or C or helpers such as C as usual:
641              
642             $stream->replied->then(sub {
643             # This also returns a Future, allowing chaining
644             $stream->send_data('...')
645             })->on_fail(sub {
646             die 'here';
647             });
648              
649             or from the server side:
650              
651             $stream->closed->then(sub {
652             # cleanup here after the stream goes away
653             })->on_fail(sub {
654             die "Our stream was reset from the other side: " . shift;
655             });
656              
657             =cut
658              
659             =head2 replied
660              
661             We have received a SYN_REPLY from the other side. If the stream is reset before
662             that happens, this will be cancelled with the reason as the first parameter.
663              
664             =cut
665              
666             sub replied {
667 14     14 1 30 my $self = shift;
668             $self->{future_replied} ||= Future->new->on_done(sub {
669 6     6   191 $self->{seen_reply} = 1
670             })
671 14   66     90 }
672              
673             =head2 finished
674              
675             This frame has finished sending everything, i.e. we've set the FIN flag on a packet.
676             The difference between this and L is that the other side may have more to
677             say. Will be cancelled with the reason on reset.
678              
679             =cut
680              
681             sub finished {
682 0     0 1 0 my $self = shift;
683 0   0     0 $self->{future_finished} ||= Future->new
684             }
685              
686             =head2 remote_finished
687              
688             This frame has had all the data it's going to get from the other side,
689             i.e. we're sending unidirectional data or we have seen the FIN flag on
690             an incoming packet.
691              
692             =cut
693              
694             sub remote_finished {
695 0     0 1 0 my $self = shift;
696             $self->{future_remote_finished} ||= Future->new->on_done(sub {
697 0     0   0 $self->{remote_fin} = 1;
698 0   0     0 });
699             }
700              
701             =head2 closed
702              
703             The stream has been closed on both sides - either through reset or "natural causes".
704             Might still be cancelled if the parent object disappears.
705              
706             =cut
707              
708             sub closed {
709 0     0 1 0 my $self = shift;
710 0   0     0 $self->{future_closed} ||= Future->needs_all($self->finished, $self->remote_finished)
711             }
712              
713             =head2 accepted
714              
715             The remote accepted this stream immediately after our initial SYN_STREAM. If you
716             want notification on rejection, use an ->on_fail handler on this method.
717              
718             =cut
719              
720             sub accepted {
721 24     24 1 521 my $self = shift;
722 24   66     107 $self->{future_accepted} ||= Future->new
723             }
724              
725             1;
726              
727             __END__