File Coverage

blib/lib/Fluent/Logger.pm
Criterion Covered Total %
statement 197 256 76.9
branch 44 80 55.0
condition 9 25 36.0
subroutine 35 40 87.5
pod 4 6 66.6
total 289 407 71.0


line stmt bran cond sub pod time code
1             # -*- coding: utf-8; -*-
2             package Fluent::Logger;
3              
4 8     8   3309664 use strict;
  8         22  
  8         382  
5 8     8   47 use warnings;
  8         24  
  8         832  
6              
7             our $VERSION = '0.29';
8              
9 8     8   4872 use IO::Select;
  8         18064  
  8         655  
10 8     8   886 use IO::Socket::INET;
  8         27985  
  8         123  
11 8     8   6089 use IO::Socket::UNIX;
  8         21  
  8         93  
12 8     8   9842 use Data::MessagePack;
  8         12434  
  8         335  
13 8     8   5146 use Time::Piece;
  8         118629  
  8         54  
14 8     8   739 use Carp;
  8         25  
  8         638  
15 8     8   50 use Scalar::Util qw/ refaddr /;
  8         19  
  8         577  
16 8     8   82 use Time::HiRes qw/ time /;
  8         18  
  8         73  
17 8     8   5884 use UUID::Tiny qw/ create_uuid UUID_V4 /;
  8         136875  
  8         1089  
18 8     8   68 use MIME::Base64 qw/ encode_base64 /;
  8         17  
  8         526  
19              
20 8     8   46 use constant RECONNECT_WAIT => 0.5;
  8         17  
  8         551  
21 8     8   43 use constant RECONNECT_WAIT_INCR_RATE => 1.5;
  8         25  
  8         544  
22 8     8   61 use constant RECONNECT_WAIT_MAX => 60;
  8         35  
  8         595  
23 8     8   60 use constant RECONNECT_WAIT_MAX_COUNT => 12;
  8         42  
  8         1248  
24              
25 8     8   47 use constant MP_HEADER_3ELM_ARRAY => "\x93";
  8         33  
  8         519  
26 8     8   41 use constant MP_HEADER_4ELM_ARRAY => "\x94";
  8         16  
  8         499  
27 8     8   43 use constant MP_HEADER_EVENT_TIME => "\xd7\x00";
  8         13  
  8         474  
28              
29 8     8   4395 use subs 'prefer_integer';
  8         2965  
  8         56  
30              
31             use Class::Tiny +{
32             tag_prefix => sub {},
33 1         32 host => sub { "127.0.0.1" },
34 0         0 port => sub { 24224 },
35             socket => sub {},
36 5         356 timeout => sub { 3.0 },
37 0         0 buffer_limit => sub { 8 * 1024 * 1024 }, # fixme
38 1         38 buffer_overflow_handler => sub { undef },
39 0         0 truncate_buffer_at_overflow => sub { 0 },
40 0         0 max_write_retry => sub { 5 },
41 2         117 write_length => sub { 8 * 1024 * 1024 },
42             socket_io => sub {},
43 2         29 errors => sub { [] },
44 4         79 prefer_integer => sub { 1 },
45 4         360 utf8 => sub { 1 },
46             packer => sub {
47 4         58 my $self = shift;
48 4         168 my $mp = Data::MessagePack->new;
49 4         205 $mp->utf8( $self->utf8 );
50 4         118 $mp->prefer_integer( $self->prefer_integer );
51 4         36 $mp;
52             },
53 0         0 pending => sub { "" },
54 2         30 connect_error_history => sub { +[] },
55             owner_pid => sub {},
56 4         77 event_time => sub { 0 },
57 4         89 ack => sub { 0 },
58 0         0 pending_acks => sub { +[] },
59             unpacker => sub {
60 0         0 require Data::MessagePack::Stream;
61 0         0 Data::MessagePack::Stream->new;
62             },
63             selector => sub { },
64 4         57 retry_immediately => sub { 0 },
65 8     8   9660 };
  8         19902  
  8         413  
