File Coverage

sync.h
Criterion Covered Total %
statement 620 867 71.5
branch 291 606 48.0
condition n/a
subroutine n/a
pod n/a
total 911 1473 61.8


line stmt bran cond sub pod time code
1             /*
2             * sync.h -- Shared-memory synchronization primitives for Linux
3             *
4             * Five primitives:
5             * Semaphore — bounded counter (CAS-based, cross-process resource limiting)
6             * Barrier — N processes rendezvous at a point before proceeding
7             * RWLock — reader-writer lock for external resources
8             * Condvar — condition variable with futex wait/signal/broadcast
9             * Once — one-time initialization gate (like pthread_once)
10             *
11             * All use file-backed mmap(MAP_SHARED) for cross-process sharing,
12             * futex for blocking wait, and PID-based stale lock recovery.
13             */
14              
15             #ifndef SYNC_H
16             #define SYNC_H
17              
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35              
36             /* ================================================================
37             * Constants
38             * ================================================================ */
39              
/* Header identification values. */
#define SYNC_MAGIC   0x53594E31U /* ASCII "SYN1" */
#define SYNC_VERSION 2           /* v2: per-process reader-slot table for RWLock dead-reader recovery */

/* Primitive type IDs (stored in SyncHeader.type) */
#define SYNC_TYPE_SEMAPHORE 0
#define SYNC_TYPE_BARRIER   1
#define SYNC_TYPE_RWLOCK    2
#define SYNC_TYPE_CONDVAR   3
#define SYNC_TYPE_ONCE      4

/* Tunables */
#define SYNC_ERR_BUFLEN       256
#define SYNC_SPIN_LIMIT       32   /* CAS attempts before a locker parks on the futex */
#define SYNC_LOCK_TIMEOUT_SEC 2    /* futex wait cap; stale-holder recovery runs on expiry */
#define SYNC_READER_SLOTS     1024 /* per-process reader-counter mirror for RWLock */
54              
55             /* ================================================================
56             * Per-process reader-slot table (for RWLock dead-reader recovery)
57             *
58             * Allocated only when type == SYNC_TYPE_RWLOCK (Option A).
59             * Mirrors each process's contribution to the global rwlock counters so a
60             * SIGKILL'd reader's stuck reader-count contribution can be reclaimed.
61             * ~16KB per RWLock (1024 slots * 16 bytes); zero overhead for other types.
62             * ================================================================ */
63              
/* One entry of the per-process reader-slot table: four 32-bit words. */
typedef struct {
    uint32_t pid;            /* owning PID, 0 = free */
    uint32_t subcount;       /* this process's rwlock reader contribution */
    uint32_t waiters_parked; /* this process's contribution to hdr->waiters */
    uint32_t writers_parked; /* this process's contribution to rwlock_writers_waiting */
} SyncReaderSlot;
70              
71             /* ================================================================
72             * Header (128 bytes = 2 cache lines, lives at start of mmap)
73             * ================================================================ */
74              
typedef struct {
    /* ---- Cache line 0 (bytes 0-63): immutable after create ---- */
    uint32_t magic;            /*  0 */
    uint32_t version;          /*  4 */
    uint32_t type;             /*  8: SYNC_TYPE_* */
    uint32_t param;            /* 12: type-specific (sem max, barrier count, etc.) */
    uint64_t total_size;       /* 16: mmap size */
    uint64_t reader_slots_off; /* 24: offset of SyncReaderSlot[SYNC_READER_SLOTS],
                                *     0 if not allocated (non-RWLock primitives) */
    uint8_t  _pad0[32];        /* 32-63 */

    /* ---- Cache line 1 (bytes 64-127): mutable state ----
     *
     * Per-primitive meaning of the shared words:
     *   Semaphore: value = current count, waiters = blocked acquirers
     *   Barrier:   value = arrived count, waiters = blocked at barrier,
     *              generation = increments each time barrier trips
     *   RWLock:    value = rwlock word (0=free, N=N readers,
     *              0x80000000|pid=writer), waiters = blocked lockers
     *   Condvar:   value = signal counter (futex word), waiters = blocked
     *              waiters, mutex = associated mutex for predicate protection
     *   Once:      value = state (0=INIT, 1=RUNNING|pid, 2=DONE),
     *              waiters = blocked on completion
     */
    uint32_t value;            /*  64: primary state word (futex target) */
    uint32_t waiters;          /*  68: waiter count */
    uint32_t generation;       /*  72: barrier generation / condvar epoch */
    uint32_t mutex;            /*  76: condvar mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;    /*  80: condvar mutex waiter count */
    uint32_t stat_recoveries;  /*  84 */
    uint64_t stat_acquires;    /*  88 */
    uint64_t stat_releases;    /*  96 */
    uint64_t stat_waits;       /* 104 */
    uint64_t stat_timeouts;    /* 112 */
    uint32_t stat_signals;     /* 120 */
    uint32_t rwlock_writers_waiting; /* 124: RWLock write-preferring yield signal
                                      *      (writers only, not readers) */
} SyncHeader;

/* Layout guard: the header must occupy exactly two 64-byte cache lines. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(SyncHeader) == 128, "SyncHeader must be 128 bytes");
#endif
115              
116             /* ================================================================
117             * Process-local handle
118             * ================================================================ */
119              
120             typedef struct {
121             SyncHeader *hdr;
122             size_t mmap_size;
123             char *path;
124             int notify_fd; /* eventfd, -1 if disabled */
125             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
126             SyncReaderSlot *reader_slots; /* in mmap, SYNC_READER_SLOTS entries; NULL if not RWLock */
127             uint32_t my_slot_idx; /* UINT32_MAX = unclaimed; per-process slot index */
128             uint32_t cached_pid; /* getpid() at claim time */
129             uint32_t cached_fork_gen; /* fork-generation at claim time */
130             } SyncHandle;
131              
132             /* ================================================================
133             * Utility
134             * ================================================================ */
135              
/* Busy-wait hint: emits the architecture's spin-relax instruction where one
 * is known ("pause" on x86, "yield" on AArch64) and is a pure compiler
 * memory barrier everywhere else. */
static inline void sync_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
145              
/* Liveness probe via kill(pid, 0).
 * Returns 0 only when the kernel reports ESRCH (no such process); any other
 * outcome -- success or a different error such as EPERM -- counts as alive.
 * pid == 0 is never probed and is reported as alive. */
static inline int sync_pid_alive(uint32_t pid) {
    if (pid == 0) {
        return 1;
    }
    if (kill((pid_t)pid, 0) != -1) {
        return 1;
    }
    return errno != ESRCH;
}
150              
/* Convert a relative timeout in seconds (double) to an absolute
 * CLOCK_MONOTONIC deadline.
 *
 *   timeout  - seconds from now. Non-positive and NaN values yield a
 *              deadline equal to "now"; values above ~31.7 years are clamped
 *              so the double -> time_t conversion stays in range.
 *   deadline - out: always a normalized timespec (0 <= tv_nsec < 1e9).
 */
static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
    const double max_span_sec = 1.0e9; /* keeps now + span inside a 32-bit time_t */

    clock_gettime(CLOCK_MONOTONIC, deadline);

    /* Written as a negated '>' so NaN also takes the "no span" path. A
     * negative fraction would otherwise drive tv_nsec below zero. */
    if (!(timeout > 0.0))
        return;
    if (timeout > max_span_sec)
        timeout = max_span_sec;

    time_t whole_sec = (time_t)timeout;
    long frac_nsec = (long)((timeout - (double)whole_sec) * 1e9);

    deadline->tv_sec += whole_sec;
    deadline->tv_nsec += frac_nsec;
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec++;
        deadline->tv_nsec -= 1000000000L;
    }
}
161              
/* Time left until an absolute CLOCK_MONOTONIC deadline.
 * Writes (deadline - now) to *remaining, borrowing a second when the
 * nanosecond difference is negative. Returns 1 while the seconds part is
 * non-negative, 0 once the deadline has passed. */
