| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
#include "pollfd_rbhash.c" |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
// Returns false when time to exit |
|
4
|
|
|
|
|
|
|
static bool do_watch(); |
|
5
|
|
|
|
|
|
|
|
|
6
|
0
|
|
|
|
|
|
void watch_thread_log(void* buffer, int len) { |
|
7
|
0
|
0
|
|
|
|
|
int unused= write(2, len <= 0? "error\n":buffer, len <= 0? 6 : len); |
|
|
|
0
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
(void) unused; |
|
9
|
0
|
|
|
|
|
|
} |
|
10
|
|
|
|
|
|
|
#ifdef WATCHTHREAD_DEBUGGING |
|
11
|
|
|
|
|
|
|
#define WATCHTHREAD_DEBUG(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ )) |
|
12
|
|
|
|
|
|
|
#define WATCHTHREAD_WARN(fmt, ...) watch_thread_log(msgbuf, snprintf(msgbuf, sizeof(msgbuf), fmt, ##__VA_ARGS__ )) |
|
13
|
|
|
|
|
|
|
#else |
|
14
|
|
|
|
|
|
|
#define WATCHTHREAD_DEBUG(fmt, ...) ((void)0) |
|
15
|
|
|
|
|
|
|
#define WATCHTHREAD_WARN(fmt, ...) ((void)0) |
|
16
|
|
|
|
|
|
|
#endif |
|
17
|
|
|
|
|
|
|
|
|
18
|
1
|
|
|
|
|
|
void* watch_thread_main(void* unused) { |
|
19
|
24
|
100
|
|
|
|
|
while (do_watch()) {} |
|
20
|
1
|
|
|
|
|
|
return NULL; |
|
21
|
|
|
|
|
|
|
} |
|
22
|
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
// separate from watch_thread_main because it uses a dynamic alloca() on each iteration |
|
24
|
24
|
|
|
|
|
|
bool do_watch() { |
|
25
|
|
|
|
|
|
|
struct pollfd *pollset; |
|
26
|
24
|
|
|
|
|
|
struct timespec wake_time= { 0, -1 }; |
|
27
|
24
|
|
|
|
|
|
int capacity, buckets, sz, n_poll, i, j, n, delay= 10000; |
|
28
|
|
|
|
|
|
|
char msgbuf[128]; |
|
29
|
|
|
|
|
|
|
|
|
30
|
24
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
31
|
0
|
|
|
|
|
|
abort(); // should never fail |
|
32
|
|
|
|
|
|
|
// allocate to the size of watch_list_count, but cap it at 1024 for sanity |
|
33
|
|
|
|
|
|
|
// since this is coming off the stack. If any user actually wants to watch |
|
34
|
|
|
|
|
|
|
// more than 1024 sockets, this needs re-designed, but I'm not sure if |
|
35
|
|
|
|
|
|
|
// malloc is thread-safe when the main perl binary was compiled without |
|
36
|
|
|
|
|
|
|
// thread support. |
|
37
|
24
|
50
|
|
|
|
|
capacity= watch_list_count > 1024? 1024 : watch_list_count+1; |
|
38
|
24
|
50
|
|
|
|
|
buckets= capacity < 16? 16 : capacity < 128? 32 : 64; |
|
|
|
0
|
|
|
|
|
|
|
39
|
24
|
50
|
|
|
|
|
sz= sizeof(struct pollfd) * capacity + POLLFD_RBHASH_SIZEOF(capacity, buckets); |
|
40
|
24
|
|
|
|
|
|
pollset= (struct pollfd *) alloca(sz); |
|
41
|
24
|
|
|
|
|
|
memset(pollset, 0, sz); |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
// first fd is always our control socket |
|
44
|
24
|
|
|
|
|
|
pollset[0].fd= control_pipe[0]; |
|
45
|
24
|
|
|
|
|
|
pollset[0].events= POLLIN; |
|
46
|
24
|
|
|
|
|
|
n_poll= 1; |
|
47
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG("watch_thread loop iter, watch_list_count=%d capacity=%d buckets=%d\n", watch_list_count, capacity, buckets); |
|
48
|
203
|
100
|
|
|
|
|
for (i= 0, n= watch_list_count; i < n && n_poll < capacity; i++) { |
|
|
|
50
|
|
|
|
|
|
|
49
|
179
|
|
|
|
|
|
struct socketalarm *alarm= watch_list[i]; |
|
50
|
179
|
|
|
|
|
|
int fd= alarm->watch_fd; |
|
51
|
|
|
|
|
|
|
// Ignore watches that finished. Main thread needs to clean these up. |
|
52
|
179
|
100
|
|
|
|
|
if (alarm->cur_action >= alarm->action_count) |
|
53
|
57
|
|
|
|
|
|
continue; |
|
54
|
|
|
|
|
|
|
// If the socketalarm is in the process of being executed and stopped at |
|
55
|
|
|
|
|
|
|
// a 'sleep' command, factor that into the wake time. |
|
56
|
122
|
50
|
|
|
|
|
if (alarm->wake_ts.tv_nsec != -1) { |
|
57
|
0
|
0
|
|
|
|
|
if (wake_time.tv_nsec == -1 |
|
58
|
0
|
0
|
|
|
|
|
|| wake_time.tv_sec > alarm->wake_ts.tv_sec |
|
59
|
0
|
0
|
|
|
|
|
|| (wake_time.tv_sec == alarm->wake_ts.tv_sec && wake_time.tv_nsec > alarm->wake_ts.tv_nsec) |
|
|
|
0
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
) { |
|
61
|
0
|
|
|
|
|
|
wake_time.tv_sec= alarm->wake_ts.tv_sec; |
|
62
|
0
|
|
|
|
|
|
wake_time.tv_nsec= alarm->wake_ts.tv_nsec; |
|
63
|
|
|
|
|
|
|
} |
|
64
|
|
|
|
|
|
|
} |
|
65
|
|
|
|
|
|
|
else { |
|
66
|
|
|
|
|
|
|
// Find a pollset slot for this file descriptor, collapsing duplicates. |
|
67
|
|
|
|
|
|
|
// The next unused one is at [n_poll], which has NodeID n_poll+1 |
|
68
|
122
|
|
|
|
|
|
int poll_i= -1 + (int)pollfd_rbhash_insert(pollset+capacity, capacity, n_poll+1, fd & (buckets-1), fd); |
|
69
|
122
|
50
|
|
|
|
|
if (poll_i < 0) { // corrupt datastruct, should never happen |
|
70
|
|
|
|
|
|
|
WATCHTHREAD_WARN("BUG: corrupt pollfd_rbhash"); |
|
71
|
0
|
|
|
|
|
|
break; |
|
72
|
|
|
|
|
|
|
} |
|
73
|
122
|
50
|
|
|
|
|
if (poll_i == n_poll) { // using the new uninitialized one? |
|
74
|
122
|
|
|
|
|
|
pollset[poll_i].fd= fd; |
|
75
|
122
|
|
|
|
|
|
pollset[poll_i].events= 0; |
|
76
|
122
|
|
|
|
|
|
++n_poll; |
|
77
|
|
|
|
|
|
|
} |
|
78
|
|
|
|
|
|
|
// Add the poll flags of this socketalarm |
|
79
|
122
|
|
|
|
|
|
int events= alarm->event_mask; |
|
80
|
|
|
|
|
|
|
#ifdef POLLRDHUP |
|
81
|
122
|
50
|
|
|
|
|
if (events & EVENT_SHUT) |
|
82
|
122
|
|
|
|
|
|
pollset[poll_i].events= POLLRDHUP; |
|
83
|
|
|
|
|
|
|
#endif |
|
84
|
122
|
50
|
|
|
|
|
if (events & EVENT_EOF) { |
|
85
|
|
|
|
|
|
|
// If a fd gets data in the queue, there is no way to wait exclusively |
|
86
|
|
|
|
|
|
|
// for the EOF event. We have to wake up periodically to check the socket. |
|
87
|
0
|
0
|
|
|
|
|
if (alarm->unwaitable) // will be set if found data queued in the buffer |
|
88
|
0
|
|
|
|
|
|
delay= 500; |
|
89
|
|
|
|
|
|
|
else |
|
90
|
0
|
|
|
|
|
|
pollset[poll_i].events= POLLIN; |
|
91
|
|
|
|
|
|
|
} |
|
92
|
122
|
50
|
|
|
|
|
if (events & EVENT_IN) |
|
93
|
0
|
|
|
|
|
|
pollset[poll_i].events= POLLIN; |
|
94
|
122
|
50
|
|
|
|
|
if (events & EVENT_PRI) |
|
95
|
0
|
|
|
|
|
|
pollset[poll_i].events= POLLPRI; |
|
96
|
122
|
50
|
|
|
|
|
if (events & EVENT_CLOSE) { |
|
97
|
|
|
|
|
|
|
// According to poll docs, it is a bug to assume closing a socket in one thread |
|
98
|
|
|
|
|
|
|
// will wake a 'poll' in another thread, so if the user wants to know about this |
|
99
|
|
|
|
|
|
|
// condition, we have to loop more quickly. |
|
100
|
0
|
|
|
|
|
|
delay= 500; |
|
101
|
|
|
|
|
|
|
} |
|
102
|
|
|
|
|
|
|
} |
|
103
|
|
|
|
|
|
|
} |
|
104
|
24
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
105
|
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
// If there is a defined wake-time, truncate the delay if the wake-time comes first |
|
107
|
24
|
50
|
|
|
|
|
if (wake_time.tv_nsec != -1) { |
|
108
|
0
|
|
|
|
|
|
struct timespec now_ts= { 0, -1 }; |
|
109
|
0
|
0
|
|
|
|
|
if (lazy_build_now_ts(&now_ts)) { |
|
110
|
|
|
|
|
|
|
// subtract to find out delay. poll only has millisecond precision anyway. |
|
111
|
0
|
|
|
|
|
|
int wake_delay= ((long)wake_time.tv_sec - (long)now_ts.tv_sec) * 1000 |
|
112
|
0
|
|
|
|
|
|
+ (wake_time.tv_nsec - now_ts.tv_nsec)/1000000; |
|
113
|
0
|
0
|
|
|
|
|
if (wake_delay < delay) |
|
114
|
0
|
|
|
|
|
|
delay= wake_delay; |
|
115
|
|
|
|
|
|
|
} |
|
116
|
|
|
|
|
|
|
} |
|
117
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG("poll(n=%d delay=%d)\n", n_poll, delay); |
|
118
|
24
|
50
|
|
|
|
|
if (poll(pollset, n_poll, delay < 0? 0 : delay) < 0) { |
|
119
|
0
|
|
|
|
|
|
perror("poll"); |
|
120
|
0
|
|
|
|
|
|
return false; |
|
121
|
|
|
|
|
|
|
} |
|
122
|
170
|
100
|
|
|
|
|
for (i= 0; i < n_poll; i++) { |
|
123
|
146
|
|
|
|
|
|
int e= pollset[i].revents; |
|
124
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG(" fd=%3d revents=%02X (%s%s%s%s%s%s%s)\n", pollset[i].fd, e, |
|
125
|
|
|
|
|
|
|
e&POLLIN? " IN":"", e&POLLPRI? " PRI":"", e&POLLOUT? " OUT":"", |
|
126
|
|
|
|
|
|
|
#ifdef POLLRDHUP |
|
127
|
|
|
|
|
|
|
e&POLLRDHUP? " RDHUP":"", |
|
128
|
|
|
|
|
|
|
#else |
|
129
|
|
|
|
|
|
|
"", |
|
130
|
|
|
|
|
|
|
#endif |
|
131
|
|
|
|
|
|
|
e&POLLERR? " ERR":"", e&POLLHUP? " HUP":"", e&POLLNVAL? " NVAL":""); |
|
132
|
|
|
|
|
|
|
} |
|
133
|
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
// First, did we get new control messages? |
|
135
|
24
|
100
|
|
|
|
|
if (pollset[0].revents & POLLIN) { |
|
136
|
|
|
|
|
|
|
char msg; |
|
137
|
12
|
|
|
|
|
|
int ret= read(pollset[0].fd, &msg, 1); |
|
138
|
12
|
50
|
|
|
|
|
if (ret != 1) { // should never fail |
|
139
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG("read(control_pipe): %d, errno %m, terminating watch_thread\n", ret); |
|
140
|
0
|
|
|
|
|
|
return false; |
|
141
|
|
|
|
|
|
|
} |
|
142
|
12
|
100
|
|
|
|
|
if (msg == CONTROL_TERMINATE) {// intentional exit |
|
143
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG("terminate received\n"); |
|
144
|
1
|
|
|
|
|
|
return false; |
|
145
|
|
|
|
|
|
|
} |
|
146
|
|
|
|
|
|
|
// else its CONTROL_REWATCH, which means we should start over with new alarms to watch |
|
147
|
|
|
|
|
|
|
WATCHTHREAD_DEBUG("got REWATCH, starting over\n"); |
|
148
|
11
|
|
|
|
|
|
return true; |
|
149
|
|
|
|
|
|
|
} |
|
150
|
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
// Now, process all of the socketalarms using the statuses from the pollfd |
|
152
|
12
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
153
|
0
|
|
|
|
|
|
abort(); // should never fail |
|
154
|
114
|
100
|
|
|
|
|
for (i= 0, n= watch_list_count; i < n; i++) { |
|
155
|
102
|
|
|
|
|
|
struct socketalarm *alarm= watch_list[i]; |
|
156
|
|
|
|
|
|
|
// If it has not been triggered yet, see if it is now |
|
157
|
102
|
100
|
|
|
|
|
if (alarm->cur_action == -1) { |
|
158
|
57
|
|
|
|
|
|
bool trigger= false; |
|
159
|
57
|
|
|
|
|
|
int fd= alarm->watch_fd, revents; |
|
160
|
|
|
|
|
|
|
size_t pollfd_node; |
|
161
|
|
|
|
|
|
|
struct stat statbuf; |
|
162
|
|
|
|
|
|
|
// Is it still the same socket that we intended to watch? |
|
163
|
57
|
50
|
|
|
|
|
if (fstat(fd, &statbuf) != 0 |
|
164
|
57
|
50
|
|
|
|
|
|| statbuf.st_dev != alarm->watch_fd_dev |
|
165
|
57
|
50
|
|
|
|
|
|| statbuf.st_ino != alarm->watch_fd_ino |
|
166
|
|
|
|
|
|
|
) { |
|
167
|
|
|
|
|
|
|
// fd was closed/reused. If user watching event CLOSE, then trigger the actions, |
|
168
|
|
|
|
|
|
|
// else assume that the host program took care of the socket and doesn't want |
|
169
|
|
|
|
|
|
|
// the alarm. |
|
170
|
0
|
0
|
|
|
|
|
if (alarm->event_mask & EVENT_CLOSE) |
|
171
|
0
|
|
|
|
|
|
trigger= true; |
|
172
|
|
|
|
|
|
|
else |
|
173
|
0
|
|
|
|
|
|
alarm->cur_action= alarm->action_count; |
|
174
|
|
|
|
|
|
|
} |
|
175
|
|
|
|
|
|
|
else { |
|
176
|
57
|
|
|
|
|
|
int poll_i= -1 + (int) pollfd_rbhash_find(pollset+capacity, capacity, fd & (buckets-1), fd); |
|
177
|
|
|
|
|
|
|
// Did we poll this fd? |
|
178
|
57
|
50
|
|
|
|
|
if (poll_i < 0) |
|
179
|
|
|
|
|
|
|
// can only happen if watch_list changed while we let go of the mutex (or a bug in rbhash) |
|
180
|
45
|
|
|
|
|
|
continue; |
|
181
|
|
|
|
|
|
|
|
|
182
|
57
|
|
|
|
|
|
revents= pollset[poll_i].revents; |
|
183
|
57
|
100
|
|
|
|
|
trigger= ((alarm->event_mask & EVENT_SHUT) && (revents & |
|
184
|
|
|
|
|
|
|
#ifdef POLLRDHUP |
|
185
|
|
|
|
|
|
|
(POLLHUP|POLLRDHUP|POLLERR) |
|
186
|
|
|
|
|
|
|
#else |
|
187
|
|
|
|
|
|
|
(POLLHUP|POLLERR) |
|
188
|
|
|
|
|
|
|
#endif |
|
189
|
|
|
|
|
|
|
)) |
|
190
|
45
|
50
|
|
|
|
|
|| ((alarm->event_mask & EVENT_IN) && (revents & POLLIN)) |
|
|
|
0
|
|
|
|
|
|
|
191
|
114
|
50
|
|
|
|
|
|| ((alarm->event_mask & EVENT_PRI) && (revents & POLLPRI)); |
|
|
|
50
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
// Now the tricky one, EVENT_EOF... |
|
193
|
57
|
100
|
|
|
|
|
if (!trigger && (alarm->event_mask & EVENT_EOF) && (alarm->unwaitable || (revents & POLLIN))) { |
|
|
|
50
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
194
|
0
|
|
|
|
|
|
int avail= recv(fd, msgbuf, sizeof(msgbuf), MSG_DONTWAIT|MSG_PEEK); |
|
195
|
0
|
0
|
|
|
|
|
if (avail == 0) |
|
196
|
|
|
|
|
|
|
// This the zero-length read that means EOF |
|
197
|
0
|
|
|
|
|
|
trigger= true; |
|
198
|
|
|
|
|
|
|
else |
|
199
|
|
|
|
|
|
|
// else if there is data on the socket, we are in the "unwaitable" condition |
|
200
|
|
|
|
|
|
|
// else, error conditions are not "EOF" and can still be waited using POLLIN. |
|
201
|
0
|
|
|
|
|
|
alarm->unwaitable= (avail > 0); |
|
202
|
|
|
|
|
|
|
} |
|
203
|
|
|
|
|
|
|
// We're playing with race conditions, so make sure one more time that we're |
|
204
|
|
|
|
|
|
|
// triggering on the socket we expected. |
|
205
|
57
|
100
|
|
|
|
|
if (trigger) { |
|
206
|
12
|
50
|
|
|
|
|
if ((fstat(fd, &statbuf) != 0 |
|
207
|
12
|
50
|
|
|
|
|
|| statbuf.st_dev != alarm->watch_fd_dev |
|
208
|
12
|
50
|
|
|
|
|
|| statbuf.st_ino != alarm->watch_fd_ino |
|
209
|
0
|
0
|
|
|
|
|
) && !(alarm->event_mask & EVENT_CLOSE) |
|
210
|
|
|
|
|
|
|
) { |
|
211
|
0
|
|
|
|
|
|
alarm->cur_action= alarm->action_count; |
|
212
|
0
|
|
|
|
|
|
trigger= false; |
|
213
|
|
|
|
|
|
|
} |
|
214
|
|
|
|
|
|
|
} |
|
215
|
|
|
|
|
|
|
} |
|
216
|
57
|
100
|
|
|
|
|
if (!trigger) |
|
217
|
45
|
|
|
|
|
|
continue; // don't exec_actions |
|
218
|
|
|
|
|
|
|
} |
|
219
|
57
|
|
|
|
|
|
socketalarm_exec_actions(alarm); |
|
220
|
|
|
|
|
|
|
} |
|
221
|
12
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
222
|
12
|
|
|
|
|
|
return true; |
|
223
|
|
|
|
|
|
|
} |
|
224
|
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
// May only be called by Perl's thread |
|
226
|
12
|
|
|
|
|
|
static bool watch_list_add(struct socketalarm *alarm) { |
|
227
|
|
|
|
|
|
|
int i; |
|
228
|
12
|
|
|
|
|
|
const char *error= NULL; |
|
229
|
|
|
|
|
|
|
|
|
230
|
12
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
231
|
0
|
|
|
|
|
|
croak("mutex_lock failed"); |
|
232
|
|
|
|
|
|
|
|
|
233
|
12
|
100
|
|
|
|
|
if (!watch_list) { |
|
234
|
1
|
|
|
|
|
|
Newxz(watch_list, 16, struct socketalarm * volatile); |
|
235
|
1
|
|
|
|
|
|
watch_list_alloc= 16; |
|
236
|
|
|
|
|
|
|
} |
|
237
|
|
|
|
|
|
|
else { |
|
238
|
|
|
|
|
|
|
// Clean up completed watches |
|
239
|
56
|
100
|
|
|
|
|
for (i= watch_list_count-1; i >= 0; --i) { |
|
240
|
45
|
50
|
|
|
|
|
if (watch_list[i]->cur_action >= watch_list[i]->action_count) { |
|
241
|
0
|
|
|
|
|
|
watch_list[i]->list_ofs= -1; |
|
242
|
0
|
0
|
|
|
|
|
if (--watch_list_count > i) { |
|
243
|
0
|
|
|
|
|
|
watch_list[i]= watch_list[watch_list_count]; |
|
244
|
0
|
|
|
|
|
|
watch_list[i]->list_ofs= i; |
|
245
|
|
|
|
|
|
|
} |
|
246
|
0
|
|
|
|
|
|
watch_list[watch_list_count]= NULL; |
|
247
|
|
|
|
|
|
|
} |
|
248
|
|
|
|
|
|
|
} |
|
249
|
|
|
|
|
|
|
// allocate more if needed |
|
250
|
11
|
50
|
|
|
|
|
if (watch_list_count >= watch_list_alloc) { |
|
251
|
0
|
|
|
|
|
|
Renew(watch_list, watch_list_alloc*2, struct socketalarm * volatile); |
|
252
|
0
|
|
|
|
|
|
watch_list_alloc= watch_list_alloc*2; |
|
253
|
|
|
|
|
|
|
} |
|
254
|
|
|
|
|
|
|
} |
|
255
|
|
|
|
|
|
|
|
|
256
|
12
|
|
|
|
|
|
i= alarm->list_ofs; |
|
257
|
12
|
50
|
|
|
|
|
if (i < 0) { // only add if not already added |
|
258
|
12
|
|
|
|
|
|
alarm->list_ofs= watch_list_count; |
|
259
|
12
|
|
|
|
|
|
watch_list[watch_list_count++]= alarm; |
|
260
|
|
|
|
|
|
|
// Initialize fields that watcher uses to track status |
|
261
|
12
|
|
|
|
|
|
alarm->cur_action= -1; |
|
262
|
12
|
|
|
|
|
|
alarm->wake_ts.tv_nsec= -1; |
|
263
|
12
|
|
|
|
|
|
alarm->unwaitable= false; |
|
264
|
|
|
|
|
|
|
} |
|
265
|
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
// If the thread is not running, start it. Also create pipe if needed. |
|
267
|
12
|
100
|
|
|
|
|
if (control_pipe[1] < 0) { |
|
268
|
1
|
|
|
|
|
|
int ret= 0; |
|
269
|
|
|
|
|
|
|
sigset_t mask, orig; |
|
270
|
1
|
|
|
|
|
|
sigfillset(&mask); |
|
271
|
|
|
|
|
|
|
|
|
272
|
1
|
50
|
|
|
|
|
if (pipe(control_pipe) != 0) |
|
273
|
0
|
|
|
|
|
|
error= "pipe() failed"; |
|
274
|
|
|
|
|
|
|
// Block all signals before creating thread so that the new thread inherits it, |
|
275
|
|
|
|
|
|
|
// then restore the original signals. |
|
276
|
1
|
50
|
|
|
|
|
else if (pthread_sigmask(SIG_SETMASK, &mask, &orig) != 0) |
|
277
|
0
|
|
|
|
|
|
error= "pthread_sigmask(BLOCK) failed"; |
|
278
|
1
|
50
|
|
|
|
|
else if (pthread_create(&watch_thread, NULL, (void*(*)(void*)) watch_thread_main, NULL) != 0) |
|
279
|
0
|
|
|
|
|
|
error= "pthread_create failed"; |
|
280
|
1
|
50
|
|
|
|
|
else if (pthread_sigmask(SIG_SETMASK, &orig, NULL) != 0) |
|
281
|
0
|
|
|
|
|
|
error= "pthread_sigmask(UNBLOCK) failed"; |
|
282
|
|
|
|
|
|
|
} else { |
|
283
|
11
|
|
|
|
|
|
char msg= CONTROL_REWATCH; |
|
284
|
11
|
50
|
|
|
|
|
if (write(control_pipe[1], &msg, 1) != 1) |
|
285
|
0
|
|
|
|
|
|
error= "failed to notify watch_thread"; |
|
286
|
|
|
|
|
|
|
} |
|
287
|
12
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
288
|
12
|
50
|
|
|
|
|
if (error) |
|
289
|
0
|
|
|
|
|
|
croak(error); |
|
290
|
12
|
|
|
|
|
|
return i < 0; |
|
291
|
|
|
|
|
|
|
} |
|
292
|
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
// need to lock mutex before accessing concurrent alarm fields |
|
294
|
3
|
|
|
|
|
|
static void watch_list_item_get_status(struct socketalarm *alarm, int *cur_action_out) { |
|
295
|
3
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
296
|
0
|
|
|
|
|
|
croak("mutex_lock failed"); |
|
297
|
|
|
|
|
|
|
|
|
298
|
3
|
50
|
|
|
|
|
if (cur_action_out) *cur_action_out= alarm->cur_action; |
|
299
|
|
|
|
|
|
|
|
|
300
|
3
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
301
|
3
|
|
|
|
|
|
} |
|
302
|
|
|
|
|
|
|
|
|
303
|
|
|
|
|
|
|
// May only be called by Perl's thread |
|
304
|
28
|
|
|
|
|
|
static bool watch_list_remove(struct socketalarm *alarm) { |
|
305
|
|
|
|
|
|
|
int i; |
|
306
|
28
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
307
|
0
|
|
|
|
|
|
croak("mutex_lock failed"); |
|
308
|
|
|
|
|
|
|
// Clean up completed watches |
|
309
|
40
|
100
|
|
|
|
|
for (i= watch_list_count-1; i >= 0; --i) { |
|
310
|
12
|
50
|
|
|
|
|
if (watch_list[i]->cur_action >= watch_list[i]->action_count) { |
|
311
|
12
|
|
|
|
|
|
watch_list[i]->list_ofs= -1; |
|
312
|
12
|
50
|
|
|
|
|
if (--watch_list_count > i) { |
|
313
|
0
|
|
|
|
|
|
watch_list[i]= watch_list[watch_list_count]; |
|
314
|
0
|
|
|
|
|
|
watch_list[i]->list_ofs= i; |
|
315
|
|
|
|
|
|
|
} |
|
316
|
12
|
|
|
|
|
|
watch_list[watch_list_count]= NULL; |
|
317
|
|
|
|
|
|
|
} |
|
318
|
|
|
|
|
|
|
} |
|
319
|
28
|
|
|
|
|
|
i= alarm->list_ofs; |
|
320
|
28
|
50
|
|
|
|
|
if (i >= 0) { |
|
321
|
|
|
|
|
|
|
// fill the hole in the list by moving the final item |
|
322
|
0
|
0
|
|
|
|
|
if (i < watch_list_count-1) { |
|
323
|
0
|
|
|
|
|
|
watch_list[i]= watch_list[watch_list_count-1]; |
|
324
|
0
|
|
|
|
|
|
watch_list[i]->list_ofs= i; |
|
325
|
|
|
|
|
|
|
} |
|
326
|
0
|
|
|
|
|
|
--watch_list_count; |
|
327
|
0
|
|
|
|
|
|
alarm->list_ofs= -1; |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
// This one was still an active watch, so need to notify thread |
|
330
|
|
|
|
|
|
|
// not to listen for it anymore |
|
331
|
0
|
0
|
|
|
|
|
if (control_pipe[1] >= 0) { |
|
332
|
0
|
|
|
|
|
|
char msg= CONTROL_REWATCH; |
|
333
|
0
|
0
|
|
|
|
|
if (write(control_pipe[1], &msg, 1) != 1) { |
|
334
|
0
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
335
|
0
|
|
|
|
|
|
croak("failed to notify watch_thread"); |
|
336
|
|
|
|
|
|
|
} |
|
337
|
|
|
|
|
|
|
} |
|
338
|
|
|
|
|
|
|
} |
|
339
|
28
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
340
|
28
|
|
|
|
|
|
return i >= 0; |
|
341
|
|
|
|
|
|
|
} |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
// only called during Perl's END phase. Just need to let |
|
344
|
|
|
|
|
|
|
// things end gracefully and not have the thread go nuts |
|
345
|
|
|
|
|
|
|
// as sockets get closed. |
|
346
|
6
|
|
|
|
|
|
static void shutdown_watch_thread() { |
|
347
|
|
|
|
|
|
|
int i; |
|
348
|
|
|
|
|
|
|
// Wipe the alarm list |
|
349
|
6
|
50
|
|
|
|
|
if (pthread_mutex_lock(&watch_list_mutex)) |
|
350
|
0
|
|
|
|
|
|
croak("mutex_lock failed"); |
|
351
|
6
|
50
|
|
|
|
|
for (i= 0; i < watch_list_count; i++) { |
|
352
|
0
|
|
|
|
|
|
watch_list[i]->list_ofs= -1; |
|
353
|
0
|
|
|
|
|
|
watch_list[i]= NULL; |
|
354
|
|
|
|
|
|
|
} |
|
355
|
6
|
|
|
|
|
|
watch_list_count= 0; |
|
356
|
|
|
|
|
|
|
|
|
357
|
|
|
|
|
|
|
// Notify the thread to stop |
|
358
|
6
|
100
|
|
|
|
|
if (control_pipe[1] >= 0) { |
|
359
|
1
|
|
|
|
|
|
char msg= CONTROL_TERMINATE; |
|
360
|
1
|
50
|
|
|
|
|
if (write(control_pipe[1], &msg, 1) != 1) |
|
361
|
0
|
|
|
|
|
|
warn("write(control_pipe) failed"); |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
|
|
364
|
6
|
|
|
|
|
|
pthread_mutex_unlock(&watch_list_mutex); |
|
365
|
|
|
|
|
|
|
// don't bother unallocating watch_list or closing pipe, |
|
366
|
|
|
|
|
|
|
// because we're exiting anyway. |
|
367
|
6
|
|
|
|
|
|
} |