File Coverage

blib/lib/OpenTelemetry/SDK/Trace/Span/Processor/Batch.pm
Criterion Covered Total %
statement 107 110 97.2
branch 20 28 71.4
condition 1 3 33.3
subroutine 21 21 100.0
pod 5 5 100.0
total 154 167 92.2


line stmt bran cond sub pod time code
1 2     2   193061 use Object::Pad ':experimental(init_expr)';
  2         18924  
  2         15  
2             # ABSTRACT: A batched OpenTelemetry span processor
3              
4             package OpenTelemetry::SDK::Trace::Span::Processor::Batch;
5              
6             our $VERSION = '0.028';
7              
8             class OpenTelemetry::SDK::Trace::Span::Processor::Batch
9             :does(OpenTelemetry::Trace::Span::Processor)
10 1     1   1224 {
  1         2455  
  1         150  
11 2     2   1265 use Feature::Compat::Defer;
  2         526  
  2         16  
12 2     2   831 use Feature::Compat::Try;
  2         572  
  2         40  
13 2     2   1057 use Future::AsyncAwait;
  2         37435  
  2         19  
14 2     2   2876 use IO::Async::Function;
  2         240192  
  2         127  
15 2     2   2052 use IO::Async::Loop;
  2         40043  
  2         558  
16 2     2   715 use Mutex;
  2         495  
  2         134  
17 2     2   517 use OpenTelemetry::Common qw( config timeout_timestamp maybe_timeout );
  2         54874  
  2         241  
18 2     2   17 use OpenTelemetry::Constants -trace_export;
  2         21  
  2         21  
19 2     2   2346 use OpenTelemetry::X;
  2         7147  
  2         97  
20 2     2   511 use OpenTelemetry;
  2         84522  
  2         29  
21              
22             my $logger = OpenTelemetry::Common::internal_logger;
23              
24 2         26 use Metrics::Any '$metrics', strict => 1,
25 2     2   809 name_prefix => [qw( otel bsp )];
  2         4  
26              
27             $metrics->make_counter( 'failure',
               # Incremented once for every export attempt that does not
               # report TRACE_EXPORT_SUCCESS.
28             description => 'Number of times the span processing pipeline failed irrecoverably',
29             );
30              
               # Incremented once per successful export call, not once per span;
               # the per-span total is tracked by 'processed'.
31             $metrics->make_counter( 'success',
32             description => 'Number of times a batch of spans was successfully exported',
33             );
34              
               # Incremented by the number of spans discarded; the 'reason' label
               # records why they were discarded.
35             $metrics->make_counter( 'dropped',
36             name => [qw( spans dropped )],
37             description => 'Number of spans that could not be processed and were dropped',
38             labels => [qw( reason )],
39             );
40              
               # Incremented by the size of every batch that exported successfully.
41             $metrics->make_counter( 'processed',
42             name => [qw( spans processed )],
43             description => 'Number of spans that were successfully processed',
44             );
45              
               # Set to (queued spans / maximum queue size) when a batch is taken
               # from the queue, so the value is a ratio rather than a span count.
46             $metrics->make_gauge( 'buffer_use',
47             name => [qw( buffer utilization )],
48             description => 'Fraction of the span queue capacity that is currently in use',
49             );
50              
51             field $batch_size :param //= config('BSP_MAX_EXPORT_BATCH_SIZE') // 512;
               # The four tunables in this group resolve in the same order: a
               # defined constructor parameter, then the config() lookup, then the
               # literal default.
               #
               # $batch_size is both the queue length that triggers an export and
               # the maximum number of spans taken from the queue per export.
               #
               # $exporter_timeout is handed to the exporter as its timeout when a
               # batch is exported from process().
               # NOTE(review): presumably milliseconds, as in the OpenTelemetry
               # batch span processor settings -- confirm against config().
52             field $exporter_timeout :param //= config('BSP_EXPORT_TIMEOUT') // 30_000;
               # $max_queue_size bounds how many spans are buffered; process()
               # drops the oldest queued spans when an addition would exceed it.
53             field $max_queue_size :param //= config('BSP_MAX_QUEUE_SIZE') // 2_048;
               # NOTE(review): $schedule_delay is not read anywhere else in this
               # listing -- confirm whether a timed flush is still to be implemented.
54             field $schedule_delay :param //= config('BSP_SCHEDULE_DELAY') // 5_000;
               # Required constructor parameter; validated in ADJUST.
55             field $exporter :param;
56              
               # Guards reads and writes of @queue in process() and force_flush().
57             field $lock = Mutex->new;
58              
               # Set to 1 by shutdown(); checked by on_end, shutdown and force_flush.
59             field $done;
               # The IO::Async::Function created in ADJUST that runs the exports.
60             field $function;
               # Spans waiting to be exported, oldest first.
61             field @queue;
62              
63             ADJUST {
               # Construction-time setup: validate the exporter, cap the batch size
               # at the queue size, and create the function object that will run
               # the exports.
               #
               # Dies with the object returned by
               # OpenTelemetry::X->create( Invalid => ... ) when $exporter is false
               # or does not DOES('OpenTelemetry::Exporter').
64             die OpenTelemetry::X->create(
65             Invalid => "Exporter must implement the OpenTelemetry::Exporter interface: " . ( ref $exporter || $exporter )
66             ) unless $exporter && $exporter->DOES('OpenTelemetry::Exporter');
67              
               # An oversized batch size is not fatal: it is logged and reduced to
               # the queue size.
68             if ( $batch_size > $max_queue_size ) {
69             $logger->warn(
70             'Max export batch size cannot be greater than maximum queue size when instantiating batch processor',
71             {
72             batch_size => $batch_size,
73             queue_size => $max_queue_size,
74             },
75             );
76             $batch_size = $max_queue_size;
77             }
78              
79             # This is a non-standard variable, so we make it Perl-specific
80             my $max_workers = $ENV{OTEL_PERL_BSP_MAX_WORKERS};
81              
82             $function = IO::Async::Function->new(
               # max_workers is only passed when the environment variable holds a
               # true value; otherwise the option is omitted entirely.
83             $max_workers ? ( max_workers => $max_workers ) : (),
84              
               # The exporter arrives as a call argument (it shadows the field of
               # the same name inside this sub) instead of being captured.
85             code => sub ( $exporter, $batch, $timeout ) {
86             $exporter->export( $batch, $timeout );
87             },
88             );
89              
               # Register the function with the loop returned by IO::Async::Loop->new.
90             IO::Async::Loop->new->add($function);
91             }
92              
93             method $report_dropped_spans ( $reason, $count ) {
               # Private helper: add $count to the 'dropped' counter, labelled with
               # the given $reason (e.g. 'buffer-full', 'export-failure').
94             $metrics->inc_counter_by( dropped => $count, [ reason => $reason ] );
95             }
96              
97             method $report_result ( $code, $count ) {
               # Private helper: record the outcome of exporting $count spans.
               #
               #   $code  - result code of the export, compared numerically
               #            against TRACE_EXPORT_SUCCESS
               #   $count - number of spans in the exported batch
               #
               # On success the 'success' counter goes up by one and 'processed'
               # by $count, and the method returns an empty list. Any other code
               # is reported through OpenTelemetry->handle_error, counted as a
               # 'failure', and the spans are counted as dropped with the reason
               # 'export-failure'.
98             if ( $code == TRACE_EXPORT_SUCCESS ) {
99             $metrics->inc_counter('success');
100             $metrics->inc_counter_by( processed => $count );
101             return;
102             }
103              
104             OpenTelemetry->handle_error(
105             exception => sprintf(
               # Only a count of exactly one takes the singular form
               # ("1 span"); zero and every other count are plural.
106             'Unable to export %s span%s', $count, $count == 1 ? '' : 's'
107             ),
108             );
109              
110             $metrics->inc_counter('failure');
111             $self->$report_dropped_spans( 'export-failure' => $count );
112             }
113              
114 5     5 1 25937 method process ( @items ) {
  5         82  
  5         25  
  5         7  
               # Append @items to the queue and, once the queue holds at least
               # $batch_size spans, hand one batch to the worker function.
               #
               # Always returns an empty list. The export itself is not awaited
               # here: its outcome is recorded from the on_result callback.
               #
               # All queue manipulation happens inside $lock->enter; the value of
               # the sub becomes $batch.
115             my $batch = $lock->enter(
116             sub {
               # Number of spans by which the queue would exceed its limit
               # once @items are added.
               # NOTE(review): when @items alone is larger than
               # $max_queue_size, $overflow exceeds the current queue length,
               # so splice removes fewer spans than are reported as dropped
               # and the queue still ends up above the limit. on_end only
               # ever passes one span, so this may be unreachable in
               # practice -- confirm for other callers.
117 5     5   474 my $overflow = @queue + @items- $max_queue_size;
118 5 50       43 if ( $overflow > 0 ) {
119             # If the buffer is full, we drop old spans first
120             # The queue is always FIFO, even for dropped spans
121             # This behaviour is not in the spec, but is
122             # consistent with the Ruby implementation.
123             # For context, the Go implementation instead
124             # blocks until there is room in the buffer.
125 0         0 splice @queue, 0, $overflow;
126 0         0 $self->$report_dropped_spans(
127             'buffer-full' => $overflow,
128             );
129             }
130              
131 5         17 push @queue, @items;
132              
               # Not enough spans for a batch yet: an empty batch tells the
               # caller below that there is nothing to export.
133 5 100       47 return [] if @queue < $batch_size;
134              
               # The gauge is only refreshed on calls that go on to take a
               # batch, and it reflects the queue before the batch is removed.
135 1 50       37 $metrics->set_gauge_to(
136             buffer_use => @queue / $max_queue_size,
137             ) if @queue;
138              
               # Remove the oldest $batch_size spans and return them.
139 1         1203 [ splice @queue, 0, $batch_size ];
140             }
141 5         441 );
142              
143 5 100       328 return unless @$batch;
144              
145 1         21 $function->call(
146             args => [ $exporter, $batch, $exporter_timeout ],
147 1     1   3399 on_result => sub ( $type, $result ) {
  1         15  
  1         11  
148 1         4 my $count = @$batch;
               # Any result type other than 'return' is recorded as a failed
               # export of the whole batch.
149 1 50       6 return $self->$report_result( TRACE_EXPORT_FAILURE, $count )
150             unless $type eq 'return';
151              
152 1         6 $self->$report_result( $result, $count );
153             },
154 1         47 );
155              
156 1         4508 return;
157             }
158              
159 2     2 1 65028 method on_start ( $span, $context ) { } # Empty body: this processor does nothing when a span starts.
  2         43  
  2         6  
  2         86  
160              
161 10     10 1 152936 method on_end ($span) {
  10         228  
  10         121  
  10         42  
               # Called with a span that has ended. The span is ignored once the
               # processor has been shut down ($done) or when its trace flags are
               # not sampled; otherwise a snapshot of it is passed to process().
               #
               # Exceptions raised along the way are not propagated: they are
               # handed to OpenTelemetry->handle_error together with a message
               # naming this class and method.
162 10         113 try {
163 10 100       107 return if $done;
164 9 100       109 return unless $span->context->trace_flags->sampled;
165 5         27440 $self->process( $span->snapshot );
166             }
167             catch ($e) {
168 2         122 OpenTelemetry->handle_error(
169             exception => $e,
170             message => 'unexpected error in ' . ref($self) . '->on_end',
171             );
172             }
173             }
174              
175 6     6 1 46977 async method shutdown ( $timeout = undef ) {
  6         104  
  6         119  
  6         17  
  6         13  
               # Shut the processor down: mark it as done, discard whatever is
               # still queued, stop the worker function and shut down the exporter.
               #
               #   $timeout - optional; combined with the start timestamp through
               #              maybe_timeout before being passed on.
               #              NOTE(review): presumably the remaining time budget
               #              -- confirm against OpenTelemetry::Common.
               #
               # Resolves to TRACE_EXPORT_SUCCESS when already shut down, otherwise
               # to whatever $exporter->shutdown resolves to.
176 6 100       69 return TRACE_EXPORT_SUCCESS if $done;
177              
               # NOTE(review): $done is set before force_flush is called, and
               # force_flush returns immediately when $done is true. As written,
               # the call below therefore cannot export queued spans; they are
               # reported as dropped ('terminating') further down. Confirm whether
               # a final flush is intended here.
178 5         11 $done = 1;
179              
180 5         142 my $start = timeout_timestamp;
181              
182             # TODO: The Ruby implementation ignores whether the force_flush
183             # times out. Is this correct?
184 5         344 await $self->force_flush( maybe_timeout $timeout, $start );
185              
186 5 50       737 $self->$report_dropped_spans( terminating => scalar @queue )
187             if @queue;
188              
189 5         36 @queue = ();
190              
               # Only stop the function when it reports having workers; ->get
               # waits for the stop to complete before continuing.
191 5 50       50 $function->stop->get if $function->workers;
192              
193 5         61272 await $exporter->shutdown( maybe_timeout $timeout, $start );
194             }
195              
196 7     7 1 5261 async method force_flush ( $timeout = undef ) {
  7         21  
  7         97  
  7         42  
  7         35  
               # Export everything currently queued, in batches of at most
               # $batch_size, then ask the exporter itself to flush.
               #
               #   $timeout - optional; when true, the flush stops as soon as
               #              maybe_timeout reports no time remaining.
               #
               # Resolves to:
               #   TRACE_EXPORT_SUCCESS  immediately, once the processor is done
               #   TRACE_EXPORT_TIMEOUT  when a timeout was given and has run out
               #   the export result     for the first batch that is not a success
               #   TRACE_EXPORT_FAILURE  when an export call throws
               #   otherwise whatever $exporter->force_flush resolves to
197 7 100       377 return TRACE_EXPORT_SUCCESS if $done;
198              
199 1         62 my $start = timeout_timestamp;
200              
               # Take the whole queue in one step under the lock; from here on
               # the spans live only in @stack.
201 1     1   78 my @stack = $lock->enter( sub { splice @queue, 0, @queue } );
  1         58  
202              
203 1         40 defer {
204             # If we still have any spans left it has to be because we
205             # timed out and couldn't export them. In that case, we drop
206             # them and report
207 1 50       560 $self->$report_dropped_spans( 'force-flush' => scalar @stack )
208             if @stack;
209             }
210              
211 1         47 while ( @stack ) {
212 1         23 my $remaining = maybe_timeout $timeout, $start;
213 1 50 33     62 return TRACE_EXPORT_TIMEOUT if $timeout and !$remaining;
214              
215 1         5 my $batch = [ splice @stack, 0, $batch_size ];
216 1         3 my $count = @$batch;
217              
218 1         4 try {
219 1         46 my $result = await $function->call(
220             args => [ $exporter, $batch, $remaining ],
221             );
222              
223 1         9393 $self->$report_result( $result, $count );
224              
225 1 50       11 return $result unless $result == TRACE_EXPORT_SUCCESS;
226             }
227             catch ($e) {
               # Record the failure, then return the failure code explicitly:
               # the helper's own return value is not an export result code.
228 0         0 $self->$report_result( TRACE_EXPORT_FAILURE, $count ); return TRACE_EXPORT_FAILURE;
229             }
230             }
231              
232 1         26 await $exporter->force_flush( maybe_timeout $timeout, $start );
233             }
234              
235             method DESTROY {
               # On destruction, stop the worker function and wait for the stop to
               # complete. Any exception raised while doing so is caught and
               # ignored.
236             try { $function->stop->get }
237             catch ($e) { }
238             }
239             }