File Coverage

Feersum.xs
Criterion Covered Total %
statement 968 1211 79.9
branch 620 1308 47.4
condition n/a
subroutine n/a
pod n/a
total 1588 2519 63.0


line stmt bran cond sub pod time code
1             #include "EVAPI.h"
2             #include
3             #include
4             #include
5             #include
6             #include
7             #include
8             #include
9             #include
10             #include
11              
12             #include "ppport.h"
13              
14              
15             ///////////////////////////////////////////////////////////////
16             // "Compile Time Options" - See Feersum.pm POD for information
17              
18             #define MAX_HEADERS 64
19             #define MAX_HEADER_NAME_LEN 128
20             #define MAX_BODY_LEN 2147483647
21              
22             #define READ_BUFSZ 4096
23             #define READ_INIT_FACTOR 2
24             #define READ_GROW_FACTOR 8
25              
26             #define AUTOCORK_WRITES 1
27              
28             #if 0
29             # define FLASH_SOCKET_POLICY_SUPPORT
30             #endif
31              
32             #ifndef FLASH_SOCKET_POLICY
33             # define FLASH_SOCKET_POLICY "\n\n\n\n\n\n"
34             #endif
35              
36             // may be lower for your platform (e.g. Solaris is 16). See POD.
37             #define FEERSUM_IOMATRIX_SIZE 64
38              
39             // auto-detected in Makefile.PL by perl versions and ithread usage; override
40             // that here. See POD for details.
41             #if 0
42             # undef FEERSUM_STEAL
43             #endif
44              
45             ///////////////////////////////////////////////////////////////
46              
47              
48             #ifdef __GNUC__
49             # define likely(x) __builtin_expect(!!(x), 1)
50             # define unlikely(x) __builtin_expect(!!(x), 0)
51             #else
52             # define likely(x) (x)
53             # define unlikely(x) (x)
54             #endif
55              
56             #ifndef CRLF
57             #define CRLF "\015\012"
58             #endif
59             #define CRLFx2 CRLF CRLF
60              
61             // make darwin, solaris and bsd happy:
62             #ifndef SOL_TCP
63             #define SOL_TCP IPPROTO_TCP
64             #endif
65              
66             // Wish-list: %z formats for perl sprintf. Would make compiling a lot less
67             // noisy for systems that warn size_t and STRLEN are incompatible with
68             // %d/%u/%x.
69             #if Size_t_size == LONGSIZE
70             # define Sz_f "l"
71             # define Sz_t long
72             #elif Size_t_size == 8 && defined HAS_QUAD && QUADKIND == QUAD_IS_LONG_LONG
73             # define Sz_f "ll"
74             # define Sz_t long long
75             #else
76             // hope "int" works.
77             # define Sz_f ""
78             # define Sz_t int
79             #endif
80              
81             #define Sz_uf Sz_f"u"
82             #define Sz_xf Sz_f"x"
83             #define Ssz_df Sz_f"d"
84             #define Sz unsigned Sz_t
85             #define Ssz Sz_t
86              
87             #define WARN_PREFIX "Feersum: "
88              
89             #ifndef DEBUG
90             #ifndef __inline
91             #define __inline
92             #endif
93             #define INLINE_UNLESS_DEBUG __inline
94             #else
95             #define INLINE_UNLESS_DEBUG
96             #endif
97              
98             #define trouble(f_, ...) warn(WARN_PREFIX f_, ##__VA_ARGS__);
99              
100             #ifdef DEBUG
101             #define trace(f_, ...) warn("%s:%-4d [%d] " f_, __FILE__, __LINE__, (int)getpid(), ##__VA_ARGS__)
102             #else
103             #define trace(...)
104             #endif
105              
106             #if DEBUG >= 2
107             #define trace2(f_, ...) trace(f_, ##__VA_ARGS__)
108             #else
109             #define trace2(...)
110             #endif
111              
112             #if DEBUG >= 3
113             #define trace3(f_, ...) trace(f_, ##__VA_ARGS__)
114             #else
115             #define trace3(...)
116             #endif
117              
118             #include "picohttpparser-git/picohttpparser.c"
119             #include "rinq.c"
120              
121             // Check FEERSUM_IOMATRIX_SIZE against what's actually usable on this
122             // platform. See Feersum.pm for an explanation
123             #if defined(IOV_MAX) && FEERSUM_IOMATRIX_SIZE > IOV_MAX
124             # undef FEERSUM_IOMATRIX_SIZE
125             # define FEERSUM_IOMATRIX_SIZE IOV_MAX
126             #elif defined(UIO_MAXIOV) && FEERSUM_IOMATRIX_SIZE > UIO_MAXIOV
127             # undef FEERSUM_IOMATRIX_SIZE
128             # define FEERSUM_IOMATRIX_SIZE UIO_MAXIOV
129             #endif
130              
131             struct iomatrix {
132             unsigned offset;
133             unsigned count;
134             struct iovec iov[FEERSUM_IOMATRIX_SIZE];
135             SV *sv[FEERSUM_IOMATRIX_SIZE];
136             };
137              
138             struct feer_req {
139             SV *buf;
140             const char* method;
141             size_t method_len;
142             const char* path;
143             size_t path_len;
144             int minor_version;
145             size_t num_headers;
146             struct phr_header headers[MAX_HEADERS];
147             };
148              
149             enum feer_respond_state {
150             RESPOND_NOT_STARTED = 0,
151             RESPOND_NORMAL = 1,
152             RESPOND_STREAMING = 2,
153             RESPOND_SHUTDOWN = 3
154             };
155             #define RESPOND_STR(_n,_s) do { \
156             switch(_n) { \
157             case RESPOND_NOT_STARTED: _s = "NOT_STARTED(0)"; break; \
158             case RESPOND_NORMAL: _s = "NORMAL(1)"; break; \
159             case RESPOND_STREAMING: _s = "STREAMING(2)"; break; \
160             case RESPOND_SHUTDOWN: _s = "SHUTDOWN(4)"; break; \
161             } \
162             } while (0)
163              
164             enum feer_receive_state {
165             RECEIVE_HEADERS = 0,
166             RECEIVE_BODY = 1,
167             RECEIVE_STREAMING = 2,
168             RECEIVE_SHUTDOWN = 3
169             };
170             #define RECEIVE_STR(_n,_s) do { \
171             switch(_n) { \
172             case RECEIVE_HEADERS: _s = "HEADERS(0)"; break; \
173             case RECEIVE_BODY: _s = "BODY(1)"; break; \
174             case RECEIVE_STREAMING: _s = "STREAMING(2)"; break; \
175             case RECEIVE_SHUTDOWN: _s = "SHUTDOWN(3)"; break; \
176             } \
177             } while (0)
178              
179             struct feer_conn {
180             SV *self;
181             int fd;
182             struct sockaddr *sa;
183              
184             struct ev_io read_ev_io;
185             struct ev_io write_ev_io;
186             struct ev_timer read_ev_timer;
187              
188             SV *rbuf;
189             struct rinq *wbuf_rinq;
190              
191             SV *poll_write_cb;
192             SV *ext_guard;
193              
194             struct feer_req *req;
195             ssize_t expected_cl;
196             ssize_t received_cl;
197              
198             enum feer_respond_state responding;
199             enum feer_receive_state receiving;
200              
201             int in_callback;
202             int is_http11:1;
203             int poll_write_cb_is_io_handle:1;
204             int auto_cl:1;
205             };
206              
207             typedef struct feer_conn feer_conn_handle; // for typemap
208              
209             #define dCONN struct feer_conn *c = (struct feer_conn *)w->data
210             #define IsArrayRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVAV)
211             #define IsCodeRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVCV)
212              
213             static HV* feersum_env(pTHX_ struct feer_conn *c);
214             static void feersum_start_response
215             (pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming);
216             static size_t feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
217             static void feersum_handle_psgi_response(
218             pTHX_ struct feer_conn *c, SV *ret, bool can_recurse);
219             static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);
220             static SV* feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard);
221              
222             static void start_read_watcher(struct feer_conn *c);
223             static void stop_read_watcher(struct feer_conn *c);
224             static void restart_read_timer(struct feer_conn *c);
225             static void stop_read_timer(struct feer_conn *c);
226             static void start_write_watcher(struct feer_conn *c);
227             static void stop_write_watcher(struct feer_conn *c);
228              
229             static void try_conn_write(EV_P_ struct ev_io *w, int revents);
230             static void try_conn_read(EV_P_ struct ev_io *w, int revents);
231             static void conn_read_timeout(EV_P_ struct ev_timer *w, int revents);
232             static bool process_request_headers(struct feer_conn *c, int body_offset);
233             static void sched_request_callback(struct feer_conn *c);
234             static void call_died (pTHX_ struct feer_conn *c, const char *cb_type);
235             static void call_request_callback(struct feer_conn *c);
236             static void call_poll_callback (struct feer_conn *c, bool is_write);
237             static void pump_io_handle (struct feer_conn *c, SV *io);
238              
239             static void conn_write_ready (struct feer_conn *c);
240             static void respond_with_server_error(struct feer_conn *c, const char *msg, STRLEN msg_len, int code);
241              
242             static void update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov);
243             static STRLEN add_sv_to_wbuf (struct feer_conn *c, SV *sv);
244             static STRLEN add_const_to_wbuf (struct feer_conn *c, const char *str, size_t str_len);
245             #define add_crlf_to_wbuf(c) add_const_to_wbuf(c,CRLF,2)
246             static void finish_wbuf (struct feer_conn *c);
247             static void add_chunk_sv_to_wbuf (struct feer_conn *c, SV *sv);
248             static void add_placeholder_to_wbuf (struct feer_conn *c, SV **sv, struct iovec **iov_ref);
249              
250             static void uri_decode_sv (SV *sv);
251             static bool str_eq(const char *a, int a_len, const char *b, int b_len);
252             static bool str_case_eq(const char *a, int a_len, const char *b, int b_len);
253             static SV* fetch_av_normal (pTHX_ AV *av, I32 i);
254              
255             static const char *http_code_to_msg (int code);
256             static int prep_socket (int fd, int is_tcp);
257              
258             static HV *feer_stash, *feer_conn_stash;
259             static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
260             static MGVTBL psgix_io_vtbl;
261              
262             static SV *request_cb_cv = NULL;
263             static bool request_cb_is_psgi = 0;
264             static SV *shutdown_cb_cv = NULL;
265             static bool shutting_down = 0;
266             static int active_conns = 0;
267             static double read_timeout = 5.0;
268              
269             static SV *feer_server_name = NULL;
270             static SV *feer_server_port = NULL;
271              
272             static ev_io accept_w;
273             static ev_prepare ep;
274             static ev_check ec;
275             struct ev_idle ei;
276              
277             static struct rinq *request_ready_rinq = NULL;
278              
279             static AV *psgi_ver;
280             static SV *psgi_serv10, *psgi_serv11, *crlf_sv;
281              
282             // TODO: make this thread-local if and when there are multiple C threads:
283             struct ev_loop *feersum_ev_loop = NULL;
284             static HV *feersum_tmpl_env = NULL;
285              
286             INLINE_UNLESS_DEBUG
287             static SV*
288 101           fetch_av_normal (pTHX_ AV *av, I32 i)
289             {
290 101           SV **elt = av_fetch(av, i, 0);
291 101 50         if (elt == NULL) return NULL;
292 101           SV *sv = *elt;
293             // copy to remove magic
294 101 100         if (unlikely(SvMAGICAL(sv))) sv = sv_2mortal(newSVsv(sv));
295 101 100         if (unlikely(!SvOK(sv))) return NULL;
    50          
    50          
    50          
296             // usually array ref elems aren't RVs (for PSGI anyway)
297 98 100         if (unlikely(SvROK(sv))) sv = SvRV(sv);
298 98           return sv;
299             }
300              
301             INLINE_UNLESS_DEBUG
302             static struct iomatrix *
303 1193           next_iomatrix (struct feer_conn *c)
304             {
305 1193           bool add_iomatrix = 0;
306             struct iomatrix *m;
307              
308 1193 100         if (!c->wbuf_rinq) {
309             trace3("next_iomatrix(%d): head\n", c->fd);
310 156           add_iomatrix = 1;
311             }
312             else {
313             // get the tail-end struct
314 1037           m = (struct iomatrix *)c->wbuf_rinq->prev->ref;
315             trace3("next_iomatrix(%d): tail, count=%d, offset=%d\n",
316             c->fd, m->count, m->offset);
317 1037 50         if (m->count >= FEERSUM_IOMATRIX_SIZE) {
318 0           add_iomatrix = 1;
319             }
320             }
321              
322 1193 100         if (add_iomatrix) {
323             trace3("next_iomatrix(%d): malloc\n", c->fd);
324 156           Newx(m,1,struct iomatrix);
325 156           Poison(m,1,struct iomatrix);
326 156           m->offset = m->count = 0;
327 156           rinq_push(&c->wbuf_rinq, m);
328             }
329              
330             trace3("next_iomatrix(%d): end, count=%d, offset=%d\n",
331             c->fd, m->count, m->offset);
332 1193           return m;
333             }
334              
335             INLINE_UNLESS_DEBUG
336             static STRLEN
337 517           add_sv_to_wbuf(struct feer_conn *c, SV *sv)
338             {
339 517           struct iomatrix *m = next_iomatrix(c);
340 517           int idx = m->count++;
341             STRLEN cur;
342 517 100         if (unlikely(SvMAGICAL(sv))) {
343 2           sv = newSVsv(sv); // copy to force it to be normal.
344             }
345 515 50         else if (unlikely(SvPADTMP(sv))) {
346             // PADTMPs have their PVs re-used, so we can't simply keep a
347             // reference. TEMPs maybe behave in a similar way and are potentially
348             // stealable. If not stealing, we must make a copy.
349             #ifdef FEERSUM_STEAL
350 0 0         if (SvFLAGS(sv) == (SVs_PADTMP|SVf_POK|SVp_POK)) {
351             trace3("STEALING\n");
352 0           SV *theif = newSV(0);
353 0           sv_upgrade(theif, SVt_PV);
354              
355 0           SvPV_set(theif, SvPVX(sv));
356 0           SvLEN_set(theif, SvLEN(sv));
357 0           SvCUR_set(theif, SvCUR(sv));
358              
359             // make the temp null
360 0 0         (void)SvOK_off(sv);
361 0           SvPV_set(sv, NULL);
362 0           SvLEN_set(sv, 0);
363 0           SvCUR_set(sv, 0);
364              
365 0           SvFLAGS(theif) |= SVf_READONLY|SVf_POK|SVp_POK;
366              
367 0           sv = theif;
368             }
369             else {
370 0           sv = newSVsv(sv);
371             }
372             #else
373             sv = newSVsv(sv);
374             #endif
375             }
376             else {
377 515           sv = SvREFCNT_inc(sv);
378             }
379              
380 517 100         m->iov[idx].iov_base = SvPV(sv, cur);
381 517           m->iov[idx].iov_len = cur;
382 517           m->sv[idx] = sv;
383              
384 517           return cur;
385             }
386              
387             INLINE_UNLESS_DEBUG
388             static STRLEN
389 561           add_const_to_wbuf(struct feer_conn *c, const char *str, size_t str_len)
390             {
391 561           struct iomatrix *m = next_iomatrix(c);
392 561           int idx = m->count++;
393 561           m->iov[idx].iov_base = (void*)str;
394 561           m->iov[idx].iov_len = str_len;
395 561           m->sv[idx] = NULL;
396 561           return str_len;
397             }
398              
399             INLINE_UNLESS_DEBUG
400             static void
401 115           add_placeholder_to_wbuf(struct feer_conn *c, SV **sv, struct iovec **iov_ref)
402             {
403 115           struct iomatrix *m = next_iomatrix(c);
404 115           int idx = m->count++;
405 115           *sv = newSV(31);
406 115           SvPOK_on(*sv);
407 115           m->sv[idx] = *sv;
408 115           *iov_ref = &m->iov[idx];
409 115           }
410              
411             INLINE_UNLESS_DEBUG
412             static void
413 28           finish_wbuf(struct feer_conn *c)
414             {
415 28 100         if (!c->is_http11) return; // nothing required
416 24           add_const_to_wbuf(c, "0\r\n\r\n", 5); // terminating chunk
417             }
418              
419             INLINE_UNLESS_DEBUG
420             static void
421 115           update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov)
422             {
423             STRLEN cur;
424             // can't pass iov_len for cur; incompatible pointer type on some systems:
425 115 50         iov->iov_base = SvPV(sv,cur);
426 115           iov->iov_len = cur;
427 115           }
428              
429             static void
430 48           add_chunk_sv_to_wbuf(struct feer_conn *c, SV *sv)
431             {
432             SV *chunk;
433             struct iovec *chunk_iov;
434 48           add_placeholder_to_wbuf(c, &chunk, &chunk_iov);
435 48           STRLEN cur = add_sv_to_wbuf(c, sv);
436 48           add_crlf_to_wbuf(c);
437 48           sv_setpvf(chunk, "%"Sz_xf CRLF, (Sz)cur);
438 48           update_wbuf_placeholder(c, chunk, chunk_iov);
439 48           }
440              
441             static const char *
442 83           http_code_to_msg (int code) {
443             // http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
444 83           switch (code) {
445 0           case 100: return "Continue";
446 0           case 101: return "Switching Protocols";
447 0           case 102: return "Processing"; // RFC 2518
448 74           case 200: return "OK";
449 0           case 201: return "Created";
450 0           case 202: return "Accepted";
451 0           case 203: return "Non Authoritative Information";
452 0           case 204: return "No Content";
453 0           case 205: return "Reset Content";
454 0           case 206: return "Partial Content";
455 0           case 207: return "Multi-Status"; // RFC 4918 (WebDav)
456 0           case 300: return "Multiple Choices";
457 0           case 301: return "Moved Permanently";
458 1           case 302: return "Found";
459 0           case 303: return "See Other";
460 2           case 304: return "Not Modified";
461 0           case 305: return "Use Proxy";
462 0           case 307: return "Temporary Redirect";
463 1           case 400: return "Bad Request";
464 0           case 401: return "Unauthorized";
465 0           case 402: return "Payment Required";
466 0           case 403: return "Forbidden";
467 1           case 404: return "Not Found";
468 0           case 405: return "Method Not Allowed";
469 0           case 406: return "Not Acceptable";
470 0           case 407: return "Proxy Authentication Required";
471 2           case 408: return "Request Timeout";
472 0           case 409: return "Conflict";
473 0           case 410: return "Gone";
474 0           case 411: return "Length Required";
475 0           case 412: return "Precondition Failed";
476 0           case 413: return "Request Entity Too Large";
477 0           case 414: return "Request URI Too Long";
478 0           case 415: return "Unsupported Media Type";
479 0           case 416: return "Requested Range Not Satisfiable";
480 0           case 417: return "Expectation Failed";
481 0           case 418: return "I'm a teapot";
482 0           case 421: return "Too Many Connections"; // Microsoft?
483 0           case 422: return "Unprocessable Entity"; // RFC 4918
484 0           case 423: return "Locked"; // RFC 4918
485 0           case 424: return "Failed Dependency"; // RFC 4918
486 0           case 425: return "Unordered Collection"; // RFC 3648
487 0           case 426: return "Upgrade Required"; // RFC 2817
488 0           case 449: return "Retry With"; // Microsoft
489 0           case 450: return "Blocked by Parental Controls"; // Microsoft
490 2           case 500: return "Internal Server Error";
491 0           case 501: return "Not Implemented";
492 0           case 502: return "Bad Gateway";
493 0           case 503: return "Service Unavailable";
494 0           case 504: return "Gateway Timeout";
495 0           case 505: return "HTTP Version Not Supported";
496 0           case 506: return "Variant Also Negotiates"; // RFC 2295
497 0           case 507: return "Insufficient Storage"; // RFC 4918
498 0           case 509: return "Bandwidth Limit Exceeded"; // Apache mod
499 0           case 510: return "Not Extended"; // RFC 2774
500 0           case 530: return "User access denied"; // ??
501 0           default: break;
502             }
503              
504             // default to the Nxx group names in RFC 2616
505 0 0         if (100 <= code && code <= 199) {
    0          
506 0           return "Informational";
507             }
508 0 0         else if (200 <= code && code <= 299) {
    0          
509 0           return "Success";
510             }
511 0 0         else if (300 <= code && code <= 399) {
    0          
512 0           return "Redirection";
513             }
514 0 0         else if (400 <= code && code <= 499) {
    0          
515 0           return "Client Error";
516             }
517             else {
518 0           return "Error";
519             }
520             }
521              
522             static int
523 118           prep_socket(int fd, int is_tcp)
524             {
525             int flags;
526              
527             // make it non-blocking
528 118           flags = O_NONBLOCK;
529 118 50         if (unlikely(fcntl(fd, F_SETFL, flags) < 0))
530 0           return -1;
531              
532 118 50         if (likely(is_tcp)) {
533             // flush writes immediately
534 118           flags = 1;
535 118 50         if (unlikely(setsockopt(fd, SOL_TCP, TCP_NODELAY, &flags, sizeof(int))))
536 0           return -1;
537             }
538              
539             // handle URG data inline
540 118           flags = 1;
541 118 50         if (unlikely(setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &flags, sizeof(int))))
542 0           return -1;
543              
544             // disable lingering
545 118           struct linger linger = { .l_onoff = 0, .l_linger = 0 };
546 118 50         if (unlikely(setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger))))
547 0           return -1;
548              
549 118           return 0;
550             }
551              
552             INLINE_UNLESS_DEBUG static void
553 221           safe_close_conn(struct feer_conn *c, const char *where)
554             {
555 221 100         if (unlikely(c->fd < 0))
556 103           return;
557              
558             // make it blocking
559 118           fcntl(c->fd, F_SETFL, 0);
560              
561 118 50         if (unlikely(close(c->fd)))
562 0           perror(where);
563              
564 118           c->fd = -1;
565             }
566              
567             static struct feer_conn *
568 118           new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
569             {
570 118           SV *self = newSV(0);
571 118 50         SvUPGRADE(self, SVt_PVMG); // ensures sv_bless doesn't reallocate
572 118 50         SvGROW(self, sizeof(struct feer_conn));
    50          
573 118           SvPOK_only(self);
574 118           SvIOK_on(self);
575 118           SvIV_set(self,conn_fd);
576              
577 118           struct feer_conn *c = (struct feer_conn *)SvPVX(self);
578 118           Zero(c, 1, struct feer_conn);
579              
580 118           c->self = self;
581 118           c->fd = conn_fd;
582 118           c->sa = sa;
583 118           c->responding = RESPOND_NOT_STARTED;
584 118           c->receiving = RECEIVE_HEADERS;
585              
586 118           ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
587 118           c->read_ev_io.data = (void *)c;
588              
589 118           ev_init(&c->read_ev_timer, conn_read_timeout);
590 118           c->read_ev_timer.data = (void *)c;
591              
592             trace3("made conn fd=%d self=%p, c=%p, cur=%"Sz_uf", len=%"Sz_uf"\n",
593             c->fd, self, c, (Sz)SvCUR(self), (Sz)SvLEN(self));
594              
595 118           SV *rv = newRV_inc(c->self);
596 118           sv_bless(rv, feer_conn_stash); // so DESTROY can get called on read errors
597 118           SvREFCNT_dec(rv);
598              
599 118           SvREADONLY_on(self); // turn off later for blessing
600 118           active_conns++;
601 118           return c;
602             }
603              
604             // for use in the typemap:
605             INLINE_UNLESS_DEBUG
606             static struct feer_conn *
607 204           sv_2feer_conn (SV *rv)
608             {
609 204 50         if (unlikely(!sv_isa(rv,"Feersum::Connection")))
610 0           croak("object is not of type Feersum::Connection");
611 204           return (struct feer_conn *)SvPVX(SvRV(rv));
612             }
613              
614             INLINE_UNLESS_DEBUG
615             static SV*
616 125           feer_conn_2sv (struct feer_conn *c)
617             {
618 125           return newRV_inc(c->self);
619             }
620              
621             static feer_conn_handle *
622 108           sv_2feer_conn_handle (SV *rv, bool can_croak)
623             {
624             trace3("sv 2 conn_handle\n");
625 108 50         if (unlikely(!SvROK(rv))) croak("Expected a reference");
626             // do not allow subclassing
627 108           SV *sv = SvRV(rv);
628 108 50         if (likely(
    100          
    50          
    50          
629             sv_isobject(rv) &&
630             (SvSTASH(sv) == feer_conn_writer_stash ||
631             SvSTASH(sv) == feer_conn_reader_stash)
632             )) {
633 108 50         UV uv = SvUV(sv);
634 108 100         if (uv == 0) {
635 21 100         if (can_croak) croak("Operation not allowed: Handle is closed.");
636 17           return NULL;
637             }
638 87           return INT2PTR(feer_conn_handle*,uv);
639             }
640              
641 0 0         if (can_croak)
642 0           croak("Expected a Feersum::Connection::Writer or ::Reader object");
643 0           return NULL;
644             }
645              
646             static SV *
647 28           new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
648             {
649             SV *sv;
650 28           SvREFCNT_inc_void_NN(c->self);
651 28           sv = newRV_noinc(newSVuv(PTR2UV(c)));
652 28 100         sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
653 28           return sv;
654             }
655              
656             #if DEBUG
657             # define change_responding_state(c, _to) do { \
658             enum feer_respond_state __to = (_to); \
659             enum feer_respond_state __from = c->responding; \
660             const char *_from_str, *_to_str; \
661             if (likely(__from != __to)) { \
662             RESPOND_STR(c->responding, _from_str); \
663             RESPOND_STR(__to, _to_str); \
664             trace2("==> responding state %d: %s to %s\n", \
665             c->fd,_from_str,_to_str); \
666             c->responding = __to; \
667             } \
668             } while (0)
669             # define change_receiving_state(c, _to) do { \
670             enum feer_receive_state __to = (_to); \
671             enum feer_receive_state __from = c->receiving; \
672             const char *_from_str, *_to_str; \
673             if (likely(__from != __to)) { \
674             RECEIVE_STR(c->receiving, _from_str); \
675             RECEIVE_STR(__to, _to_str); \
676             trace2("==> receiving state %d: %s to %s\n", \
677             c->fd,_from_str,_to_str); \
678             c->receiving = __to; \
679             } \
680             } while (0)
681             #else
682             # define change_responding_state(c, _to) c->responding = _to
683             # define change_receiving_state(c, _to) c->receiving = _to
684             #endif
685              
686             INLINE_UNLESS_DEBUG static void
687 125           start_read_watcher(struct feer_conn *c) {
688 125 100         if (unlikely(ev_is_active(&c->read_ev_io)))
689 7           return;
690             trace("start read watcher %d\n",c->fd);
691 118           ev_io_start(feersum_ev_loop, &c->read_ev_io);
692 118           SvREFCNT_inc_void_NN(c->self);
693             }
694              
695             INLINE_UNLESS_DEBUG static void
696 131           stop_read_watcher(struct feer_conn *c) {
697 131 100         if (unlikely(!ev_is_active(&c->read_ev_io)))
698 13           return;
699             trace("stop read watcher %d\n",c->fd);
700 118           ev_io_stop(feersum_ev_loop, &c->read_ev_io);
701 118           SvREFCNT_dec(c->self);
702             }
703              
704             INLINE_UNLESS_DEBUG static void
705 124           restart_read_timer(struct feer_conn *c) {
706 124 100         if (likely(!ev_is_active(&c->read_ev_timer))) {
707             trace("restart read timer %d\n",c->fd);
708 118           c->read_ev_timer.repeat = read_timeout;
709 118           SvREFCNT_inc_void_NN(c->self);
710             }
711 124           ev_timer_again(feersum_ev_loop, &c->read_ev_timer);
712 124           }
713              
714             INLINE_UNLESS_DEBUG static void
715 131           stop_read_timer(struct feer_conn *c) {
716 131 100         if (unlikely(!ev_is_active(&c->read_ev_timer)))
717 13           return;
718             trace("stop read timer %d\n",c->fd);
719 118           ev_timer_stop(feersum_ev_loop, &c->read_ev_timer);
720 118           SvREFCNT_dec(c->self);
721             }
722              
723             INLINE_UNLESS_DEBUG static void
724 164           start_write_watcher(struct feer_conn *c) {
725 164 100         if (unlikely(ev_is_active(&c->write_ev_io)))
726 46           return;
727             trace("start write watcher %d\n",c->fd);
728 118           ev_io_start(feersum_ev_loop, &c->write_ev_io);
729 118           SvREFCNT_inc_void_NN(c->self);
730             }
731              
732             INLINE_UNLESS_DEBUG static void
733 125           stop_write_watcher(struct feer_conn *c) {
734 125 100         if (unlikely(!ev_is_active(&c->write_ev_io)))
735 7           return;
736             trace("stop write watcher %d\n",c->fd);
737 118           ev_io_stop(feersum_ev_loop, &c->write_ev_io);
738 118           SvREFCNT_dec(c->self);
739             }
740              
741              
742             static void
743 77           process_request_ready_rinq (void)
744             {
745 185 100         while (request_ready_rinq) {
746 108           struct feer_conn *c =
747             (struct feer_conn *)rinq_shift(&request_ready_rinq);
748             //trace("rinq shifted c=%p, head=%p\n", c, request_ready_rinq);
749              
750 108           call_request_callback(c);
751              
752 108 100         if (likely(c->wbuf_rinq)) {
753             // this was deferred until after the perl callback
754 97           conn_write_ready(c);
755             }
756 108           SvREFCNT_dec(c->self); // for the rinq
757             }
758 77           }
759              
760             static void
761 28           prepare_cb (EV_P_ ev_prepare *w, int revents)
762             {
763 28 50         if (unlikely(revents & EV_ERROR)) {
764 0           trouble("EV error in prepare, revents=0x%08x\n", revents);
765 0           ev_break(EV_A, EVBREAK_ALL);
766 0           return;
767             }
768              
769 28 50         if (!ev_is_active(&accept_w) && !shutting_down) {
    50          
770 28           ev_io_start(EV_A, &accept_w);
771             }
772 28           ev_prepare_stop(EV_A, w);
773             }
774              
775             static void
776 501           check_cb (EV_P_ ev_check *w, int revents)
777             {
778 501 50         if (unlikely(revents & EV_ERROR)) {
779 0           trouble("EV error in check, revents=0x%08x\n", revents);
780 0           ev_break(EV_A, EVBREAK_ALL);
781 0           return;
782             }
783             trace3("check! head=%p\n", request_ready_rinq);
784 501 100         if (request_ready_rinq)
785 77           process_request_ready_rinq();
786             }
787              
788             static void
789 76           idle_cb (EV_P_ ev_idle *w, int revents)
790             {
791 76 50         if (unlikely(revents & EV_ERROR)) {
792 0           trouble("EV error in idle, revents=0x%08x\n", revents);
793 0           ev_break(EV_A, EVBREAK_ALL);
794 0           return;
795             }
796             trace3("idle! head=%p\n", request_ready_rinq);
797 76 50         if (request_ready_rinq)
798 0           process_request_ready_rinq();
799 76           ev_idle_stop(EV_A, w);
800             }
801              
802             static void
803 156           try_conn_write(EV_P_ struct ev_io *w, int revents)
804             {
805 156           dCONN;
806             int i;
807             struct iomatrix *m;
808              
809 156           SvREFCNT_inc_void_NN(c->self);
810              
811             // if it's marked writeable EV suggests we simply try write to it.
812             // Otherwise it is stopped and we should ditch this connection.
813 156 50         if (unlikely(revents & EV_ERROR && !(revents & EV_WRITE))) {
    0          
814             trace("EV error on write, fd=%d revents=0x%08x\n", w->fd, revents);
815 0           change_responding_state(c, RESPOND_SHUTDOWN);
816 0           goto try_write_finished;
817             }
818              
819 156 100         if (unlikely(!c->wbuf_rinq)) {
820 42 50         if (unlikely(c->responding >= RESPOND_SHUTDOWN))
821 0           goto try_write_finished;
822              
823 42 50         if (!c->poll_write_cb) {
824             // no callback and no data: wait for app to push to us.
825 0 0         if (c->responding == RESPOND_STREAMING)
826 0           goto try_write_paused;
827              
828             trace("tried to write with an empty buffer %d resp=%d\n",w->fd,c->responding);
829 0           change_responding_state(c, RESPOND_SHUTDOWN);
830 0           goto try_write_finished;
831             }
832              
833 42 100         if (c->poll_write_cb_is_io_handle)
834 38           pump_io_handle(c, c->poll_write_cb);
835             else
836 4           call_poll_callback(c, 1);
837              
838             // callback didn't write anything:
839 42 50         if (unlikely(!c->wbuf_rinq)) goto try_write_again;
840             }
841              
842             try_write_again_immediately:
843 156           m = (struct iomatrix *)c->wbuf_rinq->ref;
844             #if DEBUG >= 2
845             warn("going to write to %d:\n",c->fd);
846             for (i=0; i < m->count; i++) {
847             fprintf(stderr,"%.*s",
848             (int)m->iov[i].iov_len, (char*)m->iov[i].iov_base);
849             }
850             #endif
851              
852             trace("going to write %d off=%d count=%d\n", w->fd, m->offset, m->count);
853 156           errno = 0;
854 156           ssize_t wrote = writev(w->fd, &m->iov[m->offset], m->count - m->offset);
855             trace("wrote %"Ssz_df" bytes to %d, errno=%d\n", (Ssz)wrote, w->fd, errno);
856              
857 156 50         if (unlikely(wrote <= 0)) {
858 0 0         if (unlikely(wrote == 0))
859 0           goto try_write_again;
860 0 0         if (likely(errno == EAGAIN || errno == EINTR))
    0          
861             goto try_write_again;
862 0           perror("Feersum try_conn_write");
863 0           change_responding_state(c, RESPOND_SHUTDOWN);
864 0           goto try_write_finished;
865             }
866              
867 1349 100         for (i = m->offset; i < m->count && wrote > 0; i++) {
    50          
868 1193           struct iovec *v = &m->iov[i];
869 1193 50         if (unlikely(v->iov_len > wrote)) {
870             trace3("offset vector %d base=%p len=%"Sz_uf"\n",
871             w->fd, v->iov_base, (Sz)v->iov_len);
872 0           v->iov_base += wrote;
873 0           v->iov_len -= wrote;
874             // don't consume any more:
875 0           wrote = 0;
876             }
877             else {
878             trace3("consume vector %d base=%p len=%"Sz_uf" sv=%p\n",
879             w->fd, v->iov_base, (Sz)v->iov_len, m->sv[i]);
880 1193           wrote -= v->iov_len;
881 1193           m->offset++;
882 1193 100         if (m->sv[i]) {
883 632           SvREFCNT_dec(m->sv[i]);
884 632           m->sv[i] = NULL;
885             }
886             }
887             }
888              
889 156 50         if (likely(m->offset >= m->count)) {
890             trace2("all done with iomatrix %d state=%d\n",w->fd,c->responding);
891 156           rinq_shift(&c->wbuf_rinq);
892 156           Safefree(m);
893 156 50         if (!c->wbuf_rinq)
894 156           goto try_write_finished;
895             trace2("write again immediately %d state=%d\n",w->fd,c->responding);
896 0           goto try_write_again_immediately;
897             }
898             // else, fallthrough:
899             trace2("write fallthrough %d state=%d\n",w->fd,c->responding);
900              
901             try_write_again:
902             trace("write again %d state=%d\n",w->fd,c->responding);
903 38           start_write_watcher(c);
904 38           goto try_write_cleanup;
905              
906             try_write_finished:
907             // should always be responding, but just in case
908 156           switch(c->responding) {
909             case RESPOND_NOT_STARTED:
910             // the write watcher shouldn't ever get called before starting to
911             // respond. Shut it down if it does.
912             trace("unexpected try_write when response not started %d\n",c->fd);
913 0           goto try_write_shutdown;
914             case RESPOND_NORMAL:
915 0           goto try_write_shutdown;
916             case RESPOND_STREAMING:
917 53 100         if (c->poll_write_cb) goto try_write_again;
918 15           else goto try_write_paused;
919             case RESPOND_SHUTDOWN:
920 103           goto try_write_shutdown;
921             default:
922 0           goto try_write_cleanup;
923             }
924              
925             try_write_paused:
926             trace3("write PAUSED %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
927 15           stop_write_watcher(c);
928 15           goto try_write_cleanup;
929              
930             try_write_shutdown:
931             trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
932 103           change_responding_state(c, RESPOND_SHUTDOWN);
933 103           stop_write_watcher(c);
934 103           safe_close_conn(c, "close at write shutdown");
935              
936             try_write_cleanup:
937 156           SvREFCNT_dec(c->self);
938 156           return;
939             }
940              
941             static int
942 111           try_parse_http(struct feer_conn *c, size_t last_read)
943             {
944 111           struct feer_req *req = c->req;
945 111 50         if (likely(!req)) {
946 111           Newxz(req,1,struct feer_req);
947 111           c->req = req;
948             }
949              
950             // GH#12 - incremental parsing sets num_headers to 0 each time; force it
951             // back on every invocation
952 111           req->num_headers = MAX_HEADERS;
953              
954 111           return phr_parse_request(SvPVX(c->rbuf), SvCUR(c->rbuf),
955             &req->method, &req->method_len,
956             &req->path, &req->path_len, &req->minor_version,
957 111           req->headers, &req->num_headers,
958 111           (SvCUR(c->rbuf)-last_read));
959             }
960              
961             static void
962 123           try_conn_read(EV_P_ ev_io *w, int revents)
963             {
964 123           dCONN;
965 123           SvREFCNT_inc_void_NN(c->self);
966              
967             // if it's marked readable EV suggests we simply try read it. Otherwise it
968             // is stopped and we should ditch this connection.
969 123 50         if (unlikely(revents & EV_ERROR && !(revents & EV_READ))) {
    0          
970             trace("EV error on read, fd=%d revents=0x%08x\n", w->fd, revents);
971 0           goto try_read_error;
972             }
973              
974 123 50         if (unlikely(c->receiving == RECEIVE_SHUTDOWN))
975 0           goto dont_read_again;
976              
977             trace("try read %d\n",w->fd);
978              
979 123 100         if (likely(!c->rbuf)) { // likely = optimize for small requests
980             trace("init rbuf for %d\n",w->fd);
981 118           c->rbuf = newSV(READ_INIT_FACTOR*READ_BUFSZ + 1);
982 118           SvPOK_on(c->rbuf);
983             }
984              
985 123           ssize_t space_free = SvLEN(c->rbuf) - SvCUR(c->rbuf);
986 123 100         if (unlikely(space_free < READ_BUFSZ)) { // unlikely = optimize for small
987 3           size_t new_len = SvLEN(c->rbuf) + READ_GROW_FACTOR*READ_BUFSZ;
988             trace("moar memory %d: %"Sz_uf" to %"Sz_uf"\n",
989             w->fd, (Sz)SvLEN(c->rbuf), (Sz)new_len);
990 3 50         SvGROW(c->rbuf, new_len);
    50          
991 3           space_free += READ_GROW_FACTOR*READ_BUFSZ;
992             }
993              
994 123           char *cur = SvPVX(c->rbuf) + SvCUR(c->rbuf);
995 123           ssize_t got_n = read(w->fd, cur, space_free);
996              
997 123 100         if (unlikely(got_n <= 0)) {
998 7 50         if (unlikely(got_n == 0)) {
999             trace("EOF before complete request: %d\n",w->fd,SvCUR(c->rbuf));
1000 7           goto try_read_error;
1001             }
1002 0 0         if (likely(errno == EAGAIN || errno == EINTR))
    0          
1003             goto try_read_again;
1004 0           perror("try_conn_read error");
1005 0           goto try_read_error;
1006             }
1007              
1008             trace("read %d %"Ssz_df"\n", w->fd, (Ssz)got_n);
1009 116           SvCUR(c->rbuf) += got_n;
1010             // likely = optimize for small requests
1011 116 100         if (likely(c->receiving == RECEIVE_HEADERS)) {
1012              
1013             #ifdef FLASH_SOCKET_POLICY_SUPPORT
1014             if (unlikely(*SvPVX(c->rbuf) == '<')) {
1015             if (likely(SvCUR(c->rbuf) >= 22)) { // length of vvv
1016             if (str_eq(SvPVX(c->rbuf), 22, "", 22)) {
1017             add_const_to_wbuf(c, STR_WITH_LEN(FLASH_SOCKET_POLICY));
1018             conn_write_ready(c);
1019             stop_read_watcher(c);
1020             stop_read_timer(c);
1021             // TODO: keep-alives: be sure to remove the 22 bytes
1022             // out of the rbuf
1023             change_receiving_state(c, RECEIVE_SHUTDOWN);
1024             change_responding_state(c, RESPOND_SHUTDOWN);
1025             goto dont_read_again;
1026             }
1027             }
1028             // "if prefixed with"
1029             else if (likely(str_eq(SvPVX(c->rbuf), SvCUR(c->rbuf),
1030             "", SvCUR(c->rbuf))))
1031             {
1032             goto try_read_again;
1033             }
1034             }
1035             #endif
1036              
1037 111           int ret = try_parse_http(c, (size_t)got_n);
1038 111 100         if (ret == -1) goto try_read_bad;
1039 110 100         if (ret == -2) goto try_read_again;
1040              
1041 109 100         if (process_request_headers(c, ret))
1042 4           goto try_read_again_reset_timer;
1043             else
1044 105           goto dont_read_again;
1045             }
1046 5 50         else if (likely(c->receiving == RECEIVE_BODY)) {
1047 5           c->received_cl += got_n;
1048 5 100         if (c->received_cl < c->expected_cl)
1049 2           goto try_read_again_reset_timer;
1050             // body is complete
1051 3           sched_request_callback(c);
1052 3           goto dont_read_again;
1053             }
1054             else {
1055 0           trouble("unknown read state %d %d", w->fd, c->receiving);
1056             }
1057              
1058             // fallthrough:
1059             try_read_error:
1060             trace("READ ERROR %d, refcnt=%d\n", w->fd, SvREFCNT(c->self));
1061 7           change_receiving_state(c, RECEIVE_SHUTDOWN);
1062 7           change_responding_state(c, RESPOND_SHUTDOWN);
1063 7           stop_read_watcher(c);
1064 7           stop_read_timer(c);
1065 7           stop_write_watcher(c);
1066 7           goto try_read_cleanup;
1067              
1068             try_read_bad:
1069             trace("bad request %d\n", w->fd);
1070 1           respond_with_server_error(c, "Malformed request.\n", 0, 400);
1071             // TODO: when keep-alive, close conn instead of fallthrough here.
1072             // fallthrough:
1073             dont_read_again:
1074             trace("done reading %d\n", w->fd);
1075 109           change_receiving_state(c, RECEIVE_SHUTDOWN);
1076 109           stop_read_watcher(c);
1077 109           stop_read_timer(c);
1078 109           goto try_read_cleanup;
1079              
1080             try_read_again_reset_timer:
1081             trace("(reset read timer) %d\n", w->fd);
1082 6           restart_read_timer(c);
1083             // fallthrough:
1084             try_read_again:
1085             trace("read again %d\n", w->fd);
1086 7           start_read_watcher(c);
1087              
1088             try_read_cleanup:
1089 123           SvREFCNT_dec(c->self);
1090 123           }
1091              
1092             static void
1093 2           conn_read_timeout (EV_P_ ev_timer *w, int revents)
1094             {
1095 2           dCONN;
1096 2           SvREFCNT_inc_void_NN(c->self);
1097              
1098 2 50         if (unlikely(!(revents & EV_TIMER) || c->receiving == RECEIVE_SHUTDOWN)) {
    50          
1099             // if there's no EV_TIMER then EV has stopped it on an error
1100 0 0         if (revents & EV_ERROR)
1101 0           trouble("EV error on read timer, fd=%d revents=0x%08x\n",
1102             c->fd,revents);
1103 0           goto read_timeout_cleanup;
1104             }
1105              
1106             trace("read timeout %d\n", c->fd);
1107              
1108 2 50         if (likely(c->responding == RESPOND_NOT_STARTED)) {
1109             const char *msg;
1110 2 100         if (c->receiving == RECEIVE_HEADERS) {
1111 1           msg = "Headers took too long.";
1112             }
1113             else {
1114 1           msg = "Timeout reading body.";
1115             }
1116 2           respond_with_server_error(c, msg, 0, 408);
1117             }
1118             else {
1119             // XXX as of 0.984 this appears to be dead code
1120             trace("read timeout while writing %d\n",c->fd);
1121 0           stop_write_watcher(c);
1122 0           stop_read_watcher(c);
1123 0           stop_read_timer(c);
1124 0           safe_close_conn(c, "close at read timeout");
1125 0           change_responding_state(c, RESPOND_SHUTDOWN);
1126             }
1127              
1128             read_timeout_cleanup:
1129 2           stop_read_watcher(c);
1130 2           stop_read_timer(c);
1131 2           SvREFCNT_dec(c->self);
1132 2           }
1133              
1134             static void
1135 86           accept_cb (EV_P_ ev_io *w, int revents)
1136             {
1137             struct sockaddr_storage sa_buf;
1138             socklen_t sa_len;
1139              
1140 86 50         if (unlikely(shutting_down)) {
1141             // shouldn't get called, but be defensive
1142 0           ev_io_stop(EV_A, w);
1143 0           close(w->fd);
1144 0           return;
1145             }
1146              
1147 86 50         if (unlikely(revents & EV_ERROR)) {
1148 0           trouble("EV error in accept_cb, fd=%d, revents=0x%08x\n",w->fd,revents);
1149 0           ev_break(EV_A, EVBREAK_ALL);
1150 0           return;
1151             }
1152              
1153             trace2("accept! revents=0x%08x\n", revents);
1154              
1155             while (1) {
1156 204           sa_len = sizeof(struct sockaddr_storage);
1157 204           errno = 0;
1158              
1159 204           int fd = accept(w->fd, (struct sockaddr *)&sa_buf, &sa_len);
1160             trace("accepted fd=%d, errno=%d\n", fd, errno);
1161 204 100         if (fd == -1) break;
1162              
1163 118           int is_tcp = 1;
1164             #ifdef AF_UNIX
1165 118 50         if (unlikely(sa_buf.ss_family == AF_UNIX)) is_tcp = 0;
1166             #endif
1167              
1168             assert(sa_len <= sizeof(struct sockaddr_storage));
1169 118 50         if (unlikely(prep_socket(fd, is_tcp))) {
1170 0           perror("prep_socket");
1171 0           trouble("prep_socket failed for %d\n", fd);
1172 0           close(fd);
1173 0           continue;
1174             }
1175              
1176 118           struct sockaddr *sa = (struct sockaddr *)malloc(sa_len);
1177 118           memcpy(sa,&sa_buf,(size_t)sa_len);
1178 118           struct feer_conn *c = new_feer_conn(EV_A,fd,sa);
1179 118           start_read_watcher(c);
1180 118           restart_read_timer(c);
1181             assert(SvREFCNT(c->self) == 3);
1182 118           SvREFCNT_dec(c->self);
1183 204           }
1184             }
1185              
1186             static void
1187 108           sched_request_callback (struct feer_conn *c)
1188             {
1189             trace("sched req callback: %d c=%p, head=%p\n", c->fd, c, request_ready_rinq);
1190 108           rinq_push(&request_ready_rinq, c);
1191 108           SvREFCNT_inc_void_NN(c->self); // for the rinq
1192 108 100         if (!ev_is_active(&ei)) {
1193 77           ev_idle_start(feersum_ev_loop, &ei);
1194             }
1195 108           }
1196              
1197             // the unlikely/likely annotations here are trying to optimize for GET first
1198             // and POST second. Other entity-body requests are third in line.
1199             static bool
1200 109           process_request_headers (struct feer_conn *c, int body_offset)
1201             {
1202             int err_code;
1203             const char *err;
1204 109           struct feer_req *req = c->req;
1205              
1206             trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
1207             bool body_is_required;
1208 109           bool next_req_follows = 0;
1209              
1210 109           c->is_http11 = (req->minor_version == 1);
1211              
1212 109           change_receiving_state(c, RECEIVE_BODY);
1213              
1214 109 100         if (likely(str_eq("GET", 3, req->method, req->method_len))) {
1215             // Not supposed to have a body. Additional bytes are either a
1216             // mistake, a websocket negotiation or pipelined requests under
1217             // HTTP/1.1
1218 98           next_req_follows = 1;
1219             }
1220 11 50         else if (likely(str_eq("OPTIONS", 7, req->method, req->method_len))) {
1221 0           body_is_required = 1;
1222 0           next_req_follows = 1;
1223             }
1224 11 50         else if (likely(str_eq("POST", 4, req->method, req->method_len))) {
1225 11           body_is_required = 1;
1226             }
1227 0 0         else if (str_eq("PUT", 3, req->method, req->method_len)) {
1228 0           body_is_required = 1;
1229             }
1230 0           else if (str_eq("HEAD", 4, req->method, req->method_len) ||
1231 0           str_eq("DELETE", 6, req->method, req->method_len))
1232             {
1233 0           next_req_follows = 1;
1234             }
1235             else {
1236 0           err = "Feersum doesn't support that method yet\n";
1237 0           err_code = 405;
1238 0           goto got_bad_request;
1239             }
1240              
1241             #if DEBUG >= 2
1242             if (next_req_follows)
1243             trace2("next req follows fd=%d, boff=%d\n",c->fd,body_offset);
1244             if (body_is_required)
1245             trace2("body is required fd=%d, boff=%d\n",c->fd,body_offset);
1246             #endif
1247              
1248             // a body or follow-on data potentially follows the headers. Let feer_req
1249             // retain its pointers into rbuf and make a new scalar for more body data.
1250             STRLEN from_len;
1251 109 50         char *from = SvPV(c->rbuf,from_len);
1252 109           from += body_offset;
1253 109           int need = from_len - body_offset;
1254 109           int new_alloc = (need > READ_INIT_FACTOR*READ_BUFSZ)
1255 109 50         ? need : READ_INIT_FACTOR*READ_BUFSZ-1;
1256             trace("new rbuf for body %d need=%d alloc=%d\n",c->fd, need, new_alloc);
1257 109 100         SV *new_rbuf = newSVpvn(need ? from : "", need);
1258              
1259 109           req->buf = c->rbuf;
1260 109           c->rbuf = new_rbuf;
1261 109           SvCUR_set(req->buf, body_offset);
1262              
1263 109 100         if (likely(next_req_follows)) // optimize for GET
1264 98           goto got_it_all;
1265              
1266             // determine how much we need to read
1267             int i;
1268 11           UV expected = 0;
1269 31 50         for (i=0; i < req->num_headers; i++) {
1270 31           struct phr_header *hdr = &req->headers[i];
1271 31 50         if (!hdr->name) continue;
1272             // XXX: ignore multiple C-L headers?
1273 31 100         if (unlikely(
1274             str_case_eq("content-length", 14, hdr->name, hdr->name_len)))
1275             {
1276 11           int g = grok_number(hdr->value, hdr->value_len, &expected);
1277 11 50         if (likely(g == IS_NUMBER_IN_UV)) {
1278 11 50         if (unlikely(expected > MAX_BODY_LEN)) {
1279 0           err_code = 413;
1280 0           err = "Content length exceeds maximum\n";
1281 0           goto got_bad_request;
1282             }
1283             else
1284 11           goto got_cl;
1285             }
1286             else {
1287 0           err_code = 400;
1288 0           err = "invalid content-length\n";
1289 0           goto got_bad_request;
1290             }
1291             }
1292             // TODO: support "Connection: close" bodies
1293             // TODO: support "Transfer-Encoding: chunked" bodies
1294             }
1295              
1296 0 0         if (body_is_required) {
1297             // Go the nginx route...
1298 0           err_code = 411;
1299 0           err = "Content-Length required\n";
1300             }
1301             else {
1302             // XXX TODO support requests that don't require a body
1303 0           err_code = 418;
1304 0           err = "Feersum doesn't know how to handle optional-body requests yet\n";
1305             }
1306              
1307             got_bad_request:
1308 0           respond_with_server_error(c, err, 0, err_code);
1309 0           return 0;
1310              
1311             got_cl:
1312 11           c->expected_cl = (ssize_t)expected;
1313 11           c->received_cl = SvCUR(c->rbuf);
1314             trace("expecting body %d size=%"Ssz_df" have=%"Ssz_df"\n",
1315             c->fd, (Ssz)c->expected_cl, (Ssz)c->received_cl);
1316 11 50         SvGROW(c->rbuf, c->expected_cl + 1);
    100          
1317              
1318             // don't have enough bytes to schedule immediately?
1319             // unlikely = optimize for short requests
1320 11 100         if (unlikely(c->expected_cl && c->received_cl < c->expected_cl)) {
    100          
1321             // TODO: schedule the callback immediately and support a non-blocking
1322             // ->read method.
1323             // sched_request_callback(c);
1324             // change_receiving_state(c, RECEIVE_STREAM);
1325 4           return 1;
1326             }
1327             // fallthrough: have enough bytes
1328             got_it_all:
1329 105           sched_request_callback(c);
1330 109           return 0;
1331             }
1332              
1333             static void
1334 328           conn_write_ready (struct feer_conn *c)
1335             {
1336 328 100         if (c->in_callback) return; // defer until out of callback
1337              
1338 126 100         if (c->write_ev_io.data == NULL) {
1339 103           ev_io_init(&c->write_ev_io, try_conn_write, c->fd, EV_WRITE);
1340 103           c->write_ev_io.data = (void *)c;
1341             }
1342              
1343             #if AUTOCORK_WRITES
1344 126           start_write_watcher(c);
1345             #else
1346             // attempt a non-blocking write immediately if we're not already
1347             // waiting for writability
1348             try_conn_write(feersum_ev_loop, &c->write_ev_io, EV_WRITE);
1349             #endif
1350             }
1351              
1352             static void
1353 5           respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len, int err_code)
1354             {
1355             SV *tmp;
1356              
1357 5 50         if (unlikely(c->responding != RESPOND_NOT_STARTED)) {
1358 0           trouble("Tried to send server error but already responding!");
1359 0           return;
1360             }
1361              
1362 5 50         if (!msg_len) msg_len = strlen(msg);
1363             assert(msg_len < INT_MAX);
1364              
1365 5           tmp = newSVpvf("HTTP/1.%d %d %s" CRLF
1366             "Content-Type: text/plain" CRLF
1367             "Connection: close" CRLF
1368             "Cache-Control: no-cache, no-store" CRLF
1369             "Content-Length: %"Ssz_df"" CRLFx2
1370             "%.*s",
1371 5           c->is_http11 ? 1 : 0,
1372             err_code, http_code_to_msg(err_code),
1373             (Ssz)msg_len,
1374             (int)msg_len, msg);
1375 5           add_sv_to_wbuf(c, sv_2mortal(tmp));
1376              
1377 5           stop_read_watcher(c);
1378 5           stop_read_timer(c);
1379 5           change_responding_state(c, RESPOND_SHUTDOWN);
1380 5           change_receiving_state(c, RECEIVE_SHUTDOWN);
1381 5           conn_write_ready(c);
1382             }
1383              
1384             INLINE_UNLESS_DEBUG bool
1385 131           str_eq(const char *a, int a_len, const char *b, int b_len)
1386             {
1387 131 100         if (a_len != b_len) return 0;
1388 109 50         if (a == b) return 1;
1389             int i;
1390 447 100         for (i=0; i
    50          
1391 338 50         if (a[i] != b[i]) return 0;
1392             }
1393 109           return 1;
1394             }
1395              
1396             /*
1397             * Compares two strings, assumes that the first string is already lower-cased
1398             */
1399             INLINE_UNLESS_DEBUG bool
1400 853           str_case_eq(const char *a, int a_len, const char *b, int b_len)
1401             {
1402 853 100         if (a_len != b_len) return 0;
1403 79 50         if (a == b) return 1;
1404             int i;
1405 605 100         for (i=0; i
    50          
1406 566 100         if (a[i] != tolower(b[i])) return 0;
1407             }
1408 39           return 1;
1409             }
1410              
1411             INLINE_UNLESS_DEBUG int
1412 32           hex_decode(const char ch)
1413             {
1414 32 100         if (likely('0' <= ch && ch <= '9'))
    100          
1415 26           return ch - '0';
1416 6 100         else if ('A' <= ch && ch <= 'F')
    100          
1417 2           return ch - 'A' + 10;
1418 4 100         else if ('a' <= ch && ch <= 'f')
    50          
1419 2           return ch - 'a' + 10;
1420 2           return -1;
1421             }
1422              
1423             static void
1424 99           uri_decode_sv (SV *sv)
1425             {
1426             STRLEN len;
1427             char *ptr, *end, *decoded;
1428              
1429 99 50         ptr = SvPV(sv, len);
1430 99           end = SvEND(sv);
1431              
1432             // quickly scan for % so we can ignore decoding that portion of the string
1433 452 100         while (ptr < end) {
1434 360 100         if (unlikely(*ptr == '%')) goto needs_decode;
1435 353           ptr++;
1436             }
1437 92           return;
1438              
1439             needs_decode:
1440              
1441             // Up until ptr have been "decoded" already by virtue of those chars not
1442             // being encoded.
1443 7           decoded = ptr;
1444              
1445 53 100         for (; ptr < end; ptr++) {
1446 46 100         if (unlikely(*ptr == '%') && likely(end - ptr >= 2)) {
    50          
1447 16           int c1 = hex_decode(ptr[1]);
1448 16           int c2 = hex_decode(ptr[2]);
1449 16 100         if (likely(c1 != -1 && c2 != -1)) {
    100          
1450 14           *decoded++ = (c1 << 4) + c2;
1451 14           ptr += 2;
1452 14           continue;
1453             }
1454             }
1455 32           *decoded++ = *ptr;
1456             }
1457              
1458 7           *decoded = '\0'; // play nice with C
1459              
1460 7 50         ptr = SvPV_nolen(sv);
1461 7           SvCUR_set(sv, decoded-ptr);
1462             }
1463              
1464             static void
1465 23           feersum_init_tmpl_env(pTHX)
1466             {
1467             HV *e;
1468 23           e = newHV();
1469              
1470             // constants
1471 23           hv_stores(e, "psgi.version", newRV((SV*)psgi_ver));
1472 23           hv_stores(e, "psgi.url_scheme", newSVpvs("http"));
1473 23           hv_stores(e, "psgi.run_once", &PL_sv_no);
1474 23           hv_stores(e, "psgi.nonblocking", &PL_sv_yes);
1475 23           hv_stores(e, "psgi.multithread", &PL_sv_no);
1476 23           hv_stores(e, "psgi.multiprocess", &PL_sv_no);
1477 23           hv_stores(e, "psgi.streaming", &PL_sv_yes);
1478 23           hv_stores(e, "psgi.errors", newRV((SV*)PL_stderrgv));
1479 23           hv_stores(e, "psgix.input.buffered", &PL_sv_yes);
1480 23           hv_stores(e, "psgix.output.buffered", &PL_sv_yes);
1481 23           hv_stores(e, "psgix.body.scalar_refs", &PL_sv_yes);
1482 23           hv_stores(e, "psgix.output.guard", &PL_sv_yes);
1483 23           hv_stores(e, "SCRIPT_NAME", newSVpvs(""));
1484              
1485             // placeholders that get defined for every request
1486 23           hv_stores(e, "SERVER_PROTOCOL", &PL_sv_undef);
1487 23           hv_stores(e, "SERVER_NAME", &PL_sv_undef);
1488 23           hv_stores(e, "SERVER_PORT", &PL_sv_undef);
1489 23           hv_stores(e, "REQUEST_URI", &PL_sv_undef);
1490 23           hv_stores(e, "REQUEST_METHOD", &PL_sv_undef);
1491 23           hv_stores(e, "PATH_INFO", &PL_sv_undef);
1492 23           hv_stores(e, "REMOTE_ADDR", &PL_sv_placeholder);
1493 23           hv_stores(e, "REMOTE_PORT", &PL_sv_placeholder);
1494              
1495             // defaults that get changed for some requests
1496 23           hv_stores(e, "psgi.input", &PL_sv_undef);
1497 23           hv_stores(e, "CONTENT_LENGTH", newSViv(0));
1498 23           hv_stores(e, "QUERY_STRING", newSVpvs(""));
1499              
1500             // anticipated headers
1501 23           hv_stores(e, "CONTENT_TYPE", &PL_sv_placeholder);
1502 23           hv_stores(e, "HTTP_HOST", &PL_sv_placeholder);
1503 23           hv_stores(e, "HTTP_USER_AGENT", &PL_sv_placeholder);
1504 23           hv_stores(e, "HTTP_ACCEPT", &PL_sv_placeholder);
1505 23           hv_stores(e, "HTTP_ACCEPT_LANGUAGE", &PL_sv_placeholder);
1506 23           hv_stores(e, "HTTP_ACCEPT_CHARSET", &PL_sv_placeholder);
1507 23           hv_stores(e, "HTTP_KEEP_ALIVE", &PL_sv_placeholder);
1508 23           hv_stores(e, "HTTP_CONNECTION", &PL_sv_placeholder);
1509 23           hv_stores(e, "HTTP_REFERER", &PL_sv_placeholder);
1510 23           hv_stores(e, "HTTP_COOKIE", &PL_sv_placeholder);
1511 23           hv_stores(e, "HTTP_IF_MODIFIED_SINCE", &PL_sv_placeholder);
1512 23           hv_stores(e, "HTTP_IF_NONE_MATCH", &PL_sv_placeholder);
1513 23           hv_stores(e, "HTTP_CACHE_CONTROL", &PL_sv_placeholder);
1514              
1515 23           hv_stores(e, "psgix.io", &PL_sv_placeholder);
1516              
1517 23           feersum_tmpl_env = e;
1518 23           }
1519              
1520             static HV*
1521 99           feersum_env(pTHX_ struct feer_conn *c)
1522             {
1523             HV *e;
1524             SV **hsv;
1525             int i,j;
1526 99           struct feer_req *r = c->req;
1527              
1528 99 100         if (unlikely(!feersum_tmpl_env))
1529 23           feersum_init_tmpl_env(aTHX);
1530 99           e = newHVhv(feersum_tmpl_env);
1531              
1532             trace("generating header (fd %d) %.*s\n",
1533             c->fd, (int)r->path_len, r->path);
1534              
1535 99           SV *path = newSVpvn(r->path, r->path_len);
1536 99           hv_stores(e, "SERVER_NAME", newSVsv(feer_server_name));
1537 99           hv_stores(e, "SERVER_PORT", newSVsv(feer_server_port));
1538 99           hv_stores(e, "REQUEST_URI", path);
1539 99           hv_stores(e, "REQUEST_METHOD", newSVpvn(r->method,r->method_len));
1540 99 100         hv_stores(e, "SERVER_PROTOCOL", (r->minor_version == 1) ?
1541             newSVsv(psgi_serv11) : newSVsv(psgi_serv10));
1542              
1543 99           SV *addr = &PL_sv_undef;
1544 99           SV *port = &PL_sv_undef;
1545             const char *str_addr;
1546             unsigned short s_port;
1547              
1548 99 50         if (c->sa->sa_family == AF_INET) {
1549 99           struct sockaddr_in *in = (struct sockaddr_in *)c->sa;
1550 99           addr = newSV(INET_ADDRSTRLEN);
1551 99           str_addr = inet_ntop(AF_INET,&in->sin_addr,SvPVX(addr),INET_ADDRSTRLEN);
1552 99           s_port = ntohs(in->sin_port);
1553             }
1554             #ifdef AF_INET6
1555 0 0         else if (c->sa->sa_family == AF_INET6) {
1556 0           struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)c->sa;
1557 0           addr = newSV(INET6_ADDRSTRLEN);
1558 0           str_addr = inet_ntop(AF_INET6,&in6->sin6_addr,SvPVX(addr),INET6_ADDRSTRLEN);
1559 0           s_port = ntohs(in6->sin6_port);
1560             }
1561             #endif
1562             #ifdef AF_UNIX
1563 0 0         else if (c->sa->sa_family == AF_UNIX) {
1564 0           str_addr = "unix";
1565 0           addr = newSV(sizeof(str_addr));
1566 0           memcpy(SvPVX(addr), str_addr, sizeof(str_addr));
1567 0           s_port = 0;
1568             }
1569             #endif
1570              
1571 99 50         if (likely(str_addr)) {
1572 99           SvCUR(addr) = strlen(SvPVX(addr));
1573 99           SvPOK_on(addr);
1574 99           port = newSViv(s_port);
1575             }
1576 99           hv_stores(e, "REMOTE_ADDR", addr);
1577 99           hv_stores(e, "REMOTE_PORT", port);
1578              
1579 99 100         if (unlikely(c->expected_cl > 0)) {
1580 9           hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
1581 9           hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c,0));
1582             }
1583 90           else if (request_cb_is_psgi) {
1584             // TODO: make psgi.input a valid, but always empty stream for PSGI mode?
1585             }
1586              
1587 99 100         if (request_cb_is_psgi) {
1588 73           SV *fake_fh = newSViv(c->fd); // just some random dummy value
1589 73           SV *selfref = sv_2mortal(feer_conn_2sv(c));
1590 73           sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
1591 73           hv_stores(e, "psgix.io", fake_fh);
1592             }
1593              
1594             {
1595 99           const char *qpos = r->path;
1596             SV *pinfo, *qstr;
1597              
1598             // rather than memchr, for speed:
1599 526 100         while (*qpos != '?' && qpos < r->path + r->path_len)
    100          
1600 427           qpos++;
1601              
1602 99 100         if (*qpos == '?') {
1603 16           pinfo = newSVpvn(r->path, (qpos - r->path));
1604 16           qpos++;
1605 16           qstr = newSVpvn(qpos, r->path_len - (qpos - r->path));
1606             }
1607             else {
1608 83           pinfo = newSVsv(path);
1609 83           qstr = NULL; // use template default
1610             }
1611 99           uri_decode_sv(pinfo);
1612 99           hv_stores(e, "PATH_INFO", pinfo);
1613 99 100         if (qstr != NULL) // hv template defaults QUERY_STRING to empty
1614 16           hv_stores(e, "QUERY_STRING", qstr);
1615             }
1616              
1617 99           SV *val = NULL;
1618             char *kbuf;
1619 99           size_t kbuflen = 64;
1620 99           Newx(kbuf, kbuflen, char);
1621 99           kbuf[0]='H'; kbuf[1]='T'; kbuf[2]='T'; kbuf[3]='P'; kbuf[4]='_';
1622              
1623 446 100         for (i=0; inum_headers; i++) {
1624 347           struct phr_header *hdr = &(r->headers[i]);
1625 347 50         if (unlikely(hdr->name == NULL && val != NULL)) {
    0          
1626             trace("... multiline %.*s\n", (int)hdr->value_len, hdr->value);
1627 0           sv_catpvn(val, hdr->value, hdr->value_len);
1628 0           continue;
1629             }
1630 347 100         else if (unlikely(str_case_eq(
1631             STR_WITH_LEN("content-length"), hdr->name, hdr->name_len)))
1632             {
1633             // content length shouldn't show up as HTTP_CONTENT_LENGTH but
1634             // as CONTENT_LENGTH in the env-hash.
1635 10           continue;
1636             }
1637 337 100         else if (unlikely(str_case_eq(
1638             STR_WITH_LEN("content-type"), hdr->name, hdr->name_len)))
1639             {
1640 10           hv_stores(e, "CONTENT_TYPE",newSVpvn(hdr->value, hdr->value_len));
1641 10           continue;
1642             }
1643              
1644 327           size_t klen = 5+hdr->name_len;
1645 327 50         if (kbuflen < klen) {
1646 0           kbuflen = klen;
1647 0           kbuf = Renew(kbuf, kbuflen, char);
1648             }
1649 327           char *key = kbuf + 5;
1650 2994 100         for (j=0; jname_len; j++) {
1651 2667           char n = hdr->name[j];
1652 2667 100         *key++ = (n == '-') ? '_' : toupper(n);
1653             }
1654              
1655 327           SV **val = hv_fetch(e, kbuf, klen, 1);
1656             trace("adding header to env (fd %d) %.*s: %.*s\n",
1657             c->fd, (int)klen, kbuf, (int)hdr->value_len, hdr->value);
1658              
1659             assert(val != NULL); // "fetch is store" flag should ensure this
1660 327 100         if (unlikely(SvPOK(*val))) {
1661             trace("... is multivalue\n");
1662             // extend header with comma
1663 1           sv_catpvn(*val, ", ", 2);
1664 1           sv_catpvn(*val, hdr->value, hdr->value_len);
1665             }
1666             else {
1667             // change from undef to a real value
1668 326           sv_setpvn(*val, hdr->value, hdr->value_len);
1669             }
1670             }
1671 99           Safefree(kbuf);
1672              
1673 99           return e;
1674             }
1675              
1676             static void
1677 98           feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
1678             int streaming)
1679             {
1680             const char *ptr;
1681             I32 i;
1682              
1683             trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
1684              
1685 98 50         if (unlikely(c->responding != RESPOND_NOT_STARTED))
1686 0           croak("already responding?!");
1687 98 100         change_responding_state(c, streaming ? RESPOND_STREAMING : RESPOND_NORMAL);
1688              
1689 98 50         if (unlikely(!SvOK(message) || !(SvIOK(message) || SvPOK(message)))) {
    0          
    0          
    0          
    50          
    100          
    50          
    50          
1690 0           croak("Must define an HTTP status code or message");
1691             }
1692              
1693 98           I32 avl = av_len(headers);
1694 98 50         if (unlikely(avl+1 % 2 == 1)) {
1695 0           croak("expected even-length array, got %d", avl+1);
1696             }
1697              
1698             // int or 3 chars? use a stock message
1699 98           UV code = 0;
1700 98 100         if (SvIOK(message))
1701 77 50         code = SvIV(message);
1702 21 50         else if (SvUOK(message))
1703 0 0         code = SvUV(message);
1704             else {
1705 21           const int numtype = grok_number(SvPVX_const(message),3,&code);
1706 21 50         if (unlikely(numtype != IS_NUMBER_IN_UV))
1707 0           code = 0;
1708             }
1709             trace2("starting response fd=%d code=%"UVuf"\n",c->fd,code);
1710              
1711 98 50         if (unlikely(!code))
1712 0           croak("first parameter is not a number or doesn't start with digits");
1713              
1714             // for PSGI it's always just an IV so optimize for that
1715 98 100         if (likely(!SvPOK(message) || SvCUR(message) == 3)) {
    100          
1716 78           ptr = http_code_to_msg(code);
1717 78           message = sv_2mortal(newSVpvf("%"UVuf" %s",code,ptr));
1718             }
1719              
1720             // don't generate or strip Content-Length headers for 304 or 1xx
1721 98 100         c->auto_cl = (code == 304 || (100 <= code && code <= 199)) ? 0 : 1;
    50          
    50          
1722              
1723 98 100         add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
1724 98           add_sv_to_wbuf(c, message);
1725 98           add_crlf_to_wbuf(c);
1726              
1727 237 100         for (i=0; i
1728 139           SV **hdr = av_fetch(headers, i, 0);
1729 139 50         if (unlikely(!hdr || !SvOK(*hdr))) {
    50          
    0          
    0          
    0          
    50          
1730             trace("skipping undef header key");
1731 8           continue;
1732             }
1733              
1734 139           SV **val = av_fetch(headers, i+1, 0);
1735 139 50         if (unlikely(!val || !SvOK(*val))) {
    50          
    0          
    0          
    0          
    50          
1736             trace("skipping undef header value");
1737 0           continue;
1738             }
1739              
1740             STRLEN hlen;
1741 139 50         const char *hp = SvPV(*hdr, hlen);
1742 277           if (likely(c->auto_cl) &&
1743 138           unlikely(str_case_eq("content-length",14,hp,hlen)))
1744             {
1745             trace("ignoring content-length header in the response\n");
1746 8           continue;
1747             }
1748              
1749 131           add_sv_to_wbuf(c, *hdr);
1750 131           add_const_to_wbuf(c, ": ", 2);
1751 131           add_sv_to_wbuf(c, *val);
1752 131           add_crlf_to_wbuf(c);
1753             }
1754              
1755 98 100         if (streaming) {
1756 28 100         if (c->is_http11)
1757 24           add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
1758             else
1759 4           add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
1760             }
1761              
1762 98           conn_write_ready(c);
1763 98           }
1764              
1765             static size_t
1766 70           feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
1767             {
1768             size_t RETVAL;
1769             int i;
1770 70           bool body_is_string = 0;
1771             STRLEN cur;
1772              
1773 70 50         if (c->responding != RESPOND_NORMAL)
1774 0           croak("can't use write_whole_body when in streaming mode");
1775              
1776 70 50         if (!SvOK(body)) {
    0          
    0          
1777 0           body = sv_2mortal(newSVpvs(""));
1778 0           body_is_string = 1;
1779             }
1780 70 100         else if (SvROK(body)) {
1781 67           SV *refd = SvRV(body);
1782 67 100         if (SvOK(refd) && !SvROK(refd)) {
    50          
    50          
    50          
1783 2           body = refd;
1784 2           body_is_string = 1;
1785             }
1786 65 50         else if (SvTYPE(refd) != SVt_PVAV) {
1787 67           croak("body must be a scalar, scalar reference or array reference");
1788             }
1789             }
1790             else {
1791 3           body_is_string = 1;
1792             }
1793              
1794             SV *cl_sv; // content-length future
1795             struct iovec *cl_iov;
1796 70 100         if (likely(c->auto_cl))
1797 67           add_placeholder_to_wbuf(c, &cl_sv, &cl_iov);
1798             else
1799 3           add_crlf_to_wbuf(c);
1800              
1801 70 100         if (body_is_string) {
1802 5           cur = add_sv_to_wbuf(c,body);
1803 5           RETVAL = cur;
1804             }
1805             else {
1806 65           AV *abody = (AV*)SvRV(body);
1807 65           I32 amax = av_len(abody);
1808 65           RETVAL = 0;
1809 156 100         for (i=0; i<=amax; i++) {
1810 91           SV *sv = fetch_av_normal(aTHX_ abody, i);
1811 91 100         if (unlikely(!sv)) continue;
1812 90           cur = add_sv_to_wbuf(c,sv);
1813             trace("body part i=%d sv=%p cur=%"Sz_uf"\n", i, sv, (Sz)cur);
1814 90           RETVAL += cur;
1815             }
1816             }
1817              
1818 70 100         if (likely(c->auto_cl)) {
1819 67           sv_setpvf(cl_sv, "Content-Length: %"Sz_uf"" CRLFx2, (Sz)RETVAL);
1820 67           update_wbuf_placeholder(c, cl_sv, cl_iov);
1821             }
1822              
1823 70           change_responding_state(c, RESPOND_SHUTDOWN);
1824 70           conn_write_ready(c);
1825 70           return RETVAL;
1826             }
1827              
1828             static void
1829 17           feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
1830             {
1831 17           dSP;
1832 17           ENTER;
1833 17           SAVETMPS;
1834 17 50         PUSHMARK(SP);
1835 17 50         mXPUSHs(feer_conn_2sv(c));
1836 17 50         XPUSHs(streamer);
1837 17           PUTBACK;
1838 17           call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
1839 17           SPAGAIN;
1840 17 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
1841 0           call_died(aTHX_ c, "PSGI stream initiator");
1842             }
1843 17           PUTBACK;
1844 17 50         FREETMPS;
1845 17           LEAVE;
1846 17           }
1847              
1848             static void
1849 78           feersum_handle_psgi_response(
1850             pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
1851             {
1852 78 50         if (unlikely(!SvOK(ret) || !SvROK(ret))) {
    0          
    0          
    0          
    50          
    50          
1853 0 0         sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
1854 0           call_died(aTHX_ c, "PSGI request");
1855 0           return;
1856             }
1857              
1858 78 50         if (SvOK(ret) && unlikely(!IsArrayRef(ret))) {
    0          
    0          
    50          
    100          
1859 17 50         if (likely(can_recurse)) {
1860             trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
1861 17           feersum_start_psgi_streaming(aTHX_ c, ret);
1862             }
1863             else {
1864 0 0         sv_setpvs(ERRSV, "PSGI attempt to recurse in a streaming callback");
1865 0           call_died(aTHX_ c, "PSGI request");
1866             }
1867 17           return;
1868             }
1869              
1870 61           AV *psgi_triplet = (AV*)SvRV(ret);
1871 61 50         if (unlikely(av_len(psgi_triplet)+1 != 3)) {
1872 0 0         sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
1873 0           call_died(aTHX_ c, "PSGI request");
1874 0           return;
1875             }
1876              
1877             trace("PSGI response triplet, c=%p av=%p\n", c, psgi_triplet);
1878             // we know there's three elems so *should* be safe to de-ref
1879 61           SV *msg = *(av_fetch(psgi_triplet,0,0));
1880 61           SV *hdrs = *(av_fetch(psgi_triplet,1,0));
1881 61           SV *body = *(av_fetch(psgi_triplet,2,0));
1882              
1883             AV *headers;
1884 61 50         if (IsArrayRef(hdrs))
    50          
1885 61           headers = (AV*)SvRV(hdrs);
1886             else {
1887 0 0         sv_setpvs(ERRSV, "PSGI Headers must be an array-ref");
1888 0           call_died(aTHX_ c, "PSGI request");
1889 0           return;
1890             }
1891              
1892 61 50         if (likely(IsArrayRef(body))) {
    100          
1893 48           feersum_start_response(aTHX_ c, msg, headers, 0);
1894 48           feersum_write_whole_body(aTHX_ c, body);
1895             }
1896 13 50         else if (likely(SvROK(body))) { // probaby an IO::Handle-like object
1897 13           feersum_start_response(aTHX_ c, msg, headers, 1);
1898 13           c->poll_write_cb = newSVsv(body);
1899 13           c->poll_write_cb_is_io_handle = 1;
1900 13           conn_write_ready(c);
1901             }
1902             else {
1903 0 0         sv_setpvs(ERRSV, "Expected PSGI array-ref or IO::Handle-like body");
1904 0           call_died(aTHX_ c, "PSGI request");
1905 0           return;
1906             }
1907             }
1908              
1909             static int
1910 23           feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
1911             {
1912             int RETVAL;
1913 23 100         if (is_writer) {
1914             trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
1915 19 50         if (c->poll_write_cb) {
1916 0           SvREFCNT_dec(c->poll_write_cb);
1917 0           c->poll_write_cb = NULL;
1918             }
1919 19 100         if (c->responding < RESPOND_SHUTDOWN) {
1920 15           finish_wbuf(c);
1921 15           conn_write_ready(c);
1922 15           change_responding_state(c, RESPOND_SHUTDOWN);
1923             }
1924 19           RETVAL = 1;
1925             }
1926             else {
1927             trace("close reader fd=%d, c=%p\n", c->fd, c);
1928             // TODO: ref-dec poll_read_cb
1929 4 100         if (c->rbuf) {
1930 1           SvREFCNT_dec(c->rbuf);
1931 1           c->rbuf = NULL;
1932             }
1933 4           RETVAL = shutdown(c->fd, SHUT_RD);
1934 4           change_receiving_state(c, RECEIVE_SHUTDOWN);
1935             }
1936              
1937             // disassociate the handle from the conn
1938 23           SvREFCNT_dec(c->self);
1939 23           return RETVAL;
1940             }
1941              
1942             static SV*
1943 6           feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard)
1944             {
1945 6 100         if (guard) {
1946 4 100         if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
1947 4 50         c->ext_guard = SvOK(guard) ? newSVsv(guard) : NULL;
    0          
    0          
1948             }
1949 6 50         return c->ext_guard ? newSVsv(c->ext_guard) : &PL_sv_undef;
1950             }
1951              
1952             static void
1953 2           call_died (pTHX_ struct feer_conn *c, const char *cb_type)
1954             {
1955 2           dSP;
1956             #if DEBUG >= 1
1957             trace("An error was thrown in the %s callback: %-p\n",cb_type,ERRSV);
1958             #endif
1959 2 50         PUSHMARK(SP);
1960 2 50         mXPUSHs(newSVsv(ERRSV));
    50          
1961 2           PUTBACK;
1962 2           call_pv("Feersum::DIED", G_DISCARD|G_EVAL|G_VOID|G_KEEPERR);
1963 2           SPAGAIN;
1964              
1965 2           respond_with_server_error(c,"Request handler exception.\n",0,500);
1966 2 50         sv_setsv(ERRSV, &PL_sv_undef);
1967 2           }
1968              
1969             static void
1970 108           call_request_callback (struct feer_conn *c)
1971             {
1972             dTHX;
1973 108           dSP;
1974             int flags;
1975 108           c->in_callback++;
1976 108           SvREFCNT_inc_void_NN(c->self);
1977              
1978             trace("request callback c=%p\n", c);
1979              
1980 108           ENTER;
1981 108           SAVETMPS;
1982 108 50         PUSHMARK(SP);
1983              
1984 108 100         if (request_cb_is_psgi) {
1985 73           HV *env = feersum_env(aTHX_ c);
1986 73 50         mXPUSHs(newRV_noinc((SV*)env));
1987 73           flags = G_EVAL|G_SCALAR;
1988             }
1989             else {
1990 35 50         mXPUSHs(feer_conn_2sv(c));
1991 35           flags = G_DISCARD|G_EVAL|G_VOID;
1992             }
1993              
1994 108           PUTBACK;
1995 108           int returned = call_sv(request_cb_cv, flags);
1996 108           SPAGAIN;
1997              
1998             trace("called request callback, errsv? %d\n", SvTRUE(ERRSV) ? 1 : 0);
1999              
2000 108 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
2001 2           call_died(aTHX_ c, "request");
2002 2           returned = 0; // pretend nothing got returned
2003             }
2004              
2005             SV *psgi_response;
2006 108 100         if (request_cb_is_psgi && likely(returned >= 1)) {
    100          
2007 72           psgi_response = POPs;
2008 72           SvREFCNT_inc_void_NN(psgi_response);
2009             }
2010              
2011             trace("leaving request callback\n");
2012 108           PUTBACK;
2013              
2014 108 100         if (request_cb_is_psgi && likely(returned >= 1)) {
    100          
2015 72           feersum_handle_psgi_response(aTHX_ c, psgi_response, 1); // can_recurse
2016 72           SvREFCNT_dec(psgi_response);
2017             }
2018              
2019             //fangyousong
2020 108 100         if (request_cb_is_psgi && c->expected_cl > 0) {
    100          
2021 3           SvREFCNT_dec(c->self);
2022             }
2023              
2024              
2025 108           c->in_callback--;
2026 108           SvREFCNT_dec(c->self);
2027              
2028 108 50         FREETMPS;
2029 108           LEAVE;
2030 108           }
2031              
2032             static void
2033 4           call_poll_callback (struct feer_conn *c, bool is_write)
2034             {
2035             dTHX;
2036 4           dSP;
2037              
2038 4 50         SV *cb = (is_write) ? c->poll_write_cb : NULL;
2039              
2040 4 50         if (unlikely(cb == NULL)) return;
2041              
2042 4           c->in_callback++;
2043              
2044             trace("%s poll callback c=%p cbrv=%p\n",
2045             is_write ? "write" : "read", c, cb);
2046              
2047 4           ENTER;
2048 4           SAVETMPS;
2049 4 50         PUSHMARK(SP);
2050 4 50         mXPUSHs(new_feer_conn_handle(aTHX_ c, is_write));
2051 4           PUTBACK;
2052 4           call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
2053 4           SPAGAIN;
2054              
2055             trace("called %s poll callback, errsv? %d\n",
2056             is_write ? "write" : "read", SvTRUE(ERRSV) ? 1 : 0);
2057              
2058 4 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
2059 0 0         call_died(aTHX_ c, is_write ? "write poll" : "read poll");
2060             }
2061              
2062             trace("leaving %s poll callback\n", is_write ? "write" : "read");
2063 4           PUTBACK;
2064 4 50         FREETMPS;
2065 4           LEAVE;
2066              
2067 4           c->in_callback--;
2068             }
2069              
2070             static void
2071 38           pump_io_handle (struct feer_conn *c, SV *io)
2072             {
2073             dTHX;
2074 38           dSP;
2075              
2076 38 50         if (unlikely(io == NULL)) return;
2077              
2078 38           c->in_callback++;
2079              
2080             trace("pump io handle %d\n", c->fd);
2081              
2082 38           ENTER;
2083 38           SAVETMPS;
2084              
2085             // Emulate `local $/ = \4096;`
2086 38           SV *old_rs = PL_rs;
2087 38           PL_rs = sv_2mortal(newRV_noinc(newSViv(4096)));
2088 38           sv_setsv(get_sv("/", GV_ADD), PL_rs);
2089              
2090 38 50         PUSHMARK(SP);
2091 38 50         XPUSHs(c->poll_write_cb);
2092 38           PUTBACK;
2093 38           int returned = call_method("getline", G_SCALAR|G_EVAL);
2094 38           SPAGAIN;
2095              
2096             trace("called getline on io handle fd=%d errsv=%d returned=%d\n",
2097             c->fd, SvTRUE(ERRSV) ? 1 : 0, returned);
2098              
2099 38 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
2100 0           call_died(aTHX_ c, "getline on io handle");
2101 0           goto done_pump_io;
2102             }
2103              
2104 38           SV *ret = NULL;
2105 38 50         if (returned > 0)
2106 38           ret = POPs;
2107 38 50         if (ret && SvMAGICAL(ret))
    50          
