Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,29 @@ private void logInternal(
return;
}

String message = messageSupplier.get();

var msgKey = new LogThrottleKey(throwable, throttleKey == null ? message : throttleKey);
String message = null;
LogThrottleKey msgKey = null;

while (true) {
if (msgKey == null) {
if (throttleKey == null) {
message = messageSupplier.get();
msgKey = new LogThrottleKey(throwable, message);
} else {
msgKey = new LogThrottleKey(throwable, throttleKey);
}
}

Long loggedTs = messagesMap.get(msgKey);

long curTs = FastTimestamps.coarseCurrentTimeMillis();

if (loggedTs == null || curTs - loggedTs >= throttleIntervalMs) {
if (replace(msgKey, loggedTs, curTs)) {
if (message == null) {
message = messageSupplier.get();
}

if (throwable == null) {
delegate.log(level, message);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,17 @@ boolean updateKnownLeaderAndTerm(@Nullable Peer leader, long term) {

synchronized (mutex) {
if (stopped) {
LOG.debug("Ignoring leader update after stop [leader={}, term={}]", leader, term);
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring leader update after stop [leader={}, term={}]", leader, term);
}
return false;
}

// Ignore stale term notifications.
if (term <= currentTerm) {
LOG.debug("Ignoring stale leader [newTerm={}, currentTerm={}]", term, currentTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring stale leader [newTerm={}, currentTerm={}]", term, currentTerm);
}
return false;
}

Expand All @@ -147,8 +151,10 @@ boolean updateKnownLeaderAndTerm(@Nullable Peer leader, long term) {
futureToComplete = waiters;
}

LOG.debug("Leader updated [leader={}, term={}, previousTerm={}, stateChange={}->{}]",
leader, term, previousTerm, previousState, currentState);
if (LOG.isDebugEnabled()) {
LOG.debug("Leader updated [leader={}, term={}, previousTerm={}, stateChange={}->{}]",
leader, term, previousTerm, previousState, currentState);
}
}

// Complete outside the lock to avoid potential deadlocks with future callbacks.
Expand Down Expand Up @@ -212,8 +218,10 @@ void resetLeaderState() {
currentState = State.WAITING_FOR_LEADER;
waiters = new CompletableFuture<>();

LOG.debug("Leader state reset [previousTerm={}, stateChange={}->{}]",
previousTerm, previousState, currentState);
if (LOG.isDebugEnabled()) {
LOG.debug("Leader state reset [previousTerm={}, stateChange={}->{}]",
previousTerm, previousState, currentState);
}
}
}

Expand Down Expand Up @@ -243,8 +251,10 @@ void onGroupUnavailable(long termWhenDetected) {
currentState = State.WAITING_FOR_LEADER;
waiters = new CompletableFuture<>();

LOG.debug("Group unavailable [term={}, stateChange={}->{}]",
termWhenDetected, previousState, currentState);
if (LOG.isDebugEnabled()) {
LOG.debug("Group unavailable [term={}, stateChange={}->{}]",
termWhenDetected, previousState, currentState);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ private void requestLeaderManually(
}

if (leaderWithTerm == null || leaderWithTerm.leader() == null) {
LOG.debug("No leader information available [grp={}].", groupId());
if (LOG.isDebugEnabled()) {
LOG.debug("No leader information available [grp={}].", groupId());
}
return;
}

Expand Down Expand Up @@ -309,11 +311,17 @@ private void finishSubscriptions() {
CompletableFutures.allOf(futures).get(SUBSCRIPTION_CLEANUP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Interrupted while waiting for subscription cleanup [grp={}].", groupId);
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted while waiting for subscription cleanup [grp={}].", groupId);
}
} catch (ExecutionException e) {
LOG.debug("Error during subscription cleanup [grp={}].", groupId, e.getCause());
if (LOG.isDebugEnabled()) {
LOG.debug("Error during subscription cleanup [grp={}].", groupId, e.getCause());
}
} catch (TimeoutException e) {
LOG.debug("Timeout waiting for subscription cleanup [grp={}].", groupId);
if (LOG.isDebugEnabled()) {
LOG.debug("Timeout waiting for subscription cleanup [grp={}].", groupId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,9 @@ private <R extends NetworkMessage> void sendWithRetryWaitingForLeader(
// Infinite wait mode: start a new retry phase.
// We may not have probed all peers yet (e.g., with many peers and short retry timeout),
// so we can't assume there's no leader. Start fresh and keep trying.
LOG.debug("Retry phase timeout expired with infinite deadline, starting new retry phase [groupId={}]", groupId);
if (LOG.isDebugEnabled()) {
LOG.debug("Retry phase timeout expired with infinite deadline, starting new retry phase [groupId={}]", groupId);
}
Peer initialPeer = resolveInitialPeer(targetStrategy, originalPeer);
if (initialPeer == null) {
fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId));
Expand Down Expand Up @@ -1016,7 +1018,9 @@ public void onAllPeersExhausted() {
return;
}

LOG.debug("All peers exhausted, waiting for leader [groupId={}, term={}]", groupId, termWhenStarted);
if (LOG.isDebugEnabled()) {
LOG.debug("All peers exhausted, waiting for leader [groupId={}, term={}]", groupId, termWhenStarted);
}
leaderAvailabilityState.onGroupUnavailable(termWhenStarted);
waitForLeaderAndRetry(futureInvokeResult, requestFactory, targetSelectionStrategy, originalPeer, deadline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,10 @@ private boolean allowLaunchElection() {
}

if (this.electionTimeoutCounter == 1) {
LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.",
getNodeId());
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.",
getNodeId());
}
return false;
}
}
Expand Down Expand Up @@ -1146,7 +1148,9 @@ public boolean init(final NodeOptions opts) {
}

if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
}
this.snapshotTimer.start();
}

Expand Down Expand Up @@ -1223,7 +1227,9 @@ private boolean initBallotBox() {
long lastCommittedIndex = getLastCommittedIndexOnInit();

ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex);
LOG.debug("Node {} init ballot box's lastCommittedIndex={}.", getNodeId(), lastCommittedIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} init ballot box's lastCommittedIndex={}.", getNodeId(), lastCommittedIndex);
}
return this.ballotBox.init(ballotBoxOpts);
}

Expand Down Expand Up @@ -1447,15 +1453,19 @@ private void electSelf() {
return;
}
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
}
this.electionTimer.stop();
}
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL as it begins to request_vote."));
this.state = State.STATE_CANDIDATE;
this.currTerm++;
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
}
this.voteTimer.start();
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
electSelfTerm = this.currTerm;
Expand Down Expand Up @@ -1568,15 +1578,19 @@ private void becomeLeader() {
if (peer.equals(this.serverId)) {
continue;
}
LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
}
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add a replicator [node={}, peer={}].", getNodeId(), peer);
}
}

