File Coverage

blib/lib/Mojo/IOLoop.pm
Criterion Covered Total %
statement 134 138 97.1
branch 40 44 90.9
condition 32 44 72.7
subroutine 40 42 95.2
pod 16 16 100.0
total 262 284 92.2


line stmt bran cond sub pod time code
1             package Mojo::IOLoop;
2 67     67   633501 use Mojo::Base 'Mojo::EventEmitter';
  67         164  
  67         513  
3              
4             # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5             # used. Like the death ray."
6 67     67   599 use Carp qw(croak);
  67         128  
  67         5933  
7 67     67   39138 use Mojo::IOLoop::Client;
  67         298  
  67         560  
8 67     67   41111 use Mojo::IOLoop::Server;
  67         284  
  67         593  
9 67     67   41305 use Mojo::IOLoop::Stream;
  67         266  
  67         640  
10 67     67   47003 use Mojo::IOLoop::Subprocess;
  67         313  
  67         645  
11 67     67   42704 use Mojo::Reactor::Poll;
  67         333  
  67         990  
12 67     67   524 use Mojo::Util qw(md5_sum steady_time);
  67         155  
  67         4936  
13 67     67   506 use Scalar::Util qw(blessed weaken);
  67         179  
  67         4972  
14              
15 67   50 67   568 use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
  67         170  
  67         233886  
16              
17             has max_accepts => 0;
18             has max_connections => 1000;
19             has reactor => sub {
20             my $class = Mojo::Reactor::Poll->detect;
21             warn "-- Reactor initialized ($class)\n" if DEBUG;
22             return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
23             };
24              
25             # Ignore PIPE signal
26             $SIG{PIPE} = 'IGNORE';
27              
28             # Initialize singleton reactor early
29             __PACKAGE__->singleton->reactor;
30              
31             sub acceptor {
32 369     369 1 7474 my ($self, $acceptor) = (_instance(shift), @_);
33              
34             # Find acceptor for id
35 369 100       1890 return $self->{acceptors}{$acceptor} unless ref $acceptor;
36              
37             # Connect acceptor with reactor
38 179         819 $self->{acceptors}{my $id = $self->_id} = $acceptor->reactor($self->reactor);
39              
40             # Allow new acceptor to get picked up
41 179         697 $self->_not_accepting->_maybe_accepting;
42              
43 179         1020 return $id;
44             }
45              
46             sub client {
47 238     238 1 1495 my ($self, $cb) = (_instance(shift), pop);
48              
49 238         1402 my $id = $self->_id;
50 238         1004 my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new(reactor => $self->reactor);
51              
52 238         515 weaken $self;
53             $client->on(
54             connect => sub {
55 233     233   841 delete $self->{out}{$id}{client};
56 233         2238 my $stream = Mojo::IOLoop::Stream->new(pop);
57 233         1325 $self->_stream($stream => $id);
58 233         1040 $self->$cb(undef, $stream);
59             }
60 238         2242 );
61 238     3   1903 $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) });
  3         9  
  3         10  
