File Coverage

cms.h
Criterion Covered Total %
statement 223 351 63.5
branch 102 272 37.5
condition n/a
subroutine n/a
pod n/a
total 325 623 52.1


line stmt bran cond sub pod time code
1             /*
2             * cms.h -- Shared-memory Count-Min sketch for Linux
3             *
4             * Approximate frequency estimation over a stream in fixed memory. Each item is
5             * hashed once (XXH3-128); the two 64-bit halves drive one column per row
6             * (d-row double hashing) into a d x w counter matrix (w a power of two). add
7             * increments the d cells; estimate returns the minimum of the d cells, which
8             * never underestimates the true count and overestimates by at most epsilon*total
9             * with probability >= 1-delta. The matrix lives in a shared mapping so several
10             * processes share one sketch; a write-preferring futex rwlock with reader-slot
11             * dead-process recovery guards mutation. Two sketches of equal geometry can be
12             * merged (cellwise add -> sum of streams).
13             *
14             * Layout: Header -> reader_slots[1024] -> counters[d * w]
15             */
16              
17             #ifndef CMS_H
18             #define CMS_H
19              
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37              
38             #define XXH_INLINE_ALL
39             #include "xxhash.h"
40              
41             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
42             #error "cms.h: requires little-endian architecture"
43             #endif
44              
45              
46             /* ================================================================
47             * Constants
48             * ================================================================ */
49              
/* Shared-format identity: the header must carry these exact values. */
#define CMS_MAGIC   0x534D4F43U   /* bytes "COMS" when stored little-endian */
#define CMS_VERSION 1

/* Capacity of caller-supplied error buffers, in bytes (NUL included). */
#define CMS_ERR_BUFLEN 256

/* Size of the shared reader-slot table: at most this many processes get
 * dead-process recovery tracking; extra handles still lock, untracked. */
#define CMS_READER_SLOTS 1024

/* Geometry bounds: w (columns) is a power of two in [CMS_MIN_W, CMS_MAX_W],
 * d (rows) lies in [CMS_MIN_D, CMS_MAX_D]. */
#define CMS_MIN_W 2
#define CMS_MAX_W 0x100000000ULL   /* 2^32 columns */
#define CMS_MIN_D 1
#define CMS_MAX_D 32

/* Write a formatted message into `errbuf` (a `char *` that must be in scope
 * at the expansion site) when it is non-NULL; output is truncated to
 * CMS_ERR_BUFLEN bytes. */
