File Coverage

blib/lib/Hypersonic/UA/Async.pm
Criterion Covered Total %
statement 111 112 99.1
branch 2 4 50.0
condition 1 2 50.0
subroutine 24 24 100.0
pod 0 16 0.0
total 138 158 87.3


line stmt bran cond sub pod time code
1             package Hypersonic::UA::Async;
2              
3 12     12   229797 use strict;
  12         43  
  12         614  
4 12     12   60 use warnings;
  12         25  
  12         1006  
5 12     12   263 use 5.010;
  12         44  
6              
7             our $VERSION = '0.15';
8              
9             # Use Hypersonic::Event for backend detection
10 12     12   7395 use Hypersonic::Event;
  12         36  
  12         606  
11              
12             # Maximum concurrent async contexts
13 12     12   89 use constant MAX_ASYNC_CONTEXTS => 1024;
  12         24  
  12         1410  
14              
15             # Async context states
16             use constant {
17 12         1455 STATE_CONNECTING => 0,
18             STATE_TLS => 1,
19             STATE_SENDING => 2,
20             STATE_RECEIVING => 3,
21             STATE_DONE => 4,
22             STATE_ERROR => 5,
23             STATE_CANCELLED => 6,
24 12     12   65 };
  12         36  
25              
26             # Poll wait events
27             use constant {
28 12         742 WAIT_NONE => 0,
29             WAIT_READ => 1,
30             WAIT_WRITE => 2,
31 12     12   94 };
  12         23  
32              
33             # Object slots (array-based for performance)
34             use constant {
35 12         47270 SLOT_ID => 0,
36             SLOT_UA => 1,
37             SLOT_FUTURE => 2,
38 12     12   54 };
  12         32  
