File Coverage

lib/Amazon/DynamoDB/20120810.pm
Criterion Covered Total %
statement 43 45 95.5
branch n/a
condition n/a
subroutine 15 15 100.0
pod n/a
total 58 60 96.6


line stmt bran cond sub pod time code
1             package Amazon::DynamoDB::20120810;
2             $Amazon::DynamoDB::20120810::VERSION = '0.34';
3 11     11   47 use strict;
  11         20  
  11         318  
4 11     11   39 use warnings;
  11         14  
  11         250  
5              
6              
7 11     11   6578 use Future;
  11         74286  
  11         368  
8 11     11   6027 use Future::Utils qw(repeat try_repeat);
  11         19066  
  11         676  
9 11     11   5286 use POSIX qw(strftime);
  11         53794  
  11         54  
10 11     11   14730 use JSON::MaybeXS qw(decode_json encode_json);
  11         44702  
  11         627  
11 11     11   5184 use MIME::Base64;
  11         6015  
  11         666  
12 11     11   60 use List::Util;
  11         11  
  11         575  
13 11     11   4710 use List::MoreUtils;
  11         73388  
  11         74  
14 11     11   3512 use B qw(svref_2object);
  11         18  
  11         636  
15 11     11   5078 use HTTP::Request;
  11         183631  
  11         353  
16 11     11   4717 use Kavorka;
  11         103021  
  11         101  
17 11     11   1522130 use Amazon::DynamoDB::Types;
  11         37  
  11         115  
18 11     11   12535 use Type::Registry;
  11         78291  
  11         78  
19 11     11   15465 use VM::EC2::Security::CredentialCache;
  0            
  0            
