File Coverage

blib/lib/AnyEvent/Task/Client/Checkout.pm
Criterion Covered Total %
statement 118 122 96.7
branch 50 64 78.1
condition 11 15 73.3
subroutine 17 17 100.0
pod 0 1 0.0
total 196 219 89.5


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client::Checkout;
2              
3 29     29   114 use common::sense;
  29         31  
  29         160  
4              
5 29     29   1167 use Scalar::Util;
  29         29  
  29         902  
6              
7 29     29   2806 use Callback::Frame;
  29         6749  
  29         2082  
8              
9              
10 29         230 use overload fallback => 1,
11 29     29   126 '&{}' => \&_invoked_as_sub;
  29         31  
12              
13             our $AUTOLOAD;
14              
15              
16             sub _new {
17 127     127   261 my ($class, %arg) = @_;
18 127         138 my $self = {};
19 127         161 bless $self, $class;
20              
21 127         516 $self->{client} = $arg{client};
22 127         211 Scalar::Util::weaken($self->{client});
23              
24             $self->{timeout} = exists $arg{timeout} ? $arg{timeout} :
25             exists $arg{client}->{timeout} ? $arg{client}->{timeout} :
26 127 50       341 30;
    100          
27              
28 127 100       223 $self->{log_defer_object} = $arg{log_defer_object} if exists $arg{log_defer_object};
29              
30 127         198 $self->{pending_requests} = [];
31              
32 127         231 return $self;
33             }
34              
35             sub AUTOLOAD {
36 21     21   5217 my $self = shift;
37              
38 21 50       52 my $type = ref($self) or die "$self is not an object";
39              
40 21         21 my $name = $AUTOLOAD;
41 21         128 $name =~ s/.*://;
42              
43 21         46 $self->{last_name} = $name;
44              
45 21         77 return $self->_queue_request([ $name, @_, ]);
46             }
47              
48             sub _invoked_as_sub {
49 121     121   2943 my $self = shift;
50              
51             return sub {
52 121     121   152 $self->{last_name} = undef;
53              
54 121         241 return $self->_queue_request([ undef, @_, ]);
55 121         423 };
56             }
57              
58             sub _queue_request {
59 142     142   125 my ($self, $request) = @_;
60              
61 142 100       333 unless (Callback::Frame::is_frame($request->[-1])) {
62 128         639 my $name = undef;
63              
64 128 100 100     510 if (defined $self->{client}->{name} || defined $self->{last_name}) {
65 16 100       39 $name = defined $self->{client}->{name} ? $self->{client}->{name} : 'ANONYMOUS CLIENT';
66 16         16 $name .= ' -> ';
67 16 100       39 $name .= defined $self->{last_name} ? $self->{last_name} : 'NO METHOD';
68             }
69              
70 128         235 my %args = (code => $request->[-1]);
71              
72 128 100       182 $args{name} = $name if defined $name;
73              
74 128 50       186 $request->[-1] = frame(%args)
75             unless Callback::Frame::is_frame($request->[-1]);
76             }
77              
78 142         4631 push @{$self->{pending_requests}}, $request;
  142         245  
79              
80 142         279 $self->_install_timeout_timer;
81              
82 142         224 $self->_try_to_fill_requests;
83              
84 142         566 return;
85             }
86              
87             sub _install_timeout_timer {
88 276     276   251 my ($self) = @_;
89              
90 276 50       470 return if !defined $self->{timeout};
91 276 100       494 return if exists $self->{timeout_timer};
92              
93             $self->{timeout_timer} = AE::timer $self->{timeout}, 0, sub {
94 3     3   592192 delete $self->{timeout_timer};
95              
96 3         44 $self->{client}->remove_pending_checkout($self);
97              
98 3 50       26 if (exists $self->{worker}) {
99 0         0 $self->{client}->destroy_worker($self->{worker});
100 0         0 delete $self->{worker};
101             }
102              
103 3         49 $self->throw_fatal_error("timed out after $self->{timeout} seconds");
104 133         837 };
105             }
106              
107             sub _throw_error {
108 17     17   30 my ($self, $err) = @_;
109              
110 17         72 $self->{error_occurred} = 1;
111              
112 17         27 my $current_cb;
113              
114 17 100       49 if ($self->{current_cb}) {
    50          
115 13         24 $current_cb = $self->{current_cb};
116 4         30 } elsif (@{$self->{pending_requests}}) {
117 4         14 $current_cb = $self->{pending_requests}->[0]->[-1];
118             } else {
119 0         0 die "_throw_error called but no callback installed. Error thrown was: $err";
120             }
121              
122 17         34 $self->{pending_requests} = undef;
123              
124 17 50       83 if ($current_cb) {
125             frame(existing_frame => $current_cb,
126             code => sub {
127 17     17   1246 die $err;
128 17         215 })->();
129             }
130              
131 17         10916 $self->{cmd_handler} = undef;
132             }
133              
134             sub throw_fatal_error {
135 6     6 0 61 my ($self, $err) = @_;
136              
137 6         38 $self->{fatal_error} = $err;
138              
139 6         29 $self->_throw_error($err);
140             }
141              
142             sub _try_to_fill_requests {
143 398     398   387 my ($self) = @_;
144              
145 398 100       775 return unless exists $self->{worker};
146 269 100       226 return unless @{$self->{pending_requests}};
  269         752  
147              
148 136         102 my $request = shift @{$self->{pending_requests}};
  136         198  
149              
150 136         102 my $cb = pop @{$request};
  136         162  
151 136         186 $self->{current_cb} = $cb;
152 136         288 Scalar::Util::weaken($self->{current_cb});
153              
154 136 100       230 if ($self->{fatal_error}) {
155 2         12 $self->_throw_error($self->{fatal_error});
156 2         13 return;
157             }
158              
159 134         127 my $method_name = $request->[0];
160              
161 134 100       213 if (!defined $method_name) {
162 117         127 $method_name = '->()';
163 117         107 shift @$request;
164             }
165              
166 134         226 $self->_install_timeout_timer;
167              
168 134         595 $self->{worker}->push_write( json => [ 'do', {}, @$request, ], );
169              
170 134         6201 my $timer;
171              
172 134 100       307 if ($self->{log_defer_object}) {
173 5         20 $timer = $self->{log_defer_object}->timer($method_name);
174             }
175              
176             $self->{cmd_handler} = sub {
177 132     132   230996 my ($handle, $response) = @_;
178              
179 132         166 undef $timer;
180              
181 132         339 my ($response_code, $meta, $response_value) = @$response;
182              
183 132 100 66     340 if ($self->{log_defer_object} && $meta->{ld}) {
184 2         11 $self->{log_defer_object}->merge($meta->{ld});
185             }
186              
187 132 100       355 if ($response_code eq 'ok') {
    50          
188 123         137 local $@ = undef;
189 123         275 $cb->($self, $response_value);
190             } elsif ($response_code eq 'er') {
191 9         29 $self->_throw_error($response_value);
192             } else {
193 0         0 die "Unrecognized response_code: $response_code";
194             }
195              
196 132         49133 delete $self->{timeout_timer};
197 132         201 delete $self->{cmd_handler};
198              
199 132         236 $self->_try_to_fill_requests;
200 134         831 };
201              
202 134         464 $self->{worker}->push_read( json => $self->{cmd_handler} );
203             }
204              
205             sub DESTROY {
206 126     126   3391 my ($self) = @_;
207              
208             $self->{client}->remove_pending_checkout($self)
209 126 50       539 if $self->{client};
210              
211 126 100       348 if (exists $self->{worker}) {
212 123         131 my $worker = $self->{worker};
213 123 50       373 delete $self->{client}->{workers_to_checkouts}->{0 + $worker} if $self->{client};
214 123         124 delete $self->{worker};
215              
216 123 100 66     859 if ($self->{fatal_error} || ($self->{error_occurred} && $self->{client} && !$self->{client}->{dont_refork_after_error})) {
      66        
      66        
217 9 50       64 $self->{client}->destroy_worker($worker) if $self->{client};
218 9 50       55 $self->{client}->populate_workers if $self->{client};
219             } else {
220 114         333 $worker->push_write( json => [ 'dn', {} ] );
221 114 50       6562 $self->{client}->make_worker_available($worker) if $self->{client};
222 114 50       356 $self->{client}->try_to_fill_pending_checkouts if $self->{client};
223             }
224             }
225              
226 126         2274 $self->{pending_requests} = $self->{current_cb} = $self->{timeout_timer} = $self->{cmd_handler} = undef;
227             }
228              
229              
230             1;