File Coverage

reqrep.h
Criterion Covered Total %
statement 786 963 81.6
branch 353 688 51.3
condition n/a
subroutine n/a
pod n/a
total 1139 1651 68.9


line stmt bran cond sub pod time code
1             /*
2             * reqrep.h -- Shared-memory request/response IPC for Linux
3             *
4             * Request queue (client -> server) with circular arena for variable-length
5             * request data, plus per-request response slots for targeted reply delivery.
6             *
7             * Uses file-backed mmap(MAP_SHARED) for cross-process sharing,
8             * futex for blocking wait, and PID-based stale lock recovery.
9             */
10              
11             #ifndef REQREP_H
12             #define REQREP_H
13              
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30              
/* ================================================================
 * Constants
 * ================================================================ */

/* File identity, checked by reqrep_validate_header when opening an existing
 * file. The magic value spells "RRS1" in ASCII. */
#define REQREP_MAGIC   0x52525331U
#define REQREP_VERSION 1

/* Size of the caller-supplied error buffer that REQREP_ERR writes into. */
#define REQREP_ERR_BUFLEN 256

/* Mutex tuning: number of pause-spins before sleeping on the futex, and the
 * per-wait futex timeout (seconds) after which a waiter checks whether the
 * lock holder's PID is still alive. */
#define REQREP_SPIN_LIMIT       32
#define REQREP_LOCK_TIMEOUT_SEC 2

/* Packed string length word: top bit flags UTF-8, low 31 bits are bytes. */
#define REQREP_UTF8_FLAG    0x80000000U
#define REQREP_STR_LEN_MASK 0x7FFFFFFFU

/* Response slot states (RespSlotHeader.state). */
#define RESP_FREE     0
#define RESP_ACQUIRED 1
#define RESP_READY    2

/* Queue payload mode stored in ReqRepHeader.mode. */
#define REQREP_MODE_STR 0
#define REQREP_MODE_INT 1

/* 64-bit request ID = (generation << 32) | slot index.
 * The generation guards against ABA: if a cancelled slot is re-acquired by
 * another client, its generation no longer matches the one recorded in the
 * original request. */
#define REQREP_MAKE_ID(slot, gen) (((uint64_t)(gen) << 32) | (uint64_t)(slot))
#define REQREP_ID_SLOT(id)        ((uint32_t)((id) & 0xFFFFFFFFULL))
#define REQREP_ID_GEN(id)         ((uint32_t)((id) >> 32))
57              
/* ================================================================
 * Header (256 bytes = 4 cache lines, lives at start of mmap)
 * ================================================================ */

/* Shared header at offset 0 of the mapping. Field order and sizes are the
 * on-disk/in-memory layout shared by every process, so the explicit offsets
 * in the comments must stay in sync with the _Static_assert below.
 * All *_off fields are byte offsets from the start of the mapping
 * (reqrep_setup_handle adds them to the mapping base). */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0: REQREP_MAGIC */
    uint32_t version;        /* 4: REQREP_VERSION */
    uint32_t mode;           /* 8: REQREP_MODE_STR or REQREP_MODE_INT */
    uint32_t req_cap;        /* 12: request queue capacity (power of 2) */
    uint64_t total_size;     /* 16: mmap size; must equal the file size on open */
    uint32_t req_slots_off;  /* 24: offset to request slot array (== sizeof header) */
    uint32_t req_arena_off;  /* 28: offset to request arena */
    uint32_t req_arena_cap;  /* 32: arena byte capacity */
    uint32_t resp_slots;     /* 36: number of response slots */
    uint32_t resp_data_max;  /* 40: max response data bytes per slot */
    uint32_t resp_off;       /* 44: offset to response slot area */
    uint32_t resp_stride;    /* 48: bytes per response slot (cache-aligned) */
    uint8_t _pad0[12];       /* 52-63 */

    /* ---- Cache line 1 (64-127): recv hot (server) ---- */
    uint64_t req_head;       /* 64: consumer position */
    uint32_t recv_waiters;   /* 72: blocked servers */
    uint32_t recv_futex;     /* 76: futex for recv wakeup (bumped by reqrep_wake_consumers) */
    uint8_t _pad1[48];       /* 80-127 */

    /* ---- Cache line 2 (128-191): send hot (client) ---- */
    uint64_t req_tail;       /* 128: producer position */
    uint32_t send_waiters;   /* 136: blocked clients */
    uint32_t send_futex;     /* 140: futex for send wakeup (bumped by reqrep_wake_producers) */
    uint8_t _pad2[48];       /* 144-191 */

    /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
    uint32_t mutex;          /* 192: futex-based mutex: 0 = free, else REQREP_MUTEX_VAL(pid) */
    uint32_t mutex_waiters;  /* 196: threads sleeping in FUTEX_WAIT on `mutex` */
    uint32_t arena_wpos;     /* 200: arena write position */
    uint32_t arena_used;     /* 204: arena bytes consumed */
    uint32_t resp_hint;      /* 208: slot index where reqrep_slot_acquire starts scanning */
    uint32_t stat_recoveries; /* 212: stale mutex / stale response-slot recoveries */
    uint32_t slot_futex;     /* 216: futex for slot availability */
    uint32_t slot_waiters;   /* 220: threads waiting for free slot */
    uint64_t stat_requests;  /* 224 */
    uint64_t stat_replies;   /* 232 */
    uint64_t stat_send_full; /* 240 */
    uint32_t stat_recv_empty; /* 248 */
    uint32_t _pad3;          /* 252-255 */
} ReqRepHeader;

/* Layout guard: every process must agree on the 256-byte header size. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(ReqRepHeader) == 256, "ReqRepHeader must be 256 bytes");
#endif
109              
/* ================================================================
 * Slot types
 * ================================================================ */

/* One entry of the string-mode request queue (REQREP_MODE_STR). The request
 * bytes live in the shared request arena; the slot records where, plus the
 * response slot (and its generation) the reply must be routed to. */
typedef struct {
    uint32_t arena_off;   /* start of request bytes; presumably relative to the arena base (h->req_arena) -- confirm against send/recv */
    uint32_t packed_len;  /* bit 31 = UTF-8, bits 0-30 = byte length */
    uint32_t arena_skip;  /* bytes to release from arena on recv */
    uint32_t resp_slot;   /* response slot index */
    uint32_t resp_gen;    /* generation at time of slot acquire (ABA guard) */
    uint32_t _rpad;       /* explicit padding */
} ReqSlot; /* 24 bytes (Str mode) */
122              
/* Int request slot: Vyukov sequence + value + response routing.
 * Used for REQREP_MODE_INT queues (reqrep_validate_header sizes the slot
 * array with this type in that mode). The sequence-number protocol itself
 * is implemented by the int send/recv paths, which are not in this section. */
typedef struct {
    uint64_t sequence;    /* per-slot sequence number for the lock-free ring */
    int64_t value;        /* request payload */
    uint32_t resp_slot;   /* response slot index */
    uint32_t resp_gen;    /* generation at time of slot acquire (ABA guard) */
} ReqIntSlot; /* 24 bytes (Int mode, lock-free) */
130              
/* Header of one response slot. Each slot occupies resp_stride bytes:
 * this 32-byte header followed by up to resp_data_max bytes of reply data
 * (reqrep_validate_header enforces stride >= header + resp_data_max). */
typedef struct {
    uint32_t state;       /* futex: RESP_FREE=0, RESP_ACQUIRED=1, RESP_READY=2 */
    uint32_t waiters;     /* futex waiters on this slot */
    uint32_t owner_pid;   /* PID of client that acquired (for stale recovery); 0 once released/recovered */
    uint32_t resp_len;    /* response data length */
    uint32_t resp_flags;  /* bit 0 = UTF-8 */
    uint32_t generation;  /* incremented on each acquire (ABA guard) */
    uint32_t _rpad[2];    /* pad to 32 bytes */
} RespSlotHeader; /* 32 bytes + data */

/* Layout guard: slot data starts exactly 32 bytes into each slot. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RespSlotHeader) == 32, "RespSlotHeader must be 32 bytes");
#endif
144              
/* ================================================================
 * Process-local handle
 * ================================================================ */

/* Per-process view of a mapped region, heap-allocated by reqrep_setup_handle.
 * The pointers point into this process's mapping; the scalar fields are
 * copies of the corresponding header fields taken at setup time. */
typedef struct {
    ReqRepHeader *hdr;      /* start of the mapping */
    ReqSlot *req_slots;     /* mapping base + hdr->req_slots_off */
    char *req_arena;        /* mapping base + hdr->req_arena_off */
    uint8_t *resp_area;     /* base of response slots region (mapping base + hdr->resp_off) */
    size_t mmap_size;       /* mapped length */
    uint32_t req_cap;       /* copy of hdr->req_cap */
    uint32_t req_cap_mask;  /* req_cap - 1 */
    uint32_t req_arena_cap; /* copy of hdr->req_arena_cap */
    uint32_t resp_slots;    /* copy of hdr->resp_slots */
    uint32_t resp_data_max; /* copy of hdr->resp_data_max */
    uint32_t resp_stride;   /* copy of hdr->resp_stride */
    char *copy_buf;         /* scratch buffer, grown with realloc by reqrep_ensure_copy_buf */
    uint32_t copy_buf_cap;  /* current capacity of copy_buf in bytes */
    char *path;             /* strdup'd file path, NULL if none was given */
    int notify_fd;          /* request notification eventfd, -1 if unset */
    int reply_fd;           /* reply notification eventfd, -1 if unset */
    int backing_fd;         /* memfd fd, -1 for file-backed/anonymous */
} ReqRepHandle;
168              
/* ================================================================
 * Utility
 * ================================================================ */

/* Smallest power of two >= v, never less than 2.
 * Returns 0 when that power would not fit in 32 bits (v > 2^31). */
static inline uint32_t reqrep_next_pow2(uint32_t v) {
    if (v > 0x80000000U)
        return 0;

    uint32_t p = 2;
    while (p < v)
        p <<= 1;   /* cannot overflow: v <= 2^31 bounds p */
    return p;
}
180              
/* CPU relax hint for busy-wait loops. Emits the architecture's spin-wait
 * instruction where one is known, otherwise only a compiler barrier; in all
 * cases the "memory" clobber stops the compiler caching shared values. */
