File Coverage

blib/lib/PAGI/Request/MultipartStream.pm
Criterion Covered Total %
statement 195 201 97.0
branch 82 94 87.2
condition 33 60 55.0
subroutine 32 34 94.1
pod 2 2 100.0
total 344 391 87.9


line stmt bran cond sub pod time code
1             package PAGI::Request::MultipartStream;
2             $PAGI::Request::MultipartStream::VERSION = '0.002001';
3 4     4   522999 use strict;
  4         7  
  4         159  
4 4     4   19 use warnings;
  4         9  
  4         196  
5              
6 4     4   1218 use Future::AsyncAwait;
  4         9413  
  4         22  
7 4     4   278 use Carp qw(croak);
  4         9  
  4         275  
8 4     4   1141 use HTTP::MultiPartParser;
  4         6879  
  4         10820  
9              
10             =head1 NAME
11              
12             PAGI::Request::MultipartStream - Pull-based streaming multipart/form-data engine
13              
14             =head1 SYNOPSIS
15              
16             use PAGI::Request::MultipartStream;
17             use Future::AsyncAwait;
18              
19             # Usually obtained via $req->multipart_stream, not constructed directly:
20             my $stream = $req->multipart_stream;
21              
22             while (defined(my $part = await $stream->next)) {
23             if ($part->is_file) {
24             await $part->stream_to_file($path);
25             }
26             else {
27             my $value = await $part->value; # raw bytes; you decode
28             }
29             }
30              
31             =head1 DESCRIPTION
32              
33             A pull-based streaming parser for C<multipart/form-data> request bodies. Each
34             part of the body is exposed in turn as a L<PAGI::Request::Part> via C<next>,
35             and B: you choose its sink (a
36             file, an object store, an async transform) per part, rather than accepting the
37             buffered, spool-each-upload-to-a-temp-file behaviour of C and
38             C in L.
39              
40             Because you own the sink, it can be fully asynchronous: C<stream_to> awaits a
41             sink that returns a Future, so a slow downstream naturally backpressures the
42             read. This is what the buffered multipart path cannot offer -- its spool to a
43             temp file is blocking.
44              
45             Internally this drives L<HTTP::MultiPartParser> on demand, bridging its
46             push-based callbacks onto an internal event queue that C<next> and the part
47             methods consume.
48              
49             B An HTTP request body can
50             only be consumed once. Once you create a multipart stream you cannot also call
51             C/C/C/C/C, and a stream cannot be
52             created if the body was already read; see L.
53              
54             =cut
55              
# Package-wide default limits. Each one can be overridden per stream through
# the matching option to new().
our $MAX_FILES        = 1_000;
our $MAX_FIELDS       = 1_000;
our $MAX_FIELD_SIZE   = 1 << 20;            # 1 MiB; buffered per-field cap
our $MAX_FILE_SIZE    = 100 * (1 << 20);    # 100 MiB per file part
our $MAX_REQUEST_BODY = 1 << 30;            # 1 GiB; defense-in-depth, server max_body_size is primary
61              
62             =head1 CONSTRUCTOR
63              
64             =head2 new
65              
66             my $stream = PAGI::Request::MultipartStream->new(
67             receive => $receive, # required: PAGI receive callback
68             boundary => $boundary, # required: multipart boundary
69             max_files => 1000, # optional limits (defaults shown)
70             max_fields => 1000,
71             max_field_size => 1024 * 1024,
72             max_file_size => 100 * 1024 * 1024,
73             max_request_body => 1024 * 1024 * 1024,
74             );
75              
76             Creates a new streaming multipart engine. Most applications do not call this
77             directly -- they obtain a ready-built stream from
78             L, which extracts the boundary from the
79             request's C and passes through the same limit options.
80              
81             C and C are required. The remaining options cap the body to
82             bound memory and resource use:
83              
84             =over 4
85              
86             =item * C - Maximum number of file parts. Default: 1000.
87              
88             =item * C - Maximum number of non-file (field) parts. Default: 1000.
89              
90             =item * C - Maximum size, in bytes, of any single field part.
91             Default: 1 MiB (1024 * 1024).
92              
93             =item * C - Maximum size, in bytes, of any single file part.
94             Default: 100 MiB (100 * 1024 * 1024).
95              
96             =item * C - Maximum total bytes read from the request body.
97             Default: 1 GiB (1024 * 1024 * 1024). This is a per-stream defence-in-depth
98             cap; the PAGI server's C is the primary aggregate limit on the
99             request body.
100              
101             =back
102              
103             =cut
104              
# Construct a stream over a PAGI receive callback. 'receive' must be true and
# 'boundary' a non-empty string (croaks otherwise); each max_* option that is
# missing or undef falls back to the matching package global.
sub new {
    my ($class, %args) = @_;

    croak "receive is required" unless $args{receive};
    my $boundary = $args{boundary};
    croak "boundary is required" unless defined $boundary && length $boundary;

    my %limits = (
        max_files        => $args{max_files}        // $MAX_FILES,
        max_fields       => $args{max_fields}       // $MAX_FIELDS,
        max_field_size   => $args{max_field_size}   // $MAX_FIELD_SIZE,
        max_file_size    => $args{max_file_size}    // $MAX_FILE_SIZE,
        max_request_body => $args{max_request_body} // $MAX_REQUEST_BODY,
    );

    my $self = bless {
        receive          => $args{receive},
        boundary         => $boundary,
        %limits,
        _queue           => [],      # FIFO of ['part', \%meta] / ['body', $chunk]
        _file_count      => 0,
        _field_count     => 0,
        _bytes_total     => 0,
        _cur_is_file     => 0,
        _cur_bytes       => 0,
        _cur_name        => undef,
        _current         => undef,   # Part most recently returned by next()
        _exhausted       => 0,
        _parser_finished => 0,       # finish() runs at most once
        _failed          => undef,   # sticky failure message; poisons the stream
    }, $class;

    $self->{_parser} = $self->_build_parser;
    return $self;
}
132              
# Turn the arrayref of raw header lines passed to on_header (for example
# 'Content-Disposition: form-data; name="x"') into part metadata:
#   { name, filename, content_type, encoding, headers }
# Header names are lower-cased. A later duplicate header replaces an earlier
# one, and lines not of the form "Name: value" are ignored. content_type
# defaults to 'text/plain'. A part is a file upload iff filename is defined.
sub _disposition {
    my ($lines) = @_;

    my %headers;
    for my $line (@$lines) {
        if ($line =~ /^([^:]+):\s*(.*)$/) {
            $headers{lc($1)} = $2;
        }
    }

    my $disposition = _parse_content_disposition(\%headers);

    return {
        name         => $disposition->{name},
        filename     => $disposition->{filename},
        content_type => $headers{'content-type'} // 'text/plain',
        encoding     => $headers{'content-transfer-encoding'},
        headers      => \%headers,
    };
}

