File Coverage

blib/lib/Fluent/Logger.pm
Criterion Covered Total %
statement 192 251 76.4
branch 43 78 55.1
condition 9 25 36.0
subroutine 35 40 87.5
pod 4 6 66.6
total 283 400 70.7


line stmt bran cond sub pod time code
1             # -*- coding: utf-8; -*-
2             package Fluent::Logger;
3              
4 8     8   261322 use strict;
  8         22  
  8         267  
5 8     8   53 use warnings;
  8         20  
  8         431  
6              
7             our $VERSION = '0.25';
8              
9 8     8   3713 use IO::Select;
  8         11054  
  8         376  
10 8     8   441 use IO::Socket::INET;
  8         15785  
  8         90  
11 8     8   3814 use IO::Socket::UNIX;
  8         16  
  8         69  
12 8     8   6134 use Data::MessagePack;
  8         7215  
  8         193  
13 8     8   3412 use Time::Piece;
  8         57424  
  8         36  
14 8     8   558 use Carp;
  8         17  
  8         401  
15 8     8   47 use Scalar::Util qw/ refaddr /;
  8         26  
  8         354  
16 8     8   451 use Time::HiRes qw/ time /;
  8         916  
  8         56  
17 8     8   4430 use UUID::Tiny qw/ create_uuid UUID_V4 /;
  8         80620  
  8         534  
18 8     8   57 use MIME::Base64 qw/ encode_base64 /;
  8         16  
  8         322  
19              
20 8     8   41 use constant RECONNECT_WAIT => 0.5;
  8         16  
  8         500  
21 8     8   46 use constant RECONNECT_WAIT_INCR_RATE => 1.5;
  8         13  
  8         330  
22 8     8   40 use constant RECONNECT_WAIT_MAX => 60;
  8         11  
  8         322  
23 8     8   40 use constant RECONNECT_WAIT_MAX_COUNT => 12;
  8         24  
  8         348  
24              
25 8     8   53 use constant MP_HEADER_3ELM_ARRAY => "\x93";
  8         15  
  8         340  
26 8     8   41 use constant MP_HEADER_4ELM_ARRAY => "\x94";
  8         16  
  8         347  
27 8     8   38 use constant MP_HEADER_EVENT_TIME => "\xd7\x00";
  8         17  
  8         347  
28              
29 8     8   3557 use subs 'prefer_integer';
  8         234  
  8         45  
30              
31             use Class::Tiny +{
32             tag_prefix => sub {},
33 1         34 host => sub { "127.0.0.1" },
34 0         0 port => sub { 24224 },
35             socket => sub {},
36 5         315 timeout => sub { 3.0 },
37 0         0 buffer_limit => sub { 8 * 1024 * 1024 }, # fixme
38 1         27 buffer_overflow_handler => sub { undef },
39 0         0 truncate_buffer_at_overflow => sub { 0 },
40 0         0 max_write_retry => sub { 5 },
41 2         57 write_length => sub { 8 * 1024 * 1024 },
42             socket_io => sub {},
43 2         24 errors => sub { [] },
44 4         49 prefer_integer => sub { 1 },
45             packer => sub {
46 4         39 my $self = shift;
47 4         75 my $mp = Data::MessagePack->new;
48 4         55 $mp->prefer_integer( $self->prefer_integer );
49 4         55 $mp;
50             },
51 0         0 pending => sub { "" },
52 2         37 connect_error_history => sub { +[] },
53             owner_pid => sub {},
54 4         49 event_time => sub { 0 },
55 4         42 ack => sub { 0 },
56 0         0 pending_acks => sub { +[] },
57             unpacker => sub {
58 0         0 require Data::MessagePack::Stream;
59 0         0 Data::MessagePack::Stream->new;
60             },
61             selector => sub { },
62 8     8   5023 };
  8         17301  
  8         342  
