File Coverage

Websockets.xs
Criterion Covered Total %
statement 828 1039 79.6
branch 545 1012 53.8
condition n/a
subroutine n/a
pod n/a
total 1373 2051 66.9


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include
9             #include
10             #include
11              
12             /* Magic numbers for use-after-free detection */
13             #define EV_WS_CTX_MAGIC 0xDEADBEEF
14             #define EV_WS_CTX_FREED 0xFEEDFACE
15             #define EV_WS_CONN_MAGIC 0xCAFEBABE
16             #define EV_WS_CONN_FREED 0xBADC0FFE
17              
18             /* Forward declarations */
19             typedef struct ev_ws_ctx_s ev_ws_ctx_t;
20             typedef struct ev_ws_conn_s ev_ws_conn_t;
21             typedef struct ev_ws_fd_s ev_ws_fd_t;
22              
23             #define EV_WS_SRV_MAGIC 0xFEEDCAFE
24             #define EV_WS_SRV_FREED 0xFEEDDEAD
25              
26             typedef struct ev_ws_server_s {
27             unsigned int magic;
28             SV* on_connect;
29             SV* on_message;
30             SV* on_close;
31             SV* on_error;
32             SV* on_pong;
33             SV* on_drain;
34             SV* on_handshake;
35             HV* response_headers; /* headers to inject into upgrade response */
36             size_t max_message_size;
37             char* protocol_name;
38             struct lws_protocols vhost_protocols[2];
39             } ev_ws_server_t;
40              
41             typedef ev_ws_ctx_t* EV__Websockets__Context;
42             typedef ev_ws_conn_t* EV__Websockets__Connection;
43             typedef struct ev_loop* EV__Loop;
44              
45             /* File descriptor watcher tracking */
46             struct ev_ws_fd_s {
47             ev_io io;
48             ev_ws_ctx_t* ctx;
49             int fd;
50             int poll_events; /* registered POLLIN/POLLOUT interest mask */
51             };
52              
53             /* Send buffer node for pending writes (FAM: data follows the struct) */
54             typedef struct ev_ws_send_s {
55             struct ev_ws_send_s* next;
56             size_t len;
57             enum lws_write_protocol write_mode;
58             char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
59             } ev_ws_send_t;
60              
61             /* Context structure - manages lws_context and connections */
62             struct ev_ws_ctx_s {
63             unsigned int magic;
64             int refcnt; /* lifecycle refcount: Perl + each in-flight lws_service */
65             int* alive_flag; /* points to caller's stack variable during lws_service */
66             struct ev_loop* loop;
67             struct lws_context* lws_ctx;
68             ev_ws_conn_t* connections;
69             ev_ws_fd_t** fd_table;
70             int fd_table_size;
71             ev_timer timer;
72             };
73              
74             /* Connection structure */
75             struct ev_ws_conn_s {
76             unsigned int magic;
77             ev_ws_ctx_t* ctx;
78             struct lws* wsi;
79             ev_ws_conn_t* next; /* linked list */
80             ev_ws_conn_t* prev;
81              
82             int refcnt;
83             SV* perl_self; /* non-owning ptr to the blessed RV's target; its existence
84             holds one refcnt (added in get_conn_sv, dropped in DESTROY) */
85              
86             /* Callbacks */
87             SV* on_connect;
88             SV* on_message;
89             SV* on_close;
90             SV* on_error;
91             SV* on_pong;
92             SV* on_drain;
93              
94             /* Custom Headers */
95             HV* custom_headers;
96              
97             /* Response Headers (client: response headers; server: request headers) */
98             HV* response_headers;
99              
100             /* Receive buffer for fragmented messages */
101             char* recv_buf;
102             size_t recv_len;
103             size_t recv_alloc;
104             int recv_is_binary;
105             size_t max_message_size;
106              
107             /* Send queue */
108             ev_ws_send_t* send_head;
109             ev_ws_send_t* send_tail;
110             size_t send_queue_bytes;
111              
112             /* Adopted file handle (prevents Perl from closing the fd) */
113             SV* adopted_fh;
114              
115             /* Connect timeout */
116             ev_timer connect_timer;
117             int connect_timer_active;
118             struct ev_loop* loop;
119              
120             /* Fragmented send state */
121             int in_fragmented_send;
122              
123             /* Per-connection metadata */
124             HV* stash;
125              
126             /* State */
127             int connected;
128             int closing;
129             };
130              
131             #define EV_WS_PROTOCOL_NAME "ev-websockets"
132              
133             /* Extensions support (compression) */
134             #ifdef LWS_HAS_EXTENSIONS
135             static const struct lws_extension extensions[] = {
136             {
137             "permessage-deflate",
138             lws_extension_callback_pm_deflate,
139             "permessage-deflate; client_no_context_takeover; client_max_window_bits"
140             },
141             { NULL, NULL, NULL }
142             };
143             #endif
144              
145             static int ev_ws_debug = 0;
146              
147             /* Bridges userdata into ws_callback() before lws_adopt returns. */
148             static ev_ws_conn_t* pending_adoption = NULL;
149             static HV* handshake_headers_map = NULL; /* wsi-ptr → per-conn response headers HV */
150             static struct lws_context* ssl_keepalive_ctx = NULL; /* see ensure_ssl_keepalive() */
151              
152             /* Copy an lws header token into a fresh SV, or return NULL if absent/empty */
153 226           static SV* hdr_to_sv(struct lws *wsi, enum lws_token_indexes tok) {
154 226           int total = lws_hdr_total_length(wsi, tok);
155 226 100         if (total > 0) {
156             char *buf;
157             int n;
158 54           Newx(buf, total + 1, char);
159 54           n = lws_hdr_copy(wsi, buf, total + 1, tok);
160 54 50         if (n > 0) {
161 54           SV *val = newSVpvn(buf, n);
162 54           Safefree(buf);
163 54           return val;
164             }
165 0           Safefree(buf);
166             }
167 172           return NULL;
168             }
169              
170             /* Capture a header value into an HV under the given key */
171 224           static void capture_header(struct lws *wsi, HV *hv, enum lws_token_indexes tok,
172             const char *name, STRLEN nlen) {
173 224           SV *val = hdr_to_sv(wsi, tok);
174 224 100         if (val && !hv_store(hv, name, nlen, val, 0))
    50          
175 0           SvREFCNT_dec(val);
176 224           }
177              
178             typedef struct { enum lws_token_indexes tok; const char *name; STRLEN nlen; } header_def_t;
179              
180             static const header_def_t request_hdrs[] = {
181             { WSI_TOKEN_GET_URI, "Path", 4 },
182             { WSI_TOKEN_HOST, "Host", 4 },
183             { WSI_TOKEN_ORIGIN, "Origin", 6 },
184             { WSI_TOKEN_HTTP_COOKIE, "Cookie", 6 },
185             { WSI_TOKEN_HTTP_AUTHORIZATION, "Authorization", 13 },
186             { WSI_TOKEN_PROTOCOL, "Sec-WebSocket-Protocol", 22 },
187             { WSI_TOKEN_HTTP_USER_AGENT, "User-Agent", 10 },
188             { WSI_TOKEN_X_FORWARDED_FOR, "X-Forwarded-For", 15 },
189             };
190             #define N_REQUEST_HDRS (int)(sizeof(request_hdrs)/sizeof(request_hdrs[0]))
191              
192 18           static void capture_request_headers(struct lws *wsi, HV *hv) {
193             int i;
194 162 100         for (i = 0; i < N_REQUEST_HDRS; i++)
195 144           capture_header(wsi, hv, request_hdrs[i].tok,
196 144           request_hdrs[i].name, request_hdrs[i].nlen);
197 18           }
198              
199             /* Inject all key/value pairs from an HV as HTTP headers via lws.
200             Returns -1 if lws rejected a header (client path aborts the handshake);
201             server callers ignore the result and simply stop adding. */
202 1           static int inject_headers(struct lws *wsi, HV *hv,
203             unsigned char **p, unsigned char *end) {
204             HE *entry;
205             char kbuf[256];
206 1           hv_iterinit(hv);
207 2 100         while ((entry = hv_iternext(hv))) {
208             I32 klen;
209 1           const char *key = hv_iterkey(entry, &klen);
210             SV *val_sv;
211             STRLEN vlen;
212             const char *val;
213 1 50         if (klen >= 254) continue;
214 1           val_sv = hv_iterval(hv, entry);
215 1           val = SvPV(val_sv, vlen);
216 1           memcpy(kbuf, key, klen);
217 1           kbuf[klen] = ':';
218 1           kbuf[klen + 1] = '\0';
219 1 50         if (lws_add_http_header_by_name(wsi, (unsigned char *)kbuf,
220             (unsigned char *)val, vlen, p, end))
221 0           return -1;
222             }
223 1           return 0;
224             }
225              
226             /* Format a wsi pointer as the lookup key for handshake_headers_map.
227             Callers must pass a buffer of at least 32 bytes; returns the key length. */
228 0           static int wsi_key(char *buf, struct lws *wsi) {
229 0           return snprintf(buf, 32, "%p", (void*)wsi);
230             }
231              
232             #define DEBUG_LOG(fmt, ...) do { if (ev_ws_debug) fprintf(stderr, "[EV::WS] " fmt "\n", ##__VA_ARGS__); } while(0)
233              
234 255           static void ctx_ref(ev_ws_ctx_t* ctx) {
235 255           ctx->refcnt++;
236 255           }
237              
238 279           static void ctx_unref(ev_ws_ctx_t* ctx) {
239 279 100         if (--ctx->refcnt == 0) {
240 24           Safefree(ctx);
241             }
242 279           }
243              
244             /* Schedule the next lws housekeeping wake-up.
245              
246             lws_service_adjust_timeout returns the ms until lws next needs servicing; 0
247             means "service as soon as possible". This timer only paces lws's time-based
248             work (connection/handshake timeouts, TLS cert aging, draining buffered rx) --
249             socket readability/writability is driven by the per-fd io watcher, not here.
250             We deliberately floor the delay at 1ms rather than arming an ev_idle watcher
251             on 0: do_lws_service is now non-blocking, so an always-ready idle watcher
252             would busy-spin at 100% CPU whenever lws keeps asking for immediate service.
253             A 1ms floor lets the loop block briefly so every other EV watcher still
254             fires, at negligible latency for this coarse, time-based work. */
255 279           static void schedule_timeout(ev_ws_ctx_t* ctx) {
256 279           int delay_ms = lws_service_adjust_timeout(ctx->lws_ctx, 1000, 0);
257             double delay_s;
258              
259 279 100         if (delay_ms < 1) delay_ms = 1;
260 279           delay_s = (double)delay_ms / 1000.0;
261              
262 279           ev_timer_stop(ctx->loop, &ctx->timer);
263 279           ev_timer_set(&ctx->timer, delay_s, 0.);
264 279           ev_timer_start(ctx->loop, &ctx->timer);
265 279           }
266              
267 5           static void do_lws_service(ev_ws_ctx_t* ctx) {
268 5 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC && ctx->lws_ctx) {
    50          
    50          
269 5           int alive = 1;
270 5           int* prev_flag = ctx->alive_flag;
271 5           ctx->alive_flag = &alive;
272 5           ctx_ref(ctx);
273             /* "Forced service": drive connections that need servicing with no
274             pending socket event -- rx already read into lws's buflist, plus any
275             due lws_sul timeouts. Counterintuitively a timeout_ms of 0 is NOT
276             non-blocking here: lws maps it to its maximum internal poll wait, so
277             the old lws_service(ctx, 0) blocked for seconds on an idle connection
278             and starved every other EV watcher. A negative timeout_ms clamps the
279             wait to 0, servicing only ready + forced-service work and returning at
280             once (see the lws_service_adjust_timeout docs). lws_service_fd(ctx,
281             NULL) is invalid since lws 3.2 and never drains buflists. Per-fd I/O
282             is driven by io_cb via lws_service_fd(&pollfd). */
283 5           lws_service_tsi(ctx->lws_ctx, -1, 0);
284 5 50         if (alive) {
285 5           ctx->alive_flag = prev_flag;
286 5           schedule_timeout(ctx);
287 0 0         } else if (prev_flag) {
288 0           *prev_flag = 0; /* propagate destruction up the alive_flag chain */
289             }
290 5           ctx_unref(ctx);
291             }
292 5           }
293              
294 5           static void timer_cb(EV_P_ ev_timer* w, int revents) {
295             (void)loop; (void)revents;
296 5           do_lws_service((ev_ws_ctx_t*)w->data);
297 5           }
298              
299             /* Forward declarations */
300             static void io_cb(EV_P_ ev_io* w, int revents);
301             static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
302             static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
303             static void conn_ref(ev_ws_conn_t* conn);
304             static void conn_unref(ev_ws_conn_t* conn);
305              
306 131           static SV* get_conn_sv(ev_ws_conn_t* conn) {
307 131 100         if (conn->perl_self) {
308 89           SV* rv = newRV_inc(conn->perl_self);
309 89           sv_bless(rv, gv_stashpv("EV::Websockets::Connection", 1));
310 89           return rv;
311             }
312 42           SV* rv = newSV(0);
313 42           sv_setref_pv(rv, "EV::Websockets::Connection", (void*)conn);
314 42           conn->perl_self = SvRV(rv);
315            
316             /* We are creating a new Perl owner for this connection.
317             Increment refcnt so DESTROY doesn't kill it prematurely if LWS still needs it. */
318 42           conn_ref(conn);
319            
320 42           return rv;
321             }
322              
323             /* All emit_* callbacks share this skeleton: guard on a registered handler,
324             ref the conn across the call (a callback may close/destroy it), push $conn
325             plus handler-specific args, invoke under G_EVAL, and warn — without
326             recursing — on a die. EMIT_BEGIN opens a block that EMIT_END closes;
327             handler-specific args are XPUSHs'd in between. */
328             #define EMIT_BEGIN(conn, cb_field) \
329             if ((conn) == NULL || (conn)->cb_field == NULL) return; \
330             { \
331             SV* _emit_cb = (conn)->cb_field; \
332             dSP; \
333             ENTER; \
334             SAVETMPS; \
335             PUSHMARK(SP); \
336             conn_ref(conn); \
337             XPUSHs(sv_2mortal(get_conn_sv(conn)));
338              
339             #define EMIT_END(conn, label) \
340             PUTBACK; \
341             sv_setsv(ERRSV, &PL_sv_undef); \
342             call_sv(_emit_cb, G_DISCARD | G_EVAL); \
343             if (SvTRUE(ERRSV)) \
344             warn("EV::Websockets: exception in " label ": %s", SvPV_nolen(ERRSV)); \
345             FREETMPS; \
346             LEAVE; \
347             conn_unref(conn); \
348             }
349              
350             /* Guard shared by the send_* methods: croak if the connection has been
351             destroyed or is not currently open for writing. */
352             #define CHECK_CONN_OPEN(self) STMT_START { \
353             if ((self)->magic != EV_WS_CONN_MAGIC) \
354             croak("Connection has been destroyed"); \
355             if (!(self)->wsi || !(self)->connected || (self)->closing) \
356             croak("Connection is not open"); \
357             } STMT_END
358              
359             /* Guard for send/send_binary: a whole-message write must not interleave with
360             an in-progress fragmented message. */
361             #define CHECK_NOT_FRAGMENTING(self) STMT_START { \
362             if ((self)->in_fragmented_send) \
363             croak("Cannot send while a fragmented message is in progress; " \
364             "finish the fragment with send_fragment(..., is_final => 1) first"); \
365             } STMT_END
366              
367 2           static void emit_error(ev_ws_conn_t* conn, const char* error) {
368 2 50         EMIT_BEGIN(conn, on_error);
    50          
    50          
    50          
369 2 50         XPUSHs(sv_2mortal(newSVpv(error, 0)));
370 2 50         EMIT_END(conn, "error handler");
    50          
    50          
    0          
    50          
371             }
372              
373 38           static void emit_connect(ev_ws_conn_t* conn) {
374 38 50         EMIT_BEGIN(conn, on_connect);
    100          
    50          
    50          
375 36 50         if (conn->response_headers)
376 36 50         XPUSHs(sv_2mortal(newRV_inc((SV*)conn->response_headers)));
377             else
378 0 0         XPUSHs(&PL_sv_undef);
379 36 50         EMIT_END(conn, "connect handler");
    50          
    100          
    50          
    50          
380             }
381              
382 30           static void emit_message(ev_ws_conn_t* conn, const char* data, size_t len, int is_binary, int is_final) {
383 30 50         EMIT_BEGIN(conn, on_message);
    50          
    50          
    50          
384 30 50         XPUSHs(sv_2mortal(newSVpvn(data, len)));
385 30 50         XPUSHs(sv_2mortal(newSViv(is_binary)));
386 30 50         XPUSHs(sv_2mortal(newSViv(is_final)));
387 30 50         EMIT_END(conn, "message handler");
    50          
    50          
    0          
    50          
388             }
389              
390 31           static void emit_close(ev_ws_conn_t* conn, int code, const char* reason) {
391 31 50         EMIT_BEGIN(conn, on_close);
    50          
    50          
    50          
392 31 50         XPUSHs(sv_2mortal(newSViv(code)));
393 31 50         XPUSHs(reason ? sv_2mortal(newSVpv(reason, 0)) : &PL_sv_undef);
    50          
394 31 50         EMIT_END(conn, "close handler");
    50          
    100          
    50          
    50          
395             }
396              
397 3           static void emit_pong(ev_ws_conn_t* conn, const char* data, size_t len) {
398 3 50         EMIT_BEGIN(conn, on_pong);
    50          
    50          
    50          
399 3 50         XPUSHs(sv_2mortal(newSVpvn(data ? data : "", len)));
    50          
400 3 50         EMIT_END(conn, "pong handler");
    50          
    100          
    50          
    50          
401             }
402              
403 33           static void emit_drain(ev_ws_conn_t* conn) {
404 33 50         EMIT_BEGIN(conn, on_drain);
    100          
    50          
    50          
405 1 50         EMIT_END(conn, "drain handler");
    50          
    50          
    50          
    50          
406             }
407              
408             /* Stop the connect-timeout watcher if it is running */
409 112           static void stop_connect_timer(ev_ws_conn_t* conn) {
410 112 50         if (conn->connect_timer_active && conn->loop) {
    0          
411 0           ev_timer_stop(conn->loop, &conn->connect_timer);
412 0           conn->connect_timer_active = 0;
413             }
414 112           }
415              
416             /* Drop references to the (up to seven) callback SVs collected while parsing
417             options, on an error path that croaks before they are owned elsewhere.
418             Pass NULL for absent slots; SvREFCNT_dec of NULL is guarded out. */
419 5           static void free_cb_svs(SV* on_connect, SV* on_message, SV* on_close,
420             SV* on_error, SV* on_pong, SV* on_drain,
421             SV* on_handshake) {
422 5 50         if (on_connect) SvREFCNT_dec(on_connect);
423 5 100         if (on_message) SvREFCNT_dec(on_message);
424 5 50         if (on_close) SvREFCNT_dec(on_close);
425 5 50         if (on_error) SvREFCNT_dec(on_error);
426 5 50         if (on_pong) SvREFCNT_dec(on_pong);
427 5 50         if (on_drain) SvREFCNT_dec(on_drain);
428 5 50         if (on_handshake) SvREFCNT_dec(on_handshake);
429 5           }
430              
431             /* Drop the server's callback/header SV references (not protocol_name, which the
432             live vhost still points at, nor the struct itself). Idempotent via NULL-out. */
433 18           static void free_server_svs(ev_ws_server_t* srv) {
434 18 50         if (srv->on_connect) { SvREFCNT_dec(srv->on_connect); srv->on_connect = NULL; }
435 18 100         if (srv->on_message) { SvREFCNT_dec(srv->on_message); srv->on_message = NULL; }
436 18 100         if (srv->on_close) { SvREFCNT_dec(srv->on_close); srv->on_close = NULL; }
437 18 100         if (srv->on_error) { SvREFCNT_dec(srv->on_error); srv->on_error = NULL; }
438 18 100         if (srv->on_pong) { SvREFCNT_dec(srv->on_pong); srv->on_pong = NULL; }
439 18 50         if (srv->on_drain) { SvREFCNT_dec(srv->on_drain); srv->on_drain = NULL; }
440 18 50         if (srv->on_handshake) { SvREFCNT_dec(srv->on_handshake); srv->on_handshake = NULL; }
441 18 50         if (srv->response_headers) {
442 0           SvREFCNT_dec((SV*)srv->response_headers);
443 0           srv->response_headers = NULL;
444             }
445 18           }
446              
447             /* Free a connection's resources (not the struct itself) */
448 42           static void free_conn_resources(ev_ws_conn_t* conn) {
449             ev_ws_send_t* send;
450             ev_ws_send_t* next_send;
451              
452 42 50         DEBUG_LOG("Freeing connection resources: conn=%p", conn);
453              
454 42           stop_connect_timer(conn);
455              
456 42 100         if (conn->on_connect) { SvREFCNT_dec(conn->on_connect); conn->on_connect = NULL; }
457 42 100         if (conn->on_message) { SvREFCNT_dec(conn->on_message); conn->on_message = NULL; }
458 42 100         if (conn->on_close) { SvREFCNT_dec(conn->on_close); conn->on_close = NULL; }
459 42 100         if (conn->on_error) { SvREFCNT_dec(conn->on_error); conn->on_error = NULL; }
460 42 100         if (conn->on_pong) { SvREFCNT_dec(conn->on_pong); conn->on_pong = NULL; }
461 42 100         if (conn->on_drain) { SvREFCNT_dec(conn->on_drain); conn->on_drain = NULL; }
462              
463 42 100         if (conn->custom_headers) {
464 1           SvREFCNT_dec((SV*)conn->custom_headers);
465 1           conn->custom_headers = NULL;
466             }
467              
468 42 100         if (conn->response_headers) {
469 38           SvREFCNT_dec((SV*)conn->response_headers);
470 38           conn->response_headers = NULL;
471             }
472              
473 42 100         if (conn->stash) { SvREFCNT_dec((SV*)conn->stash); conn->stash = NULL; }
474              
475 42 100         if (conn->adopted_fh) { SvREFCNT_dec(conn->adopted_fh); conn->adopted_fh = NULL; }
476              
477 42 100         if (conn->recv_buf) { Safefree(conn->recv_buf); conn->recv_buf = NULL; conn->recv_alloc = 0; }
478              
479 42 50         for (send = conn->send_head; send != NULL; send = next_send) {
480 0           next_send = send->next;
481 0           Safefree(send);
482             }
483 42           conn->send_head = NULL;
484 42           conn->send_tail = NULL;
485 42           conn->send_queue_bytes = 0;
486 42           }
487              
488 209           static void conn_ref(ev_ws_conn_t* conn) {
489 209           conn->refcnt++;
490 209 50         DEBUG_LOG("conn_ref: %p refcnt=%d", conn, conn->refcnt);
491 209           }
492              
493 275           static void conn_unref(ev_ws_conn_t* conn) {
494 275 50         DEBUG_LOG("conn_unref: %p refcnt=%d", conn, conn->refcnt);
495 275 100         if (--conn->refcnt == 0) {
496 42 50         DEBUG_LOG("Actually freeing conn: %p", conn);
497 42           free_conn_resources(conn);
498 42           conn->magic = EV_WS_CONN_FREED;
499 42           Safefree(conn);
500             }
501 275           }
502              
503 42           static void link_conn(ev_ws_ctx_t* ctx, ev_ws_conn_t* conn) {
504 42           conn->ctx = ctx;
505 42           conn->next = ctx->connections;
506 42 100         if (ctx->connections) {
507 19           ctx->connections->prev = conn;
508             }
509 42           ctx->connections = conn;
510 42           }
511              
512 32           static void unlink_conn(ev_ws_conn_t* conn) {
513 32 50         if (conn->ctx == NULL) return;
514              
515 32 100         if (conn->prev) {
516 12           conn->prev->next = conn->next;
517             } else {
518 20           conn->ctx->connections = conn->next;
519             }
520 32 100         if (conn->next) {
521 7           conn->next->prev = conn->prev;
522             }
523 32           conn->prev = NULL;
524 32           conn->next = NULL;
525 32           conn->ctx = NULL;
526             }
527              
528 36           static void queue_send(ev_ws_conn_t* conn, const char* data, size_t len, enum lws_write_protocol write_mode) {
529 36           int was_empty = (conn->send_head == NULL);
530 36           size_t alloc = offsetof(ev_ws_send_t, data) + LWS_PRE + len;
531 36           ev_ws_send_t* send = (ev_ws_send_t*)safemalloc(alloc);
532              
533 36 50         if (data && len > 0) {
    50          
534 36           memcpy(send->data + LWS_PRE, data, len);
535             }
536 36           send->len = len;
537 36           send->write_mode = write_mode;
538 36           send->next = NULL;
539              
540 36 100         if (conn->send_tail) {
541 2           conn->send_tail->next = send;
542 2           conn->send_tail = send;
543             } else {
544 34           conn->send_head = send;
545 34           conn->send_tail = send;
546             }
547 36           conn->send_queue_bytes += len;
548              
549 36 100         if (was_empty && conn->wsi) {
    50          
550 34           lws_callback_on_writable(conn->wsi);
551             }
552 36           }
553              
554 249           static void io_cb(EV_P_ ev_io* w, int revents) {
555 249           ev_ws_fd_t* fdw = (ev_ws_fd_t*)w;
556 249           ev_ws_ctx_t* ctx = fdw->ctx;
557             struct lws_pollfd pollfd;
558              
559             (void)loop;
560              
561 249 50         if (ctx == NULL || ctx->magic != EV_WS_CTX_MAGIC || ctx->lws_ctx == NULL) {
    50          
    50          
562 0           return;
563             }
564              
565 249           pollfd.fd = fdw->fd;
566 249           pollfd.events = fdw->poll_events;
567 249           pollfd.revents = 0;
568              
569 249 100         if (revents & EV_READ) pollfd.revents |= POLLIN;
570 249 100         if (revents & EV_WRITE) pollfd.revents |= POLLOUT;
571 249 50         if (revents & EV_ERROR) pollfd.revents |= POLLERR | POLLHUP;
572              
573             {
574 249           int alive = 1;
575 249           int* prev_flag = ctx->alive_flag;
576 249           ctx->alive_flag = &alive;
577 249           ctx_ref(ctx);
578 249           lws_service_fd(ctx->lws_ctx, &pollfd);
579 249 50         if (alive) {
580 249           ctx->alive_flag = prev_flag;
581 249           schedule_timeout(ctx);
582 0 0         } else if (prev_flag) {
583 0           *prev_flag = 0; /* propagate destruction up the alive_flag chain */
584             }
585 249           ctx_unref(ctx);
586             }
587             }
588              
589             #define FD_TABLE_INIT_SIZE 64
590              
591 23           static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
592 23 50         int new_size = ctx->fd_table_size ? ctx->fd_table_size : FD_TABLE_INIT_SIZE;
593 23 50         while (new_size <= needed) new_size *= 2;
594 23           Renew(ctx->fd_table, new_size, ev_ws_fd_t*);
595 23           Zero(ctx->fd_table + ctx->fd_table_size, new_size - ctx->fd_table_size, ev_ws_fd_t*);
596 23           ctx->fd_table_size = new_size;
597 23           }
598              
599 60           static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
600             ev_ws_fd_t* fdw;
601 60           int ev_events = 0;
602              
603 60 50         if (fd < 0) return;
604 60 100         if (fd >= ctx->fd_table_size) fd_table_grow(ctx, fd);
605              
606 60           fdw = ctx->fd_table[fd];
607 60 50         if (fdw != NULL) {
608 0           change_fd_watcher(ctx, fd, events);
609 0           return;
610             }
611              
612 60           Newxz(fdw, 1, ev_ws_fd_t);
613 60           fdw->ctx = ctx;
614 60           fdw->fd = fd;
615 60           fdw->poll_events = events;
616              
617 60 50         if (events & POLLIN) ev_events |= EV_READ;
618 60 50         if (events & POLLOUT) ev_events |= EV_WRITE;
619              
620 60 50         DEBUG_LOG("add_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, ev_events);
621              
622 60 50         ev_io_init(&fdw->io, io_cb, fd, ev_events ? ev_events : EV_READ);
623 60 50         if (ev_events)
624 60           ev_io_start(ctx->loop, &fdw->io);
625              
626 60           ctx->fd_table[fd] = fdw;
627             }
628              
629 32           static void del_fd_watcher(ev_ws_ctx_t* ctx, int fd) {
630             ev_ws_fd_t* fdw;
631 32 50         if (fd < 0 || fd >= ctx->fd_table_size) return;
    50          
632 32           fdw = ctx->fd_table[fd];
633 32 50         if (fdw == NULL) return;
634              
635 32           ev_io_stop(ctx->loop, &fdw->io);
636 32           ctx->fd_table[fd] = NULL;
637 32           Safefree(fdw);
638             }
639              
640 393           static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
641             ev_ws_fd_t* fdw;
642 393           int ev_events = 0;
643              
644 393 50         if (fd < 0) return;
645 393 50         if (fd >= ctx->fd_table_size) {
646 0           add_fd_watcher(ctx, fd, events);
647 0           return;
648             }
649 393           fdw = ctx->fd_table[fd];
650 393 50         if (fdw == NULL) {
651 0           add_fd_watcher(ctx, fd, events);
652 0           return;
653             }
654              
655 393           fdw->poll_events = events;
656              
657 393 100         if (events & POLLIN) ev_events |= EV_READ;
658 393 100         if (events & POLLOUT) ev_events |= EV_WRITE;
659              
660 393 50         DEBUG_LOG("change_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, ev_events);
661              
662 393           ev_io_stop(ctx->loop, &fdw->io);
663 393 100         if (ev_events == 0) return;
664 387           ev_io_set(&fdw->io, fd, ev_events);
665 387           ev_io_start(ctx->loop, &fdw->io);
666             }
667              
668 24           static void free_all_fd_watchers(ev_ws_ctx_t* ctx) {
669             int i;
670 1496 100         for (i = 0; i < ctx->fd_table_size; i++) {
671 1472           ev_ws_fd_t* fdw = ctx->fd_table[i];
672 1472 100         if (fdw) {
673 28 50         if (ctx->loop) ev_io_stop(ctx->loop, &fdw->io);
674 28           Safefree(fdw);
675 28           ctx->fd_table[i] = NULL;
676             }
677             }
678 24           Safefree(ctx->fd_table);
679 24           ctx->fd_table = NULL;
680 24           ctx->fd_table_size = 0;
681 24           }
682              
683 0           static void connect_timeout_cb(EV_P_ ev_timer* w, int revents) {
684 0           ev_ws_conn_t* conn = (ev_ws_conn_t*)w->data;
685             (void)loop; (void)revents;
686 0           conn->connect_timer_active = 0;
687 0           conn->closing = 1;
688 0           conn_ref(conn);
689 0           emit_error(conn, "connect timeout");
690 0 0         if (conn->magic == EV_WS_CONN_MAGIC && conn->wsi)
    0          
691 0           lws_callback_on_writable(conn->wsi);
692 0           conn_unref(conn);
693 0           }
694              
695             /* libwebsockets callback */
696 2190           static int ws_callback(struct lws* wsi, enum lws_callback_reasons reason,
697             void* user, void* in, size_t len) {
698 2190 50         struct lws_context* lws_ctx = wsi ? lws_get_context(wsi) : NULL;
699 2190 50         ev_ws_ctx_t* ctx = lws_ctx ? (ev_ws_ctx_t*)lws_context_user(lws_ctx) : NULL;
700 2190           ev_ws_conn_t* conn = (ev_ws_conn_t*)user;
701              
702 2190 100         if (!conn && wsi && pending_adoption) {
    50          
    100          
703 1 50         DEBUG_LOG("Associating pending adoption: conn=%p", pending_adoption);
704 1           lws_set_wsi_user(wsi, pending_adoption);
705 1           conn = pending_adoption;
706 1 50         if (!conn->wsi) conn->wsi = wsi;
707             }
708              
709 2190 100         if (ctx && ctx->magic != EV_WS_CTX_MAGIC) {
    100          
710             /* Context is being destroyed. Only handle cleanup callbacks. */
711 221 100         if (reason != LWS_CALLBACK_WSI_DESTROY &&
    100          
712             reason != LWS_CALLBACK_PROTOCOL_DESTROY) {
713 150           return 0;
714             }
715             }
716              
717 2040 50         DEBUG_LOG("callback reason=%d user=%p ctx=%p wsi=%p conn=%p", (int)reason, user, ctx, wsi, conn);
718              
719 2040           switch (reason) {
720             /* Poll fd management callbacks */
721 60           case LWS_CALLBACK_ADD_POLL_FD: {
722 60           struct lws_pollargs* pa = (struct lws_pollargs*)in;
723 60 50         DEBUG_LOG("ADD_POLL_FD: ctx=%p fd=%d events=%d", ctx, pa->fd, pa->events);
724 60 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
725 60           add_fd_watcher(ctx, pa->fd, pa->events);
726             }
727 60           break;
728             }
729              
730 32           case LWS_CALLBACK_DEL_POLL_FD: {
731 32           struct lws_pollargs* pa = (struct lws_pollargs*)in;
732 32 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
733 32           del_fd_watcher(ctx, pa->fd);
734             }
735 32           break;
736             }
737              
738 393           case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
739 393           struct lws_pollargs* pa = (struct lws_pollargs*)in;
740 393 50         if (ctx && ctx->magic == EV_WS_CTX_MAGIC) {
    50          
741 393           change_fd_watcher(ctx, pa->fd, pa->events);
742             }
743 393           break;
744             }
745              
746 21           case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: {
747 21 50         if (conn && conn->custom_headers) {
    100          
748 1           unsigned char **p = (unsigned char **)in;
749 1           unsigned char *end = (*p) + len;
750 1 50         if (inject_headers(wsi, conn->custom_headers, p, end))
751 0           return -1;
752             }
753 21           break;
754             }
755            
756 20           case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: {
757 20 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
758             static const header_def_t resp_hdrs[] = {
759             { WSI_TOKEN_HTTP_SET_COOKIE, "Set-Cookie", 10 },
760             { WSI_TOKEN_HTTP_CONTENT_TYPE, "Content-Type", 12 },
761             { WSI_TOKEN_HTTP_SERVER, "Server", 6 },
762             { WSI_TOKEN_PROTOCOL, "Sec-WebSocket-Protocol", 22 },
763             #ifdef WSI_TOKEN_HTTP_LOCATION
764             { WSI_TOKEN_HTTP_LOCATION, "Location", 8 },
765             #endif
766             #ifdef WSI_TOKEN_HTTP_WWW_AUTHENTICATE
767             { WSI_TOKEN_HTTP_WWW_AUTHENTICATE, "WWW-Authenticate", 16 },
768             #endif
769             };
770             int hi;
771 20 50         if (!conn->response_headers) conn->response_headers = newHV();
772 100 100         for (hi = 0; hi < (int)(sizeof(resp_hdrs)/sizeof(resp_hdrs[0])); hi++) {
773 80           capture_header(wsi, conn->response_headers, resp_hdrs[hi].tok,
774 80           resp_hdrs[hi].name, resp_hdrs[hi].nlen);
775             }
776             }
777 20           break;
778             }
779              
780 18           case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: {
781 18           struct lws_vhost *vh = lws_get_vhost(wsi);
782 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
783 18 50         if (srv && srv->magic == EV_WS_SRV_MAGIC && srv->on_handshake) {
    50          
    50          
784 0           HV *hdrs = newHV();
785             int count;
786             SV *result;
787 0           capture_request_headers(wsi, hdrs);
788              
789 0           dSP;
790 0           ENTER;
791 0           SAVETMPS;
792 0 0         PUSHMARK(SP);
793 0 0         XPUSHs(sv_2mortal(newRV_noinc((SV*)hdrs)));
794 0           PUTBACK;
795 0 0         sv_setsv(ERRSV, &PL_sv_undef);
796 0           count = call_sv(srv->on_handshake, G_SCALAR | G_EVAL);
797 0           SPAGAIN;
798 0 0         if (SvTRUE(ERRSV)) {
    0          
799 0 0         warn("EV::Websockets: exception in on_handshake: %s", SvPV_nolen(ERRSV));
800 0 0         if (count) POPs;
801 0           PUTBACK;
802 0 0         FREETMPS;
803 0           LEAVE;
804 0           return -1;
805             }
806 0 0         result = count ? POPs : &PL_sv_undef;
807 0 0         if (!SvTRUE(result)) {
808 0           PUTBACK;
809 0 0         FREETMPS;
810 0           LEAVE;
811 0           return -1;
812             }
813 0 0         if (SvROK(result) && SvTYPE(SvRV(result)) == SVt_PVHV) {
    0          
814             char key[32];
815 0           int klen = wsi_key(key, wsi);
816 0           SV *val = newRV_inc(SvRV(result));
817 0 0         if (!handshake_headers_map)
818 0           handshake_headers_map = newHV();
819 0 0         if (!hv_store(handshake_headers_map, key, klen, val, 0))
820 0           SvREFCNT_dec(val);
821             }
822 0           PUTBACK;
823 0 0         FREETMPS;
824 0           LEAVE;
825             }
826 18           break;
827             }
828              
829             /* Connection callbacks */
830 18           case LWS_CALLBACK_ESTABLISHED:
831 18 50         if (!conn) {
832 18           struct lws_vhost *vh = lws_get_vhost(wsi);
833 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
834 18 50         if (ctx && srv && srv->magic == EV_WS_SRV_MAGIC) {
    50          
    50          
835             ev_ws_conn_t *c;
836 18           Newxz(c, 1, ev_ws_conn_t);
837 18           c->magic = EV_WS_CONN_MAGIC;
838 18           c->wsi = wsi;
839 18           c->refcnt = 1; /* For LWS */
840 18           c->max_message_size = srv->max_message_size;
841 18           c->loop = ctx->loop;
842 18           link_conn(ctx, c);
843 18 50         c->on_connect = srv->on_connect ? SvREFCNT_inc(srv->on_connect) : NULL;
844 18 100         c->on_message = srv->on_message ? SvREFCNT_inc(srv->on_message) : NULL;
845 18 50         c->on_close = srv->on_close ? SvREFCNT_inc(srv->on_close) : NULL;
846 18 100         c->on_error = srv->on_error ? SvREFCNT_inc(srv->on_error) : NULL;
847 18 100         c->on_pong = srv->on_pong ? SvREFCNT_inc(srv->on_pong) : NULL;
848 18 50         c->on_drain = srv->on_drain ? SvREFCNT_inc(srv->on_drain) : NULL;
849              
850 18           c->response_headers = newHV();
851 18           capture_request_headers(wsi, c->response_headers);
852              
853 18           lws_set_wsi_user(wsi, c);
854 18           conn = c;
855             }
856             }
857             /* fallthrough */
858             case LWS_CALLBACK_CLIENT_ESTABLISHED:
859 38 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
860 38 50         DEBUG_LOG("Connected (reason %d): conn=%p", (int)reason, conn);
861 38           stop_connect_timer(conn);
862 38           conn->connected = 1;
863 38           emit_connect(conn);
864             }
865 38           break;
866              
867 18           case LWS_CALLBACK_ADD_HEADERS: {
868 18           struct lws_process_html_args *args = (struct lws_process_html_args *)in;
869 18           unsigned char *p_end = (unsigned char *)args->p + args->max_len;
870 18           struct lws_vhost *vh = lws_get_vhost(wsi);
871 18 50         ev_ws_server_t *srv = vh ? (ev_ws_server_t *)lws_get_vhost_user(vh) : NULL;
872 18 50         if (srv && srv->magic == EV_WS_SRV_MAGIC && srv->response_headers)
    50          
    50          
873 0           inject_headers(wsi, srv->response_headers,
874 0           (unsigned char **)&args->p, p_end);
875 18 50         if (handshake_headers_map) {
876             char key[32];
877 0           int klen = wsi_key(key, wsi);
878 0           SV *val = hv_delete(handshake_headers_map, key, klen, 0);
879 0 0         if (val && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV)
    0          
    0          
880 0           inject_headers(wsi, (HV*)SvRV(val), (unsigned char **)&args->p, p_end);
881             }
882 18           break;
883             }
884              
885 3           case LWS_CALLBACK_RECEIVE_PONG:
886             case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
887 3 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
888 3           emit_pong(conn, (const char *)in, len);
889             }
890 3           break;
891              
892 33           case LWS_CALLBACK_RECEIVE:
893             case LWS_CALLBACK_CLIENT_RECEIVE:
894 33 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
895 33           int is_final = lws_is_final_fragment(wsi);
896 33           int is_binary = lws_frame_is_binary(wsi);
897 33 50         DEBUG_LOG("Received data (reason %d): len=%zu final=%d binary=%d", (int)reason, len, is_final, is_binary);
898              
899 33 100         if (lws_is_first_fragment(wsi)) {
900 31           conn->recv_len = 0;
901 31           conn->recv_is_binary = is_binary;
902             }
903              
904             /* Enforce max message size */
905 33 100         if (conn->max_message_size > 0 && conn->recv_len + len > conn->max_message_size) {
    100          
906 1           conn_ref(conn);
907 1           emit_error(conn, "message exceeds max_message_size");
908 1 50         if (conn->magic == EV_WS_CONN_MAGIC) {
909 1           Safefree(conn->recv_buf);
910 1           conn->recv_buf = NULL;
911 1           conn->recv_len = 0;
912 1           conn->recv_alloc = 0;
913             }
914 1           conn_unref(conn);
915 1           return -1;
916             }
917              
918             /* Accumulate data */
919 32 100         if (conn->recv_len + len > conn->recv_alloc) {
920 28 50         size_t new_alloc = conn->recv_alloc ? conn->recv_alloc * 2 : 4096;
921 28 50         while (new_alloc < conn->recv_len + len) new_alloc *= 2;
922 28 100         if (conn->max_message_size > 0 && new_alloc > conn->max_message_size)
    50          
923 2           new_alloc = conn->max_message_size;
924 28           Renew(conn->recv_buf, new_alloc, char);
925 28           conn->recv_alloc = new_alloc;
926             }
927 32 50         if (len) /* guard: recv_buf may still be NULL for an empty frame */
928 32           memcpy(conn->recv_buf + conn->recv_len, in, len);
929 32           conn->recv_len += len;
930              
931 32 100         if (is_final) {
932 30           conn_ref(conn);
933 30           emit_message(conn, conn->recv_buf, conn->recv_len, conn->recv_is_binary, 1);
934 30 50         if (conn->magic == EV_WS_CONN_MAGIC)
935 30           conn->recv_len = 0;
936 30           conn_unref(conn);
937             }
938             }
939 32           break;
940              
941 69           case LWS_CALLBACK_SERVER_WRITEABLE:
942             case LWS_CALLBACK_CLIENT_WRITEABLE:
943 69 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
944             ev_ws_send_t* send;
945             int n;
946 105 100         while (conn->send_head) {
947 36           send = conn->send_head;
948 36 50         DEBUG_LOG("Writing data: len=%zu mode=%d", send->len, (int)send->write_mode);
949 36           n = lws_write(wsi, (unsigned char*)send->data + LWS_PRE, send->len, send->write_mode);
950              
951 36 50         if (n < 0) {
952 0           lws_set_wsi_user(wsi, NULL);
953 0           conn->wsi = NULL;
954 0           conn->connected = 0;
955 0           conn->in_fragmented_send = 0;
956 0           unlink_conn(conn);
957 0           conn_ref(conn);
958 0           emit_error(conn, "write failed");
959 0           conn_unref(conn);
960 0           conn_unref(conn); /* drop wsi ref */
961 0           return -1;
962             }
963              
964 36           conn->send_head = send->next;
965 36 100         if (conn->send_head == NULL) {
966 34           conn->send_tail = NULL;
967             }
968 36           conn->send_queue_bytes -= send->len;
969 36           Safefree(send);
970              
971 36 50         if (lws_send_pipe_choked(wsi)) {
972 0           lws_callback_on_writable(wsi);
973 0           break;
974             }
975             }
976 69 100         if (conn->closing && conn->send_head == NULL) {
    50          
977 36 50         DEBUG_LOG("Closing connection via writeable callback");
978 36           return -1;
979             }
980 33 50         if (conn->send_head == NULL) {
981 33           emit_drain(conn); /* self-guards on a registered on_drain */
982             }
983             }
984 33           break;
985              
986 1           case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
987 1 50         DEBUG_LOG("CLIENT_CONNECTION_ERROR: conn=%p", conn);
988 1 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
989 1 50         const char* err = in ? (const char*)in : "connection error";
990 1           stop_connect_timer(conn);
991             /* Clear wsi user pointer so WSI_DESTROY (which follows) sees NULL
992             and skips — we handle cleanup here to avoid double conn_unref
993             if Context::DESTROY fires from within the callback. */
994 1           lws_set_wsi_user(wsi, NULL);
995 1           conn->wsi = NULL;
996 1           conn->connected = 0;
997 1           unlink_conn(conn);
998 1           conn_ref(conn);
999 1           emit_error(conn, err);
1000 1           conn_unref(conn);
1001 1           conn_unref(conn); /* drop wsi ref */
1002             }
1003 1           break;
1004              
1005 31           case LWS_CALLBACK_CLIENT_CLOSED:
1006             case LWS_CALLBACK_CLOSED:
1007 31 50         DEBUG_LOG("CLOSED: conn=%p", conn);
1008 31 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    50          
1009 31           int close_code = 1000;
1010 31           stop_connect_timer(conn);
1011 31           const char* close_reason = NULL;
1012             char reason_buf[126];
1013              
1014 31 50         if (in && len >= 2) {
    0          
1015 0           close_code = ((unsigned char *)in)[0] << 8 | ((unsigned char *)in)[1];
1016 0 0         if (len > 2) {
1017 0           size_t rlen = len - 2;
1018 0 0         if (rlen > sizeof(reason_buf) - 1) rlen = sizeof(reason_buf) - 1;
1019 0           memcpy(reason_buf, (char *)in + 2, rlen);
1020 0           reason_buf[rlen] = '\0';
1021 0           close_reason = reason_buf;
1022             }
1023             }
1024              
1025 31 50         DEBUG_LOG("Emitting close: code=%d", close_code);
1026 31           lws_set_wsi_user(wsi, NULL);
1027 31           conn->connected = 0;
1028 31           conn->wsi = NULL;
1029 31           unlink_conn(conn);
1030 31           conn_ref(conn);
1031 31           emit_close(conn, close_code, close_reason);
1032 31           conn_unref(conn);
1033 31           conn_unref(conn); /* drop wsi ref */
1034             }
1035 31           break;
1036              
1037 43           case LWS_CALLBACK_PROTOCOL_DESTROY: {
1038 43 50         struct lws_vhost *vh = wsi ? lws_get_vhost(wsi) : NULL;
1039 43 50         if (vh) {
1040 43           ev_ws_server_t *srv = (ev_ws_server_t *)lws_get_vhost_user(vh);
1041 43 100         if (srv && (srv->magic == EV_WS_SRV_MAGIC || srv->magic == EV_WS_SRV_FREED)) {
    100          
    50          
1042             /* SRV_FREED means a failed listen() already dropped the SV
1043             refs but deliberately left protocol_name alive (the vhost
1044             still pointed at it); free it here, once, at teardown. */
1045 18 50         if (srv->magic == EV_WS_SRV_MAGIC)
1046 18           free_server_svs(srv);
1047 18 50         if (srv->protocol_name) Safefree(srv->protocol_name);
1048 18           Safefree(srv);
1049             }
1050             }
1051 43           break;
1052             }
1053              
1054 60           case LWS_CALLBACK_WSI_DESTROY:
1055 60 50         if (handshake_headers_map) {
1056             char key[32];
1057 0           int klen = wsi_key(key, wsi);
1058 0           hv_delete(handshake_headers_map, key, klen, G_DISCARD);
1059             }
1060 60 50         if (conn && conn->magic == EV_WS_CONN_MAGIC) {
    0          
1061 0 0         DEBUG_LOG("WSI destroyed: conn=%p", conn);
1062 0           conn->wsi = NULL;
1063 0           conn->connected = 0;
1064 0           unlink_conn(conn);
1065 0           conn_unref(conn); /* drop wsi ref; frees resources when refcnt hits 0 */
1066             }
1067 60           break;
1068              
1069 1200           default:
1070 1200           break;
1071             }
1072              
1073 2003           return 0;
1074             }
1075              
1076             static const struct lws_protocols protocols[] = {
1077             {
1078             EV_WS_PROTOCOL_NAME,
1079             ws_callback,
1080             0,
1081             65536, /* rx buffer size */
1082             0,
1083             NULL,
1084             0
1085             },
1086             { NULL, NULL, 0, 0, 0, NULL, 0 }
1087             };
1088              
1089             /* Pin libwebsockets' global TLS init for the whole process.
1090              
1091             lws refcounts the global OpenSSL init across contexts created with
1092             LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT. When that refcount falls to zero (the
1093             last such context is destroyed) lws runs OPENSSL_cleanup(), which OpenSSL
1094             1.1+/3.x cannot undo: creating another TLS context then fails, and lws's own
1095             error reporting dereferences torn-down state and crashes. So a program that
1096             destroys its TLS context and makes a new one (reconnect-with-fresh-ctx,
1097             worker recycling, test suites) would break.
1098              
1099             A TLS-using context still needs its own GLOBAL_INIT flag for its TLS to work
1100             (the flag is per-context: "initialize the SSL library at all"); we keep that.
1101             This keepalive is a single extra flagged context, created on first TLS use
1102             and never destroyed or serviced, that holds the refcount floor at >= 1 so no
1103             user context's teardown can trigger the cleanup. Returns 1 if held.
1104              
1105             Idempotent; single-threaded use only (like the rest of this module). */
1106 24           static int ensure_ssl_keepalive(void) {
1107             struct lws_context_creation_info info;
1108 24 100         if (ssl_keepalive_ctx)
1109 7           return 1;
1110 17           memset(&info, 0, sizeof(info));
1111 17           info.port = CONTEXT_PORT_NO_LISTEN;
1112 17           info.protocols = protocols;
1113 17           info.gid = -1;
1114 17           info.uid = -1;
1115 17           info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1116 17           ssl_keepalive_ctx = lws_create_context(&info);
1117 17           return ssl_keepalive_ctx != NULL;
1118             }
1119              
1120             MODULE = EV::Websockets PACKAGE = EV::Websockets
1121              
1122             BOOT:
1123             {
1124 20 50         I_EV_API("EV::Websockets");
    50          
    50          
1125 20           lws_set_log_level(LLL_ERR | LLL_WARN, NULL);
1126             }
1127              
1128             void
1129             _set_debug(int enable);
1130             CODE:
1131             {
1132 0           ev_ws_debug = enable;
1133 0 0         if (enable)
1134 0           lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, NULL);
1135             else
1136 0           lws_set_log_level(LLL_ERR | LLL_WARN, NULL);
1137             }
1138              
1139             MODULE = EV::Websockets PACKAGE = EV::Websockets::Context
1140              
1141             EV::Websockets::Context
1142             _new(char* class, EV::Loop loop, const char* proxy = NULL, int proxy_port = 0, const char* ssl_cert = NULL, const char* ssl_key = NULL, const char* ssl_ca = NULL, int ssl_init = -1);
1143             CODE:
1144             {
1145             struct lws_context_creation_info info;
1146             void* foreign_loops[1];
1147              
1148             PERL_UNUSED_VAR(class);
1149              
1150 24           Newxz(RETVAL, 1, ev_ws_ctx_t);
1151 24           RETVAL->magic = EV_WS_CTX_MAGIC;
1152 24           RETVAL->refcnt = 1; /* Perl owns the context */
1153 24           RETVAL->loop = loop;
1154              
1155 24           foreign_loops[0] = loop;
1156              
1157 24           memset(&info, 0, sizeof(info));
1158 24           info.port = CONTEXT_PORT_NO_LISTEN;
1159 24           info.protocols = protocols;
1160             #ifdef LWS_HAS_EXTENSIONS
1161 24           info.extensions = extensions;
1162             #endif
1163 24           info.gid = -1;
1164 24           info.uid = -1;
1165             /* ssl_init: -1 = manage OpenSSL init (default); 1 = force; 0 = coexist
1166             (leave it to another TLS library). When we manage it, flag this context
1167             so its own TLS works, and also pin the global init in a process-lifetime
1168             keepalive so destroying this context can't drop lws's TLS refcount to
1169             zero (which would run OPENSSL_cleanup() and break later TLS use). */
1170 24           info.options = 0;
1171 24 50         if (ssl_init != 0) {
1172 24           info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1173 24           ensure_ssl_keepalive();
1174             }
1175 24           info.user = RETVAL;
1176 24           info.foreign_loops = foreign_loops;
1177 24           info.vhost_name = "default";
1178            
1179 24 50         if (proxy && strlen(proxy) > 0) {
    50          
1180 0 0         DEBUG_LOG("Context using proxy: %s:%d", proxy, proxy_port);
1181 0           info.http_proxy_address = proxy;
1182 0           info.http_proxy_port = proxy_port;
1183             }
1184              
1185 24 50         if (ssl_cert && strlen(ssl_cert) > 0) {
    50          
1186 0           info.ssl_cert_filepath = ssl_cert;
1187 0           info.ssl_private_key_filepath = ssl_key;
1188 0 0         if (ssl_ca && strlen(ssl_ca) > 0)
    0          
1189 0           info.ssl_ca_filepath = ssl_ca;
1190             }
1191              
1192 24 50         DEBUG_LOG("Creating context (manual integration)");
1193              
1194 24           RETVAL->lws_ctx = lws_create_context(&info);
1195 24 50         if (RETVAL->lws_ctx == NULL) {
1196 0           Safefree(RETVAL);
1197 0           croak("Failed to create libwebsockets context");
1198             }
1199 24           ev_timer_init(&RETVAL->timer, timer_cb, 0.00001, 0.);
1200 24           RETVAL->timer.data = (void*)RETVAL;
1201              
1202 24           schedule_timeout(RETVAL);
1203              
1204 24 50         DEBUG_LOG("Context created successfully");
1205             }
1206             OUTPUT:
1207             RETVAL
1208              
1209             void
1210             DESTROY(EV::Websockets::Context self);
1211             CODE:
1212             {
1213             ev_ws_conn_t* conn;
1214             ev_ws_conn_t* next;
1215              
1216 24 50         if (self->magic != EV_WS_CTX_MAGIC) return;
1217              
1218 24           self->magic = EV_WS_CTX_FREED;
1219              
1220 24           ev_timer_stop(self->loop, &self->timer);
1221              
1222 24           free_all_fd_watchers(self);
1223              
1224             /* Close all connections */
1225 34 100         for (conn = self->connections; conn != NULL; conn = next) {
1226 10           next = conn->next;
1227 10           conn->ctx = NULL;
1228 10           conn->prev = NULL;
1229 10           conn->next = NULL;
1230 10 50         if (conn->wsi) {
1231 10           lws_set_wsi_user(conn->wsi, NULL);
1232 10           conn->wsi = NULL;
1233             }
1234 10           conn_unref(conn); /* drop wsi ref — may free conn */
1235             }
1236 24           self->connections = NULL;
1237              
1238 24 50         if (self->lws_ctx) {
1239 24           lws_context_destroy(self->lws_ctx);
1240 24           self->lws_ctx = NULL;
1241             }
1242              
1243 24           self->loop = NULL;
1244 24 50         if (self->alive_flag) *self->alive_flag = 0;
1245 24           ctx_unref(self); /* drops Perl ref; Safefree happens when refcnt==0 */
1246             }
1247              
1248             EV::Websockets::Connection
1249             connect(EV::Websockets::Context self, ...);
1250             PREINIT:
1251             struct lws_client_connect_info ccinfo;
1252 25           const char* url = NULL;
1253 25           const char* protocol = NULL;
1254 25           char* host = NULL;
1255 25           char* host_header = NULL; /* for IPv6: "[::1]" form */
1256 25           char* path = NULL;
1257 25           int port = 80;
1258 25           int use_ssl = 0;
1259 25           int ssl_verify = 1;
1260 25           char* url_copy = NULL;
1261             char* p;
1262             char* path_start;
1263 25           SV* on_connect = NULL;
1264 25           SV* on_message = NULL;
1265 25           SV* on_close = NULL;
1266 25           SV* on_error = NULL;
1267 25           SV* on_pong = NULL;
1268 25           SV* on_drain = NULL;
1269 25           SV* headers_hv = NULL;
1270 25           size_t max_message_size = 0;
1271 25 50         double connect_timeout = 0;
1272             int i;
1273             CODE:
1274             {
1275 25 50         if (self->magic != EV_WS_CTX_MAGIC) {
1276 0           croak("Context has been destroyed");
1277             }
1278              
1279 130 100         for (i = 1; i < items; i += 2) {
1280 105 50         if (i + 1 >= items) break;
1281 105           const char* key = SvPV_nolen(ST(i));
1282 105           SV* val = ST(i + 1);
1283              
1284 105 100         if (strcmp(key, "url") == 0) {
1285 24           url = SvPV_nolen(val);
1286 81 50         } else if (strcmp(key, "protocol") == 0) {
1287 0           protocol = SvPV_nolen(val);
1288 81 50         } else if (strcmp(key, "ssl_verify") == 0) {
1289 0           ssl_verify = SvTRUE(val);
1290 81 50         } else if (strcmp(key, "max_message_size") == 0) {
1291 0           max_message_size = (size_t)SvUV(val);
1292 81 50         } else if (strcmp(key, "connect_timeout") == 0) {
1293 0           connect_timeout = SvNV(val);
1294 81 100         } else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
    50          
    50          
1295 1           headers_hv = val;
1296 80 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1297 19           on_connect = SvREFCNT_inc(val);
1298 61 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1299 16           on_message = SvREFCNT_inc(val);
1300 45 100         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1301 19           on_close = SvREFCNT_inc(val);
1302 26 100         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1303 23           on_error = SvREFCNT_inc(val);
1304 3 100         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1305 2           on_pong = SvREFCNT_inc(val);
1306 1 50         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1307 1           on_drain = SvREFCNT_inc(val);
1308             }
1309             }
1310              
1311 25 100         if (url == NULL) {
1312 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1313 1           croak("url parameter is required");
1314             }
1315              
1316 24           Newx(url_copy, strlen(url) + 1, char);
1317 24           strcpy(url_copy, url);
1318              
1319 24 50         if (strncasecmp(url_copy, "wss://", 6) == 0) {
1320 0           use_ssl = LCCSCF_USE_SSL;
1321 0 0         if (!ssl_verify) {
1322 0           use_ssl |= LCCSCF_ALLOW_SELFSIGNED
1323             | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK
1324             | LCCSCF_ALLOW_INSECURE;
1325             }
1326 0           port = 443;
1327 0           host = url_copy + 6;
1328 24 100         } else if (strncasecmp(url_copy, "ws://", 5) == 0) {
1329 23           host = url_copy + 5;
1330             } else {
1331 1           Safefree(url_copy);
1332 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1333 1           croak("URL must start with ws:// or wss://");
1334             }
1335              
1336 23           path_start = strchr(host, '/');
1337 23 50         if (path_start) {
1338             /* Allocate separate path string */
1339 0           Newx(path, strlen(path_start) + 1, char);
1340 0           strcpy(path, path_start);
1341 0           *path_start = '\0'; /* Terminate host */
1342             } else {
1343 23           Newx(path, 2, char);
1344 23           strcpy(path, "/");
1345             }
1346              
1347             /* Find port in host (handle IPv6 bracket notation) */
1348 23 50         if (host[0] == '[') {
1349 0           p = strchr(host, ']');
1350 0 0         if (p) {
1351 0           size_t iplen = p - host - 1;
1352 0           int hport = 0;
1353 0           host++; /* skip '[' */
1354 0 0         if (*(p + 1) == ':') {
1355 0           hport = atoi(p + 2);
1356 0           port = hport;
1357             }
1358 0           *p = '\0'; /* terminate IPv6 address */
1359 0           Newx(host_header, iplen + 10, char); /* [addr]:port\0 */
1360 0 0         if (hport && hport != (use_ssl ? 443 : 80))
    0          
    0          
1361 0           snprintf(host_header, iplen + 10, "[%s]:%d", host, hport);
1362             else
1363 0           snprintf(host_header, iplen + 10, "[%s]", host);
1364             }
1365             } else {
1366 23           p = strchr(host, ':');
1367 23 50         if (p) {
1368 23           *p = '\0';
1369 23           port = atoi(p + 1);
1370             }
1371             }
1372              
1373 23           Newxz(RETVAL, 1, ev_ws_conn_t);
1374 23           RETVAL->magic = EV_WS_CONN_MAGIC;
1375 23           RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
1376 23           RETVAL->on_connect = on_connect;
1377 23           RETVAL->on_message = on_message;
1378 23           RETVAL->on_close = on_close;
1379 23           RETVAL->on_error = on_error;
1380 23           RETVAL->on_pong = on_pong;
1381 23           RETVAL->on_drain = on_drain;
1382 23           RETVAL->max_message_size = max_message_size;
1383 23           RETVAL->loop = self->loop;
1384              
1385 23           link_conn(self, RETVAL);
1386              
1387 23           memset(&ccinfo, 0, sizeof(ccinfo));
1388 23           ccinfo.context = self->lws_ctx;
1389 23           ccinfo.address = host;
1390 23           ccinfo.port = port;
1391 23           ccinfo.path = path;
1392 23 50         ccinfo.host = host_header ? host_header : host;
1393 23 50         ccinfo.origin = host_header ? host_header : host;
1394 23           ccinfo.protocol = protocol;
1395             #ifdef LWS_HAS_EXTENSIONS
1396 23           ccinfo.client_exts = extensions;
1397             #endif
1398 23           ccinfo.ssl_connection = use_ssl;
1399 23           ccinfo.userdata = RETVAL;
1400              
1401 23 100         if (headers_hv) {
1402 1           RETVAL->custom_headers = (HV*)SvREFCNT_inc(SvRV(headers_hv));
1403             }
1404              
1405 23           RETVAL->wsi = lws_client_connect_via_info(&ccinfo);
1406              
1407 23           Safefree(path);
1408 23           Safefree(host_header);
1409 23           Safefree(url_copy);
1410              
1411 23 50         if (RETVAL->wsi == NULL) {
1412 0           unlink_conn(RETVAL);
1413 0 0         if (RETVAL->perl_self == NULL) {
1414 0           free_conn_resources(RETVAL);
1415 0           RETVAL->magic = EV_WS_CONN_FREED;
1416 0           Safefree(RETVAL);
1417             } else {
1418 0           conn_unref(RETVAL); /* drop sentinel; Perl DESTROY will handle final cleanup */
1419             }
1420 0           croak("Failed to initiate WebSocket connection");
1421             }
1422 23           conn_unref(RETVAL); /* drop sentinel */
1423              
1424 23 50         if (connect_timeout > 0) {
1425 0           ev_timer_init(&RETVAL->connect_timer, connect_timeout_cb, connect_timeout, 0.);
1426 0           RETVAL->connect_timer.data = (void*)RETVAL;
1427 0           ev_timer_start(self->loop, &RETVAL->connect_timer);
1428 0           RETVAL->connect_timer_active = 1;
1429             }
1430             }
1431             OUTPUT:
1432             RETVAL
1433              
1434             int
1435             listen(EV::Websockets::Context self, ...);
1436             PREINIT:
1437             struct lws_context_creation_info info;
1438 19           int port = 0;
1439 19           const char *name = "server";
1440 19           const char *ssl_cert = NULL;
1441 19           const char *ssl_key = NULL;
1442 19           const char *ssl_ca = NULL;
1443 19           SV* on_connect = NULL;
1444 19           SV* on_message = NULL;
1445 19           SV* on_close = NULL;
1446 19           SV* on_error = NULL;
1447 19           SV* on_pong = NULL;
1448 19           SV* on_drain = NULL;
1449 19           SV* on_handshake = NULL;
1450 19           SV* headers_hv = NULL;
1451 19           size_t max_message_size = 0;
1452 19 50         const char *protocol_name = NULL;
1453             ev_ws_server_t *srv;
1454             struct lws_vhost *vh;
1455             int i;
1456             CODE:
1457             {
1458 19 50         if (self->magic != EV_WS_CTX_MAGIC) {
1459 0           croak("Context has been destroyed");
1460             }
1461              
1462 96 100         for (i = 1; i < items; i += 2) {
1463 77 50         if (i + 1 >= items) break;
1464 77           const char* key = SvPV_nolen(ST(i));
1465 77           SV* val = ST(i + 1);
1466              
1467 77 100         if (strcmp(key, "port") == 0) {
1468 19           port = SvIV(val);
1469 58 100         } else if (strcmp(key, "name") == 0) {
1470 1           name = SvPV_nolen(val);
1471 57 50         } else if (strcmp(key, "protocol") == 0) {
1472 0           protocol_name = SvPV_nolen(val);
1473 57 50         } else if (strcmp(key, "ssl_cert") == 0) {
1474 0           ssl_cert = SvPV_nolen(val);
1475 57 50         } else if (strcmp(key, "ssl_key") == 0) {
1476 0           ssl_key = SvPV_nolen(val);
1477 57 50         } else if (strcmp(key, "ssl_ca") == 0) {
1478 0           ssl_ca = SvPV_nolen(val);
1479 57 100         } else if (strcmp(key, "max_message_size") == 0) {
1480 2           max_message_size = (size_t)SvUV(val);
1481 55 50         } else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
    0          
    0          
1482 0           headers_hv = val;
1483 55 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1484 18           on_connect = SvREFCNT_inc(val);
1485 37 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1486 17           on_message = SvREFCNT_inc(val);
1487 20 100         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1488 17           on_close = SvREFCNT_inc(val);
1489 3 100         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1490 2           on_error = SvREFCNT_inc(val);
1491 1 50         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1492 1           on_pong = SvREFCNT_inc(val);
1493 0 0         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1494 0           on_drain = SvREFCNT_inc(val);
1495 0 0         } else if (strcmp(key, "on_handshake") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1496 0           on_handshake = SvREFCNT_inc(val);
1497             }
1498             }
1499              
1500 19 100         if (strcmp(name, "default") == 0) {
1501 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, on_handshake);
1502 1           croak("listen: vhost name 'default' is reserved");
1503             }
1504              
1505 18           Newxz(srv, 1, ev_ws_server_t);
1506 18           srv->magic = EV_WS_SRV_MAGIC;
1507 18           srv->on_connect = on_connect;
1508 18           srv->on_message = on_message;
1509 18           srv->on_close = on_close;
1510 18           srv->on_error = on_error;
1511 18           srv->on_pong = on_pong;
1512 18           srv->on_drain = on_drain;
1513 18           srv->on_handshake = on_handshake;
1514 18           srv->max_message_size = max_message_size;
1515 18 50         if (headers_hv)
1516 0           srv->response_headers = (HV*)SvREFCNT_inc(SvRV(headers_hv));
1517              
1518 18 50         if (protocol_name) {
1519 0           STRLEN pnlen = strlen(protocol_name);
1520 0           Newx(srv->protocol_name, pnlen + 1, char);
1521 0           memcpy(srv->protocol_name, protocol_name, pnlen + 1);
1522 0           srv->vhost_protocols[0] = protocols[0];
1523 0           srv->vhost_protocols[0].name = srv->protocol_name;
1524 0           srv->vhost_protocols[1] = protocols[1];
1525             }
1526              
1527 18           memset(&info, 0, sizeof(info));
1528 18           info.port = port;
1529 18 50         info.protocols = srv->protocol_name ? srv->vhost_protocols : protocols;
1530 18           info.vhost_name = name;
1531 18           info.user = srv;
1532 18           info.options = 0;
1533              
1534 18 50         if (ssl_cert && ssl_key) {
    0          
1535 0           info.ssl_cert_filepath = ssl_cert;
1536 0           info.ssl_private_key_filepath = ssl_key;
1537 0 0         if (ssl_ca)
1538 0           info.ssl_ca_filepath = ssl_ca;
1539 0           info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
1540 0           ensure_ssl_keepalive(); /* pin global init so vhost/context teardown won't OPENSSL_cleanup */
1541             }
1542              
1543 18           vh = lws_create_vhost(self->lws_ctx, &info);
1544 18 50         if (vh == NULL) {
1545 0           free_server_svs(srv);
1546 0 0         if (srv->protocol_name) Safefree(srv->protocol_name);
1547 0           Safefree(srv);
1548 0           croak("Failed to create vhost for listening");
1549             }
1550            
1551 18           RETVAL = lws_get_vhost_listen_port(vh);
1552 18 50         if (RETVAL <= 0) {
1553             /* Vhost created but port bind failed. Release the SV refs now, but
1554             leave protocol_name alive — the live vhost still points at it via
1555             vhost_protocols[0].name, so freeing it here would dangle. Do NOT
1556             Safefree(srv): the vhost retains the pointer. PROTOCOL_DESTROY frees
1557             protocol_name and srv at context teardown; the SRV_FREED sentinel
1558             tells it to skip the (already-released) SV refs. */
1559 0           free_server_svs(srv);
1560 0           srv->magic = EV_WS_SRV_FREED;
1561 0           croak("listen: failed to bind port");
1562             }
1563 18 50         DEBUG_LOG("Server listening on port %d", RETVAL);
1564             }
1565             OUTPUT:
1566             RETVAL
1567              
1568             EV::Websockets::Connection
1569             adopt(EV::Websockets::Context self, ...);
1570             PREINIT:
1571 3           int fd = -1;
1572 3           SV* fh_sv = NULL;
1573 3           SV* initial_data_sv = NULL;
1574 3           SV* on_connect = NULL;
1575 3           SV* on_message = NULL;
1576 3           SV* on_close = NULL;
1577 3           SV* on_error = NULL;
1578 3           SV* on_pong = NULL;
1579 3           SV* on_drain = NULL;
1580 3 50         size_t max_message_size = 0;
1581             int i;
1582             CODE:
1583             {
1584 3 50         if (self->magic != EV_WS_CTX_MAGIC) {
1585 0           croak("Context has been destroyed");
1586             }
1587              
1588 10 100         for (i = 1; i < items; i += 2) {
1589 7 50         if (i + 1 >= items) break;
1590 7           const char* key = SvPV_nolen(ST(i));
1591 7           SV* val = ST(i + 1);
1592              
1593 7 100         if (strcmp(key, "fh") == 0) {
1594 2           fh_sv = val;
1595 5 50         } else if (strcmp(key, "initial_data") == 0) {
1596 0           initial_data_sv = val;
1597 5 50         } else if (strcmp(key, "max_message_size") == 0) {
1598 0           max_message_size = (size_t)SvUV(val);
1599 5 100         } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1600 1           on_connect = SvREFCNT_inc(val);
1601 4 100         } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1602 3           on_message = SvREFCNT_inc(val);
1603 1 50         } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1604 0           on_close = SvREFCNT_inc(val);
1605 1 50         } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    50          
    50          
