Skip to content

Commit 807cb65

Browse files
committed
Merge branch 'cassandra-6.0' into trunk
2 parents 2b3258f + b1f30e9 commit 807cb65

27 files changed

Lines changed: 1031 additions & 120 deletions

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Allow nodetool garbagecollect to take a user defined list of SSTables (CASSANDRA-16767)
55
* Add a guardrail for misprepared statements (CASSANDRA-21139)
66
Merged from 6.0:
7+
* Move long running TCM operations to a longer timout (CASSANDRA-21453)
78
* Offline nodetool commands should not print network options in help (CASSANDRA-20876)
89
* Defer creation of the system_cluster_metadata keyspace until CMS initialization (CASSANDRA-21477)
910
* Support direct I/O for background SSTable writes (CASSANDRA-21134)

src/java/org/apache/cassandra/config/Config.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ public static Set<String> splitCommaDelimited(String src)
184184

185185
public volatile DurationSpec.LongMillisecondsBound stream_transfer_task_timeout = new DurationSpec.LongMillisecondsBound("12h");
186186

187+
/**
188+
* Timeout for the per-message window when a non-CMS node sends a TCM_COMMIT_REQ to a CMS node.
189+
* The CMS will attempt Paxos retries for (cms_await_timeout - write_request_timeout) before
190+
* returning an explicit failure, giving the sender time to reschedule before its message callback fires.
191+
*
192+
* WARNING: cms_await_timeout should be substantially larger than write_request_timeout.
193+
* A single Paxos CAS attempt can take up to (cas_contention_timeout + write_request_timeout) to
194+
* complete. If cms_await_timeout is set close to write_request_timeout the deadline reduction has
195+
* no effect and many concurrent CMS operations timing out at the same time may create a thundering herd,
196+
* all retrying against the CMS.
197+
* Default 2 minutes to match "nodetool cms initialize".
198+
*/
187199
public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new DurationSpec.LongMillisecondsBound("120000ms");
188200
public volatile int cms_default_max_retries = 10;
189201
@Deprecated(since="6.0")
@@ -193,6 +205,39 @@ public static Set<String> splitCommaDelimited(String src)
193205
public String cms_retry_delay = "50ms*attempts <= 500ms ... 100ms*attempts <= 1s,retries=10";
194206

195207
public volatile CMSCommitMemberPreferencePolicy cms_commit_member_preference_policy = CMSCommitMemberPreferencePolicy.random;
208+
/**
209+
* Controls the sender-side retry behavior for CMS commits (topology changes,
210+
* CMS reconfiguration, node registration — everything except STARTUP and schema DDL
211+
* with an explicit client deadline).
212+
*
213+
* cms_commit_timeout: Overall deadline for the commit to succeed. The sender retries
214+
* with exponential backoff until this deadline expires. Each retry sends a fresh
215+
* TCM_COMMIT_REQ to a (possibly different) CMS node. Set this longer than the
216+
* expected total time for all concurrent operations to drain through the Paxos log.
217+
*
218+
* cms_commit_retry_initial_delay: Base delay for Full Jitter exponential backoff.
219+
* Actual delay per retry = uniform_random(0, min(max_delay, initial_delay * 2^attempt)).
220+
* Higher values reduce Paxos contention at the cost of slower progress when the log
221+
* is lightly loaded. 5s is a good default — it spaces retries enough to avoid
222+
* thundering herd while still making progress within minutes.
223+
*
224+
* cms_commit_retry_max_delay: Cap on the exponential backoff. Once 2^attempt growth
225+
* exceeds this, all subsequent retries draw from uniform_random(0, max_delay).
226+
* 60s keeps retries frequent enough that a freed Paxos slot is filled within
227+
* ~30s on average, while avoiding retry storms.
228+
*
229+
* When to change:
230+
* - If bulk topology ops complete but take too long: reduce max_delay (e.g. 30s)
231+
* to retry more aggressively. Monitor CommitRetries rate for contention impact.
232+
* - If bulk topology ops fail (timeout): increase cms_commit_timeout.
233+
* - If Paxos contention is extremely high (>100 concurrent commits): increase
234+
* initial_delay to 10-15s to spread the retry wavefront.
235+
* - All three are hot-settable via JMX without restart.
236+
*/
237+
public volatile DurationSpec.LongMillisecondsBound cms_commit_timeout = new DurationSpec.LongMillisecondsBound("1h");
238+
public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound("5s");
239+
public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound("60s");
240+
196241
public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100;
197242

