File Coverage

lib/Amazon/DynamoDB/20120810.pm
Criterion Covered Total %
statement 25 27 92.5
branch n/a
condition n/a
subroutine 9 9 100.0
pod n/a
total 34 36 94.4


line stmt bran cond sub pod time code
1             package Amazon::DynamoDB::20120810;
2             $Amazon::DynamoDB::20120810::VERSION = '0.33';
3 9     9   41 use strict;
  9         10  
  9         253  
4 9     9   28 use warnings;
  9         11  
  9         165  
5              
6              
7 9     9   5123 use Future;
  9         62695  
  9         327  
8 9     9   4743 use Future::Utils qw(repeat try_repeat);
  9         16063  
  9         543  
9 9     9   4285 use POSIX qw(strftime);
  9         42874  
  9         46  
10 9     9   11838 use JSON::MaybeXS qw(decode_json encode_json);
  9         34196  
  9         595  
11 9     9   3863 use MIME::Base64;
  9         4688  
  9         629  
12 9     9   51 use List::Util;
  9         12  
  9         455  
13 9     9   8094 use List::MoreUtils;
  0            
  0            
14             use B qw(svref_2object);
15             use HTTP::Request;
16             use Kavorka;
17             use Amazon::DynamoDB::Types;
18             use Type::Registry;
19             use VM::EC2::Security::CredentialCache;
20             use AWS::Signature4;
21            
22             BEGIN {
23             my $reg = "Type::Registry"->for_me;
24             $reg->add_types(-Standard);
25             $reg->add_types("Amazon::DynamoDB::Types");
26             };
27              
28              
29              
30             sub new {
31             my $class = shift;
32             bless { @_ }, $class
33             }
34              
35             sub implementation { shift->{implementation} }
36             sub host { shift->{host} }
37             sub port { shift->{port} }
38             sub ssl { shift->{ssl} }
39             sub algorithm { 'AWS4-HMAC-SHA256' }
40             sub scope { shift->{scope} }
41             sub access_key { shift->{access_key} }
42             sub secret_key { shift->{secret_key} }
43             sub debug_failures { shift->{debug} }
44              
45             sub max_retries { shift->{max_retries} }
46              
47              
48              
49              
50              
51             method create_table(TableNameType :$TableName!,
52             Int :$ReadCapacityUnits = 2,
53             Int :$WriteCapacityUnits = 2,
54             AttributeDefinitionsType :$AttributeDefinitions,
55             KeySchemaType :$KeySchema!,
56             ArrayRef[GlobalSecondaryIndexType] :$GlobalSecondaryIndexes where { scalar(@$_) <= 5 },
57             ArrayRef[LocalSecondaryIndexType] :$LocalSecondaryIndexes
58             ) {
59             my %payload = (
60             TableName => $TableName,
61             ProvisionedThroughput => {
62             ReadCapacityUnits => int($ReadCapacityUnits),
63             WriteCapacityUnits => int($WriteCapacityUnits),
64             }
65             );
66              
67             if (defined($AttributeDefinitions)) {
68             foreach my $field_name (keys %$AttributeDefinitions) {
69             my $type = $AttributeDefinitions->{$field_name};
70              
71             push @{$payload{AttributeDefinitions}}, {
72             AttributeName => $field_name,
73             AttributeType => $type // 'S',
74             }
75             }
76             }
77              
78             $payload{KeySchema} = _create_key_schema($KeySchema, $AttributeDefinitions);
79              
80             foreach my $index_record (['GlobalSecondaryIndexes', $GlobalSecondaryIndexes],
81             ['LocalSecondaryIndexes', $LocalSecondaryIndexes]) {
82             my $index_type = $index_record->[0];
83             my $index = $index_record->[1];
84            
85             if (defined($index)) {
86             foreach my $i (@$index) {
87             my $r = {
88             IndexName => $i->{IndexName},
89             (($index_type eq 'GlobalSecondaryIndexes') ?
90             (ProvisionedThroughput => {
91             ReadCapacityUnits => int($i->{ProvisionedThroughput}->{ReadCapacityUnits} // 1),
92             WriteCapacityUnits => int($i->{ProvisionedThroughput}->{WriteCapacityUnits} // 1),
93             }) : ()),
94             KeySchema => _create_key_schema($i->{KeySchema}, $AttributeDefinitions),
95             };
96              
97             my $type = $i->{Projection}->{ProjectionType};
98             $r->{Projection}->{ProjectionType} = $type;
99            
100             if (defined($i->{Projection}->{NonKeyAttributes})) {
101             my $attrs = $i->{Projection}->{NonKeyAttributes};
102             # Can't validate these attribute names since they aren't part of the key.
103             $r->{Projection}->{NonKeyAttributes} = $attrs;
104             }
105             push @{$payload{$index_type}}, $r;
106             }
107             }
108             }
109              
110             my $req = $self->make_request(
111             target => 'CreateTable',
112             payload => \%payload,
113             );
114             $self->_process_request($req)
115             }
116              
117              
118             method describe_table(TableNameType :$TableName!) {
119             my $req = $self->make_request(
120             target => 'DescribeTable',
121             payload => _make_payload({
122             TableName => $TableName
123             }));
124             $self->_process_request($req,
125             sub {
126             my $content = shift;
127             decode_json($content)->{Table};
128             });
129             }
130              
131              
132             method delete_table(TableNameType :$TableName!) {
133             my $req = $self->make_request(
134             target => 'DeleteTable',
135             payload => _make_payload({ TableName => $TableName }));
136             $self->_process_request($req,
137             sub {
138             my $content = shift;
139             decode_json($content)->{TableDescription}
140             });
141             }
142              
143              
144             method wait_for_table_status(TableNameType :$TableName!,
145             Int :$WaitInterval = 2,
146             TableStatusType :$DesiredStatus = "ACTIVE") {
147             repeat {
148             my $retry = shift;
149            
150             $self->{implementation}->delay($retry ? $WaitInterval : 0)
151             ->then(sub {
152             $self->describe_table(TableName => $TableName)
153             });
154             } until => sub {
155             my $f = shift;
156             my $status = $f->get->{TableStatus};
157             $status eq $DesiredStatus
158             };
159             }
160              
161              
162             method each_table(CodeRef $code,
163             TableNameType :$ExclusiveStartTableName,
164             Int :$Limit where { $_ >= 0 && $_ <= 100}
165             ) {
166             my $finished = 0;
167             try_repeat {
168             my $req = $self->make_request(
169             target => 'ListTables',
170             payload => _make_payload({
171             ExclusiveStartTableName => $ExclusiveStartTableName,
172             Limit => $Limit
173             }));
174             $self->_process_request($req,
175             sub {
176             my $result = shift;
177             my $data = decode_json($result);
178             for my $tbl (@{$data->{TableNames}}) {
179             $code->($tbl);
180             }
181             $ExclusiveStartTableName = $data->{LastEvaluatedTableName};
182             if (!defined($ExclusiveStartTableName)) {
183             $finished = 1
184             }
185             });
186             } while => sub { !$finished };
187             }
188              
189              
190             method put_item (ConditionalOperatorType :$ConditionalOperator,
191             Str :$ConditionExpression,
192             ItemType :$Item!,
193             ExpectedType :$Expected,
194             ExpressionAttributeValuesType :$ExpressionAttributeValues,
195             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
196             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
197             ReturnValuesType :$ReturnValues,
198             TableNameType :$TableName!) {
199             my $req = $self->make_request(
200             target => 'PutItem',
201             payload => _make_payload({
202             'ConditionalOperator' => $ConditionalOperator,
203             'Expected' => $Expected,
204             'ConditionExpression' => $ConditionExpression,
205             'ExpressionAttributeValues' => $ExpressionAttributeValues,
206             'Item' => $Item,
207             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
208             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
209             'ReturnValues' => $ReturnValues,
210             'TableName' => $TableName
211             }));
212            
213             $self->_process_request($req, \&_decode_single_item_change_response);
214             }
215              
216              
217              
218             method update_item (AttributeUpdatesType :$AttributeUpdates!,
219             ConditionalOperatorType :$ConditionalOperator,
220             ExpectedType :$Expected,
221             KeyType :$Key!,
222             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
223             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
224             ReturnValuesType :$ReturnValues,
225             TableNameType :$TableName!) {
226             my $req = $self->make_request(
227             target => 'UpdateItem',
228             payload => _make_payload({
229             'AttributeUpdates' => $AttributeUpdates,
230             'ConditionalOperator' => $ConditionalOperator,
231             'Expected' => $Expected,
232             'Key' => $Key,
233             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
234             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
235             'ReturnValues' => $ReturnValues,
236             'TableName' => $TableName
237             }));
238             $self->_process_request($req, \&_decode_single_item_change_response);
239             }
240              
241              
242              
243              
244             method delete_item(ConditionalOperatorType :$ConditionalOperator,
245             ExpectedType :$Expected,
246             KeyType :$Key!,
247             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
248             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
249             ReturnValuesType :$ReturnValues,
250             TableNameType :$TableName!) {
251             my $req = $self->make_request(
252             target => 'DeleteItem',
253             payload => _make_payload({
254             'ConditionalOperator' => $ConditionalOperator,
255             'Expected' => $Expected,
256             'Key' => $Key,
257             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
258             'ReturnItemCollectionMetrics' => $ReturnItemCollectionMetrics,
259             'ReturnValues' => $ReturnValues,
260             'TableName' => $TableName
261             }));
262            
263             $self->_process_request($req, \&_decode_single_item_change_response);
264             }
265              
266              
267              
268              
269             method get_item(CodeRef $code,
270             AttributesToGetType :$AttributesToGet,
271             StringBooleanType :$ConsistentRead,
272             KeyType :$Key!,
273             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
274             TableNameType :$TableName!) {
275             my $req = $self->make_request(
276             target => 'GetItem',
277             payload => _make_payload({
278             'AttributesToGet' => $AttributesToGet,
279             'ConsistentRead' => $ConsistentRead,
280             'Key' => $Key,
281             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
282             'TableName' => $TableName
283             }));
284            
285             $self->_process_request(
286             $req,
287             sub {
288             my $result = shift;
289             my $data = decode_json($result);
290             $code->(_decode_item_attributes($data->{Item}));
291             });
292             }
293              
294              
295              
296              
297             method batch_write_item(BatchWriteRequestItemsType :$RequestItems! where { scalar(keys %$_) > 0 },
298             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
299             ReturnItemCollectionMetricsType :$ReturnItemCollectionMetrics,
300             ) {
301             my @all_requests;
302              
303             foreach my $table_name (keys %$RequestItems) {
304             # Item.
305             my $table_items = $RequestItems->{$table_name};
306            
307             my $seen_type;
308             foreach my $item (@$table_items) {
309             my $r;
310             foreach my $t (['DeleteRequest', 'Key'], ['PutRequest', 'Item']) {
311             if (defined($item->{$t->[0]})) {
312             my $key = $item->{$t->[0]}->{$t->[1]};
313             foreach my $k (keys %$key) {
314             # Don't bother encoding undefined values, same behavior as put_item
315             if (defined($key->{$k})) {
316             $r->{$t->[0]}->{$t->[1]}->{$k} = { _encode_type_and_value($key->{$k}) };
317             }
318             }
319             }
320             }
321             if (defined($r)) {
322             push @all_requests, [$table_name, $r];
323             }
324             }
325             }
326              
327             try_repeat {
328             my %payload = (
329             ReturnConsumedCapacity => $ReturnConsumedCapacity,
330             ReturnItemCollectionMetrics => $ReturnItemCollectionMetrics
331             );
332              
333             # print "Pending requests: " . scalar(@all_requests) . "\n";
334             my @records = splice @all_requests, 0, List::Util::min(25, scalar(@all_requests));
335            
336              
337             foreach my $record (@records) {
338             push @{$payload{RequestItems}->{$record->[0]}}, $record->[1];
339             }
340            
341              
342             my $req = $self->make_request(
343             target => 'BatchWriteItem',
344             payload => \%payload,
345             );
346              
347             $self->_process_request(
348             $req,
349             sub {
350             my $result = shift;
351             my $data = decode_json($result);
352            
353             if (defined($data->{UnprocessedItems})) {
354             foreach my $table_name (keys %{$data->{UnprocessedItems}}) {
355             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedItems}->{$table_name}};
356             }
357             }
358             return $data;
359             })->on_fail(sub {
360             @all_requests = ();
361             });
362             } until => sub { scalar(@all_requests) == 0 };
363             }
364              
365              
366              
367              
368              
369             method batch_get_item(CodeRef $code,
370             BatchGetItemsType :$RequestItems!,
371             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
372             Int :$ResultLimit where { !defined($_) || $_ > 0 }
373             ) {
374             my @all_requests;
375             my $table_flags = {};
376              
377             foreach my $table_name (keys %$RequestItems) {
378             my $table_details = $RequestItems->{$table_name};
379              
380             # Store these flags for later.
381             map {
382             if (defined($table_details->{$_})) {
383             $table_flags->{$_} = $table_details->{$_};
384             }
385             } ('ConsistentRead', 'AttributesToGet');
386              
387             foreach my $item (@{$table_details->{Keys}}) {
388             my $r = {};
389             foreach my $key_field (keys %$item) {
390             $r->{$key_field} = { _encode_type_and_value($item->{$key_field}) };
391             }
392             push @all_requests, [$table_name, $r];
393             }
394             }
395              
396             my $records_seen =0;
397             try_repeat {
398              
399             my %payload = (
400             ReturnConsumedCapacity => $ReturnConsumedCapacity
401             );
402              
403             # Only try 100 requests at one time.
404             my @records = splice @all_requests, 0, List::Util::min(100, scalar(@all_requests));
405              
406              
407             foreach my $record (@records) {
408             push @{$payload{RequestItems}->{$record->[0]}->{Keys}}, $record->[1];
409             }
410            
411             foreach my $seen_table_name (grep { defined($table_flags->{$_}) } List::MoreUtils::uniq(map { $_->[0] } @records)) {
412             $payload{RequestItems}->{$seen_table_name} = {
413             %{$table_flags->{$seen_table_name}},
414             Keys => $payload{RequestItems}->{$seen_table_name}->{Keys}
415             };
416             }
417              
418             my $req = $self->make_request(
419             target => 'BatchGetItem',
420             payload => \%payload,
421             );
422              
423             $self->_process_request(
424             $req,
425             sub {
426             my $result = shift;
427             my $data = decode_json($result);
428             foreach my $table_name (keys %{$data->{Responses}}) {
429             foreach my $item (@{$data->{Responses}->{$table_name}}) {
430             $code->($table_name, _decode_item_attributes($item));
431             $records_seen += 1;
432             if (defined($ResultLimit) &&$records_seen >= $ResultLimit) {
433             @all_requests = ();
434             return $data;
435             }
436             }
437             }
438            
439             if (defined($data->{UnprocessedKeys})) {
440             foreach my $table_name (keys %{$data->{UnprocessedKeys}}) {
441             push @all_requests, map { [$table_name, $_] } @{$data->{UnprocessedKeys}->{$table_name}->{Keys}};
442             }
443             }
444             return $data;
445             })->on_fail(sub {
446             @all_requests = ();
447             });
448             } until => sub { scalar(@all_requests) == 0 };
449             }
450              
451              
452             method query (CodeRef $code,
453             AttributesToGetType :$AttributesToGet,
454             StringBooleanType :$ConsistentRead,
455             ConditionalOperatorType :$ConditionalOperator,
456             KeyType :$ExclusiveStartKey,
457             TableNameType :$IndexName,
458             KeyConditionsType :$KeyConditions!,
459             Int :$Limit where { $_ >= 0 },
460             QueryFilterType :$QueryFilter,
461             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
462             StringBooleanType :$ScanIndexForward,
463             SelectType :$Select,
464             TableNameType :$TableName!,
465             Str :$FilterExpression,
466             ExpressionAttributeValuesType :$ExpressionAttributeValues,
467             ExpressionAttributeNamesType :$ExpressionAttributeNames,
468             ) {
469              
470             my $payload = _make_payload({
471             'AttributesToGet' => $AttributesToGet,
472             'ConsistentRead' => $ConsistentRead,
473             'ConditionalOperator' => $ConditionalOperator,
474             'ExclusiveStartKey' => $ExclusiveStartKey,
475             'ExpressionAttributeNames' => $ExpressionAttributeNames,
476             'ExpressionAttributeValues' => $ExpressionAttributeValues,
477             'FilterExpression' => $FilterExpression,
478             'IndexName' => $IndexName,
479             'QueryFilter' => $QueryFilter,
480             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
481             'ScanIndexForward' => $ScanIndexForward,
482             'Select' => $Select,
483             'TableName' => $TableName
484             });
485            
486              
487             foreach my $key_name (keys %$KeyConditions) {
488             my $key_details = $KeyConditions->{$key_name};
489             $payload->{KeyConditions}->{$key_name} = {
490             AttributeValueList => _encode_attribute_value_list($key_details->{AttributeValueList}, $key_details->{ComparisonOperator}),
491             ComparisonOperator => $key_details->{ComparisonOperator}
492             };
493             }
494              
495             $self->_scan_or_query_process('Query', $payload, $code, { ResultLimit => $Limit});
496             }
497              
498              
499              
500              
501              
502             method scan (CodeRef $code,
503             AttributesToGetType :$AttributesToGet,
504             KeyType :$ExclusiveStartKey,
505             Int :$Limit where { $_ >= 0},
506             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
507             ScanFilterType :$ScanFilter,
508             Int :$Segment where { $_ >= 0 },
509             SelectType :$Select,
510             TableNameType :$TableName!,
511             Int :$TotalSegments where { $_ >= 1 && $_ <= 1000000 },
512             Str :$FilterExpression,
513             ExpressionAttributeValuesType :$ExpressionAttributeValues,
514             ExpressionAttributeNamesType :$ExpressionAttributeNames,
515             ) {
516             my $payload = _make_payload({
517             'AttributesToGet' => $AttributesToGet,
518             'ExclusiveStartKey' => $ExclusiveStartKey,
519             'ExpressionAttributeValues' => $ExpressionAttributeValues,
520             'ExpressionAttributeNames' => $ExpressionAttributeNames,
521             'FilterExpression' => $FilterExpression,
522             'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
523             'ScanFilter' => $ScanFilter,
524             'Segment' => $Segment,
525             'Select' => $Select,
526             'TableName' => $TableName,
527             'TotalSegments' => $TotalSegments
528             });
529              
530             $self->_scan_or_query_process('Scan', $payload, $code, { ResultLimit => $Limit});
531             }
532              
533              
534             method make_request(Str :$target,
535             HashRef :$payload,
536             ) {
537             my $api_version = '20120810';
538             my $host = $self->host;
539             my $req = HTTP::Request->new(
540             POST => (($self->ssl) ? 'https' : 'http') . '://' . $self->host . ($self->port ? (':' . $self->port) : '') . '/'
541             );
542             $req->header( host => $host );
543             # Amazon requires ISO-8601 basic format
544             my $now = time;
545             my $http_date = strftime('%Y%m%dT%H%M%SZ', gmtime($now));
546             my $date = strftime('%Y%m%d', gmtime($now));
547              
548             $req->protocol('HTTP/1.1');
549             $req->header( 'Date' => $http_date );
550             $req->header( 'x-amz-target', 'DynamoDB_'. $api_version. '.'. $target );
551             $req->header( 'content-type' => 'application/x-amz-json-1.0' );
552             $payload = encode_json($payload);
553             $req->content($payload);
554             $req->header( 'Content-Length' => length($payload));
555            
556             if ($self->{use_iam_role}) {
557             my $creds = VM::EC2::Security::CredentialCache->get();
558             defined($creds) || die("Unable to retrieve IAM role credentials");
559             $self->{access_key} = $creds->accessKeyId;
560             $self->{secret_key} = $creds->secretAccessKey;
561             $req->header('x-amz-security-token' => $creds->sessionToken);
562             }
563              
564             my $signer = AWS::Signature4->new(-access_key => $self->access_key,
565             -secret_key => $self->secret_key);
566            
567             $signer->sign($req);
568             return $req;
569             }
570              
571             method _request(HTTP::Request $req) {
572             $self->implementation->request($req);
573             }
574              
575              
576             # Since scan and query have the same type of responses share the processing.
577             method _scan_or_query_process (Str $target,
578             HashRef $payload,
579             CodeRef $code,
580             HashRef $args) {
581             my $finished = 0;
582             my $records_seen = 0;
583             my $repeat = try_repeat {
584            
585             # Since we're may be making more than one request in this repeat loop
586             # decrease our limit of results to scan in each call by the number
587             # of records remaining that the overall request wanted ot pull.
588             if (defined($args->{ResultLimit})) {
589             $payload->{Limit} = $args->{ResultLimit} - $records_seen;
590             }
591              
592             my $req = $self->make_request(
593             target => $target,
594             payload => $payload,
595             );
596            
597             $self->_process_request(
598             $req,
599             sub {
600             my $result = shift;
601             my $data = decode_json($result);
602            
603             for my $entry (@{$data->{Items}}) {
604             $code->(_decode_item_attributes($entry));
605             }
606              
607             $records_seen += scalar(@{$data->{Items}});
608             if ((defined($args->{ResultLimit}) && $records_seen >= $args->{ResultLimit})) {
609             $finished = 1;
610             }
611              
612             if (!defined($data->{LastEvaluatedKey})) {
613             $finished = 1;
614             } else {
615             if (!$finished) {
616             $payload->{ExclusiveStartKey} = $data->{LastEvaluatedKey};
617             }
618             }
619            
620             if (defined($data->{LastEvaluatedKey}) && $finished) {
621             $data->{LastEvaluatedKey} = _decode_item_attributes($data->{LastEvaluatedKey});
622             }
623              
624              
625             return $data;
626             })
627             ->on_fail(sub {
628             $finished = 1;
629             });
630             } until => sub { $finished };
631             }
632              
633              
634              
635             fun _encode_type_and_value(Any $v) {
636             my $type;
637              
638             if (ref($v)) {
639             # An array maps to a sequence
640             if (ref($v) eq 'ARRAY') {
641             # Any refs mean we're sending binary data
642            
643             # Start by guessing we have an array of numeric strings,
644             # but on the first value we encoutner that is either a reference
645             # or a variable that isn't an integer or numeric. Stop.
646             $type = 'NS';
647             foreach my $value (@$v) {
648             if (ref($value)) {
649             $type = 'BS';
650             last;
651             }
652             my $element_flags = B::svref_2object(\$value)->FLAGS;
653             if ($element_flags & (B::SVp_IOK | B::SVp_NOK)) {
654             next;
655             }
656             $type = 'SS';
657             last;
658             }
659             } else {
660             ref($v) eq 'SCALAR' || Carp::confess("Reference found but not a scalar");
661             $type = 'B';
662             }
663             } else {
664             my $flags = B::svref_2object(\$v)->FLAGS;
665             if ($flags & B::SVp_POK) {
666             $type = 'S';
667             } elsif ($flags & (B::SVp_IOK | B::SVp_NOK)) {
668             $type = 'N';
669             } else {
670             $type = 'S';
671             }
672             }
673            
674             if ($type eq 'N' || $type eq 'S') {
675             defined($v) || Carp::confess("Attempt to encode undefined value");
676             return ($type, "$v");
677             } elsif ($type eq 'B') {
678             return ($type, MIME::Base64::encode_base64(${$v}, ''));
679             } elsif ($type eq 'NS' || $type eq 'SS') {
680             return ($type, [map { "$_" } @$v]);
681             } elsif ($type eq 'BS') {
682             return ($type, [map { MIME::Base64::encode_base64(${$_}, '') } @$v]);
683             } else {
684             die("Unknown type for quoting and escaping: $type");
685             }
686             }
687              
688             fun _decode_type_and_value(Str $type, Any $value) {
689             if ($type eq 'S' || $type eq 'SS') {
690             return $value;
691             } elsif ($type eq 'N') {
692             return 0+$value;
693             } elsif ($type eq 'B') {
694             return MIME::Base64::decode_base64($value);
695             } elsif ($type eq 'BS') {
696             return [map { MIME::Base64::decode_base64($_) } @$value];
697             } elsif ($type eq 'NS') {
698             return [map { 0+$_} @$value];
699             } else {
700             die("Don't know how to decode type: $type");
701             }
702             }
703              
704              
705             fun _decode_item_attributes(Maybe[HashRef] $item) {
706             my $r;
707             foreach my $key (keys %$item) {
708             my $type = (keys %{$item->{$key}})[0];
709             my $value = $item->{$key}->{$type};
710             $r->{$key} = _decode_type_and_value($type, $item->{$key}->{$type});
711             }
712             return $r;
713             }
714              
715             method _process_request(HTTP::Request $req, CodeRef $done?) {
716             my $current_retry = 0;
717             my $do_retry = 1;
718             try_repeat {
719             $do_retry = 0;
720            
721             my $sleep_amount = 0;
722             if ($current_retry > 0) {
723             $sleep_amount = (2 ** $current_retry * 50)/1000;
724             }
725              
726             my $complete = sub {
727             $self->_request($req)->transform(
728             fail => sub {
729             my ($status, $resp, $req)= @_;
730             my $r;
731             if (defined($resp) && defined($resp->code)) {
732             if ($resp->code == 500) {
733             $do_retry = 1;
734             $current_retry++;
735             } elsif ($resp->code == 400) {
736             my $json = $resp->can('decoded_content')
737             ? $resp->decoded_content
738             : $resp->body; # Mojo
739             $r = decode_json($json);
740             if ($r->{__type} =~ /ProvisionedThroughputExceededException$/) {
741             # Need to sleep
742             $do_retry = 1;
743             $current_retry++;
744            
745             if (defined($self->max_retries()) && $current_retry > $self->max_retries()) {
746             $do_retry = 0;
747             }
748            
749             } else {
750             # extract the type into a better prettyier name.
751             if ($r->{__type} =~ /^com\.amazonaws\.dynamodb\.v20120810#(.+)$/) {
752             $r->{type} = $1;
753             }
754             }
755             }
756             }
757              
758             if (!$do_retry) {
759             if ($self->debug_failures()) {
760             print "DynamoDB Failure: $status\n";
761             if (defined($resp)) {
762             print "response:\n";
763             print $resp->as_string() . "\n";
764             }
765             if (defined($req)) {
766             print "Request:\n";
767             print $req->as_string() . "\n";
768             }
769             }
770             return $r || $status;
771             }
772             },
773             done => $done);
774             };
775              
776             if ($sleep_amount > 0) {
777             $self->{implementation}->delay($sleep_amount)->then($complete);
778             } else {
779             $complete->();
780             }
781             } until => sub { !$do_retry };
782             }
783              
784             my $encode_key = sub {
785             my $source = shift;
786             my $r;
787             foreach my $k (keys %$source) {
788             my $v = $source->{$k};
789             # There is no sense in encoding undefined values or values that
790             # are the empty string.
791             if (defined($v) && $v ne '') {
792             # Reference $source->{$k} since the earlier test may cause
793             # the value to be stringified.
794             $r->{$k} = { _encode_type_and_value($source->{$k}) };
795             }
796             }
797             return $r;
798             };
799              
800              
801             fun _encode_attribute_value_list(Any $value_list, Str $compare_op) {
802             if ($compare_op =~ /^(EQ|NE|LE|LT|GE|GT|CONTAINS|NOT_CONTAINS|BEGINS_WITH)$/) {
803             defined($value_list) || Carp::confess("No defined value for comparison operator: $compare_op");
804             $value_list = [ { _encode_type_and_value($value_list) } ];
805             } elsif ($compare_op eq 'IN') {
806             if (!ref($value_list)) {
807             $value_list = [$value_list];
808             }
809             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
810             } elsif ($compare_op eq 'BETWEEN') {
811             ref($value_list) eq 'ARRAY' || Carp::confess("Use of BETWEEN comparison operator requires an array");
812             scalar(@$value_list) == 2 || Carp::confess("BETWEEN comparison operator requires two values");
813             $value_list = [ map { { _encode_type_and_value($_) } } @$value_list];
814             }
815             return $value_list;
816             }
817              
818             my $encode_filter = sub {
819             my $source = shift;
820              
821             my $r;
822              
823             foreach my $field_name (keys %$source) {
824             my $f = $source->{$field_name};
825             my $compare_op = $f->{ComparisonOperator} // 'EQ';
826             $compare_op =~ /^(EQ|NE|LE|LT|GE|GT|NOT_NULL|NULL|CONTAINS|NOT_CONTAINS|BEGINS_WITH|IN|BETWEEN)$/
827             || Carp::confess("Unknown comparison operator specified: $compare_op");
828            
829             $r->{$field_name} = {
830             ComparisonOperator => $compare_op,
831             (defined($f->{AttributeValueList}) ? (AttributeValueList => _encode_attribute_value_list($f->{AttributeValueList}, $compare_op)) : ())
832             };
833             }
834             return $r;
835             };
836              
837             my $parameter_type_definitions = {
838             AttributesToGet => {},
839             AttributeUpdates => {
840             encode => sub {
841             my $source = shift;
842             my $r;
843             ref($source) eq 'HASH' || Carp::confess("Attribute updates is not a hash ref");
844             foreach my $k (keys %$source) {
845             my $op = $source->{$k};
846             ref($op) eq 'HASH' || Carp::confess("AttributeUpdate for field $k is not a hash ref:" . Data::Dumper->Dump([$op]));
847             $r->{$k} = {
848             (defined($op->{Action}) ? (Action => $op->{Action}) : ()),
849             (defined($op->{Value}) ? (Value => { _encode_type_and_value($op->{Value}) }) : ()),
850             };
851             }
852             return $r;
853             }
854             },
855             # should be a boolean
856             ConsistentRead => {},
857             ConditionalOperator => {},
858             ConditionExpression => {},
859             ExclusiveStartKey => {
860             encode => $encode_key,
861             },
862             ExclusiveStartTableName => {},
863             ExpressionAttributeNames => {},
864             ExpressionAttributeValues => {
865             encode => sub {
866             my $source = shift;
867             my $r;
868             foreach my $key (grep { defined($source->{$_}) } keys %$source) {
869             $r->{$key} = { _encode_type_and_value($source->{$key}) };
870             }
871             return $r;
872             }
873             },
874             Expected => {
875             encode => sub {
876             my $source = shift;
877             my $r;
878             foreach my $key (keys %$source) {
879             my $info = $source->{$key};
880              
881             if (defined($info->{AttributeValueList}) ) {
882             $r->{$key}->{AttributeValueList} = _encode_attribute_value_list($info->{AttributeValueList}, $info->{ComparisonOperator});
883             }
884              
885             if (defined($info->{Exists})) {
886             $r->{$key}->{Exists} = $info->{Exists};
887             }
888              
889             if (defined($info->{ComparisonOperator})) {
890             $r->{$key}->{ComparisonOperator} = $info->{ComparisonOperator};
891             }
892            
893             if (defined($info->{Value})) {
894             $r->{$key}->{Value} = { _encode_type_and_value($info->{Value}) };
895             }
896             }
897             return $r;
898             },
899             },
900             FilterExpression => {},
901             IndexName => {},
902             Item => {
903             encode => $encode_key,
904             },
905             Key => {
906             encode => $encode_key,
907             },
908             Limit => {
909             type_check => 'integer',
910             },
911             QueryFilter => {
912             encode => $encode_filter,
913             },
914             ReturnConsumedCapacity => {},
915             ReturnItemCollectionMetrics => {},
916             ReturnValues => {},
917             ScanIndexForward => {},
918             ScanFilter => {
919             encode => $encode_filter,
920             },
921             Segment => {
922             type_check => 'integer',
923             },
924             Select => {},
925             TableName => {},
926             TotalSegments => {
927             type_check => 'integer',
928             },
929             };
930              
931              
932              
933              
934             # Build a parameter hash from all of the standardized parameters.
935             sub _make_payload {
936             my $args = shift;
937             my @field_names = @_;
938              
939             if (scalar(@field_names) == 0) {
940             @field_names = keys %$args;
941             }
942              
943             my %r;
944             foreach my $field_name (@field_names) {
945             my $value = $args->{$field_name};
946             my $def = $parameter_type_definitions->{$field_name} || Carp::confess("Unknown parameter type: $field_name");
947             if (defined($value)) {
948             if ($def->{type_check} && $def->{type_check} eq 'integer') {
949             $value =~ /^\d+$/ || Carp::confess("$field_name is specified to be an integer but the value is not an integer: $value");
950             $value = int($value);
951             }
952             }
953              
954             if (defined($def->{encode})) {
955             $value = $def->{encode}->($value);
956             }
957              
958             if (defined($value)) {
959             $r{$field_name} = $value;
960             }
961             }
962             return \%r;
963             }
964              
965             fun _decode_single_item_change_response(Str $response) {
966             my $r = decode_json($response);
967             if (defined($r->{Attributes})) {
968             $r->{Attributes} = _decode_item_attributes($r->{Attributes});
969             }
970            
971             if (defined($r->{ItemCollectionMetrics})) {
972             foreach my $key (keys %{$r->{ItemCollectionMetrics}}) {
973             foreach my $key_part (keys %{$r->{ItemCollectionMetrics}->{$key}}) {
974             $r->{ItemCollectionMetrics}->{$key}->{$key_part} = _decode_item_attributes($r->{ItemCollectionMetrics}->{$key})
975             }
976             }
977             }
978             return $r;
979             }
980              
981              
982             fun _create_key_schema(ArrayRef $source, HashRef $known_fields) {
983             defined($source) || die("No source passed to create_key_schema");
984             defined($known_fields) || die("No known fields passed to create_key_schmea");
985             my @r;
986             foreach my $field_name (@$source) {
987             defined($known_fields->{$field_name}) || Carp::confess("Unknown field specified '$field_name' in schema, must be defined in fields. schema:" . Data::Dumper->Dump([$source]));
988             push @r, {
989             AttributeName => $field_name,
990             KeyType => (scalar(@r) ? 'RANGE' : 'HASH')
991             };
992             }
993             return \@r;
994             };
995              
996              
997              
998             1;
999              
1000             __END__
1001              
1002             =pod
1003              
1004             =encoding UTF-8
1005              
1006             =head1 NAME
1007              
1008             Amazon::DynamoDB::20120810
1009              
1010             =head1 VERSION
1011              
1012             version 0.33
1013              
1014             =head1 DESCRIPTION
1015              
1016             =head2 new
1017              
1018             Instantiates the API object.
1019              
1020             Expects the following named parameters:
1021              
1022             =over 4
1023              
1024             =item * implementation - the object which provides a Future-returning C<request> method,
1025             see L<Amazon::DynamoDB::NaHTTP> for example.
1026              
1027             =item * host - the host (IP or hostname) to communicate with
1028              
1029             =item * port - the port to use for HTTP(S) requests
1030              
1031             =item * ssl - true for HTTPS, false for HTTP
1032              
1033             =item * algorithm - which signing algorithm to use, default AWS4-HMAC-SHA256
1034              
1035             =item * scope - the scope for requests, typically C<region/host/aws4_request>
1036              
1037             =item * access_key - the access key for signing requests
1038              
1039             =item * secret_key - the secret key for signing requests
1040              
1041             =item * debug_failures - print errors if they occur
1042              
1043             =item * max_retries - maximum number of retries for a request
1044              
1045             =back
1046              
1047             =head2 create_table
1048              
1049             Creates a new table. It may take some time before the table is marked
1050             as active - use L</wait_for_table_status> to poll until the status changes.
1051              
1052             Amazon Documentation:
1053              
1054             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html>
1055              
1056             $ddb->create_table(
1057             TableName => $table_name,
1058             ReadCapacityUnits => 2,
1059             WriteCapacityUnits => 2,
1060             AttributeDefinitions => {
1061             user_id => 'N',
1062             date => 'N',
1063             },
1064             KeySchema => ['user_id', 'date'],
1065             LocalSecondaryIndexes => [
1066             {
1067             IndexName => 'UserDateIndex',
1068             KeySchema => ['user_id', 'date'],
1069             Projection => {
1070             ProjectionType => 'KEYS_ONLY',
1071             },
1072             ProvisionedThroughput => {
1073             ReadCapacityUnits => 2,
1074             WriteCapacityUnits => 2,
1075             }
1076             }
1077             ]
1078             );
1079              
1080             =back
1081              
1082             =head2 describe_table
1083              
1084             Describes the given table.
1085              
1086             Amazon Documentation:
1087              
1088             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeTable.html>
1089              
1090             $ddb->describe_table(TableName => $table_name);
1091              
1092             =head2 delete_table
1093              
1094             Delete a table.
1095              
1096             Amazon Documentation:
1097              
1098             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteTable.html>
1099              
1100             $ddb->delete_table(TableName => $table_name)
1101              
1102             =head2 wait_for_table_status
1103              
1104             Waits for the given table to be marked as active.
1105              
1106             =over 4
1107              
1108             =item * TableName - the table name
1109              
1110             =item * WaitInterval - default wait interval in seconds.
1111              
1112             =item * DesiredStatus - status to expect before completing. Defaults to ACTIVE
1113              
1114             =back
1115              
1116             $ddb->wait_for_table_status(TableName => $table_name);
1117              
1118             =head2 each_table
1119              
1120             Run code for all current tables.
1121              
1122             Takes a coderef as the first parameter, will call this for each table found.
1123              
1124             Amazon Documentation:
1125              
1126             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ListTables.html>
1127              
1128             my @all_tables;
1129             $ddb->each_table(
1130             sub {
1131             my $table_name =shift;
1132             push @all_tables, $table_name;
1133             });
1134              
1135             =head2 put_item
1136              
1137             Writes a single item to the table.
1138              
1139             Amazon Documentation:
1140              
1141             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html>
1142              
1143             $ddb->put_item(
1144             TableName => $table_name,
1145             Item => {
1146             name => 'Test Name'
1147             },
1148             ReturnValues => 'ALL_OLD');
1149              
1150             =head2 update_item
1151              
1152             Updates a single item in the table.
1153              
1154             Amazon Documentation:
1155              
1156             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html>
1157              
1158             $ddb->update_item(
1159             TableName => $table_name,
1160             Key => {
1161             user_id => 2
1162             },
1163             AttributeUpdates => {
1164             name => {
1165             Action => 'PUT',
1166             Value => "Rusty Conover-3",
1167             },
1168             favorite_color => {
1169             Action => 'DELETE'
1170             },
1171             test_numbers => {
1172             Action => 'DELETE',
1173             Value => [500]
1174             },
1175             added_number => {
1176             Action => 'ADD',
1177             Value => 5,
1178             },
1179             subtracted_number => {
1180             Action => 'ADD',
1181             Value => -5,
1182             },
1183             });
1184              
1185             =head2 delete_item
1186              
1187             Deletes a single item from the table.
1188              
1189             Amazon Documentation:
1190              
1191             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html>
1192              
1193             $ddb->delete_item(
1194             TableName => $table_name,
1195             Key => {
1196             user_id => 5
1197             });
1198              
1199             =head2 get_item
1200              
1201             Retrieve an items from one tables.
1202              
1203             Amazon Documentation:
1204              
1205             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html>
1206              
1207             my $found_item;
1208             my $get = $ddb->get_item(
1209             sub {
1210             $found_item = shift;
1211             },
1212             TableName => $table_name,
1213             Key => {
1214             user_id => 6
1215             });
1216              
1217             =head2 batch_write_item
1218              
1219             Put or delete a collection of items.
1220              
1221             Has no restriction on the number of items able to be processed at one time.
1222              
1223             Amazon Documentation:
1224              
1225             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html>
1226              
1227             $ddb->batch_write_item(
1228             RequestItems => {
1229             books => [
1230             {
1231             DeleteRequest => {
1232             book_id => 3000,
1233             }
1234             },
1235             ],
1236             users => [
1237             {
1238             PutRequest => {
1239             user_id => 3000,
1240             name => "Test batch write",
1241             }
1242             },
1243             {
1244             PutRequest => {
1245             user_id => 3001,
1246             name => "Test batch write",
1247             }
1248             }
1249             ]
1250             });
1251              
1252             =head2 batch_get_item
1253              
1254             Retrieve a batch of items from one or more tables.
1255              
1256             Takes a coderef which will be called for each found item.
1257              
1258             Amazon Documentation:
1259              
1260             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html>
1261              
1262             Additional Parameters:
1263              
1264             =over
1265              
1266             =item * ResultLimit - limit on the total number of results to return.
1267              
1268             =back
1269              
1270             $ddb->batch_get_item(
1271             sub {
1272             my ($table, $item) = @_;
1273             },
1274             RequestItems => {
1275             $table_name => {
1276             ConsistentRead => 'true',
1277             AttributesToGet => ['user_id', 'name'],
1278             Keys => [
1279             {
1280             user_id => 1,
1281             },
1282             ],
1283             }
1284             })
1285              
1286             =head2 scan
1287              
1288             Scan a table for values with an optional filter expression.
1289              
1290             Amazon Documentation:
1291              
1292             L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html>
1293              
1294             Additional parameters:
1295              
1296             =back
1297              
1298             $ddb->scan(
1299             sub {
1300             my $item = shift;
1301             push @found_items, $item;
1302             },
1303             TableName => $table_name,
1304             ScanFilter => {
1305             user_id => {
1306             ComparisonOperator => 'NOT_NULL',
1307             }
1308             });
1309              
1310             =head1 NAME
1311              
1312             Amazon::DynamoDB::20120810 - interact with DynamoDB using API version 20120810
1313              
1314             =head1 METHODS - Internal
1315              
1316             The following methods are intended for internal use and are documented
1317             purely for completeness - for normal operations see L</METHODS> instead.
1318              
1319             =head2 make_request
1320              
1321             Generates an L<HTTP::Request>.
1322              
1323             =head1 FUNCTIONS - Internal
1324              
1325             =head2 _encode_type_and_value
1326              
1327             Returns an appropriate type (N, S, SS etc.) and stringified/encoded value for the given
1328             value.
1329              
1330             DynamoDB only uses strings even if there is a Numeric value specified,
1331             so while the type will be expressed as a Number the value will be
1332             stringified.
1333              
1334             C<http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataFormat.html>
1335              
1336             =head1 AUTHORS
1337              
1338             =over 4
1339              
1340             =item *
1341              
1342             Rusty Conover <rusty@luckydinosaur.com>
1343              
1344             =item *
1345              
1346             Tom Molesworth <cpan@entitymodel.com>
1347              
1348             =back
1349              
1350             =head1 COPYRIGHT AND LICENSE
1351              
1352             This software is copyright (c) 2013 by Tom Molesworth, copyright (c) 2014 Lucky Dinosaur LLC. L<http://www.luckydinosaur.com>.
1353              
1354             This is free software; you can redistribute it and/or modify it under
1355             the same terms as the Perl 5 programming language system itself.
1356              
1357             =cut