File Coverage

bloom.h
Criterion Covered Total %
statement 240 368 65.2
branch 103 282 36.5
condition n/a
subroutine n/a
pod n/a
total 343 650 52.7


line stmt bran cond sub pod time code
1             /*
2             * bloom.h -- Shared-memory Bloom filter for Linux
3             *
4             * Probabilistic set membership: tells you whether an item is "definitely not"
5             * or "probably" in the set, in a fixed amount of memory, with a tunable false-
6             * positive rate and no false negatives. Each item is hashed once (XXH3-128);
7             * the two 64-bit halves drive k probe bits (Kirsch-Mitzenmacher double hashing)
8             * into a power-of-two bit array. The bit array lives in a shared mapping so
9             * several processes share one filter; a write-preferring futex rwlock with
10             * reader-slot dead-process recovery guards mutation. Two filters of equal
11             * geometry can be merged (bitwise OR -> union of memberships).
12             *
13             * Layout: Header -> reader_slots[1024] -> bits[m_bits/8]
14             */
15              
16             #ifndef BLOOM_H
17             #define BLOOM_H
18              
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36              
37             #define XXH_INLINE_ALL
38             #include "xxhash.h"
39              
40             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
41             #error "bloom.h: requires little-endian architecture"
42             #endif
43              
44              
/* ================================================================
 * Constants
 * ================================================================ */

#define BF_MAGIC 0x4D4F4F42U /* "BOOM" (little-endian) */
#define BF_VERSION 1 /* header format version; must match exactly in bf_validate_header */
#define BF_ERR_BUFLEN 256 /* size BF_ERR assumes for the caller-supplied errbuf */
#define BF_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
#define BF_MIN_BITS 64 /* floor on the bit array size (power of two) */
#define BF_MAX_BITS 0x4000000000ULL /* 2^38 bits = 32 GiB bit array cap */
#define BF_MIN_K 1 /* lower clamp for k (hash probes per item) */
#define BF_MAX_K 32 /* upper clamp for k */

/* Format an error message into a variable named `errbuf` that must be in scope
 * at the call site (a char * holding at least BF_ERR_BUFLEN bytes, or NULL to
 * discard the message). Uses the GNU `##__VA_ARGS__` extension so it can be
 * invoked with only a format string. */
#define BF_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, BF_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
59              
/* ================================================================
 * Structs
 * ================================================================ */

/* Per-process slot for dead-process recovery. Each shared rwlock counter
 * (the main rwlock-reader count, rwlock_waiters, rwlock_writers_waiting)
 * is mirrored here so a lock-wait timeout (in either rdlock or wrlock) can
 * attribute and reverse a dead process's contribution instead of waiting for
 * the slow per-op timeout drain. Lives in the shared mapping, right after
 * BfHeader (see bf_layout). */
typedef struct {
    uint32_t pid; /* 0 = unclaimed */
    uint32_t subcount; /* in-flight rdlock acquisitions for this process */
    uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
    uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
} BfReaderSlot;

/* Shared header at offset 0 of the mapping. Fixed 256-byte layout; the
 * trailing comments give each field's byte offset. Field values are written
 * by bf_init_header and checked by bf_validate_header. */
struct BfHeader {
    uint32_t magic, version; /* 0,4 */
    uint32_t k; /* 8 number of hash probes per item */
    uint32_t _pad0; /* 12 */
    uint64_t m_bits; /* 16 bit array size in bits (power of two) */
    uint64_t m_mask; /* 24 m_bits - 1 (probe index mask) */
    uint64_t capacity; /* 32 configured item capacity n (for stats) */
    double fp_rate; /* 40 configured target false-positive rate (for stats) */
    uint64_t total_size; /* 48 whole mapping size in bytes (bf_total_size) */
    uint64_t reader_slots_off; /* 56 byte offset of the BfReaderSlot table */
    uint64_t bits_off; /* 64 byte offset of the bit array (8-byte aligned) */
    uint32_t rwlock; /* 72 0 = free, 1..0x7FFFFFFF = readers, WRITER_BIT|pid = writer */
    uint32_t rwlock_waiters; /* 76 processes parked on the rwlock futex */
    uint32_t rwlock_writers_waiting; /* 80 parked writers (makes new readers back off) */
    uint32_t _pad1; /* 84 */
    uint64_t stat_ops; /* 88 */
    uint8_t _pad[160]; /* 96..255 */
};
typedef struct BfHeader BfHeader;

