File Coverage

reqrep.h
Criterion Covered Total %
statement 774 919 84.2
branch 346 654 52.9
condition n/a
subroutine n/a
pod n/a
total 1120 1573 71.2


line stmt bran cond sub pod time code
1             /*
2             * reqrep.h -- Shared-memory request/response IPC for Linux
3             *
4             * Request queue (client -> server) with circular arena for variable-length
5             * request data, plus per-request response slots for targeted reply delivery.
6             *
7             * Uses file-backed mmap(MAP_SHARED) for cross-process sharing,
8             * futex for blocking wait, and PID-based stale lock recovery.
9             */
10              
11             #ifndef REQREP_H
12             #define REQREP_H
13              
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30              
31             /* ================================================================
32             * Constants
33             * ================================================================ */
34              
35             #define REQREP_MAGIC 0x52525331U /* "RRS1" */
36             #define REQREP_VERSION 1
37             #define REQREP_ERR_BUFLEN 256
38             #define REQREP_SPIN_LIMIT 32
39             #define REQREP_LOCK_TIMEOUT_SEC 2
40              
41             #define REQREP_UTF8_FLAG 0x80000000U
42             #define REQREP_STR_LEN_MASK 0x7FFFFFFFU
43              
44             #define RESP_FREE 0
45             #define RESP_ACQUIRED 1
46             #define RESP_READY 2
47              
48             #define REQREP_MODE_STR 0
49             #define REQREP_MODE_INT 1
50              
51             /* Pack/unpack slot index + generation into a 64-bit request ID.
52             * Prevents ABA: cancelled slot re-acquired by another client
53             * won't match the generation stored in the original request. */
54             #define REQREP_MAKE_ID(slot, gen) (((uint64_t)(gen) << 32) | (uint64_t)(slot))
55             #define REQREP_ID_SLOT(id) ((uint32_t)((id) & 0xFFFFFFFFULL))
56             #define REQREP_ID_GEN(id) ((uint32_t)((id) >> 32))
57              
58             /* ================================================================
59             * Header (256 bytes = 4 cache lines, lives at start of mmap)
60             * ================================================================ */
61              
62             typedef struct {
63             /* ---- Cache line 0 (0-63): immutable after create ---- */
64             uint32_t magic; /* 0 */
65             uint32_t version; /* 4 */
66             uint32_t mode; /* 8: REQREP_MODE_STR or REQREP_MODE_INT */
67             uint32_t req_cap; /* 12: request queue capacity (power of 2) */
68             uint64_t total_size; /* 16: mmap size */
69             uint32_t req_slots_off; /* 24: offset to request slot array */
70             uint32_t req_arena_off; /* 28: offset to request arena */
71             uint32_t req_arena_cap; /* 32: arena byte capacity */
72             uint32_t resp_slots; /* 36: number of response slots */
73             uint32_t resp_data_max; /* 40: max response data bytes per slot */
74             uint32_t resp_off; /* 44: offset to response slot area */
75             uint32_t resp_stride; /* 48: bytes per response slot (cache-aligned) */
76             uint8_t _pad0[12]; /* 52-63 */
77              
78             /* ---- Cache line 1 (64-127): recv hot (server) ---- */
79             uint64_t req_head; /* 64: consumer position */
80             uint32_t recv_waiters; /* 72: blocked servers */
81             uint32_t recv_futex; /* 76: futex for recv wakeup */
82             uint8_t _pad1[48]; /* 80-127 */
83              
84             /* ---- Cache line 2 (128-191): send hot (client) ---- */
85             uint64_t req_tail; /* 128: producer position */
86             uint32_t send_waiters; /* 136: blocked clients */
87             uint32_t send_futex; /* 140: futex for send wakeup */
88             uint8_t _pad2[48]; /* 144-191 */
89              
90             /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
91             uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
92             uint32_t mutex_waiters; /* 196 */
93             uint32_t arena_wpos; /* 200: arena write position */
94             uint32_t arena_used; /* 204: arena bytes consumed */
95             uint32_t resp_hint; /* 208: hint for slot scan */
96             uint32_t stat_recoveries; /* 212 */
97             uint32_t slot_futex; /* 216: futex for slot availability */
98             uint32_t slot_waiters; /* 220: threads waiting for free slot */
99             uint64_t stat_requests; /* 224 */
100             uint64_t stat_replies; /* 232 */
101             uint64_t stat_send_full; /* 240 */
102             uint32_t stat_recv_empty; /* 248 */
103             uint32_t _pad3; /* 252-255 */
104             } ReqRepHeader;
105              
106             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
107             _Static_assert(sizeof(ReqRepHeader) == 256, "ReqRepHeader must be 256 bytes");
108             #endif
109              
110             /* ================================================================
111             * Slot types
112             * ================================================================ */
113              
114             typedef struct {
115             uint32_t arena_off;
116             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
117             uint32_t arena_skip; /* bytes to release from arena on recv */
118             uint32_t resp_slot; /* response slot index */
119             uint32_t resp_gen; /* generation at time of slot acquire (ABA guard) */
120             uint32_t _rpad;
121             } ReqSlot; /* 24 bytes (Str mode) */
122              
123             /* Int request slot: Vyukov sequence + value + response routing */
124             typedef struct {
125             uint64_t sequence;
126             int64_t value;
127             uint32_t resp_slot;
128             uint32_t resp_gen;
129             } ReqIntSlot; /* 24 bytes (Int mode, lock-free) */
130              
131             typedef struct {
132             uint32_t state; /* futex: RESP_FREE=0, RESP_ACQUIRED=1, RESP_READY=2 */
133             uint32_t waiters; /* futex waiters on this slot */
134             uint32_t owner_pid; /* PID of client that acquired (for stale recovery) */
135             uint32_t resp_len; /* response data length */
136             uint32_t resp_flags; /* bit 0 = UTF-8 */
137             uint32_t generation; /* incremented on each acquire (ABA guard) */
138             uint32_t _rpad[2]; /* pad to 32 bytes */
139             } RespSlotHeader; /* 32 bytes + data */
140              
141             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
142             _Static_assert(sizeof(RespSlotHeader) == 32, "RespSlotHeader must be 32 bytes");
143             #endif
144              
145             /* ================================================================
146             * Process-local handle
147             * ================================================================ */
148              
149             typedef struct {
150             ReqRepHeader *hdr;
151             ReqSlot *req_slots;
152             char *req_arena;
153             uint8_t *resp_area; /* base of response slots region */
154             size_t mmap_size;
155             uint32_t req_cap;
156             uint32_t req_cap_mask; /* req_cap - 1 */
157             uint32_t req_arena_cap;
158             uint32_t resp_slots;
159             uint32_t resp_data_max;
160             uint32_t resp_stride;
161             char *copy_buf;
162             uint32_t copy_buf_cap;
163             char *path;
164             int notify_fd; /* request notification eventfd, -1 if unset */
165             int reply_fd; /* reply notification eventfd, -1 if unset */
166             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
167             } ReqRepHandle;
168              
169             /* ================================================================
170             * Utility
171             * ================================================================ */
172              
173 59           static inline uint32_t reqrep_next_pow2(uint32_t v) {
174 59 50         if (v < 2) return 2;
175 59 50         if (v > 0x80000000U) return 0;
176 59           v--;
177 59           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
178 59           return v + 1;
179             }
180              
181 317           static inline void reqrep_spin_pause(void) {
182             #if defined(__x86_64__) || defined(__i386__)
183 317           __asm__ volatile("pause" ::: "memory");
184             #elif defined(__aarch64__)
185             __asm__ volatile("yield" ::: "memory");
186             #else
187             __asm__ volatile("" ::: "memory");
188             #endif
189 317           }
190              
191 4891           static inline int reqrep_ensure_copy_buf(ReqRepHandle *h, uint32_t needed) {
192 4891 100         if (needed <= h->copy_buf_cap) return 1;
193 65 50         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
194 72 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
195 65           char *nb = (char *)realloc(h->copy_buf, ns);
196 65 50         if (!nb) return 0;
197 65           h->copy_buf = nb;
198 65           h->copy_buf_cap = ns;
199 65           return 1;
200             }
201              
202 18255           static inline RespSlotHeader *reqrep_resp_slot(ReqRepHandle *h, uint32_t idx) {
203 18255           return (RespSlotHeader *)(h->resp_area + (uint64_t)idx * h->resp_stride);
204             }
205              
206             /* ================================================================
207             * Futex helpers
208             * ================================================================ */
209              
210             #define REQREP_MUTEX_WRITER_BIT 0x80000000U
211             #define REQREP_MUTEX_PID_MASK 0x7FFFFFFFU
212             #define REQREP_MUTEX_VAL(pid) (REQREP_MUTEX_WRITER_BIT | ((uint32_t)(pid) & REQREP_MUTEX_PID_MASK))
213              
214 4           static inline int reqrep_pid_alive(uint32_t pid) {
215 4 50         if (pid == 0) return 1;
216 4 50         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
217             }
218              
219             static const struct timespec reqrep_lock_timeout = { REQREP_LOCK_TIMEOUT_SEC, 0 };
220              
221 0           static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
222 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
223             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
224 0           return;
225 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
226 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
227 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
228             }
229              
230 6592           static inline void reqrep_mutex_lock(ReqRepHeader *hdr) {
231 6592           uint32_t mypid = REQREP_MUTEX_VAL((uint32_t)getpid());
232 6915           for (int spin = 0; ; spin++) {
233 6915           uint32_t expected = 0;
234 6915 100         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
235             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
236 6592           return;
237 323 100         if (__builtin_expect(spin < REQREP_SPIN_LIMIT, 1)) {
238 317           reqrep_spin_pause();
239 317           continue;
240             }
241 6           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
242 6           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
243 6 50         if (cur != 0) {
244 6           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
245             &reqrep_lock_timeout, NULL, 0);
246 6 100         if (rc == -1 && errno == ETIMEDOUT) {
    50          
247 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
248 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
249 0 0         if (val >= REQREP_MUTEX_WRITER_BIT) {
250 0           uint32_t pid = val & REQREP_MUTEX_PID_MASK;
251 0 0         if (!reqrep_pid_alive(pid))
252 0           reqrep_recover_stale_mutex(hdr, val);
253             }
254 0           spin = 0;
255 0           continue;
256             }
257             }
258 6           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
259 6           spin = 0;
260             }
261             }
262              
263 6592           static inline void reqrep_mutex_unlock(ReqRepHeader *hdr) {
264 6592           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
265 6592 100         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
266 8           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
267 6592           }
268              
269 2898           static inline void reqrep_wake_consumers(ReqRepHeader *hdr) {
270 2898 100         if (__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED) > 0) {
271 1852           __atomic_add_fetch(&hdr->recv_futex, 1, __ATOMIC_RELEASE);
272 1852           syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
273             }
274 2898           }
275              
276 2277           static inline void reqrep_wake_producers(ReqRepHeader *hdr) {
277 2277 50         if (__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED) > 0) {
278 0           __atomic_add_fetch(&hdr->send_futex, 1, __ATOMIC_RELEASE);
279 0           syscall(SYS_futex, &hdr->send_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
280             }
281 2277           }
282              
283 2886           static inline void reqrep_wake_slot_waiters(ReqRepHeader *hdr) {
284 2886 50         if (__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED) > 0) {
285 0           __atomic_add_fetch(&hdr->slot_futex, 1, __ATOMIC_RELEASE);
286 0           syscall(SYS_futex, &hdr->slot_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
287             }
288 2886           }
289              
290 2493           static inline int reqrep_remaining_time(const struct timespec *deadline,
291             struct timespec *remaining) {
292             struct timespec now;
293 2493           clock_gettime(CLOCK_MONOTONIC, &now);
294 2493           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
295 2493           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
296 2493 100         if (remaining->tv_nsec < 0) {
297 2486           remaining->tv_sec--;
298 2486           remaining->tv_nsec += 1000000000L;
299             }
300 2493           return remaining->tv_sec >= 0;
301             }
302              
303 4321           static inline void reqrep_make_deadline(double timeout, struct timespec *deadline) {
304 4321           clock_gettime(CLOCK_MONOTONIC, deadline);
305 4321           deadline->tv_sec += (time_t)timeout;
306 4321           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
307 4321 100         if (deadline->tv_nsec >= 1000000000L) {
308 1758           deadline->tv_sec++;
309 1758           deadline->tv_nsec -= 1000000000L;
310             }
311 4321           }
312              
313             /* ================================================================
314             * Response slot operations
315             * ================================================================ */
316              
317 2904           static int32_t reqrep_slot_acquire(ReqRepHandle *h) {
318 2904           uint32_t n = h->resp_slots;
319 2904           uint32_t hint = __atomic_load_n(&h->hdr->resp_hint, __ATOMIC_RELAXED);
320 2904           uint32_t mypid = (uint32_t)getpid();
321              
322 3260 100         for (uint32_t i = 0; i < n; i++) {
323 3258           uint32_t idx = (hint + i) % n;
324 3258           RespSlotHeader *slot = reqrep_resp_slot(h, idx);
325 3258           uint32_t expected = RESP_FREE;
326 3258 100         if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
327             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
328 2902           __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
329 2902           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
330 2902           __atomic_store_n(&h->hdr->resp_hint, (idx + 1) % n, __ATOMIC_RELAXED);
331 2902           return (int32_t)idx;
332             }
333             }
334              
335             /* Recover stale slots from dead processes (both ACQUIRED and READY) */
336 6 100         for (uint32_t i = 0; i < n; i++) {
337 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
338 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
339 4 50         if (state == RESP_ACQUIRED || state == RESP_READY) {
    0          
340 4           uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED);
341 4 50         if (pid && !reqrep_pid_alive(pid)) {
    50          
342 0 0         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
343             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
344 0           __atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
345 0           uint32_t expected = RESP_FREE;
346 0 0         if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
347             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
348 0           __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
349 0           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
350 0           return (int32_t)i;
351             }
352 0           reqrep_wake_slot_waiters(h->hdr);
353             }
354             }
355             }
356             }
357              
358 2           return -1;
359             }
360              
361 2677           static inline void reqrep_slot_release(ReqRepHandle *h, uint32_t idx) {
362 2677           RespSlotHeader *slot = reqrep_resp_slot(h, idx);
363 2677           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
364 2677           __atomic_store_n(&slot->state, RESP_FREE, __ATOMIC_RELEASE);
365 2677           reqrep_wake_slot_waiters(h->hdr);
366 2677           }
367              
368             /* ================================================================
369             * Create / Open / Close
370             * ================================================================ */
371              
372             #define REQREP_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, REQREP_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
373              
374 112           static ReqRepHandle *reqrep_setup_handle(void *base, size_t map_size,
375             const char *path, int backing_fd) {
376 112           ReqRepHeader *hdr = (ReqRepHeader *)base;
377 112           ReqRepHandle *h = (ReqRepHandle *)calloc(1, sizeof(ReqRepHandle));
378 112 50         if (!h) return NULL;
379              
380 112           h->hdr = hdr;
381 112           h->req_slots = (ReqSlot *)((char *)base + hdr->req_slots_off);
382 112           h->req_arena = (char *)base + hdr->req_arena_off;
383 112           h->resp_area = (uint8_t *)base + hdr->resp_off;
384 112           h->mmap_size = map_size;
385 112           h->req_cap = hdr->req_cap;
386 112           h->req_cap_mask = hdr->req_cap - 1;
387 112           h->req_arena_cap = hdr->req_arena_cap;
388 112           h->resp_slots = hdr->resp_slots;
389 112           h->resp_data_max = hdr->resp_data_max;
390 112           h->resp_stride = hdr->resp_stride;
391 112 100         h->path = path ? strdup(path) : NULL;
392 112           h->notify_fd = -1;
393 112           h->reply_fd = -1;
394 112           h->backing_fd = backing_fd;
395              
396 112           return h;
397             }
398              
399 55           static int reqrep_validate_header(ReqRepHeader *hdr, size_t file_size, uint32_t expected_mode) {
400 55 50         if (hdr->magic != REQREP_MAGIC) return 0;
401 55 50         if (hdr->version != REQREP_VERSION) return 0;
402 55 50         if (hdr->mode != expected_mode) return 0;
403 55 50         if (hdr->req_cap == 0 || (hdr->req_cap & (hdr->req_cap - 1)) != 0) return 0;
    50          
404 55 50         if (hdr->total_size != (uint64_t)file_size) return 0;
405 55 50         if (hdr->req_slots_off != sizeof(ReqRepHeader)) return 0;
406 55 50         if (hdr->resp_slots == 0) return 0;
407 55 50         if (hdr->resp_stride < sizeof(RespSlotHeader)) return 0;
408             /* Compute end of req slots area; req_arena and resp must come after it. */
409 55           uint64_t req_slot_size = (expected_mode == REQREP_MODE_STR)
410             ? sizeof(ReqSlot) : sizeof(ReqIntSlot);
411 55           uint64_t req_slots_end = (uint64_t)hdr->req_slots_off
412 55           + (uint64_t)hdr->req_cap * req_slot_size;
413 55 50         if (req_slots_end > hdr->total_size) return 0;
414 55 100         if (expected_mode == REQREP_MODE_STR) {
415 45 50         if (hdr->req_arena_off < req_slots_end) return 0;
416 45 50         if ((uint64_t)hdr->req_arena_off + hdr->req_arena_cap > hdr->total_size) return 0;
417             /* resp must not overlap arena */
418 45 50         if (hdr->resp_off < (uint64_t)hdr->req_arena_off + hdr->req_arena_cap) return 0;
419             }
420 55 50         if (hdr->resp_off < req_slots_end) return 0;
421 55 50         if ((uint64_t)hdr->resp_off + (uint64_t)hdr->resp_slots * hdr->resp_stride > hdr->total_size) return 0;
422 55           return 1;
423             }
424              
425 50           static void reqrep_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
426             uint32_t resp_data_max, uint64_t total_size,
427             uint32_t req_slots_off, uint32_t req_arena_off,
428             uint32_t req_arena_cap, uint32_t resp_off,
429             uint32_t resp_stride) {
430 50           ReqRepHeader *hdr = (ReqRepHeader *)base;
431 50           memset(hdr, 0, sizeof(ReqRepHeader));
432 50           hdr->magic = REQREP_MAGIC;
433 50           hdr->version = REQREP_VERSION;
434 50           hdr->mode = REQREP_MODE_STR;
435 50           hdr->req_cap = req_cap;
436 50           hdr->total_size = total_size;
437 50           hdr->req_slots_off = req_slots_off;
438 50           hdr->req_arena_off = req_arena_off;
439 50           hdr->req_arena_cap = req_arena_cap;
440 50           hdr->resp_slots = resp_slots_n;
441 50           hdr->resp_data_max = resp_data_max;
442 50           hdr->resp_off = resp_off;
443 50           hdr->resp_stride = resp_stride;
444              
445 948 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
446 898           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
447 898           memset(rs, 0, sizeof(RespSlotHeader));
448             }
449              
450 50           __atomic_thread_fence(__ATOMIC_SEQ_CST);
451 50           }
452              
453             /* Returns 0 on success, -1 on overflow (offsets would exceed UINT32_MAX). */
454 51           static int reqrep_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
455             uint32_t resp_data_max, uint64_t arena_hint,
456             uint32_t *out_req_slots_off, uint32_t *out_req_arena_off,
457             uint32_t *out_req_arena_cap, uint32_t *out_resp_off,
458             uint32_t *out_resp_stride, uint64_t *out_total_size) {
459 51           uint32_t req_slots_off = sizeof(ReqRepHeader);
460 51           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqSlot);
461 51           uint64_t req_arena_off_64 = (slots_end + 7) & ~(uint64_t)7;
462 51 50         if (req_arena_off_64 > UINT32_MAX) return -1;
463 51           uint32_t req_arena_off = (uint32_t)req_arena_off_64;
464              
465 51 50         if (arena_hint > UINT32_MAX) return -1;
466 51           uint32_t req_arena_cap = (uint32_t)arena_hint;
467 51 100         if (req_arena_cap < 4096) req_arena_cap = 4096;
468              
469 51           uint32_t resp_stride = (sizeof(RespSlotHeader) + resp_data_max + 63) & ~63u;
470 51           uint64_t resp_off_64 = ((uint64_t)req_arena_off + req_arena_cap + 63) & ~(uint64_t)63;
471 51 50         if (resp_off_64 > UINT32_MAX) return -1;
472 51           uint64_t total_size = resp_off_64 + (uint64_t)resp_slots_n * resp_stride;
473              
474 51           *out_req_slots_off = req_slots_off;
475 51           *out_req_arena_off = req_arena_off;
476 51           *out_req_arena_cap = req_arena_cap;
477 51           *out_resp_off = (uint32_t)resp_off_64;
478 51           *out_resp_stride = resp_stride;
479 51           *out_total_size = total_size;
480 51           return 0;
481             }
482              
483 48           static ReqRepHandle *reqrep_create(const char *path, uint32_t req_cap,
484             uint32_t resp_slots_n, uint32_t resp_data_max,
485             uint64_t arena_hint, char *errbuf) {
486 48 50         if (errbuf) errbuf[0] = '\0';
487              
488 48           req_cap = reqrep_next_pow2(req_cap);
489 48 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
490 48 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
491              
492 48 100         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
493              
494             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
495             uint64_t total_size;
496 48 50         if (reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
497             &req_slots_off, &req_arena_off, &req_arena_cap,
498             &resp_off, &resp_stride, &total_size) < 0) {
499 0 0         REQREP_ERR("layout overflow: req_cap/arena_hint too large for uint32 offsets");
500 0           return NULL;
501             }
502              
503 48           int anonymous = (path == NULL);
504             size_t map_size;
505             void *base;
506              
507 48 100         if (anonymous) {
508 1           map_size = (size_t)total_size;
509 1           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
510             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
511 1 50         if (base == MAP_FAILED) {
512 0 0         REQREP_ERR("mmap(anonymous): %s", strerror(errno));
513 0           return NULL;
514             }
515 1           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
516             req_slots_off, req_arena_off, req_arena_cap,
517             resp_off, resp_stride);
518             } else {
519 47           int fd = open(path, O_RDWR | O_CREAT, 0666);
520 48 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
521              
522 47 50         if (flock(fd, LOCK_EX) < 0) {
523 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
524 0           close(fd); return NULL;
525             }
526              
527             struct stat st;
528 47 50         if (fstat(fd, &st) < 0) {
529 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
530 0           flock(fd, LOCK_UN); close(fd); return NULL;
531             }
532              
533 47           int is_new = (st.st_size == 0);
534              
535 47 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
536 0 0         REQREP_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
537 0           flock(fd, LOCK_UN); close(fd); return NULL;
538             }
539              
540 47 100         if (is_new) {
541 46 50         if (ftruncate(fd, (off_t)total_size) < 0) {
542 0 0         REQREP_ERR("ftruncate(%s): %s", path, strerror(errno));
543 0           flock(fd, LOCK_UN); close(fd); return NULL;
544             }
545             }
546              
547 47 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
548 47           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
549 47 50         if (base == MAP_FAILED) {
550 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
551 0           flock(fd, LOCK_UN); close(fd); return NULL;
552             }
553              
554 47 100         if (!is_new) {
555 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, (size_t)st.st_size, REQREP_MODE_STR)) {
556 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
557 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
558             }
559 1           flock(fd, LOCK_UN);
560 1           close(fd);
561 1           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
562 1 50         if (!h) { munmap(base, map_size); return NULL; }
563 1           return h;
564             }
565              
566 46           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
567             req_slots_off, req_arena_off, req_arena_cap,
568             resp_off, resp_stride);
569 46           flock(fd, LOCK_UN);
570 46           close(fd);
571             }
572              
573 47           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
574 47 50         if (!h) { munmap(base, map_size); return NULL; }
575 47           return h;
576             }
577              
578 49           static ReqRepHandle *reqrep_open(const char *path, uint32_t mode, char *errbuf) {
579 49 50         if (errbuf) errbuf[0] = '\0';
580 49 50         if (!path) { REQREP_ERR("path required"); return NULL; }
    0          
581              
582 49           int fd = open(path, O_RDWR);
583 49 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
584              
585 49 50         if (flock(fd, LOCK_EX) < 0) {
586 0 0         REQREP_ERR("flock(%s): %s", path, strerror(errno));
587 0           close(fd); return NULL;
588             }
589              
590             struct stat st;
591 49 50         if (fstat(fd, &st) < 0) {
592 0 0         REQREP_ERR("fstat(%s): %s", path, strerror(errno));
593 0           flock(fd, LOCK_UN); close(fd); return NULL;
594             }
595              
596 49 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
597 0 0         REQREP_ERR("%s: file too small or not initialized", path);
598 0           flock(fd, LOCK_UN); close(fd); return NULL;
599             }
600              
601 49           size_t map_size = (size_t)st.st_size;
602 49           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
603 49 50         if (base == MAP_FAILED) {
604 0 0         REQREP_ERR("mmap(%s): %s", path, strerror(errno));
605 0           flock(fd, LOCK_UN); close(fd); return NULL;
606             }
607              
608 49 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
609 0 0         REQREP_ERR("%s: invalid or incompatible reqrep file", path);
610 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
611             }
612              
613 49           flock(fd, LOCK_UN);
614 49           close(fd);
615              
616 49           ReqRepHandle *h = reqrep_setup_handle(base, map_size, path, -1);
617 49 50         if (!h) { munmap(base, map_size); return NULL; }
618 49           return h;
619             }
620              
621 3           static ReqRepHandle *reqrep_create_memfd(const char *name, uint32_t req_cap,
622             uint32_t resp_slots_n, uint32_t resp_data_max,
623             uint64_t arena_hint, char *errbuf) {
624 3 50         if (errbuf) errbuf[0] = '\0';
625              
626 3           req_cap = reqrep_next_pow2(req_cap);
627 3 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
628 3 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
629              
630 3 50         if (arena_hint == 0) arena_hint = (uint64_t)req_cap * 256;
631              
632             uint32_t req_slots_off, req_arena_off, req_arena_cap, resp_off, resp_stride;
633             uint64_t total_size;
634 3 50         if (reqrep_compute_layout(req_cap, resp_slots_n, resp_data_max, arena_hint,
635             &req_slots_off, &req_arena_off, &req_arena_cap,
636             &resp_off, &resp_stride, &total_size) < 0) {
637 0 0         REQREP_ERR("layout overflow: req_cap/arena_hint too large for uint32 offsets");
638 0           return NULL;
639             }
640              
641 3 50         int fd = memfd_create(name ? name : "reqrep", MFD_CLOEXEC | MFD_ALLOW_SEALING);
642 3 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
643              
644 3 50         if (ftruncate(fd, (off_t)total_size) < 0) {
645 0 0         REQREP_ERR("ftruncate(memfd): %s", strerror(errno));
646 0           close(fd); return NULL;
647             }
648 3           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
649              
650 3           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
651 3 50         if (base == MAP_FAILED) {
652 0 0         REQREP_ERR("mmap(memfd): %s", strerror(errno));
653 0           close(fd); return NULL;
654             }
655              
656 3           reqrep_init_header(base, req_cap, resp_slots_n, resp_data_max, total_size,
657             req_slots_off, req_arena_off, req_arena_cap,
658             resp_off, resp_stride);
659              
660 3           ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
661 3 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
662 3           return h;
663             }
664              
665 4           static ReqRepHandle *reqrep_open_fd(int fd, uint32_t mode, char *errbuf) {
666 4 50         if (errbuf) errbuf[0] = '\0';
667              
668             struct stat st;
669 4 50         if (fstat(fd, &st) < 0) {
670 0 0         REQREP_ERR("fstat(fd=%d): %s", fd, strerror(errno));
671 0           return NULL;
672             }
673              
674 4 50         if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
675 0 0         REQREP_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
676 0           return NULL;
677             }
678              
679 4           size_t map_size = (size_t)st.st_size;
680 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
681 4 50         if (base == MAP_FAILED) {
682 0 0         REQREP_ERR("mmap(fd=%d): %s", fd, strerror(errno));
683 0           return NULL;
684             }
685              
686 4 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
687 0 0         REQREP_ERR("fd %d: invalid or incompatible reqrep", fd);
688 0           munmap(base, map_size);
689 0           return NULL;
690             }
691              
692 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
693 4 50         if (myfd < 0) {
694 0 0         REQREP_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
695 0           munmap(base, map_size);
696 0           return NULL;
697             }
698              
699 4           ReqRepHandle *h = reqrep_setup_handle(base, map_size, NULL, myfd);
700 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
701 4           return h;
702             }
703              
704 112           static void reqrep_destroy(ReqRepHandle *h) {
705 112 50         if (!h) return;
706 112 100         if (h->notify_fd >= 0) close(h->notify_fd);
707 112 100         if (h->reply_fd >= 0) close(h->reply_fd);
708 112 100         if (h->backing_fd >= 0) close(h->backing_fd);
709 112 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
710 112           free(h->copy_buf);
711 112           free(h->path);
712 112           free(h);
713             }
714              
715             /* ================================================================
716             * Request queue operations (client -> server)
717             * ================================================================ */
718              
719             /* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long. */
720 2863           static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
721             uint32_t len, bool utf8,
722             uint32_t resp_slot_idx, uint32_t resp_gen) {
723 2863           ReqRepHeader *hdr = h->hdr;
724              
725 2863 50         if (len > REQREP_STR_LEN_MASK) return -2;
726              
727 2863 100         if (hdr->req_tail - hdr->req_head >= h->req_cap) {
728 4           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
729 4           return 0;
730             }
731              
732 2859           uint32_t alloc = (len + 7) & ~7u;
733 2859 100         if (alloc == 0) alloc = 8;
734             /* Single message must fit arena; else overflow into response slots. */
735 2859 50         if (alloc > h->req_arena_cap) return -2;
736 2859           uint32_t pos = hdr->arena_wpos;
737 2859           uint64_t skip = alloc;
738              
739 2859 100         if ((uint64_t)pos + alloc > h->req_arena_cap) {
740 1           skip += h->req_arena_cap - pos;
741 1           pos = 0;
742             }
743              
744 2859 100         if ((uint64_t)hdr->arena_used + skip > h->req_arena_cap) {
745 1 50         if (hdr->req_tail == hdr->req_head) {
746 0           hdr->arena_wpos = 0;
747 0           hdr->arena_used = 0;
748 0           pos = 0;
749 0           skip = alloc;
750             } else {
751 1           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
752 1           return 0;
753             }
754             }
755              
756 2858           memcpy(h->req_arena + pos, str, len);
757              
758 2858           uint32_t idx = (uint32_t)(hdr->req_tail & h->req_cap_mask);
759 2858           ReqSlot *slot = &h->req_slots[idx];
760 2858           slot->arena_off = pos;
761 2858 100         slot->packed_len = len | (utf8 ? REQREP_UTF8_FLAG : 0);
762 2858           slot->arena_skip = (uint32_t)skip;
763 2858           slot->resp_slot = resp_slot_idx;
764 2858           slot->resp_gen = resp_gen;
765              
766 2858           hdr->arena_wpos = pos + alloc;
767 2858           hdr->arena_used += (uint32_t)skip;
768 2858           hdr->req_tail++;
769 2858           __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
770 2858           return 1;
771             }
772              
773             /* Non-blocking send: acquire slot + push request.
774             * Returns 1=ok, 0=full, -2=too long, -3=no slots.
775             * On success, *out_id is the packed slot+generation ID. */
776 2864           static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
777             bool utf8, uint64_t *out_id) {
778 2864           int32_t slot = reqrep_slot_acquire(h);
779 2864 100         if (slot < 0) return -3;
780              
781 2863           RespSlotHeader *rslot = reqrep_resp_slot(h, (uint32_t)slot);
782 2863           uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);
783              
784 2863           reqrep_mutex_lock(h->hdr);
785 2863           int r = reqrep_send_locked(h, str, len, utf8, (uint32_t)slot, gen);
786 2863           reqrep_mutex_unlock(h->hdr);
787              
788 2863 100         if (r == 1) {
789 2858           reqrep_wake_consumers(h->hdr);
790 2858           *out_id = REQREP_MAKE_ID((uint32_t)slot, gen);
791 2858           return 1;
792             }
793              
794 5           reqrep_slot_release(h, (uint32_t)slot);
795 5           return r;
796             }
797              
798             /* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout). */
799 2765           static int reqrep_send_wait(ReqRepHandle *h, const char *str, uint32_t len,
800             bool utf8, uint64_t *out_id, double timeout) {
801 2765           int r = reqrep_try_send(h, str, len, utf8, out_id);
802 2765 100         if (r == 1 || r == -2) return r;
    50          
803 1 50         if (timeout == 0) return r;
804              
805 1           ReqRepHeader *hdr = h->hdr;
806             struct timespec deadline, remaining;
807 1           int has_deadline = (timeout > 0);
808 1 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
809              
810 0           for (;;) {
811 1 50         uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
812 1 50         uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
813              
814 1           uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
815 1           r = reqrep_try_send(h, str, len, utf8, out_id);
816 1 50         if (r == 1 || r == -2) return r;
    50          
817              
818 1           __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
819 1           struct timespec *pts = NULL;
820 1 50         if (has_deadline) {
821 1 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
822 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
823 0           return r;
824             }
825 1           pts = &remaining;
826             }
827 1           long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
828 1           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
829              
830 1           r = reqrep_try_send(h, str, len, utf8, out_id);
831 1 50         if (r == 1 || r == -2) return r;
    50          
832 1 50         if (rc == -1 && errno == ETIMEDOUT) return r;
    50          
833             }
834             }
835              
836             /* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
837 3747           static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
838             uint32_t *out_len, bool *out_utf8,
839             uint64_t *out_id) {
840 3747           ReqRepHeader *hdr = h->hdr;
841              
842 3747 100         if (hdr->req_tail == hdr->req_head) {
843 1495           __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
844 1495           return 0;
845             }
846              
847 2252           uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
848 2252           ReqSlot *slot = &h->req_slots[idx];
849              
850 2252           uint32_t len = slot->packed_len & REQREP_STR_LEN_MASK;
851 2252           *out_utf8 = (slot->packed_len & REQREP_UTF8_FLAG) != 0;
852 2252           *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
853              
854 2252 50         if (!reqrep_ensure_copy_buf(h, len + 1))
855 0           return -1;
856 2252 100         if (len > 0)
857 2248           memcpy(h->copy_buf, h->req_arena + slot->arena_off, len);
858 2252           h->copy_buf[len] = '\0';
859 2252           *out_str = h->copy_buf;
860 2252           *out_len = len;
861              
862 2252 50         if (hdr->arena_used >= slot->arena_skip)
863 2252           hdr->arena_used -= slot->arena_skip;
864             else
865 0           hdr->arena_used = 0;
866 2252 100         if (hdr->arena_used == 0)
867 1095           hdr->arena_wpos = 0;
868              
869 2252           hdr->req_head++;
870 2252           return 1;
871             }
872              
873             /* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
874 3719           static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
875             uint32_t *out_len, bool *out_utf8,
876             uint64_t *out_id) {
877 3719           reqrep_mutex_lock(h->hdr);
878 3719           int r = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
879 3719           reqrep_mutex_unlock(h->hdr);
880 3719 100         if (r == 1) reqrep_wake_producers(h->hdr);
881 3719           return r;
882             }
883              
884             /* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
885 2154           static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
886             uint32_t *out_len, bool *out_utf8,
887             uint64_t *out_id, double timeout) {
888 2154           int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
889 2154 100         if (r != 0) return r;
890 751 50         if (timeout == 0) return 0;
891              
892 751           ReqRepHeader *hdr = h->hdr;
893             struct timespec deadline, remaining;
894 751           int has_deadline = (timeout > 0);
895 751 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
896              
897 0           for (;;) {
898 751           uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
899 751           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
900 751 100         if (r != 0) return r;
901              
902 723           __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
903 723           struct timespec *pts = NULL;
904 723 50         if (has_deadline) {
905 723 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
906 0           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
907 0           return 0;
908             }
909 723           pts = &remaining;
910             }
911 723           long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
912 723           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
913              
914 723           r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
915 723 100         if (r != 0) return r;
916 5 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
917             }
918             }
919              
920             /* ================================================================
921             * Response operations (server -> client)
922             * ================================================================ */
923              
924             /* Write response to a response slot.
925             * Returns 1=ok, -1=bad slot, -2=stale (cancelled/recycled), -3=too long. */
926 2249           static int reqrep_reply(ReqRepHandle *h, uint64_t id,
927             const char *str, uint32_t len, bool utf8) {
928 2249           uint32_t slot_idx = REQREP_ID_SLOT(id);
929 2249           uint32_t expected_gen = REQREP_ID_GEN(id);
930 2249 50         if (slot_idx >= h->resp_slots) return -1;
931 2249 100         if (len > h->resp_data_max) return -3;
932              
933 2248           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
934 2248           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
935 2248 100         if (state != RESP_ACQUIRED) return -2;
936 2055 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;
937              
938 2047           uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
939 2047 100         if (len > 0) memcpy(data, str, len);
940 2047           slot->resp_len = len;
941 2047           slot->resp_flags = utf8 ? 1 : 0;
942              
943             /* CAS ACQUIRED→READY to prevent race with concurrent cancel */
944 2047           uint32_t expected_state = RESP_ACQUIRED;
945 2047 50         if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
946             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
947 0           return -2;
948              
949 2047 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
950 1838           syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);
951              
952 2047           __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
953 2047           return 1;
954             }
955              
956             /* Non-blocking get response. Returns 1=ok, 0=not ready, -1=bad slot, -2=OOM, -4=stale. */
957 4718           static int reqrep_try_get(ReqRepHandle *h, uint64_t id,
958             const char **out_str, uint32_t *out_len, bool *out_utf8) {
959 4718           uint32_t slot_idx = REQREP_ID_SLOT(id);
960 4718           uint32_t expected_gen = REQREP_ID_GEN(id);
961 4718 50         if (slot_idx >= h->resp_slots) return -1;
962              
963 4718           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
964 4718           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
965 4718 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
966 4713 100         if (state != RESP_READY) return 0;
967              
968 2639           uint32_t len = slot->resp_len;
969 2639           *out_utf8 = (slot->resp_flags & 1) != 0;
970              
971 2639 50         if (!reqrep_ensure_copy_buf(h, len + 1)) return -2;
972              
973 2639           uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
974 2639 100         if (len > 0) memcpy(h->copy_buf, data, len);
975 2639           h->copy_buf[len] = '\0';
976 2639           *out_str = h->copy_buf;
977 2639           *out_len = len;
978              
979 2639           reqrep_slot_release(h, slot_idx);
980 2639           return 1;
981             }
982              
983             /* Blocking get response. Returns 1=ok, 0=timeout, -1=bad slot, -2=OOM, -4=stale. */
984 2554           static int reqrep_get_wait(ReqRepHandle *h, uint64_t id,
985             const char **out_str, uint32_t *out_len, bool *out_utf8,
986             double timeout) {
987 2554           int r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
988 2554 100         if (r != 0) return r;
989 2073 50         if (timeout == 0) return 0;
990              
991 2073           uint32_t slot_idx = REQREP_ID_SLOT(id);
992 2073           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
993             struct timespec deadline, remaining;
994 2073           int has_deadline = (timeout > 0);
995 2073 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
996              
997 1           for (;;) {
998 2074           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
999 2074 50         if (state == RESP_READY)
1000 0           return reqrep_try_get(h, id, out_str, out_len, out_utf8);
1001              
1002 2074           __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1003              
1004             /* Re-check: cancel may have fired between try_get and waiter registration */
1005 2074 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
1006 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1007 0           return -4;
1008             }
1009              
1010 2074           state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1011 2074 50         if (state == RESP_READY) {
1012 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1013 0           return reqrep_try_get(h, id, out_str, out_len, out_utf8);
1014             }
1015              
1016 2074           struct timespec *pts = NULL;
1017 2074 100         if (has_deadline) {
1018 1763 100         if (!reqrep_remaining_time(&deadline, &remaining)) {
1019 1           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1020 1           return 0;
1021             }
1022 1762           pts = &remaining;
1023             }
1024              
1025 2073           syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
1026 2073           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1027              
1028 2073           r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
1029 2073 100         if (r != 0) return r;
1030             }
1031             }
1032              
1033             /* Cancel a pending request — CAS ACQUIRED→FREE only if generation matches.
1034             * If the reply already arrived (READY), cancel is a no-op — call get() to drain. */
1035 218           static void reqrep_cancel(ReqRepHandle *h, uint64_t id) {
1036 218           uint32_t slot_idx = REQREP_ID_SLOT(id);
1037 218           uint32_t expected_gen = REQREP_ID_GEN(id);
1038 218 50         if (slot_idx >= h->resp_slots) return;
1039 218           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1040 218 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
1041 218           uint32_t expected_state = RESP_ACQUIRED;
1042 218 100         if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
1043             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1044 207           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1045 207           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1046             /* Wake get_wait blocked on this slot's state futex */
1047 207 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1048 1           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1049 207           reqrep_wake_slot_waiters(h->hdr);
1050             }
1051             }
1052              
1053             /* Combined send + wait-for-reply with single deadline.
1054             * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
1055 2548           static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
1056             bool req_utf8, const char **out_str, uint32_t *out_len,
1057             bool *out_utf8, double timeout) {
1058             uint64_t id;
1059             struct timespec deadline;
1060 2548           int has_deadline = (timeout > 0);
1061 2548 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1062              
1063 2548           int r = reqrep_send_wait(h, req_str, req_len, req_utf8, &id, timeout);
1064 2548 50         if (r != 1) return r;
1065              
1066 2548           double get_timeout = timeout;
1067 2548 100         if (has_deadline) {
1068             struct timespec now;
1069 1801           clock_gettime(CLOCK_MONOTONIC, &now);
1070 1801           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1071 1801           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1072 1801 50         if (get_timeout <= 0) {
1073 0           reqrep_cancel(h, id);
1074 0           return 0;
1075             }
1076             }
1077              
1078 2548           r = reqrep_get_wait(h, id, out_str, out_len, out_utf8, get_timeout);
1079 2548 100         if (r != 1) {
1080 1           reqrep_cancel(h, id);
1081             /* If reply arrived between timeout and cancel, drain to free the slot */
1082             const char *discard; uint32_t dlen; bool dutf8;
1083 1           reqrep_try_get(h, id, &discard, &dlen, &dutf8);
1084             }
1085 2548           return r;
1086             }
1087              
1088             /* Count response slots owned by this process */
1089 10           static uint32_t reqrep_pending(ReqRepHandle *h) {
1090 10           uint32_t mypid = (uint32_t)getpid();
1091 10           uint32_t count = 0;
1092 78 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1093 68           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1094 68           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_RELAXED);
1095 68 100         if ((state == RESP_ACQUIRED || state == RESP_READY) &&
    50          
1096 7 50         __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED) == mypid)
1097 7           count++;
1098             }
1099 10           return count;
1100             }
1101              
1102             /* ================================================================
1103             * Queue state
1104             * ================================================================ */
1105              
1106 40           static inline uint64_t reqrep_size(ReqRepHandle *h) {
1107 40           ReqRepHeader *hdr = h->hdr;
1108 40           uint64_t tail = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1109 40           uint64_t head = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1110 40           return tail - head;
1111             }
1112              
1113 1           static void reqrep_clear(ReqRepHandle *h) {
1114 1           ReqRepHeader *hdr = h->hdr;
1115 1           reqrep_mutex_lock(hdr);
1116 1           hdr->req_head = 0;
1117 1           hdr->req_tail = 0;
1118 1           hdr->arena_wpos = 0;
1119 1           hdr->arena_used = 0;
1120 1           reqrep_mutex_unlock(hdr);
1121              
1122             /* Release all in-flight response slots so get_wait callers unblock.
1123             * Retry CAS if reply races us (ACQUIRED→READY between load and CAS). */
1124 5 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1125 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1126 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1127 4 100         while (state == RESP_ACQUIRED || state == RESP_READY) {
    50          
1128 2 50         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
1129             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1130 2           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1131 2           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1132 2 50         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1133 0           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1134 2           break;
1135             }
1136             /* CAS failed — state updated with actual value; retry if still valid */
1137             }
1138             }
1139              
1140 1           reqrep_wake_slot_waiters(hdr);
1141 1           reqrep_wake_producers(hdr);
1142 1           reqrep_wake_consumers(hdr);
1143 1           }
1144              
1145 0           static inline int reqrep_sync(ReqRepHandle *h) {
1146 0           return msync(h->hdr, h->mmap_size, MS_SYNC);
1147             }
1148              
1149             /* ================================================================
1150             * eventfd — event-loop integration
1151             *
1152             * Two separate eventfds:
1153             * notify_fd — request notification (client -> server: "new request")
1154             * reply_fd — reply notification (server -> client: "response ready")
1155             * ================================================================ */
1156              
1157 6           static inline int reqrep_eventfd_create(ReqRepHandle *h) {
1158 6 50         if (h->notify_fd >= 0) return h->notify_fd;
1159 6           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1160 6           return h->notify_fd;
1161             }
1162              
1163 2           static inline void reqrep_eventfd_set(ReqRepHandle *h, int fd) {
1164 2 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    0          
1165 0           close(h->notify_fd);
1166 2           h->notify_fd = fd;
1167 2           }
1168              
1169 3           static inline void reqrep_notify(ReqRepHandle *h) {
1170 3 50         if (h->notify_fd >= 0) {
1171 3           uint64_t one = 1;
1172 3           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1173             }
1174 3           }
1175              
1176 2           static inline int64_t reqrep_eventfd_consume(ReqRepHandle *h) {
1177 2 50         if (h->notify_fd < 0) return -1;
1178 2           uint64_t val = 0;
1179 2 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1180 2           return (int64_t)val;
1181             }
1182              
1183 9           static inline int reqrep_reply_eventfd_create(ReqRepHandle *h) {
1184 9 50         if (h->reply_fd >= 0) return h->reply_fd;
1185 9           h->reply_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1186 9           return h->reply_fd;
1187             }
1188              
1189 1           static inline void reqrep_reply_eventfd_set(ReqRepHandle *h, int fd) {
1190 1 50         if (h->reply_fd >= 0 && h->reply_fd != fd)
    0          
1191 0           close(h->reply_fd);
1192 1           h->reply_fd = fd;
1193 1           }
1194              
1195 4           static inline void reqrep_reply_notify(ReqRepHandle *h) {
1196 4 50         if (h->reply_fd >= 0) {
1197 4           uint64_t one = 1;
1198 4           ssize_t __attribute__((unused)) rc = write(h->reply_fd, &one, sizeof(one));
1199             }
1200 4           }
1201              
1202 2           static inline int64_t reqrep_reply_eventfd_consume(ReqRepHandle *h) {
1203 2 50         if (h->reply_fd < 0) return -1;
1204 2           uint64_t val = 0;
1205 2 50         if (read(h->reply_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1206 2           return (int64_t)val;
1207             }
1208              
1209             /* ================================================================
1210             * Int mode: lock-free Vyukov MPMC request queue, inline int64 response
1211             * ================================================================ */
1212              
1213             /* Returns 0 on success, -1 on overflow. */
1214 8           static int reqrep_int_compute_layout(uint32_t req_cap, uint32_t resp_slots_n,
1215             uint32_t *out_req_slots_off, uint32_t *out_resp_off,
1216             uint32_t *out_resp_stride, uint64_t *out_total_size) {
1217 8           uint32_t req_slots_off = sizeof(ReqRepHeader);
1218 8           uint64_t slots_end = (uint64_t)req_slots_off + (uint64_t)req_cap * sizeof(ReqIntSlot);
1219 8           uint32_t resp_stride = (sizeof(RespSlotHeader) + sizeof(int64_t) + 63) & ~63u;
1220 8           uint64_t resp_off = (slots_end + 63) & ~(uint64_t)63;
1221 8 50         if (resp_off > UINT32_MAX) return -1;
1222 8           *out_req_slots_off = req_slots_off;
1223 8           *out_resp_off = (uint32_t)resp_off;
1224 8           *out_resp_stride = resp_stride;
1225 8           *out_total_size = resp_off + (uint64_t)resp_slots_n * resp_stride;
1226 8           return 0;
1227             }
1228              
1229 7           static void reqrep_int_init_header(void *base, uint32_t req_cap, uint32_t resp_slots_n,
1230             uint64_t total_size, uint32_t req_slots_off,
1231             uint32_t resp_off, uint32_t resp_stride) {
1232 7           ReqRepHeader *hdr = (ReqRepHeader *)base;
1233 7           memset(hdr, 0, sizeof(ReqRepHeader));
1234 7           hdr->magic = REQREP_MAGIC;
1235 7           hdr->version = REQREP_VERSION;
1236 7           hdr->mode = REQREP_MODE_INT;
1237 7           hdr->req_cap = req_cap;
1238 7           hdr->total_size = total_size;
1239 7           hdr->req_slots_off = req_slots_off;
1240 7           hdr->resp_slots = resp_slots_n;
1241 7           hdr->resp_data_max = sizeof(int64_t);
1242 7           hdr->resp_off = resp_off;
1243 7           hdr->resp_stride = resp_stride;
1244              
1245             /* Vyukov: init sequence numbers */
1246 7           ReqIntSlot *slots = (ReqIntSlot *)((char *)base + req_slots_off);
1247 209 100         for (uint32_t i = 0; i < req_cap; i++)
1248 202           slots[i].sequence = i;
1249              
1250             /* Init response slots */
1251 58 100         for (uint32_t i = 0; i < resp_slots_n; i++) {
1252 51           RespSlotHeader *rs = (RespSlotHeader *)((uint8_t *)base + resp_off + (uint64_t)i * resp_stride);
1253 51           memset(rs, 0, sizeof(RespSlotHeader));
1254             }
1255              
1256 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1257 7           }
1258              
1259 7           static ReqRepHandle *reqrep_create_int(const char *path, uint32_t req_cap,
1260             uint32_t resp_slots_n, char *errbuf) {
1261 7 50         if (errbuf) errbuf[0] = '\0';
1262 7           req_cap = reqrep_next_pow2(req_cap);
1263 7 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1264 7 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1265              
1266             uint32_t req_slots_off, resp_off, resp_stride;
1267             uint64_t total_size;
1268 7 50         if (reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1269             &resp_off, &resp_stride, &total_size) < 0) {
1270 0 0         REQREP_ERR("layout overflow: req_cap too large for uint32 offsets");
1271 0           return NULL;
1272             }
1273              
1274 7           int anonymous = (path == NULL);
1275             size_t map_size;
1276             void *base;
1277              
1278 7 50         if (anonymous) {
1279 0           map_size = (size_t)total_size;
1280 0           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
1281             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
1282 0 0         if (base == MAP_FAILED) { REQREP_ERR("mmap(anonymous): %s", strerror(errno)); return NULL; }
    0          
1283             } else {
1284 7           int fd = open(path, O_RDWR | O_CREAT, 0666);
1285 7 50         if (fd < 0) { REQREP_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
1286 7 50         if (flock(fd, LOCK_EX) < 0) { REQREP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
1287             struct stat st;
1288 7 50         if (fstat(fd, &st) < 0) { REQREP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1289 7           int is_new = (st.st_size == 0);
1290 7 100         if (!is_new && (uint64_t)st.st_size < sizeof(ReqRepHeader)) {
    50          
1291 0 0         REQREP_ERR("%s: file too small", path); flock(fd, LOCK_UN); close(fd); return NULL;
1292             }
1293 7 100         if (is_new && ftruncate(fd, (off_t)total_size) < 0) {
    50          
1294 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
1295             }
1296 7 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
1297 7           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1298 7 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1299 7 100         if (!is_new) {
1300 1 50         if (!reqrep_validate_header((ReqRepHeader *)base, map_size, REQREP_MODE_INT)) {
1301 0 0         REQREP_ERR("%s: invalid or incompatible", path); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1302             }
1303 1           flock(fd, LOCK_UN); close(fd);
1304 1           return reqrep_setup_handle(base, map_size, path, -1);
1305             }
1306 6           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1307             req_slots_off, resp_off, resp_stride);
1308 6           flock(fd, LOCK_UN); close(fd);
1309 6           return reqrep_setup_handle(base, map_size, path, -1);
1310             }
1311              
1312 0           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1313             req_slots_off, resp_off, resp_stride);
1314 0           return reqrep_setup_handle(base, map_size, path, -1);
1315             }
1316              
1317 1           static ReqRepHandle *reqrep_create_int_memfd(const char *name, uint32_t req_cap,
1318             uint32_t resp_slots_n, char *errbuf) {
1319 1 50         if (errbuf) errbuf[0] = '\0';
1320 1           req_cap = reqrep_next_pow2(req_cap);
1321 1 50         if (req_cap == 0) { REQREP_ERR("invalid req_cap"); return NULL; }
    0          
1322 1 50         if (resp_slots_n == 0) { REQREP_ERR("resp_slots must be > 0"); return NULL; }
    0          
1323              
1324             uint32_t req_slots_off, resp_off, resp_stride;
1325             uint64_t total_size;
1326 1 50         if (reqrep_int_compute_layout(req_cap, resp_slots_n, &req_slots_off,
1327             &resp_off, &resp_stride, &total_size) < 0) {
1328 0 0         REQREP_ERR("layout overflow: req_cap too large for uint32 offsets");
1329 0           return NULL;
1330             }
1331              
1332 1 50         int fd = memfd_create(name ? name : "reqrep_int", MFD_CLOEXEC | MFD_ALLOW_SEALING);
1333 1 50         if (fd < 0) { REQREP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1334 1 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1335 0 0         REQREP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
1336             }
1337 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
1338 1           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1339 1 50         if (base == MAP_FAILED) { REQREP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
1340              
1341 1           reqrep_int_init_header(base, req_cap, resp_slots_n, total_size,
1342             req_slots_off, resp_off, resp_stride);
1343 1           return reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
1344             }
1345              
1346             /* --- Int request queue: lock-free Vyukov MPMC --- */
1347              
1348 40           static inline int reqrep_int_try_send(ReqRepHandle *h, int64_t value, uint64_t *out_id) {
1349 40           int32_t rslot = reqrep_slot_acquire(h);
1350 40 100         if (rslot < 0) return -3;
1351              
1352 39           RespSlotHeader *rs = reqrep_resp_slot(h, (uint32_t)rslot);
1353 39           uint32_t gen = __atomic_load_n(&rs->generation, __ATOMIC_ACQUIRE);
1354              
1355 39           ReqRepHeader *hdr = h->hdr;
1356 39           ReqIntSlot *slots = (ReqIntSlot *)((char *)h->hdr + hdr->req_slots_off);
1357 39           uint32_t mask = h->req_cap_mask;
1358 39           uint64_t pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1359              
1360 0           for (;;) {
1361 39           ReqIntSlot *slot = &slots[pos & mask];
1362 39           uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
1363 39           int64_t diff = (int64_t)seq - (int64_t)pos;
1364 39 100         if (diff == 0) {
1365 38 50         if (__atomic_compare_exchange_n(&hdr->req_tail, &pos, pos + 1,
1366             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
1367 38           slot->value = value;
1368 38           slot->resp_slot = (uint32_t)rslot;
1369 38           slot->resp_gen = gen;
1370 38           __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
1371 38           __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
1372 38           reqrep_wake_consumers(hdr);
1373 38           *out_id = REQREP_MAKE_ID((uint32_t)rslot, gen);
1374 38           return 1;
1375             }
1376 1 50         } else if (diff < 0) {
1377 1           __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
1378 1           reqrep_slot_release(h, (uint32_t)rslot);
1379 1           return 0;
1380             } else {
1381 0           pos = __atomic_load_n(&hdr->req_tail, __ATOMIC_RELAXED);
1382             }
1383             }
1384             }
1385              
1386 8           static int reqrep_int_send_wait(ReqRepHandle *h, int64_t value,
1387             uint64_t *out_id, double timeout) {
1388 8           int r = reqrep_int_try_send(h, value, out_id);
1389 8 50         if (r == 1) return 1;
1390 0 0         if (timeout == 0) return r;
1391 0           ReqRepHeader *hdr = h->hdr;
1392             struct timespec deadline, remaining;
1393 0           int has_deadline = (timeout > 0);
1394 0 0         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1395 0           for (;;) {
1396             /* Wait on slot_futex if no slots (-3), send_futex if queue full (0) */
1397 0 0         uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
1398 0 0         uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
1399              
1400 0           uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
1401 0           r = reqrep_int_try_send(h, value, out_id);
1402 0 0         if (r == 1) return 1;
1403              
1404 0           __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1405 0           struct timespec *pts = NULL;
1406 0 0         if (has_deadline) {
1407 0 0         if (!reqrep_remaining_time(&deadline, &remaining)) {
1408 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1409 0           return r;
1410             }
1411 0           pts = &remaining;
1412             }
1413 0           long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
1414 0           __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
1415 0           r = reqrep_int_try_send(h, value, out_id);
1416 0 0         if (r == 1) return 1;
1417 0 0         if (rc == -1 && errno == ETIMEDOUT) return r;
    0          
1418             }
1419             }
1420              
1421 46           static inline int reqrep_int_try_recv(ReqRepHandle *h, int64_t *out_value, uint64_t *out_id) {
1422 46           ReqRepHeader *hdr = h->hdr;
1423 46           ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
1424 46           uint32_t mask = h->req_cap_mask;
1425 46           uint64_t pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1426 0           for (;;) {
1427 46           ReqIntSlot *slot = &slots[pos & mask];
1428 46           uint64_t seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
1429 46           int64_t diff = (int64_t)seq - (int64_t)(pos + 1);
1430 46 100         if (diff == 0) {
1431 36 50         if (__atomic_compare_exchange_n(&hdr->req_head, &pos, pos + 1,
1432             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
1433 36           *out_value = slot->value;
1434 36           *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
1435 36           __atomic_store_n(&slot->sequence, pos + h->req_cap, __ATOMIC_RELEASE);
1436 36           reqrep_wake_producers(hdr);
1437 36           return 1;
1438             }
1439 10 50         } else if (diff < 0) {
1440 10           __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
1441 10           return 0;
1442             } else {
1443 0           pos = __atomic_load_n(&hdr->req_head, __ATOMIC_RELAXED);
1444             }
1445             }
1446             }
1447              
1448 7           static int reqrep_int_recv_wait(ReqRepHandle *h, int64_t *out_value,
1449             uint64_t *out_id, double timeout) {
1450 7 100         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1451 4 50         if (timeout == 0) return 0;
1452 4           ReqRepHeader *hdr = h->hdr;
1453             struct timespec deadline, remaining;
1454 4           int has_deadline = (timeout > 0);
1455 4 50         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1456 0           for (;;) {
1457 4           uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
1458 4 50         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1459 4           __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1460 4           struct timespec *pts = NULL;
1461 4 50         if (has_deadline) {
1462 4 50         if (!reqrep_remaining_time(&deadline, &remaining)) {
1463 0           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1464 0           return 0;
1465             }
1466 4           pts = &remaining;
1467             }
1468 4           long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
1469 4           __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
1470 4 50         if (reqrep_int_try_recv(h, out_value, out_id)) return 1;
1471 0 0         if (rc == -1 && errno == ETIMEDOUT) return 0;
    0          
1472             }
1473             }
1474              
1475             /* --- Int response: inline int64 in resp data area --- */
1476              
1477 35           static int reqrep_int_reply(ReqRepHandle *h, uint64_t id, int64_t value) {
1478 35           uint32_t slot_idx = REQREP_ID_SLOT(id);
1479 35           uint32_t expected_gen = REQREP_ID_GEN(id);
1480 35 50         if (slot_idx >= h->resp_slots) return -1;
1481              
1482 35           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1483 35           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1484 35 100         if (state != RESP_ACQUIRED) return -2;
1485 33 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;
1486              
1487 32           *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader)) = value;
1488              
1489 32           uint32_t expected_state = RESP_ACQUIRED;
1490 32 50         if (!__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_READY,
1491             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
1492 0           return -2;
1493              
1494 32 100         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1495 5           syscall(SYS_futex, &slot->state, FUTEX_WAKE, 1, NULL, NULL, 0);
1496 32           __atomic_add_fetch(&h->hdr->stat_replies, 1, __ATOMIC_RELAXED);
1497 32           return 1;
1498             }
1499              
1500 40           static int reqrep_int_try_get(ReqRepHandle *h, uint64_t id, int64_t *out_value) {
1501 40           uint32_t slot_idx = REQREP_ID_SLOT(id);
1502 40           uint32_t expected_gen = REQREP_ID_GEN(id);
1503 40 50         if (slot_idx >= h->resp_slots) return -1;
1504              
1505 40           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1506 40           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1507 40 100         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -4;
1508 39 100         if (state != RESP_READY) return 0;
1509              
1510 32           *out_value = *(int64_t *)((uint8_t *)slot + sizeof(RespSlotHeader));
1511 32           reqrep_slot_release(h, slot_idx);
1512 32           return 1;
1513             }
1514              
1515 8           static int reqrep_int_get_wait(ReqRepHandle *h, uint64_t id, int64_t *out_value,
1516             double timeout) {
1517 8           int r = reqrep_int_try_get(h, id, out_value);
1518 8 100         if (r != 0) return r;
1519 6 50         if (timeout == 0) return 0;
1520 6           uint32_t slot_idx = REQREP_ID_SLOT(id);
1521 6           RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
1522             struct timespec deadline, remaining;
1523 6           int has_deadline = (timeout > 0);
1524 6 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1525 1           for (;;) {
1526 7           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1527 7 50         if (state == RESP_READY)
1528 0           return reqrep_int_try_get(h, id, out_value);
1529 7           __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1530 7 50         if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
1531 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1532 0           return -4;
1533             }
1534 7           state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1535 7 50         if (state == RESP_READY) {
1536 0           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1537 0           return reqrep_int_try_get(h, id, out_value);
1538             }
1539 7           struct timespec *pts = NULL;
1540 7 100         if (has_deadline) {
1541 2 100         if (!reqrep_remaining_time(&deadline, &remaining)) {
1542 1           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1543 1           return 0;
1544             }
1545 1           pts = &remaining;
1546             }
1547 6           syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
1548 6           __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
1549 6           r = reqrep_int_try_get(h, id, out_value);
1550 6 100         if (r != 0) return r;
1551             }
1552             }
1553              
1554 6           static int reqrep_int_request(ReqRepHandle *h, int64_t req_value, int64_t *out_value,
1555             double timeout) {
1556             uint64_t id;
1557             struct timespec deadline;
1558 6           int has_deadline = (timeout > 0);
1559 6 100         if (has_deadline) reqrep_make_deadline(timeout, &deadline);
1560 6           int r = reqrep_int_send_wait(h, req_value, &id, timeout);
1561 6 50         if (r != 1) return r;
1562 6           double get_timeout = timeout;
1563 6 100         if (has_deadline) {
1564             struct timespec now;
1565 1           clock_gettime(CLOCK_MONOTONIC, &now);
1566 1           get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
1567 1           (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
1568 1 50         if (get_timeout <= 0) { reqrep_cancel(h, id); return 0; }
1569             }
1570 6           r = reqrep_int_get_wait(h, id, out_value, get_timeout);
1571 6 100         if (r != 1) {
1572 1           reqrep_cancel(h, id);
1573             int64_t discard;
1574 1           reqrep_int_try_get(h, id, &discard);
1575             }
1576 6           return r;
1577             }
1578              
1579 6           static inline uint64_t reqrep_int_size(ReqRepHandle *h) {
1580 6           uint64_t tail = __atomic_load_n(&h->hdr->req_tail, __ATOMIC_RELAXED);
1581 6           uint64_t head = __atomic_load_n(&h->hdr->req_head, __ATOMIC_RELAXED);
1582 6           return tail - head;
1583             }
1584              
1585 1           static void reqrep_int_clear(ReqRepHandle *h) {
1586 1           ReqRepHeader *hdr = h->hdr;
1587              
1588             /* Reset Vyukov queue: head/tail to 0, reinit sequence numbers.
1589             * Use atomic stores — concurrent readers use atomic loads on these. */
1590 1           __atomic_store_n(&hdr->req_head, 0, __ATOMIC_RELAXED);
1591 1           __atomic_store_n(&hdr->req_tail, 0, __ATOMIC_RELAXED);
1592 1           ReqIntSlot *slots = (ReqIntSlot *)((char *)hdr + hdr->req_slots_off);
1593 17 100         for (uint32_t i = 0; i < h->req_cap; i++)
1594 16           __atomic_store_n(&slots[i].sequence, (uint64_t)i, __ATOMIC_RELAXED);
1595 1           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1596              
1597             /* Release all in-flight response slots (same as Str clear) */
1598 5 100         for (uint32_t i = 0; i < h->resp_slots; i++) {
1599 4           RespSlotHeader *slot = reqrep_resp_slot(h, i);
1600 4           uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
1601 4 100         while (state == RESP_ACQUIRED || state == RESP_READY) {
    50          
1602 2 50         if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
1603             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1604 2           __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
1605 2           __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
1606 2 50         if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
1607 0           syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1608 2           break;
1609             }
1610             }
1611             }
1612              
1613 1           reqrep_wake_slot_waiters(hdr);
1614 1           reqrep_wake_producers(hdr);
1615 1           reqrep_wake_consumers(hdr);
1616 1           }
1617              
1618             #endif /* REQREP_H */