File Coverage

queue.h
Criterion Covered Total %
statement 510 614 83.0
branch 300 564 53.1
condition n/a
subroutine n/a
pod n/a
total 810 1178 68.7


line stmt bran cond sub pod time code
1             /*
2             * queue.h -- Shared-memory MPMC queue for Linux
3             *
4             * Two variants:
5             * Int — lock-free Vyukov bounded MPMC queue (int64 values)
6             * Str — futex-mutex protected queue with circular arena (byte strings)
7             *
8             * Both use file-backed mmap(MAP_SHARED) for cross-process sharing,
9             * futex for blocking wait, and PID-based stale lock recovery.
10             */
11              
12             #ifndef QUEUE_H
13             #define QUEUE_H
14              
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31              
32             /* ================================================================
33             * Constants
34             * ================================================================ */
35              
36             #define QUEUE_MAGIC 0x51554531U /* "QUE1" */
37             #define QUEUE_VERSION 1
38             #define QUEUE_MODE_INT 0
39             #define QUEUE_MODE_STR 1
40             #define QUEUE_MODE_INT32 2
41             #define QUEUE_MODE_INT16 3
42             #define QUEUE_ERR_BUFLEN 256
43             #define QUEUE_SPIN_LIMIT 32
44             #define QUEUE_LOCK_TIMEOUT_SEC 2
45              
46             /* ================================================================
47             * Header (256 bytes = 4 cache lines, lives at start of mmap)
48             * ================================================================ */
49              
50             typedef struct {
51             /* ---- Cache line 0 (0-63): immutable after create ---- */
52             uint32_t magic; /* 0 */
53             uint32_t version; /* 4 */
54             uint32_t mode; /* 8: QUEUE_MODE_INT or QUEUE_MODE_STR */
55             uint32_t capacity; /* 12: max elements (power of 2) */
56             uint64_t total_size; /* 16: mmap size */
57             uint64_t slots_off; /* 24: offset to slot array */
58             uint64_t arena_off; /* 32: str mode: offset to arena; int: 0 */
59             uint64_t arena_cap; /* 40: str mode: arena byte capacity; int: 0 */
60             uint8_t _pad0[16]; /* 48-63 */
61              
62             /* ---- Cache line 1 (64-127): head / consumer hot ---- */
63             uint64_t head; /* 64: consumer position */
64             uint32_t pop_waiters; /* 72: count of blocked consumers */
65             uint32_t pop_futex; /* 76: futex word for consumer wakeup */
66             uint8_t _pad1[48]; /* 80-127 */
67              
68             /* ---- Cache line 2 (128-191): tail / producer hot ---- */
69             uint64_t tail; /* 128: producer position */
70             uint32_t push_waiters; /* 136: count of blocked producers */
71             uint32_t push_futex; /* 140: futex word for producer wakeup */
72             uint8_t _pad2[48]; /* 144-191 */
73              
74             /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
75             uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
76             uint32_t mutex_waiters; /* 196 */
77             uint32_t arena_wpos; /* 200: str mode: next write position in arena */
78             uint32_t arena_used; /* 204: str mode: total arena bytes consumed */
79             uint64_t stat_push_ok; /* 208 */
80             uint64_t stat_pop_ok; /* 216 */
81             uint64_t stat_push_full; /* 224 */
82             uint64_t stat_pop_empty; /* 232 */
83             uint64_t stat_recoveries;/* 240 */
84             uint8_t _pad3[8]; /* 248-255 */
85             } QueueHeader;
86              
87             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
88             _Static_assert(sizeof(QueueHeader) == 256, "QueueHeader must be 256 bytes");
89             #endif
90              
91             /* ================================================================
92             * Slot types
93             * ================================================================ */
94              
95             /* Int slot: Vyukov MPMC sequence + value */
96             typedef struct {
97             uint64_t sequence;
98             int64_t value;
99             } QueueIntSlot; /* 16 bytes */
100              
101             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
102             typedef struct {
103             uint32_t sequence;
104             int32_t value;
105             } QueueInt32Slot; /* 8 bytes */
106              
107             typedef struct {
108             uint32_t sequence;
109             int16_t value;
110             int16_t _pad;
111             } QueueInt16Slot; /* 8 bytes */
112              
113             /* Str slot: arena pointer + length + skip (for FIFO arena free) */
114             typedef struct {
115             uint32_t arena_off;
116             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
117             uint32_t arena_skip; /* bytes to release from arena on pop (includes wrap waste) */
118             uint32_t prev_wpos; /* arena_wpos before this push (for pop_back rollback) */
119             } QueueStrSlot; /* 16 bytes */
120              
121             #define QUEUE_STR_UTF8_FLAG 0x80000000U
122             #define QUEUE_STR_LEN_MASK 0x7FFFFFFFU
123              
124             /* ================================================================
125             * Process-local handle
126             * ================================================================ */
127              
128             typedef struct {
129             QueueHeader *hdr;
130             void *slots; /* QueueIntSlot* or QueueStrSlot* */
131             char *arena; /* NULL for int mode */
132             size_t mmap_size;
133             uint32_t capacity;
134             uint32_t cap_mask; /* capacity - 1 */
135             uint64_t arena_cap;
136             char *copy_buf; /* for str pop: buffer to copy string before unlock */
137             uint32_t copy_buf_cap;
138             char *path;
139             int notify_fd; /* eventfd for event-loop integration, -1 if disabled */
140             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
141             } QueueHandle;
142              
143             /* ================================================================
144             * Utility
145             * ================================================================ */
146              
147 99           static inline uint32_t queue_next_pow2(uint32_t v) {
148 99 100         if (v < 2) return 2;
149 97 50         if (v > 0x80000000U) return 0;
150 97           v--;
151 97           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
152 97           return v + 1;
153             }
154              
155 5           static inline void queue_spin_pause(void) {
156             #if defined(__x86_64__) || defined(__i386__)
157 5           __asm__ volatile("pause" ::: "memory");
158             #elif defined(__aarch64__)
159             __asm__ volatile("yield" ::: "memory");
160             #else
161             __asm__ volatile("" ::: "memory");
162             #endif
163 5           }
164              
165 189           static inline int queue_ensure_copy_buf(QueueHandle *h, uint32_t needed) {
166 189 100         if (needed <= h->copy_buf_cap) return 1;
167 28 100         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
168 43 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
169 28           char *nb = (char *)realloc(h->copy_buf, ns);
170 28 50         if (!nb) return 0;
171 28           h->copy_buf = nb;
172 28           h->copy_buf_cap = ns;
173 28           return 1;
174             }
175              
176             /* ================================================================
177             * Futex helpers
178             * ================================================================ */
179              
180             #define QUEUE_MUTEX_WRITER_BIT 0x80000000U
181             #define QUEUE_MUTEX_PID_MASK 0x7FFFFFFFU
182             #define QUEUE_MUTEX_VAL(pid) (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))
183              
184 0           static inline int queue_pid_alive(uint32_t pid) {
185 0 0         if (pid == 0) return 1;
186 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
187             }
188              
189             static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };
190              
191 0           static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
192 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
193             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
194 0           return;
195 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
196 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
197 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
198             }
199              
200 419           static inline void queue_mutex_lock(QueueHeader *hdr) {
201 419           uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
202 424           for (int spin = 0; ; spin++) {
203 424           uint32_t expected = 0;
204 424 100         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
205             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
206 419           return;
207 5 50         if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
208 5           queue_spin_pause();
209 5           continue;
210             }
211 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
212 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
213 0 0         if (cur != 0) {
214 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
215             &queue_lock_timeout, NULL, 0);
216 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
217 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
218 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
219 0 0         if (val >= QUEUE_MUTEX_WRITER_BIT) {
220 0           uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
221 0 0         if (!queue_pid_alive(pid))
222 0           queue_recover_stale_mutex(hdr, val);
223             }
224 0           spin = 0;
225 0           continue;
226             }
227             }
228 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
229 0           spin = 0;
230             }
231             }
232              
233 419           static inline void queue_mutex_unlock(QueueHeader *hdr) {
234 419           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
235 419 100         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
236 1           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
237 419           }
238              
239             /* Wake blocked consumers (after push) */
240 448           static inline void queue_wake_consumers(QueueHeader *hdr) {
241 448 50         if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
242 0           __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
243 0           syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
244             }
245 448           }
246              
247             /* Wake blocked producers (after pop) */
248 628           static inline void queue_wake_producers(QueueHeader *hdr) {
249 628 50         if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
250 0           __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
251 0           syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
252             }
253 628           }
254              
255             /* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
256 26           static inline int queue_remaining_time(const struct timespec *deadline,
257             struct timespec *remaining) {
258             struct timespec now;
259 26           clock_gettime(CLOCK_MONOTONIC, &now);
260 26           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
261 26           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
262 26 100         if (remaining->tv_nsec < 0) {
263 17           remaining->tv_sec--;
264 17           remaining->tv_nsec += 1000000000L;
265             }
266 26           return remaining->tv_sec >= 0;
267             }
268              
269             /* Convert timeout in seconds (double) to absolute deadline */
270 25           static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
271 25           clock_gettime(CLOCK_MONOTONIC, deadline);
272 25           deadline->tv_sec += (time_t)timeout;
273 25           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
274 25 50         if (deadline->tv_nsec >= 1000000000L) {
275 0           deadline->tv_sec++;
276 0           deadline->tv_nsec -= 1000000000L;
277             }
278 25           }
279              
280             /* ================================================================
281             * Create / Open / Close
282             * ================================================================ */
283              
284             #define QUEUE_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, QUEUE_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
285              
286 90           static inline void queue_init_new_header(void *base, uint32_t cap, uint64_t arena_cap,
287             uint32_t slots_off, uint32_t arena_off,
288             uint32_t mode, uint64_t total_size) {
289 90           QueueHeader *hdr = (QueueHeader *)base;
290 90           memset(hdr, 0, sizeof(QueueHeader));
291 90           hdr->magic = QUEUE_MAGIC;
292 90           hdr->version = QUEUE_VERSION;
293 90           hdr->mode = mode;
294 90           hdr->capacity = cap;
295 90           hdr->total_size = total_size;
296 90           hdr->slots_off = slots_off;
297 90           hdr->arena_off = arena_off;
298 90           hdr->arena_cap = arena_cap;
299             #define INIT_SEQ(STYPE, C) do { \
300             STYPE *s = (STYPE *)((char *)base + slots_off); \
301             for (uint32_t _i = 0; _i < (C); _i++) s[_i].sequence = _i; \
302             } while(0)
303 2366 100         if (mode == QUEUE_MODE_INT) INIT_SEQ(QueueIntSlot, cap);
    100          
