File Coverage

heap.h
Criterion Covered Total %
statement 206 235 87.6
branch 89 196 45.4
condition n/a
subroutine n/a
pod n/a
total 295 431 68.4


line stmt bran cond sub pod time code
1             /*
2             * heap.h -- Shared-memory binary min-heap (priority queue) for Linux
3             *
4             * Mutex-protected push/pop with sift-up/sift-down.
5             * Futex blocking when empty (pop_wait).
6             * Elements are (int64_t priority, int64_t value) pairs.
7             * Lowest priority pops first (min-heap).
8             */
9              
10             #ifndef HEAP_H
11             #define HEAP_H
12              
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29              
30             #define HEAP_MAGIC 0x48455031U /* "HEP1" */
31             #define HEAP_VERSION 1
32             #define HEAP_ERR_BUFLEN 256
33             #define HEAP_SPIN_LIMIT 32
34              
35             #define HEAP_MUTEX_BIT 0x80000000U
36             #define HEAP_MUTEX_PID 0x7FFFFFFFU
37              
38             typedef struct {
39             int64_t priority;
40             int64_t value;
41             } HeapEntry;
42              
43             /* ================================================================
44             * Header (128 bytes)
45             * ================================================================ */
46              
47             typedef struct {
48             uint32_t magic;
49             uint32_t version;
50             uint64_t capacity;
51             uint64_t total_size;
52             uint64_t data_off;
53             uint8_t _pad0[32];
54              
55             uint32_t size; /* 64: current element count (futex word for pop) */
56             uint32_t mutex; /* 68: 0=free, HEAP_MUTEX_BIT|pid=locked */
57             uint32_t mutex_waiters; /* 72 */
58             uint32_t waiters_pop; /* 76 */
59             uint64_t stat_pushes; /* 80 */
60             uint64_t stat_pops; /* 88 */
61             uint64_t stat_waits; /* 96 */
62             uint64_t stat_timeouts; /* 104 */
63             uint32_t stat_recoveries; /* 112 */
64             uint8_t _pad1[12]; /* 116-127 */
65             } HeapHeader;
66              
67             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
68             _Static_assert(sizeof(HeapHeader) == 128, "HeapHeader must be 128 bytes");
69             #endif
70              
71             typedef struct {
72             HeapHeader *hdr;
73             HeapEntry *data;
74             size_t mmap_size;
75             char *path;
76             int notify_fd;
77             int backing_fd;
78             } HeapHandle;
79              
80             /* ================================================================
81             * Mutex (PID-based, stale-recoverable)
82             * ================================================================ */
83              
84             static const struct timespec heap_lock_timeout = { 2, 0 };
85              
86 0           static inline int heap_pid_alive(uint32_t pid) {
87 0 0         if (pid == 0) return 1;
88 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
89             }
90              
91 46           static inline void heap_mutex_lock(HeapHeader *hdr) {
92 46           uint32_t mypid = HEAP_MUTEX_BIT | ((uint32_t)getpid() & HEAP_MUTEX_PID);
93 46           for (int spin = 0; ; spin++) {
94 46           uint32_t expected = 0;
95 46 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
96             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
97 46           return;
98 0 0         if (spin < HEAP_SPIN_LIMIT) {
99             #if defined(__x86_64__) || defined(__i386__)
100 0           __asm__ volatile("pause" ::: "memory");
101             #elif defined(__aarch64__)
102             __asm__ volatile("yield" ::: "memory");
103             #endif
104 0           continue;
105             }
106 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
107 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
108 0 0         if (cur != 0) {
109 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
110             &heap_lock_timeout, NULL, 0);
111 0 0         if (rc == -1 && errno == ETIMEDOUT && cur >= HEAP_MUTEX_BIT) {
    0          
    0          
112 0           uint32_t pid = cur & HEAP_MUTEX_PID;
113 0 0         if (!heap_pid_alive(pid)) {
114 0 0         if (__atomic_compare_exchange_n(&hdr->mutex, &cur, 0,
115             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
116 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
117             }
118             }
119             }
120 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
121 0           spin = 0;
122             }
123             }
124              
125 46           static inline void heap_mutex_unlock(HeapHeader *hdr) {
126 46           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
127 46 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
128 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
129 46           }
130              
131             /* ================================================================
132             * Heap operations (must hold mutex)
133             * ================================================================ */
134              
135 9           static inline void heap_swap(HeapEntry *a, HeapEntry *b) {
136 9           HeapEntry t = *a; *a = *b; *b = t;
137 9           }
138              
139 26           static inline void heap_sift_up(HeapEntry *data, uint32_t idx) {
140 31 100         while (idx > 0) {
141 19           uint32_t parent = (idx - 1) / 2;
142 19 100         if (data[parent].priority <= data[idx].priority) break;
143 5           heap_swap(&data[parent], &data[idx]);
144 5           idx = parent;
145             }
146 26           }
147              
148 7           static inline void heap_sift_down(HeapEntry *data, uint32_t size, uint32_t idx) {
149 4           while (1) {
150 11           uint32_t smallest = idx;
151 11           uint32_t left = 2 * idx + 1;
152 11           uint32_t right = 2 * idx + 2;
153 11 100         if (left < size && data[left].priority < data[smallest].priority)
    100          
154 4           smallest = left;
155 11 100         if (right < size && data[right].priority < data[smallest].priority)
    100          
156 1           smallest = right;
157 11 100         if (smallest == idx) break;
158 4           heap_swap(&data[idx], &data[smallest]);
159 4           idx = smallest;
160             }
161 7           }
162              
163             /* ================================================================
164             * Public API
165             * ================================================================ */
166              
167 2           static inline void heap_make_deadline(double t, struct timespec *dl) {
168 2           clock_gettime(CLOCK_MONOTONIC, dl);
169 2           dl->tv_sec += (time_t)t;
170 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
171 2 100         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
172 2           }
173              
174 3           static inline int heap_remaining(const struct timespec *dl, struct timespec *rem) {
175             struct timespec now;
176 3           clock_gettime(CLOCK_MONOTONIC, &now);
177 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
178 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
179 3 50         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
180 3           return rem->tv_sec >= 0;
181             }
182              
183 27           static inline int heap_push(HeapHandle *h, int64_t priority, int64_t value) {
184 27           HeapHeader *hdr = h->hdr;
185 27           heap_mutex_lock(hdr);
186 27 100         if (hdr->size >= (uint32_t)hdr->capacity) {
187 1           heap_mutex_unlock(hdr);
188 1           return 0;
189             }
190 26           uint32_t idx = hdr->size++;
191 26           h->data[idx].priority = priority;
192 26           h->data[idx].value = value;
193 26           heap_sift_up(h->data, idx);
194 26           __atomic_add_fetch(&hdr->stat_pushes, 1, __ATOMIC_RELAXED);
195 26           heap_mutex_unlock(hdr);
196             /* wake pop-waiters */
197 26 50         if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
198 0           syscall(SYS_futex, &hdr->size, FUTEX_WAKE, 1, NULL, NULL, 0);
199 26           return 1;
200             }
201              
202 15           static inline int heap_pop(HeapHandle *h, int64_t *out_priority, int64_t *out_value) {
203 15           HeapHeader *hdr = h->hdr;
204 15           heap_mutex_lock(hdr);
205 15 100         if (hdr->size == 0) {
206 4           heap_mutex_unlock(hdr);
207 4           return 0;
208             }
209 11           *out_priority = h->data[0].priority;
210 11           *out_value = h->data[0].value;
211 11           hdr->size--;
212 11 100         if (hdr->size > 0) {
213 7           h->data[0] = h->data[hdr->size];
214 7           heap_sift_down(h->data, hdr->size, 0);
215             }
216 11           __atomic_add_fetch(&hdr->stat_pops, 1, __ATOMIC_RELAXED);
217 11           heap_mutex_unlock(hdr);
218 11           return 1;
219             }
220              
221 2           static inline int heap_pop_wait(HeapHandle *h, int64_t *out_p, int64_t *out_v, double timeout) {
222 2 50         if (heap_pop(h, out_p, out_v)) return 1;
223 2 50         if (timeout == 0) return 0;
224              
225 2           HeapHeader *hdr = h->hdr;
226             struct timespec dl, rem;
227 2           int has_dl = (timeout > 0);
228 2 50         if (has_dl) heap_make_deadline(timeout, &dl);
229 2           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
230              
231 0           for (;;) {
232 2           __atomic_add_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELEASE);
233 2           uint32_t cur = __atomic_load_n(&hdr->size, __ATOMIC_ACQUIRE);
234 2 50         if (cur == 0) {
235 2           struct timespec *pts = NULL;
236 2 50         if (has_dl) {
237 2 50         if (!heap_remaining(&dl, &rem)) {
238 0           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
239 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
240 0           return 0;
241             }
242 2           pts = &rem;
243             }
244 2           syscall(SYS_futex, &hdr->size, FUTEX_WAIT, 0, pts, NULL, 0);
245             }
246 2           __atomic_sub_fetch(&hdr->waiters_pop, 1, __ATOMIC_RELAXED);
247 2 100         if (heap_pop(h, out_p, out_v)) return 1;
248 1 50         if (has_dl && !heap_remaining(&dl, &rem)) {
    50          
249 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
250 1           return 0;
251             }
252             }
253             }
254              
255 1           static inline int heap_peek(HeapHandle *h, int64_t *out_p, int64_t *out_v) {
256 1           HeapHeader *hdr = h->hdr;
257 1           heap_mutex_lock(hdr);
258 1 50         if (hdr->size == 0) { heap_mutex_unlock(hdr); return 0; }
259 1           *out_p = h->data[0].priority;
260 1           *out_v = h->data[0].value;
261 1           heap_mutex_unlock(hdr);
262 1           return 1;
263             }
264              
265 9           static inline uint32_t heap_size(HeapHandle *h) {
266 9           return __atomic_load_n(&h->hdr->size, __ATOMIC_RELAXED);
267             }
268              
269             /* ================================================================
270             * Create / Open / Close
271             * ================================================================ */
272              
273             #define HEAP_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, HEAP_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
274              
275 5           static inline void heap_init_header(void *base, uint64_t total, uint64_t capacity) {
276 5           HeapHeader *hdr = (HeapHeader *)base;
277 5           memset(base, 0, (size_t)total);
278 5           hdr->magic = HEAP_MAGIC;
279 5           hdr->version = HEAP_VERSION;
280 5           hdr->capacity = capacity;
281 5           hdr->total_size = total;
282 5           hdr->data_off = sizeof(HeapHeader);
283 5           __atomic_thread_fence(__ATOMIC_SEQ_CST);
284 5           }
285              
286 7           static inline HeapHandle *heap_setup(void *base, size_t ms, const char *path, int bfd) {
287 7           HeapHeader *hdr = (HeapHeader *)base;
288 7           HeapHandle *h = (HeapHandle *)calloc(1, sizeof(HeapHandle));
289 7 50         if (!h) { munmap(base, ms); return NULL; }
290 7           h->hdr = hdr;
291 7           h->data = (HeapEntry *)((uint8_t *)base + hdr->data_off);
292 7           h->mmap_size = ms;
293 7 100         h->path = path ? strdup(path) : NULL;
294 7           h->notify_fd = -1;
295 7           h->backing_fd = bfd;
296 7           return h;
297             }
298              
299 5           static HeapHandle *heap_create(const char *path, uint64_t capacity, char *errbuf) {
300 5 50         if (errbuf) errbuf[0] = '\0';
301 5 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
302 5 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
303 0 0         HEAP_ERR("capacity overflow"); return NULL;
304             }
305              
306 5           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
307 5           int anonymous = (path == NULL);
308 5           int fd = -1;
309             size_t map_size;
310             void *base;
311              
312 5 100         if (anonymous) {
313 2           map_size = (size_t)total;
314 2           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
315 2 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
316             } else {
317 3           fd = open(path, O_RDWR|O_CREAT, 0666);
318 4 50         if (fd < 0) { HEAP_ERR("open: %s", strerror(errno)); return NULL; }
    0          
319 3 50         if (flock(fd, LOCK_EX) < 0) { HEAP_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
320             struct stat st;
321 3 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
322 3           int is_new = (st.st_size == 0);
323 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
324 0 0         HEAP_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
325             }
326 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
327 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
328 3 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
329 3 100         if (!is_new) {
330 1           HeapHeader *hdr = (HeapHeader *)base;
331 1 50         if (hdr->magic != HEAP_MAGIC || hdr->version != HEAP_VERSION ||
    50          
332 1 50         hdr->total_size != (uint64_t)st.st_size) {
333 0 0         HEAP_ERR("invalid heap file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
334             }
335 1           flock(fd, LOCK_UN); close(fd);
336 1           return heap_setup(base, map_size, path, -1);
337             }
338             }
339 4           heap_init_header(base, total, capacity);
340 4 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
341 4           return heap_setup(base, map_size, path, -1);
342             }
343              
344 1           static HeapHandle *heap_create_memfd(const char *name, uint64_t capacity, char *errbuf) {
345 1 50         if (errbuf) errbuf[0] = '\0';
346 1 50         if (capacity == 0) { HEAP_ERR("capacity must be > 0"); return NULL; }
    0          
347 1 50         if (capacity > (UINT64_MAX - sizeof(HeapHeader)) / sizeof(HeapEntry)) {
348 0 0         HEAP_ERR("capacity overflow"); return NULL;
349             }
350 1           uint64_t total = sizeof(HeapHeader) + capacity * sizeof(HeapEntry);
351 1 50         int fd = memfd_create(name ? name : "heap", MFD_CLOEXEC);
352 1 50         if (fd < 0) { HEAP_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
353 1 50         if (ftruncate(fd, (off_t)total) < 0) { HEAP_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
354 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
355 1 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
356 1           heap_init_header(base, total, capacity);
357 1           return heap_setup(base, (size_t)total, NULL, fd);
358             }
359              
360 1           static HeapHandle *heap_open_fd(int fd, char *errbuf) {
361 1 50         if (errbuf) errbuf[0] = '\0';
362             struct stat st;
363 1 50         if (fstat(fd, &st) < 0) { HEAP_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
364 1 50         if ((uint64_t)st.st_size < sizeof(HeapHeader)) { HEAP_ERR("too small"); return NULL; }
    0          
365 1           size_t ms = (size_t)st.st_size;
366 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
367 1 50         if (base == MAP_FAILED) { HEAP_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
368 1           HeapHeader *hdr = (HeapHeader *)base;
369 1 50         if (hdr->magic != HEAP_MAGIC || hdr->version != HEAP_VERSION ||
    50          
370 1 50         hdr->total_size != (uint64_t)st.st_size) {
371 0 0         HEAP_ERR("invalid heap"); munmap(base, ms); return NULL;
372             }
373 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
374 1 50         if (myfd < 0) { HEAP_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
375 1           return heap_setup(base, ms, NULL, myfd);
376             }
377              
378 7           static void heap_destroy(HeapHandle *h) {
379 7 50         if (!h) return;
380 7 100         if (h->notify_fd >= 0) close(h->notify_fd);
381 7 100         if (h->backing_fd >= 0) close(h->backing_fd);
382 7 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
383 7           free(h->path);
384 7           free(h);
385             }
386              
387             /* Concurrency-safe: holds mutex (heap is already mutex-based) */
388 3           static void heap_clear(HeapHandle *h) {
389 3           heap_mutex_lock(h->hdr);
390 3           h->hdr->size = 0;
391 3           heap_mutex_unlock(h->hdr);
392 3 50         if (__atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED) > 0)
393 0           syscall(SYS_futex, &h->hdr->size, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
394 3           }
395              
396 1           static int heap_create_eventfd(HeapHandle *h) {
397 1 50         if (h->notify_fd >= 0) close(h->notify_fd);
398 1           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
399 1 50         if (efd < 0) return -1;
400 1           h->notify_fd = efd; return efd;
401             }
402 1           static int heap_notify(HeapHandle *h) {
403 1 50         if (h->notify_fd < 0) return 0;
404 1           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
405             }
406 1           static int64_t heap_eventfd_consume(HeapHandle *h) {
407 1 50         if (h->notify_fd < 0) return -1;
408 1           uint64_t v = 0;
409 1 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
410 1           return (int64_t)v;
411             }
412 1           static void heap_msync(HeapHandle *h) { msync(h->hdr, h->mmap_size, MS_SYNC); }
413              
414             #endif /* HEAP_H */