File Coverage

blib/lib/Riak/Client.pm
Criterion Covered Total %
statement 47 260 18.0
branch 1 94 1.0
condition 0 37 0.0
subroutine 16 46 34.7
pod 16 17 94.1
total 80 454 17.6


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Client
3             #
4             # This software is copyright (c) 2014 by Damien Krotkine.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             package Riak::Client;
10             $Riak::Client::VERSION = '1.96';
11 1     1   15584 use 5.010;
  1         2  
  1         26  
12 1     1   384 use Riak::Client::PBC;
  1         3  
  1         57  
13 1     1   634 use Type::Params qw(compile);
  1         70429  
  1         7  
14 1     1   227 use Types::Standard -types;
  1         2  
  1         4  
15 1     1   4010 use Errno qw(EINTR);
  1         980  
  1         97  
16 1     1   7 use Scalar::Util qw(blessed);
  1         2  
  1         57  
17 1     1   673 use JSON::XS;
  1         4754  
  1         77  
18 1     1   7 use Carp;
  1         3  
  1         81  
19             $Carp::Internal{ (__PACKAGE__) }++;
20 1     1   501 use Module::Runtime qw(use_module);
  1         1686  
  1         5  
21             require bytes;
22 1     1   1269 use Moo;
  1         9339  
  1         4  
23              
24 1     1   8098 use IO::Socket::INET;
  1         15218  
  1         6  
25 1     1   951 use IO::Socket::Timeout;
  1         2843  
  1         6  
26              
27 1     1   26 use Scalar::Util qw(weaken);
  1         1  
  1         61  
28              
29             use constant {
30             # error
31 1         416 ERROR_RESPONSE_CODE => 0,
32             # ping
33             PING_REQUEST_CODE => 1,
34             PING_RESPONSE_CODE => 2,
35             # get, get_raw
36             GET_REQUEST_CODE => 9,
37             GET_RESPONSE_CODE => 10,
38             # put, put_raw
39             PUT_REQUEST_CODE => 11,
40             PUT_RESPONSE_CODE => 12,
41             # del
42             DEL_REQUEST_CODE => 13,
43             DEL_RESPONSE_CODE => 14,
44             # get_buckets
45             GET_BUCKETS_REQUEST_CODE => 15,
46             GET_BUCKETS_RESPONSE_CODE => 16,
47             # get_keys
48             GET_KEYS_REQUEST_CODE => 17,
49             GET_KEYS_RESPONSE_CODE => 18,
50             # get_bucket_props
51             GET_BUCKET_PROPS_REQUEST_CODE => 19,
52             GET_BUCKET_PROPS_RESPONSE_CODE => 20,
53             # set_bucket_props
54             SET_BUCKET_PROPS_REQUEST_CODE => 21,
55             SET_BUCKET_PROPS_RESPONSE_CODE => 22,
56             # map_reducd
57             MAP_REDUCE_REQUEST_CODE => 23,
58             MAP_REDUCE_RESPONSE_CODE => 24,
59             # query_index
60             QUERY_INDEX_REQUEST_CODE => 25,
61             QUERY_INDEX_RESPONSE_CODE => 26,
62 1     1   5 };
  1         1  
