File Coverage

reqrep.h
Criterion Covered Total %
statement 746 890 83.8
branch 323 614 52.6
condition n/a
subroutine n/a
pod n/a
total 1069 1504 71.0


line stmt bran cond sub pod time code
1             /*
2             * reqrep.h -- Shared-memory request/response IPC for Linux
3             *
4             * Request queue (client -> server) with circular arena for variable-length
5             * request data, plus per-request response slots for targeted reply delivery.
6             *
7             * Uses file-backed mmap(MAP_SHARED) for cross-process sharing,
8             * futex for blocking wait, and PID-based stale lock recovery.
9             */
10              
11             #ifndef REQREP_H
12             #define REQREP_H
13              
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30              
31             /* ================================================================
32             * Constants
33             * ================================================================ */
34              
35             #define REQREP_MAGIC 0x52525331U /* "RRS1" */
36             #define REQREP_VERSION 1
37             #define REQREP_ERR_BUFLEN 256
38             #define REQREP_SPIN_LIMIT 32
39             #define REQREP_LOCK_TIMEOUT_SEC 2
40              
41             #define REQREP_UTF8_FLAG 0x80000000U
42             #define REQREP_STR_LEN_MASK 0x7FFFFFFFU
43              
44             #define RESP_FREE 0
45             #define RESP_ACQUIRED 1
46             #define RESP_READY 2
47              
48             #define REQREP_MODE_STR 0
49             #define REQREP_MODE_INT 1
50              
51             /* Pack/unpack slot index + generation into a 64-bit request ID.
52             * Prevents ABA: cancelled slot re-acquired by another client
53             * won't match the generation stored in the original request. */
54             #define REQREP_MAKE_ID(slot, gen) (((uint64_t)(gen) << 32) | (uint64_t)(slot))
55             #define REQREP_ID_SLOT(id) ((uint32_t)((id) & 0xFFFFFFFFULL))
56             #define REQREP_ID_GEN(id) ((uint32_t)((id) >> 32))
57              
58             /* ================================================================
59             * Header (256 bytes = 4 cache lines, lives at start of mmap)
60             * ================================================================ */
61              
62             typedef struct {
63             /* ---- Cache line 0 (0-63): immutable after create ---- */
64             uint32_t magic; /* 0 */
65             uint32_t version; /* 4 */
66             uint32_t mode; /* 8: REQREP_MODE_STR or REQREP_MODE_INT */
67             uint32_t req_cap; /* 12: request queue capacity (power of 2) */
68             uint64_t total_size; /* 16: mmap size */
69             uint32_t req_slots_off; /* 24: offset to request slot array */
70             uint32_t req_arena_off; /* 28: offset to request arena */
71             uint32_t req_arena_cap; /* 32: arena byte capacity */
72             uint32_t resp_slots; /* 36: number of response slots */
73             uint32_t resp_data_max; /* 40: max response data bytes per slot */
74             uint32_t resp_off; /* 44: offset to response slot area */
75             uint32_t resp_stride; /* 48: bytes per response slot (cache-aligned) */
76             uint8_t _pad0[12]; /* 52-63 */
77              
78             /* ---- Cache line 1 (64-127): recv hot (server) ---- */
79             uint64_t req_head; /* 64: consumer position */
80             uint32_t recv_waiters; /* 72: blocked servers */
81             uint32_t recv_futex; /* 76: futex for recv wakeup */
82             uint8_t _pad1[48]; /* 80-127 */
83              
84             /* ---- Cache line 2 (128-191): send hot (client) ---- */
85             uint64_t req_tail; /* 128: producer position */
86             uint32_t send_waiters; /* 136: blocked clients */
87             uint32_t send_futex; /* 140: futex for send wakeup */
88             uint8_t _pad2[48]; /* 144-191 */
89              
90             /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
91             uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
92             uint32_t mutex_waiters; /* 196 */
93             uint32_t arena_wpos; /* 200: arena write position */
94             uint32_t arena_used; /* 204: arena bytes consumed */
95             uint32_t resp_hint; /* 208: hint for slot scan */
96             uint32_t stat_recoveries; /* 212 */
97             uint32_t slot_futex; /* 216: futex for slot availability */
98             uint32_t slot_waiters; /* 220: threads waiting for free slot */
99             uint64_t stat_requests; /* 224 */
100             uint64_t stat_replies; /* 232 */
101             uint64_t stat_send_full; /* 240 */
102             uint32_t stat_recv_empty; /* 248 */
103             uint32_t _pad3; /* 252-255 */
104             } ReqRepHeader;
105              
106             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
107             _Static_assert(sizeof(ReqRepHeader) == 256, "ReqRepHeader must be 256 bytes");
108             #endif
109              
110             /* ================================================================
111             * Slot types
112             * ================================================================ */
113              
114             typedef struct {
115             uint32_t arena_off;
116             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
117             uint32_t arena_skip; /* bytes to release from arena on recv */
118             uint32_t resp_slot; /* response slot index */
119             uint32_t resp_gen; /* generation at time of slot acquire (ABA guard) */
120             uint32_t _rpad;
121             } ReqSlot; /* 24 bytes (Str mode) */
122              
123             /* Int request slot: Vyukov sequence + value + response routing */
124             typedef struct {
125             uint64_t sequence;
126             int64_t value;
127             uint32_t resp_slot;
128             uint32_t resp_gen;
129             } ReqIntSlot; /* 24 bytes (Int mode, lock-free) */
130              
131             typedef struct {
132             uint32_t state; /* futex: RESP_FREE=0, RESP_ACQUIRED=1, RESP_READY=2 */
133             uint32_t waiters; /* futex waiters on this slot */
134             uint32_t owner_pid; /* PID of client that acquired (for stale recovery) */
135             uint32_t resp_len; /* response data length */
136             uint32_t resp_flags; /* bit 0 = UTF-8 */
137             uint32_t generation; /* incremented on each acquire (ABA guard) */
138             uint32_t _rpad[2]; /* pad to 32 bytes */
139             } RespSlotHeader; /* 32 bytes + data */
140              
141             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
142             _Static_assert(sizeof(RespSlotHeader) == 32, "RespSlotHeader must be 32 bytes");
143             #endif
144              
145             /* ================================================================
146             * Process-local handle
147             * ================================================================ */
148              
149             typedef struct {
150             ReqRepHeader *hdr;
151             ReqSlot *req_slots;
152             char *req_arena;
153             uint8_t *resp_area; /* base of response slots region */
154             size_t mmap_size;
155             uint32_t req_cap;
156             uint32_t req_cap_mask; /* req_cap - 1 */
157             uint32_t req_arena_cap;
158             uint32_t resp_slots;
159             uint32_t resp_data_max;
160             uint32_t resp_stride;
161             char *copy_buf;
162             uint32_t copy_buf_cap;
163             char *path;
164             int notify_fd; /* request notification eventfd, -1 if unset */
165             int reply_fd; /* reply notification eventfd, -1 if unset */
166             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
167             } ReqRepHandle;
168              
169             /* ================================================================
170             * Utility
171             * ================================================================ */
172              
173 58           static inline uint32_t reqrep_next_pow2(uint32_t v) {
174 58 50         if (v < 2) return 2;
175 58 50         if (v > 0x80000000U) return 0;
176 58           v--;
177 58           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
178 58           return v + 1;
179             }
180              
181 159           static inline void reqrep_spin_pause(void) {
182             #if defined(__x86_64__) || defined(__i386__)
183 159           __asm__ volatile("pause" ::: "memory");
184             #elif defined(__aarch64__)
185             __asm__ volatile("yield" ::: "memory");
186             #else
187             __asm__ volatile("" ::: "memory");
188             #endif
189 159           }
190              
191 4891           static inline int reqrep_ensure_copy_buf(ReqRepHandle *h, uint32_t needed) {
192 4891 100         if (needed <= h->copy_buf_cap) return 1;
193 65 50         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
194 72 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
195 65           char *nb = (char *)realloc(h->copy_buf, ns);
196 65 50         if (!nb) return 0;
197 65           h->copy_buf = nb;
198 65           h->copy_buf_cap = ns;
199 65           return 1;
200             }
201              
202 18825           static inline RespSlotHeader *reqrep_resp_slot(ReqRepHandle *h, uint32_t idx) {
203 18825           return (RespSlotHeader *)(h->resp_area + (uint64_t)idx * h->resp_stride);
204             }
205              
206             /* ================================================================
207             * Futex helpers
208             * ================================================================ */
209              
210             #define REQREP_MUTEX_WRITER_BIT 0x80000000U
211             #define REQREP_MUTEX_PID_MASK 0x7FFFFFFFU
212             #define REQREP_MUTEX_VAL(pid) (REQREP_MUTEX_WRITER_BIT | ((uint32_t)(pid) & REQREP_MUTEX_PID_MASK))
213              
214 4           static inline int reqrep_pid_alive(uint32_t pid) {
215 4 50         if (pid == 0) return 1;
216 4 50         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
217             }
218              
219             static const struct timespec reqrep_lock_timeout = { REQREP_LOCK_TIMEOUT_SEC, 0 };
220              
221 0           static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
222 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
223             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
224 0           return;
225 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
226 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
227 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
228             }
229              
230 6251           static inline void reqrep_mutex_lock(ReqRepHeader *hdr) {
231 6251           uint32_t mypid = REQREP_MUTEX_VAL((uint32_t)getpid());
232 6410           for (int spin = 0; ; spin++) {
233 6410           uint32_t expected = 0;
234 6410 100         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
235             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
236 6251           return;
237 159 50         if (__builtin_expect(spin < REQREP_SPIN_LIMIT, 1)) {
238 159           reqrep_spin_pause();
239 159           continue;
240             }
241 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
242 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
243 0 0         if (cur != 0) {
244 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
245             &reqrep_lock_timeout, NULL, 0);
246 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
247 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
248 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
249 0 0         if (val >= REQREP_MUTEX_WRITER_BIT) {
250 0           uint32_t pid = val & REQREP_MUTEX_PID_MASK;
251 0 0         if (!reqrep_pid_alive(pid))
252 0           reqrep_recover_stale_mutex(hdr, val);
253             }
254 0           spin = 0;
255 0           continue;
256             }
257             }
258 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
259 0           spin = 0;
260             }
261             }
262              
263 6251           static inline void reqrep_mutex_unlock(ReqRepHeader *hdr) {
264 6251           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
265 6251 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
266 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
267 6251           }
268              
269 2898           static inline void reqrep_wake_consumers(ReqRepHeader *hdr) {
270 2898 100         if (__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED) > 0) {
271 1467           __atomic_add_fetch(&hdr->recv_futex, 1, __ATOMIC_RELEASE);
272 1467           syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
273             }
274 2898           }
275              
276 2277           static inline void reqrep_wake_producers(ReqRepHeader *hdr) {
277 2277 50         if (__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED) > 0) {
278 0           __atomic_add_fetch(&hdr->send_futex, 1, __ATOMIC_RELEASE);
279 0           syscall(SYS_futex, &hdr->send_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
280             }
281 2277           }
282              
283 2886           static inline void reqrep_wake_slot_waiters(ReqRepHeader *hdr) {
284 2886 50         if (__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED) > 0) {
285 0           __atomic_add_fetch(&hdr->slot_futex, 1, __ATOMIC_RELEASE);
286 0           syscall(SYS_futex, &hdr->slot_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
287             }
288 2886           }
289              
290 2343           static inline int reqrep_remaining_time(const struct timespec *deadline,
291             struct timespec *remaining) {
292             struct timespec now;
293 2343           clock_gettime(CLOCK_MONOTONIC, &now);
294 2343           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
295 2343           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
296 2343 100         if (remaining->tv_nsec < 0) {
297 2336           remaining->tv_sec--;
298 2336           remaining->tv_nsec += 1000000000L;
299             }
300 2343           return remaining->tv_sec >= 0;
301             }
302              
303 4205           static inline void reqrep_make_deadline(double timeout, struct timespec *deadline) {
304 4205           clock_gettime(CLOCK_MONOTONIC, deadline);
305 4205           deadline->tv_sec += (time_t)timeout;
306 4205           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
307 4205 100         if (deadline->tv_nsec >= 1000000000L) {
308 1796           deadline->tv_sec++;
309 1796           deadline->tv_nsec -= 1000000000L;
310             }
311 4205           }
312              
313             /* ================================================================
314             * Response slot operations
315             * ================================================================ */
316              
317 2904           static int32_t reqrep_slot_acquire(ReqRepHandle *h) {
318 2904           uint32_t n = h->resp_slots;
319 2904           uint32_t hint = __atomic_load_n(&h->hdr->resp_hint, __ATOMIC_RELAXED);
320 2904           uint32_t mypid = (uint32_t)getpid();
321              
322 3136 100         for (uint32_t i = 0; i < n; i++) {
323 3134           uint32_t idx = (hint + i) % n;
324 3134           RespSlotHeader *slot = reqrep_resp_slot(h, idx);
325 3134           uint32_t expected = RESP_FREE;
326 3134 100         if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
327             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
328 2902           __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
329 2902           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
330 2902           __atomic_store_n(&h->hdr->resp_hint, (idx + 1) % n, __ATOMIC_RELAXED);
331 2902           return (int32_t)idx;
332             }
333             }
334              
335             /* Recover stale slots from dead processes (both ACQUIRED and READY) */
336 6 100         for (uint32_t i = 0; i < n; i++) {
337 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
338 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
339 4 50         if (state == RESP_ACQUIRED || state == RESP_READY) {
    0          
340 4           uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED);
341 4 50         if (pid && !reqrep_pid_alive(pid)) {
    50          
342 0 0         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
343             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
344 0           __atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
345 0           uint32_t expected = RESP_FREE;
346 0 0         if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
347             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
348 0           __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
349 0           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
350 0           return (int32_t)i;
351             }
352 0           reqrep_wake_slot_waiters(h->hdr);
353             }
354             }
355             }
356             }
357              
358 2           return -1;
359             }
360              
361 2677           static inline void reqrep_slot_release(ReqRepHandle *h, uint32_t idx) {
362 2677           RespSlotHeader *slot = reqrep_resp_slot(h, idx);
363 2677           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
364 2677           __atomic_store_n(&slot->state, RESP_FREE, __ATOMIC_RELEASE);
365 2677           reqrep_wake_slot_waiters(h->hdr);
366 2677           }
367              
368             /* ================================================================
369             * Create / Open / Close
370             * ================================================================ */
371              
372             #define REQREP_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, REQREP_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
373              
374 111           static ReqRepHandle *reqrep_setup_handle(void *base, size_t map_size,
375             const char *path, int backing_fd) {
376 111           ReqRepHeader *hdr = (ReqRepHeader *)base;
377 111           ReqRepHandle *h = (ReqRepHandle *)calloc(1, sizeof(ReqRepHandle));
378 111 50         if (!h) return NULL;
379              
380 111           h->hdr = hdr;
381 111           h->req_slots = (ReqSlot *)((char *)base + hdr->req_slots_off);
382 111           h->req_arena = (char *)base + hdr->req_arena_off;
383 111           h->resp_area = (uint8_t *)base + hdr->resp_off;
384 111           h->mmap_size = map_size;
385 111           h->req_cap = hdr->req_cap;
386 111           h->req_cap_mask = hdr->req_cap - 1;
387 111           h->req_arena_cap = hdr->req_arena_cap;
388 111           h->resp_slots = hdr->resp_slots;
389 111           h->resp_data_max = hdr->resp_data_max;
390 111           h->resp_stride = hdr->resp_stride;
391 111 100         h->path = path ? strdup(path) : NULL;
392 111           h->notify_fd = -1;
393 111           h->reply_fd = -1;
394 111           h->backing_fd = backing_fd;
395              
396 111           return h;
397             }
398              
399 55           static int reqrep_validate_header(ReqRepHeader *hdr, size_t file_size, uint32_t expected_mode) {
400 55 50         if (hdr->magic != REQREP_MAGIC) return 0;
401 55 50         if (hdr->version != REQREP_VERSION) return 0;
402 55 50         if (hdr->mode != expected_mode) return 0;
403 55 50         if (hdr->req_cap == 0 || (hdr->req_cap & (hdr->req_cap - 1)) != 0) return 0;
    50          
404 55 50         if (hdr->total_size != (uint64_t)file_size) return 0;
405 55 50         if (hdr->req_slots_off != sizeof(ReqRepHeader)) return 0;
406 55 50         if (hdr->resp_slots == 0) return 0;
407 55 50         if (hdr->resp_stride < sizeof(RespSlotHeader)) return 0;
408 55 100         if (expected_mode == REQREP_MODE_STR) {
409 45 50         if ((uint64_t)hdr->req_arena_off + hdr->req_arena_cap > hdr->total_size) return 0;
410             }
411 55 50         if ((uint64_t)hdr->resp_off + (uint64_t)hdr->resp_slots * hdr->resp_stride > hdr->total_size) return 0;
412 55           return 1;
413             }
414              
415 49           static void reqrep_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
416             uint32_t resp_data_max, uint64_t total_size,
417             uint32_t req_slots_off, uint32_t req_arena_off,
418             uint32_t req_arena_cap, uint32_t resp_off,
419             uint32_t resp_stride) {
420 49           ReqRepHeader *hdr = (ReqRepHeader *)base;
421 49           memset(hdr, 0, sizeof(ReqRepHeader));
422 49           hdr->magic = REQREP_MAGIC;
423 49           hdr->version = REQREP_VERSION;
424 49           hdr->mode = REQREP_MODE_STR;
425 49           hdr->req_cap = req_cap;
426 49           hdr->total_size = total_size;
427 49           hdr->req_slots_off = req_slots_off;
428 49           hdr->req_arena_off = req_arena_off;
429 49           hdr->req_arena_cap = req_arena_cap;
430 49           hdr->resp_slots = resp_slots_n;
431 49           hdr->resp_data_max = resp_data_max;
432 49           hdr->resp_off = resp_off;
433 49           hdr->resp_stride = resp_stride;
434              
435 943 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
436 894           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
437 894           memset(rs, 0, sizeof(RespSlotHeader));
438             }
439              
440 49           __atomic_thread_fence(__ATOMIC_SEQ_CST);
441 49           }
442              
443 50           static void reqrep_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
444             uint32_t resp_data_max, uint64_t arena_hint,
445             uint32_t *out_req_slots_off, uint32_t *out_req_arena_off,
446             uint32_t *out_req_arena_cap, uint32_t *out_resp_off,
447             uint32_t *out_resp_stride, uint64_t *out_total_size) {
448 50           uint32_t req_slots_off = sizeof(ReqRepHeader);
449 50           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqSlot);
450 50           uint32_t req_arena_off = (uint32_t)((slots_end + 7) & ~(uint64_t)7);
451              
452 50           uint32_t req_arena_cap = (uint32_t)arena_hint;
453 50 100         if (req_arena_cap < 4096) req_arena_cap = 4096;
454              
455 50           uint32_t resp_stride = (sizeof(RespSlotHeader) + resp_data_max + 63) & ~63u;
456 50           uint64_t resp_off_64 = ((uint64_t)req_arena_off + req_arena_cap + 63) & ~(uint64_t)63;
457 50           uint32_t resp_off = (uint32_t)resp_off_64;
458 50           uint64_t total_size = resp_off_64 + (uint64_t)resp_slots_n * resp_stride;
459              
460 50           *out_req_slots_off = req_slots_off;
461 50           *out_req_arena_off = req_arena_off;
462 50           *out_req_arena_cap = req_arena_cap;
463 50           *out_resp_off = resp_off;
464 50           *out_resp_stride = resp_stride;
465 50           *out_total_size = total_size;
466 50           }
467              
468 47           static ReqRepHandle *reqrep_create(const char *path, uint32_t req_cap,
469             uint32_t resp_slots_n, uint32_t resp_data_max,
470             uint64_t arena_hint, char *errbuf) {
471 47 50         if (errbuf) errbuf[0] = '\0';
472              
473 47           req_cap = reqrep_next_pow2(req_cap);
474 47 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
475 47 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
476              
477 47 100         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
478              
479             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
480             uint64_t total_size;
481 47           reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
482             &req_slots_off, &req_arena_off, &req_arena_cap,
483             &resp_off, &resp_stride, &total_size);
484              
485 47           int anonymous = (path == NULL);
486             size_t map_size;
487             void *base;
488              
489 47 50         if (anonymous) {
490 0           map_size = (size_t)total_size;
491 0           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
492             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
493 0 0         if (base == MAP_FAILED) {
494 0 0         REQREP_ERR("mmap(anonymous): %s", strerror(errno));
495 0           return NULL;
496             }
497             } else {
498 47           int fd = open(path, O_RDWR | O_CREAT, 0666);
499 48 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
500              
501 47 50         if (flock(fd, LOCK_EX) < 0) {
502 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
503 0           close(fd); return NULL;
504             }
505              
506             struct stat st;
507 47 50         if (fstat(fd, &st) < 0) {
508 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
509 0           flock(fd, LOCK_UN); close(fd); return NULL;
510             }
511              
512 47           int is_new = (st.st_size == 0);
513              
514 47 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
515 0 0         REQREP_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
516 0           flock(fd, LOCK_UN); close(fd); return NULL;
517             }
518              
519 47 100         if (is_new) {
520 46 50         if (ftruncate(fd, (off_t)total_size) < 0) {
521 0 0         REQREP_ERR("ftruncate(%s): %s", path, strerror(errno));
522 0           flock(fd, LOCK_UN); close(fd); return NULL;
523             }
524             }
525              
526 47 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
527 47           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
528 47 50         if (base == MAP_FAILED) {
529 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
530 0           flock(fd, LOCK_UN); close(fd); return NULL;
531             }
532              
533 47 100         if (!is_new) {
534 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, (size_t)st.st_size, REQREP_MODE_STR)) {
535 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
536 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
537             }
538 1           flock(fd, LOCK_UN);
539 1           close(fd);
540 1           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
541 1 50         if (!h) { munmap(base, map_size); return NULL; }
542 1           return h;
543             }
544              
545 46           flock(fd, LOCK_UN);
546 46           close(fd);
547             }
548              
549 46           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
550             req_slots_off, req_arena_off, req_arena_cap,
551             resp_off, resp_stride);
552              
553 46           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
554 46 50         if (!h) { munmap(base, map_size); return NULL; }
555 46           return h;
556             }
557              
558 49           static ReqRepHandle *reqrep_open(const char *path, uint32_t mode, char *errbuf) {
559 49 50         if (errbuf) errbuf[0] = '\0';
560 49 50         if (!path) { REQREP_ERR("path required"); return NULL; }
    0          
561              
562 49           int fd = open(path, O_RDWR);
563 49 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
564              
565 49 50         if (flock(fd, LOCK_EX) < 0) {
566 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
567 0           close(fd); return NULL;
568             }
569              
570             struct stat st;
571 49 50         if (fstat(fd, &st) < 0) {
572 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
573 0           flock(fd, LOCK_UN); close(fd); return NULL;
574             }
575              
576 49 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
577 0 0         REQREP_ERR("%s: file too small or not initialized", path);
578 0           flock(fd, LOCK_UN); close(fd); return NULL;
579             }
580              
581 49           size_t map_size = (size_t)st.st_size;
582 49           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
583 49 50         if (base == MAP_FAILED) {
584 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
585 0           flock(fd, LOCK_UN); close(fd); return NULL;
586             }
587              
588 49 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
589 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
590 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
591             }
592              
593 49           flock(fd, LOCK_UN);
594 49           close(fd);
595              
596 49           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
597 49 50         if (!h) { munmap(base, map_size); return NULL; }
598 49           return h;
599             }
600              
601 3           static ReqRepHandle *reqrep_create_memfd(const char *name, uint32_t req_cap,
602             uint32_t resp_slots_n, uint32_t resp_data_max,
603             uint64_t arena_hint, char *errbuf) {
604 3 50         if (errbuf) errbuf[0] = '\0';
605              
606 3           req_cap = reqrep_next_pow2(req_cap);
607 3 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
608 3 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
609              
610 3 50         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
611              
612             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
613             uint64_t total_size;
614 3           reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
615             &req_slots_off, &req_arena_off, &req_arena_cap,
616             &resp_off, &resp_stride, &total_size);
617              
618 3 50         int fd = memfd_create(name ? name : "reqrep", MFD_CLOEXEC);
619 3 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
620              
621 3 50         if (ftruncate(fd, (off_t)total_size) < 0) {
622 0 0         REQREP_ERR("ftruncate(memfd): %s", strerror(errno));
623 0           close(fd); return NULL;
624             }
625              
626 3           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
627 3 50         if (base == MAP_FAILED) {
628 0 0         REQREP_ERR("mmap(memfd): %s", strerror(errno));
629 0           close(fd); return NULL;
630             }
631              
632 3           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
633             req_slots_off, req_arena_off, req_arena_cap,
634             resp_off, resp_stride);
635              
636 3           ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
637 3 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
638 3           return h;
639             }
640              
641 4           static ReqRepHandle *reqrep_open_fd(int fd, uint32_t mode, char *errbuf) {
642 4 50         if (errbuf) errbuf[0] = '\0';
643              
644             struct stat st;
645 4 50         if (fstat(fd, &st) < 0) {
646 0 0         REQREP_ERR("fstat(fd=%d): %s", fd, strerror(errno));
647 0           return NULL;
648             }
649              
650 4 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
651 0 0         REQREP_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
652 0           return NULL;
653             }
654              
655 4           size_t map_size = (size_t)st.st_size;
656 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
657 4 50         if (base == MAP_FAILED) {
658 0 0         REQREP_ERR("mmap(fd=%d): %s", fd, strerror(errno));
659 0           return NULL;
660             }
661              
662 4 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
663 0 0         REQREP_ERR("fd %d: invalid or incompatible reqrep", fd);
664 0           munmap(base, map_size);
665 0           return NULL;
666             }
667              
668 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
669 4 50         if (myfd < 0) {
670 0 0         REQREP_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
671 0           munmap(base, map_size);
672 0           return NULL;
673             }
674              
675 4           ReqRepHandle *h = reqrep_setup_handle(base, map_size, NULL, myfd);
676 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
677 4           return h;
678             }
679              
680 111           static void reqrep_destroy(ReqRepHandle *h) {
681 111 50         if (!h) return;
682 111 100         if (h->notify_fd >= 0) close(h->notify_fd);
683 111 100         if (h->reply_fd >= 0) close(h->reply_fd);
684 111 100         if (h->backing_fd >= 0) close(h->backing_fd);
685 111 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
686 111           free(h->copy_buf);
687 111           free(h->path);
688 111           free(h);
689             }
690              
691             /* ================================================================
692             * Request queue operations (client -> server)
693             * ================================================================ */
694              
695             /* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long. */
696 2863           static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
697             uint32_t len, bool utf8,
698             uint32_t resp_slot_idx, uint32_t resp_gen) {
699 2863           ReqRepHeader *hdr = h->hdr;
700              
701 2863 50         if (len > REQREP_STR_LEN_MASK) return -2;
702              
703 2863 100         if (hdr->req_tail - hdr->req_head >= h->req_cap) {
704 4           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
705 4           return 0;
706             }
707              
708 2859           uint32_t alloc = (len + 7) & ~7u;
709 2859 100         if (alloc == 0) alloc = 8;
710 2859           uint32_t pos = hdr->arena_wpos;
711 2859           uint64_t skip = alloc;
712              
713 2859 100         if ((uint64_t)pos + alloc > h->req_arena_cap) {
714 1           skip += h->req_arena_cap - pos;
715 1           pos = 0;
716             }
717              
718 2859 100         if ((uint64_t)hdr->arena_used + skip > h->req_arena_cap) {
719 1 50         if (hdr->req_tail == hdr->req_head) {
720 0           hdr->arena_wpos = 0;
721 0           hdr->arena_used = 0;
722 0           pos = 0;
723 0           skip = alloc;
724             } else {
725 1           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
726 1           return 0;
727             }
728             }
729              
730 2858           memcpy(h->req_arena + pos, str, len);
731              
732 2858           uint32_t idx = (uint32_t)(hdr->req_tail & h->req_cap_mask);
733 2858           ReqSlot *slot = &h->req_slots[idx];
734 2858           slot->arena_off = pos;
735 2858 100         slot->packed_len = len | (utf8 ? REQREP_UTF8_FLAG : 0);
736 2858           slot->arena_skip = (uint32_t)skip;
737 2858           slot->resp_slot = resp_slot_idx;
738 2858           slot->resp_gen = resp_gen;
739              
740 2858           hdr->arena_wpos = pos + alloc;
741 2858           hdr->arena_used += (uint32_t)skip;
742 2858           hdr->req_tail++;
743 2858           hdr->stat_requests++;
744 2858           return 1;
745             }
746              
747             /* Non-blocking send: acquire slot + push request.
748             * Returns 1=ok, 0=full, -2=too long, -3=no slots.
749             * On success, *out_id is the packed slot+generation ID. */
750 2864           static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
751             bool utf8, uint64_t *out_id) {
752 2864           int32_t slot = reqrep_slot_acquire(h);
753 2864 100         if (slot < 0) return -3;
754              
755 2863           RespSlotHeader *rslot = reqrep_resp_slot(h, (uint32_t)slot);
756 2863           uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);
757              
758 2863           reqrep_mutex_lock(h->hdr);
759 2863           int r = reqrep_send_locked(h, str, len, utf8, (uint32_t)slot, gen);
760 2863           reqrep_mutex_unlock(h->hdr);
761              
762 2863 100         if (r == 1) {
763 2858           reqrep_wake_consumers(h->hdr);
764 2858           *out_id = REQREP_MAKE_ID((uint32_t)slot, gen);
765 2858           return 1;
766             }
767              
768 5           reqrep_slot_release(h, (uint32_t)slot);
769 5           return r;
770             }
771              
772             /* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout). */
773 2765           static int reqrep_send_wait(ReqRepHandle *h, const char *str, uint32_t len,
774             bool utf8, uint64_t *out_id, double timeout) {
775 2765           int r = reqrep_try_send(h, str, len, utf8, out_id);
776 2765 100         if (r == 1 || r == -2) return r;
    50          
777 1 50         if (timeout == 0) return r;
778              
779 1           ReqRepHeader *hdr = h->hdr;
780             struct timespec deadline, remaining;
781 1           int has_deadline = (timeout > 0);
782 1 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
783              
784 0           for (;;) {
785 1 50         uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
786 1 50         uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
787              
788 1           uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
789 1           r = reqrep_try_send(h, str, len, utf8, out_id);
790 1 50         if (r == 1 || r == -2) return r;
    50          
791              
792 1           __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
793 1           struct timespec *pts = NULL;
794 1 50         if (has_deadline) {
795 1 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
796 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
797 0           return r;
798             }
799 1           pts = &remaining;
800             }
801 1           long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
802 1           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
803              
804 1           r = reqrep_try_send(h, str, len, utf8, out_id);
805 1 50         if (r == 1 || r == -2) return r;
    50          
806 1 50         if (rc == -1 && errno == ETIMEDOUT) return r;
    50          
807             }
808             }
809              
810             /* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
811 3406           static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
812             uint32_t *out_len, bool *out_utf8,
813             uint64_t *out_id) {
814 3406           ReqRepHeader *hdr = h->hdr;
815              
816 3406 100         if (hdr->req_tail == hdr->req_head) {
817 1154           __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
818 1154           return 0;
819             }
820              
821 2252           uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
822 2252           ReqSlot *slot = &h->req_slots[idx];
823              
824 2252           uint32_t len = slot->packed_len & REQREP_STR_LEN_MASK;
825 2252           *out_utf8 = (slot->packed_len & REQREP_UTF8_FLAG) != 0;
826 2252           *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
827              
828 2252 50         if (!reqrep_ensure_copy_buf(h, len + 1))
829 0           return -1;
830 2252 100         if (len > 0)
831 2248           memcpy(h->copy_buf, h->req_arena + slot->arena_off, len);
832 2252           h->copy_buf[len] = '\0';
833 2252           *out_str = h->copy_buf;
834 2252           *out_len = len;
835              
836 2252           hdr->arena_used -= slot->arena_skip;
837 2252 100         if (hdr->arena_used == 0)
838 1123           hdr->arena_wpos = 0;
839              
840 2252           hdr->req_head++;
841 2252           return 1;
842             }
843              
844             /* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
845 3378           static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
846             uint32_t *out_len, bool *out_utf8,
847             uint64_t *out_id) {
848 3378           reqrep_mutex_lock(h->hdr);
849 3378           int r = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
850 3378           reqrep_mutex_unlock(h->hdr);
851 3378 100         if (r == 1) reqrep_wake_producers(h->hdr);
852 3378           return r;
853             }
854              
855             /* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
856 2154           static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
857             uint32_t *out_len, bool *out_utf8,
858             uint64_t *out_id, double timeout) {
859 2154           int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
860 2154 100         if (r != 0) return r;
861 597 50         if (timeout == 0) return 0;
862              
863 597           ReqRepHeader *hdr = h->hdr;
864             struct timespec deadline, remaining;
865 597           int has_deadline = (timeout > 0);
866 597 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
867              
868 0           for (;;) {
869 597           uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
870 597           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
871 597 100         if (r != 0) return r;
872              
873 536           __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
874 536           struct timespec *pts = NULL;
875 536 50         if (has_deadline) {
876 536 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
877 0           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
878 0           return 0;
879             }
880 536           pts = &remaining;
881             }
882 536           long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
883 536           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
884              
885 536           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
886 536 100         if (r != 0) return r;
887 5 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
888             }
889             }
890              
891             /* ================================================================
892             * Response operations (server -> client)
893             * ================================================================ */
894              
895             /* Write response to a response slot.
896             * Returns 1=ok, -1=bad slot, -2=stale (cancelled/recycled), -3=too long. */
897 2249           static int reqrep_reply(ReqRepHandle *h, uint64_t id,
898             const char *str, uint32_t len, bool utf8) {
899 2249           uint32_t slot_idx = REQREP_ID_SLOT(id);
900 2249           uint32_t expected_gen = REQREP_ID_GEN(id);
901 2249 50         if (slot_idx >= h->resp_slots) return -1;
902 2249 100         if (len > h->resp_data_max) return -3;
903              
904 2248           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
905 2248           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
906 2248 100         if (state != RESP_ACQUIRED) return -2;
907 2054 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;
908              
909 2047           uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
910 2047 100         if (len > 0) memcpy(data, str, len);
911 2047           slot->resp_len = len;
912 2047           slot->resp_flags = utf8 ? 1 : 0;
913              
914             /* CAS ACQUIRED→READY to prevent race with concurrent cancel */
915 2047           uint32_t expected_state = RESP_ACQUIRED;
916 2047 50         if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
917             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
918 0           return -2;
919              
920 2047 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
921 1920           syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);
922              
923 2047           __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
924 2047           return 1;
925             }
926              
927             /* Non-blocking get response. Returns 1=ok, 0=not ready, -1=bad slot, -2=OOM, -4=stale. */
928 5065           static int reqrep_try_get(ReqRepHandle *h, uint64_t id,
929             const char **out_str, uint32_t *out_len, bool *out_utf8) {
930 5065           uint32_t slot_idx = REQREP_ID_SLOT(id);
931 5065           uint32_t expected_gen = REQREP_ID_GEN(id);
932 5065 50         if (slot_idx >= h->resp_slots) return -1;
933              
934 5065           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
935 5065           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
936 5065 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
937 5060 100         if (state != RESP_READY) return 0;
938              
939 2639           uint32_t len = slot->resp_len;
940 2639           *out_utf8 = (slot->resp_flags & 1) != 0;
941              
942 2639 50         if (!reqrep_ensure_copy_buf(h, len + 1)) return -2;
943              
944 2639           uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
945 2639 100         if (len > 0) memcpy(h->copy_buf, data, len);
946 2639           h->copy_buf[len] = '\0';
947 2639           *out_str = h->copy_buf;
948 2639           *out_len = len;
949              
950 2639           reqrep_slot_release(h, slot_idx);
951 2639           return 1;
952             }
953              
954             /* Blocking get response. Returns 1=ok, 0=timeout, -1=bad slot, -2=OOM, -4=stale. */
955 2554           static int reqrep_get_wait(ReqRepHandle *h, uint64_t id,
956             const char **out_str, uint32_t *out_len, bool *out_utf8,
957             double timeout) {
958 2554           int r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
959 2554 100         if (r != 0) return r;
960 2420 50         if (timeout == 0) return 0;
961              
962 2420           uint32_t slot_idx = REQREP_ID_SLOT(id);
963 2420           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
964             struct timespec deadline, remaining;
965 2420           int has_deadline = (timeout > 0);
966 2420 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
967              
968 1           for (;;) {
969 2421           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
970 2421 100         if (state == RESP_READY)
971 1           return reqrep_try_get(h, id, out_str, out_len, out_utf8);
972              
973 2420           __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
974              
975             /* Re-check: cancel may have fired between try_get and waiter registration */
976 2420 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
977 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
978 0           return -4;
979             }
980              
981 2420           state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
982 2420 50         if (state == RESP_READY) {
983 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
984 0           return reqrep_try_get(h, id, out_str, out_len, out_utf8);
985             }
986              
987 2420           struct timespec *pts = NULL;
988 2420 100         if (has_deadline) {
989 1800 100         if (!reqrep_remaining_time(&deadline, &remaining)) {
990 1           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
991 1           return 0;
992             }
993 1799           pts = &remaining;
994             }
995              
996 2419           syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
997 2419           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
998              
999 2419           r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
1000 2419 100         if (r != 0) return r;
1001             }
1002             }
1003              
1004             /* Cancel a pending request — CAS ACQUIRED→FREE only if generation matches.
1005             * If the reply already arrived (READY), cancel is a no-op — call get() to drain. */
1006 218           static void reqrep_cancel(ReqRepHandle *h, uint64_t id) {
1007 218           uint32_t slot_idx = REQREP_ID_SLOT(id);
1008 218           uint32_t expected_gen = REQREP_ID_GEN(id);
1009 218 50         if (slot_idx >= h->resp_slots) return;
1010 218           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1011 218 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
1012 218           uint32_t expected_state = RESP_ACQUIRED;
1013 218 100         if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
1014             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1015 207           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1016 207           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1017             /* Wake get_wait blocked on this slot's state futex */
1018 207 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1019 1           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1020 207           reqrep_wake_slot_waiters(h->hdr);
1021             }
1022             }
1023              
1024             /* Combined send + wait-for-reply with single deadline.
1025             * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
1026 2548           static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
1027             bool req_utf8, const char **out_str, uint32_t *out_len,
1028             bool *out_utf8, double timeout) {
1029             uint64_t id;
1030             struct timespec deadline;
1031 2548           int has_deadline = (timeout > 0);
1032 2548 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1033              
1034 2548           int r = reqrep_send_wait(h, req_str, req_len, req_utf8, &id, timeout);
1035 2548 50         if (r != 1) return r;
1036              
1037 2548           double get_timeout = timeout;
1038 2548 100         if (has_deadline) {
1039             struct timespec now;
1040 1801           clock_gettime(CLOCK_MONOTONIC, &now);
1041 1801           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1042 1801           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1043 1801 50         if (get_timeout <= 0) {
1044 0           reqrep_cancel(h, id);
1045 0           return 0;
1046             }
1047             }
1048              
1049 2548           r = reqrep_get_wait(h, id, out_str, out_len, out_utf8, get_timeout);
1050 2548 100         if (r != 1) {
1051 1           reqrep_cancel(h, id);
1052             /* If reply arrived between timeout and cancel, drain to free the slot */
1053             const char *discard; uint32_t dlen; bool dutf8;
1054 1           reqrep_try_get(h, id, &discard, &dlen, &dutf8);
1055             }
1056 2548           return r;
1057             }
1058              
1059             /* Count response slots owned by this process */
1060 10           static uint32_t reqrep_pending(ReqRepHandle *h) {
1061 10           uint32_t mypid = (uint32_t)getpid();
1062 10           uint32_t count = 0;
1063 78 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1064 68           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1065 68           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_RELAXED);
1066 68 100         if ((state == RESP_ACQUIRED || state == RESP_READY) &&
    50          
1067 7 50         __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED) == mypid)
1068 7           count++;
1069             }
1070 10           return count;
1071             }
1072              
1073             /* ================================================================
1074             * Queue state
1075             * ================================================================ */
1076              
1077 40           static inline uint64_t reqrep_size(ReqRepHandle *h) {
1078 40           ReqRepHeader *hdr = h->hdr;
1079 40           uint64_t tail = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1080 40           uint64_t head = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1081 40           return tail - head;
1082             }
1083              
1084 1           static void reqrep_clear(ReqRepHandle *h) {
1085 1           ReqRepHeader *hdr = h->hdr;
1086 1           reqrep_mutex_lock(hdr);
1087 1           hdr->req_head = 0;
1088 1           hdr->req_tail = 0;
1089 1           hdr->arena_wpos = 0;
1090 1           hdr->arena_used = 0;
1091 1           reqrep_mutex_unlock(hdr);
1092              
1093             /* Release all in-flight response slots so get_wait callers unblock.
1094             * Retry CAS if reply races us (ACQUIRED→READY between load and CAS). */
1095 5 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1096 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1097 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1098 4 100         while (state == RESP_ACQUIRED || state == RESP_READY) {
    50          
1099 2 50         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
1100             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1101 2           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1102 2           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1103 2 50         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1104 0           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1105 2           break;
1106             }
1107             /* CAS failed — state updated with actual value; retry if still valid */
1108             }
1109             }
1110              
1111 1           reqrep_wake_slot_waiters(hdr);
1112 1           reqrep_wake_producers(hdr);
1113 1           reqrep_wake_consumers(hdr);
1114 1           }
1115              
1116 0           static inline int reqrep_sync(ReqRepHandle *h) {
1117 0           return msync(h->hdr, h->mmap_size, MS_SYNC);
1118             }
1119              
1120             /* ================================================================
1121             * eventfd — event-loop integration
1122             *
1123             * Two separate eventfds:
1124             * notify_fd — request notification (client -> server: "new request")
1125             * reply_fd — reply notification (server -> client: "response ready")
1126             * ================================================================ */
1127              
1128 6           static inline int reqrep_eventfd_create(ReqRepHandle *h) {
1129 6 50         if (h->notify_fd >= 0) return h->notify_fd;
1130 6           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1131 6           return h->notify_fd;
1132             }
1133              
1134 2           static inline void reqrep_eventfd_set(ReqRepHandle *h, int fd) {
1135 2 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    0          
1136 0           close(h->notify_fd);
1137 2           h->notify_fd = fd;
1138 2           }
1139              
1140 3           static inline void reqrep_notify(ReqRepHandle *h) {
1141 3 50         if (h->notify_fd >= 0) {
1142 3           uint64_t one = 1;
1143 3           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1144             }
1145 3           }
1146              
1147 2           static inline void reqrep_eventfd_consume(ReqRepHandle *h) {
1148 2 50         if (h->notify_fd >= 0) {
1149             uint64_t val;
1150 2           ssize_t __attribute__((unused)) rc = read(h->notify_fd, &val, sizeof(val));
1151             }
1152 2           }
1153              
1154 8           static inline int reqrep_reply_eventfd_create(ReqRepHandle *h) {
1155 8 50         if (h->reply_fd >= 0) return h->reply_fd;
1156 8           h->reply_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1157 8           return h->reply_fd;
1158             }
1159              
1160 1           static inline void reqrep_reply_eventfd_set(ReqRepHandle *h, int fd) {
1161 1 50         if (h->reply_fd >= 0 && h->reply_fd != fd)
    0          
1162 0           close(h->reply_fd);
1163 1           h->reply_fd = fd;
1164 1           }
1165              
1166 1           static inline void reqrep_reply_notify(ReqRepHandle *h) {
1167 1 50         if (h->reply_fd >= 0) {
1168 1           uint64_t one = 1;
1169 1           ssize_t __attribute__((unused)) rc = write(h->reply_fd, &one, sizeof(one));
1170             }
1171 1           }
1172              
1173 1           static inline void reqrep_reply_eventfd_consume(ReqRepHandle *h) {
1174 1 50         if (h->reply_fd >= 0) {
1175             uint64_t val;
1176 1           ssize_t __attribute__((unused)) rc = read(h->reply_fd, &val, sizeof(val));
1177             }
1178 1           }
1179              
1180             /* ================================================================
1181             * Int mode: lock-free Vyukov MPMC request queue, inline int64 response
1182             * ================================================================ */
1183              
1184 8           static void reqrep_int_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
1185             uint32_t *out_req_slots_off, uint32_t *out_resp_off,
1186             uint32_t *out_resp_stride, uint64_t *out_total_size) {
1187 8           uint32_t req_slots_off = sizeof(ReqRepHeader);
1188 8           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqIntSlot);
1189 8           uint32_t resp_stride = (sizeof(RespSlotHeader) + sizeof(int64_t) + 63) & ~63u;
1190 8           uint64_t resp_off = (slots_end + 63) & ~(uint64_t)63;
1191 8           *out_req_slots_off = req_slots_off;
1192 8           *out_resp_off = (uint32_t)resp_off;
1193 8           *out_resp_stride = resp_stride;
1194 8           *out_total_size = resp_off + (uint64_t)resp_slots_n * resp_stride;
1195 8           }
1196              
1197 7           static void reqrep_int_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
1198             uint64_t total_size, uint32_t req_slots_off,
1199             uint32_t resp_off, uint32_t resp_stride) {
1200 7           ReqRepHeader *hdr = (ReqRepHeader *)base;
1201 7           memset(hdr, 0, sizeof(ReqRepHeader));
1202 7           hdr->magic = REQREP_MAGIC;
1203 7           hdr->version = REQREP_VERSION;
1204 7           hdr->mode = REQREP_MODE_INT;
1205 7           hdr->req_cap = req_cap;
1206 7           hdr->total_size = total_size;
1207 7           hdr->req_slots_off = req_slots_off;
1208 7           hdr->resp_slots = resp_slots_n;
1209 7           hdr->resp_data_max = sizeof(int64_t);
1210 7           hdr->resp_off = resp_off;
1211 7           hdr->resp_stride = resp_stride;
1212              
1213             /* Vyukov: init sequence numbers */
1214 7           ReqIntSlot *slots = (ReqIntSlot *)((char *)base + req_slots_off);
1215 209 100         for (uint32_t i = 0; i < req_cap; i++)
1216 202           slots[i].sequence = i;
1217              
1218             /* Init response slots */
1219 58 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
1220 51           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
1221 51           memset(rs, 0, sizeof(RespSlotHeader));
1222             }
1223              
1224 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1225 7           }
1226              
1227 7           static ReqRepHandle *reqrep_create_int(const char *path, uint32_t req_cap,
1228             uint32_t resp_slots_n, char *errbuf) {
1229 7 50         if (errbuf) errbuf[0] = '\0';
1230 7           req_cap = reqrep_next_pow2(req_cap);
1231 7 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1232 7 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1233              
1234             uint32_t req_slots_off, resp_off, resp_stride;
1235             uint64_t total_size;
1236 7           reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1237             &resp_off, &resp_stride, &total_size);
1238              
1239 7           int anonymous = (path == NULL);
1240             size_t map_size;
1241             void *base;
1242              
1243 7 50         if (anonymous) {
1244 0           map_size = (size_t)total_size;
1245 0           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
1246             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
1247 0 0         if (base == MAP_FAILED) { REQREP_ERR("mmap(anonymous): %s", strerror(errno)); return NULL; }
    0          
1248             } else {
1249 7           int fd = open(path, O_RDWR | O_CREAT, 0666);
1250 8 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
1251 7 50         if (flock(fd, LOCK_EX) < 0) { REQREP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
1252             struct stat st;
1253 7 50         if (fstat(fd, &st) < 0) { REQREP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1254 7           int is_new = (st.st_size == 0);
1255 7 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
1256 0 0         REQREP_ERR("%s: file too small", path); flock(fd, LOCK_UN); close(fd); return NULL;
1257             }
1258 7 100         if (is_new && ftruncate(fd, (off_t)total_size) < 0) {
    50          
1259 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
1260             }
1261 7 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
1262 7           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1263 7 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1264 7 100         if (!is_new) {
1265 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, REQREP_MODE_INT)) {
1266 0 0         REQREP_ERR("%s: invalid or incompatible", path); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1267             }
1268 1           flock(fd, LOCK_UN); close(fd);
1269 1           return reqrep_setup_handle(base, map_size, path, -1);
1270             }
1271 6           flock(fd, LOCK_UN); close(fd);
1272             }
1273              
1274 6           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1275             req_slots_off, resp_off, resp_stride);
1276 6           return reqrep_setup_handle(base, map_size, path, -1);
1277             }
1278              
1279 1           static ReqRepHandle *reqrep_create_int_memfd(const char *name, uint32_t req_cap,
1280             uint32_t resp_slots_n, char *errbuf) {
1281 1 50         if (errbuf) errbuf[0] = '\0';
1282 1           req_cap = reqrep_next_pow2(req_cap);
1283 1 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1284 1 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1285              
1286             uint32_t req_slots_off, resp_off, resp_stride;
1287             uint64_t total_size;
1288 1           reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1289             &resp_off, &resp_stride, &total_size);
1290              
1291 1 50         int fd = memfd_create(name ? name : "reqrep_int", MFD_CLOEXEC);
1292 1 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1293 1 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1294 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
1295             }
1296 1           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1297 1 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
1298              
1299 1           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1300             req_slots_off, resp_off, resp_stride);
1301 1           return reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
1302             }
1303              
1304             /* --- Int request queue: lock-free Vyukov MPMC --- */
1305              
1306 40           static inline int reqrep_int_try_send(ReqRepHandle *h, int64_t value, uint64_t *out_id) {
1307 40           int32_t rslot = reqrep_slot_acquire(h);
1308 40 100         if (rslot < 0) return -3;
1309              
1310 39           RespSlotHeader *rs = reqrep_resp_slot(h, (uint32_t)rslot);
1311 39           uint32_t gen = __atomic_load_n(&rs->generation, __ATOMIC_ACQUIRE);
1312              
1313 39           ReqRepHeader *hdr = h->hdr;
1314 39           ReqIntSlot *slots = (ReqIntSlot *)((char *)h->hdr + hdr->req_slots_off);
1315 39           uint32_t mask = h->req_cap_mask;
1316 39           uint64_t pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1317              
1318 0           for (;;) {
1319 39           ReqIntSlot *slot = &slots[pos & mask];
1320 39           uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
1321 39           int64_t diff = (int64_t)seq - (int64_t)pos;
1322 39 100         if (diff == 0) {
1323 38 50         if (__atomic_compare_exchange_n(&hdr->req_tail, &pos, pos + 1,
1324             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
1325 38           slot->value = value;
1326 38           slot->resp_slot = (uint32_t)rslot;
1327 38           slot->resp_gen = gen;
1328 38           __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
1329 38           __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
1330 38           reqrep_wake_consumers(hdr);
1331 38           *out_id = REQREP_MAKE_ID((uint32_t)rslot, gen);
1332 38           return 1;
1333             }
1334 1 50         } else if (diff < 0) {
1335 1           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
1336 1           reqrep_slot_release(h, (uint32_t)rslot);
1337 1           return 0;
1338             } else {
1339 0           pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1340             }
1341             }
1342             }
1343              
1344 8           static int reqrep_int_send_wait(ReqRepHandle *h, int64_t value,
1345             uint64_t *out_id, double timeout) {
1346 8           int r = reqrep_int_try_send(h, value, out_id);
1347 8 50         if (r == 1) return 1;
1348 0 0         if (timeout == 0) return r;
1349 0           ReqRepHeader *hdr = h->hdr;
1350             struct timespec deadline, remaining;
1351 0           int has_deadline = (timeout > 0);
1352 0 0         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1353 0           for (;;) {
1354             /* Wait on slot_futex if no slots (-3), send_futex if queue full (0) */
1355 0 0         uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
1356 0 0         uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
1357              
1358 0           uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
1359 0           r = reqrep_int_try_send(h, value, out_id);
1360 0 0         if (r == 1) return 1;
1361              
1362 0           __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1363 0           struct timespec *pts = NULL;
1364 0 0         if (has_deadline) {
1365 0 0         if (!reqrep_remaining_time(&deadline, &remaining)) {
1366 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1367 0           return r;
1368             }
1369 0           pts = &remaining;
1370             }
1371 0           long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
1372 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1373 0           r = reqrep_int_try_send(h, value, out_id);
1374 0 0         if (r == 1) return 1;
1375 0 0         if (rc == -1 && errno == ETIMEDOUT) return r;
    0          
1376             }
1377             }
1378              
1379 46           static inline int reqrep_int_try_recv(ReqRepHandle *h, int64_t *out_value, uint64_t *out_id) {
1380 46           ReqRepHeader *hdr = h->hdr;
1381 46           ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
1382 46           uint32_t mask = h->req_cap_mask;
1383 46           uint64_t pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1384 0           for (;;) {
1385 46           ReqIntSlot *slot = &slots[pos & mask];
1386 46           uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
1387 46           int64_t diff = (int64_t)seq - (int64_t)(pos + 1);
1388 46 100         if (diff == 0) {
1389 36 50         if (__atomic_compare_exchange_n(&hdr->req_head, &pos, pos + 1,
1390             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
1391 36           *out_value = slot->value;
1392 36           *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
1393 36           __atomic_store_n(&slot->sequence, pos + h->req_cap, __ATOMIC_RELEASE);
1394 36           reqrep_wake_producers(hdr);
1395 36           return 1;
1396             }
1397 10 50         } else if (diff < 0) {
1398 10           __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
1399 10           return 0;
1400             } else {
1401 0           pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1402             }
1403             }
1404             }
1405              
1406 7           static int reqrep_int_recv_wait(ReqRepHandle *h, int64_t *out_value,
1407             uint64_t *out_id, double timeout) {
1408 7 100         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1409 4 50         if (timeout == 0) return 0;
1410 4           ReqRepHeader *hdr = h->hdr;
1411             struct timespec deadline, remaining;
1412 4           int has_deadline = (timeout > 0);
1413 4 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1414 0           for (;;) {
1415 4           uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
1416 4 50         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1417 4           __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1418 4           struct timespec *pts = NULL;
1419 4 50         if (has_deadline) {
1420 4 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
1421 0           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1422 0           return 0;
1423             }
1424 4           pts = &remaining;
1425             }
1426 4           long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
1427 4           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1428 4 50         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1429 0 0         if (rc == -1 && errno == ETIMEDOUT) return 0;
    0          
1430             }
1431             }
1432              
1433             /* --- Int response: inline int64 in resp data area --- */
1434              
1435 35           static int reqrep_int_reply(ReqRepHandle *h, uint64_t id, int64_t value) {
1436 35           uint32_t slot_idx = REQREP_ID_SLOT(id);
1437 35           uint32_t expected_gen = REQREP_ID_GEN(id);
1438 35 50         if (slot_idx >= h->resp_slots) return -1;
1439              
1440 35           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1441 35           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1442 35 100         if (state != RESP_ACQUIRED) return -2;
1443 33 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;
1444              
1445 32           *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader)) = value;
1446              
1447 32           uint32_t expected_state = RESP_ACQUIRED;
1448 32 50         if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
1449             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
1450 0           return -2;
1451              
1452 32 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1453 5           syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);
1454 32           __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
1455 32           return 1;
1456             }
1457              
1458 40           static int reqrep_int_try_get(ReqRepHandle *h, uint64_t id, int64_t *out_value) {
1459 40           uint32_t slot_idx = REQREP_ID_SLOT(id);
1460 40           uint32_t expected_gen = REQREP_ID_GEN(id);
1461 40 50         if (slot_idx >= h->resp_slots) return -1;
1462              
1463 40           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1464 40           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1465 40 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
1466 39 100         if (state != RESP_READY) return 0;
1467              
1468 32           *out_value = *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader));
1469 32           reqrep_slot_release(h, slot_idx);
1470 32           return 1;
1471             }
1472              
1473 8           static int reqrep_int_get_wait(ReqRepHandle *h, uint64_t id, int64_t *out_value,
1474             double timeout) {
1475 8           int r = reqrep_int_try_get(h, id, out_value);
1476 8 100         if (r != 0) return r;
1477 6 50         if (timeout == 0) return 0;
1478 6           uint32_t slot_idx = REQREP_ID_SLOT(id);
1479 6           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1480             struct timespec deadline, remaining;
1481 6           int has_deadline = (timeout > 0);
1482 6 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1483 1           for (;;) {
1484 7           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1485 7 50         if (state == RESP_READY)
1486 0           return reqrep_int_try_get(h, id, out_value);
1487 7           __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1488 7 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
1489 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1490 0           return -4;
1491             }
1492 7           state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1493 7 50         if (state == RESP_READY) {
1494 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1495 0           return reqrep_int_try_get(h, id, out_value);
1496             }
1497 7           struct timespec *pts = NULL;
1498 7 100         if (has_deadline) {
1499 2 100         if (!reqrep_remaining_time(&deadline, &remaining)) {
1500 1           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1501 1           return 0;
1502             }
1503 1           pts = &remaining;
1504             }
1505 6           syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
1506 6           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1507 6           r = reqrep_int_try_get(h, id, out_value);
1508 6 100         if (r != 0) return r;
1509             }
1510             }
1511              
1512 6           static int reqrep_int_request(ReqRepHandle *h, int64_t req_value, int64_t *out_value,
1513             double timeout) {
1514             uint64_t id;
1515             struct timespec deadline;
1516 6           int has_deadline = (timeout > 0);
1517 6 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1518 6           int r = reqrep_int_send_wait(h, req_value, &id, timeout);
1519 6 50         if (r != 1) return r;
1520 6           double get_timeout = timeout;
1521 6 100         if (has_deadline) {
1522             struct timespec now;
1523 1           clock_gettime(CLOCK_MONOTONIC, &now);
1524 1           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1525 1           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1526 1 50         if (get_timeout <= 0) { reqrep_cancel(h, id); return 0; }
1527             }
1528 6           r = reqrep_int_get_wait(h, id, out_value, get_timeout);
1529 6 100         if (r != 1) {
1530 1           reqrep_cancel(h, id);
1531             int64_t discard;
1532 1           reqrep_int_try_get(h, id, &discard);
1533             }
1534 6           return r;
1535             }
1536              
1537 6           static inline uint64_t reqrep_int_size(ReqRepHandle *h) {
1538 6           uint64_t tail = __atomic_load_n(&h->hdr->req_tail, __ATOMIC_RELAXED);
1539 6           uint64_t head = __atomic_load_n(&h->hdr->req_head, __ATOMIC_RELAXED);
1540 6           return tail - head;
1541             }
1542              
1543 1           static void reqrep_int_clear(ReqRepHandle *h) {
1544 1           ReqRepHeader *hdr = h->hdr;
1545              
1546             /* Reset Vyukov queue: head/tail to 0, reinit sequence numbers.
1547             * Use atomic stores — concurrent readers use atomic loads on these. */
1548 1           __atomic_store_n(&hdr->req_head, 0, __ATOMIC_RELAXED);
1549 1           __atomic_store_n(&hdr->req_tail, 0, __ATOMIC_RELAXED);
1550 1           ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
1551 17 100         for (uint32_t i = 0; i < h->req_cap; i++)
1552 16           __atomic_store_n(&slots[i].sequence, (uint64_t)i, __ATOMIC_RELAXED);
1553 1           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1554              
1555             /* Release all in-flight response slots (same as Str clear) */
1556 5 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1557 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1558 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1559 4 100         while (state == RESP_ACQUIRED || state == RESP_READY) {
    50          
1560 2 50         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
1561             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1562 2           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1563 2           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1564 2 50         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1565 0           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1566 2           break;
1567             }
1568             }
1569             }
1570              
1571 1           reqrep_wake_slot_waiters(hdr);
1572 1           reqrep_wake_producers(hdr);
1573 1           reqrep_wake_consumers(hdr);
1574 1           }
1575              
1576             #endif /* REQREP_H */