static inline void reqrep_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
190              
191 4891           static inline int reqrep_ensure_copy_buf(ReqRepHandle *h, uint32_t needed) {
192 4891 100         if (needed <= h->copy_buf_cap) return 1;
193 65 50         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
194 72 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
195 65           char *nb = (char *)realloc(h->copy_buf, ns);
196 65 50         if (!nb) return 0;
197 65           h->copy_buf = nb;
198 65           h->copy_buf_cap = ns;
199 65           return 1;
200             }
201              
202 19003           static inline RespSlotHeader *reqrep_resp_slot(ReqRepHandle *h, uint32_t idx) {
203 19003           return (RespSlotHeader *)(h->resp_area + (uint64_t)idx * h->resp_stride);
204             }
205              
/* ================================================================
 * Futex helpers
 * ================================================================ */

/* Mutex word encoding: 0 means unlocked; a held lock has the top bit set and
 * the holder's PID in the low 31 bits, which lets waiters detect a dead
 * owner and recover the lock. */
#define REQREP_MUTEX_WRITER_BIT 0x80000000U
#define REQREP_MUTEX_PID_MASK   0x7FFFFFFFU
#define REQREP_MUTEX_VAL(pid) \
    (REQREP_MUTEX_WRITER_BIT | ((uint32_t)(pid) & REQREP_MUTEX_PID_MASK))
213              
/* Best-effort liveness probe for a PID recorded in shared memory.
 * PID 0 is reported as alive. Otherwise the PID counts as dead only when
 * kill(pid, 0) fails with ESRCH; success or any other error (e.g. EPERM for
 * a process owned by another user) counts as alive. */
static inline int reqrep_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}
218              
219             static const struct timespec reqrep_lock_timeout = { REQREP_LOCK_TIMEOUT_SEC, 0 };
220              
221 0           static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
222 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
223             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
224 0           return;
225 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
226 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
227 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
228             }
229              
/* Acquire the cross-process mutex stored in hdr->mutex.
 *
 * While held, the lock word is REQREP_MUTEX_VAL(getpid()), so every thread
 * of one process writes the same value: the lock identifies the owning
 * process, not the thread, and is not recursive.
 *
 * Strategy: up to REQREP_SPIN_LIMIT weak-CAS attempts separated by
 * reqrep_spin_pause(); then register in mutex_waiters and FUTEX_WAIT on the
 * observed lock value with the relative timeout reqrep_lock_timeout. When a
 * wait times out and the PID in the lock word no longer exists, the lock is
 * force-released through reqrep_recover_stale_mutex. The function loops
 * until the lock is acquired; it has no failure return.
 *
 * NOTE(review): the waiter registration, the lock re-read, and the
 * unlocker's waiter check are all RELAXED, so a wakeup can in principle be
 * missed; the FUTEX_WAIT timeout then bounds the stall. Confirm acceptable.
 */
static inline void reqrep_mutex_lock(ReqRepHeader *hdr) {
    uint32_t mypid = REQREP_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* Weak CAS (4th arg = 1): spurious failure just costs another round. */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        /* Fast path: short busy-wait before paying for a syscall. */
        if (__builtin_expect(spin < REQREP_SPIN_LIMIT, 1)) {
            reqrep_spin_pause();
            continue;
        }
        /* Slow path: announce ourselves so unlock knows to FUTEX_WAKE. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* Sleeps only if the lock word still equals `cur`. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &reqrep_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                /* Held for a whole timeout: check whether the holder died. */
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= REQREP_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & REQREP_MUTEX_PID_MASK;
                    if (!reqrep_pid_alive(pid))
                        reqrep_recover_stale_mutex(hdr, val);
                }
                /* Restart spinning (the loop increment then makes spin == 1). */
                spin = 0;
                continue;
            }
        }
        /* Woken, EAGAIN (value changed), EINTR, or lock was already free. */
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}
262              
263 6169           static inline void reqrep_mutex_unlock(ReqRepHeader *hdr) {
264 6169           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
265 6169 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
266 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
267 6169           }
268              
269 2898           static inline void reqrep_wake_consumers(ReqRepHeader *hdr) {
270 2898 100         if (__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED) > 0) {
271 1365           __atomic_add_fetch(&hdr->recv_futex, 1, __ATOMIC_RELEASE);
272 1365           syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
273             }
274 2898           }
275              
276 2277           static inline void reqrep_wake_producers(ReqRepHeader *hdr) {
277 2277 50         if (__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED) > 0) {
278 0           __atomic_add_fetch(&hdr->send_futex, 1, __ATOMIC_RELEASE);
279 0           syscall(SYS_futex, &hdr->send_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
280             }
281 2277           }
282              
283 2891           static inline void reqrep_wake_slot_waiters(ReqRepHeader *hdr) {
284 2891 50         if (__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED) > 0) {
285 0           __atomic_add_fetch(&hdr->slot_futex, 1, __ATOMIC_RELEASE);
286 0           syscall(SYS_futex, &hdr->slot_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
287             }
288 2891           }
289              
290 2302           static inline int reqrep_remaining_time(const struct timespec *deadline,
291             struct timespec *remaining) {
292             struct timespec now;
293 2302           clock_gettime(CLOCK_MONOTONIC, &now);
294 2302           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
295 2302           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
296 2302 100         if (remaining->tv_nsec < 0) {
297 2295           remaining->tv_sec--;
298 2295           remaining->tv_nsec += 1000000000L;
299             }
300 2302           return remaining->tv_sec >= 0;
301             }
302              
303 4127           static inline void reqrep_make_deadline(double timeout, struct timespec *deadline) {
304 4127           clock_gettime(CLOCK_MONOTONIC, deadline);
305 4127           deadline->tv_sec += (time_t)timeout;
306 4127           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
307 4127 100         if (deadline->tv_nsec >= 1000000000L) {
308 1777           deadline->tv_sec++;
309 1777           deadline->tv_nsec -= 1000000000L;
310             }
311 4127           }
312              
/* ================================================================
 * Response slot operations
 * ================================================================ */

/* Claim a response slot for the calling process.
 *
 * Pass 1 scans every slot, starting at hdr->resp_hint, and CASes the first
 * RESP_FREE slot to RESP_ACQUIRED. Pass 2 runs only if all slots were busy:
 * it looks for ACQUIRED/READY slots whose owner PID is dead, frees one, and
 * tries to claim it.
 *
 * On success the slot is RESP_ACQUIRED, owner_pid == getpid(), its
 * generation has been incremented, and the slot index is returned.
 * Returns -1 if no slot could be obtained. Does not sleep.
 * Assumes h->resp_slots > 0 (reqrep_validate_header rejects 0).
 */