static inline int sync_remaining_time(const struct timespec *deadline,
                                      struct timespec *remaining) {
    struct timespec current;
    clock_gettime(CLOCK_MONOTONIC, &current);

    time_t sec_left = deadline->tv_sec - current.tv_sec;
    long nsec_left = deadline->tv_nsec - current.tv_nsec;
    if (nsec_left < 0) {
        sec_left -= 1;
        nsec_left += 1000000000L;
    }

    remaining->tv_sec = sec_left;
    remaining->tv_nsec = nsec_left;
    return sec_left >= 0;
}
175              
176             /* ================================================================
177             * Mutex helpers (for Condvar's internal mutex)
178             * ================================================================ */
179              
/* Mutex word encoding: bit 31 marks "held", bits 0-30 carry the owner PID. */
#define SYNC_MUTEX_WRITER_BIT 0x80000000U
#define SYNC_MUTEX_PID_MASK   0x7FFFFFFFU
#define SYNC_MUTEX_VAL(pid) \
    (SYNC_MUTEX_WRITER_BIT | ((uint32_t)(pid) & SYNC_MUTEX_PID_MASK))
183              
184             static const struct timespec sync_lock_timeout = { SYNC_LOCK_TIMEOUT_SEC, 0 };
185              
186 0           static inline void sync_recover_stale_mutex(SyncHeader *hdr, uint32_t observed) {
187 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
188             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
189 0           return;
190 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
191 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
192 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
193             }
194              
195 15           static inline void sync_mutex_lock(SyncHeader *hdr) {
196 15           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
197 15           for (int spin = 0; ; spin++) {
198 15           uint32_t expected = 0;
199 15 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
200             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
201 15           return;
202 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
203 0           sync_spin_pause();
204 0           continue;
205             }
206 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
207 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
208 0 0         if (cur != 0) {
209 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
210             &sync_lock_timeout, NULL, 0);
211 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
212 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
213 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
214 0 0         if (val >= SYNC_MUTEX_WRITER_BIT) {
215 0           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
216 0 0         if (!sync_pid_alive(pid))
217 0           sync_recover_stale_mutex(hdr, val);
218             }
219 0           spin = 0;
220 0           continue;
221             }
222             }
223 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
224 0           spin = 0;
225             }
226             }
227              
228 16           static inline void sync_mutex_unlock(SyncHeader *hdr) {
229 16           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
230 16 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
231 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
232 16           }
233              
234             /* ================================================================
235             * RWLock helpers (for SYNC_TYPE_RWLOCK)
236             *
237             * value == 0: unlocked
238             * value 1..0x7FFFFFFF: N active readers
239             * value 0x80000000 | pid: write-locked by pid
240             * ================================================================ */
241              
/* RWLock word encoding: bit 31 marks a writer, bits 0-30 carry its PID. */
#define SYNC_RWLOCK_WRITER_BIT 0x80000000U
#define SYNC_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SYNC_RWLOCK_WR(pid) \
    (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
245              
246             static inline int sync_rwlock_try_rdlock(SyncHandle *h);
247             static inline int sync_rwlock_try_wrlock(SyncHandle *h);
248              
249 0           static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
250 0 0         if (!__atomic_compare_exchange_n(&hdr->value, &observed, 0,
251             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
252 0           return;
253 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
254 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
255 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
256             }
257              
258             /* ---- Per-process reader-slot lifecycle (dead-reader recovery) ----
259             * Each process claims one SyncReaderSlot lazily on first rwlock op so that
260             * its contribution to the shared reader-count can be reclaimed by other
261             * processes if it dies (SIGKILL'd reader no longer pins the counter).
262             * Only relevant for SYNC_TYPE_RWLOCK; non-RWLock primitives leave
263             * h->reader_slots == NULL and these helpers become no-ops. */
/* Fork generation: bumped in the child after every fork() so a handle can
 * tell that its cached slot index belongs to the parent. */
static uint32_t sync_fork_gen = 0;
static pthread_once_t sync_atfork_once = PTHREAD_ONCE_INIT;

/* pthread_atfork child handler: advance the fork generation. */
static void sync_on_fork_child(void) {
    (void)__atomic_add_fetch(&sync_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once initializer: install the child-side fork handler. */
static void sync_atfork_init(void) {
    pthread_atfork(NULL, NULL, sync_on_fork_child);
}
272              
273 20           static inline void sync_claim_reader_slot(SyncHandle *h) {
274 20 50         if (!h->reader_slots) return;
275 20           pthread_once(&sync_atfork_once, sync_atfork_init);
276 20           uint32_t cur_gen = __atomic_load_n(&sync_fork_gen, __ATOMIC_RELAXED);
277 20 50         if (h->cached_fork_gen != cur_gen) {
278 0           h->cached_fork_gen = cur_gen;
279 0           h->my_slot_idx = UINT32_MAX;
280             }
281 20 100         if (h->my_slot_idx != UINT32_MAX) return;
282 8           uint32_t now_pid = (uint32_t)getpid();
283 8           h->cached_pid = now_pid;
284 8           uint32_t start = now_pid % SYNC_READER_SLOTS;
285 8 50         for (uint32_t i = 0; i < SYNC_READER_SLOTS; i++) {
286 8           uint32_t s = (start + i) % SYNC_READER_SLOTS;
287 8           uint32_t expected = 0;
288 8 50         if (__atomic_compare_exchange_n(&h->reader_slots[s].pid,
289             &expected, now_pid, 0,
290             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
291 8           __atomic_store_n(&h->reader_slots[s].subcount, 0, __ATOMIC_RELAXED);
292 8           __atomic_store_n(&h->reader_slots[s].waiters_parked, 0, __ATOMIC_RELAXED);
293 8           __atomic_store_n(&h->reader_slots[s].writers_parked, 0, __ATOMIC_RELAXED);
294 8           h->my_slot_idx = s;
295 8           return;
296             }
297             }
298             /* Slot table full — silently skip tracking; recovery falls back to
299             * the slow per-op timeout drain. */
300             }
301              
/* Atomically lower *p by `sub`, saturating at 0 instead of wrapping.
 * A zero `sub` leaves *p untouched. */
static inline void sync_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;

    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        /* `seen` is refreshed by a failed CAS, so `next` is recomputed. */
        next = (seen > sub) ? (seen - sub) : 0;
    } while (!__atomic_compare_exchange_n(p, &seen, next,
                                          1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
313              
314             /* Try to claim a dead slot (CAS pid → 0) and drain its parked-waiter
315             * contributions to the global counters. Returns 1 if drained, 0 if lost
316             * the CAS race or had no contributions. ACQ_REL syncs us with the dead
317             * process's RELAXED stores to mirror fields on weakly-ordered archs. */
318 0           static inline int sync_drain_dead_slot(SyncHandle *h, uint32_t i, uint32_t pid) {
319 0           SyncHeader *hdr = h->hdr;
320 0           uint32_t expected = pid;
321 0 0         if (!__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
322             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
323 0           return 0;
324 0           uint32_t wp = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
325 0           uint32_t writp = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
326 0           int drained = 0;
327 0 0         if (wp) { sync_atomic_sub_cap(&hdr->waiters, wp); drained = 1; }
328 0 0         if (writp) { sync_atomic_sub_cap(&hdr->rwlock_writers_waiting, writp); drained = 1; }
329             /* Don't zero slot fields — sync_claim_reader_slot zeros them on the
330             * next claim; zeroing here can race a new claimant's increments. */
331 0           return drained;
332             }
333              
334 0           static inline void sync_recover_dead_readers(SyncHandle *h) {
335 0 0         if (!h->reader_slots) return;
336 0           SyncHeader *hdr = h->hdr;
337 0           int any_live_reader = 0;
338 0           int found_dead_reader = 0;
339 0           int any_recovery = 0;
340              
341             /* Pass 1: scan; classify; immediate-wipe dead slots with sc==0 (no
342             * rwlock contribution to lose). Defer wiping dead-with-sc>0 slots
343             * until force-reset can fire — otherwise we'd lose the only record
344             * of the orphan rwlock contribution while a live reader is present. */
345 0 0         for (uint32_t i = 0; i < SYNC_READER_SLOTS; i++) {
346 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
347 0 0         if (pid == 0) continue;
348 0           uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
349 0 0         if (sync_pid_alive(pid)) {
350 0 0         if (sc > 0) any_live_reader = 1;
351 0           continue;
352             }
353 0 0         if (sc > 0) { found_dead_reader = 1; continue; }
354 0 0         if (sync_drain_dead_slot(h, i, pid)) any_recovery = 1;
355             }
356              
357             /* Pass 2: only if force-reset will fire. Issue the rwlock CAS first
358             * to keep the race window with new readers narrow, then wipe the
359             * deferred dead slots. */
360 0 0         if (found_dead_reader && !any_live_reader) {
    0          
361 0           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
362 0 0         if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
    0          
363 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, 0,
364             0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
365 0           any_recovery = 1;
366 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
367 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
368             }
369             }
370 0 0         for (uint32_t i = 0; i < SYNC_READER_SLOTS; i++) {
371 0           uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
372 0 0         if (pid == 0) continue;
373 0 0         if (sync_pid_alive(pid)) continue;
374 0 0         if (sync_drain_dead_slot(h, i, pid)) any_recovery = 1;
375             }
376             }
377 0 0         if (any_recovery)
378 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
379             }
380              
381             /* Park/unpark helpers — keep global hdr->waiters/rwlock_writers_waiting
382             * and per-slot mirror counters in sync so recovery can drain them. */
383 1           static inline void sync_park_reader(SyncHandle *h) {
384 1           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
385 1 50         if (h->my_slot_idx != UINT32_MAX)
386 1           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
387 1           }
388 1           static inline void sync_unpark_reader(SyncHandle *h) {
389 1           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
390 1 50         if (h->my_slot_idx != UINT32_MAX)
391 1           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
392 1           }
393 1           static inline void sync_park_writer(SyncHandle *h) {
394 1           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
395 1           __atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
396 1 50         if (h->my_slot_idx != UINT32_MAX) {
397 1           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
398 1           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
399             }
400 1           }
401 1           static inline void sync_unpark_writer(SyncHandle *h) {
402 1           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
403 1           __atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
404 1 50         if (h->my_slot_idx != UINT32_MAX) {
405 1           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
406 1           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
407             }
408 1           }
409              
410             /* Recovery dispatcher: if a writer is dead, force-reset the lock word;
411             * otherwise scan reader slots for dead readers and drain their stuck
412             * contributions to the rwlock and waiter counters. Reload the lock
413             * value here (rather than trusting a stale snapshot from the futex
414             * caller) so that (a) a writer that died after our futex_wait started
415             * is detected on the same timeout, and (b) phantom waiter/writers_waiting
416             * contributions left by a dead parked writer are drained even when the
417             * lock word itself is now 0. */
418 0           static inline void sync_recover_after_timeout(SyncHandle *h) {
419 0           SyncHeader *hdr = h->hdr;
420 0           uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
421 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
422 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
423 0 0         if (!sync_pid_alive(pid))
424 0           sync_recover_stale_rwlock(hdr, val);
425             } else {
426 0           sync_recover_dead_readers(h);
427             }
428 0           }
429              
430 7           static inline void sync_rwlock_rdlock(SyncHandle *h) {
431 7           SyncHeader *hdr = h->hdr;
432 7           sync_claim_reader_slot(h);
433 7           uint32_t *lock = &hdr->value;
434 7           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
435             /* Bump per-process subcount BEFORE attempting the rwlock CAS so a
436             * concurrent recovery scan sees us as a live in-flight reader. */
437 7 50         if (h->my_slot_idx != UINT32_MAX)
438 7           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
439 7           for (int spin = 0; ; spin++) {
440 7           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
441             /* Write-preferring: yield to parked writers when lock is free. */
442 7 100         if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
    50          
443 1 50         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
444             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
445 7           return;
446 6 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
447 6 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
448             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
449 6           return;
450             }
451 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
452 0           sync_spin_pause();
453 0           continue;
454             }
455 0           sync_park_reader(h);
456 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
457             /* Sleep when write-locked OR yielding to parked writers (cur==0) */
458 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
    0          
459 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
460             &sync_lock_timeout, NULL, 0);
461 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
462 0           sync_unpark_reader(h);
463 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
464 0           sync_recover_after_timeout(h);
465             } else {
466             /* Yielding to writers timed out — optimistically drop one
467             * writers_waiting to recover from potentially-crashed
468             * parked writer. A live writer just re-increments. */
469 0           uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
470 0 0         while (wc > 0 && !__atomic_compare_exchange_n(
471 0 0         writers_waiting, &wc, wc - 1,
472             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
473             /* Also opportunistically reap dead-reader slot mirrors
474             * (some other reader holds the lock but may be dead). */
475 0           sync_recover_dead_readers(h);
476             }
477 0           spin = 0;
478 0           continue;
479             }
480             }
481 0           sync_unpark_reader(h);
482 0           spin = 0;
483             }
484             }
485              
486             /* Timed rdlock: returns 1 on success, 0 on timeout. timeout<0 = infinite.
487             * No try-lock fast-path: would bypass write-preference when cur==0 &&
488             * writers_waiting > 0. Main loop's first iteration handles the uncontended
489             * case at ~same cost.
490             *
491             * Uses the same slot-claim + park-reader pattern as the regular rdlock,
492             * with user-timeout ETIMEDOUT short-circuiting to return 0 (after we drop
493             * any claimed subcount). Per-iteration futex waits are capped at
494             * SYNC_LOCK_TIMEOUT_SEC so the global recovery scan runs periodically. */
495 3           static inline int sync_rwlock_rdlock_timed(SyncHandle *h, double timeout) {
496 3 50         if (timeout == 0) {
497 0           return sync_rwlock_try_rdlock(h);
498             }
499              
500 3           SyncHeader *hdr = h->hdr;
501 3           sync_claim_reader_slot(h);
502 3           uint32_t *lock = &hdr->value;
503             struct timespec deadline, remaining;
504 3           int has_deadline = (timeout > 0);
505 3 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
506              
507             /* Bump per-process subcount BEFORE attempting the rwlock CAS so a
508             * concurrent recovery scan sees us as a live in-flight reader. */
509 3 50         if (h->my_slot_idx != UINT32_MAX)
510 3           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
511              
512 3           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
513 35           for (int spin = 0; ; spin++) {
514 35           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
515 35 100         if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
    50          
516 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
517             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
518 3           return 1;
519 35 100         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
520 2 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
521             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
522 2           return 1;
523             }
524 33 100         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
525 32           sync_spin_pause();
526 32           continue;
527             }
528 1           sync_park_reader(h);
529 1           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
530 1 50         if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
    0          
