File Coverage

blib/lib/Linux/Event/Loop.pm
Criterion Covered Total %
statement 285 384 74.2
branch 67 142 47.1
condition 29 81 35.8
subroutine 42 51 82.3
pod 18 18 100.0
total 441 676 65.2


line stmt bran cond sub pod time code
1             package Linux::Event::Loop;
2 13     13   762311 use v5.36;
  13         42  
3 13     13   52 use strict;
  13         58  
  13         284  
4 13     13   48 use warnings;
  13         35  
  13         845  
5              
6             our $VERSION = '0.012';
7              
8 13     13   61 use Scalar::Util qw(looks_like_number);
  13         18  
  13         706  
9 13     13   60 use Carp qw(croak);
  13         35  
  13         496  
10 13     13   4834 use Linux::Event::Scheduler;
  13         27  
  13         392  
11 13     13   4881 use Linux::Event::Clock;
  13         11697  
  13         363  
12 13     13   4735 use Linux::Event::Timer;
  13         101382  
  13         514  
13 13     13   5640 use Linux::Event::Backend::Epoll;
  13         27  
  13         3525  
14 13     13   4965 use Linux::Event::Watcher;
  13         32  
  13         432  
15 13     13   4841 use Linux::Event::Signal;
  13         55  
  13         467  
16 13     13   5150 use Linux::Event::Wakeup;
  13         30  
  13         399  
17 13     13   5213 use Linux::Event::Pid;
  13         37  
  13         340  
18 13     13   56 use Linux::Event::XS ();
  13         16  
  13         219  
19              
20 13     13   40 use constant READABLE => 0x01;
  13         18  
  13         650  
21 13     13   50 use constant WRITABLE => 0x02;
  13         16  
  13         420  
22 13     13   45 use constant PRIO => 0x04;
  13         35  
  13         422  
23 13     13   48 use constant RDHUP => 0x08;
  13         16  
  13         437  
24 13     13   43 use constant ET => 0x10;
  13         18  
  13         366  
25 13     13   39 use constant ONESHOT => 0x20;
  13         18  
  13         387  
26 13     13   85 use constant ERR => 0x40;
  13         19  
  13         426  
27 13     13   49 use constant HUP => 0x80;
  13         24  
  13         31163  
28              
29 21     21 1 1292219 sub new ($class, %args) {
  21         41  
  21         50  
  21         27  
30 21         49 my $backend = delete $args{backend};
31 21         42 my $clock = delete $args{clock};
32 21         34 my $timer = delete $args{timer};
33 21 100       212 croak "model is no longer supported; Linux::Event is backend-only, not model-selected" if exists $args{model};
34              
35 20 50       46 croak "unknown args: " . join(", ", sort keys %args) if %args;
36              
37 20   66     144 $clock //= Linux::Event::Clock->new(clock => 'monotonic');
38 20         834 for my $m (qw(tick now_ns deadline_in_ns remaining_ns)) {
39 80 50       250 croak "clock missing method '$m'" if !$clock->can($m);
40             }
41              
42 20   66     167 $timer //= Linux::Event::Timer->new;
43 20         1626 for my $m (qw(after disarm read_ticks fh)) {
44 80 50       230 croak "timer missing method '$m'" if !$timer->can($m);
45             }
46              
47 20         58 $backend = _build_backend($backend);
48 20         40 for my $m (qw(watch unwatch run_once)) {
49 60 50       185 croak "backend missing method '$m'" if !$backend->can($m);
50             }
51             # modify is optional in this release; Loop can fall back to unwatch+watch.
52              
53 20         115 my $sched = Linux::Event::Scheduler->new(clock => $clock);
54              
55 20         165 my $self = bless {
56             clock => $clock,
57             timer => $timer,
58             backend => $backend,
59             sched => $sched,
60             running => 0,
61              
62             _watchers => Linux::Event::XS::registry_new(), # fd -> Linux::Event::Watcher
63             _timer_w => undef, # internal timerfd watcher
64             _timer_armed => 0, # whether the Linux timerfd currently has an armed deadline
65             }, $class;
66              
67             # Internal timerfd watcher: read -> dispatch due timers -> rearm kernel timer.
68 20         66 my $t_fh = $timer->fh;
69 20         137 my $t_fd = fileno($t_fh);
70 20 50       53 croak "timer fh has no fileno" if !defined $t_fd;
71              
72 12         27 $self->{_timer_w} = $self->watch(
73             $t_fh,
74 12     12   131 read => sub ($loop, $fh, $w) {
  12         21  
  12         18  
  12         31  
75 12         153 $self->{timer}->read_ticks;
76 12         439 $self->{clock}->tick;
77 12         322 $self->_dispatch_due;
78 12         47 $self->_rearm_timer;
79             },
80 20         151 data => undef,
81             );
82              
83 20         106 return $self;
84             }
85              
86 20     20   30 sub _build_backend ($backend) {
  20         33  
  20         23  
87 20 100       61 return Linux::Event::Backend::Epoll->new if !defined $backend;
88              
89 17 100       42 if (!ref($backend)) {
90 16 50       123 return Linux::Event::Backend::Epoll->new if $backend eq 'epoll';
91 0         0 croak "unknown backend '$backend'";
92             }
93              
94 1         2 return $backend;
95             }
96              
97 0     0 1 0 sub clock ($self) { return $self->{clock} }
  0         0  
  0         0  
  0         0  
