File Coverage

blib/lib/Riak/Light.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Light
3             #
4             # This software is copyright (c) 2013 by Weborama.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             ## no critic (RequireUseStrict, RequireUseWarnings)
10             package Riak::Light;
11             {
12             $Riak::Light::VERSION = '0.11';
13             }
14             ## use critic
15              
16 1     1   14510 use 5.010;
  1         2  
  1         28  
17 1     1   394 use Riak::Light::PBC;
  0            
  0            
18             use Riak::Light::Driver;
19             use MIME::Base64 qw(encode_base64);
20             use Type::Params qw(compile);
21             use Types::Standard -types;
22             use English qw(-no_match_vars );
23             use Scalar::Util qw(blessed);
24             use IO::Socket;
25             use Socket qw(TCP_NODELAY IPPROTO_TCP);
26             use Const::Fast;
27             use JSON;
28             use Carp;
29             use Module::Runtime qw(use_module);
30             use Moo;
31              
32             # ABSTRACT: Fast and lightweight Perl client for Riak
33              
34             has pid => ( is => 'lazy', isa => Int, clearer => 1 );
35             has port => ( is => 'ro', isa => Int, required => 1 );
36             has host => ( is => 'ro', isa => Str, required => 1 );
37             has r => ( is => 'ro', isa => Int, default => sub {2} );
38             has w => ( is => 'ro', isa => Int, default => sub {2} );
39             has dw => ( is => 'ro', isa => Int, default => sub {2} );
40              
41             has pr => ( is => 'ro', isa => Int, predicate => 1 );
42             has pw => ( is => 'ro', isa => Int, predicate => 1 );
43             has rw => ( is => 'ro', isa => Int, predicate => 1 );
44              
45             has autodie => ( is => 'ro', isa => Bool, default => sub {1}, trigger => 1 );
46             has timeout => ( is => 'ro', isa => Num, default => sub {0.5} );
47             has tcp_nodelay => ( is => 'ro', isa => Bool, default => sub {1} );
48             has in_timeout => ( is => 'lazy', trigger => 1 );
49             has out_timeout => ( is => 'lazy', trigger => 1 );
50             has client_id => ( is => 'lazy', isa => Str );
51              
52             sub _build_pid {
53             $$;
54             }
55              
56             sub _build_client_id {
57             "perl_riak_light" . encode_base64( int( rand(10737411824) ), '' );
58             }
59              
60             sub _trigger_autodie {
61             my ( $self, $value ) = @_;
62             carp "autodie will be disable in the next version" unless $value;
63             }
64              
65             sub _trigger_in_timeout {
66             carp
67             "this feature will be disabled in the next version, you should use just timeout instead";
68             }
69              
70             sub _trigger_out_timeout {
71             carp
72             "this feature will be disabled in the next version, you should use just timeout instead";
73             }
74              
75             sub _build_in_timeout {
76             $_[0]->timeout;
77             }
78              
79             sub _build_out_timeout {
80             $_[0]->timeout;
81             }
82              
83             has timeout_provider => (
84             is => 'ro',
85             isa => Maybe [Str],
86             default => sub {'Riak::Light::Timeout::Select'}
87             );
88              
89             has driver => ( is => 'lazy', clearer => 1 );
90              
91             sub _build_driver {
92             Riak::Light::Driver->new( socket => $_[0]->_build_socket() );
93             }
94              
95             sub _build_socket {
96             my ($self) = @_;
97              
98             my $host = $self->host;
99             my $port = $self->port;
100              
101             my $socket = IO::Socket::INET->new(
102             PeerHost => $host,
103             PeerPort => $port,
104             Timeout => $self->timeout,
105             );
106              
107             croak "Error ($!), can't connect to $host:$port"
108             unless defined $socket;
109              
110             if ( $self->tcp_nodelay ) {
111             $socket->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 )
112             or croak "Cannot set tcp nodelay $! ($^E)";
113             }
114              
115             return $socket unless defined $self->timeout_provider;
116              
117             use Module::Load qw(load);
118             load $self->timeout_provider;
119              
120             # TODO: add a easy way to inject this proxy
121             $self->timeout_provider->new(
122             socket => $socket,
123             in_timeout => $self->in_timeout,
124             out_timeout => $self->out_timeout,
125             );
126             }
127              
128             sub BUILD {
129             $_[0]->driver;
130             }
131              
132             const my $PING => 'ping';
133             const my $GET => 'get';
134             const my $PUT => 'put';
135             const my $DEL => 'del';
136             const my $GET_KEYS => 'get_keys';
137             const my $QUERY_INDEX => 'query_index';
138             const my $MAP_REDUCE => 'map_reduce';
139             const my $SET_CLIENT_ID => 'set_client_id';
140             const my $GET_CLIENT_ID => 'get_client_id';
141              
142             const my $ERROR_RESPONSE_CODE => 0;
143             const my $GET_RESPONSE_CODE => 10;
144             const my $GET_KEYS_RESPONSE_CODE => 18;
145             const my $MAP_REDUCE_RESPONSE_CODE => 24;
146             const my $QUERY_INDEX_RESPONSE_CODE => 26;
147             const my $GET_CLIENT_ID_RESPONSE_CODE => 4;
148              
149             const my $CODES => {
150             $PING => { request_code => 1, response_code => 2 },
151             $GET => { request_code => 9, response_code => 10 },
152             $PUT => { request_code => 11, response_code => 12 },
153             $DEL => { request_code => 13, response_code => 14 },
154             $GET_KEYS => { request_code => 17, response_code => 18 },
155             $MAP_REDUCE => { request_code => 23, response_code => 24 },
156             $QUERY_INDEX => { request_code => 25, response_code => 26 },
157             $GET_CLIENT_ID => { request_code => 3, response_code => 4 },
158             $SET_CLIENT_ID => { request_code => 5, response_code => 6 },
159             };
160              
161             const my $DEFAULT_MAX_RESULTS => 100;
162              
163             sub ping {
164             $_[0]->_parse_response(
165             operation => $PING,
166             body => q(),
167             );
168             }
169              
170             sub is_alive {
171             eval { $_[0]->ping };
172             }
173              
174             sub get_keys {
175             state $check = compile( Any, Str, Optional [CodeRef] );
176             my ( $self, $bucket, $callback ) = $check->(@_);
177              
178             my $body = RpbListKeysReq->encode( { bucket => $bucket } );
179             $self->_parse_response(
180             key => "*",
181             bucket => $bucket,
182             operation => $GET_KEYS,
183             body => $body,
184             callback => $callback,
185             );
186             }
187              
188             sub get_raw {
189             state $check = compile( Any, Str, Str, Optional [Bool] );
190             my ( $self, $bucket, $key, $return_all ) = $check->(@_);
191             my $response = $self->_fetch( $bucket, $key, 0 );
192              
193             my $result;
194             if ( defined $response ) {
195             $result = ($return_all) ? $response : $response->{value};
196             }
197             $result;
198             }
199              
200             sub get_full_raw {
201             state $check = compile( Any, Str, Str );
202             my ( $self, $bucket, $key ) = $check->(@_);
203              
204             $self->get_raw( $bucket, $key, 1 );
205             }
206              
207             sub get {
208             state $check = compile( Any, Str, Str, Optional [Bool] );
209             my ( $self, $bucket, $key, $return_all ) = $check->(@_);
210             my $response = $self->_fetch( $bucket, $key, 1 );
211             my $result;
212             if ( defined $response ) {
213             $result = ($return_all) ? $response : $response->{value};
214             }
215             $result;
216             }
217              
218             sub get_full {
219             state $check = compile( Any, Str, Str );
220             my ( $self, $bucket, $key ) = $check->(@_);
221              
222             $self->get( $bucket, $key, 1 );
223             }
224              
225             sub get_all_indexes {
226             state $check = compile( Any, Str, Str );
227             my ( $self, $bucket, $key ) = $check->(@_);
228             my $response = $self->_fetch( $bucket, $key, 0, 1 );
229              
230             return ( !defined $response )
231             ? []
232             : [ map { +{ value => $_->value, key => $_->key } }
233             @{ $response->{indexes} // [] } ];
234             }
235              
236             sub get_index_value {
237             state $check = compile( Any, Str, Str, Str );
238             my ( $self, $bucket, $key, $index_name ) = $check->(@_);
239              
240             $self->get_all_index_values( $bucket, $key )->{$index_name};
241             }
242              
243             sub get_all_index_values {
244             state $check = compile( Any, Str, Str );
245             my ( $self, $bucket, $key ) = $check->(@_);
246              
247             my %values;
248             foreach my $index ( @{ $self->get_all_indexes( $bucket, $key ) } ) {
249             my $key = $index->{key};
250             $values{$key} //= [];
251             push @{ $values{$key} }, $index->{value};
252             }
253              
254             \%values;
255             }
256              
257             sub get_vclock {
258             state $check = compile( Any, Str, Str );
259             my ( $self, $bucket, $key ) = $check->(@_);
260             my $response = $self->_fetch( $bucket, $key, 0, 1 );
261              
262             defined $response and $response->{vclock};
263             }
264              
265             sub exists {
266             state $check = compile( Any, Str, Str );
267             my ( $self, $bucket, $key ) = $check->(@_);
268             defined $self->_fetch( $bucket, $key, 0, 1 );
269             }
270              
271             sub _fetch {
272             my ( $self, $bucket, $key, $decode, $head ) = @_;
273              
274             my %extra_parameters;
275             $extra_parameters{pr} = $self->pr if $self->has_pr;
276              
277             my $body = RpbGetReq->encode(
278             { r => $self->r,
279             key => $key,
280             bucket => $bucket,
281             head => $head,
282             %extra_parameters
283             }
284             );
285              
286             $self->_parse_response(
287             key => $key,
288             bucket => $bucket,
289             operation => $GET,
290             body => $body,
291             decode => $decode,
292             );
293             }
294              
295             sub put_raw {
296             state $check =
297             compile( Any, Str, Str, Any, Optional [Str],
298             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
299             my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
300             $check->(@_);
301             $content_type ||= 'plain/text';
302             $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
303             }
304              
305             sub put {
306             state $check =
307             compile( Any, Str, Str, Any, Optional [Str],
308             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
309             my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
310             $check->(@_);
311              
312             ( $content_type ||= 'application/json' ) eq 'application/json'
313             and $value = encode_json($value);
314              
315             $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
316             }
317              
318             sub _store {
319             my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes,
320             $vclock ) = @_;
321              
322             my %extra_parameters = ();
323              
324             $extra_parameters{vclock} = $vclock if $vclock;
325             $extra_parameters{dw} = $self->dw;
326             $extra_parameters{pw} = $self->pw if $self->has_pw;
327              
328             my $body = RpbPutReq->encode(
329             { key => $key,
330             bucket => $bucket,
331             content => {
332             value => $encoded_value,
333             content_type => $content_type,
334             ( $indexes
335             ? ( indexes => [
336             map {
337             my $k = $_;
338             my $v = $indexes->{$_};
339             ref $v eq 'ARRAY'
340             ? map { { key => $k, value => $_ }; } @$v
341             : { key => $k, value => $v };
342             } keys %$indexes
343             ]
344             )
345             : ()
346             ),
347             },
348             w => $self->w,
349             %extra_parameters,
350             }
351             );
352              
353             $self->_parse_response(
354             key => $key,
355             bucket => $bucket,
356             operation => $PUT,
357             body => $body,
358             );
359             }
360              
361             sub del {
362             state $check = compile( Any, Str, Str );
363             my ( $self, $bucket, $key ) = $check->(@_);
364              
365             my %extra_parameters;
366              
367             $extra_parameters{rw} = $self->rw if $self->has_rw;
368             $extra_parameters{pr} = $self->pr if $self->has_pr;
369             $extra_parameters{pw} = $self->pw if $self->has_pw;
370              
371             my $body = RpbDelReq->encode(
372             { key => $key,
373             bucket => $bucket,
374             r => $self->r,
375             w => $self->w,
376             dw => $self->dw,
377             %extra_parameters
378             }
379             );
380              
381             $self->_parse_response(
382             key => $key,
383             bucket => $bucket,
384             operation => $DEL,
385             body => $body,
386             );
387             }
388              
389             sub query_index_loop {
390             state $check =
391             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
392             my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
393             $check->(@_);
394              
395             $extra_parameters //= {};
396             $extra_parameters->{max_results} //= $DEFAULT_MAX_RESULTS;
397              
398             my @keys;
399             do {
400              
401             my ( $temp_keys, $continuation, undef ) =
402             $self->query_index( $bucket, $index, $value_to_match,
403             $extra_parameters );
404              
405             $extra_parameters->{continuation} = $continuation;
406              
407             push @keys, @{$temp_keys};
408              
409             } while ( defined $extra_parameters->{continuation} );
410              
411             return \@keys;
412             }
413              
414             sub query_index {
415             state $check =
416             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
417             my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
418             $check->(@_);
419              
420             my $query_type = 0; # eq
421             ref $value_to_match
422             and $query_type = 1; # range
423              
424             croak "query index in stream mode not supported"
425             if defined $extra_parameters && $extra_parameters->{stream};
426              
427             my $body = RpbIndexReq->encode(
428             { index => $index,
429             bucket => $bucket,
430             qtype => $query_type,
431             $query_type
432             ? ( range_min => $value_to_match->[0],
433             range_max => $value_to_match->[1]
434             )
435             : ( key => $value_to_match ),
436             %{ $extra_parameters // {} },
437             }
438             );
439              
440             $self->_parse_response(
441             $query_type
442             ? ( key => "2i query on index='$index' => "
443             . $value_to_match->[0] . '...'
444             . $value_to_match->[1] )
445             : ( key => "2i query on index='$index' => " . $value_to_match ),
446             bucket => $bucket,
447             operation => $QUERY_INDEX,
448             body => $body,
449             paginate => defined $extra_parameters
450             && exists $extra_parameters->{max_results},
451             );
452             }
453              
454             sub map_reduce {
455             state $check = compile( Any, Any, Optional [CodeRef] );
456             my ( $self, $request, $callback ) = $check->(@_);
457              
458             my @args;
459              
460             push @args, ref($request) ? encode_json($request) : $request;
461             push @args, 'application/json';
462             push @args, $callback if $callback;
463              
464             $self->map_reduce_raw(@args);
465             }
466              
467             sub map_reduce_raw {
468             state $check = compile( Any, Str, Str, Optional [CodeRef] );
469             my ( $self, $request, $content_type, $callback ) = $check->(@_);
470              
471             my $body = RpbMapRedReq->encode(
472             { request => $request,
473             content_type => $content_type,
474             }
475             );
476              
477             $self->_parse_response(
478             key => 'no-key',
479             bucket => 'no-bucket',
480             operation => $MAP_REDUCE,
481             body => $body,
482             callback => $callback,
483             decode => ( $content_type eq 'application/json' ),
484             );
485             }
486              
487             sub get_client_id {
488             my $self = shift;
489              
490             $self->_parse_response(
491             operation => $GET_CLIENT_ID,
492             body => q(),
493             );
494             }
495              
496             sub set_client_id {
497             state $check = compile( Any, Str );
498             my ( $self, $client_id ) = $check->(@_);
499              
500             my $body = RpbSetClientIdReq->encode( { client_id => $client_id } );
501              
502             $self->_parse_response(
503             operation => $SET_CLIENT_ID,
504             body => $body,
505             );
506             }
507              
508             sub _pid_change {
509             $_[0]->pid != $$;
510             }
511              
512             sub _parse_response {
513             my ( $self, %args ) = @_;
514              
515             my $operation = $args{operation};
516              
517             my $request_code = $CODES->{$operation}->{request_code};
518             my $expected_code = $CODES->{$operation}->{response_code};
519              
520             my $request_body = $args{body};
521             my $decode = $args{decode};
522             my $bucket = $args{bucket};
523             my $key = $args{key};
524             my $callback = $args{callback};
525             my $paginate = $args{paginate};
526              
527             $self->autodie
528             or undef $@; ## no critic (RequireLocalizedPunctuationVars)
529              
530             if ( $self->_pid_change ) {
531             $self->clear_pid;
532             $self->clear_driver;
533             }
534              
535             $self->driver->perform_request(
536             code => $request_code,
537             body => $request_body
538             )
539             or return $self->_process_generic_error(
540             $ERRNO, $operation, $bucket,
541             $key
542             );
543              
544             # my $done = 0;
545             #$expected_code != $GET_KEYS_RESPONSE_CODE;
546              
547             my $response;
548             my @results;
549             while (1) {
550              
551             # get and check response
552             $response = $self->driver->read_response()
553             // { code => -1, body => undef, error => $ERRNO };
554              
555             my ( $response_code, $response_body, $response_error ) =
556             @{$response}{qw(code body error)};
557              
558             # in case of internal error message
559             defined $response_error
560             and return $self->_process_generic_error(
561             $response_error, $operation, $bucket,
562             $key
563             );
564              
565             # in case of error msg
566             $response_code == $ERROR_RESPONSE_CODE
567             and return $self->_process_riak_error(
568             $response_body, $operation, $bucket,
569             $key
570             );
571              
572             # in case of default message
573             $response_code != $expected_code
574             and return $self->_process_generic_error(
575             "Unexpected Response Code in (got: $response_code, expected: $expected_code)",
576             $operation, $bucket, $key
577             );
578              
579             $response_code == $GET_CLIENT_ID_RESPONSE_CODE
580             and return $self->_process_get_client_id_response($response_body);
581              
582             # we have a 'get' response
583             $response_code == $GET_RESPONSE_CODE
584             and
585             return $self->_process_get_response( $response_body, $bucket, $key,
586             $decode );
587              
588             # we have a 'get_keys' response
589             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
590             if ( $response_code == $GET_KEYS_RESPONSE_CODE ) {
591             my $obj = RpbListKeysResp->decode($response_body);
592             my @keys = @{ $obj->keys // [] };
593             if ($callback) {
594             $callback->($_) foreach @keys;
595             $obj->done
596             and return;
597             }
598             else {
599             push @results, @keys;
600             $obj->done
601             and return \@results;
602             }
603             next;
604             } # in case of a 'query_index' response
605             elsif ( $response_code == $QUERY_INDEX_RESPONSE_CODE ) {
606             my $obj = RpbIndexResp->decode($response_body);
607              
608             my $keys = $obj->keys // [];
609              
610             if ( $paginate and wantarray ) {
611             return ( $keys, $obj->continuation, $obj->done );
612             }
613             else {
614             return $keys;
615             }
616             }
617             elsif ( $response_code == $MAP_REDUCE_RESPONSE_CODE ) {
618             my $obj = RpbMapRedResp->decode($response_body);
619              
620             my $phase = $obj->phase;
621             my $response =
622             ($decode)
623             ? decode_json( $obj->response // '[]' )
624             : $obj->response;
625              
626             if ($callback) {
627             $obj->done
628             and return;
629             $callback->( $response, $phase );
630             }
631             else {
632             $obj->done
633             and return \@results;
634             push @results, { phase => $phase, response => $response };
635             }
636             next;
637             }
638              
639             # in case of no return value, signify success
640             return 1;
641             }
642              
643             }
644              
645             sub _process_get_client_id_response {
646             my ( $self, $encoded_message ) = @_;
647              
648             $self->_process_generic_error( "Undefined Message", 'get client id', '-',
649             '-' )
650             unless ( defined $encoded_message );
651              
652             my $decoded_message = RpbGetClientIdResp->decode($encoded_message);
653             $decoded_message->client_id;
654             }
655              
656             sub _process_get_response {
657             my ( $self, $encoded_message, $bucket, $key, $should_decode ) = @_;
658              
659             $self->_process_generic_error( "Undefined Message", 'get', $bucket, $key )
660             unless ( defined $encoded_message );
661              
662             my $decoded_message = RpbGetResp->decode($encoded_message);
663              
664             my $contents = $decoded_message->content;
665             if ( ref($contents) eq 'ARRAY' ) {
666             my $content = $contents->[0];
667              
668             my $decode =
669             $should_decode && ( $content->content_type eq 'application/json' );
670             return {
671             value => ($decode)
672             ? decode_json( $content->value )
673             : $content->value,
674             indexes => $content->indexes,
675             vclock => $decoded_message->vclock,
676             };
677             }
678              
679             undef;
680             }
681              
682             sub _process_riak_error {
683             my ( $self, $encoded_message, $operation, $bucket, $key ) = @_;
684              
685             my $decoded_message = RpbErrorResp->decode($encoded_message);
686              
687             my $errmsg = $decoded_message->errmsg;
688             my $errcode = $decoded_message->errcode;
689              
690             $self->_process_generic_error(
691             "Riak Error (code: $errcode) '$errmsg'",
692             $operation, $bucket, $key
693             );
694             }
695              
696             sub _process_generic_error {
697             my ( $self, $error, $operation, $bucket, $key ) = @_;
698              
699             my $extra = '';
700              
701             if ( $operation eq $PING ) {
702             $extra = '';
703             }
704             elsif ( $operation eq $QUERY_INDEX ) {
705             $extra = "(bucket: $bucket, $key)";
706             }
707             elsif ( $operation eq $MAP_REDUCE ) {
708             $extra = ''; # maybe add the sha1 of the request?
709             }
710             else {
711             $extra = "(bucket: $bucket, key: $key)";
712             }
713              
714             my $error_message = "Error in '$operation' $extra: $error";
715              
716             croak $error_message if $self->autodie;
717              
718             $@ = $error_message; ## no critic (RequireLocalizedPunctuationVars)
719              
720             undef;
721             }
722              
723             1;
724              
725              
726             =pod
727              
728             =head1 NAME
729              
730             Riak::Light - Fast and lightweight Perl client for Riak
731              
732             =head1 VERSION
733              
734             version 0.11
735              
736             =head1 SYNOPSIS
737              
738             use Riak::Light;
739              
740             # create a new instance - using pbc only
741             my $client = Riak::Light->new(
742             host => '127.0.0.1',
743             port => 8087
744             );
745              
746             $client->is_alive() or die "ops, riak is not alive";
747              
748             # store hashref into bucket 'foo', key 'bar'
749             # will serializer as 'application/json'
750             $client->put( foo => bar => { baz => 1024 });
751              
752             # store text into bucket 'foo', key 'bar'
753             $client->put( foo => baz => "sometext", 'text/plain');
754             $client->put_raw( foo => baz => "sometext"); # does not encode !
755              
756             # fetch hashref from bucket 'foo', key 'bar'
757             my $hash = $client->get( foo => 'bar');
758             my $text = $client->get_raw( foo => 'baz'); # does not decode !
759              
760             # delete hashref from bucket 'foo', key 'bar'
761             $client->del(foo => 'bar');
762              
763             # check if exists (like get but using less bytes in the response)
764             $client->exists(foo => 'baz') or warn "ops, foo => bar does not exist";
765              
766             # list keys in stream (callback only)
767             $client->get_keys(foo => sub{
768             my $key = $_[0];
769              
770             # you should use another client inside this callback!
771             $another_client->del(foo => $key);
772             });
773            
774             # perform 2i queries
775             my $keys = $client->query_index( $bucket_name => 'index_test_field_bin', 'plop');
776            
777             # list all 2i indexes and values
778             my $indexes = $client->get_all_indexes( $bucket_name => $key );
779            
780             # perform map / reduce operations
781             my $response = $client->map_reduce('{
782             "inputs":"training",
783             "query":[{"map":{"language":"javascript",
784             "source":"function(riakObject) {
785             var val = riakObject.values[0].data.match(/pizza/g);
786             return [[riakObject.key, (val ? val.length : 0 )]];
787             }"}}]}');
788              
789             =head1 DESCRIPTION
790              
791             Riak::Light is a very light (and fast) Perl client for Riak using PBC
792             interface. Support operations like ping, get, exists, put, del, and secondary
793             indexes (so-called 2i) setting and querying.
794              
795             It is flexible to change the timeout backend for I/O operations and can
796             suppress 'die' in case of error (autodie) using the configuration. There is no
797             auto-reconnect option. It can be very easily wrapped up by modules like
798             L to manage flexible retry/reconnect strategies.
799              
800             =head2 ATTRIBUTES
801              
802             =head3 host
803              
804             Riak ip or hostname. There is no default.
805              
806             =head3 port
807              
808             Port of the PBC interface. There is no default.
809              
810             =head3 r
811              
812             R value setting for this client. Default 2.
813              
814             =head3 w
815              
816             W value setting for this client. Default 2.
817              
818             =head3 dw
819              
820             DW value setting for this client. Default 2.
821              
822             =head3 rw
823              
824             RW value setting for this client. Default not set ( and omit in the request)
825              
826             =head3 pr
827              
828             PR value setting for this client. Default not set ( and omit in the request)
829              
830             =head3 pw
831              
832             PW value setting for this client. Default not set ( and omit in the request)
833              
834             =head3 autodie
835              
836             Boolean, if false each operation will return undef in case of error (stored in $@). Default is true.
837              
838             =head3 timeout
839              
840             Timeout for connection, write and read operations. Default is 0.5 seconds.
841              
842             =head3 in_timeout
843              
844             Timeout for read operations. Default is timeout value.
845              
846             =head3 out_timeout
847              
848             Timeout for write operations. Default is timeout value.
849              
850             =head3 tcp_nodelay
851              
852             Boolean, enable or disable TCP_NODELAY. If True (default), disables Nagle's Algorithm.
853              
854             See more in: L.
855              
856             =head3 timeout_provider
857              
858             Can change the backend for timeout. The default value is IO::Socket::INET and
859             there is only support to connection timeout.
860              
861             B: in case of any timeout error, the socket between this client and the
862             Riak server will be closed. To support I/O timeout you can choose 5 options (or
863             you can set undef to avoid IO Timeout):
864              
865             =over
866              
867             =item * Riak::Light::Timeout::Alarm
868              
869             uses alarm and Time::HiRes to control the I/O timeout. Does not work on Win32.
870             (Not Safe)
871              
872             =item * Riak::Light::Timeout::Time::Out
873              
874             uses Time::Out and Time::HiRes to control the I/O timeout. Does not work on
875             Win32. (Not Safe)
876              
877             =item * Riak::Light::Timeout::Select
878              
879             uses IO::Select to control the I/O timeout
880              
881             =item * Riak::Light::Timeout::SelectOnWrite
882              
883             uses IO::Select to control only Output Operations. Can block in Write
884             Operations. Be Careful.
885              
886             =item * Riak::Light::Timeout::SetSockOpt
887              
888             uses setsockopt to set SO_RCVTIMEO and SO_SNDTIMEO socket properties. Does not
889             Work on NetBSD 6.0.
890              
891             =back
892              
893             =head3 driver
894              
895             This is a Riak::Light::Driver instance, to be able to connect and perform
896             requests to Riak over PBC interface.
897              
898             =head2 METHODS
899              
900             =head3 is_alive
901              
902             $client->is_alive() or warn "ops... something is wrong: $@";
903              
904             Perform a ping operation. Will return false in case of error (will store in $@).
905              
906             =head3 ping
907              
908             try { $client->ping() } catch { "oops... something is wrong: $_" };
909              
910             Perform a ping operation. Will die in case of error.
911              
912             =head3 set_client_id
913              
914             $client->set_client_id('foobar');
915              
916             Set the client id.
917              
918             =head3 get_client_id
919              
920             my $client_id = $client->get_client_id();
921              
922             Get the client id.
923              
924             =head3 get
925              
926             my $value_or_reference = $client->get(bucket => 'key');
927              
928             Perform a fetch operation. Expects bucket and key names. Decode the json into a
929             Perl structure, if the content_type is 'application/json'. If you need the raw
930             data you can use L.
931              
932             There is a third argument: return_all. Default is false. If true, we will return an hashref with 3 entries:
933             value (the data decoded), indexes and vclock.
934              
935             =head3 get_raw
936              
937             my $scalar_value = $client->get_raw(bucket => 'key');
938              
939             Perform a fetch operation. Expects bucket and key names. Return the raw data.
940             If you need decode the json, you should use L instead.
941              
942             There is a third argument: return_all. Default is false. If true, we will return an hashref with 3 entries:
943             value (the data decoded), indexes and vclock.
944              
945             =head3 get_full
946              
947             my $value_or_reference = $client->get_full(bucket => 'key');
948              
949             Perform a fetch operation. Expects bucket and key names. Will return an hashref with 3 entries:
950             value (the data decoded), indexes and vclock. It is the equivalent to call get(bucket, key, 1)
951              
952             =head3 get_full_raw
953              
954             my $scalar_value = $client->get_full_raw(bucket => 'key');
955              
956             Perform a fetch operation. Expects bucket and key names. Will return an hashref with 3 entries:
957             value (the raw data), indexes and vclock. It is the equivalent to call get_raw(bucket, key, 1)
958              
959             =head3 exists
960              
961             $client->exists(bucket => 'key') or warn "key not found";
962              
963             Perform a fetch operation but with head => 0, and the if there is something
964             stored in the bucket/key.
965              
966             =head3 get_all_indexes
967              
968             $client->get_all_indexes(bucket => 'key');
969              
970             Perform a fetch operation but instead return the content, return a hashref with a mapping between index name and an arrayref with all possible values (or empty arrayref if none). For example one possible return is:
971              
972             [
973             { key => 'index_test_field_bin', value => 'plop' },
974             { key => 'index_test_field2_bin', value => 'plop2' },
975             { key => 'index_test_field2_bin', value => 'plop3' },
976             ]
977              
978             IMPORT: this arrayref is unsortered.
979              
980             =head3 get_index_value
981              
982             Perform a fetch operation, will return an arrayref with all values of the index or undef (if does not exists). There is no order for the array.
983              
984             my $value = $client->get_index_value(bucket => key => 'index_test_field_bin');
985              
986             It is similar to do
987              
988             my $value = $client->get_all_index_values(bucket => 'key')->{index_test_field_bin};
989              
990             =head3 get_all_index_values
991              
992             Perform a fetch operation, will return an hashref with all 2i indexes names as keys, and arrayref of all values for values.
993              
994             =head3 get_vclock
995              
996             Perform a fetch operation, will return the value of the vclock
997              
998             my $vclock = $client->get_vclock(bucket => 'key');
999              
1000             =head3 put
1001              
1002             $client->put('bucket', 'key', { some_values => [1,2,3] });
1003             $client->put('bucket', 'key', { some_values => [1,2,3] }, 'application/json);
1004             $client->put('bucket', 'key', 'text', 'plain/text');
1005              
1006             # you can set secondary indexes (2i)
1007             $client->put( 'bucket', 'key', 'text', 'plain/text',
1008             { field1_bin => 'abc', field2_int => 42 }
1009             );
1010             $client->put( 'bucket', 'key', { some_values => [1,2,3] }, undef,
1011             { field1_bin => 'abc', field2_int => 42 }
1012             );
1013             # remember that a key can have more than one value in a given index. In this
1014             # case, use ArrayRef:
1015             $client->put( 'bucket', 'key', 'value', undef,
1016             { field1_bin => [ 'abc', 'def' ] } );
1017              
1018             Perform a store operation. Expects bucket and key names, the value, the content
1019             type (optional, default is 'application/json'), and the indexes to set for this
1020             value (optional, default is none).
1021              
1022             Will encode the structure in json string if necessary. If you need only store
1023             the raw data you can use L instead.
1024              
1025             B: all the index field names should end by either C<_int> or
1026             C<_bin>, depending if the index type is integer or binary.
1027              
1028             To query secondary indexes, see L.
1029              
1030             =head3 put_raw
1031              
1032             $client->put_raw('bucket', 'key', encode_json({ some_values => [1,2,3] }), 'application/json');
1033             $client->put_raw('bucket', 'key', 'text');
1034             $client->put_raw('bucket', 'key', 'text', undef, {field_bin => 'foo'});
1035              
1036             Perform a store operation. Expects bucket and key names, the value, the content
1037             type (optional, default is 'plain/text'), and the indexes to set for this value
1038             (optional, default is none).
1039              
1040             Will encode the raw data. If you need encode the structure you can use L
1041             instead.
1042              
1043             B: all the index field names should end by either C<_int> or
1044             C<_bin>, depending if the index type is integer or binary.
1045              
1046             To query secondary indexes, see L.
1047              
1048             =head3 del
1049              
1050             $client->del(bucket => key);
1051              
1052             Perform a delete operation. Expects bucket and key names.
1053              
1054             =head3 get_keys
1055              
1056             $client->get_keys(foo => sub{
1057             my $key = $_[0];
1058              
1059             # you should use another client inside this callback!
1060             $another_client->del(foo => $key);
1061             });
1062              
1063             Perform a list keys operation. Receive a callback and will call it for each
1064             key. You can't use this callback to perform other operations!
1065              
1066             The callback is optional, in which case an ArrayRef of all the keys are
1067             returned. But you should always provide a callback, to avoid your RAM usage to
1068             skyrocket...
1069              
1070             =head3 query_index
1071              
1072             Perform a secondary index query. Expects a bucket name, the index field name,
1073             and the index value you're searching on. Returns and ArrayRef of matching keys.
1074              
1075             The index value you're searching on can be of two types. If it's a scalar, an
1076             B query will be performed. if the value is an ArrayRef, then a
1077             B query will be performed, the first element in the array will be the
1078             range_min, the second element the range_max. other elements will be ignored.
1079              
1080             Based on the example in C, here is how to query it:
1081              
1082             # exact match
1083             my $matching_keys = $client->query_index( 'bucket', 'field2_int', 42 );
1084              
1085             # range match
1086             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50] );
1087              
1088             # with pagination
1089             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, { max_results => 100 });
1090              
1091             to fetch the next 100 keys
1092              
1093             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, {
1094             max_results => 100,
1095             continuation => $continuation
1096             });
1097              
1098             to fetch only the first 100 keys you can do this
1099              
1100             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50], { max_results => 100 });
1101              
1102             =head3 query_index_loop
1103              
1104             Instead using a normal loop around query_index to query 2i with pagination, like this:
1105              
1106             do {
1107             ($matching_keys, $continuation) = $client->query_index( 'bucket', 'field2_int', 42, {
1108             max_results => 100,
1109             continuation => $continuation
1110             });
1111             push @keys, @{$matching_keys};
1112             } while(defined $continuation);
1113              
1114             you can simply use query_index_loop helper method
1115              
1116             my $matching_keys = $client->query_index_loop( 'bucket', 'field2_int', [ 40, 50], { max_results => 1024 });
1117              
1118             if you omit the max_results, the default value is 100
1119              
1120             =head3 map_reduce
1121              
1122             This is an alias for map_reduce_raw with content-type 'application/json'
1123              
1124             =head3 map_reduce_raw
1125              
1126             Performa a map/reduce operation. You can use content-type 'application/json' or 'application/x-erlang-binary' Accept callback.
1127              
1128             Example:
1129              
1130             my $map_reduce_json = '{
1131             "inputs":"training",
1132             "query":[{"map":{"language":"javascript",
1133             "source":"function(riakObject) {
1134             var val = riakObject.values[0].data.match(/pizza/g);
1135             return [[riakObject.key, (val ? val.length : 0 )]];
1136             }"}}]}';
1137            
1138             my $response = $client->map_reduce_raw($map_reduce_json, 'application/json');
1139              
1140             will return something like
1141              
1142             [
1143             {'response' => [['foo',1]],'phase' => 0},
1144             {'response' => [['bam',3]],'phase' => 0},
1145             {'response' => [['bar',4]],'phase' => 0},
1146             {'response' => [['baz',0]],'phase' => 0}
1147             ]
1148              
1149             a hashref with response (decoded if json) and phase value. you can also pass a callback
1150              
1151             $client->map_reduce( $map_reduce_json , sub {
1152             my ($response, $phase) = @_;
1153            
1154             # process the response
1155             });
1156              
1157             this callback will be called 4 times, with this response (decoded from json)
1158              
1159             [['foo', 1]]
1160             [['bam', 3]]
1161             [['bar', 4]]
1162             [['baz', 0]]
1163              
1164             using map_reduce method, you can also use a hashref as a map reduce query:
1165              
1166             my $json_hash = {
1167             inputs => "training",
1168             query => [{
1169             map => {
1170             language =>"javascript",
1171             source =>"function(riakObject) {
1172             var val = riakObject.values[0].data.match(/pizza/g);
1173             return [[riakObject.key, (val ? val.length : 0 )]];
1174             }"
1175             }
1176             }]
1177             };
1178            
1179             $client->map_reduce($json_hash, sub { ... });
1180              
1181             map_reduce encode/decode to json format. If you need control with the format (like to use with erlang), you should use map_reduce_raw.
1182              
1183             you can use erlang functions but using the json format (see L).
1184              
1185             {"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}
1186              
1187             More information:
1188              
1189             L
1190              
1191             L
1192              
1193             L
1194              
1195             =head1 SEE ALSO
1196              
1197             L
1198              
1199             L
1200              
1201             L
1202              
1203             L
1204              
1205             =head1 AUTHORS
1206              
1207             =over 4
1208              
1209             =item *
1210              
1211             Tiago Peczenyj
1212              
1213             =item *
1214              
1215             Damien Krotkine
1216              
1217             =back
1218              
1219             =head1 COPYRIGHT AND LICENSE
1220              
1221             This software is copyright (c) 2013 by Weborama.
1222              
1223             This is free software; you can redistribute it and/or modify it under
1224             the same terms as the Perl 5 programming language system itself.
1225              
1226             =cut
1227              
1228              
1229             __END__