File Coverage

feersum_psgi.c.inc
Criterion Covered Total %
statement 501 624 80.2
branch 323 556 58.0
condition n/a
subroutine n/a
pod n/a
total 824 1180 69.8


line stmt bran cond sub pod time code
1              
2             // Extract first IP from X-Forwarded-For header (leftmost = original client)
3             static SV*
4 18           extract_forwarded_addr(pTHX_ struct feer_req *r)
5             {
6             size_t val_len;
7 18           const char *val = find_header_value(r, "x-forwarded-for", 15, &val_len);
8 18 100         if (!val || val_len == 0) return NULL;
    50          
9              
10             // Skip leading whitespace
11 15 50         while (val_len > 0 && (*val == ' ' || *val == '\t')) { val++; val_len--; }
    50          
    50          
12 15 50         if (val_len == 0) return NULL;
13              
14             // Find end of first IP (comma or space or end)
15 15           size_t ip_len = 0;
16 187 100         while (ip_len < val_len && val[ip_len] != ',' && val[ip_len] != ' ') ip_len++;
    100          
    100          
17              
18 15 50         if (ip_len == 0 || ip_len > 45) return NULL; // max IPv6 length is 45 chars
    50          
19              
20             // Copy to null-terminated buffer for inet_pton validation
21             char ip_buf[46];
22 15           memcpy(ip_buf, val, ip_len);
23 15           ip_buf[ip_len] = '\0';
24              
25             // Validate as IPv4 or IPv6 address using inet_pton
26             struct in_addr addr4;
27             struct in6_addr addr6;
28 15 100         if (inet_pton(AF_INET, ip_buf, &addr4) == 1) {
29 9           return newSVpvn(val, ip_len); // valid IPv4
30             }
31 6 100         if (inet_pton(AF_INET6, ip_buf, &addr6) == 1) {
32 2           return newSVpvn(val, ip_len); // valid IPv6
33             }
34              
35             // Not a valid IP address - return NULL (caller will use original REMOTE_ADDR)
36             trace("X-Forwarded-For contains invalid IP: %s\n", ip_buf);
37 4           return NULL;
38             }
39              
40             static SV*
41 17           extract_forwarded_proto(pTHX_ struct feer_req *r)
42             {
43             size_t val_len;
44 17           const char *val = find_header_value(r, "x-forwarded-proto", 17, &val_len);
45 17 100         if (!val || val_len == 0) return NULL;
    50          
46              
47             // Skip whitespace
48 7 50         while (val_len > 0 && (*val == ' ' || *val == '\t')) { val++; val_len--; }
    50          
    50          
49              
50             // Check for exact https/http (reject "httpx", "https2", etc.)
51 7 50         if (val_len >= 5 && str_case_eq_fixed("https", val, 5) &&
    50          
52 7 100         (val_len == 5 || val[5] == ' ' || val[5] == '\t' || val[5] == ','))
    50          
    50          
    50          
53 7           return newSVpvs("https");
54 0 0         if (val_len >= 4 && str_case_eq_fixed("http", val, 4) &&
    0          
55 0 0         (val_len == 4 || val[4] == ' ' || val[4] == '\t' || val[4] == ','))
    0          
    0          
    0          
56 0           return newSVpvs("http");
57              
58 0           return NULL;
59             }
60              
61             /* Determine the URL scheme for a connection.
62             * Returns a new SV ("https" or forwarded proto), or NULL for default "http". */
63             static SV *
64 271           feer_determine_url_scheme(pTHX_ struct feer_conn *c)
65             {
66             #ifdef FEERSUM_HAS_H2
67             /* H2 requires TLS (ALPN), so scheme is always https.
68             * The :scheme pseudo-header is validated but not propagated. */
69             if (c->is_h2_stream) return newSVpvs("https");
70             #endif
71             #ifdef FEERSUM_HAS_TLS
72 271 100         if (c->tls) return newSVpvs("https");
73             #endif
74 233 100         if (c->proxy_ssl) return newSVpvs("https");
75 231 100         if (c->proxy_proto_version > 0 && c->proxy_dst_port == 443)
    100          
76 5           return newSVpvs("https");
77 226 100         if (c->cached_use_reverse_proxy && c->req) {
    50          
78 17           SV *fwd = extract_forwarded_proto(aTHX_ c->req);
79 17 100         if (fwd) return fwd;
80             }
81 219           return NULL;
82             }
83              
84             // Initialize PSGI env constants (called once at startup)
85             static void
86 49           feersum_init_psgi_env_constants(pTHX)
87             {
88 49 50         if (psgi_env_version) return; // already initialized
89              
90             // Only share truly immutable values that middleware will never modify
91 49           psgi_env_version = newRV((SV*)psgi_ver);
92 49           psgi_env_errors = newRV((SV*)PL_stderrgv);
93             }
94              
95             // Build PSGI env hash directly (optimized - no template clone)
96             static HV*
97 260           feersum_build_psgi_env(pTHX)
98             {
99 260           HV *e = newHV();
100             // Pre-size hash: ~13 constants + ~10 per-request + ~15 headers = ~38
101 260           hv_ksplit(e, 48);
102              
103             // Truly immutable constants - safe to share via refcount
104 260           hv_stores(e, "psgi.version", SvREFCNT_inc_simple_NN(psgi_env_version));
105 260           hv_stores(e, "psgi.errors", SvREFCNT_inc_simple_NN(psgi_env_errors));
106              
107             // Boolean constants - PL_sv_yes/no are immortal and safe to share
108 260           hv_stores(e, "psgi.run_once", SvREFCNT_inc_simple_NN(&PL_sv_no));
109 260           hv_stores(e, "psgi.nonblocking", SvREFCNT_inc_simple_NN(&PL_sv_yes));
110 260           hv_stores(e, "psgi.multithread", SvREFCNT_inc_simple_NN(&PL_sv_no));
111 260           hv_stores(e, "psgi.multiprocess", SvREFCNT_inc_simple_NN(&PL_sv_no));
112 260           hv_stores(e, "psgi.streaming", SvREFCNT_inc_simple_NN(&PL_sv_yes));
113 260           hv_stores(e, "psgix.input.buffered", SvREFCNT_inc_simple_NN(&PL_sv_yes));
114 260           hv_stores(e, "psgix.output.buffered", SvREFCNT_inc_simple_NN(&PL_sv_yes));
115 260           hv_stores(e, "psgix.body.scalar_refs", SvREFCNT_inc_simple_NN(&PL_sv_yes));
116 260           hv_stores(e, "psgix.output.guard", SvREFCNT_inc_simple_NN(&PL_sv_yes));
117              
118 260           hv_stores(e, "SCRIPT_NAME", newSVpvs(""));
119              
120 260           return e;
121             }
122              
123             static HV*
124 260           feersum_env(pTHX_ struct feer_conn *c)
125             {
126             HV *e;
127             int i,j;
128 260           struct feer_req *r = c->req;
129              
130             // Initialize constants on first call
131 260 100         if (unlikely(!psgi_env_version))
132 49           feersum_init_psgi_env_constants(aTHX);
133              
134             // Build env hash directly instead of cloning template (2x faster)
135 260           e = feersum_build_psgi_env(aTHX);
136              
137             trace("generating header (fd %d) %.*s\n",
138             c->fd, (int)r->uri_len, r->uri);
139              
140             // SERVER_NAME and SERVER_PORT - copy because these SVs are SvREADONLY
141             // and middleware may modify in-place (e.g., Plack::Middleware::ReverseProxy).
142             {
143 260           struct feer_listen *conn_lsnr = c->listener;
144 260 50         hv_stores(e, "SERVER_NAME",
145             conn_lsnr->server_name ? newSVsv(conn_lsnr->server_name) : newSVpvs(""));
146 260 50         hv_stores(e, "SERVER_PORT",
147             conn_lsnr->server_port ? newSVsv(conn_lsnr->server_port) : newSVpvs("0"));
148             }
149 260           hv_stores(e, "REQUEST_URI", feersum_env_uri(aTHX_ r));
150             #ifdef FEERSUM_HAS_H2
151             hv_stores(e, "REQUEST_METHOD", feersum_env_method_h2(aTHX_ c, r));
152             if (unlikely(c->is_h2_stream))
153             hv_stores(e, "SERVER_PROTOCOL", newSVpvs("HTTP/2"));
154             else
155             #else
156 260           hv_stores(e, "REQUEST_METHOD", feersum_env_method(aTHX_ r));
157             #endif
158 260           hv_stores(e, "SERVER_PROTOCOL", SvREFCNT_inc_simple_NN(feersum_env_protocol(aTHX_ r)));
159              
160 260           feersum_set_conn_remote_info(aTHX_ c);
161              
162             // Reverse proxy mode: trust X-Forwarded-For for REMOTE_ADDR
163 260 100         if (c->cached_use_reverse_proxy) {
164 13           SV *fwd_addr = extract_forwarded_addr(aTHX_ r);
165 13 100         hv_stores(e, "REMOTE_ADDR", fwd_addr ? fwd_addr : SvREFCNT_inc_simple_NN(c->remote_addr));
166             } else {
167 247           hv_stores(e, "REMOTE_ADDR", SvREFCNT_inc_simple_NN(c->remote_addr));
168             }
169 260           hv_stores(e, "REMOTE_PORT", SvREFCNT_inc_simple_NN(c->remote_port));
170              
171             {
172 260           SV *scheme = feer_determine_url_scheme(aTHX_ c);
173 260 100         hv_stores(e, "psgi.url_scheme", scheme ? scheme : newSVpvs("http"));
174             }
175              
176 260           hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
177              
178             // Always provide psgi.input (for both PSGI and native handlers)
179             // For requests without body, it will be an empty stream (returns 0 on read)
180 260           hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c, 0));
181              
182 260 100         if (c->cached_request_cb_is_psgi && c->server->psgix_io) {
    50          
183 75           SV *fake_fh = newSViv(c->fd); // fd value for psgix.io magic backing SV
184 75           SV *selfref = sv_2mortal(feer_conn_2sv(c));
185 75           sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
186 75           hv_stores(e, "psgix.io", fake_fh);
187             }
188              
189 260 50         if (c->trailers) {
190 0           hv_stores(e, "psgix.h2.trailers", newRV_inc((SV*)c->trailers));
191             }
192              
193 260 100         if (c->proxy_tlvs) {
194 1           hv_stores(e, "psgix.proxy_tlvs", SvREFCNT_inc_simple_NN(c->proxy_tlvs));
195             }
196              
197 260 100         if (likely(!r->path)) feersum_set_path_and_query(aTHX_ r);
198 260           hv_stores(e, "PATH_INFO", SvREFCNT_inc_simple_NN(r->path));
199 260           hv_stores(e, "QUERY_STRING", SvREFCNT_inc_simple_NN(r->query));
200              
201 260           SV *cur_val = NULL; // tracks current header value for multi-value header merging
202 260           char *kbuf = header_key_buf; // use static buffer (pre-initialized with "HTTP_")
203              
204 1026 100         for (i=0; inum_headers; i++) {
205 766           struct phr_header *hdr = &(r->headers[i]);
206             // Note: obs-fold (hdr->name == NULL) is rejected at parse time per RFC 7230
207 801           if (unlikely(hdr->name_len == 14) &&
208 35           str_case_eq_fixed("content-length", hdr->name, 14))
209             {
210             // content length shouldn't show up as HTTP_CONTENT_LENGTH but
211             // as CONTENT_LENGTH in the env-hash.
212 35           continue;
213             }
214 741           else if (unlikely(hdr->name_len == 12) &&
215 10           str_case_eq_fixed("content-type", hdr->name, 12))
216             {
217 7           cur_val = newSVpvn(hdr->value, hdr->value_len);
218 7           hv_stores(e, "CONTENT_TYPE", cur_val);
219 7           continue;
220             }
221              
222             // Skip headers with names too long for our buffer (defensive - should be
223             // rejected at parse time with 431, but guard against edge cases)
224 724 50         if (unlikely(hdr->name_len > MAX_HEADER_NAME_LEN)) {
225             trace("skipping oversized header name (len=%zu) on fd %d\n",
226             hdr->name_len, c->fd);
227 0           continue;
228             }
229              
230 724           size_t klen = 5+hdr->name_len;
231 724           char *key = kbuf + 5;
232 6944 100         for (j=0; jname_len; j++) {
233             // Use combined lookup table (uppercase + dash-to-underscore)
234 6220           *key++ = ascii_upper_dash[(unsigned char)hdr->name[j]];
235             }
236              
237 724           SV **fetched = hv_fetch(e, kbuf, klen, 1);
238             trace("adding header to env (fd %d) %.*s: %.*s\n",
239             c->fd, (int)klen, kbuf, (int)hdr->value_len, hdr->value);
240              
241             // hv_fetch with lval=1 should always succeed, but check for OOM safety
242 724 50         if (unlikely(fetched == NULL)) {
243             trace("hv_fetch returned NULL (OOM?) on fd %d\n", c->fd);
244 0           continue;
245             }
246 724           cur_val = *fetched; // track for multi-value header merging
247 724 100         if (unlikely(SvPOK(cur_val))) {
248             trace("... is multivalue\n");
249             // extend header with comma
250 3           sv_catpvn(cur_val, ", ", 2);
251 3           sv_catpvn(cur_val, hdr->value, hdr->value_len);
252             }
253             else {
254             // change from undef to a real value
255 721           sv_setpvn(cur_val, hdr->value, hdr->value_len);
256             }
257             }
258              
259             #ifdef FEERSUM_HAS_H2
260             /* Map :authority pseudo-header to HTTP_HOST for H2 streams (RFC 9113 §8.3.1).
261             * Only set if no regular Host header was already present. */
262             if (unlikely(c->is_h2_stream)) {
263             struct feer_h2_stream *stream = (struct feer_h2_stream *)c->read_ev_timer.data;
264             if (stream && stream->h2_authority && !hv_exists(e, "HTTP_HOST", 9)) {
265             STRLEN alen;
266             const char *aval = SvPV(stream->h2_authority, alen);
267             hv_stores(e, "HTTP_HOST", newSVpvn(aval, alen));
268             }
269             /* Extended CONNECT (RFC 8441): REQUEST_METHOD already set to GET
270             * above; add remaining H1-equivalent headers so existing PSGI
271             * WebSocket middleware works transparently.
272             * Matches HAProxy/nghttpx H2↔H1 upgrade translation. */
273             if (stream && stream->is_tunnel && stream->h2_protocol) {
274             STRLEN plen;
275             const char *pval = SvPV(stream->h2_protocol, plen);
276             hv_stores(e, "HTTP_UPGRADE", newSVpvn(pval, plen));
277             hv_stores(e, "HTTP_CONNECTION", newSVpvs("Upgrade"));
278             hv_stores(e, "psgix.h2.protocol", newSVpvn(pval, plen));
279             hv_stores(e, "psgix.h2.extended_connect", newSViv(1));
280             }
281             }
282             #endif
283              
284 260           return e;
285             }
286              
287             #define COPY_NORM_HEADER(_str) \
288             for (i = 0; i < r->num_headers; i++) {\
289             struct phr_header *hdr = &(r->headers[i]);\
290             /* Invariant: obs-fold and oversized names already rejected at parse time */\
291             if (unlikely(hdr->name_len > MAX_HEADER_NAME_LEN)) continue; /* defense-in-depth */\
292             char *k = kbuf;\
293             for (j = 0; j < hdr->name_len; j++) { char n = hdr->name[j]; *k++ = _str; }\
294             SV** val = hv_fetch(e, kbuf, hdr->name_len, 1);\
295             if (unlikely(!val)) continue; /* OOM safety */\
296             if (unlikely(SvPOK(*val))) {\
297             sv_catpvn(*val, ", ", 2);\
298             sv_catpvn(*val, hdr->value, hdr->value_len);\
299             } else {\
300             sv_setpvn(*val, hdr->value, hdr->value_len);\
301             }\
302             }\
303             break;
304              
305             // Static buffer for feersum_env_headers (reuses header_key_buf area after HTTP_ prefix)
306             static HV*
307 6           feersum_env_headers(pTHX_ struct feer_req *r, int norm)
308             {
309             size_t i; size_t j; HV* e;
310 6           e = newHV();
311             // Pre-allocate hash buckets based on expected header count to avoid rehashing
312 6 50         if (r->num_headers > 0)
313 6           hv_ksplit(e, r->num_headers);
314 6           char *kbuf = header_key_buf + 5; // reuse static buffer, skip the "HTTP_" prefix area
315 6           switch (norm) {
316 1           case HEADER_NORM_SKIP:
317 26 50         COPY_NORM_HEADER(n)
    100          
    50          
    50          
    100          
318 1           case HEADER_NORM_LOCASE:
319 26 50         COPY_NORM_HEADER(ascii_lower[(unsigned char)n])
    100          
    50          
    50          
    100          
320 1           case HEADER_NORM_UPCASE:
321 26 50         COPY_NORM_HEADER(ascii_upper[(unsigned char)n])
    100          
    50          
    50          
    100          
322 2           case HEADER_NORM_LOCASE_DASH:
323 82 50         COPY_NORM_HEADER(ascii_lower_dash[(unsigned char)n])
    100          
    50          
    50          
    100          
324 1           case HEADER_NORM_UPCASE_DASH:
325 26 50         COPY_NORM_HEADER(ascii_upper_dash[(unsigned char)n])
    100          
    50          
    50          
    100          
326 0           default:
327 0           break;
328             }
329 6           return e;
330             }
331              
332             INLINE_UNLESS_DEBUG static SV*
333 5           feersum_env_header(pTHX_ struct feer_req *r, SV *name)
334             {
335             size_t i;
336 11 50         for (i = 0; i < r->num_headers; i++) {
337 11           struct phr_header *hdr = &(r->headers[i]);
338             // Note: continuation headers (name == NULL) are rejected at parse time
339 11 100         if (unlikely(hdr->name_len == SvCUR(name)
    50          
340             && str_case_eq_both(SvPVX(name), hdr->name, hdr->name_len))) {
341 5           return newSVpvn(hdr->value, hdr->value_len);
342             }
343             }
344 0           return &PL_sv_undef;
345             }
346              
347             INLINE_UNLESS_DEBUG static ssize_t
348 16           feersum_env_content_length(pTHX_ struct feer_conn *c)
349             {
350 16           return c->expected_cl;
351             }
352              
353             static SV*
354 7           feersum_env_io(pTHX_ struct feer_conn *c)
355             {
356 7           dSP;
357              
358             // Prevent double-call: io() can only be called once per connection
359 7 100         if (unlikely(c->io_taken))
360 1           croak("io() already called on this connection");
361              
362             trace("feersum_env_io for fd=%d\n", c->fd);
363              
364             #ifdef FEERSUM_HAS_H2
365             /* H2 tunnel: auto-accept, create socketpair, expose sv[1] as IO handle */
366             if (c->is_h2_stream) {
367             struct feer_h2_stream *stream = (struct feer_h2_stream *)c->read_ev_timer.data;
368             if (!stream || !stream->is_tunnel)
369             croak("io() is not supported on regular HTTP/2 streams");
370             h2_tunnel_auto_accept(aTHX_ c, stream);
371             /* Re-fetch stream: auto_accept → session_send may have freed it */
372             stream = (struct feer_h2_stream *)c->read_ev_timer.data;
373             if (!stream)
374             croak("io() tunnel: stream freed during auto-accept");
375             feer_h2_setup_tunnel(aTHX_ stream);
376             if (!stream->tunnel_established)
377             croak("Failed to create tunnel socketpair");
378              
379             SV *sv = newSViv(stream->tunnel_sv1);
380              
381             ENTER;
382             SAVETMPS;
383             PUSHMARK(SP);
384             XPUSHs(sv);
385             mXPUSHs(newSViv(stream->tunnel_sv1));
386             PUTBACK;
387              
388             call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
389             SPAGAIN;
390              
391             if (unlikely(SvTRUE(ERRSV))) {
392             FREETMPS;
393             LEAVE;
394             SvREFCNT_dec(sv);
395             croak("Failed to create tunnel IO handle: %-p", ERRSV);
396             }
397              
398             if (unlikely(!SvROK(sv))) {
399             /* _raw failed: new_from_fd returned undef.
400             * Leave tunnel_sv1 intact so feer_h2_stream_free closes it. */
401             FREETMPS;
402             LEAVE;
403             SvREFCNT_dec(sv);
404             croak("Failed to create tunnel IO handle");
405             }
406              
407             SV *io_glob = SvRV(sv);
408             GvSV(io_glob) = newRV_inc(c->self);
409              
410             /* sv[1] now owned by the IO handle */
411             stream->tunnel_sv1 = -1;
412             c->io_taken = 1;
413              
414             FREETMPS;
415             LEAVE;
416             return sv;
417             }
418             #endif
419              
420             #ifdef FEERSUM_HAS_TLS
421             /* TLS tunnel: create socketpair relay for bidirectional I/O over TLS */
422 6 100         if (c->tls) {
423 3           feer_tls_setup_tunnel(c);
424 3 50         if (!c->tls_tunnel)
425 0           croak("Failed to create TLS tunnel socketpair");
426              
427 3           SV *sv = newSViv(c->tls_tunnel_sv1);
428              
429 3           ENTER;
430 3           SAVETMPS;
431 3 50         PUSHMARK(SP);
432 3 50         XPUSHs(sv);
433 3 50         mXPUSHs(newSViv(c->tls_tunnel_sv1));
434 3           PUTBACK;
435              
436 3           call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
437 3           SPAGAIN;
438              
439 3 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
440 0 0         FREETMPS;
441 0           LEAVE;
442 0           SvREFCNT_dec(sv);
443 0 0         croak("Failed to create TLS tunnel IO handle: %-p", ERRSV);
444             }
445              
446 3 50         if (unlikely(!SvROK(sv))) {
447             /* _raw failed: new_from_fd returned undef.
448             * Leave tls_tunnel_sv1 intact so feer_tls_free_conn closes it. */
449 0 0         FREETMPS;
450 0           LEAVE;
451 0           SvREFCNT_dec(sv);
452 0           croak("Failed to create TLS tunnel IO handle");
453             }
454              
455 3           SV *io_glob = SvRV(sv);
456 3           GvSV(io_glob) = newRV_inc(c->self);
457              
458             /* sv[1] now owned by the IO handle */
459 3           c->tls_tunnel_sv1 = -1;
460 3           c->io_taken = 1;
461 3           stop_read_timer(c);
462 3           stop_write_timer(c);
463              
464             /* Keep read watcher active — TLS reads relay to tunnel */
465              
466 3 50         FREETMPS;
467 3           LEAVE;
468 3           return sv;
469             }
470             #endif
471              
472             // Create a scalar to hold the IO handle
473 3           SV *sv = newSViv(c->fd);
474              
475 3           ENTER;
476 3           SAVETMPS;
477              
478 3 50         PUSHMARK(SP);
479 3 50         XPUSHs(sv);
480 3 50         mXPUSHs(newSViv(c->fd));
481 3           PUTBACK;
482              
483             // Call Feersum::Connection::_raw to create IO::Socket::INET
484 3           call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
485 3           SPAGAIN;
486              
487 3 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
488 0 0         FREETMPS;
489 0           LEAVE;
490 0           SvREFCNT_dec(sv);
491 0 0         croak("Failed to create IO handle: %-p", ERRSV);
492             }
493              
494             // Verify _raw created a valid reference
495 3 50         if (unlikely(!SvROK(sv))) {
496 0 0         FREETMPS;
497 0           LEAVE;
498 0           SvREFCNT_dec(sv);
499 0           croak("Failed to create IO handle: new_from_fd returned undef");
500             }
501              
502             // Store back-reference to connection in the glob's scalar slot
503 3           SV *io_glob = SvRV(sv);
504 3           GvSV(io_glob) = newRV_inc(c->self);
505              
506             // Push any remaining rbuf data into the socket buffer
507 3 50         if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
    50          
    50          
    100          
