File Coverage

blib/lib/RPC/ExtDirect/Client/Async.pm
Criterion Covered Total %
statement 155 161 96.2
branch 43 60 71.6
condition 9 12 75.0
subroutine 32 36 88.8
pod 7 9 77.7
total 246 278 88.4


line stmt bran cond sub pod time code
1             package RPC::ExtDirect::Client::Async;
2              
3 12     12   1043403 use strict;
  12         26  
  12         489  
4 12     12   61 use warnings;
  12         23  
  12         370  
5              
6 12     12   62 use Carp;
  12         25  
  12         806  
7 12     12   70 use File::Spec;
  12         23  
  12         270  
8 12     12   2331 use AnyEvent::HTTP;
  12         78625  
  12         722  
9              
10 12     12   7443 use RPC::ExtDirect::Util::Accessor;
  12         7290  
  12         317  
11 12     12   7194 use RPC::ExtDirect::Config;
  12         100788  
  12         355  
12 12     12   8717 use RPC::ExtDirect::API;
  12         174187  
  12         100  
13 12     12   7223 use RPC::ExtDirect;
  12         58162  
  12         80  
14 12     12   11825 use RPC::ExtDirect::Client;
  12         79597  
  12         370  
15              
16 12     12   84 use base 'RPC::ExtDirect::Client';
  12         21  
  12         8294  
