File Coverage

sortedset.h
Criterion Covered Total %
statement 561 689 81.4
branch 339 554 61.1
condition n/a
subroutine n/a
pod n/a
total 900 1243 72.4


line stmt bran cond sub pod time code
1             /*
2             * sortedset.h -- Shared-memory sorted set (ZSET) for Linux
3             *
4             * int64 members ordered by a double score on an order-statistics B+tree (keyed
5             * by the total order (score, member); per-child subtree counts give O(log n)
6             * rank; doubly-linked leaves give sequential range scans), plus a member->score
7             * open-addressed hash index for O(1) lookup by member. Nodes and index slots
8             * live in fixed pools. A write-preferring futex rwlock with reader-slot
9             * dead-process recovery guards mutations.
10             *
11             * Layout: Header -> reader_slots[1024] -> member_index -> node_pool
12             */
13              
14             #ifndef SORTEDSET_H
15             #define SORTEDSET_H
16              
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33              
34             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
35             #error "sortedset.h: requires little-endian architecture"
36             #endif
37              
38              
39             /* ================================================================
40             * Constants
41             * ================================================================ */
42              
43             #define SS_MAGIC 0x53534554U /* "SSET" */
44             #define SS_VERSION 1 /* layout version written to SsHeader.version */
45             #define SS_NONE UINT32_MAX /* "no node" sentinel for node indices and sibling links */
46             #define SS_ERR_BUFLEN 256 /* byte bound SS_ERR passes to snprintf */
47             #define SS_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
48              
49             #define SS_ORDER 16 /* B+tree fanout: max children / max leaf entries */
50             #define SS_MIN (SS_ORDER / 2) /* min children/entries except the root */
51              
                /* Format an error message into the variable named `errbuf` that must be in
                 * scope at the expansion site; a NULL errbuf makes the macro a no-op.
                 * Output is truncated to SS_ERR_BUFLEN bytes by snprintf. */
52             #define SS_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, SS_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
53              
54             /* ================================================================
55             * Structs
56             * ================================================================ */
57              
58             /* B+tree node (one pool, one size). Leaf (is_leaf=1): up to SS_ORDER
59             (score,member) entries in key order, doubly-linked via next/prev. Internal:
60             up to SS_ORDER children with SS_ORDER-1 separators (scores/members) and
61             per-child subtree entry counts (order-statistics, for rank). Parallel arrays
62             so the in-node binary search scans packed scores. Free nodes thread the
63             free-list through `parent`. */
64             /* arrays are SS_ORDER+1 to hold the transient overflow during a split */
65             typedef struct {
66             uint16_t num; /* leaf: #entries; internal: #children */
67             uint8_t is_leaf; /* 1 = leaf node, otherwise internal node */
68             uint8_t _pad; /* explicit padding byte; not read in the visible code */
69             uint32_t parent; /* SS_NONE for root; free-list link when free */
70             uint32_t next, prev; /* leaf sibling links (SS_NONE at the ends) */
71             double scores[SS_ORDER + 1]; /* leaf entries / internal separators (num-1 used) */
72             int64_t members[SS_ORDER + 1]; /* member half of each (score, member) key; parallel to scores[] */
73             uint32_t children[SS_ORDER + 1]; /* internal child node idx */
74             uint32_t counts[SS_ORDER + 1]; /* internal per-child subtree entry counts */
75             } SsNode;
76              
77             /* member -> score open-addressed hash index slot (backward-shift delete) */
78             typedef struct {
79             int64_t member; /* key: the set member stored in this slot */
80             double score; /* value: that member's current score */
81             uint8_t state; /* 0 empty, 1 occupied */
82             } SsIdxSlot;
83              
84             /* Per-process slot for dead-process recovery. Each shared rwlock counter
85             * (the main rwlock-reader count, rwlock_waiters, rwlock_writers_waiting)
86             * is mirrored here so a wrlock timeout can attribute and reverse a dead
87             * process's contribution instead of waiting for the slow per-op timeout
88             * drain. */
                /* A slot is claimed by CAS-ing pid from 0 to the claimant's pid
                 * (ss_claim_reader_slot) and released by CAS-ing a dead pid back to 0
                 * (ss_drain_dead_slot). All fields are accessed with __atomic builtins. */
89             typedef struct {
90             uint32_t pid; /* 0 = unclaimed */
91             uint32_t subcount; /* in-flight rdlock acquisitions for this process */
92             uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
93             uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
94             } SsReaderSlot;
95              
96             struct SsHeader { /* shared header at offset 0 of the mapping; trailing numbers in the field comments are byte offsets */
97             uint32_t magic, version; /* 0,4 SS_MAGIC / SS_VERSION, written by ss_init_header */
98             uint32_t max_entries; /* 8 entry capacity */
99             uint32_t node_capacity; /* 12 node-pool size */
100             uint32_t index_slots; /* 16 member-index slots (power of two) */
101             uint32_t count; /* 20 live entries */
102             uint64_t total_size; /* 24 total byte size of the region as passed to ss_init_header */
103             uint64_t reader_slots_off; /* 32 byte offset of the SsReaderSlot array */
104             uint64_t index_off; /* 40 byte offset of the SsIdxSlot array */
105             uint64_t nodes_off; /* 48 byte offset of the SsNode pool (8-byte aligned by ss_layout) */
106             uint32_t root; /* 56 root node idx (SS_NONE if empty) */
107             uint32_t height; /* 60 tree height (0 = empty) */
108             uint32_t leftmost; /* 64 leftmost leaf idx */
109             uint32_t rightmost; /* 68 rightmost leaf idx */
110             uint32_t node_free_head; /* 72 head of the node free-list threaded through SsNode.parent */
111             uint32_t rwlock; /* 76 lock word: 0 free, 1..0x7FFFFFFF reader count, SS_RWLOCK_WRITER_BIT|pid when write-locked */
112             uint32_t rwlock_waiters; /* 80 processes currently parked on the futex (readers and writers) */
113             uint32_t rwlock_writers_waiting; /* 84 parked writers; non-zero makes new readers yield when the lock is free */
114             uint64_t stat_ops; /* 88 NOTE(review): never touched in the visible code; presumably an operation counter -- confirm */
115             uint8_t _pad[160]; /* 96..255 */
116             };
117             typedef struct SsHeader SsHeader;
118              
119             _Static_assert(sizeof(SsHeader) == 256, "SsHeader must be 256 bytes");
120              
121             /* ---- Process-local handle ---- */
122              
123             typedef struct SsHandle { /* heap-allocated by ss_setup; pointers below point into the mapped region */
124             SsHeader *hdr; /* start of the mapping */
125             SsReaderSlot *reader_slots; /* SS_READER_SLOTS entries */
126             SsIdxSlot *index; /* member -> score */
127             SsNode *nodes; /* B+tree node pool */
128             size_t mmap_size; /* mapping length as given to ss_setup */
129             char *path; /* backing file path (strdup'd) */
130             int notify_fd; /* set to -1 by ss_setup; its use is outside this chunk */
131             int backing_fd; /* memfd fd to close on destroy, -1 otherwise */
132             uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
133             uint32_t cached_pid; /* getpid() cached at last slot claim */
134             uint32_t cached_fork_gen; /* ss_fork_gen value at last slot claim */
135             } SsHandle;
136              
137             /* ================================================================
138             * Helpers
139             * ================================================================ */
140              
/* Round v up to the nearest power of two.
 *
 * Returns 1 for v < 2, v itself when v is already a power of two, and
 * otherwise the smallest power of two greater than v.
 *
 * The largest power of two representable in uint32_t is 2^31. For
 * v > 2^31 the naive formula would shift a 32-bit value by 32 bits, which
 * is undefined behaviour, so the result saturates at 2^31 instead. */
static inline uint32_t ss_next_pow2(uint32_t v) {
    if (v < 2) return 1;
    if (v > 0x80000000u) return 0x80000000u; /* saturate: 2^32 does not fit */
    /* v - 1 >= 1 here, so __builtin_clz is well defined (it is undefined for 0),
     * and the shift count is in 1..31. */
    return 1u << (32 - __builtin_clz(v - 1));
}
145              
146             /* member hash: splitmix64 finalizer (good avalanche for int64 keys).
                 * Pure function of m: add a fixed 64-bit constant, then two
                 * xor-shift-multiply rounds and a final xor-shift. Returns the full
                 * 64-bit hash; reducing it to a slot index is left to the caller. */
147 366114           static inline uint64_t ss_hash_member(int64_t m) {
148 366114           uint64_t x = (uint64_t)m + 0x9E3779B97F4A7C15ULL; /* unsigned add: wraps, never overflows */
149 366114           x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9ULL;
150 366114           x = (x ^ (x >> 27)) * 0x94D049BB133111EBULL;
151 366114           return x ^ (x >> 31);
152             }
153              
154             /* total order on (score, member): -1 / 0 / +1 */
                /* Compares key a = (sa, ma) with key b = (sb, mb): score first, member as
                 * tie-break. Note: if either score is NaN both score comparisons are
                 * false, so the result is decided by the members alone. */
