File Coverage

ring.h
Criterion Covered Total %
statement 172 182 94.5
branch 82 176 46.5
condition n/a
subroutine n/a
pod n/a
total 254 358 70.9


line stmt bran cond sub pod time code
1             /*
2             * ring.h -- Shared-memory fixed-size ring buffer for Linux
3             *
4             * Lock-free circular buffer: writes overwrite oldest when full.
5             * Readers access by relative position (0=latest) or absolute sequence.
6             * No consumer tracking — data persists until overwritten.
7             *
8             * Unlike Queue (consumed on read) or PubSub (subscription tracking),
9             * RingBuffer is a simple overwriting circular window.
10             */
11              
12             #ifndef RING_H
13             #define RING_H
14              
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28              
29             #define RING_MAGIC 0x524E4731U /* "RNG1" */
30             #define RING_VERSION 1
31             #define RING_ERR_BUFLEN 256
32              
33             #define RING_VAR_INT 0
34             #define RING_VAR_F64 1
35              
36             /* ================================================================
37             * Header (128 bytes)
38             * ================================================================ */
39              
40             typedef struct {
41             uint32_t magic;
42             uint32_t version;
43             uint32_t elem_size;
44             uint32_t variant_id;
45             uint64_t capacity;
46             uint64_t total_size;
47             uint64_t data_off;
48             uint8_t _pad0[24];
49              
50             uint64_t head; /* 64: monotonic write cursor (next write position) */
51             uint64_t count; /* 72: total writes (for overwrite detection) */
52             uint32_t waiters; /* 80: blocked on new data */
53             uint32_t _pad1; /* 84 */
54             uint64_t stat_writes; /* 88 */
55             uint64_t stat_overwrites; /* 96 */
56             uint8_t _pad2[24]; /* 104-127 */
57             } RingHeader;
58              
59             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
60             _Static_assert(sizeof(RingHeader) == 128, "RingHeader must be 128 bytes");
61             #endif
62              
63             typedef struct {
64             RingHeader *hdr;
65             uint8_t *data;
66             size_t mmap_size;
67             uint32_t elem_size;
68             char *path;
69             int notify_fd;
70             int backing_fd;
71             } RingHandle;
72              
73             /* ================================================================
74             * Slot access
75             * ================================================================ */
76              
77 32           static inline uint8_t *ring_slot(RingHandle *h, uint64_t seq) {
78 32           return h->data + (seq % h->hdr->capacity) * h->elem_size;
79             }
80              
81             /* ================================================================
82             * Write — overwrites oldest when full, always succeeds
83             * ================================================================ */
84              
85 13           static inline uint64_t ring_write(RingHandle *h, const void *val, uint32_t vlen) {
86 13           RingHeader *hdr = h->hdr;
87             /* CAS to claim a slot */
88             uint64_t pos;
89             for (;;) {
90 0           pos = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED);
91 13 50         if (__atomic_compare_exchange_n(&hdr->head, &pos, pos + 1,
92             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
93 13           break;
94             }
95             /* Write data */
96 13           uint32_t sz = h->elem_size;
97 13           uint32_t cp = vlen < sz ? vlen : sz;
98 13           memcpy(ring_slot(h, pos), val, cp);
99 13 50         if (cp < sz) memset(ring_slot(h, pos) + cp, 0, sz - cp);
100 13           __atomic_thread_fence(__ATOMIC_RELEASE);
101              
102             /* Track overwrites */
103 13           uint64_t cnt = __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
104 13 100         if (cnt > hdr->capacity)
105 1           __atomic_add_fetch(&hdr->stat_overwrites, 1, __ATOMIC_RELAXED);
106 13           __atomic_add_fetch(&hdr->stat_writes, 1, __ATOMIC_RELAXED);
107              
108             /* Wake readers */
109 13 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
110 0           syscall(SYS_futex, &hdr->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
111              
112 13           return pos;
113             }
114              
115             /* ================================================================
116             * Read — by relative position (0=latest) or absolute sequence
117             * ================================================================ */
118              
119             /* Read the nth most recent value (0=latest, 1=previous, ...).
120             * Returns 1 on success, 0 if n >= available entries. */
121 17           static inline int ring_read_latest(RingHandle *h, uint32_t n, void *out) {
122 17           uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE);
123 17 50         if (head == 0) return 0;
124 17           uint64_t avail = head < h->hdr->capacity ? head : h->hdr->capacity;
125 17 100         if (n >= avail) return 0;
126 16           uint64_t seq = head - 1 - n;
127 16           memcpy(out, ring_slot(h, seq), h->elem_size);
128 16           return 1;
129             }
130              
131             /* Read by absolute sequence number. Returns 1 if still in buffer. */
132 5           static inline int ring_read_seq(RingHandle *h, uint64_t seq, void *out) {
133 5           uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE);
134 5 100         if (seq >= head) return 0; /* not yet written */
135 4 100         uint64_t oldest = (head > h->hdr->capacity) ? head - h->hdr->capacity : 0;
136 4 100         if (seq < oldest) return 0; /* already overwritten */
137 3           memcpy(out, ring_slot(h, seq), h->elem_size);
138 3           return 1;
139             }
140              
141             /* ================================================================
142             * Status
143             * ================================================================ */
144              
145 2           static inline uint64_t ring_head(RingHandle *h) {
146 2           return __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE);
147             }
148              
149 9           static inline uint64_t ring_size(RingHandle *h) {
150 9           uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE);
151 9           return head < h->hdr->capacity ? head : h->hdr->capacity;
152             }
153              
154             /* ================================================================
155             * Wait — block until new data arrives
156             * ================================================================ */
157              
158 2           static inline void ring_make_deadline(double t, struct timespec *dl) {
159 2           clock_gettime(CLOCK_MONOTONIC, dl);
160 2           dl->tv_sec += (time_t)t;
161 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
162 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
163 2           }
164              
165 3           static inline int ring_remaining(const struct timespec *dl, struct timespec *rem) {
166             struct timespec now;
167 3           clock_gettime(CLOCK_MONOTONIC, &now);
168 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
169 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
170 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
171 3           return rem->tv_sec >= 0;
172             }
173              
174 2           static inline int ring_wait(RingHandle *h, uint64_t expected_count, double timeout) {
175 2 50         if (__atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE) != expected_count) return 1;
176 2 50         if (timeout == 0) return 0;
177              
178             struct timespec dl, rem;
179 2           int has_dl = (timeout > 0);
180 2 50         if (has_dl) ring_make_deadline(timeout, &dl);
181              
182 0           for (;;) {
183 2           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELEASE);
184 2           uint64_t cur = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
185 2 50         if (cur == expected_count) {
186 2           struct timespec *pts = NULL;
187 2 50         if (has_dl) {
188 2 50         if (!ring_remaining(&dl, &rem)) {
189 0           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
190 0           return 0;
191             }
192 2           pts = &rem;
193             }
194 2           syscall(SYS_futex, &h->hdr->count, FUTEX_WAIT,
195             (uint32_t)(cur & 0xFFFFFFFF), pts, NULL, 0);
196             }
197 2           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
198 2 100         if (__atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE) != expected_count) return 1;
199 1 50         if (has_dl && !ring_remaining(&dl, &rem)) return 0;
    50          