2108 0           ret = sv_2mortal(newSVsv(ret));
2109              
2110 38 50         if (unlikely(!ret || !SvOK(ret))) {
    100          
    50          
    50          
    50          
    100          
2111             // returned undef, so call the close method out of niceity
2112 13 50         PUSHMARK(SP);
2113 13 50         XPUSHs(c->poll_write_cb);
2114 13           PUTBACK;
2115 13           call_method("close", G_VOID|G_DISCARD|G_EVAL);
2116 13           SPAGAIN;
2117              
2118 13 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
2119 0 0         trouble("Couldn't close body IO handle: %-p",ERRSV);
2120             }
2121              
2122 13           SvREFCNT_dec(c->poll_write_cb);
2123 13           c->poll_write_cb = NULL;
2124 13           finish_wbuf(c);
2125 13           change_responding_state(c, RESPOND_SHUTDOWN);
2126              
2127 13           goto done_pump_io;
2128             }
2129              
2130 25 50         if (c->is_http11)
2131 25           add_chunk_sv_to_wbuf(c, ret);
2132             else
2133 0           add_sv_to_wbuf(c, ret);
2134              
2135             done_pump_io:
2136             trace("leaving pump io handle %d\n", c->fd);
2137              
2138 38           PUTBACK;
2139 38 50         FREETMPS;
2140 38           LEAVE;
2141              
2142 38           PL_rs = old_rs;
2143 38           sv_setsv(get_sv("/", GV_ADD), old_rs);
2144              
2145 38           c->in_callback--;
2146             }
2147              
2148             static int
2149 8           psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
2150             {
2151 8           dSP;
2152              
2153 8           struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
2154             trace("invoking psgix.io magic for fd=%d\n", c->fd);
2155              
2156 8           sv_unmagic(sv, PERL_MAGIC_ext);
2157              
2158 8           ENTER;
2159 8           SAVETMPS;
2160              
2161 8 50         PUSHMARK(SP);
2162 8 50         XPUSHs(sv);
2163 8 50         mXPUSHs(newSViv(c->fd));
2164 8           PUTBACK;
2165              
2166 8           call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
2167 8           SPAGAIN;
2168              
2169 8 50         if (unlikely(SvTRUE(ERRSV))) {
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
2170 0           call_died(aTHX_ c, "psgix.io magic");
2171             }
2172             else {
2173 8           SV *io_glob = SvRV(sv);
2174 8           GvSV(io_glob) = newRV_inc(c->self);
2175              
2176             // Put whatever remainder data into the socket buffer.
2177             // Optimizes for the websocket case.
2178             //
2179             // TODO: For keepalive support the opposite operation is required;
2180             // pull the data out of the socket buffer and back into feersum.
2181 8 50         if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
    50          
    0          
    0          
    0          
    50          
    50          
    50          
2182             STRLEN rbuf_len;
2183 8 50         const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
2184 8           IO *io = GvIOp(io_glob);
2185             assert(io != NULL);
2186 8           PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
2187 8           sv_setpvs(c->rbuf, "");
2188             }
2189              
2190 8           stop_read_watcher(c);
2191 8           stop_read_timer(c);
2192             // don't stop write watcher in case there's outstanding data.
2193             }
2194              
2195 8           PUTBACK;
2196 8 50         FREETMPS;
2197 8           LEAVE;
2198 8           return 0;
2199             }
2200              
2201             MODULE = Feersum PACKAGE = Feersum
2202              
2203             PROTOTYPES: ENABLE
2204              
2205             void
2206             set_server_name_and_port(SV *self, SV *name, SV *port)
2207             PPCODE:
2208             {
2209 29 50         if (feer_server_name)
2210 0           SvREFCNT_dec(feer_server_name);
2211 29           feer_server_name = newSVsv(name);
2212 29           SvREADONLY_on(feer_server_name);
2213              
2214 29 50         if (feer_server_port)
2215 0           SvREFCNT_dec(feer_server_port);
2216 29           feer_server_port = newSVsv(port);
2217 29           SvREADONLY_on(feer_server_port);
2218             }
2219              
2220             void
2221             accept_on_fd(SV *self, int fd)
2222             PPCODE:
2223             {
2224             trace("going to accept on %d\n",fd);
2225 29           feersum_ev_loop = EV_DEFAULT;
2226              
2227 29           signal(SIGPIPE, SIG_IGN);
2228              
2229 29           ev_prepare_init(&ep, prepare_cb);
2230 29           ev_prepare_start(feersum_ev_loop, &ep);
2231              
2232 29           ev_check_init(&ec, check_cb);
2233 29           ev_check_start(feersum_ev_loop, &ec);
2234              
2235 29           ev_idle_init(&ei, idle_cb);
2236              
2237 29           ev_io_init(&accept_w, accept_cb, fd, EV_READ);
2238             }
2239              
2240             void
2241             unlisten (SV *self)
2242             PPCODE:
2243             {
2244             trace("stopping accept\n");
2245 2           ev_prepare_stop(feersum_ev_loop, &ep);
2246 2           ev_check_stop(feersum_ev_loop, &ec);
2247 2           ev_idle_stop(feersum_ev_loop, &ei);
2248 2           ev_io_stop(feersum_ev_loop, &accept_w);
2249             }
2250              
2251             void
2252             request_handler(SV *self, SV *cb)
2253             PROTOTYPE: $&
2254             ALIAS:
2255             psgi_request_handler = 1
2256             PPCODE:
2257             {
2258 37 50         if (unlikely(!SvOK(cb) || !SvROK(cb)))
    0          
    0          
    0          
    50          
    50          
2259 0           croak("can't supply an undef handler");
2260 37 100         if (request_cb_cv)
2261 8           SvREFCNT_dec(request_cb_cv);
2262 37           request_cb_cv = newSVsv(cb); // copy so 5.8.7 overload magic sticks.
2263 37           request_cb_is_psgi = ix;
2264             trace("assigned %s request handler %p\n",
2265             request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
2266             }
2267              
2268             void
2269             graceful_shutdown (SV *self, SV *cb)
2270             PROTOTYPE: $&
2271             PPCODE:
2272             {
2273 10 50         if (!IsCodeRef(cb))
    50          
2274 0           croak("must supply a code reference");
2275 10 50         if (unlikely(shutting_down))
2276 0           croak("already shutting down");
2277 10           shutdown_cb_cv = newSVsv(cb);
2278             trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);
2279              
2280 10           shutting_down = 1;
2281 10           ev_io_stop(feersum_ev_loop, &accept_w);
2282 10           close(accept_w.fd);
2283              
2284 10 100         if (active_conns <= 0) {
2285             trace("shutdown is immediate\n");
2286 9           dSP;
2287 9           ENTER;
2288 9           SAVETMPS;
2289 9 50         PUSHMARK(SP);
2290 9           call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
2291 1           PUTBACK;
2292             trace3("called shutdown handler\n");
2293 1           SvREFCNT_dec(shutdown_cb_cv);
2294 1           shutdown_cb_cv = NULL;
2295 1 50         FREETMPS;
2296 1           LEAVE;
2297             }
2298             }
2299              
2300             double
2301             read_timeout (SV *self, ...)
2302             PROTOTYPE: $;$
2303             CODE:
2304             {
2305 13 100         if (items <= 1) {
2306 7           RETVAL = read_timeout;
2307             }
2308 6 50         else if (items == 2) {
2309 6           SV *duration = ST(1);
2310 6 100         NV new_read_timeout = SvNV(duration);
2311 6 100         if (!(new_read_timeout > 0.0)) {
2312 3           croak("must set a positive (non-zero) value for the timeout");
2313             }
2314 3           read_timeout = (double) new_read_timeout;
2315             }
2316             }
2317             OUTPUT:
2318             RETVAL
2319              
2320             void
2321             DESTROY (SV *self)
2322             PPCODE:
2323             {
2324             trace3("DESTROY server\n");
2325 29 50         if (request_cb_cv)
2326 29           SvREFCNT_dec(request_cb_cv);
2327             }
2328              
2329             MODULE = Feersum PACKAGE = Feersum::Connection::Handle
2330              
2331             PROTOTYPES: ENABLE
2332              
2333             int
2334             fileno (feer_conn_handle *hdl)
2335             CODE:
2336 0           RETVAL = c->fd;
2337             OUTPUT:
2338             RETVAL
2339              
2340             void
2341             DESTROY (SV *self)
2342             ALIAS:
2343             Feersum::Connection::Reader::DESTROY = 1
2344             Feersum::Connection::Writer::DESTROY = 2
2345             PPCODE:
2346             {
2347 28           feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);
2348              
2349 28 100         if (hdl == NULL) {
2350             trace3("DESTROY handle (closed) class=%s\n",
2351             HvNAME(SvSTASH(SvRV(self))));
2352             }
2353             else {
2354 11           struct feer_conn *c = (struct feer_conn *)hdl;
2355             trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
2356             HvNAME(SvSTASH(SvRV(self))));
2357 11 100         if (ix == 2) // only close the writer on destruction
2358 6           feersum_close_handle(aTHX_ c, 1);
2359             }
2360             }
2361              
2362             SV*
2363             read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
2364             PROTOTYPE: $$$;$
2365             PPCODE:
2366             {
2367 13           STRLEN buf_len = 0, src_len = 0;
2368             ssize_t offset;
2369             char *buf_ptr, *src_ptr;
2370              
2371             // optimizes for the "read everything" case.
2372              
2373 13 100         if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
    50          
    0          
    0          
    50          
2374 4 50         offset = SvIV(ST(3));
2375             else
2376 9           offset = 0;
2377              
2378             trace("read fd=%d : request len=%"Sz_uf" off=%"Ssz_df"\n",
2379             c->fd, (Sz)len, (Ssz)offset);
2380              
2381 13 50         if (unlikely(c->receiving <= RECEIVE_HEADERS))
2382             // XXX as of 0.984 this is dead code
2383 0           croak("can't call read() until the body begins to arrive");
2384              
2385 13 100         if (!SvOK(buf) || !SvPOK(buf)) {
    50          
    50          
    50          
2386             // force to a PV and ensure buffer space
2387 6           sv_setpvn(buf,"",0);
2388 6 50         SvGROW(buf, len+1);
    100          
2389             }
2390              
2391 13 50         if (unlikely(SvREADONLY(buf)))
2392 0           croak("buffer must not be read-only");
2393              
2394 13 50         if (unlikely(len == 0))
2395 0           XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string
2396              
2397 13 50         buf_ptr = SvPV(buf, buf_len);
2398 13 100         if (likely(c->rbuf))
2399 13 50         src_ptr = SvPV(c->rbuf, src_len);
2400              
2401             if (unlikely(len < 0))
2402             len = src_len;
2403              
2404 13 100         if (unlikely(offset < 0))
2405 1 50         offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;
2406              
2407 13 100         if (unlikely(len + offset > src_len))
2408 4           len = src_len - offset;
2409              
2410             trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
2411             c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);
2412              
2413 13 100         if (unlikely(!c->rbuf || src_len == 0 || offset >= c->received_cl)) {
    50          
    100          
    50          
2414             trace2("rbuf empty during read %d\n", c->fd);
2415 2 50         if (c->receiving == RECEIVE_SHUTDOWN) {
2416 2           XSRETURN_IV(0);
2417             }
2418             else {
2419 0           errno = EAGAIN;
2420 0           XSRETURN_UNDEF;
2421             }
2422             }
2423              
2424 11 100         if (likely(len == src_len && offset == 0)) {
    50          
2425             trace2("appending entire rbuf fd=%d\n", c->fd);
2426 6           sv_2mortal(c->rbuf); // allow pv to be stolen
2427 6 100         if (likely(buf_len == 0)) {
2428 4           sv_setsv(buf, c->rbuf);
2429             }
2430             else {
2431 2           sv_catsv(buf, c->rbuf);
2432             }
2433 6           c->rbuf = NULL;
2434             }
2435             else {
2436 5           src_ptr += offset;
2437             trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
2438             c->fd, len, offset, src_ptr);
2439 5 50         SvGROW(buf, SvCUR(buf) + len);
    50          