98 0     0 1 0 sub timer ($self) { return $self->{timer} }
  0         0  
  0         0  
  0         0  
99 5     5 1 42 sub backend ($self) { return $self->{backend} }
  5         11  
  5         29  
  5         14  
100 2 50   2 1 30 sub backend_name ($self) { return $self->{backend}->can('name') ? $self->{backend}->name : ref($self->{backend}) }
  2         2  
  2         2  
  2         11  
101 0     0 1 0 sub sched ($self) { return $self->{sched} }
  0         0  
  0         0  
  0         0  
102 0 0   0 1 0 sub is_running ($self) { return $self->{running} ? 1 : 0 }
  0         0  
  0         0  
  0         0  
103 56     56   82 sub _public_loop ($self) { return $self }
  56         65  
  56         65  
  56         691  
104              
105             # -- Signals --------------------------------------------------------------
106              
107 3     3 1 2763 sub signal ($self, @args) {
  3         4  
  3         7  
  3         3  
108 3   66     12 return ($self->{_signal} ||= Linux::Event::Signal->new(loop => $self->_public_loop))->signal(@args);
109             }
110              
111             # -- Wakeups --------------------------------------------------------------
112              
113 1     1 1 25 sub waker ($self) {
  1         1  
  1         2  
114 1 50       3 if (!$self->{_waker}) {
115 1         2 my $w = Linux::Event::Wakeup->new(loop => $self->_public_loop);
116 1         3 $self->{_waker} = $w;
117              
118             # Internal watcher: drain wakeups.
119 0         0 $self->watch(
120             $w->fh,
121 0     0   0 read => sub ($loop, $fh, $watcher) {
  0         0  
  0         0  
  0         0  
122 0         0 $w->drain;
123             },
124 1         3 data => undef,
125             );
126             }
127              
128 1         4 return $self->{_waker};
129             }
130              
131             # -- Pidfds ---------------------------------------------------------------
132              
133 0     0 1 0 sub pid ($self, @args) {
  0         0  
  0         0  
  0         0  
134 0   0     0 $self->{pid_adaptor} //= Linux::Event::Pid->new(loop => $self->_public_loop);
135 0         0 return $self->{pid_adaptor}->pid(@args);
136             }
137              
138             # -- Timers ---------------------------------------------------------------
139              
140 16     16 1 585 sub after ($self, $seconds, $cb) {
  16         56  
  16         20  
  16         17  
  16         17  
141 16 50       33 croak "seconds is required" if !defined $seconds;
142 16 50       30 croak "cb is required" if !$cb;
143 16 50       34 croak "cb must be a coderef" if ref($cb) ne 'CODE';
144              
145 16         49 $self->{clock}->tick;
146              
147 16         157 my $delta_ns = int($seconds * 1_000_000_000);
148 16 50       37 $delta_ns = 0 if $delta_ns < 0;
149              
150 16         47 my $id = $self->{sched}->after_ns($delta_ns, $cb);
151 16         98 $self->_rearm_timer;
152 16         25 return $id;
153             }
154              
155 0     0 1 0 sub at ($self, $deadline_seconds, $cb) {
  0         0  
  0         0  
  0         0  
  0         0  
156 0 0       0 croak "deadline_seconds is required" if !defined $deadline_seconds;
157 0 0       0 croak "cb is required" if !$cb;
158 0 0       0 croak "cb must be a coderef" if ref($cb) ne 'CODE';
159              
160 0         0 my $deadline_ns = int($deadline_seconds * 1_000_000_000);
161              
162 0         0 my $id = $self->{sched}->at_ns($deadline_ns, $cb);
163 0         0 $self->_rearm_timer;
164 0         0 return $id;
165             }
166              
167 1     1 1 5 sub cancel ($self, $id) {
  1         2  
  1         2  
  1         1  
168 1         3 my $ok = $self->{sched}->cancel($id);
169 1 50       4 $self->_rearm_timer if $ok;
170 1         5 return $ok;
171             }
172              
173             # -- Watchers -------------------------------------------------------------
174              
175 37     37 1 2129 sub watch ($self, $fh, %spec) {
  37         40  
  37         71  
  37         73  
  37         280  
176 37 50       73 croak "fh is required" if !$fh;
177              
178 37         58 my $read = delete $spec{read};
179 37         92 my $write = delete $spec{write};
180 37         47 my $error = delete $spec{error};
181 37         73 my $data = delete $spec{data};
182 37         45 my $edge_triggered = delete $spec{edge_triggered};
183 37         49 my $oneshot = delete $spec{oneshot};
184              
185 37 50       64 croak "unknown args: " . join(", ", sort keys %spec) if %spec;
186              
187 37 50 33     211 if (defined $read && ref($read) ne 'CODE') {
188 0         0 croak "read must be a coderef or undef";
189             }
190 37 50 66     78 if (defined $write && ref($write) ne 'CODE') {
191 0         0 croak "write must be a coderef or undef";
192             }
193 37 50 66     93 if (defined $error && ref($error) ne 'CODE') {
194 0         0 croak "error must be a coderef or undef";
195             }
196              
197 37         51 my $fd = fileno($fh);
198 37 50       62 croak "fh has no fileno" if !defined $fd;
199 37         45 $fd = int($fd);
200              
201 37 100       146 if (my $old = Linux::Event::XS::registry_get($self->{_watchers}, $fd)) {
202 2         8 $self->_watcher_cancel($old);
203             }
204              
205 37 100       106 my $w = Linux::Event::XS::watcher_new(
    100          
206             'Linux::Event::Watcher',
207             $self->_public_loop,
208             $fh,
209             $fd,
210             $read,
211             $write,
212             $error,
213             $data,
214             $edge_triggered ? 1 : 0,
215             $oneshot ? 1 : 0,
216             );
217              
218 37         102 my $mask = $self->_watcher_mask($w);
219              
220             # Use the single-call XS install path only for the built-in epoll
221             # backend. Custom/mock backends may not have the internal epoll/watch fields
222             # expected by this private fast path. Socket watchers keep the direct backend
223             # record path below, which is faster for the TCP dispatch workload.
224 37         40 my $can_xs_backend_fast = do {
225 37         54 my $b = $self->{backend};
226             ref($b) eq 'Linux::Event::Backend::Epoll'
227             && exists $b->{ep}
228 37 50 66     204 && exists $b->{watch};
229             };
230              
231 37 100 100     769 if ($can_xs_backend_fast && !-S $fh && Linux::Event::XS::loop_watch_watcher_fast($self, $fh, $fd, $w, $mask)) {
      66        
232 35         123 return $w;
233             }
234              
235 2 100       15 if ($self->{backend}->can('watch_watcher')) {
236             # Loop-created watchers use an XS direct-dispatch backend record.
237             # This avoids allocating one Perl dispatch closure per watcher while keeping
238             # the public Backend->watch($fh, $mask, $cb) callback API available.
239 1         4 Linux::Event::XS::registry_set($self->{_watchers}, $fd, $w);
240 1         3 $self->{backend}->watch_watcher($fh, $mask, $w, _loop => $self->_public_loop, tag => undef);
241 1         2 return $w;
242             }
243              
244 1         2 my $public_loop = $self->_public_loop;
245 1         1 my $owner = $self;
246              
247 0     0   0 my $dispatch = sub ($ignored_loop, $fh_from_backend, $fd2, $mask, $tag) {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
248 0 0       0 my $ww = Linux::Event::XS::registry_get($owner->{_watchers}, $fd2) or return;
249              
250 0         0 my $fhw = $ww->{fh};
251 0 0       0 if (!$fhw) {
252 0         0 $owner->_watcher_cancel($ww);
253 0         0 return;
254             }
255 0         0 my $fnow = fileno($fhw);
256 0 0 0     0 if (!defined $fnow || int($fnow) != $fd2) {
257 0         0 $owner->_watcher_cancel($ww);
258 0         0 return;
259             }
260              
261             # Frozen dispatch contract:
262             # - ERR: call error handler first if installed+enabled; otherwise treat as read+write.
263             # - HUP: also triggers read (EOF detection).
264 0 0       0 my $read_trig = ($mask & READABLE) ? 1 : 0;
265 0 0       0 my $write_trig = ($mask & WRITABLE) ? 1 : 0;
266              
267 0 0       0 if ($mask & ERR) {
268 0 0 0     0 if ($ww->{error_cb} && $ww->{error_enabled}) {
269 0         0 $ww->{error_cb}->($public_loop, $fhw, $ww);
270 0         0 return;
271             }
272 0         0 $read_trig = 1;
273 0         0 $write_trig = 1;
274             }
275              
276 0 0       0 $read_trig = 1 if ($mask & HUP);
277              
278 0 0 0     0 if ($read_trig && $ww->{read_cb} && $ww->{read_enabled}) {
      0        
279 0         0 $ww->{read_cb}->($public_loop, $fhw, $ww);
280             }
281              
282 0         0 my $still = Linux::Event::XS::registry_get($owner->{_watchers}, $fd2);
283 0 0 0     0 if (!$still || $still != $ww) {
284 0         0 return;
285             }
286              
287 0 0 0     0 if ($write_trig && $ww->{write_cb} && $ww->{write_enabled}) {
      0        
288 0         0 $ww->{write_cb}->($public_loop, $fhw, $ww);
289             }
290              
291 0         0 return;
292 1         10 };
293              
294 1         2 $w->{_dispatch_cb} = $dispatch;
295              
296 1         3 Linux::Event::XS::registry_set($self->{_watchers}, $fd, $w);
297              
298 1         3 $self->{backend}->watch($fh, $mask, $dispatch, _loop => $self->_public_loop, tag => undef);
299              
300 1         7 return $w;
301             }
302              
303 37     37   37 sub _watcher_mask ($self, $w) {
  37         40  
  37         40  
  37         34  
304 37         37 my $mask = 0;
305              
306 37 50 33     166 $mask |= READABLE if $w->{read_cb} && $w->{read_enabled};
307 37 50 66     77 $mask |= WRITABLE if $w->{write_cb} && $w->{write_enabled};
308              
309 37 100       106 $mask |= ET if $w->{edge_triggered};
310 37 100       77 $mask |= ONESHOT if $w->{oneshot};
311              
312 37         55 return $mask;
313             }
314              
315 0     0   0 sub _watcher_update ($self, $w) {
  0         0  
  0         0  
  0         0  
316 0 0       0 return 0 if !$w->{active};
317              
318 0         0 my $mask = $self->_watcher_mask($w);
319              
320 0 0       0 if ($self->{backend}->can('modify')) {
321 0         0 return $self->{backend}->modify($w->{fh}, $mask, _loop => $self->_public_loop, tag => undef);
322             }
323              
324             # Fallback: unwatch+watch, preserving dispatch cb.
325 0         0 $self->{backend}->unwatch($w->{fh});
326 0         0 $self->{backend}->watch($w->{fh}, $mask, $w->{_dispatch_cb}, _loop => $self->_public_loop, tag => undef);
327 0         0 return 1;
328             }
329              
330 3     3   3 sub _watcher_cancel ($self, $w) {
  3         5  
  3         4  
  3         4  
331 3 50 33     16 return 0 if !$w || !$w->{active};
332              
333 3         4 my $b = $self->{backend};
334 3 50 33     28 if (ref($b) eq 'Linux::Event::Backend::Epoll'
      33        
335             && exists $b->{ep}
336             && exists $b->{watch}) {
337 3 50       41 return Linux::Event::XS::loop_cancel_watcher($self, $w) ? 1 : 0;
338             }
339              
340             # Compatibility path for custom/mock backends used by tests and advanced
341             # callers. The XS cancellation helper intentionally assumes the built-in
342             # epoll backend layout.
343 0         0 my $fd = $w->{fd};
344 0 0       0 Linux::Event::XS::registry_delete($self->{_watchers}, $fd) if defined $fd;
345 0 0 0     0 $self->{backend}->unwatch($w->{fh}) if $self->{backend}->can('unwatch') && $w->{fh};
346 0         0 $w->{active} = 0;
347 0         0 $w->{fh} = undef;
348 0         0 return 1;
349             }
350              
351 3     3 1 39 sub unwatch ($self, $fh) {
  3         4  
  3         5  
  3         3  
352 3 50       7 return 0 if !$fh;
353              
354 3         6 my $fd = fileno($fh);
355 3 50       6 return 0 if !defined $fd;
356 3         3 $fd = int($fd);
357              
358 3 100       18 my $w = Linux::Event::XS::registry_get($self->{_watchers}, $fd) or return 0;
359 1         5 $self->_watcher_cancel($w);
360              
361 1         4 return 1;
362             }
363              
364 10     10 1 55 sub run ($self) {
  10         17  
  10         15  
365             # run() controls the running flag. run_once() may be called manually even
366             # when the loop is not in run() mode (tests and advanced callers rely on
367             # this), so run_once() must only honor running=0 when run() is active.
368 10         19 local $self->{_in_run} = 1;
369 10         21 $self->{running} = 1;
370              
371 10         52 while ($self->{running}) {
372 19         68 $self->run_once;
373             }
374              
375 10         51 return;
376             }
377              
378 10     10 1 142 sub stop ($self) {
  10         12  
  10         43  
379 10         27 $self->{running} = 0;
380              
381             # If a waker was already created (user called $loop->waker), poke it so a
382             # currently-blocking backend wait can return promptly. This does not create
383             # the waker implicitly (contract: no implicit watcher).
384 10 50       75 if (my $w = $self->{_waker}) {
385 0         0 eval { $w->signal; 1 };
  0         0  
  0         0  
386             }
387              
388 10         71 return;
389             }
390              
391 32     32 1 1023 sub run_once ($self, $timeout_s = undef) {
  32         55  
  32         60  
  32         47  
392             # One syscall per iteration/batch: refresh cached monotonic time.
393 32         184 $self->{clock}->tick;
394              
395             # Snapshot whether we were "running" at entry. This matters because callers
396             # are allowed to pump the loop manually via run_once() without calling run()
397             # (so running may be false). If running *was* true and stop() flips it during
398             # callback dispatch, we must not enter backend wait in the same iteration.
399 32 100       690 my $was_running = $self->{running} ? 1 : 0;
400              
401             # Run any due timer callbacks before blocking.
402 32         119 $self->_dispatch_due;
403              
404             # stop() can be called from a timer callback (or other user callback).
405             # - If we're inside run(), honor running immediately.
406             # - If running was true at entry, also honor it (prevents an extra backend wait)
407             # - If running was false at entry, allow manual pumping.
408 32 100 66     215 return 0 if (!$self->{running} && ($self->{_in_run} || $was_running));
      66        
409              
410 30         101 my $next = $self->{sched}->next_deadline_ns;
411              
412             # Fast path: when the XS timer heap is empty, skip timerfd
413             # rearm/disarm churn and wait directly in the backend. Timer-heavy behavior
414             # stays on the scheduler-aware path below.
415 30 100       80 if (!defined $next) {
416 16 50       56 if ($self->{_timer_armed}) {
417 0         0 $self->{timer}->disarm;
418 0         0 $self->{_timer_armed} = 0;
419             }
420 16         104 return $self->{backend}->run_once($self, $timeout_s);
421             }
422              
423             # If no explicit timeout was provided, derive one from the next scheduled
424             # timer deadline. This is what makes $loop->run() advance timers without
425             # requiring callers to manually pass a timeout.
426 14 50       38 if (!defined $timeout_s) {
427 14         32 my $remain_ns = $self->{clock}->remaining_ns($next);
428 14 50       105 $timeout_s = ($remain_ns <= 0) ? 0 : ($remain_ns / 1_000_000_000);
429             }
430              
431             # Keep timerfd state in sync for users who may be watching the timer fd
432             # directly (or for future backends that integrate it). This is not relied on
433             # for core scheduling.
434 14         49 $self->_rearm_timer;
435              
436             # Re-check after rearm, since user callbacks can run during _rearm_timer()
437             # (e.g. via a custom timer implementation).
438 14 0 0     37 return 0 if (!$self->{running} && ($self->{_in_run} || $was_running));
      33        
439              
440 14         58 return $self->{backend}->run_once($self, $timeout_s);
441             }
442              
443              
444 44     44   59 sub _dispatch_due ($self) {
  44         60  
  44         50  
445 44         281 my @ready = $self->{sched}->pop_expired;
446 44         145 for my $item (@ready) {
447 14         37 my ($id, $cb, $deadline_ns) = @$item;
448              
449             # Timer callbacks are invoked with just ($loop).
450 14         70 $cb->($self->_public_loop);
451             }
452 44         367 return;
453             }
454              
455              
456 13     13   94 use Scalar::Util qw(looks_like_number);
  13         18  
  13         2887  
