line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package MojoX::HTTP::Async; |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
=encoding utf-8 |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
=head1 NAME |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
MojoX::HTTP::Async - simple package to execute multiple parallel requests to the same host |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
=head1 SYNOPSIS |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
use MojoX::HTTP::Async (); |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# creates new instance for async requests |
14
|
|
|
|
|
|
|
# restricts max amount of simultaneously executed requests |
15
|
|
|
|
|
|
|
my $ua = MojoX::HTTP::Async->new('host' => 'my-site.com', 'slots' => 2); |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
# let's fill slots |
18
|
|
|
|
|
|
|
$ua->add( '/page1.html?lang=en'); |
19
|
|
|
|
|
|
|
$ua->add( 'http://my-site.com/page2.html'); |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
# non-blocking requests processing |
22
|
|
|
|
|
|
|
while ( $ua->not_empty() ) { |
23
|
|
|
|
|
|
|
if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class |
24
|
|
|
|
|
|
|
print $tx->res->headers->to_string; |
25
|
|
|
|
|
|
|
} else { |
26
|
|
|
|
|
|
|
# do something else |
27
|
|
|
|
|
|
|
} |
28
|
|
|
|
|
|
|
} |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
# blocking requests processing |
31
|
|
|
|
|
|
|
while (my $tx = $ua->wait_for_next_response($timeout)) { |
32
|
|
|
|
|
|
|
# do something here |
33
|
|
|
|
|
|
|
} |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
# how to process connect timeouts |
36
|
|
|
|
|
|
|
if (my $error = $tx->req()->error()) { |
37
|
|
|
|
|
|
|
say $error->{code}; |
38
|
|
|
|
|
|
|
say $error->{message}; |
39
|
|
|
|
|
|
|
} |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
# how to process request timeouts and other errors such as broken pipes, etc. |
42
|
|
|
|
|
|
|
if (my $error = $tx->res()->error()) { |
43
|
|
|
|
|
|
|
say $error->{code}; |
44
|
|
|
|
|
|
|
say $error->{message}; |
45
|
|
|
|
|
|
|
} |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
# makes reconnection if either slot was timeouted or was inactive too long |
48
|
|
|
|
|
|
|
$ua->refresh_connections(); |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
# close everything |
51
|
|
|
|
|
|
|
$ua->close_all(); |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=head1 DESCRIPTION |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
This library allows making multiple HTTP/HTTPS requests to a particular host in non-blocking mode. |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
In comparison with C, this library doesn't make a new connection on each request. |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
And in comparison with C, it's more intuitive to use, and there are no Singleton restrictions. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
The instance of this class can work only with one domain and scheme: either HTTP or HTTPS. |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
=cut |
64
|
|
|
|
|
|
|
|
65
|
9
|
|
|
9
|
|
1521369
|
use 5.026; |
|
9
|
|
|
|
|
36
|
|
66
|
9
|
|
|
9
|
|
63
|
use warnings; |
|
9
|
|
|
|
|
9
|
|
|
9
|
|
|
|
|
234
|
|
67
|
9
|
|
|
9
|
|
45
|
use bytes (); |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
189
|
|
68
|
9
|
|
|
9
|
|
36
|
use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
945
|
|
69
|
9
|
|
|
9
|
|
7380
|
use IO::Socket::SSL (); |
|
9
|
|
|
|
|
727389
|
|
|
9
|
|
|
|
|
468
|
|
70
|
9
|
|
|
9
|
|
81
|
use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC /; |
|
9
|
|
|
|
|
27
|
|
|
9
|
|
|
|
|
801
|
|
71
|
9
|
|
|
9
|
|
81
|
use experimental qw/ signatures /; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
99
|
|
72
|
9
|
|
|
9
|
|
1737
|
use Carp qw/ croak /; |
|
9
|
|
|
|
|
27
|
|
|
9
|
|
|
|
|
531
|
|
73
|
9
|
|
|
9
|
|
54
|
use List::Util qw/ first /; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
1071
|
|
74
|
9
|
|
|
9
|
|
108
|
use Time::HiRes qw/ alarm time sleep /; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
108
|
|
75
|
9
|
|
|
9
|
|
8550
|
use Mojo::Message::Request (); |
|
9
|
|
|
|
|
2993499
|
|
|
9
|
|
|
|
|
333
|
|
76
|
9
|
|
|
9
|
|
6867
|
use Mojo::Message::Response (); |
|
9
|
|
|
|
|
85509
|
|
|
9
|
|
|
|
|
306
|
|
77
|
9
|
|
|
9
|
|
6318
|
use Mojo::Transaction::HTTP (); |
|
9
|
|
|
|
|
31896
|
|
|
9
|
|
|
|
|
828
|
|
78
|
9
|
|
|
9
|
|
6246
|
use URI (); |
|
9
|
|
|
|
|
52002
|
|
|
9
|
|
|
|
|
333
|
|
79
|
9
|
|
|
9
|
|
108
|
use Scalar::Util qw/ blessed /; |
|
9
|
|
|
|
|
45
|
|
|
9
|
|
|
|
|
675
|
|
80
|
9
|
|
|
9
|
|
81
|
use Errno qw / :POSIX /; |
|
9
|
|
|
|
|
27
|
|
|
9
|
|
|
|
|
48834
|
|
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
our $VERSION = 0.02; |
83
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
=head2 new($class, %opts) |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
The class constructor. |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=over |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=item host |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
It's the obligatory option. |
93
|
|
|
|
|
|
|
Sets the name/address of the remote host to be requested. |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
=item port |
96
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
By default it's equal to 80. |
98
|
|
|
|
|
|
|
Sets the port number of remote point. |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=item slots |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
By default it's equal to 5. |
103
|
|
|
|
|
|
|
Sets the maximum number of slots. |
104
|
|
|
|
|
|
|
These slots will be filled one by one if required. |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item ssl |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
By default it's equal to 0 (means HTTP). |
109
|
|
|
|
|
|
|
Sets the scheme of requests: HTTP or HTTPS. |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
=item ssl_opts |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
It's a HashRef with options to control SSL Layer. |
114
|
|
|
|
|
|
|
See the constructor arguments of C<IO::Socket::SSL> for details. |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
=item connect_timeout |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
By default it's equal to 1. |
119
|
|
|
|
|
|
|
Sets connection timeout in seconds (can be float with micro seconds accuracy). |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
If it's equal to 0, then there will be no timeout restrictions. |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
=item request_timeout |
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
By default it's equal to 1 |
126
|
|
|
|
|
|
|
Sets the time in seconds with granular accuracy as micro seconds. |
127
|
|
|
|
|
|
|
The awaiting time of response will be limited with this value. |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
In case of 0 value there will be no time restrictions. |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
=item sol_socket |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
It's a HashRef with socket options. |
134
|
|
|
|
|
|
|
The possible keys are: |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
=item B<so_keepalive> |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
Enables TCP KeepAlive on socket. |
139
|
|
|
|
|
|
|
The default value is 1 (means that option is enabled). |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
=item B<sol_tcp> |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
WARNING: These options can be unsupported on some OS platforms. |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
It's a HashRef with socket TCP-options. |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
If some key is absent in HashRef then system settings will be used. |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
The supported keys are shown below: |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
B<tcp_keepidle> - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
B<tcp_keepintvl> - the time (in seconds) between individual keepalive probes |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
B<tcp_keepcnt> - the maximum number of keepalive probes TCP should send before dropping the connection. |
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
=item inactivity_conn_ts |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
If the last response was received C<inactivity_conn_ts> seconds or more ago, |
160
|
|
|
|
|
|
|
then such slots will be destroyed in the C<refresh_connections> method. |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
By default the value is 0 (disabled). |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
=item debug |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
Enables debug mode. The debug messages will be printed to STDERR. |
167
|
|
|
|
|
|
|
By default the value is 0 (disabled). |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
=back |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=cut |
172
|
|
|
|
|
|
|
|
173
|
1
|
|
|
1
|
1
|
8815
|
# Class constructor.
#
# Merges the caller's options over the built-in defaults and returns a blessed
# hash whose connection pool ('_conns') always starts empty.
#
# Recognised options: host (mandatory), port, slots, ssl, ssl_opts,
# connect_timeout, request_timeout, sol_socket, sol_tcp, inactivity_conn_ts
# and debug. Croaks when 'host' is missing or false.
sub new ($class, %opts) {

    if (!$opts{'host'}) {
        croak("host is mandatory");
    }

    # the default port follows the requested scheme
    my $default_port = $opts{'ssl'} ? 443 : 80;

    my %defaults = (
        'slots'              => 5,
        'ssl'                => 0,
        'ssl_opts'           => undef,
        'port'               => $default_port,
        'request_timeout'    => 1, # 1 sec
        'connect_timeout'    => 1, # 1 sec
        'sol_socket'         => { 'so_keepalive' => 1 },
        'sol_tcp'            => {},
        'inactivity_conn_ts' => 0,
    );

    # user options win over the defaults; '_conns' can't be injected by the caller
    my %attrs = (%defaults, %opts, '_conns' => []);

    return bless(\%attrs, $class);
}
192
|
|
|
|
|
|
|
|
193
|
8
|
|
|
8
|
|
18
|
# Opens a TCP connection to $peer_addr and stores the resulting handles in $slot.
#
# $slot      - slot hash to fill ('socket', 'reader', 'writer', 'sock_no', 'connected_ts')
# $proto     - protocol number passed to socket()
# $peer_addr - packed sockaddr_in of the remote endpoint
#
# Croaks on any socket/connect/fcntl/setsockopt/SSL failure.
sub _connect ($self, $slot, $proto, $peer_addr) {

    warn("Connecting\n") if $self->{'debug'};

    socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");

    # The socket is still in blocking mode here, so connect() waits until the
    # connection is established or fails; O_NONBLOCK is switched on afterwards.
    connect($socket, $peer_addr) || croak("connect error: $!");

    # O_NONBLOCK is a file status flag (F_SETFL) while FD_CLOEXEC is a file
    # descriptor flag (F_SETFD): they can't be combined in a single fcntl call.
    fcntl($socket, F_SETFL, O_NONBLOCK) || croak("fcntl error has occurred: $!");
    fcntl($socket, Fcntl::F_SETFD(), FD_CLOEXEC) || croak("fcntl error has occurred: $!");

    my $sol_socket_opts = $self->{'sol_socket'} // {};

    if (exists($sol_socket_opts->{'so_keepalive'})) {
        # honour the configured value: a false value must switch keepalive off
        my $keepalive = $sol_socket_opts->{'so_keepalive'} ? 1 : 0;

        setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, $keepalive) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");

        if ($keepalive) {
            my $sol_tcp_opts = $self->{'sol_tcp'} // {};

            # Each TCP_* constant is only touched when its option is requested,
            # because the constants may be unavailable on some platforms.
            if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
                setsockopt($socket, IPPROTO_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
            }

            if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
                setsockopt($socket, IPPROTO_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
            }

            if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
                setsockopt($socket, IPPROTO_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
            }
        }
    }

    $slot->{'connected_ts'} = time();
    $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
    $slot->{'sock_no'} = fileno($socket);

    if ($self->{'ssl'}) {
        my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
        croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
        $ssl_socket->blocking(0); # just to be sure
        # all further I/O goes through the SSL layer; 'socket' keeps the raw handle
        $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
    }

    return;
}
234
|
|
|
|
|
|
|
|
235
|
8
|
|
|
8
|
|
12
|
# Connects $slot to the remote endpoint, limiting the attempt with the
# 'connect_timeout' option when it is greater than zero.
#
# A timed out attempt marks the slot's request as timed out ('Connect timeout');
# any other failure is re-thrown with croak.
sub _connect_slot ($self, $slot) {

    my @endpoint = @{$self}{qw/ proto peer_addr /};
    my $timeout  = $self->{'connect_timeout'};

    # no time limit requested: a plain connect
    if (!($timeout > 0)) {
        return $self->_connect($slot, @endpoint);
    }

    eval {
        local $SIG{'ALRM'} = sub { die "alarm\n" };
        alarm($timeout);
        $self->_connect($slot, @endpoint);
        alarm(0);
    };

    my $error = $@;

    # the eval might have been left before its own alarm(0) was reached
    alarm(0);

    if ($error eq "alarm\n") {
        $self->_mark_request_as_timeouted($slot, 'Connect timeout');
    } elsif ($error) {
        croak($error);
    }
}
258
|
|
|
|
|
|
|
|
259
|
6
|
|
|
6
|
|
15
|
# Creates $amount new slots, connects each of them and adds them to the pool.
#
# The peer address and the protocol number are computed once and cached in
# $self ('peer_addr', 'proto'), so the host name is resolved only on the first
# call instead of on every new connection.
#
# Croaks when the host can't be converted by inet_aton.
sub _make_connections ($self, $amount) {

    $self->{'peer_addr'} //= do {
        my $host_addr = inet_aton($self->{'host'});
        croak("can't call inet_aton") if (! $host_addr);
        pack_sockaddr_in($self->{'port'}, $host_addr);
    };
    $self->{'proto'} //= getprotobyname("tcp");

    for (1 .. $amount) {
        my $slot = $self->_make_slot();
        $self->_connect_slot($slot);
        $self->_add_slot($slot);
    }
}
273
|
|
|
|
|
|
|
|
274
|
6
|
|
|
6
|
|
11
|
# Appends $slot to the connection pool; a false $slot is silently ignored.
sub _add_slot ($self, $slot) {
    if ($slot) {
        push(@{ $self->{'_conns'} }, $slot);
    }
}
277
|
|
|
|
|
|
|
|
278
|
6
|
|
|
6
|
|
11
|
# Returns a fresh slot: a hash that describes one not yet connected connection.
sub _make_slot ($self) {

    # handles and per-request objects are unknown until the slot is used
    my @empty_fields = qw/ reader writer socket request tx tmp_response /;

    # counters, flags and timestamps start from zero
    my @zero_fields = qw/
        sock_no is_busy exp_ts reconnect_is_required last_response_ts connected_ts
    /;

    my %slot = (
        (map { $_ => undef } @empty_fields),
        (map { $_ => 0 } @zero_fields),
    );

    return \%slot;
}
294
|
|
|
|
|
|
|
|
295
|
14311
|
|
|
14311
|
|
15761
|
# Marks as broken (code 520) every slot whose socket number is set in the
# $error_handles bit vector.
#
# $socks2slots   - HashRef: socket number => slot
# $error_handles - bit vector of failed socket numbers (as filled in by select)
# $reason        - error text; when it's false the text is derived from errno
sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {

    my $message = $reason || (
        ($!{'EPIPE'} || $!{'ECONNRESET'} || $!{'ECONNREFUSED'} || $!{'ECONNABORTED'})
            ? 'Premature connection close'
            : 'Unknown error'
    );

    my @failed_socks = grep { vec($error_handles, $_, 1) != 0 } keys %$socks2slots;

    for my $sock_no (@failed_socks) {
        $self->_mark_response_as_broken($socks2slots->{ $sock_no }, 520, $message);
    }
}
310
|
|
|
|
|
|
|
|
311
|
16
|
|
|
16
|
|
30
|
# Returns a connected idle slot whose socket is ready for writing,
# or undef when there is no such slot.
#
# Slots reported as failed by select are marked broken via _check_for_errors;
# a slot that has just been marked broken is never returned as free.
sub _get_free_slot ($self) {

    my $slot;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $write_handles = '';

        vec($write_handles, $_, 1) = 1 for keys %socks2slots;

        my $error_handles = $write_handles;

        # zero timeout: just poll the sockets, never block
        my ($nfound) = select(undef, $write_handles, $error_handles, 0);

        $self->_check_for_errors(\%socks2slots, $error_handles, $!);

        # select returns -1 on failure, so a plain truth test is not enough
        if ($nfound > 0) {
            my $sock_no = first {
                vec($write_handles, $_, 1) == 1
                    && !$socks2slots{ $_ }->{'is_busy'}               # may have been marked
                    && !$socks2slots{ $_ }->{'reconnect_is_required'} # as broken a moment ago
            } keys %socks2slots;

            $slot = $socks2slots{ $sock_no } if defined($sock_no);
        }
    }

    return $slot;
}
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
=head2 add ($self, $request_or_uri, $timeout = undef) |
340
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
Adds an HTTP request into an empty slot. |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
If the request was successfully added, then it will return 1. |
344
|
|
|
|
|
|
|
Otherwise it will return 0. |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
The request can be not added into slot only in case, if there are no empty slots and new slot wasn't created due to |
347
|
|
|
|
|
|
|
the limit of slot's amount had been reached (see C<new> and C<slots>). |
348
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
It's recommendable always to check result code of this method. |
350
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
Example: |
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1); |
354
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
# let's occupy the only slot |
356
|
|
|
|
|
|
|
$ua->add('/page1.html'); |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
# let's wait until it's become free again |
359
|
|
|
|
|
|
|
while ( ! $ua->add('/page2.html') ) { |
360
|
|
|
|
|
|
|
while (my $tx = $ua->wait_for_next_response() ) { |
361
|
|
|
|
|
|
|
# do something here |
362
|
|
|
|
|
|
|
} |
363
|
|
|
|
|
|
|
} |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
=over |
366
|
|
|
|
|
|
|
|
367
|
|
|
|
|
|
|
=item $request_or_uri |
368
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
It can be either an instance of C<Mojo::Message::Request> class, or an instance of C<Mojo::URL>. |
370
|
|
|
|
|
|
|
It can also be a simple URI string. |
371
|
|
|
|
|
|
|
|
372
|
|
|
|
|
|
|
If the resource contains the host, then it must be the same as in the constructor C<new>. |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
Using a string with URI or an instance of C<Mojo::URL> class assumes that the GET HTTP method will be used. |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
=item $timeout |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
Time in seconds. Can be fractional with microseconds tolerance. |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
The C<request_timeout> from the constructor will be used by default. |
381
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
=back |
383
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
=cut |
385
|
|
|
|
|
|
|
|
386
|
10
|
|
|
10
|
1
|
800
|
# Puts a request into a free slot and sends it.
#
# $request_or_uri - request object, Mojo::URL instance or a plain URI string
# $timeout        - optional timeout, passed through to _send_request
#
# Returns 1 when the request was handed over to a slot, otherwise 0.
sub add ($self, $request_or_uri, $timeout = undef) {

    my $slot = $self->_get_free_slot();

    # no idle connection: open one more if the pool is still allowed to grow
    if (!$slot && scalar($self->{'_conns'}->@*) < $self->{'slots'}) {
        $self->_make_connections(1);
        $slot = $self->_get_free_slot();
    }

    return 0 if !$slot;

    my $is_plain_uri = !ref($request_or_uri);
    my $is_mojo_url  = blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL');

    my $request;

    if ($is_plain_uri || $is_mojo_url) {
        # wrap the bare resource into a request object
        $request = Mojo::Message::Request->new();
        $request->url()->parse($request_or_uri);
    } else {
        $request = $request_or_uri;
    }

    return 0 if !$request;

    $self->_send_request($slot, $request, $timeout);

    return 1;
}
409
|
|
|
|
|
|
|
|
410
|
14
|
|
|
14
|
|
24
|
# Resets the per-request state of $slot, so it can serve the next request.
# With a true $force the connection itself is closed and forgotten as well.
sub _clear_slot ($self, $slot, $force = 0) {

    # per-request state
    @{$slot}{qw/ is_busy exp_ts /} = (0, 0);
    @{$slot}{qw/ tx request tmp_response /} = ();

    return if !$force;

    # connection state
    close($slot->{'socket'}) if $slot->{'socket'};

    @{$slot}{qw/ socket reader writer /} = ();
    @{$slot}{qw/ sock_no reconnect_is_required last_response_ts connected_ts /} = (0, 0, 0, 0);

    return;
}
427
|
|
|
|
|
|
|
|
428
|
4
|
|
|
4
|
|
9
|
# Flags $slot as busy and waiting for a reconnect, and makes sure that it
# carries a request and a transaction, creating empty ones when they're absent.
sub _mark_slot_as_broken($self, $slot) {

    @{$slot}{qw/ reconnect_is_required is_busy /} = (1, 1);

    if (!defined($slot->{'request'})) {
        $slot->{'request'} = Mojo::Message::Request->new();
    }

    if (!defined($slot->{'tx'})) {
        $slot->{'tx'} = Mojo::Transaction::HTTP->new(
            'req' => $slot->{'request'},
            'res' => Mojo::Message::Response->new()
        );
    }

    return $slot->{'tx'};
}
437
|
|
|
|
|
|
|
|
438
|
0
|
|
|
0
|
|
0
|
# Marks $slot as broken and attaches the error ($code, $msg) to its request.
#
# $code - numeric error code, 520 by default
# $msg  - error text, 'Unknown Error' by default
sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
    # _mark_slot_as_broken guarantees that the slot has a request object
    $self->_mark_slot_as_broken($slot);
    $slot->{'request'}->error({'message' => $msg, 'code' => $code});
}
442
|
|
|
|
|
|
|
|
443
|
4
|
|
|
4
|
|
6
|
# Marks $slot as broken and turns the response of its transaction into an
# error response: error info, zero Content-Length, status code and message.
#
# $code - numeric error code, 520 by default
# $msg  - error text, 'Unknown Error' by default
sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {

    # after this call the slot is guaranteed to have a transaction
    $self->_mark_slot_as_broken($slot);

    my $response = $slot->{'tx'}->res();
    my %failure  = ('message' => $msg, 'code' => $code);

    $response->error(\%failure);
    $response->headers()->content_length(0);
    $response->code($code);

    return $response->message($msg);
}
452
|
|
|
|
|
|
|
|
453
|
0
|
|
|
0
|
|
0
|
# Marks the request of $slot as timed out: a broken request with code 524.
sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my $timeout_code = 524;
    return $self->_mark_request_as_broken($slot, $timeout_code, $message);
}
456
|
|
|
|
|
|
|
|
457
|
4
|
|
|
4
|
|
5
|
# Marks the response of $slot as timed out: a broken response with code 524.
sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my $timeout_code = 524;
    return $self->_mark_response_as_broken($slot, $timeout_code, $message);
}
460
|
|
|
|
|
|
|
|
461
|
8
|
|
|
8
|
|
11
|
# Validates $request against the instance settings and writes it into the
# connection of $slot.
#
# $slot    - free slot that will carry the request
# $request - Mojo::Message::Request instance (or a descendant)
# $timeout - seconds to wait for the response; the 'request_timeout' option
#            is used when it's undefined, 0 disables the limit
#
# Croaks when the slot is busy, when the request is missing or of a wrong
# class, or when its scheme/host contradicts the constructor options.
# Write failures don't throw: the slot's request is marked as broken (520)
# or timed out instead.
sub _send_request ($self, $slot, $request, $timeout = undef) {

    croak("slot is busy") if ($slot->{'is_busy'});
    croak("request object is obligatory") if (!$request);
    croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));

    my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
    my $url = $request->url();
    my $uri = URI->new( $url );
    my $scheme = $url->scheme();

    if ($scheme && $required_scheme ne $scheme) {
        croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
    }

    if (! $uri->scheme()) {
        # URI::_generic doesn't have the host() method, that's why we set the scheme by ourselves to change the type of $uri
        $uri->scheme($required_scheme);
    }

    if (my $host = $uri->host()) {
        if ($host ne $self->{'host'}) {
            croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
        }
    }

    $timeout //= $self->{'request_timeout'};

    state $default_ua_hdr = 'perl/' . __PACKAGE__;

    my $h = $request->headers();
    $h->host($self->{'host'}) if (! $h->host() );
    $h->user_agent($default_ua_hdr) if (! $h->user_agent() );

    $slot->{'request'} = $request;
    $slot->{'is_busy'} = 1;
    $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;

    my $plain_request = $request->to_string();

    if ($self->{'ssl'}) {
        $slot->{'writer'}->print($plain_request);
    } else {
        my $socket = $slot->{'socket'};
        my $msg_len = bytes::length($plain_request);
        my $sent_bytes = 0;
        my $attempts = 10;

        local $!;

        while ($sent_bytes < $msg_len && $attempts--) {
            # The buffer is never truncated: the offset alone selects the part
            # that is still unsent, and the length is limited to that remainder.
            my $bytes = syswrite($socket, $plain_request, $msg_len - $sent_bytes, $sent_bytes);

            if ($! || ! defined($bytes)) {
                # $! is never undef, so the fallback has to be chosen with ||
                my $error = $! || 'Unknown error';
                $self->_mark_request_as_broken($slot, 520, $error);
                return;
            }

            $sent_bytes += $bytes;
        }

        if ($sent_bytes < $msg_len) {
            my $error = $! || 'sent message is shorter than original';
            $self->_mark_request_as_broken($slot, 520, $error);
            return;
        }
    }

    # sending might have taken longer than the whole request is allowed to
    if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
        $self->_mark_request_as_timeouted($slot);
    }

    return;
}
538
|
|
|
|
|
|
|
|
539
|
4
|
|
|
4
|
|
7
|
# Reads everything that is currently available from the connection of $slot
# and feeds it to the response parser.
#
# A completely received response is stored in the slot as a transaction
# ('tx'); an incomplete one is kept in 'tmp_response' until the next call.
# Read errors other than EAGAIN/EWOULDBLOCK mark the response as broken (520);
# an expired 'exp_ts' without a result marks it as timed out.
#
# Does nothing when the slot already has a transaction or isn't busy.
sub _try_to_read ($self, $slot) {

    return if $slot->{'tx'} || ! $slot->{'is_busy'};

    my $reader = $slot->{'reader'};
    my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();

    # start from a clean errno: a stale value left by an earlier, unrelated
    # operation must not be taken for a read error
    local $! = 0;

    # a lexical variable is used on purpose: "while (<$reader>)" would
    # overwrite the caller's $_
    while (defined(my $chunk = <$reader>)) {
        $response->parse($chunk);
    }

    if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resource temporarily unavailable" (no data)
        $self->_mark_response_as_broken($slot, 520, $!);
    } elsif ($response && $response->code()) {

        my $content = $response->content();

        if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
            $slot->{'tx'} = Mojo::Transaction::HTTP->new(
                'req' => $slot->{'request'},
                'res' => $response
            );
            $slot->{'tmp_response'} = undef;
            $slot->{'last_response_ts'} = time();
        } else {
            $slot->{'tmp_response'} = $response;
        }

        $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
    } else {
        # the status line isn't complete yet: keep the parser together with
        # the bytes it has already consumed, otherwise they would be lost
        $slot->{'tmp_response'} = $response;
    }

    if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
        $self->_mark_response_as_timeouted($slot);
    }

    return;
}
572
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
=head2 not_empty($self) |
574
|
|
|
|
|
|
|
|
575
|
|
|
|
|
|
|
Returns 1 if at least one slot is busy or contains a response that has not been processed yet. |
576
|
|
|
|
|
|
|
Otherwise the method returns 0. |
577
|
|
|
|
|
|
|
|
578
|
|
|
|
|
|
|
=cut |
579
|
|
|
|
|
|
|
|
580
|
14301
|
|
|
14301
|
1
|
25011
|
# Tells whether there is still something to wait for or to pick up:
# returns 1 when at least one slot is busy or holds a transaction, otherwise 0.
sub not_empty ($self) {
    my $occupied = grep { $_->{'is_busy'} || $_->{'tx'} } $self->{'_conns'}->@*;
    return $occupied ? 1 : 0;
}
590
|
|
|
|
|
|
|
|
591
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
=head2 wait_for_next_response($self, $timeout = 0) |
593
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
Waits for first received response or time-outed request in any slot. |
595
|
|
|
|
|
|
|
Returns the C<Mojo::Transaction::HTTP> instance with result. |
596
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
=over |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=item $timeout |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
Period of time in seconds. Can be fractional with microsecond tolerance. |
602
|
|
|
|
|
|
|
If no response is received before this time is out, the method stops waiting and returns C<undef>. |
603
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
The default value is 0, which means that request will have been blocked until the response is received. |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
If all slots are empty, then C<undef> will be returned. |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
=back |
609
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
=cut |
611
|
|
|
|
|
|
|
|
612
|
10
|
|
|
10
|
1
|
27328
|
# Polls the slots until a response shows up, all slots become empty or
# $timeout seconds have passed (0 means no time limit).
#
# Returns the transaction, or undef when there is nothing to return.
sub wait_for_next_response ($self, $timeout = 0) {

    my $deadline = $timeout ? (time() + $timeout) : 0;
    my $response;

    # the loop is left as soon as the awaiting process is time-outed
    until ($deadline && time() >= $deadline) {
        $response = $self->next_response();

        last if $response || !$self->not_empty();

        sleep(1E-6); # sleep 1 microsecond
    }

    return $response;
}
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
=head2 next_response ($self) |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
Returns an instance of the C<Mojo::Transaction::HTTP> class. |
629
|
|
|
|
|
|
|
If there is no response, it will return C<undef>. |
630
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
=cut |
632
|
|
|
|
|
|
|
|
633
|
14306
|
|
|
14306
|
1
|
21516
|
sub next_response ($self) {

    # First give the busy slots a chance to produce a result ...
    my $tx = $self->_get_response_from_ready_slot();
    return $tx if defined $tx;

    # ... and otherwise fall back to any slot that already holds one.
    return $self->_get_response_from_slot();
}
636
|
|
|
|
|
|
|
|
637
|
14298
|
|
|
14298
|
|
14792
|
sub _get_response_from_slot ($self) {

    # Pick the first slot that already holds a 'tx' value, if any.
    my $holder = first { $_->{'tx'} } $self->{'_conns'}->@*;

    # Remember the value before the slot is cleared.
    my $tx = $holder ? $holder->{'tx'} : undef;

    # The slot's own 'reconnect_is_required' flag is handed over as the second argument.
    $self->_clear_slot($holder, $holder->{'reconnect_is_required'}) if $holder;

    return $tx;
}
649
|
|
|
|
|
|
|
|
650
|
14306
|
|
|
14306
|
|
16350
|
sub _get_response_from_ready_slot ($self) {

    my $tx;

    # Only slots with a request in flight, no stored 'tx' and no pending
    # reconnect are polled; they are indexed by their 'sock_no' value.
    my %slot_by_fd;
    for my $conn ($self->{'_conns'}->@*) {
        next if $conn->{'tx'} || $conn->{'reconnect_is_required'} || !$conn->{'is_busy'};
        $slot_by_fd{ $conn->{'sock_no'} } = $conn;
    }

    return $tx if !%slot_by_fd;

    local $!;

    # Build the bit mask for select(); the same set is watched for readability and for errors.
    my $read_mask = '';
    vec($read_mask, $_, 1) = 1 for keys %slot_by_fd;
    my $error_mask = $read_mask;

    # Zero timeout: just poll, never block.
    my ($ready_count) = select($read_mask, undef, $error_mask, 0);

    $self->_check_for_errors(\%slot_by_fd, $error_mask, $!);

    for my $fd (keys %slot_by_fd) {
        my $slot = $slot_by_fd{$fd};

        if ($ready_count && vec($read_mask, $fd, 1) == 1) {
            # The socket is reported as readable: read and take the result
            # only if the slot ends up with a 'tx' and is still busy.
            $self->_try_to_read($slot);
            $tx = $slot->{'tx'} if $slot->{'tx'} && $slot->{'is_busy'};
        }
        elsif (!$slot->{'tx'} && $slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
            # Nothing to read and the slot's own deadline has passed.
            $self->_mark_response_as_timeouted($slot);
            $tx = $slot->{'tx'};
        }

        next if !$tx;

        # Only one result is delivered per call.
        $self->_clear_slot($slot, 0);
        last;
    }

    return $tx;
}
692
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
=head2 refresh_connections ($self) |
694
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
Closes connections in slots in the following cases: |
696
|
|
|
|
|
|
|
|
697
|
|
|
|
|
|
|
1. The slot was marked as timeouted |
698
|
|
|
|
|
|
|
2. C<inactivity_conn_ts> was set and the connection has expired |
699
|
|
|
|
|
|
|
3. There are some errors in socket (for example: Connection reset by peer, Broken pipe, etc) |
700
|
|
|
|
|
|
|
|
701
|
|
|
|
|
|
|
=cut |
702
|
|
|
|
|
|
|
|
703
|
1
|
|
|
1
|
1
|
2
|
sub refresh_connections ($self) {

    my $reconnected = 0;
    my $started_at  = time();
    my $max_idle    = $self->{'inactivity_conn_ts'} // 0;

    if ($self->{'_conns'}->@*) {

        local $!;

        my %slot_by_fd;
        $slot_by_fd{ $_->{'sock_no'} } = $_ for $self->{'_conns'}->@*;

        # Poll (zero timeout) every slot's descriptor for exceptional conditions only.
        my $error_mask = '';
        vec($error_mask, $_, 1) = 1 for keys %slot_by_fd;
        select(undef, undef, $error_mask, 0);

        # broken connections will be marked as required to reconnect
        $self->_check_for_errors(\%slot_by_fd, $error_mask, $!);
    }

    # Walk the slots from the last one down to the first one.
    for my $idx (reverse 0 .. $self->{'_conns'}->$#*) {
        my $slot = $self->{'_conns'}->[$idx];

        # Idle time is measured from the last response, or from the connect time if there is none.
        my $expires_at  = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $max_idle;
        my $is_outdated = $max_idle && $expires_at <= $started_at;

        warn("Outdated\n") if $is_outdated && $self->{'debug'};

        next if !$slot->{'reconnect_is_required'} && !$is_outdated;

        warn("Going to reconnect\n") if $self->{'debug'};

        $self->_clear_slot($slot, 1);
        $self->_connect_slot($slot);
        $reconnected++;
    }

    # Number of slots that were reconnected.
    return $reconnected;
}
738
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
=head2 close_all ($self) |
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
Closes all opened connections and resets all slots with requests. |
742
|
|
|
|
|
|
|
|
743
|
|
|
|
|
|
|
=cut |
744
|
|
|
|
|
|
|
|
745
|
2
|
|
|
2
|
1
|
3017
|
sub close_all ($self) {

    # Clear every slot with the second argument set to 1 ...
    for my $slot ($self->{'_conns'}->@*) {
        $self->_clear_slot($slot, 1);
    }

    # ... and then drop the whole slot list.
    $self->{'_conns'} = [];

    return;
}
750
|
|
|
|
|
|
|
|
751
|
|
|
|
|
|
|
=head2 DESTROY($self) |
752
|
|
|
|
|
|
|
|
753
|
|
|
|
|
|
|
The class destructor. |
754
|
|
|
|
|
|
|
|
755
|
|
|
|
|
|
|
Closes all opened sockets. |
756
|
|
|
|
|
|
|
|
757
|
|
|
|
|
|
|
=cut |
758
|
|
|
|
|
|
|
|
759
|
1
|
|
|
1
|
|
10696
|
sub DESTROY ($self) {

    # A destructor can run at any moment (for instance while an exception is
    # being propagated), and closing sockets below may overwrite the global
    # status variables. Localize them so the interrupted code still sees
    # its own values once the object is gone.
    local ($., $@, $!, $^E, $?);

    my $in_use = 0;

    # Drain the slot list: count the slots still flagged as busy
    # and close every socket that is present.
    while ( my $slot = shift($self->{'_conns'}->@*) ) {
        $in_use++ if ($slot->{'is_busy'});
        $slot->{'socket'}->close() if ($slot->{'socket'});
    }

    # Tell the developer that requests were still marked busy when the object went away.
    warn ref($self) ." object destroyed but still in use" if $in_use;

    return;
}
767
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
1; |
769
|
|
|
|
|
|
|
__END__ |