// Start learner's replicators
for (final PeerId peer : this.conf.listLearners()) {
LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
}
if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
LOG.error("Fail to add a learner replicator [node={}, peer={}].", getNodeId(), peer);
}
Expand All @@ -1597,8 +1611,10 @@ private void becomeLeader() {

// should be in writeLock
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
wakeupCandidate);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
wakeupCandidate);
}
if (!this.state.isActive()) {
return;
}
Expand Down Expand Up @@ -1653,7 +1669,9 @@ else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
this.electionTimer.restart();
}
else {
LOG.debug("Node {} is a learner, election timer is not started.", this.getNodeId());
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} is a learner, election timer is not started.", this.getNodeId());
}
}
}

Expand Down Expand Up @@ -1722,7 +1740,9 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
State nodeState = this.state;
if (nodeState != State.STATE_LEADER) {
final Status st = cannotApplyBecauseNotLeaderStatus(nodeState);
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
}
final List<Closure> dones = tasks.stream().map(ele -> ele.done)
.filter(Objects::nonNull).collect(Collectors.toList());
Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
Expand All @@ -1740,8 +1760,10 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
final LogEntryAndClosure task = tasks.get(i);

if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
task.expectedTerm, this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
task.expectedTerm, this.currTerm);
}
if (task.done != null) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Expand Down Expand Up @@ -2807,7 +2829,9 @@ private void onCaughtUp(final PeerId peer, final long term, final long version,
if (st.getCode() == RaftError.ETIMEDOUT.getNumber()
&& Utils.monotonicMs() - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options
.getElectionTimeoutMs()) {
LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer);
}
final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version);
final long dueTime = Utils.nowMs() + this.options.getElectionTimeoutMs();
if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, caughtUp)) {
Expand Down Expand Up @@ -2913,8 +2937,10 @@ private void handleStepDownTimeout() {
this.readLock.lock();
try {
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm,
this.state);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm,
this.state);
}
return;
}
final long monotonicNowMs = Utils.monotonicMs();
Expand All @@ -2937,7 +2963,9 @@ private void handleStepDownTimeout() {
this.writeLock.lock();
try {
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
}
return;
}
final long monotonicNowMs = Utils.monotonicMs();
Expand Down Expand Up @@ -3319,8 +3347,10 @@ public void handlePreVoteResponse(final PeerId peerId, final long term, final Re
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.debug("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
response.term(), response.granted());
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
response.term(), response.granted());
}
// check granted quorum?
if (response.granted()) {
this.prevVoteCtx.grant(peerId);
Expand Down Expand Up @@ -3368,7 +3398,9 @@ public void run(final Status status) {
private void preVote() {
long preVoteTerm;
try {
LOG.debug("Node {} term {} start preVote.", getNodeId(), this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} term {} start preVote.", getNodeId(), this.currTerm);
}
if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn(
"Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
Expand Down Expand Up @@ -3454,7 +3486,9 @@ private void handleVoteTimeout() {
preVote();
}
else {
LOG.debug("Node {} term {} retry to vote self.", getNodeId(), this.currTerm);
if (LOG.isDebugEnabled()) {
LOG.debug("Node {} term {} retry to vote self.", getNodeId(), this.currTerm);
}
// unlock in electSelf
electSelf();
}
Expand Down
Loading