static int32_t reqrep_slot_acquire(ReqRepHandle *h) {
    uint32_t n = h->resp_slots;
    uint32_t hint = __atomic_load_n(&h->hdr->resp_hint, __ATOMIC_RELAXED);
    uint32_t mypid = (uint32_t)getpid();

    /* Pass 1: first FREE slot, starting at the shared hint. The modulo keeps
     * the index in range even if the hint in shared memory is out of range. */
    for (uint32_t i = 0; i < n; i++) {
        uint32_t idx = (hint + i) % n;
        RespSlotHeader *slot = reqrep_resp_slot(h, idx);
        uint32_t expected = RESP_FREE;
        if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
            /* RELEASE bump publishes owner_pid before the new generation. */
            __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
            /* Start the next scan just after this slot. */
            __atomic_store_n(&h->hdr->resp_hint, (idx + 1) % n, __ATOMIC_RELAXED);
            return (int32_t)idx;
        }
    }

    /* Recover stale slots from dead processes (both ACQUIRED and READY).
     *
     * ABA-safe pattern (mirrors Pool's recover_stale): CAS owner_pid dead->0
     * FIRST to claim exclusive recovery rights. Without this, two recoverers
     * (or a recoverer racing a free+fresh-acquire cycle) can both see
     * state==ACQUIRED and the same dead PID, both race their state CAS, and
     * the loser silently clobbers a live owner -- the constant RESP_ACQUIRED
     * value is identical across acquisitions, so the state CAS alone cannot
     * detect the recycle.
     *
     * Use ACQUIRE on owner_pid so we synchronize with the writer's RELEASE
     * generation bump (which orders the owner_pid store before any external
     * observer of the new generation). */
    for (uint32_t i = 0; i < n; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state != RESP_ACQUIRED && state != RESP_READY) continue;

        /* Skip slots with no recorded owner or a live owner. */
        uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_ACQUIRE);
        if (!pid || reqrep_pid_alive(pid)) continue;

        /* Claim exclusive recovery rights by CASing the dead owner to 0.
         * Loser of this race exits -- winner of state CAS would have already
         * progressed; we can't safely race the state transition. */
        uint32_t expected_pid = pid;
        if (!__atomic_compare_exchange_n(&slot->owner_pid, &expected_pid, 0,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;

        /* We own the recovery now. Drive state to FREE; retry on transient
         * ACQUIRED->READY (reply arrived after death) until state is FREE.
         * If clear() or another path beat us, state==FREE already.
         * State can only be 0/1/2; CAS-on-fail sets cur_state to current. */
        uint32_t cur_state = state;
        while (cur_state != RESP_FREE) {
            if (__atomic_compare_exchange_n(&slot->state, &cur_state, RESP_FREE,
                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
                break;
        }
        __atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);

        /* Now claim FREE->ACQUIRED for ourselves. */
        uint32_t expected = RESP_FREE;
        if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
            __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
            return (int32_t)i;
        }
        /* Someone else grabbed the FREE slot; wake any waiters on slot avail. */
        reqrep_wake_slot_waiters(h->hdr);
    }

    return -1;
}
389              
390             /* Release a response slot from a known prior state we owned (ACQUIRED for
391             * the post-try_send-failure path, READY for the try_get success path).
392             *
393             * Two-step claim to survive clear()/recovery races:
394             * 1. CAS owner_pid mypid -> 0 to claim release rights. If this fails,
395             * clear() already reset us (or a fresh acquirer set their own pid):
396             * either way, the slot is no longer ours and the release is a no-op.
397             * 2. Single-shot CAS state from_state -> FREE. If state is unexpected
398             * (already FREE from clear, or ACQUIRED again from a fresh acquirer
399             * after clear), CAS-on-fail is a no-op — fresh acquirer's claim is
400             * preserved.
401             *
402             * Without step 1, a blind state CAS from RESP_ACQUIRED -> FREE would
403             * silently clobber a freshly-re-acquired slot (ABA on the state value). */
404 2677           static inline void reqrep_slot_release_from(ReqRepHandle *h, uint32_t idx,
405             uint32_t from_state) {
406 2677           RespSlotHeader *slot = reqrep_resp_slot(h, idx);
407 2677           uint32_t mypid = (uint32_t)getpid();
408 2677           uint32_t expected_pid = mypid;
409 2677 50         if (!__atomic_compare_exchange_n(&slot->owner_pid, &expected_pid, 0,
410             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
411 0           return;
412 2677           uint32_t expected_state = from_state;
413 2677           (void)__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
414             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
415 2677           reqrep_wake_slot_waiters(h->hdr);
416             }
417              
418             /* Release the slot we just acquired (ACQUIRED state). Used in
419             * try_send / int_try_send when send_locked fails after slot_acquire. */
420 6           static inline void reqrep_slot_release(ReqRepHandle *h, uint32_t idx) {
421 6           reqrep_slot_release_from(h, idx, RESP_ACQUIRED);
422 6           }
423              
/* ================================================================
 * Create / Open / Close
 * ================================================================ */

/* printf-style error reporting into `errbuf`, a variable that must be in
 * scope where the macro is expanded. Does nothing when errbuf is NULL;
 * output is truncated to REQREP_ERR_BUFLEN bytes including the NUL. */
#define REQREP_ERR(fmt, ...)                                          \
    do {                                                              \
        if (errbuf != NULL)                                           \
            snprintf(errbuf, REQREP_ERR_BUFLEN, fmt, ##__VA_ARGS__);  \
    } while (0)
429              
430 112           static ReqRepHandle *reqrep_setup_handle(void *base, size_t map_size,
431             const char *path, int backing_fd) {
432 112           ReqRepHeader *hdr = (ReqRepHeader *)base;
433 112           ReqRepHandle *h = (ReqRepHandle *)calloc(1, sizeof(ReqRepHandle));
434 112 50         if (!h) return NULL;
435              
436 112           h->hdr = hdr;
437 112           h->req_slots = (ReqSlot *)((char *)base + hdr->req_slots_off);
438 112           h->req_arena = (char *)base + hdr->req_arena_off;
439 112           h->resp_area = (uint8_t *)base + hdr->resp_off;
440 112           h->mmap_size = map_size;
441 112           h->req_cap = hdr->req_cap;
442 112           h->req_cap_mask = hdr->req_cap - 1;
443 112           h->req_arena_cap = hdr->req_arena_cap;
444 112           h->resp_slots = hdr->resp_slots;
445 112           h->resp_data_max = hdr->resp_data_max;
446 112           h->resp_stride = hdr->resp_stride;
447 112 100         h->path = path ? strdup(path) : NULL;
448 112           h->notify_fd = -1;
449 112           h->reply_fd = -1;
450 112           h->backing_fd = backing_fd;
451              
452 112           return h;
453             }
454              
455 55           static int reqrep_validate_header(ReqRepHeader *hdr, size_t file_size, uint32_t expected_mode) {
456 55 50         if (hdr->magic != REQREP_MAGIC) return 0;
457 55 50         if (hdr->version != REQREP_VERSION) return 0;
458 55 50         if (hdr->mode != expected_mode) return 0;
459 55 50         if (hdr->req_cap == 0 || (hdr->req_cap & (hdr->req_cap - 1)) != 0) return 0;
    50          
460 55 50         if (hdr->total_size != (uint64_t)file_size) return 0;
461 55 50         if (hdr->req_slots_off != sizeof(ReqRepHeader)) return 0;
462 55 50         if (hdr->resp_slots == 0) return 0;
463 55 50         if (hdr->resp_stride < sizeof(RespSlotHeader)) return 0;
464             /* Stride must hold full slot header + data area; otherwise reply writes
465             * to slot N would overflow into slot N+1's header (data corruption /
466             * memory unsafety for a crafted file). */
467 55 50         if ((uint64_t)hdr->resp_stride < (uint64_t)sizeof(RespSlotHeader) + hdr->resp_data_max) return 0;
468             /* Compute end of req slots area; req_arena and resp must come after it. */
469 55           uint64_t req_slot_size = (expected_mode == REQREP_MODE_STR)
470             ? sizeof(ReqSlot) : sizeof(ReqIntSlot);
471 55           uint64_t req_slots_end = (uint64_t)hdr->req_slots_off
472 55           + (uint64_t)hdr->req_cap * req_slot_size;
473 55 50         if (req_slots_end > hdr->total_size) return 0;
474 55 100         if (expected_mode == REQREP_MODE_STR) {
475 45 50         if (hdr->req_arena_off < req_slots_end) return 0;
476 45 50         if ((uint64_t)hdr->req_arena_off + hdr->req_arena_cap > hdr->total_size) return 0;
477             /* resp must not overlap arena */
478 45 50         if (hdr->resp_off < (uint64_t)hdr->req_arena_off + hdr->req_arena_cap) return 0;
479             }
480 55 50         if (hdr->resp_off < req_slots_end) return 0;
481 55 50         if ((uint64_t)hdr->resp_off + (uint64_t)hdr->resp_slots * hdr->resp_stride > hdr->total_size) return 0;
482 55           return 1;
483             }
484              
485 50           static void reqrep_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
486             uint32_t resp_data_max, uint64_t total_size,
487             uint32_t req_slots_off, uint32_t req_arena_off,
488             uint32_t req_arena_cap, uint32_t resp_off,
489             uint32_t resp_stride) {
490 50           ReqRepHeader *hdr = (ReqRepHeader *)base;
491 50           memset(hdr, 0, sizeof(ReqRepHeader));
492 50           hdr->magic = REQREP_MAGIC;
493 50           hdr->version = REQREP_VERSION;
494 50           hdr->mode = REQREP_MODE_STR;
495 50           hdr->req_cap = req_cap;
496 50           hdr->total_size = total_size;
497 50           hdr->req_slots_off = req_slots_off;
498 50           hdr->req_arena_off = req_arena_off;
499 50           hdr->req_arena_cap = req_arena_cap;
500 50           hdr->resp_slots = resp_slots_n;
501 50           hdr->resp_data_max = resp_data_max;
502 50           hdr->resp_off = resp_off;
503 50           hdr->resp_stride = resp_stride;
504              
505 948 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
506 898           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
507 898           memset(rs, 0, sizeof(RespSlotHeader));
508             }
509              
510 50           __atomic_thread_fence(__ATOMIC_SEQ_CST);
511 50           }
512              
513             /* Returns 0 on success, -1 on overflow (offsets would exceed UINT32_MAX). */
514 51           static int reqrep_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
515             uint32_t resp_data_max, uint64_t arena_hint,
516             uint32_t *out_req_slots_off, uint32_t *out_req_arena_off,
517             uint32_t *out_req_arena_cap, uint32_t *out_resp_off,
518             uint32_t *out_resp_stride, uint64_t *out_total_size) {
519 51           uint32_t req_slots_off = sizeof(ReqRepHeader);
520 51           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqSlot);
521 51           uint64_t req_arena_off_64 = (slots_end + 7) & ~(uint64_t)7;
522 51 50         if (req_arena_off_64 > UINT32_MAX) return -1;
523 51           uint32_t req_arena_off = (uint32_t)req_arena_off_64;
524              
525 51 50         if (arena_hint > UINT32_MAX) return -1;
526 51           uint32_t req_arena_cap = (uint32_t)arena_hint;
527 51 100         if (req_arena_cap < 4096) req_arena_cap = 4096;
528              
529             /* Compute resp_stride in uint64 to catch overflow before truncation.
530             * Without the cast, a large resp_data_max (close to UINT32_MAX) wraps
531             * to a tiny stride value, leaving slot allocation too small and
532             * letting reply writes overflow into neighboring slots. */
533 51           uint64_t resp_stride_64 = ((uint64_t)sizeof(RespSlotHeader)
534 51           + (uint64_t)resp_data_max + 63) & ~(uint64_t)63;
535 51 50         if (resp_stride_64 > UINT32_MAX) return -1;
536 51           uint32_t resp_stride = (uint32_t)resp_stride_64;
537 51           uint64_t resp_off_64 = ((uint64_t)req_arena_off + req_arena_cap + 63) & ~(uint64_t)63;
538 51 50         if (resp_off_64 > UINT32_MAX) return -1;
539 51           uint64_t total_size = resp_off_64 + (uint64_t)resp_slots_n * resp_stride;
540              
541 51           *out_req_slots_off = req_slots_off;
542 51           *out_req_arena_off = req_arena_off;
543 51           *out_req_arena_cap = req_arena_cap;
544 51           *out_resp_off = (uint32_t)resp_off_64;
545 51           *out_resp_stride = resp_stride;
546 51           *out_total_size = total_size;
547 51           return 0;
548             }
549              
550 48           static ReqRepHandle *reqrep_create(const char *path, uint32_t req_cap,
551             uint32_t resp_slots_n, uint32_t resp_data_max,
552             uint64_t arena_hint, char *errbuf) {
553 48 50         if (errbuf) errbuf[0] = '\0';
554              
555 48           req_cap = reqrep_next_pow2(req_cap);
556 48 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
557 48 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
558              
559 48 100         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
560              
561             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
562             uint64_t total_size;
563 48 50         if (reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
564             &req_slots_off, &req_arena_off, &req_arena_cap,
565             &resp_off, &resp_stride, &total_size) < 0) {
566 0 0         REQREP_ERR("layout overflow: req_cap/arena_hint too large for uint32 offsets");
567 0           return NULL;
568             }
569              
570 48           int anonymous = (path == NULL);
571             size_t map_size;
572             void *base;
573              
574 48 100         if (anonymous) {
575 1           map_size = (size_t)total_size;
576 1           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
577             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
578 1 50         if (base == MAP_FAILED) {
579 0 0         REQREP_ERR("mmap(anonymous): %s", strerror(errno));
580 0           return NULL;
581             }
582 1           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
583             req_slots_off, req_arena_off, req_arena_cap,
584             resp_off, resp_stride);
585             } else {
586 47           int fd = open(path, O_RDWR | O_CREAT, 0666);
587 48 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
588              
589 47 50         if (flock(fd, LOCK_EX) < 0) {
590 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
591 0           close(fd); return NULL;
592             }
593              
594             struct stat st;
595 47 50         if (fstat(fd, &st) < 0) {
596 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
597 0           flock(fd, LOCK_UN); close(fd); return NULL;
598             }
599              
600 47           int is_new = (st.st_size == 0);
601              
602 47 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
603 0 0         REQREP_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
604 0           flock(fd, LOCK_UN); close(fd); return NULL;
605             }
606              
607 47 100         if (is_new) {
608 46 50         if (ftruncate(fd, (off_t)total_size) < 0) {
609 0 0         REQREP_ERR("ftruncate(%s): %s", path, strerror(errno));
610 0           flock(fd, LOCK_UN); close(fd); return NULL;
611             }
612             }
613              
614 47 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
615 47           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
616 47 50         if (base == MAP_FAILED) {
617 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
618 0           flock(fd, LOCK_UN); close(fd); return NULL;
619             }
620              
621 47 100         if (!is_new) {
622 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, (size_t)st.st_size, REQREP_MODE_STR)) {
623 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
624 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
625             }
626 1           flock(fd, LOCK_UN);
627 1           close(fd);
628 1           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
629 1 50         if (!h) { munmap(base, map_size); return NULL; }
630 1           return h;
631             }
632              
633 46           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
634             req_slots_off, req_arena_off, req_arena_cap,
635             resp_off, resp_stride);
636 46           flock(fd, LOCK_UN);
637 46           close(fd);
638             }
639              
640 47           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
641 47 50         if (!h) { munmap(base, map_size); return NULL; }
642 47           return h;
643             }
644              
645 49           static ReqRepHandle *reqrep_open(const char *path, uint32_t mode, char *errbuf) {
646 49 50         if (errbuf) errbuf[0] = '\0';
647 49 50         if (!path) { REQREP_ERR("path required"); return NULL; }
    0          
648              
649 49           int fd = open(path, O_RDWR);
650 49 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
651              
652 49 50         if (flock(fd, LOCK_EX) < 0) {
653 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
654 0           close(fd); return NULL;
655             }
656              
657             struct stat st;
658 49 50         if (fstat(fd, &st) < 0) {
659 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
660 0           flock(fd, LOCK_UN); close(fd); return NULL;
661             }
662              
663 49 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
664 0 0         REQREP_ERR("%s: file too small or not initialized", path);
665 0           flock(fd, LOCK_UN); close(fd); return NULL;
666             }
667              
668 49           size_t map_size = (size_t)st.st_size;
669 49           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
670 49 50         if (base == MAP_FAILED) {
671 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
672 0           flock(fd, LOCK_UN); close(fd); return NULL;
673             }
674              
675 49 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
676 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
677 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
678             }
679              
680 49           flock(fd, LOCK_UN);
681 49           close(fd);
682              
683 49           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
684 49 50         if (!h) { munmap(base, map_size); return NULL; }
685 49           return h;
686             }
687              
688 3           static ReqRepHandle *reqrep_create_memfd(const char *name, uint32_t req_cap,
689             uint32_t resp_slots_n, uint32_t resp_data_max,
690             uint64_t arena_hint, char *errbuf) {
691 3 50         if (errbuf) errbuf[0] = '\0';
692              
693 3           req_cap = reqrep_next_pow2(req_cap);
694 3 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
695 3 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
696              
697 3 50         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
698              
699             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
700             uint64_t total_size;
701 3 50         if (reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
702             &req_slots_off, &req_arena_off, &req_arena_cap,
703             &resp_off, &resp_stride, &total_size) < 0) {
704 0 0         REQREP_ERR("layout overflow: req_cap/arena_hint too large for uint32 offsets");
705 0           return NULL;
706             }
707              
708 3 50         int fd = memfd_create(name ? name : "reqrep", MFD_CLOEXEC | MFD_ALLOW_SEALING);
709 3 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
710              
711 3 50         if (ftruncate(fd, (off_t)total_size) < 0) {
712 0 0         REQREP_ERR("ftruncate(memfd): %s", strerror(errno));
713 0           close(fd); return NULL;
714             }
715 3           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
716              
717 3           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
718 3 50         if (base == MAP_FAILED) {
719 0 0         REQREP_ERR("mmap(memfd): %s", strerror(errno));
720 0           close(fd); return NULL;
721             }
722              
723 3           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
724             req_slots_off, req_arena_off, req_arena_cap,
725             resp_off, resp_stride);
726              
727 3           ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
728 3 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
729 3           return h;
730             }
731              
732 4           static ReqRepHandle *reqrep_open_fd(int fd, uint32_t mode, char *errbuf) {
733 4 50         if (errbuf) errbuf[0] = '\0';
734              
735             struct stat st;
736 4 50         if (fstat(fd, &st) < 0) {
737 0 0         REQREP_ERR("fstat(fd=%d): %s", fd, strerror(errno));
738 0           return NULL;
739             }
740              
741 4 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
742 0 0         REQREP_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
743 0           return NULL;
744             }
745              
746 4           size_t map_size = (size_t)st.st_size;
747 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
748 4 50         if (base == MAP_FAILED) {
749 0 0         REQREP_ERR("mmap(fd=%d): %s", fd, strerror(errno));
750 0           return NULL;
751             }
752              
753 4 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
754 0 0         REQREP_ERR("fd %d: invalid or incompatible reqrep", fd);
755 0           munmap(base, map_size);
756 0           return NULL;
757             }
758              
759 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
760 4 50         if (myfd < 0) {
761 0 0         REQREP_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
762 0           munmap(base, map_size);
763 0           return NULL;
764             }
765              
766 4           ReqRepHandle *h = reqrep_setup_handle(base, map_size, NULL, myfd);
767 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
768 4           return h;
769             }
770              
771 112           static void reqrep_destroy(ReqRepHandle *h) {
772 112 50         if (!h) return;
773 112 100         if (h->notify_fd >= 0) close(h->notify_fd);
774 112 100         if (h->reply_fd >= 0) close(h->reply_fd);
775 112 100         if (h->backing_fd >= 0) close(h->backing_fd);
776 112 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
777 112           free(h->copy_buf);
778 112           free(h->path);
779 112           free(h);
780             }
781              
782             /* ================================================================
783             * Request queue operations (client -> server)
784             * ================================================================ */
785              
/* Push one request onto the queue. Caller must hold the queue mutex.
 *
 * The payload is copied into the circular request arena. A ReqSlot at
 * req_tail records where it lives, its length and UTF-8 flag, and the
 * response slot index/generation the reply must be delivered to.
 *
 * Returns 1=ok, 0=full (slot ring or arena), -2=too long. */