1606 1           on_error = SvREFCNT_inc(val);
1607 0 0         } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1608 0           on_pong = SvREFCNT_inc(val);
1609 0 0         } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
    0          
    0          
1610 0           on_drain = SvREFCNT_inc(val);
1611             }
1612             }
1613              
1614 3 100         if (fh_sv == NULL) {
1615 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1616 1           croak("fh parameter is required");
1617             }
1618              
1619 2           IO* io = sv_2io(fh_sv);
1620 2 50         PerlIO *ifp = io ? IoIFP(io) : NULL;
1621 2 100         if (!ifp || (fd = PerlIO_fileno(ifp)) < 0) {
    50          
1622 1           free_cb_svs(on_connect, on_message, on_close, on_error, on_pong, on_drain, NULL);
1623 1           croak("Invalid filehandle");
1624             }
1625              
1626 1           Newxz(RETVAL, 1, ev_ws_conn_t);
1627 1           RETVAL->magic = EV_WS_CONN_MAGIC;
1628 1           RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
1629 1           RETVAL->on_connect = on_connect;
1630 1           RETVAL->on_message = on_message;
1631 1           RETVAL->on_close = on_close;
1632 1           RETVAL->on_error = on_error;
1633 1           RETVAL->on_pong = on_pong;
1634 1           RETVAL->on_drain = on_drain;
1635 1           RETVAL->max_message_size = max_message_size;
1636 1           RETVAL->loop = self->loop;
1637             /* Hold a reference to the underlying glob/IO to prevent Perl
1638             * from closing the fd while lws owns it. For blessed glob refs
1639             * (IO::Socket etc.) we ref the glob itself so framework DESTROY
1640             * methods see it as still alive. */
1641 1           RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
1642 1 50         : SvREFCNT_inc(fh_sv);
1643              
1644 1           link_conn(self, RETVAL);
1645              
1646             {
1647 1           struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
1648 1 50         if (!vh) {
1649             /* Auto-create a server vhost for adoption (no listener needed) */
1650             struct lws_context_creation_info vinfo;
1651 1           memset(&vinfo, 0, sizeof(vinfo));
1652 1           vinfo.port = CONTEXT_PORT_NO_LISTEN_SERVER;
1653 1           vinfo.protocols = protocols;
1654 1           vinfo.vhost_name = "server";
1655 1           vh = lws_create_vhost(self->lws_ctx, &vinfo);
1656             }
1657 1 50         if (!vh) {
1658 0           unlink_conn(RETVAL);
1659 0           free_conn_resources(RETVAL);
1660 0           RETVAL->magic = EV_WS_CONN_FREED;
1661 0           Safefree(RETVAL);
1662 0           croak("Failed to create vhost for adoption");
1663             }
1664 1           pending_adoption = RETVAL;
1665 1 50         if (initial_data_sv && SvOK(initial_data_sv)) {
    0          
1666             STRLEN rdlen;
1667 0           const char *rdbuf = SvPV(initial_data_sv, rdlen);
1668 0           RETVAL->wsi = lws_adopt_socket_vhost_readbuf(vh,
1669             (lws_sockfd_type)fd, rdbuf, rdlen);
1670             } else {
1671 1           RETVAL->wsi = lws_adopt_socket_vhost(vh, (lws_sockfd_type)fd);
1672             }
1673 1           pending_adoption = NULL;
1674             }
1675              
1676 1 50         if (RETVAL->wsi == NULL) {
1677 0           unlink_conn(RETVAL);
1678 0 0         if (RETVAL->perl_self == NULL) {
1679 0           free_conn_resources(RETVAL);
1680 0           RETVAL->magic = EV_WS_CONN_FREED;
1681 0           Safefree(RETVAL);
1682             } else {
1683 0           conn_unref(RETVAL);
1684             }
1685 0           croak("Failed to adopt socket");
1686             }
1687 1           conn_unref(RETVAL); /* drop sentinel */
1688              
1689             /* Kick lws to process the adopted socket's readbuf (needed for lws 4.5+).
1690             * Use the same non-blocking forced-service call as do_lws_service: the
1691             * readbuf is pending work, so lws_service_tsi(ctx, -1, 0) drains it without
1692             * blocking. A plain lws_service(ctx, 0) would block the EV loop here when a
1693             * socket is adopted with no immediately-pending data.
1694             * Guard with extra refs: the service call may synchronously fire
1695             * error/destroy callbacks that would free RETVAL or ctx. */
1696             {
1697 1           int rejected, alive = 1;
1698 1           int* prev_flag = self->alive_flag;
1699 1           conn_ref(RETVAL);
1700 1           ctx_ref(self);
1701 1           self->alive_flag = &alive;
1702 1           lws_service_tsi(self->lws_ctx, -1, 0);
1703 1 50         if (alive) {
1704 1           self->alive_flag = prev_flag;
1705 1           schedule_timeout(self);
1706 0 0         } else if (prev_flag) {
1707             /* Context destroyed during inner lws_service.
1708             Propagate destruction up the alive_flag chain. */
1709 0           *prev_flag = 0;
1710             }
1711 1           rejected = (RETVAL->wsi == NULL);
1712 1           conn_unref(RETVAL);
1713 1           ctx_unref(self);
1714 1 50         if (rejected)
1715 0           croak("Failed to adopt socket");
1716             }
1717             }
1718             OUTPUT:
1719             RETVAL
1720              
1721             void
1722             connections(EV::Websockets::Context self);
1723             PPCODE:
1724             {
1725             ev_ws_conn_t* conn;
1726 2 50         if (self->magic != EV_WS_CTX_MAGIC) XSRETURN_EMPTY;
1727 8 100         for (conn = self->connections; conn != NULL; conn = conn->next) {
1728             /* "connected" stays set during the "closing" drain (close() never
1729             clears it), and is cleared together with wsi on teardown — so this
1730             single predicate covers both the "connected" and "closing" states. */
1731 6 50         if (conn->magic == EV_WS_CONN_MAGIC && conn->connected) {
    100          
1732 4 50         XPUSHs(sv_2mortal(get_conn_sv(conn)));
1733             }
1734             }
1735             }
1736              
1737             MODULE = EV::Websockets PACKAGE = EV::Websockets::Connection
1738              
1739             void
1740             DESTROY(EV::Websockets::Connection self);
1741             CODE:
1742             {
1743 42 50         if (self->magic != EV_WS_CONN_MAGIC) return;
1744              
1745 42 50         DEBUG_LOG("Perl object DESTROY: self=%p wsi=%p", self, self->wsi);
1746              
1747             /* Clear the cached Perl object pointer in the C struct */
1748 42           self->perl_self = NULL;
1749              
1750 42           conn_unref(self); /* drop Perl ref */
1751             }
1752              
1753             void
1754             send(EV::Websockets::Connection self, SV* data);
1755             CODE:
1756             {
1757             STRLEN len;
1758             const char* buf;
1759              
1760 28 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1761 28 100         CHECK_NOT_FRAGMENTING(self);
1762              
1763 27           buf = SvPV(data, len);
1764 27           queue_send(self, buf, len, LWS_WRITE_TEXT);
1765             }
1766              
1767             void
1768             send_binary(EV::Websockets::Connection self, SV* data);
1769             CODE:
1770             {
1771             STRLEN len;
1772             const char* buf;
1773              
1774 4 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1775 4 100         CHECK_NOT_FRAGMENTING(self);
1776              
1777 3           buf = SvPV(data, len);
1778 3           queue_send(self, buf, len, LWS_WRITE_BINARY);
1779             }
1780              
1781             void
1782             send_ping(EV::Websockets::Connection self, SV* data = NULL);
1783             CODE:
1784             {
1785 3           STRLEN len = 0;
1786 3           const char* buf = NULL;
1787            
1788 3 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1789            
1790 3 50         if (data && SvOK(data)) {
    50          
1791 3           buf = SvPV(data, len);
1792 3 50         if (len > 125) len = 125; /* PING payload limit */
1793             }
1794            
1795 3           queue_send(self, buf, len, LWS_WRITE_PING);
1796             }
1797              
1798             SV*
1799             get_protocol(EV::Websockets::Connection self);
1800             CODE:
1801             {
1802 2           SV* val = NULL;
1803 2 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi)
    50          