#define CMS_ERR(fmt, ...)                                          \
    do {                                                           \
        if (errbuf)                                                \
            snprintf(errbuf, CMS_ERR_BUFLEN, fmt, ##__VA_ARGS__);  \
    } while (0)
60              
61             /* ================================================================
62             * Structs
63             * ================================================================ */
64              
/* Per-process slot for dead-process recovery. Each shared rwlock counter
 * (the main rwlock-reader count, rwlock_waiters, rwlock_writers_waiting)
 * is mirrored here so a wrlock timeout can attribute and reverse a dead
 * process's contribution instead of waiting for the slow per-op timeout
 * drain.
 *
 * A table of CMS_READER_SLOTS of these sits in the shared mapping directly
 * after CmsHeader (see cms_layout), so this 4 x uint32_t (16-byte) layout is
 * part of the shared format. All fields are accessed with __atomic builtins. */
typedef struct {
    uint32_t pid; /* 0 = unclaimed; claimed by CAS 0 -> getpid(), released by CAS pid -> 0 */
    uint32_t subcount; /* in-flight rdlock acquisitions for this process; raised before the
                          shared rwlock unit is taken and lowered after it is released */
    uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
    uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
} CmsReaderSlot;
76              
/* Shared header at offset 0 of the mapping. Every process that maps the
 * sketch reads geometry and region offsets from here, so the field offsets
 * (trailing comments) are part of the shared format; the total size is
 * pinned to 256 bytes by the _Static_assert below. */
struct CmsHeader {
    uint32_t magic, version; /* 0,4  CMS_MAGIC / CMS_VERSION, checked by cms_validate_header */
    uint32_t d; /* 8 rows (hash functions), in [1,32] */
    uint32_t _pad0; /* 12 */
    uint64_t w; /* 16 columns (power of two) */
    uint64_t mask; /* 24 w - 1 (column index mask) */
    uint64_t total; /* 32 sum of all increments (for stats) */
    uint64_t total_size; /* 40 bytes in the whole mapping; must equal the file size on open */
    uint64_t reader_slots_off; /* 48 byte offset of the CmsReaderSlot table */
    uint64_t counters_off; /* 56 byte offset of the uint64_t counter matrix */
    uint32_t rwlock; /* 64 lock word: 0 = free, 1..0x7FFFFFFF = active reader count,
                        0x80000000 | pid = write-held by pid */
    uint32_t rwlock_waiters; /* 68 lock callers parked (or about to park) on the rwlock futex;
                                unlock paths only issue FUTEX_WAKE while this is > 0 */
    uint32_t rwlock_writers_waiting; /* 72 parked writers; while > 0, rdlock will not take a
                                        free (0) lock -- the write preference */
    uint32_t _pad1; /* 76 */
    uint64_t stat_ops; /* 80 */
    uint8_t _pad[168]; /* 88..255 */
};
typedef struct CmsHeader CmsHeader;

_Static_assert(sizeof(CmsHeader) == 256, "CmsHeader must be 256 bytes");
97              
98             /* ---- Process-local handle ---- */
99              
/* Process-local view of a mapped sketch, heap-allocated by cms_setup. The
 * pointers refer into this process's own mapping. The slot/pid cache
 * fields are refreshed by cms_claim_reader_slot on the lock paths. */
typedef struct CmsHandle {
    CmsHeader *hdr; /* header at the start of the mapping (same address as base) */
    CmsReaderSlot *reader_slots; /* CMS_READER_SLOTS entries */
    void *base; /* mmap base */
    size_t mmap_size; /* length of the mapping at base */
    char *path; /* backing file path (strdup'd) */
    int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle);
                             also UINT32_MAX until the first lock call claims a slot */
    uint32_t cached_pid; /* getpid() cached at last slot claim; wrlock encodes it in the lock word */
    uint32_t cached_fork_gen; /* cms_fork_gen value at last slot claim */
} CmsHandle;
111              
112             /* ================================================================
113             * Futex-based write-preferring read-write lock
114             * with reader-slot dead-process recovery
115             * ================================================================ */
116              
/* Busy-wait rounds a lock caller makes before parking on the futex. */
#define CMS_RWLOCK_SPIN_LIMIT 32
/* FUTEX_WAIT timeout (seconds); on expiry the waiter runs stale-lock /
 * dead-process recovery and then retries. */
#define CMS_LOCK_TIMEOUT_SEC 2

/* CPU relax hint for spin loops ("yield" on aarch64, "pause" on x86).
 * Elsewhere it is only a compiler barrier. Every variant clobbers memory,
 * so the compiler reloads shared values after each call. */
static inline void cms_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
129              
/* Write-held lock word encoding: the top bit flags "writer" and the low 31
 * bits carry the owner's PID, so stale-lock recovery can probe the owner. */
#define CMS_RWLOCK_WRITER_BIT 0x80000000U
#define CMS_RWLOCK_PID_MASK   0x7FFFFFFFU
#define CMS_RWLOCK_WR(pid)    (((uint32_t)(pid) & CMS_RWLOCK_PID_MASK) | CMS_RWLOCK_WRITER_BIT)
134              
/* Liveness probe for a recorded owner PID.
 *
 * Returns 0 only when kill(pid, 0) fails with ESRCH (no such process).
 * Returns 1 for pid 0 (no owner recorded), for a successful probe, and for
 * any other failure (e.g. EPERM, where the target exists but we may not
 * signal it). Signal 0 only performs the existence/permission check.
 *
 * NOTE: cannot detect PID reuse -- if a dead lock-holder's PID is recycled
 * to an unrelated live process before recovery runs, this reports "alive"
 * and that slot's orphaned contribution is not reclaimed until the recycled
 * process exits. Robust detection would require a per-slot process-start-time
 * epoch (a header-layout/version change). Documented under "Crash Safety" in
 * the POD. */
static inline int cms_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;
    return errno != ESRCH;
}
146              
147             /* Force-recover a stale write lock left by a dead process.
148             * CAS to OUR pid to hold the lock while fixing shared state, then release.
149             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
150             * recovering process can detect and re-recover if we crash mid-recovery. */
151 0           static inline void cms_recover_stale_lock(CmsHandle *h, uint32_t observed_rwlock) {
152 0           CmsHeader *hdr = h->hdr;
153 0           uint32_t mypid = CMS_RWLOCK_WR((uint32_t)getpid());
154 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
155             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
156 0           return;
157             /* We now hold the write lock as mypid. No additional shared state needs
158             * repair here (this module has no seqlock); just release the lock. */
159 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
160 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
161 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
162             }
163              
164             static const struct timespec cms_lock_timeout = { CMS_LOCK_TIMEOUT_SEC, 0 };
165              
/* Process-global fork generation. The pthread_atfork child handler bumps
 * it, so every open handle notices a fork on its next lock call by
 * comparing against its cached value -- no getpid() syscall on the hot
 * path. */
