File Coverage

blib/lib/AWS/Lambda/Bootstrap.pm
Criterion Covered Total %
statement 134 156 85.9
branch 18 32 56.2
condition 13 35 37.1
subroutine 26 29 89.6
pod 0 8 0.0
total 191 260 73.4


line stmt bran cond sub pod time code
1             package AWS::Lambda::Bootstrap;
2 18     18   4597067 use 5.026000;
  18         95  
3 18     18   928 use utf8;
  18         372  
  18         174  
4 18     18   673 use strict;
  18         39  
  18         902  
5 18     18   96 use warnings;
  18         54  
  18         1101  
6 18     18   14791 use HTTP::Tiny;
  18         682635  
  18         1126  
7 18     18   16693 use JSON::XS qw/decode_json encode_json/;
  18         134878  
  18         1659  
8 18     18   7300 use Try::Tiny;
  18         35239  
  18         1505  
9 18     18   10445 use AWS::Lambda;
  18         95  
  18         946  
10 18     18   10230 use AWS::Lambda::Context;
  18         63  
  18         809  
11 18     18   9706 use AWS::Lambda::ResponseWriter;
  18         64  
  18         926  
12 18     18   130 use Scalar::Util qw(blessed);
  18         78  
  18         1218  
13 18     18   113 use Exporter 'import';
  18         38  
  18         45974  
