File Coverage

blib/lib/OpenTelemetry/Exporter/OTLP.pm
Criterion Covered Total %
statement 64 79 81.0
branch 2 4 50.0
condition n/a
subroutine 18 20 90.0
pod 1 3 33.3
total 85 106 80.1


line stmt bran cond sub pod time code
1 2     2   12 use Object::Pad ':experimental(init_expr)';
  2         7  
  2         11  
2             # ABSTRACT: An OpenTelemetry Protocol span exporter
3              
4             package OpenTelemetry::Exporter::OTLP;
5              
6             our $VERSION = '0.021';
7              
8 2     2   1430 class OpenTelemetry::Exporter::OTLP :does(OpenTelemetry::Exporter) {
  2         1276  
  2         182  
9 2     2   1091 use Feature::Compat::Try;
  2         927  
  2         8  
10 2     2   1137 use Future::AsyncAwait;
  2         42909  
  2         12  
11 2     2   1593 use HTTP::Tiny;
  2         125704  
  2         91  
12 2     2   907 use Module::Runtime 'require_module';
  2         3675  
  2         16  
13 2     2   859 use OpenTelemetry::Common qw( config maybe_timeout timeout_timestamp );
  2         66012  
  2         234  
14 2     2   18 use OpenTelemetry::Constants -trace_export;
  2         4  
  2         14  
15 2     2   2728 use OpenTelemetry::Context;
  2         47689  
  2         148  
16 2     2   1127 use OpenTelemetry::Trace;
  2         20557  
  2         93  
17 2     2   38 use OpenTelemetry::X;
  2         3  
  2         93  
18 2     2   1160 use Syntax::Keyword::Dynamically;
  2         2524  
  2         25  
19 2     2   1177 use Syntax::Keyword::Match;
  2         7600  
  2         13  
20 2     2   210 use Time::HiRes 'sleep';
  2         5  
  2         17  
21 2     2   1445 use Time::Piece;
  2         32922  
  2         8  
22 2     2   935 use URL::Encode 'url_decode';
  2         8551  
  2         353  
23              
24             my $CAN_USE_PROTOBUF = eval {
25             require Google::ProtocolBuffers::Dynamic;
26             1;
27             };
28              
29             my $PROTOCOL = $CAN_USE_PROTOBUF ? 'http/protobuf' : 'http/json';
30              
31             my $COMPRESSION = eval {
32             require Compress::Zlib;
33             'gzip';
34             } // 'none';
35              
36             my $logger = OpenTelemetry::Common::internal_logger;
37              
38 2         12 use Metrics::Any '$metrics', strict => 1,
39 2     2   744 name_prefix => [qw( otel exporter otlp )];
  2         10207  
40              
41             $metrics->make_counter( 'success',
42             name => [qw( success )],
43             description => 'Number of times the export process succeeded',
44             );
45              
46             $metrics->make_counter( 'failure',
47             name => [qw( failure )],
48             description => 'Number of times the export process failed',
49             labels => [qw( reason )],
50             );
51              
52             $metrics->make_distribution( 'uncompressed',
53             name => [qw( message uncompressed size )],
54             description => 'Size of exporter payload before compression',
55             units => 'bytes',
56             );
57              
58             $metrics->make_distribution( 'compressed',
59             name => [qw( message compressed size )],
60             description => 'Size of exporter payload after compression',
61             units => 'bytes',
62             );
63              
64             $metrics->make_timer( 'request',
65             name => [qw( request duration )],
66             description => 'Duration of the export request',
67             labels => [qw( status )],
68             );
69              
70             field $stopped;
71             field $ua;
72             field $endpoint;
73             field $compression;
74             field $encoder;
75             field $retries;
76              
77             ADJUSTPARAMS ($params) {
78             $endpoint = delete $params->{endpoint}
79             // config('EXPORTER_OTLP_TRACES_ENDPOINT')
80             // do {
81             my $base = config('EXPORTER_OTLP_ENDPOINT')
82             // 'http://localhost:4318';
83              
84             ( $base =~ s|/+$||r ) . '/v1/traces';
85             };
86              
87             $compression = delete $params->{compression}
88             // config()
89             // $COMPRESSION;
90              
91             $retries = delete $params->{retries} // 5;
92              
93             my $timeout = delete $params->{timeout}
94             // config()
95             // 10;
96              
97             my $headers = delete $params->{headers}
98             // config()
99             // {};
100              
101             $headers = {
102             map {
103             my ( $k, $v ) = map url_decode($_), split '=', $_, 2;
104             $k =~ s/^\s+|\s+$//g;
105             $v =~ s/^\s+|\s+$//g;
106             $k => $v;
107             } split ',', $headers
108             } unless ref $headers;
109              
110             die OpenTelemetry::X->create(
111             Invalid => "invalid URL for OTLP exporter: $endpoint"
112             ) unless "$endpoint" =~ m|^https?://|;
113              
114             die OpenTelemetry::X->create(
115             Unsupported => "unsupported compression key for OTLP exporter: $compression"
116             ) unless $compression =~ /^(?:gzip|none)$/;
117              
118             $headers->{'Content-Encoding'} = $compression unless $compression eq 'none';
119              
120             $encoder = do {
121             my $protocol = delete $params->{protocol}
122             // config('EXPORTER_OTLP_PROTOCOL')
123             // $PROTOCOL;
124              
125             die OpenTelemetry::X->create(
126             Unsupported => "unsupported protocol for OTLP exporter: $protocol",
127             ) unless $protocol =~ /^http\/(protobuf|json)$/;
128              
129             my $class = 'OpenTelemetry::Exporter::OTLP::Encoder::';
130             $class .= 'Protobuf' if $1 eq 'protobuf';
131             $class .= 'JSON' if $1 eq 'json';
132              
133             try {
134             require_module $class;
135             $class->new;
136             }
137             catch ($e) {
138             $logger->warn(
139             'Could not load OTLP encoder class. Defaulting to JSON',
140             { class => $class, error => $e },
141             );
142              
143             require OpenTelemetry::Exporter::OTLP::Encoder::JSON;
144             OpenTelemetry::Exporter::OTLP::Encoder::JSON->new;
145             }
146             };
147              
148             my %ssl_options;
149             {
150             my $ca = delete $params->{certificate} // config(qw(
151             EXPORTER_OTLP_TRACES_CERTIFICATE
152             EXPORTER_OTLP_CERTIFICATE
153             ));
154              
155             my $key = delete $params->{client_key} // config(qw(
156             EXPORTER_OTLP_TRACES_CLIENT_KEY
157             EXPORTER_OTLP_CLIENT_KEY
158             ));
159              
160             my $cert = delete $params->{client_certificate} // config(qw(
161             EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE
162             EXPORTER_OTLP_CLIENT_CERTIFICATE
163             ));
164              
165             $ssl_options{SSL_ca_file} = $ca if $ca;
166             $ssl_options{SSL_key_file} = $key if $key;
167             $ssl_options{SSL_cert_file} = $cert if $cert;
168             };
169              
170             $ua = HTTP::Tiny->new(
171             timeout => $timeout,
172             agent => "OTel-OTLP-Exporter-Perl/$VERSION",
173             default_headers => {
174             %$headers,
175             'Content-Type' => $encoder->content_type,
176             },
177             %ssl_options ? ( SSL_options => \%ssl_options ) : (),
178             );
179             }
180              
181             method $maybe_backoff ( $attempts, $after = undef ) {
182             $after //= 0; # Breaks tests in Perls under 5.38 if in signature
183              
184             return if $attempts > $retries;
185              
186             my $sleep;
187             try {
188             my $date = Time::Piece->strptime($after, '%a, %d %b %Y %T %Z');
189             $sleep = ( $date - localtime )->seconds;
190             }
191             catch($e) {
192             die $e unless $e =~ /^Error parsing time/;
193             $sleep = $after if $after > 0;
194             }
195             $sleep //= int rand 2 ** $attempts;
196              
197             sleep $sleep + rand;
198              
199             return 1;
200             }
201              
202             method $send_request ( $data, $timeout ) {
203             my %request = ( content => $data );
204              
205             $metrics->report_distribution(
206             uncompressed => length $request{content},
207             );
208              
209             if ( $compression eq 'gzip' ) {
210             require Compress::Zlib;
211             $request{content} = Compress::Zlib::memGzip($request{content});
212              
213             unless ($request{content}) {
214             OpenTelemetry->handle_error(
215             message => "Error compressing data: $Compress::Zlib::gzerrno"
216             );
217              
218             $metrics->inc_counter(
219             failure => [ reason => 'zlib_error' ],
220             );
221              
222             return TRACE_EXPORT_FAILURE;
223             }
224              
225             $metrics->report_distribution(
226             compressed => length $request{content},
227             );
228             }
229              
230             my $start = timeout_timestamp;
231             my $attempts = 0;
232             while (1) {
233             my $remaining = maybe_timeout $timeout, $start;
234             return TRACE_EXPORT_TIMEOUT if $timeout && !$remaining;
235              
236             # We are changing the state of the user-agent here
237             # There doesn't seem to be another way to do this.
238             # As long as this exporter is running with the Batch
239             # processor, it should only be processing one request
240             # at a time, so this should not be a problem.
241             $ua->timeout($remaining);
242              
243             my $request_start = timeout_timestamp;
244             my $res = $ua->post( $endpoint, \%request );
245             my $request_end = timeout_timestamp;
246              
247             $metrics->report_timer(
248             request => $request_end - $request_start,
249             [ status => $res->{status} ],
250             );
251              
252             if ( $res->{success} ) {
253             $metrics->inc_counter('success');
254             return TRACE_EXPORT_SUCCESS;
255             }
256              
257             match ( $res->{status} : =~ ) {
258             case( m/^ 599 $/x ) {
259             my $reason = do {
260             match ( $res->{content} : =~ ) {
261             case(m/^Timed out/) { 'timeout' }
262             case(m/^Could not connect /) { 'socket_error' }
263             case(m/^Could not .* socket /) { 'socket_error' }
264             case(m/^Socket closed /) { 'socket_error' }
265             case(m/^Wide character in write/) { 'socket_error' }
266             case(m/^Error halting .* SSL /) { 'ssl_error' }
267             case(m/^SSL connection failed /) { 'ssl_error' }
268             case(m/^Unexpected end of stream/) { 'eof_error' }
269             case(m/^Cannot parse/) { 'parse_error' }
270             default {
271             $metrics->inc_counter(
272             failure => [ reason => $res->{status} ],
273             );
274              
275             OpenTelemetry->handle_error(
276             exception => $res->{content},
277             message => 'Unhandled error sending OTLP request',
278             );
279              
280             return TRACE_EXPORT_FAILURE;
281             }
282             }
283             };
284              
285             $metrics->inc_counter( failure => [ reason => $reason ] );
286              
287             redo if $self->$maybe_backoff( ++$attempts );
288             }
289             case( m/^(?: 4 | 5 ) \d{2} $/ax ) {
290             my $code = $res->{status};
291              
292             $metrics->inc_counter( failure => [ reason => $code ] );
293              
294             if ( $CAN_USE_PROTOBUF ) {
295             try {
296             require OpenTelemetry::Proto;
297              
298             my $status = OTel::Google::RPC::Status
299             ->decode($res->{content});
300              
301             OpenTelemetry->handle_error(
302             exception => $status->encode_json,
303             message => 'OTLP exporter received an RPC error status',
304             );
305             }
306             catch($e) {
307             OpenTelemetry->handle_error(
308             exception => $e,
309             message => 'Unexpected error decoding RPC status in OTLP exporter',
310             );
311             }
312             }
313              
314             my $after = ( $code == 429 || $code == 503 )
315             ? $res->{headers}{'retry-after'}
316             : undef;
317              
318             # As-per https://opentelemetry.io/docs/specs/otlp/#failures-1
319             redo if ( $code == 429
320             || $code == 502
321             || $code == 503
322             || $code == 504
323             ) && $self->$maybe_backoff( ++$attempts, $after );
324             }
325             }
326              
327             return TRACE_EXPORT_FAILURE;
328             }
329             }
330              
331 4     4 1 4525 method export ( $data, $timeout = undef ) {
  4         24  
  4         10  
  4         11  
  4         6  
332 4 50       23 return TRACE_EXPORT_FAILURE if $stopped;
333 4 50       17 return unless @$data;
334              
335 4         13 try {
336 4         63 dynamically OpenTelemetry::Context->current
337             = OpenTelemetry::Trace->untraced_context;
338              
339 4         608 my $request = $encoder->encode($data);
340 4         84 my $result = $self->$send_request( $request, $timeout );
341              
342 4         12 $metrics->inc_counter('success');
343              
344 4         164 return $result;
345             }
346             catch($e) {
347 0           warn "Could not export data: $e";
348 0           return TRACE_EXPORT_FAILURE;
349             }
350             }
351              
352 0     0 0   async method shutdown ( $timeout = undef ) {
  0            
  0            
  0            
  0            
353 0           $stopped = 1;
354 0           TRACE_EXPORT_SUCCESS;
355             }
356              
357 0     0 0   async method force_flush ( $timeout = undef ) {
  0            
  0            
  0            
  0            
358 0           TRACE_EXPORT_SUCCESS;
359             }
360             }