File Coverage

dsu.h
Criterion Covered Total %
statement 204 332 61.4
branch 90 256 35.1
condition n/a
subroutine n/a
pod n/a
total 294 588 50.0


line stmt bran cond sub pod time code
1             /*
2             * dsu.h -- Shared-memory union-find (disjoint-set) for Linux
3             *
4             * Maintains a partition of a fixed universe of N integer elements (0..N-1)
5             * into disjoint sets. union(a,b) merges the two sets containing a and b;
6             * find(x) returns the canonical representative (root) of x's set; connected
7             * tests whether two elements are in the same set. Path compression (path
8             * halving) on find plus union by size give near-constant amortized time per
9             * operation. The parent/size arrays live in a shared mapping so several
10             * processes share one structure; a write-preferring futex rwlock with
11             * reader-slot dead-process recovery guards mutation.
12             *
13             * NOTE: find / connected / set_size perform path compression -- they MUTATE
14             * the structure -- so every accessor that calls dsu_find acquires the WRITE
15             * lock. Only num_sets / capacity are true read-only operations.
16             *
17             * Layout: Header -> reader_slots[1024] -> parent[n] (uint32) -> size[n] (uint32)
18             */
19              
20             #ifndef DSU_H
21             #define DSU_H
22              
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37             #include
38             #include
39              
40             #if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
41             #error "dsu.h: requires little-endian architecture"
42             #endif
43              
44              
45             /* ================================================================
46             * Constants
47             * ================================================================ */
48              
49             #define DSU_MAGIC 0x55534444U /* "DDSU" (little-endian) */
50             #define DSU_VERSION 1
51             #define DSU_ERR_BUFLEN 256
52             #define DSU_READER_SLOTS 1024 /* max concurrent reader processes for dead-process recovery */
53             #define DSU_MAX_N (1u << 31) /* 2.1B elements: keeps n*8 well within size_t and size[] sums within uint32 */
54              
55             #define DSU_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, DSU_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while (0)
56              
57             /* ================================================================
58             * Structs
59             * ================================================================ */
60              
61             /* Per-process slot for dead-process recovery. Each shared rwlock counter
62             * (the main rwlock-reader count, rwlock_waiters, rwlock_writers_waiting)
63             * is mirrored here so a wrlock timeout can attribute and reverse a dead
64             * process's contribution instead of waiting for the slow per-op timeout
65             * drain. */
66             typedef struct {
67             uint32_t pid; /* 0 = unclaimed */
68             uint32_t subcount; /* in-flight rdlock acquisitions for this process */
69             uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
70             uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
71             } DsuReaderSlot;
72              
73             struct DsuHeader {
74             uint32_t magic, version; /* 0,4 */
75             uint32_t _pad0; /* 8 */
76             uint32_t _pad1; /* 12 */
77              
78             /* ---- configuration / partition state ---- */
79             uint32_t n; /* 16 number of elements = capacity */
80             uint32_t num_sets; /* 20 current count of disjoint sets */
81             uint32_t _pad2; /* 24 */
82             uint32_t _pad3; /* 28 */
83              
84             /* ---- offsets / size ---- */
85             uint64_t total_size; /* 32 */
86             uint64_t reader_slots_off; /* 40 */
87             uint64_t parent_off; /* 48 */
88             uint64_t size_off; /* 56 */
89              
90             /* ---- lock + stats ---- */
91             uint32_t rwlock; /* 64 */
92             uint32_t rwlock_waiters; /* 68 */
93             uint32_t rwlock_writers_waiting; /* 72 */
94             uint32_t _pad4; /* 76 */
95             uint64_t stat_ops; /* 80 */
96             uint8_t _pad[168]; /* 88..255 */
97             };
98             typedef struct DsuHeader DsuHeader;
99              
100             _Static_assert(sizeof(DsuHeader) == 256, "DsuHeader must be 256 bytes");
101              
102             /* ---- Process-local handle ---- */
103              
104             typedef struct DsuHandle {
105             DsuHeader *hdr;
106             DsuReaderSlot *reader_slots; /* DSU_READER_SLOTS entries */
107             void *base; /* mmap base */
108             size_t mmap_size;
109             char *path; /* backing file path (strdup'd) */
110             int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
111             uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
112             uint32_t cached_pid; /* getpid() cached at last slot claim */
113             uint32_t cached_fork_gen; /* dsu_fork_gen value at last slot claim */
114             } DsuHandle;
115              
116             /* ================================================================
117             * Futex-based write-preferring read-write lock
118             * with reader-slot dead-process recovery
119             * ================================================================ */
120              
121             #define DSU_RWLOCK_SPIN_LIMIT 32
122             #define DSU_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
123              
124 0           static inline void dsu_rwlock_spin_pause(void) {
125             #if defined(__x86_64__) || defined(__i386__)
126 0           __asm__ volatile("pause" ::: "memory");
127             #elif defined(__aarch64__)
128             __asm__ volatile("yield" ::: "memory");
129             #else
130             __asm__ volatile("" ::: "memory");
131             #endif
132 0           }
133              
134             /* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
135             #define DSU_RWLOCK_WRITER_BIT 0x80000000U
136             #define DSU_RWLOCK_PID_MASK 0x7FFFFFFFU
137             #define DSU_RWLOCK_WR(pid) (DSU_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & DSU_RWLOCK_PID_MASK))
138              
139             /* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
140             /* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
141             * lock-holder's PID is recycled to an unrelated live process before recovery
142             * runs, this reports "alive" and that slot's orphaned contribution is not
143             * reclaimed until the recycled process exits. Robust detection would require
144             * a per-slot process-start-time epoch (a header-layout/version change).
145             * Documented under "Crash Safety" in the POD. */
146 0           static inline int dsu_pid_alive(uint32_t pid) {
147 0 0         if (pid == 0) return 1; /* no owner recorded, assume alive */
148 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
149             }
150              
151             /* Force-recover a stale write lock left by a dead process.
152             * CAS to OUR pid to hold the lock while fixing shared state, then release.
153             * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
154             * recovering process can detect and re-recover if we crash mid-recovery. */
155 0           static inline void dsu_recover_stale_lock(DsuHandle *h, uint32_t observed_rwlock) {
156 0           DsuHeader *hdr = h->hdr;
157 0           uint32_t mypid = DSU_RWLOCK_WR((uint32_t)getpid());
158 0 0         if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
159             mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
160 0           return;
161             /* We now hold the write lock as mypid. No additional shared state needs
162             * repair here (this module has no seqlock); just release the lock. */
163 0           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
164 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
165 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
166             }
167              
168             static const struct timespec dsu_lock_timeout = { DSU_LOCK_TIMEOUT_SEC, 0 };
169              
170             /* Process-global fork-generation counter. Incremented in the pthread_atfork
171             * child callback so every open handle detects a fork transition on the next
172             * lock call without paying a getpid() syscall on the hot path. */
173             static uint32_t dsu_fork_gen = 1;
174             static pthread_once_t dsu_atfork_once = PTHREAD_ONCE_INIT;
175 0           static void dsu_on_fork_child(void) {
176 0           __atomic_add_fetch(&dsu_fork_gen, 1, __ATOMIC_RELAXED);
177 0           }
178 2           static void dsu_atfork_init(void) {
179 2           pthread_atfork(NULL, NULL, dsu_on_fork_child);
180 2           }
181              
182             /* Ensure this process owns a reader slot. Called from the lock helpers so
183             * that fork()'d children pick up their own slot lazily instead of sharing
184             * the parent's. Hot-path is a single relaxed load + compare; only on a
185             * fork-generation mismatch do we touch getpid() and scan slots. */
186 14857           static inline void dsu_claim_reader_slot(DsuHandle *h) {
187 14857           uint32_t cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
188 14857 100         if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
    50          
189 14842           return;
190             /* Cold path -- register the atfork hook once per process, then claim. */
191 15           pthread_once(&dsu_atfork_once, dsu_atfork_init);
192             /* Re-read after pthread_once: dsu_on_fork_child may have bumped it. */
193 15           cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
194 15           uint32_t now_pid = (uint32_t)getpid();
195 15           h->cached_pid = now_pid;
196 15           h->cached_fork_gen = cur_gen;
197 15           h->my_slot_idx = UINT32_MAX;
198 15           uint32_t start = now_pid % DSU_READER_SLOTS;
199 17 50         for (uint32_t i = 0; i < DSU_READER_SLOTS; i++) {
200 17           uint32_t s = (start + i) % DSU_READER_SLOTS;
201 17           uint32_t expected = 0;
202 17 100         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
203             &expected, now_pid, 0,
204             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
205             /* Zero all mirror fields, not just subcount: a SIGKILL'd
206             * predecessor may have left waiters_parked/writers_parked
207             * non-zero, and dsu_recover_dead_readers won't drain them
208             * once we own the slot (the CAS expects the dead PID). */
209 15           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
210 15           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
211 15           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
212 15           h->my_slot_idx = s;
213 15           return;
214             }
215             }
216             /* Table full -- leave my_slot_idx = UINT32_MAX so we silently skip
217             * tracking for this handle (lock still works; just no recovery). */
218             }
219              
220             /* Atomically subtract `sub` from a counter, capped at 0 (never underflows). */
221 0           static inline void dsu_atomic_sub_cap(uint32_t *p, uint32_t sub) {
222 0 0         if (!sub) return;
223 0           uint32_t cur = __atomic_load_n(p, __ATOMIC_RELAXED);
224 0           for (;;) {
225 0 0         uint32_t want = (cur > sub) ? cur - sub : 0;
226 0 0         if (__atomic_compare_exchange_n(p, &cur, want,
227             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
228 0           return;
229             }
230             }
231              
232             /* Try to claim a dead slot (CAS pid -> 0) and drain its parked-waiter
233             * contributions back to the global counters. A no-op if the slot was stolen
234             * by another recoverer or had no waiter contribution to drain.
235             *
236             * Note: subcount/waiters_parked/writers_parked are NOT zeroed here.
237             * Between our CAS and a follow-up store, a new process could claim the
238             * slot and start populating these fields -- our stores would clobber its
239             * state. dsu_claim_reader_slot zeros all three on every claim, so
240             * leaving stale values is harmless. */
241 0           static inline void dsu_drain_dead_slot(DsuHandle *h, uint32_t i, uint32_t pid) {
242 0           DsuHeader *hdr = h->hdr;
243 0           uint32_t expected = pid;
244             /* ACQ_REL on success: RELEASE publishes pid=0 to other observers;
245             * ACQUIRE syncs us with prior writes from the dead process to
246             * waiters_parked/writers_parked. On weakly-ordered archs (aarch64)
247             * a plain RELAXED load before the CAS could miss those writes;
248             * loading them after the CAS keeps them inside the acquire window. */
249 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
250             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
251 0           return;
252 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
253 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
254 0 0         if (wp) dsu_atomic_sub_cap(&hdr->rwlock_waiters, wp);
255 0 0         if (writp) dsu_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp);
256             }
257              
258             /* Scan reader slots for dead-process recovery.
259             *
260             * For each dead PID with non-zero contributions to the shared rwlock,
261             * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
262             * out so live processes don't have to wait for the slow per-op timeout
263             * decrement to drain it for them.
264             *
265             * For the main rwlock counter we use the "no live reader holds -> force-
266             * reset to 0" trick (precise) because per-process attribution of the
267             * subcount is racy across the inc-counter-then-inc-subcount window. */
268 0           static inline void dsu_recover_dead_readers(DsuHandle *h) {
269 0 0         if (!h->reader_slots) return;
270 0           DsuHeader *hdr = h->hdr;
271 0           int any_live_reader = 0;
272 0           int found_dead_reader = 0;
273              
274             /* Pass 1: classify slots. Slots with dead pid and sc == 0 (no rwlock
275             * contribution to lose) are wiped immediately to free the slot for
276             * future claimants and drain any orphan parked-waiter counters. Slots
277             * with dead pid and sc > 0 are left intact in this pass: if force-
278             * reset cannot fire (because a live reader is concurrently present),
279             * wiping the dead slot would lose the only record of its orphan
280             * rwlock contribution and strand writers permanently once the live
281             * reader releases. */
282 0 0         for (uint32_t i = 0; i < DSU_READER_SLOTS; i++) {
283 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
284 0 0         if (pid == 0) continue;
285 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
286 0 0         if (dsu_pid_alive(pid)) {
287 0 0         if (sc > 0) any_live_reader = 1;
288 0           continue;
289             }
290 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
291 0           dsu_drain_dead_slot(h, i, pid);
292             }
293              
294             /* Pass 2: only if force-reset will fire. Issue the rwlock force-
295             * reset CAS FIRST, while the window since pass 1's last scan is
296             * still narrow (a handful of instructions, as in the original
297             * single-pass code). A new reader that started rdlock between
298             * pass 1's scan and the CAS will either:
299             * (a) have already CAS'd rwlock from cur to cur+1 -- our CAS then
300             * fails (cur mismatched), recovery yields and a future
301             * cycle retries; or
302             * (b) be still in the subcount-bump phase -- our CAS sees the
303             * stale cur and resets to 0; the new reader's subsequent CAS
304             * rwlock(0 -> 1) succeeds cleanly.
305             * Only after the CAS resolves do we wipe the deferred dead slots,
306             * keeping that work outside the race-sensitive window. */
307 0 0         if (found_dead_reader && !any_live_reader) {
    0          
308 0           uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
309 0 0         if (cur > 0 && cur < DSU_RWLOCK_WRITER_BIT) {
    0          
310 0 0         if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
311             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
312 0 0         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
313 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
314             }
315             }
316 0 0         for (uint32_t i = 0; i < DSU_READER_SLOTS; i++) {
317 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
318 0 0         if (pid == 0 || dsu_pid_alive(pid)) continue;
    0          
319 0           dsu_drain_dead_slot(h, i, pid);
320             }
321             }
322             }
323              
324             /* Inspect the lock word after a futex-wait timeout. If a dead writer
325             * holds it, force-recover the lock. Otherwise drain dead readers' shares
326             * of the rwlock/waiter counters. Called from rdlock and wrlock ETIMEDOUT
327             * branches -- identical recovery logic in both. */
328 0           static inline void dsu_recover_after_timeout(DsuHandle *h) {
329 0           DsuHeader *hdr = h->hdr;
330 0           uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
331 0 0         if (val >= DSU_RWLOCK_WRITER_BIT) {
332 0           uint32_t pid = val & DSU_RWLOCK_PID_MASK;
333 0 0         if (!dsu_pid_alive(pid))
334 0           dsu_recover_stale_lock(h, val);
335             } else {
336 0           dsu_recover_dead_readers(h);
337             }
338 0           }
339              
340             /* Park/unpark helpers: bump the global waiter counters together with this
341             * process's mirrored slot counters so a wrlock-timeout recovery scan can
342             * attribute and reverse a dead PID's contribution. Kept paired to make
343             * accidental drift between global and per-slot counts impossible. */
344 0           static inline void dsu_park_reader(DsuHandle *h) {
345 0 0         if (h->my_slot_idx != UINT32_MAX)
346 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
347 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
348 0           }
349 0           static inline void dsu_unpark_reader(DsuHandle *h) {
350 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
351 0 0         if (h->my_slot_idx != UINT32_MAX)
352 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
353 0           }
354 0           static inline void dsu_park_writer(DsuHandle *h) {
355 0 0         if (h->my_slot_idx != UINT32_MAX) {
356 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
357 0           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
358             }
359 0           __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
360 0           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
361 0           }
362 0           static inline void dsu_unpark_writer(DsuHandle *h) {
363 0           __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
364 0           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
365 0 0         if (h->my_slot_idx != UINT32_MAX) {
366 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
367 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
368             }
369 0           }
370              
371 2017           static inline void dsu_rwlock_rdlock(DsuHandle *h) {
372 2017           dsu_claim_reader_slot(h);
373 2017           DsuHeader *hdr = h->hdr;
374 2017           uint32_t *lock = &hdr->rwlock;
375 2017           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
376             /* Claim subcount BEFORE bumping the shared rwlock counter. This way
377             * a concurrent writer-side recovery scan that sees our PID alive with
378             * subcount > 0 will (correctly) defer force-reset, even while we are
379             * still spinning trying to win the rwlock CAS. Without this, a reader
380             * killed between rwlock CAS-success and subcount++ would let recovery
381             * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
382             * our eventual rdunlock dec. */
383 2017 50         if (h->my_slot_idx != UINT32_MAX)
384 2017           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
385 2017           for (int spin = 0; ; spin++) {
386 2017           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
387             /* Write-preferring: when lock is free (cur==0) and writers are
388             * waiting, yield to let the writer acquire. When readers are
389             * already active (cur>=1), new readers may join freely. */
390 2017 50         if (cur > 0 && cur < DSU_RWLOCK_WRITER_BIT) {
    0          
391 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
392             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
393 2017           return;
394 2017 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
395 2017 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
396             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
397 2017           return;
398             }
399 0 0         if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
400 0           dsu_rwlock_spin_pause();
401 0           continue;
402             }
403 0           dsu_park_reader(h);
404 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
405             /* Sleep when write-locked OR when yielding to waiting writers */
406 0 0         if (cur >= DSU_RWLOCK_WRITER_BIT || cur == 0) {
    0          
407 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
408             &dsu_lock_timeout, NULL, 0);
409 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
410 0           dsu_unpark_reader(h);
411 0           dsu_recover_after_timeout(h);
412 0           spin = 0;
413 0           continue;
414             }
415             }
416 0           dsu_unpark_reader(h);
417 0           spin = 0;
418             }
419             }
420              
421 2017           static inline void dsu_rwlock_rdunlock(DsuHandle *h) {
422 2017           DsuHeader *hdr = h->hdr;
423             /* Release the shared counter BEFORE dropping our subcount so that
424             * "any live PID with subcount > 0" is a reliable in-flight indicator
425             * for the writer-side recovery scan. Inverting these would create a
426             * window where we still own a unit of rwlock but our slot subcount is
427             * 0, letting recovery force-reset rwlock underneath us. */
428 2017           uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
429 2017 50         if (h->my_slot_idx != UINT32_MAX)
430 2017           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
431 2017 50         if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
    50          
432 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
433 2017           }
434              
435 12840           static inline void dsu_rwlock_wrlock(DsuHandle *h) {
436 12840           dsu_claim_reader_slot(h); /* refresh cached_pid across fork */
437 12840           DsuHeader *hdr = h->hdr;
438 12840           uint32_t *lock = &hdr->rwlock;
439             /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
440             * any crash window between acquiring the lock and storing the owner. */
441 12840           uint32_t mypid = DSU_RWLOCK_WR(h->cached_pid);
442 12840           for (int spin = 0; ; spin++) {
443 12840           uint32_t expected = 0;
444 12840 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
445             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
446 12840           return;
447 0 0         if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
448 0           dsu_rwlock_spin_pause();
449 0           continue;
450             }
451 0           dsu_park_writer(h);
452 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
453 0 0         if (cur != 0) {
454 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
455             &dsu_lock_timeout, NULL, 0);
456 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
457 0           dsu_unpark_writer(h);
458 0           dsu_recover_after_timeout(h);
459 0           spin = 0;
460 0           continue;
461             }
462             }
463 0           dsu_unpark_writer(h);
464 0           spin = 0;
465             }
466             }
467              
468 12840           static inline void dsu_rwlock_wrunlock(DsuHandle *h) {
469 12840           DsuHeader *hdr = h->hdr;
470 12840           __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
471 12840 50         if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
472 0           syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
473 12840           }
474              
475             /* ================================================================
476             * Layout math + create / open / destroy
477             *
478             * Layout: Header -> reader_slots[1024] -> parent[n] (uint32) -> size[n] (uint32)
479             * Both arrays are 4-byte words; the reader-slot region is a multiple of 4
480             * bytes, so parent_off and size_off are naturally 4-byte aligned.
481             * ================================================================ */
482              
483             /* Single source of truth for the mmap region layout offsets. */
484             typedef struct { uint64_t reader_slots, parent, size; } DsuLayout;
485              
486 42           static inline DsuLayout dsu_layout(uint32_t n) {
487             DsuLayout L;
488 42           L.reader_slots = sizeof(DsuHeader);
489 42           L.parent = L.reader_slots + (uint64_t)DSU_READER_SLOTS * sizeof(DsuReaderSlot);
490 42           L.size = L.parent + (uint64_t)n * sizeof(uint32_t);
491 42           return L;
492             }
493              
494 22           static inline uint64_t dsu_total_size(uint32_t n) {
495 22           DsuLayout L = dsu_layout(n);
496 22           return L.size + (uint64_t)n * sizeof(uint32_t); /* parent[n] + size[n] */
497             }
498              
499 21458           static inline uint32_t *dsu_parent(DsuHandle *h) {
500 21458           return (uint32_t *)((char *)h->base + h->hdr->parent_off);
501             }
502              
503 3063           static inline uint32_t *dsu_size(DsuHandle *h) {
504 3063           return (uint32_t *)((char *)h->base + h->hdr->size_off);
505             }
506              
507             /* ================================================================
508             * Union-find core (callers hold the WRITE lock -- find compresses)
509             * ================================================================ */
510              
511             /* Find the root of x with path halving (every other node on the path is
512             * relinked to its grandparent). MUTATING -- the caller must hold the write
513             * lock. x must already be range-checked (< n) by the XS layer. */
514 20524           static inline uint32_t dsu_find(DsuHandle *h, uint32_t x) {
515 20524           uint32_t *p = dsu_parent(h);
516 35763 100         while (p[x] != x) {
517 15239           p[x] = p[p[x]]; /* path halving */
518 15239           x = p[x];
519             }
520 20524           return x;
521             }
522              
523             /* Union the sets containing a and b by size (the larger-sized root wins, so
524             * the tree stays shallow). Returns 1 if the two were in different sets and
525             * are now merged, 0 if they were already in the same set. Caller holds the
526             * write lock; a and b are range-checked. */
527 2026           static inline int dsu_union_locked(DsuHandle *h, uint32_t a, uint32_t b) {
528 2026           uint32_t ra = dsu_find(h, a), rb = dsu_find(h, b);
529 2026 100         if (ra == rb) return 0;
530 932           uint32_t *p = dsu_parent(h);
531 932           uint32_t *sz = dsu_size(h);
532 932 100         if (sz[ra] < sz[rb]) { uint32_t t = ra; ra = rb; rb = t; }
533 932           p[rb] = ra;
534 932           sz[ra] += sz[rb];
535 932           h->hdr->num_sets--;
536 932           return 1;
537             }
538              
539             /* Whether a and b are in the same set (mutates via path compression). */
540 5652           static inline int dsu_connected_locked(DsuHandle *h, uint32_t a, uint32_t b) {
541 5652           return dsu_find(h, a) == dsu_find(h, b);
542             }
543              
544             /* Size of the set containing x (mutates via path compression). */
545 2129           static inline uint32_t dsu_set_size_locked(DsuHandle *h, uint32_t x) {
546 2129           return dsu_size(h)[dsu_find(h, x)];
547             }
548              
549             /* Reset to all singletons: parent[i]=i, size[i]=1, num_sets=n.
550             * Caller holds the write lock. */
551 2           static inline void dsu_reset_locked(DsuHandle *h) {
552 2           uint32_t *p = dsu_parent(h);
553 2           uint32_t *sz = dsu_size(h);
554 2           uint32_t n = h->hdr->n;
555 34 100         for (uint32_t i = 0; i < n; i++) { p[i] = i; sz[i] = 1; }
556 2           h->hdr->num_sets = n;
557 2           }
558              
559             /* ================================================================
560             * Validate args + header init / setup / open / destroy
561             * ================================================================ */
562              
563             /* Validate create args. Single source of truth: the XS layer does NOT
564             * duplicate this range check. */
565 22           static int dsu_validate_create_args(uint64_t n, char *errbuf) {
566 22 50         if (errbuf) errbuf[0] = '\0';
567 22 100         if (n < 1) { DSU_ERR("n must be >= 1"); return 0; }
    50          
568 20 50         if (n > DSU_MAX_N) { DSU_ERR("n must be <= %u", (unsigned)DSU_MAX_N); return 0; }
    0          
569 20           return 1;
570             }
571              
572 18           static inline void dsu_init_header(void *base, uint32_t n, uint64_t total_size) {
573 18           DsuLayout L = dsu_layout(n);
574 18           DsuHeader *hdr = (DsuHeader *)base;
575             /* Explicitly zero the header + reader-slot region (lock-recovery state);
576             the parent/size arrays are initialized explicitly below. */
577 18           memset(base, 0, (size_t)L.parent);
578 18           hdr->magic = DSU_MAGIC;
579 18           hdr->version = DSU_VERSION;
580 18           hdr->n = n;
581 18           hdr->num_sets = n;
582 18           hdr->total_size = total_size;
583 18           hdr->reader_slots_off = L.reader_slots;
584 18           hdr->parent_off = L.parent;
585 18           hdr->size_off = L.size;
586             {
587 18           uint32_t *p = (uint32_t *)((char *)base + L.parent);
588 18           uint32_t *sz = (uint32_t *)((char *)base + L.size);
589 1260 100         for (uint32_t i = 0; i < n; i++) { p[i] = i; sz[i] = 1; }
590             }
591 18           __atomic_thread_fence(__ATOMIC_SEQ_CST);
592 18           }
593              
594 20           static inline DsuHandle *dsu_setup(void *base, size_t map_size,
595             const char *path, int backing_fd) {
596 20           DsuHeader *hdr = (DsuHeader *)base;
597 20           DsuHandle *h = (DsuHandle *)calloc(1, sizeof(DsuHandle));
598 20 50         if (!h) {
599 0           munmap(base, map_size);
600 0 0         if (backing_fd >= 0) close(backing_fd);
601 0           return NULL;
602             }
603 20           h->hdr = hdr;
604 20           h->base = base;
605 20           h->reader_slots = (DsuReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
606 20           h->mmap_size = map_size;
607 20 100         h->path = path ? strdup(path) : NULL;
608 20           h->backing_fd = backing_fd;
609 20           h->my_slot_idx = UINT32_MAX;
610 20           return h;
611             }
612              
613             /* Validate a mapped header (shared by dsu_create reopen and dsu_open_fd).
614             * Stored n wins on reopen; require total_size == the size n implies and ==
615             * the actual file size, and all offsets to match the canonical layout. */
616 2           static inline int dsu_validate_header(const DsuHeader *hdr, uint64_t file_size) {
617 2 50         if (hdr->magic != DSU_MAGIC) return 0;
618 2 50         if (hdr->version != DSU_VERSION) return 0;
619 2 50         if (hdr->n < 1 || hdr->n > DSU_MAX_N) return 0;
    50          
620 2 50         if (hdr->num_sets < 1 || hdr->num_sets > hdr->n) return 0;
    50          
621 2 50         if (hdr->total_size != file_size) return 0;
622 2 50         if (hdr->total_size != dsu_total_size(hdr->n)) return 0;
623 2           DsuLayout L = dsu_layout(hdr->n);
624 2 50         if (hdr->reader_slots_off != L.reader_slots) return 0;
625 2 50         if (hdr->parent_off != L.parent) return 0;
626 2 50         if (hdr->size_off != L.size) return 0;
627 2           return 1;
628             }
629              
630 19           static DsuHandle *dsu_create(const char *path, uint64_t n_in, char *errbuf) {
631 19 100         if (!dsu_validate_create_args(n_in, errbuf)) return NULL;
632 18           uint32_t n = (uint32_t)n_in;
633              
634 18           uint64_t total = dsu_total_size(n);
635 18           int anonymous = (path == NULL);
636 18           int fd = -1;
637             size_t map_size;
638             void *base;
639              
640 18 100         if (anonymous) {
641 13           map_size = (size_t)total;
642 13           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
643 13 50         if (base == MAP_FAILED) { DSU_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
644             } else {
645 5           fd = open(path, O_RDWR|O_CREAT, 0666);
646 7 50         if (fd < 0) { DSU_ERR("open: %s", strerror(errno)); return NULL; }
    0          
647 5 50         if (flock(fd, LOCK_EX) < 0) { DSU_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
648             struct stat st;
649 5 50         if (fstat(fd, &st) < 0) { DSU_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
650 5           int is_new = (st.st_size == 0);
651 5 100         if (!is_new && (uint64_t)st.st_size < sizeof(DsuHeader)) {
    100          
652 1 50         DSU_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
653 1           flock(fd, LOCK_UN); close(fd); return NULL;
654             }
655 4 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
656 0 0         DSU_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
657             }
658 4 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
659 4           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
660 4 50         if (base == MAP_FAILED) { DSU_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
661 4 100         if (!is_new) {
662 1 50         if (!dsu_validate_header((DsuHeader *)base, (uint64_t)st.st_size)) {
663 0 0         DSU_ERR("invalid disjoint-set file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
664             }
665 1           flock(fd, LOCK_UN); close(fd);
666 1           return dsu_setup(base, map_size, path, -1);
667             }
668             }
669 16           dsu_init_header(base, n, total);
670 16 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
671 16           return dsu_setup(base, map_size, path, -1);
672             }
673              
674 3           static DsuHandle *dsu_create_memfd(const char *name, uint64_t n_in, char *errbuf) {
675 3 100         if (!dsu_validate_create_args(n_in, errbuf)) return NULL;
676 2           uint32_t n = (uint32_t)n_in;
677              
678 2           uint64_t total = dsu_total_size(n);
679 2 100         int fd = memfd_create(name ? name : "dsu", MFD_CLOEXEC | MFD_ALLOW_SEALING);
680 2 50         if (fd < 0) { DSU_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
681 2 50         if (ftruncate(fd, (off_t)total) < 0) {
682 0 0         DSU_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL;
683             }
684 2           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
685 2           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
686 2 50         if (base == MAP_FAILED) { DSU_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
687 2           dsu_init_header(base, n, total);
688 2           return dsu_setup(base, (size_t)total, NULL, fd);
689             }
690              
691 2           static DsuHandle *dsu_open_fd(int fd, char *errbuf) {
692 2 50         if (errbuf) errbuf[0] = '\0';
693             struct stat st;
694 2 50         if (fstat(fd, &st) < 0) { DSU_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
695 2 100         if ((uint64_t)st.st_size < sizeof(DsuHeader)) { DSU_ERR("too small"); return NULL; }
    50          
696 1           size_t ms = (size_t)st.st_size;
697 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
698 1 50         if (base == MAP_FAILED) { DSU_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
699 1 50         if (!dsu_validate_header((DsuHeader *)base, (uint64_t)st.st_size)) {
700 0 0         DSU_ERR("invalid disjoint-set table"); munmap(base, ms); return NULL;
701             }
702 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
703 1 50         if (myfd < 0) { DSU_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
704 1           return dsu_setup(base, ms, NULL, myfd);
705             }
706              
707 20           static void dsu_destroy(DsuHandle *h) {
708 20 50         if (!h) return;
709 20 100         if (h->backing_fd >= 0) close(h->backing_fd);
710 20 50         if (h->base) munmap(h->base, h->mmap_size);
711 20           free(h->path);
712 20           free(h);
713             }
714              
715 2           static inline int dsu_msync(DsuHandle *h) {
716 2 50         if (!h || !h->base) return 0;
    50          
717 2           return msync(h->base, h->mmap_size, MS_SYNC);
718             }
719              
720             #endif /* DSU_H */