Skip to content

Commit ce04f2c

Browse files
committed
Fix AggregatorFunctionConfiguration for Flux race condition
Defer consumption (subscription) in the `aggregatorFunction()` from the input channel until request is done in the output channel. This way we ensure that output `FluxMessageChannel` got subscribers before `subscribeTo()` starts to emit data
1 parent 8cf5da2 commit ce04f2c

1 file changed

Lines changed: 1 addition & 3 deletions

File tree

function/spring-aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ public Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction(
7979
.map((inputMessage) -> MessageBuilder.fromMessage(inputMessage).removeHeader("kafka_consumer").build())
8080
.delaySubscription(Duration.ZERO);
8181

82-
aggregatorInputChannel.subscribeTo(messageFlux);
83-
84-
return Flux.from(this.outputChannel);
82+
return Flux.from(this.outputChannel).doOnRequest((__) -> aggregatorInputChannel.subscribeTo(messageFlux));
8583
};
8684
}
8785

0 commit comments

Comments
 (0)