| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package PAGI::Request::MultipartStream; |
|
2
|
|
|
|
|
|
|
$PAGI::Request::MultipartStream::VERSION = '0.002000'; |
|
3
|
4
|
|
|
4
|
|
471268
|
use strict; |
|
|
4
|
|
|
|
|
6
|
|
|
|
4
|
|
|
|
|
130
|
|
|
4
|
4
|
|
|
4
|
|
14
|
use warnings; |
|
|
4
|
|
|
|
|
9
|
|
|
|
4
|
|
|
|
|
155
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
4
|
|
|
4
|
|
1275
|
use Future::AsyncAwait; |
|
|
4
|
|
|
|
|
9567
|
|
|
|
4
|
|
|
|
|
22
|
|
|
7
|
4
|
|
|
4
|
|
214
|
use Carp qw(croak); |
|
|
4
|
|
|
|
|
9
|
|
|
|
4
|
|
|
|
|
206
|
|
|
8
|
4
|
|
|
4
|
|
1237
|
use HTTP::MultiPartParser; |
|
|
4
|
|
|
|
|
7380
|
|
|
|
4
|
|
|
|
|
9584
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
=head1 NAME |
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
PAGI::Request::MultipartStream - Pull-based streaming multipart/form-data engine |
|
13
|
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
15
|
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
use PAGI::Request::MultipartStream; |
|
17
|
|
|
|
|
|
|
use Future::AsyncAwait; |
|
18
|
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
# Usually obtained via $req->multipart_stream, not constructed directly: |
|
20
|
|
|
|
|
|
|
my $stream = $req->multipart_stream; |
|
21
|
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
while (defined(my $part = await $stream->next)) { |
|
23
|
|
|
|
|
|
|
if ($part->is_file) { |
|
24
|
|
|
|
|
|
|
await $part->stream_to_file($path); |
|
25
|
|
|
|
|
|
|
} |
|
26
|
|
|
|
|
|
|
else { |
|
27
|
|
|
|
|
|
|
my $value = await $part->value; # raw bytes; you decode |
|
28
|
|
|
|
|
|
|
} |
|
29
|
|
|
|
|
|
|
} |
|
30
|
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
A pull-based streaming parser for C<multipart/form-data> request bodies. Each |
|
34
|
|
|
|
|
|
|
part of the body is exposed in turn as a L via C, |
|
35
|
|
|
|
|
|
|
and B: you choose its sink (a |
|
36
|
|
|
|
|
|
|
file, an object store, an async transform) per part, rather than accepting the |
|
37
|
|
|
|
|
|
|
buffered, spool-each-upload-to-a-temp-file behaviour of C and |
|
38
|
|
|
|
|
|
|
C in L. |
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
Because you own the sink, it can be fully asynchronous: C awaits a |
|
41
|
|
|
|
|
|
|
sink that returns a Future, so a slow downstream naturally backpressures the |
|
42
|
|
|
|
|
|
|
read. This is what the buffered multipart path cannot offer -- its spool to a |
|
43
|
|
|
|
|
|
|
temp file is blocking. |
|
44
|
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
Internally this drives L on demand, bridging its |
|
46
|
|
|
|
|
|
|
push-based callbacks onto an internal event queue that C and the part |
|
47
|
|
|
|
|
|
|
methods consume. |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
B An HTTP request body can |
|
50
|
|
|
|
|
|
|
only be consumed once. Once you create a multipart stream you cannot also call |
|
51
|
|
|
|
|
|
|
C/C/C/C/C, and a stream cannot be |
|
52
|
|
|
|
|
|
|
created if the body was already read; see L. |
|
53
|
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=cut |
|
55
|
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
our $MAX_FILES = 1000; |
|
57
|
|
|
|
|
|
|
our $MAX_FIELDS = 1000; |
|
58
|
|
|
|
|
|
|
our $MAX_FIELD_SIZE = 1024 * 1024; # buffered per-field cap |
|
59
|
|
|
|
|
|
|
our $MAX_FILE_SIZE = 100 * 1024 * 1024; |
|
60
|
|
|
|
|
|
|
our $MAX_REQUEST_BODY = 1024 * 1024 * 1024; # defense-in-depth; server max_body_size is primary |
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
=head1 CONSTRUCTOR |
|
63
|
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
=head2 new |
|
65
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
my $stream = PAGI::Request::MultipartStream->new( |
|
67
|
|
|
|
|
|
|
receive => $receive, # required: PAGI receive callback |
|
68
|
|
|
|
|
|
|
boundary => $boundary, # required: multipart boundary |
|
69
|
|
|
|
|
|
|
max_files => 1000, # optional limits (defaults shown) |
|
70
|
|
|
|
|
|
|
max_fields => 1000, |
|
71
|
|
|
|
|
|
|
max_field_size => 1024 * 1024, |
|
72
|
|
|
|
|
|
|
max_file_size => 100 * 1024 * 1024, |
|
73
|
|
|
|
|
|
|
max_request_body => 1024 * 1024 * 1024, |
|
74
|
|
|
|
|
|
|
); |
|
75
|
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
Creates a new streaming multipart engine. Most applications do not call this |
|
77
|
|
|
|
|
|
|
directly -- they obtain a ready-built stream from |
|
78
|
|
|
|
|
|
|
L, which extracts the boundary from the |
|
79
|
|
|
|
|
|
|
request's C and passes through the same limit options. |
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
C<receive> and C<boundary> are required. The remaining options cap the body to |
|
82
|
|
|
|
|
|
|
bound memory and resource use: |
|
83
|
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
=over 4 |
|
85
|
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
=item * C<max_files> - Maximum number of file parts. Default: 1000. |
|
87
|
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=item * C<max_fields> - Maximum number of non-file (field) parts. Default: 1000. |
|
89
|
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=item * C<max_field_size> - Maximum size, in bytes, of any single field part. |
|
91
|
|
|
|
|
|
|
Default: 1 MiB (1024 * 1024). |
|
92
|
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
=item * C<max_file_size> - Maximum size, in bytes, of any single file part. |
|
94
|
|
|
|
|
|
|
Default: 100 MiB (100 * 1024 * 1024). |
|
95
|
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
=item * C<max_request_body> - Maximum total bytes read from the request body. |
|
97
|
|
|
|
|
|
|
Default: 1 GiB (1024 * 1024 * 1024). This is a per-stream defence-in-depth |
|
98
|
|
|
|
|
|
|
cap; the PAGI server's C<max_body_size> is the primary aggregate limit on the |
|
99
|
|
|
|
|
|
|
request body. |
|
100
|
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
=back |
|
102
|
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
=cut |
|
104
|
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
# Construct a streaming multipart engine.
#
# Arguments (named):
#   receive  - required; callback invoked (and awaited) by _pump to obtain the
#              next request message.
#   boundary - required; non-empty multipart boundary string.
#   max_files, max_fields, max_field_size, max_file_size, max_request_body
#            - optional limits; each falls back to the matching package-level
#              $MAX_* default when not supplied (undef).
#
# Croaks if receive or boundary is missing. Returns the blessed stream with
# its parser already built.
sub new {
    my ($class, %args) = @_;

    croak "receive is required" unless $args{receive};
    croak "boundary is required"
        unless defined $args{boundary} && length $args{boundary};

    # Caller-tunable limits, defaulted from the package variables.
    my %limits = (
        max_files        => $args{max_files}        // $MAX_FILES,
        max_fields       => $args{max_fields}       // $MAX_FIELDS,
        max_field_size   => $args{max_field_size}   // $MAX_FIELD_SIZE,
        max_file_size    => $args{max_file_size}    // $MAX_FILE_SIZE,
        max_request_body => $args{max_request_body} // $MAX_REQUEST_BODY,
    );

    # Internal bookkeeping, all starting from a clean slate.
    my %state = (
        _queue           => [],       # FIFO: ['part',\%meta] | ['body',$chunk]
        _file_count      => 0,
        _field_count     => 0,
        _bytes_total     => 0,
        _cur_is_file     => 0,
        _cur_bytes       => 0,
        _cur_name        => undef,
        _current         => undef,    # current Part
        _exhausted       => 0,
        _parser_finished => 0,        # guard: finish() is called at most once
        _failed          => undef,    # sticky failure message (poisons the stream)
    );

    my $self = bless {
        receive  => $args{receive},
        boundary => $args{boundary},
        %limits,
        %state,
    }, $class;

    $self->{_parser} = $self->_build_parser;
    return $self;
}
|
132
|
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
# Turn the on_header arrayref of raw header lines, e.g.
#   'Content-Disposition: form-data; name="x"'
# into a metadata hashref with the keys
#   name, filename, content_type, encoding, headers.
#
# Header names are lower-cased; a line without a "Name: value" shape is
# ignored. content_type falls back to 'text/plain' when the part carries no
# Content-Type header. Callers treat a defined filename as "this is a file".
sub _disposition {
    my ($lines) = @_;

    my %headers;
    for my $line (@$lines) {
        my ($field, $value) = $line =~ /^([^:]+):\s*(.*)$/
            or next;
        $headers{ lc $field } = $value;
    }

    my $params = _parse_content_disposition(\%headers);

    my %meta = (
        name         => $params->{name},
        filename     => $params->{filename},
        content_type => $headers{'content-type'} // 'text/plain',
        encoding     => $headers{'content-transfer-encoding'},
        headers      => \%headers,
    );
    return \%meta;
}
|
157
|
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
# Extract the parameters of a part's Content-Disposition header.
#
# Takes the hashref of lower-cased headers and returns a hashref mapping each
# parameter name (lower-cased, since parameter names are case-insensitive) to
# its value. A missing header yields an empty hashref.
#
# The header is scanned in a single left-to-right pass so that a quoted value
# is consumed as one unit. Scanning quoted and unquoted forms in two separate
# passes would re-read the *inside* of quoted values: a field sent as
#   name="x; filename=evil"
# would gain a bogus filename and be treated as a file upload.
#
# Precedence for (malformed) repeated parameters: a quoted value always
# replaces what is there; an unquoted value only fills a parameter that has
# no value yet.
sub _parse_content_disposition {
    my ($headers) = @_;
    my $cd = $headers->{'content-disposition'} // '';

    my %result;
    while ($cd =~ /(\w+)=(?:"([^"]*)"|([^;\s"]+))/g) {
        my ($param, $quoted, $bare) = (lc $1, $2, $3);
        if (defined $quoted) {
            $result{$param} = $quoted;
        }
        else {
            $result{$param} //= $bare;
        }
    }

    return \%result;
}
|
175
|
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
# Build the HTTP::MultiPartParser that backs this stream.
#
# The parser is push-based; its callbacks translate parser events into
# entries on the stream's _queue (['part', \%meta] / ['body', $chunk]) and
# enforce the count and per-part size limits as data arrives. A breach is
# recorded in the sticky _failed field and nothing further is queued, which
# keeps the queue bounded.
#
# The callbacks hold only a WEAK reference to the stream. The stream owns the
# parser and the parser owns the callbacks, so capturing $self strongly would
# create a reference cycle and the stream (with its queue) would never be
# freed.
sub _build_parser {
    my ($self) = @_;

    require Scalar::Util;
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    return HTTP::MultiPartParser->new(
        boundary  => $self->{boundary},
        on_header => sub {
            my ($headers) = @_;
            my $stream = $weak_self or return;    # stream already destroyed
            return if $stream->{_failed};

            my $meta    = _disposition($headers);
            my $is_file = defined $meta->{filename} ? 1 : 0;
            if ($is_file) {
                $stream->{_failed} = "Too many file parts (max $stream->{max_files})"
                    if ++$stream->{_file_count} > $stream->{max_files};
            }
            else {
                $stream->{_failed} = "Too many field parts (max $stream->{max_fields})"
                    if ++$stream->{_field_count} > $stream->{max_fields};
            }
            return if $stream->{_failed};

            # Reset the per-part bookkeeping used by on_body.
            $stream->{_cur_is_file} = $is_file;
            $stream->{_cur_bytes}   = 0;
            $stream->{_cur_name}    = $meta->{name};
            push @{ $stream->{_queue} }, ['part', $meta];
        },
        on_body => sub {
            my ($chunk) = @_;
            my $stream = $weak_self or return;    # stream already destroyed
            return if $stream->{_failed};

            $stream->{_cur_bytes} += length $chunk;
            my $max = $stream->{_cur_is_file}
                ? $stream->{max_file_size}
                : $stream->{max_field_size};
            if ($stream->{_cur_bytes} > $max) {
                $stream->{_failed} = sprintf("%s part '%s' too large (max %d bytes)",
                    ($stream->{_cur_is_file} ? 'File' : 'Field'),
                    ($stream->{_cur_name} // ''), $max);
                return;    # stop enqueuing -- bounds the queue
            }
            push @{ $stream->{_queue} }, ['body', $chunk];
        },
        on_error => sub {
            my $stream = $weak_self or return;    # stream already destroyed
            # Keep the first failure; later parse errors must not mask it.
            $stream->{_failed} //= "Multipart parse error: $_[0]";
        },
    );
}
|
213
|
|
|
|
|
|
|
|
|
214
|
|
|
|
|
|
|
# Pull one message from the receive callback and feed its body to the parser.
#
# Resolves to 1 when a message was processed, and to 0 once the stream is
# exhausted (already exhausted, disconnect/empty message, or the
# max_request_body cap was exceeded). Parser callbacks fired by parse() do
# the enqueueing and limit bookkeeping.
async sub _pump {
    my ($self) = @_;
    return 0 if $self->{_exhausted};

    my $msg = await $self->{receive}->();

    my $gone = !$msg || !$msg->{type} || $msg->{type} eq 'http.disconnect';
    if ($gone) {
        $self->{_exhausted} = 1;
        # 0 bytes => empty stream, clean EOF; otherwise let finish decide
        # whether the body was complete.
        $self->_finish_parser if $self->{_bytes_total} > 0;
        return 0;
    }

    my $len = defined $msg->{body} ? length $msg->{body} : 0;
    if ($len) {
        $self->{_bytes_total} += $len;
        if ($self->{_bytes_total} > $self->{max_request_body}) {
            $self->{_failed} = "Request body exceeded max_request_body ($self->{max_request_body} bytes)";
            $self->{_exhausted} = 1;
            return 0;
        }
        $self->{_parser}->parse($msg->{body});    # fires callbacks (enqueue + bookkeep)
    }

    # Last message of the body: mark exhaustion and finalize the parser.
    if (!$msg->{more}) {
        $self->{_exhausted} = 1;
        $self->_finish_parser if $self->{_bytes_total} > 0;
    }

    return 1;
}
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
# Finalize the parser after the stream has ended; runs at most once.
#
# finish() is wrapped in eval so that an exception is recorded in the sticky
# _failed field instead of propagating. If the stream had no failure before
# finish and has one afterwards, finish is what introduced it, and the
# message is replaced by a clear "incomplete upload" one. A failure that was
# already present (e.g. a size-limit hit) is left untouched.
sub _finish_parser {
    my ($self) = @_;

    return if $self->{_parser_finished};
    $self->{_parser_finished} = 1;

    my $failed_before = defined $self->{_failed};

    my $finished = eval { $self->{_parser}->finish; 1 };
    if (!$finished) {
        $self->{_failed} //= "Multipart parse error (finish): $@";
    }

    # Nothing new to report: either it was already failed, or finish was clean.
    return if $failed_before;
    return unless defined $self->{_failed};

    # finish introduced it => truncation
    $self->{_failed} = "Incomplete multipart upload: client disconnected or stream ended before the closing boundary";
    return;
}
|
256
|
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=head1 METHODS |
|
258
|
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=head2 next |
|
260
|
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
my $part = await $stream->next; |
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
Returns a Future resolving to the next L<PAGI::Request::Part>, or C<undef> when |
|
264
|
|
|
|
|
|
|
the stream is exhausted (end of body). |
|
265
|
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
Advancing past a part whose body you have not fully consumed auto-drains the |
|
267
|
|
|
|
|
|
|
remainder of that part first, so you can always loop on C<next> without |
|
268
|
|
|
|
|
|
|
reading every part. To discard a part deliberately (and signal that intent), |
|
269
|
|
|
|
|
|
|
call C<< $part->skip >>. |
|
270
|
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
Croaks if a size or count limit is breached, or if the upload is truncated |
|
272
|
|
|
|
|
|
|
(see L). |
|
273
|
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
=cut |
|
275
|
|
|
|
|
|
|
|
|
276
|
27
|
|
|
27
|
1
|
805
|
# Resolve to the next part of the body, or undef at a clean end of body.
#
# If the previous part was not fully consumed it is drained first. Croaks
# with the sticky failure message when the stream is poisoned (limit breach,
# parse error, truncated upload, or a sink error).
async sub next {
    my ($self) = @_;
    croak $self->{_failed} if $self->{_failed};

    # auto-drain the part the caller did not finish reading
    if ($self->{_current} && !$self->{_current}{_done}) {
        await $self->{_current}->skip;
    }

    while (1) {
        croak $self->{_failed} if $self->{_failed};

        # defensive: drop body chunks that belong to no current part
        shift @{ $self->{_queue} }
            while @{ $self->{_queue} } && $self->{_queue}[0][0] eq 'body';

        if (@{ $self->{_queue} } && $self->{_queue}[0][0] eq 'part') {
            my (undef, $meta) = @{ shift @{ $self->{_queue} } };
            $self->{_current} = PAGI::Request::Part->new(stream => $self, meta => $meta);
            return $self->{_current};
        }

        last unless await $self->_pump;
    }

    croak $self->{_failed} if $self->{_failed};    # truncation surfaces via _failed (set by finish)

    # Clean end of body. Let go of the last part: it holds a reference back
    # to this stream, so keeping it here would leave a reference cycle that
    # prevents both objects from ever being freed.
    $self->{_current} = undef;
    return undef;
}
|
293
|
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
# Resolve to the next body chunk of the current part, or undef once that part
# has ended (the next part's header is at the head of the queue, or the body
# reached a clean end). Croaks with the sticky failure message if the stream
# is poisoned.
async sub _next_chunk {
    my ($self) = @_;
    my $queue = $self->{_queue};

    while (1) {
        croak $self->{_failed} if $self->{_failed};

        if (my $head = $queue->[0]) {
            if ($head->[0] eq 'body') {
                shift @$queue;
                return $head->[1];
            }
            return undef if $head->[0] eq 'part';    # next part began -> current done
        }

        # Nothing usable queued yet: read more from the network and retry.
        next if await $self->_pump;

        croak $self->{_failed} if $self->{_failed};    # truncation surfaces via _failed (set by finish)
        return undef;                                  # clean EOF (complete body, then disconnect)
    }
}
|
310
|
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
package PAGI::Request::Part; |
|
312
|
4
|
|
|
4
|
|
30
|
use strict; |
|
|
4
|
|
|
|
|
6
|
|
|
|
4
|
|
|
|
|
78
|
|
|
313
|
4
|
|
|
4
|
|
12
|
use warnings; |
|
|
4
|
|
|
|
|
11
|
|
|
|
4
|
|
|
|
|
193
|
|
|
314
|
|
|
|
|
|
|
|
|
315
|
4
|
|
|
4
|
|
18
|
use Future::AsyncAwait; |
|
|
4
|
|
|
|
|
6
|
|
|
|
4
|
|
|
|
|
18
|
|
|
316
|
4
|
|
|
4
|
|
181
|
use Carp qw(croak); |
|
|
4
|
|
|
|
|
5
|
|
|
|
4
|
|
|
|
|
224
|
|
|
317
|
4
|
|
|
4
|
|
19
|
use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_NOFOLLOW); |
|
|
4
|
|
|
|
|
5
|
|
|
|
4
|
|
|
|
|
4887
|
|
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
=head1 NAME |
|
320
|
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
PAGI::Request::Part - A single part of a streaming multipart request |
|
322
|
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
324
|
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
A value object representing one part yielded by |
|
326
|
|
|
|
|
|
|
L. It carries the part's metadata (name, |
|
327
|
|
|
|
|
|
|
filename, headers) and provides the methods that consume the part's body: pull |
|
328
|
|
|
|
|
|
|
it chunk by chunk, buffer it whole, or drain it to a sink of your choosing. |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
A part's body must be consumed before the next part is fetched. Calling |
|
331
|
|
|
|
|
|
|
C<< $stream->next >> while a part is only partially read drains the rest of |
|
332
|
|
|
|
|
|
|
the current part automatically. |
|
333
|
|
|
|
|
|
|
|
|
334
|
|
|
|
|
|
|
=head1 CONSTRUCTOR |
|
335
|
|
|
|
|
|
|
|
|
336
|
|
|
|
|
|
|
=head2 new |
|
337
|
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
my $part = PAGI::Request::Part->new(stream => $stream, meta => \%meta); |
|
339
|
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
Constructs a part bound to its owning stream. Parts are normally created by |
|
341
|
|
|
|
|
|
|
L, not by application code. |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
=head1 METHODS |
|
344
|
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
=head2 name |
|
346
|
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
my $name = $part->name; |
|
348
|
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
The part's form field name, taken from its C header. |
|
350
|
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
=head2 filename |
|
352
|
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
my $filename = $part->filename; |
|
354
|
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
The part's filename from C, or C for non-file |
|
356
|
|
|
|
|
|
|
(field) parts. |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
=head2 content_type |
|
359
|
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
my $type = $part->content_type; |
|
361
|
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
The part's C<Content-Type> header. Defaults to C<text/plain> if the part sent |
|
363
|
|
|
|
|
|
|
no C<Content-Type>. |
|
364
|
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
=head2 encoding |
|
366
|
|
|
|
|
|
|
|
|
367
|
|
|
|
|
|
|
my $encoding = $part->encoding; |
|
368
|
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
The part's C<Content-Transfer-Encoding> header, or C<undef> if not present. |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
=head2 headers |
|
372
|
|
|
|
|
|
|
|
|
373
|
|
|
|
|
|
|
my $headers = $part->headers; |
|
374
|
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
A hashref of all the part's headers, keyed by lower-cased header name. |
|
376
|
|
|
|
|
|
|
|
|
377
|
|
|
|
|
|
|
=head2 is_file |
|
378
|
|
|
|
|
|
|
|
|
379
|
|
|
|
|
|
|
if ($part->is_file) { ... } |
|
380
|
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
True if the part has a filename (i.e. is a file upload), false otherwise. |
|
382
|
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=head2 type |
|
384
|
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
my $type = $part->type; # 'file' or 'field' |
|
386
|
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
Returns the string C<'file'> for file parts and C<'field'> for non-file parts. |
|
388
|
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
=head2 next_chunk |
|
390
|
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
my $chunk = await $part->next_chunk; |
|
392
|
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
Returns this part's next body chunk as raw bytes, or C<undef> once the part's |
|
394
|
|
|
|
|
|
|
body is exhausted. This is the low-level primitive; C, C, |
|
395
|
|
|
|
|
|
|
and C are built on it. |
|
396
|
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
=head2 value |
|
398
|
|
|
|
|
|
|
|
|
399
|
|
|
|
|
|
|
my $bytes = await $part->value; |
|
400
|
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
Buffers and returns the part's entire body as B -- no decoding is |
|
402
|
|
|
|
|
|
|
applied, so a text field encoded as UTF-8 (or any other charset) must be |
|
403
|
|
|
|
|
|
|
decoded by the caller. Intended for small field parts; the buffered size is |
|
404
|
|
|
|
|
|
|
bounded by the relevant per-part size limit (C<max_field_size> for fields, |
|
405
|
|
|
|
|
|
|
C<max_file_size> for files), which croaks if exceeded. |
|
406
|
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
=head2 stream_to |
|
408
|
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
my $count = await $part->stream_to($cb); |
|
410
|
|
|
|
|
|
|
my $count = await $part->stream_to(async sub ($chunk) { await $sink->write($chunk) }); |
|
411
|
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
Drains the rest of the part to a sink callback, returning the number of bytes |
|
413
|
|
|
|
|
|
|
processed. The callback is invoked with each chunk of raw bytes and may be |
|
414
|
|
|
|
|
|
|
asynchronous: if it returns a Future, C awaits it before reading |
|
415
|
|
|
|
|
|
|
the next chunk, giving the sink natural backpressure over the network read. |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
If the sink callback throws, the error poisons the stream -- a later |
|
418
|
|
|
|
|
|
|
C<< $stream->next >> will croak -- and the failed part is B auto-drained, |
|
419
|
|
|
|
|
|
|
since the application has signalled it is aborting. The exception is re-thrown |
|
420
|
|
|
|
|
|
|
to the caller. |
|
421
|
|
|
|
|
|
|
|
|
422
|
|
|
|
|
|
|
=head2 stream_to_file |
|
423
|
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
my $count = await $part->stream_to_file($path); |
|
425
|
|
|
|
|
|
|
|
|
426
|
|
|
|
|
|
|
Writes the part's body to a B file at C<$path>, returning the number of |
|
427
|
|
|
|
|
|
|
bytes written. The file is opened with C: the call |
|
428
|
|
|
|
|
|
|
B, croaking |
|
429
|
|
|
|
|
|
|
instead. The result of C is checked. On any error -- a write failure, a |
|
430
|
|
|
|
|
|
|
limit breach, a truncated upload, or a failed C -- the partially |
|
431
|
|
|
|
|
|
|
written file is unlinked before the method croaks. |
|
432
|
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
=head2 skip |
|
434
|
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
await $part->skip; |
|
436
|
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
Drains and discards any remaining body of this part. Use this to deliberately |
|
438
|
|
|
|
|
|
|
ignore a part; C<< $stream->next >> would otherwise drain it for you anyway. |
|
439
|
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
=head1 LIMITS AND ERRORS |
|
441
|
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
Size and count limits are enforced as bytes arrive from the network, before |
|
443
|
|
|
|
|
|
|
your sink ever sees them, so an oversized part cannot stream partway into your |
|
444
|
|
|
|
|
|
|
sink before being rejected. A per-part size overflow names the offending part |
|
445
|
|
|
|
|
|
|
in its error message (for example, C<< File part 'avatar' too large >>). |
|
446
|
|
|
|
|
|
|
Exceeding C<max_files>, C<max_fields>, or C<max_request_body> likewise causes |
|
447
|
|
|
|
|
|
|
the next C<next>/consume call to croak. |
|
448
|
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
A client disconnect that truncates a part mid-stream croaks with an |
|
450
|
|
|
|
|
|
|
C<"Incomplete multipart upload"> message (the closing boundary was never |
|
451
|
|
|
|
|
|
|
seen). By contrast, a complete body -- or an entirely empty one -- followed by |
|
452
|
|
|
|
|
|
|
a disconnect ends cleanly, with C<next> simply returning C<undef>. |
|
453
|
|
|
|
|
|
|
|
|
454
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
455
|
|
|
|
|
|
|
|
|
456
|
|
|
|
|
|
|
L, L |
|
457
|
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
=cut |
|
459
|
|
|
|
|
|
|
|
|
460
|
18
|
|
|
18
|
|
54
|
# Construct a part bound to its owning stream. Takes named arguments
# stream and meta; the part starts out not yet consumed (_done = 0).
sub new {
    my ($class, %args) = @_;
    my $self = {
        stream => $args{stream},
        meta   => $args{meta},
        _done  => 0,
    };
    return bless $self, $class;
}
|
|
18
|
|
|
|
|
78
|
|
|
461
|
1
|
|
|
1
|
|
4
|
# Form field name recorded in the part's metadata (undef if none).
sub name {
    my ($self) = @_;
    return $self->{meta}{name};
}
|
462
|
3
|
|
|
3
|
|
46
|
# Filename recorded in the part's metadata; undef for non-file parts.
sub filename {
    my ($self) = @_;
    return $self->{meta}{filename};
}
|
463
|
1
|
|
|
1
|
|
3
|
# Content type recorded in the part's metadata.
sub content_type {
    my ($self) = @_;
    return $self->{meta}{content_type};
}
|
464
|
0
|
|
|
0
|
|
0
|
# Transfer encoding recorded in the part's metadata (undef if none).
sub encoding {
    my ($self) = @_;
    return $self->{meta}{encoding};
}
|
465
|
0
|
|
|
0
|
|
0
|
# Hashref of the part's headers as stored in its metadata.
sub headers {
    my ($self) = @_;
    return $self->{meta}{headers};
}
|
466
|
4
|
100
|
|
4
|
|
87
|
# 1 when the part has a defined filename (even an empty one), 0 otherwise.
sub is_file {
    my ($self) = @_;
    return 0 unless defined $self->{meta}{filename};
    return 1;
}
|
467
|
2
|
100
|
|
2
|
|
47
|
# 'file' when is_file is true for this part, 'field' otherwise.
sub type {
    my ($self) = @_;
    return 'file' if $self->is_file;
    return 'field';
}
|
468
|
7
|
|
|
7
|
|
8
|
# Drain and discard whatever is left of this part's body, then mark the part
# as done.
#
# A part that is already done is left alone. Chunks are pulled from the
# shared stream, so draining again on a finished (stale) part would silently
# eat the body of whichever part the stream has moved on to. next_chunk
# applies the same guard.
async sub skip {
    my ($self) = @_;
    return if $self->{_done};

    1 while defined(await $self->{stream}->_next_chunk);
    $self->{_done} = 1;
    return;
}
|
|
7
|
|
|
|
|
10
|
|
|
|
7
|
|
|
|
|
16
|
|
|
|
7
|
|
|
|
|
184
|
|
|
|
7
|
|
|
|
|
15
|
|
|
469
|
|
|
|
|
|
|
|
|
470
|
14
|
|
|
14
|
|
29
|
# Resolve to this part's next body chunk, or undef once the part is finished.
# Once the part is marked done the stream is not consulted again.
async sub next_chunk {
    my ($self) = @_;

    if ($self->{_done}) {
        return undef;
    }

    my $chunk = await $self->{stream}->_next_chunk;
    if (!defined $chunk) {
        $self->{_done} = 1;    # part ended; remember it for later calls
    }
    return $chunk;
}
|
477
|
|
|
|
|
|
|
|
|
478
|
3
|
|
|
3
|
|
49
|
# Buffer the whole remaining body of this part and resolve to it as one
# string of RAW BYTES (no decoding). An empty body yields ''.
async sub value {
    my ($self) = @_;

    my @chunks;
    while (1) {
        my $chunk = await $self->next_chunk;
        last unless defined $chunk;
        push @chunks, $chunk;
    }

    return join '', @chunks;
}
|
484
|
|
|
|
|
|
|
|
|
485
|
2
|
|
|
2
|
|
82
|
# Drain the rest of this part into a sink callback and resolve to the number
# of bytes handed to it.
#
# The callback receives each chunk of raw bytes. If it returns an object
# with a get method (a Future), that is awaited before the next chunk is
# read. Any other return value -- including a plain hash/array/code
# reference -- is ignored.
#
# Croaks if no callback is given. If reading or the callback throws, the
# error is recorded on the stream (unless a failure is already recorded) so
# that later calls croak, and the exception is re-thrown.
async sub stream_to {
    my ($self, $cb) = @_;
    croak "callback is required" unless $cb;

    require Scalar::Util;

    my $n  = 0;
    my $ok = eval {
        while (defined(my $c = await $self->next_chunk)) {
            my $r = $cb->($c);
            # Only call ->can on real objects: calling a method on an
            # unblessed reference would itself die and abort the transfer.
            await $r if Scalar::Util::blessed($r) && $r->can('get');    # allow an async sink (returns a Future)
            $n += length $c;
        }
        1;
    };

    if (!$ok) {
        my $err = $@;
        # poison the stream so a later ->next croaks; do NOT auto-drain (the app aborted)
        $self->{stream}{_failed} //= "sink error: $err";
        die $err;
    }

    return $n;
}
|
505
|
|
|
|
|
|
|
|
|
506
|
4
|
|
|
4
|
|
305
|
# Write the rest of this part's body to a NEW file at $path and resolve to
# the number of bytes written.
#
# The file is created with O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW and mode 0600,
# so the call fails rather than reuse an existing path or follow a symlink.
# Croaks if $path is undefined or the file cannot be created. On any later
# error (reading a chunk, writing, or closing) the file is unlinked before
# croaking; an error raised while reading/writing takes precedence over a
# close failure.
async sub stream_to_file {
    my ($self, $path) = @_;
    croak "path is required" unless defined $path;

    sysopen(my $fh, $path, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0600)
        or croak "Cannot create $path: $!";
    binmode $fh;

    my $written = 0;
    my $ok = eval {
        while (defined(my $c = await $self->next_chunk)) {
            print {$fh} $c or die "write to $path failed: $!\n";
            $written += length $c;
        }
        1;
    };
    my $err = $@;

    # Buffered write errors surface at close, so its result matters. Capture
    # the reason right away: the unlink below may overwrite $!.
    my $close_ok  = close $fh;
    my $close_err = "$!";

    if (!$ok) {    # write/limit/disconnect error wins
        unlink $path;
        croak $err;
    }
    unless ($close_ok) {
        unlink $path;
        croak "Cannot close $path: $close_err";
    }

    return $written;
}
|
526
|
|
|
|
|
|
|
|
|
527
|
|
|
|
|
|
|
1; |