1804 2           val = hdr_to_sv(self->wsi, WSI_TOKEN_PROTOCOL);
1805 2 50         RETVAL = val ? val : newSV(0);
1806             }
1807             OUTPUT:
1808             RETVAL
1809              
1810             SV*
1811             peer_address(EV::Websockets::Connection self);
1812             CODE:
1813             {
1814             char buf[128];
1815 2           buf[0] = '\0';
1816 2           RETVAL = newSV(0);
1817 2 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi) {
    50          
1818 2           lws_get_peer_simple(self->wsi, buf, sizeof(buf));
1819 2 50         if (buf[0])
1820 2           sv_setpv(RETVAL, buf);
1821             }
1822             }
1823             OUTPUT:
1824             RETVAL
1825              
1826             void
1827             send_pong(EV::Websockets::Connection self, SV* data = NULL);
1828             CODE:
1829             {
1830 1           STRLEN len = 0;
1831 1           const char* buf = NULL;
1832              
1833 1 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1834              
1835 1 50         if (data && SvOK(data)) {
    50          
1836 1           buf = SvPV(data, len);
1837 1 50         if (len > 125) len = 125;
1838             }
1839              
1840 1           queue_send(self, buf, len, LWS_WRITE_PONG);
1841             }
1842              
1843             void
1844             pause_recv(EV::Websockets::Connection self);
1845             CODE:
1846             {
1847 1 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi && self->connected)
    50          
    50          
