Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -256,7 +258,12 @@ protected boolean replicateEntries(List<Entry> entries, final InFlightTask inFli

headersAndPayload.retain();

CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
CompletableFuture<SchemaInfo> schemaFuture;
try {
schemaFuture = getSchemaInfo(msg);
} catch (ExecutionException e) {
schemaFuture = CompletableFuture.failedFuture(e);
}
if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
/**
* Skip in flight reading tasks.
Expand All @@ -279,11 +286,18 @@ protected boolean replicateEntries(List<Entry> entries, final InFlightTask inFli
schemaFuture.whenComplete((__, e) -> {
if (e != null) {
log.warn()
.attr("backoffMs", PersistentTopic.MESSAGE_RATE_BACKOFF_MS)
.exception(e)
.log("Failed to get schema from local cluster, will try in the next loop");
topic.getBrokerService().executor().schedule(() -> {
log.info("Resume the data replication after the schema fetching done");
doRewindCursor(true);
},
PersistentTopic.MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
} else {
log.info("Resume the data replication after the schema fetching done");
doRewindCursor(true);
}
log.info("Resume the data replication after the schema fetching done");
doRewindCursor(true);
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
.attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication)
.attr("isWritable", isWritable())
.log("Pausing replication traffic");
} else if (waitForCursorRewindingRefCnf > 0) {
log.debug()
.attr("reason", reasonOfWaitForCursorRewinding)
.log("Skipping read while waiting for cursor rewind");
} else {
readMoreEntries();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Answers.RETURNS_SELF;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.testng.annotations.Test;

@Test(groups = "broker-replication")
public class GeoPersistentReplicatorTest {

@Test
public void testSchemaInfoSynchronousFailureSkipsOuterReadUntilScheduledCursorRewind() throws Exception {
ThrowingSchemaReplicator replicator = new ThrowingSchemaReplicator();
replicator.forceStarted();

Position firstPosition = PositionFactory.create(1, 2);
ByteBuf headersAndPayload = newMessageWithSchemaVersion();
Entry firstEntry = newEntry(firstPosition, headersAndPayload);
Entry secondEntry = mock(Entry.class);

List<Entry> entries = List.of(firstEntry, secondEntry);
InFlightTask inFlightTask = new InFlightTask(firstPosition, entries.size(), replicator.getReplicatorId());

try {
replicator.readEntriesComplete(entries, inFlightTask);

assertThat(inFlightTask.getCompletedEntries())
.as("the failed entry and skipped remaining entries should release their in-flight permits")
.isEqualTo(entries.size());
verify(firstEntry).release();
verify(secondEntry).release();
verify(replicator.cursor, never()).rewind();
verify(replicator.context.executor).schedule(any(Runnable.class),
eq((long) PersistentTopic.MESSAGE_RATE_BACKOFF_MS), eq(TimeUnit.MILLISECONDS));

Runnable scheduledRewind = replicator.context.scheduledTask.get();
assertThat(scheduledRewind).isNotNull();
assertThat(replicator.readMoreEntriesCalls).isZero();

scheduledRewind.run();
verify(replicator.cursor).rewind();
assertThat(replicator.readMoreEntriesCalls).isEqualTo(1);
} finally {
while (headersAndPayload.refCnt() > 0) {
headersAndPayload.release();
}
}
}

private static Entry newEntry(Position position, ByteBuf headersAndPayload) {
Entry entry = mock(Entry.class);
when(entry.getLength()).thenReturn(headersAndPayload.readableBytes());
when(entry.getDataBuffer()).thenReturn(headersAndPayload);
when(entry.getPosition()).thenReturn(position);
when(entry.getLedgerId()).thenReturn(position.getLedgerId());
when(entry.getEntryId()).thenReturn(position.getEntryId());
doAnswer(invocation -> {
headersAndPayload.release();
return null;
}).when(entry).release();
return entry;
}

private static ByteBuf newMessageWithSchemaVersion() {
MessageMetadata metadata = new MessageMetadata()
.setProducerName("producer")
.setSequenceId(1)
.setPublishTime(System.currentTimeMillis())
.setSchemaVersion(new byte[] { 1 });
ByteBuf payload = Unpooled.wrappedBuffer(new byte[] { 1 });
try {
return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, payload);
} finally {
payload.release();
}
}

private static class ThrowingSchemaReplicator extends GeoPersistentReplicator {

private final ReplicatorContext context;
private int readMoreEntriesCalls;

ThrowingSchemaReplicator() throws PulsarServerException {
this(new ReplicatorContext());
}

private ThrowingSchemaReplicator(ReplicatorContext context)
throws PulsarServerException {
super(context.topic, context.cursor, "local", "remote", context.brokerService,
context.replicationClient, context.replicationAdmin);
this.context = context;
}

@Override
protected void startProducer() {
// Avoid creating a real remote producer from the superclass constructor.
}

@Override
protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
throw new ExecutionException(new RuntimeException("injected schema provider failure"));
}

@Override
protected void readMoreEntries() {
readMoreEntriesCalls++;
}

void forceStarted() {
STATE_UPDATER.set(this, State.Started);
this.producer = mock(ProducerImpl.class);
}

private static PersistentTopic mockTopic(BrokerService brokerService) {
PersistentTopic topic = mock(PersistentTopic.class);
when(topic.getName()).thenReturn("persistent://public/default/t1");
when(topic.getBrokerService()).thenReturn(brokerService);
when(topic.getReplicatorPrefix()).thenReturn("pulsar.repl");
when(topic.getReplicatorDispatchRate()).thenReturn(null);
return topic;
}

private static ManagedCursor mockCursor() {
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("pulsar.repl.remote");
return cursor;
}

private static BrokerService mockBrokerService(EventLoopGroup executor,
AtomicReference<Runnable> scheduledTask)
throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
PulsarService pulsar = mock(PulsarService.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarClientImpl localClient = mock(PulsarClientImpl.class);
PulsarAdmin admin = mock(PulsarAdmin.class);

when(pulsar.getConfiguration()).thenReturn(config);
when(pulsar.getConfig()).thenReturn(config);
when(pulsar.getClient()).thenReturn(localClient);
when(pulsar.getAdminClient()).thenReturn(admin);
when(brokerService.pulsar()).thenReturn(pulsar);
when(brokerService.getPulsar()).thenReturn(pulsar);
when(brokerService.executor()).thenReturn(executor);
doAnswer(invocation -> {
scheduledTask.set(invocation.getArgument(0, Runnable.class));
return null;
}).when(executor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
return brokerService;
}

@SuppressWarnings("unchecked")
private static PulsarClientImpl mockReplicationClient() {
PulsarClientImpl replicationClient = mock(PulsarClientImpl.class);
ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class, RETURNS_SELF);
when(replicationClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
return replicationClient;
}
}

private static class ReplicatorContext {
private final AtomicReference<Runnable> scheduledTask;
private final EventLoopGroup executor;
private final BrokerService brokerService;
private final PersistentTopic topic;
private final ManagedCursor cursor;
private final PulsarClientImpl replicationClient;
private final PulsarAdmin replicationAdmin;

private ReplicatorContext() throws PulsarServerException {
this.scheduledTask = new AtomicReference<>();
this.executor = mock(EventLoopGroup.class);
this.brokerService = ThrowingSchemaReplicator.mockBrokerService(executor, scheduledTask);
this.topic = ThrowingSchemaReplicator.mockTopic(brokerService);
this.cursor = ThrowingSchemaReplicator.mockCursor();
this.replicationClient = ThrowingSchemaReplicator.mockReplicationClient();
this.replicationAdmin = mock(PulsarAdmin.class);
}
}
}