File Coverage

roaring.h
Criterion Covered Total %
statement 462 606 76.2
branch 225 408 55.1
condition n/a
subroutine n/a
pod n/a
total 687 1014 67.7


line stmt bran cond sub pod time code
1             /*
2             * roaring.h -- Shared-memory Roaring bitmap (compressed uint32 set) for Linux
3             *
4             * A Roaring bitmap is a compressed set of 32-bit unsigned integers. The 32-bit
5             * space is split into 65536 buckets keyed by the high 16 bits; each bucket holds
6             * the low 16 bits of its members in one of two container kinds, chosen by
7             * density:
8             *
9             * - array container : a SORTED ascending uint16 array (good when the bucket is
10             * sparse, <= 4096 elements).
11             * - bitmap container: a 65536-bit bitmap (good when the bucket is dense).
12             *
13             * A bucket starts as an array and is converted to a bitmap once it would exceed
14             * RB_ARRAY_MAX (4096) elements. Both kinds occupy one fixed 8192-byte slot
15             * (4096 * sizeof(uint16) == 1024 * sizeof(uint64) == 8192), drawn from a slot
16             * pool with a freelist, so several processes share one bitmap in a shared
17             * mapping. A write-preferring futex rwlock with reader-slot dead-process
18             * recovery guards mutation.
19             *
20             * contains / cardinality / min / max / to_array are pure reads under the READ
21             * lock; add / remove / clear / union / intersect take the WRITE lock.
22             *
23             * v1 scope: array + bitmap containers (no run containers); union and intersect
24             * only (no xor / andnot yet); bitmap containers are NOT down-converted to arrays
25             * on removal.
26             *
27             * Layout: Header -> reader_slots[1024] -> bucket_table[65536] -> container_pool[container_cap]
28             */
29              
30             #ifndef ROARING_H
31             #define ROARING_H
32              
33             #include
34             #include
35             #include
36             #include
37             #include
38             #include
39             #include
40             #include
41             #include
42             #include
43             #include
44             #include
45             #include
46             #include
47             #include
48             #include
49             #include
50              
51             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
52             #error "roaring.h: requires little-endian architecture"
53             #endif
54              
55              
56             /* ================================================================
57             * Constants
58             * ================================================================ */
59              
60             #define RB_MAGIC 0x474E5252U /* "RRNG" (little-endian) */
61             #define RB_VERSION 1
62             #define RB_ERR_BUFLEN 256
63             #define RB_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
64             #define RB_NUM_BUCKETS 65536u /* one bucket per high-16 value */
65             #define RB_CONTAINER_BYTES 8192u /* 4096*2 == 1024*8: array and bitmap share this size */
66             #define RB_ARRAY_MAX 4096u /* convert array -> bitmap when an array would exceed this */
67             #define RB_MAX_CONTAINERS (1u << 20) /* container-pool ceiling; slot index 0 is the NULL sentinel */
68              
69             /* Container types (stored in bucket_table[hi].type) */
70             #define RB_TYPE_NONE 0u
71             #define RB_TYPE_ARRAY 1u
72             #define RB_TYPE_BITMAP 2u
73              
74             #define RB_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, RB_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
75              
76             /* ================================================================
77             * Structs
78             * ================================================================ */
79              
/* Bucket-table entry. The table is indexed directly by the high 16 bits of a
 * value, one entry per possible high-16 value.
 *
 * An entry whose container_off is 0 describes an empty bucket: pool slot 0 is
 * the reserved NULL sentinel and never backs a real container. For a
 * non-empty bucket, container_off is the 1-based slot index into the
 * container pool and `type` tells how that slot must be interpreted. */
typedef struct {
    uint32_t container_off;  /* 1-based pool slot index; 0 means "bucket is empty" */
    uint32_t cardinality;    /* how many members this bucket holds (1..65536) */
    uint32_t type;           /* RB_TYPE_NONE, RB_TYPE_ARRAY or RB_TYPE_BITMAP */
    uint32_t _pad;           /* explicit padding so the entry is exactly 16 bytes */
} RbBucket;

_Static_assert(sizeof(RbBucket) == 16, "RbBucket must be 16 bytes");
92              
/* Reader slot, one per participating process, used for dead-process recovery.
 *
 * Each field mirrors what this process has added to one of the shared rwlock
 * counters. When a wrlock wait times out, recovery can therefore attribute a
 * dead process's share and reverse it, rather than relying on the slow
 * per-operation timeout drain. */
typedef struct {
    uint32_t pid;             /* owning process id; 0 means the slot is unclaimed */
    uint32_t subcount;        /* rdlock acquisitions currently in flight for this process */
    uint32_t waiters_parked;  /* this process's share of hdr->rwlock_waiters */
    uint32_t writers_parked;  /* this process's share of hdr->rwlock_writers_waiting */
} RbReaderSlot;
103              
/* Shared header placed at the start of the mapping. The number leading each
 * field comment is the field's byte offset; the struct is exactly 256 bytes. */
struct RbHeader {
    uint32_t magic;                   /*   0 */
    uint32_t version;                 /*   4 */
    uint32_t container_cap;           /*   8  pool capacity in slots, NULL sentinel included */
    uint32_t container_used;          /*  12  1-based high-water mark of slots handed out since
                                       *      creation / last clear (slot 0 is reserved) */
    uint32_t free_head;               /*  16  first slot on the freelist; 0 when the list is empty */
    uint32_t free_count;              /*  20  freelist length, kept for O(1) capacity checks */
    uint64_t bitmap_id;               /*  24  random per-bitmap identity, set once at create and
                                       *      never 0; gives set operations a consistent lock
                                       *      acquisition order across processes */
    uint64_t cardinality;             /*  32  TOTAL element count over all buckets */
    uint64_t total_size;              /*  40 */
    uint64_t reader_slots_off;        /*  48 */
    uint64_t bucket_table_off;        /*  56 */
    uint64_t container_pool_off;      /*  64 */
    uint32_t rwlock;                  /*  72 */
    uint32_t rwlock_waiters;          /*  76 */
    uint32_t rwlock_writers_waiting;  /*  80 */
    uint32_t _pad0;                   /*  84  brings stat_ops to an 8-byte boundary */
    uint64_t stat_ops;                /*  88 */
    uint8_t  _pad1[160];              /*  96..255 */
};
typedef struct RbHeader RbHeader;

_Static_assert(sizeof(RbHeader) == 256, "RbHeader must be 256 bytes");
127              
128             /* ---- Process-local handle ---- */
129              
130             typedef struct RbHandle {
131             RbHeader *hdr;
132             RbReaderSlot *reader_slots; /* RB_READER_SLOTS entries */
133             void *base; /* mmap base */
134             size_t mmap_size;
135             char *path; /* backing file path (strdup'd) */
136             int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
137             uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
138             uint32_t cached_pid; /* getpid() cached at last slot claim */
139             uint32_t cached_fork_gen;/* rb_fork_gen value at last slot claim */
140             } RbHandle;
141              
142             /* ================================================================
143             * Futex-based write-preferring read-write lock
144             * with reader-slot dead-process recovery
145             * ================================================================ */
146              
147             #define RB_RWLOCK_SPIN_LIMIT 32
148             #define RB_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
149              
/* One busy-wait beat for the lock spin loops: the CPU's spin hint where one
 * is known ("yield" on AArch64, "pause" on x86), otherwise an empty asm.
 * Every variant carries a "memory" clobber, so it also acts as a compiler
 * barrier. */
