File Coverage

queue.h
Criterion Covered Total %
statement 516 605 85.2
branch 306 556 55.0
condition n/a
subroutine n/a
pod n/a
total 822 1161 70.8


line stmt bran cond sub pod time code
1             /*
2             * queue.h -- Shared-memory MPMC queue for Linux
3             *
4             * Two variants:
5             * Int — lock-free Vyukov bounded MPMC queue (int64 values)
6             * Str — futex-mutex protected queue with circular arena (byte strings)
7             *
8             * Both use file-backed mmap(MAP_SHARED) for cross-process sharing,
9             * futex for blocking wait, and PID-based stale lock recovery.
10             */
11              
12             #ifndef QUEUE_H
13             #define QUEUE_H
14              
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31              
32             /* ================================================================
33             * Constants
34             * ================================================================ */
35              
36             #define QUEUE_MAGIC 0x51554531U /* "QUE1" */
37             #define QUEUE_VERSION 1
38             #define QUEUE_MODE_INT 0
39             #define QUEUE_MODE_STR 1
40             #define QUEUE_MODE_INT32 2
41             #define QUEUE_MODE_INT16 3
42             #define QUEUE_ERR_BUFLEN 256
43             #define QUEUE_SPIN_LIMIT 32
44             #define QUEUE_LOCK_TIMEOUT_SEC 2
45              
46             /* ================================================================
47             * Header (256 bytes = 4 cache lines, lives at start of mmap)
48             * ================================================================ */
49              
50             typedef struct {
51             /* ---- Cache line 0 (0-63): immutable after create ---- */
52             uint32_t magic; /* 0 */
53             uint32_t version; /* 4 */
54             uint32_t mode; /* 8: QUEUE_MODE_INT or QUEUE_MODE_STR */
55             uint32_t capacity; /* 12: max elements (power of 2) */
56             uint64_t total_size; /* 16: mmap size */
57             uint64_t slots_off; /* 24: offset to slot array */
58             uint64_t arena_off; /* 32: str mode: offset to arena; int: 0 */
59             uint64_t arena_cap; /* 40: str mode: arena byte capacity; int: 0 */
60             uint8_t _pad0[16]; /* 48-63 */
61              
62             /* ---- Cache line 1 (64-127): head / consumer hot ---- */
63             uint64_t head; /* 64: consumer position */
64             uint32_t pop_waiters; /* 72: count of blocked consumers */
65             uint32_t pop_futex; /* 76: futex word for consumer wakeup */
66             uint8_t _pad1[48]; /* 80-127 */
67              
68             /* ---- Cache line 2 (128-191): tail / producer hot ---- */
69             uint64_t tail; /* 128: producer position */
70             uint32_t push_waiters; /* 136: count of blocked producers */
71             uint32_t push_futex; /* 140: futex word for producer wakeup */
72             uint8_t _pad2[48]; /* 144-191 */
73              
74             /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
75             uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
76             uint32_t mutex_waiters; /* 196 */
77             uint32_t arena_wpos; /* 200: str mode: next write position in arena */
78             uint32_t arena_used; /* 204: str mode: total arena bytes consumed */
79             uint64_t stat_push_ok; /* 208 */
80             uint64_t stat_pop_ok; /* 216 */
81             uint64_t stat_push_full; /* 224 */
82             uint64_t stat_pop_empty; /* 232 */
83             uint32_t stat_recoveries;/* 240 */
84             uint8_t _pad3[12]; /* 244-255 */
85             } QueueHeader;
86              
87             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
88             _Static_assert(sizeof(QueueHeader) == 256, "QueueHeader must be 256 bytes");
89             #endif
90              
91             /* ================================================================
92             * Slot types
93             * ================================================================ */
94              
95             /* Int slot: Vyukov MPMC sequence + value */
96             typedef struct {
97             uint64_t sequence;
98             int64_t value;
99             } QueueIntSlot; /* 16 bytes */
100              
101             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
102             typedef struct {
103             uint32_t sequence;
104             int32_t value;
105             } QueueInt32Slot; /* 8 bytes */
106              
107             typedef struct {
108             uint32_t sequence;
109             int16_t value;
110             int16_t _pad;
111             } QueueInt16Slot; /* 8 bytes */
112              
113             /* Str slot: arena pointer + length + skip (for FIFO arena free) */
114             typedef struct {
115             uint32_t arena_off;
116             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
117             uint32_t arena_skip; /* bytes to release from arena on pop (includes wrap waste) */
118             uint32_t prev_wpos; /* arena_wpos before this push (for pop_back rollback) */
119             } QueueStrSlot; /* 16 bytes */
120              
121             #define QUEUE_STR_UTF8_FLAG 0x80000000U
122             #define QUEUE_STR_LEN_MASK 0x7FFFFFFFU
123              
124             /* ================================================================
125             * Process-local handle
126             * ================================================================ */
127              
128             typedef struct {
129             QueueHeader *hdr;
130             void *slots; /* QueueIntSlot* or QueueStrSlot* */
131             char *arena; /* NULL for int mode */
132             size_t mmap_size;
133             uint32_t capacity;
134             uint32_t cap_mask; /* capacity - 1 */
135             uint64_t arena_cap;
136             char *copy_buf; /* for str pop: buffer to copy string before unlock */
137             uint32_t copy_buf_cap;
138             char *path;
139             int notify_fd; /* eventfd for event-loop integration, -1 if disabled */
140             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
141             } QueueHandle;
142              
143             /* ================================================================
144             * Utility
145             * ================================================================ */
146              
147 96           static inline uint32_t queue_next_pow2(uint32_t v) {
148 96 100         if (v < 2) return 2;
149 95 50         if (v > 0x80000000U) return 0;
150 95           v--;
151 95           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
152 95           return v + 1;
153             }
154              
155 32           static inline void queue_spin_pause(void) {
156             #if defined(__x86_64__) || defined(__i386__)
157 32           __asm__ volatile("pause" ::: "memory");
158             #elif defined(__aarch64__)
159             __asm__ volatile("yield" ::: "memory");
160             #else
161             __asm__ volatile("" ::: "memory");
162             #endif
163 32           }
164              
165 186           static inline int queue_ensure_copy_buf(QueueHandle *h, uint32_t needed) {
166 186 100         if (needed <= h->copy_buf_cap) return 1;
167 27 100         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
168 42 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
169 27           char *nb = (char *)realloc(h->copy_buf, ns);
170 27 50         if (!nb) return 0;
171 27           h->copy_buf = nb;
172 27           h->copy_buf_cap = ns;
173 27           return 1;
174             }
175              
176             /* ================================================================
177             * Futex helpers
178             * ================================================================ */
179              
180             #define QUEUE_MUTEX_WRITER_BIT 0x80000000U
181             #define QUEUE_MUTEX_PID_MASK 0x7FFFFFFFU
182             #define QUEUE_MUTEX_VAL(pid) (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))
183              
184 0           static inline int queue_pid_alive(uint32_t pid) {
185 0 0         if (pid == 0) return 1;
186 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
187             }
188              
189             static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };
190              
191 0           static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
192 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
193             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
194 0           return;
195 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
196 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
197 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
198             }
199              
200 413           static inline void queue_mutex_lock(QueueHeader *hdr) {
201 413           uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
202 446           for (int spin = 0; ; spin++) {
203 446           uint32_t expected = 0;
204 446 100         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
205             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
206 413           return;
207 33 100         if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
208 32           queue_spin_pause();
209 32           continue;
210             }
211 1           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
212 1           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
213 1 50         if (cur != 0) {
214 1           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
215             &queue_lock_timeout, NULL, 0);
216 1 50         if (rc == -1 && errno == ETIMEDOUT) {
    50          
217 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
218 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
219 0 0         if (val >= QUEUE_MUTEX_WRITER_BIT) {
220 0           uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
221 0 0         if (!queue_pid_alive(pid))
222 0           queue_recover_stale_mutex(hdr, val);
223             }
224 0           spin = 0;
225 0           continue;
226             }
227             }
228 1           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
229 1           spin = 0;
230             }
231             }
232              
233 413           static inline void queue_mutex_unlock(QueueHeader *hdr) {
234 413           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
235 413 100         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
236 1           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
237 413           }
238              
239             /* Wake blocked consumers (after push) */
240 398           static inline void queue_wake_consumers(QueueHeader *hdr) {
241 398 50         if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
242 0           __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
243 0           syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
244             }
245 398           }
246              
247             /* Wake blocked producers (after pop) */
248 578           static inline void queue_wake_producers(QueueHeader *hdr) {
249 578 50         if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
250 0           __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
251 0           syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
252             }
253 578           }
254              
255             /* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
256 26           static inline int queue_remaining_time(const struct timespec *deadline,
257             struct timespec *remaining) {
258             struct timespec now;
259 26           clock_gettime(CLOCK_MONOTONIC, &now);
260 26           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
261 26           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
262 26 100         if (remaining->tv_nsec < 0) {
263 18           remaining->tv_sec--;
264 18           remaining->tv_nsec += 1000000000L;
265             }
266 26           return remaining->tv_sec >= 0;
267             }
268              
269             /* Convert timeout in seconds (double) to absolute deadline */
270 25           static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
271 25           clock_gettime(CLOCK_MONOTONIC, deadline);
272 25           deadline->tv_sec += (time_t)timeout;
273 25           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
274 25 100         if (deadline->tv_nsec >= 1000000000L) {
275 1           deadline->tv_sec++;
276 1           deadline->tv_nsec -= 1000000000L;
277             }
278 25           }
279              
280             /* ================================================================
281             * Create / Open / Close
282             * ================================================================ */
283              
284             #define QUEUE_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, QUEUE_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
285              
286 84           static QueueHandle *queue_create(const char *path, uint32_t capacity,
287             uint32_t mode, uint64_t arena_cap_hint,
288             char *errbuf) {
289 84 50         if (errbuf) errbuf[0] = '\0';
290              
291 84           uint32_t cap = queue_next_pow2(capacity);
292 84 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
293 84 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
294              
295 84           uint64_t slots_off = sizeof(QueueHeader);
296 84           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
297 131 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
298 86 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
299 39 100         : sizeof(QueueStrSlot);
300 84           uint64_t arena_off = 0, arena_cap = 0;
301             uint64_t total_size;
302              
303 84 100         if (mode == QUEUE_MODE_STR) {
304 36           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
305 36           arena_off = (slots_end + 7) & ~(uint64_t)7;
306 36           arena_cap = arena_cap_hint;
307 36 100         if (arena_cap < 4096) arena_cap = 4096;
308 36 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
309 36           total_size = arena_off + arena_cap;
310             } else {
311 48           total_size = slots_off + (uint64_t)cap * slot_size;
312             }
313              
314 84           int anonymous = (path == NULL);
315             size_t map_size;
316             void *base;
317              
318 84 100         if (anonymous) {
319             /* Anonymous shared mmap — fork-inherited, no filesystem */
320 18           map_size = (size_t)total_size;
321 18           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
322             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
323 18 50         if (base == MAP_FAILED) {
324 0 0         QUEUE_ERR("mmap(anonymous): %s", strerror(errno));
325 0           return NULL;
326             }
327             } else {
328             /* File-backed shared mmap */
329 66           int fd = open(path, O_RDWR | O_CREAT, 0666);
330 70 50         if (fd < 0) { QUEUE_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
331              
332 66 50         if (flock(fd, LOCK_EX) < 0) {
333 0 0         QUEUE_ERR("flock(%s): %s", path, strerror(errno));
334 0           close(fd); return NULL;
335             }
336              
337             struct stat st;
338 66 50         if (fstat(fd, &st) < 0) {
339 0 0         QUEUE_ERR("fstat(%s): %s", path, strerror(errno));
340 0           flock(fd, LOCK_UN); close(fd); return NULL;
341             }
342              
343 66           int is_new = (st.st_size == 0);
344              
345 66 100         if (!is_new && (uint64_t)st.st_size < sizeof(QueueHeader)) {
    50          
346 0 0         QUEUE_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
347 0           flock(fd, LOCK_UN); close(fd); return NULL;
348             }
349              
350 66 100         if (is_new) {
351 57 50         if (ftruncate(fd, (off_t)total_size) < 0) {
352 0 0         QUEUE_ERR("ftruncate(%s): %s", path, strerror(errno));
353 0           flock(fd, LOCK_UN); close(fd); return NULL;
354             }
355             }
356              
357 66 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
358 66           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
359 66 50         if (base == MAP_FAILED) {
360 0 0         QUEUE_ERR("mmap(%s): %s", path, strerror(errno));
361 0           flock(fd, LOCK_UN); close(fd); return NULL;
362             }
363              
364 66           QueueHeader *hdr = (QueueHeader *)base;
365              
366 66 100         if (!is_new) {
367             /* Validate existing file */
368 27           int valid = (hdr->magic == QUEUE_MAGIC &&
369 9 50         hdr->version == QUEUE_VERSION &&
370 9 100         hdr->mode == mode &&
371 5 50         hdr->capacity > 0 &&
372 5 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
373 23 50         hdr->total_size == (uint64_t)st.st_size &&
    50          
374 5 50         hdr->slots_off == sizeof(QueueHeader));
375 9 100         if (mode == QUEUE_MODE_STR && valid)
    100          
376 4 50         valid = (hdr->arena_off > 0 &&
377 2 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
378 9 100         if (!valid) {
379 4 50         QUEUE_ERR("%s: invalid or incompatible queue file", path);
380 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
381             }
382 5           cap = hdr->capacity;
383 5           arena_cap = hdr->arena_cap;
384 5           flock(fd, LOCK_UN);
385 5           close(fd);
386 5           goto setup_handle;
387             }
388              
389 57           flock(fd, LOCK_UN);
390 57           close(fd);
391             }
392              
393             /* Initialize new queue (anonymous or new file) */
394             {
395 75           QueueHeader *hdr = (QueueHeader *)base;
396 75           memset(hdr, 0, sizeof(QueueHeader));
397 75           hdr->magic = QUEUE_MAGIC;
398 75           hdr->version = QUEUE_VERSION;
399 75           hdr->mode = mode;
400 75           hdr->capacity = cap;
401 75           hdr->total_size = total_size;
402 75           hdr->slots_off = slots_off;
403 75           hdr->arena_off = arena_off;
404 75           hdr->arena_cap = arena_cap;
405              
406             /* Initialize Vyukov sequence numbers for integer modes */
407             #define INIT_SEQ(STYPE, BASE, OFF, CAP) do { \
408             STYPE *s = (STYPE *)((char *)(BASE) + (OFF)); \
409             for (uint32_t _i = 0; _i < (CAP); _i++) s[_i].sequence = _i; \
410             } while(0)
411 2117 100         if (mode == QUEUE_MODE_INT) INIT_SEQ(QueueIntSlot, base, slots_off, cap);
    100          
412 1458 100         else if (mode == QUEUE_MODE_INT32) INIT_SEQ(QueueInt32Slot, base, slots_off, cap);
    100          
413 1075 100         else if (mode == QUEUE_MODE_INT16) INIT_SEQ(QueueInt16Slot, base, slots_off, cap);
    100          
414             #undef INIT_SEQ
415              
416 75           __atomic_thread_fence(__ATOMIC_SEQ_CST);
417             }
418              
419 80           setup_handle:;
420             {
421 80           QueueHeader *hdr = (QueueHeader *)base;
422 80           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
423 80 50         if (!h) { munmap(base, map_size); return NULL; }
424              
425 80           h->hdr = hdr;
426 80           h->slots = (char *)base + hdr->slots_off;
427 80 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
428 80           h->mmap_size = map_size;
429 80           h->capacity = cap;
430 80           h->cap_mask = cap - 1;
431 80           h->arena_cap = arena_cap;
432 80 100         h->path = path ? strdup(path) : NULL;
433 80           h->notify_fd = -1;
434 80           h->backing_fd = -1;
435              
436 80           return h;
437             }
438             }
439              
440             /* Create queue backed by memfd — shareable via fd passing (SCM_RIGHTS) */
441 12           static QueueHandle *queue_create_memfd(const char *name, uint32_t capacity,
442             uint32_t mode, uint64_t arena_cap_hint,
443             char *errbuf) {
444 12 50         if (errbuf) errbuf[0] = '\0';
445              
446 12           uint32_t cap = queue_next_pow2(capacity);
447 12 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
448 12 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
449              
450 12           uint64_t slots_off = sizeof(QueueHeader);
451 12           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
452 18 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
453 11 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
454 5 100         : sizeof(QueueStrSlot);
455 12           uint64_t arena_off = 0, arena_cap = 0, total_size;
456              
457 12 100         if (mode == QUEUE_MODE_STR) {
458 4           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
459 4           arena_off = (slots_end + 7) & ~(uint64_t)7;
460 4           arena_cap = arena_cap_hint;
461 4 50         if (arena_cap < 4096) arena_cap = 4096;
462 4 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
463 4           total_size = arena_off + arena_cap;
464             } else {
465 8           total_size = slots_off + (uint64_t)cap * slot_size;
466             }
467              
468 12 50         int fd = memfd_create(name ? name : "queue", MFD_CLOEXEC);
469 12 50         if (fd < 0) { QUEUE_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
470              
471 12 50         if (ftruncate(fd, (off_t)total_size) < 0) {
472 0 0         QUEUE_ERR("ftruncate(memfd): %s", strerror(errno));
473 0           close(fd); return NULL;
474             }
475              
476 12           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
477 12 50         if (base == MAP_FAILED) {
478 0 0         QUEUE_ERR("mmap(memfd): %s", strerror(errno));
479 0           close(fd); return NULL;
480             }
481              
482 12           QueueHeader *hdr = (QueueHeader *)base;
483 12           memset(hdr, 0, sizeof(QueueHeader));
484 12           hdr->magic = QUEUE_MAGIC;
485 12           hdr->version = QUEUE_VERSION;
486 12           hdr->mode = mode;
487 12           hdr->capacity = cap;
488 12           hdr->total_size = total_size;
489 12           hdr->slots_off = slots_off;
490 12           hdr->arena_off = arena_off;
491 12           hdr->arena_cap = arena_cap;
492              
493             #define INIT_SEQ(STYPE, BASE, OFF, CAP) do { \
494             STYPE *s = (STYPE *)((char *)(BASE) + (OFF)); \
495             for (uint32_t _i = 0; _i < (CAP); _i++) s[_i].sequence = _i; \
496             } while(0)
497 212 100         if (mode == QUEUE_MODE_INT) INIT_SEQ(QueueIntSlot, base, slots_off, cap);
    100          
498 38 100         else if (mode == QUEUE_MODE_INT32) INIT_SEQ(QueueInt32Slot, base, slots_off, cap);
    100          
499 37 100         else if (mode == QUEUE_MODE_INT16) INIT_SEQ(QueueInt16Slot, base, slots_off, cap);
    100          
500             #undef INIT_SEQ
501              
502 12           __atomic_thread_fence(__ATOMIC_SEQ_CST);
503              
504 12           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
505 12 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
506              
507 12           h->hdr = hdr;
508 12           h->slots = (char *)base + slots_off;
509 12 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + arena_off : NULL;
510 12           h->mmap_size = (size_t)total_size;
511 12           h->capacity = cap;
512 12           h->cap_mask = cap - 1;
513 12           h->arena_cap = arena_cap;
514 12           h->path = NULL;
515 12           h->notify_fd = -1;
516 12           h->backing_fd = fd;
517              
518 12           return h;
519             }
520              
521             /* Open queue from an existing fd (memfd received via SCM_RIGHTS or dup) */
522 2           static QueueHandle *queue_open_fd(int fd, uint32_t mode, char *errbuf) {
523 2 50         if (errbuf) errbuf[0] = '\0';
524              
525             struct stat st;
526 2 50         if (fstat(fd, &st) < 0) {
527 0 0         QUEUE_ERR("fstat(fd=%d): %s", fd, strerror(errno));
528 0           return NULL;
529             }
530              
531 2 50         if ((uint64_t)st.st_size < sizeof(QueueHeader)) {
532 0 0         QUEUE_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
533 0           return NULL;
534             }
535              
536 2           size_t map_size = (size_t)st.st_size;
537 2           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
538 2 50         if (base == MAP_FAILED) {
539 0 0         QUEUE_ERR("mmap(fd=%d): %s", fd, strerror(errno));
540 0           return NULL;
541             }
542              
543 2           QueueHeader *hdr = (QueueHeader *)base;
544 6           int valid = (hdr->magic == QUEUE_MAGIC &&
545 2 50         hdr->version == QUEUE_VERSION &&
546 2 50         hdr->mode == mode &&
547 2 50         hdr->capacity > 0 &&
548 2 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
549 6 50         hdr->total_size == (uint64_t)st.st_size &&
    50          
550 2 50         hdr->slots_off == sizeof(QueueHeader));
551 2 100         if (mode == QUEUE_MODE_STR && valid)
    50          
552 2 50         valid = (hdr->arena_off > 0 &&
553 1 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
554 2 50         if (!valid) {
555 0 0         QUEUE_ERR("fd %d: invalid or incompatible queue", fd);
556 0           munmap(base, map_size);
557 0           return NULL;
558             }
559              
560             /* Dup the fd so caller retains ownership of the original */
561 2           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
562 2 50         if (myfd < 0) {
563 0 0         QUEUE_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
564 0           munmap(base, map_size);
565 0           return NULL;
566             }
567              
568 2           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
569 2 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
570              
571 2           h->hdr = hdr;
572 2           h->slots = (char *)base + hdr->slots_off;
573 2 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
574 2           h->mmap_size = map_size;
575 2           h->capacity = hdr->capacity;
576 2           h->cap_mask = hdr->capacity - 1;
577 2           h->arena_cap = hdr->arena_cap;
578 2           h->path = NULL;
579 2           h->notify_fd = -1;
580 2           h->backing_fd = myfd;
581              
582 2           return h;
583             }
584              
585 94           static void queue_destroy(QueueHandle *h) {
586 94 50         if (!h) return;
587 94 100         if (h->notify_fd >= 0) close(h->notify_fd);
588 94 100         if (h->backing_fd >= 0) close(h->backing_fd);
589 94 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
590 94           free(h->copy_buf);
591 94           free(h->path);
592 94           free(h);
593             }
594              
595             /* ================================================================
596             * Int queue macro template: Vyukov bounded MPMC (lock-free)
597             *
598             * DEFINE_INT_QUEUE(prefix, SlotType, ValType, SeqType, DiffType)
599             * generates: queue__try_push, try_pop, push_wait, pop_wait,
600             * peek, size, clear
601             * ================================================================ */
602              
603             #define DEFINE_INT_QUEUE(PFX, SLOT, VTYPE, STYPE, DTYPE) \
604             \
605             static inline int queue_##PFX##_try_push(QueueHandle *h, VTYPE value) { \
606             QueueHeader *hdr = h->hdr; \
607             SLOT *slots = (SLOT *)h->slots; \
608             uint32_t mask = h->cap_mask; \
609             uint64_t pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
610             for (;;) { \
611             SLOT *slot = &slots[pos & mask]; \
612             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
613             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)pos; \
614             if (diff == 0) { \
615             if (__atomic_compare_exchange_n(&hdr->tail, &pos, pos + 1, \
616             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
617             slot->value = value; \
618             __atomic_store_n(&slot->sequence, (STYPE)(pos + 1), \
619             __ATOMIC_RELEASE); \
620             __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED); \
621             queue_wake_consumers(hdr); \
622             return 1; \
623             } \
624             } else if (diff < 0) { \
625             __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED); \
626             return 0; \
627             } else { \
628             pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
629             } \
630             } \
631             } \
632             \
633             static inline int queue_##PFX##_try_pop(QueueHandle *h, VTYPE *value) { \
634             QueueHeader *hdr = h->hdr; \
635             SLOT *slots = (SLOT *)h->slots; \
636             uint32_t mask = h->cap_mask; \
637             uint64_t pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
638             for (;;) { \
639             SLOT *slot = &slots[pos & mask]; \
640             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
641             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)(pos + 1); \
642             if (diff == 0) { \
643             if (__atomic_compare_exchange_n(&hdr->head, &pos, pos + 1, \
644             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
645             *value = slot->value; \
646             __atomic_store_n(&slot->sequence, \
647             (STYPE)(pos + h->capacity), __ATOMIC_RELEASE); \
648             __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED); \
649             queue_wake_producers(hdr); \
650             return 1; \
651             } \
652             } else if (diff < 0) { \
653             __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED); \
654             return 0; \
655             } else { \
656             pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
657             } \
658             } \
659             } \
660             \
661             static int queue_##PFX##_push_wait(QueueHandle *h, VTYPE value, \
662             double timeout) { \
663             if (queue_##PFX##_try_push(h, value)) return 1; \
664             if (timeout == 0) return 0; \
665             QueueHeader *hdr = h->hdr; \
666             struct timespec deadline, remaining; \
667             int has_deadline = (timeout > 0); \
668             if (has_deadline) queue_make_deadline(timeout, &deadline); \
669             for (;;) { \
670             uint32_t fseq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE); \
671             if (queue_##PFX##_try_push(h, value)) return 1; \
672             __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
673             struct timespec *pts = NULL; \
674             if (has_deadline) { \
675             if (!queue_remaining_time(&deadline, &remaining)) { \
676             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
677             return 0; \
678             } \
679             pts = &remaining; \
680             } \
681             long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, \
682             fseq, pts, NULL, 0); \
683             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
684             if (queue_##PFX##_try_push(h, value)) return 1; \
685             if (rc == -1 && errno == ETIMEDOUT) return 0; \
686             } \
687             } \
688             \
689             static int queue_##PFX##_pop_wait(QueueHandle *h, VTYPE *value, \
690             double timeout) { \
691             if (queue_##PFX##_try_pop(h, value)) return 1; \
692             if (timeout == 0) return 0; \
693             QueueHeader *hdr = h->hdr; \
694             struct timespec deadline, remaining; \
695             int has_deadline = (timeout > 0); \
696             if (has_deadline) queue_make_deadline(timeout, &deadline); \
697             for (;;) { \
698             uint32_t fseq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE); \
699             if (queue_##PFX##_try_pop(h, value)) return 1; \
700             __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
701             struct timespec *pts = NULL; \
702             if (has_deadline) { \
703             if (!queue_remaining_time(&deadline, &remaining)) { \
704             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
705             return 0; \
706             } \
707             pts = &remaining; \
708             } \
709             long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, \
710             fseq, pts, NULL, 0); \
711             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
712             if (queue_##PFX##_try_pop(h, value)) return 1; \
713             if (rc == -1 && errno == ETIMEDOUT) return 0; \
714             } \
715             } \
716             \
717             static inline int queue_##PFX##_peek(QueueHandle *h, VTYPE *value) { \
718             SLOT *slots = (SLOT *)h->slots; \
719             uint64_t pos = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE); \
720             SLOT *slot = &slots[pos & h->cap_mask]; \
721             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
722             if ((DTYPE)seq - (DTYPE)(STYPE)(pos + 1) == 0) { \
723             *value = slot->value; \
724             return 1; \
725             } \
726             return 0; \
727             } \
728             \
729             static inline uint64_t queue_##PFX##_size(QueueHandle *h) { \
730             uint64_t tail = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED); \
731             uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED); \
732             return tail - head; \
733             } \
734             \
735             static void queue_##PFX##_clear(QueueHandle *h) { \
736             VTYPE tmp; \
737             while (queue_##PFX##_try_pop(h, &tmp)) {} \
738             }
739              
740             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
741 668 100         DEFINE_INT_QUEUE(int, QueueIntSlot, int64_t, uint64_t, int64_t)
  49 100          
  22 100          
  9 50          
  114 50          
  15 50          
  294 50          
  165 50          
    100          
    50          
    50          
    100          
    100          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