155 3294907           static inline int ss_key_cmp(double sa, int64_t ma, double sb, int64_t mb) {
156 3294907 100         if (sa < sb) return -1;
157 2522829 100         if (sa > sb) return 1;
158 2155400 100         return (ma < mb) ? -1 : (ma > mb) ? 1 : 0; /* scores equal (or unordered): order by member */
159             }
160              
161             /* ================================================================
162             * Futex-based write-preferring read-write lock
163             * with reader-slot dead-process recovery
164             * ================================================================ */
165              
166             #define SS_RWLOCK_SPIN_LIMIT 32 /* failed acquisition attempts (with a pause each) before parking on the futex */
167             #define SS_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
168              
169 0           static inline void ss_rwlock_spin_pause(void) {
                /* One busy-wait step for the lock spin loops: an architecture-specific
                 * spin hint where one is known, otherwise an empty asm. The "memory"
                 * clobber makes every variant a compiler barrier as well. */
170             #if defined(__x86_64__) || defined(__i386__)
171 0           __asm__ volatile("pause" ::: "memory");
172             #elif defined(__aarch64__)
173             __asm__ volatile("yield" ::: "memory");
174             #else
175             __asm__ volatile("" ::: "memory");
176             #endif
177 0           }
178              
179             /* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
                /* Lock word encoding: the top bit marks "write-locked" and the low 31 bits
                 * then hold the owner's pid; without the top bit the word is a reader count. */
180             #define SS_RWLOCK_WRITER_BIT 0x80000000U
181             #define SS_RWLOCK_PID_MASK 0x7FFFFFFFU
182             #define SS_RWLOCK_WR(pid) (SS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SS_RWLOCK_PID_MASK))
183              
184             /* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
185             /* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
186             * lock-holder's PID is recycled to an unrelated live process before recovery
187             * runs, this reports "alive" and that slot's orphaned contribution is not
188             * reclaimed until the recycled process exits. Robust detection would require
189             * a per-slot process-start-time epoch (a header-layout/version change).
190             * Documented under "Crash Safety" in the POD. */
191 0           static inline int ss_pid_alive(uint32_t pid) {
192 0 0         if (pid == 0) return 1; /* no owner recorded, assume alive */
193 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH); /* only ESRCH proves death; any other failure (e.g. EPERM) counts as alive */
    0          
194             }
195              
196             /* Force-recover a stale write lock left by a dead process.
197             * CAS to OUR pid to hold the lock while fixing shared state, then release.
198             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
199             * recovering process can detect and re-recover if we crash mid-recovery. */
                /* h: handle whose header holds the lock word.
                 * observed_rwlock: the stale lock value the caller read. The takeover CAS
                 * succeeds only while the word still equals it, so if another process
                 * already recovered (or the word changed) this function does nothing. */
200 0           static inline void ss_recover_stale_lock(SsHandle *h, uint32_t observed_rwlock) {
201 0           SsHeader *hdr = h->hdr;
202 0           uint32_t mypid = SS_RWLOCK_WR((uint32_t)getpid()); /* fresh getpid(), not h->cached_pid */
203 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
204             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
205 0           return;
206             /* We now hold the write lock as mypid. No additional shared state needs
207             * repair here (this module has no seqlock); just release the lock. */
208 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
209 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
210 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); /* wake every parked waiter to retry */
211             }
212              
213             static const struct timespec ss_lock_timeout = { SS_LOCK_TIMEOUT_SEC, 0 }; /* timeout passed to every FUTEX_WAIT in rdlock/wrlock */
214              
215             /* Process-global fork-generation counter. Incremented in the pthread_atfork
216             * child callback so every open handle detects a fork transition on the next
217             * lock call without paying a getpid() syscall on the hot path. */
218             static uint32_t ss_fork_gen = 1; /* starts at 1 so a calloc'd handle (cached_fork_gen == 0) never matches before its first claim */
219             static pthread_once_t ss_atfork_once = PTHREAD_ONCE_INIT; /* guards the one-time pthread_atfork registration */
220 0           static void ss_on_fork_child(void) { /* atfork child handler: bump the generation so handles re-claim a slot */
221 0           __atomic_add_fetch(&ss_fork_gen, 1, __ATOMIC_RELAXED);
222 0           }
223 8           static void ss_atfork_init(void) { /* pthread_once target: register the child-side fork handler only */
224 8           pthread_atfork(NULL, NULL, ss_on_fork_child); /* return value is not checked */
225 8           }
226              
227             /* Ensure this process owns a reader slot. Called from the lock helpers so
228             * that fork()'d children pick up their own slot lazily instead of sharing
229             * the parent's. Hot-path is a single relaxed load + compare; only on a
230             * fork-generation mismatch do we touch getpid() and scan slots. */
                /* Side effects on the cold path: refreshes h->cached_pid and
                 * h->cached_fork_gen, and sets h->my_slot_idx to the claimed slot or to
                 * UINT32_MAX when no slot was free. The previously recorded slot index is
                 * simply dropped here; that slot's pid field is not cleared.
                 * Note: while my_slot_idx stays UINT32_MAX the fast-path test below is
                 * false, so every lock call repeats the cold path and the slot scan. */
231 234404           static inline void ss_claim_reader_slot(SsHandle *h) {
232 234404           uint32_t cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
233 234404 100         if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
    50          
234 234374           return;
235             /* Cold path -- register the atfork hook once per process, then claim. */
236 30           pthread_once(&ss_atfork_once, ss_atfork_init);
237             /* Re-read after pthread_once: ss_on_fork_child may have bumped it. */
238 30           cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
239 30           uint32_t now_pid = (uint32_t)getpid();
240 30           h->cached_pid = now_pid;
241 30           h->cached_fork_gen = cur_gen;
242 30           h->my_slot_idx = UINT32_MAX;
243 30           uint32_t start = now_pid % SS_READER_SLOTS; /* pid-derived starting point for the linear probe */
244 33 50         for (uint32_t i = 0; i < SS_READER_SLOTS; i++) {
245 33           uint32_t s = (start + i) % SS_READER_SLOTS;
246 33           uint32_t expected = 0;
247 33 100         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
248             &expected, now_pid, 0,
249             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
250             /* Zero all mirror fields, not just subcount: a SIGKILL'd
251             * predecessor may have left waiters_parked/writers_parked
252             * non-zero, and ss_recover_dead_readers won't drain them
253             * once we own the slot (the CAS expects the dead PID). */
254 30           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
255 30           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
256 30           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
257 30           h->my_slot_idx = s;
258 30           return;
259             }
260             }
261             /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
262             * tracking for this handle (lock still works; just no recovery). */
263             }
264              
265             /* Atomically subtract `sub` from a counter, capped at 0 (never underflows). */
                /* p: counter to decrement; sub: amount (0 is a no-op). Implemented as a
                 * compare-exchange retry loop so the clamp and the store are one atomic step. */
