File Coverage

Websockets.xs
Criterion Covered Total %
statement 855 1077 79.3
branch 558 1042 53.5
condition n/a
subroutine n/a
pod n/a
total 1413 2119 66.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include
9             #include
10             #include
11              
12             /* Magic numbers for use-after-free detection */
13             #define EV_WS_CTX_MAGIC 0xDEADBEEF
14             #define EV_WS_CTX_FREED 0xFEEDFACE
15             #define EV_WS_CONN_MAGIC 0xCAFEBABE
16             #define EV_WS_CONN_FREED 0xBADC0FFE
17              
18             /* Forward declarations */
19             typedef struct ev_ws_ctx_s ev_ws_ctx_t;
20             typedef struct ev_ws_conn_s ev_ws_conn_t;
21             typedef struct ev_ws_fd_s ev_ws_fd_t;
22              
23             #define EV_WS_SRV_MAGIC 0xFEEDCAFE
24             #define EV_WS_SRV_FREED 0xFEEDDEAD
25              
26             typedef struct ev_ws_server_s {
27             unsigned int magic;
28             SV* on_connect;
29             SV* on_message;
30             SV* on_close;
31             SV* on_error;
32             SV* on_pong;
33             SV* on_drain;
34             SV* on_handshake;
35             HV* response_headers; /* headers to inject into upgrade response */
36             size_t max_message_size;
37             char* protocol_name;
38             struct lws_protocols vhost_protocols[2];
39             } ev_ws_server_t;
40              
41             typedef ev_ws_ctx_t* EV__Websockets__Context;
42             typedef ev_ws_conn_t* EV__Websockets__Connection;
43             typedef struct ev_loop* EV__Loop;
44              
45             /* File descriptor watcher tracking */
46             struct ev_ws_fd_s {
47             ev_io io;
48             ev_ws_ctx_t* ctx;
49             int fd;
50             int poll_events; /* registered POLLIN/POLLOUT interest mask */
51             };
52              
53             /* Send buffer node for pending writes (FAM: data follows the struct) */
54             typedef struct ev_ws_send_s {
55             struct ev_ws_send_s* next;
56             size_t len;
57             enum lws_write_protocol write_mode;
58             char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
59             } ev_ws_send_t;
60              
61             /* Context structure - manages lws_context and connections */
62             struct ev_ws_ctx_s {
63             unsigned int magic;
64             int refcnt; /* lifecycle refcount: Perl + each in-flight lws_service */
65             int* alive_flag; /* points to caller's stack variable during lws_service */
66             struct ev_loop* loop;
67             struct lws_context* lws_ctx;
68             ev_ws_conn_t* connections;
69             ev_ws_conn_t* flush_head; /* conns with a buffered, fully-received message
70             awaiting delivery once the service call drains */
71             ev_ws_fd_t** fd_table;
72             int fd_table_size;
73             ev_timer timer;
74             };
75              
76             /* Connection structure */
77             struct ev_ws_conn_s {
78             unsigned int magic;
79             ev_ws_ctx_t* ctx;
80             struct lws* wsi;
81             ev_ws_conn_t* next; /* linked list */
82             ev_ws_conn_t* prev;
83              
84             int refcnt;
85             SV* perl_self; /* non-owning ptr to the blessed RV's target; its existence
86             holds one refcnt (added in get_conn_sv, dropped in DESTROY) */
87              
88             /* Callbacks */
89             SV* on_connect;
90             SV* on_message;
91             SV* on_close;
92             SV* on_error;
93             SV* on_pong;
94             SV* on_drain;
95              
96             /* Custom Headers */
97             HV* custom_headers;
98              
99             /* Response Headers (client: response headers; server: request headers) */
100             HV* response_headers;
101              
102             /* Receive buffer for reassembling a message across callbacks */
103             char* recv_buf;
104             size_t recv_len;
105             size_t recv_alloc;
106             int recv_is_binary;
107             int recv_complete; /* buffered message is fully received (final frame) */
108             ev_ws_conn_t* flush_next; /* link in ctx->flush_head while pending delivery */
109             int on_flush; /* already queued on ctx->flush_head (dedup) */
110             size_t max_message_size;
111              
112             /* Send queue */
113             ev_ws_send_t* send_head;
114             ev_ws_send_t* send_tail;
115             size_t send_queue_bytes;
116              
117             /* Adopted file handle (prevents Perl from closing the fd) */
118             SV* adopted_fh;
119              
120             /* Connect timeout */
121             ev_timer connect_timer;
122             int connect_timer_active;
123             struct ev_loop* loop;
124              
125             /* Fragmented send state */
126             int in_fragmented_send;
127              
128             /* Per-connection metadata */
129             HV* stash;
130              
131             /* State */
132             int connected;
133             int closing;
134             };
135              
136             #define EV_WS_PROTOCOL_NAME "ev-websockets"
137              
138             /* Extensions support (compression) */
139             #ifdef LWS_HAS_EXTENSIONS
140             static const struct lws_extension extensions[] = {
141             {
142             "permessage-deflate",
143             lws_extension_callback_pm_deflate,
144             "permessage-deflate; client_no_context_takeover; client_max_window_bits"
145             },
146             { NULL, NULL, NULL }
147             };
148             #endif
149              
150             static int ev_ws_debug = 0;
151              
152             /* Bridges userdata into ws_callback() before lws_adopt returns. */
153             static ev_ws_conn_t* pending_adoption = NULL;
154             static HV* handshake_headers_map = NULL; /* wsi-ptr → per-conn response headers HV */
155             static struct lws_context* ssl_keepalive_ctx = NULL; /* see ensure_ssl_keepalive() */
156              
157             /* Copy an lws header token into a fresh SV, or return NULL if absent/empty */
158 226           static SV* hdr_to_sv(struct lws *wsi, enum lws_token_indexes tok) {
159 226           int total = lws_hdr_total_length(wsi, tok);
160 226 100         if (total > 0) {
161             char *buf;
162             int n;
163 54           Newx(buf, total + 1, char);
164 54           n = lws_hdr_copy(wsi, buf, total + 1, tok);
165 54 50         if (n > 0) {
166 54           SV *val = newSVpvn(buf, n);
167 54           Safefree(buf);
168 54           return val;
169             }
170 0           Safefree(buf);
171             }
172 172           return NULL;
173             }
174              
175             /* Capture a header value into an HV under the given key */
176 224           static void capture_header(struct lws *wsi, HV *hv, enum lws_token_indexes tok,
177             const char *name, STRLEN nlen) {
178 224           SV *val = hdr_to_sv(wsi, tok);
179 224 100         if (val && !hv_store(hv, name, nlen, val, 0))
    50          
180 0           SvREFCNT_dec(val);
181 224           }
182              
183             typedef struct { enum lws_token_indexes tok; const char *name; STRLEN nlen; } header_def_t;
184              
185             static const header_def_t request_hdrs[] = {
186             { WSI_TOKEN_GET_URI, "Path", 4 },
187             { WSI_TOKEN_HOST, "Host", 4 },
188             { WSI_TOKEN_ORIGIN, "Origin", 6 },
189             { WSI_TOKEN_HTTP_COOKIE, "Cookie", 6 },
190             { WSI_TOKEN_HTTP_AUTHORIZATION, "Authorization", 13 },
191             { WSI_TOKEN_PROTOCOL, "Sec-WebSocket-Protocol", 22 },
192             { WSI_TOKEN_HTTP_USER_AGENT, "User-Agent", 10 },
193             { WSI_TOKEN_X_FORWARDED_FOR, "X-Forwarded-For", 15 },
194             };
195             #define N_REQUEST_HDRS (int)(sizeof(request_hdrs)/sizeof(request_hdrs[0]))
196              
197 18           static void capture_request_headers(struct lws *wsi, HV *hv) {
198             int i;
199 162 100         for (i = 0; i < N_REQUEST_HDRS; i++)
200 144           capture_header(wsi, hv, request_hdrs[i].tok,
201 144           request_hdrs[i].name, request_hdrs[i].nlen);
202 18           }
203              
204             /* Inject all key/value pairs from an HV as HTTP headers via lws.
205             Returns -1 if lws rejected a header (client path aborts the handshake);
206             server callers ignore the result and simply stop adding. */
207 1           static int inject_headers(struct lws *wsi, HV *hv,
208             unsigned char **p, unsigned char *end) {
209             HE *entry;
210             char kbuf[256];
211 1           hv_iterinit(hv);
212 2 100         while ((entry = hv_iternext(hv))) {
213             I32 klen;
214 1           const char *key = hv_iterkey(entry, &klen);
215             SV *val_sv;
216             STRLEN vlen;
217             const char *val;
218 1 50         if (klen >= 254) continue;
219 1           val_sv = hv_iterval(hv, entry);
220 1           val = SvPV(val_sv, vlen);
221 1           memcpy(kbuf, key, klen);
222 1           kbuf[klen] = ':';
223 1           kbuf[klen + 1] = '\0';
224 1 50         if (lws_add_http_header_by_name(wsi, (unsigned char *)kbuf,
225             (unsigned char *)val, vlen, p, end))
226 0           return -1;
227             }
228 1           return 0;
229             }
230              
231             /* Format a wsi pointer as the lookup key for handshake_headers_map.
232             Callers must pass a buffer of at least 32 bytes; returns the key length. */
233 0           static int wsi_key(char *buf, struct lws *wsi) {
234 0           return snprintf(buf, 32, "%p", (void*)wsi);
235             }
236              
237             #define DEBUG_LOG(fmt, ...) do { if (ev_ws_debug) fprintf(stderr, "[EV::WS] " fmt "\n", ##__VA_ARGS__); } while(0)
238              
239 255           static void ctx_ref(ev_ws_ctx_t* ctx) {
240 255           ctx->refcnt++;
241 255           }
242              
243 279           static void ctx_unref(ev_ws_ctx_t* ctx) {
244 279 100         if (--ctx->refcnt == 0) {
245 24           Safefree(ctx);
246             }
247 279           }
248              
249             /* Schedule the next lws housekeeping wake-up.
250              
251             lws_service_adjust_timeout returns the ms until lws next needs servicing; 0
252             means "service as soon as possible". This timer only paces lws's time-based
253             work (connection/handshake timeouts, TLS cert aging, draining buffered rx) --
254             socket readability/writability is driven by the per-fd io watcher, not here.
255             We deliberately floor the delay at 1ms rather than arming an ev_idle watcher
256             on 0: do_lws_service is now non-blocking, so an always-ready idle watcher
257             would busy-spin at 100% CPU whenever lws keeps asking for immediate service.
258             A 1ms floor lets the loop block briefly so every other EV watcher still
259             fires, at negligible latency for this coarse, time-based work. */
260 279           static void schedule_timeout(ev_ws_ctx_t* ctx) {
261 279           int delay_ms = lws_service_adjust_timeout(ctx->lws_ctx, 1000, 0);
262             double delay_s;
263              
264 279 100         if (delay_ms < 1) delay_ms = 1;
265 279           delay_s = (double)delay_ms / 1000.0;
266              
267 279           ev_timer_stop(ctx->loop, &ctx->timer);
268 279           ev_timer_set(&ctx->timer, delay_s, 0.);
269 279           ev_timer_start(ctx->loop, &ctx->timer);
270 279           }
271              
272             static void flush_recv_messages(ev_ws_ctx_t* ctx);
273              
274 5           static void do_lws_service(ev_ws_ctx_t* ctx) {
275 5 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC && ctx->lws_ctx) {
    50          
    50          
276 5           int alive = 1;
277 5           int* prev_flag = ctx->alive_flag;
278 5           ctx->alive_flag = &alive;
279 5           ctx_ref(ctx);
280             /* "Forced service": drive connections that need servicing with no
281             pending socket event -- rx already read into lws's buflist, plus any
282             due lws_sul timeouts. Counterintuitively a timeout_ms of 0 is NOT
283             non-blocking here: lws maps it to its maximum internal poll wait, so
284             the old lws_service(ctx, 0) blocked for seconds on an idle connection
285             and starved every other EV watcher. A negative timeout_ms clamps the
286             wait to 0, servicing only ready + forced-service work and returning at
287             once (see the lws_service_adjust_timeout docs). lws_service_fd(ctx,
288             NULL) is invalid since lws 3.2 and never drains buflists. Per-fd I/O
289             is driven by io_cb via lws_service_fd(&pollfd). */
290 5           lws_service_tsi(ctx->lws_ctx, -1, 0);
291 5 50         if (alive)
292 5           flush_recv_messages(ctx); /* deliver messages reassembled above */
293 5 50         if (alive) {
294 5           ctx->alive_flag = prev_flag;
295 5           schedule_timeout(ctx);
296 0 0         } else if (prev_flag) {
297 0           *prev_flag = 0; /* propagate destruction up the alive_flag chain */
298             }
299 5           ctx_unref(ctx);
300             }
301 5           }
302              
303 5           static void timer_cb(EV_P_ ev_timer* w, int revents) {
304             (void)loop; (void)revents;
305 5           do_lws_service((ev_ws_ctx_t*)w->data);
306 5           }
307              
308             /* Forward declarations */
309             static void io_cb(EV_P_ ev_io* w, int revents);
310             static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
311             static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
312             static void conn_ref(ev_ws_conn_t* conn);
313             static void conn_unref(ev_ws_conn_t* conn);
314              
315 131           static SV* get_conn_sv(ev_ws_conn_t* conn) {
316 131 100         if (conn->perl_self) {
317 89           SV* rv = newRV_inc(conn->perl_self);
318 89           sv_bless(rv, gv_stashpv("EV::Websockets::Connection", 1));
319 89           return rv;
320             }
321 42           SV* rv = newSV(0);
322 42           sv_setref_pv(rv, "EV::Websockets::Connection", (void*)conn);
323 42           conn->perl_self = SvRV(rv);
324            
325             /* We are creating a new Perl owner for this connection.
326             Increment refcnt so DESTROY doesn't kill it prematurely if LWS still needs it. */
327 42           conn_ref(conn);
328            
329 42           return rv;
330             }
331              
332             /* All emit_* callbacks share this skeleton: guard on a registered handler,
333             ref the conn across the call (a callback may close/destroy it), push $conn
334             plus handler-specific args, invoke under G_EVAL, and warn — without
335             recursing — on a die. EMIT_BEGIN opens a block that EMIT_END closes;
336             handler-specific args are XPUSHs'd in between. */
337             #define EMIT_BEGIN(conn, cb_field) \
338             if ((conn) == NULL || (conn)->cb_field == NULL) return; \
339             { \
340             SV* _emit_cb = (conn)->cb_field; \
341             dSP; \
342             ENTER; \
343             SAVETMPS; \
344             PUSHMARK(SP); \
345             conn_ref(conn); \
346             XPUSHs(sv_2mortal(get_conn_sv(conn)));
347              
348             #define EMIT_END(conn, label) \
349             PUTBACK; \
350             sv_setsv(ERRSV, &PL_sv_undef); \
351             call_sv(_emit_cb, G_DISCARD | G_EVAL); \
352             if (SvTRUE(ERRSV)) \
353             warn("EV::Websockets: exception in " label ": %s", SvPV_nolen(ERRSV)); \
354             FREETMPS; \
355             LEAVE; \
356             conn_unref(conn); \
357             }
358              
359             /* Guard shared by the send_* methods: croak if the connection has been
360             destroyed or is not currently open for writing. */
361             #define CHECK_CONN_OPEN(self) STMT_START { \
362             if ((self)->magic != EV_WS_CONN_MAGIC) \
363             croak("Connection has been destroyed"); \
364             if (!(self)->wsi || !(self)->connected || (self)->closing) \
365             croak("Connection is not open"); \
366             } STMT_END
367              
368             /* Guard for send/send_binary: a whole-message write must not interleave with
369             an in-progress fragmented message. */
370             #define CHECK_NOT_FRAGMENTING(self) STMT_START { \
371             if ((self)->in_fragmented_send) \
372             croak("Cannot send while a fragmented message is in progress; " \
373             "finish the fragment with send_fragment(..., is_final => 1) first"); \
374             } STMT_END
375              
376 2           static void emit_error(ev_ws_conn_t* conn, const char* error) {
377 2 50         EMIT_BEGIN(conn, on_error);
    50          
    50          
    50          
378 2 50         XPUSHs(sv_2mortal(newSVpv(error, 0)));
379 2 50         EMIT_END(conn, "error handler");
    50          
    50          
    0          
    50          
380             }
381              
382 38           static void emit_connect(ev_ws_conn_t* conn) {
383 38 50         EMIT_BEGIN(conn, on_connect);
    100          
    50          
    50          
384 36 50         if (conn->response_headers)
385 36 50         XPUSHs(sv_2mortal(newRV_inc((SV*)conn->response_headers)));
386             else
387 0 0         XPUSHs(&PL_sv_undef);
388 36 50         EMIT_END(conn, "connect handler");
    50          
    100          
    50          
    50          
389             }
390              
391 30           static void emit_message(ev_ws_conn_t* conn, const char* data, size_t len, int is_binary, int is_final) {
392 30 50         EMIT_BEGIN(conn, on_message);
    50          
    50          
    50          
393 30 50         XPUSHs(sv_2mortal(newSVpvn(data, len)));
394 30 50         XPUSHs(sv_2mortal(newSViv(is_binary)));
395 30 50         XPUSHs(sv_2mortal(newSViv(is_final)));
396 30 50         EMIT_END(conn, "message handler");
    50          
    50          
    0          
    50          
397             }
398              
399             /* Deliver buffered, fully-received messages once an lws service call has
400             drained all currently-available input. The receive path reassembles a
401             message across callbacks (necessary because permessage-deflate inflates one
402             frame into several callbacks, each reporting lws_is_final_fragment() == 1)
403             and queues the connection here; we emit only complete messages. The ref taken
404             when queuing keeps each conn alive across its callback, so it is safe to
405             touch conn after emit_message returns. */
406 255           static void flush_recv_messages(ev_ws_ctx_t* ctx) {
407 255           ev_ws_conn_t* conn = ctx->flush_head;
408 255           ctx->flush_head = NULL;
409 286 100         while (conn) {
410 31           ev_ws_conn_t* next = conn->flush_next;
411 31           conn->flush_next = NULL;
412 31           conn->on_flush = 0;
413 31 50         if (conn->magic == EV_WS_CONN_MAGIC && conn->recv_complete) {
    100          
414             /* recv_complete (not recv_len>0): a zero-length message is a valid
415             complete message and must still be delivered. */
416 30 50         emit_message(conn, conn->recv_buf ? conn->recv_buf : "", conn->recv_len, conn->recv_is_binary, 1);
417 30 50         if (conn->magic == EV_WS_CONN_MAGIC) {
418 30           conn->recv_len = 0;
419 30           conn->recv_complete = 0;
420             }
421             }
422 31           conn_unref(conn); /* release the flush-list ref */
423 31           conn = next;
424             }
425 255           }
426              
427 31           static void emit_close(ev_ws_conn_t* conn, int code, const char* reason) {
428 31 50         EMIT_BEGIN(conn, on_close);
    50          
    50          
    50          
429 31 50         XPUSHs(sv_2mortal(newSViv(code)));
430 31 50         XPUSHs(reason ? sv_2mortal(newSVpv(reason, 0)) : &PL_sv_undef);
    50          
431 31 50         EMIT_END(conn, "close handler");
    50          
    100          
    50          
    50          
432             }
433              
434 3           static void emit_pong(ev_ws_conn_t* conn, const char* data, size_t len) {
435 3 50         EMIT_BEGIN(conn, on_pong);
    50          
    50          
    50          
436 3 50         XPUSHs(sv_2mortal(newSVpvn(data ? data : "", len)));
    50          
437 3 50         EMIT_END(conn, "pong handler");
    50          
    100          
    50          
    50          
438             }
439              
440 33           static void emit_drain(ev_ws_conn_t* conn) {
441 33 50         EMIT_BEGIN(conn, on_drain);
    100          
    50          
    50          
442 1 50         EMIT_END(conn, "drain handler");
    50          
    50          
    50          
    50          
443             }
444              
445             /* Stop the connect-timeout watcher if it is running */
446 112           static void stop_connect_timer(ev_ws_conn_t* conn) {
447 112 50         if (conn->connect_timer_active && conn->loop) {
    0          
448 0           ev_timer_stop(conn->loop, &conn->connect_timer);
449 0           conn->connect_timer_active = 0;
450             }
451 112           }
452              
453             /* Drop references to the (up to seven) callback SVs collected while parsing
454             options, on an error path that croaks before they are owned elsewhere.
455             Pass NULL for absent slots; SvREFCNT_dec of NULL is guarded out. */
456 5           static void free_cb_svs(SV* on_connect, SV* on_message, SV* on_close,
457             SV* on_error, SV* on_pong, SV* on_drain,
458             SV* on_handshake) {
459 5 50         if (on_connect) SvREFCNT_dec(on_connect);
460 5 100         if (on_message) SvREFCNT_dec(on_message);
461 5 50         if (on_close) SvREFCNT_dec(on_close);
462 5 50         if (on_error) SvREFCNT_dec(on_error);
463 5 50         if (on_pong) SvREFCNT_dec(on_pong);
464 5 50         if (on_drain) SvREFCNT_dec(on_drain);
465 5 50         if (on_handshake) SvREFCNT_dec(on_handshake);
466 5           }
467              
468             /* Drop the server's callback/header SV references (not protocol_name, which the
469             live vhost still points at, nor the struct itself). Idempotent via NULL-out. */
470 18           static void free_server_svs(ev_ws_server_t* srv) {
471 18 50         if (srv->on_connect) { SvREFCNT_dec(srv->on_connect); srv->on_connect = NULL; }
472 18 100         if (srv->on_message) { SvREFCNT_dec(srv->on_message); srv->on_message = NULL; }
473 18 100         if (srv->on_close) { SvREFCNT_dec(srv->on_close); srv->on_close = NULL; }
474 18 100         if (srv->on_error) { SvREFCNT_dec(srv->on_error); srv->on_error = NULL; }
475 18 100         if (srv->on_pong) { SvREFCNT_dec(srv->on_pong); srv->on_pong = NULL; }
476 18 50         if (srv->on_drain) { SvREFCNT_dec(srv->on_drain); srv->on_drain = NULL; }
477 18 50         if (srv->on_handshake) { SvREFCNT_dec(srv->on_handshake); srv->on_handshake = NULL; }
478 18 50         if (srv->response_headers) {
479 0           SvREFCNT_dec((SV*)srv->response_headers);
480 0           srv->response_headers = NULL;
481             }
482 18           }
483              
484             /* Free a connection's resources (not the struct itself) */
485 42           static void free_conn_resources(ev_ws_conn_t* conn) {
486             ev_ws_send_t* send;
487             ev_ws_send_t* next_send;
488              
489 42 50         DEBUG_LOG("Freeing connection resources: conn=%p", conn);
490              
491 42           stop_connect_timer(conn);
492              
493 42 100         if (conn->on_connect) { SvREFCNT_dec(conn->on_connect); conn->on_connect = NULL; }
494 42 100         if (conn->on_message) { SvREFCNT_dec(conn->on_message); conn->on_message = NULL; }
495 42 100         if (conn->on_close) { SvREFCNT_dec(conn->on_close); conn->on_close = NULL; }
496 42 100         if (conn->on_error) { SvREFCNT_dec(conn->on_error); conn->on_error = NULL; }
497 42 100         if (conn->on_pong) { SvREFCNT_dec(conn->on_pong); conn->on_pong = NULL; }
498 42 100         if (conn->on_drain) { SvREFCNT_dec(conn->on_drain); conn->on_drain = NULL; }
499              
500 42 100         if (conn->custom_headers) {
501 1           SvREFCNT_dec((SV*)conn->custom_headers);
502 1           conn->custom_headers = NULL;
503             }
504              
505 42 100         if (conn->response_headers) {
506 38           SvREFCNT_dec((SV*)conn->response_headers);
507 38           conn->response_headers = NULL;
508             }
509              
510 42 100         if (conn->stash) { SvREFCNT_dec((SV*)conn->stash); conn->stash = NULL; }
511              
512 42 100         if (conn->adopted_fh) { SvREFCNT_dec(conn->adopted_fh); conn->adopted_fh = NULL; }
513              
514 42 100         if (conn->recv_buf) { Safefree(conn->recv_buf); conn->recv_buf = NULL; conn->recv_alloc = 0; }
515              
516 42 50         for (send = conn->send_head; send != NULL; send = next_send) {
517 0           next_send = send->next;
518 0           Safefree(send);
519             }
520 42           conn->send_head = NULL;
521 42           conn->send_tail = NULL;
522 42           conn->send_queue_bytes = 0;
523 42           }
524              
525 210           static void conn_ref(ev_ws_conn_t* conn) {
526 210           conn->refcnt++;
527 210 50         DEBUG_LOG("conn_ref: %p refcnt=%d", conn, conn->refcnt);
528 210           }
529              
530 276           static void conn_unref(ev_ws_conn_t* conn) {
531 276 50         DEBUG_LOG("conn_unref: %p refcnt=%d", conn, conn->refcnt);
532 276 100         if (--conn->refcnt == 0) {
533 42 50         DEBUG_LOG("Actually freeing conn: %p", conn);
534 42           free_conn_resources(conn);
535 42           conn->magic = EV_WS_CONN_FREED;
536 42           Safefree(conn);
537             }
538 276           }
539              
540 42           static void link_conn(ev_ws_ctx_t* ctx, ev_ws_conn_t* conn) {
541 42           conn->ctx = ctx;
542 42           conn->next = ctx->connections;
543 42 100         if (ctx->connections) {
544 19           ctx->connections->prev = conn;
545             }
546 42           ctx->connections = conn;
547 42           }
548              
549 32           static void unlink_conn(ev_ws_conn_t* conn) {
550 32 50         if (conn->ctx == NULL) return;
551              
552 32 100         if (conn->prev) {
553 13           conn->prev->next = conn->next;
554             } else {
555 19           conn->ctx->connections = conn->next;
556             }
557 32 100         if (conn->next) {
558 5           conn->next->prev = conn->prev;
559             }
560 32           conn->prev = NULL;
561 32           conn->next = NULL;
562 32           conn->ctx = NULL;
563             }
564              
565 36           static void queue_send(ev_ws_conn_t* conn, const char* data, size_t len, enum lws_write_protocol write_mode) {
566 36           int was_empty = (conn->send_head == NULL);
567 36           size_t alloc = offsetof(ev_ws_send_t, data) + LWS_PRE + len;
568 36           ev_ws_send_t* send = (ev_ws_send_t*)safemalloc(alloc);
569              
570 36 50         if (data && len > 0) {
    50          
571 36           memcpy(send->data + LWS_PRE, data, len);
572             }
573 36           send->len = len;
574 36           send->write_mode = write_mode;
575 36           send->next = NULL;
576              
577 36 100         if (conn->send_tail) {
578 2           conn->send_tail->next = send;
579 2           conn->send_tail = send;
580             } else {
581 34           conn->send_head = send;
582 34           conn->send_tail = send;
583             }
584 36           conn->send_queue_bytes += len;
585              
586 36 100         if (was_empty && conn->wsi) {
    50          
587 34           lws_callback_on_writable(conn->wsi);
588             }
589 36           }
590              
591 249           static void io_cb(EV_P_ ev_io* w, int revents) {
592 249           ev_ws_fd_t* fdw = (ev_ws_fd_t*)w;
593 249           ev_ws_ctx_t* ctx = fdw->ctx;
594             struct lws_pollfd pollfd;
595              
596             (void)loop;
597              
598 249 50         if (ctx == NULL || ctx->magic != EV_WS_CTX_MAGIC || ctx->lws_ctx == NULL) {
    50          
    50          
599 0           return;
600             }
601              
602 249           pollfd.fd = fdw->fd;
603 249           pollfd.events = fdw->poll_events;
604 249           pollfd.revents = 0;
605              
606 249 100         if (revents & EV_READ) pollfd.revents |= POLLIN;
607 249 100         if (revents & EV_WRITE) pollfd.revents |= POLLOUT;
608 249 50         if (revents & EV_ERROR) pollfd.revents |= POLLERR | POLLHUP;
609              
610             {
611 249           int alive = 1;
612 249           int* prev_flag = ctx->alive_flag;
613 249           ctx->alive_flag = &alive;
614 249           ctx_ref(ctx);
615 249           lws_service_fd(ctx->lws_ctx, &pollfd);
616 249 50         if (alive)
617 249           flush_recv_messages(ctx); /* deliver messages reassembled above */
618 249 50         if (alive) {
619 249           ctx->alive_flag = prev_flag;
620 249           schedule_timeout(ctx);
621 0 0         } else if (prev_flag) {
622 0           *prev_flag = 0; /* propagate destruction up the alive_flag chain */
623             }
624 249           ctx_unref(ctx);
625             }
626             }
627              
628             #define FD_TABLE_INIT_SIZE 64
629              
630 23           static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
631 23 50         int new_size = ctx->fd_table_size ? ctx->fd_table_size : FD_TABLE_INIT_SIZE;
632 23 50         while (new_size <= needed) new_size *= 2;
633 23           Renew(ctx->fd_table, new_size, ev_ws_fd_t*);
634 23           Zero(ctx->fd_table + ctx->fd_table_size, new_size - ctx->fd_table_size, ev_ws_fd_t*);
635 23           ctx->fd_table_size = new_size;
636 23           }
637              
638 60           static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
639             ev_ws_fd_t* fdw;
640 60           int ev_events = 0;
641              
642 60 50         if (fd < 0) return;
643 60 100         if (fd >= ctx->fd_table_size) fd_table_grow(ctx, fd);
644              
645 60           fdw = ctx->fd_table[fd];
646 60 50         if (fdw != NULL) {
647 0           change_fd_watcher(ctx, fd, events);
648 0           return;
649             }
650              
651 60           Newxz(fdw, 1, ev_ws_fd_t);
652 60           fdw->ctx = ctx;
653 60           fdw->fd = fd;
654 60           fdw->poll_events = events;
655              
656 60 50         if (events & POLLIN) ev_events |= EV_READ;
657 60 50         if (events & POLLOUT) ev_events |= EV_WRITE;
658              
659 60 50         DEBUG_LOG("add_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, ev_events);
660              
661 60 50         ev_io_init(&fdw->io, io_cb, fd, ev_events ? ev_events : EV_READ);
662 60 50         if (ev_events)
663 60           ev_io_start(ctx->loop, &fdw->io);
664              
665 60           ctx->fd_table[fd] = fdw;
666             }
667              
668 32           static void del_fd_watcher(ev_ws_ctx_t* ctx, int fd) {
669             ev_ws_fd_t* fdw;
670 32 50         if (fd < 0 || fd >= ctx->fd_table_size) return;
    50          
671 32           fdw = ctx->fd_table[fd];
672 32 50         if (fdw == NULL) return;
673              
674 32           ev_io_stop(ctx->loop, &fdw->io);
675 32           ctx->fd_table[fd] = NULL;
676 32           Safefree(fdw);
677             }
678              
679 393           static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
680             ev_ws_fd_t* fdw;
681 393           int ev_events = 0;
682              
683 393 50         if (fd < 0) return;
684 393 50         if (fd >= ctx->fd_table_size) {
685 0           add_fd_watcher(ctx, fd, events);
686 0           return;
687             }
688 393           fdw = ctx->fd_table[fd];
689 393 50         if (fdw == NULL) {
690 0           add_fd_watcher(ctx, fd, events);
691 0           return;
692             }
693              
694 393           fdw->poll_events = events;
695              
696 393 100         if (events & POLLIN) ev_events |= EV_READ;
697 393 100         if (events & POLLOUT) ev_events |= EV_WRITE;
698              
699 393 50         DEBUG_LOG("change_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, ev_events);
700              
701 393           ev_io_stop(ctx->loop, &fdw->io);
702 393 100         if (ev_events == 0) return;
703 387           ev_io_set(&fdw->io, fd, ev_events);
704 387           ev_io_start(ctx->loop, &fdw->io);
705             }
706              
707 24           static void free_all_fd_watchers(ev_ws_ctx_t* ctx) {
708             int i;
709 1496 100         for (i = 0; i < ctx->fd_table_size; i++) {
710 1472           ev_ws_fd_t* fdw = ctx->fd_table[i];
711 1472 100         if (fdw) {
712 28 50         if (ctx->loop) ev_io_stop(ctx->loop, &fdw->io);
713 28           Safefree(fdw);
714 28           ctx->fd_table[i] = NULL;
715             }
716             }
717 24           Safefree(ctx->fd_table);
718 24           ctx->fd_table = NULL;
719 24           ctx->fd_table_size = 0;
720 24           }
721              
722 0           static void connect_timeout_cb(EV_P_ ev_timer* w, int revents) {
723 0           ev_ws_conn_t* conn = (ev_ws_conn_t*)w->data;
724             (void)loop; (void)revents;
725 0           conn->connect_timer_active = 0;
726 0           conn->closing = 1;
727 0           conn_ref(conn);
728 0           emit_error(conn, "connect timeout");
729 0 0         if (conn->magic == EV_WS_CONN_MAGIC && conn->wsi)
    0          
730 0           lws_callback_on_writable(conn->wsi);
731 0           conn_unref(conn);
732 0           }
733              
734             /* libwebsockets callback */
735 2190           static int ws_callback(struct lws* wsi, enum lws_callback_reasons reason,
736             void* user, void* in, size_t len) {
737 2190 50         struct lws_context* lws_ctx = wsi ? lws_get_context(wsi) : NULL;
738 2190 50         ev_ws_ctx_t* ctx = lws_ctx ? (ev_ws_ctx_t*)lws_context_user(lws_ctx) : NULL;
739 2190           ev_ws_conn_t* conn = (ev_ws_conn_t*)user;
740              
741 2190 100         if (!conn && wsi && pending_adoption) {
    50          
    100          
742 1 50         DEBUG_LOG("Associating pending adoption: conn=%p", pending_adoption);
743 1           lws_set_wsi_user(wsi, pending_adoption);
744 1           conn = pending_adoption;
745 1 50         if (!conn->wsi) conn->wsi = wsi;
746             }
747              
748 2190 100         if (ctx && ctx->magic != EV_WS_CTX_MAGIC) {
    100          
749             /* Context is being destroyed. Only handle cleanup callbacks. */
750 221 100         if (reason != LWS_CALLBACK_WSI_DESTROY &&
    100          
751             reason != LWS_CALLBACK_PROTOCOL_DESTROY) {
752 150           return 0;
753             }
754             }
755              
756 2040 50         DEBUG_LOG("callback reason=%d user=%p ctx=%p wsi=%p conn=%p", (int)reason, user, ctx, wsi, conn);
757              
758 2040           switch (reason) {
759             /* Poll fd management callbacks */
760 60           case LWS_CALLBACK_ADD_POLL_FD: {
761 60           struct lws_pollargs* pa = (struct lws_pollargs*)in;
762 60 50         DEBUG_LOG("ADD_POLL_FD: ctx=%p fd=%d events=%d", ctx, pa->fd, pa->events);
763 60 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
764 60           add_fd_watcher(ctx, pa->fd, pa->events);
765             }
766 60           break;
767             }
768              
769 32           case LWS_CALLBACK_DEL_POLL_FD: {
770 32           struct lws_pollargs* pa = (struct lws_pollargs*)in;
771 32 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
772 32           del_fd_watcher(ctx, pa->fd);
773             }
774 32           break;
775             }
776              
777 393           case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
778 393           struct lws_pollargs* pa = (struct lws_pollargs*)in;
779 393 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
780 393           change_fd_watcher(ctx, pa->fd, pa->events);
781             }
782 393           break;
783             }
784              
785 21           case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: {
786 21 50         if (conn && conn->custom_headers) {
    100          
787 1           unsigned char **p = (unsigned char **)in;
788 1           unsigned char *end = (*p) + len;
789 1 50         if (inject_headers(wsi, conn->custom_headers, p, end))
790 0           return -1;
791             }
792 21           break;
793             }
794            
795 20           case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: {
796 20 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
797             static const header_def_t resp_hdrs[] = {
798             { WSI_TOKEN_HTTP_SET_COOKIE, "Set-Cookie", 10 },
799             { WSI_TOKEN_HTTP_CONTENT_TYPE, "Content-Type", 12 },
800             { WSI_TOKEN_HTTP_SERVER, "Server", 6 },
801             { WSI_TOKEN_PROTOCOL, "Sec-WebSocket-Protocol", 22 },
802             #ifdef WSI_TOKEN_HTTP_LOCATION
803             { WSI_TOKEN_HTTP_LOCATION, "Location", 8 },
804             #endif
805             #ifdef WSI_TOKEN_HTTP_WWW_AUTHENTICATE
806             { WSI_TOKEN_HTTP_WWW_AUTHENTICATE, "WWW-Authenticate", 16 },
807             #endif
808             };
809             int hi;
810 20 50         if (!conn->response_headers) conn->response_headers = newHV();
811 100 100         for (hi = 0; hi < (int)(sizeof(resp_hdrs)/sizeof(resp_hdrs[0])); hi++) {
812 80           capture_header(wsi, conn->response_headers, resp_hdrs[hi].tok,
813 80           resp_hdrs[hi].name, resp_hdrs[hi].nlen);
814             }
815             }
816 20           break;
817             }
818              
819 18           case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: {
820 18           struct lws_vhost *vh = lws_get_vhost(wsi);
821 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
822 18 50         if (srv && srv->magic == EV_WS_SRV_MAGIC && srv->on_handshake) {
    50          
    50          
823 0           HV *hdrs = newHV();
824             int count;
825             SV *result;
826 0           capture_request_headers(wsi, hdrs);
827              
828 0           dSP;
829 0           ENTER;
830 0           SAVETMPS;
831 0 0         PUSHMARK(SP);
832 0 0         XPUSHs(sv_2mortal(newRV_noinc((SV*)hdrs)));
833 0           PUTBACK;
834 0 0         sv_setsv(ERRSV, &PL_sv_undef);
835 0           count = call_sv(srv->on_handshake, G_SCALAR | G_EVAL);
836 0           SPAGAIN;
837 0 0         if (SvTRUE(ERRSV)) {
    0          
838 0 0         warn("EV::Websockets: exception in on_handshake: %s", SvPV_nolen(ERRSV));
839 0 0         if (count) POPs;
840 0           PUTBACK;
841 0 0         FREETMPS;
842 0           LEAVE;
843 0           return -1;
844             }
845 0 0         result = count ? POPs : &PL_sv_undef;
846 0 0         if (!SvTRUE(result)) {
847 0           PUTBACK;
848 0 0         FREETMPS;
849 0           LEAVE;
850 0           return -1;
851             }
852 0 0         if (SvROK(result) && SvTYPE(SvRV(result)) == SVt_PVHV) {
    0          
853             char key[32];
854 0           int klen = wsi_key(key, wsi);
855 0           SV *val = newRV_inc(SvRV(result));
856 0 0         if (!handshake_headers_map)
857 0           handshake_headers_map = newHV();
858 0 0         if (!hv_store(handshake_headers_map, key, klen, val, 0))
859 0           SvREFCNT_dec(val);
860             }
861 0           PUTBACK;
862 0 0         FREETMPS;
863 0           LEAVE;
864             }
865 18           break;
866             }
867              
868             /* Connection callbacks */
869 18           case LWS_CALLBACK_ESTABLISHED:
870 18 50         if (!conn) {
871 18           struct lws_vhost *vh = lws_get_vhost(wsi);
872 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
873 18 50         if (ctx && srv && srv->magic == EV_WS_SRV_MAGIC) {
    50          
    50          
874             ev_ws_conn_t *c;
875 18           Newxz(c, 1, ev_ws_conn_t);
876 18           c->magic = EV_WS_CONN_MAGIC;
877 18           c->wsi = wsi;
878 18           c->refcnt = 1; /* For LWS */
879 18           c->max_message_size = srv->max_message_size;
880 18           c->loop = ctx->loop;
881 18           link_conn(ctx, c);
882 18 50         c->on_connect = srv->on_connect ? SvREFCNT_inc(srv->on_connect) : NULL;
883 18 100         c->on_message = srv->on_message ? SvREFCNT_inc(srv->on_message) : NULL;
884 18 50         c->on_close = srv->on_close ? SvREFCNT_inc(srv->on_close) : NULL;
885 18 100         c->on_error = srv->on_error ? SvREFCNT_inc(srv->on_error) : NULL;
886 18 100         c->on_pong = srv->on_pong ? SvREFCNT_inc(srv->on_pong) : NULL;
887 18 50         c->on_drain = srv->on_drain ? SvREFCNT_inc(srv->on_drain) : NULL;
888              
889 18           c->response_headers = newHV();
890 18           capture_request_headers(wsi, c->response_headers);
891              
892 18           lws_set_wsi_user(wsi, c);
893 18           conn = c;
894             }
895             }
896             /* fallthrough */
897             case LWS_CALLBACK_CLIENT_ESTABLISHED:
898 38 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
899 38 50         DEBUG_LOG("Connected (reason %d): conn=%p", (int)reason, conn);
900 38           stop_connect_timer(conn);
901 38           conn->connected = 1;
902 38           emit_connect(conn);
903             }
904 38           break;
905              
906 18           case LWS_CALLBACK_ADD_HEADERS: {
907 18           struct lws_process_html_args *args = (struct lws_process_html_args *)in;
908 18           unsigned char *p_end = (unsigned char *)args->p + args->max_len;
909 18           struct lws_vhost *vh = lws_get_vhost(wsi);
910 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
911 18 50         if (srv && srv->magic == EV_WS_SRV_MAGIC && srv->response_headers)
    50          
    50          
912 0           inject_headers(wsi, srv->response_headers,
913 0           (unsigned char **)&args->p, p_end);
914 18 50         if (handshake_headers_map) {
915             char key[32];
916 0           int klen = wsi_key(key, wsi);
917 0           SV *val = hv_delete(handshake_headers_map, key, klen, 0);
918 0 0         if (val && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV)
    0          
    0          
919 0           inject_headers(wsi, (HV*)SvRV(val), (unsigned char **)&args->p, p_end);
920             }
921 18           break;
922             }
923              
924 3           case LWS_CALLBACK_RECEIVE_PONG:
925             case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
926 3 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
927 3           emit_pong(conn, (const char *)in, len);
928             }
929 3           break;
930              
931 33           case LWS_CALLBACK_RECEIVE:
932             case LWS_CALLBACK_CLIENT_RECEIVE:
933 33 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
934 33           int is_final = lws_is_final_fragment(wsi);
935 33           int is_binary = lws_frame_is_binary(wsi);
936 33 50         DEBUG_LOG("Received data (reason %d): len=%zu final=%d binary=%d", (int)reason, len, is_final, is_binary);
937              
938 33 100         if (lws_is_first_fragment(wsi)) {
939             /* A new message begins. If a completed message is still
940             buffered, deliver it first: with permessage-deflate lws
941             inflates one frame into several callbacks each flagged
942             "final", so we must reassemble across callbacks rather
943             than treat every final callback as a whole message. */
944 31 50         if (conn->recv_complete) {
945 0           conn_ref(conn);
946 0 0         emit_message(conn, conn->recv_buf ? conn->recv_buf : "", conn->recv_len, conn->recv_is_binary, 1);
947 0 0         if (conn->magic != EV_WS_CONN_MAGIC) { conn_unref(conn); break; }
948 0           conn_unref(conn);
949             }
950 31           conn->recv_len = 0;
951 31           conn->recv_complete = 0;
952 31           conn->recv_is_binary = is_binary;
953             }
954              
955             /* Enforce max message size */
956 33 100         if (conn->max_message_size > 0 && conn->recv_len + len > conn->max_message_size) {
    100          
957 1           conn_ref(conn);
958 1           emit_error(conn, "message exceeds max_message_size");
959 1 50         if (conn->magic == EV_WS_CONN_MAGIC) {
960 1           Safefree(conn->recv_buf);
961 1           conn->recv_buf = NULL;
962 1           conn->recv_len = 0;
963 1           conn->recv_alloc = 0;
964 1           conn->recv_complete = 0;
965             }
966 1           conn_unref(conn);
967 1           return -1;
968             }
969              
970             /* Accumulate data */
971 32 100         if (conn->recv_len + len > conn->recv_alloc) {
972 28 50         size_t new_alloc = conn->recv_alloc ? conn->recv_alloc * 2 : 4096;
973 28 50         while (new_alloc < conn->recv_len + len) new_alloc *= 2;
974 28 100         if (conn->max_message_size > 0 && new_alloc > conn->max_message_size)
    50          
975 2           new_alloc = conn->max_message_size;
976 28           Renew(conn->recv_buf, new_alloc, char);
977 28           conn->recv_alloc = new_alloc;
978             }
979 32 50         if (len) /* guard: recv_buf may still be NULL for an empty frame */
980 32           memcpy(conn->recv_buf + conn->recv_len, in, len);
981 32           conn->recv_len += len;
982 32           conn->recv_complete = is_final;
983              
984             /* Queue the connection so its completed message is delivered
985             once the current lws service call drains all available input
986             (see flush_recv_messages). is_final alone is unreliable under
987             permessage-deflate, so we defer delivery: a fully-received
988             message (recv_complete) is emitted either in the first-fragment
989             branch above when the next message starts, or at flush time.
990             The flush-list ref keeps conn alive until then. */
991 32 100         if (!conn->on_flush) {
992 31           conn_ref(conn);
993 31           conn->flush_next = conn->ctx->flush_head;
994 31           conn->ctx->flush_head = conn;
995 31           conn->on_flush = 1;
996             }
997             }
998 32           break;
999              
1000 69           case LWS_CALLBACK_SERVER_WRITEABLE:
1001             case LWS_CALLBACK_CLIENT_WRITEABLE:
1002 69 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
1003             ev_ws_send_t* send;
1004             int n;
1005 105 100         while (conn->send_head) {
1006 36           send = conn->send_head;
1007 36 50         DEBUG_LOG("Writing data: len=%zu mode=%d", send->len, (int)send->write_mode);
1008 36           n = lws_write(wsi, (unsigned char*)send->data + LWS_PRE, send->len, send->write_mode);
1009              
1010 36 50         if (n < 0) {
1011 0           lws_set_wsi_user(wsi, NULL);
1012 0           conn->wsi = NULL;
1013 0           conn->connected = 0;
1014 0           conn->in_fragmented_send = 0;
1015 0           unlink_conn(conn);
1016 0           conn_ref(conn);
1017 0           emit_error(conn, "write failed");
1018 0           conn_unref(conn);
1019 0           conn_unref(conn); /* drop wsi ref */
1020 0           return -1;
1021             }
1022              
1023 36           conn->send_head = send->next;
1024 36 100         if (conn->send_head == NULL) {
1025 34           conn->send_tail = NULL;
1026             }
1027 36           conn->send_queue_bytes -= send->len;
1028 36           Safefree(send);
1029              
1030 36 50         if (lws_send_pipe_choked(wsi)) {
1031 0           lws_callback_on_writable(wsi);
1032 0           break;
1033             }
1034             }
1035 69 100         if (conn->closing && conn->send_head == NULL) {
    50          
1036 36 50         DEBUG_LOG("Closing connection via writeable callback");
1037 36           return -1;
1038             }
1039 33 50         if (conn->send_head == NULL) {
1040 33           emit_drain(conn); /* self-guards on a registered on_drain */
1041             }
1042             }
1043 33           break;
1044              
1045 1           case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
1046 1 50         DEBUG_LOG("CLIENT_CONNECTION_ERROR: conn=%p", conn);
1047 1 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
1048 1 50         const char* err = in ? (const char*)in : "connection error";
1049 1           stop_connect_timer(conn);
1050             /* Clear wsi user pointer so WSI_DESTROY (which follows) sees NULL
1051             and skips — we handle cleanup here to avoid double conn_unref
1052             if Context::DESTROY fires from within the callback. */
1053 1           lws_set_wsi_user(wsi, NULL);
1054 1           conn->wsi = NULL;
1055 1           conn->connected = 0;
1056 1           unlink_conn(conn);
1057 1           conn_ref(conn);
1058 1           emit_error(conn, err);
1059 1           conn_unref(conn);
1060 1           conn_unref(conn); /* drop wsi ref */
1061             }
1062 1           break;
1063              
1064 31           case LWS_CALLBACK_CLIENT_CLOSED:
1065             case LWS_CALLBACK_CLOSED:
1066 31 50         DEBUG_LOG("CLOSED: conn=%p", conn);
1067 31 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
1068 31           int close_code = 1000;
1069 31           stop_connect_timer(conn);
1070 31           const char* close_reason = NULL;
1071             char reason_buf[126];
1072              
1073 31 50         if (in && len >= 2) {
    0          
1074 0           close_code = ((unsigned char *)in)[0] << 8 | ((unsigned char *)in)[1];
1075 0 0         if (len > 2) {
1076 0           size_t rlen = len - 2;
1077 0 0         if (rlen > sizeof(reason_buf) - 1) rlen = sizeof(reason_buf) - 1;
1078 0           memcpy(reason_buf, (char *)in + 2, rlen);
1079 0           reason_buf[rlen] = '\0';
1080 0           close_reason = reason_buf;
1081             }
1082             }
1083              
1084 31 50         DEBUG_LOG("Emitting close: code=%d", close_code);
1085 31           lws_set_wsi_user(wsi, NULL);
1086 31           conn->connected = 0;
1087 31           conn->wsi = NULL;
1088 31           unlink_conn(conn);
1089 31           conn_ref(conn);
1090             /* Deliver a fully-received-but-not-yet-flushed message before
1091             on_close, so on_message precedes on_close even when the data
1092             and the close arrive in the same lws service pass (delivery is
1093             otherwise deferred to flush_recv_messages, which runs after the
1094             service returns -- i.e. after this CLOSED callback). */
1095 31 50         if (conn->recv_complete) {
1096 0 0         emit_message(conn, conn->recv_buf ? conn->recv_buf : "", conn->recv_len, conn->recv_is_binary, 1);
1097 0 0         if (conn->magic == EV_WS_CONN_MAGIC) { conn->recv_len = 0; conn->recv_complete = 0; }
1098             }
1099 31 50         if (conn->magic == EV_WS_CONN_MAGIC)
1100 31           emit_close(conn, close_code, close_reason);
1101 31           conn_unref(conn);
1102 31           conn_unref(conn); /* drop wsi ref */
1103             }
1104 31           break;
1105              
1106 43           case LWS_CALLBACK_PROTOCOL_DESTROY: {
1107 43 50         struct lws_vhost *vh = wsi ? lws_get_vhost(wsi) : NULL;
1108 43 50         if (vh) {
1109 43           ev_ws_server_t *srv = (ev_ws_server_t *)lws_get_vhost_user(vh);
1110 43 100         if (srv && (srv->magic == EV_WS_SRV_MAGIC || srv->magic == EV_WS_SRV_FREED)) {
    100          
    50          
1111             /* SRV_FREED means a failed listen() already dropped the SV
1112             refs but deliberately left protocol_name alive (the vhost
1113             still pointed at it); free it here, once, at teardown. */
1114 18 50         if (srv->magic == EV_WS_SRV_MAGIC)
1115 18           free_server_svs(srv);
1116 18 50         if (srv->protocol_name) Safefree(srv->protocol_name);
1117 18           Safefree(srv);
1118             }
1119             }
1120 43           break;
1121             }
1122              
1123 60           case LWS_CALLBACK_WSI_DESTROY:
1124 60 50         if (handshake_headers_map) {
1125             char key[32];
1126 0           int klen = wsi_key(key, wsi);
1127 0           hv_delete(handshake_headers_map, key, klen, G_DISCARD);
1128             }
1129 60 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    0          
1130 0 0         DEBUG_LOG("WSI destroyed: conn=%p", conn);
1131 0           conn->wsi = NULL;
1132 0           conn->connected = 0;
1133 0           unlink_conn(conn);
1134 0           conn_unref(conn); /* drop wsi ref; frees resources when refcnt hits 0 */
1135             }
1136 60           break;
1137              
1138 1200           default:
1139 1200           break;
1140             }
1141              
1142 2003           return 0;
1143             }
1144              
1145             static const struct lws_protocols protocols[] = {
1146             {
1147             EV_WS_PROTOCOL_NAME,
1148             ws_callback,
1149             0,
1150             65536, /* rx buffer size */
1151             0,
1152             NULL,
1153             0
1154             },
1155             { NULL, NULL, 0, 0, 0, NULL, 0 }
1156             };
1157              
1158             /* Pin libwebsockets' global TLS init for the whole process.
1159              
1160             lws refcounts the global OpenSSL init across contexts created with
1161             LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT. When that refcount falls to zero (the
1162             last such context is destroyed) lws runs OPENSSL_cleanup(), which OpenSSL
1163             1.1+/3.x cannot undo: creating another TLS context then fails, and lws's own
1164             error reporting dereferences torn-down state and crashes. So a program that
1165             destroys its TLS context and makes a new one (reconnect-with-fresh-ctx,
1166             worker recycling, test suites) would break.
1167              
1168             A TLS-using context still needs its own GLOBAL_INIT flag for its TLS to work
1169             (the flag is per-context: "initialize the SSL library at all"); we keep that.
1170             This keepalive is a single extra flagged context, created on first TLS use
1171             and never destroyed or serviced, that holds the refcount floor at >= 1 so no
1172             user context's teardown can trigger the cleanup. Returns 1 if held.
1173              
1174             Idempotent; single-threaded use only (like the rest of this module). */
1175 24           static int ensure_ssl_keepalive(void) {
1176             struct lws_context_creation_info info;
1177 24 100         if (ssl_keepalive_ctx)
1178 7           return 1;
1179 17           memset(&info, 0, sizeof(info));
1180 17           info.port = CONTEXT_PORT_NO_LISTEN;
1181 17           info.protocols = protocols;
1182 17           info.gid = -1;
1183 17           info.uid = -1;
1184 17           info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1185 17           ssl_keepalive_ctx = lws_create_context(&info);
1186 17           return ssl_keepalive_ctx != NULL;
1187             }
1188              
1189             MODULE = EV::Websockets PACKAGE = EV::Websockets
1190              
1191             BOOT:
1192             {
1193 20 50         I_EV_API("EV::Websockets");
    50          
    50          
1194 20           lws_set_log_level(LLL_ERR | LLL_WARN, NULL);
1195             }
1196              
1197             void
1198             _set_debug(int enable);
1199             CODE:
1200             {
1201 0           ev_ws_debug = enable;
1202 0 0         if (enable)
1203 0           lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, NULL);
1204             else
1205 0           lws_set_log_level(LLL_ERR | LLL_WARN, NULL);
1206             }
1207              
1208             MODULE = EV::Websockets PACKAGE = EV::Websockets::Context
1209              
1210             EV::Websockets::Context
1211             _new(char* class, EV::Loop loop, const char* proxy = NULL, int proxy_port = 0, const char* ssl_cert = NULL, const char* ssl_key = NULL, const char* ssl_ca = NULL, int ssl_init = -1);
1212             CODE:
1213             {
1214             struct lws_context_creation_info info;
1215             void* foreign_loops[1];
1216              
1217             PERL_UNUSED_VAR(class);
1218              
1219 24           Newxz(RETVAL, 1, ev_ws_ctx_t);
1220 24           RETVAL->magic = EV_WS_CTX_MAGIC;
1221 24           RETVAL->refcnt = 1; /* Perl owns the context */
1222 24           RETVAL->loop = loop;
1223              
1224 24           foreign_loops[0] = loop;
1225              
1226 24           memset(&info, 0, sizeof(info));
1227 24           info.port = CONTEXT_PORT_NO_LISTEN;
1228 24           info.protocols = protocols;
1229             #ifdef LWS_HAS_EXTENSIONS
1230 24           info.extensions = extensions;
1231             #endif
1232 24           info.gid = -1;
1233 24           info.uid = -1;
1234             /* ssl_init: -1 = manage OpenSSL init (default); 1 = force; 0 = coexist
1235             (leave it to another TLS library). When we manage it, flag this context
1236             so its own TLS works, and also pin the global init in a process-lifetime
1237             keepalive so destroying this context can't drop lws's TLS refcount to
1238             zero (which would run OPENSSL_cleanup() and break later TLS use). */
1239 24           info.options = 0;
1240 24 50         if (ssl_init != 0) {
1241 24           info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1242 24           ensure_ssl_keepalive();
1243             }
1244 24           info.user = RETVAL;
1245 24           info.foreign_loops = foreign_loops;
1246 24           info.vhost_name = "default";
1247            
1248 24 50         if (proxy && strlen(proxy) > 0) {
    50          
1249 0 0         DEBUG_LOG("Context using proxy: %s:%d", proxy, proxy_port);
1250 0           info.http_proxy_address = proxy;
1251 0           info.http_proxy_port = proxy_port;
1252             }
1253              
1254 24 50         if (ssl_cert && strlen(ssl_cert) > 0) {
    50          
1255 0           info.ssl_cert_filepath = ssl_cert;
1256 0           info.ssl_private_key_filepath = ssl_key;
1257 0 0         if (ssl_ca && strlen(ssl_ca) > 0)
    0          
1258 0           info.ssl_ca_filepath = ssl_ca;
1259             }
1260              
1261 24 50         DEBUG_LOG("Creating context (manual integration)");
1262              
1263 24           RETVAL->lws_ctx = lws_create_context(&info);
1264 24 50         if (RETVAL->lws_ctx == NULL) {
1265 0           Safefree(RETVAL);
1266 0           croak("Failed to create libwebsockets context");
1267             }
1268 24           ev_timer_init(&RETVAL->timer, timer_cb, 0.00001, 0.);
1269 24           RETVAL->timer.data = (void*)RETVAL;
1270              
1271 24           schedule_timeout(RETVAL);
1272              
1273 24 50         DEBUG_LOG("Context created successfully");
1274             }
1275             OUTPUT:
1276             RETVAL
1277              
1278             void
1279             DESTROY(EV::Websockets::Context self);
1280             CODE:
1281             {
1282             ev_ws_conn_t* conn;
1283             ev_ws_conn_t* next;
1284              
1285 24 50         if (self->magic != EV_WS_CTX_MAGIC) return;
1286              
1287 24           self->magic = EV_WS_CTX_FREED;
1288              
1289 24           ev_timer_stop(self->loop, &self->timer);
1290              
1291 24           free_all_fd_watchers(self);
1292              
1293             /* Release connections still queued for message delivery (no delivery during
1294             teardown); each holds a flush-list ref. They remain in self->connections
1295             and are torn down by the loop below. */
1296 24 50         while (self->flush_head) {
1297 0           conn = self->flush_head;
1298 0           self->flush_head = conn->flush_next;
1299 0           conn->on_flush = 0;
1300 0           conn->flush_next = NULL;
1301 0           conn_unref(conn); /* release the flush-list ref */
1302             }
1303              
1304             /* Close all connections */
1305 34 100         for (conn = self->connections; conn != NULL; conn = next) {
1306 10           next = conn->next;
1307 10           conn->ctx = NULL;
1308 10           conn->prev = NULL;
1309 10           conn->next = NULL;
1310 10 50         if (conn->wsi) {
1311 10           lws_set_wsi_user(conn->wsi, NULL);
1312 10           conn->wsi = NULL;
1313             }
1314 10           conn_unref(conn); /* drop wsi ref — may free conn */
1315             }
1316 24           self->connections = NULL;
1317              
1318 24 50         if (self->lws_ctx) {
1319 24           lws_context_destroy(self->lws_ctx);
1320 24           self->lws_ctx = NULL;
1321             }
1322              
1323 24           self->loop = NULL;
1324 24 50         if (self->alive_flag) *self->alive_flag = 0;
1325 24           ctx_unref(self); /* drops Perl ref; Safefree happens when refcnt==0 */
1326             }
1327              
1328             EV::Websockets::Connection
1329             connect(EV::Websockets::Context self, ...);
1330             PREINIT:
1331             struct lws_client_connect_info ccinfo;
1332 25           const char* url = NULL;
1333 25           const char* protocol = NULL;
1334 25           char* host = NULL;
1335 25           char* host_header = NULL; /* for IPv6: "[::1]" form */
1336 25           char* path = NULL;
1337 25           int port = 80;
1338 25           int use_ssl = 0;
1339 25           int ssl_verify = 1;
1340 25           char* url_copy = NULL;
1341             char* p;
1342             char* path_start;
1343 25           SV* on_connect = NULL;
1344 25           SV* on_message = NULL;
1345 25           SV* on_close = NULL;
1346 25           SV* on_error = NULL;
1347 25           SV* on_pong = NULL;
1348 25           SV* on_drain = NULL;
1349 25           SV* headers_hv = NULL;
1350 25           size_t max_message_size = 0;
1351 25 50         double connect_timeout = 0;
1352             int i;
1353             CODE:
1354             {
1355 25 50         if (self->magic != EV_WS_CTX_MAGIC) {
1356 0           croak("Context has been destroyed");
1357             }
1358              
1359 130 100         for (i = 1; i < items; i += 2) {
1360 105 50         if (i + 1 >= items) break;
1361 105           const char* key = SvPV_nolen(ST(i));
1362 105           SV* val = ST(i + 1);
1363              
1364 105 100         if (strcmp(key, "url") == 0) {
1365 24           url = SvPV_nolen(val);
1366 81 50         } else if (strcmp(key, "protocol") == 0) {
1367 0           protocol = SvPV_nolen(val);
1368 81 50         } else if (strcmp(key, "ssl_verify") == 0) {
1369 0           ssl_verify = SvTRUE(val);
1370 81 50         } else if (strcmp(key, "max_message_size") == 0) {
1371 0           max_message_size = (size_t)SvUV(val);
1372 81 50         } else if (strcmp(key, "connect_timeout") == 0) {
1373 0           connect_timeout = SvNV(val);
1374 81 100         } else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
    50          
    50          
1375 1           headers_hv = val;
1376 80 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1377 19           on_connect = SvREFCNT_inc(val);
1378 61 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1379 16           on_message = SvREFCNT_inc(val);
1380 45 100         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1381 19           on_close = SvREFCNT_inc(val);
1382 26 100         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1383 23           on_error = SvREFCNT_inc(val);
1384 3 100         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1385 2           on_pong = SvREFCNT_inc(val);
1386 1 50         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1387 1           on_drain = SvREFCNT_inc(val);
1388             }
1389             }
1390              
1391 25 100         if (url == NULL) {
1392 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1393 1           croak("url parameter is required");
1394             }
1395              
1396 24           Newx(url_copy, strlen(url) + 1, char);
1397 24           strcpy(url_copy, url);
1398              
1399 24 50         if (strncasecmp(url_copy, "wss://", 6) == 0) {
1400 0           use_ssl = LCCSCF_USE_SSL;
1401 0 0         if (!ssl_verify) {
1402 0           use_ssl |= LCCSCF_ALLOW_SELFSIGNED
1403             | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK
1404             | LCCSCF_ALLOW_INSECURE;
1405             }
1406 0           port = 443;
1407 0           host = url_copy + 6;
1408 24 100         } else if (strncasecmp(url_copy, "ws://", 5) == 0) {
1409 23           host = url_copy + 5;
1410             } else {
1411 1           Safefree(url_copy);
1412 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1413 1           croak("URL must start with ws:// or wss://");
1414             }
1415              
1416 23           path_start = strchr(host, '/');
1417 23 50         if (path_start) {
1418             /* Allocate separate path string */
1419 0           Newx(path, strlen(path_start) + 1, char);
1420 0           strcpy(path, path_start);
1421 0           *path_start = '\0'; /* Terminate host */
1422             } else {
1423 23           Newx(path, 2, char);
1424 23           strcpy(path, "/");
1425             }
1426              
1427             /* Find port in host (handle IPv6 bracket notation) */
1428 23 50         if (host[0] == '[') {
1429 0           p = strchr(host, ']');
1430 0 0         if (p) {
1431 0           size_t iplen = p - host - 1;
1432 0           int hport = 0;
1433 0           host++; /* skip '[' */
1434 0 0         if (*(p + 1) == ':') {
1435 0           hport = atoi(p + 2);
1436 0           port = hport;
1437             }
1438 0           *p = '\0'; /* terminate IPv6 address */
1439 0           Newx(host_header, iplen + 10, char); /* [addr]:port\0 */
1440 0 0         if (hport && hport != (use_ssl ? 443 : 80))
    0          
    0          
1441 0           snprintf(host_header, iplen + 10, "[%s]:%d", host, hport);
1442             else
1443 0           snprintf(host_header, iplen + 10, "[%s]", host);
1444             }
1445             } else {
1446 23           p = strchr(host, ':');
1447 23 50         if (p) {
1448 23           *p = '\0';
1449 23           port = atoi(p + 1);
1450             }
1451             }
1452              
1453 23           Newxz(RETVAL, 1, ev_ws_conn_t);
1454 23           RETVAL->magic = EV_WS_CONN_MAGIC;
1455 23           RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
1456 23           RETVAL->on_connect = on_connect;
1457 23           RETVAL->on_message = on_message;
1458 23           RETVAL->on_close = on_close;
1459 23           RETVAL->on_error = on_error;
1460 23           RETVAL->on_pong = on_pong;
1461 23           RETVAL->on_drain = on_drain;
1462 23           RETVAL->max_message_size = max_message_size;
1463 23           RETVAL->loop = self->loop;
1464              
1465 23           link_conn(self, RETVAL);
1466              
1467 23           memset(&ccinfo, 0, sizeof(ccinfo));
1468 23           ccinfo.context = self->lws_ctx;
1469 23           ccinfo.address = host;
1470 23           ccinfo.port = port;
1471 23           ccinfo.path = path;
1472 23 50         ccinfo.host = host_header ? host_header : host;
1473 23 50         ccinfo.origin = host_header ? host_header : host;
1474 23           ccinfo.protocol = protocol;
1475             #ifdef LWS_HAS_EXTENSIONS
1476 23           ccinfo.client_exts = extensions;
1477             #endif
1478 23           ccinfo.ssl_connection = use_ssl;
1479 23           ccinfo.userdata = RETVAL;
1480              
1481 23 100         if (headers_hv) {
1482 1           RETVAL->custom_headers = (HV*)SvREFCNT_inc(SvRV(headers_hv));
1483             }
1484              
1485 23           RETVAL->wsi = lws_client_connect_via_info(&ccinfo);
1486              
1487 23           Safefree(path);
1488 23           Safefree(host_header);
1489 23           Safefree(url_copy);
1490              
1491 23 50         if (RETVAL->wsi == NULL) {
1492 0           unlink_conn(RETVAL);
1493 0 0         if (RETVAL->perl_self == NULL) {
1494 0           free_conn_resources(RETVAL);
1495 0           RETVAL->magic = EV_WS_CONN_FREED;
1496 0           Safefree(RETVAL);
1497             } else {
1498 0           conn_unref(RETVAL); /* drop sentinel; Perl DESTROY will handle final cleanup */
1499             }
1500 0           croak("Failed to initiate WebSocket connection");
1501             }
1502 23           conn_unref(RETVAL); /* drop sentinel */
1503              
1504 23 50         if (connect_timeout > 0) {
1505 0           ev_timer_init(&RETVAL->connect_timer, connect_timeout_cb, connect_timeout, 0.);
1506 0           RETVAL->connect_timer.data = (void*)RETVAL;
1507 0           ev_timer_start(self->loop, &RETVAL->connect_timer);
1508 0           RETVAL->connect_timer_active = 1;
1509             }
1510             }
1511             OUTPUT:
1512             RETVAL
1513              
1514             int
1515             listen(EV::Websockets::Context self, ...);
1516             PREINIT:
1517             struct lws_context_creation_info info;
1518 19           int port = 0;
1519 19           const char *name = "server";
1520 19           const char *ssl_cert = NULL;
1521 19           const char *ssl_key = NULL;
1522 19           const char *ssl_ca = NULL;
1523 19           SV* on_connect = NULL;
1524 19           SV* on_message = NULL;
1525 19           SV* on_close = NULL;
1526 19           SV* on_error = NULL;
1527 19           SV* on_pong = NULL;
1528 19           SV* on_drain = NULL;
1529 19           SV* on_handshake = NULL;
1530 19           SV* headers_hv = NULL;
1531 19           size_t max_message_size = 0;
1532 19 50         const char *protocol_name = NULL;
1533             ev_ws_server_t *srv;
1534             struct lws_vhost *vh;
1535             int i;
1536             CODE:
1537             {
1538 19 50         if (self->magic != EV_WS_CTX_MAGIC) {
1539 0           croak("Context has been destroyed");
1540             }
1541              
1542 96 100         for (i = 1; i < items; i += 2) {
1543 77 50         if (i + 1 >= items) break;
1544 77           const char* key = SvPV_nolen(ST(i));
1545 77           SV* val = ST(i + 1);
1546              
1547 77 100         if (strcmp(key, "port") == 0) {
1548 19           port = SvIV(val);
1549 58 100         } else if (strcmp(key, "name") == 0) {
1550 1           name = SvPV_nolen(val);
1551 57 50         } else if (strcmp(key, "protocol") == 0) {
1552 0           protocol_name = SvPV_nolen(val);
1553 57 50         } else if (strcmp(key, "ssl_cert") == 0) {
1554 0           ssl_cert = SvPV_nolen(val);
1555 57 50         } else if (strcmp(key, "ssl_key") == 0) {
1556 0           ssl_key = SvPV_nolen(val);
1557 57 50         } else if (strcmp(key, "ssl_ca") == 0) {
1558 0           ssl_ca = SvPV_nolen(val);
1559 57 100         } else if (strcmp(key, "max_message_size") == 0) {
1560 2           max_message_size = (size_t)SvUV(val);
1561 55 50         } else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
    0          
    0          
1562 0           headers_hv = val;
1563 55 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1564 18           on_connect = SvREFCNT_inc(val);
1565 37 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1566 17           on_message = SvREFCNT_inc(val);
1567 20 100         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1568 17           on_close = SvREFCNT_inc(val);
1569 3 100         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1570 2           on_error = SvREFCNT_inc(val);
1571 1 50         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1572 1           on_pong = SvREFCNT_inc(val);
1573 0 0         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1574 0           on_drain = SvREFCNT_inc(val);
1575 0 0         } else if (strcmp(key, "on_handshake") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1576 0           on_handshake = SvREFCNT_inc(val);
1577             }
1578             }
1579              
1580 19 100         if (strcmp(name, "default") == 0) {
1581 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, on_handshake);
1582 1           croak("listen: vhost name 'default' is reserved");
1583             }
1584              
1585 18           Newxz(srv, 1, ev_ws_server_t);
1586 18           srv->magic = EV_WS_SRV_MAGIC;
1587 18           srv->on_connect = on_connect;
1588 18           srv->on_message = on_message;
1589 18           srv->on_close = on_close;
1590 18           srv->on_error = on_error;
1591 18           srv->on_pong = on_pong;
1592 18           srv->on_drain = on_drain;
1593 18           srv->on_handshake = on_handshake;
1594 18           srv->max_message_size = max_message_size;
1595 18 50         if (headers_hv)
1596 0           srv->response_headers = (HV*)SvREFCNT_inc(SvRV(headers_hv));
1597              
1598 18 50         if (protocol_name) {
1599 0           STRLEN pnlen = strlen(protocol_name);
1600 0           Newx(srv->protocol_name, pnlen + 1, char);
1601 0           memcpy(srv->protocol_name, protocol_name, pnlen + 1);
1602 0           srv->vhost_protocols[0] = protocols[0];
1603 0           srv->vhost_protocols[0].name = srv->protocol_name;
1604 0           srv->vhost_protocols[1] = protocols[1];
1605             }
1606              
1607 18           memset(&info, 0, sizeof(info));
1608 18           info.port = port;
1609 18 50         info.protocols = srv->protocol_name ? srv->vhost_protocols : protocols;
1610 18           info.vhost_name = name;
1611 18           info.user = srv;
1612 18           info.options = 0;
1613              
1614 18 50         if (ssl_cert && ssl_key) {
    0          
1615 0           info.ssl_cert_filepath = ssl_cert;
1616 0           info.ssl_private_key_filepath = ssl_key;
1617 0 0         if (ssl_ca)
1618 0           info.ssl_ca_filepath = ssl_ca;
1619 0           info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1620 0           ensure_ssl_keepalive(); /* pin global init so vhost/context teardown won't OPENSSL_cleanup */
1621             }
1622              
1623 18           vh = lws_create_vhost(self->lws_ctx, &info);
1624 18 50         if (vh == NULL) {
1625 0           free_server_svs(srv);
1626 0 0         if (srv->protocol_name) Safefree(srv->protocol_name);
1627 0           Safefree(srv);
1628 0           croak("Failed to create vhost for listening");
1629             }
1630            
1631 18           RETVAL = lws_get_vhost_listen_port(vh);
1632 18 50         if (RETVAL <= 0) {
1633             /* Vhost created but port bind failed. Release the SV refs now, but
1634             leave protocol_name alive — the live vhost still points at it via
1635             vhost_protocols[0].name, so freeing it here would dangle. Do NOT
1636             Safefree(srv): the vhost retains the pointer. PROTOCOL_DESTROY frees
1637             protocol_name and srv at context teardown; the SRV_FREED sentinel
1638             tells it to skip the (already-released) SV refs. */
1639 0           free_server_svs(srv);
1640 0           srv->magic = EV_WS_SRV_FREED;
1641 0           croak("listen: failed to bind port");
1642             }
1643 18 50         DEBUG_LOG("Server listening on port %d", RETVAL);
1644             }
1645             OUTPUT:
1646             RETVAL
1647              
1648             EV::Websockets::Connection
1649             adopt(EV::Websockets::Context self, ...);
1650             PREINIT:
1651 3           int fd = -1;
1652 3           SV* fh_sv = NULL;
1653 3           SV* initial_data_sv = NULL;
1654 3           SV* on_connect = NULL;
1655 3           SV* on_message = NULL;
1656 3           SV* on_close = NULL;
1657 3           SV* on_error = NULL;
1658 3           SV* on_pong = NULL;
1659 3           SV* on_drain = NULL;
1660 3 50         size_t max_message_size = 0;
1661             int i;
1662             CODE:
1663             {
1664 3 50         if (self->magic != EV_WS_CTX_MAGIC) {
1665 0           croak("Context has been destroyed");
1666             }
1667              
1668 10 100         for (i = 1; i < items; i += 2) {
1669 7 50         if (i + 1 >= items) break;
1670 7           const char* key = SvPV_nolen(ST(i));
1671 7           SV* val = ST(i + 1);
1672              
1673 7 100         if (strcmp(key, "fh") == 0) {
1674 2           fh_sv = val;
1675 5 50         } else if (strcmp(key, "initial_data") == 0) {
1676 0           initial_data_sv = val;
1677 5 50         } else if (strcmp(key, "max_message_size") == 0) {
1678 0           max_message_size = (size_t)SvUV(val);
1679 5 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1680 1           on_connect = SvREFCNT_inc(val);
1681 4 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1682 3           on_message = SvREFCNT_inc(val);
1683 1 50         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1684 0           on_close = SvREFCNT_inc(val);
1685 1 50         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1686 1           on_error = SvREFCNT_inc(val);
1687 0 0         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1688 0           on_pong = SvREFCNT_inc(val);
1689 0 0         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1690 0           on_drain = SvREFCNT_inc(val);
1691             }
1692             }
1693              
1694 3 100         if (fh_sv == NULL) {
1695 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1696 1           croak("fh parameter is required");
1697             }
1698              
1699 2           IO* io = sv_2io(fh_sv);
1700 2 50         PerlIO *ifp = io ? IoIFP(io) : NULL;
1701 2 100         if (!ifp || (fd = PerlIO_fileno(ifp)) < 0) {
    50          
1702 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1703 1           croak("Invalid filehandle");
1704             }
1705              
1706 1           Newxz(RETVAL, 1, ev_ws_conn_t);
1707 1           RETVAL->magic = EV_WS_CONN_MAGIC;
1708 1           RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
1709 1           RETVAL->on_connect = on_connect;
1710 1           RETVAL->on_message = on_message;
1711 1           RETVAL->on_close = on_close;
1712 1           RETVAL->on_error = on_error;
1713 1           RETVAL->on_pong = on_pong;
1714 1           RETVAL->on_drain = on_drain;
1715 1           RETVAL->max_message_size = max_message_size;
1716 1           RETVAL->loop = self->loop;
1717             /* Hold a reference to the underlying glob/IO to prevent Perl
1718             * from closing the fd while lws owns it. For blessed glob refs
1719             * (IO::Socket etc.) we ref the glob itself so framework DESTROY
1720             * methods see it as still alive. */
1721 1           RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
1722 1 50         : SvREFCNT_inc(fh_sv);
1723              
1724 1           link_conn(self, RETVAL);
1725              
1726             {
1727 1           struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
1728 1 50         if (!vh) {
1729             /* Auto-create a server vhost for adoption (no listener needed) */
1730             struct lws_context_creation_info vinfo;
1731 1           memset(&vinfo, 0, sizeof(vinfo));
1732 1           vinfo.port = CONTEXT_PORT_NO_LISTEN_SERVER;
1733 1           vinfo.protocols = protocols;
1734 1           vinfo.vhost_name = "server";
1735 1           vh = lws_create_vhost(self->lws_ctx, &vinfo);
1736             }
1737 1 50         if (!vh) {
1738 0           unlink_conn(RETVAL);
1739 0           free_conn_resources(RETVAL);
1740 0           RETVAL->magic = EV_WS_CONN_FREED;
1741 0           Safefree(RETVAL);
1742 0           croak("Failed to create vhost for adoption");
1743             }
1744 1           pending_adoption = RETVAL;
1745 1 50         if (initial_data_sv && SvOK(initial_data_sv)) {
    0          
1746             STRLEN rdlen;
1747 0           const char *rdbuf = SvPV(initial_data_sv, rdlen);
1748 0           RETVAL->wsi = lws_adopt_socket_vhost_readbuf(vh,
1749             (lws_sockfd_type)fd, rdbuf, rdlen);
1750             } else {
1751 1           RETVAL->wsi = lws_adopt_socket_vhost(vh, (lws_sockfd_type)fd);
1752             }
1753 1           pending_adoption = NULL;
1754             }
1755              
1756 1 50         if (RETVAL->wsi == NULL) {
1757 0           unlink_conn(RETVAL);
1758 0 0         if (RETVAL->perl_self == NULL) {
1759 0           free_conn_resources(RETVAL);
1760 0           RETVAL->magic = EV_WS_CONN_FREED;
1761 0           Safefree(RETVAL);
1762             } else {
1763 0           conn_unref(RETVAL);
1764             }
1765 0           croak("Failed to adopt socket");
1766             }
1767 1           conn_unref(RETVAL); /* drop sentinel */
1768              
1769             /* Kick lws to process the adopted socket's readbuf (needed for lws 4.5+).
1770             * Use the same non-blocking forced-service call as do_lws_service: the
1771             * readbuf is pending work, so lws_service_tsi(ctx, -1, 0) drains it without
1772             * blocking. A plain lws_service(ctx, 0) would block the EV loop here when a
1773             * socket is adopted with no immediately-pending data.
1774             * Guard with extra refs: the service call may synchronously fire
1775             * error/destroy callbacks that would free RETVAL or ctx. */
1776             {
1777 1           int rejected, alive = 1;
1778 1           int* prev_flag = self->alive_flag;
1779 1           conn_ref(RETVAL);
1780 1           ctx_ref(self);
1781 1           self->alive_flag = &alive;
1782 1           lws_service_tsi(self->lws_ctx, -1, 0);
1783 1 50         if (alive)
1784 1           flush_recv_messages(self); /* deliver any reassembled message */
1785 1 50         if (alive) {
1786 1           self->alive_flag = prev_flag;
1787 1           schedule_timeout(self);
1788 0 0         } else if (prev_flag) {
1789             /* Context destroyed during inner lws_service.
1790             Propagate destruction up the alive_flag chain. */
1791 0           *prev_flag = 0;
1792             }
1793 1           rejected = (RETVAL->wsi == NULL);
1794 1           conn_unref(RETVAL);
1795 1           ctx_unref(self);
1796 1 50         if (rejected)
1797 0           croak("Failed to adopt socket");
1798             }
1799             }
1800             OUTPUT:
1801             RETVAL
1802              
1803             void
1804             connections(EV::Websockets::Context self);
1805             PPCODE:
1806             {
1807             ev_ws_conn_t* conn;
1808 2 50         if (self->magic != EV_WS_CTX_MAGIC) XSRETURN_EMPTY;
1809 8 100         for (conn = self->connections; conn != NULL; conn = conn->next) {
1810             /* "connected" stays set during the "closing" drain (close() never
1811             clears it), and is cleared together with wsi on teardown — so this
1812             single predicate covers both the "connected" and "closing" states. */
1813 6 50         if (conn->magic == EV_WS_CONN_MAGIC && conn->connected) {
    100          
1814 4 50         XPUSHs(sv_2mortal(get_conn_sv(conn)));
1815             }
1816             }
1817             }
1818              
1819             MODULE = EV::Websockets PACKAGE = EV::Websockets::Connection
1820              
1821             void
1822             DESTROY(EV::Websockets::Connection self);
1823             CODE:
1824             {
1825 42 50         if (self->magic != EV_WS_CONN_MAGIC) return;
1826              
1827 42 50         DEBUG_LOG("Perl object DESTROY: self=%p wsi=%p", self, self->wsi);
1828              
1829             /* Clear the cached Perl object pointer in the C struct */
1830 42           self->perl_self = NULL;
1831              
1832 42           conn_unref(self); /* drop Perl ref */
1833             }
1834              
1835             void
1836             send(EV::Websockets::Connection self, SV* data);
1837             CODE:
1838             {
1839             STRLEN len;
1840             const char* buf;
1841              
1842 28 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1843 28 100         CHECK_NOT_FRAGMENTING(self);
1844              
1845 27           buf = SvPV(data, len);
1846 27           queue_send(self, buf, len, LWS_WRITE_TEXT);
1847             }
1848              
1849             void
1850             send_binary(EV::Websockets::Connection self, SV* data);
1851             CODE:
1852             {
1853             STRLEN len;
1854             const char* buf;
1855              
1856 4 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1857 4 100         CHECK_NOT_FRAGMENTING(self);
1858              
1859 3           buf = SvPV(data, len);
1860 3           queue_send(self, buf, len, LWS_WRITE_BINARY);
1861             }
1862              
1863             void
1864             send_ping(EV::Websockets::Connection self, SV* data = NULL);
1865             CODE:
1866             {
1867 3           STRLEN len = 0;
1868 3           const char* buf = NULL;
1869            
1870 3 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1871            
1872 3 50         if (data && SvOK(data)) {
    50          
1873 3           buf = SvPV(data, len);
1874 3 50         if (len > 125) len = 125; /* PING payload limit */
1875             }
1876            
1877 3           queue_send(self, buf, len, LWS_WRITE_PING);
1878             }
1879              
1880             SV*
1881             get_protocol(EV::Websockets::Connection self);
1882             CODE:
1883             {
1884 2           SV* val = NULL;
1885 2 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi)
    50          
