File Coverage

blib/lib/AnyEvent/Task/Client.pm
Criterion Covered Total %
statement 123 123 100.0
branch 29 32 90.6
condition 3 5 60.0
subroutine 18 18 100.0
pod 0 9 0.0
total 173 187 92.5


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client;
2              
3 29     29   12174 use common::sense;
  29         41  
  29         134  
4              
5 29     29   1175 use Scalar::Util;
  29         33  
  29         897  
6              
7 29     29   101 use AnyEvent;
  29         29  
  29         458  
8 29     29   87 use AnyEvent::Util;
  29         43  
  29         1580  
9 29     29   20384 use AnyEvent::Handle;
  29         168257  
  29         1012  
10 29     29   177 use AnyEvent::Socket;
  29         31  
  29         2669  
11              
12 29     29   12269 use AnyEvent::Task::Client::Checkout;
  29         45  
  29         28759  
13              
14              
15             sub new {
16 15     15 0 1344 my ($class, %arg) = @_;
17 15         100 my $self = {};
18 15         88 bless $self, $class;
19              
20 15   50     281 $self->{connect} = $arg{connect} || die "need connect";
21 15         89 $self->{name} = $arg{name};
22              
23 15 50       170 $self->{min_workers} = defined $arg{min_workers} ? $arg{min_workers} : 2;
24 15 100       105 $self->{max_workers} = defined $arg{max_workers} ? $arg{max_workers} : 20;
25 15 100       88 $self->{min_workers} = $self->{max_workers} if $self->{min_workers} > $self->{max_workers};
26 15 50       103 $self->{timeout} = $arg{timeout} if exists $arg{timeout};
27 15 100       70 $self->{max_checkouts} = $arg{max_checkouts} if exists $arg{max_checkouts};
28 15 100       60 $self->{dont_refork_after_error} = 1 if $arg{dont_refork_after_error};
29              
30 15         82 $self->{total_workers} = 0;
31 15         31 $self->{connecting_workers} = {};
32 15         43 $self->{available_workers} = {};
33 15         50 $self->{occupied_workers} = {};
34 15         72 $self->{workers_to_checkouts} = {}; # used to map errors detected on worker connection to checkout callbacks
35 15         44 $self->{worker_checkout_counts} = {}; # used for max_checkouts "memory leak protection"
36              
37 15         31 $self->{pending_checkouts} = [];
38              
39 15         161 $self->populate_workers;
40              
41 15         27314 return $self;
42             }
43              
44              
45              
46             sub populate_workers {
47 271     271 0 291 my ($self) = @_;
48              
49 271         435 Scalar::Util::weaken($self);
50              
51 271 100       784 return if $self->{total_workers} >= $self->{max_workers};
52              
53 152         223 my $workers_to_create = $self->{min_workers} - $self->{total_workers};
54 152 100       261 if ($workers_to_create <= 0) {
55 112         82 $workers_to_create = 0;
56 112 100 66     68 $workers_to_create = 1 unless keys %{$self->{available_workers}} || keys %{$self->{connecting_workers}};
  112         225  
  112         288  
57             }
58              
59 152         499 for (1 .. $workers_to_create) {
60 58         41610 $self->{total_workers}++;
61              
62 58         130 my $host = $self->{connect}->[0];
63 58         91 my $service = $self->{connect}->[1];
64              
65 58         62 my $worker_guard;
66             $self->{connecting_workers}->{0 + $worker_guard} = $worker_guard = tcp_connect $host, $service, sub {
67 51     51   26305 my $fh = shift;
68              
69 51         192 delete $self->{connecting_workers}->{0 + $worker_guard};
70              
71 51 100       173 if (!$fh) {
72 24         35 $self->{total_workers}--;
73 24         106 $self->install_populate_workers_timer;
74 24         575 return;
75             }
76              
77 27         136 delete $self->{populate_workers_timer};
78              
79 27         120 my $worker; $worker = new AnyEvent::Handle
80             fh => $fh,
81             on_read => sub { }, ## So we always have a read watcher and can instantly detect worker deaths
82             on_error => sub {
83 2         1105795 my ($worker, $fatal, $message) = @_;
84              
85 2         32 my $checkout = $self->{workers_to_checkouts}->{0 + $worker};
86              
87 2         11 $checkout->{timeout_timer} = undef; ## timer keeps a circular reference
88              
89 2 50       59 $checkout->throw_fatal_error('worker connection suddenly died') if $checkout;
90              
91 2         9 $self->destroy_worker($worker);
92 2         6 $self->populate_workers;
93 27         717 };
94              
95 27         3093 $self->{worker_checkout_counts}->{0 + $worker} = 0;
96              
97 27         260 $self->make_worker_available($worker);
98              
99 27         88 $self->try_to_fill_pending_checkouts;
100 58         762 };
101             }
102              
103             }
104              
105              
106             sub install_populate_workers_timer {
107 24     24 0 43 my ($self) = @_;
108              
109 24 100       75 return if exists $self->{populate_workers_timer};
110              
111             $self->{populate_workers_timer} = AE::timer 0.2, 1, sub {
112 12     12   2405252 $self->populate_workers;
113 15         273 };
114             }
115              
116              
117             sub try_to_fill_pending_checkouts {
118 392     392 0 366 my ($self) = @_;
119              
120 392 100       302 return unless @{$self->{pending_checkouts}};
  392         940  
121              
122 357 100       245 if (keys %{$self->{available_workers}}) {
  357         761  
123 124         116 my @available_workers = values %{$self->{available_workers}};
  124         248  
124 124         129 my $worker = shift @available_workers;
125 124         223 $self->make_worker_occupied($worker);
126              
127 124         89 my $checkout = shift @{$self->{pending_checkouts}};
  124         143  
128 124         218 $checkout->{worker} = $worker;
129              
130 124         197 $self->{workers_to_checkouts}->{0 + $worker} = $checkout;
131 124         327 Scalar::Util::weaken($self->{workers_to_checkouts}->{0 + $worker});
132              
133 124         291 $checkout->_try_to_fill_requests;
134 124         2961 return $self->try_to_fill_pending_checkouts;
135             }
136              
137 233         334 $self->populate_workers;
138             }
139              
140              
141              
142             sub make_worker_occupied {
143 124     124 0 103 my ($self, $worker) = @_;
144              
145 124         206 delete $self->{available_workers}->{0 + $worker};
146 124         173 $self->{occupied_workers}->{0 + $worker} = $worker;
147              
148 124         191 $self->{worker_checkout_counts}->{0 + $worker}++;
149             }
150              
151              
152             sub make_worker_available {
153 141     141 0 187 my ($self, $worker) = @_;
154              
155 141 100       276 if (exists $self->{max_checkouts}) {
156 8 100       31 if ($self->{worker_checkout_counts}->{0 + $worker} >= $self->{max_checkouts}) {
157 2         7 $self->destroy_worker($worker);
158 2         5 return;
159             }
160             }
161              
162             ## Cancel any push_read callbacks installed while worker was occupied
163 139         200 $worker->{_queue} = [];
164              
165 139         326 delete $self->{occupied_workers}->{0 + $worker};
166 139         334 $self->{available_workers}->{0 + $worker} = $worker;
167             }
168              
169              
170             sub destroy_worker {
171 13     13 0 23 my ($self, $worker) = @_;
172              
173 13         75 $worker->destroy;
174              
175 13         439 $self->{total_workers}--;
176 13         67 delete $self->{available_workers}->{0 + $worker};
177 13         29 delete $self->{occupied_workers}->{0 + $worker};
178 13         41 delete $self->{worker_checkout_counts}->{0 + $worker};
179             }
180              
181              
182             sub checkout {
183 127     127 0 107285 my ($self, @args) = @_;
184              
185 127         485 my $checkout = AnyEvent::Task::Client::Checkout->_new( client => $self, @args, );
186              
187 127         102 push @{$self->{pending_checkouts}}, $checkout;
  127         232  
188              
189 127         190 $self->try_to_fill_pending_checkouts;
190              
191 127         333 return $checkout;
192             }
193              
194             sub remove_pending_checkout {
195 129     129 0 139 my ($self, $checkout) = @_;
196              
197 129         105 $self->{pending_checkouts} = [ grep { $_ != $checkout } @{$self->{pending_checkouts}} ];
  4577         5453  
  129         285  
198             }
199              
200             1;