File Coverage

blib/lib/Hypersonic/Stream.pm
Criterion Covered Total %
statement 84 84 100.0
branch 4 6 66.6
condition 7 10 70.0
subroutine 26 26 100.0
pod 0 21 0.0
total 121 147 82.3


line stmt bran cond sub pod time code
1             package Hypersonic::Stream;
2 4     4   7940 use strict;
  4         8  
  4         176  
3 4     4   14 use warnings;
  4         7  
  4         244  
4 4     4   67 use 5.010;
  4         10  
5              
6             our $VERSION = '0.17';
7              
8             use constant {
9 4         500 STATE_INIT => 0,
10             STATE_STARTED => 1,
11             STATE_FINISHED => 2,
12             STATE_ABORTED => 3,
13 4     4   17 };
  4         4  
14 4     4   21 use constant MAX_STREAMS => 65536;
  4         6  
  4         7230  
15              
16             # Class method for streaming handler detection (only Perl code needed)
17             sub is_streaming_handler {
18 3     3 0 27462 my ($class, $handler, $opts) = @_;
19 3 100 66     21 return 1 if $opts && $opts->{streaming};
20 2         2 my $proto = prototype($handler);
21 2 50 33     6 return 1 if defined $proto && $proto =~ /stream/i;
22 2         3 eval {
23 2         16 require B::Deparse;
24 2         202 my $deparser = B::Deparse->new('-p', '-sC');
25 2         2285 my $code = $deparser->coderef2text($handler);
26 2 50       26 return 1 if $code =~ /\$stream\s*->/;
27             };
28 2         10 return 0;
29             }
30              
31             # ============================================================
32             # XS Code Generation - ALL instance methods generated in C
33             # ============================================================
34              
35             sub generate_c_code {
36 6     6 0 5465 my ($class, $builder, $opts) = @_;
37 6   100     35 $opts //= {};
38 6   100     26 my $max = $opts->{max_streams} // MAX_STREAMS;
39            
40             # sys/uio.h is POSIX-only (writev + struct iovec live there).
41             # On Windows we fall back to multiple send() calls (see
42             # stream_write_chunk_http1 below). Winsock headers are pulled in
43             # by Hypersonic::JIT::Util / Hypersonic::Socket already.
44 6         39 $builder->line('#ifndef _WIN32')
45             ->line('#include ')
46             ->line('#endif')
47             ->blank;
48            
49 6         25 $class->gen_stream_registry($builder, $max);
50 6         30 $class->gen_status_text($builder);
51 6         34 $class->gen_stream_start_c($builder);
52 6         27 $class->gen_stream_write_chunk_c($builder);
53 6         28 $class->gen_stream_end_c($builder);
54 6         24 $class->gen_stream_reset_c($builder);
55            
56             # XS instance methods
57 6         25 $class->gen_xs_new($builder);
58 6         35 $class->gen_xs_fd($builder);
59 6         38 $class->gen_xs_protocol($builder);
60 6         181 $class->gen_xs_state($builder);
61 6         22 $class->gen_xs_chunks_sent($builder);
62 6         23 $class->gen_xs_is_started($builder);
63 6         16 $class->gen_xs_is_finished($builder);
64 6         23 $class->gen_xs_headers($builder);
65 6         21 $class->gen_xs_content_type($builder);
66 6         23 $class->gen_xs_write($builder);
67 6         22 $class->gen_xs_end($builder);
68 6         24 $class->gen_xs_abort($builder);
69            
70 6         19 return $builder;
71             }
72              
73             sub gen_stream_registry {
74 6     6 0 20 my ($class, $builder, $max) = @_;
75            
76 6         159 $builder->comment('Stream registry - O(1) lookup by fd')
77             ->line('#define STREAM_MAX ' . $max)
78             ->line('#define STREAM_STATE_INIT 0')
79             ->line('#define STREAM_STATE_STARTED 1')
80             ->line('#define STREAM_STATE_FINISHED 2')
81             ->line('#define STREAM_STATE_ABORTED 3')
82             ->blank
83             ->line('typedef struct {')
84             ->line(' int state;')
85             ->line(' int chunks_sent;')
86             ->line(' int http2;')
87             ->line(' int status;')
88             ->line(' char content_type[128];')
89             ->line(' char extra_headers[512];') # For Cache-Control, X-Accel-Buffering, etc.
90             ->line('} StreamState;')
91             ->blank
92             ->line('static StreamState stream_registry[STREAM_MAX];')
93             ->blank;
94             }
95              
96             sub gen_status_text {
97 6     6 0 18 my ($class, $builder) = @_;
98            
99 6         72 $builder->line('static const char* stream_status_text(int status) {')
100             ->line(' switch(status) {')
101             ->line(' case 200: return "OK";')
102             ->line(' case 201: return "Created";')
103             ->line(' case 204: return "No Content";')
104             ->line(' case 400: return "Bad Request";')
105             ->line(' case 404: return "Not Found";')
106             ->line(' case 500: return "Internal Server Error";')
107             ->line(' default: return "OK";')
108             ->line(' }')
109             ->line('}')
110             ->blank;
111             }
112              
113             sub gen_stream_start_c {
114 7     7 0 3385 my ($class, $builder) = @_;
115            
116 7         120 $builder->line('static void stream_start_http1(int fd) {')
117             ->line(' StreamState* s = &stream_registry[fd];')
118             ->line(' char headers[2048];')
119             ->line(' int len = snprintf(headers, sizeof(headers),')
120             ->line(' "HTTP/1.1 %d %s\\r\\n"')
121             ->line(' "Content-Type: %s\\r\\n"')
122             ->line(' "%s"') # Extra headers (Cache-Control, etc.)
123             ->line(' "Transfer-Encoding: chunked\\r\\n"')
124             ->line(' "Connection: keep-alive\\r\\n\\r\\n",')
125             ->line(' s->status, stream_status_text(s->status), s->content_type, s->extra_headers);')
126             ->line(' send(fd, headers, len, 0);')
127             ->line(' s->state = STREAM_STATE_STARTED;')
128             ->line(' s->chunks_sent = 0;')
129             ->line('}')
130             ->blank;
131             }
132              
133             sub gen_stream_write_chunk_c {
134 7     7 0 4005 my ($class, $builder) = @_;
135            
136 7         96 $builder->line('static void stream_write_chunk_http1(int fd, const char* data, size_t len) {')
137             ->line(' if (len == 0) return;')
138             ->line(' char size_line[32];')
139             ->line(' int header_len = snprintf(size_line, sizeof(size_line), "%zx\\r\\n", len);')
140             ->raw(<<'C')
141             #ifdef _WIN32
142             /* No writev on Windows; three send() calls instead. The chunk
143             * size line is tiny and the trailing CRLF is 2 bytes so the
144             * extra syscalls are negligible. */
145             send(fd, size_line, header_len, 0);
146             send(fd, data, len, 0);
147             send(fd, "\r\n", 2, 0);
148             #else
149             struct iovec iov[3];
150             iov[0].iov_base = size_line;
151             iov[0].iov_len = header_len;
152             iov[1].iov_base = (void*)data;
153             iov[1].iov_len = len;
154             iov[2].iov_base = "\r\n";
155             iov[2].iov_len = 2;
156             writev(fd, iov, 3);
157             #endif
158             C
159             ->line(' stream_registry[fd].chunks_sent++;')
160             ->line('}')
161             ->blank;
162             }
163              
164             sub gen_stream_end_c {
165 7     7 0 3500 my ($class, $builder) = @_;
166            
167 7         45 $builder->line('static void stream_end_http1(int fd) {')
168             ->line(' send(fd, "0\\r\\n\\r\\n", 5, 0);')
169             ->line(' stream_registry[fd].state = STREAM_STATE_FINISHED;')
170             ->line('}')
171             ->blank;
172             }
173              
174             sub gen_stream_reset_c {
175 6     6 0 14 my ($class, $builder) = @_;
176            
177 6         62 $builder->line('static void stream_reset(int fd) {')
178             ->line(' memset(&stream_registry[fd], 0, sizeof(StreamState));')
179             ->line(' stream_registry[fd].status = 200;')
180             ->line(' strcpy(stream_registry[fd].content_type, "text/plain");')
181             ->line('}')
182             ->blank;
183             }
184              
185             # XS: new(fd => N, protocol => P) - returns blessed scalar
186             sub gen_xs_new {
187 6     6 0 13 my ($class, $builder) = @_;
188            
189 6         454 $builder->xs_function('xs_stream_new')
190             ->xs_preamble
191             ->line('int fd = -1;')
192             ->line('int http2 = 0;')
193             ->line('STRLEN klen;')
194             ->line('const char* key;')
195             ->line('STRLEN plen;')
196             ->line('const char* proto;')
197             ->line('SV* fd_sv;')
198             ->line('SV* ref;')
199             ->blank
200             ->comment('Parse hash args: new(fd => N) or new(fd => N, protocol => P)')
201             ->for('int i = 1', 'i < items', 'i += 2')
202             ->if('i + 1 < items')
203             ->line('key = SvPV(ST(i), klen);')
204             ->if('klen == 2 && strncmp(key, "fd", 2) == 0')
205             ->line('fd = SvIV(ST(i + 1));')
206             ->endif
207             ->if('klen == 8 && strncmp(key, "protocol", 8) == 0')
208             ->line('proto = SvPV(ST(i + 1), plen);')
209             ->if('plen == 5 && strncmp(proto, "http2", 5) == 0')
210             ->line('http2 = 1;')
211             ->endif
212             ->endif
213             ->endif
214             ->endfor
215             ->blank
216             ->if('fd < 0 || fd >= STREAM_MAX')
217             ->line('croak("Invalid fd: %d", fd);')
218             ->endif
219             ->blank
220             ->line('stream_reset(fd);')
221             ->line('stream_registry[fd].http2 = http2;')
222             ->blank
223             ->line('fd_sv = newSViv(fd);')
224             ->line('ref = newRV_noinc(fd_sv);')
225             ->line('sv_bless(ref, gv_stashpv("Hypersonic::Stream", GV_ADD));')
226             ->line('ST(0) = sv_2mortal(ref);')
227             ->line('XSRETURN(1);')
228             ->xs_end
229             ->blank;
230             }
231              
232             sub gen_xs_fd {
233 6     6 0 15 my ($class, $builder) = @_;
234            
235 6         93 $builder->xs_function('xs_stream_fd')
236             ->xs_preamble
237             ->check_items(1, 1, '$stream->fd')
238             ->line('XSRETURN_IV(SvIV(SvRV(ST(0))));')
239             ->xs_end
240             ->blank;
241             }
242              
243             sub gen_xs_protocol {
244 6     6 0 15 my ($class, $builder) = @_;
245            
246 6         117 $builder->xs_function('xs_stream_protocol')
247             ->xs_preamble
248             ->check_items(1, 1, '$stream->protocol')
249             ->line('int fd = SvIV(SvRV(ST(0)));')
250             ->if('stream_registry[fd].http2')
251             ->line('XSRETURN_PV("http2");')
252             ->else
253             ->line('XSRETURN_PV("http1");')
254             ->endif
255             ->xs_end
256             ->blank;
257             }
258              
259             sub gen_xs_state {
260 6     6 0 23 my ($class, $builder) = @_;
261            
262 6         181 $builder->xs_function('xs_stream_state')
263             ->xs_preamble
264             ->check_items(1, 1, '$stream->state')
265             ->line('int fd = SvIV(SvRV(ST(0)));')
266             ->if('fd < 0 || fd >= STREAM_MAX')
267             ->line('XSRETURN_IV(0);')
268             ->endif
269             ->line('XSRETURN_IV(stream_registry[fd].state);')
270             ->xs_end
271             ->blank;
272             }
273              
274             sub gen_xs_chunks_sent {
275 6     6 0 12 my ($class, $builder) = @_;
276            
277 6         67 $builder->xs_function('xs_stream_chunks_sent')
278             ->xs_preamble
279             ->check_items(1, 1, '$stream->chunks_sent')
280             ->line('int fd = SvIV(SvRV(ST(0)));')
281             ->if('fd < 0 || fd >= STREAM_MAX')
282             ->line('XSRETURN_IV(0);')
283             ->endif
284             ->line('XSRETURN_IV(stream_registry[fd].chunks_sent);')
285             ->xs_end
286             ->blank;
287             }
288              
289             sub gen_xs_is_started {
290 6     6 0 10 my ($class, $builder) = @_;
291            
292 6         92 $builder->xs_function('xs_stream_is_started')
293             ->xs_preamble
294             ->check_items(1, 1, '$stream->is_started')
295             ->line('int fd = SvIV(SvRV(ST(0)));')
296             ->if('stream_registry[fd].state >= STREAM_STATE_STARTED')
297             ->line('XSRETURN_YES;')
298             ->else
299             ->line('XSRETURN_NO;')
300             ->endif
301             ->xs_end
302             ->blank;
303             }
304              
305             sub gen_xs_is_finished {
306 6     6 0 9 my ($class, $builder) = @_;
307            
308 6         77 $builder->xs_function('xs_stream_is_finished')
309             ->xs_preamble
310             ->check_items(1, 1, '$stream->is_finished')
311             ->line('int fd = SvIV(SvRV(ST(0)));')
312             ->if('stream_registry[fd].state >= STREAM_STATE_FINISHED')
313             ->line('XSRETURN_YES;')
314             ->else
315             ->line('XSRETURN_NO;')
316             ->endif
317             ->xs_end
318             ->blank;
319             }
320              
321             sub gen_xs_headers {
322 6     6 0 11 my ($class, $builder) = @_;
323            
324 6         357 $builder->xs_function('xs_stream_headers')
325             ->xs_preamble
326             ->line('int fd = SvIV(SvRV(ST(0)));')
327             ->line('StreamState* s = &stream_registry[fd];')
328             ->blank
329             ->if('s->state != STREAM_STATE_INIT')
330             ->line('croak("Cannot set headers after streaming started");')
331             ->endif
332             ->blank
333             ->if('items >= 2')
334             ->line('s->status = SvIV(ST(1));')
335             ->endif
336             ->blank
337             ->line('s->extra_headers[0] = \'\\0\';')
338             ->if('items >= 3 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV')
339             ->line('HV* hv = (HV*)SvRV(ST(2));')
340             ->line('int extra_pos = 0;')
341             ->blank
342             ->comment('Extract Content-Type')
343             ->line('SV** ct = hv_fetchs(hv, "Content-Type", 0);')
344             ->if('!ct')
345             ->line('ct = hv_fetchs(hv, "content-type", 0);')
346             ->endif
347             ->if('ct && *ct')
348             ->line('STRLEN len;')
349             ->line('const char* val = SvPV(*ct, len);')
350             ->if('len < sizeof(s->content_type)')
351             ->line('memcpy(s->content_type, val, len);')
352             ->line('s->content_type[len] = \'\\0\';')
353             ->endif
354             ->endif
355             ->blank
356             ->comment('Extract other headers (Cache-Control, Connection, X-Accel-Buffering)')
357             ->line('SV** cc = hv_fetchs(hv, "Cache-Control", 0);')
358             ->if('cc && *cc')
359             ->line('STRLEN len;')
360             ->line('const char* val = SvPV(*cc, len);')
361             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
362             ->line(' sizeof(s->extra_headers) - extra_pos, "Cache-Control: %s\\r\\n", val);')
363             ->endif
364             ->line('SV** conn = hv_fetchs(hv, "Connection", 0);')
365             ->if('conn && *conn')
366             ->line('STRLEN len;')
367             ->line('const char* val = SvPV(*conn, len);')
368             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
369             ->line(' sizeof(s->extra_headers) - extra_pos, "Connection: %s\\r\\n", val);')
370             ->endif
371             ->line('SV** xab = hv_fetchs(hv, "X-Accel-Buffering", 0);')
372             ->if('xab && *xab')
373             ->line('STRLEN len;')
374             ->line('const char* val = SvPV(*xab, len);')
375             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
376             ->line(' sizeof(s->extra_headers) - extra_pos, "X-Accel-Buffering: %s\\r\\n", val);')
377             ->endif
378             ->endif
379             ->blank
380             ->line('ST(0) = ST(0);')
381             ->line('XSRETURN(1);')
382             ->xs_end
383             ->blank;
384             }
385              
386             sub gen_xs_content_type {
387 6     6 0 11 my ($class, $builder) = @_;
388            
389 6         140 $builder->xs_function('xs_stream_content_type')
390             ->xs_preamble
391             ->if('items != 2')
392             ->line('croak("Usage: $stream->content_type(type)");')
393             ->endif
394             ->line('int fd = SvIV(SvRV(ST(0)));')
395             ->line('StreamState* s = &stream_registry[fd];')
396             ->blank
397             ->if('s->state != STREAM_STATE_INIT')
398             ->line('croak("Cannot set content_type after streaming started");')
399             ->endif
400             ->blank
401             ->line('STRLEN len;')
402             ->line('const char* ct = SvPV(ST(1), len);')
403             ->if('len < sizeof(s->content_type)')
404             ->line('memcpy(s->content_type, ct, len);')
405             ->line('s->content_type[len] = \'\\0\';')
406             ->endif
407             ->blank
408             ->line('ST(0) = ST(0);')
409             ->line('XSRETURN(1);')
410             ->xs_end
411             ->blank;
412             }
413              
414             sub gen_xs_write {
415 6     6 0 11 my ($class, $builder) = @_;
416            
417 6         154 $builder->xs_function('xs_stream_write')
418             ->xs_preamble
419             ->if('items != 2')
420             ->line('croak("Usage: $stream->write(data)");')
421             ->endif
422             ->line('int fd = SvIV(SvRV(ST(0)));')
423             ->line('StreamState* s = &stream_registry[fd];')
424             ->blank
425             ->line('STRLEN len;')
426             ->line('const char* data = SvPV(ST(1), len);')
427             ->if('len == 0')
428             ->line('ST(0) = ST(0);')
429             ->line('XSRETURN(1);')
430             ->endif
431             ->blank
432             ->if('s->state == STREAM_STATE_INIT && !s->http2')
433             ->line('stream_start_http1(fd);')
434             ->endif
435             ->blank
436             ->if('!s->http2')
437             ->line('stream_write_chunk_http1(fd, data, len);')
438             ->endif
439             ->blank
440             ->line('ST(0) = ST(0);')
441             ->line('XSRETURN(1);')
442             ->xs_end
443             ->blank;
444             }
445              
446             sub gen_xs_end {
447 6     6 0 14 my ($class, $builder) = @_;
448            
449 6         188 $builder->xs_function('xs_stream_end')
450             ->xs_preamble
451             ->line('int fd = SvIV(SvRV(ST(0)));')
452             ->line('StreamState* s = &stream_registry[fd];')
453             ->blank
454             ->if('s->state >= STREAM_STATE_FINISHED')
455             ->line('ST(0) = ST(0);')
456             ->line('XSRETURN(1);')
457             ->endif
458             ->blank
459             ->if('items >= 2 && SvOK(ST(1))')
460             ->line('STRLEN len;')
461             ->line('const char* data = SvPV(ST(1), len);')
462             ->if('len > 0')
463             ->if('s->state == STREAM_STATE_INIT && !s->http2')
464             ->line('stream_start_http1(fd);')
465             ->endif
466             ->if('!s->http2')
467             ->line('stream_write_chunk_http1(fd, data, len);')
468             ->endif
469             ->endif
470             ->endif
471             ->blank
472             ->if('s->state == STREAM_STATE_INIT && !s->http2')
473             ->line('stream_start_http1(fd);')
474             ->endif
475             ->blank
476             ->if('!s->http2')
477             ->line('stream_end_http1(fd);')
478             ->endif
479             ->blank
480             ->line('ST(0) = ST(0);')
481             ->line('XSRETURN(1);')
482             ->xs_end
483             ->blank;
484             }
485              
486             sub gen_xs_abort {
487 6     6 0 10 my ($class, $builder) = @_;
488            
489 6         147 $builder->xs_function('xs_stream_abort')
490             ->xs_preamble
491             ->line('int fd = SvIV(SvRV(ST(0)));')
492             ->line('StreamState* s = &stream_registry[fd];')
493             ->line('int code = items >= 2 ? SvIV(ST(1)) : 500;')
494             ->line('const char* reason = items >= 3 ? SvPV_nolen(ST(2)) : "Internal Server Error";')
495             ->blank
496             ->if('s->state == STREAM_STATE_INIT && !s->http2')
497             ->line('s->status = code;')
498             ->line('stream_start_http1(fd);')
499             ->line('stream_write_chunk_http1(fd, reason, strlen(reason));')
500             ->endif
501             ->blank
502             ->if('!s->http2')
503             ->line('stream_end_http1(fd);')
504             ->endif
505             ->line('s->state = STREAM_STATE_ABORTED;')
506             ->blank
507             ->line('ST(0) = ST(0);')
508             ->line('XSRETURN(1);')
509             ->xs_end
510             ->blank;
511             }
512              
513              
514             sub get_xs_functions {
515             return {
516 5     5 0 614799 'Hypersonic::Stream::new' => { source => 'xs_stream_new', is_xs_native => 1 },
517             'Hypersonic::Stream::fd' => { source => 'xs_stream_fd', is_xs_native => 1 },
518             'Hypersonic::Stream::protocol' => { source => 'xs_stream_protocol', is_xs_native => 1 },
519             'Hypersonic::Stream::state' => { source => 'xs_stream_state', is_xs_native => 1 },
520             'Hypersonic::Stream::chunks_sent' => { source => 'xs_stream_chunks_sent', is_xs_native => 1 },
521             'Hypersonic::Stream::is_started' => { source => 'xs_stream_is_started', is_xs_native => 1 },
522             'Hypersonic::Stream::is_finished' => { source => 'xs_stream_is_finished', is_xs_native => 1 },
523             'Hypersonic::Stream::headers' => { source => 'xs_stream_headers', is_xs_native => 1 },
524             'Hypersonic::Stream::content_type' => { source => 'xs_stream_content_type', is_xs_native => 1 },
525             'Hypersonic::Stream::write' => { source => 'xs_stream_write', is_xs_native => 1 },
526             'Hypersonic::Stream::end' => { source => 'xs_stream_end', is_xs_native => 1 },
527             'Hypersonic::Stream::abort' => { source => 'xs_stream_abort', is_xs_native => 1 },
528             };
529             }
530              
531             1;