File Coverage

queue.h
Criterion Covered Total %
statement 512 632 81.0
branch 304 578 52.6
condition n/a
subroutine n/a
pod n/a
total 816 1210 67.4


line stmt bran cond sub pod time code
1             /*
2             * queue.h -- Shared-memory MPMC queue for Linux
3             *
4             * Two variants:
5             * Int — lock-free Vyukov bounded MPMC queue (int64 values)
6             * Str — futex-mutex protected queue with circular arena (byte strings)
7             *
8             * Both use file-backed mmap(MAP_SHARED) for cross-process sharing,
9             * futex for blocking wait, and PID-based stale lock recovery.
10             */
11              
12             #ifndef QUEUE_H
13             #define QUEUE_H
14              
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31              
32             /* ================================================================
33             * Constants
34             * ================================================================ */
35              
36             #define QUEUE_MAGIC 0x51554531U /* "QUE1" */
37             #define QUEUE_VERSION 1
38             #define QUEUE_MODE_INT 0
39             #define QUEUE_MODE_STR 1
40             #define QUEUE_MODE_INT32 2
41             #define QUEUE_MODE_INT16 3
42             #define QUEUE_ERR_BUFLEN 256
43             #define QUEUE_SPIN_LIMIT 32
44             #define QUEUE_LOCK_TIMEOUT_SEC 2
45              
46             /* ================================================================
47             * Header (256 bytes = 4 cache lines, lives at start of mmap)
48             * ================================================================ */
49              
50             typedef struct {
51             /* ---- Cache line 0 (0-63): immutable after create ---- */
52             uint32_t magic; /* 0 */
53             uint32_t version; /* 4 */
54             uint32_t mode; /* 8: QUEUE_MODE_INT or QUEUE_MODE_STR */
55             uint32_t capacity; /* 12: max elements (power of 2) */
56             uint64_t total_size; /* 16: mmap size */
57             uint64_t slots_off; /* 24: offset to slot array */
58             uint64_t arena_off; /* 32: str mode: offset to arena; int: 0 */
59             uint64_t arena_cap; /* 40: str mode: arena byte capacity; int: 0 */
60             uint8_t _pad0[16]; /* 48-63 */
61              
62             /* ---- Cache line 1 (64-127): head / consumer hot ---- */
63             uint64_t head; /* 64: consumer position */
64             uint32_t pop_waiters; /* 72: count of blocked consumers */
65             uint32_t pop_futex; /* 76: futex word for consumer wakeup */
66             uint8_t _pad1[48]; /* 80-127 */
67              
68             /* ---- Cache line 2 (128-191): tail / producer hot ---- */
69             uint64_t tail; /* 128: producer position */
70             uint32_t push_waiters; /* 136: count of blocked producers */
71             uint32_t push_futex; /* 140: futex word for producer wakeup */
72             uint8_t _pad2[48]; /* 144-191 */
73              
74             /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
75             uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
76             uint32_t mutex_waiters; /* 196 */
77             uint32_t arena_wpos; /* 200: str mode: next write position in arena */
78             uint32_t arena_used; /* 204: str mode: total arena bytes consumed */
79             uint64_t stat_push_ok; /* 208 */
80             uint64_t stat_pop_ok; /* 216 */
81             uint64_t stat_push_full; /* 224 */
82             uint64_t stat_pop_empty; /* 232 */
83             uint64_t stat_recoveries;/* 240 */
84             uint8_t _pad3[8]; /* 248-255 */
85             } QueueHeader;
86              
87             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
88             _Static_assert(sizeof(QueueHeader) == 256, "QueueHeader must be 256 bytes");
89             #endif
90              
91             /* ================================================================
92             * Slot types
93             * ================================================================ */
94              
95             /* Int slot: Vyukov MPMC sequence + value */
96             typedef struct {
97             uint64_t sequence;
98             int64_t value;
99             } QueueIntSlot; /* 16 bytes */
100              
101             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
102             typedef struct {
103             uint32_t sequence;
104             int32_t value;
105             } QueueInt32Slot; /* 8 bytes */
106              
107             typedef struct {
108             uint32_t sequence;
109             int16_t value;
110             int16_t _pad;
111             } QueueInt16Slot; /* 8 bytes */
112              
113             /* Str slot: arena pointer + length + skip (for FIFO arena free) */
114             typedef struct {
115             uint32_t arena_off;
116             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
117             uint32_t arena_skip; /* bytes to release from arena on pop (includes wrap waste) */
118             uint32_t prev_wpos; /* arena_wpos before this push (for pop_back rollback) */
119             } QueueStrSlot; /* 16 bytes */
120              
121             #define QUEUE_STR_UTF8_FLAG 0x80000000U
122             #define QUEUE_STR_LEN_MASK 0x7FFFFFFFU
123              
124             /* ================================================================
125             * Process-local handle
126             * ================================================================ */
127              
128             typedef struct {
129             QueueHeader *hdr;
130             void *slots; /* QueueIntSlot* or QueueStrSlot* */
131             char *arena; /* NULL for int mode */
132             size_t mmap_size;
133             uint32_t capacity;
134             uint32_t cap_mask; /* capacity - 1 */
135             uint64_t arena_cap;
136             char *copy_buf; /* for str pop: buffer to copy string before unlock */
137             uint32_t copy_buf_cap;
138             char *path;
139             int notify_fd; /* eventfd for event-loop integration, -1 if disabled */
140             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
141             int notify_fd_owned; /* 1 if we created notify_fd and must close it */
142             } QueueHandle;
143              
144             /* ================================================================
145             * Utility
146             * ================================================================ */
147              
148 99           static inline uint32_t queue_next_pow2(uint32_t v) {
149 99 100         if (v < 2) return 2;
150 97 50         if (v > 0x80000000U) return 0;
151 97           v--;
152 97           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
153 97           return v + 1;
154             }
155              
156 0           static inline void queue_spin_pause(void) {
157             #if defined(__x86_64__) || defined(__i386__)
158 0           __asm__ volatile("pause" ::: "memory");
159             #elif defined(__aarch64__)
160             __asm__ volatile("yield" ::: "memory");
161             #else
162             __asm__ volatile("" ::: "memory");
163             #endif
164 0           }
165              
166 189           static inline int queue_ensure_copy_buf(QueueHandle *h, uint32_t needed) {
167 189 100         if (needed <= h->copy_buf_cap) return 1;
168 28 100         uint32_t ns = h->copy_buf_cap ? h->copy_buf_cap : 64;
169 43 50         while (ns < needed) { uint32_t n2 = ns * 2; if (n2 <= ns) { ns = needed; break; } ns = n2; }
    100          
170 28           char *nb = (char *)realloc(h->copy_buf, ns);
171 28 50         if (!nb) return 0;
172 28           h->copy_buf = nb;
173 28           h->copy_buf_cap = ns;
174 28           return 1;
175             }
176              
177             /* ================================================================
178             * Futex helpers
179             * ================================================================ */
180              
181             #define QUEUE_MUTEX_WRITER_BIT 0x80000000U
182             #define QUEUE_MUTEX_PID_MASK 0x7FFFFFFFU
183             #define QUEUE_MUTEX_VAL(pid) (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))
184              
185 0           static inline int queue_pid_alive(uint32_t pid) {
186 0 0         if (pid == 0) return 1;
187 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
188             }
189              
190             static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };
191              
192 0           static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
193 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
194             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
195 0           return;
196 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
197 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
198 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
199             }
200              
201 419           static inline void queue_mutex_lock(QueueHeader *hdr) {
202 419           uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
203 419           for (int spin = 0; ; spin++) {
204 419           uint32_t expected = 0;
205 419 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
206             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
207 419           return;
208 0 0         if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
209 0           queue_spin_pause();
210 0           continue;
211             }
212 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
213 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
214 0 0         if (cur != 0) {
215 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
216             &queue_lock_timeout, NULL, 0);
217 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
218 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
219 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
220 0 0         if (val >= QUEUE_MUTEX_WRITER_BIT) {
221 0           uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
222 0 0         if (!queue_pid_alive(pid))
223 0           queue_recover_stale_mutex(hdr, val);
224             }
225 0           spin = 0;
226 0           continue;
227             }
228             }
229 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
230 0           spin = 0;
231             }
232             }
233              
234 419           static inline void queue_mutex_unlock(QueueHeader *hdr) {
235 419           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
236 419 100         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
237 1           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
238 419           }
239              
240             /* Wake up to `n` blocked consumers (after batch push). Each woken
241             * consumer pops at most one item, so batch publishers must wake `n`
242             * (not 1) to drain a multi-item commit without leaving consumers
243             * sleeping on still-available items. */
244 448           static inline void queue_wake_consumers_n(QueueHeader *hdr, uint32_t n) {
245 448 50         if (n == 0) return;
246 448 50         if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
247 0           __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
248 0           syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE,
249 0 0         n > (uint32_t)INT_MAX ? INT_MAX : (int)n, NULL, NULL, 0);
250             }
251             }
252              
253             /* Wake blocked consumers (after single push) */
254 446           static inline void queue_wake_consumers(QueueHeader *hdr) {
255 446           queue_wake_consumers_n(hdr, 1);
256 446           }
257              
258             /* Wake up to `n` blocked producers (after batch pop). See
259             * queue_wake_consumers_n for the batching rationale. */
260 627           static inline void queue_wake_producers_n(QueueHeader *hdr, uint32_t n) {
261 627 50         if (n == 0) return;
262 627 50         if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
263 0           __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
264 0           syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE,
265 0 0         n > (uint32_t)INT_MAX ? INT_MAX : (int)n, NULL, NULL, 0);
266             }
267             }
268              
269             /* Wake blocked producers (after single pop) */
270 619           static inline void queue_wake_producers(QueueHeader *hdr) {
271 619           queue_wake_producers_n(hdr, 1);
272 619           }
273              
274             /* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
275 26           static inline int queue_remaining_time(const struct timespec *deadline,
276             struct timespec *remaining) {
277             struct timespec now;
278 26           clock_gettime(CLOCK_MONOTONIC, &now);
279 26           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
280 26           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
281 26 100         if (remaining->tv_nsec < 0) {
282 17           remaining->tv_sec--;
283 17           remaining->tv_nsec += 1000000000L;
284             }
285 26           return remaining->tv_sec >= 0;
286             }
287              
288             /* Convert timeout in seconds (double) to absolute deadline */
289 25           static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
290 25           clock_gettime(CLOCK_MONOTONIC, deadline);
291 25           deadline->tv_sec += (time_t)timeout;
292 25           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
293 25 50         if (deadline->tv_nsec >= 1000000000L) {
294 0           deadline->tv_sec++;
295 0           deadline->tv_nsec -= 1000000000L;
296             }
297 25           }
298              
299             /* ================================================================
300             * Create / Open / Close
301             * ================================================================ */
302              
303             #define QUEUE_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, QUEUE_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
304              
305 90           static inline void queue_init_new_header(void *base, uint32_t cap, uint64_t arena_cap,
306             uint32_t slots_off, uint32_t arena_off,
307             uint32_t mode, uint64_t total_size) {
308 90           QueueHeader *hdr = (QueueHeader *)base;
309 90           memset(hdr, 0, sizeof(QueueHeader));
310 90           hdr->magic = QUEUE_MAGIC;
311 90           hdr->version = QUEUE_VERSION;
312 90           hdr->mode = mode;
313 90           hdr->capacity = cap;
314 90           hdr->total_size = total_size;
315 90           hdr->slots_off = slots_off;
316 90           hdr->arena_off = arena_off;
317 90           hdr->arena_cap = arena_cap;
318             #define INIT_SEQ(STYPE, C) do { \
319             STYPE *s = (STYPE *)((char *)base + slots_off); \
320             for (uint32_t _i = 0; _i < (C); _i++) s[_i].sequence = _i; \
321             } while(0)
322 2366 100         if (mode == QUEUE_MODE_INT) INIT_SEQ(QueueIntSlot, cap);
    100          
