File Coverage

pubsub.h
Criterion Covered Total %
statement 367 443 82.8
branch 236 428 55.1
condition n/a
subroutine n/a
pod n/a
total 603 871 69.2


line stmt bran cond sub pod time code
1             /*
2             * pubsub.h -- Shared-memory broadcast pub/sub for Linux
3             *
4             * Two variants:
5             * Int -- lock-free MPMC publish, lock-free subscribe (int64 values)
6             * Str -- mutex-protected publish, lock-free subscribe (variable-length
7             * byte strings up to msg_size, stored in a circular arena)
8             *
9             * Ring buffer broadcast: publishers write, each subscriber independently
10             * reads with its own cursor. Messages are never consumed -- the ring
11             * overwrites old data when it wraps. Subscribers auto-recover from
12             * overflow by resetting to the oldest available position.
13             *
14             * File-backed mmap(MAP_SHARED) for cross-process sharing,
15             * futex for blocking poll, PID-based stale lock recovery (Str mode).
16             */
17              
18             #ifndef PUBSUB_H
19             #define PUBSUB_H
20              
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37              
38             /* ================================================================
39             * Constants
40             * ================================================================ */
41              
42             #define PUBSUB_MAGIC 0x50534231U /* "PSB1" */
43             #define PUBSUB_VERSION 1
44             #define PUBSUB_MODE_INT 0
45             #define PUBSUB_MODE_STR 1
46             #define PUBSUB_MODE_INT32 2
47             #define PUBSUB_MODE_INT16 3
48             #define PUBSUB_ERR_BUFLEN 256
49             #define PUBSUB_SPIN_LIMIT 32
50             #define PUBSUB_LOCK_TIMEOUT_SEC 2
51             #define PUBSUB_DEFAULT_MSG_SIZE 256
52             #define PUBSUB_STR_UTF8_FLAG 0x80000000U
53             #define PUBSUB_STR_LEN_MASK 0x7FFFFFFFU
54             #define PUBSUB_POLL_RETRIES 8
55              
56             /* ================================================================
57             * Header (256 bytes = 4 cache lines)
58             * ================================================================ */
59              
60             typedef struct {
61             /* ---- Cache line 0 (0-63): immutable after create ---- */
62             uint32_t magic; /* 0 */
63             uint32_t version; /* 4 */
64             uint32_t mode; /* 8 */
65             uint32_t capacity; /* 12 */
66             uint64_t total_size; /* 16 */
67             uint64_t slots_off; /* 24 */
68             uint64_t data_off; /* 32: str: offset to arena; int: 0 */
69             uint32_t msg_size; /* 40: str: max bytes per message; int: 0 */
70             uint32_t _reserved0; /* 44 */
71             uint64_t arena_cap; /* 48: str: arena byte capacity; int: 0 */
72             uint8_t _pad0[8]; /* 56-63 */
73              
74             /* ---- Cache line 1 (64-127): writer hot ---- */
75             uint64_t write_pos; /* 64 */
76             uint32_t mutex; /* 72: str: futex mutex */
77             uint32_t mutex_waiters; /* 76 */
78             uint32_t arena_wpos; /* 80: str: next write position in arena */
79             uint8_t _pad1[44]; /* 84-127 */
80              
81             /* ---- Cache line 2 (128-191): subscriber notification ---- */
82             uint32_t sub_futex; /* 128 */
83             uint32_t sub_waiters; /* 132 */
84             uint8_t _pad2[56]; /* 136-191 */
85              
86             /* ---- Cache line 3 (192-255): stats ---- */
87             uint64_t stat_publish_ok; /* 192 */
88             uint64_t stat_recoveries; /* 200 */
89             uint8_t _pad3[48]; /* 208-255 */
90             } PubSubHeader;
91              
92             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
93             _Static_assert(sizeof(PubSubHeader) == 256, "PubSubHeader must be 256 bytes");
94             #endif
95              
96             /* ================================================================
97             * Slot types
98             * ================================================================ */
99              
100             typedef struct {
101             uint64_t sequence;
102             int64_t value;
103             } PubSubIntSlot; /* 16 bytes */
104              
105             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
106             typedef struct {
107             uint32_t sequence;
108             int32_t value;
109             } PubSubInt32Slot; /* 8 bytes */
110              
111             typedef struct {
112             uint32_t sequence;
113             int16_t value;
114             int16_t _pad;
115             } PubSubInt16Slot; /* 8 bytes */
116              
117             typedef struct {
118             uint64_t sequence;
119             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
120             uint32_t arena_off; /* offset into data arena */
121             } PubSubStrSlot; /* 16 bytes */
122              
123             /* ================================================================
124             * Process-local handles
125             * ================================================================ */
126              
127             typedef struct {
128             PubSubHeader *hdr;
129             void *slots;
130             char *data; /* NULL for int mode */
131             size_t mmap_size;
132             uint32_t capacity;
133             uint32_t cap_mask;
134             uint32_t msg_size;
135             uint64_t arena_cap;
136             char *path;
137             int notify_fd;
138             int backing_fd;
139             } PubSubHandle;
140              
141             typedef struct {
142             PubSubHeader *hdr;
143             void *slots;
144             char *data;
145             uint64_t cursor;
146             uint32_t capacity;
147             uint32_t cap_mask;
148             uint32_t msg_size;
149             char *copy_buf;
150             uint32_t copy_buf_cap;
151             uint64_t overflow_count;
152             int notify_fd;
153             void *userdata;
154             } PubSubSub;
155              
156             /* ================================================================
157             * Utility
158             * ================================================================ */
159              
160 142           static inline uint32_t pubsub_next_pow2(uint32_t v) {
161 142 50         if (v < 2) return 2;
162 142 50         if (v > 0x80000000U) return 0;
163 142           v--;
164 142           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
165 142           return v + 1;
166             }
167              
168 0           static inline void pubsub_spin_pause(void) {
169             #if defined(__x86_64__) || defined(__i386__)
170 0           __asm__ volatile("pause" ::: "memory");
171             #elif defined(__aarch64__)
172             __asm__ volatile("yield" ::: "memory");
173             #else
174             __asm__ volatile("" ::: "memory");
175             #endif
176 0           }
177              
178 114           static inline int pubsub_ensure_copy_buf(PubSubSub *sub, uint32_t needed) {
179 114 100         if (needed <= sub->copy_buf_cap) return 1;
180 34 100         uint32_t ns = sub->copy_buf_cap ? sub->copy_buf_cap : 64;
181 45 100         while (ns < needed) {
182 11           uint32_t n2 = ns * 2;
183 11 50         if (n2 <= ns) { ns = needed; break; }
184 11           ns = n2;
185             }
186 34           char *nb = (char *)realloc(sub->copy_buf, ns);
187 34 50         if (!nb) return 0;
188 34           sub->copy_buf = nb;
189 34           sub->copy_buf_cap = ns;
190 34           return 1;
191             }
192              
193             /* ================================================================
194             * Futex helpers
195             * ================================================================ */
196              
197             #define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
198             #define PUBSUB_MUTEX_PID_MASK 0x7FFFFFFFU
199             #define PUBSUB_MUTEX_VAL(pid) (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))
200              
201 0           static inline int pubsub_pid_alive(uint32_t pid) {
202 0 0         if (pid == 0) return 1;
203 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
204             }
205              
206             static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };
207              
208 0           static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
209 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
210             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
211 0           return;
212 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
213 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
214 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
215             }
216              
217 190           static inline void pubsub_mutex_lock(PubSubHeader *hdr) {
218 190           uint32_t mypid = PUBSUB_MUTEX_VAL((uint32_t)getpid());
219 190           for (int spin = 0; ; spin++) {
220 190           uint32_t expected = 0;
221 190 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
222             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
223 190           return;
224 0 0         if (__builtin_expect(spin < PUBSUB_SPIN_LIMIT, 1)) {
225 0           pubsub_spin_pause();
226 0           continue;
227             }
228 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
229 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
230 0 0         if (cur != 0) {
231 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
232             &pubsub_lock_timeout, NULL, 0);
233 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
234 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
235 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
236 0 0         if (val >= PUBSUB_MUTEX_WRITER_BIT) {
237 0           uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
238 0 0         if (!pubsub_pid_alive(pid))
239 0           pubsub_recover_stale_mutex(hdr, val);
240             }
241 0           spin = 0;
242 0           continue;
243             }
244             }
245 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
246 0           spin = 0;
247             }
248             }
249              
250 190           static inline void pubsub_mutex_unlock(PubSubHeader *hdr) {
251 190           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
252 190 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
253 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
254 190           }
255              
256 769           static inline void pubsub_wake_subscribers(PubSubHeader *hdr) {
257 769 100         if (__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED) > 0) {
258 97           __atomic_add_fetch(&hdr->sub_futex, 1, __ATOMIC_RELEASE);
259 97           syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
260             }
261 769           }
262              
263 11           static inline int pubsub_remaining_time(const struct timespec *deadline,
264             struct timespec *remaining) {
265             struct timespec now;
266 11           clock_gettime(CLOCK_MONOTONIC, &now);
267 11           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
268 11           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
269 11 100         if (remaining->tv_nsec < 0) {
270 6           remaining->tv_sec--;
271 6           remaining->tv_nsec += 1000000000L;
272             }
273 11           return remaining->tv_sec >= 0;
274             }
275              
276 11           static inline void pubsub_make_deadline(double timeout, struct timespec *deadline) {
277 11           clock_gettime(CLOCK_MONOTONIC, deadline);
278 11           deadline->tv_sec += (time_t)timeout;
279 11           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
280 11 50         if (deadline->tv_nsec >= 1000000000L) {
281 0           deadline->tv_sec++;
282 0           deadline->tv_nsec -= 1000000000L;
283             }
284 11           }
285              
286             /* ================================================================
287             * Header validation
288             * ================================================================ */
289              
290 20           static inline int pubsub_validate_header(PubSubHeader *hdr, uint32_t mode,
291             uint64_t file_size) {
292 20 50         if (hdr->magic != PUBSUB_MAGIC ||
293 20 50         hdr->version != PUBSUB_VERSION ||
294 20 100         hdr->mode != mode ||
295 16 50         hdr->capacity == 0 ||
296 16 50         (hdr->capacity & (hdr->capacity - 1)) != 0 ||
297 16 50         hdr->total_size != file_size ||
298 16 50         hdr->slots_off != sizeof(PubSubHeader))
299 4           return 0;
300             /* Slot array must fit within file. */
301 16           uint64_t slot_size = (mode == PUBSUB_MODE_INT) ? sizeof(PubSubIntSlot)
302 22 100         : (mode == PUBSUB_MODE_INT32) ? sizeof(PubSubInt32Slot)
303 11 100         : (mode == PUBSUB_MODE_INT16) ? sizeof(PubSubInt16Slot)
304 5 100         : sizeof(PubSubStrSlot);
305 16 50         if (hdr->capacity > (hdr->total_size - hdr->slots_off) / slot_size)
306 0           return 0;
307 16 100         if (mode == PUBSUB_MODE_STR) {
308 4 50         if (hdr->data_off == 0 || hdr->msg_size == 0 || hdr->arena_cap == 0)
    50          
    50          
309 0           return 0;
310 4           uint64_t slots_end = hdr->slots_off + (uint64_t)hdr->capacity * slot_size;
311 4 50         if (hdr->data_off < slots_end ||
312 4 50         hdr->data_off + hdr->arena_cap > hdr->total_size)
313 0           return 0;
314             }
315 16           return 1;
316             }
317              
318             /* ================================================================
319             * Create / Open / Close
320             * ================================================================ */
321              
322             #define PUBSUB_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, PUBSUB_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
323              
324 142           static PubSubHandle *pubsub_init_handle(void *base, size_t map_size,
325             uint32_t mode, const char *path) {
326 142           PubSubHeader *hdr = (PubSubHeader *)base;
327 142           PubSubHandle *h = (PubSubHandle *)calloc(1, sizeof(PubSubHandle));
328 142 50         if (!h) return NULL;
329              
330 142           h->hdr = hdr;
331 142           h->slots = (char *)base + hdr->slots_off;
332 142 100         h->data = (mode == PUBSUB_MODE_STR) ? (char *)base + hdr->data_off : NULL;
333 142           h->mmap_size = map_size;
334 142           h->capacity = hdr->capacity;
335 142           h->cap_mask = hdr->capacity - 1;
336 142           h->msg_size = hdr->msg_size;
337 142           h->arena_cap = hdr->arena_cap;
338 142 100         h->path = path ? strdup(path) : NULL;
339 142           h->notify_fd = -1;
340 142           h->backing_fd = -1;
341              
342 142           return h;
343             }
344              
345 126           static void pubsub_init_header(void *base, uint32_t mode, uint32_t cap,
346             uint64_t total_size, uint64_t slots_off,
347             uint64_t data_off, uint32_t msg_size,
348             uint64_t arena_cap) {
349 126           PubSubHeader *hdr = (PubSubHeader *)base;
350 126           memset(hdr, 0, sizeof(PubSubHeader));
351 126           hdr->magic = PUBSUB_MAGIC;
352 126           hdr->version = PUBSUB_VERSION;
353 126           hdr->mode = mode;
354 126           hdr->capacity = cap;
355 126           hdr->total_size = total_size;
356 126           hdr->slots_off = slots_off;
357 126           hdr->data_off = data_off;
358 126           hdr->msg_size = msg_size;
359 126           hdr->arena_cap = arena_cap;
360 126           }
361              
362 142           static void pubsub_calc_layout(uint32_t cap, uint32_t mode, uint32_t msg_size,
363             uint64_t *out_slots_off, uint64_t *out_data_off,
364             uint64_t *out_arena_cap, uint64_t *out_total_size) {
365 142           uint64_t slots_off = sizeof(PubSubHeader);
366 142           uint64_t slot_size = (mode == PUBSUB_MODE_INT) ? sizeof(PubSubIntSlot)
367 217 100         : (mode == PUBSUB_MODE_INT32) ? sizeof(PubSubInt32Slot)
368 137 100         : (mode == PUBSUB_MODE_INT16) ? sizeof(PubSubInt16Slot)
369 62 100         : sizeof(PubSubStrSlot);
370 142           uint64_t data_off = 0, arena_cap = 0, total_size;
371              
372 142 100         if (mode == PUBSUB_MODE_STR) {
373 50           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
374 50           data_off = (slots_end + 63) & ~(uint64_t)63;
375 50           arena_cap = (uint64_t)cap * ((uint64_t)msg_size + 8);
376 50 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
377 50           total_size = data_off + arena_cap;
378             } else {
379 92           total_size = slots_off + (uint64_t)cap * slot_size;
380             }
381              
382 142           *out_slots_off = slots_off;
383 142           *out_data_off = data_off;
384 142           *out_arena_cap = arena_cap;
385 142           *out_total_size = total_size;
386 142           }
387              
388 138           static PubSubHandle *pubsub_create(const char *path, uint32_t capacity,
389             uint32_t mode, uint32_t msg_size,
390             char *errbuf) {
391 138 50         if (errbuf) errbuf[0] = '\0';
392              
393 138           uint32_t cap = pubsub_next_pow2(capacity);
394 138 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
395 138 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
396              
397 138 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    100          
398 35           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
399              
400             uint64_t slots_off, data_off, arena_cap, total_size;
401 138           pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size);
402              
403 138           int anonymous = (path == NULL);
404             size_t map_size;
405             void *base;
406              
407 138 100         if (anonymous) {
408 87           map_size = (size_t)total_size;
409 87           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
410             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
411 87 50         if (base == MAP_FAILED) {
412 0 0         PUBSUB_ERR("mmap(anonymous): %s", strerror(errno));
413 0           return NULL;
414             }
415 87           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
416             msg_size, arena_cap);
417             } else {
418 51           int fd = open(path, O_RDWR | O_CREAT, 0666);
419 67 50         if (fd < 0) { PUBSUB_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
420              
421 51 50         if (flock(fd, LOCK_EX) < 0) {
422 0 0         PUBSUB_ERR("flock(%s): %s", path, strerror(errno));
423 0           close(fd); return NULL;
424             }
425              
426             struct stat st;
427 51 50         if (fstat(fd, &st) < 0) {
428 0 0         PUBSUB_ERR("fstat(%s): %s", path, strerror(errno));
429 0           flock(fd, LOCK_UN); close(fd); return NULL;
430             }
431              
432 51           int is_new = (st.st_size == 0);
433              
434 51 100         if (!is_new && (uint64_t)st.st_size < sizeof(PubSubHeader)) {
    50          
435 0 0         PUBSUB_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
436 0           flock(fd, LOCK_UN); close(fd); return NULL;
437             }
438              
439 51 100         if (is_new) {
440 35 50         if (ftruncate(fd, (off_t)total_size) < 0) {
441 0 0         PUBSUB_ERR("ftruncate(%s): %s", path, strerror(errno));
442 0           flock(fd, LOCK_UN); close(fd); return NULL;
443             }
444             }
445              
446 51 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
447 51           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
448 51 50         if (base == MAP_FAILED) {
449 0 0         PUBSUB_ERR("mmap(%s): %s", path, strerror(errno));
450 0           flock(fd, LOCK_UN); close(fd); return NULL;
451             }
452              
453 51 100         if (!is_new) {
454 16 100         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
455 4 50         PUBSUB_ERR("%s: invalid or incompatible pubsub file", path);
456 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
457             }
458 12           flock(fd, LOCK_UN);
459 12           close(fd);
460 12           return pubsub_init_handle(base, map_size, mode, path);
461             }
462              
463 35           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
464             msg_size, arena_cap);
465 35           flock(fd, LOCK_UN);
466 35           close(fd);
467             }
468              
469 122           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, path);
470 122 50         if (!h) { munmap(base, (size_t)total_size); return NULL; }
471 122           return h;
472             }
473              
474 4           static PubSubHandle *pubsub_create_memfd(const char *name, uint32_t capacity,
475             uint32_t mode, uint32_t msg_size,
476             char *errbuf) {
477 4 50         if (errbuf) errbuf[0] = '\0';
478              
479 4           uint32_t cap = pubsub_next_pow2(capacity);
480 4 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
481 4 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
482              
483 4 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    50          
484 1           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
485              
486             uint64_t slots_off, data_off, arena_cap, total_size;
487 4           pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size);
488              
489 4 50         int fd = memfd_create(name ? name : "pubsub", MFD_CLOEXEC | MFD_ALLOW_SEALING);
490 4 50         if (fd < 0) { PUBSUB_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
491              
492 4 50         if (ftruncate(fd, (off_t)total_size) < 0) {
493 0 0         PUBSUB_ERR("ftruncate(memfd): %s", strerror(errno));
494 0           close(fd); return NULL;
495             }
496 4           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
497              
498 4           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
499 4 50         if (base == MAP_FAILED) {
500 0 0         PUBSUB_ERR("mmap(memfd): %s", strerror(errno));
501 0           close(fd); return NULL;
502             }
503              
504 4           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
505             msg_size, arena_cap);
506              
507 4           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, NULL);
508 4 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
509 4           h->backing_fd = fd;
510 4           return h;
511             }
512              
513 4           static PubSubHandle *pubsub_open_fd(int fd, uint32_t mode, char *errbuf) {
514 4 50         if (errbuf) errbuf[0] = '\0';
515              
516             struct stat st;
517 4 50         if (fstat(fd, &st) < 0) {
518 0 0         PUBSUB_ERR("fstat(fd=%d): %s", fd, strerror(errno));
519 0           return NULL;
520             }
521              
522 4 50         if ((uint64_t)st.st_size < sizeof(PubSubHeader)) {
523 0 0         PUBSUB_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
524 0           return NULL;
525             }
526              
527 4           size_t map_size = (size_t)st.st_size;
528 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
529 4 50         if (base == MAP_FAILED) {
530 0 0         PUBSUB_ERR("mmap(fd=%d): %s", fd, strerror(errno));
531 0           return NULL;
532             }
533              
534 4 50         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
535 0 0         PUBSUB_ERR("fd %d: invalid or incompatible pubsub", fd);
536 0           munmap(base, map_size);
537 0           return NULL;
538             }
539              
540 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
541 4 50         if (myfd < 0) {
542 0 0         PUBSUB_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
543 0           munmap(base, map_size);
544 0           return NULL;
545             }
546              
547 4           PubSubHandle *h = pubsub_init_handle(base, map_size, mode, NULL);
548 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
549 4           h->backing_fd = myfd;
550 4           return h;
551             }
552              
553 142           static void pubsub_destroy(PubSubHandle *h) {
554 142 50         if (!h) return;
555 142 100         if (h->notify_fd >= 0) close(h->notify_fd);
556 142 100         if (h->backing_fd >= 0) close(h->backing_fd);
557 142 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
558 142           free(h->path);
559 142           free(h);
560             }
561              
562             /* ================================================================
563             * Subscribe
564             * ================================================================ */
565              
566 104           static PubSubSub *pubsub_subscribe(PubSubHandle *h, int from_oldest) {
567 104           PubSubSub *sub = (PubSubSub *)calloc(1, sizeof(PubSubSub));
568 104 50         if (!sub) return NULL;
569              
570 104           sub->hdr = h->hdr;
571 104           sub->slots = h->slots;
572 104           sub->data = h->data;
573 104           sub->capacity = h->capacity;
574 104           sub->cap_mask = h->cap_mask;
575 104           sub->msg_size = h->msg_size;
576              
577 104           sub->notify_fd = h->notify_fd;
578              
579 104           uint64_t wp = __atomic_load_n(&h->hdr->write_pos, __ATOMIC_ACQUIRE);
580 104 100         if (from_oldest && wp > h->capacity)
    100          
581 1           sub->cursor = wp - h->capacity;
582 103 100         else if (from_oldest)
583 63           sub->cursor = 0;
584             else
585 40           sub->cursor = wp;
586              
587 104           return sub;
588             }
589              
590 104           static void pubsub_sub_destroy(PubSubSub *sub) {
591 104 50         if (!sub) return;
592 104           free(sub->copy_buf);
593 104           free(sub);
594             }
595              
596             /* ================================================================
597             * Common: lag (shared between Int and Str)
598             * ================================================================ */
599              
600 32           static inline uint64_t pubsub_lag(PubSubSub *sub) {
601 32           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
602 32 100         return (wp > sub->cursor) ? (wp - sub->cursor) : 0;
603             }
604              
605             /* ================================================================
606             * Int publish/poll macro template
607             *
608             * DEFINE_INT_PUBSUB(prefix, SlotType, ValType, SeqType, DiffType)
609             * generates: pubsub__publish, _publish_multi, _poll, _poll_wait
610             * ================================================================ */
611              
612             #define DEFINE_INT_PUBSUB(PFX, SLOT, VTYPE, STYPE, DTYPE) \
613             \
614             static inline int pubsub_##PFX##_publish(PubSubHandle *h, VTYPE value) { \
615             PubSubHeader *hdr = h->hdr; \
616             SLOT *slots = (SLOT *)h->slots; \
617             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, 1, __ATOMIC_RELAXED); \
618             uint32_t idx = pos & h->cap_mask; \
619             slots[idx].value = value; \
620             __atomic_store_n(&slots[idx].sequence, (STYPE)(pos + 1), __ATOMIC_RELEASE);\
621             __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED); \
622             pubsub_wake_subscribers(hdr); \
623             return 1; \
624             } \
625             \
626             static inline uint32_t pubsub_##PFX##_publish_multi(PubSubHandle *h, \
627             const VTYPE *values, uint32_t count) { \
628             PubSubHeader *hdr = h->hdr; \
629             SLOT *slots = (SLOT *)h->slots; \
630             uint32_t mask = h->cap_mask; \
631             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, count, __ATOMIC_RELAXED);\
632             for (uint32_t i = 0; i < count; i++) { \
633             uint32_t idx = (pos + i) & mask; \
634             slots[idx].value = values[i]; \
635             __atomic_store_n(&slots[idx].sequence, \
636             (STYPE)(pos + i + 1), __ATOMIC_RELEASE); \
637             } \
638             __atomic_add_fetch(&hdr->stat_publish_ok, count, __ATOMIC_RELAXED); \
639             pubsub_wake_subscribers(hdr); \
640             return count; \
641             } \
642             \
643             static inline int pubsub_##PFX##_poll(PubSubSub *sub, VTYPE *value) { \
644             PubSubHeader *hdr = sub->hdr; \
645             SLOT *slots = (SLOT *)sub->slots; \
646             for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) { \
647             uint64_t cursor = sub->cursor; \
648             uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE); \
649             if (cursor >= wp) return 0; \
650             if (wp - cursor > sub->capacity) { \
651             sub->overflow_count += wp - cursor - sub->capacity; \
652             sub->cursor = wp - sub->capacity; \
653             continue; \
654             } \
655             uint32_t idx = cursor & sub->cap_mask; \
656             SLOT *slot = &slots[idx]; \
657             STYPE seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
658             DTYPE diff = (DTYPE)seq1 - (DTYPE)(STYPE)(cursor + 1); \
659             if (diff != 0) { \
660             if (diff > 0) { \
661             uint64_t nc = wp > sub->capacity ? wp - sub->capacity : 0; \
662             if (nc > cursor) sub->overflow_count += nc - cursor; \
663             sub->cursor = nc; \
664             continue; \
665             } \
666             return 0; \
667             } \
668             VTYPE v = slot->value; \
669             STYPE seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
670             if (seq2 != seq1) continue; \
671             *value = v; \
672             sub->cursor = cursor + 1; \
673             return 1; \
674             } \
675             return 0; \
676             } \
677             \
678             static int pubsub_##PFX##_poll_wait(PubSubSub *sub, VTYPE *value, \
679             double timeout) { \
680             int r = pubsub_##PFX##_poll(sub, value); \
681             if (r != 0) return r; \
682             if (timeout == 0.0) return 0; \
683             PubSubHeader *hdr = sub->hdr; \
684             struct timespec deadline, remaining; \
685             int has_deadline = (timeout > 0); \
686             if (has_deadline) pubsub_make_deadline(timeout, &deadline); \
687             for (;;) { \
688             uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE); \
689             r = pubsub_##PFX##_poll(sub, value); \
690             if (r != 0) return r; \
691             __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
692             struct timespec *pts = NULL; \
693             if (has_deadline) { \
694             if (!pubsub_remaining_time(&deadline, &remaining)) { \
695             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
696             return 0; \
697             } \
698             pts = &remaining; \
699             } \
700             long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT, \
701             fseq, pts, NULL, 0); \
702             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
703             r = pubsub_##PFX##_poll(sub, value); \
704             if (r != 0) return r; \
705             if (rc == -1 && errno == ETIMEDOUT) return 0; \
706             } \
707             }
708              
709             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
710 1914 100         DEFINE_INT_PUBSUB(int, PubSubIntSlot, int64_t, uint64_t, int64_t)
  17 50          
  1216 50          
  212 50          
  469 50          
    50          
    100          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