266 0           static inline void ss_atomic_sub_cap(uint32_t *p, uint32_t sub) {
267 0 0         if (!sub) return;
268 0           uint32_t cur = __atomic_load_n(p, __ATOMIC_RELAXED);
269 0           for (;;) {
270 0 0         uint32_t want = (cur > sub) ? cur - sub : 0;
271 0 0         if (__atomic_compare_exchange_n(p, &cur, want,
272             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) /* weak CAS; on failure cur is reloaded with the current value and we retry */
273 0           return;
274             }
275             }
276              
277             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
278             * contributions back to the global counters. A no-op if the slot was stolen
279             * by another recoverer or had no waiter contribution to drain.
280             *
281             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
282             * Between our CAS and a follow-up store, a new process could claim the
283             * slot and start populating these fields -- our stores would clobber its
284             * state. ss_claim_reader_slot zeros all three on every claim, so
285             * leaving stale values is harmless. */
                /* h: handle; i: slot index; pid: the dead pid the caller observed in
                 * slot i (used as the CAS expected value). The slot's subcount is not
                 * consulted or adjusted here; only the two waiter counters are drained. */
286 0           static inline void ss_drain_dead_slot(SsHandle *h, uint32_t i, uint32_t pid) {
287 0           SsHeader *hdr = h->hdr;
288 0           uint32_t expected = pid;
289             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
290             * ACQUIRE syncs us with prior writes from the dead process to
291             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
292             * a plain RELAXED load before the CAS could miss those writes;
293             * loading them after the CAS keeps them inside the acquire window. */
294 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
295             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
296 0           return;
297 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
298 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
299 0 0         if (wp) ss_atomic_sub_cap(&hdr->rwlock_waiters, wp); /* capped subtraction: never drives the global below 0 */
300 0 0         if (writp) ss_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
301             }
302              
303             /* Scan reader slots for dead-process recovery.
304             *
305             * For each dead PID with non-zero contributions to the shared rwlock,
306             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
307             * out so live processes don't have to wait for the slow per-op timeout
308             * decrement to drain it for them.
309             *
310             * For the main rwlock counter we use the "no live reader holds -> force-
311             * reset to 0" trick (precise) because per-process attribution of the
312             * subcount is racy across the inc-counter-then-inc-subcount window.
                 *
                 * NOTE(review): pass 2 runs its drain loop whether or not the force-reset
                 * CAS succeeded. If the CAS fails because a new reader joined in between,
                 * the dead slots with subcount > 0 are still released, which discards the
                 * record pass 1 deliberately kept; a later cycle would then no longer set
                 * found_dead_reader for them. Confirm whether the drain should be skipped
                 * when the CAS fails. */
313 0           static inline void ss_recover_dead_readers(SsHandle *h) {
314 0 0         if (!h->reader_slots) return;
315 0           SsHeader *hdr = h->hdr;
316 0           int any_live_reader = 0; /* some live pid has subcount > 0 */
317 0           int found_dead_reader = 0; /* some dead pid has subcount > 0 */
318              
319             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
320             * contribution to lose) are wiped immediately to free the slot for
321             * future claimants and drain any orphan parked-waiter counters. Slots
322             * with dead pid and sc > 0 are left intact in this pass: if force-
323             * reset cannot fire (because a live reader is concurrently present),
324             * wiping the dead slot would lose the only record of its orphan
325             * rwlock contribution and strand writers permanently once the live
326             * reader releases. */
327 0 0         for (uint32_t i = 0; i < SS_READER_SLOTS; i++) {
328 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
329 0 0         if (pid == 0) continue;
330 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
331 0 0         if (ss_pid_alive(pid)) {
332 0 0         if (sc > 0) any_live_reader = 1;
333 0           continue;
334             }
335 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
336 0           ss_drain_dead_slot(h, i, pid);
337             }
338              
339             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
340             * reset CAS FIRST, while the window since pass 1's last scan is
341             * still narrow (a handful of instructions, as in the original
342             * single-pass code). A new reader that started rdlock between
343             * pass 1's scan and the CAS will either:
344             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
345             * fails (cur mismatched), recovery yields and a future
346             * cycle retries; or
347             * (b) be still in the subcount-bump phase -- our CAS sees the
348             * stale cur and resets to 0; the new reader's subsequent CAS
349             * rwlock(0 -> 1) succeeds cleanly.
350             * Only after the CAS resolves do we wipe the deferred dead slots,
351             * keeping that work outside the race-sensitive window. */
352 0 0         if (found_dead_reader && !any_live_reader) {
    0          
353 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
354 0 0         if (cur > 0 && cur < SS_RWLOCK_WRITER_BIT) { /* lock word currently encodes a reader count */
    0          
355 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
356             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
357 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
358 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
359             }
360             }
361 0 0         for (uint32_t i = 0; i < SS_READER_SLOTS; i++) { /* release every slot whose pid is dead, regardless of subcount */
362 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
363 0 0         if (pid == 0 || ss_pid_alive(pid)) continue;
    0          
364 0           ss_drain_dead_slot(h, i, pid);
365             }
366             }
367             }
368              
369             /* Inspect the lock word after a futex-wait timeout. If a dead writer
370             * holds it, force-recover the lock. Otherwise drain dead readers' shares
371             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
372             * branches -- identical recovery logic in both. */
                /* A write-locked word whose owner pid is still alive is left untouched;
                 * the caller simply goes back to spinning/waiting. */
373 0           static inline void ss_recover_after_timeout(SsHandle *h) {
374 0           SsHeader *hdr = h->hdr;
375 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
376 0 0         if (val >= SS_RWLOCK_WRITER_BIT) { /* write-locked: low 31 bits are the owner pid */
377 0           uint32_t pid = val & SS_RWLOCK_PID_MASK;
378 0 0         if (!ss_pid_alive(pid))
379 0           ss_recover_stale_lock(h, val);
380             } else {
381 0           ss_recover_dead_readers(h);
382             }
383 0           }
384              
385             /* Park/unpark helpers: bump the global waiter counters together with this
386             * process's mirrored slot counters so a wrlock-timeout recovery scan can
387             * attribute and reverse a dead PID's contribution. Kept paired to make
388             * accidental drift between global and per-slot counts impossible. */
                /* ss_park_reader: announce a reader about to wait. The per-slot mirror is
                 * bumped before the global counter, and skipped when this handle has no
                 * slot (my_slot_idx == UINT32_MAX). */
389 0           static inline void ss_park_reader(SsHandle *h) {
390 0 0         if (h->my_slot_idx != UINT32_MAX)
391 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
392 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
393 0           }
394 0           static inline void ss_unpark_reader(SsHandle *h) { /* undo ss_park_reader, in the reverse order: global counter first, then the slot mirror */
395 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
396 0 0         if (h->my_slot_idx != UINT32_MAX)
397 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
398 0           }
399 0           static inline void ss_park_writer(SsHandle *h) { /* announce a writer about to wait: counts as both a waiter and a waiting writer */
400 0 0         if (h->my_slot_idx != UINT32_MAX) {
401 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
402 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
403             }
404 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
405 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED); /* non-zero value makes rdlock yield when the lock is free */
406 0           }
407 0           static inline void ss_unpark_writer(SsHandle *h) { /* undo ss_park_writer, in the reverse order: global counters first, then the slot mirrors */
408 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
409 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
410 0 0         if (h->my_slot_idx != UINT32_MAX) {
411 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
412 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
413             }
414 0           }
415              
416 76430           static inline void ss_rwlock_rdlock(SsHandle *h) {
                /* Acquire the lock in shared (reader) mode; returns only once held.
                 * Phases: bump this process's slot subcount, spin up to
                 * SS_RWLOCK_SPIN_LIMIT attempts, then park on the futex. Every wait is
                 * bounded by ss_lock_timeout; a timeout triggers dead-process recovery
                 * and the loop starts over. Pair each call with ss_rwlock_rdunlock. */
417 76430           ss_claim_reader_slot(h);
418 76430           SsHeader *hdr = h->hdr;
419 76430           uint32_t *lock = &hdr->rwlock;
420 76430           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
421             /* Claim subcount BEFORE bumping the shared rwlock counter. This way
422             * a concurrent writer-side recovery scan that sees our PID alive with
423             * subcount > 0 will (correctly) defer force-reset, even while we are
424             * still spinning trying to win the rwlock CAS. Without this, a reader
425             * killed between rwlock CAS-success and subcount++ would let recovery
426             * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
427             * our eventual rdunlock dec. */
428 76430 50         if (h->my_slot_idx != UINT32_MAX)
429 76430           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
430 76430           for (int spin = 0; ; spin++) {
431 76430           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
432             /* Write-preferring: when lock is free (cur==0) and writers are
433             * waiting, yield to let the writer acquire. When readers are
434             * already active (cur>=1), new readers may join freely. */
435 76430 50         if (cur > 0 && cur < SS_RWLOCK_WRITER_BIT) {
    0          
436 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
437             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
438 76430           return;
439 76430 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
440 76430 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
441             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
442 76430           return;
443             }
444 0 0         if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
445 0           ss_rwlock_spin_pause();
446 0           continue;
447             }
                /* Spin budget exhausted: register as a waiter, then re-read the lock
                 * word so FUTEX_WAIT compares against a value taken after parking. */
448 0           ss_park_reader(h);
449 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
450             /* Sleep when write-locked OR when yielding to waiting writers */
451 0 0         if (cur >= SS_RWLOCK_WRITER_BIT || cur == 0) {
    0          
452 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
453             &ss_lock_timeout, NULL, 0);
454 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
455 0           ss_unpark_reader(h); /* drop our waiter registration before running recovery */
456 0           ss_recover_after_timeout(h);
457 0           spin = 0;
458 0           continue;
459             }
460             }
461 0           ss_unpark_reader(h);
462 0           spin = 0; /* NOTE: the for-increment runs next, so the new spin phase starts at 1 */
463             }
464             }
465              
466 76430           static inline void ss_rwlock_rdunlock(SsHandle *h) {
                /* Release one shared hold taken by ss_rwlock_rdlock. The last reader out
                 * (count reaches 0) wakes all parked waiters if any are registered. */
467 76430           SsHeader *hdr = h->hdr;
468             /* Release the shared counter BEFORE dropping our subcount so that
469             * "any live PID with subcount > 0" is a reliable in-flight indicator
470             * for the writer-side recovery scan. Inverting these would create a
471             * window where we still own a unit of rwlock but our slot subcount is
472             * 0, letting recovery force-reset rwlock underneath us. */
473 76430           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
474 76430 50         if (h->my_slot_idx != UINT32_MAX)
475 76430           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
476 76430 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
477 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
478 76430           }
479              
480 157974           static inline void ss_rwlock_wrlock(SsHandle *h) {
                /* Acquire the lock exclusively; returns only once held. The lock word
                 * must go 0 -> (WRITER_BIT | pid) in a single CAS. After
                 * SS_RWLOCK_SPIN_LIMIT failed attempts the writer parks on the futex;
                 * a wait that times out runs dead-process recovery before retrying.
                 * Pair each call with ss_rwlock_wrunlock. */
481 157974           ss_claim_reader_slot(h); /* refresh cached_pid across fork */
482 157974           SsHeader *hdr = h->hdr;
483 157974           uint32_t *lock = &hdr->rwlock;
484             /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
485             * any crash window between acquiring the lock and storing the owner. */
486 157974           uint32_t mypid = SS_RWLOCK_WR(h->cached_pid);
487 157974           for (int spin = 0; ; spin++) {
488 157974           uint32_t expected = 0;
489 157974 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
490             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
491 157974           return;
492 0 0         if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
493 0           ss_rwlock_spin_pause();
494 0           continue;
495             }
496 0           ss_park_writer(h); /* also raises rwlock_writers_waiting, which holds back new readers on a free lock */
497 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
498 0 0         if (cur != 0) { /* only sleep while someone holds the lock; if it is free, retry at once */
499 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
500             &ss_lock_timeout, NULL, 0);
501 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
502 0           ss_unpark_writer(h);
503 0           ss_recover_after_timeout(h);
504 0           spin = 0;
505 0           continue;
506             }
507             }
508 0           ss_unpark_writer(h);
509 0           spin = 0;
510             }
511             }
512              
513 157974           static inline void ss_rwlock_wrunlock(SsHandle *h) {
                /* Release the exclusive lock: store 0 unconditionally (the current owner
                 * encoded in the word is not checked), then wake all parked waiters if
                 * any are registered. */
514 157974           SsHeader *hdr = h->hdr;
515 157974           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
516 157974 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
517 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
518 157974           }
519              
520             /* ================================================================
521             * Layout math + create / open / destroy
522             *
523             * Layout: Header -> reader_slots[1024] -> member_index -> node_pool
524             * ================================================================ */
525              
526             /* Largest max_entries accepted at create time. 2^30 keeps the index-slot
527             * power-of-two rounding (ss_next_pow2) and every byte offset well within
528             * range, and is far beyond any realistic shared-memory map. */
529             #define SS_MAX_CAPACITY 0x40000000u /* 2^30; the check against it is outside this chunk */
530              
531             /* Single source of truth for the mmap region layout offsets. */
                /* Each field is a byte offset from the start of the mapping to the
                 * corresponding region; filled in by ss_layout. */
