Skip to content

Commit 636c477

Browse files
committed
Paxos support for live migration to mutation tracking
During mutation tracking migration (tracked <-> untracked), the per-range migration state must be consulted for every routing decision. Before this change, all Paxos V1 and V2 paths used the static keyspace-level replicationType().isTracked() check, which does not reflect per-range migration state and produces incorrect routing decisions during migration. Coordinator-side routing: Replace all static isTracked() checks with MigrationRouter calls across StorageProxy (commitPaxos, sendCommit, isTrackedKeyspaceRequiringPaxosCommitForwarding, checkAndForwardCasIfNeeded, checkAndForwardConsensusReadIfNeeded), PaxosCommit (constructor, isTrackedKeyspaceRequiringForwarding), PaxosCommitAndPrepare, PaxosPrepare (start + isTracked field), PaxosPrepareRefresh, and PaxosState truncation acknowledgment. Handler-side validation: Add migration state validation to four Paxos replica handlers that receive messages carrying mutations or tracked reads: PaxosCommit.RequestHandler (direct V1/V2 commits), PaxosPrepare.RequestHandler (V2 prepare with tracked read), PaxosCommitAndPrepare.RequestHandler (combined commit+prepare), and PaxosPrepareRefresh.RequestHandler (refresh commits). Each uses the conditional-fetch pattern from AbstractMutationVerbHandler.checkReplicationMigration: compare the coordinator routing decision against the handler MigrationRouter result, fetch only on mismatch when the coordinator epoch is ahead (handler is behind and needs to catch up), throw CoordinatorBehindException when the coordinator epoch is behind. Coordinator-side commit retry: Add commit-level COORDINATOR_BEHIND retry in Paxos.cas() (V2) and commitPaxos() (V1). When replicas reject a commit due to migration state mismatch, ResponseVerbHandler.maybeFetchLogs() catches up the coordinator synchronously before delivering the failure. The retry re-creates the commit with fresh MigrationRouter routing. This retries only the commit phase, not the entire prepare+propose protocol. Stale mutation ID reconciliation: Commits saved in system.paxos may have a mutation ID from when the keyspace was tracked. When replayed after migration to untracked (via PaxosPrepareRefresh, PaxosCommitAndPrepare, sendCommit, or commitPaxos), the stale ID must be stripped to avoid Keyspace.apply() rejecting the mutation. Uses Commit.withMutationId() to reconcile in all four replay paths. Forward handlers: Forwarding is harmless -- the receiving replica re-executes the full CAS/read with its own fresh routing decisions, so no migration validation is needed at the forward boundary itself. Removed the "reject if keyspace not tracked" guards from CasForwardHandler and ConsensusReadForwardHandler (forwarding is now valid in either direction). Replaced unconditional fetchLogFromPeerOrCMS with the conditional-fetch pattern in Paxos2CommitForwardHandler, PaxosCommitForwardHandler, PrepareRefreshForwardHandler, and PaxosCommitAndPrepare.RequestHandler (unconditional fetch added unnecessary latency on the no-mismatch case). PaxosCommit failure tracking: Added super.onFailure() call to PaxosCommit.onFailure() so FailureRecordingCallback.failureResponses is populated, enabling failureReasonsAsMap() to return actual failure reasons. This was required for the V2 commit retry to detect COORDINATOR_BEHIND in MaybeFailure.failures. PaxosCommit hint suppression: Tracked mutations must not be written as hints because hint replay routes through Keyspace.applyInternalTracked() based on the mutation ID presence, which fails after migration to untracked. Guard submitHint with !isTracked() -- tracked mutations use MutationTrackingService for retries, not the hint system. MigrationRouter null safety: Replace getKeyspaceMetadata() (throws NoSuchElementException on missing keyspace) with maybeGetKeyspaceMetadata().orElse(null) at four call sites so the existing null guards actually protect against concurrent keyspace drops.
1 parent a79d34a commit 636c477

27 files changed

Lines changed: 4267 additions & 414 deletions

src/java/org/apache/cassandra/exceptions/CoordinatorBehindException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public CoordinatorBehindException(String msg)
2525
super(msg);
2626
}
2727

