File Coverage

secret_buffer_async_write.c
Criterion Covered Total %
statement 19 147 12.9
branch 6 92 6.5
condition n/a
subroutine n/a
pod n/a
total 25 239 10.4


line stmt bran cond sub pod time code
1             /*
2             * Async write implementation
3             *
4             * This writes a span of bytes from a SecretBuffer into a file handle, and if it would block,
5             * starts a background thread to continue pumping the bytes into it. The more sensible way to
6             * go is to just use IPC::Run, but this enables the $sb->as_pipe feature without a dependency
7             * on IPC::Run.
8             */
9              
10             typedef struct {
11             int refcount;
12             #ifdef WIN32
13             CRITICAL_SECTION cs;
14             HANDLE startEvent, readyEvent, threadHandle, fd;
15             #define ASYNC_RESULT_MUTEX_LOCK(x) EnterCriticalSection(&((x)->cs))
16             #define ASYNC_RESULT_MUTEX_UNLOCK(x) LeaveCriticalSection(&((x)->cs))
17             #define ASYNC_RESULT_IS_THREAD_ALIVE(x) ((x)->threadHandle != INVALID_HANDLE_VALUE && WaitForSingleObject((x)->threadHandle, 0) == WAIT_TIMEOUT)
18             #define ASYNC_RESULT_NOTIFY_STARTED(x) SetEvent((x)->readyEvent)
19             #define ASYNC_RESULT_NOTIFY_COMPLETE(x) SetEvent((x)->startEvent)
20             #else
21             pthread_mutex_t mutex;
22             pthread_cond_t cond;
23             pthread_t threadHandle;
24             int fd;
25             #define ASYNC_RESULT_MUTEX_LOCK(x) pthread_mutex_lock(&((x)->mutex))
26             #define ASYNC_RESULT_MUTEX_UNLOCK(x) pthread_mutex_unlock(&((x)->mutex))
27             #define ASYNC_RESULT_IS_THREAD_ALIVE(x) ((x)->threadHandle > 0 && pthread_kill((x)->threadHandle, 0) == 0)
28             #define ASYNC_RESULT_NOTIFY_STARTED(x) pthread_cond_signal(&((x)->cond))
29             #define ASYNC_RESULT_NOTIFY_COMPLETE(x) pthread_cond_signal(&((x)->cond))
30             #endif
31             bool started, ready;
32             IV os_err;
33             IV total_written;
34             IV secret_len;
35             char secret[];
36             } secret_buffer_async_result;
37              
38 0           static SV *secret_buffer_async_result_wrap_with_object(pTHX_ secret_buffer_async_result *result) {
39 0           SV *ref= sv_2mortal(newRV_noinc(newSV(0)));
40 0           MAGIC *mg= sv_magicext(SvRV(ref), NULL, PERL_MAGIC_ext, &secret_buffer_async_result_magic_vtbl, (const char *)result, 0);
41             #ifdef USE_ITHREADS
42             mg->mg_flags |= MGf_DUP;
43             #else
44             PERL_UNUSED_VAR(mg);
45             #endif
46 0           return sv_bless(ref, gv_stashpv("Crypt::SecretBuffer::AsyncResult", GV_ADD));
47             }
48              
49 0           static secret_buffer_async_result* secret_buffer_async_result_from_magic(SV *obj, int flags) {
50             dTHX;
51 0           return (secret_buffer_async_result*) secret_buffer_X_from_magic(aTHX_
52             obj, flags, &secret_buffer_async_result_magic_vtbl, "secret_buffer_async_result", NULL);
53             }
54              
55 0           static secret_buffer_async_result *secret_buffer_async_result_new(int fd, secret_buffer *buf, size_t ofs, size_t count) {
56             #ifdef WIN32
57             /* On Win32, Perl redefines 'malloc' and 'free' to call methods on the perl thread instance
58             * which would be bad when a thread which perl is not aware of is the one calling 'free'.
59             * So use Win32 API GlobalAlloc/GlobalFree.
60             */
61             secret_buffer_async_result *result= (secret_buffer_async_result *)
62             GlobalAlloc(GMEM_FIXED|GMEM_ZEROINIT, sizeof(secret_buffer_async_result) + count);
63             if (!result)
64             croak_with_syserror("malloc(sizeof(secret_buffer_async_result) + count)", GetLastError());
65             Zero(((char*)result), sizeof(secret_buffer_async_result) + count, char);
66             InitializeCriticalSection(&result->cs);
67             /* Duplicate the file handle for the thread */
68             if (fd >= 0) {
69             if (!DuplicateHandle(GetCurrentProcess(), (HANDLE)_get_osfhandle(fd), GetCurrentProcess(), &result->fd,
70             0, FALSE, DUPLICATE_SAME_ACCESS)
71             ) {
72             GlobalFree(result);
73             croak_with_syserror("DuplicateHandle", GetLastError());
74             }
75             } else {
76             result->fd= INVALID_HANDLE_VALUE;
77             }
78             result->readyEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
79             result->startEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
80             if (result->readyEvent == NULL || result->startEvent == NULL) {
81             if (result->readyEvent) CloseHandle(result->readyEvent);
82             CloseHandle(result->fd);
83             GlobalFree(result);
84             croak_with_syserror("CreateEvent", GetLastError());
85             }
86             #else /* POSIX */
87             secret_buffer_async_result *result= (secret_buffer_async_result *)
88 0           malloc(sizeof(secret_buffer_async_result) + count);
89 0           Zero(((char*)result), sizeof(secret_buffer_async_result) + count, char);
90 0 0         result->fd= fd >= 0? dup(fd) : -1;
91 0 0         if (result->fd < 0) {
92 0           free(result);
93 0           croak_with_syserror("dup", errno);
94             }
95 0           pthread_mutex_init(&result->mutex, NULL);
96 0           pthread_cond_init(&result->cond, NULL);
97             #endif
98 0           result->ready= result->started= false;
99 0 0         if (count)
100 0           memcpy(buf->data + ofs, result->secret, count);
101 0           result->secret_len= count;
102 0           result->refcount= 1;
103 0           return result;
104             }
105              
106             /* One refcount is held by the main thread, and one by the worker thread */
107 0           static void secret_buffer_async_result_release(secret_buffer_async_result *result, bool from_thread) {
108 0           bool destroy= false, cleanup_thread_half= false, cleanup_perl_half= false;
109 0           ASYNC_RESULT_MUTEX_LOCK(result);
110 0 0         if (from_thread) {
111 0           cleanup_thread_half= true;
112 0           destroy= --result->refcount == 0;
113             } else {
114             /* check whether thread exited without cleaning up, and dec refcount if so */
115 0 0         if (!result->ready && !ASYNC_RESULT_IS_THREAD_ALIVE(result)) {
    0          
    0          
116             dTHX;
117 0           warn("writer thread died without cleaning up");
118 0           cleanup_thread_half= true;
119 0           result->ready= true;
120 0           --result->refcount;
121             }
122 0           destroy= --result->refcount == 0;
123 0 0         cleanup_perl_half= destroy || (!result->ready && result->refcount == 1);
    0          
    0          
124             }
125 0 0         if (cleanup_thread_half) {
126 0           result->ready= true;
127 0           secret_buffer_wipe(result->secret, result->secret_len);
128             #ifdef WIN32
129             if (result->fd != INVALID_HANDLE_VALUE)
130             CloseHandle(result->fd);
131             result->fd= INVALID_HANDLE_VALUE;
132             #else
133 0 0         if (result->fd >= 0)
134 0           close(result->fd);
135 0           result->fd= -1;
136             #endif
137             }
138 0 0         if (cleanup_perl_half) {
139             /* Detach so resources are freed on exit */
140             #ifdef WIN32
141             CloseHandle(result->threadHandle);
142             #else /* POSIX */
143 0           pthread_detach(result->threadHandle); /* Parent continues without waiting for child */
144             #endif
145             }
146 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
147             /* can't destroy mutexes while locked */
148 0 0         if (destroy) {
149             #ifdef WIN32
150             DeleteCriticalSection(&result->cs);
151             CloseHandle(result->startEvent);
152             CloseHandle(result->readyEvent);
153             if (result->fd != INVALID_HANDLE_VALUE)
154             CloseHandle(result->fd);
155             GlobalFree(result);
156             #else
157 0           pthread_mutex_destroy(&result->mutex);
158 0           pthread_cond_destroy(&result->cond);
159 0           free(result);
160             #endif
161             }
162 0           }
163              
164 0           static void secret_buffer_async_result_acquire(secret_buffer_async_result *result) {
165 0           ASYNC_RESULT_MUTEX_LOCK(result);
166 0           ++result->refcount;
167 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
168 0           }
169              
170 0           static int secret_buffer_async_result_magic_free(pTHX_ SV *sv, MAGIC *mg) {
171 0           secret_buffer_async_result_release((secret_buffer_async_result *) mg->mg_ptr, false);
172 0           return 0;
173             }
#ifdef USE_ITHREADS
/* Magic 'dup' hook (ithreads builds only): takes an extra reference on the async
 * result stored in mg_ptr so that each copy of the magic owns one.  Always returns 0. */
