| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
2
|
|
|
2
|
|
193061
|
use Object::Pad ':experimental(init_expr)'; |
|
|
2
|
|
|
|
|
18924
|
|
|
|
2
|
|
|
|
|
15
|
|
|
2
|
|
|
|
|
|
|
# ABSTRACT: A batched OpenTelemetry span processor |
|
3
|
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
package OpenTelemetry::SDK::Trace::Span::Processor::Batch; |
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.028'; |
|
7
|
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
class OpenTelemetry::SDK::Trace::Span::Processor::Batch |
|
9
|
|
|
|
|
|
|
:does(OpenTelemetry::Trace::Span::Processor) |
|
10
|
1
|
|
|
1
|
|
1224
|
{ |
|
|
1
|
|
|
|
|
2455
|
|
|
|
1
|
|
|
|
|
150
|
|
|
11
|
2
|
|
|
2
|
|
1265
|
use Feature::Compat::Defer; |
|
|
2
|
|
|
|
|
526
|
|
|
|
2
|
|
|
|
|
16
|
|
|
12
|
2
|
|
|
2
|
|
831
|
use Feature::Compat::Try; |
|
|
2
|
|
|
|
|
572
|
|
|
|
2
|
|
|
|
|
40
|
|
|
13
|
2
|
|
|
2
|
|
1057
|
use Future::AsyncAwait; |
|
|
2
|
|
|
|
|
37435
|
|
|
|
2
|
|
|
|
|
19
|
|
|
14
|
2
|
|
|
2
|
|
2876
|
use IO::Async::Function; |
|
|
2
|
|
|
|
|
240192
|
|
|
|
2
|
|
|
|
|
127
|
|
|
15
|
2
|
|
|
2
|
|
2052
|
use IO::Async::Loop; |
|
|
2
|
|
|
|
|
40043
|
|
|
|
2
|
|
|
|
|
558
|
|
|
16
|
2
|
|
|
2
|
|
715
|
use Mutex; |
|
|
2
|
|
|
|
|
495
|
|
|
|
2
|
|
|
|
|
134
|
|
|
17
|
2
|
|
|
2
|
|
517
|
use OpenTelemetry::Common qw( config timeout_timestamp maybe_timeout ); |
|
|
2
|
|
|
|
|
54874
|
|
|
|
2
|
|
|
|
|
241
|
|
|
18
|
2
|
|
|
2
|
|
17
|
use OpenTelemetry::Constants -trace_export; |
|
|
2
|
|
|
|
|
21
|
|
|
|
2
|
|
|
|
|
21
|
|
|
19
|
2
|
|
|
2
|
|
2346
|
use OpenTelemetry::X; |
|
|
2
|
|
|
|
|
7147
|
|
|
|
2
|
|
|
|
|
97
|
|
|
20
|
2
|
|
|
2
|
|
511
|
use OpenTelemetry; |
|
|
2
|
|
|
|
|
84522
|
|
|
|
2
|
|
|
|
|
29
|
|
|
21
|
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
my $logger = OpenTelemetry::Common::internal_logger; |
|
23
|
|
|
|
|
|
|
|
|
24
|
2
|
|
|
|
|
26
|
use Metrics::Any '$metrics', strict => 1, |
|
25
|
2
|
|
|
2
|
|
809
|
name_prefix => [qw( otel bsp )]; |
|
|
2
|
|
|
|
|
4
|
|
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
$metrics->make_counter( 'failure', |
|
28
|
|
|
|
|
|
|
description => 'Number of times the span processing pipeline failed irrecoverably', |
|
29
|
|
|
|
|
|
|
); |
|
30
|
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
$metrics->make_counter( 'success', |
|
32
|
|
|
|
|
|
|
description => 'Number of spans that were successfully processed', |
|
33
|
|
|
|
|
|
|
); |
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
$metrics->make_counter( 'dropped', |
|
36
|
|
|
|
|
|
|
name => [qw( spans dropped )], |
|
37
|
|
|
|
|
|
|
description => 'Number of spans that could not be processed and were dropped', |
|
38
|
|
|
|
|
|
|
labels => [qw( reason )], |
|
39
|
|
|
|
|
|
|
); |
|
40
|
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
$metrics->make_counter( 'processed', |
|
42
|
|
|
|
|
|
|
name => [qw( spans processed )], |
|
43
|
|
|
|
|
|
|
description => 'Number of spans that were successfully processed', |
|
44
|
|
|
|
|
|
|
); |
|
45
|
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
$metrics->make_gauge( 'buffer_use', |
|
47
|
|
|
|
|
|
|
name => [qw( buffer utilization )], |
|
48
|
|
|
|
|
|
|
description => 'Number of spans that could not be processed and were dropped', |
|
49
|
|
|
|
|
|
|
); |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
field $batch_size :param //= config('BSP_MAX_EXPORT_BATCH_SIZE') // 512; |
|
52
|
|
|
|
|
|
|
field $exporter_timeout :param //= config('BSP_EXPORT_TIMEOUT') // 30_000; |
|
53
|
|
|
|
|
|
|
field $max_queue_size :param //= config('BSP_MAX_QUEUE_SIZE') // 2_048; |
|
54
|
|
|
|
|
|
|
field $schedule_delay :param //= config('BSP_SCHEDULE_DELAY') // 5_000; |
|
55
|
|
|
|
|
|
|
field $exporter :param; |
|
56
|
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
field $lock = Mutex->new; |
|
58
|
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
field $done; |
|
60
|
|
|
|
|
|
|
field $function; |
|
61
|
|
|
|
|
|
|
field @queue; |
|
62
|
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
ADJUST { |
|
64
|
|
|
|
|
|
|
die OpenTelemetry::X->create( |
|
65
|
|
|
|
|
|
|
Invalid => "Exporter must implement the OpenTelemetry::Exporter interface: " . ( ref $exporter || $exporter ) |
|
66
|
|
|
|
|
|
|
) unless $exporter && $exporter->DOES('OpenTelemetry::Exporter'); |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
if ( $batch_size > $max_queue_size ) { |
|
69
|
|
|
|
|
|
|
$logger->warn( |
|
70
|
|
|
|
|
|
|
'Max export batch size cannot be greater than maximum queue size when instantiating batch processor', |
|
71
|
|
|
|
|
|
|
{ |
|
72
|
|
|
|
|
|
|
batch_size => $batch_size, |
|
73
|
|
|
|
|
|
|
queue_size => $max_queue_size, |
|
74
|
|
|
|
|
|
|
}, |
|
75
|
|
|
|
|
|
|
); |
|
76
|
|
|
|
|
|
|
$batch_size = $max_queue_size; |
|
77
|
|
|
|
|
|
|
} |
|
78
|
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
# This is a non-standard variable, so we make it Perl-specific |
|
80
|
|
|
|
|
|
|
my $max_workers = $ENV{OTEL_PERL_BSP_MAX_WORKERS}; |
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
$function = IO::Async::Function->new( |
|
83
|
|
|
|
|
|
|
$max_workers ? ( max_workers => $max_workers ) : (), |
|
84
|
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
code => sub ( $exporter, $batch, $timeout ) { |
|
86
|
|
|
|
|
|
|
$exporter->export( $batch, $timeout ); |
|
87
|
|
|
|
|
|
|
}, |
|
88
|
|
|
|
|
|
|
); |
|
89
|
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
IO::Async::Loop->new->add($function); |
|
91
|
|
|
|
|
|
|
} |
|
92
|
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
method $report_dropped_spans ( $reason, $count ) { |
|
94
|
|
|
|
|
|
|
$metrics->inc_counter_by( dropped => $count, [ reason => $reason ] ); |
|
95
|
|
|
|
|
|
|
} |
|
96
|
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
method $report_result ( $code, $count ) { |
|
98
|
|
|
|
|
|
|
if ( $code == TRACE_EXPORT_SUCCESS ) { |
|
99
|
|
|
|
|
|
|
$metrics->inc_counter('success'); |
|
100
|
|
|
|
|
|
|
$metrics->inc_counter_by( processed => $count ); |
|
101
|
|
|
|
|
|
|
return; |
|
102
|
|
|
|
|
|
|
} |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
OpenTelemetry->handle_error( |
|
105
|
|
|
|
|
|
|
exception => sprintf( |
|
106
|
|
|
|
|
|
|
'Unable to export %s span%s', $count, $count ? 's' : '' |
|
107
|
|
|
|
|
|
|
), |
|
108
|
|
|
|
|
|
|
); |
|
109
|
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
$metrics->inc_counter('failure'); |
|
111
|
|
|
|
|
|
|
$self->$report_dropped_spans( 'export-failure' => $count ); |
|
112
|
|
|
|
|
|
|
} |
|
113
|
|
|
|
|
|
|
|
|
114
|
5
|
|
|
5
|
1
|
25937
|
method process ( @items ) { |
|
|
5
|
|
|
|
|
82
|
|
|
|
5
|
|
|
|
|
25
|
|
|
|
5
|
|
|
|
|
7
|
|
|
115
|
|
|
|
|
|
|
my $batch = $lock->enter( |
|
116
|
|
|
|
|
|
|
sub { |
|
117
|
5
|
|
|
5
|
|
474
|
my $overflow = @queue + @items- $max_queue_size; |
|
118
|
5
|
50
|
|
|
|
43
|
if ( $overflow > 0 ) { |
|
119
|
|
|
|
|
|
|
# If the buffer is full, we drop old spans first |
|
120
|
|
|
|
|
|
|
# The queue is always FIFO, even for dropped spans |
|
121
|
|
|
|
|
|
|
# This behaviour is not in the spec, but is |
|
122
|
|
|
|
|
|
|
# consistent with the Ruby implementation. |
|
123
|
|
|
|
|
|
|
# For context, the Go implementation instead |
|
124
|
|
|
|
|
|
|
# blocks until there is room in the buffer. |
|
125
|
0
|
|
|
|
|
0
|
splice @queue, 0, $overflow; |
|
126
|
0
|
|
|
|
|
0
|
$self->$report_dropped_spans( |
|
127
|
|
|
|
|
|
|
'buffer-full' => $overflow, |
|
128
|
|
|
|
|
|
|
); |
|
129
|
|
|
|
|
|
|
} |
|
130
|
|
|
|
|
|
|
|
|
131
|
5
|
|
|
|
|
17
|
push @queue, @items; |
|
132
|
|
|
|
|
|
|
|
|
133
|
5
|
100
|
|
|
|
47
|
return [] if @queue < $batch_size; |
|
134
|
|
|
|
|
|
|
|
|
135
|
1
|
50
|
|
|
|
37
|
$metrics->set_gauge_to( |
|
136
|
|
|
|
|
|
|
buffer_use => @queue / $max_queue_size, |
|
137
|
|
|
|
|
|
|
) if @queue; |
|
138
|
|
|
|
|
|
|
|
|
139
|
1
|
|
|
|
|
1203
|
[ splice @queue, 0, $batch_size ]; |
|
140
|
|
|
|
|
|
|
} |
|
141
|
5
|
|
|
|
|
441
|
); |
|
142
|
|
|
|
|
|
|
|
|
143
|
5
|
100
|
|
|
|
328
|
return unless @$batch; |
|
144
|
|
|
|
|
|
|
|
|
145
|
1
|
|
|
|
|
21
|
$function->call( |
|
146
|
|
|
|
|
|
|
args => [ $exporter, $batch, $exporter_timeout ], |
|
147
|
1
|
|
|
1
|
|
3399
|
on_result => sub ( $type, $result ) { |
|
|
1
|
|
|
|
|
15
|
|
|
|
1
|
|
|
|
|
11
|
|
|
148
|
1
|
|
|
|
|
4
|
my $count = @$batch; |
|
149
|
1
|
50
|
|
|
|
6
|
return $self->$report_result( TRACE_EXPORT_FAILURE, $count ) |
|
150
|
|
|
|
|
|
|
unless $type eq 'return'; |
|
151
|
|
|
|
|
|
|
|
|
152
|
1
|
|
|
|
|
6
|
$self->$report_result( $result, $count ); |
|
153
|
|
|
|
|
|
|
}, |
|
154
|
1
|
|
|
|
|
47
|
); |
|
155
|
|
|
|
|
|
|
|
|
156
|
1
|
|
|
|
|
4508
|
return; |
|
157
|
|
|
|
|
|
|
} |
|
158
|
|
|
|
|
|
|
|
|
159
|
2
|
|
|
2
|
1
|
65028
|
method on_start ( $span, $context ) { } |
|
|
2
|
|
|
|
|
43
|
|
|
|
2
|
|
|
|
|
6
|
|
|
|
2
|
|
|
|
|
86
|
|
|
160
|
|
|
|
|
|
|
|
|
161
|
10
|
|
|
10
|
1
|
152936
|
method on_end ($span) { |
|
|
10
|
|
|
|
|
228
|
|
|
|
10
|
|
|
|
|
121
|
|
|
|
10
|
|
|
|
|
42
|
|
|
162
|
10
|
|
|
|
|
113
|
try { |
|
163
|
10
|
100
|
|
|
|
107
|
return if $done; |
|
164
|
9
|
100
|
|
|
|
109
|
return unless $span->context->trace_flags->sampled; |
|
165
|
5
|
|
|
|
|
27440
|
$self->process( $span->snapshot ); |
|
166
|
|
|
|
|
|
|
} |
|
167
|
|
|
|
|
|
|
catch ($e) { |
|
168
|
2
|
|
|
|
|
122
|
OpenTelemetry->handle_error( |
|
169
|
|
|
|
|
|
|
exception => $e, |
|
170
|
|
|
|
|
|
|
message => 'unexpected error in ' . ref($self) . '->on_end', |
|
171
|
|
|
|
|
|
|
); |
|
172
|
|
|
|
|
|
|
} |
|
173
|
|
|
|
|
|
|
} |
|
174
|
|
|
|
|
|
|
|
|
175
|
6
|
|
|
6
|
1
|
46977
|
async method shutdown ( $timeout = undef ) { |
|
|
6
|
|
|
|
|
104
|
|
|
|
6
|
|
|
|
|
119
|
|
|
|
6
|
|
|
|
|
17
|
|
|
|
6
|
|
|
|
|
13
|
|
|
176
|
6
|
100
|
|
|
|
69
|
return TRACE_EXPORT_SUCCESS if $done; |
|
177
|
|
|
|
|
|
|
|
|
178
|
5
|
|
|
|
|
11
|
$done = 1; |
|
179
|
|
|
|
|
|
|
|
|
180
|
5
|
|
|
|
|
142
|
my $start = timeout_timestamp; |
|
181
|
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
# TODO: The Ruby implementation ignores whether the force_flush |
|
183
|
|
|
|
|
|
|
# times out. Is this correct? |
|
184
|
5
|
|
|
|
|
344
|
await $self->force_flush( maybe_timeout $timeout, $start ); |
|
185
|
|
|
|
|
|
|
|
|
186
|
5
|
50
|
|
|
|
737
|
$self->$report_dropped_spans( terminating => scalar @queue ) |
|
187
|
|
|
|
|
|
|
if @queue; |
|
188
|
|
|
|
|
|
|
|
|
189
|
5
|
|
|
|
|
36
|
@queue = (); |
|
190
|
|
|
|
|
|
|
|
|
191
|
5
|
50
|
|
|
|
50
|
$function->stop->get if $function->workers; |
|
192
|
|
|
|
|
|
|
|
|
193
|
5
|
|
|
|
|
61272
|
await $exporter->shutdown( maybe_timeout $timeout, $start ); |
|
194
|
|
|
|
|
|
|
} |
|
195
|
|
|
|
|
|
|
|
|
196
|
7
|
|
|
7
|
1
|
5261
|
async method force_flush ( $timeout = undef ) { |
|
|
7
|
|
|
|
|
21
|
|
|
|
7
|
|
|
|
|
97
|
|
|
|
7
|
|
|
|
|
42
|
|
|
|
7
|
|
|
|
|
35
|
|
|
197
|
7
|
100
|
|
|
|
377
|
return TRACE_EXPORT_SUCCESS if $done; |
|
198
|
|
|
|
|
|
|
|
|
199
|
1
|
|
|
|
|
62
|
my $start = timeout_timestamp; |
|
200
|
|
|
|
|
|
|
|
|
201
|
1
|
|
|
1
|
|
78
|
my @stack = $lock->enter( sub { splice @queue, 0, @queue } ); |
|
|
1
|
|
|
|
|
58
|
|
|
202
|
|
|
|
|
|
|
|
|
203
|
1
|
|
|
|
|
40
|
defer { |
|
204
|
|
|
|
|
|
|
# If we still have any spans left it has to be because we |
|
205
|
|
|
|
|
|
|
# timed out and couldn't export them. In that case, we drop |
|
206
|
|
|
|
|
|
|
# them and report |
|
207
|
1
|
50
|
|
|
|
560
|
$self->$report_dropped_spans( 'force-flush' => scalar @stack ) |
|
208
|
|
|
|
|
|
|
if @stack; |
|
209
|
|
|
|
|
|
|
} |
|
210
|
|
|
|
|
|
|
|
|
211
|
1
|
|
|
|
|
47
|
while ( @stack ) { |
|
212
|
1
|
|
|
|
|
23
|
my $remaining = maybe_timeout $timeout, $start; |
|
213
|
1
|
50
|
33
|
|
|
62
|
return TRACE_EXPORT_TIMEOUT if $timeout and !$remaining; |
|
214
|
|
|
|
|
|
|
|
|
215
|
1
|
|
|
|
|
5
|
my $batch = [ splice @stack, 0, $batch_size ]; |
|
216
|
1
|
|
|
|
|
3
|
my $count = @$batch; |
|
217
|
|
|
|
|
|
|
|
|
218
|
1
|
|
|
|
|
4
|
try { |
|
219
|
1
|
|
|
|
|
46
|
my $result = await $function->call( |
|
220
|
|
|
|
|
|
|
args => [ $exporter, $batch, $remaining ], |
|
221
|
|
|
|
|
|
|
); |
|
222
|
|
|
|
|
|
|
|
|
223
|
1
|
|
|
|
|
9393
|
$self->$report_result( $result, $count ); |
|
224
|
|
|
|
|
|
|
|
|
225
|
1
|
50
|
|
|
|
11
|
return $result unless $result == TRACE_EXPORT_SUCCESS; |
|
226
|
|
|
|
|
|
|
} |
|
227
|
|
|
|
|
|
|
catch ($e) { |
|
228
|
0
|
|
|
|
|
0
|
return $self->$report_result( TRACE_EXPORT_FAILURE, $count ); |
|
229
|
|
|
|
|
|
|
} |
|
230
|
|
|
|
|
|
|
} |
|
231
|
|
|
|
|
|
|
|
|
232
|
1
|
|
|
|
|
26
|
await $exporter->force_flush( maybe_timeout $timeout, $start ); |
|
233
|
|
|
|
|
|
|
} |
|
234
|
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
method DESTROY { |
|
236
|
|
|
|
|
|
|
try { $function->stop->get } |
|
237
|
|
|
|
|
|
|
catch ($e) { } |
|
238
|
|
|
|
|
|
|
} |
|
239
|
|
|
|
|
|
|
} |