File Coverage

heap.h
Criterion Covered Total %
statement 216 252 85.7
branch 98 222 44.1
condition n/a
subroutine n/a
pod n/a
total 314 474 66.2


line stmt bran cond sub pod time code
1             /*
2             * heap.h -- Shared-memory binary min-heap (priority queue) for Linux
3             *
4             * Mutex-protected push/pop with sift-up/sift-down.
5             * Futex blocking when empty (pop_wait).
6             * Elements are (int64_t priority, int64_t value) pairs.
7             * Lowest priority pops first (min-heap).
8             */
9              
10             #ifndef HEAP_H
11             #define HEAP_H
12              
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29              
/* Layout identification stored at the start of every heap mapping. */
#define HEAP_MAGIC   0x48455031U /* "HEP1" */
#define HEAP_VERSION 1

/* Size of the caller-supplied error buffer that HEAP_ERR writes into. */
#define HEAP_ERR_BUFLEN 256

/* Busy-wait attempts in heap_mutex_lock before falling back to a futex wait. */
#define HEAP_SPIN_LIMIT 32

/* Lock-word encoding: top bit = "held", low 31 bits = owner's PID. */
#define HEAP_MUTEX_BIT 0x80000000U
#define HEAP_MUTEX_PID 0x7FFFFFFFU

/* The element count and sift_up/sift_down indices are uint32_t. Capping capacity
 * at UINT32_MAX keeps size++ in heap_push from wrapping and makes the (uint32_t)
 * capacity cast in heap_push's full-check lossless. Child-index arithmetic
 * (2*i+1, 2*i+2) can still exceed 32 bits once i >= 2^31, so it needs a wider type. */
#define HEAP_MAX_CAPACITY ((uint64_t)UINT32_MAX)
42              
/* One heap element. Entries are ordered by `priority`, smallest first;
 * `value` is carried along unchanged. */
typedef struct HeapEntry {
    int64_t priority; /* ordering key */
    int64_t value;    /* payload */
} HeapEntry;
47              
48             /* ================================================================
49             * Header (128 bytes)
50             * ================================================================ */
51              
/*
 * Header at offset 0 of the shared mapping; exactly 128 bytes (asserted below).
 * Bytes 0-63 describe the layout (set by heap_init_header, checked by
 * heap_validate_header); bytes 64-127 hold runtime state and statistics.
 */
typedef struct HeapHeader {
    /* ---- layout: offsets 0..63 ---- */
    uint32_t magic;           /*   0: HEAP_MAGIC */
    uint32_t version;         /*   4: HEAP_VERSION */
    uint64_t capacity;        /*   8: maximum number of entries */
    uint64_t total_size;      /*  16: bytes in the mapping (header + entries) */
    uint64_t data_off;        /*  24: byte offset of the HeapEntry array */
    uint8_t  _pad0[32];       /*  32..63 */

    /* ---- runtime state: offsets 64..127 ---- */
    uint32_t size;            /*  64: element count; futex word for pop waiters */
    uint32_t mutex;           /*  68: 0 = free, HEAP_MUTEX_BIT | pid = held */
    uint32_t mutex_waiters;   /*  72: callers in heap_mutex_lock's futex path */
    uint32_t waiters_pop;     /*  76: callers in heap_pop_wait's futex path */
    uint64_t stat_pushes;     /*  80 */
    uint64_t stat_pops;       /*  88 */
    uint64_t stat_waits;      /*  96 */
    uint64_t stat_timeouts;   /* 104 */
    uint64_t stat_recoveries; /* 112: locks reclaimed from dead owners */
    uint8_t  _pad1[8];        /* 120..127 */
} HeapHeader;

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(HeapHeader) == 128, "HeapHeader must be 128 bytes");
#endif
75              
76             typedef struct {
77             HeapHeader *hdr;
78             HeapEntry *data;
79             size_t mmap_size;
80             char *path;
81             int notify_fd;
82             int backing_fd;
83             } HeapHandle;
84              
85             /* ================================================================
86             * Mutex (PID-based, stale-recoverable)
87             * ================================================================ */
88              
/* Futex wait slice in heap_mutex_lock; each expiry triggers a stale-owner check. */
static const struct timespec heap_lock_timeout = { .tv_sec = 2, .tv_nsec = 0 };
90              
/*
 * Liveness probe for a lock owner.
 * PID 0 is reported alive. Otherwise signal 0 is sent. Only ESRCH counts as
 * "dead"; any other failure (e.g. EPERM: the process exists but is not ours)
 * is reported as alive.
 * Returns 1 if alive (or unknown), 0 if the process does not exist.
 */
