File Coverage

blib/lib/Object/Remote/MiniLoop.pm
Criterion Covered Total %
statement 131 166 78.9
branch 19 36 52.7
condition 6 9 66.6
subroutine 17 42 40.4
pod 0 9 0.0
total 173 262 66.0


line stmt bran cond sub pod time code
1             package Object::Remote::MiniLoop;
2              
3 13     13   5222 use IO::Select;
  13         18037  
  13         705  
4 13     13   76 use Time::HiRes qw(time);
  13         24  
  13         122  
5 13     13   5336 use Object::Remote::Logging qw( :log :dlog router );
  13         40  
  13         83  
6 13     13   95 use Moo;
  13         17  
  13         59  
7              
8             BEGIN {
9 13     13   7189 router()->exclude_forwarding
10             }
11              
12             # this is ro because we only actually set it using local in sub run
13             has is_running => (is => 'ro', clearer => 'stop');
14             #maximum duration that select() will block - undef means indefinite,
15             #0 means no blocking, otherwise maximum time in seconds
16             has block_duration => ( is => 'rw' );
17              
18             has _read_watches => (is => 'ro', default => sub { {} });
19             has _read_select => (is => 'ro', default => sub { IO::Select->new });
20              
21             has _write_watches => (is => 'ro', default => sub { {} });
22             has _write_select => (is => 'ro', default => sub { IO::Select->new });
23              
24             has _timers => (is => 'ro', default => sub { [] });
25              
26             sub watch_io {
27 40     40 0 279 my ($self, %watch) = @_;
28 40         83 my $fh = $watch{handle};
29 40     0   491 Dlog_debug { "Adding IO watch for $_" } $fh;
  0         0  
30              
31 40 100       626 if (my $cb = $watch{on_read_ready}) {
32 20     0   147 log_trace { "IO watcher is registering with select for reading" };
  0         0  
33 20         322 $self->_read_select->add($fh);
34 20         968 $self->_read_watches->{$fh} = $cb;
35             }
36 40 100       153 if (my $cb = $watch{on_write_ready}) {
37 20     0   208 log_trace { "IO watcher is registering with select for writing" };
  0         0  
38 20         388 $self->_write_select->add($fh);
39 20         1523 $self->_write_watches->{$fh} = $cb;
40             }
41 40         521 return;
42             }
43              
44             sub unwatch_io {
45 21     21 0 149 my ($self, %watch) = @_;
46 21         50 my $fh = $watch{handle};
47 21     0   267 Dlog_debug { "Removing IO watch for $_" } $fh;
  0         0  
48 21 100       354 if ($watch{on_read_ready}) {
49 2     0   28 log_trace { "IO watcher is removing read from select()" };
  0         0  
50 2         46 $self->_read_select->remove($fh);
51 2         159 delete $self->_read_watches->{$fh};
52             }
53 21 100       83 if ($watch{on_write_ready}) {
54 19     0   101 log_trace { "IO watcher is removing write from select()" };
  0         0  
55 19         312 $self->_write_select->remove($fh);
56 19         1187 delete $self->_write_watches->{$fh};
57             }
58 21         3155 return;
59             }
60              
61             sub _sort_timers {
62 26     26   81 my ($self, @new) = @_;
63 26         99 my $timers = $self->_timers;
64              
65 26     0   171 log_trace { "Sorting timers" };
  0         0  
66              
67 26         246 @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
  26         80  
  37         117  
  26         192  
68 26         54 return;
69             }
70              
71             sub watch_time {
72 26     26 0 120 my ($self, %watch) = @_;
73 26         47 my $at;
74              
75 26     0   231 Dlog_trace { "watch_time() invoked with $_" } \%watch;
  0         0  
76              
77 26 50       332 if (exists($watch{after})) {
    0          
78 26         156 $at = time() + $watch{after};
79             } elsif (exists($watch{at})) {
80 0         0 $at = $watch{at};
81             } else {
82 0         0 die "watch_time requires after or at";
83             }
84              
85 26 50       98 die "watch_time requires code" unless my $code = $watch{code};
86 26         133 my $timers = $self->_timers;
87 26         64 my $new = [ $at => $code ];
88 26         157 $self->_sort_timers($new);
89 26     0   273 log_debug { "Created new timer with id '$new' that expires at '$at'" };
  0         0  
90 26         459 return "$new";
91             }
92              
93             sub unwatch_time {
94 1     1 0 5 my ($self, $id) = @_;
95 1     0   15 log_trace { "Removing timer with id of '$id'" };
  0         0  
96 1         44 @$_ = grep !($_ eq $id), @$_ for $self->_timers;
97 1         7 return;
98             }
99              
100             sub _next_timer_expires_delay {
101 1170     1170   1673 my ($self) = @_;
102 1170         1933 my $timers = $self->_timers;
103 1170         1895 my $delay_max = $self->block_duration;
104              
105 1170 50       2165 return $delay_max unless @$timers;
106 1170         3508 my $duration = $timers->[0]->[0] - time;
107              
108 1170     0   5190 log_trace { "next timer fires in '$duration' seconds" };
  0         0  
109              
110 1170 50 33     12321 if ($duration < 0) {
    50          
111 0         0 $duration = 0;
112             } elsif (defined $delay_max && $duration > $delay_max) {
113 0         0 $duration = $delay_max;
114             }
115              
116 1170         2418 return $duration;
117             }
118              
119             sub loop_once {
120 1170     1170 0 1873 my ($self) = @_;
121 1170         2316 my $read = $self->_read_watches;
122 1170         2042 my $write = $self->_write_watches;
123 1170         1450 my $read_count = 0;
124 1170         1468 my $write_count = 0;
125 1170         2591 my @c = caller;
126 1170         2316 my $wait_time = $self->_next_timer_expires_delay;
127             log_trace {
128 0 0   0   0 sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
129             scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' )
130 1170         4749 };
131 1170         15570 my ($readable, $writeable) = IO::Select->select(
132             $self->_read_select, $self->_write_select, undef, $wait_time
133             );
134             log_trace {
135 0 0   0   0 my $readable_count = defined $readable ? scalar(@$readable) : 0;
136 0 0       0 my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
137 0         0 "Run loop: select returned readable:$readable_count writeable:$writable_count";
138 1170         13189963 };
139             # I would love to trap errors in the select call but IO::Select doesn't
140             # differentiate between an error and a timeout.
141             # -- no, love, mst.
142              
143 1170     0   14538 log_trace { "Reading from ready filehandles" };
  0         0  
144 1170         10444 foreach my $fh (@$readable) {
145 193 50       1638 next unless $read->{$fh};
146 193         491 $read_count++;
147 193         1232 $read->{$fh}();
148             #FIXME this is a rough workaround for race conditions that can cause deadlocks
149             #under load
150 193         601 last;
151             }
152 1170     0   4255 log_trace { "Writing to ready filehandles" };
  0         0  
153 1170         10552 foreach my $fh (@$writeable) {
154 971 50       2731 next unless $write->{$fh};
155 971         1156 $write_count++;
156 971         2961 $write->{$fh}();
157             #FIXME this is a rough workaround for race conditions that can cause deadlocks
158             #under load
159 971         9492 last;
160             }
161              
162             #moving the timers above the read() section exposes a deadlock
163 1170     0   5688 log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
  0         0  
164 1170         11556 my $timers = $self->_timers;
165 1170         3212 my $now = time();
166 1170     0   4048 log_trace { "Checking timers" };
  0         0  
167 1170   100     13419 while (@$timers and $timers->[0][0] <= $now) {
168 6         12 my $active = $timers->[0];
169 6     0   50 Dlog_trace { "Found timer that needs to be executed: '$active'" };
  0         0  
170 6         104 shift(@$timers);
171              
172             #execute the timer
173 6         35 $active->[1]->();
174             }
175              
176 1170     0   3990 log_trace { "Run loop: single loop is completed" };
  0         0  
177 1170         12585 return;
178             }
179              
180             sub run {
181 175     175 0 406 my ($self) = @_;
182 175     0   914 log_trace { "Run loop: run() invoked" };
  0         0  
183 175         2136 local $self->{is_running} = 1;
184 175         732 while ($self->is_running) {
185 1170         2970 $self->loop_once;
186             }
187 175     0   931 log_trace { "Run loop: run() completed" };
  0         0  
188 175         2115 return;
189             }
190              
191             sub new_future {
192 313     313 0 1720 return Future->new;
193             }
194              
195             our @await;
196              
197             sub await {
198 175     175 0 2042 my ($self, $f) = @_;
199 175     0   1446 log_trace { my $ir = $f->is_ready; "await_future() invoked; is_ready: $ir" };
  0         0  
  0         0  
200 175 50       2953 return $f if $f->is_ready;
201 175         2407 require Object::Remote;
202 175         725 my $loop = Object::Remote->current_loop;
203             {
204 175         328 local @await = (@await, $f);
  175         627  
205             $f->on_ready(sub {
206 175     175   4368 log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
  0         0  
  0         0  
207 175 50       2666 if ($f == $await[-1]) {
208 175         965 log_trace { "This future is not waiting on anything so calling stop on the run loop" };
  0         0  
209 175         6465 $loop->stop;
210             }
211 175         1434 });
212 175     0   5508 log_trace { "Starting run loop for newly created future" };
  0         0  
213 175         6061 $loop->run;
214             }
215 175 100 66     942 if (@await and $await[-1]->is_ready) {
216 57     0   657 log_trace { "Last future in await list was ready, stopping run loop" };
  0         0  
217 57         1516 $loop->stop;
218             }
219 175     0   992 log_trace { "await_future() returning" };
  0         0  
220 175         2051 return $f;
221             }
222              
223             sub await_all {
224 2     2 0 6 my $self = shift;
225 2         49 $self->await(Future->wait_all(@_));
226 2         21 return;
227             }
228              
229             1;