63              
64              
65             # ABSTRACT: Fast and lightweight Perl client for Riak
66              
67              
68             has host => ( is => 'ro', isa => Str, required => 1 );
69             has port => ( is => 'ro', isa => Int, required => 1 );
70             has r => ( is => 'ro', isa => Int, default => sub {2} );
71             has w => ( is => 'ro', isa => Int, default => sub {2} );
72             has dw => ( is => 'ro', isa => Int, default => sub {1} );
73             has connection_timeout => ( is => 'ro', isa => Num, default => sub {5} );
74             has read_timeout => ( is => 'ro', isa => Num, default => sub {5} );
75             has write_timeout => ( is => 'ro', isa => Num, default => sub {5} );
76             has no_delay => ( is => 'ro', isa => Bool, default => sub {0} );
77              
78              
79             has no_auto_connect => ( is => 'ro', isa => Bool, default => sub {0} );
80              
81             has _on_connect_cb => ( is => 'rw' );
82              
83             has _requests_lock => ( is => 'rw', default => sub { undef });
84              
85             has _socket => ( is => 'ro', lazy => 1, builder => 1 );
86             sub _build__socket {
87 0     0   0 my ($self) = @_;
88              
89 0         0 my $host = $self->host;
90 0         0 my $port = $self->port;
91              
92 0         0 my $socket = IO::Socket::INET->new(
93             PeerHost => $host,
94             PeerPort => $port,
95             Timeout => $self->connection_timeout,
96             );
97              
98 0 0       0 croak "Error ($!), can't connect to $host:$port"
99             unless defined $socket;
100              
101 0 0 0     0 if ($self->read_timeout || $self->write_timeout) {
102             # enable read and write timeouts on the socket
103 0         0 IO::Socket::Timeout->enable_timeouts_on($socket);
104             # setup the timeouts
105 0         0 $socket->read_timeout($self->read_timeout);
106 0         0 $socket->write_timeout($self->write_timeout);
107             }
108              
109 1     1   5 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         1  
  1         2880  
110 0 0       0 $self->no_delay
111             and $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
112              
113 0         0 return $socket;
114             }
115              
116             sub BUILD {
117 3     3 0 45 my ($self) = @_;
118 3 50       53 $self->no_auto_connect
119             and return;
120              
121 0           $self->connect();
122             }
123              
124              
125             sub connect {
126 0     0 1   state $check = compile(Any, Optional[CodeRef]);
127 0           my ( $self, $cb ) = $check->(@_);
128              
129             # that will perform connection
130 0           $self->_socket();
131 0 0         if ($cb) {
132 0           $cb->();
133 0           return;
134             } else {
135 0           return 1;
136             }
137              
138             }
139              
140             has _getkeys_accumulator => (is => 'rw', init_arg => undef);
141             has _mapreduce_accumulator => (is => 'rw', init_arg => undef);
142              
143              
144             sub ping {
145 0     0 1   state $check = compile(Any, Optional[CodeRef]);
146 0           my ( $self, $cb ) = $check->(@_);
147 0           $_[0]->_parse_response( {
148             request_code => PING_REQUEST_CODE,
149             expected_code => PING_RESPONSE_CODE,
150             operation_name => 'ping',
151             body_ref => \'',
152             cb => $cb,
153             } );
154             }
155              
156              
157             sub is_alive {
158 0     0 1   state $check = compile(Any, Optional[CodeRef]);
159 0           my ( $self, $cb ) = $check->(@_);
160 0           my $res = eval { $self->ping; 1 };
  0            
  0            
161 0 0         $cb and return $cb->($res);
162 0           return $res;
163             }
164              
165              
166             sub get {
167 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
168 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
169 0           $self->_fetch( $bucket, $key, 1, 0, $cb );
170             }
171              
172              
173             sub get_raw {
174 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
175 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
176 0           $self->_fetch( $bucket, $key, 0, 0, $cb );
177             }
178              
179              
180             #my $LinksStructure = declare as ArrayRef[Dict[bucket => Str, key => Str, tag => Str]];
181             #coerce $LinksStructure, from HashRef[] Num, q{ int($_) };
182              
183             sub put {
184 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
185 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
186             Optional[HashRef[Str]], # indexes
187             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
188             );
189 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
190              
191 0 0 0       ($content_type //= 'application/json')
192             eq 'application/json'
193             and $value = encode_json($value);
194              
195 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
196             }
197              
198              
199              
200             sub put_raw {
201 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
202 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
203             Optional[HashRef[Str]], # indexes
204             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
205             );
206 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
207              
208 0   0       $content_type ||= 'plain/text';
209 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
210             }
211              
212              
213             sub del {
214 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
215 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
216              
217 0           my $body = RpbDelReq->encode(
218             { key => $key,
219             bucket => $bucket,
220             rw => $self->dw
221             }
222             );
223              
224 0           $self->_parse_response( {
225             request_code => DEL_REQUEST_CODE,
226             expected_code => DEL_RESPONSE_CODE,
227             operation_name => 'del',
228             key => $key,
229             bucket => $bucket,
230             body_ref => \$body,
231             cb => $cb,
232             } );
233             }
234              
235              
236             sub get_keys {
237 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
238 0           my ( $self, $bucket, $cb ) = $check->(@_);
239              
240             # reset accumulator
241 0           $self->_getkeys_accumulator([]);
242 0           my $body = RpbListKeysReq->encode( { bucket => $bucket } );
243 0           $self->_parse_response( {
244             request_code => GET_KEYS_REQUEST_CODE,
245             expected_code => GET_KEYS_RESPONSE_CODE,
246             operation_name => 'get_keys',
247             key => "*",
248             bucket => $bucket,
249             body_ref => \$body,
250             cb => $cb,
251             handle_response => \&_handle_get_keys_response,
252             lock_requests => 1,
253             } );
254             }
255              
256             sub _handle_get_keys_response {
257 0     0     my ( $self, $encoded_message, $args ) = @_;
258              
259             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
260 0           my $obj = RpbListKeysResp->decode( $encoded_message );
261 0   0       my @keys = @{$obj->keys // []};
  0            
262              
263             # case 1 : no user callback
264 0           my $cb = $args->{cb};
265 0 0         if (! $cb ) {
266             # accumulate results
267 0           push @{$self->_getkeys_accumulator}, @keys;
  0            
268              
269             # if more to come, return by saying so
270 0 0         $obj->done
271             or return (undef, 1);
272              
273             # all results are there, return the whole
274 0           my $keys = $self->_getkeys_accumulator;
275 0           $self->_getkeys_accumulator([]);
276 0           return \$keys;
277             }
278              
279             # case 2 : we have a user callback
280 0           my $last_key;
281 0 0         my $obj_done = $obj->done
282             and $last_key = pop @keys;
283              
284             # no second arg = more to come
285 0           $cb->($_) foreach @keys;
286              
287             # if more to come, return by saying so
288 0 0         $obj->done
289             or return (undef, 1);
290              
291             # process last keys if any
292 0 0         defined $last_key and $cb->($last_key, 1);
293              
294             # means: nothing left to do, all results processed through callback
295 0           return;
296             }
297              
298              
299             sub exists {
300 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
301 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
302 0           $self->_fetch( $bucket, $key, 0, 1, $cb );
303             }
304              
305             sub _fetch {
306 0     0     my ( $self, $bucket, $key, $decode, $test_exist, $cb ) = @_;
307              
308 0           my $body = RpbGetReq->encode(
309             { r => $self->r,
310             key => $key,
311             bucket => $bucket,
312             head => $test_exist
313             }
314             );
315              
316 0           $self->_parse_response( {
317             request_code => GET_REQUEST_CODE,
318             expected_code => GET_RESPONSE_CODE,
319             operation_name => 'get',
320             key => $key,
321             bucket => $bucket,
322             body_ref => \$body,
323             decode => $decode,
324             handle_response => \&_handle_get_response,
325             test_exist => $test_exist,
326             cb => $cb,
327             cb_args => 1,
328             } );
329             }
330              
331             sub _handle_get_response {
332 0     0     my ( $self, $encoded_message, $args ) = @_;
333              
334 0 0         defined $encoded_message
335             or return _die_generic_error( "Undefined Message", 'get', $args );
336              
337 0           my $decoded_message = RpbGetResp->decode($encoded_message);
338 0           my $content = $decoded_message->content;
339              
340             # empty content
341 0 0         ref $content eq 'ARRAY'
342             or return \undef;
343              
344             # if we just need to test existence
345 0 0         $args->{test_exist}
346             and return \1;
347              
348             # TODO: handle metadata
349 0           my $value = $content->[0]->value;
350 0           my $content_type = $content->[0]->content_type;
351              
352             # if we need to decode
353 0 0 0       $args->{decode} && ($content_type // '') eq 'application/json'
      0        
354             and return \decode_json($value);
355              
356             # simply return the value
357 0           return \$value;
358             }
359              
360             sub _store {
361 0     0     my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes, $links, $cb ) = @_;
362              
363 0           my $body = RpbPutReq->encode(
364             { key => $key,
365             bucket => $bucket,
366             content => {
367             value => $encoded_value,
368             content_type => $content_type,
369             ( $indexes ?
370             ( indexes => [
371             map {
372 0 0         { key => $_ , value => $indexes->{$_} }
    0          
373             } keys %$indexes
374             ])
375             : ()
376             ),
377             ( $links ? ( links => $links) : () ),
378             },
379             }
380             );
381              
382 0           $self->_parse_response( {
383             request_code => PUT_REQUEST_CODE,
384             expected_code => PUT_RESPONSE_CODE,
385             operation_name => 'put',
386             key => $key,
387             bucket => $bucket,
388             body_ref => \$body,
389             cb => $cb,
390             } );
391             }
392              
393              
394             sub query_index {
395 0     0 1   state $check = compile(Any, Str, Str, Str|ArrayRef, Optional[CodeRef]);
396 0           my ( $self, $bucket, $index, $value_to_match, $cb ) = $check->(@_);
397              
398 0           my $query_type_is_eq = 0; # eq
399 0 0         ref $value_to_match
400             and $query_type_is_eq = 1; # range
401 0 0         my $body = RpbIndexReq->encode(
402             { index => $index,
403             bucket => $bucket,
404             qtype => $query_type_is_eq,
405             $query_type_is_eq ?
406             ( range_min => $value_to_match->[0],
407             range_max => $value_to_match->[1] )
408             : (key => $value_to_match ),
409             }
410             );
411            
412 0 0         $self->_parse_response( {
413             request_code => QUERY_INDEX_REQUEST_CODE,
414             expected_code => QUERY_INDEX_RESPONSE_CODE,
415             operation_name => 'query_index',
416             $query_type_is_eq ?
417             (key => '2i query on ' . join('...', @$value_to_match) )
418             : (key => $value_to_match ),
419             bucket => $bucket,
420             body_ref => \$body,
421             handle_response => \&_handle_query_index_response,
422             cb => $cb,
423             lock_requests => 1,
424             } );
425             }
426              
427             sub _handle_query_index_response {
428 0     0     my ( $self, $encoded_message, $args ) = @_;
429            
430 0           my $obj = RpbIndexResp->decode( $encoded_message );
431 0   0       my @keys = @{$obj->keys // []};
  0            
432              
433             # case 1 : no user callback
434 0 0         my $cb = $args->{cb}
435             or return \\@keys;
436              
437             # case 2 : we have a user callback
438 0           $cb->($_) foreach @keys;
439              
440             # means: nothing left to do, all results processed through callback
441 0           return;
442              
443             }
444              
445              
446             sub get_buckets {
447 0     0 1   state $check = compile(Any, Optional[CodeRef]);
448 0           my ( $self, $cb ) = $check->(@_);
449              
450 0           $self->_parse_response( {
451             request_code => GET_BUCKETS_REQUEST_CODE,
452             expected_code => GET_BUCKETS_RESPONSE_CODE,
453             operation_name => 'get_buckets',
454             handle_response => \&_handle_get_buckets_response,
455             cb => $cb,
456             } );
457             }
458              
459             sub _handle_get_buckets_response {
460 0     0     my ( $self, $encoded_message, $args ) = @_;
461 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
462 0   0       return \($obj->buckets // []);
463             }
464              
465              
466             sub get_bucket_props {
467 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
468 0           my ( $self, $bucket, $cb ) = $check->(@_);
469              
470 0           my $body = RpbGetBucketReq->encode( { bucket => $bucket } );
471 0           $self->_parse_response( {
472             request_code => GET_BUCKET_PROPS_REQUEST_CODE,
473             expected_code => GET_BUCKET_PROPS_RESPONSE_CODE,
474             bucket => $bucket,
475             body_ref => \$body,
476             handle_response => \&_handle_get_bucket_props_response,
477             cb => $cb,
478             } );
479             }
480              
481             sub _handle_get_bucket_props_response {
482 0     0     my ( $self, $encoded_message, $args ) = @_;
483              
484 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
485 0           my $props = RpbBucketProps->decode($obj->buckets->[0]);
486 0           return \{ %$props }; # unblessing variable
487             }
488              
489              
490             sub set_bucket_props {
491 0     0 1   state $check = compile( Any, Str,
492             Dict,
493             Optional[CodeRef] );
494 0           my ( $self, $bucket, $props, $cb ) = $check->(@_);
495 0 0 0       $props->{n_val} && $props->{n_val} < 0 and croak 'n_val should be possitive integer';
496              
497 0           my $body = RpbSetBucketReq->encode({ bucket => $bucket, props => $props });
498 0           $self->_parse_response( {
499             request_code => SET_BUCKET_PROPS_REQUEST_CODE,
500             expected_code => SET_BUCKET_PROPS_RESPONSE_CODE,
501             bucket => $bucket,
502             body_ref => \$body,
503             } );
504             }
505              
506              
507             sub map_reduce {
508 0     0 1   state $check = compile(Any, Any, Optional[CodeRef]);
509 0           my ( $self, $request, $cb) = $check->(@_);
510              
511 0           my @args;
512            
513 0 0         push @args, ref($request) ? encode_json($request): $request;
514 0           push @args, 'application/json';
515 0 0         push @args, $cb if $cb;
516            
517 0           map_reduce_raw($self, @args);
518              
519             }
520              
521              
522             sub map_reduce_raw {
523 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
524 0           my ( $self, $request, $content_type, $cb) = $check->(@_);
525            
526 0           my $body = RpbMapRedReq->encode(
527             {
528             request => $request,
529             content_type => $content_type,
530             }
531             );
532              
533             # reset accumulator
534 0           $self->_mapreduce_accumulator([]);
535              
536 0           $self->_parse_response( {
537             request_code => MAP_REDUCE_REQUEST_CODE,
538             expected_code => MAP_REDUCE_RESPONSE_CODE,
539             operation => 'map_reduce',
540             body_ref => \$body,
541             cb => $cb,
542             decode => ($content_type eq 'application/json'),
543             handle_response => \&_handle_map_reduce_response,
544             lock_requests => 1,
545             } );
546             }
547              
548             sub _handle_map_reduce_response {
549 0     0     my ( $self, $encoded_message, $args ) = @_;
550 0           my $obj = RpbMapRedResp->decode( $encoded_message );
551              
552             # case 1 : no user callback
553 0           my $cb = $args->{cb};
554 0 0         if (! $cb ) {
555              
556             # all results were there, reset the accumulator and return the whole,
557 0 0         if ($obj->done) {
558 0           my $results = $self->_mapreduce_accumulator();
559 0           $self->_mapreduce_accumulator([]);
560 0           return \$results;
561             }
562              
563             # accumulate results
564 0 0 0       push @{$self->_mapreduce_accumulator},
  0            
565             { phase => $obj->phase, response => ($args->{decode}) ? decode_json($obj->response // '[]') : $obj->response };
566              
567             # more stuff to come, say so
568 0           return (undef, 1);
569              
570             }
571              
572             # case 2 : we have a user callback
573              
574             # means: nothing left to do, all results processed through callback
575             $obj->done
576 0 0         and return;
577              
578 0           $cb->($obj->response, $obj->phase, $obj->done);
579              
580             # more stuff to come, say so
581 0           return (undef, 1);
582              
583             }
584              
585             sub _parse_response {
586 0     0     my ( $self, $args ) = @_;
587              
588 0           my $socket = $self->_socket;
589 0   0       _send_bytes($socket, $args->{request_code}, $args->{body_ref} // \'');
590              
591 0           while (1) {
592 0           my $response;
593             # get and check response
594 0 0 0       my $raw_response_ref = _read_response($socket)
595             or return _die_generic_error( $! || "Socket Closed", $args);
596              
597 0           my ( $response_code, $response_body ) = unpack( 'c a*', $$raw_response_ref );
598              
599             # in case of error msg
600 0 0         if ($response_code == ERROR_RESPONSE_CODE) {
601 0           my $decoded_message = RpbErrorResp->decode($response_body);
602 0           my $errmsg = $decoded_message->errmsg;
603 0           my $errcode = $decoded_message->errcode;
604              
605 0           return _die_generic_error( "Riak Error (code: $errcode) '$errmsg'", $args);
606             }
607              
608              
609             # check if we have what we want
610 0 0         $response_code != $args->{expected_code}
611             and return _die_generic_error(
612             "Unexpected Response Code in (got: $response_code, expected: $args->{expected_code})",
613             $args );
614            
615             # default value if we don't need to handle the response.
616 0           my ($ret, $more_to_come) = ( \1, undef);
617              
618             # handle the response.
619 0 0         if (my $handle_response = $args->{handle_response}) {
620 0           ($ret, $more_to_come) = $handle_response->( $self, $response_body, $args);
621             }
622              
623             # it's a multiple response request, loop again
624             $more_to_come
625 0 0         and next;
626              
627             # there is a result, process or return it
628 0 0         if ($ret) {
629 0 0         $args->{cb} and return $args->{cb}->($$ret);
630 0           return $$ret;
631             }
632              
633             # ret was undef, means we have processed everything in the callback
634 0           return;
635              
636             }
637             }
638              
639             sub _die_generic_error {
640 0     0     my ( $error, $args ) = @_;
641              
642 0   0       my ($operation_name, $bucket, $key) =
643 0           map { $args->{$_} // "" } ( qw( operation_name bucket key) );
644              
645 0           my $extra = '';
646 0 0 0       defined $bucket && defined $key
647             and $extra = "(bucket: $bucket, key: $key) ";
648              
649 0           my $msg = "Error in '$operation_name' $extra: $error";
650 0 0         if ( my $cb = $args->{cb} ) {
    0          
651 0   0       $cb->((undef) x ($args->{cb_nb_args} // 0), $msg);
652 0           return;
653             } elsif (my $cv = $args->{cv}) {
654 0           $cv->croak($msg);
655             } else {
656 0           croak $msg;
657             }
658 0           return;
659             }
660              
661             sub _read_response {
662 0     0     my ($socket) = @_;
663 0   0       _read_bytes($socket, unpack( 'N', ${ _read_bytes($socket, 4) // return } ));
  0            
664             }
665              
666             sub _read_bytes {
667 0     0     my ( $socket, $length ) = @_;
668              
669 0           my $buffer;
670 0           my $offset = 0;
671 0           my $read = 0;
672              
673 0           while ($length > 0) {
674 0           $read = $socket->sysread( $buffer, $length, $offset );
675 0 0         if (! defined $read) {
676 0 0         $! == EINTR
677             and next;
678 0           return;
679             }
680              
681 0 0         $read > 0
682             or return;
683              
684 0           $offset += $read;
685 0           $length -= $read;
686             }
687              
688 0           return \$buffer;
689             }
690              
691              
692             sub _send_bytes {
693 0     0     my ( $socket, $request_code, $body_ref ) = @_;
694              
695 0           my $bytes = pack('N', my $length = (bytes::length($$body_ref) + 1)) . pack('c', $request_code) . $$body_ref;
696              
697 0           $length += 4;
698 0           my $offset = 0;
699 0           my $sent = 0;
700              
701 0           while ($length > 0) {
702 0           $sent = $socket->syswrite( $bytes, $length, $offset );
703 0 0         if (! defined $sent) {
704 0 0         $! == EINTR
705             and next;
706 0           return;
707             }
708              
709 0 0         $sent > 0
710             or return;
711              
712 0           $offset += $sent;
713 0           $length -= $sent;
714             }
715              
716 0           return $offset;
717             }
718              
719              
720              
721             1;
722              
723             __END__