diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index c3fd950b17..67811ebdfe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -40,11 +40,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.TopicPartition; -import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import reactor.core.publisher.Mono; import org.springframework.core.MethodParameter; +import org.springframework.core.Nullness; import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.expression.Expression; @@ -883,7 +883,7 @@ protected Type determineInferredType(@Nullable Method method) { // NOSONAR compl } this.hasAckParameter |= isAck; if (isAck) { - this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null; + this.noOpAck |= Nullness.forMethodParameter(methodParameter) == Nullness.NON_NULL; } isNotConvertible |= isAck; boolean isConsumer = parameterIsType(parameterType, Consumer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index 60fd43108b..b484e3f574 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -20,6 +20,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -31,6 +33,7 @@ import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.BDDMockito.willReturn; @@ -118,7 +121,24 @@ void testMissingAck() throws NoSuchMethodException, SecurityException { .withStackTraceContaining("MANUAL"); } - public void test(Acknowledgment ack) { + @Test + void noOpAckWhenAcknowledgmentParameterIsNonNull() throws NoSuchMethodException { + KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); + Method method = getClass().getDeclaredMethod("testNonNullAck", Acknowledgment.class); + RecordMessagingMessageListenerAdapter adapter = + new RecordMessagingMessageListenerAdapter<>(this, method); + adapter.setHandlerMethod( + new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method))); + // A non-null Acknowledgment parameter must substitute a no-op ack, not fail with "No Acknowledgment available" + assertThatNoException().isThrownBy(() -> adapter.onMessage( + new ConsumerRecord<>("foo", 0, 0L, null, "foo"), null, null)); + } + + public void test(@Nullable Acknowledgment ack) { + + } + + public void testNonNullAck(@NonNull Acknowledgment ack) { }