File Coverage

deque.h
Criterion Covered Total %
statement 321 361 88.9
branch 142 266 53.3
condition n/a
subroutine n/a
pod n/a
total 463 627 73.8


line stmt bran cond sub pod time code
1             /*
2             * deque.h -- Fixed-size shared-memory double-ended queue for Linux
3             *
4             * Ring buffer with CAS-based push/pop at both ends, plus a per-slot
5             * publication state machine for MPMC safety:
6             *
7             * EMPTY -> WRITING (pusher claims slot)
8             * WRITING-> FILLED (pusher publishes value)
9             * FILLED -> READING (popper claims slot)
10             * READING-> EMPTY (popper releases, generation bumps)
11             *
12             * The slot's ctl word encodes (generation << 2) | state_bits, and
13             * transitions are made via CAS. Head/tail still serialize the POSITION
14             * handout; the per-slot state machine serializes the VALUE handoff.
15             * A consumer that wins the head/tail CAS always waits for the matching
16             * publisher's transition to FILLED before reading.
17             *
18             * Futex blocking when empty or full.
19             */
20              
21             #ifndef DEQUE_H
22             #define DEQUE_H
23              
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37             #include
38             #include
39              
40             #define DEQ_MAGIC 0x44455132U /* "DEQ2" — v2 layout (per-slot ctl) */
41             #define DEQ_VERSION 2
42             #define DEQ_ERR_BUFLEN 256
43              
44             /* Drain-time recovery: how long to wait for a slot stuck in WRITING before
45             * declaring its pusher dead and force-skipping. Matches the slot-stuck
46             * recovery timeout used in sister Data-*-Shared modules (e.g. Stack). */
47             #define DEQ_DRAIN_RECOVERY_SEC 2
48              
49             #define DEQ_VAR_INT 0
50             #define DEQ_VAR_STR 1
51              
52             /* Slot state (low 2 bits of ctl word). Upper 62 bits = generation. */
53             #define DEQ_SLOT_EMPTY 0u
54             #define DEQ_SLOT_WRITING 1u
55             #define DEQ_SLOT_FILLED 2u
56             #define DEQ_SLOT_READING 3u
57             #define DEQ_SLOT_STATE_MASK 3u
58             #define DEQ_SLOT_STATE(c) ((uint32_t)((c) & DEQ_SLOT_STATE_MASK))
59             #define DEQ_SLOT_GEN(c) ((c) >> 2)
60              
61             /* Combined cursor: upper 32 bits = head, lower 32 bits = tail. A single
62             * 64-bit CAS atomically updates both ends, so push_front vs push_back (or
63             * pop_front vs pop_back) cannot both succeed when they share a boundary
64             * slot. Capacity is bounded by 2^31 elements. Head and tail wrap mod 2^32
65             * after 4B ops; size = (tail - head) treated as uint32.
66             */
67             typedef struct {
68             uint32_t magic;
69             uint32_t version;
70             uint32_t elem_size;
71             uint32_t variant_id;
72             uint64_t capacity;
73             uint64_t total_size;
74             uint64_t data_off;
75             uint64_t ctl_off; /* offset to per-slot ctl array */
76             uint8_t _pad0[16];
77              
78             uint64_t cursor; /* 64: (head<<32)|tail */
79             uint32_t waiters_push; /* 72 */
80             uint32_t waiters_pop; /* 76 */
81             uint64_t stat_pushes; /* 80 */
82             uint64_t stat_pops; /* 88 */
83             uint64_t stat_waits; /* 96 */
84             uint64_t stat_timeouts; /* 104 */
85             uint32_t push_wake_seq; /* 112: bumped by every pop, futex word for pushers */
86             uint32_t pop_wake_seq; /* 116: bumped by every push, futex word for poppers */
87             uint64_t stat_recoveries; /* 120: drain-time recovery of stuck slots (WRITING or EMPTY) */
88             } DeqHeader;
89              
90             #define DEQ_CURSOR(head, tail) (((uint64_t)(head) << 32) | (uint32_t)(tail))
91             #define DEQ_CURSOR_HEAD(c) ((uint32_t)((c) >> 32))
92             #define DEQ_CURSOR_TAIL(c) ((uint32_t)(c))
93             #define DEQ_CURSOR_SIZE(c) ((uint32_t)(DEQ_CURSOR_TAIL(c) - DEQ_CURSOR_HEAD(c)))
94              
95             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
96             _Static_assert(sizeof(DeqHeader) == 128, "DeqHeader must be 128 bytes");
97             #endif
98              
99             typedef struct {
100             DeqHeader *hdr;
101             uint8_t *data;
102             uint64_t *ctl; /* per-slot state+generation word */
103             size_t mmap_size;
104             uint32_t elem_size;
105             char *path;
106             int notify_fd;
107             int backing_fd;
108             } DeqHandle;
109              
110             /* ================================================================ */
111              
112 7           static inline void deq_make_deadline(double t, struct timespec *dl) {
113 7           clock_gettime(CLOCK_MONOTONIC, dl);
114 7           dl->tv_sec += (time_t)t;
115 7           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
116 7 100         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
117 7           }
118              
119 20341           static inline int deq_remaining(const struct timespec *dl, struct timespec *rem) {
120             struct timespec now;
121 20341           clock_gettime(CLOCK_MONOTONIC, &now);
122 20341           rem->tv_sec = dl->tv_sec - now.tv_sec;
123 20341           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
124 20341 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
125 20341           return rem->tv_sec >= 0;
126             }
127              
128 18153           static inline uint8_t *deq_slot(DeqHandle *h, uint32_t idx) {
129 18153           return h->data + (idx % (uint32_t)h->hdr->capacity) * h->elem_size;
130             }
131              
132 25           static inline uint32_t deq_size(DeqHandle *h) {
133 25           uint64_t c = __atomic_load_n(&h->hdr->cursor, __ATOMIC_ACQUIRE);
134 25           return DEQ_CURSOR_SIZE(c);
135             }
136              
137 1301248           static inline void deq_spin_pause(void) {
138             #if defined(__x86_64__) || defined(__i386__)
139 1301248           __asm__ volatile("pause" ::: "memory");
140             #elif defined(__aarch64__)
141             __asm__ volatile("yield" ::: "memory");
142             #endif
143 1301248           }
144              
145             /* --- per-slot state machine helpers ---
146             * Claim a slot for writing: spin CAS until we observe EMPTY and can mark
147             * WRITING. Returns the generation that was observed, for the matching
148             * publish. Caller holds the position CAS so at most one pusher targets
149             * this slot, but a pending popper from the previous cycle may still be
150             * finishing; the spin is bounded by that popper's READING -> EMPTY store.
151             */
152 98           static inline uint64_t deq_slot_claim_write(uint64_t *ctl_word) {
153 0           for (;;) {
154 98           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
155 98 50         if (DEQ_SLOT_STATE(c) == DEQ_SLOT_EMPTY) {
156 98           uint64_t nc = (DEQ_SLOT_GEN(c) << 2) | DEQ_SLOT_WRITING;
157 98 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
158             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
159 98           return DEQ_SLOT_GEN(c);
160             }
161 0           deq_spin_pause();
162             }
163             }
164              
165             /* Publish written value: WRITING@gen -> FILLED@gen. Implemented as CAS
166             * (not a plain store) so that if deq_drain force-recovered the slot mid-
167             * write — bumping it to EMPTY@(gen+1) — this publish is a no-op rather
168             * than clobbering the recovered state back to FILLED@gen. That would
169             * leave a phantom FILLED at a stale gen which the next pusher's
170             * deq_slot_claim_write (waits on EMPTY) could never advance past,
171             * deadlocking that slot forever. The caller's cursor CAS was already
172             * committed, so on lost-race the value is silently dropped — matching
173             * the documented drain-recovery semantics. */
174 98           static inline void deq_slot_publish(uint64_t *ctl_word, uint64_t gen) {
175 98           uint64_t expected = (gen << 2) | DEQ_SLOT_WRITING;
176 98           uint64_t desired = (gen << 2) | DEQ_SLOT_FILLED;
177 98           (void)__atomic_compare_exchange_n(ctl_word, &expected, desired,
178             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
179 98           }
180              
181             /* Claim a slot for reading: spin CAS until we observe FILLED and mark READING. */
182 18055           static inline uint64_t deq_slot_claim_read(uint64_t *ctl_word) {
183 0           for (;;) {
184 18055           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
185 18055 50         if (DEQ_SLOT_STATE(c) == DEQ_SLOT_FILLED) {
186 18055           uint64_t nc = (DEQ_SLOT_GEN(c) << 2) | DEQ_SLOT_READING;
187 18055 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
188             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
189 18055           return DEQ_SLOT_GEN(c);
190             }
191 0           deq_spin_pause();
192             }
193             }
194              
195             /* Release slot after read: READING -> EMPTY with gen+1. */
196 18066           static inline void deq_slot_release(uint64_t *ctl_word, uint64_t gen) {
197 18066           __atomic_store_n(ctl_word, ((gen + 1) << 2) | DEQ_SLOT_EMPTY, __ATOMIC_RELEASE);
198 18066           }
199              
200             /* ================================================================
201             * Push back (tail++)
202             * ================================================================ */
203              
204 93           static inline int deq_try_push_back(DeqHandle *h, const void *val, uint32_t vlen) {
205 93           DeqHeader *hdr = h->hdr;
206 93           uint32_t cap = (uint32_t)hdr->capacity;
207 0           for (;;) {
208 93           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
209 93           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
210 181 100         if ((uint32_t)(t - hd) >= cap) return 0;
211 88           uint64_t nc = DEQ_CURSOR(hd, t + 1);
212 88 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
213             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
214 88           uint32_t sz = h->elem_size;
215 88           uint32_t cp = vlen < sz ? vlen : sz;
216 88           uint32_t idx = t % cap;
217 88           uint64_t gen = deq_slot_claim_write(&h->ctl[idx]);
218 88           memcpy(deq_slot(h, t), val, cp);
219 88 50         if (cp < sz) memset(deq_slot(h, t) + cp, 0, sz - cp);
220 88           deq_slot_publish(&h->ctl[idx], gen);
221 88           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
222 88 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
223 0           __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
224 0           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
225             }
226 88           return 1;
227             }
228             }
229             }
230              
231             /* ================================================================
232             * Push front (head--)
233             * ================================================================ */
234              
235 15           static inline int deq_try_push_front(DeqHandle *h, const void *val, uint32_t vlen) {
236 15           DeqHeader *hdr = h->hdr;
237 15           uint32_t cap = (uint32_t)hdr->capacity;
238 0           for (;;) {
239 15           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
240 15           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
241 25 100         if ((uint32_t)(t - hd) >= cap) return 0;
242 10           uint32_t new_hd = hd - 1;
243 10           uint64_t nc = DEQ_CURSOR(new_hd, t);
244 10 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
245             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
246 10           uint32_t sz = h->elem_size;
247 10           uint32_t cp = vlen < sz ? vlen : sz;
248 10           uint32_t idx = new_hd % cap;
249 10           uint64_t gen = deq_slot_claim_write(&h->ctl[idx]);
250 10           memcpy(deq_slot(h, new_hd), val, cp);
251 10 50         if (cp < sz) memset(deq_slot(h, new_hd) + cp, 0, sz - cp);
252 10           deq_slot_publish(&h->ctl[idx], gen);
253 10           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
254 10 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
255 0           __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
256 0           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
257             }
258 10           return 1;
259             }
260             }
261             }
262              
263             /* ================================================================
264             * Pop front (head++)
265             * ================================================================ */
266              
267 18051           static inline int deq_try_pop_front(DeqHandle *h, void *out) {
268 18051           DeqHeader *hdr = h->hdr;
269 18051           uint32_t cap = (uint32_t)hdr->capacity;
270 0           for (;;) {
271 18051           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
272 18051           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
273 36096 100         if ((uint32_t)(t - hd) == 0) return 0;
274 18045           uint64_t nc = DEQ_CURSOR(hd + 1, t);
275 18045 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
276             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
277 18045           uint32_t idx = hd % cap;
278 18045           uint64_t gen = deq_slot_claim_read(&h->ctl[idx]);
279 18045           memcpy(out, deq_slot(h, hd), h->elem_size);
280 18045           deq_slot_release(&h->ctl[idx], gen);
281 18045           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
282 18045 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
283 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
284 0           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
285             }
286 18045           return 1;
287             }
288             }
289             }
290              
291             /* ================================================================
292             * Pop back (tail--)
293             * ================================================================ */
294              
295 14           static inline int deq_try_pop_back(DeqHandle *h, void *out) {
296 14           DeqHeader *hdr = h->hdr;
297 14           uint32_t cap = (uint32_t)hdr->capacity;
298 0           for (;;) {
299 14           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
300 14           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
301 24 100         if ((uint32_t)(t - hd) == 0) return 0;
302 10           uint64_t nc = DEQ_CURSOR(hd, t - 1);
303 10 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
304             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
305 10           uint32_t idx = (t - 1) % cap;
306 10           uint64_t gen = deq_slot_claim_read(&h->ctl[idx]);
307 10           memcpy(out, deq_slot(h, t - 1), h->elem_size);
308 10           deq_slot_release(&h->ctl[idx], gen);
309 10           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
310 10 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
311 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
312 0           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
313             }
314 10           return 1;
315             }
316             }
317             }
318              
319             /* ================================================================
320             * Blocking push/pop
321             * ================================================================ */
322              
323 2           static inline int deq_push_wait(DeqHandle *h, const void *val, uint32_t vlen,
324             int front, double timeout) {
325 2           int (*try_fn)(DeqHandle*, const void*, uint32_t) =
326 2 100         front ? deq_try_push_front : deq_try_push_back;
327 2 50         if (try_fn(h, val, vlen)) return 1;
328 2 50         if (timeout == 0) return 0;
329              
330 2           DeqHeader *hdr = h->hdr;
331             struct timespec dl, rem;
332 2           int has_dl = (timeout > 0);
333 2 50         if (has_dl) deq_make_deadline(timeout, &dl);
334 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
335              
336 2           uint32_t cap = (uint32_t)hdr->capacity;
337 0           for (;;) {
338 2           uint32_t wseq = __atomic_load_n(&hdr->push_wake_seq, __ATOMIC_ACQUIRE);
339 2           __atomic_add_fetch(&hdr->waiters_push, 1, __ATOMIC_RELEASE);
340 2           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
341 2 50         if (DEQ_CURSOR_SIZE(c) >= cap) {
342 2           struct timespec *pts = NULL;
343 2 50         if (has_dl) {
344 2 50         if (!deq_remaining(&dl, &rem)) {
345 0           __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
346 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
347 0           return 0;
348             }
349 2           pts = &rem;
350             }
351 2           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
352             }
353 2           __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
354 2 50         if (try_fn(h, val, vlen)) return 1;
355 2 50         if (has_dl && !deq_remaining(&dl, &rem)) {
    50          
356 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
357 2           return 0;
358             }
359             }
360             }
361              
362 3           static inline int deq_pop_wait(DeqHandle *h, void *out, int back, double timeout) {
363 3           int (*try_fn)(DeqHandle*, void*) =
364 3 100         back ? deq_try_pop_back : deq_try_pop_front;
365 3 50         if (try_fn(h, out)) return 1;
366 3 50         if (timeout == 0) return 0;
367              
368 3           DeqHeader *hdr = h->hdr;
369             struct timespec dl, rem;
370 3           int has_dl = (timeout > 0);
371 3 50         if (has_dl) deq_make_deadline(timeout, &dl);
372 3           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
373              
374 0           for (;;) {
375 3           uint32_t wseq = __atomic_load_n(&hdr->pop_wake_seq, __ATOMIC_ACQUIRE);
376 3           __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
377 3           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
378 3 50         if (DEQ_CURSOR_SIZE(c) == 0) {
379 3           struct timespec *pts = NULL;
380 3 50         if (has_dl) {
381 3 50         if (!deq_remaining(&dl, &rem)) {
382 0           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
383 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
384 0           return 0;
385             }
386 3           pts = &rem;
387             }
388 3           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
389             }
390 3           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
391 3 100         if (try_fn(h, out)) return 1;
392 2 50         if (has_dl && !deq_remaining(&dl, &rem)) {
    50          
393 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
394 2           return 0;
395             }
396             }
397             }
398              
399             /* ================================================================
400             * Create / Open / Close
401             * ================================================================ */
402              
403             #define DEQ_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, DEQ_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
404              
405             /* Layout offsets — data array first, then 8-byte-aligned ctl array. */
406 45           static inline uint64_t deq_ctl_offset(uint32_t elem_size, uint64_t capacity) {
407 45           uint64_t data_end = sizeof(DeqHeader) + capacity * elem_size;
408 45           return (data_end + 7u) & ~(uint64_t)7u;
409             }
410              
411 25           static inline uint64_t deq_total_size(uint32_t elem_size, uint64_t capacity) {
412 25           return deq_ctl_offset(elem_size, capacity) + capacity * sizeof(uint64_t);
413             }
414              
415 17           static inline void deq_init_header(void *base, uint64_t total,
416             uint32_t elem_size, uint32_t variant_id,
417             uint64_t capacity) {
418 17           DeqHeader *hdr = (DeqHeader *)base;
419 17           memset(base, 0, (size_t)total); /* zeroes data + ctl → all slots EMPTY, gen=0 */
420 17           hdr->magic = DEQ_MAGIC;
421 17           hdr->version = DEQ_VERSION;
422 17           hdr->elem_size = elem_size;
423 17           hdr->variant_id = variant_id;
424 17           hdr->capacity = capacity;
425 17           hdr->total_size = total;
426 17           hdr->data_off = sizeof(DeqHeader);
427 17           hdr->ctl_off = deq_ctl_offset(elem_size, capacity);
428 17           __atomic_thread_fence(__ATOMIC_SEQ_CST);
429 17           }
430              
431 20           static inline DeqHandle *deq_setup(void *base, size_t ms, const char *path, int bfd) {
432 20           DeqHeader *hdr = (DeqHeader *)base;
433 20           DeqHandle *h = (DeqHandle *)calloc(1, sizeof(DeqHandle));
434 20 50         if (!h) { munmap(base, ms); return NULL; }
435 20           h->hdr = hdr;
436 20           h->data = (uint8_t *)base + hdr->data_off;
437 20           h->ctl = (uint64_t *)((uint8_t *)base + hdr->ctl_off);
438 20           h->mmap_size = ms;
439 20           h->elem_size = hdr->elem_size;
440 20 100         h->path = path ? strdup(path) : NULL;
441 20           h->notify_fd = -1;
442 20           h->backing_fd = bfd;
443 20           return h;
444             }
445              
446             /* Validate a mapped header (shared by deq_create reopen and deq_open_fd). */
447 6           static inline int deq_validate_header(const DeqHeader *hdr, uint64_t file_size,
448             uint32_t expected_variant) {
449 6 100         if (hdr->magic != DEQ_MAGIC) return 0;
450 5 50         if (hdr->version != DEQ_VERSION) return 0;
451 5 50         if (hdr->variant_id != expected_variant) return 0;
452 5 50         if (hdr->elem_size == 0 || hdr->capacity == 0) return 0;
    50          
453 5 50         if (hdr->capacity > 0x7FFFFFFFu) return 0;
454             /* Variant-specific elem_size sanity: prevents buffer overflows in the
455             * XS push paths if a corrupted/tampered file claims an impossibly-small
456             * elem_size (e.g. < 4 for a Str variant where push writes a 4-byte
457             * length prefix). */
458 5 100         if (expected_variant == DEQ_VAR_INT && hdr->elem_size != sizeof(int64_t))
    100          
459 1           return 0;
460 4 100         if (expected_variant == DEQ_VAR_STR && hdr->elem_size < sizeof(uint32_t) + 1)
    100          
461 1           return 0;
462 3 50         if (hdr->total_size != file_size) return 0;
463 3 50         if (hdr->data_off != sizeof(DeqHeader)) return 0;
464 3 50         if (hdr->ctl_off != deq_ctl_offset(hdr->elem_size, hdr->capacity)) return 0;
465 3 50         if (hdr->total_size != deq_total_size(hdr->elem_size, hdr->capacity)) return 0;
466 3           return 1;
467             }
468              
469 21           static DeqHandle *deq_create(const char *path, uint64_t capacity,
470             uint32_t elem_size, uint32_t variant_id,
471             char *errbuf) {
472 21 50         if (errbuf) errbuf[0] = '\0';
473 21 50         if (capacity == 0) { DEQ_ERR("capacity must be > 0"); return NULL; }
    0          
474 21 50         if (elem_size == 0) { DEQ_ERR("elem_size must be > 0"); return NULL; }
    0          
475 21 50         if (capacity > 0x7FFFFFFFu) {
476 0 0         DEQ_ERR("capacity must be <= 2^31 (32-bit cursor halves)"); return NULL;
477             }
478              
479 21           uint64_t total = deq_total_size(elem_size, capacity);
480 21           int anonymous = (path == NULL);
481 21           int fd = -1;
482             size_t map_size;
483             void *base;
484              
485 21 100         if (anonymous) {
486 11           map_size = (size_t)total;
487 11           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
488 11 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
489             } else {
490 10           fd = open(path, O_RDWR|O_CREAT, 0666);
491 15 50         if (fd < 0) { DEQ_ERR("open: %s", strerror(errno)); return NULL; }
    0          
492 10 50         if (flock(fd, LOCK_EX) < 0) { DEQ_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
493             struct stat st;
494 10 50         if (fstat(fd, &st) < 0) {
495 0 0         DEQ_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
496             }
497 10           int is_new = (st.st_size == 0);
498 10 100         if (!is_new && (uint64_t)st.st_size < sizeof(DeqHeader)) {
    50          
499 0 0         DEQ_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
500 0           flock(fd, LOCK_UN); close(fd); return NULL;
501             }
502 10 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
503 0 0         DEQ_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
504             }
505 10 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
506 10           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
507 10 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
508 10 100         if (!is_new) {
509 5 100         if (!deq_validate_header((DeqHeader *)base, (uint64_t)st.st_size, variant_id)) {
510 3 50         DEQ_ERR("invalid deque file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
511             }
512 2           flock(fd, LOCK_UN); close(fd);
513 2           return deq_setup(base, map_size, path, -1);
514             }
515             }
516 16           deq_init_header(base, total, elem_size, variant_id, capacity);
517 16 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
518 16           return deq_setup(base, map_size, path, -1);
519             }
520              
521 1           static DeqHandle *deq_create_memfd(const char *name, uint64_t capacity,
522             uint32_t elem_size, uint32_t variant_id,
523             char *errbuf) {
524 1 50         if (errbuf) errbuf[0] = '\0';
525 1 50         if (capacity == 0) { DEQ_ERR("capacity must be > 0"); return NULL; }
    0          
526 1 50         if (elem_size == 0) { DEQ_ERR("elem_size must be > 0"); return NULL; }
    0          
527 1 50         if (capacity > 0x7FFFFFFFu) {
528 0 0         DEQ_ERR("capacity must be <= 2^31 (32-bit cursor halves)"); return NULL;
529             }
530 1           uint64_t total = deq_total_size(elem_size, capacity);
531 1 50         int fd = memfd_create(name ? name : "deque", MFD_CLOEXEC | MFD_ALLOW_SEALING);
532 1 50         if (fd < 0) { DEQ_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
533 1 50         if (ftruncate(fd, (off_t)total) < 0) { DEQ_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
534 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
535 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
536 1 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
537 1           deq_init_header(base, total, elem_size, variant_id, capacity);
538 1           return deq_setup(base, (size_t)total, NULL, fd);
539             }
540              
541 1           static DeqHandle *deq_open_fd(int fd, uint32_t variant_id, char *errbuf) {
542 1 50         if (errbuf) errbuf[0] = '\0';
543             struct stat st;
544 1 50         if (fstat(fd, &st) < 0) { DEQ_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
545 1 50         if ((uint64_t)st.st_size < sizeof(DeqHeader)) { DEQ_ERR("too small"); return NULL; }
    0          
546 1           size_t ms = (size_t)st.st_size;
547 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
548 1 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
549 1 50         if (!deq_validate_header((DeqHeader *)base, (uint64_t)st.st_size, variant_id)) {
550 0 0         DEQ_ERR("invalid deque"); munmap(base, ms); return NULL;
551             }
552 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
553 1 50         if (myfd < 0) { DEQ_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
554 1           return deq_setup(base, ms, NULL, myfd);
555             }
556              
557 20           static void deq_destroy(DeqHandle *h) {
558 20 50         if (!h) return;
559 20 100         if (h->notify_fd >= 0) close(h->notify_fd);
560 20 100         if (h->backing_fd >= 0) close(h->backing_fd);
561 20 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
562 20           free(h->path);
563 20           free(h);
564             }
565              
566             /* NOT concurrency-safe — use drain() for concurrent scenarios */
567 3           static void deq_clear(DeqHandle *h) {
568 3           __atomic_store_n(&h->hdr->cursor, 0, __ATOMIC_RELEASE);
569             /* Reset all slot ctl to {EMPTY, gen=0}. Safe only when no concurrent
570             * push/pop — which is the documented contract of clear(). */
571 3           memset(h->ctl, 0, (size_t)h->hdr->capacity * sizeof(uint64_t));
572             /* clear() frees the entire deque at once — wake all waiters so they
573             * can re-evaluate state, not just one. */
574 3 50         if (__atomic_load_n(&h->hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
575 0           __atomic_add_fetch(&h->hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
576 0           syscall(SYS_futex, &h->hdr->push_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
577             }
578 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
579 0           __atomic_add_fetch(&h->hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
580 0           syscall(SYS_futex, &h->hdr->pop_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
581             }
582 3           }
583              
584             /* Concurrency-safe drain: CAS cursor to advance head to tail, then release
585             * each drained slot through the state machine so future pushes can reuse.
586             *
587             * Crash-recovery: a pusher that won the cursor CAS but died (SIGKILL/crash)
588             * before completing its write leaves its slot stuck in a non-FILLED state.
589             * Two distinct stall windows:
590             * 1. cursor CAS done, claim_write not yet succeeded — slot is still in
591             * EMPTY@gen (or briefly READING@gen if a prior popper is finishing).
592             * 2. claim_write done, publish not yet done — slot is WRITING@gen.
593             * In either case plain deq_slot_claim_read would spin forever. We bound the
594             * per-slot wait to ~2s; on timeout we CAS the current state -> EMPTY@(gen+1)
595             * so the slot is reclaimed. The lock-free design doesn't track per-slot
596             * owner PID so we cannot distinguish "dead writer" from "live but extremely
597             * slow writer"; a 2s threshold is far longer than any legitimate slot fill.
598             * A falsely-recovered live writer's late deq_slot_publish is a CAS keyed on
599             * the original gen, so it observes the bump and silently no-ops rather than
600             * resurrecting a phantom FILLED slot.
                *
                * Returns the number of slots taken from the cursor in the winning CAS
                * (0 if head == tail on entry to an iteration). Force-recovered slots are
                * included in that count and are additionally tallied in
                * hdr->stat_recoveries. The stored values themselves are not returned or
                * inspected here; only the ctl words are touched. h must be non-NULL. */
601 4           static inline uint32_t deq_drain(DeqHandle *h) {
602 4           DeqHeader *hdr = h->hdr;
603 4           uint32_t cap = (uint32_t)hdr->capacity;
                /* Outer retry loop: re-read the cursor whenever the CAS below fails. */
604 0           for (;;) {
605 4           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
606 4           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
                /* Unsigned 32-bit difference of the two cursor fields. */
607 4           uint32_t count = (uint32_t)(t - hd);
608 7 100         if (count == 0) return 0;
                /* New cursor has both fields equal to the observed tail, i.e. empty. */
609 3           uint64_t nc = DEQ_CURSOR(t, t);
                /* Weak CAS (4th argument 1): it may fail spuriously, which is fine
                 * because the enclosing loop reloads and retries. */
610 3 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
611             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                /* We now own positions hd .. hd+count-1; retire each slot in turn. */
612 16 100         for (uint32_t i = 0; i < count; i++) {
613 13           uint32_t idx = (hd + i) % cap;
614 13           uint64_t *ctl_word = &h->ctl[idx];
615 13           int recovered = 0;
616 13           int dl_set = 0;
617 13           uint32_t spins = 0;
                /* dl is only read after dl_set has been set to 1 below. */
618             struct timespec dl;
619 1301246           for (;;) {
620 1301259           uint64_t cw = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
621 1301259           uint32_t st = DEQ_SLOT_STATE(cw);
622 1301259 100         if (st == DEQ_SLOT_FILLED) {
                /* Normal path: FILLED@gen -> READING@gen (strong CAS), then hand
                 * the slot to deq_slot_release with the same generation. */
623 11           uint64_t nw = (DEQ_SLOT_GEN(cw) << 2) | DEQ_SLOT_READING;
624 11 50         if (__atomic_compare_exchange_n(ctl_word, &cw, nw,
625             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
626 11           deq_slot_release(ctl_word, DEQ_SLOT_GEN(cw));
627 11           break;
628             }
                /* CAS lost: re-load the ctl word and re-evaluate. */
629 0           continue;
630             }
631             /* Non-FILLED: hot-spin first, then short sleeps; on
632             * timeout force the slot to EMPTY@(gen+1) regardless of
633             * current state (covers both stall windows above). */
634 1301248           deq_spin_pause();
                /* Only every 64th spin reaches the deadline check / sleep. */
635 1301248 100         if ((++spins & 0x3F) == 0) {
                /* The deadline is armed lazily, on the first slow-path visit
                 * for this slot, so each slot gets its own timeout window. */
636 20332 100         if (!dl_set) {
637 2           deq_make_deadline((double)DEQ_DRAIN_RECOVERY_SEC, &dl);
638 2           dl_set = 1;
639             }
                /* rem is filled by deq_remaining but not otherwise used here. */
640             struct timespec rem;
                /* NOTE(review): a zero return from deq_remaining is treated as
                 * "deadline passed" — confirm against its definition. */
641 20332 100         if (!deq_remaining(&dl, &rem)) {
642 2           uint64_t nw = ((DEQ_SLOT_GEN(cw) + 1) << 2) | DEQ_SLOT_EMPTY;
643 2 50         if (__atomic_compare_exchange_n(ctl_word, &cw, nw,
644             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
645 2           recovered = 1;
646 2           break;
647             }
648             /* CAS lost: state advanced concurrently
649             * (publish/release/another recoverer). Re-observe. */
650 0           continue;
651             }
652             /* Short sleep to keep CPU usage low during the long wait. */
653 20330           struct timespec ts = { 0, 100000L }; /* 100us */
654 20330           nanosleep(&ts, NULL);
655             }
656             }
657 13 100         if (recovered)
658 2           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
659             }
660             /* drain freed `count` slots at once — wake up to that many. */
                /* Only the push side is woken here; the wake count is clamped to
                 * INT_MAX because FUTEX_WAKE takes an int. */
661 3 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
662 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
663 0 0         syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE,
664             count < INT_MAX ? (int)count : INT_MAX,
665             NULL, NULL, 0);
666             }
667 3           return count;
668             }
669             }
670             }
671              
/* Return the handle's notification eventfd, creating it on first use.
 * If h->notify_fd is already >= 0 it is returned unchanged. Otherwise a new
 * eventfd with initial counter 0 is created with EFD_NONBLOCK|EFD_CLOEXEC,
 * cached in h->notify_fd and returned. Returns -1 if eventfd() fails, in which
 * case h->notify_fd is left as it was. */
672 2           static int deq_create_eventfd(DeqHandle *h) {
673 2 50         if (h->notify_fd >= 0) return h->notify_fd;
674 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
675 2 50         if (efd < 0) return -1;
676 2           h->notify_fd = efd; return efd;
677             }
/* Signal the notification eventfd by writing the 8-byte value 1 to it.
 * Returns 1 if all 8 bytes were written, 0 if the write failed or was short,
 * and also 0 when the handle has no notification fd (notify_fd < 0). */
678 2           static int deq_notify(DeqHandle *h) {
679 2 50         if (h->notify_fd < 0) return 0;
680 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
681             }
/* Read one 8-byte value from the notification eventfd and return it as
 * int64_t. Returns -1 when the handle has no notification fd, or when read()
 * does not return exactly 8 bytes.
 * NOTE(review): an fd created by deq_create_eventfd is non-blocking, so a read
 * with nothing pending is expected to fail and yield -1 here — confirm for fds
 * installed into notify_fd by other code paths. */
682 2           static int64_t deq_eventfd_consume(DeqHandle *h) {
683 2 50         if (h->notify_fd < 0) return -1;
684 2           uint64_t v = 0;
685 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
686 2           return (int64_t)v;
687             }
/* Synchronously flush the whole mapping (h->hdr, h->mmap_size bytes) with
 * msync(MS_SYNC). Returns msync's result unchanged. */
688 1           static int deq_msync(DeqHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
689              
690             #endif /* DEQUE_H */