File Coverage

blib/lib/PAGI/Server/AsyncFile.pm
Criterion Covered Total %
statement 79 112 70.5
branch 20 54 37.0
condition 6 11 54.5
subroutine 14 16 87.5
pod 7 7 100.0
total 126 200 63.0


line stmt bran cond sub pod time code
1             package PAGI::Server::AsyncFile;
2              
3 88     88   219688 use strict;
  88         365  
  88         3080  
4 88     88   448 use warnings;
  88         191  
  88         3318  
5              
6              
7 88     88   339 use Future;
  88         124  
  88         1388  
8 88     88   275 use Future::AsyncAwait;
  88         112  
  88         635  
9 88     88   42687 use IO::Async::Function;
  88         1700643  
  88         3742  
10 88     88   572 use Scalar::Util qw(blessed);
  88         129  
  88         145761  
11              
12             =head1 NAME
13              
14             PAGI::Server::AsyncFile - Non-blocking file I/O for PAGI::Server internals
15              
16             =head1 SYNOPSIS
17              
18             use PAGI::Server::AsyncFile;
19             use IO::Async::Loop;
20              
21             # Create or obtain an IO::Async::Loop
22             my $loop = IO::Async::Loop->new;
23              
24             # Read entire file
25             my $content = await PAGI::Server::AsyncFile->read_file($loop, '/path/to/file');
26              
27             # Read file in chunks (streaming)
28             await PAGI::Server::AsyncFile->read_file_chunked($loop, '/path/to/file', async sub {
29             my ($chunk) = @_;
30             # Process each chunk
31             }, chunk_size => 65536);
32              
33             # Write file
34             await PAGI::Server::AsyncFile->write_file($loop, '/path/to/file', $content);
35              
36             # Append to file
37             await PAGI::Server::AsyncFile->append_file($loop, '/path/to/file', $log_line);
38              
39             =head1 DESCRIPTION
40              
41             This module provides non-blocking file I/O operations using L<IO::Async::Function>
42             worker processes. It is used internally by L<PAGI::Server> for efficient file
43             streaming.
44              
45             B<Note:> This is a PAGI::Server internal module. PAGI applications are
46             loop-agnostic and should use synchronous file I/O (which is simple and fast
47             for typical file sizes) or bring their own async file library if needed.
48              
49             It uses L<IO::Async::Function> to offload blocking file operations to worker
50             processes, preventing the main event loop from being blocked during disk I/O.
51             Regular file I/O in POSIX is always blocking at the kernel level - even
52             C<select>/C<poll>/C<epoll> report regular files as always "ready".
53             This module works around this limitation by running file operations in
54             separate worker processes, similar to how Node.js/libuv handles file I/O.
55              
56             =head1 CLASS METHODS
57              
58             =cut
59              
60             # Singleton function pool per loop (keyed by loop address)
               # Maps the string form of a loop object (or the literal key 'default' when
               # the loop argument is not a blessed reference) to the IO::Async::Function
               # created for it. Entries are added by _get_function and deleted by cleanup.
               # NOTE(review): because the key is only the loop's string form, an entry
               # presumably outlives a loop that is destroyed without calling cleanup -
               # confirm that callers always clean up.
61             my %_function_pools;
62              
63             # Get or create the function pool for a given loop
               #
               # Parameters:
               #   $class - invocant (not used in the body)
               #   $loop  - loop object; its string form is the cache key
               # Returns the cached IO::Async::Function for that key, creating it and
               # adding it to $loop on first use.