17              
18             #
19             # This module is not compatible with RPC::ExtDirect < 3.0
20             #
21              
22             croak __PACKAGE__." requires RPC::ExtDirect 3.0+"
23             if $RPC::ExtDirect::VERSION lt '3.0';
24              
25             ### PACKAGE GLOBAL VARIABLE ###
26             #
27             # Module version
28             #
29              
30             our $VERSION = '1.00';
31              
32             ### PUBLIC INSTANCE METHOD ###
33             #
34             # Call specified Action's Method asynchronously
35             #
36              
37 29     29 1 198090 sub call_async { shift->async_request('call', @_) }
38              
39             ### PUBLIC INSTANCE METHOD ###
40             #
41             # Submit a form to specified Action's Method asynchronously
42             #
43              
44 10     10 1 11772 sub submit_async { shift->async_request('form', @_) }
45              
46             ### PUBLIC INSTANCE METHOD ###
47             #
48             # Upload a file using POST form. Same as submit_async()
49             #
50              
51             *upload_async = *submit_async;
52              
53             ### PUBLIC INSTANCE METHOD ###
54             #
55             # Poll server for events asynchronously
56             #
57              
58 8     8 1 2742 sub poll_async { shift->async_request('poll', @_) }
59              
60             #
61             # This is to prevent mistakes leading to hard to find bugs
62             #
63              
64 0     0 1 0 sub call { croak "Use call_async instead" }
65 0     0 1 0 sub submit { croak "Use submit_async instead" }
66 0     0 1 0 sub upload { croak "Use upload_async instead" }
67 0     0 1 0 sub poll { croak "Use poll_async instead" }
68              
69             ### PUBLIC INSTANCE METHOD ###
70             #
71             # Run a specified request type asynchronously
72             #
73              
74             sub async_request {
75 47     47 0 112 my $self = shift;
76 47         155 my $type = shift;
77            
78 47         210 my $tr_class = $self->transaction_class;
79            
80             # We try to avoid action-at-a-distance here, so we will
81             # call all the stuff that could die() up front, to pass
82             # the exception on to the caller immediately rather than
83             # blowing up later on.
84             # The only case when that may happen realistically is
85             # when the caller forgot to specify a callback coderef;
86             # anything else is passed as an $error to the callback
87             # (which is hard to do when it's missing).
88 47         86 eval {
89 47         460 my $transaction = $tr_class->new(@_);
90 45         168 $self->_async_request($type, $transaction);
91             };
92            
93 47 50       151 if ($@) { croak 'ARRAY' eq ref($@) ? $@->[0] : $@ };
  2 100       720  
94            
95             # Stay positive
96 45         126 return 1;
97             }
98              
99             ### PUBLIC INSTANCE METHOD ###
100             #
101             # Return the name of the Transaction class
102             #
103              
104 47     47 0 175 sub transaction_class { 'RPC::ExtDirect::Client::Async::Transaction' }
105              
106             ### PUBLIC INSTANCE METHOD ###
107             #
108             # Read-write accessor
109             #
110              
111             RPC::ExtDirect::Util::Accessor->mk_accessor(
112             simple => [qw/ api_ready exception request_queue /],
113             );
114              
115             ############## PRIVATE METHODS BELOW ##############
116              
117             ### PRIVATE INSTANCE METHOD ###
118             #
119             # Initialize API declaration
120             #
121              
122             sub _init_api {
123 12     12   833649 my ($self, $api) = @_;
124            
125             # If we're passed a local API instance, init immediately
126             # and don't bother with request queue - we won't need it anyway.
127 12 100       112 if ($api) {
128 1         32 my $cv = $self->cv;
129 1         37 my $api_cb = $self->api_cb;
130            
131 1 50       294 $cv->begin if $cv;
132            
133 1         19 $self->_assign_api($api);
134 1         366 $self->api_ready(1);
135            
136 1 50       18 $api_cb->($self, 1) if $api_cb;
137            
138 1 50       16 $cv->end if $cv;
139             }
140             else {
141            
142             # We want to be truly asynchronous, so instead of blocking
143             # on API retrieval, we create a request queue and return
144             # immediately. If any call/form/poll requests happen before
145             # we've got the API result back, we push them in the queue
146             # and wait for the API to arrive, then re-run the requests.
147             # After the API declaration has been retrieved, all subsequent
148             # requests run without queuing.
149 11         489 $self->request_queue([]);
150              
151             $self->_get_api(sub {
152 11     11   206 my ($success, $api_js, $error) = @_;
153            
154 11 100       65 if ( $success ) {
155 10         263 $self->_import_api($api_js);
156 10         34270 $self->api_ready(1);
157             }
158             else {
159 1         33 $self->exception($error);
160             }
161            
162 11 100       761 $self->api_cb->($self, $success, $error) if $self->api_cb;
163            
164 11         6010 my $queue = $self->request_queue;
165 11         112 delete $self->{request_queue}; # A bit quirky
166            
167 11         361 $_->($success, $error) for @$queue;
168 11         359 });
169             }
170            
171 12         151 return 1;
172             }
173              
174             ### PRIVATE INSTANCE METHOD ###
175             #
176             # Request API declaration from the specified server asynchronously
177             # and pass the outcome (success flag, content, error) to the callback
178             #
179              
180             sub _get_api {
181 11     11   41 my ($self, $cb) = @_;
182              
183             # Run additional checks before firing the curried callback
184             my $api_cb = sub {
185 11     11   396201 my ($content, $headers) = @_;
186              
187 11         51 my $status = $headers->{Status};
188 12     12   11782 my $content_length = do { use bytes; length $content; };
  12         115  
  12         64  
  11         29  
  11         35  
189 11   66     180 my $success = $status eq '200' && $content_length > 0;
190 11         30 my $error;
191            
192 11 100       52 if ( !$success ) {
193 1 50       5 if ( $status ne '200' ) {
    0          
194 1         4 $error = "Can't download API declaration: $status";
195             }
196             elsif ( !$content_length ) {
197 0         0 $error = "Empty API declaration received";
198             }
199             }
200            
201 11         1711 my $cv = $self->cv;
202 11 50       230 $cv->end if $cv;
203            
204 11         226 $self->{api_guard} = undef;
205            
206 11         74 $cb->($success, $content, $error);
207 11         112 };
208            
209 11         621 my $cv = $self->cv;
210 11         396 my $uri = $self->_get_uri('api');
211 11         2067 my $params = $self->{http_params};
212            
213 11 50       1416 $cv->begin if $cv;
214            
215             #
216             # Note that we're passing a falsy value to the `persistent` option
217             # here; that's because without it, GET requests will generate some
218             # weird 596 error code responses for every request after the very
219             # first one, if a condvar is used.
220             #
221             # I can surmise that it has something to do with AnyEvent::HTTP
222             # having a procedural interface without any clear way to separate
223             # requests. Probably something within the (very tangled) bowels
224             # of AnyEvent::HTTP::http_request is erroneously confusing condvars;
225             # in any case, turning off persistent connections seems to cure that.
226             #
227             # You can override that by passing `persistent => 1` to the Client
228             # constructor, but don't try to do that if you are not ready to
229             # spend HOURS untangling the callback hell inside http_request.
230             # I was not, hence the "fix".
231             #
232             # Also store the "cancellation guard" to prevent it being destroyed,
233             # which would end the request prematurely.
234             #
235 11         836 $self->{api_guard} = AnyEvent::HTTP::http_request(
236             GET => $uri,
237             persistent => !1,
238             %$params,
239             $api_cb,
240             );
241            
242 11         198945 return 1;
243             }
244              
245             ### PRIVATE INSTANCE METHOD ###
246             #
247             # Queue asynchronous request(s)
248             #
249              
250             sub _queue_request {
251 23     23   35 my $self = shift;
252            
253 23         49 my $queue = $self->{request_queue};
254            
255 23         60 push @$queue, @_;
256             }
257              
258             ### PRIVATE INSTANCE METHOD ###
259             #
260             # Make an HTTP request in asynchronous fashion
261             #
262              
263             sub _async_request {
264 45     45   88 my ($self, $type, $transaction) = @_;
265            
266             # Transaction should be primed *before* the request has been
267             # dispatched. This way we ensure that requests don't get stuck
268             # in the queue if something goes wrong (API retrieval fails, etc).
269             # Also if we're passed a cv this will prime it enough times so
270             # that any blocking later on won't end prematurely before *all*
271             # queued requests have had a chance to run.
272 45         147 $transaction->start;
273            
274             # The parameters to this sub ($api_success, $api_error) mean
275             # success of the API retrieval operation, and an error that caused
276             # the failure, if any. This should NOT be confused with success
277             # of the HTTP request below.
278             my $request_closure = sub {
279 45     45   105 my ($api_success, $api_error) = @_;
280            
281             # If request was queued and API retrieval failed,
282             # transaction still has to finish.
283 45 100       140 return $transaction->finish(undef, $api_success, $api_error)
284             unless $api_success;
285            
286 44         141 my $prepare = "_prepare_${type}_request";
287 44 100       154 my $method = $type eq 'poll' ? 'GET' : 'POST';
288              
289             # We can't allow an exception to be thrown - there is no
290             # enveloping code to handle it. So we catch it here instead,
291             # and pass it to the transaction to be treated as an error.
292             # Note that the transaction itself has already been started
293             # before the request closure was executed.
294             my ($uri, $request_content, $http_params, $request_options)
295 44         79 = eval { $self->$prepare($transaction) };
  44         477  
296            
297 44 100       9195 if ( my $xcpt = $@ ) {
298 25 50       94 my $err = 'ARRAY' eq ref($xcpt) ? $xcpt->[0] : $xcpt;
299            
300 25         75 return $transaction->finish(undef, !1, $err);
301             }
302            
303 19         40 my $request_headers = $request_options->{headers};
304              
305             # TODO Handle errors
306 19         110 my $guard = AnyEvent::HTTP::http_request(
307             $method, $uri,
308             headers => $request_headers,
309             body => $request_content,
310             persistent => !1,
311             %$http_params,
312             $self->_curry_response_cb($type, $transaction),
313             );
314            
315 19         33770 $transaction->guard($guard);
316            
317 19         941 return 1;
318 45         379 };
319            
320             # If a fatal exception has occurred before this point in time
321             # (API retrieval failed) run the request closure immediately
322             # with an error. This will fall through and finish the
323             # transaction, passing the error to the callback subroutine.
324 45 50       1221 if ( my $fatal_exception = $self->exception ) {
    100          
325 0         0 $request_closure->(!1, $fatal_exception);
326             }
327            
328             # If API is ready, run the request closure immediately with the
329             # success flag set to true.
330             elsif ( $self->api_ready ) {
331 22         919 $request_closure->(1);
332             }
333            
334             # If API is not ready, queue the request closure to be run
335             # at a later time, when the result of API retrieval operation
336             # will be known.
337             else {
338 23         891 $self->_queue_request($request_closure);
339             }
340            
341 45         311 return 1;
342             }
343              
344             ### PRIVATE INSTANCE METHOD ###
345             #
346             # Parse cookies if provided, creating Cookie header
347             #
348              
349             sub _parse_cookies {
350 21     21   33936 my ($self, $to, $from) = @_;
351            
352 21         231 $self->SUPER::_parse_cookies($to, $from);
353            
354             # This results in Cookie header being an arrayref,
355             # but we need a string for AnyEvent::HTTP::http_request
356 21 100 66     608 if ( $to->{headers} && (my $cookies = $to->{headers}->{Cookie}) ) {
357 8         40 $to->{headers}->{Cookie} = join '; ', @$cookies;
358             }
359             }
360              
361             ### PRIVATE INSTANCE METHOD ###
362             #
363             # Generate result handling callback
364             #
365              
366             sub _curry_response_cb {
367 19     19   47 my ($self, $type, $transaction) = @_;
368            
369             return sub {
370 19     19   298667 my ($data, $headers) = @_;
371            
372 19         77 my $status = $headers->{Status};
373 19         65 my $success = $status eq '200';
374            
375             # No sense in trying to decode the response if request failed
376 19 50       104 return $transaction->finish(undef, !1, $headers->{Reason})
377             unless $success;
378            
379 19         47 local $@;
380 19         87 my $handler = "_handle_${type}_response";
381 19 50       97 my $response = eval {
382 19         483 $self->$handler({
383             status => $status,
384             success => $success,
385             content => $data,
386             })
387             } if $success;
388            
389 19 50       2212 my $error = 'ARRAY' eq ref($@) ? $@->[0] : $@;
390            
391 19 50       85 return $transaction->finish(undef, !1, $error) if $error;
392            
393             # We're only interested in the data, unless it was a poll.
394 19 50       168 my $result = 'poll' eq $type ? $response
    100          
395             : 'HASH' eq ref($response) ? $response->{result}
396             : $response
397             ;
398            
399 19         122 return $transaction->finish($result, $success);
400 19         299 };
401             }
402              
403             package
404             RPC::ExtDirect::Client::Async::Transaction;
405              
406 12     12   10202 use Carp;
  12         27  
  12         881  
