File Coverage

SocketAlarm_watcher.c
Criterion Covered Total %
statement 124 192 64.5
branch 81 170 47.6
condition n/a
subroutine n/a
pod n/a
total 205 362 56.6


line stmt bran cond sub pod time code
1             #include "pollfd_rbhash.c"
2              
3             // Returns false when time to exit
4             static bool do_watch();
5              
// Emergency logger for the watcher thread: writes directly to stderr (fd 2)
// to avoid any interaction with Perl's I/O layer.  A non-positive length
// falls back to the fixed string "error\n".
void watch_thread_log(void* buffer, int len) {
   const void *msg= (len <= 0)? "error\n" : buffer;
   size_t count= (len <= 0)? 6 : (size_t) len;
   ssize_t ignored= write(2, msg, count);
   (void) ignored; // best-effort; nothing useful to do if the write fails
}
// Debug/warning macros for the watcher thread.  When WATCHTHREAD_DEBUGGING is
// defined they snprintf into a local 'msgbuf' (every caller must declare
// char msgbuf[...] in scope) and emit it via watch_thread_log; otherwise they
// compile to a no-op expression so call sites need no #ifdef guards.
#ifdef WATCHTHREAD_DEBUGGING
#define WATCHTHREAD_DEBUG(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ ))
#define WATCHTHREAD_WARN(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ ))
#else
#define WATCHTHREAD_DEBUG(fmt, ...) ((void)0)
#define WATCHTHREAD_WARN(fmt, ...) ((void)0)
#endif
17              
18 1           void* watch_thread_main(void* unused) {
19 24 100         while (do_watch()) {}
20 1           return NULL;
21             }
22              
23             // separate from watch_thread_main because it uses a dynamic alloca() on each iteration
24 24           bool do_watch() {
25             struct pollfd *pollset;
26 24           struct timespec wake_time= { 0, -1 };
27 24           int capacity, buckets, sz, n_poll, i, j, n, delay= 10000;
28             char msgbuf[128];
29            
30 24 50         if (pthread_mutex_lock(&watch_list_mutex))
31 0           abort(); // should never fail
32             // allocate to the size of watch_list_count, but cap it at 1024 for sanity
33             // since this is coming off the stack. If any user actually wants to watch
34             // more than 1024 sockets, this needs re-designed, but I'm not sure if
35             // malloc is thread-safe when the main perl binary was compiled without
36             // thread support.
37 24 50         capacity= watch_list_count > 1024? 1024 : watch_list_count+1;
38 24 50         buckets= capacity < 16? 16 : capacity < 128? 32 : 64;
    0          
39 24 50         sz= sizeof(struct pollfd) * capacity + POLLFD_RBHASH_SIZEOF(capacity, buckets);
40 24           pollset= (struct pollfd *) alloca(sz);
41 24           memset(pollset, 0, sz);
42            
43             // first fd is always our control socket
44 24           pollset[0].fd= control_pipe[0];
45 24           pollset[0].events= POLLIN;
46 24           n_poll= 1;
47             WATCHTHREAD_DEBUG("watch_thread loop iter, watch_list_count=%d capacity=%d buckets=%d\n", watch_list_count, capacity, buckets);
48 203 100         for (i= 0, n= watch_list_count; i < n && n_poll < capacity; i++) {
    50          
49 179           struct socketalarm *alarm= watch_list[i];
50 179           int fd= alarm->watch_fd;
51             // Ignore watches that finished. Main thread needs to clean these up.
52 179 100         if (alarm->cur_action >= alarm->action_count)
53 57           continue;
54             // If the socketalarm is in the process of being executed and stopped at
55             // a 'sleep' command, factor that into the wake time.
56 122 50         if (alarm->wake_ts.tv_nsec != -1) {
57 0 0         if (wake_time.tv_nsec == -1
58 0 0         || wake_time.tv_sec > alarm->wake_ts.tv_sec
59 0 0         || (wake_time.tv_sec == alarm->wake_ts.tv_sec && wake_time.tv_nsec > alarm->wake_ts.tv_nsec)
    0          
60             ) {
61 0           wake_time.tv_sec= alarm->wake_ts.tv_sec;
62 0           wake_time.tv_nsec= alarm->wake_ts.tv_nsec;
63             }
64             }
65             else {
66             // Find a pollset slot for this file descriptor, collapsing duplicates.
67             // The next unused one is at [n_poll], which has NodeID n_poll+1
68 122           int poll_i= -1 + (int)pollfd_rbhash_insert(pollset+capacity, capacity, n_poll+1, fd & (buckets-1), fd);
69 122 50         if (poll_i < 0) { // corrupt datastruct, should never happen
70             WATCHTHREAD_WARN("BUG: corrupt pollfd_rbhash");
71 0           break;
72             }
73 122 50         if (poll_i == n_poll) { // using the new uninitialized one?
74 122           pollset[poll_i].fd= fd;
75 122           pollset[poll_i].events= 0;
76 122           ++n_poll;
77             }
78             // Add the poll flags of this socketalarm
79 122           int events= alarm->event_mask;
80             #ifdef POLLRDHUP
81 122 50         if (events & EVENT_SHUT)
82 122           pollset[poll_i].events= POLLRDHUP;
83             #endif
84 122 50         if (events & EVENT_EOF) {
85             // If a fd gets data in the queue, there is no way to wait exclusively
86             // for the EOF event. We have to wake up periodically to check the socket.
87 0 0         if (alarm->unwaitable) // will be set if found data queued in the buffer
88 0           delay= 500;
89             else
90 0           pollset[poll_i].events= POLLIN;
91             }
92 122 50         if (events & EVENT_IN)
93 0           pollset[poll_i].events= POLLIN;
94 122 50         if (events & EVENT_PRI)
95 0           pollset[poll_i].events= POLLPRI;
96 122 50         if (events & EVENT_CLOSE) {
97             // According to poll docs, it is a bug to assume closing a socket in one thread
98             // will wake a 'poll' in another thread, so if the user wants to know about this
99             // condition, we have to loop more quickly.
100 0           delay= 500;
101             }
102             }
103             }
104 24           pthread_mutex_unlock(&watch_list_mutex);
105              
106             // If there is a defined wake-time, truncate the delay if the wake-time comes first
107 24 50         if (wake_time.tv_nsec != -1) {
108 0           struct timespec now_ts= { 0, -1 };
109 0 0         if (lazy_build_now_ts(&now_ts)) {
110             // subtract to find out delay. poll only has millisecond precision anyway.
111 0           int wake_delay= ((long)wake_time.tv_sec - (long)now_ts.tv_sec) * 1000
112 0           + (wake_time.tv_nsec - now_ts.tv_nsec)/1000000;
113 0 0         if (wake_delay < delay)
114 0           delay= wake_delay;
115             }
116             }
117             WATCHTHREAD_DEBUG("poll(n=%d delay=%d)\n", n_poll, delay);
118 24 50         if (poll(pollset, n_poll, delay < 0? 0 : delay) < 0) {
119 0           perror("poll");
120 0           return false;
121             }
122 170 100         for (i= 0; i < n_poll; i++) {
123 146           int e= pollset[i].revents;
124             WATCHTHREAD_DEBUG(" fd=%3d revents=%02X (%s%s%s%s%s%s%s)\n", pollset[i].fd, e,
125             e&POLLIN? " IN":"", e&POLLPRI? " PRI":"", e&POLLOUT? " OUT":"",
126             #ifdef POLLRDHUP
127             e&POLLRDHUP? " RDHUP":"",
128             #else
129             "",
130             #endif
131             e&POLLERR? " ERR":"", e&POLLHUP? " HUP":"", e&POLLNVAL? " NVAL":"");
132             }
133            
134             // First, did we get new control messages?
135 24 100         if (pollset[0].revents & POLLIN) {
136             char msg;
137 12           int ret= read(pollset[0].fd, &msg, 1);
138 12 50         if (ret != 1) { // should never fail
139             WATCHTHREAD_DEBUG("read(control_pipe): %d, errno %m, terminating watch_thread\n", ret);
140 0           return false;
141             }
142 12 100         if (msg == CONTROL_TERMINATE) {// intentional exit
143             WATCHTHREAD_DEBUG("terminate received\n");
144 1           return false;
145             }
146             // else its CONTROL_REWATCH, which means we should start over with new alarms to watch
147             WATCHTHREAD_DEBUG("got REWATCH, starting over\n");
148 11           return true;
149             }
150            
151             // Now, process all of the socketalarms using the statuses from the pollfd
152 12 50         if (pthread_mutex_lock(&watch_list_mutex))
153 0           abort(); // should never fail
154 114 100         for (i= 0, n= watch_list_count; i < n; i++) {
155 102           struct socketalarm *alarm= watch_list[i];
156             // If it has not been triggered yet, see if it is now
157 102 100         if (alarm->cur_action == -1) {
158 57           bool trigger= false;
159 57           int fd= alarm->watch_fd, revents;
160             size_t pollfd_node;
161             struct stat statbuf;
162             // Is it still the same socket that we intended to watch?
163 57 50         if (fstat(fd, &statbuf) != 0
164 57 50         || statbuf.st_dev != alarm->watch_fd_dev
165 57 50         || statbuf.st_ino != alarm->watch_fd_ino
166             ) {
167             // fd was closed/reused. If user watching event CLOSE, then trigger the actions,
168             // else assume that the host program took care of the socket and doesn't want
169             // the alarm.
170 0 0         if (alarm->event_mask & EVENT_CLOSE)
171 0           trigger= true;
172             else
173 0           alarm->cur_action= alarm->action_count;
174             }
175             else {
176 57           int poll_i= -1 + (int) pollfd_rbhash_find(pollset+capacity, capacity, fd & (buckets-1), fd);
177             // Did we poll this fd?
178 57 50         if (poll_i < 0)
179             // can only happen if watch_list changed while we let go of the mutex (or a bug in rbhash)
180 45           continue;
181              
182 57           revents= pollset[poll_i].revents;
183 57 100         trigger= ((alarm->event_mask & EVENT_SHUT) && (revents &
184             #ifdef POLLRDHUP
185             (POLLHUP|POLLRDHUP|POLLERR)
186             #else
187             (POLLHUP|POLLERR)
188             #endif
189             ))
190 45 50         || ((alarm->event_mask & EVENT_IN) && (revents & POLLIN))
    0          
191 114 50         || ((alarm->event_mask & EVENT_PRI) && (revents & POLLPRI));
    50          
    0          
192             // Now the tricky one, EVENT_EOF...
193 57 100         if (!trigger && (alarm->event_mask & EVENT_EOF) && (alarm->unwaitable || (revents & POLLIN))) {
    50          
    0          
    0          
194 0           int avail= recv(fd, msgbuf, sizeof(msgbuf), MSG_DONTWAIT|MSG_PEEK);
195 0 0         if (avail == 0)
196             // This the zero-length read that means EOF
197 0           trigger= true;
198             else
199             // else if there is data on the socket, we are in the "unwaitable" condition
200             // else, error conditions are not "EOF" and can still be waited using POLLIN.
201 0           alarm->unwaitable= (avail > 0);
202             }
203             // We're playing with race conditions, so make sure one more time that we're
204             // triggering on the socket we expected.
205 57 100         if (trigger) {
206 12 50         if ((fstat(fd, &statbuf) != 0
207 12 50         || statbuf.st_dev != alarm->watch_fd_dev
208 12 50         || statbuf.st_ino != alarm->watch_fd_ino
209 0 0         ) && !(alarm->event_mask & EVENT_CLOSE)
210             ) {
211 0           alarm->cur_action= alarm->action_count;
212 0           trigger= false;
213             }
214             }
215             }
216 57 100         if (!trigger)
217 45           continue; // don't exec_actions
218             }
219 57           socketalarm_exec_actions(alarm);
220             }
221 12           pthread_mutex_unlock(&watch_list_mutex);
222 12           return true;
223             }
224              
225             // May only be called by Perl's thread
226 12           static bool watch_list_add(struct socketalarm *alarm) {
227             int i;
228 12           const char *error= NULL;
229              
230 12 50         if (pthread_mutex_lock(&watch_list_mutex))
231 0           croak("mutex_lock failed");
232              
233 12 100         if (!watch_list) {
234 1           Newxz(watch_list, 16, struct socketalarm * volatile);
235 1           watch_list_alloc= 16;
236             }
237             else {
238             // Clean up completed watches
239 56 100         for (i= watch_list_count-1; i >= 0; --i) {
240 45 50         if (watch_list[i]->cur_action >= watch_list[i]->action_count) {
241 0           watch_list[i]->list_ofs= -1;
242 0 0         if (--watch_list_count > i) {
243 0           watch_list[i]= watch_list[watch_list_count];
244 0           watch_list[i]->list_ofs= i;
245             }
246 0           watch_list[watch_list_count]= NULL;
247             }
248             }
249             // allocate more if needed
250 11 50         if (watch_list_count >= watch_list_alloc) {
251 0           Renew(watch_list, watch_list_alloc*2, struct socketalarm * volatile);
252 0           watch_list_alloc= watch_list_alloc*2;
253             }
254             }
255              
256 12           i= alarm->list_ofs;
257 12 50         if (i < 0) { // only add if not already added
258 12           alarm->list_ofs= watch_list_count;
259 12           watch_list[watch_list_count++]= alarm;
260             // Initialize fields that watcher uses to track status
261 12           alarm->cur_action= -1;
262 12           alarm->wake_ts.tv_nsec= -1;
263 12           alarm->unwaitable= false;
264             }
265            
266             // If the thread is not running, start it. Also create pipe if needed.
267 12 100         if (control_pipe[1] < 0) {
268 1           int ret= 0;
269             sigset_t mask, orig;
270 1           sigfillset(&mask);
271              
272 1 50         if (pipe(control_pipe) != 0)
273 0           error= "pipe() failed";
274             // Block all signals before creating thread so that the new thread inherits it,
275             // then restore the original signals.
276 1 50         else if (pthread_sigmask(SIG_SETMASK, &mask, &orig) != 0)
277 0           error= "pthread_sigmask(BLOCK) failed";
278 1 50         else if (pthread_create(&watch_thread, NULL, (void*(*)(void*)) watch_thread_main, NULL) != 0)
279 0           error= "pthread_create failed";
280 1 50         else if (pthread_sigmask(SIG_SETMASK, &orig, NULL) != 0)
281 0           error= "pthread_sigmask(UNBLOCK) failed";
282             } else {
283 11           char msg= CONTROL_REWATCH;
284 11 50         if (write(control_pipe[1], &msg, 1) != 1)
285 0           error= "failed to notify watch_thread";
286             }
287 12           pthread_mutex_unlock(&watch_list_mutex);
288 12 50         if (error)
289 0           croak(error);
290 12           return i < 0;
291             }
292              
293             // need to lock mutex before accessing concurrent alarm fields
294 3           static void watch_list_item_get_status(struct socketalarm *alarm, int *cur_action_out) {
295 3 50         if (pthread_mutex_lock(&watch_list_mutex))
296 0           croak("mutex_lock failed");
297              
298 3 50         if (cur_action_out) *cur_action_out= alarm->cur_action;
299              
300 3           pthread_mutex_unlock(&watch_list_mutex);
301 3           }
302              
303             // May only be called by Perl's thread
304 28           static bool watch_list_remove(struct socketalarm *alarm) {
305             int i;
306 28 50         if (pthread_mutex_lock(&watch_list_mutex))
307 0           croak("mutex_lock failed");
308             // Clean up completed watches
309 40 100         for (i= watch_list_count-1; i >= 0; --i) {
310 12 50         if (watch_list[i]->cur_action >= watch_list[i]->action_count) {
311 12           watch_list[i]->list_ofs= -1;
312 12 50         if (--watch_list_count > i) {
313 0           watch_list[i]= watch_list[watch_list_count];
314 0           watch_list[i]->list_ofs= i;
315             }
316 12           watch_list[watch_list_count]= NULL;
317             }
318             }
319 28           i= alarm->list_ofs;
320 28 50         if (i >= 0) {
321             // fill the hole in the list by moving the final item
322 0 0         if (i < watch_list_count-1) {
323 0           watch_list[i]= watch_list[watch_list_count-1];
324 0           watch_list[i]->list_ofs= i;
325             }
326 0           --watch_list_count;
327 0           alarm->list_ofs= -1;
328              
329             // This one was still an active watch, so need to notify thread
330             // not to listen for it anymore
331 0 0         if (control_pipe[1] >= 0) {
332 0           char msg= CONTROL_REWATCH;
333 0 0         if (write(control_pipe[1], &msg, 1) != 1) {
334 0           pthread_mutex_unlock(&watch_list_mutex);
335 0           croak("failed to notify watch_thread");
336             }
337             }
338             }
339 28           pthread_mutex_unlock(&watch_list_mutex);
340 28           return i >= 0;
341             }
342              
343             // only called during Perl's END phase. Just need to let
344             // things end gracefully and not have the thread go nuts
345             // as sockets get closed.
346 6           static void shutdown_watch_thread() {
347             int i;
348             // Wipe the alarm list
349 6 50         if (pthread_mutex_lock(&watch_list_mutex))
350 0           croak("mutex_lock failed");
351 6 50         for (i= 0; i < watch_list_count; i++) {
352 0           watch_list[i]->list_ofs= -1;
353 0           watch_list[i]= NULL;
354             }
355 6           watch_list_count= 0;
356              
357             // Notify the thread to stop
358 6 100         if (control_pipe[1] >= 0) {
359 1           char msg= CONTROL_TERMINATE;
360 1 50         if (write(control_pipe[1], &msg, 1) != 1)
361 0           warn("write(control_pipe) failed");
362             }
363            
364 6           pthread_mutex_unlock(&watch_list_mutex);
365             // don't bother unallocating watch_list or closing pipe,
366             // because we're exiting anyway.
367 6           }