File Coverage

blib/lib/AnyEvent/Task/Client.pm
Criterion Covered Total %
statement 123 123 100.0
branch 29 32 90.6
condition 3 5 60.0
subroutine 18 18 100.0
pod 0 9 0.0
total 173 187 92.5


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client;
2              
3 29     29   11575 use common::sense;
  29         37  
  29         145  
4              
5 29     29   1133 use Scalar::Util;
  29         27  
  29         837  
6              
7 29     29   91 use AnyEvent;
  29         27  
  29         478  
8 29     29   92 use AnyEvent::Util;
  29         36  
  29         1527  
9 29     29   19239 use AnyEvent::Handle;
  29         156296  
  29         1034  
10 29     29   196 use AnyEvent::Socket;
  29         33  
  29         2787  
11              
12 29     29   12591 use AnyEvent::Task::Client::Checkout;
  29         44  
  29         27905  
13              
14              
15             sub new {
16 15     15 0 1436 my ($class, %arg) = @_;
17 15         95 my $self = {};
18 15         84 bless $self, $class;
19              
20 15   50     318 $self->{connect} = $arg{connect} || die "need connect";
21 15         94 $self->{name} = $arg{name};
22              
23 15 50       163 $self->{min_workers} = defined $arg{min_workers} ? $arg{min_workers} : 2;
24 15 100       88 $self->{max_workers} = defined $arg{max_workers} ? $arg{max_workers} : 20;
25 15 100       79 $self->{min_workers} = $self->{max_workers} if $self->{min_workers} > $self->{max_workers};
26 15 50       102 $self->{timeout} = $arg{timeout} if exists $arg{timeout};
27 15 100       66 $self->{max_checkouts} = $arg{max_checkouts} if exists $arg{max_checkouts};
28 15 100       79 $self->{dont_refork_after_error} = 1 if $arg{dont_refork_after_error};
29              
30 15         84 $self->{total_workers} = 0;
31 15         67 $self->{connecting_workers} = {};
32 15         37 $self->{available_workers} = {};
33 15         42 $self->{occupied_workers} = {};
34 15         74 $self->{workers_to_checkouts} = {}; # used to map errors detected on worker connection to checkout callbacks
35 15         39 $self->{worker_checkout_counts} = {}; # used for max_checkouts "memory leak protection"
36              
37 15         38 $self->{pending_checkouts} = [];
38              
39 15         149 $self->populate_workers;
40              
41 15         29085 return $self;
42             }
43              
44              
45              
46             sub populate_workers {
47 270     270 0 304 my ($self) = @_;
48              
49 270         451 Scalar::Util::weaken($self);
50              
51 270 100       781 return if $self->{total_workers} >= $self->{max_workers};
52              
53 151         227 my $workers_to_create = $self->{min_workers} - $self->{total_workers};
54 151 100       381 if ($workers_to_create <= 0) {
55 112         83 $workers_to_create = 0;
56 112 100 66     72 $workers_to_create = 1 unless keys %{$self->{available_workers}} || keys %{$self->{connecting_workers}};
  112         234  
  112         308  
57             }
58              
59 151         489 for (1 .. $workers_to_create) {
60 56         40878 $self->{total_workers}++;
61              
62 56         135 my $host = $self->{connect}->[0];
63 56         90 my $service = $self->{connect}->[1];
64              
65 56         64 my $worker_guard;
66             $self->{connecting_workers}->{0 + $worker_guard} = $worker_guard = tcp_connect $host, $service, sub {
67 49     49   23880 my $fh = shift;
68              
69 49         194 delete $self->{connecting_workers}->{0 + $worker_guard};
70              
71 49 100       145 if (!$fh) {
72 23         47 $self->{total_workers}--;
73 23         141 $self->install_populate_workers_timer;
74 23         675 return;
75             }
76              
77 26         128 delete $self->{populate_workers_timer};
78              
79 26         100 my $worker; $worker = new AnyEvent::Handle
80             fh => $fh,
81             on_read => sub { }, ## So we always have a read watcher and can instantly detect worker deaths
82             on_error => sub {
83 2         1105749 my ($worker, $fatal, $message) = @_;
84              
85 2         13 my $checkout = $self->{workers_to_checkouts}->{0 + $worker};
86              
87 2         9 $checkout->{timeout_timer} = undef; ## timer keeps a circular reference
88              
89 2 50       47 $checkout->throw_fatal_error('worker connection suddenly died') if $checkout;
90              
91 2         11 $self->destroy_worker($worker);
92 2         8 $self->populate_workers;
93 26         609 };
94              
95 26         2797 $self->{worker_checkout_counts}->{0 + $worker} = 0;
96              
97 26         252 $self->make_worker_available($worker);
98              
99 26         69 $self->try_to_fill_pending_checkouts;
100 56         744 };
101             }
102              
103             }
104              
105              
106             sub install_populate_workers_timer {
107 23     23 0 33 my ($self) = @_;
108              
109 23 100       65 return if exists $self->{populate_workers_timer};
110              
111             $self->{populate_workers_timer} = AE::timer 0.2, 1, sub {
112 11     11   2204510 $self->populate_workers;
113 15         250 };
114             }
115              
116              
117             sub try_to_fill_pending_checkouts {
118 391     391 0 365 my ($self) = @_;
119              
120 391 100       302 return unless @{$self->{pending_checkouts}};
  391         896  
121              
122 357 100       241 if (keys %{$self->{available_workers}}) {
  357         687  
123 124         109 my @available_workers = values %{$self->{available_workers}};
  124         229  
124 124         129 my $worker = shift @available_workers;
125 124         198 $self->make_worker_occupied($worker);
126              
127 124         87 my $checkout = shift @{$self->{pending_checkouts}};
  124         154  
128 124         204 $checkout->{worker} = $worker;
129              
130 124         161 $self->{workers_to_checkouts}->{0 + $worker} = $checkout;
131 124         325 Scalar::Util::weaken($self->{workers_to_checkouts}->{0 + $worker});
132              
133 124         301 $checkout->_try_to_fill_requests;
134 124         2776 return $self->try_to_fill_pending_checkouts;
135             }
136              
137 233         322 $self->populate_workers;
138             }
139              
140              
141              
142             sub make_worker_occupied {
143 124     124 0 108 my ($self, $worker) = @_;
144              
145 124         178 delete $self->{available_workers}->{0 + $worker};
146 124         164 $self->{occupied_workers}->{0 + $worker} = $worker;
147              
148 124         192 $self->{worker_checkout_counts}->{0 + $worker}++;
149             }
150              
151              
152             sub make_worker_available {
153 140     140 0 174 my ($self, $worker) = @_;
154              
155 140 100       269 if (exists $self->{max_checkouts}) {
156 8 100       33 if ($self->{worker_checkout_counts}->{0 + $worker} >= $self->{max_checkouts}) {
157 2         7 $self->destroy_worker($worker);
158 2         4 return;
159             }
160             }
161              
162             ## Cancel any push_read callbacks installed while worker was occupied
163 138         186 $worker->{_queue} = [];
164              
165 138         323 delete $self->{occupied_workers}->{0 + $worker};
166 138         321 $self->{available_workers}->{0 + $worker} = $worker;
167             }
168              
169              
170             sub destroy_worker {
171 13     13 0 24 my ($self, $worker) = @_;
172              
173 13         90 $worker->destroy;
174              
175 13         448 $self->{total_workers}--;
176 13         64 delete $self->{available_workers}->{0 + $worker};
177 13         30 delete $self->{occupied_workers}->{0 + $worker};
178 13         35 delete $self->{worker_checkout_counts}->{0 + $worker};
179             }
180              
181              
182             sub checkout {
183 127     127 0 108897 my ($self, @args) = @_;
184              
185 127         497 my $checkout = AnyEvent::Task::Client::Checkout->_new( client => $self, @args, );
186              
187 127         106 push @{$self->{pending_checkouts}}, $checkout;
  127         196  
188              
189 127         211 $self->try_to_fill_pending_checkouts;
190              
191 127         356 return $checkout;
192             }
193              
194             sub remove_pending_checkout {
195 129     129 0 128 my ($self, $checkout) = @_;
196              
197 129         120 $self->{pending_checkouts} = [ grep { $_ != $checkout } @{$self->{pending_checkouts}} ];
  4577         5103  
  129         271  
198             }
199              
200             1;