File Coverage

hll.h
Criterion Covered Total %
statement 202 332 60.8
branch 86 258 33.3
condition n/a
subroutine n/a
pod n/a
total 288 590 48.8


line stmt bran cond sub pod time code
1             /*
2             * hll.h -- Shared-memory HyperLogLog cardinality estimator for Linux
3             *
4             * Estimates the number of distinct items seen (probabilistic distinct-count)
5             * using a fixed array of m = 2^precision single-byte registers. Each item is
6             * hashed (XXH3); the top `precision` bits pick a register, the position of the
7             * first set bit in the rest updates that register with a running maximum. The
8             * register array lives in a shared mapping so several processes share one
9             * estimator; a write-preferring futex rwlock with reader-slot dead-process
10             * recovery guards mutation. Two estimators of equal precision can be merged
11             * (register-wise max).
12             *
13             * Layout: Header -> reader_slots[1024] -> regs[m]
14             */
15              
16             #ifndef HLL_H
17             #define HLL_H
18              
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36              
37             #define XXH_INLINE_ALL
38             #include "xxhash.h"
39              
40             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
41             #error "hll.h: requires little-endian architecture"
42             #endif
43              
44              
45             /* ================================================================
46             * Constants
47             * ================================================================ */
48              
49             #define HLL_MAGIC 0x474C4C48U /* "HLLG" (little-endian) */
50             #define HLL_VERSION 1
51             #define HLL_ERR_BUFLEN 256
52             #define HLL_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
53             #define HLL_MIN_PRECISION 4 /* m = 16 registers */
54             #define HLL_MAX_PRECISION 18 /* m = 262144 registers (256 KB) */
55              
56             #define HLL_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, HLL_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
57              
58             /* ================================================================
59             * Structs
60             * ================================================================ */
61              
/* One entry of the reader-slot table used for dead-process recovery.
 *
 * A process claims an entry by storing its PID in it, and afterwards mirrors
 * here every contribution it makes to the shared lock counters (the rwlock
 * reader count, rwlock_waiters and rwlock_writers_waiting). When a lock wait
 * times out, the recovery scan can then attribute a dead process's share and
 * reverse it, rather than relying on the slow per-operation timeout drain. */
typedef struct {
    uint32_t pid;            /* owning process ID; 0 = slot is unclaimed */
    uint32_t subcount;       /* read-lock acquisitions in flight for this process */
    uint32_t waiters_parked; /* this process's share of hdr->rwlock_waiters */
    uint32_t writers_parked; /* this process's share of hdr->rwlock_writers_waiting */
} HllReaderSlot;
73              
/* Fixed 256-byte header at the start of the shared mapping. The number in
 * each field comment is the field's byte offset; the layout is part of the
 * on-disk / shared-memory format and must not change without a version bump. */
struct HllHeader {
    uint32_t magic;                  /*   0  HLL_MAGIC */
    uint32_t version;                /*   4  HLL_VERSION */
    uint32_t precision;              /*   8  number of register-index bits */
    uint32_t m;                      /*  12  register count (= 1 << precision) */
    uint32_t _pad0;                  /*  16  padding */
    uint32_t _pad1;                  /*  20  padding */
    uint64_t total_size;             /*  24  size of the whole mapping in bytes */
    uint64_t reader_slots_off;       /*  32  byte offset of the reader-slot table */
    uint64_t regs_off;               /*  40  byte offset of the register array */
    uint32_t rwlock;                 /*  48  lock word: 0 = free, 1..0x7FFFFFFF = reader
                                      *      count, 0x80000000 | pid = write-locked */
    uint32_t rwlock_waiters;         /*  52  processes parked on the lock futex */
    uint32_t rwlock_writers_waiting; /*  56  parked processes that want the write lock */
    uint32_t _pad2;                  /*  60  padding */
    uint64_t stat_ops;               /*  64 */
    uint8_t  _pad[184];              /*  72..255  padding up to 256 bytes */
};
typedef struct HllHeader HllHeader;

