File Coverage

blib/lib/AnyEvent/Task/Client/Checkout.pm
Criterion Covered Total %
statement 118 122 96.7
branch 50 64 78.1
condition 10 15 66.6
subroutine 17 17 100.0
pod 0 1 0.0
total 195 219 89.0


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client::Checkout;
2              
3 29     29   99 use common::sense;
  29         31  
  29         154  
4              
5 29     29   1030 use Scalar::Util;
  29         35  
  29         829  
6              
7 29     29   2570 use Callback::Frame;
  29         6511  
  29         1994  
8              
9              
10 29         191 use overload fallback => 1,
11 29     29   123 '&{}' => \&_invoked_as_sub;
  29         39  
12              
13             our $AUTOLOAD;
14              
15              
16             sub _new {
17 127     127   224 my ($class, %arg) = @_;
18 127         134 my $self = {};
19 127         146 bless $self, $class;
20              
21 127         495 $self->{client} = $arg{client};
22 127         209 Scalar::Util::weaken($self->{client});
23              
24             $self->{timeout} = exists $arg{timeout} ? $arg{timeout} :
25             exists $arg{client}->{timeout} ? $arg{client}->{timeout} :
26 127 50       315 30;
    100          
27              
28 127 100       208 $self->{log_defer_object} = $arg{log_defer_object} if exists $arg{log_defer_object};
29              
30 127         178 $self->{pending_requests} = [];
31              
32 127         235 return $self;
33             }
34              
35             sub AUTOLOAD {
36 21     21   4261 my $self = shift;
37              
38 21 50       50 my $type = ref($self) or die "$self is not an object";
39              
40 21         24 my $name = $AUTOLOAD;
41 21         108 $name =~ s/.*://;
42              
43 21         36 $self->{last_name} = $name;
44              
45 21         62 return $self->_queue_request([ $name, @_, ]);
46             }
47              
48             sub _invoked_as_sub {
49 121     121   3184 my $self = shift;
50              
51             return sub {
52 121     121   137 $self->{last_name} = undef;
53              
54 121         226 return $self->_queue_request([ undef, @_, ]);
55 121         392 };
56             }
57              
58             sub _queue_request {
59 142     142   142 my ($self, $request) = @_;
60              
61 142 100       308 unless (Callback::Frame::is_frame($request->[-1])) {
62 128         592 my $name = undef;
63              
64 128 100 100     475 if (defined $self->{client}->{name} || defined $self->{last_name}) {
65 16 100       41 $name = defined $self->{client}->{name} ? $self->{client}->{name} : 'ANONYMOUS CLIENT';
66 16         20 $name .= ' -> ';
67 16 100       32 $name .= defined $self->{last_name} ? $self->{last_name} : 'NO METHOD';
68             }
69              
70 128         262 my %args = (code => $request->[-1]);
71              
72 128 100       173 $args{name} = $name if defined $name;
73              
74 128 50       181 $request->[-1] = frame(%args)
75             unless Callback::Frame::is_frame($request->[-1]);
76             }
77              
78 142         4440 push @{$self->{pending_requests}}, $request;
  142         265  
79              
80 142         253 $self->_install_timeout_timer;
81              
82 142         216 $self->_try_to_fill_requests;
83              
84 142         593 return;
85             }
86              
87             sub _install_timeout_timer {
88 276     276   240 my ($self) = @_;
89              
90 276 50       488 return if !defined $self->{timeout};
91 276 100       448 return if exists $self->{timeout_timer};
92              
93             $self->{timeout_timer} = AE::timer $self->{timeout}, 0, sub {
94 3     3   592310 delete $self->{timeout_timer};
95              
96 3         64 $self->{client}->remove_pending_checkout($self);
97              
98 3 50       40 if (exists $self->{worker}) {
99 0         0 $self->{client}->destroy_worker($self->{worker});
100 0         0 delete $self->{worker};
101             }
102              
103 3         80 $self->throw_fatal_error("timed out after $self->{timeout} seconds");
104 133         763 };
105             }
106              
107             sub _throw_error {
108 17     17   33 my ($self, $err) = @_;
109              
110 17         46 $self->{error_occurred} = 1;
111              
112 17         23 my $current_cb;
113              
114 17 100       59 if ($self->{current_cb}) {
    50          
115 13         24 $current_cb = $self->{current_cb};
116 4         18 } elsif (@{$self->{pending_requests}}) {
117 4         14 $current_cb = $self->{pending_requests}->[0]->[-1];
118             } else {
119 0         0 die "_throw_error called but no callback installed. Error thrown was: $err";
120             }
121              
122 17         31 $self->{pending_requests} = undef;
123              
124 17 50       138 if ($current_cb) {
125             frame(existing_frame => $current_cb,
126             code => sub {
127 17     17   1445 die $err;
128 17         216 })->();
129             }
130              
131 17         11300 $self->{cmd_handler} = undef;
132             }
133              
134             sub throw_fatal_error {
135 6     6 0 37 my ($self, $err) = @_;
136              
137 6         88 $self->{fatal_error} = $err;
138              
139 6         43 $self->_throw_error($err);
140             }
141              
142             sub _try_to_fill_requests {
143 398     398   377 my ($self) = @_;
144              
145 398 100       743 return unless exists $self->{worker};
146 269 100       216 return unless @{$self->{pending_requests}};
  269         696  
147              
148 136         105 my $request = shift @{$self->{pending_requests}};
  136         211  
149              
150 136         101 my $cb = pop @{$request};
  136         153  
151 136         195 $self->{current_cb} = $cb;
152 136         289 Scalar::Util::weaken($self->{current_cb});
153              
154 136 100       231 if ($self->{fatal_error}) {
155 2         10 $self->_throw_error($self->{fatal_error});
156 2         13 return;
157             }
158              
159 134         131 my $method_name = $request->[0];
160              
161 134 100       254 if (!defined $method_name) {
162 117         130 $method_name = '->()';
163 117         98 shift @$request;
164             }
165              
166 134         218 $self->_install_timeout_timer;
167              
168 134         508 $self->{worker}->push_write( json => [ 'do', {}, @$request, ], );
169              
170 134         6334 my $timer;
171              
172 134 100       314 if ($self->{log_defer_object}) {
173 5         26 $timer = $self->{log_defer_object}->timer($method_name);
174             }
175              
176             $self->{cmd_handler} = sub {
177 132     132   225474 my ($handle, $response) = @_;
178              
179 132         150 undef $timer;
180              
181 132         340 my ($response_code, $meta, $response_value) = @$response;
182              
183 132 100 66     340 if ($self->{log_defer_object} && $meta->{ld}) {
184 2         11 $self->{log_defer_object}->merge($meta->{ld});
185             }
186              
187 132 100       362 if ($response_code eq 'ok') {
    50          
188 123         119 local $@ = undef;
189 123         286 $cb->($self, $response_value);
190             } elsif ($response_code eq 'er') {
191 9         28 $self->_throw_error($response_value);
192             } else {
193 0         0 die "Unrecognized response_code: $response_code";
194             }
195              
196 132         49984 delete $self->{timeout_timer};
197 132         170 delete $self->{cmd_handler};
198              
199 132         209 $self->_try_to_fill_requests;
200 134         875 };
201              
202 134         419 $self->{worker}->push_read( json => $self->{cmd_handler} );
203             }
204              
205             sub DESTROY {
206 124     124   2783 my ($self) = @_;
207              
208             $self->{client}->remove_pending_checkout($self)
209 124 50       505 if $self->{client};
210              
211 124 100       353 if (exists $self->{worker}) {
212 121         125 my $worker = $self->{worker};
213 121 50       364 delete $self->{client}->{workers_to_checkouts}->{0 + $worker} if $self->{client};
214 121         122 delete $self->{worker};
215              
216 121 100 66     542 if ($self->{fatal_error} || ($self->{error_occurred} && $self->{client} && !$self->{client}->{dont_refork_after_error})) {
      66        
      33        
217 7 50       38 $self->{client}->destroy_worker($worker) if $self->{client};
218 7 50       36 $self->{client}->populate_workers if $self->{client};
219             } else {
220 114         348 $worker->push_write( json => [ 'dn', {} ] );
221 114 50       6509 $self->{client}->make_worker_available($worker) if $self->{client};
222 114 50       344 $self->{client}->try_to_fill_pending_checkouts if $self->{client};
223             }
224             }
225              
226 124         4011 $self->{pending_requests} = $self->{current_cb} = $self->{timeout_timer} = $self->{cmd_handler} = undef;
227             }
228              
229              
230             1;