File Coverage

blib/lib/AnyEvent/Task/Client/Checkout.pm
Criterion Covered Total %
statement 118 122 96.7
branch 50 64 78.1
condition 11 15 73.3
subroutine 17 17 100.0
pod 0 1 0.0
total 196 219 89.5


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client::Checkout;
2              
3 29     29   113 use common::sense;
  29         35  
  29         157  
4              
5 29     29   1205 use Scalar::Util;
  29         33  
  29         959  
6              
7 29     29   2729 use Callback::Frame;
  29         6655  
  29         2296  
8              
9              
10 29         227 use overload fallback => 1,
11 29     29   149 '&{}' => \&_invoked_as_sub;
  29         40  
12              
13             our $AUTOLOAD;
14              
15              
16             sub _new {
17 127     127   225 my ($class, %arg) = @_;
18 127         151 my $self = {};
19 127         146 bless $self, $class;
20              
21 127         512 $self->{client} = $arg{client};
22 127         200 Scalar::Util::weaken($self->{client});
23              
24             $self->{timeout} = exists $arg{timeout} ? $arg{timeout} :
25             exists $arg{client}->{timeout} ? $arg{client}->{timeout} :
26 127 50       309 30;
    100          
27              
28 127 100       228 $self->{log_defer_object} = $arg{log_defer_object} if exists $arg{log_defer_object};
29              
30 127         194 $self->{pending_requests} = [];
31              
32 127         240 return $self;
33             }
34              
35             sub AUTOLOAD {
36 21     21   9011 my $self = shift;
37              
38 21 50       60 my $type = ref($self) or die "$self is not an object";
39              
40 21         23 my $name = $AUTOLOAD;
41 21         128 $name =~ s/.*://;
42              
43 21         47 $self->{last_name} = $name;
44              
45 21         77 return $self->_queue_request([ $name, @_, ]);
46             }
47              
48             sub _invoked_as_sub {
49 121     121   3341 my $self = shift;
50              
51             return sub {
52 121     121   150 $self->{last_name} = undef;
53              
54 121         235 return $self->_queue_request([ undef, @_, ]);
55 121         409 };
56             }
57              
58             sub _queue_request {
59 142     142   138 my ($self, $request) = @_;
60              
61 142 100       306 unless (Callback::Frame::is_frame($request->[-1])) {
62 128         633 my $name = undef;
63              
64 128 100 100     492 if (defined $self->{client}->{name} || defined $self->{last_name}) {
65 16 100       44 $name = defined $self->{client}->{name} ? $self->{client}->{name} : 'ANONYMOUS CLIENT';
66 16         22 $name .= ' -> ';
67 16 100       53 $name .= defined $self->{last_name} ? $self->{last_name} : 'NO METHOD';
68             }
69              
70 128         234 my %args = (code => $request->[-1]);
71              
72 128 100       193 $args{name} = $name if defined $name;
73              
74 128 50       179 $request->[-1] = frame(%args)
75             unless Callback::Frame::is_frame($request->[-1]);
76             }
77              
78 142         4697 push @{$self->{pending_requests}}, $request;
  142         242  
79              
80 142         265 $self->_install_timeout_timer;
81              
82 142         225 $self->_try_to_fill_requests;
83              
84 142         703 return;
85             }
86              
87             sub _install_timeout_timer {
88 276     276   253 my ($self) = @_;
89              
90 276 50       495 return if !defined $self->{timeout};
91 276 100       480 return if exists $self->{timeout_timer};
92              
93             $self->{timeout_timer} = AE::timer $self->{timeout}, 0, sub {
94 3     3   592214 delete $self->{timeout_timer};
95              
96 3         50 $self->{client}->remove_pending_checkout($self);
97              
98 3 50       32 if (exists $self->{worker}) {
99 0         0 $self->{client}->destroy_worker($self->{worker});
100 0         0 delete $self->{worker};
101             }
102              
103 3         64 $self->throw_fatal_error("timed out after $self->{timeout} seconds");
104 133         823 };
105             }
106              
107             sub _throw_error {
108 17     17   34 my ($self, $err) = @_;
109              
110 17         64 $self->{error_occurred} = 1;
111              
112 17         26 my $current_cb;
113              
114 17 100       60 if ($self->{current_cb}) {
    50          
115 13         25 $current_cb = $self->{current_cb};
116 4         19 } elsif (@{$self->{pending_requests}}) {
117 4         12 $current_cb = $self->{pending_requests}->[0]->[-1];
118             } else {
119 0         0 die "_throw_error called but no callback installed. Error thrown was: $err";
120             }
121              
122 17         36 $self->{pending_requests} = undef;
123              
124 17 50       71 if ($current_cb) {
125             frame(existing_frame => $current_cb,
126             code => sub {
127 17     17   1413 die $err;
128 17         276 })->();
129             }
130              
131 17         10202 $self->{cmd_handler} = undef;
132             }
133              
134             sub throw_fatal_error {
135 6     6 0 33 my ($self, $err) = @_;
136              
137 6         27 $self->{fatal_error} = $err;
138              
139 6         28 $self->_throw_error($err);
140             }
141              
142             sub _try_to_fill_requests {
143 398     398   413 my ($self) = @_;
144              
145 398 100       803 return unless exists $self->{worker};
146 269 100       215 return unless @{$self->{pending_requests}};
  269         743  
147              
148 136         104 my $request = shift @{$self->{pending_requests}};
  136         221  
149              
150 136         104 my $cb = pop @{$request};
  136         171  
151 136         176 $self->{current_cb} = $cb;
152 136         347 Scalar::Util::weaken($self->{current_cb});
153              
154 136 100       247 if ($self->{fatal_error}) {
155 2         14 $self->_throw_error($self->{fatal_error});
156 2         15 return;
157             }
158              
159 134         137 my $method_name = $request->[0];
160              
161 134 100       231 if (!defined $method_name) {
162 117         120 $method_name = '->()';
163 117         109 shift @$request;
164             }
165              
166 134         227 $self->_install_timeout_timer;
167              
168 134         796 $self->{worker}->push_write( json => [ 'do', {}, @$request, ], );
169              
170 134         7047 my $timer;
171              
172 134 100       326 if ($self->{log_defer_object}) {
173 5         18 $timer = $self->{log_defer_object}->timer($method_name);
174             }
175              
176             $self->{cmd_handler} = sub {
177 132     132   231315 my ($handle, $response) = @_;
178              
179 132         148 undef $timer;
180              
181 132         311 my ($response_code, $meta, $response_value) = @$response;
182              
183 132 100 66     335 if ($self->{log_defer_object} && $meta->{ld}) {
184 2         10 $self->{log_defer_object}->merge($meta->{ld});
185             }
186              
187 132 100       397 if ($response_code eq 'ok') {
    50          
188 123         122 local $@ = undef;
189 123         277 $cb->($self, $response_value);
190             } elsif ($response_code eq 'er') {
191 9         32 $self->_throw_error($response_value);
192             } else {
193 0         0 die "Unrecognized response_code: $response_code";
194             }
195              
196 132         50192 delete $self->{timeout_timer};
197 132         187 delete $self->{cmd_handler};
198              
199 132         239 $self->_try_to_fill_requests;
200 134         842 };
201              
202 134         490 $self->{worker}->push_read( json => $self->{cmd_handler} );
203             }
204              
205             sub DESTROY {
206 126     126   3360 my ($self) = @_;
207              
208             $self->{client}->remove_pending_checkout($self)
209 126 50       587 if $self->{client};
210              
211 126 100       366 if (exists $self->{worker}) {
212 123         126 my $worker = $self->{worker};
213 123 50       385 delete $self->{client}->{workers_to_checkouts}->{0 + $worker} if $self->{client};
214 123         137 delete $self->{worker};
215              
216 123 100 66     566 if ($self->{fatal_error} || ($self->{error_occurred} && $self->{client} && !$self->{client}->{dont_refork_after_error})) {
      66        
      66        
217 9 50       58 $self->{client}->destroy_worker($worker) if $self->{client};
218 9 50       56 $self->{client}->populate_workers if $self->{client};
219             } else {
220 114         335 $worker->push_write( json => [ 'dn', {} ] );
221 114 50       6503 $self->{client}->make_worker_available($worker) if $self->{client};
222 114 50       330 $self->{client}->try_to_fill_pending_checkouts if $self->{client};
223             }
224             }
225              
226 126         3259 $self->{pending_requests} = $self->{current_cb} = $self->{timeout_timer} = $self->{cmd_handler} = undef;
227             }
228              
229              
230             1;