File Coverage

blib/lib/ZMQ/Raw/Loop.pm
Criterion Covered Total %
statement 209 224 93.3
branch 53 78 67.9
condition 7 9 77.7
subroutine 32 33 96.9
pod 6 6 100.0
total 307 350 87.7


line stmt bran cond sub pod time code
1             package ZMQ::Raw::Loop;
2             $ZMQ::Raw::Loop::VERSION = '0.37';
3 14     14   75 use strict;
  14         25  
  14         326  
4 14     14   51 use warnings;
  14         140  
  14         281  
5 14     14   57 use Carp;
  14         18  
  14         1109  
6              
7 0     0   0 sub CLONE_SKIP { 1 }
8              
9             my @attributes;
10              
11             BEGIN
12             {
13 14     14   57 @attributes = qw/
14             context
15             poller
16             timers
17             handles
18             promises
19             events
20             terminated
21              
22             tevent
23             /;
24              
25 14     14   81 no strict 'refs';
  14         19  
  14         1061  
26 14         38 foreach my $accessor (@attributes)
27             {
28 112         988 *{$accessor} = sub
29             {
30 1378 100   1378   8643939 @_ > 1 ? $_[0]->{$accessor} = $_[1] : $_[0]->{$accessor}
31 112         286 };
32             }
33             }
34              
35 14     14   79 use ZMQ::Raw;
  14         18  
  14         258  
36 14     14   5200 use ZMQ::Raw::Loop::Event;
  14         28  
  14         347  
37 14     14   5056 use ZMQ::Raw::Loop::Handle;
  14         31  
  14         335  
38 14     14   5101 use ZMQ::Raw::Loop::Promise;
  14         30  
  14         306  
39 14     14   5422 use ZMQ::Raw::Loop::Timer;
  14         28  
  14         23649  
