| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
/* |
|
2
|
|
|
|
|
|
|
* log.h -- Append-only shared-memory log (WAL) for Linux |
|
3
|
|
|
|
|
|
|
* |
|
4
|
|
|
|
|
|
|
* Multiple writers append variable-length entries via CAS on tail offset. |
|
5
|
|
|
|
|
|
|
* Readers replay from any offset. Entries persist until explicit reset. |
|
6
|
|
|
|
|
|
|
* |
|
7
|
|
|
|
|
|
|
* Entry format: [uint32_t reserve_size][uint32_t length][data][padding to 4B]. |
|
8
|
|
|
|
|
|
|
* reserve_size = total slot size (header + data + padding); written by |
|
9
|
|
|
|
|
|
|
* the writer immediately after CAS-reserving the slot, before any |
|
10
|
|
|
|
|
|
|
* data is copied. Lets readers locate the next slot even if the |
|
11
|
|
|
|
|
|
|
* writer crashes before committing len. |
|
12
|
|
|
|
|
|
|
* length = data length; written AFTER data via RELEASE store as a |
|
13
|
|
|
|
|
|
|
* commit flag (0 = uncommitted/abandoned). |
|
14
|
|
|
|
|
|
|
* |
|
15
|
|
|
|
|
|
|
* Crash recovery: if a writer dies after CAS but before committing len, |
|
16
|
|
|
|
|
|
|
* the slot has reserve_size > 0 and len == 0. log_read_ex returns |
|
17
|
|
|
|
|
|
|
* LOG_READ_ABANDONED for such slots (no data, next_off advances by |
|
18
|
|
|
|
|
|
|
* reserve_size), letting readers skip past gaps. A bounded spin gives |
|
19
|
|
|
|
|
|
|
* in-flight writers time to commit before declaring abandonment. |
|
20
|
|
|
|
|
|
|
* |
|
21
|
|
|
|
|
|
|
* The narrow window between CAS-reserve and the reserve_size store |
|
22
|
|
|
|
|
|
|
* (typically a few CPU instructions) is NOT recoverable: if a writer |
|
23
|
|
|
|
|
|
|
* dies there, the slot's size is unknown and readers cannot skip past |
|
24
|
|
|
|
|
|
|
* it. Callers must reset/recreate the log. This is extremely rare in |
|
25
|
|
|
|
|
|
|
* practice given the small instruction window. |
|
26
|
|
|
|
|
|
|
* |
|
27
|
|
|
|
|
|
|
* Padding ensures the next slot header is 4-byte aligned — required |
|
28
|
|
|
|
|
|
|
* for atomic load/store on strict-alignment ISAs (ARM64 LDAR/STLR |
|
29
|
|
|
|
|
|
|
* trap on unaligned addresses with SIGBUS). |
|
30
|
|
|
|
|
|
|
*/ |
|
31
|
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
#ifndef LOG_H |
|
33
|
|
|
|
|
|
|
#define LOG_H |
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
#include |
|
36
|
|
|
|
|
|
|
#include |
|
37
|
|
|
|
|
|
|
#include |
|
38
|
|
|
|
|
|
|
#include |
|
39
|
|
|
|
|
|
|
#include |
|
40
|
|
|
|
|
|
|
#include |
|
41
|
|
|
|
|
|
|
#include |
|
42
|
|
|
|
|
|
|
#include |
|
43
|
|
|
|
|
|
|
#include |
|
44
|
|
|
|
|
|
|
#include |
|
45
|
|
|
|
|
|
|
#include |
|
46
|
|
|
|
|
|
|
#include |
|
47
|
|
|
|
|
|
|
#include |
|
48
|
|
|
|
|
|
|
#include |
|
49
|
|
|
|
|
|
|
#include |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
/* On-disk identity. The magic spells "LOG2" in ASCII and marks the v2 slot
 * layout, i.e. the one that carries a reserve_size word in every slot. */
#define LOG_MAGIC           0x4C4F4732U
#define LOG_VERSION         2

/* Buffer size that LOG_ERR passes to snprintf when writing into errbuf. */
#define LOG_ERR_BUFLEN      256

/* Every slot begins with two uint32_t words -- reserve_size, then len --
 * for a total of 8 header bytes ahead of the payload. */
#define LOG_ENTRY_HDR       (2 * sizeof(uint32_t))

/* How long, in microseconds, log_read lets a reader poll a reserved but
 * uncommitted slot before treating it as abandoned. Define it before
 * including this header to change the default; log_read_ex takes the
 * value as an argument instead. */
#ifndef LOG_ABANDON_WAIT_US
#define LOG_ABANDON_WAIT_US 2000000  /* two seconds */
#endif

/* Result codes of log_read_ex. */
#define LOG_READ_EMPTY      0  /* nothing readable at this offset */
#define LOG_READ_OK         1  /* committed entry: out_data, out_len, next_off filled in */
#define LOG_READ_ABANDONED  2  /* reserved but never committed: only next_off filled in */
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
/* ================================================================ |
|
69
|
|
|
|
|
|
|
* Header (128 bytes) |
|
70
|
|
|
|
|
|
|
* ================================================================ */ |
|
71
|
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
/* Control block stored at the very start of the mapping. The layout is
 * fixed at 128 bytes: geometry fields first, then padding up to offset 64,
 * then the counters that writers and waiters update at run time. */
