line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# You may distribute under the terms of either the GNU General Public License |
2
|
|
|
|
|
|
|
# or the Artistic License (the same terms as Perl itself) |
3
|
|
|
|
|
|
|
# |
4
|
|
|
|
|
|
|
# (C) Paul Evans, 2006-2020 -- leonerd@leonerd.org.uk |
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
package IO::Async::Stream; |
7
|
|
|
|
|
|
|
|
8
|
54
|
|
|
54
|
|
7253
|
use strict; |
|
54
|
|
|
|
|
137
|
|
|
54
|
|
|
|
|
1971
|
|
9
|
54
|
|
|
54
|
|
294
|
use warnings; |
|
54
|
|
|
|
|
116
|
|
|
54
|
|
|
|
|
2943
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
our $VERSION = '0.79'; |
12
|
|
|
|
|
|
|
|
13
|
54
|
|
|
54
|
|
354
|
use base qw( IO::Async::Handle ); |
|
54
|
|
|
|
|
123
|
|
|
54
|
|
|
|
|
31884
|
|
14
|
|
|
|
|
|
|
|
15
|
54
|
|
|
54
|
|
3329
|
use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE ); |
|
54
|
|
|
|
|
7263
|
|
|
54
|
|
|
|
|
4522
|
|
16
|
|
|
|
|
|
|
|
17
|
54
|
|
|
54
|
|
370
|
use Carp; |
|
54
|
|
|
|
|
136
|
|
|
54
|
|
|
|
|
3437
|
|
18
|
|
|
|
|
|
|
|
19
|
54
|
|
|
54
|
|
37871
|
use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL ); |
|
54
|
|
|
|
|
607177
|
|
|
54
|
|
|
|
|
4862
|
|
20
|
54
|
|
|
54
|
|
537
|
use Scalar::Util qw( blessed ); |
|
54
|
|
|
|
|
139
|
|
|
54
|
|
|
|
|
2652
|
|
21
|
|
|
|
|
|
|
|
22
|
54
|
|
|
54
|
|
342
|
use IO::Async::Debug; |
|
54
|
|
|
|
|
164
|
|
|
54
|
|
|
|
|
1403
|
|
23
|
54
|
|
|
54
|
|
4252
|
use IO::Async::Metrics '$METRICS'; |
|
54
|
|
|
|
|
111
|
|
|
54
|
|
|
|
|
429
|
|
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
# Tuneable from outside |
26
|
|
|
|
|
|
|
# Not yet documented |
27
|
|
|
|
|
|
|
our $READLEN = 8192; |
28
|
|
|
|
|
|
|
our $WRITELEN = 8192; |
29
|
|
|
|
|
|
|
|
30
|
54
|
|
|
54
|
|
24315
|
use Struct::Dumb; |
|
54
|
|
|
|
|
96557
|
|
|
54
|
|
|
|
|
308
|
|
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
# Element of the writequeue |
33
|
|
|
|
|
|
|
struct Writer => [qw( data writelen on_write on_flush on_error watching )]; |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
# Element of the readqueue |
36
|
|
|
|
|
|
|
struct Reader => [qw( on_read future )]; |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# Bitfields in the want flags |
39
|
54
|
|
|
54
|
|
5357
|
use constant WANT_READ_FOR_READ => 0x01; |
|
54
|
|
|
|
|
129
|
|
|
54
|
|
|
|
|
3199
|
|
40
|
54
|
|
|
54
|
|
336
|
use constant WANT_READ_FOR_WRITE => 0x02; |
|
54
|
|
|
|
|
119
|
|
|
54
|
|
|
|
|
2548
|
|
41
|
54
|
|
|
54
|
|
336
|
use constant WANT_WRITE_FOR_READ => 0x04; |
|
54
|
|
|
|
|
131
|
|
|
54
|
|
|
|
|
2578
|
|
42
|
54
|
|
|
54
|
|
360
|
use constant WANT_WRITE_FOR_WRITE => 0x08; |
|
54
|
|
|
|
|
113
|
|
|
54
|
|
|
|
|
3049
|
|
43
|
54
|
|
|
54
|
|
360
|
use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE; |
|
54
|
|
|
|
|
124
|
|
|
54
|
|
|
|
|
3150
|
|
44
|
54
|
|
|
54
|
|
338
|
use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE; |
|
54
|
|
|
|
|
101
|
|
|
54
|
|
|
|
|
232259
|
|
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
=head1 NAME |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
C - event callbacks and write bufering for a stream |
49
|
|
|
|
|
|
|
filehandle |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
=head1 SYNOPSIS |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
use IO::Async::Stream; |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
use IO::Async::Loop; |
56
|
|
|
|
|
|
|
my $loop = IO::Async::Loop->new; |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
my $stream = IO::Async::Stream->new( |
59
|
|
|
|
|
|
|
read_handle => \*STDIN, |
60
|
|
|
|
|
|
|
write_handle => \*STDOUT, |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
on_read => sub { |
63
|
|
|
|
|
|
|
my ( $self, $buffref, $eof ) = @_; |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
while( $$buffref =~ s/^(.*\n)// ) { |
66
|
|
|
|
|
|
|
print "Received a line $1"; |
67
|
|
|
|
|
|
|
} |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
if( $eof ) { |
70
|
|
|
|
|
|
|
print "EOF; last partial line is $$buffref\n"; |
71
|
|
|
|
|
|
|
} |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
return 0; |
74
|
|
|
|
|
|
|
} |
75
|
|
|
|
|
|
|
); |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
$loop->add( $stream ); |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
$stream->write( "An initial line here\n" ); |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
=head1 DESCRIPTION |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
This subclass of L contains a filehandle that represents |
84
|
|
|
|
|
|
|
a byte-stream. It provides buffering for both incoming and outgoing data. It |
85
|
|
|
|
|
|
|
invokes the C handler when new data is read from the filehandle. Data |
86
|
|
|
|
|
|
|
may be written to the filehandle by calling the C method. |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
This class is suitable for any kind of filehandle that provides a |
89
|
|
|
|
|
|
|
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or |
90
|
|
|
|
|
|
|
C socket (such as TCP or a byte-oriented UNIX local socket). For |
91
|
|
|
|
|
|
|
datagram or raw message-based sockets (such as UDP) see instead |
92
|
|
|
|
|
|
|
L. |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=cut |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
=head1 EVENTS |
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
The following events are invoked, either using subclass methods or CODE |
99
|
|
|
|
|
|
|
references in parameters: |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
=head2 $ret = on_read \$buffer, $eof |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
Invoked when more data is available in the internal receiving buffer. |
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
The first argument is a reference to a plain perl string. The code should |
106
|
|
|
|
|
|
|
inspect and remove any data it likes, but is not required to remove all, or |
107
|
|
|
|
|
|
|
indeed any of the data. Any data remaining in the buffer will be preserved for |
108
|
|
|
|
|
|
|
the next call, the next time more data is received from the handle. |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
In this way, it is easy to implement code that reads records of some form when |
111
|
|
|
|
|
|
|
completed, but ignores partially-received records, until all the data is |
112
|
|
|
|
|
|
|
present. If the handler wishes to be immediately invoke a second time, to have |
113
|
|
|
|
|
|
|
another attempt at consuming more content, it should return C<1>. Otherwise, |
114
|
|
|
|
|
|
|
it should return C<0>, and the handler will next be invoked when more data has |
115
|
|
|
|
|
|
|
arrived from the underlying read handle and appended to the buffer. This makes |
116
|
|
|
|
|
|
|
it easy to implement code that handles multiple incoming records at the same |
117
|
|
|
|
|
|
|
time. Alternatively, if the handler function already attempts to consume as |
118
|
|
|
|
|
|
|
much as possible from the buffer, it will have no need to return C<1> at all. |
119
|
|
|
|
|
|
|
See the examples at the end of this documentation for more detail. |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
The second argument is a scalar indicating whether the stream has reported an |
122
|
|
|
|
|
|
|
end-of-file (EOF) condition. A reference to the buffer is passed to the |
123
|
|
|
|
|
|
|
handler in the usual way, so it may inspect data contained in it. Once the |
124
|
|
|
|
|
|
|
handler returns a false value, it will not be called again, as the handle is |
125
|
|
|
|
|
|
|
now at EOF and no more data can arrive. |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
The C code may also dynamically replace itself with a new callback |
128
|
|
|
|
|
|
|
by returning a CODE reference instead of C<0> or C<1>. The original callback |
129
|
|
|
|
|
|
|
or method that the object first started with may be restored by returning |
130
|
|
|
|
|
|
|
C. Whenever the callback is changed in this way, the new code is called |
131
|
|
|
|
|
|
|
again; even if the read buffer is currently empty. See the examples at the end |
132
|
|
|
|
|
|
|
of this documentation for more detail. |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
The C method can be used to insert new, temporary handlers that |
135
|
|
|
|
|
|
|
take precedence over the global C handler. This event is only used if |
136
|
|
|
|
|
|
|
there are no further pending handlers created by C. |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
=head2 on_read_eof |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
Optional. Invoked when the read handle indicates an end-of-file (EOF) |
141
|
|
|
|
|
|
|
condition. If there is any data in the buffer still to be processed, the |
142
|
|
|
|
|
|
|
C event will be invoked first, before this one. |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=head2 on_write_eof |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
Optional. Invoked when the write handle indicates an end-of-file (EOF) |
147
|
|
|
|
|
|
|
condition. Note that this condition can only be detected after a C |
148
|
|
|
|
|
|
|
syscall returns the C error. If there is no data pending to be written |
149
|
|
|
|
|
|
|
then it will not be detected yet. |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
=head2 on_read_error $errno |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
Optional. Invoked when the C method on the read handle fails. |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
=head2 on_write_error $errno |
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
Optional. Invoked when the C method on the write handle fails. |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
The C and C handlers are passed the value of |
160
|
|
|
|
|
|
|
C<$!> at the time the error occurred. (The C<$!> variable itself, by its |
161
|
|
|
|
|
|
|
nature, may have changed from the original error by the time this handler |
162
|
|
|
|
|
|
|
runs so it should always use the value passed in). |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
If an error occurs when the corresponding error callback is not supplied, and |
165
|
|
|
|
|
|
|
there is not a handler for it, then the C method is called instead. |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
=head2 on_read_high_watermark $length |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
=head2 on_read_low_watermark $length |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
Optional. Invoked when the read buffer grows larger than the high watermark |
172
|
|
|
|
|
|
|
or smaller than the low watermark respectively. These are edge-triggered |
173
|
|
|
|
|
|
|
events; they will only be triggered once per crossing, not continuously while |
174
|
|
|
|
|
|
|
the buffer remains above or below the given limit. |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
If these event handlers are not defined, the default behaviour is to disable |
177
|
|
|
|
|
|
|
read-ready notifications if the read buffer grows larger than the high |
178
|
|
|
|
|
|
|
watermark (so as to avoid it growing arbitrarily if nothing is consuming it), |
179
|
|
|
|
|
|
|
and re-enable notifications again once something has read enough to cause it to |
180
|
|
|
|
|
|
|
drop. If these events are overridden, the overriding code will have to perform |
181
|
|
|
|
|
|
|
this behaviour if required, by using |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
$self->want_readready_for_read(...) |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
=head2 on_outgoing_empty |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
Optional. Invoked when the writing data buffer becomes empty. |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
=head2 on_writeable_start |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
=head2 on_writeable_stop |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
Optional. These two events inform when the filehandle becomes writeable, and |
194
|
|
|
|
|
|
|
when it stops being writeable. C is invoked by the |
195
|
|
|
|
|
|
|
C event if previously it was known to be not writeable. |
196
|
|
|
|
|
|
|
C is invoked after a C operation fails with |
197
|
|
|
|
|
|
|
C or C. These two events track the writeability state, |
198
|
|
|
|
|
|
|
and ensure that only state change cause events to be invoked. A stream starts |
199
|
|
|
|
|
|
|
off being presumed writeable, so the first of these events to be observed will |
200
|
|
|
|
|
|
|
be C. |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
=cut |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
sub _init |
205
|
|
|
|
|
|
|
{ |
206
|
712
|
|
|
712
|
|
3012
|
my $self = shift; |
207
|
|
|
|
|
|
|
|
208
|
712
|
|
|
|
|
7293
|
$self->{writequeue} = []; # Queue of Writers |
209
|
712
|
|
|
|
|
3563
|
$self->{readqueue} = []; # Queue of Readers |
210
|
712
|
|
|
|
|
2299
|
$self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN) |
211
|
712
|
|
|
|
|
4005
|
$self->{readbuff} = ""; |
212
|
|
|
|
|
|
|
|
213
|
712
|
|
|
|
|
4262
|
$self->{reader} = "_sysread"; |
214
|
712
|
|
|
|
|
2380
|
$self->{writer} = "_syswrite"; |
215
|
|
|
|
|
|
|
|
216
|
712
|
|
|
|
|
2148
|
$self->{read_len} = $READLEN; |
217
|
712
|
|
|
|
|
3198
|
$self->{write_len} = $WRITELEN; |
218
|
|
|
|
|
|
|
|
219
|
712
|
|
|
|
|
3949
|
$self->{want} = WANT_READ_FOR_READ; |
220
|
|
|
|
|
|
|
|
221
|
712
|
|
|
|
|
2681
|
$self->{close_on_read_eof} = 1; |
222
|
|
|
|
|
|
|
} |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
=head1 PARAMETERS |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
The following named parameters may be passed to C or C: |
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
=head2 read_handle => IO |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
The IO handle to read from. Must implement C and C methods. |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
=head2 write_handle => IO |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
The IO handle to write to. Must implement C and C methods. |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
=head2 handle => IO |
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
Shortcut to specifying the same IO handle for both of the above. |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=head2 on_read => CODE |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
=head2 on_read_error => CODE |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
=head2 on_outgoing_empty => CODE |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
=head2 on_write_error => CODE |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
=head2 on_writeable_start => CODE |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
=head2 on_writeable_stop => CODE |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
CODE references for event handlers. |
253
|
|
|
|
|
|
|
|
254
|
|
|
|
|
|
|
=head2 autoflush => BOOL |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
Optional. If true, the C method will attempt to write data to the |
257
|
|
|
|
|
|
|
operating system immediately, without waiting for the loop to indicate the |
258
|
|
|
|
|
|
|
filehandle is write-ready. This is useful, for example, on streams that should |
259
|
|
|
|
|
|
|
contain up-to-date logging or console information. |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
It currently defaults to false for any file handle, but future versions of |
262
|
|
|
|
|
|
|
L may enable this by default on STDOUT and STDERR. |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=head2 read_len => INT |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes. |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
=head2 read_all => BOOL |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
Optional. If true, attempt to read as much data from the kernel as possible |
271
|
|
|
|
|
|
|
when the handle becomes readable. By default this is turned off, meaning at |
272
|
|
|
|
|
|
|
most one fixed-size buffer is read. If there is still more data in the |
273
|
|
|
|
|
|
|
kernel's buffer, the handle will still be readable, and will be read from |
274
|
|
|
|
|
|
|
again. |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
This behaviour allows multiple streams and sockets to be multiplexed |
277
|
|
|
|
|
|
|
simultaneously, meaning that a large bulk transfer on one cannot starve other |
278
|
|
|
|
|
|
|
filehandles of processing time. Turning this option on may improve bulk data |
279
|
|
|
|
|
|
|
transfer rate, at the risk of delaying or stalling processing on other |
280
|
|
|
|
|
|
|
filehandles. |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
=head2 write_len => INT |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes. |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=head2 write_all => BOOL |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
Optional. Analogous to the C option, but for writing. When |
289
|
|
|
|
|
|
|
C is enabled, this option only affects deferred writing if the |
290
|
|
|
|
|
|
|
initial attempt failed due to buffer space. |
291
|
|
|
|
|
|
|
|
292
|
|
|
|
|
|
|
=head2 read_high_watermark => INT |
293
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
=head2 read_low_watermark => INT |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
Optional. If defined, gives a way to implement flow control or other |
297
|
|
|
|
|
|
|
behaviours that depend on the size of Stream's read buffer. |
298
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
If after more data is read from the underlying filehandle the read buffer is |
300
|
|
|
|
|
|
|
now larger than the high watermark, the C event is |
301
|
|
|
|
|
|
|
triggered (which, by default, will disable read-ready notifications and pause |
302
|
|
|
|
|
|
|
reading from the filehandle). |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
If after data is consumed by an C handler the read buffer is now |
305
|
|
|
|
|
|
|
smaller than the low watermark, the C event is |
306
|
|
|
|
|
|
|
triggered (which, by default, will re-enable read-ready notifications and |
307
|
|
|
|
|
|
|
resume reading from the filehandle). For to be possible, the read handler |
308
|
|
|
|
|
|
|
would have to be one added by the C method or one of the |
309
|
|
|
|
|
|
|
Future-returning C methods. |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
By default these options are not defined, so this behaviour will not happen. |
312
|
|
|
|
|
|
|
C may not be set to a larger value than |
313
|
|
|
|
|
|
|
C, but it may be set to a smaller value, creating a |
314
|
|
|
|
|
|
|
hysteresis region. If either option is defined then both must be. |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
If these options are used with the default event handlers, be careful not to |
317
|
|
|
|
|
|
|
cause deadlocks by having a high watermark sufficiently low that a single |
318
|
|
|
|
|
|
|
C invocation might not consider it finished yet. |
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
=head2 reader => STRING|CODE |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
=head2 writer => STRING|CODE |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
Optional. If defined, gives the name of a method or a CODE reference to use |
325
|
|
|
|
|
|
|
to implement the actual reading from or writing to the filehandle. These will |
326
|
|
|
|
|
|
|
be invoked as |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
$stream->reader( $read_handle, $buffer, $len ) |
329
|
|
|
|
|
|
|
$stream->writer( $write_handle, $buffer, $len ) |
330
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
Each is expected to modify the passed buffer; C by appending to it, |
332
|
|
|
|
|
|
|
C by removing a prefix from it. Each is expected to return a true |
333
|
|
|
|
|
|
|
value on success, zero on EOF, or C with C<$!> set for errors. If not |
334
|
|
|
|
|
|
|
provided, they will be substituted by implenentations using C and |
335
|
|
|
|
|
|
|
C on the underlying handle, respectively. |
336
|
|
|
|
|
|
|
|
337
|
|
|
|
|
|
|
=head2 close_on_read_eof => BOOL |
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
Optional. Usually true, but if set to a false value then the stream will not |
340
|
|
|
|
|
|
|
be Cd when an EOF condition occurs on read. This is normally not useful |
341
|
|
|
|
|
|
|
as at that point the underlying stream filehandle is no longer useable, but it |
342
|
|
|
|
|
|
|
may be useful for reading regular files, or interacting with TTY devices. |
343
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
=head2 encoding => STRING |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
If supplied, sets the name of encoding of the underlying stream. If an |
347
|
|
|
|
|
|
|
encoding is set, then the C method will expect to receive Unicode |
348
|
|
|
|
|
|
|
strings and encodes them into bytes, and incoming bytes will be decoded into |
349
|
|
|
|
|
|
|
Unicode strings for the C event. |
350
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
If an encoding is not supplied then C and C will work in byte |
352
|
|
|
|
|
|
|
strings. |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
I in order to handle reads of UTF-8 content or other |
355
|
|
|
|
|
|
|
multibyte encodings, the code implementing the C event uses a feature |
356
|
|
|
|
|
|
|
of L; the C flag. While this flag has existed for a |
357
|
|
|
|
|
|
|
while and is used by the C<:encoding> PerlIO layer itself for similar |
358
|
|
|
|
|
|
|
purposes, the flag is not officially documented by the C module. In |
359
|
|
|
|
|
|
|
principle this undocumented feature could be subject to change, in practice I |
360
|
|
|
|
|
|
|
believe it to be reasonably stable. |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
This note applies only to the C event; data written using the |
363
|
|
|
|
|
|
|
C method does not rely on any undocumented features of C. |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
If a read handle is given, it is required that either an C callback |
366
|
|
|
|
|
|
|
reference is configured, or that the object provides an C method. It |
367
|
|
|
|
|
|
|
is optional whether either is true for C; if neither is |
368
|
|
|
|
|
|
|
supplied then no action will be taken when the writing buffer becomes empty. |
369
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
An C handler may be supplied even if no read handle is yet given, to |
371
|
|
|
|
|
|
|
be used when a read handle is eventually provided by the C |
372
|
|
|
|
|
|
|
method. |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
This condition is checked at the time the object is added to a Loop; it is |
375
|
|
|
|
|
|
|
allowed to create a C object with a read handle but without |
376
|
|
|
|
|
|
|
a C handler, provided that one is later given using C |
377
|
|
|
|
|
|
|
before the stream is added to its containing Loop, either directly or by being |
378
|
|
|
|
|
|
|
a child of another Notifier already in a Loop, or added to one. |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
=cut |
381
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
sub configure |
383
|
|
|
|
|
|
|
{ |
384
|
993
|
|
|
993
|
1
|
6454
|
my $self = shift; |
385
|
993
|
|
|
|
|
6475
|
my %params = @_; |
386
|
|
|
|
|
|
|
|
387
|
993
|
|
|
|
|
4713
|
for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error |
388
|
|
|
|
|
|
|
on_write_error on_writeable_start on_writeable_stop autoflush |
389
|
|
|
|
|
|
|
read_len read_all write_len write_all on_read_high_watermark |
390
|
|
|
|
|
|
|
on_read_low_watermark reader writer close_on_read_eof )) { |
391
|
17874
|
100
|
|
|
|
43711
|
$self->{$_} = delete $params{$_} if exists $params{$_}; |
392
|
|
|
|
|
|
|
} |
393
|
|
|
|
|
|
|
|
394
|
993
|
100
|
66
|
|
|
9521
|
if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) { |
395
|
1
|
|
|
|
|
22
|
my $high = delete $params{read_high_watermark}; |
396
|
1
|
50
|
|
|
|
10
|
defined $high or $high = $self->{read_high_watermark}; |
397
|
|
|
|
|
|
|
|
398
|
1
|
|
|
|
|
2
|
my $low = delete $params{read_low_watermark}; |
399
|
1
|
50
|
|
|
|
4
|
defined $low or $low = $self->{read_low_watermark}; |
400
|
|
|
|
|
|
|
|
401
|
1
|
50
|
33
|
|
|
8
|
croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high; |
402
|
1
|
50
|
33
|
|
|
6
|
croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low; |
403
|
|
|
|
|
|
|
|
404
|
1
|
50
|
33
|
|
|
8
|
croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high; |
|
|
|
33
|
|
|
|
|
405
|
|
|
|
|
|
|
|
406
|
1
|
|
|
|
|
3
|
$self->{read_high_watermark} = $high; |
407
|
1
|
|
|
|
|
3
|
$self->{read_low_watermark} = $low; |
408
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
# TODO: reassert levels if we've moved them |
410
|
|
|
|
|
|
|
} |
411
|
|
|
|
|
|
|
|
412
|
993
|
100
|
|
|
|
2758
|
if( exists $params{encoding} ) { |
413
|
2
|
|
|
|
|
4
|
my $encoding = delete $params{encoding}; |
414
|
2
|
|
|
|
|
8
|
my $obj = find_encoding( $encoding ); |
415
|
2
|
50
|
|
|
|
246
|
defined $obj or croak "Cannot handle an encoding of '$encoding'"; |
416
|
2
|
|
|
|
|
5
|
$self->{encoding} = $obj; |
417
|
|
|
|
|
|
|
} |
418
|
|
|
|
|
|
|
|
419
|
993
|
|
|
|
|
14028
|
$self->SUPER::configure( %params ); |
420
|
|
|
|
|
|
|
|
421
|
993
|
100
|
100
|
|
|
3072
|
if( $self->loop and $self->read_handle ) { |
422
|
5
|
50
|
|
|
|
25
|
$self->can_event( "on_read" ) or |
423
|
|
|
|
|
|
|
croak 'Expected either an on_read callback or to be able to ->on_read'; |
424
|
|
|
|
|
|
|
} |
425
|
|
|
|
|
|
|
|
426
|
993
|
100
|
100
|
|
|
4732
|
if( $self->{autoflush} and my $write_handle = $self->write_handle ) { |
427
|
62
|
50
|
|
|
|
700
|
carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle" |
428
|
|
|
|
|
|
|
if $write_handle->blocking; |
429
|
|
|
|
|
|
|
} |
430
|
|
|
|
|
|
|
} |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
sub _add_to_loop |
433
|
|
|
|
|
|
|
{ |
434
|
703
|
|
|
703
|
|
1556
|
my $self = shift; |
435
|
|
|
|
|
|
|
|
436
|
703
|
100
|
|
|
|
2196
|
if( defined $self->read_handle ) { |
437
|
592
|
100
|
|
|
|
1965
|
$self->can_event( "on_read" ) or |
438
|
|
|
|
|
|
|
croak 'Expected either an on_read callback or to be able to ->on_read'; |
439
|
|
|
|
|
|
|
} |
440
|
|
|
|
|
|
|
|
441
|
702
|
|
|
|
|
5554
|
$self->SUPER::_add_to_loop( @_ ); |
442
|
|
|
|
|
|
|
|
443
|
702
|
100
|
|
|
|
6036
|
if( !$self->_is_empty ) { |
444
|
39
|
|
|
|
|
243
|
$self->want_writeready_for_write( 1 ); |
445
|
|
|
|
|
|
|
} |
446
|
|
|
|
|
|
|
} |
447
|
|
|
|
|
|
|
|
448
|
|
|
|
|
|
|
=head1 METHODS |
449
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
The following methods documented with a trailing call to C<< ->get >> return |
451
|
|
|
|
|
|
|
L instances. |
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=cut |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
=head2 want_readready_for_read |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
=head2 want_readready_for_write |
458
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
$stream->want_readready_for_read( $set ) |
460
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
$stream->want_readready_for_write( $set ) |
462
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
Mutators for the C property on L, which |
464
|
|
|
|
|
|
|
control whether the C or C behaviour should be continued once the |
465
|
|
|
|
|
|
|
filehandle becomes ready for read. |
466
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
Normally, C is always true (though the read watermark |
468
|
|
|
|
|
|
|
behaviour can modify it), and C is not used. |
469
|
|
|
|
|
|
|
However, if a custom C function is provided, it may find this useful |
470
|
|
|
|
|
|
|
for being invoked again if it cannot proceed with a write operation until the |
471
|
|
|
|
|
|
|
filehandle becomes readable (such as during transport negotiation or SSL key |
472
|
|
|
|
|
|
|
management, for example). |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
=cut |
475
|
|
|
|
|
|
|
|
476
|
|
|
|
|
|
|
sub want_readready_for_read |
477
|
|
|
|
|
|
|
{ |
478
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
479
|
0
|
|
|
|
|
0
|
my ( $set ) = @_; |
480
|
0
|
0
|
|
|
|
0
|
$set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ ); |
481
|
|
|
|
|
|
|
|
482
|
0
|
0
|
|
|
|
0
|
$self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; |
483
|
|
|
|
|
|
|
} |
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
sub want_readready_for_write |
486
|
|
|
|
|
|
|
{ |
487
|
1
|
|
|
1
|
1
|
357
|
my $self = shift; |
488
|
1
|
|
|
|
|
3
|
my ( $set ) = @_; |
489
|
1
|
50
|
|
|
|
6
|
$set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE ); |
490
|
|
|
|
|
|
|
|
491
|
1
|
50
|
|
|
|
4
|
$self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; |
492
|
|
|
|
|
|
|
} |
493
|
|
|
|
|
|
|
|
494
|
|
|
|
|
|
|
=head2 want_writeready_for_read |
495
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
=head2 want_writeready_for_write |
497
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
$stream->want_writeready_for_write( $set ) |
499
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
$stream->want_writeready_for_read( $set ) |
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
Mutators for the C property on L, which |
503
|
|
|
|
|
|
|
control whether the C or C behaviour should be continued once the |
504
|
|
|
|
|
|
|
filehandle becomes ready for write. |
505
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
Normally, C is managed by the C method and |
507
|
|
|
|
|
|
|
associated flushing, and C is not used. However, if |
508
|
|
|
|
|
|
|
a custom C function is provided, it may find this useful for being |
509
|
|
|
|
|
|
|
invoked again if it cannot proceed with a read operation until the filehandle |
510
|
|
|
|
|
|
|
becomes writable (such as during transport negotiation or SSL key management, |
511
|
|
|
|
|
|
|
for example). |
512
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
=cut |
514
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
sub want_writeready_for_write |
516
|
|
|
|
|
|
|
{ |
517
|
223
|
|
|
223
|
1
|
422
|
my $self = shift; |
518
|
223
|
|
|
|
|
610
|
my ( $set ) = @_; |
519
|
223
|
100
|
|
|
|
857
|
$set ? ( $self->{want} |= WANT_WRITE_FOR_WRITE ) : ( $self->{want} &= ~WANT_WRITE_FOR_WRITE ); |
520
|
|
|
|
|
|
|
|
521
|
223
|
100
|
|
|
|
749
|
$self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; |
522
|
|
|
|
|
|
|
} |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
sub want_writeready_for_read |
525
|
|
|
|
|
|
|
{ |
526
|
1
|
|
|
1
|
1
|
605
|
my $self = shift; |
527
|
1
|
|
|
|
|
3
|
my ( $set ) = @_; |
528
|
1
|
50
|
|
|
|
7
|
$set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ ); |
529
|
|
|
|
|
|
|
|
530
|
1
|
50
|
|
|
|
7
|
$self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; |
531
|
|
|
|
|
|
|
} |
532
|
|
|
|
|
|
|
|
533
|
|
|
|
|
|
|
# FUNCTION not method |
534
|
|
|
|
|
|
|
sub _nonfatal_error |
535
|
|
|
|
|
|
|
{ |
536
|
9
|
|
|
9
|
|
18
|
my ( $errno ) = @_; |
537
|
|
|
|
|
|
|
|
538
|
9
|
|
66
|
|
|
75
|
return $errno == EAGAIN || |
539
|
|
|
|
|
|
|
$errno == EWOULDBLOCK || |
540
|
|
|
|
|
|
|
$errno == EINTR; |
541
|
|
|
|
|
|
|
} |
542
|
|
|
|
|
|
|
|
543
|
|
|
|
|
|
|
sub _is_empty |
544
|
|
|
|
|
|
|
{ |
545
|
1327
|
|
|
1327
|
|
2464
|
my $self = shift; |
546
|
1327
|
|
|
|
|
2049
|
return !@{ $self->{writequeue} }; |
|
1327
|
|
|
|
|
7676
|
|
547
|
|
|
|
|
|
|
} |
548
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
=head2 close |
550
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
$stream->close |
552
|
|
|
|
|
|
|
|
553
|
|
|
|
|
|
|
A synonym for C. This should not be used when the deferred |
554
|
|
|
|
|
|
|
wait behaviour is required, as the behaviour of C may change in a |
555
|
|
|
|
|
|
|
future version of L. Instead, call C directly. |
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
=cut |
558
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
sub close |
560
|
|
|
|
|
|
|
{ |
561
|
8
|
|
|
8
|
1
|
1088
|
my $self = shift; |
562
|
8
|
|
|
|
|
67
|
$self->close_when_empty; |
563
|
|
|
|
|
|
|
} |
564
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
=head2 close_when_empty |
566
|
|
|
|
|
|
|
|
567
|
|
|
|
|
|
|
$stream->close_when_empty |
568
|
|
|
|
|
|
|
|
569
|
|
|
|
|
|
|
If the write buffer is empty, this method calls C on the underlying IO |
570
|
|
|
|
|
|
|
handles, and removes the stream from its containing loop. If the write buffer |
571
|
|
|
|
|
|
|
still contains data, then this is deferred until the buffer is empty. This is |
572
|
|
|
|
|
|
|
intended for "write-then-close" one-shot streams. |
573
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
$stream->write( "Here is my final data\n" ); |
575
|
|
|
|
|
|
|
$stream->close_when_empty; |
576
|
|
|
|
|
|
|
|
577
|
|
|
|
|
|
|
Because of this deferred nature, it may not be suitable for error handling. |
578
|
|
|
|
|
|
|
See instead the C method. |
579
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
=cut |
581
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
sub close_when_empty |
583
|
|
|
|
|
|
|
{ |
584
|
173
|
|
|
173
|
1
|
2216
|
my $self = shift; |
585
|
|
|
|
|
|
|
|
586
|
173
|
100
|
|
|
|
449
|
return $self->SUPER::close if $self->_is_empty; |
587
|
|
|
|
|
|
|
|
588
|
8
|
|
|
|
|
171
|
$self->{stream_closing} = 1; |
589
|
|
|
|
|
|
|
} |
590
|
|
|
|
|
|
|
|
591
|
|
|
|
|
|
|
=head2 close_now |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
$stream->close_now |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
This method immediately closes the underlying IO handles and removes the |
596
|
|
|
|
|
|
|
stream from the containing loop. It will not wait to flush the remaining data |
597
|
|
|
|
|
|
|
in the write buffer. |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=cut |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
sub close_now |
602
|
|
|
|
|
|
|
{ |
603
|
478
|
|
|
478
|
1
|
1342
|
my $self = shift; |
604
|
|
|
|
|
|
|
|
605
|
478
|
|
|
|
|
889
|
foreach ( @{ $self->{writequeue} } ) { |
|
478
|
|
|
|
|
1968
|
|
606
|
2
|
100
|
|
|
|
7
|
$_->on_error->( $self, "stream closing" ) if $_->on_error; |
607
|
|
|
|
|
|
|
} |
608
|
|
|
|
|
|
|
|
609
|
478
|
|
|
|
|
1011
|
undef @{ $self->{writequeue} }; |
|
478
|
|
|
|
|
1329
|
|
610
|
478
|
|
|
|
|
2688
|
undef $self->{stream_closing}; |
611
|
|
|
|
|
|
|
|
612
|
478
|
|
|
|
|
3473
|
$self->SUPER::close; |
613
|
|
|
|
|
|
|
} |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
=head2 is_read_eof |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
=head2 is_write_eof |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
$eof = $stream->is_read_eof |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
$eof = $stream->is_write_eof |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
Returns true after an EOF condition is reported on either the read or the |
624
|
|
|
|
|
|
|
write handle, respectively. |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
=cut |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
sub is_read_eof |
629
|
|
|
|
|
|
|
{ |
630
|
2
|
|
|
2
|
1
|
61
|
my $self = shift; |
631
|
2
|
|
|
|
|
9
|
return $self->{read_eof}; |
632
|
|
|
|
|
|
|
} |
633
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
sub is_write_eof |
635
|
|
|
|
|
|
|
{ |
636
|
2
|
|
|
2
|
1
|
36
|
my $self = shift; |
637
|
2
|
|
|
|
|
10
|
return $self->{write_eof}; |
638
|
|
|
|
|
|
|
} |
639
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
=head2 write |
641
|
|
|
|
|
|
|
|
642
|
|
|
|
|
|
|
$stream->write( $data, %params ) |
643
|
|
|
|
|
|
|
|
644
|
|
|
|
|
|
|
This method adds data to the outgoing data queue, or writes it immediately, |
645
|
|
|
|
|
|
|
according to the C parameter. |
646
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
If the C option is set, this method will try immediately to write |
648
|
|
|
|
|
|
|
the data to the underlying filehandle. If this completes successfully then it |
649
|
|
|
|
|
|
|
will have been written by the time this method returns. If it fails to write |
650
|
|
|
|
|
|
|
completely, then the data is queued as if C were not set, and will |
651
|
|
|
|
|
|
|
be flushed as normal. |
652
|
|
|
|
|
|
|
|
653
|
|
|
|
|
|
|
C<$data> can either be a plain string, a L, or a CODE reference. If it |
654
|
|
|
|
|
|
|
is a plain string it is written immediately. If it is not, its value will be |
655
|
|
|
|
|
|
|
used to generate more C<$data> values, eventually leading to strings to be |
656
|
|
|
|
|
|
|
written. |
657
|
|
|
|
|
|
|
|
658
|
|
|
|
|
|
|
If C<$data> is a C, the Stream will wait until it is ready, and take |
659
|
|
|
|
|
|
|
the single value it yields. |
660
|
|
|
|
|
|
|
|
661
|
|
|
|
|
|
|
If C<$data> is a CODE reference, it will be repeatedly invoked to generate new |
662
|
|
|
|
|
|
|
values. Each time the filehandle is ready to write more data to it, the |
663
|
|
|
|
|
|
|
function is invoked. Once the function has finished generating data it should |
664
|
|
|
|
|
|
|
return undef. The function is passed the Stream object as its first argument. |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
It is allowed that Cs yield CODE references, or CODE references return |
667
|
|
|
|
|
|
|
Cs, as well as plain strings. |
668
|
|
|
|
|
|
|
|
669
|
|
|
|
|
|
|
For example, to stream the contents of an existing opened filehandle: |
670
|
|
|
|
|
|
|
|
671
|
|
|
|
|
|
|
open my $fileh, "<", $path or die "Cannot open $path - $!"; |
672
|
|
|
|
|
|
|
|
673
|
|
|
|
|
|
|
$stream->write( sub { |
674
|
|
|
|
|
|
|
my ( $stream ) = @_; |
675
|
|
|
|
|
|
|
|
676
|
|
|
|
|
|
|
sysread $fileh, my $buffer, 8192 or return; |
677
|
|
|
|
|
|
|
return $buffer; |
678
|
|
|
|
|
|
|
} ); |
679
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
Takes the following optional named parameters in C<%params>: |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
=over 8 |
683
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
=item write_len => INT |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
Overrides the C parameter for the data written by this call. |
687
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
=item on_write => CODE |
689
|
|
|
|
|
|
|
|
690
|
|
|
|
|
|
|
A CODE reference which will be invoked after every successful C |
691
|
|
|
|
|
|
|
operation on the underlying filehandle. It will be passed the number of bytes |
692
|
|
|
|
|
|
|
that were written by this call, which may not be the entire length of the |
693
|
|
|
|
|
|
|
buffer - if it takes more than one C operation to empty the buffer |
694
|
|
|
|
|
|
|
then this callback will be invoked multiple times. |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
$on_write->( $stream, $len ) |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
=item on_flush => CODE |
699
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
A CODE reference which will be invoked once the data queued by this C |
701
|
|
|
|
|
|
|
call has been flushed. This will be invoked even if the buffer itself is not |
702
|
|
|
|
|
|
|
yet empty; if more data has been queued since the call. |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
$on_flush->( $stream ) |
705
|
|
|
|
|
|
|
|
706
|
|
|
|
|
|
|
=item on_error => CODE |
707
|
|
|
|
|
|
|
|
708
|
|
|
|
|
|
|
A CODE reference which will be invoked if a C error happens while |
709
|
|
|
|
|
|
|
performing this write. Invoked as for the C's C event. |
710
|
|
|
|
|
|
|
|
711
|
|
|
|
|
|
|
$on_error->( $stream, $errno ) |
712
|
|
|
|
|
|
|
|
713
|
|
|
|
|
|
|
=back |
714
|
|
|
|
|
|
|
|
715
|
|
|
|
|
|
|
If the object is not yet a member of a loop and doesn't yet have a |
716
|
|
|
|
|
|
|
C, then calls to the C method will simply queue the data |
717
|
|
|
|
|
|
|
and return. It will be flushed when the object is added to the loop. |
718
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
If C<$data> is a defined but empty string, the write is still queued, and the |
720
|
|
|
|
|
|
|
C continuation will be invoked, if supplied. This can be used to |
721
|
|
|
|
|
|
|
obtain a marker, to invoke some code once the output queue has been flushed up |
722
|
|
|
|
|
|
|
to this point. |
723
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
=head2 write (scalar) |
725
|
|
|
|
|
|
|
|
726
|
|
|
|
|
|
|
$stream->write( ... )->get |
727
|
|
|
|
|
|
|
|
728
|
|
|
|
|
|
|
If called in non-void context, this method returns a L which will |
729
|
|
|
|
|
|
|
complete (with no value) when the write operation has been flushed. This may |
730
|
|
|
|
|
|
|
be used as an alternative to, or combined with, the C callback. |
731
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
=cut |
733
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
sub _syswrite |
735
|
|
|
|
|
|
|
{ |
736
|
166
|
|
|
166
|
|
2004
|
my $self = shift; |
737
|
166
|
|
|
|
|
448
|
my ( $handle, undef, $len ) = @_; |
738
|
|
|
|
|
|
|
|
739
|
166
|
|
|
|
|
2293
|
my $written = $handle->syswrite( $_[1], $len ); |
740
|
166
|
100
|
|
|
|
7333
|
return $written if !$written; # zero or undef |
741
|
|
|
|
|
|
|
|
742
|
158
|
|
|
|
|
713
|
substr( $_[1], 0, $written ) = ""; |
743
|
158
|
|
|
|
|
509
|
return $written; |
744
|
|
|
|
|
|
|
} |
745
|
|
|
|
|
|
|
|
746
|
|
|
|
|
|
|
sub _flush_one_write |
747
|
|
|
|
|
|
|
{ |
748
|
177
|
|
|
177
|
|
426
|
my $self = shift; |
749
|
|
|
|
|
|
|
|
750
|
177
|
|
|
|
|
427
|
my $writequeue = $self->{writequeue}; |
751
|
|
|
|
|
|
|
|
752
|
177
|
|
|
|
|
299
|
my $head; |
753
|
177
|
|
66
|
|
|
2720
|
while( $head = $writequeue->[0] and ref $head->data ) { |
754
|
18
|
100
|
33
|
|
|
203
|
if( ref $head->data eq "CODE" ) { |
|
|
50
|
|
|
|
|
|
755
|
12
|
|
|
|
|
71
|
my $data = $head->data->( $self ); |
756
|
12
|
100
|
|
|
|
2572
|
if( !defined $data ) { |
757
|
5
|
50
|
|
|
|
17
|
$head->on_flush->( $self ) if $head->on_flush; |
758
|
5
|
|
|
|
|
605
|
shift @$writequeue; |
759
|
5
|
|
|
|
|
56
|
return 1; |
760
|
|
|
|
|
|
|
} |
761
|
7
|
100
|
100
|
|
|
38
|
if( !ref $data and my $encoding = $self->{encoding} ) { |
762
|
1
|
|
|
|
|
6
|
$data = $encoding->encode( $data ); |
763
|
|
|
|
|
|
|
} |
764
|
7
|
|
|
|
|
24
|
unshift @$writequeue, my $new = Writer( |
765
|
|
|
|
|
|
|
$data, $head->writelen, $head->on_write, undef, undef, 0 |
766
|
|
|
|
|
|
|
); |
767
|
7
|
|
|
|
|
104
|
next; |
768
|
|
|
|
|
|
|
} |
769
|
|
|
|
|
|
|
elsif( blessed $head->data and $head->data->isa( "Future" ) ) { |
770
|
6
|
|
|
|
|
119
|
my $f = $head->data; |
771
|
6
|
100
|
|
|
|
37
|
if( !$f->is_ready ) { |
772
|
2
|
50
|
|
|
|
13
|
return 0 if $head->watching; |
773
|
2
|
|
|
2
|
|
23
|
$f->on_ready( sub { $self->_flush_one_write } ); |
|
2
|
|
|
|
|
713
|
|
774
|
2
|
|
|
|
|
60
|
$head->watching++; |
775
|
2
|
|
|
|
|
15
|
return 0; |
776
|
|
|
|
|
|
|
} |
777
|
4
|
|
|
|
|
34
|
my $data = $f->get; |
778
|
4
|
100
|
100
|
|
|
80
|
if( !ref $data and my $encoding = $self->{encoding} ) { |
779
|
1
|
|
|
|
|
6
|
$data = $encoding->encode( $data ); |
780
|
|
|
|
|
|
|
} |
781
|
4
|
|
|
|
|
10
|
$head->data = $data; |
782
|
4
|
|
|
|
|
45
|
next; |
783
|
|
|
|
|
|
|
} |
784
|
|
|
|
|
|
|
else { |
785
|
0
|
|
|
|
|
0
|
die "Unsure what to do with reference ".ref($head->data)." in write queue"; |
786
|
|
|
|
|
|
|
} |
787
|
|
|
|
|
|
|
} |
788
|
|
|
|
|
|
|
|
789
|
170
|
|
|
|
|
4457
|
my $second; |
790
|
170
|
|
100
|
|
|
742
|
while( $second = $writequeue->[1] and |
|
|
|
66
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
|
33
|
|
|
|
|
791
|
|
|
|
|
|
|
!ref $second->data and |
792
|
|
|
|
|
|
|
$head->writelen == $second->writelen and |
793
|
|
|
|
|
|
|
!$head->on_write and !$second->on_write and |
794
|
|
|
|
|
|
|
!$head->on_flush ) { |
795
|
1
|
|
|
|
|
40
|
$head->data .= $second->data; |
796
|
1
|
|
|
|
|
8
|
$head->on_write = $second->on_write; |
797
|
1
|
|
|
|
|
8
|
$head->on_flush = $second->on_flush; |
798
|
1
|
|
|
|
|
9
|
splice @$writequeue, 1, 1, (); |
799
|
|
|
|
|
|
|
} |
800
|
|
|
|
|
|
|
|
801
|
170
|
50
|
|
|
|
624
|
die "TODO: head data does not contain a plain string" if ref $head->data; |
802
|
|
|
|
|
|
|
|
803
|
170
|
50
|
|
|
|
1377
|
if( $IO::Async::Debug::DEBUG > 1 ) { |
804
|
0
|
|
|
|
|
0
|
my $data = substr $head->data, 0, $head->writelen; |
805
|
0
|
|
|
|
|
0
|
$self->debug_printf( "WRITE len=%d", length $data ); |
806
|
0
|
0
|
|
|
|
0
|
IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw}; |
807
|
|
|
|
|
|
|
} |
808
|
|
|
|
|
|
|
|
809
|
170
|
|
|
|
|
454
|
my $writer = $self->{writer}; |
810
|
170
|
|
|
|
|
605
|
my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen ); |
811
|
|
|
|
|
|
|
|
812
|
170
|
100
|
|
|
|
706
|
if( !defined $len ) { |
813
|
3
|
|
|
|
|
16
|
my $errno = $!; |
814
|
|
|
|
|
|
|
|
815
|
3
|
100
|
66
|
|
|
104
|
if( $errno == EAGAIN or $errno == EWOULDBLOCK ) { |
816
|
1
|
50
|
|
|
|
13
|
$self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable}; |
817
|
1
|
|
|
|
|
6
|
$self->{writeable} = 0; |
818
|
|
|
|
|
|
|
} |
819
|
|
|
|
|
|
|
|
820
|
3
|
100
|
|
|
|
8
|
return 0 if _nonfatal_error( $errno ); |
821
|
|
|
|
|
|
|
|
822
|
2
|
50
|
|
|
|
6
|
$self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1; |
823
|
|
|
|
|
|
|
|
824
|
2
|
100
|
|
|
|
6
|
if( $errno == EPIPE ) { |
825
|
1
|
|
|
|
|
16
|
$self->debug_printf( "WRITE-EOF" ); |
826
|
1
|
|
|
|
|
2
|
$self->{write_eof} = 1; |
827
|
1
|
|
|
|
|
3
|
$self->maybe_invoke_event( on_write_eof => ); |
828
|
|
|
|
|
|
|
} |
829
|
|
|
|
|
|
|
|
830
|
2
|
50
|
|
|
|
10
|
$head->on_error->( $self, $errno ) if $head->on_error; |
831
|
2
|
100
|
|
|
|
6
|
$self->maybe_invoke_event( on_write_error => $errno ) |
832
|
|
|
|
|
|
|
or $self->close_now; |
833
|
|
|
|
|
|
|
|
834
|
2
|
|
|
|
|
21
|
return 0; |
835
|
|
|
|
|
|
|
} |
836
|
|
|
|
|
|
|
|
837
|
167
|
100
|
66
|
|
|
873
|
$METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len; |
838
|
|
|
|
|
|
|
|
839
|
167
|
100
|
|
|
|
4481
|
if( my $on_write = $head->on_write ) { |
840
|
3
|
|
|
|
|
23
|
$on_write->( $self, $len ); |
841
|
|
|
|
|
|
|
} |
842
|
|
|
|
|
|
|
|
843
|
167
|
100
|
|
|
|
1944
|
if( !length $head->data ) { |
844
|
157
|
100
|
|
|
|
1385
|
$head->on_flush->( $self ) if $head->on_flush; |
845
|
157
|
|
|
|
|
1429
|
shift @{ $self->{writequeue} }; |
|
157
|
|
|
|
|
436
|
|
846
|
|
|
|
|
|
|
} |
847
|
|
|
|
|
|
|
|
848
|
167
|
|
|
|
|
1624
|
return 1; |
849
|
|
|
|
|
|
|
} |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
sub write |
852
|
|
|
|
|
|
|
{ |
853
|
169
|
|
|
169
|
1
|
6842
|
my $self = shift; |
854
|
169
|
|
|
|
|
736
|
my ( $data, %params ) = @_; |
855
|
|
|
|
|
|
|
|
856
|
169
|
50
|
0
|
|
|
956
|
carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing}; |
857
|
|
|
|
|
|
|
|
858
|
|
|
|
|
|
|
# Allow writes without a filehandle if we're not yet in a Loop, just don't |
859
|
|
|
|
|
|
|
# try to flush them |
860
|
169
|
|
|
|
|
772
|
my $handle = $self->write_handle; |
861
|
|
|
|
|
|
|
|
862
|
169
|
100
|
100
|
|
|
935
|
croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop; |
863
|
|
|
|
|
|
|
|
864
|
168
|
100
|
100
|
|
|
1691
|
if( !ref $data and my $encoding = $self->{encoding} ) { |
865
|
2
|
|
|
|
|
18
|
$data = $encoding->encode( $data ); |
866
|
|
|
|
|
|
|
} |
867
|
|
|
|
|
|
|
|
868
|
168
|
|
|
|
|
485
|
my $on_write = delete $params{on_write}; |
869
|
168
|
|
|
|
|
387
|
my $on_flush = delete $params{on_flush}; |
870
|
168
|
|
|
|
|
370
|
my $on_error = delete $params{on_error}; |
871
|
|
|
|
|
|
|
|
872
|
168
|
|
|
|
|
317
|
my $f; |
873
|
168
|
100
|
|
|
|
478
|
if( defined wantarray ) { |
874
|
3
|
|
|
|
|
4
|
my $orig_on_flush = $on_flush; |
875
|
3
|
|
|
|
|
5
|
my $orig_on_error = $on_error; |
876
|
|
|
|
|
|
|
|
877
|
3
|
50
|
|
|
|
9
|
my $loop = $self->loop or |
878
|
|
|
|
|
|
|
croak "Cannot ->write data returning a Future to a Stream not in a Loop"; |
879
|
3
|
|
|
|
|
13
|
$f = $loop->new_future; |
880
|
|
|
|
|
|
|
$on_flush = sub { |
881
|
1
|
|
|
1
|
|
18
|
$f->done; |
882
|
1
|
50
|
|
|
|
51
|
$orig_on_flush->( @_ ) if $orig_on_flush; |
883
|
3
|
|
|
|
|
15
|
}; |
884
|
|
|
|
|
|
|
$on_error = sub { |
885
|
3
|
|
|
3
|
|
32
|
my $self = shift; |
886
|
3
|
|
|
|
|
6
|
my ( $errno ) = @_; |
887
|
|
|
|
|
|
|
|
888
|
3
|
100
|
|
|
|
12
|
$f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready; |
889
|
|
|
|
|
|
|
|
890
|
3
|
50
|
|
|
|
123
|
$orig_on_error->( $self, @_ ) if $orig_on_error; |
891
|
3
|
|
|
|
|
10
|
}; |
892
|
|
|
|
|
|
|
} |
893
|
|
|
|
|
|
|
|
894
|
168
|
|
|
|
|
380
|
my $write_len = $params{write_len}; |
895
|
168
|
50
|
|
|
|
633
|
defined $write_len or $write_len = $self->{write_len}; |
896
|
|
|
|
|
|
|
|
897
|
168
|
|
|
|
|
335
|
push @{ $self->{writequeue} }, Writer( |
|
168
|
|
|
|
|
2095
|
|
898
|
|
|
|
|
|
|
$data, $write_len, $on_write, $on_flush, $on_error, 0 |
899
|
|
|
|
|
|
|
); |
900
|
|
|
|
|
|
|
|
901
|
168
|
50
|
|
|
|
4201
|
keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params ); |
902
|
|
|
|
|
|
|
|
903
|
168
|
100
|
|
|
|
556
|
return $f unless $handle; |
904
|
|
|
|
|
|
|
|
905
|
132
|
100
|
|
|
|
323
|
if( $self->{autoflush} ) { |
906
|
104
|
|
66
|
|
|
319
|
1 while !$self->_is_empty and $self->_flush_one_write; |
907
|
|
|
|
|
|
|
|
908
|
104
|
50
|
|
|
|
246
|
if( $self->_is_empty ) { |
909
|
104
|
|
|
|
|
341
|
$self->want_writeready_for_write( 0 ); |
910
|
104
|
|
|
|
|
490
|
return $f; |
911
|
|
|
|
|
|
|
} |
912
|
|
|
|
|
|
|
} |
913
|
|
|
|
|
|
|
|
914
|
28
|
|
|
|
|
98
|
$self->want_writeready_for_write( 1 ); |
915
|
28
|
|
|
|
|
83
|
return $f; |
916
|
|
|
|
|
|
|
} |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
sub on_write_ready |
919
|
|
|
|
|
|
|
{ |
920
|
68
|
|
|
68
|
1
|
289
|
my $self = shift; |
921
|
|
|
|
|
|
|
|
922
|
68
|
100
|
|
|
|
384
|
if( !$self->{writeable} ) { |
923
|
1
|
|
|
|
|
5
|
$self->maybe_invoke_event( on_writeable_start => ); |
924
|
1
|
|
|
|
|
4
|
$self->{writeable} = 1; |
925
|
|
|
|
|
|
|
} |
926
|
|
|
|
|
|
|
|
927
|
68
|
100
|
|
|
|
715
|
$self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE; |
928
|
68
|
100
|
|
|
|
340
|
$self->_do_read if $self->{want} & WANT_WRITE_FOR_READ; |
929
|
|
|
|
|
|
|
} |
930
|
|
|
|
|
|
|
|
931
|
|
|
|
|
|
|
sub _do_write |
932
|
|
|
|
|
|
|
{ |
933
|
68
|
|
|
68
|
|
327
|
my $self = shift; |
934
|
|
|
|
|
|
|
|
935
|
68
|
|
100
|
|
|
445
|
1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all}; |
|
|
|
100
|
|
|
|
|
936
|
|
|
|
|
|
|
|
937
|
|
|
|
|
|
|
# All data successfully flushed |
938
|
68
|
100
|
|
|
|
318
|
if( $self->_is_empty ) { |
939
|
51
|
|
|
|
|
234
|
$self->want_writeready_for_write( 0 ); |
940
|
|
|
|
|
|
|
|
941
|
51
|
|
|
|
|
348
|
$self->maybe_invoke_event( on_outgoing_empty => ); |
942
|
|
|
|
|
|
|
|
943
|
51
|
100
|
|
|
|
254
|
$self->close_now if $self->{stream_closing}; |
944
|
|
|
|
|
|
|
} |
945
|
|
|
|
|
|
|
} |
946
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
sub _flush_one_read |
948
|
|
|
|
|
|
|
{ |
949
|
1152
|
|
|
1152
|
|
2393
|
my $self = shift; |
950
|
1152
|
|
|
|
|
2597
|
my ( $eof ) = @_; |
951
|
|
|
|
|
|
|
|
952
|
1152
|
|
|
|
|
4891
|
local $self->{flushing_read} = 1; |
953
|
|
|
|
|
|
|
|
954
|
1152
|
|
|
|
|
2323
|
my $readqueue = $self->{readqueue}; |
955
|
|
|
|
|
|
|
|
956
|
1152
|
|
|
|
|
1801
|
my $ret; |
957
|
1152
|
100
|
66
|
|
|
4489
|
if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) { |
958
|
17
|
|
|
|
|
205
|
$ret = $on_read->( $self, \$self->{readbuff}, $eof ); |
959
|
|
|
|
|
|
|
} |
960
|
|
|
|
|
|
|
else { |
961
|
1135
|
|
|
|
|
6206
|
$ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof ); |
962
|
|
|
|
|
|
|
} |
963
|
|
|
|
|
|
|
|
964
|
1152
|
100
|
100
|
|
|
5462
|
if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and |
|
|
|
66
|
|
|
|
|
965
|
|
|
|
|
|
|
length $self->{readbuff} < $self->{read_low_watermark} ) { |
966
|
1
|
|
|
|
|
2
|
undef $self->{at_read_high_watermark}; |
967
|
1
|
|
|
|
|
6
|
$self->invoke_event( on_read_low_watermark => length $self->{readbuff} ); |
968
|
|
|
|
|
|
|
} |
969
|
|
|
|
|
|
|
|
970
|
1152
|
100
|
100
|
|
|
4798
|
if( ref $ret eq "CODE" ) { |
|
|
100
|
|
|
|
|
|
971
|
|
|
|
|
|
|
# Replace the top CODE, or add it if there was none |
972
|
1
|
|
|
|
|
6
|
$readqueue->[0] = Reader( $ret, undef ); |
973
|
1
|
|
|
|
|
28
|
return 1; |
974
|
|
|
|
|
|
|
} |
975
|
|
|
|
|
|
|
elsif( @$readqueue and !defined $ret ) { |
976
|
13
|
|
|
|
|
22
|
shift @$readqueue; |
977
|
13
|
|
|
|
|
124
|
return 1; |
978
|
|
|
|
|
|
|
} |
979
|
|
|
|
|
|
|
else { |
980
|
1138
|
|
100
|
|
|
9177
|
return $ret && ( length( $self->{readbuff} ) > 0 || $eof ); |
981
|
|
|
|
|
|
|
} |
982
|
|
|
|
|
|
|
} |
983
|
|
|
|
|
|
|
|
984
|
|
|
|
|
|
|
sub _sysread |
985
|
|
|
|
|
|
|
{ |
986
|
943
|
|
|
943
|
|
2919
|
my $self = shift; |
987
|
943
|
|
|
|
|
2776
|
my ( $handle, undef, $len ) = @_; |
988
|
943
|
|
|
|
|
6448
|
return $handle->sysread( $_[1], $len ); |
989
|
|
|
|
|
|
|
} |
990
|
|
|
|
|
|
|
|
991
|
|
|
|
|
|
|
sub on_read_ready |
992
|
|
|
|
|
|
|
{ |
993
|
942
|
|
|
942
|
1
|
2181
|
my $self = shift; |
994
|
|
|
|
|
|
|
|
995
|
942
|
50
|
|
|
|
6900
|
$self->_do_read if $self->{want} & WANT_READ_FOR_READ; |
996
|
942
|
100
|
|
|
|
10457
|
$self->_do_write if $self->{want} & WANT_READ_FOR_WRITE; |
997
|
|
|
|
|
|
|
} |
998
|
|
|
|
|
|
|
|
999
|
|
|
|
|
|
|
sub _do_read |
1000
|
|
|
|
|
|
|
{ |
1001
|
943
|
|
|
943
|
|
2019
|
my $self = shift; |
1002
|
|
|
|
|
|
|
|
1003
|
943
|
|
|
|
|
3583
|
my $handle = $self->read_handle; |
1004
|
943
|
|
|
|
|
2383
|
my $reader = $self->{reader}; |
1005
|
|
|
|
|
|
|
|
1006
|
943
|
|
|
|
|
1602
|
while(1) { |
1007
|
947
|
|
|
|
|
1543
|
my $data; |
1008
|
947
|
|
|
|
|
4914
|
my $len = $self->$reader( $handle, $data, $self->{read_len} ); |
1009
|
|
|
|
|
|
|
|
1010
|
947
|
100
|
|
|
|
21405
|
if( !defined $len ) { |
1011
|
6
|
|
|
|
|
32
|
my $errno = $!; |
1012
|
|
|
|
|
|
|
|
1013
|
6
|
100
|
|
|
|
18
|
return if _nonfatal_error( $errno ); |
1014
|
|
|
|
|
|
|
|
1015
|
4
|
50
|
|
|
|
14
|
$self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1; |
1016
|
|
|
|
|
|
|
|
1017
|
4
|
100
|
|
|
|
15
|
$self->maybe_invoke_event( on_read_error => $errno ) |
1018
|
|
|
|
|
|
|
or $self->close_now; |
1019
|
|
|
|
|
|
|
|
1020
|
4
|
|
|
|
|
20
|
foreach ( @{ $self->{readqueue} } ) { |
|
4
|
|
|
|
|
10
|
|
1021
|
1
|
50
|
|
|
|
6
|
$_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future; |
1022
|
|
|
|
|
|
|
} |
1023
|
4
|
|
|
|
|
86
|
undef @{ $self->{readqueue} }; |
|
4
|
|
|
|
|
10
|
|
1024
|
|
|
|
|
|
|
|
1025
|
4
|
|
|
|
|
11
|
return; |
1026
|
|
|
|
|
|
|
} |
1027
|
|
|
|
|
|
|
|
1028
|
941
|
50
|
|
|
|
2821
|
if( $IO::Async::Debug::DEBUG > 1 ) { |
1029
|
0
|
|
|
|
|
0
|
$self->debug_printf( "READ len=%d", $len ); |
1030
|
0
|
0
|
|
|
|
0
|
IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr}; |
1031
|
|
|
|
|
|
|
} |
1032
|
|
|
|
|
|
|
|
1033
|
941
|
100
|
66
|
|
|
3487
|
$METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len; |
1034
|
|
|
|
|
|
|
|
1035
|
941
|
|
|
|
|
11496
|
my $eof = $self->{read_eof} = ( $len == 0 ); |
1036
|
|
|
|
|
|
|
|
1037
|
941
|
100
|
|
|
|
3296
|
if( my $encoding = $self->{encoding} ) { |
1038
|
4
|
100
|
|
|
|
21
|
my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data; |
1039
|
4
|
|
|
|
|
31
|
$data = $encoding->decode( $bytes, STOP_AT_PARTIAL ); |
1040
|
4
|
|
|
|
|
29
|
$self->{bytes_remaining} = $bytes; |
1041
|
|
|
|
|
|
|
} |
1042
|
|
|
|
|
|
|
|
1043
|
941
|
100
|
|
|
|
3556
|
$self->{readbuff} .= $data if !$eof; |
1044
|
|
|
|
|
|
|
|
1045
|
941
|
|
|
|
|
4098
|
1 while $self->_flush_one_read( $eof ); |
1046
|
|
|
|
|
|
|
|
1047
|
941
|
100
|
|
|
|
2581
|
if( $eof ) { |
1048
|
476
|
|
|
|
|
2823
|
$self->debug_printf( "READ-EOF" ); |
1049
|
476
|
|
|
|
|
2414
|
$self->maybe_invoke_event( on_read_eof => ); |
1050
|
476
|
100
|
|
|
|
3508
|
$self->close_now if $self->{close_on_read_eof}; |
1051
|
476
|
|
|
|
|
997
|
foreach ( @{ $self->{readqueue} } ) { |
|
476
|
|
|
|
|
1459
|
|
1052
|
0
|
0
|
|
|
|
0
|
$_->future->done( undef ) if $_->future; |
1053
|
|
|
|
|
|
|
} |
1054
|
476
|
|
|
|
|
799
|
undef @{ $self->{readqueue} }; |
|
476
|
|
|
|
|
1053
|
|
1055
|
476
|
|
|
|
|
1318
|
return; |
1056
|
|
|
|
|
|
|
} |
1057
|
|
|
|
|
|
|
|
1058
|
465
|
100
|
|
|
|
1771
|
last unless $self->{read_all}; |
1059
|
|
|
|
|
|
|
} |
1060
|
|
|
|
|
|
|
|
1061
|
461
|
100
|
66
|
|
|
1722
|
if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) { |
1062
|
|
|
|
|
|
|
$self->{at_read_high_watermark} or |
1063
|
1
|
50
|
|
|
|
8
|
$self->invoke_event( on_read_high_watermark => length $self->{readbuff} ); |
1064
|
|
|
|
|
|
|
|
1065
|
1
|
|
|
|
|
4
|
$self->{at_read_high_watermark} = 1; |
1066
|
|
|
|
|
|
|
} |
1067
|
|
|
|
|
|
|
} |
1068
|
|
|
|
|
|
|
|
1069
|
|
|
|
|
|
|
sub on_read_high_watermark |
1070
|
|
|
|
|
|
|
{ |
1071
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
1072
|
0
|
|
|
|
|
0
|
$self->want_readready_for_read( 0 ); |
1073
|
|
|
|
|
|
|
} |
1074
|
|
|
|
|
|
|
|
1075
|
|
|
|
|
|
|
sub on_read_low_watermark |
1076
|
|
|
|
|
|
|
{ |
1077
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
1078
|
0
|
|
|
|
|
0
|
$self->want_readready_for_read( 1 ); |
1079
|
|
|
|
|
|
|
} |
1080
|
|
|
|
|
|
|
|
1081
|
|
|
|
|
|
|
=head2 push_on_read |
1082
|
|
|
|
|
|
|
|
1083
|
|
|
|
|
|
|
$stream->push_on_read( $on_read ) |
1084
|
|
|
|
|
|
|
|
1085
|
|
|
|
|
|
|
Pushes a new temporary C handler to the end of the queue. This queue, |
1086
|
|
|
|
|
|
|
if non-empty, is used to provide C event handling code in preference |
1087
|
|
|
|
|
|
|
to using the object's main event handler or method. New handlers can be |
1088
|
|
|
|
|
|
|
supplied at any time, and they will be used in first-in first-out (FIFO) |
1089
|
|
|
|
|
|
|
order. |
1090
|
|
|
|
|
|
|
|
1091
|
|
|
|
|
|
|
As with the main C event handler, each can return a (defined) boolean |
1092
|
|
|
|
|
|
|
to indicate if they wish to be invoked again or not, another C reference |
1093
|
|
|
|
|
|
|
to replace themself with, or C to indicate it is now complete and |
1094
|
|
|
|
|
|
|
should be removed. When a temporary handler returns C it is shifted |
1095
|
|
|
|
|
|
|
from the queue and the next one, if present, is invoked instead. If there are |
1096
|
|
|
|
|
|
|
no more then the object's main handler is invoked instead. |
1097
|
|
|
|
|
|
|
|
1098
|
|
|
|
|
|
|
=cut |
1099
|
|
|
|
|
|
|
|
1100
|
|
|
|
|
|
|
sub push_on_read |
1101
|
|
|
|
|
|
|
{ |
1102
|
13
|
|
|
13
|
1
|
39
|
my $self = shift; |
1103
|
13
|
|
|
|
|
32
|
my ( $on_read, %args ) = @_; |
1104
|
|
|
|
|
|
|
# %args undocumented for internal use |
1105
|
|
|
|
|
|
|
|
1106
|
13
|
|
|
|
|
19
|
push @{ $self->{readqueue} }, Reader( $on_read, $args{future} ); |
|
13
|
|
|
|
|
45
|
|
1107
|
|
|
|
|
|
|
|
1108
|
|
|
|
|
|
|
# TODO: Should this always defer? |
1109
|
13
|
100
|
|
|
|
102
|
return if $self->{flushing_read}; |
1110
|
12
|
|
100
|
|
|
44
|
1 while length $self->{readbuff} and $self->_flush_one_read( 0 ); |
1111
|
|
|
|
|
|
|
} |
1112
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
=head1 FUTURE-RETURNING READ METHODS |
1114
|
|
|
|
|
|
|
|
1115
|
|
|
|
|
|
|
The following methods all return a L which will become ready when |
1116
|
|
|
|
|
|
|
enough data has been read by the Stream into its buffer. At this point, the |
1117
|
|
|
|
|
|
|
data is removed from the buffer and given to the C object to complete |
1118
|
|
|
|
|
|
|
it. |
1119
|
|
|
|
|
|
|
|
1120
|
|
|
|
|
|
|
my $f = $stream->read_... |
1121
|
|
|
|
|
|
|
|
1122
|
|
|
|
|
|
|
my ( $string ) = $f->get; |
1123
|
|
|
|
|
|
|
|
1124
|
|
|
|
|
|
|
Unlike the C event handlers, these methods don't allow for access to |
1125
|
|
|
|
|
|
|
"partial" results; they only provide the final result once it is ready. |
1126
|
|
|
|
|
|
|
|
1127
|
|
|
|
|
|
|
If a C is cancelled before it completes it is removed from the read |
1128
|
|
|
|
|
|
|
queue without consuming any data; i.e. each C atomically either |
1129
|
|
|
|
|
|
|
completes or is cancelled. |
1130
|
|
|
|
|
|
|
|
1131
|
|
|
|
|
|
|
Since it is possible to use a readable C entirely using these |
1132
|
|
|
|
|
|
|
C-returning methods instead of the C event, it may be useful |
1133
|
|
|
|
|
|
|
to configure a trivial return-false event handler to keep it from consuming |
1134
|
|
|
|
|
|
|
any input, and to allow it to be added to a C in the first place. |
1135
|
|
|
|
|
|
|
|
1136
|
|
|
|
|
|
|
my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... ); |
1137
|
|
|
|
|
|
|
$loop->add( $stream ); |
1138
|
|
|
|
|
|
|
|
1139
|
|
|
|
|
|
|
my $f = $stream->read_... |
1140
|
|
|
|
|
|
|
|
1141
|
|
|
|
|
|
|
If a read EOF or error condition happens while there are read Cs |
1142
|
|
|
|
|
|
|
pending, they are all completed. In the case of a read EOF, they are done with |
1143
|
|
|
|
|
|
|
C; in the case of a read error they are failed using the C<$!> error |
1144
|
|
|
|
|
|
|
value as the failure. |
1145
|
|
|
|
|
|
|
|
1146
|
|
|
|
|
|
|
$f->fail( $message, sysread => $! ) |
1147
|
|
|
|
|
|
|
|
1148
|
|
|
|
|
|
|
If a read EOF condition happens to the currently-processing read C, it |
1149
|
|
|
|
|
|
|
will return a partial result. The calling code can detect this by the fact |
1150
|
|
|
|
|
|
|
that the returned data is not complete according to the specification (too |
1151
|
|
|
|
|
|
|
short in C's case, or lacking the ending pattern in |
1152
|
|
|
|
|
|
|
C's case). Additionally, each C will yield the C<$eof> |
1153
|
|
|
|
|
|
|
value in its results. |
1154
|
|
|
|
|
|
|
|
1155
|
|
|
|
|
|
|
my ( $string, $eof ) = $f->get; |
1156
|
|
|
|
|
|
|
|
1157
|
|
|
|
|
|
|
=cut |
1158
|
|
|
|
|
|
|
|
1159
|
|
|
|
|
|
|
sub _read_future |
1160
|
|
|
|
|
|
|
{ |
1161
|
11
|
|
|
11
|
|
17
|
my $self = shift; |
1162
|
11
|
|
|
|
|
29
|
my $f = $self->loop->new_future; |
1163
|
|
|
|
|
|
|
$f->on_cancel( $self->_capture_weakself( sub { |
1164
|
1
|
50
|
|
1
|
|
5
|
my $self = shift or return; |
1165
|
1
|
|
|
|
|
3
|
1 while $self->_flush_one_read; |
1166
|
11
|
|
|
|
|
62
|
})); |
1167
|
11
|
|
|
|
|
229
|
return $f; |
1168
|
|
|
|
|
|
|
} |
1169
|
|
|
|
|
|
|
|
1170
|
|
|
|
|
|
|
=head2 read_atmost |
1171
|
|
|
|
|
|
|
|
1172
|
|
|
|
|
|
|
=head2 read_exactly |
1173
|
|
|
|
|
|
|
|
1174
|
|
|
|
|
|
|
( $string, $eof ) = $stream->read_atmost( $len )->get |
1175
|
|
|
|
|
|
|
|
1176
|
|
|
|
|
|
|
( $string, $eof ) = $stream->read_exactly( $len )->get |
1177
|
|
|
|
|
|
|
|
1178
|
|
|
|
|
|
|
Completes the C when the read buffer contains C<$len> or more |
1179
|
|
|
|
|
|
|
characters of input. C will also complete after the first |
1180
|
|
|
|
|
|
|
invocation of C, even if fewer characters are available, whereas |
1181
|
|
|
|
|
|
|
C will wait until at least C<$len> are available. |
1182
|
|
|
|
|
|
|
|
1183
|
|
|
|
|
|
|
=cut |
1184
|
|
|
|
|
|
|
|
1185
|
|
|
|
|
|
|
sub read_atmost |
1186
|
|
|
|
|
|
|
{ |
1187
|
2
|
|
|
2
|
1
|
584
|
my $self = shift; |
1188
|
2
|
|
|
|
|
6
|
my ( $len ) = @_; |
1189
|
|
|
|
|
|
|
|
1190
|
2
|
|
|
|
|
7
|
my $f = $self->_read_future; |
1191
|
|
|
|
|
|
|
$self->push_on_read( sub { |
1192
|
1
|
|
|
1
|
|
3
|
my ( undef, $buffref, $eof ) = @_; |
1193
|
1
|
50
|
|
|
|
11
|
return undef if $f->is_cancelled; |
1194
|
1
|
|
|
|
|
18
|
$f->done( substr( $$buffref, 0, $len, "" ), $eof ); |
1195
|
1
|
|
|
|
|
120
|
return undef; |
1196
|
2
|
|
|
|
|
11
|
}, future => $f ); |
1197
|
2
|
|
|
|
|
8
|
return $f; |
1198
|
|
|
|
|
|
|
} |
1199
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
sub read_exactly |
1201
|
|
|
|
|
|
|
{ |
1202
|
4
|
|
|
4
|
1
|
1142
|
my $self = shift; |
1203
|
4
|
|
|
|
|
8
|
my ( $len ) = @_; |
1204
|
|
|
|
|
|
|
|
1205
|
4
|
|
|
|
|
9
|
my $f = $self->_read_future; |
1206
|
|
|
|
|
|
|
$self->push_on_read( sub { |
1207
|
4
|
|
|
4
|
|
10
|
my ( undef, $buffref, $eof ) = @_; |
1208
|
4
|
50
|
|
|
|
12
|
return undef if $f->is_cancelled; |
1209
|
4
|
50
|
33
|
|
|
46
|
return 0 unless $eof or length $$buffref >= $len; |
1210
|
4
|
|
|
|
|
20
|
$f->done( substr( $$buffref, 0, $len, "" ), $eof ); |
1211
|
4
|
|
|
|
|
247
|
return undef; |
1212
|
4
|
|
|
|
|
20
|
}, future => $f ); |
1213
|
4
|
|
|
|
|
31
|
return $f; |
1214
|
|
|
|
|
|
|
} |
1215
|
|
|
|
|
|
|
|
1216
|
|
|
|
|
|
|
=head2 read_until |
1217
|
|
|
|
|
|
|
|
1218
|
|
|
|
|
|
|
( $string, $eof ) = $stream->read_until( $end )->get |
1219
|
|
|
|
|
|
|
|
1220
|
|
|
|
|
|
|
Completes the C when the read buffer contains a match for C<$end>, |
1221
|
|
|
|
|
|
|
which may either be a plain string or a compiled C reference. Yields |
1222
|
|
|
|
|
|
|
the prefix of the buffer up to and including this match. |
1223
|
|
|
|
|
|
|
|
1224
|
|
|
|
|
|
|
=cut |
1225
|
|
|
|
|
|
|
|
1226
|
|
|
|
|
|
|
sub read_until |
1227
|
|
|
|
|
|
|
{ |
1228
|
4
|
|
|
4
|
1
|
1813
|
my $self = shift; |
1229
|
4
|
|
|
|
|
14
|
my ( $until ) = @_; |
1230
|
|
|
|
|
|
|
|
1231
|
4
|
100
|
|
|
|
38
|
ref $until or $until = qr/\Q$until\E/; |
1232
|
|
|
|
|
|
|
|
1233
|
4
|
|
|
|
|
11
|
my $f = $self->_read_future; |
1234
|
|
|
|
|
|
|
$self->push_on_read( sub { |
1235
|
5
|
|
|
5
|
|
12
|
my ( undef, $buffref, $eof ) = @_; |
1236
|
5
|
100
|
|
|
|
13
|
return undef if $f->is_cancelled; |
1237
|
4
|
100
|
|
|
|
50
|
if( $$buffref =~ $until ) { |
|
|
50
|
|
|
|
|
|
1238
|
3
|
|
|
|
|
27
|
$f->done( substr( $$buffref, 0, $+[0], "" ), $eof ); |
1239
|
3
|
|
|
|
|
126
|
return undef; |
1240
|
|
|
|
|
|
|
} |
1241
|
|
|
|
|
|
|
elsif( $eof ) { |
1242
|
0
|
|
|
|
|
0
|
$f->done( $$buffref, $eof ); $$buffref = ""; |
|
0
|
|
|
|
|
0
|
|
1243
|
0
|
|
|
|
|
0
|
return undef; |
1244
|
|
|
|
|
|
|
} |
1245
|
|
|
|
|
|
|
else { |
1246
|
1
|
|
|
|
|
3
|
return 0; |
1247
|
|
|
|
|
|
|
} |
1248
|
4
|
|
|
|
|
25
|
}, future => $f ); |
1249
|
4
|
|
|
|
|
18
|
return $f; |
1250
|
|
|
|
|
|
|
} |
1251
|
|
|
|
|
|
|
|
1252
|
|
|
|
|
|
|
=head2 read_until_eof |
1253
|
|
|
|
|
|
|
|
1254
|
|
|
|
|
|
|
( $string, $eof ) = $stream->read_until_eof->get |
1255
|
|
|
|
|
|
|
|
1256
|
|
|
|
|
|
|
Completes the C when the stream is eventually closed at EOF, and |
1257
|
|
|
|
|
|
|
yields all of the data that was available. |
1258
|
|
|
|
|
|
|
|
1259
|
|
|
|
|
|
|
=cut |
1260
|
|
|
|
|
|
|
|
1261
|
|
|
|
|
|
|
sub read_until_eof |
1262
|
|
|
|
|
|
|
{ |
1263
|
1
|
|
|
1
|
1
|
607
|
my $self = shift; |
1264
|
|
|
|
|
|
|
|
1265
|
1
|
|
|
|
|
4
|
my $f = $self->_read_future; |
1266
|
|
|
|
|
|
|
$self->push_on_read( sub { |
1267
|
2
|
|
|
2
|
|
7
|
my ( undef, $buffref, $eof ) = @_; |
1268
|
2
|
50
|
|
|
|
6
|
return undef if $f->is_cancelled; |
1269
|
2
|
100
|
|
|
|
16
|
return 0 unless $eof; |
1270
|
1
|
|
|
|
|
5
|
$f->done( $$buffref, $eof ); $$buffref = ""; |
|
1
|
|
|
|
|
41
|
|
1271
|
1
|
|
|
|
|
2
|
return undef; |
1272
|
1
|
|
|
|
|
8
|
}, future => $f ); |
1273
|
1
|
|
|
|
|
3
|
return $f; |
1274
|
|
|
|
|
|
|
} |
1275
|
|
|
|
|
|
|
|
1276
|
|
|
|
|
|
|
=head1 UTILITY CONSTRUCTORS |
1277
|
|
|
|
|
|
|
|
1278
|
|
|
|
|
|
|
=cut |
1279
|
|
|
|
|
|
|
|
1280
|
|
|
|
|
|
|
=head2 new_for_stdin |
1281
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
=head2 new_for_stdout |
1283
|
|
|
|
|
|
|
|
1284
|
|
|
|
|
|
|
=head2 new_for_stdio |
1285
|
|
|
|
|
|
|
|
1286
|
|
|
|
|
|
|
$stream = IO::Async::Stream->new_for_stdin |
1287
|
|
|
|
|
|
|
|
1288
|
|
|
|
|
|
|
$stream = IO::Async::Stream->new_for_stdout |
1289
|
|
|
|
|
|
|
|
1290
|
|
|
|
|
|
|
$stream = IO::Async::Stream->new_for_stdio |
1291
|
|
|
|
|
|
|
|
1292
|
|
|
|
|
|
|
Return a C object preconfigured with the correct |
1293
|
|
|
|
|
|
|
C, C or both. |
1294
|
|
|
|
|
|
|
|
1295
|
|
|
|
|
|
|
=cut |
1296
|
|
|
|
|
|
|
|
1297
|
1
|
|
|
1
|
1
|
18
|
sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) } |
1298
|
1
|
|
|
1
|
1
|
21
|
sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) } |
1299
|
|
|
|
|
|
|
|
1300
|
1
|
|
|
1
|
1
|
397
|
sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) } |
1301
|
|
|
|
|
|
|
|
1302
|
|
|
|
|
|
|
=head2 connect |
1303
|
|
|
|
|
|
|
|
1304
|
|
|
|
|
|
|
$future = $stream->connect( %args ) |
1305
|
|
|
|
|
|
|
|
1306
|
|
|
|
|
|
|
A convenient wrapper for calling the C method on the underlying |
1307
|
|
|
|
|
|
|
L object, passing the C hint as C if not |
1308
|
|
|
|
|
|
|
otherwise supplied. |
1309
|
|
|
|
|
|
|
|
1310
|
|
|
|
|
|
|
=cut |
1311
|
|
|
|
|
|
|
|
1312
|
|
|
|
|
|
|
sub connect |
1313
|
|
|
|
|
|
|
{ |
1314
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
1315
|
0
|
|
|
|
|
|
return $self->SUPER::connect( socktype => "stream", @_ ); |
1316
|
|
|
|
|
|
|
} |
1317
|
|
|
|
|
|
|
|
1318
|
|
|
|
|
|
|
=head1 DEBUGGING FLAGS |
1319
|
|
|
|
|
|
|
|
1320
|
|
|
|
|
|
|
The following flags in C enable extra logging: |
1321
|
|
|
|
|
|
|
|
1322
|
|
|
|
|
|
|
=over 4 |
1323
|
|
|
|
|
|
|
|
1324
|
|
|
|
|
|
|
=item C |
1325
|
|
|
|
|
|
|
|
1326
|
|
|
|
|
|
|
Log byte buffers as data is read from a Stream |
1327
|
|
|
|
|
|
|
|
1328
|
|
|
|
|
|
|
=item C |
1329
|
|
|
|
|
|
|
|
1330
|
|
|
|
|
|
|
Log byte buffers as data is written to a Stream |
1331
|
|
|
|
|
|
|
|
1332
|
|
|
|
|
|
|
=back |
1333
|
|
|
|
|
|
|
|
1334
|
|
|
|
|
|
|
=cut |
1335
|
|
|
|
|
|
|
|
1336
|
|
|
|
|
|
|
=head1 EXAMPLES |
1337
|
|
|
|
|
|
|
|
1338
|
|
|
|
|
|
|
=head2 A line-based C method |
1339
|
|
|
|
|
|
|
|
1340
|
|
|
|
|
|
|
The following C method accepts incoming C<\n>-terminated lines and |
1341
|
|
|
|
|
|
|
prints them to the program's C stream. |
1342
|
|
|
|
|
|
|
|
1343
|
|
|
|
|
|
|
sub on_read |
1344
|
|
|
|
|
|
|
{ |
1345
|
|
|
|
|
|
|
my $self = shift; |
1346
|
|
|
|
|
|
|
my ( $buffref, $eof ) = @_; |
1347
|
|
|
|
|
|
|
|
1348
|
|
|
|
|
|
|
while( $$buffref =~ s/^(.*\n)// ) { |
1349
|
|
|
|
|
|
|
print "Received a line: $1"; |
1350
|
|
|
|
|
|
|
} |
1351
|
|
|
|
|
|
|
|
1352
|
|
|
|
|
|
|
return 0; |
1353
|
|
|
|
|
|
|
} |
1354
|
|
|
|
|
|
|
|
1355
|
|
|
|
|
|
|
Because a reference to the buffer itself is passed, it is simple to use a |
1356
|
|
|
|
|
|
|
C regular expression on the scalar it points at, to both check if data |
1357
|
|
|
|
|
|
|
is ready (i.e. a whole line), and to remove it from the buffer. Since it |
1358
|
|
|
|
|
|
|
always removes as many complete lines as possible, it doesn't need invoking |
1359
|
|
|
|
|
|
|
again when it has finished, so it can return a constant C<0>. |
1360
|
|
|
|
|
|
|
|
1361
|
|
|
|
|
|
|
=head2 Reading binary data |
1362
|
|
|
|
|
|
|
|
1363
|
|
|
|
|
|
|
This C method accepts incoming records in 16-byte chunks, printing |
1364
|
|
|
|
|
|
|
each one. |
1365
|
|
|
|
|
|
|
|
1366
|
|
|
|
|
|
|
sub on_read |
1367
|
|
|
|
|
|
|
{ |
1368
|
|
|
|
|
|
|
my ( $self, $buffref, $eof ) = @_; |
1369
|
|
|
|
|
|
|
|
1370
|
|
|
|
|
|
|
if( length $$buffref >= 16 ) { |
1371
|
|
|
|
|
|
|
my $record = substr( $$buffref, 0, 16, "" ); |
1372
|
|
|
|
|
|
|
print "Received a 16-byte record: $record\n"; |
1373
|
|
|
|
|
|
|
|
1374
|
|
|
|
|
|
|
return 1; |
1375
|
|
|
|
|
|
|
} |
1376
|
|
|
|
|
|
|
|
1377
|
|
|
|
|
|
|
if( $eof and length $$buffref ) { |
1378
|
|
|
|
|
|
|
print "EOF: a partial record still exists\n"; |
1379
|
|
|
|
|
|
|
} |
1380
|
|
|
|
|
|
|
|
1381
|
|
|
|
|
|
|
return 0; |
1382
|
|
|
|
|
|
|
} |
1383
|
|
|
|
|
|
|
|
1384
|
|
|
|
|
|
|
This time, rather than a C loop we have decided to have the handler |
1385
|
|
|
|
|
|
|
just process one record, and use the C mechanism to ask that the |
1386
|
|
|
|
|
|
|
handler be invoked again if there still remains data that might contain |
1387
|
|
|
|
|
|
|
another record; only stopping with C when we know we can't find one. |
1388
|
|
|
|
|
|
|
|
1389
|
|
|
|
|
|
|
The 4-argument form of C extracts the 16-byte record from the buffer |
1390
|
|
|
|
|
|
|
and assigns it to the C<$record> variable, if there was enough data in the |
1391
|
|
|
|
|
|
|
buffer to extract it. |
1392
|
|
|
|
|
|
|
|
1393
|
|
|
|
|
|
|
A lot of protocols use a fixed-size header, followed by a variable-sized body |
1394
|
|
|
|
|
|
|
of data, whose size is given by one of the fields of the header. The following |
1395
|
|
|
|
|
|
|
C method extracts messages in such a protocol. |
1396
|
|
|
|
|
|
|
|
1397
|
|
|
|
|
|
|
sub on_read |
1398
|
|
|
|
|
|
|
{ |
1399
|
|
|
|
|
|
|
my ( $self, $buffref, $eof ) = @_; |
1400
|
|
|
|
|
|
|
|
1401
|
|
|
|
|
|
|
return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes |
1402
|
|
|
|
|
|
|
|
1403
|
|
|
|
|
|
|
my ( $len, $x, $y ) = unpack "N n n", $$buffref; |
1404
|
|
|
|
|
|
|
|
1405
|
|
|
|
|
|
|
return 0 unless length $$buffref >= 8 + $len; |
1406
|
|
|
|
|
|
|
|
1407
|
|
|
|
|
|
|
substr( $$buffref, 0, 8, "" ); |
1408
|
|
|
|
|
|
|
my $data = substr( $$buffref, 0, $len, "" ); |
1409
|
|
|
|
|
|
|
|
1410
|
|
|
|
|
|
|
print "A record with values x=$x y=$y\n"; |
1411
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
return 1; |
1413
|
|
|
|
|
|
|
} |
1414
|
|
|
|
|
|
|
|
1415
|
|
|
|
|
|
|
In this example, the header is Ced first, to extract the body |
1416
|
|
|
|
|
|
|
length, and then the body is extracted. If the buffer does not have enough |
1417
|
|
|
|
|
|
|
data yet for a complete message then C<0> is returned, and the buffer is left |
1418
|
|
|
|
|
|
|
unmodified for next time. Only when there are enough bytes in total does it |
1419
|
|
|
|
|
|
|
use C to remove them. |
1420
|
|
|
|
|
|
|
|
1421
|
|
|
|
|
|
|
=head2 Dynamic replacement of C |
1422
|
|
|
|
|
|
|
|
1423
|
|
|
|
|
|
|
Consider the following protocol (inspired by IMAP), which consists of |
1424
|
|
|
|
|
|
|
C<\n>-terminated lines that may have an optional data block attached. The |
1425
|
|
|
|
|
|
|
presence of such a data block, as well as its size, is indicated by the line |
1426
|
|
|
|
|
|
|
prefix. |
1427
|
|
|
|
|
|
|
|
1428
|
|
|
|
|
|
|
sub on_read |
1429
|
|
|
|
|
|
|
{ |
1430
|
|
|
|
|
|
|
my $self = shift; |
1431
|
|
|
|
|
|
|
my ( $buffref, $eof ) = @_; |
1432
|
|
|
|
|
|
|
|
1433
|
|
|
|
|
|
|
if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) { |
1434
|
|
|
|
|
|
|
my $length = $1; |
1435
|
|
|
|
|
|
|
my $line = $2; |
1436
|
|
|
|
|
|
|
|
1437
|
|
|
|
|
|
|
return sub { |
1438
|
|
|
|
|
|
|
my $self = shift; |
1439
|
|
|
|
|
|
|
my ( $buffref, $eof ) = @_; |
1440
|
|
|
|
|
|
|
|
1441
|
|
|
|
|
|
|
return 0 unless length $$buffref >= $length; |
1442
|
|
|
|
|
|
|
|
1443
|
|
|
|
|
|
|
# Take and remove the data from the buffer |
1444
|
|
|
|
|
|
|
my $data = substr( $$buffref, 0, $length, "" ); |
1445
|
|
|
|
|
|
|
|
1446
|
|
|
|
|
|
|
print "Received a line $line with some data ($data)\n"; |
1447
|
|
|
|
|
|
|
|
1448
|
|
|
|
|
|
|
return undef; # Restore the original method |
1449
|
|
|
|
|
|
|
} |
1450
|
|
|
|
|
|
|
} |
1451
|
|
|
|
|
|
|
elsif( $$buffref =~ s/^LINE:(.*)\n// ) { |
1452
|
|
|
|
|
|
|
my $line = $1; |
1453
|
|
|
|
|
|
|
|
1454
|
|
|
|
|
|
|
print "Received a line $line with no data\n"; |
1455
|
|
|
|
|
|
|
|
1456
|
|
|
|
|
|
|
return 1; |
1457
|
|
|
|
|
|
|
} |
1458
|
|
|
|
|
|
|
else { |
1459
|
|
|
|
|
|
|
print STDERR "Unrecognised input\n"; |
1460
|
|
|
|
|
|
|
# Handle it somehow |
1461
|
|
|
|
|
|
|
} |
1462
|
|
|
|
|
|
|
} |
1463
|
|
|
|
|
|
|
|
1464
|
|
|
|
|
|
|
In the case where trailing data is supplied, a new temporary C |
1465
|
|
|
|
|
|
|
callback is provided in a closure. This closure captures the C<$length> |
1466
|
|
|
|
|
|
|
variable so it knows how much data to expect. It also captures the C<$line> |
1467
|
|
|
|
|
|
|
variable so it can use it in the event report. When this method has finished |
1468
|
|
|
|
|
|
|
reading the data, it reports the event, then restores the original method by |
1469
|
|
|
|
|
|
|
returning C. |
1470
|
|
|
|
|
|
|
|
1471
|
|
|
|
|
|
|
=head1 SEE ALSO |
1472
|
|
|
|
|
|
|
|
1473
|
|
|
|
|
|
|
=over 4 |
1474
|
|
|
|
|
|
|
|
1475
|
|
|
|
|
|
|
=item * |
1476
|
|
|
|
|
|
|
|
1477
|
|
|
|
|
|
|
L - Supply object methods for I/O handles |
1478
|
|
|
|
|
|
|
|
1479
|
|
|
|
|
|
|
=back |
1480
|
|
|
|
|
|
|
|
1481
|
|
|
|
|
|
|
=head1 AUTHOR |
1482
|
|
|
|
|
|
|
|
1483
|
|
|
|
|
|
|
Paul Evans |
1484
|
|
|
|
|
|
|
|
1485
|
|
|
|
|
|
|
=cut |
1486
|
|
|
|
|
|
|
|
1487
|
|
|
|
|
|
|
0x55AA; |