static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
                                     uint32_t len, bool utf8,
                                     uint32_t resp_slot_idx, uint32_t resp_gen) {
    ReqRepHeader *hdr = h->hdr;

    /* The top bit of packed_len is the UTF-8 flag, so lengths must fit the mask. */
    if (len > REQREP_STR_LEN_MASK) return -2;

    /* Slot ring full. */
    if (hdr->req_tail - hdr->req_head >= h->req_cap) {
        __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
        return 0;
    }

    /* Round the allocation up to a multiple of 8. A zero-length message still
     * takes 8 bytes. */
    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    /* Single message must fit arena; a larger one would be written past the
     * end of the arena buffer (per the original note, into the response slots). */
    if (alloc > h->req_arena_cap) return -2;
    uint32_t pos = hdr->arena_wpos;
    /* `skip` is the number of arena bytes this message is charged for:
     * the aligned payload, plus any unusable tail bytes skipped on wrap. */
    uint64_t skip = alloc;

    /* Messages are never split across the arena end. If this one would
     * straddle it, place it at offset 0 and charge the abandoned tail
     * bytes to this message. */
    if ((uint64_t)pos + alloc > h->req_arena_cap) {
        skip += h->req_arena_cap - pos;
        pos = 0;
    }

    if ((uint64_t)hdr->arena_used + skip > h->req_arena_cap) {
        if (hdr->req_tail == hdr->req_head) {
            /* Queue is empty, so no live message occupies the arena. Reset
             * the accounting and place this message at the start. */
            hdr->arena_wpos = 0;
            hdr->arena_used = 0;
            pos = 0;
            skip = alloc;
        } else {
            /* Not enough free arena space while messages are still queued. */
            __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }

    memcpy(h->req_arena + pos, str, len);

    uint32_t idx = (uint32_t)(hdr->req_tail & h->req_cap_mask);
    ReqSlot *slot = &h->req_slots[idx];
    slot->arena_off = pos;
    slot->packed_len = len | (utf8 ? REQREP_UTF8_FLAG : 0);
    /* The consumer subtracts arena_skip from arena_used when it pops this slot. */
    slot->arena_skip = (uint32_t)skip;
    slot->resp_slot = resp_slot_idx;
    slot->resp_gen = resp_gen;

    hdr->arena_wpos = pos + alloc;
    hdr->arena_used += (uint32_t)skip;
    /* Publishing step: the slot becomes visible to consumers once tail advances. */
    hdr->req_tail++;
    __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
    return 1;
}
839              
840             /* Non-blocking send: acquire slot + push request.
841             * Returns 1=ok, 0=full, -2=too long, -3=no slots.
842             * On success, *out_id is the packed slot+generation ID. */
843 2864           static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
844             bool utf8, uint64_t *out_id) {
845 2864           int32_t slot = reqrep_slot_acquire(h);
846 2864 100         if (slot < 0) return -3;
847              
848 2863           RespSlotHeader *rslot = reqrep_resp_slot(h, (uint32_t)slot);
849 2863           uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);
850              
851 2863           reqrep_mutex_lock(h->hdr);
852 2863           int r = reqrep_send_locked(h, str, len, utf8, (uint32_t)slot, gen);
853 2863           reqrep_mutex_unlock(h->hdr);
854              
855 2863 100         if (r == 1) {
856 2858           reqrep_wake_consumers(h->hdr);
857 2858           *out_id = REQREP_MAKE_ID((uint32_t)slot, gen);
858 2858           return 1;
859             }
860              
861 5           reqrep_slot_release(h, (uint32_t)slot);
862 5           return r;
863             }
864              
865             /* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout). */
866 2765           static int reqrep_send_wait(ReqRepHandle *h, const char *str, uint32_t len,
867             bool utf8, uint64_t *out_id, double timeout) {
868 2765           int r = reqrep_try_send(h, str, len, utf8, out_id);
869 2765 100         if (r == 1 || r == -2) return r;
    50          
870 1 50         if (timeout == 0) return r;
871              
872 1           ReqRepHeader *hdr = h->hdr;
873             struct timespec deadline, remaining;
874 1           int has_deadline = (timeout > 0);
875 1 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
876              
877 0           for (;;) {
878 1 50         uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
879 1 50         uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
880              
881 1           uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
882 1           r = reqrep_try_send(h, str, len, utf8, out_id);
883 1 50         if (r == 1 || r == -2) return r;
    50          
884              
885 1           __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
886 1           struct timespec *pts = NULL;
887 1 50         if (has_deadline) {
888 1 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
889 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
890 0           return r;
891             }
892 1           pts = &remaining;
893             }
894 1           long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
895 1           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
896              
897 1           r = reqrep_try_send(h, str, len, utf8, out_id);
898 1 50         if (r == 1 || r == -2) return r;
    50          
899 1 50         if (rc == -1 && errno == ETIMEDOUT) return r;
    50          
900             }
901             }
902              
/* Pop the request at the head of the queue. Caller must hold the queue mutex.
 *
 * On success:
 * - the payload is copied into h->copy_buf and NUL-terminated;
 * - *out_str points at that buffer, and *out_len / *out_utf8 are set;
 * - *out_id is the response-slot id the producer stamped on the request.
 * *out_str stays valid until the next call that reuses h->copy_buf.
 *
 * Returns 1=ok, 0=empty, -1=OOM. On -1 the request is left at the head of
 * the queue, but *out_utf8 and *out_id have already been written. */