1848 1           lws_rx_flow_control(self->wsi, 0);
1849             }
1850              
1851             void
1852             resume_recv(EV::Websockets::Connection self);
1853             CODE:
1854             {
1855 1 50         if (self->magic == EV_WS_CONN_MAGIC && self->wsi && self->connected)
    50          
    50          
1856 1           lws_rx_flow_control(self->wsi, 1);
1857             }
1858              
1859             void
1860             close(EV::Websockets::Connection self, int code = 1000, const char* reason = NULL);
1861             CODE:
1862             {
1863 21 50         if (self->magic != EV_WS_CONN_MAGIC) {
1864 0           return;
1865             }
1866 21 100         if (!self->wsi || !self->connected || self->closing) {
    50          
    100          
1867 2           return;
1868             }
1869              
1870 19 50         DEBUG_LOG("Closing connection: code=%d reason=%s", code, reason ? reason : "none");
    0          
1871 19 50         if (self->in_fragmented_send) {
1872 0           queue_send(self, NULL, 0, LWS_WRITE_CONTINUATION);
1873 0           self->in_fragmented_send = 0;
1874             }
1875 19           self->closing = 1;
1876 25 100         lws_close_reason(self->wsi, (enum lws_close_status)code,
1877             reason ? (unsigned char*)reason : NULL,
1878 6           reason ? strlen(reason) : 0);
1879 19           lws_callback_on_writable(self->wsi);
1880             }
1881              
1882             int
1883             is_connected(EV::Websockets::Connection self);
1884             CODE:
1885             {
1886 6 50         RETVAL = (self->magic == EV_WS_CONN_MAGIC && self->wsi != NULL && self->connected) ? 1 : 0;
    100          
    100          
    100          
1887             }
1888             OUTPUT:
1889             RETVAL
1890              
1891             int
1892             is_connecting(EV::Websockets::Connection self);
1893             CODE:
1894             {
1895 3 50         RETVAL = (self->magic == EV_WS_CONN_MAGIC && self->wsi != NULL && !self->connected && !self->closing) ? 1 : 0;
    50          
    100          
    50          
    50          
1896             }
1897             OUTPUT:
1898             RETVAL
1899              
1900             const char*
1901             state(EV::Websockets::Connection self);
1902             CODE:
1903             {
1904 8 50         if (self->magic != EV_WS_CONN_MAGIC) RETVAL = "destroyed";
1905 8 100         else if (!self->wsi) RETVAL = "closed";
1906 5 50         else if (self->closing) RETVAL = "closing";
1907 5 100         else if (self->connected) RETVAL = "connected";
1908 3           else RETVAL = "connecting";
1909             }
1910             OUTPUT:
1911             RETVAL
1912              
1913             UV
1914             send_queue_size(EV::Websockets::Connection self);
1915             CODE:
1916             {
1917 0 0         RETVAL = (self->magic == EV_WS_CONN_MAGIC) ? (UV)self->send_queue_bytes : 0;
    0          
1918             }
1919             OUTPUT:
1920             RETVAL
1921              
1922             void
1923             send_fragment(EV::Websockets::Connection self, SV* data, int is_binary = 0, int is_final = 1);
1924             CODE:
1925             {
1926             STRLEN len;
1927             const char* buf;
1928             enum lws_write_protocol mode;
1929              
1930 2 50         CHECK_CONN_OPEN(self);
    50          
    50          
    50          
1931              
1932 2           buf = SvPV(data, len);
1933              
1934 2 100         if (!self->in_fragmented_send) {
1935 1           mode = is_binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT;
1936 1 50         if (!is_final) {
1937 1           mode |= LWS_WRITE_NO_FIN;
1938 1           self->in_fragmented_send = 1;
1939             }
1940             } else {
1941 1           mode = LWS_WRITE_CONTINUATION;
1942 1 50         if (!is_final) {
1943 0           mode |= LWS_WRITE_NO_FIN;
1944             } else {
1945 1           self->in_fragmented_send = 0;
1946             }
1947             }
1948              
1949 2           queue_send(self, buf, len, mode);
1950             }
1951              
1952             SV*
1953             stash(EV::Websockets::Connection self);
1954             CODE:
1955             {
1956 2 50         if (self->magic != EV_WS_CONN_MAGIC)
1957 0           croak("Connection has been destroyed");
1958 2 100         if (!self->stash)
1959 1           self->stash = newHV();
1960 2           RETVAL = newRV_inc((SV*)self->stash);
1961             }
1962             OUTPUT:
1963             RETVAL