2440 5           sv_catpvn(buf, src_ptr, len);
2441 5 100         if (likely(items == 3)) {
2442             // there wasn't an offset param, throw away beginning
2443 3           sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
2444             }
2445             }
2446              
2447 13           XSRETURN_IV(len);
2448             }
2449              
2450             STRLEN
2451             write (feer_conn_handle *hdl, ...)
2452             PROTOTYPE: $;$
2453             CODE:
2454             {
2455 24 50         if (unlikely(c->responding != RESPOND_STREAMING))
2456 0           croak("can only call write in streaming mode");
2457              
2458 24 50         SV *body = (items == 2) ? ST(1) : &PL_sv_undef;
2459 24 50         if (unlikely(!body || !SvOK(body)))
    50          
    0          
    0          
    0          
    50          
2460 0           XSRETURN_IV(0);
2461              
2462             trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
2463 24 50         if (SvROK(body)) {
2464 0           SV *refd = SvRV(body);
2465 0 0         if (SvOK(refd) && SvPOK(refd)) {
    0          
    0          
    0          
2466 0           body = refd;
2467             }
2468             else {
2469 0           croak("body must be a scalar, scalar ref or undef");
2470             }
2471             }
2472 24 50         (void)SvPV(body, RETVAL);
2473              
2474 24 100         if (c->is_http11)
2475 15           add_chunk_sv_to_wbuf(c, body);
2476             else
2477 9           add_sv_to_wbuf(c, body);
2478              
2479 24           conn_write_ready(c);
2480             }
2481             OUTPUT:
2482             RETVAL
2483              
2484             void
2485             write_array (feer_conn_handle *hdl, AV *abody)
2486             PROTOTYPE: $$
2487             PPCODE:
2488             {
2489 2 50         if (unlikely(c->responding != RESPOND_STREAMING))
2490 0           croak("can only call write in streaming mode");
2491              
2492             trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);
2493              
2494 2           I32 amax = av_len(abody);
2495             int i;
2496 2 50         if (c->is_http11) {
2497 12 100         for (i=0; i<=amax; i++) {
2498 10           SV *sv = fetch_av_normal(aTHX_ abody, i);
2499 10 100         if (likely(sv)) add_chunk_sv_to_wbuf(c, sv);
2500             }
2501             }
2502             else {
2503 0 0         for (i=0; i<=amax; i++) {
2504 0           SV *sv = fetch_av_normal(aTHX_ abody, i);
2505 0 0         if (likely(sv)) add_sv_to_wbuf(c, sv);
2506             }
2507             }
2508              
2509 2           conn_write_ready(c);
2510             }
2511              
2512             int
2513             seek (feer_conn_handle *hdl, ssize_t offset, ...)
2514             PROTOTYPE: $$;$
2515             CODE:
2516             {
2517 9           int whence = SEEK_CUR;
2518 9 50         if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
    50          
    0          
    0          
    50          
2519 9 50         whence = SvIV(ST(2));
2520              
2521             trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);
2522              
2523 9 100         if (unlikely(!c->rbuf)) {
2524             // handle is effectively "closed"
2525 1           RETVAL = 0;
2526             }
2527 8 100         else if (offset == 0) {
2528 2           RETVAL = 1; // stay put for any whence
2529             }
2530 8 100         else if (offset > 0 && (whence == SEEK_CUR || whence == SEEK_SET)) {
    100          
    50          
2531             STRLEN len;
2532 2 50         const char *str = SvPV_const(c->rbuf, len);
2533 2 50         if (offset > len)
2534 0           offset = len;
2535 2           sv_chop(c->rbuf, str + offset);
2536 2           RETVAL = 1;
2537             }
2538 6 50         else if (offset < 0 && whence == SEEK_END) {
    100          
2539             STRLEN len;
2540 2 50         const char *str = SvPV_const(c->rbuf, len);
2541 2           offset += len; // can't be > len since block is offset<0
2542 2 50         if (offset == 0) {
2543 0           RETVAL = 1; // no-op, but OK
2544             }
2545 2 100         else if (offset > 0) {
2546 1           sv_chop(c->rbuf, str + offset);
2547 1           RETVAL = 1;
2548             }
2549             else {
2550             // past beginning of string
2551 1           RETVAL = 0;
2552             }
2553             }
2554             else {
2555             // invalid seek
2556 2           RETVAL = 0;
2557             }
2558             }
2559             OUTPUT:
2560             RETVAL
2561              
2562             int
2563             close (feer_conn_handle *hdl)
2564             PROTOTYPE: $
2565             ALIAS:
2566             Feersum::Connection::Reader::close = 1
2567             Feersum::Connection::Writer::close = 2
2568             CODE:
2569             {
2570             assert(ix);
2571 17           RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
2572 17           SvUVX(hdl_sv) = 0;
2573             }
2574             OUTPUT:
2575             RETVAL
2576              
2577             void
2578             _poll_cb (feer_conn_handle *hdl, SV *cb)
2579             PROTOTYPE: $$
2580             ALIAS:
2581             Feersum::Connection::Reader::poll_cb = 1
2582             Feersum::Connection::Writer::poll_cb = 2
2583             PPCODE:
2584             {
2585 8 50         if (unlikely(ix < 1 || ix > 2))
    50          
2586 0           croak("can't call _poll_cb directly");
2587 8 50         else if (unlikely(ix == 1))
2588 0           croak("poll_cb for reading not yet supported"); // TODO poll_read_cb
2589              
2590 8 100         if (c->poll_write_cb != NULL) {
2591 4           SvREFCNT_dec(c->poll_write_cb);
2592 4           c->poll_write_cb = NULL;
2593             }
2594              
2595 8 100         if (!SvOK(cb)) {
    50          
    50          
2596             trace("unset poll_cb ix=%d\n", ix);
2597 4           return;
2598             }
2599 4 50         else if (unlikely(!IsCodeRef(cb)))
    50          
2600 0           croak("must supply a code reference to poll_cb");
2601              
2602 4           c->poll_write_cb = newSVsv(cb);
2603 4           conn_write_ready(c);
2604             }
2605              
2606             SV*
2607             response_guard (feer_conn_handle *hdl, ...)
2608             PROTOTYPE: $;$
2609             CODE:
2610 3 100         RETVAL = feersum_conn_guard(aTHX_ c, (items==2) ? ST(1) : NULL);
2611             OUTPUT:
2612             RETVAL
2613              
2614             MODULE = Feersum PACKAGE = Feersum::Connection
2615              
2616             PROTOTYPES: ENABLE
2617              
2618             SV *
2619             start_streaming (struct feer_conn *c, SV *message, AV *headers)
2620             PROTOTYPE: $$\@
2621             CODE:
2622 12           feersum_start_response(aTHX_ c, message, headers, 1);
2623 12           RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
2624             OUTPUT:
2625             RETVAL
2626              
2627             size_t
2628             send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
2629             PROTOTYPE: $$\@$
2630             CODE:
2631 22           feersum_start_response(aTHX_ c, message, headers, 0);
2632 22 50         if (unlikely(!SvOK(body)))
    0          
    0          
    0          
