File Coverage

blib/lib/PAGI/Request/BodyStream.pm
Criterion Covered Total %
statement 93 102 91.1
branch 35 56 62.5
condition 16 28 57.1
subroutine 12 13 92.3
pod 7 7 100.0
total 163 206 79.1


line stmt bran cond sub pod time code
1             package PAGI::Request::BodyStream;
2              
3 22     22   223251 use strict;
  22         82  
  22         729  
4 22     22   71 use warnings;
  22         30  
  22         913  
5              
6 22     22   83 use Future::AsyncAwait;
  22         28  
  22         171  
7 22     22   1579 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  22         17641  
  22         1490  
8 22     22   95 use Carp qw(croak);
  22         26  
  22         34703  
9              
10              
11             =head1 NAME
12              
13             PAGI::Request::BodyStream - Streaming body consumption for PAGI requests
14              
15             =head1 SYNOPSIS
16              
17             use PAGI::Request::BodyStream;
18             use Future::AsyncAwait;
19              
20             # Basic streaming
21             my $stream = PAGI::Request::BodyStream->new(receive => $receive);
22              
23             while (!$stream->is_done) {
24             my $chunk = await $stream->next_chunk;
25             last unless defined $chunk;
26             print "Got chunk: ", length($chunk), " bytes\n";
27             }
28              
29             # With size limit
30             my $stream = PAGI::Request::BodyStream->new(
31             receive => $receive,
32             max_bytes => 1024 * 1024, # 1MB limit
33             );
34              
35             # With UTF-8 decoding
36             my $stream = PAGI::Request::BodyStream->new(
37             receive => $receive,
38             decode => 'UTF-8',
39             );
40              
41             # Stream to file
42             await $stream->stream_to_file('/tmp/upload.dat');
43              
44             # Stream to custom sink
45             await $stream->stream_to(async sub ($chunk) {
46             # Process chunk
47             print STDERR "Processing: ", length($chunk), " bytes\n";
48             });
49              
50             =head1 DESCRIPTION
51              
52             PAGI::Request::BodyStream provides streaming body consumption for large request
53             bodies. This is useful when you need to process request data incrementally
54             without loading the entire body into memory.
55              
56             The stream is pull-based: you call C<next_chunk> to receive the next chunk
57             of data. The stream handles:
58              
59             =over 4
60              
61             =item * Size limits with customizable error messages
62              
63             =item * UTF-8 decoding with proper handling of incomplete sequences at chunk boundaries
64              
65             =item * Client disconnect detection
66              
67             =item * Convenient file streaming with C<stream_to_file>
68              
69             =back
70              
71             B<Note>: Streaming is mutually exclusive with the buffered body methods
72             in L<PAGI::Request>. Once you start streaming,
73             you cannot use those methods.
74              
75             =head1 CONSTRUCTOR
76              
77             =head2 new
78              
79             my $stream = PAGI::Request::BodyStream->new(
80             receive => $receive, # Required: PAGI receive callback
81             max_bytes => 10485760, # Optional: max body size
82             decode => 'UTF-8', # Optional: decode to UTF-8
83             strict => 1, # Optional: strict UTF-8 (croak on invalid)
84             limit_name => 'body_size', # Optional: name for limit error message
85             );
86              
87             Creates a new body stream.
88              
89             =over 4
90              
91             =item * C<receive> - Required. The PAGI receive callback.
92              
93             =item * C<max_bytes> - Optional. Maximum bytes to read. Throws error if exceeded.
94              
95             =item * C<decode> - Optional. Encoding to decode chunks from (typically 'UTF-8').
96              
97             =item * C<strict> - Optional. If true, throw on invalid UTF-8. If false (default),
98             use replacement characters.
99              
100             =item * C<limit_name> - Optional. Name to use in error message when max_bytes
101             is exceeded (default: 'max_bytes').
102              
103             =back
104              
105             =cut
106              
107             sub new {
108 15     15 1 186841 my ($class, %args) = @_;
109 15   66     125 my $receive = $args{receive} // croak("receive is required");
110              
111             my $self = bless {
112             receive => $receive,
113             max_bytes => $args{max_bytes},
114             decode => $args{decode},
115             strict => $args{strict} // 0,
116 14   100     233 limit_name => $args{limit_name} // 'max_bytes',
      100        
117             _bytes_read => 0,
118             _done => 0,
119             _error => undef,
120             _buffer => '', # For incomplete UTF-8 sequences
121             }, $class;
122              
123 14         36 return $self;
124             }
125              
126             =head1 METHODS
127              
128             =head2 next_chunk
129              
130             my $chunk = await $stream->next_chunk;
131              
132             Returns a Future that resolves to the next chunk of data, or undef when the
133             stream is exhausted or client disconnects.
134              
135             If C<decode> was specified in the constructor, chunks are decoded from the
136             specified encoding. UTF-8 decoding properly handles incomplete multi-byte
137             sequences at chunk boundaries.
138              
139             Throws an exception if C<max_bytes> is exceeded.
140              
141             =cut
142              
143 20     20 1 1854 async sub next_chunk {
144 20         26 my ($self) = @_;
145 20 50       58 return undef if $self->{_done};
146 20 50       39 return undef if $self->{_error};
147              
148 20         36 my $message = await $self->{receive}->();
149              
150             # Handle disconnect
151 20 100 66     777 if (!$message || $message->{type} eq 'http.disconnect') {
152 1         3 $self->{_done} = 1;
153             # Flush any remaining buffered data from incomplete UTF-8 sequences
154 1 50 33     33 if ($self->{decode} && length($self->{_buffer})) {
155 0         0 my $final = $self->_decode_chunk('', 1); # flush=1
156 0 0       0 return $final if length($final);
157             }
158 1         9 return undef;
159             }
160              
161             # Extract body chunk
162 19   50     40 my $chunk = $message->{body} // '';
163 19   50     35 my $more = $message->{more} // 0;
164              
165             # Check size limit before processing
166 19 100       36 if (defined $self->{max_bytes}) {
167 2         5 my $new_total = $self->{_bytes_read} + length($chunk);
168 2 50       6 if ($new_total > $self->{max_bytes}) {
169 2         5 $self->{_error} = "Request body $self->{limit_name} exceeded";
170 2         4 $self->{_done} = 1;
171 2         280 croak($self->{_error});
172             }
173             }
174              
175 17         25 $self->{_bytes_read} += length($chunk);
176              
177             # Mark done if no more chunks
178 17 100       32 $self->{_done} = 1 unless $more;
179              
180             # Decode if requested
181 17 100       33 if ($self->{decode}) {
182 4         32 $chunk = $self->_decode_chunk($chunk, !$more);
183             }
184              
185 16         72 return $chunk;
186             }
187              
188             =head2 bytes_read
189              
190             my $total = $stream->bytes_read;
191              
192             Returns the total number of raw bytes read so far (before any decoding).
193              
194             =cut
195              
196             sub bytes_read {
197 5     5 1 51 my ($self) = @_;
198 5         18 return $self->{_bytes_read};
199             }
200              
201             =head2 is_done
202              
203             if ($stream->is_done) { ... }
204              
205             Returns true if the stream has been exhausted (no more chunks available).
206              
207             =cut
208              
209             sub is_done {
210 13     13 1 2929 my ($self) = @_;
211 13         47 return $self->{_done};
212             }
213              
214             =head2 error
215              
216             my $error = $stream->error;
217              
218             Returns any error that occurred during streaming, or undef.
219              
220             =cut
221              
222             sub error {
223 0     0 1 0 my ($self) = @_;
224 0         0 return $self->{_error};
225             }
226              
227             =head2 stream_to_file
228              
229             await $stream->stream_to_file($path);
230              
231             Streams the entire request body to a file. Returns a Future that resolves
232             to the number of bytes written.
233              
234             This is efficient for large uploads as it doesn't load the entire body into
235             memory - chunks are written incrementally as they arrive from the network.
236              
237             B<Note:> File writes are B<blocking> (synchronous I/O). Since chunks are
238             typically small (e.g., 64KB), each write completes quickly. The method
239             remains async overall because it awaits network chunks between writes.
240              
241             B<Note:> Cannot be used with the C<decode> option as that would corrupt binary
242             data. Use C<stream_to> with a custom handler if you need decoded chunks
243             written to a file.
244              
245             For fully non-blocking file I/O, use C<stream_to> with your preferred
246             async file library:
247              
248             # Non-blocking alternative (bring your own async file library)
249             await $stream->stream_to(async sub {
250             my ($chunk) = @_;
251             await $my_async_file_writer->write($chunk);
252             });
253              
254             =cut
255              
256 2     2 1 1074 async sub stream_to_file {
257 2         4 my ($self, $path) = @_;
258 2 50       6 croak("path is required") unless defined $path;
259             croak("stream_to_file() cannot be used with decode option - use stream_to() instead")
260 2 100       103 if $self->{decode};
261              
262 1         2 my $bytes_written = 0;
263 1         1 my $fh;
264              
265 1         3 while (!$self->is_done) {
266 2         5 my $chunk = await $self->next_chunk;
267 2 50       61 last unless defined $chunk;
268 2 50       3 next unless length $chunk;
269              
270             # Open file on first chunk (truncate mode)
271 2 100       5 unless ($fh) {
272 1 50       79 open $fh, '>:raw', $path
273             or croak("Cannot open $path for writing: $!");
274             }
275              
276             # Blocking write - typically fast for small chunks
277 2 50       19 print $fh $chunk
278             or croak("Cannot write to $path: $!");
279              
280 2         25 $bytes_written += length($chunk);
281             }
282              
283             # Close file if we opened it
284 1 50       4 if ($fh) {
285 1 50       151 close $fh
286             or croak("Cannot close $path: $!");
287             }
288              
289 1         11 return $bytes_written;
290             }
291              
292             =head2 stream_to
293              
294             await $stream->stream_to(async sub ($chunk) {
295             # Process chunk
296             });
297              
298             Streams the entire request body to a custom sink callback. The callback
299             receives each chunk and can be async (return a Future).
300              
301             Returns a Future that resolves to the number of bytes processed.
302              
303             =cut
304              
305 1     1 1 8 async sub stream_to {
306 1         2 my ($self, $callback) = @_;
307 1 50       3 croak("callback is required") unless $callback;
308              
309 1         2 my $bytes_processed = 0;
310              
311 1         3 while (!$self->is_done) {
312 2         5 my $chunk = await $self->next_chunk;
313 2 50       43 last unless defined $chunk;
314 2 50       5 next unless length $chunk;
315              
316             # Call callback - it may be async
317 2         5 my $result = $callback->($chunk);
318 2 50 33     10 if (ref($result) && $result->can('get')) {
319 0         0 await $result;
320             }
321              
322 2         4 $bytes_processed += length($chunk);
323             }
324              
325 1         3 return $bytes_processed;
326             }
327              
328             =head1 INTERNAL METHODS
329              
330             =head2 _decode_chunk
331              
332             Internal method to decode a chunk with proper handling of incomplete UTF-8
333             sequences at boundaries.
334              
335             =cut
336              
337             sub _decode_chunk {
338 4     4   8 my ($self, $chunk, $flush) = @_;
339 4   50     10 $flush //= 0;
340 4         4 my $encoding = $self->{decode};
341 4 50       8 return $chunk unless $encoding;
342              
343             # Combine with buffered incomplete sequence from previous chunk
344 4         8 my $data = $self->{_buffer} . $chunk;
345              
346             # Use Encode::FB_QUIET for incremental decoding - the standard approach
347             # recommended by Encode documentation for handling partial multi-byte sequences
348 4         5 my $decoded = eval {
349 4 100 66     10 if ($flush || !length($data)) {
350             # Final chunk or empty - decode everything
351 3         5 $self->{_buffer} = '';
352 3 100       7 my $flag = $self->{strict} ? (Encode::FB_CROAK | Encode::LEAVE_SRC) : (Encode::FB_DEFAULT | Encode::LEAVE_SRC);
353 3         20 return decode($encoding, $data, $flag);
354             } else {
355             # Incremental decoding with FB_QUIET
356             # FB_QUIET modifies $data in place, removing the decoded portion
357             # and leaving incomplete sequences in $data for next time
358 1         15 my $result = decode($encoding, $data, Encode::FB_QUIET);
359              
360             # Whatever remains in $data is incomplete - buffer it
361 1         33 $self->{_buffer} = $data;
362              
363             # In strict mode, check if buffered data is invalid (not just incomplete)
364 1 50 33     4 if ($self->{strict} && length($self->{_buffer}) > 0) {
365 0         0 my $test = $self->{_buffer};
366 0         0 eval { decode($encoding, $test, Encode::FB_CROAK | Encode::LEAVE_SRC); };
  0         0  
367 0 0       0 die $@ if $@; # Propagate if invalid UTF-8
368             }
369              
370 1         3 return $result;
371             }
372             };
373              
374 4 100       127 if ($@) {
375 1         3 $self->{_error} = "Failed to decode chunk: $@";
376 1         1 $self->{_done} = 1;
377 1         97 croak($self->{_error});
378             }
379              
380 3         6 return $decoded;
381             }
382              
383             1;
384              
385             __END__