File Coverage

log.h
Criterion Covered Total %
statement 190 204 93.1
branch 89 188 47.3
condition n/a
subroutine n/a
pod n/a
total 279 392 71.1


line stmt bran cond sub pod time code
1             /*
2             * log.h -- Append-only shared-memory log (WAL) for Linux
3             *
4             * Multiple writers append variable-length entries via CAS on tail offset.
5             * Readers replay from any offset. Entries persist until explicit reset.
6             *
7             * Entry format: [uint32_t length][data bytes][padding to 4-byte alignment].
8             * length is written AFTER data, acting as a commit flag (0 = uncommitted).
9             * Padding ensures the next uint32_t length is naturally aligned — required
10             * for atomic load/store on strict-alignment ISAs (ARM64 LDAR/STLR trap on
11             * unaligned addresses with SIGBUS).
12             */
13              
14             #ifndef LOG_H
15             #define LOG_H
16              
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #include
31             #include
32              
33             #define LOG_MAGIC 0x4C4F4731U /* "LOG1" */
34             #define LOG_VERSION 1
35             #define LOG_ERR_BUFLEN 256
36             #define LOG_ENTRY_HDR sizeof(uint32_t)
37              
38             /* ================================================================
39             * Header (128 bytes)
40             * ================================================================ */
41              
42             typedef struct {
43             uint32_t magic;
44             uint32_t version;
45             uint64_t data_size; /* 8: usable data region size */
46             uint64_t total_size; /* 16 */
47             uint64_t data_off; /* 24 */
48             uint8_t _pad0[32]; /* 32-63 */
49              
50             uint64_t tail; /* 64: byte offset past last entry (CAS target) */
51             uint64_t count; /* 72: number of committed entries */
52             uint32_t waiters; /* 80: blocked tailers */
53             uint32_t wake_seq; /* 84: FUTEX_WAIT target (avoids 64-bit count wraparound) */
54             uint64_t stat_appends; /* 88 */
55             uint64_t stat_waits; /* 96 */
56             uint64_t stat_timeouts; /* 104 */
57             uint64_t truncation; /* 112: entries before this offset are invalid */
58             uint8_t _pad2[8]; /* 120-127 */
59             } LogHeader;
60              
61             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
62             _Static_assert(sizeof(LogHeader) == 128, "LogHeader must be 128 bytes");
63             #endif
64              
65             typedef struct {
66             LogHeader *hdr;
67             uint8_t *data;
68             size_t mmap_size;
69             char *path;
70             int notify_fd;
71             int backing_fd;
72             } LogHandle;
73              
74             /* ================================================================
75             * Utility
76             * ================================================================ */
77              
78 2           static inline void log_make_deadline(double t, struct timespec *dl) {
79 2           clock_gettime(CLOCK_MONOTONIC, dl);
80 2           dl->tv_sec += (time_t)t;
81 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
82 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
83 2           }
84              
85 3           static inline int log_remaining(const struct timespec *dl, struct timespec *rem) {
86             struct timespec now;
87 3           clock_gettime(CLOCK_MONOTONIC, &now);
88 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
89 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
90 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
91 3           return rem->tv_sec >= 0;
92             }
93              
94             /* ================================================================
95             * Append — CAS reserve space, then write data, then commit (len)
96             * ================================================================ */
97              
98 81           static inline int64_t log_append(LogHandle *h, const void *data, uint32_t len) {
99 81 50         if (len == 0) return -1; /* 0 is the uncommitted marker */
100              
101 81           LogHeader *hdr = h->hdr;
102 81 50         if (len > UINT32_MAX - LOG_ENTRY_HDR - 3U) return -1;
103             /* Pad total entry size up to 4-byte boundary so the next entry's
104             * length field is naturally aligned for atomic ops on ARM64. */
105 81           uint32_t entry_size = (LOG_ENTRY_HDR + len + 3U) & ~3U;
106              
107 0           for (;;) {
108 81           uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
109 158 100         if (t + entry_size > hdr->data_size) return -1;
110              
111 77 50         if (__atomic_compare_exchange_n(&hdr->tail, &t, t + entry_size,
112             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
113 77           uint8_t *slot = h->data + t;
114             /* write data first */
115 77           memcpy(slot + LOG_ENTRY_HDR, data, len);
116             /* commit: atomic release store of len — readers acquire-load it. */
117 77           __atomic_store_n((uint32_t *)slot, len, __ATOMIC_RELEASE);
118              
119 77           __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
120 77           __atomic_add_fetch(&hdr->stat_appends, 1, __ATOMIC_RELAXED);
121 77           __atomic_add_fetch(&hdr->wake_seq, 1, __ATOMIC_RELEASE);
122              
123 77 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
124 0           syscall(SYS_futex, &hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
125              
126 77           return (int64_t)t;
127             }
128             }
129             }
130              
131             /* ================================================================
132             * Read — read entry at offset
133             * ================================================================ */
134              
135 240           static inline int log_read(LogHandle *h, uint64_t offset,
136             const uint8_t **out_data, uint32_t *out_len,
137             uint64_t *next_off) {
138 240           uint64_t trunc = __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
139 240 100         if (offset < trunc) return 0; /* truncated */
140 239           uint64_t t = __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
141 239 100         if (offset >= t) return 0;
142 229 50         if (offset + LOG_ENTRY_HDR > h->hdr->data_size) return 0;
143              
144 229           uint8_t *slot = h->data + offset;
145             /* Atomic acquire-load pairs with writer's __atomic_store_n RELEASE. */
146 229           uint32_t len = __atomic_load_n((const uint32_t *)slot, __ATOMIC_ACQUIRE);
147 229 50         if (len == 0) return 0; /* uncommitted */
148              
149 229 50         if (offset + LOG_ENTRY_HDR + len > t) return 0;
150              
151 229           *out_data = slot + LOG_ENTRY_HDR;
152 229           *out_len = len;
153             /* Advance past entry + alignment padding (matches log_append). */
154 229           *next_off = offset + (((uint64_t)LOG_ENTRY_HDR + len + 3U) & ~(uint64_t)3U);
155 229           return 1;
156             }
157              
158             /* ================================================================
159             * Tail / Wait
160             * ================================================================ */
161              
162 4           static inline uint64_t log_tail_offset(LogHandle *h) {
163 4           return __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
164             }
165              
166 19           static inline uint64_t log_entry_count(LogHandle *h) {
167 19           return __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
168             }
169              
170 1           static inline uint64_t log_data_size(LogHandle *h) {
171 1           return h->hdr->data_size;
172             }
173              
174 2           static inline uint64_t log_available(LogHandle *h) {
175 2           return h->hdr->data_size - __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);
176             }
177              
178 2           static inline int log_wait(LogHandle *h, uint64_t expected_count, double timeout) {
179 2 50         if (log_entry_count(h) != expected_count) return 1;
180 2 50         if (timeout == 0) return 0;
181              
182             struct timespec dl, rem;
183 2           int has_dl = (timeout > 0);
184 2 50         if (has_dl) log_make_deadline(timeout, &dl);
185 2           __atomic_add_fetch(&h->hdr->stat_waits, 1, __ATOMIC_RELAXED);
186              
187 0           for (;;) {
188 2           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELEASE);
189 2           uint32_t seq = __atomic_load_n(&h->hdr->wake_seq, __ATOMIC_ACQUIRE);
190 2           uint64_t cur = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
191 2 50         if (cur == expected_count) {
192 2           struct timespec *pts = NULL;
193 2 50         if (has_dl) {
194 2 50         if (!log_remaining(&dl, &rem)) {
195 0           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
196 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
197 0           return 0;
198             }
199 2           pts = &rem;
200             }
201 2           syscall(SYS_futex, &h->hdr->wake_seq, FUTEX_WAIT, seq, pts, NULL, 0);
202             }
203 2           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
204 2 100         if (log_entry_count(h) != expected_count) return 1;
205 1 50         if (has_dl && !log_remaining(&dl, &rem)) {
    50          
206 1           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
207 1           return 0;
208             }
209             }
210             }
211              
212             /* ================================================================
213             * Create / Open / Close
214             * ================================================================ */
215              
216             #define LOG_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, LOG_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
217              
218 12           static inline void log_init_header(void *base, uint64_t total, uint64_t data_size) {
219 12           LogHeader *hdr = (LogHeader *)base;
220 12           memset(base, 0, (size_t)total);
221 12           hdr->magic = LOG_MAGIC;
222 12           hdr->version = LOG_VERSION;
223 12           hdr->data_size = data_size;
224 12           hdr->total_size = total;
225 12           hdr->data_off = sizeof(LogHeader);
226 12           __atomic_thread_fence(__ATOMIC_SEQ_CST);
227 12           }
228              
229             /* Validate a mapped header (shared by log_create reopen and log_open_fd). */
230 2           static inline int log_validate_header(const LogHeader *hdr, uint64_t file_size) {
231 2 50         if (hdr->magic != LOG_MAGIC) return 0;
232 2 50         if (hdr->version != LOG_VERSION) return 0;
233 2 50         if (hdr->data_size == 0) return 0;
234 2 50         if (hdr->data_size > UINT64_MAX - sizeof(LogHeader)) return 0;
235 2 50         if (hdr->total_size != file_size) return 0;
236 2 50         if (hdr->data_off != sizeof(LogHeader)) return 0;
237 2 50         if (hdr->total_size != sizeof(LogHeader) + hdr->data_size) return 0;
238             /* Runtime-state sanity: tail and truncation must not exceed data_size. */
239 2 50         if (hdr->tail > hdr->data_size) return 0;
240 2 50         if (hdr->truncation > hdr->data_size) return 0;
241 2           return 1;
242             }
243              
244 14           static inline LogHandle *log_setup(void *base, size_t ms, const char *path, int bfd) {
245 14           LogHeader *hdr = (LogHeader *)base;
246 14           LogHandle *h = (LogHandle *)calloc(1, sizeof(LogHandle));
247 14 50         if (!h) { munmap(base, ms); return NULL; }
248 14           h->hdr = hdr;
249 14           h->data = (uint8_t *)base + hdr->data_off;
250 14           h->mmap_size = ms;
251 14 100         h->path = path ? strdup(path) : NULL;
252 14           h->notify_fd = -1;
253 14           h->backing_fd = bfd;
254             /* Log is append-only: hint sequential access so the kernel prefetcher
255             * reads ahead on large scans (read_entry, each_entry). Best-effort. */
256 14           (void)madvise(base, ms, MADV_SEQUENTIAL);
257 14           return h;
258             }
259              
260 6           static LogHandle *log_create(const char *path, uint64_t data_size, char *errbuf) {
261 6 50         if (errbuf) errbuf[0] = '\0';
262 6 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
263 6 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
264              
265 6           uint64_t total = sizeof(LogHeader) + data_size;
266 6           int anonymous = (path == NULL);
267 6           int fd = -1;
268             size_t map_size;
269             void *base;
270              
271 6 100         if (anonymous) {
272 3           map_size = (size_t)total;
273 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
274 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
275             } else {
276 3           fd = open(path, O_RDWR|O_CREAT, 0666);
277 4 50         if (fd < 0) { LOG_ERR("open: %s", strerror(errno)); return NULL; }
    0          
278 3 50         if (flock(fd, LOCK_EX) < 0) { LOG_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
279             struct stat st;
280 3 50         if (fstat(fd, &st) < 0) {
281 0 0         LOG_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
282             }
283 3           int is_new = (st.st_size == 0);
284 3 100         if (!is_new && (uint64_t)st.st_size < sizeof(LogHeader)) {
    50          
285 0 0         LOG_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
286 0           flock(fd, LOCK_UN); close(fd); return NULL;
287             }
288 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
289 0 0         LOG_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
290             }
291 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
292 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
293 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
294 3 100         if (!is_new) {
295 1 50         if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size)) {
296 0 0         LOG_ERR("invalid log file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
297             }
298 1           flock(fd, LOCK_UN); close(fd);
299 1           return log_setup(base, map_size, path, -1);
300             }
301             }
302 5           log_init_header(base, total, data_size);
303 5 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
304 5           return log_setup(base, map_size, path, -1);
305             }
306              
307 7           static LogHandle *log_create_memfd(const char *name, uint64_t data_size, char *errbuf) {
308 7 50         if (errbuf) errbuf[0] = '\0';
309 7 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
310 7 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
311 7           uint64_t total = sizeof(LogHeader) + data_size;
312 7 50         int fd = memfd_create(name ? name : "log", MFD_CLOEXEC | MFD_ALLOW_SEALING);
313 7 50         if (fd < 0) { LOG_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
314 7 50         if (ftruncate(fd, (off_t)total) < 0) { LOG_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
315 7           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
316 7           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
317 7 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
318 7           log_init_header(base, total, data_size);
319 7           return log_setup(base, (size_t)total, NULL, fd);
320             }
321              
322 1           static LogHandle *log_open_fd(int fd, char *errbuf) {
323 1 50         if (errbuf) errbuf[0] = '\0';
324             struct stat st;
325 1 50         if (fstat(fd, &st) < 0) { LOG_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
326 1 50         if ((uint64_t)st.st_size < sizeof(LogHeader)) { LOG_ERR("too small"); return NULL; }
    0          
327 1           size_t ms = (size_t)st.st_size;
328 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
329 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
330 1 50         if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size)) {
331 0 0         LOG_ERR("invalid log"); munmap(base, ms); return NULL;
332             }
333 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
334 1 50         if (myfd < 0) { LOG_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
335 1           return log_setup(base, ms, NULL, myfd);
336             }
337              
338 14           static void log_destroy(LogHandle *h) {
339 14 50         if (!h) return;
340 14 100         if (h->notify_fd >= 0) close(h->notify_fd);
341 14 100         if (h->backing_fd >= 0) close(h->backing_fd);
342 14 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
343 14           free(h->path);
344 14           free(h);
345             }
346              
347             /* NOT concurrency-safe — caller must ensure no concurrent access */
348 7           static void log_reset(LogHandle *h) {
349 7           __atomic_store_n(&h->hdr->truncation, 0, __ATOMIC_RELEASE);
350 7           __atomic_store_n(&h->hdr->tail, 0, __ATOMIC_RELEASE);
351 7           __atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE);
352 7           __atomic_add_fetch(&h->hdr->wake_seq, 1, __ATOMIC_RELEASE);
353 7 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
354 0           syscall(SYS_futex, &h->hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
355 7           }
356              
357             /* Concurrency-safe truncate: mark entries before offset as invalid.
358             * Does NOT reclaim space — the log is append-only. Readers skip
359             * entries below the truncation offset. */
360 4           static inline void log_truncate(LogHandle *h, uint64_t offset) {
361 0           for (;;) {
362 4           uint64_t cur = __atomic_load_n(&h->hdr->truncation, __ATOMIC_RELAXED);
363 7 100         if (offset <= cur) return; /* can only advance, not retreat */
364 3 50         if (__atomic_compare_exchange_n(&h->hdr->truncation, &cur, offset,
365             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
366 3           return;
367             }
368             }
369              
370 13           static inline uint64_t log_truncation(LogHandle *h) {
371 13           return __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
372             }
373              
374 2           static int log_create_eventfd(LogHandle *h) {
375 2 50         if (h->notify_fd >= 0) return h->notify_fd;
376 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
377 2 50         if (efd < 0) return -1;
378 2           h->notify_fd = efd; return efd;
379             }
380 2           static int log_notify(LogHandle *h) {
381 2 50         if (h->notify_fd < 0) return 0;
382 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
383             }
384 2           static int64_t log_eventfd_consume(LogHandle *h) {
385 2 50         if (h->notify_fd < 0) return -1;
386 2           uint64_t v = 0;
387 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
388 2           return (int64_t)v;
389             }
390 1           static int log_msync(LogHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
391              
392             #endif /* LOG_H */