Skip to content

Commit a132596

Browse files
committed
Update kafka-connect test for PostgreSQL db.namespace format
Gate database|schema namespace behind stable semconv flag. The PostgreSQL JDBC URL parser now only sets the database|schema namespace format when stable database semconv is enabled. Under old semconv (default), namespace falls back to just the database name. Added testStableSemconv task for kafka-connect-2.6 testing module. The base test class forwards the semconv-stability.opt-in system property to the container as OTEL_SEMCONV_STABILITY_OPT_IN env var.
1 parent 4a1299c commit a132596

5 files changed

Lines changed: 56 additions & 19 deletions

File tree

instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/parser/PostgresqlUrlParser.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.jdbc.internal.parser;
77

8+
import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv;
89
import static io.opentelemetry.instrumentation.jdbc.internal.parser.UrlParsingUtils.splitQuery;
910

1011
import java.util.Map;
@@ -52,16 +53,19 @@ public void parse(String jdbcUrl, ParseContext ctx) {
5253
GenericUrlParser.INSTANCE.parse(jdbcUrl, ctx);
5354

5455
// Extract schema from currentSchema URL parameter for namespace formatting
55-
String schema = extractCurrentSchema(jdbcUrl);
56-
if (schema == null && ctx.user() != null) {
57-
// Fall back to user as schema if no currentSchema param
58-
schema = ctx.user();
59-
}
56+
// Only set database|schema namespace under stable semconv
57+
if (emitStableDatabaseSemconv()) {
58+
String schema = extractCurrentSchema(jdbcUrl);
59+
if (schema == null && ctx.user() != null) {
60+
// Fall back to user as schema if no currentSchema param
61+
schema = ctx.user();
62+
}
6063

61-
// Format namespace as database|schema (only when schema is available)
62-
String database = ctx.databaseName();
63-
if (database != null && schema != null) {
64-
ctx.namespace(database + "|" + schema);
64+
// Format namespace as database|schema (only when schema is available)
65+
String database = ctx.databaseName();
66+
if (database != null && schema != null) {
67+
ctx.namespace(database + "|" + schema);
68+
}
6569
}
6670
}
6771

instrumentation/jdbc/library/src/test/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcConnectionUrlParserTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.jdbc.internal;
77

8+
import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv;
89
import static io.opentelemetry.instrumentation.jdbc.internal.JdbcConnectionUrlParser.parse;
910
import static io.opentelemetry.instrumentation.jdbc.internal.dbinfo.DbInfo.DEFAULT;
1011
import static org.assertj.core.api.Assertions.assertThat;
@@ -220,7 +221,7 @@ private static Stream<Arguments> postgresArguments() {
220221
.setUser("stdUserName")
221222
.setHost("stdServerName")
222223
.setPort(9999)
223-
.setNamespace("stdDatabaseName|stdUserName")
224+
.setNamespace(emitStableDatabaseSemconv() ? "stdDatabaseName|stdUserName" : null)
224225
.setName("stdDatabaseName")
225226
.build(),
226227
arg("jdbc:postgresql://pg.host")
@@ -235,7 +236,7 @@ private static Stream<Arguments> postgresArguments() {
235236
.setUser("pguser")
236237
.setHost("pg.host")
237238
.setPort(11)
238-
.setNamespace("pgdb|pguser")
239+
.setNamespace(emitStableDatabaseSemconv() ? "pgdb|pguser" : null)
239240
.setName("pgdb")
240241
.build(),
241242
arg("jdbc:postgresql://pg.host:11/pgdb?user=pguser&password=PW")
@@ -245,7 +246,7 @@ private static Stream<Arguments> postgresArguments() {
245246
.setUser("pguser")
246247
.setHost("pg.host")
247248
.setPort(11)
248-
.setNamespace("pgdb|pguser")
249+
.setNamespace(emitStableDatabaseSemconv() ? "pgdb|pguser" : null)
249250
.setName("pgdb")
250251
.build(),
251252
// currentSchema param takes precedence over user for namespace
@@ -255,7 +256,7 @@ private static Stream<Arguments> postgresArguments() {
255256
.setUser("pguser")
256257
.setHost("pg.host")
257258
.setPort(11)
258-
.setNamespace("pgdb|myschema")
259+
.setNamespace(emitStableDatabaseSemconv() ? "pgdb|myschema" : null)
259260
.setName("pgdb")
260261
.build(),
261262
// currentSchema without user
@@ -264,7 +265,7 @@ private static Stream<Arguments> postgresArguments() {
264265
.setSystem("postgresql")
265266
.setHost("pg.host")
266267
.setPort(5432)
267-
.setNamespace("pgdb|myschema")
268+
.setNamespace(emitStableDatabaseSemconv() ? "pgdb|myschema" : null)
268269
.setName("pgdb")
269270
.build(),
270271
// database only, no schema or user — namespace falls back to database name

instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,16 @@ tasks.withType<Test>().configureEach {
3636
systemProperty("io.opentelemetry.smoketest.agent.shadowJar.path", agentShadowJar.get().archiveFile.get().toString())
3737
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
3838
}
39+
40+
tasks {
41+
val testStableSemconv by registering(Test::class) {
42+
testClassesDirs = sourceSets.test.get().output.classesDirs
43+
classpath = sourceSets.test.get().runtimeClasspath
44+
45+
jvmArgs("-Dotel.semconv-stability.opt-in=database")
46+
}
47+
48+
check {
49+
dependsOn(testStableSemconv)
50+
}
51+
}

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,15 @@ private void setupKafkaConnect() {
274274
.withEnv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
275275
.withEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "1")
276276
.withEnv("OTEL_BSP_SCHEDULE_DELAY", "10ms")
277-
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "1000")
277+
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "1000");
278+
279+
// Pass semconv stability opt-in to the agent inside the container
280+
String semconvOptIn = System.getProperty("otel.semconv-stability.opt-in");
281+
if (semconvOptIn != null) {
282+
kafkaConnect.withEnv("OTEL_SEMCONV_STABILITY_OPT_IN", semconvOptIn);
283+
}
284+
285+
kafkaConnect
278286
.withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBoostrapServers())
279287
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KAFKA_CONNECT_NETWORK_ALIAS)
280288
.withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER)

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class PostgresKafkaConnectSinkTaskTest extends KafkaConnectSinkTaskBaseTest {
6767
private static final String DB_USERNAME = "postgres";
6868
private static final String DB_PASSWORD = "password";
6969
private static final String DB_TABLE_PERSON = "person";
70+
private static final String DB_NAMESPACE = stableDatabaseSemconv()
71+
? DATABASE_NAME + "|" + DB_USERNAME : DATABASE_NAME;
7072
private static final String CONNECTOR_NAME = "test-postgres-connector";
7173
private static final String TOPIC_NAME = "test-postgres-topic";
7274

@@ -163,7 +165,9 @@ void testSingleMessage() throws Exception {
163165
// kafka connect consumer trace, linked to producer span via a span link
164166
Consumer<SpanDataAssert> selectAssertion =
165167
span ->
166-
span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
168+
span.hasName("SELECT " + DB_NAMESPACE)
169+
.hasKind(SpanKind.CLIENT)
170+
.hasParent(trace.getSpan(0));
167171

168172
trace.hasSpansSatisfyingExactly(
169173
span ->
@@ -184,7 +188,7 @@ void testSingleMessage() throws Exception {
184188
selectAssertion,
185189
selectAssertion,
186190
span ->
187-
span.hasName("INSERT test." + DB_TABLE_PERSON)
191+
span.hasName("INSERT " + DB_NAMESPACE + "." + DB_TABLE_PERSON)
188192
.hasKind(SpanKind.CLIENT)
189193
.hasParent(trace.getSpan(0)));
190194
},
@@ -288,7 +292,9 @@ void testMultiTopic() throws Exception {
288292
// kafka connect consumer trace, linked to producer span via a span link
289293
Consumer<SpanDataAssert> selectAssertion =
290294
span ->
291-
span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
295+
span.hasName("SELECT " + DB_NAMESPACE)
296+
.hasKind(SpanKind.CLIENT)
297+
.hasParent(trace.getSpan(0));
292298

293299
trace.hasSpansSatisfyingExactly(
294300
span ->
@@ -311,7 +317,7 @@ void testMultiTopic() throws Exception {
311317
selectAssertion,
312318
selectAssertion,
313319
span ->
314-
span.hasName("INSERT test." + DB_TABLE_PERSON)
320+
span.hasName("INSERT " + DB_NAMESPACE + "." + DB_TABLE_PERSON)
315321
.hasKind(SpanKind.CLIENT)
316322
.hasParent(trace.getSpan(0)));
317323
},
@@ -426,4 +432,9 @@ private static void clearPostgresTable() throws SQLException {
426432
logger.info("Cleared PostgreSQL table: {}", DB_TABLE_PERSON);
427433
}
428434
}
435+
436+
private static boolean stableDatabaseSemconv() {
437+
String optIn = System.getProperty("otel.semconv-stability.opt-in", "");
438+
return optIn.contains("database");
439+
}
429440
}

0 commit comments

Comments
 (0)