File Coverage

blib/lib/Mojo/Reactor/UV.pm
Criterion Covered Total %
statement 110 110 100.0
branch 40 46 86.9
condition 5 8 62.5
subroutine 26 26 100.0
pod 9 9 100.0
total 190 199 95.4


line stmt bran cond sub pod time code
1             package Mojo::Reactor::UV;
2 1     1   233679 use Mojo::Base 'Mojo::Reactor::Poll';
  1         8  
  1         6  
3              
# Default the MOJO_REACTOR environment variable to this class unless it
# already holds a true value.
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::UV';
5              
6 1     1   26440 use Carp 'croak';
  1         1  
  1         35  
7 1     1   5 use Mojo::Util 'md5_sum';
  1         1  
  1         27  
8 1     1   5 use Scalar::Util 'weaken';
  1         2  
  1         27  
9 1     1   441 use UV;
  1         5080  
  1         42  
10 1     1   390 use UV::Poll;
  1         2718  
  1         35  
11 1     1   372 use UV::Timer;
  1         354  
  1         34  
12 1     1   5 use UV::Loop;
  1         1  
  1         34  
13              
# Compile-time DEBUG flag read from MOJO_REACTOR_UV_DEBUG (0 when unset or
# false). It gates the "-- ..." diagnostic warn calls in this module.
14 1   50 1   5 use constant DEBUG => $ENV{MOJO_REACTOR_UV_DEBUG} || 0;
  1         1  
  1         1221  
15              
16             our $VERSION = '1.001';
17              
# File-scoped instance counter: incremented by every call to new() and reset
# to undef by DESTROY when the instance flagged loop_singleton goes away.
18             my $UV;
19              
20             # Use UV::Loop singleton for the first instance only
# Constructor. Builds the object through the parent class constructor, then
# attaches a libuv loop: while the $UV counter is still false (first instance)
# the object gets UV::Loop->default_loop and is flagged loop_singleton; every
# later instance gets its own UV::Loop->new. Returns the new object.
21             sub new {
22 7     7 1 66124 my $self = shift->SUPER::new;
               # Post-increment: the test sees the old value, the counter always grows
23 7 100       51 if ($UV++) {
24 4         23 $self->{loop} = UV::Loop->new;
25             } else {
26 3         13 $self->{loop} = UV::Loop->default_loop;
27 3         192 $self->{loop_singleton} = 1;
28             }
29 7         334 return $self;
30             }
31              
# Destructor. When the object that holds the default loop (loop_singleton) is
# destroyed, clear the $UV counter so the next new() takes the default loop again.
32 6 100   6   2965 sub DESTROY { undef $UV if shift->{loop_singleton} }
33              
# Restart the timer with the given id.
# Croaks with 'Timer not active' when $self->{timers} has no true entry for
# that id. Otherwise calls again() on the timer's watcher and passes its return
# code through _error, which emits an "error" event for negative codes.
34             sub again {
35 6     6 1 228 my $self = shift;
36 6 100       215 croak 'Timer not active' unless my $timer = $self->{timers}{shift()};
37 5         22 $self->_error($timer->{watcher}->again);
38             }
39              
# Register $cb as the I/O callback for $handle, keyed by its file descriptor
# in $self->{io}, then start watching it for both readable and writable events
# via watch(). Returns whatever watch() returns (the reactor itself).
40             sub io {
41 10     10 1 4983 my ($self, $handle, $cb) = @_;
42 10         28 my $fd = fileno $handle;
               # Store the callback first: watch() croaks if no io entry exists for $fd
43 10         30 $self->{io}{$fd}{cb} = $cb;
44 10         13 warn "-- Set IO watcher for $fd\n" if DEBUG;
45 10         29 return $self->watch($handle, 1, 1);
46             }
47              
# Run the libuv loop once (UV_RUN_ONCE).
# If the reactor is not already marked running, the running flag is set for
# the duration of this call only (local). When run() returns a false value,
# stop() is called; stop is not defined in this file, presumably inherited
# from Mojo::Reactor::Poll.
48             sub one_tick {
49 40046     40046 1 77604 my $self = shift;
50             # Just one tick
51 40046 100       55296 local $self->{running} = 1 unless $self->{running};
52 40046 100       502264 $self->{loop}->run(UV::Loop::UV_RUN_ONCE) or $self->stop;
53             }
54              
# Create a recurring timer: forwards ($after, $cb) to _timer with the
# recurring flag set to 1 and returns the timer id that _timer produces.
55 4     4 1 493 sub recurring { shift->_timer(1, @_) }
56              
# Remove an I/O watcher or a timer.
# $remove is treated as a handle when it is a reference and as a timer id
# otherwise. Returns nothing when $remove is undefined; otherwise returns a
# boolean telling whether an entry was actually deleted.
57             sub remove {
58 37     37 1 2628 my ($self, $remove) = @_;
59 37 50       71 return unless defined $remove;
60 37 100       68 if (ref $remove) {
               # Handle: I/O entries are keyed by file descriptor
61 5         16 my $fd = fileno $remove;
62 5 100       20 if (exists $self->{io}{$fd}) {
63 3         11 warn "-- Removed IO watcher for $fd\n" if DEBUG;
               # Detach the poll watcher from the entry and close it if one was created
64 3         9 my $w = delete $self->{io}{$fd}{watcher};
65 3 50       27 $w->close if $w;
66             }
               # delete yields the removed entry (true) or undef; !! makes it a boolean
67 5         168 return !!delete $self->{io}{$fd};
68             } else {
69 32 100       69 if (exists $self->{timers}{$remove}) {
70 31         36 warn "-- Removed timer $remove\n" if DEBUG;
               # Same pattern for timers: detach the watcher, then close it
71 31         62 my $w = delete $self->{timers}{$remove}{watcher};
72 31 50       100 $w->close if $w;
73             }
74 32         696 return !!delete $self->{timers}{$remove};
75             }
76             }
77              
# Remove everything: walk the loop calling close on each handle passed to the
# callback, then drop the io, next_tick, next_timer and timers slots from the
# reactor object.
78             sub reset {
79 2     2 1 10 my $self = shift;
80 2     5   10 $self->{loop}->walk(sub { $_[0]->close });
  5         159  
               # Hash-slice delete clears all bookkeeping keys in one statement
81 2         47 delete @{$self}{qw(io next_tick next_timer timers)};
  2         15  
82             }
83              
# Create a one-shot timer: forwards ($after, $cb) to _timer with the
# recurring flag set to 0 and returns the timer id that _timer produces.
84 33     33 1 9854 sub timer { shift->_timer(0, @_) }
85              
# Change which I/O events are watched for $handle.
# $read / $write are truthy flags selecting readable / writable events.
# Croaks with 'I/O watcher not active' when no io entry exists for the
# handle's file descriptor. Returns the reactor.
86             sub watch {
87 24     24 1 175 my ($self, $handle, $read, $write) = @_;
88            
89 24         47 my $fd = fileno $handle;
90 24 100       149 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
91            
               # Build the poll event mask from the two flags
92 23         31 my $mode = 0;
93 23 100       40 $mode |= UV::Poll::UV_READABLE if $read;
94 23 100       38 $mode |= UV::Poll::UV_WRITABLE if $write;
95            
               # Create the UV::Poll watcher for this fd on first use and cache it in the io entry
96 23         26 my $w;
97 23 100       48 unless ($w = $io->{watcher}) { $w = $io->{watcher} = UV::Poll->new(loop => $self->{loop}, fd => $fd); }
  9         46  
98            
               # An empty mask pauses the watcher instead of starting it
99 23 100       1320 if ($mode == 0) { $self->_error($w->stop); }
  2         22  
100             else {
               # $self is captured by the closure below, so it is weakened first
101 21         54 weaken $self;
               # Poll callback: a negative status is reported through _error; otherwise the
               # stored callback is invoked via _try with 0 for readable and 1 for writable.
               # The writable call re-checks that the io entry still exists, presumably
               # because the readable callback may have removed it.
102             my $cb = sub {
103 39949     39949   74360 my ($w, $status, $events) = @_;
104 39949 50       58769 return $self->_error($status) if $status < 0;
105 39949 100       107763 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0)
106             if UV::Poll::UV_READABLE & $events;
107             $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1)
108 39949 50 66     123057 if UV::Poll::UV_WRITABLE & $events && $self->{io}{$fd};
109 21         85 };
110 21         58 $self->_error($w->start($mode, $cb));
111             }
112            
113 23         57 return $self;
114             }
115              
# Report a UV return code. When $code is negative, emit an "error" event whose
# message is "UV error: " followed by UV::strerror($code). The code is always
# returned unchanged so callers can wrap UV calls directly.
116             sub _error {
117 65     65   2237 my ($self, $code) = @_;
118 65 50       130 $self->emit(error => sprintf "UV error: %s", UV::strerror($code)) if $code < 0;
119 65         115 return $code;
120             }
121              
# Generate a timer id that is not currently a key of $self->{timers}.
# Each candidate is the md5_sum of 't', the loop's now() value and a random
# number; the loop retries while the candidate collides with an existing timer.
122             sub _id {
123 37     37   45 my $self = shift;
124 37         41 my $id;
125 37         50 do { $id = md5_sum 't' . $self->{loop}->now() . rand 999 } while $self->{timers}{$id};
  37         435  
126 37         68 return $id;
127             }
128              
# Shared implementation behind timer() and recurring().
# $recurring is a boolean flag, $after the delay in seconds, $cb the callback.
# Creates a UV::Timer on this reactor's loop, stores it under a fresh id in
# $self->{timers} and returns that id.
129             sub _timer {
130 37     37   74 my ($self, $recurring, $after, $cb) = @_;
131 37         55 $after *= 1000; # Intervals in milliseconds
               # The repeat interval is passed for one-shot timers too; the wrapper below
               # removes a one-shot timer the first time it fires.
132 37         51 my $recur_after = $after;
133             # Timer will not repeat with (integer) interval of 0
134 37 100 66     100 $recur_after = 1 if $recurring and $after < 1;
135            
136 37         62 my $id = $self->_id;
               # $self is captured by the wrapper closure, so it is weakened first
137 37         92 weaken $self;
               # One-shot timers are removed before the callback runs; the callback itself
               # is invoked through _try so a die becomes an "error" event.
138             my $wrapper = sub {
139 199 100   199   602 $self->remove($id) unless $recurring;
140 199         460 $self->_try('Timer', $cb);
141 37         124 };
142 37         137 my $w = $self->{timers}{$id}{watcher} = UV::Timer->new(loop => $self->{loop});
143 37         3737 $self->_error($w->start($after, $recur_after, $wrapper));
144            
145 37         40 if (DEBUG) {
146             my $is_recurring = $recurring ? ' (recurring)' : '';
147             my $seconds = $after / 1000;
148             warn "-- Set timer $id after $seconds seconds$is_recurring\n";
149             }
150            
151 37         91 return $id;
152             }
153              
# Invoke callback $cb as a method on the reactor with any remaining arguments,
# inside an eval. If the eval does not complete, emit an "error" event with the
# message "$what failed: $@", where $what labels the caller ('Timer' or
# 'I/O watcher' in this file).
154             sub _try {
155 43811     43811   61646 my ($self, $what, $cb) = (shift, shift, shift);
               # The trailing 1 makes the eval true on success whatever the callback returns
156 43811 100       50877 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  43811         83443  
  43810         137887  
157             }
158              
159             1;
160              
161             =encoding utf8
162              
163             =head1 NAME
164              
165             Mojo::Reactor::UV - UV backend for Mojo::Reactor
166              
167             =head1 SYNOPSIS
168              
169             use Mojo::Reactor::UV;
170              
171             # Watch if handle becomes readable or writable
172             my $reactor = Mojo::Reactor::UV->new;
173             $reactor->io($first => sub {
174             my ($reactor, $writable) = @_;
175             say $writable ? 'First handle is writable' : 'First handle is readable';
176             });
177              
178             # Change to watching only if handle becomes writable
179             $reactor->watch($first, 0, 1);
180              
181             # Turn file descriptor into handle and watch if it becomes readable
182             my $second = IO::Handle->new_from_fd($fd, 'r');
183             $reactor->io($second => sub {
184             my ($reactor, $writable) = @_;
185             say $writable ? 'Second handle is writable' : 'Second handle is readable';
186             })->watch($second, 1, 0);
187              
188             # Add a timer
189             $reactor->timer(15 => sub {
190             my $reactor = shift;
191             $reactor->remove($first);
192             $reactor->remove($second);
193             say 'Timeout!';
194             });
195              
196             # Start reactor if necessary
197             $reactor->start unless $reactor->is_running;
198              
199             # Or in an application using Mojo::IOLoop
200             use Mojo::Reactor::UV;
201             use Mojo::IOLoop;
202              
203             # Or in a Mojolicious application
204             $ MOJO_REACTOR=Mojo::Reactor::UV hypnotoad script/myapp
205              
206             =head1 DESCRIPTION
207              
208             L<Mojo::Reactor::UV> is an event reactor for L<Mojo::IOLoop> that uses
209             C<libuv>. The usage is exactly the same as other L<Mojo::Reactor>
210             implementations such as L<Mojo::Reactor::Poll>. L<Mojo::Reactor::UV> will be
211             used as the default backend for L<Mojo::IOLoop> if it is loaded before
212             L<Mojo::IOLoop> or any module using the loop. However, when invoking a
213             L<Mojolicious> application through L<morbo> or L<hypnotoad>, the reactor must
214             be set as the default by setting the C<MOJO_REACTOR> environment variable to
215             C<Mojo::Reactor::UV>.
216              
217             =head1 EVENTS
218              
219             L<Mojo::Reactor::UV> inherits all events from L<Mojo::Reactor::Poll>.
220              
221             =head1 METHODS
222              
223             L<Mojo::Reactor::UV> inherits all methods from L<Mojo::Reactor::Poll> and
224             implements the following new ones.
225              
226             =head2 new
227              
228             my $reactor = Mojo::Reactor::UV->new;
229              
230             Construct a new L<Mojo::Reactor::UV> object.
231              
232             =head2 again
233              
234             $reactor->again($id);
235              
236             Restart timer. Note that this method requires an active timer.
237              
238             =head2 io
239              
240             $reactor = $reactor->io($handle => sub {...});
241              
242             Watch handle for I/O events, invoking the callback whenever handle becomes
243             readable or writable.
244              
245             # Callback will be invoked twice if handle becomes readable and writable
246             $reactor->io($handle => sub {
247             my ($reactor, $writable) = @_;
248             say $writable ? 'Handle is writable' : 'Handle is readable';
249             });
250              
251             =head2 one_tick
252              
253             $reactor->one_tick;
254              
255             Run reactor until an event occurs or no events are being watched anymore. Note
256             that this method can recurse back into the reactor, so you need to be careful.
257              
258             # Don't block longer than 0.5 seconds
259             my $id = $reactor->timer(0.5 => sub {});
260             $reactor->one_tick;
261             $reactor->remove($id);
262              
263             =head2 recurring
264              
265             my $id = $reactor->recurring(0.25 => sub {...});
266              
267             Create a new recurring timer, invoking the callback repeatedly after a given
268             amount of time in seconds.
269              
270             =head2 remove
271              
272             my $bool = $reactor->remove($handle);
273             my $bool = $reactor->remove($id);
274              
275             Remove handle or timer.
276              
277             =head2 reset
278              
279             $reactor->reset;
280              
281             Remove all handles and timers.
282              
283             =head2 timer
284              
285             my $id = $reactor->timer(0.5 => sub {...});
286              
287             Create a new timer, invoking the callback after a given amount of time in
288             seconds.
289              
290             =head2 watch
291              
292             $reactor = $reactor->watch($handle, $readable, $writable);
293              
294             Change I/O events to watch handle for with true and false values. Note that
295             this method requires an active I/O watcher.
296              
297             # Watch only for readable events
298             $reactor->watch($handle, 1, 0);
299              
300             # Watch only for writable events
301             $reactor->watch($handle, 0, 1);
302              
303             # Watch for readable and writable events
304             $reactor->watch($handle, 1, 1);
305              
306             # Pause watching for events
307             $reactor->watch($handle, 0, 0);
308              
309             =head1 CAVEATS
310              
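311             When using L<Mojo::IOLoop> with L<UV>, the event loop must be controlled by
312             L<Mojo::IOLoop> or L<Mojo::Reactor::UV>, such as with the methods L<Mojo::IOLoop/"start">,
313             L<Mojo::IOLoop/"stop">, and L<Mojo::IOLoop/"one_tick">. Starting or stopping the event loop through
314             L<UV> will not provide required functionality to L<Mojo::IOLoop> applications.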
315              
316             Care should be taken that file descriptors are not closed while being watched
317             by the reactor. They can be safely closed after calling L</"watch"> with
318             C<readable> and C<writable> set to 0, or after removing the handle with
319             L</"remove"> or L<Mojo::IOLoop/"remove">.
320              
321             On Windows, C<libuv> can only watch sockets, not regular filehandles.
322              
323             =head1 BUGS
324              
325             Report any issues on the public bugtracker.
326              
327             =head1 AUTHOR
328              
329             Dan Book, C<dbook@cpan.org>
330              
331             =head1 COPYRIGHT AND LICENSE
332              
333             Copyright 2015, Dan Book.
334              
335             This library is free software; you may redistribute it and/or modify it under
336             the terms of the Artistic License version 2.0.
337              
338             =head1 SEE ALSO
339              
340             L<Mojolicious>, L<Mojo::IOLoop>, L<UV>