File Coverage

sync.h
Criterion    Covered   Total      %
statement        530     706   75.0
branch           251     494   50.8
condition        n/a
subroutine       n/a
pod              n/a
total            781    1200   65.0


line stmt bran cond sub pod time code
1             /*
2             * sync.h -- Shared-memory synchronization primitives for Linux
3             *
4             * Five primitives:
5             * Semaphore — bounded counter (CAS-based, cross-process resource limiting)
6             * Barrier — N processes rendezvous at a point before proceeding
7             * RWLock — reader-writer lock for external resources
8             * Condvar — condition variable with futex wait/signal/broadcast
9             * Once — one-time initialization gate (like pthread_once)
10             *
11             * All use file-backed mmap(MAP_SHARED) for cross-process sharing,
12             * futex for blocking wait, and PID-based stale lock recovery.
13             */
14              
15             #ifndef SYNC_H
16             #define SYNC_H
17              
18             #include <errno.h>
19             #include <fcntl.h>
20             #include <limits.h>
21             #include <signal.h>
22             #include <stdint.h>
23             #include <stdio.h>
24             #include <stdlib.h>
25             #include <string.h>
26             #include <time.h>
27             #include <unistd.h>
28             #include <linux/futex.h>
29             #include <sys/eventfd.h>
30             #include <sys/file.h>
31             #include <sys/mman.h>
32             #include <sys/stat.h>
33             #include <sys/syscall.h>
34              
35             /* ================================================================
36             * Constants
37             * ================================================================ */
38              
39             #define SYNC_MAGIC 0x53594E31U /* "SYN1" */
40             #define SYNC_VERSION 1
41              
42             /* Primitive type IDs */
43             #define SYNC_TYPE_SEMAPHORE 0
44             #define SYNC_TYPE_BARRIER 1
45             #define SYNC_TYPE_RWLOCK 2
46             #define SYNC_TYPE_CONDVAR 3
47             #define SYNC_TYPE_ONCE 4
48              
49             #define SYNC_ERR_BUFLEN 256
50             #define SYNC_SPIN_LIMIT 32
51             #define SYNC_LOCK_TIMEOUT_SEC 2
52              
53             /* ================================================================
54             * Header (128 bytes = 2 cache lines, lives at start of mmap)
55             * ================================================================ */
56              
57             typedef struct {
58             /* ---- Cache line 0 (0-63): immutable after create ---- */
59             uint32_t magic; /* 0 */
60             uint32_t version; /* 4 */
61             uint32_t type; /* 8: SYNC_TYPE_* */
62             uint32_t param; /* 12: type-specific (sem max, barrier count, etc.) */
63             uint64_t total_size; /* 16: mmap size */
64             uint8_t _pad0[40]; /* 24-63 */
65              
66             /* ---- Cache line 1 (64-127): mutable state ---- */
67              
68             /* Semaphore: value = current count, waiters = blocked acquirers */
69             /* Barrier: value = arrived count, waiters = blocked at barrier,
70             generation = increments each time barrier trips */
71             /* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
72             waiters = blocked lockers */
73             /* Condvar: value = signal counter (futex word), waiters = blocked waiters,
74             mutex = associated mutex for predicate protection */
75             /* Once: value = state (0=INIT, 0x80000000|pid=RUNNING, 1=DONE),
76             waiters = blocked on completion */
77              
78             uint32_t value; /* 64: primary state word (futex target) */
79             uint32_t waiters; /* 68: waiter count */
80             uint32_t generation; /* 72: barrier generation / condvar epoch */
81             uint32_t mutex; /* 76: condvar mutex (0 or PID|0x80000000) */
82             uint32_t mutex_waiters; /* 80: condvar mutex waiter count */
83             uint32_t stat_recoveries;/* 84 */
84             uint64_t stat_acquires; /* 88 */
85             uint64_t stat_releases; /* 96 */
86             uint64_t stat_waits; /* 104 */
87             uint64_t stat_timeouts; /* 112 */
88             uint32_t stat_signals; /* 120 */
89             uint32_t rwlock_writers_waiting; /* 124: RWLock write-preferring yield signal
90             (writers only, not readers) */
91             } SyncHeader;
92              
93             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
94             _Static_assert(sizeof(SyncHeader) == 128, "SyncHeader must be 128 bytes");
95             #endif
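              /* The field offsets annotated above can be pinned down the same
               * way; a minimal sketch (these extra asserts are illustrative,
               * not part of the measured source):
               *
               *   #include <stddef.h>
               *   _Static_assert(offsetof(SyncHeader, value) == 64,
               *                  "mutable state starts on cache line 1");
               *   _Static_assert(offsetof(SyncHeader, stat_acquires) == 88,
               *                  "u64 counters stay 8-byte aligned");
               */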
96              
97             /* ================================================================
98             * Process-local handle
99             * ================================================================ */
100              
101             typedef struct {
102             SyncHeader *hdr;
103             size_t mmap_size;
104             char *path;
105             int notify_fd; /* eventfd, -1 if disabled */
106             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
107             } SyncHandle;
108              
109             /* ================================================================
110             * Utility
111             * ================================================================ */
112              
113 126           static inline void sync_spin_pause(void) {
114             #if defined(__x86_64__) || defined(__i386__)
115 126           __asm__ volatile("pause" ::: "memory");
116             #elif defined(__aarch64__)
117             __asm__ volatile("yield" ::: "memory");
118             #else
119             __asm__ volatile("" ::: "memory");
120             #endif
121 126           }
122              
123 3           static inline int sync_pid_alive(uint32_t pid) {
124 3 50         if (pid == 0) return 1; /* 0 = no recorded owner; treat as live */
125 3 100         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    50          
126             }
127              
128             /* Convert timeout in seconds (double) to absolute deadline */
129 17           static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
130 17           clock_gettime(CLOCK_MONOTONIC, deadline);
131 17           deadline->tv_sec += (time_t)timeout;
132 17           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
133 17 100         if (deadline->tv_nsec >= 1000000000L) {
134 1           deadline->tv_sec++;
135 1           deadline->tv_nsec -= 1000000000L;
136             }
137 17           }
138              
139             /* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
140 20           static inline int sync_remaining_time(const struct timespec *deadline,
141             struct timespec *remaining) {
142             struct timespec now;
143 20           clock_gettime(CLOCK_MONOTONIC, &now);
144 20           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
145 20           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
146 20 100         if (remaining->tv_nsec < 0) {
147 12           remaining->tv_sec--;
148 12           remaining->tv_nsec += 1000000000L;
149             }
150 20           return remaining->tv_sec >= 0;
151             }
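              /* Sketch of the timeout convention used throughout this file
               * (illustrative): convert once to an absolute deadline, then
               * recompute the relative wait before each FUTEX_WAIT.
               *
               *   struct timespec deadline, remaining;
               *   sync_make_deadline(1.5, &deadline);   // 1.5 s from now
               *   while (sync_remaining_time(&deadline, &remaining)) {
               *       // pass &remaining as the relative FUTEX_WAIT timeout;
               *       // a zero return means the deadline already passed
               *   }
               */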
152              
153             /* ================================================================
154             * Mutex helpers (for Condvar's internal mutex)
155             * ================================================================ */
156              
157             #define SYNC_MUTEX_WRITER_BIT 0x80000000U
158             #define SYNC_MUTEX_PID_MASK 0x7FFFFFFFU
159             #define SYNC_MUTEX_VAL(pid) (SYNC_MUTEX_WRITER_BIT | ((uint32_t)(pid) & SYNC_MUTEX_PID_MASK))
160              
161             static const struct timespec sync_lock_timeout = { SYNC_LOCK_TIMEOUT_SEC, 0 };
162              
163 0           static inline void sync_recover_stale_mutex(SyncHeader *hdr, uint32_t observed) {
164 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
165             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
166 0           return;
167 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
168 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
169 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
170             }
171              
172 15           static inline void sync_mutex_lock(SyncHeader *hdr) {
173 15           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
174 15           for (int spin = 0; ; spin++) {
175 15           uint32_t expected = 0;
176 15 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
177             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
178 15           return;
179 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
180 0           sync_spin_pause();
181 0           continue;
182             }
183 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
184 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
185 0 0         if (cur != 0) {
186 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
187             &sync_lock_timeout, NULL, 0);
188 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
189 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
190 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
191 0 0         if (val >= SYNC_MUTEX_WRITER_BIT) {
192 0           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
193 0 0         if (!sync_pid_alive(pid))
194 0           sync_recover_stale_mutex(hdr, val);
195             }
196 0           spin = 0;
197 0           continue;
198             }
199             }
200 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
201 0           spin = 0;
202             }
203             }
204              
205 16           static inline void sync_mutex_unlock(SyncHeader *hdr) {
206 16           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
207 16 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
208 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
209 16           }
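              /* The pair above behaves like a plain mutex; a minimal sketch:
               *
               *   sync_mutex_lock(hdr);
               *   // ... read/update the shared predicate ...
               *   sync_mutex_unlock(hdr);
               *
               * Note the store-then-check in unlock leaves a window in which a
               * waiter can register after the check and miss the wake; the
               * bounded SYNC_LOCK_TIMEOUT_SEC futex wait in sync_mutex_lock
               * turns that into a short stall rather than a hang. */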
210              
211             /* ================================================================
212             * RWLock helpers (for SYNC_TYPE_RWLOCK)
213             *
214             * value == 0: unlocked
215             * value 1..0x7FFFFFFF: N active readers
216             * value 0x80000000 | pid: write-locked by pid
217             * ================================================================ */
218              
219             #define SYNC_RWLOCK_WRITER_BIT 0x80000000U
220             #define SYNC_RWLOCK_PID_MASK 0x7FFFFFFFU
221             #define SYNC_RWLOCK_WR(pid) (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
222              
223             static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
224             static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);
225              
226 0           static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
227 0 0         if (!__atomic_compare_exchange_n(&hdr->value, &observed, 0,
228             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
229 0           return;
230 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
231 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
232 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
233             }
234              
235 7           static inline void sync_rwlock_rdlock(SyncHeader *hdr) {
236 7           uint32_t *lock = &hdr->value;
237 7           uint32_t *w = &hdr->waiters;
238 7           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
239 7           for (int spin = 0; ; spin++) {
240 7           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
241             /* Write-preferring: yield to parked writers when lock is free. */
242 7 100         if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
    50          
243 1 50         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
244             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
245 7           return;
246 6 50         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
247 6 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
248             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
249 6           return;
250             }
251 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
252 0           sync_spin_pause();
253 0           continue;
254             }
255 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
256 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
257             /* Sleep when write-locked OR yielding to parked writers (cur==0) */
258 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
    0          
259 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
260             &sync_lock_timeout, NULL, 0);
261 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
262 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
263 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
264 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
265 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
266 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
267 0 0         if (!sync_pid_alive(pid))
268 0           sync_recover_stale_rwlock(hdr, val);
269             }
270             } else {
271             /* Yielding to writers timed out — optimistically drop one
272             * writers_waiting to recover from potentially-crashed
273             * parked writer. A live writer just re-increments. */
274 0           uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
275 0 0         while (wc > 0 && !__atomic_compare_exchange_n(
276 0 0         writers_waiting, &wc, wc - 1,
277             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
278             }
279 0           spin = 0;
280 0           continue;
281             }
282             }
283 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
284 0           spin = 0;
285             }
286             }
287              
288             /* Timed rdlock: returns 1 on success, 0 on timeout. timeout<0 = infinite.
289             * No try-lock fast-path: would bypass write-preference when cur==0 &&
290             * writers_waiting > 0. Main loop's first iteration handles the uncontended
291             * case at ~same cost. */
292 3           static inline int sync_rwlock_rdlock_timed(SyncHeader *hdr, double timeout) {
293 3 50         if (timeout == 0) {
294 0           return sync_rwlock_try_rdlock(hdr);
295             }
296              
297 3           uint32_t *lock = &hdr->value;
298 3           uint32_t *w = &hdr->waiters;
299             struct timespec deadline, remaining;
300 3           int has_deadline = (timeout > 0);
301 3 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
302              
303 3           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
304 67           for (int spin = 0; ; spin++) {
305 67           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
306 67 100         if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
    50          
307 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
308             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
309 3           return 1;
310 67 100         } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
    50          
311 2 50         if (__atomic_compare_exchange_n(lock, &cur, 1,
312             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
313 2           return 1;
314             }
315 65 100         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
316 63           sync_spin_pause();
317 64           continue;
318             }
319 2           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
320 2           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
321 2 50         if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
    0          
322 2           struct timespec *pts = NULL;
323             /* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
324             * runs periodically even with a user-supplied deadline. */
325 2 50         if (has_deadline) {
326 2 100         if (!sync_remaining_time(&deadline, &remaining)) {
327 1           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
328 1           return 0;
329             }
330 1 50         if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC)
331 0           pts = (struct timespec *)&sync_lock_timeout;
332             else
333 1           pts = &remaining;
334             } else {
335 0           pts = (struct timespec *)&sync_lock_timeout;
336             }
337 1           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
338 1 50         if (rc == -1 && errno == ETIMEDOUT) {
    50          
339 1           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
340 1 50         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
341 1           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
342 1 50         if (val >= SYNC_RWLOCK_WRITER_BIT) {
343 1           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
344 1 50         if (!sync_pid_alive(pid))
345 0           sync_recover_stale_rwlock(hdr, val);
346             }
347             } else {
348             /* Yielding to writer timed out — drop one writers_waiting
349             * to recover from a potentially-crashed parked writer. */
350 0           uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
351 0 0         while (wc > 0 && !__atomic_compare_exchange_n(
352 0 0         writers_waiting, &wc, wc - 1,
353             1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
354             }
355 1           spin = 0;
356 1           continue;
357             }
358             }
359 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
360 0           spin = 0;
361             }
362             }
363              
364 3           static inline int sync_rwlock_try_rdlock(SyncHeader *hdr) {
365 3           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
366 3 100         if (cur >= SYNC_RWLOCK_WRITER_BIT) return 0;
367 2           return __atomic_compare_exchange_n(&hdr->value, &cur, cur + 1,
368             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
369             }
370              
371 12           static inline void sync_rwlock_rdunlock(SyncHeader *hdr) {
372 12           uint32_t remaining = __atomic_sub_fetch(&hdr->value, 1, __ATOMIC_RELEASE); /* sub_fetch returns the new count */
373 12 100         if (remaining == 0 && __atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
    50          
374 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
375 12           }
376              
377 5           static inline void sync_rwlock_wrlock(SyncHeader *hdr) {
378 5           uint32_t *lock = &hdr->value;
379 5           uint32_t *w = &hdr->waiters;
380 5           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
381 5           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
382 5           for (int spin = 0; ; spin++) {
383 5           uint32_t expected = 0;
384 5 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
385             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
386 5           return;
387 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
388 0           sync_spin_pause();
389 0           continue;
390             }
391 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
392 0           __atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
393 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
394 0 0         if (cur != 0) {
395 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
396             &sync_lock_timeout, NULL, 0);
397 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
398 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
399 0           __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
400 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
401 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
402 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
403 0 0         if (!sync_pid_alive(pid))
404 0           sync_recover_stale_rwlock(hdr, val);
405             }
406 0           spin = 0;
407 0           continue;
408             }
409             }
410 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
411 0           __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
412 0           spin = 0;
413             }
414             }
415              
416             /* Timed wrlock: returns 1 on success, 0 on timeout. timeout<0 = infinite. */
417 4           static inline int sync_rwlock_wrlock_timed(SyncHeader *hdr, double timeout) {
418 4 100         if (sync_rwlock_try_wrlock(hdr)) return 1;
419 1 50         if (timeout == 0) return 0;
420              
421 1           uint32_t *lock = &hdr->value;
422 1           uint32_t *w = &hdr->waiters;
423 1           uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
424 1           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
425             struct timespec deadline, remaining;
426 1           int has_deadline = (timeout > 0);
427 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
428              
429 65           for (int spin = 0; ; spin++) {
430 65           uint32_t expected = 0;
431 65 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
432             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
433 1           return 1;
434 65 100         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
435 63           sync_spin_pause();
436 64           continue;
437             }
438 2           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
439 2           __atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
440 2           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
441 2 50         if (cur != 0) {
442 2           struct timespec *pts = NULL;
443             /* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
444             * runs periodically even with a user-supplied deadline. */
445 2 50         if (has_deadline) {
446 2 100         if (!sync_remaining_time(&deadline, &remaining)) {
447 1           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
448 1           __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
449 1           return 0;
450             }
451 1 50         if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC)
452 0           pts = (struct timespec *)&sync_lock_timeout;
453             else
454 1           pts = &remaining;
455             } else {
456 0           pts = (struct timespec *)&sync_lock_timeout;
457             }
458 1           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
459 1 50         if (rc == -1 && errno == ETIMEDOUT) {
    50          
460 1           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
461 1           __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
462 1           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
463 1 50         if (val >= SYNC_RWLOCK_WRITER_BIT) {
464 1           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
465 1 50         if (!sync_pid_alive(pid))
466 0           sync_recover_stale_rwlock(hdr, val);
467             }
468 1           spin = 0;
469 1           continue;
470             }
471             }
472 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
473 0           __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
474 0           spin = 0;
475             }
476             }
477              
478 7           static inline int sync_rwlock_try_wrlock(SyncHeader *hdr) {
479 7           uint32_t expected = 0;
480 7           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
481 7           return __atomic_compare_exchange_n(&hdr->value, &expected, mypid,
482             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
483             }
484              
485 8           static inline void sync_rwlock_wrunlock(SyncHeader *hdr) {
486 8           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
487 8 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
488 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
489 8           }
490              
491             /* Downgrade: atomically convert wrlock to rdlock (writer -> 1 reader) */
492 1           static inline void sync_rwlock_downgrade(SyncHeader *hdr) {
493 1           __atomic_store_n(&hdr->value, 1, __ATOMIC_RELEASE);
494 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
495 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
496 1           }
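              /* RWLock usage sketch (read_config/write_config are assumed
               * application functions, not part of this header):
               *
               *   sync_rwlock_rdlock(hdr);              // shared: many readers
               *   read_config();
               *   sync_rwlock_rdunlock(hdr);
               *
               *   if (sync_rwlock_wrlock_timed(hdr, 2.0)) {   // exclusive
               *       write_config();
               *       sync_rwlock_downgrade(hdr);       // become reader, no gap
               *       read_config();
               *       sync_rwlock_rdunlock(hdr);        // downgrade left count=1
               *   }
               */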
497              
498             /* ================================================================
499             * Semaphore operations
500             *
501             * value = current count (0..param where param=max)
502             * CAS-based acquire/release, futex wait when 0
503             * ================================================================ */
504              
505 29           static inline int sync_sem_try_acquire(SyncHandle *h) {
506 29           SyncHeader *hdr = h->hdr;
507 0           for (;;) {
508 29           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
509 53 100         if (cur == 0) return 0;
510 24 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
511             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
512 24           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
513 24           return 1;
514             }
515             }
516             }
517              
518 10           static inline int sync_sem_try_acquire_n(SyncHandle *h, uint32_t n) {
519 10 100         if (n == 0) return 1;
520 9           SyncHeader *hdr = h->hdr;
521 0           for (;;) {
522 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
523 14 100         if (cur < n) return 0;
524 5 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
525             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
526 5           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
527 5           return 1;
528             }
529             }
530             }
531              
532 3           static inline int sync_sem_acquire_n(SyncHandle *h, uint32_t n, double timeout) {
533 3 50         if (n == 0) return 1;
534 3 100         if (sync_sem_try_acquire_n(h, n)) return 1;
535 1 50         if (timeout == 0) return 0;
536              
537 1           SyncHeader *hdr = h->hdr;
538             struct timespec deadline, remaining;
539 1           int has_deadline = (timeout > 0);
540 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
541              
542 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
543              
544 0           for (;;) {
545 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
546 1 50         if (cur >= n) {
547 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
548             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
549 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
550 1           return 1;
551             }
552 0           continue;
553             }
554              
555 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
556              
557 1           struct timespec *pts = NULL;
558 1 50         if (has_deadline) {
559 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
560 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
561 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
562 0           return 0;
563             }
564 1           pts = &remaining;
565             }
566              
567 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, cur, pts, NULL, 0);
568 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
569              
570 1 50         if (sync_sem_try_acquire_n(h, n)) return 1;
571              
572 1 50         if (has_deadline) {
573 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
574 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
575 1           return 0;
576             }
577             }
578             }
579             }
580              
581 4           static inline int sync_sem_acquire(SyncHandle *h, double timeout) {
582 4 100         if (sync_sem_try_acquire(h)) return 1;
583 2 100         if (timeout == 0) return 0;
584              
585 1           SyncHeader *hdr = h->hdr;
586             struct timespec deadline, remaining;
587 1           int has_deadline = (timeout > 0);
588 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
589              
590 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
591              
592 0           for (;;) {
593 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
594 1 50         if (cur > 0) {
595 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
596             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
597 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
598 1           return 1;
599             }
600 0           continue;
601             }
602              
603 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
604              
605 1           struct timespec *pts = NULL;
606 1 50         if (has_deadline) {
607 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
608 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
609 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
610 0           return 0;
611             }
612 1           pts = &remaining;
613             }
614              
615 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, 0, pts, NULL, 0);
616 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
617              
618             /* Retry acquire after wakeup */
619 1 50         if (sync_sem_try_acquire(h)) return 1;
620              
621 1 50         if (has_deadline) {
622 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
623 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
624 1           return 0;
625             }
626             }
627             }
628             }
629              
630 7           static inline void sync_sem_release(SyncHandle *h) {
631 7           SyncHeader *hdr = h->hdr;
632 7           uint32_t max_val = hdr->param;
633 0           for (;;) {
634 7           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
635 7           uint32_t next = cur + 1;
636 7 100         if (next > max_val) next = max_val; /* clamp at max */
637 7 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
638             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
639 7           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
640 7 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
641 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
642 7           return;
643             }
644             }
645             }
646              
647 10           static inline void sync_sem_release_n(SyncHandle *h, uint32_t n) {
648 10 100         if (n == 0) return;
649 9           SyncHeader *hdr = h->hdr;
650 9           uint32_t max_val = hdr->param;
651 0           for (;;) {
652 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
653 9 100         uint32_t next = (n > max_val - cur) ? max_val : cur + n;
654 9 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
655             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
656 9           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
657 9 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0) {
658 0           uint32_t wake = n < (uint32_t)INT_MAX ? n : INT_MAX;
659 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, wake, NULL, NULL, 0);
660             }
661 9           return;
662             }
663             }
664             }
665              
666             /* Drain: acquire all available permits at once, return count acquired */
667 4           static inline uint32_t sync_sem_drain(SyncHandle *h) {
668 4           SyncHeader *hdr = h->hdr;
669 4           uint32_t cur = __atomic_exchange_n(&hdr->value, 0, __ATOMIC_ACQUIRE);
670 4 100         if (cur == 0) return 0;
671 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
672 3           return cur;
673             }
674              
675 33           static inline uint32_t sync_sem_value(SyncHandle *h) {
676 33           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
677             }
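              /* Semaphore usage sketch (the path, pool size, and do_work are
               * assumptions for illustration):
               *
               *   char err[SYNC_ERR_BUFLEN];
               *   SyncHandle *h = sync_create("/tmp/pool.sync",
               *                               SYNC_TYPE_SEMAPHORE, 4, 4, err);
               *   if (h && sync_sem_acquire(h, 5.0)) {  // wait up to 5 s
               *       do_work();                        // holding one permit
               *       sync_sem_release(h);
               *   }
               *   sync_destroy(h);
               */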
678              
679             /* ================================================================
680             * Barrier operations
681             *
682             * param = number of parties
683             * value = arrived count (0..param)
684             * generation = bit 31: "broken" flag (set on timeout)
685             * bits 0..30: generation counter (bumped each trip/reset)
686             *
687             * Timeout breaks the barrier permanently; all waiters return -1 and all
688             * future wait() calls also return -1 until sync_barrier_reset() is called.
689             * This mirrors CyclicBarrier-style "broken" semantics and avoids the race
690             * where a timed-out waiter's reset could collide with new-generation arrivals.
691             * ================================================================ */
692              
693             #define SYNC_BARRIER_BROKEN_BIT 0x80000000U
694             #define SYNC_BARRIER_GEN_MASK 0x7FFFFFFFU
695              
696 9           static inline int sync_barrier_wait(SyncHandle *h, double timeout) {
697 9           SyncHeader *hdr = h->hdr;
698 9           uint32_t parties = hdr->param;
699              
700 9 100         if (timeout == 0) return -1; /* non-blocking probe: can't rendezvous instantly */
701              
702 8           uint32_t gen_raw = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
703 8 100         if (gen_raw & SYNC_BARRIER_BROKEN_BIT) return -1; /* already broken */
704              
705 7           uint32_t arrived = __atomic_add_fetch(&hdr->value, 1, __ATOMIC_ACQ_REL);
706              
707 7 50         if (arrived == parties) {
708             /* Last to arrive — trip the barrier. CAS preserves broken bit invariant. */
709 0           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
710 0           for (;;) {
711 0           uint32_t old_g = __atomic_load_n(&hdr->generation, __ATOMIC_RELAXED);
712 0 0         if (old_g & SYNC_BARRIER_BROKEN_BIT) {
713 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
714 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
715 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
716 0           return -1;
717             }
718 0           uint32_t new_g = (old_g + 1) & SYNC_BARRIER_GEN_MASK;
719 0 0         if (__atomic_compare_exchange_n(&hdr->generation, &old_g, new_g,
720             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
721 0           break;
722             }
723 0           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
724 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
725 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
726 0           return 1; /* leader */
727             }
728              
729             /* Not last — wait for generation to change or broken bit to appear */
730 7           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
731              
732             struct timespec deadline, remaining;
733 7           int has_deadline = (timeout > 0);
734 7 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
735              
736 7           for (;;) {
737 14           uint32_t cur_raw = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
738 14 50         if (cur_raw & SYNC_BARRIER_BROKEN_BIT) return -1; /* broken */
739 14 100         if (cur_raw != gen_raw) return 0; /* barrier tripped */
740              
741 9           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
742              
743 9           struct timespec *pts = NULL;
744 9 50         if (has_deadline) {
745 9 100         if (!sync_remaining_time(&deadline, &remaining)) {
746 2           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
747             /* Try to break the barrier. If CAS fails with BROKEN_BIT
748             * clear, only gen changed — our cohort tripped → return 0.
749             * If CAS fails with BROKEN_BIT set, current state is
750             * broken (whether by us, another waiter, or trip+re-break)
751             * → return -1, matching the non-timeout path. */
752 2           uint32_t g = gen_raw;
753 2           if (!__atomic_compare_exchange_n(&hdr->generation, &g,
754 2 50         gen_raw | SYNC_BARRIER_BROKEN_BIT,
755             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)
756 0 0         && !(g & SYNC_BARRIER_BROKEN_BIT)) {
757 0           return 0;
758             }
759 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
760 2           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
761 2           return -1;
762             }
763 7           pts = &remaining;
764             }
765              
766 7           syscall(SYS_futex, &hdr->generation, FUTEX_WAIT, gen_raw, pts, NULL, 0);
767 7           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
768             }
769             }
770              
771 6           static inline uint32_t sync_barrier_generation(SyncHandle *h) {
772 6           return __atomic_load_n(&h->hdr->generation, __ATOMIC_RELAXED) & SYNC_BARRIER_GEN_MASK;
773             }
774              
775 4           static inline uint32_t sync_barrier_arrived(SyncHandle *h) {
776 4           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
777             }
778              
779 2           static inline int sync_barrier_is_broken(SyncHandle *h) {
780 2           return (__atomic_load_n(&h->hdr->generation, __ATOMIC_RELAXED)
781 2           & SYNC_BARRIER_BROKEN_BIT) != 0;
782             }
783              
784 2           static inline void sync_barrier_reset(SyncHandle *h) {
785 2           SyncHeader *hdr = h->hdr;
786 2           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
787             /* Bump gen and clear broken bit in one CAS */
788 0           for (;;) {
789 2           uint32_t old_g = __atomic_load_n(&hdr->generation, __ATOMIC_RELAXED);
790 2           uint32_t new_g = ((old_g & SYNC_BARRIER_GEN_MASK) + 1) & SYNC_BARRIER_GEN_MASK;
791 2 50         if (__atomic_compare_exchange_n(&hdr->generation, &old_g, new_g,
792             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
793 2           break;
794             }
795 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
796 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
797 2           }
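              /* Barrier usage sketch: the last arrival returns 1 (leader), the
               * others 0, and timeout/broken returns -1 (illustrative):
               *
               *   int r = sync_barrier_wait(h, 10.0);
               *   if (r < 0) {
               *       if (sync_barrier_is_broken(h))
               *           sync_barrier_reset(h);   // one designated recoverer
               *   } else if (r == 1) {
               *       // leader-only work for the new generation
               *   }
               */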
798              
799             /* ================================================================
800             * Condvar operations
801             *
802             * Uses the internal mutex (hdr->mutex) to protect the predicate.
803             * value = signal counter (futex word)
804             * generation = broadcast epoch
805             * ================================================================ */
806              
807 12           static inline void sync_condvar_lock(SyncHandle *h) {
808 12           sync_mutex_lock(h->hdr);
809 12           __atomic_add_fetch(&h->hdr->stat_acquires, 1, __ATOMIC_RELAXED);
810 12           }
811              
812 13           static inline void sync_condvar_unlock(SyncHandle *h) {
813 13           sync_mutex_unlock(h->hdr);
814 13           __atomic_add_fetch(&h->hdr->stat_releases, 1, __ATOMIC_RELAXED);
815 13           }
816              
817 1           static inline int sync_condvar_try_lock(SyncHandle *h) {
818 1           SyncHeader *hdr = h->hdr;
819 1           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
820 1           uint32_t expected = 0;
821 1 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
822             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
823 1           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
824 1           return 1;
825             }
826 0           return 0;
827             }
828              
829             /* Wait: atomically unlock mutex, wait on futex, re-lock mutex.
830             * Returns 1 on signal/broadcast, 0 on timeout. */
831 4           static inline int sync_condvar_wait(SyncHandle *h, double timeout) {
832 4           SyncHeader *hdr = h->hdr;
833              
834 4 100         if (timeout == 0) return 0; /* non-blocking: no wait */
835              
836 3           uint32_t seq = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
837              
838 3           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
839              
840 3           sync_mutex_unlock(hdr);
841              
842             struct timespec deadline, remaining;
843 3           int has_deadline = (timeout > 0);
844 3 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
845              
846 3           int signaled = 0;
847 0           for (;;) {
848 3           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
849 3 50         if (cur != seq) { signaled = 1; break; }
850              
851 3           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
852              
853 3           struct timespec *pts = NULL;
854 3 50         if (has_deadline) {
855 3 50         if (!sync_remaining_time(&deadline, &remaining)) {
856 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
857 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
858 0           break;
859             }
860 3           pts = &remaining;
861             }
862              
863 3           long rc = syscall(SYS_futex, &hdr->value, FUTEX_WAIT, seq, pts, NULL, 0);
864 3           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
865              
866 3           cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
867 3 100         if (cur != seq) { signaled = 1; break; }
868              
869 2 50         if (rc == -1 && errno == ETIMEDOUT && has_deadline) {
    50          
    50          
870 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
871 2           break;
872             }
873             }
874              
875 3           sync_mutex_lock(hdr);
876 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
877 3           return signaled;
878             }
879              
880 1           static inline void sync_condvar_signal(SyncHandle *h) {
881 1           SyncHeader *hdr = h->hdr;
882 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
883 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
884 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
885 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
886 1           }
887              
888 1           static inline void sync_condvar_broadcast(SyncHandle *h) {
889 1           SyncHeader *hdr = h->hdr;
890 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
891 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
892 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
893 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
894 1           }
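              /* Condvar usage sketch: the canonical predicate loop
               * (queue_empty/pop are assumed to live in other shared memory):
               *
               *   sync_condvar_lock(h);
               *   while (queue_empty()) {
               *       if (!sync_condvar_wait(h, 5.0))  // 0 => timed out
               *           break;
               *   }
               *   if (!queue_empty()) pop();
               *   sync_condvar_unlock(h);
               *
               * Producer side: change the predicate under the lock, unlock,
               * then sync_condvar_signal(h) or sync_condvar_broadcast(h). */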
895              
896             /* ================================================================
897             * Once operations
898             *
899             * value states: 0=INIT, (SYNC_MUTEX_WRITER_BIT|pid)=RUNNING, 1=DONE
900             * ================================================================ */
901              
902             #define SYNC_ONCE_INIT 0
903             #define SYNC_ONCE_DONE 1
904             /* RUNNING = SYNC_MUTEX_WRITER_BIT | pid */
905              
906 8           static inline int sync_once_is_done(SyncHandle *h) {
907 8           return __atomic_load_n(&h->hdr->value, __ATOMIC_ACQUIRE) == SYNC_ONCE_DONE;
908             }
909              
910             /* Try to become the initializer. Returns:
911             * 1 = you are the initializer, call once_done() when finished
912             * 0 = already done
913             * -1 = another process is initializing (wait with once_wait) */
914 8           static inline int sync_once_try(SyncHandle *h) {
915 8           SyncHeader *hdr = h->hdr;
916 8           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
917              
918 8           uint32_t expected = SYNC_ONCE_INIT;
919 8 100         if (__atomic_compare_exchange_n(&hdr->value, &expected, mypid,
920             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
921 4           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
922 4           return 1;
923             }
924 4 100         if (expected == SYNC_ONCE_DONE) return 0;
925 3           return -1;
926             }
927              
928             /* Call/wait combo: try to become initializer, or wait for completion.
929             * Returns 1 if caller is the initializer, 0 if already done or waited. */
930 6           static inline int sync_once_enter(SyncHandle *h, double timeout) {
931 6           SyncHeader *hdr = h->hdr;
932              
933             /* Non-blocking probe: just try, don't wait */
934 6           int r = sync_once_try(h);
935 6 100         if (r == 1) return 1;
936 3 100         if (r == 0) return 0;
937 2 100         if (timeout == 0) return 0;
938              
939             struct timespec deadline, remaining;
940 1           int has_deadline = (timeout > 0);
941 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
942              
943 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
944              
945 1           for (;;) {
946 2           r = sync_once_try(h);
947 2 100         if (r == 1) return 1; /* caller is initializer */
948 1 50         if (r == 0) return 0; /* already done */
949              
950             /* r == -1: someone else is running. Wait or detect stale. */
951 1           uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
952 1 50         if (val == SYNC_ONCE_DONE) return 0;
953 2 50         if (val == SYNC_ONCE_INIT) continue; /* race: was reset, retry */
954              
955             /* Check stale initializer */
956 1 50         if (val >= SYNC_MUTEX_WRITER_BIT) {
957 1           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
958 1 50         if (!sync_pid_alive(pid)) {
959 1 50         if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
960             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
961 1           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
962 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
963 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
964             }
965 1           continue;
966             }
967             }
968              
969 0           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
970              
971             /* Always cap at SYNC_LOCK_TIMEOUT_SEC so stale-initializer recovery
972             * runs periodically even when the caller specifies infinite timeout. */
973 0           struct timespec *pts = (struct timespec *)&sync_lock_timeout;
974 0 0         if (has_deadline) {
975 0 0         if (!sync_remaining_time(&deadline, &remaining)) {
976 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
977 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
978 0           return 0;
979             }
980 0 0         if (remaining.tv_sec < SYNC_LOCK_TIMEOUT_SEC)
981 0           pts = &remaining;
982             }
983              
984 0           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, val, pts, NULL, 0);
985 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
986             }
987             }
988              
989 4           static inline void sync_once_done(SyncHandle *h) {
990 4           SyncHeader *hdr = h->hdr;
991 4           __atomic_store_n(&hdr->value, SYNC_ONCE_DONE, __ATOMIC_RELEASE);
992 4           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
993 4 100         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
994 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
995 4           }
996              
997 2           static inline void sync_once_reset(SyncHandle *h) {
998 2           SyncHeader *hdr = h->hdr;
999 2           __atomic_store_n(&hdr->value, SYNC_ONCE_INIT, __ATOMIC_RELEASE);
1000 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
1001 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
1002 2           }
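              /* Once usage sketch (init_shared_state is an assumed application
               * function):
               *
               *   if (sync_once_enter(h, -1) == 1) {   // won the init race
               *       init_shared_state();
               *       sync_once_done(h);               // releases the waiters
               *   }
               *   // past this point the init has run exactly once, even if an
               *   // earlier initializer died (stale-PID recovery re-runs it)
               */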
1003              
1004             /* ================================================================
1005             * Create / Open / Close
1006             * ================================================================ */
1007              
1008             #define SYNC_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, SYNC_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
1009              
1010 74           static SyncHandle *sync_create(const char *path, uint32_t type, uint32_t param,
1011             uint32_t initial, char *errbuf) {
1012 74 50         if (errbuf) errbuf[0] = '\0';
1013              
1014 74 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
1015 74 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    100          
    50          
1016 73 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    100          
    50          
1017 71 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    100          
    50          
1018              
1019 69           uint64_t total_size = sizeof(SyncHeader);
1020 69           int anonymous = (path == NULL);
1021             size_t map_size;
1022             void *base;
1023              
1024 69 100         if (anonymous) {
1025 53           map_size = (size_t)total_size;
1026 53           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
1027             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
1028 53 50         if (base == MAP_FAILED) {
1029 0 0         SYNC_ERR("mmap(anonymous): %s", strerror(errno));
1030 0           return NULL;
1031             }
1032 53           SyncHeader *hdr = (SyncHeader *)base;
1033 53           memset(hdr, 0, sizeof(SyncHeader));
1034 53           hdr->magic = SYNC_MAGIC;
1035 53           hdr->version = SYNC_VERSION;
1036 53           hdr->type = type;
1037 53           hdr->param = param;
1038 53           hdr->total_size = total_size;
1039 53 100         if (type == SYNC_TYPE_SEMAPHORE)
1040 17           hdr->value = initial;
1041 53           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1042 53           goto setup_handle;
1043             } else {
1044 16           int fd = open(path, O_RDWR | O_CREAT, 0666);
1045 19 50         if (fd < 0) { SYNC_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
1046              
1047 16 50         if (flock(fd, LOCK_EX) < 0) {
1048 0 0         SYNC_ERR("flock(%s): %s", path, strerror(errno));
1049 0           close(fd); return NULL;
1050             }
1051              
1052             struct stat st;
1053 16 50         if (fstat(fd, &st) < 0) {
1054 0 0         SYNC_ERR("fstat(%s): %s", path, strerror(errno));
1055 0           flock(fd, LOCK_UN); close(fd); return NULL;
1056             }
1057              
1058 16           int is_new = (st.st_size == 0);
1059              
1060 16 100         if (!is_new && (uint64_t)st.st_size < sizeof(SyncHeader)) {
    50          
1061 0 0         SYNC_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
1062 0           flock(fd, LOCK_UN); close(fd); return NULL;
1063             }
1064              
1065 16 100         if (is_new) {
1066 7 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1067 0 0         SYNC_ERR("ftruncate(%s): %s", path, strerror(errno));
1068 0           flock(fd, LOCK_UN); close(fd); return NULL;
1069             }
1070             }
1071              
1072 16 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
1073 16           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1074 16 50         if (base == MAP_FAILED) {
1075 0 0         SYNC_ERR("mmap(%s): %s", path, strerror(errno));
1076 0           flock(fd, LOCK_UN); close(fd); return NULL;
1077             }
1078              
1079 16           SyncHeader *hdr = (SyncHeader *)base;
1080              
1081 16 100         if (!is_new) {
1082 27           int valid = (hdr->magic == SYNC_MAGIC &&
1083 9 50         hdr->version == SYNC_VERSION &&
1084 24 50         hdr->type == type &&
    100          
1085 6 50         hdr->total_size == (uint64_t)st.st_size);
1086 9 100         if (!valid) {
1087 3 50         SYNC_ERR("%s: invalid or incompatible sync file", path);
1088 3           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1089             }
1090 6           flock(fd, LOCK_UN);
1091 6           close(fd);
1092 6           goto setup_handle;
1093             }
1094              
1095             /* Initialize while holding the flock */
1096 7           memset(base, 0, sizeof(SyncHeader));
1097 7           hdr->magic = SYNC_MAGIC;
1098 7           hdr->version = SYNC_VERSION;
1099 7           hdr->type = type;
1100 7           hdr->param = param;
1101 7           hdr->total_size = total_size;
1102 7 100         if (type == SYNC_TYPE_SEMAPHORE)
1103 3           hdr->value = initial;
1104 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1105              
1106 7           flock(fd, LOCK_UN);
1107 7           close(fd);
1108             } /* end file-backed */
1109              
1110 66           setup_handle:;
1111             {
1112 66           SyncHeader *hdr = (SyncHeader *)base;
1113 66           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1114 66 50         if (!h) { munmap(base, map_size); return NULL; }
1115              
1116 66           h->hdr = hdr;
1117 66           h->mmap_size = map_size;
1118 66 100         h->path = path ? strdup(path) : NULL;
1119 66           h->notify_fd = -1;
1120 66           h->backing_fd = -1;
1121              
1122 66           return h;
1123             }
1124             }
1125              
1126 6           static SyncHandle *sync_create_memfd(const char *name, uint32_t type,
1127             uint32_t param, uint32_t initial,
1128             char *errbuf) {
1129 6 50         if (errbuf) errbuf[0] = '\0';
1130              
1131 6 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
1132 6 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    50          
    0          
1133 6 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    50          
    0          
1134 6 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    50          
    0          
1135              
1136 6           uint64_t total_size = sizeof(SyncHeader);
1137              
1138 6 50         int fd = memfd_create(name ? name : "sync", MFD_CLOEXEC | MFD_ALLOW_SEALING);
1139 6 50         if (fd < 0) { SYNC_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1140              
1141 6 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1142 0 0         SYNC_ERR("ftruncate(memfd): %s", strerror(errno));
1143 0           close(fd); return NULL;
1144             }
1145 6           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
1146              
1147 6           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1148 6 50         if (base == MAP_FAILED) {
1149 0 0         SYNC_ERR("mmap(memfd): %s", strerror(errno));
1150 0           close(fd); return NULL;
1151             }
1152              
1153 6           SyncHeader *hdr = (SyncHeader *)base;
1154 6           memset(hdr, 0, sizeof(SyncHeader));
1155 6           hdr->magic = SYNC_MAGIC;
1156 6           hdr->version = SYNC_VERSION;
1157 6           hdr->type = type;
1158 6           hdr->param = param;
1159 6           hdr->total_size = total_size;
1160              
1161 6 100         if (type == SYNC_TYPE_SEMAPHORE)
1162 2           hdr->value = initial;
1163              
1164 6           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1165              
1166 6           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1167 6 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
1168              
1169 6           h->hdr = hdr;
1170 6           h->mmap_size = (size_t)total_size;
1171 6           h->path = NULL;
1172 6           h->notify_fd = -1;
1173 6           h->backing_fd = fd;
1174              
1175 6           return h;
1176             }
1177              
1178 5           static SyncHandle *sync_open_fd(int fd, uint32_t type, char *errbuf) {
1179 5 50         if (errbuf) errbuf[0] = '\0';
1180              
1181             struct stat st;
1182 5 100         if (fstat(fd, &st) < 0) {
1183 1 50         SYNC_ERR("fstat(fd=%d): %s", fd, strerror(errno));
1184 1           return NULL;
1185             }
1186              
1187 4 50         if ((uint64_t)st.st_size < sizeof(SyncHeader)) {
1188 0 0         SYNC_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
1189 0           return NULL;
1190             }
1191              
1192 4           size_t map_size = (size_t)st.st_size;
1193 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1194 4 50         if (base == MAP_FAILED) {
1195 0 0         SYNC_ERR("mmap(fd=%d): %s", fd, strerror(errno));
1196 0           return NULL;
1197             }
1198              
1199 4           SyncHeader *hdr = (SyncHeader *)base;
1200 12           int valid = (hdr->magic == SYNC_MAGIC &&
1201 4 50         hdr->version == SYNC_VERSION &&
1202 11 50         hdr->type == type &&
    100          
1203 3 50         hdr->total_size == (uint64_t)st.st_size);
1204 4 100         if (!valid) {
1205 1 50         SYNC_ERR("fd %d: invalid or incompatible sync", fd);
1206 1           munmap(base, map_size);
1207 1           return NULL;
1208             }
1209              
1210 3           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
1211 3 50         if (myfd < 0) {
1212 0 0         SYNC_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
1213 0           munmap(base, map_size);
1214 0           return NULL;
1215             }
1216              
1217 3           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1218 3 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
1219              
1220 3           h->hdr = hdr;
1221 3           h->mmap_size = map_size;
1222 3           h->path = NULL;
1223 3           h->notify_fd = -1;
1224 3           h->backing_fd = myfd;
1225              
1226 3           return h;
1227             }
1228              
1229 75           static void sync_destroy(SyncHandle *h) {
1230 75 50         if (!h) return;
1231 75 100         if (h->notify_fd >= 0) close(h->notify_fd);
1232 75 100         if (h->backing_fd >= 0) close(h->backing_fd);
1233 75 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
1234 75           free(h->path);
1235 75           free(h);
1236             }
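              /* Lifecycle sketch for the memfd path (illustrative): the parent
               * creates, the child re-opens via the inherited fd; fork keeps
               * the fd despite MFD_CLOEXEC, which only applies across exec.
               *
               *   SyncHandle *h = sync_create_memfd("demo", SYNC_TYPE_BARRIER,
               *                                     2, 0, NULL);
               *   if (fork() == 0) {
               *       SyncHandle *c = sync_open_fd(h->backing_fd,
               *                                    SYNC_TYPE_BARRIER, NULL);
               *       sync_barrier_wait(c, -1);
               *       sync_destroy(c);
               *       _exit(0);
               *   }
               *   sync_barrier_wait(h, -1);
               *   sync_destroy(h);
               */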
1237              
1238             /* ================================================================
1239             * Eventfd integration
1240             * ================================================================ */
1241              
1242 11           static int sync_create_eventfd(SyncHandle *h) {
1243 11 50         if (h->notify_fd >= 0) return h->notify_fd;
1244 11           int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1245 11 50         if (efd < 0) return -1;
1246 11           h->notify_fd = efd;
1247 11           return efd;
1248             }
1249              
1250 10           static int sync_notify(SyncHandle *h) {
1251 10 50         if (h->notify_fd < 0) return 0;
1252 10           uint64_t val = 1;
1253 10           return write(h->notify_fd, &val, sizeof(val)) == sizeof(val);
1254             }
1255              
1256 11           static int64_t sync_eventfd_consume(SyncHandle *h) {
1257 11 50         if (h->notify_fd < 0) return -1;
1258 11           uint64_t val = 0;
1259 11 100         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1260 6           return (int64_t)val;
1261             }
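              /* Eventfd sketch: bridge releases into a poll loop. notify_fd is
               * process-local, so both sides here must share one SyncHandle (or
               * pass the fd via SCM_RIGHTS); the poll usage is illustrative.
               *
               *   int efd = sync_create_eventfd(h);
               *   struct pollfd p = { .fd = efd, .events = POLLIN };
               *   if (poll(&p, 1, 1000) > 0) {      // readable within 1 s
               *       sync_eventfd_consume(h);      // drain the counter
               *       if (sync_sem_try_acquire(h)) {
               *           // got a permit without blocking
               *       }
               *   }
               *
               * The releasing side calls sync_notify(h) after each
               * sync_sem_release(h). */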
1262              
1263             /* ================================================================
1264             * Misc
1265             * ================================================================ */
1266              
1267 0           static int sync_msync(SyncHandle *h) {
1268 0           return msync(h->hdr, h->mmap_size, MS_SYNC);
1269             }
1270              
1271             #endif /* SYNC_H */