File Coverage

log.h
Criterion Covered Total %
statement 169 180 93.8
branch 76 166 45.7
condition n/a
subroutine n/a
pod n/a
total 245 346 70.8


line stmt bran cond sub pod time code
1             /*
2             * log.h -- Append-only shared-memory log (WAL) for Linux
3             *
4             * Multiple writers append variable-length entries via CAS on tail offset.
5             * Readers replay from any offset. Entries persist until explicit reset.
6             *
7             * Entry format: [uint32_t length][data bytes] — no alignment padding.
8             * length is written AFTER data, acting as a commit flag (0 = uncommitted).
9             */
10              
11             #ifndef LOG_H
12             #define LOG_H
13              
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29              
30             #define LOG_MAGIC 0x4C4F4731U /* "LOG1" */
31             #define LOG_VERSION 1
32             #define LOG_ERR_BUFLEN 256
33             #define LOG_ENTRY_HDR sizeof(uint32_t)
34              
35             /* ================================================================
36             * Header (128 bytes)
37             * ================================================================ */
38              
39             typedef struct {
40             uint32_t magic;
41             uint32_t version;
42             uint64_t data_size; /* 8: usable data region size */
43             uint64_t total_size; /* 16 */
44             uint64_t data_off; /* 24 */
45             uint8_t _pad0[32]; /* 32-63 */
46              
47             uint64_t tail; /* 64: byte offset past last entry (CAS target) */
48             uint64_t count; /* 72: number of committed entries */
49             uint32_t waiters; /* 80: blocked tailers */
50             uint32_t _pad1; /* 84 */
51             uint64_t stat_appends; /* 88 */
52             uint64_t stat_waits; /* 96 */
53             uint64_t stat_timeouts; /* 104 */
54             uint8_t _pad2[16]; /* 112-127 */
55             } LogHeader;
56              
57             #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
58             _Static_assert(sizeof(LogHeader) == 128, "LogHeader must be 128 bytes");
59             #endif
60              
61             typedef struct {
62             LogHeader *hdr;
63             uint8_t *data;
64             size_t mmap_size;
65             char *path;
66             int notify_fd;
67             int backing_fd;
68             } LogHandle;
69              
70             /* ================================================================
71             * Utility
72             * ================================================================ */
73              
74 2           static inline void log_make_deadline(double t, struct timespec *dl) {
75 2           clock_gettime(CLOCK_MONOTONIC, dl);
76 2           dl->tv_sec += (time_t)t;
77 2           dl->tv_nsec += (long)((t - (double)(time_t)t) * 1e9);
78 2 50         if (dl->tv_nsec >= 1000000000L) { dl->tv_sec++; dl->tv_nsec -= 1000000000L; }
79 2           }
80              
81 3           static inline int log_remaining(const struct timespec *dl, struct timespec *rem) {
82             struct timespec now;
83 3           clock_gettime(CLOCK_MONOTONIC, &now);
84 3           rem->tv_sec = dl->tv_sec - now.tv_sec;
85 3           rem->tv_nsec = dl->tv_nsec - now.tv_nsec;
86 3 100         if (rem->tv_nsec < 0) { rem->tv_sec--; rem->tv_nsec += 1000000000L; }
87 3           return rem->tv_sec >= 0;
88             }
89              
90             /* ================================================================
91             * Append — CAS reserve space, then write data, then commit (len)
92             * ================================================================ */
93              
94 50           static inline int64_t log_append(LogHandle *h, const void *data, uint32_t len) {
95 50 50         if (len == 0) return -1; /* 0 is the uncommitted marker */
96              
97 50           LogHeader *hdr = h->hdr;
98 50 50         if (len > UINT32_MAX - LOG_ENTRY_HDR) return -1;
99 50           uint32_t entry_size = LOG_ENTRY_HDR + len;
100              
101 0           for (;;) {
102 50           uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
103 98 100         if (t + entry_size > hdr->data_size) return -1;
104              
105 48 50         if (__atomic_compare_exchange_n(&hdr->tail, &t, t + entry_size,
106             1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
107 48           uint8_t *slot = h->data + t;
108             /* write data first */
109 48           memcpy(slot + LOG_ENTRY_HDR, data, len);
110             /* release fence ensures data visible before commit */
111 48           __atomic_thread_fence(__ATOMIC_RELEASE);
112             /* commit: write len (non-zero = committed) */
113 48           memcpy(slot, &len, LOG_ENTRY_HDR);
114              
115 48           __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
116 48           __atomic_add_fetch(&hdr->stat_appends, 1, __ATOMIC_RELAXED);
117              
118 48 50         if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
119 0           syscall(SYS_futex, &hdr->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
120              
121 48           return (int64_t)t;
122             }
123             }
124             }
125              
126             /* ================================================================
127             * Read — read entry at offset
128             * ================================================================ */
129              
130 215           static inline int log_read(LogHandle *h, uint64_t offset,
131             const uint8_t **out_data, uint32_t *out_len,
132             uint64_t *next_off) {
133 215           uint64_t t = __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
134 215 100         if (offset >= t) return 0;
135 211 50         if (offset + LOG_ENTRY_HDR > h->hdr->data_size) return 0;
136              
137 211           uint8_t *slot = h->data + offset;
138             uint32_t len;
139 211           memcpy(&len, slot, LOG_ENTRY_HDR);
140 211           __atomic_thread_fence(__ATOMIC_ACQUIRE);
141 211 50         if (len == 0) return 0; /* uncommitted */
142              
143 211 50         if (offset + LOG_ENTRY_HDR + len > t) return 0;
144              
145 211           *out_data = slot + LOG_ENTRY_HDR;
146 211           *out_len = len;
147 211           *next_off = offset + LOG_ENTRY_HDR + len;
148 211           return 1;
149             }
150              
151             /* ================================================================
152             * Tail / Wait
153             * ================================================================ */
154              
155 3           static inline uint64_t log_tail_offset(LogHandle *h) {
156 3           return __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
157             }
158              
159 13           static inline uint64_t log_entry_count(LogHandle *h) {
160 13           return __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
161             }
162              
163 1           static inline uint64_t log_data_size(LogHandle *h) {
164 1           return h->hdr->data_size;
165             }
166              
167 2           static inline uint64_t log_available(LogHandle *h) {
168 2           return h->hdr->data_size - __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);
169             }
170              
171 2           static inline int log_wait(LogHandle *h, uint64_t expected_count, double timeout) {
172 2 50         if (log_entry_count(h) != expected_count) return 1;
173 2 50         if (timeout == 0) return 0;
174              
175             struct timespec dl, rem;
176 2           int has_dl = (timeout > 0);
177 2 50         if (has_dl) log_make_deadline(timeout, &dl);
178 2           __atomic_add_fetch(&h->hdr->stat_waits, 1, __ATOMIC_RELAXED);
179              
180 0           for (;;) {
181 2           __atomic_add_fetch(&h->hdr->waiters, 1, __ATOMIC_RELEASE);
182 2           uint64_t cur = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
183 2 50         if (cur == expected_count) {
184 2           struct timespec *pts = NULL;
185 2 50         if (has_dl) {
186 2 50         if (!log_remaining(&dl, &rem)) {
187 0           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
188 0           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
189 0           return 0;
190             }
191 2           pts = &rem;
192             }
193 2           syscall(SYS_futex, &h->hdr->count, FUTEX_WAIT,
194             (uint32_t)(cur & 0xFFFFFFFF), pts, NULL, 0);
195             }
196 2           __atomic_sub_fetch(&h->hdr->waiters, 1, __ATOMIC_RELAXED);
197 2 100         if (log_entry_count(h) != expected_count) return 1;
198 1 50         if (has_dl && !log_remaining(&dl, &rem)) {
    50          
199 1           __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
200 1           return 0;
201             }
202             }
203             }
204              
205             /* ================================================================
206             * Create / Open / Close
207             * ================================================================ */
208              
209             #define LOG_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, LOG_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
210              
211 5           static inline void log_init_header(void *base, uint64_t total, uint64_t data_size) {
212 5           LogHeader *hdr = (LogHeader *)base;
213 5           memset(base, 0, (size_t)total);
214 5           hdr->magic = LOG_MAGIC;
215 5           hdr->version = LOG_VERSION;
216 5           hdr->data_size = data_size;
217 5           hdr->total_size = total;
218 5           hdr->data_off = sizeof(LogHeader);
219 5           __atomic_thread_fence(__ATOMIC_SEQ_CST);
220 5           }
221              
222 7           static inline LogHandle *log_setup(void *base, size_t ms, const char *path, int bfd) {
223 7           LogHeader *hdr = (LogHeader *)base;
224 7           LogHandle *h = (LogHandle *)calloc(1, sizeof(LogHandle));
225 7 50         if (!h) { munmap(base, ms); return NULL; }
226 7           h->hdr = hdr;
227 7           h->data = (uint8_t *)base + hdr->data_off;
228 7           h->mmap_size = ms;
229 7 100         h->path = path ? strdup(path) : NULL;
230 7           h->notify_fd = -1;
231 7           h->backing_fd = bfd;
232 7           return h;
233             }
234              
235 5           static LogHandle *log_create(const char *path, uint64_t data_size, char *errbuf) {
236 5 50         if (errbuf) errbuf[0] = '\0';
237 5 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
238 5 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
239              
240 5           uint64_t total = sizeof(LogHeader) + data_size;
241 5           int anonymous = (path == NULL);
242 5           int fd = -1;
243             size_t map_size;
244             void *base;
245              
246 5 100         if (anonymous) {
247 2           map_size = (size_t)total;
248 2           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
249 2 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
250             } else {
251 3           fd = open(path, O_RDWR|O_CREAT, 0666);
252 4 50         if (fd < 0) { LOG_ERR("open: %s", strerror(errno)); return NULL; }
    0          
253 3 50         if (flock(fd, LOCK_EX) < 0) { LOG_ERR("flock: %s", strerror(errno)); close(fd); return NULL; }
    0          
254             struct stat st;
255 3 50         if (fstat(fd, &st) < 0) {
256 0 0         LOG_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
257             }
258 3           int is_new = (st.st_size == 0);
259 3 100         if (is_new && ftruncate(fd, (off_t)total) < 0) {
    50          
260 0 0         LOG_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL;
261             }
262 3 100         map_size = is_new ? (size_t)total : (size_t)st.st_size;
263 3           base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
264 3 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; }
    0          
265 3 100         if (!is_new) {
266 1           LogHeader *hdr = (LogHeader *)base;
267 1 50         if (hdr->magic != LOG_MAGIC || hdr->version != LOG_VERSION ||
    50          
268 1 50         hdr->total_size != (uint64_t)st.st_size) {
269 0 0         LOG_ERR("invalid log file"); munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL;
270             }
271 1           flock(fd, LOCK_UN); close(fd);
272 1           return log_setup(base, map_size, path, -1);
273             }
274             }
275 4           log_init_header(base, total, data_size);
276 4 100         if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
277 4           return log_setup(base, map_size, path, -1);
278             }
279              
280 1           static LogHandle *log_create_memfd(const char *name, uint64_t data_size, char *errbuf) {
281 1 50         if (errbuf) errbuf[0] = '\0';
282 1 50         if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; }
    0          
