File Coverage

deps/libzmqraw/timers.c
Criterion Covered Total %
statement 183 196 93.3
branch 59 102 57.8
condition n/a
subroutine n/a
pod n/a
total 242 298 81.2


line stmt bran cond sub pod time code
1             #include
2              
3             #include
4             #include
5              
6             #include "mutex.h"
7             #include "timers.h"
8              
9              
10             struct zmq_raw_timers
11             {
12             zmq_raw_mutex *mutex;
13             void *timers;
14             int timer_count;
15             void *thread;
16             int running;
17              
18             zmq_raw_timer **last;
19             int run_count;
20              
21             zmq_pollitem_t wakeup_item;
22             void *wakeup_context;
23             void *wakeup_send;
24             void *wakeup_recv;
25             };
26              
27             struct zmq_raw_timer
28             {
29             int id;
30             int running;
31             int after;
32             int interval;
33             void *context;
34             void *send;
35             void *recv;
36             void *recv_sv;
37             zmq_raw_timers *timers;
38             };
39              
40              
41             static void timer_handler (int timer_id, void *arg);
42             static void timer_thread (void *arg);
43              
44              
45 3           zmq_raw_timers *zmq_raw_timers_create()
46             {
47             int rc;
48             zmq_raw_timers *timers;
49              
50 6           if ((timers = calloc (1, sizeof (zmq_raw_timers))) == NULL ||
51 6 50         (timers->timers = zmq_timers_new()) == NULL ||
52 6 50         (timers->wakeup_context = zmq_ctx_new()) == NULL ||
53 6 50         (timers->wakeup_send = zmq_socket (timers->wakeup_context, ZMQ_PAIR)) == NULL ||
54 3           (timers->wakeup_recv = zmq_socket (timers->wakeup_context, ZMQ_PAIR)) == NULL)
55             goto on_error;
56              
57 3 50         if ((rc = zmq_bind (timers->wakeup_recv, "inproc://_wakeup")) < 0 ||
    50          
58 3           (rc = zmq_connect (timers->wakeup_send, "inproc://_wakeup")) < 0)
59             goto on_error;
60              
61 3           goto done;
62              
63             on_error:
64 0           zmq_close (timers->wakeup_recv);
65 0           zmq_close (timers->wakeup_send);
66 0           zmq_ctx_term (timers->wakeup_context);
67 0           free (timers);
68 0           return NULL;
69              
70             done:
71 3           timers->wakeup_item.events = ZMQ_POLLIN;
72 3           timers->wakeup_item.socket = timers->wakeup_recv;
73 3           timers->mutex = zmq_raw_mutex_create();
74 3           timers->timer_count = 0;
75 3           return timers;
76             }
77              
78 3           void zmq_raw_timers_destroy (zmq_raw_timers *timers)
79             {
80 3 50         assert (timers);
81 3 50         assert (timers->timer_count == 0);
82              
83 3           zmq_raw_mutex_lock (timers->mutex);
84 3           timers->running = 0;
85 3           zmq_send_const (timers->wakeup_send, "", 1, ZMQ_DONTWAIT);
86 3           zmq_raw_mutex_unlock (timers->mutex);
87              
88 3 50         if (timers->thread)
89 3           zmq_threadclose (timers->thread);
90 3           zmq_raw_mutex_destroy (timers->mutex);
91              
92 3           zmq_close (timers->wakeup_send);
93 3           zmq_close (timers->wakeup_recv);
94 3           zmq_ctx_term (timers->wakeup_context);
95 3           zmq_timers_destroy (&timers->timers);
96              
97 3           free (timers);
98 3           }
99              
100 24           static zmq_raw_timer *zmq_raw_timer_create (void *context, int after, int interval)
101             {
102             int rc;
103             char endpoint[64];
104             static const int v = 1;
105             static int id = 0;
106             zmq_raw_timer *timer;
107              
108 24 50         assert (context);
109              
110 24           sprintf (endpoint, "inproc://_timer-%d", ++id);
111              
112 48           if ((timer = calloc (1, sizeof (zmq_raw_timer))) == NULL ||
113 48 50         (timer->send = zmq_socket (context, ZMQ_PAIR)) == NULL ||
114 24           (timer->recv = zmq_socket (context, ZMQ_PAIR)) == NULL)
115             goto on_error;
116              
117             /* Setting an interval of 0 will block zmq_timers_execute! Always add 10ms */
118 24           after += 10;
119              
120 24           timer->after = after;
121 24           timer->interval = interval;
122              
123 24 50         if ((rc = zmq_bind (timer->recv, endpoint)) < 0 ||
    50          
124 24 50         (rc = zmq_setsockopt (timer->recv, ZMQ_CONFLATE, &v, sizeof (v))) < 0 ||
125 24           (rc = zmq_connect (timer->send, endpoint)) < 0)
126             goto on_error;
127              
128 24           goto done;
129              
130             on_error:
131 0           zmq_close (timer->send);
132 0           zmq_close (timer->recv);
133 0           free (timer);
134 0           return NULL;
135              
136             done:
137 24           return timer;
138             }
139              
140 24           static void zmq_raw_timer_destroy (zmq_raw_timer *timer)
141             {
142 24 50         assert (timer);
143 24 50         assert (timer->send);
144              
145 24           zmq_close (timer->send);
146              
147 24 50         if (timer->recv && !timer->recv_sv)
    50          
148 0           zmq_close (timer->recv);
149              
150 24           zmq_raw_mutex_lock (timer->timers->mutex);
151 24           --timer->timers->timer_count;
152 24           zmq_raw_mutex_unlock (timer->timers->mutex);
153              
154 24           free (timer);
155 24           }
156              
157 27           static void zmq_raw_timers__start (zmq_raw_timer *timer)
158             {
159             zmq_raw_timers *timers;
160 27 50         assert (timer);
161 27 50         assert (timer->timers);
162              
163 27           timers = timer->timers;
164              
165 27           timer->id = zmq_timers_add (timers->timers, timer->after, timer_handler, timer);
166 27           timer->running = 1;
167              
168 27 100         if (!timers->running)
169             {
170             /* start the timer thread */
171 3           timers->running = 1;
172 3           timers->thread = zmq_threadstart (timer_thread, timers);
173             }
174             else
175             {
176             /* wakeup the timer thread */
177 24           zmq_send_const (timers->wakeup_send, "", 1, ZMQ_DONTWAIT);
178             }
179 27           }
180              
181 24           zmq_raw_timer *zmq_raw_timers_start (zmq_raw_timers *timers, void *context, int after, int interval)
182             {
183             int rc;
184             zmq_raw_timer *timer;
185              
186 24 50         assert (timers);
187 24 50         assert (context);
188              
189 24           zmq_raw_mutex_lock (timers->mutex);
190              
191 24           timer = zmq_raw_timer_create (context, after, interval);
192 24 50         if (timer == NULL)
193             {
194 0           zmq_raw_mutex_unlock (timers->mutex);
195 0           return NULL;
196             }
197              
198 24           timer->timers = timers;
199 24           zmq_raw_timers__start (timer);
200              
201 24           ++timer->timers->timer_count;
202 24           zmq_raw_mutex_unlock (timers->mutex);
203              
204 24           return timer;
205             }
206              
207 27           void zmq_raw_timers_reset (zmq_raw_timer *timer)
208             {
209 27 50         assert (timer);
210              
211 27           zmq_raw_mutex_lock (timer->timers->mutex);
212 27 100         if (timer->running)
213 24           zmq_timers_reset (timer->timers->timers, timer->id);
214             else
215 3           zmq_raw_timers__start (timer);
216              
217 27 50         while (zmq_recv (timer->recv, NULL, 0, ZMQ_DONTWAIT) == 0);
218 27           zmq_raw_mutex_unlock (timer->timers->mutex);
219 27           }
220              
221 52           static void zmq_raw_timers__stop (zmq_raw_timer *timer)
222             {
223 52 50         assert (timer);
224              
225 52 100         if (timer->running)
226             {
227 27           timer->running = 0;
228             /* explicitly set the interval to 0 so the next call
229             * to zmq_timers_execute() cleans up the cancelled
230             * timer immediately */
231 27           zmq_timers_set_interval (timer->timers->timers, timer->id, 0);
232 27           zmq_timers_cancel (timer->timers->timers, timer->id);
233             }
234 52           }
235              
236 1           static void zmq_raw_timers__expire (zmq_raw_timer *timer)
237             {
238 1 50         if (timer->running)
239             {
240 1           zmq_raw_timers__stop (timer);
241              
242 1           timer->timers->last = NULL;
243 1           timer->timers->run_count = 0;
244              
245 1           timer_handler (timer->id, timer);
246              
247 1           free (timer->timers->last);
248             }
249 1           }
250              
251 1           void zmq_raw_timers_expire (zmq_raw_timer *timer)
252             {
253 1 50         assert (timer);
254              
255 1           zmq_raw_mutex_lock (timer->timers->mutex);
256 1           zmq_raw_timers__expire (timer);
257 1           zmq_raw_mutex_unlock (timer->timers->mutex);
258 1           }
259              
260 38           void zmq_raw_timers_stop (zmq_raw_timer *timer)
261             {
262 38 50         assert (timer);
263              
264 38           zmq_raw_mutex_lock (timer->timers->mutex);
265 38           zmq_raw_timers__stop (timer);
266 38           zmq_raw_mutex_unlock (timer->timers->mutex);
267 38           }
268              
269 24           void zmq_raw_timers_remove (zmq_raw_timer *timer)
270             {
271 24 50         assert (timer);
272              
273 24           zmq_raw_timers_stop (timer);
274 24           zmq_raw_timer_destroy (timer);
275 24           }
276              
277 1           int zmq_raw_timer_id (zmq_raw_timer *timer)
278             {
279 1 50         assert (timer);
280 1           return timer->id;
281             }
282              
283 24           void *zmq_raw_timer_get_recv (zmq_raw_timer *timer)
284             {
285 24 50         assert (timer);
286 24           return timer->recv;
287             }
288              
289 124           int zmq_raw_timer_is_running (zmq_raw_timer *timer)
290             {
291 124 50         assert (timer);
292 124           return timer->running;
293             }
294              
295 24           void zmq_raw_timer_set_sv (zmq_raw_timer *timer, void *sv)
296             {
297 24 50         assert (timer);
298 24 50         assert (sv);
299 24 50         assert (timer->recv_sv == NULL);
300              
301 24           timer->recv_sv = sv;
302 24           }
303              
304 330           void *zmq_raw_timer_get_sv (zmq_raw_timer *timer)
305             {
306 330           return timer->recv_sv;
307             }
308              
309 6           void zmq_raw_timer_set_interval (zmq_raw_timer *timer, int interval)
310             {
311 6 50         assert (timer);
312 6 50         assert (interval > 0);
313              
314 6           zmq_raw_mutex_lock (timer->timers->mutex);
315 6           timer->interval = interval;
316 6           zmq_raw_mutex_unlock (timer->timers->mutex);
317 6           }
318              
319 8           int zmq_raw_timer_get_interval (zmq_raw_timer *timer)
320             {
321             int interval;
322 8 50         assert (timer);
323              
324 8           zmq_raw_mutex_lock (timer->timers->mutex);
325 8           interval = timer->interval;
326 8           zmq_raw_mutex_unlock (timer->timers->mutex);
327              
328 8           return interval;
329             }
330              
331 3           void timer_thread (void *arg)
332             {
333 3           int count = 0, running = 1;
334             long timeout;
335 3           zmq_raw_timers *timers = (zmq_raw_timers *)arg;
336              
337 112 100         while (running)
338             {
339 109           zmq_raw_mutex_lock (timers->mutex);
340              
341             /* clear any 'pending' wakeup signals */
342 109 50         while (zmq_recv (timers->wakeup_recv, NULL, 0, ZMQ_DONTWAIT) == 0);
343              
344 109           timers->last = NULL;
345 109           timers->run_count = 0;
346 109           zmq_timers_execute (timers->timers);
347              
348 188 100         while (--timers->run_count >= 0)
349             {
350 79           int index = timers->run_count;
351 79           zmq_raw_timer *timer = timers->last[index];
352              
353 79 100         if (timer->interval == 0)
354 13           zmq_raw_timers__stop (timer);
355             else
356 66           zmq_timers_set_interval (timers->timers, timer->id,
357 66           (size_t)timer->interval);
358             }
359              
360 109 100         if (timers->last)
361 79           free (timers->last);
362              
363 109           running = timers->running;
364 109           timeout = zmq_timers_timeout (timers->timers);
365 109           zmq_raw_mutex_unlock (timers->mutex);
366              
367             /* sleep for 'timeout'. this may be interrupted
368             * by adding a new timer*/
369 109 100         if (running)
370 106           zmq_poll (&timers->wakeup_item, 1, timeout);
371             }
372 3           }
373              
374 80           void timer_handler (int timer_id, void *arg)
375             {
376 80 50         assert (arg);
377              
378             /* this is guaranteed to execute with the timers mutex locked */
379 80           zmq_raw_timer *timer = (zmq_raw_timer *)arg;
380              
381 80 50         assert (timer->id == timer_id);
382              
383 80           zmq_send_const (timer->send, "", 1, ZMQ_DONTWAIT);
384              
385 80           zmq_raw_timers *timers = timer->timers;
386 80           int index = timers->run_count++;
387              
388 80 50         if (index == 0)
389 80           timers->last = calloc (1, sizeof (zmq_raw_timer *));
390             else
391 0           timers->last = realloc (timers->last, timers->run_count*sizeof (zmq_raw_timer *));
392              
393 80 50         assert (timers->last);
394 80           timers->last[index] = timer;
395 80           }