line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Mojo::IOLoop; |
2
|
63
|
|
|
63
|
|
351761
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
63
|
|
|
|
|
150
|
|
|
63
|
|
|
|
|
392
|
|
3
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
# "Professor: Amy, technology isn't intrinsically good or evil. It's how it's |
5
|
|
|
|
|
|
|
# used. Like the death ray." |
6
|
63
|
|
|
63
|
|
436
|
use Carp qw(croak); |
|
63
|
|
|
|
|
147
|
|
|
63
|
|
|
|
|
3128
|
|
7
|
63
|
|
|
63
|
|
28398
|
use Mojo::IOLoop::Client; |
|
63
|
|
|
|
|
241
|
|
|
63
|
|
|
|
|
447
|
|
8
|
63
|
|
|
63
|
|
28070
|
use Mojo::IOLoop::Server; |
|
63
|
|
|
|
|
203
|
|
|
63
|
|
|
|
|
958
|
|
9
|
63
|
|
|
63
|
|
29087
|
use Mojo::IOLoop::Stream; |
|
63
|
|
|
|
|
192
|
|
|
63
|
|
|
|
|
694
|
|
10
|
63
|
|
|
63
|
|
28698
|
use Mojo::IOLoop::Subprocess; |
|
63
|
|
|
|
|
224
|
|
|
63
|
|
|
|
|
646
|
|
11
|
63
|
|
|
63
|
|
28417
|
use Mojo::Reactor::Poll; |
|
63
|
|
|
|
|
213
|
|
|
63
|
|
|
|
|
726
|
|
12
|
63
|
|
|
63
|
|
431
|
use Mojo::Util qw(md5_sum steady_time); |
|
63
|
|
|
|
|
161
|
|
|
63
|
|
|
|
|
3257
|
|
13
|
63
|
|
|
63
|
|
388
|
use Scalar::Util qw(blessed weaken); |
|
63
|
|
|
|
|
136
|
|
|
63
|
|
|
|
|
3837
|
|
14
|
|
|
|
|
|
|
|
15
|
63
|
|
50
|
63
|
|
438
|
use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0; |
|
63
|
|
|
|
|
142
|
|
|
63
|
|
|
|
|
171251
|
|
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
has max_accepts => 0; |
18
|
|
|
|
|
|
|
has max_connections => 1000; |
19
|
|
|
|
|
|
|
has reactor => sub { |
20
|
|
|
|
|
|
|
my $class = Mojo::Reactor::Poll->detect; |
21
|
|
|
|
|
|
|
warn "-- Reactor initialized ($class)\n" if DEBUG; |
22
|
|
|
|
|
|
|
return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" }); |
23
|
|
|
|
|
|
|
}; |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
# Ignore PIPE signal |
26
|
|
|
|
|
|
|
$SIG{PIPE} = 'IGNORE'; |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
# Initialize singleton reactor early |
29
|
|
|
|
|
|
|
__PACKAGE__->singleton->reactor; |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
sub acceptor { |
32
|
305
|
|
|
305
|
1
|
4797
|
my ($self, $acceptor) = (_instance(shift), @_); |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# Find acceptor for id |
35
|
305
|
100
|
|
|
|
1468
|
return $self->{acceptors}{$acceptor} unless ref $acceptor; |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
# Connect acceptor with reactor |
38
|
147
|
|
|
|
|
552
|
$self->{acceptors}{my $id = $self->_id} = $acceptor->reactor($self->reactor); |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# Allow new acceptor to get picked up |
41
|
147
|
|
|
|
|
545
|
$self->_not_accepting->_maybe_accepting; |
42
|
|
|
|
|
|
|
|
43
|
147
|
|
|
|
|
613
|
return $id; |
44
|
|
|
|
|
|
|
} |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
sub client { |
47
|
196
|
|
|
196
|
1
|
1174
|
my ($self, $cb) = (_instance(shift), pop); |
48
|
|
|
|
|
|
|
|
49
|
196
|
|
|
|
|
635
|
my $id = $self->_id; |
50
|
196
|
|
|
|
|
952
|
my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new(reactor => $self->reactor); |
51
|
|
|
|
|
|
|
|
52
|
196
|
|
|
|
|
694
|
weaken $self; |
53
|
|
|
|
|
|
|
$client->on( |
54
|
|
|
|
|
|
|
connect => sub { |
55
|
191
|
|
|
191
|
|
580
|
delete $self->{out}{$id}{client}; |
56
|
191
|
|
|
|
|
1356
|
my $stream = Mojo::IOLoop::Stream->new(pop); |
57
|
191
|
|
|
|
|
899
|
$self->_stream($stream => $id); |
58
|
191
|
|
|
|
|
780
|
$self->$cb(undef, $stream); |
59
|
|
|
|
|
|
|
} |
60
|
196
|
|
|
|
|
1489
|
); |
61
|
196
|
|
|
3
|
|
1343
|
$client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) }); |
|
3
|
|
|
|
|
11
|
|
|
3
|
|
|
|
|
10
|
|
62
|
196
|
|
|
|
|
1123
|
$client->connect(@_); |
63
|
|
|
|
|
|
|
|
64
|
196
|
|
|
|
|
1191
|
return $id; |
65
|
|
|
|
|
|
|
} |
66
|
|
|
|
|
|
|
|
67
|
3255
|
|
|
3255
|
1
|
6633
|
sub is_running { _instance(shift)->reactor->is_running } |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
sub next_tick { |
70
|
1797
|
|
|
1797
|
1
|
10403
|
my ($self, $cb) = (_instance(shift), @_); |
71
|
1797
|
|
|
|
|
5382
|
weaken $self; |
72
|
1797
|
|
|
1797
|
|
4503
|
return $self->reactor->next_tick(sub { $self->$cb }); |
|
1797
|
|
|
|
|
4522
|
|
73
|
|
|
|
|
|
|
} |
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
sub one_tick { |
76
|
1257
|
|
|
1257
|
1
|
3918
|
my $self = _instance(shift); |
77
|
1257
|
100
|
|
|
|
3121
|
croak 'Mojo::IOLoop already running' if $self->is_running; |
78
|
1256
|
|
|
|
|
3541
|
$self->reactor->one_tick; |
79
|
|
|
|
|
|
|
} |
80
|
|
|
|
|
|
|
|
81
|
7
|
|
|
7
|
1
|
510
|
sub recurring { shift->_timer(recurring => @_) } |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
sub remove { |
84
|
442
|
|
|
442
|
1
|
1177
|
my ($self, $id) = (_instance(shift), @_); |
85
|
442
|
|
100
|
|
|
1959
|
my $c = $self->{in}{$id} || $self->{out}{$id}; |
86
|
442
|
100
|
100
|
|
|
1835
|
if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully } |
|
283
|
|
|
|
|
1061
|
|
87
|
159
|
|
|
|
|
525
|
$self->_remove($id); |
88
|
|
|
|
|
|
|
} |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
sub reset { |
91
|
4
|
|
100
|
4
|
1
|
23
|
my ($self, $options) = (_instance(shift), shift // {}); |
92
|
|
|
|
|
|
|
|
93
|
4
|
|
|
|
|
19
|
$self->emit('reset')->stop; |
94
|
4
|
100
|
|
|
|
15
|
if ($options->{freeze}) { |
95
|
1
|
|
|
|
|
2
|
state @frozen; |
96
|
1
|
|
|
|
|
8
|
push @frozen, {%$self}; |
97
|
1
|
|
|
|
|
5
|
delete $self->{reactor}; |
98
|
|
|
|
|
|
|
} |
99
|
3
|
|
|
|
|
8
|
else { $self->reactor->reset } |
100
|
|
|
|
|
|
|
|
101
|
4
|
|
|
|
|
25
|
delete @$self{qw(accepting acceptors events in out stop)}; |
102
|
|
|
|
|
|
|
} |
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
sub server { |
105
|
147
|
|
|
147
|
1
|
4853
|
my ($self, $cb) = (_instance(shift), pop); |
106
|
|
|
|
|
|
|
|
107
|
147
|
|
|
|
|
1300
|
my $server = Mojo::IOLoop::Server->new; |
108
|
147
|
|
|
|
|
596
|
weaken $self; |
109
|
|
|
|
|
|
|
$server->on( |
110
|
|
|
|
|
|
|
accept => sub { |
111
|
194
|
|
|
194
|
|
1424
|
my $stream = Mojo::IOLoop::Stream->new(pop); |
112
|
194
|
|
|
|
|
754
|
$self->$cb($stream, $self->_stream($stream, $self->_id, 1)); |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
# Enforce connection limit (randomize to improve load balancing) |
115
|
194
|
100
|
|
|
|
839
|
if (my $max = $self->max_accepts) { |
116
|
1
|
|
33
|
|
|
14
|
$self->{accepts} //= $max - int rand $max / 2; |
117
|
1
|
50
|
|
|
|
4
|
$self->stop_gracefully if ($self->{accepts} -= 1) <= 0; |
118
|
|
|
|
|
|
|
} |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
# Stop accepting if connection limit has been reached |
121
|
194
|
100
|
|
|
|
609
|
$self->_not_accepting if $self->_limit; |
122
|
|
|
|
|
|
|
} |
123
|
147
|
|
|
|
|
1253
|
); |
124
|
147
|
|
|
|
|
823
|
$server->listen(@_); |
125
|
|
|
|
|
|
|
|
126
|
146
|
|
|
|
|
630
|
return $self->acceptor($server); |
127
|
|
|
|
|
|
|
} |
128
|
|
|
|
|
|
|
|
129
|
1336
|
|
|
1336
|
1
|
17139
|
sub singleton { state $loop = shift->new } |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
sub start { |
132
|
1001
|
|
|
1001
|
1
|
7588
|
my $self = _instance(shift); |
133
|
1001
|
100
|
|
|
|
2649
|
croak 'Mojo::IOLoop already running' if $self->is_running; |
134
|
998
|
|
|
|
|
2831
|
$self->reactor->start; |
135
|
|
|
|
|
|
|
} |
136
|
|
|
|
|
|
|
|
137
|
995
|
|
|
995
|
1
|
2780
|
sub stop { _instance(shift)->reactor->stop } |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
sub stop_gracefully { |
140
|
3
|
|
|
3
|
1
|
20
|
my $self = _instance(shift)->_not_accepting; |
141
|
3
|
100
|
66
|
|
|
18
|
++$self->{stop} and !$self->emit('finish')->_in and $self->stop; |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
sub stream { |
145
|
7124
|
|
|
7124
|
1
|
15912
|
my ($self, $stream) = (_instance(shift), @_); |
146
|
7124
|
100
|
|
|
|
16172
|
return $self->_stream($stream => $self->_id) if ref $stream; |
147
|
7123
|
|
100
|
|
|
28427
|
my $c = $self->{in}{$stream} || $self->{out}{$stream} // {}; |
|
|
|
100
|
|
|
|
|
148
|
7123
|
|
|
|
|
34627
|
return $c->{stream}; |
149
|
|
|
|
|
|
|
} |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
sub subprocess { |
152
|
0
|
|
|
0
|
1
|
0
|
my $subprocess = Mojo::IOLoop::Subprocess->new(ioloop => _instance(shift)); |
153
|
0
|
0
|
|
|
|
0
|
return @_ ? $subprocess->run(@_) : $subprocess; |
154
|
|
|
|
|
|
|
} |
155
|
|
|
|
|
|
|
|
156
|
53
|
|
|
53
|
1
|
391
|
sub timer { shift->_timer(timer => @_) } |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
sub _id { |
159
|
538
|
|
|
538
|
|
951
|
my $self = shift; |
160
|
538
|
|
|
|
|
816
|
my $id; |
161
|
538
|
|
33
|
|
|
842
|
do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id}; |
|
538
|
|
33
|
|
|
1787
|
|
162
|
538
|
|
|
|
|
12996
|
return $id; |
163
|
|
|
|
|
|
|
} |
164
|
|
|
|
|
|
|
|
165
|
458
|
|
100
|
458
|
|
713
|
sub _in { scalar keys %{shift->{in} // {}} } |
|
458
|
|
|
|
|
2330
|
|
166
|
|
|
|
|
|
|
|
167
|
16586
|
100
|
|
16586
|
|
48128
|
sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton } |
168
|
|
|
|
|
|
|
|
169
|
454
|
100
|
|
454
|
|
1687
|
sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections } |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
sub _maybe_accepting { |
172
|
596
|
|
|
596
|
|
969
|
my $self = shift; |
173
|
596
|
100
|
100
|
|
|
2096
|
return if $self->{accepting} || $self->_limit; |
174
|
258
|
|
50
|
|
|
563
|
$_->start for values %{$self->{acceptors} // {}}; |
|
258
|
|
|
|
|
1484
|
|
175
|
258
|
|
|
|
|
1096
|
$self->{accepting} = 1; |
176
|
|
|
|
|
|
|
} |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
sub _not_accepting { |
179
|
263
|
|
|
263
|
|
491
|
my $self = shift; |
180
|
263
|
100
|
|
|
|
1053
|
return $self unless delete $self->{accepting}; |
181
|
200
|
|
50
|
|
|
413
|
$_->stop for values %{$self->{acceptors} // {}}; |
|
200
|
|
|
|
|
1345
|
|
182
|
200
|
|
|
|
|
631
|
return $self; |
183
|
|
|
|
|
|
|
} |
184
|
|
|
|
|
|
|
|
185
|
0
|
|
0
|
0
|
|
0
|
sub _out { scalar keys %{shift->{out} // {}} } |
|
0
|
|
|
|
|
0
|
|
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
sub _remove { |
188
|
500
|
|
|
500
|
|
1059
|
my ($self, $id) = @_; |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
# Timer |
191
|
500
|
50
|
|
|
|
4787
|
return undef unless my $reactor = $self->reactor; |
192
|
500
|
100
|
|
|
|
1839
|
return undef if $reactor->remove($id); |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
# Acceptor |
195
|
475
|
100
|
|
|
|
2360
|
return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id}; |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
# Connection |
198
|
365
|
100
|
100
|
|
|
1863
|
return undef unless delete $self->{in}{$id} || delete $self->{out}{$id}; |
199
|
341
|
100
|
100
|
|
|
1247
|
return $self->stop if $self->{stop} && !$self->_in; |
200
|
339
|
|
|
|
|
1028
|
$self->_maybe_accepting; |
201
|
339
|
|
|
|
|
853
|
warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG; |
202
|
|
|
|
|
|
|
} |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
sub _stream { |
205
|
386
|
|
|
386
|
|
1195
|
my ($self, $stream, $id, $server) = @_; |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
# Connect stream with reactor |
208
|
386
|
100
|
|
|
|
1126
|
$self->{$server ? 'in' : 'out'}{$id}{stream} = $stream->reactor($self->reactor); |
209
|
386
|
|
|
|
|
665
|
warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG; |
210
|
386
|
|
|
|
|
1066
|
weaken $self; |
211
|
386
|
50
|
|
338
|
|
2260
|
$stream->on(close => sub { $self && $self->_remove($id) }); |
|
338
|
|
|
|
|
1744
|
|
212
|
386
|
|
|
|
|
1589
|
$stream->start; |
213
|
|
|
|
|
|
|
|
214
|
386
|
|
|
|
|
1379
|
return $id; |
215
|
|
|
|
|
|
|
} |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
sub _timer { |
218
|
60
|
|
|
60
|
|
178
|
my ($self, $method, $after, $cb) = (_instance(shift), @_); |
219
|
60
|
|
|
|
|
217
|
weaken $self; |
220
|
60
|
|
|
24254
|
|
201
|
return $self->reactor->$method($after => sub { $self->$cb }); |
|
24254
|
|
|
|
|
40515
|
|
221
|
|
|
|
|
|
|
} |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
1; |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
=encoding utf8 |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=head1 NAME |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
Mojo::IOLoop - Minimalistic event loop |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
=head1 SYNOPSIS |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
use Mojo::IOLoop; |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
# Listen on port 3000 |
236
|
|
|
|
|
|
|
Mojo::IOLoop->server({port => 3000} => sub ($loop, $stream, $id) { |
237
|
|
|
|
|
|
|
$stream->on(read => sub ($stream, $bytes) { |
238
|
|
|
|
|
|
|
# Process input chunk |
239
|
|
|
|
|
|
|
say $bytes; |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
# Write response |
242
|
|
|
|
|
|
|
$stream->write('HTTP/1.1 200 OK'); |
243
|
|
|
|
|
|
|
}); |
244
|
|
|
|
|
|
|
}); |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
# Connect to port 3000 |
247
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->client({port => 3000} => sub ($loop, $err, $stream) { |
248
|
|
|
|
|
|
|
$stream->on(read => sub ($stream, $bytes) { |
249
|
|
|
|
|
|
|
# Process input |
250
|
|
|
|
|
|
|
say "Input: $bytes"; |
251
|
|
|
|
|
|
|
}); |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
# Write request |
254
|
|
|
|
|
|
|
$stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a"); |
255
|
|
|
|
|
|
|
}); |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
# Add a timer |
258
|
|
|
|
|
|
|
Mojo::IOLoop->timer(5 => sub ($loop) { $loop->remove($id) }); |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
# Start event loop if necessary |
261
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=head1 DESCRIPTION |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it has been reduced to the absolute |
266
|
|
|
|
|
|
|
minimal feature set required to build solid and scalable non-blocking clients and servers. |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and |
269
|
|
|
|
|
|
|
need to be tuned for better scalability. The C<LIBEV_FLAGS> environment variable should also be used to select the best |
270
|
|
|
|
|
|
|
possible L<EV> backend, which usually defaults to the not very scalable C<select>. |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
LIBEV_FLAGS=1 # select |
273
|
|
|
|
|
|
|
LIBEV_FLAGS=2 # poll |
274
|
|
|
|
|
|
|
LIBEV_FLAGS=4 # epoll (Linux) |
275
|
|
|
|
|
|
|
LIBEV_FLAGS=8 # kqueue (*BSD, OS X) |
276
|
|
|
|
|
|
|
LIBEV_FLAGS=64 # Linux AIO |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
The event loop will be resilient to time jumps if a monotonic clock is available through L<Time::HiRes>. A TLS |
279
|
|
|
|
|
|
|
certificate and key are also built right in, to make writing test servers as easy as possible. Also note that for |
280
|
|
|
|
|
|
|
convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is loaded. |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the |
283
|
|
|
|
|
|
|
optional modules L<EV> (4.32+), L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL> |
284
|
|
|
|
|
|
|
(2.009+) will be used automatically if possible. Individual features can also be disabled with the C<MOJO_NO_NNR>, |
285
|
|
|
|
|
|
|
C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment variables. |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more. |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=head1 EVENTS |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones. |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
=head2 finish |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
$loop->on(finish => sub ($loop) {...}); |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be |
298
|
|
|
|
|
|
|
closed. |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
=head2 reset |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
$loop->on(reset => sub ($loop) {...}); |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
Emitted when the event loop is reset, this usually happens after the process is forked to clean up resources that |
305
|
|
|
|
|
|
|
cannot be shared. |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
L<Mojo::IOLoop> implements the following attributes. |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
=head2 max_accepts |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
my $max = $loop->max_accepts; |
314
|
|
|
|
|
|
|
$loop = $loop->max_accepts(1000); |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
The maximum number of connections this event loop is allowed to accept, before shutting down gracefully without |
317
|
|
|
|
|
|
|
interrupting existing connections, defaults to C<0>. Setting the value to C<0> will allow this event loop to accept new |
318
|
|
|
|
|
|
|
connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing |
319
|
|
|
|
|
|
|
between multiple server processes, and to make sure that not all of them restart at the same time. |
320
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
=head2 max_connections |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
my $max = $loop->max_connections; |
324
|
|
|
|
|
|
|
$loop = $loop->max_connections(100); |
325
|
|
|
|
|
|
|
|
326
|
|
|
|
|
|
|
The maximum number of accepted connections this event loop is allowed to handle concurrently, before stopping to accept |
327
|
|
|
|
|
|
|
new incoming connections, defaults to C<1000>. |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
=head2 reactor |
330
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
my $reactor = $loop->reactor; |
332
|
|
|
|
|
|
|
$loop = $loop->reactor(Mojo::Reactor->new); |
333
|
|
|
|
|
|
|
|
334
|
|
|
|
|
|
|
Low-level event reactor, usually a L<Mojo::Reactor::Poll> or L<Mojo::Reactor::EV> object with a default subscriber to |
335
|
|
|
|
|
|
|
the event L<Mojo::Reactor/"error">. |
336
|
|
|
|
|
|
|
|
337
|
|
|
|
|
|
|
# Watch if handle becomes readable or writable |
338
|
|
|
|
|
|
|
Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) { |
339
|
|
|
|
|
|
|
say $writable ? 'Handle is writable' : 'Handle is readable'; |
340
|
|
|
|
|
|
|
}); |
341
|
|
|
|
|
|
|
|
342
|
|
|
|
|
|
|
# Change to watching only if handle becomes writable |
343
|
|
|
|
|
|
|
Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1); |
344
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
# Remove handle again |
346
|
|
|
|
|
|
|
Mojo::IOLoop->singleton->reactor->remove($handle); |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
=head1 METHODS |
349
|
|
|
|
|
|
|
|
350
|
|
|
|
|
|
|
L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones. |
351
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
=head2 acceptor |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
my $server = Mojo::IOLoop->acceptor($id); |
355
|
|
|
|
|
|
|
my $server = $loop->acceptor($id); |
356
|
|
|
|
|
|
|
my $id = $loop->acceptor(Mojo::IOLoop::Server->new); |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor. |
359
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
=head2 client |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...}); |
363
|
|
|
|
|
|
|
my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...}); |
364
|
|
|
|
|
|
|
my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...}); |
365
|
|
|
|
|
|
|
|
366
|
|
|
|
|
|
|
Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and create a stream object (usually |
367
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Client/"connect">. |
368
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
=head2 is_running |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
my $bool = Mojo::IOLoop->is_running; |
372
|
|
|
|
|
|
|
my $bool = $loop->is_running; |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
Check if event loop is running. |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
=head2 next_tick |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
my $undef = Mojo::IOLoop->next_tick(sub ($loop) {...}); |
379
|
|
|
|
|
|
|
my $undef = $loop->next_tick(sub ($loop) {...}); |
380
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this |
382
|
|
|
|
|
|
|
method, always returns C<undef>. |
383
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
# Perform operation on next reactor tick |
385
|
|
|
|
|
|
|
Mojo::IOLoop->next_tick(sub ($loop) {...}); |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
=head2 one_tick |
388
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
Mojo::IOLoop->one_tick; |
390
|
|
|
|
|
|
|
$loop->one_tick; |
391
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
Run event loop until an event occurs. |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
# Don't block longer than 0.5 seconds |
395
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->timer(0.5 => sub ($loop) {}); |
396
|
|
|
|
|
|
|
Mojo::IOLoop->one_tick; |
397
|
|
|
|
|
|
|
Mojo::IOLoop->remove($id); |
398
|
|
|
|
|
|
|
|
399
|
|
|
|
|
|
|
=head2 recurring |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->recurring(3 => sub ($loop) {...}); |
402
|
|
|
|
|
|
|
my $id = $loop->recurring(0 => sub ($loop) {...}); |
403
|
|
|
|
|
|
|
my $id = $loop->recurring(0.25 => sub ($loop) {...}); |
404
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds. |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
# Perform operation every 5 seconds |
408
|
|
|
|
|
|
|
Mojo::IOLoop->recurring(5 => sub ($loop) {...}); |
409
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
=head2 remove |
411
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
Mojo::IOLoop->remove($id); |
413
|
|
|
|
|
|
|
$loop->remove($id); |
414
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their |
416
|
|
|
|
|
|
|
write buffers. |
417
|
|
|
|
|
|
|
|
418
|
|
|
|
|
|
|
=head2 reset |
419
|
|
|
|
|
|
|
|
420
|
|
|
|
|
|
|
Mojo::IOLoop->reset; |
421
|
|
|
|
|
|
|
$loop->reset; |
422
|
|
|
|
|
|
|
$loop->reset({freeze => 1}); |
423
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
Remove everything and stop the event loop. |
425
|
|
|
|
|
|
|
|
426
|
|
|
|
|
|
|
These options are currently available: |
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
=over 2 |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
=item freeze |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
freeze => 1 |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
Freeze the current state of the event loop in time before resetting it. This will prevent active connections from |
435
|
|
|
|
|
|
|
getting closed immediately, which can help with many unintended side effects when processes are forked. Note that this |
436
|
|
|
|
|
|
|
option is B<EXPERIMENTAL> and might change without warning! |
437
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
=back |
439
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
=head2 server |
441
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->server(port => 3000, sub {...}); |
443
|
|
|
|
|
|
|
my $id = $loop->server(port => 3000, sub {...}); |
444
|
|
|
|
|
|
|
my $id = $loop->server({port => 3000} => sub {...}); |
445
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server> and create stream objects (usually |
447
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Server/"listen">. |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
# Listen on random port |
450
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub ($loop, $stream, $id) {...}); |
451
|
|
|
|
|
|
|
my $port = Mojo::IOLoop->acceptor($id)->port; |
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=head2 singleton |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
my $loop = Mojo::IOLoop->singleton; |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
The global L<Mojo::IOLoop> singleton, used to access a single shared event loop object from everywhere inside the |
458
|
|
|
|
|
|
|
process. |
459
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
# Many methods also allow you to take shortcuts |
461
|
|
|
|
|
|
|
Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop }); |
462
|
|
|
|
|
|
|
Mojo::IOLoop->start; |
463
|
|
|
|
|
|
|
|
464
|
|
|
|
|
|
|
# Restart active timer |
465
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' }); |
466
|
|
|
|
|
|
|
Mojo::IOLoop->singleton->reactor->again($id); |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
# Turn file descriptor into handle and watch if it becomes readable |
469
|
|
|
|
|
|
|
my $handle = IO::Handle->new_from_fd($fd, 'r'); |
470
|
|
|
|
|
|
|
Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) { |
471
|
|
|
|
|
|
|
say $writable ? 'Handle is writable' : 'Handle is readable'; |
472
|
|
|
|
|
|
|
})->watch($handle, 1, 0); |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
=head2 start |
475
|
|
|
|
|
|
|
|
476
|
|
|
|
|
|
|
Mojo::IOLoop->start; |
477
|
|
|
|
|
|
|
$loop->start; |
478
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
Start the event loop, this will block until L</"stop"> is called. Note that some reactors stop automatically if there |
480
|
|
|
|
|
|
|
are no events being watched anymore. |
481
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
# Start event loop only if it is not running already |
483
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
=head2 stop |
486
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
Mojo::IOLoop->stop; |
488
|
|
|
|
|
|
|
$loop->stop; |
489
|
|
|
|
|
|
|
|
490
|
|
|
|
|
|
|
Stop the event loop, this will not interrupt any existing connections and the event loop can be restarted by running |
491
|
|
|
|
|
|
|
L</"start"> again. |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
=head2 stop_gracefully |
494
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
Mojo::IOLoop->stop_gracefully; |
496
|
|
|
|
|
|
|
$loop->stop_gracefully; |
497
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
Stop accepting new connections and wait for already accepted connections to be closed, before stopping the event loop. |
499
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
=head2 stream |
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
my $stream = Mojo::IOLoop->stream($id); |
503
|
|
|
|
|
|
|
my $stream = $loop->stream($id); |
504
|
|
|
|
|
|
|
my $id = $loop->stream(Mojo::IOLoop::Stream->new); |
505
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection. |
507
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
# Increase inactivity timeout for connection to 300 seconds |
509
|
|
|
|
|
|
|
Mojo::IOLoop->stream($id)->timeout(300); |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
=head2 subprocess |
512
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
my $subprocess = Mojo::IOLoop->subprocess; |
514
|
|
|
|
|
|
|
my $subprocess = $loop->subprocess; |
515
|
|
|
|
|
|
|
my $subprocess = $loop->subprocess(sub ($subprocess) {...}, sub ($subprocess, $err, @results) {...}); |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive operations in subprocesses, without |
518
|
|
|
|
|
|
|
blocking the event loop. Callbacks will be passed along to L<Mojo::IOLoop::Subprocess/"run">. |
519
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
# Operation that would block the event loop for 5 seconds |
521
|
|
|
|
|
|
|
Mojo::IOLoop->subprocess->run_p(sub { |
522
|
|
|
|
|
|
|
sleep 5; |
523
|
|
|
|
|
|
|
return '♥', 'Mojolicious'; |
524
|
|
|
|
|
|
|
})->then(sub (@results) { |
525
|
|
|
|
|
|
|
say "I $results[0] $results[1]!"; |
526
|
|
|
|
|
|
|
})->catch(sub ($err) { |
527
|
|
|
|
|
|
|
say "Subprocess error: $err"; |
528
|
|
|
|
|
|
|
}); |
529
|
|
|
|
|
|
|
|
530
|
|
|
|
|
|
|
=head2 timer |
531
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
my $id = Mojo::IOLoop->timer(3 => sub ($loop) {...}); |
533
|
|
|
|
|
|
|
my $id = $loop->timer(0 => sub ($loop) {...}); |
534
|
|
|
|
|
|
|
my $id = $loop->timer(0.25 => sub ($loop) {...}); |
535
|
|
|
|
|
|
|
|
536
|
|
|
|
|
|
|
Create a new timer, invoking the callback after a given amount of time in seconds. |
537
|
|
|
|
|
|
|
|
538
|
|
|
|
|
|
|
# Perform operation in 5 seconds |
539
|
|
|
|
|
|
|
Mojo::IOLoop->timer(5 => sub ($loop) {...}); |
540
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
=head1 DEBUGGING |
542
|
|
|
|
|
|
|
|
543
|
|
|
|
|
|
|
You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced diagnostics information printed to |
544
|
|
|
|
|
|
|
C<STDERR>. |
545
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
MOJO_IOLOOP_DEBUG=1 |
547
|
|
|
|
|
|
|
|
548
|
|
|
|
|
|
|
=head1 SEE ALSO |
549
|
|
|
|
|
|
|
|
550
|
|
|
|
|
|
|
L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>. |
551
|
|
|
|
|
|
|
|
552
|
|
|
|
|
|
|
=cut |