457              
458 43     43   52 sub _rearm_timer ($self) {
  43         59  
  43         42  
459 43         94 my $next = $self->{sched}->next_deadline_ns;
460              
461 43 100       90 if (!defined $next) {
462 6 50       28 if ($self->{_timer_armed}) {
463 6         31 $self->{timer}->disarm;
464 6         84 $self->{_timer_armed} = 0;
465             }
466 6         54 return;
467             }
468              
469 37         103 my $remain_ns = $self->{clock}->remaining_ns($next);
470              
471 37 50       312 return if !defined $remain_ns;
472 37 50       118 return if !looks_like_number($remain_ns);
473              
474 37 100       60 if ($remain_ns <= 0) {
475             # Minimal non-zero delay (fixed decimal, no exponent).
476 2         8 my $min_s = sprintf('%.9f', 1 / 1_000_000_000);
477 2         7 $self->{timer}->after($min_s);
478 2         61 $self->{_timer_armed} = 1;
479 2         4 return;
480             }
481              
482 35         54 my $after_s = $remain_ns / 1_000_000_000;
483              
484 35 50       61 return if !looks_like_number($after_s);
485              
486             # IMPORTANT: format to fixed decimal so Timer::_num accepts it.
487 35         280 $self->{timer}->after(sprintf('%.9f', $after_s));
488 35         1014 $self->{_timer_armed} = 1;
489              
490 35         86 return;
491             }
492              
493             1;
494              
495             __END__