14              
15             our @EXPORT = ('bootstrap');
16              
17             sub bootstrap {
18 0     0   0 my $handler = shift;
19 0         0 my $bootstrap = AWS::Lambda::Bootstrap->new(
20             handler => $handler,
21             );
22 0         0 $bootstrap->handle_events;
23             }
24              
25             sub new {
26 11     11 0 3165478 my $proto = shift;
27 11   33     805 my $class = ref $proto || $proto;
28 11         268 my %args;
29 11 50 33     737 if (@_ == 1 && ref $_[0] eq 'HASH') {
30 0         0 %args = %{$_[0]};
  0         0  
31             } else {
32 11         839 %args = @_;
33             }
34              
35 11         200 my $api_version = '2018-06-01';
36 11   33     230 my $env_handler = $args{handler} // $ENV{'_HANDLER'} // die '$_HANDLER is not found';
      0        
37 11         218 my ($handler, $function) = split(/[.]/, $env_handler, 2);
38 11   33     153 my $runtime_api = $args{runtime_api} // $ENV{'AWS_LAMBDA_RUNTIME_API'} // die '$AWS_LAMBDA_RUNTIME_API is not found';
      0        
39 11   33     367 my $task_root = $args{task_root} // $ENV{'LAMBDA_TASK_ROOT'} // die '$LAMBDA_TASK_ROOT is not found';
      0        
40 11   66     423 my $max_workers = $args{max_workers} // $ENV{'AWS_LAMBDA_MAX_CONCURRENCY'} // 0;
      50        
41 11 50 33     840 die "max_workers must be a non-negative integer, got: $max_workers"
42             unless $max_workers =~ /^\d+$/ && $max_workers >= 0;
43 11         622 my $self = bless +{
44             task_root => $task_root,
45             handler => $handler,
46             function_name => $function,
47             runtime_api => $runtime_api,
48             api_version => $api_version,
49             next_event_url => "http://${runtime_api}/${api_version}/runtime/invocation/next",
50             max_workers => $max_workers,
51             http => HTTP::Tiny->new(
52             # XXX: I want to disable timeout, but it seems HTTP::Tiny does not support it.
53             # So, I set a long timeout.
54             timeout => 365*24*60*60, # 365 days
55             ),
56             }, $class;
57 11         3871 return $self;
58             }
59              
60             sub handle_events {
61 1     1 0 26 my $self = shift;
62 1 50       6 $self->_init or return;
63              
64 1 50       39 if ($self->{max_workers} > 0) {
65 1         785 require Parallel::Prefork;
66             my $pm = Parallel::Prefork->new({
67             max_workers => $self->{max_workers},
68 1         7069 trap_signals => {
69             TERM => 'TERM',
70             HUP => 'TERM',
71             },
72             });
73 1         73 while ($pm->signal_received ne 'TERM') {
74 1 50       13 $pm->start and next;
75 0         0 $self->_handle_events;
76 0         0 $pm->finish;
77             }
78 1         1020287 $pm->wait_all_children;
79             } else {
80 0         0 $self->_handle_events
81             }
82             }
83              
84             sub _handle_events {
85 0     0   0 my $self = shift;
86 0         0 while(1) {
87 0         0 $self->handle_event;
88             }
89             }
90              
91             sub _init {
92 6     6   14 my $self = shift;
93 6 50       39 if (my $func = $self->{function}) {
94 0         0 return $func;
95             }
96              
97 6         19 my $task_root = $self->{task_root};
98 6         18 my $handler = $self->{handler};
99 6         17 my $name = $self->{function_name};
100             return try {
101             package main;
102 6     6   3726 require "${task_root}/${handler}.pl";
103 5   100     1776 my $f = main->can($name) // die "handler $name is not found";
104 4         27 $self->{function} = $f;
105             } catch {
106 2     2   67 $self->lambda_init_error($_);
107 2         28 $self->{function} = sub {};
108 2         70 undef;
109 6         138 };
110             }
111              
112             sub handle_event {
113 5     5 0 282 my $self = shift;
114 5 100       45 $self->_init or return;
115 3         145 my ($payload, $context) = $self->lambda_next;
116             my $response = try {
117 3     3   161 local $AWS::Lambda::context = $context;
118 3         48 local $ENV{_X_AMZN_TRACE_ID} = $context->{trace_id};
119 3         15 $self->{function}->($payload, $context);
120             } catch {
121 1     1   30 my $err = $_;
122 1         45 print STDERR "$err";
123 1         10 $self->lambda_error($err, $context);
124 1         27 bless {}, 'AWS::Lambda::ErrorSentinel';
125 3         67 };
126 3         106 my $ref = ref($response);
127 3 100       15 if ($ref eq 'AWS::Lambda::ErrorSentinel') {
128 1         35 return;
129             }
130 2 100       7 if ($ref eq 'CODE') {
131 1         5 $self->lambda_response_streaming($response, $context);
132             } else {
133 1         7 $self->lambda_response($response, $context);
134             }
135 2         33 return 1;
136             }
137              
138             sub lambda_next {
139 1     1 0 33 my $self = shift;
140 1         287 my $resp = $self->{http}->get($self->{next_event_url});
141 1 50       8388 if (!$resp->{success}) {
142 0         0 die "failed to retrieve the next event: $resp->{status} $resp->{reason}";
143             }
144 1         3 my $h = $resp->{headers};
145 1         116 my $payload = decode_json($resp->{content});
146             return $payload, AWS::Lambda::Context->new(
147             deadline_ms => $h->{'lambda-runtime-deadline-ms'},
148             aws_request_id => $h->{'lambda-runtime-aws-request-id'},
149             invoked_function_arn => $h->{'lambda-runtime-invoked-function-arn'},
150             trace_id => $h->{'lambda-runtime-trace-id'},
151 1         82 tenant_id => $h->{'lambda-runtime-aws-tenant-id'},
152             );
153             }
154              
155             sub lambda_response {
156 1     1 0 94 my $self = shift;
157 1         4 my ($response, $context) = @_;
158 1         187 my $runtime_api = $self->{runtime_api};
159 1         4 my $api_version = $self->{api_version};
160 1         19 my $request_id = $context->aws_request_id;
161 1         75 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/response";
162 1         344 my $resp = $self->{http}->post($url, {
163             content => encode_json($response),
164             });
165 1 50       42238 if (!$resp->{success}) {
166 0         0 die "failed to response of execution: $resp->{status} $resp->{reason}";
167             }
168             }
169              
170             sub lambda_response_streaming {
171 1     1 0 48 my $self = shift;
172 1         19 my ($response, $context) = @_;
173 1         20787 my $runtime_api = $self->{runtime_api};
174 1         33 my $api_version = $self->{api_version};
175 1         32 my $request_id = $context->aws_request_id;
176 1         5 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/response";
177 1         15 my $writer = undef;
178             try {
179             $response->(sub {
180 1         7 my $content_type = shift;
181             $writer = AWS::Lambda::ResponseWriter->new(
182             response_url => $url,
183             http => $self->{http},
184 1         39 );
185 1         17 $writer->_request($content_type);
186 1         988 return $writer;
187 1     1   337 });
188             } catch {
189 0     0   0 my $err = $_;
190 0         0 print STDERR "$err";
191 0 0       0 if ($writer) {
192 0         0 $writer->_close_with_error($err);
193             } else {
194 0         0 $self->lambda_error($err, $context);
195             }
196 1         270 };
197 1 50       173 if ($writer) {
198 1         6 my $response = $writer->_handle_response;
199 1 50       95 if (!$response->{success}) {
200 0         0 die "failed to response of execution: $response->{status} $response->{reason}";
201             }
202             }
203             }
204              
205             sub lambda_error {
206 1     1 0 51 my $self = shift;
207 1         14 my ($error, $context) = @_;
208 1         54 my $runtime_api = $self->{runtime_api};
209 1         3 my $api_version = $self->{api_version};
210 1         44 my $request_id = $context->aws_request_id;
211 1         4 my $url = "http://${runtime_api}/${api_version}/runtime/invocation/${request_id}/error";
212 1   50     45 my $type = blessed($error) // "Error";
213 1         290 my $resp = $self->{http}->post($url, {
214             content => encode_json({
215             errorMessage => "$error",
216             errorType => "$type",
217             }),
218             });
219 1 50       39718 if (!$resp->{success}) {
220 0         0 die "failed to send error of execution: $resp->{status} $resp->{reason}";
221             }
222             }
223              
224             sub lambda_init_error {
225 1     1 0 58 my $self = shift;
226 1         12 my $error = shift;
227 1         26 my $runtime_api = $self->{runtime_api};
228 1         3 my $api_version = $self->{api_version};
229 1         22 my $url = "http://${runtime_api}/${api_version}/runtime/init/error";
230 1   50     32 my $type = blessed($error) // "Error";
231 1         322 my $resp = $self->{http}->post($url, {
232             content => encode_json({
233             errorMessage => "$error",
234             errorType => "$type",
235             }),
236             });
237 1 50       46100 if (!$resp->{success}) {
238 0           die "failed to send error of execution: $resp->{status} $resp->{reason}";
239             }
240             }
241              
242             1;
243             __END__