static inline int heap_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}
95              
96 99           static inline void heap_mutex_lock(HeapHeader *hdr) {
97 99           uint32_t mypid = HEAP_MUTEX_BIT | ((uint32_t)getpid() & HEAP_MUTEX_PID);
98 99           for (int spin = 0; ; spin++) {
99 99           uint32_t expected = 0;
100 99 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
101             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
102 99           return;
103 0 0         if (spin < HEAP_SPIN_LIMIT) {
104             #if defined(__x86_64__) || defined(__i386__)
105 0           __asm__ volatile("pause" ::: "memory");
106             #elif defined(__aarch64__)
107             __asm__ volatile("yield" ::: "memory");
108             #endif
109 0           continue;
110             }
111 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
112 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
113 0 0         if (cur != 0) {
114 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
115             &heap_lock_timeout, NULL, 0);
116 0 0         if (rc == -1 && errno == ETIMEDOUT && cur >= HEAP_MUTEX_BIT) {
    0          
    0          
117 0           uint32_t pid = cur & HEAP_MUTEX_PID;
118 0 0         if (!heap_pid_alive(pid)) {
119 0 0         if (__atomic_compare_exchange_n(&hdr->mutex, &cur, 0,
120             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
121 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
122             /* Wake one waiter so recovery latency is not bounded by the 2s timeout. */
123 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
124 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
125             }
126             }
127             }
128             }
129 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
130 0           spin = 0;
131             }
132             }
133              
134 99           static inline void heap_mutex_unlock(HeapHeader *hdr) {
135 99           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
136 99 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
137 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
138 99           }
139              
140             /* ================================================================
141             * Heap operations (must hold mutex)
142             * ================================================================ */
143              
144 33           static inline void heap_swap(HeapEntry *a, HeapEntry *b) {
145 33           HeapEntry t = *a; *a = *b; *b = t;
146 33           }
147              
148 53           static inline void heap_sift_up(HeapEntry *data, uint32_t idx) {
149 67 100         while (idx > 0) {
150 43           uint32_t parent = (idx - 1) / 2;
151 43 100         if (data[parent].priority <= data[idx].priority) break;
152 14           heap_swap(&data[parent], &data[idx]);
153 14           idx = parent;
154             }
155 53           }
156              
157 23           static inline void heap_sift_down(HeapEntry *data, uint32_t size, uint32_t idx) {
158 19           while (1) {
159 42           uint32_t smallest = idx;
160 42           uint32_t left = 2 * idx + 1;
161 42           uint32_t right = 2 * idx + 2;
162 42 100         if (left < size && data[left].priority < data[smallest].priority)
    100          
163 19           smallest = left;
164 42 100         if (right < size && data[right].priority < data[smallest].priority)
    100          
165 5           smallest = right;
166 42 100         if (smallest == idx) break;
167 19           heap_swap(&data[idx], &data[smallest]);
168 19           idx = smallest;
169             }
170 23           }
171              
172             /* ================================================================
173             * Public API
174             * ================================================================ */
175              
/*
 * Fill *dl with CLOCK_MONOTONIC "now + t seconds" as a normalized timespec.
 *
 * t is clamped to [0, 1e9] seconds (~31 years). Converting an out-of-range
 * double (huge or infinite timeout) to time_t is undefined behavior. A
 * negative t would leave tv_nsec negative, i.e. not normalized; negative and
 * NaN inputs yield "now". Within that range the result is unchanged.
 * After the add, tv_nsec is < 2e9, so a single carry normalizes it.
 */