39              
40             sub generate_c_code {
41 5     5 0 13280 my ($class, $builder, $opts) = @_;
42              
43 5         33 $class->gen_async_context_registry($builder, $opts);
44 5         91 $class->gen_async_poll_one($builder);
45 5         40 $class->gen_async_advance_state($builder);
46 5         53 $class->gen_xs_start_request($builder);
47 5         46 $class->gen_xs_poll($builder);
48 5         26 $class->gen_xs_poll_batch($builder, $opts);
49 5         35 $class->gen_xs_get_fd($builder);
50 5         49 $class->gen_xs_get_events($builder);
51 5         62 $class->gen_xs_cancel($builder);
52 5         28 $class->gen_xs_cleanup($builder);
53 5         25 $class->gen_xs_get_future($builder);
54 5         24 $class->gen_xs_get_state($builder);
55 5         17 $class->gen_xs_get_result($builder);
56            
57             # Generate tick here after all async structures/headers are defined
58 5         22 $class->gen_xs_tick($builder, $opts);
59             }
60              
61             sub get_xs_functions {
62             return {
63 3     3 0 8227 'Hypersonic::UA::Async::start_request' => { source => 'xs_async_start_request', is_xs_native => 1 },
64             'Hypersonic::UA::Async::poll' => { source => 'xs_async_poll', is_xs_native => 1 },
65             'Hypersonic::UA::Async::poll_batch' => { source => 'xs_async_poll_batch', is_xs_native => 1 },
66             'Hypersonic::UA::Async::get_fd' => { source => 'xs_async_get_fd', is_xs_native => 1 },
67             'Hypersonic::UA::Async::get_events' => { source => 'xs_async_get_events', is_xs_native => 1 },
68             'Hypersonic::UA::Async::cancel' => { source => 'xs_async_cancel', is_xs_native => 1 },
69             'Hypersonic::UA::Async::cleanup' => { source => 'xs_async_cleanup', is_xs_native => 1 },
70             'Hypersonic::UA::Async::get_future' => { source => 'xs_async_get_future', is_xs_native => 1 },
71             'Hypersonic::UA::Async::get_state' => { source => 'xs_async_get_state', is_xs_native => 1 },
72             'Hypersonic::UA::Async::get_result' => { source => 'xs_async_get_result', is_xs_native => 1 },
73             'Hypersonic::UA::tick' => { source => 'xs_ua_tick', is_xs_native => 1 },
74             };
75             }
76              
77             sub gen_async_context_registry {
78 28     28 0 6649549 my ($class, $builder, $opts) = @_;
79              
80             # Get the best event backend for this platform
81 28         902 my $backend_name = Hypersonic::Event->best_backend;
82 28         215 my $event_backend = Hypersonic::Event->backend($backend_name);
83            
84             # Store backend for other methods to use
85 28         211 $opts->{event_backend} = $event_backend;
86 28         162 $opts->{event_backend_name} = $backend_name;
87              
88             # Add required includes for networking
89 28         553 $builder->line('#include ')
90             ->line('#include ')
91             ->line('#include ')
92             ->line('#include ')
93             ->line('#include ')
94             ->line('#include ');
95            
96             # Add event backend includes and defines
97 28         133 $builder->line($event_backend->includes)
98             ->line($event_backend->defines)
99             ->blank;
100            
101 28         3861 $builder->line('#define MAX_ASYNC_CONTEXTS 1024')
102             ->line('#ifndef MAX_EVENTS')
103             ->line('#define MAX_EVENTS 256')
104             ->line('#endif')
105             ->blank
106             ->line('#define ASYNC_STATE_CONNECTING 0')
107             ->line('#define ASYNC_STATE_TLS 1')
108             ->line('#define ASYNC_STATE_SENDING 2')
109             ->line('#define ASYNC_STATE_RECEIVING 3')
110             ->line('#define ASYNC_STATE_DONE 4')
111             ->line('#define ASYNC_STATE_ERROR 5')
112             ->line('#define ASYNC_STATE_CANCELLED 6')
113             ->blank
114             ->line('#define ASYNC_WAIT_NONE 0')
115             ->line('#define ASYNC_WAIT_READ 1')
116             ->line('#define ASYNC_WAIT_WRITE 2')
117             ->blank
118             ->line('typedef struct {')
119             ->line(' int fd;')
120             ->line(' void *ssl;') # Use void* instead of SSL* to avoid OpenSSL dependency
121             ->line(' int state;')
122             ->line(' int tls;')
123             ->line(' char *host;')
124             ->line(' int port;')
125             ->line(' char *request;')
126             ->line(' size_t request_len;')
127             ->line(' size_t request_sent;')
128             ->line(' char *recv_buffer;')
129             ->line(' size_t recv_buffer_len;')
130             ->line(' size_t recv_buffer_cap;')
131             ->line(' SV *future_sv;')
132             ->line(' SV *callback;')
133             ->line(' time_t deadline;')
134             ->line(' char *error;')
135             ->line(' int in_use;')
136             ->line('} AsyncContext;')
137             ->blank
138             ->line('static AsyncContext async_registry[MAX_ASYNC_CONTEXTS];')
139             ->line('static int async_ev_fd = -1;') # event loop fd (kqueue/epoll) for batched polling
140             ->blank
141             ->comment('Async connection pool for keep-alive')
142             ->line('#define ASYNC_POOL_SIZE 512')
143             ->line('typedef struct {')
144             ->line(' int fd;')
145             ->line(' char host[256];')
146             ->line(' int port;')
147             ->line(' time_t expires;')
148             ->line('} AsyncPooledConn;')
149             ->blank
150             ->line('static AsyncPooledConn async_conn_pool[ASYNC_POOL_SIZE];')
151             ->blank
152             ->comment('Get a pooled connection')
153             ->line('static int async_pool_get(const char *host, int port) {')
154             ->line(' int i;')
155             ->line(' time_t now = time(NULL);')
156             ->line(' for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
157             ->line(' if (async_conn_pool[i].fd > 0 && async_conn_pool[i].port == port &&')
158             ->line(' strcmp(async_conn_pool[i].host, host) == 0 && async_conn_pool[i].expires > now) {')
159             ->line(' int fd = async_conn_pool[i].fd;')
160             ->line(' async_conn_pool[i].fd = 0;')
161             ->line(' return fd;')
162             ->line(' }')
163             ->line(' }')
164             ->line(' return -1;')
165             ->line('}')
166             ->blank
167             ->comment('Return connection to pool (10 second keep-alive)')
168             ->line('static void async_pool_put(int fd, const char *host, int port) {')
169             ->line(' int i;')
170             ->line(' if (fd < 0) return;')
171             ->line(' time_t now = time(NULL);')
172             ->line(' for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
173             ->line(' if (async_conn_pool[i].fd <= 0 || async_conn_pool[i].expires <= now) {')
174             ->line(' if (async_conn_pool[i].fd > 0) close(async_conn_pool[i].fd);')
175             ->line(' async_conn_pool[i].fd = fd;')
176             ->line(' strncpy(async_conn_pool[i].host, host, 255);')
177             ->line(' async_conn_pool[i].host[255] = 0;')
178             ->line(' async_conn_pool[i].port = port;')
179             ->line(' async_conn_pool[i].expires = now + 10;')
180             ->line(' return;')
181             ->line(' }')
182             ->line(' }')
183             ->line(' close(fd);')
184             ->line('}')
185             ->blank
186             ->line('static int async_alloc_slot(void) {')
187             ->line(' int i;')
188             ->line(' for (i = 0; i < MAX_ASYNC_CONTEXTS; i++) {')
189             ->line(' if (!async_registry[i].in_use) {')
190             ->line(' memset(&async_registry[i], 0, sizeof(AsyncContext));')
191             ->line(' async_registry[i].in_use = 1;')
192             ->line(' async_registry[i].fd = -1;')
193             ->line(' async_registry[i].future_sv = NULL;')
194             ->line(' return i;')
195             ->line(' }')
196             ->line(' }')
197             ->line(' return -1;')
198             ->line('}')
199             ->blank
200             ->line('static void async_free_slot(int slot) {')
201             ->line(' if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
202             ->line(' AsyncContext *ctx = &async_registry[slot];')
203             ->comment(' Return connection to pool if successful')
204             ->line(' if (ctx->fd >= 0 && ctx->state == ASYNC_STATE_DONE && ctx->host) {')
205             ->line(' async_pool_put(ctx->fd, ctx->host, ctx->port);')
206             ->line(' ctx->fd = -1;')
207             ->line(' }')
208             ->line(' if (ctx->fd >= 0) close(ctx->fd);')
209             ->line(' if (ctx->host) free(ctx->host);')
210             ->line(' if (ctx->request) free(ctx->request);')
211             ->line(' if (ctx->recv_buffer) free(ctx->recv_buffer);')
212             ->line(' if (ctx->error) free(ctx->error);')
213             ->line(' if (ctx->callback) SvREFCNT_dec(ctx->callback);')
214             ->line(' if (ctx->future_sv) SvREFCNT_dec(ctx->future_sv);')
215             ->line(' memset(ctx, 0, sizeof(AsyncContext));')
216             ->line(' }')
217             ->line('}')
218             ->blank;
219             }
220              
221             sub gen_async_poll_one {
222 16     16 0 186 my ($class, $builder) = @_;
223              
224 16         1827 $builder->comment('Poll a single async context, return events needed')
225             ->line('static int async_poll_one(int slot) {')
226             ->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) return ASYNC_WAIT_NONE;')
227             ->line(' AsyncContext *ctx = &async_registry[slot];')
228             ->line(' if (!ctx->in_use) return ASYNC_WAIT_NONE;')
229             ->blank
230             ->line(' switch (ctx->state) {')
231             ->line(' case ASYNC_STATE_CONNECTING:')
232             ->line(' return ASYNC_WAIT_WRITE;')
233             ->blank
234             ->line(' case ASYNC_STATE_TLS:')
235             ->line(' return ASYNC_WAIT_WRITE;')
236             ->blank
237             ->line(' case ASYNC_STATE_SENDING: {')
238             ->line(' ssize_t n = send(ctx->fd, ctx->request + ctx->request_sent,')
239             ->line(' ctx->request_len - ctx->request_sent, MSG_DONTWAIT);')
240             ->line(' if (n > 0) {')
241             ->line(' ctx->request_sent += n;')
242             ->line(' if (ctx->request_sent >= ctx->request_len) {')
243             ->line(' ctx->state = ASYNC_STATE_RECEIVING;')
244             ->line(' return ASYNC_WAIT_READ;')
245             ->line(' }')
246             ->line(' } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {')
247             ->line(' ctx->state = ASYNC_STATE_ERROR;')
248             ->line(' ctx->error = strdup("send failed");')
249             ->line(' return ASYNC_WAIT_NONE;')
250             ->line(' }')
251             ->line(' return ASYNC_WAIT_WRITE;')
252             ->line(' }')
253             ->blank
254             ->line(' case ASYNC_STATE_RECEIVING: {')
255             ->line(' if (!ctx->recv_buffer) {')
256             ->line(' ctx->recv_buffer_cap = 8192;')
257             ->line(' ctx->recv_buffer = (char *)malloc(ctx->recv_buffer_cap);')
258             ->line(' ctx->recv_buffer_len = 0;')
259             ->line(' }')
260             ->blank
261             ->line(' if (ctx->recv_buffer_len >= ctx->recv_buffer_cap - 1) {')
262             ->line(' ctx->recv_buffer_cap *= 2;')
263             ->line(' ctx->recv_buffer = (char *)realloc(ctx->recv_buffer, ctx->recv_buffer_cap);')
264             ->line(' }')
265             ->blank
266             ->line(' ssize_t n = recv(ctx->fd, ctx->recv_buffer + ctx->recv_buffer_len,')
267             ->line(' ctx->recv_buffer_cap - ctx->recv_buffer_len - 1, MSG_DONTWAIT);')
268             ->line(' if (n > 0) {')
269             ->line(' ctx->recv_buffer_len += n;')
270             ->line(' ctx->recv_buffer[ctx->recv_buffer_len] = 0;')
271             ->blank
272             ->comment(' Check if response is complete (Content-Length or connection close)')
273             ->line(' char *headers_end = strstr(ctx->recv_buffer, "\\r\\n\\r\\n");')
274             ->line(' if (headers_end) {')
275             ->line(' char *body_start = headers_end + 4;')
276             ->line(' size_t body_received = ctx->recv_buffer_len - (body_start - ctx->recv_buffer);')
277             ->blank
278             ->comment(' Parse Content-Length')
279             ->line(' char *cl = strcasestr(ctx->recv_buffer, "Content-Length:");')
280             ->line(' if (cl && cl < headers_end) {')
281             ->line(' size_t content_length = atol(cl + 15);')
282             ->line(' if (body_received >= content_length) {')
283             ->line(' ctx->state = ASYNC_STATE_DONE;')
284             ->line(' return ASYNC_WAIT_NONE;')
285             ->line(' }')
286             ->line(' } else {')
287             ->comment(' No Content-Length - check for Transfer-Encoding: chunked')
288             ->line(' char *te = strcasestr(ctx->recv_buffer, "Transfer-Encoding:");')
289             ->line(' if (te && te < headers_end && strcasestr(te, "chunked")) {')
290             ->comment(' Check for chunked end marker')
291             ->line(' if (strstr(body_start, "\\r\\n0\\r\\n\\r\\n") || ')
292             ->line(' (body_received >= 5 && memcmp(body_start, "0\\r\\n\\r\\n", 5) == 0)) {')
293             ->line(' ctx->state = ASYNC_STATE_DONE;')
294             ->line(' return ASYNC_WAIT_NONE;')
295             ->line(' }')
296             ->line(' }')
297             ->line(' }')
298             ->line(' }')
299             ->blank
300             ->line(' return ASYNC_WAIT_READ;')
301             ->line(' } else if (n == 0) {')
302             ->line(' ctx->state = ASYNC_STATE_DONE;')
303             ->line(' return ASYNC_WAIT_NONE;')
304             ->line(' } else if (errno != EAGAIN && errno != EWOULDBLOCK) {')
305             ->line(' ctx->state = ASYNC_STATE_ERROR;')
306             ->line(' ctx->error = strdup("recv failed");')
307             ->line(' return ASYNC_WAIT_NONE;')
308             ->line(' }')
309             ->line(' return ASYNC_WAIT_READ;')
310             ->line(' }')
311             ->blank
312             ->line(' case ASYNC_STATE_DONE:')
313             ->line(' case ASYNC_STATE_ERROR:')
314             ->line(' case ASYNC_STATE_CANCELLED:')
315             ->line(' return ASYNC_WAIT_NONE;')
316             ->line(' }')
317             ->blank
318             ->line(' return ASYNC_WAIT_NONE;')
319             ->line('}')
320             ->blank;
321             }
322              
323             sub gen_async_advance_state {
324 7     7 0 34 my ($class, $builder) = @_;
325            
326 7         187 $builder->comment('Advance state for a single slot (check connect, send, recv)')
327             ->line('static int async_advance_state(int slot) {')
328             ->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) return ASYNC_WAIT_NONE;')
329             ->line(' AsyncContext *ctx = &async_registry[slot];')
330             ->line(' if (!ctx->in_use) return ASYNC_WAIT_NONE;')
331             ->blank
332             ->line(' if (ctx->state == ASYNC_STATE_CONNECTING) {')
333             ->line(' int err = 0;')
334             ->line(' socklen_t errlen = sizeof(err);')
335             ->line(' getsockopt(ctx->fd, SOL_SOCKET, SO_ERROR, &err, &errlen);')
336             ->line(' if (err == 0) {')
337             ->line(' ctx->state = ASYNC_STATE_SENDING;')
338             ->line(' } else {')
339             ->line(' ctx->state = ASYNC_STATE_ERROR;')
340             ->line(' ctx->error = strdup("connect failed");')
341             ->line(' return ASYNC_WAIT_NONE;')
342             ->line(' }')
343             ->line(' }')
344             ->blank
345             ->line(' return async_poll_one(slot);')
346             ->line('}')
347             ->blank;
348             }
349              
350             sub gen_xs_poll_batch {
351 5     5 0 14 my ($class, $builder, $opts) = @_;
352              
353 5         22 my $event_backend = $opts->{event_backend};
354 5         8 my $backend_name = $opts->{event_backend_name};
355              
356             # Check if this backend supports the slot API (kqueue, epoll)
357 5         51 my $use_advanced = $backend_name =~ /^(kqueue|epoll)$/;
358              
359 5         81 $builder->comment("Batch poll all pending slots using $backend_name")
360             ->xs_function('xs_async_poll_batch')
361             ->xs_preamble
362             ->line('int i;')
363             ->line('int slot_count;')
364             ->line('int registered;')
365             ->line('int nev;')
366             ->line('AV *ready;');
367              
368             # Add event struct declaration for advanced backends (C89 compliance).
369             # Use slot_event_struct (not event_struct): io_uring's slot tracking
370             # is implemented on a private epoll fd, so the buffer passed to
371             # gen_wait_once must be epoll_event[] there, not io_uring_cqe[].
372 5 50       22 if ($use_advanced) {
373 5         61 my $event_struct = $event_backend->slot_event_struct;
374 5         20 $builder->line("struct $event_struct events[MAX_EVENTS];");
375             }
376              
377 5         21 $builder->line('if (items < 1) croak("Usage: poll_batch(@slots)");')
378             ->blank;
379            
380 5 50       22 if ($use_advanced) {
381             # Create event loop if needed
382 5         21 $builder->comment('Create event loop if not exists')
383             ->line('if (async_ev_fd < 0) {');
384            
385             # Use the backend's gen_create_loop method
386 5         33 $event_backend->gen_create_loop($builder, 'async_ev_fd');
387            
388 5         18 $builder->line('}')
389             ->blank;
390            
391             # Collect slots and register with event loop
392 5         99 $builder->comment('Collect all pending slots and register with event loop')
393             ->line('slot_count = items;')
394             ->line('registered = 0;')
395             ->blank
396             ->line('for (i = 0; i < slot_count; i++) {')
397             ->line(' int slot = SvIV(ST(i));')
398             ->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) continue;')
399             ->line(' AsyncContext *ctx = &async_registry[slot];')
400             ->line(' if (!ctx->in_use || ctx->fd < 0) continue;')
401             ->blank
402             ->line(' int events = async_poll_one(slot);')
403             ->line(' if (events == ASYNC_WAIT_NONE) continue;')
404             ->blank;
405            
406             # Add for read or write based on what we need
407 5         20 $builder->line(' if (events == ASYNC_WAIT_READ) {');
408 5         23 $event_backend->gen_add_with_slot($builder, 'async_ev_fd', 'ctx->fd', 'slot', 'read');
409 5         19 $builder->line(' } else {');
410 5         17 $event_backend->gen_add_with_slot($builder, 'async_ev_fd', 'ctx->fd', 'slot', 'write');
411 5         36 $builder->line(' }')
412             ->line(' registered++;')
413             ->line('}')
414             ->blank;
415            
416             # Wait for events (events array already declared at top for C89 compliance)
417 5         14 $builder->comment('Wait for events (short timeout for responsiveness)');
418            
419             # Use gen_wait_once (no loop control statements)
420 5         21 $event_backend->gen_wait_once($builder, 'async_ev_fd', 'events', 'nev', '1'); # 1ms timeout
421            
422             # Process ready events
423 5         33 $builder->blank
424             ->comment('Process ready events')
425             ->line('ready = newAV();')
426             ->line('for (i = 0; i < nev; i++) {');
427            
428             # Get slot from event
429 5         18 $event_backend->gen_get_slot($builder, 'events', 'i', 'slot');
430            
431 5         129 $builder->line(' if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
432             ->line(' int result = async_advance_state(slot);')
433             ->line(' if (result == ASYNC_WAIT_NONE) {')
434             ->line(' av_push(ready, newSViv(slot));')
435             ->line(' }')
436             ->line(' }')
437             ->line('}')
438             ->blank
439             ->line('ST(0) = sv_2mortal(newRV_noinc((SV *)ready));')
440             ->xs_return('1')
441             ->xs_end
442             ->blank;
443             } else {
444             # Fallback to select-based implementation for other backends.
445             # NOTE: `ready` is already declared at the top of the function
446             # (line ~366 above); declaring `AV *ready = newAV();` here is a
447             # C redeclaration-with-no-linkage error on gcc >= 14 and on the
448             # Fedora 43 smoker (perl 5.38.5). Just assign.
449 0         0 $builder->comment('Fallback to select() for portability')
450             ->line('ready = newAV();')
451             ->line('for (i = 0; i < items; i++) {')
452             ->line(' int slot = SvIV(ST(i));')
453             ->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) continue;')
454             ->line(' AsyncContext *ctx = &async_registry[slot];')
455             ->line(' if (!ctx->in_use || ctx->fd < 0) continue;')
456             ->blank
457             ->line(' int events = async_poll_one(slot);')
458             ->line(' if (events == ASYNC_WAIT_NONE) {')
459             ->line(' av_push(ready, newSViv(slot));')
460             ->line(' continue;')
461             ->line(' }')
462             ->blank
463             ->comment('Check readiness with select')
464             ->line(' fd_set rfds, wfds;')
465             ->line(' FD_ZERO(&rfds); FD_ZERO(&wfds);')
466             ->line(' if (events == ASYNC_WAIT_READ) FD_SET(ctx->fd, &rfds);')
467             ->line(' if (events == ASYNC_WAIT_WRITE) FD_SET(ctx->fd, &wfds);')
468             ->line(' struct timeval tv = {0, 0};')
469             ->line(' int sel = select(ctx->fd + 1, &rfds, &wfds, NULL, &tv);')
470             ->line(' if (sel > 0) {')
471             ->line(' int result = async_advance_state(slot);')
472             ->line(' if (result == ASYNC_WAIT_NONE) {')
473             ->line(' av_push(ready, newSViv(slot));')
474             ->line(' }')
475             ->line(' }')
476             ->line('}')
477             ->blank
478             ->line('ST(0) = sv_2mortal(newRV_noinc((SV *)ready));')
479             ->xs_return('1')
480             ->xs_end
481             ->blank;
482             }
483             }
484              
485             sub gen_xs_start_request {
486 10     10 0 135 my ($class, $builder) = @_;
487              
488 10         1957 $builder->comment('Start an async request')
489             ->xs_function('xs_async_start_request')
490             ->xs_preamble
491             ->line('if (items < 5) croak("Usage: start_request($method, $url, $body, $future_or_cb, $ua_sv)");')
492             ->blank
493             ->line('SV *method_sv = ST(0);')
494             ->line('SV *url_sv = ST(1);')
495             ->line('SV *body_sv = ST(2);')
496             ->line('SV *future_or_cb = ST(3);')
497             ->line('SV *ua_sv = ST(4);')
498             ->blank
499             ->line('int slot = async_alloc_slot();')
500             ->line('if (slot < 0) croak("Too many async requests");')
501             ->blank
502             ->line('AsyncContext *ctx = &async_registry[slot];')
503             ->blank
504             ->comment('Parse URL')
505             ->line('STRLEN url_len;')
506             ->line('const char *url = SvPV(url_sv, url_len);')
507             ->blank
508             ->line('const char *scheme_end = strstr(url, "://");')
509             ->line('if (!scheme_end) {')
510             ->line(' async_free_slot(slot);')
511             ->line(' croak("Invalid URL");')
512             ->line('}')
513             ->blank
514             ->line('ctx->tls = (scheme_end - url == 5 && memcmp(url, "https", 5) == 0);')
515             ->blank
516             ->line('const char *host_start = scheme_end + 3;')
517             ->line('const char *host_end = host_start;')
518             ->line('ctx->port = ctx->tls ? 443 : 80;')
519             ->blank
520             ->line('while (*host_end && *host_end != \':\' && *host_end != \'/\') host_end++;')
521             ->blank
522             ->line('int host_len = host_end - host_start;')
523             ->line('ctx->host = (char *)malloc(host_len + 1);')
524             ->line('memcpy(ctx->host, host_start, host_len);')
525             ->line('ctx->host[host_len] = 0;')
526             ->blank
527             ->if('*host_end == \':\'')
528             ->line('ctx->port = atoi(host_end + 1);')
529             ->line('while (*host_end && *host_end != \'/\') host_end++;')
530             ->endif
531             ->blank
532             ->line('const char *path = (*host_end == \'/\') ? host_end : "/";')
533             ->blank
534             ->comment('Try to get a pooled connection')
535             ->line('int pooled_fd = async_pool_get(ctx->host, ctx->port);')
536             ->blank
537             ->comment('Build request')
538             ->line('STRLEN method_len;')
539             ->line('const char *method = SvPV(method_sv, method_len);')
540             ->blank
541             ->line('STRLEN body_len = 0;')
542             ->line('const char *body = NULL;')
543             ->if('SvOK(body_sv)')
544             ->line('body = SvPV(body_sv, body_len);')
545             ->endif
546             ->blank
547             ->line('size_t req_cap = method_len + strlen(path) + host_len + 128 + body_len;')
548             ->line('ctx->request = (char *)malloc(req_cap);')
549             ->blank
550             ->line('int req_len = snprintf(ctx->request, req_cap,')
551             ->line(' "%s %s HTTP/1.1\\r\\n"')
552             ->line(' "Host: %s\\r\\n"')
553             ->line(' "Connection: keep-alive\\r\\n"')
554             ->line(' "User-Agent: Hypersonic/1.0\\r\\n",')
555             ->line(' method, path, ctx->host);')
556             ->blank
557             ->if('body_len > 0')
558             ->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len,')
559             ->line(' "Content-Length: %zu\\r\\n\\r\\n", body_len);')
560             ->line('memcpy(ctx->request + req_len, body, body_len);')
561             ->line('req_len += body_len;')
562             ->else
563             ->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len, "\\r\\n");')
564             ->endif
565             ->blank
566             ->line('ctx->request_len = req_len;')
567             ->line('ctx->request_sent = 0;')
568             ->blank
569             ->comment('Store callback or future')
570             ->if('SvROK(future_or_cb) && SvTYPE(SvRV(future_or_cb)) == SVt_PVCV')
571             ->line('ctx->callback = SvREFCNT_inc(future_or_cb);')
572             ->line('ctx->future_sv = NULL;')
573             ->else
574             ->comment('Store the future SV directly')
575             ->line('ctx->future_sv = SvREFCNT_inc(future_or_cb);')
576             ->line('ctx->callback = NULL;')
577             ->endif
578             ->blank
579             ->comment('Use pooled connection if available, otherwise create new socket')
580             ->if('pooled_fd >= 0')
581             ->line('ctx->fd = pooled_fd;')
582             ->line('ctx->state = ASYNC_STATE_SENDING;')
583             ->else
584             ->comment('Create socket and set non-blocking')
585             ->line('ctx->fd = socket(AF_INET, SOCK_STREAM, 0);')
586             ->line('if (ctx->fd < 0) {')
587             ->line(' async_free_slot(slot);')
588             ->line(' croak("socket() failed");')
589             ->line('}')
590             ->line('int opt = 1;')
591             ->line('setsockopt(ctx->fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));')
592             ->line('int flags = fcntl(ctx->fd, F_GETFL, 0);')
593             ->line('fcntl(ctx->fd, F_SETFL, flags | O_NONBLOCK);')
594             ->blank
595             ->comment('Start async connect')
596             ->line('struct hostent *he = gethostbyname(ctx->host);')
597             ->if('!he')
598             ->line('async_free_slot(slot);')
599             ->line('croak("DNS resolution failed");')
600             ->endif
601             ->blank
602             ->line('struct sockaddr_in addr;')
603             ->line('memset(&addr, 0, sizeof(addr));')
604             ->line('addr.sin_family = AF_INET;')
605             ->line('addr.sin_port = htons(ctx->port);')
606             ->line('memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length);')
607             ->blank
608             ->line('int rc = connect(ctx->fd, (struct sockaddr *)&addr, sizeof(addr));')
609             ->if('rc < 0 && errno != EINPROGRESS')
610             ->line('async_free_slot(slot);')
611             ->line('croak("connect() failed");')
612             ->endif
613             ->blank
614             ->line('ctx->state = ASYNC_STATE_CONNECTING;')
615             ->endif
616             ->blank
617             ->comment('Auto-tick: process all pending async requests immediately')
618             ->comment('This drives the event loop without requiring manual tick() calls')
619             ->if('SvROK(ua_sv)')
620             ->line('dSP;')
621             ->line('ENTER; SAVETMPS;')
622             ->line('PUSHMARK(SP);')
623             ->line('XPUSHs(ua_sv);')
624             ->line('PUTBACK;')
625             ->line('call_method("tick", G_DISCARD);')
626             ->line('FREETMPS; LEAVE;')
627             ->endif
628             ->blank
629             ->line('ST(0) = sv_2mortal(newSViv(slot));')
630             ->xs_return('1')
631             ->xs_end
632             ->blank;
633             }
634              
635             sub gen_xs_poll {
636 7     7 0 70 my ($class, $builder) = @_;
637              
638 7         641 $builder->comment('Poll async context, advance state machine')
639             ->xs_function('xs_async_poll')
640             ->xs_preamble
641             ->line('if (items < 1) croak("Usage: poll($slot)");')
642             ->blank
643             ->line('int slot = SvIV(ST(0));')
644             ->line('if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) {')
645             ->line(' ST(0) = sv_2mortal(newSViv(ASYNC_WAIT_NONE));')
646             ->line(' XSRETURN(1);')
647             ->line('}')
648             ->blank
649             ->line('AsyncContext *ctx = &async_registry[slot];')
650             ->if('!ctx->in_use')
651             ->line('ST(0) = sv_2mortal(newSViv(ASYNC_WAIT_NONE));')
652             ->line('XSRETURN(1);')
653             ->endif
654             ->blank
655             ->comment('Check connect completion if connecting')
656             ->if('ctx->state == ASYNC_STATE_CONNECTING')
657             ->comment('Use select to check if socket is writable (connect complete)')
658             ->line('fd_set wfds;')
659             ->line('FD_ZERO(&wfds);')
660             ->line('FD_SET(ctx->fd, &wfds);')
661             ->line('struct timeval tv = {0, 0};')
662             ->line('int sel = select(ctx->fd + 1, NULL, &wfds, NULL, &tv);')
663             ->if('sel > 0 && FD_ISSET(ctx->fd, &wfds)')
664             ->comment('Socket is writable - check for actual connection')
665             ->line('int err = 0;')
666             ->line('socklen_t errlen = sizeof(err);')
667             ->line('getsockopt(ctx->fd, SOL_SOCKET, SO_ERROR, &err, &errlen);')
668             ->if('err == 0')
669             ->comment('Connected, move to sending')
670             ->line('ctx->state = ASYNC_STATE_SENDING;')
671             ->else
672             ->line('ctx->state = ASYNC_STATE_ERROR;')
673             ->line('ctx->error = strdup("connect failed");')
674             ->endif
675             ->endif
676             ->endif
677             ->blank
678             ->line('int events = async_poll_one(slot);')
679             ->line('ST(0) = sv_2mortal(newSViv(events));')
680             ->xs_return('1')
681             ->xs_end
682             ->blank;
683             }
684              
685             sub gen_xs_get_fd {
686 6     6 0 58 my ($class, $builder) = @_;
687              
688 6         153 $builder->comment('Get file descriptor for async context')
689             ->xs_function('xs_async_get_fd')
690             ->xs_preamble
691             ->line('if (items < 1) croak("Usage: get_fd($slot)");')
692             ->blank
693             ->line('int slot = SvIV(ST(0));')
694             ->line('if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS || !async_registry[slot].in_use) {')
695             ->line(' ST(0) = sv_2mortal(newSViv(-1));')
696             ->line(' XSRETURN(1);')
697             ->line('}')
698             ->blank
699             ->line('ST(0) = sv_2mortal(newSViv(async_registry[slot].fd));')
700             ->xs_return('1')
701             ->xs_end
702             ->blank;
703             }
704              
705             sub gen_xs_get_events {
706 6     6 0 30 my ($class, $builder) = @_;
707              
708 6         121 $builder->comment('Get events needed for async context')
709             ->xs_function('xs_async_get_events')
710             ->xs_preamble
711             ->line('if (items < 1) croak("Usage: get_events($slot)");')
712             ->blank
713             ->line('int slot = SvIV(ST(0));')
714             ->line('int events = async_poll_one(slot);')
715             ->line('ST(0) = sv_2mortal(newSViv(events));')
716             ->xs_return('1')
717             ->xs_end
718             ->blank;
719             }
720              
721             sub gen_xs_cancel {
722 6     6 0 67 my ($class, $builder) = @_;
723              
724 6         187 $builder->comment('Cancel async request')
725             ->xs_function('xs_async_cancel')
726             ->xs_preamble
727             ->line('if (items < 1) croak("Usage: cancel($slot)");')
728             ->blank
729             ->line('int slot = SvIV(ST(0));')
730             ->line('if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS && async_registry[slot].in_use) {')
731             ->line(' async_registry[slot].state = ASYNC_STATE_CANCELLED;')
732             ->line('}')
733             ->xs_return('0')
734             ->xs_end
735             ->blank;
736             }
737              
738             sub gen_xs_cleanup {
739 6     6 0 42 my ($class, $builder) = @_;
740              
741 6         131 $builder->comment('Cleanup completed async request')
742             ->xs_function('xs_async_cleanup')
743             ->xs_preamble
744             ->line('if (items < 1) croak("Usage: cleanup($slot)");')
745             ->blank
746             ->line('int slot = SvIV(ST(0));')
747             ->line('async_free_slot(slot);')
748             ->xs_return('0')
749             ->xs_end
750             ->blank;
751             }
752              
753             sub gen_xs_get_future {
754 6     6 0 42 my ($class, $builder) = @_;
755              
756 6         221 $builder->comment('Get future SV for async context')
757             ->xs_function('xs_async_get_future')
758             ->xs_preamble
759             ->line('if (items < 1) croak("Usage: get_future($slot)");')
760             ->blank
761             ->line('int slot = SvIV(ST(0));')
762             ->line('if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS || !async_registry[slot].in_use) {')
763             ->line(' ST(0) = &PL_sv_undef;')
764             ->line(' XSRETURN(1);')
765             ->line('}')
766             ->blank
767             ->line('SV *future = async_registry[slot].future_sv;')
768             ->line('ST(0) = future ? sv_2mortal(SvREFCNT_inc(future)) : &PL_sv_undef;')
769             ->xs_return('1')
770             ->xs_end
771             ->blank;
772             }
773              
774             sub gen_xs_get_state {
775 5     5 0 11 my ($class, $builder) = @_;
776              
777 5         107 $builder->comment('Get state of async context')
778             ->xs_function('xs_async_get_state')
779             ->xs_preamble
780             ->line('if (items < 1) croak("Usage: get_state($slot)");')
781             ->blank
782             ->line('int slot = SvIV(ST(0));')
783             ->line('if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS || !async_registry[slot].in_use) {')
784             ->line(' ST(0) = sv_2mortal(newSViv(-1));')
785             ->line(' XSRETURN(1);')
786             ->line('}')
787             ->blank
788             ->line('ST(0) = sv_2mortal(newSViv(async_registry[slot].state));')
789             ->xs_return('1')
790             ->xs_end
791             ->blank;
792             }
793              
794             sub gen_xs_get_result {
795 5     5 0 10 my ($class, $builder) = @_;
796              
797 5         225 $builder->comment('Get result buffer from async context')
798             ->xs_function('xs_async_get_result')
799             ->xs_preamble
800             ->line('if (items < 1) croak("Usage: get_result($slot)");')
801             ->blank
802             ->line('int slot = SvIV(ST(0));')
803             ->line('if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS || !async_registry[slot].in_use) {')
804             ->line(' ST(0) = &PL_sv_undef;')
805             ->line(' XSRETURN(1);')
806             ->line('}')
807             ->blank
808             ->line('AsyncContext *ctx = &async_registry[slot];')
809             ->blank
810             ->if('ctx->state == ASYNC_STATE_ERROR && ctx->error')
811             ->comment('Return error as a list (0, error_msg)')
812             ->line('ST(0) = sv_2mortal(newSViv(0));')
813             ->line('ST(1) = sv_2mortal(newSVpv(ctx->error, 0));')
814             ->line('XSRETURN(2);')
815             ->elsif('ctx->state == ASYNC_STATE_DONE && ctx->recv_buffer')
816             ->comment('Return success as a list (1, body)')
817             ->line('ST(0) = sv_2mortal(newSViv(1));')
818             ->line('ST(1) = sv_2mortal(newSVpvn(ctx->recv_buffer, ctx->recv_buffer_len));')
819             ->line('XSRETURN(2);')
820             ->else
821             ->comment('Not ready yet')
822             ->line('ST(0) = &PL_sv_undef;')
823             ->line('XSRETURN(1);')
824             ->endif
825             ->xs_end
826             ->blank;
827             }
828              
829             sub gen_xs_tick {
830 7     7 0 41 my ($class, $builder, $opts) = @_;
831              
832 7         22 my $event_backend = $opts->{event_backend};
833 7   50     31 my $backend_name = $opts->{event_backend_name} // 'kqueue';
834              
835 7         478 $builder->comment("Process pending async events - AUTO-TICKING pure C path ($backend_name)")
836             ->comment('Loops until all requests complete OR no progress for 1ms')
837             ->xs_function('xs_ua_tick')
838             ->xs_preamble
839             ->line('int i;')
840             ->line('int nev;')
841             ->line('I32 j;')
842             ->line('if (items < 1) croak("Usage: $ua->tick()");')
843             ->blank
844             ->line('SV *self_sv = ST(0);')
845             ->line('HV *ua_hv = (HV *)SvRV(self_sv);')
846             ->blank
847             ->comment('Get the _async_pending array')
848             ->line('SV **pending_svp = hv_fetch(ua_hv, "_async_pending", 14, 0);')
849             ->if('!pending_svp || !SvROK(*pending_svp)')
850             ->line('ST(0) = sv_2mortal(newSViv(0));')
851             ->line('XSRETURN(1);')
852             ->endif
853             ->blank
854             ->line('AV *pending_av = (AV *)SvRV(*pending_svp);')
855             ->blank
856             ->comment('Main tick loop - process until no pending or no progress')
857             ->line('int total_completed = 0;')
858             ->line('int iterations = 0;')
859             ->line('int max_iterations = 1000; /* Safety cap */')
860             ->blank
861             ->line('tick_loop:')
862             ->line('{')
863             ->line(' I32 len = av_len(pending_av) + 1;')
864             ->line(' if (len == 0 || iterations++ >= max_iterations) {')
865             ->line(' ST(0) = sv_2mortal(newSViv(len));')
866             ->line(' XSRETURN(1);')
867             ->line(' }')
868             ->blank
869             ->comment(' Collect slots from pending array')
870             ->line(' int slots[MAX_ASYNC_CONTEXTS];')
871             ->line(' int slot_count = 0;')
872             ->blank
873             ->line(' for (j = 0; j < len; j++) {')
874             ->line(' SV **slot_svp = av_fetch(pending_av, j, 0);')
875             ->line(' if (!slot_svp) continue;')
876             ->line(' int slot = SvIV(*slot_svp);')
877             ->line(' if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
878             ->line(' AsyncContext *ctx = &async_registry[slot];')
879             ->line(' if (ctx->in_use && ctx->fd >= 0) {')
880             ->line(' slots[slot_count++] = slot;')
881             ->line(' }')
882             ->line(' }')
883             ->line(' }')
884             ->blank;
885            
886             # Create event loop using the backend
887 7         32 $builder->comment('Create event loop if needed')
888             ->line('if (async_ev_fd < 0) {');
889 7         52 $event_backend->gen_create_loop($builder, 'async_ev_fd');
890 7         30 $builder->line('}')
891             ->blank;
892            
893             # Register all fds - use backend's native slot-tracking struct.
894             # See note in gen_xs_poll_batch above: io_uring overrides this to
895             # epoll_event because its slot helpers run on a private epoll fd.
896 7         38 my $event_struct = $event_backend->slot_event_struct;
897            
898 7         148 $builder->comment('Register all fds with event loop')
899             ->line('int change_count = 0;')
900             ->blank
901             ->line('for (i = 0; i < slot_count; i++) {')
902             ->line(' int slot = slots[i];')
903             ->line(' AsyncContext *ctx = &async_registry[slot];')
904             ->line(' int events = async_poll_one(slot);')
905             ->line(' if (events == ASYNC_WAIT_NONE) continue;')
906             ->blank
907             ->line(' if (events == ASYNC_WAIT_READ) {');
908 7         23 $event_backend->gen_add_with_slot($builder, 'async_ev_fd', 'ctx->fd', 'slot', 'read');
909 7         19 $builder->line(' } else {');
910 7         29 $event_backend->gen_add_with_slot($builder, 'async_ev_fd', 'ctx->fd', 'slot', 'write');
911 7         37 $builder->line(' }')
912             ->line(' change_count++;')
913             ->line('}')
914             ->blank;
915            
916             # Wait for events
917 7         38 $builder->comment('Wait for events (1ms timeout)')
918             ->line("struct $event_struct ready_events[MAX_EVENTS];");
919 7         23 $event_backend->gen_wait_once($builder, 'async_ev_fd', 'ready_events', 'nev', '1');
920            
921             # Process ready events
922 7         30 $builder->blank
923             ->comment('Process ready events in pure C')
924             ->line('for (i = 0; i < nev; i++) {');
925 7         20 $event_backend->gen_get_slot($builder, 'ready_events', 'i', 'slot');
926 7         534 $builder->line(' if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
927             ->line(' async_advance_state(slot);')
928             ->line(' }')
929             ->line('}')
930             ->blank
931             ->comment('Check for completed slots and resolve futures')
932             ->line('int completed = 0;')
933             ->line('for (j = len - 1; j >= 0; j--) {')
934             ->line(' SV **slot_svp = av_fetch(pending_av, j, 0);')
935             ->line(' if (!slot_svp) continue;')
936             ->line(' int slot = SvIV(*slot_svp);')
937             ->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) continue;')
938             ->blank
939             ->line(' AsyncContext *ctx = &async_registry[slot];')
940             ->line(' if (!ctx->in_use) continue;')
941             ->blank
942             ->line(' if (ctx->state == ASYNC_STATE_DONE || ctx->state == ASYNC_STATE_ERROR) {')
943             ->comment(' Resolve future')
944             ->line(' if (ctx->future_sv && SvOK(ctx->future_sv)) {')
945             ->line(' dSP;')
946             ->line(' ENTER; SAVETMPS;')
947             ->line(' PUSHMARK(SP);')
948             ->line(' XPUSHs(ctx->future_sv);')
949             ->blank
950             ->line(' if (ctx->state == ASYNC_STATE_DONE && ctx->recv_buffer) {')
951             ->line(' XPUSHs(sv_2mortal(newSVpv(ctx->recv_buffer, ctx->recv_buffer_len)));')
952             ->line(' PUTBACK;')
953             ->line(' call_method("done", G_DISCARD);')
954             ->line(' } else {')
955             ->line(' XPUSHs(sv_2mortal(newSVpv(ctx->error ? ctx->error : "unknown error", 0)));')
956             ->line(' PUTBACK;')
957             ->line(' call_method("fail", G_DISCARD);')
958             ->line(' }')
959             ->line(' FREETMPS; LEAVE;')
960             ->line(' }')
961             ->blank
962             ->comment(' Cleanup slot')
963             ->line(' async_free_slot(slot);')
964             ->line(' av_delete(pending_av, j, G_DISCARD);')
965             ->line(' completed++;')
966             ->line(' }')
967             ->line('}')
968             ->blank
969             ->line(' total_completed += completed;')
970             ->blank
971             ->comment(' If we made progress AND still have pending, loop back immediately')
972             ->comment(' This drives requests to completion without manual tick() calls')
973             ->line(' I32 remaining = av_len(pending_av) + 1;')
974             ->line(' if (remaining > 0 && (completed > 0 || nev > 0)) {')
975             ->line(' goto tick_loop;')
976             ->line(' }')
977             ->line('}')
978             ->blank
979             ->comment('Return remaining pending count (0 = all done)')
980             ->line('I32 final_remaining = av_len(pending_av) + 1;')
981             ->line('ST(0) = sv_2mortal(newSViv(final_remaining));')
982             ->xs_return('1')
983             ->xs_end
984             ->blank;
985             }
986              
987             1;
988              
989             __END__