2633 0           croak("can't send_response with an undef body");
2634 22           RETVAL = feersum_write_whole_body(aTHX_ c, body);
2635             OUTPUT:
2636             RETVAL
2637              
2638             SV*
2639             _continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
2640             PROTOTYPE: $\@
2641             CODE:
2642             {
2643             AV *av;
2644 9           int len = 0;
2645              
2646 9 50         if (IsArrayRef(psgi_response)) {
    50          
2647 9           av = (AV*)SvRV(psgi_response);
2648 9           len = av_len(av) + 1;
2649             }
2650              
2651 9 100         if (len == 3) {
2652             // 0 is "don't recurse" (i.e. don't allow another code-ref)
2653 6           feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
2654 6           RETVAL = &PL_sv_undef;
2655             }
2656 3 50         else if (len == 2) {
2657 3           SV *message = *(av_fetch(av,0,0));
2658 3           SV *headers = *(av_fetch(av,1,0));
2659 3 50         if (unlikely(!IsArrayRef(headers)))
    50          
2660 0           croak("PSGI headers must be an array ref");
2661 3           feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
2662 3           RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
2663             }
2664             else {
2665 0           croak("PSGI response starter expects a 2 or 3 element array-ref");
2666             }
2667             }
2668             OUTPUT:
2669             RETVAL
2670              
2671             void
2672             force_http10 (struct feer_conn *c)
2673             PROTOTYPE: $
2674             ALIAS:
2675             force_http11 = 1
2676             PPCODE:
2677 0           c->is_http11 = ix;
2678              
2679             SV *
2680             env (struct feer_conn *c)
2681             PROTOTYPE: $
2682             CODE:
2683 26           RETVAL = newRV_noinc((SV*)feersum_env(aTHX_ c));
2684             OUTPUT:
2685             RETVAL
2686              
2687             int
2688             fileno (struct feer_conn *c)
2689             CODE:
2690 6           RETVAL = c->fd;
2691             OUTPUT:
2692             RETVAL
2693              
2694             SV*
2695             response_guard (struct feer_conn *c, ...)
2696             PROTOTYPE: $;$
2697             CODE:
2698 3 100         RETVAL = feersum_conn_guard(aTHX_ c, (items == 2) ? ST(1) : NULL);
2699             OUTPUT:
2700             RETVAL
2701              
2702             void
2703             DESTROY (struct feer_conn *c)
2704             PPCODE:
2705             {
2706             int i;
2707             trace("DESTROY connection fd=%d c=%p\n", c->fd, c);
2708              
2709 118 100         if (likely(c->rbuf)) SvREFCNT_dec(c->rbuf);
2710              
2711 118 50         if (c->wbuf_rinq) {
2712             struct iomatrix *m;
2713 0 0         while ((m = (struct iomatrix *)rinq_shift(&c->wbuf_rinq)) != NULL) {
2714 0 0         for (i=0; i < m->count; i++) {
2715 0 0         if (m->sv[i]) SvREFCNT_dec(m->sv[i]);
2716             }
2717 0           Safefree(m);
2718             }
2719             }
2720              
2721 118 100         if (likely(c->req)) {
2722 111 100         if (c->req->buf) SvREFCNT_dec(c->req->buf);
2723 111           Safefree(c->req);
2724             }
2725              
2726 118 50         if (likely(c->sa)) free(c->sa);
2727              
2728 118           safe_close_conn(c, "close at destruction");
2729              
2730 118 50         if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);
2731              
2732 118 100         if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
2733              
2734 118           active_conns--;
2735              
2736 118 100         if (unlikely(shutting_down && active_conns <= 0)) {
    100          
2737 1           ev_idle_stop(feersum_ev_loop, &ei);
2738 1           ev_prepare_stop(feersum_ev_loop, &ep);
2739 1           ev_check_stop(feersum_ev_loop, &ec);
2740              
2741             trace3("... was last conn, going to try shutdown\n");
2742 1 50         if (shutdown_cb_cv) {
2743 1 50         PUSHMARK(SP);
2744 1           call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
2745 1           PUTBACK;
2746             trace3("... ok, called that handler\n");
2747 1           SvREFCNT_dec(shutdown_cb_cv);
2748 1           shutdown_cb_cv = NULL;
2749             }
2750             }
2751             }
2752              
2753             MODULE = Feersum PACKAGE = Feersum
2754              
2755             BOOT:
2756             {
2757 32           feer_stash = gv_stashpv("Feersum", 1);
2758 32           feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
2759 32           feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
2760 32           feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);
2761 32 50         I_EV_API("Feersum");
    50          
    50          
    50          
