diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 84e936256a868..d09cb9f70bccb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.PulsarServerException; @@ -256,7 +258,12 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli headersAndPayload.retain(); - CompletableFuture schemaFuture = getSchemaInfo(msg); + CompletableFuture schemaFuture; + try { + schemaFuture = getSchemaInfo(msg); + } catch (ExecutionException e) { + schemaFuture = CompletableFuture.failedFuture(e); + } if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) { /** * Skip in flight reading tasks. @@ -279,11 +286,18 @@ protected boolean replicateEntries(List entries, final InFlightTask inFli schemaFuture.whenComplete((__, e) -> { if (e != null) { log.warn() + .attr("backoffMs", PersistentTopic.MESSAGE_RATE_BACKOFF_MS) .exception(e) .log("Failed to get schema from local cluster, will try in the next loop"); + topic.getBrokerService().executor().schedule(() -> { + log.info("Resume the data replication after the schema fetching done"); + doRewindCursor(true); + }, + PersistentTopic.MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); + } else { + log.info("Resume the data replication after the schema fetching done"); + doRewindCursor(true); } - log.info("Resume the data replication after the schema fetching done"); - doRewindCursor(true); }); } else { msg.setSchemaInfoForReplicator(schemaFuture.get()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index e3d2ec7115006..2a9de2d4c5f2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -433,6 +433,10 @@ public void readEntriesComplete(List entries, Object ctx) { .attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication) .attr("isWritable", isWritable()) .log("Pausing replication traffic"); + } else if (waitForCursorRewindingRefCnf > 0) { + log.debug() + .attr("reason", reasonOfWaitForCursorRewinding) + .log("Skipping read while waiting for cursor rewind"); } else { readMoreEntries(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicatorTest.java new file mode 100644 index 0000000000000..731c6dde77866 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicatorTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Answers.RETURNS_SELF; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.testng.annotations.Test; + +@Test(groups = "broker-replication") +public class GeoPersistentReplicatorTest { + + @Test + public void testSchemaInfoSynchronousFailureSkipsOuterReadUntilScheduledCursorRewind() throws Exception { + ThrowingSchemaReplicator replicator = new ThrowingSchemaReplicator(); + replicator.forceStarted(); + + Position firstPosition = PositionFactory.create(1, 2); + ByteBuf headersAndPayload = newMessageWithSchemaVersion(); + Entry firstEntry = newEntry(firstPosition, headersAndPayload); + Entry secondEntry = mock(Entry.class); + + List entries = List.of(firstEntry, secondEntry); + InFlightTask inFlightTask = new InFlightTask(firstPosition, entries.size(), replicator.getReplicatorId()); + + try { + replicator.readEntriesComplete(entries, inFlightTask); + + assertThat(inFlightTask.getCompletedEntries()) + .as("the failed entry and skipped remaining entries should release their in-flight permits") + .isEqualTo(entries.size()); + verify(firstEntry).release(); + verify(secondEntry).release(); + verify(replicator.cursor, never()).rewind(); + verify(replicator.context.executor).schedule(any(Runnable.class), + eq((long) PersistentTopic.MESSAGE_RATE_BACKOFF_MS), eq(TimeUnit.MILLISECONDS)); + + Runnable scheduledRewind = replicator.context.scheduledTask.get(); + assertThat(scheduledRewind).isNotNull(); + assertThat(replicator.readMoreEntriesCalls).isZero(); + + scheduledRewind.run(); + verify(replicator.cursor).rewind(); + assertThat(replicator.readMoreEntriesCalls).isEqualTo(1); + } finally { + while (headersAndPayload.refCnt() > 0) { + headersAndPayload.release(); + } + } + } + + private static Entry newEntry(Position position, ByteBuf headersAndPayload) { + Entry entry = mock(Entry.class); + when(entry.getLength()).thenReturn(headersAndPayload.readableBytes()); + when(entry.getDataBuffer()).thenReturn(headersAndPayload); + when(entry.getPosition()).thenReturn(position); + when(entry.getLedgerId()).thenReturn(position.getLedgerId()); + when(entry.getEntryId()).thenReturn(position.getEntryId()); + doAnswer(invocation -> { + headersAndPayload.release(); + return null; + }).when(entry).release(); + return entry; + } + + private static ByteBuf newMessageWithSchemaVersion() { + MessageMetadata metadata = new MessageMetadata() + .setProducerName("producer") + .setSequenceId(1) + .setPublishTime(System.currentTimeMillis()) + .setSchemaVersion(new byte[] { 1 }); + ByteBuf payload = Unpooled.wrappedBuffer(new byte[] { 1 }); + try { + return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, payload); + } finally { + payload.release(); + } + } + + private static class ThrowingSchemaReplicator extends GeoPersistentReplicator { + + private final ReplicatorContext context; + private int readMoreEntriesCalls; + + ThrowingSchemaReplicator() throws PulsarServerException { + this(new ReplicatorContext()); + } + + private ThrowingSchemaReplicator(ReplicatorContext context) + throws PulsarServerException { + super(context.topic, context.cursor, "local", "remote", context.brokerService, + context.replicationClient, context.replicationAdmin); + this.context = context; + } + + @Override + protected void startProducer() { + // Avoid creating a real remote producer from the superclass constructor. + } + + @Override + protected CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { + throw new ExecutionException(new RuntimeException("injected schema provider failure")); + } + + @Override + protected void readMoreEntries() { + readMoreEntriesCalls++; + } + + void forceStarted() { + STATE_UPDATER.set(this, State.Started); + this.producer = mock(ProducerImpl.class); + } + + private static PersistentTopic mockTopic(BrokerService brokerService) { + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + when(topic.getBrokerService()).thenReturn(brokerService); + when(topic.getReplicatorPrefix()).thenReturn("pulsar.repl"); + when(topic.getReplicatorDispatchRate()).thenReturn(null); + return topic; + } + + private static ManagedCursor mockCursor() { + ManagedCursor cursor = mock(ManagedCursor.class); + when(cursor.getName()).thenReturn("pulsar.repl.remote"); + return cursor; + } + + private static BrokerService mockBrokerService(EventLoopGroup executor, + AtomicReference scheduledTask) + throws PulsarServerException { + ServiceConfiguration config = new ServiceConfiguration(); + PulsarService pulsar = mock(PulsarService.class); + BrokerService brokerService = mock(BrokerService.class); + PulsarClientImpl localClient = mock(PulsarClientImpl.class); + PulsarAdmin admin = mock(PulsarAdmin.class); + + when(pulsar.getConfiguration()).thenReturn(config); + when(pulsar.getConfig()).thenReturn(config); + when(pulsar.getClient()).thenReturn(localClient); + when(pulsar.getAdminClient()).thenReturn(admin); + when(brokerService.pulsar()).thenReturn(pulsar); + when(brokerService.getPulsar()).thenReturn(pulsar); + when(brokerService.executor()).thenReturn(executor); + doAnswer(invocation -> { + scheduledTask.set(invocation.getArgument(0, Runnable.class)); + return null; + }).when(executor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + return brokerService; + } + + @SuppressWarnings("unchecked") + private static PulsarClientImpl mockReplicationClient() { + PulsarClientImpl replicationClient = mock(PulsarClientImpl.class); + ProducerBuilder producerBuilder = mock(ProducerBuilder.class, RETURNS_SELF); + when(replicationClient.newProducer(any(Schema.class))).thenReturn(producerBuilder); + return replicationClient; + } + } + + private static class ReplicatorContext { + private final AtomicReference scheduledTask; + private final EventLoopGroup executor; + private final BrokerService brokerService; + private final PersistentTopic topic; + private final ManagedCursor cursor; + private final PulsarClientImpl replicationClient; + private final PulsarAdmin replicationAdmin; + + private ReplicatorContext() throws PulsarServerException { + this.scheduledTask = new AtomicReference<>(); + this.executor = mock(EventLoopGroup.class); + this.brokerService = ThrowingSchemaReplicator.mockBrokerService(executor, scheduledTask); + this.topic = ThrowingSchemaReplicator.mockTopic(brokerService); + this.cursor = ThrowingSchemaReplicator.mockCursor(); + this.replicationClient = ThrowingSchemaReplicator.mockReplicationClient(); + this.replicationAdmin = mock(PulsarAdmin.class); + } + } +}