_Static_assert(sizeof(BfHeader) == 256, "BfHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process view of one mapped filter. Allocated by bf_setup; not shared. */
typedef struct BfHandle {
    BfHeader *hdr;
    BfReaderSlot *reader_slots; /* BF_READER_SLOTS entries */
    void *base; /* mmap base */
    size_t mmap_size;
    char *path; /* backing file path (strdup'd) */
    int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t cached_pid; /* getpid() cached at last slot claim; also encoded into the write-lock word */
    uint32_t cached_fork_gen; /* bf_fork_gen value at last slot claim */
} BfHandle;
111              
/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

#define BF_RWLOCK_SPIN_LIMIT 32 /* spin iterations (with a pause hint) before a waiter parks on the futex */
#define BF_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */

/* CPU hint for busy-wait loops: `pause` on x86, `yield` on aarch64, and a bare
 * compiler barrier elsewhere. Every variant carries a "memory" clobber, so
 * the compiler will not cache shared loads across a call. */
static inline void bf_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
129              
/* Write-lock encoding of the rwlock word: the high bit marks "write-locked"
 * and the lower 31 bits hold the writer's PID, so a waiter can tell which
 * process to probe for liveness. Values 1..BF_RWLOCK_PID_MASK (high bit
 * clear) are a reader count; 0 means free. */
#define BF_RWLOCK_WRITER_BIT 0x80000000U
#define BF_RWLOCK_PID_MASK 0x7FFFFFFFU
#define BF_RWLOCK_WR(pid) (BF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BF_RWLOCK_PID_MASK))
134              
/* Best-effort liveness probe for a recorded lock or slot owner.
 *
 * Returns 0 only when kill(pid, 0) fails with ESRCH ("no such process").
 * Every other outcome is reported as alive (1): success, any other errno
 * such as EPERM (the process exists but belongs to another user), and an
 * unrecorded owner (pid == 0). Recovery therefore never tears down state
 * that might still be in use.
 *
 * Limitation: kill(pid, 0) cannot detect PID reuse. If a dead lock-holder's
 * PID is recycled to an unrelated live process before recovery runs, this
 * reports "alive", and that slot's orphaned contribution is not reclaimed
 * until the recycled process exits. Robust detection would require a
 * per-slot process-start-time epoch (a header-layout/version change).
 * Documented under "Crash Safety" in the POD. */
static inline int bf_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;                  /* no owner recorded: assume alive */
    if (kill((pid_t)pid, 0) == 0)
        return 1;                  /* exists and is signalable by us */
    return errno != ESRCH;         /* only ESRCH proves the process is gone */
}
146              
147             /* Force-recover a stale write lock left by a dead process.
148             * CAS to OUR pid to hold the lock while fixing shared state, then release.
149             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
150             * recovering process can detect and re-recover if we crash mid-recovery. */
151 0           static inline void bf_recover_stale_lock(BfHandle *h, uint32_t observed_rwlock) {
152 0           BfHeader *hdr = h->hdr;
153 0           uint32_t mypid = BF_RWLOCK_WR((uint32_t)getpid());
154 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
155             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
156 0           return;
157             /* We now hold the write lock as mypid. No additional shared state needs
158             * repair here (this module has no seqlock); just release the lock. */
159 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
160 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
161 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
162             }
163              
164             static const struct timespec bf_lock_timeout = { BF_LOCK_TIMEOUT_SEC, 0 };
165              
166             /* Process-global fork-generation counter. Incremented in the pthread_atfork
167             * child callback so every open handle detects a fork transition on the next
168             * lock call without paying a getpid() syscall on the hot path. */
169             static uint32_t bf_fork_gen = 1;
170             static pthread_once_t bf_atfork_once = PTHREAD_ONCE_INIT;
171 0           static void bf_on_fork_child(void) {
172 0           __atomic_add_fetch(&bf_fork_gen, 1, __ATOMIC_RELAXED);
173 0           }
174 2           static void bf_atfork_init(void) {
175 2           pthread_atfork(NULL, NULL, bf_on_fork_child);
176 2           }
177              
/* Ensure this process owns a reader slot. Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.
 *
 * The hot path is a single relaxed load plus a compare. The cold path runs
 * only on a fork-generation mismatch, or when this handle holds no slot.
 * It refreshes h->cached_pid / h->cached_fork_gen, then sets h->my_slot_idx
 * to the claimed slot, or to UINT32_MAX if every slot is taken.
 *
 * NOTE(review): the hot-path test also requires my_slot_idx != UINT32_MAX.
 * A handle that found the table full therefore repeats the cold path on
 * every lock call: pthread_once, getpid(), and up to BF_READER_SLOTS CAS
 * attempts. This lets it pick up a slot once one frees, at the cost of a
 * slow lock path while the table stays full. */
