File Coverage

pool.h
Criterion Covered Total %
statement 338 395 85.5
branch 137 272 50.3
condition n/a
subroutine n/a
pod n/a
total 475 667 71.2


line stmt bran cond sub pod time code
1             /*
2             * pool.h -- Fixed-size shared-memory object pool for Linux
3             *
4             * Bitmap-based slot allocation with CAS (lock-free).
5             * Futex-based blocking when pool is exhausted.
6             * PID-based stale slot recovery.
7             *
8             * Variants:
9             * Raw — opaque byte slots of arbitrary elem_size
10             * I64 — int64_t slots with atomic CAS/add
11             * F64 — double slots
12             * I32 — int32_t slots with atomic CAS/add
13             * Str — fixed-length string slots (4-byte length prefix + data)
14             */
15              
16             #ifndef POOL_H
17             #define POOL_H
18              
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35              
36             /* ================================================================
37             * Constants
38             * ================================================================ */
39              
40             #define POOL_MAGIC 0x504F4C31U /* "POL1" */
41             #define POOL_VERSION 1
42             #define POOL_ERR_BUFLEN 256
43              
44             #define POOL_VAR_RAW 0
45             #define POOL_VAR_I64 1
46             #define POOL_VAR_F64 2
47             #define POOL_VAR_I32 3
48             #define POOL_VAR_STR 4
49              
50             #define POOL_ALIGN8(x) (((x) + 7) & ~(uint64_t)7)
51              
52             /* ================================================================
53             * Header (128 bytes = 2 cache lines)
54             * ================================================================ */
55              
56             typedef struct {
57             /* ---- Cache line 0 (0-63): immutable after create ---- */
58             uint32_t magic; /* 0 */
59             uint32_t version; /* 4 */
60             uint32_t elem_size; /* 8 */
61             uint32_t variant_id; /* 12 */
62             uint64_t capacity; /* 16: number of slots */
63             uint64_t total_size; /* 24: total mmap size */
64             uint64_t data_off; /* 32: offset to slot data */
65             uint64_t bitmap_off; /* 40: offset to allocation bitmap */
66             uint64_t owners_off; /* 48: offset to per-slot owner PIDs */
67             uint8_t _pad0[8]; /* 56-63 */
68              
69             /* ---- Cache line 1 (64-127): mutable state ---- */
70             uint32_t used; /* 64: allocated count (futex word) */
71             uint32_t waiters; /* 68: blocked on alloc */
72             uint8_t _pad1[8]; /* 72-79 */
73             uint64_t stat_allocs; /* 80 */
74             uint64_t stat_frees; /* 88 */
75             uint64_t stat_waits; /* 96 */
76             uint64_t stat_timeouts; /* 104 */
77             uint32_t stat_recoveries;/* 112 */
78             uint8_t _pad2[12]; /* 116-127 */
79             } PoolHeader;
80              
81             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
82             _Static_assert(sizeof(PoolHeader) == 128, "PoolHeader must be 128 bytes");
83             #endif
84              
85             /* ================================================================
86             * Process-local handle
87             * ================================================================ */
88              
89             typedef struct {
90             PoolHeader *hdr;
91             uint64_t *bitmap;
92             uint32_t *owners;
93             uint8_t *data;
94             size_t mmap_size;
95             uint32_t bitmap_words;
96             char *path;
97             int notify_fd;
98             int backing_fd;
99             uint32_t scan_hint;
100             } PoolHandle;
101              
102             /* ================================================================
103             * Utility
104             * ================================================================ */
105              
106 22           static inline int pool_pid_alive(uint32_t pid) {
107 22 50         if (pid == 0) return 1;
108 22 100         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    50          
109             }
110              
111 2           static inline void pool_make_deadline(double timeout, struct timespec *deadline) {
112 2           clock_gettime(CLOCK_MONOTONIC, deadline);
113 2           deadline->tv_sec += (time_t)timeout;
114 2           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
115 2 50         if (deadline->tv_nsec >= 1000000000L) {
116 0           deadline->tv_sec++;
117 0           deadline->tv_nsec -= 1000000000L;
118             }
119 2           }
120              
121 3           static inline int pool_remaining_time(const struct timespec *deadline,
122             struct timespec *remaining) {
123             struct timespec now;
124 3           clock_gettime(CLOCK_MONOTONIC, &now);
125 3           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
126 3           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
127 3 100         if (remaining->tv_nsec < 0) {
128 2           remaining->tv_sec--;
129 2           remaining->tv_nsec += 1000000000L;
130             }
131 3           return remaining->tv_sec >= 0;
132             }
133              
134             /* ================================================================
135             * Slot access
136             * ================================================================ */
137              
138 1680           static inline uint8_t *pool_slot_ptr(PoolHandle *h, uint64_t slot) {
139 1680           return h->data + slot * h->hdr->elem_size;
140             }
141              
142 962           static inline int pool_is_allocated(PoolHandle *h, uint64_t slot) {
143 962           uint32_t widx = (uint32_t)(slot / 64);
144 962           int bit = (int)(slot % 64);
145 962           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
146 962           return (word >> bit) & 1;
147             }
148              
149             /* ================================================================
150             * Allocation (lock-free bitmap scan + CAS)
151             * ================================================================ */
152              
153 928           static inline int64_t pool_try_alloc(PoolHandle *h) {
154 928           uint32_t nwords = h->bitmap_words;
155 928           uint64_t cap = h->hdr->capacity;
156 928           uint32_t start = h->scan_hint;
157 928           uint32_t mypid = (uint32_t)getpid();
158              
159 956 100         for (uint32_t i = 0; i < nwords; i++) {
160 940           uint32_t widx = (start + i) % nwords;
161 940           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
162              
163 940 100         while (word != ~(uint64_t)0) {
164 929           int bit = __builtin_ctzll(~word);
165 929           uint64_t slot = (uint64_t)widx * 64 + bit;
166 929 100         if (slot >= cap) break;
167              
168 912           uint64_t new_word = word | ((uint64_t)1 << bit);
169 912 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
170             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
171 912           __atomic_store_n(&h->owners[slot], mypid, __ATOMIC_RELAXED);
172 912           memset(pool_slot_ptr(h, slot), 0, h->hdr->elem_size);
173 912           __atomic_add_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
174 912           __atomic_add_fetch(&h->hdr->stat_allocs, 1, __ATOMIC_RELAXED);
175             /* Advance hint past full word to reduce next scan */
176 11 100         h->scan_hint = (new_word == ~(uint64_t)0 && nwords > 1)
177 923 100         ? (widx + 1) % nwords : widx;
178 912           return (int64_t)slot;
179             }
180             /* CAS failed — word now holds current value, retry */
181             }
182             }
183 16           return -1;
184             }
185              
186             /* Blocking alloc. timeout<0 = infinite, 0 = non-blocking, >0 = seconds. */
187 907           static inline int64_t pool_alloc(PoolHandle *h, double timeout) {
188 907           int64_t slot = pool_try_alloc(h);
189 907 100         if (slot >= 0) return slot;
190 3 100         if (timeout == 0) return -1;
191              
192 2           PoolHeader *hdr = h->hdr;
193             struct timespec deadline, remaining;
194 2           int has_deadline = (timeout > 0);
195 2 50         if (has_deadline) pool_make_deadline(timeout, &deadline);
196              
197 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
198              
199 0           for (;;) {
200 2           uint32_t cur_used = __atomic_load_n(&hdr->used, __ATOMIC_RELAXED);
201 2 50         if (cur_used < (uint32_t)hdr->capacity) {
202 0           slot = pool_try_alloc(h);
203 0 0         if (slot >= 0) return slot;
204             }
205              
206 2           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
207              
208 2           cur_used = __atomic_load_n(&hdr->used, __ATOMIC_ACQUIRE);
209 2 50         if (cur_used >= (uint32_t)hdr->capacity) {
210 2           struct timespec *pts = NULL;
211 2 50         if (has_deadline) {
212 2 50         if (!pool_remaining_time(&deadline, &remaining)) {
213 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
214 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
215 0           return -1;
216             }
217 2           pts = &remaining;
218             }
219 2           syscall(SYS_futex, &hdr->used, FUTEX_WAIT, cur_used, pts, NULL, 0);
220             }
221              
222 2           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
223              
224 2           slot = pool_try_alloc(h);
225 2 100         if (slot >= 0) return slot;
226              
227 1 50         if (has_deadline) {
228 1 50         if (!pool_remaining_time(&deadline, &remaining)) {
229 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
230 1           return -1;
231             }
232             }
233             }
234             }
235              
236             /* ================================================================
237             * Free
238             * ================================================================ */
239              
240 691           static inline int pool_free_slot(PoolHandle *h, uint64_t slot) {
241 691           PoolHeader *hdr = h->hdr;
242 691 50         if (slot >= hdr->capacity) return 0;
243              
244 691           uint32_t widx = (uint32_t)(slot / 64);
245 691           int bit = (int)(slot % 64);
246 691           uint64_t mask = (uint64_t)1 << bit;
247              
248 0           for (;;) {
249 691           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
250 1380 100         if (!(word & mask)) return 0;
251              
252 689           uint64_t new_word = word & ~mask;
253 689 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
254             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
255 689           __atomic_store_n(&h->owners[slot], 0, __ATOMIC_RELAXED);
256 689           __atomic_sub_fetch(&hdr->used, 1, __ATOMIC_RELEASE);
257 689           __atomic_add_fetch(&hdr->stat_frees, 1, __ATOMIC_RELAXED);
258 689 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
259 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
260 689           return 1;
261             }
262             }
263             }
264              
265             /* ================================================================
266             * Batch free — single used decrement + single futex wake
267             * ================================================================ */
268              
269 2           static inline uint32_t pool_free_n(PoolHandle *h, uint64_t *slots, uint32_t count) {
270 2           uint32_t freed = 0;
271 2           PoolHeader *hdr = h->hdr;
272              
273 6 100         for (uint32_t i = 0; i < count; i++) {
274 4           uint64_t slot = slots[i];
275 4 50         if (slot >= hdr->capacity) continue;
276              
277 4           uint32_t widx = (uint32_t)(slot / 64);
278 4           int bit = (int)(slot % 64);
279 4           uint64_t mask = (uint64_t)1 << bit;
280              
281 0           for (;;) {
282 4           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
283 8 50         if (!(word & mask)) break;
284 4           uint64_t new_word = word & ~mask;
285 4 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
286             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
287 4           __atomic_store_n(&h->owners[slot], 0, __ATOMIC_RELAXED);
288 4           freed++;
289 4           break;
290             }
291             }
292             }
293              
294 2 50         if (freed > 0) {
295 2           __atomic_sub_fetch(&hdr->used, freed, __ATOMIC_RELEASE);
296 2           __atomic_add_fetch(&hdr->stat_frees, freed, __ATOMIC_RELAXED);
297 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0) {
298 0 0         int wake = freed < (uint32_t)INT_MAX ? (int)freed : INT_MAX;
299 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, wake, NULL, NULL, 0);
300             }
301             }
302 2           return freed;
303             }
304              
305             /* ================================================================
306             * Batch alloc — all-or-nothing, shared deadline
307             * ================================================================ */
308              
309 2           static inline int pool_alloc_n(PoolHandle *h, uint64_t *out, uint32_t count,
310             double timeout) {
311 2 50         if (count == 0) return 1;
312              
313 2 100         if (timeout == 0) {
314 2 50         for (uint32_t i = 0; i < count; i++) {
315 2           int64_t slot = pool_try_alloc(h);
316 2 100         if (slot < 0) {
317 1 50         if (i > 0) pool_free_n(h, out, i);
318 1           return 0;
319             }
320 1           out[i] = (uint64_t)slot;
321             }
322 0           return 1;
323             }
324              
325             struct timespec deadline, remaining;
326 1           int has_deadline = (timeout > 0);
327 1 50         if (has_deadline) pool_make_deadline(timeout, &deadline);
328              
329 4 100         for (uint32_t i = 0; i < count; i++) {
330 3           double t = timeout;
331 3 50         if (has_deadline) {
332 0 0         if (!pool_remaining_time(&deadline, &remaining)) {
333 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
334 0 0         if (i > 0) pool_free_n(h, out, i);
335 0           return 0;
336             }
337 0           t = (double)remaining.tv_sec + (double)remaining.tv_nsec / 1e9;
338             }
339 3           int64_t slot = pool_alloc(h, t);
340 3 50         if (slot < 0) {
341 0 0         if (i > 0) pool_free_n(h, out, i);
342 0           return 0;
343             }
344 3           out[i] = (uint64_t)slot;
345             }
346 1           return 1;
347             }
348              
349             /* ================================================================
350             * Stale recovery — CAS owner to narrow race window
351             * ================================================================ */
352              
353 2           static inline uint32_t pool_recover_stale(PoolHandle *h) {
354 2           uint32_t recovered = 0;
355 2           uint64_t cap = h->hdr->capacity;
356              
357 202 100         for (uint64_t slot = 0; slot < cap; slot++) {
358 201 100         if (!pool_is_allocated(h, slot)) continue;
359 22           uint32_t owner = __atomic_load_n(&h->owners[slot], __ATOMIC_ACQUIRE);
360 22 50         if (owner == 0 || pool_pid_alive(owner)) continue;
    100          
361              
362             /* CAS owner from dead PID to 0 — if it fails, slot was
363             * re-allocated or already recovered by another process */
364 21 50         if (!__atomic_compare_exchange_n(&h->owners[slot], &owner, 0,
365             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
366 0           continue;
367              
368             /* We now own the right to free this slot's bitmap bit */
369 21           uint32_t widx = (uint32_t)(slot / 64);
370 21           int bit = (int)(slot % 64);
371 21           uint64_t mask = (uint64_t)1 << bit;
372              
373 0           for (;;) {
374 21           uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
375 42 50         if (!(word & mask)) break;
376 21           uint64_t new_word = word & ~mask;
377 21 50         if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
378             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
379 21           __atomic_sub_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
380 21           __atomic_add_fetch(&h->hdr->stat_frees, 1, __ATOMIC_RELAXED);
381 21 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
382 0           syscall(SYS_futex, &h->hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
383 21           recovered++;
384 21           break;
385             }
386             }
387             }
388              
389 2 50         if (recovered > 0)
390 2           __atomic_add_fetch(&h->hdr->stat_recoveries, recovered, __ATOMIC_RELAXED);
391              
392 2           return recovered;
393             }
394              
395             /* ================================================================
396             * Layout calculation
397             * ================================================================ */
398              
399 42           static inline void pool_calc_layout(uint64_t capacity, uint32_t elem_size,
400             uint64_t *bitmap_off, uint64_t *owners_off,
401             uint64_t *data_off, uint64_t *total_size,
402             uint32_t *bitmap_words_out) {
403 42           uint32_t bwords = (uint32_t)((capacity + 63) / 64);
404 42           uint64_t bitmap_sz = (uint64_t)bwords * 8;
405 42           uint64_t owners_sz = POOL_ALIGN8(capacity * 4);
406 42           uint64_t data_sz = (uint64_t)capacity * elem_size;
407              
408 42           *bitmap_off = sizeof(PoolHeader);
409 42           *owners_off = *bitmap_off + bitmap_sz;
410 42           *data_off = *owners_off + owners_sz;
411 42           *total_size = *data_off + data_sz;
412 42           *bitmap_words_out = bwords;
413 42           }
414              
415             /* ================================================================
416             * Header initialization (shared by pool_create and pool_create_memfd)
417             * ================================================================ */
418              
419 40           static inline void pool_init_header(void *base, uint64_t total,
420             uint32_t elem_size, uint32_t variant_id,
421             uint64_t capacity, uint64_t bm_off,
422             uint64_t own_off, uint64_t dat_off) {
423 40           PoolHeader *hdr = (PoolHeader *)base;
424 40           memset(base, 0, (size_t)total);
425 40           hdr->magic = POOL_MAGIC;
426 40           hdr->version = POOL_VERSION;
427 40           hdr->elem_size = elem_size;
428 40           hdr->variant_id = variant_id;
429 40           hdr->capacity = capacity;
430 40           hdr->total_size = total;
431 40           hdr->data_off = dat_off;
432 40           hdr->bitmap_off = bm_off;
433 40           hdr->owners_off = own_off;
434 40           __atomic_thread_fence(__ATOMIC_SEQ_CST);
435 40           }
436              
437             /* ================================================================
438             * Create / Open / Close
439             * ================================================================ */
440              
441             #define POOL_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, POOL_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
442              
443             /* Max capacity to prevent bitmap_words (uint32_t) truncation */
444             #define POOL_MAX_CAPACITY ((uint64_t)UINT32_MAX * 64)
445              
446 47           static inline PoolHandle *pool_setup_handle(void *base, size_t map_size,
447             const char *path, int backing_fd) {
448 47           PoolHeader *hdr = (PoolHeader *)base;
449 47           PoolHandle *h = (PoolHandle *)calloc(1, sizeof(PoolHandle));
450 47 50         if (!h) { munmap(base, map_size); return NULL; }
451              
452 47           h->hdr = hdr;
453 47           h->bitmap = (uint64_t *)((uint8_t *)base + hdr->bitmap_off);
454 47           h->owners = (uint32_t *)((uint8_t *)base + hdr->owners_off);
455 47           h->data = (uint8_t *)base + hdr->data_off;
456 47           h->mmap_size = map_size;
457 47           h->bitmap_words = (uint32_t)((hdr->capacity + 63) / 64);
458 47 100         h->path = path ? strdup(path) : NULL;
459 47           h->notify_fd = -1;
460 47           h->backing_fd = backing_fd;
461 47           h->scan_hint = (uint32_t)getpid() % h->bitmap_words;
462              
463 47           return h;
464             }
465              
466 36           static PoolHandle *pool_create(const char *path, uint64_t capacity,
467             uint32_t elem_size, uint32_t variant_id,
468             char *errbuf) {
469 36 50         if (errbuf) errbuf[0] = '\0';
470              
471 36 50         if (capacity == 0) { POOL_ERR("capacity must be > 0"); return NULL; }
    0          
472 36 50         if (elem_size == 0) { POOL_ERR("elem_size must be > 0"); return NULL; }
    0          
473 36 50         if (capacity > POOL_MAX_CAPACITY) { POOL_ERR("capacity too large"); return NULL; }
    0          
474 36 50         if (capacity > (UINT64_MAX - sizeof(PoolHeader)) / elem_size) {
475 0 0         POOL_ERR("capacity * elem_size overflow"); return NULL;
476             }
477              
478             uint64_t bm_off, own_off, dat_off, total;
479             uint32_t bwords;
480 36           pool_calc_layout(capacity, elem_size, &bm_off, &own_off, &dat_off, &total, &bwords);
481              
482 36           int fd = -1;
483 36           int anonymous = (path == NULL);
484             size_t map_size;
485             void *base;
486              
487 36 100         if (anonymous) {
488 25           map_size = (size_t)total;
489 25           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
490             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
491 25 50         if (base == MAP_FAILED) {
492 0 0         POOL_ERR("mmap(anonymous): %s", strerror(errno));
493 0           return NULL;
494             }
495             } else {
496 11           fd = open(path, O_RDWR | O_CREAT, 0666);
497 13 50         if (fd < 0) { POOL_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
498              
499 11 50         if (flock(fd, LOCK_EX) < 0) {
500 0 0         POOL_ERR("flock(%s): %s", path, strerror(errno));
501 0           close(fd); return NULL;
502             }
503              
504             struct stat st;
505 11 50         if (fstat(fd, &st) < 0) {
506 0 0         POOL_ERR("fstat(%s): %s", path, strerror(errno));
507 0           flock(fd, LOCK_UN); close(fd); return NULL;
508             }
509              
510 11           int is_new = (st.st_size == 0);
511              
512 11 100         if (!is_new && (uint64_t)st.st_size < sizeof(PoolHeader)) {
    50          
513 0 0         POOL_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
514 0           flock(fd, LOCK_UN); close(fd); return NULL;
515             }
516              
517 11 100         if (is_new) {
518 9 50         if (ftruncate(fd, (off_t)total) < 0) {
519 0 0         POOL_ERR("ftruncate(%s): %s", path, strerror(errno));
520 0           flock(fd, LOCK_UN); close(fd); return NULL;
521             }
522             }
523              
524 11 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
525 11           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
526 11 50         if (base == MAP_FAILED) {
527 0 0         POOL_ERR("mmap(%s): %s", path, strerror(errno));
528 0           flock(fd, LOCK_UN); close(fd); return NULL;
529             }
530              
531 11 100         if (!is_new) {
532 2           PoolHeader *hdr = (PoolHeader *)base;
533 6           int valid = (hdr->magic == POOL_MAGIC &&
534 2 50         hdr->version == POOL_VERSION &&
535 2 50         hdr->variant_id == variant_id &&
536 6 50         hdr->capacity > 0 &&
    50          
537 2 50         hdr->total_size == (uint64_t)st.st_size);
538 2 50         if (!valid) {
539 0 0         POOL_ERR("%s: invalid or incompatible pool file", path);
540 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
541             }
542 2           flock(fd, LOCK_UN);
543 2           close(fd);
544 2           return pool_setup_handle(base, map_size, path, -1);
545             }
546             }
547              
548             /* Initialize header — flock still held for file-backed new files */
549 34           pool_init_header(base, total, elem_size, variant_id, capacity,
550             bm_off, own_off, dat_off);
551              
552 34 100         if (fd >= 0) {
553 9           flock(fd, LOCK_UN);
554 9           close(fd);
555             }
556              
557 34           return pool_setup_handle(base, map_size, path, -1);
558             }
559              
560 6           static PoolHandle *pool_create_memfd(const char *name, uint64_t capacity,
561             uint32_t elem_size, uint32_t variant_id,
562             char *errbuf) {
563 6 50         if (errbuf) errbuf[0] = '\0';
564              
565 6 50         if (capacity == 0) { POOL_ERR("capacity must be > 0"); return NULL; }
    0          
566 6 50         if (elem_size == 0) { POOL_ERR("elem_size must be > 0"); return NULL; }
    0          
567 6 50         if (capacity > POOL_MAX_CAPACITY) { POOL_ERR("capacity too large"); return NULL; }
    0          
568 6 50         if (capacity > (UINT64_MAX - sizeof(PoolHeader)) / elem_size) {
569 0 0         POOL_ERR("capacity * elem_size overflow"); return NULL;
570             }
571              
572             uint64_t bm_off, own_off, dat_off, total;
573             uint32_t bwords;
574 6           pool_calc_layout(capacity, elem_size, &bm_off, &own_off, &dat_off, &total, &bwords);
575              
576 6 50         int fd = memfd_create(name ? name : "pool", MFD_CLOEXEC);
577 6 50         if (fd < 0) { POOL_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
578              
579 6 50         if (ftruncate(fd, (off_t)total) < 0) {
580 0 0         POOL_ERR("ftruncate(memfd): %s", strerror(errno));
581 0           close(fd); return NULL;
582             }
583              
584 6           void *base = mmap(NULL, (size_t)total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
585 6 50         if (base == MAP_FAILED) {
586 0 0         POOL_ERR("mmap(memfd): %s", strerror(errno));
587 0           close(fd); return NULL;
588             }
589              
590 6           pool_init_header(base, total, elem_size, variant_id, capacity,
591             bm_off, own_off, dat_off);
592              
593 6           return pool_setup_handle(base, (size_t)total, NULL, fd);
594             }
595              
596 5           static PoolHandle *pool_open_fd(int fd, uint32_t variant_id, char *errbuf) {
597 5 50         if (errbuf) errbuf[0] = '\0';
598              
599             struct stat st;
600 5 50         if (fstat(fd, &st) < 0) {
601 0 0         POOL_ERR("fstat(fd=%d): %s", fd, strerror(errno));
602 0           return NULL;
603             }
604              
605 5 50         if ((uint64_t)st.st_size < sizeof(PoolHeader)) {
606 0 0         POOL_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
607 0           return NULL;
608             }
609              
610 5           size_t map_size = (size_t)st.st_size;
611 5           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
612 5 50         if (base == MAP_FAILED) {
613 0 0         POOL_ERR("mmap(fd=%d): %s", fd, strerror(errno));
614 0           return NULL;
615             }
616              
617 5           PoolHeader *hdr = (PoolHeader *)base;
618 15           int valid = (hdr->magic == POOL_MAGIC &&
619 5 50         hdr->version == POOL_VERSION &&
620 5 50         hdr->variant_id == variant_id &&
621 15 50         hdr->capacity > 0 &&
    50          
622 5 50         hdr->total_size == (uint64_t)st.st_size);
623 5 50         if (!valid) {
624 0 0         POOL_ERR("fd %d: invalid or incompatible pool", fd);
625 0           munmap(base, map_size);
626 0           return NULL;
627             }
628              
629 5           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
630 5 50         if (myfd < 0) {
631 0 0         POOL_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
632 0           munmap(base, map_size);
633 0           return NULL;
634             }
635              
636 5           return pool_setup_handle(base, map_size, NULL, myfd);
637             }
638              
639 47           static void pool_destroy(PoolHandle *h) {
640 47 50         if (!h) return;
641 47 100         if (h->notify_fd >= 0) close(h->notify_fd);
642 47 100         if (h->backing_fd >= 0) close(h->backing_fd);
643 47 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
644 47           free(h->path);
645 47           free(h);
646             }
647              
648             /* ================================================================
649             * Eventfd integration
650             * ================================================================ */
651              
652 2           static int pool_create_eventfd(PoolHandle *h) {
653 2 100         if (h->notify_fd >= 0) close(h->notify_fd);
654 2           int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
655 2 50         if (efd < 0) return -1;
656 2           h->notify_fd = efd;
657 2           return efd;
658             }
659              
660 3           static int pool_notify(PoolHandle *h) {
661 3 50         if (h->notify_fd < 0) return 0;
662 3           uint64_t val = 1;
663 3           return write(h->notify_fd, &val, sizeof(val)) == sizeof(val);
664             }
665              
666 3           static int64_t pool_eventfd_consume(PoolHandle *h) {
667 3 50         if (h->notify_fd < 0) return -1;
668 3           uint64_t val = 0;
669 3 100         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
670 2           return (int64_t)val;
671             }
672              
673 1           static void pool_msync(PoolHandle *h) {
674 1           msync(h->hdr, h->mmap_size, MS_SYNC);
675 1           }
676              
677             /* ================================================================
678             * Typed accessors — integers (atomic)
679             * ================================================================ */
680              
681 62           static inline int64_t pool_get_i64(PoolHandle *h, uint64_t slot) {
682 62           return __atomic_load_n((int64_t *)pool_slot_ptr(h, slot), __ATOMIC_RELAXED);
683             }
684              
685 613           static inline void pool_set_i64(PoolHandle *h, uint64_t slot, int64_t val) {
686 613           __atomic_store_n((int64_t *)pool_slot_ptr(h, slot), val, __ATOMIC_RELAXED);
687 613           }
688              
689 3           static inline int pool_cas_i64(PoolHandle *h, uint64_t slot,
690             int64_t expected, int64_t desired) {
691 3           return __atomic_compare_exchange_n((int64_t *)pool_slot_ptr(h, slot),
692             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
693             }
694              
695 2           static inline int64_t pool_cmpxchg_i64(PoolHandle *h, uint64_t slot,
696             int64_t expected, int64_t desired) {
697 2           __atomic_compare_exchange_n((int64_t *)pool_slot_ptr(h, slot),
698             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
699 2           return expected;
700             }
701              
702 1           static inline int64_t pool_xchg_i64(PoolHandle *h, uint64_t slot, int64_t val) {
703 1           return __atomic_exchange_n((int64_t *)pool_slot_ptr(h, slot), val, __ATOMIC_ACQ_REL);
704             }
705              
706 7           static inline int64_t pool_add_i64(PoolHandle *h, uint64_t slot, int64_t delta) {
707 7           return __atomic_add_fetch((int64_t *)pool_slot_ptr(h, slot), delta, __ATOMIC_ACQ_REL);
708             }
709              
710 5           static inline int32_t pool_get_i32(PoolHandle *h, uint64_t slot) {
711 5           return __atomic_load_n((int32_t *)pool_slot_ptr(h, slot), __ATOMIC_RELAXED);
712             }
713              
714 6           static inline void pool_set_i32(PoolHandle *h, uint64_t slot, int32_t val) {
715 6           __atomic_store_n((int32_t *)pool_slot_ptr(h, slot), val, __ATOMIC_RELAXED);
716 6           }
717              
718 1           static inline int pool_cas_i32(PoolHandle *h, uint64_t slot,
719             int32_t expected, int32_t desired) {
720 1           return __atomic_compare_exchange_n((int32_t *)pool_slot_ptr(h, slot),
721             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
722             }
723              
724 2           static inline int32_t pool_cmpxchg_i32(PoolHandle *h, uint64_t slot,
725             int32_t expected, int32_t desired) {
726 2           __atomic_compare_exchange_n((int32_t *)pool_slot_ptr(h, slot),
727             &expected, desired, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
728 2           return expected;
729             }
730              
731 1           static inline int32_t pool_xchg_i32(PoolHandle *h, uint64_t slot, int32_t val) {
732 1           return __atomic_exchange_n((int32_t *)pool_slot_ptr(h, slot), val, __ATOMIC_ACQ_REL);
733             }
734              
735 2           static inline int32_t pool_add_i32(PoolHandle *h, uint64_t slot, int32_t delta) {
736 2           return __atomic_add_fetch((int32_t *)pool_slot_ptr(h, slot), delta, __ATOMIC_ACQ_REL);
737             }
738              
739             /* ================================================================
740             * Typed accessors — float (non-atomic)
741             * ================================================================ */
742              
743 7           static inline double pool_get_f64(PoolHandle *h, uint64_t slot) {
744             double v;
745 7           memcpy(&v, pool_slot_ptr(h, slot), sizeof(double));
746 7           return v;
747             }
748              
749 7           static inline void pool_set_f64(PoolHandle *h, uint64_t slot, double val) {
750 7           memcpy(pool_slot_ptr(h, slot), &val, sizeof(double));
751 7           }
752              
753             /* ================================================================
754             * Typed accessors — string (4-byte length prefix + data)
755             * ================================================================ */
756              
757 11           static inline uint32_t pool_get_str_len(PoolHandle *h, uint64_t slot) {
758             uint32_t len;
759 11           memcpy(&len, pool_slot_ptr(h, slot), sizeof(uint32_t));
760 11           uint32_t max_len = h->hdr->elem_size - sizeof(uint32_t);
761 11 50         if (len > max_len) len = max_len;
762 11           return len;
763             }
764              
765 11           static inline const char *pool_get_str_ptr(PoolHandle *h, uint64_t slot) {
766 11           return (const char *)(pool_slot_ptr(h, slot) + sizeof(uint32_t));
767             }
768              
769 8           static inline void pool_set_str(PoolHandle *h, uint64_t slot,
770             const char *str, uint32_t len) {
771 8           uint32_t max_len = h->hdr->elem_size - sizeof(uint32_t);
772 8 100         if (len > max_len) len = max_len;
773 8           memcpy(pool_slot_ptr(h, slot), &len, sizeof(uint32_t));
774 8           memcpy(pool_slot_ptr(h, slot) + sizeof(uint32_t), str, len);
775 8           }
776              
777             /* ================================================================
778             * Reset — free all slots (NOT concurrency-safe, caller must
779             * ensure no other process is accessing the pool)
780             * ================================================================ */
781              
782 10           static inline void pool_reset(PoolHandle *h) {
783 10           PoolHeader *hdr = h->hdr;
784 10           memset(h->bitmap, 0, (size_t)h->bitmap_words * 8);
785 10           memset(h->owners, 0, (size_t)hdr->capacity * 4);
786 10           __atomic_store_n(&hdr->used, 0, __ATOMIC_RELEASE);
787 10 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
788 0           syscall(SYS_futex, &hdr->used, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
789 10           }
790              
791             #endif /* POOL_H */