File Coverage

blib/lib/AnyEvent/Task/Client.pm
Criterion Covered Total %
statement 123 123 100.0
branch 29 32 90.6
condition 3 5 60.0
subroutine 18 18 100.0
pod 0 9 0.0
total 173 187 92.5


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Client;
2              
3 29     29   10931 use common::sense;
  29         35  
  29         126  
4              
5 29     29   1078 use Scalar::Util;
  29         33  
  29         810  
6              
7 29     29   88 use AnyEvent;
  29         27  
  29         430  
8 29     29   90 use AnyEvent::Util;
  29         38  
  29         1440  
9 29     29   18552 use AnyEvent::Handle;
  29         151787  
  29         952  
10 29     29   153 use AnyEvent::Socket;
  29         31  
  29         2378  
11              
12 29     29   10464 use AnyEvent::Task::Client::Checkout;
  29         46  
  29         26890  
13              
14              
15             sub new {
16 15     15 0 1254 my ($class, %arg) = @_;
17 15         124 my $self = {};
18 15         68 bless $self, $class;
19              
20 15   50     277 $self->{connect} = $arg{connect} || die "need connect";
21 15         88 $self->{name} = $arg{name};
22              
23 15 50       140 $self->{min_workers} = defined $arg{min_workers} ? $arg{min_workers} : 2;
24 15 100       83 $self->{max_workers} = defined $arg{max_workers} ? $arg{max_workers} : 20;
25 15 100       89 $self->{min_workers} = $self->{max_workers} if $self->{min_workers} > $self->{max_workers};
26 15 50       78 $self->{timeout} = $arg{timeout} if exists $arg{timeout};
27 15 100       80 $self->{max_checkouts} = $arg{max_checkouts} if exists $arg{max_checkouts};
28 15 100       44 $self->{dont_refork_after_error} = 1 if $arg{dont_refork_after_error};
29              
30 15         74 $self->{total_workers} = 0;
31 15         36 $self->{connecting_workers} = {};
32 15         43 $self->{available_workers} = {};
33 15         61 $self->{occupied_workers} = {};
34 15         74 $self->{workers_to_checkouts} = {}; # used to map errors detected on worker connection to checkout callbacks
35 15         44 $self->{worker_checkout_counts} = {}; # used for max_checkouts "memory leak protection"
36              
37 15         50 $self->{pending_checkouts} = [];
38              
39 15         147 $self->populate_workers;
40              
41 15         27083 return $self;
42             }
43              
44              
45              
46             sub populate_workers {
47 269     269 0 285 my ($self) = @_;
48              
49 269         467 Scalar::Util::weaken($self);
50              
51 269 100       731 return if $self->{total_workers} >= $self->{max_workers};
52              
53 150         218 my $workers_to_create = $self->{min_workers} - $self->{total_workers};
54 150 100       273 if ($workers_to_create <= 0) {
55 112         78 $workers_to_create = 0;
56 112 100 66     69 $workers_to_create = 1 unless keys %{$self->{available_workers}} || keys %{$self->{connecting_workers}};
  112         242  
  112         273  
57             }
58              
59 150         476 for (1 .. $workers_to_create) {
60 56         39217 $self->{total_workers}++;
61              
62 56         135 my $host = $self->{connect}->[0];
63 56         81 my $service = $self->{connect}->[1];
64              
65 56         77 my $worker_guard;
66             $self->{connecting_workers}->{0 + $worker_guard} = $worker_guard = tcp_connect $host, $service, sub {
67 51     51   24550 my $fh = shift;
68              
69 51         201 delete $self->{connecting_workers}->{0 + $worker_guard};
70              
71 51 100       172 if (!$fh) {
72 24         38 $self->{total_workers}--;
73 24         106 $self->install_populate_workers_timer;
74 24         581 return;
75             }
76              
77 27         139 delete $self->{populate_workers_timer};
78              
79 27         97 my $worker; $worker = new AnyEvent::Handle
80             fh => $fh,
81             on_read => sub { }, ## So we always have a read watcher and can instantly detect worker deaths
82             on_error => sub {
83 2         1106164 my ($worker, $fatal, $message) = @_;
84              
85 2         14 my $checkout = $self->{workers_to_checkouts}->{0 + $worker};
86 2 50       30 $checkout->throw_fatal_error('worker connection suddenly died') if $checkout;
87              
88 2         11 $self->destroy_worker($worker);
89 2         8 $self->populate_workers;
90 27         721 };
91              
92 27         2813 $self->{worker_checkout_counts}->{0 + $worker} = 0;
93              
94 27         251 $self->make_worker_available($worker);
95              
96 27         81 $self->try_to_fill_pending_checkouts;
97 56         798 };
98             }
99              
100             }
101              
102              
103             sub install_populate_workers_timer {
104 24     24 0 43 my ($self) = @_;
105              
106 24 100       74 return if exists $self->{populate_workers_timer};
107              
108             $self->{populate_workers_timer} = AE::timer 0.2, 1, sub {
109 12     12   2405418 $self->populate_workers;
110 15         226 };
111             }
112              
113              
114             sub try_to_fill_pending_checkouts {
115 392     392 0 313 my ($self) = @_;
116              
117 392 100       288 return unless @{$self->{pending_checkouts}};
  392         824  
118              
119 357 100       272 if (keys %{$self->{available_workers}}) {
  357         733  
120 124         98 my @available_workers = values %{$self->{available_workers}};
  124         247  
121 124         148 my $worker = shift @available_workers;
122 124         195 $self->make_worker_occupied($worker);
123              
124 124         86 my $checkout = shift @{$self->{pending_checkouts}};
  124         146  
125 124         225 $checkout->{worker} = $worker;
126              
127 124         173 $self->{workers_to_checkouts}->{0 + $worker} = $checkout;
128 124         319 Scalar::Util::weaken($self->{workers_to_checkouts}->{0 + $worker});
129              
130 124         291 $checkout->_try_to_fill_requests;
131 124         2881 return $self->try_to_fill_pending_checkouts;
132             }
133              
134 233         346 $self->populate_workers;
135             }
136              
137              
138              
139             sub make_worker_occupied {
140 124     124 0 98 my ($self, $worker) = @_;
141              
142 124         187 delete $self->{available_workers}->{0 + $worker};
143 124         146 $self->{occupied_workers}->{0 + $worker} = $worker;
144              
145 124         198 $self->{worker_checkout_counts}->{0 + $worker}++;
146             }
147              
148              
149             sub make_worker_available {
150 141     141 0 165 my ($self, $worker) = @_;
151              
152 141 100       279 if (exists $self->{max_checkouts}) {
153 8 100       24 if ($self->{worker_checkout_counts}->{0 + $worker} >= $self->{max_checkouts}) {
154 2         5 $self->destroy_worker($worker);
155 2         4 return;
156             }
157             }
158              
159             ## Cancel any push_read callbacks installed while worker was occupied
160 139         184 $worker->{_queue} = [];
161              
162 139         313 delete $self->{occupied_workers}->{0 + $worker};
163 139         336 $self->{available_workers}->{0 + $worker} = $worker;
164             }
165              
166              
167             sub destroy_worker {
168 11     11 0 20 my ($self, $worker) = @_;
169              
170 11         73 $worker->destroy;
171              
172 11         423 $self->{total_workers}--;
173 11         57 delete $self->{available_workers}->{0 + $worker};
174 11         27 delete $self->{occupied_workers}->{0 + $worker};
175 11         32 delete $self->{worker_checkout_counts}->{0 + $worker};
176             }
177              
178              
179             sub checkout {
180 127     127 0 111444 my ($self, @args) = @_;
181              
182 127         482 my $checkout = AnyEvent::Task::Client::Checkout->_new( client => $self, @args, );
183              
184 127         95 push @{$self->{pending_checkouts}}, $checkout;
  127         163  
185              
186 127         187 $self->try_to_fill_pending_checkouts;
187              
188 127         340 return $checkout;
189             }
190              
191             sub remove_pending_checkout {
192 127     127 0 134 my ($self, $checkout) = @_;
193              
194 127         121 my @out;
195              
196 127         99 $self->{pending_checkouts} = [ grep { $_ != $checkout } @{$self->{pending_checkouts}} ];
  4577         5079  
  127         355  
197             }
198              
199             1;