File Coverage

blib/lib/MHFS/EventLoop/Poll/Base.pm
Criterion Covered Total %
statement 35 145 24.1
branch 0 38 0.0
condition 0 5 0.0
subroutine 12 27 44.4
pod 0 12 0.0
total 47 227 20.7


line stmt bran cond sub pod time code
1             package MHFS::EventLoop::Poll::Base v0.7.0;
2 1     1   748 use 5.014;
  1         4  
3 1     1   6 use strict; use warnings;
  1     1   2  
  1         25  
  1         5  
  1         15  
  1         71  
4 1     1   7 use feature 'say';
  1         2  
  1         124  
5 1     1   591 use POSIX ":sys_wait_h";
  1         9003  
  1         6  
6 1     1   2261 use IO::Poll qw(POLLIN POLLOUT POLLHUP);
  1         3  
  1         76  
7 1     1   7 use Time::HiRes qw( usleep clock_gettime CLOCK_REALTIME CLOCK_MONOTONIC);
  1         1  
  1         11  
8 1     1   195 use Scalar::Util qw(looks_like_number);
  1         2  
  1         73  
9 1     1   7 use Data::Dumper;
  1         2  
  1         66  
10 1     1   1957 use Devel::Peek;
  1         826  
  1         7  
11             #use Devel::Refcount qw( refcount );
12              
13 1     1   151 use constant POLLRDHUP => 0;
  1         2  
  1         101  
14 1     1   7 use constant ALWAYSMASK => (POLLRDHUP | POLLHUP);
  1         3  
  1         3163  
