File Coverage

pubsub.h
Criterion Covered Total %
statement 355 430 82.5
branch 225 412 54.6
condition n/a
subroutine n/a
pod n/a
total 580 842 68.8


line stmt bran cond sub pod time code
1             /*
2             * pubsub.h -- Shared-memory broadcast pub/sub for Linux
3             *
4             * Two variants:
5             * Int -- lock-free MPMC publish, lock-free subscribe (int64 values)
6             * Str -- mutex-protected publish, lock-free subscribe (variable-length
7             * byte strings up to msg_size, stored in a circular arena)
8             *
9             * Ring buffer broadcast: publishers write, each subscriber independently
10             * reads with its own cursor. Messages are never consumed -- the ring
11             * overwrites old data when it wraps. Subscribers auto-recover from
12             * overflow by resetting to the oldest available position.
13             *
14             * File-backed mmap(MAP_SHARED) for cross-process sharing,
15             * futex for blocking poll, PID-based stale lock recovery (Str mode).
16             */
17              
18             #ifndef PUBSUB_H
19             #define PUBSUB_H
20              
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32             #include
33             #include
34             #include
35             #include
36             #include
37              
38             /* ================================================================
39             * Constants
40             * ================================================================ */
41              
42             #define PUBSUB_MAGIC 0x50534231U /* "PSB1" */
43             #define PUBSUB_VERSION 1
44             #define PUBSUB_MODE_INT 0
45             #define PUBSUB_MODE_STR 1
46             #define PUBSUB_MODE_INT32 2
47             #define PUBSUB_MODE_INT16 3
48             #define PUBSUB_ERR_BUFLEN 256
49             #define PUBSUB_SPIN_LIMIT 32
50             #define PUBSUB_LOCK_TIMEOUT_SEC 2
51             #define PUBSUB_DEFAULT_MSG_SIZE 256
52             #define PUBSUB_STR_UTF8_FLAG 0x80000000U
53             #define PUBSUB_STR_LEN_MASK 0x7FFFFFFFU
54             #define PUBSUB_POLL_RETRIES 8
55              
56             /* ================================================================
57             * Header (256 bytes = 4 cache lines)
58             * ================================================================ */
59              
60             typedef struct {
61             /* ---- Cache line 0 (0-63): immutable after create ---- */
62             uint32_t magic; /* 0 */
63             uint32_t version; /* 4 */
64             uint32_t mode; /* 8 */
65             uint32_t capacity; /* 12 */
66             uint64_t total_size; /* 16 */
67             uint64_t slots_off; /* 24 */
68             uint64_t data_off; /* 32: str: offset to arena; int: 0 */
69             uint32_t msg_size; /* 40: str: max bytes per message; int: 0 */
70             uint32_t _reserved0; /* 44 */
71             uint64_t arena_cap; /* 48: str: arena byte capacity; int: 0 */
72             uint8_t _pad0[8]; /* 56-63 */
73              
74             /* ---- Cache line 1 (64-127): writer hot ---- */
75             uint64_t write_pos; /* 64 */
76             uint32_t mutex; /* 72: str: futex mutex */
77             uint32_t mutex_waiters; /* 76 */
78             uint32_t arena_wpos; /* 80: str: next write position in arena */
79             uint8_t _pad1[44]; /* 84-127 */
80              
81             /* ---- Cache line 2 (128-191): subscriber notification ---- */
82             uint32_t sub_futex; /* 128 */
83             uint32_t sub_waiters; /* 132 */
84             uint8_t _pad2[56]; /* 136-191 */
85              
86             /* ---- Cache line 3 (192-255): stats ---- */
87             uint64_t stat_publish_ok; /* 192 */
88             uint32_t stat_recoveries; /* 200 */
89             uint8_t _pad3[52]; /* 204-255 */
90             } PubSubHeader;
91              
92             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
93             _Static_assert(sizeof(PubSubHeader) == 256, "PubSubHeader must be 256 bytes");
94             #endif
95              
96             /* ================================================================
97             * Slot types
98             * ================================================================ */
99              
100             typedef struct {
101             uint64_t sequence;
102             int64_t value;
103             } PubSubIntSlot; /* 16 bytes */
104              
105             /* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density) */
106             typedef struct {
107             uint32_t sequence;
108             int32_t value;
109             } PubSubInt32Slot; /* 8 bytes */
110              
111             typedef struct {
112             uint32_t sequence;
113             int16_t value;
114             int16_t _pad;
115             } PubSubInt16Slot; /* 8 bytes */
116              
117             typedef struct {
118             uint64_t sequence;
119             uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
120             uint32_t arena_off; /* offset into data arena */
121             } PubSubStrSlot; /* 16 bytes */
122              
123             /* ================================================================
124             * Process-local handles
125             * ================================================================ */
126              
127             typedef struct {
128             PubSubHeader *hdr;
129             void *slots;
130             char *data; /* NULL for int mode */
131             size_t mmap_size;
132             uint32_t capacity;
133             uint32_t cap_mask;
134             uint32_t msg_size;
135             uint64_t arena_cap;
136             char *path;
137             int notify_fd;
138             int backing_fd;
139             } PubSubHandle;
140              
141             typedef struct {
142             PubSubHeader *hdr;
143             void *slots;
144             char *data;
145             uint64_t cursor;
146             uint32_t capacity;
147             uint32_t cap_mask;
148             uint32_t msg_size;
149             char *copy_buf;
150             uint32_t copy_buf_cap;
151             uint64_t overflow_count;
152             int notify_fd;
153             void *userdata;
154             } PubSubSub;
155              
156             /* ================================================================
157             * Utility
158             * ================================================================ */
159              
160 136           static inline uint32_t pubsub_next_pow2(uint32_t v) {
161 136 50         if (v < 2) return 2;
162 136 50         if (v > 0x80000000U) return 0;
163 136           v--;
164 136           v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
165 136           return v + 1;
166             }
167              
168 0           static inline void pubsub_spin_pause(void) {
169             #if defined(__x86_64__) || defined(__i386__)
170 0           __asm__ volatile("pause" ::: "memory");
171             #elif defined(__aarch64__)
172             __asm__ volatile("yield" ::: "memory");
173             #else
174             __asm__ volatile("" ::: "memory");
175             #endif
176 0           }
177              
178 111           static inline int pubsub_ensure_copy_buf(PubSubSub *sub, uint32_t needed) {
179 111 100         if (needed <= sub->copy_buf_cap) return 1;
180 32 100         uint32_t ns = sub->copy_buf_cap ? sub->copy_buf_cap : 64;
181 43 100         while (ns < needed) {
182 11           uint32_t n2 = ns * 2;
183 11 50         if (n2 <= ns) { ns = needed; break; }
184 11           ns = n2;
185             }
186 32           char *nb = (char *)realloc(sub->copy_buf, ns);
187 32 50         if (!nb) return 0;
188 32           sub->copy_buf = nb;
189 32           sub->copy_buf_cap = ns;
190 32           return 1;
191             }
192              
193             /* ================================================================
194             * Futex helpers
195             * ================================================================ */
196              
197             #define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
198             #define PUBSUB_MUTEX_PID_MASK 0x7FFFFFFFU
199             #define PUBSUB_MUTEX_VAL(pid) (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))
200              
201 0           static inline int pubsub_pid_alive(uint32_t pid) {
202 0 0         if (pid == 0) return 1;
203 0 0         return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
    0          
204             }
205              
206             static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };
207              
208 0           static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
209 0 0         if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
210             0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
211 0           return;
212 0           __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
213 0 0         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
214 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
215             }
216              
217 187           static inline void pubsub_mutex_lock(PubSubHeader *hdr) {
218 187           uint32_t mypid = PUBSUB_MUTEX_VAL((uint32_t)getpid());
219 187           for (int spin = 0; ; spin++) {
220 187           uint32_t expected = 0;
221 187 50         if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
222             1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
223 187           return;
224 0 0         if (__builtin_expect(spin < PUBSUB_SPIN_LIMIT, 1)) {
225 0           pubsub_spin_pause();
226 0           continue;
227             }
228 0           __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
229 0           uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
230 0 0         if (cur != 0) {
231 0           long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
232             &pubsub_lock_timeout, NULL, 0);
233 0 0         if (rc == -1 && errno == ETIMEDOUT) {
    0          
234 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
235 0           uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
236 0 0         if (val >= PUBSUB_MUTEX_WRITER_BIT) {
237 0           uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
238 0 0         if (!pubsub_pid_alive(pid))
239 0           pubsub_recover_stale_mutex(hdr, val);
240             }
241 0           spin = 0;
242 0           continue;
243             }
244             }
245 0           __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
246 0           spin = 0;
247             }
248             }
249              
250 187           static inline void pubsub_mutex_unlock(PubSubHeader *hdr) {
251 187           __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
252 187 50         if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
253 0           syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
254 187           }
255              
256 761           static inline void pubsub_wake_subscribers(PubSubHeader *hdr) {
257 761 100         if (__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED) > 0) {
258 61           __atomic_add_fetch(&hdr->sub_futex, 1, __ATOMIC_RELEASE);
259 61           syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
260             }
261 761           }
262              
263 13           static inline int pubsub_remaining_time(const struct timespec *deadline,
264             struct timespec *remaining) {
265             struct timespec now;
266 13           clock_gettime(CLOCK_MONOTONIC, &now);
267 13           remaining->tv_sec = deadline->tv_sec - now.tv_sec;
268 13           remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
269 13 100         if (remaining->tv_nsec < 0) {
270 8           remaining->tv_sec--;
271 8           remaining->tv_nsec += 1000000000L;
272             }
273 13           return remaining->tv_sec >= 0;
274             }
275              
276 13           static inline void pubsub_make_deadline(double timeout, struct timespec *deadline) {
277 13           clock_gettime(CLOCK_MONOTONIC, deadline);
278 13           deadline->tv_sec += (time_t)timeout;
279 13           deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
280 13 50         if (deadline->tv_nsec >= 1000000000L) {
281 0           deadline->tv_sec++;
282 0           deadline->tv_nsec -= 1000000000L;
283             }
284 13           }
285              
286             /* ================================================================
287             * Header validation
288             * ================================================================ */
289              
290 20           static inline int pubsub_validate_header(PubSubHeader *hdr, uint32_t mode,
291             uint64_t file_size) {
292 20 50         if (hdr->magic != PUBSUB_MAGIC ||
293 20 50         hdr->version != PUBSUB_VERSION ||
294 20 100         hdr->mode != mode ||
295 16 50         hdr->capacity == 0 ||
296 16 50         (hdr->capacity & (hdr->capacity - 1)) != 0 ||
297 16 50         hdr->total_size != file_size ||
298 16 50         hdr->slots_off != sizeof(PubSubHeader))
299 4           return 0;
300 16 100         if (mode == PUBSUB_MODE_STR) {
301 4 50         if (hdr->data_off == 0 || hdr->msg_size == 0 || hdr->arena_cap == 0)
    50          
    50          
302 0           return 0;
303 4 50         if (hdr->data_off + hdr->arena_cap > hdr->total_size)
304 0           return 0;
305             }
306 16           return 1;
307             }
308              
309             /* ================================================================
310             * Create / Open / Close
311             * ================================================================ */
312              
313             #define PUBSUB_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, PUBSUB_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
314              
315 136           static PubSubHandle *pubsub_init_handle(void *base, size_t map_size,
316             uint32_t mode, const char *path) {
317 136           PubSubHeader *hdr = (PubSubHeader *)base;
318 136           PubSubHandle *h = (PubSubHandle *)calloc(1, sizeof(PubSubHandle));
319 136 50         if (!h) return NULL;
320              
321 136           h->hdr = hdr;
322 136           h->slots = (char *)base + hdr->slots_off;
323 136 100         h->data = (mode == PUBSUB_MODE_STR) ? (char *)base + hdr->data_off : NULL;
324 136           h->mmap_size = map_size;
325 136           h->capacity = hdr->capacity;
326 136           h->cap_mask = hdr->capacity - 1;
327 136           h->msg_size = hdr->msg_size;
328 136           h->arena_cap = hdr->arena_cap;
329 136 100         h->path = path ? strdup(path) : NULL;
330 136           h->notify_fd = -1;
331 136           h->backing_fd = -1;
332              
333 136           return h;
334             }
335              
336 120           static void pubsub_init_header(void *base, uint32_t mode, uint32_t cap,
337             uint64_t total_size, uint64_t slots_off,
338             uint64_t data_off, uint32_t msg_size,
339             uint64_t arena_cap) {
340 120           PubSubHeader *hdr = (PubSubHeader *)base;
341 120           memset(hdr, 0, sizeof(PubSubHeader));
342 120           hdr->magic = PUBSUB_MAGIC;
343 120           hdr->version = PUBSUB_VERSION;
344 120           hdr->mode = mode;
345 120           hdr->capacity = cap;
346 120           hdr->total_size = total_size;
347 120           hdr->slots_off = slots_off;
348 120           hdr->data_off = data_off;
349 120           hdr->msg_size = msg_size;
350 120           hdr->arena_cap = arena_cap;
351 120           }
352              
353 136           static void pubsub_calc_layout(uint32_t cap, uint32_t mode, uint32_t msg_size,
354             uint64_t *out_slots_off, uint64_t *out_data_off,
355             uint64_t *out_arena_cap, uint64_t *out_total_size) {
356 136           uint64_t slots_off = sizeof(PubSubHeader);
357 136           uint64_t slot_size = (mode == PUBSUB_MODE_INT) ? sizeof(PubSubIntSlot)
358 207 100         : (mode == PUBSUB_MODE_INT32) ? sizeof(PubSubInt32Slot)
359 130 100         : (mode == PUBSUB_MODE_INT16) ? sizeof(PubSubInt16Slot)
360 59 100         : sizeof(PubSubStrSlot);
361 136           uint64_t data_off = 0, arena_cap = 0, total_size;
362              
363 136 100         if (mode == PUBSUB_MODE_STR) {
364 48           uint64_t slots_end = slots_off + (uint64_t)cap * slot_size;
365 48           data_off = (slots_end + 63) & ~(uint64_t)63;
366 48           arena_cap = (uint64_t)cap * ((uint64_t)msg_size + 8);
367 48 50         if (arena_cap > UINT32_MAX) arena_cap = UINT32_MAX;
368 48           total_size = data_off + arena_cap;
369             } else {
370 88           total_size = slots_off + (uint64_t)cap * slot_size;
371             }
372              
373 136           *out_slots_off = slots_off;
374 136           *out_data_off = data_off;
375 136           *out_arena_cap = arena_cap;
376 136           *out_total_size = total_size;
377 136           }
378              
379 132           static PubSubHandle *pubsub_create(const char *path, uint32_t capacity,
380             uint32_t mode, uint32_t msg_size,
381             char *errbuf) {
382 132 50         if (errbuf) errbuf[0] = '\0';
383              
384 132           uint32_t cap = pubsub_next_pow2(capacity);
385 132 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
386 132 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
387              
388 132 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    100          
389 33           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
390              
391             uint64_t slots_off, data_off, arena_cap, total_size;
392 132           pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size);
393              
394 132           int anonymous = (path == NULL);
395             size_t map_size;
396             void *base;
397              
398 132 100         if (anonymous) {
399 81           map_size = (size_t)total_size;
400 81           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
401             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
402 81 50         if (base == MAP_FAILED) {
403 0 0         PUBSUB_ERR("mmap(anonymous): %s", strerror(errno));
404 0           return NULL;
405             }
406             } else {
407 51           int fd = open(path, O_RDWR | O_CREAT, 0666);
408 67 50         if (fd < 0) { PUBSUB_ERR("open(%s): %s", path, strerror(errno)); return NULL; }
    0          
409              
410 51 50         if (flock(fd, LOCK_EX) < 0) {
411 0 0         PUBSUB_ERR("flock(%s): %s", path, strerror(errno));
412 0           close(fd); return NULL;
413             }
414              
415             struct stat st;
416 51 50         if (fstat(fd, &st) < 0) {
417 0 0         PUBSUB_ERR("fstat(%s): %s", path, strerror(errno));
418 0           flock(fd, LOCK_UN); close(fd); return NULL;
419             }
420              
421 51           int is_new = (st.st_size == 0);
422              
423 51 100         if (!is_new && (uint64_t)st.st_size < sizeof(PubSubHeader)) {
    50          
424 0 0         PUBSUB_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
425 0           flock(fd, LOCK_UN); close(fd); return NULL;
426             }
427              
428 51 100         if (is_new) {
429 35 50         if (ftruncate(fd, (off_t)total_size) < 0) {
430 0 0         PUBSUB_ERR("ftruncate(%s): %s", path, strerror(errno));
431 0           flock(fd, LOCK_UN); close(fd); return NULL;
432             }
433             }
434              
435 51 100         map_size = is_new ? (size_t)total_size : (size_t)st.st_size;
436 51           base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
437 51 50         if (base == MAP_FAILED) {
438 0 0         PUBSUB_ERR("mmap(%s): %s", path, strerror(errno));
439 0           flock(fd, LOCK_UN); close(fd); return NULL;
440             }
441              
442 51 100         if (!is_new) {
443 16 100         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
444 4 50         PUBSUB_ERR("%s: invalid or incompatible pubsub file", path);
445 4           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
446             }
447 12           flock(fd, LOCK_UN);
448 12           close(fd);
449 12           return pubsub_init_handle(base, map_size, mode, path);
450             }
451              
452 35           flock(fd, LOCK_UN);
453 35           close(fd);
454             }
455              
456 116           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
457             msg_size, arena_cap);
458              
459 116           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, path);
460 116 50         if (!h) { munmap(base, (size_t)total_size); return NULL; }
461 116           return h;
462             }
463              
464 4           static PubSubHandle *pubsub_create_memfd(const char *name, uint32_t capacity,
465             uint32_t mode, uint32_t msg_size,
466             char *errbuf) {
467 4 50         if (errbuf) errbuf[0] = '\0';
468              
469 4           uint32_t cap = pubsub_next_pow2(capacity);
470 4 50         if (cap == 0) { PUBSUB_ERR("invalid capacity"); return NULL; }
    0          
471 4 50         if (mode > PUBSUB_MODE_INT16) { PUBSUB_ERR("unknown mode %u", mode); return NULL; }
    0          
472              
473 4 100         if (mode == PUBSUB_MODE_STR && msg_size == 0)
    50          
474 1           msg_size = PUBSUB_DEFAULT_MSG_SIZE;
475              
476             uint64_t slots_off, data_off, arena_cap, total_size;
477 4           pubsub_calc_layout(cap, mode, msg_size, &slots_off, &data_off, &arena_cap, &total_size);
478              
479 4 50         int fd = memfd_create(name ? name : "pubsub", MFD_CLOEXEC);
480 4 50         if (fd < 0) { PUBSUB_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
481              
482 4 50         if (ftruncate(fd, (off_t)total_size) < 0) {
483 0 0         PUBSUB_ERR("ftruncate(memfd): %s", strerror(errno));
484 0           close(fd); return NULL;
485             }
486              
487 4           void *base = mmap(NULL, (size_t)total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
488 4 50         if (base == MAP_FAILED) {
489 0 0         PUBSUB_ERR("mmap(memfd): %s", strerror(errno));
490 0           close(fd); return NULL;
491             }
492              
493 4           pubsub_init_header(base, mode, cap, total_size, slots_off, data_off,
494             msg_size, arena_cap);
495              
496 4           PubSubHandle *h = pubsub_init_handle(base, (size_t)total_size, mode, NULL);
497 4 50         if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
498 4           h->backing_fd = fd;
499 4           return h;
500             }
501              
502 4           static PubSubHandle *pubsub_open_fd(int fd, uint32_t mode, char *errbuf) {
503 4 50         if (errbuf) errbuf[0] = '\0';
504              
505             struct stat st;
506 4 50         if (fstat(fd, &st) < 0) {
507 0 0         PUBSUB_ERR("fstat(fd=%d): %s", fd, strerror(errno));
508 0           return NULL;
509             }
510              
511 4 50         if ((uint64_t)st.st_size < sizeof(PubSubHeader)) {
512 0 0         PUBSUB_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
513 0           return NULL;
514             }
515              
516 4           size_t map_size = (size_t)st.st_size;
517 4           void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
518 4 50         if (base == MAP_FAILED) {
519 0 0         PUBSUB_ERR("mmap(fd=%d): %s", fd, strerror(errno));
520 0           return NULL;
521             }
522              
523 4 50         if (!pubsub_validate_header((PubSubHeader *)base, mode, (uint64_t)st.st_size)) {
524 0 0         PUBSUB_ERR("fd %d: invalid or incompatible pubsub", fd);
525 0           munmap(base, map_size);
526 0           return NULL;
527             }
528              
529 4           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
530 4 50         if (myfd < 0) {
531 0 0         PUBSUB_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
532 0           munmap(base, map_size);
533 0           return NULL;
534             }
535              
536 4           PubSubHandle *h = pubsub_init_handle(base, map_size, mode, NULL);
537 4 50         if (!h) { munmap(base, map_size); close(myfd); return NULL; }
538 4           h->backing_fd = myfd;
539 4           return h;
540             }
541              
542 136           static void pubsub_destroy(PubSubHandle *h) {
543 136 50         if (!h) return;
544 136 100         if (h->notify_fd >= 0) close(h->notify_fd);
545 136 100         if (h->backing_fd >= 0) close(h->backing_fd);
546 136 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
547 136           free(h->path);
548 136           free(h);
549             }
550              
551             /* ================================================================
552             * Subscribe
553             * ================================================================ */
554              
555 98           static PubSubSub *pubsub_subscribe(PubSubHandle *h, int from_oldest) {
556 98           PubSubSub *sub = (PubSubSub *)calloc(1, sizeof(PubSubSub));
557 98 50         if (!sub) return NULL;
558              
559 98           sub->hdr = h->hdr;
560 98           sub->slots = h->slots;
561 98           sub->data = h->data;
562 98           sub->capacity = h->capacity;
563 98           sub->cap_mask = h->cap_mask;
564 98           sub->msg_size = h->msg_size;
565              
566 98           sub->notify_fd = h->notify_fd;
567              
568 98           uint64_t wp = __atomic_load_n(&h->hdr->write_pos, __ATOMIC_ACQUIRE);
569 98 100         if (from_oldest && wp > h->capacity)
    100          
570 1           sub->cursor = wp - h->capacity;
571 97 100         else if (from_oldest)
572 63           sub->cursor = 0;
573             else
574 34           sub->cursor = wp;
575              
576 98           return sub;
577             }
578              
579 98           static void pubsub_sub_destroy(PubSubSub *sub) {
580 98 50         if (!sub) return;
581 98           free(sub->copy_buf);
582 98           free(sub);
583             }
584              
585             /* ================================================================
586             * Common: lag (shared between Int and Str)
587             * ================================================================ */
588              
589 26           static inline uint64_t pubsub_lag(PubSubSub *sub) {
590 26           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
591 26 100         return (wp > sub->cursor) ? (wp - sub->cursor) : 0;
592             }
593              
594             /* ================================================================
595             * Int publish/poll macro template
596             *
597             * DEFINE_INT_PUBSUB(prefix, SlotType, ValType, SeqType, DiffType)
598             * generates: pubsub__publish, _publish_multi, _poll, _poll_wait
599             * ================================================================ */
600              
601             #define DEFINE_INT_PUBSUB(PFX, SLOT, VTYPE, STYPE, DTYPE) \
602             \
603             static inline int pubsub_##PFX##_publish(PubSubHandle *h, VTYPE value) { \
604             PubSubHeader *hdr = h->hdr; \
605             SLOT *slots = (SLOT *)h->slots; \
606             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, 1, __ATOMIC_RELAXED); \
607             uint32_t idx = pos & h->cap_mask; \
608             slots[idx].value = value; \
609             __atomic_store_n(&slots[idx].sequence, (STYPE)(pos + 1), __ATOMIC_RELEASE);\
610             __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED); \
611             pubsub_wake_subscribers(hdr); \
612             return 1; \
613             } \
614             \
615             static inline uint32_t pubsub_##PFX##_publish_multi(PubSubHandle *h, \
616             const VTYPE *values, uint32_t count) { \
617             PubSubHeader *hdr = h->hdr; \
618             SLOT *slots = (SLOT *)h->slots; \
619             uint32_t mask = h->cap_mask; \
620             uint64_t pos = __atomic_fetch_add(&hdr->write_pos, count, __ATOMIC_RELAXED);\
621             for (uint32_t i = 0; i < count; i++) { \
622             uint32_t idx = (pos + i) & mask; \
623             slots[idx].value = values[i]; \
624             __atomic_store_n(&slots[idx].sequence, \
625             (STYPE)(pos + i + 1), __ATOMIC_RELEASE); \
626             } \
627             __atomic_add_fetch(&hdr->stat_publish_ok, count, __ATOMIC_RELAXED); \
628             pubsub_wake_subscribers(hdr); \
629             return count; \
630             } \
631             \
632             static inline int pubsub_##PFX##_poll(PubSubSub *sub, VTYPE *value) { \
633             PubSubHeader *hdr = sub->hdr; \
634             SLOT *slots = (SLOT *)sub->slots; \
635             for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) { \
636             uint64_t cursor = sub->cursor; \
637             uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE); \
638             if (cursor >= wp) return 0; \
639             if (wp - cursor > sub->capacity) { \
640             sub->overflow_count += wp - cursor - sub->capacity; \
641             sub->cursor = wp - sub->capacity; \
642             continue; \
643             } \
644             uint32_t idx = cursor & sub->cap_mask; \
645             SLOT *slot = &slots[idx]; \
646             STYPE seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
647             DTYPE diff = (DTYPE)seq1 - (DTYPE)(STYPE)(cursor + 1); \
648             if (diff != 0) { \
649             if (diff > 0) { \
650             uint64_t nc = wp > sub->capacity ? wp - sub->capacity : 0; \
651             if (nc > cursor) sub->overflow_count += nc - cursor; \
652             sub->cursor = nc; \
653             continue; \
654             } \
655             return 0; \
656             } \
657             VTYPE v = slot->value; \
658             STYPE seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
659             if (seq2 != seq1) continue; \
660             *value = v; \
661             sub->cursor = cursor + 1; \
662             return 1; \
663             } \
664             return 0; \
665             } \
666             \
667             static int pubsub_##PFX##_poll_wait(PubSubSub *sub, VTYPE *value, \
668             double timeout) { \
669             int r = pubsub_##PFX##_poll(sub, value); \
670             if (r != 0) return r; \
671             if (timeout == 0.0) return 0; \
672             PubSubHeader *hdr = sub->hdr; \
673             struct timespec deadline, remaining; \
674             int has_deadline = (timeout > 0); \
675             if (has_deadline) pubsub_make_deadline(timeout, &deadline); \
676             for (;;) { \
677             uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE); \
678             r = pubsub_##PFX##_poll(sub, value); \
679             if (r != 0) return r; \
680             __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
681             struct timespec *pts = NULL; \
682             if (has_deadline) { \
683             if (!pubsub_remaining_time(&deadline, &remaining)) { \
684             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
685             return 0; \
686             } \
687             pts = &remaining; \
688             } \
689             long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT, \
690             fseq, pts, NULL, 0); \
691             __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE); \
692             r = pubsub_##PFX##_poll(sub, value); \
693             if (r != 0) return r; \
694             if (rc == -1 && errno == ETIMEDOUT) return 0; \
695             } \
696             }
697              
698             /* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
699 1907 100         DEFINE_INT_PUBSUB(int, PubSubIntSlot, int64_t, uint64_t, int64_t)
  17 50          
  1212 50          
  212 50          
  466 50          
    50          
    100          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
700              
701             /* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
702 93 100         DEFINE_INT_PUBSUB(int32, PubSubInt32Slot, int32_t, uint32_t, int32_t)
  3 50          
  37 50          
  4 50          
  49 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
703              
704             /* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
705 92 100         DEFINE_INT_PUBSUB(int16, PubSubInt16Slot, int16_t, uint32_t, int32_t)
  3 50          
  37 50          
  4 50          
  48 50          
    50          
    50          
    50          
    50          
    100          
    100          
    50          
    0          
    0          
    0          
    50          
    50          
    100          
706              
707             /* ================================================================
708             * Str: mutex-protected publish, lock-free subscribe
709             *
710             * Variable-length messages stored in a circular arena. Each slot
711             * records the arena offset; the seqlock (sequence double-check)
712             * guarantees readers see consistent data.
713             * ================================================================ */
714              
715             /* Publish one Str message while mutex is already held (no lock/wake). */
716 194           static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
717             uint32_t len, bool utf8) {
718 194 100         if (len > h->msg_size) return -1;
719              
720 193           PubSubHeader *hdr = h->hdr;
721 193           PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;
722              
723 193           uint64_t pos = hdr->write_pos;
724 193           uint32_t idx = pos & h->cap_mask;
725 193           PubSubStrSlot *slot = &slots[idx];
726              
727 193           __atomic_store_n(&slot->sequence, 0, __ATOMIC_RELAXED);
728 193           __atomic_thread_fence(__ATOMIC_RELEASE);
729              
730 193           uint32_t alloc = (len + 7) & ~7u;
731 193 100         if (alloc == 0) alloc = 8;
732 193           uint32_t apos = __atomic_load_n(&hdr->arena_wpos, __ATOMIC_RELAXED);
733 193 100         if ((uint64_t)apos + alloc > h->arena_cap)
734 1           apos = 0;
735              
736 193           memcpy(h->data + apos, str, len);
737              
738 193           slot->arena_off = apos;
739 193 100         slot->packed_len = len | (utf8 ? PUBSUB_STR_UTF8_FLAG : 0);
740              
741 193           __atomic_store_n(&hdr->arena_wpos, apos + alloc, __ATOMIC_RELAXED);
742              
743 193           __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
744 193           __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELAXED);
745 193           __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);
746              
747 193           return 1;
748             }
749              
750 183           static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
751             uint32_t len, bool utf8) {
752 183 100         if (len > h->msg_size) return -1;
753 181           pubsub_mutex_lock(h->hdr);
754 181           int r = pubsub_str_publish_locked(h, str, len, utf8);
755 181           pubsub_mutex_unlock(h->hdr);
756 181 50         if (r == 1) pubsub_wake_subscribers(h->hdr);
757 181           return r;
758             }
759              
760             /* Returns: 1 = success, 0 = empty/not-ready */
761 132           static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
762             uint32_t *out_len, bool *out_utf8) {
763 132           PubSubHeader *hdr = sub->hdr;
764 132           PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;
765              
766 134 50         for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) {
767 134           uint64_t cursor = sub->cursor;
768 134           uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE);
769              
770 134 100         if (cursor >= wp) return 0;
771              
772 113 100         if (wp - cursor > sub->capacity) {
773 2           sub->overflow_count += wp - cursor - sub->capacity;
774 2           sub->cursor = wp - sub->capacity;
775 2           continue;
776             }
777              
778 111           uint32_t idx = cursor & sub->cap_mask;
779 111           PubSubStrSlot *slot = &slots[idx];
780              
781 111           uint64_t seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
782 111 50         if (seq1 != cursor + 1) {
783 0 0         if (seq1 > cursor + 1) {
784 0 0         uint64_t new_cursor = wp > sub->capacity ? wp - sub->capacity : 0;
785 0 0         if (new_cursor > cursor)
786 0           sub->overflow_count += new_cursor - cursor;
787 0           sub->cursor = new_cursor;
788 0           continue;
789             }
790 0           return 0;
791             }
792              
793 111           uint32_t plen = slot->packed_len;
794 111           uint32_t aoff = slot->arena_off;
795 111           uint32_t len = plen & PUBSUB_STR_LEN_MASK;
796 111           bool utf8 = (plen & PUBSUB_STR_UTF8_FLAG) != 0;
797              
798             /* Safety: if metadata looks corrupted, retry */
799 111 50         if (len > sub->msg_size) continue;
800 111 50         if ((uint64_t)aoff + len > sub->hdr->arena_cap) continue;
801              
802 111 50         if (!pubsub_ensure_copy_buf(sub, len + 1)) return 0;
803              
804 111 100         if (len > 0)
805 108           memcpy(sub->copy_buf, sub->data + aoff, len);
806 111           sub->copy_buf[len] = '\0';
807              
808 111           uint64_t seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
809 111 50         if (seq2 != seq1) continue;
810              
811 111           *out_str = sub->copy_buf;
812 111           *out_len = len;
813 111           *out_utf8 = utf8;
814 111           sub->cursor = cursor + 1;
815 111           return 1;
816             }
817 0           return 0;
818             }
819              
820 55           static int pubsub_str_poll_wait(PubSubSub *sub, const char **out_str,
821             uint32_t *out_len, bool *out_utf8,
822             double timeout) {
823 55           int r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
824 55 100         if (r != 0) return r;
825 5 50         if (timeout == 0.0) return 0;
826              
827 5           PubSubHeader *hdr = sub->hdr;
828             struct timespec deadline, remaining;
829 5           int has_deadline = (timeout > 0);
830 5 50         if (has_deadline) pubsub_make_deadline(timeout, &deadline);
831              
832 0           for (;;) {
833 5           uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE);
834 5           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
835 5 50         if (r != 0) return r;
836              
837 5           __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
838 5           struct timespec *pts = NULL;
839 5 50         if (has_deadline) {
840 5 50         if (!pubsub_remaining_time(&deadline, &remaining)) {
841 0           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
842 0           return 0;
843             }
844 5           pts = &remaining;
845             }
846 5           long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT,
847             fseq, pts, NULL, 0);
848 5           __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_RELEASE);
849              
850 5           r = pubsub_str_poll(sub, out_str, out_len, out_utf8);
851 5 100         if (r != 0) return r;
852 1 50         if (rc == -1 && errno == ETIMEDOUT) return 0;
    50          