_Static_assert(sizeof(HllHeader) == 256, "HllHeader must be 256 bytes");
93              
94             /* ---- Process-local handle ---- */
95              
96             typedef struct HllHandle {
97             HllHeader *hdr;
98             HllReaderSlot *reader_slots; /* HLL_READER_SLOTS entries */
99             void *base; /* mmap base */
100             size_t mmap_size;
101             char *path; /* backing file path (strdup'd) */
102             int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
103             uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
104             uint32_t cached_pid; /* getpid() cached at last slot claim */
105             uint32_t cached_fork_gen; /* hll_fork_gen value at last slot claim */
106             } HllHandle;
107              
108             /* ================================================================
109             * Futex-based write-preferring read-write lock
110             * with reader-slot dead-process recovery
111             * ================================================================ */
112              
113             #define HLL_RWLOCK_SPIN_LIMIT 32
114             #define HLL_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
115              
/* Busy-wait hint used between lock retries: emits the CPU's spin-loop
 * instruction where one is known ("pause" on x86, "yield" on aarch64) and
 * otherwise just a compiler barrier. The "memory" clobber applies in every
 * variant. */
static inline void hll_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
125              
126             /* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
127             #define HLL_RWLOCK_WRITER_BIT 0x80000000U
128             #define HLL_RWLOCK_PID_MASK 0x7FFFFFFFU
129             #define HLL_RWLOCK_WR(pid) (HLL_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HLL_RWLOCK_PID_MASK))
130              
/* Report whether process `pid` may still be running.
 *
 * Returns 1 when the process exists or its state cannot be determined, and
 * 0 only when it is definitely gone:
 *   - pid == 0 means "no owner recorded" and is reported as alive;
 *   - a value above INT32_MAX cannot be a process ID (pid_t is signed), so
 *     it is reported as dead instead of being cast to a negative pid_t,
 *     which would make kill() probe a process group (or, for -1, every
 *     process the caller may signal) rather than one process;
 *   - otherwise kill(pid, 0) is used as an existence probe, and only ESRCH
 *     counts as dead (any other failure, e.g. EPERM, is treated as alive).
 *
 * NOTE: PID reuse cannot be detected. If a dead lock holder's PID is
 * recycled to an unrelated live process before recovery runs, this reports
 * "alive" and that slot's orphaned contribution is not reclaimed until the
 * recycled process exits. Robust detection would require a per-slot
 * process-start-time epoch (a header-layout/version change). Documented
 * under "Crash Safety" in the POD. */
static inline int hll_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* no owner recorded, assume alive */
    if (pid > (uint32_t)INT32_MAX)
        return 0; /* not representable as a positive pid_t: no such process */
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}
142              
143             /* Force-recover a stale write lock left by a dead process.
144             * CAS to OUR pid to hold the lock while fixing shared state, then release.
145             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
146             * recovering process can detect and re-recover if we crash mid-recovery. */
147 0           static inline void hll_recover_stale_lock(HllHandle *h, uint32_t observed_rwlock) {
148 0           HllHeader *hdr = h->hdr;
149 0           uint32_t mypid = HLL_RWLOCK_WR((uint32_t)getpid());
150 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
151             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
152 0           return;
153             /* We now hold the write lock as mypid. No additional shared state needs
154             * repair here (this module has no seqlock); just release the lock. */
155 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
156 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
157 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
158             }
159              
160             static const struct timespec hll_lock_timeout = { HLL_LOCK_TIMEOUT_SEC, 0 };
161              
/* Fork detection without a getpid() syscall on the hot path.
 *
 * hll_fork_gen is a process-global generation number. The pthread_atfork
 * child handler bumps it, so after a fork() every open handle in the child
 * sees a generation different from the one it cached and re-claims a reader
 * slot on its next lock call. hll_atfork_once guards one-time registration
 * of the handler. */
static uint32_t hll_fork_gen = 1;
static pthread_once_t hll_atfork_once = PTHREAD_ONCE_INIT;

