File Coverage

heap.h
Criterion Covered Total %
statement 215 248 86.6
branch 96 212 45.2
condition n/a
subroutine n/a
pod n/a
total 311 460 67.6


line stmt bran cond sub pod time code
1             /*
2             * heap.h -- Shared-memory binary min-heap (priority queue) for Linux
3             *
4             * Mutex-protected push/pop with sift-up/sift-down.
5             * Futex blocking when empty (pop_wait).
6             * Elements are (int64_t priority, int64_t value) pairs.
7             * Lowest priority pops first (min-heap).
8             */
9              
10             #ifndef HEAP_H
11             #define HEAP_H
12              
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
29              
30             #define HEAP_MAGIC 0x48455031U /* "HEP1" */
31             #define HEAP_VERSION 1
32             #define HEAP_ERR_BUFLEN 256
33             #define HEAP_SPIN_LIMIT 32
34              
35             #define HEAP_MUTEX_BIT 0x80000000U
36             #define HEAP_MUTEX_PID 0x7FFFFFFFU
37              
/* One queued element; ordering key is `priority` (smallest pops first). */
typedef struct {
    int64_t priority;  /* min-heap sort key */
    int64_t value;     /* opaque user payload */
} HeapEntry;
42              
43             /* ================================================================
44             * Header (128 bytes)
45             * ================================================================ */
46              
/* Shared header at offset 0 of the mapping. Layout is part of the on-disk/
 * cross-process ABI — the _Static_assert below pins it to 128 bytes. */
typedef struct {
    uint32_t magic;            /*  0: HEAP_MAGIC ("HEP1") */
    uint32_t version;          /*  4: HEAP_VERSION */
    uint64_t capacity;         /*  8: maximum element count */
    uint64_t total_size;       /* 16: sizeof(HeapHeader) + capacity*sizeof(HeapEntry) */
    uint64_t data_off;         /* 24: byte offset of the HeapEntry array */
    uint8_t _pad0[32];         /* 32..63: reserved */

    uint32_t size; /* 64: current element count (futex word for pop) */
    uint32_t mutex; /* 68: 0=free, HEAP_MUTEX_BIT|pid=locked */
    uint32_t mutex_waiters; /* 72: callers blocked in heap_mutex_lock */
    uint32_t waiters_pop; /* 76: callers blocked in heap_pop_wait */
    uint64_t stat_pushes; /* 80 */
    uint64_t stat_pops; /* 88 */
    uint64_t stat_waits; /* 96 */
    uint64_t stat_timeouts; /* 104 */
    uint64_t stat_recoveries; /* 112: stale-mutex recoveries */
    uint8_t _pad1[8]; /* 120-127 */
} HeapHeader;
66              
67             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
68             _Static_assert(sizeof(HeapHeader) == 128, "HeapHeader must be 128 bytes");
69             #endif
70              
/* Per-process view of a mapped heap; release with heap_destroy(). */
typedef struct {
    HeapHeader *hdr;   /* start of the shared mapping */
    HeapEntry *data;   /* (uint8_t *)hdr + hdr->data_off */
    size_t mmap_size;  /* mapping length passed to munmap() */
    char *path;        /* strdup'd backing-file path, or NULL (anonymous/memfd) */
    int notify_fd;     /* optional eventfd, -1 until heap_create_eventfd() */
    int backing_fd;    /* owned backing fd (memfd / dup'd fd), -1 otherwise */
} HeapHandle;
79              
80             /* ================================================================
81             * Mutex (PID-based, stale-recoverable)
82             * ================================================================ */
83              
/* Futex wait bound: lock attempts wake every 2s to re-check for a dead owner. */
static const struct timespec heap_lock_timeout = { 2, 0 };

/* Best-effort liveness probe via kill(pid, 0). Only ESRCH proves the process
 * is gone; pid 0 (unknown owner) and EPERM are conservatively "alive". */
static inline int heap_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    int definitely_dead = (kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    return !definitely_dead;
}
90              
/*
 * Acquire the shared PID-tagged mutex.
 * Fast path: CAS 0 -> (HEAP_MUTEX_BIT | pid) with a bounded spin.
 * Slow path: register as a waiter, FUTEX_WAIT with a 2s timeout; on
 * timeout, probe whether the recorded owner pid is still alive and,
 * if not, CAS the lock back to 0 (stale-owner recovery).
 */
