Skip to content

Commit 0c97c14

Browse files
authored
Code review sweep (run 25008151676) (open-telemetry#18341)
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
1 parent 5b99fbf commit 0c97c14

14 files changed

Lines changed: 53 additions & 51 deletions

File tree

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public static AdviceScope start(KafkaProducerRequest request) {
7373
return new AdviceScope(parentContext, request, context, context.makeCurrent());
7474
}
7575

76-
public Callback wrapCallback(Callback originalCallback) {
76+
public Callback wrapCallback(@Nullable Callback originalCallback) {
7777
return new ProducerCallback(originalCallback, parentContext, context, request);
7878
}
7979

@@ -105,7 +105,7 @@ public static Object[] onEnter(
105105
@Advice.FieldValue("clientId") String clientId,
106106
@Advice.FieldValue("producerConfig") ProducerConfig producerConfig,
107107
@Advice.Argument(0) ProducerRecord<?, ?> originalRecord,
108-
@Advice.Argument(1) Callback originalCallback) {
108+
@Advice.Argument(1) @Nullable Callback originalCallback) {
109109
ProducerRecord<?, ?> record = originalRecord;
110110
Callback callback = originalCallback;
111111

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractOpenTelemetryMetricsReporterTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.auto.value.AutoValue;
2020
import io.opentelemetry.api.common.AttributeKey;
21+
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
2122
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
2223
import io.opentelemetry.sdk.metrics.data.MetricData;
2324
import io.opentelemetry.sdk.metrics.data.PointData;
@@ -45,10 +46,10 @@
4546
import org.apache.kafka.common.metrics.MetricsReporter;
4647
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
4748
import org.apache.kafka.common.serialization.ByteArraySerializer;
48-
import org.junit.jupiter.api.AfterAll;
4949
import org.junit.jupiter.api.AfterEach;
5050
import org.junit.jupiter.api.BeforeEach;
5151
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.api.extension.RegisterExtension;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -63,6 +64,8 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
6364
private static final Logger logger =
6465
LoggerFactory.getLogger(AbstractOpenTelemetryMetricsReporterTest.class);
6566

67+
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
68+
6669
private static final List<String> TOPICS = asList("foo", "bar", "baz", "qux");
6770
private static final Random RANDOM = new Random();
6871

@@ -90,16 +93,12 @@ void beforeAll() {
9093
.withLogConsumer(new Slf4jLogConsumer(logger))
9194
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
9295
.withStartupTimeout(Duration.ofMinutes(1));
96+
cleanup.deferAfterAll(kafka::stop);
9397
kafka.start();
9498
producer = new KafkaProducer<>(producerConfig());
99+
cleanup.deferAfterAll(producer);
95100
consumer = new KafkaConsumer<>(consumerConfig());
96-
}
97-
98-
@AfterAll
99-
static void afterAll() {
100-
producer.close();
101-
consumer.close();
102-
kafka.stop();
101+
cleanup.deferAfterAll(consumer);
103102
}
104103

105104
@AfterEach
@@ -195,7 +194,7 @@ private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> met
195194

196195
@Test
197196
void observeMetrics() {
198-
// Firstly create new producer and consumer and close them. This is done tp verify that metrics
197+
// Firstly create new producer and consumer and close them. This is done to verify that metrics
199198
// are still produced after closing one producer/consumer. See
200199
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11880
201200
KafkaProducer<byte[], byte[]> producer2 = new KafkaProducer<>(producerConfig());

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import static java.util.Collections.emptyList;
3030
import static java.util.Collections.singletonList;
3131
import static java.util.concurrent.TimeUnit.SECONDS;
32-
import static org.assertj.core.api.Assertions.assertThat;
3332

3433
import io.opentelemetry.api.common.AttributeKey;
3534
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
@@ -83,7 +82,7 @@ public abstract class KafkaClientBaseTest {
8382
protected Consumer<Integer, String> consumer;
8483
private final CountDownLatch consumerReady = new CountDownLatch(1);
8584

86-
protected static final boolean isExperimentalEnabled =
85+
private static final boolean EXPERIMENTAL_ATTRIBUTES =
8786
Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes");
8887

8988
public static final int partition = 0;
@@ -200,7 +199,7 @@ protected static List<AttributeAssertion> sendAttributes(
200199
if (testHeaders) {
201200
assertions.add(equalTo(headerAttributeKey("Test-Message-Header"), singletonList("test")));
202201
}
203-
if (isExperimentalEnabled) {
202+
if (EXPERIMENTAL_ATTRIBUTES) {
204203
assertions.add(
205204
satisfies(
206205
stringKey("messaging.kafka.bootstrap.servers"),
@@ -239,13 +238,11 @@ protected static List<AttributeAssertion> processAttributes(
239238
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
240239
equalTo(MESSAGING_OPERATION, "process"),
241240
satisfies(MESSAGING_CLIENT_ID, val -> val.startsWith("consumer")),
242-
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
243-
satisfies(
244-
longKey("kafka.record.queue_time_ms"),
245-
val ->
246-
val.satisfiesAnyOf(
247-
v -> assertThat(v).isNotNegative(),
248-
v -> assertThat(isExperimentalEnabled).isFalse()))));
241+
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty)));
242+
if (EXPERIMENTAL_ATTRIBUTES) {
243+
assertions.add(
244+
satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative));
245+
}
249246
if (emitOldMessagingSemconv()) {
250247
assertions.add(satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative));
251248
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientPropagationBaseTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.junit.jupiter.api.Test;
1515

1616
public abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest {
17-
private static final boolean producerPropagationEnabled =
17+
private static final boolean PRODUCER_PROPAGATION_ENABLED =
1818
Boolean.parseBoolean(
1919
System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true"));
2020

@@ -28,7 +28,7 @@ void testClientHeaderPropagationManualConfig() throws InterruptedException {
2828
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
2929
assertThat(records).hasSize(1);
3030
for (ConsumerRecord<?, ?> record : records) {
31-
assertThat(record.headers().iterator().hasNext()).isEqualTo(producerPropagationEnabled);
31+
assertThat(record.headers().iterator().hasNext()).isEqualTo(PRODUCER_PROPAGATION_ENABLED);
3232
}
3333
}
3434
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ void testProducerExceptionPropagatesToCaller() {
6666
}
6767

6868
@Test
69-
@SuppressWarnings({"unchecked"})
69+
@SuppressWarnings("unchecked")
7070
void testProducerHandlesReadOnlyHeaders() {
7171
Producer<String, String> producer = mock(Producer.class);
7272
when(producer.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class WrapperSendExceptionTest {
2626
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
2727

2828
@Test
29-
@SuppressWarnings({"unchecked"})
29+
@SuppressWarnings("unchecked")
3030
void producerSpanEndedWhenSendThrowsSynchronously() {
3131
Producer<String, String> producer = mock(Producer.class);
3232
when(producer.send(any(), any())).thenThrow(new KafkaException("send failed"));

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ abstract class AbstractKafkaConsumerRequest {
1212
@Nullable private final String consumerGroup;
1313
@Nullable private final String clientId;
1414

15-
AbstractKafkaConsumerRequest(String consumerGroup, String clientId) {
15+
AbstractKafkaConsumerRequest(@Nullable String consumerGroup, @Nullable String clientId) {
1616
this.consumerGroup = consumerGroup;
1717
this.clientId = clientId;
1818
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContextUtil.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.context.Context;
99
import io.opentelemetry.instrumentation.api.util.VirtualField;
10+
import javax.annotation.Nullable;
1011
import org.apache.kafka.clients.consumer.Consumer;
1112
import org.apache.kafka.clients.consumer.ConsumerRecord;
1213
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -51,17 +52,16 @@ public static KafkaConsumerContext get(ConsumerRecords<?, ?> records) {
5152
return create(receiveContext, consumerGroup, clientId);
5253
}
5354

54-
public static KafkaConsumerContext create(Context context, Consumer<?, ?> consumer) {
55+
public static KafkaConsumerContext create(@Nullable Context context, Consumer<?, ?> consumer) {
5556
return create(context, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer));
5657
}
5758

5859
public static KafkaConsumerContext create(
59-
Context context, String consumerGroup, String clientId) {
60+
@Nullable Context context, @Nullable String consumerGroup, @Nullable String clientId) {
6061
return KafkaConsumerContext.create(context, consumerGroup, clientId);
6162
}
6263

6364
public static void set(ConsumerRecord<?, ?> record, Context context, Consumer<?, ?> consumer) {
64-
recordContextField.set(record, context);
6565
String consumerGroup = KafkaUtil.getConsumerGroup(consumer);
6666
String clientId = KafkaUtil.getClientId(consumer);
6767
set(record, context, consumerGroup, clientId);
@@ -76,7 +76,10 @@ public static void set(ConsumerRecord<?, ?> record, KafkaConsumerContext consume
7676
}
7777

7878
public static void set(
79-
ConsumerRecord<?, ?> record, Context context, String consumerGroup, String clientId) {
79+
ConsumerRecord<?, ?> record,
80+
@Nullable Context context,
81+
@Nullable String consumerGroup,
82+
@Nullable String clientId) {
8083
recordContextField.set(record, context);
8184
recordConsumerInfoField.set(record, new String[] {consumerGroup, clientId});
8285
}
@@ -88,7 +91,10 @@ public static void set(ConsumerRecords<?, ?> records, Context context, Consumer<
8891
}
8992

9093
public static void set(
91-
ConsumerRecords<?, ?> records, Context context, String consumerGroup, String clientId) {
94+
ConsumerRecords<?, ?> records,
95+
@Nullable Context context,
96+
@Nullable String consumerGroup,
97+
@Nullable String clientId) {
9298
recordsContextField.set(records, context);
9399
recordsConsumerInfoField.set(records, new String[] {consumerGroup, clientId});
94100
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProcessRequest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,16 @@ public static KafkaProcessRequest create(
2424

2525
public static KafkaProcessRequest create(
2626
KafkaConsumerContext consumerContext, ConsumerRecord<?, ?> record) {
27-
String consumerGroup = consumerContext != null ? consumerContext.getConsumerGroup() : null;
28-
String clientId = consumerContext != null ? consumerContext.getClientId() : null;
29-
return create(record, consumerGroup, clientId);
27+
return create(record, consumerContext.getConsumerGroup(), consumerContext.getClientId());
3028
}
3129

3230
public static KafkaProcessRequest create(
33-
ConsumerRecord<?, ?> record, String consumerGroup, String clientId) {
31+
ConsumerRecord<?, ?> record, @Nullable String consumerGroup, @Nullable String clientId) {
3432
return new KafkaProcessRequest(record, consumerGroup, clientId);
3533
}
3634

37-
public KafkaProcessRequest(ConsumerRecord<?, ?> record, String consumerGroup, String clientId) {
35+
public KafkaProcessRequest(
36+
ConsumerRecord<?, ?> record, @Nullable String consumerGroup, @Nullable String clientId) {
3837
super(consumerGroup, clientId);
3938
this.record = record;
4039
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@ public final class KafkaProducerRequest {
2424
@Nullable private final String bootstrapServers;
2525

2626
public static KafkaProducerRequest create(
27-
ProducerRecord<?, ?> record, Producer<?, ?> producer, String bootstrapServers) {
27+
ProducerRecord<?, ?> record, Producer<?, ?> producer, @Nullable String bootstrapServers) {
2828
return create(record, extractClientId(producer), bootstrapServers);
2929
}
3030

3131
public static KafkaProducerRequest create(
32-
ProducerRecord<?, ?> record, String clientId, String bootstrapServers) {
32+
ProducerRecord<?, ?> record, @Nullable String clientId, @Nullable String bootstrapServers) {
3333
return new KafkaProducerRequest(record, clientId, bootstrapServers);
3434
}
3535

3636
private KafkaProducerRequest(
37-
ProducerRecord<?, ?> record, String clientId, String bootstrapServers) {
37+
ProducerRecord<?, ?> record, @Nullable String clientId, @Nullable String bootstrapServers) {
3838
this.record = record;
3939
this.clientId = clientId;
4040
this.bootstrapServers = bootstrapServers;
@@ -44,14 +44,17 @@ private KafkaProducerRequest(
4444
return record;
4545
}
4646

47+
@Nullable
4748
public String getClientId() {
4849
return clientId;
4950
}
5051

52+
@Nullable
5153
public String getBootstrapServers() {
5254
return bootstrapServers;
5355
}
5456

57+
@Nullable
5558
private static String extractClientId(Producer<?, ?> producer) {
5659
try {
5760
Map<MetricName, ? extends Metric> metrics = producer.metrics();

0 commit comments

Comments
 (0)