File Coverage

pubsub.h
Criterion Covered Total %
statement 370 452 81.8
branch 239 436 54.8
condition n/a
subroutine n/a
pod n/a
total 609 888 68.5


line stmt bran cond sub pod time code
1             /*
2             * pubsub.h -- Shared-memory broadcast pub/sub for Linux
3             *
4             * Two variants:
5             * Int -- lock-free MPMC publish, lock-free subscribe (int64 values)
6             * Str -- mutex-protected publish, lock-free subscribe (variable-length
7             * byte strings up to msg_size, stored in a circular arena)
8             *
9             * Ring buffer broadcast: publishers write, each subscriber independently
10             * reads with its own cursor. Messages are never consumed -- the ring
11             * overwrites old data when it wraps. Subscribers auto-recover from
12             * overflow by resetting to the oldest available position.
13             *
14             * File-backed mmap(MAP_SHARED) for cross-process sharing,
15             * futex for blocking poll, PID-based stale lock recovery (Str mode).
16             */
17              
18             #ifndef PUBSUB_H
19             #define PUBSUB_H
20              
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37              
38             /* ================================================================
39             * Constants
40             * ================================================================ */
41              
42             #define PUBSUB_MAGIC 0x50534231U /* "PSB1" */
43             #define PUBSUB_VERSION 1
44             #define PUBSUB_MODE_INT 0
45             #define PUBSUB_MODE_STR 1
46             #define PUBSUB_MODE_INT32 2
47             #define PUBSUB_MODE_INT16 3
48             #define PUBSUB_ERR_BUFLEN 256
49             #define PUBSUB_SPIN_LIMIT 32
50             #define PUBSUB_LOCK_TIMEOUT_SEC 2
51             #define PUBSUB_DEFAULT_MSG_SIZE 256
52             #define PUBSUB_STR_UTF8_FLAG 0x80000000U
53             #define PUBSUB_STR_LEN_MASK 0x7FFFFFFFU
54             #define PUBSUB_POLL_RETRIES 8
55              
56             /* ================================================================
57             * Header (256 bytes = 4 cache lines)
58             * ================================================================ */
59              
60             typedef struct {
61             /* ---- Cache line 0 (0-63): immutable after create ---- */
62             uint32_t magic; /* 0 */
63             uint32_t version; /* 4 */
64             uint32_t mode; /* 8 */
65             uint32_t capacity; /* 12 */
66             uint64_t total_size; /* 16 */
67             uint64_t slots_off; /* 24 */
68             uint64_t data_off; /* 32: str: offset to arena; int: 0 */
69             uint32_t msg_size; /* 40: str: max bytes per message; int: 0 */
70             uint32_t _reserved0; /* 44 */
71             uint64_t arena_cap; /* 48: str: arena byte capacity; int: 0 */
72             uint8_t _pad0[8]; /* 56-63 */
73              
74             /* ---- Cache line 1 (64-127): writer hot ---- */
75             uint64_t write_pos; /* 64 */
76             uint32_t mutex; /* 72: str: futex mutex */
77             uint32_t mutex_waiters; /* 76 */
78             uint32_t arena_wpos; /* 80: str: next write position in arena */
79             uint8_t _pad1[44]; /* 84-127 */
80              
81             /* ---- Cache line 2 (128-191): subscriber notification ---- */
82             uint32_t sub_futex; /* 128 */
83             uint32_t sub_waiters; /* 132 */
84             uint8_t _pad2[56]; /* 136-191 */
85              
86             /* ---- Cache line 3 (192-255): stats ---- */
87             uint64_t stat_publish_ok; /* 192 */
88             uint64_t stat_recoveries; /* 200 */
89             uint8_t _pad3[48]; /* 208-255 */
90             } PubSubHeader;
91              
92             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
93             _Static_assert(sizeof(PubSubHeader) == 256, "PubSubHeader must be 256 bytes");
94             #endif
95              
96             /* ================================================================
97             * Slot types
98             * ================================================================ */
99              
100             typedef struct {
101             uint64_t sequence;
102             int64_t value;
103             } PubSubIntSlot; /* 16 bytes */
104              
105             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
106             typedef struct {
107             uint32_t sequence;
108             int32_t value;
109             } PubSubInt32Slot; /* 8 bytes */
110              
111             typedef struct {
112             uint32_t sequence;
113             int16_t value;
114             int16_t _pad;
115             } PubSubInt16Slot; /* 8 bytes */
116              
117             typedef struct {
118             uint64_t sequence;
119             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
120             uint32_t arena_off; /* offset into data arena */
121             } PubSubStrSlot; /* 16 bytes */
122              
123             /* ================================================================
124             * Process-local handles
125             * ================================================================ */
126              
127             typedef struct {
128             PubSubHeader *hdr;
129             void *slots;
130             char *data; /* NULL for int mode */
131             size_t mmap_size;
132             uint32_t capacity;
133             uint32_t cap_mask;
134             uint32_t msg_size;
135             uint64_t arena_cap;
136             char *path;
137             int notify_fd;
138             int backing_fd;
139             } PubSubHandle;
140              
141             typedef struct {
142             PubSubHeader *hdr;
143             void *slots;
144             char *data;
145             uint64_t cursor;
146             uint32_t capacity;
147             uint32_t cap_mask;
148             uint32_t msg_size;
149             char *copy_buf;
150             uint32_t copy_buf_cap;
151             uint64_t overflow_count;
152             int notify_fd;
153             void *userdata;
154             } PubSubSub;
155              
156             /* ================================================================
157             * Utility
158             * ================================================================ */
159              
160 142           static inline uint32_t pubsub_next_pow2(uint32_t v) {
161 142 50         if (v < 2) return 2;
162 142 50         if (v > 0x80000000U) return 0;
163 142           v--;
164 142           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
165 142           return v + 1;
166             }
167              
168 0           static inline void pubsub_spin_pause(void) {
169             #if defined(__x86_64__) || defined(__i386__)
170 0           __asm__ volatile("pause" ::: "memory");
171             #elif defined(__aarch64__)
172             __asm__ volatile("yield" ::: "memory");
173             #else
174             __asm__ volatile("" ::: "memory");
175             #endif
176 0           }
177              
178 114           static inline int pubsub_ensure_copy_buf(PubSubSub *sub, uint32_t needed) {
179 114 100         if (needed <= sub->copy_buf_cap) return 1;
180 34 100         uint32_t ns = sub->copy_buf_cap ? sub->copy_buf_cap : 64;
181 45 100         while (ns < needed) {
182 11           uint32_t n2 = ns * 2;
183 11 50         if (n2 <= ns) { ns = needed; break; }
184 11           ns = n2;
185             }
186 34           char *nb = (char *)realloc(sub->copy_buf, ns);
187 34 50         if (!nb) return 0;
188 34           sub->copy_buf = nb;
189 34           sub->copy_buf_cap = ns;
190 34           return 1;
191             }
192              
193             /* ================================================================
194             * Futex helpers
195             * ================================================================ */
196              
197             #define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
198             #define PUBSUB_MUTEX_PID_MASK 0x7FFFFFFFU
199             #define PUBSUB_MUTEX_VAL(pid) (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))
200              
201 0           static inline int pubsub_pid_alive(uint32_t pid) {
202 0 0         if (pid == 0) return 1;
203 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
204             }
205              
206             static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };
207              
208 0           static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
209 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
210             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
211 0           return;
212 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
213 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
214 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
215             }
216              
217 190           static inline void pubsub_mutex_lock(PubSubHeader *hdr) {
218 190           uint32_t mypid = PUBSUB_MUTEX_VAL((uint32_t)getpid());
219 190           for (int spin = 0; ; spin++) {
220 190           uint32_t expected = 0;
221 190 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
222             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
223 190           return;
224 0 0         if (__builtin_expect(spin < PUBSUB_SPIN_LIMIT, 1)) {
225 0           pubsub_spin_pause();
226 0           continue;
227             }
228 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
229 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
230 0 0         if (cur != 0) {
231 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
232             &pubsub_lock_timeout, NULL, 0);
233 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
234 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
235 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
236 0 0         if (val >= PUBSUB_MUTEX_WRITER_BIT) {
237 0           uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
238 0 0         if (!pubsub_pid_alive(pid))
239 0           pubsub_recover_stale_mutex(hdr, val);
240             }
241 0           spin = 0;
242 0           continue;
243             }
244             }
245 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
246 0           spin = 0;
247             }
248             }
249              
250 190           static inline void pubsub_mutex_unlock(PubSubHeader *hdr) {
251 190           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
252 190 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
253 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
254 190           }
255              
256 769           static inline void pubsub_wake_subscribers(PubSubHeader *hdr) {
257             /* SEQ_CST fence pairs with consumer's SEQ_CST waiters increment in
258             * poll_wait. Without it, the RELAXED load below may be reordered
259             * before the prior RELEASE store of the slot sequence on weak-memory
260             * architectures, letting us observe waiters == 0 even when a consumer
261             * has already incremented it after publishing the data. */
262 769           __atomic_thread_fence(__ATOMIC_SEQ_CST);
263 769 100         if (__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED) > 0) {
264 97           __atomic_add_fetch(&hdr->sub_futex, 1, __ATOMIC_RELEASE);
265 97           syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
266             }
267 769           }
268              
269 11           static inline int pubsub_remaining_time(const struct timespec *deadline,
270             struct timespec *remaining) {
271             struct timespec now;
272 11           clock_gettime(CLOCK_MONOTONIC, &now);
273 11           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
274 11           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
275 11 100         if (remaining->tv_nsec < 0) {
276 7           remaining->tv_sec--;
277 7           remaining->tv_nsec += 1000000000L;
278             }
279 11           return remaining->tv_sec >= 0;
280             }
281              
282 11           static inline void pubsub_make_deadline(double timeout, struct timespec *deadline) {
283 11           clock_gettime(CLOCK_MONOTONIC, deadline);
284 11           deadline->tv_sec += (time_t)timeout;
285 11           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
286 11 100         if (deadline->tv_nsec >= 1000000000L) {
287 1           deadline->tv_sec++;
288 1           deadline->tv_nsec -= 1000000000L;
289             }
290 11           }
291              
292             /* ================================================================
293             * Header validation
294             * ================================================================ */
295              
296 20           static inline int pubsub_validate_header(PubSubHeader *hdr, uint32_t mode,
297             uint64_t file_size) {
298 20 50         if (hdr->magic != PUBSUB_MAGIC ||
299 20 50         hdr->version != PUBSUB_VERSION ||
300 20 100         hdr->mode != mode ||
301 16 50         hdr->capacity == 0 ||
302 16 50         (hdr->capacity & (hdr->capacity - 1)) != 0 ||
303 16 50         hdr->total_size != file_size ||
304 16 50         hdr->slots_off != sizeof(PubSubHeader))
305 4           return 0;
306             /* Slot array must fit within file. */
307 16           uint64_t slot_size = (mode == PUBSUB_MODE_INT) ? sizeof(PubSubIntSlot)
308 22 100         : (mode == PUBSUB_MODE_INT32) ? sizeof(PubSubInt32Slot)
309 11 100         : (mode == PUBSUB_MODE_INT16) ? sizeof(PubSubInt16Slot)
310 5 100         : sizeof(PubSubStrSlot);
311 16 50         if (hdr->capacity > (hdr->total_size - hdr->slots_off) / slot_size)
312 0           return 0;
313 16 100         if (mode == PUBSUB_MODE_STR) {
314 4 50         if (hdr->data_off == 0 || hdr->msg_size == 0 || hdr->arena_cap == 0)
    50          
    50          
315 0           return 0;
316 4           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
317 4 50         if (hdr->data_off < slots_end ||
318 4 50         hdr->data_off + hdr->arena_cap > hdr->total_size)
319 0           return 0;
320             }
321 16           return 1;
322             }
323              
324             /* ================================================================
325             * Create / Open / Close
326             * ================================================================ */
327              
328             #define PUBSUB_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, PUBSUB_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
329              
330 142           static PubSubHandle *pubsub_init_handle(void *base, size_t map_size,
331             uint32_t mode, const char *path) {
332 142           PubSubHeader *hdr = (PubSubHeader *)base;
333 142           PubSubHandle *h = (PubSubHandle *)calloc(1, sizeof(PubSubHandle));
334 142 50         if (!h) return NULL;
335              
336 142           h->hdr = hdr;
337 142           h->slots = (char *)base + hdr->slots_off;
338 142 100         h->data = (mode == PUBSUB_MODE_STR) ? (char *)base + hdr->data_off : NULL;
339 142           h->mmap_size = map_size;
340 142           h->capacity = hdr->capacity;
341 142           h->cap_mask = hdr->capacity - 1;
342 142           h->msg_size = hdr->msg_size;
343 142           h->arena_cap = hdr->arena_cap;
344 142 100         h->path = path ? strdup(path) : NULL;
345 142           h->notify_fd = -1;
346 142           h->backing_fd = -1;
347              
348 142           return h;
349             }
350              
351 126           static void pubsub_init_header(void *base, uint32_t mode, uint32_t cap,
352             uint64_t total_size, uint64_t slots_off,
353             uint64_t data_off, uint32_t msg_size,
354             uint64_t arena_cap) {
355 126           PubSubHeader *hdr = (PubSubHeader *)base;
356 126           memset(hdr, 0, sizeof(PubSubHeader));
357 126           hdr->magic = PUBSUB_MAGIC;
358 126           hdr->version = PUBSUB_VERSION;
359 126           hdr->mode = mode;
360 126           hdr->capacity = cap;
361 126           hdr->total_size = total_size;
362 126           hdr->slots_off = slots_off;
363 126           hdr->data_off = data_off;
364 126           hdr->msg_size = msg_size;
365 126           hdr->arena_cap = arena_cap;
366 126           }
367              
368             /* Returns 1 on success, 0 if the requested capacity * msg_size would
369             * exceed the 32-bit arena addressing limit (out_arena_cap is set to 0). */
370 142           static int pubsub_calc_layout(uint32_t cap, uint32_t mode, uint32_t msg_size,
371             uint64_t *out_slots_off, uint64_t *out_data_off,
372             uint64_t *out_arena_cap, uint64_t *out_total_size) {
373 142           uint64_t slots_off = sizeof(PubSubHeader);
374 142           uint64_t slot_size = (mode == PUBSUB_MODE_INT) ? sizeof(PubSubIntSlot)
375 217 100         : (mode == PUBSUB_MODE_INT32) ? sizeof(PubSubInt32Slot)
376 137 100         : (mode == PUBSUB_MODE_INT16) ? sizeof(PubSubInt16Slot)
377 62 100         : sizeof(PubSubStrSlot);
378 142           uint64_t data_off = 0, arena_cap = 0, total_size;
379              
380 142 100         if (mode == PUBSUB_MODE_STR) {
381 50           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
382 50           data_off = (slots_end + 63) & ~(uint64_t)63;
383 50           arena_cap = (uint64_t)cap * ((uint64_t)msg_size + 8);
384             /* Safety invariant: in any window of `capacity` consecutive publishes
385             * (one full slot-ring wrap), the arena must not wrap. Otherwise a
386             * publish to slot K could overwrite slot N's still-current arena
387             * region without bumping slot N's sequence -- the seqlock would
388             * not catch the corruption. arena_cap = capacity*(msg_size+8)
389             * guarantees this; silently capping to UINT32_MAX would break the
390             * invariant for extreme (capacity, msg_size) combinations. */
391 50 50         if (arena_cap > UINT32_MAX) {
392 0           *out_arena_cap = 0;
393 0           return 0;
394             }
395 50           total_size = data_off + arena_cap;
396             } else {
397 92           total_size = slots_off + (uint64_t)cap * slot_size;
398             }
399              
400 142           *out_slots_off = slots_off;
401 142           *out_data_off = data_off;
402 142           *out_arena_cap = arena_cap;
403 142           *out_total_size = total_size;
404 142           return 1;
405             }
406              
407 138           static PubSubHandle *pubsub_create(const char *path, uint32_t capacity,
408             uint32_t mode, uint32_t msg_size,
409             char *errbuf) {
410 138 50         if (errbuf) errbuf[0] = '\0';
411              
412 138           uint32_t cap = pubsub_next_pow2(capacity);
413 138 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
414 138 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
415              
416 138 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    100          
417 35           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
418              
419             uint64_t slots_off, data_off, arena_cap, total_size;
420 138 50         if (!pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size)) {
421 0 0         PUBSUB_ERR("capacity * (msg_size+8) exceeds 4GB arena limit");
422 0           return NULL;
423             }
424              
425 138           int anonymous = (path == NULL);
426             size_t map_size;
427             void *base;
428              
429 138 100         if (anonymous) {
430 87           map_size = (size_t)total_size;
431 87           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
432             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
433 87 50         if (base == MAP_FAILED) {
434 0 0         PUBSUB_ERR("mmap(anonymous): %s", strerror(errno));
435 0           return NULL;
436             }
437 87           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
438             msg_size, arena_cap);
439             } else {
440 51           int fd = open(path, O_RDWR | O_CREAT, 0666);
441 67 50         if (fd < 0) { PUBSUB_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
442              
443 51 50         if (flock(fd, LOCK_EX) < 0) {
444 0 0         PUBSUB_ERR("flock(%s): %s", path, strerror(errno));
445 0           close(fd); return NULL;
446             }
447              
448             struct stat st;
449 51 50         if (fstat(fd, &st) < 0) {
450 0 0         PUBSUB_ERR("fstat(%s): %s", path, strerror(errno));
451 0           flock(fd, LOCK_UN); close(fd); return NULL;
452             }
453              
454 51           int is_new = (st.st_size == 0);
455              
456 51 100         if (!is_new && (uint64_t)st.st_size < sizeof(PubSubHeader)) {
    50          
457 0 0         PUBSUB_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
458 0           flock(fd, LOCK_UN); close(fd); return NULL;
459             }
460              
461 51 100         if (is_new) {
462 35 50         if (ftruncate(fd, (off_t)total_size) < 0) {
463 0 0         PUBSUB_ERR("ftruncate(%s): %s", path, strerror(errno));
464 0           flock(fd, LOCK_UN); close(fd); return NULL;
465             }
466             }
467              
468 51 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
469 51           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
470 51 50         if (base == MAP_FAILED) {
471 0 0         PUBSUB_ERR("mmap(%s): %s", path, strerror(errno));
472 0           flock(fd, LOCK_UN); close(fd); return NULL;
473             }
474              
475 51 100         if (!is_new) {
476 16 100         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
477 4 50         PUBSUB_ERR("%s: invalid or incompatible pubsub file", path);
478 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
479             }
480 12           flock(fd, LOCK_UN);
481 12           close(fd);
482 12           return pubsub_init_handle(base, map_size, mode, path);
483             }
484              
485 35           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
486             msg_size, arena_cap);
487 35           flock(fd, LOCK_UN);
488 35           close(fd);
489             }
490              
491 122           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, path);
492 122 50         if (!h) { munmap(base, (size_t)total_size); return NULL; }
493 122           return h;
494             }
495              
496 4           static PubSubHandle *pubsub_create_memfd(const char *name, uint32_t capacity,
497             uint32_t mode, uint32_t msg_size,
498             char *errbuf) {
499 4 50         if (errbuf) errbuf[0] = '\0';
500              
501 4           uint32_t cap = pubsub_next_pow2(capacity);
502 4 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
503 4 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
504              
505 4 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    50          
506 1           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
507              
508             uint64_t slots_off, data_off, arena_cap, total_size;
509 4 50         if (!pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size)) {
510 0 0         PUBSUB_ERR("capacity * (msg_size+8) exceeds 4GB arena limit");
511 0           return NULL;
512             }
513              
514 4 50         int fd = memfd_create(name ? name : "pubsub", MFD_CLOEXEC | MFD_ALLOW_SEALING);
515 4 50         if (fd < 0) { PUBSUB_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
516              
517 4 50         if (ftruncate(fd, (off_t)total_size) < 0) {
518 0 0         PUBSUB_ERR("ftruncate(memfd): %s", strerror(errno));
519 0           close(fd); return NULL;
520             }
521 4           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
522              
523 4           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
524 4 50         if (base == MAP_FAILED) {
525 0 0         PUBSUB_ERR("mmap(memfd): %s", strerror(errno));
526 0           close(fd); return NULL;
527             }
528              
529 4           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
530             msg_size, arena_cap);
531              
532 4           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, NULL);
533 4 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
534 4           h->backing_fd = fd;
535 4           return h;
536             }
537              
538 4           static PubSubHandle *pubsub_open_fd(int fd, uint32_t mode, char *errbuf) {
539 4 50         if (errbuf) errbuf[0] = '\0';
540              
541             struct stat st;
542 4 50         if (fstat(fd, &st) < 0) {
543 0 0         PUBSUB_ERR("fstat(fd=%d): %s", fd, strerror(errno));
544 0           return NULL;
545             }
546              
547 4 50         if ((uint64_t)st.st_size < sizeof(PubSubHeader)) {
548 0 0         PUBSUB_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
549 0           return NULL;
550             }
551              
552 4           size_t map_size = (size_t)st.st_size;
553 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
554 4 50         if (base == MAP_FAILED) {
555 0 0         PUBSUB_ERR("mmap(fd=%d): %s", fd, strerror(errno));
556 0           return NULL;
557             }
558              
559 4 50         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
560 0 0         PUBSUB_ERR("fd %d: invalid or incompatible pubsub", fd);
561 0           munmap(base, map_size);
562 0           return NULL;
563             }
564              
565 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
566 4 50         if (myfd < 0) {
567 0 0         PUBSUB_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
568 0           munmap(base, map_size);
569 0           return NULL;
570             }
571              
572 4           PubSubHandle *h = pubsub_init_handle(base, map_size, mode, NULL);
573 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
574 4           h->backing_fd = myfd;
575 4           return h;
576             }
577              
578 142           static void pubsub_destroy(PubSubHandle *h) {
579 142 50         if (!h) return;
580 142 100         if (h->notify_fd >= 0) close(h->notify_fd);
581 142 100         if (h->backing_fd >= 0) close(h->backing_fd);
582 142 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
583 142           free(h->path);
584 142           free(h);
585             }
586              
587             /* ================================================================
588             * Subscribe
589             * ================================================================ */
590              
591 104           static PubSubSub *pubsub_subscribe(PubSubHandle *h, int from_oldest) {
592 104           PubSubSub *sub = (PubSubSub *)calloc(1, sizeof(PubSubSub));
593 104 50         if (!sub) return NULL;
594              
595 104           sub->hdr = h->hdr;
596 104           sub->slots = h->slots;
597 104           sub->data = h->data;
598 104           sub->capacity = h->capacity;
599 104           sub->cap_mask = h->cap_mask;
600 104           sub->msg_size = h->msg_size;
601              
602 104           sub->notify_fd = h->notify_fd;
603              
604 104           uint64_t wp = __atomic_load_n(&h->hdr->write_pos, __ATOMIC_ACQUIRE);
605 104 100         if (from_oldest && wp > h->capacity)
    100          
606 1           sub->cursor = wp - h->capacity;
607 103 100         else if (from_oldest)
608 63           sub->cursor = 0;
609             else
610 40           sub->cursor = wp;
611              
612 104           return sub;
613             }
614              
615 104           static void pubsub_sub_destroy(PubSubSub *sub) {
616 104 50         if (!sub) return;
617 104           free(sub->copy_buf);
618 104           free(sub);
619             }
620              
621             /* ================================================================
622             * Common: lag (shared between Int and Str)
623             * ================================================================ */
624              
625 32           static inline uint64_t pubsub_lag(PubSubSub *sub) {
626 32           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
627 32 100         return (wp > sub->cursor) ? (wp - sub->cursor) : 0;
628             }
629              
630             /* ================================================================
631             * Int publish/poll macro template
632             *
633             * DEFINE_INT_PUBSUB(prefix, SlotType, ValType, SeqType, DiffType)
634             * generates: pubsub__publish, _publish_multi, _poll, _poll_wait
635             * ================================================================ */
636              
637             #define DEFINE_INT_PUBSUB(PFX, SLOT, VTYPE, STYPE, DTYPE) \
638             \
639             static inline int pubsub_##PFX##_publish(PubSubHandle *h, VTYPE value) { \
640             PubSubHeader *hdr = h->hdr; \
641             SLOT *slots = (SLOT *)h->slots; \
642             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, 1, __ATOMIC_RELAXED); \
643             uint32_t idx = pos & h->cap_mask; \
644             slots[idx].value = value; \
645             __atomic_store_n(&slots[idx].sequence, (STYPE)(pos + 1), __ATOMIC_RELEASE);\
646             __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED); \
647             pubsub_wake_subscribers(hdr); \
648             return 1; \
649             } \
650             \
651             static inline uint32_t pubsub_##PFX##_publish_multi(PubSubHandle *h, \
652             const VTYPE *values, uint32_t count) { \
653             PubSubHeader *hdr = h->hdr; \
654             SLOT *slots = (SLOT *)h->slots; \
655             uint32_t mask = h->cap_mask; \
656             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, count, __ATOMIC_RELAXED);\
657             for (uint32_t i = 0; i < count; i++) { \
658             uint32_t idx = (pos + i) & mask; \
659             slots[idx].value = values[i]; \
660             __atomic_store_n(&slots[idx].sequence, \
661             (STYPE)(pos + i + 1), __ATOMIC_RELEASE); \
662             } \
663             __atomic_add_fetch(&hdr->stat_publish_ok, count, __ATOMIC_RELAXED); \
664             pubsub_wake_subscribers(hdr); \
665             return count; \
666             } \
667             \
668             static inline int pubsub_##PFX##_poll(PubSubSub *sub, VTYPE *value) { \
669             PubSubHeader *hdr = sub->hdr; \
670             SLOT *slots = (SLOT *)sub->slots; \
671             for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) { \
672             uint64_t cursor = sub->cursor; \
673             uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE); \
674             if (cursor >= wp) return 0; \
675             if (wp - cursor > sub->capacity) { \
676             sub->overflow_count += wp - cursor - sub->capacity; \
677             sub->cursor = wp - sub->capacity; \
678             continue; \
679             } \
680             uint32_t idx = cursor & sub->cap_mask; \
681             SLOT *slot = &slots[idx]; \
682             STYPE seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
683             DTYPE diff = (DTYPE)seq1 - (DTYPE)(STYPE)(cursor + 1); \
684             if (diff != 0) { \
685             if (diff > 0) { \
686             uint64_t nc = wp > sub->capacity ? wp - sub->capacity : 0; \
687             if (nc > cursor) sub->overflow_count += nc - cursor; \
688             sub->cursor = nc; \
689             continue; \
690             } \
691             return 0; \
692             } \
693             VTYPE v = slot->value; \
694             STYPE seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
695             if (seq2 != seq1) continue; \
696             *value = v; \
697             sub->cursor = cursor + 1; \
698             return 1; \
699             } \
700             return 0; \
701             } \
702             \
703             static int pubsub_##PFX##_poll_wait(PubSubSub *sub, VTYPE *value, \
704             double timeout) { \
705             int r = pubsub_##PFX##_poll(sub, value); \
706             if (r != 0) return r; \
707             if (timeout == 0.0) return 0; \
708             PubSubHeader *hdr = sub->hdr; \
709             struct timespec deadline, remaining; \
710             int has_deadline = (timeout > 0); \
711             if (has_deadline) pubsub_make_deadline(timeout, &deadline); \
712             for (;;) { \
713             /* Increment waiters BEFORE loading fseq/polling. SEQ_CST pairs \
714             * with publisher's SEQ_CST fence in pubsub_wake_subscribers so a \
715             * publisher that races our poll() either sees waiters > 0 and \
716             * wakes us, or publishes data we observe in the post-increment \
717             * poll. Without this ordering, we could sleep forever on an \
718             * unchanged fseq while data sits in the ring. */ \
719             __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
720             uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE); \
721             r = pubsub_##PFX##_poll(sub, value); \
722             if (r != 0) { \
723             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
724             return r; \
725             } \
726             struct timespec *pts = NULL; \
727             if (has_deadline) { \
728             if (!pubsub_remaining_time(&deadline, &remaining)) { \
729             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
730             return 0; \
731             } \
732             pts = &remaining; \
733             } \
734             long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT, \
735             fseq, pts, NULL, 0); \
736             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
737             r = pubsub_##PFX##_poll(sub, value); \
738             if (r != 0) return r; \
739             if (rc == -1 && errno == ETIMEDOUT) return 0; \
740             } \
741             }
742              
743             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
744 1914 100         DEFINE_INT_PUBSUB(int, PubSubIntSlot, int64_t, uint64_t, int64_t)
  17 50          
  1216 50          
  212 50          
  469 50          
    50          
    100          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