198243
/**

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@
125125
import org.apache.cassandra.security.SSLFactory;
126126
import org.apache.cassandra.service.CacheService.CacheType;
127127
import org.apache.cassandra.service.FileSystemOwnershipCheck;
128+
import org.apache.cassandra.service.RetryStrategy;
128129
import org.apache.cassandra.service.StartupChecks;
129130
import org.apache.cassandra.service.StorageService;
131+
import org.apache.cassandra.service.TimeoutStrategy;
130132
import org.apache.cassandra.service.accord.AccordService;
131133
import org.apache.cassandra.service.accord.api.AccordWaitStrategies;
132134
import org.apache.cassandra.service.consensus.TransactionalMode;
@@ -271,6 +273,15 @@ public class DatabaseDescriptor
271273

272274
public static volatile boolean allowUnlimitedConcurrentValidations = ALLOW_UNLIMITED_CONCURRENT_VALIDATIONS.getBoolean();
273275

276+
/**
277+
* RetryStrategy which provides exponential backoff with full jitter, for use by both CMS and non-CMS members
278+
* when submitting a Commit request. The range and increments of the backoff times are defined by
279+
* conf.cms_commit_retry_initial_delay and conf.cms_commit_retry_max_delay. Both are hot properties and so
280+
* changing either one causes this retry strategy to be reconstructed.
281+
*/
282+
private static volatile RetryStrategy cms_commit_retry_strategy;
283+
284+
274285
/**
275286
* The configuration for guardrails.
276287
*/
@@ -586,6 +597,8 @@ private static void applyAll() throws ConfigurationException
586597

587598
applyAccord();
588599

600+
applyCMS();
601+
589602
applyStartupChecks();
590603
}
591604

@@ -1399,6 +1412,25 @@ private static void applyAccord()
13991412
AccordService.applyProtocolModifiers(getAccord());
14001413
}
14011414

1415+
private static void applyCMS()
1416+
{
1417+
try
1418+
{
1419+
long initialDelayMs = conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS);
1420+
long maxDelayMs = conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS);
1421+
// range of backoff wait time starts at 0ms backing off exponentially at initialDelayMs * 2^attempts
1422+
String spec = String.format("0ms ... %dms * 2^attempts <= %dms", initialDelayMs, maxDelayMs);
1423+
logger.debug("Initializing cms_commit_retry_strategy from spec: " + spec);
1424+
cms_commit_retry_strategy = RetryStrategy.parse(spec,
1425+
TimeoutStrategy.LatencySourceFactory.none(),
1426+
RetryStrategy.randomizers.uniform());
1427+
}
1428+
catch (Exception e)
1429+
{
1430+
throw new ConfigurationException("Invalid configuration for cms_commit_retry_strategy. " + e.getMessage(), e);
1431+
}
1432+
}
1433+
14021434
public static StartupChecksConfiguration getStartupChecksConfiguration()
14031435
{
14041436
return startupChecksConfiguration;
@@ -6183,6 +6215,15 @@ public static Config.CMSCommitMemberPreferencePolicy getCmsCommitMemberPreferenc
61836215
return conf.cms_commit_member_preference_policy;
61846216
}
61856217

6218+
public static void setCmsAwaitTimeout(long timeoutInMillis)
6219+
{
6220+
if (timeoutInMillis != conf.cms_await_timeout.to(TimeUnit.MILLISECONDS))
6221+
{
6222+
logger.info("Setting cms_await_timeout to {}ms", timeoutInMillis);
6223+
conf.cms_await_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis);
6224+
}
6225+
}
6226+
61866227
public static void setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy policy)
61876228
{
61886229
conf.cms_commit_member_preference_policy = policy;
@@ -6193,6 +6234,57 @@ public static void setCmsCommitMemberPreferencePolicy(String policy)
61936234
setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy.valueOf(toLowerCaseLocalized(policy)));
61946235
}
61956236

