File Coverage

cuckoo.h
Criterion Covered Total %
statement 254 382 66.4
branch 115 286 40.2
condition n/a
subroutine n/a
pod n/a
total 369 668 55.2


line stmt bran cond sub pod time code
1             /*
2             * cuckoo.h -- Shared-memory Cuckoo filter for Linux
3             *
4             * Approximate set membership WITH delete: tells you whether an item is
5             * "definitely not" or "probably" in the set, in a fixed amount of memory, with
6             * a tiny false-positive rate -- and unlike a Bloom filter it supports removal.
7             * Each item is hashed once (XXH3-128); a 16-bit fingerprint plus two candidate
8             * buckets (partial-key cuckoo hashing) drive a bucketed open-addressed table of
9             * CF_SLOTS fingerprint slots per bucket. The table lives in a shared mapping so
10             * several processes share one filter; a write-preferring futex rwlock with
11             * reader-slot dead-process recovery guards mutation. The filter has a bounded
12             * capacity: add returns false (a true no-op) when the table is full.
13             *
14             * Layout: Header -> reader_slots[1024] -> slots[num_buckets * CF_SLOTS]
15             */
16              
17             #ifndef CUCKOO_H
18             #define CUCKOO_H
19              
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37              
38             #define XXH_INLINE_ALL
39             #include "xxhash.h"
40              
41             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
42             #error "cuckoo.h: requires little-endian architecture"
43             #endif
44              
45              
46             /* ================================================================
47             * Constants
48             * ================================================================ */
49              
50             #define CF_MAGIC 0x4B4F4F43U /* "COOK" (little-endian) */
51             #define CF_VERSION 1
52             #define CF_ERR_BUFLEN 256
53             #define CF_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
54             #define CF_SLOTS 4 /* fingerprint slots per bucket */
55             #define CF_MAX_KICKS 500 /* cuckoo eviction bound before declaring the table full */
56             #define CF_MIN_BUCKETS 2 /* floor on the bucket count (power of two) */
57             #define CF_MAX_BUCKETS 0x4000000000ULL /* 2^38 buckets cap (2^38*4*2 = 2 TiB slot array) */
58              
59             #define CF_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, CF_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
60              
61             /* ================================================================
62             * Structs
63             * ================================================================ */
64              
/* One entry per attached process, used for dead-process recovery. The
 * process's share of every shared rwlock counter (the reader count held in
 * the rwlock word, rwlock_waiters, rwlock_writers_waiting) is mirrored here,
 * so a lock timeout can attribute a dead process's contribution and reverse
 * it instead of waiting for the slow per-op timeout drain. */
typedef struct {
    uint32_t pid;            /* owning process id; 0 means the slot is unclaimed */
    uint32_t subcount;       /* rdlock acquisitions this process has in flight */
    uint32_t waiters_parked; /* this process's share of hdr->rwlock_waiters */
    uint32_t writers_parked; /* this process's share of hdr->rwlock_writers_waiting */
} CfReaderSlot;
76              
/* Shared-memory header placed at offset 0 of the mapping. The trailing
 * comment on each field is its byte offset; the struct is padded to exactly
 * 256 bytes (checked by the static assertion below). */
typedef struct CfHeader CfHeader;
struct CfHeader {
    uint32_t magic;                  /*  0  CF_MAGIC */
    uint32_t version;                /*  4  CF_VERSION */
    uint32_t _pad0;                  /*  8 */
    uint32_t _pad1;                  /* 12 */
    uint64_t num_buckets;            /* 16  bucket count (power of two) */
    uint64_t bucket_mask;            /* 24  num_buckets - 1 (bucket index mask) */
    uint64_t capacity;               /* 32  configured item capacity (for stats) */
    uint64_t count;                  /* 40  live fingerprint count (maintained on add/remove) */
    uint64_t rng_state;              /* 48  xorshift64 state for eviction victim choice */
    uint64_t total_size;             /* 56  size of the whole mapping in bytes */
    uint64_t reader_slots_off;       /* 64  byte offset of the reader-slot table */
    uint64_t slots_off;              /* 72  byte offset of the fingerprint slot array */
    uint32_t rwlock;                 /* 80  lock word: reader count, or writer bit | pid */
    uint32_t rwlock_waiters;         /* 84  processes parked on the lock word */
    uint32_t rwlock_writers_waiting; /* 88  parked writers (readers yield to them) */
    uint32_t _pad2;                  /* 92 */
    uint64_t stat_ops;               /* 96 */
    uint8_t  _pad[152];              /* 104..255 */
};