static uint32_t cms_fork_gen = 1;
static pthread_once_t cms_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child hook: invalidates every handle's cached pid/slot. */
static void cms_on_fork_child(void) {
    (void)__atomic_fetch_add(&cms_fork_gen, 1, __ATOMIC_RELAXED);
}

/* Registers the child hook; meant to run exactly once via pthread_once.
 * The result of pthread_atfork is not checked. */
static void cms_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, cms_on_fork_child);
}
177              
/* Ensure this process owns a reader slot. Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's. Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch, or while this handle holds no slot
 * (my_slot_idx == UINT32_MAX), do we touch getpid() and scan slots.
 *
 * Cold path: registers the atfork hook (once per process), refreshes
 * cached_pid / cached_fork_gen, then probes the table starting at
 * pid % CMS_READER_SLOTS (wrapping) and claims the first free slot
 * (pid == 0) with a CAS.
 *
 * NOTE(review): when every slot is taken, my_slot_idx stays UINT32_MAX,
 * so the fast-path test fails on every later lock call and the whole cold
 * path (pthread_once, getpid(), up to CMS_READER_SLOTS CAS attempts) is
 * repeated each time. That lets the handle pick up a slot freed later, but
 * it is a per-lock cost while the table is exhausted. */
static inline void cms_claim_reader_slot(CmsHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&cms_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&cms_atfork_once, cms_atfork_init);
    /* Re-read after pthread_once: cms_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&cms_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();
    h->cached_pid = now_pid;
    h->cached_fork_gen = cur_gen;
    /* Forget any previous index: after a fork that slot belongs to the parent. */
    h->my_slot_idx = UINT32_MAX;
    /* Start probing at a pid-derived position to spread processes over the table. */
    uint32_t start = now_pid % CMS_READER_SLOTS;
    for (uint32_t i = 0; i < CMS_READER_SLOTS; i++) {
        uint32_t s = (start + i) % CMS_READER_SLOTS;
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
                                        &expected, now_pid, 0,
                                        __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
            /* Zero all mirror fields, not just subcount: a SIGKILL'd
             * predecessor may have left waiters_parked/writers_parked
             * non-zero, and cms_recover_dead_readers won't drain them
             * once we own the slot (the CAS expects the dead PID). */
            __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
            __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
            __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
            h->my_slot_idx = s;
            return;
        }
    }
    /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
     * tracking for this handle (lock still works; just no recovery). */
}
215              
/* Saturating atomic subtract: *p becomes max(*p - sub, 0) in one atomic
 * update (relaxed CAS retry loop), so draining a dead process's share can
 * never wrap a counter below zero. sub == 0 returns without touching *p. */
