Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dependencies {
api project(":application-graph")
api libs.slf4j.api
api libs.micrometer.core
api project(":micrometer:micrometer-api")
api libs.opentelemetry.api
}

Expand Down
16 changes: 10 additions & 6 deletions common/src/main/java/io/koraframework/common/util/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ public static long started() {
return System.nanoTime();
}

public static Duration took(long started) {
return Duration.ofNanos(System.nanoTime() - started).truncatedTo(ChronoUnit.MILLIS);
/**
 * Returns the elapsed time since the given {@link System#nanoTime()} mark,
 * truncated to whole-millisecond precision.
 *
 * @param startedNanos a value previously obtained from {@code System.nanoTime()}
 * @return elapsed duration with sub-millisecond precision dropped
 */
public static Duration took(long startedNanos) {
    final long elapsedNanos = System.nanoTime() - startedNanos;
    return Duration.ofNanos(elapsedNanos).truncatedTo(ChronoUnit.MILLIS);
}

public static String tookForLogging(long started) {
return durationForLogging(System.nanoTime() - started);
/**
 * Formats the time elapsed since the given {@link System#nanoTime()} mark
 * as a compact human-readable string suitable for log messages.
 *
 * @param startedNanos a value previously obtained from {@code System.nanoTime()}
 * @return the elapsed time rendered via {@code durationForLogging}
 */
public static String tookForLogging(long startedNanos) {
    final long elapsedNanos = System.nanoTime() - startedNanos;
    return durationForLogging(elapsedNanos);
}

public static String durationForLogging(long duration) {
return Duration.ofNanos(duration).truncatedTo(ChronoUnit.MILLIS).toString().substring(2).toLowerCase();
/**
 * Renders a nanosecond duration for logging: the value is truncated to whole
 * milliseconds, printed in ISO-8601 form, the leading {@code "PT"} prefix is
 * stripped, and the result lower-cased (e.g. 1234 ms becomes {@code "1.234s"}).
 *
 * @param durationNanos duration in nanoseconds
 * @return compact lower-case string representation
 */
public static String durationForLogging(long durationNanos) {
    final Duration truncated = Duration.ofNanos(durationNanos).truncatedTo(ChronoUnit.MILLIS);
    // Duration.toString() yields e.g. "PT1.234S"; drop the constant "PT" prefix.
    return truncated.toString().substring(2).toLowerCase();
}

/**
 * Renders a {@link Duration} for logging using the same format as the
 * nanosecond overload: truncated to milliseconds, ISO-8601 with the leading
 * {@code "PT"} stripped, lower-cased.
 *
 * @param duration the duration to format
 * @return compact lower-case string representation
 */
public static String durationForLogging(Duration duration) {
    final String iso = duration.truncatedTo(ChronoUnit.MILLIS).toString();
    // ISO form is e.g. "PT2.5S"; skip the constant two-character "PT" prefix.
    return iso.substring(2).toLowerCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.palantir.javapoet.MethodSpec;
import com.palantir.javapoet.ParameterSpec;
import com.palantir.javapoet.ParameterizedTypeName;
import io.koraframework.annotation.processor.common.AnnotationUtils;
import org.jspecify.annotations.Nullable;
import io.koraframework.annotation.processor.common.CommonClassNames;
import io.koraframework.annotation.processor.common.TagUtils;
Expand Down Expand Up @@ -64,8 +65,10 @@ public MethodSpec generate(Elements elements, ExecutableElement executableElemen
.addAnnotation(Nullable.class)
.build());

var configPath = AnnotationUtils.parseAnnotationValueWithoutDefault(listenerAnnotation, "value");
var consumerName = ((TypeElement) executableElement.getEnclosingElement()).getQualifiedName() + "#" + executableElement.getSimpleName();
methodBuilder.addStatement("var telemetry = telemetryFactory.get($S, config.driverProperties(), config.telemetry())", consumerName);
methodBuilder.addStatement("var telemetry = telemetryFactory.get($S, $S, config.driverProperties(), config.telemetry())",
configPath, consumerName);

var consumerParameter = parameters.stream().filter(r -> r instanceof ConsumerParameter.Consumer).map(ConsumerParameter.Consumer.class::cast).findFirst();
if (handlerTypeName.rawType().equals(recordHandler)) {
Expand All @@ -77,11 +80,11 @@ public MethodSpec generate(Elements elements, ExecutableElement executableElemen
methodBuilder.beginControlFlow("if (config.topics() == null || config.topics().size() != 1)"); // todo allow list?
methodBuilder.addStatement("throw new java.lang.IllegalArgumentException($S + config.topics())", "@KafkaListener require to specify 1 topic to subscribe when groupId is null, but received: ");
methodBuilder.endControlFlow();
methodBuilder.addCode("return new $T<>($S, config, config.topics().get(0), keyDeserializer, valueDeserializer, telemetry, wrappedHandler);",
kafkaAssignConsumerContainer, consumerName);
methodBuilder.addCode("return new $T<>($S, $S, config, config.topics().get(0), keyDeserializer, valueDeserializer, telemetry, wrappedHandler);",
kafkaAssignConsumerContainer, configPath, consumerName);
methodBuilder.addCode("$<\n} else {$>\n");
methodBuilder.addCode("return new $T<>($S, config, keyDeserializer, valueDeserializer, wrappedHandler, telemetry, rebalanceListener);",
kafkaSubscribeConsumerContainer, consumerName);
methodBuilder.addCode("return new $T<>($S, $S, config, keyDeserializer, valueDeserializer, wrappedHandler, telemetry, rebalanceListener);",
kafkaSubscribeConsumerContainer, configPath, consumerName);
methodBuilder.addCode("$<\n}\n");
return methodBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ public void generatePublisherImplementation(TypeElement publisher, List<Executab
.addParameter(publisherTelemetryConfig, "telemetryConfig")
.addParameter(ClassName.get(Properties.class), "driverProperties")
.addParameter(topicConfigTypeName, "topicConfig")
.addStatement("var telemetry = telemetryFactory.get($S, telemetryConfig, driverProperties);", configPath)
.addStatement("super(driverProperties, telemetryConfig, telemetry)")
.addStatement("var telemetry = telemetryFactory.get($S, $S, telemetryConfig, driverProperties);", configPath, publisher.getQualifiedName().toString())
.addStatement("super($S, $S, driverProperties, telemetryConfig, telemetry)", configPath, publisher.getQualifiedName().toString())
.addStatement("this.topicConfig = topicConfig");
record TypeWithTag(TypeName typeName, String tag) {}
var parameters = new HashMap<TypeWithTag, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,11 @@ protected RecordsHandlerAssertions(Class<?> controllerClass, Class<?> moduleClas
}

public void handle(ConsumerRecords<K, V> record) {
moduleHandler.handle(consumer, new NoopKafkaConsumerPollObservation(), record);
moduleHandler.handle(consumer, NoopKafkaConsumerPollObservation.INSTANCE, record);
}

public void handle(ConsumerRecord<K, V> record, ThrowingConsumer<InvocationAssertions<K, V>> verifier) {
moduleHandler.handle(consumer, new NoopKafkaConsumerPollObservation(), new ConsumerRecords<>(Map.of(
moduleHandler.handle(consumer, NoopKafkaConsumerPollObservation.INSTANCE, new ConsumerRecords<>(Map.of(
new TopicPartition("test", 1),
List.of(record)
)));
Expand All @@ -350,7 +350,7 @@ public void handle(ConsumerRecord<K, V> record, ThrowingConsumer<InvocationAsser

public void handle(ConsumerRecord<K, V> record, Class<? extends Throwable> expectedError) {
assertThatThrownBy(() -> {
moduleHandler.handle(consumer, new NoopKafkaConsumerPollObservation(), new ConsumerRecords<>(Map.of(
moduleHandler.handle(consumer, NoopKafkaConsumerPollObservation.INSTANCE, new ConsumerRecords<>(Map.of(
new TopicPartition("test", 1),
List.of(record)
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.koraframework.kafka.symbol.processor.KafkaClassNames
import io.koraframework.kafka.symbol.processor.KafkaUtils.consumerTag
import io.koraframework.kafka.symbol.processor.KafkaUtils.containerFunName
import io.koraframework.kafka.symbol.processor.consumer.KafkaHandlerGenerator.HandlerFunction
import io.koraframework.ksp.common.AnnotationUtils.findValueNoDefault
import io.koraframework.ksp.common.CommonClassNames
import io.koraframework.ksp.common.KotlinPoetUtils.controlFlow
import io.koraframework.ksp.common.TagUtils.addTag
Expand All @@ -36,8 +37,10 @@ class KafkaContainerGenerator {
.addTag(consumerTag)
.returns(CommonClassNames.lifecycle)

val configPath = listenerAnnotation.findValueNoDefault<String>("value")!!
val consumerName = functionDeclaration.parentDeclaration?.qualifiedName?.asString() + "." + functionDeclaration.simpleName.asString()
funBuilder.addStatement("val telemetry = telemetryFactory.get(%S, config.driverProperties(), config.telemetry())", consumerName)
funBuilder.addStatement("val telemetry = telemetryFactory.get(%S, %S, config.driverProperties(), config.telemetry())",
configPath, consumerName)
if (handlerType.rawType == KafkaClassNames.recordHandler) {
funBuilder.addStatement("val wrappedHandler = %T.wrapHandlerRecord(%L, handler)", KafkaClassNames.handlerWrapper, consumerParameter == null)
} else {
Expand All @@ -47,11 +50,11 @@ class KafkaContainerGenerator {
addStatement("val topics = config.topics()")
addStatement("require(topics != null)")
addStatement("require(topics.size == 1)")
addStatement("return %T(%S, config, topics[0], keyDeserializer, valueDeserializer, telemetry, wrappedHandler)",
KafkaClassNames.kafkaAssignConsumerContainer, consumerName)
addStatement("return %T(%S, %S, config, topics[0], keyDeserializer, valueDeserializer, telemetry, wrappedHandler)",
KafkaClassNames.kafkaAssignConsumerContainer, configPath, consumerName)
nextControlFlow("else")
addStatement("return %T(%S, config, keyDeserializer, valueDeserializer, wrappedHandler, telemetry, rebalanceListener)",
KafkaClassNames.kafkaSubscribeConsumerContainer, consumerName)
addStatement("return %T(%S, %S, config, keyDeserializer, valueDeserializer, wrappedHandler, telemetry, rebalanceListener)",
KafkaClassNames.kafkaSubscribeConsumerContainer, configPath, consumerName)
}
return funBuilder.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,11 @@ class KafkaPublisherGenerator(val env: SymbolProcessorEnvironment, val resolver:
val b = classDeclaration.extendsKeepAop(implementationName, resolver)
.generated(KafkaPublisherSymbolProcessor::class)
.superclass(KafkaClassNames.abstractPublisher)
.addSuperclassConstructorParameter("%S", configPath)
.addSuperclassConstructorParameter("%S", classDeclaration.qualifiedName!!.asString())
.addSuperclassConstructorParameter("driverProperties")
.addSuperclassConstructorParameter("telemetryConfig")
.addSuperclassConstructorParameter("telemetryFactory.get(%S, telemetryConfig, driverProperties)", configPath)
.addSuperclassConstructorParameter("telemetryFactory.get(%S, %S, telemetryConfig, driverProperties)", configPath, classDeclaration.qualifiedName!!.asString())
.apply { topicConfig?.let { addProperty(PropertySpec.builder("topicConfig", it, KModifier.PRIVATE, KModifier.FINAL).initializer("topicConfig").build()) } }

val constructorBuilder = FunSpec.constructorBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,27 @@
package io.koraframework.kafka.common;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import org.jspecify.annotations.Nullable;
import io.koraframework.common.DefaultComponent;
import io.koraframework.kafka.common.consumer.telemetry.DefaultKafkaConsumerMetricsFactory;
import io.koraframework.kafka.common.consumer.telemetry.DefaultKafkaConsumerTelemetryFactory;
import io.koraframework.kafka.common.producer.telemetry.DefaultKafkaPublisherMetricsFactory;
import io.koraframework.kafka.common.producer.telemetry.DefaultKafkaPublisherTelemetryFactory;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentelemetry.api.trace.Tracer;
import org.jspecify.annotations.Nullable;

public interface KafkaModule extends KafkaDeserializersModule, KafkaSerializersModule {
@DefaultComponent
default DefaultKafkaConsumerTelemetryFactory defaultKafkaConsumerTelemetryFactory(@Nullable Tracer tracer, @Nullable MeterRegistry meterRegistry) {
if (tracer == null) {
tracer = TracerProvider.noop().get("kafkaPublisherTelemetry");
}
if (meterRegistry == null) {
meterRegistry = new CompositeMeterRegistry();
}

return new DefaultKafkaConsumerTelemetryFactory(tracer, meterRegistry);
// Default wiring for consumer telemetry. All three collaborators are optional;
// null handling now appears to live inside DefaultKafkaConsumerTelemetryFactory
// rather than here — NOTE(review): confirm the factory tolerates all-null args.
@DefaultComponent
default DefaultKafkaConsumerTelemetryFactory defaultKafkaConsumerTelemetryFactory(@Nullable Tracer tracer,
@Nullable MeterRegistry meterRegistry,
@Nullable DefaultKafkaConsumerMetricsFactory metricsFactory) {
return new DefaultKafkaConsumerTelemetryFactory(tracer, meterRegistry, metricsFactory);
}

@DefaultComponent
default DefaultKafkaPublisherTelemetryFactory defaultKafkaProducerTelemetryFactory(@Nullable Tracer tracer, @Nullable MeterRegistry meterRegistry) {
if (tracer == null) {
tracer = TracerProvider.noop().get("kafkaPublisherTelemetry");
}
if (meterRegistry == null) {
meterRegistry = new CompositeMeterRegistry();
}
return new DefaultKafkaPublisherTelemetryFactory(tracer, meterRegistry);
// Default wiring for publisher telemetry; mirrors the consumer factory above in
// structure. All collaborators are nullable — NOTE(review): confirm
// DefaultKafkaPublisherTelemetryFactory handles null tracer/registry/metrics.
default DefaultKafkaPublisherTelemetryFactory defaultKafkaPublisherTelemetryFactory(@Nullable Tracer tracer,
@Nullable MeterRegistry meterRegistry,
@Nullable DefaultKafkaPublisherMetricsFactory metricsFactory) {
return new DefaultKafkaPublisherTelemetryFactory(tracer, meterRegistry, metricsFactory);
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,15 @@
package io.koraframework.kafka.common;

import org.apache.kafka.clients.CommonClientConfigs;
import org.jspecify.annotations.NullMarked;
import io.koraframework.kafka.common.consumer.KafkaListenerConfig;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@NullMarked
public final class KafkaUtils {

private KafkaUtils() {}

// Derives a human-readable identifier for a consumer from its config, in
// priority order: group.id, then topic list, then topic pattern, then
// partition list, falling back to "unknown" when none is configured.
public static String getConsumerPrefix(KafkaListenerConfig config) {
// group.id is stored as an Object in driver properties; stringify whatever is there.
final Object groupId = config.driverProperties().get(CommonClientConfigs.GROUP_ID_CONFIG);
if (groupId != null) {
return groupId.toString();
}

// No group id: fall back to whichever subscription style is configured.
if (config.topics() != null) {
return String.join(";", config.topics());
} else if (config.topicsPattern() != null) {
return config.topicsPattern().toString();
} else if (config.partitions() != null) {
return String.join(";", config.partitions());
} else {
return "unknown";
}
}

public static class NamedThreadFactory implements ThreadFactory {
private static final String CONSUMER_PREFIX = "kafka-consumer-";

private static final String CONSUMER_PREFIX = "kafka-listener-";

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public interface KafkaListenerConfig {
@Nullable
List<String> partitions();

default Either<Duration, String> offset() {
return Either.right("latest");
// Starting position when consuming: either a look-back Duration (left) or a
// symbolic Offset (right). Defaults to Offset.latest, i.e. newest records only.
default Either<Duration, Offset> offset() {
return Either.right(Offset.latest);
}

default Duration pollTimeout() {
Expand All @@ -53,8 +53,15 @@ default boolean allowEmptyRecords() {
return false;
}

// Optional limit on consumer initialization time; null means no timeout —
// NOTE(review): confirm exact semantics against the consumer container.
@Nullable
Duration initializationFailTimeout();

// Telemetry (logging/metrics/tracing) configuration for this listener.
KafkaConsumerTelemetryConfig telemetry();

// Symbolic start positions; presumably mirror Kafka's auto.offset.reset
// values — TODO confirm the mapping.
enum Offset {
latest, earliest
}

default KafkaListenerConfig withDriverPropertiesOverrides(Map<String, Object> overrides) {
var props = new Properties();
props.putAll(driverProperties());
Expand All @@ -71,6 +78,7 @@ default KafkaListenerConfig withDriverPropertiesOverrides(Map<String, Object> ov
partitionRefreshInterval(),
shutdownWait(),
allowEmptyRecords(),
initializationFailTimeout(),
telemetry()
);
}
Expand Down
Loading
Loading