line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
########################################################################### |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
=head1 NAME |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
Danga::Socket::AnyEvent - Danga::Socket reimplemented in terms of AnyEvent |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 SYNOPSIS |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
# This will clobber the Danga::Socket namespace |
10
|
|
|
|
|
|
|
# with the new implementation. |
11
|
|
|
|
|
|
|
use Danga::Socket::AnyEvent; |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# Then just use Danga::Socket as normal. |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
=head1 DESCRIPTION |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
This is an alternative implementation of L<Danga::Socket> that is |
18
|
|
|
|
|
|
|
implemented in terms of L<AnyEvent>, an abstraction layer for |
19
|
|
|
|
|
|
|
event loops. This allows Danga::Socket applications to run |
20
|
|
|
|
|
|
|
in any event loop supported by AnyEvent, and allows Danga::Socket |
21
|
|
|
|
|
|
|
applications to make use of AnyEvent-based libraries. |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
Loading this module will install a workalike set of functions |
24
|
|
|
|
|
|
|
into the Danga::Socket package. It must therefore be loaded before |
25
|
|
|
|
|
|
|
anything loads the real L<Danga::Socket>. If you try to load |
26
|
|
|
|
|
|
|
this module after Danga::Socket has been loaded then it will |
27
|
|
|
|
|
|
|
die. |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
=head1 DIFFERENCES FROM Danga::Socket |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
Although this module aims to be a faithful recreation of the |
32
|
|
|
|
|
|
|
features and interface of Danga::Socket, there are some known |
33
|
|
|
|
|
|
|
differences: |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
=over |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=item * The C<SetLoopTimeout> feature will only work if the caller |
38
|
|
|
|
|
|
|
runs the event loop via C<< Danga::Socket->EventLoop >>; if a caller |
39
|
|
|
|
|
|
|
runs the AnyEvent event loop directly, or if some other library |
40
|
|
|
|
|
|
|
runs it, then the timeout will not take effect. |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
=item * The C<PostLoopCallback> feature behaves in a slightly |
43
|
|
|
|
|
|
|
different way than in the stock Danga::Socket. It's currently |
44
|
|
|
|
|
|
|
implemented via an AnyEvent idlewatcher that runs whenever |
45
|
|
|
|
|
|
|
the event loop goes idle after running a Danga::Socket event. |
46
|
|
|
|
|
|
|
This means that it will probably run at different times than |
47
|
|
|
|
|
|
|
it would have in Danga::Socket's own event loops. |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
=item * The C<HaveEpoll> method will always return true, regardless |
50
|
|
|
|
|
|
|
of what backend is actually implementing the event loop. Make |
51
|
|
|
|
|
|
|
sure to use AnyEvent's L<EV> backend if you would like to use |
52
|
|
|
|
|
|
|
Epoll/KQueue/etc rather than other, less efficient mechanisms. |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=back |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=cut |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
########################################################################### |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
package Danga::Socket::AnyEvent; |
61
|
|
|
|
|
|
|
|
62
|
1
|
|
|
1
|
|
483
|
use vars qw{$VERSION}; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
45
|
|
63
|
|
|
|
|
|
|
$VERSION = "0.01_01"; |
64
|
1
|
|
|
1
|
|
4
|
use Carp; |
|
1
|
|
|
|
|
0
|
|
|
1
|
|
|
|
|
70
|
|
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
BEGIN {
    # This module installs its own subs into the Danga::Socket package, so it
    # cannot coexist with the real implementation. If %INC shows the real
    # module was already loaded, abort at compile time and say where it
    # came from.
    my $loaded_from = $INC{"Danga/Socket.pm"};
    Carp::croak("Can't load Danga::Socket::AnyEvent: the real Danga::Socket was already loaded from ".$loaded_from)
        if $loaded_from;
}
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
package # hidden from PAUSE |
74
|
|
|
|
|
|
|
Danga::Socket; |
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
our $VERSION = "1.61"; |
77
|
|
|
|
|
|
|
|
78
|
1
|
|
|
1
|
|
8
|
use strict; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
15
|
|
79
|
1
|
|
|
1
|
|
440
|
use bytes; |
|
1
|
|
|
|
|
7
|
|
|
1
|
|
|
|
|
3
|
|
80
|
1
|
|
|
1
|
|
403
|
use POSIX (); |
|
1
|
|
|
|
|
4253
|
|
|
1
|
|
|
|
|
21
|
|
81
|
1
|
|
|
1
|
|
425
|
use Time::HiRes (); |
|
1
|
|
|
|
|
954
|
|
|
1
|
|
|
|
|
20
|
|
82
|
1
|
|
|
1
|
|
783
|
use AnyEvent; |
|
1
|
|
|
|
|
3700
|
|
|
1
|
|
|
|
|
34
|
|
83
|
|
|
|
|
|
|
|
84
|
1
|
|
|
1
|
|
221
|
my $opt_bsd_resource = eval "use BSD::Resource; 1;"; |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
85
|
|
|
|
|
|
|
|
86
|
1
|
|
|
1
|
|
4
|
use warnings; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
29
|
|
87
|
1
|
|
|
1
|
|
3
|
no warnings qw(deprecated); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
30
|
|
88
|
|
|
|
|
|
|
|
89
|
1
|
|
|
1
|
|
396
|
use Sys::Syscall qw(:epoll); |
|
1
|
|
|
|
|
2312
|
|
|
1
|
|
|
|
|
159
|
|
90
|
|
|
|
|
|
|
|
91
|
1
|
|
|
|
|
4
|
use fields ('sock', # underlying socket |
92
|
|
|
|
|
|
|
'fd', # numeric file descriptor |
93
|
|
|
|
|
|
|
'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write |
94
|
|
|
|
|
|
|
'write_buf_offset', # offset into first array of write_buf to start writing at |
95
|
|
|
|
|
|
|
'write_buf_size', # total length of data in all write_buf items |
96
|
|
|
|
|
|
|
'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass |
97
|
|
|
|
|
|
|
'read_push_back', # arrayref of "pushed-back" read data the application didn't want |
98
|
|
|
|
|
|
|
'closed', # bool: socket is closed |
99
|
|
|
|
|
|
|
'corked', # bool: socket is corked |
100
|
|
|
|
|
|
|
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) |
101
|
|
|
|
|
|
|
'peer_v6', # bool: cached; if peer is an IPv6 address |
102
|
|
|
|
|
|
|
'peer_ip', # cached stringified IP address of $sock |
103
|
|
|
|
|
|
|
'peer_port', # cached port number of $sock |
104
|
|
|
|
|
|
|
'local_ip', # cached stringified IP address of local end of $sock |
105
|
|
|
|
|
|
|
'local_port', # cached port number of local end of $sock |
106
|
|
|
|
|
|
|
'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors |
107
|
1
|
|
|
1
|
|
452
|
); |
|
1
|
|
|
|
|
1052
|
|
108
|
|
|
|
|
|
|
|
109
|
1
|
|
|
|
|
97
|
use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK |
110
|
1
|
|
|
1
|
|
110
|
EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT); |
|
1
|
|
|
|
|
1
|
|
111
|
1
|
|
|
1
|
|
477
|
use Socket qw(IPPROTO_TCP); |
|
1
|
|
|
|
|
2527
|
|
|
1
|
|
|
|
|
125
|
|
112
|
1
|
|
|
1
|
|
5
|
use Carp qw(croak confess); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
44
|
|
113
|
|
|
|
|
|
|
|
114
|
1
|
50
|
|
1
|
|
4
|
use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # FIXME: not hard-coded (Linux-specific too) |
|
1
|
|
|
|
|
0
|
|
|
1
|
|
|
|
|
49
|
|
115
|
1
|
|
|
1
|
|
3
|
use constant DebugLevel => 0; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
33
|
|
116
|
|
|
|
|
|
|
|
117
|
1
|
|
|
1
|
|
3
|
use constant POLLIN => 1; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
31
|
|
118
|
1
|
|
|
1
|
|
2
|
use constant POLLOUT => 4; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
30
|
|
119
|
1
|
|
|
1
|
|
3
|
use constant POLLERR => 8; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
29
|
|
120
|
1
|
|
|
1
|
|
3
|
use constant POLLHUP => 16; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
28
|
|
121
|
1
|
|
|
1
|
|
3
|
use constant POLLNVAL => 32; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
3118
|
|
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
our ( |
124
|
|
|
|
|
|
|
%Timers, # timers |
125
|
|
|
|
|
|
|
%FdWatchers, # fd (num) -> [ AnyEvent read watcher, AnyEvent write watcher ] |
126
|
|
|
|
|
|
|
%DescriptorMap, # fd (num) -> Danga::Socket object that owns it |
127
|
|
|
|
|
|
|
%OtherFds, # fd (num) -> sub to run when that fd is ready to read or write |
128
|
|
|
|
|
|
|
%PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data) |
129
|
|
|
|
|
|
|
$PostLoopCallback, # subref to call at the end of each loop, if defined (global) |
130
|
|
|
|
|
|
|
%PLCMap, # fd (num) -> PostLoopCallback (per-object) |
131
|
|
|
|
|
|
|
$IdleWatcher, # an AnyEvent idle watcher that'll run PostEventLoop and then delete itself. |
132
|
|
|
|
|
|
|
$MainLoopCondVar, # When EventLoop is running this contains the AnyEvent condvar that |
133
|
|
|
|
|
|
|
# will cause the main loop to exit if you call ->send() on it. |
134
|
|
|
|
|
|
|
@ToClose, # sockets to close when event loop is done |
135
|
|
|
|
|
|
|
$DoProfile, # if on, enable profiling |
136
|
|
|
|
|
|
|
%Profiling, # what => [ utime, stime, calls ] |
137
|
|
|
|
|
|
|
$DoneInit, # if we've done the one-time module init yet |
138
|
|
|
|
|
|
|
$LoopTimeout, # timeout of event loop in milliseconds |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
); |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
Reset(); |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=head1 METHODS |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
=cut |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
##################################################################### |
149
|
|
|
|
|
|
|
### C L A S S M E T H O D S |
150
|
|
|
|
|
|
|
##################################################################### |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
=head2 C<< CLASS->Reset() >> |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
Reset all state |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
=cut |
157
|
|
|
|
|
|
|
# Class method: return every piece of module-level bookkeeping to its initial
# state, as the POD ("Reset all state") promises.
#
# Takes no meaningful arguments and returns nothing useful.
sub Reset {
    # Emptying these hashes drops the AnyEvent watcher objects they hold,
    # which releases the module's references to those watchers.
    %Timers        = ();
    %FdWatchers    = ();
    %DescriptorMap = ();
    %OtherFds      = ();

    # Per-socket and global post-loop state. Leaving these behind would let
    # callbacks and pushed-back sockets from before the reset fire afterwards.
    %PushBackSet      = ();
    %PLCMap           = ();
    $PostLoopCallback = undef;
    @ToClose          = ();
    $IdleWatcher      = undef;

    # Profiling is off, with no accumulated data.
    %Profiling = ();
    $DoProfile = 0;

    # EventLoop only installs a timeout timer for values other than 0 and -1,
    # so -1 means "no loop timeout".
    $LoopTimeout = -1;
    $DoneInit    = 0;

    # $MainLoopCondVar is deliberately left alone: if Reset is called from
    # inside a running EventLoop, that loop still needs its condvar to exit.
    return;
}
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
=head2 C<< CLASS->HaveEpoll() >> |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
Returns a true value if this class will use IO::Epoll for async IO. |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
=cut |
169
|
|
|
|
|
|
|
# Class method kept for API compatibility with the stock Danga::Socket.
# It unconditionally reports true (1); it does not inspect which backend
# is in use.
sub HaveEpoll {
    my $always_true = 1;
    return $always_true;
}
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
=head2 C<< CLASS->WatchedSockets() >> |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
Returns the number of file descriptors for which we have watchers installed. |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
=cut |
178
|
|
|
|
|
|
|
# Class method: number of file descriptors that currently have an entry in
# %FdWatchers (one entry per fd, regardless of how many watchers it holds).
sub WatchedSockets {
    my $watched_count = keys %FdWatchers;
    return $watched_count;
}
181
|
|
|
|
|
|
|
*watched_sockets = *WatchedSockets; |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
=head2 C<< CLASS->EnableProfiling() >> |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
Turns profiling on, clearing current profiling data. |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
=cut |
188
|
|
|
|
|
|
|
# Class method: switch profiling on and discard previously collected data.
#
# Returns 1 when profiling was enabled, or 0 (leaving all state untouched)
# when the optional BSD::Resource module failed to load.
sub EnableProfiling {
    return 0 unless $opt_bsd_resource;

    %Profiling = ();
    $DoProfile = 1;
    return 1;
}
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
=head2 C<< CLASS->DisableProfiling() >> |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
Turns off profiling, but retains data up to this point |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
=cut |
202
|
|
|
|
|
|
|
# Class method: stop profiling. Data already gathered in %Profiling is kept.
# Returns the new value of the flag (0).
sub DisableProfiling {
    return $DoProfile = 0;
}
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=head2 C<< CLASS->ProfilingData() >> |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
Returns reference to a hash of data in format: |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
ITEM => [ utime, stime, #calls ] |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
=cut |
213
|
|
|
|
|
|
|
# Class method: hand back a reference to the live %Profiling hash
# (ITEM => [ utime, stime, #calls ]). The reference is to the module's own
# hash, not a copy.
sub ProfilingData {
    my $profile_ref = \%Profiling;
    return $profile_ref;
}
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
=head2 C<< CLASS->ToClose() >> |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
Return the list of sockets that are awaiting close() at the end of the |
220
|
|
|
|
|
|
|
current event loop. |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
=cut |
223
|
0
|
|
|
0
|
0
|
|
# Class method: the sockets queued for closing once the current round of
# event processing finishes. Returns a list (its length in scalar context).
sub ToClose {
    my @pending = @ToClose;
    return @pending;
}
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
=head2 C<< CLASS->OtherFds( [%fdmap] ) >> |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
Get/set the hash of file descriptors that need processing in parallel with |
228
|
|
|
|
|
|
|
the registered Danga::Socket objects. |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
Callers must not modify the returned hash. |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
=cut |
233
|
|
|
|
|
|
|
# Class method: get, or replace, the map of "other" file descriptors
# (fd number => coderef run when that fd is readable or writable).
#
# With arguments (an fd => coderef list) the existing map and its watchers
# are discarded and the new entries are installed through AddOtherFds.
# Returns the map as a list in list context, or a reference to the live
# hash in scalar context.
sub OtherFds {
    my $class = shift;

    if (@_) {
        # Drop the watchers belonging to the fds we are about to forget.
        delete @FdWatchers{ keys %OtherFds };
        %OtherFds = ();
        $class->AddOtherFds(@_);
    }

    return %OtherFds if wantarray;
    return \%OtherFds;
}
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
=head2 C<< CLASS->AddOtherFds( [%fdmap] ) >> |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
Add fds to the OtherFds hash for processing. |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
=cut |
251
|
|
|
|
|
|
|
# Class method: register extra file descriptors to be serviced alongside the
# Danga::Socket objects.
#
# %fdmap maps an fd number to the coderef to run when that fd is ready.
# An fd that is already registered has its handler and watchers replaced.
sub AddOtherFds {
    my ($class, %fdmap) = @_;

    foreach my $fd (keys %fdmap) {
        my $handler = $fdmap{$fd};
        $OtherFds{$fd} = $handler;

        # The OtherFds interface has a single callback for both directions,
        # so install one read watcher and one write watcher that both invoke
        # it. Slot 0 is the read watcher, slot 1 the write watcher.
        my @pair;
        for my $mode (qw(r w)) {
            push @pair, AnyEvent->io(
                fh   => $fd,
                poll => $mode,
                cb   => _wrap_watcher_cb($handler),
            );
        }
        $FdWatchers{$fd} = \@pair;
    }
}
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=head2 C<< CLASS->SetLoopTimeout( $timeout ) >> |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
Set the loop timeout for the event loop to some value in milliseconds. |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return |
276
|
|
|
|
|
|
|
immediately. |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
=cut |
279
|
|
|
|
|
|
|
# Class method: store the event-loop timeout, in milliseconds.
#
# The value is forced to a number before being stored in $LoopTimeout, and
# that stored number is returned.
sub SetLoopTimeout {
    my (undef, $timeout) = @_;
    $LoopTimeout = $timeout + 0;
    return $LoopTimeout;
}
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
=head2 C<< CLASS->DebugMsg( $format, @args ) >> |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
Print the debugging message specified by the C<sprintf>-style I<format> and |
286
|
|
|
|
|
|
|
I<args>. |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
=cut |
289
|
|
|
|
|
|
|
# Class method: print a debugging line to STDERR.
#
# $fmt is a printf-style format; a trailing newline on it is removed and the
# line is emitted as ">>> <formatted text>\n" using @args as the printf
# arguments.
sub DebugMsg {
    my $class = shift;
    my $fmt   = shift;
    my @args  = @_;

    chomp $fmt;
    printf STDERR ">>> $fmt\n", @args;
}
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
Add a timer to occur $seconds from now. $seconds may be fractional, but timers |
298
|
|
|
|
|
|
|
are not guaranteed to fire at the exact time you ask for. |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
Returns a timer object which you can call C<< $timer->cancel >> on if you need to. |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
=cut |
303
|
|
|
|
|
|
|
{
    # Source of unique %Timers keys. Deliberately not initialised here: the
    # first ++ turns undef into 1, and leaving out "= 0" means a later
    # run-time pass over this line can never rewind the counter.
    my $timer_seq;

    # Class method: schedule $coderef to run once, $secs seconds from now
    # ($secs may be fractional).
    #
    # Returns a Danga::Socket::Timer object: a blessed one-element array whose
    # slot 0 is a coderef that cancels the timer by removing its AnyEvent
    # watcher from %Timers.
    sub AddTimer {
        my $class = shift;
        my ($secs, $coderef) = @_;

        # The key must stay unique for as long as the entry exists. The old
        # scheme keyed on the stringified address of the returned timer
        # object, but callers routinely discard that object; its address could
        # then be reused by the next timer, which would overwrite (and thereby
        # cancel) a timer that was still pending.
        my $key = ++$timer_seq;

        # Removing the watcher from %Timers drops the reference that keeps it
        # alive. Returns the removed watcher, or undef if it was already gone.
        my $cancel = sub {
            delete $Timers{$key};
        };

        my $cb = sub {
            # Forget the timer before running user code so that a callback
            # which dies cannot leave a stale entry behind. Holding the
            # watcher in a lexical keeps it alive until this sub is done.
            my $watcher = $cancel->();
            $coderef->();
            return;
        };

        # %Timers holds the watcher so it survives until it fires or is
        # cancelled.
        $Timers{$key} = AnyEvent->timer(
            after => $secs,
            cb    => _wrap_watcher_cb($cb),
        );

        return bless [ $cancel ], 'Danga::Socket::Timer';
    }
}
332
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
=head2 C<< CLASS->DescriptorMap() >> |
334
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
Get the hash of Danga::Socket objects keyed by the file descriptor (fileno) they |
336
|
|
|
|
|
|
|
are wrapping. |
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
Returns a hash in list context or a hashref in scalar context. |
339
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
=cut |
341
|
|
|
|
|
|
|
# Class method: the fd => Danga::Socket object map.
#
# In list context returns the map flattened to a key/value list; otherwise
# returns a reference to the live %DescriptorMap hash.
sub DescriptorMap {
    if (wantarray) {
        return %DescriptorMap;
    }
    return \%DescriptorMap;
}
344
|
|
|
|
|
|
|
*descriptor_map = *DescriptorMap; |
345
|
|
|
|
|
|
|
*get_sock_ref = *DescriptorMap; |
346
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
=head2 C<< CLASS->EventLoop() >> |
348
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
Start processing IO events. In most daemon programs this never exits. See |
350
|
|
|
|
|
|
|
C<PostLoopCallback> below for how to exit the loop. |
351
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
=cut |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
# Class method: run the event loop until something asks it to stop.
#
# Blocks on an AnyEvent condvar stored in $MainLoopCondVar; the loop ends
# when ->send is called on it, which happens when PostEventLoop() reports
# that a post-loop callback wants to stop. If a loop timeout is set to
# anything other than 0 or -1, PostEventLoop() is also run every
# $LoopTimeout milliseconds even when no event has fired.
sub EventLoop {
    my $class = shift;

    my $timeout_watcher;
    if ($LoopTimeout && $LoopTimeout != -1) {
        # $LoopTimeout is in milliseconds; AnyEvent timers take seconds.
        my $timeout = $LoopTimeout * 0.001;
        $timeout_watcher = AnyEvent->timer(
            after    => $timeout,
            interval => $timeout,
            cb       => sub {
                # Honour the post-loop callbacks' verdict on timeout ticks
                # too. Previously the result was discarded, so a callback
                # returning false here could not stop the loop.
                my $keep_running = PostEventLoop();
                $MainLoopCondVar->send if !$keep_running && $MainLoopCondVar;
            },
        );
    }

    $MainLoopCondVar = AnyEvent->condvar;
    $MainLoopCondVar->recv(); # Blocks until $MainLoopCondVar->send is called

    # Always run PostLoopCallback before we return, even if we timed out
    # before we completed an event.
    PostEventLoop();

    $MainLoopCondVar = undef;
}
376
|
|
|
|
|
|
|
|
377
|
|
|
|
|
|
|
## profiling-related data/functions |
378
|
|
|
|
|
|
|
our ($Prof_utime0, $Prof_stime0); |
379
|
|
|
|
|
|
|
# Internal: snapshot the first two values returned by getrusage() into
# $Prof_utime0 / $Prof_stime0 so _post_profile can compute deltas from them.
sub _pre_profile {
    return ($Prof_utime0, $Prof_stime0) = getrusage();
}
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
# Internal: charge the resource usage accumulated since the last
# _pre_profile call to every item name passed in @_.
#
# Each %Profiling entry is [ utime total, stime total, call count ] and is
# created on first use.
sub _post_profile {
    # Usage right now, minus the snapshot taken by _pre_profile.
    my ($utime_now, $stime_now) = getrusage();
    my $utime_delta = $utime_now - $Prof_utime0;
    my $stime_delta = $stime_now - $Prof_stime0;

    for my $item (@_) {
        my $totals = ($Profiling{$item} ||= [ 0.0, 0.0, 0 ]);
        $totals->[0] += $utime_delta;
        $totals->[1] += $stime_delta;
        $totals->[2] += 1;
    }
}
398
|
|
|
|
|
|
|
|
399
|
|
|
|
|
|
|
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
Sets post loop callback function. Pass a subref and it will be |
402
|
|
|
|
|
|
|
called every time the event loop finishes. |
403
|
|
|
|
|
|
|
|
404
|
|
|
|
|
|
|
Return 1 (or any true value) from the sub to make the loop continue, 0 or false |
405
|
|
|
|
|
|
|
and it will exit. |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
The callback function will be passed two parameters: \%DescriptorMap, \%OtherFds. |
408
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
=cut |
410
|
|
|
|
|
|
|
# Install or remove a post-loop callback.
#
# Called on the class, sets the global callback; called on an object, sets
# the callback for that object's fd. Anything that is not a CODE reference
# (including undef) removes the callback instead.
sub SetPostLoopCallback {
    my ($class, $ref) = @_;

    my $is_code = defined $ref && ref $ref eq 'CODE';

    # Class-level call: global callback.
    unless (ref $class) {
        return $PostLoopCallback = $is_code ? $ref : undef;
    }

    # Object-level call: callback keyed by this object's fd.
    my Danga::Socket $self = $class;
    return $PLCMap{$self->{fd}} = $ref if $is_code;
    return delete $PLCMap{$self->{fd}};
}
426
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
# Internal function: run the post-event callback, send read events |
428
|
|
|
|
|
|
|
# for pushed-back data, and close pending connections. returns 1 |
429
|
|
|
|
|
|
|
# if event loop should continue, or 0 to shut it all down. |
430
|
|
|
|
|
|
|
sub PostEventLoop {
    # Step 1: deliver synthetic read events to sockets holding pushed-back
    # data. Keep sweeping %PushBackSet until a whole pass delivers nothing.
    my $delivered;
    do {
        $delivered = 0;
        for my $fd (keys %PushBackSet) {
            my Danga::Socket $pob = $PushBackSet{$fd};

            # An earlier event_read in this same pass may have closed a
            # socket whose fd we already pulled out of "keys %PushBackSet",
            # so a missing entry is expected; just skip it.
            next unless $pob;

            die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}};

            # Only open sockets that are currently watching for reads.
            next if $pob->{closed};
            next unless $pob->{event_watch} & POLLIN;

            $delivered = 1;
            $pob->event_read;
        }
    } while ($delivered);

    # Step 2: really close the sockets that asked to be closed during event
    # processing. This was deferred so fd numbers could not be reused, and
    # confused, while events were still being dispatched.
    while (my $sock = shift @ToClose) {
        my $fd = fileno($sock);

        # close the socket. (not a Danga::Socket close)
        $sock->close;

        # Only now is it safe to drop the fd from the descriptor map.
        delete $DescriptorMap{$fd};
    }

    # Step 3: ask the post-loop callbacks whether to keep going. The default
    # is yes. Once any callback returns false the remaining ones are not
    # called and the false value is what gets returned.
    my $keep_running = 1;

    # per-object callbacks first
    foreach my $plc (values %PLCMap) {
        $keep_running = $plc->(\%DescriptorMap, \%OtherFds) if $keep_running;
    }

    # then the global callback, if there is one
    if (defined $PostLoopCallback) {
        $keep_running = $PostLoopCallback->(\%DescriptorMap, \%OtherFds) if $keep_running;
    }

    return $keep_running;
}
483
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
# Internal method to decorate a watcher callback with extra code to install |
485
|
|
|
|
|
|
|
# the IdleWatcher necessary to run PostEventLoop. |
486
|
|
|
|
|
|
|
# Internal: wrap a watcher callback so that, after it has run, an AnyEvent
# idle watcher is scheduled to run PostEventLoop().
#
# $cb is the callback to decorate. Returns a new coderef that calls $cb with
# the same arguments (in scalar context) and returns its result.
sub _wrap_watcher_cb {
    my ($cb) = @_;

    return sub {
        my $ret = $cb->(@_);

        # (Re)arm the idle watcher. Assigning over $IdleWatcher replaces any
        # watcher still pending from an earlier event.
        $IdleWatcher = AnyEvent->idle(
            cb => sub {
                my $keep_running = PostEventLoop();
                $IdleWatcher = undef; # Free this watcher

                # $MainLoopCondVar only exists while EventLoop() is running.
                # If the AnyEvent loop is being driven directly there is
                # nothing to signal, and calling ->send on undef would die
                # inside the event loop.
                $MainLoopCondVar->send if !$keep_running && $MainLoopCondVar;
            },
        );

        return $ret;
    };
}
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
##################################################################### |
503
|
|
|
|
|
|
|
### Danga::Socket-the-object code |
504
|
|
|
|
|
|
|
##################################################################### |
505
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
=head2 OBJECT METHODS |
507
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
=head2 C<< CLASS->new( $socket ) >> |
509
|
|
|
|
|
|
|
|
510
|
|
|
|
|
|
|
Create a new Danga::Socket subclass object for the given I<socket> which will |
511
|
|
|
|
|
|
|
react to events on it during the C<EventLoop>. |
512
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
This is normally (always?) called from your subclass via: |
514
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
$class->SUPER::new($socket); |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
=cut |
518
|
|
|
|
|
|
|
# Constructor: wrap $sock in a Danga::Socket (sub)class object and register
# it in %DescriptorMap under its file descriptor.
#
# May be called on a class name, in which case a fields-based object is
# allocated, or on an already allocated object, which is then initialised in
# place. Returns the object. Problems are reported with Carp::cluck (a
# warning with a stack trace) rather than by dying.
sub new {
    my Danga::Socket $self = shift;
    $self = fields::new($self) unless ref $self;

    my $sock = shift;

    $self->{sock} = $sock;
    my $fd = fileno($sock);

    # fileno() returns undef on failure, but 0 is a perfectly valid
    # descriptor (e.g. the first socket opened after a daemon closes STDIN),
    # so test definedness rather than truth.
    Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . (defined $fd ? $fd : ""))
        unless $sock && defined $fd;

    $self->{fd}               = $fd;
    $self->{write_buf}        = [];
    $self->{write_buf_offset} = 0;
    $self->{write_buf_size}   = 0;
    $self->{closed}           = 0;
    $self->{corked}           = 0;
    $self->{read_push_back}   = [];

    # Nothing is watched for read or write yet; only the error conditions
    # are set in the mask.
    $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;

    # Create the slots where the watchers will go if the caller
    # decides to watch_read or watch_write.
    $FdWatchers{$fd} = [ undef, undef ];

    Carp::cluck("Danga::Socket::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
        if $DescriptorMap{$fd};

    $DescriptorMap{$fd} = $self;
    return $self;
}
550
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
|
552
|
|
|
|
|
|
|
##################################################################### |
553
|
|
|
|
|
|
|
### I N S T A N C E M E T H O D S |
554
|
|
|
|
|
|
|
##################################################################### |
555
|
|
|
|
|
|
|
|
556
|
|
|
|
|
|
|
=head2 C<< $obj->tcp_cork( $boolean ) >> |
557
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
Turn TCP_CORK on or off depending on the value of I<boolean>. |
559
|
|
|
|
|
|
|
|
560
|
|
|
|
|
|
|
=cut |
561
|
|
|
|
|
|
|
# Turn TCP_CORK on (true argument) or off (false argument) for this socket.
#
# Does nothing if the object has no socket or is already in the requested
# state. Where the TCP_CORK constant is 0 (non-Linux) the new state is only
# recorded. On setsockopt failure: EBADF/ENOTSOCK warn and close the object,
# "option not supported" errors are ignored, and anything else dies.
sub tcp_cork {
    my Danga::Socket $self = $_[0];

    # The argument is documented as a boolean, so normalise it to 0/1. This
    # avoids numeric-comparison warnings for undef or non-numeric true
    # values and makes the "already in this state" check reliable for any
    # true value.
    my $val = $_[1] ? 1 : 0;

    # make sure we have a socket
    return unless $self->{sock};
    return if $val == ($self->{corked} ? 1 : 0);

    my $rv;
    if (TCP_CORK) {
        $rv = setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK,
                         pack("l", $val));
    } else {
        # FIXME: implement freebsd *PUSH sockopts
        $rv = 1;
    }

    # if we failed, close (if we're not already) and warn about the error
    if ($rv) {
        $self->{corked} = $val;
    } else {
        if ($! == EBADF || $! == ENOTSOCK) {
            # internal state is probably corrupted; warn and then close if
            # we're not closed already
            warn "setsockopt: $!";
            $self->close('tcp_cork_failed');
        } elsif ($! == ENOPROTOOPT || $!{ENOTSOCK} || $!{EOPNOTSUPP}) {
            # TCP implementation doesn't support corking, so just ignore it
            # or we're trying to tcp-cork a non-socket (like a socketpair pipe
            # which is acting like a socket, which Perlbal does for child
            # processes acting like inetd-like web servers)
            #
            # NOTE(review): ENOTSOCK is already handled by the branch above,
            # so the $!{ENOTSOCK} test here can never be true. The comment
            # suggests non-sockets were meant to be ignored; confirm the
            # intended behaviour before changing either branch.
        } else {
            # some other error; we should never hit here, but if we do, die
            die "setsockopt: $!";
        }
    }
}
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=head2 C<< $obj->steal_socket() >> |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
Basically returns our socket and makes it so that we don't try to close it, |
602
|
|
|
|
|
|
|
but we do remove it from epoll handlers. THIS CLOSES $self. It is the same |
603
|
|
|
|
|
|
|
thing as calling close, except it gives you the socket to use. |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
=cut |
606
|
|
|
|
|
|
|
# Detach and return the underlying socket without closing it.
#
# The object itself ends up closed, exactly as after close(), except that
# the handle is handed to the caller instead of being queued for closing.
# Returns nothing if the object is already closed.
sub steal_socket {
    my Danga::Socket $self = $_[0];
    return if $self->{closed};

    # _cleanup does most of the work of closing this object. It must run
    # while {sock} is still set, because it uncorks the socket.
    $self->_cleanup();

    # Take the handle out of the object so nothing here touches it again.
    (my $stolen, $self->{sock}) = ($self->{sock}, undef);
    return $stolen;
}
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
=head2 C<< $obj->close( [$reason] ) >> |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
Close the socket. The I<reason> argument will be used in debugging messages. |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
=cut |
624
|
|
|
|
|
|
|
# Close this object. The optional second argument is a reason string that
# is only used in the debug message.
#
# Returns nothing if the object was already closed, otherwise 0. The real
# handle is not closed here: it is queued on @ToClose and closed later by
# PostEventLoop.
sub close {
    my Danga::Socket $self = $_[0];
    return if $self->{closed};

    # print out debugging info for this close
    if (DebugLevel) {
        my ($pkg, $filename, $line) = caller;
        my $reason = $_[1] || "";
        warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n";
    }

    # this does most of the work of closing us
    $self->_cleanup();

    # Defer closing the actual socket until this round of events has been
    # processed; closing it now could let the fd number be reused.
    if (my $handle = $self->{sock}) {
        push @ToClose, $handle;
        $self->{sock} = undef;
    }

    return 0;
}
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
### METHOD: _cleanup() |
649
|
|
|
|
|
|
|
### Called by our closers so we can clean internal data structures. |
650
|
|
|
|
|
|
|
sub _cleanup {
    my Danga::Socket $self = $_[0];

    # From here on the object counts as closed; by the time our callers are
    # done it will have neither fd nor sock.
    $self->{closed} = 1;

    # Empty the write buffer: it may hold closures that refer back to this
    # object (sub { $client->close }) and would keep it from being destroyed.
    $self->{write_buf} = [];

    # Uncork so any final data gets sent. Only matters if whoever is closing
    # us forgot, but do it to be safe.
    $self->tcp_cork(0);

    # This fd is no longer ours, so remove it from the push-back set, the
    # per-object post-loop callbacks and the watcher table; we do not want
    # alerts for it if it becomes readable or writable.
    foreach my $registry (\%PushBackSet, \%PLCMap, \%FdWatchers) {
        delete $registry->{ $self->{fd} };
    }

    # %DescriptorMap is deliberately NOT touched here. The entry stays until
    # the socket is actually closed, so that event dispatch still in progress
    # can look the fd up, see {closed} and skip it, instead of meeting an
    # unaccounted-for fd and emitting warnings.

    # and finally get rid of our fd so we can't use it anywhere else
    $self->{fd} = undef;
}
683
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
=head2 C<< $obj->sock() >> |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
Returns the underlying IO::Handle for the object. |
687
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
=cut |
689
|
|
|
|
|
|
|
# Accessor: returns whatever is stored in the object's {sock} slot
# (undef once close() has cleared it).
sub sock {
    my Danga::Socket $self = $_[0];
    return $self->{sock};
}
693
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
=head2 C<< $obj->set_writer_func( CODEREF ) >> |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
Sets a function to use instead of C<syswrite()> when writing data to the socket. |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
=cut |
699
|
|
|
|
|
|
|
# Install (or, with undef, remove) the code ref that write() calls in
# place of syswrite().  Croaks with "Not a subref" for any defined value
# that is not a CODE reference.  Returns the value that was stored.
sub set_writer_func {
    my Danga::Socket $self = $_[0];
    my $writer = $_[1];

    Carp::croak("Not a subref")
        if defined $writer && !UNIVERSAL::isa($writer, "CODE");

    $self->{writer_func} = $writer;
}
705
|
|
|
|
|
|
|
|
706
|
|
|
|
|
|
|
=head2 C<< $obj->write( $data ) >> |
707
|
|
|
|
|
|
|
|
708
|
|
|
|
|
|
|
Write the specified data to the underlying handle. I<data> may be scalar, |
709
|
|
|
|
|
|
|
scalar ref, code ref (to run when there), or undef just to kick-start. |
710
|
|
|
|
|
|
|
Returns 1 if writes all went through, or 0 if there are writes in queue. If |
711
|
|
|
|
|
|
|
it returns 1, caller should stop waiting for 'writable' events. |
712
|
|
|
|
|
|
|
|
713
|
|
|
|
|
|
|
=cut |
714
|
|
|
|
|
|
|
# Write $data to the socket, or flush the pending queue when $data is undef.
#
# $data may be a plain scalar, a scalar ref, or a code ref.  A code ref is
# simply invoked when its turn comes and is never re-queued.
#
# Returns 1 when nothing is left to write, 0 when data remains queued (a
# write watcher has then been requested).  On EPIPE, ECONNRESET or any
# other write error the connection is closed and close()'s return value
# is passed back.
sub write {
    my Danga::Socket $self = $_[0];
    my $data = $_[1];

    # Nobody should write to a closed socket, but it can happen: caller
    # code does two writes within one event, the first fails and
    # disconnects the other side, whose destructor closes this object
    # while it is still inside a method; the now-dead object then does
    # its second write.  Just claim success - the object is about to go
    # away and is not hurt by the lie.
    return 1 if $self->{closed};

    my $chunk;       # reference to the item currently being written
    my $unqueued;    # true while $chunk has bypassed write_buf

    if (defined $data) {
        $chunk = ref($data) ? $data : \$data;

        # Something is already waiting: just append and let the
        # writable event drain it later.
        if ($self->{write_buf_size}) {
            push @{ $self->{write_buf} }, $chunk;
            $self->{write_buf_size} +=
                ref($chunk) eq "SCALAR" ? length($$chunk) : 1;
            return 0;
        }

        # We are the only outstanding write, so try to send directly
        # without touching the queue.  If the attempt does not complete
        # the chunk must be queued after all.
        $unqueued = 1;
    }

  WRITE:
    while (1) {
        $chunk ||= $self->{write_buf}[0];
        return 1 unless $chunk;

        # Dereferencing as a scalar dies for a code ref; that failure is
        # how code refs are detected below.
        my $chunk_len = eval { length($$chunk) };
        if ($@) {
            die "Write error: $@ <$chunk>"
                unless UNIVERSAL::isa($chunk, "CODE");

            if (!$unqueued) {
                $self->{write_buf_size}--;    # code refs are worth 1
                shift @{ $self->{write_buf} };
            }
            $chunk->();

            # Code refs are one-shot: they run and are never re-queued,
            # so clear the flag saying the outstanding item needs queueing.
            $unqueued = 0;

            undef $chunk;
            next WRITE;
        }

        my $to_write = $chunk_len - $self->{write_buf_offset};
        my $writer   = $self->{writer_func};
        my $written  = $writer
            ? $writer->($chunk, $to_write, $self->{write_buf_offset})
            : syswrite($self->{sock}, $$chunk, $to_write,
                       $self->{write_buf_offset});

        if (!defined $written) {
            return $self->close("EPIPE") if $! == EPIPE;

            if ($! == EAGAIN) {
                # The connection has something to write, so it is now
                # interested in pending writes.
                if ($unqueued) {
                    push @{ $self->{write_buf} }, $chunk;
                    $self->{write_buf_size} += $chunk_len;
                }
                $self->{write_set_watch} = 1
                    unless $self->{event_watch} & POLLOUT;
                $self->watch_write(1);
                return 0;
            }

            return $self->close("ECONNRESET") if $! == ECONNRESET;

            $self->debugmsg("Closing connection ($self) due to write error: $!\n")
                if DebugLevel >= 1;

            return $self->close("write_error");
        }

        if ($written != $to_write) {
            $self->debugmsg("Wrote PARTIAL %d bytes to %d",
                            $written, $self->{fd})
                if DebugLevel >= 2;

            if ($unqueued) {
                push @{ $self->{write_buf} }, $chunk;
                $self->{write_buf_size} += $chunk_len;
            }

            # Remember how far we got; the rest goes out on a later
            # writable event.
            $self->{write_buf_offset} += $written;
            $self->{write_buf_size}   -= $written;
            $self->on_incomplete_write;
            return 0;
        }

        if ($written == $to_write) {
            $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
                            $written, $self->{fd}, $unqueued)
                if DebugLevel >= 2;

            $self->{write_buf_offset} = 0;

            if ($self->{write_set_watch}) {
                $self->watch_write(0);
                $self->{write_set_watch} = 0;
            }

            # This was our only write and it never entered the buffer
            # or the size count, so nothing else can be pending.
            return 1 if $unqueued;

            $self->{write_buf_size} -= $written;
            shift @{ $self->{write_buf} };
            undef $chunk;
            next WRITE;
        }
    }
}
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
# Called by write() after a partial write.  Flags write_set_watch when
# POLLOUT is not already set in event_watch, then turns on the write
# watcher.  Returns whatever watch_write(1) returns.
sub on_incomplete_write {
    my Danga::Socket $self = $_[0];

    if (!($self->{event_watch} & POLLOUT)) {
        $self->{write_set_watch} = 1;
    }

    return $self->watch_write(1);
}
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
=head2 C<< $obj->push_back_read( $buf ) >> |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
Push back I<buf> (a scalar or scalarref) into the read stream. Useful if you read |
848
|
|
|
|
|
|
|
more than you need to and want to return this data on the next "read". |
849
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
=cut |
851
|
|
|
|
|
|
|
# Queue $buf (a scalar or a scalar ref) so that a later read() returns it
# before touching the socket, and register the object in %PushBackSet
# under its fd.
sub push_back_read {
    my Danga::Socket $self = $_[0];
    my $buf = $_[1];

    # Always store a reference; plain scalars are wrapped.
    my $buf_ref = ref($buf) ? $buf : \$buf;
    push @{ $self->{read_push_back} }, $buf_ref;

    $PushBackSet{ $self->{fd} } = $self;
}
857
|
|
|
|
|
|
|
|
858
|
|
|
|
|
|
|
=head2 C<< $obj->read( $bytecount ) >> |
859
|
|
|
|
|
|
|
|
860
|
|
|
|
|
|
|
Read at most I<bytecount> bytes from the underlying handle; returns scalar |
861
|
|
|
|
|
|
|
ref on read, or undef on connection closed. |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
=cut |
864
|
|
|
|
|
|
|
# Read at most $bytes bytes.
#
# Pushed-back data (see push_back_read) is served first; a pushed-back
# buffer larger than $bytes is split and the remainder is put back at the
# front of the queue.  Otherwise a single sysread() is issued, capped at
# 1MB per call.
#
# Returns a scalar ref to the data, undef when the peer has closed the
# connection or sysread failed with anything other than EWOULDBLOCK, and
# nothing at all when the object is already closed.  When sysread would
# block, a ref to the (unfilled) buffer is returned.
sub read {
    my Danga::Socket $self = shift;
    return if $self->{closed};
    my $bytes = shift;
    my $buf;
    my $sock = $self->{sock};

    if (@{ $self->{read_push_back} }) {
        $buf = shift @{ $self->{read_push_back} };
        my $len = length($$buf);

        if ($len <= $bytes) {
            delete $PushBackSet{ $self->{fd} }
                unless @{ $self->{read_push_back} };
            return $buf;
        }

        # The pushed-back buffer is too big: hand out the first $bytes
        # and keep the rest for the next read.
        my $overflow = substr($$buf, $bytes);
        $buf = substr($$buf, 0, $bytes);
        unshift @{ $self->{read_push_back} }, \$overflow;
        return \$buf;
    }

    # if this is too high, perl quits(!!).  reports on mailing lists
    # don't seem to point to a universal answer.  5MB worked for some,
    # crashed for others.  1MB works for more people.  let's go with 1MB
    # for now.  :/
    my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes;

    my $res = sysread($sock, $buf, $req_bytes, 0);

    # Classify the result right away.  $! is only meaningful when sysread
    # returned undef; a successful call (including 0 = EOF) leaves any
    # earlier errno in place, so consulting $! for the EOF case could
    # mistake a stale EWOULDBLOCK for "try again" and hide the close.
    my $hit_eof = defined $res && $res == 0;
    my $failed  = !defined $res && $! != EWOULDBLOCK;

    DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);

    if ($hit_eof || $failed) {
        # 0 = connection closed, undef = real error
        DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
        return undef;
    }

    return \$buf;
}
904
|
|
|
|
|
|
|
|
905
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_read() >> |
906
|
|
|
|
|
|
|
|
907
|
|
|
|
|
|
|
Readable event handler. Concrete derivatives of Danga::Socket should |
908
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
909
|
|
|
|
|
|
|
called. |
910
|
|
|
|
|
|
|
|
911
|
|
|
|
|
|
|
=cut |
912
|
0
|
|
|
0
|
0
|
|
# Default 'readable' handler: always dies, naming the object it was
# called on.  Subclasses are expected to override it.
sub event_read {
    my ($self) = @_;
    die "Base class event_read called for $self\n";
}
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_err() >> |
915
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
Error event handler. Concrete derivatives of Danga::Socket should |
917
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
918
|
|
|
|
|
|
|
called. |
919
|
|
|
|
|
|
|
|
920
|
|
|
|
|
|
|
=cut |
921
|
0
|
|
|
0
|
0
|
|
# Default error handler: always dies, naming the object it was called
# on.  Subclasses are expected to override it.
sub event_err {
    my ($self) = @_;
    die "Base class event_err called for $self\n";
}
922
|
|
|
|
|
|
|
|
923
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_hup() >> |
924
|
|
|
|
|
|
|
|
925
|
|
|
|
|
|
|
'Hangup' event handler. Concrete derivatives of Danga::Socket should |
926
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
927
|
|
|
|
|
|
|
called. |
928
|
|
|
|
|
|
|
|
929
|
|
|
|
|
|
|
=cut |
930
|
0
|
|
|
0
|
0
|
|
# Default hangup handler: always dies, naming the object it was called
# on.  Subclasses are expected to override it.
sub event_hup {
    my ($self) = @_;
    die "Base class event_hup called for $self\n";
}
931
|
|
|
|
|
|
|
|
932
|
|
|
|
|
|
|
=head2 C<< $obj->event_write() >> |
933
|
|
|
|
|
|
|
|
934
|
|
|
|
|
|
|
Writable event handler. Concrete derivatives of Danga::Socket may wish to |
935
|
|
|
|
|
|
|
provide an implementation of this. The default implementation calls |
936
|
|
|
|
|
|
|
C<write()> with an C<undef>. |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
=cut |
939
|
|
|
|
|
|
|
# Default 'writable' handler: calls write(undef) on the object and
# returns its result.
sub event_write {
    my ($self) = @_;
    return $self->write(undef);
}
943
|
|
|
|
|
|
|
|
944
|
|
|
|
|
|
|
=head2 C<< $obj->watch_read( $boolean ) >> |
945
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
Turn 'readable' event notification on or off. |
947
|
|
|
|
|
|
|
|
948
|
|
|
|
|
|
|
=cut |
949
|
|
|
|
|
|
|
# Turn 'readable' notification on (true $enable) or off (false).
#
# Does nothing when the object is closed or has no socket.  The watcher
# lives in slot 0 of the fd's entry in %FdWatchers; enabling stores a new
# AnyEvent io watcher there, disabling stores undef.
sub watch_read {
    my Danga::Socket $self = $_[0];
    my $enable = $_[1];
    return unless !$self->{closed} && $self->{sock};

    my $fd    = fileno($self->{sock});
    my $slots = $FdWatchers{$fd};

    my $watcher;
    if ($enable) {
        $watcher = AnyEvent->io(
            fh   => $fd,
            poll => 'r',
            # The callback re-checks {closed} each time it fires before
            # dispatching to event_read.
            cb   => _wrap_watcher_cb(sub {
                $self->event_read() unless $self->{closed};
            }),
        );
    }

    $slots->[0] = $watcher;
}
969
|
|
|
|
|
|
|
|
970
|
|
|
|
|
|
|
=head2 C<< $obj->watch_write( $boolean ) >> |
971
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
Turn 'writable' event notification on or off. |
973
|
|
|
|
|
|
|
|
974
|
|
|
|
|
|
|
=cut |
975
|
|
|
|
|
|
|
# Turn 'writable' notification on (true $enable) or off (false).
#
# Does nothing when the object is closed or has no socket.  The watcher
# lives in slot 1 of the fd's entry in %FdWatchers; enabling stores a new
# AnyEvent io watcher there, disabling stores undef.
sub watch_write {
    my Danga::Socket $self = $_[0];
    my $enable = $_[1];
    return unless !$self->{closed} && $self->{sock};

    my $fd = fileno($self->{sock});

    # When code outside this package (a subclass) asks for write events it
    # takes over responsibility for them, so write() must no longer switch
    # the watcher off on its own.  caller() has to be evaluated directly in
    # this sub for the package comparison to be meaningful.
    if ($enable && caller() ne __PACKAGE__) {
        $self->{write_set_watch} = 0;
    }

    my $slots = $FdWatchers{$fd};

    my $watcher;
    if ($enable) {
        $watcher = AnyEvent->io(
            fh   => $fd,
            poll => 'w',
            # The callback re-checks {closed} each time it fires before
            # dispatching to event_write.
            cb   => _wrap_watcher_cb(sub {
                $self->event_write() unless $self->{closed};
            }),
        );
    }

    $slots->[1] = $watcher;
}
1001
|
|
|
|
|
|
|
|
1002
|
|
|
|
|
|
|
=head2 C<< $obj->dump_error( $message ) >> |
1003
|
|
|
|
|
|
|
|
1004
|
|
|
|
|
|
|
Prints to STDERR a backtrace with information about this socket and what led |
1005
|
|
|
|
|
|
|
up to the dump_error call. |
1006
|
|
|
|
|
|
|
|
1007
|
|
|
|
|
|
|
=cut |
1008
|
|
|
|
|
|
|
# Emit (via warn) an "ERROR: <message>" line, a line with the object and
# its as_string() description, and one line per call-stack frame leading
# up to this call.
sub dump_error {
    my ($self, $message) = @_;

    # Walk the call stack outwards until caller() runs out of frames.
    # This must stay directly in this sub so the frame depths line up.
    my @trace;
    for (my $depth = 0; ; $depth++) {
        my @frame = caller($depth);
        last unless @frame;
        my ($file, $line, $sub) = @frame[1 .. 3];
        push @trace, "\t$file:$line called $sub\n";
    }

    warn "ERROR: $message\n"
       . "\t$self = " . $self->as_string . "\n"
       . join('', @trace);
}
1019
|
|
|
|
|
|
|
|
1020
|
|
|
|
|
|
|
=head2 C<< $obj->debugmsg( $format, @args ) >> |
1021
|
|
|
|
|
|
|
|
1022
|
|
|
|
|
|
|
Print the debugging message specified by the C<sprintf>-style I<format> and |
1023
|
|
|
|
|
|
|
I<args>. |
1024
|
|
|
|
|
|
|
|
1025
|
|
|
|
|
|
|
=cut |
1026
|
|
|
|
|
|
|
# printf-style debug output to STDERR.  The format has one trailing
# newline removed, is prefixed with ">>> " and gets a newline appended.
# Confesses with "Not an object" when invoked on a non-reference.
sub debugmsg {
    my ($self, $template, @values) = @_;

    if (!ref $self) {
        confess("Not an object");
    }

    chomp $template;
    printf STDERR ">>> $template\n", @values;
}
1033
|
|
|
|
|
|
|
|
1034
|
|
|
|
|
|
|
|
1035
|
|
|
|
|
|
|
=head2 C<< $obj->peer_ip_string() >> |
1036
|
|
|
|
|
|
|
|
1037
|
|
|
|
|
|
|
Returns the string describing the peer's IP |
1038
|
|
|
|
|
|
|
|
1039
|
|
|
|
|
|
|
=cut |
1040
|
|
|
|
|
|
|
# Return the peer's IP address as a string, caching it in {peer_ip}.
#
# Also records {peer_port}, and sets {peer_v6} when the address is not a
# 4-byte IPv4 address.  Returns undef (through _undef) when there is no
# socket or getpeername() fails.  If unpacking the address dies, a
# placeholder "[Unknown peername '...']" string is returned and a matching
# placeholder is stored as the port; that result is not cached.
sub peer_ip_string {
    my Danga::Socket $self = $_[0];
    return _undef("peer_ip_string undef: no sock") unless $self->{sock};
    return $self->{peer_ip} if defined $self->{peer_ip};

    my $pn = getpeername($self->{sock});
    return _undef("peer_ip_string undef: getpeername") unless $pn;

    # A packed address of 28 bytes or more is unpacked as IPv6, anything
    # shorter as IPv4.  The eval catches a failing unpack call.
    my ($port, $iaddr) = eval {
        length($pn) >= 28
            ? Socket6::unpack_sockaddr_in6($pn)
            : Socket::sockaddr_in($pn);
    };

    if ($@) {
        $self->{peer_port} = "[Unknown peerport '$@']";
        return "[Unknown peername '$@']";
    }

    $self->{peer_port} = $port;

    return $self->{peer_ip} = Socket::inet_ntoa($iaddr)
        if length($iaddr) == 4;

    $self->{peer_v6} = 1;
    return $self->{peer_ip} = Socket6::inet_ntop(Socket6::AF_INET6(),
                                                 $iaddr);
}
1071
|
|
|
|
|
|
|
|
1072
|
|
|
|
|
|
|
=head2 C<< $obj->peer_addr_string() >> |
1073
|
|
|
|
|
|
|
|
1074
|
|
|
|
|
|
|
Returns the string describing the peer for the socket which underlies this |
1075
|
|
|
|
|
|
|
object in form "ip:port" |
1076
|
|
|
|
|
|
|
|
1077
|
|
|
|
|
|
|
=cut |
1078
|
|
|
|
|
|
|
# Return the peer address as "ip:port", or "[ip]:port" when {peer_v6} is
# set.  Returns undef when peer_ip_string yields no (true) value.
sub peer_addr_string {
    my Danga::Socket $self = $_[0];

    my $ip = $self->peer_ip_string;
    return undef unless $ip;

    # peer_ip_string has filled in {peer_port} (and {peer_v6}) by now.
    if ($self->{peer_v6}) {
        return "[$ip]:$self->{peer_port}";
    }
    return "$ip:$self->{peer_port}";
}
1086
|
|
|
|
|
|
|
|
1087
|
|
|
|
|
|
|
=head2 C<< $obj->local_ip_string() >> |
1088
|
|
|
|
|
|
|
|
1089
|
|
|
|
|
|
|
Returns the string describing the local IP |
1090
|
|
|
|
|
|
|
|
1091
|
|
|
|
|
|
|
=cut |
1092
|
|
|
|
|
|
|
# Return the local IP address of the socket as a string, caching it in
# {local_ip} and recording the port in {local_port}.
#
# Returns undef (through _undef) when there is no socket or getsockname()
# fails.  A packed address of 28 bytes or more is unpacked as IPv6 through
# Socket6, mirroring peer_ip_string; previously every address went through
# Socket::sockaddr_in, which croaks on an IPv6 sockaddr.  As before, an
# address that cannot be unpacked raises an exception.
sub local_ip_string {
    my Danga::Socket $self = shift;
    return _undef("local_ip_string undef: no sock") unless $self->{sock};
    return $self->{local_ip} if defined $self->{local_ip};

    my $pn = getsockname($self->{sock});
    return _undef("local_ip_string undef: getsockname") unless $pn;

    # Same size test peer_ip_string uses to tell IPv6 from IPv4.
    my ($port, $iaddr) = length($pn) >= 28
        ? Socket6::unpack_sockaddr_in6($pn)
        : Socket::sockaddr_in($pn);
    $self->{local_port} = $port;

    # 4 raw bytes means IPv4; anything else is formatted as IPv6.
    return $self->{local_ip} = length($iaddr) == 4
        ? Socket::inet_ntoa($iaddr)
        : Socket6::inet_ntop(Socket6::AF_INET6(), $iaddr);
}
1105
|
|
|
|
|
|
|
|
1106
|
|
|
|
|
|
|
=head2 C<< $obj->local_addr_string() >> |
1107
|
|
|
|
|
|
|
|
1108
|
|
|
|
|
|
|
Returns the string describing the local end of the socket which underlies this |
1109
|
|
|
|
|
|
|
object in form "ip:port" |
1110
|
|
|
|
|
|
|
|
1111
|
|
|
|
|
|
|
=cut |
1112
|
|
|
|
|
|
|
# Return the local end of the socket as "ip:port", or undef when
# local_ip_string yields no (true) value.  An address containing ':'
# (an IPv6 literal) is wrapped in brackets, "[ip]:port", the same form
# peer_addr_string uses, so the port separator stays unambiguous.
sub local_addr_string {
    my Danga::Socket $self = shift;

    my $ip = $self->local_ip_string;
    return undef unless $ip;

    my $host = $ip =~ /:/ ? "[$ip]" : $ip;
    return "$host:$self->{local_port}";
}
1117
|
|
|
|
|
|
|
|
1118
|
|
|
|
|
|
|
|
1119
|
|
|
|
|
|
|
=head2 C<< $obj->as_string() >> |
1120
|
|
|
|
|
|
|
|
1121
|
|
|
|
|
|
|
Returns a string describing this socket. |
1122
|
|
|
|
|
|
|
|
1123
|
|
|
|
|
|
|
=cut |
1124
|
|
|
|
|
|
|
# Return a one-line description of the socket:
#   "<class>(<R><W>): open|closed[ to <peer address>]"
# R/W appear when POLLIN/POLLOUT are set in {event_watch}.  The peer
# address is looked up once and reused (it used to be fetched twice).
sub as_string {
    my Danga::Socket $self = shift;

    my $watch = $self->{event_watch};
    my $rw = "("
           . (($watch & POLLIN)  ? 'R' : '')
           . (($watch & POLLOUT) ? 'W' : '')
           . ")";

    my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");

    my $peer = $self->peer_addr_string;
    $ret .= " to " . $peer if $peer;

    return $ret;
}
1135
|
|
|
|
|
|
|
|
1136
|
|
|
|
|
|
|
# Helper for "return _undef(...)": always returns undef, and first warns
# "Danga::Socket: <message>" when the DS_DEBUG environment variable is true.
sub _undef {
    if ($ENV{DS_DEBUG}) {
        my $msg = shift || "";
        warn "Danga::Socket: $msg\n";
    }
    return undef;
}
1142
|
|
|
|
|
|
|
|
1143
|
|
|
|
|
|
|
package # Hide from PAUSE |
1144
|
|
|
|
|
|
|
Danga::Socket::Timer; |
1145
|
|
|
|
|
|
|
# [$cancel_coderef]; |
1146
|
|
|
|
|
|
|
# A timer object is an array ref whose first element is the code ref that
# cancels it; cancel() invokes that code ref and returns its result.
sub cancel {
    my ($timer) = @_;
    my $cancel_cb = $timer->[0];
    return $cancel_cb->();
}
1149
|
|
|
|
|
|
|
|
1150
|
|
|
|
|
|
|
=head1 AUTHORS |
1151
|
|
|
|
|
|
|
|
1152
|
|
|
|
|
|
|
Martin Atkins |
1153
|
|
|
|
|
|
|
|
1154
|
|
|
|
|
|
|
Based on L<Danga::Socket> by Brad Fitzpatrick and others. |
1155
|
|
|
|
|
|
|
|
1156
|
|
|
|
|
|
|
=head1 LICENSE |
1157
|
|
|
|
|
|
|
|
1158
|
|
|
|
|
|
|
License is granted to use and distribute this module under the same |
1159
|
|
|
|
|
|
|
terms as Perl itself. |
1160
|
|
|
|
|
|
|
|
1161
|
|
|
|
|
|
|
=cut |
1162
|
|
|
|
|
|
|
|
1163
|
|
|
|
|
|
|
# Record Danga/Socket.pm in %INC as already loaded (pointing at this
# file), so that a later "use Danga::Socket" does not conflict with the
# implementation installed here.
$INC{'Danga/Socket.pm'} = __FILE__;

# Module files must end with a true value.
1;
1168
|
|
|
|
|
|
|
|