304 1497 100         else if (mode == QUEUE_MODE_INT32) INIT_SEQ(QueueInt32Slot, cap);
    100          
305 1113 100         else if (mode == QUEUE_MODE_INT16) INIT_SEQ(QueueInt16Slot, cap);
    100          
306             #undef INIT_SEQ
307 90           __atomic_thread_fence(__ATOMIC_SEQ_CST);
308 90           }
309              
310 87           static QueueHandle *queue_create(const char *path, uint32_t capacity,
311             uint32_t mode, uint64_t arena_cap_hint,
312             char *errbuf) {
313 87 50         if (errbuf) errbuf[0] = '\0';
314              
315 87           uint32_t cap = queue_next_pow2(capacity);
316 87 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
317 87 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
318              
319 87           uint64_t slots_off = sizeof(QueueHeader);
320 87           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
321 135 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
322 88 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
323 40 100         : sizeof(QueueStrSlot);
324 87           uint64_t arena_off = 0, arena_cap = 0;
325             uint64_t total_size;
326              
327 87 100         if (mode == QUEUE_MODE_STR) {
328 37           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
329 37           arena_off = (slots_end + 7) & ~(uint64_t)7;
330 37           arena_cap = arena_cap_hint;
331 37 100         if (arena_cap < 4096) arena_cap = 4096;
332 37 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
333 37           total_size = arena_off + arena_cap;
334             } else {
335 50           total_size = slots_off + (uint64_t)cap * slot_size;
336             }
337              
338 87           int anonymous = (path == NULL);
339             size_t map_size;
340             void *base;
341              
342 87 100         if (anonymous) {
343             /* Anonymous shared mmap — fork-inherited, no filesystem */
344 21           map_size = (size_t)total_size;
345 21           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
346             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
347 21 50         if (base == MAP_FAILED) {
348 0 0         QUEUE_ERR("mmap(anonymous): %s", strerror(errno));
349 0           return NULL;
350             }
351 21           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
352             } else {
353             /* File-backed shared mmap */
354 66           int fd = open(path, O_RDWR | O_CREAT, 0666);
355 70 50         if (fd < 0) { QUEUE_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
356              
357 66 50         if (flock(fd, LOCK_EX) < 0) {
358 0 0         QUEUE_ERR("flock(%s): %s", path, strerror(errno));
359 0           close(fd); return NULL;
360             }
361              
362             struct stat st;
363 66 50         if (fstat(fd, &st) < 0) {
364 0 0         QUEUE_ERR("fstat(%s): %s", path, strerror(errno));
365 0           flock(fd, LOCK_UN); close(fd); return NULL;
366             }
367              
368 66           int is_new = (st.st_size == 0);
369              
370 66 100         if (!is_new && (uint64_t)st.st_size < sizeof(QueueHeader)) {
    50          
371 0 0         QUEUE_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
372 0           flock(fd, LOCK_UN); close(fd); return NULL;
373             }
374              
375 66 100         if (is_new) {
376 57 50         if (ftruncate(fd, (off_t)total_size) < 0) {
377 0 0         QUEUE_ERR("ftruncate(%s): %s", path, strerror(errno));
378 0           flock(fd, LOCK_UN); close(fd); return NULL;
379             }
380             }
381              
382 66 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
383 66           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
384 66 50         if (base == MAP_FAILED) {
385 0 0         QUEUE_ERR("mmap(%s): %s", path, strerror(errno));
386 0           flock(fd, LOCK_UN); close(fd); return NULL;
387             }
388              
389 66           QueueHeader *hdr = (QueueHeader *)base;
390              
391 66 100         if (!is_new) {
392             /* Validate existing file */
393 27           int valid = (hdr->magic == QUEUE_MAGIC &&
394 9 50         hdr->version == QUEUE_VERSION &&
395 9 100         hdr->mode == mode &&
396 5 50         hdr->capacity > 0 &&
397 5 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
398 5 50         hdr->total_size == (uint64_t)st.st_size &&
399 23 50         hdr->slots_off == sizeof(QueueHeader) &&
    50          
400 5 50         hdr->capacity <= (hdr->total_size - hdr->slots_off) / slot_size);
401 9 100         if (mode == QUEUE_MODE_STR && valid) {
    100          
402 2           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
403 4 50         valid = (hdr->arena_off >= slots_end &&
404 2 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
405             }
406 9 100         if (!valid) {
407 4 50         QUEUE_ERR("%s: invalid or incompatible queue file", path);
408 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
409             }
410 5           cap = hdr->capacity;
411 5           arena_cap = hdr->arena_cap;
412 5           flock(fd, LOCK_UN);
413 5           close(fd);
414 5           goto setup_handle;
415             }
416              
417 57           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
418 57           flock(fd, LOCK_UN);
419 57           close(fd);
420             }
421              
422 83           setup_handle:;
423             {
424 83           QueueHeader *hdr = (QueueHeader *)base;
425 83           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
426 83 50         if (!h) { munmap(base, map_size); return NULL; }
427              
428 83           h->hdr = hdr;
429 83           h->slots = (char *)base + hdr->slots_off;
430 83 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
431 83           h->mmap_size = map_size;
432 83           h->capacity = cap;
433 83           h->cap_mask = cap - 1;
434 83           h->arena_cap = arena_cap;
435 83 100         h->path = path ? strdup(path) : NULL;
436 83           h->notify_fd = -1;
437 83           h->backing_fd = -1;
438              
439 83           return h;
440             }
441             }
442              
443             /* Create queue backed by memfd — shareable via fd passing (SCM_RIGHTS) */
444 12           static QueueHandle *queue_create_memfd(const char *name, uint32_t capacity,
445             uint32_t mode, uint64_t arena_cap_hint,
446             char *errbuf) {
447 12 50         if (errbuf) errbuf[0] = '\0';
448              
449 12           uint32_t cap = queue_next_pow2(capacity);
450 12 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
451 12 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
452              
453 12           uint64_t slots_off = sizeof(QueueHeader);
454 12           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
455 18 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
456 11 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
457 5 100         : sizeof(QueueStrSlot);
458 12           uint64_t arena_off = 0, arena_cap = 0, total_size;
459              
460 12 100         if (mode == QUEUE_MODE_STR) {
461 4           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
462 4           arena_off = (slots_end + 7) & ~(uint64_t)7;
463 4           arena_cap = arena_cap_hint;
464 4 50         if (arena_cap < 4096) arena_cap = 4096;
465 4 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
466 4           total_size = arena_off + arena_cap;
467             } else {
468 8           total_size = slots_off + (uint64_t)cap * slot_size;
469             }
470              
471 12 50         int fd = memfd_create(name ? name : "queue", MFD_CLOEXEC | MFD_ALLOW_SEALING);
472 12 50         if (fd < 0) { QUEUE_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
473              
474 12 50         if (ftruncate(fd, (off_t)total_size) < 0) {
475 0 0         QUEUE_ERR("ftruncate(memfd): %s", strerror(errno));
476 0           close(fd); return NULL;
477             }
478              
479             /* Seal size against ftruncate-based SIGBUS attacks via SCM_RIGHTS peers. */
480 12           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
481              
482 12           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
483 12 50         if (base == MAP_FAILED) {
484 0 0         QUEUE_ERR("mmap(memfd): %s", strerror(errno));
485 0           close(fd); return NULL;
486             }
487              
488 12           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
489              
490 12           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
491 12 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
492              
493 12           h->hdr = (QueueHeader *)base;
494 12           h->slots = (char *)base + slots_off;
495 12 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + arena_off : NULL;
496 12           h->mmap_size = (size_t)total_size;
497 12           h->capacity = cap;
498 12           h->cap_mask = cap - 1;
499 12           h->arena_cap = arena_cap;
500 12           h->path = NULL;
501 12           h->notify_fd = -1;
502 12           h->backing_fd = fd;
503              
504 12           return h;
505             }
506              
507             /* Open queue from an existing fd (memfd received via SCM_RIGHTS or dup) */
508 2           static QueueHandle *queue_open_fd(int fd, uint32_t mode, char *errbuf) {
509 2 50         if (errbuf) errbuf[0] = '\0';
510              
511             struct stat st;
512 2 50         if (fstat(fd, &st) < 0) {
513 0 0         QUEUE_ERR("fstat(fd=%d): %s", fd, strerror(errno));
514 0           return NULL;
515             }
516              
517 2 50         if ((uint64_t)st.st_size < sizeof(QueueHeader)) {
518 0 0         QUEUE_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
519 0           return NULL;
520             }
521              
522 2           size_t map_size = (size_t)st.st_size;
523 2           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
524 2 50         if (base == MAP_FAILED) {
525 0 0         QUEUE_ERR("mmap(fd=%d): %s", fd, strerror(errno));
526 0           return NULL;
527             }
528              
529 2           QueueHeader *hdr = (QueueHeader *)base;
530 2           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
531 3 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
532 2 50         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
533 1 50         : sizeof(QueueStrSlot);
534 6           int valid = (hdr->magic == QUEUE_MAGIC &&
535 2 50         hdr->version == QUEUE_VERSION &&
536 2 50         hdr->mode == mode &&
537 2 50         hdr->capacity > 0 &&
538 2 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
539 2 50         hdr->total_size == (uint64_t)st.st_size &&
540 6 50         hdr->slots_off == sizeof(QueueHeader) &&
    50          
541 2 50         hdr->capacity <= (hdr->total_size - hdr->slots_off) / slot_size);
542 2 100         if (mode == QUEUE_MODE_STR && valid) {
    50          
543 1           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
544 2 50         valid = (hdr->arena_off >= slots_end &&
545 1 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
546             }
547 2 50         if (!valid) {
548 0 0         QUEUE_ERR("fd %d: invalid or incompatible queue", fd);
549 0           munmap(base, map_size);
550 0           return NULL;
551             }
552              
553             /* Dup the fd so caller retains ownership of the original */
554 2           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
555 2 50         if (myfd < 0) {
556 0 0         QUEUE_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
557 0           munmap(base, map_size);
558 0           return NULL;
559             }
560              
561 2           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
562 2 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
563              
564 2           h->hdr = hdr;
565 2           h->slots = (char *)base + hdr->slots_off;
566 2 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
567 2           h->mmap_size = map_size;
568 2           h->capacity = hdr->capacity;
569 2           h->cap_mask = hdr->capacity - 1;
570 2           h->arena_cap = hdr->arena_cap;
571 2           h->path = NULL;
572 2           h->notify_fd = -1;
573 2           h->backing_fd = myfd;
574              
575 2           return h;
576             }
577              
578 97           static void queue_destroy(QueueHandle *h) {
579 97 50         if (!h) return;
580 97 100         if (h->notify_fd >= 0) close(h->notify_fd);
581 97 100         if (h->backing_fd >= 0) close(h->backing_fd);
582 97 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
583 97           free(h->copy_buf);
584 97           free(h->path);
585 97           free(h);
586             }
587              
588             /* ================================================================
589             * Int queue macro template: Vyukov bounded MPMC (lock-free)
590             *
591             * DEFINE_INT_QUEUE(prefix, SlotType, ValType, SeqType, DiffType)
592             * generates: queue__try_push, try_pop, push_wait, pop_wait,
593             * peek, size, clear
594             * ================================================================ */
595              
596             #define DEFINE_INT_QUEUE(PFX, SLOT, VTYPE, STYPE, DTYPE) \
597             \
598             static inline int queue_##PFX##_try_push(QueueHandle *h, VTYPE value) { \
599             QueueHeader *hdr = h->hdr; \
600             SLOT *slots = (SLOT *)h->slots; \
601             uint32_t mask = h->cap_mask; \
602             uint64_t pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
603             for (;;) { \
604             SLOT *slot = &slots[pos & mask]; \
605             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
606             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)pos; \
607             if (diff == 0) { \
608             if (__atomic_compare_exchange_n(&hdr->tail, &pos, pos + 1, \
609             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
610             slot->value = value; \
611             __atomic_store_n(&slot->sequence, (STYPE)(pos + 1), \
612             __ATOMIC_RELEASE); \
613             __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED); \
614             queue_wake_consumers(hdr); \
615             return 1; \
616             } \
617             } else if (diff < 0) { \
618             __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED); \
619             return 0; \
620             } else { \
621             pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
622             } \
623             } \
624             } \
625             \
626             static inline int queue_##PFX##_try_pop(QueueHandle *h, VTYPE *value) { \
627             QueueHeader *hdr = h->hdr; \
628             SLOT *slots = (SLOT *)h->slots; \
629             uint32_t mask = h->cap_mask; \
630             uint64_t pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
631             for (;;) { \
632             SLOT *slot = &slots[pos & mask]; \
633             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
634             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)(pos + 1); \
635             if (diff == 0) { \
636             if (__atomic_compare_exchange_n(&hdr->head, &pos, pos + 1, \
637             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
638             *value = slot->value; \
639             __atomic_store_n(&slot->sequence, \
640             (STYPE)(pos + h->capacity), __ATOMIC_RELEASE); \
641             __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED); \
642             queue_wake_producers(hdr); \
643             return 1; \
644             } \
645             } else if (diff < 0) { \
646             __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED); \
647             return 0; \
648             } else { \
649             pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
650             } \
651             } \
652             } \
653             \
654             static int queue_##PFX##_push_wait(QueueHandle *h, VTYPE value, \
655             double timeout) { \
656             if (queue_##PFX##_try_push(h, value)) return 1; \
657             if (timeout == 0) return 0; \
658             QueueHeader *hdr = h->hdr; \
659             struct timespec deadline, remaining; \
660             int has_deadline = (timeout > 0); \
661             if (has_deadline) queue_make_deadline(timeout, &deadline); \
662             for (;;) { \
663             uint32_t fseq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE); \
664             if (queue_##PFX##_try_push(h, value)) return 1; \
665             __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
666             struct timespec *pts = NULL; \
667             if (has_deadline) { \
668             if (!queue_remaining_time(&deadline, &remaining)) { \
669             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
670             return 0; \
671             } \
672             pts = &remaining; \
673             } \
674             long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, \
675             fseq, pts, NULL, 0); \
676             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
677             if (queue_##PFX##_try_push(h, value)) return 1; \
678             if (rc == -1 && errno == ETIMEDOUT) return 0; \
679             } \
680             } \
681             \
682             static int queue_##PFX##_pop_wait(QueueHandle *h, VTYPE *value, \
683             double timeout) { \
684             if (queue_##PFX##_try_pop(h, value)) return 1; \
685             if (timeout == 0) return 0; \
686             QueueHeader *hdr = h->hdr; \
687             struct timespec deadline, remaining; \
688             int has_deadline = (timeout > 0); \
689             if (has_deadline) queue_make_deadline(timeout, &deadline); \
690             for (;;) { \
691             uint32_t fseq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE); \
692             if (queue_##PFX##_try_pop(h, value)) return 1; \
693             __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
694             struct timespec *pts = NULL; \
695             if (has_deadline) { \
696             if (!queue_remaining_time(&deadline, &remaining)) { \
697             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
698             return 0; \
699             } \
700             pts = &remaining; \
701             } \
702             long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, \
703             fseq, pts, NULL, 0); \
704             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
705             if (queue_##PFX##_try_pop(h, value)) return 1; \
706             if (rc == -1 && errno == ETIMEDOUT) return 0; \
707             } \
708             } \
709             \
710             static inline int queue_##PFX##_peek(QueueHandle *h, VTYPE *value) { \
711             SLOT *slots = (SLOT *)h->slots; \
712             uint64_t pos = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE); \
713             SLOT *slot = &slots[pos & h->cap_mask]; \
714             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
715             if ((DTYPE)seq - (DTYPE)(STYPE)(pos + 1) == 0) { \
716             *value = slot->value; \
717             return 1; \
718             } \
719             return 0; \
720             } \
721             \
722             static inline uint64_t queue_##PFX##_size(QueueHandle *h) { \
723             uint64_t tail = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED); \
724             uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED); \
725             return tail - head; \
726             } \
727             \
728             static void queue_##PFX##_clear(QueueHandle *h) { \
729             VTYPE tmp; \
730             while (queue_##PFX##_try_pop(h, &tmp)) {} \
731             }
732              
733             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
734 784 100         DEFINE_INT_QUEUE(int, QueueIntSlot, int64_t, uint64_t, int64_t)
  49 100          
  28 100          
  9 50          
  114 50          
  15 50          
  349 50          
  220 50          
    100          
    50          
    50          
    100          
    100          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
