File Coverage

blib/lib/AWS/Lambda/Bootstrap.pm
Criterion Covered Total %
statement 134 156 85.9
branch 18 32 56.2
condition 13 35 37.1
subroutine 26 29 89.6
pod 0 8 0.0
total 191 260 73.4


line stmt bran cond sub pod time code
1             package AWS::Lambda::Bootstrap;
2 18     18   4291180 use 5.026000;
  18         66  
3 18     18   568 use utf8;
  18         383  
  18         163  
4 18     18   613 use strict;
  18         34  
  18         393  
5 18     18   73 use warnings;
  18         60  
  18         936  
6 18     18   14979 use HTTP::Tiny;
  18         568623  
  18         1118  
7 18     18   14857 use JSON::XS qw/decode_json encode_json/;
  18         116915  
  18         1677  
8 18     18   6441 use Try::Tiny;
  18         31323  
  18         1259  
9 18     18   9029 use AWS::Lambda;
  18         89  
  18         892  
10 18     18   9274 use AWS::Lambda::Context;
  18         74  
  18         727  
11 18     18   9091 use AWS::Lambda::ResponseWriter;
  18         57  
  18         959  
12 18     18   118 use Scalar::Util qw(blessed);
  18         33  
  18         1016  
13 18     18   90 use Exporter 'import';
  18         35  
  18         39259  
