File Coverage

blib/lib/ZMQ/Raw/Loop.pm
Criterion Covered Total %
statement 209 224 93.3
branch 53 78 67.9
condition 7 9 77.7
subroutine 32 33 96.9
pod 6 6 100.0
total 307 350 87.7


line stmt bran cond sub pod time code
1             package ZMQ::Raw::Loop;
2             $ZMQ::Raw::Loop::VERSION = '0.38';
3 14     14   97 use strict;
  14         28  
  14         398  
4 14     14   70 use warnings;
  14         151  
  14         333  
5 14     14   68 use Carp;
  14         22  
  14         1323  
6              
7 0     0   0 sub CLONE_SKIP { 1 }
8              
9             my @attributes;
10              
11             BEGIN
12             {
13 14     14   76 @attributes = qw/
14             context
15             poller
16             timers
17             handles
18             promises
19             events
20             terminated
21              
22             tevent
23             /;
24              
25 14     14   155 no strict 'refs';
  14         30  
  14         1388  
26 14         47 foreach my $accessor (@attributes)
27             {
28 112         862 *{$accessor} = sub
29             {
30 1378 100   1378   8598854 @_ > 1 ? $_[0]->{$accessor} = $_[1] : $_[0]->{$accessor}
31 112         390 };
32             }
33             }
34              
35 14     14   90 use ZMQ::Raw;
  14         25  
  14         338  
36 14     14   6377 use ZMQ::Raw::Loop::Event;
  14         35  
  14         422  
37 14     14   6178 use ZMQ::Raw::Loop::Handle;
  14         33  
  14         414  
38 14     14   6128 use ZMQ::Raw::Loop::Promise;
  14         37  
  14         378  
39 14     14   6530 use ZMQ::Raw::Loop::Timer;
  14         33  
  14         28493  