323 1497 100         else if (mode == QUEUE_MODE_INT32) INIT_SEQ(QueueInt32Slot, cap);
    100          
324 1113 100         else if (mode == QUEUE_MODE_INT16) INIT_SEQ(QueueInt16Slot, cap);
    100          
325             #undef INIT_SEQ
326 90           __atomic_thread_fence(__ATOMIC_SEQ_CST);
327 90           }
328              
329 87           static QueueHandle *queue_create(const char *path, uint32_t capacity,
330             uint32_t mode, uint64_t arena_cap_hint,
331             char *errbuf) {
332 87 50         if (errbuf) errbuf[0] = '\0';
333              
334 87           uint32_t cap = queue_next_pow2(capacity);
335 87 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
336 87 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
337              
338 87           uint64_t slots_off = sizeof(QueueHeader);
339 87           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
340 135 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
341 88 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
342 40 100         : sizeof(QueueStrSlot);
343 87           uint64_t arena_off = 0, arena_cap = 0;
344             uint64_t total_size;
345              
346 87 100         if (mode == QUEUE_MODE_STR) {
347 37           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
348 37           arena_off = (slots_end + 7) & ~(uint64_t)7;
349 37           arena_cap = arena_cap_hint;
350 37 100         if (arena_cap < 4096) arena_cap = 4096;
351 37 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
352 37           total_size = arena_off + arena_cap;
353             } else {
354 50           total_size = slots_off + (uint64_t)cap * slot_size;
355             }
356              
357 87           int anonymous = (path == NULL);
358             size_t map_size;
359             void *base;
360              
361 87 100         if (anonymous) {
362             /* Anonymous shared mmap — fork-inherited, no filesystem */
363 21           map_size = (size_t)total_size;
364 21           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
365             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
366 21 50         if (base == MAP_FAILED) {
367 0 0         QUEUE_ERR("mmap(anonymous): %s", strerror(errno));
368 0           return NULL;
369             }
370 21           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
371             } else {
372             /* File-backed shared mmap */
373 66           int fd = open(path, O_RDWR | O_CREAT, 0666);
374 70 50         if (fd < 0) { QUEUE_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
375              
376 66 50         if (flock(fd, LOCK_EX) < 0) {
377 0 0         QUEUE_ERR("flock(%s): %s", path, strerror(errno));
378 0           close(fd); return NULL;
379             }
380              
381             struct stat st;
382 66 50         if (fstat(fd, &st) < 0) {
383 0 0         QUEUE_ERR("fstat(%s): %s", path, strerror(errno));
384 0           flock(fd, LOCK_UN); close(fd); return NULL;
385             }
386              
387 66           int is_new = (st.st_size == 0);
388              
389 66 100         if (!is_new && (uint64_t)st.st_size < sizeof(QueueHeader)) {
    50          
390 0 0         QUEUE_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
391 0           flock(fd, LOCK_UN); close(fd); return NULL;
392             }
393              
394 66 100         if (is_new) {
395 57 50         if (ftruncate(fd, (off_t)total_size) < 0) {
396 0 0         QUEUE_ERR("ftruncate(%s): %s", path, strerror(errno));
397 0           flock(fd, LOCK_UN); close(fd); return NULL;
398             }
399             }
400              
401 66 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
402 66           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
403 66 50         if (base == MAP_FAILED) {
404 0 0         QUEUE_ERR("mmap(%s): %s", path, strerror(errno));
405 0           flock(fd, LOCK_UN); close(fd); return NULL;
406             }
407              
408 66           QueueHeader *hdr = (QueueHeader *)base;
409              
410 66 100         if (!is_new) {
411             /* Validate existing file */
412 27           int valid = (hdr->magic == QUEUE_MAGIC &&
413 9 50         hdr->version == QUEUE_VERSION &&
414 9 100         hdr->mode == mode &&
415 5 50         hdr->capacity > 0 &&
416 5 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
417 5 50         hdr->total_size == (uint64_t)st.st_size &&
418 23 50         hdr->slots_off == sizeof(QueueHeader) &&
    50          
419 5 50         hdr->capacity <= (hdr->total_size - hdr->slots_off) / slot_size);
420 9 100         if (mode == QUEUE_MODE_STR && valid) {
    100          
421 2           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
422 4 50         valid = (hdr->arena_off >= slots_end &&
423 2 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
424             }
425 9 100         if (!valid) {
426 4 50         QUEUE_ERR("%s: invalid or incompatible queue file", path);
427 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
428             }
429 5           cap = hdr->capacity;
430 5           arena_cap = hdr->arena_cap;
431 5           flock(fd, LOCK_UN);
432 5           close(fd);
433 5           goto setup_handle;
434             }
435              
436 57           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
437 57           flock(fd, LOCK_UN);
438 57           close(fd);
439             }
440              
441 83           setup_handle:;
442             {
443 83           QueueHeader *hdr = (QueueHeader *)base;
444 83           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
445 83 50         if (!h) { munmap(base, map_size); return NULL; }
446              
447 83           h->hdr = hdr;
448 83           h->slots = (char *)base + hdr->slots_off;
449 83 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
450 83           h->mmap_size = map_size;
451 83           h->capacity = cap;
452 83           h->cap_mask = cap - 1;
453 83           h->arena_cap = arena_cap;
454 83 100         h->path = path ? strdup(path) : NULL;
455 83           h->notify_fd = -1;
456 83           h->backing_fd = -1;
457              
458 83           return h;
459             }
460             }
461              
462             /* Create queue backed by memfd — shareable via fd passing (SCM_RIGHTS) */
463 12           static QueueHandle *queue_create_memfd(const char *name, uint32_t capacity,
464             uint32_t mode, uint64_t arena_cap_hint,
465             char *errbuf) {
466 12 50         if (errbuf) errbuf[0] = '\0';
467              
468 12           uint32_t cap = queue_next_pow2(capacity);
469 12 50         if (cap == 0) { QUEUE_ERR("invalid capacity"); return NULL; }
    0          
470 12 50         if (mode > QUEUE_MODE_INT16) { QUEUE_ERR("unknown mode %u", mode); return NULL; }
    0          
471              
472 12           uint64_t slots_off = sizeof(QueueHeader);
473 12           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
474 18 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
475 11 100         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
476 5 100         : sizeof(QueueStrSlot);
477 12           uint64_t arena_off = 0, arena_cap = 0, total_size;
478              
479 12 100         if (mode == QUEUE_MODE_STR) {
480 4           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
481 4           arena_off = (slots_end + 7) & ~(uint64_t)7;
482 4           arena_cap = arena_cap_hint;
483 4 50         if (arena_cap < 4096) arena_cap = 4096;
484 4 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
485 4           total_size = arena_off + arena_cap;
486             } else {
487 8           total_size = slots_off + (uint64_t)cap * slot_size;
488             }
489              
490 12 50         int fd = memfd_create(name ? name : "queue", MFD_CLOEXEC | MFD_ALLOW_SEALING);
491 12 50         if (fd < 0) { QUEUE_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
492              
493 12 50         if (ftruncate(fd, (off_t)total_size) < 0) {
494 0 0         QUEUE_ERR("ftruncate(memfd): %s", strerror(errno));
495 0           close(fd); return NULL;
496             }
497              
498             /* Seal size against ftruncate-based SIGBUS attacks via SCM_RIGHTS peers. */
499 12           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
500              
501 12           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
502 12 50         if (base == MAP_FAILED) {
503 0 0         QUEUE_ERR("mmap(memfd): %s", strerror(errno));
504 0           close(fd); return NULL;
505             }
506              
507 12           queue_init_new_header(base, cap, arena_cap, slots_off, arena_off, mode, total_size);
508              
509 12           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
510 12 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
511              
512 12           h->hdr = (QueueHeader *)base;
513 12           h->slots = (char *)base + slots_off;
514 12 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + arena_off : NULL;
515 12           h->mmap_size = (size_t)total_size;
516 12           h->capacity = cap;
517 12           h->cap_mask = cap - 1;
518 12           h->arena_cap = arena_cap;
519 12           h->path = NULL;
520 12           h->notify_fd = -1;
521 12           h->backing_fd = fd;
522              
523 12           return h;
524             }
525              
526             /* Open queue from an existing fd (memfd received via SCM_RIGHTS or dup) */
527 2           static QueueHandle *queue_open_fd(int fd, uint32_t mode, char *errbuf) {
528 2 50         if (errbuf) errbuf[0] = '\0';
529              
530             struct stat st;
531 2 50         if (fstat(fd, &st) < 0) {
532 0 0         QUEUE_ERR("fstat(fd=%d): %s", fd, strerror(errno));
533 0           return NULL;
534             }
535              
536 2 50         if ((uint64_t)st.st_size < sizeof(QueueHeader)) {
537 0 0         QUEUE_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
538 0           return NULL;
539             }
540              
541 2           size_t map_size = (size_t)st.st_size;
542 2           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
543 2 50         if (base == MAP_FAILED) {
544 0 0         QUEUE_ERR("mmap(fd=%d): %s", fd, strerror(errno));
545 0           return NULL;
546             }
547              
548 2           QueueHeader *hdr = (QueueHeader *)base;
549 2           uint64_t slot_size = (mode == QUEUE_MODE_INT) ? sizeof(QueueIntSlot)
550 3 100         : (mode == QUEUE_MODE_INT32) ? sizeof(QueueInt32Slot)
551 2 50         : (mode == QUEUE_MODE_INT16) ? sizeof(QueueInt16Slot)
552 1 50         : sizeof(QueueStrSlot);
553 6           int valid = (hdr->magic == QUEUE_MAGIC &&
554 2 50         hdr->version == QUEUE_VERSION &&
555 2 50         hdr->mode == mode &&
556 2 50         hdr->capacity > 0 &&
557 2 50         (hdr->capacity & (hdr->capacity - 1)) == 0 &&
558 2 50         hdr->total_size == (uint64_t)st.st_size &&
559 6 50         hdr->slots_off == sizeof(QueueHeader) &&
    50          
560 2 50         hdr->capacity <= (hdr->total_size - hdr->slots_off) / slot_size);
561 2 100         if (mode == QUEUE_MODE_STR && valid) {
    50          
562 1           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
563 2 50         valid = (hdr->arena_off >= slots_end &&
564 1 50         hdr->arena_off + hdr->arena_cap <= hdr->total_size);
565             }
566 2 50         if (!valid) {
567 0 0         QUEUE_ERR("fd %d: invalid or incompatible queue", fd);
568 0           munmap(base, map_size);
569 0           return NULL;
570             }
571              
572             /* Dup the fd so caller retains ownership of the original */
573 2           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
574 2 50         if (myfd < 0) {
575 0 0         QUEUE_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
576 0           munmap(base, map_size);
577 0           return NULL;
578             }
579              
580 2           QueueHandle *h = (QueueHandle *)calloc(1, sizeof(QueueHandle));
581 2 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
582              
583 2           h->hdr = hdr;
584 2           h->slots = (char *)base + hdr->slots_off;
585 2 100         h->arena = (mode == QUEUE_MODE_STR) ? (char *)base + hdr->arena_off : NULL;
586 2           h->mmap_size = map_size;
587 2           h->capacity = hdr->capacity;
588 2           h->cap_mask = hdr->capacity - 1;
589 2           h->arena_cap = hdr->arena_cap;
590 2           h->path = NULL;
591 2           h->notify_fd = -1;
592 2           h->backing_fd = myfd;
593              
594 2           return h;
595             }
596              
597 97           static void queue_destroy(QueueHandle *h) {
598 97 50         if (!h) return;
599             /* Only close notify_fd if we own it; caller-supplied fds (via
600             * queue_eventfd_set) remain the caller's responsibility. */
601 97 100         if (h->notify_fd >= 0 && h->notify_fd_owned) close(h->notify_fd);
    100          
602 97 100         if (h->backing_fd >= 0) close(h->backing_fd);
603 97 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
604 97           free(h->copy_buf);
605 97           free(h->path);
606 97           free(h);
607             }
608              
609             /* ================================================================
610             * Int queue macro template: Vyukov bounded MPMC (lock-free)
611             *
612             * DEFINE_INT_QUEUE(prefix, SlotType, ValType, SeqType, DiffType)
613             * generates: queue__try_push, try_pop, push_wait, pop_wait,
614             * peek, size, clear
615             * ================================================================ */
616              
617             #define DEFINE_INT_QUEUE(PFX, SLOT, VTYPE, STYPE, DTYPE) \
618             \
619             static inline int queue_##PFX##_try_push(QueueHandle *h, VTYPE value) { \
620             QueueHeader *hdr = h->hdr; \
621             SLOT *slots = (SLOT *)h->slots; \
622             uint32_t mask = h->cap_mask; \
623             uint64_t pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
624             for (;;) { \
625             SLOT *slot = &slots[pos & mask]; \
626             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
627             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)pos; \
628             if (diff == 0) { \
629             if (__atomic_compare_exchange_n(&hdr->tail, &pos, pos + 1, \
630             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
631             slot->value = value; \
632             __atomic_store_n(&slot->sequence, (STYPE)(pos + 1), \
633             __ATOMIC_RELEASE); \
634             __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED); \
635             queue_wake_consumers(hdr); \
636             return 1; \
637             } \
638             } else if (diff < 0) { \
639             __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED); \
640             return 0; \
641             } else { \
642             pos = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED); \
643             } \
644             } \
645             } \
646             \
647             static inline int queue_##PFX##_try_pop(QueueHandle *h, VTYPE *value) { \
648             QueueHeader *hdr = h->hdr; \
649             SLOT *slots = (SLOT *)h->slots; \
650             uint32_t mask = h->cap_mask; \
651             uint64_t pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
652             for (;;) { \
653             SLOT *slot = &slots[pos & mask]; \
654             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
655             DTYPE diff = (DTYPE)seq - (DTYPE)(STYPE)(pos + 1); \
656             if (diff == 0) { \
657             if (__atomic_compare_exchange_n(&hdr->head, &pos, pos + 1, \
658             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { \
659             *value = slot->value; \
660             __atomic_store_n(&slot->sequence, \
661             (STYPE)(pos + h->capacity), __ATOMIC_RELEASE); \
662             __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED); \
663             queue_wake_producers(hdr); \
664             return 1; \
665             } \
666             } else if (diff < 0) { \
667             __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED); \
668             return 0; \
669             } else { \
670             pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED); \
671             } \
672             } \
673             } \
674             \
675             static int queue_##PFX##_push_wait(QueueHandle *h, VTYPE value, \
676             double timeout) { \
677             if (queue_##PFX##_try_push(h, value)) return 1; \
678             if (timeout == 0) return 0; \
679             QueueHeader *hdr = h->hdr; \
680             struct timespec deadline, remaining; \
681             int has_deadline = (timeout > 0); \
682             if (has_deadline) queue_make_deadline(timeout, &deadline); \
683             for (;;) { \
684             /* Announce waiter BEFORE sampling fseq and re-checking the \
685             * condition. The reverse order races with a waker that pops \
686             * between our check and our announce: the waker reads \
687             * push_waiters==0, skips the push_futex bump, and our subsequent \
688             * FUTEX_WAIT then sleeps on an unchanged seq with no future \
689             * waker to wake it. */ \
690             __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
691             uint32_t fseq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE); \
692             if (queue_##PFX##_try_push(h, value)) { \
693             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
694             return 1; \
695             } \
696             struct timespec *pts = NULL; \
697             if (has_deadline) { \
698             if (!queue_remaining_time(&deadline, &remaining)) { \
699             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
700             return 0; \
701             } \
702             pts = &remaining; \
703             } \
704             long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, \
705             fseq, pts, NULL, 0); \
706             __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE); \
707             if (queue_##PFX##_try_push(h, value)) return 1; \
708             if (rc == -1 && errno == ETIMEDOUT) return 0; \
709             } \
710             } \
711             \
712             static int queue_##PFX##_pop_wait(QueueHandle *h, VTYPE *value, \
713             double timeout) { \
714             if (queue_##PFX##_try_pop(h, value)) return 1; \
715             if (timeout == 0) return 0; \
716             QueueHeader *hdr = h->hdr; \
717             struct timespec deadline, remaining; \
718             int has_deadline = (timeout > 0); \
719             if (has_deadline) queue_make_deadline(timeout, &deadline); \
720             for (;;) { \
721             /* Announce BEFORE sampling fseq / checking condition; see \
722             * queue_##PFX##_push_wait for the missed-wakeup race this \
723             * ordering closes. */ \
724             __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
725             uint32_t fseq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE); \
726             if (queue_##PFX##_try_pop(h, value)) { \
727             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
728             return 1; \
729             } \
730             struct timespec *pts = NULL; \
731             if (has_deadline) { \
732             if (!queue_remaining_time(&deadline, &remaining)) { \
733             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
734             return 0; \
735             } \
736             pts = &remaining; \
737             } \
738             long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, \
739             fseq, pts, NULL, 0); \
740             __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
741             if (queue_##PFX##_try_pop(h, value)) return 1; \
742             if (rc == -1 && errno == ETIMEDOUT) return 0; \
743             } \
744             } \
745             \
746             static inline int queue_##PFX##_peek(QueueHandle *h, VTYPE *value) { \
747             SLOT *slots = (SLOT *)h->slots; \
748             uint64_t pos = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE); \
749             SLOT *slot = &slots[pos & h->cap_mask]; \
750             STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
751             if ((DTYPE)seq - (DTYPE)(STYPE)(pos + 1) == 0) { \
752             *value = slot->value; \
753             return 1; \
754             } \
755             return 0; \
756             } \
757             \
758             static inline uint64_t queue_##PFX##_size(QueueHandle *h) { \
759             uint64_t tail = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED); \
760             uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED); \
761             return tail - head; \
762             } \
763             \
764             static void queue_##PFX##_clear(QueueHandle *h) { \
765             VTYPE tmp; \
766             while (queue_##PFX##_try_pop(h, &tmp)) {} \
767             }
768              
769             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
770 784 100         DEFINE_INT_QUEUE(int, QueueIntSlot, int64_t, uint64_t, int64_t)
  49 100          
  28 100          
  9 50          
  114 50          
  15 50          
  349 50          
  220 50          
    100          
    50          
    50          
    100          
    100          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
    100          
    50          
    50          
