File Coverage

log.h
Criterion Covered Total %
statement 200 237 84.3
branch 90 220 40.9
condition n/a
subroutine n/a
pod n/a
total 290 457 63.4


line stmt bran cond sub pod time code
1             /*
2             * log.h -- Append-only shared-memory log (WAL) for Linux
3             *
4             * Multiple writers append variable-length entries via CAS on tail offset.
5             * Readers replay from any offset. Entries persist until explicit reset.
6             *
7             * Entry format: [uint32_t reserve_size][uint32_t length][data][padding to 4B].
8             * reserve_size = total slot size (header + data + padding); written by
9             * the writer immediately after CAS-reserving the slot, before any
10             * data is copied. Lets readers locate the next slot even if the
11             * writer crashes before committing len.
12             * length = data length; written AFTER data via RELEASE store as a
13             * commit flag (0 = uncommitted/abandoned).
14             *
15             * Crash recovery: if a writer dies after CAS but before committing len,
16             * the slot has reserve_size > 0 and len == 0. log_read_ex returns
17             * LOG_READ_ABANDONED for such slots (no data, next_off advances by
18             * reserve_size), letting readers skip past gaps. A bounded spin gives
19             * in-flight writers time to commit before declaring abandonment.
20             *
21             * The narrow window between CAS-reserve and the reserve_size store
22             * (typically a few CPU instructions) is NOT recoverable: if a writer
23             * dies there, the slot's size is unknown and readers cannot skip past
24             * it. Callers must reset/recreate the log. This is extremely rare in
25             * practice given the small instruction window.
26             *
27             * Padding ensures the next slot header is 4-byte aligned — required
28             * for atomic load/store on strict-alignment ISAs (ARM64 LDAR/STLR
29             * trap on unaligned addresses with SIGBUS).
30             */
31              
32             #ifndef LOG_H
33             #define LOG_H
34              
35             #include
36             #include
37             #include
38             #include
39             #include
40             #include
41             #include
42             #include
43             #include
44             #include
45             #include
46             #include
47             #include
48             #include
49             #include
50              
51             #define LOG_MAGIC 0x4C4F4732U /* "LOG2" — v2 entry format with reserve_size */
52             #define LOG_VERSION 2
53             #define LOG_ERR_BUFLEN 256
54             /* Slot header: reserve_size (u32) + len (u32) = 8 bytes. */
55             #define LOG_ENTRY_HDR (2 * sizeof(uint32_t))
56             /* Default bounded-spin (microseconds) for an in-flight writer to commit
57             * len before a reader declares the slot abandoned. Override per call via
58             * log_read_ex; this affects only the rare crash-recovery path. */
59             #ifndef LOG_ABANDON_WAIT_US
60             #define LOG_ABANDON_WAIT_US 2000000 /* 2s */
61             #endif
62              
63             /* log_read / log_read_ex return codes */
64             #define LOG_READ_EMPTY 0 /* no entry at offset (end / uncommitted in flight) */
65             #define LOG_READ_OK 1 /* valid entry — out_data, out_len, next_off set */
66             #define LOG_READ_ABANDONED 2 /* slot abandoned — next_off set, no data */
67              
68             /* ================================================================
69             * Header (128 bytes)
70             * ================================================================ */
71              
72             typedef struct {
73             uint32_t magic;
74             uint32_t version;
75             uint64_t data_size; /* 8: usable data region size */
76             uint64_t total_size; /* 16 */
77             uint64_t data_off; /* 24 */
78             uint8_t _pad0[32]; /* 32-63 */
79              
80             uint64_t tail; /* 64: byte offset past last entry (CAS target) */
81             uint64_t count; /* 72: number of committed entries */
82             uint32_t waiters; /* 80: blocked tailers */
83             uint32_t wake_seq; /* 84: FUTEX_WAIT target (avoids 64-bit count wraparound) */
84             uint64_t stat_appends; /* 88 */
85             uint64_t stat_waits; /* 96 */
86             uint64_t stat_timeouts; /* 104 */
87             uint64_t truncation; /* 112: entries before this offset are invalid */
88             uint8_t _pad2[8]; /* 120-127 */
89             } LogHeader;
90              
91             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
92             _Static_assert(sizeof(LogHeader) == 128, "LogHeader must be 128 bytes");
93             #endif
94              
95             typedef struct {
96             LogHeader *hdr;
97             uint8_t *data;
98             size_t mmap_size;
99             char *path;
100             int notify_fd;
101             int backing_fd;
102             } LogHandle;
103              
104             /* ================================================================
105             * Utility
106             * ================================================================ */
107              
108 2           static inline void log_make_deadline(double t, struct timespec *dl) {
109 2           clock_gettime(CLOCK_MONOTONIC, dl);
110 2           dl->tv_sec += (time_t)t;
111 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
112 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
113 2           }
114              
115 3           static inline int log_remaining(const struct timespec *dl, struct timespec *rem) {
116             struct timespec now;
117 3           clock_gettime(CLOCK_MONOTONIC, &now);
118 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
119 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
120 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
121 3           return rem->tv_sec >= 0;
122             }
123              
124             /* ================================================================
125             * Append — CAS reserve space, publish reserve_size, write data,
126             * commit (len). reserve_size is published BEFORE data so a crashed
127             * writer leaves a recoverable slot boundary for readers.
128             * ================================================================ */
129              
130 113           static inline int64_t log_append(LogHandle *h, const void *data, uint32_t len) {
131 113 50         if (len == 0) return -1; /* 0 is the uncommitted marker */
132              
133 113           LogHeader *hdr = h->hdr;
134 113 50         if (len > UINT32_MAX - LOG_ENTRY_HDR - 3U) return -1;
135             /* Pad total slot size up to 4-byte boundary so the next slot's
136             * header words are naturally aligned for atomic ops on ARM64. */
137 113           uint32_t entry_size = (LOG_ENTRY_HDR + len + 3U) & ~3U;
138              
139 0           for (;;) {
140 113           uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
141 222 100         if (t + entry_size > hdr->data_size) return -1;
142              
143 109 50         if (__atomic_compare_exchange_n(&hdr->tail, &t, t + entry_size,
144             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
145 109           uint8_t *slot = h->data + t;
146             /* Explicitly zero len before publishing reserve_size. In the
147             * common case the slot is already zero (fresh log_init or
148             * post-reset memset), but this defensive RELAXED store
149             * tolerates user code that bypasses log_reset's zeroing or
150             * concurrent edge cases. Release fence comes via the
151             * subsequent reserve_size store. */
152 109           __atomic_store_n((uint32_t *)(slot + sizeof(uint32_t)), 0U, __ATOMIC_RELAXED);
153             /* Publish slot boundary so readers can skip past us on crash.
154             * RELEASE so any reader observing reserve_size > 0 also sees
155             * the len=0 store above and correctly classifies the slot. */
156 109           __atomic_store_n((uint32_t *)slot, entry_size, __ATOMIC_RELEASE);
157             /* Write payload. */
158 109           memcpy(slot + LOG_ENTRY_HDR, data, len);
159             /* Commit: RELEASE store of len publishes the data. */
160 109           __atomic_store_n((uint32_t *)(slot + sizeof(uint32_t)), len, __ATOMIC_RELEASE);
161              
162 109           __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
163 109           __atomic_add_fetch(&hdr->stat_appends, 1, __ATOMIC_RELAXED);
164 109           __atomic_add_fetch(&hdr->wake_seq, 1, __ATOMIC_RELEASE);
165              
166 109 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
167 0           syscall(SYS_futex, &hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
168              
169 109           return (int64_t)t;
170             }
171             }
172             }
173              
174             /* ================================================================
175             * Read — read entry at offset.
176             *
177             * Returns:
178             * LOG_READ_OK — entry found; out_data/out_len/next_off set.
179             * LOG_READ_ABANDONED — slot was reserved but never committed (writer
180             * crashed mid-append); next_off advanced past it,
181             * no out_data. abandon_wait_us caps the wait for
182             * an in-flight writer before declaring abandonment.
183             * LOG_READ_EMPTY — no entry (past tail, truncated, or in-flight
184             * writer not yet timed out).
185             * ================================================================ */
186              
187 240           static inline int log_read_ex(LogHandle *h, uint64_t offset,
188             const uint8_t **out_data, uint32_t *out_len,
189             uint64_t *next_off,
190             uint64_t abandon_wait_us) {
191 240           uint64_t trunc = __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
192 240 100         if (offset < trunc) return LOG_READ_EMPTY; /* truncated */
193 239           uint64_t t = __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
194 239 100         if (offset >= t) return LOG_READ_EMPTY;
195 229 50         if (offset + LOG_ENTRY_HDR > h->hdr->data_size) return LOG_READ_EMPTY;
196              
197 229           uint8_t *slot = h->data + offset;
198             /* Load reserve_size FIRST. ACQUIRE pairs with the writer's RELEASE-
199             * store of reserve_size, which happens-after the RELAXED store of
200             * len=0. So observing reserve_size > 0 guarantees the writer's
201             * len=0 is also visible — we can then trust the subsequent len-load.
202             *
203             * This is the "happens-before chain": reserve_size release -> reader
204             * acquire -> len load. Without it, a freshly-CAS-reserved slot could
205             * race with a reader observing a stale len from an earlier epoch
206             * before the writer has had a chance to overwrite it. */
207 229           uint64_t waited = 0;
208 229           const uint64_t step_us = 1000; /* 1ms */
209 229           uint32_t reserve_size = 0;
210 0           for (;;) {
211 229           reserve_size = __atomic_load_n((const uint32_t *)slot, __ATOMIC_ACQUIRE);
212 229 50         if (reserve_size > 0) {
213 229           uint32_t len = __atomic_load_n(
214 229           (const uint32_t *)(slot + sizeof(uint32_t)), __ATOMIC_ACQUIRE);
215 229 50         if (len > 0) {
216 458 50         if (offset + LOG_ENTRY_HDR + len > t) return LOG_READ_EMPTY;
217 229           *out_data = slot + LOG_ENTRY_HDR;
218 229           *out_len = len;
219 229           *next_off = offset + (((uint64_t)LOG_ENTRY_HDR + len + 3U) & ~(uint64_t)3U);
220 229           return LOG_READ_OK;
221             }
222             }
223 0 0         if (waited >= abandon_wait_us) break;
224 0           struct timespec ts = { 0, (long)(step_us * 1000) };
225 0           nanosleep(&ts, NULL);
226 0           waited += step_us;
227             }
228              
229             /* Wait elapsed. Two outcomes:
230             * reserve_size == 0: writer either hasn't published it yet (very
231             * narrow window between CAS and the reserve_size store) or
232             * crashed before publishing. We can't determine slot length so
233             * signal EMPTY — caller must retry or give up.
234             * reserve_size > 0, len == 0: writer crashed after publishing the
235             * boundary but before committing data. Skip via reserve_size. */
236 0 0         if (reserve_size == 0) return LOG_READ_EMPTY;
237             /* Sanity-check reserve_size: must be aligned, larger than the header
238             * (len=0 is rejected by the writer, so a valid slot has reserve_size
239             * > LOG_ENTRY_HDR), and within both the data region and committed tail. */
240 0 0         if (reserve_size <= LOG_ENTRY_HDR
241 0 0         || (reserve_size & 3U) != 0
242 0 0         || offset + reserve_size > h->hdr->data_size
243 0 0         || offset + reserve_size > t)
244 0           return LOG_READ_EMPTY;
245              
246 0           *next_off = offset + reserve_size;
247 0           return LOG_READ_ABANDONED;
248             }
249              
250             /* Backwards-compatible read: returns 1 for OK, 0 for EMPTY or ABANDONED.
251             * Use log_read_ex if you need to distinguish abandoned slots. */
252             static inline int log_read(LogHandle *h, uint64_t offset,
253             const uint8_t **out_data, uint32_t *out_len,
254             uint64_t *next_off) {
255             int rc = log_read_ex(h, offset, out_data, out_len, next_off,
256             LOG_ABANDON_WAIT_US);
257             return rc == LOG_READ_OK ? 1 : 0;
258             }
259              
260             /* ================================================================
261             * Tail / Wait
262             * ================================================================ */
263              
264 4           static inline uint64_t log_tail_offset(LogHandle *h) {
265 4           return __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
266             }
267              
268 19           static inline uint64_t log_entry_count(LogHandle *h) {
269 19           return __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
270             }
271              
272 1           static inline uint64_t log_data_size(LogHandle *h) {
273 1           return h->hdr->data_size;
274             }
275              
276 2           static inline uint64_t log_available(LogHandle *h) {
277 2           return h->hdr->data_size - __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);
278             }
279              
280 2           static inline int log_wait(LogHandle *h, uint64_t expected_count, double timeout) {
281 2 50         if (log_entry_count(h) != expected_count) return 1;
282 2 50         if (timeout == 0) return 0;
283              
284             struct timespec dl, rem;
285 2           int has_dl = (timeout > 0);
286 2 50         if (has_dl) log_make_deadline(timeout, &dl);
287 2           __atomic_add_fetch(&h->hdr->stat_waits, 1, __ATOMIC_RELAXED);
288              
289 0           for (;;) {
290 2           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELEASE);
291 2           uint32_t seq = __atomic_load_n(&h->hdr->wake_seq, __ATOMIC_ACQUIRE);
292 2           uint64_t cur = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
293 2 50         if (cur == expected_count) {
294 2           struct timespec *pts = NULL;
295 2 50         if (has_dl) {
296 2 50         if (!log_remaining(&dl, &rem)) {
297 0           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
298 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
299 0           return 0;
300             }
301 2           pts = &rem;
302             }
303 2           syscall(SYS_futex, &h->hdr->wake_seq, FUTEX_WAIT, seq, pts, NULL, 0);
304             }
305 2           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
306 2 100         if (log_entry_count(h) != expected_count) return 1;
307 1 50         if (has_dl && !log_remaining(&dl, &rem)) {
    50          
308 1           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
309 1           return 0;
310             }
311             }
312             }
313              
314             /* ================================================================
315             * Create / Open / Close
316             * ================================================================ */
317              
318             #define LOG_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, LOG_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
319              
320 12           static inline void log_init_header(void *base, uint64_t total, uint64_t data_size) {
321 12           LogHeader *hdr = (LogHeader *)base;
322 12           memset(base, 0, (size_t)total);
323 12           hdr->magic = LOG_MAGIC;
324 12           hdr->version = LOG_VERSION;
325 12           hdr->data_size = data_size;
326 12           hdr->total_size = total;
327 12           hdr->data_off = sizeof(LogHeader);
328 12           __atomic_thread_fence(__ATOMIC_SEQ_CST);
329 12           }
330              
331             /* Validate a mapped header (shared by log_create reopen and log_open_fd).
332             * On failure, writes a diagnostic to errbuf (if non-NULL). */
333 2           static inline int log_validate_header(const LogHeader *hdr, uint64_t file_size,
334             char *errbuf) {
335 2 50         if (hdr->magic != LOG_MAGIC) {
336             /* Recognize a v1 log specifically so users get a clear migration hint. */
337 0 0         if (hdr->magic == 0x4C4F4731U) {
338 0 0         LOG_ERR("v1 log file (LOG1) not compatible with v0.04+ (LOG2): recreate");
339             } else {
340 0 0         LOG_ERR("bad magic: 0x%08x (expected 0x%08x)",
341             (unsigned)hdr->magic, (unsigned)LOG_MAGIC);
342             }
343 0           return 0;
344             }
345 2 50         if (hdr->version != LOG_VERSION) {
346 0 0         LOG_ERR("version mismatch: %u (expected %u)",
347             (unsigned)hdr->version, (unsigned)LOG_VERSION);
348 0           return 0;
349             }
350 2 50         if (hdr->data_size == 0) { LOG_ERR("data_size = 0"); return 0; }
    0          
351 2 50         if (hdr->data_size > UINT64_MAX - sizeof(LogHeader)) {
352 0 0         LOG_ERR("data_size too large"); return 0;
353             }
354 2 50         if (hdr->total_size != file_size) {
355 0 0         LOG_ERR("total_size mismatch"); return 0;
356             }
357 2 50         if (hdr->data_off != sizeof(LogHeader)) {
358 0 0         LOG_ERR("data_off mismatch"); return 0;
359             }
360 2 50         if (hdr->total_size != sizeof(LogHeader) + hdr->data_size) {
361 0 0         LOG_ERR("size invariant broken"); return 0;
362             }
363             /* Runtime-state sanity: tail and truncation must not exceed data_size. */
364 2 50         if (hdr->tail > hdr->data_size) { LOG_ERR("tail > data_size"); return 0; }
    0          
365 2 50         if (hdr->truncation > hdr->data_size) { LOG_ERR("truncation > data_size"); return 0; }
    0          
366 2           return 1;
367             }
368              
369 14           static inline LogHandle *log_setup(void *base, size_t ms, const char *path, int bfd) {
370 14           LogHeader *hdr = (LogHeader *)base;
371 14           LogHandle *h = (LogHandle *)calloc(1, sizeof(LogHandle));
372 14 50         if (!h) { munmap(base, ms); return NULL; }
373 14           h->hdr = hdr;
374 14           h->data = (uint8_t *)base + hdr->data_off;
375 14           h->mmap_size = ms;
376 14 100         h->path = path ? strdup(path) : NULL;
377 14           h->notify_fd = -1;
378 14           h->backing_fd = bfd;
379             /* Log is append-only: hint sequential access so the kernel prefetcher
380             * reads ahead on large scans (read_entry, each_entry). Best-effort. */
381 14           (void)madvise(base, ms, MADV_SEQUENTIAL);
382 14           return h;
383             }
384              
385 6           static LogHandle *log_create(const char *path, uint64_t data_size, char *errbuf) {
386 6 50         if (errbuf) errbuf[0] = '\0';
387 6 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
388 6 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
389              
390 6           uint64_t total = sizeof(LogHeader) + data_size;
391 6           int anonymous = (path == NULL);
392 6           int fd = -1;
393             size_t map_size;
394             void *base;
395              
396 6 100         if (anonymous) {
397 3           map_size = (size_t)total;
398 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
399 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
400             } else {
401 3           fd = open(path, O_RDWR|O_CREAT, 0666);
402 4 50         if (fd < 0) { LOG_ERR("open: %s", strerror(errno)); return NULL; }
    0          
403 3 50         if (flock(fd, LOCK_EX) < 0) { LOG_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
404             struct stat st;
405 3 50         if (fstat(fd, &st) < 0) {
406 0 0         LOG_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
407             }
408 3           int is_new = (st.st_size == 0);
409 3 100         if (!is_new && (uint64_t)st.st_size < sizeof(LogHeader)) {
    50          
410 0 0         LOG_ERR("%s: file too small (%lld)", path, (long long)st.st_size);
411 0           flock(fd, LOCK_UN); close(fd); return NULL;
412             }
413 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
414 0 0         LOG_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
415             }
416 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
417 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
418 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
419 3 100         if (!is_new) {
420 1 50         if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size, errbuf)) {
421 0           munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
422             }
423 1           flock(fd, LOCK_UN); close(fd);
424 1           return log_setup(base, map_size, path, -1);
425             }
426             }
427 5           log_init_header(base, total, data_size);
428 5 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
429 5           return log_setup(base, map_size, path, -1);
430             }
431              
432 7           static LogHandle *log_create_memfd(const char *name, uint64_t data_size, char *errbuf) {
433 7 50         if (errbuf) errbuf[0] = '\0';
434 7 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
435 7 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
436 7           uint64_t total = sizeof(LogHeader) + data_size;
437 7 50         int fd = memfd_create(name ? name : "log", MFD_CLOEXEC | MFD_ALLOW_SEALING);
438 7 50         if (fd < 0) { LOG_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
439 7 50         if (ftruncate(fd, (off_t)total) < 0) { LOG_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
440 7           (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);
441 7           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
442 7 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
443 7           log_init_header(base, total, data_size);
444 7           return log_setup(base, (size_t)total, NULL, fd);
445             }
446              
447 1           static LogHandle *log_open_fd(int fd, char *errbuf) {
448 1 50         if (errbuf) errbuf[0] = '\0';
449             struct stat st;
450 1 50         if (fstat(fd, &st) < 0) { LOG_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
451 1 50         if ((uint64_t)st.st_size < sizeof(LogHeader)) { LOG_ERR("too small"); return NULL; }
    0          
452 1           size_t ms = (size_t)st.st_size;
453 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
454 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
455 1 50         if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size, errbuf)) {
456 0           munmap(base, ms); return NULL;
457             }
458 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
459 1 50         if (myfd < 0) { LOG_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
460 1           return log_setup(base, ms, NULL, myfd);
461             }
462              
463 14           static void log_destroy(LogHandle *h) {
464 14 50         if (!h) return;
465 14 100         if (h->notify_fd >= 0) close(h->notify_fd);
466 14 100         if (h->backing_fd >= 0) close(h->backing_fd);
467 14 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
468 14           free(h->path);
469 14           free(h);
470             }
471              
472             /* NOT concurrency-safe — caller must ensure no concurrent access.
473             *
474             * Zeros the data region before rewinding tail. This is required for
475             * correctness: a stale slot at offset 0 could otherwise leak old
476             * data to readers that see the new tail before the new writer
477             * publishes a fresh reserve_size. The header SEQ_CST fence below
478             * publishes the zeroed region to all subsequent loads. */
479 7           static void log_reset(LogHandle *h) {
480 7           memset(h->data, 0, (size_t)h->hdr->data_size);
481 7           __atomic_thread_fence(__ATOMIC_SEQ_CST);
482 7           __atomic_store_n(&h->hdr->truncation, 0, __ATOMIC_RELEASE);
483 7           __atomic_store_n(&h->hdr->tail, 0, __ATOMIC_RELEASE);
484 7           __atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE);
485 7           __atomic_add_fetch(&h->hdr->wake_seq, 1, __ATOMIC_RELEASE);
486 7 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
487 0           syscall(SYS_futex, &h->hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
488 7           }
489              
490             /* Concurrency-safe truncate: mark entries before offset as invalid.
491             * Does NOT reclaim space — the log is append-only. Readers skip
492             * entries below the truncation offset. */
493 4           static inline void log_truncate(LogHandle *h, uint64_t offset) {
494 0           for (;;) {
495 4           uint64_t cur = __atomic_load_n(&h->hdr->truncation, __ATOMIC_RELAXED);
496 7 100         if (offset <= cur) return; /* can only advance, not retreat */
497 3 50         if (__atomic_compare_exchange_n(&h->hdr->truncation, &cur, offset,
498             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
499 3           return;
500             }
501             }
502              
503 13           static inline uint64_t log_truncation(LogHandle *h) {
504 13           return __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
505             }
506              
507 2           static int log_create_eventfd(LogHandle *h) {
508 2 50         if (h->notify_fd >= 0) return h->notify_fd;
509 2           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
510 2 50         if (efd < 0) return -1;
511 2           h->notify_fd = efd; return efd;
512             }
513 2           static int log_notify(LogHandle *h) {
514 2 50         if (h->notify_fd < 0) return 0;
515 2           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
516             }
517 2           static int64_t log_eventfd_consume(LogHandle *h) {
518 2 50         if (h->notify_fd < 0) return -1;
519 2           uint64_t v = 0;
520 2 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
521 2           return (int64_t)v;
522             }
523 1           static int log_msync(LogHandle *h) { return msync(h->hdr, h->mmap_size, MS_SYNC); }
524              
525             #endif /* LOG_H */