File Coverage

blib/lib/Hypersonic/UA/SSE.pm
Criterion Covered Total %
statement 51 61 83.6
branch 0 4 0.0
condition 1 2 50.0
subroutine 16 18 88.8
pod 0 13 0.0
total 68 98 69.3


line stmt bran cond sub pod time code
1             package Hypersonic::UA::SSE;
2              
3 1     1   204317 use strict;
  1         1  
  1         45  
4 1     1   4 use warnings;
  1         1  
  1         41  
5 1     1   13 use 5.010;
  1         2  
6              
7             our $VERSION = '0.15';
8              
9 1     1   3 use constant MAX_SSE_CONNS => 256;
  1         2  
  1         53  
10              
11             use constant {
12 1         1483 SLOT_FD => 0,
13             SLOT_URL => 1,
14             SLOT_CALLBACKS => 2,
15             SLOT_RECONNECT => 3,
16             SLOT_RETRY => 4,
17 1     1   3 };
  1         8  
18              
19             sub generate_c_code {
20 1     1 0 10643 my ($class, $builder, $opts) = @_;
21              
22 1   50     6 my $max_conns = $opts->{max_sse_conns} // MAX_SSE_CONNS;
23              
24 1         4 $class->gen_sse_registry($builder, $max_conns);
25 1         3 $class->gen_sse_parser($builder);
26 1         4 $class->gen_xs_new($builder);
27 1         2 $class->gen_xs_connect($builder);
28 1         3 $class->gen_xs_parse_events($builder);
29 1         4 $class->gen_xs_recv_chunk($builder);
30 1         2 $class->gen_xs_get_last_id($builder);
31 1         2 $class->gen_xs_set_retry($builder);
32 1         2 $class->gen_xs_close($builder);
33             }
34              
35             sub get_xs_functions {
36             return {
37 1     1 0 5431 'Hypersonic::UA::SSE::new' => { source => 'xs_sse_new', is_xs_native => 1 },
38             'Hypersonic::UA::SSE::connect' => { source => 'xs_sse_connect', is_xs_native => 1 },
39             'Hypersonic::UA::SSE::parse_events' => { source => 'xs_sse_parse_events', is_xs_native => 1 },
40             'Hypersonic::UA::SSE::recv_chunk' => { source => 'xs_sse_recv_chunk', is_xs_native => 1 },
41             'Hypersonic::UA::SSE::get_last_id' => { source => 'xs_sse_get_last_id', is_xs_native => 1 },
42             'Hypersonic::UA::SSE::set_retry' => { source => 'xs_sse_set_retry', is_xs_native => 1 },
43             'Hypersonic::UA::SSE::close' => { source => 'xs_sse_close', is_xs_native => 1 },
44             };
45             }
46              
47             sub gen_sse_registry {
48 1     1 0 1 my ($class, $builder, $max_conns) = @_;
49              
50 1         10 $builder->line('#include ')
51             ->line('#include ')
52             ->line('#include ')
53             ->line('#include ')
54             ->line('#include ')
55             ->blank;
56              
57 1         6 $builder->line("#define MAX_SSE_CONNS $max_conns")
58             ->line('#define SSE_BUFFER_INITIAL 65536')
59             ->blank;
60              
61 1         15 $builder->line('typedef struct {')
62             ->line(' int fd;')
63             ->line(' int tls;')
64             ->line(' int connected;')
65             ->line(' int reconnect;')
66             ->line(' int retry_ms;')
67             ->line(' char* buffer;')
68             ->line(' size_t buffer_len;')
69             ->line(' size_t buffer_cap;')
70             ->line(' char last_id[256];')
71             ->line(' char event_type[256];')
72             ->line(' char* event_data;')
73             ->line(' size_t event_data_len;')
74             ->line(' size_t event_data_cap;')
75             ->line('} SSEConnection;')
76             ->blank;
77              
78 1         4 $builder->line("static SSEConnection sse_registry[MAX_SSE_CONNS];")
79             ->blank;
80              
81             # Helper: find connection by fd
82 1         10 $builder->line('static SSEConnection* sse_find(int fd) {')
83             ->line(' int i;')
84             ->line(' for (i = 0; i < MAX_SSE_CONNS; i++) {')
85             ->line(' if (sse_registry[i].fd == fd) {')
86             ->line(' return &sse_registry[i];')
87             ->line(' }')
88             ->line(' }')
89             ->line(' return NULL;')
90             ->line('}')
91             ->blank;
92              
93             # Helper: allocate connection slot
94 1         17 $builder->line('static SSEConnection* sse_alloc(int fd) {')
95             ->line(' int i;')
96             ->line(' for (i = 0; i < MAX_SSE_CONNS; i++) {')
97             ->line(' if (sse_registry[i].fd == 0) {')
98             ->line(' SSEConnection* c = &sse_registry[i];')
99             ->line(' memset(c, 0, sizeof(SSEConnection));')
100             ->line(' c->fd = fd;')
101             ->line(' c->retry_ms = 3000;')
102             ->line(' c->reconnect = 1;')
103             ->line(' c->buffer = (char*)malloc(SSE_BUFFER_INITIAL);')
104             ->line(' c->buffer_cap = SSE_BUFFER_INITIAL;')
105             ->line(' c->event_data = (char*)malloc(SSE_BUFFER_INITIAL);')
106             ->line(' c->event_data_cap = SSE_BUFFER_INITIAL;')
107             ->line(' return c;')
108             ->line(' }')
109             ->line(' }')
110             ->line(' return NULL;')
111             ->line('}')
112             ->blank;
113              
114             # Helper: free connection
115 1         7 $builder->line('static void sse_free(SSEConnection* c) {')
116             ->line(' if (c->buffer) free(c->buffer);')
117             ->line(' if (c->event_data) free(c->event_data);')
118             ->line(' c->fd = 0;')
119             ->line(' c->buffer = NULL;')
120             ->line(' c->event_data = NULL;')
121             ->line('}')
122             ->blank;
123              
124             # Helper: append to buffer
125 1         16 $builder->line('static int sse_buffer_append(SSEConnection* c, const char* data, size_t len) {')
126             ->line(' if (c->buffer_len + len > c->buffer_cap) {')
127             ->line(' size_t new_cap = c->buffer_cap * 2;')
128             ->line(' while (new_cap < c->buffer_len + len) new_cap *= 2;')
129             ->line(' char* new_buf = (char*)realloc(c->buffer, new_cap);')
130             ->line(' if (!new_buf) return 0;')
131             ->line(' c->buffer = new_buf;')
132             ->line(' c->buffer_cap = new_cap;')
133             ->line(' }')
134             ->line(' memcpy(c->buffer + c->buffer_len, data, len);')
135             ->line(' c->buffer_len += len;')
136             ->line(' return 1;')
137             ->line('}')
138             ->blank;
139             }
140              
141             sub gen_sse_parser {
142 1     1 0 2 my ($class, $builder) = @_;
143              
144 1         134 $builder->comment('Parse SSE events from buffer, return array of events')
145             ->line('static AV* sse_parse_events(SSEConnection* c) {')
146             ->line(' AV* events = newAV();')
147             ->line(' char* p = c->buffer;')
148             ->line(' char* end = c->buffer + c->buffer_len;')
149             ->line(' char* event_start = p;')
150             ->blank
151             ->line(' c->event_type[0] = \'\\0\';')
152             ->line(' c->event_data_len = 0;')
153             ->blank
154             ->line(' while (p < end) {')
155             ->line(' char* line_start = p;')
156             ->line(' while (p < end && *p != \'\\n\') p++;')
157             ->line(' if (p >= end) break;')
158             ->blank
159             ->line(' size_t line_len = p - line_start;')
160             ->line(' p++;')
161             ->blank
162             ->line(' if (line_len > 0 && line_start[line_len - 1] == \'\\r\') {')
163             ->line(' line_len--;')
164             ->line(' }')
165             ->blank
166             ->line(' if (line_len == 0) {')
167             ->line(' if (c->event_data_len > 0) {')
168             ->line(' HV* event = newHV();')
169             ->blank
170             ->line(' if (c->event_data_len > 0 && c->event_data[c->event_data_len - 1] == \'\\n\') {')
171             ->line(' c->event_data_len--;')
172             ->line(' }')
173             ->blank
174             ->line(' if (c->event_type[0]) {')
175             ->line(' hv_stores(event, "event", newSVpv(c->event_type, 0));')
176             ->line(' }')
177             ->line(' hv_stores(event, "data", newSVpvn(c->event_data, c->event_data_len));')
178             ->line(' if (c->last_id[0]) {')
179             ->line(' hv_stores(event, "id", newSVpv(c->last_id, 0));')
180             ->line(' }')
181             ->blank
182             ->line(' av_push(events, newRV_noinc((SV*)event));')
183             ->blank
184             ->line(' c->event_type[0] = \'\\0\';')
185             ->line(' c->event_data_len = 0;')
186             ->line(' }')
187             ->line(' event_start = p;')
188             ->line(' continue;')
189             ->line(' }')
190             ->blank
191             ->line(' if (line_start[0] == \':\') {')
192             ->line(' continue;')
193             ->line(' }')
194             ->blank
195             ->line(' char* colon = memchr(line_start, \':\', line_len);')
196             ->line(' char* field = line_start;')
197             ->line(' size_t field_len;')
198             ->line(' char* value;')
199             ->line(' size_t value_len;')
200             ->blank
201             ->line(' if (colon) {')
202             ->line(' field_len = colon - line_start;')
203             ->line(' value = colon + 1;')
204             ->line(' value_len = line_len - field_len - 1;')
205             ->line(' if (value_len > 0 && value[0] == \' \') {')
206             ->line(' value++;')
207             ->line(' value_len--;')
208             ->line(' }')
209             ->line(' } else {')
210             ->line(' field_len = line_len;')
211             ->line(' value = "";')
212             ->line(' value_len = 0;')
213             ->line(' }')
214             ->blank
215             ->line(' if (field_len == 4 && memcmp(field, "data", 4) == 0) {')
216             ->line(' if (c->event_data_len + value_len + 1 > c->event_data_cap) {')
217             ->line(' size_t new_cap = c->event_data_cap * 2;')
218             ->line(' c->event_data = realloc(c->event_data, new_cap);')
219             ->line(' c->event_data_cap = new_cap;')
220             ->line(' }')
221             ->line(' if (c->event_data_len > 0) {')
222             ->line(' c->event_data[c->event_data_len++] = \'\\n\';')
223             ->line(' }')
224             ->line(' memcpy(c->event_data + c->event_data_len, value, value_len);')
225             ->line(' c->event_data_len += value_len;')
226             ->line(' }')
227             ->line(' else if (field_len == 5 && memcmp(field, "event", 5) == 0) {')
228             ->line(' size_t copy_len = value_len < 255 ? value_len : 255;')
229             ->line(' memcpy(c->event_type, value, copy_len);')
230             ->line(' c->event_type[copy_len] = \'\\0\';')
231             ->line(' }')
232             ->line(' else if (field_len == 2 && memcmp(field, "id", 2) == 0) {')
233             ->line(' if (!memchr(value, \'\\0\', value_len)) {')
234             ->line(' size_t copy_len = value_len < 255 ? value_len : 255;')
235             ->line(' memcpy(c->last_id, value, copy_len);')
236             ->line(' c->last_id[copy_len] = \'\\0\';')
237             ->line(' }')
238             ->line(' }')
239             ->line(' else if (field_len == 5 && memcmp(field, "retry", 5) == 0) {')
240             ->line(' int retry = 0;')
241             ->line(' int valid = 1;')
242             ->line(' size_t i;')
243             ->line(' for (i = 0; i < value_len; i++) {')
244             ->line(' if (value[i] >= \'0\' && value[i] <= \'9\') {')
245             ->line(' retry = retry * 10 + (value[i] - \'0\');')
246             ->line(' } else {')
247             ->line(' valid = 0;')
248             ->line(' break;')
249             ->line(' }')
250             ->line(' }')
251             ->line(' if (valid && value_len > 0) {')
252             ->line(' c->retry_ms = retry;')
253             ->line(' }')
254             ->line(' }')
255             ->line(' }')
256             ->blank
257             ->line(' if (event_start > c->buffer) {')
258             ->line(' size_t remaining = end - event_start;')
259             ->line(' memmove(c->buffer, event_start, remaining);')
260             ->line(' c->buffer_len = remaining;')
261             ->line(' }')
262             ->blank
263             ->line(' return events;')
264             ->line('}')
265             ->blank;
266             }
267              
268             sub gen_xs_new {
269 1     1 0 2 my ($class, $builder) = @_;
270              
271 1         30 $builder->comment('Create new SSE object')
272             ->xs_function('xs_sse_new')
273             ->xs_preamble
274             ->line('int fd;')
275             ->line('int retry;')
276             ->line('int reconnect;')
277             ->line('SSEConnection* c;')
278             ->line('AV* obj;')
279             ->line('SV* rv;')
280             ->blank
281             ->line('if (items < 2) croak("Usage: Hypersonic::UA::SSE->new(fd, [retry], [reconnect])");')
282             ->blank
283             ->line('fd = (int)SvIV(ST(1));')
284             ->line('retry = (items > 2) ? SvIV(ST(2)) : 3000;')
285             ->line('reconnect = (items > 3) ? SvIV(ST(3)) : 1;')
286             ->blank
287             ->line('c = sse_alloc(fd);')
288             ->line('if (!c) croak("SSE registry full");')
289             ->blank
290             ->line('c->retry_ms = retry;')
291             ->line('c->reconnect = reconnect;')
292             ->blank
293             ->line('obj = newAV();')
294             ->line('av_extend(obj, 4);')
295             ->line('av_store(obj, 0, newSViv(fd));')
296             ->line('av_store(obj, 1, &PL_sv_undef);')
297             ->line('av_store(obj, 2, newRV_noinc((SV*)newHV()));')
298             ->line('av_store(obj, 3, newSViv(reconnect));')
299             ->line('av_store(obj, 4, newSViv(retry));')
300             ->blank
301             ->line('rv = newRV_noinc((SV*)obj);')
302             ->line('sv_bless(rv, gv_stashpv("Hypersonic::UA::SSE", GV_ADD));')
303             ->line('ST(0) = sv_2mortal(rv);')
304             ->xs_return('1')
305             ->xs_end
306             ->blank;
307             }
308              
309             sub gen_xs_connect {
310 1     1 0 2 my ($class, $builder) = @_;
311              
312 1         29 $builder->comment('Mark SSE connection as established')
313             ->xs_function('xs_sse_connect')
314             ->xs_preamble
315             ->line('AV* obj;')
316             ->line('SV** fd_sv;')
317             ->line('int fd;')
318             ->line('SSEConnection* c;')
319             ->blank
320             ->line('if (items != 1) croak("Usage: $sse->connect()");')
321             ->line('obj = (AV*)SvRV(ST(0));')
322             ->line('fd_sv = av_fetch(obj, 0, 0);')
323             ->line('fd = SvIV(*fd_sv);')
324             ->blank
325             ->line('c = sse_find(fd);')
326             ->line('if (c) {')
327             ->line(' c->connected = 1;')
328             ->line(' ST(0) = &PL_sv_yes;')
329             ->line('} else {')
330             ->line(' ST(0) = &PL_sv_no;')
331             ->line('}')
332             ->xs_return('1')
333             ->xs_end
334             ->blank;
335             }
336              
337             sub gen_xs_parse_events {
338 1     1 0 1 my ($class, $builder) = @_;
339              
340 1         15 $builder->comment('Parse and return pending SSE events')
341             ->xs_function('xs_sse_parse_events')
342             ->xs_preamble
343             ->line('if (items != 1) croak("Usage: $sse->parse_events()");')
344             ->line('AV* obj = (AV*)SvRV(ST(0));')
345             ->line('SV** fd_sv = av_fetch(obj, 0, 0);')
346             ->line('int fd = SvIV(*fd_sv);')
347             ->blank
348             ->line('SSEConnection* c = sse_find(fd);')
349             ->line('if (!c) {')
350             ->line(' ST(0) = sv_2mortal(newRV_noinc((SV*)newAV()));')
351             ->line(' XSRETURN(1);')
352             ->line('}')
353             ->blank
354             ->line('AV* events = sse_parse_events(c);')
355             ->line('ST(0) = sv_2mortal(newRV_noinc((SV*)events));')
356             ->xs_return('1')
357             ->xs_end
358             ->blank;
359             }
360              
361             sub gen_xs_recv_chunk {
362 1     1 0 2 my ($class, $builder) = @_;
363              
364 1         26 $builder->comment('Non-blocking receive for SSE data')
365             ->xs_function('xs_sse_recv_chunk')
366             ->xs_preamble
367             ->line('if (items != 1) croak("Usage: $sse->recv_chunk()");')
368             ->line('AV* obj = (AV*)SvRV(ST(0));')
369             ->line('SV** fd_sv = av_fetch(obj, 0, 0);')
370             ->line('int fd = SvIV(*fd_sv);')
371             ->blank
372             ->line('SSEConnection* c = sse_find(fd);')
373             ->line('if (!c || !c->connected) {')
374             ->line(' ST(0) = &PL_sv_undef;')
375             ->line(' XSRETURN(1);')
376             ->line('}')
377             ->blank
378             ->line('static char recv_buf[65536];')
379             ->line('ssize_t n = recv(fd, recv_buf, sizeof(recv_buf), MSG_DONTWAIT);')
380             ->blank
381             ->line('if (n < 0) {')
382             ->line(' if (errno == EAGAIN || errno == EWOULDBLOCK) {')
383             ->line(' ST(0) = sv_2mortal(newSViv(0));')
384             ->line(' } else {')
385             ->line(' c->connected = 0;')
386             ->line(' ST(0) = sv_2mortal(newSViv(-1));')
387             ->line(' }')
388             ->line(' XSRETURN(1);')
389             ->line('}')
390             ->blank
391             ->line('if (n == 0) {')
392             ->line(' c->connected = 0;')
393             ->line(' ST(0) = &PL_sv_undef;')
394             ->line(' XSRETURN(1);')
395             ->line('}')
396             ->blank
397             ->line('sse_buffer_append(c, recv_buf, n);')
398             ->line('ST(0) = sv_2mortal(newSViv(n));')
399             ->xs_return('1')
400             ->xs_end
401             ->blank;
402             }
403              
404             sub gen_xs_get_last_id {
405 1     1 0 2 my ($class, $builder) = @_;
406              
407 1         43 $builder->comment('Get Last-Event-ID for reconnect')
408             ->xs_function('xs_sse_get_last_id')
409             ->xs_preamble
410             ->line('if (items != 1) croak("Usage: $sse->get_last_id()");')
411             ->line('AV* obj = (AV*)SvRV(ST(0));')
412             ->line('SV** fd_sv = av_fetch(obj, 0, 0);')
413             ->line('int fd = SvIV(*fd_sv);')
414             ->blank
415             ->line('SSEConnection* c = sse_find(fd);')
416             ->line('if (c && c->last_id[0]) {')
417             ->line(' ST(0) = sv_2mortal(newSVpv(c->last_id, 0));')
418             ->line('} else {')
419             ->line(' ST(0) = &PL_sv_undef;')
420             ->line('}')
421             ->xs_return('1')
422             ->xs_end
423             ->blank;
424             }
425              
426             sub gen_xs_set_retry {
427 1     1 0 2 my ($class, $builder) = @_;
428              
429 1         18 $builder->comment('Set retry interval')
430             ->xs_function('xs_sse_set_retry')
431             ->xs_preamble
432             ->line('if (items != 2) croak("Usage: $sse->set_retry(ms)");')
433             ->line('AV* obj = (AV*)SvRV(ST(0));')
434             ->line('SV** fd_sv = av_fetch(obj, 0, 0);')
435             ->line('int fd = SvIV(*fd_sv);')
436             ->line('int retry_ms = SvIV(ST(1));')
437             ->blank
438             ->line('SSEConnection* c = sse_find(fd);')
439             ->line('if (c) {')
440             ->line(' c->retry_ms = retry_ms;')
441             ->line(' ST(0) = &PL_sv_yes;')
442             ->line('} else {')
443             ->line(' ST(0) = &PL_sv_no;')
444             ->line('}')
445             ->xs_return('1')
446             ->xs_end
447             ->blank;
448             }
449              
450             sub gen_xs_close {
451 1     1 0 2 my ($class, $builder) = @_;
452              
453 1         16 $builder->comment('Close SSE connection')
454             ->xs_function('xs_sse_close')
455             ->xs_preamble
456             ->line('if (items != 1) croak("Usage: $sse->close()");')
457             ->line('AV* obj = (AV*)SvRV(ST(0));')
458             ->line('SV** fd_sv = av_fetch(obj, 0, 0);')
459             ->line('int fd = SvIV(*fd_sv);')
460             ->blank
461             ->line('SSEConnection* c = sse_find(fd);')
462             ->line('if (c) {')
463             ->line(' c->connected = 0;')
464             ->line(' c->reconnect = 0;')
465             ->line(' close(fd);')
466             ->line(' sse_free(c);')
467             ->line('}')
468             ->blank
469             ->line('ST(0) = &PL_sv_yes;')
470             ->xs_return('1')
471             ->xs_end
472             ->blank;
473             }
474              
475             # Perl methods for callback management
476             sub on {
477 0     0 0   my ($self, $event_type, $callback) = @_;
478 0           my $callbacks = $self->[SLOT_CALLBACKS];
479 0           $callbacks->{$event_type} = $callback;
480 0           return $self;
481             }
482              
483             sub emit {
484 0     0 0   my ($self, $event_type, $event) = @_;
485 0           my $callbacks = $self->[SLOT_CALLBACKS];
486 0 0         if (my $cb = $callbacks->{$event_type}) {
487 0           $cb->($event);
488             }
489 0 0         if (my $cb = $callbacks->{'*'}) {
490 0           $cb->($event_type, $event);
491             }
492             }
493              
494             1;