711              
712             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
713 95 100         DEFINE_INT_PUBSUB(int32, PubSubInt32Slot, int32_t, uint32_t, int32_t)
  3 50          
  38 50          
  4 50          
  50 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
714              
715             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
716 94 100         DEFINE_INT_PUBSUB(int16, PubSubInt16Slot, int16_t, uint32_t, int32_t)
  3 50          
  38 50          
  4 50          
  49 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
717              
718             /* ================================================================
719             * Str: mutex-protected publish, lock-free subscribe
720             *
721             * Variable-length messages stored in a circular arena. Each slot
722             * records the arena offset; the seqlock (sequence double-check)
723             * guarantees readers see consistent data.
724             * ================================================================ */
725              
726             /* Publish one Str message while mutex is already held (no lock/wake). */
727 197           static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
728             uint32_t len, bool utf8) {
729 197 50         if (len > PUBSUB_STR_LEN_MASK) return -1;
730 197 100         if (len > h->msg_size) return -1;
731              
732 196           PubSubHeader *hdr = h->hdr;
733 196           PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;
734              
735 196           uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
736 196           uint32_t idx = pos & h->cap_mask;
737 196           PubSubStrSlot *slot = &slots[idx];
738              
739 196           __atomic_store_n(&slot->sequence, 0, __ATOMIC_RELAXED);
740 196           __atomic_thread_fence(__ATOMIC_RELEASE);
741              
742 196           uint32_t alloc = (len + 7) & ~7u;
743 196 100         if (alloc == 0) alloc = 8;
744 196 50         if (alloc > h->arena_cap) return -1;
745 196           uint32_t apos = __atomic_load_n(&hdr->arena_wpos, __ATOMIC_RELAXED);
746 196 100         if ((uint64_t)apos + alloc > h->arena_cap)
747 1           apos = 0;
748              
749 196           memcpy(h->data + apos, str, len);
750              
751 196           slot->arena_off = apos;
752 196 100         slot->packed_len = len | (utf8 ? PUBSUB_STR_UTF8_FLAG : 0);
753              
754 196           __atomic_store_n(&hdr->arena_wpos, apos + alloc, __ATOMIC_RELAXED);
755              
756 196           __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
757 196           __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELAXED);
758 196           __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);
759              
760 196           return 1;
761             }
762              
763 186           static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
764             uint32_t len, bool utf8) {
765 186 100         if (len > h->msg_size) return -1;
766 184           pubsub_mutex_lock(h->hdr);
767 184           int r = pubsub_str_publish_locked(h, str, len, utf8);
768 184           pubsub_mutex_unlock(h->hdr);
769 184 50         if (r == 1) pubsub_wake_subscribers(h->hdr);
770 184           return r;
771             }
772              
773             /* Returns: 1 = success, 0 = empty/not-ready */
774 132           static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
775             uint32_t *out_len, bool *out_utf8) {
776 132           PubSubHeader *hdr = sub->hdr;
777 132           PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;
778              
779 134 50         for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) {
780 134           uint64_t cursor = sub->cursor;
781 134           uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE);
782              
783 134 100         if (cursor >= wp) return 0;
784              
785 116 100         if (wp - cursor > sub->capacity) {
786 2           sub->overflow_count += wp - cursor - sub->capacity;
787 2           sub->cursor = wp - sub->capacity;
788 2           continue;
789             }
790              
791 114           uint32_t idx = cursor & sub->cap_mask;
792 114           PubSubStrSlot *slot = &slots[idx];
793              
794 114           uint64_t seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
795 114 50         if (seq1 != cursor + 1) {
796 0 0         if (seq1 > cursor + 1) {
797 0 0         uint64_t new_cursor = wp > sub->capacity ? wp - sub->capacity : 0;
798 0 0         if (new_cursor > cursor)
799 0           sub->overflow_count += new_cursor - cursor;
800 0           sub->cursor = new_cursor;
801 0           continue;
802             }
803 0           return 0;
804             }
805              
806 114           uint32_t plen = slot->packed_len;
807 114           uint32_t aoff = slot->arena_off;
808 114           uint32_t len = plen & PUBSUB_STR_LEN_MASK;
809 114           bool utf8 = (plen & PUBSUB_STR_UTF8_FLAG) != 0;
810              
811             /* Safety: if metadata looks corrupted, retry */
812 114 50         if (len > sub->msg_size) continue;
813 114 50         if ((uint64_t)aoff + len > sub->hdr->arena_cap) continue;
814              
815 114 50         if (!pubsub_ensure_copy_buf(sub, len + 1)) return 0;
816              
817 114 100         if (len > 0)
818 111           memcpy(sub->copy_buf, sub->data + aoff, len);
819 114           sub->copy_buf[len] = '\0';
820              
821 114           uint64_t seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
822 114 50         if (seq2 != seq1) continue;
823              
824 114           *out_str = sub->copy_buf;
825 114           *out_len = len;
826 114           *out_utf8 = utf8;
827 114           sub->cursor = cursor + 1;
828 114           return 1;
829             }
830 0           return 0;
831             }
832              
833 55           static int pubsub_str_poll_wait(PubSubSub *sub, const char **out_str,
834             uint32_t *out_len, bool *out_utf8,
835             double timeout) {
836 55           int r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
837 55 100         if (r != 0) return r;
838 3 50         if (timeout == 0.0) return 0;
839              
840 3           PubSubHeader *hdr = sub->hdr;
841             struct timespec deadline, remaining;
842 3           int has_deadline = (timeout > 0);
843 3 50         if (has_deadline) pubsub_make_deadline(timeout, &deadline);
844              
845 0           for (;;) {
846 3           uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE);
847 3           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
848 3 50         if (r != 0) return r;
849              
850 3           __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
851 3           struct timespec *pts = NULL;
852 3 50         if (has_deadline) {
853 3 50         if (!pubsub_remaining_time(&deadline, &remaining)) {
854 0           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
855 0           return 0;
856             }
857 3           pts = &remaining;
858             }
859 3           long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT,
860             fseq, pts, NULL, 0);
861 3           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
862              
863 3           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
864 3 100         if (r != 0) return r;
865 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
866             }
867             }
868              
869             /* ================================================================
870             * Common operations
871             * ================================================================ */
872              
873 8           static void pubsub_clear(PubSubHandle *h) {
874 8           PubSubHeader *hdr = h->hdr;
875 8 100         if (hdr->mode == PUBSUB_MODE_STR)
876 2           pubsub_mutex_lock(hdr);
877              
878 8           __atomic_store_n(&hdr->write_pos, 0, __ATOMIC_RELAXED);
879 8           __atomic_store_n(&hdr->stat_publish_ok, 0, __ATOMIC_RELAXED);
880 8 100         if (hdr->mode == PUBSUB_MODE_STR)
881 2           __atomic_store_n(&hdr->arena_wpos, 0, __ATOMIC_RELAXED);
882              
883             /* Zero all slot sequences */
884 8           uint32_t cap = h->capacity;
885 8 100         if (hdr->mode == PUBSUB_MODE_INT) {
886 4           PubSubIntSlot *s = (PubSubIntSlot *)h->slots;
887 260 100         for (uint32_t i = 0; i < cap; i++)
888 256           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
889 4 100         } else if (hdr->mode == PUBSUB_MODE_INT32) {
890 1           PubSubInt32Slot *s = (PubSubInt32Slot *)h->slots;
891 65 100         for (uint32_t i = 0; i < cap; i++)
892 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
893 3 100         } else if (hdr->mode == PUBSUB_MODE_INT16) {
894 1           PubSubInt16Slot *s = (PubSubInt16Slot *)h->slots;
895 65 100         for (uint32_t i = 0; i < cap; i++)
896 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
897             } else {
898 2           PubSubStrSlot *s = (PubSubStrSlot *)h->slots;
899 82 100         for (uint32_t i = 0; i < cap; i++)
900 80           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
901             }
902              
903 8 100         if (hdr->mode == PUBSUB_MODE_STR)
904 2           pubsub_mutex_unlock(hdr);
905 8           pubsub_wake_subscribers(hdr);
906 8           }
907              
908 7           static inline int pubsub_sync(PubSubHandle *h) {
909 7           return msync(h->hdr, h->mmap_size, MS_SYNC);
910             }
911              
912 13           static inline int pubsub_eventfd_create(PubSubHandle *h) {
913 13 50         if (h->notify_fd >= 0) return h->notify_fd;
914 13           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
915 13           return h->notify_fd;
916             }
917              
918 1           static inline void pubsub_eventfd_set(PubSubHandle *h, int fd) {
919 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    50          
920 0           close(h->notify_fd);
921 1           h->notify_fd = fd;
922 1           }
923              
924 8           static inline void pubsub_notify(PubSubHandle *h) {
925 8 50         if (h->notify_fd >= 0) {
926 8           uint64_t one = 1;
927 8           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
928             }
929 8           }
930              
931 6           static inline int64_t pubsub_eventfd_consume(PubSubHandle *h) {
932 6 50         if (h->notify_fd < 0) return -1;
933 6           uint64_t val = 0;
934 6 50         if (read(h->notify_fd, &val, sizeof(val)) != sizeof(val)) return -1;
935 6           return (int64_t)val;
936             }
937              
938 4           static inline void pubsub_sub_eventfd_consume(PubSubSub *sub) {
939 4 50         if (sub->notify_fd >= 0) {
940             uint64_t val;
941 4           ssize_t __attribute__((unused)) rc = read(sub->notify_fd, &val, sizeof(val));
942             }
943 4           }
944              
945             #endif /* PUBSUB_H */