Skip to content

Commit 66401f2

Browse files
authored
RATIS-2387. Performance degradation after RATIS-2235 (#1337)
1 parent 2e90e01 commit 66401f2

3 files changed

Lines changed: 37 additions & 21 deletions

File tree

ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,16 @@ static void setReadLockEnabled(RaftProperties properties, boolean readLockEnable
440440
setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled);
441441
}
442442

443+
String APPEND_ENTRIES_COMPOSE_ENABLED_KEY = PREFIX + ".append-entries.compose.enabled";
444+
boolean APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT = true;
445+
static boolean appendEntriesComposeEnabled(RaftProperties properties) {
446+
return getBoolean(properties::getBoolean,
447+
APPEND_ENTRIES_COMPOSE_ENABLED_KEY, APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT, getDefaultLog());
448+
}
449+
static void setAppendEntriesComposeEnabled(RaftProperties properties, boolean enabled) {
450+
setBoolean(properties::setBoolean, APPEND_ENTRIES_COMPOSE_ENABLED_KEY, enabled);
451+
}
452+
443453
/**
444454
* Besides the open segment, the max number of segments caching log entries.
445455
*/

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import org.apache.ratis.server.RaftServerRpc;
8282
import org.apache.ratis.server.impl.LeaderElection.Phase;
8383
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
84-
import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
8584
import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
8685
import org.apache.ratis.server.leader.LeaderState.StepDownReason;
8786
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -133,7 +132,6 @@
133132
import java.util.concurrent.ThreadLocalRandom;
134133
import java.util.concurrent.TimeUnit;
135134
import java.util.concurrent.atomic.AtomicBoolean;
136-
import java.util.concurrent.atomic.AtomicReference;
137135
import java.util.function.Function;
138136
import java.util.function.Supplier;
139137
import java.util.stream.Collectors;
@@ -260,8 +258,7 @@ public long[] getFollowerMatchIndices() {
260258
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
261259
private final ThreadGroup threadGroup;
262260

263-
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
264-
private final NavigableIndices appendLogTermIndices = new NavigableIndices();
261+
private final NavigableIndices appendLogTermIndices;
265262

266263
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
267264
throws IOException {
@@ -296,7 +293,8 @@ public long[] getFollowerMatchIndices() {
296293
this.transferLeadership = new TransferLeadership(this, properties);
297294
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
298295
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
299-
this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
296+
this.appendLogTermIndices = RaftServerConfigKeys.Log.appendEntriesComposeEnabled(properties) ?
297+
new NavigableIndices() : null;
300298

301299
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
302300
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1620,7 +1618,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16201618
state.updateConfiguration(entries);
16211619
}
16221620
future.join();
1623-
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
1621+
final CompletableFuture<Void> appendFuture = entries.isEmpty()? CompletableFuture.completedFuture(null)
1622+
: appendLogTermIndices != null ? appendLogTermIndices.append(entries, this::appendLog)
16241623
: appendLog(entries);
16251624

16261625
proto.getCommitInfosList().forEach(commitInfoCache::update);
@@ -1636,7 +1635,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16361635

16371636
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16381637
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
1639-
return appendLog.whenCompleteAsync((r, t) -> {
1638+
return appendFuture.whenCompleteAsync((r, t) -> {
16401639
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
16411640
timer.stop();
16421641
}, getServerExecutor()).thenApply(v -> {
@@ -1654,16 +1653,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16541653
});
16551654
}
16561655
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
1657-
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
1658-
if (!appendLogTermIndices.append(entriesTermIndices)) {
1659-
// index already exists, return the last future
1660-
return appendLogFuture.get();
1661-
}
1662-
1663-
1664-
return appendLogFuture.updateAndGet(f -> f.thenComposeAsync(
1665-
ignored -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor))
1666-
.whenComplete((v, e) -> appendLogTermIndices.removeExisting(entriesTermIndices));
1656+
return CompletableFuture.completedFuture(null)
1657+
.thenComposeAsync(dummy -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor);
16671658
}
16681659

16691660
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1690,7 +1681,9 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
16901681
}
16911682

16921683
// Check if "previous" is contained in current state.
1693-
if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
1684+
if (previous != null
1685+
&& !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
1686+
&& !state.containsTermIndex(previous)) {
16941687
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
16951688
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
16961689
return replyNextIndex;

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
import java.util.NavigableMap;
4848
import java.util.TreeMap;
4949
import java.util.Objects;
50+
import java.util.concurrent.CompletableFuture;
5051
import java.util.concurrent.TimeUnit;
52+
import java.util.concurrent.atomic.AtomicReference;
53+
import java.util.function.Function;
5154

5255
/** Server utilities for internal use. */
5356
public final class ServerImplUtils {
@@ -119,6 +122,8 @@ Long getTerm(long index) {
119122
/** A data structure to support the {@link #contains(TermIndex)} method. */
120123
static class NavigableIndices {
121124
private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
125+
private final AtomicReference<CompletableFuture<Void>> future
126+
= new AtomicReference<>(CompletableFuture.completedFuture(null));
122127

123128
boolean contains(TermIndex ti) {
124129
final Long term = getTerm(ti.getIndex());
@@ -137,7 +142,15 @@ synchronized Long getTerm(long index) {
137142
return floorEntry.getValue().getTerm(index);
138143
}
139144

140-
synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
145+
CompletableFuture<Void> append(List<LogEntryProto> entries,
146+
Function<List<LogEntryProto>, CompletableFuture<Void>> appendLog) {
147+
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
148+
return alreadyExists(entriesTermIndices) ? future.get()
149+
: future.updateAndGet(f -> f.thenComposeAsync(ignored -> appendLog.apply(entries)))
150+
.whenComplete((v, e) -> removeExisting(entriesTermIndices));
151+
}
152+
153+
private synchronized boolean alreadyExists(List<ConsecutiveIndices> entriesTermIndices) {
141154
for(int i = 0; i < entriesTermIndices.size(); i++) {
142155
final ConsecutiveIndices indices = entriesTermIndices.get(i);
143156
final ConsecutiveIndices previous = map.put(indices.startIndex, indices);
@@ -147,10 +160,10 @@ synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
147160
for(int j = 0; j < i; j++) {
148161
map.remove(entriesTermIndices.get(j).startIndex);
149162
}
150-
return false;
163+
return true;
151164
}
152165
}
153-
return true;
166+
return false;
154167
}
155168

156169
synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {

0 commit comments

Comments
 (0)