63              
64             sub BUILD {
65 5     5 0 2018514 my $self = shift;
66 5         57 $self->_connect;
67             }
68              
69             sub prefer_integer {
70 4     4   11 my $self = shift;
71              
72 4 50       29 if (@_) {
    50          
73 0         0 $self->{prefer_integer} = shift;
74 0         0 $self->packer->prefer_integer( $self->prefer_integer );
75             } elsif ( exists $self->{prefer_integer} ) {
76 0         0 return $self->{prefer_integer};
77             } else {
78 4         30 my $defaults = Class::Tiny->get_all_attribute_defaults_for( ref $self );
79 4         454 return $self->{prefer_integer} = $defaults->{prefer_integer}->();
80             }
81             }
82              
83             sub _carp {
84 28     28   55 my $self = shift;
85 28         56 my $msg = shift;
86 28         77 chomp $msg;
87 28         137 carp(
88             sprintf "%s %s[%s](%s): %s",
89             localtime->strftime("%Y-%m-%dT%H:%M:%S%z"),
90             ref $self,
91             refaddr $self,
92             $self->_connect_info,
93             $msg,
94             );
95             }
96              
97             sub _add_error {
98 24     24   62 my $self = shift;
99 24         53 my $msg = shift;
100 24         98 $self->_carp($msg);
101 24         8420 push @{ $self->errors }, $msg;
  24         608  
102             }
103              
104             sub errstr {
105 0     0 1 0 my $self = shift;
106 0         0 return join ("\n", @{ $self->errors });
  0         0  
107             }
108              
109             sub _connect_info {
110 28     28   4374 my $self = shift;
111 28 50       633 $self->socket || sprintf "%s:%d", $self->host, $self->port;
112             }
113              
114             sub _connect {
115 17     17   68 my $self = shift;
116 17         46 my $force = shift;
117              
118 17 50 33     682 return if $self->socket_io && !$force;
119              
120 17 100       407 my $sock = defined $self->socket
121             ? IO::Socket::UNIX->new( Peer => $self->socket )
122             : IO::Socket::INET->new(
123             PeerAddr => $self->host,
124             PeerPort => $self->port,
125             Proto => 'tcp',
126             Timeout => $self->timeout,
127             ReuseAddr => 1,
128             );
129 17 100       15146 if (!$sock) {
130 14         114 $self->_add_error("Can't connect: $!");
131 14         109 push @{ $self->connect_error_history }, time;
  14         226  
132 14 50       119 if (@{ $self->connect_error_history } > RECONNECT_WAIT_MAX_COUNT) {
  14         182  
133 0         0 shift @{ $self->connect_error_history };
  0         0  
134             }
135 14         106 return;
136             }
137 3         111 $self->connect_error_history([]);
138 3         113 $self->owner_pid($$);
139 3         81 $self->selector(IO::Select->new($sock));
140 3         361 $self->socket_io($sock);
141             }
142              
143             sub close {
144 7     7 1 23 my $self = shift;
145              
146 7 100       32 if ( length $self->{pending} ) {
147 2         10 $self->_carp("flushing pending data on close");
148 2 50       554 $self->_connect unless $self->socket_io;
149 2         5 my $written = eval {
150 2         10 $self->_write( $self->{pending} );
151             };
152 2 50 33     45 if ($@ || !$written) {
153 2         7 my $size = length $self->{pending};
154 2         15 $self->_carp("Can't send pending data. LOST $size bytes.: $@");
155 2         460 $self->_call_buffer_overflow_handler();
156             } else {
157 0         0 $self->_carp("pending data was flushed successfully");
158             }
159             };
160 7         32 $self->{pending} = "";
161 7         22 $self->{pending_acks} = [];
162 7         36 delete $self->{selector};
163 7         48 my $socket = delete $self->{socket_io};
164 7 100       73 $socket->close if $socket;
165             }
166              
167             sub post {
168 28     28 1 33591 my($self, $tag, $msg) = @_;
169              
170 28   50     337 $self->_post( $tag || "", $msg, time() );
171             }
172              
173             sub post_with_time {
174 0     0 1 0 my ($self, $tag, $msg, $time) = @_;
175              
176 0   0     0 $self->_post( $tag || "", $msg, $time );
177             }
178              
179             sub _pack_time {
180 28     28   79 my ($self, $time) = @_;
181              
182 28 50       464 if ($self->event_time) {
183 0         0 my $time_i = int $time;
184 0         0 my $nanosec = int(($time - $time_i) * 10 ** 9);
185 0         0 return MP_HEADER_EVENT_TIME . pack("NN", $time_i, $nanosec);
186             } else {
187 28         502 return $self->packer->pack(int $time);
188             }
189             }
190              
191             sub _post {
192 28     28   79 my ($self, $tag, $msg, $time) = @_;
193              
194 28 50       111 if (ref $msg ne "HASH") {
195 0         0 $self->_add_error("message '$msg' must be a HashRef");
196 0         0 return;
197             }
198              
199 28 50       680 $tag = join('.', $self->tag_prefix, $tag) if $self->tag_prefix;
200 28         604 my $p = $self->packer;
201 28         389 $self->_send(
202             $p->pack($tag),
203             $self->_pack_time($time),
204             $p->pack($msg),
205             );
206             }
207              
208             sub _send {
209 28     28   6367 my ($self, @args) = @_;
210              
211 28         59 my ($data, $unique_key);
212 28 50       614 if ( $self->ack ) {
213 0         0 $unique_key = encode_base64(create_uuid(UUID_V4));
214 0         0 $data = join('', MP_HEADER_4ELM_ARRAY, @args, $self->{packer}->pack({ chunk => $unique_key }));
215 0         0 push @{$self->{pending_acks}}, $unique_key;
  0         0  
216             } else {
217 28         10900 $data = join('', MP_HEADER_3ELM_ARRAY, @args);
218             }
219              
220 28         105 my $prev_size = length($self->{pending});
221 28         61 my $current_size = length($data);
222 28         12001 $self->{pending} .= $data;
223              
224 28         73 my $errors = @{ $self->connect_error_history };
  28         697  
225 28 100 100     588 if ( $errors && length $self->pending <= $self->buffer_limit )
226             {
227 16         13698 my $suppress_sec;
228 16 50       45 if ( $errors < RECONNECT_WAIT_MAX_COUNT ) {
229 16         72 $suppress_sec = RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** ($errors - 1));
230             } else {
231 0         0 $suppress_sec = RECONNECT_WAIT_MAX;
232             }
233 16 50       303 if ( time - $self->connect_error_history->[-1] < $suppress_sec ) {
234 16         1468 return;
235             }
236             }
237              
238             # fork safe
239 12 100 66     24363 if (!$self->socket_io || $self->owner_pid != $$) {
240 10         79 $self->_connect(1);
241             }
242              
243 12         98 my $written;
244 12         35 eval {
245 12         58 $written = $self->_write( $self->{pending} );
246             my $acked = $self->ack
247 2 50       44 ? $self->_wait_ack(@{ $self->{pending_acks} })
  0         0  
248             : 1;
249 2 50 33     34 if ($written && $acked) {
250 2         5 $self->{pending} = "";
251 2         9 $self->{pending_acks} = [];
252             }
253             };
254 12 100       309 if ($@) {
255 10         30 my $error = $@;
256 10         58 $self->_add_error("Cannot send data: $error");
257 10         97 my $sock = delete $self->{socket_io};
258 10 50       34 $sock->close if $sock;
259 10         29 delete $self->{selector};
260 10 50       166 if ( length($self->{pending}) > $self->buffer_limit ) {
261 10 100       227 if ( defined $self->buffer_overflow_handler ) {
    50          
262 1         12 $self->_call_buffer_overflow_handler();
263 1         3 $self->{pending} = "";
264 1 50       18 $self->{pending_acks} = [] if $self->ack;
265             } elsif ( $self->truncate_buffer_at_overflow ) {
266 9         226 substr($self->{pending}, $prev_size, $current_size, "");
267 9 50       147 pop @{$self->{pending_acks}} if $self->ack;
  0         0  
268             }
269             }
270 10         169 return;
271             }
272              
273 2         23 return $written;
274             }
275              
276             sub _wait_ack {
277 0     0   0 my $self = shift;
278 0         0 my @acks = @_;
279              
280 0         0 my $up = $self->unpacker;
281 0     0   0 local $SIG{"PIPE"} = sub { die $! };
  0         0  
282             READ:
283 0         0 while (1) {
284 0         0 my ($s) = $self->selector->can_read($self->timeout);
285 0 0       0 if (!$s) {
286 0         0 die "ack read timed out";
287             }
288 0         0 $s->sysread(my $buf, 1024);
289 0 0 0     0 return if @acks > 0 && length($buf) == 0;
290 0         0 $up->feed($buf);
291 0         0 while ($up->next) {
292 0         0 my $ack = $up->data;
293 0         0 my $unique_key = shift @acks;
294 0 0 0     0 if ($unique_key && ref $ack eq "HASH") {
295 0 0       0 if ($ack->{ack} ne $unique_key) {
296 0         0 die "ack is not expected: " . $ack->{ack};
297             }
298             } else {
299 0         0 unshift @{ $self->{pending_acks} }, $unique_key;
  0         0  
300 0         0 die "Can't send data. ack is not expected. $@";
301             }
302 0 0       0 last READ if @acks == 0;
303             }
304             }
305 0         0 return 1;
306             }
307              
308             sub _call_buffer_overflow_handler {
309 3     3   7 my $self = shift;
310 3 100       60 if (my $handler = $self->buffer_overflow_handler) {
311 2         15 eval {
312 2         10 $handler->($self->{pending});
313             };
314 2 50       16 if (my $error = $@) {
315 0         0 $self->_add_error("Can't call buffer overflow handler: $error");
316             }
317             }
318             }
319              
320             sub _write {
321 14     14   34 my $self = shift;
322 14         21636 my $data = shift;
323 14         96 my $length = length($data);
324 14         33 my $retry = my $written = 0;
325 14 100       552 die "Connection is not available" unless $self->socket_io;
326              
327 2     0   79 local $SIG{"PIPE"} = sub { die $! };
  0         0  
328              
329 2         13 while ($written < $length) {
330 2         47 my ($s) = $self->selector->can_write($self->timeout);
331 2 50       128 die "send write timed out" unless $s;
332 2         68 my $nwrite
333             = $s->syswrite($data, $self->write_length, $written);
334              
335 2 50       86 if (!$nwrite) {
336 0 0       0 if ($retry > $self->max_write_retry) {
337 0         0 die "failed write retry; max write retry count. $!";
338             }
339 0         0 $retry++;
340             } else {
341 2         11 $written += $nwrite;
342             }
343             }
344 2         17 $written;
345             }
346              
347             sub DEMOLISH {
348 5     5 0 13824 my $self = shift;
349 5         32 $self->close;
350             }
351              
352              
353             1;
354             __END__