File Coverage

blib/lib/Fluent/Logger.pm
Criterion Covered Total %
statement 195 254 76.7
branch 44 80 55.0
condition 9 25 36.0
subroutine 35 40 87.5
pod 4 6 66.6
total 287 405 70.8


line stmt bran cond sub pod time code
1             # -*- coding: utf-8; -*-
2             package Fluent::Logger;
3              
4 8     8   488681 use strict;
  8         23  
  8         237  
5 8     8   41 use warnings;
  8         16  
  8         320  
6              
7             our $VERSION = '0.28';
8              
9 8     8   3941 use IO::Select;
  8         13469  
  8         373  
10 8     8   548 use IO::Socket::INET;
  8         22282  
  8         94  
11 8     8   4408 use IO::Socket::UNIX;
  8         18  
  8         83  
12 8     8   7794 use Data::MessagePack;
  8         8692  
  8         300  
13 8     8   4506 use Time::Piece;
  8         80116  
  8         44  
14 8     8   587 use Carp;
  8         15  
  8         464  
15 8     8   46 use Scalar::Util qw/ refaddr /;
  8         16  
  8         385  
16 8     8   577 use Time::HiRes qw/ time /;
  8         1318  
  8         57  
17 8     8   5862 use UUID::Tiny qw/ create_uuid UUID_V4 /;
  8         101491  
  8         649  
18 8     8   65 use MIME::Base64 qw/ encode_base64 /;
  8         16  
  8         444  
19              
20 8     8   54 use constant RECONNECT_WAIT => 0.5;
  8         39  
  8         424  
21 8     8   45 use constant RECONNECT_WAIT_INCR_RATE => 1.5;
  8         13  
  8         347  
22 8     8   44 use constant RECONNECT_WAIT_MAX => 60;
  8         13  
  8         513  
23 8     8   49 use constant RECONNECT_WAIT_MAX_COUNT => 12;
  8         23  
  8         454  
24              
25 8     8   51 use constant MP_HEADER_3ELM_ARRAY => "\x93";
  8         14  
  8         514  
26 8     8   50 use constant MP_HEADER_4ELM_ARRAY => "\x94";
  8         13  
  8         458  
27 8     8   53 use constant MP_HEADER_EVENT_TIME => "\xd7\x00";
  8         13  
  8         386  
28              
29 8     8   4592 use subs 'prefer_integer';
  8         193  
  8         41  
30              
31             use Class::Tiny +{
32             tag_prefix => sub {},
33 1         60 host => sub { "127.0.0.1" },
34 0         0 port => sub { 24224 },
35             socket => sub {},
36 5         351 timeout => sub { 3.0 },
37 0         0 buffer_limit => sub { 8 * 1024 * 1024 }, # fixme
38 1         34 buffer_overflow_handler => sub { undef },
39 0         0 truncate_buffer_at_overflow => sub { 0 },
40 0         0 max_write_retry => sub { 5 },
41 2         78 write_length => sub { 8 * 1024 * 1024 },
42             socket_io => sub {},
43 2         27 errors => sub { [] },
44 4         60 prefer_integer => sub { 1 },
45             packer => sub {
46 4         39 my $self = shift;
47 4         132 my $mp = Data::MessagePack->new;
48 4         126 $mp->prefer_integer( $self->prefer_integer );
49 4         65 $mp;
50             },
51 0         0 pending => sub { "" },
52 2         53 connect_error_history => sub { +[] },
53             owner_pid => sub {},
54 4         47 event_time => sub { 0 },
55 4         55 ack => sub { 0 },
56 0         0 pending_acks => sub { +[] },
57             unpacker => sub {
58 0         0 require Data::MessagePack::Stream;
59 0         0 Data::MessagePack::Stream->new;
60             },
61             selector => sub { },
62 4         75 retry_immediately => sub { 0 },
63 8     8   6803 };
  8         14568  
  8         294  
