File Coverage

pool.h
Criterion Covered Total %
statement 341 396 86.1
branch 139 276 50.3
condition n/a
subroutine n/a
pod n/a
total 480 672 71.4


line stmt bran cond sub pod time code
1             /*
2             * pool.h -- Fixed-size shared-memory object pool for Linux
3             *
4             * Bitmap-based slot allocation with CAS (lock-free).
5             * Futex-based blocking when pool is exhausted.
6             * PID-based stale slot recovery.
7             *
8             * Variants:
9             * Raw — opaque byte slots of arbitrary elem_size
10             * I64 — int64_t slots with atomic CAS/add
11             * F64 — double slots
12             * I32 — int32_t slots with atomic CAS/add
13             * Str — fixed-length string slots (4-byte length prefix + data)
14             */
15              
16             #ifndef POOL_H
17             #define POOL_H
18              
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35              
36             /* ================================================================
37             * Constants
38             * ================================================================ */
39              
40             #define POOL_MAGIC 0x504F4C31U /* "POL1" */
41             #define POOL_VERSION 1
42             #define POOL_ERR_BUFLEN 256
43              
44             #define POOL_VAR_RAW 0
45             #define POOL_VAR_I64 1
46             #define POOL_VAR_F64 2
47             #define POOL_VAR_I32 3
48             #define POOL_VAR_STR 4
49              
50             #define POOL_ALIGN8(x) (((x) + 7) & ~(uint64_t)7)
51              
52             /* ================================================================
53             * Header (128 bytes = 2 cache lines)
54             * ================================================================ */
55              
56             typedef struct {
57             /* ---- Cache line 0 (0-63): immutable after create ---- */
58             uint32_t magic; /* 0 */
59             uint32_t version; /* 4 */
60             uint32_t elem_size; /* 8 */
61             uint32_t variant_id; /* 12 */
62             uint64_t capacity; /* 16: number of slots */
63             uint64_t total_size; /* 24: total mmap size */
64             uint64_t data_off; /* 32: offset to slot data */
65             uint64_t bitmap_off; /* 40: offset to allocation bitmap */
66             uint64_t owners_off; /* 48: offset to per-slot owner PIDs */
67             uint8_t _pad0[8]; /* 56-63 */
68              
69             /* ---- Cache line 1 (64-127): mutable state ---- */
70             uint32_t used; /* 64: allocated count (futex word) */
71             uint32_t waiters; /* 68: blocked on alloc */
72             uint8_t _pad1[8]; /* 72-79 */
73             uint64_t stat_allocs; /* 80 */
74             uint64_t stat_frees; /* 88 */
75             uint64_t stat_waits; /* 96 */
76             uint64_t stat_timeouts; /* 104 */
77             uint64_t stat_recoveries;/* 112 */
78             uint8_t _pad2[8]; /* 120-127 */
79             } PoolHeader;
80              
81             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
82             _Static_assert(sizeof(PoolHeader) == 128, "PoolHeader must be 128 bytes");
83             #endif
84              
85             /* ================================================================
86             * Process-local handle
87             * ================================================================ */
88              
89             typedef struct {
90             PoolHeader *hdr;
91             uint64_t *bitmap;
92             uint32_t *owners;
93             uint8_t *data;
94             size_t mmap_size;
95             uint32_t bitmap_words;
96             char *path;
97             int notify_fd;
98             int backing_fd;
99             uint32_t scan_hint;
100             } PoolHandle;
101              
102             /* ================================================================
103             * Utility
104             * ================================================================ */
105              
106 22           static inline int pool_pid_alive(uint32_t pid) {
107 22 50         if (pid == 0) return 1;
108 22 100         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    50          
109             }
110              
111 2           static inline void pool_make_deadline(double timeout, struct timespec *deadline) {
112 2           clock_gettime(CLOCK_MONOTONIC, deadline);
113 2           deadline->tv_sec += (time_t)timeout;
114 2           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
115 2 100         if (deadline->tv_nsec >= 1000000000L) {
116 1           deadline->tv_sec++;
117 1           deadline->tv_nsec -= 1000000000L;
118             }
119 2           }
120              
121 3           static inline int pool_remaining_time(const struct timespec *deadline,
122             struct timespec *remaining) {
123             struct timespec now;
124 3           clock_gettime(CLOCK_MONOTONIC, &now);
125 3           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
126 3           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
127 3 50         if (remaining->tv_nsec < 0) {
128 3           remaining->tv_sec--;
129 3           remaining->tv_nsec += 1000000000L;
130             }
131 3           return remaining->tv_sec >= 0;
132             }
133              
134             /* ================================================================
135             * Slot access
136             * ================================================================ */
137              
138 1686           static inline uint8_t *pool_slot_ptr(PoolHandle *h, uint64_t slot) {
139 1686           return h->data + slot * h->hdr->elem_size;
140             }
141              
142 964           static inline int pool_is_allocated(PoolHandle *h, uint64_t slot) {
143 964           uint32_t widx = (uint32_t)(slot / 64);
144 964           int bit = (int)(slot % 64);
145 964           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
146 964           return (word >> bit) & 1;
147             }
148              
149             /* ================================================================
150             * Allocation (lock-free bitmap scan + CAS)
151             * ================================================================ */
152              
153 932           static inline int64_t pool_try_alloc(PoolHandle *h) {
154 932           uint32_t nwords = h->bitmap_words;
155 932           uint64_t cap = h->hdr->capacity;
156 932           uint32_t start = h->scan_hint;
157 932           uint32_t mypid = (uint32_t)getpid();
158              
159 960 100         for (uint32_t i = 0; i < nwords; i++) {
160 944           uint32_t widx = (start + i) % nwords;
161 944           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
162              
163 944 100         while (word != ~(uint64_t)0) {
164 933           int bit = __builtin_ctzll(~word);
165 933           uint64_t slot = (uint64_t)widx * 64 + bit;
166 933 100         if (slot >= cap) break;
167              
168 916           uint64_t new_word = word | ((uint64_t)1 << bit);
169 916 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
170             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
171 916           __atomic_store_n(&h->owners[slot], mypid, __ATOMIC_RELAXED);
172 916           memset(pool_slot_ptr(h, slot), 0, h->hdr->elem_size);
173 916           __atomic_add_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
174 916           __atomic_add_fetch(&h->hdr->stat_allocs, 1, __ATOMIC_RELAXED);
175             /* Advance hint past full word to reduce next scan */
176 11 100         h->scan_hint = (new_word == ~(uint64_t)0 && nwords > 1)
177 927 100         ? (widx + 1) % nwords : widx;
178 916           return (int64_t)slot;
179             }
180             /* CAS failed — word now holds current value, retry */
181             }
182             }
183 16           return -1;
184             }
185              
186             /* Blocking alloc. timeout<0 = infinite, 0 = non-blocking, >0 = seconds. */
187 911           static inline int64_t pool_alloc(PoolHandle *h, double timeout) {
188 911           int64_t slot = pool_try_alloc(h);
189 911 100         if (slot >= 0) return slot;
190 3 100         if (timeout == 0) return -1;
191              
192 2           PoolHeader *hdr = h->hdr;
193             struct timespec deadline, remaining;
194 2           int has_deadline = (timeout > 0);
195 2 50         if (has_deadline) pool_make_deadline(timeout, &deadline);
196              
197 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
198              
199 0           for (;;) {
200 2           uint32_t cur_used = __atomic_load_n(&hdr->used, __ATOMIC_RELAXED);
201 2 50         if (cur_used < (uint32_t)hdr->capacity) {
202 0           slot = pool_try_alloc(h);
203 0 0         if (slot >= 0) return slot;
204             }
205              
206 2           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
207              
208 2           cur_used = __atomic_load_n(&hdr->used, __ATOMIC_ACQUIRE);
209 2 50         if (cur_used >= (uint32_t)hdr->capacity) {
210 2           struct timespec *pts = NULL;
211 2 50         if (has_deadline) {
212 2 50         if (!pool_remaining_time(&deadline, &remaining)) {
213 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
214 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
215 0           return -1;
216             }
217 2           pts = &remaining;
218             }
219 2           syscall(SYS_futex, &hdr->used, FUTEX_WAIT, cur_used, pts, NULL, 0);
220             }
221              
222 2           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
223              
224 2           slot = pool_try_alloc(h);
225 2 100         if (slot >= 0) return slot;
226              
227 1 50         if (has_deadline) {
228 1 50         if (!pool_remaining_time(&deadline, &remaining)) {
229 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
230 1           return -1;
231             }
232             }
233             }
234             }
235              
236             /* ================================================================
237             * Free
238             * ================================================================ */
239              
240 691           static inline int pool_free_slot(PoolHandle *h, uint64_t slot) {
241 691           PoolHeader *hdr = h->hdr;
242 691 50         if (slot >= hdr->capacity) return 0;
243              
244 691           uint32_t widx = (uint32_t)(slot / 64);
245 691           int bit = (int)(slot % 64);
246 691           uint64_t mask = (uint64_t)1 << bit;
247              
248 0           for (;;) {
249 691           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
250 1380 100         if (!(word & mask)) return 0;
251              
252 689           uint64_t new_word = word & ~mask;
253 689 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
254             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
255 689           __atomic_store_n(&h->owners[slot], 0, __ATOMIC_RELAXED);
256 689           __atomic_sub_fetch(&hdr->used, 1, __ATOMIC_RELEASE);
257 689           __atomic_add_fetch(&hdr->stat_frees, 1, __ATOMIC_RELAXED);
258 689 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
259 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
260 689           return 1;
261             }
262             }
263             }
264              
265             /* ================================================================
266             * Batch free — single used decrement + single futex wake
267             * ================================================================ */
268              
269 3           static inline uint32_t pool_free_n(PoolHandle *h, uint64_t *slots, uint32_t count) {
270 3           uint32_t freed = 0;
271 3           PoolHeader *hdr = h->hdr;
272              
273 10 100         for (uint32_t i = 0; i < count; i++) {
274 7           uint64_t slot = slots[i];
275 7 50         if (slot >= hdr->capacity) continue;
276              
277 7           uint32_t widx = (uint32_t)(slot / 64);
278 7           int bit = (int)(slot % 64);
279 7           uint64_t mask = (uint64_t)1 << bit;
280              
281 0           for (;;) {
282 7           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
283 14 50         if (!(word & mask)) break;
284 7           uint64_t new_word = word & ~mask;
285 7 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
286             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
287 7           __atomic_store_n(&h->owners[slot], 0, __ATOMIC_RELAXED);
288 7           freed++;
289 7           break;
290             }
291             }
292             }
293              
294 3 50         if (freed > 0) {
295 3           __atomic_sub_fetch(&hdr->used, freed, __ATOMIC_RELEASE);
296 3           __atomic_add_fetch(&hdr->stat_frees, freed, __ATOMIC_RELAXED);
297 3 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0) {
298 0 0         int wake = freed < (uint32_t)INT_MAX ? (int)freed : INT_MAX;
299 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, wake, NULL, NULL, 0);
300             }
301             }
302 3           return freed;
303             }
304              
305             /* ================================================================
306             * Batch alloc — all-or-nothing, shared deadline
307             * ================================================================ */
308              
309 2           static inline int pool_alloc_n(PoolHandle *h, uint64_t *out, uint32_t count,
310             double timeout) {
311 2 50         if (count == 0) return 1;
312              
313 2 100         if (timeout == 0) {
314 2 50         for (uint32_t i = 0; i < count; i++) {
315 2           int64_t slot = pool_try_alloc(h);
316 2 100         if (slot < 0) {
317 1 50         if (i > 0) pool_free_n(h, out, i);
318 1           return 0;
319             }
320 1           out[i] = (uint64_t)slot;
321             }
322 0           return 1;
323             }
324              
325             struct timespec deadline, remaining;
326 1           int has_deadline = (timeout > 0);
327 1 50         if (has_deadline) pool_make_deadline(timeout, &deadline);
328              
329 4 100         for (uint32_t i = 0; i < count; i++) {
330 3           double t = timeout;
331 3 50         if (has_deadline) {
332 0 0         if (!pool_remaining_time(&deadline, &remaining)) {
333 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
334 0 0         if (i > 0) pool_free_n(h, out, i);
335 0           return 0;
336             }
337 0           t = (double)remaining.tv_sec + (double)remaining.tv_nsec / 1e9;
338             }
339 3           int64_t slot = pool_alloc(h, t);
340 3 50         if (slot < 0) {
341 0 0         if (i > 0) pool_free_n(h, out, i);
342 0           return 0;
343             }
344 3           out[i] = (uint64_t)slot;
345             }
346 1           return 1;
347             }
348              
349             /* ================================================================
350             * Stale recovery — CAS owner to narrow race window
351             * ================================================================ */
352              
353 2           static inline uint32_t pool_recover_stale(PoolHandle *h) {
354 2           uint32_t recovered = 0;
355 2           uint64_t cap = h->hdr->capacity;
356              
357 202 100         for (uint64_t slot = 0; slot < cap; slot++) {
358 201 100         if (!pool_is_allocated(h, slot)) continue;
359 22           uint32_t owner = __atomic_load_n(&h->owners[slot], __ATOMIC_ACQUIRE);
360 22 50         if (owner == 0 || pool_pid_alive(owner)) continue;
    100          
361              
362             /* CAS owner from dead PID to 0 — if it fails, slot was
363             * re-allocated or already recovered by another process */
364 21 50         if (!__atomic_compare_exchange_n(&h->owners[slot], &owner, 0,
365             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
366 0           continue;
367              
368             /* We now own the right to free this slot's bitmap bit */
369 21           uint32_t widx = (uint32_t)(slot / 64);
370 21           int bit = (int)(slot % 64);
371 21           uint64_t mask = (uint64_t)1 << bit;
372              
373 0           for (;;) {
374 21           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
375 42 50         if (!(word & mask)) break;
376 21           uint64_t new_word = word & ~mask;
377 21 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
378             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
379 21           __atomic_sub_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
380 21           __atomic_add_fetch(&h->hdr->stat_frees, 1, __ATOMIC_RELAXED);
381 21 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
382 0           syscall(SYS_futex, &h->hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
383 21           recovered++;
384 21           break;
385             }
386             }
387             }
388              
389 2 50         if (recovered > 0)
390 2           __atomic_add_fetch(&h->hdr->stat_recoveries, recovered, __ATOMIC_RELAXED);
391              
392 2           return recovered;
393             }
394              
395             /* ================================================================
396             * Layout calculation
397             * ================================================================ */
398              
399 51           static inline void pool_calc_layout(uint64_t capacity, uint32_t elem_size,
400             uint64_t *bitmap_off, uint64_t *owners_off,
401             uint64_t *data_off, uint64_t *total_size) {
402 51           uint64_t bwords = (capacity + 63) / 64;
403 51           uint64_t bitmap_sz = bwords * 8;
404 51           uint64_t owners_sz = POOL_ALIGN8(capacity * 4);
405 51           uint64_t data_sz = (uint64_t)capacity * elem_size;
406              
407 51           *bitmap_off = sizeof(PoolHeader);
408 51           *owners_off = *bitmap_off + bitmap_sz;
409 51           *data_off = *owners_off + owners_sz;
410 51           *total_size = *data_off + data_sz;
411 51           }
412              
413             /* ================================================================
414             * Header initialization (shared by pool_create and pool_create_memfd)
415             * ================================================================ */
416              
417 42           static inline void pool_init_header(void *base, uint64_t total,
418             uint32_t elem_size, uint32_t variant_id,
419             uint64_t capacity, uint64_t bm_off,
420             uint64_t own_off, uint64_t dat_off) {
421 42           PoolHeader *hdr = (PoolHeader *)base;
422 42           memset(base, 0, (size_t)total);
423 42           hdr->magic = POOL_MAGIC;
424 42           hdr->version = POOL_VERSION;
425 42           hdr->elem_size = elem_size;
426 42           hdr->variant_id = variant_id;
427 42           hdr->capacity = capacity;
428 42           hdr->total_size = total;
429 42           hdr->data_off = dat_off;
430 42           hdr->bitmap_off = bm_off;
431 42           hdr->owners_off = own_off;
432 42           __atomic_thread_fence(__ATOMIC_SEQ_CST);
433 42           }
434              
435             /* ================================================================
436             * Create / Open / Close
437             * ================================================================ */
438              
439             #define POOL_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, POOL_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
440              
441             /* Max capacity to prevent bitmap_words (uint32_t) truncation */
442             #define POOL_MAX_CAPACITY ((uint64_t)UINT32_MAX * 64)
443              
444             /* Validate header: magic, version, variant, sizes, AND layout offsets. */
445 7           static inline int pool_validate_header(const PoolHeader *hdr, uint64_t file_size,
446             uint32_t expected_variant) {
447 7 50         if (hdr->magic != POOL_MAGIC) return 0;
448 7 50         if (hdr->version != POOL_VERSION) return 0;
449 7 50         if (hdr->variant_id != expected_variant) return 0;
450 7 50         if (hdr->capacity == 0 || hdr->capacity > POOL_MAX_CAPACITY) return 0;
    50          
451 7 50         if (hdr->elem_size == 0) return 0;
452 7 50         if (hdr->capacity > (UINT64_MAX - sizeof(PoolHeader)) / hdr->elem_size) return 0;
453 7 50         if (hdr->total_size != file_size) return 0;
454              
455             uint64_t bm_off, own_off, dat_off, total;
456 7           pool_calc_layout(hdr->capacity, hdr->elem_size, &bm_off, &own_off, &dat_off, &total);
457              
458 7 50         if (hdr->bitmap_off != bm_off) return 0;
459 7 50         if (hdr->owners_off != own_off) return 0;
460 7 50         if (hdr->data_off != dat_off) return 0;
461 7 50         if (hdr->total_size != total) return 0;
462 7           return 1;
463             }
464              
465 49           static inline PoolHandle *pool_setup_handle(void *base, size_t map_size,
466             const char *path, int backing_fd) {
467 49           PoolHeader *hdr = (PoolHeader *)base;
468 49           PoolHandle *h = (PoolHandle *)calloc(1, sizeof(PoolHandle));
469 49 50         if (!h) { munmap(base, map_size); return NULL; }
470              
471 49           h->hdr = hdr;
472 49           h->bitmap = (uint64_t *)((uint8_t *)base + hdr->bitmap_off);
473 49           h->owners = (uint32_t *)((uint8_t *)base + hdr->owners_off);
474 49           h->data = (uint8_t *)base + hdr->data_off;
475 49           h->mmap_size = map_size;
476 49           h->bitmap_words = (uint32_t)((hdr->capacity + 63) / 64);
477 49 100         h->path = path ? strdup(path) : NULL;
478 49           h->notify_fd = -1;
479 49           h->backing_fd = backing_fd;
480 49           h->scan_hint = (uint32_t)getpid() % h->bitmap_words;
481              
482 49           return h;
483             }
484              
485 38           static PoolHandle *pool_create(const char *path, uint64_t capacity,
486             uint32_t elem_size, uint32_t variant_id,
487             char *errbuf) {
488 38 50         if (errbuf) errbuf[0] = '\0';
489              
490 38 50         if (capacity == 0) { POOL_ERR("capacity must be > 0"); return NULL; }
    0          
491 38 50         if (elem_size == 0) { POOL_ERR("elem_size must be > 0"); return NULL; }
    0          
492 38 50         if (capacity > POOL_MAX_CAPACITY) { POOL_ERR("capacity too large"); return NULL; }
    0          
493 38 50         if (capacity > (UINT64_MAX - sizeof(PoolHeader)) / elem_size) {
494 0 0         POOL_ERR("capacity * elem_size overflow"); return NULL;
495             }
496              
497             uint64_t bm_off, own_off, dat_off, total;
498 38           pool_calc_layout(capacity, elem_size, &bm_off, &own_off, &dat_off, &total);
499              
500 38           int fd = -1;
501 38           int anonymous = (path == NULL);
502             size_t map_size;
503             void *base;
504              
505 38 100         if (anonymous) {
506 27           map_size = (size_t)total;
507 27           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
508             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
509 27 50         if (base == MAP_FAILED) {
510 0 0         POOL_ERR("mmap(anonymous): %s", strerror(errno));
511 0           return NULL;
512             }
513             } else {
514 11           fd = open(path, O_RDWR | O_CREAT, 0666);
515 13 50         if (fd < 0) { POOL_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
516              
517 11 50         if (flock(fd, LOCK_EX) < 0) {
518 0 0         POOL_ERR("flock(%s): %s", path, strerror(errno));
519 0           close(fd); return NULL;
520             }
521              
522             struct stat st;
523 11 50         if (fstat(fd, &st) < 0) {
524 0 0         POOL_ERR("fstat(%s): %s", path, strerror(errno));
525 0           flock(fd, LOCK_UN); close(fd); return NULL;
526             }
527              
528 11           int is_new = (st.st_size == 0);
529              
530 11 100         if (!is_new && (uint64_t)st.st_size < sizeof(PoolHeader)) {
    50          
531 0 0         POOL_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
532 0           flock(fd, LOCK_UN); close(fd); return NULL;
533             }
534              
535 11 100         if (is_new) {
536 9 50         if (ftruncate(fd, (off_t)total) < 0) {
537 0 0         POOL_ERR("ftruncate(%s): %s", path, strerror(errno));
538 0           flock(fd, LOCK_UN); close(fd); return NULL;
539             }
540             }
541              
542 11 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
543 11           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
544 11 50         if (base == MAP_FAILED) {
545 0 0         POOL_ERR("mmap(%s): %s", path, strerror(errno));
546 0           flock(fd, LOCK_UN); close(fd); return NULL;
547             }
548              
549 11 100         if (!is_new) {
550 2 50         if (!pool_validate_header((PoolHeader *)base, (uint64_t)st.st_size, variant_id)) {
551 0 0         POOL_ERR("%s: invalid or incompatible pool file", path);
552 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
553             }
554 2           flock(fd, LOCK_UN);
555 2           close(fd);
556 2           return pool_setup_handle(base, map_size, path, -1);
557             }
558             }
559              
560             /* Initialize header — flock still held for file-backed new files */
561 36           pool_init_header(base, total, elem_size, variant_id, capacity,
562             bm_off, own_off, dat_off);
563              
564 36 100         if (fd >= 0) {
565 9           flock(fd, LOCK_UN);
566 9           close(fd);
567             }
568              
569 36           return pool_setup_handle(base, map_size, path, -1);
570             }
571              
572 6           static PoolHandle *pool_create_memfd(const char *name, uint64_t capacity,
573             uint32_t elem_size, uint32_t variant_id,
574             char *errbuf) {
575 6 50         if (errbuf) errbuf[0] = '\0';
576              
577 6 50         if (capacity == 0) { POOL_ERR("capacity must be > 0"); return NULL; }
    0          
578 6 50         if (elem_size == 0) { POOL_ERR("elem_size must be > 0"); return NULL; }
    0          
579 6 50         if (capacity > POOL_MAX_CAPACITY) { POOL_ERR("capacity too large"); return NULL; }
    0          
580 6 50         if (capacity > (UINT64_MAX - sizeof(PoolHeader)) / elem_size) {
581 0 0         POOL_ERR("capacity * elem_size overflow"); return NULL;
582             }
583              
584             uint64_t bm_off, own_off, dat_off, total;
585 6           pool_calc_layout(capacity, elem_size, &bm_off, &own_off, &dat_off, &total);
586              
587 6 50         int fd = memfd_create(name ? name : "pool", MFD_CLOEXEC | MFD_ALLOW_SEALING);
588 6 50         if (fd < 0) { POOL_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
589              
590 6 50         if (ftruncate(fd, (off_t)total) < 0) {
591 0 0         POOL_ERR("ftruncate(memfd): %s", strerror(errno));
592 0           close(fd); return NULL;
593             }
594              
595             /* Seal against shrink/grow to block ftruncate-based SIGBUS attacks via
596             * SCM_RIGHTS-shared fds. Peers can still write; only size is immutable. */
597 6           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
598              
599 6           void *base = mmap(NULL, (size_t)total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
600 6 50         if (base == MAP_FAILED) {
601 0 0         POOL_ERR("mmap(memfd): %s", strerror(errno));
602 0           close(fd); return NULL;
603             }
604              
605 6           pool_init_header(base, total, elem_size, variant_id, capacity,
606             bm_off, own_off, dat_off);
607              
608 6           return pool_setup_handle(base, (size_t)total, NULL, fd);
609             }
610              
611 5           static PoolHandle *pool_open_fd(int fd, uint32_t variant_id, char *errbuf) {
612 5 50         if (errbuf) errbuf[0] = '\0';
613              
614             struct stat st;
615 5 50         if (fstat(fd, &st) < 0) {
616 0 0         POOL_ERR("fstat(fd=%d): %s", fd, strerror(errno));
617 0           return NULL;
618             }
619              
620 5 50         if ((uint64_t)st.st_size < sizeof(PoolHeader)) {
621 0 0         POOL_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
622 0           return NULL;
623             }
624              
625 5           size_t map_size = (size_t)st.st_size;
626 5           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
627 5 50         if (base == MAP_FAILED) {
628 0 0         POOL_ERR("mmap(fd=%d): %s", fd, strerror(errno));
629 0           return NULL;
630             }
631              
632 5 50         if (!pool_validate_header((PoolHeader *)base, (uint64_t)st.st_size, variant_id)) {
633 0 0         POOL_ERR("fd %d: invalid or incompatible pool", fd);
634 0           munmap(base, map_size);
635 0           return NULL;
636             }
637              
638 5           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
639 5 50         if (myfd < 0) {
640 0 0         POOL_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
641 0           munmap(base, map_size);
642 0           return NULL;
643             }
644              
645 5           return pool_setup_handle(base, map_size, NULL, myfd);
646             }
647              
648 49           static void pool_destroy(PoolHandle *h) {
649 49 50         if (!h) return;
650 49 100         if (h->notify_fd >= 0) close(h->notify_fd);
651 49 100         if (h->backing_fd >= 0) close(h->backing_fd);
652 49 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
653 49           free(h->path);
654 49           free(h);
655             }
656              
657             /* ================================================================
658             * Eventfd integration
659             * ================================================================ */
660              
661 3           static int pool_create_eventfd(PoolHandle *h) {
662 3 100         if (h->notify_fd >= 0) return h->notify_fd;
663 2           int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
664 2 50         if (efd < 0) return -1;
665 2           h->notify_fd = efd;
666 2           return efd;
667             }
668              
669 4           static int pool_notify(PoolHandle *h) {
670 4 50         if (h->notify_fd < 0) return 0;
671 4           uint64_t val = 1;
672 4           return write(h->notify_fd, &val, sizeof(val)) == sizeof(val);
673             }
674              
675 4           static int64_t pool_eventfd_consume(PoolHandle *h) {
676 4 50         if (h->notify_fd < 0) return -1;
677 4           uint64_t val = 0;
678 4 100         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
679 3           return (int64_t)val;
680             }
681              
682 1           static int pool_msync(PoolHandle *h) {
683 1           return msync(h->hdr, h->mmap_size, MS_SYNC);
684             }
685              
686             /* ================================================================
687             * Typed accessors — integers (atomic)
688             * ================================================================ */
689              
690 62           static inline int64_t pool_get_i64(PoolHandle *h, uint64_t slot) {
691 62           return __atomic_load_n((int64_t *)pool_slot_ptr(h, slot), __ATOMIC_RELAXED);
692             }
693              
694 613           static inline void pool_set_i64(PoolHandle *h, uint64_t slot, int64_t val) {
695 613           __atomic_store_n((int64_t *)pool_slot_ptr(h, slot), val, __ATOMIC_RELAXED);
696 613           }
697              
698 3           static inline int pool_cas_i64(PoolHandle *h, uint64_t slot,
699             int64_t expected, int64_t desired) {
700 3           return __atomic_compare_exchange_n((int64_t *)pool_slot_ptr(h, slot),
701             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
702             }
703              
704 2           static inline int64_t pool_cmpxchg_i64(PoolHandle *h, uint64_t slot,
705             int64_t expected, int64_t desired) {
706 2           __atomic_compare_exchange_n((int64_t *)pool_slot_ptr(h, slot),
707             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
708 2           return expected;
709             }
710              
711 1           static inline int64_t pool_xchg_i64(PoolHandle *h, uint64_t slot, int64_t val) {
712 1           return __atomic_exchange_n((int64_t *)pool_slot_ptr(h, slot), val, __ATOMIC_ACQ_REL);
713             }
714              
715 7           static inline int64_t pool_add_i64(PoolHandle *h, uint64_t slot, int64_t delta) {
716 7           return __atomic_add_fetch((int64_t *)pool_slot_ptr(h, slot), delta, __ATOMIC_ACQ_REL);
717             }
718              
719 5           static inline int32_t pool_get_i32(PoolHandle *h, uint64_t slot) {
720 5           return __atomic_load_n((int32_t *)pool_slot_ptr(h, slot), __ATOMIC_RELAXED);
721             }
722              
723 6           static inline void pool_set_i32(PoolHandle *h, uint64_t slot, int32_t val) {
724 6           __atomic_store_n((int32_t *)pool_slot_ptr(h, slot), val, __ATOMIC_RELAXED);
725 6           }
726              
727 1           static inline int pool_cas_i32(PoolHandle *h, uint64_t slot,
728             int32_t expected, int32_t desired) {
729 1           return __atomic_compare_exchange_n((int32_t *)pool_slot_ptr(h, slot),
730             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
731             }
732              
733 2           static inline int32_t pool_cmpxchg_i32(PoolHandle *h, uint64_t slot,
734             int32_t expected, int32_t desired) {
735 2           __atomic_compare_exchange_n((int32_t *)pool_slot_ptr(h, slot),
736             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
737 2           return expected;
738             }
739              
740 1           static inline int32_t pool_xchg_i32(PoolHandle *h, uint64_t slot, int32_t val) {
741 1           return __atomic_exchange_n((int32_t *)pool_slot_ptr(h, slot), val, __ATOMIC_ACQ_REL);
742             }
743              
744 2           static inline int32_t pool_add_i32(PoolHandle *h, uint64_t slot, int32_t delta) {
745 2           return __atomic_add_fetch((int32_t *)pool_slot_ptr(h, slot), delta, __ATOMIC_ACQ_REL);
746             }
747              
748             /* ================================================================
749             * Typed accessors — float (non-atomic)
750             * ================================================================ */
751              
752 7           static inline double pool_get_f64(PoolHandle *h, uint64_t slot) {
753             double v;
754 7           memcpy(&v, pool_slot_ptr(h, slot), sizeof(double));
755 7           return v;
756             }
757              
758 7           static inline void pool_set_f64(PoolHandle *h, uint64_t slot, double val) {
759 7           memcpy(pool_slot_ptr(h, slot), &val, sizeof(double));
760 7           }
761              
762             /* ================================================================
763             * Typed accessors — string (4-byte length prefix + data)
764             * ================================================================ */
765              
766 11           static inline uint32_t pool_get_str_len(PoolHandle *h, uint64_t slot) {
767             uint32_t len;
768 11           memcpy(&len, pool_slot_ptr(h, slot), sizeof(uint32_t));
769 11           uint32_t max_len = h->hdr->elem_size - sizeof(uint32_t);
770 11 50         if (len > max_len) len = max_len;
771 11           return len;
772             }
773              
774 11           static inline const char *pool_get_str_ptr(PoolHandle *h, uint64_t slot) {
775 11           return (const char *)(pool_slot_ptr(h, slot) + sizeof(uint32_t));
776             }
777              
778 8           static inline void pool_set_str(PoolHandle *h, uint64_t slot,
779             const char *str, uint32_t len) {
780 8           uint32_t max_len = h->hdr->elem_size - sizeof(uint32_t);
781 8 100         if (len > max_len) len = max_len;
782 8           memcpy(pool_slot_ptr(h, slot), &len, sizeof(uint32_t));
783 8           memcpy(pool_slot_ptr(h, slot) + sizeof(uint32_t), str, len);
784 8           }
785              
786             /* ================================================================
787             * Reset — free all slots (NOT concurrency-safe, caller must
788             * ensure no other process is accessing the pool)
789             * ================================================================ */
790              
791 10           static inline void pool_reset(PoolHandle *h) {
792 10           PoolHeader *hdr = h->hdr;
793 10           memset(h->bitmap, 0, (size_t)h->bitmap_words * 8);
794 10           memset(h->owners, 0, (size_t)hdr->capacity * 4);
795 10           __atomic_store_n(&hdr->used, 0, __ATOMIC_RELEASE);
796 10 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
797 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
798 10           }
799              
800             #endif /* POOL_H */