File Coverage

blib/lib/Mojo/IOLoop/Stream.pm
Criterion Covered Total %
statement 81 83 97.5
branch 50 62 80.6
condition 8 20 40.0
subroutine 26 26 100.0
pod 15 15 100.0
total 180 206 87.3


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::Stream;
2 67     67   486 use Mojo::Base 'Mojo::EventEmitter';
  67         143  
  67         496  
3              
4 67     67   502 use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
  67         145  
  67         4755  
5 67     67   455 use Mojo::IOLoop;
  67         174  
  67         474  
6 67     67   432 use Mojo::Util;
  67         164  
  67         3935  
7 67     67   400 use Scalar::Util qw(weaken);
  67         156  
  67         148090  
8              
9             has high_water_mark => 1048576;
10             has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;
11              
12 405 50   405   8835 sub DESTROY { shift->close unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
13              
14 2 100   2 1 34 sub bytes_read { shift->{read} || 0 }
15              
16 17   100 17 1 127 sub bytes_waiting { length(shift->{buffer} // '') }
17              
18 2 100   2 1 16 sub bytes_written { shift->{written} || 0 }
19              
20 14 100   14 1 103 sub can_write { $_[0]{handle} && $_[0]->bytes_waiting < $_[0]->high_water_mark }
21              
22             sub close {
23 804     804 1 1633 my $self = shift;
24 804 100       2759 return unless my $reactor = $self->reactor;
25 801 100       2631 return unless my $handle = delete $self->timeout(0)->{handle};
26 402         1490 $reactor->remove($handle);
27 402         2027 $self->emit('close');
28             }
29              
30 325 100   325 1 1388 sub close_gracefully { $_[0]->is_writing ? $_[0]{graceful}++ : $_[0]->close }
31              
32 2030     2030 1 6121 sub handle { shift->{handle} }
33              
34             sub is_readable {
35 796     796 1 2996 my $self = shift;
36 796         3558 $self->_again;
37 796   33     11903 return $self->{handle} && Mojo::Util::_readable(0, fileno $self->{handle});
38             }
39              
40             sub is_writing {
41 6749     6749 1 12207 my $self = shift;
42 6749 100       19007 return undef unless $self->{handle};
43 6702   100     34311 return !!length($self->{buffer}) || $self->has_subscribers('drain');
44             }
45              
46 471     471 1 24428 sub new { shift->SUPER::new(handle => shift, timeout => 15) }
47              
48             sub start {
49 471     471 1 910 my $self = shift;
50              
51             # Resume
52 471 50       1489 return unless $self->{handle};
53 471         1230 my $reactor = $self->reactor;
54 471 100       1407 return $reactor->watch($self->{handle}, 1, $self->is_writing) if delete $self->{paused};
55              
56 470         990 weaken $self;
57 470 100   10808   1912 my $cb = sub { pop() ? $self->_write : $self->_read };
  10808         47034  
58 470         1740 $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
59             }
60              
61             sub steal_handle {
62 1     1 1 9 my $self = shift;
63 1         4 $self->reactor->remove($self->{handle});
64 1         7 return delete $self->{handle};
65             }
66              
67 1 50 33 1 1 21 sub stop { $_[0]->reactor->watch($_[0]{handle}, 0, $_[0]->is_writing) if $_[0]{handle} && !$_[0]{paused}++ }
68              
69             sub timeout {
70 4425     4425 1 12430 my ($self, $timeout) = @_;
71              
72 4425 100       11291 return $self->{timeout} unless defined $timeout;
73 4416         11116 $self->{timeout} = $timeout;
74              
75 4416         11833 my $reactor = $self->reactor;
76 4416 100       13208 if ($self->{timer}) {
    100          
77 3538 100       10009 if (!$self->{timeout}) { $reactor->remove(delete $self->{timer}) }
  398         1824  
78 3140         13162 else { $reactor->again($self->{timer}, $self->{timeout}) }
79             }
80             elsif ($self->{timeout}) {
81 471         1081 weaken $self;
82             $self->{timer}
83 471 50 33 6   2924 = $reactor->timer($timeout => sub { $self and delete($self->{timer}) and $self->emit('timeout')->close });
  6         172  
84             }
85              
86 4416         95279 return $self;
87             }
88              
89             sub write {
90 6606     6606 1 25193 my ($self, $chunk, $cb) = @_;
91              
92             # IO::Socket::SSL will corrupt data with the wrong internal representation
93 6606         22113 utf8::downgrade $chunk;
94 6606         19769 $self->{buffer} .= $chunk;
95 6606 100       15749 if ($cb) { $self->once(drain => $cb) }
  6325 100       21643  
96 208         1054 elsif (!length $self->{buffer}) { return $self }
97 6398 50       31968 $self->reactor->watch($self->{handle}, !$self->{paused}, 1) if $self->{handle};
98              
99 6398         32519 return $self;
100             }
101              
102 10370 100   10370   56645 sub _again { $_[0]->reactor->again($_[0]{timer}) if $_[0]{timer} }
103              
104             sub _read {
105 4387     4387   11139 my $self = shift;
106              
107 4387 50       24207 if (defined(my $read = $self->{handle}->sysread(my $buffer, 131072, 0))) {
108 4387         158223 $self->{read} += $read;
109 4387 100       23365 return $read == 0 ? $self->close : $self->emit(read => $buffer)->_again;
110             }
111              
112             # Retry
113 0 0 0     0 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
114              
115             # Closed (maybe real error)
116 0 0       0 $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
117             }
118              
119             sub _write {
120 6421     6421   11376 my $self = shift;
121              
122             # Handle errors only when reading (to avoid timing problems)
123 6421         16757 my $handle = $self->{handle};
124 6421 100       18855 if (length $self->{buffer}) {
125 5296 50       28569 return undef unless defined(my $written = $handle->syswrite($self->{buffer}));
126 5296         286646 $self->{written} += $written;
127 5296         41800 $self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
128             }
129              
130             # Clear the buffer to free the underlying SV* memory
131 6421 50       80832 undef $self->{buffer}, $self->emit('drain') unless length $self->{buffer};
132 6421 100       22281 return undef if $self->is_writing;
133 2509 100       8320 return $self->close if $self->{graceful};
134 2451 100       13909 $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
135             }
136              
137             1;
138              
139             =encoding utf8
140              
141             =head1 NAME
142              
143             Mojo::IOLoop::Stream - Non-blocking I/O stream
144              
145             =head1 SYNOPSIS
146              
147             use Mojo::IOLoop::Stream;
148              
149             # Create stream
150             my $stream = Mojo::IOLoop::Stream->new($handle);
151             $stream->on(read => sub ($stream, $bytes) {...});
152             $stream->on(close => sub ($stream) {...});
153             $stream->on(error => sub ($stream, $err) {...});
154              
155             # Start and stop watching for new data
156             $stream->start;
157             $stream->stop;
158              
159             # Start reactor if necessary
160             $stream->reactor->start unless $stream->reactor->is_running;
161              
162             =head1 DESCRIPTION
163              
164             L is a container for I/O streams used by L.
165              
166             =head1 EVENTS
167              
168             L inherits all events from L and can emit the following new ones.
169              
170             =head2 close
171              
172             $stream->on(close => sub ($stream) {...});
173              
174             Emitted if the stream gets closed.
175              
176             =head2 drain
177              
178             $stream->on(drain => sub ($stream) {...});
179              
180             Emitted once all data has been written.
181              
182             =head2 error
183              
184             $stream->on(error => sub ($stream, $err) {...});
185              
186             Emitted if an error occurs on the stream, fatal if unhandled.
187              
188             =head2 read
189              
190             $stream->on(read => sub ($stream, $bytes) {...});
191              
192             Emitted if new data arrives on the stream.
193              
194             =head2 timeout
195              
196             $stream->on(timeout => sub ($stream) {...});
197              
198             Emitted if the stream has been inactive for too long and will get closed automatically.
199              
200             =head2 write
201              
202             $stream->on(write => sub ($stream, $bytes) {...});
203              
204             Emitted if new data has been written to the stream.
205              
206             =head1 ATTRIBUTES
207              
208             L implements the following attributes.
209              
210             =head2 high_water_mark
211              
212             my $size = $msg->high_water_mark;
213             $msg = $msg->high_water_mark(1024);
214              
215             Maximum size of L buffer in bytes before L returns false, defaults to C<1048576> (1MiB).
216              
217             =head2 reactor
218              
219             my $reactor = $stream->reactor;
220             $stream = $stream->reactor(Mojo::Reactor::Poll->new);
221              
222             Low-level event reactor, defaults to the C attribute value of the global L singleton. Note that
223             this attribute is weakened.
224              
225             =head1 METHODS
226              
227             L inherits all methods from L and implements the following new ones.
228              
229             =head2 bytes_read
230              
231             my $num = $stream->bytes_read;
232              
233             Number of bytes received.
234              
235             =head2 bytes_waiting
236              
237             my $num = $stream->bytes_waiting;
238              
239             Number of bytes that have been enqueued with L and are waiting to be written.
240              
241             =head2 bytes_written
242              
243             my $num = $stream->bytes_written;
244              
245             Number of bytes written.
246              
247             =head2 can_write
248              
249             my $bool = $stream->can_write;
250              
251             Returns true if calling L is safe.
252              
253             =head2 close
254              
255             $stream->close;
256              
257             Close stream immediately.
258              
259             =head2 close_gracefully
260              
261             $stream->close_gracefully;
262              
263             Close stream gracefully.
264              
265             =head2 handle
266              
267             my $handle = $stream->handle;
268              
269             Get handle for stream, usually an L or L object.
270              
271             =head2 is_readable
272              
273             my $bool = $stream->is_readable;
274              
275             Quick non-blocking check if stream is readable, useful for identifying tainted sockets.
276              
277             =head2 is_writing
278              
279             my $bool = $stream->is_writing;
280              
281             Check if stream is writing.
282              
283             =head2 new
284              
285             my $stream = Mojo::IOLoop::Stream->new($handle);
286              
287             Construct a new L object.
288              
289             =head2 start
290              
291             $stream->start;
292              
293             Start or resume watching for new data on the stream.
294              
295             =head2 steal_handle
296              
297             my $handle = $stream->steal_handle;
298              
299             Steal L and prevent it from getting closed automatically.
300              
301             =head2 stop
302              
303             $stream->stop;
304              
305             Stop watching for new data on the stream.
306              
307             =head2 timeout
308              
309             my $timeout = $stream->timeout;
310             $stream = $stream->timeout(45);
311              
312             Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
313             Setting the value to C<0> will allow this stream to be inactive indefinitely.
314              
315             =head2 write
316              
317             $stream = $stream->write($bytes);
318             $stream = $stream->write($bytes => sub {...});
319              
320             Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
321             data has been written.
322              
323             =head1 SEE ALSO
324              
325             L, L, L.
326              
327             =cut