64              
65             sub BUILD {
66 5     5 0 2023662 my $self = shift;
67 5         114 $self->_connect;
68             }
69              
70             sub prefer_integer {
71 4     4   24 my $self = shift;
72              
73 4 50       45 if (@_) {
    50          
74 0         0 $self->{prefer_integer} = shift;
75 0         0 $self->packer->prefer_integer( $self->prefer_integer );
76             } elsif ( exists $self->{prefer_integer} ) {
77 0         0 return $self->{prefer_integer};
78             } else {
79 4         42 my $defaults = Class::Tiny->get_all_attribute_defaults_for( ref $self );
80 4         467 return $self->{prefer_integer} = $defaults->{prefer_integer}->();
81             }
82             }
83              
84             sub _carp {
85 28     28   59 my $self = shift;
86 28         61 my $msg = shift;
87 28         136 chomp $msg;
88 28         180 carp(
89             sprintf "%s %s[%s](%s): %s",
90             localtime->strftime("%Y-%m-%dT%H:%M:%S%z"),
91             ref $self,
92             refaddr $self,
93             $self->_connect_info,
94             $msg,
95             );
96             }
97              
98             sub _add_error {
99 24     24   57 my $self = shift;
100 24         53 my $msg = shift;
101 24         113 $self->_carp($msg);
102 24         8664 push @{ $self->errors }, $msg;
  24         668  
103             }
104              
105             sub errstr {
106 0     0 1 0 my $self = shift;
107 0         0 return join ("\n", @{ $self->errors });
  0         0  
108             }
109              
110             sub _connect_info {
111 28     28   4206 my $self = shift;
112 28 50       778 $self->socket || sprintf "%s:%d", $self->host, $self->port;
113             }
114              
115             sub _connect {
116 17     17   70 my $self = shift;
117 17         73 my $force = shift;
118              
119 17 50 33     960 return if $self->socket_io && !$force;
120              
121 17 100       459 my $sock = defined $self->socket
122             ? IO::Socket::UNIX->new( Peer => $self->socket )
123             : IO::Socket::INET->new(
124             PeerAddr => $self->host,
125             PeerPort => $self->port,
126             Proto => 'tcp',
127             Timeout => $self->timeout,
128             ReuseAddr => 1,
129             );
130 17 100       18810 if (!$sock) {
131 14         135 $self->_add_error("Can't connect: $!");
132 14         115 push @{ $self->connect_error_history }, time;
  14         247  
133 14 50       114 if (@{ $self->connect_error_history } > RECONNECT_WAIT_MAX_COUNT) {
  14         233  
134 0         0 shift @{ $self->connect_error_history };
  0         0  
135             }
136 14         117 return;
137             }
138 3         131 $self->connect_error_history([]);
139 3         165 $self->owner_pid($$);
140 3         134 $self->selector(IO::Select->new($sock));
141 3         438 $self->socket_io($sock);
142             }
143              
144             sub close {
145 7     7 1 19 my $self = shift;
146              
147 7 100       33 if ( length $self->{pending} ) {
148 2         18 $self->_carp("flushing pending data on close");
149 2 50       592 $self->_connect unless $self->socket_io;
150 2         25 my $written = eval {
151 2         12 $self->_write( $self->{pending} );
152             };
153 2 50 33     54 if ($@ || !$written) {
154 2         26 my $size = length $self->{pending};
155 2         15 $self->_carp("Can't send pending data. LOST $size bytes.: $@");
156 2         560 $self->_call_buffer_overflow_handler();
157             } else {
158 0         0 $self->_carp("pending data was flushed successfully");
159             }
160             };
161 7         34 $self->{pending} = "";
162 7         25 $self->{pending_acks} = [];
163 7         65 delete $self->{selector};
164 7         35 my $socket = delete $self->{socket_io};
165 7 100       92 $socket->close if $socket;
166             }
167              
168             sub post {
169 28     28 1 23628 my($self, $tag, $msg) = @_;
170              
171 28   50     410 $self->_post( $tag || "", $msg, time() );
172             }
173              
174             sub post_with_time {
175 0     0 1 0 my ($self, $tag, $msg, $time) = @_;
176              
177 0   0     0 $self->_post( $tag || "", $msg, $time );
178             }
179              
180             sub _pack_time {
181 28     28   90 my ($self, $time) = @_;
182              
183 28 50       513 if ($self->event_time) {
184 0         0 my $time_i = int $time;
185 0         0 my $nanosec = int(($time - $time_i) * 10 ** 9);
186 0         0 return MP_HEADER_EVENT_TIME . pack("NN", $time_i, $nanosec);
187             } else {
188 28         543 return $self->packer->pack(int $time);
189             }
190             }
191              
192             sub _post {
193 28     28   87 my ($self, $tag, $msg, $time) = @_;
194              
195 28 50       116 if (ref $msg ne "HASH") {
196 0         0 $self->_add_error("message '$msg' must be a HashRef");
197 0         0 return;
198             }
199              
200 28 50       757 $tag = join('.', $self->tag_prefix, $tag) if $self->tag_prefix;
201 28         666 my $p = $self->packer;
202 28         479 $self->_send(
203             $p->pack($tag),
204             $self->_pack_time($time),
205             $p->pack($msg),
206             );
207             }
208              
209             sub _send {
210 28     28   8936 my ($self, @args) = @_;
211              
212 28         61 my ($data, $unique_key);
213 28 50       662 if ( $self->ack ) {
214 0         0 $unique_key = encode_base64(create_uuid(UUID_V4));
215 0         0 $data = join('', MP_HEADER_4ELM_ARRAY, @args, $self->{packer}->pack({ chunk => $unique_key }));
216 0         0 push @{$self->{pending_acks}}, $unique_key;
  0         0  
217             } else {
218 28         15661 $data = join('', MP_HEADER_3ELM_ARRAY, @args);
219             }
220              
221 28         118 my $prev_size = length($self->{pending});
222 28         54 my $current_size = length($data);
223 28         21596 $self->{pending} .= $data;
224              
225 28         88 my $errors = @{ $self->connect_error_history };
  28         792  
226 28 100 100     755 if ( $errors && length $self->pending <= $self->buffer_limit )
227             {
228 16         31874 my $suppress_sec;
229 16 50       44 if ( $errors < RECONNECT_WAIT_MAX_COUNT ) {
230 16         77 $suppress_sec = RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** ($errors - 1));
231             } else {
232 0         0 $suppress_sec = RECONNECT_WAIT_MAX;
233             }
234 16 50       294 if ( time - $self->connect_error_history->[-1] < $suppress_sec ) {
235 16         2618 return;
236             }
237             }
238              
239 12         44221 my ($written, $error);
240 12         274 for ( 0 .. $self->retry_immediately ) {
241             # check owner pid for fork safe
242 12 100 66     297 if (!$self->socket_io || $self->owner_pid != $$) {
243 10         71 $self->_connect(1);
244             }
245 12         143 eval {
246 12         69 $written = $self->_write( $self->{pending} );
247             my $acked = $self->ack
248 2 50       53 ? $self->_wait_ack(@{ $self->{pending_acks} })
  0         0  
249             : 1;
250 2 50 33     46 if ($written && $acked) {
251 2         8 $self->{pending} = "";
252 2         8 $self->{pending_acks} = [];
253             }
254             };
255 12 100       263 if (!$@) {
256 2         47 return $written;
257             }
258 10         26 my $e = $@;
259 10         32 $error = "Cannot send data: $e";
260 10         44 my $sock = delete $self->{socket_io};
261 10 50       42 $sock->close if $sock;
262 10         31 delete $self->{selector};
263              
264 10 50       210 if ( length($self->{pending}) > $self->buffer_limit ) {
265 10 100       237 if ( defined $self->buffer_overflow_handler ) {
    50          
266 1         15 $self->_call_buffer_overflow_handler();
267 1         3 $self->{pending} = "";
268 1 50       32 $self->{pending_acks} = [] if $self->ack;
269             } elsif ( $self->truncate_buffer_at_overflow ) {
270 9         284 substr($self->{pending}, $prev_size, $current_size, "");
271 9 50       150 pop @{$self->{pending_acks}} if $self->ack;
  0         0  
272             }
273             }
274             }
275              
276 10 50       135 $self->_add_error($error) if defined $error;
277 10         198 return $written;
278             }
279              
280             sub _wait_ack {
281 0     0   0 my $self = shift;
282 0         0 my @acks = @_;
283              
284 0         0 my $up = $self->unpacker;
285 0     0   0 local $SIG{"PIPE"} = sub { die $! };
  0         0  
286             READ:
287 0         0 while (1) {
288 0         0 my ($s) = $self->selector->can_read($self->timeout);
289 0 0       0 if (!$s) {
290 0         0 die "ack read timed out";
291             }
292 0         0 $s->sysread(my $buf, 1024);
293 0 0 0     0 return if @acks > 0 && length($buf) == 0;
294 0         0 $up->feed($buf);
295 0         0 while ($up->next) {
296 0         0 my $ack = $up->data;
297 0         0 my $unique_key = shift @acks;
298 0 0 0     0 if ($unique_key && ref $ack eq "HASH") {
299 0 0       0 if ($ack->{ack} ne $unique_key) {
300 0         0 die "ack is not expected: " . $ack->{ack};
301             }
302             } else {
303 0         0 unshift @{ $self->{pending_acks} }, $unique_key;
  0         0  
304 0         0 die "Can't send data. ack is not expected. $@";
305             }
306 0 0       0 last READ if @acks == 0;
307             }
308             }
309 0         0 return 1;
310             }
311              
312             sub _call_buffer_overflow_handler {
313 3     3   11 my $self = shift;
314 3 100       69 if (my $handler = $self->buffer_overflow_handler) {
315 2         19 eval {
316 2         9 $handler->($self->{pending});
317             };
318 2 50       15 if (my $error = $@) {
319 0         0 $self->_add_error("Can't call buffer overflow handler: $error");
320             }
321             }
322             }
323              
324             sub _write {
325 14     14   34 my $self = shift;
326 14         33097 my $data = shift;
327 14         141 my $length = length($data);
328 14         40 my $retry = my $written = 0;
329 14 100       642 die "Connection is not available" unless $self->socket_io;
330              
331 2     0   113 local $SIG{"PIPE"} = sub { die $! };
  0         0  
332              
333 2         13 while ($written < $length) {
334 2         58 my ($s) = $self->selector->can_write($self->timeout);
335 2 50       185 die "send write timed out" unless $s;
336 2         47 my $nwrite
337             = $s->syswrite($data, $self->write_length, $written);
338              
339 2 50       161 if (!$nwrite) {
340 0 0       0 if ($retry > $self->max_write_retry) {
341 0         0 die "failed write retry; max write retry count. $!";
342             }
343 0         0 $retry++;
344             } else {
345 2         9 $written += $nwrite;
346             }
347             }
348 2         39 $written;
349             }
350              
351             sub DEMOLISH {
352 5     5 0 7417 my $self = shift;
353 5         39 $self->close;
354             }
355              
356              
357             1;
358             __END__