File Coverage

blib/lib/Net/Amazon/DynamoDB/HighlyAvailable.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Net::Amazon::DynamoDB::HighlyAvailable;
2 1     1   1652 use Net::Amazon::DynamoDB::Table;
  0            
  0            
3             use Carp qw/cluck confess carp croak/;
4             use DDP;
5             use JSON::XS;
6             use Moo;
7             use Try::Tiny;
8             use DateTime;
9              
10             our $VERSION="0.01";
11              
12             has table => (is => 'rw', required => 1);
13             has hash_key => (is => 'rw', required => 1);
14             has range_key => (is => 'rw', required => 0);
15             has dynamodbs => (is => 'lazy');
16             has regions => (is => 'rw', required => 1);
17             has access_key => (is => 'rw', lazy => 1, builder => 1);
18             has secret_key => (is => 'rw', lazy => 1, builder => 1);
19              
20             sub _build_access_key { $ENV{AWS_ACCESS_KEY} }
21             sub _build_secret_key { $ENV{AWS_SECRET_KEY} }
22             sub _build_regions { [qw/ us-east-1 us-west-1 /] }
23              
24             sub _build_dynamodbs {
25             my $self = shift;
26             my $regions = $self->regions;
27             my @dynamodbs;
28              
29             for my $region (@$regions) {
30             push @dynamodbs, Net::Amazon::DynamoDB::Table->new(
31             region => $region,
32             table => $self->table,
33             hash_key => $self->hash_key,
34             range_key => $self->range_key,
35             access_key => $self->access_key,
36             secret_key => $self->secret_key,
37             );
38             }
39              
40             return \@dynamodbs;
41             }
42              
43             sub put {
44             my ($self, %args) = @_;
45              
46             # timestamp this transaction
47             $args{Item}->{last_updated} = DateTime->now . ""; # stringify datetime
48             $args{Item}->{deleted} ||= 0;
49              
50             my $dynamodbs = $self->dynamodbs;
51             my $success = 0;
52              
53             for my $dynamodb (@$dynamodbs) {
54             try {
55             $dynamodb->put(%args);
56             $success++;
57             }
58             catch {
59             warn "caught error: " . p $_;
60             };
61             }
62              
63             # make sure at least one put() was successful
64             confess "unable to save to any dynamodb" unless $success;
65             }
66              
67             sub delete {
68             my ($self, %args) = @_;
69              
70             my $item = $self->get(%args);
71              
72             return unless keys %$item;
73              
74             $self->put(Item => { %$item, deleted => 1 });
75             }
76              
77             sub permanent_delete {
78             my ($self, %args) = @_;
79              
80             my $dynamodbs = $self->dynamodbs;
81             my $success = 0;
82              
83             for my $dynamodb (@$dynamodbs) {
84             try {
85             $dynamodb->delete(%args);
86             $success++;
87             }
88             catch {
89             warn "caught error: " . p $_;
90             };
91             }
92              
93             confess "unable to permanently delete item from any dynamodb" unless $success;
94             }
95              
96             sub get {
97             my ($self, %args) = @_;
98              
99             my $dynamodbs = $self->dynamodbs;
100             my $success = 0;
101             my @items;
102              
103             for my $dynamodb (@$dynamodbs) {
104             try {
105             push @items, $dynamodb->get(%args);
106             $success++;
107             }
108             catch {
109             warn "caught error: " . p $_;
110             };
111             }
112              
113             confess "unable to connect and get item from any dynamodb" unless $success;
114              
115             my $most_recent;
116             for my $item (@items) {
117             next unless $item;
118              
119             $most_recent = $item
120             if !$most_recent ||
121             $most_recent->{last_updated} le $item->{last_updated};
122             }
123              
124             return if $most_recent->{deleted};
125             return $most_recent;
126             }
127              
128             sub scan {
129             my ($self, @args) = @_;
130              
131             my $dynamodbs = $self->dynamodbs;
132             my $success = 0;
133             my @items;
134              
135             for my $dynamodb (@$dynamodbs) {
136             my $res;
137              
138             try {
139             $res = $dynamodb->scan(@args);
140             }
141             catch {
142             warn "caught error: " . p $_;
143             };
144              
145             return $res if $res;
146             }
147              
148             confess "unable to connect and scan from any dynamodb";
149             }
150              
151             sub scan_as_hashref {
152             my ($self, @args) = @_;
153              
154             my $dynamodbs = $self->dynamodbs;
155             my $success = 0;
156             my @items;
157              
158             for my $dynamodb (@$dynamodbs) {
159             my $res;
160              
161             try {
162             $res = $dynamodb->scan_as_hashref(@args);
163             }
164             catch {
165             warn "caught error: " . p $_;
166             };
167              
168             return $res if $res;
169             }
170              
171             confess "unable to connect and scan from any dynamodb";
172             }
173              
174             sub sync_regions {
175             my ($self, @args) = @_;
176              
177             my $dynamodb0 = $self->dynamodbs->[0];
178             my $dynamodb1 = $self->dynamodbs->[1];
179              
180             my $scan0 = $dynamodb0->scan(@args);
181             my $scan1 = $dynamodb1->scan(@args);
182              
183             my $items0 = $self->_process_items($scan0);
184             my $items1 = $self->_process_items($scan1);
185              
186             # sync from $dynamodb0 -> $dynamodb1
187             $self->_sync_items($dynamodb0 => $dynamodb1, $items0 => $items1);
188              
189             # sync from $dynamodb1 -> $dynamodb0
190             $self->_sync_items($dynamodb1 => $dynamodb0, $items1 => $items0);
191             }
192              
193             sub _process_items {
194             my ($self, $items_arrayref) = @_;
195              
196             my $hash_key_name = $self->hash_key;
197             my $items_hashref;
198              
199             for my $item (@$items_arrayref) {
200             my $hash_key_value = $item->{$hash_key_name};
201             $items_hashref->{$hash_key_value} = $item;
202             }
203              
204             return $items_hashref;
205             }
206              
207             sub _sync_items {
208             my ($self, $from_ddb, $to_ddb, $from_items, $to_items) = @_;
209              
210             my $hash_key_name = $self->hash_key;
211              
212             for my $from_key (keys %$from_items) {
213             my $from_val = $from_items->{$from_key};
214             my $to_val = $to_items->{$from_key};
215              
216             if (!$to_val) {
217             $to_val = {last_updated => '1900-01-01T00:00:00'};
218             $to_items->{$from_key} = $to_val;
219             }
220              
221             my $updated0 = $from_val->{last_updated};
222             my $updated1 = $to_val->{last_updated};
223              
224             # don't need to sync if the items are the same age and not deleted
225             next if $updated0 eq $updated1 && !$to_val->{deleted};
226              
227             # find the newest item
228             my ($newest, $ddb) = $updated0 gt $updated1
229             ? ($from_val, $to_ddb)
230             : ($to_val, $from_ddb);
231              
232             # sync newest item to the other region
233             if ($newest->{deleted}) {
234             my $hash_key_value = $newest->{$hash_key_name};
235             $self->permanent_delete($hash_key_name => $hash_key_value);
236             }
237             else {
238             $ddb->put(Item => $newest);
239             }
240              
241             # Lets say we are syncing from $dynamodb0 -> $dynamodb1. This prevents
242             # us from re syncing this item when we sync in the other direction from
243             # $dynamodb1 -> $dynamodb0
244             $to_val->{last_updated} = $from_val->{last_updated};
245             }
246             }
247              
248             1;
249             __END__