1886 2           val = hdr_to_sv(self->wsi, WSI_TOKEN_PROTOCOL);
1887 2 50         RETVAL = val ? val : newSV(0);
1888             }
1889             OUTPUT:
1890             RETVAL
1891              
1892             SV*
1893             peer_address(EV::Websockets::Connection self);
1894             CODE:
1895             {
1896             char buf[128];
1897 2           buf[0] = '\0';
1898 2           RETVAL = newSV(0);
1899 2 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi) {
    50          
1900 2           lws_get_peer_simple(self->wsi, buf, sizeof(buf));
1901 2 50         if (buf[0])
1902 2           sv_setpv(RETVAL, buf);
1903             }
1904             }
1905             OUTPUT:
1906             RETVAL
1907              
1908             void
1909             send_pong(EV::Websockets::Connection self, SV* data = NULL);
1910             CODE:
1911             {
1912 1           STRLEN len = 0;
1913 1           const char* buf = NULL;
1914              
1915 1 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1916              
1917 1 50         if (data && SvOK(data)) {
    50          
1918 1           buf = SvPV(data, len);
1919 1 50         if (len > 125) len = 125;
1920             }
1921              
1922 1           queue_send(self, buf, len, LWS_WRITE_PONG);
1923             }
1924              
1925             void
1926             pause_recv(EV::Websockets::Connection self);
1927             CODE:
1928             {
1929 1 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi && self->connected)
    50          
    50          