531 1           struct timespec *pts = NULL;
532 1           int capped = 0;
533             /* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
534             * runs periodically even with a user-supplied deadline. */
535 1 50         if (has_deadline) {
536 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
537 0           sync_unpark_reader(h);
538 0 0         if (h->my_slot_idx != UINT32_MAX)
539 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
540 0           return 0;
541             }
542 1 50         if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC) {
543 0           pts = (struct timespec *)&sync_lock_timeout;
544 0           capped = 1;
545             } else {
546 1           pts = &remaining;
547             }
548             } else {
549 0           pts = (struct timespec *)&sync_lock_timeout;
550 0           capped = 1;
551             }
552 1           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
553 1 50         if (rc == -1 && errno == ETIMEDOUT) {
    50          
554 1           sync_unpark_reader(h);
555             /* If timeout matches the global lock-timeout cap (not the
556             * user's deadline), run the recovery scan; otherwise it's
557             * the user's deadline expiring and we should return 0. */
558 1 50         if (!capped) {
559 1 50         if (h->my_slot_idx != UINT32_MAX)
560 1           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
561 1           return 0;
562             }
563 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
564 0           sync_recover_after_timeout(h);
565             } else {
566             /* Yielding to writer timed out — drop one writers_waiting
567             * to recover from a potentially-crashed parked writer. */
568 0           uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
569 0 0         while (wc > 0 && !__atomic_compare_exchange_n(
570 0 0         writers_waiting, &wc, wc - 1,
571             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
572 0           sync_recover_dead_readers(h);
573             }
574 0           spin = 0;
575 0           continue;
576             }
577             }
578 0           sync_unpark_reader(h);
579 0           spin = 0;
580             }
581             }
582              
583             /* try_rdlock: bump subcount up-front so concurrent recovery scans see us
584             * as live in-flight; revert on CAS failure. Cheap and keeps recovery
585             * accounting consistent. */
586 3           static inline int sync_rwlock_try_rdlock(SyncHandle *h) {
587 3           SyncHeader *hdr = h->hdr;
588 3           sync_claim_reader_slot(h);
589 3 50         if (h->my_slot_idx != UINT32_MAX)
590 3           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
591 3           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
592 3 100         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
593 1 50         if (h->my_slot_idx != UINT32_MAX)
594 1           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
595 1           return 0;
596             }
597 2 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur + 1,
598             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
599 2           return 1;
600 0 0         if (h->my_slot_idx != UINT32_MAX)
601 0           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
602 0           return 0;
603             }
604              
605 12           static inline void sync_rwlock_rdunlock(SyncHandle *h) {
606 12           SyncHeader *hdr = h->hdr;
607             /* Decrement rwlock BEFORE subcount: a concurrent recovery scan that
608             * sees subcount > 0 with our (live) PID will (correctly) treat us as
609             * an in-flight reader and skip force-reset. */
610 12           uint32_t prev = __atomic_sub_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
611 12 50         if (h->my_slot_idx != UINT32_MAX)
612 12           __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
613 12 100         if (prev == 0 && __atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
    50          
614 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
615 12           }
616              
617 5           static inline void sync_rwlock_wrlock(SyncHandle *h) {
618 5           SyncHeader *hdr = h->hdr;
619 5           sync_claim_reader_slot(h);
620 5           uint32_t *lock = &hdr->value;
621 5           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
622 5           for (int spin = 0; ; spin++) {
623 5           uint32_t expected = 0;
624 5 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
625             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
626 5           return;
627 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
628 0           sync_spin_pause();
629 0           continue;
630             }
631 0           sync_park_writer(h);
632 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
633 0 0         if (cur != 0) {
634 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
635             &sync_lock_timeout, NULL, 0);
636 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
637 0           sync_unpark_writer(h);
638 0           sync_recover_after_timeout(h);
639 0           spin = 0;
640 0           continue;
641             }
642             }
643 0           sync_unpark_writer(h);
644 0           spin = 0;
645             }
646             }
647              
648             /* Timed wrlock: returns 1 on success, 0 on timeout. timeout<0 = infinite. */
649 4           static inline int sync_rwlock_wrlock_timed(SyncHandle *h, double timeout) {
650 4 100         if (sync_rwlock_try_wrlock(h)) return 1;
651 1 50         if (timeout == 0) return 0;
652              
653 1           SyncHeader *hdr = h->hdr;
654 1           sync_claim_reader_slot(h);
655 1           uint32_t *lock = &hdr->value;
656 1           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
657             struct timespec deadline, remaining;
658 1           int has_deadline = (timeout > 0);
659 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
660              
661 33           for (int spin = 0; ; spin++) {
662 33           uint32_t expected = 0;
663 33 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
664             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
665 1           return 1;
666 33 100         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
667 32           sync_spin_pause();
668 32           continue;
669             }
670 1           sync_park_writer(h);
671 1           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
672 1 50         if (cur != 0) {
673 1           struct timespec *pts = NULL;
674 1           int capped = 0;
675             /* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
676             * runs periodically even with a user-supplied deadline. */
677 1 50         if (has_deadline) {
678 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
679 0           sync_unpark_writer(h);
680 0           return 0;
681             }
682 1 50         if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC) {
683 0           pts = (struct timespec *)&sync_lock_timeout;
684 0           capped = 1;
685             } else {
686 1           pts = &remaining;
687             }
688             } else {
689 0           pts = (struct timespec *)&sync_lock_timeout;
690 0           capped = 1;
691             }
692 1           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
693 1 50         if (rc == -1 && errno == ETIMEDOUT) {
    50          
694 1           sync_unpark_writer(h);
695 1 50         if (!capped) return 0; /* user deadline expired */
696 0           sync_recover_after_timeout(h);
697 0           spin = 0;
698 0           continue;
699             }
700             }
701 0           sync_unpark_writer(h);
702 0           spin = 0;
703             }
704             }
705              
706 7           static inline int sync_rwlock_try_wrlock(SyncHandle *h) {
707 7           SyncHeader *hdr = h->hdr;
708 7           uint32_t expected = 0;
709 7           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
710 7           return __atomic_compare_exchange_n(&hdr->value, &expected, mypid,
711             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
712             }
713              
714 8           static inline void sync_rwlock_wrunlock(SyncHandle *h) {
715 8           SyncHeader *hdr = h->hdr;
716 8           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
717 8 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
718 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
719 8           }
720              
721             /* Downgrade: atomically convert wrlock to rdlock (writer -> 1 reader).
722             * Also accounts the post-downgrade reader contribution on our slot so a
723             * subsequent SIGKILL leaves a recoverable subcount, not an orphan. */
724 1           static inline void sync_rwlock_downgrade(SyncHandle *h) {
725 1           SyncHeader *hdr = h->hdr;
726 1           sync_claim_reader_slot(h);
727 1 50         if (h->my_slot_idx != UINT32_MAX)
728 1           __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
729 1           __atomic_store_n(&hdr->value, 1, __ATOMIC_RELEASE);
730 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
731 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
732 1           }
733              
734             /* ================================================================
735             * Semaphore operations
736             *
737             * value = current count (0..param where param=max)
738             * CAS-based acquire/release, futex wait when 0
739             * ================================================================ */
740              
741 29           static inline int sync_sem_try_acquire(SyncHandle *h) {
742 29           SyncHeader *hdr = h->hdr;
743 0           for (;;) {
744 29           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
745 53 100         if (cur == 0) return 0;
746 24 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
747             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
748 24           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
749 24           return 1;
750             }
751             }
752             }
753              
754 10           static inline int sync_sem_try_acquire_n(SyncHandle *h, uint32_t n) {
755 10 100         if (n == 0) return 1;
756 9           SyncHeader *hdr = h->hdr;
757 0           for (;;) {
758 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
759 14 100         if (cur < n) return 0;
760 5 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
761             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
762 5           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
763 5           return 1;
764             }
765             }
766             }
767              
768 3           static inline int sync_sem_acquire_n(SyncHandle *h, uint32_t n, double timeout) {
769 3 50         if (n == 0) return 1;
770 3 100         if (sync_sem_try_acquire_n(h, n)) return 1;
771 1 50         if (timeout == 0) return 0;
772              
773 1           SyncHeader *hdr = h->hdr;
774             struct timespec deadline, remaining;
775 1           int has_deadline = (timeout > 0);
776 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
777              
778 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
779              
780 0           for (;;) {
781 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
782 1 50         if (cur >= n) {
783 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
784             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
785 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
786 1           return 1;
787             }
788 0           continue;
789             }
790              
791 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
792              
793 1           struct timespec *pts = NULL;
794 1 50         if (has_deadline) {
795 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
796 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
797 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
798 0           return 0;
799             }
800 1           pts = &remaining;
801             }
802              
803 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, cur, pts, NULL, 0);
804 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
805              
806 1 50         if (sync_sem_try_acquire_n(h, n)) return 1;
807              
808 1 50         if (has_deadline) {
809 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
810 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
811 1           return 0;
812             }
813             }
814             }
815             }
816              
817 4           static inline int sync_sem_acquire(SyncHandle *h, double timeout) {
818 4 100         if (sync_sem_try_acquire(h)) return 1;
819 2 100         if (timeout == 0) return 0;
820              
821 1           SyncHeader *hdr = h->hdr;
822             struct timespec deadline, remaining;
823 1           int has_deadline = (timeout > 0);
824 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
825              
826 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
827              
828 0           for (;;) {
829 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
830 1 50         if (cur > 0) {
831 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
832             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
833 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
834 1           return 1;
835             }
836 0           continue;
837             }
838              
839 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
840              
841 1           struct timespec *pts = NULL;
842 1 50         if (has_deadline) {
843 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
844 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
845 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
846 0           return 0;
847             }
848 1           pts = &remaining;
849             }
850              
851 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, 0, pts, NULL, 0);
852 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
853              
854             /* Retry acquire after wakeup */
855 1 50         if (sync_sem_try_acquire(h)) return 1;
856              
857 1 50         if (has_deadline) {
858 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
859 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
860 1           return 0;
861             }
862             }
863             }
864             }
865              
866 7           static inline void sync_sem_release(SyncHandle *h) {
867 7           SyncHeader *hdr = h->hdr;
868 7           uint32_t max_val = hdr->param;
869 0           for (;;) {
870 7           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
871 7           uint32_t next = cur + 1;
872 7 100         if (next > max_val) next = max_val; /* clamp at max */
873 7 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
874             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
875 7           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
876 7 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
877 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
878 7           return;
879             }
880             }
881             }
882              
883 10           static inline void sync_sem_release_n(SyncHandle *h, uint32_t n) {
884 10 100         if (n == 0) return;
885 9           SyncHeader *hdr = h->hdr;
886 9           uint32_t max_val = hdr->param;
887 0           for (;;) {
888 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
889 9 100         uint32_t next = (n > max_val - cur) ? max_val : cur + n;
890 9 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
891             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
892 9           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
893 9 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0) {
894 0           uint32_t wake = n < (uint32_t)INT_MAX ? n : INT_MAX;
895 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, wake, NULL, NULL, 0);
896             }
897 9           return;
898             }
899             }
900             }
901              
902             /* Drain: acquire all available permits at once, return count acquired */
903 4           static inline uint32_t sync_sem_drain(SyncHandle *h) {
904 4           SyncHeader *hdr = h->hdr;
905 4           uint32_t cur = __atomic_exchange_n(&hdr->value, 0, __ATOMIC_ACQUIRE);
906 4 100         if (cur == 0) return 0;
907 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
908 3           return cur;
909             }
910              
911 33           static inline uint32_t sync_sem_value(SyncHandle *h) {
912 33           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
913             }
914              
915             /* ================================================================
916             * Barrier operations
917             *
918             * param = number of parties
919             * value = arrived count (0..param)
920             * generation = bit 31: "broken" flag (set on timeout)
921             * bits 0..30: generation counter (bumped each trip/reset)
922             *
923             * Timeout breaks the barrier permanently; all waiters return -1 and all
924             * future wait() calls also return -1 until sync_barrier_reset() is called.
925             * This mirrors pthread_barrier "broken" semantics and avoids the race where
926             * a timed-out waiter's reset raced with new-generation arrivals.
927             * ================================================================ */
928              
/* Layout of the barrier's `generation` word:
 *   bit 31     -> "broken" flag
 *   bits 0..30 -> generation counter */
