File Coverage

lib/Amazon/DynamoDB/20120810.pm
Criterion Covered Total %
statement 43 45 95.5
branch n/a
condition n/a
subroutine 15 15 100.0
pod n/a
total 58 60 96.6


line stmt bran cond sub pod time code
1             package Amazon::DynamoDB::20120810;
2             $Amazon::DynamoDB::20120810::VERSION = '0.35';
3 11     11   40 use strict;
  11         12  
  11         262  
4 11     11   34 use warnings;
  11         12  
  11         171  
5              
6              
7 11     11   5274 use Future;
  11         61682  
  11         292  
8 11     11   4369 use Future::Utils qw(repeat try_repeat);
  11         14925  
  11         636  
9 11     11   4050 use POSIX qw(strftime);
  11         42161  
  11         42  
10 11     11   11124 use JSON::MaybeXS qw(decode_json encode_json);
  11         34261  
  11         518  
11 11     11   3795 use MIME::Base64;
  11         4461  
  11         508  
12 11     11   45 use List::Util;
  11         9  
  11         431  
13 11     11   3515 use List::MoreUtils;
  11         62314  
  11         58  
14 11     11   3658 use B qw(svref_2object);
  11         12  
  11         445  
15 11     11   3915 use HTTP::Request;
  11         143456  
  11         290  
16 11     11   3775 use Kavorka;
  11         108663  
  11         76  
17 11     11   1298282 use Amazon::DynamoDB::Types;
  11         31  
  11         99  
18 11     11   9882 use Type::Registry;
  11         65475  
  11         65  
19 11     11   12074 use VM::EC2::Security::CredentialCache;
  0            
  0            