1930 1           lws_rx_flow_control(self->wsi, 0);
1931             }
1932              
1933             void
1934             resume_recv(EV::Websockets::Connection self);
1935             CODE:
1936             {
1937 1 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi && self->connected)
    50          
    50          
1938 1           lws_rx_flow_control(self->wsi, 1);
1939             }
1940              
1941             void
1942             close(EV::Websockets::Connection self, int code = 1000, const char* reason = NULL);
1943             CODE:
1944             {
1945 21 50         if (self->magic != EV_WS_CONN_MAGIC) {
1946 0           return;
1947             }
1948 21 100         if (!self->wsi || !self->connected || self->closing) {
    50          
    100          
1949 2           return;
1950             }
1951              
1952 19 50         DEBUG_LOG("Closing connection: code=%d reason=%s", code, reason ? reason : "none");
    0          
1953 19 50         if (self->in_fragmented_send) {
1954 0           queue_send(self, NULL, 0, LWS_WRITE_CONTINUATION);
1955 0           self->in_fragmented_send = 0;
1956             }
1957 19           self->closing = 1;
1958 25 100         lws_close_reason(self->wsi, (enum lws_close_status)code,
1959             reason ? (unsigned char*)reason : NULL,
1960 6           reason ? strlen(reason) : 0);
1961 19           lws_callback_on_writable(self->wsi);
1962             }
1963              
1964             int
1965             is_connected(EV::Websockets::Connection self);
1966             CODE:
1967             {
1968 6 50         RETVAL = (self->magic == EV_WS_CONN_MAGIC && self->wsi != NULL && self->connected) ? 1 : 0;
    100          
    100          
    100          
1969             }
1970             OUTPUT:
1971             RETVAL
1972              
1973             int
1974             is_connecting(EV::Websockets::Connection self);
1975             CODE:
1976             {
1977 3 50         RETVAL = (self->magic == EV_WS_CONN_MAGIC && self->wsi != NULL && !self->connected && !self->closing) ? 1 : 0;
    50          
    100          
    50          
    50          
1978             }
1979             OUTPUT:
1980             RETVAL
1981              
1982             const char*
1983             state(EV::Websockets::Connection self);
1984             CODE:
1985             {
1986 8 50         if (self->magic != EV_WS_CONN_MAGIC) RETVAL = "destroyed";
1987 8 100         else if (!self->wsi) RETVAL = "closed";
1988 5 50         else if (self->closing) RETVAL = "closing";
1989 5 100         else if (self->connected) RETVAL = "connected";
1990 3           else RETVAL = "connecting";
1991             }
1992             OUTPUT:
1993             RETVAL
1994              
1995             UV
1996             send_queue_size(EV::Websockets::Connection self);
1997             CODE:
1998             {
1999 0 0         RETVAL = (self->magic == EV_WS_CONN_MAGIC) ? (UV)self->send_queue_bytes : 0;
    0          
2000             }
2001             OUTPUT:
2002             RETVAL
2003              
2004             void
2005             send_fragment(EV::Websockets::Connection self, SV* data, int is_binary = 0, int is_final = 1);
2006             CODE:
2007             {
2008             STRLEN len;
2009             const char* buf;
2010             enum lws_write_protocol mode;
2011              
2012 2 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
2013              
2014 2           buf = SvPV(data, len);
2015              
2016 2 100         if (!self->in_fragmented_send) {
2017 1           mode = is_binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT;
2018 1 50         if (!is_final) {
2019 1           mode |= LWS_WRITE_NO_FIN;
2020 1           self->in_fragmented_send = 1;
2021             }
2022             } else {
2023 1           mode = LWS_WRITE_CONTINUATION;
2024 1 50         if (!is_final) {
2025 0           mode |= LWS_WRITE_NO_FIN;
2026             } else {
2027 1           self->in_fragmented_send = 0;
2028             }
2029             }
2030              
2031 2           queue_send(self, buf, len, mode);
2032             }
2033              
2034             SV*
2035             stash(EV::Websockets::Connection self);
2036             CODE:
2037             {
2038 2 50         if (self->magic != EV_WS_CONN_MAGIC)
2039 0           croak("Connection has been destroyed");
2040 2 100         if (!self->stash)
2041 1           self->stash = newHV();
2042 2           RETVAL = newRV_inc((SV*)self->stash);
2043             }
2044             OUTPUT:
2045             RETVAL