File Coverage

deque.h
Criterion Covered Total %
statement 288 329 87.5
branch 121 242 50.0
condition n/a
subroutine n/a
pod n/a
total 409 571 71.6


line stmt bran cond sub pod time code
1             /*
2             * deque.h -- Fixed-size shared-memory double-ended queue for Linux
3             *
4             * Ring buffer with CAS-based push/pop at both ends, plus a per-slot
5             * publication state machine for MPMC safety:
6             *
7             * EMPTY -> WRITING (pusher claims slot)
8             * WRITING-> FILLED (pusher publishes value)
9             * FILLED -> READING (popper claims slot)
10             * READING-> EMPTY (popper releases, generation bumps)
11             *
12             * The slot's ctl word encodes (generation << 2) | state_bits, and
13             * transitions are made via CAS. Head/tail still serialize the POSITION
14             * handout; the per-slot state machine serializes the VALUE handoff.
15             * A consumer that wins the head/tail CAS always waits for the matching
16             * publisher's transition to FILLED before reading.
17             *
18             * Futex blocking when empty or full.
19             */
20              
21             #ifndef DEQUE_H
22             #define DEQUE_H
23              
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <linux/futex.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/eventfd.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

39              
#define DEQ_MAGIC 0x44455132U /* "DEQ2" — v2 layout (per-slot ctl) */
#define DEQ_VERSION 2
/* Minimum size of the caller-provided error buffer used by DEQ_ERR(). */
#define DEQ_ERR_BUFLEN 256

/* Element-type tag stored in the header and checked against on open,
 * so differently-typed deques refuse to attach to each other's files. */
#define DEQ_VAR_INT 0
#define DEQ_VAR_STR 1

/* Slot state (low 2 bits of ctl word). Upper 62 bits = generation. */
#define DEQ_SLOT_EMPTY 0u   /* free; next pusher may claim */
#define DEQ_SLOT_WRITING 1u /* pusher owns slot, value not yet visible */
#define DEQ_SLOT_FILLED 2u  /* value published; next popper may claim */
#define DEQ_SLOT_READING 3u /* popper owns slot, about to release */
#define DEQ_SLOT_STATE_MASK 3u
/* Extract the 2-bit state / 62-bit generation halves of a ctl word. */
#define DEQ_SLOT_STATE(c) ((uint32_t)((c) & DEQ_SLOT_STATE_MASK))
#define DEQ_SLOT_GEN(c) ((c) >> 2)
55              
/* Combined cursor: upper 32 bits = head, lower 32 bits = tail. A single
 * 64-bit CAS atomically updates both ends, so push_front vs push_back (or
 * pop_front vs pop_back) cannot both succeed when they share a boundary
 * slot. Capacity is bounded by 2^31 elements. Head and tail wrap mod 2^32
 * after 4B ops; size = (tail - head) treated as uint32.
 */
typedef struct {
    uint32_t magic;         /* DEQ_MAGIC — identifies a v2 deque mapping */
    uint32_t version;       /* DEQ_VERSION */
    uint32_t elem_size;     /* fixed byte size of every element */
    uint32_t variant_id;    /* element-type tag checked on open */
    uint64_t capacity;      /* number of slots (<= 2^31) */
    uint64_t total_size;    /* full mapping size in bytes */
    uint64_t data_off;      /* offset of element array (== sizeof(DeqHeader)) */
    uint64_t ctl_off;       /* offset to per-slot ctl array */
    uint8_t _pad0[16];      /* pads the hot cursor word to offset 64 */

    uint64_t cursor;        /* 64: (head<<32)|tail */
    uint32_t waiters_push;  /* 72: threads blocked in deq_push_wait */
    uint32_t waiters_pop;   /* 76: threads blocked in deq_pop_wait */
    uint64_t stat_pushes;   /* 80 */
    uint64_t stat_pops;     /* 88 */
    uint64_t stat_waits;    /* 96 */
    uint64_t stat_timeouts; /* 104 */
    uint32_t push_wake_seq; /* 112: bumped by every pop, futex word for pushers */
    uint32_t pop_wake_seq;  /* 116: bumped by every push, futex word for poppers */
    uint8_t _pad1[8];       /* 120-127 */
} DeqHeader;