64             sub _get_function {
65 28     28   88 my ($class, $loop) = @_;
66              
               # Every non-blessed $loop (including undef) shares the 'default' key.
               # NOTE(review): on that path the $loop->add call below would be invoked
               # on a non-object - confirm whether 'default' is actually reachable.
67 28 50       178 my $loop_id = blessed($loop) ? "$loop" : 'default';
68              
               # Build the pool only when nothing is cached for this key yet.
69 28 100       119 unless ($_function_pools{$loop_id}) {
70             my $function = IO::Async::Function->new(
               # Entry point handed to IO::Async::Function: forwards the operation
               # name and its arguments to _worker_operation.
               # NOTE(review): the zero execution counts on the next two lines are
               # presumably because this closure runs in a worker process rather
               # than in the instrumented parent - confirm.
71             code => sub {
72 0     0   0 my ($op, @args) = @_;
73 0         0 return _worker_operation($op, @args);
74             },
               # Pool sizing is hard-coded here; callers cannot override it.
75 3         60 min_workers => 1,
76             max_workers => 4,
77             idle_timeout => 30,
78             );
79              
               # Register with the loop first, then remember the pool for later calls.
80 3         617 $loop->add($function);
81 3         45354 $_function_pools{$loop_id} = $function;
82             }
83              
84 28         317 return $_function_pools{$loop_id};
85             }
86              
87             # Worker process operations
               #
               # Dispatches on the operation name $op and performs the matching blocking
               # file operation with the remaining arguments. Called from the code
               # closure that _get_function hands to IO::Async::Function.
               #
               # Operations and return values:
               #   read_file   ($path)                       - whole file contents
               #   read_chunk  ($path, $offset, $chunk_size) - ($buffer, $bytes_read)
               #   write_file  ($path, $content)             - length($content)
               #   append_file ($path, $content)             - length($content)
               #   file_size   ($path)                       - result of -s $path
               #   file_exists ($path)                       - 1 or 0 from -f $path
               # Dies when a file cannot be opened or when $op is not recognised.
