File Coverage

secret_buffer_async_write.c
Criterion Covered Total %
statement 19 147 12.9
branch 6 92 6.5
condition n/a
subroutine n/a
pod n/a
total 25 239 10.4


line stmt bran cond sub pod time code
1             /**********************************************************************************************\
2             * Async write implementation
3             \**********************************************************************************************/
4              
5             typedef struct {
6             int refcount;
7             #ifdef WIN32
8             CRITICAL_SECTION cs;
9             HANDLE startEvent, readyEvent, threadHandle, fd;
10             #define ASYNC_RESULT_MUTEX_LOCK(x) EnterCriticalSection(&((x)->cs))
11             #define ASYNC_RESULT_MUTEX_UNLOCK(x) LeaveCriticalSection(&((x)->cs))
12             #define ASYNC_RESULT_IS_THREAD_ALIVE(x) ((x)->threadHandle != INVALID_HANDLE_VALUE && WaitForSingleObject((x)->threadHandle, 0) == WAIT_TIMEOUT)
13             #define ASYNC_RESULT_NOTIFY_STARTED(x) SetEvent((x)->readyEvent)
14             #define ASYNC_RESULT_NOTIFY_COMPLETE(x) SetEvent((x)->startEvent)
15             #else
16             pthread_mutex_t mutex;
17             pthread_cond_t cond;
18             pthread_t threadHandle;
19             int fd;
20             #define ASYNC_RESULT_MUTEX_LOCK(x) pthread_mutex_lock(&((x)->mutex))
21             #define ASYNC_RESULT_MUTEX_UNLOCK(x) pthread_mutex_unlock(&((x)->mutex))
22             #define ASYNC_RESULT_IS_THREAD_ALIVE(x) ((x)->threadHandle > 0 && pthread_kill((x)->threadHandle, 0) == 0)
23             #define ASYNC_RESULT_NOTIFY_STARTED(x) pthread_cond_signal(&((x)->cond))
24             #define ASYNC_RESULT_NOTIFY_COMPLETE(x) pthread_cond_signal(&((x)->cond))
25             #endif
26             bool started, ready;
27             IV os_err;
28             IV total_written;
29             IV secret_len;
30             char secret[];
31             } secret_buffer_async_result;
32              
33 0           static SV *secret_buffer_async_result_wrap_with_object(pTHX_ secret_buffer_async_result *result) {
34 0           SV *ref= sv_2mortal(newRV_noinc(newSV(0)));
35 0           MAGIC *mg= sv_magicext(SvRV(ref), NULL, PERL_MAGIC_ext, &secret_buffer_async_result_magic_vtbl, (const char *)result, 0);
36             #ifdef USE_ITHREADS
37             mg->mg_flags |= MGf_DUP;
38             #else
39             PERL_UNUSED_VAR(mg);
40             #endif
41 0           return sv_bless(ref, gv_stashpv("Crypt::SecretBuffer::AsyncResult", GV_ADD));
42             }
43              
44 0           static secret_buffer_async_result* secret_buffer_async_result_from_magic(SV *obj, int flags) {
45             dTHX;
46 0           return (secret_buffer_async_result*) secret_buffer_X_from_magic(aTHX_
47             obj, flags, &secret_buffer_async_result_magic_vtbl, "secret_buffer_async_result", NULL);
48             }
49              
50 0           static secret_buffer_async_result *secret_buffer_async_result_new(int fd, secret_buffer *buf, size_t ofs, size_t count) {
51             #ifdef WIN32
52             /* On Win32, Perl redefines 'malloc' and 'free' to call methods on the perl thread instance
53             * which would be bad when a thread which perl is not aware of is the one calling 'free'.
54             * So use Win32 API GlobalAlloc/GlobalFree.
55             */
56             secret_buffer_async_result *result= (secret_buffer_async_result *)
57             GlobalAlloc(GMEM_FIXED|GMEM_ZEROINIT, sizeof(secret_buffer_async_result) + count);
58             if (!result)
59             croak_with_syserror("malloc(sizeof(secret_buffer_async_result) + count)", GetLastError());
60             Zero(((char*)result), sizeof(secret_buffer_async_result) + count, char);
61             InitializeCriticalSection(&result->cs);
62             /* Duplicate the file handle for the thread */
63             if (fd >= 0) {
64             if (!DuplicateHandle(GetCurrentProcess(), (HANDLE)_get_osfhandle(fd), GetCurrentProcess(), &result->fd,
65             0, FALSE, DUPLICATE_SAME_ACCESS)
66             ) {
67             GlobalFree(result);
68             croak_with_syserror("DuplicateHandle", GetLastError());
69             }
70             } else {
71             result->fd= INVALID_HANDLE_VALUE;
72             }
73             result->readyEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
74             result->startEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
75             if (result->readyEvent == NULL || result->startEvent == NULL) {
76             if (result->readyEvent) CloseHandle(result->readyEvent);
77             CloseHandle(result->fd);
78             GlobalFree(result);
79             croak_with_syserror("CreateEvent", GetLastError());
80             }
81             #else /* POSIX */
82             secret_buffer_async_result *result= (secret_buffer_async_result *)
83 0           malloc(sizeof(secret_buffer_async_result) + count);
84 0           Zero(((char*)result), sizeof(secret_buffer_async_result) + count, char);
85 0 0         result->fd= fd >= 0? dup(fd) : -1;
86 0 0         if (result->fd < 0) {
87 0           free(result);
88 0           croak_with_syserror("dup", errno);
89             }
90 0           pthread_mutex_init(&result->mutex, NULL);
91 0           pthread_cond_init(&result->cond, NULL);
92             #endif
93 0           result->ready= result->started= false;
94 0 0         if (count)
95 0           memcpy(buf->data + ofs, result->secret, count);
96 0           result->secret_len= count;
97 0           result->refcount= 1;
98 0           return result;
99             }
100              
101             /* One refcount is held by the main thread, and one by the worker thread */
102 0           static void secret_buffer_async_result_release(secret_buffer_async_result *result, bool from_thread) {
103 0           bool destroy= false, cleanup_thread_half= false, cleanup_perl_half= false;
104 0           ASYNC_RESULT_MUTEX_LOCK(result);
105 0 0         if (from_thread) {
106 0           cleanup_thread_half= true;
107 0           destroy= --result->refcount == 0;
108             } else {
109             /* check whether thread exited without cleaning up, and dec refcount if so */
110 0 0         if (!result->ready && !ASYNC_RESULT_IS_THREAD_ALIVE(result)) {
    0          
    0          
111             dTHX;
112 0           warn("writer thread died without cleaning up");
113 0           cleanup_thread_half= true;
114 0           result->ready= true;
115 0           --result->refcount;
116             }
117 0           destroy= --result->refcount == 0;
118 0 0         cleanup_perl_half= destroy || (!result->ready && result->refcount == 1);
    0          
    0          
119             }
120 0 0         if (cleanup_thread_half) {
121 0           result->ready= true;
122 0           secret_buffer_wipe(result->secret, result->secret_len);
123             #ifdef WIN32
124             if (result->fd != INVALID_HANDLE_VALUE)
125             CloseHandle(result->fd);
126             result->fd= INVALID_HANDLE_VALUE;
127             #else
128 0 0         if (result->fd >= 0)
129 0           close(result->fd);
130 0           result->fd= -1;
131             #endif
132             }
133 0 0         if (cleanup_perl_half) {
134             /* Detach so resources are freed on exit */
135             #ifdef WIN32
136             CloseHandle(result->threadHandle);
137             #else /* POSIX */
138 0           pthread_detach(result->threadHandle); /* Parent continues without waiting for child */
139             #endif
140             }
141 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
142             /* can't destroy mutexes while locked */
143 0 0         if (destroy) {
144             #ifdef WIN32
145             DeleteCriticalSection(&result->cs);
146             CloseHandle(result->startEvent);
147             CloseHandle(result->readyEvent);
148             if (result->fd != INVALID_HANDLE_VALUE)
149             CloseHandle(result->fd);
150             GlobalFree(result);
151             #else
152 0           pthread_mutex_destroy(&result->mutex);
153 0           pthread_cond_destroy(&result->cond);
154 0           free(result);
155             #endif
156             }
157 0           }
158              
159 0           static void secret_buffer_async_result_acquire(secret_buffer_async_result *result) {
160 0           ASYNC_RESULT_MUTEX_LOCK(result);
161 0           ++result->refcount;
162 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
163 0           }
164              
165 0           static int secret_buffer_async_result_magic_free(pTHX_ SV *sv, MAGIC *mg) {
166 0           secret_buffer_async_result_release((secret_buffer_async_result *) mg->mg_ptr, false);
167 0           return 0;
168             }
169             #ifdef USE_ITHREADS
170             int secret_buffer_async_result_magic_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *p) {
171             secret_buffer_async_result_acquire((secret_buffer_async_result *) mg->mg_ptr);
172             return 0;
173             }
174             #endif
175              
176 0           void secret_buffer_async_result_send(secret_buffer_async_result *result, IV os_err) {
177 0           ASYNC_RESULT_MUTEX_LOCK(result);
178 0           result->os_err= os_err;
179 0           result->ready= true;
180             #ifdef WIN32
181             SetEvent(result->readyEvent);
182             #else
183 0           pthread_cond_signal(&result->cond);
184             #endif
185 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
186 0           }
187              
188 0           bool secret_buffer_async_result_recv(secret_buffer_async_result *result, IV timeout_msec, IV *total_written, IV *os_err) {
189 0           bool ready= false;
190             #ifdef WIN32
191             DWORD ret = WaitForSingleObject(result->readyEvent, timeout_msec < 0? INFINITE : timeout_msec);
192             if (ret == WAIT_TIMEOUT)
193             return false;
194             if (ret != WAIT_OBJECT_0)
195             croak_with_syserror("WaitForSingleObject", GetLastError());
196             ready= true;
197             ASYNC_RESULT_MUTEX_LOCK(result);
198             #else
199             struct timespec ts;
200             /* timedwait operates on absolute wallclock time
201             * This is sort of dangerous since wallclock time can change... but the only
202             * alternative I see would be to play with the alarm signal.
203             */
204 0 0         if (timeout_msec >= 0) {
205 0           clock_gettime(CLOCK_REALTIME, &ts);
206 0           ts.tv_sec += timeout_msec / 1000;
207 0           ts.tv_nsec += (timeout_msec % 1000) * 1000000;
208 0 0         if (ts.tv_nsec >= 1000000000) {
209 0           ts.tv_sec += 1;
210 0           ts.tv_nsec -= 1000000000;
211             }
212             }
213 0           ASYNC_RESULT_MUTEX_LOCK(result);
214             /* Wait until data is ready or timeout occurs */
215 0 0         while (!result->ready) {
216 0           int rc = timeout_msec < 0? pthread_cond_wait(&result->cond, &result->mutex)
217 0 0         : pthread_cond_timedwait(&result->cond, &result->mutex, &ts);
218 0 0         if (rc == ETIMEDOUT)
219 0           break;
220 0           ready= result->ready;
221             }
222             #endif
223             /* If we got the data successfully, read it and reset the ready flag */
224 0 0         if (ready) {
225 0 0         if (total_written) *total_written= result->total_written;
226 0 0         if (os_err) *os_err= result->os_err;
227             }
228 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
229 0           return ready;
230             }
231              
232 0           bool secret_buffer_result_check(SV *promise_ref, int timeout_msec, IV *wrote, IV *os_err) {
233 0           return secret_buffer_async_result_recv(
234             secret_buffer_async_result_from_magic(promise_ref, SECRET_BUFFER_MAGIC_OR_DIE),
235             timeout_msec, wrote, os_err);
236             }
237              
238              
239             /* Worker thread for background writing.
240             * This thread receives a copy of the secret in the secret_buffer_async_result
241             * (along with a duplicated file handle) and it uses blocking writes to push the
242             * data through the handle, It updates the async_result fields as it goes,
243             * then uses a condvar/event to flag the main thread when it is done.
244             * The thread is responsible for erasing the secret, but the main thread can
245             * also erase the secret if it sees that the worker thread died.
246             */
247             #ifdef WIN32
248             DWORD WINAPI secret_buffer_async_writer(LPVOID arg) {
249             #else
250 0           void *secret_buffer_async_writer(void *arg) {
251             #endif
252 0           secret_buffer_async_result *result = (secret_buffer_async_result *) arg;
253 0           ASYNC_RESULT_MUTEX_LOCK(result);
254 0           ++result->refcount;
255 0           result->started= true;
256 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
257             /* no need to lock mutex for the written/secret/fd fields since the receiver isn't
258             * allowed to use them until this thread sets 'ready' or thread dies */
259             #ifdef WIN32
260             while (result->total_written < result->secret_len) {
261             DWORD wrote;
262             if (WriteFile((HANDLE) result->fd, result->secret + result->total_written,
263             (DWORD)(result->secret_len - result->total_written), &wrote, NULL)
264             ) {
265             if (wrote == 0) {
266             secret_buffer_async_result_send(result, 0);
267             break;
268             }
269             result->total_written += wrote;
270             }
271             else {
272             secret_buffer_async_result_send(result, GetLastError());
273             }
274             }
275             #else /* POSIX */
276             /* Blocking mode assumed */
277 0 0         while (result->total_written < result->secret_len) {
278 0           ssize_t wrote= write(result->fd, result->secret + result->total_written,
279 0           result->secret_len - result->total_written);
280 0 0         if (wrote <= 0) {
281 0 0         if (wrote < 0 && errno == EINTR)
    0          
282 0           continue;
283 0 0         else if (wrote < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
    0          
    0          
284             /* it's a nonblocking handle. use select() to wait for it to become writable */
285             fd_set writeable;
286 0 0         FD_ZERO(&writeable);
287 0           FD_SET(result->fd, &writeable);
288 0 0         if (select(result->fd + 1, NULL, &writeable, NULL, NULL) > 0 || errno == EINTR)
    0          
289             /* next write attempt should make progress */
290 0           continue;
291             /* something went wrong, bail out */
292             }
293 0 0         secret_buffer_async_result_send(result, wrote == 0? 0 : errno);
294 0           break;
295             }
296 0           result->total_written += wrote;
297             }
298             #endif /* POSIX */
299 0           secret_buffer_async_result_release(result, true);
300 0           return 0;
301             }
302              
303 3           IV secret_buffer_write_async(secret_buffer *buf, PerlIO *fh, IV offset, IV count, SV **ref_out) {
304             dTHX;
305 3           IV total_written= 0;
306 3           secret_buffer_async_result *result= NULL;
307             int fd;
308             /* translate negative offset or negative count, in the manner of substr */
309 3           offset= normalize_offset(offset, buf->len);
310 3           count= normalize_offset(count, buf->len - offset);
311             /* flush any buffered data already on the handle */
312 3           PerlIO_flush(fh);
313              
314 3 50         if (count == 0)
315 0           return 0;
316              
317 3           fd= PerlIO_fileno(fh);
318 3 50         if (fd < 0)
319 0           croak("Invalid file descriptor");
320             #ifdef WIN32
321             /* On Windows, there is no universal way to attempt a nonblocking write. So, check if its a
322             pipe, and then use nonblocking pipe write, and otherwise always create the worker thread.
323             The intent for this module is to write small secrets on pipes, so in most cases the
324             thread won't be created anyway.
325             */
326             {
327             HANDLE hFile = (HANDLE)_get_osfhandle(fd);
328             DWORD ret;
329             if (GetFileType(hFile) == FILE_TYPE_PIPE) {
330             DWORD origPipeMode = 0;
331             DWORD bytesWritten, lastError;
332             BOOL success;
333              
334             if (!GetNamedPipeHandleState(hFile, &origPipeMode, NULL, NULL, NULL, NULL, 0))
335             croak_with_syserror("GetNamedPipeHandleState", GetLastError());
336             if (!(origPipeMode & PIPE_NOWAIT)) {
337             /* Set pipe to non-blocking mode temporarily */
338             DWORD nonBlockMode = PIPE_NOWAIT;
339             if (!SetNamedPipeHandleState(hFile, &nonBlockMode, NULL, NULL))
340             croak_with_syserror("SetNamedPipeHandleState", GetLastError());
341             }
342              
343             /* Try nonblocking write */
344             success= WriteFile(hFile, buf->data + offset, count, &bytesWritten, NULL);
345             lastError = GetLastError();
346              
347             /* Restore original pipe state */
348             if (!(origPipeMode & PIPE_NOWAIT)) {
349             SetNamedPipeHandleState(hFile, &origPipeMode, NULL, NULL);
350             SetLastError(lastError);
351             }
352            
353             if (success && bytesWritten == count)
354             return count; /* Write completed immediately */
355             else if (!success && lastError != ERROR_NO_DATA)
356             /* an actual error */
357             return -1;
358             total_written= (IV) bytesWritten;
359             }
360             /* Launch thread */
361             result= secret_buffer_async_result_new(fd, buf, offset, count);
362             result->total_written= total_written;
363             result->threadHandle= CreateThread(
364             NULL, /* default security attributes */
365             0, /* default stack size */
366             (LPTHREAD_START_ROUTINE)secret_buffer_async_writer,
367             result, /* thread parameter */
368             0, /* default creation flags */
369             NULL); /* receive thread identifier */
370             if (result->threadHandle == NULL) {
371             secret_buffer_async_result_release(result, false);
372             croak_with_syserror("Failed to create thread", GetLastError());
373             }
374             /* make sure thread starts and takes ownership of its refcount */
375             WaitForSingleObject(result->startEvent, INFINITE);
376             }
377             #else /* POSIX */
378             {
379             pthread_t thread;
380             /* Set non-blocking mode if needed */
381 3           int old_flags = fcntl(fd, F_GETFL);
382 3 50         if (!(old_flags & O_NONBLOCK))
383 3 50         if (fcntl(fd, F_SETFL, old_flags | O_NONBLOCK) < 0)
384 0           croak_with_syserror("Failed to set nonblocking mode on handle", errno);
385              
386             /* First write attempt */
387 3           total_written = write(fd, buf->data + offset, count);
388              
389             /* Restore blocking mode if we changed it */
390 3 50         if (!(old_flags & O_NONBLOCK)) {
391 3           int save_errno= errno;
392 3           fcntl(fd, F_SETFL, old_flags);
393 3           errno= save_errno;
394             }
395              
396 3 50         if (total_written == count)
397 3           return count; /* Write completed immediately */
398 0 0         else if (total_written < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
    0          
    0          
399 0           return -1; /* actual error */
400 0 0         if (total_written < 0)
401 0           total_written= 0;
402             /* launch thread */
403 0           result= secret_buffer_async_result_new(fd, buf, offset, count);
404 0           result->total_written= total_written;
405 0 0         if (pthread_create(&thread, NULL, secret_buffer_async_writer, result) != 0) {
406 0           secret_buffer_async_result_release(result, false);
407 0           croak_with_syserror("Failed to create thread", errno);
408             }
409             /* make sure thread starts and takes ownership of its refcount */
410 0           ASYNC_RESULT_MUTEX_LOCK(result);
411 0 0         if (!result->started)
412 0           pthread_cond_wait(&result->cond, &result->mutex);
413 0           ASYNC_RESULT_MUTEX_UNLOCK(result);
414             } /* POSIX */
415             #endif
416 0 0         if (ref_out)
417             /* Caller requests a reference to the result */
418 0           *ref_out= secret_buffer_async_result_wrap_with_object(aTHX_ result);
419             else
420             /* nobody cares, so release our reference to the result. The thread will carry on silently */
421 0           secret_buffer_async_result_release(result, false);
422 0           return 0;
423             }
424