diff --git a/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java b/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java
new file mode 100644
index 000000000..7d7e00d46
--- /dev/null
+++ b/api/src/main/java/io/smallrye/reactive/messaging/ProcessingDecorator.java
@@ -0,0 +1,68 @@
+package io.smallrye.reactive.messaging;
+
+import jakarta.enterprise.inject.spi.Prioritized;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import io.smallrye.common.annotation.Experimental;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * SPI interface for decorating message processing at the invocation level.
+ *
+ * Implementations can decorate the {@link Uni} representing the method invocation result,
+ * enabling cross-cutting concerns such as timeouts, metrics, or circuit breaking
+ * to be applied per-message.
+ *
+ * Similar to {@link PublisherDecorator} and {@link SubscriberDecorator} which operate
+ * at the stream level, this decorator operates at the individual message processing level.
+ *
+ * The {@link #decorate(MediatorConfiguration)} method is called once per mediator during
+ * initialization. It receives the mediator configuration and returns a
+ * {@link ProcessingInterceptor} to apply per-message, or {@code null} if this decorator
+ * does not apply to the given mediator.
+ *
+ * Implementations are CDI beans discovered at startup. Multiple decorators are chained
+ * in priority order (via {@link Prioritized}).
+ */
+@Experimental("SmallRye only feature")
+public interface ProcessingDecorator extends Prioritized {
+
+ int DEFAULT_PRIORITY = 100;
+
+ /**
+ * Called once per mediator during initialization to obtain a per-message interceptor.
+ *
+ * The mediator configuration provides access to incoming channel names, acknowledgment
+ * strategy, method signature, blocking flag, and other mediator-level concerns.
+ *
+ * @param configuration the mediator configuration
+ * @return a {@link ProcessingInterceptor} to apply per-message, or {@code null} if this
+ * decorator does not apply to the given mediator
+ */
+ ProcessingInterceptor decorate(MediatorConfiguration configuration);
+
+ @Override
+ default int getPriority() {
+ return DEFAULT_PRIORITY;
+ }
+
+ /**
+ * A per-message interceptor returned by {@link ProcessingDecorator#decorate(MediatorConfiguration)}.
+ */
+ @FunctionalInterface
+ interface ProcessingInterceptor {
+ /**
+ * Intercepts the processing {@link Uni} for a single message.
+ *
+ * The returned Uni replaces the original processing Uni. Implementations must
+ * ensure that the returned Uni eventually completes (or fails) to avoid stalling
+ * the processing pipeline.
+ *
+ * @param processing the Uni representing the method invocation
+ * @param message the message being processed
+ * @return the decorated Uni
+ */
+ Uni<?> intercept(Uni<?> processing, Message<?> message);
+ }
+}
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java
index 70619b1a5..7af8e3639 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java
@@ -3,7 +3,9 @@
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
+import java.time.Duration;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
@@ -29,6 +31,8 @@
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ClientCustomizer;
+import io.smallrye.reactive.messaging.MediatorConfiguration;
+import io.smallrye.reactive.messaging.ProcessingDecorator;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
@@ -137,7 +141,7 @@
@ConnectorAttribute(name = "pooled-producer", type = "boolean", direction = Direction.OUTGOING, description = "Whether to use a pool of Kafka producers for concurrent transactions. Each transaction scope acquires a producer from the pool, enabling concurrent exactly-once processing. Requires transactional.id to be set.", defaultValue = "false")
@ConnectorAttribute(name = "pooled-producer.initial-pool-size", type = "int", direction = Direction.OUTGOING, description = "Number of Kafka producers to pre-create in the pool at startup. Respects the lazy-client setting. Defaults to 0 (lazy creation).", defaultValue = "0")
@ConnectorAttribute(name = "pooled-producer.max-pool-size", type = "int", direction = Direction.OUTGOING, description = "Maximum number of Kafka producers in the pool. When all producers are in use and the pool is exhausted, an exception is thrown. Defaults to 10.", defaultValue = "10")
-public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter {
+public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter, ProcessingDecorator {
public static final String CONNECTOR_NAME = "smallrye-kafka";
@@ -189,6 +193,7 @@ public SpanBuilder spanBuilder(final String spanName) {
private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();
private final List> shareGroupSources = new CopyOnWriteArrayList<>();
private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();
+ private final Map<String, Duration> channelProcessingTimeouts = new ConcurrentHashMap<>();
@Inject
KafkaAdminClientRegistry adminClientRegistry;
@@ -239,6 +244,9 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
return s;
});
+ // Register processing timeout for this channel (used by ProcessingTimeoutProvider SPI)
+ registerProcessingTimeout(ic);
+
if (ic.getShareGroup()) {
if (partitions > 1) {
log.warn("Share group mode does not support multiple partitions. Using single consumer.");
@@ -416,4 +424,33 @@ public List> getShareConsumers(String channel) {
.map(s -> (KafkaShareConsumer) s.getConsumer())
.toList();
}
+
+ private void registerProcessingTimeout(KafkaConnectorIncomingConfiguration ic) {
+ String channel = ic.getChannel();
+ if (ic.getShareGroup()) {
+ int maxAge = ic.getShareGroupUnprocessedRecordMaxAgeMs();
+ if (maxAge > 0) {
+ channelProcessingTimeouts.put(channel, Duration.ofMillis(maxAge));
+ }
+ } else {
+ String commitStrategy = ic.getCommitStrategy().orElse(KafkaCommitHandler.Strategy.THROTTLED.name());
+ if (KafkaCommitHandler.Strategy.THROTTLED.name().equalsIgnoreCase(commitStrategy)) {
+ int maxAge = ic.getThrottledUnprocessedRecordMaxAgeMs();
+ if (maxAge > 0) {
+ channelProcessingTimeouts.put(channel, Duration.ofMillis(maxAge));
+ }
+ }
+ }
+ }
+
+ @Override
+ public ProcessingInterceptor decorate(MediatorConfiguration configuration) {
+ for (String channel : configuration.getIncoming()) {
+ Duration timeout = channelProcessingTimeouts.get(channel);
+ if (timeout != null) {
+ return (processing, message) -> processing.ifNoItem().after(timeout).fail();
+ }
+ }
+ return null;
+ }
}
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java
index 89853806e..36a5015e6 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.java
@@ -209,7 +209,7 @@ public Uni> received(IncomingKafkaRecord
.emitOn(this::runOnContext) // Switch back to event loop
.onItem().transform(offsets -> {
OffsetAndMetadata lastCommitted = offsets.get(recordsTopicPartition);
- OffsetStore store = new OffsetStore(recordsTopicPartition, unprocessedRecordMaxAge,
+ OffsetStore store = new OffsetStore(recordsTopicPartition,
lastCommitted == null ? -1 : lastCommitted.offset() - 1);
offsetStores.put(recordsTopicPartition, store);
return store;
@@ -220,7 +220,7 @@ public Uni> received(IncomingKafkaRecord
return uni
.onItem().invoke(store -> {
- store.received(record.getOffset());
+ store.received(record);
if (timerId < 0) {
startFlushAndCheckHealthTimer();
}
@@ -307,44 +307,20 @@ private void flushAndCheckHealth(long ignored) {
if (this.unprocessedRecordMaxAge > 0) {
for (OffsetStore store : offsetStores.values()) {
- long millis = store.hasTooManyMessagesWithoutAck();
- if (millis != -1) {
- OffsetReceivedAt received = store.receivedOffsets.peek();
- if (received != null) {
- long lastOffset = store.getLastProcessedOffset();
- TooManyMessagesWithoutAckException exception = new TooManyMessagesWithoutAckException(
- store.topicPartition,
- received.offset,
- millis / 1000,
- store.receivedOffsets.size(),
- lastOffset);
- this.reportFailure.accept(exception, true);
- }
- }
+ store.checkHasTooManyMessagesWithoutAck();
}
}
}
- private static class OffsetReceivedAt {
- private final long offset;
- private final long receivedAt;
-
- private OffsetReceivedAt(long offset, long receivedAt) {
- this.offset = offset;
- this.receivedAt = receivedAt;
- }
+ private record OffsetReceivedAt(IncomingKafkaRecord<?, ?> record, long receivedAt) {
- static OffsetReceivedAt received(long offset) {
- return new OffsetReceivedAt(offset, System.currentTimeMillis());
+ OffsetReceivedAt(IncomingKafkaRecord<?, ?> record) {
+ this(record, System.currentTimeMillis());
}
- public long getOffset() {
- return offset;
- }
-
- public long getReceivedAt() {
- return receivedAt;
+ long offset() {
+ return record.getOffset();
}
}
@@ -352,26 +328,21 @@ private class OffsetStore {
private final TopicPartition topicPartition;
private final Queue<OffsetReceivedAt> receivedOffsets = new PriorityQueue<>(
- Comparator.comparingLong(OffsetReceivedAt::getOffset));
+ Comparator.comparingLong(OffsetReceivedAt::offset));
private final Set<Long> processedOffsets = new HashSet<>();
- private final int unprocessedRecordMaxAge;
private final AtomicLong unProcessedTotal = new AtomicLong();
private long lastProcessedOffset;
- OffsetStore(TopicPartition topicPartition, int unprocessedRecordMaxAge, long lastProcessedOffset) {
+ OffsetStore(TopicPartition topicPartition, long lastProcessedOffset) {
this.topicPartition = topicPartition;
- this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
log.initializeStoreAtPosition(topicPartition, lastProcessedOffset);
this.lastProcessedOffset = lastProcessedOffset;
}
- long getLastProcessedOffset() {
- return lastProcessedOffset;
- }
-
- void received(long offset) {
+ void received(IncomingKafkaRecord<?, ?> record) {
+ long offset = record.getOffset();
if (offset > lastProcessedOffset) {
- this.receivedOffsets.offer(OffsetReceivedAt.received(offset));
+ this.receivedOffsets.offer(new OffsetReceivedAt(record));
unProcessedTotal.incrementAndGet();
} else {
log.receivedOutdatedOffset(topicPartition, offset, lastProcessedOffset);
@@ -387,13 +358,13 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
long largestSequentialProcessedOffset = -1;
while (!receivedOffsets.isEmpty()) {
- if (!processedOffsets.remove(receivedOffsets.peek().getOffset())) {
+ if (!processedOffsets.remove(receivedOffsets.peek().offset())) {
break;
}
unProcessedTotal.decrementAndGet();
OffsetReceivedAt poll = receivedOffsets.poll();
if (poll != null) {
- largestSequentialProcessedOffset = poll.getOffset();
+ largestSequentialProcessedOffset = poll.offset();
}
}
@@ -411,26 +382,29 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
private void removeReceivedOffsetsFromLastProcessedOffset() {
// Remove received offset from previous assignments if any
- receivedOffsets.removeIf(o -> o.getOffset() <= lastProcessedOffset);
+ receivedOffsets.removeIf(o -> o.offset() <= lastProcessedOffset);
}
- long hasTooManyMessagesWithoutAck() {
- if (receivedOffsets.isEmpty() || !isStillAssigned()) {
- return -1;
- }
- OffsetReceivedAt peek = receivedOffsets.peek();
- if (peek == null) {
- return -1;
- }
- long elapsed = System.currentTimeMillis() - peek.getReceivedAt();
+ public void checkHasTooManyMessagesWithoutAck() {
long lag = receivedOffsets.size();
- boolean waitedTooLong = elapsed > unprocessedRecordMaxAge;
- if (waitedTooLong) {
- log.waitingForAckForTooLong(peek.getOffset(), topicPartition, elapsed / 1000, unprocessedRecordMaxAge,
- lag, lastProcessedOffset);
- return elapsed;
+ if (lag == 0 || !isStillAssigned()) {
+ return;
+ }
+ OffsetReceivedAt received = receivedOffsets.peek();
+ if (received != null) {
+ long elapsed = System.currentTimeMillis() - received.receivedAt();
+ if (elapsed > unprocessedRecordMaxAge) {
+ long elapsedSeconds = elapsed / 1000;
+ log.waitingForAckForTooLong(received.offset(), topicPartition, elapsedSeconds, unprocessedRecordMaxAge,
+ lag, lastProcessedOffset);
+ reportFailure.accept(new TooManyMessagesWithoutAckException(
+ topicPartition,
+ received.offset(),
+ elapsedSeconds,
+ lag,
+ lastProcessedOffset), true);
+ }
}
- return -1;
}
private boolean isStillAssigned() {
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java
new file mode 100644
index 000000000..e1c7848d6
--- /dev/null
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ProcessingTimeoutTest.java
@@ -0,0 +1,151 @@
+package io.smallrye.reactive.messaging.kafka.commit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
+import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+
+/**
+ * Tests that the Kafka connector's {@code ProcessingInterceptor} implementation
+ * applies a processing timeout based on {@code throttled.unprocessed-record-max-age.ms},
+ * causing stuck messages to be nacked and handled by the failure strategy.
+ */
+public class ProcessingTimeoutTest extends KafkaCompanionTestBase {
+
+ @Test
+ public void testThrottledProcessingTimeoutNacksSlowConsumer() {
+ companion.topics().createAndWait(topic, 1);
+
+ // Produce 5 messages
+ companion.produceStrings()
+ .fromRecords(IntStream.range(0, 5).boxed()
+ .map(i -> new ProducerRecord<>(topic, 0, "key", "value-" + i))
+ .toList())
+ .awaitCompletion(Duration.ofSeconds(10));
+
+ String groupId = "test-throttled-timeout-" + topic;
+ MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
+ .with("topic", topic)
+ .with("group.id", groupId)
+ .with("auto.offset.reset", "earliest")
+ .with("value.deserializer", StringDeserializer.class.getName())
+ .with("key.deserializer", StringDeserializer.class.getName())
+ .with("commit-strategy", "throttled")
+ .with("failure-strategy", "ignore")
+ .with("throttled.unprocessed-record-max-age.ms", 1000)
+ .withPrefix("");
+
+ SlowConsumer app = runApplication(config, SlowConsumer.class);
+
+ // The consumer delays processing for 60s, but the ProcessingInterceptor
+ // should timeout after 1000ms and nack each message.
+ // With failure-strategy=ignore, the nack completes the message and processing continues.
+ // All 5 messages should eventually be processed (timed out + nacked + ignored).
+ await().atMost(60, TimeUnit.SECONDS)
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> assertThat(app.getReceived()).hasSize(5));
+
+ // Verify offsets are committed for all messages
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ assertThat(companion.consumerGroups().offsets(groupId))
+ .values().extracting(OffsetAndMetadata::offset)
+ .containsOnly(5L);
+ });
+ }
+
+ @Test
+ public void testFastProcessingCompletesNormally() {
+ companion.topics().createAndWait(topic, 1);
+
+ // Produce 10 messages
+ companion.produceStrings()
+ .fromRecords(IntStream.range(0, 10).boxed()
+ .map(i -> new ProducerRecord<>(topic, 0, "key", "value-" + i))
+ .toList())
+ .awaitCompletion(Duration.ofSeconds(10));
+
+ String groupId = "test-throttled-fast-" + topic;
+ MapBasedConfig config = kafkaConfig("mp.messaging.incoming.data")
+ .with("topic", topic)
+ .with("group.id", groupId)
+ .with("auto.offset.reset", "earliest")
+ .with("value.deserializer", StringDeserializer.class.getName())
+ .with("key.deserializer", StringDeserializer.class.getName())
+ .with("commit-strategy", "throttled")
+ .with("failure-strategy", "ignore")
+ .with("throttled.unprocessed-record-max-age.ms", 5000)
+ .withPrefix("");
+
+ FastConsumer app = runApplication(config, FastConsumer.class);
+
+ // Fast processing should complete without timeouts
+ await().atMost(30, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(app.getProcessedCount()).isEqualTo(10));
+
+ // Verify offsets committed
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ assertThat(companion.consumerGroups().offsets(groupId))
+ .values().extracting(OffsetAndMetadata::offset)
+ .containsOnly(10L);
+ });
+
+ // No failures should have occurred
+ assertThat(app.getFailureCount()).isEqualTo(0);
+ }
+
+ @ApplicationScoped
+ public static class SlowConsumer {
+ private final List received = new CopyOnWriteArrayList<>();
+
+ @Incoming("data")
+ public Uni consume(String payload) {
+ received.add(payload);
+ // Simulate processing that takes much longer than the timeout
+ return Uni.createFrom().voidItem()
+ .onItem().delayIt().by(Duration.ofSeconds(60));
+ }
+
+ public List getReceived() {
+ return received;
+ }
+ }
+
+ @ApplicationScoped
+ public static class FastConsumer {
+ private final AtomicInteger processedCount = new AtomicInteger();
+ private final AtomicInteger failureCount = new AtomicInteger();
+
+ @Incoming("data")
+ public Uni consume(String payload) {
+ processedCount.incrementAndGet();
+ return Uni.createFrom().voidItem();
+ }
+
+ public int getProcessedCount() {
+ return processedCount.get();
+ }
+
+ public int getFailureCount() {
+ return failureCount.get();
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java
index a5a008fd4..d4cc17c93 100644
--- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java
+++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java
@@ -5,7 +5,9 @@
import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log;
import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -49,6 +51,7 @@ public abstract class AbstractMediator {
private Instance<MessageConverter> converters;
private Instance<KeyValueExtractor> extractors;
private int maxConcurrency;
+ private List<ProcessingDecorator.ProcessingInterceptor> processingInterceptors = Collections.emptyList();
public AbstractMediator(MediatorConfiguration configuration) {
this.configuration = configuration;
@@ -116,6 +119,17 @@ public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}
+ public void setProcessingDecorators(List<ProcessingDecorator> decorators) {
+ List<ProcessingDecorator.ProcessingInterceptor> interceptors = new ArrayList<>();
+ for (ProcessingDecorator decorator : decorators) {
+ ProcessingDecorator.ProcessingInterceptor interceptor = decorator.decorate(configuration);
+ if (interceptor != null) {
+ interceptors.add(interceptor);
+ }
+ }
+ this.processingInterceptors = interceptors;
+ }
+
public void run() {
// Do nothing by default.
}
@@ -223,6 +237,14 @@ public static Uni skipContextPropagation(Supplier> suppl
return Infrastructure.onUniCreation(new UniCreateFromDeferredSupplier<>(supplier));
}
+ @SuppressWarnings("unchecked")
+ protected <T> Uni<T> decorateProcessing(Message<?> message, Uni<T> uni) {
+ for (ProcessingDecorator.ProcessingInterceptor decorator : processingInterceptors) {
+ uni = (Uni<T>) decorator.intercept(uni, message);
+ }
+ return uni;
+ }
+
protected CompletionStage<Message<?>> getAckOrCompletion(Message<?> message) {
CompletionStage<Void> ack = message.ack();
if (ack != null) {
diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java
index e200d25be..d71cdb68d 100644
--- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java
+++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java
@@ -381,19 +381,24 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem()
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi
.onItem()
- .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message))
- .onItemOrFailure()
- .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message<?>) o, t))
- .onItem().transformToMulti(this::handleSkip));
+ .transformToMultiAndConcatenate(
+ message -> decorateProcessing(message, invokeBlocking(message, getArguments(message)))
+ .onItemOrFailure()
+ .transformToUni(
+ (o, t) -> handlePostInvocationWithMessage(message, (Message<?>) o, t))
+ .onItem().transformToMulti(this::handleSkip));
};
} else {
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi
- .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
- .onItemOrFailure()
- .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message<?>) o, t))
- .onItem().transformToMulti(this::handleSkip))
+ .onItem()
+ .transformToMulti(
+ message -> decorateProcessing(message, invokeBlocking(message, getArguments(message)))
+ .onItemOrFailure()
+ .transformToUni(
+ (o, t) -> handlePostInvocationWithMessage(message, (Message<?>) o, t))
+ .onItem().transformToMulti(this::handleSkip))
.merge(maxConcurrency());
};
}
@@ -403,7 +408,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem()
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi
.onItem().transformToMultiAndConcatenate(
- message -> invokeOnMessageContext(message, getArguments(message))
+ message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)))
.onItem().transform(o -> (Message<?>) o)
.onItemOrFailure()
.transformToUni((r, f) -> handlePostInvocationWithMessage(message, r, f))
@@ -421,21 +426,23 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem()
if (configuration.isBlockingExecutionOrdered()) {
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem()
- .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message))
- .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
- .onItem().transformToMulti(this::handleSkip));
+ .transformToMultiAndConcatenate(
+ message -> decorateProcessing(message, invokeBlocking(message, getArguments(message)))
+ .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
+ .onItem().transformToMulti(this::handleSkip));
} else {
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
- .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
- .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
- .onItem().transformToMulti(this::handleSkip))
+ .onItem().transformToMulti(
+ message -> decorateProcessing(message, invokeBlocking(message, getArguments(message)))
+ .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
+ .onItem().transformToMulti(this::handleSkip))
.merge(maxConcurrency());
}
} else {
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToMultiAndConcatenate(
- message -> invokeOnMessageContext(message, getArguments(message))
+ message -> decorateProcessing(message, invokeOnMessageContext(message, getArguments(message)))
.onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
.onItem().transformToMulti(this::handleSkip));
}
@@ -521,8 +528,8 @@ private Uni extends Message