static inline void cms_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;
    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        next = (seen > sub) ? seen - sub : 0;
    } while (!__atomic_compare_exchange_n(p, &seen, next, 1,
                                          __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
227              
228             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
229             * contributions back to the global counters. A no-op if the slot was stolen
230             * by another recoverer or had no waiter contribution to drain.
231             *
232             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
233             * Between our CAS and a follow-up store, a new process could claim the
234             * slot and start populating these fields -- our stores would clobber its
235             * state. cms_claim_reader_slot zeros all three on every claim, so
236             * leaving stale values is harmless. */
237 0           static inline void cms_drain_dead_slot(CmsHandle *h, uint32_t i, uint32_t pid) {
238 0           CmsHeader *hdr = h->hdr;
239 0           uint32_t expected = pid;
240             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
241             * ACQUIRE syncs us with prior writes from the dead process to
242             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
243             * a plain RELAXED load before the CAS could miss those writes;
244             * loading them after the CAS keeps them inside the acquire window. */
245 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
246             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
247 0           return;
248 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
249 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
250 0 0         if (wp) cms_atomic_sub_cap(&hdr->rwlock_waiters, wp);
251 0 0         if (writp) cms_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
252             }
253              
254             /* Scan reader slots for dead-process recovery.
255             *
256             * For each dead PID with non-zero contributions to the shared rwlock,
257             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
258             * out so live processes don't have to wait for the slow per-op timeout
259             * decrement to drain it for them.
260             *
261             * For the main rwlock counter we use the "no live reader holds -> force-
262             * reset to 0" trick (precise) because per-process attribution of the
263             * subcount is racy across the inc-counter-then-inc-subcount window. */
264 0           static inline void cms_recover_dead_readers(CmsHandle *h) {
265 0 0         if (!h->reader_slots) return;
266 0           CmsHeader *hdr = h->hdr;
267 0           int any_live_reader = 0;
268 0           int found_dead_reader = 0;
269              
270             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
271             * contribution to lose) are wiped immediately to free the slot for
272             * future claimants and drain any orphan parked-waiter counters. Slots
273             * with dead pid and sc > 0 are left intact in this pass: if force-
274             * reset cannot fire (because a live reader is concurrently present),
275             * wiping the dead slot would lose the only record of its orphan
276             * rwlock contribution and strand writers permanently once the live
277             * reader releases. */
278 0 0         for (uint32_t i = 0; i < CMS_READER_SLOTS; i++) {
279 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
280 0 0         if (pid == 0) continue;
281 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
282 0 0         if (cms_pid_alive(pid)) {
283 0 0         if (sc > 0) any_live_reader = 1;
284 0           continue;
285             }
286 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
287 0           cms_drain_dead_slot(h, i, pid);
288             }
289              
290             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
291             * reset CAS FIRST, while the window since pass 1's last scan is
292             * still narrow (a handful of instructions, as in the original
293             * single-pass code). A new reader that started rdlock between
294             * pass 1's scan and the CAS will either:
295             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
296             * fails (cur mismatched), recovery yields and a future
297             * cycle retries; or
298             * (b) be still in the subcount-bump phase -- our CAS sees the
299             * stale cur and resets to 0; the new reader's subsequent CAS
300             * rwlock(0 -> 1) succeeds cleanly.
301             * Only after the CAS resolves do we wipe the deferred dead slots,
302             * keeping that work outside the race-sensitive window. */
303 0 0         if (found_dead_reader && !any_live_reader) {
    0          
304 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
305 0 0         if (cur > 0 && cur < CMS_RWLOCK_WRITER_BIT) {
    0          
306 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
307             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
308 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
309 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
310             }
311             }
312 0 0         for (uint32_t i = 0; i < CMS_READER_SLOTS; i++) {
313 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
314 0 0         if (pid == 0 || cms_pid_alive(pid)) continue;
    0          
315 0           cms_drain_dead_slot(h, i, pid);
316             }
317             }
318             }
319              
320             /* Inspect the lock word after a futex-wait timeout. If a dead writer
321             * holds it, force-recover the lock. Otherwise drain dead readers' shares
322             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
323             * branches -- identical recovery logic in both. */
324 0           static inline void cms_recover_after_timeout(CmsHandle *h) {
325 0           CmsHeader *hdr = h->hdr;
326 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
327 0 0         if (val >= CMS_RWLOCK_WRITER_BIT) {
328 0           uint32_t pid = val & CMS_RWLOCK_PID_MASK;
329 0 0         if (!cms_pid_alive(pid))
330 0           cms_recover_stale_lock(h, val);
331             } else {
332 0           cms_recover_dead_readers(h);
333             }
334 0           }
335              
336             /* Park/unpark helpers: bump the global waiter counters together with this
337             * process's mirrored slot counters so a wrlock-timeout recovery scan can
338             * attribute and reverse a dead PID's contribution. Kept paired to make
339             * accidental drift between global and per-slot counts impossible. */
340 0           static inline void cms_park_reader(CmsHandle *h) {
341 0 0         if (h->my_slot_idx != UINT32_MAX)
342 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
343 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
344 0           }
345 0           static inline void cms_unpark_reader(CmsHandle *h) {
346 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
347 0 0         if (h->my_slot_idx != UINT32_MAX)
348 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
349 0           }
350 0           static inline void cms_park_writer(CmsHandle *h) {
351 0 0         if (h->my_slot_idx != UINT32_MAX) {
352 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
353 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
354             }
355 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
356 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
357 0           }
358 0           static inline void cms_unpark_writer(CmsHandle *h) {
359 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
360 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
361 0 0         if (h->my_slot_idx != UINT32_MAX) {
362 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
363 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
364             }
365 0           }
366              
/* Acquire the rwlock in shared (read) mode; blocks until it succeeds.
 *
 * Ensures this process owns a reader slot (fork-aware) and raises the
 * slot's subcount, then loops:
 *   - reader-held word (1..0x7FFFFFFF): join with CAS cur -> cur + 1;
 *   - free word (0): take it with CAS 0 -> 1, but only while no writer is
 *     parked (rwlock_writers_waiting == 0) -- the write preference;
 *   - otherwise spin with a CPU pause up to CMS_RWLOCK_SPIN_LIMIT rounds,
 *     then register as a parked waiter and FUTEX_WAIT (with
 *     cms_lock_timeout) on the observed value. A timeout runs
 *     cms_recover_after_timeout before retrying.
 * It returns only from a successful CAS, i.e. while holding one read unit.
 * The subcount raised on entry stays raised for the whole attempt and is
 * dropped by cms_rwlock_rdunlock.
 *
 * NOTE(review): rwlock_writers_waiting is bumped only by parked writers
 * (cms_park_writer), so a writer still in its spin phase does not yet hold
 * off new readers. */
