File Coverage

blib/lib/Data/Riak/Fast/MapReduce.pm
Criterion Covered Total %
statement 18 22 81.8
branch 0 2 0.0
condition n/a
subroutine 6 7 85.7
pod 0 1 0.0
total 24 32 75.0


line stmt bran cond sub pod time code
1             package Data::Riak::Fast::MapReduce;
2              
3             # ABSTRACT: A map/reduce query
4              
5 22     22   942 use Mouse;
  22         27746  
  22         155  
6 22     22   21111 use Data::Riak::Fast::MapReduce::Phase::Link;
  22         62  
  22         631  
7 22     22   2120 use Data::Riak::Fast::MapReduce::Phase::Map;
  22         42  
  22         679  
8 22     22   660 use Data::Riak::Fast::MapReduce::Phase::Reduce;
  22         44  
  22         1024  
9              
10 22     22   128 use JSON::XS qw/encode_json/;
  22         42  
  22         5940  
11              
12             with 'Data::Riak::Fast::Role::HasRiak';
13              
14             =head1 DESCRIPTION
15              
16             A map/reduce query.
17              
18             =head1 SYNOPSIS
19              
20             my $riak = Data::Riak::Fast->new;
21              
22             my $mr = Data::Riak::Fast::MapReduce->new({
23             riak => $riak,
24             inputs => [ [ "products8", $arg ] ],
25             phases => [
26             Data::Riak::Fast::MapReduce::Phase::Map->new(
27             language => "javascript",
28             source => "
29             function(v) {
30             var m = v.values[0].data.toLowerCase().match(/\w*/g);
31             var r = [];
32             for(var i in m) {
33             if(m[i] != '') {
34             var o = {};
35             o[m[i]] = 1;
36             r.push(o);
37             }
38             }
39             return r;
40             }
41             ",
42             ),
43             Data::Riak::Fast::MapReduce::Phase::Reduce->new(
44             language => "javascript",
45             source => "
46             function(v) {
47             var r = {};
48             for(var i in v) {
49             for(var w in v[i]) {
50             if(w in r) r[w] += v[i][w];
51             else r[w] = v[i][w];
52             }
53             }
54             return [r];
55             }
56             ",
57             ),
58             ]
59             });
60              
61             my $results = $mr->mapreduce;
62              
63             =head2 inputs
64              
65             Inputs to this query. There are few allowable forms.
66              
67             For a single bucket:
68              
69             inputs => "bucketname"
70              
71             For a bucket and key (or many!):
72              
73             inputs => [ [ "bucketname", "keyname" ] ]
74              
75             inputs => [ [ "bucketname", "keyname" ], [ "bucketname", "keyname2" ] ]
76            
77             And finally:
78              
79             inputs => [ [ "bucketname", "keyname", "keyData" ] ]
80              
81             =cut
82              
83             has inputs => (
84             is => 'ro',
85             isa => 'ArrayRef | Str | HashRef',
86             required => 1
87             );
88              
89             =head2 phases
90              
91             An arrayref of phases that will be executed in order. The phases should be
92             one of L,
93             L, or L.
94              
95             =cut
96              
97             has phases => (
98             is => 'ro',
99             isa => 'ArrayRef[Data::Riak::Fast::MapReduce::Phase]',
100             required => 1
101             );
102              
103             =head1 METHOD
104             =head2 mapreduce
105              
106             Execute the mapreduce query.
107              
108             To enable streaming, do the following:
109              
110             my $results = $mr->mapreduce(chunked => 1);
111              
112             =cut
113              
114             sub mapreduce {
115 0     0 0   my ($self, %options) = @_;
116            
117 0           return $self->riak->send_request({
118             content_type => 'application/json',
119             method => 'POST',
120             uri => 'mapred',
121             data => encode_json({
122             inputs => $self->inputs,
123 0 0         query => [ map { { $_->phase => $_->pack } } @{ $self->phases } ]
  0            
124             }),
125             ($options{'chunked'}
126             ? (query => { chunked => 'true' })
127             : ()),
128             });
129             }
130              
131             __PACKAGE__->meta->make_immutable;
132 22     22   118 no Mouse;
  22         43  
  22         124  
133              
134             1;
135              
136             __END__