/* pthread_atfork child handler: advance the generation by one. */
static void hll_on_fork_child(void) {
    (void)__atomic_fetch_add(&hll_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once body: install the child handler (no prepare/parent hooks). */
static void hll_atfork_init(void) {
    pthread_atfork(NULL, NULL, hll_on_fork_child);
}
173              
174             /* Ensure this process owns a reader slot. Called from the lock helpers so
175             * that fork()'d children pick up their own slot lazily instead of sharing
176             * the parent's. Hot-path is a single relaxed load + compare; only on a
177             * fork-generation mismatch do we touch getpid() and scan slots. */
178 420737           static inline void hll_claim_reader_slot(HllHandle *h) {
179 420737           uint32_t cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
180 420737 100         if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
    50          
181 420720           return;
182             /* Cold path -- register the atfork hook once per process, then claim. */
183 17           pthread_once(&hll_atfork_once, hll_atfork_init);
184             /* Re-read after pthread_once: hll_on_fork_child may have bumped it. */
185 17           cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
186 17           uint32_t now_pid = (uint32_t)getpid();
187 17           h->cached_pid = now_pid;
188 17           h->cached_fork_gen = cur_gen;
189 17           h->my_slot_idx = UINT32_MAX;
190 17           uint32_t start = now_pid % HLL_READER_SLOTS;
191 19 50         for (uint32_t i = 0; i < HLL_READER_SLOTS; i++) {
192 19           uint32_t s = (start + i) % HLL_READER_SLOTS;
193 19           uint32_t expected = 0;
194 19 100         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
195             &expected, now_pid, 0,
196             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
197             /* Zero all mirror fields, not just subcount: a SIGKILL'd
198             * predecessor may have left waiters_parked/writers_parked
199             * non-zero, and hll_recover_dead_readers won't drain them
200             * once we own the slot (the CAS expects the dead PID). */
201 17           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
202 17           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
203 17           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
204 17           h->my_slot_idx = s;
205 17           return;
206             }
207             }
208             /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
209             * tracking for this handle (lock still works; just no recovery). */
210             }
211              
/* Atomically lower *p by `sub`, saturating at 0 instead of wrapping around.
 * A `sub` of 0 leaves *p untouched. Implemented as a relaxed CAS retry loop;
 * a failed CAS refreshes the observed value for the next attempt. */
static inline void hll_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;
    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        next = (seen <= sub) ? 0 : seen - sub;
    } while (!__atomic_compare_exchange_n(p, &seen, next,
                                          1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
223              
224             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
225             * contributions back to the global counters. A no-op if the slot was stolen
226             * by another recoverer or had no waiter contribution to drain.
227             *
228             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
229             * Between our CAS and a follow-up store, a new process could claim the
230             * slot and start populating these fields -- our stores would clobber its
231             * state. hll_claim_reader_slot zeros all three on every claim, so
232             * leaving stale values is harmless. */
233 0           static inline void hll_drain_dead_slot(HllHandle *h, uint32_t i, uint32_t pid) {
234 0           HllHeader *hdr = h->hdr;
235 0           uint32_t expected = pid;
236             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
237             * ACQUIRE syncs us with prior writes from the dead process to
238             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
239             * a plain RELAXED load before the CAS could miss those writes;
240             * loading them after the CAS keeps them inside the acquire window. */
241 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
242             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
243 0           return;
244 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
245 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
246 0 0         if (wp) hll_atomic_sub_cap(&hdr->rwlock_waiters, wp);
247 0 0         if (writp) hll_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
248             }
249              
250             /* Scan reader slots for dead-process recovery.
251             *
252             * For each dead PID with non-zero contributions to the shared rwlock,
253             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
254             * out so live processes don't have to wait for the slow per-op timeout
255             * decrement to drain it for them.
256             *
257             * For the main rwlock counter we use the "no live reader holds -> force-
258             * reset to 0" trick (precise) because per-process attribution of the
259             * subcount is racy across the inc-counter-then-inc-subcount window. */
260 0           static inline void hll_recover_dead_readers(HllHandle *h) {
261 0 0         if (!h->reader_slots) return;
262 0           HllHeader *hdr = h->hdr;
263 0           int any_live_reader = 0;
264 0           int found_dead_reader = 0;
265              
266             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
267             * contribution to lose) are wiped immediately to free the slot for
268             * future claimants and drain any orphan parked-waiter counters. Slots
269             * with dead pid and sc > 0 are left intact in this pass: if force-
270             * reset cannot fire (because a live reader is concurrently present),
271             * wiping the dead slot would lose the only record of its orphan
272             * rwlock contribution and strand writers permanently once the live
273             * reader releases. */
274 0 0         for (uint32_t i = 0; i < HLL_READER_SLOTS; i++) {
275 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
276 0 0         if (pid == 0) continue;
277 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
278 0 0         if (hll_pid_alive(pid)) {
279 0 0         if (sc > 0) any_live_reader = 1;
280 0           continue;
281             }
282 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
283 0           hll_drain_dead_slot(h, i, pid);
284             }
285              
286             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
287             * reset CAS FIRST, while the window since pass 1's last scan is
288             * still narrow (a handful of instructions, as in the original
289             * single-pass code). A new reader that started rdlock between
290             * pass 1's scan and the CAS will either:
291             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
292             * fails (cur mismatched), recovery yields and a future
293             * cycle retries; or
294             * (b) be still in the subcount-bump phase -- our CAS sees the
295             * stale cur and resets to 0; the new reader's subsequent CAS
296             * rwlock(0 -> 1) succeeds cleanly.
297             * Only after the CAS resolves do we wipe the deferred dead slots,
298             * keeping that work outside the race-sensitive window. */
299 0 0         if (found_dead_reader && !any_live_reader) {
    0          
300 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
301 0 0         if (cur > 0 && cur < HLL_RWLOCK_WRITER_BIT) {
    0          
302 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
303             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
304 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
305 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
306             }
307             }
308 0 0         for (uint32_t i = 0; i < HLL_READER_SLOTS; i++) {
309 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
310 0 0         if (pid == 0 || hll_pid_alive(pid)) continue;
    0          
311 0           hll_drain_dead_slot(h, i, pid);
312             }
313             }
314             }
315              
316             /* Inspect the lock word after a futex-wait timeout. If a dead writer
317             * holds it, force-recover the lock. Otherwise drain dead readers' shares
318             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
319             * branches -- identical recovery logic in both. */
320 0           static inline void hll_recover_after_timeout(HllHandle *h) {
321 0           HllHeader *hdr = h->hdr;
322 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
323 0 0         if (val >= HLL_RWLOCK_WRITER_BIT) {
324 0           uint32_t pid = val & HLL_RWLOCK_PID_MASK;
325 0 0         if (!hll_pid_alive(pid))
326 0           hll_recover_stale_lock(h, val);
327             } else {
328 0           hll_recover_dead_readers(h);
329             }
330 0           }
331              
332             /* Park/unpark helpers: bump the global waiter counters together with this
333             * process's mirrored slot counters so a wrlock-timeout recovery scan can
334             * attribute and reverse a dead PID's contribution. Kept paired to make
335             * accidental drift between global and per-slot counts impossible. */
336 0           static inline void hll_park_reader(HllHandle *h) {
337 0 0         if (h->my_slot_idx != UINT32_MAX)
338 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
339 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
340 0           }
341 0           static inline void hll_unpark_reader(HllHandle *h) {
342 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
343 0 0         if (h->my_slot_idx != UINT32_MAX)
344 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
345 0           }
346 0           static inline void hll_park_writer(HllHandle *h) {
347 0 0         if (h->my_slot_idx != UINT32_MAX) {
348 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
349 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
350             }
351 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
352 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
353 0           }
354 0           static inline void hll_unpark_writer(HllHandle *h) {
355 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
356 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
357 0 0         if (h->my_slot_idx != UINT32_MAX) {
358 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
359 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
360             }
361 0           }
362              
363 23           static inline void hll_rwlock_rdlock(HllHandle *h) {
364 23           hll_claim_reader_slot(h);
365 23           HllHeader *hdr = h->hdr;
366 23           uint32_t *lock = &hdr->rwlock;
367 23           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
368             /* Claim subcount BEFORE bumping the shared rwlock counter. This way
369             * a concurrent writer-side recovery scan that sees our PID alive with
370             * subcount > 0 will (correctly) defer force-reset, even while we are
371             * still spinning trying to win the rwlock CAS. Without this, a reader
372             * killed between rwlock CAS-success and subcount++ would let recovery
373             * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
374             * our eventual rdunlock dec. */
375 23 50         if (h->my_slot_idx != UINT32_MAX)
376 23           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
377 23           for (int spin = 0; ; spin++) {
378 23           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
379             /* Write-preferring: when lock is free (cur==0) and writers are
380             * waiting, yield to let the writer acquire. When readers are
381             * already active (cur>=1), new readers may join freely. */
382 23 50         if (cur > 0 && cur < HLL_RWLOCK_WRITER_BIT) {
    0          
383 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
384             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
385 23           return;
386 23 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
387 23 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
388             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
389 23           return;
390             }
391 0 0         if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
392 0           hll_rwlock_spin_pause();
393 0           continue;
394             }
395 0           hll_park_reader(h);
396 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
397             /* Sleep when write-locked OR when yielding to waiting writers */
398 0 0         if (cur >= HLL_RWLOCK_WRITER_BIT || cur == 0) {
    0          
399 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
400             &hll_lock_timeout, NULL, 0);
401 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
402 0           hll_unpark_reader(h);
403 0           hll_recover_after_timeout(h);
404 0           spin = 0;
405 0           continue;
406             }
407             }
408 0           hll_unpark_reader(h);
409 0           spin = 0;
410             }
411             }
412              
413 23           static inline void hll_rwlock_rdunlock(HllHandle *h) {
414 23           HllHeader *hdr = h->hdr;
415             /* Release the shared counter BEFORE dropping our subcount so that
416             * "any live PID with subcount > 0" is a reliable in-flight indicator
417             * for the writer-side recovery scan. Inverting these would create a
418             * window where we still own a unit of rwlock but our slot subcount is
419             * 0, letting recovery force-reset rwlock underneath us. */
420 23           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
421 23 50         if (h->my_slot_idx != UINT32_MAX)
422 23           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
423 23 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
424 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
425 23           }
426              
427 420714           static inline void hll_rwlock_wrlock(HllHandle *h) {
428 420714           hll_claim_reader_slot(h); /* refresh cached_pid across fork */
429 420714           HllHeader *hdr = h->hdr;
430 420714           uint32_t *lock = &hdr->rwlock;
431             /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
432             * any crash window between acquiring the lock and storing the owner. */
433 420714           uint32_t mypid = HLL_RWLOCK_WR(h->cached_pid);
434 420714           for (int spin = 0; ; spin++) {
435 420714           uint32_t expected = 0;
436 420714 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
437             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
438 420714           return;
439 0 0         if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
440 0           hll_rwlock_spin_pause();
441 0           continue;
442             }
443 0           hll_park_writer(h);
444 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
445 0 0         if (cur != 0) {
446 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
447             &hll_lock_timeout, NULL, 0);
448 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
449 0           hll_unpark_writer(h);
450 0           hll_recover_after_timeout(h);
451 0           spin = 0;
452 0           continue;
453             }
454             }
455 0           hll_unpark_writer(h);
456 0           spin = 0;
457             }
458             }
459              
460 420714           static inline void hll_rwlock_wrunlock(HllHandle *h) {
461 420714           HllHeader *hdr = h->hdr;
462 420714           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
463 420714 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
464 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
465 420714           }
466              
467             /* ================================================================
468             * Layout math + create / open / destroy
469             *
470             * Layout: Header -> reader_slots[1024] -> regs[m]
471             * ================================================================ */
472              
/* Byte offsets of the regions that follow the header inside the mapping.
 * hll_layout() is the single place that computes them. */