200             }
201             }
202              
203             /* ================================================================
204             * Create / Open / Close
205             * ================================================================ */
206              
207             #define RING_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, RING_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
208              
209 6           static inline void ring_init_header(void *base, uint64_t total,
210             uint32_t elem_size, uint32_t variant_id,
211             uint64_t capacity) {
212 6           RingHeader *hdr = (RingHeader *)base;
213 6           memset(base, 0, (size_t)total);
214 6           hdr->magic = RING_MAGIC;
215 6           hdr->version = RING_VERSION;
216 6           hdr->elem_size = elem_size;
217 6           hdr->variant_id = variant_id;
218 6           hdr->capacity = capacity;
219 6           hdr->total_size = total;
220 6           hdr->data_off = sizeof(RingHeader);
221 6           __atomic_thread_fence(__ATOMIC_SEQ_CST);
222 6           }
223              
224 8           static inline RingHandle *ring_setup(void *base, size_t ms, const char *path, int bfd) {
225 8           RingHeader *hdr = (RingHeader *)base;
226 8           RingHandle *h = (RingHandle *)calloc(1, sizeof(RingHandle));
227 8 50         if (!h) { munmap(base, ms); return NULL; }
228 8           h->hdr = hdr;
229 8           h->data = (uint8_t *)base + hdr->data_off;
230 8           h->mmap_size = ms;
231 8           h->elem_size = hdr->elem_size;
232 8 100         h->path = path ? strdup(path) : NULL;
233 8           h->notify_fd = -1;
234 8           h->backing_fd = bfd;
235 8           return h;
236             }
237              
238 6           static RingHandle *ring_create(const char *path, uint64_t capacity,
239             uint32_t elem_size, uint32_t variant_id,
240             char *errbuf) {
241 6 50         if (errbuf) errbuf[0] = '\0';
242 6 50         if (capacity == 0) { RING_ERR("capacity must be > 0"); return NULL; }
    0          
243 6 50         if (elem_size == 0) { RING_ERR("elem_size must be > 0"); return NULL; }
    0          
244 6 50         if (capacity > (UINT64_MAX - sizeof(RingHeader)) / elem_size) {
245 0 0         RING_ERR("capacity * elem_size overflow"); return NULL;
246             }
247              
248 6           uint64_t total = sizeof(RingHeader) + capacity * elem_size;
249 6           int anonymous = (path == NULL);
250 6           int fd = -1;
251             size_t map_size;
252             void *base;
253              
254 6 100         if (anonymous) {
255 3           map_size = (size_t)total;
256 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
257 3 50         if (base == MAP_FAILED) { RING_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
258             } else {
259 3           fd = open(path, O_RDWR|O_CREAT, 0666);
260 4 50         if (fd < 0) { RING_ERR("open: %s", strerror(errno)); return NULL; }
    0          
261 3 50         if (flock(fd, LOCK_EX) < 0) { RING_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
262             struct stat st;
263 3 50         if (fstat(fd, &st) < 0) { RING_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
264 3           int is_new = (st.st_size == 0);
265 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
266 0 0         RING_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
267             }
268 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
269 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
270 3 50         if (base == MAP_FAILED) { RING_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
271 3 100         if (!is_new) {
272 1           RingHeader *hdr = (RingHeader *)base;
273 1 50         if (hdr->magic != RING_MAGIC || hdr->version != RING_VERSION ||
    50          
274 1 50         hdr->variant_id != variant_id ||
275 1 50         hdr->total_size != (uint64_t)st.st_size) {
276 0 0         RING_ERR("invalid ring file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
277             }
278 1           flock(fd, LOCK_UN); close(fd);
279 1           return ring_setup(base, map_size, path, -1);
280             }
281             }
282 5           ring_init_header(base, total, elem_size, variant_id, capacity);
283 5 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
284 5           return ring_setup(base, map_size, path, -1);
285             }
286              
287 1           static RingHandle *ring_create_memfd(const char *name, uint64_t capacity,
288             uint32_t elem_size, uint32_t variant_id,
289             char *errbuf) {
290 1 50         if (errbuf) errbuf[0] = '\0';
291 1 50         if (capacity == 0) { RING_ERR("capacity must be > 0"); return NULL; }
    0          
292 1 50         if (elem_size == 0) { RING_ERR("elem_size must be > 0"); return NULL; }
    0          
293 1 50         if (capacity > (UINT64_MAX - sizeof(RingHeader)) / elem_size) {
294 0 0         RING_ERR("capacity * elem_size overflow"); return NULL;
295             }
296 1           uint64_t total = sizeof(RingHeader) + capacity * elem_size;
297 1 50         int fd = memfd_create(name ? name : "ring", MFD_CLOEXEC);
298 1 50         if (fd < 0) { RING_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
299 1 50         if (ftruncate(fd, (off_t)total) < 0) { RING_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
300 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
301 1 50         if (base == MAP_FAILED) { RING_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
302 1           ring_init_header(base, total, elem_size, variant_id, capacity);
303 1           return ring_setup(base, (size_t)total, NULL, fd);
304             }
305              
306 1           static RingHandle *ring_open_fd(int fd, uint32_t variant_id, char *errbuf) {
307 1 50         if (errbuf) errbuf[0] = '\0';
308             struct stat st;
309 1 50         if (fstat(fd, &st) < 0) { RING_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
310 1 50         if ((uint64_t)st.st_size < sizeof(RingHeader)) { RING_ERR("too small"); return NULL; }
    0          
311 1           size_t ms = (size_t)st.st_size;
312 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
313 1 50         if (base == MAP_FAILED) { RING_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
314 1           RingHeader *hdr = (RingHeader *)base;
315 1 50         if (hdr->magic != RING_MAGIC || hdr->version != RING_VERSION ||
    50          
316 1 50         hdr->variant_id != variant_id ||
317 1 50         hdr->total_size != (uint64_t)st.st_size) {
318 0 0         RING_ERR("invalid ring"); munmap(base, ms); return NULL;
319             }
320 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
321 1 50         if (myfd < 0) { RING_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
322 1           return ring_setup(base, ms, NULL, myfd);
323             }
324              
325 8           static void ring_destroy(RingHandle *h) {
326 8 50         if (!h) return;
327 8 100         if (h->notify_fd >= 0) close(h->notify_fd);
328 8 100         if (h->backing_fd >= 0) close(h->backing_fd);
329 8 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
330 8           free(h->path);
331 8           free(h);
332             }
333              
334 1           static void ring_clear(RingHandle *h) {
335 1           __atomic_store_n(&h->hdr->head, 0, __ATOMIC_RELEASE);
336 1           __atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE);
337 1           }
338              
339 1           static int ring_create_eventfd(RingHandle *h) {
340 1 50         if (h->notify_fd >= 0) close(h->notify_fd);
341 1           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
342 1 50         if (efd < 0) return -1;
343 1           h->notify_fd = efd; return efd;
344             }
345 1           static int ring_notify(RingHandle *h) {
346 1 50         if (h->notify_fd < 0) return 0;
347 1           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
348             }
349 1           static int64_t ring_eventfd_consume(RingHandle *h) {
350 1 50         if (h->notify_fd < 0) return -1;
351 1           uint64_t v = 0;
352 1 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
353 1           return (int64_t)v;
354             }
355 1           static void ring_msync(RingHandle *h) { msync(h->hdr, h->mmap_size, MS_SYNC); }
356              
357             #endif /* RING_H */