28-
public CoordinatorBehindException(String msg, UnknownTableException cause)
28+
public CoordinatorBehindException(String msg, Throwable cause)
2929
{
3030
super(msg, cause);
3131
}

src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency,
4242
this.writeType = writeType;
4343
}
4444

45+
public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor, Throwable cause)
46+
{
47+
super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor, cause);
48+
this.writeType = writeType;
49+
}
50+
4551
@Override
4652
protected void serializeSpecificFields(DataOutputPlus out, int version) throws IOException
4753
{

src/java/org/apache/cassandra/hints/HintsService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@
7373
*/
7474
public final class HintsService implements HintsServiceMBean
7575
{
76+
private static long REJECT_HINTS_BEFORE_NANOS;
77+
78+
@VisibleForTesting
79+
public static void setRejectHintsBeforeNanos(long nanos)
80+
{
81+
REJECT_HINTS_BEFORE_NANOS = nanos;
82+
}
83+
7684
// Dummy address to use for storing metrics for hints that will be retried on a different transaction system
7785
// and aren't being sent to a specific node
7886
public static final InetAddressAndPort RETRY_ON_DIFFERENT_SYSTEM_ADDRESS;
@@ -189,6 +197,9 @@ public void write(Collection<UUID> hostIds, Hint hint)
189197
if (isShutDown)
190198
throw new IllegalStateException("HintsService is shut down and can't accept new hints");
191199

200+
if (hint.mutation.getApproxCreatedAtNanos() < REJECT_HINTS_BEFORE_NANOS)
201+
return;
202+
192203
// we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
193204
catalog.maybeLoadStores(hostIds);
194205

src/java/org/apache/cassandra/service/StorageProxy.java

Lines changed: 111 additions & 81 deletions
Large diffs are not rendered by default.

src/java/org/apache/cassandra/service/paxos/CasForwardHandler.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
/**
4141
* Handler for forwarded CAS (Compare-And-Set) operations.
4242
* Executes the CAS operation on behalf of the original coordinator,
43-
* ensuring that MutationId generation happens on a replica coordinator for tracked keyspaces.
43+
* ensuring that MutationId generation happens on a replica coordinator.
4444
*
4545
* TODO (expected): more comprehensive testing
4646
*/
@@ -60,7 +60,6 @@ public void doVerb(Message<CasForwardRequest> message)
6060
ClientWarn.instance.captureWarnings();
6161
try
6262
{
63-
// Validate keyspace exists and is tracked
6463
KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(request.keyspaceName);
6564
if (ksMetadata == null)
6665
{
@@ -69,13 +68,6 @@ public void doVerb(Message<CasForwardRequest> message)
6968
return;
7069
}
7170

72-
if (!ksMetadata.params.replicationType.isTracked())
73-
{
74-
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message);
75-
logger.error("Asked to perform forwarded CAS operation, but keyspace {} is not tracked", request.keyspaceName);
76-
return;
77-
}
78-
7971
// Execute the forwarded CAS operation
8072
logger.debug("Executing CAS operation for table {}.{} with key {}",
8173
request.keyspaceName, request.cfName, request.key);

src/java/org/apache/cassandra/service/paxos/Commit.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ public AcceptedWithTTL(Ballot ballot, PartitionUpdate update, long localDeletion
180180
this.localDeletionTime = localDeletionTime;
181181
}
182182

183+
public AcceptedWithTTL(Ballot ballot, Mutation mutation, long localDeletionTime)
184+
{
185+
super(ballot, mutation);
186+
this.localDeletionTime = localDeletionTime;
187+
}
188+
183189
boolean isExpired(long nowInSec)
184190
{
185191
return nowInSec >= localDeletionTime;
@@ -194,7 +200,7 @@ Accepted lastDeleted(Accepted b)
194200
@Override
195201
public AcceptedWithTTL withMutationId(MutationId mutationId)
196202
{
197-
return new AcceptedWithTTL(ballot, makeMutation(mutationId).getOnlyUpdate(), localDeletionTime);
203+
return new AcceptedWithTTL(ballot, makeMutation(mutationId), localDeletionTime);
198204
}
199205
}
200206

@@ -385,6 +391,12 @@ public static Commit newProposal(Ballot ballot, PartitionUpdate update)
385391
return new Commit(ballot, update);
386392
}
387393

394+
public static Commit newProposal(Ballot ballot, Mutation mutation)
395+
{
396+
PartitionUpdate update = withTimestamp(mutation.getOnlyUpdate(), ballot.unixMicros());
397+
return new Commit(ballot, new Mutation(mutation.id(), update, mutation.potentialTxnConflicts()));
398+
}
399+
388400
public boolean isAfter(Commit other)
389401
{
390402
return other == null || ballot.uuidTimestamp() > other.ballot.uuidTimestamp();
@@ -651,7 +663,7 @@ public T deserialize(DataInputPlus in, int version) throws IOException
651663
if (version >= MessagingService.VERSION_61)
652664
{
653665
// New format: deserialize Mutation
654-
Mutation mutation = org.apache.cassandra.db.Mutation.serializer.deserialize(in, version);
666+
Mutation mutation = Mutation.serializer.deserialize(in, version);
655667
return mutationConstructor.apply(ballot, mutation);
656668
}
657669
else

src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardHandler.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040

4141
/**
4242
* Handler for forwarded consensus read operations.
43-
* Executes the consensus read operation on behalf of the original coordinator,
44-
* ensuring proper coordination for tracked keyspaces on a replica coordinator.
43+
* Executes the consensus read operation on behalf of the original coordinator.
4544
*
4645
* TODO (expected): more comprehensive testing
4746
*/
@@ -55,13 +54,13 @@ public void doVerb(Message<ConsensusReadForwardRequest> message)
5554
{
5655
ConsensusReadForwardRequest request = message.payload;
5756
SinglePartitionReadCommand command = request.command;
57+
5858
Tracing.trace("Executing forwarded consensus read operation for {}", command.partitionKey());
5959

6060
// Start capturing client warnings for the forwarded operation
6161
ClientWarn.instance.captureWarnings();
6262
try
6363
{
64-
// Validate keyspace exists and is tracked
6564
String keyspaceName = command.metadata().keyspace;
6665
KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
6766
if (ksMetadata == null)
@@ -71,21 +70,14 @@ public void doVerb(Message<ConsensusReadForwardRequest> message)
7170
return;
7271
}
7372

74-
if (!ksMetadata.params.replicationType.isTracked())
75-
{
76-
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message);
77-
logger.error("Asked to perform forwarded consensus read operation, but keyspace {} is not tracked", keyspaceName);
78-
return;
79-
}
80-
8173
// Create a Group from the single command for reading
8274
SinglePartitionReadCommand.Group group = SinglePartitionReadCommand.Group.one(command);
8375

8476
// Execute the read using StorageProxy.read() which will:
8577
// 1. Check forwarding (returns null since we're on a replica)
8678
// 2. Execute the consensus read with the appropriate protocol
8779
logger.debug("Executing consensus read operation for table {}.{} with key {}",
88-
keyspaceName, command.metadata().name, command.partitionKey());
80+
command.metadata().keyspace, command.metadata().name, command.partitionKey());
8981

9082
Dispatcher.RequestTime requestTime = Dispatcher.RequestTime.forImmediateExecution();
9183
PartitionIterator result = StorageProxy.readWithConsensusForwarded(group, request.consistencyLevel, requestTime);

src/java/org/apache/cassandra/service/paxos/Paxos.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.util.Collection;
23+
import java.util.Collections;
2324
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Map;
@@ -94,6 +95,7 @@
9495
import org.apache.cassandra.service.ClientState;
9596
import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
9697
import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
98+
import org.apache.cassandra.service.paxos.Commit.Agreed;
9799
import org.apache.cassandra.service.paxos.Commit.Proposal;
98100
import org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
99101
import org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
@@ -740,6 +742,8 @@ public static ConsensusAttemptResult cas(DecoratedKey partitionKey,
740742
try (PaxosOperationLock lock = PaxosState.lock(partitionKey, metadata, proposeDeadline, consistencyForConsensus, true))
741743
{
742744
Paxos.Async<PaxosCommit.Status> commit = null;
745+
Agreed agreed = null;
746+
Participants commitParticipants = null;
743747
done: while (true)
744748
{
745749
// read the current values and check they validate the conditions
@@ -839,7 +843,11 @@ else if (begin.isPromised)
839843
// 1) reached a majority, in which case it was agreed, had no effect and we can do nothing; or
840844
// 2) did not reach a majority, was not agreed, and was not user visible as a result so we can ignore it
841845
if (!proposal.isEmpty())
842-
commit = commit(proposal.agreed(), participants, consistencyForConsensus, consistencyForCommit, true);
846+
{
847+
agreed = proposal.agreed();
848+
commitParticipants = participants;
849+
commit = commit(agreed, participants, consistencyForConsensus, consistencyForCommit, true);
850+
}
843851

844852
break done;
845853
}
@@ -872,8 +880,26 @@ else if (begin.isPromised)
872880
if (commit != null)
873881
{
874882
PaxosCommit.Status result = commit.awaitUntil(commitDeadline);
875-
if (!result.isSuccess())
876-
throw result.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForCommit, failedAttemptsDueToContention);
883+
while (!result.isSuccess())
884+
{
885+
Paxos.MaybeFailure failure = result.maybeFailure();
886+
long coordinatorBehindCount = Collections.frequency(failure.failures.values(),
887+
RequestFailureReason.COORDINATOR_BEHIND);
888+
if (coordinatorBehindCount == 0
889+
|| failure.successes + coordinatorBehindCount < failure.required
890+
|| nanoTime() >= commitDeadline)
891+
{
892+
throw failure.markAndThrowAsTimeoutOrFailure(true, consistencyForCommit, failedAttemptsDueToContention);
893+
}
894+
895+
casWriteMetrics.retryCoordinatorBehind.mark();
896+
Tracing.trace("Retrying V2 Paxos commit after COORDINATOR_BEHIND for {}.{} partition {}, {} behind replicas out of {} required",
897+
metadata.keyspace, metadata.name, partitionKey, coordinatorBehindCount, failure.required);
898+
logger.warn("Retrying V2 Paxos commit after COORDINATOR_BEHIND for {}.{} partition {}, {} behind replicas out of {} required",
899+
metadata.keyspace, metadata.name, partitionKey, coordinatorBehindCount, failure.required);
900+
commit = commit(agreed, commitParticipants, consistencyForConsensus, consistencyForCommit, true);
901+
result = commit.awaitUntil(commitDeadline);
902+
}
877903
}
878904
Tracing.trace("CAS successful");
879905
return casResult((RowIterator)null);
@@ -1103,7 +1129,7 @@ private static BeginResult begin(long deadline,
11031129
// is equal to the latest commit (even if the ballots aren't) we're done and can abort earlier,
11041130
// and in fact it's possible for a CAS to sometimes determine if side effects occurred by reading
11051131
// the underlying data and not witnessing the timestamp of its ballot (or any newer for the relevant data).
1106-
Proposal repropose = new Proposal(inProgress.ballot, inProgress.accepted.update);
1132+
Proposal repropose = new Proposal(inProgress.ballot, inProgress.accepted.mutation);
11071133
PaxosPropose.Status proposeResult = propose(repropose, inProgress.participants, false).awaitUntil(deadline);
11081134
switch (proposeResult.outcome)
11091135
{

src/java/org/apache/cassandra/service/paxos/Paxos2CommitForwardHandler.java

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.apache.cassandra.net.Message;
3030
import org.apache.cassandra.net.MessagingService;
3131
import org.apache.cassandra.net.NoPayload;
32-
import org.apache.cassandra.schema.KeyspaceMetadata;
33-
import org.apache.cassandra.schema.Schema;
32+
import org.apache.cassandra.service.replication.migration.MigrationRouter;
33+
import org.apache.cassandra.tcm.ClusterMetadata;
3434
import org.apache.cassandra.tcm.ClusterMetadataService;
3535
import org.apache.cassandra.tracing.Tracing;
3636
import org.apache.cassandra.utils.concurrent.ConditionAsConsumer;
@@ -39,13 +39,11 @@
3939

4040
/**
4141
* Handler for forwarded Paxos V2 commit requests.
42-
* Executes the commit operation on behalf of the original coordinator,
43-
* ensuring that MutationId generation happens on a replica coordinator.
44-
*
45-
* The PaxosCommit constructor handles mutation ID generation, so this handler
46-
* simply delegates to PaxosCommit.commit() with the original commit.
42+
* Delegates to PaxosCommit.commit() which handles mutation ID generation
43+
* in its constructor.
4744
*
4845
* TODO (expected): more comprehensive testing
46+
* TODO: should loop on CoordinatorBehindException rather than propagating failure to the forwarding coordinator
4947
*/
5048
public class Paxos2CommitForwardHandler implements IVerbHandler<Paxos2CommitForwardRequest>
5149
{
@@ -55,27 +53,38 @@ public class Paxos2CommitForwardHandler implements IVerbHandler<Paxos2CommitForw
5553
@Override
5654
public void doVerb(Message<Paxos2CommitForwardRequest> message)
5755
{
58-
// Ensure we have up-to-date cluster metadata before executing the forwarded commit
59-
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), message.header.epoch);
6056
Paxos2CommitForwardRequest request = message.payload;
57+
Commit.Agreed commit = request.commit;
6158

62-
Tracing.trace("Executing forwarded Paxos V2 commit for {}", request.commit.partitionKey());
59+
Tracing.trace("Executing forwarded Paxos V2 commit for {}.{} partition {}",
60+
commit.metadata().keyspace, commit.metadata().name, commit.partitionKey());
6361

6462
try
6563
{
66-
String ksName = request.commit.metadata().keyspace;
67-
KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(ksName);
68-
if (ksMetadata == null)
64+
String ksName = commit.metadata().keyspace;
65+
ClusterMetadata metadata = ClusterMetadata.current();
66+
boolean shouldBeTracked = MigrationRouter.shouldUseTrackedForWrites(metadata,
67+
ksName,
68+
commit.metadata().id,
69+
commit.partitionKey().getToken());
70+
71+
if (!shouldBeTracked && message.epoch().isAfter(metadata.epoch))
6972
{
70-
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message);
71-
logger.error("Failed to forward paxos commit for non-existent keyspace {}", ksName);
72-
return;
73+
metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch());
74+
// shouldBeTracked isn't used after this, but is kept up to date just in case
75+
shouldBeTracked = MigrationRouter.shouldUseTrackedForWrites(metadata,
76+
ksName,
77+
commit.metadata().id,
78+
commit.partitionKey().getToken());
7379
}
7480

75-
if (!ksMetadata.params.replicationType.isTracked())
81+
if (metadata.schema.getKeyspaces().getNullable(ksName) == null)
7682
{
7783
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message);
78-
logger.error("Asked to perform forwarded paxos commit, but keyspace {} is not tracked", ksName);
84+
logger.error("Failed to forward paxos commit for non-existent keyspace {}.{} partition {}",
85+
ksName, commit.metadata().name, commit.partitionKey());
86+
Tracing.trace("Failed to forward paxos commit for non-existent keyspace {}.{} partition {}",
87+
ksName, commit.metadata().name, commit.partitionKey());
7988
return;
8089
}
8190

@@ -112,15 +121,29 @@ public void doVerb(Message<Paxos2CommitForwardRequest> message)
112121
}
113122
else
114123
{
115-
MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, message);
124+
RequestFailureReason reason = RequestFailureReason.UNKNOWN;
125+
if (status != null && status.maybeFailure() != null)
126+
{
127+
for (RequestFailureReason r : status.maybeFailure().failures.values())
128+
{
129+
if (r == RequestFailureReason.COORDINATOR_BEHIND)
130+
{
131+
reason = RequestFailureReason.COORDINATOR_BEHIND;
132+
break;
133+
}
134+
}
135+
}
136+
MessagingService.instance().respondWithFailure(reason, message);
116137
logger.error("Forwarded Paxos V2 commit failed with status: {}", status);
138+
Tracing.trace("Forwarded Paxos V2 commit failed with status: {}", status);
117139
}
118140
}
119141
catch (InterruptedException e)
120142
{
121143
Thread.currentThread().interrupt();
122144
MessagingService.instance().respondWithFailure(RequestFailure.forException(e), message);
123145
logger.error("Forwarded Paxos V2 commit interrupted", e);
146+
Tracing.trace("Forwarded Paxos V2 commit interrupted");
124147
}
125148
}
126149
catch (Exception e)

0 commit comments

Comments
 (0)