62 238         1526 $client->connect(@_);
63              
64 238         1660 return $id;
65             }
66              
67 3533     3533 1 8830 sub is_running { _instance(shift)->reactor->is_running }
68              
69             sub next_tick {
70 1891     1891 1 12580 my ($self, $cb) = (_instance(shift), @_);
71 1891         4297 weaken $self;
72 1891     1891   4877 return $self->reactor->next_tick(sub { $self->$cb });
  1891         6408  
73             }
74              
75             sub one_tick {
76 1381     1381 1 5870 my $self = _instance(shift);
77 1381 100       4252 croak 'Mojo::IOLoop already running' if $self->is_running;
78 1380         4314 $self->reactor->one_tick;
79             }
80              
81 9     9 1 759 sub recurring { shift->_timer(recurring => @_) }
82              
83             sub remove {
84 529     529 1 1677 my ($self, $id) = (_instance(shift), @_);
85 529   100     2917 my $c = $self->{in}{$id} || $self->{out}{$id};
86 529 100 100     2549 if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
  325         1555  
87 204         1376 $self->_remove($id);
88             }
89              
90             sub reset {
91 4   100 4 1 65 my ($self, $options) = (_instance(shift), shift // {});
92              
93 4         25 $self->emit('reset')->stop;
94 4 100       14 if ($options->{freeze}) {
95 1         3 state @frozen;
96 1         8 push @frozen, {%$self};
97 1         4 delete $self->{reactor};
98             }
99 3         8 else { $self->reactor->reset }
100              
101 4         33 delete @$self{qw(accepting acceptors events in out stop)};
102             }
103              
104             sub server {
105 179     179 1 10220 my ($self, $cb) = (_instance(shift), pop);
106              
107 179         2063 my $server = Mojo::IOLoop::Server->new;
108 179         525 weaken $self;
109             $server->on(
110             accept => sub {
111 235     235   2360 my $stream = Mojo::IOLoop::Stream->new(pop);
112 235         1182 $self->$cb($stream, $self->_stream($stream, $self->_id, 1));
113              
114             # Enforce connection limit (randomize to improve load balancing)
115 235 100       1622 if (my $max = $self->max_accepts) {
116 1   33     12 $self->{accepts} //= $max - int rand $max / 2;
117 1 50       7 $self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
118             }
119              
120             # Stop accepting if connection limit has been reached
121 235 100       1013 $self->_not_accepting if $self->_limit;
122             }
123 179         1778 );
124 179         1615 $server->listen(@_);
125              
126 178         818 return $self->acceptor($server);
127             }
128              
129 1519     1519 1 35890 sub singleton { state $loop = shift->new }
130              
131             sub start {
132 1077     1077 1 16334 my $self = _instance(shift);
133 1077 100       3568 croak 'Mojo::IOLoop already running' if $self->is_running;
134 1074         3484 $self->reactor->start;
135             }
136              
137 1087     1087 1 4033 sub stop { _instance(shift)->reactor->stop }
138              
139             sub stop_gracefully {
140 3     3 1 22 my $self = _instance(shift)->_not_accepting;
141 3 100 66     41 ++$self->{stop} and !$self->emit('finish')->_in and $self->stop;
142             }
143              
144             sub stream {
145 7632     7632 1 21808 my ($self, $stream) = (_instance(shift), @_);
146 7632 100       17528 return $self->_stream($stream => $self->_id) if ref $stream;
147 7631   100     35367 my $c = $self->{in}{$stream} || $self->{out}{$stream} // {};
      100        
148 7631         54280 return $c->{stream};
149             }
150              
151             sub subprocess {
152 0     0 1 0 my $subprocess = Mojo::IOLoop::Subprocess->new(ioloop => _instance(shift));
153 0 0       0 return @_ ? $subprocess->run(@_) : $subprocess;
154             }
155              
156 54     54 1 465 sub timer { shift->_timer(timer => @_) }
157              
158             sub _id {
159 653     653   1233 my $self = shift;
160 653         1088 my $id;
161 653   33     1210 do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
  653   33     2652  
162 653         18844 return $id;
163             }
164              
165 559   100 559   944 sub _in { scalar keys %{shift->{in} // {}} }
  559         3854  
166              
167 17986 100   17986   61243 sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }
168              
169 555 100   555   2560 sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections }
170              
171             sub _maybe_accepting {
172 715     715   1238 my $self = shift;
173 715 100 100     2992 return if $self->{accepting} || $self->_limit;
174 318   50     668 $_->start for values %{$self->{acceptors} // {}};
  318         1961  
175 318         2152 $self->{accepting} = 1;
176             }
177              
178             sub _not_accepting {
179 323     323   644 my $self = shift;
180 323 100       1192 return $self unless delete $self->{accepting};
181 243   50     579 $_->stop for values %{$self->{acceptors} // {}};
  243         1738  
182 243         836 return $self;
183             }
184              
185 0   0 0   0 sub _out { scalar keys %{shift->{out} // {}} }
  0         0  
186              
187             sub _remove {
188 604     604   1418 my ($self, $id) = @_;
189              
190             # Timer
191 604 50       1850 return undef unless my $reactor = $self->reactor;
192 604 100       1998 return undef if $reactor->remove($id);
193              
194             # Acceptor
195 578 100       2444 return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id};
196              
197             # Connection
198 440 100 100     2565 return undef unless delete $self->{in}{$id} || delete $self->{out}{$id};
199 400 100 100     1697 return $self->stop if $self->{stop} && !$self->_in;
200 398         1515 $self->_maybe_accepting;
201 398         1877 warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
202             }
203              
204             sub _stream {
205 469     469   1621 my ($self, $stream, $id, $server) = @_;
206              
207             # Connect stream with reactor
208 469 100       1694 $self->{$server ? 'in' : 'out'}{$id}{stream} = $stream->reactor($self->reactor);
209 469         811 warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
210 469         2130 weaken $self;
211 469 100   401   3503 $stream->on(close => sub { $self && $self->_remove($id) });
  401         2086  
212 469         2318 $stream->start;
213              
214 469         2033 return $id;
215             }
216              
217             sub _timer {
218 63     63   207 my ($self, $method, $after, $cb) = (_instance(shift), @_);
219 63         180 weaken $self;
220 63     19736   252 return $self->reactor->$method($after => sub { $self->$cb });
  19736         55052  
221             }
222              
223             1;
224              
225             =encoding utf8
226              
227             =head1 NAME
228              
229             Mojo::IOLoop - Minimalistic event loop
230              
231             =head1 SYNOPSIS
232              
233             use Mojo::IOLoop;
234              
235             # Listen on port 3000
236             Mojo::IOLoop->server({port => 3000} => sub ($loop, $stream, $id) {
237             $stream->on(read => sub ($stream, $bytes) {
238             # Process input chunk
239             say $bytes;
240              
241             # Write response
242             $stream->write('HTTP/1.1 200 OK');
243             });
244             });
245              
246             # Connect to port 3000
247             my $id = Mojo::IOLoop->client({port => 3000} => sub ($loop, $err, $stream) {
248             $stream->on(read => sub ($stream, $bytes) {
249             # Process input
250             say "Input: $bytes";
251             });
252              
253             # Write request
254             $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
255             });
256              
257             # Add a timer
258             Mojo::IOLoop->timer(5 => sub ($loop) { $loop->remove($id) });
259              
260             # Start event loop if necessary
261             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
262              
263             =head1 DESCRIPTION
264              
265             L is a very minimalistic event loop based on L, it has been reduced to the absolute
266             minimal feature set required to build solid and scalable non-blocking clients and servers.
267              
268             Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and
269             need to be tuned for better scalability. The C environment variable should also be used to select the best
270             possible L backend, which usually defaults to the not very scalable C
271              
272             LIBEV_FLAGS=1 # select
273             LIBEV_FLAGS=2 # poll
274             LIBEV_FLAGS=4 # epoll (Linux)
275             LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
276             LIBEV_FLAGS=64 # Linux AIO
277              
278             The event loop will be resilient to time jumps if a monotonic clock is available through L. A TLS
279             certificate and key are also built right in, to make writing test servers as easy as possible. Also note that for
280             convenience the C signal will be set to C when L is loaded.
281              
282             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
283             optional modules L (4.32+), L (0.15+), L (0.64+) and L
284             (2.009+) will be used automatically if possible. Individual features can also be disabled with the C,
285             C and C environment variables.
286              
287             See L for more.
288              
289             =head1 EVENTS
290              
291             L inherits all events from L and can emit the following new ones.
292              
293             =head2 finish
294              
295             $loop->on(finish => sub ($loop) {...});
296              
297             Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be
298             closed.
299              
300             =head2 reset
301              
302             $loop->on(reset => sub ($loop) {...});
303              
304             Emitted when the event loop is reset, this usually happens after the process is forked to clean up resources that
305             cannot be shared.
306              
307             =head1 ATTRIBUTES
308              
309             L implements the following attributes.
310              
311             =head2 max_accepts
312              
313             my $max = $loop->max_accepts;
314             $loop = $loop->max_accepts(1000);
315              
316             The maximum number of connections this event loop is allowed to accept, before shutting down gracefully without
317             interrupting existing connections, defaults to C<0>. Setting the value to C<0> will allow this event loop to accept new
318             connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing
319             between multiple server processes, and to make sure that not all of them restart at the same time.
320              
321             =head2 max_connections
322              
323             my $max = $loop->max_connections;
324             $loop = $loop->max_connections(100);
325              
326             The maximum number of accepted connections this event loop is allowed to handle concurrently, before stopping to accept
327             new incoming connections, defaults to C<1000>.
328              
329             =head2 reactor
330              
331             my $reactor = $loop->reactor;
332             $loop = $loop->reactor(Mojo::Reactor->new);
333              
334             Low-level event reactor, usually a L or L object with a default subscriber to
335             the event L.
336              
337             # Watch if handle becomes readable or writable
338             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
339             say $writable ? 'Handle is writable' : 'Handle is readable';
340             });
341              
342             # Change to watching only if handle becomes writable
343             Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
344              
345             # Remove handle again
346             Mojo::IOLoop->singleton->reactor->remove($handle);
347              
348             =head1 METHODS
349              
350             L inherits all methods from L and implements the following new ones.
351              
352             =head2 acceptor
353              
354             my $server = Mojo::IOLoop->acceptor($id);
355             my $server = $loop->acceptor($id);
356             my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
357              
358             Get L object for id or turn object into an acceptor.
359              
360             =head2 client
361              
362             my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
363             my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
364             my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
365              
366             Open a TCP/IP or UNIX domain socket connection with L and create a stream object (usually
367             L), takes the same arguments as L.
368              
369             =head2 is_running
370              
371             my $bool = Mojo::IOLoop->is_running;
372             my $bool = $loop->is_running;
373              
374             Check if event loop is running.
375              
376             =head2 next_tick
377              
378             my $undef = Mojo::IOLoop->next_tick(sub ($loop) {...});
379             my $undef = $loop->next_tick(sub ($loop) {...});
380              
381             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
382             method, always returns C.
383              
384             # Perform operation on next reactor tick
385             Mojo::IOLoop->next_tick(sub ($loop) {...});
386              
387             =head2 one_tick
388              
389             Mojo::IOLoop->one_tick;
390             $loop->one_tick;
391              
392             Run event loop until an event occurs.
393              
394             # Don't block longer than 0.5 seconds
395             my $id = Mojo::IOLoop->timer(0.5 => sub ($loop) {});
396             Mojo::IOLoop->one_tick;
397             Mojo::IOLoop->remove($id);
398              
399             =head2 recurring
400              
401             my $id = Mojo::IOLoop->recurring(3 => sub ($loop) {...});
402             my $id = $loop->recurring(0 => sub ($loop) {...});
403             my $id = $loop->recurring(0.25 => sub ($loop) {...});
404              
405             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
406              
407             # Perform operation every 5 seconds
408             Mojo::IOLoop->recurring(5 => sub ($loop) {...});
409              
410             =head2 remove
411              
412             Mojo::IOLoop->remove($id);
413             $loop->remove($id);
414              
415             Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their
416             write buffers.
417              
418             =head2 reset
419              
420             Mojo::IOLoop->reset;
421             $loop->reset;
422             $loop->reset({freeze => 1});
423              
424             Remove everything and stop the event loop.
425              
426             These options are currently available:
427              
428             =over 2
429              
430             =item freeze
431              
432             freeze => 1
433              
434             Freeze the current state of the event loop in time before resetting it. This will prevent active connections from
435             getting closed immediately, which can help with many unintended side effects when processes are forked.
436              
437             =back
438              
439             =head2 server
440              
441             my $id = Mojo::IOLoop->server(port => 3000, sub {...});
442             my $id = $loop->server(port => 3000, sub {...});
443             my $id = $loop->server({port => 3000} => sub {...});
444              
445             Accept TCP/IP and UNIX domain socket connections with L and create stream objects (usually
446             L, takes the same arguments as L.
447              
448             # Listen on random port
449             my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub ($loop, $stream, $id) {...});
450             my $port = Mojo::IOLoop->acceptor($id)->port;
451              
452             =head2 singleton
453              
454             my $loop = Mojo::IOLoop->singleton;
455              
456             The global L singleton, used to access a single shared event loop object from everywhere inside the
457             process.
458              
459             # Many methods also allow you to take shortcuts
460             Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
461             Mojo::IOLoop->start;
462              
463             # Restart active timer
464             my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
465             Mojo::IOLoop->singleton->reactor->again($id);
466              
467             # Turn file descriptor into handle and watch if it becomes readable
468             my $handle = IO::Handle->new_from_fd($fd, 'r');
469             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
470             say $writable ? 'Handle is writable' : 'Handle is readable';
471             })->watch($handle, 1, 0);
472              
473             =head2 start
474              
475             Mojo::IOLoop->start;
476             $loop->start;
477              
478             Start the event loop, this will block until L is called. Note that some reactors stop automatically if there
479             are no events being watched anymore.
480              
481             # Start event loop only if it is not running already
482             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
483              
484             =head2 stop
485              
486             Mojo::IOLoop->stop;
487             $loop->stop;
488              
489             Stop the event loop, this will not interrupt any existing connections and the event loop can be restarted by running
490             L again.
491              
492             =head2 stop_gracefully
493              
494             Mojo::IOLoop->stop_gracefully;
495             $loop->stop_gracefully;
496              
497             Stop accepting new connections and wait for already accepted connections to be closed, before stopping the event loop.
498              
499             =head2 stream
500              
501             my $stream = Mojo::IOLoop->stream($id);
502             my $stream = $loop->stream($id);
503             my $id = $loop->stream(Mojo::IOLoop::Stream->new);
504              
505             Get L object for id or turn object into a connection.
506              
507             # Increase inactivity timeout for connection to 300 seconds
508             Mojo::IOLoop->stream($id)->timeout(300);
509              
510             =head2 subprocess
511              
512             my $subprocess = Mojo::IOLoop->subprocess;
513             my $subprocess = $loop->subprocess;
514             my $subprocess = $loop->subprocess(sub ($subprocess) {...}, sub ($subprocess, $err, @results) {...});
515              
516             Build L object to perform computationally expensive operations in subprocesses, without
517             blocking the event loop. Callbacks will be passed along to L.
518              
519             # Operation that would block the event loop for 5 seconds
520             Mojo::IOLoop->subprocess->run_p(sub {
521             sleep 5;
522             return '♥', 'Mojolicious';
523             })->then(sub (@results) {
524             say "I $results[0] $results[1]!";
525             })->catch(sub ($err) {
526             say "Subprocess error: $err";
527             });
528              
529             =head2 timer
530              
531             my $id = Mojo::IOLoop->timer(3 => sub ($loop) {...});
532             my $id = $loop->timer(0 => sub ($loop) {...});
533             my $id = $loop->timer(0.25 => sub ($loop) {...});
534              
535             Create a new timer, invoking the callback after a given amount of time in seconds.
536              
537             # Perform operation in 5 seconds
538             Mojo::IOLoop->timer(5 => sub ($loop) {...});
539              
540             =head1 DEBUGGING
541              
542             You can set the C environment variable to get some advanced diagnostics information printed to
543             C.
544              
545             MOJO_IOLOOP_DEBUG=1
546              
547             =head1 SEE ALSO
548              
549             L, L, L.
550              
551             =cut