File Coverage

blib/lib/ZeroMQ/Poller/Timer.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package ZeroMQ::Poller::Timer;
2              
3 4     4   74254 use strict;
  4         9  
  4         194  
4 4     4   22 use warnings;
  4         7  
  4         171  
5              
6             our $VERSION = '0.01';
7              
8 4     4   5251 use threads;
  0            
  0            
9             use ZeroMQ qw/ ZMQ_POLLIN ZMQ_PAIR /;
10              
11             my $zmq_ctxt = ZeroMQ::Context->new();
12             my $testmode = 0;
13              
14             # Nope, not using Moose or even Class::Accessor 'antlers'
15             # for this first pass. It's a lite weight little module
16             # and I'd like to keep it that way for now.
17              
18             sub new {
19             my $class = shift;
20             my $self = {@_};
21             $testmode ||= $self->{'test'};
22              
23             if ( !$self->{'name'} ) {
24             _pe("constuctor requires a 'name' field.");
25             return;
26             }
27              
28             if ( !defined $self->{'after'} ) {
29             _pe("constructor requires a 'after' field.");
30             return;
31             }
32              
33             if ( $self->{'after'} !~ /^\d+$/ ) {
34             _pe("the 'after' field must be an integer.");
35             return;
36             }
37              
38             if ( defined $self->{'interval'} && $self->{'interval'} !~ /^\d+$/ ) {
39             _pe("the 'interval' field must be an integer.");
40             return;
41             }
42              
43             $self->{'_addr'} = "inproc://" . $self->{'name'};
44             $self->{'_sock'} = $zmq_ctxt->socket(ZMQ_PAIR);
45             $self->{'_sock'}->bind( $self->{'_addr'} );
46              
47             my $obj = bless $self, $class;
48              
49             $obj->start unless $self->{'pause'};
50              
51             return $obj;
52             }
53              
54             sub start {
55             my $self = shift;
56             my $thread = threads->create( \&_timer, $self );
57              
58             $thread->detach;
59             }
60              
61             sub name { shift->{'name'} }
62              
63             sub socket { shift->{'_sock'} }
64              
65             sub reset { shift->socket->recv }
66              
67             sub poll_hash {
68             my $self = shift;
69              
70             return {
71             name => $self->name,
72             socket => $self->socket,
73             events => ZMQ_POLLIN,
74             };
75             }
76              
77             sub _timer {
78             my $self = shift;
79             my $after = $self->{'after'};
80             my $int = $self->{'interval'};
81             my $sock = $zmq_ctxt->socket(ZMQ_PAIR);
82              
83             $sock->connect( $self->{'_addr'} );
84              
85             sleep $after;
86              
87             if ( !$int ) {
88             $sock->send(1);
89             return;
90             }
91              
92             while (1) {
93             $sock->send(1);
94             sleep $int;
95             }
96             }
97              
98             # '_pe' is short for 'print error'. te he.
99             sub _pe {
100             return if $testmode;
101             print STDERR __PACKAGE__ . ': ' . (shift) . "\n";
102             }
103              
104             1;
105              
106             __END__