/* Pack/unpack the combined cursor word. */
#define DEQ_CURSOR(head, tail) (((uint64_t)(head) << 32) | (uint32_t)(tail))
#define DEQ_CURSOR_HEAD(c) ((uint32_t)((c) >> 32))
#define DEQ_CURSOR_TAIL(c) ((uint32_t)(c))
/* Element count; correct across wraparound because both halves advance mod 2^32. */
#define DEQ_CURSOR_SIZE(c) ((uint32_t)(DEQ_CURSOR_TAIL(c) - DEQ_CURSOR_HEAD(c)))

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(DeqHeader) == 128, "DeqHeader must be 128 bytes");
#endif
93              
/* Per-process view of one mapped deque. Owned by the process; freed with
 * deq_destroy(). */
typedef struct {
    DeqHeader *hdr;     /* header at offset 0 of the mapping */
    uint8_t *data;      /* element array (mapping + hdr->data_off) */
    uint64_t *ctl;      /* per-slot state+generation word */
    size_t mmap_size;   /* length passed to munmap() */
    uint32_t elem_size; /* cached copy of hdr->elem_size */
    char *path;         /* strdup'd file path, or NULL for anonymous/memfd */
    int notify_fd;      /* optional eventfd, -1 until deq_create_eventfd() */
    int backing_fd;     /* owned memfd / dup'd fd, or -1 */
} DeqHandle;
104              
105             /* ================================================================ */
106              
/* Convert a relative timeout of t seconds (caller guarantees t > 0) into
 * an absolute CLOCK_MONOTONIC deadline in *dl. */
static inline void deq_make_deadline(double t, struct timespec *dl) {
    clock_gettime(CLOCK_MONOTONIC, dl);
    time_t whole = (time_t)t;
    long frac_ns = (long)((t - (double)whole) * 1e9);
    dl->tv_sec += whole;
    dl->tv_nsec += frac_ns;
    if (dl->tv_nsec >= 1000000000L) {
        dl->tv_sec += 1;
        dl->tv_nsec -= 1000000000L;
    }
}
113              
/* Compute the normalized time left until *dl into *rem.
 * Returns 1 while the deadline has not passed, 0 once it has. */
static inline int deq_remaining(const struct timespec *dl, struct timespec *rem) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    time_t sec = dl->tv_sec - now.tv_sec;
    long nsec = dl->tv_nsec - now.tv_nsec;
    if (nsec < 0) {
        sec--;
        nsec += 1000000000L;
    }
    rem->tv_sec = sec;
    rem->tv_nsec = nsec;
    return sec >= 0;
}
122              
123 18140           static inline uint8_t *deq_slot(DeqHandle *h, uint32_t idx) {
124 18140           return h->data + (idx % (uint32_t)h->hdr->capacity) * h->elem_size;
125             }
126              
127 19           static inline uint32_t deq_size(DeqHandle *h) {
128 19           uint64_t c = __atomic_load_n(&h->hdr->cursor, __ATOMIC_ACQUIRE);
129 19           return DEQ_CURSOR_SIZE(c);
130             }
131              
/* CPU relaxation hint for busy-wait loops. Uses "pause" on x86 and
 * "yield" on aarch64; on other ISAs this compiles to nothing (the
 * "memory" clobber still acts as a compiler barrier where emitted). */