static inline void heap_make_deadline(double t, struct timespec *dl) {
    const double max_wait = 1e9;
    if (!(t > 0))
        t = 0;
    else if (t > max_wait)
        t = max_wait;
    clock_gettime(CLOCK_MONOTONIC, dl);
    time_t whole = (time_t)t;
    dl->tv_sec += whole;
    dl->tv_nsec += (long)((t - (double)whole) * 1e9);
    if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
}
182              
/*
 * Compute *rem = *dl - now(CLOCK_MONOTONIC), borrowing a second when the
 * nanosecond difference is negative.
 * Returns nonzero while the deadline has not passed (rem->tv_sec >= 0).
 */
static inline int heap_remaining(const struct timespec *dl, struct timespec *rem) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    time_t secs = dl->tv_sec - now.tv_sec;
    long nsecs = dl->tv_nsec - now.tv_nsec;
    if (nsecs < 0) {
        nsecs += 1000000000L;
        secs -= 1;
    }
    rem->tv_sec = secs;
    rem->tv_nsec = nsecs;
    return secs >= 0;
}
191              
192 57           static inline int heap_push(HeapHandle *h, int64_t priority, int64_t value) {
193 57           HeapHeader *hdr = h->hdr;
194 57           heap_mutex_lock(hdr);
195 57 100         if (hdr->size >= (uint32_t)hdr->capacity) {
196 4           heap_mutex_unlock(hdr);
197 4           return 0;
198             }
199 53           uint32_t idx = hdr->size++;
200 53           h->data[idx].priority = priority;
201 53           h->data[idx].value = value;
202 53           heap_sift_up(h->data, idx);
203 53           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
204 53           heap_mutex_unlock(hdr);
205             /* wake pop-waiters */
206 53 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
207 0           syscall(SYS_futex, &hdr->size, FUTEX_WAKE, 1, NULL, NULL, 0);
208 53           return 1;
209             }
210              
211 37           static inline int heap_pop(HeapHandle *h, int64_t *out_priority, int64_t *out_value) {
212 37           HeapHeader *hdr = h->hdr;
213 37           heap_mutex_lock(hdr);
214 37 100         if (hdr->size == 0) {
215 5           heap_mutex_unlock(hdr);
216 5           return 0;
217             }
218 32           *out_priority = h->data[0].priority;
219 32           *out_value = h->data[0].value;
220 32           hdr->size--;
221 32 100         if (hdr->size > 0) {
222 23           h->data[0] = h->data[hdr->size];
223 23           heap_sift_down(h->data, hdr->size, 0);
224             }
225 32           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
226 32           heap_mutex_unlock(hdr);
227 32           return 1;
228             }
229              
230 2           static inline int heap_pop_wait(HeapHandle *h, int64_t *out_p, int64_t *out_v, double timeout) {
231 2 50         if (heap_pop(h, out_p, out_v)) return 1;
232 2 50         if (timeout == 0) return 0;
233              
234 2           HeapHeader *hdr = h->hdr;
235             struct timespec dl, rem;
236 2           int has_dl = (timeout > 0);
237 2 50         if (has_dl) heap_make_deadline(timeout, &dl);
238 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
239              
240 0           for (;;) {
241 2           __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
242 2           uint32_t cur = __atomic_load_n(&hdr->size, __ATOMIC_ACQUIRE);
243 2 50         if (cur == 0) {
244 2           struct timespec *pts = NULL;
245 2 50         if (has_dl) {
246 2 50         if (!heap_remaining(&dl, &rem)) {
247 0           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
248 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
249 0           return 0;
250             }
251 2           pts = &rem;
252             }
253 2           syscall(SYS_futex, &hdr->size, FUTEX_WAIT, 0, pts, NULL, 0);
254             }
255 2           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
256 2 100         if (heap_pop(h, out_p, out_v)) return 1;
257 1 50         if (has_dl && !heap_remaining(&dl, &rem)) {
    50          
258 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
259 1           return 0;
260             }
261             }
262             }
263              
264 2           static inline int heap_peek(HeapHandle *h, int64_t *out_p, int64_t *out_v) {
265 2           HeapHeader *hdr = h->hdr;
266 2           heap_mutex_lock(hdr);
267 2 50         if (hdr->size == 0) { heap_mutex_unlock(hdr); return 0; }
268 2           *out_p = h->data[0].priority;
269 2           *out_v = h->data[0].value;
270 2           heap_mutex_unlock(hdr);
271 2           return 1;
272             }
273              
274 24           static inline uint32_t heap_size(HeapHandle *h) {
275 24           return __atomic_load_n(&h->hdr->size, __ATOMIC_RELAXED);
276             }
277              
278             /* ================================================================
279             * Create / Open / Close
280             * ================================================================ */
281              
/* Format an error message into `errbuf` (a `char *` that must be in scope where
 * the macro is expanded; nothing is written when it is NULL), bounded by
 * HEAP_ERR_BUFLEN. The whole argument list goes through standard C99
 * __VA_ARGS__, so a bare format string works without the GNU `, ##__VA_ARGS__`
 * extension. */