508             STRLEN rbuf_len;
509 1           const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
510 1           IO *io = GvIOp(io_glob);
511 1 50         if (io) {
512 1           SSize_t pushed = PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
513 1 50         if (likely(pushed == (SSize_t)rbuf_len)) {
514 1           SvCUR_set(c->rbuf, 0);
515 1           *SvPVX(c->rbuf) = '\0';
516 0 0         } else if (pushed > 0) {
517 0           sv_chop(c->rbuf, rbuf_ptr + pushed);
518 0           trouble("PerlIO_unread partial in io(): %zd of %"Sz_uf" bytes fd=%d\n",
519             pushed, (Sz)rbuf_len, c->fd);
520             } else {
521 0           trouble("PerlIO_unread failed in io() fd=%d\n", c->fd);
522             }
523             }
524             }
525              
526             // Stop Feersum's watchers - user now owns the socket
527 3           stop_read_watcher(c);
528 3           stop_read_timer(c);
529 3           stop_write_timer(c);
530             // don't stop write watcher in case there's outstanding data
531              
532             // Mark that io() was called
533 3           c->io_taken = 1;
534              
535 3 50         FREETMPS;
536 3           LEAVE;
537              
538 3           return sv;
539             }
540              
541             static SSize_t
542 3           feersum_return_from_io(pTHX_ struct feer_conn *c, SV *io_sv, const char *func_name)
543             {
544             #ifdef FEERSUM_HAS_H2
545             if (unlikely(c->is_h2_stream))
546             croak("%s: not supported on HTTP/2 streams", func_name);
547             #endif
548              
549 3 50         if (!SvROK(io_sv) || !isGV_with_GP(SvRV(io_sv)))
    50          
    50          
    0          
550 0           croak("%s requires a filehandle", func_name);
551              
552 3           GV *gv = (GV *)SvRV(io_sv);
553 3 50         IO *io = GvIO(gv);
    50          
    0          
    50          
554 3 50         if (!io || !IoIFP(io))
    50          
555 0           croak("%s: invalid filehandle", func_name);
556              
557 3           PerlIO *fp = IoIFP(io);
558              
559             // Check if there's buffered data to pull back
560 3           SSize_t cnt = PerlIO_get_cnt(fp);
561 3 50         if (cnt > 0) {
562             // Get pointer to buffered data
563             // Note: ptr remains valid until next PerlIO operation on fp.
564             // sv_catpvn doesn't touch fp, so this is safe.
565 0           STDCHAR *ptr = PerlIO_get_ptr(fp);
566 0 0         if (ptr) {
567             // Ensure we have an rbuf
568 0 0         if (!c->rbuf)
569 0           c->rbuf = newSV(READ_BUFSZ);
570              
571             // Append buffered data to feersum's rbuf
572 0           sv_catpvn(c->rbuf, (const char *)ptr, cnt);
573              
574             // Mark buffer as consumed (must happen before any other PerlIO ops)
575 0           PerlIO_set_ptrcnt(fp, ptr + cnt, 0);
576              
577             trace("pulled %zd bytes back to feersum fd=%d\n", (size_t)cnt, c->fd);
578             }
579             }
580              
581             /* Reset connection state for next request (like keepalive reset) */
582 3           change_responding_state(c, RESPOND_NOT_STARTED);
583 3           change_receiving_state(c, RECEIVE_HEADERS);
584 3           c->expected_cl = 0;
585 3           c->received_cl = 0;
586 3           c->io_taken = 0;
587 3           free_request(c);
588              
589 3 50         if (c->rbuf && cnt <= 0)
    50          
590 3           SvCUR_set(c->rbuf, 0);
591              
592 3           start_read_watcher(c);
593 3           restart_read_timer(c);
594 3           restart_header_timer(c);
595              
596 3           return cnt > 0 ? cnt : 0;
597             }
598              
599             static void
600 469           feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
601             int streaming)
602             {
603             const char *ptr;
604             I32 i;
605              
606             trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
607              
608 469 50         if (unlikely(!SvOK(message) || !(SvIOK(message) || SvPOK(message)))) {
    100          
    50          
    50          
609 0           croak("Must define an HTTP status code or message");
610             }
611              
612 469           I32 avl = av_len(headers);
613 469 50         if (unlikely((avl+1) % 2 == 1)) {
614 0           croak("expected even-length array, got %d", avl+1);
615             }
616              
617             #ifdef FEERSUM_HAS_H2
618             if (unlikely(c->is_h2_stream)) {
619             feersum_h2_start_response(aTHX_ c, message, headers, streaming);
620             return;
621             }
622             #endif
623              
624 469 100         if (unlikely(c->responding != RESPOND_NOT_STARTED))
625 4           croak("already responding?!");
626 465 100         change_responding_state(c, streaming ? RESPOND_STREAMING : RESPOND_NORMAL);
627              
628             // int or 3 chars? use a stock message
629 465           UV code = 0;
630 465 100         if (SvIOK(message))
631 388           code = SvIV(message);
632             else {
633 77           STRLEN mlen = SvCUR(message);
634 77           const int numtype = grok_number(SvPVX_const(message), mlen > 3 ? 3 : mlen, &code);
635 77 50         if (unlikely(numtype != IS_NUMBER_IN_UV))
636 0           code = 0;
637             }
638             trace2("starting response fd=%d code=%"UVuf"\n",c->fd,code);
639              
640 465 50         if (unlikely(!code))
641 0           croak("first parameter is not a number or doesn't start with digits");
642              
643             if (FEERSUM_RESP_START_ENABLED()) {
644             FEERSUM_RESP_START(c->fd, (int)code);
645             }
646              
647             // for PSGI it's always just an IV so optimize for that
648 465 100         if (likely(!SvPOK(message) || SvCUR(message) == 3)) {
    100          
649             // Use cached status SVs for common codes to avoid newSVpvf overhead
650 389           switch (code) {
651 387           case 200: message = status_200; break;
652 0           case 201: message = status_201; break;
653 1           case 204: message = status_204; break;
654 0           case 301: message = status_301; break;
655 0           case 302: message = status_302; break;
656 1           case 304: message = status_304; break;
657 0           case 400: message = status_400; break;
658 0           case 404: message = status_404; break;
659 0           case 500: message = status_500; break;
660 0           default:
661 0           ptr = http_code_to_msg(code);
662 0           message = sv_2mortal(newSVpvf("%"UVuf" %s",code,ptr));
663 0           break;
664             }
665             }
666              
667             // don't generate or strip Content-Length headers for responses that MUST NOT have a body
668             // RFC 7230: 1xx, 204, 205, 304 responses MUST NOT contain a message body
669 464 50         c->auto_cl = (code == 204 || code == 205 || code == 304 ||
    100          
670 929 100         (100 <= code && code <= 199)) ? 0 : 1;
    50          
    50          
671              
672 465 100         add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
673 465           add_sv_to_wbuf(c, message);
674 465           add_crlf_to_wbuf(c);
675              
676 465           bool has_content_length = 0;
677 465           SV **ary = AvARRAY(headers);
678 1064 100         for (i=0; i
679 599           SV *hdr = ary[i];
680 599           SV *val = ary[i+1];
681 599 50         if (unlikely(!hdr || !SvOK(hdr))) {
    50          
682             trace("skipping undef header key");
683 56           continue;
684             }
685 599 50         if (unlikely(!val || !SvOK(val))) {
    50          
686             trace("skipping undef header value");
687 0           continue;
688             }
689              
690             STRLEN hlen;
691 599           const char *hp = SvPV(hdr, hlen);
692 599 100         if (unlikely(hlen == 14) && str_case_eq_fixed("content-length", hp, 14)) {
    50          
693 65 100         if (likely(c->auto_cl) && !streaming) {
    100          
694             trace("ignoring content-length header in the response\n");
695 56           continue;
696             }
697             // In streaming mode, keep Content-Length (for sendfile support)
698 9           has_content_length = 1;
699             }
700              
701 543           add_sv_to_wbuf(c, hdr);
702 543           add_const_to_wbuf(c, ": ", 2);
703 543           add_sv_to_wbuf(c, val);
704 543           add_crlf_to_wbuf(c);
705             }
706              
707 465 100         if (likely(c->is_http11)) {
708 449           add_const_to_wbuf(c, DATE_BUF, DATE_HEADER_LENGTH);
709 449 100         if (!c->is_keepalive)
710 322           add_const_to_wbuf(c, "Connection: close" CRLF, 19);
711 16 100         } else if (c->is_keepalive && !streaming)
    100          
712 1           add_const_to_wbuf(c, "Connection: keep-alive" CRLF, 24);
713              
714 465 100         if (streaming) {
715             // Use chunked encoding only if no Content-Length provided
716             // (Content-Length is used with sendfile for zero-copy file transfer)
717             // Skip chunked for 1xx responses (e.g. 101 Switching Protocols)
718 57 100         if (c->is_http11 && !has_content_length && code >= 200) {
    100          
    50          
719 40           add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
720 40           c->use_chunked = 1;
721             }
722             else {
723 17           add_crlf_to_wbuf(c);
724 17           c->use_chunked = 0;
725             // cant do keep-alive for streaming http/1.0 since client completes read on close
726 17 100         if (c->is_keepalive && !has_content_length) c->is_keepalive = 0;
    50          
727             }
728             }
729              
730             // For streaming responses, start writing headers immediately.
731             // For non-streaming (RESPOND_NORMAL), feersum_write_whole_body will
732             // call conn_write_ready after the body is buffered. This is critical
733             // because conn_write_ready triggers immediate writes and would
734             // prematurely finish the response before body is ready.
735 465 100         if (streaming)
736 57           conn_write_ready(c);
737 465           }
738              
739             static size_t
740 408           feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
741             {
742             size_t RETVAL;
743             I32 i;
744 408           bool body_is_string = 0;
745             STRLEN cur;
746              
747 408 50         if (c->responding != RESPOND_NORMAL)
748 0           croak("can't use write_whole_body when in streaming mode");
749              
750             #ifdef FEERSUM_HAS_H2
751             if (unlikely(c->is_h2_stream)) {
752             /* H2 streams use nghttp2 submit_response, not wbuf */
753             SV *body_sv;
754             if (!SvOK(body)) {
755             body_sv = sv_2mortal(newSVpvs(""));
756             } else if (SvROK(body)) {
757             SV *refd = SvRV(body);
758             if (SvOK(refd) && !SvROK(refd)) {
759             body_sv = refd;
760             } else if (SvTYPE(refd) == SVt_PVAV) {
761             AV *ab = (AV*)refd;
762             body_sv = sv_2mortal(newSVpvs(""));
763             I32 amax = av_len(ab);
764             for (i = 0; i <= amax; i++) {
765             SV *sv = fetch_av_normal(aTHX_ ab, i);
766             if (sv) sv_catsv(body_sv, sv);
767             }
768             } else {
769             croak("body must be a scalar, scalar reference or array reference");
770             }
771             } else {
772             body_sv = body;
773             }
774             return feersum_h2_write_whole_body(aTHX_ c, body_sv);
775             }
776             #endif
777              
778 408 50         if (!SvOK(body)) {
779 0           body = sv_2mortal(newSVpvs(""));
780 0           body_is_string = 1;
781             }
782 408 100         else if (SvROK(body)) {
783 291           SV *refd = SvRV(body);
784 291 100         if (SvOK(refd) && !SvROK(refd)) {
    50          
785 146           body = refd;
786 146           body_is_string = 1;
787             }
788 145 50         else if (SvTYPE(refd) != SVt_PVAV) {
789 0           croak("body must be a scalar, scalar reference or array reference");
790             }
791             }
792             else {
793 117           body_is_string = 1;
794             }
795              
796             SV *cl_sv; // content-length future
797             struct iovec *cl_iov;
798 408 100         if (likely(c->auto_cl))
799 405           add_placeholder_to_wbuf(c, &cl_sv, &cl_iov);
800             else
801 3           add_crlf_to_wbuf(c);
802              
803 408 100         if (body_is_string) {
804 263           cur = add_sv_to_wbuf(c,body);
805 263           RETVAL = cur;
806             }
807             else {
808 145           AV *abody = (AV*)SvRV(body);
809 145           I32 amax = av_len(abody);
810 145           RETVAL = 0;
811 305 100         for (i=0; i<=amax; i++) {
812 160           SV *sv = fetch_av_normal(aTHX_ abody, i);
813 160 100         if (unlikely(!sv)) continue;
814 159           cur = add_sv_to_wbuf(c,sv);
815             trace("body part i=%d sv=%p cur=%"Sz_uf"\n", i, sv, (Sz)cur);
816 159           RETVAL += cur;
817             }
818             }
819              
820 408 100         if (likely(c->auto_cl)) {
821             char cl_buf[48]; // 16 (prefix) + 20 (max uint64) + 4 (CRLF x2) + padding
822 405           int cl_len = format_content_length(cl_buf, RETVAL);
823 405           sv_setpvn(cl_sv, cl_buf, cl_len);
824 405           update_wbuf_placeholder(c, cl_sv, cl_iov);
825             }
826              
827 408           change_responding_state(c, RESPOND_SHUTDOWN);
828 408           conn_write_ready(c);
829 408           return RETVAL;
830             }
831              
832             static void
833 3           call_died (pTHX_ struct feer_conn *c, const char *cb_type)
834             {
835 3           dSP;
836             #if DEBUG >= 1
837             trace("An error was thrown in the %s callback: %-p\n",cb_type,ERRSV);
838             #endif
839 3 50         PUSHMARK(SP);
840 3 50         mXPUSHs(newSVsv(ERRSV));
    50          
841 3           PUTBACK;
842 3           call_pv("Feersum::DIED", G_DISCARD|G_EVAL|G_VOID);
843 3           SPAGAIN;
844              
845 3           respond_with_server_error(c, "Request handler exception\n", 500);
846 3 50         sv_setsv(ERRSV, &PL_sv_undef);
847 3           }
848              
849             static void
850 16           feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
851             {
852 16           dSP;
853 16           ENTER;
854 16           SAVETMPS;
855 16 50         PUSHMARK(SP);
856 16 50         mXPUSHs(feer_conn_2sv(c));
857 16 50         XPUSHs(streamer);
858 16           PUTBACK;
859 16           call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
860 16           SPAGAIN;
861 16 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
862 0           call_died(aTHX_ c, "PSGI stream initiator");
863             }
864 16           PUTBACK;
865 16 50         FREETMPS;
866 16           LEAVE;
867 16           }
868              
869             static void
870 78           feersum_handle_psgi_response(
871             pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
872             {
873 78 50         if (unlikely(!SvOK(ret) || !SvROK(ret))) {
    50          
874 0 0         sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
875 0           call_died(aTHX_ c, "PSGI request");
876 0           return;
877             }
878              
879 78 50         if (unlikely(!IsArrayRef(ret))) {
    100          
880 16 50         if (likely(can_recurse)) {
881             trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
882 16           feersum_start_psgi_streaming(aTHX_ c, ret);
883             }
884             else {
885 0 0         sv_setpvs(ERRSV, "PSGI attempt to recurse in a streaming callback");
886 0           call_died(aTHX_ c, "PSGI request");
887             }
888 16           return;
889             }
890              
891 62           AV *psgi_triplet = (AV*)SvRV(ret);
892 62 50         if (unlikely(av_len(psgi_triplet)+1 != 3)) {
893 0 0         sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
894 0           call_died(aTHX_ c, "PSGI request");
895 0           return;
896             }
897              
898             trace("PSGI response triplet, c=%p av=%p\n", c, psgi_triplet);
899             // we know there's three elems so *should* be safe to de-ref
900 62           SV **msg_p = av_fetch(psgi_triplet,0,0);
901 62           SV **hdrs_p = av_fetch(psgi_triplet,1,0);
902 62           SV **body_p = av_fetch(psgi_triplet,2,0);
903 62 50         if (unlikely(!msg_p || !hdrs_p || !body_p)) {
    50          
    50          
    50          
904 0 0         sv_setpvs(ERRSV, "Invalid PSGI array response (NULL element)");
905 0           call_died(aTHX_ c, "PSGI request");
906 0           return;
907             }
908 62           SV *msg = *msg_p;
909 62           SV *hdrs = *hdrs_p;
910 62           SV *body = *body_p;
911              
912             AV *headers;
913 62 50         if (IsArrayRef(hdrs))
    50          
914 62           headers = (AV*)SvRV(hdrs);
915             else {
916 0 0         sv_setpvs(ERRSV, "PSGI Headers must be an array-ref");
917 0           call_died(aTHX_ c, "PSGI request");
918 0           return;
919             }
920              
921 62 50         if (likely(IsArrayRef(body))) {
    100          
922 57           feersum_start_response(aTHX_ c, msg, headers, 0);
923 57           feersum_write_whole_body(aTHX_ c, body);
924             }
925 5 50         else if (likely(SvROK(body))) { // probably an IO::Handle-like object
926 5           feersum_start_response(aTHX_ c, msg, headers, 1);
927             #ifdef FEERSUM_HAS_H2
928             if (unlikely(c->is_h2_stream)) {
929             /* H2: drain the IO::Handle synchronously since there is no
930             * per-stream write watcher; nghttp2 handles flow control. */
931             pump_h2_io_handle(aTHX_ c, body);
932             } else
933             #endif
934             {
935 5           c->poll_write_cb = newSVsv(body);
936 5           c->poll_write_cb_is_io_handle = 1;
937 5           conn_write_ready(c);
938             }
939             }
940             else {
941 0 0         sv_setpvs(ERRSV, "Expected PSGI array-ref or IO::Handle-like body");
942 0           call_died(aTHX_ c, "PSGI request");
943 0           return;
944             }
945             }
946              
947             static int
948 58           feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
949             {
950             int RETVAL;
951 58 100         if (is_writer) {
952             trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
953 52 100         if (c->poll_write_cb) {
954 1           SV *tmp = c->poll_write_cb;
955 1           c->poll_write_cb = NULL; // NULL before dec: prevents re-entrant double-free
956 1           c->poll_write_cb_is_io_handle = 0;
957 1           SvREFCNT_dec(tmp);
958             }
959             #ifdef FEERSUM_HAS_H2
960             if (unlikely(c->is_h2_stream)) {
961             if (c->responding < RESPOND_SHUTDOWN) {
962             feersum_h2_close_write(aTHX_ c);
963             change_responding_state(c, RESPOND_SHUTDOWN);
964             }
965             } else
966             #endif
967 52 50         if (c->responding < RESPOND_SHUTDOWN) {
968 52           finish_wbuf(c); // only adds terminator if use_chunked is set
969 52           change_responding_state(c, RESPOND_SHUTDOWN);
970 52           conn_write_ready(c);
971             }
972 52           RETVAL = 1;
973             }
974             else {
975             trace("close reader fd=%d, c=%p\n", c->fd, c);
976 6 50         if (c->poll_read_cb) {
977 0           SV *tmp = c->poll_read_cb;
978 0           c->poll_read_cb = NULL;
979 0           SvREFCNT_dec(tmp);
980             }
981 6 100         if (c->rbuf) {
982 1           SvREFCNT_dec(c->rbuf);
983 1           c->rbuf = NULL;
984             }
985 6 50         if (c->fd >= 0
986             #ifdef FEERSUM_HAS_H2
987             && !c->is_h2_stream
988             #endif
989             )
990 6           RETVAL = shutdown(c->fd, SHUT_RD);
991             else
992 0           RETVAL = -1; // already closed or H2 stream
993 6           change_receiving_state(c, RECEIVE_SHUTDOWN);
994             }
995              
996             // disassociate the handle from the conn
997 58           SvREFCNT_dec(c->self);
998 58           return RETVAL;
999             }
1000              
1001             static SV*
1002 6           feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard)
1003             {
1004 6 100         if (guard) {
1005 4 100         if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
1006 4 50         c->ext_guard = SvOK(guard) ? newSVsv(guard) : NULL;
1007             }
1008 6 50         return c->ext_guard ? newSVsv(c->ext_guard) : &PL_sv_undef;
1009             }
1010              
1011             static void
1012 482           call_request_callback (struct feer_conn *c)
1013             {
1014             dTHX;
1015 482           dSP;
1016             int flags;
1017 482           struct feer_server *server = c->server;
1018 482           c->in_callback++;
1019 482           SvREFCNT_inc_void_NN(c->self);
1020 482           server->total_requests++;
1021              
1022             trace("request callback c=%p\n", c);
1023              
1024 482           ENTER;
1025 482           SAVETMPS;
1026 482 50         PUSHMARK(SP);
1027              
1028 482 100         if (server->request_cb_is_psgi) {
1029 75           HV *env = feersum_env(aTHX_ c);
1030 75 50         mXPUSHs(newRV_noinc((SV*)env));
1031 75           flags = G_EVAL|G_SCALAR;
1032             }
1033             else {
1034 407 50         mXPUSHs(feer_conn_2sv(c));
1035 407           flags = G_DISCARD|G_EVAL|G_VOID;
1036             }
1037              
1038 482           PUTBACK;
1039 482           int returned = call_sv(server->request_cb_cv, flags);
1040 482           SPAGAIN;
1041              
1042             trace("called request callback, errsv? %d\n", SvTRUE(ERRSV) ? 1 : 0);
1043              
1044 482 50         if (unlikely(SvTRUE(ERRSV))) {
    100          
1045 3           call_died(aTHX_ c, "request");
1046 3           returned = 0; // pretend nothing got returned
1047             }
1048              
1049 482           SV *psgi_response = NULL;
1050 482 100         if (server->request_cb_is_psgi && likely(returned >= 1)) {
    50          
1051 75           psgi_response = POPs;
1052 75           SvREFCNT_inc_void_NN(psgi_response);
1053             }
1054              
1055             trace("leaving request callback\n");
1056 482           PUTBACK;
1057              
1058 482 100         if (psgi_response) {
1059 75           feersum_handle_psgi_response(aTHX_ c, psgi_response, 1); // can_recurse
1060 75           SvREFCNT_dec(psgi_response);
1061             }
1062              
1063 482           c->in_callback--;
1064 482           SvREFCNT_dec(c->self);
1065              
1066 482 50         FREETMPS;
1067 482           LEAVE;
1068 482           }
1069              
1070             static void
1071 26           call_poll_callback (struct feer_conn *c, bool is_write)
1072             {
1073             dTHX;
1074 26           dSP;
1075              
1076 26 100         SV *cb = (is_write) ? c->poll_write_cb : c->poll_read_cb;
1077              
1078 26 50         if (unlikely(cb == NULL)) return;
1079              
1080 26           c->in_callback++;
1081              
1082             trace("%s poll callback c=%p cbrv=%p\n",
1083             is_write ? "write" : "read", c, cb);
1084              
1085 26           ENTER;
1086 26           SAVETMPS;
1087 26 50         PUSHMARK(SP);
1088 26           SV *hdl = new_feer_conn_handle(aTHX_ c, is_write);
1089 26 50         mXPUSHs(hdl);
1090 26           PUTBACK;
1091 26           call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
1092 26           SPAGAIN;
1093              
1094             trace("called %s poll callback, errsv? %d\n",
1095             is_write ? "write" : "read", SvTRUE(ERRSV) ? 1 : 0);
1096              
1097 26 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
1098 0 0         call_died(aTHX_ c, is_write ? "write poll" : "read poll");
1099             }
1100              
1101             // Neutralize the mortal handle before FREETMPS so its DESTROY doesn't
1102             // call feersum_close_handle and prematurely close the connection.
1103             // If the callback already called close(), SvUVX is already 0.
1104             {
1105 26           SV *inner = SvRV(hdl);
1106 26 50         if (SvUVX(inner)) {
1107 26           SvUVX(inner) = 0;
1108 26           SvREFCNT_dec(c->self); // balance new_feer_conn_handle's inc
1109             }
1110             }
1111              
1112             trace("leaving %s poll callback\n", is_write ? "write" : "read");
1113 26           PUTBACK;
1114 26 50         FREETMPS;
1115 26           LEAVE;
1116              
1117 26           c->in_callback--;
1118             }
1119              
1120             static void
1121 11           pump_io_handle (struct feer_conn *c)
1122             {
1123             dTHX;
1124 11           dSP;
1125              
1126 11 50         if (unlikely(c->poll_write_cb == NULL)) return;
1127              
1128 11           c->in_callback++;
1129              
1130             trace("pump io handle %d\n", c->fd);
1131              
1132 11           SV *old_rs = PL_rs;
1133 11 50         SvREFCNT_inc_simple_void(old_rs);
1134 11           PL_rs = sv_2mortal(newRV_noinc(newSViv(IO_PUMP_BUFSZ)));
1135 11           sv_setsv(get_sv("/", GV_ADD), PL_rs);
1136              
1137 11           ENTER;
1138 11           SAVETMPS;
1139              
1140 11 50         PUSHMARK(SP);
1141 11 50         XPUSHs(c->poll_write_cb);
1142 11           PUTBACK;
1143 11           int returned = call_method("getline", G_SCALAR|G_EVAL);
1144 11           SPAGAIN;
1145              
1146             trace("called getline on io handle fd=%d errsv=%d returned=%d\n",
1147             c->fd, SvTRUE(ERRSV) ? 1 : 0, returned);
1148              
1149 11 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
1150             /* Restore PL_rs before call_died — it invokes Feersum::DIED which
1151             * is Perl code that (or whose callees) may read $/ */
1152 0           PL_rs = old_rs;
1153 0           sv_setsv(get_sv("/", GV_ADD), old_rs);
1154 0           call_died(aTHX_ c, "getline on io handle");
1155             // Clear poll callback to prevent re-invocation after error
1156 0 0         if (c->poll_write_cb) {
1157 0           SvREFCNT_dec(c->poll_write_cb);
1158 0           c->poll_write_cb = NULL;
1159             }
1160 0           c->poll_write_cb_is_io_handle = 0;
1161 0           goto done_pump_io;
1162             }
1163              
1164 11           SV *ret = NULL;
1165 11 50         if (returned > 0)
1166 11           ret = POPs;
1167 11 50         if (ret && SvMAGICAL(ret))
    50          
