diff --git a/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java b/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java new file mode 100644 index 000000000..7d7e00d46 --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java @@ -0,0 +1,68 @@ +package io.smallrye.reactive.messaging; + +import jakarta.enterprise.inject.spi.Prioritized; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Uni; + +/** + * SPI interface for decorating message processing at the invocation level. + *

+ * Implementations can decorate the {@link Uni} representing the method invocation result, + * enabling cross-cutting concerns such as timeouts, metrics, or circuit breaking + * to be applied per-message. + *

+ * Similar to {@link PublisherDecorator} and {@link SubscriberDecorator} which operate + * at the stream level, this decorator operates at the individual message processing level. + *

+ * The {@link #decorate(MediatorConfiguration)} method is called once per mediator during + * initialization. It receives the mediator configuration and returns a + * {@link ProcessingInterceptor} to apply per-message, or {@code null} if this decorator + * does not apply to the given mediator. + *

+ * Implementations are CDI beans discovered at startup. Multiple decorators are chained + * in priority order (via {@link Prioritized}). + */ +@Experimental("SmallRye only feature") +public interface ProcessingDecorator extends Prioritized { + + int DEFAULT_PRIORITY = 100; + + /** + * Called once per mediator during initialization to obtain a per-message interceptor. + *

+ * The mediator configuration provides access to incoming channel names, acknowledgment + * strategy, method signature, blocking flag, and other mediator-level concerns. + * + * @param configuration the mediator configuration + * @return a {@link ProcessingInterceptor} to apply per-message, or {@code null} if this + * decorator does not apply to the given mediator + */ + ProcessingInterceptor decorate(MediatorConfiguration configuration); + + @Override + default int getPriority() { + return DEFAULT_PRIORITY; + } + + /** + * A per-message interceptor returned by {@link ProcessingDecorator#decorate(MediatorConfiguration)}. + */ + @FunctionalInterface + interface ProcessingInterceptor { + /** + * Intercepts the processing {@link Uni} for a single message. + *

+ * The returned Uni replaces the original processing Uni. Implementations must + * ensure that the returned Uni eventually completes (or fails) to avoid stalling + * the processing pipeline. + * + * @param processing the Uni representing the method invocation + * @param message the message being processed + * @return the decorated Uni + */ + Uni intercept(Uni processing, Message message); + } +} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index 70619b1a5..7af8e3639 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -3,7 +3,9 @@ import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; +import java.time.Duration; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; @@ -29,6 +31,8 @@ import io.opentelemetry.api.trace.Tracer; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.ClientCustomizer; +import io.smallrye.reactive.messaging.MediatorConfiguration; +import io.smallrye.reactive.messaging.ProcessingDecorator; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; import io.smallrye.reactive.messaging.connector.InboundConnector; @@ -137,7 +141,7 @@ @ConnectorAttribute(name = "pooled-producer", type = "boolean", direction = Direction.OUTGOING, description = "Whether to use a pool of Kafka producers for concurrent transactions. Each transaction scope acquires a producer from the pool, enabling concurrent exactly-once processing. Requires transactional.id to be set.", defaultValue = "false") @ConnectorAttribute(name = "pooled-producer.initial-pool-size", type = "int", direction = Direction.OUTGOING, description = "Number of Kafka producers to pre-create in the pool at startup. Respects the lazy-client setting. Defaults to 0 (lazy creation).", defaultValue = "0") @ConnectorAttribute(name = "pooled-producer.max-pool-size", type = "int", direction = Direction.OUTGOING, description = "Maximum number of Kafka producers in the pool. When all producers are in use and the pool is exhausted, an exception is thrown. Defaults to 10.", defaultValue = "10") -public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter { +public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter, ProcessingDecorator { public static final String CONNECTOR_NAME = "smallrye-kafka"; @@ -189,6 +193,7 @@ public SpanBuilder spanBuilder(final String spanName) { private final List> sources = new CopyOnWriteArrayList<>(); private final List> shareGroupSources = new CopyOnWriteArrayList<>(); private final List sinks = new CopyOnWriteArrayList<>(); + private final Map channelProcessingTimeouts = new ConcurrentHashMap<>(); @Inject KafkaAdminClientRegistry adminClientRegistry; @@ -239,6 +244,9 @@ public Flow.Publisher> getPublisher(Config config) { return s; }); + // Register processing timeout for this channel (used by ProcessingTimeoutProvider SPI) + registerProcessingTimeout(ic); + if (ic.getShareGroup()) { if (partitions > 1) { log.warn("Share group mode does not support multiple partitions. Using single consumer."); @@ -416,4 +424,33 @@ public List> getShareConsumers(String channel) { .map(s -> (KafkaShareConsumer) s.getConsumer()) .toList(); } + + private void registerProcessingTimeout(KafkaConnectorIncomingConfiguration ic) { + String channel = ic.getChannel(); + if (ic.getShareGroup()) { + int maxAge = ic.getShareGroupUnprocessedRecordMaxAgeMs(); + if (maxAge > 0) { + channelProcessingTimeouts.put(channel, Duration.ofMillis(maxAge)); + } + } else { + String commitStrategy = ic.getCommitStrategy().orElse(KafkaCommitHandler.Strategy.THROTTLED); + if (KafkaCommitHandler.Strategy.THROTTLED.equals(commitStrategy)) { + int maxAge = ic.getThrottledUnprocessedRecordMaxAgeMs(); + if (maxAge > 0) { + channelProcessingTimeouts.put(channel, Duration.ofMillis(maxAge)); + } + } + } + } + + @Override + public ProcessingInterceptor decorate(MediatorConfiguration configuration) { + for (String channel : configuration.getIncoming()) { + Duration timeout = channelProcessingTimeouts.get(channel); + if (timeout != null) { + return (processing, message) -> processing.ifNoItem().after(timeout).fail(); + } + } + return null; + } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java index 89853806e..36a5015e6 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java @@ -209,7 +209,7 @@ public Uni> received(IncomingKafkaRecord .emitOn(this::runOnContext) // Switch back to event loop .onItem().transform(offsets -> { OffsetAndMetadata lastCommitted = offsets.get(recordsTopicPartition); - OffsetStore store = new OffsetStore(recordsTopicPartition, unprocessedRecordMaxAge, + OffsetStore store = new OffsetStore(recordsTopicPartition, lastCommitted == null ? -1 : lastCommitted.offset() - 1); offsetStores.put(recordsTopicPartition, store); return store; @@ -220,7 +220,7 @@ public Uni> received(IncomingKafkaRecord return uni .onItem().invoke(store -> { - store.received(record.getOffset()); + store.received(record); if (timerId < 0) { startFlushAndCheckHealthTimer(); } @@ -307,44 +307,20 @@ private void flushAndCheckHealth(long ignored) { if (this.unprocessedRecordMaxAge > 0) { for (OffsetStore store : offsetStores.values()) { - long millis = store.hasTooManyMessagesWithoutAck(); - if (millis != -1) { - OffsetReceivedAt received = store.receivedOffsets.peek(); - if (received != null) { - long lastOffset = store.getLastProcessedOffset(); - TooManyMessagesWithoutAckException exception = new TooManyMessagesWithoutAckException( - store.topicPartition, - received.offset, - millis / 1000, - store.receivedOffsets.size(), - lastOffset); - this.reportFailure.accept(exception, true); - } - } + store.checkHasTooManyMessagesWithoutAck(); } } } - private static class OffsetReceivedAt { - private final long offset; - private final long receivedAt; - - private OffsetReceivedAt(long offset, long receivedAt) { - this.offset = offset; - this.receivedAt = receivedAt; - } + private record OffsetReceivedAt(IncomingKafkaRecord record, long receivedAt) { - static OffsetReceivedAt received(long offset) { - return new OffsetReceivedAt(offset, System.currentTimeMillis()); + OffsetReceivedAt(IncomingKafkaRecord record) { + this(record, System.currentTimeMillis()); } - public long getOffset() { - return offset; - } - - public long getReceivedAt() { - return receivedAt; + long offset() { + return record.getOffset(); } } @@ -352,26 +328,21 @@ private class OffsetStore { private final TopicPartition topicPartition; private final Queue receivedOffsets = new PriorityQueue<>( - Comparator.comparingLong(OffsetReceivedAt::getOffset)); + Comparator.comparingLong(OffsetReceivedAt::offset)); private final Set processedOffsets = new HashSet<>(); - private final int unprocessedRecordMaxAge; private final AtomicLong unProcessedTotal = new AtomicLong(); private long lastProcessedOffset; - OffsetStore(TopicPartition topicPartition, int unprocessedRecordMaxAge, long lastProcessedOffset) { + OffsetStore(TopicPartition topicPartition, long lastProcessedOffset) { this.topicPartition = topicPartition; - this.unprocessedRecordMaxAge = unprocessedRecordMaxAge; log.initializeStoreAtPosition(topicPartition, lastProcessedOffset); this.lastProcessedOffset = lastProcessedOffset; } - long getLastProcessedOffset() { - return lastProcessedOffset; - } - - void received(long offset) { + void received(IncomingKafkaRecord record) { + long offset = record.getOffset(); if (offset > lastProcessedOffset) { - this.receivedOffsets.offer(OffsetReceivedAt.received(offset)); + this.receivedOffsets.offer(new OffsetReceivedAt(record)); unProcessedTotal.incrementAndGet(); } else { log.receivedOutdatedOffset(topicPartition, offset, lastProcessedOffset); @@ -387,13 +358,13 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() { long largestSequentialProcessedOffset = -1; while (!receivedOffsets.isEmpty()) { - if (!processedOffsets.remove(receivedOffsets.peek().getOffset())) { + if (!processedOffsets.remove(receivedOffsets.peek().offset())) { break; } unProcessedTotal.decrementAndGet(); OffsetReceivedAt poll = receivedOffsets.poll(); if (poll != null) { - largestSequentialProcessedOffset = poll.getOffset(); + largestSequentialProcessedOffset = poll.offset(); } } @@ -411,26 +382,29 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() { private void removeReceivedOffsetsFromLastProcessedOffset() { // Remove received offset from previous assignments if any - receivedOffsets.removeIf(o -> o.getOffset() <= lastProcessedOffset); + receivedOffsets.removeIf(o -> o.offset() <= lastProcessedOffset); } - long hasTooManyMessagesWithoutAck() { - if (receivedOffsets.isEmpty() || !isStillAssigned()) { - return -1; - } - OffsetReceivedAt peek = receivedOffsets.peek(); - if (peek == null) { - return -1; - } - long elapsed = System.currentTimeMillis() - peek.getReceivedAt(); + public void checkHasTooManyMessagesWithoutAck() { long lag = receivedOffsets.size(); - boolean waitedTooLong = elapsed > unprocessedRecordMaxAge; - if (waitedTooLong) { - log.waitingForAckForTooLong(peek.getOffset(), topicPartition, elapsed / 1000, unprocessedRecordMaxAge, - lag, lastProcessedOffset); - return elapsed; + if (lag == 0 || !isStillAssigned()) { + return; + } + OffsetReceivedAt received = receivedOffsets.peek(); + if (received != null) { + long elapsed = System.currentTimeMillis() - received.receivedAt(); + if (elapsed > unprocessedRecordMaxAge) { + long elapsedSeconds = elapsed / 1000; + log.waitingForAckForTooLong(received.offset(), topicPartition, elapsedSeconds, unprocessedRecordMaxAge, + lag, lastProcessedOffset); + reportFailure.accept(new TooManyMessagesWithoutAckException( + topicPartition, + received.offset(), + elapsedSeconds, + lag, + lastProcessedOffset), true); + } } - return -1; } private boolean isStillAssigned() { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java new file mode 100644 index 000000000..e1c7848d6 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java @@ -0,0 +1,151 @@ +package io.smallrye.reactive.messaging.kafka.commit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +/** + * Tests that the Kafka connector's {@code ProcessingInterceptor} implementation + * applies a processing timeout based on {@code throttled.unprocessed-record-max-age.ms}, + * causing stuck messages to be nacked and handled by the failure strategy. + */ +public class ProcessingTimeoutTest extends KafkaCompanionTestBase { + + @Test + public void testThrottledProcessingTimeoutNacksSlowConsumer() { + companion.topics().createAndWait(topic, 1); + + // Produce 5 messages + companion.produceStrings() + .fromRecords(IntStream.range(0, 5).boxed() + .map(i -> new ProducerRecord<>(topic, 0, "key", "value-" + i)) + .toList()) + .awaitCompletion(Duration.ofSeconds(10)); + + String groupId = "test-throttled-timeout-" + topic; + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data") + .with("topic", topic) + .with("group.id", groupId) + .with("auto.offset.reset", "earliest") + .with("value.deserializer", StringDeserializer.class.getName()) + .with("key.deserializer", StringDeserializer.class.getName()) + .with("commit-strategy", "throttled") + .with("failure-strategy", "ignore") + .with("throttled.unprocessed-record-max-age.ms", 1000) + .withPrefix(""); + + SlowConsumer app = runApplication(config, SlowConsumer.class); + + // The consumer delays processing for 60s, but the ProcessingInterceptor + // should timeout after 1000ms and nack each message. + // With failure-strategy=ignore, the nack completes the message and processing continues. + // All 5 messages should eventually be processed (timed out + nacked + ignored). + await().atMost(60, TimeUnit.SECONDS) + .pollInterval(Duration.ofMillis(500)) + .untilAsserted(() -> assertThat(app.getReceived()).hasSize(5)); + + // Verify offsets are committed for all messages + await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(companion.consumerGroups().offsets(groupId)) + .values().extracting(OffsetAndMetadata::offset) + .containsOnly(5L); + }); + } + + @Test + public void testFastProcessingCompletesNormally() { + companion.topics().createAndWait(topic, 1); + + // Produce 10 messages + companion.produceStrings() + .fromRecords(IntStream.range(0, 10).boxed() + .map(i -> new ProducerRecord<>(topic, 0, "key", "value-" + i)) + .toList()) + .awaitCompletion(Duration.ofSeconds(10)); + + String groupId = "test-throttled-fast-" + topic; + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data") + .with("topic", topic) + .with("group.id", groupId) + .with("auto.offset.reset", "earliest") + .with("value.deserializer", StringDeserializer.class.getName()) + .with("key.deserializer", StringDeserializer.class.getName()) + .with("commit-strategy", "throttled") + .with("failure-strategy", "ignore") + .with("throttled.unprocessed-record-max-age.ms", 5000) + .withPrefix(""); + + FastConsumer app = runApplication(config, FastConsumer.class); + + // Fast processing should complete without timeouts + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(app.getProcessedCount()).isEqualTo(10)); + + // Verify offsets committed + await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(companion.consumerGroups().offsets(groupId)) + .values().extracting(OffsetAndMetadata::offset) + .containsOnly(10L); + }); + + // No failures should have occurred + assertThat(app.getFailureCount()).isEqualTo(0); + } + + @ApplicationScoped + public static class SlowConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public Uni consume(String payload) { + received.add(payload); + // Simulate processing that takes much longer than the timeout + return Uni.createFrom().voidItem() + .onItem().delayIt().by(Duration.ofSeconds(60)); + } + + public List getReceived() { + return received; + } + } + + @ApplicationScoped + public static class FastConsumer { + private final AtomicInteger processedCount = new AtomicInteger(); + private final AtomicInteger failureCount = new AtomicInteger(); + + @Incoming("data") + public Uni consume(String payload) { + processedCount.incrementAndGet(); + return Uni.createFrom().voidItem(); + } + + public int getProcessedCount() { + return processedCount.get(); + } + + public int getFailureCount() { + return failureCount.get(); + } + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java index a5a008fd4..d4cc17c93 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java @@ -5,7 +5,9 @@ import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log; import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -49,6 +51,7 @@ public abstract class AbstractMediator { private Instance converters; private Instance extractors; private int maxConcurrency; + private List processingInterceptors = Collections.emptyList(); public AbstractMediator(MediatorConfiguration configuration) { this.configuration = configuration; @@ -116,6 +119,17 @@ public void setMaxConcurrency(int maxConcurrency) { this.maxConcurrency = maxConcurrency; } + public void setProcessingDecorators(List decorators) { + List interceptors = new ArrayList<>(); + for (ProcessingDecorator decorator : decorators) { + ProcessingDecorator.ProcessingInterceptor interceptor = decorator.decorate(configuration); + if (interceptor != null) { + interceptors.add(interceptor); + } + } + this.processingInterceptors = interceptors; + } + public void run() { // Do nothing by default. } @@ -223,6 +237,14 @@ public static Uni skipContextPropagation(Supplier> suppl return Infrastructure.onUniCreation(new UniCreateFromDeferredSupplier<>(supplier)); } + @SuppressWarnings("unchecked") + protected Uni decorateProcessing(Message message, Uni uni) { + for (ProcessingDecorator.ProcessingInterceptor decorator : processingInterceptors) { + uni = (Uni) decorator.intercept(uni, message); + } + return uni; + } + protected CompletionStage> getAckOrCompletion(Message message) { CompletionStage ack = message.ack(); if (ack != null) { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index e200d25be..d71cdb68d 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -381,19 +381,24 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi .onItem() - .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message)) - .onItemOrFailure() - .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) - .onItem().transformToMulti(this::handleSkip)); + .transformToMultiAndConcatenate( + message -> decorateProcessing(message, invokeBlocking(message, getArguments(message))) + .onItemOrFailure() + .transformToUni( + (o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) + .onItem().transformToMulti(this::handleSkip)); }; } else { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi - .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) - .onItemOrFailure() - .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) - .onItem().transformToMulti(this::handleSkip)) + .onItem() + .transformToMulti( + message -> decorateProcessing(message, invokeBlocking(message, getArguments(message))) + .onItemOrFailure() + .transformToUni( + (o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) + .onItem().transformToMulti(this::handleSkip)) .merge(maxConcurrency()); }; } @@ -403,7 +408,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message))) .onItem().transform(o -> (Message) o) .onItemOrFailure() .transformToUni((r, f) -> handlePostInvocationWithMessage(message, r, f)) @@ -421,21 +426,23 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() if (configuration.isBlockingExecutionOrdered()) { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem() - .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message)) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) - .onItem().transformToMulti(this::handleSkip)); + .transformToMultiAndConcatenate( + message -> decorateProcessing(message, invokeBlocking(message, getArguments(message))) + .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) + .onItem().transformToMulti(this::handleSkip)); } else { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) - .onItem().transformToMulti(this::handleSkip)) + .onItem().transformToMulti( + message -> decorateProcessing(message, invokeBlocking(message, getArguments(message))) + .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) + .onItem().transformToMulti(this::handleSkip)) .merge(maxConcurrency()); } } else { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message))) .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) .onItem().transformToMulti(this::handleSkip)); } @@ -521,8 +528,8 @@ private Uni> handlePostInvocationWithMessage(Message MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) - .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs)) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)) + .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs))) .onItemOrFailure() .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); @@ -534,8 +541,8 @@ private void processMethodReturningACompletionStageOfMessageAndConsumingIndividu private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) - .onItem().transformToUni(u -> (Uni) u) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)) + .onItem().transformToUni(u -> (Uni) u)) .onItemOrFailure() .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); @@ -547,8 +554,8 @@ private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() { private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) - .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs)) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)) + .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs))) .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) .onItem().transformToMulti(this::handleSkip)); } @@ -559,8 +566,8 @@ private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividu private void processMethodReturningAUniOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( - message -> invokeOnMessageContext(message, getArguments(message)) - .onItem().transformToUni(u -> (Uni) u) + message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)) + .onItem().transformToUni(u -> (Uni) u)) .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) .onItem().transformToMulti(this::handleSkip)); } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java index 090d24fad..1d8f09724 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java @@ -159,13 +159,15 @@ private void processMethodReturningVoid() { if (configuration.isBlocking()) { if (configuration.isBlockingExecutionOrdered()) { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToUniAndConcatenate(msg -> invokeBlocking(msg, getArguments(msg)) + .onItem().transformToUniAndConcatenate(msg -> decorateProcessing(msg, + invokeBlocking(msg, getArguments(msg))) .onItemOrFailure().transformToUni(handleInvocationResult(msg))) .onFailure() .invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure)); } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToUni(msg -> invokeBlocking(msg, getArguments(msg)) + .onItem().transformToUni(msg -> decorateProcessing(msg, + invokeBlocking(msg, getArguments(msg))) .onItemOrFailure().transformToUni(handleInvocationResult(msg))) .merge(maxConcurrency()) .onFailure() @@ -175,7 +177,7 @@ private void processMethodReturningVoid() { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem() .transformToUniAndConcatenate( - msg -> invokeOnMessageContext(msg, getArguments(msg)) + msg -> decorateProcessing(msg, invokeOnMessageContext(msg, getArguments(msg))) .onItemOrFailure().transformToUni(handleInvocationResult(msg))) .onFailure().invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure)); } @@ -217,8 +219,8 @@ private void processMethodReturningACompletionStage() { } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToUniAndConcatenate(msg -> { - Uni uni = invokeOnMessageContext(msg, getArguments(msg)) - .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs)); + Uni uni = decorateProcessing(msg, invokeOnMessageContext(msg, getArguments(msg)) + .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs))); return uni.onItemOrFailure().transformToUni(handleInvocationResult(msg)); }) .onFailure().invoke(this::reportFailure); @@ -226,7 +228,7 @@ private void processMethodReturningACompletionStage() { } private Uni> invokeBlockingAndHandleOutcome(Message msg) { - Uni uni = invokeBlocking(msg, getArguments(msg)); + Uni uni = decorateProcessing(msg, invokeBlocking(msg, getArguments(msg))); return uni.onItemOrFailure().transformToUni(handleInvocationResult(msg)); } @@ -250,8 +252,8 @@ private void processMethodReturningAUni() { } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToUniAndConcatenate(msg -> { - Uni uni = invokeOnMessageContext(msg, getArguments(msg)) - .onItem().transformToUni(u -> (Uni) u); + Uni uni = decorateProcessing(msg, invokeOnMessageContext(msg, getArguments(msg)) + .onItem().transformToUni(u -> (Uni) u)); return uni.onItemOrFailure().transformToUni(handleInvocationResult(msg)); }) .onFailure().invoke(this::reportFailure); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java index 2d8858c18..884d57fa2 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java @@ -2,6 +2,7 @@ import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONCURRENCY; import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONFIG_PREFIX; +import static io.smallrye.reactive.messaging.providers.helpers.CDIUtils.getSortedInstances; import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log; import java.lang.reflect.Constructor; @@ -9,6 +10,7 @@ import java.util.*; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.spi.*; import jakarta.inject.Inject; @@ -88,6 +90,10 @@ public class MediatorManager { @Inject Instance configInstance; + @Inject + @Any + Instance processingDecorators; + public void analyze(AnnotatedType annotatedType, Bean bean) { if (strictMode) { @@ -215,6 +221,7 @@ public AbstractMediator createMediator(MediatorConfiguration configuration) { mediator.setHealth(health); mediator.setWorkerPoolRegistry(workerPoolRegistry); mediator.setMaxConcurrency(getWorkerMaxConcurrency(configuration)); + mediator.setProcessingDecorators(getSortedInstances(processingDecorators)); try { Object beanInstance = beanManager.getReference(configuration.getBean(), Object.class, diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/interceptor/ProcessingInterceptorTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/interceptor/ProcessingInterceptorTest.java new file mode 100644 index 000000000..f9c25aceb --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/interceptor/ProcessingInterceptorTest.java @@ -0,0 +1,155 @@ +package io.smallrye.reactive.messaging.interceptor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.MediatorConfiguration; +import io.smallrye.reactive.messaging.ProcessingDecorator; +import io.smallrye.reactive.messaging.WeldTestBase; + +public class ProcessingInterceptorTest extends WeldTestBase { + + @Test + public void testInterceptorAppliesTimeoutAndNacks() { + addBeanClass(TimeoutInterceptor.class, SlowSubscriberBean.class, NackTrackingSource.class); + initialize(); + + NackTrackingSource source = container.select(NackTrackingSource.class).get(); + + // The interceptor applies a 200ms timeout, processing takes 2s, + // so messages should be nacked (POST_PROCESSING ack). + // All 5 messages should eventually be nacked since they all time out. + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(source.getNacks()).hasSize(5)); + + // Verify nack reason is a TimeoutException + assertThat(source.getNackReasons().get(0)).isInstanceOf(io.smallrye.mutiny.TimeoutException.class); + } + + @Test + public void testInterceptorPassesThroughFastProcessing() { + addBeanClass(TimeoutInterceptor.class, FastSubscriberBean.class, NackTrackingSource.class); + initialize(); + + FastSubscriberBean bean = container.select(FastSubscriberBean.class).get(); + NackTrackingSource source = container.select(NackTrackingSource.class).get(); + + // Processing is fast, so all messages should complete normally + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(bean.getProcessedCount()).isEqualTo(5)); + + assertThat(source.getNacks()).isEmpty(); + assertThat(source.getAcks()).hasSize(5); + } + + @Test + public void testInterceptorOnlyAppliesToMatchingChannels() { + addBeanClass(ChannelFilteringInterceptor.class, FastSubscriberBean.class, NackTrackingSource.class); + initialize(); + + FastSubscriberBean bean = container.select(FastSubscriberBean.class).get(); + NackTrackingSource source = container.select(NackTrackingSource.class).get(); + + // The interceptor only applies to "other-channel", not "data", + // so processing should complete normally + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(bean.getProcessedCount()).isEqualTo(5)); + + assertThat(source.getNacks()).isEmpty(); + } + + @ApplicationScoped + public static class TimeoutInterceptor implements ProcessingDecorator { + @Override + public ProcessingInterceptor decorate(MediatorConfiguration configuration) { + return (processing, message) -> processing.ifNoItem().after(Duration.ofMillis(200)).fail(); + } + } + + @ApplicationScoped + public static class ChannelFilteringInterceptor implements ProcessingDecorator { + @Override + public ProcessingInterceptor decorate(MediatorConfiguration configuration) { + if (configuration.getIncoming().contains("other-channel")) { + return (processing, message) -> processing.ifNoItem().after(Duration.ofMillis(1)).fail(); + } + return null; + } + } + + @ApplicationScoped + public static class NackTrackingSource { + private final List acks = new CopyOnWriteArrayList<>(); + private final List nacks = new CopyOnWriteArrayList<>(); + private final List nackReasons = new CopyOnWriteArrayList<>(); + + @Outgoing("data") + public Flow.Publisher> source() { + return Multi.createFrom().range(0, 5) + .map(i -> Message.of(i, + () -> { + acks.add(i); + return CompletableFuture.completedFuture(null); + }, + t -> { + nacks.add(i); + nackReasons.add(t); + return CompletableFuture.completedFuture(null); + })); + } + + public List getAcks() { + return acks; + } + + public List getNacks() { + return nacks; + } + + public List getNackReasons() { + return nackReasons; + } + } + + @ApplicationScoped + public static class SlowSubscriberBean { + @Incoming("data") + public Uni consume(int payload) { + // Simulate slow processing (longer than 200ms timeout) + return Uni.createFrom().voidItem() + .onItem().delayIt().by(Duration.ofSeconds(2)); + } + } + + @ApplicationScoped + public static class FastSubscriberBean { + private final AtomicInteger processedCount = new AtomicInteger(); + + @Incoming("data") + public CompletionStage consume(int payload) { + processedCount.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + public int getProcessedCount() { + return processedCount.get(); + } + } +}