static inline void rb_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
159              
160             /* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
161             #define RB_RWLOCK_WRITER_BIT 0x80000000U
162             #define RB_RWLOCK_PID_MASK 0x7FFFFFFFU
163             #define RB_RWLOCK_WR(pid) (RB_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & RB_RWLOCK_PID_MASK))
164              
/* Liveness probe based on kill(pid, 0).
 *
 * Returns 0 only when the process is definitely gone (kill fails with ESRCH).
 * Returns 1 when it is alive or when nothing certain can be said; pid == 0
 * ("no owner recorded") is treated as alive.
 *
 * NOTE: PID reuse is invisible to this check. If a dead lock-holder's PID has
 * been recycled to an unrelated live process before recovery runs, the answer
 * is "alive" and that slot's orphaned contribution is not reclaimed until the
 * recycled process exits. Documented under "Crash Safety". */
static inline int rb_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;               /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;               /* signal could be delivered: process exists */
    return errno != ESRCH;      /* any failure other than "no such process" counts as alive */
}
174              
175             /* Force-recover a stale write lock left by a dead process.
176             * CAS to OUR pid to hold the lock while fixing shared state, then release.
177             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
178             * recovering process can detect and re-recover if we crash mid-recovery. */
179 0           static inline void rb_recover_stale_lock(RbHandle *h, uint32_t observed_rwlock) {
180 0           RbHeader *hdr = h->hdr;
181 0           uint32_t mypid = RB_RWLOCK_WR((uint32_t)getpid());
182 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
183             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
184 0           return;
185             /* We now hold the write lock as mypid. No additional shared state needs
186             * repair here (this module has no seqlock); just release the lock. */
187 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
188 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
189 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
190             }
191              
192             static const struct timespec rb_lock_timeout = { RB_LOCK_TIMEOUT_SEC, 0 };
193              
/* Fork-generation counter, global to the process. The pthread_atfork child
 * hook advances it, so every open handle notices a fork on its next lock
 * call by comparing generations instead of paying for getpid() on the hot
 * path. */
