Skip to content

Commit 5614f9c

Browse files
darinspiveymerlimat
authored andcommitted
[fix][broker] Race condition causes perpetual backlog on internal topics (#25572)
Signed-off-by: Darin Spivey <1874788+darinspivey@users.noreply.github.com> Co-authored-by: Darin Spivey <1874788+darinspivey@users.noreply.github.com> (cherry picked from commit 60efea1)
1 parent 0aa23bb commit 5614f9c

2 files changed

Lines changed: 86 additions & 1 deletion

File tree

pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ScheduledExecutorService;
3030
import java.util.concurrent.Semaphore;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.function.BiFunction;
3233
import java.util.function.BiPredicate;
3334
import org.apache.bookkeeper.client.BKException;
3435
import org.apache.bookkeeper.client.BookKeeper;
@@ -38,13 +39,15 @@
3839
import org.apache.pulsar.broker.ServiceConfiguration;
3940
import org.apache.pulsar.client.api.MessageId;
4041
import org.apache.pulsar.client.api.PulsarClient;
42+
import org.apache.pulsar.client.api.PulsarClientException;
4143
import org.apache.pulsar.client.api.RawMessage;
4244
import org.apache.pulsar.client.api.RawReader;
4345
import org.apache.pulsar.client.impl.MessageIdImpl;
4446
import org.apache.pulsar.client.impl.RawBatchConverter;
4547
import org.apache.pulsar.common.api.proto.MessageMetadata;
4648
import org.apache.pulsar.common.protocol.Commands;
4749
import org.apache.pulsar.common.protocol.Markers;
50+
import org.apache.pulsar.common.util.Backoff;
4851
import org.apache.pulsar.common.util.FutureUtil;
4952
import org.slf4j.Logger;
5053
import org.slf4j.LoggerFactory;
@@ -62,6 +65,9 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
6265

6366
@VisibleForTesting
6467
static Runnable injectionAfterSeekInPhaseTwo = () -> {};
68+
@VisibleForTesting
69+
static BiFunction<RawReader, MessageId, CompletableFuture<Void>> injectionPhaseTwoSeek =
70+
RawReader::seekAsync;
6571
private static final Logger log = LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
6672
protected static final int MAX_OUTSTANDING = 500;
6773
protected final Duration phaseOneLoopReadTimeout;
@@ -190,7 +196,7 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId
190196
LedgerHandle ledger) {
191197
CompletableFuture<Long> promise = new CompletableFuture<>();
192198

193-
reader.seekAsync(from).thenCompose((v) -> {
199+
phaseTwoSeekWithRetry(reader, from).thenCompose((v) -> {
194200
injectionAfterSeekInPhaseTwo.run();
195201
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
196202
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
@@ -215,6 +221,53 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId
215221
return promise;
216222
}
217223

224+
/**
225+
* Seek the compaction subscription to {@code from}, retrying on transient
226+
* {@link PulsarClientException.ConnectException}.
227+
*
228+
* <p>Server-side, {@code PersistentSubscription.resetCursorInternal} disconnects the compaction
229+
* consumer before resetting the managed cursor, then sends the success response. This races with
230+
* the client: {@code channelInactive} fires on the consumer's {@code ClientCnx} and fails the
231+
* in-flight seek future with {@code ConnectException} before the broker's success response
232+
* arrives. The seek is idempotent and the cursor is already repositioned server-side, so
233+
* retrying after a short backoff lets the client reconnect and the next seek complete normally.
234+
* Non-transient failures propagate immediately.
235+
*/
236+
private CompletableFuture<Void> phaseTwoSeekWithRetry(RawReader reader, MessageId from) {
237+
CompletableFuture<Void> promise = new CompletableFuture<>();
238+
Backoff backoff = Backoff.builder()
239+
.initialDelay(Duration.ofMillis(100))
240+
.maxBackoff(Duration.ofSeconds(1))
241+
.mandatoryStop(Duration.ofSeconds(10))
242+
.build();
243+
attemptPhaseTwoSeek(reader, from, backoff, promise);
244+
return promise;
245+
}
246+
247+
private void attemptPhaseTwoSeek(RawReader reader, MessageId from, Backoff backoff,
248+
CompletableFuture<Void> promise) {
249+
injectionPhaseTwoSeek.apply(reader, from).whenComplete((v, ex) -> {
250+
if (ex == null) {
251+
promise.complete(null);
252+
return;
253+
}
254+
Throwable cause = FutureUtil.unwrapCompletionException(ex);
255+
if (!(cause instanceof PulsarClientException.ConnectException)) {
256+
promise.completeExceptionally(cause);
257+
return;
258+
}
259+
long nextMs = backoff.next().toMillis();
260+
if (backoff.isMandatoryStopMade()) {
261+
promise.completeExceptionally(cause);
262+
return;
263+
}
264+
log.warn("Phase two seek failed transiently, will retry. topic={} from={} nextMs={}",
265+
reader.getTopic(), from, nextMs, cause);
266+
scheduler.schedule(() -> attemptPhaseTwoSeek(reader, from, backoff, promise),
267+
nextMs, TimeUnit.MILLISECONDS);
268+
});
269+
}
270+
218271
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
219272
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
220273
MessageId lastCompactedMessageId) {

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.concurrent.Executors;
5757
import java.util.concurrent.ScheduledExecutorService;
5858
import java.util.concurrent.TimeUnit;
59+
import java.util.concurrent.atomic.AtomicInteger;
5960
import java.util.concurrent.atomic.AtomicLong;
6061
import java.util.function.BiConsumer;
6162
import lombok.Cleanup;
@@ -91,6 +92,7 @@
9192
import org.apache.pulsar.client.api.ProducerBuilder;
9293
import org.apache.pulsar.client.api.PulsarClient;
9394
import org.apache.pulsar.client.api.PulsarClientException;
95+
import org.apache.pulsar.client.api.RawReader;
9496
import org.apache.pulsar.client.api.Reader;
9597
import org.apache.pulsar.client.api.Schema;
9698
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -161,6 +163,7 @@ public void cleanup() throws Exception {
161163
public void beforeMethod() throws Exception {
162164
admin.namespaces().removeRetention("my-tenant/my-ns");
163165
AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
166+
AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
164167
}
165168

166169
protected long compact(String topic) throws ExecutionException, InterruptedException {
@@ -2518,6 +2521,35 @@ public void testPhaseTwoInterruption() throws Exception {
25182521
"value-2", "key-2", "value-0", "key-2", "value-1"));
25192522
}
25202523

2524+
@Test
2525+
public void testPhaseTwoSeekRetriesOnConnectException() throws Exception {
2526+
final var topic = "persistent://my-tenant/my-ns/phase-two-seek-retry";
2527+
@Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
2528+
producer.newMessage().key("k").value("v0").send();
2529+
producer.newMessage().key("k").value("v1").send();
2530+
2531+
// Simulate the production race: PersistentSubscription.resetCursorInternal disconnects the
2532+
// consumer before responding to the seek, which fails the client's in-flight seek future
2533+
// with ConnectException. The first attempt here returns a synthetic failure without invoking
2534+
// the real seek (so nothing is in flight on the underlying ConsumerImpl); the retry calls
2535+
// seekAsync for real and the compaction proceeds.
2536+
final var attempts = new AtomicInteger(0);
2537+
AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = (reader, msgId) -> {
2538+
if (attempts.getAndIncrement() == 0) {
2539+
return FutureUtil.failedFuture(
2540+
new PulsarClientException.ConnectException("simulated disconnect during seek"));
2541+
}
2542+
return reader.seekAsync(msgId);
2543+
};
2544+
2545+
final long compactedLedgerId = compact(topic);
2546+
assertNotEquals(compactedLedgerId, -1L);
2547+
assertTrue(attempts.get() >= 2,
2548+
"Seek should have been retried at least once, attempts=" + attempts.get());
2549+
2550+
verifyReadKeyValues(topic, true, List.of("k", "v1"));
2551+
}
2552+
25212553
private void verifyReadKeyValues(String topic, boolean readCompacted, List<String> expectedKeyValues)
25222554
throws Exception {
25232555
@Cleanup final var reader = pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)

0 commit comments

Comments
 (0)