static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
                                     uint32_t *out_len, bool *out_utf8,
                                     uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;

    if (hdr->req_tail == hdr->req_head) {
        __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
    ReqSlot *slot = &h->req_slots[idx];

    uint32_t len = slot->packed_len & REQREP_STR_LEN_MASK;
    *out_utf8 = (slot->packed_len & REQREP_UTF8_FLAG) != 0;
    *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);

    /* +1 for the terminating NUL written below. */
    if (!reqrep_ensure_copy_buf(h, len + 1))
        return -1;
    if (len > 0)
        memcpy(h->copy_buf, h->req_arena + slot->arena_off, len);
    h->copy_buf[len] = '\0';
    *out_str = h->copy_buf;
    *out_len = len;

    /* Give back the bytes the producer charged for this message (aligned
     * payload plus any wrap padding). Clamp at zero rather than let the
     * unsigned counter underflow if the accounting is ever inconsistent. */
    if (hdr->arena_used >= slot->arena_skip)
        hdr->arena_used -= slot->arena_skip;
    else
        hdr->arena_used = 0;
    /* Arena fully drained: rewind the write cursor so the next message
     * starts at offset 0 and has the whole arena available contiguously. */
    if (hdr->arena_used == 0)
        hdr->arena_wpos = 0;

    hdr->req_head++;
    return 1;
}
939              
940             /* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
941 3296           static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
942             uint32_t *out_len, bool *out_utf8,
943             uint64_t *out_id) {
944 3296           reqrep_mutex_lock(h->hdr);
945 3296           int r = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
946 3296           reqrep_mutex_unlock(h->hdr);
947 3296 100         if (r == 1) reqrep_wake_producers(h->hdr);
948 3296           return r;
949             }
950              
951             /* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
952 2154           static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
953             uint32_t *out_len, bool *out_utf8,
954             uint64_t *out_id, double timeout) {
955 2154           int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
956 2154 100         if (r != 0) return r;
957 538 50         if (timeout == 0) return 0;
958              
959 538           ReqRepHeader *hdr = h->hdr;
960             struct timespec deadline, remaining;
961 538           int has_deadline = (timeout > 0);
962 538 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
963              
964 0           for (;;) {
965 538           uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
966 538           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
967 538 100         if (r != 0) return r;
968              
969 513           __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
970 513           struct timespec *pts = NULL;
971 513 50         if (has_deadline) {
972 513 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
973 0           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
974 0           return 0;
975             }
976 513           pts = &remaining;
977             }
978 513           long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
979 513           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
980              
981 513           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
982 513 100         if (r != 0) return r;
983 5 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
984             }
985             }
986              
987             /* ================================================================
988             * Response operations (server -> client)
989             * ================================================================ */
990              
/* Write a response into the response slot named by `id` and publish it
 * (state ACQUIRED -> READY), waking one get_wait sleeper if any.
 *
 * Returns 1=ok, -1=bad slot, -2=stale (cancelled/recycled), -3=too long.
 *
 * Stale-detection race: the gen check at the top and the CAS at the bottom
 * form a TOCTOU pair on the generation counter. Between them:
 * - the original owner may cancel (state -> FREE, gen++);
 * - a new owner may then re-acquire (state -> ACQUIRED, gen++).
 * That leaves state == ACQUIRED (the same numeric value) but gen advanced.
 * The CAS compares only state, so on its own it would publish our data
 * into the new owner's slot, who would then read it as their reply.
 *
 * Mitigation: re-verify gen immediately before the publishing CAS, and once
 * more AFTER the CAS. If the post-CAS check fails, READY has already been
 * published on a wrong-owner slot. We then issue a corrective generation
 * bump + state reset to invalidate the wrong owner's id. They see -4
 * (stale) from get_wait, the same outcome as being cancelled, which is
 * preferable to data corruption.
 *
 * NOTE(review): this narrows the window but does not close it.
 * - The memcpy into the data area runs before ownership is re-verified.
 * - A wrong owner polling via try_get between the CAS and the corrective
 *   reset would see READY with its own generation. */
static int reqrep_reply(ReqRepHandle *h, uint64_t id,
                        const char *str, uint32_t len, bool utf8) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return -1;
    if (len > h->resp_data_max) return -3;

    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
    if (state != RESP_ACQUIRED) return -2;
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;

    /* Response payload lives directly after the slot header. */
    uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
    if (len > 0) memcpy(data, str, len);
    slot->resp_len = len;
    slot->resp_flags = utf8 ? 1 : 0;

    /* Tighten the race window: re-check gen just before CAS. A cancel +
     * re-acquire cycle would have bumped gen; reject if so. */
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen)
        return -2;

    /* CAS ACQUIRED->READY (with RELEASE to publish data writes). */
    uint32_t expected_state = RESP_ACQUIRED;
    if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
                                     0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
        return -2;

    /* Post-CAS gen verification: cancel+re-acquire could still have raced
     * between the gen recheck and the CAS. If so, we've stamped READY onto
     * the wrong owner's slot. Invalidate them rather than let them read
     * our stale data: bump gen + reset state to FREE, then wake both the
     * slot's sleepers and anyone waiting for a free slot. */
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) {
        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
        __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
        __atomic_store_n(&slot->state, RESP_FREE, __ATOMIC_RELEASE);
        if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
        reqrep_wake_slot_waiters(h->hdr);
        return -2;
    }

    /* Skip the syscall when nobody is blocked in get_wait on this slot. */
    if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);

    __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
    return 1;
}
1055              
/* Non-blocking fetch of the reply for `id`.
 *
 * Returns 1=ok, 0=not ready, -1=bad slot, -2=OOM, -4=stale (generation no
 * longer matches: cancelled, cleared or recycled).
 *
 * On 1:
 * - the reply is copied into h->copy_buf and NUL-terminated;
 * - *out_str / *out_len / *out_utf8 are set;
 * - the slot is released via a READY->FREE transition.
 * *out_str stays valid until the next call that reuses h->copy_buf.
 *
 * NOTE(review): the generation is checked only before the copy. If the slot
 * is force-freed and re-acquired (e.g. by reqrep_clear) while the copy runs,
 * the copied bytes could belong to the next owner. Whether
 * reqrep_slot_release_from also guards on generation is not visible here;
 * confirm. */
static int reqrep_try_get(ReqRepHandle *h, uint64_t id,
                          const char **out_str, uint32_t *out_len, bool *out_utf8) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return -1;

    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    /* State is loaded (ACQUIRE, pairing with reply's RELEASE CAS) before the
     * generation check, so a READY observed here covers the payload writes. */
    uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
    if (state != RESP_READY) return 0;

    uint32_t len = slot->resp_len;
    *out_utf8 = (slot->resp_flags & 1) != 0;

    /* +1 for the terminating NUL. */
    if (!reqrep_ensure_copy_buf(h, len + 1)) return -2;

    /* Response payload lives directly after the slot header. */
    uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
    if (len > 0) memcpy(h->copy_buf, data, len);
    h->copy_buf[len] = '\0';
    *out_str = h->copy_buf;
    *out_len = len;

    /* CAS READY->FREE so a fresh acquirer (after clear/recovery raced our
     * read) keeps their slot. */
    reqrep_slot_release_from(h, slot_idx, RESP_READY);
    return 1;
}
1084              
/* Blocking fetch of the reply for `id`.
 *
 *   timeout == 0 : one non-blocking attempt.
 *   timeout  > 0 : wait up to `timeout` seconds.
 *   timeout  < 0 : wait indefinitely.
 *
 * Sleeps with FUTEX_WAIT on the slot's own state word while slot->waiters is
 * incremented. Reply, cancel and clear all issue FUTEX_WAKE on that word when
 * waiters > 0.
 *
 * Returns 1=ok, 0=timeout, -1=bad slot, -2=OOM, -4=stale. */