static inline void bf_claim_reader_slot(BfHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&bf_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&bf_atfork_once, bf_atfork_init);
    /* Re-read after pthread_once: bf_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&bf_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();
    h->cached_pid = now_pid;
    h->cached_fork_gen = cur_gen;
    /* Forget any slot inherited from the parent; it still belongs to the parent's PID. */
    h->my_slot_idx = UINT32_MAX;
    /* Probe starting at pid % BF_READER_SLOTS, wrapping so each slot is tried at most once. */
    uint32_t start = now_pid % BF_READER_SLOTS;
    for (uint32_t i = 0; i < BF_READER_SLOTS; i++) {
        uint32_t s = (start + i) % BF_READER_SLOTS;
        uint32_t expected = 0;
        /* Only a free slot (pid == 0) can be claimed. */
        if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
                                        &expected, now_pid, 0,
                                        __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
            /* Zero all mirror fields, not just subcount: a SIGKILL'd
             * predecessor may have left waiters_parked/writers_parked
             * non-zero, and bf_recover_dead_readers won't drain them
             * once we own the slot (the CAS expects the dead PID). */
            __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
            __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
            __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
            h->my_slot_idx = s;
            return;
        }
    }
    /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
     * tracking for this handle (lock still works; just no recovery). */
}
215              
/* Atomically compute *p = max(*p - sub, 0): the counter never wraps below
 * zero. A zero `sub` returns without touching *p. Relaxed ordering only; the
 * CAS loop retries until no concurrent update intervenes. */