745              
746             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
747 95 100         DEFINE_INT_PUBSUB(int32, PubSubInt32Slot, int32_t, uint32_t, int32_t)
  3 50          
  38 50          
  4 50          
  50 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
748              
749             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
750 94 100         DEFINE_INT_PUBSUB(int16, PubSubInt16Slot, int16_t, uint32_t, int32_t)
  3 50          
  38 50          
  4 50          
  49 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
751              
752             /* ================================================================
753             * Str: mutex-protected publish, lock-free subscribe
754             *
755             * Variable-length messages stored in a circular arena. Each slot
756             * records the arena offset; the seqlock (sequence double-check)
757             * guarantees readers see consistent data.
758             * ================================================================ */
759              
760             /* Publish one Str message while mutex is already held (no lock/wake). */
761 197           static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
762             uint32_t len, bool utf8) {
763 197 50         if (len > PUBSUB_STR_LEN_MASK) return -1;
764 197 100         if (len > h->msg_size) return -1;
765              
766 196           PubSubHeader *hdr = h->hdr;
767 196           PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;
768              
769 196           uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
770 196           uint32_t idx = pos & h->cap_mask;
771 196           PubSubStrSlot *slot = &slots[idx];
772              
773 196           __atomic_store_n(&slot->sequence, 0, __ATOMIC_RELAXED);
774 196           __atomic_thread_fence(__ATOMIC_RELEASE);
775              
776 196           uint32_t alloc = (len + 7) & ~7u;
777 196 100         if (alloc == 0) alloc = 8;
778 196 50         if (alloc > h->arena_cap) return -1;
779 196           uint32_t apos = __atomic_load_n(&hdr->arena_wpos, __ATOMIC_RELAXED);
780 196 100         if ((uint64_t)apos + alloc > h->arena_cap)
781 1           apos = 0;
782              
783 196           memcpy(h->data + apos, str, len);
784              
785 196           slot->arena_off = apos;
786 196 100         slot->packed_len = len | (utf8 ? PUBSUB_STR_UTF8_FLAG : 0);
787              
788 196           __atomic_store_n(&hdr->arena_wpos, apos + alloc, __ATOMIC_RELAXED);
789              
790 196           __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
791 196           __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELAXED);
792 196           __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);
793              
794 196           return 1;
795             }
796              
797 186           static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
798             uint32_t len, bool utf8) {
799 186 100         if (len > h->msg_size) return -1;
800 184           pubsub_mutex_lock(h->hdr);
801 184           int r = pubsub_str_publish_locked(h, str, len, utf8);
802 184           pubsub_mutex_unlock(h->hdr);
803 184 50         if (r == 1) pubsub_wake_subscribers(h->hdr);
804 184           return r;
805             }
806              
807             /* Returns: 1 = success, 0 = empty/not-ready */
808 132           static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
809             uint32_t *out_len, bool *out_utf8) {
810 132           PubSubHeader *hdr = sub->hdr;
811 132           PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;
812              
813 134 50         for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) {
814 134           uint64_t cursor = sub->cursor;
815 134           uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE);
816              
817 134 100         if (cursor >= wp) return 0;
818              
819 116 100         if (wp - cursor > sub->capacity) {
820 2           sub->overflow_count += wp - cursor - sub->capacity;
821 2           sub->cursor = wp - sub->capacity;
822 2           continue;
823             }
824              
825 114           uint32_t idx = cursor & sub->cap_mask;
826 114           PubSubStrSlot *slot = &slots[idx];
827              
828 114           uint64_t seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
829 114 50         if (seq1 != cursor + 1) {
830 0 0         if (seq1 > cursor + 1) {
831 0 0         uint64_t new_cursor = wp > sub->capacity ? wp - sub->capacity : 0;
832 0 0         if (new_cursor > cursor)
833 0           sub->overflow_count += new_cursor - cursor;
834 0           sub->cursor = new_cursor;
835 0           continue;
836             }
837 0           return 0;
838             }
839              
840 114           uint32_t plen = slot->packed_len;
841 114           uint32_t aoff = slot->arena_off;
842 114           uint32_t len = plen & PUBSUB_STR_LEN_MASK;
843 114           bool utf8 = (plen & PUBSUB_STR_UTF8_FLAG) != 0;
844              
845             /* Safety: if metadata looks corrupted, retry */
846 114 50         if (len > sub->msg_size) continue;
847 114 50         if ((uint64_t)aoff + len > sub->hdr->arena_cap) continue;
848              
849 114 50         if (!pubsub_ensure_copy_buf(sub, len + 1)) return 0;
850              
851 114 100         if (len > 0)
852 111           memcpy(sub->copy_buf, sub->data + aoff, len);
853 114           sub->copy_buf[len] = '\0';
854              
855 114           uint64_t seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
856 114 50         if (seq2 != seq1) continue;
857              
858 114           *out_str = sub->copy_buf;
859 114           *out_len = len;
860 114           *out_utf8 = utf8;
861 114           sub->cursor = cursor + 1;
862 114           return 1;
863             }
864 0           return 0;
865             }
866              
867 55           static int pubsub_str_poll_wait(PubSubSub *sub, const char **out_str,
868             uint32_t *out_len, bool *out_utf8,
869             double timeout) {
870 55           int r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
871 55 100         if (r != 0) return r;
872 3 50         if (timeout == 0.0) return 0;
873              
874 3           PubSubHeader *hdr = sub->hdr;
875             struct timespec deadline, remaining;
876 3           int has_deadline = (timeout > 0);
877 3 50         if (has_deadline) pubsub_make_deadline(timeout, &deadline);
878              
879 0           for (;;) {
880             /* See pubsub_int_poll_wait above for the SEQ_CST ordering rationale. */
881 3           __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);
882 3           uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE);
883 3           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
884 3 50         if (r != 0) {
885 0           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);
886 0           return r;
887             }
888              
889 3           struct timespec *pts = NULL;
890 3 50         if (has_deadline) {
891 3 50         if (!pubsub_remaining_time(&deadline, &remaining)) {
892 0           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);
893 0           return 0;
894             }
895 3           pts = &remaining;
896             }
897 3           long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT,
898             fseq, pts, NULL, 0);
899 3           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);
900              
901 3           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
902 3 100         if (r != 0) return r;
903 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
904             }
905             }
906              
907             /* ================================================================
908             * Common operations
909             * ================================================================ */
910              
911 8           static void pubsub_clear(PubSubHandle *h) {
912 8           PubSubHeader *hdr = h->hdr;
913 8 100         if (hdr->mode == PUBSUB_MODE_STR)
914 2           pubsub_mutex_lock(hdr);
915              
916 8           __atomic_store_n(&hdr->write_pos, 0, __ATOMIC_RELAXED);
917 8           __atomic_store_n(&hdr->stat_publish_ok, 0, __ATOMIC_RELAXED);
918 8 100         if (hdr->mode == PUBSUB_MODE_STR)
919 2           __atomic_store_n(&hdr->arena_wpos, 0, __ATOMIC_RELAXED);
920              
921             /* Zero all slot sequences */
922 8           uint32_t cap = h->capacity;
923 8 100         if (hdr->mode == PUBSUB_MODE_INT) {
924 4           PubSubIntSlot *s = (PubSubIntSlot *)h->slots;
925 260 100         for (uint32_t i = 0; i < cap; i++)
926 256           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
927 4 100         } else if (hdr->mode == PUBSUB_MODE_INT32) {
928 1           PubSubInt32Slot *s = (PubSubInt32Slot *)h->slots;
929 65 100         for (uint32_t i = 0; i < cap; i++)
930 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
931 3 100         } else if (hdr->mode == PUBSUB_MODE_INT16) {
932 1           PubSubInt16Slot *s = (PubSubInt16Slot *)h->slots;
933 65 100         for (uint32_t i = 0; i < cap; i++)
934 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
935             } else {
936 2           PubSubStrSlot *s = (PubSubStrSlot *)h->slots;
937 82 100         for (uint32_t i = 0; i < cap; i++)
938 80           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
939             }
940              
941 8 100         if (hdr->mode == PUBSUB_MODE_STR)
942 2           pubsub_mutex_unlock(hdr);
943 8           pubsub_wake_subscribers(hdr);
944 8           }
945              
946 7           static inline int pubsub_sync(PubSubHandle *h) {
947 7           return msync(h->hdr, h->mmap_size, MS_SYNC);
948             }
949              
950 13           static inline int pubsub_eventfd_create(PubSubHandle *h) {
951 13 50         if (h->notify_fd >= 0) return h->notify_fd;
952 13           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
953 13           return h->notify_fd;
954             }
955              
956 1           static inline void pubsub_eventfd_set(PubSubHandle *h, int fd) {
957 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    50          
958 0           close(h->notify_fd);
959 1           h->notify_fd = fd;
960 1           }
961              
962 8           static inline void pubsub_notify(PubSubHandle *h) {
963 8 50         if (h->notify_fd >= 0) {
964 8           uint64_t one = 1;
965 8           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
966             }
967 8           }
968              
969 6           static inline int64_t pubsub_eventfd_consume(PubSubHandle *h) {
970 6 50         if (h->notify_fd < 0) return -1;
971 6           uint64_t val = 0;
972 6 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
973 6           return (int64_t)val;
974             }
975              
976 4           static inline void pubsub_sub_eventfd_consume(PubSubSub *sub) {
977 4 50         if (sub->notify_fd >= 0) {
978             uint64_t val;
979 4           ssize_t __attribute__((unused)) rc = read(sub->notify_fd, &val, sizeof(val));
980             }
981 4           }
982              
983             #endif /* PUBSUB_H */