static inline void cms_rwlock_rdlock(CmsHandle *h) {
    cms_claim_reader_slot(h);
    CmsHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter. This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS. Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < CMS_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        /* Weak CASes may fail spuriously; just go around again while spinning. */
        if (__builtin_expect(spin < CMS_RWLOCK_SPIN_LIMIT, 1)) {
            cms_rwlock_spin_pause();
            continue;
        }
        /* Spin budget spent: register as a waiter before re-sampling the word,
         * so an unlocker that zeroes the word after this point sees
         * rwlock_waiters > 0 and issues a wake. */
        cms_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= CMS_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &cms_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Nobody woke us within the timeout: a holder may have died. */
                cms_unpark_reader(h);
                cms_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Any other outcome (woken, word already changed, or reader-held when
         * re-sampled): unpark and retry with a fresh spin budget. */
        cms_unpark_reader(h);
        spin = 0;
    }
}
416              
417 10038           static inline void cms_rwlock_rdunlock(CmsHandle *h) {
418 10038           CmsHeader *hdr = h->hdr;
419             /* Release the shared counter BEFORE dropping our subcount so that
420             * "any live PID with subcount > 0" is a reliable in-flight indicator
421             * for the writer-side recovery scan. Inverting these would create a
422             * window where we still own a unit of rwlock but our slot subcount is
423             * 0, letting recovery force-reset rwlock underneath us. */
424 10038           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
425 10038 50         if (h->my_slot_idx != UINT32_MAX)
426 10038           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
427 10038 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
428 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
429 10038           }
430              
/* Acquire the rwlock exclusively; blocks until it succeeds.
 *
 * Refreshes the reader slot and cached_pid (fork-aware), then tries
 * CAS 0 -> (WRITER_BIT | pid), so the owner's PID lives in the lock word.
 * On failure it spins up to CMS_RWLOCK_SPIN_LIMIT rounds, then parks:
 * parking bumps rwlock_waiters and rwlock_writers_waiting, and the latter
 * stops rdlock from taking a free lock. It then FUTEX_WAITs on the observed
 * non-zero value with cms_lock_timeout. A timeout runs
 * cms_recover_after_timeout before retrying: a dead writer is
 * force-released, otherwise dead readers are drained. */
