File Coverage

blib/lib/Mojo/Reactor/UV.pm
Criterion Covered Total %
statement 114 114 100.0
branch 43 50 86.0
condition 8 14 57.1
subroutine 26 26 100.0
pod 9 9 100.0
total 200 213 93.9


line stmt bran cond sub pod time code
1             package Mojo::Reactor::UV;
2 1     1   308088 use Mojo::Base 'Mojo::Reactor::Poll';
  1         12  
  1         10  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::UV';
5              
6 1     1   41546 use Carp 'croak';
  1         3  
  1         49  
7 1     1   6 use Mojo::Util qw(md5_sum steady_time);
  1         3  
  1         44  
8 1     1   6 use Scalar::Util 'weaken';
  1         2  
  1         37  
9 1     1   630 use UV;
  1         6148  
  1         62  
10 1     1   510 use UV::Poll;
  1         3562  
  1         47  
11 1     1   472 use UV::Timer;
  1         488  
  1         46  
12 1     1   6 use UV::Loop;
  1         3  
  1         46  
13              
14 1   50 1   6 use constant DEBUG => $ENV{MOJO_REACTOR_UV_DEBUG} || 0;
  1         2  
  1         1713  
15              
16             our $VERSION = '1.002';
17              
18             my $UV;
19              
20             # Use UV::Loop singleton for the first instance only
21             sub new {
22 7     7 1 90396 my $self = shift->SUPER::new;
23 7 100       66 if ($UV++) {
24 4         28 $self->{loop} = UV::Loop->new;
25             } else {
26 3         23 $self->{loop} = UV::Loop->default_loop;
27 3         265 $self->{loop_singleton} = 1;
28             }
29 7         428 return $self;
30             }
31              
32 6 100   6   4034 sub DESTROY { undef $UV if shift->{loop_singleton} }
33              
34             sub again {
35 8     8 1 380 my ($self, $id, $after) = @_;
36 8 100       327 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
37 7         17 my $w = $timer->{watcher};
38 7 100       21 if (defined $after) {
39 2         10 $after *= 1000; # Intervals in milliseconds
40             # Timer will not repeat with (integer) interval of 0
41 2 50       12 $after = 1 if $after < 1;
42 2         11 $self->_error($w->repeat($after));
43             }
44 7         35 $self->_error($w->again);
45             }
46              
47             sub io {
48 10     10 1 6770 my ($self, $handle, $cb) = @_;
49 10   33     57 my $fd = fileno($handle) // croak 'Handle is closed';
50             # Must use existing watcher if present
51 10         60 $self->{io}{$fd}{cb} = $cb;
52 10         18 warn "-- Set IO watcher for $fd\n" if DEBUG;
53 10         38 return $self->watch($handle, 1, 1);
54             }
55              
56             sub one_tick {
57 35090     35090 1 79412 my $self = shift;
58             # Just one tick
59 35090 100       59870 local $self->{running} = 1 unless $self->{running};
60 35090 100       549498 $self->{loop}->run(UV::Loop::UV_RUN_ONCE) or $self->stop;
61             }
62              
63 5     5 1 656 sub recurring { shift->_timer(1, @_) }
64              
65             sub remove {
66 40     40 1 3629 my ($self, $remove) = @_;
67 40 50       117 return unless defined $remove;
68 40 100       118 if (ref $remove) {
69 5   33     33 my $fd = fileno($remove) // croak 'Handle is closed';
70 5 100       27 if (exists $self->{io}{$fd}) {
71 3         6 warn "-- Removed IO watcher for $fd\n" if DEBUG;
72 3         12 my $w = delete $self->{io}{$fd}{watcher};
73 3 50       42 $w->close if $w;
74             }
75 5         260 return !!delete $self->{io}{$fd};
76             } else {
77 35 100       124 if (exists $self->{timers}{$remove}) {
78 34         61 warn "-- Removed timer $remove\n" if DEBUG;
79 34         112 my $w = delete $self->{timers}{$remove}{watcher};
80 34 50       200 $w->close if $w;
81             }
82 35         1204 return !!delete $self->{timers}{$remove};
83             }
84             }
85              
86             sub reset {
87 4     4 1 1732 my $self = shift;
88 4     7   40 $self->{loop}->walk(sub { $_[0]->close });
  7         252  
89 4         108 $self->SUPER::reset;
90             }
91              
92 35     35 1 15124 sub timer { shift->_timer(0, @_) }
93              
94             sub watch {
95 25     25 1 237 my ($self, $handle, $read, $write) = @_;
96            
97 25         57 my $fd = fileno $handle;
98 25 100       202 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
99            
100 24         44 my $mode = 0;
101 24 100       62 $mode |= UV::Poll::UV_READABLE if $read;
102 24 100       51 $mode |= UV::Poll::UV_WRITABLE if $write;
103            
104 24         35 my $w;
105 24 100       59 unless ($w = $io->{watcher}) { $w = $io->{watcher} = UV::Poll->new(loop => $self->{loop}, fd => $fd); }
  9         57  
106            
107 24 100       1644 if ($mode == 0) { $self->_error($w->stop); }
  2         27  
108             else {
109 22         84 weaken $self;
110             my $cb = sub {
111 35025     35025   86977 my ($w, $status, $events) = @_;
112 35025 50       65765 return $self->_error($status) if $status < 0;
113 35025 100       114278 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0)
114             if UV::Poll::UV_READABLE & $events;
115             $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1)
116 35025 50 66     127552 if UV::Poll::UV_WRITABLE & $events && $self->{io}{$fd};
117 22         125 };
118 22         81 $self->_error($w->start($mode, $cb));
119             }
120            
121 24         78 return $self;
122             }
123              
124             sub _error {
125 73     73   3113 my ($self, $code) = @_;
126 73 50       189 $self->emit(error => sprintf "UV error: %s", UV::strerror($code)) if $code < 0;
127 73         123 return $code;
128             }
129              
130             sub _id {
131 40     40   64 my $self = shift;
132 40         59 my $id;
133 40         120 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  40         185  
134 40         1017 return $id;
135             }
136              
137             sub _timer {
138 40     40   141 my ($self, $recurring, $after, $cb) = @_;
139 40         82 $after *= 1000; # Intervals in milliseconds
140 40         75 my $recur_after = $after;
141             # Timer will not repeat with (integer) interval of 0
142 40 100 100     150 $recur_after = 1 if $recurring and $after < 1;
143            
144 40         99 my $id = $self->_id;
145 40         142 weaken $self;
146             my $wrapper = sub {
147 155 100   155   773 $self->remove($id) unless $recurring;
148 155         490 $self->_try('Timer', $cb);
149 40         188 };
150 40         209 my $w = $self->{timers}{$id}{watcher} = UV::Timer->new(loop => $self->{loop});
151 40         5377 $self->_error($w->start($after, $recur_after, $wrapper));
152            
153 40         60 if (DEBUG) {
154             my $is_recurring = $recurring ? ' (recurring)' : '';
155             my $seconds = $after / 1000;
156             warn "-- Set timer $id after $seconds seconds$is_recurring\n";
157             }
158            
159 40         117 return $id;
160             }
161              
162             sub _try {
163 38337     38337   65822 my ($self, $what, $cb) = (shift, shift, shift);
164 38337 100       57132 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  38337         93396  
  38336         149185  
165             }
166              
167             1;
168              
169             =encoding utf8
170              
171             =head1 NAME
172              
173             Mojo::Reactor::UV - UV backend for Mojo::Reactor
174              
175             =head1 SYNOPSIS
176              
177             use Mojo::Reactor::UV;
178              
179             # Watch if handle becomes readable or writable
180             my $reactor = Mojo::Reactor::UV->new;
181             $reactor->io($first => sub {
182             my ($reactor, $writable) = @_;
183             say $writable ? 'First handle is writable' : 'First handle is readable';
184             });
185              
186             # Change to watching only if handle becomes writable
187             $reactor->watch($first, 0, 1);
188              
189             # Turn file descriptor into handle and watch if it becomes readable
190             my $second = IO::Handle->new_from_fd($fd, 'r');
191             $reactor->io($second => sub {
192             my ($reactor, $writable) = @_;
193             say $writable ? 'Second handle is writable' : 'Second handle is readable';
194             })->watch($second, 1, 0);
195              
196             # Add a timer
197             $reactor->timer(15 => sub {
198             my $reactor = shift;
199             $reactor->remove($first);
200             $reactor->remove($second);
201             say 'Timeout!';
202             });
203              
204             # Start reactor if necessary
205             $reactor->start unless $reactor->is_running;
206              
207             # Or in an application using Mojo::IOLoop
208             use Mojo::Reactor::UV;
209             use Mojo::IOLoop;
210              
211             # Or in a Mojolicious application
212             $ MOJO_REACTOR=Mojo::Reactor::UV hypnotoad script/myapp
213              
214             =head1 DESCRIPTION
215              
216             L<Mojo::Reactor::UV> is an event reactor for L<Mojo::IOLoop> that uses
217             C<libuv>. The usage is exactly the same as other L<Mojo::Reactor>
218             implementations such as L<Mojo::Reactor::Poll>. L<Mojo::Reactor::UV> will be
219             used as the default backend for L<Mojo::IOLoop> if it is loaded before
220             L<Mojo::IOLoop> or any module using the loop. However, when invoking a
221             L<Mojolicious> application through L<morbo> or L<hypnotoad>, the reactor must
222             be set as the default by setting the C<MOJO_REACTOR> environment variable to
223             C<Mojo::Reactor::UV>.
224              
225             =head1 EVENTS
226              
227             L<Mojo::Reactor::UV> inherits all events from L<Mojo::Reactor::Poll>.
228              
229             =head1 METHODS
230              
231             L<Mojo::Reactor::UV> inherits all methods from L<Mojo::Reactor::Poll> and
232             implements the following new ones.
233              
234             =head2 new
235              
236             my $reactor = Mojo::Reactor::UV->new;
237              
238             Construct a new L<Mojo::Reactor::UV> object.
239              
240             =head2 again
241              
242             $reactor->again($id);
243             $reactor->again($id, 0.5);
244              
245             Restart timer and optionally change the invocation time. Note that this method
246             requires an active timer.
247              
248             =head2 io
249              
250             $reactor = $reactor->io($handle => sub {...});
251              
252             Watch handle for I/O events, invoking the callback whenever handle becomes
253             readable or writable.
254              
255             # Callback will be invoked twice if handle becomes readable and writable
256             $reactor->io($handle => sub {
257             my ($reactor, $writable) = @_;
258             say $writable ? 'Handle is writable' : 'Handle is readable';
259             });
260              
261             =head2 one_tick
262              
263             $reactor->one_tick;
264              
265             Run reactor until an event occurs or no events are being watched anymore. Note
266             that this method can recurse back into the reactor, so you need to be careful.
267              
268             # Don't block longer than 0.5 seconds
269             my $id = $reactor->timer(0.5 => sub {});
270             $reactor->one_tick;
271             $reactor->remove($id);
272              
273             =head2 recurring
274              
275             my $id = $reactor->recurring(0.25 => sub {...});
276              
277             Create a new recurring timer, invoking the callback repeatedly after a given
278             amount of time in seconds.
279              
280             =head2 remove
281              
282             my $bool = $reactor->remove($handle);
283             my $bool = $reactor->remove($id);
284              
285             Remove handle or timer.
286              
287             =head2 reset
288              
289             $reactor->reset;
290              
291             Remove all handles and timers.
292              
293             =head2 timer
294              
295             my $id = $reactor->timer(0.5 => sub {...});
296              
297             Create a new timer, invoking the callback after a given amount of time in
298             seconds.
299              
300             =head2 watch
301              
302             $reactor = $reactor->watch($handle, $readable, $writable);
303              
304             Change I/O events to watch handle for with true and false values. Note that
305             this method requires an active I/O watcher.
306              
307             # Watch only for readable events
308             $reactor->watch($handle, 1, 0);
309              
310             # Watch only for writable events
311             $reactor->watch($handle, 0, 1);
312              
313             # Watch for readable and writable events
314             $reactor->watch($handle, 1, 1);
315              
316             # Pause watching for events
317             $reactor->watch($handle, 0, 0);
318              
319             =head1 CAVEATS
320              
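321             When using L<Mojo::IOLoop> with L<UV>, the event loop must be controlled by
322             L<Mojo::IOLoop> or L<Mojo::Reactor::UV>, such as with the methods
323             L<Mojo::IOLoop/"start">, L<Mojo::IOLoop/"stop">, and L<Mojo::IOLoop/"one_tick">. Starting
324             or stopping the event loop through L<UV> will not provide required
325             functionality to L<Mojo::IOLoop> applications.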
326              
327             Care should be taken that file descriptors are not closed while being watched
328             by the reactor. They can be safely closed after calling L</"watch"> with
329             C<readable> and C<writable> set to 0, or after removing the handle with
330             L</"remove"> or L</"reset">.
331              
332             On Windows, C<libuv> can only watch sockets, not regular filehandles.
333              
334             =head1 BUGS
335              
336             Report any issues on the public bugtracker.
337              
338             =head1 AUTHOR
339              
340             Dan Book, C
341              
342             =head1 COPYRIGHT AND LICENSE
343              
344             Copyright 2015, Dan Book.
345              
346             This library is free software; you may redistribute it and/or modify it under
347             the terms of the Artistic License version 2.0.
348              
349             =head1 SEE ALSO
350              
351             L, L, L