6237+
public static DurationSpec getCmsCommitTimeout()
6238+
{
6239+
return conf.cms_commit_timeout;
6240+
}
6241+
6242+
public static void setCmsCommitTimeout(long timeoutInMillis)
6243+
{
6244+
if (timeoutInMillis != conf.cms_commit_timeout.to(TimeUnit.MILLISECONDS))
6245+
{
6246+
logger.info("Setting cms_commit_timeout to {}ms", timeoutInMillis);
6247+
conf.cms_commit_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis);
6248+
}
6249+
}
6250+
6251+
6252+
6253+
public static DurationSpec getCmsCommitRetryInitialDelay()
6254+
{
6255+
return conf.cms_commit_retry_initial_delay;
6256+
}
6257+
6258+
public static void setCmsCommitRetryInitialDelay(long delayInMillis)
6259+
{
6260+
if (delayInMillis != conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS))
6261+
{
6262+
logger.info("Setting cms_commit_retry_initial_delay to {}ms", delayInMillis);
6263+
conf.cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound(delayInMillis);
6264+
applyCMS();
6265+
}
6266+
}
6267+
6268+
public static DurationSpec getCmsCommitRetryMaxDelay()
6269+
{
6270+
return conf.cms_commit_retry_max_delay;
6271+
}
6272+
6273+
public static void setCmsCommitRetryMaxDelay(long delayInMillis)
6274+
{
6275+
if (delayInMillis != conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS))
6276+
{
6277+
logger.info("Setting cms_commit_retry_max_delay to {}ms", delayInMillis);
6278+
conf.cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound(delayInMillis);
6279+
applyCMS();
6280+
}
6281+
}
6282+
6283+
public static RetryStrategy getCmsCommitRetryStrategy()
6284+
{
6285+
return cms_commit_retry_strategy;
6286+
}
6287+
61966288
public static int getEpochAwareDebounceInFlightTrackerMaxSize()
61976289
{
61986290
return conf.epoch_aware_debounce_inflight_tracker_max_size;

src/java/org/apache/cassandra/net/Verb.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ public enum Verb
301301

302302
// transactional cluster metadata
303303
TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ),
304+
// message timeout is overridden in RemoteProcessor to cmsAwaitTimeout for non-client facing commit requests
304305
TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ),
305306
TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
306307
TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ),

src/java/org/apache/cassandra/service/RetryStrategy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ public long computeWait(int attempt, TimeUnit units)
241241
if (min > maxMinMicros)
242242
min = maxMinMicros;
243243
long max = this.max.getMicros(attempt);
244+
if (max > maxMaxMicros)
245+
max = maxMaxMicros;
244246
result = min >= max ? min : waitRandomizer.wait(min, max, attempt);
245247
}
246248

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1762,19 +1762,6 @@ public String listConsensusMigrations(@Nullable Set<String> keyspaceNames,
17621762
return pojoMapToString(snapshotAsMap, format);
17631763
}
17641764