771              
772             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
773 310 100         DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)
  17 50          
  6 100          
  1 50          
  103 50          
  0 50          
  145 50          
  38 50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    100          
    50          
    50          
774              
775             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
776 26 0         DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)
  0 0          
  1 0          
  0 0          
  0 0          
  0 0          
  13 0          
  12 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
    50          
    50          
    50          
    50          
    0          
777              
778             /* ================================================================
779             * Str queue: mutex-protected with circular arena
780             * ================================================================ */
781              
782             /* Push one item while mutex is already held. Returns 1=ok, 0=full, -2=too long. */
783 190           static inline int queue_str_push_locked(QueueHandle *h, const char *str,
784             uint32_t len, bool utf8) {
785 190           QueueHeader *hdr = h->hdr;
786              
787 190 50         if (len > QUEUE_STR_LEN_MASK) return -2;
788              
789 190 100         if (hdr->tail - hdr->head >= h->capacity) {
790 11           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
791 11           return 0;
792             }
793              
794 179           uint32_t alloc = (len + 7) & ~7u;
795 179 100         if (alloc == 0) alloc = 8;
796 179 50         if (alloc > h->arena_cap) return -2;
797 179           uint32_t saved_wpos = hdr->arena_wpos;
798 179           uint32_t pos = saved_wpos;
799 179           uint64_t skip = alloc;
800              
801 179 100         if ((uint64_t)pos + alloc > h->arena_cap) {
802 2           skip += h->arena_cap - pos;
803 2           pos = 0;
804             }
805              
806 179 100         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
807 1 50         if (hdr->tail == hdr->head) {
808 0           hdr->arena_wpos = 0;
809 0           hdr->arena_used = 0;
810 0           saved_wpos = 0;
811 0           pos = 0;
812 0           skip = alloc;
813             } else {
814 1           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
815 1           return 0;
816             }
817             }
818              
819 178           memcpy(h->arena + pos, str, len);
820              
821 178           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
822 178           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
823 178           slot->arena_off = pos;
824 178 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
825 178           slot->arena_skip = (uint32_t)skip;
826 178           slot->prev_wpos = saved_wpos;
827              
828 178           hdr->arena_wpos = pos + alloc;
829 178           hdr->arena_used += (uint32_t)skip;
830 178           hdr->tail++;
831 178           __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
832 178           return 1;
833             }
834              
835 184           static inline int queue_str_try_push(QueueHandle *h, const char *str,
836             uint32_t len, bool utf8) {
837 184           queue_mutex_lock(h->hdr);
838 184           int r = queue_str_push_locked(h, str, len, utf8);
839 184           queue_mutex_unlock(h->hdr);
840 184 100         if (r == 1) queue_wake_consumers(h->hdr);
841 184           return r;
842             }
843              
844             /* Pop one item while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
845 194           static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
846             uint32_t *out_len, bool *out_utf8) {
847 194           QueueHeader *hdr = h->hdr;
848              
849 194 100         if (hdr->tail == hdr->head) {
850 30           __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
851 30           return 0;
852             }
853              
854 164           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
855 164           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
856              
857 164           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
858 164           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
859              
860 164 50         if (!queue_ensure_copy_buf(h, len + 1))
861 0           return -1;
862 164 100         if (len > 0)
863 162           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
864 164           h->copy_buf[len] = '\0';
865 164           *out_str = h->copy_buf;
866 164           *out_len = len;
867              
868 164           hdr->arena_used -= slot->arena_skip;
869 164 100         if (hdr->arena_used == 0)
870 49           hdr->arena_wpos = 0;
871              
872 164           hdr->head++;
873 164           __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
874 164           return 1;
875             }
876              
877 163           static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
878             uint32_t *out_len, bool *out_utf8) {
879 163           queue_mutex_lock(h->hdr);
880 163           int r = queue_str_pop_locked(h, out_str, out_len, out_utf8);
881 163           queue_mutex_unlock(h->hdr);
882 163 100         if (r == 1) queue_wake_producers(h->hdr);
883 163           return r;
884             }
885              
886 8           static int queue_str_push_wait(QueueHandle *h, const char *str,
887             uint32_t len, bool utf8, double timeout) {
888 8           int r = queue_str_try_push(h, str, len, utf8);
889 8 100         if (r != 0) return r; /* 1 = success, -2 = too long */
890 3 50         if (timeout == 0) return 0;
891              
892 3           QueueHeader *hdr = h->hdr;
893             struct timespec deadline, remaining;
894 3           int has_deadline = (timeout > 0);
895 3 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
896              
897 0           for (;;) {
898             /* Announce BEFORE sampling seq and re-trying; otherwise a popper
899             * that drains between try_push and waiters++ reads waiters==0,
900             * skips the push_futex bump, and our FUTEX_WAIT then sleeps on
901             * an unchanged seq with no future waker. */
902 3           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
903 3           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
904 3           r = queue_str_try_push(h, str, len, utf8);
905 3 50         if (r != 0) {
906 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
907 0           return r;
908             }
909              
910 3           struct timespec *pts = NULL;
911 3 50         if (has_deadline) {
912 3 50         if (!queue_remaining_time(&deadline, &remaining)) {
913 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
914 0           return 0;
915             }
916 3           pts = &remaining;
917             }
918 3           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
919 3           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
920              
921 3           r = queue_str_try_push(h, str, len, utf8);
922 3 100         if (r != 0) return r;
923 2 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
924             }
925             }
926              
927 57           static int queue_str_pop_wait(QueueHandle *h, const char **out_str,
928             uint32_t *out_len, bool *out_utf8, double timeout) {
929 57           int r = queue_str_try_pop(h, out_str, out_len, out_utf8);
930 57 100         if (r != 0) return r;
931 7 50         if (timeout == 0) return 0;
932              
933 7           QueueHeader *hdr = h->hdr;
934             struct timespec deadline, remaining;
935 7           int has_deadline = (timeout > 0);
936 7 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
937              
938 1           for (;;) {
939             /* Announce BEFORE sampling seq / re-trying; see push_wait. */
940 8           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
941 8           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
942 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
943 8 50         if (r != 0) {
944 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
945 0           return r;
946             }
947              
948 8           struct timespec *pts = NULL;
949 8 50         if (has_deadline) {
950 8 50         if (!queue_remaining_time(&deadline, &remaining)) {
951 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
952 0           return 0;
953             }
954 8           pts = &remaining;
955             }
956 8           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
957 8           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
958              
959 8           r = queue_str_try_pop(h, out_str, out_len, out_utf8);
960 8 100         if (r != 0) return r;
961 3 100         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
962             }
963             }
964              
965 26           static inline uint64_t queue_str_size(QueueHandle *h) {
966 26           QueueHeader *hdr = h->hdr;
967 26           uint64_t tail = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
968 26           uint64_t head = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED);
969 26           return tail - head; /* unsigned wrap is correct for push_front (head > tail) */
970             }
971              
972 7           static void queue_str_clear(QueueHandle *h) {
973 7           QueueHeader *hdr = h->hdr;
974 7           queue_mutex_lock(hdr);
975 7           hdr->head = 0;
976 7           hdr->tail = 0;
977 7           hdr->arena_wpos = 0;
978 7           hdr->arena_used = 0;
979 7           queue_mutex_unlock(hdr);
980             /* clear is a bulk transition — wake every blocked producer and
981             * consumer so they re-evaluate state, not just one of each. */
982 7 50         if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
983 0           __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
984 0           syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
985             }
986 7 50         if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
987 0           __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
988 0           syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
989             }
990 7           }
991              
992             /* Peek: read front element without consuming (exact, under mutex). */
993 7           static inline int queue_str_peek(QueueHandle *h, const char **out_str,
994             uint32_t *out_len, bool *out_utf8) {
995 7           QueueHeader *hdr = h->hdr;
996 7           queue_mutex_lock(hdr);
997 7 100         if (hdr->tail == hdr->head) {
998 1           queue_mutex_unlock(hdr);
999 1           return 0;
1000             }
1001 6           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
1002 6           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1003 6           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
1004 6           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
1005 6 50         if (!queue_ensure_copy_buf(h, len + 1)) {
1006 0           queue_mutex_unlock(hdr);
1007 0           return -1;
1008             }
1009 6 50         if (len > 0)
1010 6           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
1011 6           h->copy_buf[len] = '\0';
1012 6           *out_str = h->copy_buf;
1013 6           *out_len = len;
1014 6           queue_mutex_unlock(hdr);
1015 6           return 1;
1016             }
1017              
1018             /* Push to front of queue (requeue). Str only — Int is strictly FIFO. */
1019 21           static inline int queue_str_push_front(QueueHandle *h, const char *str,
1020             uint32_t len, bool utf8) {
1021 21           QueueHeader *hdr = h->hdr;
1022 21           queue_mutex_lock(hdr);
1023              
1024 21 50         if (len > QUEUE_STR_LEN_MASK) {
1025 0           queue_mutex_unlock(hdr);
1026 0           return -2;
1027             }
1028              
1029 21           uint64_t size = hdr->tail - hdr->head;
1030 21 100         if (size >= h->capacity) {
1031 6           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
1032 6           queue_mutex_unlock(hdr);
1033 6           return 0;
1034             }
1035              
1036 15           uint32_t alloc = (len + 7) & ~7u;
1037 15 50         if (alloc == 0) alloc = 8;
1038 15 50         if (alloc > h->arena_cap) {
1039 0           queue_mutex_unlock(hdr);
1040 0           return -2;
1041             }
1042 15           uint32_t saved_wpos = hdr->arena_wpos;
1043 15           uint32_t pos = saved_wpos;
1044 15           uint64_t skip = alloc;
1045              
1046 15 50         if ((uint64_t)pos + alloc > h->arena_cap) {
1047 0           skip += h->arena_cap - pos;
1048 0           pos = 0;
1049             }
1050              
1051 15 50         if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
1052 0 0         if (hdr->tail == hdr->head) {
1053 0           hdr->arena_wpos = 0;
1054 0           hdr->arena_used = 0;
1055 0           saved_wpos = 0;
1056 0           pos = 0;
1057 0           skip = alloc;
1058             } else {
1059 0           __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
1060 0           queue_mutex_unlock(hdr);
1061 0           return 0;
1062             }
1063             }
1064              
1065 15           memcpy(h->arena + pos, str, len);
1066              
1067 15           hdr->head--;
1068 15           uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
1069 15           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1070 15           slot->arena_off = pos;
1071 15 100         slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
1072 15           slot->arena_skip = (uint32_t)skip;
1073 15           slot->prev_wpos = saved_wpos;
1074              
1075 15           hdr->arena_wpos = pos + alloc;
1076 15           hdr->arena_used += (uint32_t)skip;
1077 15           __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
1078              
1079 15           queue_mutex_unlock(hdr);
1080 15           queue_wake_consumers(hdr);
1081 15           return 1;
1082             }
1083              
1084 3           static int queue_str_push_front_wait(QueueHandle *h, const char *str,
1085             uint32_t len, bool utf8, double timeout) {
1086 3           int r = queue_str_push_front(h, str, len, utf8);
1087 3 100         if (r != 0) return r;
1088 2 50         if (timeout == 0) return 0;
1089              
1090 2           QueueHeader *hdr = h->hdr;
1091             struct timespec deadline, remaining;
1092 2           int has_deadline = (timeout > 0);
1093 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1094              
1095 0           for (;;) {
1096             /* Announce BEFORE sampling seq / re-trying; see push_wait. */
1097 2           __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1098 2           uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
1099 2           r = queue_str_push_front(h, str, len, utf8);
1100 2 50         if (r != 0) {
1101 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1102 0           return r;
1103             }
1104              
1105 2           struct timespec *pts = NULL;
1106 2 50         if (has_deadline) {
1107 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1108 0           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1109 0           return 0;
1110             }
1111 2           pts = &remaining;
1112             }
1113 2           long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1114 2           __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
1115              
1116 2           r = queue_str_push_front(h, str, len, utf8);
1117 2 100         if (r != 0) return r;
1118 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1119             }
1120             }
1121              
1122             /* Pop from back (tail). Str only — undoes the most recent push. */
1123 26           static inline int queue_str_pop_back(QueueHandle *h, const char **out_str,
1124             uint32_t *out_len, bool *out_utf8) {
1125 26           QueueHeader *hdr = h->hdr;
1126 26           queue_mutex_lock(hdr);
1127              
1128 26 100         if (hdr->tail == hdr->head) {
1129 7           __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
1130 7           queue_mutex_unlock(hdr);
1131 7           return 0;
1132             }
1133              
1134 19           hdr->tail--;
1135 19           uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
1136 19           QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
1137              
1138 19           uint32_t len = slot->packed_len & QUEUE_STR_LEN_MASK;
1139 19           *out_utf8 = (slot->packed_len & QUEUE_STR_UTF8_FLAG) != 0;
1140              
1141 19 50         if (!queue_ensure_copy_buf(h, len + 1)) {
1142 0           hdr->tail++; /* rollback */
1143 0           queue_mutex_unlock(hdr);
1144 0           return -1;
1145             }
1146 19 50         if (len > 0)
1147 19           memcpy(h->copy_buf, h->arena + slot->arena_off, len);
1148 19           h->copy_buf[len] = '\0';
1149 19           *out_str = h->copy_buf;
1150 19           *out_len = len;
1151              
1152 19           hdr->arena_used -= slot->arena_skip;
1153             /* Restore arena_wpos to before this slot's push if it's the frontier.
1154             * prev_wpos correctly handles wrap waste — it's the pre-push state
1155             * including the original position before any wrap adjustment. */
1156             {
1157 19           uint32_t slot_alloc = (len + 7) & ~7u;
1158 19 50         if (slot_alloc == 0) slot_alloc = 8;
1159 19 100         if (slot->arena_off + slot_alloc == hdr->arena_wpos)
1160 16           hdr->arena_wpos = slot->prev_wpos;
1161             }
1162 19 100         if (hdr->arena_used == 0)
1163 7           hdr->arena_wpos = 0;
1164              
1165 19           __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
1166 19           queue_mutex_unlock(hdr);
1167 19           queue_wake_producers(hdr);
1168 19           return 1;
1169             }
1170              
1171 3           static int queue_str_pop_back_wait(QueueHandle *h, const char **out_str,
1172             uint32_t *out_len, bool *out_utf8, double timeout) {
1173 3           int r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1174 3 100         if (r != 0) return r;
1175 2 50         if (timeout == 0) return 0;
1176              
1177 2           QueueHeader *hdr = h->hdr;
1178             struct timespec deadline, remaining;
1179 2           int has_deadline = (timeout > 0);
1180 2 50         if (has_deadline) queue_make_deadline(timeout, &deadline);
1181              
1182 0           for (;;) {
1183             /* Announce BEFORE sampling seq / re-trying; see pop_wait. */
1184 2           __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1185 2           uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
1186 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1187 2 50         if (r != 0) {
1188 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1189 0           return r;
1190             }
1191              
1192 2           struct timespec *pts = NULL;
1193 2 50         if (has_deadline) {
1194 2 50         if (!queue_remaining_time(&deadline, &remaining)) {
1195 0           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1196 0           return 0;
1197             }
1198 2           pts = &remaining;
1199             }
1200 2           long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
1201 2           __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
1202              
1203 2           r = queue_str_pop_back(h, out_str, out_len, out_utf8);
1204 2 100         if (r != 0) return r;
1205 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
1206             }
1207             }
1208              
1209             /* msync — flush mmap to disk for crash durability */
1210 5           static inline int queue_sync(QueueHandle *h) {
1211 5           return msync(h->hdr, h->mmap_size, MS_SYNC);
1212             }
1213              
1214             /* ================================================================
1215             * eventfd — event-loop integration
1216             * ================================================================ */
1217              
1218 11           static inline int queue_eventfd_create(QueueHandle *h) {
1219 11 100         if (h->notify_fd >= 0) return h->notify_fd;
1220 10           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1221 10 50         if (h->notify_fd >= 0) h->notify_fd_owned = 1;
1222 10           return h->notify_fd;
1223             }
1224              
1225             /* Replace the current notify fd. Closes the old fd ONLY if we created
1226             * it ourselves via queue_eventfd_create — caller-supplied fds remain
1227             * the caller's property. */
1228 1           static inline void queue_eventfd_set(QueueHandle *h, int fd) {
1229 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd && h->notify_fd_owned)
    50          
    50          
1230 1           close(h->notify_fd);
1231 1           h->notify_fd = fd;
1232 1           h->notify_fd_owned = 0;
1233 1           }
1234              
1235             /* Signal that data is available. Called after successful push. */
1236 8           static inline void queue_notify(QueueHandle *h) {
1237 8 100         if (h->notify_fd >= 0) {
1238 7           uint64_t one = 1;
1239 7           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
1240             }
1241 8           }
1242              
1243             /* Consume notification counter. Call from event-loop callback before pop.
1244             * Returns the accumulated counter, or -1 on error / no eventfd. */
1245 10           static inline int64_t queue_eventfd_consume(QueueHandle *h) {
1246 10 50         if (h->notify_fd < 0) return -1;
1247 10           uint64_t val = 0;
1248 10 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1249 10           return (int64_t)val;
1250             }
1251              
1252             #endif /* QUEUE_H */