40              
41             =head1 NAME
42              
43             ZMQ::Raw::Loop - Loop class
44              
45             =head1 VERSION
46              
47             version 0.37
48              
49             =head1 DESCRIPTION
50              
51             A L represents an event loop.
52              
53             B: The API of this module is unstable and may change without warning
54             (any change will be appropriately documented in the changelog).
55              
56             =head1 METHODS
57              
58             =head2 new( $context )
59              
60             Create a new event loop
61              
62             =head2 run( )
63              
64             Run the event loop
65              
66             =head2 run_one( )
67              
68             Run until a single event occurs
69              
70             =head2 add( $item )
71              
72             Add C<$item> to the event loop. C<$item> should be a L>,
73             L>, L> or
74             L>.
75              
76             =head2 remove( $item )
77              
78             Remove C<$item> from the event loop.
79              
80             =head2 terminate( )
81              
82             Terminate the event loop
83              
84             =cut
85              
86             sub new
87             {
88 2     2 1 355 my ($this, $context) = @_;
89              
90 2   33     14 my $class = ref ($this) || $this;
91             my $self =
92             {
93             context => $context,
94             poller => ZMQ::Raw::Poller->new,
95             timers => [],
96             handles => [],
97             events => [],
98             promises => [],
99             tevent => ZMQ::Raw::Loop::Event->new ($context,
100             on_set => sub
101             {
102 3     3   12 my ($event, $loop) = @_;
103 3         24 $loop->terminated (1);
104             }
105             )
106 2         91 };
107              
108 2         7 return bless $self, $class;
109             }
110              
111              
112              
113             sub run
114             {
115 15     15 1 583 my ($this) = @_;
116              
117 15         39 $this->terminated (0);
118 15         34 $this->tevent->reset();
119 15         54 $this->add ($this->tevent);
120              
121 15   100     130 while (!$this->terminated && $this->poller->size > 1)
122             {
123 48         132 $this->run_one;
124             }
125              
126 15         44 $this->remove ($this->tevent);
127              
128 15         50 $this->_cancel_timers();
129 15         114 $this->_cancel_events();
130 15         48 $this->_cancel_handles();
131 15         37 $this->_clear_promises();
132             }
133              
134              
135              
136             sub run_one
137             {
138 58     58 1 105 my ($this) = @_;
139              
140 58 50       157 if ($this->poller->size)
141             {
142 58         123 my $count = $this->poller->wait (-1);
143 58 50       2295 if ($count)
144             {
145 58 100 100     347 $this->_dispatch_events() || $this->_dispatch_handles() || $this->_dispatch_timers();
146 58         130 $this->promises ([grep { $_->status == ZMQ::Raw::Loop::Promise->PLANNED } @{$this->promises}]);
  5         21  
  58         165  
147             }
148              
149 58         306 return 1;
150             }
151              
152 0         0 return 0;
153             }
154              
155              
156              
157             sub add
158             {
159 63     63 1 2949 my ($this, $item) = @_;
160              
161 63 100       222 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    100          
    100          
    50          
162             {
163 39         117 $this->_add_timer ($item);
164             }
165             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
166             {
167 4         14 $this->_add_handle ($item);
168             }
169             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
170             {
171 19         44 $this->_add_event ($item);
172             }
173             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Promise')
174             {
175 1         5 $this->_add_promise ($item);
176             }
177             else
178             {
179 0         0 croak "don't know how to add $item";
180             }
181             }
182              
183              
184              
185             sub _add_timer
186             {
187 76     76   150 my ($this, $timer) = @_;
188              
189 76         227 $timer->loop ($this);
190 76         158 $this->poller->add ($timer->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
191              
192 76 100       297 if (!$timer->running)
193             {
194 2         8 $timer->reset();
195             }
196              
197 76         133 push @{$this->timers}, $timer;
  76         145  
198             }
199              
200              
201              
202             sub _add_event
203             {
204 19     19   34 my ($this, $event) = @_;
205              
206 19         36 $this->poller->add ($event->read_handle, ZMQ::Raw->ZMQ_POLLIN);
207              
208 19 100       69 if ($event->timeout)
209             {
210 2         6 $event->timer (ZMQ::Raw::Timer->new ($this->context,
211             after => $event->timeout)
212             );
213 2         5 $this->poller->add ($event->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
214             }
215              
216 19         27 push @{$this->events}, $event;
  19         34  
217             }
218              
219              
220              
221             sub _add_promise
222             {
223 1     1   2 my ($this, $promise) = @_;
224              
225 1         2 push @{$this->promises}, $promise;
  1         3  
226             }
227              
228              
229              
230             sub _add_handle
231             {
232 4     4   8 my ($this, $handle) = @_;
233              
234 4         5 my $events = 0;
235 4 50       11 if ($handle->on_readable)
236             {
237 4         16 $events |= ZMQ::Raw->ZMQ_POLLIN;
238             }
239 4 50       11 if ($handle->on_writable)
240             {
241 0         0 $events |= ZMQ::Raw->ZMQ_POLLOUT;
242             }
243 4 50       20 if ($handle->timeout)
244             {
245 4         10 $handle->timer (ZMQ::Raw::Timer->new ($this->context,
246             after => $handle->timeout)
247             );
248 4         11 $this->poller->add ($handle->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
249             }
250              
251 4         21 $handle->loop ($this);
252 4         7 $this->poller->add ($handle->handle, $events);
253              
254 4         8 push @{$this->handles}, $handle;
  4         9  
255             }
256              
257              
258              
259             sub remove
260             {
261 45     45 1 98 my ($this, $item) = @_;
262              
263 45 100       264 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    50          
    50          
264             {
265 30         111 $this->_remove_timer ($item);
266             }
267             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
268             {
269 0         0 $this->_remove_handle ($item);
270             }
271             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
272             {
273 15         31 $this->_remove_event ($item);
274             }
275             else
276             {
277 0         0 croak "don't know how to remove $item";
278             }
279             }
280              
281              
282              
283             sub _remove_timer
284             {
285 78     78   178 my ($this, $timer) = @_;
286              
287 78         109 my @left;
288 78         109 foreach my $t (@{$this->timers})
  78         156  
289             {
290 120 100       308 if ($timer == $t)
291             {
292 76         299 my $socket = $timer->timer->socket;
293 76         435 $socket->recv (ZMQ::Raw->ZMQ_DONTWAIT);
294 76         298 $this->poller->remove ($socket);
295 76         485 next;
296             }
297              
298 44         104 push @left, $t;
299             }
300              
301 78         183 $this->timers (\@left);
302             }
303              
304              
305              
306             sub _remove_handle
307             {
308 4     4   10 my ($this, $handle) = @_;
309              
310 4         7 my @left;
311 4         6 foreach my $h (@{$this->handles})
  4         14  
312             {
313 4 50       10 if ($h == $handle)
314             {
315 4         10 $this->poller->remove ($handle->handle);
316              
317 4         11 my $timer = $handle->timer;
318 4 50       89 if ($timer)
319             {
320 4         10 $this->poller->remove ($timer->socket);
321 4         19 $timer->cancel();
322             }
323              
324 4         15 next;
325             }
326              
327 0         0 push @left, $h;
328             }
329              
330 4         13 $this->handles (\@left);
331             }
332              
333              
334              
335             sub _remove_event
336             {
337 21     21   42 my ($this, $event) = @_;
338              
339 21         32 my @left;
340 21         29 foreach my $e (@{$this->events})
  21         114  
341             {
342 26 100       57 if ($e == $event)
343             {
344 18         38 $this->poller->remove ($event->read_handle);
345              
346 18         54 my $timer = $event->timer;
347 18 100       47 if ($timer)
348             {
349 1         4 $this->poller->remove ($timer->socket);
350 1         6 $timer->cancel();
351             }
352              
353 18         41 next;
354             }
355              
356 8         18 push @left, $e;
357             }
358              
359 21         54 $this->events (\@left);
360             }
361              
362              
363              
364             sub _dispatch_handles
365             {
366 52     52   124 my ($this) = @_;
367              
368 52         92 foreach my $handle (@{$this->handles})
  52         139  
369             {
370 4         10 my $events = $this->poller->events ($handle->handle);
371 4 100       13 if ($events)
372             {
373 2         6 $this->_remove_handle ($handle);
374              
375 2 50       6 if ($events & ZMQ::Raw->ZMQ_POLLIN)
    0          
376             {
377 2         5 my $readable = $handle->on_readable;
378 2 50       5 &{$readable} ($handle, $this) if $readable;
  2         3  
379             }
380             elsif ($events & ZMQ::Raw->ZMQ_POLLOUT)
381             {
382 0         0 my $writable = $handle->on_writable;
383 0 0       0 &{$writable} ($handle, $this) if $writable;
  0         0  
384             }
385              
386 2         53 return 1;
387             }
388              
389 2 50       8 if ($handle->timer)
390             {
391 2         7 my $events = $this->poller->events ($handle->timer->socket);
392 2 50       6 if ($events)
393             {
394 2         9 $this->_remove_handle ($handle);
395              
396 2         8 my $timeout = $handle->on_timeout;
397 2 50       6 &{$timeout} ($handle, $this) if $timeout;
  2         10  
398              
399 2         16 return 1;
400             }
401             }
402             }
403              
404 48         340 return 0;
405             }
406              
407              
408              
409             sub _dispatch_events
410             {
411 58     58   164 my ($this) = @_;
412              
413 58         109 foreach my $event (@{$this->events})
  58         234  
414             {
415 62         356 my $events = $this->poller->events ($event->read_handle);
416 62 100       375 if ($events)
417             {
418 5         32 $event->reset();
419 5         31 $this->_remove_event ($event);
420              
421 5         16 my $set = $event->on_set;
422 5 50       13 &{$set} ($event, $this) if $set;
  5         97  
423 5         26 return 1;
424             }
425              
426 57 100       200 if ($event->timer)
427             {
428 4         11 my $events = $this->poller->events ($event->timer->socket);
429 4 100       14 if ($events)
430             {
431 1         7 $event->reset();
432 1         6 $this->_remove_event ($event);
433              
434 1         4 my $timeout = $event->on_timeout;
435 1 50       5 &{$timeout} ($event, $this) if $timeout;
  1         6  
436              
437 1         8 return 1;
438             }
439             }
440             }
441              
442 52         283 return 0;
443             }
444              
445              
446              
447             sub _dispatch_timers
448             {
449 48     48   111 my ($this) = @_;
450              
451 48         76 foreach my $timer (@{$this->timers})
  48         134  
452             {
453 87         346 my $socket = $timer->timer->socket;
454 87         206 my $events = $this->poller->events ($socket);
455 87 100       320 if ($events)
456             {
457 48         189 $this->_remove_timer ($timer);
458              
459 48         176 my $timeout = $timer->on_timeout;
460 48 50       144 &{$timeout} ($timer, $this) if ($timeout);
  48         185  
461              
462 48 100       297 if ($timer->timer->running())
463             {
464 37         114 $this->_add_timer ($timer);
465             }
466              
467 48         159 return 1;
468             }
469             }
470              
471 0         0 return 0;
472             }
473              
474              
475              
476             sub _cancel_timers
477             {
478 15     15   35 my ($this) = @_;
479              
480             AGAIN:
481 19         32 foreach my $timer (@{$this->timers})
  19         38  
482             {
483 4         16 $timer->cancel();
484 4         13 goto AGAIN;
485             }
486             }
487              
488              
489              
490             sub _cancel_events
491             {
492 15     15   33 my ($this) = @_;
493              
494 15         25 foreach my $event (@{$this->events})
  15         22  
495             {
496 1         3 my $events = $this->poller->events ($event->read_handle);
497 1         4 $this->poller->remove ($event->read_handle);
498              
499 1 50       4 if ($event->timer)
500             {
501 1         3 $event->timer->cancel();
502 1         5 $this->poller->remove ($event->timer->socket);
503             }
504             }
505              
506 15         35 $this->events ([]);
507             }
508              
509              
510              
511             sub _cancel_handles
512             {
513 15     15   29 my ($this) = @_;
514              
515 15         22 foreach my $handle (@{$this->handles})
  15         34  
516             {
517 0         0 $this->poller->remove ($handle->handle);
518              
519 0 0       0 if ($handle->timer)
520             {
521 0         0 $handle->timer->cancel();
522 0         0 $this->poller->remove ($handle->timer->socket);
523             }
524             }
525              
526 15         54 $this->handles ([]);
527             }
528              
529              
530              
531             sub _clear_promises
532             {
533 15     15   29 my ($this) = @_;
534              
535 15         35 $this->promises ([]);
536             }
537              
538              
539              
540             sub terminate
541             {
542 13     13 1 1353 my ($this) = @_;
543              
544 13         47 $this->tevent->set;
545             }
546              
547             =for Pod::Coverage context handles events poller timers promises terminated tevent
548              
549             =head1 AUTHOR
550              
551             Jacques Germishuys
552              
553             =head1 LICENSE AND COPYRIGHT
554              
555             Copyright 2017 Jacques Germishuys.
556              
557             This program is free software; you can redistribute it and/or modify it
558             under the terms of either: the GNU General Public License as published
559             by the Free Software Foundation; or the Artistic License.
560              
561             See http://dev.perl.org/licenses/ for more information.
562              
563             =cut
564              
565             1; # End of ZMQ::Raw::Loop