static inline void bf_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;
    uint32_t observed = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        next = observed > sub ? observed - sub : 0;
    } while (!__atomic_compare_exchange_n(p, &observed, next, 1,
                                          __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
227              
228             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
229             * contributions back to the global counters. A no-op if the slot was stolen
230             * by another recoverer or had no waiter contribution to drain.
231             *
232             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
233             * Between our CAS and a follow-up store, a new process could claim the
234             * slot and start populating these fields -- our stores would clobber its
235             * state. bf_claim_reader_slot zeros all three on every claim, so
236             * leaving stale values is harmless. */
237 0           static inline void bf_drain_dead_slot(BfHandle *h, uint32_t i, uint32_t pid) {
238 0           BfHeader *hdr = h->hdr;
239 0           uint32_t expected = pid;
240             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
241             * ACQUIRE syncs us with prior writes from the dead process to
242             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
243             * a plain RELAXED load before the CAS could miss those writes;
244             * loading them after the CAS keeps them inside the acquire window. */
245 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
246             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
247 0           return;
248 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
249 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
250 0 0         if (wp) bf_atomic_sub_cap(&hdr->rwlock_waiters, wp);
251 0 0         if (writp) bf_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
252             }
253              
254             /* Scan reader slots for dead-process recovery.
255             *
256             * For each dead PID with non-zero contributions to the shared rwlock,
257             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
258             * out so live processes don't have to wait for the slow per-op timeout
259             * decrement to drain it for them.
260             *
261             * For the main rwlock counter we use the "no live reader holds -> force-
262             * reset to 0" trick (precise) because per-process attribution of the
263             * subcount is racy across the inc-counter-then-inc-subcount window. */
264 0           static inline void bf_recover_dead_readers(BfHandle *h) {
265 0 0         if (!h->reader_slots) return;
266 0           BfHeader *hdr = h->hdr;
267 0           int any_live_reader = 0;
268 0           int found_dead_reader = 0;
269              
270             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
271             * contribution to lose) are wiped immediately to free the slot for
272             * future claimants and drain any orphan parked-waiter counters. Slots
273             * with dead pid and sc > 0 are left intact in this pass: if force-
274             * reset cannot fire (because a live reader is concurrently present),
275             * wiping the dead slot would lose the only record of its orphan
276             * rwlock contribution and strand writers permanently once the live
277             * reader releases. */
278 0 0         for (uint32_t i = 0; i < BF_READER_SLOTS; i++) {
279 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
280 0 0         if (pid == 0) continue;
281 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
282 0 0         if (bf_pid_alive(pid)) {
283 0 0         if (sc > 0) any_live_reader = 1;
284 0           continue;
285             }
286 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
287 0           bf_drain_dead_slot(h, i, pid);
288             }
289              
290             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
291             * reset CAS FIRST, while the window since pass 1's last scan is
292             * still narrow (a handful of instructions, as in the original
293             * single-pass code). A new reader that started rdlock between
294             * pass 1's scan and the CAS will either:
295             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
296             * fails (cur mismatched), recovery yields and a future
297             * cycle retries; or
298             * (b) be still in the subcount-bump phase -- our CAS sees the
299             * stale cur and resets to 0; the new reader's subsequent CAS
300             * rwlock(0 -> 1) succeeds cleanly.
301             * Only after the CAS resolves do we wipe the deferred dead slots,
302             * keeping that work outside the race-sensitive window. */
303 0 0         if (found_dead_reader && !any_live_reader) {
    0          
304 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
305 0 0         if (cur > 0 && cur < BF_RWLOCK_WRITER_BIT) {
    0          
306 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
307             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
308 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
309 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
310             }
311             }
312 0 0         for (uint32_t i = 0; i < BF_READER_SLOTS; i++) {
313 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
314 0 0         if (pid == 0 || bf_pid_alive(pid)) continue;
    0          
315 0           bf_drain_dead_slot(h, i, pid);
316             }
317             }
318             }
319              
320             /* Inspect the lock word after a futex-wait timeout. If a dead writer
321             * holds it, force-recover the lock. Otherwise drain dead readers' shares
322             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
323             * branches -- identical recovery logic in both. */
324 0           static inline void bf_recover_after_timeout(BfHandle *h) {
325 0           BfHeader *hdr = h->hdr;
326 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
327 0 0         if (val >= BF_RWLOCK_WRITER_BIT) {
328 0           uint32_t pid = val & BF_RWLOCK_PID_MASK;
329 0 0         if (!bf_pid_alive(pid))
330 0           bf_recover_stale_lock(h, val);
331             } else {
332 0           bf_recover_dead_readers(h);
333             }
334 0           }
335              
336             /* Park/unpark helpers: bump the global waiter counters together with this
337             * process's mirrored slot counters so a wrlock-timeout recovery scan can
338             * attribute and reverse a dead PID's contribution. Kept paired to make
339             * accidental drift between global and per-slot counts impossible. */
340 0           static inline void bf_park_reader(BfHandle *h) {
341 0 0         if (h->my_slot_idx != UINT32_MAX)
342 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
343 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
344 0           }
345 0           static inline void bf_unpark_reader(BfHandle *h) {
346 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
347 0 0         if (h->my_slot_idx != UINT32_MAX)
348 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
349 0           }
350 0           static inline void bf_park_writer(BfHandle *h) {
351 0 0         if (h->my_slot_idx != UINT32_MAX) {
352 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
353 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
354             }
355 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
356 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
357 0           }
358 0           static inline void bf_unpark_writer(BfHandle *h) {
359 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
360 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
361 0 0         if (h->my_slot_idx != UINT32_MAX) {
362 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
363 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
364             }
365 0           }
366              
/* Acquire the lock in shared (read) mode; blocks until acquired, no failure return.
 *
 * Lock word: 0 = free, 1..0x7FFFFFFF = reader count, WRITER_BIT|pid = write-locked.
 * Readers join an already read-held lock freely. When the lock is free but
 * rwlock_writers_waiting is non-zero, they back off so a parked writer can
 * win (write preference).
 *
 * After BF_RWLOCK_SPIN_LIMIT unsuccessful iterations the reader registers in
 * rwlock_waiters and FUTEX_WAITs on the lock word with bf_lock_timeout.
 * A timeout runs bf_recover_after_timeout before retrying.
 *
 * Because this reader's own slot subcount is raised before the loop, a
 * bf_recover_dead_readers call made from here sees a live reader and will
 * not force-reset the counter. Only its dead-slot waiter draining applies. */