15              
16             # You must provide event handlers for the events you are listening for
17             # return undef to have them removed from poll's structures
18              
19             sub _decode_status {
20 0     0     my ($rc) = @_;
21 0 0         print "$rc: normal exit with code ". WEXITSTATUS($rc)."\n" if WIFEXITED( $rc);
22 0 0         print "$rc: terminated with signal ".WTERMSIG( $rc)."\n" if WIFSIGNALED($rc);
23 0 0         print "$rc: stopped with signal ". WSTOPSIG( $rc)."\n" if WIFSTOPPED( $rc);
24             }
25              
26             sub new {
27 0     0 0   my ($class) = @_;
28 0           my %self = ('poll' => IO::Poll->new(), 'fh_map' => {}, 'timers' => [], 'children' => {}, 'deadchildren' => []);
29 0           bless \%self, $class;
30              
31             $SIG{CHLD} = sub {
32 0     0     while((my $child = waitpid(-1, WNOHANG)) > 0) {
33 0           my ($wstatus, $exitcode) = ($?, $?>> 8);
34 0 0         if(defined $self{'children'}{$child}) {
35 0           say "PID $child reaped (func) $exitcode";
36 0           push @{$self{'deadchildren'}}, [$self{'children'}{$child}, $child, $wstatus];
  0            
37 0           $self{'children'}{$child} = undef;
38             }
39             else {
40 0           say "PID $child reaped (No func) $exitcode";
41             }
42             }
43 0           };
44              
45 0           return \%self;
46             }
47              
48             sub register_child {
49 0     0 0   my ($self, $pid, $cb) = @_;
50 0           $self->{'children'}{$pid} = $cb;
51             }
52              
53             sub run_dead_children_callbacks {
54 0     0 0   my ($self) = @_;
55 0           while(my $chld = shift(@{$self->{'deadchildren'}})) {
  0            
56 0           say "PID " . $chld->[1] . ' running SIGCHLD cb';
57 0           $chld->[0]($chld->[2]);
58             }
59             }
60              
61             sub set {
62 0     0 0   my ($self, $handle, $obj, $events) = @_;
63 0           $self->{'poll'}->mask($handle, $events);
64 0           $self->{'fh_map'}{$handle} = $obj;
65             }
66              
67             sub getEvents {
68 0     0 0   my ($self, $handle) = @_;
69 0           return $self->{'poll'}->mask($handle);
70             }
71              
72             sub remove {
73 0     0 0   my ($self, $handle) = @_;
74 0           $self->{'poll'}->remove($handle);
75 0           $self->{'fh_map'}{$handle} = undef;
76             }
77              
78              
79             sub _insert_timer {
80 0     0     my ($self, $timer) = @_;
81 0           my $i;
82 0   0       for($i = 0; defined($self->{'timers'}[$i]) && ($timer->{'desired'} >= $self->{'timers'}[$i]{'desired'}); $i++) { }
83 0           splice @{$self->{'timers'}}, $i, 0, ($timer);
  0            
84 0           return $i;
85             }
86              
87              
88             # all times are relative, is 0 is set as the interval, it will be run every main loop iteration
89             # return undef in the callback to delete the timer
90             sub add_timer {
91 0     0 0   my ($self, $start, $interval, $callback, $id) = @_;
92 0           my $current_time = clock_gettime(CLOCK_MONOTONIC);
93 0           my $desired = $current_time + $start;
94 0           my $timer = { 'desired' => $desired, 'interval' => $interval, 'callback' => $callback };
95 0 0         $timer->{'id'} = $id if(defined $id);
96 0           return _insert_timer($self, $timer);
97             }
98              
99             sub remove_timer_by_id {
100 0     0 0   my ($self, $id) = @_;
101 0           my $lastindex = scalar(@{$self->{'timers'}}) - 1;
  0            
102 0           for my $i (0 .. $lastindex) {
103 0 0         next if(! defined $self->{'timers'}[$i]{'id'});
104 0 0         if($self->{'timers'}[$i]{'id'} == $id) {
105             #say "Removing timer with id: $id";
106 0           splice(@{$self->{'timers'}}, $i, 1);
  0            
107 0           return;
108             }
109             }
110 0           say "unable to remove timer $id, not found";
111             }
112              
113             sub requeue_timers {
114 0     0 0   my ($self, $timers, $current_time) = @_;
115 0           foreach my $timer (@$timers) {
116 0           $timer->{'desired'} = $current_time + $timer->{'interval'};
117 0           _insert_timer($self, $timer);
118             }
119             }
120              
121             sub check_timers {
122 0     0 0   my ($self) = @_;
123 0           my @requeue_timers;
124 0           my $timerhit = 0;
125 0           my $current_time = clock_gettime(CLOCK_MONOTONIC);
126 0           while(my $timer = shift (@{$self->{'timers'}}) ) {
  0            
127 0 0         if($current_time >= $timer->{'desired'}) {
128 0           $timerhit = 1;
129 0 0         if(defined $timer->{'callback'}->($timer, $current_time, $self)) { # callback may change interval
130 0           push @requeue_timers, $timer;
131             }
132             }
133             else {
134 0           unshift @{$self->{'timers'}}, $timer;
  0            
135 0           last;
136             }
137             }
138 0           $self->requeue_timers(\@requeue_timers, $current_time);
139             }
140              
141             sub do_poll {
142 0     0 0   my ($self, $loop_interval, $poll) = @_;
143 0           my $pollret = $poll->poll($loop_interval);
144 0 0         if($pollret > 0){
    0          
    0          
145 0           foreach my $handle ($poll->handles()) {
146 0           my $revents = $poll->events($handle);
147 0           my $obj = $self->{'fh_map'}{$handle};
148 0 0         if($revents & POLLIN) {
149             #say "read Ready " .$$;
150 0 0         if(! defined($obj->onReadReady)) {
151 0           $self->remove($handle);
152 0           say "poll has " . scalar ( $self->{'poll'}->handles) . " handles";
153 0           next;
154             }
155             }
156              
157 0 0         if($revents & POLLOUT) {
158             #say "writeReady";
159 0 0         if(! defined($obj->onWriteReady)) {
160 0           $self->remove($handle);
161 0           say "poll has " . scalar ( $self->{'poll'}->handles) . " handles";
162 0           next;
163             }
164             }
165              
166 0 0         if($revents & (POLLHUP | POLLRDHUP )) {
167 0           say "Hangup $handle, before ". scalar ( $self->{'poll'}->handles);
168 0           $obj->onHangUp();
169 0           $self->remove($handle);
170 0           say "poll has " . scalar ( $self->{'poll'}->handles) . " handles";
171             }
172             }
173              
174             }
175             elsif($pollret == 0) {
176             #say "pollret == 0";
177             }
178             elsif(! $!{EINTR}){
179 0           say "Poll ERROR $!";
180             #return undef;
181             }
182              
183 0           $self->run_dead_children_callbacks;
184             }
185              
186             sub run {
187 0     0 0   my ($self, $loop_interval) = @_;
188 0   0       my $default_lp_interval = $loop_interval // -1;
189 0           my $poll = $self->{'poll'};
190 0           for(;;)
191             {
192 0           check_timers($self);
193 0           print "do_poll $$";
194 0 0         if($self->{'timers'}) {
195 0           say " timers " . scalar(@{$self->{'timers'}}) . ' handles ' . scalar($self->{'poll'}->handles());
  0            
196             }
197             else {
198 0           print "\n";
199             }
200             # we don't need to expire until a timer is expiring
201 0 0         if(@{$self->{'timers'}}) {
  0            
202 0           $loop_interval = $self->{'timers'}[0]{'desired'} - clock_gettime(CLOCK_MONOTONIC);
203             }
204             else {
205 0           $loop_interval = $default_lp_interval;
206             }
207 0           do_poll($self, $loop_interval, $poll);
208             }
209             }
210              
211             1;