#define SYNC_BARRIER_BROKEN_BIT (1U << 31)
#define SYNC_BARRIER_GEN_MASK   (SYNC_BARRIER_BROKEN_BIT - 1U)
931              
932 9           static inline int sync_barrier_wait(SyncHandle *h, double timeout) {
933 9           SyncHeader *hdr = h->hdr;
934 9           uint32_t parties = hdr->param;
935              
936 9 100         if (timeout == 0) return -1; /* non-blocking probe: can't rendezvous instantly */
937              
938 8           uint32_t gen_raw = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
939 8 100         if (gen_raw & SYNC_BARRIER_BROKEN_BIT) return -1; /* already broken */
940              
941 7           uint32_t arrived = __atomic_add_fetch(&hdr->value, 1, __ATOMIC_ACQ_REL);
942              
943 7 50         if (arrived == parties) {
944             /* Last to arrive — trip the barrier. CAS preserves broken bit invariant. */
945 0           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
946 0           for (;;) {
947 0           uint32_t old_g = __atomic_load_n(&hdr->generation, __ATOMIC_RELAXED);
948 0 0         if (old_g & SYNC_BARRIER_BROKEN_BIT) {
949 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
950 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
951 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
952 0           return -1;
953             }
954 0           uint32_t new_g = (old_g + 1) & SYNC_BARRIER_GEN_MASK;
955 0 0         if (__atomic_compare_exchange_n(&hdr->generation, &old_g, new_g,
956             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
957 0           break;
958             }
959 0           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
960 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
961 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
962 0           return 1; /* leader */
963             }
964              
965             /* Not last — wait for generation to change or broken bit to appear */
966 7           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
967              
968             struct timespec deadline, remaining;
969 7           int has_deadline = (timeout > 0);
970 7 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
971              
972 7           for (;;) {
973 14           uint32_t cur_raw = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
974 14 50         if (cur_raw & SYNC_BARRIER_BROKEN_BIT) return -1; /* broken */
975 14 100         if (cur_raw != gen_raw) return 0; /* barrier tripped */
976              
977 9           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
978              
979 9           struct timespec *pts = NULL;
980 9 50         if (has_deadline) {
981 9 100         if (!sync_remaining_time(&deadline, &remaining)) {
982 2           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
983             /* Try to break the barrier. If CAS fails with BROKEN_BIT
984             * clear, only gen changed — our cohort tripped → return 0.
985             * If CAS fails with BROKEN_BIT set, current state is
986             * broken (whether by us, another waiter, or trip+re-break)
987             * → return -1, matching the non-timeout path. */
988 2           uint32_t g = gen_raw;
989 2           if (!__atomic_compare_exchange_n(&hdr->generation, &g,
990 2 50         gen_raw | SYNC_BARRIER_BROKEN_BIT,
991             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)
992 0 0         && !(g & SYNC_BARRIER_BROKEN_BIT)) {
993 0           return 0;
994             }
995 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
996 2           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
997 2           return -1;
998             }
999 7           pts = &remaining;
1000             }
1001              
1002 7           syscall(SYS_futex, &hdr->generation, FUTEX_WAIT, gen_raw, pts, NULL, 0);
1003 7           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1004             }
1005             }
1006              
1007 6           static inline uint32_t sync_barrier_generation(SyncHandle *h) {
1008 6           return __atomic_load_n(&h->hdr->generation, __ATOMIC_RELAXED) & SYNC_BARRIER_GEN_MASK;
1009             }
1010              
1011 4           static inline uint32_t sync_barrier_arrived(SyncHandle *h) {
1012 4           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
1013             }
1014              
1015 2           static inline int sync_barrier_is_broken(SyncHandle *h) {
1016 2           return (__atomic_load_n(&h->hdr->generation, __ATOMIC_RELAXED)
1017 2           & SYNC_BARRIER_BROKEN_BIT) != 0;
1018             }
1019              
1020 2           static inline void sync_barrier_reset(SyncHandle *h) {
1021 2           SyncHeader *hdr = h->hdr;
1022 2           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
1023             /* Bump gen and clear broken bit in one CAS */
1024 0           for (;;) {
1025 2           uint32_t old_g = __atomic_load_n(&hdr->generation, __ATOMIC_RELAXED);
1026 2           uint32_t new_g = ((old_g & SYNC_BARRIER_GEN_MASK) + 1) & SYNC_BARRIER_GEN_MASK;
1027 2 50         if (__atomic_compare_exchange_n(&hdr->generation, &old_g, new_g,
1028             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
1029 2           break;
1030             }
1031 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1032 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1033 2           }
1034              
1035             /* ================================================================
1036             * Condvar operations
1037             *
1038             * Uses the internal mutex (hdr->mutex) to protect the predicate.
1039             * value = signal counter (futex word)
1040             * generation = broadcast epoch
1041             * ================================================================ */
1042              
1043 12           static inline void sync_condvar_lock(SyncHandle *h) {
1044 12           sync_mutex_lock(h->hdr);
1045 12           __atomic_add_fetch(&h->hdr->stat_acquires, 1, __ATOMIC_RELAXED);
1046 12           }
1047              
1048 13           static inline void sync_condvar_unlock(SyncHandle *h) {
1049 13           sync_mutex_unlock(h->hdr);
1050 13           __atomic_add_fetch(&h->hdr->stat_releases, 1, __ATOMIC_RELAXED);
1051 13           }
1052              
1053 1           static inline int sync_condvar_try_lock(SyncHandle *h) {
1054 1           SyncHeader *hdr = h->hdr;
1055 1           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
1056 1           uint32_t expected = 0;
1057 1 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
1058             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
1059 1           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
1060 1           return 1;
1061             }
1062 0           return 0;
1063             }
1064              
1065             /* Wait: atomically unlock mutex, wait on futex, re-lock mutex.
1066             * Returns 1 on signal/broadcast, 0 on timeout. */
1067 4           static inline int sync_condvar_wait(SyncHandle *h, double timeout) {
1068 4           SyncHeader *hdr = h->hdr;
1069              
1070 4 100         if (timeout == 0) return 0; /* non-blocking: no wait */
1071              
1072 3           uint32_t seq = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
1073              
1074 3           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
1075              
1076 3           sync_mutex_unlock(hdr);
1077              
1078             struct timespec deadline, remaining;
1079 3           int has_deadline = (timeout > 0);
1080 3 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
1081              
1082 3           int signaled = 0;
1083 0           for (;;) {
1084 3           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
1085 3 50         if (cur != seq) { signaled = 1; break; }
1086              
1087 3           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1088              
1089 3           struct timespec *pts = NULL;
1090 3 50         if (has_deadline) {
1091 3 50         if (!sync_remaining_time(&deadline, &remaining)) {
1092 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1093 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
1094 0           break;
1095             }
1096 3           pts = &remaining;
1097             }
1098              
1099 3           long rc = syscall(SYS_futex, &hdr->value, FUTEX_WAIT, seq, pts, NULL, 0);
1100 3           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1101              
1102 3           cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
1103 3 100         if (cur != seq) { signaled = 1; break; }
1104              
1105 2 50         if (rc == -1 && errno == ETIMEDOUT && has_deadline) {
    50          
    50          
1106 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
1107 2           break;
1108             }
1109             }
1110              
1111 3           sync_mutex_lock(hdr);
1112 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
1113 3           return signaled;
1114             }
1115              
1116 1           static inline void sync_condvar_signal(SyncHandle *h) {
1117 1           SyncHeader *hdr = h->hdr;
1118 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
1119 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
1120 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1121 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
1122 1           }
1123              
1124 1           static inline void sync_condvar_broadcast(SyncHandle *h) {
1125 1           SyncHeader *hdr = h->hdr;
1126 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
1127 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
1128 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1129 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1130 1           }
1131              
1132             /* ================================================================
1133             * Once operations
1134             *
1135             * value states: 0=INIT, (SYNC_MUTEX_WRITER_BIT|pid)=RUNNING, 1=DONE
1136             * ================================================================ */
1137              
/* Values of hdr->value for a Once gate.
 * The third state, RUNNING, has no macro of its own: it is encoded as
 * SYNC_MUTEX_WRITER_BIT | pid of the process doing the initialization. */
#define SYNC_ONCE_INIT (0)   /* nobody has started initializing */
#define SYNC_ONCE_DONE (1)   /* initialization finished */
1141              
1142 8           static inline int sync_once_is_done(SyncHandle *h) {
1143 8           return __atomic_load_n(&h->hdr->value, __ATOMIC_ACQUIRE) == SYNC_ONCE_DONE;
1144             }
1145              
1146             /* Try to become the initializer. Returns:
1147             * 1 = you are the initializer, call once_done() when finished
1148             * 0 = already done
1149             * -1 = another process is initializing (wait with once_wait) */
1150 8           static inline int sync_once_try(SyncHandle *h) {
1151 8           SyncHeader *hdr = h->hdr;
1152 8           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
1153              
1154 8           uint32_t expected = SYNC_ONCE_INIT;
1155 8 100         if (__atomic_compare_exchange_n(&hdr->value, &expected, mypid,
1156             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
1157 4           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
1158 4           return 1;
1159             }
1160 4 100         if (expected == SYNC_ONCE_DONE) return 0;
1161 3           return -1;
1162             }
1163              
1164             /* Call/wait combo: try to become initializer, or wait for completion.
1165             * Returns 1 if caller is the initializer, 0 if already done or waited. */
1166 6           static inline int sync_once_enter(SyncHandle *h, double timeout) {
1167 6           SyncHeader *hdr = h->hdr;
1168              
1169             /* Non-blocking probe: just try, don't wait */
1170 6           int r = sync_once_try(h);
1171 6 100         if (r == 1) return 1;
1172 3 100         if (r == 0) return 0;
1173 2 100         if (timeout == 0) return 0;
1174              
1175             struct timespec deadline, remaining;
1176 1           int has_deadline = (timeout > 0);
1177 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
1178              
1179 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
1180              
1181 1           for (;;) {
1182 2           r = sync_once_try(h);
1183 2 100         if (r == 1) return 1; /* caller is initializer */
1184 1 50         if (r == 0) return 0; /* already done */
1185              
1186             /* r == -1: someone else is running. Wait or detect stale. */
1187 1           uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
1188 1 50         if (val == SYNC_ONCE_DONE) return 0;
1189 2 50         if (val == SYNC_ONCE_INIT) continue; /* race: was reset, retry */
1190              
1191             /* Check stale initializer */
1192 1 50         if (val >= SYNC_MUTEX_WRITER_BIT) {
1193 1           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
1194 1 50         if (!sync_pid_alive(pid)) {
1195 1 50         if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
1196             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
1197 1           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
1198 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1199 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1200             }
1201 1           continue;
1202             }
1203             }
1204              
1205 0           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1206              
1207             /* Always cap at SYNC_LOCK_TIMEOUT_SEC so stale-initializer recovery
1208             * runs periodically even when the caller specifies infinite timeout. */
1209 0           struct timespec *pts = (struct timespec *)&sync_lock_timeout;
1210 0 0         if (has_deadline) {
1211 0 0         if (!sync_remaining_time(&deadline, &remaining)) {
1212 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1213 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
1214 0           return 0;
1215             }
1216 0 0         if (remaining.tv_sec < SYNC_LOCK_TIMEOUT_SEC)
1217 0           pts = &remaining;
1218             }
1219              
1220 0           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, val, pts, NULL, 0);
1221 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
1222             }
1223             }
1224              
1225 4           static inline void sync_once_done(SyncHandle *h) {
1226 4           SyncHeader *hdr = h->hdr;
1227 4           __atomic_store_n(&hdr->value, SYNC_ONCE_DONE, __ATOMIC_RELEASE);
1228 4           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
1229 4 100         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1230 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1231 4           }
1232              
1233 2           static inline void sync_once_reset(SyncHandle *h) {
1234 2           SyncHeader *hdr = h->hdr;
1235 2           __atomic_store_n(&hdr->value, SYNC_ONCE_INIT, __ATOMIC_RELEASE);
1236 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1237 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1238 2           }
1239              
1240             /* ================================================================
1241             * Create / Open / Close
1242             *
1243             * Layout:
1244             * [0..127] : SyncHeader
1245             * [128..128+SLOTS_SIZE-1] : SyncReaderSlot[SYNC_READER_SLOTS] (RWLock only)
1246             *
1247             * Non-RWLock primitives keep total_size = sizeof(SyncHeader) (Option A:
1248             * pay-for-what-you-use, ~16KB only when needed).
1249             * ================================================================ */
1250              
/*
 * Format an error message into the caller-supplied error buffer.
 *
 * Expands in place and expects a `char *errbuf` variable to be in scope at
 * the expansion site; does nothing when that pointer is NULL. Output is
 * bounded by SYNC_ERR_BUFLEN.
 */
#define SYNC_ERR(fmt, ...)                                                 \
    do {                                                                   \
        if (errbuf)                                                        \
            snprintf(errbuf, SYNC_ERR_BUFLEN, fmt, ##__VA_ARGS__);         \
    } while(0)
1252              
1253 75           static inline uint64_t sync_layout_total_size(uint32_t type) {
1254 75           uint64_t sz = sizeof(SyncHeader);
1255 75 100         if (type == SYNC_TYPE_RWLOCK)
1256 15           sz += (uint64_t)SYNC_READER_SLOTS * sizeof(SyncReaderSlot);
1257 75           return sz;
1258             }
1259              
1260 75           static inline uint64_t sync_layout_slots_off(uint32_t type) {
1261 75 100         return (type == SYNC_TYPE_RWLOCK) ? sizeof(SyncHeader) : 0;
1262             }
1263              
1264 74           static SyncHandle *sync_create(const char *path, uint32_t type, uint32_t param,
1265             uint32_t initial, char *errbuf) {
1266 74 50         if (errbuf) errbuf[0] = '\0';
1267              
1268 74 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
1269 74 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    100          
    50          
1270 73 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    100          
    50          
1271 71 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    100          
    50          
1272              
1273 69           uint64_t total_size = sync_layout_total_size(type);
1274 69           uint64_t slots_off = sync_layout_slots_off(type);
1275 69           int anonymous = (path == NULL);
1276             size_t map_size;
1277             void *base;
1278              
1279 69 100         if (anonymous) {
1280 53           map_size = (size_t)total_size;
1281 53           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
1282             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
1283 53 50         if (base == MAP_FAILED) {
1284 0 0         SYNC_ERR("mmap(anonymous): %s", strerror(errno));
1285 0           return NULL;
1286             }
1287 53           SyncHeader *hdr = (SyncHeader *)base;
1288 53           memset(hdr, 0, sizeof(SyncHeader));
1289 53           hdr->magic = SYNC_MAGIC;
1290 53           hdr->version = SYNC_VERSION;
1291 53           hdr->type = type;
1292 53           hdr->param = param;
1293 53           hdr->total_size = total_size;
1294 53           hdr->reader_slots_off = slots_off;
1295 53 100         if (type == SYNC_TYPE_SEMAPHORE)
1296 17           hdr->value = initial;
1297             /* MAP_ANONYMOUS already zero-fills reader_slots region. */
1298 53           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1299 53           goto setup_handle;
1300             } else {
1301 16           int fd = open(path, O_RDWR | O_CREAT, 0666);
1302 19 50         if (fd < 0) { SYNC_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
1303              
1304 16 50         if (flock(fd, LOCK_EX) < 0) {
1305 0 0         SYNC_ERR("flock(%s): %s", path, strerror(errno));
1306 0           close(fd); return NULL;
1307             }
1308              
1309             struct stat st;
1310 16 50         if (fstat(fd, &st) < 0) {
1311 0 0         SYNC_ERR("fstat(%s): %s", path, strerror(errno));
1312 0           flock(fd, LOCK_UN); close(fd); return NULL;
1313             }
1314              
1315 16           int is_new = (st.st_size == 0);
1316              
1317 16 100         if (!is_new && (uint64_t)st.st_size < sizeof(SyncHeader)) {
    50          
1318 0 0         SYNC_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
1319 0           flock(fd, LOCK_UN); close(fd); return NULL;
1320             }
1321              
1322 16 100         if (is_new) {
1323 7 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1324 0 0         SYNC_ERR("ftruncate(%s): %s", path, strerror(errno));
1325 0           flock(fd, LOCK_UN); close(fd); return NULL;
1326             }
1327             }
1328              
1329 16 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
1330 16           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1331 16 50         if (base == MAP_FAILED) {
1332 0 0         SYNC_ERR("mmap(%s): %s", path, strerror(errno));
1333 0           flock(fd, LOCK_UN); close(fd); return NULL;
1334             }
1335              
1336 16           SyncHeader *hdr = (SyncHeader *)base;
1337              
1338 16 100         if (!is_new) {
1339 27           int valid = (hdr->magic == SYNC_MAGIC &&
1340 9 50         hdr->version == SYNC_VERSION &&
1341 24 50         hdr->type == type &&
    100          
1342 6 50         hdr->total_size == (uint64_t)st.st_size);
1343 9 100         if (valid && type == SYNC_TYPE_RWLOCK) {
    100          
1344             /* reader_slots_off must point to a valid region inside the file. */
1345 1           uint64_t need = sizeof(SyncHeader) +
1346             (uint64_t)SYNC_READER_SLOTS * sizeof(SyncReaderSlot);
1347 1 50         if (hdr->reader_slots_off != sizeof(SyncHeader) ||
1348 1 50         (uint64_t)st.st_size < need)
1349 0           valid = 0;
1350             }
1351 9 100         if (!valid) {
1352 3 50         SYNC_ERR("%s: invalid or incompatible sync file", path);
1353 3           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1354             }
1355 6           flock(fd, LOCK_UN);
1356 6           close(fd);
1357 6           goto setup_handle;
1358             }
1359              
1360             /* Initialize while holding the flock */
1361 7           memset(base, 0, (size_t)total_size); /* zero header + reader_slots region */
1362 7           hdr->magic = SYNC_MAGIC;
1363 7           hdr->version = SYNC_VERSION;
1364 7           hdr->type = type;
1365 7           hdr->param = param;
1366 7           hdr->total_size = total_size;
1367 7           hdr->reader_slots_off = slots_off;
1368 7 100         if (type == SYNC_TYPE_SEMAPHORE)
1369 3           hdr->value = initial;
1370 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1371              
1372 7           flock(fd, LOCK_UN);
1373 7           close(fd);
1374             } /* end file-backed */
1375              
1376 66           setup_handle:;
1377             {
1378 66           SyncHeader *hdr = (SyncHeader *)base;
1379 66           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1380 66 50         if (!h) { munmap(base, map_size); return NULL; }
1381              
1382 66           h->hdr = hdr;
1383 66           h->mmap_size = map_size;
1384 66 100         h->path = path ? strdup(path) : NULL;
1385 66           h->notify_fd = -1;
1386 66           h->backing_fd = -1;
1387 132           h->reader_slots = (hdr->reader_slots_off > 0)
1388 13           ? (SyncReaderSlot *)((char *)base + hdr->reader_slots_off)
1389 66 100         : NULL;
1390 66           h->my_slot_idx = UINT32_MAX;
1391              
1392 66           return h;
1393             }
1394             }
1395              
1396 6           static SyncHandle *sync_create_memfd(const char *name, uint32_t type,
1397             uint32_t param, uint32_t initial,
1398             char *errbuf) {
1399 6 50         if (errbuf) errbuf[0] = '\0';
1400              
1401 6 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
1402 6 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    50          
    0          
1403 6 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    50          
    0          
1404 6 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    50          
    0          
1405              
1406 6           uint64_t total_size = sync_layout_total_size(type);
1407 6           uint64_t slots_off = sync_layout_slots_off(type);
1408              
1409 6 50         int fd = memfd_create(name ? name : "sync", MFD_CLOEXEC | MFD_ALLOW_SEALING);
1410 6 50         if (fd < 0) { SYNC_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1411              
1412 6 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1413 0 0         SYNC_ERR("ftruncate(memfd): %s", strerror(errno));
1414 0           close(fd); return NULL;
1415             }
1416 6           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
1417              
1418 6           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1419 6 50         if (base == MAP_FAILED) {
1420 0 0         SYNC_ERR("mmap(memfd): %s", strerror(errno));
1421 0           close(fd); return NULL;
1422             }
1423              
1424 6           SyncHeader *hdr = (SyncHeader *)base;
1425 6           memset(hdr, 0, (size_t)total_size); /* zero header + reader_slots region */
1426 6           hdr->magic = SYNC_MAGIC;
1427 6           hdr->version = SYNC_VERSION;
1428 6           hdr->type = type;
1429 6           hdr->param = param;
1430 6           hdr->total_size = total_size;
1431 6           hdr->reader_slots_off = slots_off;
1432              
1433 6 100         if (type == SYNC_TYPE_SEMAPHORE)
1434 2           hdr->value = initial;
1435              
1436 6           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1437              
1438 6           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1439 6 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
1440              
1441 6           h->hdr = hdr;
1442 6           h->mmap_size = (size_t)total_size;
1443 6           h->path = NULL;
1444 6           h->notify_fd = -1;
1445 6           h->backing_fd = fd;
1446 6           h->reader_slots = (slots_off > 0)
1447             ? (SyncReaderSlot *)((char *)base + slots_off)
1448 6 100         : NULL;
1449 6           h->my_slot_idx = UINT32_MAX;
1450              
1451 6           return h;
1452             }
1453              
1454 5           static SyncHandle *sync_open_fd(int fd, uint32_t type, char *errbuf) {
1455 5 50         if (errbuf) errbuf[0] = '\0';
1456              
1457             struct stat st;
1458 5 100         if (fstat(fd, &st) < 0) {
1459 1 50         SYNC_ERR("fstat(fd=%d): %s", fd, strerror(errno));
1460 1           return NULL;
1461             }
1462              
1463 4 50         if ((uint64_t)st.st_size < sizeof(SyncHeader)) {
1464 0 0         SYNC_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
1465 0           return NULL;
1466             }
1467              
1468 4           size_t map_size = (size_t)st.st_size;
1469 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1470 4 50         if (base == MAP_FAILED) {
1471 0 0         SYNC_ERR("mmap(fd=%d): %s", fd, strerror(errno));
1472 0           return NULL;
1473             }
1474              
1475 4           SyncHeader *hdr = (SyncHeader *)base;
1476 12           int valid = (hdr->magic == SYNC_MAGIC &&
1477 4 50         hdr->version == SYNC_VERSION &&
1478 11 50         hdr->type == type &&
    100          
1479 3 50         hdr->total_size == (uint64_t)st.st_size);
1480 4 100         if (valid && type == SYNC_TYPE_RWLOCK) {
    100          
1481 1           uint64_t need = sizeof(SyncHeader) +
1482             (uint64_t)SYNC_READER_SLOTS * sizeof(SyncReaderSlot);
1483 1 50         if (hdr->reader_slots_off != sizeof(SyncHeader) ||
1484 1 50         (uint64_t)st.st_size < need)
1485 0           valid = 0;
1486             }
1487 4 100         if (!valid) {
1488 1 50         SYNC_ERR("fd %d: invalid or incompatible sync", fd);
1489 1           munmap(base, map_size);
1490 1           return NULL;
1491             }
1492              
1493 3           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
1494 3 50         if (myfd < 0) {
1495 0 0         SYNC_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
1496 0           munmap(base, map_size);
1497 0           return NULL;
1498             }
1499              
1500 3           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1501 3 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
1502              
1503 3           h->hdr = hdr;
1504 3           h->mmap_size = map_size;
1505 3           h->path = NULL;
1506 3           h->notify_fd = -1;
1507 3           h->backing_fd = myfd;
1508 6           h->reader_slots = (hdr->reader_slots_off > 0)
1509 1           ? (SyncReaderSlot *)((char *)base + hdr->reader_slots_off)
1510 3 100         : NULL;
1511 3           h->my_slot_idx = UINT32_MAX;
1512              
1513 3           return h;
1514             }
1515              
1516 75           static void sync_destroy(SyncHandle *h) {
1517 75 50         if (!h) return;
1518             /* Release reader slot — only if we still own it AND no fork has happened
1519             * since we claimed it. A forked child that inherits the handle but never
1520             * acquired the lock itself must NOT clear the parent's slot. */
1521 75 100         if (h->reader_slots && h->my_slot_idx != UINT32_MAX && h->cached_pid &&
    100          
    50          
1522 8 50         h->cached_fork_gen == __atomic_load_n(&sync_fork_gen, __ATOMIC_RELAXED)) {
1523 8           uint32_t expected = h->cached_pid;
1524             /* CAS pid -> 0; do NOT clear subcount/wp/writp — between the CAS and
1525             * a follow-up store, a new process could claim the slot, and our
1526             * store would clobber its state. sync_claim_reader_slot zeros all
1527             * mirror fields on every claim, so leaving stale values is safe. */
1528 8           __atomic_compare_exchange_n(&h->reader_slots[h->my_slot_idx].pid,
1529             &expected, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
1530             }
1531 75 100         if (h->notify_fd >= 0) close(h->notify_fd);
1532 75 100         if (h->backing_fd >= 0) close(h->backing_fd);
1533 75 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
1534 75           free(h->path);
1535 75           free(h);
1536             }
1537              
1538             /* ================================================================
1539             * Eventfd integration
1540             * ================================================================ */
1541              
1542 11           static int sync_create_eventfd(SyncHandle *h) {
1543 11 50         if (h->notify_fd >= 0) return h->notify_fd;
1544 11           int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1545 11 50         if (efd < 0) return -1;
1546 11           h->notify_fd = efd;
1547 11           return efd;
1548             }
1549              
1550 10           static int sync_notify(SyncHandle *h) {
1551 10 50         if (h->notify_fd < 0) return 0;
1552 10           uint64_t val = 1;
1553 10           return write(h->notify_fd, &val, sizeof(val)) == sizeof(val);
1554             }
1555              
1556 11           static int64_t sync_eventfd_consume(SyncHandle *h) {
1557 11 50         if (h->notify_fd < 0) return -1;
1558 11           uint64_t val = 0;
1559 11 100         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1560 6           return (int64_t)val;
1561             }
1562              
1563             /* ================================================================
1564             * Misc
1565             * ================================================================ */
1566              
1567 0           static int sync_msync(SyncHandle *h) {
1568 0           return msync(h->hdr, h->mmap_size, MS_SYNC);
1569             }
1570              
1571             #endif /* SYNC_H */