static inline void cms_rwlock_wrlock(CmsHandle *h) {
    cms_claim_reader_slot(h); /* refresh cached_pid across fork */
    CmsHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = CMS_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* Only a completely free word (no readers, no writer) can be taken. */
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < CMS_RWLOCK_SPIN_LIMIT, 1)) {
            cms_rwlock_spin_pause();
            continue;
        }
        /* Spin budget spent: advertise ourselves (also turns on the write
         * preference in rdlock) before re-sampling the word. */
        cms_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* Sleep until the word changes and someone wakes us, or timeout.
             * rdunlock only wakes when the reader count reaches 0. */
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &cms_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Holder may be dead: run recovery, then retry. */
                cms_unpark_writer(h);
                cms_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Word was free when re-sampled, or the wait ended another way:
         * unpark and retry with a fresh spin budget. */
        cms_unpark_writer(h);
        spin = 0;
    }
}
463              
464 7134           static inline void cms_rwlock_wrunlock(CmsHandle *h) {
465 7134           CmsHeader *hdr = h->hdr;
466 7134           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
467 7134 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
468 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
469 7134           }
470              
471             /* ================================================================
472             * Layout math + create / open / destroy
473             *
474             * Layout: Header -> reader_slots[1024] -> counters[d * w]
475             * ================================================================ */
476              
477             /* Single source of truth for the mmap region layout offsets. */
478             typedef struct { uint64_t reader_slots, counters; } CmsLayout;
479              
480 56           static inline CmsLayout cms_layout(void) {
481             CmsLayout L;
482 56           L.reader_slots = sizeof(CmsHeader);
483 56           L.counters = L.reader_slots + (uint64_t)CMS_READER_SLOTS * sizeof(CmsReaderSlot);
484 56           L.counters = (L.counters + 7) & ~(uint64_t)7; /* 8-byte align the counter matrix (uint64_t words) */
485 56           return L;
486             }
487              
488 29           static inline uint64_t cms_total_size(uint64_t w, uint32_t d) {
489 29           CmsLayout L = cms_layout();
490 29           return L.counters + (uint64_t)d * w * sizeof(uint64_t); /* d*w uint64_t cells */
491             }
492              
493             /* round v up to the next power of two (64-bit), with a floor of CMS_MIN_W */
494 27           static inline uint64_t cms_next_pow2_u64(uint64_t v) {
495 27 50         if (v <= CMS_MIN_W) return CMS_MIN_W;
496 27           return 1ULL << (64 - __builtin_clzll(v - 1));
497             }
498              
499 25           static inline void cms_init_header(void *base, uint64_t w, uint32_t d,
500             uint64_t total_size) {
501 25           CmsLayout L = cms_layout();
502 25           CmsHeader *hdr = (CmsHeader *)base;
503             /* Explicitly zero the header + reader-slot region (lock-recovery state);
504             the counter matrix relies on the fresh mapping being OS zero-filled. */
505 25           memset(base, 0, (size_t)L.counters);
506 25           hdr->magic = CMS_MAGIC;
507 25           hdr->version = CMS_VERSION;
508 25           hdr->d = d;
509 25           hdr->w = w;
510 25           hdr->mask = w - 1;
511 25           hdr->total = 0;
512 25           hdr->total_size = total_size;
513 25           hdr->reader_slots_off = L.reader_slots;
514 25           hdr->counters_off = L.counters;
515 25           __atomic_thread_fence(__ATOMIC_SEQ_CST);
516 25           }
517              
518 19651           static inline uint64_t *cms_counters(CmsHandle *h) {
519 19651           return (uint64_t *)((char *)h->base + h->hdr->counters_off);
520             }
521              
522 27           static inline CmsHandle *cms_setup(void *base, size_t map_size,
523             const char *path, int backing_fd) {
524 27           CmsHeader *hdr = (CmsHeader *)base;
525 27           CmsHandle *h = (CmsHandle *)calloc(1, sizeof(CmsHandle));
526 27 50         if (!h) {
527 0           munmap(base, map_size);
528 0 0         if (backing_fd >= 0) close(backing_fd);
529 0           return NULL;
530             }
531 27           h->hdr = hdr;
532 27           h->base = base;
533 27           h->reader_slots = (CmsReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
534 27           h->mmap_size = map_size;
535 27 100         h->path = path ? strdup(path) : NULL;
536 27           h->backing_fd = backing_fd;
537 27           h->my_slot_idx = UINT32_MAX;
538 27           return h;
539             }
540              
541             /* Validate a mapped header (shared by cms_create reopen and cms_open_fd). */
542 2           static inline int cms_validate_header(const CmsHeader *hdr, uint64_t file_size) {
543 2 50         if (hdr->magic != CMS_MAGIC) return 0;
544 2 50         if (hdr->version != CMS_VERSION) return 0;
545 2 50         if (hdr->d < CMS_MIN_D || hdr->d > CMS_MAX_D) return 0;
    50          
546 2 50         if (hdr->w < CMS_MIN_W || hdr->w > CMS_MAX_W) return 0;
    50          
547 2 50         if ((hdr->w & (hdr->w - 1)) != 0) return 0; /* power of two */
548 2 50         if (hdr->mask != hdr->w - 1) return 0;
549 2 50         if (hdr->total_size != file_size) return 0;
550 2 50         if (hdr->total_size != cms_total_size(hdr->w, hdr->d)) return 0;
551 2           CmsLayout L = cms_layout();
552 2 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
553 2 50         if (hdr->counters_off != L.counters) return 0;
554 2           return 1;
555             }
556              
557             /* validate args + compute the geometry (w, d).
558             * w = next_pow2(ceil(M_E / epsilon)), floor CMS_MIN_W, cap CMS_MAX_W
559             * d = ceil(log(1 / delta)) clamped to [1, 32] */
560 35           static int cms_validate_create_args(double epsilon, double delta,
561             uint64_t *w_out, uint32_t *d_out, char *errbuf) {
562 35 50         if (errbuf) errbuf[0] = '\0';
563 35 100         if (!(epsilon > 0.0 && epsilon < 1.0)) { CMS_ERR("epsilon must be between 0 and 1 (exclusive)"); return 0; }
    100          
    50          
564 31 100         if (!(delta > 0.0 && delta < 1.0)) { CMS_ERR("delta must be between 0 and 1 (exclusive)"); return 0; }
    100          
    50          
565              
566             /* w = next_pow2(ceil(e / epsilon)), floor CMS_MIN_W. Reject (don't silently
567             clamp) an epsilon so small the column count would exceed the cap, else the
568             achieved error bound would be worse than requested (and the mapping huge). */
569 28           double w_opt_d = ceil(M_E / epsilon);
570 28 100         if (w_opt_d > (double)CMS_MAX_W) { CMS_ERR("epsilon too small for the column cap"); return 0; }
    50          
571 27           uint64_t w = cms_next_pow2_u64((uint64_t)w_opt_d);
572              
573             /* d = ceil(log(1/delta)) clamped to [1, 32] */
574 27           double d_d = ceil(log(1.0 / delta));
575 27           long dl = (long)d_d;
576 27 50         if (dl < CMS_MIN_D) dl = CMS_MIN_D;
577 27 50         if (dl > CMS_MAX_D) dl = CMS_MAX_D;
578 27           uint32_t d = (uint32_t)dl;
579              
580 27           *w_out = w;
581 27           *d_out = d;
582 27           return 1;
583             }
584              
585 31           static CmsHandle *cms_create(const char *path, double epsilon, double delta, char *errbuf) {
586             uint64_t w;
587             uint32_t d;
588 31 100         if (!cms_validate_create_args(epsilon, delta, &w, &d, errbuf)) return NULL;
589              
590 25           uint64_t total = cms_total_size(w, d);
591 25           int anonymous = (path == NULL);
592 25           int fd = -1;
593             size_t map_size;
594             void *base;
595              
596 25 100         if (anonymous) {
597 20           map_size = (size_t)total;
598 20           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
599 20 50         if (base == MAP_FAILED) { CMS_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
600             } else {
601 5           fd = open(path, O_RDWR|O_CREAT, 0666);
602 7 50         if (fd < 0) { CMS_ERR("open: %s", strerror(errno)); return NULL; }
    0          
603 5 50         if (flock(fd, LOCK_EX) < 0) { CMS_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
604             struct stat st;
605 5 50         if (fstat(fd, &st) < 0) { CMS_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
606 5           int is_new = (st.st_size == 0);
607 5 100         if (!is_new && (uint64_t)st.st_size < sizeof(CmsHeader)) {
    100          
608 1 50         CMS_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
609 1           flock(fd, LOCK_UN); close(fd); return NULL;
610             }
611 4 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
612 0 0         CMS_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
613             }
614 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
615 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
616 4 50         if (base == MAP_FAILED) { CMS_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
617 4 100         if (!is_new) {
618 1 50         if (!cms_validate_header((CmsHeader *)base, (uint64_t)st.st_size)) {
619 0 0         CMS_ERR("invalid Count-Min sketch file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
620             }
621 1           flock(fd, LOCK_UN); close(fd);
622 1           return cms_setup(base, map_size, path, -1);
623             }
624             }
625 23           cms_init_header(base, w, d, total);
626 23 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
627 23           return cms_setup(base, map_size, path, -1);
628             }
629              
630 4           static CmsHandle *cms_create_memfd(const char *name, double epsilon, double delta, char *errbuf) {
631             uint64_t w;
632             uint32_t d;
633 4 100         if (!cms_validate_create_args(epsilon, delta, &w, &d, errbuf)) return NULL;
634              
635 2           uint64_t total = cms_total_size(w, d);
636 2 100         int fd = memfd_create(name ? name : "cms", MFD_CLOEXEC | MFD_ALLOW_SEALING);
637 2 50         if (fd < 0) { CMS_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
638 2 50         if (ftruncate(fd, (off_t)total) < 0) {
639 0 0         CMS_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
640             }
641 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
642 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
643 2 50         if (base == MAP_FAILED) { CMS_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
644 2           cms_init_header(base, w, d, total);
645 2           return cms_setup(base, (size_t)total, NULL, fd);
646             }
647              
648 2           static CmsHandle *cms_open_fd(int fd, char *errbuf) {
649 2 50         if (errbuf) errbuf[0] = '\0';
650             struct stat st;
651 2 50         if (fstat(fd, &st) < 0) { CMS_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
652 2 100         if ((uint64_t)st.st_size < sizeof(CmsHeader)) { CMS_ERR("too small"); return NULL; }
    50          
653 1           size_t ms = (size_t)st.st_size;
654 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
655 1 50         if (base == MAP_FAILED) { CMS_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
656 1 50         if (!cms_validate_header((CmsHeader *)base, (uint64_t)st.st_size)) {
657 0 0         CMS_ERR("invalid Count-Min sketch table"); munmap(base, ms); return NULL;
658             }
659 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
660 1 50         if (myfd < 0) { CMS_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
661 1           return cms_setup(base, ms, NULL, myfd);
662             }
663              
664 27           static void cms_destroy(CmsHandle *h) {
665 27 50         if (!h) return;
666 27 100         if (h->backing_fd >= 0) close(h->backing_fd);
667 27 50         if (h->base) munmap(h->base, h->mmap_size);
668 27           free(h->path);
669 27           free(h);
670             }
671              
672 2           static inline int cms_msync(CmsHandle *h) {
673 2 50         if (!h || !h->base) return 0;
    50          
674 2           return msync(h->base, h->mmap_size, MS_SYNC);
675             }
676              
677             /* ================================================================
678             * Count-Min sketch operations (callers hold the lock) -- d-row double
679             * hashing: one XXH3-128 hash drives all d row probes.
680             * Row r's column = (h1 + r*h2) & mask.
681             * ================================================================ */
682              
683 19644           static inline void cms_indices(const void *item, size_t len,
684             uint64_t *h1, uint64_t *h2) {
685 19644           XXH128_hash_t hh = XXH3_128bits(item, len);
686 19644           *h1 = hh.low64;
687 19644           *h2 = hh.high64 | 1ULL; /* force odd so the d row columns spread over the pow2 matrix */
688 19644           }
689              
690             /* add n to each of the d cells; bump total by n (caller holds the write lock) */
691 9626           static void cms_add_locked(CmsHandle *h, const void *item, size_t len, uint64_t n) {
692             uint64_t h1, h2;
693 9626           cms_indices(item, len, &h1, &h2);
694 9626           uint64_t w = h->hdr->w;
695 9626           uint64_t mask = h->hdr->mask;
696 9626           uint32_t d = h->hdr->d;
697 9626           uint64_t *counters = cms_counters(h);
698 77008 100         for (uint32_t r = 0; r < d; r++) {
699 67382           uint64_t c = (h1 + (uint64_t)r * h2) & mask;
700 67382           counters[(uint64_t)r * w + c] += n;
701             }
702 9626           h->hdr->total += n;
703 9626           }
704              
705             /* return the minimum of the d cells -- the Count-Min estimate, which never
706             * underestimates the true count (caller holds a lock) */
707 10018           static uint64_t cms_estimate_locked(CmsHandle *h, const void *item, size_t len) {
708             uint64_t h1, h2;
709 10018           cms_indices(item, len, &h1, &h2);
710 10018           uint64_t w = h->hdr->w;
711 10018           uint64_t mask = h->hdr->mask;
712 10018           uint32_t d = h->hdr->d;
713 10018           uint64_t *counters = cms_counters(h);
714 10018           uint64_t m = UINT64_MAX;
715 80144 100         for (uint32_t r = 0; r < d; r++) {
716 70126           uint64_t c = (h1 + (uint64_t)r * h2) & mask;
717 70126           uint64_t v = counters[(uint64_t)r * w + c];
718 70126 100         if (v < m) m = v;
719             }
720 10018           return m;
721             }
722              
/* Cellwise dst[i] += src[i] for i < cells, clamping to UINT64_MAX on overflow
 * (the merged sketch then represents the sum of both streams). The caller
 * guarantees equal geometry and holds dst's write lock. */
static void cms_merge_counters(uint64_t *dst, const uint64_t *src, uint64_t cells) {
    for (uint64_t i = 0; i != cells; ++i) {
        /* unsigned addition wraps mod 2^64; it wrapped iff the sum is smaller
           than an operand */
        const uint64_t sum = dst[i] + src[i];
        dst[i] = (sum < dst[i]) ? UINT64_MAX : sum;
    }
}
731              
732             /* reset all counters to 0 and total to 0 (caller holds the write lock) */
733 1           static inline void cms_clear_locked(CmsHandle *h) {
734 1           uint64_t cells = (uint64_t)h->hdr->d * h->hdr->w;
735 1           memset(cms_counters(h), 0, (size_t)(cells * sizeof(uint64_t)));
736 1           h->hdr->total = 0;
737 1           }
738              
739             #endif /* CMS_H */