line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Tatsumaki::MessageQueue; |
2
|
3
|
|
|
3
|
|
11358
|
use strict; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
141
|
|
3
|
|
|
|
|
|
|
|
4
|
3
|
|
|
3
|
|
3777
|
use AnyEvent; |
|
3
|
|
|
|
|
14059
|
|
|
3
|
|
|
|
|
85
|
|
5
|
3
|
|
|
3
|
|
3282
|
use Any::Moose; |
|
3
|
|
|
|
|
130033
|
|
|
3
|
|
|
|
|
26
|
|
6
|
3
|
|
|
3
|
|
7013
|
use Try::Tiny; |
|
3
|
|
|
|
|
5319
|
|
|
3
|
|
|
|
|
188
|
|
7
|
3
|
|
|
3
|
|
22
|
use Scalar::Util; |
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
158
|
|
8
|
3
|
|
|
3
|
|
3439
|
use Time::HiRes; |
|
3
|
|
|
|
|
22898
|
|
|
3
|
|
|
|
|
21
|
|
9
|
3
|
|
|
3
|
|
590
|
use constant DEBUG => $ENV{TATSUMAKI_DEBUG}; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
3326
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
has channel => (is => 'rw', isa => 'Str'); |
12
|
|
|
|
|
|
|
has backlog => (is => 'rw', isa => 'ArrayRef', default => sub { [] }); |
13
|
|
|
|
|
|
|
has clients => (is => 'rw', isa => 'HashRef', default => sub { +{} }); |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
our $BacklogLength = 30; # TODO configurable |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
my %instances; |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
sub channels { |
20
|
0
|
|
|
0
|
0
|
0
|
values %instances; |
21
|
|
|
|
|
|
|
} |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
sub instance { |
24
|
13
|
|
|
13
|
0
|
11810
|
my($class, $name) = @_; |
25
|
13
|
|
66
|
|
|
134
|
$instances{$name} ||= $class->new(channel => $name); |
26
|
|
|
|
|
|
|
} |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
sub backlog_events { |
29
|
7
|
|
|
7
|
0
|
11
|
my $self = shift; |
30
|
7
|
|
|
|
|
11
|
reverse grep defined, @{$self->backlog}; |
|
7
|
|
|
|
|
68
|
|
31
|
|
|
|
|
|
|
} |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
sub append_backlog { |
34
|
4
|
|
|
4
|
0
|
11
|
my($self, @events) = @_; |
35
|
4
|
|
|
|
|
8
|
my @new_backlog = (reverse(@events), @{$self->backlog}); |
|
4
|
|
|
|
|
17
|
|
36
|
4
|
|
|
|
|
31
|
$self->backlog([ splice @new_backlog, 0, $BacklogLength ]); |
37
|
|
|
|
|
|
|
} |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
sub publish { |
40
|
4
|
|
|
4
|
1
|
26
|
my($self, @events) = @_; |
41
|
|
|
|
|
|
|
|
42
|
4
|
|
|
|
|
6
|
for my $client_id (keys %{$self->clients}) { |
|
4
|
|
|
|
|
81
|
|
43
|
7
|
|
|
|
|
88
|
my $client = $self->clients->{$client_id}; |
44
|
7
|
100
|
|
|
|
25
|
if ($client->{cv}->cb) { |
45
|
|
|
|
|
|
|
# currently listening: flush and send the events right away |
46
|
5
|
|
|
|
|
36
|
$self->flush_events($client_id, @events); |
47
|
|
|
|
|
|
|
} else { |
48
|
|
|
|
|
|
|
# between long poll comet: buffer the events |
49
|
|
|
|
|
|
|
# TODO: limit buffer length |
50
|
2
|
|
|
|
|
97
|
warn "Buffering new events for $client_id" if DEBUG; |
51
|
2
|
|
|
|
|
3
|
push @{$client->{buffer}}, @events; |
|
2
|
|
|
|
|
8
|
|
52
|
|
|
|
|
|
|
} |
53
|
|
|
|
|
|
|
} |
54
|
4
|
|
|
|
|
30
|
$self->append_backlog(@events); |
55
|
|
|
|
|
|
|
} |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
sub flush_events { |
58
|
9
|
|
|
9
|
0
|
24
|
my($self, $client_id, @events) = @_; |
59
|
|
|
|
|
|
|
|
60
|
9
|
50
|
|
|
|
65
|
my $client = $self->clients->{$client_id} or return; |
61
|
|
|
|
|
|
|
try { |
62
|
9
|
|
|
9
|
|
332
|
my $cb = $client->{cv}->cb; |
63
|
9
|
|
|
|
|
94
|
$client->{cv}->send(@events); |
64
|
9
|
|
|
|
|
399
|
$client->{cv} = AE::cv; |
65
|
9
|
|
|
|
|
92
|
$client->{buffer} = []; |
66
|
|
|
|
|
|
|
|
67
|
9
|
50
|
|
|
|
30
|
if ($client->{persistent}) { |
68
|
0
|
|
|
|
|
0
|
$client->{cv}->cb($cb); |
69
|
|
|
|
|
|
|
} else { |
70
|
9
|
|
|
|
|
15
|
undef $client->{longpoll_timer}; |
71
|
|
|
|
|
|
|
$client->{reconnect_timer} = AE::timer 30, 0, sub { |
72
|
0
|
|
|
|
|
0
|
Scalar::Util::weaken $self; |
73
|
0
|
|
|
|
|
0
|
warn "Sweep $client_id (no long-poll reconnect)" if DEBUG; |
74
|
0
|
|
|
|
|
0
|
undef $client; |
75
|
0
|
|
|
|
|
0
|
delete $self->clients->{$client_id}; |
76
|
9
|
|
|
|
|
142
|
}; |
77
|
|
|
|
|
|
|
} |
78
|
|
|
|
|
|
|
} catch { |
79
|
0
|
0
|
|
0
|
|
0
|
/Tatsumaki::Error::ClientDisconnect/ and do { |
80
|
0
|
|
|
|
|
0
|
warn "Client $client_id disconnected" if DEBUG; |
81
|
0
|
|
|
|
|
0
|
undef $client; |
82
|
0
|
|
|
|
|
0
|
delete $self->clients->{$client_id}; |
83
|
|
|
|
|
|
|
}; |
84
|
9
|
|
|
|
|
99
|
}; |
85
|
|
|
|
|
|
|
} |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
sub poll_once { |
88
|
9
|
|
|
9
|
1
|
71
|
my($self, $client_id, $cb, $timeout) = @_; |
89
|
|
|
|
|
|
|
|
90
|
9
|
|
|
|
|
15
|
my $is_new; |
91
|
9
|
|
66
|
|
|
73
|
my $client = $self->clients->{$client_id} ||= do { |
92
|
7
|
|
|
|
|
12
|
$is_new = 1; |
93
|
7
|
|
|
|
|
211
|
+ { cv => AE::cv, persistent => 0, buffer => [] }; |
94
|
|
|
|
|
|
|
}; |
95
|
|
|
|
|
|
|
|
96
|
9
|
50
|
|
|
|
5697
|
if ( $client->{longpoll_timer} ) { |
97
|
|
|
|
|
|
|
# close last connection from the same client_id |
98
|
0
|
|
|
|
|
0
|
$self->flush_events($client_id); |
99
|
0
|
|
|
|
|
0
|
undef $client->{longpoll_timer}; |
100
|
|
|
|
|
|
|
} |
101
|
9
|
|
|
|
|
18
|
undef $client->{reconnect_timer}; |
102
|
|
|
|
|
|
|
|
103
|
9
|
|
|
9
|
|
84
|
$client->{cv}->cb(sub { $cb->($_[0]->recv) }); |
|
9
|
|
|
|
|
101
|
|
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
# reset garbage collection timeout with the long-poll timeout |
106
|
|
|
|
|
|
|
# $timeout = 0 is a valid timeout for interval-polling |
107
|
9
|
100
|
|
|
|
343
|
$timeout = 55 unless defined $timeout; |
108
|
|
|
|
|
|
|
$client->{longpoll_timer} = AE::timer $timeout || 55, 0, sub { |
109
|
1
|
|
|
1
|
|
998163
|
Scalar::Util::weaken $self; |
110
|
1
|
|
|
|
|
3
|
warn "Timing out $client_id long-poll" if DEBUG; |
111
|
1
|
|
|
|
|
9
|
$self->flush_events($client_id); |
112
|
9
|
|
50
|
|
|
87
|
}; |
113
|
|
|
|
|
|
|
|
114
|
9
|
100
|
|
|
|
25
|
if ($is_new) { |
|
2
|
50
|
|
|
|
11
|
|
115
|
|
|
|
|
|
|
# flush backlog for a new client |
116
|
7
|
|
|
|
|
27
|
my @events = $self->backlog_events; |
117
|
7
|
100
|
|
|
|
40
|
$self->flush_events($client_id, @events) if @events; |
118
|
|
|
|
|
|
|
}elsif ( @{ $client->{buffer} } ) { |
119
|
|
|
|
|
|
|
# flush buffer for a long-poll client |
120
|
2
|
|
|
|
|
3
|
$self->flush_events($client_id, @{ $client->{buffer} }); |
|
2
|
|
|
|
|
8
|
|
121
|
|
|
|
|
|
|
} |
122
|
|
|
|
|
|
|
} |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
sub poll { |
125
|
0
|
|
|
0
|
1
|
|
my($self, $client_id, $cb) = @_; |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
# TODO register client info like names and remote host in $client |
128
|
0
|
|
|
|
|
|
my $cv = AE::cv; |
129
|
0
|
|
|
0
|
|
|
$cv->cb(sub { $cb->($_[0]->recv) }); |
|
0
|
|
|
|
|
|
|
130
|
0
|
|
|
|
|
|
my $s = $self->clients->{$client_id} = { |
131
|
|
|
|
|
|
|
cv => $cv, persistent => 1, buffer => [], |
132
|
|
|
|
|
|
|
}; |
133
|
|
|
|
|
|
|
|
134
|
0
|
|
|
|
|
|
my @events = $self->backlog_events; |
135
|
0
|
0
|
|
|
|
|
$self->flush_events($client_id, @events) if @events; |
136
|
|
|
|
|
|
|
} |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
1; |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
__END__ |