853             }
854             }
855              
856             /* ================================================================
857             * Common operations
858             * ================================================================ */
859              
860 8           static void pubsub_clear(PubSubHandle *h) {
861 8           PubSubHeader *hdr = h->hdr;
862 8 100         if (hdr->mode == PUBSUB_MODE_STR)
863 2           pubsub_mutex_lock(hdr);
864              
865 8           __atomic_store_n(&hdr->write_pos, 0, __ATOMIC_RELAXED);
866 8           __atomic_store_n(&hdr->stat_publish_ok, 0, __ATOMIC_RELAXED);
867 8 100         if (hdr->mode == PUBSUB_MODE_STR)
868 2           __atomic_store_n(&hdr->arena_wpos, 0, __ATOMIC_RELAXED);
869              
870             /* Zero all slot sequences */
871 8           uint32_t cap = h->capacity;
872 8 100         if (hdr->mode == PUBSUB_MODE_INT) {
873 4           PubSubIntSlot *s = (PubSubIntSlot *)h->slots;
874 260 100         for (uint32_t i = 0; i < cap; i++)
875 256           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
876 4 100         } else if (hdr->mode == PUBSUB_MODE_INT32) {
877 1           PubSubInt32Slot *s = (PubSubInt32Slot *)h->slots;
878 65 100         for (uint32_t i = 0; i < cap; i++)
879 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
880 3 100         } else if (hdr->mode == PUBSUB_MODE_INT16) {
881 1           PubSubInt16Slot *s = (PubSubInt16Slot *)h->slots;
882 65 100         for (uint32_t i = 0; i < cap; i++)
883 64           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
884             } else {
885 2           PubSubStrSlot *s = (PubSubStrSlot *)h->slots;
886 82 100         for (uint32_t i = 0; i < cap; i++)
887 80           __atomic_store_n(&s[i].sequence, 0, __ATOMIC_RELAXED);
888             }
889              
890 8 100         if (hdr->mode == PUBSUB_MODE_STR)
891 2           pubsub_mutex_unlock(hdr);
892 8           pubsub_wake_subscribers(hdr);
893 8           }
894              
895 7           static inline int pubsub_sync(PubSubHandle *h) {
896 7           return msync(h->hdr, h->mmap_size, MS_SYNC);
897             }
898              
899 13           static inline int pubsub_eventfd_create(PubSubHandle *h) {
900 13 50         if (h->notify_fd >= 0) return h->notify_fd;
901 13           h->notify_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
902 13           return h->notify_fd;
903             }
904              
905 1           static inline void pubsub_eventfd_set(PubSubHandle *h, int fd) {
906 1 50         if (h->notify_fd >= 0 && h->notify_fd != fd)
    50          
907 0           close(h->notify_fd);
908 1           h->notify_fd = fd;
909 1           }
910              
911 8           static inline void pubsub_notify(PubSubHandle *h) {
912 8 50         if (h->notify_fd >= 0) {
913 8           uint64_t one = 1;
914 8           ssize_t __attribute__((unused)) rc = write(h->notify_fd, &one, sizeof(one));
915             }
916 8           }
917              
918 6           static inline void pubsub_eventfd_consume(PubSubHandle *h) {
919 6 50         if (h->notify_fd >= 0) {
920             uint64_t val;
921 6           ssize_t __attribute__((unused)) rc = read(h->notify_fd, &val, sizeof(val));
922             }
923 6           }
924              
925 4           static inline void pubsub_sub_eventfd_consume(PubSubSub *sub) {
926 4 50         if (sub->notify_fd >= 0) {
927             uint64_t val;
928 4           ssize_t __attribute__((unused)) rc = read(sub->notify_fd, &val, sizeof(val));
929             }
930 4           }
931              
932             #endif /* PUBSUB_H */