static inline void heap_mutex_lock(HeapHeader *hdr) {
    uint32_t mypid = HEAP_MUTEX_BIT | ((uint32_t)getpid() & HEAP_MUTEX_PID);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* Uncontended acquire: 0 -> our tagged pid. */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (spin < HEAP_SPIN_LIMIT) {
            /* Short bounded spin before sleeping; cpu-relax hint per arch. */
#if defined(__x86_64__) || defined(__i386__)
            __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
            __asm__ volatile("yield" ::: "memory");
#endif
            continue;
        }
        /* Announce ourselves before sleeping so unlock issues a FUTEX_WAKE. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* Sleep only if the word still holds the value we observed. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &heap_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT && cur >= HEAP_MUTEX_BIT) {
                /* Timed out on a locked word: check if the owner died. */
                uint32_t pid = cur & HEAP_MUTEX_PID;
                if (!heap_pid_alive(pid)) {
                    /* CAS guards against the owner having released/changed it. */
                    if (__atomic_compare_exchange_n(&hdr->mutex, &cur, 0,
                                                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
                        /* Wake one waiter so recovery latency is not bounded by the 2s timeout. */
                        if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
                            syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
                    }
                }
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0; /* restart the spin phase after each sleep */
    }
}
128              
129 99           static inline void heap_mutex_unlock(HeapHeader *hdr) {
130 99           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
131 99 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
132 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
133 99           }
134              
135             /* ================================================================
136             * Heap operations (must hold mutex)
137             * ================================================================ */
138              
139 33           static inline void heap_swap(HeapEntry *a, HeapEntry *b) {
140 33           HeapEntry t = *a; *a = *b; *b = t;
141 33           }
142              
143 53           static inline void heap_sift_up(HeapEntry *data, uint32_t idx) {
144 67 100         while (idx > 0) {
145 43           uint32_t parent = (idx - 1) / 2;
146 43 100         if (data[parent].priority <= data[idx].priority) break;
147 14           heap_swap(&data[parent], &data[idx]);
148 14           idx = parent;
149             }
150 53           }
151              
152 23           static inline void heap_sift_down(HeapEntry *data, uint32_t size, uint32_t idx) {
153 19           while (1) {
154 42           uint32_t smallest = idx;
155 42           uint32_t left = 2 * idx + 1;
156 42           uint32_t right = 2 * idx + 2;
157 42 100         if (left < size && data[left].priority < data[smallest].priority)
    100          
158 19           smallest = left;
159 42 100         if (right < size && data[right].priority < data[smallest].priority)
    100          
160 5           smallest = right;
161 42 100         if (smallest == idx) break;
162 19           heap_swap(&data[idx], &data[smallest]);
163 19           idx = smallest;
164             }
165 23           }
166              
167             /* ================================================================
168             * Public API
169             * ================================================================ */
170              
171 2           static inline void heap_make_deadline(double t, struct timespec *dl) {
172 2           clock_gettime(CLOCK_MONOTONIC, dl);
173 2           dl->tv_sec += (time_t)t;
174 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
175 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
176 2           }
177              
178 3           static inline int heap_remaining(const struct timespec *dl, struct timespec *rem) {
179             struct timespec now;
180 3           clock_gettime(CLOCK_MONOTONIC, &now);
181 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
182 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
183 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
184 3           return rem->tv_sec >= 0;
185             }
186              
187 57           static inline int heap_push(HeapHandle *h, int64_t priority, int64_t value) {
188 57           HeapHeader *hdr = h->hdr;
189 57           heap_mutex_lock(hdr);
190 57 100         if (hdr->size >= (uint32_t)hdr->capacity) {
191 4           heap_mutex_unlock(hdr);
192 4           return 0;
193             }
194 53           uint32_t idx = hdr->size++;
195 53           h->data[idx].priority = priority;
196 53           h->data[idx].value = value;
197 53           heap_sift_up(h->data, idx);
198 53           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
199 53           heap_mutex_unlock(hdr);
200             /* wake pop-waiters */
201 53 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
202 0           syscall(SYS_futex, &hdr->size, FUTEX_WAKE, 1, NULL, NULL, 0);
203 53           return 1;
204             }
205              
206 37           static inline int heap_pop(HeapHandle *h, int64_t *out_priority, int64_t *out_value) {
207 37           HeapHeader *hdr = h->hdr;
208 37           heap_mutex_lock(hdr);
209 37 100         if (hdr->size == 0) {
210 5           heap_mutex_unlock(hdr);
211 5           return 0;
212             }
213 32           *out_priority = h->data[0].priority;
214 32           *out_value = h->data[0].value;
215 32           hdr->size--;
216 32 100         if (hdr->size > 0) {
217 23           h->data[0] = h->data[hdr->size];
218 23           heap_sift_down(h->data, hdr->size, 0);
219             }
220 32           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
221 32           heap_mutex_unlock(hdr);
222 32           return 1;
223             }
224              
/*
 * Blocking pop. Timeout semantics: 0 -> single non-blocking attempt,
 * > 0 -> wait up to `timeout` seconds, < 0 -> wait indefinitely.
 * Returns 1 with outputs filled, 0 on empty/timeout.
 */