typedef struct {
    /* ---- bytes 0..31: identity and geometry, set by log_init_header ---- */
    uint32_t magic;          /*   0: LOG_MAGIC */
    uint32_t version;        /*   4: LOG_VERSION */
    uint64_t data_size;      /*   8: bytes available for slots */
    uint64_t total_size;     /*  16: sizeof(LogHeader) + data_size */
    uint64_t data_off;       /*  24: offset of the data region from the mapping base */
    uint8_t  _pad0[32];      /*  32..63: keeps the mutable fields below at offset 64 */

    /* ---- bytes 64..127: run-time state ---- */
    uint64_t tail;           /*  64: first unreserved byte; writers advance it by CAS */
    uint64_t count;          /*  72: entries committed so far */
    uint32_t waiters;        /*  80: callers currently inside log_wait's wait loop */
    uint32_t wake_seq;       /*  84: 32-bit futex word, bumped on every append and reset */
    uint64_t stat_appends;   /*  88: successful log_append calls */
    uint64_t stat_waits;     /*  96: log_wait calls that got past the fast path */
    uint64_t stat_timeouts;  /* 104: log_wait calls that ran out of time */
    uint64_t truncation;     /* 112: reads below this offset report LOG_READ_EMPTY */
    uint8_t  _pad2[8];       /* 120..127 */
} LogHeader;

/* The file format depends on the exact size; fail the build if it drifts. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(LogHeader) == 128, "LogHeader must be 128 bytes");
#endif
|
94
|
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
typedef struct { |
|
96
|
|
|
|
|
|
|
LogHeader *hdr; |
|
97
|
|
|
|
|
|
|
uint8_t *data; |
|
98
|
|
|
|
|
|
|
size_t mmap_size; |
|
99
|
|
|
|
|
|
|
char *path; |
|
100
|
|
|
|
|
|
|
int notify_fd; |
|
101
|
|
|
|
|
|
|
int backing_fd; |
|
102
|
|
|
|
|
|
|
} LogHandle; |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
/* ================================================================ |
|
105
|
|
|
|
|
|
|
* Utility |
|
106
|
|
|
|
|
|
|
* ================================================================ */ |
|
107
|
|
|
|
|
|
|
|
|
108
|
2
|
|
|
|
|
|
/* Turn a relative timeout into an absolute CLOCK_MONOTONIC deadline.
 *
 *   t   timeout in seconds; the fractional part is honoured to the nanosecond.
 *   dl  receives "now + t" with tv_nsec normalised below one second.
 */
static inline void log_make_deadline(double t, struct timespec *dl) {
    const time_t whole_sec = (time_t)t;
    const long   frac_nsec = (long)((t - (double)whole_sec) * 1e9);

    clock_gettime(CLOCK_MONOTONIC, dl);
    dl->tv_sec  += whole_sec;
    dl->tv_nsec += frac_nsec;

    /* Two sub-second values were added, so at most one carry is needed. */
    if (dl->tv_nsec >= 1000000000L) {
        dl->tv_nsec -= 1000000000L;
        dl->tv_sec  += 1;
    }
}
|
114
|
|
|
|
|
|
|
|
|
115
|
3
|
|
|
|
|
|
/* Compute how much time is left until an absolute CLOCK_MONOTONIC deadline.
 *
 *   dl   deadline, e.g. one produced by log_make_deadline.
 *   rem  receives dl - now, with tv_nsec normalised to be non-negative.
 *
 * Returns 1 while the deadline has not passed (rem->tv_sec >= 0), else 0.
 */
static inline int log_remaining(const struct timespec *dl, struct timespec *rem) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t sec  = dl->tv_sec  - now.tv_sec;
    long   nsec = dl->tv_nsec - now.tv_nsec;
    if (nsec < 0) {
        /* Borrow one second so the nanosecond part stays in range. */
        nsec += 1000000000L;
        sec  -= 1;
    }

    rem->tv_sec  = sec;
    rem->tv_nsec = nsec;
    return sec >= 0;
}
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
/* ================================================================ |
|
125
|
|
|
|
|
|
|
* Append — CAS reserve space, publish reserve_size, write data, |
|
126
|
|
|
|
|
|
|
* commit (len). reserve_size is published BEFORE data so a crashed |
|
127
|
|
|
|
|
|
|
* writer leaves a recoverable slot boundary for readers. |
|
128
|
|
|
|
|
|
|
* ================================================================ */ |
|
129
|
|
|
|
|
|
|
|
|
130
|
113
|
|
|
|
|
|
/* Append one entry to the log.
 *
 *   h     open log handle.
 *   data  payload to copy into the log; must not be NULL.
 *   len   payload length in bytes; must be > 0 because a stored length of 0
 *         is the "not committed" marker.
 *
 * Returns the byte offset of the new slot within the data region, or -1 when
 * len is 0, data is NULL, the padded slot size would not fit in a uint32_t,
 * or the remaining space is too small.
 *
 * Order of stores inside the reserved slot: len = 0, reserve_size (RELEASE),
 * payload, len (RELEASE). Readers rely on exactly this order.
 */