static int reqrep_get_wait(ReqRepHandle *h, uint64_t id,
                           const char **out_str, uint32_t *out_len, bool *out_utf8,
                           double timeout) {
    /* Also validates the slot index: an out-of-range id returns -1 here,
     * before reqrep_resp_slot() below is ever called with it. */
    int r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
    if (r != 0) return r;
    if (timeout == 0) return 0;

    uint32_t slot_idx = REQREP_ID_SLOT(id);
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);

    for (;;) {
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY)
            return reqrep_try_get(h, id, out_str, out_len, out_utf8);

        /* Register as a waiter before the final checks, so a waker that runs
         * after them sees waiters > 0 and issues FUTEX_WAKE. */
        __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);

        /* Re-check: cancel may have fired between try_get and waiter registration */
        if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return -4;
        }

        state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return reqrep_try_get(h, id, out_str, out_len, out_utf8);
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }

        /* Sleeps only while state still equals the value just observed. The
         * return code is not inspected: every wake-up path (wake, timeout,
         * EINTR, EAGAIN) is followed by a fresh try_get, and the deadline
         * is re-evaluated on the next iteration. */
        syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
        __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);

        r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
        if (r != 0) return r;
    }
}
1134              
/* Cancel a pending request: CAS ACQUIRED->FREE, only if the generation matches.
 * If the reply already arrived (READY), cancel is a no-op; call get() to
 * drain it. An out-of-range slot index or a mismatched generation is
 * silently ignored.
 *
 * On success:
 * - the owner pid is cleared;
 * - the generation is bumped, so the cancelled id reads as stale (-4);
 * - get_wait sleepers on this slot are woken;
 * - waiters for a free slot are woken.
 *
 * NOTE(review): the slot is published FREE before the generation bump and
 * the owner_pid reset. If reqrep_slot_acquire can claim a FREE slot in that
 * window, the late owner_pid=0 / gen++ would land on the new owner's
 * acquisition. Confirm against slot_acquire's implementation (not visible
 * here). reqrep_clear uses the same ordering. */
static void reqrep_cancel(ReqRepHandle *h, uint64_t id) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return;
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
    uint32_t expected_state = RESP_ACQUIRED;
    if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
                                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
        /* Wake get_wait blocked on this slot's state futex */
        if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
        reqrep_wake_slot_waiters(h->hdr);
    }
}
1154              
1155             /* Combined send + wait-for-reply with single deadline.
1156             * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
1157 2548           static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
1158             bool req_utf8, const char **out_str, uint32_t *out_len,
1159             bool *out_utf8, double timeout) {
1160             uint64_t id;
1161             struct timespec deadline;
1162 2548           int has_deadline = (timeout > 0);
1163 2548 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1164              
1165 2548           int r = reqrep_send_wait(h, req_str, req_len, req_utf8, &id, timeout);
1166 2548 50         if (r != 1) return r;
1167              
1168 2548           double get_timeout = timeout;
1169 2548 100         if (has_deadline) {
1170             struct timespec now;
1171 1801           clock_gettime(CLOCK_MONOTONIC, &now);
1172 1801           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1173 1801           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1174 1801 50         if (get_timeout <= 0) {
1175 0           reqrep_cancel(h, id);
1176 0           return 0;
1177             }
1178             }
1179              
1180 2548           r = reqrep_get_wait(h, id, out_str, out_len, out_utf8, get_timeout);
1181 2548 100         if (r != 1) {
1182 1           reqrep_cancel(h, id);
1183             /* If reply arrived between timeout and cancel, drain to free the slot */
1184             const char *discard; uint32_t dlen; bool dutf8;
1185 1           reqrep_try_get(h, id, &discard, &dlen, &dutf8);
1186             }
1187 2548           return r;
1188             }
1189              
1190             /* Count response slots owned by this process. Approximate under contention;
1191             * uses ACQUIRE on state so a slot we just acquired (RELEASE on generation
1192             * orders the prior owner_pid store) is reliably counted. */
1193 10           static uint32_t reqrep_pending(ReqRepHandle *h) {
1194 10           uint32_t mypid = (uint32_t)getpid();
1195 10           uint32_t count = 0;
1196 78 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1197 68           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1198 68           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1199 68 100         if ((state == RESP_ACQUIRED || state == RESP_READY) &&
    50          
1200 7 50         __atomic_load_n(&slot->owner_pid, __ATOMIC_ACQUIRE) == mypid)
1201 7           count++;
1202             }
1203 10           return count;
1204             }
1205              
1206             /* ================================================================
1207             * Queue state
1208             * ================================================================ */
1209              
1210 40           static inline uint64_t reqrep_size(ReqRepHandle *h) {
1211 40           ReqRepHeader *hdr = h->hdr;
1212             /* Read head first, then tail: guarantees tail >= head_snapshot for a
1213             * monotonic queue, so the unsigned subtract never wraps below zero
1214             * (race-snapshotted size is at most stale, never negative). */
1215 40           uint64_t head = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1216 40           uint64_t tail = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1217 40 50         return (tail >= head) ? (tail - head) : 0;
1218             }
1219              
1220 1           static void reqrep_clear(ReqRepHandle *h) {
1221 1           ReqRepHeader *hdr = h->hdr;
1222 1           reqrep_mutex_lock(hdr);
1223 1           hdr->req_head = 0;
1224 1           hdr->req_tail = 0;
1225 1           hdr->arena_wpos = 0;
1226 1           hdr->arena_used = 0;
1227 1           reqrep_mutex_unlock(hdr);
1228              
1229             /* Release all in-flight response slots so get_wait callers unblock.
1230             * Retry CAS if reply races us (ACQUIRED→READY between load and CAS). */
1231 5 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1232 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1233 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1234 4 100         while (state == RESP_ACQUIRED || state == RESP_READY) {
    50          
1235 2 50         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
1236             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1237 2           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1238 2           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1239 2 50         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1240 0           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1241 2           break;
1242             }
1243             /* CAS failed — state updated with actual value; retry if still valid */
1244             }
1245             }
1246              
1247 1           reqrep_wake_slot_waiters(hdr);
1248 1           reqrep_wake_producers(hdr);
1249 1           reqrep_wake_consumers(hdr);
1250 1           }
1251              
1252 0           static inline int reqrep_sync(ReqRepHandle *h) {
1253 0           return msync(h->hdr, h->mmap_size, MS_SYNC);
1254             }
1255              
1256             /* ================================================================
1257             * eventfd — event-loop integration
1258             *
1259             * Two separate eventfds:
1260             * notify_fd — request notification (client -> server: "new request")
1261             * reply_fd — reply notification (server -> client: "response ready")
1262             * ================================================================ */
1263              
1264 6           static inline int reqrep_eventfd_create(ReqRepHandle *h) {
1265 6 50         if (h->notify_fd >= 0) return h->notify_fd;
1266 6           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1267 6           return h->notify_fd;
1268             }
1269              
1270 2           static inline void reqrep_eventfd_set(ReqRepHandle *h, int fd) {
1271 2 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    0          
1272 0           close(h->notify_fd);
1273 2           h->notify_fd = fd;
1274 2           }
1275              
1276 3           static inline void reqrep_notify(ReqRepHandle *h) {
1277 3 50         if (h->notify_fd >= 0) {
1278 3           uint64_t one = 1;
1279 3           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1280             }
1281 3           }
1282              
1283 2           static inline int64_t reqrep_eventfd_consume(ReqRepHandle *h) {
1284 2 50         if (h->notify_fd < 0) return -1;
1285 2           uint64_t val = 0;
1286 2 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1287 2           return (int64_t)val;
1288             }
1289              
1290 9           static inline int reqrep_reply_eventfd_create(ReqRepHandle *h) {
1291 9 50         if (h->reply_fd >= 0) return h->reply_fd;
1292 9           h->reply_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1293 9           return h->reply_fd;
1294             }
1295              
1296 1           static inline void reqrep_reply_eventfd_set(ReqRepHandle *h, int fd) {
1297 1 50         if (h->reply_fd >= 0 && h->reply_fd != fd)
    0          
1298 0           close(h->reply_fd);
1299 1           h->reply_fd = fd;
1300 1           }
1301              
1302 4           static inline void reqrep_reply_notify(ReqRepHandle *h) {
1303 4 50         if (h->reply_fd >= 0) {
1304 4           uint64_t one = 1;
1305 4           ssize_t __attribute__((unused)) rc = write(h->reply_fd, &one, sizeof(one));
1306             }
1307 4           }
1308              
1309 2           static inline int64_t reqrep_reply_eventfd_consume(ReqRepHandle *h) {
1310 2 50         if (h->reply_fd < 0) return -1;
1311 2           uint64_t val = 0;
1312 2 50         if (read(h->reply_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1313 2           return (int64_t)val;
1314             }
1315              
1316             /* ================================================================
1317             * Int mode: lock-free Vyukov MPMC request queue, inline int64 response
1318             * ================================================================ */
1319              
1320             /* Returns 0 on success, -1 on overflow. */
1321 8           static int reqrep_int_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
1322             uint32_t *out_req_slots_off, uint32_t *out_resp_off,
1323             uint32_t *out_resp_stride, uint64_t *out_total_size) {
1324 8           uint32_t req_slots_off = sizeof(ReqRepHeader);
1325 8           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqIntSlot);
1326 8           uint32_t resp_stride = (sizeof(RespSlotHeader) + sizeof(int64_t) + 63) & ~63u;
1327 8           uint64_t resp_off = (slots_end + 63) & ~(uint64_t)63;
1328 8 50         if (resp_off > UINT32_MAX) return -1;
1329 8           *out_req_slots_off = req_slots_off;
1330 8           *out_resp_off = (uint32_t)resp_off;
1331 8           *out_resp_stride = resp_stride;
1332 8           *out_total_size = resp_off + (uint64_t)resp_slots_n * resp_stride;
1333 8           return 0;
1334             }
1335              
1336 7           static void reqrep_int_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
1337             uint64_t total_size, uint32_t req_slots_off,
1338             uint32_t resp_off, uint32_t resp_stride) {
1339 7           ReqRepHeader *hdr = (ReqRepHeader *)base;
1340 7           memset(hdr, 0, sizeof(ReqRepHeader));
1341 7           hdr->magic = REQREP_MAGIC;
1342 7           hdr->version = REQREP_VERSION;
1343 7           hdr->mode = REQREP_MODE_INT;
1344 7           hdr->req_cap = req_cap;
1345 7           hdr->total_size = total_size;
1346 7           hdr->req_slots_off = req_slots_off;
1347 7           hdr->resp_slots = resp_slots_n;
1348 7           hdr->resp_data_max = sizeof(int64_t);
1349 7           hdr->resp_off = resp_off;
1350 7           hdr->resp_stride = resp_stride;
1351              
1352             /* Vyukov: init sequence numbers */
1353 7           ReqIntSlot *slots = (ReqIntSlot *)((char *)base + req_slots_off);
1354 209 100         for (uint32_t i = 0; i < req_cap; i++)
1355 202           slots[i].sequence = i;
1356              
1357             /* Init response slots */
1358 58 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
1359 51           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
1360 51           memset(rs, 0, sizeof(RespSlotHeader));
1361             }
1362              
1363 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1364 7           }
1365              
1366 7           static ReqRepHandle *reqrep_create_int(const char *path, uint32_t req_cap,
1367             uint32_t resp_slots_n, char *errbuf) {
1368 7 50         if (errbuf) errbuf[0] = '\0';
1369 7           req_cap = reqrep_next_pow2(req_cap);
1370 7 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1371 7 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1372              
1373             uint32_t req_slots_off, resp_off, resp_stride;
1374             uint64_t total_size;
1375 7 50         if (reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1376             &resp_off, &resp_stride, &total_size) < 0) {
1377 0 0         REQREP_ERR("layout overflow: req_cap too large for uint32 offsets");
1378 0           return NULL;
1379             }
1380              
1381 7           int anonymous = (path == NULL);
1382             size_t map_size;
1383             void *base;
1384              
1385 7 50         if (anonymous) {
1386 0           map_size = (size_t)total_size;
1387 0           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
1388             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
1389 0 0         if (base == MAP_FAILED) { REQREP_ERR("mmap(anonymous): %s", strerror(errno)); return NULL; }
    0          
1390             } else {
1391 7           int fd = open(path, O_RDWR | O_CREAT, 0666);
1392 7 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
1393 7 50         if (flock(fd, LOCK_EX) < 0) { REQREP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
1394             struct stat st;
1395 7 50         if (fstat(fd, &st) < 0) { REQREP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1396 7           int is_new = (st.st_size == 0);
1397 7 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
1398 0 0         REQREP_ERR("%s: file too small", path); flock(fd, LOCK_UN); close(fd); return NULL;
1399             }
1400 7 100         if (is_new && ftruncate(fd, (off_t)total_size) < 0) {
    50          
1401 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
1402             }
1403 7 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
1404 7           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1405 7 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1406 7 100         if (!is_new) {
1407 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, REQREP_MODE_INT)) {
1408 0 0         REQREP_ERR("%s: invalid or incompatible", path); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1409             }
1410 1           flock(fd, LOCK_UN); close(fd);
1411 1           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
1412 1 50         if (!h) { munmap(base, map_size); return NULL; }
1413 1           return h;
1414             }
1415 6           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1416             req_slots_off, resp_off, resp_stride);
1417 6           flock(fd, LOCK_UN); close(fd);
1418 6           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
1419 6 50         if (!h) { munmap(base, map_size); return NULL; }
1420 6           return h;
1421             }
1422              
1423 0           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1424             req_slots_off, resp_off, resp_stride);
1425 0           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
1426 0 0         if (!h) { munmap(base, map_size); return NULL; }
1427 0           return h;
1428             }
1429              
1430 1           static ReqRepHandle *reqrep_create_int_memfd(const char *name, uint32_t req_cap,
1431             uint32_t resp_slots_n, char *errbuf) {
1432 1 50         if (errbuf) errbuf[0] = '\0';
1433 1           req_cap = reqrep_next_pow2(req_cap);
1434 1 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1435 1 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1436              
1437             uint32_t req_slots_off, resp_off, resp_stride;
1438             uint64_t total_size;
1439 1 50         if (reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1440             &resp_off, &resp_stride, &total_size) < 0) {
1441 0 0         REQREP_ERR("layout overflow: req_cap too large for uint32 offsets");
1442 0           return NULL;
1443             }
1444              
1445 1 50         int fd = memfd_create(name ? name : "reqrep_int", MFD_CLOEXEC | MFD_ALLOW_SEALING);
1446 1 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1447 1 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1448 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
1449             }
1450 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
1451 1           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1452 1 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
1453              
1454 1           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1455             req_slots_off, resp_off, resp_stride);
1456 1           ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
1457 1 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
1458 1           return h;
1459             }
1460              
1461             /* --- Int request queue: lock-free Vyukov MPMC --- */
1462              
/* Non-blocking enqueue of one int64 request (lock-free Vyukov bounded MPMC).
 *
 * A response slot is reserved first (reqrep_slot_acquire) and its generation
 * snapshotted. The request carries (resp_slot, resp_gen) so the server's
 * reply is routed back to this slot, and replies to a reused slot are
 * detected as stale.
 *
 * Returns:
 *   1   enqueued; *out_id = REQREP_MAKE_ID(response slot, generation)
 *   0   request queue full (the reserved response slot is released again,
 *       stat_send_full is incremented)
 *  -3   no free response slot
 */