int secret_buffer_async_result_magic_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *p) {
   secret_buffer_async_result *shared= (secret_buffer_async_result *) mg->mg_ptr;
   secret_buffer_async_result_acquire(shared);
   return 0;
}
#endif
180              
181 0           void secret_buffer_async_result_send(secret_buffer_async_result *result, IV os_err) {
182 0           ASYNC_RESULT_MUTEX_LOCK(result);
183 0           result->os_err= os_err;
184 0           result->ready= true;
185             #ifdef WIN32
186             SetEvent(result->readyEvent);
187             #else
188 0           pthread_cond_signal(&result->cond);
189             #endif
190 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
191 0           }
192              
193 0           bool secret_buffer_async_result_recv(secret_buffer_async_result *result, IV timeout_msec, IV *total_written, IV *os_err) {
194 0           bool ready= false;
195             #ifdef WIN32
196             DWORD ret = WaitForSingleObject(result->readyEvent, timeout_msec < 0? INFINITE : timeout_msec);
197             if (ret == WAIT_TIMEOUT)
198             return false;
199             if (ret != WAIT_OBJECT_0)
200             croak_with_syserror("WaitForSingleObject", GetLastError());
201             ready= true;
202             ASYNC_RESULT_MUTEX_LOCK(result);
203             #else
204             struct timespec ts;
205             /* timedwait operates on absolute wallclock time
206             * This is sort of dangerous since wallclock time can change... but the only
207             * alternative I see would be to play with the alarm signal.
208             */
209 0 0         if (timeout_msec >= 0) {
210 0           clock_gettime(CLOCK_REALTIME, &ts);
211 0           ts.tv_sec += timeout_msec / 1000;
212 0           ts.tv_nsec += (timeout_msec % 1000) * 1000000;
213 0 0         if (ts.tv_nsec >= 1000000000) {
214 0           ts.tv_sec += 1;
215 0           ts.tv_nsec -= 1000000000;
216             }
217             }
218 0           ASYNC_RESULT_MUTEX_LOCK(result);
219             /* Wait until data is ready or timeout occurs */
220 0 0         while (!result->ready) {
221 0           int rc = timeout_msec < 0? pthread_cond_wait(&result->cond, &result->mutex)
222 0 0         : pthread_cond_timedwait(&result->cond, &result->mutex, &ts);
223 0 0         if (rc == ETIMEDOUT)
224 0           break;
225 0           ready= result->ready;
226             }
227             #endif
228             /* If we got the data successfully, read it and reset the ready flag */
229 0 0         if (ready) {
230 0 0         if (total_written) *total_written= result->total_written;
231 0 0         if (os_err) *os_err= result->os_err;
232             }
233 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
234 0           return ready;
235             }
236              
237 0           bool secret_buffer_result_check(SV *promise_ref, int timeout_msec, IV *wrote, IV *os_err) {
238 0           return secret_buffer_async_result_recv(
239             secret_buffer_async_result_from_magic(promise_ref, SECRET_BUFFER_MAGIC_OR_DIE),
240             timeout_msec, wrote, os_err);
241             }
242              
243              
244             /* Worker thread for background writing.
245             * This thread receives a copy of the secret in the secret_buffer_async_result
246             * (along with a duplicated file handle) and it uses blocking writes to push the
247             * data through the handle, It updates the async_result fields as it goes,
248             * then uses a condvar/event to flag the main thread when it is done.
249             * The thread is responsible for erasing the secret, but the main thread can
250             * also erase the secret if it sees that the worker thread died.
251             */
252             #ifdef WIN32
253             DWORD WINAPI secret_buffer_async_writer(LPVOID arg) {
254             #else
255 0           void *secret_buffer_async_writer(void *arg) {
256             #endif
257 0           secret_buffer_async_result *result = (secret_buffer_async_result *) arg;
258 0           ASYNC_RESULT_MUTEX_LOCK(result);
259 0           ++result->refcount;
260 0           result->started= true;
261 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
262             /* no need to lock mutex for the written/secret/fd fields since the receiver isn't
263             * allowed to use them until this thread sets 'ready' or thread dies */
264             #ifdef WIN32
265             while (result->total_written < result->secret_len) {
266             DWORD wrote;
267             if (WriteFile((HANDLE) result->fd, result->secret + result->total_written,
268             (DWORD)(result->secret_len - result->total_written), &wrote, NULL)
269             ) {
270             if (wrote == 0) {
271             secret_buffer_async_result_send(result, 0);
272             break;
273             }
274             result->total_written += wrote;
275             }
276             else {
277             secret_buffer_async_result_send(result, GetLastError());
278             }
279             }
280             #else /* POSIX */
281             /* Blocking mode assumed */
282 0 0         while (result->total_written < result->secret_len) {
283 0           ssize_t wrote= write(result->fd, result->secret + result->total_written,
284 0           result->secret_len - result->total_written);
285 0 0         if (wrote <= 0) {
286 0 0         if (wrote < 0 && errno == EINTR)
    0          
287 0           continue;
288 0 0         else if (wrote < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
    0          
    0          
289             /* it's a nonblocking handle. use select() to wait for it to become writable */
290             fd_set writeable;
291 0 0         FD_ZERO(&writeable);
292 0           FD_SET(result->fd, &writeable);
293 0 0         if (select(result->fd + 1, NULL, &writeable, NULL, NULL) > 0 || errno == EINTR)
    0          
294             /* next write attempt should make progress */
295 0           continue;
296             /* something went wrong, bail out */
297             }
298 0 0         secret_buffer_async_result_send(result, wrote == 0? 0 : errno);
299 0           break;
300             }
301 0           result->total_written += wrote;
302             }
303             #endif /* POSIX */
304 0           secret_buffer_async_result_release(result, true);
305 0           return 0;
306             }
307              
308 3           IV secret_buffer_write_async(secret_buffer *buf, PerlIO *fh, IV offset, IV count, SV **ref_out) {
309             dTHX;
310 3           IV total_written= 0;
311 3           secret_buffer_async_result *result= NULL;
312             int fd;
313             /* translate negative offset or negative count, in the manner of substr */
314 3           offset= normalize_offset(offset, buf->len);
315 3           count= normalize_offset(count, buf->len - offset);
316             /* flush any buffered data already on the handle */
317 3           PerlIO_flush(fh);
318              
319 3 50         if (count == 0)
320 0           return 0;
321              
322 3           fd= PerlIO_fileno(fh);
323 3 50         if (fd < 0)
324 0           croak("Invalid file descriptor");
325             #ifdef WIN32
326             /* On Windows, there is no universal way to attempt a nonblocking write. So, check if its a
327             pipe, and then use nonblocking pipe write, and otherwise always create the worker thread.
328             The intent for this module is to write small secrets on pipes, so in most cases the
329             thread won't be created anyway.
330             */
331             {
332             HANDLE hFile = (HANDLE)_get_osfhandle(fd);
333             DWORD ret;
334             if (GetFileType(hFile) == FILE_TYPE_PIPE) {
335             DWORD origPipeMode = 0;
336             DWORD bytesWritten, lastError;
337             BOOL success;
338              
339             if (!GetNamedPipeHandleState(hFile, &origPipeMode, NULL, NULL, NULL, NULL, 0))
340             croak_with_syserror("GetNamedPipeHandleState", GetLastError());
341             if (!(origPipeMode & PIPE_NOWAIT)) {
342             /* Set pipe to non-blocking mode temporarily */
343             DWORD nonBlockMode = PIPE_NOWAIT;
344             if (!SetNamedPipeHandleState(hFile, &nonBlockMode, NULL, NULL))
345             croak_with_syserror("SetNamedPipeHandleState", GetLastError());
346             }
347              
348             /* Try nonblocking write */
349             success= WriteFile(hFile, buf->data + offset, count, &bytesWritten, NULL);
350             lastError = GetLastError();
351              
352             /* Restore original pipe state */
353             if (!(origPipeMode & PIPE_NOWAIT)) {
354             SetNamedPipeHandleState(hFile, &origPipeMode, NULL, NULL);
355             SetLastError(lastError);
356             }
357            
358             if (success && bytesWritten == count)
359             return count; /* Write completed immediately */
360             else if (!success && lastError != ERROR_NO_DATA)
361             /* an actual error */
362             return -1;
363             total_written= (IV) bytesWritten;
364             }
365             /* Launch thread */
366             result= secret_buffer_async_result_new(fd, buf, offset, count);
367             result->total_written= total_written;
368             result->threadHandle= CreateThread(
369             NULL, /* default security attributes */
370             0, /* default stack size */
371             (LPTHREAD_START_ROUTINE)secret_buffer_async_writer,
372             result, /* thread parameter */
373             0, /* default creation flags */
374             NULL); /* receive thread identifier */
375             if (result->threadHandle == NULL) {
376             secret_buffer_async_result_release(result, false);
377             croak_with_syserror("Failed to create thread", GetLastError());
378             }
379             /* make sure thread starts and takes ownership of its refcount */
380             WaitForSingleObject(result->startEvent, INFINITE);
381             }
382             #else /* POSIX */
383             {
384             pthread_t thread;
385             /* Set non-blocking mode if needed */
386 3           int old_flags = fcntl(fd, F_GETFL);
387 3 50         if (!(old_flags & O_NONBLOCK))
388 3 50         if (fcntl(fd, F_SETFL, old_flags | O_NONBLOCK) < 0)
389 0           croak_with_syserror("Failed to set nonblocking mode on handle", errno);
390              
391             /* First write attempt */
392 3           total_written = write(fd, buf->data + offset, count);
393              
394             /* Restore blocking mode if we changed it */
395 3 50         if (!(old_flags & O_NONBLOCK)) {
396 3           int save_errno= errno;
397 3           fcntl(fd, F_SETFL, old_flags);
398 3           errno= save_errno;
399             }
400              
401 3 50         if (total_written == count)
402 3           return count; /* Write completed immediately */
403 0 0         else if (total_written < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
    0          
    0          
404 0           return -1; /* actual error */
405 0 0         if (total_written < 0)
406 0           total_written= 0;
407             /* launch thread */
408 0           result= secret_buffer_async_result_new(fd, buf, offset, count);
409 0           result->total_written= total_written;
410 0 0         if (pthread_create(&thread, NULL, secret_buffer_async_writer, result) != 0) {
411 0           secret_buffer_async_result_release(result, false);
412 0           croak_with_syserror("Failed to create thread", errno);
413             }
414             /* make sure thread starts and takes ownership of its refcount */
415 0           ASYNC_RESULT_MUTEX_LOCK(result);
416 0 0         if (!result->started)
417 0           pthread_cond_wait(&result->cond, &result->mutex);
418 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
419             } /* POSIX */
420             #endif
421 0 0         if (ref_out)
422             /* Caller requests a reference to the result */
423 0           *ref_out= secret_buffer_async_result_wrap_with_object(aTHX_ result);
424             else
425             /* nobody cares, so release our reference to the result. The thread will carry on silently */
426 0           secret_buffer_async_result_release(result, false);
427 0           return 0;
428             }
429