static inline int64_t log_append(LogHandle *h, const void *data, uint32_t len) {
    if (len == 0) return -1;       /* 0 is the uncommitted marker */
    if (data == NULL) return -1;   /* memcpy from NULL is undefined behaviour */
    if (len > UINT32_MAX - LOG_ENTRY_HDR - 3U) return -1;

    LogHeader *hdr = h->hdr;
    /* Round the slot up to a multiple of 4 so the next slot's header words
     * stay naturally aligned for atomic access. */
    uint32_t entry_size = (uint32_t)((LOG_ENTRY_HDR + len + 3U) & ~3U);

    for (;;) {
        uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
        if (t + entry_size > hdr->data_size) return -1;

        /* Weak CAS: on failure (including a spurious one) just retry. */
        if (!__atomic_compare_exchange_n(&hdr->tail, &t, t + entry_size,
                1, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;

        uint8_t  *slot      = h->data + t;
        uint32_t *size_word = (uint32_t *)slot;
        uint32_t *len_word  = (uint32_t *)(slot + sizeof(uint32_t));

        /* Clear len before the boundary becomes visible. The slot is
         * normally already zero, but this does not depend on it. */
        __atomic_store_n(len_word, 0U, __ATOMIC_RELAXED);
        /* Publish the slot boundary. RELEASE makes the len = 0 store above
         * visible to any reader that observes reserve_size > 0. */
        __atomic_store_n(size_word, entry_size, __ATOMIC_RELEASE);

        memcpy(slot + LOG_ENTRY_HDR, data, len);

        /* Commit: the RELEASE store of len publishes the payload. */
        __atomic_store_n(len_word, len, __ATOMIC_RELEASE);

        __atomic_add_fetch(&hdr->count, 1, __ATOMIC_RELEASE);
        __atomic_add_fetch(&hdr->stat_appends, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&hdr->wake_seq, 1, __ATOMIC_RELEASE);

        /* Full barrier between bumping the futex word and reading the waiter
         * count. Without it the load may be satisfied before the increment
         * is visible, so a waiter that registers in between can go to sleep
         * on the old wake_seq value and never be woken. */
        __atomic_thread_fence(__ATOMIC_SEQ_CST);

        if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);

        return (int64_t)t;
    }
}
|
173
|
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
/* ================================================================ |
|
175
|
|
|
|
|
|
|
* Read — read entry at offset. |
|
176
|
|
|
|
|
|
|
* |
|
177
|
|
|
|
|
|
|
* Returns: |
|
178
|
|
|
|
|
|
|
* LOG_READ_OK — entry found; out_data/out_len/next_off set. |
|
179
|
|
|
|
|
|
|
* LOG_READ_ABANDONED — slot was reserved but never committed (writer |
|
180
|
|
|
|
|
|
|
* crashed mid-append); next_off advanced past it, |
|
181
|
|
|
|
|
|
|
* no out_data. abandon_wait_us caps the wait for |
|
182
|
|
|
|
|
|
|
* an in-flight writer before declaring abandonment. |
|
183
|
|
|
|
|
|
|
* LOG_READ_EMPTY — no entry (past tail, truncated, or in-flight |
|
184
|
|
|
|
|
|
|
* writer not yet timed out). |
|
185
|
|
|
|
|
|
|
* ================================================================ */ |
|
186
|
|
|
|
|
|
|
|
|
187
|
240
|
|
|
|
|
|
/* Read the slot that starts at `offset` in the data region.
 *
 *   h                open log handle.
 *   offset           byte offset of a slot header. Slots only ever start on
 *                    4-byte boundaries, so any other value yields
 *                    LOG_READ_EMPTY.
 *   out_data         on LOG_READ_OK, set to the payload inside the mapping
 *                    (no copy is made).
 *   out_len          on LOG_READ_OK, set to the payload length.
 *   next_off         on LOG_READ_OK and LOG_READ_ABANDONED, set to the offset
 *                    of the following slot.
 *   abandon_wait_us  budget, in microseconds, for polling a slot that is
 *                    reserved but not yet committed. The budget is counted
 *                    in 1 ms sleep steps.
 *
 * Returns LOG_READ_OK, LOG_READ_ABANDONED or LOG_READ_EMPTY. No output
 * argument is written on LOG_READ_EMPTY; out_data and out_len are not
 * written on LOG_READ_ABANDONED.
 */
static inline int log_read_ex(LogHandle *h, uint64_t offset,
                              const uint8_t **out_data, uint32_t *out_len,
                              uint64_t *next_off,
                              uint64_t abandon_wait_us) {
    LogHeader *hdr = h->hdr;

    uint64_t trunc = __atomic_load_n(&hdr->truncation, __ATOMIC_ACQUIRE);
    if (offset < trunc) return LOG_READ_EMPTY;   /* truncated */
    uint64_t t = __atomic_load_n(&hdr->tail, __ATOMIC_ACQUIRE);
    if (offset >= t) return LOG_READ_EMPTY;
    if (offset + LOG_ENTRY_HDR > hdr->data_size) return LOG_READ_EMPTY;

    /* log_append pads every slot to a multiple of 4 and the first slot sits
     * at offset 0, so no slot can start at an unaligned offset. Reject such
     * offsets here rather than perform a misaligned atomic load, which
     * faults on strict-alignment CPUs. */
    if ((offset & 3U) != 0) return LOG_READ_EMPTY;

    uint8_t *slot = h->data + offset;
    const uint32_t *size_word = (const uint32_t *)slot;
    const uint32_t *len_word  = (const uint32_t *)(slot + sizeof(uint32_t));

    /* reserve_size is loaded first. Its ACQUIRE load pairs with the writer's
     * RELEASE store, which follows the writer's len = 0 store; so once
     * reserve_size > 0 is seen, the len word read next belongs to this
     * reservation and not to stale memory. */
    uint64_t waited = 0;
    const uint64_t step_us = 1000;   /* 1 ms per poll */
    uint32_t reserve_size = 0;
    for (;;) {
        reserve_size = __atomic_load_n(size_word, __ATOMIC_ACQUIRE);
        if (reserve_size > 0) {
            uint32_t len = __atomic_load_n(len_word, __ATOMIC_ACQUIRE);
            if (len > 0) {
                if (offset + LOG_ENTRY_HDR + len > t) return LOG_READ_EMPTY;
                *out_data = slot + LOG_ENTRY_HDR;
                *out_len  = len;
                *next_off = offset + (((uint64_t)LOG_ENTRY_HDR + len + 3U) & ~(uint64_t)3U);
                return LOG_READ_OK;
            }
        }
        if (waited >= abandon_wait_us) break;
        struct timespec ts = { 0, (long)(step_us * 1000) };
        nanosleep(&ts, NULL);
        waited += step_us;
    }

    /* Budget exhausted without a commit.
     *   reserve_size == 0: the boundary was never published, so the slot's
     *     length is unknown and it cannot be skipped.
     *   reserve_size  > 0: the boundary is known; skip the slot. */
    if (reserve_size == 0) return LOG_READ_EMPTY;

    /* Only trust the boundary if it is plausible: bigger than a bare header
     * (len 0 is never written), 4-byte aligned, and inside both the data
     * region and the tail observed above. */
    if (reserve_size <= LOG_ENTRY_HDR
        || (reserve_size & 3U) != 0
        || offset + reserve_size > hdr->data_size
        || offset + reserve_size > t)
        return LOG_READ_EMPTY;

    *next_off = offset + reserve_size;
    return LOG_READ_ABANDONED;
}
|
249
|
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
/* Backwards-compatible read: returns 1 for OK, 0 for EMPTY or ABANDONED. |
|
251
|
|
|
|
|
|
|
* Use log_read_ex if you need to distinguish abandoned slots. */ |
|
252
|
|
|
|
|
|
|
static inline int log_read(LogHandle *h, uint64_t offset, |
|
253
|
|
|
|
|
|
|
const uint8_t **out_data, uint32_t *out_len, |
|
254
|
|
|
|
|
|
|
uint64_t *next_off) { |
|
255
|
|
|
|
|
|
|
int rc = log_read_ex(h, offset, out_data, out_len, next_off, |
|
256
|
|
|
|
|
|
|
LOG_ABANDON_WAIT_US); |
|
257
|
|
|
|
|
|
|
return rc == LOG_READ_OK ? 1 : 0; |
|
258
|
|
|
|
|
|
|
} |
|
259
|
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
/* ================================================================ |
|
261
|
|
|
|
|
|
|
* Tail / Wait |
|
262
|
|
|
|
|
|
|
* ================================================================ */ |
|
263
|
|
|
|
|
|
|
|
|
264
|
4
|
|
|
|
|
|
/* Current tail: the offset just past the most recently reserved slot
 * (ACQUIRE load). */
static inline uint64_t log_tail_offset(LogHandle *h) {
    const uint64_t tail_now = __atomic_load_n(&h->hdr->tail, __ATOMIC_ACQUIRE);
    return tail_now;
}
|
267
|
|
|
|
|
|
|
|
|
268
|
19
|
|
|
|
|
|
/* Number of entries committed so far (ACQUIRE load of the header counter). */
static inline uint64_t log_entry_count(LogHandle *h) {
    const uint64_t committed = __atomic_load_n(&h->hdr->count, __ATOMIC_ACQUIRE);
    return committed;
}
|
271
|
|
|
|
|
|
|
|
|
272
|
1
|
|
|
|
|
|
/* Capacity of the data region in bytes, as recorded in the header. */
static inline uint64_t log_data_size(LogHandle *h) {
    const LogHeader *hdr = h->hdr;
    return hdr->data_size;
}
|
275
|
|
|
|
|
|
|
|
|
276
|
2
|
|
|
|
|
|
/* Bytes between the current tail and the end of the data region. The tail
 * is read with a RELAXED load, so the result is only a snapshot. */
static inline uint64_t log_available(LogHandle *h) {
    LogHeader *hdr = h->hdr;
    const uint64_t used = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
    return hdr->data_size - used;
}
|
279
|
|
|
|
|
|
|
|
|
280
|
2
|
|
|
|
|
|
/* Block until the committed-entry count differs from `expected_count`.
 *
 *   h               open log handle.
 *   expected_count  the count the caller last observed.
 *   timeout         seconds to wait; 0 polls once without sleeping, a
 *                   negative value waits with no deadline.
 *
 * Returns 1 as soon as the count is seen to differ, 0 on timeout (or
 * immediately when timeout is 0 and the count is unchanged).
 */
static inline int log_wait(LogHandle *h, uint64_t expected_count, double timeout) {
    LogHeader *hdr = h->hdr;

    if (log_entry_count(h) != expected_count) return 1;
    if (timeout == 0) return 0;

    struct timespec deadline, left;
    const int bounded = (timeout > 0);
    if (bounded) log_make_deadline(timeout, &deadline);
    __atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);

    for (;;) {
        /* Register as a waiter before sampling the futex word, so a writer
         * that bumps wake_seq afterwards knows it has to issue a wake. */
        __atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELEASE);
        uint32_t seq  = __atomic_load_n(&hdr->wake_seq, __ATOMIC_ACQUIRE);
        uint64_t seen = __atomic_load_n(&hdr->count, __ATOMIC_ACQUIRE);

        if (seen == expected_count) {
            if (bounded && !log_remaining(&deadline, &left)) {
                __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
                __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                return 0;
            }
            /* Sleeps only if wake_seq still equals the value sampled above;
             * the timeout argument is the remaining (relative) time. */
            syscall(SYS_futex, &hdr->wake_seq, FUTEX_WAIT, seq,
                    bounded ? &left : NULL, NULL, 0);
        }
        __atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);

        /* The futex call may return without an append having happened, so
         * re-check the count and the deadline before going round again. */
        if (log_entry_count(h) != expected_count) return 1;
        if (bounded && !log_remaining(&deadline, &left)) {
            __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }
}
|
313
|
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
/* ================================================================ |
|
315
|
|
|
|
|
|
|
* Create / Open / Close |
|
316
|
|
|
|
|
|
|
* ================================================================ */ |
|
317
|
|
|
|
|
|
|
|
|
318
|
|
|
|
|
|
|
/* Format a diagnostic into the `errbuf` variable of the enclosing function,
 * truncated to LOG_ERR_BUFLEN bytes. Does nothing when errbuf is NULL. */