static inline void deq_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#endif
}
139              
140             /* --- per-slot state machine helpers ---
141             * Claim a slot for writing: spin CAS until we observe EMPTY and can mark
142             * WRITING. Returns the generation that was observed, for the matching
143             * publish. Caller holds the position CAS so at most one pusher targets
144             * this slot, but a pending popper from the previous cycle may still be
145             * finishing; the spin is bounded by that popper's READING -> EMPTY store.
146             */
147 88           static inline uint64_t deq_slot_claim_write(uint64_t *ctl_word) {
148 0           for (;;) {
149 88           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
150 88 50         if (DEQ_SLOT_STATE(c) == DEQ_SLOT_EMPTY) {
151 88           uint64_t nc = (DEQ_SLOT_GEN(c) << 2) | DEQ_SLOT_WRITING;
152 88 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
153             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
154 88           return DEQ_SLOT_GEN(c);
155             }
156 0           deq_spin_pause();
157             }
158             }
159              
160             /* Publish written value: WRITING -> FILLED at same generation. */
161 88           static inline void deq_slot_publish(uint64_t *ctl_word, uint64_t gen) {
162 88           __atomic_store_n(ctl_word, (gen << 2) | DEQ_SLOT_FILLED, __ATOMIC_RELEASE);
163 88           }
164              
165             /* Claim a slot for reading: spin CAS until we observe FILLED and mark READING. */
166 18057           static inline uint64_t deq_slot_claim_read(uint64_t *ctl_word) {
167 0           for (;;) {
168 18057           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
169 18057 50         if (DEQ_SLOT_STATE(c) == DEQ_SLOT_FILLED) {
170 18057           uint64_t nc = (DEQ_SLOT_GEN(c) << 2) | DEQ_SLOT_READING;
171 18057 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
172             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
173 18057           return DEQ_SLOT_GEN(c);
174             }
175 0           deq_spin_pause();
176             }
177             }
178              
179             /* Release slot after read: READING -> EMPTY with gen+1. */
180 18057           static inline void deq_slot_release(uint64_t *ctl_word, uint64_t gen) {
181 18057           __atomic_store_n(ctl_word, ((gen + 1) << 2) | DEQ_SLOT_EMPTY, __ATOMIC_RELEASE);
182 18057           }
183              
184             /* ================================================================
185             * Push back (tail++)
186             * ================================================================ */
187              
188 83           static inline int deq_try_push_back(DeqHandle *h, const void *val, uint32_t vlen) {
189 83           DeqHeader *hdr = h->hdr;
190 83           uint32_t cap = (uint32_t)hdr->capacity;
191 0           for (;;) {
192 83           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
193 83           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
194 161 100         if ((uint32_t)(t - hd) >= cap) return 0;
195 78           uint64_t nc = DEQ_CURSOR(hd, t + 1);
196 78 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
197             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
198 78           uint32_t sz = h->elem_size;
199 78           uint32_t cp = vlen < sz ? vlen : sz;
200 78           uint32_t idx = t % cap;
201 78           uint64_t gen = deq_slot_claim_write(&h->ctl[idx]);
202 78           memcpy(deq_slot(h, t), val, cp);
203 78 50         if (cp < sz) memset(deq_slot(h, t) + cp, 0, sz - cp);
204 78           deq_slot_publish(&h->ctl[idx], gen);
205 78           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
206 78 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
207 0           __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
208 0           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
209             }
210 78           return 1;
211             }
212             }
213             }
214              
215             /* ================================================================
216             * Push front (head--)
217             * ================================================================ */
218              
219 15           static inline int deq_try_push_front(DeqHandle *h, const void *val, uint32_t vlen) {
220 15           DeqHeader *hdr = h->hdr;
221 15           uint32_t cap = (uint32_t)hdr->capacity;
222 0           for (;;) {
223 15           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
224 15           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
225 25 100         if ((uint32_t)(t - hd) >= cap) return 0;
226 10           uint32_t new_hd = hd - 1;
227 10           uint64_t nc = DEQ_CURSOR(new_hd, t);
228 10 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
229             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
230 10           uint32_t sz = h->elem_size;
231 10           uint32_t cp = vlen < sz ? vlen : sz;
232 10           uint32_t idx = new_hd % cap;
233 10           uint64_t gen = deq_slot_claim_write(&h->ctl[idx]);
234 10           memcpy(deq_slot(h, new_hd), val, cp);
235 10 50         if (cp < sz) memset(deq_slot(h, new_hd) + cp, 0, sz - cp);
236 10           deq_slot_publish(&h->ctl[idx], gen);
237 10           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
238 10 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
239 0           __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
240 0           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
241             }
242 10           return 1;
243             }
244             }
245             }
246              
247             /* ================================================================
248             * Pop front (head++)
249             * ================================================================ */
250              
251 18048           static inline int deq_try_pop_front(DeqHandle *h, void *out) {
252 18048           DeqHeader *hdr = h->hdr;
253 18048           uint32_t cap = (uint32_t)hdr->capacity;
254 0           for (;;) {
255 18048           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
256 18048           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
257 36090 100         if ((uint32_t)(t - hd) == 0) return 0;
258 18042           uint64_t nc = DEQ_CURSOR(hd + 1, t);
259 18042 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
260             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
261 18042           uint32_t idx = hd % cap;
262 18042           uint64_t gen = deq_slot_claim_read(&h->ctl[idx]);
263 18042           memcpy(out, deq_slot(h, hd), h->elem_size);
264 18042           deq_slot_release(&h->ctl[idx], gen);
265 18042           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
266 18042 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
267 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
268 0           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
269             }
270 18042           return 1;
271             }
272             }
273             }
274              
275             /* ================================================================
276             * Pop back (tail--)
277             * ================================================================ */
278              
279 14           static inline int deq_try_pop_back(DeqHandle *h, void *out) {
280 14           DeqHeader *hdr = h->hdr;
281 14           uint32_t cap = (uint32_t)hdr->capacity;
282 0           for (;;) {
283 14           uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
284 14           uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
285 24 100         if ((uint32_t)(t - hd) == 0) return 0;
286 10           uint64_t nc = DEQ_CURSOR(hd, t - 1);
287 10 50         if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
288             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
289 10           uint32_t idx = (t - 1) % cap;
290 10           uint64_t gen = deq_slot_claim_read(&h->ctl[idx]);
291 10           memcpy(out, deq_slot(h, t - 1), h->elem_size);
292 10           deq_slot_release(&h->ctl[idx], gen);
293 10           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
294 10 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
295 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
296 0           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
297             }
298 10           return 1;
299             }
300             }
301             }
302              
303             /* ================================================================
304             * Blocking push/pop
305             * ================================================================ */
306              
/* Blocking push. front != 0 pushes at the head, otherwise at the tail.
 * timeout: 0 = non-blocking single attempt, > 0 = wait up to that many
 * seconds, < 0 = wait indefinitely. Returns 1 on success, 0 on timeout
 * or would-block.
 *
 * Lost-wakeup avoidance: push_wake_seq is sampled BEFORE we advertise
 * ourselves in waiters_push and re-check fullness; FUTEX_WAIT only sleeps
 * if the seq still equals that sample, so a pop that bumped the seq in
 * between makes the wait return immediately. */