407              
408 12     12   77 use base 'RPC::ExtDirect::Client::Transaction';
  12         17  
  12         9902  
409              
410             my @fields = qw/ cb cv actual_arg fields /;
411              
412             sub new {
413 47     47   311 my ($class, %params) = @_;
414            
415 47         98 my $cb = $params{cb};
416            
417 47 100 66     360 die ["Callback subroutine is required"]
      100        
418             if 'CODE' ne ref $cb && !($cb && $cb->isa('AnyEvent::CondVar'));
419            
420 45         141 my %self_params = map { $_ => delete $params{$_} } @fields;
  180         573  
421            
422 45         517 my $self = $class->SUPER::new(%params);
423            
424 45         1725 @$self{ keys %self_params } = values %self_params;
425            
426 45         171 return $self;
427             }
428              
429             sub start {
430 45     45   66 my ($self) = @_;
431            
432 45         1520 my $cv = $self->cv;
433            
434 45 100       470 $cv->begin if $cv;
435             }
436              
437             sub finish {
438 45     45   115 my ($self, $result, $success, $error) = @_;
439            
440 45         3662 my $cb = $self->cb;
441 45         1859 my $cv = $self->cv;
442            
443 45 50       553 $cb->($result, $success, $error) if $cb;
444 45 100       103476 $cv->end if $cv;
445            
446 45         3541 return $success;
447             }
448              
449             RPC::ExtDirect::Util::Accessor->mk_accessors(
450             simple => [qw/ cb cv guard /],
451             );
452              
453             1;