File Coverage

blib/lib/Object/Remote/MiniLoop.pm
Criterion Covered Total %
statement 114 169 67.4
branch 19 36 52.7
condition 4 6 66.6
subroutine 13 45 28.8
pod 0 10 0.0
total 150 266 56.3


line stmt bran cond sub pod time code
1             package Object::Remote::MiniLoop;
2              
3 13     13   7000 use IO::Select;
  13         22588  
  13         813  
4 13     13   91 use Time::HiRes qw(time);
  13         26  
  13         130  
5 13     13   6620 use Object::Remote::Logging qw( :log :dlog router );
  13         57  
  13         138  
6 13     13   98 use Moo;
  13         26  
  13         66  
7              
8             BEGIN {
9 13     13   8689 $SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } };
  0         0  
  0         0  
10              
11 13         69 router()->exclude_forwarding
12             }
13              
14             # this is ro because we only actually set it using local in sub run
15             has is_running => (is => 'ro', clearer => 'stop');
16             #maximum duration that select() will block - undef means indefinite,
17             #0 means no blocking, otherwise maximum time in seconds
18             has block_duration => ( is => 'rw' );
19              
20             has _read_watches => (is => 'ro', default => sub { {} });
21             has _read_select => (is => 'ro', default => sub { IO::Select->new });
22              
23             has _write_watches => (is => 'ro', default => sub { {} });
24             has _write_select => (is => 'ro', default => sub { IO::Select->new });
25              
26             has _timers => (is => 'ro', default => sub { [] });
27              
28             sub pass_watches_to {
29 0     0 0 0 my ($self, $new_loop) = @_;
30 0     0   0 log_debug { "passing watches to new run loop" };
  0         0  
31 0         0 foreach my $fh ($self->_read_select->handles) {
32             $new_loop->watch_io(
33             handle => $fh,
34 0         0 on_read_ready => $self->_read_watches->{$fh}
35             );
36             }
37 0         0 foreach my $fh ($self->_write_select->handles) {
38             $new_loop->watch_io(
39             handle => $fh,
40 0         0 on_write_ready => $self->_write_watches->{$fh}
41             );
42             }
43             }
44              
45             sub watch_io {
46 40     40 0 651 my ($self, %watch) = @_;
47 40         180 my $fh = $watch{handle};
48 40     0   925 Dlog_debug { "Adding IO watch for $_" } $fh;
  0         0  
49              
50 40 100       1178 if (my $cb = $watch{on_read_ready}) {
51 20     0   306 log_trace { "IO watcher is registering with select for reading" };
  0         0  
52 20         437 $self->_read_select->add($fh);
53 20         1544 $self->_read_watches->{$fh} = $cb;
54             }
55 40 100       453 if (my $cb = $watch{on_write_ready}) {
56 20     0   387 log_trace { "IO watcher is registering with select for writing" };
  0         0  
57 20         860 $self->_write_select->add($fh);
58 20         2359 $self->_write_watches->{$fh} = $cb;
59             }
60 40         1806 return;
61             }
62              
63             sub unwatch_io {
64 21     21 0 177 my ($self, %watch) = @_;
65 21         67 my $fh = $watch{handle};
66 21     0   351 Dlog_debug { "Removing IO watch for $_" } $fh;
  0         0  
67 21 100       415 if ($watch{on_read_ready}) {
68 2     0   16 log_trace { "IO watcher is removing read from select()" };
  0         0  
69 2         50 $self->_read_select->remove($fh);
70 2         144 delete $self->_read_watches->{$fh};
71             }
72 21 100       100 if ($watch{on_write_ready}) {
73 19     0   118 log_trace { "IO watcher is removing write from select()" };
  0         0  
74 19         366 $self->_write_select->remove($fh);
75 19         1340 delete $self->_write_watches->{$fh};
76             }
77 21         8003 return;
78             }
79              
80             sub _sort_timers {
81 26     26   135 my ($self, @new) = @_;
82 26         114 my $timers = $self->_timers;
83              
84 26     0   243 log_trace { "Sorting timers" };
  0         0  
85              
86 26         363 @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
  26         160  
  37         208  
  26         277  
87 26         65 return;
88             }
89              
90             sub watch_time {
91 23     23 0 301 my ($self, %watch) = @_;
92 23         57 my $at;
93              
94 23     0   305 Dlog_trace { "watch_time() invoked with $_" } \%watch;
  0         0  
95              
96 23 100       509 if (exists($watch{every})) {
    50          
    0          
97 1         7 $at = time() + $watch{every};
98             } elsif (exists($watch{after})) {
99 22         160 $at = time() + $watch{after};
100             } elsif (exists($watch{at})) {
101 0         0 $at = $watch{at};
102             } else {
103 0         0 die "watch_time requires every, after or at";
104             }
105              
106 23 50       103 die "watch_time requires code" unless my $code = $watch{code};
107 23         208 my $timers = $self->_timers;
108 23         79 my $new = [ $at => $code, $watch{every} ];
109 23         114 $self->_sort_timers($new);
110 23     0   346 log_debug { "Created new timer with id '$new' that expires at '$at'" };
  0         0  
111 23         554 return "$new";
112             }
113              
114             sub unwatch_time {
115 1     1 0 5 my ($self, $id) = @_;
116 1     0   11 log_trace { "Removing timer with id of '$id'" };
  0         0  
117 1         37 @$_ = grep !($_ eq $id), @$_ for $self->_timers;
118 1         5 return;
119             }
120              
121             sub _next_timer_expires_delay {
122 1170     1170   2184 my ($self) = @_;
123 1170         2578 my $timers = $self->_timers;
124 1170         8768 my $delay_max = $self->block_duration;
125              
126 1170 50       7605 return $delay_max unless @$timers;
127 1170         4540 my $duration = $timers->[0]->[0] - time;
128              
129 1170     0   7094 log_trace { "next timer fires in '$duration' seconds" };
  0         0  
130              
131 1170 50 33     18083 if ($duration < 0) {
    50          
132 0         0 $duration = 0;
133             } elsif (defined $delay_max && $duration > $delay_max) {
134 0         0 $duration = $delay_max;
135             }
136              
137 1170         3027 return $duration;
138             }
139              
140             sub loop_once {
141 1170     1170 0 2369 my ($self) = @_;
142 1170         2843 my $read = $self->_read_watches;
143 1170         2740 my $write = $self->_write_watches;
144 1170         2033 my $read_count = 0;
145 1170         2008 my $write_count = 0;
146 1170         3704 my @c = caller;
147 1170         3046 my $wait_time = $self->_next_timer_expires_delay;
148             log_trace {
149 0 0   0   0 sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
150             scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' )
151 1170         20366 };
152 1170         28282 my ($readable, $writeable) = IO::Select->select(
153             $self->_read_select, $self->_write_select, undef, $wait_time
154             );
155             log_trace {
156 0 0   0   0 my $readable_count = defined $readable ? scalar(@$readable) : 0;
157 0 0       0 my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
158 0         0 "Run loop: select returned readable:$readable_count writeable:$writable_count";
159 1170         17932326 };
160             # I would love to trap errors in the select call but IO::Select doesn't
161             # differentiate between an error and a timeout.
162             # -- no, love, mst.
163              
164 1170     0   21422 log_trace { "Reading from ready filehandles" };
  0         0  
165 1170         15537 foreach my $fh (@$readable) {
166 193 50       1669 next unless $read->{$fh};
167 193         525 $read_count++;
168 193         1397 $read->{$fh}();
169             #FIXME this is a rough workaround for race conditions that can cause deadlocks
170             #under load
171 193         632 last;
172             }
173 1170     0   6099 log_trace { "Writing to ready filehandles" };
  0         0  
174 1170         15274 foreach my $fh (@$writeable) {
175 971 50       3823 next unless $write->{$fh};
176 971         1463 $write_count++;
177 971         4027 $write->{$fh}();
178             #FIXME this is a rough workaround for race conditions that can cause deadlocks
179             #under load
180 971         14299 last;
181             }
182              
183             #moving the timers above the read() section exposes a deadlock
184 1170     0   7767 log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
  0         0  
185 1170         17722 my $timers = $self->_timers;
186 1170         4108 my $now = time();
187 1170     0   5963 log_trace { "Checking timers" };
  0         0  
188 1170   100     18993 while (@$timers and $timers->[0][0] <= $now) {
189 6         19 my $active = $timers->[0];
190 6     0   50 Dlog_trace { "Found timer that needs to be executed: '$active'" };
  0         0  
191              
192 6 100       202 if (defined($active->[2])) {
193             #handle the case of an 'every' timer
194 3         17 $active->[0] = time() + $active->[2];
195 3     0   39 Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
  0         0  
196 3         57 $self->_sort_timers;
197             } else {
198             #it doesn't repeat again so get rid of it
199 3         19 shift(@$timers);
200             }
201              
202             #execute the timer
203 6         47 $active->[1]->();
204             }
205              
206 1170     0   5833 log_trace { "Run loop: single loop is completed" };
  0         0  
207 1170         19578 return;
208             }
209              
210             sub want_run {
211 0     0 0 0 my ($self) = @_;
212 0     0   0 Dlog_debug { "Run loop: Incremeting want_running, is now $_" }
213 0         0 ++$self->{want_running};
214             }
215              
216             sub run_while_wanted {
217 0     0 0 0 my ($self) = @_;
218 0     0   0 log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
  0         0  
  0         0  
219 0         0 $self->loop_once while $self->{want_running};
220 0     0   0 log_debug { "Run loop: run_while_wanted() completed" };
  0         0  
221 0         0 return;
222             }
223              
224             sub want_stop {
225 0     0 0 0 my ($self) = @_;
226 0 0       0 if (! $self->{want_running}) {
227 0     0   0 log_debug { "Run loop: want_stop() was called but want_running was not true" };
  0         0  
228 0         0 return;
229             }
230 0     0   0 Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
231 0         0 --$self->{want_running};
232             }
233              
234             sub run {
235 175     175 0 502 my ($self) = @_;
236 175     0   1462 log_trace { "Run loop: run() invoked" };
  0         0  
237 175         2823 local $self->{is_running} = 1;
238 175         781 while ($self->is_running) {
239 1170         3254 $self->loop_once;
240             }
241 175     0   1156 log_trace { "Run loop: run() completed" };
  0         0  
242 175         2865 return;
243             }
244              
245             1;