static inline int deq_push_wait(DeqHandle *h, const void *val, uint32_t vlen,
                                int front, double timeout) {
    /* Pick the direction once; both try functions share a signature. */
    int (*try_fn)(DeqHandle*, const void*, uint32_t) =
        front ? deq_try_push_front : deq_try_push_back;
    if (try_fn(h, val, vlen)) return 1;
    if (timeout == 0) return 0; /* non-blocking mode: single attempt */

    DeqHeader *hdr = h->hdr;
    struct timespec dl, rem;
    int has_dl = (timeout > 0); /* negative timeout = wait forever */
    if (has_dl) deq_make_deadline(timeout, &dl);
    __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);

    uint32_t cap = (uint32_t)hdr->capacity;
    for (;;) {
        /* Seq snapshot must precede the fullness check (see header comment). */
        uint32_t wseq = __atomic_load_n(&hdr->push_wake_seq, __ATOMIC_ACQUIRE);
        __atomic_add_fetch(&hdr->waiters_push, 1, __ATOMIC_RELEASE);
        uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
        if (DEQ_CURSOR_SIZE(c) >= cap) {
            struct timespec *pts = NULL;
            if (has_dl) {
                if (!deq_remaining(&dl, &rem)) {
                    /* Deadline already passed: undo waiter count, give up. */
                    __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
                    __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    return 0;
                }
                pts = &rem; /* relative timeout for FUTEX_WAIT */
            }
            /* Sleeps only while push_wake_seq == wseq; spurious wakeups are
             * harmless — we simply re-try below. */
            syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
        }
        __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
        if (try_fn(h, val, vlen)) return 1;
        if (has_dl && !deq_remaining(&dl, &rem)) {
            __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }
}
345              
/* Blocking pop. back != 0 pops from the tail, otherwise from the head.
 * timeout: 0 = non-blocking single attempt, > 0 = wait up to that many
 * seconds, < 0 = wait indefinitely. Returns 1 on success, 0 on timeout
 * or would-block. Uses the same sample-seq-then-wait futex protocol as
 * deq_push_wait, on pop_wake_seq / waiters_pop. */