14              
15             our @EXPORT = ('bootstrap');
16              
17             sub bootstrap {
18 0     0   0 my $handler = shift;
19 0         0 my $bootstrap = AWS::Lambda::Bootstrap->new(
20             handler => $handler,
21             );
22 0         0 $bootstrap->handle_events;
23             }
24              
25             sub new {
26 11     11 0 2725447 my $proto = shift;
27 11   33     935 my $class = ref $proto || $proto;
28 11         205 my %args;
29 11 50 33     609 if (@_ == 1 && ref $_[0] eq 'HASH') {
30 0         0 %args = %{$_[0]};
  0         0  
31             } else {
32 11         500 %args = @_;
33             }
34              
35 11         164 my $api_version = '2018-06-01';
36 11   33     203 my $env_handler = $args{handler} // $ENV{'_HANDLER'} // die '$_HANDLER is not found';
      0        
37 11         293 my ($handler, $function) = split(/[.]/, $env_handler, 2);
38 11   33     179 my $runtime_api = $args{runtime_api} // $ENV{'AWS_LAMBDA_RUNTIME_API'} // die '$AWS_LAMBDA_RUNTIME_API is not found';
      0        
39 11   33     186 my $task_root = $args{task_root} // $ENV{'LAMBDA_TASK_ROOT'} // die '$LAMBDA_TASK_ROOT is not found';
      0        
40 11   66     691 my $max_workers = $args{max_workers} // $ENV{'AWS_LAMBDA_MAX_CONCURRENCY'} // 0;
      50        
41 11 50 33     531 die "max_workers must be a non-negative integer, got: $max_workers"
42             unless $max_workers =~ /^\d+$/ && $max_workers >= 0;
43 11         520 my $self = bless +{
44             task_root => $task_root,
45             handler => $handler,
46             function_name => $function,
47             runtime_api => $runtime_api,
48             api_version => $api_version,
49             next_event_url => "http://${runtime_api}/${api_version}/runtime/invocation/next",
50             max_workers => $max_workers,
51             http => HTTP::Tiny->new(
52             # XXX: I want to disable timeout, but it seems HTTP::Tiny does not support it.
53             # So, I set a long timeout.
54             timeout => 365*24*60*60, # 365 days
55             ),
56             }, $class;
57 11         3448 return $self;
58             }
59              
60             sub handle_events {
61 1     1 0 24 my $self = shift;
62 1 50       7 $self->_init or return;
63              
64 1 50       40 if ($self->{max_workers} > 0) {
65 1         752 require Parallel::Prefork;
66             my $pm = Parallel::Prefork->new({
67             max_workers => $self->{max_workers},
68 1         6698 trap_signals => {
69             TERM => 'TERM',
70             HUP => 'TERM',
71             },
72             });
73 1         76 while ($pm->signal_received ne 'TERM') {
74 1 50       14 $pm->start and next;
75 0         0 $self->_handle_events;
76 0         0 $pm->finish;
77             }
78 1         1005580 $pm->wait_all_children;
79             } else {
80 0         0 $self->_handle_events
81             }
82             }
83              
84             sub _handle_events {
85 0     0   0 my $self = shift;
86 0         0 while(1) {
87 0         0 $self->handle_event;
88             }
89             }
90              
91             sub _init {
92 6     6   15 my $self = shift;
93 6 50       36 if (my $func = $self->{function}) {
94 0         0 return $func;
95             }
96              
97 6         17 my $task_root = $self->{task_root};
98 6         16 my $handler = $self->{handler};
99 6         31 my $name = $self->{function_name};
100             return try {
101             package main;
102 6     6   3354 require "${task_root}/${handler}.pl";
103 5   100     1575 my $f = main->can($name) // die "handler $name is not found";
104 4         27 $self->{function} = $f;
105             } catch {
106 2     2   70 $self->lambda_init_error($_);
107 2         30 $self->{function} = sub {};
108 2         70 undef;
109 6         159 };
110             }
111              
112             sub handle_event {
113 5     5 0 184 my $self = shift;
114 5 100       38 $self->_init or return;
115 3         98 my ($payload, $context) = $self->lambda_next;
116             my $response = try {
117 3     3   136 local $AWS::Lambda::context = $context;
118 3         44 local $ENV{_X_AMZN_TRACE_ID} = $context->{trace_id};
119 3         13 $self->{function}->($payload, $context);
120             } catch {
121 1     1   19 my $err = $_;
122 1         28 print STDERR "$err";
123 1         5 $self->lambda_error($err, $context);
124 1         29 bless {}, 'AWS::Lambda::ErrorSentinel';
125 3         76 };
126 3         79 my $ref = ref($response);
127 3 100       13 if ($ref eq 'AWS::Lambda::ErrorSentinel') {
128 1         11 return;
129             }
130 2 100       7 if ($ref eq 'CODE') {
131 1         5 $self->lambda_response_streaming($response, $context);
132             } else {
133 1         5 $self->lambda_response($response, $context);
134             }
135 2         35 return 1;
136             }
137              
138             sub lambda_next {
139 1     1 0 21 my $self = shift;
140 1         334 my $resp = $self->{http}->get($self->{next_event_url});
141 1 50       10169 if (!$resp->{success}) {
142 0         0 die "failed to retrieve the next event: $resp->{status} $resp->{reason}";
143             }
144 1         4 my $h = $resp->{headers};
145 1         50 my $payload = decode_json($resp->{content});
146             return $payload, AWS::Lambda::Context->new(
147             deadline_ms => $h->{'lambda-runtime-deadline-ms'},
148             aws_request_id => $h->{'lambda-runtime-aws-request-id'},
149             invoked_function_arn => $h->{'lambda-runtime-invoked-function-arn'},
150             trace_id => $h->{'lambda-runtime-trace-id'},
151 1         88 tenant_id => $h->{'lambda-runtime-aws-tenant-id'},
152             );
153             }
154              
155             sub lambda_response {
156 1     1 0 33 my $self = shift;
157 1         10 my ($response, $context) = @_;
158 1         26 my $runtime_api = $self->{runtime_api};
159 1         2 my $api_version = $self->{api_version};
160 1         19 my $request_id = $context->aws_request_id;
161 1         64 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/response";
162 1         235 my $resp = $self->{http}->post($url, {
163             content => encode_json($response),
164             });
165 1 50       24599 if (!$resp->{success}) {
166 0         0 die "failed to response of execution: $resp->{status} $resp->{reason}";
167             }
168             }
169              
170             sub lambda_response_streaming {
171 1     1 0 38 my $self = shift;
172 1         4 my ($response, $context) = @_;
173 1         23 my $runtime_api = $self->{runtime_api};
174 1         8 my $api_version = $self->{api_version};
175 1         27 my $request_id = $context->aws_request_id;
176 1         5 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/response";
177 1         99 my $writer = undef;
178             try {
179             $response->(sub {
180 1         7 my $content_type = shift;
181             $writer = AWS::Lambda::ResponseWriter->new(
182             response_url => $url,
183             http => $self->{http},
184 1         60 );
185 1         23 $writer->_request($content_type);
186 1         909 return $writer;
187 1     1   181 });
188             } catch {
189 0     0   0 my $err = $_;
190 0         0 print STDERR "$err";
191 0 0       0 if ($writer) {
192 0         0 $writer->_close_with_error($err);
193             } else {
194 0         0 $self->lambda_error($err, $context);
195             }
196 1         115 };
197 1 50       2360 if ($writer) {
198 1         10 my $response = $writer->_handle_response;
199 1 50       119 if (!$response->{success}) {
200 0         0 die "failed to response of execution: $response->{status} $response->{reason}";
201             }
202             }
203             }
204              
205             sub lambda_error {
206 1     1 0 38 my $self = shift;
207 1         3 my ($error, $context) = @_;
208 1         88 my $runtime_api = $self->{runtime_api};
209 1         14 my $api_version = $self->{api_version};
210 1         17 my $request_id = $context->aws_request_id;
211 1         76 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/error";
212 1   50     29 my $type = blessed($error) // "Error";
213 1         349 my $resp = $self->{http}->post($url, {
214             content => encode_json({
215             errorMessage => "$error",
216             errorType => "$type",
217             }),
218             });
219 1 50       37383 if (!$resp->{success}) {
220 0         0 die "failed to send error of execution: $resp->{status} $resp->{reason}";
221             }
222             }
223              
224             sub lambda_init_error {
225 1     1 0 67 my $self = shift;
226 1         3 my $error = shift;
227 1         21 my $runtime_api = $self->{runtime_api};
228 1         15 my $api_version = $self->{api_version};
229 1         23 my $url = "http://${runtime_api}/${api_version}/runtime/init/error";
230 1   50     55 my $type = blessed($error) // "Error";
231 1         352 my $resp = $self->{http}->post($url, {
232             content => encode_json({
233             errorMessage => "$error",
234             errorType => "$type",
235             }),
236             });
237 1 50       48111 if (!$resp->{success}) {
238 0           die "failed to send error of execution: $resp->{status} $resp->{reason}";
239             }
240             }
241              
242             1;
243             __END__