static uint32_t rb_fork_gen = 1;
static pthread_once_t rb_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child hook: move to the next fork generation. */
static void rb_on_fork_child(void) {
    __atomic_fetch_add(&rb_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once target: install the child hook (no prepare or parent hook). */
static void rb_atfork_init(void) {
    pthread_atfork(NULL, NULL, rb_on_fork_child);
}
205              
206             /* Ensure this process owns a reader slot. Called from the lock helpers so
207             * that fork()'d children pick up their own slot lazily instead of sharing
208             * the parent's. Hot-path is a single relaxed load + compare; only on a
209             * fork-generation mismatch do we touch getpid() and scan slots. */
210 62569           static inline void rb_claim_reader_slot(RbHandle *h) {
211 62569           uint32_t cur_gen = __atomic_load_n(&rb_fork_gen, __ATOMIC_RELAXED);
212 62569 100         if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
    50          
213 62530           return;
214             /* Cold path -- register the atfork hook once per process, then claim. */
215 39           pthread_once(&rb_atfork_once, rb_atfork_init);
216             /* Re-read after pthread_once: rb_on_fork_child may have bumped it. */
217 39           cur_gen = __atomic_load_n(&rb_fork_gen, __ATOMIC_RELAXED);
218 39           uint32_t now_pid = (uint32_t)getpid();
219 39           h->cached_pid = now_pid;
220 39           h->cached_fork_gen = cur_gen;
221 39           h->my_slot_idx = UINT32_MAX;
222 39           uint32_t start = now_pid % RB_READER_SLOTS;
223 42 50         for (uint32_t i = 0; i < RB_READER_SLOTS; i++) {
224 42           uint32_t s = (start + i) % RB_READER_SLOTS;
225 42           uint32_t expected = 0;
226 42 100         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
227             &expected, now_pid, 0,
228             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
229             /* Zero all mirror fields, not just subcount: a SIGKILL'd
230             * predecessor may have left waiters_parked/writers_parked
231             * non-zero, and rb_recover_dead_readers won't drain them
232             * once we own the slot (the CAS expects the dead PID). */
233 39           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
234 39           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
235 39           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
236 39           h->my_slot_idx = s;
237 39           return;
238             }
239             }
240             /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
241             * tracking for this handle (lock still works; just no recovery). */
242             }
243              
/* Saturating atomic subtraction: *p becomes max(*p - sub, 0), so the counter
 * can never wrap below zero. A `sub` of 0 leaves *p untouched. */
static inline void rb_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;

    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        /* A failed CAS refreshes `seen`, so the target is recomputed each round. */
        next = (seen <= sub) ? 0 : seen - sub;
    } while (!__atomic_compare_exchange_n(p, &seen, next,
                                          1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
255              
256             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
257             * contributions back to the global counters. A no-op if the slot was stolen
258             * by another recoverer or had no waiter contribution to drain.
259             *
260             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
261             * rb_claim_reader_slot zeros all three on every claim, so leaving stale
262             * values is harmless. */
263 0           static inline void rb_drain_dead_slot(RbHandle *h, uint32_t i, uint32_t pid) {
264 0           RbHeader *hdr = h->hdr;
265 0           uint32_t expected = pid;
266             /* ACQ_REL on success: RELEASE publishes pid=0; ACQUIRE syncs us with the
267             * dead process's prior writes to waiters_parked/writers_parked. */
268 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
269             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
270 0           return;
271 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
272 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
273 0 0         if (wp) rb_atomic_sub_cap(&hdr->rwlock_waiters, wp);
274 0 0         if (writp) rb_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
275             }
276              
277             /* Scan reader slots for dead-process recovery.
278             *
279             * For each dead PID with non-zero contributions to the shared rwlock,
280             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
281             * out so live processes don't have to wait for the slow per-op timeout
282             * decrement to drain it for them. */
283 0           static inline void rb_recover_dead_readers(RbHandle *h) {
284 0 0         if (!h->reader_slots) return;
285 0           RbHeader *hdr = h->hdr;
286 0           int any_live_reader = 0;
287 0           int found_dead_reader = 0;
288              
289             /* Pass 1: classify slots. Dead pid with sc == 0 is wiped immediately to
290             * free the slot and drain orphan parked-waiter counters. Dead pid with
291             * sc > 0 is left intact: if force-reset cannot fire (a live reader is
292             * concurrently present), wiping it would lose the only record of its
293             * orphan rwlock contribution and strand writers. */
294 0 0         for (uint32_t i = 0; i < RB_READER_SLOTS; i++) {
295 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
296 0 0         if (pid == 0) continue;
297 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
298 0 0         if (rb_pid_alive(pid)) {
299 0 0         if (sc > 0) any_live_reader = 1;
300 0           continue;
301             }
302 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
303 0           rb_drain_dead_slot(h, i, pid);
304             }
305              
306             /* Pass 2: only if force-reset will fire. Issue the rwlock force-reset
307             * CAS FIRST (narrow window since pass 1), then wipe the deferred dead
308             * slots outside the race-sensitive window. */
309 0 0         if (found_dead_reader && !any_live_reader) {
    0          
310 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
311 0 0         if (cur > 0 && cur < RB_RWLOCK_WRITER_BIT) {
    0          
312 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
313             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
314 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
315 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
316             }
317             }
318 0 0         for (uint32_t i = 0; i < RB_READER_SLOTS; i++) {
319 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
320 0 0         if (pid == 0 || rb_pid_alive(pid)) continue;
    0          
321 0           rb_drain_dead_slot(h, i, pid);
322             }
323             }
324             }
325              
326             /* Inspect the lock word after a futex-wait timeout. If a dead writer
327             * holds it, force-recover the lock. Otherwise drain dead readers' shares
328             * of the rwlock/waiter counters. */
329 0           static inline void rb_recover_after_timeout(RbHandle *h) {
330 0           RbHeader *hdr = h->hdr;
331 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
332 0 0         if (val >= RB_RWLOCK_WRITER_BIT) {
333 0           uint32_t pid = val & RB_RWLOCK_PID_MASK;
334 0 0         if (!rb_pid_alive(pid))
335 0           rb_recover_stale_lock(h, val);
336             } else {
337 0           rb_recover_dead_readers(h);
338             }
339 0           }
340              
341             /* Park/unpark helpers: bump the global waiter counters together with this
342             * process's mirrored slot counters so a wrlock-timeout recovery scan can
343             * attribute and reverse a dead PID's contribution. */
344 0           static inline void rb_park_reader(RbHandle *h) {
345 0 0         if (h->my_slot_idx != UINT32_MAX)
346 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
347 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
348 0           }
349 0           static inline void rb_unpark_reader(RbHandle *h) {
350 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
351 0 0         if (h->my_slot_idx != UINT32_MAX)
352 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
353 0           }
354 0           static inline void rb_park_writer(RbHandle *h) {
355 0 0         if (h->my_slot_idx != UINT32_MAX) {
356 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
357 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
358             }
359 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
360 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
361 0           }
362 0           static inline void rb_unpark_writer(RbHandle *h) {
363 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
364 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
365 0 0         if (h->my_slot_idx != UINT32_MAX) {
366 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
367 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
368             }
369 0           }
370              
371 12904           static inline void rb_rwlock_rdlock(RbHandle *h) {
372 12904           rb_claim_reader_slot(h);
373 12904           RbHeader *hdr = h->hdr;
374 12904           uint32_t *lock = &hdr->rwlock;
375 12904           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
376             /* Claim subcount BEFORE bumping the shared rwlock counter so a concurrent
377             * writer-side recovery scan that sees our PID alive with subcount > 0 will
378             * (correctly) defer force-reset even while we are still spinning. */
379 12904 50         if (h->my_slot_idx != UINT32_MAX)
380 12904           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
381 12904           for (int spin = 0; ; spin++) {
382 12904           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
383             /* Write-preferring: when free (cur==0) and writers wait, yield. When
384             * readers are already active (cur>=1), new readers may join freely. */
385 12904 50         if (cur > 0 && cur < RB_RWLOCK_WRITER_BIT) {
    0          
386 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
387             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
388 12904           return;
389 12904 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
390 12904 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
391             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
392 12904           return;
393             }
394 0 0         if (__builtin_expect(spin < RB_RWLOCK_SPIN_LIMIT, 1)) {
395 0           rb_rwlock_spin_pause();
396 0           continue;
397             }
398 0           rb_park_reader(h);
399 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
400 0 0         if (cur >= RB_RWLOCK_WRITER_BIT || cur == 0) {
    0          
401 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
402             &rb_lock_timeout, NULL, 0);
403 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
404 0           rb_unpark_reader(h);
405 0           rb_recover_after_timeout(h);
406 0           spin = 0;
407 0           continue;
408             }
409             }
410 0           rb_unpark_reader(h);
411 0           spin = 0;
412             }
413             }
414              
415 12904           static inline void rb_rwlock_rdunlock(RbHandle *h) {
416 12904           RbHeader *hdr = h->hdr;
417             /* Release the shared counter BEFORE dropping our subcount so "any live PID
418             * with subcount > 0" stays a reliable in-flight indicator for recovery. */
419 12904           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
420 12904 50         if (h->my_slot_idx != UINT32_MAX)
421 12904           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
422 12904 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
423 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
424 12904           }
425              
426 49665           static inline void rb_rwlock_wrlock(RbHandle *h) {
427 49665           rb_claim_reader_slot(h); /* refresh cached_pid across fork */
428 49665           RbHeader *hdr = h->hdr;
429 49665           uint32_t *lock = &hdr->rwlock;
430             /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate any
431             * crash window between acquiring the lock and storing the owner. */
432 49665           uint32_t mypid = RB_RWLOCK_WR(h->cached_pid);
433 49665           for (int spin = 0; ; spin++) {
434 49665           uint32_t expected = 0;
435 49665 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
436             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
437 49665           return;
438 0 0         if (__builtin_expect(spin < RB_RWLOCK_SPIN_LIMIT, 1)) {
439 0           rb_rwlock_spin_pause();
440 0           continue;
441             }
442 0           rb_park_writer(h);
443 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
444 0 0         if (cur != 0) {
445 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
446             &rb_lock_timeout, NULL, 0);
447 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
448 0           rb_unpark_writer(h);
449 0           rb_recover_after_timeout(h);
450 0           spin = 0;
451 0           continue;
452             }
453             }
454 0           rb_unpark_writer(h);
455 0           spin = 0;
456             }
457             }
458              
459 49665           static inline void rb_rwlock_wrunlock(RbHandle *h) {
460 49665           RbHeader *hdr = h->hdr;
461 49665           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
462 49665 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
463 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
464 49665           }
465              
466             /* ================================================================
467             * Layout math + bucket-table / container-pool accessors
468             *
469             * Layout: Header -> reader_slots[1024] -> bucket_table[65536] -> container_pool
470             * RbReaderSlot is 16 bytes and RbBucket is 16 bytes, so every region begins on
471             * a 16-byte boundary; container slots are 8192 bytes (8-byte aligned for the
472             * uint64 bitmap interpretation).
473             * ================================================================ */
474              
475             typedef struct { uint64_t reader_slots, bucket_table, container_pool; } RbLayout;
476              
477 132           static inline RbLayout rb_layout(void) {
478             RbLayout L;
479 132           L.reader_slots = sizeof(RbHeader);
480 132           L.bucket_table = L.reader_slots + (uint64_t)RB_READER_SLOTS * sizeof(RbReaderSlot);
481 132           L.container_pool = L.bucket_table + (uint64_t)RB_NUM_BUCKETS * sizeof(RbBucket);
482 132           return L;
483             }
484              
485 89           static inline uint64_t rb_total_size(uint32_t container_cap) {
486 89           RbLayout L = rb_layout();
487 89           return L.container_pool + (uint64_t)container_cap * RB_CONTAINER_BYTES;
488             }
489              
490 185285           static inline RbBucket *rb_buckets(RbHandle *h) {
491 185285           return (RbBucket *)((char *)h->base + h->hdr->bucket_table_off);
492             }
493 102896           static inline uint8_t *rb_pool(RbHandle *h) {
494 102896           return (uint8_t *)((char *)h->base + h->hdr->container_pool_off);
495             }
496             /* Base pointer of container slot `i` (1-based; i==0 is the NULL sentinel). */
497 102896           static inline void *rb_slot(RbHandle *h, uint32_t i) {
498 102896           return (void *)(rb_pool(h) + (size_t)i * RB_CONTAINER_BYTES);
499             }
500 78022           static inline uint16_t *rb_array(RbHandle *h, uint32_t i) {
501 78022           return (uint16_t *)rb_slot(h, i);
502             }
503 24358           static inline uint64_t *rb_bitmap(RbHandle *h, uint32_t i) {
504 24358           return (uint64_t *)rb_slot(h, i);
505             }
506              
507             /* ================================================================
508             * Container-slot allocation (freelist). Callers hold the WRITE lock.
509             * ================================================================ */
510              
511             /* Number of container slots available to satisfy future allocations:
512             * fresh high-water headroom plus returned slots on the freelist. */
513 464           static inline uint32_t rb_avail_slots(RbHandle *h) {
514 464           RbHeader *hdr = h->hdr;
515 464           return (hdr->container_cap - hdr->container_used) + hdr->free_count;
516             }
517              
518             /* Allocate one container slot. Returns a 1-based slot index, or 0 if the pool
519             * is exhausted. The returned slot is fully zeroed. Pops the freelist first
520             * (the freelist threads through the first 4 bytes of each freed slot), else
521             * bumps the high-water mark. */
522 454           static inline uint32_t rb_alloc_slot(RbHandle *h) {
523 454           RbHeader *hdr = h->hdr;
524             uint32_t idx;
525 454 100         if (hdr->free_head) {
526 1           idx = hdr->free_head;
527 1           hdr->free_head = *(uint32_t *)rb_slot(h, idx); /* next free slot */
528 1           hdr->free_count--;
529 453 50         } else if (hdr->container_used < hdr->container_cap) {
530 453           idx = hdr->container_used++;
531             } else {
532 0           return 0;
533             }
534 454           memset(rb_slot(h, idx), 0, RB_CONTAINER_BYTES);
535 454           return idx;
536             }
537              
538             /* Return a container slot to the freelist. */
539 61           static inline void rb_free_slot(RbHandle *h, uint32_t i) {
540 61           RbHeader *hdr = h->hdr;
541 61           *(uint32_t *)rb_slot(h, i) = hdr->free_head;
542 61           hdr->free_head = i;
543 61           hdr->free_count++;
544 61           }
545              
546             /* ================================================================
547             * Bit / popcount helpers
548             * ================================================================ */
549              
/* Count the set bits of a bitmap container, i.e. of the 1024 consecutive
 * 64-bit words starting at `bits`. */