static inline int deq_pop_wait(DeqHandle *h, void *out, int back, double timeout) {
    int (*try_fn)(DeqHandle*, void*) =
        back ? deq_try_pop_back : deq_try_pop_front;
    if (try_fn(h, out)) return 1;
    if (timeout == 0) return 0; /* non-blocking mode: single attempt */

    DeqHeader *hdr = h->hdr;
    struct timespec dl, rem;
    int has_dl = (timeout > 0); /* negative timeout = wait forever */
    if (has_dl) deq_make_deadline(timeout, &dl);
    __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);

    for (;;) {
        /* Sample the wake seq before advertising ourselves as a waiter. */
        uint32_t wseq = __atomic_load_n(&hdr->pop_wake_seq, __ATOMIC_ACQUIRE);
        __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
        uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
        if (DEQ_CURSOR_SIZE(c) == 0) {
            struct timespec *pts = NULL;
            if (has_dl) {
                if (!deq_remaining(&dl, &rem)) {
                    /* Deadline already passed: undo waiter count, give up. */
                    __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
                    __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    return 0;
                }
                pts = &rem; /* relative timeout for FUTEX_WAIT */
            }
            /* Sleeps only while pop_wake_seq == wseq; a push bumps the seq
             * before waking, so the wakeup cannot be lost. */
            syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
        }
        __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
        if (try_fn(h, out)) return 1;
        if (has_dl && !deq_remaining(&dl, &rem)) {
            __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }
}
382              
/* ================================================================
 * Create / Open / Close
 * ================================================================ */

/* Write a formatted message into the caller's errbuf (if non-NULL).
 * Relies on an `errbuf` variable in the enclosing scope; the buffer
 * must hold at least DEQ_ERR_BUFLEN bytes. */
