File Coverage

stack.h
Criterion Covered Total %
statement 259 309 83.8
branch 114 246 46.3
condition n/a
subroutine n/a
pod n/a
total 373 555 67.2


line stmt bran cond sub pod time code
1             /*
2             * stack.h -- Fixed-size shared-memory LIFO stack for Linux
3             *
4             * CAS-based position handout (top) with per-slot publication state
5             * machine (EMPTY -> WRITING -> FILLED -> READING -> EMPTY, generation
6             * bumped on release). The state machine closes the race between a
7             * position CAS and the corresponding slot write/read under MPMC.
8             * Futex blocking via push/pop_wake_seq when empty (pop) or full (push).
9             *
10             * Variants: Int (int64_t), Str (length-prefixed)
11             */
12              
13             #ifndef STACK_H
14             #define STACK_H
15              
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31              
32             #define STK_MAGIC 0x53544B32U /* "STK2" — v2 layout (per-slot ctl) */
33             #define STK_VERSION 2
34             #define STK_ERR_BUFLEN 256
35              
36             #define STK_VAR_INT 0
37             #define STK_VAR_STR 1
38              
39             /* Slot state (low 2 bits of ctl word). Upper 62 bits = generation. */
40             #define STK_SLOT_EMPTY 0u
41             #define STK_SLOT_WRITING 1u
42             #define STK_SLOT_FILLED 2u
43             #define STK_SLOT_READING 3u
44             #define STK_SLOT_STATE_MASK 3u
45             #define STK_SLOT_STATE(c) ((uint32_t)((c) & STK_SLOT_STATE_MASK))
46             #define STK_SLOT_GEN(c) ((c) >> 2)
47              
48             /* ================================================================
49             * Header (128 bytes)
50             * ================================================================ */
51              
52             typedef struct {
53             uint32_t magic;
54             uint32_t version;
55             uint32_t elem_size;
56             uint32_t variant_id;
57             uint64_t capacity;
58             uint64_t total_size;
59             uint64_t data_off;
60             uint64_t ctl_off; /* offset to per-slot ctl array */
61             uint8_t _pad0[16];
62              
63             uint32_t top; /* 64: next free index (0=empty, capacity=full) */
64             uint32_t waiters_push; /* 68 */
65             uint32_t waiters_pop; /* 72 */
66             uint32_t push_wake_seq; /* 76: bumped by every pop, futex word for pushers */
67             uint32_t pop_wake_seq; /* 80: bumped by every push, futex word for poppers */
68             uint32_t _pad1; /* 84 */
69             uint64_t stat_pushes; /* 88 */
70             uint64_t stat_pops; /* 96 */
71             uint64_t stat_waits; /* 104 */
72             uint64_t stat_timeouts; /* 112 */
73             uint8_t _pad2[8]; /* 120-127 */
74             } StkHeader;
75              
76             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
77             _Static_assert(sizeof(StkHeader) == 128, "StkHeader must be 128 bytes");
78             #endif
79              
80             typedef struct {
81             StkHeader *hdr;
82             uint8_t *data;
83             uint64_t *ctl; /* per-slot state+generation word */
84             size_t mmap_size;
85             uint32_t elem_size; /* cached from header at open time */
86             char *path;
87             int notify_fd;
88             int backing_fd;
89             } StkHandle;
90              
91             /* ================================================================
92             * Utility
93             * ================================================================ */
94              
95 3           static inline void stk_make_deadline(double timeout, struct timespec *dl) {
96 3           clock_gettime(CLOCK_MONOTONIC, dl);
97 3           dl->tv_sec += (time_t)timeout;
98 3           dl->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
99 3 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
100 3           }
101              
102 5           static inline int stk_remaining(const struct timespec *dl, struct timespec *rem) {
103             struct timespec now;
104 5           clock_gettime(CLOCK_MONOTONIC, &now);
105 5           rem->tv_sec = dl->tv_sec - now.tv_sec;
106 5           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
107 5 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
108 5           return rem->tv_sec >= 0;
109             }
110              
111 18105           static inline uint8_t *stk_slot(StkHandle *h, uint32_t idx) {
112 18105           return h->data + (uint64_t)idx * h->elem_size;
113             }
114              
115 0           static inline void stk_spin_pause(void) {
116             #if defined(__x86_64__) || defined(__i386__)
117 0           __asm__ volatile("pause" ::: "memory");
118             #elif defined(__aarch64__)
119             __asm__ volatile("yield" ::: "memory");
120             #endif
121 0           }
122              
123             /* Slot state machine — same pattern as Data::Deque::Shared. */
124 63           static inline uint64_t stk_slot_claim_write(uint64_t *ctl_word) {
125 0           for (;;) {
126 63           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
127 63 50         if (STK_SLOT_STATE(c) == STK_SLOT_EMPTY) {
128 63           uint64_t nc = (STK_SLOT_GEN(c) << 2) | STK_SLOT_WRITING;
129 63 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
130             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
131 63           return STK_SLOT_GEN(c);
132             }
133 0           stk_spin_pause();
134             }
135             }
136              
137             /* Publish WRITING@gen -> FILLED@gen. Implemented as CAS (not a plain store)
138             * so that if stk_drain force-recovered the slot mid-write — bumping it to
139             * EMPTY@(gen+1) — this publish is a no-op rather than clobbering the
140             * recovered state back to FILLED@gen. That would leave a phantom FILLED at
141             * a stale gen which the next pusher's stk_slot_claim_write (waits on EMPTY)
142             * could never advance past, deadlocking that slot forever. The caller's
143             * top CAS was already committed, so on lost-race the value is silently
144             * dropped — matching the documented drain-recovery semantics. */
145 63           static inline void stk_slot_publish(uint64_t *ctl_word, uint64_t gen) {
146 63           uint64_t expected = (gen << 2) | STK_SLOT_WRITING;
147 63           uint64_t desired = (gen << 2) | STK_SLOT_FILLED;
148 63           (void)__atomic_compare_exchange_n(ctl_word, &expected, desired,
149             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
150 63           }
151              
152 18039           static inline uint64_t stk_slot_claim_read(uint64_t *ctl_word) {
153 0           for (;;) {
154 18039           uint64_t c = __atomic_load_n(ctl_word, __ATOMIC_ACQUIRE);
155 18039 50         if (STK_SLOT_STATE(c) == STK_SLOT_FILLED) {
156 18039           uint64_t nc = (STK_SLOT_GEN(c) << 2) | STK_SLOT_READING;
157 18039 50         if (__atomic_compare_exchange_n(ctl_word, &c, nc,
158             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
159 18039           return STK_SLOT_GEN(c);
160             }
161 0           stk_spin_pause();
162             }
163             }
164              
165 18044           static inline void stk_slot_release(uint64_t *ctl_word, uint64_t gen) {
166 18044           __atomic_store_n(ctl_word, ((gen + 1) << 2) | STK_SLOT_EMPTY, __ATOMIC_RELEASE);
167 18044           }
168              
169             /* ================================================================
170             * Push (LIFO top++)
171             * ================================================================ */
172              
173 67           static inline int stk_try_push(StkHandle *h, const void *val, uint32_t vlen) {
174 67           StkHeader *hdr = h->hdr;
175 67           uint32_t cap = (uint32_t)hdr->capacity;
176 0           for (;;) {
177 67           uint32_t t = __atomic_load_n(&hdr->top, __ATOMIC_RELAXED);
178 130 100         if (t >= cap) return 0;
179 63 50         if (__atomic_compare_exchange_n(&hdr->top, &t, t + 1,
180             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
181 63           uint32_t sz = h->elem_size;
182 63           uint32_t cp = vlen < sz ? vlen : sz;
183 63           uint64_t gen = stk_slot_claim_write(&h->ctl[t]);
184 63           memcpy(stk_slot(h, t), val, cp);
185 63 50         if (cp < sz) memset(stk_slot(h, t) + cp, 0, sz - cp);
186 63           stk_slot_publish(&h->ctl[t], gen);
187 63           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
188 63 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
189 0           __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
190 0           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
191             }
192 63           return 1;
193             }
194             }
195             }
196              
197 1           static inline int stk_push(StkHandle *h, const void *val, uint32_t vlen, double timeout) {
198 1 50         if (stk_try_push(h, val, vlen)) return 1;
199 1 50         if (timeout == 0) return 0;
200              
201 1           StkHeader *hdr = h->hdr;
202 1           uint32_t cap = (uint32_t)hdr->capacity;
203             struct timespec dl, rem;
204 1           int has_dl = (timeout > 0);
205 1 50         if (has_dl) stk_make_deadline(timeout, &dl);
206 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
207              
208 0           for (;;) {
209 1           uint32_t wseq = __atomic_load_n(&hdr->push_wake_seq, __ATOMIC_ACQUIRE);
210 1           __atomic_add_fetch(&hdr->waiters_push, 1, __ATOMIC_RELEASE);
211 1           uint32_t t = __atomic_load_n(&hdr->top, __ATOMIC_ACQUIRE);
212 1 50         if (t >= cap) {
213 1           struct timespec *pts = NULL;
214 1 50         if (has_dl) {
215 1 50         if (!stk_remaining(&dl, &rem)) {
216 0           __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
217 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
218 0           return 0;
219             }
220 1           pts = &rem;
221             }
222 1           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
223             }
224 1           __atomic_sub_fetch(&hdr->waiters_push, 1, __ATOMIC_RELAXED);
225 1 50         if (stk_try_push(h, val, vlen)) return 1;
226 1 50         if (has_dl && !stk_remaining(&dl, &rem)) {
    50          
227 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
228 1           return 0;
229             }
230             }
231             }
232              
233             /* ================================================================
234             * Pop (LIFO top--)
235             * ================================================================ */
236              
237 18047           static inline int stk_try_pop(StkHandle *h, void *out) {
238 18047           StkHeader *hdr = h->hdr;
239 0           for (;;) {
240 18047           uint32_t t = __atomic_load_n(&hdr->top, __ATOMIC_ACQUIRE);
241 36086 100         if (t == 0) return 0;
242 18039 50         if (__atomic_compare_exchange_n(&hdr->top, &t, t - 1,
243             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
244 18039           uint64_t gen = stk_slot_claim_read(&h->ctl[t - 1]);
245 18039           memcpy(out, stk_slot(h, t - 1), h->elem_size);
246 18039           stk_slot_release(&h->ctl[t - 1], gen);
247 18039           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
248 18039 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
249 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
250 0           syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, 1, NULL, NULL, 0);
251             }
252 18039           return 1;
253             }
254             }
255             }
256              
257 2           static inline int stk_pop(StkHandle *h, void *out, double timeout) {
258 2 50         if (stk_try_pop(h, out)) return 1;
259 2 50         if (timeout == 0) return 0;
260              
261 2           StkHeader *hdr = h->hdr;
262             struct timespec dl, rem;
263 2           int has_dl = (timeout > 0);
264 2 50         if (has_dl) stk_make_deadline(timeout, &dl);
265 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
266              
267 0           for (;;) {
268 2           uint32_t wseq = __atomic_load_n(&hdr->pop_wake_seq, __ATOMIC_ACQUIRE);
269 2           __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
270 2           uint32_t t = __atomic_load_n(&hdr->top, __ATOMIC_ACQUIRE);
271 2 50         if (t == 0) {
272 2           struct timespec *pts = NULL;
273 2 50         if (has_dl) {
274 2 50         if (!stk_remaining(&dl, &rem)) {
275 0           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
276 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
277 0           return 0;
278             }
279 2           pts = &rem;
280             }
281 2           syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAIT, wseq, pts, NULL, 0);
282             }
283 2           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
284 2 100         if (stk_try_pop(h, out)) return 1;
285 1 50         if (has_dl && !stk_remaining(&dl, &rem)) {
    50          
286 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
287 1           return 0;
288             }
289             }
290             }
291              
292             /* ================================================================
293             * Peek / Status
294             * ================================================================ */
295              
296             /* Best-effort peek: seqlock-style retry against slot ctl. Returns 0 if
297             * empty OR if the top is mutating concurrently after the retry budget.
298             * Under no contention this is a single load. */
299 3           static inline int stk_peek(StkHandle *h, void *out) {
300 3 50         for (int tries = 0; tries < 64; tries++) {
301 3           uint32_t t = __atomic_load_n(&h->hdr->top, __ATOMIC_ACQUIRE);
302 3 50         if (t == 0) return 0;
303 3           uint64_t c1 = __atomic_load_n(&h->ctl[t - 1], __ATOMIC_ACQUIRE);
304 3 50         if (STK_SLOT_STATE(c1) != STK_SLOT_FILLED) {
305 0           stk_spin_pause();
306 0           continue;
307             }
308 3           memcpy(out, stk_slot(h, t - 1), h->elem_size);
309 3           uint64_t c2 = __atomic_load_n(&h->ctl[t - 1], __ATOMIC_ACQUIRE);
310 3           uint32_t t2 = __atomic_load_n(&h->hdr->top, __ATOMIC_ACQUIRE);
311 3 50         if (c1 == c2 && t == t2) return 1;
    50          
312             }
313 0           return 0;
314             }
315              
316 17           static inline uint32_t stk_size(StkHandle *h) {
317 17           return __atomic_load_n(&h->hdr->top, __ATOMIC_RELAXED);
318             }
319              
320             /* ================================================================
321             * Create / Open / Close
322             * ================================================================ */
323              
324             #define STK_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, STK_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
325              
326 36           static inline uint64_t stk_ctl_offset(uint32_t elem_size, uint64_t capacity) {
327 36           uint64_t data_end = sizeof(StkHeader) + capacity * elem_size;
328 36           return (data_end + 7u) & ~(uint64_t)7u;
329             }
330              
331 19           static inline uint64_t stk_total_size(uint32_t elem_size, uint64_t capacity) {
332 19           return stk_ctl_offset(elem_size, capacity) + capacity * sizeof(uint64_t);
333             }
334              
335 15           static inline void stk_init_header(void *base, uint64_t total,
336             uint32_t elem_size, uint32_t variant_id,
337             uint64_t capacity) {
338 15           StkHeader *hdr = (StkHeader *)base;
339 15           memset(base, 0, (size_t)total); /* zeroes ctl array → all slots EMPTY, gen=0 */
340 15           hdr->magic = STK_MAGIC;
341 15           hdr->version = STK_VERSION;
342 15           hdr->elem_size = elem_size;
343 15           hdr->variant_id = variant_id;
344 15           hdr->capacity = capacity;
345 15           hdr->total_size = total;
346 15           hdr->data_off = sizeof(StkHeader);
347 15           hdr->ctl_off = stk_ctl_offset(elem_size, capacity);
348 15           __atomic_thread_fence(__ATOMIC_SEQ_CST);
349 15           }
350              
351 17           static inline StkHandle *stk_setup(void *base, size_t msize,
352             const char *path, int bfd) {
353 17           StkHeader *hdr = (StkHeader *)base;
354 17           StkHandle *h = (StkHandle *)calloc(1, sizeof(StkHandle));
355 17 50         if (!h) { munmap(base, msize); return NULL; }
356 17           h->hdr = hdr;
357 17           h->data = (uint8_t *)base + hdr->data_off;
358 17           h->ctl = (uint64_t *)((uint8_t *)base + hdr->ctl_off);
359 17           h->mmap_size = msize;
360 17           h->elem_size = hdr->elem_size; /* cached — safe from shared-mem tampering */
361 17 100         h->path = path ? strdup(path) : NULL;
362 17           h->notify_fd = -1;
363 17           h->backing_fd = bfd;
364 17           return h;
365             }
366              
367             /* Validate a mapped header (shared by stk_create reopen and stk_open_fd). */
368 3           static inline int stk_validate_header(const StkHeader *hdr, uint64_t file_size,
369             uint32_t expected_variant) {
370 3 100         if (hdr->magic != STK_MAGIC) return 0;
371 2 50         if (hdr->version != STK_VERSION) return 0;
372 2 50         if (hdr->variant_id != expected_variant) return 0;
373 2 50         if (hdr->elem_size == 0 || hdr->capacity == 0) return 0;
    50          
374 2 50         if (hdr->capacity > 0x7FFFFFFFu) return 0;
375 2 50         if (hdr->total_size != file_size) return 0;
376 2 50         if (hdr->data_off != sizeof(StkHeader)) return 0;
377 2 50         if (hdr->ctl_off != stk_ctl_offset(hdr->elem_size, hdr->capacity)) return 0;
378 2 50         if (hdr->total_size != stk_total_size(hdr->elem_size, hdr->capacity)) return 0;
379 2           return 1;
380             }
381              
382 16           static StkHandle *stk_create(const char *path, uint64_t capacity,
383             uint32_t elem_size, uint32_t variant_id,
384             char *errbuf) {
385 16 50         if (errbuf) errbuf[0] = '\0';
386 16 50         if (capacity == 0) { STK_ERR("capacity must be > 0"); return NULL; }
    0          
387 16 50         if (elem_size == 0) { STK_ERR("elem_size must be > 0"); return NULL; }
    0          
388 16 50         if (capacity > (UINT64_MAX - sizeof(StkHeader) - 16) / (elem_size + sizeof(uint64_t))) {
389 0 0         STK_ERR("capacity * elem_size overflow"); return NULL;
390             }
391              
392 16           uint64_t total = stk_total_size(elem_size, capacity);
393 16           int anonymous = (path == NULL);
394 16           int fd = -1;
395             size_t map_size;
396             void *base;
397              
398 16 100         if (anonymous) {
399 12           map_size = (size_t)total;
400 12           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE,
401             MAP_SHARED|MAP_ANONYMOUS, -1, 0);
402 12 50         if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
403             } else {
404 4           fd = open(path, O_RDWR|O_CREAT, 0666);
405 6 50         if (fd < 0) { STK_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
406 4 50         if (flock(fd, LOCK_EX) < 0) { STK_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
407              
408             struct stat st;
409 4 50         if (fstat(fd, &st) < 0) {
410 0 0         STK_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
411             }
412 4           int is_new = (st.st_size == 0);
413 4 100         if (!is_new && (uint64_t)st.st_size < sizeof(StkHeader)) {
    50          
414 0 0         STK_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
415 0           flock(fd, LOCK_UN); close(fd); return NULL;
416             }
417              
418 4 100         if (is_new) {
419 2 50         if (ftruncate(fd, (off_t)total) < 0) {
420 0 0         STK_ERR("ftruncate: %s", strerror(errno));
421 0           flock(fd, LOCK_UN); close(fd); return NULL;
422             }
423             }
424 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
425 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
426 4 50         if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
427              
428 4 100         if (!is_new) {
429 2 100         if (!stk_validate_header((StkHeader *)base, (uint64_t)st.st_size, variant_id)) {
430 1 50         STK_ERR("invalid or incompatible stack file");
431 1           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
432             }
433 1           flock(fd, LOCK_UN); close(fd);
434 1           return stk_setup(base, map_size, path, -1);
435             }
436             }
437              
438 14           stk_init_header(base, total, elem_size, variant_id, capacity);
439 14 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
440 14           return stk_setup(base, map_size, path, -1);
441             }
442              
443 1           static StkHandle *stk_create_memfd(const char *name, uint64_t capacity,
444             uint32_t elem_size, uint32_t variant_id,
445             char *errbuf) {
446 1 50         if (errbuf) errbuf[0] = '\0';
447 1 50         if (capacity == 0) { STK_ERR("capacity must be > 0"); return NULL; }
    0          
448 1 50         if (elem_size == 0) { STK_ERR("elem_size must be > 0"); return NULL; }
    0          
449 1 50         if (capacity > (UINT64_MAX - sizeof(StkHeader) - 16) / (elem_size + sizeof(uint64_t))) {
450 0 0         STK_ERR("capacity * elem_size overflow"); return NULL;
451             }
452              
453 1           uint64_t total = stk_total_size(elem_size, capacity);
454 1 50         int fd = memfd_create(name ? name : "stack", MFD_CLOEXEC | MFD_ALLOW_SEALING);
455 1 50         if (fd < 0) { STK_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
456 1 50         if (ftruncate(fd, (off_t)total) < 0) { STK_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
457 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
458 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
459 1 50         if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
460 1           stk_init_header(base, total, elem_size, variant_id, capacity);
461 1           return stk_setup(base, (size_t)total, NULL, fd);
462             }
463              
464 1           static StkHandle *stk_open_fd(int fd, uint32_t variant_id, char *errbuf) {
465 1 50         if (errbuf) errbuf[0] = '\0';
466             struct stat st;
467 1 50         if (fstat(fd, &st) < 0) { STK_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
468 1 50         if ((uint64_t)st.st_size < sizeof(StkHeader)) { STK_ERR("too small"); return NULL; }
    0          
469 1           size_t ms = (size_t)st.st_size;
470 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
471 1 50         if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
472 1 50         if (!stk_validate_header((StkHeader *)base, (uint64_t)st.st_size, variant_id)) {
473 0 0         STK_ERR("invalid stack"); munmap(base, ms); return NULL;
474             }
475 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
476 1 50         if (myfd < 0) { STK_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
477 1           return stk_setup(base, ms, NULL, myfd);
478             }
479              
480 17           static void stk_destroy(StkHandle *h) {
481 17 50         if (!h) return;
482 17 100         if (h->notify_fd >= 0) close(h->notify_fd);
483 17 100         if (h->backing_fd >= 0) close(h->backing_fd);
484 17 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
485 17           free(h->path);
486 17           free(h);
487             }
488              
489             /* NOT concurrency-safe — use drain() for concurrent scenarios */
490 3           static void stk_clear(StkHandle *h) {
491 3           __atomic_store_n(&h->hdr->top, 0, __ATOMIC_RELEASE);
492 3           memset(h->ctl, 0, (size_t)h->hdr->capacity * sizeof(uint64_t));
493             /* clear() frees the entire stack at once — wake all waiters. */
494 3 50         if (__atomic_load_n(&h->hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
495 0           __atomic_add_fetch(&h->hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
496 0           syscall(SYS_futex, &h->hdr->push_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
497             }
498 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
499 0           __atomic_add_fetch(&h->hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
500 0           syscall(SYS_futex, &h->hdr->pop_wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
501             }
502 3           }
503              
504             /* Concurrency-safe drain: atomically swap top to 0, then release each
505             * drained slot through the state machine. Returns count drained.
506             *
507             * Crash-recovery: a pusher that won the top CAS but died (SIGKILL/segfault)
508             * between stk_slot_claim_write and stk_slot_publish leaves the slot stuck
509             * in WRITING. Plain stk_slot_claim_read would spin forever. We bound the
510             * wait at ~2s per slot; on timeout we CAS WRITING -> EMPTY (gen bumped) so
511             * the slot is reclaimed.
512             *
513             * Limitation: slot ctl encodes only (gen << 2) | state — no PID — so we
514             * cannot distinguish a crashed pusher from a merely slow one. A live pusher
515             * stalled > 2s would be falsely reclaimed; its subsequent publish is a CAS
516             * (see stk_slot_publish) so it observes the gen bump and silently no-ops
517             * rather than resurrecting a phantom FILLED slot. The pusher's value is
518             * dropped — equivalent to a crashed pusher. In practice the gap between
519             * claim_write and publish is sub-microsecond memcpy time, so the false-
520             * positive threshold is many orders of magnitude away from normal latency. */
521 2           static inline uint32_t stk_drain(StkHandle *h) {
522 2           StkHeader *hdr = h->hdr;
523 2           uint32_t t = __atomic_exchange_n(&hdr->top, 0, __ATOMIC_ACQ_REL);
524 2 100         if (t == 0) return 0;
525             /* Wall-clock deadline for the per-slot wait. We hot-spin first, then
526             * fall back to short sleeps to avoid burning a core for 2s on a stuck
527             * slot. The deadline is checked periodically (every 64 iterations) to
528             * keep the steady-state cost ~zero. */
529 6 100         for (uint32_t i = 0; i < t; i++) {
530             struct timespec dl;
531 5           int dl_set = 0;
532 5           uint32_t spins = 0;
533 0           for (;;) {
534 5           uint64_t c = __atomic_load_n(&h->ctl[i], __ATOMIC_ACQUIRE);
535 5           uint32_t st = STK_SLOT_STATE(c);
536 5 50         if (st == STK_SLOT_FILLED) {
537 5           uint64_t nc = (STK_SLOT_GEN(c) << 2) | STK_SLOT_READING;
538 5 50         if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
539             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
540 5           stk_slot_release(&h->ctl[i], STK_SLOT_GEN(c));
541 5           break;
542             }
543 0           continue;
544             }
545 0           stk_spin_pause();
546 0 0         if ((++spins & 0x3F) == 0) {
547 0 0         if (!dl_set) { stk_make_deadline(2.0, &dl); dl_set = 1; }
548             struct timespec rem;
549 0 0         if (!stk_remaining(&dl, &rem)) {
550             /* Treat as abandoned (crashed writer/reader): force the
551             * slot back to EMPTY with gen bumped. If CAS succeeds we
552             * skipped the slot; if it fails, the writer just published
553             * (or another recoverer fixed it) — loop and re-observe so
554             * a FILLED value is not leaked. */
555 0           uint64_t nc = ((STK_SLOT_GEN(c) + 1) << 2) | STK_SLOT_EMPTY;
556 0 0         if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
557             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
558 0           break;
559 0           continue;
560             }
561             /* Short sleep to keep CPU usage low during the long wait. */
562 0           struct timespec ts = { 0, 100000L }; /* 100us */
563 0           nanosleep(&ts, NULL);
564             }
565             }
566             }
567             /* drain freed `t` slots at once — wake up to that many. */
568 1 50         if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
569 0           __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
570 0 0         syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE,
571             t < INT_MAX ? (int)t : INT_MAX, NULL, NULL, 0);
572             }
573 1           return t;
574             }
575              
576             /* eventfd */
577 2           static int stk_create_eventfd(StkHandle *h) {
578 2 50         if (h->notify_fd >= 0) return h->notify_fd;
579 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
580 2 50         if (efd < 0) return -1;
581 2           h->notify_fd = efd;
582 2           return efd;
583             }
584 3           static int stk_notify(StkHandle *h) {
585 3 50         if (h->notify_fd < 0) return 0;
586 3           uint64_t v = 1;
587 3           return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
588             }
589 2           static int64_t stk_eventfd_consume(StkHandle *h) {
590 2 50         if (h->notify_fd < 0) return -1;
591 2           uint64_t v = 0;
592 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
593 2           return (int64_t)v;
594             }
595 1           static int stk_msync(StkHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
596              
597             #endif /* STACK_H */