# Extract the Content-Disposition parameters into a hashref keyed by
# lower-cased parameter name (MIME parameter names are case-insensitive).
# Quoted values (name="x") take precedence, with a later quoted value winning.
# Unquoted values (name=x) only fill in parameters that have no quoted form.
# Escaped quotes inside a quoted value are not supported.
sub _parse_content_disposition {
    my ($headers) = @_;
    my $cd = $headers->{'content-disposition'} // '';

    my %result;

    # Quoted values: name="value".
    while ($cd =~ /(\w+)="([^"]*)"/g) {
        $result{lc $1} = $2;
    }

    # Unquoted values. Each quoted string is first emptied to "" so text
    # inside a quoted value (e.g. filename="a; name=b") cannot be mistaken
    # for a parameter of its own.
    (my $unquoted = $cd) =~ s/"[^"]*"/""/g;
    while ($unquoted =~ /(\w+)=([^;\s"]+)/g) {
        $result{lc $1} //= $2;
    }

    return \%result;
}
175              
# Build the HTTP::MultiPartParser that feeds this stream.
#
# Its push-style callbacks:
#   - enforce the file/field count and per-part size limits as bytes arrive;
#   - translate parser events into entries on the stream's _queue;
#   - record any limit breach or parse error in the sticky _failed message,
#     after which nothing more is enqueued.
#
# The callbacks capture only a weakened copy of $self. The returned parser is
# stored in $self->{_parser}, so capturing $self strongly would form a
# stream -> parser -> closure -> stream cycle that reference counting never
# frees.
sub _build_parser {
    my ($self) = @_;

    require Scalar::Util;
    my $stream = $self;
    Scalar::Util::weaken($stream);

    return HTTP::MultiPartParser->new(
        boundary  => $self->{boundary},
        on_header => sub {
            my ($headers) = @_;
            return if !$stream || $stream->{_failed};

            my $meta    = _disposition($headers);
            my $is_file = defined $meta->{filename} ? 1 : 0;
            if ($is_file) {
                $stream->{_failed} = "Too many file parts (max $stream->{max_files})"
                    if ++$stream->{_file_count} > $stream->{max_files};
            }
            else {
                $stream->{_failed} = "Too many field parts (max $stream->{max_fields})"
                    if ++$stream->{_field_count} > $stream->{max_fields};
            }
            return if $stream->{_failed};

            $stream->{_cur_is_file} = $is_file;
            $stream->{_cur_bytes}   = 0;
            $stream->{_cur_name}    = $meta->{name};
            push @{ $stream->{_queue} }, [ 'part', $meta ];
        },
        on_body => sub {
            my ($chunk) = @_;
            return if !$stream || $stream->{_failed};

            $stream->{_cur_bytes} += length $chunk;
            my $max = $stream->{_cur_is_file} ? $stream->{max_file_size} : $stream->{max_field_size};
            if ($stream->{_cur_bytes} > $max) {
                $stream->{_failed} = sprintf("%s part '%s' too large (max %d bytes)",
                    ($stream->{_cur_is_file} ? 'File' : 'Field'), ($stream->{_cur_name} // ''), $max);
                return;    # stop enqueuing -- bounds the queue
            }
            push @{ $stream->{_queue} }, [ 'body', $chunk ];
        },
        on_error => sub {
            return unless $stream;
            $stream->{_failed} //= "Multipart parse error: $_[0]";
        },
    );
}
213              
# Feed exactly one receive() message to the parser.
#
# Resolves to 1 if a message was consumed. Resolves to 0 once the body is
# exhausted: a missing/typeless message, http.disconnect, or a
# max_request_body breach. When input ends, the parser is finished only if
# some bytes were fed, so a zero-byte body ends as a clean, empty stream.
async sub _pump {
    my ($self) = @_;
    return 0 if $self->{_exhausted};

    my $msg = await $self->{receive}->();

    my $input_ended = !$msg || !$msg->{type} || $msg->{type} eq 'http.disconnect';
    if ($input_ended) {
        $self->{_exhausted} = 1;
        $self->_finish_parser if $self->{_bytes_total} > 0;
        return 0;
    }

    my $body = $msg->{body};
    if (defined $body && length $body) {
        $self->{_bytes_total} += length $body;
        if ($self->{_bytes_total} > $self->{max_request_body}) {
            $self->{_failed}    = "Request body exceeded max_request_body ($self->{max_request_body} bytes)";
            $self->{_exhausted} = 1;
            return 0;
        }
        $self->{_parser}->parse($body);   # runs the parser callbacks (enqueue + bookkeeping)
    }

    if (!$msg->{more}) {
        $self->{_exhausted} = 1;
        $self->_finish_parser if $self->{_bytes_total} > 0;
    }
    return 1;
}
236              
# Finalize the parser once bytes have been fed and the input has ended. This
# runs at most once per stream.
#
# Calling finish on a complete body is a no-op; calling it mid-part signals
# truncation, which is reported through on_error into the sticky _failed. The
# eval is defence-in-depth in case finish itself dies.
#
# If a failure appears only during finish, it is replaced with a clear
# "Incomplete multipart upload" message. A failure that already existed
# (e.g. a size-limit breach) is left untouched.
sub _finish_parser {
    my ($self) = @_;
    return if $self->{_parser_finished};
    $self->{_parser_finished} = 1;

    my $failed_before = defined $self->{_failed};
    my $finished_ok   = eval { $self->{_parser}->finish; 1 };
    $self->{_failed} //= "Multipart parse error (finish): $@" unless $finished_ok;

    if (!$failed_before && defined $self->{_failed}) {
        $self->{_failed} = "Incomplete multipart upload: client disconnected or stream ended before the closing boundary";
    }
    return;
}
256              
257             =head1 METHODS
258              
259             =head2 next
260              
261             my $part = await $stream->next;
262              
263             Returns a Future resolving to the next L<PAGI::Request::Part>, or C<undef> when
264             the stream is exhausted (end of body).
265              
266             Advancing past a part whose body you have not fully consumed auto-drains the
267             remainder of that part first, so you can always loop on C without
268             reading every part. To discard a part deliberately (and signal that intent),
269             call C<< $part->skip >>.
270              
271             Croaks if a size or count limit is breached, or if the upload is truncated
272             (see L).
273              
274             =cut
275              
# Resolve to the next PAGI::Request::Part, or undef at end of body.
#
# If the caller still holds the previous part and has not finished it, that
# part is drained first. Croaks with the sticky failure message on a limit
# breach, a parse error, or a truncated upload.
#
# The stream keeps only a weak reference to the part it hands out. The part
# holds its stream strongly, so a strong back-reference would form a
# reference cycle. If the caller drops a part before finishing it, its
# leftover body events are discarded by the leading-'body' sweep in the loop
# below.
async sub next {
    my ($self) = @_;
    croak $self->{_failed} if $self->{_failed};

    my $previous = $self->{_current};
    if ($previous && !$previous->{_done}) {
        await $previous->skip;    # auto-drain
    }

    require Scalar::Util;
    my $queue = $self->{_queue};
    while (1) {
        croak $self->{_failed} if $self->{_failed};

        # Defensive: drop body events not claimed by any live part.
        shift @$queue while @$queue && $queue->[0][0] eq 'body';

        if (@$queue && $queue->[0][0] eq 'part') {
            my (undef, $meta) = @{ shift @$queue };
            my $part = PAGI::Request::Part->new(stream => $self, meta => $meta);
            $self->{_current} = $part;
            Scalar::Util::weaken($self->{_current});   # break the stream <-> part cycle
            return $part;
        }
        last unless await $self->_pump;
    }

    croak $self->{_failed} if $self->{_failed};   # truncation surfaces via _failed (set by finish)
    return undef;
}
293              
# Resolve to the next body chunk of the current part, or undef when that part
# ends. A part ends when the next part's header is queued, or when the body
# finishes cleanly (complete body, then disconnect). Croaks with the sticky
# failure message, which is also how truncation (set by finish) surfaces.
async sub _next_chunk {
    my ($self) = @_;
    my $queue = $self->{_queue};
    while (1) {
        croak $self->{_failed} if $self->{_failed};

        my $head = $queue->[0];
        if ($head && $head->[0] eq 'body') {
            shift @$queue;
            return $head->[1];
        }
        return undef if $head && $head->[0] eq 'part';   # next part began

        last unless await $self->_pump;
    }

    croak $self->{_failed} if $self->{_failed};
    return undef;
}
310              
311             package PAGI::Request::Part;
312 4     4   32 use strict;
  4         6  
  4         90  
313 4     4   13 use warnings;
  4         6  
  4         181  
314              
315 4     4   16 use Future::AsyncAwait;
  4         7  
  4         22  
316 4     4   208 use Carp qw(croak);
  4         5  
  4         206  
317 4     4   16 use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_NOFOLLOW);
  4         12  
  4         4785  
318              
319             =head1 NAME
320              
321             PAGI::Request::Part - A single part of a streaming multipart request
322              
323             =head1 DESCRIPTION
324              
325             A value object representing one part yielded by
326             L. It carries the part's metadata (name,
327             filename, headers) and provides the methods that consume the part's body: pull
328             it chunk by chunk, buffer it whole, or drain it to a sink of your choosing.
329              
330             A part's body must be consumed before the next part is fetched. Calling
331             C<< $stream->next >> while a part is only partially read drains the rest of
332             the current part automatically.
333              
334             =head1 CONSTRUCTOR
335              
336             =head2 new
337              
338             my $part = PAGI::Request::Part->new(stream => $stream, meta => \%meta);
339              
340             Constructs a part bound to its owning stream. Parts are normally created by
341             L, not by application code.
342              
343             =head1 METHODS
344              
345             =head2 name
346              
347             my $name = $part->name;
348              
349             The part's form field name, taken from its C header.
350              
351             =head2 filename
352              
353             my $filename = $part->filename;
354              
355             The part's filename from C, or C for non-file
356             (field) parts.
357              
358             =head2 content_type
359              
360             my $type = $part->content_type;
361              
362             The part's C header. Defaults to C if the part sent
363             no C.
364              
365             =head2 encoding
366              
367             my $encoding = $part->encoding;
368              
369             The part's C header, or C if not present.
370              
371             =head2 headers
372              
373             my $headers = $part->headers;
374              
375             A hashref of all the part's headers, keyed by lower-cased header name.
376              
377             =head2 is_file
378              
379             if ($part->is_file) { ... }
380              
381             True if the part has a filename (i.e. is a file upload), false otherwise.
382              
383             =head2 type
384              
385             my $type = $part->type; # 'file' or 'field'
386              
387             Returns the string C<'file'> for file parts and C<'field'> for non-file parts.
388              
389             =head2 next_chunk
390              
391             my $chunk = await $part->next_chunk;
392              
393             Returns this part's next body chunk as raw bytes, or C<undef> once the part's
394             body is exhausted. This is the low-level primitive; C<value>, C<stream_to>,
395             and C<stream_to_file> are built on it.
396              
397             =head2 value
398              
399             my $bytes = await $part->value;
400              
401             Buffers and returns the part's entire body as B -- no decoding is
402             applied, so a text field encoded as UTF-8 (or any other charset) must be
403             decoded by the caller. Intended for small field parts; the buffered size is
404             bounded by the relevant per-part size limit (C for fields,
405             C for files), which croaks if exceeded.
406              
407             =head2 stream_to
408              
409             my $count = await $part->stream_to($cb);
410             my $count = await $part->stream_to(async sub ($chunk) { await $sink->write($chunk) });
411              
412             Drains the rest of the part to a sink callback, returning the number of bytes
413             processed. The callback is invoked with each chunk of raw bytes and may be
414             asynchronous: if it returns a Future, C awaits it before reading
415             the next chunk, giving the sink natural backpressure over the network read.
416              
417             If the sink callback throws, the error poisons the stream -- a later
418             C<< $stream->next >> will croak -- and the failed part is B auto-drained,
419             since the application has signalled it is aborting. The exception is re-thrown
420             to the caller.
421              
422             =head2 stream_to_file
423              
424             my $count = await $part->stream_to_file($path);
425              
426             Writes the part's body to a B file at C<$path>, returning the number of
427             bytes written. The file is opened with C: the call
428             B, croaking
429             instead. The result of C is checked. On any error -- a write failure, a
430             limit breach, a truncated upload, or a failed C -- the partially
431             written file is unlinked before the method croaks.
432              
433             =head2 skip
434              
435             await $part->skip;
436              
437             Drains and discards any remaining body of this part. Use this to deliberately
438             ignore a part; C<< $stream->next >> would otherwise drain it for you anyway.
439              
440             =head1 LIMITS AND ERRORS
441              
442             Size and count limits are enforced as bytes arrive from the network, before
443             your sink ever sees them, so an oversized part cannot stream partway into your
444             sink before being rejected. A per-part size overflow names the offending part
445             in its error message (for example, C<< File part 'avatar' too large >>).
446             Exceeding C, C, or C likewise causes
447             the next C/consume call to croak.
448              
449             A client disconnect that truncates a part mid-stream croaks with an
450             C<"Incomplete multipart upload"> message (the closing boundary was never
451             seen). By contrast, a complete body -- or an entirely empty one -- followed by
452             a disconnect ends cleanly, with C simply returning C.
453              
454             =head1 SEE ALSO
455              
456             L, L
457              
458             =cut
459              
# Bind a part's metadata to its owning stream. Every part starts unfinished.
sub new {
    my ($class, %args) = @_;
    my $self = {
        stream => $args{stream},
        meta   => $args{meta},
        _done  => 0,
    };
    return bless $self, $class;
}
# Read-only accessors over this part's metadata hash.
sub name         { my ($self) = @_; return $self->{meta}{name} }
sub filename     { my ($self) = @_; return $self->{meta}{filename} }
sub content_type { my ($self) = @_; return $self->{meta}{content_type} }
sub encoding     { my ($self) = @_; return $self->{meta}{encoding} }
sub headers      { my ($self) = @_; return $self->{meta}{headers} }

# A part is a file upload exactly when it carries a filename.
sub is_file { my ($self) = @_; return defined $self->{meta}{filename} ? 1 : 0 }
sub type    { my ($self) = @_; return $self->is_file ? 'file' : 'field' }
# Drain and discard whatever remains of this part's body.
#
# A part that is already finished (fully read, skipped, or auto-drained by
# $stream->next) is left alone. Draining it again would read from the stream's
# shared queue and consume the body of whichever part the stream is now
# positioned on.
async sub skip {
    my ($self) = @_;
    return if $self->{_done};
    1 while defined(await $self->{stream}->_next_chunk);
    $self->{_done} = 1;
    return;
}
469              
# Resolve to this part's next raw chunk, or undef once the part is exhausted.
# The part is marked done as soon as the stream reports its end.
async sub next_chunk {
    my ($self) = @_;
    if ($self->{_done}) {
        return undef;
    }
    my $chunk = await $self->{stream}->_next_chunk;
    $self->{_done} = 1 if !defined $chunk;
    return $chunk;
}
477              
# Buffer the remainder of the part and resolve to it as RAW BYTES (no
# decoding). Intended for small fields; its size is bounded by the per-part
# limits enforced while parsing.
async sub value {
    my ($self) = @_;
    my @pieces;
    while (defined(my $piece = await $self->next_chunk)) {
        push @pieces, $piece;
    }
    return join '', @pieces;
}
484              
# Drain the rest of the part into a sink callback and resolve to the number of
# bytes passed to it.
#
# The callback receives each raw chunk. If it returns a blessed object that
# can('get') (i.e. a Future), that result is awaited before the next read,
# which gives the sink backpressure over the network. Any other return value,
# including plain unblessed references, is ignored.
#
# If reading or the sink dies, the stream is poisoned (a later ->next croaks),
# the part is NOT auto-drained, and the error is re-thrown.
async sub stream_to {
    my ($self, $cb) = @_;
    croak "callback is required" unless $cb;
    require Scalar::Util;

    my $n  = 0;
    my $ok = eval {
        while (defined(my $c = await $self->next_chunk)) {
            my $r = $cb->($c);
            # ->can on an unblessed reference would die, so check blessed first.
            await $r if Scalar::Util::blessed($r) && $r->can('get');
            $n += length $c;
        }
        1;
    };
    if (!$ok) {
        my $err = $@;
        # Poison the stream so a later ->next croaks; do NOT auto-drain (the app aborted).
        $self->{stream}{_failed} //= "sink error: $err";
        die $err;
    }
    return $n;
}
505              
# Write the rest of the part to a NEW file at $path and resolve to the number
# of bytes written.
#
# O_CREAT|O_EXCL|O_NOFOLLOW makes creation fail rather than overwrite an
# existing file or follow a symlink. On any failure (open, write,
# limit/disconnect, close) the partial file is unlinked and the method croaks.
# A read/write error takes priority over a close error.
async sub stream_to_file {
    my ($self, $path) = @_;
    croak "path is required" unless defined $path;

    sysopen(my $fh, $path, O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, 0600)
        or croak "Cannot create $path: $!";
    binmode $fh;

    my $written = 0;
    my $ok = eval {
        while (defined(my $c = await $self->next_chunk)) {
            print {$fh} $c or die "write to $path failed: $!\n";
            $written += length $c;
        }
        1;
    };
    my $err = $@;

    my $close_ok = close $fh;
    # Capture close()'s errno now; the unlink below may overwrite $!.
    my $close_error = $close_ok ? undef : "$!";

    if (!$ok) {
        unlink $path;
        croak $err;    # write/limit/disconnect error wins
    }
    if (!$close_ok) {
        unlink $path;
        croak "Cannot close $path: $close_error";
    }
    return $written;
}
526              
527             1;