#define HEAP_ERR(...) do { if (errbuf) snprintf(errbuf, HEAP_ERR_BUFLEN, __VA_ARGS__); } while (0)
283              
284 15           static inline void heap_init_header(void *base, uint64_t total, uint64_t capacity) {
285 15           HeapHeader *hdr = (HeapHeader *)base;
286 15           memset(base, 0, (size_t)total);
287 15           hdr->magic = HEAP_MAGIC;
288 15           hdr->version = HEAP_VERSION;
289 15           hdr->capacity = capacity;
290 15           hdr->total_size = total;
291 15           hdr->data_off = sizeof(HeapHeader);
292 15           __atomic_thread_fence(__ATOMIC_SEQ_CST);
293 15           }
294              
295             /* Validate a mapped header (shared by heap_create reopen and heap_open_fd). */
296 2           static inline int heap_validate_header(const HeapHeader *hdr, uint64_t file_size) {
297 2 50         if (hdr->magic != HEAP_MAGIC) return 0;
298 2 50         if (hdr->version != HEAP_VERSION) return 0;
299 2 50         if (hdr->capacity == 0 || hdr->capacity > HEAP_MAX_CAPACITY) return 0;
    50          
300 2 50         if (hdr->total_size != file_size) return 0;
301 2 50         if (hdr->data_off != sizeof(HeapHeader)) return 0;
302 2           uint64_t exp_total = sizeof(HeapHeader) + hdr->capacity * sizeof(HeapEntry);
303 2 50         if (hdr->total_size != exp_total) return 0;
304             /* Runtime-state sanity: size must not exceed capacity (corrupted file). */
305 2 50         if (hdr->size > hdr->capacity) return 0;
306 2           return 1;
307             }
308              
309 17           static inline HeapHandle *heap_setup(void *base, size_t ms, const char *path, int bfd) {
310 17           HeapHeader *hdr = (HeapHeader *)base;
311 17           HeapHandle *h = (HeapHandle *)calloc(1, sizeof(HeapHandle));
312 17 50         if (!h) {
313 0           munmap(base, ms);
314 0 0         if (bfd >= 0) close(bfd);
315 0           return NULL;
316             }
317 17           h->hdr = hdr;
318 17           h->data = (HeapEntry *)((uint8_t *)base + hdr->data_off);
319 17           h->mmap_size = ms;
320 17 100         h->path = path ? strdup(path) : NULL;
321 17           h->notify_fd = -1;
322 17           h->backing_fd = bfd;
323 17           return h;
324             }
325              
326 9           static HeapHandle *heap_create(const char *path, uint64_t capacity, char *errbuf) {
327 9 50         if (errbuf) errbuf[0] = '\0';
328 9 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
329 9 50         if (capacity > HEAP_MAX_CAPACITY) { HEAP_ERR("capacity too large (max %u)", (unsigned)HEAP_MAX_CAPACITY); return NULL; }
    0          
330 9 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
331 0 0         HEAP_ERR("capacity overflow"); return NULL;
332             }
333              
334 9           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
335 9           int anonymous = (path == NULL);
336 9           int fd = -1;
337             size_t map_size;
338             void *base;
339              
340 9 100         if (anonymous) {
341 6           map_size = (size_t)total;
342 6           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
343 6 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
344             } else {
345 3           fd = open(path, O_RDWR|O_CREAT, 0666);
346 4 50         if (fd < 0) { HEAP_ERR("open: %s", strerror(errno)); return NULL; }
    0          
347 3 50         if (flock(fd, LOCK_EX) < 0) { HEAP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
348             struct stat st;
349 3 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
350 3           int is_new = (st.st_size == 0);
351 3 100         if (!is_new && (uint64_t)st.st_size < sizeof(HeapHeader)) {
    50          
352 0 0         HEAP_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
353 0           flock(fd, LOCK_UN); close(fd); return NULL;
354             }
355 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
356 0 0         HEAP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
357             }
358 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
359 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
360 3 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
361 3 100         if (!is_new) {
362 1 50         if (!heap_validate_header((HeapHeader *)base, (uint64_t)st.st_size)) {
363 0 0         HEAP_ERR("invalid heap file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
364             }
365 1           flock(fd, LOCK_UN); close(fd);
366 1           return heap_setup(base, map_size, path, -1);
367             }
368             }
369 8           heap_init_header(base, total, capacity);
370 8 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
371 8           return heap_setup(base, map_size, path, -1);
372             }
373              
374 7           static HeapHandle *heap_create_memfd(const char *name, uint64_t capacity, char *errbuf) {
375 7 50         if (errbuf) errbuf[0] = '\0';
376 7 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
377 7 50         if (capacity > HEAP_MAX_CAPACITY) { HEAP_ERR("capacity too large (max %u)", (unsigned)HEAP_MAX_CAPACITY); return NULL; }
    0          
378 7 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
379 0 0         HEAP_ERR("capacity overflow"); return NULL;
380             }
381 7           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
382 7 50         int fd = memfd_create(name ? name : "heap", MFD_CLOEXEC | MFD_ALLOW_SEALING);
383 7 50         if (fd < 0) { HEAP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
384 7 50         if (ftruncate(fd, (off_t)total) < 0) { HEAP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
385 7           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
386 7           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
387 7 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
388 7           heap_init_header(base, total, capacity);
389 7           return heap_setup(base, (size_t)total, NULL, fd);
390             }
391              
392 1           static HeapHandle *heap_open_fd(int fd, char *errbuf) {
393 1 50         if (errbuf) errbuf[0] = '\0';
394             struct stat st;
395 1 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
396 1 50         if ((uint64_t)st.st_size < sizeof(HeapHeader)) { HEAP_ERR("too small"); return NULL; }
    0          
397 1           size_t ms = (size_t)st.st_size;
398 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
399 1 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
400 1 50         if (!heap_validate_header((HeapHeader *)base, (uint64_t)st.st_size)) {
401 0 0         HEAP_ERR("invalid heap"); munmap(base, ms); return NULL;
402             }
403 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
404 1 50         if (myfd < 0) { HEAP_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
405 1           return heap_setup(base, ms, NULL, myfd);
406             }
407              
408 17           static void heap_destroy(HeapHandle *h) {
409 17 50         if (!h) return;
410 17 100         if (h->notify_fd >= 0) close(h->notify_fd);
411 17 100         if (h->backing_fd >= 0) close(h->backing_fd);
412 17 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
413 17           free(h->path);
414 17           free(h);
415             }
416              
417             /* Concurrency-safe: holds mutex (heap is already mutex-based) */
418 3           static void heap_clear(HeapHandle *h) {
419 3           heap_mutex_lock(h->hdr);
420 3           h->hdr->size = 0;
421 3           heap_mutex_unlock(h->hdr);
422 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
423 0           syscall(SYS_futex, &h->hdr->size, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
424 3           }
425              
426 2           static int heap_create_eventfd(HeapHandle *h) {
427 2 50         if (h->notify_fd >= 0) return h->notify_fd;
428 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
429 2 50         if (efd < 0) return -1;
430 2           h->notify_fd = efd; return efd;
431             }
432 2           static int heap_notify(HeapHandle *h) {
433 2 50         if (h->notify_fd < 0) return 0;
434 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
435             }
436 2           static int64_t heap_eventfd_consume(HeapHandle *h) {
437 2 50         if (h->notify_fd < 0) return -1;
438 2           uint64_t v = 0;
439 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
440 2           return (int64_t)v;
441             }
442 1           static int heap_msync(HeapHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
443              
444             #endif /* HEAP_H */