static inline uint64_t rb_popcount_bitmap(const uint64_t *bits) {
    const uint64_t *const end = bits + 1024;
    uint64_t total = 0;
    for (const uint64_t *word = bits; word != end; ++word) {
        total += (uint64_t)__builtin_popcountll(*word);
    }
    return total;
}
555              
/* Lower-bound binary search for `lo` among the first `card` entries of the
 * ascending uint16 array `vals`.
 *
 * *pos always receives the index of the first entry that is >= lo (equal to
 * `card` when no such entry exists). The return value is 1 when that entry
 * equals lo, i.e. lo is present at *pos, and 0 otherwise, in which case *pos
 * is where lo would have to be inserted. */
static inline int rb_array_search(const uint16_t *vals, uint32_t card, uint16_t lo, uint32_t *pos) {
    uint32_t first = 0;       /* start of the window still under consideration */
    uint32_t span = card;     /* number of entries in that window */

    while (span > 0) {
        uint32_t half = span >> 1;
        if (vals[first + half] < lo) {
            /* Probe and everything before it are too small: drop them. */
            first += half + 1;
            span -= half + 1;
        } else {
            span = half;
        }
    }

    *pos = first;
    if (first >= card)
        return 0;
    return vals[first] == lo;
}
569              
570             /* ================================================================
571             * Single-bitmap operations. Callers hold the WRITE lock (mutators) or the
572             * READ lock (rb_contains_locked).
573             * ================================================================ */
574              
575             /* Convert bucket `hi`'s array container to a bitmap, in place (same slot).
576             * The slot is reinterpreted, so the array values are copied to a C-stack temp
577             * first. The array is at most RB_ARRAY_MAX entries (a full array container). */
578 13           static inline void rb_array_to_bitmap(RbHandle *h, uint32_t hi) {
579 13           RbBucket *bt = &rb_buckets(h)[hi];
580 13           uint32_t card = bt->cardinality;
581             uint16_t tmp[RB_ARRAY_MAX];
582 13           uint16_t *vals = rb_array(h, bt->container_off);
583 13           memcpy(tmp, vals, (size_t)card * sizeof(uint16_t));
584 13           uint64_t *bits = rb_bitmap(h, bt->container_off);
585 13           memset(bits, 0, RB_CONTAINER_BYTES);
586 53261 100         for (uint32_t i = 0; i < card; i++) {
587 53248           uint16_t lo = tmp[i];
588 53248           bits[lo >> 6] |= (uint64_t)1 << (lo & 63);
589             }
590 13           bt->type = RB_TYPE_BITMAP;
591 13           }
592              
593             /* Add x to the set. Returns 1 if newly added, 0 if already present. The
594             * caller has verified (under the lock) that a free slot exists when the target
595             * bucket is currently empty. */
596 83698           static inline int rb_add_locked(RbHandle *h, uint32_t x) {
597 83698           uint32_t hi = x >> 16;
598 83698           uint16_t lo = (uint16_t)(x & 0xffff);
599 83698           RbBucket *bt = &rb_buckets(h)[hi];
600              
601 83698 100         if (bt->type == RB_TYPE_NONE) {
602 454           uint32_t s = rb_alloc_slot(h); /* guaranteed available by caller */
603 454           bt->container_off = s;
604 454           bt->type = RB_TYPE_ARRAY;
605 454           bt->cardinality = 1;
606 454           rb_array(h, s)[0] = lo;
607 454           h->hdr->cardinality++;
608 454           return 1;
609             }
610 83244 100         if (bt->type == RB_TYPE_ARRAY) {
611 71969           uint16_t *vals = rb_array(h, bt->container_off);
612             uint32_t pos;
613 71969 100         if (rb_array_search(vals, bt->cardinality, lo, &pos)) return 0;
614             /* A full array container (RB_ARRAY_MAX entries) cannot hold one more
615             * value without overflowing its fixed-size slot; promote it to a
616             * bitmap FIRST, then set the bit. (The new value is genuinely absent,
617             * confirmed above, so this always grows the set.) */
618 71956 100         if (bt->cardinality >= RB_ARRAY_MAX) {
619 13           rb_array_to_bitmap(h, hi);
620 13           uint64_t *bits = rb_bitmap(h, bt->container_off);
621 13           bits[lo >> 6] |= (uint64_t)1 << (lo & 63);
622 13           bt->cardinality++;
623 13           h->hdr->cardinality++;
624 13           return 1;
625             }
626 71943           memmove(&vals[pos + 1], &vals[pos], (size_t)(bt->cardinality - pos) * sizeof(uint16_t));
627 71943           vals[pos] = lo;
628 71943           bt->cardinality++;
629 71943           h->hdr->cardinality++;
630 71943           return 1;
631             }
632             /* bitmap */
633             {
634 11275           uint64_t *bits = rb_bitmap(h, bt->container_off);
635 11275           uint32_t w = lo >> 6;
636 11275           uint64_t b = (uint64_t)1 << (lo & 63);
637 11275 50         if (bits[w] & b) return 0;
638 11275           bits[w] |= b;
639 11275           bt->cardinality++;
640 11275           h->hdr->cardinality++;
641 11275           return 1;
642             }
643             }
644              
645             /* Membership test. Read-only. */
646 12784           static inline int rb_contains_locked(RbHandle *h, uint32_t x) {
647 12784           uint32_t hi = x >> 16;
648 12784           uint16_t lo = (uint16_t)(x & 0xffff);
649 12784           RbBucket *bt = &rb_buckets(h)[hi];
650 12784 100         if (bt->type == RB_TYPE_NONE) return 0;
651 12746 100         if (bt->type == RB_TYPE_ARRAY) {
652             uint32_t pos;
653 4718           return rb_array_search(rb_array(h, bt->container_off), bt->cardinality, lo, &pos);
654             }
655             {
656 8028           uint64_t *bits = rb_bitmap(h, bt->container_off);
657 8028           return (bits[lo >> 6] >> (lo & 63)) & 1;
658             }
659             }
660              
661             /* Remove x from the set. Returns 1 if removed, 0 if absent. Frees the slot
662             * (and clears the bucket) when the last element of a bucket is removed. v1
663             * does NOT down-convert a bitmap to an array. */
664 5004           static inline int rb_remove_locked(RbHandle *h, uint32_t x) {
665 5004           uint32_t hi = x >> 16;
666 5004           uint16_t lo = (uint16_t)(x & 0xffff);
667 5004           RbBucket *bt = &rb_buckets(h)[hi];
668 5004 100         if (bt->type == RB_TYPE_NONE) return 0;
669 5003 100         if (bt->type == RB_TYPE_ARRAY) {
670 2           uint16_t *vals = rb_array(h, bt->container_off);
671             uint32_t pos;
672 2 50         if (!rb_array_search(vals, bt->cardinality, lo, &pos)) return 0;
673 2           memmove(&vals[pos], &vals[pos + 1], (size_t)(bt->cardinality - pos - 1) * sizeof(uint16_t));
674 2           bt->cardinality--;
675 2           h->hdr->cardinality--;
676 2 100         if (bt->cardinality == 0) {
677 1           rb_free_slot(h, bt->container_off);
678 1           bt->container_off = 0;
679 1           bt->type = RB_TYPE_NONE;
680             }
681 2           return 1;
682             }
683             {
684 5001           uint64_t *bits = rb_bitmap(h, bt->container_off);
685 5001           uint32_t w = lo >> 6;
686 5001           uint64_t b = (uint64_t)1 << (lo & 63);
687 5001 100         if (!(bits[w] & b)) return 0;
688 5000           bits[w] &= ~b;
689 5000           bt->cardinality--;
690 5000           h->hdr->cardinality--;
691 5000 100         if (bt->cardinality == 0) {
692 1           rb_free_slot(h, bt->container_off);
693 1           bt->container_off = 0;
694 1           bt->type = RB_TYPE_NONE;
695             }
696 5000           return 1;
697             }
698             }
699              
700             /* Reset to empty: free every container, zero the bucket table, reset the pool.
701             * Caller holds the write lock. */
702 1           static inline void rb_clear_locked(RbHandle *h) {
703 1           RbHeader *hdr = h->hdr;
704 1           RbBucket *bt = rb_buckets(h);
705 1           memset(bt, 0, (size_t)RB_NUM_BUCKETS * sizeof(RbBucket));
706 1           hdr->container_used = 1; /* slot 0 reserved */
707 1           hdr->free_head = 0;
708 1           hdr->free_count = 0;
709 1           hdr->cardinality = 0;
710 1           }
711              
712             /* Smallest set element. Returns 1 and sets *out, else 0 (empty set). */
713 8           static inline int rb_min_locked(RbHandle *h, uint32_t *out) {
714 8           RbBucket *bt = rb_buckets(h);
715 65544 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
716 65543 100         if (bt[hi].type == RB_TYPE_NONE) continue;
717 7 100         if (bt[hi].type == RB_TYPE_ARRAY) {
718 4           *out = (hi << 16) | rb_array(h, bt[hi].container_off)[0];
719 4           return 1;
720             }
721             {
722 3           uint64_t *bits = rb_bitmap(h, bt[hi].container_off);
723 3 50         for (uint32_t w = 0; w < 1024; w++) {
724 3 50         if (bits[w]) {
725 3           uint32_t lo = (w << 6) + (uint32_t)__builtin_ctzll(bits[w]);
726 3           *out = (hi << 16) | lo;
727 3           return 1;
728             }
729             }
730             }
731             }
732 1           return 0;
733             }
734              
735             /* Largest set element. Returns 1 and sets *out, else 0 (empty set). */
736 8           static inline int rb_max_locked(RbHandle *h, uint32_t *out) {
737 8           RbBucket *bt = rb_buckets(h);
738 458526 100         for (uint32_t hi = RB_NUM_BUCKETS; hi-- > 0; ) {
739 458525 100         if (bt[hi].type == RB_TYPE_NONE) continue;
740 7 100         if (bt[hi].type == RB_TYPE_ARRAY) {
741 5           *out = (hi << 16) | rb_array(h, bt[hi].container_off)[bt[hi].cardinality - 1];
742 5           return 1;
743             }
744             {
745 2           uint64_t *bits = rb_bitmap(h, bt[hi].container_off);
746 1892 50         for (uint32_t w = 1024; w-- > 0; ) {
747 1892 100         if (bits[w]) {
748 2           uint32_t lo = (w << 6) + (63 - (uint32_t)__builtin_clzll(bits[w]));
749 2           *out = (hi << 16) | lo;
750 2           return 1;
751             }
752             }
753             }
754             }
755 1           return 0;
756             }
757              
758             /* Count the number of non-empty buckets (for stats). Read-only. */
759 12           static inline uint32_t rb_buckets_used(RbHandle *h) {
760 12           RbBucket *bt = rb_buckets(h);
761 12           uint32_t n = 0;
762 786444 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++)
763 786432 100         if (bt[hi].type != RB_TYPE_NONE) n++;
764 12           return n;
765             }
766              
767             /* ================================================================
768             * In-place set operations. Callers hold a's WRITE lock and b's READ lock
769             * (acquired in a globally-consistent order keyed on the shared-memory
770             * bitmap_id to avoid cross-process deadlock). a and b are guaranteed to be
771             * DISTINCT underlying bitmaps (the same-bitmap case -- whether o==h or merely
772             * o->hdr->bitmap_id == h->hdr->bitmap_id -- is a no-op handled by the caller).
773             * ================================================================ */
774              
775             /* a |= b for a single bucket where a's container is a bitmap. Reads b's
776             * container of either kind; returns the new popcount. */
777 4           static inline uint32_t rb_or_into_bitmap(uint64_t *abits, RbHandle *b, const RbBucket *bbt) {
778 4 100         if (bbt->type == RB_TYPE_ARRAY) {
779 3           const uint16_t *bv = rb_array(b, bbt->container_off);
780 4025 100         for (uint32_t i = 0; i < bbt->cardinality; i++) {
781 4022           uint16_t lo = bv[i];
782 4022           abits[lo >> 6] |= (uint64_t)1 << (lo & 63);
783             }
784             } else { /* bitmap */
785 1           const uint64_t *bb = rb_bitmap(b, bbt->container_off);
786 1025 100         for (uint32_t w = 0; w < 1024; w++) abits[w] |= bb[w];
787             }
788 4           return (uint32_t)rb_popcount_bitmap(abits);
789             }
790              
791             /* Pre-count how many NEW container slots a |= b will need: one per bucket that
792             * b occupies and a does not. Caller holds both locks. */
793 7           static inline uint32_t rb_union_new_slots_needed(RbHandle *a, RbHandle *b) {
794 7           RbBucket *abt = rb_buckets(a);
795 7           RbBucket *bbt = rb_buckets(b);
796 7           uint32_t need = 0;
797 458759 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
798 458752 100         if (bbt[hi].type != RB_TYPE_NONE && abt[hi].type == RB_TYPE_NONE) need++;
    100          
799             }
800 7           return need;
801             }
802              
803             /* Recompute the total cardinality from the per-bucket cards (after a set op). */
804 11           static inline void rb_recompute_cardinality(RbHandle *a, RbBucket *abt) {
805 11           uint64_t total = 0;
806 720907 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++)
807 720896 100         if (abt[hi].type != RB_TYPE_NONE) total += abt[hi].cardinality;
808 11           a->hdr->cardinality = total;
809 11           }
810              
811             /* a |= b. Caller has verified rb_avail_slots(a) >= rb_union_new_slots_needed.
812             * Every bucket combination is handled in place. */
813 6           static inline void rb_union_locked(RbHandle *a, RbHandle *b) {
814 6           RbBucket *abt = rb_buckets(a);
815 6           RbBucket *bbt = rb_buckets(b);
816 393222 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
817 393216 100         if (bbt[hi].type == RB_TYPE_NONE) continue;
818              
819 127 50         if (abt[hi].type == RB_TYPE_NONE) {
820             /* a lacks this bucket -> copy b's container wholesale. */
821 0           uint32_t s = rb_alloc_slot(a); /* guaranteed available */
822 0           memcpy(rb_slot(a, s), rb_slot(b, bbt[hi].container_off), RB_CONTAINER_BYTES);
823 0           abt[hi].container_off = s;
824 0           abt[hi].type = bbt[hi].type;
825 0           abt[hi].cardinality = bbt[hi].cardinality;
826 0           continue;
827             }
828              
829 127 100         if (abt[hi].type == RB_TYPE_ARRAY && bbt[hi].type == RB_TYPE_ARRAY) {
    100          
830             /* array | array -> merge sorted, dedup, into a C-stack temp. */
831             uint16_t tmp[2 * RB_ARRAY_MAX];
832 122           uint16_t *av = rb_array(a, abt[hi].container_off);
833 122           const uint16_t *bv = rb_array(b, bbt[hi].container_off);
834 122           uint32_t ai = 0, bi = 0, n = 0;
835 122           uint32_t ac = abt[hi].cardinality, bc = bbt[hi].cardinality;
836 10584 100         while (ai < ac && bi < bc) {
    50          
837 10462           uint16_t x = av[ai], y = bv[bi];
838 10462 100         if (x < y) { tmp[n++] = x; ai++; }
839 6675 100         else if (x > y) { tmp[n++] = y; bi++; }
840 3009           else { tmp[n++] = x; ai++; bi++; }
841             }
842 122 50         while (ai < ac) tmp[n++] = av[ai++];
843 246 100         while (bi < bc) tmp[n++] = bv[bi++];
844 122 100         if (n <= RB_ARRAY_MAX) {
845 120           memcpy(av, tmp, (size_t)n * sizeof(uint16_t));
846 120           abt[hi].cardinality = n;
847             } else {
848 2           uint64_t *bits = rb_bitmap(a, abt[hi].container_off);
849 2           memset(bits, 0, RB_CONTAINER_BYTES);
850 9028 100         for (uint32_t i = 0; i < n; i++) bits[tmp[i] >> 6] |= (uint64_t)1 << (tmp[i] & 63);
851 2           abt[hi].type = RB_TYPE_BITMAP;
852 2           abt[hi].cardinality = n;
853             }
854 122           continue;
855             }
856              
857 5 100         if (abt[hi].type == RB_TYPE_ARRAY && bbt[hi].type == RB_TYPE_BITMAP) {
    50          
858             /* array(a) | bitmap(b): copy b's bitmap into a, then OR a's old
859             * array values back in. Snapshot a's array first (slot reused). */
860             uint16_t tmp[RB_ARRAY_MAX];
861 1           uint16_t *av = rb_array(a, abt[hi].container_off);
862 1           uint32_t ac = abt[hi].cardinality;
863 1           memcpy(tmp, av, (size_t)ac * sizeof(uint16_t));
864 1           uint64_t *abits = rb_bitmap(a, abt[hi].container_off);
865 1           memcpy(abits, rb_bitmap(b, bbt[hi].container_off), RB_CONTAINER_BYTES);
866 2 100         for (uint32_t i = 0; i < ac; i++) abits[tmp[i] >> 6] |= (uint64_t)1 << (tmp[i] & 63);
867 1           abt[hi].type = RB_TYPE_BITMAP;
868 1           abt[hi].cardinality = (uint32_t)rb_popcount_bitmap(abits);
869 1           continue;
870             }
871              
872             /* bitmap(a) | array(b) and bitmap(a) | bitmap(b) */
873             {
874 4           uint64_t *abits = rb_bitmap(a, abt[hi].container_off);
875 4           abt[hi].cardinality = rb_or_into_bitmap(abits, b, &bbt[hi]);
876             }
877             }
878              
879 6           rb_recompute_cardinality(a, abt);
880 6           }
881              
882             /* a &= b. Never needs new slots (intersection only shrinks or frees). Caller
883             * holds a's write lock and b's read lock. */
884 5           static inline void rb_intersect_locked(RbHandle *a, RbHandle *b) {
885 5           RbBucket *abt = rb_buckets(a);
886 5           RbBucket *bbt = rb_buckets(b);
887 327685 100         for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
888 327680 100         if (abt[hi].type == RB_TYPE_NONE) continue;
889              
890 126 50         if (bbt[hi].type == RB_TYPE_NONE) {
891 0           rb_free_slot(a, abt[hi].container_off);
892 0           abt[hi].container_off = 0;
893 0           abt[hi].type = RB_TYPE_NONE;
894 0           abt[hi].cardinality = 0;
895 0           continue;
896             }
897              
898 126 100         if (abt[hi].type == RB_TYPE_ARRAY && bbt[hi].type == RB_TYPE_ARRAY) {
    100          
899             /* array & array -> two-pointer intersect into a C-stack temp. */
900             uint16_t tmp[RB_ARRAY_MAX];
901 121           uint16_t *av = rb_array(a, abt[hi].container_off);
902 121           const uint16_t *bv = rb_array(b, bbt[hi].container_off);
903 121           uint32_t ai = 0, bi = 0, n = 0;
904 121           uint32_t ac = abt[hi].cardinality, bc = bbt[hi].cardinality;
905 6069 100         while (ai < ac && bi < bc) {
    100          
906 5948           uint16_t x = av[ai], y = bv[bi];
907 5948 100         if (x < y) ai++;
908 3726 100         else if (x > y) bi++;
909 1893           else { tmp[n++] = x; ai++; bi++; }
910             }
911 121           memcpy(av, tmp, (size_t)n * sizeof(uint16_t));
912 121           abt[hi].cardinality = n;
913             }
914 5 100         else if (abt[hi].type == RB_TYPE_ARRAY && bbt[hi].type == RB_TYPE_BITMAP) {
    50          
915             /* array(a) & bitmap(b): keep a's values whose bit is set in b. */
916 1           uint16_t *av = rb_array(a, abt[hi].container_off);
917 1           const uint64_t *bb = rb_bitmap(b, bbt[hi].container_off);
918 1           uint32_t n = 0;
919 2 100         for (uint32_t i = 0; i < abt[hi].cardinality; i++) {
920 1           uint16_t lo = av[i];
921 1 50         if ((bb[lo >> 6] >> (lo & 63)) & 1) av[n++] = lo;
922             }
923 1           abt[hi].cardinality = n;
924             }
925 4 50         else if (abt[hi].type == RB_TYPE_BITMAP && bbt[hi].type == RB_TYPE_ARRAY) {
    100          
926             /* bitmap(a) & array(b) -> result is b's values that are set in a;
927             * write it back as an ARRAY into a's slot. Snapshot b's array to
928             * a temp (a's slot is being overwritten). */
929             uint16_t tmp[RB_ARRAY_MAX];
930 3           const uint16_t *bv = rb_array(b, bbt[hi].container_off);
931 3           uint64_t *abits = rb_bitmap(a, abt[hi].container_off);
932 3           uint32_t n = 0;
933 5023 100         for (uint32_t i = 0; i < bbt[hi].cardinality; i++) {
934 5020           uint16_t lo = bv[i];
935 5020 100         if ((abits[lo >> 6] >> (lo & 63)) & 1) tmp[n++] = lo;
936             }
937 3           uint16_t *av = rb_array(a, abt[hi].container_off);
938 3           memcpy(av, tmp, (size_t)n * sizeof(uint16_t)); /* same slot, array view */
939 3           abt[hi].type = RB_TYPE_ARRAY;
940 3           abt[hi].cardinality = n;
941             }
942             else { /* bitmap(a) & bitmap(b) */
943 1           uint64_t *abits = rb_bitmap(a, abt[hi].container_off);
944 1           const uint64_t *bb = rb_bitmap(b, bbt[hi].container_off);
945 1025 100         for (uint32_t w = 0; w < 1024; w++) abits[w] &= bb[w];
946 1           abt[hi].cardinality = (uint32_t)rb_popcount_bitmap(abits);
947             }
948              
949             /* If the bucket emptied, free its slot. */
950 126 100         if (abt[hi].cardinality == 0) {
951 59           rb_free_slot(a, abt[hi].container_off);
952 59           abt[hi].container_off = 0;
953 59           abt[hi].type = RB_TYPE_NONE;
954             }
955             }
956              
957 5           rb_recompute_cardinality(a, abt);
958 5           }
959              
960             /* ================================================================
961             * Validate args + header init / setup / open / destroy
962             * ================================================================ */
963              
964             /* Validate create args. Single source of truth: the XS layer does NOT
965             * duplicate these range checks. */
966 46           static int rb_validate_create_args(uint64_t container_cap, char *errbuf) {
967 46 50         if (errbuf) errbuf[0] = '\0';
968 46 100         if (container_cap < 1) { RB_ERR("container_capacity must be >= 1"); return 0; }
    50          
969 44 100         if (container_cap > RB_MAX_CONTAINERS) { RB_ERR("container_capacity must be <= %u", (unsigned)RB_MAX_CONTAINERS); return 0; }
    50          
970             {
971 43           uint64_t total = rb_total_size((uint32_t)container_cap);
972             if (total > (uint64_t)SIZE_MAX) { RB_ERR("requested mapping too large"); return 0; }
973             }
974 43           return 1;
975             }
976              
/* Produce the per-bitmap identity stored in the header at create time; its
 * only use is to order set-op lock acquisition consistently across unrelated
 * processes. The id comes from getrandom(); if that fails or returns fewer
 * bytes than requested, it is instead mixed from the pid, a process-local
 * counter and the header address. The result is never 0. */