static inline int heap_pop_wait(HeapHandle *h, int64_t *out_p, int64_t *out_v, double timeout) {
    if (heap_pop(h, out_p, out_v)) return 1;   /* fast path: element ready */
    if (timeout == 0) return 0;                /* non-blocking mode */

    HeapHeader *hdr = h->hdr;
    struct timespec dl, rem;
    int has_dl = (timeout > 0);                /* negative timeout = infinite wait */
    if (has_dl) heap_make_deadline(timeout, &dl);
    __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);

    for (;;) {
        /* Register as a waiter BEFORE re-checking size so a concurrent
         * heap_push sees waiters_pop > 0 and issues FUTEX_WAKE (avoids a
         * lost-wakeup window between the check and the futex call). */
        __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
        uint32_t cur = __atomic_load_n(&hdr->size, __ATOMIC_ACQUIRE);
        if (cur == 0) {
            struct timespec *pts = NULL;       /* NULL = block forever */
            if (has_dl) {
                if (!heap_remaining(&dl, &rem)) {   /* deadline already passed */
                    __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
                    __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    return 0;
                }
                pts = &rem;                    /* relative timeout for FUTEX_WAIT */
            }
            /* Sleeps only while size is still 0; spurious wakeups are fine —
             * the loop re-tries heap_pop below. */
            syscall(SYS_futex, &hdr->size, FUTEX_WAIT, 0, pts, NULL, 0);
        }
        __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
        if (heap_pop(h, out_p, out_v)) return 1;   /* woken (or raced) with data */
        if (has_dl && !heap_remaining(&dl, &rem)) {
            __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }
}
258              
259 2           static inline int heap_peek(HeapHandle *h, int64_t *out_p, int64_t *out_v) {
260 2           HeapHeader *hdr = h->hdr;
261 2           heap_mutex_lock(hdr);
262 2 50         if (hdr->size == 0) { heap_mutex_unlock(hdr); return 0; }
263 2           *out_p = h->data[0].priority;
264 2           *out_v = h->data[0].value;
265 2           heap_mutex_unlock(hdr);
266 2           return 1;
267             }
268              
269 24           static inline uint32_t heap_size(HeapHandle *h) {
270 24           return __atomic_load_n(&h->hdr->size, __ATOMIC_RELAXED);
271             }
272              
273             /* ================================================================
274             * Create / Open / Close
275             * ================================================================ */
276              
/* Format an error into the caller-provided buffer; no-op when errbuf is NULL.
 * Relies on a local `char *errbuf` being in scope at every expansion site. */