742              
743             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
744 310 100         DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)
  17 50          
  6 100          
  1 50          
  103 50          
  0 50          
  145 50          
  38 50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    100          
    50          
    50          
745              
746             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
747 26 0         DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)
  0 0          
  1 0          
  0 0          
  0 0          
  0 0          
  13 0          
  12 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    50          
    50          
    0          
748              
749             /* ================================================================
750             * Str queue: mutex-protected with circular arena
751             * ================================================================ */
752              
753             /* Push one item while mutex is already held. Returns 1=ok, 0=full, -2=too long. */
754 187           static inline int queue_str_push_locked(QueueHandle *h, const char *str,
755             uint32_t len, bool utf8) {
756 187           QueueHeader *hdr = h->hdr;
757              
758 187 50         if (len > QUEUE_STR_LEN_MASK) return -2;
759              
760 187 100         if (hdr->tail - hdr->head >= h->capacity) {
761 11           hdr->stat_push_full++;
762 11           return 0;
763             }
764              
765 176           uint32_t alloc = (len + 7) & ~7u;
766 176 100         if (alloc == 0) alloc = 8;
767 176           uint32_t saved_wpos = hdr->arena_wpos;
768 176           uint32_t pos = saved_wpos;
769 176           uint64_t skip = alloc;
770              
771 176 100         if ((uint64_t)pos + alloc > h->arena_cap) {
772 2           skip += h->arena_cap - pos;
773 2           pos = 0;
774             }
775              
776 176 100         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
777 1 50         if (hdr->tail == hdr->head) {
778 0           hdr->arena_wpos = 0;
779 0           hdr->arena_used = 0;
780 0           saved_wpos = 0;
781 0           pos = 0;
782 0           skip = alloc;
783             } else {
784 1           hdr->stat_push_full++;
785 1           return 0;
786             }
787             }
788              
789 175           memcpy(h->arena + pos, str, len);
790              
791 175           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
792 175           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
793 175           slot->arena_off = pos;
794 175 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
795 175           slot->arena_skip = (uint32_t)skip;
796 175           slot->prev_wpos = saved_wpos;
797              
798 175           hdr->arena_wpos = pos + alloc;
799 175           hdr->arena_used += (uint32_t)skip;
800 175           hdr->tail++;
801 175           hdr->stat_push_ok++;
802 175           return 1;
803             }
804              
805 181           static inline int queue_str_try_push(QueueHandle *h, const char *str,
806             uint32_t len, bool utf8) {
807 181           queue_mutex_lock(h->hdr);
808 181           int r = queue_str_push_locked(h, str, len, utf8);
809 181           queue_mutex_unlock(h->hdr);
810 181 100         if (r == 1) queue_wake_consumers(h->hdr);
811 181           return r;
812             }
813              
814             /* Pop one item while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
815 191           static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
816             uint32_t *out_len, bool *out_utf8) {
817 191           QueueHeader *hdr = h->hdr;
818              
819 191 100         if (hdr->tail == hdr->head) {
820 30           hdr->stat_pop_empty++;
821 30           return 0;
822             }
823              
824 161           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
825 161           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
826              
827 161           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
828 161           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
829              
830 161 50         if (!queue_ensure_copy_buf(h, len + 1))
831 0           return -1;
832 161 100         if (len > 0)
833 160           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
834 161           h->copy_buf[len] = '\0';
835 161           *out_str = h->copy_buf;
836 161           *out_len = len;
837              
838 161           hdr->arena_used -= slot->arena_skip;
839 161 100         if (hdr->arena_used == 0)
840 48           hdr->arena_wpos = 0;
841              
842 161           hdr->head++;
843 161           hdr->stat_pop_ok++;
844 161           return 1;
845             }
846              
847 160           static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
848             uint32_t *out_len, bool *out_utf8) {
849 160           queue_mutex_lock(h->hdr);
850 160           int r = queue_str_pop_locked(h, out_str, out_len, out_utf8);
851 160           queue_mutex_unlock(h->hdr);
852 160 100         if (r == 1) queue_wake_producers(h->hdr);
853 160           return r;
854             }
855              
856 8           static int queue_str_push_wait(QueueHandle *h, const char *str,
857             uint32_t len, bool utf8, double timeout) {
858 8           int r = queue_str_try_push(h, str, len, utf8);
859 8 100         if (r != 0) return r; /* 1 = success, -2 = too long */
860 3 50         if (timeout == 0) return 0;
861              
862 3           QueueHeader *hdr = h->hdr;
863             struct timespec deadline, remaining;
864 3           int has_deadline = (timeout > 0);
865 3 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
866              
867 0           for (;;) {
868 3           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
869 3           r = queue_str_try_push(h, str, len, utf8);
870 3 50         if (r != 0) return r;
871              
872 3           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
873 3           struct timespec *pts = NULL;
874 3 50         if (has_deadline) {
875 3 50         if (!queue_remaining_time(&deadline, &remaining)) {
876 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
877 0           return 0;
878             }
879 3           pts = &remaining;
880             }
881 3           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
882 3           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
883              
884 3           r = queue_str_try_push(h, str, len, utf8);
885 3 100         if (r != 0) return r;
886 2 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
887             }
888             }
889              
890 57           static int queue_str_pop_wait(QueueHandle *h, const char **out_str,
891             uint32_t *out_len, bool *out_utf8, double timeout) {
892 57           int r = queue_str_try_pop(h, out_str, out_len, out_utf8);
893 57 100         if (r != 0) return r;
894 7 50         if (timeout == 0) return 0;
895              
896 7           QueueHeader *hdr = h->hdr;
897             struct timespec deadline, remaining;
898 7           int has_deadline = (timeout > 0);
899 7 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
900              
901 1           for (;;) {
902 8           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
903 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
904 8 50         if (r != 0) return r;
905              
906 8           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
907 8           struct timespec *pts = NULL;
908 8 50         if (has_deadline) {
909 8 50         if (!queue_remaining_time(&deadline, &remaining)) {
910 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
911 0           return 0;
912             }
913 8           pts = &remaining;
914             }
915 8           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
916 8           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
917              
918 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
919 8 100         if (r != 0) return r;
920 3 100         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
921             }
922             }
923              
924 26           static inline uint64_t queue_str_size(QueueHandle *h) {
925 26           QueueHeader *hdr = h->hdr;
926 26           uint64_t tail = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
927 26           uint64_t head = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED);
928 26           return tail - head; /* unsigned wrap is correct for push_front (head > tail) */
929             }
930              
931 7           static void queue_str_clear(QueueHandle *h) {
932 7           QueueHeader *hdr = h->hdr;
933 7           queue_mutex_lock(hdr);
934 7           hdr->head = 0;
935 7           hdr->tail = 0;
936 7           hdr->arena_wpos = 0;
937 7           hdr->arena_used = 0;
938 7           queue_mutex_unlock(hdr);
939 7           queue_wake_producers(hdr);
940 7           queue_wake_consumers(hdr);
941 7           }
942              
943             /* Peek: read front element without consuming (exact, under mutex). */
944 7           static inline int queue_str_peek(QueueHandle *h, const char **out_str,
945             uint32_t *out_len, bool *out_utf8) {
946 7           QueueHeader *hdr = h->hdr;
947 7           queue_mutex_lock(hdr);
948 7 100         if (hdr->tail == hdr->head) {
949 1           queue_mutex_unlock(hdr);
950 1           return 0;
951             }
952 6           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
953 6           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
954 6           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
955 6           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
956 6 50         if (!queue_ensure_copy_buf(h, len + 1)) {
957 0           queue_mutex_unlock(hdr);
958 0           return -1;
959             }
960 6 50         if (len > 0)
961 6           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
962 6           h->copy_buf[len] = '\0';
963 6           *out_str = h->copy_buf;
964 6           *out_len = len;
965 6           queue_mutex_unlock(hdr);
966 6           return 1;
967             }
968              
969             /* Push to front of queue (requeue). Str only — Int is strictly FIFO. */
970 21           static inline int queue_str_push_front(QueueHandle *h, const char *str,
971             uint32_t len, bool utf8) {
972 21           QueueHeader *hdr = h->hdr;
973 21           queue_mutex_lock(hdr);
974              
975 21 50         if (len > QUEUE_STR_LEN_MASK) {
976 0           queue_mutex_unlock(hdr);
977 0           return -2;
978             }
979              
980 21           uint64_t size = hdr->tail - hdr->head;
981 21 100         if (size >= h->capacity) {
982 6           hdr->stat_push_full++;
983 6           queue_mutex_unlock(hdr);
984 6           return 0;
985             }
986              
987 15           uint32_t alloc = (len + 7) & ~7u;
988 15 50         if (alloc == 0) alloc = 8;
989 15           uint32_t saved_wpos = hdr->arena_wpos;
990 15           uint32_t pos = saved_wpos;
991 15           uint64_t skip = alloc;
992              
993 15 50         if ((uint64_t)pos + alloc > h->arena_cap) {
994 0           skip += h->arena_cap - pos;
995 0           pos = 0;
996             }
997              
998 15 50         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
999 0 0         if (hdr->tail == hdr->head) {
1000 0           hdr->arena_wpos = 0;
1001 0           hdr->arena_used = 0;
1002 0           saved_wpos = 0;
1003 0           pos = 0;
1004 0           skip = alloc;
1005             } else {
1006 0           hdr->stat_push_full++;
1007 0           queue_mutex_unlock(hdr);
1008 0           return 0;
1009             }
1010             }
1011              
1012 15           memcpy(h->arena + pos, str, len);
1013              
1014 15           hdr->head--;
1015 15           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
1016 15           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1017 15           slot->arena_off = pos;
1018 15 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
1019 15           slot->arena_skip = (uint32_t)skip;
1020 15           slot->prev_wpos = saved_wpos;
1021              
1022 15           hdr->arena_wpos = pos + alloc;
1023 15           hdr->arena_used += (uint32_t)skip;
1024 15           hdr->stat_push_ok++;
1025              
1026 15           queue_mutex_unlock(hdr);
1027 15           queue_wake_consumers(hdr);
1028 15           return 1;
1029             }
1030              
1031 3           static int queue_str_push_front_wait(QueueHandle *h, const char *str,
1032             uint32_t len, bool utf8, double timeout) {
1033 3           int r = queue_str_push_front(h, str, len, utf8);
1034 3 100         if (r != 0) return r;
1035 2 50         if (timeout == 0) return 0;
1036              
1037 2           QueueHeader *hdr = h->hdr;
1038             struct timespec deadline, remaining;
1039 2           int has_deadline = (timeout > 0);
1040 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1041              
1042 0           for (;;) {
1043 2           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
1044 2           r = queue_str_push_front(h, str, len, utf8);
1045 2 50         if (r != 0) return r;
1046              
1047 2           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1048 2           struct timespec *pts = NULL;
1049 2 50         if (has_deadline) {
1050 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1051 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1052 0           return 0;
1053             }
1054 2           pts = &remaining;
1055             }
1056 2           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1057 2           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1058              
1059 2           r = queue_str_push_front(h, str, len, utf8);
1060 2 100         if (r != 0) return r;
1061 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1062             }
1063             }
1064              
1065             /* Pop from back (tail). Str only — undoes the most recent push. */
1066 26           static inline int queue_str_pop_back(QueueHandle *h, const char **out_str,
1067             uint32_t *out_len, bool *out_utf8) {
1068 26           QueueHeader *hdr = h->hdr;
1069 26           queue_mutex_lock(hdr);
1070              
1071 26 100         if (hdr->tail == hdr->head) {
1072 7           hdr->stat_pop_empty++;
1073 7           queue_mutex_unlock(hdr);
1074 7           return 0;
1075             }
1076              
1077 19           hdr->tail--;
1078 19           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
1079 19           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1080              
1081 19           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
1082 19           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
1083              
1084 19 50         if (!queue_ensure_copy_buf(h, len + 1)) {
1085 0           hdr->tail++; /* rollback */
1086 0           queue_mutex_unlock(hdr);
1087 0           return -1;
1088             }
1089 19 50         if (len > 0)
1090 19           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
1091 19           h->copy_buf[len] = '\0';
1092 19           *out_str = h->copy_buf;
1093 19           *out_len = len;
1094              
1095 19           hdr->arena_used -= slot->arena_skip;
1096             /* Restore arena_wpos to before this slot's push if it's the frontier.
1097             * prev_wpos correctly handles wrap waste — it's the pre-push state
1098             * including the original position before any wrap adjustment. */
1099             {
1100 19           uint32_t slot_alloc = (len + 7) & ~7u;
1101 19 50         if (slot_alloc == 0) slot_alloc = 8;
1102 19 100         if (slot->arena_off + slot_alloc == hdr->arena_wpos)
1103 16           hdr->arena_wpos = slot->prev_wpos;
1104             }
1105 19 100         if (hdr->arena_used == 0)
1106 7           hdr->arena_wpos = 0;
1107              
1108 19           hdr->stat_pop_ok++;
1109 19           queue_mutex_unlock(hdr);
1110 19           queue_wake_producers(hdr);
1111 19           return 1;
1112             }
1113              
1114 3           static int queue_str_pop_back_wait(QueueHandle *h, const char **out_str,
1115             uint32_t *out_len, bool *out_utf8, double timeout) {
1116 3           int r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1117 3 100         if (r != 0) return r;
1118 2 50         if (timeout == 0) return 0;
1119              
1120 2           QueueHeader *hdr = h->hdr;
1121             struct timespec deadline, remaining;
1122 2           int has_deadline = (timeout > 0);
1123 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1124              
1125 0           for (;;) {
1126 2           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
1127 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1128 2 50         if (r != 0) return r;
1129              
1130 2           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1131 2           struct timespec *pts = NULL;
1132 2 50         if (has_deadline) {
1133 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1134 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1135 0           return 0;
1136             }
1137 2           pts = &remaining;
1138             }
1139 2           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1140 2           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1141              
1142 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1143 2 100         if (r != 0) return r;
1144 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1145             }
1146             }
1147              
1148             /* msync — flush mmap to disk for crash durability */
1149 5           static inline int queue_sync(QueueHandle *h) {
1150 5           return msync(h->hdr, h->mmap_size, MS_SYNC);
1151             }
1152              
1153             /* ================================================================
1154             * eventfd — event-loop integration
1155             * ================================================================ */
1156              
1157 11           static inline int queue_eventfd_create(QueueHandle *h) {
1158 11 100         if (h->notify_fd >= 0) return h->notify_fd;
1159 10           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1160 10           return h->notify_fd;
1161             }
1162              
1163 1           static inline void queue_eventfd_set(QueueHandle *h, int fd) {
1164 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    50          
1165 1           close(h->notify_fd);
1166 1           h->notify_fd = fd;
1167 1           }
1168              
1169             /* Signal that data is available. Called after successful push. */
1170 8           static inline void queue_notify(QueueHandle *h) {
1171 8 100         if (h->notify_fd >= 0) {
1172 7           uint64_t one = 1;
1173 7           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1174             }
1175 8           }
1176              
1177             /* Consume notification counter. Call from event-loop callback before pop. */
1178 10           static inline void queue_eventfd_consume(QueueHandle *h) {
1179 10 50         if (h->notify_fd >= 0) {
1180             uint64_t val;
1181 10           ssize_t __attribute__((unused)) rc = read(h->notify_fd, &val, sizeof(val));
1182             }
1183 10           }
1184              
1185             #endif /* QUEUE_H */