283 1 50         if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; }
    0          
284 1           uint64_t total = sizeof(LogHeader) + data_size;
285 1 50         int fd = memfd_create(name ? name : "log", MFD_CLOEXEC);
286 1 50         if (fd < 0) { LOG_ERR("memfd_create: %s", strerror(errno)); return NULL; }
    0          
287 1 50         if (ftruncate(fd, (off_t)total) < 0) { LOG_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; }
    0          
288 1           void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
289 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; }
    0          
290 1           log_init_header(base, total, data_size);
291 1           return log_setup(base, (size_t)total, NULL, fd);
292             }
293              
294 1           static LogHandle *log_open_fd(int fd, char *errbuf) {
295 1 50         if (errbuf) errbuf[0] = '\0';
296             struct stat st;
297 1 50         if (fstat(fd, &st) < 0) { LOG_ERR("fstat: %s", strerror(errno)); return NULL; }
    0          
298 1 50         if ((uint64_t)st.st_size < sizeof(LogHeader)) { LOG_ERR("too small"); return NULL; }
    0          
299 1           size_t ms = (size_t)st.st_size;
300 1           void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
301 1 50         if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; }
    0          
302 1           LogHeader *hdr = (LogHeader *)base;
303 1 50         if (hdr->magic != LOG_MAGIC || hdr->version != LOG_VERSION ||
    50          
304 1 50         hdr->total_size != (uint64_t)st.st_size) {
305 0 0         LOG_ERR("invalid log"); munmap(base, ms); return NULL;
306             }
307 1           int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
308 1 50         if (myfd < 0) { LOG_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    0          
309 1           return log_setup(base, ms, NULL, myfd);
310             }
311              
312 7           static void log_destroy(LogHandle *h) {
313 7 50         if (!h) return;
314 7 100         if (h->notify_fd >= 0) close(h->notify_fd);
315 7 100         if (h->backing_fd >= 0) close(h->backing_fd);
316 7 50         if (h->hdr) munmap(h->hdr, h->mmap_size);
317 7           free(h->path);
318 7           free(h);
319             }
320              
321             /* NOT concurrency-safe — caller must ensure no concurrent access */
322 5           static void log_reset(LogHandle *h) {
323 5           __atomic_store_n(&h->hdr->tail, 0, __ATOMIC_RELEASE);
324 5           __atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE);
325 5 50         if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
326 0           syscall(SYS_futex, &h->hdr->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
327 5           }
328              
329 1           static int log_create_eventfd(LogHandle *h) {
330 1 50         if (h->notify_fd >= 0) close(h->notify_fd);
331 1           int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
332 1 50         if (efd < 0) return -1;
333 1           h->notify_fd = efd; return efd;
334             }
335 1           static int log_notify(LogHandle *h) {
336 1 50         if (h->notify_fd < 0) return 0;
337 1           uint64_t v = 1; return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
338             }
339 1           static int64_t log_eventfd_consume(LogHandle *h) {
340 1 50         if (h->notify_fd < 0) return -1;
341 1           uint64_t v = 0;
342 1 50         if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
343 1           return (int64_t)v;
344             }
345 1           static void log_msync(LogHandle *h) { msync(h->hdr, h->mmap_size, MS_SYNC); }
346              
347             #endif /* LOG_H */