20             use AWS::Signature4;
21            
22             BEGIN {
23             my $reg = "Type::Registry"->for_me;
24             $reg->add_types(-Standard);
25             $reg->add_types("Amazon::DynamoDB::Types");
26             };
27              
28              
29              
30             sub new {
31             my $class = shift;
32             bless { @_ }, $class
33             }
34              
35             sub implementation { shift->{implementation} }
36             sub host { shift->{host} }
37             sub port { shift->{port} }
38             sub ssl { shift->{ssl} }
39             sub algorithm { 'AWS4-HMAC-SHA256' }
40             sub scope { shift->{scope} }
41             sub access_key { shift->{access_key} }
42             sub secret_key { shift->{secret_key} }
43             sub debug_failures { shift->{debug} }
44              
45             sub max_retries { shift->{max_retries} }
46              
47              
48              
49              
50              
51             method create_table(TableNameType :$TableName!,
52             Int :$ReadCapacityUnits = 2,
53             Int :$WriteCapacityUnits = 2,
54             AttributeDefinitionsType :$AttributeDefinitions,
55             KeySchemaType :$KeySchema!,
56             ArrayRef[GlobalSecondaryIndexType] :$GlobalSecondaryIndexes where { scalar(@$_) <= 5 },
57             ArrayRef[LocalSecondaryIndexType] :$LocalSecondaryIndexes
58             ) {
59             my %payload = (
60             TableName => $TableName,
61             ProvisionedThroughput => {
62             ReadCapacityUnits => int($ReadCapacityUnits),
63             WriteCapacityUnits => int($WriteCapacityUnits),
64             }
65             );
66              
67             if (defined($AttributeDefinitions)) {
68             foreach my $field_name (keys %$AttributeDefinitions) {
69             my $type = $AttributeDefinitions->{$field_name};
70              
71             push @{$payload{AttributeDefinitions}}, {
72             AttributeName => $field_name,
73             AttributeType => $type // 'S',
74             }
75             }
76             }
77              
78             $payload{KeySchema} = _create_key_schema($KeySchema, $AttributeDefinitions);
79              
80             foreach my $index_record (['GlobalSecondaryIndexes', $GlobalSecondaryIndexes],
81             ['LocalSecondaryIndexes', $LocalSecondaryIndexes]) {
82             my $index_type = $index_record->[0];
83             my $index = $index_record->[1];
84            
85             if (defined($index)) {
86             foreach my $i (@$index) {
87             my $r = {
88             IndexName => $i->{IndexName},
89             (($index_type eq 'GlobalSecondaryIndexes') ?
90             (ProvisionedThroughput => {
91             ReadCapacityUnits => int($i->{ProvisionedThroughput}->{ReadCapacityUnits} // 1),
92             WriteCapacityUnits => int($i->{ProvisionedThroughput}->{WriteCapacityUnits} // 1),
93             }) : ()),
94             KeySchema => _create_key_schema($i->{KeySchema}, $AttributeDefinitions),
95             };
96              
97             my $type = $i->{Projection}->{ProjectionType};
98             $r->{Projection}->{ProjectionType} = $type;
99            
100             if (defined($i->{Projection}->{NonKeyAttributes})) {
101             my $attrs = $i->{Projection}->{NonKeyAttributes};
102             # Can't validate these attribute names since they aren't part of the key.
103             $r->{Projection}->{NonKeyAttributes} = $attrs;
104             }
105             push @{$payload{$index_type}}, $r;
106             }
107             }
108             }
109              
110             my $req = $self->make_request(
111             target => 'CreateTable',
112             payload => \%payload,
113             );
114             $self->_process_request($req)
115             }
116              
117              
118             method describe_table(TableNameType :$TableName!) {
119             my $req = $self->make_request(
120             target => 'DescribeTable',
121             payload => _make_payload({
122             TableName => $TableName
123             }));
124             $self->_process_request($req,
125             sub {
126             my $content = shift;
127             decode_json($content)->{Table};
128             });
129             }
130              
131              
132             method delete_table(TableNameType :$TableName!) {
133             my $req = $self->make_request(
134             target => 'DeleteTable',
135             payload => _make_payload({ TableName => $TableName }));
136             $self->_process_request($req,
137             sub {
138             my $content = shift;
139             decode_json($content)->{TableDescription}
140             });
141             }
142              
143              
144             method wait_for_table_status(TableNameType :$TableName!,
145             Int :$WaitInterval = 2,
146             TableStatusType :$DesiredStatus = "ACTIVE") {
147             repeat {
148             my $retry = shift;
149            
150             $self->{implementation}->delay($retry ? $WaitInterval : 0)
151             ->then(sub {
152             $self->describe_table(TableName => $TableName)
153             });
154             } until => sub {
155             my $f = shift;
156             my $status = $f->get->{TableStatus};
157             $status eq $DesiredStatus
158             };
159             }
160              
161              
162             method each_table(CodeRef $code,
163             TableNameType :$ExclusiveStartTableName,
164             Int :$Limit where { $_ >= 0 && $_ <= 100}
165             ) {
166             my $finished = 0;
167             try_repeat {
168             my $req = $self->make_request(
169             target => 'ListTables',
170             payload => _make_payload({
171             ExclusiveStartTableName => $ExclusiveStartTableName,
172             Limit => $Limit
173             }));
174             $self->_process_request($req,
175             sub {
176             my $result = shift;
177             my $data = decode_json($result);
178             for my $tbl (@{$data->{TableNames}}) {
179             $code->($tbl);
180             }
181             $ExclusiveStartTableName = $data->{LastEvaluatedTableName};
182             if (!defined($ExclusiveStartTableName)) {
183             $finished = 1
184             }
185             });
186             } while => sub { !$finished };
187             }
188              
189              
190             method put_item (ConditionalOperatorType :$ConditionalOperator,
191             Str :$ConditionExpression,
192             ItemType :$Item!,
193             ExpectedType :$Expected,
194             ExpressionAttributeValuesType :$ExpressionAttributeValues,
195             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
196             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
197             ReturnValuesType :$ReturnValues,
198             TableNameType :$TableName!) {
199             my $req = $self->make_request(
200             target => 'PutItem',
201             payload => _make_payload({
202             'ConditionalOperator' => $ConditionalOperator,
203             'Expected' => $Expected,
204             'ConditionExpression' => $ConditionExpression,
205             'ExpressionAttributeValues' => $ExpressionAttributeValues,
206             'Item' => $Item,
207             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
208             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
209             'ReturnValues' => $ReturnValues,
210             'TableName' => $TableName
211             }));
212            
213             $self->_process_request($req, \&_decode_single_item_change_response);
214             }
215              
216              
217              
218             method update_item (AttributeUpdatesType :$AttributeUpdates,
219             Str :$ConditionExpression,
220             ConditionalOperatorType :$ConditionalOperator,
221             ExpectedType :$Expected,
222             KeyType :$Key!,
223             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
224             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
225             ReturnValuesType :$ReturnValues,
226             TableNameType :$TableName!,
227             ExpressionAttributeValuesType :$ExpressionAttributeValues,
228             ExpressionAttributeNamesType :$ExpressionAttributeNames,
229             Str :$UpdateExpression,
230             ) {
231             (defined($AttributeUpdates) xor defined($UpdateExpression)) || die("Either AttributeUpdates or UpdateExpression is required");
232            
233             my $req = $self->make_request(
234             target => 'UpdateItem',
235             payload => _make_payload({
236             'AttributeUpdates' => $AttributeUpdates,
237             'ConditionalOperator' => $ConditionalOperator,
238             'ConditionExpression' => $ConditionExpression,
239             'Expected' => $Expected,
240             'ExpressionAttributeNames' => $ExpressionAttributeNames,
241             'ExpressionAttributeValues' => $ExpressionAttributeValues,
242             'Key' => $Key,
243             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
244             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
245             'ReturnValues' => $ReturnValues,
246             'TableName' => $TableName,
247             'UpdateExpression' => $UpdateExpression,
248             }));
249             $self->_process_request($req, \&_decode_single_item_change_response);
250             }
251              
252              
253              
254              
255             method delete_item(ConditionalOperatorType :$ConditionalOperator,
256             ExpectedType :$Expected,
257             KeyType :$Key!,
258             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
259             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
260             ReturnValuesType :$ReturnValues,
261             TableNameType :$TableName!) {
262             my $req = $self->make_request(
263             target => 'DeleteItem',
264             payload => _make_payload({
265             'ConditionalOperator' => $ConditionalOperator,
266             'Expected' => $Expected,
267             'Key' => $Key,
268             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
269             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
270             'ReturnValues' => $ReturnValues,
271             'TableName' => $TableName
272             }));
273            
274             $self->_process_request($req, \&_decode_single_item_change_response);
275             }
276              
277              
278              
279              
280             method get_item(CodeRef $code,
281             AttributesToGetType :$AttributesToGet,
282             StringBooleanType :$ConsistentRead,
283             KeyType :$Key!,
284             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
285             TableNameType :$TableName!) {
286             my $req = $self->make_request(
287             target => 'GetItem',
288             payload => _make_payload({
289             'AttributesToGet' => $AttributesToGet,
290             'ConsistentRead' => $ConsistentRead,
291             'Key' => $Key,
292             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
293             'TableName' => $TableName
294             }));
295            
296             $self->_process_request(
297             $req,
298             sub {
299             my $result = shift;
300             my $data = decode_json($result);
301             $code->(_decode_item_attributes($data->{Item}));
302             });
303             }
304              
305              
306              
307              
308             method batch_write_item(BatchWriteRequestItemsType :$RequestItems! where { scalar(keys %$_) > 0 },
309             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
310             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
311             ) {
312             my @all_requests;
313              
314             foreach my $table_name (keys %$RequestItems) {
315             # Item.
316             my $table_items = $RequestItems->{$table_name};
317            
318             my $seen_type;
319             foreach my $item (@$table_items) {
320             my $r;
321             foreach my $t (['DeleteRequest', 'Key'], ['PutRequest', 'Item']) {
322             if (defined($item->{$t->[0]})) {
323             my $key = $item->{$t->[0]}->{$t->[1]};
324             foreach my $k (keys %$key) {
325             # Don't bother encoding undefined values, same behavior as put_item
326             if (defined($key->{$k})) {
327             $r->{$t->[0]}->{$t->[1]}->{$k} = { _encode_type_and_value($key->{$k}) };
328             }
329             }
330             }
331             }
332             if (defined($r)) {
333             push @all_requests, [$table_name, $r];
334             }
335             }
336             }
337              
338             try_repeat {
339             my %payload = (
340             ReturnConsumedCapacity => $ReturnConsumedCapacity,
341             ReturnItemCollectionMetrics => $ReturnItemCollectionMetrics
342             );
343              
344             # print "Pending requests: " . scalar(@all_requests) . "\n";
345             my @records = splice @all_requests, 0, List::Util::min(25, scalar(@all_requests));
346            
347              
348             foreach my $record (@records) {
349             push @{$payload{RequestItems}->{$record->[0]}}, $record->[1];
350             }
351            
352              
353             my $req = $self->make_request(
354             target => 'BatchWriteItem',
355             payload => \%payload,
356             );
357              
358             $self->_process_request(
359             $req,
360             sub {
361             my $result = shift;
362             my $data = decode_json($result);
363            
364             if (defined($data->{UnprocessedItems})) {
365             foreach my $table_name (keys %{$data->{UnprocessedItems}}) {
366             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedItems}->{$table_name}};
367             }
368             }
369             return $data;
370             })->on_fail(sub {
371             @all_requests = ();
372             });
373             } until => sub { scalar(@all_requests) == 0 };
374             }
375              
376              
377              
378              
379              
380             method batch_get_item(CodeRef $code,
381             BatchGetItemsType :$RequestItems!,
382             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
383             Int :$ResultLimit where { !defined($_) || $_ > 0 }
384             ) {
385             my @all_requests;
386             my $table_flags = {};
387              
388             foreach my $table_name (keys %$RequestItems) {
389             my $table_details = $RequestItems->{$table_name};
390              
391             # Store these flags for later.
392             map {
393             if (defined($table_details->{$_})) {
394             $table_flags->{$_} = $table_details->{$_};
395             }
396             } ('ConsistentRead', 'AttributesToGet');
397              
398             foreach my $item (@{$table_details->{Keys}}) {
399             my $r = {};
400             foreach my $key_field (keys %$item) {
401             $r->{$key_field} = { _encode_type_and_value($item->{$key_field}) };
402             }
403             push @all_requests, [$table_name, $r];
404             }
405             }
406              
407             my $records_seen =0;
408             try_repeat {
409              
410             my %payload = (
411             ReturnConsumedCapacity => $ReturnConsumedCapacity
412             );
413              
414             # Only try 100 requests at one time.
415             my @records = splice @all_requests, 0, List::Util::min(100, scalar(@all_requests));
416              
417              
418             foreach my $record (@records) {
419             push @{$payload{RequestItems}->{$record->[0]}->{Keys}}, $record->[1];
420             }
421            
422             foreach my $seen_table_name (grep { defined($table_flags->{$_}) } List::MoreUtils::uniq(map { $_->[0] } @records)) {
423             $payload{RequestItems}->{$seen_table_name} = {
424             %{$table_flags->{$seen_table_name}},
425             Keys => $payload{RequestItems}->{$seen_table_name}->{Keys}
426             };
427             }
428              
429             my $req = $self->make_request(
430             target => 'BatchGetItem',
431             payload => \%payload,
432             );
433              
434             $self->_process_request(
435             $req,
436             sub {
437             my $result = shift;
438             my $data = decode_json($result);
439             foreach my $table_name (keys %{$data->{Responses}}) {
440             foreach my $item (@{$data->{Responses}->{$table_name}}) {
441             $code->($table_name, _decode_item_attributes($item));
442             $records_seen += 1;
443             if (defined($ResultLimit) &&$records_seen >= $ResultLimit) {
444             @all_requests = ();
445             return $data;
446             }
447             }
448             }
449            
450             if (defined($data->{UnprocessedKeys})) {
451             foreach my $table_name (keys %{$data->{UnprocessedKeys}}) {
452             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedKeys}->{$table_name}->{Keys}};
453             }
454             }
455             return $data;
456             })->on_fail(sub {
457             @all_requests = ();
458             });
459             } until => sub { scalar(@all_requests) == 0 };
460             }
461              
462              
463             method query (CodeRef $code,
464             AttributesToGetType :$AttributesToGet,
465             StringBooleanType :$ConsistentRead,
466             ConditionalOperatorType :$ConditionalOperator,
467             KeyType :$ExclusiveStartKey,
468             TableNameType :$IndexName,
469             KeyConditionsType :$KeyConditions!,
470             Int :$Limit where { $_ >= 0 },
471             QueryFilterType :$QueryFilter,
472             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
473             StringBooleanType :$ScanIndexForward,
474             SelectType :$Select,
475             TableNameType :$TableName!,
476             Str :$FilterExpression,
477             ExpressionAttributeValuesType :$ExpressionAttributeValues,
478             ExpressionAttributeNamesType :$ExpressionAttributeNames,
479             ) {
480              
481             my $payload = _make_payload({
482             'AttributesToGet' => $AttributesToGet,
483             'ConsistentRead' => $ConsistentRead,
484             'ConditionalOperator' => $ConditionalOperator,
485             'ExclusiveStartKey' => $ExclusiveStartKey,
486             'ExpressionAttributeNames' => $ExpressionAttributeNames,
487             'ExpressionAttributeValues' => $ExpressionAttributeValues,
488             'FilterExpression' => $FilterExpression,
489             'IndexName' => $IndexName,
490             'QueryFilter' => $QueryFilter,
491             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
492             'ScanIndexForward' => $ScanIndexForward,
493             'Select' => $Select,
494             'TableName' => $TableName
495             });
496            
497              
498             foreach my $key_name (keys %$KeyConditions) {
499             my $key_details = $KeyConditions->{$key_name};
500             $payload->{KeyConditions}->{$key_name} = {
501             AttributeValueList => _encode_attribute_value_list($key_details->{AttributeValueList}, $key_details->{ComparisonOperator}),
502             ComparisonOperator => $key_details->{ComparisonOperator}
503             };
504             }
505              
506             $self->_scan_or_query_process('Query', $payload, $code, { ResultLimit => $Limit});
507             }
508              
509              
510              
511              
512              
513             method scan (CodeRef $code,
514             AttributesToGetType :$AttributesToGet,
515             KeyType :$ExclusiveStartKey,
516             Int :$Limit where { $_ >= 0},
517             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
518             ScanFilterType :$ScanFilter,
519             Int :$Segment where { $_ >= 0 },
520             SelectType :$Select,
521             TableNameType :$TableName!,
522             Int :$TotalSegments where { $_ >= 1 && $_ <= 1000000 },
523             Str :$FilterExpression,
524             ExpressionAttributeValuesType :$ExpressionAttributeValues,
525             ExpressionAttributeNamesType :$ExpressionAttributeNames,
526             ) {
527             my $payload = _make_payload({
528             'AttributesToGet' => $AttributesToGet,
529             'ExclusiveStartKey' => $ExclusiveStartKey,
530             'ExpressionAttributeValues' => $ExpressionAttributeValues,
531             'ExpressionAttributeNames' => $ExpressionAttributeNames,
532             'FilterExpression' => $FilterExpression,
533             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
534             'ScanFilter' => $ScanFilter,
535             'Segment' => $Segment,
536             'Select' => $Select,
537             'TableName' => $TableName,
538             'TotalSegments' => $TotalSegments
539             });
540              
541             $self->_scan_or_query_process('Scan', $payload, $code, { ResultLimit => $Limit});
542             }
543              
544              
545             method make_request(Str :$target,
546             HashRef :$payload,
547             ) {
548             my $api_version = '20120810';
549             my $host = $self->host;
550             my $req = HTTP::Request->new(
551             POST => (($self->ssl) ? 'https' : 'http') . '://' . $self->host . ($self->port ? (':' . $self->port) : '') . '/'
552             );
553             $req->header( host => $host );
554             # Amazon requires ISO-8601 basic format
555             my $now = time;
556             my $http_date = strftime('%Y%m%dT%H%M%SZ', gmtime($now));
557             my $date = strftime('%Y%m%d', gmtime($now));
558              
559             $req->protocol('HTTP/1.1');
560             $req->header( 'Date' => $http_date );
561             $req->header( 'x-amz-target', 'DynamoDB_'. $api_version. '.'. $target );
562             $req->header( 'content-type' => 'application/x-amz-json-1.0' );
563             $payload = encode_json($payload);
564             $req->content($payload);
565             $req->header( 'Content-Length' => length($payload));
566            
567             if ($self->{use_iam_role}) {
568             my $creds = VM::EC2::Security::CredentialCache->get();
569             defined($creds) || die("Unable to retrieve IAM role credentials");
570             $self->{access_key} = $creds->accessKeyId;
571             $self->{secret_key} = $creds->secretAccessKey;
572             $req->header('x-amz-security-token' => $creds->sessionToken);
573             }
574              
575             my $signer = AWS::Signature4->new(-access_key => $self->access_key,
576             -secret_key => $self->secret_key);
577            
578             $signer->sign($req);
579             return $req;
580             }
581              
582             method _request(HTTP::Request $req) {
583             $self->implementation->request($req);
584             }
585              
586              
587             # Since scan and query have the same type of responses share the processing.
588             method _scan_or_query_process (Str $target,
589             HashRef $payload,
590             CodeRef $code,
591             HashRef $args) {
592             my $finished = 0;
593             my $records_seen = 0;
594             my $repeat = try_repeat {
595            
596             # Since we're may be making more than one request in this repeat loop
597             # decrease our limit of results to scan in each call by the number
598             # of records remaining that the overall request wanted ot pull.
599             if (defined($args->{ResultLimit})) {
600             $payload->{Limit} = $args->{ResultLimit} - $records_seen;
601             }
602              
603             my $req = $self->make_request(
604             target => $target,
605             payload => $payload,
606             );
607            
608             $self->_process_request(
609             $req,
610             sub {
611             my $result = shift;
612             my $data = decode_json($result);
613            
614             for my $entry (@{$data->{Items}}) {
615             $code->(_decode_item_attributes($entry));
616             }
617              
618             $records_seen += scalar(@{$data->{Items}});
619             if ((defined($args->{ResultLimit}) && $records_seen >= $args->{ResultLimit})) {
620             $finished = 1;
621             }
622              
623             if (!defined($data->{LastEvaluatedKey})) {
624             $finished = 1;
625             } else {
626             if (!$finished) {
627             $payload->{ExclusiveStartKey} = $data->{LastEvaluatedKey};
628             }
629             }
630            
631             if (defined($data->{LastEvaluatedKey}) && $finished) {
632             $data->{LastEvaluatedKey} = _decode_item_attributes($data->{LastEvaluatedKey});
633             }
634              
635              
636             return $data;
637             })
638             ->on_fail(sub {
639             $finished = 1;
640             });
641             } until => sub { $finished };
642             }
643              
644              
645              
646             fun _encode_type_and_value(Any $v) {
647             my $type;
648              
649             if (ref($v)) {
650             # An array maps to a sequence
651             if (ref($v) eq 'ARRAY') {
652             # Any refs mean we're sending binary data
653            
654             # Start by guessing we have an array of numeric strings,
655             # but on the first value we encoutner that is either a reference
656             # or a variable that isn't an integer or numeric. Stop.
657             $type = 'NS';
658             foreach my $value (@$v) {
659             if (ref($value)) {
660             $type = 'BS';
661             last;
662             }
663             my $element_flags = B::svref_2object(\$value)->FLAGS;
664             if ($element_flags & (B::SVp_IOK | B::SVp_NOK)) {
665             next;
666             }
667             $type = 'SS';
668             last;
669             }
670             } else {
671             ref($v) eq 'SCALAR' || Carp::confess("Reference found but not a scalar");
672             $type = 'B';
673             }
674             } else {
675             my $flags = B::svref_2object(\$v)->FLAGS;
676             if ($flags & B::SVp_POK) {
677             $type = 'S';
678             } elsif ($flags & (B::SVp_IOK | B::SVp_NOK)) {
679             $type = 'N';
680             } else {
681             $type = 'S';
682             }
683             }
684            
685             if ($type eq 'N' || $type eq 'S') {
686             defined($v) || Carp::confess("Attempt to encode undefined value");
687             return ($type, "$v");
688             } elsif ($type eq 'B') {
689             return ($type, MIME::Base64::encode_base64(${$v}, ''));
690             } elsif ($type eq 'NS' || $type eq 'SS') {
691             return ($type, [map { "$_" } @$v]);
692             } elsif ($type eq 'BS') {
693             return ($type, [map { MIME::Base64::encode_base64(${$_}, '') } @$v]);
694             } else {
695             die("Unknown type for quoting and escaping: $type");
696             }
697             }
698              
699             fun _decode_type_and_value(Str $type, Any $value) {
700             if ($type eq 'S' || $type eq 'SS') {
701             return $value;
702             } elsif ($type eq 'N') {
703             return 0+$value;
704             } elsif ($type eq 'B') {
705             return MIME::Base64::decode_base64($value);
706             } elsif ($type eq 'BS') {
707             return [map { MIME::Base64::decode_base64($_) } @$value];
708             } elsif ($type eq 'NS') {
709             return [map { 0+$_} @$value];
710             } else {
711             die("Don't know how to decode type: $type");
712             }
713             }
714              
715              
716             fun _decode_item_attributes(Maybe[HashRef] $item) {
717             my $r;
718             foreach my $key (keys %$item) {
719             my $type = (keys %{$item->{$key}})[0];
720             my $value = $item->{$key}->{$type};
721             $r->{$key} = _decode_type_and_value($type, $item->{$key}->{$type});
722             }
723             return $r;
724             }
725              
726             method _process_request(HTTP::Request $req, CodeRef $done?) {
727             my $current_retry = 0;
728             my $do_retry = 1;
729             try_repeat {
730             $do_retry = 0;
731            
732             my $sleep_amount = 0;
733             if ($current_retry > 0) {
734             $sleep_amount = (2 ** $current_retry * 50)/1000;
735             }
736              
737             my $complete = sub {
738             $self->_request($req)->transform(
739             fail => sub {
740             my ($status, $resp, $req)= @_;
741             my $r;
742             if (defined($resp) && defined($resp->code)) {
743             if ($resp->code == 500) {
744             $do_retry = 1;
745             $current_retry++;
746             } elsif ($resp->code == 400) {
747             my $json = $resp->can('decoded_content')
748             ? $resp->decoded_content
749             : $resp->body; # Mojo
750             $r = decode_json($json);
751             if ($r->{__type} =~ /ProvisionedThroughputExceededException$/) {
752             # Need to sleep
753             $do_retry = 1;
754             $current_retry++;
755            
756            
757             } else {
758             # extract the type into a better prettyier name.
759             if ($r->{__type} =~ /^com\.amazonaws\.dynamodb\.v20120810#(.+)$/) {
760             $r->{type} = $1;
761             }
762             }
763             }
764             }
765            
766             if (defined($self->max_retries()) && $current_retry > $self->max_retries()) {
767             $do_retry = 0;
768             }
769              
770             if (!$do_retry) {
771             if ($self->debug_failures()) {
772             print "DynamoDB Failure: $status\n";
773             if (defined($resp)) {
774             print "response:\n";
775             print $resp->as_string() . "\n";
776             }
777             if (defined($req)) {
778             print "Request:\n";
779             print $req->as_string() . "\n";
780             }
781             }
782             return $r || $status;
783             }
784             },
785             done => $done);
786             };
787              
788             if ($sleep_amount > 0) {
789             $self->{implementation}->delay($sleep_amount)->then($complete);
790             } else {
791             $complete->();
792             }
793             } until => sub { !$do_retry };
794             }
795              
796             my $encode_key = sub {
797             my $source = shift;
798             my $r;
799             foreach my $k (keys %$source) {
800             my $v = $source->{$k};
801             # There is no sense in encoding undefined values or values that
802             # are the empty string.
803             if (defined($v) && $v ne '') {
804             # Reference $source->{$k} since the earlier test may cause
805             # the value to be stringified.
806             $r->{$k} = { _encode_type_and_value($source->{$k}) };
807             }
808             }
809             return $r;
810             };
811              
812              
813             fun _encode_attribute_value_list(Any $value_list, Str $compare_op) {
814             if ($compare_op =~ /^(EQ|NE|LE|LT|GE|GT|CONTAINS|NOT_CONTAINS|BEGINS_WITH)$/) {
815             defined($value_list) || Carp::confess("No defined value for comparison operator: $compare_op");
816             $value_list = [ { _encode_type_and_value($value_list) } ];
817             } elsif ($compare_op eq 'IN') {
818             if (!ref($value_list)) {
819             $value_list = [$value_list];
820             }
821             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
822             } elsif ($compare_op eq 'BETWEEN') {
823             ref($value_list) eq 'ARRAY' || Carp::confess("Use of BETWEEN comparison operator requires an array");
824             scalar(@$value_list) == 2 || Carp::confess("BETWEEN comparison operator requires two values");
825             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
826             }
827             return $value_list;
828             }
829              
830             my $encode_filter = sub {
831             my $source = shift;
832              
833             my $r;
834              
835             foreach my $field_name (keys %$source) {
836             my $f = $source->{$field_name};
837             my $compare_op = $f->{ComparisonOperator} // 'EQ';
838             $compare_op =~ /^(EQ|NE|LE|LT|GE|GT|NOT_NULL|NULL|CONTAINS|NOT_CONTAINS|BEGINS_WITH|IN|BETWEEN)$/
839             || Carp::confess("Unknown comparison operator specified: $compare_op");
840            
841             $r->{$field_name} = {
842             ComparisonOperator => $compare_op,
843             (defined($f->{AttributeValueList}) ? (AttributeValueList => _encode_attribute_value_list($f->{AttributeValueList}, $compare_op)) : ())
844             };
845             }
846             return $r;
847             };
848              
849             my $parameter_type_definitions = {
850             AttributesToGet => {},
851             AttributeUpdates => {
852             encode => sub {
853             my $source = shift;
854             my $r;
855             ref($source) eq 'HASH' || Carp::confess("Attribute updates is not a hash ref");
856             foreach my $k (keys %$source) {
857             my $op = $source->{$k};
858             ref($op) eq 'HASH' || Carp::confess("AttributeUpdate for field $k is not a hash ref:" . Data::Dumper->Dump([$op]));
859             $r->{$k} = {
860             (defined($op->{Action}) ? (Action => $op->{Action}) : ()),
861             (defined($op->{Value}) ? (Value => { _encode_type_and_value($op->{Value}) }) : ()),
862             };
863             }
864             return $r;
865             }
866             },
867             # should be a boolean
868             ConsistentRead => {},
869             ConditionalOperator => {},
870             ConditionExpression => {},
871             ExclusiveStartKey => {
872             encode => $encode_key,
873             },
874             ExclusiveStartTableName => {},
875             ExpressionAttributeNames => {},
876             ExpressionAttributeValues => {
877             encode => sub {
878             my $source = shift;
879             my $r;
880             foreach my $key (grep { defined($source->{$_}) } keys %$source) {
881             $r->{$key} = { _encode_type_and_value($source->{$key}) };
882             }
883             return $r;
884             }
885             },
886             Expected => {
887             encode => sub {
888             my $source = shift;
889             my $r;
890             foreach my $key (keys %$source) {
891             my $info = $source->{$key};
892              
893             if (defined($info->{AttributeValueList}) ) {
894             $r->{$key}->{AttributeValueList} = _encode_attribute_value_list($info->{AttributeValueList}, $info->{ComparisonOperator});
895             }
896              
897             if (defined($info->{Exists})) {
898             $r->{$key}->{Exists} = $info->{Exists};
899             }
900              
901             if (defined($info->{ComparisonOperator})) {
902             $r->{$key}->{ComparisonOperator} = $info->{ComparisonOperator};
903             }
904            
905             if (defined($info->{Value})) {
906             $r->{$key}->{Value} = { _encode_type_and_value($info->{Value}) };
907             }
908             }
909             return $r;
910             },
911             },
912             FilterExpression => {},
913             IndexName => {},
914             Item => {
915             encode => $encode_key,
916             },
917             Key => {
918             encode => $encode_key,
919             },
920             Limit => {
921             type_check => 'integer',
922             },
923             QueryFilter => {
924             encode => $encode_filter,
925             },
926             ReturnConsumedCapacity => {},
927             ReturnItemCollectionMetrics => {},
928             ReturnValues => {},
929             ScanIndexForward => {},
930             ScanFilter => {
931             encode => $encode_filter,
932             },
933             Segment => {
934             type_check => 'integer',
935             },
936             Select => {},
937             TableName => {},
938             TotalSegments => {
939             type_check => 'integer',
940             },
941             UpdateExpression => {},
942             };
943              
944              
945              
946              
947             # Build a parameter hash from all of the standardized parameters.
948             sub _make_payload {
949             my $args = shift;
950             my @field_names = @_;
951              
952             if (scalar(@field_names) == 0) {
953             @field_names = keys %$args;
954             }
955              
956             my %r;
957             foreach my $field_name (@field_names) {
958             my $value = $args->{$field_name};
959             if (!defined($value)) {
960             next;
961             }
962             my $def = $parameter_type_definitions->{$field_name} || Carp::confess("Unknown parameter type: $field_name");
963             if (defined($value)) {
964             if ($def->{type_check} && $def->{type_check} eq 'integer') {
965             $value =~ /^\d+$/ || Carp::confess("$field_name is specified to be an integer but the value is not an integer: $value");
966             $value = int($value);
967             }
968             }
969              
970             if (defined($def->{encode})) {
971             $value = $def->{encode}->($value);
972             }
973              
974             if (defined($value)) {
975             $r{$field_name} = $value;
976             }
977             }
978             return \%r;
979             }
980              
981             fun _decode_single_item_change_response(Str $response) {
982             my $r = decode_json($response);
983             if (defined($r->{Attributes})) {
984             $r->{Attributes} = _decode_item_attributes($r->{Attributes});
985             }
986            
987             if (defined($r->{ItemCollectionMetrics})) {
988             foreach my $key (keys %{$r->{ItemCollectionMetrics}}) {
989             foreach my $key_part (keys %{$r->{ItemCollectionMetrics}->{$key}}) {
990             $r->{ItemCollectionMetrics}->{$key}->{$key_part} = _decode_item_attributes($r->{ItemCollectionMetrics}->{$key})
991             }
992             }
993             }
994             return $r;
995             }
996              
997              
998             fun _create_key_schema(ArrayRef $source, HashRef $known_fields) {
999             defined($source) || die("No source passed to create_key_schema");
1000             defined($known_fields) || die("No known fields passed to create_key_schmea");
1001             my @r;
1002             foreach my $field_name (@$source) {
1003             defined($known_fields->{$field_name}) || Carp::confess("Unknown field specified '$field_name' in schema, must be defined in fields. schema:" . Data::Dumper->Dump([$source]));
1004             push @r, {
1005             AttributeName => $field_name,
1006             KeyType => (scalar(@r) ? 'RANGE' : 'HASH')
1007             };
1008             }
1009             return \@r;
1010             };
1011              
1012              
1013              
1014             1;
1015              
1016             __END__
1017              
1018             =pod
1019              
1020             =encoding UTF-8
1021              
1022             =head1 NAME
1023              
1024             Amazon::DynamoDB::20120810
1025              
1026             =head1 VERSION
1027              
1028             version 0.35
1029              
1030             =head1 DESCRIPTION
1031              
1032             =head2 new
1033              
1034             Instantiates the API object.
1035              
1036             Expects the following named parameters:
1037              
1038             =over 4
1039              
1040             =item * implementation - the object which provides a Future-returning C<request> method,
1041             see L<Amazon::DynamoDB::NaHTTP> for example.
1042              
1043             =item * host - the host (IP or hostname) to communicate with
1044              
1045             =item * port - the port to use for HTTP(S) requests
1046              
1047             =item * ssl - true for HTTPS, false for HTTP
1048              
1049             =item * algorithm - which signing algorithm to use, default AWS4-HMAC-SHA256
1050              
1051             =item * scope - the scope for requests, typically C<region/host/aws4_request>
1052              
1053             =item * access_key - the access key for signing requests
1054              
1055             =item * secret_key - the secret key for signing requests
1056              
1057             =item * debug_failures - print errors if they occur
1058              
1059             =item * max_retries - maximum number of retries for a request
1060              
1061             =back
1062              
1063             =head2 create_table
1064              
1065             Creates a new table. It may take some time before the table is marked
1066             as active - use L</wait_for_table_status> to poll until the status changes.
1067              
1068             Amazon Documentation:
1069              
1070             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html>
1071              
1072             $ddb->create_table(
1073             TableName => $table_name,
1074             ReadCapacityUnits => 2,
1075             WriteCapacityUnits => 2,
1076             AttributeDefinitions => {
1077             user_id => 'N',
1078             date => 'N',
1079             },
1080             KeySchema => ['user_id', 'date'],
1081             LocalSecondaryIndexes => [
1082             {
1083             IndexName => 'UserDateIndex',
1084             KeySchema => ['user_id', 'date'],
1085             Projection => {
1086             ProjectionType => 'KEYS_ONLY',
1087             },
1088             ProvisionedThroughput => {
1089             ReadCapacityUnits => 2,
1090             WriteCapacityUnits => 2,
1091             }
1092             }
1093             ]
1094             );
1095              
1096             =back
1097              
1098             =head2 describe_table
1099              
1100             Describes the given table.
1101              
1102             Amazon Documentation:
1103              
1104             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeTable.html>
1105              
1106             $ddb->describe_table(TableName => $table_name);
1107              
1108             =head2 delete_table
1109              
1110             Delete a table.
1111              
1112             Amazon Documentation:
1113              
1114             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteTable.html>
1115              
1116             $ddb->delete_table(TableName => $table_name)
1117              
1118             =head2 wait_for_table_status
1119              
1120             Waits for the given table to be marked as active.
1121              
1122             =over 4
1123              
1124             =item * TableName - the table name
1125              
1126             =item * WaitInterval - default wait interval in seconds.
1127              
1128             =item * DesiredStatus - status to expect before completing. Defaults to ACTIVE
1129              
1130             =back
1131              
1132             $ddb->wait_for_table_status(TableName => $table_name);
1133              
1134             =head2 each_table
1135              
1136             Run code for all current tables.
1137              
1138             Takes a coderef as the first parameter, will call this for each table found.
1139              
1140             Amazon Documentation:
1141              
1142             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ListTables.html>
1143              
1144             my @all_tables;
1145             $ddb->each_table(
1146             sub {
1147             my $table_name =shift;
1148             push @all_tables, $table_name;
1149             });
1150              
1151             =head2 put_item
1152              
1153             Writes a single item to the table.
1154              
1155             Amazon Documentation:
1156              
1157             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html>
1158              
1159             $ddb->put_item(
1160             TableName => $table_name,
1161             Item => {
1162             name => 'Test Name'
1163             },
1164             ReturnValues => 'ALL_OLD');
1165              
1166             =head2 update_item
1167              
1168             Updates a single item in the table.
1169              
1170             Amazon Documentation:
1171              
1172             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html>
1173              
1174             $ddb->update_item(
1175             TableName => $table_name,
1176             Key => {
1177             user_id => 2
1178             },
1179             AttributeUpdates => {
1180             name => {
1181             Action => 'PUT',
1182             Value => "Rusty Conover-3",
1183             },
1184             favorite_color => {
1185             Action => 'DELETE'
1186             },
1187             test_numbers => {
1188             Action => 'DELETE',
1189             Value => [500]
1190             },
1191             added_number => {
1192             Action => 'ADD',
1193             Value => 5,
1194             },
1195             subtracted_number => {
1196             Action => 'ADD',
1197             Value => -5,
1198             },
1199             });
1200              
1201             =head2 delete_item
1202              
1203             Deletes a single item from the table.
1204              
1205             Amazon Documentation:
1206              
1207             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html>
1208              
1209             $ddb->delete_item(
1210             TableName => $table_name,
1211             Key => {
1212             user_id => 5
1213             });
1214              
1215             =head2 get_item
1216              
1217             Retrieve an items from one tables.
1218              
1219             Amazon Documentation:
1220              
1221             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html>
1222              
1223             my $found_item;
1224             my $get = $ddb->get_item(
1225             sub {
1226             $found_item = shift;
1227             },
1228             TableName => $table_name,
1229             Key => {
1230             user_id => 6
1231             });
1232              
1233             =head2 batch_write_item
1234              
1235             Put or delete a collection of items.
1236              
1237             Has no restriction on the number of items able to be processed at one time.
1238              
1239             Amazon Documentation:
1240              
1241             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html>
1242              
1243             $ddb->batch_write_item(
1244             RequestItems => {
1245             books => [
1246             {
1247             DeleteRequest => {
1248             book_id => 3000,
1249             }
1250             },
1251             ],
1252             users => [
1253             {
1254             PutRequest => {
1255             user_id => 3000,
1256             name => "Test batch write",
1257             }
1258             },
1259             {
1260             PutRequest => {
1261             user_id => 3001,
1262             name => "Test batch write",
1263             }
1264             }
1265             ]
1266             });
1267              
1268             =head2 batch_get_item
1269              
1270             Retrieve a batch of items from one or more tables.
1271              
1272             Takes a coderef which will be called for each found item.
1273              
1274             Amazon Documentation:
1275              
1276             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html>
1277              
1278             Additional Parameters:
1279              
1280             =over
1281              
1282             =item * ResultLimit - limit on the total number of results to return.
1283              
1284             =back
1285              
1286             $ddb->batch_get_item(
1287             sub {
1288             my ($table, $item) = @_;
1289             },
1290             RequestItems => {
1291             $table_name => {
1292             ConsistentRead => 'true',
1293             AttributesToGet => ['user_id', 'name'],
1294             Keys => [
1295             {
1296             user_id => 1,
1297             },
1298             ],
1299             }
1300             })
1301              
1302             =head2 scan
1303              
1304             Scan a table for values with an optional filter expression.
1305              
1306             Amazon Documentation:
1307              
1308             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html>
1309              
1310             Additional parameters:
1311              
1312             =back
1313              
1314             $ddb->scan(
1315             sub {
1316             my $item = shift;
1317             push @found_items, $item;
1318             },
1319             TableName => $table_name,
1320             ScanFilter => {
1321             user_id => {
1322             ComparisonOperator => 'NOT_NULL',
1323             }
1324             });
1325              
1326             =head1 NAME
1327              
1328             Amazon::DynamoDB::20120810 - interact with DynamoDB using API version 20120810
1329              
1330             =head1 METHODS - Internal
1331              
1332             The following methods are intended for internal use and are documented
1333             purely for completeness - for normal operations see L</METHODS> instead.
1334              
1335             =head2 make_request
1336              
1337             Generates an L<HTTP::Request>.
1338              
1339             =head1 FUNCTIONS - Internal
1340              
1341             =head2 _encode_type_and_value
1342              
1343             Returns an appropriate type (N, S, SS etc.) and stringified/encoded value for the given
1344             value.
1345              
1346             DynamoDB only uses strings even if there is a Numeric value specified,
1347             so while the type will be expressed as a Number the value will be
1348             stringified.
1349              
1350             C<http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataFormat.html>
1351              
1352             =head1 AUTHORS
1353              
1354             =over 4
1355              
1356             =item *
1357              
1358             Rusty Conover <rusty@luckydinosaur.com>
1359              
1360             =item *
1361              
1362             Tom Molesworth <cpan@entitymodel.com>
1363              
1364             =back
1365              
1366             =head1 COPYRIGHT AND LICENSE
1367              
1368             This software is copyright (c) 2013 by Tom Molesworth, copyright (c) 2014 Lucky Dinosaur LLC. L<http://www.luckydinosaur.com>.
1369              
1370             This is free software; you can redistribute it and/or modify it under
1371             the same terms as the Perl 5 programming language system itself.
1372              
1373             =cut