File Coverage

blib/lib/PAGI/App/WrapPSGI.pm
Criterion Covered Total %
statement 94 117 80.3
branch 21 38 55.2
condition 1 8 12.5
subroutine 13 15 86.6
pod 0 2 0.0
total 129 180 71.6


line stmt bran cond sub pod time code
1             package PAGI::App::WrapPSGI;
2 2     2   8571 use strict;
  2         5  
  2         68  
3 2     2   7 use warnings;
  2         4  
  2         118  
4 2     2   8 use Future::AsyncAwait;
  2         3  
  2         18  
5              
6              
7             =head1 NAME
8              
9             PAGI::App::WrapPSGI - PSGI-to-PAGI adapter
10              
11             =head1 SYNOPSIS
12              
13             use PAGI::App::WrapPSGI;
14              
15             my $psgi_app = sub {
16             my ($env) = @_;
17             return [200, ['Content-Type' => 'text/plain'], ['Hello']];
18             };
19              
20             my $wrapper = PAGI::App::WrapPSGI->new(psgi_app => $psgi_app);
21             my $pagi_app = $wrapper->to_app;
22              
23             =head1 DESCRIPTION
24              
25             PAGI::App::WrapPSGI wraps a PSGI application to make it work with
26             PAGI servers. It converts PAGI scope to PSGI %env and converts
27             PSGI responses to PAGI events.
28              
29             =cut
30              
31             sub new {
32 5     5 0 7813 my ($class, %args) = @_;
33              
34             my $self = bless {
35             psgi_app => $args{psgi_app},
36 5         21 }, $class;
37 5         12 return $self;
38             }
39              
40             sub to_app {
41 5     5 0 36 my ($self) = @_;
42              
43 5         15 my $psgi_app = $self->{psgi_app};
44              
45 8     8   40 return async sub {
46 8         20 my ($scope, $receive, $send) = @_;
47 8 100       57 die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';
48              
49 6         26 my $env = $self->_build_env($scope);
50              
51             # Collect request body
52 6         9 my $body = '';
53 6         6 while (1) {
54 6         12 my $event = await $receive->();
55 6 50       106 last if $event->{type} ne 'http.request';
56 6   50     16 $body .= $event->{body} // '';
57 6 50       23 last unless $event->{more};
58             }
59              
60             # Create psgi.input
61 6 50       100 open my $input, '<', \$body or die $!;
62 6         16 $env->{'psgi.input'} = $input;
63              
64             # Call PSGI app
65 6         21 my $response = $psgi_app->($env);
66              
67             # Handle response - could be arrayref or coderef (streaming)
68 6 100       68 if (ref $response eq 'CODE') {
69             # Delayed/streaming response
70 1         5 await $self->_handle_streaming_response($send, $response);
71             } else {
72 5         19 await $self->_send_response($send, $response);
73             }
74 5         39 };
75             }
76              
77             sub _build_env {
78 6     6   35 my ($self, $scope) = @_;
79              
80             my %env = (
81             REQUEST_METHOD => $scope->{method},
82             SCRIPT_NAME => $scope->{root_path},
83             PATH_INFO => $scope->{path},
84             QUERY_STRING => $scope->{query_string},
85             SERVER_PROTOCOL => 'HTTP/' . $scope->{http_version},
86             'psgi.version' => [1, 1],
87             'psgi.url_scheme' => $scope->{scheme},
88 6         119 'psgi.errors' => \*STDERR,
89             'psgi.multithread' => 0,
90             'psgi.multiprocess' => 0,
91             'psgi.run_once' => 0,
92             'psgi.streaming' => 1,
93             'psgi.nonblocking' => 1,
94             );
95              
96             # Add headers
97 6         9 for my $header (@{$scope->{headers}}) {
  6         16  
98 21         31 my ($name, $value) = @$header;
99 21         29 my $key = uc($name);
100 21         43 $key =~ s/-/_/g;
101 21 100       41 if ($key eq 'CONTENT_TYPE') {
    100          
102 1         2 $env{CONTENT_TYPE} = $value;
103             } elsif ($key eq 'CONTENT_LENGTH') {
104 1         3 $env{CONTENT_LENGTH} = $value;
105             } else {
106 19         34 $env{"HTTP_$key"} = $value;
107             }
108             }
109              
110             # Server/client info
111 6 50       17 if ($scope->{server}) {
112 6         14 $env{SERVER_NAME} = $scope->{server}[0];
113 6         13 $env{SERVER_PORT} = $scope->{server}[1];
114             }
115 6 50       17 if ($scope->{client}) {
116 6         17 $env{REMOTE_ADDR} = $scope->{client}[0];
117 6         16 $env{REMOTE_PORT} = $scope->{client}[1];
118             }
119              
120 6         11 return \%env;
121             }
122              
123 5     5   8 async sub _send_response {
124 5         11 my ($self, $send, $response) = @_;
125              
126 5         8 my ($status, $headers, $body) = @$response;
127              
128             await $send->({
129             type => 'http.response.start',
130             status => $status,
131 5         14 headers => [ map { [lc($_->[0]), $_->[1]] } @{_pairs($headers)} ],
132             });
133              
134 5 100       199 if (ref $body eq 'ARRAY') {
    50          
135 4         14 my $content = join '', @$body;
136 4         18 await $send->({
137             type => 'http.response.body',
138             body => $content,
139             more => 0,
140             });
141             } elsif (ref $body eq 'CODE') {
142             # Streaming response body (coderef)
143             # This is the "pull" pattern where we call the coderef repeatedly
144 0         0 while (1) {
145 0         0 my $chunk = $body->();
146 0 0       0 last unless defined $chunk;
147 0         0 await $send->({
148             type => 'http.response.body',
149             body => $chunk,
150             more => 1,
151             });
152             }
153             await $send->({
154             type => 'http.response.body',
155             body => '',
156             more => 0,
157 0         0 });
158             } else {
159             # Filehandle
160 1         4 local $/;
161 1         5 my $content = <$body>;
162 1         7 await $send->({
163             type => 'http.response.body',
164             body => $content // '',
165             more => 0,
166             });
167             }
168             }
169              
170             # Handle PSGI delayed/streaming response pattern
171 1     1   2 async sub _handle_streaming_response {
172 1         3 my ($self, $send, $responder_callback) = @_;
173              
174 1         1 my @body_chunks;
175 1         2 my $response_started = 0;
176 1         2 my $writer;
177              
178             # Create a writer object for streaming
179             my $create_writer = sub {
180 0     0   0 my ($send_ref, $status, $headers) = @_;
181             return {
182             write => sub {
183 0         0 my ($chunk) = @_;
184 0         0 push @body_chunks, $chunk;
185             },
186             close => sub {
187             # Mark as closed - will be handled after responder returns
188             },
189 0         0 };
190 1         4 };
191              
192             # Create the responder callback for the PSGI app
193             my $responder = sub {
194 1     1   5 my ($response) = @_;
195              
196 1 50       6 if (@$response == 3) {
    50          
197             # Complete response [status, headers, body]
198 0         0 my ($status, $headers, $body) = @$response;
199 0         0 $response_started = 1;
200              
201             # Store for later sending
202 0         0 push @body_chunks, { status => $status, headers => $headers, body => $body };
203 0         0 return;
204             } elsif (@$response == 2) {
205             # Streaming response [status, headers] - return writer
206 1         2 my ($status, $headers) = @$response;
207 1         2 $response_started = 1;
208              
209             # Store header info
210 1         3 push @body_chunks, { status => $status, headers => $headers };
211              
212             # Return a writer object
213 1         6 $writer = bless {
214             chunks => \@body_chunks,
215             }, 'PAGI::App::WrapPSGI::Writer';
216              
217 1         3 return $writer;
218             }
219 1         5 };
220              
221             # Call the PSGI delayed response callback
222 1         3 $responder_callback->($responder);
223              
224             # Now send all the collected response data
225 1 50       3 if (@body_chunks) {
226 1         2 my $first = shift @body_chunks;
227              
228 1 50       3 if (ref $first eq 'HASH') {
229 1         3 my $status = $first->{status};
230 1         2 my $headers = $first->{headers};
231 1         33 my $body = $first->{body};
232              
233             await $send->({
234             type => 'http.response.start',
235             status => $status,
236 1         4 headers => [ map { [lc($_->[0]), $_->[1]] } @{_pairs($headers)} ],
237             });
238              
239 1 50       36 if (defined $body) {
240             # Complete response with body
241 0         0 await $self->_send_body($send, $body);
242             } else {
243             # Streaming - send collected chunks
244 1         2 for my $chunk (@body_chunks) {
245 2         35 await $send->({
246             type => 'http.response.body',
247             body => $chunk,
248             more => 1,
249             });
250             }
251             await $send->({
252             type => 'http.response.body',
253             body => '',
254             more => 0,
255 1         29 });
256             }
257             }
258             }
259             }
260              
261 0     0   0 async sub _send_body {
262 0         0 my ($self, $send, $body) = @_;
263              
264 0 0 0     0 if (ref $body eq 'ARRAY') {
    0 0        
265 0         0 my $content = join '', @$body;
266 0         0 await $send->({
267             type => 'http.response.body',
268             body => $content,
269             more => 0,
270             });
271             } elsif (ref $body eq 'GLOB' || (ref $body && $body->can('getline'))) {
272             # Filehandle
273 0         0 local $/;
274 0         0 my $content = <$body>;
275 0         0 await $send->({
276             type => 'http.response.body',
277             body => $content // '',
278             more => 0,
279             });
280             } else {
281 0         0 await $send->({
282             type => 'http.response.body',
283             body => $body // '',
284             more => 0,
285             });
286             }
287             }
288              
289             # Simple writer class for streaming responses
290             package PAGI::App::WrapPSGI::Writer;
291              
292             sub write {
293 2     2   9 my ($self, $chunk) = @_;
294 2         2 push @{$self->{chunks}}, $chunk;
  2         9  
295             }
296              
297             sub close {
298 1     1   5 my ($self) = @_;
299             # Nothing special needed - chunks are already collected
300             }
301              
302             package PAGI::App::WrapPSGI;
303              
304             sub _pairs {
305 6     6   9 my ($arrayref) = @_;
306              
307 6         9 my @pairs;
308 6         18 for (my $i = 0; $i < @$arrayref; $i += 2) {
309 6         26 push @pairs, [$arrayref->[$i], $arrayref->[$i+1]];
310             }
311 6         18 return \@pairs;
312             }
313              
314             1;
315              
316             __END__