static inline void bf_rwlock_rdlock(BfHandle *h) {
    bf_claim_reader_slot(h);
    BfHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter. This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS. Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < BF_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < BF_RWLOCK_SPIN_LIMIT, 1)) {
            bf_rwlock_spin_pause();
            continue;
        }
        bf_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers.
         * FUTEX_WAIT returns immediately if the word no longer equals cur. */
        if (cur >= BF_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &bf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                bf_unpark_reader(h);
                bf_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, interrupted, value changed, or no need to sleep: retry. */
        bf_unpark_reader(h);
        spin = 0;
    }
}
416              
417 24821           static inline void bf_rwlock_rdunlock(BfHandle *h) {
418 24821           BfHeader *hdr = h->hdr;
419             /* Release the shared counter BEFORE dropping our subcount so that
420             * "any live PID with subcount > 0" is a reliable in-flight indicator
421             * for the writer-side recovery scan. Inverting these would create a
422             * window where we still own a unit of rwlock but our slot subcount is
423             * 0, letting recovery force-reset rwlock underneath us. */
424 24821           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
425 24821 50         if (h->my_slot_idx != UINT32_MAX)
426 24821           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
427 24821 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
428 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
429 24821           }
430              
/* Acquire the lock in exclusive (write) mode; blocks until acquired.
 *
 * The lock is taken by CASing the word from 0 to BF_RWLOCK_WR(cached_pid).
 * After BF_RWLOCK_SPIN_LIMIT failed attempts the writer is counted in
 * rwlock_waiters and rwlock_writers_waiting (via bf_park_writer) while it
 * sleeps. The writers_waiting count is what makes new readers back off.
 * It then FUTEX_WAITs on the word with bf_lock_timeout. A timeout runs
 * bf_recover_after_timeout (stale writer or dead readers) before retrying. */