2762              
2763 32           psgi_ver = newAV();
2764 32           av_extend(psgi_ver, 2);
2765 32           av_push(psgi_ver, newSViv(1));
2766 32           av_push(psgi_ver, newSViv(1));
2767 32           SvREADONLY_on((SV*)psgi_ver);
2768              
2769 32           psgi_serv10 = newSVpvs("HTTP/1.0");
2770 32           SvREADONLY_on(psgi_serv10);
2771 32           psgi_serv11 = newSVpvs("HTTP/1.1");
2772 32           SvREADONLY_on(psgi_serv11);
2773              
2774 32           Zero(&psgix_io_vtbl, 1, MGVTBL);
2775 32           psgix_io_vtbl.svt_get = psgix_io_svt_get;
2776              
2777             trace3("Feersum booted, iomatrix %lu "
2778             "(IOV_MAX=%u, FEERSUM_IOMATRIX_SIZE=%u), "
2779             "feer_req %lu, "
2780             "feer_conn %lu\n",
2781             (long unsigned int)sizeof(struct iomatrix),
2782             (unsigned int)IOV_MAX,
2783             (unsigned int)FEERSUM_IOMATRIX_SIZE,
2784             (long unsigned int)sizeof(struct feer_req),
2785             (long unsigned int)sizeof(struct feer_conn)
2786             );
2787             }