40              
41             =head1 NAME
42              
43             ZMQ::Raw::Loop - Loop class
44              
45             =head1 VERSION
46              
47             version 0.38
48              
49             =head1 DESCRIPTION
50              
51             A L represents an event loop.
52              
53             B: The API of this module is unstable and may change without warning
54             (any change will be appropriately documented in the changelog).
55              
56             =head1 METHODS
57              
58             =head2 new( $context )
59              
60             Create a new event loop
61              
62             =head2 run( )
63              
64             Run the event loop
65              
66             =head2 run_one( )
67              
68             Run until a single event occurs
69              
70             =head2 add( $item )
71              
72             Add C<$item> to the event loop. C<$item> should be a L>,
73             L>, L> or
74             L>.
75              
76             =head2 remove( $item )
77              
78             Remove C<$item> from the event loop.
79              
80             =head2 terminate( )
81              
82             Terminate the event loop
83              
84             =cut
85              
86             sub new
87             {
88 2     2 1 422 my ($this, $context) = @_;
89              
90 2   33     17 my $class = ref ($this) || $this;
91             my $self =
92             {
93             context => $context,
94             poller => ZMQ::Raw::Poller->new,
95             timers => [],
96             handles => [],
97             events => [],
98             promises => [],
99             tevent => ZMQ::Raw::Loop::Event->new ($context,
100             on_set => sub
101             {
102 3     3   16 my ($event, $loop) = @_;
103 3         64 $loop->terminated (1);
104             }
105             )
106 2         120 };
107              
108 2         9 return bless $self, $class;
109             }
110              
111              
112              
113             sub run
114             {
115 15     15 1 1158 my ($this) = @_;
116              
117 15         53 $this->terminated (0);
118 15         49 $this->tevent->reset();
119 15         80 $this->add ($this->tevent);
120              
121 15   100     54 while (!$this->terminated && $this->poller->size > 1)
122             {
123 48         202 $this->run_one;
124             }
125              
126 15         61 $this->remove ($this->tevent);
127              
128 15         69 $this->_cancel_timers();
129 15         59 $this->_cancel_events();
130 15         65 $this->_cancel_handles();
131 15         52 $this->_clear_promises();
132             }
133              
134              
135              
136             sub run_one
137             {
138 58     58 1 170 my ($this) = @_;
139              
140 58 50       137 if ($this->poller->size)
141             {
142 58         155 my $count = $this->poller->wait (-1);
143 58 50       1124 if ($count)
144             {
145 58 100 100     595 $this->_dispatch_events() || $this->_dispatch_handles() || $this->_dispatch_timers();
146 58         172 $this->promises ([grep { $_->status == ZMQ::Raw::Loop::Promise->PLANNED } @{$this->promises}]);
  5         30  
  58         177  
147             }
148              
149 58         315 return 1;
150             }
151              
152 0         0 return 0;
153             }
154              
155              
156              
157             sub add
158             {
159 63     63 1 4224 my ($this, $item) = @_;
160              
161 63 100       262 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    100          
    100          
    50          
162             {
163 39         121 $this->_add_timer ($item);
164             }
165             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
166             {
167 4         12 $this->_add_handle ($item);
168             }
169             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
170             {
171 19         61 $this->_add_event ($item);
172             }
173             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Promise')
174             {
175 1         5 $this->_add_promise ($item);
176             }
177             else
178             {
179 0         0 croak "don't know how to add $item";
180             }
181             }
182              
183              
184              
185             sub _add_timer
186             {
187 76     76   199 my ($this, $timer) = @_;
188              
189 76         271 $timer->loop ($this);
190 76         168 $this->poller->add ($timer->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
191              
192 76 100       349 if (!$timer->running)
193             {
194 2         11 $timer->reset();
195             }
196              
197 76         166 push @{$this->timers}, $timer;
  76         141  
198             }
199              
200              
201              
202             sub _add_event
203             {
204 19     19   40 my ($this, $event) = @_;
205              
206 19         45 $this->poller->add ($event->read_handle, ZMQ::Raw->ZMQ_POLLIN);
207              
208 19 100       83 if ($event->timeout)
209             {
210 2         9 $event->timer (ZMQ::Raw::Timer->new ($this->context,
211             after => $event->timeout)
212             );
213 2         6 $this->poller->add ($event->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
214             }
215              
216 19         45 push @{$this->events}, $event;
  19         43  
217             }
218              
219              
220              
221             sub _add_promise
222             {
223 1     1   4 my ($this, $promise) = @_;
224              
225 1         3 push @{$this->promises}, $promise;
  1         3  
226             }
227              
228              
229              
230             sub _add_handle
231             {
232 4     4   12 my ($this, $handle) = @_;
233              
234 4         9 my $events = 0;
235 4 50       11 if ($handle->on_readable)
236             {
237 4         29 $events |= ZMQ::Raw->ZMQ_POLLIN;
238             }
239 4 50       12 if ($handle->on_writable)
240             {
241 0         0 $events |= ZMQ::Raw->ZMQ_POLLOUT;
242             }
243 4 50       13 if ($handle->timeout)
244             {
245 4         11 $handle->timer (ZMQ::Raw::Timer->new ($this->context,
246             after => $handle->timeout)
247             );
248 4         15 $this->poller->add ($handle->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
249             }
250              
251 4         23 $handle->loop ($this);
252 4         10 $this->poller->add ($handle->handle, $events);
253              
254 4         11 push @{$this->handles}, $handle;
  4         8  
255             }
256              
257              
258              
259             sub remove
260             {
261 45     45 1 138 my ($this, $item) = @_;
262              
263 45 100       392 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    50          
    50          
264             {
265 30         91 $this->_remove_timer ($item);
266             }
267             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
268             {
269 0         0 $this->_remove_handle ($item);
270             }
271             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
272             {
273 15         51 $this->_remove_event ($item);
274             }
275             else
276             {
277 0         0 croak "don't know how to remove $item";
278             }
279             }
280              
281              
282              
283             sub _remove_timer
284             {
285 78     78   178 my ($this, $timer) = @_;
286              
287 78         153 my @left;
288 78         130 foreach my $t (@{$this->timers})
  78         243  
289             {
290 120 100       377 if ($timer == $t)
291             {
292 76         212 my $socket = $timer->timer->socket;
293 76         518 $socket->recv (ZMQ::Raw->ZMQ_DONTWAIT);
294 76         296 $this->poller->remove ($socket);
295 76         352 next;
296             }
297              
298 44         188 push @left, $t;
299             }
300              
301 78         270 $this->timers (\@left);
302             }
303              
304              
305              
306             sub _remove_handle
307             {
308 4     4   14 my ($this, $handle) = @_;
309              
310 4         9 my @left;
311 4         8 foreach my $h (@{$this->handles})
  4         11  
312             {
313 4 50       14 if ($h == $handle)
314             {
315 4         11 $this->poller->remove ($handle->handle);
316              
317 4         14 my $timer = $handle->timer;
318 4 50       98 if ($timer)
319             {
320 4         14 $this->poller->remove ($timer->socket);
321 4         23 $timer->cancel();
322             }
323              
324 4         20 next;
325             }
326              
327 0         0 push @left, $h;
328             }
329              
330 4         15 $this->handles (\@left);
331             }
332              
333              
334              
335             sub _remove_event
336             {
337 21     21   56 my ($this, $event) = @_;
338              
339 21         38 my @left;
340 21         96 foreach my $e (@{$this->events})
  21         51  
341             {
342 26 100       83 if ($e == $event)
343             {
344 18         50 $this->poller->remove ($event->read_handle);
345              
346 18         64 my $timer = $event->timer;
347 18 100       93 if ($timer)
348             {
349 1         5 $this->poller->remove ($timer->socket);
350 1         6 $timer->cancel();
351             }
352              
353 18         60 next;
354             }
355              
356 8         26 push @left, $e;
357             }
358              
359 21         64 $this->events (\@left);
360             }
361              
362              
363              
364             sub _dispatch_handles
365             {
366 52     52   163 my ($this) = @_;
367              
368 52         102 foreach my $handle (@{$this->handles})
  52         181  
369             {
370 4         18 my $events = $this->poller->events ($handle->handle);
371 4 100       17 if ($events)
372             {
373 2         7 $this->_remove_handle ($handle);
374              
375 2 50       7 if ($events & ZMQ::Raw->ZMQ_POLLIN)
    0          
376             {
377 2         6 my $readable = $handle->on_readable;
378 2 50       5 &{$readable} ($handle, $this) if $readable;
  2         7  
379             }
380             elsif ($events & ZMQ::Raw->ZMQ_POLLOUT)
381             {
382 0         0 my $writable = $handle->on_writable;
383 0 0       0 &{$writable} ($handle, $this) if $writable;
  0         0  
384             }
385              
386 2         57 return 1;
387             }
388              
389 2 50       11 if ($handle->timer)
390             {
391 2         8 my $events = $this->poller->events ($handle->timer->socket);
392 2 50       11 if ($events)
393             {
394 2         12 $this->_remove_handle ($handle);
395              
396 2         12 my $timeout = $handle->on_timeout;
397 2 50       13 &{$timeout} ($handle, $this) if $timeout;
  2         13  
398              
399 2         23 return 1;
400             }
401             }
402             }
403              
404 48         414 return 0;
405             }
406              
407              
408              
409             sub _dispatch_events
410             {
411 58     58   293 my ($this) = @_;
412              
413 58         140 foreach my $event (@{$this->events})
  58         216  
414             {
415 62         239 my $events = $this->poller->events ($event->read_handle);
416 62 100       317 if ($events)
417             {
418 5         26 $event->reset();
419 5         47 $this->_remove_event ($event);
420              
421 5         21 my $set = $event->on_set;
422 5 50       22 &{$set} ($event, $this) if $set;
  5         24  
423 5         31 return 1;
424             }
425              
426 57 100       273 if ($event->timer)
427             {
428 4         15 my $events = $this->poller->events ($event->timer->socket);
429 4 100       19 if ($events)
430             {
431 1         10 $event->reset();
432 1         8 $this->_remove_event ($event);
433              
434 1         7 my $timeout = $event->on_timeout;
435 1 50       5 &{$timeout} ($event, $this) if $timeout;
  1         6  
436              
437 1         10 return 1;
438             }
439             }
440             }
441              
442 52         515 return 0;
443             }
444              
445              
446              
447             sub _dispatch_timers
448             {
449 48     48   178 my ($this) = @_;
450              
451 48         101 foreach my $timer (@{$this->timers})
  48         154  
452             {
453 87         394 my $socket = $timer->timer->socket;
454 87         247 my $events = $this->poller->events ($socket);
455 87 100       287 if ($events)
456             {
457 48         212 $this->_remove_timer ($timer);
458              
459 48         226 my $timeout = $timer->on_timeout;
460 48 50       171 &{$timeout} ($timer, $this) if ($timeout);
  48         250  
461              
462 48 100       328 if ($timer->timer->running())
463             {
464 37         142 $this->_add_timer ($timer);
465             }
466              
467 48         210 return 1;
468             }
469             }
470              
471 0         0 return 0;
472             }
473              
474              
475              
476             sub _cancel_timers
477             {
478 15     15   53 my ($this) = @_;
479              
480             AGAIN:
481 19         42 foreach my $timer (@{$this->timers})
  19         45  
482             {
483 4         42 $timer->cancel();
484 4         21 goto AGAIN;
485             }
486             }
487              
488              
489              
490             sub _cancel_events
491             {
492 15     15   33 my ($this) = @_;
493              
494 15         34 foreach my $event (@{$this->events})
  15         43  
495             {
496 1         5 my $events = $this->poller->events ($event->read_handle);
497 1         5 $this->poller->remove ($event->read_handle);
498              
499 1 50       5 if ($event->timer)
500             {
501 1         5 $event->timer->cancel();
502 1         6 $this->poller->remove ($event->timer->socket);
503             }
504             }
505              
506 15         73 $this->events ([]);
507             }
508              
509              
510              
511             sub _cancel_handles
512             {
513 15     15   44 my ($this) = @_;
514              
515 15         29 foreach my $handle (@{$this->handles})
  15         37  
516             {
517 0         0 $this->poller->remove ($handle->handle);
518              
519 0 0       0 if ($handle->timer)
520             {
521 0         0 $handle->timer->cancel();
522 0         0 $this->poller->remove ($handle->timer->socket);
523             }
524             }
525              
526 15         54 $this->handles ([]);
527             }
528              
529              
530              
531             sub _clear_promises
532             {
533 15     15   38 my ($this) = @_;
534              
535 15         40 $this->promises ([]);
536             }
537              
538              
539              
540             sub terminate
541             {
542 13     13 1 2468 my ($this) = @_;
543              
544 13         48 $this->tevent->set;
545             }
546              
547             =for Pod::Coverage context handles events poller timers promises terminated tevent
548              
549             =head1 AUTHOR
550              
551             Jacques Germishuys
552              
553             =head1 LICENSE AND COPYRIGHT
554              
555             Copyright 2017 Jacques Germishuys.
556              
557             This program is free software; you can redistribute it and/or modify it
558             under the terms of either: the GNU General Public License as published
559             by the Free Software Foundation; or the Artistic License.
560              
561             See http://dev.perl.org/licenses/ for more information.
562              
563             =cut
564              
565             1; # End of ZMQ::Raw::Loop