static inline void bf_rwlock_wrlock(BfHandle *h) {
    bf_claim_reader_slot(h); /* refresh cached_pid across fork */
    BfHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = BF_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < BF_RWLOCK_SPIN_LIMIT, 1)) {
            bf_rwlock_spin_pause();
            continue;
        }
        bf_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only while the lock is held (readers or a writer). */
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &bf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                bf_unpark_writer(h);
                bf_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, interrupted, value changed, or lock already free: retry. */
        bf_unpark_writer(h);
        spin = 0;
    }
}
463              
464 26018           static inline void bf_rwlock_wrunlock(BfHandle *h) {
465 26018           BfHeader *hdr = h->hdr;
466 26018           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
467 26018 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
468 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
469 26018           }
470              
471             /* ================================================================
472             * Layout math + create / open / destroy
473             *
474             * Layout: Header -> reader_slots[1024] -> bits[m_bits/8]
475             * ================================================================ */
476              
477             /* Single source of truth for the mmap region layout offsets. */
478             typedef struct { uint64_t reader_slots, bits; } BfLayout;
479              
480 56           static inline BfLayout bf_layout(void) {
481             BfLayout L;
482 56           L.reader_slots = sizeof(BfHeader);
483 56           L.bits = L.reader_slots + (uint64_t)BF_READER_SLOTS * sizeof(BfReaderSlot);
484 56           L.bits = (L.bits + 7) & ~(uint64_t)7; /* 8-byte align the bit array (uint64_t words) */
485 56           return L;
486             }
487              
488 29           static inline uint64_t bf_total_size(uint64_t m_bits) {
489 29           BfLayout L = bf_layout();
490 29           return L.bits + (m_bits / 8); /* m_bits is a power of two >= 64 -> exact bytes */
491             }
492              
493             /* round v up to the next power of two (64-bit), with a floor of BF_MIN_BITS */
494 27           static inline uint64_t bf_next_pow2_u64(uint64_t v) {
495 27 100         if (v <= BF_MIN_BITS) return BF_MIN_BITS;
496 26           return 1ULL << (64 - __builtin_clzll(v - 1));
497             }
498              
499 25           static inline void bf_init_header(void *base, uint32_t k, uint64_t m_bits,
500             uint64_t capacity, double fp_rate, uint64_t total) {
501 25           BfLayout L = bf_layout();
502 25           BfHeader *hdr = (BfHeader *)base;
503             /* Explicitly zero the header + reader-slot region (lock-recovery state, like
504             hll.h); the bit array relies on the fresh mapping being OS zero-filled. */
505 25           memset(base, 0, (size_t)L.bits);
506 25           hdr->magic = BF_MAGIC;
507 25           hdr->version = BF_VERSION;
508 25           hdr->k = k;
509 25           hdr->m_bits = m_bits;
510 25           hdr->m_mask = m_bits - 1;
511 25           hdr->capacity = capacity;
512 25           hdr->fp_rate = fp_rate;
513 25           hdr->total_size = total;
514 25           hdr->reader_slots_off = L.reader_slots;
515 25           hdr->bits_off = L.bits;
516 25           __atomic_thread_fence(__ATOMIC_SEQ_CST);
517 25           }
518              
519 57631           static inline uint64_t *bf_bits(BfHandle *h) {
520 57631           return (uint64_t *)((char *)h->base + h->hdr->bits_off);
521             }
522              
523 27           static inline BfHandle *bf_setup(void *base, size_t map_size,
524             const char *path, int backing_fd) {
525 27           BfHeader *hdr = (BfHeader *)base;
526 27           BfHandle *h = (BfHandle *)calloc(1, sizeof(BfHandle));
527 27 50         if (!h) {
528 0           munmap(base, map_size);
529 0 0         if (backing_fd >= 0) close(backing_fd);
530 0           return NULL;
531             }
532 27           h->hdr = hdr;
533 27           h->base = base;
534 27           h->reader_slots = (BfReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
535 27           h->mmap_size = map_size;
536 27 100         h->path = path ? strdup(path) : NULL;
537 27           h->backing_fd = backing_fd;
538 27           h->my_slot_idx = UINT32_MAX;
539 27           return h;
540             }
541              
542             /* Validate a mapped header (shared by bf_create reopen and bf_open_fd). */
543 2           static inline int bf_validate_header(const BfHeader *hdr, uint64_t file_size) {
544 2 50         if (hdr->magic != BF_MAGIC) return 0;
545 2 50         if (hdr->version != BF_VERSION) return 0;
546 2 50         if (hdr->k < BF_MIN_K || hdr->k > BF_MAX_K) return 0;
    50          
547 2 50         if (hdr->m_bits < BF_MIN_BITS || hdr->m_bits > BF_MAX_BITS) return 0;
    50          
548 2 50         if ((hdr->m_bits & (hdr->m_bits - 1)) != 0) return 0; /* power of two */
549 2 50         if (hdr->m_mask != hdr->m_bits - 1) return 0;
550 2 50         if (hdr->capacity == 0) return 0;
551 2 50         if (!(hdr->fp_rate > 0.0 && hdr->fp_rate < 1.0)) return 0;
    50          
552 2 50         if (hdr->total_size != file_size) return 0;
553 2 50         if (hdr->total_size != bf_total_size(hdr->m_bits)) return 0;
554 2           BfLayout L = bf_layout();
555 2 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
556 2 50         if (hdr->bits_off != L.bits) return 0;
557 2           return 1;
558             }
559              
560             /* validate args + compute the geometry (k, m_bits) */
561 28           static int bf_validate_create_args(uint64_t capacity, double fp_rate,
562             uint32_t *k_out, uint64_t *m_bits_out, char *errbuf) {
563 28 50         if (errbuf) errbuf[0] = '\0';
564 28 50         if (capacity < 1) { BF_ERR("capacity must be >= 1"); return 0; }
    0          
565 28 50         if (!(fp_rate > 0.0 && fp_rate < 1.0)) { BF_ERR("fp_rate must be between 0 and 1 (exclusive)"); return 0; }
    50          
    0          
566              
567             /* k = round(-log2(fp_rate)) clamped to [1, 32] */
568 28           long kl = lround(-log2(fp_rate));
569 28 50         if (kl < BF_MIN_K) kl = BF_MIN_K;
570 28 50         if (kl > BF_MAX_K) kl = BF_MAX_K;
571 28           uint32_t k = (uint32_t)kl;
572              
573             /* m_opt = ceil(capacity * k / ln2); reject if it would exceed the bit-array
574             * cap (otherwise the filter would be silently undersized -> fp_rate broken);
575             * m_bits = next_pow2(m_opt), floor BF_MIN_BITS. */
576 28           double m_opt_d = ceil((double)capacity * (double)k / M_LN2);
577 28 100         if (m_opt_d > (double)BF_MAX_BITS) { BF_ERR("capacity too large for the bit-array cap"); return 0; }
    50          
578 27           uint64_t m_bits = bf_next_pow2_u64((uint64_t)m_opt_d);
579              
580 27           *k_out = k;
581 27           *m_bits_out = m_bits;
582 27           return 1;
583             }
584              
585 26           static BfHandle *bf_create(const char *path, uint64_t capacity, double fp_rate, char *errbuf) {
586             uint32_t k;
587             uint64_t m_bits;
588 26 100         if (!bf_validate_create_args(capacity, fp_rate, &k, &m_bits, errbuf)) return NULL;
589              
590 25           uint64_t total = bf_total_size(m_bits);
591 25           int anonymous = (path == NULL);
592 25           int fd = -1;
593             size_t map_size;
594             void *base;
595              
596 25 100         if (anonymous) {
597 20           map_size = (size_t)total;
598 20           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
599 20 50         if (base == MAP_FAILED) { BF_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
600             } else {
601 5           fd = open(path, O_RDWR|O_CREAT, 0666);
602 7 50         if (fd < 0) { BF_ERR("open: %s", strerror(errno)); return NULL; }
    0          
603 5 50         if (flock(fd, LOCK_EX) < 0) { BF_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
604             struct stat st;
605 5 50         if (fstat(fd, &st) < 0) { BF_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
606 5           int is_new = (st.st_size == 0);
607 5 100         if (!is_new && (uint64_t)st.st_size < sizeof(BfHeader)) {
    100          
608 1 50         BF_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
609 1           flock(fd, LOCK_UN); close(fd); return NULL;
610             }
611 4 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
612 0 0         BF_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
613             }
614 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
615 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
616 4 50         if (base == MAP_FAILED) { BF_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
617 4 100         if (!is_new) {
618 1 50         if (!bf_validate_header((BfHeader *)base, (uint64_t)st.st_size)) {
619 0 0         BF_ERR("invalid Bloom filter file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
620             }
621 1           flock(fd, LOCK_UN); close(fd);
622 1           return bf_setup(base, map_size, path, -1);
623             }
624             }
625 23           bf_init_header(base, k, m_bits, capacity, fp_rate, total);
626 23 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
627 23           return bf_setup(base, map_size, path, -1);
628             }
629              
630 2           static BfHandle *bf_create_memfd(const char *name, uint64_t capacity, double fp_rate, char *errbuf) {
631             uint32_t k;
632             uint64_t m_bits;
633 2 50         if (!bf_validate_create_args(capacity, fp_rate, &k, &m_bits, errbuf)) return NULL;
634              
635 2           uint64_t total = bf_total_size(m_bits);
636 2 100         int fd = memfd_create(name ? name : "bloom", MFD_CLOEXEC | MFD_ALLOW_SEALING);
637 2 50         if (fd < 0) { BF_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
638 2 50         if (ftruncate(fd, (off_t)total) < 0) {
639 0 0         BF_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
640             }
641 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
642 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
643 2 50         if (base == MAP_FAILED) { BF_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
644 2           bf_init_header(base, k, m_bits, capacity, fp_rate, total);
645 2           return bf_setup(base, (size_t)total, NULL, fd);
646             }
647              
648 2           static BfHandle *bf_open_fd(int fd, char *errbuf) {
649 2 50         if (errbuf) errbuf[0] = '\0';
650             struct stat st;
651 2 50         if (fstat(fd, &st) < 0) { BF_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
652 2 100         if ((uint64_t)st.st_size < sizeof(BfHeader)) { BF_ERR("too small"); return NULL; }
    50          
653 1           size_t ms = (size_t)st.st_size;
654 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
655 1 50         if (base == MAP_FAILED) { BF_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
656 1 50         if (!bf_validate_header((BfHeader *)base, (uint64_t)st.st_size)) {
657 0 0         BF_ERR("invalid Bloom filter table"); munmap(base, ms); return NULL;
658             }
659 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
660 1 50         if (myfd < 0) { BF_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
661 1           return bf_setup(base, ms, NULL, myfd);
662             }
663              
664 27           static void bf_destroy(BfHandle *h) {
665 27 50         if (!h) return;
666 27 100         if (h->backing_fd >= 0) close(h->backing_fd);
667 27 50         if (h->base) munmap(h->base, h->mmap_size);
668 27           free(h->path);
669 27           free(h);
670             }
671              
672 2           static inline int bf_msync(BfHandle *h) {
673 2 50         if (!h || !h->base) return 0;
    50          
674 2           return msync(h->base, h->mmap_size, MS_SYNC);
675             }
676              
677             /* ================================================================
678             * Bloom filter operations (callers hold the lock) -- double hashing
679             * (Kirsch-Mitzenmacher): one XXH3-128 hash drives all k probes.
680             * ================================================================ */
681              
682 57616           static inline void bf_indices(BfHandle *h, const void *item, size_t len,
683             uint64_t *h1, uint64_t *h2) {
684             (void)h;
685 57616           XXH128_hash_t hh = XXH3_128bits(item, len);
686 57616           *h1 = hh.low64;
687 57616           *h2 = hh.high64 | 1ULL; /* force odd so the k probes spread over the pow2 table */
688 57616           }
689              
690             /* set k bits; return 1 if the item was probably NEW (at least one bit was 0), else 0 */
691 32806           static int bf_add_locked(BfHandle *h, const void *item, size_t len) {
692             uint64_t h1, h2;
693 32806           bf_indices(h, item, len, &h1, &h2);
694 32806           uint64_t mask = h->hdr->m_mask;
695 32806           uint32_t k = h->hdr->k;
696 32806           uint64_t *bits = bf_bits(h);
697 32806           int was_new = 0;
698 262448 100         for (uint32_t i = 0; i < k; i++) {
699 229642           uint64_t idx = (h1 + (uint64_t)i * h2) & mask;
700 229642           uint64_t word = idx >> 6;
701 229642           uint64_t bit = 1ULL << (idx & 63);
702 229642 100         if (!(bits[word] & bit)) { bits[word] |= bit; was_new = 1; }
703             }
704 32806           return was_new;
705             }
706              
707             /* return 1 if ALL k bits are set (probably present), else 0 */
708 24810           static int bf_contains_locked(BfHandle *h, const void *item, size_t len) {
709             uint64_t h1, h2;
710 24810           bf_indices(h, item, len, &h1, &h2);
711 24810           uint64_t mask = h->hdr->m_mask;
712 24810           uint32_t k = h->hdr->k;
713 24810           uint64_t *bits = bf_bits(h);
714 135538 100         for (uint32_t i = 0; i < k; i++) {
715 120705           uint64_t idx = (h1 + (uint64_t)i * h2) & mask;
716 120705           uint64_t word = idx >> 6;
717 120705           uint64_t bit = 1ULL << (idx & 63);
718 120705 100         if (!(bits[word] & bit)) return 0;
719             }
720 14833           return 1;
721             }
722              
723             /* count set bits across the whole array (caller holds a lock) */
724 8           static uint64_t bf_popcount_locked(BfHandle *h) {
725 8           uint64_t *bits = bf_bits(h);
726 8           uint64_t words = h->hdr->m_bits / 64;
727 8           uint64_t n = 0;
728 4616 100         for (uint64_t i = 0; i < words; i++)
729 4608           n += (uint64_t)__builtin_popcountll(bits[i]);
730 8           return n;
731             }
732              
733             /* estimate the number of distinct items added, from a pre-computed popcount X.
734             n_est = -(m/k) * ln(1 - X/m); saturated -> capacity. (caller holds a lock) */
735 8           static uint64_t bf_count_from_popcount(BfHandle *h, uint64_t X) {
736 8           uint64_t m_bits = h->hdr->m_bits;
737 8           uint32_t k = h->hdr->k;
738 8 50         if (X >= m_bits) return h->hdr->capacity; /* saturated */
739 8           double n_est = -((double)m_bits / (double)k) * log(1.0 - (double)X / (double)m_bits);
740 8 50         if (n_est < 0.0) n_est = 0.0;
741 8           return (uint64_t)(n_est + 0.5);
742             }
743              
744             /* estimate the number of distinct items added (popcounts the array). (caller holds a lock) */
745 2           static uint64_t bf_count_locked(BfHandle *h) {
746 2           return bf_count_from_popcount(h, bf_popcount_locked(h));
747             }
748              
749             /* merge src words into dst (caller guarantees equal m_bits); bitwise OR */
750 3           static void bf_merge_words(BfHandle *dst, const uint64_t *src_words) {
751 3           uint64_t *bits = bf_bits(dst);
752 3           uint64_t words = dst->hdr->m_bits / 64;
753 1795 100         for (uint64_t i = 0; i < words; i++)
754 1792           bits[i] |= src_words[i];
755 3           }
756              
757             /* reset all bits to 0 (caller holds the write lock) */
758 1           static inline void bf_clear_locked(BfHandle *h) {
759 1           memset(bf_bits(h), 0, (size_t)(h->hdr->m_bits / 8));
760 1           }
761              
762             #endif /* BLOOM_H */