66              
67             sub BUILD {
68 5     5 0 2638924 my $self = shift;
69 5         189 $self->_connect;
70             }
71              
72             sub prefer_integer {
73 4     4   63 my $self = shift;
74              
75 4 50       33 if (@_) {
    50          
76 0         0 $self->{prefer_integer} = shift;
77 0         0 $self->packer->prefer_integer( $self->prefer_integer );
78             } elsif ( exists $self->{prefer_integer} ) {
79 0         0 return $self->{prefer_integer};
80             } else {
81 4         84 my $defaults = Class::Tiny->get_all_attribute_defaults_for( ref $self );
82 4         507 return $self->{prefer_integer} = $defaults->{prefer_integer}->();
83             }
84             }
85              
86             sub _carp {
87 28     28   57 my $self = shift;
88 28         63 my $msg = shift;
89 28         72 chomp $msg;
90 28         169 carp(
91             sprintf "%s %s[%s](%s): %s",
92             localtime->strftime("%Y-%m-%dT%H:%M:%S%z"),
93             ref $self,
94             refaddr $self,
95             $self->_connect_info,
96             $msg,
97             );
98             }
99              
100             sub _add_error {
101 24     24   61 my $self = shift;
102 24         68 my $msg = shift;
103 24         107 $self->_carp($msg);
104 24         9115 push @{ $self->errors }, $msg;
  24         952  
105             }
106              
107             sub errstr {
108 0     0 1 0 my $self = shift;
109 0         0 return join ("\n", @{ $self->errors });
  0         0  
110             }
111              
112             sub _connect_info {
113 28     28   7263 my $self = shift;
114 28 50       1131 $self->socket || sprintf "%s:%d", $self->host, $self->port;
115             }
116              
117             sub _connect {
118 17     17   96 my $self = shift;
119 17         58 my $force = shift;
120              
121 17 50 33     1234 return if $self->socket_io && !$force;
122              
123 17 100       628 my $sock = defined $self->socket
124             ? IO::Socket::UNIX->new( Peer => $self->socket )
125             : IO::Socket::INET->new(
126             PeerAddr => $self->host,
127             PeerPort => $self->port,
128             Proto => 'tcp',
129             Timeout => $self->timeout,
130             ReuseAddr => 1,
131             );
132 17 100       18347 if (!$sock) {
133 14         196 $self->_add_error("Can't connect: $!");
134 14         119 push @{ $self->connect_error_history }, time;
  14         346  
135 14 50       119 if (@{ $self->connect_error_history } > RECONNECT_WAIT_MAX_COUNT) {
  14         322  
136 0         0 shift @{ $self->connect_error_history };
  0         0  
137             }
138 14         126 return;
139             }
140 3         156 $self->connect_error_history([]);
141 3         141 $self->owner_pid($$);
142 3         130 $self->selector(IO::Select->new($sock));
143 3         502 $self->socket_io($sock);
144             }
145              
146             sub close {
147 7     7 1 25 my $self = shift;
148              
149 7 100       46 if ( length $self->{pending} ) {
150 2         11 $self->_carp("flushing pending data on close");
151 2 50       742 $self->_connect unless $self->socket_io;
152 2         6 my $written = eval {
153 2         29 $self->_write( $self->{pending} );
154             };
155 2 50 33     46 if ($@ || !$written) {
156 2         25 my $size = length $self->{pending};
157 2         15 $self->_carp("Can't send pending data. LOST $size bytes.: $@");
158 2         674 $self->_call_buffer_overflow_handler();
159             } else {
160 0         0 $self->_carp("pending data was flushed successfully");
161             }
162             };
163 7         43 $self->{pending} = "";
164 7         50 $self->{pending_acks} = [];
165 7         72 delete $self->{selector};
166 7         27 my $socket = delete $self->{socket_io};
167 7 100       215 $socket->close if $socket;
168             }
169              
170             sub post {
171 28     28 1 19641 my($self, $tag, $msg) = @_;
172              
173 28   50     315 $self->_post( $tag || "", $msg, time() );
174             }
175              
176             sub post_with_time {
177 0     0 1 0 my ($self, $tag, $msg, $time) = @_;
178              
179 0   0     0 $self->_post( $tag || "", $msg, $time );
180             }
181              
182             sub _pack_time {
183 28     28   104 my ($self, $time) = @_;
184              
185 28 50       731 if ($self->event_time) {
186 0         0 my $time_i = int $time;
187 0         0 my $nanosec = int(($time - $time_i) * 10 ** 9);
188 0         0 return MP_HEADER_EVENT_TIME . pack("NN", $time_i, $nanosec);
189             } else {
190 28         797 return $self->packer->pack(int $time);
191             }
192             }
193              
194             sub _post {
195 28     28   97 my ($self, $tag, $msg, $time) = @_;
196              
197 28 50       138 if (ref $msg ne "HASH") {
198 0         0 $self->_add_error("message '$msg' must be a HashRef");
199 0         0 return;
200             }
201              
202 28 50       1051 $tag = join('.', $self->tag_prefix, $tag) if $self->tag_prefix;
203 28         905 my $p = $self->packer;
204 28         423 $self->_send(
205             $p->pack($tag),
206             $self->_pack_time($time),
207             $p->pack($msg),
208             );
209             }
210              
211             sub _send {
212 28     28   11622 my ($self, @args) = @_;
213              
214 28         68 my ($data, $unique_key);
215 28 50       1040 if ( $self->ack ) {
216 0         0 $unique_key = encode_base64(create_uuid(UUID_V4));
217 0         0 $data = join('', MP_HEADER_4ELM_ARRAY, @args, $self->{packer}->pack({ chunk => $unique_key }));
218 0         0 push @{$self->{pending_acks}}, $unique_key;
  0         0  
219             } else {
220 28         18493 $data = join('', MP_HEADER_3ELM_ARRAY, @args);
221             }
222              
223 28         162 my $prev_size = length($self->{pending});
224 28         54 my $current_size = length($data);
225 28         22988 $self->{pending} .= $data;
226              
227 28         92 my $errors = @{ $self->connect_error_history };
  28         1194  
228 28 100 100     1011 if ( $errors && length $self->pending <= $self->buffer_limit )
229             {
230 16         41158 my $suppress_sec;
231 16 50       75 if ( $errors < RECONNECT_WAIT_MAX_COUNT ) {
232 16         105 $suppress_sec = RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** ($errors - 1));
233             } else {
234 0         0 $suppress_sec = RECONNECT_WAIT_MAX;
235             }
236 16 50       479 if ( time - $self->connect_error_history->[-1] < $suppress_sec ) {
237 16         4826 return;
238             }
239             }
240              
241 12         56675 my ($written, $error);
242 12         395 for ( 0 .. $self->retry_immediately ) {
243             # check owner pid for fork safe
244 12 100 66     338 if (!$self->socket_io || $self->owner_pid != $$) {
245 10         76 $self->_connect(1);
246             }
247 12         1236 eval {
248 12         81 $written = $self->_write( $self->{pending} );
249             my $acked = $self->ack
250 2 50       54 ? $self->_wait_ack(@{ $self->{pending_acks} })
  0         0  
251             : 1;
252 2 50 33     83 if ($written && $acked) {
253 2         7 $self->{pending} = "";
254 2         8 $self->{pending_acks} = [];
255             }
256             };
257 12 100       207 if (!$@) {
258 2         49 return $written;
259             }
260 10         25 my $e = $@;
261 10         55 $error = "Cannot send data: $e";
262 10         32 my $sock = delete $self->{socket_io};
263 10 50       42 $sock->close if $sock;
264 10         27 delete $self->{selector};
265              
266 10 50       320 if ( length($self->{pending}) > $self->buffer_limit ) {
267 10 100       383 if ( defined $self->buffer_overflow_handler ) {
    50          
268 1         15 $self->_call_buffer_overflow_handler();
269 1         4 $self->{pending} = "";
270 1 50       40 $self->{pending_acks} = [] if $self->ack;
271             } elsif ( $self->truncate_buffer_at_overflow ) {
272 9         293 substr($self->{pending}, $prev_size, $current_size, "");
273 9 50       238 pop @{$self->{pending_acks}} if $self->ack;
  0         0  
274             }
275             }
276             }
277              
278 10 50       129 $self->_add_error($error) if defined $error;
279 10         197 return $written;
280             }
281              
282             sub _wait_ack {
283 0     0   0 my $self = shift;
284 0         0 my @acks = @_;
285              
286 0         0 my $up = $self->unpacker;
287 0     0   0 local $SIG{"PIPE"} = sub { die $! };
  0         0  
288             READ:
289 0         0 while (1) {
290 0         0 my ($s) = $self->selector->can_read($self->timeout);
291 0 0       0 if (!$s) {
292 0         0 die "ack read timed out";
293             }
294 0         0 $s->sysread(my $buf, 1024);
295 0 0 0     0 return if @acks > 0 && length($buf) == 0;
296 0         0 $up->feed($buf);
297 0         0 while ($up->next) {
298 0         0 my $ack = $up->data;
299 0         0 my $unique_key = shift @acks;
300 0 0 0     0 if ($unique_key && ref $ack eq "HASH") {
301 0 0       0 if ($ack->{ack} ne $unique_key) {
302 0         0 die "ack is not expected: " . $ack->{ack};
303             }
304             } else {
305 0         0 unshift @{ $self->{pending_acks} }, $unique_key;
  0         0  
306 0         0 die "Can't send data. ack is not expected. $@";
307             }
308 0 0       0 last READ if @acks == 0;
309             }
310             }
311 0         0 return 1;
312             }
313              
314             sub _call_buffer_overflow_handler {
315 3     3   10 my $self = shift;
316 3 100       143 if (my $handler = $self->buffer_overflow_handler) {
317 2         18 eval {
318 2         28 $handler->($self->{pending});
319             };
320 2 50       17 if (my $error = $@) {
321 0         0 $self->_add_error("Can't call buffer overflow handler: $error");
322             }
323             }
324             }
325              
326             sub _write {
327 14     14   34 my $self = shift;
328 14         43595 my $data = shift;
329 14         72 my $length = length($data);
330 14         36 my $retry = my $written = 0;
331 14 100       596 die "Connection is not available" unless $self->socket_io;
332              
333 2     0   75 local $SIG{"PIPE"} = sub { die $! };
  0         0  
334              
335 2         9 while ($written < $length) {
336 2         59 my ($s) = $self->selector->can_write($self->timeout);
337 2 50       182 die "send write timed out" unless $s;
338 2         52 my $nwrite
339             = $s->syswrite($data, $self->write_length, $written);
340              
341 2 50       167 if (!$nwrite) {
342 0 0       0 if ($retry > $self->max_write_retry) {
343 0         0 die "failed write retry; max write retry count. $!";
344             }
345 0         0 $retry++;
346             } else {
347 2         33 $written += $nwrite;
348             }
349             }
350 2         60 $written;
351             }
352              
353             sub DEMOLISH {
354 5     5 0 11661 my $self = shift;
355 5         42 $self->close;
356             }
357              
358              
359             1;
360             __END__