static inline int reqrep_int_try_send(ReqRepHandle *h, int64_t value, uint64_t *out_id) {
    int32_t rslot = reqrep_slot_acquire(h);
    if (rslot < 0) return -3;

    RespSlotHeader *rs = reqrep_resp_slot(h, (uint32_t)rslot);
    uint32_t gen = __atomic_load_n(&rs->generation, __ATOMIC_ACQUIRE);

    ReqRepHeader *hdr = h->hdr;
    ReqIntSlot *slots = (ReqIntSlot *)((char *)h->hdr + hdr->req_slots_off);
    uint32_t mask = h->req_cap_mask;
    uint64_t pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);

    for (;;) {
        ReqIntSlot *slot = &slots[pos & mask];
        uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
        /* seq == pos : slot is free for this lap.
         * seq <  pos : slot still holds an entry from the previous lap that no
         *              consumer has taken yet (consumers store pos + req_cap
         *              after taking it), so the queue is full.
         * seq >  pos : another producer already claimed pos; reload tail. */
        int64_t diff = (int64_t)seq - (int64_t)pos;
        if (diff == 0) {
            /* Claim position pos. Weak CAS: on failure pos is refreshed with
             * the current tail and the loop re-examines that slot. */
            if (__atomic_compare_exchange_n(&hdr->req_tail, &pos, pos + 1,
                    1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
                slot->value = value;
                slot->resp_slot = (uint32_t)rslot;
                slot->resp_gen = gen;
                /* Publish: RELEASE makes the payload above visible to a
                 * consumer that ACQUIRE-loads sequence == pos + 1. */
                __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
                __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
                reqrep_wake_consumers(hdr);
                *out_id = REQREP_MAKE_ID((uint32_t)rslot, gen);
                return 1;
            }
        } else if (diff < 0) {
            __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
            /* Give back the response slot reserved above. */
            reqrep_slot_release(h, (uint32_t)rslot);
            return 0;
        } else {
            pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
        }
    }
}
1500              
/* Enqueue an int64 request, blocking until it fits or the timeout expires.
 *
 * timeout == 0 : single attempt; returns reqrep_int_try_send's result.
 * timeout  > 0 : wait at most `timeout` seconds (deadline from
 *                reqrep_make_deadline).
 * timeout  < 0 : no deadline; FUTEX_WAIT is called with a NULL timeout.
 *
 * Returns 1 once sent (*out_id set). On timeout it returns the last failure
 * code: 0 (request queue full) or -3 (no free response slot).
 */
static int reqrep_int_send_wait(ReqRepHandle *h, int64_t value,
                                uint64_t *out_id, double timeout) {
    int r = reqrep_int_try_send(h, value, out_id);
    if (r == 1) return 1;
    if (timeout == 0) return r;
    ReqRepHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);
    for (;;) {
        /* Wait on slot_futex if no slots (-3), send_futex if queue full (0) */
        uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
        uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;

        /* Snapshot the futex word BEFORE retrying. If it changes after this
         * load, FUTEX_WAIT below sees a mismatch and returns at once instead
         * of sleeping. */
        uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
        r = reqrep_int_try_send(h, value, out_id);
        if (r == 1) return 1;

        /* NOTE(review): the waiter is registered after the retry. A wake issued
         * in between is only noticed if the waker bumps *futex_word regardless
         * of the waiter count. Confirm in reqrep_wake_producers /
         * reqrep_slot_release. */
        __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
                return r;
            }
            pts = &remaining;
        }
        long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
        __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
        r = reqrep_int_try_send(h, value, out_id);
        if (r == 1) return 1;
        /* Any other wake-up (spurious, EINTR, EAGAIN) loops; the next
         * iteration re-checks the remaining time. */
        if (rc == -1 && errno == ETIMEDOUT) return r;
    }
}
1535              
/* Non-blocking dequeue of one int64 request (lock-free Vyukov bounded MPMC).
 *
 * Returns 1 and fills *out_value and *out_id (the response slot + generation
 * the reply must be sent to). Returns 0 when the queue is empty
 * (stat_recv_empty is incremented).
 */