1168 0           ret = sv_2mortal(newSVsv(ret));
1169              
1170 11 50         if (unlikely(!ret || !SvOK(ret))) {
    100          
1171             // returned undef, so call the close method out of nicety
1172 5 50         PUSHMARK(SP);
1173 5 50         XPUSHs(c->poll_write_cb);
1174 5           PUTBACK;
1175 5           call_method("close", G_VOID|G_DISCARD|G_EVAL);
1176 5           SPAGAIN;
1177              
1178 5 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
1179 0 0         trouble("Couldn't close body IO handle: %-p",ERRSV);
1180 0 0         sv_setsv(ERRSV, &PL_sv_undef);
1181             }
1182              
1183 5           SvREFCNT_dec(c->poll_write_cb);
1184 5           c->poll_write_cb = NULL;
1185 5           c->poll_write_cb_is_io_handle = 0;
1186 5           finish_wbuf(c);
1187 5           change_responding_state(c, RESPOND_SHUTDOWN);
1188              
1189 5           goto done_pump_io;
1190             }
1191              
1192 6 50         if (c->use_chunked)
1193 6           add_chunk_sv_to_wbuf(c, ret);
1194             else
1195 0           add_sv_to_wbuf(c, ret);
1196              
1197 11           done_pump_io:
1198             trace("leaving pump io handle %d\n", c->fd);
1199              
1200 11           PUTBACK;
1201 11 50         FREETMPS;
1202 11           LEAVE;
1203              
1204 11           PL_rs = old_rs;
1205 11           sv_setsv(get_sv("/", GV_ADD), old_rs);
1206 11           SvREFCNT_dec(old_rs);
1207              
1208 11           c->in_callback--;
1209             }
1210              
1211             static int
1212 9           psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
1213             {
1214 9           dSP;
1215              
1216 9           struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
1217             trace("invoking psgix.io magic for fd=%d\n", c->fd);
1218              
1219 9           sv_unmagic(sv, PERL_MAGIC_ext);
1220              
1221             #ifdef FEERSUM_HAS_H2
1222             /* H2 tunnel: create socketpair and expose sv[1] as the IO handle.
1223             * For PSGI: auto-send 200 HEADERS and enable response swallowing so
1224             * apps can use the same psgix.io code for H1 and H2 transparently. */
1225             if (c->is_h2_stream) {
1226             struct feer_h2_stream *stream = (struct feer_h2_stream *)c->read_ev_timer.data;
1227             if (!stream || !stream->is_tunnel) {
1228             trouble("psgix.io: not supported on regular H2 streams fd=%d\n", c->fd);
1229             return 0;
1230             }
1231             h2_tunnel_auto_accept(aTHX_ c, stream);
1232             /* Re-fetch stream: auto_accept → session_send may have freed it */
1233             stream = (struct feer_h2_stream *)c->read_ev_timer.data;
1234             if (!stream) {
1235             trouble("psgix.io: stream freed during auto_accept fd=%d\n", c->fd);
1236             return 0;
1237             }
1238             feer_h2_setup_tunnel(aTHX_ stream);
1239             if (!stream->tunnel_established) {
1240             trouble("psgix.io: tunnel setup failed fd=%d\n", c->fd);
1241             struct feer_conn *parent = stream->parent;
1242             SvREFCNT_inc_void_NN(parent->self);
1243             h2_submit_rst(parent->h2_session, stream->stream_id,
1244             NGHTTP2_INTERNAL_ERROR);
1245             feer_h2_session_send(parent);
1246             SvREFCNT_dec(parent->self);
1247             return 0;
1248             }
1249              
1250             ENTER;
1251             SAVETMPS;
1252             PUSHMARK(SP);
1253             XPUSHs(sv);
1254             mXPUSHs(newSViv(stream->tunnel_sv1));
1255             PUTBACK;
1256              
1257             call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
1258             SPAGAIN;
1259              
1260             if (unlikely(SvTRUE(ERRSV))) {
1261             call_died(aTHX_ c, "psgix.io H2 tunnel magic");
1262             } else {
1263             SV *io_glob = SvRV(sv);
1264             GvSV(io_glob) = newRV_inc(c->self);
1265             stream->tunnel_sv1 = -1;
1266             }
1267              
1268             c->io_taken = 1;
1269             PUTBACK;
1270             FREETMPS;
1271             LEAVE;
1272             return 0;
1273             }
1274             #endif
1275              
1276             #ifdef FEERSUM_HAS_TLS
1277             /* TLS tunnel: create socketpair relay for psgix.io over TLS */
1278 9 50         if (c->tls) {
1279 0           feer_tls_setup_tunnel(c);
1280 0 0         if (!c->tls_tunnel) {
1281 0           trouble("psgix.io: TLS tunnel setup failed fd=%d\n", c->fd);
1282 0           return 0;
1283             }
1284              
1285 0           ENTER;
1286 0           SAVETMPS;
1287 0 0         PUSHMARK(SP);
1288 0 0         XPUSHs(sv);
1289 0 0         mXPUSHs(newSViv(c->tls_tunnel_sv1));
1290 0           PUTBACK;
1291              
1292 0           call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
1293 0           SPAGAIN;
1294              
1295 0 0         if (unlikely(SvTRUE(ERRSV))) {
    0          
1296 0           call_died(aTHX_ c, "psgix.io TLS tunnel magic");
1297             /* fd NOT consumed by _raw on failure; conn cleanup will close it */
1298             } else {
1299 0           SV *io_glob = SvRV(sv);
1300 0           GvSV(io_glob) = newRV_inc(c->self);
1301 0           c->tls_tunnel_sv1 = -1;
1302             }
1303              
1304 0           c->io_taken = 1;
1305 0           stop_read_timer(c);
1306 0           stop_write_timer(c);
1307 0           PUTBACK;
1308 0 0         FREETMPS;
1309 0           LEAVE;
1310 0           return 0;
1311             }
1312             #endif
1313              
1314 9           ENTER;
1315 9           SAVETMPS;
1316              
1317 9 50         PUSHMARK(SP);
1318 9 50         XPUSHs(sv);
1319 9 50         mXPUSHs(newSViv(c->fd));
1320 9           PUTBACK;
1321              
1322 9           call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
1323 9           SPAGAIN;
1324              
1325 9 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
1326 0           call_died(aTHX_ c, "psgix.io magic");
1327             }
1328             else {
1329 9           SV *io_glob = SvRV(sv);
1330 9           GvSV(io_glob) = newRV_inc(c->self);
1331              
1332             // Put whatever remainder data into the socket buffer.
1333             // Optimizes for the websocket case.
1334             // Use return_from_psgix_io() to pull data back for keepalive.
1335 9 50         if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
    50          
    50          
    50          
1336             STRLEN rbuf_len;
1337 9           const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
1338 9           IO *io = GvIOp(io_glob);
1339 9 50         if (unlikely(!io)) {
1340 0           trouble("psgix.io: GvIOp returned NULL fd=%d\n", c->fd);
1341             // Skip unread, data will remain in rbuf
1342             }
1343             else {
1344             // PerlIO_unread copies the data internally, so it's safe to
1345             // clear rbuf after. Use SvCUR_set to keep buffer allocated
1346             // (more efficient for potential reuse).
1347 9           SSize_t pushed = PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
1348 9 50         if (likely(pushed == (SSize_t)rbuf_len)) {
1349             // All data pushed back successfully
1350 9           SvCUR_set(c->rbuf, 0);
1351 0 0         } else if (pushed > 0) {
1352             // Partial push - keep remaining data in rbuf
1353 0           sv_chop(c->rbuf, rbuf_ptr + pushed);
1354 0           trouble("PerlIO_unread partial: %zd of %"Sz_uf" bytes fd=%d\n",
1355             (size_t)pushed, (Sz)rbuf_len, c->fd);
1356             } else {
1357             // Push failed entirely - keep rbuf as-is
1358 0           trouble("PerlIO_unread failed in psgix.io magic fd=%d\n", c->fd);
1359             }
1360             }
1361             }
1362              
1363             // Stop Feersum's watchers - user now owns the socket
1364 9           stop_read_watcher(c);
1365 9           stop_read_timer(c);
1366 9           stop_write_timer(c);
1367             // don't stop write watcher in case there's outstanding data
1368              
1369 9           c->io_taken = 1;
1370             }
1371              
1372 9 50         FREETMPS;
1373 9           LEAVE;
1374              
1375 9           return 0;
1376             }
1377              
1378             #ifdef FEERSUM_HAS_H2
1379             static void
1380             h2_tunnel_auto_accept(pTHX_ struct feer_conn *c, struct feer_h2_stream *stream)
1381             {
1382             if (c->responding != RESPOND_NOT_STARTED)
1383             return;
1384             AV *empty_hdr = newAV();
1385             SV *status_sv = newSVpvs("200");
1386             feersum_start_response(aTHX_ c, status_sv, empty_hdr, 1);
1387             SvREFCNT_dec(status_sv);
1388             SvREFCNT_dec((SV *)empty_hdr);
1389             /* Re-fetch stream: feersum_start_response → feer_h2_session_send may
1390             * have freed it via h2_on_stream_close_cb (e.g. RST_STREAM queued) */
1391             stream = (struct feer_h2_stream *)c->read_ev_timer.data;
1392             if (stream)
1393             stream->tunnel_swallow_response = 1;
1394             }
1395              
1396             static void
1397             pump_h2_io_handle(pTHX_ struct feer_conn *c, SV *body)
1398             {
1399             dSP;
1400             SV *io = newSVsv(body);
1401             c->in_callback++;
1402             SV *old_rs = PL_rs;
1403             SvREFCNT_inc_simple_void(old_rs);
1404             PL_rs = sv_2mortal(newRV_noinc(newSViv(IO_PUMP_BUFSZ)));
1405             sv_setsv(get_sv("/", GV_ADD), PL_rs);
1406             ENTER;
1407             SAVETMPS;
1408             for (;;) {
1409             ENTER; SAVETMPS;
1410             PUSHMARK(SP);
1411             XPUSHs(io);
1412             PUTBACK;
1413             int returned = call_method("getline", G_SCALAR|G_EVAL);
1414             SPAGAIN;
1415             if (unlikely(SvTRUE(ERRSV))) {
1416             /* Restore PL_rs before call_died — it invokes Feersum::DIED which
1417             * is Perl code that (or whose callees) may read $/ */
1418             PL_rs = old_rs;
1419             sv_setsv(get_sv("/", GV_ADD), old_rs);
1420             call_died(aTHX_ c, "getline on H2 io handle");
1421             PUTBACK; FREETMPS; LEAVE;
1422             break;
1423             }
1424             SV *ret = NULL;
1425             if (returned > 0) ret = POPs;
1426             if (ret && SvMAGICAL(ret)) ret = sv_2mortal(newSVsv(ret));
1427             if (!ret || !SvOK(ret)) {
1428             PUSHMARK(SP); XPUSHs(io); PUTBACK;
1429             call_method("close", G_VOID|G_DISCARD|G_EVAL);
1430             SPAGAIN;
1431             if (unlikely(SvTRUE(ERRSV))) {
1432             trouble("Couldn't close body IO handle: %-p",ERRSV);
1433             sv_setsv(ERRSV, &PL_sv_undef);
1434             }
1435             PUTBACK; FREETMPS; LEAVE;
1436             feersum_h2_close_write(aTHX_ c);
1437             break;
1438             }
1439             feersum_h2_write_chunk(aTHX_ c, ret);
1440             PUTBACK; FREETMPS; LEAVE;
1441             }
1442             FREETMPS; LEAVE;
1443             PL_rs = old_rs;
1444             sv_setsv(get_sv("/", GV_ADD), old_rs);
1445             SvREFCNT_dec(old_rs);
1446             c->in_callback--;
1447             SvREFCNT_dec(io);
1448             }
1449              
1450             static SV*
1451             feersum_env_method_h2(pTHX_ struct feer_conn *c, struct feer_req *r)
1452             {
1453             if (unlikely(c->is_h2_stream)) {
1454             struct feer_h2_stream *stream = (struct feer_h2_stream *)c->read_ev_timer.data;
1455             if (stream && stream->is_tunnel && stream->h2_protocol)
1456             return SvREFCNT_inc_simple_NN(method_GET);
1457             }
1458             return feersum_env_method(aTHX_ r);
1459             }
1460             #endif