_Static_assert(sizeof(CfHeader) == 256, "CfHeader must be 256 bytes");
99              
100             /* ---- Process-local handle ---- */
101              
102             typedef struct CfHandle {
103             CfHeader *hdr;
104             CfReaderSlot *reader_slots; /* CF_READER_SLOTS entries */
105             void *base; /* mmap base */
106             size_t mmap_size;
107             char *path; /* backing file path (strdup'd) */
108             int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
109             uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
110             uint32_t cached_pid; /* getpid() cached at last slot claim */
111             uint32_t cached_fork_gen; /* cf_fork_gen value at last slot claim */
112             } CfHandle;
113              
114             /* ================================================================
115             * Futex-based write-preferring read-write lock
116             * with reader-slot dead-process recovery
117             * ================================================================ */
118              
119             #define CF_RWLOCK_SPIN_LIMIT 32
120             #define CF_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
121              
/* Busy-wait hint for the spin phase of the lock: issues the CPU's
 * spin-loop instruction where one is known, otherwise only a compiler
 * barrier. Every variant carries a "memory" clobber. */
static inline void cf_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
131              
132             /* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
133             #define CF_RWLOCK_WRITER_BIT 0x80000000U
134             #define CF_RWLOCK_PID_MASK 0x7FFFFFFFU
135             #define CF_RWLOCK_WR(pid) (CF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & CF_RWLOCK_PID_MASK))
136              
/* Liveness probe built on kill(pid, 0).
 *
 * Returns 0 only when the kernel reports ESRCH (no such process). Every
 * other outcome -- success, any other errno, or pid == 0 (no owner
 * recorded) -- is reported as 1, i.e. "alive or unknown".
 *
 * NOTE: PID reuse is not detectable this way. If a dead lock-holder's PID
 * has been recycled to an unrelated live process before recovery runs, the
 * probe says "alive" and that slot's orphaned contribution stays unreclaimed
 * until the recycled process exits. Robust detection would need a per-slot
 * process-start-time epoch (a header-layout/version change). Documented
 * under "Crash Safety" in the POD. */
static inline int cf_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1; /* signal 0 was deliverable: the process exists */
    return errno != ESRCH;
}
148              
149             /* Force-recover a stale write lock left by a dead process.
150             * CAS to OUR pid to hold the lock while fixing shared state, then release.
151             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
152             * recovering process can detect and re-recover if we crash mid-recovery. */
153 0           static inline void cf_recover_stale_lock(CfHandle *h, uint32_t observed_rwlock) {
154 0           CfHeader *hdr = h->hdr;
155 0           uint32_t mypid = CF_RWLOCK_WR((uint32_t)getpid());
156 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
157             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
158 0           return;
159             /* We now hold the write lock as mypid. No additional shared state needs
160             * repair here (this module has no seqlock); just release the lock. */
161 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
162 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
163 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
164             }
165              
166             static const struct timespec cf_lock_timeout = { CF_LOCK_TIMEOUT_SEC, 0 };
167              
/* Fork-generation counter, one per process. The pthread_atfork child hook
 * bumps it, so every open handle notices a fork on its next lock call by
 * comparing against its cached copy -- no getpid() syscall on the hot path. */