532             typedef struct { uint64_t reader_slots, index, nodes; } SsLayout;
533              
534 75           static inline SsLayout ss_layout(uint32_t index_slots) {
                /* Compute the region offsets for a map with `index_slots` index slots:
                 * header, then SS_READER_SLOTS reader slots, then the index, then the
                 * node pool. All arithmetic is done in uint64_t. */
535             SsLayout L;
536 75           L.reader_slots = sizeof(SsHeader);
537 75           L.index = L.reader_slots + (uint64_t)SS_READER_SLOTS * sizeof(SsReaderSlot);
538 75           L.nodes = L.index + (uint64_t)index_slots * sizeof(SsIdxSlot);
539 75           L.nodes = (L.nodes + 7) & ~(uint64_t)7; /* 8-byte align the node pool */
540 75           return L;
541             }
542              
543 40           static inline uint64_t ss_total_size(uint32_t index_slots, uint32_t node_capacity) {
                /* Total bytes needed for the whole region: the node-pool offset from
                 * ss_layout plus node_capacity nodes. */
544 40           SsLayout L = ss_layout(index_slots);
545 40           return L.nodes + (uint64_t)node_capacity * sizeof(SsNode);
546             }
547              
548 30           static inline void ss_init_header(void *base, uint32_t max_entries, uint32_t index_slots,
549             uint32_t node_capacity, uint64_t total) {
                /* Initialise a fresh region at `base` of `total` bytes: zero everything,
                 * fill in the header (sizes, offsets from ss_layout, empty tree), and
                 * chain all nodes into the free-list. Zeroing also leaves the lock word,
                 * waiter counters, reader slots and index slots in their empty state.
                 * NOTE(review): node_free_head is set to 0 even when node_capacity is 0;
                 * presumably callers never pass 0 -- confirm. */
550 30           SsLayout L = ss_layout(index_slots);
551 30           SsHeader *hdr = (SsHeader *)base;
552 30           memset(base, 0, (size_t)total);
553 30           hdr->magic = SS_MAGIC;
554 30           hdr->version = SS_VERSION;
555 30           hdr->max_entries = max_entries;
556 30           hdr->node_capacity = node_capacity;
557 30           hdr->index_slots = index_slots;
558 30           hdr->count = 0;
559 30           hdr->total_size = total;
560 30           hdr->reader_slots_off = L.reader_slots;
561 30           hdr->index_off = L.index;
562 30           hdr->nodes_off = L.nodes;
563 30           hdr->root = SS_NONE; /* empty tree */
564 30           hdr->height = 0;
565 30           hdr->leftmost = SS_NONE;
566 30           hdr->rightmost = SS_NONE;
567              
568             /* Thread the node free-list through `parent`. */
569 30           SsNode *nodes = (SsNode *)((uint8_t *)base + L.nodes);
570 15487 100         for (uint32_t i = 0; i < node_capacity; i++)
571 15457 100         nodes[i].parent = (i + 1 < node_capacity) ? (i + 1) : SS_NONE; /* node i links to i+1; the last node ends the list */
572 30           hdr->node_free_head = 0;
573             /* index region left zeroed: every slot empty (state == 0). */
574 30           __atomic_thread_fence(__ATOMIC_SEQ_CST); /* full fence after all initialising stores */
575 30           }
576              
577 35           static inline SsHandle *ss_setup(void *base, size_t map_size,
578             const char *path, int backing_fd) {
579 35           SsHeader *hdr = (SsHeader *)base;
580 35           SsHandle *h = (SsHandle *)calloc(1, sizeof(SsHandle));
581 35 50         if (!h) {
582 0           munmap(base, map_size);
583 0 0         if (backing_fd >= 0) close(backing_fd);
584 0           return NULL;
585             }
586 35           h->hdr = hdr;
587 35           h->reader_slots = (SsReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
588 35           h->index = (SsIdxSlot *)((uint8_t *)base + hdr->index_off);
589 35           h->nodes = (SsNode *)((uint8_t *)base + hdr->nodes_off);
590 35           h->mmap_size = map_size;
591 35 100         h->path = path ? strdup(path) : NULL;
592 35           h->notify_fd = -1;
593 35           h->backing_fd = backing_fd;
594 35           h->my_slot_idx = UINT32_MAX;
595 35           return h;
596             }
597              
598             /* Validate a mapped header (shared by ss_create reopen and ss_open_fd). */
599 5           static inline int ss_validate_header(const SsHeader *hdr, uint64_t file_size) {
600 5 50         if (hdr->magic != SS_MAGIC) return 0;
601 5 50         if (hdr->version != SS_VERSION) return 0;
602 5 50         if (hdr->max_entries == 0 || hdr->max_entries > SS_MAX_CAPACITY) return 0;
    50          
603 5 50         if (hdr->index_slots == 0 || (hdr->index_slots & (hdr->index_slots - 1)) != 0) return 0; /* pow2 */
    50          
604 5 50         if (hdr->node_capacity == 0) return 0;
605 5 50         if (hdr->total_size != file_size) return 0;
606 5 50         if (hdr->total_size != ss_total_size(hdr->index_slots, hdr->node_capacity)) return 0;
607 5           SsLayout L = ss_layout(hdr->index_slots);
608 5 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
609 5 50         if (hdr->index_off != L.index) return 0;
610 5 50         if (hdr->nodes_off != L.nodes) return 0;
611 5 50         if (hdr->count > hdr->max_entries) return 0;
612 5 100         if (hdr->root != SS_NONE && hdr->root >= hdr->node_capacity) return 0;
    50          
613 5 100         if (hdr->root == SS_NONE && hdr->count != 0) return 0;
    50          
614 5           return 1;
615             }
616              
617              
618             /* validate max_entries and compute the index-slot count + node-pool capacity
619             (shared by ss_create + ss_create_memfd) */
620 36           static int ss_validate_create_args(uint32_t max_entries, uint32_t *index_slots,
621             uint32_t *node_capacity, char *errbuf) {
622 36 50         if (errbuf) errbuf[0] = '\0';
623 36 100         if (max_entries == 0) { SS_ERR("max_entries must be > 0"); return 0; }
    50          
624 35 50         if (max_entries > SS_MAX_CAPACITY) { SS_ERR("max_entries too large (max %u)", SS_MAX_CAPACITY); return 0; }
    0          
625 35           uint64_t want = (uint64_t)max_entries * 10 / 7 + 1; /* index load factor ~0.7 */
626 35 50         if (want > SS_MAX_CAPACITY) want = SS_MAX_CAPACITY;
627 35           *index_slots = ss_next_pow2((uint32_t)want);
628 35           *node_capacity = (uint32_t)((uint64_t)max_entries / (SS_MIN - 1) + 64); /* worst-case fill + slack */
629 35           return 1;
630             }
631              
632 35           static SsHandle *ss_create(const char *path, uint32_t max_entries, char *errbuf) {
633             uint32_t index_slots, node_capacity;
634 35 100         if (!ss_validate_create_args(max_entries, &index_slots, &node_capacity, errbuf)) return NULL;
635              
636 34           uint64_t total = ss_total_size(index_slots, node_capacity);
637 34           int anonymous = (path == NULL);
638 34           int fd = -1;
639             size_t map_size;
640             void *base;
641              
642 34 100         if (anonymous) {
643 24           map_size = (size_t)total;
644 24           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
645 24 50         if (base == MAP_FAILED) { SS_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
646             } else {
647 10           fd = open(path, O_RDWR|O_CREAT, 0666);
648 15 50         if (fd < 0) { SS_ERR("open: %s", strerror(errno)); return NULL; }
    0          
649 10 50         if (flock(fd, LOCK_EX) < 0) { SS_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
650             struct stat st;
651 10 50         if (fstat(fd, &st) < 0) { SS_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
652 10           int is_new = (st.st_size == 0);
653 10 100         if (!is_new && (uint64_t)st.st_size < sizeof(SsHeader)) {
    100          
654 1 50         SS_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
655 1           flock(fd, LOCK_UN); close(fd); return NULL;
656             }
657 9 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
658 0 0         SS_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
659             }
660 9 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
661 9           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
662 9 50         if (base == MAP_FAILED) { SS_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
663 9 100         if (!is_new) {
664 4 50         if (!ss_validate_header((SsHeader *)base, (uint64_t)st.st_size)) {
665 0 0         SS_ERR("invalid sorted-set file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
666             }
667 4           flock(fd, LOCK_UN); close(fd);
668 4           return ss_setup(base, map_size, path, -1);
669             }
670             }
671 29           ss_init_header(base, max_entries, index_slots, node_capacity, total);
672 29 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
673 29           return ss_setup(base, map_size, path, -1);
674             }
675              
676 1           static SsHandle *ss_create_memfd(const char *name, uint32_t max_entries, char *errbuf) {
677             uint32_t index_slots, node_capacity;
678 1 50         if (!ss_validate_create_args(max_entries, &index_slots, &node_capacity, errbuf)) return NULL;
679              
680 1           uint64_t total = ss_total_size(index_slots, node_capacity);
681 1 50         int fd = memfd_create(name ? name : "sortedset", MFD_CLOEXEC | MFD_ALLOW_SEALING);
682 1 50         if (fd < 0) { SS_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
683 1 50         if (ftruncate(fd, (off_t)total) < 0) {
684 0 0         SS_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
685             }
686 1           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
687 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
688 1 50         if (base == MAP_FAILED) { SS_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
689 1           ss_init_header(base, max_entries, index_slots, node_capacity, total);
690 1           return ss_setup(base, (size_t)total, NULL, fd);
691             }
692              
693 1           static SsHandle *ss_open_fd(int fd, char *errbuf) {
694 1 50         if (errbuf) errbuf[0] = '\0';
695             struct stat st;
696 1 50         if (fstat(fd, &st) < 0) { SS_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
697 1 50         if ((uint64_t)st.st_size < sizeof(SsHeader)) { SS_ERR("too small"); return NULL; }
    0          
698 1           size_t ms = (size_t)st.st_size;
699 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
700 1 50         if (base == MAP_FAILED) { SS_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
701 1 50         if (!ss_validate_header((SsHeader *)base, (uint64_t)st.st_size)) {
702 0 0         SS_ERR("invalid sorted-set"); munmap(base, ms); return NULL;
703             }
704 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
705 1 50         if (myfd < 0) { SS_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
706 1           return ss_setup(base, ms, NULL, myfd);
707             }
708              
709 35           static void ss_destroy(SsHandle *h) {
710 35 50         if (!h) return;
711 35 100         if (h->notify_fd >= 0) close(h->notify_fd);
712 35 100         if (h->backing_fd >= 0) close(h->backing_fd);
713 35 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
714 35           free(h->path);
715 35           free(h);
716             }
717              
718 5           static inline int ss_msync(SsHandle *h) {
719 5 50         if (!h || !h->hdr) return 0;
    50          
720 5           return msync(h->hdr, h->mmap_size, MS_SYNC);
721             }
722              
723 1           static int ss_create_eventfd(SsHandle *h) {
724 1 50         if (h->notify_fd >= 0) return h->notify_fd;
725 1           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
726 1 50         if (efd < 0) return -1;
727 1           h->notify_fd = efd;
728 1           return efd;
729             }
730              
731 1           static int ss_notify(SsHandle *h) {
732 1 50         if (h->notify_fd < 0) return 0;
733 1           uint64_t v = 1;
734 1           return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
735             }
736              
737 1           static int64_t ss_eventfd_consume(SsHandle *h) {
738 1 50         if (h->notify_fd < 0) return -1;
739 1           uint64_t v = 0;
740 1 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
741 1           return (int64_t)v;
742             }
743              
744             /* ================================================================
745             * Sorted set: node pool, member index, B+tree (callers hold the lock)
746             * ================================================================ */
747              
748             /* reset to the empty set (caller holds the write lock) */
749 3           static inline void ss_clear_locked(SsHandle *h) {
750 3           SsHeader *hdr = h->hdr;
751 3           hdr->count = 0;
752 3           hdr->root = SS_NONE;
753 3           hdr->height = 0;
754 3           hdr->leftmost = SS_NONE;
755 3           hdr->rightmost = SS_NONE;
756 493 100         for (uint32_t i = 0; i < hdr->node_capacity; i++)
757 490 100         h->nodes[i].parent = (i + 1 < hdr->node_capacity) ? (i + 1) : SS_NONE;
758 3           hdr->node_free_head = 0;
759 3           memset(h->index, 0, (size_t)hdr->index_slots * sizeof(SsIdxSlot));
760 3           }
761              
762             /* ---- node pool ---- */
763 6667           static inline uint32_t ss_node_alloc(SsHandle *h) {
764 6667           uint32_t idx = h->hdr->node_free_head;
765 6667 50         if (idx == SS_NONE) return SS_NONE;
766 6667           h->hdr->node_free_head = h->nodes[idx].parent;
767 6667           return idx;
768             }
769 2338           static inline void ss_node_free(SsHandle *h, uint32_t idx) {
770 2338           h->nodes[idx].parent = h->hdr->node_free_head;
771 2338           h->hdr->node_free_head = idx;
772 2338           }
773              
774             /* ---- member -> score index (open addressing, linear probe) ---- */
775             /* returns the slot of `member` (if present) or the first empty slot for it;
776             *found (optional, may be NULL) says which */
777 360007           static inline uint32_t ss_idx_find(SsHandle *h, int64_t member, int *found) {
778 360007           uint32_t mask = h->hdr->index_slots - 1;
779 360007           uint32_t i = (uint32_t)(ss_hash_member(member) & mask);
780 399499 100         while (h->index[i].state) {
781 230609 100         if (h->index[i].member == member) { if (found) *found = 1; return i; }
    100          
782 39492           i = (i + 1) & mask;
783             }
784 168890 100         if (found) *found = 0;
785 168890           return i;
786             }
787 213158           static inline int ss_idx_get(SsHandle *h, int64_t member, double *score) {
788 213158           int f; uint32_t i = ss_idx_find(h, member, &f);
789 213158 100         if (f) { *score = h->index[i].score; return 1; }
790 90344           return 0;
791             }
792 111320           static inline void ss_idx_set(SsHandle *h, int64_t member, double score) {
793 111320           uint32_t i = ss_idx_find(h, member, NULL);
794 111320           h->index[i].member = member;
795 111320           h->index[i].score = score;
796 111320           h->index[i].state = 1;
797 111320           }
798              
799             /* ---- B+tree ---- */
800 13246           static inline uint32_t ss_node_total(const SsNode *nd) {
801 13246 100         if (nd->is_leaf) return nd->num;
802 794           uint32_t t = 0;
803 7543 100         for (int i = 0; i < nd->num; i++) t += nd->counts[i];
804 794           return t;
805             }
806              
807             /* child index in an internal node whose subtree would hold (score,member) */
808 426690           static inline int ss_child_index(const SsNode *nd, double score, int64_t member) {
809 426690           int lo = 0, hi = nd->num - 1;
810 1829545 100         while (lo < hi) {
811 1402855           int mid = (lo + hi) / 2;
812 1402855 100         if (ss_key_cmp(score, member, nd->scores[mid], nd->members[mid]) < 0) hi = mid;
813 598302           else lo = mid + 1;
814             }
815 426690           return lo;
816             }
817              
/* Result of inserting into a subtree (see ss_insert_rec). */
typedef struct {
    int split;        /* 1 when the subtree's root node was split, else 0 */
    uint32_t rnode;   /* pool index of the new right node (SS_NONE if no split) */
    double rscore;    /* separator key to promote: score part */
    int64_t rmember;  /* separator key to promote: member part */
} SsSplit;
819              
820             /* insert (score,member) into subtree nidx (member known absent); on overflow
821             split and return the new right node + separator to promote; maintains counts */
                /* Returns an SsSplit: .split is 0 when the key fit without splitting nidx;
                 * otherwise .split is 1, .rnode is the newly allocated right sibling and
                 * (.rscore, .rmember) is the separator the caller must place between nidx
                 * and .rnode. The caller is responsible for the count of nidx itself.
                 * A node briefly holds SS_ORDER + 1 entries before it is split.
                 * NOTE(review): both ss_node_alloc() results are used unchecked, so this
                 * relies on the pool never running dry -- presumably guaranteed by the
                 * node_capacity sizing at create time; confirm. */
822 372640           static SsSplit ss_insert_rec(SsHandle *h, uint32_t nidx, double score, int64_t member) {
823 372640           SsNode *nd = &h->nodes[nidx];
824 372640           SsSplit r = { 0, SS_NONE, 0.0, 0 };
825 372640 100         if (nd->is_leaf) {
                /* leaf: find the first key not less than the new one, shift the tail
                 * right by one and drop the new key in */
826 111293           int pos = 0;
827 763356 100         while (pos < nd->num && ss_key_cmp(nd->scores[pos], nd->members[pos], score, member) < 0) pos++;
    100          
828 715616 100         for (int i = nd->num; i > pos; i--) { nd->scores[i] = nd->scores[i-1]; nd->members[i] = nd->members[i-1]; }
829 111293           nd->scores[pos] = score; nd->members[pos] = member; nd->num++;
830 111293 100         if (nd->num <= SS_ORDER) return r;
                /* overflow: move the upper half (from index num/2) into a new right leaf */
831 6226           uint32_t ridx = ss_node_alloc(h);
832 6226           SsNode *rn = &h->nodes[ridx];
833 6226           rn->is_leaf = 1; rn->parent = nd->parent;
834 6226           int mid = nd->num / 2, rc = nd->num - mid;
835 62260 100         for (int i = 0; i < rc; i++) { rn->scores[i] = nd->scores[mid+i]; rn->members[i] = nd->members[mid+i]; }
836 6226           rn->num = (uint16_t)rc; nd->num = (uint16_t)mid;
                /* splice the new leaf into the leaf list right after nd; it becomes
                 * the rightmost leaf when nd had no successor */
837 6226           rn->next = nd->next; rn->prev = nidx;
838 6226 100         if (nd->next != SS_NONE) h->nodes[nd->next].prev = ridx; else h->hdr->rightmost = ridx;
839 6226           nd->next = ridx;
                /* the separator is a copy of the right leaf's first key */
840 6226           r.split = 1; r.rnode = ridx; r.rscore = rn->scores[0]; r.rmember = rn->members[0];
841 6226           return r;
842             }
                /* internal node: descend into the covering child */
843 261347           int c = ss_child_index(nd, score, member);
844 261347           SsSplit cr = ss_insert_rec(h, nd->children[c], score, member);
                /* child absorbed the key: only its subtree count changes */
845 261347 100         if (!cr.split) { nd->counts[c]++; return r; }
                /* child split: open a gap at child slot c+1 / separator slot c, insert
                 * the promoted separator and the new right child, then recompute the
                 * counts of both halves */
846 42340 100         for (int i = nd->num; i > c + 1; i--) { nd->children[i] = nd->children[i-1]; nd->counts[i] = nd->counts[i-1]; }
847 42340 100         for (int i = nd->num - 1; i > c; i--) { nd->scores[i] = nd->scores[i-1]; nd->members[i] = nd->members[i-1]; }
848 6606           nd->scores[c] = cr.rscore; nd->members[c] = cr.rmember;
849 6606           nd->children[c+1] = cr.rnode; h->nodes[cr.rnode].parent = nidx;
850 6606           nd->num++;
851 6606           nd->counts[c] = ss_node_total(&h->nodes[nd->children[c]]);
852 6606           nd->counts[c+1] = ss_node_total(&h->nodes[cr.rnode]);
853 6606 100         if (nd->num <= SS_ORDER) return r;
                /* this node overflowed too: the upper rch children (re-parented) and
                 * the separators between them move to a new right node */
854 397           uint32_t ridx = ss_node_alloc(h);
855 397           SsNode *rn = &h->nodes[ridx];
856 397           rn->is_leaf = 0; rn->parent = nd->parent;
857 397           int midc = nd->num / 2, rch = nd->num - midc;
858 3970 100         for (int i = 0; i < rch; i++) {
859 3573           rn->children[i] = nd->children[midc+i];
860 3573           rn->counts[i] = nd->counts[midc+i];
861 3573           h->nodes[rn->children[i]].parent = ridx;
862             }
863 3573 100         for (int i = 0; i < rch - 1; i++) { rn->scores[i] = nd->scores[midc+i]; rn->members[i] = nd->members[midc+i]; }
864 397           rn->num = (uint16_t)rch;
                /* separator midc-1 sat between the two halves: it moves up to the
                 * parent and stays in neither node */
865 397           double up_s = nd->scores[midc-1]; int64_t up_m = nd->members[midc-1];
866 397           nd->num = (uint16_t)midc;
867 397           r.split = 1; r.rnode = ridx; r.rscore = up_s; r.rmember = up_m;
868 397           return r;
869             }
870              
871 111320           static void ss_tree_add(SsHandle *h, double score, int64_t member) {
872 111320           SsHeader *hdr = h->hdr;
873 111320 100         if (hdr->root == SS_NONE) {
874 27           uint32_t l = ss_node_alloc(h);
875 27           SsNode *nd = &h->nodes[l];
876 27           nd->is_leaf = 1; nd->parent = SS_NONE; nd->next = SS_NONE; nd->prev = SS_NONE;
877 27           nd->num = 1; nd->scores[0] = score; nd->members[0] = member;
878 27           hdr->root = l; hdr->leftmost = l; hdr->rightmost = l; hdr->height = 1;
879 27           hdr->count++;
880 27           return;
881             }
882 111293           SsSplit r = ss_insert_rec(h, hdr->root, score, member);
883 111293 100         if (r.split) {
884 17           uint32_t nr = ss_node_alloc(h);
885 17           SsNode *root = &h->nodes[nr];
886 17           root->is_leaf = 0; root->parent = SS_NONE; root->num = 2;
887 17           root->children[0] = hdr->root; root->children[1] = r.rnode;
888 17           root->scores[0] = r.rscore; root->members[0] = r.rmember;
889 17           root->counts[0] = ss_node_total(&h->nodes[hdr->root]);
890 17           root->counts[1] = ss_node_total(&h->nodes[r.rnode]);
891 17           h->nodes[hdr->root].parent = nr;
892 17           h->nodes[r.rnode].parent = nr;
893 17           hdr->root = nr; hdr->height++;
894             }
895 111293           hdr->count++;
896             }
897              
898             /* index backward-shift deletion (keeps probe sequences contiguous) */
899 35529           static void ss_idx_del(SsHandle *h, int64_t member) {
900 35529           uint32_t mask = h->hdr->index_slots - 1;
901 35529           int f; uint32_t i = ss_idx_find(h, member, &f);
902 35529 50         if (!f) return;
903 35529           uint32_t j = i;
904 6107           for (;;) {
905 41636           j = (j + 1) & mask;
906 41636 100         if (!h->index[j].state) break;
907 6107           uint32_t k = (uint32_t)(ss_hash_member(h->index[j].member) & mask);
908 6107 50         int in = (i < j) ? (i < k && k <= j) : (k > i || k <= j); /* k in (i, j] ? */
    100          
    50          
    0          
    0          
909 6107 100         if (!in) { h->index[i] = h->index[j]; i = j; }
910             }
911 35529           h->index[i].state = 0;
912             }
913              
914             /* merge children[c] and children[c+1] of pidx into children[c]; frees the right
915             node and pulls separator c down (for internal nodes) */
                /* The left node must have room for all of the right node's entries; the
                 * only caller in view (ss_fix_underflow) merges only after finding no
                 * sibling it could borrow from. Parent counts are folded together and
                 * the parent loses one child and one separator. */
916 2334           static void ss_merge(SsHandle *h, uint32_t pidx, int c) {
917 2334           SsNode *p = &h->nodes[pidx];
918 2334           uint32_t lidx = p->children[c], ridx = p->children[c+1];
919 2334           SsNode *ln = &h->nodes[lidx], *rn = &h->nodes[ridx];
920 2334 100         if (ln->is_leaf) {
                /* leaves: append the right leaf's keys and unlink it from the leaf
                 * list (the left leaf becomes rightmost if the right one was last) */
921 19462 100         for (int i = 0; i < rn->num; i++) { ln->scores[ln->num+i] = rn->scores[i]; ln->members[ln->num+i] = rn->members[i]; }
922 2307           ln->next = rn->next;
923 2307 100         if (rn->next != SS_NONE) h->nodes[rn->next].prev = lidx; else h->hdr->rightmost = lidx;
924 2307           ln->num = (uint16_t)(ln->num + rn->num);
925             } else {
                /* internal: the parent's separator c becomes the separator between the
                 * left node's last child and the right node's first child; then the
                 * right node's children (re-parented), counts and separators follow */
926 27           ln->scores[ln->num-1] = p->scores[c]; ln->members[ln->num-1] = p->members[c]; /* pull separator down */
927 234 100         for (int i = 0; i < rn->num; i++) { ln->children[ln->num+i] = rn->children[i]; ln->counts[ln->num+i] = rn->counts[i]; h->nodes[rn->children[i]].parent = lidx; }
928 207 100         for (int i = 0; i < rn->num - 1; i++) { ln->scores[ln->num+i] = rn->scores[i]; ln->members[ln->num+i] = rn->members[i]; }
929 27           ln->num = (uint16_t)(ln->num + rn->num);
930             }
                /* parent: fold the two counts into slot c, close the gap left by child
                 * c+1 and separator c, and return the right node to the pool */
931 2334           p->counts[c] += p->counts[c+1];
932 15567 100         for (int i = c+1; i+1 < p->num; i++) { p->children[i] = p->children[i+1]; p->counts[i] = p->counts[i+1]; }
933 15567 100         for (int i = c; i+1 < p->num-1; i++) { p->scores[i] = p->scores[i+1]; p->members[i] = p->members[i+1]; }
934 2334           p->num--;
935 2334           ss_node_free(h, ridx);
936 2334           }
937              
938             /* fix an underflow in children[c] of pidx by borrowing from a sibling or merging */
                /* Order of preference: borrow one entry from the left sibling if it has
                 * more than SS_MIN, else borrow one from the right sibling if it has more
                 * than SS_MIN, else merge with the left sibling (or with the right one
                 * when c is the first child). Borrowing keeps the parent's child count;
                 * merging removes one child from the parent, which may then underflow
                 * itself -- that is left to the caller. */
939 11908           static void ss_fix_underflow(SsHandle *h, uint32_t pidx, int c) {
940 11908           SsNode *p = &h->nodes[pidx];
941 11908           uint32_t cidx = p->children[c];
942 11908           SsNode *cn = &h->nodes[cidx];
943 11908 100         if (c > 0) {
944 9105           uint32_t lidx = p->children[c-1]; SsNode *ln = &h->nodes[lidx];
945 9105 100         if (ln->num > SS_MIN) { /* borrow from left */
946 6195 100         if (cn->is_leaf) {
                /* leaf: the left sibling's last key becomes this leaf's first key and
                 * the new separator c-1; one entry changes sides in the counts */
947 49144 100         for (int i = cn->num; i > 0; i--) { cn->scores[i] = cn->scores[i-1]; cn->members[i] = cn->members[i-1]; }
948 6143           cn->scores[0] = ln->scores[ln->num-1]; cn->members[0] = ln->members[ln->num-1];
949 6143           cn->num++; ln->num--;
950 6143           p->scores[c-1] = cn->scores[0]; p->members[c-1] = cn->members[0];
951 6143           p->counts[c-1]--; p->counts[c]++;
952             } else {
                /* internal: rotate through the parent -- separator c-1 comes down as
                 * this node's first separator, the left sibling's last child moves
                 * over (with its whole subtree count `moved`), and the left sibling's
                 * last separator goes up in its place */
953 52           uint32_t moved = ln->counts[ln->num-1];
954 416 100         for (int i = cn->num; i > 0; i--) { cn->children[i] = cn->children[i-1]; cn->counts[i] = cn->counts[i-1]; }
955 364 100         for (int i = cn->num-1; i > 0; i--) { cn->scores[i] = cn->scores[i-1]; cn->members[i] = cn->members[i-1]; }
956 52           cn->scores[0] = p->scores[c-1]; cn->members[0] = p->members[c-1];
957 52           cn->children[0] = ln->children[ln->num-1]; cn->counts[0] = moved;
958 52           h->nodes[cn->children[0]].parent = cidx;
959 52           cn->num++;
960 52           p->scores[c-1] = ln->scores[ln->num-2]; p->members[c-1] = ln->members[ln->num-2];
961 52           ln->num--;
962 52           p->counts[c-1] -= moved; p->counts[c] += moved;
963             }
964 6195           return;
965             }
966             }
967 5713 100         if (c < p->num - 1) {
968 5091           uint32_t ridx = p->children[c+1]; SsNode *rn = &h->nodes[ridx];
969 5091 100         if (rn->num > SS_MIN) { /* borrow from right */
970 3379 100         if (cn->is_leaf) {
                /* leaf: the right sibling's first key is appended here; its next key
                 * becomes the new separator c */
971 3327           cn->scores[cn->num] = rn->scores[0]; cn->members[cn->num] = rn->members[0]; cn->num++;
972 34314 100         for (int i = 0; i+1 < rn->num; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
973 3327           rn->num--;
974 3327           p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
975 3327           p->counts[c]++; p->counts[c+1]--;
976             } else {
                /* internal: mirror image of the left rotation -- separator c comes
                 * down as this node's last separator, the right sibling's first child
                 * moves over, and the right sibling's first separator goes up */
977 52           uint32_t moved = rn->counts[0];
978 52           cn->scores[cn->num-1] = p->scores[c]; cn->members[cn->num-1] = p->members[c];
979 52           cn->children[cn->num] = rn->children[0]; cn->counts[cn->num] = moved;
980 52           h->nodes[cn->children[cn->num]].parent = cidx;
981 52           cn->num++;
982 52           p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
983 526 100         for (int i = 0; i+1 < rn->num; i++) { rn->children[i] = rn->children[i+1]; rn->counts[i] = rn->counts[i+1]; }
984 474 100         for (int i = 0; i+1 < rn->num-1; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
985 52           rn->num--;
986 52           p->counts[c] += moved; p->counts[c+1] -= moved;
987             }
988 3379           return;
989             }
990             }
                /* neither sibling can spare an entry: merge, always into the left node
                 * of the pair */
991 2334 100         ss_merge(h, pidx, (c > 0) ? c - 1 : c); /* merge with a sibling */
992             }
993              
994             /* delete (score,member) from subtree nidx (known present); decrement counts;
995             return 1 if nidx now underflows (num < SS_MIN) */
996 204597           static int ss_delete_rec(SsHandle *h, uint32_t nidx, double score, int64_t member) {
997 204597           SsNode *nd = &h->nodes[nidx];
998 204597 100         if (nd->is_leaf) {
999 68303           int pos = 0;
1000 380161 50         while (pos < nd->num && ss_key_cmp(nd->scores[pos], nd->members[pos], score, member) != 0) pos++;
    100          
1001 441891 100         for (int i = pos; i+1 < nd->num; i++) { nd->scores[i] = nd->scores[i+1]; nd->members[i] = nd->members[i+1]; }
1002 68303           nd->num--;
1003 68303           return nd->num < SS_MIN;
1004             }
1005 136294           int c = ss_child_index(nd, score, member);
1006 136294           int under = ss_delete_rec(h, nd->children[c], score, member);
1007 136294           nd->counts[c]--;
1008 136294 100         if (under) ss_fix_underflow(h, nidx, c);
1009 136294           return nd->num < SS_MIN;
1010             }
1011              
1012 68303           static void ss_tree_del(SsHandle *h, double score, int64_t member) {
1013 68303           SsHeader *hdr = h->hdr;
1014 68303           ss_delete_rec(h, hdr->root, score, member);
1015 68303           SsNode *root = &h->nodes[hdr->root];
1016 68303 100         if (root->is_leaf) {
1017 23 100         if (root->num == 0) {
1018 3           ss_node_free(h, hdr->root);
1019 3           hdr->root = SS_NONE; hdr->leftmost = SS_NONE; hdr->rightmost = SS_NONE; hdr->height = 0;
1020             }
1021 68280 100         } else if (root->num == 1) {
1022 1           uint32_t child = root->children[0];
1023 1           ss_node_free(h, hdr->root);
1024 1           hdr->root = child; h->nodes[child].parent = SS_NONE; hdr->height--;
1025             }
1026 68303           hdr->count--;
1027 68303           }
1028              
1029             /* add: 1 (new), 0 (existing -- score updated if changed), -1 (full) */
1030 97364           static int ss_add_locked(SsHandle *h, int64_t member, double score) {
1031             double old;
1032 97364 100         if (ss_idx_get(h, member, &old)) {
1033 27138 100         if (old != score) { ss_tree_del(h, old, member); ss_tree_add(h, score, member); ss_idx_set(h, member, score); }
1034 27138           return 0;
1035             }
1036 70226 100         if (h->hdr->count >= h->hdr->max_entries) return -1;
1037 70223           ss_tree_add(h, score, member);
1038 70223           ss_idx_set(h, member, score);
1039 70223           return 1;
1040             }
1041              
1042             /* remove: 1 if removed, 0 if absent */
1043 23436           static int ss_remove_locked(SsHandle *h, int64_t member) {
1044             double old;
1045 23436 100         if (!ss_idx_get(h, member, &old)) return 0;
1046 11747           ss_tree_del(h, old, member);
1047 11747           ss_idx_del(h, member);
1048 11747           return 1;
1049             }
1050              
1051             /* incr by delta. *out = new score. returns 1 (created), 0 (updated), -1 (full),
1052             -2 (result is NaN) */
1053 16395           static int ss_incr_locked(SsHandle *h, int64_t member, double delta, double *out) {
1054             double old;
1055 16395 100         if (ss_idx_get(h, member, &old)) {
1056 8072           double ns = old + delta; *out = ns;
1057 8072 100         if (ns != ns) return -2;
1058 8071 100         if (ns != old) { ss_tree_del(h, old, member); ss_tree_add(h, ns, member); ss_idx_set(h, member, ns); }
1059 8071           return 0;
1060             }
1061 8323 50         if (h->hdr->count >= h->hdr->max_entries) return -1;
1062 8323           *out = delta;
1063 8323 50         if (delta != delta) return -2;
1064 8323           ss_tree_add(h, delta, member); ss_idx_set(h, member, delta);
1065 8323           return 1;
1066             }
1067              
1068             /* pop the min (max=0) or max (max=1): 0 if empty, else 1 with *m,*s */
1069 23784           static int ss_pop_locked(SsHandle *h, int max, int64_t *m, double *s) {
1070 23784 100         if (h->hdr->root == SS_NONE) return 0;
1071 23782 100         SsNode *nd = &h->nodes[max ? h->hdr->rightmost : h->hdr->leftmost];
1072 23782 100         int pos = max ? nd->num - 1 : 0;
1073 23782           *m = nd->members[pos]; *s = nd->scores[pos];
1074 23782           ss_tree_del(h, *s, *m);
1075 23782           ss_idx_del(h, *m);
1076 23782           return 1;
1077             }
1078              
1079             /* ---- structural validator (debug / tests) ---- */
1080 34888           static long ss_check_rec(SsHandle *h, uint32_t nidx, int depth, int is_root,
1081             double *ps, int64_t *pm, int *hp, int *leaf_depth) {
1082 34888           SsNode *nd = &h->nodes[nidx];
1083 34888 50         if (nd->num < 1 || nd->num > SS_ORDER) return -1;
    50          
1084 34888 100         if (!is_root && nd->num < SS_MIN) return -1;
    50          
1085 34888 100         if (nd->is_leaf) {
1086 31710 100         if (*leaf_depth < 0) *leaf_depth = depth;
1087 31606 50         else if (*leaf_depth != depth) return -1;
1088 380706 100         for (int i = 0; i < nd->num; i++) {
1089 348996 100         if (*hp && ss_key_cmp(*ps, *pm, nd->scores[i], nd->members[i]) >= 0) return -1;
    50          
1090 348996           *ps = nd->scores[i]; *pm = nd->members[i]; *hp = 1;
1091             }
1092 31710           return nd->num;
1093             }
1094 3178           long total = 0;
1095 37962 100         for (int i = 0; i < nd->num; i++) {
1096 34784           long c = ss_check_rec(h, nd->children[i], depth + 1, 0, ps, pm, hp, leaf_depth);
1097 34784 50         if (c < 0 || (uint32_t)c != nd->counts[i]) return -1;
    50          
1098 34784           total += c;
1099             }
1100 3178           return total;
1101             }
1102 105           static int ss_validate_tree(SsHandle *h) {
1103 105           SsHeader *hdr = h->hdr;
1104 105 100         if (hdr->root == SS_NONE) return hdr->count == 0 && hdr->leftmost == SS_NONE;
    50          
    50          
1105 104           double ps = 0; int64_t pm = 0; int hp = 0, ld = -1;
1106 104           long total = ss_check_rec(h, hdr->root, 0, 1, &ps, &pm, &hp, &ld);
1107 104 50         if (total < 0 || (uint32_t)total != hdr->count) return 0;
    50          
1108             /* leaf links == in-order, and reach rightmost */
1109 104           uint32_t leaf = hdr->leftmost; double ls = 0; int64_t lm = 0; int lh = 0; uint32_t seen = 0;
1110 31814 100         while (leaf != SS_NONE) {
1111 31710           SsNode *nd = &h->nodes[leaf];
1112 31710 50         if (!nd->is_leaf) return 0;
1113 380706 100         for (int i = 0; i < nd->num; i++) {
1114 348996 100         if (lh && ss_key_cmp(ls, lm, nd->scores[i], nd->members[i]) >= 0) return 0;
    50          
1115 348996           ls = nd->scores[i]; lm = nd->members[i]; lh = 1; seen++;
1116             }
1117 31710 100         if (nd->next == SS_NONE && leaf != hdr->rightmost) return 0;
    50          
1118 31710           leaf = nd->next;
1119             }
1120 104 50         if (seen != hdr->count) return 0;
1121             /* index population == count */
1122 104           uint32_t icount = 0;
1123 2796160 100         for (uint32_t i = 0; i < hdr->index_slots; i++) if (h->index[i].state) icount++;
    100          
1124 104           return icount == hdr->count;
1125             }
1126              
1127             /* ---- order-statistics queries (read paths; caller holds the read lock) ---- */
1128              
1129             /* number of entries strictly less than (score, member) -- the rank of the key */
1130 10339           static uint32_t ss_rank_of(SsHandle *h, double score, int64_t member) {
1131 10339           uint32_t n = h->hdr->root, rank = 0;
1132 39388 100         while (!h->nodes[n].is_leaf) {
1133 29049           SsNode *nd = &h->nodes[n];
1134 29049           int c = ss_child_index(nd, score, member);
1135 165329 100         for (int i = 0; i < c; i++) rank += nd->counts[i];
1136 29049           n = nd->children[c];
1137             }
1138 10339           SsNode *nd = &h->nodes[n];
1139 10339           int pos = 0;
1140 64130 100         while (pos < nd->num && ss_key_cmp(nd->scores[pos], nd->members[pos], score, member) < 0) pos++;
    100          
1141 10339           return rank + (uint32_t)pos;
1142             }
1143              
1144             /* entry at 0-based rank r (r < count); returns leaf idx and sets *pos */
1145 381           static uint32_t ss_at_rank(SsHandle *h, uint32_t r, int *pos) {
1146 381           uint32_t n = h->hdr->root;
1147 1464 100         while (!h->nodes[n].is_leaf) {
1148 1083           SsNode *nd = &h->nodes[n];
1149 1083           int c = 0;
1150 3377 50         while (c < nd->num && r >= nd->counts[c]) { r -= nd->counts[c]; c++; }
    100          
1151 1083           n = nd->children[c];
1152             }
1153 381           *pos = (int)r;
1154 381           return n;
1155             }
1156              
1157             /* number of entries with score in [min, max] (inclusive). *lo_out (optional, may
1158             be NULL) receives #{score < min} -- the rank of the first in-range entry -- so
1159             range_by_score can reuse it instead of recomputing ss_rank_of(min, ...). */
1160 104           static uint32_t ss_count_in_score(SsHandle *h, double min, double max, uint32_t *lo_out) {
1161 104 100         if (h->hdr->root == SS_NONE || !(min <= max)) { if (lo_out) *lo_out = 0; return 0; }
    50          
    50          
1162 103           uint32_t lo = ss_rank_of(h, min, INT64_MIN); /* #{score < min} */
1163 103 100         if (lo_out) *lo_out = lo;
1164 103           uint32_t hi = ss_rank_of(h, max, INT64_MAX); /* #{key < (max, INT64_MAX)} */
1165             double sc;
1166 103 100         if (ss_idx_get(h, INT64_MAX, &sc) && sc == max) hi++; /* + the (max, INT64_MAX) entry itself */
    100          
1167 103           return hi - lo;
1168             }
1169              
/* result collector: parallel member/score arrays.
 * members[0..n) and scores[0..n) are valid; cap is the allocated length of
 * both arrays. A zero-initialised struct is a valid empty collector. */
typedef struct { int64_t *members; double *scores; size_t n, cap; } ss_rcollect_t;

/* Append (m, s) to the collector, growing both arrays when full (64 slots
 * initially, then doubling).
 *
 * Returns 1 on success, 0 when the arrays cannot be grown (allocation failure,
 * or the doubled capacity would overflow size_t). On failure the collector is
 * left valid with its existing contents intact; the caller still owns and must
 * free members/scores.
 */
static int ss_rcollect_push(ss_rcollect_t *c, int64_t m, double s) {
    if (c->n == c->cap) {
        const size_t elem = sizeof(int64_t) > sizeof(double) ? sizeof(int64_t)
                                                             : sizeof(double);
        /* refuse to grow if cap * 2 * elem would wrap around size_t */
        if (c->cap > SIZE_MAX / elem / 2) {
            return 0;
        }
        size_t nc = c->cap ? c->cap * 2 : 64;

        int64_t *nm = (int64_t *)realloc(c->members, nc * sizeof(int64_t));
        if (!nm) {
            return 0;
        }
        /* publish immediately: the old pointer may no longer be valid */
        c->members = nm;

        double *ns = (double *)realloc(c->scores, nc * sizeof(double));
        if (!ns) {
            return 0;   /* members is merely over-allocated; cap stays unchanged */
        }
        c->scores = ns;
        c->cap = nc;
    }
    c->members[c->n] = m;
    c->scores[c->n] = s;
    c->n++;
    return 1;
}
1184              
1185             /* collect `len` entries starting at rank `start` (in order, or reversed). The
1186             caller has clamped [start, start+len) within [0, count]. Returns 0 on OOM. */
1187 83           static int ss_collect_range(SsHandle *h, uint32_t start, uint32_t len, int reverse, ss_rcollect_t *c) {
1188 83 100         if (len == 0 || h->hdr->root == SS_NONE) return 1;
    50          
1189             int pos;
1190 77 100         uint32_t leaf = ss_at_rank(h, reverse ? (start + len - 1) : start, &pos);
1191 77           uint32_t got = 0;
1192 77 100         if (!reverse) {
1193 5589 100         while (leaf != SS_NONE && got < len) {
    100          
1194 5538           SsNode *nd = &h->nodes[leaf];
1195 65924 100         for (; pos < nd->num && got < len; pos++, got++)
    100          
1196 60386 50         if (!ss_rcollect_push(c, nd->members[pos], nd->scores[pos])) return 0;
1197 5538           leaf = nd->next; pos = 0;
1198             }
1199             } else {
1200 3070 100         while (leaf != SS_NONE && got < len) {
    100          
1201 3044           SsNode *nd = &h->nodes[leaf];
1202 36391 100         for (; pos >= 0 && got < len; pos--, got++)
    100          
1203 33347 50         if (!ss_rcollect_push(c, nd->members[pos], nd->scores[pos])) return 0;
1204 3044           leaf = nd->prev;
1205 3044 100         if (leaf != SS_NONE) pos = h->nodes[leaf].num - 1;
1206             }
1207             }
1208 77           return 1;
1209             }
1210              
1211             #endif /* SORTEDSET_H */
1212