20             use AWS::Signature4;
21            
22             BEGIN {
23             my $reg = "Type::Registry"->for_me;
24             $reg->add_types(-Standard);
25             $reg->add_types("Amazon::DynamoDB::Types");
26             };
27              
28              
29              
30             sub new {
31             my $class = shift;
32             bless { @_ }, $class
33             }
34              
35             sub implementation { shift->{implementation} }
36             sub host { shift->{host} }
37             sub port { shift->{port} }
38             sub ssl { shift->{ssl} }
39             sub algorithm { 'AWS4-HMAC-SHA256' }
40             sub scope { shift->{scope} }
41             sub access_key { shift->{access_key} }
42             sub secret_key { shift->{secret_key} }
43             sub debug_failures { shift->{debug} }
44              
45             sub max_retries { shift->{max_retries} }
46              
47              
48              
49              
50              
51             method create_table(TableNameType :$TableName!,
52             Int :$ReadCapacityUnits = 2,
53             Int :$WriteCapacityUnits = 2,
54             AttributeDefinitionsType :$AttributeDefinitions,
55             KeySchemaType :$KeySchema!,
56             ArrayRef[GlobalSecondaryIndexType] :$GlobalSecondaryIndexes where { scalar(@$_) <= 5 },
57             ArrayRef[LocalSecondaryIndexType] :$LocalSecondaryIndexes
58             ) {
59             my %payload = (
60             TableName => $TableName,
61             ProvisionedThroughput => {
62             ReadCapacityUnits => int($ReadCapacityUnits),
63             WriteCapacityUnits => int($WriteCapacityUnits),
64             }
65             );
66              
67             if (defined($AttributeDefinitions)) {
68             foreach my $field_name (keys %$AttributeDefinitions) {
69             my $type = $AttributeDefinitions->{$field_name};
70              
71             push @{$payload{AttributeDefinitions}}, {
72             AttributeName => $field_name,
73             AttributeType => $type // 'S',
74             }
75             }
76             }
77              
78             $payload{KeySchema} = _create_key_schema($KeySchema, $AttributeDefinitions);
79              
80             foreach my $index_record (['GlobalSecondaryIndexes', $GlobalSecondaryIndexes],
81             ['LocalSecondaryIndexes', $LocalSecondaryIndexes]) {
82             my $index_type = $index_record->[0];
83             my $index = $index_record->[1];
84            
85             if (defined($index)) {
86             foreach my $i (@$index) {
87             my $r = {
88             IndexName => $i->{IndexName},
89             (($index_type eq 'GlobalSecondaryIndexes') ?
90             (ProvisionedThroughput => {
91             ReadCapacityUnits => int($i->{ProvisionedThroughput}->{ReadCapacityUnits} // 1),
92             WriteCapacityUnits => int($i->{ProvisionedThroughput}->{WriteCapacityUnits} // 1),
93             }) : ()),
94             KeySchema => _create_key_schema($i->{KeySchema}, $AttributeDefinitions),
95             };
96              
97             my $type = $i->{Projection}->{ProjectionType};
98             $r->{Projection}->{ProjectionType} = $type;
99            
100             if (defined($i->{Projection}->{NonKeyAttributes})) {
101             my $attrs = $i->{Projection}->{NonKeyAttributes};
102             # Can't validate these attribute names since they aren't part of the key.
103             $r->{Projection}->{NonKeyAttributes} = $attrs;
104             }
105             push @{$payload{$index_type}}, $r;
106             }
107             }
108             }
109              
110             my $req = $self->make_request(
111             target => 'CreateTable',
112             payload => \%payload,
113             );
114             $self->_process_request($req)
115             }
116              
117              
118             method describe_table(TableNameType :$TableName!) {
119             my $req = $self->make_request(
120             target => 'DescribeTable',
121             payload => _make_payload({
122             TableName => $TableName
123             }));
124             $self->_process_request($req,
125             sub {
126             my $content = shift;
127             decode_json($content)->{Table};
128             });
129             }
130              
131              
132             method delete_table(TableNameType :$TableName!) {
133             my $req = $self->make_request(
134             target => 'DeleteTable',
135             payload => _make_payload({ TableName => $TableName }));
136             $self->_process_request($req,
137             sub {
138             my $content = shift;
139             decode_json($content)->{TableDescription}
140             });
141             }
142              
143              
144             method wait_for_table_status(TableNameType :$TableName!,
145             Int :$WaitInterval = 2,
146             TableStatusType :$DesiredStatus = "ACTIVE") {
147             repeat {
148             my $retry = shift;
149            
150             $self->{implementation}->delay($retry ? $WaitInterval : 0)
151             ->then(sub {
152             $self->describe_table(TableName => $TableName)
153             });
154             } until => sub {
155             my $f = shift;
156             my $status = $f->get->{TableStatus};
157             $status eq $DesiredStatus
158             };
159             }
160              
161              
162             method each_table(CodeRef $code,
163             TableNameType :$ExclusiveStartTableName,
164             Int :$Limit where { $_ >= 0 && $_ <= 100}
165             ) {
166             my $finished = 0;
167             try_repeat {
168             my $req = $self->make_request(
169             target => 'ListTables',
170             payload => _make_payload({
171             ExclusiveStartTableName => $ExclusiveStartTableName,
172             Limit => $Limit
173             }));
174             $self->_process_request($req,
175             sub {
176             my $result = shift;
177             my $data = decode_json($result);
178             for my $tbl (@{$data->{TableNames}}) {
179             $code->($tbl);
180             }
181             $ExclusiveStartTableName = $data->{LastEvaluatedTableName};
182             if (!defined($ExclusiveStartTableName)) {
183             $finished = 1
184             }
185             });
186             } while => sub { !$finished };
187             }
188              
189              
190             method put_item (ConditionalOperatorType :$ConditionalOperator,
191             Str :$ConditionExpression,
192             ItemType :$Item!,
193             ExpectedType :$Expected,
194             ExpressionAttributeValuesType :$ExpressionAttributeValues,
195             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
196             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
197             ReturnValuesType :$ReturnValues,
198             TableNameType :$TableName!) {
199             my $req = $self->make_request(
200             target => 'PutItem',
201             payload => _make_payload({
202             'ConditionalOperator' => $ConditionalOperator,
203             'Expected' => $Expected,
204             'ConditionExpression' => $ConditionExpression,
205             'ExpressionAttributeValues' => $ExpressionAttributeValues,
206             'Item' => $Item,
207             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
208             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
209             'ReturnValues' => $ReturnValues,
210             'TableName' => $TableName
211             }));
212            
213             $self->_process_request($req, \&_decode_single_item_change_response);
214             }
215              
216              
217              
218             method update_item (AttributeUpdatesType :$AttributeUpdates,
219             Str :$ConditionExpression,
220             ConditionalOperatorType :$ConditionalOperator,
221             ExpectedType :$Expected,
222             KeyType :$Key!,
223             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
224             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
225             ReturnValuesType :$ReturnValues,
226             TableNameType :$TableName!,
227             ExpressionAttributeValuesType :$ExpressionAttributeValues,
228             ExpressionAttributeNamesType :$ExpressionAttributeNames,
229             Str :$UpdateExpression,
230             ) {
231             (defined($AttributeUpdates) xor defined($UpdateExpression)) || die("Either AttributeUpdates or UpdateExpression is required");
232            
233             my $req = $self->make_request(
234             target => 'UpdateItem',
235             payload => _make_payload({
236             'AttributeUpdates' => $AttributeUpdates,
237             'ConditionalOperator' => $ConditionalOperator,
238             'ConditionExpression' => $ConditionExpression,
239             'Expected' => $Expected,
240             'ExpressionAttributeNames' => $ExpressionAttributeNames,
241             'ExpressionAttributeValues' => $ExpressionAttributeValues,
242             'Key' => $Key,
243             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
244             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
245             'ReturnValues' => $ReturnValues,
246             'TableName' => $TableName,
247             'UpdateExpression' => $UpdateExpression,
248             }));
249             $self->_process_request($req, \&_decode_single_item_change_response);
250             }
251              
252              
253              
254              
255             method delete_item(ConditionalOperatorType :$ConditionalOperator,
256             ExpectedType :$Expected,
257             KeyType :$Key!,
258             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
259             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
260             ReturnValuesType :$ReturnValues,
261             TableNameType :$TableName!) {
262             my $req = $self->make_request(
263             target => 'DeleteItem',
264             payload => _make_payload({
265             'ConditionalOperator' => $ConditionalOperator,
266             'Expected' => $Expected,
267             'Key' => $Key,
268             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
269             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
270             'ReturnValues' => $ReturnValues,
271             'TableName' => $TableName
272             }));
273            
274             $self->_process_request($req, \&_decode_single_item_change_response);
275             }
276              
277              
278              
279              
280             method get_item(CodeRef $code,
281             AttributesToGetType :$AttributesToGet,
282             StringBooleanType :$ConsistentRead,
283             KeyType :$Key!,
284             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
285             TableNameType :$TableName!) {
286             my $req = $self->make_request(
287             target => 'GetItem',
288             payload => _make_payload({
289             'AttributesToGet' => $AttributesToGet,
290             'ConsistentRead' => $ConsistentRead,
291             'Key' => $Key,
292             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
293             'TableName' => $TableName
294             }));
295            
296             $self->_process_request(
297             $req,
298             sub {
299             my $result = shift;
300             my $data = decode_json($result);
301             $code->(_decode_item_attributes($data->{Item}));
302             });
303             }
304              
305              
306              
307              
308             method batch_write_item(BatchWriteRequestItemsType :$RequestItems! where { scalar(keys %$_) > 0 },
309             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
310             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
311             ) {
312             my @all_requests;
313              
314             foreach my $table_name (keys %$RequestItems) {
315             # Item.
316             my $table_items = $RequestItems->{$table_name};
317            
318             my $seen_type;
319             foreach my $item (@$table_items) {
320             my $r;
321             foreach my $t (['DeleteRequest', 'Key'], ['PutRequest', 'Item']) {
322             if (defined($item->{$t->[0]})) {
323             my $key = $item->{$t->[0]}->{$t->[1]};
324             foreach my $k (keys %$key) {
325             # Don't bother encoding undefined values, same behavior as put_item
326             if (defined($key->{$k})) {
327             $r->{$t->[0]}->{$t->[1]}->{$k} = { _encode_type_and_value($key->{$k}) };
328             }
329             }
330             }
331             }
332             if (defined($r)) {
333             push @all_requests, [$table_name, $r];
334             }
335             }
336             }
337              
338             try_repeat {
339             my %payload = (
340             ReturnConsumedCapacity => $ReturnConsumedCapacity,
341             ReturnItemCollectionMetrics => $ReturnItemCollectionMetrics
342             );
343              
344             # print "Pending requests: " . scalar(@all_requests) . "\n";
345             my @records = splice @all_requests, 0, List::Util::min(25, scalar(@all_requests));
346            
347              
348             foreach my $record (@records) {
349             push @{$payload{RequestItems}->{$record->[0]}}, $record->[1];
350             }
351            
352              
353             my $req = $self->make_request(
354             target => 'BatchWriteItem',
355             payload => \%payload,
356             );
357              
358             $self->_process_request(
359             $req,
360             sub {
361             my $result = shift;
362             my $data = decode_json($result);
363            
364             if (defined($data->{UnprocessedItems})) {
365             foreach my $table_name (keys %{$data->{UnprocessedItems}}) {
366             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedItems}->{$table_name}};
367             }
368             }
369             return $data;
370             })->on_fail(sub {
371             @all_requests = ();
372             });
373             } until => sub { scalar(@all_requests) == 0 };
374             }
375              
376              
377              
378              
379              
380             method batch_get_item(CodeRef $code,
381             BatchGetItemsType :$RequestItems!,
382             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
383             Int :$ResultLimit where { !defined($_) || $_ > 0 }
384             ) {
385             my @all_requests;
386             my $table_flags = {};
387              
388             foreach my $table_name (keys %$RequestItems) {
389             my $table_details = $RequestItems->{$table_name};
390              
391             # Store these flags for later.
392             map {
393             if (defined($table_details->{$_})) {
394             $table_flags->{$_} = $table_details->{$_};
395             }
396             } ('ConsistentRead', 'AttributesToGet');
397              
398             foreach my $item (@{$table_details->{Keys}}) {
399             my $r = {};
400             foreach my $key_field (keys %$item) {
401             $r->{$key_field} = { _encode_type_and_value($item->{$key_field}) };
402             }
403             push @all_requests, [$table_name, $r];
404             }
405             }
406              
407             my $records_seen =0;
408             try_repeat {
409              
410             my %payload = (
411             ReturnConsumedCapacity => $ReturnConsumedCapacity
412             );
413              
414             # Only try 100 requests at one time.
415             my @records = splice @all_requests, 0, List::Util::min(100, scalar(@all_requests));
416              
417              
418             foreach my $record (@records) {
419             push @{$payload{RequestItems}->{$record->[0]}->{Keys}}, $record->[1];
420             }
421            
422             foreach my $seen_table_name (grep { defined($table_flags->{$_}) } List::MoreUtils::uniq(map { $_->[0] } @records)) {
423             $payload{RequestItems}->{$seen_table_name} = {
424             %{$table_flags->{$seen_table_name}},
425             Keys => $payload{RequestItems}->{$seen_table_name}->{Keys}
426             };
427             }
428              
429             my $req = $self->make_request(
430             target => 'BatchGetItem',
431             payload => \%payload,
432             );
433              
434             $self->_process_request(
435             $req,
436             sub {
437             my $result = shift;
438             my $data = decode_json($result);
439             foreach my $table_name (keys %{$data->{Responses}}) {
440             foreach my $item (@{$data->{Responses}->{$table_name}}) {
441             $code->($table_name, _decode_item_attributes($item));
442             $records_seen += 1;
443             if (defined($ResultLimit) &&$records_seen >= $ResultLimit) {
444             @all_requests = ();
445             return $data;
446             }
447             }
448             }
449            
450             if (defined($data->{UnprocessedKeys})) {
451             foreach my $table_name (keys %{$data->{UnprocessedKeys}}) {
452             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedKeys}->{$table_name}->{Keys}};
453             }
454             }
455             return $data;
456             })->on_fail(sub {
457             @all_requests = ();
458             });
459             } until => sub { scalar(@all_requests) == 0 };
460             }
461              
462              
463             method query (CodeRef $code,
464             AttributesToGetType :$AttributesToGet,
465             StringBooleanType :$ConsistentRead,
466             ConditionalOperatorType :$ConditionalOperator,
467             KeyType :$ExclusiveStartKey,
468             TableNameType :$IndexName,
469             KeyConditionsType :$KeyConditions!,
470             Int :$Limit where { $_ >= 0 },
471             QueryFilterType :$QueryFilter,
472             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
473             StringBooleanType :$ScanIndexForward,
474             SelectType :$Select,
475             TableNameType :$TableName!,
476             Str :$FilterExpression,
477             ExpressionAttributeValuesType :$ExpressionAttributeValues,
478             ExpressionAttributeNamesType :$ExpressionAttributeNames,
479             ) {
480              
481             my $payload = _make_payload({
482             'AttributesToGet' => $AttributesToGet,
483             'ConsistentRead' => $ConsistentRead,
484             'ConditionalOperator' => $ConditionalOperator,
485             'ExclusiveStartKey' => $ExclusiveStartKey,
486             'ExpressionAttributeNames' => $ExpressionAttributeNames,
487             'ExpressionAttributeValues' => $ExpressionAttributeValues,
488             'FilterExpression' => $FilterExpression,
489             'IndexName' => $IndexName,
490             'QueryFilter' => $QueryFilter,
491             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
492             'ScanIndexForward' => $ScanIndexForward,
493             'Select' => $Select,
494             'TableName' => $TableName
495             });
496            
497              
498             foreach my $key_name (keys %$KeyConditions) {
499             my $key_details = $KeyConditions->{$key_name};
500             $payload->{KeyConditions}->{$key_name} = {
501             AttributeValueList => _encode_attribute_value_list($key_details->{AttributeValueList}, $key_details->{ComparisonOperator}),
502             ComparisonOperator => $key_details->{ComparisonOperator}
503             };
504             }
505              
506             $self->_scan_or_query_process('Query', $payload, $code, { ResultLimit => $Limit});
507             }
508              
509              
510              
511              
512              
513             method scan (CodeRef $code,
514             AttributesToGetType :$AttributesToGet,
515             KeyType :$ExclusiveStartKey,
516             Int :$Limit where { $_ >= 0},
517             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
518             ScanFilterType :$ScanFilter,
519             Int :$Segment where { $_ >= 0 },
520             SelectType :$Select,
521             TableNameType :$TableName!,
522             Int :$TotalSegments where { $_ >= 1 && $_ <= 1000000 },
523             Str :$FilterExpression,
524             ExpressionAttributeValuesType :$ExpressionAttributeValues,
525             ExpressionAttributeNamesType :$ExpressionAttributeNames,
526             ) {
527             my $payload = _make_payload({
528             'AttributesToGet' => $AttributesToGet,
529             'ExclusiveStartKey' => $ExclusiveStartKey,
530             'ExpressionAttributeValues' => $ExpressionAttributeValues,
531             'ExpressionAttributeNames' => $ExpressionAttributeNames,
532             'FilterExpression' => $FilterExpression,
533             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
534             'ScanFilter' => $ScanFilter,
535             'Segment' => $Segment,
536             'Select' => $Select,
537             'TableName' => $TableName,
538             'TotalSegments' => $TotalSegments
539             });
540              
541             $self->_scan_or_query_process('Scan', $payload, $code, { ResultLimit => $Limit});
542             }
543              
544              
545             method make_request(Str :$target,
546             HashRef :$payload,
547             ) {
548             my $api_version = '20120810';
549             my $host = $self->host;
550             my $req = HTTP::Request->new(
551             POST => (($self->ssl) ? 'https' : 'http') . '://' . $self->host . ($self->port ? (':' . $self->port) : '') . '/'
552             );
553             $req->header( host => $host );
554             # Amazon requires ISO-8601 basic format
555             my $now = time;
556             my $http_date = strftime('%Y%m%dT%H%M%SZ', gmtime($now));
557             my $date = strftime('%Y%m%d', gmtime($now));
558              
559             $req->protocol('HTTP/1.1');
560             $req->header( 'Date' => $http_date );
561             $req->header( 'x-amz-target', 'DynamoDB_'. $api_version. '.'. $target );
562             $req->header( 'content-type' => 'application/x-amz-json-1.0' );
563             $payload = encode_json($payload);
564             $req->content($payload);
565             $req->header( 'Content-Length' => length($payload));
566            
567             if ($self->{use_iam_role}) {
568             my $creds = VM::EC2::Security::CredentialCache->get();
569             defined($creds) || die("Unable to retrieve IAM role credentials");
570             $self->{access_key} = $creds->accessKeyId;
571             $self->{secret_key} = $creds->secretAccessKey;
572             $req->header('x-amz-security-token' => $creds->sessionToken);
573             }
574              
575             my $signer = AWS::Signature4->new(-access_key => $self->access_key,
576             -secret_key => $self->secret_key);
577            
578             $signer->sign($req);
579             return $req;
580             }
581              
582             method _request(HTTP::Request $req) {
583             $self->implementation->request($req);
584             }
585              
586              
587             # Since scan and query have the same type of responses share the processing.
588             method _scan_or_query_process (Str $target,
589             HashRef $payload,
590             CodeRef $code,
591             HashRef $args) {
592             my $finished = 0;
593             my $records_seen = 0;
594             my $repeat = try_repeat {
595            
596             # Since we're may be making more than one request in this repeat loop
597             # decrease our limit of results to scan in each call by the number
598             # of records remaining that the overall request wanted ot pull.
599             if (defined($args->{ResultLimit})) {
600             $payload->{Limit} = $args->{ResultLimit} - $records_seen;
601             }
602              
603             my $req = $self->make_request(
604             target => $target,
605             payload => $payload,
606             );
607            
608             $self->_process_request(
609             $req,
610             sub {
611             my $result = shift;
612             my $data = decode_json($result);
613            
614             for my $entry (@{$data->{Items}}) {
615             $code->(_decode_item_attributes($entry));
616             }
617              
618             $records_seen += scalar(@{$data->{Items}});
619             if ((defined($args->{ResultLimit}) && $records_seen >= $args->{ResultLimit})) {
620             $finished = 1;
621             }
622              
623             if (!defined($data->{LastEvaluatedKey})) {
624             $finished = 1;
625             } else {
626             if (!$finished) {
627             $payload->{ExclusiveStartKey} = $data->{LastEvaluatedKey};
628             }
629             }
630            
631             if (defined($data->{LastEvaluatedKey}) && $finished) {
632             $data->{LastEvaluatedKey} = _decode_item_attributes($data->{LastEvaluatedKey});
633             }
634              
635              
636             return $data;
637             })
638             ->on_fail(sub {
639             $finished = 1;
640             });
641             } until => sub { $finished };
642             }
643              
644              
645              
646             fun _encode_type_and_value(Any $v) {
647             my $type;
648              
649             if (ref($v)) {
650             # An array maps to a sequence
651             if (ref($v) eq 'ARRAY') {
652             # Any refs mean we're sending binary data
653            
654             # Start by guessing we have an array of numeric strings,
655             # but on the first value we encoutner that is either a reference
656             # or a variable that isn't an integer or numeric. Stop.
657             $type = 'NS';
658             foreach my $value (@$v) {
659             if (ref($value)) {
660             $type = 'BS';
661             last;
662             }
663             my $element_flags = B::svref_2object(\$value)->FLAGS;
664             if ($element_flags & (B::SVp_IOK | B::SVp_NOK)) {
665             next;
666             }
667             $type = 'SS';
668             last;
669             }
670             } else {
671             ref($v) eq 'SCALAR' || Carp::confess("Reference found but not a scalar");
672             $type = 'B';
673             }
674             } else {
675             my $flags = B::svref_2object(\$v)->FLAGS;
676             if ($flags & B::SVp_POK) {
677             $type = 'S';
678             } elsif ($flags & (B::SVp_IOK | B::SVp_NOK)) {
679             $type = 'N';
680             } else {
681             $type = 'S';
682             }
683             }
684            
685             if ($type eq 'N' || $type eq 'S') {
686             defined($v) || Carp::confess("Attempt to encode undefined value");
687             return ($type, "$v");
688             } elsif ($type eq 'B') {
689             return ($type, MIME::Base64::encode_base64(${$v}, ''));
690             } elsif ($type eq 'NS' || $type eq 'SS') {
691             return ($type, [map { "$_" } @$v]);
692             } elsif ($type eq 'BS') {
693             return ($type, [map { MIME::Base64::encode_base64(${$_}, '') } @$v]);
694             } else {
695             die("Unknown type for quoting and escaping: $type");
696             }
697             }
698              
699             fun _decode_type_and_value(Str $type, Any $value) {
700             if ($type eq 'S' || $type eq 'SS') {
701             return $value;
702             } elsif ($type eq 'N') {
703             return 0+$value;
704             } elsif ($type eq 'B') {
705             return MIME::Base64::decode_base64($value);
706             } elsif ($type eq 'BS') {
707             return [map { MIME::Base64::decode_base64($_) } @$value];
708             } elsif ($type eq 'NS') {
709             return [map { 0+$_} @$value];
710             } else {
711             die("Don't know how to decode type: $type");
712             }
713             }
714              
715              
716             fun _decode_item_attributes(Maybe[HashRef] $item) {
717             my $r;
718             foreach my $key (keys %$item) {
719             my $type = (keys %{$item->{$key}})[0];
720             my $value = $item->{$key}->{$type};
721             $r->{$key} = _decode_type_and_value($type, $item->{$key}->{$type});
722             }
723             return $r;
724             }
725              
726             method _process_request(HTTP::Request $req, CodeRef $done?) {
727             my $current_retry = 0;
728             my $do_retry = 1;
729             try_repeat {
730             $do_retry = 0;
731            
732             my $sleep_amount = 0;
733             if ($current_retry > 0) {
734             $sleep_amount = (2 ** $current_retry * 50)/1000;
735             }
736              
737             my $complete = sub {
738             $self->_request($req)->transform(
739             fail => sub {
740             my ($status, $resp, $req)= @_;
741             my $r;
742             if (defined($resp) && defined($resp->code)) {
743             if ($resp->code == 500) {
744             $do_retry = 1;
745             $current_retry++;
746             } elsif ($resp->code == 400) {
747             my $json = $resp->can('decoded_content')
748             ? $resp->decoded_content
749             : $resp->body; # Mojo
750             $r = decode_json($json);
751             if ($r->{__type} =~ /ProvisionedThroughputExceededException$/) {
752             # Need to sleep
753             $do_retry = 1;
754             $current_retry++;
755            
756             if (defined($self->max_retries()) && $current_retry > $self->max_retries()) {
757             $do_retry = 0;
758             }
759            
760             } else {
761             # extract the type into a better prettyier name.
762             if ($r->{__type} =~ /^com\.amazonaws\.dynamodb\.v20120810#(.+)$/) {
763             $r->{type} = $1;
764             }
765             }
766             }
767             }
768              
769             if (!$do_retry) {
770             if ($self->debug_failures()) {
771             print "DynamoDB Failure: $status\n";
772             if (defined($resp)) {
773             print "response:\n";
774             print $resp->as_string() . "\n";
775             }
776             if (defined($req)) {
777             print "Request:\n";
778             print $req->as_string() . "\n";
779             }
780             }
781             return $r || $status;
782             }
783             },
784             done => $done);
785             };
786              
787             if ($sleep_amount > 0) {
788             $self->{implementation}->delay($sleep_amount)->then($complete);
789             } else {
790             $complete->();
791             }
792             } until => sub { !$do_retry };
793             }
794              
795             my $encode_key = sub {
796             my $source = shift;
797             my $r;
798             foreach my $k (keys %$source) {
799             my $v = $source->{$k};
800             # There is no sense in encoding undefined values or values that
801             # are the empty string.
802             if (defined($v) && $v ne '') {
803             # Reference $source->{$k} since the earlier test may cause
804             # the value to be stringified.
805             $r->{$k} = { _encode_type_and_value($source->{$k}) };
806             }
807             }
808             return $r;
809             };
810              
811              
812             fun _encode_attribute_value_list(Any $value_list, Str $compare_op) {
813             if ($compare_op =~ /^(EQ|NE|LE|LT|GE|GT|CONTAINS|NOT_CONTAINS|BEGINS_WITH)$/) {
814             defined($value_list) || Carp::confess("No defined value for comparison operator: $compare_op");
815             $value_list = [ { _encode_type_and_value($value_list) } ];
816             } elsif ($compare_op eq 'IN') {
817             if (!ref($value_list)) {
818             $value_list = [$value_list];
819             }
820             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
821             } elsif ($compare_op eq 'BETWEEN') {
822             ref($value_list) eq 'ARRAY' || Carp::confess("Use of BETWEEN comparison operator requires an array");
823             scalar(@$value_list) == 2 || Carp::confess("BETWEEN comparison operator requires two values");
824             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
825             }
826             return $value_list;
827             }
828              
829             my $encode_filter = sub {
830             my $source = shift;
831              
832             my $r;
833              
834             foreach my $field_name (keys %$source) {
835             my $f = $source->{$field_name};
836             my $compare_op = $f->{ComparisonOperator} // 'EQ';
837             $compare_op =~ /^(EQ|NE|LE|LT|GE|GT|NOT_NULL|NULL|CONTAINS|NOT_CONTAINS|BEGINS_WITH|IN|BETWEEN)$/
838             || Carp::confess("Unknown comparison operator specified: $compare_op");
839            
840             $r->{$field_name} = {
841             ComparisonOperator => $compare_op,
842             (defined($f->{AttributeValueList}) ? (AttributeValueList => _encode_attribute_value_list($f->{AttributeValueList}, $compare_op)) : ())
843             };
844             }
845             return $r;
846             };
847              
848             my $parameter_type_definitions = {
849             AttributesToGet => {},
850             AttributeUpdates => {
851             encode => sub {
852             my $source = shift;
853             my $r;
854             ref($source) eq 'HASH' || Carp::confess("Attribute updates is not a hash ref");
855             foreach my $k (keys %$source) {
856             my $op = $source->{$k};
857             ref($op) eq 'HASH' || Carp::confess("AttributeUpdate for field $k is not a hash ref:" . Data::Dumper->Dump([$op]));
858             $r->{$k} = {
859             (defined($op->{Action}) ? (Action => $op->{Action}) : ()),
860             (defined($op->{Value}) ? (Value => { _encode_type_and_value($op->{Value}) }) : ()),
861             };
862             }
863             return $r;
864             }
865             },
866             # should be a boolean
867             ConsistentRead => {},
868             ConditionalOperator => {},
869             ConditionExpression => {},
870             ExclusiveStartKey => {
871             encode => $encode_key,
872             },
873             ExclusiveStartTableName => {},
874             ExpressionAttributeNames => {},
875             ExpressionAttributeValues => {
876             encode => sub {
877             my $source = shift;
878             my $r;
879             foreach my $key (grep { defined($source->{$_}) } keys %$source) {
880             $r->{$key} = { _encode_type_and_value($source->{$key}) };
881             }
882             return $r;
883             }
884             },
885             Expected => {
886             encode => sub {
887             my $source = shift;
888             my $r;
889             foreach my $key (keys %$source) {
890             my $info = $source->{$key};
891              
892             if (defined($info->{AttributeValueList}) ) {
893             $r->{$key}->{AttributeValueList} = _encode_attribute_value_list($info->{AttributeValueList}, $info->{ComparisonOperator});
894             }
895              
896             if (defined($info->{Exists})) {
897             $r->{$key}->{Exists} = $info->{Exists};
898             }
899              
900             if (defined($info->{ComparisonOperator})) {
901             $r->{$key}->{ComparisonOperator} = $info->{ComparisonOperator};
902             }
903            
904             if (defined($info->{Value})) {
905             $r->{$key}->{Value} = { _encode_type_and_value($info->{Value}) };
906             }
907             }
908             return $r;
909             },
910             },
911             FilterExpression => {},
912             IndexName => {},
913             Item => {
914             encode => $encode_key,
915             },
916             Key => {
917             encode => $encode_key,
918             },
919             Limit => {
920             type_check => 'integer',
921             },
922             QueryFilter => {
923             encode => $encode_filter,
924             },
925             ReturnConsumedCapacity => {},
926             ReturnItemCollectionMetrics => {},
927             ReturnValues => {},
928             ScanIndexForward => {},
929             ScanFilter => {
930             encode => $encode_filter,
931             },
932             Segment => {
933             type_check => 'integer',
934             },
935             Select => {},
936             TableName => {},
937             TotalSegments => {
938             type_check => 'integer',
939             },
940             UpdateExpression => {},
941             };
942              
943              
944              
945              
946             # Build a parameter hash from all of the standardized parameters.
947             sub _make_payload {
948             my $args = shift;
949             my @field_names = @_;
950              
951             if (scalar(@field_names) == 0) {
952             @field_names = keys %$args;
953             }
954              
955             my %r;
956             foreach my $field_name (@field_names) {
957             my $value = $args->{$field_name};
958             if (!defined($value)) {
959             next;
960             }
961             my $def = $parameter_type_definitions->{$field_name} || Carp::confess("Unknown parameter type: $field_name");
962             if (defined($value)) {
963             if ($def->{type_check} && $def->{type_check} eq 'integer') {
964             $value =~ /^\d+$/ || Carp::confess("$field_name is specified to be an integer but the value is not an integer: $value");
965             $value = int($value);
966             }
967             }
968              
969             if (defined($def->{encode})) {
970             $value = $def->{encode}->($value);
971             }
972              
973             if (defined($value)) {
974             $r{$field_name} = $value;
975             }
976             }
977             return \%r;
978             }
979              
980             fun _decode_single_item_change_response(Str $response) {
981             my $r = decode_json($response);
982             if (defined($r->{Attributes})) {
983             $r->{Attributes} = _decode_item_attributes($r->{Attributes});
984             }
985            
986             if (defined($r->{ItemCollectionMetrics})) {
987             foreach my $key (keys %{$r->{ItemCollectionMetrics}}) {
988             foreach my $key_part (keys %{$r->{ItemCollectionMetrics}->{$key}}) {
989             $r->{ItemCollectionMetrics}->{$key}->{$key_part} = _decode_item_attributes($r->{ItemCollectionMetrics}->{$key})
990             }
991             }
992             }
993             return $r;
994             }
995              
996              
997             fun _create_key_schema(ArrayRef $source, HashRef $known_fields) {
998             defined($source) || die("No source passed to create_key_schema");
999             defined($known_fields) || die("No known fields passed to create_key_schmea");
1000             my @r;
1001             foreach my $field_name (@$source) {
1002             defined($known_fields->{$field_name}) || Carp::confess("Unknown field specified '$field_name' in schema, must be defined in fields. schema:" . Data::Dumper->Dump([$source]));
1003             push @r, {
1004             AttributeName => $field_name,
1005             KeyType => (scalar(@r) ? 'RANGE' : 'HASH')
1006             };
1007             }
1008             return \@r;
1009             };
1010              
1011              
1012              
1013             1;
1014              
1015             __END__
1016              
1017             =pod
1018              
1019             =encoding UTF-8
1020              
1021             =head1 NAME
1022              
1023             Amazon::DynamoDB::20120810
1024              
1025             =head1 VERSION
1026              
1027             version 0.34
1028              
1029             =head1 DESCRIPTION
1030              
1031             =head2 new
1032              
1033             Instantiates the API object.
1034              
1035             Expects the following named parameters:
1036              
1037             =over 4
1038              
1039             =item * implementation - the object which provides a Future-returning C<request> method,
1040             see L<Amazon::DynamoDB::NaHTTP> for example.
1041              
1042             =item * host - the host (IP or hostname) to communicate with
1043              
1044             =item * port - the port to use for HTTP(S) requests
1045              
1046             =item * ssl - true for HTTPS, false for HTTP
1047              
1048             =item * algorithm - which signing algorithm to use, default AWS4-HMAC-SHA256
1049              
1050             =item * scope - the scope for requests, typically C<region/host/aws4_request>
1051              
1052             =item * access_key - the access key for signing requests
1053              
1054             =item * secret_key - the secret key for signing requests
1055              
1056             =item * debug_failures - print errors if they occur
1057              
1058             =item * max_retries - maximum number of retries for a request
1059              
1060             =back
1061              
1062             =head2 create_table
1063              
1064             Creates a new table. It may take some time before the table is marked
1065             as active - use L</wait_for_table_status> to poll until the status changes.
1066              
1067             Amazon Documentation:
1068              
1069             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html>
1070              
1071             $ddb->create_table(
1072             TableName => $table_name,
1073             ReadCapacityUnits => 2,
1074             WriteCapacityUnits => 2,
1075             AttributeDefinitions => {
1076             user_id => 'N',
1077             date => 'N',
1078             },
1079             KeySchema => ['user_id', 'date'],
1080             LocalSecondaryIndexes => [
1081             {
1082             IndexName => 'UserDateIndex',
1083             KeySchema => ['user_id', 'date'],
1084             Projection => {
1085             ProjectionType => 'KEYS_ONLY',
1086             },
1087             ProvisionedThroughput => {
1088             ReadCapacityUnits => 2,
1089             WriteCapacityUnits => 2,
1090             }
1091             }
1092             ]
1093             );
1094              
1095             =back
1096              
1097             =head2 describe_table
1098              
1099             Describes the given table.
1100              
1101             Amazon Documentation:
1102              
1103             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeTable.html>
1104              
1105             $ddb->describe_table(TableName => $table_name);
1106              
1107             =head2 delete_table
1108              
1109             Delete a table.
1110              
1111             Amazon Documentation:
1112              
1113             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteTable.html>
1114              
1115             $ddb->delete_table(TableName => $table_name)
1116              
1117             =head2 wait_for_table_status
1118              
1119             Waits for the given table to be marked as active.
1120              
1121             =over 4
1122              
1123             =item * TableName - the table name
1124              
1125             =item * WaitInterval - default wait interval in seconds.
1126              
1127             =item * DesiredStatus - status to expect before completing. Defaults to ACTIVE
1128              
1129             =back
1130              
1131             $ddb->wait_for_table_status(TableName => $table_name);
1132              
1133             =head2 each_table
1134              
1135             Run code for all current tables.
1136              
1137             Takes a coderef as the first parameter, will call this for each table found.
1138              
1139             Amazon Documentation:
1140              
1141             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ListTables.html>
1142              
1143             my @all_tables;
1144             $ddb->each_table(
1145             sub {
1146             my $table_name =shift;
1147             push @all_tables, $table_name;
1148             });
1149              
1150             =head2 put_item
1151              
1152             Writes a single item to the table.
1153              
1154             Amazon Documentation:
1155              
1156             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html>
1157              
1158             $ddb->put_item(
1159             TableName => $table_name,
1160             Item => {
1161             name => 'Test Name'
1162             },
1163             ReturnValues => 'ALL_OLD');
1164              
1165             =head2 update_item
1166              
1167             Updates a single item in the table.
1168              
1169             Amazon Documentation:
1170              
1171             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html>
1172              
1173             $ddb->update_item(
1174             TableName => $table_name,
1175             Key => {
1176             user_id => 2
1177             },
1178             AttributeUpdates => {
1179             name => {
1180             Action => 'PUT',
1181             Value => "Rusty Conover-3",
1182             },
1183             favorite_color => {
1184             Action => 'DELETE'
1185             },
1186             test_numbers => {
1187             Action => 'DELETE',
1188             Value => [500]
1189             },
1190             added_number => {
1191             Action => 'ADD',
1192             Value => 5,
1193             },
1194             subtracted_number => {
1195             Action => 'ADD',
1196             Value => -5,
1197             },
1198             });
1199              
1200             =head2 delete_item
1201              
1202             Deletes a single item from the table.
1203              
1204             Amazon Documentation:
1205              
1206             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html>
1207              
1208             $ddb->delete_item(
1209             TableName => $table_name,
1210             Key => {
1211             user_id => 5
1212             });
1213              
1214             =head2 get_item
1215              
1216             Retrieve an items from one tables.
1217              
1218             Amazon Documentation:
1219              
1220             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html>
1221              
1222             my $found_item;
1223             my $get = $ddb->get_item(
1224             sub {
1225             $found_item = shift;
1226             },
1227             TableName => $table_name,
1228             Key => {
1229             user_id => 6
1230             });
1231              
1232             =head2 batch_write_item
1233              
1234             Put or delete a collection of items.
1235              
1236             Has no restriction on the number of items able to be processed at one time.
1237              
1238             Amazon Documentation:
1239              
1240             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html>
1241              
1242             $ddb->batch_write_item(
1243             RequestItems => {
1244             books => [
1245             {
1246             DeleteRequest => {
1247             book_id => 3000,
1248             }
1249             },
1250             ],
1251             users => [
1252             {
1253             PutRequest => {
1254             user_id => 3000,
1255             name => "Test batch write",
1256             }
1257             },
1258             {
1259             PutRequest => {
1260             user_id => 3001,
1261             name => "Test batch write",
1262             }
1263             }
1264             ]
1265             });
1266              
1267             =head2 batch_get_item
1268              
1269             Retrieve a batch of items from one or more tables.
1270              
1271             Takes a coderef which will be called for each found item.
1272              
1273             Amazon Documentation:
1274              
1275             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html>
1276              
1277             Additional Parameters:
1278              
1279             =over
1280              
1281             =item * ResultLimit - limit on the total number of results to return.
1282              
1283             =back
1284              
1285             $ddb->batch_get_item(
1286             sub {
1287             my ($table, $item) = @_;
1288             },
1289             RequestItems => {
1290             $table_name => {
1291             ConsistentRead => 'true',
1292             AttributesToGet => ['user_id', 'name'],
1293             Keys => [
1294             {
1295             user_id => 1,
1296             },
1297             ],
1298             }
1299             })
1300              
1301             =head2 scan
1302              
1303             Scan a table for values with an optional filter expression.
1304              
1305             Amazon Documentation:
1306              
1307             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html>
1308              
1309             Additional parameters:
1310              
1311             =back
1312              
1313             $ddb->scan(
1314             sub {
1315             my $item = shift;
1316             push @found_items, $item;
1317             },
1318             TableName => $table_name,
1319             ScanFilter => {
1320             user_id => {
1321             ComparisonOperator => 'NOT_NULL',
1322             }
1323             });
1324              
1325             =head1 NAME
1326              
1327             Amazon::DynamoDB::20120810 - interact with DynamoDB using API version 20120810
1328              
1329             =head1 METHODS - Internal
1330              
1331             The following methods are intended for internal use and are documented
1332             purely for completeness - for normal operations see L</METHODS> instead.
1333              
1334             =head2 make_request
1335              
1336             Generates an L<HTTP::Request>.
1337              
1338             =head1 FUNCTIONS - Internal
1339              
1340             =head2 _encode_type_and_value
1341              
1342             Returns an appropriate type (N, S, SS etc.) and stringified/encoded value for the given
1343             value.
1344              
1345             DynamoDB only uses strings even if there is a Numeric value specified,
1346             so while the type will be expressed as a Number the value will be
1347             stringified.
1348              
1349             C<http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataFormat.html>
1350              
1351             =head1 AUTHORS
1352              
1353             =over 4
1354              
1355             =item *
1356              
1357             Rusty Conover <rusty@luckydinosaur.com>
1358              
1359             =item *
1360              
1361             Tom Molesworth <cpan@entitymodel.com>
1362              
1363             =back
1364              
1365             =head1 COPYRIGHT AND LICENSE
1366              
1367             This software is copyright (c) 2013 by Tom Molesworth, copyright (c) 2014 Lucky Dinosaur LLC. L<http://www.luckydinosaur.com>.
1368              
1369             This is free software; you can redistribute it and/or modify it under
1370             the same terms as the Perl 5 programming language system itself.
1371              
1372             =cut