File Coverage

blib/lib/Hypersonic/Stream.pm
Criterion Covered Total %
statement 84 84 100.0
branch 4 6 66.6
condition 7 10 70.0
subroutine 26 26 100.0
pod 0 21 0.0
total 121 147 82.3


line stmt bran cond sub pod time code
1             package Hypersonic::Stream;
2 4     4   8404 use strict;
  4         6  
  4         161  
3 4     4   18 use warnings;
  4         5  
  4         298  
4 4     4   82 use 5.010;
  4         13  
5              
6             our $VERSION = '0.15';
7              
8             use constant {
9 4         594 STATE_INIT => 0,
10             STATE_STARTED => 1,
11             STATE_FINISHED => 2,
12             STATE_ABORTED => 3,
13 4     4   20 };
  4         11  
14 4     4   23 use constant MAX_STREAMS => 65536;
  4         9  
  4         8454  
15              
16             # Class method for streaming handler detection (only Perl code needed)
17             sub is_streaming_handler {
18 3     3 0 26874 my ($class, $handler, $opts) = @_;
19 3 100 66     17 return 1 if $opts && $opts->{streaming};
20 2         4 my $proto = prototype($handler);
21 2 50 33     6 return 1 if defined $proto && $proto =~ /stream/i;
22 2         2 eval {
23 2         12 require B::Deparse;
24 2         129 my $deparser = B::Deparse->new('-p', '-sC');
25 2         2279 my $code = $deparser->coderef2text($handler);
26 2 50       21 return 1 if $code =~ /\$stream\s*->/;
27             };
28 2         10 return 0;
29             }
30              
31             # ============================================================
32             # XS Code Generation - ALL instance methods generated in C
33             # ============================================================
34              
35             sub generate_c_code {
36 6     6 0 5587 my ($class, $builder, $opts) = @_;
37 6   100     40 $opts //= {};
38 6   100     72 my $max = $opts->{max_streams} // MAX_STREAMS;
39            
40 6         42 $builder->line('#include ')
41             ->blank;
42            
43 6         36 $class->gen_stream_registry($builder, $max);
44 6         37 $class->gen_status_text($builder);
45 6         28 $class->gen_stream_start_c($builder);
46 6         30 $class->gen_stream_write_chunk_c($builder);
47 6         45 $class->gen_stream_end_c($builder);
48 6         22 $class->gen_stream_reset_c($builder);
49            
50             # XS instance methods
51 6         24 $class->gen_xs_new($builder);
52 6         30 $class->gen_xs_fd($builder);
53 6         29 $class->gen_xs_protocol($builder);
54 6         23 $class->gen_xs_state($builder);
55 6         125 $class->gen_xs_chunks_sent($builder);
56 6         27 $class->gen_xs_is_started($builder);
57 6         26 $class->gen_xs_is_finished($builder);
58 6         24 $class->gen_xs_headers($builder);
59 6         27 $class->gen_xs_content_type($builder);
60 6         26 $class->gen_xs_write($builder);
61 6         25 $class->gen_xs_end($builder);
62 6         21 $class->gen_xs_abort($builder);
63            
64 6         23 return $builder;
65             }
66              
67             sub gen_stream_registry {
68 6     6 0 22 my ($class, $builder, $max) = @_;
69            
70 6         189 $builder->comment('Stream registry - O(1) lookup by fd')
71             ->line('#define STREAM_MAX ' . $max)
72             ->line('#define STREAM_STATE_INIT 0')
73             ->line('#define STREAM_STATE_STARTED 1')
74             ->line('#define STREAM_STATE_FINISHED 2')
75             ->line('#define STREAM_STATE_ABORTED 3')
76             ->blank
77             ->line('typedef struct {')
78             ->line(' int state;')
79             ->line(' int chunks_sent;')
80             ->line(' int http2;')
81             ->line(' int status;')
82             ->line(' char content_type[128];')
83             ->line(' char extra_headers[512];') # For Cache-Control, X-Accel-Buffering, etc.
84             ->line('} StreamState;')
85             ->blank
86             ->line('static StreamState stream_registry[STREAM_MAX];')
87             ->blank;
88             }
89              
90             sub gen_status_text {
91 6     6 0 20 my ($class, $builder) = @_;
92            
93 6         83 $builder->line('static const char* stream_status_text(int status) {')
94             ->line(' switch(status) {')
95             ->line(' case 200: return "OK";')
96             ->line(' case 201: return "Created";')
97             ->line(' case 204: return "No Content";')
98             ->line(' case 400: return "Bad Request";')
99             ->line(' case 404: return "Not Found";')
100             ->line(' case 500: return "Internal Server Error";')
101             ->line(' default: return "OK";')
102             ->line(' }')
103             ->line('}')
104             ->blank;
105             }
106              
107             sub gen_stream_start_c {
108 7     7 0 4252 my ($class, $builder) = @_;
109            
110 7         134 $builder->line('static void stream_start_http1(int fd) {')
111             ->line(' StreamState* s = &stream_registry[fd];')
112             ->line(' char headers[2048];')
113             ->line(' int len = snprintf(headers, sizeof(headers),')
114             ->line(' "HTTP/1.1 %d %s\\r\\n"')
115             ->line(' "Content-Type: %s\\r\\n"')
116             ->line(' "%s"') # Extra headers (Cache-Control, etc.)
117             ->line(' "Transfer-Encoding: chunked\\r\\n"')
118             ->line(' "Connection: keep-alive\\r\\n\\r\\n",')
119             ->line(' s->status, stream_status_text(s->status), s->content_type, s->extra_headers);')
120             ->line(' send(fd, headers, len, 0);')
121             ->line(' s->state = STREAM_STATE_STARTED;')
122             ->line(' s->chunks_sent = 0;')
123             ->line('}')
124             ->blank;
125             }
126              
127             sub gen_stream_write_chunk_c {
128 7     7 0 4574 my ($class, $builder) = @_;
129            
130 7         126 $builder->line('static void stream_write_chunk_http1(int fd, const char* data, size_t len) {')
131             ->line(' if (len == 0) return;')
132             ->line(' char size_line[32];')
133             ->line(' int header_len = snprintf(size_line, sizeof(size_line), "%zx\\r\\n", len);')
134             ->line(' struct iovec iov[3];')
135             ->line(' iov[0].iov_base = size_line;')
136             ->line(' iov[0].iov_len = header_len;')
137             ->line(' iov[1].iov_base = (void*)data;')
138             ->line(' iov[1].iov_len = len;')
139             ->line(' iov[2].iov_base = "\\r\\n";')
140             ->line(' iov[2].iov_len = 2;')
141             ->line(' writev(fd, iov, 3);')
142             ->line(' stream_registry[fd].chunks_sent++;')
143             ->line('}')
144             ->blank;
145             }
146              
147             sub gen_stream_end_c {
148 7     7 0 3316 my ($class, $builder) = @_;
149            
150 7         52 $builder->line('static void stream_end_http1(int fd) {')
151             ->line(' send(fd, "0\\r\\n\\r\\n", 5, 0);')
152             ->line(' stream_registry[fd].state = STREAM_STATE_FINISHED;')
153             ->line('}')
154             ->blank;
155             }
156              
157             sub gen_stream_reset_c {
158 6     6 0 12 my ($class, $builder) = @_;
159            
160 6         54 $builder->line('static void stream_reset(int fd) {')
161             ->line(' memset(&stream_registry[fd], 0, sizeof(StreamState));')
162             ->line(' stream_registry[fd].status = 200;')
163             ->line(' strcpy(stream_registry[fd].content_type, "text/plain");')
164             ->line('}')
165             ->blank;
166             }
167              
168             # XS: new(fd => N, protocol => P) - returns blessed scalar
169             sub gen_xs_new {
170 6     6 0 10 my ($class, $builder) = @_;
171            
172 6         428 $builder->xs_function('xs_stream_new')
173             ->xs_preamble
174             ->line('int fd = -1;')
175             ->line('int http2 = 0;')
176             ->line('STRLEN klen;')
177             ->line('const char* key;')
178             ->line('STRLEN plen;')
179             ->line('const char* proto;')
180             ->line('SV* fd_sv;')
181             ->line('SV* ref;')
182             ->blank
183             ->comment('Parse hash args: new(fd => N) or new(fd => N, protocol => P)')
184             ->for('int i = 1', 'i < items', 'i += 2')
185             ->if('i + 1 < items')
186             ->line('key = SvPV(ST(i), klen);')
187             ->if('klen == 2 && strncmp(key, "fd", 2) == 0')
188             ->line('fd = SvIV(ST(i + 1));')
189             ->endif
190             ->if('klen == 8 && strncmp(key, "protocol", 8) == 0')
191             ->line('proto = SvPV(ST(i + 1), plen);')
192             ->if('plen == 5 && strncmp(proto, "http2", 5) == 0')
193             ->line('http2 = 1;')
194             ->endif
195             ->endif
196             ->endif
197             ->endfor
198             ->blank
199             ->if('fd < 0 || fd >= STREAM_MAX')
200             ->line('croak("Invalid fd: %d", fd);')
201             ->endif
202             ->blank
203             ->line('stream_reset(fd);')
204             ->line('stream_registry[fd].http2 = http2;')
205             ->blank
206             ->line('fd_sv = newSViv(fd);')
207             ->line('ref = newRV_noinc(fd_sv);')
208             ->line('sv_bless(ref, gv_stashpv("Hypersonic::Stream", GV_ADD));')
209             ->line('ST(0) = sv_2mortal(ref);')
210             ->line('XSRETURN(1);')
211             ->xs_end
212             ->blank;
213             }
214              
215             sub gen_xs_fd {
216 6     6 0 13 my ($class, $builder) = @_;
217            
218 6         102 $builder->xs_function('xs_stream_fd')
219             ->xs_preamble
220             ->check_items(1, 1, '$stream->fd')
221             ->line('XSRETURN_IV(SvIV(SvRV(ST(0))));')
222             ->xs_end
223             ->blank;
224             }
225              
226             sub gen_xs_protocol {
227 6     6 0 11 my ($class, $builder) = @_;
228            
229 6         133 $builder->xs_function('xs_stream_protocol')
230             ->xs_preamble
231             ->check_items(1, 1, '$stream->protocol')
232             ->line('int fd = SvIV(SvRV(ST(0)));')
233             ->if('stream_registry[fd].http2')
234             ->line('XSRETURN_PV("http2");')
235             ->else
236             ->line('XSRETURN_PV("http1");')
237             ->endif
238             ->xs_end
239             ->blank;
240             }
241              
242             sub gen_xs_state {
243 6     6 0 11 my ($class, $builder) = @_;
244            
245 6         205 $builder->xs_function('xs_stream_state')
246             ->xs_preamble
247             ->check_items(1, 1, '$stream->state')
248             ->line('int fd = SvIV(SvRV(ST(0)));')
249             ->if('fd < 0 || fd >= STREAM_MAX')
250             ->line('XSRETURN_IV(0);')
251             ->endif
252             ->line('XSRETURN_IV(stream_registry[fd].state);')
253             ->xs_end
254             ->blank;
255             }
256              
257             sub gen_xs_chunks_sent {
258 6     6 0 12 my ($class, $builder) = @_;
259            
260 6         123 $builder->xs_function('xs_stream_chunks_sent')
261             ->xs_preamble
262             ->check_items(1, 1, '$stream->chunks_sent')
263             ->line('int fd = SvIV(SvRV(ST(0)));')
264             ->if('fd < 0 || fd >= STREAM_MAX')
265             ->line('XSRETURN_IV(0);')
266             ->endif
267             ->line('XSRETURN_IV(stream_registry[fd].chunks_sent);')
268             ->xs_end
269             ->blank;
270             }
271              
272             sub gen_xs_is_started {
273 6     6 0 16 my ($class, $builder) = @_;
274            
275 6         100 $builder->xs_function('xs_stream_is_started')
276             ->xs_preamble
277             ->check_items(1, 1, '$stream->is_started')
278             ->line('int fd = SvIV(SvRV(ST(0)));')
279             ->if('stream_registry[fd].state >= STREAM_STATE_STARTED')
280             ->line('XSRETURN_YES;')
281             ->else
282             ->line('XSRETURN_NO;')
283             ->endif
284             ->xs_end
285             ->blank;
286             }
287              
288             sub gen_xs_is_finished {
289 6     6 0 13 my ($class, $builder) = @_;
290            
291 6         81 $builder->xs_function('xs_stream_is_finished')
292             ->xs_preamble
293             ->check_items(1, 1, '$stream->is_finished')
294             ->line('int fd = SvIV(SvRV(ST(0)));')
295             ->if('stream_registry[fd].state >= STREAM_STATE_FINISHED')
296             ->line('XSRETURN_YES;')
297             ->else
298             ->line('XSRETURN_NO;')
299             ->endif
300             ->xs_end
301             ->blank;
302             }
303              
304             sub gen_xs_headers {
305 6     6 0 14 my ($class, $builder) = @_;
306            
307 6         463 $builder->xs_function('xs_stream_headers')
308             ->xs_preamble
309             ->line('int fd = SvIV(SvRV(ST(0)));')
310             ->line('StreamState* s = &stream_registry[fd];')
311             ->blank
312             ->if('s->state != STREAM_STATE_INIT')
313             ->line('croak("Cannot set headers after streaming started");')
314             ->endif
315             ->blank
316             ->if('items >= 2')
317             ->line('s->status = SvIV(ST(1));')
318             ->endif
319             ->blank
320             ->line('s->extra_headers[0] = \'\\0\';')
321             ->if('items >= 3 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV')
322             ->line('HV* hv = (HV*)SvRV(ST(2));')
323             ->line('int extra_pos = 0;')
324             ->blank
325             ->comment('Extract Content-Type')
326             ->line('SV** ct = hv_fetchs(hv, "Content-Type", 0);')
327             ->if('!ct')
328             ->line('ct = hv_fetchs(hv, "content-type", 0);')
329             ->endif
330             ->if('ct && *ct')
331             ->line('STRLEN len;')
332             ->line('const char* val = SvPV(*ct, len);')
333             ->if('len < sizeof(s->content_type)')
334             ->line('memcpy(s->content_type, val, len);')
335             ->line('s->content_type[len] = \'\\0\';')
336             ->endif
337             ->endif
338             ->blank
339             ->comment('Extract other headers (Cache-Control, Connection, X-Accel-Buffering)')
340             ->line('SV** cc = hv_fetchs(hv, "Cache-Control", 0);')
341             ->if('cc && *cc')
342             ->line('STRLEN len;')
343             ->line('const char* val = SvPV(*cc, len);')
344             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
345             ->line(' sizeof(s->extra_headers) - extra_pos, "Cache-Control: %s\\r\\n", val);')
346             ->endif
347             ->line('SV** conn = hv_fetchs(hv, "Connection", 0);')
348             ->if('conn && *conn')
349             ->line('STRLEN len;')
350             ->line('const char* val = SvPV(*conn, len);')
351             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
352             ->line(' sizeof(s->extra_headers) - extra_pos, "Connection: %s\\r\\n", val);')
353             ->endif
354             ->line('SV** xab = hv_fetchs(hv, "X-Accel-Buffering", 0);')
355             ->if('xab && *xab')
356             ->line('STRLEN len;')
357             ->line('const char* val = SvPV(*xab, len);')
358             ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
359             ->line(' sizeof(s->extra_headers) - extra_pos, "X-Accel-Buffering: %s\\r\\n", val);')
360             ->endif
361             ->endif
362             ->blank
363             ->line('ST(0) = ST(0);')
364             ->line('XSRETURN(1);')
365             ->xs_end
366             ->blank;
367             }
368              
369             sub gen_xs_content_type {
370 6     6 0 12 my ($class, $builder) = @_;
371            
372 6         254 $builder->xs_function('xs_stream_content_type')
373             ->xs_preamble
374             ->if('items != 2')
375             ->line('croak("Usage: $stream->content_type(type)");')
376             ->endif
377             ->line('int fd = SvIV(SvRV(ST(0)));')
378             ->line('StreamState* s = &stream_registry[fd];')
379             ->blank
380             ->if('s->state != STREAM_STATE_INIT')
381             ->line('croak("Cannot set content_type after streaming started");')
382             ->endif
383             ->blank
384             ->line('STRLEN len;')
385             ->line('const char* ct = SvPV(ST(1), len);')
386             ->if('len < sizeof(s->content_type)')
387             ->line('memcpy(s->content_type, ct, len);')
388             ->line('s->content_type[len] = \'\\0\';')
389             ->endif
390             ->blank
391             ->line('ST(0) = ST(0);')
392             ->line('XSRETURN(1);')
393             ->xs_end
394             ->blank;
395             }
396              
397             sub gen_xs_write {
398 6     6 0 13 my ($class, $builder) = @_;
399            
400 6         198 $builder->xs_function('xs_stream_write')
401             ->xs_preamble
402             ->if('items != 2')
403             ->line('croak("Usage: $stream->write(data)");')
404             ->endif
405             ->line('int fd = SvIV(SvRV(ST(0)));')
406             ->line('StreamState* s = &stream_registry[fd];')
407             ->blank
408             ->line('STRLEN len;')
409             ->line('const char* data = SvPV(ST(1), len);')
410             ->if('len == 0')
411             ->line('ST(0) = ST(0);')
412             ->line('XSRETURN(1);')
413             ->endif
414             ->blank
415             ->if('s->state == STREAM_STATE_INIT && !s->http2')
416             ->line('stream_start_http1(fd);')
417             ->endif
418             ->blank
419             ->if('!s->http2')
420             ->line('stream_write_chunk_http1(fd, data, len);')
421             ->endif
422             ->blank
423             ->line('ST(0) = ST(0);')
424             ->line('XSRETURN(1);')
425             ->xs_end
426             ->blank;
427             }
428              
429             sub gen_xs_end {
430 6     6 0 14 my ($class, $builder) = @_;
431            
432 6         231 $builder->xs_function('xs_stream_end')
433             ->xs_preamble
434             ->line('int fd = SvIV(SvRV(ST(0)));')
435             ->line('StreamState* s = &stream_registry[fd];')
436             ->blank
437             ->if('s->state >= STREAM_STATE_FINISHED')
438             ->line('ST(0) = ST(0);')
439             ->line('XSRETURN(1);')
440             ->endif
441             ->blank
442             ->if('items >= 2 && SvOK(ST(1))')
443             ->line('STRLEN len;')
444             ->line('const char* data = SvPV(ST(1), len);')
445             ->if('len > 0')
446             ->if('s->state == STREAM_STATE_INIT && !s->http2')
447             ->line('stream_start_http1(fd);')
448             ->endif
449             ->if('!s->http2')
450             ->line('stream_write_chunk_http1(fd, data, len);')
451             ->endif
452             ->endif
453             ->endif
454             ->blank
455             ->if('s->state == STREAM_STATE_INIT && !s->http2')
456             ->line('stream_start_http1(fd);')
457             ->endif
458             ->blank
459             ->if('!s->http2')
460             ->line('stream_end_http1(fd);')
461             ->endif
462             ->blank
463             ->line('ST(0) = ST(0);')
464             ->line('XSRETURN(1);')
465             ->xs_end
466             ->blank;
467             }
468              
469             sub gen_xs_abort {
470 6     6 0 14 my ($class, $builder) = @_;
471            
472 6         150 $builder->xs_function('xs_stream_abort')
473             ->xs_preamble
474             ->line('int fd = SvIV(SvRV(ST(0)));')
475             ->line('StreamState* s = &stream_registry[fd];')
476             ->line('int code = items >= 2 ? SvIV(ST(1)) : 500;')
477             ->line('const char* reason = items >= 3 ? SvPV_nolen(ST(2)) : "Internal Server Error";')
478             ->blank
479             ->if('s->state == STREAM_STATE_INIT && !s->http2')
480             ->line('s->status = code;')
481             ->line('stream_start_http1(fd);')
482             ->line('stream_write_chunk_http1(fd, reason, strlen(reason));')
483             ->endif
484             ->blank
485             ->if('!s->http2')
486             ->line('stream_end_http1(fd);')
487             ->endif
488             ->line('s->state = STREAM_STATE_ABORTED;')
489             ->blank
490             ->line('ST(0) = ST(0);')
491             ->line('XSRETURN(1);')
492             ->xs_end
493             ->blank;
494             }
495              
496              
497             sub get_xs_functions {
498             return {
499 5     5 0 681786 'Hypersonic::Stream::new' => { source => 'xs_stream_new', is_xs_native => 1 },
500             'Hypersonic::Stream::fd' => { source => 'xs_stream_fd', is_xs_native => 1 },
501             'Hypersonic::Stream::protocol' => { source => 'xs_stream_protocol', is_xs_native => 1 },
502             'Hypersonic::Stream::state' => { source => 'xs_stream_state', is_xs_native => 1 },
503             'Hypersonic::Stream::chunks_sent' => { source => 'xs_stream_chunks_sent', is_xs_native => 1 },
504             'Hypersonic::Stream::is_started' => { source => 'xs_stream_is_started', is_xs_native => 1 },
505             'Hypersonic::Stream::is_finished' => { source => 'xs_stream_is_finished', is_xs_native => 1 },
506             'Hypersonic::Stream::headers' => { source => 'xs_stream_headers', is_xs_native => 1 },
507             'Hypersonic::Stream::content_type' => { source => 'xs_stream_content_type', is_xs_native => 1 },
508             'Hypersonic::Stream::write' => { source => 'xs_stream_write', is_xs_native => 1 },
509             'Hypersonic::Stream::end' => { source => 'xs_stream_end', is_xs_native => 1 },
510             'Hypersonic::Stream::abort' => { source => 'xs_stream_abort', is_xs_native => 1 },
511             };
512             }
513              
514             1;