1765-
@Override
1766-
public String getCmsCommitMemberPreferencePolicy()
1767-
{
1768-
return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name();
1769-
}
1770-
1771-
@Override
1772-
public void setCmsCommitMemberPreferencePolicy(String policy)
1773-
{
1774-
DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy);
1775-
logger.info("Set cms_commit_member_preference_policy to {}", policy);
1776-
}
1777-
17781765
public Map<String,List<Integer>> getConcurrency(List<String> stageNames)
17791766
{
17801767
Stream<Stage> stageStream = stageNames.isEmpty() ? stream(Stage.values()) : stageNames.stream().map(Stage::fromPoolName);

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,18 +1176,6 @@ Integer finishConsensusMigration(@Nonnull String keyspace,
11761176
List<String> getAccordManagedKeyspaces();
11771177
List<String> getAccordManagedTables();
11781178

1179-
/** Get the CMS commit member preference policy
1180-
*
1181-
* @return how to choose the cms member preference order for commits
1182-
*/
1183-
public String getCmsCommitMemberPreferencePolicy();
1184-
1185-
/** Update the CMS commit member preference policy
1186-
*
1187-
* @param policy see Config.CMSCommitMemberPreferencePolicy
1188-
*/
1189-
public void setCmsCommitMemberPreferencePolicy(String policy);
1190-
11911179
/** Gets the concurrency settings for processing stages*/
11921180
static class StageConcurrency implements Serializable
11931181
{

src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26+
import org.apache.cassandra.metrics.TCMMetrics;
2627
import org.apache.cassandra.tcm.log.Entry;
2728
import org.apache.cassandra.tcm.log.LocalLog;
2829
import org.apache.cassandra.tcm.log.LogState;
2930
import org.apache.cassandra.utils.FBUtilities;
3031
import org.apache.cassandra.utils.JVMStabilityInspector;
3132

33+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
3234
import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
3335
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
36+
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
3437

3538
public abstract class AbstractLocalProcessor implements Processor
3639
{
@@ -50,6 +53,9 @@ public AbstractLocalProcessor(LocalLog log)
5053
@Override
5154
public final Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown, Retry retryPolicy)
5255
{
56+
String transformStr = transform.toString(); // convert once as idempotent and used in multiple logs
57+
logger.debug("Starting local commit of {} with policy {}", transformStr, retryPolicy);
58+
long commitStart = nanoTime();
5359
while (!retryPolicy.hasExpired())
5460
{
5561
ClusterMetadata previous = log.waitForHighestConsecutive();
@@ -66,7 +72,7 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
6672
Transformation.Result result;
6773
if (!transform.eligibleToCommit(previous))
6874
{
69-
result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transform +
75+
result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transformStr +
7076
" it not supported with cluster common serialization version " + previous.directory.commonSerializationVersion +
7177
" and min/max serialization versions " + previous.directory.clusterMinVersion + "/" + previous.directory.clusterMaxVersion);
7278
}
@@ -79,29 +85,43 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
7985
// Just try to catch up to the latest distributed state.
8086
if (result.isRejected())
8187
{
82-
ClusterMetadata replayed = fetchLogAndWait(null, retryPolicy);
88+
// Use a dedicated retry policy here as the one for the commit itself may not be appropriate.
89+
// It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not
90+
// what we want here.
91+
Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries);
92+
ClusterMetadata replayed = fetchLogAndWait(null, fetchLogRetry);
8393

8494
// Retry if replay has changed the epoch, return rejection otherwise.
8595
if (!replayed.epoch.isAfter(previous.epoch))
8696
{
97+
logger.info("No epoch change after fetched latest log entries, returning rejection response");
8798
return maybeFailure(entryId,
8899
lastKnown,
89100
() -> Commit.Result.rejected(result.rejected().code, result.rejected().reason, toLogState(lastKnown)));
90101
}
91102

103+
logger.info("Fetched latest log entries after transformation rejection, re-entering " +
104+
"commit retry loop with {}ms remaining until deadline",
105+
NANOSECONDS.toMillis(retryPolicy.remainingNanos()));
92106
continue;
93107
}
94108

95109
try
96110
{
97111
Epoch nextEpoch = result.success().metadata.epoch;
98112
// If metadata applies, try committing it to the log
113+
long casStart = nanoTime();
99114
boolean applied = tryCommitOne(entryId, transform, previous.epoch, nextEpoch);
115+
long casElapsedUs = NANOSECONDS.toMicros(nanoTime() - casStart);
116+
logger.debug("tryCommitOne for {} epoch {}->{}: applied={}, took {}us",
117+
transform.kind(), previous.epoch, nextEpoch, applied, casElapsedUs);
100118

101119
// Application here semantially means "succeeded in committing to the distributed log".
102120
if (applied)
103121
{
104-
logger.info("Committed {}. New epoch is {}", transform, nextEpoch);
122+
logger.info("Committed {}. New epoch is {}. Took {} attempts in {}us total.",
123+
transformStr, nextEpoch, retryPolicy.attempts(),
124+
NANOSECONDS.toMicros(nanoTime() - commitStart));
105125
log.append(new Entry(entryId, nextEpoch, new Transformation.Executed(transform, result)));
106126
log.awaitAtLeast(nextEpoch);
107127

@@ -112,8 +132,16 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
112132
{
113133
if (!retryPolicy.maybeSleep())
114134
break;
135+
136+
logger.info("Backed off after failure to commit to log, fetching latest log entries before retry");
115137
// TODO: could also add epoch from mis-application from [applied].
116-
fetchLogAndWait(null, retryPolicy);
138+
// Use a dedicated retry policy here as the one for the commit itself may not be appropriate.
139+
// It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not
140+
// what we want here.
141+
Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries);
142+
fetchLogAndWait(null, fetchLogRetry);
143+
logger.info("Fetched latest log entries, re-entering commit retry loop with {}ms remaining until deadline",
144+
NANOSECONDS.toMillis(retryPolicy.remainingNanos()));
117145
}
118146
}
119147
catch (Throwable e)
@@ -124,8 +152,12 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
124152
break;
125153
}
126154
}
127-
return Commit.Result.failed(SERVER_ERROR,
128-
String.format("Could not perform commit; policy %s gave up", retryPolicy));
155+
long remainingMillis = NANOSECONDS.toMillis(retryPolicy.remainingNanos());
156+
String failureMsg = String.format("Could not perform commit after %d attempts. Time remaining: %dms",
157+
retryPolicy.attempts(), remainingMillis);
158+
logger.debug("Commit {} failed in {}us total. {}",
159+
transformStr, NANOSECONDS.toMicros(nanoTime() - commitStart), failureMsg);
160+
return Commit.Result.failed(SERVER_ERROR, failureMsg);
129161
}
130162

131163
public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown, Supplier<Commit.Result.Failure> orElse)

0 commit comments

Comments
 (0)