line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Mojo::IOLoop::Stream; |
2
|
63
|
|
|
63
|
|
505
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
63
|
|
|
|
|
178
|
|
|
63
|
|
|
|
|
453
|
|
3
|
|
|
|
|
|
|
|
4
|
63
|
|
|
63
|
|
457
|
use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK); |
|
63
|
|
|
|
|
206
|
|
|
63
|
|
|
|
|
3251
|
|
5
|
63
|
|
|
63
|
|
387
|
use Mojo::IOLoop; |
|
63
|
|
|
|
|
212
|
|
|
63
|
|
|
|
|
384
|
|
6
|
63
|
|
|
63
|
|
378
|
use Mojo::Util; |
|
63
|
|
|
|
|
167
|
|
|
63
|
|
|
|
|
2723
|
|
7
|
63
|
|
|
63
|
|
477
|
use Scalar::Util qw(weaken); |
|
63
|
|
|
|
|
216
|
|
|
63
|
|
|
|
|
110380
|
|
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
# Size limit in bytes (1MiB) that "can_write" compares the write buffer against
has high_water_mark => 1_048_576;

# Event reactor, lazily taken from the global Mojo::IOLoop singleton and stored
# as a weak reference
has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;
11
|
|
|
|
|
|
|
|
12
|
347
|
50
|
|
347
|
|
5653
|
# Close the stream when the object is destroyed, but leave everything alone
# during global destruction
sub DESTROY {
  my $self = shift;
  return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
  $self->close;
}
13
|
|
|
|
|
|
|
|
14
|
2
|
100
|
|
2
|
1
|
31
|
# Number of bytes received so far, 0 if nothing has been read yet
sub bytes_read {
  my $self = shift;
  return $self->{read} || 0;
}
15
|
|
|
|
|
|
|
|
16
|
17
|
|
100
|
17
|
1
|
137
|
# Number of bytes in the write buffer that have not been written yet
sub bytes_waiting {
  my $self    = shift;
  my $pending = $self->{buffer} // '';
  return length $pending;
}
17
|
|
|
|
|
|
|
|
18
|
2
|
100
|
|
2
|
1
|
13
|
# Number of bytes written so far, 0 if nothing has been written yet
sub bytes_written {
  my $self = shift;
  return $self->{written} || 0;
}
19
|
|
|
|
|
|
|
|
20
|
14
|
100
|
|
14
|
1
|
100
|
# True if the stream still has a handle and the write buffer is below the
# high water mark
sub can_write {
  my $self   = shift;
  my $handle = $self->{handle};
  return $handle unless $handle;
  return $self->bytes_waiting < $self->high_water_mark;
}
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
# Close the stream immediately: disable the timeout, detach the handle from
# the reactor and emit "close". Does nothing if the reactor is gone or the
# stream has no handle anymore.
sub close {
  my $self = shift;

  my $reactor = $self->reactor or return;

  # Setting the timeout to 0 also removes a pending inactivity timer
  my $handle = delete $self->timeout(0)->{handle} or return;

  $reactor->remove($handle);
  return $self->emit('close');
}
29
|
|
|
|
|
|
|
|
30
|
284
|
100
|
|
284
|
1
|
790
|
# Close right away if nothing is being written, otherwise set the "graceful"
# flag so "_write" closes the stream once writing has finished
sub close_gracefully {
  my $self = shift;
  return $self->close unless $self->is_writing;
  return $self->{graceful}++;
}
31
|
|
|
|
|
|
|
|
32
|
1874
|
|
|
1874
|
1
|
4583
|
# Handle of this stream, undef once it has been closed or stolen
sub handle {
  my ($self) = @_;
  return $self->{handle};
}
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# Restart the inactivity timer and check if the handle is readable by passing
# its file descriptor to Mojo::Util::_readable. Returns a false value if the
# stream has no handle.
sub is_readable {
  my $self = shift;

  $self->_again;

  my $handle = $self->{handle};
  return $handle unless $handle;
  return Mojo::Util::_readable(0, fileno $handle);
}
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# True if there is buffered data left to write or somebody is still waiting
# for a "drain" event, undef if the stream has no handle
sub is_writing {
  my $self = shift;

  return undef unless $self->{handle};
  return !!1 if length $self->{buffer};
  return $self->has_subscribers('drain');
}
45
|
|
|
|
|
|
|
|
46
|
388
|
|
|
388
|
1
|
10089
|
# Construct a stream for the given handle with an inactivity timeout of 15
# seconds
sub new {
  my ($class, $handle) = @_;
  return $class->SUPER::new(handle => $handle, timeout => 15);
}
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
# Start or resume watching the handle. A paused stream only gets its read
# watcher re-enabled, otherwise the handle is registered with the reactor
# together with a callback that dispatches to "_write" or "_read".
sub start {
  my $self = shift;

  # Nothing to watch
  return unless $self->{handle};

  # Resume
  my $reactor = $self->reactor;
  if (delete $self->{paused}) {
    return $reactor->watch($self->{handle}, 1, $self->is_writing);
  }

  # The callback must not keep the stream alive
  weaken $self;
  my $dispatch = sub {
    my $writable = pop;
    return $writable ? $self->_write : $self->_read;
  };

  # Setting the timeout again starts the inactivity timer
  return $reactor->io($self->timeout($self->{timeout})->{handle} => $dispatch);
}
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
# Detach the handle from this stream and hand it to the caller. The handle is
# removed from the reactor and from the stream, so a later "close" will not
# find it anymore.
#
# Returns the handle, or undef if the stream has no handle (already closed or
# stolen before).
sub steal_handle {
  my $self = shift;

  # Nothing to steal, and the reactor must not be asked to remove undef
  return undef unless my $handle = delete $self->{handle};

  # The reactor attribute is weakened and may already be gone, "close" guards
  # against this the same way
  if (my $reactor = $self->reactor) { $reactor->remove($handle) }

  return $handle;
}
66
|
|
|
|
|
|
|
|
67
|
1
|
50
|
33
|
1
|
1
|
43
|
# Stop watching for new data, write events stay enabled while the stream is
# still writing. Only the first call on a stream that has a handle reaches the
# reactor, later calls just bump the "paused" counter.
sub stop {
  my $self = shift;
  if ($self->{handle} && !$self->{paused}++) {
    $self->reactor->watch($self->{handle}, 0, $self->is_writing);
  }
}
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
# Get or set the inactivity timeout in seconds.
#
# Without a defined argument the current value is returned. With one, the
# value is stored and the reactor timer is kept in sync: an existing timer is
# restarted with the new value or removed for a false value, and a new timer
# is created if there is none yet. The timer emits "timeout" and closes the
# stream. The setter returns the stream.
sub timeout {
  my ($self, $timeout) = @_;

  # Getter
  return $self->{timeout} unless defined $timeout;

  # Setter
  $self->{timeout} = $timeout;
  my $reactor = $self->reactor;
  my $timer   = $self->{timer};

  if    ($timer && $timeout) { $reactor->again($timer, $timeout) }
  elsif ($timer)             { $reactor->remove(delete $self->{timer}) }
  elsif ($timeout) {

    # The timer callback must not keep the stream alive
    weaken $self;
    $self->{timer} = $reactor->timer(
      $timeout => sub {
        return unless $self && delete $self->{timer};
        $self->emit('timeout')->close;
      }
    );
  }

  return $self;
}
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
# Append a chunk to the write buffer and enable write watching on the handle.
# The optional callback is subscribed once to the "drain" event. Returns the
# stream.
sub write {
  my ($self, $chunk, $cb) = @_;

  # IO::Socket::SSL will corrupt data with the wrong internal representation
  utf8::downgrade $chunk;
  $self->{buffer} .= $chunk;

  # Without a callback and without data there is nothing to wait for
  $self->once(drain => $cb) if $cb;
  return $self unless $cb || length $self->{buffer};

  my $handle = $self->{handle};
  $self->reactor->watch($handle, !$self->{paused}, 1) if $handle;

  return $self;
}
101
|
|
|
|
|
|
|
|
102
|
11973
|
100
|
|
11973
|
|
45384
|
# Restart the inactivity timer, if there is one
sub _again {
  my $self = shift;
  if ($self->{timer}) { $self->reactor->again($self->{timer}) }
}
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
# Read up to 131072 bytes from the handle. Data is counted and emitted as a
# "read" event followed by a timer restart, a read of 0 bytes closes the
# stream. Failed reads are ignored for EAGAIN, EINTR and EWOULDBLOCK, close
# the stream silently for ECONNRESET and emit "error" before closing for
# everything else.
sub _read {
  my $self = shift;

  my $read = $self->{handle}->sysread(my $buffer, 131072, 0);

  unless (defined $read) {

    # Retry
    return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;

    # Closed (maybe real error)
    return $self->close if $! == ECONNRESET;
    return $self->emit(error => $!)->close;
  }

  $self->{read} += $read;
  return $self->close if $read == 0;
  return $self->emit(read => $buffer)->_again;
}
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
# Write as much of the buffer as the handle accepts, emit "write" for the
# written part and "drain" once the buffer is empty. When nothing is left to
# write, the stream is closed if a graceful close was requested, otherwise
# write watching is disabled again.
sub _write {
  my $self = shift;

  # Handle errors only when reading (to avoid timing problems)
  my $handle = $self->{handle};
  if (length $self->{buffer}) {
    my $written = $handle->syswrite($self->{buffer});
    return undef unless defined $written;

    $self->{written} += $written;
    my $chunk = substr $self->{buffer}, 0, $written, '';
    $self->emit(write => $chunk)->_again;
  }

  # Clear the buffer to free the underlying SV* memory
  unless (length $self->{buffer}) {
    undef $self->{buffer};
    $self->emit('drain');
  }

  # Event handlers may have queued more data or closed the stream
  return undef        if $self->is_writing;
  return $self->close if $self->{graceful};
  $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
}
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
1; |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
=encoding utf8 |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
=head1 NAME |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
Mojo::IOLoop::Stream - Non-blocking I/O stream |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
=head1 SYNOPSIS |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
use Mojo::IOLoop::Stream; |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
# Create stream |
150
|
|
|
|
|
|
|
my $stream = Mojo::IOLoop::Stream->new($handle); |
151
|
|
|
|
|
|
|
$stream->on(read => sub ($stream, $bytes) {...}); |
152
|
|
|
|
|
|
|
$stream->on(close => sub ($stream) {...}); |
153
|
|
|
|
|
|
|
$stream->on(error => sub ($stream, $err) {...}); |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
# Start and stop watching for new data |
156
|
|
|
|
|
|
|
$stream->start; |
157
|
|
|
|
|
|
|
$stream->stop; |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
# Start reactor if necessary |
160
|
|
|
|
|
|
|
$stream->reactor->start unless $stream->reactor->is_running; |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
=head1 DESCRIPTION |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>. |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
=head1 EVENTS |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones. |
169
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
=head2 close |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
$stream->on(close => sub ($stream) {...}); |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
Emitted if the stream gets closed. |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
=head2 drain |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
$stream->on(drain => sub ($stream) {...}); |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
Emitted once all data has been written. |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
=head2 error |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
$stream->on(error => sub ($stream, $err) {...}); |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
Emitted if an error occurs on the stream, fatal if unhandled. |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=head2 read |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
$stream->on(read => sub ($stream, $bytes) {...}); |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
Emitted if new data arrives on the stream. |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=head2 timeout |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
$stream->on(timeout => sub ($stream) {...}); |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
Emitted if the stream has been inactive for too long and will get closed automatically. |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=head2 write |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
$stream->on(write => sub ($stream, $bytes) {...}); |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
Emitted if new data has been written to the stream. |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream> implements the following attributes. |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
=head2 high_water_mark |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
my $size = $stream->high_water_mark; |
213
|
|
|
|
|
|
|
$stream = $stream->high_water_mark(1024); |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
Maximum size of L</"write"> buffer in bytes before L</"can_write"> returns false, defaults to C<1048576> (1MiB). |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
=head2 reactor |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
my $reactor = $stream->reactor; |
220
|
|
|
|
|
|
|
$stream = $stream->reactor(Mojo::Reactor::Poll->new); |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
Low-level event reactor, defaults to the C<reactor> attribute value of the global L<Mojo::IOLoop> singleton. Note that |
223
|
|
|
|
|
|
|
this attribute is weakened. |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
=head1 METHODS |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones. |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
=head2 bytes_read |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
my $num = $stream->bytes_read; |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
Number of bytes received. |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=head2 bytes_waiting |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
my $num = $stream->bytes_waiting; |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
Number of bytes that have been enqueued with L</"write"> and are waiting to be written. |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=head2 bytes_written |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
my $num = $stream->bytes_written; |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
Number of bytes written. |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
=head2 can_write |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
my $bool = $stream->can_write; |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
Returns true if calling L</"write"> is safe. |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=head2 close |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
$stream->close; |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
Close stream immediately. |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=head2 close_gracefully |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
$stream->close_gracefully; |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
Close stream gracefully. |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
=head2 handle |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
my $handle = $stream->handle; |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL> object. |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=head2 is_readable |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
my $bool = $stream->is_readable; |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
Quick non-blocking check if stream is readable, useful for identifying tainted sockets. |
276
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
=head2 is_writing |
278
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
my $bool = $stream->is_writing; |
280
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
Check if stream is writing. |
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
=head2 new |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
my $stream = Mojo::IOLoop::Stream->new($handle); |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
Construct a new L<Mojo::IOLoop::Stream> object. |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=head2 start |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
$stream->start; |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
Start or resume watching for new data on the stream. |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
=head2 steal_handle |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
my $handle = $stream->steal_handle; |
298
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
Steal L</"handle"> and prevent it from getting closed automatically. |
300
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
=head2 stop |
302
|
|
|
|
|
|
|
|
303
|
|
|
|
|
|
|
$stream->stop; |
304
|
|
|
|
|
|
|
|
305
|
|
|
|
|
|
|
Stop watching for new data on the stream. |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
=head2 timeout |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
my $timeout = $stream->timeout; |
310
|
|
|
|
|
|
|
$stream = $stream->timeout(45); |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>. |
313
|
|
|
|
|
|
|
Setting the value to C<0> will allow this stream to be inactive indefinitely. |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head2 write |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
$stream = $stream->write($bytes); |
318
|
|
|
|
|
|
|
$stream = $stream->write($bytes => sub {...}); |
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all |
321
|
|
|
|
|
|
|
data has been written. |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
=head1 SEE ALSO |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>. |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
=cut |