#define HEAP_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, HEAP_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
278              
279 15           static inline void heap_init_header(void *base, uint64_t total, uint64_t capacity) {
280 15           HeapHeader *hdr = (HeapHeader *)base;
281 15           memset(base, 0, (size_t)total);
282 15           hdr->magic = HEAP_MAGIC;
283 15           hdr->version = HEAP_VERSION;
284 15           hdr->capacity = capacity;
285 15           hdr->total_size = total;
286 15           hdr->data_off = sizeof(HeapHeader);
287 15           __atomic_thread_fence(__ATOMIC_SEQ_CST);
288 15           }
289              
290             /* Validate a mapped header (shared by heap_create reopen and heap_open_fd). */
291 2           static inline int heap_validate_header(const HeapHeader *hdr, uint64_t file_size) {
292 2 50         if (hdr->magic != HEAP_MAGIC) return 0;
293 2 50         if (hdr->version != HEAP_VERSION) return 0;
294 2 50         if (hdr->capacity == 0) return 0;
295 2 50         if (hdr->capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) return 0;
296 2 50         if (hdr->total_size != file_size) return 0;
297 2 50         if (hdr->data_off != sizeof(HeapHeader)) return 0;
298 2           uint64_t exp_total = sizeof(HeapHeader) + hdr->capacity * sizeof(HeapEntry);
299 2 50         if (hdr->total_size != exp_total) return 0;
300             /* Runtime-state sanity: size must not exceed capacity (corrupted file). */
301 2 50         if (hdr->size > hdr->capacity) return 0;
302 2           return 1;
303             }
304              
305 17           static inline HeapHandle *heap_setup(void *base, size_t ms, const char *path, int bfd) {
306 17           HeapHeader *hdr = (HeapHeader *)base;
307 17           HeapHandle *h = (HeapHandle *)calloc(1, sizeof(HeapHandle));
308 17 50         if (!h) { munmap(base, ms); return NULL; }
309 17           h->hdr = hdr;
310 17           h->data = (HeapEntry *)((uint8_t *)base + hdr->data_off);
311 17           h->mmap_size = ms;
312 17 100         h->path = path ? strdup(path) : NULL;
313 17           h->notify_fd = -1;
314 17           h->backing_fd = bfd;
315 17           return h;
316             }
317              
318 9           static HeapHandle *heap_create(const char *path, uint64_t capacity, char *errbuf) {
319 9 50         if (errbuf) errbuf[0] = '\0';
320 9 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
321 9 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
322 0 0         HEAP_ERR("capacity overflow"); return NULL;
323             }
324              
325 9           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
326 9           int anonymous = (path == NULL);
327 9           int fd = -1;
328             size_t map_size;
329             void *base;
330              
331 9 100         if (anonymous) {
332 6           map_size = (size_t)total;
333 6           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
334 6 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
335             } else {
336 3           fd = open(path, O_RDWR|O_CREAT, 0666);
337 4 50         if (fd < 0) { HEAP_ERR("open: %s", strerror(errno)); return NULL; }
    0          
338 3 50         if (flock(fd, LOCK_EX) < 0) { HEAP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
339             struct stat st;
340 3 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
341 3           int is_new = (st.st_size == 0);
342 3 100         if (!is_new && (uint64_t)st.st_size < sizeof(HeapHeader)) {
    50          
343 0 0         HEAP_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
344 0           flock(fd, LOCK_UN); close(fd); return NULL;
345             }
346 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
347 0 0         HEAP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
348             }
349 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
350 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
351 3 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
352 3 100         if (!is_new) {
353 1 50         if (!heap_validate_header((HeapHeader *)base, (uint64_t)st.st_size)) {
354 0 0         HEAP_ERR("invalid heap file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
355             }
356 1           flock(fd, LOCK_UN); close(fd);
357 1           return heap_setup(base, map_size, path, -1);
358             }
359             }
360 8           heap_init_header(base, total, capacity);
361 8 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
362 8           return heap_setup(base, map_size, path, -1);
363             }
364              
365 7           static HeapHandle *heap_create_memfd(const char *name, uint64_t capacity, char *errbuf) {
366 7 50         if (errbuf) errbuf[0] = '\0';
367 7 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
368 7 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
369 0 0         HEAP_ERR("capacity overflow"); return NULL;
370             }
371 7           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
372 7 50         int fd = memfd_create(name ? name : "heap", MFD_CLOEXEC | MFD_ALLOW_SEALING);
373 7 50         if (fd < 0) { HEAP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
374 7 50         if (ftruncate(fd, (off_t)total) < 0) { HEAP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
375 7           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
376 7           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
377 7 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
378 7           heap_init_header(base, total, capacity);
379 7           return heap_setup(base, (size_t)total, NULL, fd);
380             }
381              
382 1           static HeapHandle *heap_open_fd(int fd, char *errbuf) {
383 1 50         if (errbuf) errbuf[0] = '\0';
384             struct stat st;
385 1 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
386 1 50         if ((uint64_t)st.st_size < sizeof(HeapHeader)) { HEAP_ERR("too small"); return NULL; }
    0          
387 1           size_t ms = (size_t)st.st_size;
388 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
389 1 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
390 1 50         if (!heap_validate_header((HeapHeader *)base, (uint64_t)st.st_size)) {
391 0 0         HEAP_ERR("invalid heap"); munmap(base, ms); return NULL;
392             }
393 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
394 1 50         if (myfd < 0) { HEAP_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
395 1           return heap_setup(base, ms, NULL, myfd);
396             }
397              
398 17           static void heap_destroy(HeapHandle *h) {
399 17 50         if (!h) return;
400 17 100         if (h->notify_fd >= 0) close(h->notify_fd);
401 17 100         if (h->backing_fd >= 0) close(h->backing_fd);
402 17 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
403 17           free(h->path);
404 17           free(h);
405             }
406              
407             /* Concurrency-safe: holds mutex (heap is already mutex-based) */
408 3           static void heap_clear(HeapHandle *h) {
409 3           heap_mutex_lock(h->hdr);
410 3           h->hdr->size = 0;
411 3           heap_mutex_unlock(h->hdr);
412 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
413 0           syscall(SYS_futex, &h->hdr->size, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
414 3           }
415              
416 2           static int heap_create_eventfd(HeapHandle *h) {
417 2 50         if (h->notify_fd >= 0) return h->notify_fd;
418 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
419 2 50         if (efd < 0) return -1;
420 2           h->notify_fd = efd; return efd;
421             }
422 2           static int heap_notify(HeapHandle *h) {
423 2 50         if (h->notify_fd < 0) return 0;
424 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
425             }
426 2           static int64_t heap_eventfd_consume(HeapHandle *h) {
427 2 50         if (h->notify_fd < 0) return -1;
428 2           uint64_t v = 0;
429 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
430 2           return (int64_t)v;
431             }
432 1           static int heap_msync(HeapHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
433              
434             #endif /* HEAP_H */