#define DEQ_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, DEQ_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
388              
389             /* Layout offsets — data array first, then 8-byte-aligned ctl array. */
390 39           static inline uint64_t deq_ctl_offset(uint32_t elem_size, uint64_t capacity) {
391 39           uint64_t data_end = sizeof(DeqHeader) + capacity * elem_size;
392 39           return (data_end + 7u) & ~(uint64_t)7u;
393             }
394              
/* Total bytes needed for header + element array + ctl array. */
static inline uint64_t deq_total_size(uint32_t elem_size, uint64_t capacity) {
    const uint64_t ctl_bytes = capacity * sizeof(uint64_t);
    return deq_ctl_offset(elem_size, capacity) + ctl_bytes;
}
398              
399 15           static inline void deq_init_header(void *base, uint64_t total,
400             uint32_t elem_size, uint32_t variant_id,
401             uint64_t capacity) {
402 15           DeqHeader *hdr = (DeqHeader *)base;
403 15           memset(base, 0, (size_t)total); /* zeroes data + ctl → all slots EMPTY, gen=0 */
404 15           hdr->magic = DEQ_MAGIC;
405 15           hdr->version = DEQ_VERSION;
406 15           hdr->elem_size = elem_size;
407 15           hdr->variant_id = variant_id;
408 15           hdr->capacity = capacity;
409 15           hdr->total_size = total;
410 15           hdr->data_off = sizeof(DeqHeader);
411 15           hdr->ctl_off = deq_ctl_offset(elem_size, capacity);
412 15           __atomic_thread_fence(__ATOMIC_SEQ_CST);
413 15           }
414              
415 18           static inline DeqHandle *deq_setup(void *base, size_t ms, const char *path, int bfd) {
416 18           DeqHeader *hdr = (DeqHeader *)base;
417 18           DeqHandle *h = (DeqHandle *)calloc(1, sizeof(DeqHandle));
418 18 50         if (!h) { munmap(base, ms); return NULL; }
419 18           h->hdr = hdr;
420 18           h->data = (uint8_t *)base + hdr->data_off;
421 18           h->ctl = (uint64_t *)((uint8_t *)base + hdr->ctl_off);
422 18           h->mmap_size = ms;
423 18           h->elem_size = hdr->elem_size;
424 18 100         h->path = path ? strdup(path) : NULL;
425 18           h->notify_fd = -1;
426 18           h->backing_fd = bfd;
427 18           return h;
428             }
429              
430             /* Validate a mapped header (shared by deq_create reopen and deq_open_fd). */
431 4           static inline int deq_validate_header(const DeqHeader *hdr, uint64_t file_size,
432             uint32_t expected_variant) {
433 4 100         if (hdr->magic != DEQ_MAGIC) return 0;
434 3 50         if (hdr->version != DEQ_VERSION) return 0;
435 3 50         if (hdr->variant_id != expected_variant) return 0;
436 3 50         if (hdr->elem_size == 0 || hdr->capacity == 0) return 0;
    50          
437 3 50         if (hdr->capacity > 0x7FFFFFFFu) return 0;
438 3 50         if (hdr->total_size != file_size) return 0;
439 3 50         if (hdr->data_off != sizeof(DeqHeader)) return 0;
440 3 50         if (hdr->ctl_off != deq_ctl_offset(hdr->elem_size, hdr->capacity)) return 0;
441 3 50         if (hdr->total_size != deq_total_size(hdr->elem_size, hdr->capacity)) return 0;
442 3           return 1;
443             }
444              
445 17           static DeqHandle *deq_create(const char *path, uint64_t capacity,
446             uint32_t elem_size, uint32_t variant_id,
447             char *errbuf) {
448 17 50         if (errbuf) errbuf[0] = '\0';
449 17 50         if (capacity == 0) { DEQ_ERR("capacity must be > 0"); return NULL; }
    0          
450 17 50         if (elem_size == 0) { DEQ_ERR("elem_size must be > 0"); return NULL; }
    0          
451 17 50         if (capacity > 0x7FFFFFFFu) {
452 0 0         DEQ_ERR("capacity must be <= 2^31 (32-bit cursor halves)"); return NULL;
453             }
454              
455 17           uint64_t total = deq_total_size(elem_size, capacity);
456 17           int anonymous = (path == NULL);
457 17           int fd = -1;
458             size_t map_size;
459             void *base;
460              
461 17 100         if (anonymous) {
462 11           map_size = (size_t)total;
463 11           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
464 11 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
465             } else {
466 6           fd = open(path, O_RDWR|O_CREAT, 0666);
467 9 50         if (fd < 0) { DEQ_ERR("open: %s", strerror(errno)); return NULL; }
    0          
468 6 50         if (flock(fd, LOCK_EX) < 0) { DEQ_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
469             struct stat st;
470 6 50         if (fstat(fd, &st) < 0) {
471 0 0         DEQ_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
472             }
473 6           int is_new = (st.st_size == 0);
474 6 100         if (!is_new && (uint64_t)st.st_size < sizeof(DeqHeader)) {
    50          
475 0 0         DEQ_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
476 0           flock(fd, LOCK_UN); close(fd); return NULL;
477             }
478 6 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
479 0 0         DEQ_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
480             }
481 6 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
482 6           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
483 6 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
484 6 100         if (!is_new) {
485 3 100         if (!deq_validate_header((DeqHeader *)base, (uint64_t)st.st_size, variant_id)) {
486 1 50         DEQ_ERR("invalid deque file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
487             }
488 2           flock(fd, LOCK_UN); close(fd);
489 2           return deq_setup(base, map_size, path, -1);
490             }
491             }
492 14           deq_init_header(base, total, elem_size, variant_id, capacity);
493 14 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
494 14           return deq_setup(base, map_size, path, -1);
495             }
496              
497 1           static DeqHandle *deq_create_memfd(const char *name, uint64_t capacity,
498             uint32_t elem_size, uint32_t variant_id,
499             char *errbuf) {
500 1 50         if (errbuf) errbuf[0] = '\0';
501 1 50         if (capacity == 0) { DEQ_ERR("capacity must be > 0"); return NULL; }
    0          
502 1 50         if (elem_size == 0) { DEQ_ERR("elem_size must be > 0"); return NULL; }
    0          
503 1 50         if (capacity > 0x7FFFFFFFu) {
504 0 0         DEQ_ERR("capacity must be <= 2^31 (32-bit cursor halves)"); return NULL;
505             }
506 1           uint64_t total = deq_total_size(elem_size, capacity);
507 1 50         int fd = memfd_create(name ? name : "deque", MFD_CLOEXEC | MFD_ALLOW_SEALING);
508 1 50         if (fd < 0) { DEQ_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
509 1 50         if (ftruncate(fd, (off_t)total) < 0) { DEQ_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
510 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
511 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
512 1 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
513 1           deq_init_header(base, total, elem_size, variant_id, capacity);
514 1           return deq_setup(base, (size_t)total, NULL, fd);
515             }
516              
517 1           static DeqHandle *deq_open_fd(int fd, uint32_t variant_id, char *errbuf) {
518 1 50         if (errbuf) errbuf[0] = '\0';
519             struct stat st;
520 1 50         if (fstat(fd, &st) < 0) { DEQ_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
521 1 50         if ((uint64_t)st.st_size < sizeof(DeqHeader)) { DEQ_ERR("too small"); return NULL; }
    0          
522 1           size_t ms = (size_t)st.st_size;
523 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
524 1 50         if (base == MAP_FAILED) { DEQ_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
525 1 50         if (!deq_validate_header((DeqHeader *)base, (uint64_t)st.st_size, variant_id)) {
526 0 0         DEQ_ERR("invalid deque"); munmap(base, ms); return NULL;
527             }
528 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
529 1 50         if (myfd < 0) { DEQ_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
530 1           return deq_setup(base, ms, NULL, myfd);
531             }
532              
533 18           static void deq_destroy(DeqHandle *h) {
534 18 50         if (!h) return;
535 18 100         if (h->notify_fd >= 0) close(h->notify_fd);
536 18 100         if (h->backing_fd >= 0) close(h->backing_fd);
537 18 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
538 18           free(h->path);
539 18           free(h);
540             }
541              
542             /* NOT concurrency-safe — use drain() for concurrent scenarios */
543 3           static void deq_clear(DeqHandle *h) {
544 3           __atomic_store_n(&h->hdr->cursor, 0, __ATOMIC_RELEASE);
545             /* Reset all slot ctl to {EMPTY, gen=0}. Safe only when no concurrent
546             * push/pop — which is the documented contract of clear(). */
547 3           memset(h->ctl, 0, (size_t)h->hdr->capacity * sizeof(uint64_t));
548             /* clear() frees the entire deque at once — wake all waiters so they
549             * can re-evaluate state, not just one. */
550 3 50         if (__atomic_load_n(&h->hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
551 0           __atomic_add_fetch(&h->hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
552 0           syscall(SYS_futex, &h->hdr->push_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
553             }
554 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
555 0           __atomic_add_fetch(&h->hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
556 0           syscall(SYS_futex, &h->hdr->pop_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
557             }
558 3           }
559              
/* Concurrency-safe drain: CAS cursor to advance head to tail, then release
 * each drained slot through the state machine so future pushes can reuse.
 * Returns the number of elements discarded (0 if the deque was empty). */
static inline uint32_t deq_drain(DeqHandle *h) {
    DeqHeader *hdr = h->hdr;
    uint32_t cap = (uint32_t)hdr->capacity;
    for (;;) {
        uint64_t c = __atomic_load_n(&hdr->cursor, __ATOMIC_ACQUIRE);
        uint32_t hd = DEQ_CURSOR_HEAD(c), t = DEQ_CURSOR_TAIL(c);
        uint32_t count = (uint32_t)(t - hd);
        if (count == 0) return 0;
        /* Jump head forward to tail in one CAS; retried from a fresh load
         * if any concurrent push/pop moved the cursor meanwhile. */
        uint64_t nc = DEQ_CURSOR(t, t);
        if (__atomic_compare_exchange_n(&hdr->cursor, &c, nc,
                                        1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            /* Winning the CAS makes us the sole owner of positions
             * hd..t-1; cycle each slot FILLED -> READING -> EMPTY.
             * claim_read spins until any in-flight pusher publishes. */
            for (uint32_t i = 0; i < count; i++) {
                uint32_t idx = (hd + i) % cap;
                uint64_t gen = deq_slot_claim_read(&h->ctl[idx]);
                deq_slot_release(&h->ctl[idx], gen);
            }
            /* drain freed `count` slots at once — wake up to that many. */
            if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
                __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
                syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE,
                        (int)count, NULL, NULL, 0);
            }
            return count;
        }
    }
}
588              
589 2           static int deq_create_eventfd(DeqHandle *h) {
590 2 50         if (h->notify_fd >= 0) return h->notify_fd;
591 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
592 2 50         if (efd < 0) return -1;
593 2           h->notify_fd = efd; return efd;
594             }
595 2           static int deq_notify(DeqHandle *h) {
596 2 50         if (h->notify_fd < 0) return 0;
597 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
598             }
599 2           static int64_t deq_eventfd_consume(DeqHandle *h) {
600 2 50         if (h->notify_fd < 0) return -1;
601 2           uint64_t v = 0;
602 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
603 2           return (int64_t)v;
604             }
605 1           static int deq_msync(DeqHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
606              
607             #endif /* DEQUE_H */