static uint32_t cf_fork_gen = 1;
static pthread_once_t cf_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child hook: advance the generation in the freshly forked child. */
static void cf_on_fork_child(void) {
    (void)__atomic_fetch_add(&cf_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once target: install the child hook (no prepare/parent hooks). */
static void cf_atfork_init(void) {
    pthread_atfork(NULL, NULL, cf_on_fork_child);
}
179              
180             /* Ensure this process owns a reader slot. Called from the lock helpers so
181             * that fork()'d children pick up their own slot lazily instead of sharing
182             * the parent's. Hot-path is a single relaxed load + compare; only on a
183             * fork-generation mismatch do we touch getpid() and scan slots. */
184 52674           static inline void cf_claim_reader_slot(CfHandle *h) {
185 52674           uint32_t cur_gen = __atomic_load_n(&cf_fork_gen, __ATOMIC_RELAXED);
186 52674 100         if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
    50          
187 52656           return;
188             /* Cold path -- register the atfork hook once per process, then claim. */
189 18           pthread_once(&cf_atfork_once, cf_atfork_init);
190             /* Re-read after pthread_once: cf_on_fork_child may have bumped it. */
191 18           cur_gen = __atomic_load_n(&cf_fork_gen, __ATOMIC_RELAXED);
192 18           uint32_t now_pid = (uint32_t)getpid();
193 18           h->cached_pid = now_pid;
194 18           h->cached_fork_gen = cur_gen;
195 18           h->my_slot_idx = UINT32_MAX;
196 18           uint32_t start = now_pid % CF_READER_SLOTS;
197 20 50         for (uint32_t i = 0; i < CF_READER_SLOTS; i++) {
198 20           uint32_t s = (start + i) % CF_READER_SLOTS;
199 20           uint32_t expected = 0;
200 20 100         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
201             &expected, now_pid, 0,
202             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
203             /* Zero all mirror fields, not just subcount: a SIGKILL'd
204             * predecessor may have left waiters_parked/writers_parked
205             * non-zero, and cf_recover_dead_readers won't drain them
206             * once we own the slot (the CAS expects the dead PID). */
207 18           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
208 18           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
209 18           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
210 18           h->my_slot_idx = s;
211 18           return;
212             }
213             }
214             /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
215             * tracking for this handle (lock still works; just no recovery). */
216             }
217              
/* Atomically lower *p by `sub`, saturating at 0 so the counter can never
 * wrap below zero. A `sub` of 0 leaves *p untouched. */
static inline void cf_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;

    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        /* On CAS failure `seen` is refreshed, so `next` is recomputed. */
        next = (seen <= sub) ? 0 : seen - sub;
    } while (!__atomic_compare_exchange_n(p, &seen, next,
                                          1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
229              
230             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
231             * contributions back to the global counters. A no-op if the slot was stolen
232             * by another recoverer or had no waiter contribution to drain.
233             *
234             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
235             * Between our CAS and a follow-up store, a new process could claim the
236             * slot and start populating these fields -- our stores would clobber its
237             * state. cf_claim_reader_slot zeros all three on every claim, so
238             * leaving stale values is harmless. */
239 0           static inline void cf_drain_dead_slot(CfHandle *h, uint32_t i, uint32_t pid) {
240 0           CfHeader *hdr = h->hdr;
241 0           uint32_t expected = pid;
242             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
243             * ACQUIRE syncs us with prior writes from the dead process to
244             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
245             * a plain RELAXED load before the CAS could miss those writes;
246             * loading them after the CAS keeps them inside the acquire window. */
247 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
248             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
249 0           return;
250 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
251 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
252 0 0         if (wp) cf_atomic_sub_cap(&hdr->rwlock_waiters, wp);
253 0 0         if (writp) cf_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
254             }
255              
256             /* Scan reader slots for dead-process recovery.
257             *
258             * For each dead PID with non-zero contributions to the shared rwlock,
259             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
260             * out so live processes don't have to wait for the slow per-op timeout
261             * decrement to drain it for them.
262             *
263             * For the main rwlock counter we use the "no live reader holds -> force-
264             * reset to 0" trick (precise) because per-process attribution of the
265             * subcount is racy across the inc-counter-then-inc-subcount window. */
266 0           static inline void cf_recover_dead_readers(CfHandle *h) {
267 0 0         if (!h->reader_slots) return;
268 0           CfHeader *hdr = h->hdr;
269 0           int any_live_reader = 0;
270 0           int found_dead_reader = 0;
271              
272             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
273             * contribution to lose) are wiped immediately to free the slot for
274             * future claimants and drain any orphan parked-waiter counters. Slots
275             * with dead pid and sc > 0 are left intact in this pass: if force-
276             * reset cannot fire (because a live reader is concurrently present),
277             * wiping the dead slot would lose the only record of its orphan
278             * rwlock contribution and strand writers permanently once the live
279             * reader releases. */
280 0 0         for (uint32_t i = 0; i < CF_READER_SLOTS; i++) {
281 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
282 0 0         if (pid == 0) continue;
283 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
284 0 0         if (cf_pid_alive(pid)) {
285 0 0         if (sc > 0) any_live_reader = 1;
286 0           continue;
287             }
288 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
289 0           cf_drain_dead_slot(h, i, pid);
290             }
291              
292             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
293             * reset CAS FIRST, while the window since pass 1's last scan is
294             * still narrow (a handful of instructions, as in the original
295             * single-pass code). A new reader that started rdlock between
296             * pass 1's scan and the CAS will either:
297             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
298             * fails (cur mismatched), recovery yields and a future
299             * cycle retries; or
300             * (b) be still in the subcount-bump phase -- our CAS sees the
301             * stale cur and resets to 0; the new reader's subsequent CAS
302             * rwlock(0 -> 1) succeeds cleanly.
303             * Only after the CAS resolves do we wipe the deferred dead slots,
304             * keeping that work outside the race-sensitive window. */
305 0 0         if (found_dead_reader && !any_live_reader) {
    0          
306 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
307 0 0         if (cur > 0 && cur < CF_RWLOCK_WRITER_BIT) {
    0          
308 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
309             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
310 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
311 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
312             }
313             }
314 0 0         for (uint32_t i = 0; i < CF_READER_SLOTS; i++) {
315 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
316 0 0         if (pid == 0 || cf_pid_alive(pid)) continue;
    0          
317 0           cf_drain_dead_slot(h, i, pid);
318             }
319             }
320             }
321              
322             /* Inspect the lock word after a futex-wait timeout. If a dead writer
323             * holds it, force-recover the lock. Otherwise drain dead readers' shares
324             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
325             * branches -- identical recovery logic in both. */
326 0           static inline void cf_recover_after_timeout(CfHandle *h) {
327 0           CfHeader *hdr = h->hdr;
328 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
329 0 0         if (val >= CF_RWLOCK_WRITER_BIT) {
330 0           uint32_t pid = val & CF_RWLOCK_PID_MASK;
331 0 0         if (!cf_pid_alive(pid))
332 0           cf_recover_stale_lock(h, val);
333             } else {
334 0           cf_recover_dead_readers(h);
335             }
336 0           }
337              
338             /* Park/unpark helpers: bump the global waiter counters together with this
339             * process's mirrored slot counters so a wrlock-timeout recovery scan can
340             * attribute and reverse a dead PID's contribution. Kept paired to make
341             * accidental drift between global and per-slot counts impossible. */
342 0           static inline void cf_park_reader(CfHandle *h) {
343 0 0         if (h->my_slot_idx != UINT32_MAX)
344 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
345 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
346 0           }
347 0           static inline void cf_unpark_reader(CfHandle *h) {
348 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
349 0 0         if (h->my_slot_idx != UINT32_MAX)
350 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
351 0           }
352 0           static inline void cf_park_writer(CfHandle *h) {
353 0 0         if (h->my_slot_idx != UINT32_MAX) {
354 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
355 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
356             }
357 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
358 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
359 0           }
360 0           static inline void cf_unpark_writer(CfHandle *h) {
361 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
362 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
363 0 0         if (h->my_slot_idx != UINT32_MAX) {
364 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
365 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
366             }
367 0           }
368              
369 26841           static inline void cf_rwlock_rdlock(CfHandle *h) {
370 26841           cf_claim_reader_slot(h);
371 26841           CfHeader *hdr = h->hdr;
372 26841           uint32_t *lock = &hdr->rwlock;
373 26841           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
374             /* Claim subcount BEFORE bumping the shared rwlock counter. This way
375             * a concurrent writer-side recovery scan that sees our PID alive with
376             * subcount > 0 will (correctly) defer force-reset, even while we are
377             * still spinning trying to win the rwlock CAS. Without this, a reader
378             * killed between rwlock CAS-success and subcount++ would let recovery
379             * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
380             * our eventual rdunlock dec. */
381 26841 50         if (h->my_slot_idx != UINT32_MAX)
382 26841           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
383 26841           for (int spin = 0; ; spin++) {
384 26841           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
385             /* Write-preferring: when lock is free (cur==0) and writers are
386             * waiting, yield to let the writer acquire. When readers are
387             * already active (cur>=1), new readers may join freely. */
388 26841 50         if (cur > 0 && cur < CF_RWLOCK_WRITER_BIT) {
    0          
389 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
390             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
391 26841           return;
392 26841 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
393 26841 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
394             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
395 26841           return;
396             }
397 0 0         if (__builtin_expect(spin < CF_RWLOCK_SPIN_LIMIT, 1)) {
398 0           cf_rwlock_spin_pause();
399 0           continue;
400             }
401 0           cf_park_reader(h);
402 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
403             /* Sleep when write-locked OR when yielding to waiting writers */
404 0 0         if (cur >= CF_RWLOCK_WRITER_BIT || cur == 0) {
    0          
405 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
406             &cf_lock_timeout, NULL, 0);
407 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
408 0           cf_unpark_reader(h);
409 0           cf_recover_after_timeout(h);
410 0           spin = 0;
411 0           continue;
412             }
413             }
414 0           cf_unpark_reader(h);
415 0           spin = 0;
416             }
417             }
418              
419 26841           static inline void cf_rwlock_rdunlock(CfHandle *h) {
420 26841           CfHeader *hdr = h->hdr;
421             /* Release the shared counter BEFORE dropping our subcount so that
422             * "any live PID with subcount > 0" is a reliable in-flight indicator
423             * for the writer-side recovery scan. Inverting these would create a
424             * window where we still own a unit of rwlock but our slot subcount is
425             * 0, letting recovery force-reset rwlock underneath us. */
426 26841           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
427 26841 50         if (h->my_slot_idx != UINT32_MAX)
428 26841           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
429 26841 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
430 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
431 26841           }
432              
433 25833           static inline void cf_rwlock_wrlock(CfHandle *h) {
434 25833           cf_claim_reader_slot(h); /* refresh cached_pid across fork */
435 25833           CfHeader *hdr = h->hdr;
436 25833           uint32_t *lock = &hdr->rwlock;
437             /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
438             * any crash window between acquiring the lock and storing the owner. */
439 25833           uint32_t mypid = CF_RWLOCK_WR(h->cached_pid);
440 25833           for (int spin = 0; ; spin++) {
441 25833           uint32_t expected = 0;
442 25833 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
443             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
444 25833           return;
445 0 0         if (__builtin_expect(spin < CF_RWLOCK_SPIN_LIMIT, 1)) {
446 0           cf_rwlock_spin_pause();
447 0           continue;
448             }
449 0           cf_park_writer(h);
450 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
451 0 0         if (cur != 0) {
452 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
453             &cf_lock_timeout, NULL, 0);
454 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
455 0           cf_unpark_writer(h);
456 0           cf_recover_after_timeout(h);
457 0           spin = 0;
458 0           continue;
459             }
460             }
461 0           cf_unpark_writer(h);
462 0           spin = 0;
463             }
464             }
465              
466 25833           static inline void cf_rwlock_wrunlock(CfHandle *h) {
467 25833           CfHeader *hdr = h->hdr;
468 25833           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
469 25833 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
470 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
471 25833           }
472              
473             /* ================================================================
474             * Layout math + create / open / destroy
475             *
476             * Layout: Header -> reader_slots[1024] -> slots[num_buckets * CF_SLOTS]
477             * ================================================================ */
478              
479             /* Single source of truth for the mmap region layout offsets. */
480             typedef struct { uint64_t reader_slots, slots; } CfLayout;
481              
482 46           static inline CfLayout cf_layout(void) {
483             CfLayout L;
484 46           L.reader_slots = sizeof(CfHeader);
485 46           L.slots = L.reader_slots + (uint64_t)CF_READER_SLOTS * sizeof(CfReaderSlot);
486 46           L.slots = (L.slots + 7) & ~(uint64_t)7; /* 8-byte align the slot array */
487 46           return L;
488             }
489              
490 24           static inline uint64_t cf_total_size(uint64_t num_buckets) {
491 24           CfLayout L = cf_layout();
492             /* num_buckets * CF_SLOTS uint16_t fingerprint slots */
493 24           return L.slots + num_buckets * (uint64_t)CF_SLOTS * sizeof(uint16_t);
494             }
495              
496             /* round v up to the next power of two (64-bit), with a floor of CF_MIN_BUCKETS */
497 22           static inline uint64_t cf_next_pow2_u64(uint64_t v) {
498 22 100         if (v <= CF_MIN_BUCKETS) return CF_MIN_BUCKETS;
499 21           return 1ULL << (64 - __builtin_clzll(v - 1));
500             }
501              
502 20           static inline void cf_init_header(void *base, uint64_t num_buckets,
503             uint64_t capacity, uint64_t total) {
504 20           CfLayout L = cf_layout();
505 20           CfHeader *hdr = (CfHeader *)base;
506             /* Zero the header + reader-slot region (lock-recovery state); the slot
507             array relies on the fresh mapping being OS zero-filled (0 = empty slot). */
508 20           memset(base, 0, (size_t)L.slots);
509 20           hdr->magic = CF_MAGIC;
510 20           hdr->version = CF_VERSION;
511 20           hdr->num_buckets = num_buckets;
512 20           hdr->bucket_mask = num_buckets - 1;
513 20           hdr->capacity = capacity;
514 20           hdr->count = 0;
515             /* Deterministic, non-zero seed for the eviction-victim RNG. */
516 20           hdr->rng_state = 0x9e3779b97f4a7c15ULL ^ (capacity * 0x2545F4914F6CDD1DULL);
517 20 50         if (hdr->rng_state == 0) hdr->rng_state = 1;
518 20           hdr->total_size = total;
519 20           hdr->reader_slots_off = L.reader_slots;
520 20           hdr->slots_off = L.slots;
521 20           __atomic_thread_fence(__ATOMIC_SEQ_CST);
522 20           }
523              
524 97419           static inline uint16_t *cf_slots(CfHandle *h) {
525 97419           return (uint16_t *)((char *)h->base + h->hdr->slots_off);
526             }
527              
528 22           static inline CfHandle *cf_setup(void *base, size_t map_size,
529             const char *path, int backing_fd) {
530 22           CfHeader *hdr = (CfHeader *)base;
531 22           CfHandle *h = (CfHandle *)calloc(1, sizeof(CfHandle));
532 22 50         if (!h) {
533 0           munmap(base, map_size);
534 0 0         if (backing_fd >= 0) close(backing_fd);
535 0           return NULL;
536             }
537 22           h->hdr = hdr;
538 22           h->base = base;
539 22           h->reader_slots = (CfReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
540 22           h->mmap_size = map_size;
541 22 100         h->path = path ? strdup(path) : NULL;
542 22           h->backing_fd = backing_fd;
543 22           h->my_slot_idx = UINT32_MAX;
544 22           return h;
545             }
546              
547             /* Validate a mapped header (shared by cf_create reopen and cf_open_fd). */
548 2           static inline int cf_validate_header(const CfHeader *hdr, uint64_t file_size) {
549 2 50         if (hdr->magic != CF_MAGIC) return 0;
550 2 50         if (hdr->version != CF_VERSION) return 0;
551 2 50         if (hdr->num_buckets < CF_MIN_BUCKETS || hdr->num_buckets > CF_MAX_BUCKETS) return 0;
    50          
552 2 50         if ((hdr->num_buckets & (hdr->num_buckets - 1)) != 0) return 0; /* power of two */
553 2 50         if (hdr->bucket_mask != hdr->num_buckets - 1) return 0;
554 2 50         if (hdr->capacity == 0) return 0;
555 2 50         if (hdr->rng_state == 0) return 0;
556 2 50         if (hdr->count > hdr->num_buckets * (uint64_t)CF_SLOTS) return 0;
557 2 50         if (hdr->total_size != file_size) return 0;
558 2 50         if (hdr->total_size != cf_total_size(hdr->num_buckets)) return 0;
559 2           CfLayout L = cf_layout();
560 2 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
561 2 50         if (hdr->slots_off != L.slots) return 0;
562 2           return 1;
563             }
564              
565             /* validate args + compute the geometry (num_buckets) */
566 23           static int cf_validate_create_args(uint64_t capacity,
567             uint64_t *num_buckets_out, char *errbuf) {
568 23 50         if (errbuf) errbuf[0] = '\0';
569 23 50         if (capacity < 1) { CF_ERR("capacity must be >= 1"); return 0; }
    0          
570              
571             /* Size for the target load factor: num_buckets =
572             next_pow2(ceil(capacity / CF_SLOTS / 0.95)), floor CF_MIN_BUCKETS.
573             Reject (don't silently cap) a capacity that would exceed the bucket cap,
574             else the filter would be undersized and overflow at the requested load. */
575 23           double want_d = ceil((double)capacity / (double)CF_SLOTS / 0.95);
576 23 100         if (want_d > (double)CF_MAX_BUCKETS) { CF_ERR("capacity too large for the bucket cap"); return 0; }
    50          
577 22           uint64_t num_buckets = cf_next_pow2_u64((uint64_t)want_d); /* next_pow2 floors at CF_MIN_BUCKETS */
578              
579 22           *num_buckets_out = num_buckets;
580 22           return 1;
581             }
582              
583 21           static CfHandle *cf_create(const char *path, uint64_t capacity, char *errbuf) {
584             uint64_t num_buckets;
585 21 100         if (!cf_validate_create_args(capacity, &num_buckets, errbuf)) return NULL;
586              
587 20           uint64_t total = cf_total_size(num_buckets);
588 20           int anonymous = (path == NULL);
589 20           int fd = -1;
590             size_t map_size;
591             void *base;
592              
593 20 100         if (anonymous) {
594 15           map_size = (size_t)total;
595 15           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
596 15 50         if (base == MAP_FAILED) { CF_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
597             } else {
598 5           fd = open(path, O_RDWR|O_CREAT, 0666);
599 7 50         if (fd < 0) { CF_ERR("open: %s", strerror(errno)); return NULL; }
    0          
600 5 50         if (flock(fd, LOCK_EX) < 0) { CF_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
601             struct stat st;
602 5 50         if (fstat(fd, &st) < 0) { CF_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
603 5           int is_new = (st.st_size == 0);
604 5 100         if (!is_new && (uint64_t)st.st_size < sizeof(CfHeader)) {
    100          
605 1 50         CF_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
606 1           flock(fd, LOCK_UN); close(fd); return NULL;
607             }
608 4 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
609 0 0         CF_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
610             }
611 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
612 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
613 4 50         if (base == MAP_FAILED) { CF_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
614 4 100         if (!is_new) {
615 1 50         if (!cf_validate_header((CfHeader *)base, (uint64_t)st.st_size)) {
616 0 0         CF_ERR("invalid Cuckoo filter file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
617             }
618 1           flock(fd, LOCK_UN); close(fd);
619 1           return cf_setup(base, map_size, path, -1);
620             }
621             }
622 18           cf_init_header(base, num_buckets, capacity, total);
623 18 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
624 18           return cf_setup(base, map_size, path, -1);
625             }
626              
627 2           static CfHandle *cf_create_memfd(const char *name, uint64_t capacity, char *errbuf) {
628             uint64_t num_buckets;
629 2 50         if (!cf_validate_create_args(capacity, &num_buckets, errbuf)) return NULL;
630              
631 2           uint64_t total = cf_total_size(num_buckets);
632 2 100         int fd = memfd_create(name ? name : "cuckoo", MFD_CLOEXEC | MFD_ALLOW_SEALING);
633 2 50         if (fd < 0) { CF_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
634 2 50         if (ftruncate(fd, (off_t)total) < 0) {
635 0 0         CF_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
636             }
637 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
638 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
639 2 50         if (base == MAP_FAILED) { CF_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
640 2           cf_init_header(base, num_buckets, capacity, total);
641 2           return cf_setup(base, (size_t)total, NULL, fd);
642             }
643              
644 2           static CfHandle *cf_open_fd(int fd, char *errbuf) {
645 2 50         if (errbuf) errbuf[0] = '\0';
646             struct stat st;
647 2 50         if (fstat(fd, &st) < 0) { CF_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
648 2 100         if ((uint64_t)st.st_size < sizeof(CfHeader)) { CF_ERR("too small"); return NULL; }
    50          
649 1           size_t ms = (size_t)st.st_size;
650 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
651 1 50         if (base == MAP_FAILED) { CF_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
652 1 50         if (!cf_validate_header((CfHeader *)base, (uint64_t)st.st_size)) {
653 0 0         CF_ERR("invalid Cuckoo filter table"); munmap(base, ms); return NULL;
654             }
655 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
656 1 50         if (myfd < 0) { CF_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
657 1           return cf_setup(base, ms, NULL, myfd);
658             }
659              
660 22           static void cf_destroy(CfHandle *h) {
661 22 50         if (!h) return;
662 22 100         if (h->backing_fd >= 0) close(h->backing_fd);
663 22 50         if (h->base) munmap(h->base, h->mmap_size);
664 22           free(h->path);
665 22           free(h);
666             }
667              
668 2           static inline int cf_msync(CfHandle *h) {
669 2 50         if (!h || !h->base) return 0;
    50          
670 2           return msync(h->base, h->mmap_size, MS_SYNC);
671             }
672              
673             /* ================================================================
674             * Cuckoo filter operations (callers hold the lock)
675             *
676             * Partial-key cuckoo hashing from a single XXH3-128 hash: a 16-bit non-zero
677             * fingerprint plus two candidate buckets i1 and i2 = alt(i1, fp), where alt is
678             * an XOR-based involution so alt(alt(i,fp),fp) == i and i1 != i2.
679             * ================================================================ */
680              
681             /* xorshift64 victim-choice RNG; advances and stores hdr->rng_state.
682             * Called only under the write lock. */
683 6501           static inline uint64_t cf_rng_next(CfHandle *h) {
684 6501           uint64_t x = h->hdr->rng_state;
685 6501           x ^= x << 13;
686 6501           x ^= x >> 7;
687 6501           x ^= x << 17;
688 6501           h->hdr->rng_state = x;
689 6501           return x;
690             }
691              
/* Widen a 16-bit fingerprint to 64 bits and multiply it by the golden-ratio
 * constant 0x9e3779b97f4a7c15 (the product wraps modulo 2^64). */
static inline uint64_t cf_fp_mix(uint16_t fp) {
    const uint64_t golden = 0x9e3779b97f4a7c15ULL;
    uint64_t widened = fp;
    return widened * golden;
}
696              
697             /* Alternate bucket for (i, fp): involutive XOR displacement masked to the
698             * table. fh is forced non-zero so the alternate is always a different bucket
699             * (i1 != i2). Since num_buckets is a power of two and fh < num_buckets,
700             * i ^ fh stays < num_buckets, and alt(alt(i,fp),fp) == i. */
701 64215           static inline uint64_t cf_alt(CfHandle *h, uint64_t i, uint16_t fp) {
702 64215           uint64_t mask = h->hdr->bucket_mask;
703 64215           uint64_t fh = cf_fp_mix(fp) & mask;
704 64215 100         if (fh == 0) fh = 1;
705 64215           return (i ^ fh) & mask;
706             }
707              
708             /* pointer to bucket i's CF_SLOTS fingerprint slots */
709 97418           static inline uint16_t *cf_bucket(CfHandle *h, uint64_t i) {
710 97418           return cf_slots(h) + i * (uint64_t)CF_SLOTS;
711             }
712              
713             /* slot index of fp in bucket i, or -1 if absent */
714 45192           static inline int cf_bucket_find(CfHandle *h, uint64_t i, uint16_t fp) {
715 45192           uint16_t *b = cf_bucket(h, i);
716 169903 100         for (int j = 0; j < CF_SLOTS; j++)
717 143035 100         if (b[j] == fp) return j;
718 26868           return -1;
719             }
720              
721             /* place fp in a free (0) slot of bucket i: return 1 if placed, 0 if full */
722 35097           static inline int cf_bucket_insert(CfHandle *h, uint64_t i, uint16_t fp) {
723 35097           uint16_t *b = cf_bucket(h, i);
724 91585 100         for (int j = 0; j < CF_SLOTS; j++) {
725 83809 100         if (b[j] == 0) { b[j] = fp; return 1; }
726             }
727 7776           return 0;
728             }
729              
730             /* derive the fingerprint and the two candidate buckets for (item,len) */
731 58152           static inline void cf_hash(CfHandle *h, const void *item, size_t len,
732             uint16_t *fp_out, uint64_t *i1_out, uint64_t *i2_out) {
733 58152           XXH128_hash_t hh = XXH3_128bits(item, len);
734 58152           uint16_t fp = (uint16_t)(hh.high64 & 0xFFFF);
735 58152 50         if (fp == 0) fp = 1; /* 0 means empty slot; never use it */
736 58152           uint64_t i1 = (uint64_t)(hh.low64 & h->hdr->bucket_mask);
737 58152           *fp_out = fp;
738 58152           *i1_out = i1;
739 58152           *i2_out = cf_alt(h, i1, fp);
740 58152           }
741              
742             /* Add (item,len). Returns 1 on success, 0 if the table is full.
743             *
744             * ATOMIC: a failed add is a true no-op (the table is byte-identical to before),
745             * so a failed add can never introduce a false negative -- every fingerprint
746             * that was present stays present. The eviction path records every swap and
747             * rolls them back in reverse if CF_MAX_KICKS is exhausted. */
748 27322           static int cf_add_locked(CfHandle *h, const void *item, size_t len) {
749             uint16_t fp;
750             uint64_t i1, i2;
751 27322           cf_hash(h, item, len, &fp, &i1, &i2);
752              
753 27322 100         if (cf_bucket_insert(h, i1, fp) || cf_bucket_insert(h, i2, fp)) {
    100          
754 26884           h->hdr->count++;
755 26884           return 1;
756             }
757              
758             /* Both candidate buckets are full -> cuckoo eviction with a recorded
759             * path so an exhausted run can be rolled back to a byte-identical state. */
760             uint64_t path_i[CF_MAX_KICKS];
761             uint8_t path_j[CF_MAX_KICKS];
762 438 100         uint64_t i = (cf_rng_next(h) & 1) ? i1 : i2;
763 438           uint16_t carried = fp;
764 6064 100         for (int n = 0; n < CF_MAX_KICKS; n++) {
765 6063           uint8_t j = (uint8_t)(cf_rng_next(h) % CF_SLOTS);
766 6063           path_i[n] = i;
767 6063           path_j[n] = j;
768 6063           uint16_t tmp = cf_bucket(h, i)[j]; /* swap carried into slot j */
769 6063           cf_bucket(h, i)[j] = carried;
770 6063           carried = tmp;
771 6063           i = cf_alt(h, i, carried); /* follow the evicted fingerprint */
772 6063 100         if (cf_bucket_insert(h, i, carried)) { /* free slot in the new bucket? */
773 437           h->hdr->count++;
774 437           return 1; /* placed -> committed */
775             }
776             }
777             /* Exhausted: roll back every swap in reverse so the table is unchanged.
778             * After undoing step 0, carried == fp and nothing was modified. */
779 501 100         for (int n = CF_MAX_KICKS - 1; n >= 0; n--) {
780 500           uint16_t tmp = cf_bucket(h, path_i[n])[path_j[n]];
781 500           cf_bucket(h, path_i[n])[path_j[n]] = carried;
782 500           carried = tmp;
783             }
784 1           return 0; /* full; NO state change */
785             }
786              
787             /* return 1 if (item,len) is probably present, else 0 (definitely absent) */
788 26825           static int cf_contains_locked(CfHandle *h, const void *item, size_t len) {
789             uint16_t fp;
790             uint64_t i1, i2;
791 26825           cf_hash(h, item, len, &fp, &i1, &i2);
792 26825 100         return cf_bucket_find(h, i1, fp) >= 0 || cf_bucket_find(h, i2, fp) >= 0;
    100          
793             }
794              
795             /* remove ONE matching fingerprint of (item,len): clear the slot, return 1 if
796             * found, else 0. Removing an item that was never added (or one whose 16-bit
797             * fingerprint collides with a present item) can delete the wrong fingerprint --
798             * standard cuckoo-filter caveat; only remove items you added. */
799 4005           static int cf_remove_locked(CfHandle *h, const void *item, size_t len) {
800             uint16_t fp;
801             uint64_t i1, i2;
802 4005           cf_hash(h, item, len, &fp, &i1, &i2);
803 4005           int j = cf_bucket_find(h, i1, fp);
804 4005 100         if (j >= 0) { cf_bucket(h, i1)[j] = 0; h->hdr->count--; return 1; }
805 205           j = cf_bucket_find(h, i2, fp);
806 205 100         if (j >= 0) { cf_bucket(h, i2)[j] = 0; h->hdr->count--; return 1; }
807 2           return 0;
808             }
809              
810             /* reset all slots to 0, count = 0 (caller holds the write lock) */
811 1           static inline void cf_clear_locked(CfHandle *h) {
812 1           memset(cf_slots(h), 0, (size_t)(h->hdr->num_buckets * (uint64_t)CF_SLOTS * sizeof(uint16_t)));
813 1           h->hdr->count = 0;
814 1           }
815              
816             #endif /* CUCKOO_H */