typedef struct {
    uint64_t reader_slots; /* start of the reader-slot table */
    uint64_t regs;         /* start of the register array */
} HllLayout;
475              
476 48           static inline HllLayout hll_layout(void) {
477             HllLayout L;
478 48           L.reader_slots = sizeof(HllHeader);
479 48           L.regs = L.reader_slots + (uint64_t)HLL_READER_SLOTS * sizeof(HllReaderSlot);
480 48           L.regs = (L.regs + 7) & ~(uint64_t)7; /* 8-byte align the register array */
481 48           return L;
482             }
483              
484 25           static inline uint64_t hll_total_size(uint32_t m) {
485 25           HllLayout L = hll_layout();
486 25           return L.regs + (uint64_t)m;
487             }
488              
489 21           static inline void hll_init_header(void *base, uint32_t precision, uint32_t m, uint64_t total) {
490 21           HllLayout L = hll_layout();
491 21           HllHeader *hdr = (HllHeader *)base;
492             /* Explicitly zero the header + reader-slot region (lock-recovery state, like
493             intern.h); the register array relies on the fresh mapping being OS zero-filled. */
494 21           memset(base, 0, (size_t)L.regs);
495 21           hdr->magic = HLL_MAGIC;
496 21           hdr->version = HLL_VERSION;
497 21           hdr->precision = precision;
498 21           hdr->m = m;
499 21           hdr->total_size = total;
500 21           hdr->reader_slots_off = L.reader_slots;
501 21           hdr->regs_off = L.regs;
502 21           __atomic_thread_fence(__ATOMIC_SEQ_CST);
503 21           }
504              
505 428732           static inline uint8_t *hll_regs(HllHandle *h) {
506 428732           return (uint8_t *)((char *)h->base + h->hdr->regs_off);
507             }
508              
509 23           static inline HllHandle *hll_setup(void *base, size_t map_size,
510             const char *path, int backing_fd) {
511 23           HllHeader *hdr = (HllHeader *)base;
512 23           HllHandle *h = (HllHandle *)calloc(1, sizeof(HllHandle));
513 23 50         if (!h) {
514 0           munmap(base, map_size);
515 0 0         if (backing_fd >= 0) close(backing_fd);
516 0           return NULL;
517             }
518 23           h->hdr = hdr;
519 23           h->base = base;
520 23           h->reader_slots = (HllReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
521 23           h->mmap_size = map_size;
522 23 100         h->path = path ? strdup(path) : NULL;
523 23           h->backing_fd = backing_fd;
524 23           h->my_slot_idx = UINT32_MAX;
525 23           return h;
526             }
527              
528             /* Validate a mapped header (shared by hll_create reopen and hll_open_fd). */
529 2           static inline int hll_validate_header(const HllHeader *hdr, uint64_t file_size) {
530 2 50         if (hdr->magic != HLL_MAGIC) return 0;
531 2 50         if (hdr->version != HLL_VERSION) return 0;
532 2 50         if (hdr->precision < HLL_MIN_PRECISION || hdr->precision > HLL_MAX_PRECISION) return 0;
    50          
533 2 50         if (hdr->m != (1u << hdr->precision)) return 0;
534 2 50         if (hdr->total_size != file_size) return 0;
535 2 50         if (hdr->total_size != hll_total_size(hdr->m)) return 0;
536 2           HllLayout L = hll_layout();
537 2 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
538 2 50         if (hdr->regs_off != L.regs) return 0;
539 2           return 1;
540             }
541              
542             /* validate the precision argument */
543 23           static int hll_validate_create_args(uint32_t precision, uint32_t *m_out, char *errbuf) {
544 23 50         if (errbuf) errbuf[0] = '\0';
545 23 50         if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION) {
    50          
546 0 0         HLL_ERR("precision must be between %d and %d", HLL_MIN_PRECISION, HLL_MAX_PRECISION);
547 0           return 0;
548             }
549 23           *m_out = 1u << precision;
550 23           return 1;
551             }
552              
553 21           static HllHandle *hll_create(const char *path, uint32_t precision, char *errbuf) {
554             uint32_t m;
555 21 50         if (!hll_validate_create_args(precision, &m, errbuf)) return NULL;
556              
557 21           uint64_t total = hll_total_size(m);
558 21           int anonymous = (path == NULL);
559 21           int fd = -1;
560             size_t map_size;
561             void *base;
562              
563 21 100         if (anonymous) {
564 16           map_size = (size_t)total;
565 16           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
566 16 50         if (base == MAP_FAILED) { HLL_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
567             } else {
568 5           fd = open(path, O_RDWR|O_CREAT, 0666);
569 7 50         if (fd < 0) { HLL_ERR("open: %s", strerror(errno)); return NULL; }
    0          
570 5 50         if (flock(fd, LOCK_EX) < 0) { HLL_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
571             struct stat st;
572 5 50         if (fstat(fd, &st) < 0) { HLL_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
573 5           int is_new = (st.st_size == 0);
574 5 100         if (!is_new && (uint64_t)st.st_size < sizeof(HllHeader)) {
    100          
575 1 50         HLL_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
576 1           flock(fd, LOCK_UN); close(fd); return NULL;
577             }
578 4 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
579 0 0         HLL_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
580             }
581 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
582 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
583 4 50         if (base == MAP_FAILED) { HLL_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
584 4 100         if (!is_new) {
585 1 50         if (!hll_validate_header((HllHeader *)base, (uint64_t)st.st_size)) {
586 0 0         HLL_ERR("invalid HyperLogLog file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
587             }
588 1           flock(fd, LOCK_UN); close(fd);
589 1           return hll_setup(base, map_size, path, -1);
590             }
591             }
592 19           hll_init_header(base, precision, m, total);
593 19 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
594 19           return hll_setup(base, map_size, path, -1);
595             }
596              
597 2           static HllHandle *hll_create_memfd(const char *name, uint32_t precision, char *errbuf) {
598             uint32_t m;
599 2 50         if (!hll_validate_create_args(precision, &m, errbuf)) return NULL;
600              
601 2           uint64_t total = hll_total_size(m);
602 2 100         int fd = memfd_create(name ? name : "hll", MFD_CLOEXEC | MFD_ALLOW_SEALING);
603 2 50         if (fd < 0) { HLL_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
604 2 50         if (ftruncate(fd, (off_t)total) < 0) {
605 0 0         HLL_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
606             }
607 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
608 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
609 2 50         if (base == MAP_FAILED) { HLL_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
610 2           hll_init_header(base, precision, m, total);
611 2           return hll_setup(base, (size_t)total, NULL, fd);
612             }
613              
614 1           static HllHandle *hll_open_fd(int fd, char *errbuf) {
615 1 50         if (errbuf) errbuf[0] = '\0';
616             struct stat st;
617 1 50         if (fstat(fd, &st) < 0) { HLL_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
618 1 50         if ((uint64_t)st.st_size < sizeof(HllHeader)) { HLL_ERR("too small"); return NULL; }
    0          
619 1           size_t ms = (size_t)st.st_size;
620 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
621 1 50         if (base == MAP_FAILED) { HLL_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
622 1 50         if (!hll_validate_header((HllHeader *)base, (uint64_t)st.st_size)) {
623 0 0         HLL_ERR("invalid HyperLogLog table"); munmap(base, ms); return NULL;
624             }
625 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
626 1 50         if (myfd < 0) { HLL_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
627 1           return hll_setup(base, ms, NULL, myfd);
628             }
629              
630 23           static void hll_destroy(HllHandle *h) {
631 23 50         if (!h) return;
632 23 100         if (h->backing_fd >= 0) close(h->backing_fd);
633 23 50         if (h->base) munmap(h->base, h->mmap_size);
634 23           free(h->path);
635 23           free(h);
636             }
637              
638 2           static inline int hll_msync(HllHandle *h) {
639 2 50         if (!h || !h->base) return 0;
    50          
640 2           return msync(h->base, h->mmap_size, MS_SYNC);
641             }
642              
643             /* ================================================================
644             * HyperLogLog operations (callers hold the lock)
645             * ================================================================ */
646              
647             /* add one item; returns 1 if a register increased, else 0 */
648 428706           static int hll_add_locked(HllHandle *h, const void *item, size_t len) {
649 428706           uint64_t x = XXH3_64bits(item, len);
650 428706           uint32_t p = h->hdr->precision;
651 428706           uint32_t idx = (uint32_t)(x >> (64 - p)); /* top p bits = register index */
652 428706           uint64_t rest = (x << p) | (1ULL << (p - 1)); /* guard bit so clz terminates */
653 428706           uint8_t rho = (uint8_t)(__builtin_clzll(rest) + 1);
654 428706           uint8_t *regs = hll_regs(h);
655 428706 100         if (regs[idx] < rho) { regs[idx] = rho; return 1; }
656 292665           return 0;
657             }
658              
659             /* estimate; returns a double */
660 21           static double hll_count_locked(HllHandle *h) {
661 21           uint32_t m = h->hdr->m;
662 21           uint8_t *regs = hll_regs(h);
663 21           double sum = 0.0;
664 21           uint32_t V = 0;
665 368661 100         for (uint32_t j = 0; j < m; j++) {
666 368640           sum += ldexp(1.0, -(int)regs[j]);
667 368640           V += (regs[j] == 0);
668             }
669             double alpha;
670 21 50         if (m == 16) alpha = 0.673;
671 21 50         else if (m == 32) alpha = 0.697;
672 21 50         else if (m == 64) alpha = 0.709;
673 21           else alpha = 0.7213 / (1.0 + 1.079 / (double)m);
674 21           double E = alpha * (double)m * (double)m / sum;
675 21 100         if (E <= 2.5 * (double)m && V > 0)
    50          
676 17           E = (double)m * log((double)m / (double)V); /* linear counting (small range) */
677 21           return E;
678             }
679              
680             /* merge src registers into dst (caller guarantees equal m); register-wise max */
681 2           static void hll_merge_regs(HllHandle *dst, const uint8_t *src_regs) {
682 2           uint32_t m = dst->hdr->m;
683 2           uint8_t *regs = hll_regs(dst);
684 32770 100         for (uint32_t j = 0; j < m; j++)
685 32768 100         if (src_regs[j] > regs[j]) regs[j] = src_regs[j];
686 2           }
687              
688             /* reset all registers to 0 (caller holds the write lock) */
689 1           static inline void hll_clear_locked(HllHandle *h) {
690 1           memset(hll_regs(h), 0, (size_t)h->hdr->m);
691 1           }
692              
693             #endif /* HLL_H */