static inline int reqrep_int_try_recv(ReqRepHandle *h, int64_t *out_value, uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;
    ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
    uint32_t mask = h->req_cap_mask;
    uint64_t pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
    for (;;) {
        ReqIntSlot *slot = &slots[pos & mask];
        uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
        /* seq == pos + 1 : a producer has published an entry at pos.
         * seq <  pos + 1 : nothing published yet, so the queue is empty.
         * seq >  pos + 1 : another consumer already took pos; reload head. */
        int64_t diff = (int64_t)seq - (int64_t)(pos + 1);
        if (diff == 0) {
            /* Claim position pos. Weak CAS: on failure pos is refreshed. */
            if (__atomic_compare_exchange_n(&hdr->req_head, &pos, pos + 1,
                    1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
                *out_value = slot->value;
                *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
                /* Hand the slot to the producer of the next lap (position
                 * pos + req_cap). RELEASE orders the payload reads above. */
                __atomic_store_n(&slot->sequence, pos + h->req_cap, __ATOMIC_RELEASE);
                reqrep_wake_producers(hdr);
                return 1;
            }
        } else if (diff < 0) {
            __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
            return 0;
        } else {
            pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
        }
    }
}
1562              
/* Dequeue an int64 request, blocking until one arrives or the timeout expires.
 *
 * timeout == 0 : single attempt.
 * timeout  > 0 : wait at most `timeout` seconds.
 * timeout  < 0 : no deadline (FUTEX_WAIT with a NULL timeout).
 *
 * Returns 1 with *out_value / *out_id filled, or 0 on timeout.
 */
static int reqrep_int_recv_wait(ReqRepHandle *h, int64_t *out_value,
                                uint64_t *out_id, double timeout) {
    if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
    if (timeout == 0) return 0;
    ReqRepHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);
    for (;;) {
        /* Snapshot recv_futex before re-checking the queue, so a change made
         * after this point makes FUTEX_WAIT return immediately. */
        uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
        if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
        /* NOTE(review): as in reqrep_int_send_wait, the waiter is registered
         * after the re-check. Confirm that reqrep_wake_consumers bumps
         * recv_futex even when recv_waiters reads 0. */
        __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }
        long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
        __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
        if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
        /* Spurious / EINTR / EAGAIN wake-ups loop and re-check the deadline. */
        if (rc == -1 && errno == ETIMEDOUT) return 0;
    }
}
1589              
1590             /* --- Int response: inline int64 in resp data area --- */
1591              
/* Int reply: same TOCTOU stale-detection fix as Str reply. See reqrep_reply
 * for the race description and rationale.
 *
 * Store `value` inline in the response slot named by id (slot index +
 * generation) and move the slot ACQUIRED -> READY.
 *
 * Returns:
 *   1   reply published; one per-slot waiter (if any) woken, stat_replies++
 *  -1   slot index out of range
 *  -2   slot not ACQUIRED for this generation (cancelled, already replied,
 *       or reused), or the race was lost while publishing
 */
static int reqrep_int_reply(ReqRepHandle *h, uint64_t id, int64_t value) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return -1;

    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
    if (state != RESP_ACQUIRED) return -2;
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;

    /* Payload lives right after the slot header. It is written before the
     * READY CAS, whose RELEASE ordering makes it visible to a reader that
     * ACQUIRE-loads state == READY.
     * NOTE(review): this store happens before ownership is confirmed, so if
     * the slot was reassigned in between it may overwrite the new owner's
     * value. It also assumes sizeof(RespSlotHeader) keeps the int64 8-byte
     * aligned — confirm. */
    *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader)) = value;

    /* Tightened gen recheck before the publishing CAS. */
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen)
        return -2;

    uint32_t expected_state = RESP_ACQUIRED;
    if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
            0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
        return -2;

    /* Post-CAS verification — invalidate the wrong owner if we raced.
     * If the generation moved while we were publishing, the READY we just set
     * belongs to a different request. Bump the generation so that request's
     * id goes stale, clear ownership, free the slot, then wake both its
     * waiters and anyone waiting for a free slot. */
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) {
        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
        __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
        __atomic_store_n(&slot->state, RESP_FREE, __ATOMIC_RELEASE);
        if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
        reqrep_wake_slot_waiters(h->hdr);
        return -2;
    }

    /* Waiters block on slot->state (see reqrep_int_get_wait), so wake one. */
    if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);
    __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
    return 1;
}
1631              
/* Non-blocking fetch of the int64 reply for id.
 *
 * Returns:
 *   1   reply copied to *out_value; the slot is released (READY -> FREE)
 *   0   reply not ready yet
 *  -1   slot index out of range
 *  -4   generation mismatch: the id is stale (slot released and/or reused)
 */
static int reqrep_int_try_get(ReqRepHandle *h, uint64_t id, int64_t *out_value) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return -1;

    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    /* state is read before generation. A READY observed here is only trusted
     * if the generation check that follows still matches this id. */
    uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
    if (state != RESP_READY) return 0;

    /* Inline payload stored by reqrep_int_reply right after the header. */
    *out_value = *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader));
    /* CAS READY→FREE — see reqrep_try_get release comment. */
    reqrep_slot_release_from(h, slot_idx, RESP_READY);
    return 1;
}
1647              
/* Wait for the int64 reply for id.
 *
 * timeout == 0 : single reqrep_int_try_get attempt.
 * timeout  > 0 : wait at most `timeout` seconds.
 * timeout  < 0 : no deadline.
 *
 * Returns reqrep_int_try_get's codes: 1 reply read (slot released),
 * 0 timed out, -1 bad slot index, -4 stale id.
 */
static int reqrep_int_get_wait(ReqRepHandle *h, uint64_t id, int64_t *out_value,
                               double timeout) {
    int r = reqrep_int_try_get(h, id, out_value);
    /* Any non-zero result is final. That includes -1 for an out-of-range
     * index, so slot_idx below is known to be valid. */
    if (r != 0) return r;
    if (timeout == 0) return 0;
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);
    for (;;) {
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY)
            return reqrep_int_try_get(h, id, out_value);
        /* Register as a waiter BEFORE re-checking generation and state.
         * reqrep_int_reply and the clear functions test `waiters` after
         * changing the slot, so they will issue a FUTEX_WAKE for us. */
        __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
        if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return -4;
        }
        state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return reqrep_int_try_get(h, id, out_value);
        }
        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }
        /* Sleep only while state still equals the value just observed. Any
         * transition makes FUTEX_WAIT return immediately. */
        syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
        __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
        r = reqrep_int_try_get(h, id, out_value);
        if (r != 0) return r;
    }
}
1686              
1687 6           static int reqrep_int_request(ReqRepHandle *h, int64_t req_value, int64_t *out_value,
1688             double timeout) {
1689             uint64_t id;
1690             struct timespec deadline;
1691 6           int has_deadline = (timeout > 0);
1692 6 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1693 6           int r = reqrep_int_send_wait(h, req_value, &id, timeout);
1694 6 50         if (r != 1) return r;
1695 6           double get_timeout = timeout;
1696 6 100         if (has_deadline) {
1697             struct timespec now;
1698 1           clock_gettime(CLOCK_MONOTONIC, &now);
1699 1           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1700 1           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1701 1 50         if (get_timeout <= 0) { reqrep_cancel(h, id); return 0; }
1702             }
1703 6           r = reqrep_int_get_wait(h, id, out_value, get_timeout);
1704 6 100         if (r != 1) {
1705 1           reqrep_cancel(h, id);
1706             int64_t discard;
1707 1           reqrep_int_try_get(h, id, &discard);
1708             }
1709 6           return r;
1710             }
1711              
1712 6           static inline uint64_t reqrep_int_size(ReqRepHandle *h) {
1713             /* Read head before tail (lock-free Vyukov): producer-tail can only
1714             * grow past head, so snapshot stays non-negative. See reqrep_size. */
1715 6           uint64_t head = __atomic_load_n(&h->hdr->req_head, __ATOMIC_RELAXED);
1716 6           uint64_t tail = __atomic_load_n(&h->hdr->req_tail, __ATOMIC_RELAXED);
1717 6 50         return (tail >= head) ? (tail - head) : 0;
1718             }
1719              
/* Discard every queued int request and release every in-flight response slot.
 *
 * The Vyukov queue is reset to its initial state: head = tail = 0, and slot k
 * has sequence k. Each ACQUIRED/READY response slot is then forced to FREE,
 * its generation is bumped (outstanding ids become stale) and its waiters are
 * woken. Finally slot-acquire, producer and consumer waiters are woken.
 *
 * NOTE(review): no lock is taken. A producer or consumer that is mid-way
 * through reqrep_int_try_send / reqrep_int_try_recv, holding old head/tail/
 * sequence values, can still complete against the reset state. Presumably
 * callers quiesce traffic before clearing — confirm.
 */
static void reqrep_int_clear(ReqRepHandle *h) {
    ReqRepHeader *hdr = h->hdr;

    /* Reset Vyukov queue: head/tail to 0, reinit sequence numbers.
     * Use atomic stores — concurrent readers use atomic loads on these. */
    __atomic_store_n(&hdr->req_head, 0, __ATOMIC_RELAXED);
    __atomic_store_n(&hdr->req_tail, 0, __ATOMIC_RELAXED);
    ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
    for (uint32_t i = 0; i < h->req_cap; i++)
        __atomic_store_n(&slots[i].sequence, (uint64_t)i, __ATOMIC_RELAXED);
    /* Full fence: order the relaxed resets above before the slot releases
     * and wake-ups below. */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);

    /* Release all in-flight response slots so get_wait callers unblock. */
    for (uint32_t i = 0; i < h->resp_slots; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        while (state == RESP_ACQUIRED || state == RESP_READY) {
            if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
                __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
                if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
                break;
            }
            /* CAS failed: `state` now holds the current value (e.g. a reply
             * moved ACQUIRED -> READY meanwhile). Retry while it is still an
             * in-flight state. */
        }
    }

    reqrep_wake_slot_waiters(hdr);
    reqrep_wake_producers(hdr);
    reqrep_wake_consumers(hdr);
}
1752              
1753             #endif /* REQREP_H */