File Coverage

blib/lib/Amazon/DynamoDB/Simple.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Amazon::DynamoDB::Simple;
2 1     1   503 use Amazon::DynamoDB;
  0            
  0            
3             use Carp qw/cluck confess carp croak/;
4             use DDP;
5             use JSON::XS;
6             use Moo;
7             use Try::Tiny;
8              
9             our $VERSION="0.01";
10              
11             =head1 NAME
12              
13             Amazon::DynamoDB::Simple - Simple to use and highly available
14              
15             =head1 SYNOPSIS
16              
17             use Amazon::DynamoDB::Simple;
18              
19             my $table = Amazon::DynamoDB::Simple->new(
20             table => $table, # required
21             primary_key => $primary_key, # required
22             access_key_id => ..., # default: $ENV{AWS_ACCESS_KEY_ID};
23             secret_access_key => ..., # default: $ENV{AWS_SECRET_ACCESS_KEY};
24             );
25              
26             # returns a hash
27             my %item = $table->get($key);
28              
29             # create or update an item
30             $table->put(%item);
31              
32             # mark item as deleted
33             $table->delete($key);
34              
35             # returns a hash representing the whole table as key value pairs
36             $table->items();
37              
38             # returns all the keys in the table
39             $table->keys();
40              
41             # delete $old_key, create $new_key
42             $table->rename($old_key, $new_key);
43              
44             # sync data between AWS regions using the 'last_updated' field to select
45             # the newest data. This method will permanently delete any items marked as
46             # 'deleted'.
47             $table->sync_regions();
48              
49             # This sets the value of the hosts attribute. The value shown is the
50             # default value. You must use exactly two hosts for stuff to work atm.
51             # Sorry.
52             $table->hosts([qw/
53             dynamodb.us-east-1.amazonaws.com
54             dynamodb.us-west-1.amazonaws.com
55             /]);
56              
57             =head1 DESCRIPTION
58              
59             DynamoDB is a simple key value store. A Amazon::DynamoDB::Simple object
60             represents a single table in DynamoDB.
61              
62             This module provides a simple UI layer on top of Amazon::DynamoDB. It also
63             makes your data highly available across exactly 2 AWS regions. In other words
64             it provides redundancy in case one region goes down. It doesn't do async. It
65             doesn't (currently) support secondary keys.
66              
67             Note Amazon::DynamoDB can't handle complex data structures. But this module
68             can because it serializes yer stuff to JSON if needed.
69              
70             At the moment you cannot use this module against a single dynamodb server. The
71             table must exist in 2 regions. I want to make the high availability part
72             optional in the future. It should not be hard. Patches welcome.
73              
74             =head1 DATA REDUNDANCY
75              
76             TODO
77              
78             =cut
79              
80             has table => (is => 'rw', required => 1);
81             has primary_key => (is => 'rw', required => 1);
82             has dynamodbs => (is => 'lazy');
83             has hosts => (is => 'rw', lazy => 1, builder => 1);
84             has access_key_id => (is => 'rw', lazy => 1, builder => 1);
85             has secret_access_key => (is => 'rw', lazy => 1, builder => 1);
86              
87             sub _build_access_key_id { $ENV{AWS_ACCESS_KEY_ID} }
88             sub _build_secret_access_key { $ENV{AWS_SECRET_ACCESS_KEY} }
89              
90             sub _build_hosts {
91             return [qw/
92             dynamodb.us-east-1.amazonaws.com
93             dynamodb.us-west-1.amazonaws.com
94             /];
95             }
96              
97             sub _build_dynamodbs {
98             my $self = shift;
99              
100             my @dynamodbs;
101             my $hosts = $self->hosts();
102              
103             for my $host (@$hosts) {
104             push @dynamodbs,
105             Amazon::DynamoDB->new(
106             access_key => $self->access_key_id,
107             secret_key => $self->secret_access_key,
108             ssl => 1,
109             version => '20120810',
110             implementation => 'Amazon::DynamoDB::MojoUA',
111             host => $host,
112             );
113             }
114              
115             return \@dynamodbs;
116             }
117              
118             sub put {
119             my ($self, %item) = @_;
120              
121             # timestamp this transaction
122             $item{last_updated} = DateTime->now . ""; # stringify datetime
123             $item{deleted} ||= 0;
124              
125             %item = $self->deflate(%item);
126             my $dynamodbs = $self->dynamodbs();
127             my $success = 0;
128              
129             for my $dynamodb (@$dynamodbs) {
130             try {
131             $dynamodb->put_item(
132             TableName => $self->table,
133             Item => \%item,
134             )->get;
135             $success++;
136             }
137             catch {
138             warn "caught error: " . p $_;
139             };
140             }
141              
142             # make sure at least one put_item() was successful
143             confess "unable to save to any dynamodb" unless $success;
144             }
145              
146             sub delete {
147             my ($self, $key) = @_;
148              
149             my %item = $self->get($key);
150              
151             return unless keys %item;
152              
153             $self->put(%item, deleted => 1);
154             }
155              
156             # Amazon::DynamoDB can't handle anything other than simple scalars
157             sub inflate {
158             my ($self, %item) = @_;
159             my %new;
160              
161             for my $key (keys %item) {
162             my $value = $item{$key};
163             $new{$key} = $self->is_valid_json($value)
164             ? JSON::XS->new->utf8->pretty->decode($value)
165             : $value;
166             }
167              
168             return %new;
169             }
170              
171             # Amazon::DynamoDB can't handle anything other than simple scalars
172             sub deflate {
173             my ($self, %item) = @_;
174             my %new;
175              
176             for my $key (keys %item) {
177             my $value = $item{$key};
178             $new{$key} = ref $value
179             ? JSON::XS->new->utf8->pretty->encode($value)
180             : $item{$key};
181             }
182              
183             return %new;
184             }
185              
186             sub is_valid_json {
187             my ($self, $json) = @_;
188             eval { JSON::XS->new->utf8->pretty->decode($json) };
189             return 0 if $@;
190             return 1;
191             }
192              
193             sub permanent_delete {
194             my ($self, $key) = @_;
195              
196             my $dynamodbs = $self->dynamodbs();
197             my $success = 0;
198              
199             for my $dynamodb (@$dynamodbs) {
200             try {
201             $dynamodb->delete_item(
202             TableName => $self->table,
203             Key => { $self->primary_key => $key },
204             )->get;
205             $success++;
206             }
207             catch {
208             warn "caught error: " . p $_;
209             };
210             }
211              
212             confess "unable to permanently delete item from any dynamodb" unless $success;
213             }
214              
215             sub get {
216             my ($self, $key) = @_;
217              
218             my $dynamodbs = $self->dynamodbs();
219             my $success = 0;
220             my @items;
221              
222             for my $dynamodb (@$dynamodbs) {
223             try {
224             push @items, $dynamodb->get_item(
225             sub { shift },
226             TableName => $self->table,
227             Key => { $self->primary_key => $key },
228             )->get();
229             $success++;
230             }
231             catch {
232             warn "caught error: " . p $_;
233             };
234             }
235              
236             confess "unable to connect and get item from any dynamodb" unless $success;
237              
238             my $most_recent;
239             for my $item (@items) {
240             next unless $item;
241              
242             $most_recent = $item
243             if !$most_recent ||
244             $most_recent->{last_updated} le $item->{last_updated};
245             }
246              
247             return if $most_recent->{deleted};
248              
249             return $self->inflate(%$most_recent);
250             }
251              
252             sub scan {
253             my ($self) = @_;
254              
255             my $dynamodbs = $self->dynamodbs();
256             my $success = 0;
257             my @items;
258              
259             for my $dynamodb (@$dynamodbs) {
260             my $res;
261              
262             try {
263             $res = $dynamodb->scan(
264             sub { shift },
265             TableName => $self->table,
266             )->get();
267             }
268             catch {
269             warn "caught error: " . p $_;
270             };
271              
272             return $res if $res;
273             }
274              
275             confess "unable to connect and scan from any dynamodb";
276             }
277              
278             sub sync_regions {
279             my ($self) = @_;
280              
281             my $dynamodb0 = $self->dynamodbs->[0];
282             my $dynamodb1 = $self->dynamodbs->[1];
283              
284             my $scan0 = $dynamodb0->scan(sub { shift }, TableName => $self->table)->get->{Items};
285             my $scan1 = $dynamodb1->scan(sub { shift }, TableName => $self->table)->get->{Items};
286              
287             my $items0 = $self->_process_items($scan0);
288             my $items1 = $self->_process_items($scan1);
289              
290             # sync from $dynamodb0 -> $dynamodb1
291             $self->_sync_items($dynamodb0 => $dynamodb1, $items0 => $items1);
292              
293             # sync from $dynamodb1 -> $dynamodb0
294             $self->_sync_items($dynamodb1 => $dynamodb0, $items1 => $items0);
295             }
296              
297             sub _process_items {
298             my ($self, $items) = @_;
299              
300             my $key = $self->primary_key;
301             my $definitions;
302              
303             for my $item (@$items) {
304             my $primary_key = delete($item->{$key})->{S};
305              
306             for my $attr (keys %$item) {
307             my $type_value = $item->{$attr};
308             my ($type) = keys %$type_value;
309             $definitions->{$primary_key}->{$attr} = $item->{$attr}->{$type};
310             }
311              
312             $definitions->{$primary_key}->{$key} = $primary_key;
313             }
314              
315              
316             return $definitions;
317             }
318              
319             sub _sync_items {
320             my ($self, $from_ddb, $to_ddb, $from_items, $to_items) = @_;
321              
322             my $primary_key_name = $self->primary_key;
323              
324             for my $from_key (keys %$from_items) {
325             my $from_value = $from_items->{$from_key};
326             my $to_value = $to_items->{$from_key};
327             if (!$to_value) {
328             $to_value = {last_updated => '1900-01-01T00:00:00'};
329             $to_items->{$from_key} = $to_value;
330             }
331              
332             my $updated0 = $from_value->{last_updated};
333             my $updated1 = $to_value->{last_updated};
334              
335             # don't need to sync if the items are the same age and not deleted
336             next if $updated0 eq $updated1 && !$to_value->{deleted};
337              
338             # find the newest item
339             my $newest = $updated0 gt $updated1
340             ? $from_value
341             : $to_value;
342              
343             # sync newest item to the other region
344             if ($newest->{deleted}) {
345             $self->permanent_delete( $newest->{$primary_key_name} );
346             }
347             else {
348             # TODO: this could be more efficient by syncing to just the ddb
349             # that needs it
350             $self->put(%$newest);
351             }
352              
353             # Lets say we are syncing from $dynamodb0 -> $dynamodb1. This prevents
354             # us from re syncing this item when we sync in the other direction from
355             # $dynamodb1 -> $dynamodb0
356             $to_value->{last_updated} = $from_value->{last_updated};
357             }
358             }
359              
360             sub items {
361             my ($self) = @_;
362              
363             my $human_items = {};
364              
365             my $items = $self->scan->{Items};
366             my $primary_key_name = $self->primary_key;
367              
368             # convert $items to something more human readable
369             for my $item (@$items) {
370             my $primary_key = delete($item->{$primary_key_name})->{S};
371              
372             for my $attr (keys %$item) {
373             my $type_value = $item->{$attr};
374             my ($type) = keys %$type_value;
375             $human_items->{$primary_key}->{$attr} = $item->{$attr}->{$type};
376             }
377              
378             # inflate json values
379             my $new_item = $human_items->{$primary_key};
380             my %inflated_item = $self->inflate(%$new_item);
381             $human_items->{$primary_key} = \%inflated_item;
382              
383             delete $human_items->{$primary_key}
384             if $human_items->{$primary_key}->{deleted};
385             }
386              
387             return %$human_items;
388             }
389              
390             sub keys {
391             my ($self) = @_;
392             my %items = $self->items;
393             return keys %items;
394             }
395              
396             1;
397             __END__
398              
399             =head1 ACKNOWLEDGEMENTS
400              
401             Thanks to L<DuckDuckGo|http://duckduckgo.com> for making this module possible by donating developer time.
402              
403             =head1 LICENSE
404              
405             Copyright (C) Eric Johnson.
406              
407             This library is free software; you can redistribute it and/or modify
408             it under the same terms as Perl itself.
409              
410             =head1 AUTHOR
411              
412             Eric Johnson E<lt>eric.git@iijo.orgE<gt>
413              
414             =cut
415