Skip to content

Commit e224e19

Browse files
authored
Port event span event bridge from contrib (#8372)
1 parent b29f3df commit e224e19

7 files changed

Lines changed: 446 additions & 136 deletions

File tree

sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/LogRecordProcessorFactoryTest.java

Lines changed: 77 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -19,168 +19,109 @@
1919
import io.opentelemetry.sdk.declarativeconfig.internal.model.LogRecordProcessorPropertyModel;
2020
import io.opentelemetry.sdk.declarativeconfig.internal.model.OtlpHttpExporterModel;
2121
import io.opentelemetry.sdk.declarativeconfig.internal.model.SimpleLogRecordProcessorModel;
22+
import io.opentelemetry.sdk.extension.incubator.logs.EventToSpanEventBridge;
2223
import io.opentelemetry.sdk.logs.LogRecordProcessor;
2324
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
2425
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
25-
import java.io.Closeable;
2626
import java.time.Duration;
27-
import java.util.ArrayList;
28-
import java.util.List;
29-
import org.assertj.core.api.Assertions;
27+
import java.util.stream.Stream;
3028
import org.junit.jupiter.api.BeforeEach;
31-
import org.junit.jupiter.api.Test;
3229
import org.junit.jupiter.api.extension.RegisterExtension;
30+
import org.junit.jupiter.params.ParameterizedTest;
31+
import org.junit.jupiter.params.provider.Arguments;
32+
import org.junit.jupiter.params.provider.MethodSource;
3333

3434
class LogRecordProcessorFactoryTest {
3535

3636
@RegisterExtension CleanupExtension cleanup = new CleanupExtension();
3737

38-
private final DeclarativeConfigContext context =
39-
new DeclarativeConfigContext(ComponentLoader.forClassLoader(getClass().getClassLoader()));
38+
private static final DeclarativeConfigContext context =
39+
new DeclarativeConfigContext(
40+
ComponentLoader.forClassLoader(LogRecordProcessorFactoryTest.class.getClassLoader()));
4041

4142
@BeforeEach
4243
void setup() {
4344
context.setBuilder(new DeclarativeConfigurationBuilder());
4445
}
4546

46-
@Test
47-
void create_BatchNullExporter() {
48-
assertThatThrownBy(
49-
() ->
50-
LogRecordProcessorFactory.getInstance()
51-
.create(
52-
new LogRecordProcessorModel().withBatch(new BatchLogRecordProcessorModel()),
53-
context))
54-
.isInstanceOf(DeclarativeConfigException.class)
55-
.hasMessage("batch log record processor exporter is required but is null");
56-
}
57-
58-
@Test
59-
void create_BatchDefaults() {
60-
List<Closeable> closeables = new ArrayList<>();
61-
BatchLogRecordProcessor expectedProcessor =
62-
BatchLogRecordProcessor.builder(
63-
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build())
64-
.build();
65-
cleanup.addCloseable(expectedProcessor);
66-
67-
LogRecordProcessor processor =
68-
LogRecordProcessorFactory.getInstance()
69-
.create(
70-
new LogRecordProcessorModel()
71-
.withBatch(
72-
new BatchLogRecordProcessorModel()
73-
.withExporter(
74-
new LogRecordExporterModel()
75-
.withOtlpHttp(new OtlpHttpExporterModel()))),
76-
context);
77-
cleanup.addCloseable(processor);
78-
cleanup.addCloseables(closeables);
79-
80-
assertThat(processor.toString()).isEqualTo(expectedProcessor.toString());
81-
}
82-
83-
@Test
84-
void create_BatchConfigured() {
85-
List<Closeable> closeables = new ArrayList<>();
86-
BatchLogRecordProcessor expectedProcessor =
87-
BatchLogRecordProcessor.builder(
88-
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build())
89-
.setScheduleDelay(Duration.ofMillis(1))
90-
.setMaxExportBatchSize(2)
91-
.setExporterTimeout(Duration.ofMillis(3))
92-
.build();
47+
@ParameterizedTest
48+
@MethodSource("createTestCases")
49+
void create(LogRecordProcessorModel model, LogRecordProcessor expectedProcessor) {
9350
cleanup.addCloseable(expectedProcessor);
94-
95-
LogRecordProcessor processor =
96-
LogRecordProcessorFactory.getInstance()
97-
.create(
98-
new LogRecordProcessorModel()
99-
.withBatch(
100-
new BatchLogRecordProcessorModel()
101-
.withExporter(
102-
new LogRecordExporterModel()
103-
.withOtlpHttp(new OtlpHttpExporterModel()))
104-
.withScheduleDelay(1)
105-
.withMaxExportBatchSize(2)
106-
.withExportTimeout(3)),
107-
context);
51+
LogRecordProcessor processor = LogRecordProcessorFactory.getInstance().create(model, context);
10852
cleanup.addCloseable(processor);
109-
cleanup.addCloseables(closeables);
110-
11153
assertThat(processor.toString()).isEqualTo(expectedProcessor.toString());
11254
}
11355

114-
@Test
115-
void create_SimpleNullExporter() {
116-
assertThatThrownBy(
117-
() ->
118-
LogRecordProcessorFactory.getInstance()
119-
.create(
120-
new LogRecordProcessorModel()
121-
.withSimple(new SimpleLogRecordProcessorModel()),
122-
context))
123-
.isInstanceOf(DeclarativeConfigException.class)
124-
.hasMessage("simple log record processor exporter is required but is null");
125-
}
126-
127-
@Test
128-
void create_SimpleConfigured() {
129-
List<Closeable> closeables = new ArrayList<>();
130-
LogRecordProcessor expectedProcessor =
131-
SimpleLogRecordProcessor.create(
132-
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build());
133-
cleanup.addCloseable(expectedProcessor);
134-
135-
LogRecordProcessor processor =
136-
LogRecordProcessorFactory.getInstance()
137-
.create(
138-
new LogRecordProcessorModel()
139-
.withSimple(
140-
new SimpleLogRecordProcessorModel()
141-
.withExporter(
142-
new LogRecordExporterModel()
143-
.withOtlpHttp(new OtlpHttpExporterModel()))),
144-
context);
145-
cleanup.addCloseable(processor);
146-
cleanup.addCloseables(closeables);
147-
148-
assertThat(processor.toString()).isEqualTo(expectedProcessor.toString());
56+
private static Stream<Arguments> createTestCases() {
57+
return Stream.of(
58+
Arguments.of(
59+
new LogRecordProcessorModel()
60+
.withBatch(
61+
new BatchLogRecordProcessorModel()
62+
.withExporter(
63+
new LogRecordExporterModel()
64+
.withOtlpHttp(new OtlpHttpExporterModel()))),
65+
BatchLogRecordProcessor.builder(
66+
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build())
67+
.build()),
68+
Arguments.of(
69+
new LogRecordProcessorModel()
70+
.withBatch(
71+
new BatchLogRecordProcessorModel()
72+
.withExporter(
73+
new LogRecordExporterModel().withOtlpHttp(new OtlpHttpExporterModel()))
74+
.withScheduleDelay(1)
75+
.withMaxExportBatchSize(2)
76+
.withExportTimeout(3)),
77+
BatchLogRecordProcessor.builder(
78+
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build())
79+
.setScheduleDelay(Duration.ofMillis(1))
80+
.setMaxExportBatchSize(2)
81+
.setExporterTimeout(Duration.ofMillis(3))
82+
.build()),
83+
Arguments.of(
84+
new LogRecordProcessorModel()
85+
.withSimple(
86+
new SimpleLogRecordProcessorModel()
87+
.withExporter(
88+
new LogRecordExporterModel()
89+
.withOtlpHttp(new OtlpHttpExporterModel()))),
90+
SimpleLogRecordProcessor.create(
91+
OtlpHttpLogRecordExporter.builder().setComponentLoader(context).build())),
92+
Arguments.of(
93+
new LogRecordProcessorModel()
94+
.withAdditionalProperty(
95+
"event_to_span_event_bridge/development",
96+
new LogRecordProcessorPropertyModel()),
97+
EventToSpanEventBridge.create()),
98+
Arguments.of(
99+
new LogRecordProcessorModel()
100+
.withAdditionalProperty("test", new LogRecordProcessorPropertyModel()),
101+
LogRecordProcessorComponentProvider.TestLogRecordProcessor.create()));
149102
}
150103

151-
@Test
152-
void create_SpiProcessor_Unknown() {
153-
assertThatThrownBy(
154-
() ->
155-
LogRecordProcessorFactory.getInstance()
156-
.create(
157-
new LogRecordProcessorModel()
158-
.withAdditionalProperty(
159-
"unknown_key",
160-
new LogRecordProcessorPropertyModel()
161-
.withAdditionalProperty("key1", "value1")),
162-
context))
104+
@ParameterizedTest
105+
@MethodSource("createInvalidTestCases")
106+
void create_Invalid(LogRecordProcessorModel model, String expectedMessage) {
107+
assertThatThrownBy(() -> LogRecordProcessorFactory.getInstance().create(model, context))
163108
.isInstanceOf(DeclarativeConfigException.class)
164-
.hasMessage(
165-
"No component provider detected for io.opentelemetry.sdk.logs.LogRecordProcessor with name \"unknown_key\".");
109+
.hasMessage(expectedMessage);
166110
}
167111

168-
@Test
169-
void create_SpiExporter_Valid() {
170-
LogRecordProcessor logRecordProcessor =
171-
LogRecordProcessorFactory.getInstance()
172-
.create(
173-
new LogRecordProcessorModel()
174-
.withAdditionalProperty(
175-
"test",
176-
new LogRecordProcessorPropertyModel()
177-
.withAdditionalProperty("key1", "value1")),
178-
context);
179-
assertThat(logRecordProcessor)
180-
.isInstanceOf(LogRecordProcessorComponentProvider.TestLogRecordProcessor.class);
181-
Assertions.assertThat(
182-
((LogRecordProcessorComponentProvider.TestLogRecordProcessor) logRecordProcessor)
183-
.config.getString("key1"))
184-
.isEqualTo("value1");
112+
private static Stream<Arguments> createInvalidTestCases() {
113+
return Stream.of(
114+
Arguments.of(
115+
new LogRecordProcessorModel().withBatch(new BatchLogRecordProcessorModel()),
116+
"batch log record processor exporter is required but is null"),
117+
Arguments.of(
118+
new LogRecordProcessorModel().withSimple(new SimpleLogRecordProcessorModel()),
119+
"simple log record processor exporter is required but is null"),
120+
Arguments.of(
121+
new LogRecordProcessorModel()
122+
.withAdditionalProperty(
123+
"unknown_key",
124+
new LogRecordProcessorPropertyModel().withAdditionalProperty("key1", "value1")),
125+
"No component provider detected for io.opentelemetry.sdk.logs.LogRecordProcessor with name \"unknown_key\"."));
185126
}
186127
}

sdk-extensions/declarative-config/src/test/java/io/opentelemetry/sdk/autoconfigure/declarativeconfig/component/LogRecordProcessorComponentProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,21 @@ private TestLogRecordProcessor(DeclarativeConfigProperties config) {
3636
this.config = config;
3737
}
3838

39+
public static TestLogRecordProcessor create() {
40+
return new TestLogRecordProcessor(null);
41+
}
42+
3943
@Override
4044
public void onEmit(Context context, ReadWriteLogRecord logRecord) {}
4145

4246
@Override
4347
public CompletableResultCode shutdown() {
4448
return CompletableResultCode.ofSuccess();
4549
}
50+
51+
@Override
52+
public String toString() {
53+
return "TestLogRecordProcessor{}";
54+
}
4655
}
4756
}

sdk-extensions/incubator/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ dependencies {
1515

1616
annotationProcessor("com.google.auto.value:auto-value")
1717

18+
compileOnly(project(":api:incubator"))
19+
1820
// io.opentelemetry.sdk.extension.incubator.metric.viewconfig
1921
implementation(project(":sdk-extensions:autoconfigure-spi"))
2022
implementation("org.snakeyaml:snakeyaml-engine")
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.logs;
7+
8+
import io.opentelemetry.api.trace.Span;
9+
import io.opentelemetry.api.trace.SpanContext;
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
12+
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
13+
import io.opentelemetry.sdk.logs.data.LogRecordData;
14+
import java.util.concurrent.TimeUnit;
15+
16+
/**
17+
* A {@link LogRecordProcessor} which bridges events (i.e. log records with a non-empty {@code
18+
* event.name}) as span events on the current span.
19+
*
20+
* <p>A log record is bridged to a span event if and only if all of the following conditions are
21+
* met:
22+
*
23+
* <ul>
24+
* <li>The log record has a non-empty event name.
25+
* <li>The log record has a valid trace ID and span ID.
26+
* <li>The resolved context contains a current span whose {@link Span#isRecording()} is {@code
27+
* true}.
28+
* <li>The log record's trace ID and span ID equal those of the current span in the resolved
29+
* context.
30+
* </ul>
31+
*
32+
* <p>When bridged, a span event is added with:
33+
*
34+
* <ul>
35+
* <li>Name set to the log record's event name.
36+
* <li>Timestamp set to the log record's timestamp if set; otherwise, the observed timestamp.
37+
* <li>All log record attributes copied to the span event attributes.
38+
* </ul>
39+
*
40+
* <p>Bridging does NOT prevent the log record from continuing through the normal log processing
41+
* pipeline.
42+
*/
43+
public final class EventToSpanEventBridge implements LogRecordProcessor {
44+
45+
private EventToSpanEventBridge() {}
46+
47+
/** Create a new instance. */
48+
public static EventToSpanEventBridge create() {
49+
return new EventToSpanEventBridge();
50+
}
51+
52+
@Override
53+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
54+
LogRecordData logRecordData = logRecord.toLogRecordData();
55+
String eventName = logRecordData.getEventName();
56+
if (eventName == null || eventName.isEmpty()) {
57+
return;
58+
}
59+
SpanContext logSpanContext = logRecordData.getSpanContext();
60+
if (!logSpanContext.isValid()) {
61+
return;
62+
}
63+
Span currentSpan = Span.fromContext(context);
64+
if (!currentSpan.isRecording()) {
65+
return;
66+
}
67+
SpanContext currentSpanContext = currentSpan.getSpanContext();
68+
if (!currentSpanContext.getTraceId().equals(logSpanContext.getTraceId())
69+
|| !currentSpanContext.getSpanId().equals(logSpanContext.getSpanId())) {
70+
return;
71+
}
72+
long timestampNanos = logRecordData.getTimestampEpochNanos();
73+
if (timestampNanos == 0) {
74+
timestampNanos = logRecordData.getObservedTimestampEpochNanos();
75+
}
76+
currentSpan.addEvent(
77+
eventName, logRecordData.getAttributes(), timestampNanos, TimeUnit.NANOSECONDS);
78+
}
79+
80+
@Override
81+
public String toString() {
82+
return "EventToSpanEventBridge{}";
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.logs.internal;
7+
8+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
9+
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
10+
import io.opentelemetry.sdk.extension.incubator.logs.EventToSpanEventBridge;
11+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
12+
13+
/**
14+
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public class EventToSpanEventBridgeComponentProvider implements ComponentProvider {
20+
21+
@Override
22+
public Class<LogRecordProcessor> getType() {
23+
return LogRecordProcessor.class;
24+
}
25+
26+
@Override
27+
public String getName() {
28+
return "event_to_span_event_bridge/development";
29+
}
30+
31+
@Override
32+
public LogRecordProcessor create(DeclarativeConfigProperties config) {
33+
return EventToSpanEventBridge.create();
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.opentelemetry.sdk.extension.incubator.logs.internal.EventToSpanEventBridgeComponentProvider

0 commit comments

Comments
 (0)