File Coverage

log.h
Criterion Covered Total %
statement 179 191 93.7
branch 81 172 47.0
condition n/a
subroutine n/a
pod n/a
total 260 363 71.6


line stmt bran cond sub pod time code
1             /*
2             * log.h -- Append-only shared-memory log (WAL) for Linux
3             *
4             * Multiple writers append variable-length entries via CAS on tail offset.
5             * Readers replay from any offset. Entries persist until explicit reset.
6             *
7             * Entry format: [uint32_t length][data bytes] — no alignment padding.
8             * length is written AFTER data, acting as a commit flag (0 = uncommitted).
9             */
10              
11             #ifndef LOG_H
12             #define LOG_H
13              
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29              
30             #define LOG_MAGIC 0x4C4F4731U /* "LOG1" */
31             #define LOG_VERSION 1
32             #define LOG_ERR_BUFLEN 256
33             #define LOG_ENTRY_HDR sizeof(uint32_t)
34              
35             /* ================================================================
36             * Header (128 bytes)
37             * ================================================================ */
38              
39             typedef struct {
40             uint32_t magic;
41             uint32_t version;
42             uint64_t data_size; /* 8: usable data region size */
43             uint64_t total_size; /* 16 */
44             uint64_t data_off; /* 24 */
45             uint8_t _pad0[32]; /* 32-63 */
46              
47             uint64_t tail; /* 64: byte offset past last entry (CAS target) */
48             uint64_t count; /* 72: number of committed entries */
49             uint32_t waiters; /* 80: blocked tailers */
50             uint32_t _pad1; /* 84 */
51             uint64_t stat_appends; /* 88 */
52             uint64_t stat_waits; /* 96 */
53             uint64_t stat_timeouts; /* 104 */
54             uint64_t truncation; /* 112: entries before this offset are invalid */
55             uint8_t _pad2[8]; /* 120-127 */
56             } LogHeader;
57              
58             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
59             _Static_assert(sizeof(LogHeader) == 128, "LogHeader must be 128 bytes");
60             #endif
61              
62             typedef struct {
63             LogHeader *hdr;
64             uint8_t *data;
65             size_t mmap_size;
66             char *path;
67             int notify_fd;
68             int backing_fd;
69             } LogHandle;
70              
71             /* ================================================================
72             * Utility
73             * ================================================================ */
74              
75 2           static inline void log_make_deadline(double t, struct timespec *dl) {
76 2           clock_gettime(CLOCK_MONOTONIC, dl);
77 2           dl->tv_sec += (time_t)t;
78 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
79 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
80 2           }
81              
82 3           static inline int log_remaining(const struct timespec *dl, struct timespec *rem) {
83             struct timespec now;
84 3           clock_gettime(CLOCK_MONOTONIC, &now);
85 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
86 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
87 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
88 3           return rem->tv_sec >= 0;
89             }
90              
91             /* ================================================================
92             * Append — CAS reserve space, then write data, then commit (len)
93             * ================================================================ */
94              
95 57           static inline int64_t log_append(LogHandle *h, const void *data, uint32_t len) {
96 57 50         if (len == 0) return -1; /* 0 is the uncommitted marker */
97              
98 57           LogHeader *hdr = h->hdr;
99 57 50         if (len > UINT32_MAX - LOG_ENTRY_HDR) return -1;
100 57           uint32_t entry_size = LOG_ENTRY_HDR + len;
101              
102 0           for (;;) {
103 57           uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
104 112 100         if (t + entry_size > hdr->data_size) return -1;
105              
106 55 50         if (__atomic_compare_exchange_n(&hdr->tail, &t, t + entry_size,
107             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
108 55           uint8_t *slot = h->data + t;
109             /* write data first */
110 55           memcpy(slot + LOG_ENTRY_HDR, data, len);
111             /* release fence ensures data visible before commit */
112 55           __atomic_thread_fence(__ATOMIC_RELEASE);
113             /* commit: write len (non-zero = committed) */
114 55           memcpy(slot, &len, LOG_ENTRY_HDR);
115              
116 55           __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
117 55           __atomic_add_fetch(&hdr->stat_appends, 1, __ATOMIC_RELAXED);
118              
119 55 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
120 0           syscall(SYS_futex, &hdr->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
121              
122 55           return (int64_t)t;
123             }
124             }
125             }
126              
127             /* ================================================================
128             * Read — read entry at offset
129             * ================================================================ */
130              
131 225           static inline int log_read(LogHandle *h, uint64_t offset,
132             const uint8_t **out_data, uint32_t *out_len,
133             uint64_t *next_off) {
134 225           uint64_t trunc = __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
135 225 100         if (offset < trunc) return 0; /* truncated */
136 224           uint64_t t = __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
137 224 100         if (offset >= t) return 0;
138 217 50         if (offset + LOG_ENTRY_HDR > h->hdr->data_size) return 0;
139              
140 217           uint8_t *slot = h->data + offset;
141             uint32_t len;
142 217           memcpy(&len, slot, LOG_ENTRY_HDR);
143 217           __atomic_thread_fence(__ATOMIC_ACQUIRE);
144 217 50         if (len == 0) return 0; /* uncommitted */
145              
146 217 50         if (offset + LOG_ENTRY_HDR + len > t) return 0;
147              
148 217           *out_data = slot + LOG_ENTRY_HDR;
149 217           *out_len = len;
150 217           *next_off = offset + LOG_ENTRY_HDR + len;
151 217           return 1;
152             }
153              
154             /* ================================================================
155             * Tail / Wait
156             * ================================================================ */
157              
158 4           static inline uint64_t log_tail_offset(LogHandle *h) {
159 4           return __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
160             }
161              
162 15           static inline uint64_t log_entry_count(LogHandle *h) {
163 15           return __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
164             }
165              
166 1           static inline uint64_t log_data_size(LogHandle *h) {
167 1           return h->hdr->data_size;
168             }
169              
170 2           static inline uint64_t log_available(LogHandle *h) {
171 2           return h->hdr->data_size - __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);
172             }
173              
174 2           static inline int log_wait(LogHandle *h, uint64_t expected_count, double timeout) {
175 2 50         if (log_entry_count(h) != expected_count) return 1;
176 2 50         if (timeout == 0) return 0;
177              
178             struct timespec dl, rem;
179 2           int has_dl = (timeout > 0);
180 2 50         if (has_dl) log_make_deadline(timeout, &dl);
181 2           __atomic_add_fetch(&h->hdr->stat_waits, 1, __ATOMIC_RELAXED);
182              
183 0           for (;;) {
184 2           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELEASE);
185 2           uint64_t cur = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
186 2 50         if (cur == expected_count) {
187 2           struct timespec *pts = NULL;
188 2 50         if (has_dl) {
189 2 50         if (!log_remaining(&dl, &rem)) {
190 0           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
191 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
192 0           return 0;
193             }
194 2           pts = &rem;
195             }
196 2           syscall(SYS_futex, &h->hdr->count, FUTEX_WAIT,
197             (uint32_t)(cur & 0xFFFFFFFF), pts, NULL, 0);
198             }
199 2           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
200 2 100         if (log_entry_count(h) != expected_count) return 1;
201 1 50         if (has_dl && !log_remaining(&dl, &rem)) {
    50          
202 1           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
203 1           return 0;
204             }
205             }
206             }
207              
208             /* ================================================================
209             * Create / Open / Close
210             * ================================================================ */
211              
212             #define LOG_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, LOG_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
213              
214 5           static inline void log_init_header(void *base, uint64_t total, uint64_t data_size) {
215 5           LogHeader *hdr = (LogHeader *)base;
216 5           memset(base, 0, (size_t)total);
217 5           hdr->magic = LOG_MAGIC;
218 5           hdr->version = LOG_VERSION;
219 5           hdr->data_size = data_size;
220 5           hdr->total_size = total;
221 5           hdr->data_off = sizeof(LogHeader);
222 5           __atomic_thread_fence(__ATOMIC_SEQ_CST);
223 5           }
224              
225 7           static inline LogHandle *log_setup(void *base, size_t ms, const char *path, int bfd) {
226 7           LogHeader *hdr = (LogHeader *)base;
227 7           LogHandle *h = (LogHandle *)calloc(1, sizeof(LogHandle));
228 7 50         if (!h) { munmap(base, ms); return NULL; }
229 7           h->hdr = hdr;
230 7           h->data = (uint8_t *)base + hdr->data_off;
231 7           h->mmap_size = ms;
232 7 100         h->path = path ? strdup(path) : NULL;
233 7           h->notify_fd = -1;
234 7           h->backing_fd = bfd;
235 7           return h;
236             }
237              
238 5           static LogHandle *log_create(const char *path, uint64_t data_size, char *errbuf) {
239 5 50         if (errbuf) errbuf[0] = '\0';
240 5 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
241 5 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
242              
243 5           uint64_t total = sizeof(LogHeader) + data_size;
244 5           int anonymous = (path == NULL);
245 5           int fd = -1;
246             size_t map_size;
247             void *base;
248              
249 5 100         if (anonymous) {
250 2           map_size = (size_t)total;
251 2           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
252 2 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
253             } else {
254 3           fd = open(path, O_RDWR|O_CREAT, 0666);
255 4 50         if (fd < 0) { LOG_ERR("open: %s", strerror(errno)); return NULL; }
    0          
256 3 50         if (flock(fd, LOCK_EX) < 0) { LOG_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
257             struct stat st;
258 3 50         if (fstat(fd, &st) < 0) {
259 0 0         LOG_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
260             }
261 3           int is_new = (st.st_size == 0);
262 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
263 0 0         LOG_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
264             }
265 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
266 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
267 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
268 3 100         if (!is_new) {
269 1           LogHeader *hdr = (LogHeader *)base;
270 1 50         if (hdr->magic != LOG_MAGIC || hdr->version != LOG_VERSION ||
    50          
271 1 50         hdr->total_size != (uint64_t)st.st_size) {
272 0 0         LOG_ERR("invalid log file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
273             }
274 1           flock(fd, LOCK_UN); close(fd);
275 1           return log_setup(base, map_size, path, -1);
276             }
277             }
278 4           log_init_header(base, total, data_size);
279 4 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
280 4           return log_setup(base, map_size, path, -1);
281             }
282              
283 1           static LogHandle *log_create_memfd(const char *name, uint64_t data_size, char *errbuf) {
284 1 50         if (errbuf) errbuf[0] = '\0';
285 1 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
286 1 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
287 1           uint64_t total = sizeof(LogHeader) + data_size;
288 1 50         int fd = memfd_create(name ? name : "log", MFD_CLOEXEC);
289 1 50         if (fd < 0) { LOG_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
290 1 50         if (ftruncate(fd, (off_t)total) < 0) { LOG_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
291 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
292 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
293 1           log_init_header(base, total, data_size);
294 1           return log_setup(base, (size_t)total, NULL, fd);
295             }
296              
297 1           static LogHandle *log_open_fd(int fd, char *errbuf) {
298 1 50         if (errbuf) errbuf[0] = '\0';
299             struct stat st;
300 1 50         if (fstat(fd, &st) < 0) { LOG_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
301 1 50         if ((uint64_t)st.st_size < sizeof(LogHeader)) { LOG_ERR("too small"); return NULL; }
    0          
302 1           size_t ms = (size_t)st.st_size;
303 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
304 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
305 1           LogHeader *hdr = (LogHeader *)base;
306 1 50         if (hdr->magic != LOG_MAGIC || hdr->version != LOG_VERSION ||
    50          
307 1 50         hdr->total_size != (uint64_t)st.st_size) {
308 0 0         LOG_ERR("invalid log"); munmap(base, ms); return NULL;
309             }
310 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
311 1 50         if (myfd < 0) { LOG_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
312 1           return log_setup(base, ms, NULL, myfd);
313             }
314              
315 7           static void log_destroy(LogHandle *h) {
316 7 50         if (!h) return;
317 7 100         if (h->notify_fd >= 0) close(h->notify_fd);
318 7 100         if (h->backing_fd >= 0) close(h->backing_fd);
319 7 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
320 7           free(h->path);
321 7           free(h);
322             }
323              
324             /* NOT concurrency-safe — caller must ensure no concurrent access */
325 7           static void log_reset(LogHandle *h) {
326 7           __atomic_store_n(&h->hdr->truncation, 0, __ATOMIC_RELEASE);
327 7           __atomic_store_n(&h->hdr->tail, 0, __ATOMIC_RELEASE);
328 7           __atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE);
329 7 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
330 0           syscall(SYS_futex, &h->hdr->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
331 7           }
332              
333             /* Concurrency-safe truncate: mark entries before offset as invalid.
334             * Does NOT reclaim space — the log is append-only. Readers skip
335             * entries below the truncation offset. */
336 3           static inline void log_truncate(LogHandle *h, uint64_t offset) {
337 0           for (;;) {
338 3           uint64_t cur = __atomic_load_n(&h->hdr->truncation, __ATOMIC_RELAXED);
339 5 100         if (offset <= cur) return; /* can only advance, not retreat */
340 2 50         if (__atomic_compare_exchange_n(&h->hdr->truncation, &cur, offset,
341             1, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
342 2           return;
343             }
344             }
345              
346 10           static inline uint64_t log_truncation(LogHandle *h) {
347 10           return __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
348             }
349              
350 1           static int log_create_eventfd(LogHandle *h) {
351 1 50         if (h->notify_fd >= 0) close(h->notify_fd);
352 1           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
353 1 50         if (efd < 0) return -1;
354 1           h->notify_fd = efd; return efd;
355             }
356 1           static int log_notify(LogHandle *h) {
357 1 50         if (h->notify_fd < 0) return 0;
358 1           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
359             }
360 1           static int64_t log_eventfd_consume(LogHandle *h) {
361 1 50         if (h->notify_fd < 0) return -1;
362 1           uint64_t v = 0;
363 1 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
364 1           return (int64_t)v;
365             }
366 1           static void log_msync(LogHandle *h) { msync(h->hdr, h->mmap_size, MS_SYNC); }
367              
368             #endif /* LOG_H */