static inline uint64_t rb_gen_bitmap_id(const void *hdr_addr) {
    static uint32_t rb_id_counter = 0;
    uint64_t id = 0;

    if (getrandom(&id, sizeof id, 0) != (ssize_t)sizeof id) {
        /* Fallback: overwrite whatever getrandom() may have partially filled. */
        const uint32_t seq = __atomic_add_fetch(&rb_id_counter, 1, __ATOMIC_RELAXED);
        const uint64_t pid_bits = (uint64_t)(uint32_t)getpid() << 32;
        const uint64_t seq_bits = (uint64_t)seq * 0x9E3779B97F4A7C15ull;
        const uint64_t addr_bits = (uint64_t)(uintptr_t)hdr_addr;
        id = pid_bits ^ seq_bits ^ addr_bits;
    }

    /* 0 is reserved to mean "identity not set". */
    return id != 0 ? id : 0x9E3779B97F4A7C15ull;
}
994              
995 40           static inline void rb_init_header(void *base, uint32_t container_cap, uint64_t total_size) {
996 40           RbLayout L = rb_layout();
997 40           RbHeader *hdr = (RbHeader *)base;
998             /* Zero the header + reader-slot + bucket-table region. A fresh mapping is
999             * OS-zeroed, but zero explicitly for the reopen-of-anon path. */
1000 40           memset(base, 0, (size_t)L.container_pool);
1001 40           hdr->magic = RB_MAGIC;
1002 40           hdr->version = RB_VERSION;
1003 40           hdr->bitmap_id = rb_gen_bitmap_id(base);
1004 40           hdr->container_cap = container_cap;
1005 40           hdr->container_used = 1; /* slot 0 reserved as the NULL sentinel */
1006 40           hdr->free_head = 0;
1007 40           hdr->free_count = 0;
1008 40           hdr->cardinality = 0;
1009 40           hdr->total_size = total_size;
1010 40           hdr->reader_slots_off = L.reader_slots;
1011 40           hdr->bucket_table_off = L.bucket_table;
1012 40           hdr->container_pool_off = L.container_pool;
1013 40           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1014 40           }
1015              
1016 43           static inline RbHandle *rb_setup(void *base, size_t map_size,
1017             const char *path, int backing_fd) {
1018 43           RbHeader *hdr = (RbHeader *)base;
1019 43           RbHandle *h = (RbHandle *)calloc(1, sizeof(RbHandle));
1020 43 50         if (!h) {
1021 0           munmap(base, map_size);
1022 0 0         if (backing_fd >= 0) close(backing_fd);
1023 0           return NULL;
1024             }
1025 43           h->hdr = hdr;
1026 43           h->base = base;
1027 43           h->reader_slots = (RbReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
1028 43           h->mmap_size = map_size;
1029 43 100         h->path = path ? strdup(path) : NULL;
1030 43           h->backing_fd = backing_fd;
1031 43           h->my_slot_idx = UINT32_MAX;
1032 43           return h;
1033             }
1034              
1035             /* Validate a mapped header (shared by rb_create reopen and rb_open_fd).
1036             * Stored geometry wins on reopen. */
1037 3           static inline int rb_validate_header(const RbHeader *hdr, uint64_t file_size) {
1038 3 50         if (hdr->magic != RB_MAGIC) return 0;
1039 3 50         if (hdr->version != RB_VERSION) return 0;
1040 3 50         if (hdr->bitmap_id == 0) return 0; /* identity must have been set at create */
1041 3 50         if (hdr->container_cap < 1 || hdr->container_cap > RB_MAX_CONTAINERS) return 0;
    50          
1042 3 50         if (hdr->total_size != file_size) return 0;
1043 3 50         if (hdr->total_size != rb_total_size(hdr->container_cap)) return 0;
1044 3           RbLayout L = rb_layout();
1045 3 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
1046 3 50         if (hdr->bucket_table_off != L.bucket_table) return 0;
1047 3 50         if (hdr->container_pool_off != L.container_pool) return 0;
1048 3 50         if (hdr->container_used < 1 || hdr->container_used > hdr->container_cap) return 0;
    50          
1049 3 50         if (hdr->free_head >= hdr->container_cap) return 0;
1050 3 50         if (hdr->free_count > hdr->container_cap) return 0;
1051 3           return 1;
1052             }
1053              
1054 43           static RbHandle *rb_create(const char *path, uint64_t container_cap_in, char *errbuf) {
1055 43 100         if (!rb_validate_create_args(container_cap_in, errbuf)) return NULL;
1056 41           uint32_t container_cap = (uint32_t)container_cap_in;
1057              
1058 41           uint64_t total = rb_total_size(container_cap);
1059 41           int anonymous = (path == NULL);
1060 41           int fd = -1;
1061             size_t map_size;
1062             void *base;
1063              
1064 41 100         if (anonymous) {
1065 34           map_size = (size_t)total;
1066 34           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
1067 34 50         if (base == MAP_FAILED) { RB_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
1068             } else {
1069 7           fd = open(path, O_RDWR|O_CREAT, 0666);
1070 10 50         if (fd < 0) { RB_ERR("open: %s", strerror(errno)); return NULL; }
    0          
1071 7 50         if (flock(fd, LOCK_EX) < 0) { RB_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
1072             struct stat st;
1073 7 50         if (fstat(fd, &st) < 0) { RB_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1074 7           int is_new = (st.st_size == 0);
1075 7 100         if (!is_new && (uint64_t)st.st_size < sizeof(RbHeader)) {
    100          
1076 1 50         RB_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
1077 1           flock(fd, LOCK_UN); close(fd); return NULL;
1078             }
1079 6 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
1080 0 0         RB_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
1081             }
1082 6 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
1083 6           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1084 6 50         if (base == MAP_FAILED) { RB_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
1085 6 100         if (!is_new) {
1086 2 50         if (!rb_validate_header((RbHeader *)base, (uint64_t)st.st_size)) {
1087 0 0         RB_ERR("invalid roaring-bitmap file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1088             }
1089 2           flock(fd, LOCK_UN); close(fd);
1090 2           return rb_setup(base, map_size, path, -1);
1091             }
1092             }
1093 38           rb_init_header(base, container_cap, total);
1094 38 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
1095 38           return rb_setup(base, map_size, path, -1);
1096             }
1097              
1098 3           static RbHandle *rb_create_memfd(const char *name, uint64_t container_cap_in, char *errbuf) {
1099 3 100         if (!rb_validate_create_args(container_cap_in, errbuf)) return NULL;
1100 2           uint32_t container_cap = (uint32_t)container_cap_in;
1101              
1102 2           uint64_t total = rb_total_size(container_cap);
1103 2 100         int fd = memfd_create(name ? name : "roaring", MFD_CLOEXEC | MFD_ALLOW_SEALING);
1104 2 50         if (fd < 0) { RB_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1105 2 50         if (ftruncate(fd, (off_t)total) < 0) {
1106 0 0         RB_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
1107             }
1108 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
1109 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1110 2 50         if (base == MAP_FAILED) { RB_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
1111 2           rb_init_header(base, container_cap, total);
1112 2           return rb_setup(base, (size_t)total, NULL, fd);
1113             }
1114              
1115 2           static RbHandle *rb_open_fd(int fd, char *errbuf) {
1116 2 50         if (errbuf) errbuf[0] = '\0';
1117             struct stat st;
1118 2 50         if (fstat(fd, &st) < 0) { RB_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
1119 2 100         if ((uint64_t)st.st_size < sizeof(RbHeader)) { RB_ERR("too small"); return NULL; }
    50          
1120 1           size_t ms = (size_t)st.st_size;
1121 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1122 1 50         if (base == MAP_FAILED) { RB_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
1123 1 50         if (!rb_validate_header((RbHeader *)base, (uint64_t)st.st_size)) {
1124 0 0         RB_ERR("invalid roaring-bitmap file"); munmap(base, ms); return NULL;
1125             }
1126 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
1127 1 50         if (myfd < 0) { RB_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
1128 1           return rb_setup(base, ms, NULL, myfd);
1129             }
1130              
1131 43           static void rb_destroy(RbHandle *h) {
1132 43 50         if (!h) return;
1133 43 100         if (h->backing_fd >= 0) close(h->backing_fd);
1134 43 50         if (h->base) munmap(h->base, h->mmap_size);
1135 43           free(h->path);
1136 43           free(h);
1137             }
1138              
1139 2           static inline int rb_msync(RbHandle *h) {
1140 2 50         if (!h || !h->base) return 0;
    50          
1141 2           return msync(h->base, h->mmap_size, MS_SYNC);
1142             }
1143              
1144             #endif /* ROARING_H */