Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.smallrye.reactive.messaging;

import jakarta.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Uni;

/**
* SPI interface for decorating message processing at the invocation level.
* <p>
* Implementations can decorate the {@link Uni} representing the method invocation result,
* enabling cross-cutting concerns such as timeouts, metrics, or circuit breaking
* to be applied per-message.
* <p>
* Similar to {@link PublisherDecorator} and {@link SubscriberDecorator} which operate
* at the stream level, this decorator operates at the individual message processing level.
* <p>
* The {@link #decorate(MediatorConfiguration)} method is called once per mediator during
* initialization. It receives the mediator configuration and returns a
* {@link ProcessingInterceptor} to apply per-message, or {@code null} if this decorator
* does not apply to the given mediator.
* <p>
* Implementations are CDI beans discovered at startup. Multiple decorators are chained
* in priority order (via {@link Prioritized}).
*/
@Experimental("SmallRye only feature")
public interface ProcessingDecorator extends Prioritized {

int DEFAULT_PRIORITY = 100;

/**
* Called once per mediator during initialization to obtain a per-message interceptor.
* <p>
* The mediator configuration provides access to incoming channel names, acknowledgment
* strategy, method signature, blocking flag, and other mediator-level concerns.
*
* @param configuration the mediator configuration
* @return a {@link ProcessingInterceptor} to apply per-message, or {@code null} if this
* decorator does not apply to the given mediator
*/
ProcessingInterceptor decorate(MediatorConfiguration configuration);

@Override
default int getPriority() {
return DEFAULT_PRIORITY;
}

/**
* A per-message interceptor returned by {@link ProcessingDecorator#decorate(MediatorConfiguration)}.
*/
@FunctionalInterface
interface ProcessingInterceptor {
/**
* Intercepts the processing {@link Uni} for a single message.
* <p>
* The returned Uni replaces the original processing Uni. Implementations must
* ensure that the returned Uni eventually completes (or fails) to avoid stalling
* the processing pipeline.
*
* @param processing the Uni representing the method invocation
* @param message the message being processed
* @return the decorated Uni
*/
Uni<?> intercept(Uni<?> processing, Message<?> message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
Expand All @@ -29,6 +31,8 @@
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.ProcessingDecorator;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
Expand Down Expand Up @@ -137,7 +141,7 @@
@ConnectorAttribute(name = "pooled-producer", type = "boolean", direction = Direction.OUTGOING, description = "Whether to use a pool of Kafka producers for concurrent transactions. Each transaction scope acquires a producer from the pool, enabling concurrent exactly-once processing. Requires transactional.id to be set.", defaultValue = "false")
@ConnectorAttribute(name = "pooled-producer.initial-pool-size", type = "int", direction = Direction.OUTGOING, description = "Number of Kafka producers to pre-create in the pool at startup. Respects the lazy-client setting. Defaults to 0 (lazy creation).", defaultValue = "0")
@ConnectorAttribute(name = "pooled-producer.max-pool-size", type = "int", direction = Direction.OUTGOING, description = "Maximum number of Kafka producers in the pool. When all producers are in use and the pool is exhausted, an exception is thrown. Defaults to 10.", defaultValue = "10")
public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter {
public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter, ProcessingDecorator {

public static final String CONNECTOR_NAME = "smallrye-kafka";

Expand Down Expand Up @@ -189,6 +193,7 @@ public SpanBuilder spanBuilder(final String spanName) {
private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();
private final List<KafkaShareGroupSource<?, ?>> shareGroupSources = new CopyOnWriteArrayList<>();
private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();
private final Map<String, Duration> channelProcessingTimeouts = new ConcurrentHashMap<>();

@Inject
KafkaAdminClientRegistry adminClientRegistry;
Expand Down Expand Up @@ -239,6 +244,9 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
return s;
});

// Register processing timeout for this channel (used by ProcessingTimeoutProvider SPI)
registerProcessingTimeout(ic);

if (ic.getShareGroup()) {
if (partitions > 1) {
log.warn("Share group mode does not support multiple partitions. Using single consumer.");
Expand Down Expand Up @@ -416,4 +424,33 @@ public <V, K> List<KafkaShareConsumer<K, V>> getShareConsumers(String channel) {
.map(s -> (KafkaShareConsumer<K, V>) s.getConsumer())
.toList();
}

/**
 * Records the per-channel processing deadline derived from the channel's
 * unprocessed-record max-age setting, for later use by {@link #decorate}.
 * <p>
 * Share-group channels use {@code share-group.unprocessed-record-max-age.ms};
 * classic channels only have a deadline under the throttled commit strategy
 * (via {@code throttled.unprocessed-record-max-age.ms}). A non-positive value
 * disables the deadline, in which case nothing is registered.
 *
 * @param ic the incoming channel configuration
 */
private void registerProcessingTimeout(KafkaConnectorIncomingConfiguration ic) {
    int maxAgeMs;
    if (ic.getShareGroup()) {
        maxAgeMs = ic.getShareGroupUnprocessedRecordMaxAgeMs();
    } else if (KafkaCommitHandler.Strategy.THROTTLED
            .equals(ic.getCommitStrategy().orElse(KafkaCommitHandler.Strategy.THROTTLED))) {
        // Throttled is also the default when no strategy is configured.
        maxAgeMs = ic.getThrottledUnprocessedRecordMaxAgeMs();
    } else {
        // Other commit strategies carry no processing deadline.
        maxAgeMs = 0;
    }
    if (maxAgeMs > 0) {
        channelProcessingTimeouts.put(ic.getChannel(), Duration.ofMillis(maxAgeMs));
    }
}

/**
 * Supplies a per-message interceptor that fails the processing {@link io.smallrye.mutiny.Uni}
 * when it produces no item within the channel's registered timeout.
 * <p>
 * The first incoming channel of the mediator that has a timeout registered (see
 * {@code registerProcessingTimeout}) determines the deadline; when none of the
 * mediator's channels have one, {@code null} is returned and no interceptor is applied.
 *
 * @param configuration the mediator configuration
 * @return a timeout-enforcing interceptor, or {@code null} when no channel has a deadline
 */
@Override
public ProcessingInterceptor decorate(MediatorConfiguration configuration) {
    return configuration.getIncoming().stream()
            .map(channelProcessingTimeouts::get)
            .filter(Objects::nonNull)
            .findFirst()
            .map(timeout -> (ProcessingInterceptor) (processing, message) -> processing
                    .ifNoItem().after(timeout).fail())
            .orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V>
.emitOn(this::runOnContext) // Switch back to event loop
.onItem().transform(offsets -> {
OffsetAndMetadata lastCommitted = offsets.get(recordsTopicPartition);
OffsetStore store = new OffsetStore(recordsTopicPartition, unprocessedRecordMaxAge,
OffsetStore store = new OffsetStore(recordsTopicPartition,
lastCommitted == null ? -1 : lastCommitted.offset() - 1);
offsetStores.put(recordsTopicPartition, store);
return store;
Expand All @@ -220,7 +220,7 @@ public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V>

return uni
.onItem().invoke(store -> {
store.received(record.getOffset());
store.received(record);
if (timerId < 0) {
startFlushAndCheckHealthTimer();
}
Expand Down Expand Up @@ -307,71 +307,42 @@ private void flushAndCheckHealth(long ignored) {

if (this.unprocessedRecordMaxAge > 0) {
for (OffsetStore store : offsetStores.values()) {
long millis = store.hasTooManyMessagesWithoutAck();
if (millis != -1) {
OffsetReceivedAt received = store.receivedOffsets.peek();
if (received != null) {
long lastOffset = store.getLastProcessedOffset();
TooManyMessagesWithoutAckException exception = new TooManyMessagesWithoutAckException(
store.topicPartition,
received.offset,
millis / 1000,
store.receivedOffsets.size(),
lastOffset);
this.reportFailure.accept(exception, true);
}
}
store.checkHasTooManyMessagesWithoutAck();
}
}

}

private static class OffsetReceivedAt {
private final long offset;
private final long receivedAt;

private OffsetReceivedAt(long offset, long receivedAt) {
this.offset = offset;
this.receivedAt = receivedAt;
}
private record OffsetReceivedAt(IncomingKafkaRecord<?, ?> record, long receivedAt) {

static OffsetReceivedAt received(long offset) {
return new OffsetReceivedAt(offset, System.currentTimeMillis());
OffsetReceivedAt(IncomingKafkaRecord<?, ?> record) {
this(record, System.currentTimeMillis());
}

public long getOffset() {
return offset;
}

public long getReceivedAt() {
return receivedAt;
long offset() {
return record.getOffset();
}
}

private class OffsetStore {

private final TopicPartition topicPartition;
private final Queue<OffsetReceivedAt> receivedOffsets = new PriorityQueue<>(
Comparator.comparingLong(OffsetReceivedAt::getOffset));
Comparator.comparingLong(OffsetReceivedAt::offset));
private final Set<Long> processedOffsets = new HashSet<>();
private final int unprocessedRecordMaxAge;
private final AtomicLong unProcessedTotal = new AtomicLong();
private long lastProcessedOffset;

OffsetStore(TopicPartition topicPartition, int unprocessedRecordMaxAge, long lastProcessedOffset) {
OffsetStore(TopicPartition topicPartition, long lastProcessedOffset) {
this.topicPartition = topicPartition;
this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
log.initializeStoreAtPosition(topicPartition, lastProcessedOffset);
this.lastProcessedOffset = lastProcessedOffset;
}

long getLastProcessedOffset() {
return lastProcessedOffset;
}

void received(long offset) {
void received(IncomingKafkaRecord<?, ?> record) {
long offset = record.getOffset();
if (offset > lastProcessedOffset) {
this.receivedOffsets.offer(OffsetReceivedAt.received(offset));
this.receivedOffsets.offer(new OffsetReceivedAt(record));
unProcessedTotal.incrementAndGet();
} else {
log.receivedOutdatedOffset(topicPartition, offset, lastProcessedOffset);
Expand All @@ -387,13 +358,13 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
long largestSequentialProcessedOffset = -1;

while (!receivedOffsets.isEmpty()) {
if (!processedOffsets.remove(receivedOffsets.peek().getOffset())) {
if (!processedOffsets.remove(receivedOffsets.peek().offset())) {
break;
}
unProcessedTotal.decrementAndGet();
OffsetReceivedAt poll = receivedOffsets.poll();
if (poll != null) {
largestSequentialProcessedOffset = poll.getOffset();
largestSequentialProcessedOffset = poll.offset();
}
}

Expand All @@ -411,26 +382,29 @@ long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {

private void removeReceivedOffsetsFromLastProcessedOffset() {
// Remove received offset from previous assignments if any
receivedOffsets.removeIf(o -> o.getOffset() <= lastProcessedOffset);
receivedOffsets.removeIf(o -> o.offset() <= lastProcessedOffset);
}

long hasTooManyMessagesWithoutAck() {
if (receivedOffsets.isEmpty() || !isStillAssigned()) {
return -1;
}
OffsetReceivedAt peek = receivedOffsets.peek();
if (peek == null) {
return -1;
}
long elapsed = System.currentTimeMillis() - peek.getReceivedAt();
public void checkHasTooManyMessagesWithoutAck() {
long lag = receivedOffsets.size();
boolean waitedTooLong = elapsed > unprocessedRecordMaxAge;
if (waitedTooLong) {
log.waitingForAckForTooLong(peek.getOffset(), topicPartition, elapsed / 1000, unprocessedRecordMaxAge,
lag, lastProcessedOffset);
return elapsed;
if (lag == 0 || !isStillAssigned()) {
return;
}
OffsetReceivedAt received = receivedOffsets.peek();
if (received != null) {
long elapsed = System.currentTimeMillis() - received.receivedAt();
if (elapsed > unprocessedRecordMaxAge) {
long elapsedSeconds = elapsed / 1000;
log.waitingForAckForTooLong(received.offset(), topicPartition, elapsedSeconds, unprocessedRecordMaxAge,
lag, lastProcessedOffset);
reportFailure.accept(new TooManyMessagesWithoutAckException(
topicPartition,
received.offset(),
elapsedSeconds,
lag,
lastProcessedOffset), true);
}
}
return -1;
}

private boolean isStillAssigned() {
Expand Down
Loading