File Coverage

blib/lib/PAGI/Server/AsyncFile.pm
Criterion Covered Total %
statement 79 112 70.5
branch 20 54 37.0
condition 6 11 54.5
subroutine 14 16 87.5
pod 7 7 100.0
total 126 200 63.0


line stmt bran cond sub pod time code
1             package PAGI::Server::AsyncFile;
2              
3 110     110   247999 use strict;
  110         399  
  110         3747  
4 110     110   651 use warnings;
  110         205  
  110         6349  
5              
6             our $VERSION = '0.002001';
7              
8 110     110   464 use Future;
  110         151  
  110         2180  
9 110     110   370 use Future::AsyncAwait;
  110         174  
  110         805  
10 110     110   53938 use IO::Async::Function;
  110         2212751  
  110         4920  
11 110     110   751 use Scalar::Util qw(blessed);
  110         154  
  110         189426  
12              
13             =head1 NAME
14              
15             PAGI::Server::AsyncFile - Non-blocking file I/O for PAGI::Server internals
16              
17             =head1 SYNOPSIS
18              
19             use PAGI::Server::AsyncFile;
20             use IO::Async::Loop;
21              
22             # Create or obtain an IO::Async::Loop
23             my $loop = IO::Async::Loop->new;
24              
25             # Read entire file
26             my $content = await PAGI::Server::AsyncFile->read_file($loop, '/path/to/file');
27              
28             # Read file in chunks (streaming)
29             await PAGI::Server::AsyncFile->read_file_chunked($loop, '/path/to/file', async sub {
30             my ($chunk) = @_;
31             # Process each chunk
32             }, chunk_size => 65536);
33              
34             # Write file
35             await PAGI::Server::AsyncFile->write_file($loop, '/path/to/file', $content);
36              
37             # Append to file
38             await PAGI::Server::AsyncFile->append_file($loop, '/path/to/file', $log_line);
39              
40             =head1 DESCRIPTION
41              
42             This module provides non-blocking file I/O operations using L<IO::Async::Function>
43             worker processes. It is used internally by L<PAGI::Server> for efficient file
44             streaming.
45              
46             B<Note:> This is a PAGI::Server internal module. PAGI applications are
47             loop-agnostic and should use synchronous file I/O (which is simple and fast
48             for typical file sizes) or bring their own async file library if needed.
49              
50             It uses L<IO::Async::Function> to offload blocking file operations to worker
51             processes, preventing the main event loop from being blocked during disk I/O.
52             Regular file I/O in POSIX is always blocking at the kernel level - even
53             C<select>/C<poll> report regular files as always "ready" (and C<epoll> refuses to watch them at all).
54             This module works around this limitation by running file operations in
55             separate worker processes, similar in spirit to how Node.js/libuv offloads file I/O to a worker thread pool.
56              
57             =head1 CLASS METHODS
58              
59             =cut
60              
61             # Singleton function pool per loop (keyed by loop address)
62             my %_function_pools;
63              
64             # Get or create the function pool for a given loop
65             sub _get_function {
66 27     27   53 my ($class, $loop) = @_;
67              
68 27 50       127 my $loop_id = blessed($loop) ? "$loop" : 'default';
69              
70 27 100       110 unless ($_function_pools{$loop_id}) {
71             my $function = IO::Async::Function->new(
72             code => sub {
73 0     0   0 my ($op, @args) = @_;
74 0         0 return _worker_operation($op, @args);
75             },
76 2         35 min_workers => 1,
77             max_workers => 4,
78             idle_timeout => 30,
79             );
80              
81 2         397 $loop->add($function);
82 2         32230 $_function_pools{$loop_id} = $function;
83             }
84              
85 27         248 return $_function_pools{$loop_id};
86             }
87              
88             # Worker process operations
89             sub _worker_operation {
90 0     0   0 my ($op, @args) = @_;
91              
92 0 0       0 if ($op eq 'read_file') {
    0          
    0          
    0          
    0          
    0          
93 0         0 my ($path) = @args;
94 0 0       0 open my $fh, '<:raw', $path or die "Cannot open $path: $!";
95 0         0 local $/;
96 0         0 my $content = <$fh>;
97 0         0 close $fh;
98 0         0 return $content;
99             }
100             elsif ($op eq 'read_chunk') {
101 0         0 my ($path, $offset, $chunk_size) = @args;
102 0 0       0 open my $fh, '<:raw', $path or die "Cannot open $path: $!";
103 0 0       0 seek($fh, $offset, 0) if $offset;
104 0         0 my $bytes_read = read($fh, my $buffer, $chunk_size);
105 0         0 close $fh;
106 0   0     0 return ($buffer, $bytes_read // 0);
107             }
108             elsif ($op eq 'write_file') {
109 0         0 my ($path, $content) = @args;
110 0 0       0 open my $fh, '>:raw', $path or die "Cannot open $path for writing: $!";
111 0         0 print $fh $content;
112 0         0 close $fh;
113 0         0 return length($content);
114             }
115             elsif ($op eq 'append_file') {
116 0         0 my ($path, $content) = @args;
117 0 0       0 open my $fh, '>>:raw', $path or die "Cannot open $path for appending: $!";
118 0         0 print $fh $content;
119 0         0 close $fh;
120 0         0 return length($content);
121             }
122             elsif ($op eq 'file_size') {
123 0         0 my ($path) = @args;
124 0         0 return -s $path;
125             }
126             elsif ($op eq 'file_exists') {
127 0         0 my ($path) = @args;
128 0 0       0 return -f $path ? 1 : 0;
129             }
130             else {
131 0         0 die "Unknown operation: $op";
132             }
133             }
134              
135             =head2 read_file
136              
137             my $content = await PAGI::Server::AsyncFile->read_file($loop, $path);
138              
139             Read the entire contents of a file asynchronously. Returns a Future that
140             resolves to the file contents as raw bytes (no decoding is performed).
141              
142             Parameters:
143              
144             =over 4
145              
146             =item * C<$loop> - IO::Async::Loop instance
147              
148             =item * C<$path> - Path to the file to read
149              
150             =back
151              
152             The returned Future fails if the file does not exist, is not readable, or cannot be opened.
153              
154             =cut
155              
156 11     11 1 262911 async sub read_file {
157 11         47 my ($class, $loop, $path) = @_;
158              
159 11 100       257 die "File not found: $path" unless -f $path;
160 10 50       127 die "Cannot read file: $path" unless -r $path;
161              
162 10         53 my $function = $class->_get_function($loop);
163 10         166 return await $function->call(args => ['read_file', $path]);
164             }
165              
166             =head2 read_file_chunked
167              
168             await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, async sub {
169             my ($chunk) = @_;
170             # Process chunk
171             }, chunk_size => 65536);
172              
173             # For Range requests (partial file):
174             await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, $callback,
175             offset => 1000, # Start at byte 1000
176             length => 5000, # Read 5000 bytes total
177             );
178              
179             Read a file in chunks, calling a callback for each chunk. This is suitable
180             for streaming large files without loading the entire file into memory.
181              
182             Parameters:
183              
184             =over 4
185              
186             =item * C<$loop> - IO::Async::Loop instance
187              
188             =item * C<$path> - Path to the file to read
189              
190             =item * C<$callback> - Async callback called with each chunk. Receives the chunk data.
191              
192             =item * C<%opts> - Options:
193              
194             =over 4
195              
196             =item * C<chunk_size> - Size of each chunk in bytes (default: 65536)
197              
198             =item * C<offset> - Byte offset to start reading from (default: 0)
199              
200             =item * C<length> - Maximum bytes to read; omit to read to EOF
201              
202             =back
203              
204             =back
205              
206             Returns a Future that resolves to the number of bytes read when complete.
207             The callback should return/await properly if it needs to do async operations.
208              
209             =cut
210              
211 6     6 1 8980 async sub read_file_chunked {
212 6         37 my ($class, $loop, $path, $callback, %opts) = @_;
213              
214 6 50       82 die "File not found: $path" unless -f $path;
215 6 50       93 die "Cannot read file: $path" unless -r $path;
216              
217 6   50     20 my $chunk_size = $opts{chunk_size} // 65536;
218 6   100     41 my $start_offset = $opts{offset} // 0;
219 6         11 my $max_length = $opts{length}; # undef means read to EOF
220              
221 6         36 my $file_size = -s $path;
222 6         23 my $function = $class->_get_function($loop);
223              
224 6         29 my $offset = $start_offset;
225 6         37 my $bytes_sent = 0;
226              
227             # Calculate end position
228 6 100       46 my $end_pos = defined $max_length
229             ? $start_offset + $max_length
230             : $file_size;
231 6 50       45 $end_pos = $file_size if $end_pos > $file_size;
232              
233 6         54 while ($offset < $end_pos) {
234 15         24 my $to_read = $chunk_size;
235              
236             # Don't read past the end position
237 15 100       60 if ($offset + $to_read > $end_pos) {
238 4         15 $to_read = $end_pos - $offset;
239             }
240              
241 15 50       44 last if $to_read <= 0;
242              
243 15         125 my ($chunk, $bytes_read) = await $function->call(
244             args => ['read_chunk', $path, $offset, $to_read]
245             );
246              
247 15 50       27591 last unless $bytes_read;
248              
249             # Call the callback - it may be async
250 15         53 my $result = $callback->($chunk);
251 15 100 66     4502 if (blessed($result) && $result->can('get')) {
252 4         13 await $result;
253             }
254              
255 15         42184 $offset += $bytes_read;
256 15         46 $bytes_sent += $bytes_read;
257             }
258              
259 6         134 return $bytes_sent; # Return total bytes read
260             }
261              
262             =head2 write_file
263              
264             await PAGI::Server::AsyncFile->write_file($loop, $path, $content);
265              
266             Write content to a file asynchronously, replacing any existing content.
267              
268             Parameters:
269              
270             =over 4
271              
272             =item * C<$loop> - IO::Async::Loop instance
273              
274             =item * C<$path> - Path to the file to write
275              
276             =item * C<$content> - Content to write
277              
278             =back
279              
280             Returns a Future that resolves to the number of bytes written.
281              
282             =cut
283              
284 4     4 1 10763 async sub write_file {
285 4         52 my ($class, $loop, $path, $content) = @_;
286              
287 4         15 my $function = $class->_get_function($loop);
288 4         37 return await $function->call(args => ['write_file', $path, $content]);
289             }
290              
291             =head2 append_file
292              
293             await PAGI::Server::AsyncFile->append_file($loop, $path, $content);
294              
295             Append content to a file asynchronously.
296              
297             Parameters:
298              
299             =over 4
300              
301             =item * C<$loop> - IO::Async::Loop instance
302              
303             =item * C<$path> - Path to the file
304              
305             =item * C<$content> - Content to append
306              
307             =back
308              
309             Returns a Future that resolves to the number of bytes written.
310              
311             =cut
312              
313 4     4 1 9163 async sub append_file {
314 4         22 my ($class, $loop, $path, $content) = @_;
315              
316 4         11 my $function = $class->_get_function($loop);
317 4         37 return await $function->call(args => ['append_file', $path, $content]);
318             }
319              
320             =head2 file_size
321              
322             my $size = await PAGI::Server::AsyncFile->file_size($loop, $path);
323              
324             Get the size of a file asynchronously.
325              
326             =cut
327              
328 1     1 1 3323 async sub file_size {
329 1         5 my ($class, $loop, $path) = @_;
330              
331 1         3 my $function = $class->_get_function($loop);
332 1         6 return await $function->call(args => ['file_size', $path]);
333             }
334              
335             =head2 file_exists
336              
337             my $exists = await PAGI::Server::AsyncFile->file_exists($loop, $path);
338              
339             Check asynchronously whether C<$path> exists and is a regular file. Resolves to 1 if so, 0 otherwise.
340              
341             =cut
342              
343 2     2 1 5637 async sub file_exists {
344 2         5 my ($class, $loop, $path) = @_;
345              
346 2         6 my $function = $class->_get_function($loop);
347 2         8 return await $function->call(args => ['file_exists', $path]);
348             }
349              
350             =head2 cleanup
351              
352             PAGI::Server::AsyncFile->cleanup($loop);
353              
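354             Clean up the worker pool for a given loop. Call this during application
355             shutdown to properly terminate worker processes. If C<$loop> is omitted, all cached pools are only forgotten; their workers are not stopped, because a pool cannot be removed without its owning loop.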
356              
357             =cut
358              
359             sub cleanup {
360 1     1 1 5657 my ($class, $loop) = @_;
361 1   50     16 $loop //= undef;
362              
363 1 50       15 if ($loop) {
364 1 50       40 my $loop_id = blessed($loop) ? "$loop" : 'default';
365 1 50       20 if (my $function = delete $_function_pools{$loop_id}) {
366 1         51 $loop->remove($function);
367             }
368             }
369             else {
370             # Clean up all pools
371 0           for my $loop_id (keys %_function_pools) {
372 0           my $function = delete $_function_pools{$loop_id};
373             # Can't remove without loop reference, but clearing the hash helps
374             }
375             }
376             }
377              
378             =head1 CONFIGURATION
379              
380             The worker pool is configured with sensible defaults:
381              
382             =over 4
383              
384             =item * C<min_workers>: 1 - Minimum worker processes to keep alive
385              
386             =item * C<max_workers>: 4 - Maximum concurrent worker processes
387              
388             =item * C<idle_timeout>: 30 - Seconds before idle workers are shut down
389              
390             =back
391              
392             These settings balance responsiveness with resource usage. For applications
393             with heavy file I/O, you may want to adjust these values by modifying the
394             C<_get_function> method or by configuring at the application level.
395              
396             =head1 THREAD SAFETY
397              
398             Each IO::Async::Loop gets its own worker pool. Worker processes are forked
399             from the main process, so they inherit the initial state but operate
400             independently. File operations in workers do not affect the main process
401             state.
402              
403             =head1 SEE ALSO
404              
405             L<IO::Async::Function>, L<Future::AsyncAwait>, L<PAGI::Server>
406              
407             =head1 AUTHOR
408              
409             PAGI Contributors
410              
411             =cut
412              
413             1;