735              
736             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
737 310 100         DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)
  17 50          
  6 100          
  1 50          
  103 50          
  0 50          
  145 50          
  38 50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    100          
    50          
    50          
738              
739             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
740 26 0         DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)
  0 0          
  1 0          
  0 0          
  0 0          
  0 0          
  13 0          
  12 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    50          
    50          
    0          
741              
742             /* ================================================================
743             * Str queue: mutex-protected with circular arena
744             * ================================================================ */
745              
746             /* Push one item while mutex is already held. Returns 1=ok, 0=full, -2=too long. */
747 190           static inline int queue_str_push_locked(QueueHandle *h, const char *str,
748             uint32_t len, bool utf8) {
749 190           QueueHeader *hdr = h->hdr;
750              
751 190 50         if (len > QUEUE_STR_LEN_MASK) return -2;
752              
753 190 100         if (hdr->tail - hdr->head >= h->capacity) {
754 11           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
755 11           return 0;
756             }
757              
758 179           uint32_t alloc = (len + 7) & ~7u;
759 179 100         if (alloc == 0) alloc = 8;
760 179 50         if (alloc > h->arena_cap) return -2;
761 179           uint32_t saved_wpos = hdr->arena_wpos;
762 179           uint32_t pos = saved_wpos;
763 179           uint64_t skip = alloc;
764              
765 179 100         if ((uint64_t)pos + alloc > h->arena_cap) {
766 2           skip += h->arena_cap - pos;
767 2           pos = 0;
768             }
769              
770 179 100         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
771 1 50         if (hdr->tail == hdr->head) {
772 0           hdr->arena_wpos = 0;
773 0           hdr->arena_used = 0;
774 0           saved_wpos = 0;
775 0           pos = 0;
776 0           skip = alloc;
777             } else {
778 1           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
779 1           return 0;
780             }
781             }
782              
783 178           memcpy(h->arena + pos, str, len);
784              
785 178           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
786 178           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
787 178           slot->arena_off = pos;
788 178 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
789 178           slot->arena_skip = (uint32_t)skip;
790 178           slot->prev_wpos = saved_wpos;
791              
792 178           hdr->arena_wpos = pos + alloc;
793 178           hdr->arena_used += (uint32_t)skip;
794 178           hdr->tail++;
795 178           __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
796 178           return 1;
797             }
798              
799 184           static inline int queue_str_try_push(QueueHandle *h, const char *str,
800             uint32_t len, bool utf8) {
801 184           queue_mutex_lock(h->hdr);
802 184           int r = queue_str_push_locked(h, str, len, utf8);
803 184           queue_mutex_unlock(h->hdr);
804 184 100         if (r == 1) queue_wake_consumers(h->hdr);
805 184           return r;
806             }
807              
808             /* Pop one item while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
809 194           static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
810             uint32_t *out_len, bool *out_utf8) {
811 194           QueueHeader *hdr = h->hdr;
812              
813 194 100         if (hdr->tail == hdr->head) {
814 30           __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
815 30           return 0;
816             }
817              
818 164           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
819 164           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
820              
821 164           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
822 164           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
823              
824 164 50         if (!queue_ensure_copy_buf(h, len + 1))
825 0           return -1;
826 164 100         if (len > 0)
827 162           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
828 164           h->copy_buf[len] = '\0';
829 164           *out_str = h->copy_buf;
830 164           *out_len = len;
831              
832 164           hdr->arena_used -= slot->arena_skip;
833 164 100         if (hdr->arena_used == 0)
834 49           hdr->arena_wpos = 0;
835              
836 164           hdr->head++;
837 164           __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
838 164           return 1;
839             }
840              
841 163           static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
842             uint32_t *out_len, bool *out_utf8) {
843 163           queue_mutex_lock(h->hdr);
844 163           int r = queue_str_pop_locked(h, out_str, out_len, out_utf8);
845 163           queue_mutex_unlock(h->hdr);
846 163 100         if (r == 1) queue_wake_producers(h->hdr);
847 163           return r;
848             }
849              
850 8           static int queue_str_push_wait(QueueHandle *h, const char *str,
851             uint32_t len, bool utf8, double timeout) {
852 8           int r = queue_str_try_push(h, str, len, utf8);
853 8 100         if (r != 0) return r; /* 1 = success, -2 = too long */
854 3 50         if (timeout == 0) return 0;
855              
856 3           QueueHeader *hdr = h->hdr;
857             struct timespec deadline, remaining;
858 3           int has_deadline = (timeout > 0);
859 3 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
860              
861 0           for (;;) {
862 3           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
863 3           r = queue_str_try_push(h, str, len, utf8);
864 3 50         if (r != 0) return r;
865              
866 3           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
867 3           struct timespec *pts = NULL;
868 3 50         if (has_deadline) {
869 3 50         if (!queue_remaining_time(&deadline, &remaining)) {
870 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
871 0           return 0;
872             }
873 3           pts = &remaining;
874             }
875 3           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
876 3           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
877              
878 3           r = queue_str_try_push(h, str, len, utf8);
879 3 100         if (r != 0) return r;
880 2 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
881             }
882             }
883              
884 57           static int queue_str_pop_wait(QueueHandle *h, const char **out_str,
885             uint32_t *out_len, bool *out_utf8, double timeout) {
886 57           int r = queue_str_try_pop(h, out_str, out_len, out_utf8);
887 57 100         if (r != 0) return r;
888 7 50         if (timeout == 0) return 0;
889              
890 7           QueueHeader *hdr = h->hdr;
891             struct timespec deadline, remaining;
892 7           int has_deadline = (timeout > 0);
893 7 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
894              
895 1           for (;;) {
896 8           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
897 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
898 8 50         if (r != 0) return r;
899              
900 8           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
901 8           struct timespec *pts = NULL;
902 8 50         if (has_deadline) {
903 8 50         if (!queue_remaining_time(&deadline, &remaining)) {
904 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
905 0           return 0;
906             }
907 8           pts = &remaining;
908             }
909 8           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
910 8           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
911              
912 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
913 8 100         if (r != 0) return r;
914 3 100         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
915             }
916             }
917              
918 26           static inline uint64_t queue_str_size(QueueHandle *h) {
919 26           QueueHeader *hdr = h->hdr;
920 26           uint64_t tail = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
921 26           uint64_t head = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED);
922 26           return tail - head; /* unsigned wrap is correct for push_front (head > tail) */
923             }
924              
925 7           static void queue_str_clear(QueueHandle *h) {
926 7           QueueHeader *hdr = h->hdr;
927 7           queue_mutex_lock(hdr);
928 7           hdr->head = 0;
929 7           hdr->tail = 0;
930 7           hdr->arena_wpos = 0;
931 7           hdr->arena_used = 0;
932 7           queue_mutex_unlock(hdr);
933             /* clear is a bulk transition — wake every blocked producer and
934             * consumer so they re-evaluate state, not just one of each. */
935 7 50         if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
936 0           __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
937 0           syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
938             }
939 7 50         if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
940 0           __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
941 0           syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
942             }
943 7           }
944              
945             /* Peek: read front element without consuming (exact, under mutex). */
946 7           static inline int queue_str_peek(QueueHandle *h, const char **out_str,
947             uint32_t *out_len, bool *out_utf8) {
948 7           QueueHeader *hdr = h->hdr;
949 7           queue_mutex_lock(hdr);
950 7 100         if (hdr->tail == hdr->head) {
951 1           queue_mutex_unlock(hdr);
952 1           return 0;
953             }
954 6           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
955 6           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
956 6           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
957 6           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
958 6 50         if (!queue_ensure_copy_buf(h, len + 1)) {
959 0           queue_mutex_unlock(hdr);
960 0           return -1;
961             }
962 6 50         if (len > 0)
963 6           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
964 6           h->copy_buf[len] = '\0';
965 6           *out_str = h->copy_buf;
966 6           *out_len = len;
967 6           queue_mutex_unlock(hdr);
968 6           return 1;
969             }
970              
971             /* Push to front of queue (requeue). Str only — Int is strictly FIFO. */
972 21           static inline int queue_str_push_front(QueueHandle *h, const char *str,
973             uint32_t len, bool utf8) {
974 21           QueueHeader *hdr = h->hdr;
975 21           queue_mutex_lock(hdr);
976              
977 21 50         if (len > QUEUE_STR_LEN_MASK) {
978 0           queue_mutex_unlock(hdr);
979 0           return -2;
980             }
981              
982 21           uint64_t size = hdr->tail - hdr->head;
983 21 100         if (size >= h->capacity) {
984 6           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
985 6           queue_mutex_unlock(hdr);
986 6           return 0;
987             }
988              
989 15           uint32_t alloc = (len + 7) & ~7u;
990 15 50         if (alloc == 0) alloc = 8;
991 15 50         if (alloc > h->arena_cap) {
992 0           queue_mutex_unlock(hdr);
993 0           return -2;
994             }
995 15           uint32_t saved_wpos = hdr->arena_wpos;
996 15           uint32_t pos = saved_wpos;
997 15           uint64_t skip = alloc;
998              
999 15 50         if ((uint64_t)pos + alloc > h->arena_cap) {
1000 0           skip += h->arena_cap - pos;
1001 0           pos = 0;
1002             }
1003              
1004 15 50         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
1005 0 0         if (hdr->tail == hdr->head) {
1006 0           hdr->arena_wpos = 0;
1007 0           hdr->arena_used = 0;
1008 0           saved_wpos = 0;
1009 0           pos = 0;
1010 0           skip = alloc;
1011             } else {
1012 0           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
1013 0           queue_mutex_unlock(hdr);
1014 0           return 0;
1015             }
1016             }
1017              
1018 15           memcpy(h->arena + pos, str, len);
1019              
1020 15           hdr->head--;
1021 15           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
1022 15           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1023 15           slot->arena_off = pos;
1024 15 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
1025 15           slot->arena_skip = (uint32_t)skip;
1026 15           slot->prev_wpos = saved_wpos;
1027              
1028 15           hdr->arena_wpos = pos + alloc;
1029 15           hdr->arena_used += (uint32_t)skip;
1030 15           __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
1031              
1032 15           queue_mutex_unlock(hdr);
1033 15           queue_wake_consumers(hdr);
1034 15           return 1;
1035             }
1036              
1037 3           static int queue_str_push_front_wait(QueueHandle *h, const char *str,
1038             uint32_t len, bool utf8, double timeout) {
1039 3           int r = queue_str_push_front(h, str, len, utf8);
1040 3 100         if (r != 0) return r;
1041 2 50         if (timeout == 0) return 0;
1042              
1043 2           QueueHeader *hdr = h->hdr;
1044             struct timespec deadline, remaining;
1045 2           int has_deadline = (timeout > 0);
1046 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1047              
1048 0           for (;;) {
1049 2           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
1050 2           r = queue_str_push_front(h, str, len, utf8);
1051 2 50         if (r != 0) return r;
1052              
1053 2           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1054 2           struct timespec *pts = NULL;
1055 2 50         if (has_deadline) {
1056 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1057 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1058 0           return 0;
1059             }
1060 2           pts = &remaining;
1061             }
1062 2           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1063 2           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1064              
1065 2           r = queue_str_push_front(h, str, len, utf8);
1066 2 100         if (r != 0) return r;
1067 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1068             }
1069             }
1070              
1071             /* Pop from back (tail). Str only — undoes the most recent push. */
1072 26           static inline int queue_str_pop_back(QueueHandle *h, const char **out_str,
1073             uint32_t *out_len, bool *out_utf8) {
1074 26           QueueHeader *hdr = h->hdr;
1075 26           queue_mutex_lock(hdr);
1076              
1077 26 100         if (hdr->tail == hdr->head) {
1078 7           __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
1079 7           queue_mutex_unlock(hdr);
1080 7           return 0;
1081             }
1082              
1083 19           hdr->tail--;
1084 19           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
1085 19           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1086              
1087 19           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
1088 19           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
1089              
1090 19 50         if (!queue_ensure_copy_buf(h, len + 1)) {
1091 0           hdr->tail++; /* rollback */
1092 0           queue_mutex_unlock(hdr);
1093 0           return -1;
1094             }
1095 19 50         if (len > 0)
1096 19           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
1097 19           h->copy_buf[len] = '\0';
1098 19           *out_str = h->copy_buf;
1099 19           *out_len = len;
1100              
1101 19           hdr->arena_used -= slot->arena_skip;
1102             /* Restore arena_wpos to before this slot's push if it's the frontier.
1103             * prev_wpos correctly handles wrap waste — it's the pre-push state
1104             * including the original position before any wrap adjustment. */
1105             {
1106 19           uint32_t slot_alloc = (len + 7) & ~7u;
1107 19 50         if (slot_alloc == 0) slot_alloc = 8;
1108 19 100         if (slot->arena_off + slot_alloc == hdr->arena_wpos)
1109 16           hdr->arena_wpos = slot->prev_wpos;
1110             }
1111 19 100         if (hdr->arena_used == 0)
1112 7           hdr->arena_wpos = 0;
1113              
1114 19           __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
1115 19           queue_mutex_unlock(hdr);
1116 19           queue_wake_producers(hdr);
1117 19           return 1;
1118             }
1119              
1120 3           static int queue_str_pop_back_wait(QueueHandle *h, const char **out_str,
1121             uint32_t *out_len, bool *out_utf8, double timeout) {
1122 3           int r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1123 3 100         if (r != 0) return r;
1124 2 50         if (timeout == 0) return 0;
1125              
1126 2           QueueHeader *hdr = h->hdr;
1127             struct timespec deadline, remaining;
1128 2           int has_deadline = (timeout > 0);
1129 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1130              
1131 0           for (;;) {
1132 2           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
1133 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1134 2 50         if (r != 0) return r;
1135              
1136 2           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1137 2           struct timespec *pts = NULL;
1138 2 50         if (has_deadline) {
1139 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1140 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1141 0           return 0;
1142             }
1143 2           pts = &remaining;
1144             }
1145 2           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1146 2           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1147              
1148 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1149 2 100         if (r != 0) return r;
1150 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1151             }
1152             }
1153              
1154             /* msync — flush mmap to disk for crash durability */
1155 5           static inline int queue_sync(QueueHandle *h) {
1156 5           return msync(h->hdr, h->mmap_size, MS_SYNC);
1157             }
1158              
1159             /* ================================================================
1160             * eventfd — event-loop integration
1161             * ================================================================ */
1162              
1163 11           static inline int queue_eventfd_create(QueueHandle *h) {
1164 11 100         if (h->notify_fd >= 0) return h->notify_fd;
1165 10           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1166 10           return h->notify_fd;
1167             }
1168              
1169 1           static inline void queue_eventfd_set(QueueHandle *h, int fd) {
1170 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    50          
1171 1           close(h->notify_fd);
1172 1           h->notify_fd = fd;
1173 1           }
1174              
1175             /* Signal that data is available. Called after successful push. */
1176 8           static inline void queue_notify(QueueHandle *h) {
1177 8 100         if (h->notify_fd >= 0) {
1178 7           uint64_t one = 1;
1179 7           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1180             }
1181 8           }
1182              
1183             /* Consume notification counter. Call from event-loop callback before pop.
1184             * Returns the accumulated counter, or -1 on error / no eventfd. */
1185 10           static inline int64_t queue_eventfd_consume(QueueHandle *h) {
1186 10 50         if (h->notify_fd < 0) return -1;
1187 10           uint64_t val = 0;
1188 10 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1189 10           return (int64_t)val;
1190             }
1191              
1192             #endif /* QUEUE_H */