File Coverage

sync.h
Criterion Covered Total %
statement 444 661 67.1
branch 207 450 46.0
condition n/a
subroutine n/a
pod n/a
total 651 1111 58.6


line stmt bran cond sub pod time code
1             /*
2             * sync.h -- Shared-memory synchronization primitives for Linux
3             *
4             * Five primitives:
5             * Semaphore — bounded counter (CAS-based, cross-process resource limiting)
6             * Barrier — N processes rendezvous at a point before proceeding
7             * RWLock — reader-writer lock for external resources
8             * Condvar — condition variable with futex wait/signal/broadcast
9             * Once — one-time initialization gate (like pthread_once)
10             *
11             * All use file-backed mmap(MAP_SHARED) for cross-process sharing,
12             * futex for blocking wait, and PID-based stale lock recovery.
13             */
14              
15             #ifndef SYNC_H
16             #define SYNC_H
17              
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34              
35             /* ================================================================
36             * Constants
37             * ================================================================ */
38              
39             #define SYNC_MAGIC 0x53594E31U /* "SYN1" */
40             #define SYNC_VERSION 1
41              
42             /* Primitive type IDs */
43             #define SYNC_TYPE_SEMAPHORE 0
44             #define SYNC_TYPE_BARRIER 1
45             #define SYNC_TYPE_RWLOCK 2
46             #define SYNC_TYPE_CONDVAR 3
47             #define SYNC_TYPE_ONCE 4
48              
49             #define SYNC_ERR_BUFLEN 256
50             #define SYNC_SPIN_LIMIT 32
51             #define SYNC_LOCK_TIMEOUT_SEC 2
52              
53             /* ================================================================
54             * Header (128 bytes = 2 cache lines, lives at start of mmap)
55             * ================================================================ */
56              
57             typedef struct {
58             /* ---- Cache line 0 (0-63): immutable after create ---- */
59             uint32_t magic; /* 0 */
60             uint32_t version; /* 4 */
61             uint32_t type; /* 8: SYNC_TYPE_* */
62             uint32_t param; /* 12: type-specific (sem max, barrier count, etc.) */
63             uint64_t total_size; /* 16: mmap size */
64             uint8_t _pad0[40]; /* 24-63 */
65              
66             /* ---- Cache line 1 (64-127): mutable state ---- */
67              
68             /* Semaphore: value = current count, waiters = blocked acquirers */
69             /* Barrier: value = arrived count, waiters = blocked at barrier,
70             generation = increments each time barrier trips */
71             /* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
72             waiters = blocked lockers */
73             /* Condvar: value = signal counter (futex word), waiters = blocked waiters,
74             mutex = associated mutex for predicate protection */
75             /* Once: value = state (0=INIT, 1=RUNNING|pid, 2=DONE),
76             waiters = blocked on completion */
77              
78             uint32_t value; /* 64: primary state word (futex target) */
79             uint32_t waiters; /* 68: waiter count */
80             uint32_t generation; /* 72: barrier generation / condvar epoch */
81             uint32_t mutex; /* 76: condvar mutex (0 or PID|0x80000000) */
82             uint32_t mutex_waiters; /* 80: condvar mutex waiter count */
83             uint32_t stat_recoveries;/* 84 */
84             uint64_t stat_acquires; /* 88 */
85             uint64_t stat_releases; /* 96 */
86             uint64_t stat_waits; /* 104 */
87             uint64_t stat_timeouts; /* 112 */
88             uint32_t stat_signals; /* 120 */
89             uint8_t _pad1[4]; /* 124-127 */
90             } SyncHeader;
91              
92             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
93             _Static_assert(sizeof(SyncHeader) == 128, "SyncHeader must be 128 bytes");
94             #endif
95              
96             /* ================================================================
97             * Process-local handle
98             * ================================================================ */
99              
100             typedef struct {
101             SyncHeader *hdr;
102             size_t mmap_size;
103             char *path;
104             int notify_fd; /* eventfd, -1 if disabled */
105             int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
106             } SyncHandle;
107              
108             /* ================================================================
109             * Utility
110             * ================================================================ */
111              
112 0           static inline void sync_spin_pause(void) {
113             #if defined(__x86_64__) || defined(__i386__)
114 0           __asm__ volatile("pause" ::: "memory");
115             #elif defined(__aarch64__)
116             __asm__ volatile("yield" ::: "memory");
117             #else
118             __asm__ volatile("" ::: "memory");
119             #endif
120 0           }
121              
122 1           static inline int sync_pid_alive(uint32_t pid) {
123 1 50         if (pid == 0) return 1;
124 1 50         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    50          
125             }
126              
127             /* Convert timeout in seconds (double) to absolute deadline */
128 13           static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
129 13           clock_gettime(CLOCK_MONOTONIC, deadline);
130 13           deadline->tv_sec += (time_t)timeout;
131 13           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
132 13 100         if (deadline->tv_nsec >= 1000000000L) {
133 1           deadline->tv_sec++;
134 1           deadline->tv_nsec -= 1000000000L;
135             }
136 13           }
137              
138             /* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
139 16           static inline int sync_remaining_time(const struct timespec *deadline,
140             struct timespec *remaining) {
141             struct timespec now;
142 16           clock_gettime(CLOCK_MONOTONIC, &now);
143 16           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
144 16           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
145 16 100         if (remaining->tv_nsec < 0) {
146 10           remaining->tv_sec--;
147 10           remaining->tv_nsec += 1000000000L;
148             }
149 16           return remaining->tv_sec >= 0;
150             }
151              
152             /* ================================================================
153             * Mutex helpers (for Condvar's internal mutex)
154             * ================================================================ */
155              
156             #define SYNC_MUTEX_WRITER_BIT 0x80000000U
157             #define SYNC_MUTEX_PID_MASK 0x7FFFFFFFU
158             #define SYNC_MUTEX_VAL(pid) (SYNC_MUTEX_WRITER_BIT | ((uint32_t)(pid) & SYNC_MUTEX_PID_MASK))
159              
160             static const struct timespec sync_lock_timeout = { SYNC_LOCK_TIMEOUT_SEC, 0 };
161              
162 0           static inline void sync_recover_stale_mutex(SyncHeader *hdr, uint32_t observed) {
163 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
164             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
165 0           return;
166 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
167 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
168 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
169             }
170              
171 15           static inline void sync_mutex_lock(SyncHeader *hdr) {
172 15           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
173 15           for (int spin = 0; ; spin++) {
174 15           uint32_t expected = 0;
175 15 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
176             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
177 15           return;
178 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
179 0           sync_spin_pause();
180 0           continue;
181             }
182 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
183 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
184 0 0         if (cur != 0) {
185 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
186             &sync_lock_timeout, NULL, 0);
187 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
188 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
189 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
190 0 0         if (val >= SYNC_MUTEX_WRITER_BIT) {
191 0           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
192 0 0         if (!sync_pid_alive(pid))
193 0           sync_recover_stale_mutex(hdr, val);
194             }
195 0           spin = 0;
196 0           continue;
197             }
198             }
199 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
200 0           spin = 0;
201             }
202             }
203              
204 16           static inline void sync_mutex_unlock(SyncHeader *hdr) {
205 16           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
206 16 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
207 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
208 16           }
209              
210             /* ================================================================
211             * RWLock helpers (for SYNC_TYPE_RWLOCK)
212             *
213             * value == 0: unlocked
214             * value 1..0x7FFFFFFF: N active readers
215             * value 0x80000000 | pid: write-locked by pid
216             * ================================================================ */
217              
218             #define SYNC_RWLOCK_WRITER_BIT 0x80000000U
219             #define SYNC_RWLOCK_PID_MASK 0x7FFFFFFFU
220             #define SYNC_RWLOCK_WR(pid) (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
221              
222             static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
223             static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);
224              
225 0           static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
226 0 0         if (!__atomic_compare_exchange_n(&hdr->value, &observed, 0,
227             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
228 0           return;
229 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
230 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
231 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
232             }
233              
234 7           static inline void sync_rwlock_rdlock(SyncHeader *hdr) {
235 7           uint32_t *lock = &hdr->value;
236 7           uint32_t *w = &hdr->waiters;
237 7           for (int spin = 0; ; spin++) {
238 7           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
239 7 50         if (cur < SYNC_RWLOCK_WRITER_BIT) {
240 7 50         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
241             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
242 7           return;
243             }
244 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
245 0           sync_spin_pause();
246 0           continue;
247             }
248 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
249 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
250 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
251 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
252             &sync_lock_timeout, NULL, 0);
253 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
254 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
255 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
256 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
257 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
258 0 0         if (!sync_pid_alive(pid))
259 0           sync_recover_stale_rwlock(hdr, val);
260             }
261 0           spin = 0;
262 0           continue;
263             }
264             }
265 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
266 0           spin = 0;
267             }
268             }
269              
270             /* Timed rdlock: returns 1 on success, 0 on timeout. timeout<0 = infinite. */
271 1           static inline int sync_rwlock_rdlock_timed(SyncHeader *hdr, double timeout) {
272 1 50         if (sync_rwlock_try_rdlock(hdr)) return 1;
273 0 0         if (timeout == 0) return 0;
274              
275 0           uint32_t *lock = &hdr->value;
276 0           uint32_t *w = &hdr->waiters;
277             struct timespec deadline, remaining;
278 0           int has_deadline = (timeout > 0);
279 0 0         if (has_deadline) sync_make_deadline(timeout, &deadline);
280              
281 0           for (int spin = 0; ; spin++) {
282 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
283 0 0         if (cur < SYNC_RWLOCK_WRITER_BIT) {
284 0 0         if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
285             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
286 0           return 1;
287             }
288 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
289 0           sync_spin_pause();
290 0           continue;
291             }
292 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
293 0           cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
294 0 0         if (cur >= SYNC_RWLOCK_WRITER_BIT) {
295 0           struct timespec *pts = NULL;
296 0 0         if (has_deadline) {
297 0 0         if (!sync_remaining_time(&deadline, &remaining)) {
298 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
299 0           return 0;
300             }
301 0           pts = &remaining;
302             } else {
303 0           pts = (struct timespec *)&sync_lock_timeout;
304             }
305 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
306 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
307 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
308 0 0         if (!has_deadline) {
309 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
310 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
311 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
312 0 0         if (!sync_pid_alive(pid))
313 0           sync_recover_stale_rwlock(hdr, val);
314             }
315             }
316 0           spin = 0;
317 0           continue;
318             }
319             }
320 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
321 0           spin = 0;
322             }
323             }
324              
325 4           static inline int sync_rwlock_try_rdlock(SyncHeader *hdr) {
326 4           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
327 4 100         if (cur >= SYNC_RWLOCK_WRITER_BIT) return 0;
328 3           return __atomic_compare_exchange_n(&hdr->value, &cur, cur + 1,
329             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
330             }
331              
332 11           static inline void sync_rwlock_rdunlock(SyncHeader *hdr) {
333 11           uint32_t prev = __atomic_sub_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
334 11 100         if (prev == 0 && __atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
    50          
335 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
336 11           }
337              
338 5           static inline void sync_rwlock_wrlock(SyncHeader *hdr) {
339 5           uint32_t *lock = &hdr->value;
340 5           uint32_t *w = &hdr->waiters;
341 5           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
342 5           for (int spin = 0; ; spin++) {
343 5           uint32_t expected = 0;
344 5 50         if (__atomic_compare_exchange_n(lock, &expected, mypid,
345             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
346 5           return;
347 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
348 0           sync_spin_pause();
349 0           continue;
350             }
351 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
352 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
353 0 0         if (cur != 0) {
354 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
355             &sync_lock_timeout, NULL, 0);
356 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
357 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
358 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
359 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
360 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
361 0 0         if (!sync_pid_alive(pid))
362 0           sync_recover_stale_rwlock(hdr, val);
363             }
364 0           spin = 0;
365 0           continue;
366             }
367             }
368 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
369 0           spin = 0;
370             }
371             }
372              
373             /* Timed wrlock: returns 1 on success, 0 on timeout. timeout<0 = infinite. */
374 1           static inline int sync_rwlock_wrlock_timed(SyncHeader *hdr, double timeout) {
375 1 50         if (sync_rwlock_try_wrlock(hdr)) return 1;
376 0 0         if (timeout == 0) return 0;
377              
378 0           uint32_t *lock = &hdr->value;
379 0           uint32_t *w = &hdr->waiters;
380 0           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
381             struct timespec deadline, remaining;
382 0           int has_deadline = (timeout > 0);
383 0 0         if (has_deadline) sync_make_deadline(timeout, &deadline);
384              
385 0           for (int spin = 0; ; spin++) {
386 0           uint32_t expected = 0;
387 0 0         if (__atomic_compare_exchange_n(lock, &expected, mypid,
388             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
389 0           return 1;
390 0 0         if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
391 0           sync_spin_pause();
392 0           continue;
393             }
394 0           __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
395 0           uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
396 0 0         if (cur != 0) {
397 0           struct timespec *pts = NULL;
398 0 0         if (has_deadline) {
399 0 0         if (!sync_remaining_time(&deadline, &remaining)) {
400 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
401 0           return 0;
402             }
403 0           pts = &remaining;
404             } else {
405 0           pts = (struct timespec *)&sync_lock_timeout;
406             }
407 0           long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
408 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
409 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
410 0 0         if (!has_deadline) {
411 0           uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
412 0 0         if (val >= SYNC_RWLOCK_WRITER_BIT) {
413 0           uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
414 0 0         if (!sync_pid_alive(pid))
415 0           sync_recover_stale_rwlock(hdr, val);
416             }
417             }
418 0           spin = 0;
419 0           continue;
420             }
421             }
422 0           __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
423 0           spin = 0;
424             }
425             }
426              
427 4           static inline int sync_rwlock_try_wrlock(SyncHeader *hdr) {
428 4           uint32_t expected = 0;
429 4           uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
430 4           return __atomic_compare_exchange_n(&hdr->value, &expected, mypid,
431             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
432             }
433              
434 6           static inline void sync_rwlock_wrunlock(SyncHeader *hdr) {
435 6           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
436 6 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
437 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
438 6           }
439              
440             /* Downgrade: atomically convert wrlock to rdlock (writer -> 1 reader) */
441 1           static inline void sync_rwlock_downgrade(SyncHeader *hdr) {
442 1           __atomic_store_n(&hdr->value, 1, __ATOMIC_RELEASE);
443 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
444 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
445 1           }
446              
447             /* ================================================================
448             * Semaphore operations
449             *
450             * value = current count (0..param where param=max)
451             * CAS-based acquire/release, futex wait when 0
452             * ================================================================ */
453              
454 29           static inline int sync_sem_try_acquire(SyncHandle *h) {
455 29           SyncHeader *hdr = h->hdr;
456 0           for (;;) {
457 29           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
458 53 100         if (cur == 0) return 0;
459 24 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
460             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
461 24           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
462 24           return 1;
463             }
464             }
465             }
466              
467 10           static inline int sync_sem_try_acquire_n(SyncHandle *h, uint32_t n) {
468 10 100         if (n == 0) return 1;
469 9           SyncHeader *hdr = h->hdr;
470 0           for (;;) {
471 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
472 14 100         if (cur < n) return 0;
473 5 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
474             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
475 5           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
476 5           return 1;
477             }
478             }
479             }
480              
481 3           static inline int sync_sem_acquire_n(SyncHandle *h, uint32_t n, double timeout) {
482 3 50         if (n == 0) return 1;
483 3 100         if (sync_sem_try_acquire_n(h, n)) return 1;
484 1 50         if (timeout == 0) return 0;
485              
486 1           SyncHeader *hdr = h->hdr;
487             struct timespec deadline, remaining;
488 1           int has_deadline = (timeout > 0);
489 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
490              
491 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
492              
493 0           for (;;) {
494 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
495 1 50         if (cur >= n) {
496 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
497             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
498 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
499 1           return 1;
500             }
501 0           continue;
502             }
503              
504 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
505              
506 1           struct timespec *pts = NULL;
507 1 50         if (has_deadline) {
508 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
509 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
510 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
511 0           return 0;
512             }
513 1           pts = &remaining;
514             }
515              
516 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, cur, pts, NULL, 0);
517 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
518              
519 1 50         if (sync_sem_try_acquire_n(h, n)) return 1;
520              
521 1 50         if (has_deadline) {
522 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
523 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
524 1           return 0;
525             }
526             }
527             }
528             }
529              
530 4           static inline int sync_sem_acquire(SyncHandle *h, double timeout) {
531 4 100         if (sync_sem_try_acquire(h)) return 1;
532 2 100         if (timeout == 0) return 0;
533              
534 1           SyncHeader *hdr = h->hdr;
535             struct timespec deadline, remaining;
536 1           int has_deadline = (timeout > 0);
537 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
538              
539 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
540              
541 0           for (;;) {
542 1           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
543 1 50         if (cur > 0) {
544 0 0         if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
545             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
546 0           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
547 1           return 1;
548             }
549 0           continue;
550             }
551              
552 1           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
553              
554 1           struct timespec *pts = NULL;
555 1 50         if (has_deadline) {
556 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
557 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
558 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
559 0           return 0;
560             }
561 1           pts = &remaining;
562             }
563              
564 1           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, 0, pts, NULL, 0);
565 1           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
566              
567             /* Retry acquire after wakeup */
568 1 50         if (sync_sem_try_acquire(h)) return 1;
569              
570 1 50         if (has_deadline) {
571 1 50         if (!sync_remaining_time(&deadline, &remaining)) {
572 1           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
573 1           return 0;
574             }
575             }
576             }
577             }
578              
579 7           static inline void sync_sem_release(SyncHandle *h) {
580 7           SyncHeader *hdr = h->hdr;
581 7           uint32_t max_val = hdr->param;
582 0           for (;;) {
583 7           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
584 7           uint32_t next = cur + 1;
585 7 100         if (next > max_val) next = max_val; /* clamp at max */
586 7 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
587             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
588 7           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
589 7 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
590 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
591 7           return;
592             }
593             }
594             }
595              
596 10           static inline void sync_sem_release_n(SyncHandle *h, uint32_t n) {
597 10 100         if (n == 0) return;
598 9           SyncHeader *hdr = h->hdr;
599 9           uint32_t max_val = hdr->param;
600 0           for (;;) {
601 9           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
602 9 100         uint32_t next = (n > max_val - cur) ? max_val : cur + n;
603 9 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, next,
604             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
605 9           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
606 9 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0) {
607 0           uint32_t wake = n < (uint32_t)INT_MAX ? n : INT_MAX;
608 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, wake, NULL, NULL, 0);
609             }
610 9           return;
611             }
612             }
613             }
614              
615             /* Drain: acquire all available permits at once, return count acquired */
616 4           static inline uint32_t sync_sem_drain(SyncHandle *h) {
617 4           SyncHeader *hdr = h->hdr;
618 0           for (;;) {
619 4           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
620 7 100         if (cur == 0) return 0;
621 3 50         if (__atomic_compare_exchange_n(&hdr->value, &cur, 0,
622             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
623 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
624 3           return cur;
625             }
626             }
627             }
628              
629 33           static inline uint32_t sync_sem_value(SyncHandle *h) {
630 33           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
631             }
632              
633             /* ================================================================
634             * Barrier operations
635             *
636             * param = number of parties
637             * value = arrived count (0..param)
638             * generation = increments each time barrier trips
639             * ================================================================ */
640              
641 8           static inline int sync_barrier_wait(SyncHandle *h, double timeout) {
642 8           SyncHeader *hdr = h->hdr;
643 8           uint32_t parties = hdr->param;
644              
645 8 100         if (timeout == 0) return -1; /* non-blocking probe: can't rendezvous instantly */
646              
647 7           uint32_t gen = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
648 7           uint32_t arrived = __atomic_add_fetch(&hdr->value, 1, __ATOMIC_ACQ_REL);
649              
650 7 50         if (arrived == parties) {
651             /* Last to arrive — trip the barrier */
652 0           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
653 0           __atomic_add_fetch(&hdr->generation, 1, __ATOMIC_RELEASE);
654 0           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
655             /* Wake all waiters */
656 0 0         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
657 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
658 0           return 1; /* leader */
659             }
660              
661             /* Not last — wait for generation to change */
662 7           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
663              
664             struct timespec deadline, remaining;
665 7           int has_deadline = (timeout > 0);
666 7 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
667              
668 7           for (;;) {
669 14           uint32_t cur_gen = __atomic_load_n(&hdr->generation, __ATOMIC_ACQUIRE);
670 14 100         if (cur_gen != gen) return 0; /* barrier tripped */
671              
672 9           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
673              
674 9           struct timespec *pts = NULL;
675 9 50         if (has_deadline) {
676 9 100         if (!sync_remaining_time(&deadline, &remaining)) {
677 2           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
678 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
679             /* Break the barrier — reset arrived before bumping
680             * generation so new arrivals see value=0. CAS on
681             * generation ensures only one process does the reset. */
682 2           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
683 2           uint32_t cur_g = gen;
684 2           if (__atomic_compare_exchange_n(&hdr->generation, &cur_g,
685 2 50         gen + 1, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
686 2           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
687             }
688 2           return -1; /* timeout */
689             }
690 7           pts = &remaining;
691             }
692              
693 7           syscall(SYS_futex, &hdr->generation, FUTEX_WAIT, gen, pts, NULL, 0);
694 7           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
695             }
696             }
697              
698 6           static inline uint32_t sync_barrier_generation(SyncHandle *h) {
699 6           return __atomic_load_n(&h->hdr->generation, __ATOMIC_RELAXED);
700             }
701              
702 4           static inline uint32_t sync_barrier_arrived(SyncHandle *h) {
703 4           return __atomic_load_n(&h->hdr->value, __ATOMIC_RELAXED);
704             }
705              
706 1           static inline void sync_barrier_reset(SyncHandle *h) {
707 1           SyncHeader *hdr = h->hdr;
708 1           __atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
709 1           __atomic_add_fetch(&hdr->generation, 1, __ATOMIC_RELEASE);
710 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
711 0           syscall(SYS_futex, &hdr->generation, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
712 1           }
713              
714             /* ================================================================
715             * Condvar operations
716             *
717             * Uses the internal mutex (hdr->mutex) to protect the predicate.
718             * value = signal counter (futex word)
719             * generation = broadcast epoch
720             * ================================================================ */
721              
722 12           static inline void sync_condvar_lock(SyncHandle *h) {
723 12           sync_mutex_lock(h->hdr);
724 12           __atomic_add_fetch(&h->hdr->stat_acquires, 1, __ATOMIC_RELAXED);
725 12           }
726              
727 13           static inline void sync_condvar_unlock(SyncHandle *h) {
728 13           sync_mutex_unlock(h->hdr);
729 13           __atomic_add_fetch(&h->hdr->stat_releases, 1, __ATOMIC_RELAXED);
730 13           }
731              
732 1           static inline int sync_condvar_try_lock(SyncHandle *h) {
733 1           SyncHeader *hdr = h->hdr;
734 1           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
735 1           uint32_t expected = 0;
736 1 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
737             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
738 1           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
739 1           return 1;
740             }
741 0           return 0;
742             }
743              
744             /* Wait: atomically unlock mutex, wait on futex, re-lock mutex.
745             * Returns 1 on signal/broadcast, 0 on timeout. */
746 4           static inline int sync_condvar_wait(SyncHandle *h, double timeout) {
747 4           SyncHeader *hdr = h->hdr;
748              
749 4 100         if (timeout == 0) return 0; /* non-blocking: no wait */
750              
751 3           uint32_t seq = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
752              
753 3           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
754              
755 3           sync_mutex_unlock(hdr);
756              
757             struct timespec deadline, remaining;
758 3           int has_deadline = (timeout > 0);
759 3 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
760              
761 3           int signaled = 0;
762 0           for (;;) {
763 3           uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
764 3 50         if (cur != seq) { signaled = 1; break; }
765              
766 3           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
767              
768 3           struct timespec *pts = NULL;
769 3 50         if (has_deadline) {
770 3 50         if (!sync_remaining_time(&deadline, &remaining)) {
771 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
772 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
773 0           break;
774             }
775 3           pts = &remaining;
776             }
777              
778 3           long rc = syscall(SYS_futex, &hdr->value, FUTEX_WAIT, seq, pts, NULL, 0);
779 3           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
780              
781 3           cur = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
782 3 100         if (cur != seq) { signaled = 1; break; }
783              
784 2 50         if (rc == -1 && errno == ETIMEDOUT && has_deadline) {
    50          
    50          
785 2           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
786 2           break;
787             }
788             }
789              
790 3           sync_mutex_lock(hdr);
791 3           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
792 3           return signaled;
793             }
794              
795 1           static inline void sync_condvar_signal(SyncHandle *h) {
796 1           SyncHeader *hdr = h->hdr;
797 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
798 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
799 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
800 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, 1, NULL, NULL, 0);
801 1           }
802              
803 1           static inline void sync_condvar_broadcast(SyncHandle *h) {
804 1           SyncHeader *hdr = h->hdr;
805 1           __atomic_add_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
806 1           __atomic_add_fetch(&hdr->stat_signals, 1, __ATOMIC_RELAXED);
807 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
808 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
809 1           }
810              
811             /* ================================================================
812             * Once operations
813             *
814             * value states: 0=INIT, (SYNC_MUTEX_WRITER_BIT|pid)=RUNNING, 1=DONE
815             * ================================================================ */
816              
817             #define SYNC_ONCE_INIT 0
818             #define SYNC_ONCE_DONE 1
819             /* RUNNING = SYNC_MUTEX_WRITER_BIT | pid */
820              
821 8           static inline int sync_once_is_done(SyncHandle *h) {
822 8           return __atomic_load_n(&h->hdr->value, __ATOMIC_ACQUIRE) == SYNC_ONCE_DONE;
823             }
824              
825             /* Try to become the initializer. Returns:
826             * 1 = you are the initializer, call once_done() when finished
827             * 0 = already done
828             * -1 = another process is initializing (wait with once_wait) */
829 8           static inline int sync_once_try(SyncHandle *h) {
830 8           SyncHeader *hdr = h->hdr;
831 8           uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
832              
833 8           uint32_t expected = SYNC_ONCE_INIT;
834 8 100         if (__atomic_compare_exchange_n(&hdr->value, &expected, mypid,
835             0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
836 4           __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
837 4           return 1;
838             }
839 4 100         if (expected == SYNC_ONCE_DONE) return 0;
840 3           return -1;
841             }
842              
843             /* Call/wait combo: try to become initializer, or wait for completion.
844             * Returns 1 if caller is the initializer, 0 if already done or waited. */
845 6           static inline int sync_once_enter(SyncHandle *h, double timeout) {
846 6           SyncHeader *hdr = h->hdr;
847              
848             /* Non-blocking probe: just try, don't wait */
849 6           int r = sync_once_try(h);
850 6 100         if (r == 1) return 1;
851 3 100         if (r == 0) return 0;
852 2 100         if (timeout == 0) return 0;
853              
854             struct timespec deadline, remaining;
855 1           int has_deadline = (timeout > 0);
856 1 50         if (has_deadline) sync_make_deadline(timeout, &deadline);
857              
858 1           __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
859              
860 1           for (;;) {
861 2           r = sync_once_try(h);
862 2 100         if (r == 1) return 1; /* caller is initializer */
863 1 50         if (r == 0) return 0; /* already done */
864              
865             /* r == -1: someone else is running. Wait or detect stale. */
866 1           uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
867 1 50         if (val == SYNC_ONCE_DONE) return 0;
868 2 50         if (val == SYNC_ONCE_INIT) continue; /* race: was reset, retry */
869              
870             /* Check stale initializer */
871 1 50         if (val >= SYNC_MUTEX_WRITER_BIT) {
872 1           uint32_t pid = val & SYNC_MUTEX_PID_MASK;
873 1 50         if (!sync_pid_alive(pid)) {
874 1 50         if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
875             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
876 1           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
877 1 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
878 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
879             }
880 1           continue;
881             }
882             }
883              
884 0           __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
885              
886 0           struct timespec *pts = NULL;
887 0 0         if (has_deadline) {
888 0 0         if (!sync_remaining_time(&deadline, &remaining)) {
889 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
890 0           __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
891 0           return 0;
892             }
893 0           pts = &remaining;
894             }
895              
896 0           syscall(SYS_futex, &hdr->value, FUTEX_WAIT, val, pts, NULL, 0);
897 0           __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
898             }
899             }
900              
901 4           static inline void sync_once_done(SyncHandle *h) {
902 4           SyncHeader *hdr = h->hdr;
903 4           __atomic_store_n(&hdr->value, SYNC_ONCE_DONE, __ATOMIC_RELEASE);
904 4           __atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
905 4 100         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
906 1           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
907 4           }
908              
909 2           static inline void sync_once_reset(SyncHandle *h) {
910 2           SyncHeader *hdr = h->hdr;
911 2           __atomic_store_n(&hdr->value, SYNC_ONCE_INIT, __ATOMIC_RELEASE);
912 2 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
913 0           syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
914 2           }
915              
916             /* ================================================================
917             * Create / Open / Close
918             * ================================================================ */
919              
920             #define SYNC_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, SYNC_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
921              
922 73           static SyncHandle *sync_create(const char *path, uint32_t type, uint32_t param,
923             uint32_t initial, char *errbuf) {
924 73 50         if (errbuf) errbuf[0] = '\0';
925              
926 73 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
927 73 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    100          
    50          
928 72 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    100          
    50          
929 70 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    100          
    50          
930              
931 68           uint64_t total_size = sizeof(SyncHeader);
932 68           int anonymous = (path == NULL);
933             size_t map_size;
934             void *base;
935              
936 68 100         if (anonymous) {
937 52           map_size = (size_t)total_size;
938 52           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
939             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
940 52 50         if (base == MAP_FAILED) {
941 0 0         SYNC_ERR("mmap(anonymous): %s", strerror(errno));
942 0           return NULL;
943             }
944 52           SyncHeader *hdr = (SyncHeader *)base;
945 52           memset(hdr, 0, sizeof(SyncHeader));
946 52           hdr->magic = SYNC_MAGIC;
947 52           hdr->version = SYNC_VERSION;
948 52           hdr->type = type;
949 52           hdr->param = param;
950 52           hdr->total_size = total_size;
951 52 100         if (type == SYNC_TYPE_SEMAPHORE)
952 17           hdr->value = initial;
953 52           __atomic_thread_fence(__ATOMIC_SEQ_CST);
954 52           goto setup_handle;
955             } else {
956 16           int fd = open(path, O_RDWR | O_CREAT, 0666);
957 19 50         if (fd < 0) { SYNC_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
958              
959 16 50         if (flock(fd, LOCK_EX) < 0) {
960 0 0         SYNC_ERR("flock(%s): %s", path, strerror(errno));
961 0           close(fd); return NULL;
962             }
963              
964             struct stat st;
965 16 50         if (fstat(fd, &st) < 0) {
966 0 0         SYNC_ERR("fstat(%s): %s", path, strerror(errno));
967 0           flock(fd, LOCK_UN); close(fd); return NULL;
968             }
969              
970 16           int is_new = (st.st_size == 0);
971              
972 16 100         if (!is_new && (uint64_t)st.st_size < sizeof(SyncHeader)) {
    50          
973 0 0         SYNC_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
974 0           flock(fd, LOCK_UN); close(fd); return NULL;
975             }
976              
977 16 100         if (is_new) {
978 7 50         if (ftruncate(fd, (off_t)total_size) < 0) {
979 0 0         SYNC_ERR("ftruncate(%s): %s", path, strerror(errno));
980 0           flock(fd, LOCK_UN); close(fd); return NULL;
981             }
982             }
983              
984 16 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
985 16           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
986 16 50         if (base == MAP_FAILED) {
987 0 0         SYNC_ERR("mmap(%s): %s", path, strerror(errno));
988 0           flock(fd, LOCK_UN); close(fd); return NULL;
989             }
990              
991 16           SyncHeader *hdr = (SyncHeader *)base;
992              
993 16 100         if (!is_new) {
994 27           int valid = (hdr->magic == SYNC_MAGIC &&
995 9 50         hdr->version == SYNC_VERSION &&
996 24 50         hdr->type == type &&
    100          
997 6 50         hdr->total_size == (uint64_t)st.st_size);
998 9 100         if (!valid) {
999 3 50         SYNC_ERR("%s: invalid or incompatible sync file", path);
1000 3           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
1001             }
1002 6           flock(fd, LOCK_UN);
1003 6           close(fd);
1004 6           goto setup_handle;
1005             }
1006              
1007             /* Initialize while holding the flock */
1008 7           memset(base, 0, sizeof(SyncHeader));
1009 7           hdr->magic = SYNC_MAGIC;
1010 7           hdr->version = SYNC_VERSION;
1011 7           hdr->type = type;
1012 7           hdr->param = param;
1013 7           hdr->total_size = total_size;
1014 7 100         if (type == SYNC_TYPE_SEMAPHORE)
1015 3           hdr->value = initial;
1016 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1017              
1018 7           flock(fd, LOCK_UN);
1019 7           close(fd);
1020             } /* end file-backed */
1021              
1022 65           setup_handle:;
1023             {
1024 65           SyncHeader *hdr = (SyncHeader *)base;
1025 65           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1026 65 50         if (!h) { munmap(base, map_size); return NULL; }
1027              
1028 65           h->hdr = hdr;
1029 65           h->mmap_size = map_size;
1030 65 100         h->path = path ? strdup(path) : NULL;
1031 65           h->notify_fd = -1;
1032 65           h->backing_fd = -1;
1033              
1034 65           return h;
1035             }
1036             }
1037              
1038 6           static SyncHandle *sync_create_memfd(const char *name, uint32_t type,
1039             uint32_t param, uint32_t initial,
1040             char *errbuf) {
1041 6 50         if (errbuf) errbuf[0] = '\0';
1042              
1043 6 50         if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
    0          
1044 6 100         if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
    50          
    0          
1045 6 100         if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
    50          
    0          
1046 6 100         if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }
    50          
    0          
1047              
1048 6           uint64_t total_size = sizeof(SyncHeader);
1049              
1050 6 50         int fd = memfd_create(name ? name : "sync", MFD_CLOEXEC);
1051 6 50         if (fd < 0) { SYNC_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
1052              
1053 6 50         if (ftruncate(fd, (off_t)total_size) < 0) {
1054 0 0         SYNC_ERR("ftruncate(memfd): %s", strerror(errno));
1055 0           close(fd); return NULL;
1056             }
1057              
1058 6           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1059 6 50         if (base == MAP_FAILED) {
1060 0 0         SYNC_ERR("mmap(memfd): %s", strerror(errno));
1061 0           close(fd); return NULL;
1062             }
1063              
1064 6           SyncHeader *hdr = (SyncHeader *)base;
1065 6           memset(hdr, 0, sizeof(SyncHeader));
1066 6           hdr->magic = SYNC_MAGIC;
1067 6           hdr->version = SYNC_VERSION;
1068 6           hdr->type = type;
1069 6           hdr->param = param;
1070 6           hdr->total_size = total_size;
1071              
1072 6 100         if (type == SYNC_TYPE_SEMAPHORE)
1073 2           hdr->value = initial;
1074              
1075 6           __atomic_thread_fence(__ATOMIC_SEQ_CST);
1076              
1077 6           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1078 6 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
1079              
1080 6           h->hdr = hdr;
1081 6           h->mmap_size = (size_t)total_size;
1082 6           h->path = NULL;
1083 6           h->notify_fd = -1;
1084 6           h->backing_fd = fd;
1085              
1086 6           return h;
1087             }
1088              
1089 5           static SyncHandle *sync_open_fd(int fd, uint32_t type, char *errbuf) {
1090 5 50         if (errbuf) errbuf[0] = '\0';
1091              
1092             struct stat st;
1093 5 100         if (fstat(fd, &st) < 0) {
1094 1 50         SYNC_ERR("fstat(fd=%d): %s", fd, strerror(errno));
1095 1           return NULL;
1096             }
1097              
1098 4 50         if ((uint64_t)st.st_size < sizeof(SyncHeader)) {
1099 0 0         SYNC_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
1100 0           return NULL;
1101             }
1102              
1103 4           size_t map_size = (size_t)st.st_size;
1104 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1105 4 50         if (base == MAP_FAILED) {
1106 0 0         SYNC_ERR("mmap(fd=%d): %s", fd, strerror(errno));
1107 0           return NULL;
1108             }
1109              
1110 4           SyncHeader *hdr = (SyncHeader *)base;
1111 12           int valid = (hdr->magic == SYNC_MAGIC &&
1112 4 50         hdr->version == SYNC_VERSION &&
1113 11 50         hdr->type == type &&
    100          
1114 3 50         hdr->total_size == (uint64_t)st.st_size);
1115 4 100         if (!valid) {
1116 1 50         SYNC_ERR("fd %d: invalid or incompatible sync", fd);
1117 1           munmap(base, map_size);
1118 1           return NULL;
1119             }
1120              
1121 3           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
1122 3 50         if (myfd < 0) {
1123 0 0         SYNC_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
1124 0           munmap(base, map_size);
1125 0           return NULL;
1126             }
1127              
1128 3           SyncHandle *h = (SyncHandle *)calloc(1, sizeof(SyncHandle));
1129 3 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
1130              
1131 3           h->hdr = hdr;
1132 3           h->mmap_size = map_size;
1133 3           h->path = NULL;
1134 3           h->notify_fd = -1;
1135 3           h->backing_fd = myfd;
1136              
1137 3           return h;
1138             }
1139              
1140 74           static void sync_destroy(SyncHandle *h) {
1141 74 50         if (!h) return;
1142 74 100         if (h->notify_fd >= 0) close(h->notify_fd);
1143 74 100         if (h->backing_fd >= 0) close(h->backing_fd);
1144 74 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
1145 74           free(h->path);
1146 74           free(h);
1147             }
1148              
1149             /* ================================================================
1150             * Eventfd integration
1151             * ================================================================ */
1152              
1153 11           static int sync_create_eventfd(SyncHandle *h) {
1154 11 50         if (h->notify_fd >= 0) close(h->notify_fd);
1155 11           int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1156 11 50         if (efd < 0) return -1;
1157 11           h->notify_fd = efd;
1158 11           return efd;
1159             }
1160              
1161 10           static int sync_notify(SyncHandle *h) {
1162 10 50         if (h->notify_fd < 0) return 0;
1163 10           uint64_t val = 1;
1164 10           return write(h->notify_fd, &val, sizeof(val)) == sizeof(val);
1165             }
1166              
1167 11           static int64_t sync_eventfd_consume(SyncHandle *h) {
1168 11 50         if (h->notify_fd < 0) return -1;
1169 11           uint64_t val = 0;
1170 11 100         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
1171 6           return (int64_t)val;
1172             }
1173              
1174             /* ================================================================
1175             * Misc
1176             * ================================================================ */
1177              
1178 0           static void sync_msync(SyncHandle *h) {
1179 0           msync(h->hdr, h->mmap_size, MS_SYNC);
1180 0           }
1181              
1182             #endif /* SYNC_H */