#define LOG_ERR(fmt, ...)                                             \
    do {                                                              \
        if (errbuf)                                                   \
            snprintf(errbuf, LOG_ERR_BUFLEN, fmt, ##__VA_ARGS__);     \
    } while(0)
|
319
|
|
|
|
|
|
|
|
|
320
|
12
|
|
|
|
|
|
static inline void log_init_header(void *base, uint64_t total, uint64_t data_size) { |
|
321
|
12
|
|
|
|
|
|
LogHeader *hdr = (LogHeader *)base; |
|
322
|
12
|
|
|
|
|
|
memset(base, 0, (size_t)total); |
|
323
|
12
|
|
|
|
|
|
hdr->magic = LOG_MAGIC; |
|
324
|
12
|
|
|
|
|
|
hdr->version = LOG_VERSION; |
|
325
|
12
|
|
|
|
|
|
hdr->data_size = data_size; |
|
326
|
12
|
|
|
|
|
|
hdr->total_size = total; |
|
327
|
12
|
|
|
|
|
|
hdr->data_off = sizeof(LogHeader); |
|
328
|
12
|
|
|
|
|
|
__atomic_thread_fence(__ATOMIC_SEQ_CST); |
|
329
|
12
|
|
|
|
|
|
} |
|
330
|
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
/* Validate a mapped header (shared by log_create reopen and log_open_fd). |
|
332
|
|
|
|
|
|
|
* On failure, writes a diagnostic to errbuf (if non-NULL). */ |
|
333
|
2
|
|
|
|
|
|
static inline int log_validate_header(const LogHeader *hdr, uint64_t file_size, |
|
334
|
|
|
|
|
|
|
char *errbuf) { |
|
335
|
2
|
50
|
|
|
|
|
if (hdr->magic != LOG_MAGIC) { |
|
336
|
|
|
|
|
|
|
/* Recognize a v1 log specifically so users get a clear migration hint. */ |
|
337
|
0
|
0
|
|
|
|
|
if (hdr->magic == 0x4C4F4731U) { |
|
338
|
0
|
0
|
|
|
|
|
LOG_ERR("v1 log file (LOG1) not compatible with v0.04+ (LOG2): recreate"); |
|
339
|
|
|
|
|
|
|
} else { |
|
340
|
0
|
0
|
|
|
|
|
LOG_ERR("bad magic: 0x%08x (expected 0x%08x)", |
|
341
|
|
|
|
|
|
|
(unsigned)hdr->magic, (unsigned)LOG_MAGIC); |
|
342
|
|
|
|
|
|
|
} |
|
343
|
0
|
|
|
|
|
|
return 0; |
|
344
|
|
|
|
|
|
|
} |
|
345
|
2
|
50
|
|
|
|
|
if (hdr->version != LOG_VERSION) { |
|
346
|
0
|
0
|
|
|
|
|
LOG_ERR("version mismatch: %u (expected %u)", |
|
347
|
|
|
|
|
|
|
(unsigned)hdr->version, (unsigned)LOG_VERSION); |
|
348
|
0
|
|
|
|
|
|
return 0; |
|
349
|
|
|
|
|
|
|
} |
|
350
|
2
|
50
|
|
|
|
|
if (hdr->data_size == 0) { LOG_ERR("data_size = 0"); return 0; } |
|
|
|
0
|
|
|
|
|
|
|
351
|
2
|
50
|
|
|
|
|
if (hdr->data_size > UINT64_MAX - sizeof(LogHeader)) { |
|
352
|
0
|
0
|
|
|
|
|
LOG_ERR("data_size too large"); return 0; |
|
353
|
|
|
|
|
|
|
} |
|
354
|
2
|
50
|
|
|
|
|
if (hdr->total_size != file_size) { |
|
355
|
0
|
0
|
|
|
|
|
LOG_ERR("total_size mismatch"); return 0; |
|
356
|
|
|
|
|
|
|
} |
|
357
|
2
|
50
|
|
|
|
|
if (hdr->data_off != sizeof(LogHeader)) { |
|
358
|
0
|
0
|
|
|
|
|
LOG_ERR("data_off mismatch"); return 0; |
|
359
|
|
|
|
|
|
|
} |
|
360
|
2
|
50
|
|
|
|
|
if (hdr->total_size != sizeof(LogHeader) + hdr->data_size) { |
|
361
|
0
|
0
|
|
|
|
|
LOG_ERR("size invariant broken"); return 0; |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
/* Runtime-state sanity: tail and truncation must not exceed data_size. */ |
|
364
|
2
|
50
|
|
|
|
|
if (hdr->tail > hdr->data_size) { LOG_ERR("tail > data_size"); return 0; } |
|
|
|
0
|
|
|
|
|
|
|
365
|
2
|
50
|
|
|
|
|
if (hdr->truncation > hdr->data_size) { LOG_ERR("truncation > data_size"); return 0; } |
|
|
|
0
|
|
|
|
|
|
|
366
|
2
|
|
|
|
|
|
return 1; |
|
367
|
|
|
|
|
|
|
} |
|
368
|
|
|
|
|
|
|
|
|
369
|
14
|
|
|
|
|
|
static inline LogHandle *log_setup(void *base, size_t ms, const char *path, int bfd) { |
|
370
|
14
|
|
|
|
|
|
LogHeader *hdr = (LogHeader *)base; |
|
371
|
14
|
|
|
|
|
|
LogHandle *h = (LogHandle *)calloc(1, sizeof(LogHandle)); |
|
372
|
14
|
50
|
|
|
|
|
if (!h) { munmap(base, ms); return NULL; } |
|
373
|
14
|
|
|
|
|
|
h->hdr = hdr; |
|
374
|
14
|
|
|
|
|
|
h->data = (uint8_t *)base + hdr->data_off; |
|
375
|
14
|
|
|
|
|
|
h->mmap_size = ms; |
|
376
|
14
|
100
|
|
|
|
|
h->path = path ? strdup(path) : NULL; |
|
377
|
14
|
|
|
|
|
|
h->notify_fd = -1; |
|
378
|
14
|
|
|
|
|
|
h->backing_fd = bfd; |
|
379
|
|
|
|
|
|
|
/* Log is append-only: hint sequential access so the kernel prefetcher |
|
380
|
|
|
|
|
|
|
* reads ahead on large scans (read_entry, each_entry). Best-effort. */ |
|
381
|
14
|
|
|
|
|
|
(void)madvise(base, ms, MADV_SEQUENTIAL); |
|
382
|
14
|
|
|
|
|
|
return h; |
|
383
|
|
|
|
|
|
|
} |
|
384
|
|
|
|
|
|
|
|
|
385
|
6
|
|
|
|
|
|
static LogHandle *log_create(const char *path, uint64_t data_size, char *errbuf) { |
|
386
|
6
|
50
|
|
|
|
|
if (errbuf) errbuf[0] = '\0'; |
|
387
|
6
|
50
|
|
|
|
|
if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
388
|
6
|
50
|
|
|
|
|
if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
|
|
390
|
6
|
|
|
|
|
|
uint64_t total = sizeof(LogHeader) + data_size; |
|
391
|
6
|
|
|
|
|
|
int anonymous = (path == NULL); |
|
392
|
6
|
|
|
|
|
|
int fd = -1; |
|
393
|
|
|
|
|
|
|
size_t map_size; |
|
394
|
|
|
|
|
|
|
void *base; |
|
395
|
|
|
|
|
|
|
|
|
396
|
6
|
100
|
|
|
|
|
if (anonymous) { |
|
397
|
3
|
|
|
|
|
|
map_size = (size_t)total; |
|
398
|
3
|
|
|
|
|
|
base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); |
|
399
|
3
|
50
|
|
|
|
|
if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
} else { |
|
401
|
3
|
|
|
|
|
|
fd = open(path, O_RDWR|O_CREAT, 0666); |
|
402
|
4
|
50
|
|
|
|
|
if (fd < 0) { LOG_ERR("open: %s", strerror(errno)); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
403
|
3
|
50
|
|
|
|
|
if (flock(fd, LOCK_EX) < 0) { LOG_ERR("flock: %s", strerror(errno)); close(fd); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
404
|
|
|
|
|
|
|
struct stat st; |
|
405
|
3
|
50
|
|
|
|
|
if (fstat(fd, &st) < 0) { |
|
406
|
0
|
0
|
|
|
|
|
LOG_ERR("fstat: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; |
|
407
|
|
|
|
|
|
|
} |
|
408
|
3
|
|
|
|
|
|
int is_new = (st.st_size == 0); |
|
409
|
3
|
100
|
|
|
|
|
if (!is_new && (uint64_t)st.st_size < sizeof(LogHeader)) { |
|
|
|
50
|
|
|
|
|
|
|
410
|
0
|
0
|
|
|
|
|
LOG_ERR("%s: file too small (%lld)", path, (long long)st.st_size); |
|
411
|
0
|
|
|
|
|
|
flock(fd, LOCK_UN); close(fd); return NULL; |
|
412
|
|
|
|
|
|
|
} |
|
413
|
3
|
100
|
|
|
|
|
if (is_new && ftruncate(fd, (off_t)total) < 0) { |
|
|
|
50
|
|
|
|
|
|
|
414
|
0
|
0
|
|
|
|
|
LOG_ERR("ftruncate: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; |
|
415
|
|
|
|
|
|
|
} |
|
416
|
3
|
100
|
|
|
|
|
map_size = is_new ? (size_t)total : (size_t)st.st_size; |
|
417
|
3
|
|
|
|
|
|
base = mmap(NULL, map_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); |
|
418
|
3
|
50
|
|
|
|
|
if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); flock(fd, LOCK_UN); close(fd); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
419
|
3
|
100
|
|
|
|
|
if (!is_new) { |
|
420
|
1
|
50
|
|
|
|
|
if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size, errbuf)) { |
|
421
|
0
|
|
|
|
|
|
munmap(base, map_size); flock(fd, LOCK_UN); close(fd); return NULL; |
|
422
|
|
|
|
|
|
|
} |
|
423
|
1
|
|
|
|
|
|
flock(fd, LOCK_UN); close(fd); |
|
424
|
1
|
|
|
|
|
|
return log_setup(base, map_size, path, -1); |
|
425
|
|
|
|
|
|
|
} |
|
426
|
|
|
|
|
|
|
} |
|
427
|
5
|
|
|
|
|
|
log_init_header(base, total, data_size); |
|
428
|
5
|
100
|
|
|
|
|
if (fd >= 0) { flock(fd, LOCK_UN); close(fd); } |
|
429
|
5
|
|
|
|
|
|
return log_setup(base, map_size, path, -1); |
|
430
|
|
|
|
|
|
|
} |
|
431
|
|
|
|
|
|
|
|
|
432
|
7
|
|
|
|
|
|
static LogHandle *log_create_memfd(const char *name, uint64_t data_size, char *errbuf) { |
|
433
|
7
|
50
|
|
|
|
|
if (errbuf) errbuf[0] = '\0'; |
|
434
|
7
|
50
|
|
|
|
|
if (data_size == 0) { LOG_ERR("data_size must be > 0"); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
435
|
7
|
50
|
|
|
|
|
if (data_size > UINT64_MAX - sizeof(LogHeader)) { LOG_ERR("data_size too large"); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
436
|
7
|
|
|
|
|
|
uint64_t total = sizeof(LogHeader) + data_size; |
|
437
|
7
|
50
|
|
|
|
|
int fd = memfd_create(name ? name : "log", MFD_CLOEXEC | MFD_ALLOW_SEALING); |
|
438
|
7
|
50
|
|
|
|
|
if (fd < 0) { LOG_ERR("memfd_create: %s", strerror(errno)); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
439
|
7
|
50
|
|
|
|
|
if (ftruncate(fd, (off_t)total) < 0) { LOG_ERR("ftruncate: %s", strerror(errno)); close(fd); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
440
|
7
|
|
|
|
|
|
(void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW); |
|
441
|
7
|
|
|
|
|
|
void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); |
|
442
|
7
|
50
|
|
|
|
|
if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); close(fd); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
443
|
7
|
|
|
|
|
|
log_init_header(base, total, data_size); |
|
444
|
7
|
|
|
|
|
|
return log_setup(base, (size_t)total, NULL, fd); |
|
445
|
|
|
|
|
|
|
} |
|
446
|
|
|
|
|
|
|
|
|
447
|
1
|
|
|
|
|
|
static LogHandle *log_open_fd(int fd, char *errbuf) { |
|
448
|
1
|
50
|
|
|
|
|
if (errbuf) errbuf[0] = '\0'; |
|
449
|
|
|
|
|
|
|
struct stat st; |
|
450
|
1
|
50
|
|
|
|
|
if (fstat(fd, &st) < 0) { LOG_ERR("fstat: %s", strerror(errno)); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
451
|
1
|
50
|
|
|
|
|
if ((uint64_t)st.st_size < sizeof(LogHeader)) { LOG_ERR("too small"); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
452
|
1
|
|
|
|
|
|
size_t ms = (size_t)st.st_size; |
|
453
|
1
|
|
|
|
|
|
void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); |
|
454
|
1
|
50
|
|
|
|
|
if (base == MAP_FAILED) { LOG_ERR("mmap: %s", strerror(errno)); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
455
|
1
|
50
|
|
|
|
|
if (!log_validate_header((LogHeader *)base, (uint64_t)st.st_size, errbuf)) { |
|
456
|
0
|
|
|
|
|
|
munmap(base, ms); return NULL; |
|
457
|
|
|
|
|
|
|
} |
|
458
|
1
|
|
|
|
|
|
int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0); |
|
459
|
1
|
50
|
|
|
|
|
if (myfd < 0) { LOG_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; } |
|
|
|
0
|
|
|
|
|
|
|
460
|
1
|
|
|
|
|
|
return log_setup(base, ms, NULL, myfd); |
|
461
|
|
|
|
|
|
|
} |
|
462
|
|
|
|
|
|
|
|
|
463
|
14
|
|
|
|
|
|
/* Release everything a handle owns: its descriptors, the mapping, the
 * stored path and the handle itself. The log contents are not altered.
 * Passing NULL is a no-op. */
static void log_destroy(LogHandle *h) {
    if (h == NULL) return;

    /* Same order as the fields are declared: notify fd, then backing fd. */
    const int owned_fds[2] = { h->notify_fd, h->backing_fd };
    for (int i = 0; i < 2; i++) {
        if (owned_fds[i] >= 0) close(owned_fds[i]);
    }

    if (h->hdr != NULL) munmap(h->hdr, h->mmap_size);
    free(h->path);
    free(h);
}
|
471
|
|
|
|
|
|
|
|
|
472
|
|
|
|
|
|
|
/* NOT concurrency-safe — caller must ensure no concurrent access. |
|
473
|
|
|
|
|
|
|
* |
|
474
|
|
|
|
|
|
|
* Zeros the data region before rewinding tail. This is required for |
|
475
|
|
|
|
|
|
|
* correctness: a stale slot at offset 0 could otherwise leak old |
|
476
|
|
|
|
|
|
|
* data to readers that see the new tail before the new writer |
|
477
|
|
|
|
|
|
|
* publishes a fresh reserve_size. The header SEQ_CST fence below |
|
478
|
|
|
|
|
|
|
* publishes the zeroed region to all subsequent loads. */ |
|
479
|
7
|
|
|
|
|
|
static void log_reset(LogHandle *h) { |
|
480
|
7
|
|
|
|
|
|
memset(h->data, 0, (size_t)h->hdr->data_size); |
|
481
|
7
|
|
|
|
|
|
__atomic_thread_fence(__ATOMIC_SEQ_CST); |
|
482
|
7
|
|
|
|
|
|
__atomic_store_n(&h->hdr->truncation, 0, __ATOMIC_RELEASE); |
|
483
|
7
|
|
|
|
|
|
__atomic_store_n(&h->hdr->tail, 0, __ATOMIC_RELEASE); |
|
484
|
7
|
|
|
|
|
|
__atomic_store_n(&h->hdr->count, 0, __ATOMIC_RELEASE); |
|
485
|
7
|
|
|
|
|
|
__atomic_add_fetch(&h->hdr->wake_seq, 1, __ATOMIC_RELEASE); |
|
486
|
7
|
50
|
|
|
|
|
if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0) |
|
487
|
0
|
|
|
|
|
|
syscall(SYS_futex, &h->hdr->wake_seq, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); |
|
488
|
7
|
|
|
|
|
|
} |
|
489
|
|
|
|
|
|
|
|
|
490
|
|
|
|
|
|
|
/* Concurrency-safe truncate: mark entries before offset as invalid. |
|
491
|
|
|
|
|
|
|
* Does NOT reclaim space — the log is append-only. Readers skip |
|
492
|
|
|
|
|
|
|
* entries below the truncation offset. */ |
|
493
|
4
|
|
|
|
|
|
static inline void log_truncate(LogHandle *h, uint64_t offset) { |
|
494
|
0
|
|
|
|
|
|
for (;;) { |
|
495
|
4
|
|
|
|
|
|
uint64_t cur = __atomic_load_n(&h->hdr->truncation, __ATOMIC_RELAXED); |
|
496
|
7
|
100
|
|
|
|
|
if (offset <= cur) return; /* can only advance, not retreat */ |
|
497
|
3
|
50
|
|
|
|
|
if (__atomic_compare_exchange_n(&h->hdr->truncation, &cur, offset, |
|
498
|
|
|
|
|
|
|
1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) |
|
499
|
3
|
|
|
|
|
|
return; |
|
500
|
|
|
|
|
|
|
} |
|
501
|
|
|
|
|
|
|
} |
|
502
|
|
|
|
|
|
|
|
|
503
|
13
|
|
|
|
|
|
/* Current truncation mark: offsets below it are treated as invalid by
 * log_read_ex (ACQUIRE load). */
static inline uint64_t log_truncation(LogHandle *h) {
    const uint64_t mark = __atomic_load_n(&h->hdr->truncation, __ATOMIC_ACQUIRE);
    return mark;
}
|
506
|
|
|
|
|
|
|
|
|
507
|
2
|
|
|
|
|
|
/* Return the handle's notification eventfd, creating it on first use with
 * EFD_NONBLOCK | EFD_CLOEXEC. Returns -1 if eventfd() fails. The descriptor
 * is stored in the handle and closed by log_destroy. */
static int log_create_eventfd(LogHandle *h) {
    if (h->notify_fd < 0) {
        int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
        if (efd < 0) return -1;
        h->notify_fd = efd;
    }
    return h->notify_fd;
}
|
513
|
2
|
|
|
|
|
|
/* Add 1 to the handle's eventfd counter.
 * Returns 1 when the full 8-byte write succeeded, 0 when no eventfd has
 * been created or the write did not complete. */
static int log_notify(LogHandle *h) {
    const uint64_t one = 1;

    if (h->notify_fd < 0) return 0;
    ssize_t written = write(h->notify_fd, &one, sizeof(one));
    return written == (ssize_t)sizeof(one);
}
|
517
|
2
|
|
|
|
|
|
/* Read the handle's eventfd.
 * Returns the 8-byte counter value that was read, or -1 when no eventfd has
 * been created or the read did not return a full value. */
static int64_t log_eventfd_consume(LogHandle *h) {
    uint64_t pending = 0;

    if (h->notify_fd < 0) return -1;
    ssize_t got = read(h->notify_fd, &pending, sizeof(pending));
    return got == (ssize_t)sizeof(pending) ? (int64_t)pending : -1;
}
|
523
|
1
|
|
|
|
|
|
/* Synchronously flush the whole mapping with msync(MS_SYNC).
 * Returns msync's result unchanged. */
static int log_msync(LogHandle *h) {
    void  *start  = h->hdr;
    size_t length = h->mmap_size;
    return msync(start, length, MS_SYNC);
}
|
524
|
|
|
|
|
|
|
|
|
525
|
|
|
|
|
|
|
#endif /* LOG_H */ |