88             sub _worker_operation {
89 0     0   0 my ($op, @args) = @_;
90              
91 0 0       0 if ($op eq 'read_file') {
    0          
    0          
    0          
    0          
    0          
               # Slurp the file: undefining $/ locally makes <$fh> return everything.
92 0         0 my ($path) = @args;
93 0 0       0 open my $fh, '<:raw', $path or die "Cannot open $path: $!";
94 0         0 local $/;
95 0         0 my $content = <$fh>;
96 0         0 close $fh;
97 0         0 return $content;
98             }
99             elsif ($op eq 'read_chunk') {
               # The file is opened afresh for every chunk; no handle is kept between
               # calls, so the caller supplies the absolute $offset each time.
100 0         0 my ($path, $offset, $chunk_size) = @args;
101 0 0       0 open my $fh, '<:raw', $path or die "Cannot open $path: $!";
               # NOTE(review): the result of seek is not checked.
102 0 0       0 seek($fh, $offset, 0) if $offset;
103 0         0 my $bytes_read = read($fh, my $buffer, $chunk_size);
104 0         0 close $fh;
               # An undefined read result is reported as 0 bytes, so the caller cannot
               # tell a failed read from end of file.
105 0   0     0 return ($buffer, $bytes_read // 0);
106             }
107             elsif ($op eq 'write_file') {
               # '>' truncates any existing file before writing.
               # NOTE(review): print and close results are not checked, so
               # length($content) is returned even if the write did not succeed.
108 0         0 my ($path, $content) = @args;
109 0 0       0 open my $fh, '>:raw', $path or die "Cannot open $path for writing: $!";
110 0         0 print $fh $content;
111 0         0 close $fh;
112 0         0 return length($content);
113             }
114             elsif ($op eq 'append_file') {
               # Same shape as write_file, but '>>' appends instead of truncating.
               # NOTE(review): print and close results are not checked here either.
115 0         0 my ($path, $content) = @args;
116 0 0       0 open my $fh, '>>:raw', $path or die "Cannot open $path for appending: $!";
117 0         0 print $fh $content;
118 0         0 close $fh;
119 0         0 return length($content);
120             }
121             elsif ($op eq 'file_size') {
               # NOTE(review): per perlfunc, -s yields the size only for a non-empty
               # file - confirm callers tolerate the false/undef results.
122 0         0 my ($path) = @args;
123 0         0 return -s $path;
124             }
125             elsif ($op eq 'file_exists') {
               # -f is a plain-file test, so this yields 0 for a directory.
126 0         0 my ($path) = @args;
127 0 0       0 return -f $path ? 1 : 0;
128             }
129             else {
130 0         0 die "Unknown operation: $op";
131             }
132             }
133              
134             =head2 read_file
135              
136             my $content = await PAGI::Server::AsyncFile->read_file($loop, $path);
137              
138             Read the entire contents of a file asynchronously. Returns a Future that
139             resolves to the file contents.
140              
141             Parameters:
142              
143             =over 4
144              
145             =item * C<$loop> - IO::Async::Loop instance
146              
147             =item * C<$path> - Path to the file to read
148              
149             =back
150              
151             Throws an exception if the file cannot be read.
152              
153             =cut
154              
155 11     11 1 274163 async sub read_file {
                # Reads the whole file at $path through the worker pool for $loop and
                # returns its contents.
156 11         47 my ($class, $loop, $path) = @_;
157              
                # Both file tests run here, before anything is dispatched to the pool,
                # so a missing or unreadable path fails without touching the worker pool.
                # NOTE(review): a die inside an async sub presumably reaches the caller
                # as a failed Future rather than a synchronous exception - confirm
                # against Future::AsyncAwait.
158 11 100       319 die "File not found: $path" unless -f $path;
159 10 50       148 die "Cannot read file: $path" unless -r $path;
160              
                # The 'read_file' worker operation opens the path with '<:raw' and slurps it.
161 10         145 my $function = $class->_get_function($loop);
162 10         118 return await $function->call(args => ['read_file', $path]);
163             }
164              
165             =head2 read_file_chunked
166              
167             await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, async sub {
168             my ($chunk) = @_;
169             # Process chunk
170             }, chunk_size => 65536);
171              
172             # For Range requests (partial file):
173             await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, $callback,
174             offset => 1000, # Start at byte 1000
175             length => 5000, # Read 5000 bytes total
176             );
177              
178             Read a file in chunks, calling a callback for each chunk. This is suitable
179             for streaming large files without loading the entire file into memory.
180              
181             Parameters:
182              
183             =over 4
184              
185             =item * C<$loop> - IO::Async::Loop instance
186              
187             =item * C<$path> - Path to the file to read
188              
189             =item * C<$callback> - Async callback called with each chunk. Receives the chunk data.
190              
191             =item * C<%opts> - Options:
192              
193             =over 4
194              
195             =item * C<chunk_size> - Size of each chunk in bytes (default: 65536)
196              
197             =item * C<offset> - Byte offset to start reading from (default: 0)
198              
199             =item * C<length> - Maximum bytes to read; omit to read to EOF
200              
201             =back
202              
203             =back
204              
205             Returns a Future that resolves to the number of bytes read when complete.
206             The callback should return/await properly if it needs to do async operations.
207              
208             =cut
209              
210 7     7 1 11672 async sub read_file_chunked {
                # Streams the file at $path to $callback one chunk at a time and returns
                # the total number of bytes delivered.
                #
                # Options read from %opts: chunk_size (default 65536), offset (default 0)
                # and length (undef means read up to the end of the file).
211 7         46 my ($class, $loop, $path, $callback, %opts) = @_;
212              
                # Checked in the calling code before any worker call is made.
213 7 50       103 die "File not found: $path" unless -f $path;
214 7 50       129 die "Cannot read file: $path" unless -r $path;
215              
216 7   50     32 my $chunk_size = $opts{chunk_size} // 65536;
217 7   100     40 my $start_offset = $opts{offset} // 0;
218 7         15 my $max_length = $opts{length}; # undef means read to EOF
219              
                # The size is sampled once, up front; it is not re-read inside the loop.
220 7         49 my $file_size = -s $path;
221 7         33 my $function = $class->_get_function($loop);
222              
223 7         50 my $offset = $start_offset;
224 7         30 my $bytes_sent = 0;
225              
226             # Calculate end position
                # The requested range is clamped so it never extends past $file_size.
                # NOTE(review): offset and length are not validated (e.g. negative
                # values) - confirm callers only pass sane values.
227 7 100       85 my $end_pos = defined $max_length
228             ? $start_offset + $max_length
229             : $file_size;
230 7 50       77 $end_pos = $file_size if $end_pos > $file_size;
231              
                # One worker call per chunk; each call carries the absolute offset.
232 7         56 while ($offset < $end_pos) {
233 17         49 my $to_read = $chunk_size;
234              
235             # Don't read past the end position
236 17 100       86 if ($offset + $to_read > $end_pos) {
237 5         10 $to_read = $end_pos - $offset;
238             }
239              
                # Given the loop condition and the clamp above, this can only trigger
                # when chunk_size itself is zero or negative.
240 17 50       73 last if $to_read <= 0;
241              
242 17         212 my ($chunk, $bytes_read) = await $function->call(
243             args => ['read_chunk', $path, $offset, $to_read]
244             );
245              
                # Zero bytes ends the stream early. The 'read_chunk' operation also
                # reports a failed read as 0, so that case stops here silently too.
246 17 50       40079 last unless $bytes_read;
247              
248             # Call the callback - it may be async
                # The result is awaited only when it is a blessed object that can 'get';
                # any other return value is ignored.
249 17         118 my $result = $callback->($chunk);
250 17 100 66     6492 if (blessed($result) && $result->can('get')) {
251 4         18 await $result;
252             }
253              
                # Advance by what was actually read, not by what was requested.
254 17         44593 $offset += $bytes_read;
255 17         62 $bytes_sent += $bytes_read;
256             }
257              
258 7         188 return $bytes_sent; # Return total bytes read
259             }
260              
261             =head2 write_file
262              
263             await PAGI::Server::AsyncFile->write_file($loop, $path, $content);
264              
265             Write content to a file asynchronously, replacing any existing content.
266              
267             Parameters:
268              
269             =over 4
270              
271             =item * C<$loop> - IO::Async::Loop instance
272              
273             =item * C<$path> - Path to the file to write
274              
275             =item * C<$content> - Content to write
276              
277             =back
278              
279             Returns a Future that resolves to the number of bytes written.
280              
281             =cut
282              
283 4     4 1 12454 async sub write_file {
                # Sends a 'write_file' request for $path and $content to the worker pool
                # for $loop and returns whatever that operation returns.
284 4         39 my ($class, $loop, $path, $content) = @_;
285              
                # No checks are made here; an open failure is raised by die inside the
                # worker operation.
                # NOTE(review): presumably that reaches the caller as a failed Future -
                # confirm against IO::Async::Function.
286 4         12 my $function = $class->_get_function($loop);
287 4         40 return await $function->call(args => ['write_file', $path, $content]);
288             }
289              
290             =head2 append_file
291              
292             await PAGI::Server::AsyncFile->append_file($loop, $path, $content);
293              
294             Append content to a file asynchronously.
295              
296             Parameters:
297              
298             =over 4
299              
300             =item * C<$loop> - IO::Async::Loop instance
301              
302             =item * C<$path> - Path to the file
303              
304             =item * C<$content> - Content to append
305              
306             =back
307              
308             Returns a Future that resolves to the number of bytes written.
309              
310             =cut
311              
312 4     4 1 9390 async sub append_file {
                # Sends an 'append_file' request for $path and $content to the worker pool
                # for $loop and returns whatever that operation returns.
313 4         58 my ($class, $loop, $path, $content) = @_;
314              
                # No checks are made here; an open failure is raised by die inside the
                # worker operation.
315 4         15 my $function = $class->_get_function($loop);
316 4         17 return await $function->call(args => ['append_file', $path, $content]);
317             }
318              
319             =head2 file_size
320              
321             my $size = await PAGI::Server::AsyncFile->file_size($loop, $path);
322              
323             Get the size of a file asynchronously.
324              
325             =cut
326              
327 1     1 1 3147 async sub file_size {
                # Asks the worker pool for $loop to run the 'file_size' operation, which
                # returns the result of -s on $path.
328 1         4 my ($class, $loop, $path) = @_;
329              
                # Unlike read_file, no existence check is made before dispatching.
330 1         3 my $function = $class->_get_function($loop);
331 1         5 return await $function->call(args => ['file_size', $path]);
332             }
333              
334             =head2 file_exists
335              
336             my $exists = await PAGI::Server::AsyncFile->file_exists($loop, $path);
337              
338             Check if a file exists asynchronously.
339              
340             =cut
341              
342 2     2 1 5905 async sub file_exists {
                # Asks the worker pool for $loop to run the 'file_exists' operation, which
                # returns 1 when -f $path is true and 0 otherwise.
343 2         5 my ($class, $loop, $path) = @_;
344              
345 2         7 my $function = $class->_get_function($loop);
346 2         8 return await $function->call(args => ['file_exists', $path]);
347             }
348              
349             =head2 cleanup
350              
351             PAGI::Server::AsyncFile->cleanup($loop);
352              
353             Clean up the worker pool for a given loop. Call this during application
354             shutdown to properly terminate worker processes.
355              
356             =cut
357              
358             sub cleanup {
                # Forgets the cached worker pool for $loop and removes it from that loop.
                # When $loop is false, every cached entry is forgotten instead.
359 1     1 1 6100 my ($class, $loop) = @_;
                # NOTE(review): this assigns undef only when $loop is already undef, so
                # it has no effect.
360 1   50     13 $loop //= undef;
361              
362 1 50       29 if ($loop) {
                # Same key derivation as _get_function, so the lookup hits the same entry.
                # NOTE(review): for a true but non-blessed $loop the 'default' entry
                # would be deleted and remove() then called on a non-object - confirm
                # that case cannot occur.
363 1 50       15 my $loop_id = blessed($loop) ? "$loop" : 'default';
364 1 50       22 if (my $function = delete $_function_pools{$loop_id}) {
365 1         57 $loop->remove($function);
366             }
367             }
368             else {
369             # Clean up all pools
                # Only the hash entries are dropped; $function is not used afterwards
                # and nothing is removed from any loop on this path.
                # NOTE(review): the worker processes presumably keep running until
                # their loop releases the function - confirm.
370 0           for my $loop_id (keys %_function_pools) {
371 0           my $function = delete $_function_pools{$loop_id};
372             # Can't remove without loop reference, but clearing the hash helps
373             }
374             }
375             }
376              
377             =head1 CONFIGURATION
378              
379             The worker pool is configured with sensible defaults:
380              
381             =over 4
382              
383             =item * C<min_workers>: 1 - Minimum worker processes to keep alive
384              
385             =item * C<max_workers>: 4 - Maximum concurrent worker processes
386              
387             =item * C<idle_timeout>: 30 - Seconds before idle workers are shut down
388              
389             =back
390              
391             These settings balance responsiveness with resource usage. For applications
392             with heavy file I/O, you may want to adjust these values by modifying the
393             C<_get_function> method or by configuring at the application level.
394              
395             =head1 THREAD SAFETY
396              
397             Each IO::Async::Loop gets its own worker pool. Worker processes are forked
398             from the main process, so they inherit the initial state but operate
399             independently. File operations in workers do not affect the main process
400             state.
401              
402             =head1 SEE ALSO
403              
404             L<PAGI::Server>, L<IO::Async::Function>, L<Future::AsyncAwait>
405              
406             =head1 AUTHOR
407              
408             PAGI Contributors
409              
410             =cut
411              
412             1;