Skip to content

Commit a79d34a

Browse files
committed
CEP-45: BRR does tracked writes during migration to mutation tracking
During migration from untracked to tracked replication, blocking read repair (BRR) continues to run because mutation tracking lacks sufficient initialized state to provide monotonic reads for pending ranges. However, BRR was previously bypassing mutation tracking entirely — read repair mutations were sent as untracked writes via READ_REPAIR_REQ, which meant they were invisible to the mutation journal. This is unsafe because the migration repair that completes the transition assumes all writes after it started are tracked, so that tracked reads can safely begin afterward.

Route BRR writes through TrackedWriteRequest.perform() when MigrationRouter.shouldUseTrackedForWrites() indicates the token is migrating. This gives each read repair mutation a proper MutationId and records it in the mutation journal, ensuring the migration invariant holds.

Key changes:
- Remove the isReadRepair flag from Mutation and all special-case bypasses that let read repair skip mutation tracking validation and routing
- Add repairViaTrackedWrite() to BlockingReadRepair, which sends each per-replica mutation as an independent tracked write with retry logic for migration races (RetryOnDifferentSystemException/CoordinatorBehindException)
- Introduce WriteCallback interface (replacing Runnable) on write response handlers so tracked write completion can be observed without races
- Make tracked→untracked instant in AlterSchema by skipping migration state entirely — tracked writes are already quorum writes, so untracked reads are strictly less demanding. Reverting an in-progress untracked→tracked migration cleans up the now-unnecessary state.
- Add RepairedBlockingViaTrackedWrite metric to observe the BRR-via-tracked path
- Add pauseRegularPriorityForTesting() to ActiveLogReconciler to allow tests to suppress background reconciliation while keeping high-priority (tracked read) reconciliation active
- Add MutationTrackingReadRepairTest covering BRR behavior across all migration phases with both point reads and range scans
1 parent 9d336f6 commit a79d34a

29 files changed

Lines changed: 1235 additions & 260 deletions

src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,6 @@ else if (message.epoch().isBefore(metadata.schema.lastModified()))
202202
*/
203203
protected ClusterMetadata checkReplicationMigration(ClusterMetadata metadata, Message<T> message, InetAddressAndPort respondTo)
204204
{
205-
// Read repair mutations always bypass mutation tracking and use the untracked
206-
// write path, so skip the replication migration routing check. The isReadRepair
207-
// flag on the mutation hasn't been set yet at this point — it's set later in
208-
// applyMutation() — so we check the handler type instead.
209-
if (this instanceof ReadRepairVerbHandler)
210-
return metadata;
211-
212205
IMutation mutation = message.payload;
213206
MutationRouting expected = mutation.id().isNone() ? MutationRouting.UNTRACKED : MutationRouting.TRACKED;
214207
if (expected == MigrationRouter.getMutationRouting(metadata, mutation))

src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
4747
{
4848
group = Keyspace.writeOrder.start();
4949

50-
if (!mutation.isReadRepair())
51-
MigrationRouter.validateUntrackedMutation(mutation);
50+
MigrationRouter.validateUntrackedMutation(mutation);
5251
// write the mutation to the commitlog and memtables
5352
CommitLogPosition position = null;
5453
if (makeDurable)

src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ protected void applyMutation(final Message<CounterMutation> message, InetAddress
7070
// it's own in that case.
7171
StorageProxy.applyCounterMutationOnLeader(cm,
7272
localDataCenter,
73-
() -> MessagingService.instance().send(message.emptyResponse(), respondToAddress),
73+
handler -> MessagingService.instance().send(message.emptyResponse(), respondToAddress),
7474
Dispatcher.RequestTime.forImmediateExecution());
7575
}
7676
}

src/java/org/apache/cassandra/db/Keyspace.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public void apply(final Mutation mutation,
439439
boolean updateIndexes,
440440
boolean isDroppable)
441441
{
442-
if (MigrationRouter.isFullyTracked(mutation) && !mutation.isReadRepair())
442+
if (MigrationRouter.isFullyTracked(mutation))
443443
applyInternalTracked(mutation, null);
444444
else
445445
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
@@ -462,7 +462,7 @@ private Future<?> applyInternal(final Mutation mutation,
462462
boolean isDeferrable,
463463
Promise<?> future)
464464
{
465-
Preconditions.checkState((!getMetadata().useMutationTracking() || mutation.isReadRepair()) && mutation.id().isNone());
465+
Preconditions.checkState(!MigrationRouter.isFullyTracked(mutation) && mutation.id().isNone());
466466

467467
if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS))
468468
throw new RuntimeException("Testing write failures");

src/java/org/apache/cassandra/db/Mutation.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -110,21 +110,6 @@ public class Mutation implements IMutation, Supplier<Mutation>, Commitable
110110
// because it is being applied by one or in a context where transaction conflicts don't occur
111111
private PotentialTxnConflicts potentialTxnConflicts;
112112

113-
// Transient: not serialized on the wire. Set by ReadRepairVerbHandler on the
114-
// receiving side so downstream code (Keyspace.apply, write handlers) can route
115-
// read repair mutations through the untracked write path during migration.
116-
private transient boolean isReadRepair;
117-
118-
public void setReadRepair(boolean readRepair)
119-
{
120-
this.isReadRepair = readRepair;
121-
}
122-
123-
public boolean isReadRepair()
124-
{
125-
return isReadRepair;
126-
}
127-
128113
public Mutation(MutationId id, PartitionUpdate update)
129114
{
130115
this(id, update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc, PotentialTxnConflicts.DISALLOW);
@@ -165,9 +150,7 @@ public MutationId id()
165150
@Override
166151
public Mutation withMutationId(MutationId mutationId)
167152
{
168-
Mutation m = new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts);
169-
m.isReadRepair = this.isReadRepair;
170-
return m;
153+
return new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts);
171154
}
172155

173156
private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)
@@ -201,9 +184,7 @@ private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)
201184

202185
Map<TableId, PartitionUpdate> updates = builder.build();
203186
checkState(!updates.isEmpty(), "Updates should not be empty");
204-
Mutation result = new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts);
205-
result.isReadRepair = this.isReadRepair;
206-
return result;
187+
return new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts);
207188
}
208189

209190
public @Nullable Mutation without(TableId tableId)

src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ public class ReadRepairVerbHandler extends AbstractMutationVerbHandler<Mutation>
2727

2828
public void applyMutation(Mutation mutation)
2929
{
30-
mutation.setReadRepair(true);
3130
mutation.apply();
3231
}
3332

src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
28+
import java.util.function.Consumer;
2829
import java.util.function.Supplier;
2930

3031
import com.google.common.base.Preconditions;
@@ -93,7 +94,7 @@ protected AbstractReplicationStrategy(String keyspaceName, Map<String, String> c
9394
public abstract DataPlacement calculateDataPlacement(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata);
9495

9596
public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
96-
Runnable callback,
97+
Consumer<AbstractWriteResponseHandler<?>> callback,
9798
WriteType writeType,
9899
Supplier<Mutation> hintOnFailure,
99100
Dispatcher.RequestTime requestTime)
@@ -103,7 +104,7 @@ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.F
103104
}
104105

105106
public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
106-
Runnable callback,
107+
Consumer<AbstractWriteResponseHandler<?>> callback,
107108
WriteType writeType,
108109
Supplier<Mutation> hintOnFailure,
109110
Dispatcher.RequestTime requestTime,

src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ public class ReadRepairMetrics
3737
*/
3838
public static final Meter repairedBlockingViaAccord = Metrics.meter(factory.createMetricName("RepairedBlockingViaAccord"));
3939

40+
/**
41+
* Blocking read repair was sent as a tracked write. This happens when the token is migrating to tracked replication
42+
* but migration hasn't completed
43+
*/
44+
public static final Meter repairedBlockingViaTrackedWrite = Metrics.meter(factory.createMetricName("RepairedBlockingViaTrackedWrite"));
45+
4046
/**
4147
* This should be zero if you are trying to run Accord in a 100% correct way and interoperating with non-transactional writes.
4248
*

src/java/org/apache/cassandra/replication/ActiveLogReconciler.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ public void run(Interruptible.State state) throws InterruptedException
120120
Task task;
121121
while ((task = highPriorityTasks.poll()) != null)
122122
task.send();
123-
while ((task = regularPriorityTasks.poll()) != null)
124-
task.send();
123+
if (!isRegularPriorityPaused)
124+
{
125+
while ((task = regularPriorityTasks.poll()) != null)
126+
task.send();
127+
}
125128

126129
haveWork.acquire(1);
127130
}
@@ -261,6 +264,7 @@ void send()
261264

262265
private volatile boolean isShutdown = false;
263266
private volatile boolean isPaused = false;
267+
private volatile boolean isRegularPriorityPaused = false;
264268

265269
@Override
266270
public boolean isTerminated()
@@ -307,4 +311,22 @@ void resumeForTesting()
307311
{
308312
isPaused = false;
309313
}
314+
315+
/**
316+
* Pause only regular-priority (background write retry) task delivery.
317+
* High-priority tasks (needed by tracked read reconciliation) continue to be processed.
318+
* This allows tests to prevent the reconciler from proactively fixing inconsistencies
319+
* while still allowing tracked reads to pull missing mutations.
320+
*/
321+
@VisibleForTesting
322+
void pauseRegularPriorityForTesting()
323+
{
324+
isRegularPriorityPaused = true;
325+
}
326+
327+
@VisibleForTesting
328+
void resumeRegularPriorityForTesting()
329+
{
330+
isRegularPriorityPaused = false;
331+
}
310332
}

src/java/org/apache/cassandra/replication/ForwardedWrite.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.function.Consumer;
2728

2829
import com.google.common.base.Preconditions;
2930
import com.google.common.collect.Sets;
@@ -297,6 +298,13 @@ private static void applyLocallyAndForwardToReplicas(Mutation mutation, Set<Node
297298
}
298299

299300
public static AbstractWriteResponseHandler<Object> forwardMutation(Mutation mutation, ReplicaPlan.ForWrite plan, AbstractReplicationStrategy strategy, Dispatcher.RequestTime requestTime)
301+
{
302+
return forwardMutationInternal(mutation, plan, strategy, requestTime, null);
303+
}
304+
305+
private static AbstractWriteResponseHandler<Object> forwardMutationInternal(
306+
Mutation mutation, ReplicaPlan.ForWrite plan, AbstractReplicationStrategy strategy,
307+
Dispatcher.RequestTime requestTime, Consumer<AbstractWriteResponseHandler<?>> onComplete)
300308
{
301309
// find leader
302310
NodeProximity proximity = DatabaseDescriptor.getNodeProximity();
@@ -318,7 +326,7 @@ public static AbstractWriteResponseHandler<Object> forwardMutation(Mutation muta
318326
// create callback and forward to leader
319327
logger.trace("Selected {} as leader for mutation with key {}", leader.endpoint(), mutation.key());
320328

321-
AbstractWriteResponseHandler<Object> handler = strategy.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime);
329+
AbstractWriteResponseHandler<Object> handler = strategy.getWriteResponseHandler(plan, onComplete, WriteType.SIMPLE, null, requestTime);
322330

323331
// Add callbacks for replicas to respond directly to coordinator
324332
Message<MutationRequest> toLeader = Message.outWithRequestTime(Verb.FORWARD_WRITE_REQ, new MutationRequest(mutation, plan), requestTime);
@@ -348,6 +356,14 @@ public static AbstractWriteResponseHandler<Object> forwardCounterMutation(Counte
348356
ReplicaPlan.ForWrite plan,
349357
AbstractReplicationStrategy strategy,
350358
Dispatcher.RequestTime requestTime)
359+
{
360+
return forwardCounterMutationInternal(counterMutation, plan, strategy, requestTime, null);
361+
}
362+
363+
private static AbstractWriteResponseHandler<Object> forwardCounterMutationInternal(
364+
CounterMutation counterMutation, ReplicaPlan.ForWrite plan,
365+
AbstractReplicationStrategy strategy, Dispatcher.RequestTime requestTime,
366+
Consumer<AbstractWriteResponseHandler<?>> onComplete)
351367
{
352368
Preconditions.checkArgument(counterMutation.id().isNone(), "CounterMutation should not have an ID when forwarding");
353369

@@ -373,7 +389,7 @@ public static AbstractWriteResponseHandler<Object> forwardCounterMutation(Counte
373389
logger.trace("Forwarding tracked counter mutation to leader replica {}", leader);
374390

375391
// Create response handler for all replicas
376-
AbstractWriteResponseHandler<Object> handler = strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime);
392+
AbstractWriteResponseHandler<Object> handler = strategy.getWriteResponseHandler(plan, onComplete, WriteType.COUNTER, null, requestTime);
377393

378394
// Add callbacks for all live replicas to respond directly to coordinator
379395
Message<CounterMutation> forwardMessage = Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation, requestTime);
@@ -417,6 +433,33 @@ public static AbstractWriteResponseHandler<Object> forward(IMutation mutation,
417433
return forwardMutation((Mutation) mutation, plan, strategy, requestTime);
418434
}
419435

436+
/**
437+
* Forward a mutation to a replica leader for processing.
438+
* Dispatches to the appropriate method based on mutation type.
439+
*
440+
* <p>Like {@link #forward(IMutation, ReplicaPlan.ForWrite, AbstractReplicationStrategy, Dispatcher.RequestTime)},
441+
* but wires a completion callback on the handler before any messages are sent, avoiding races where the handler
442+
* is signaled before the caller can observe it.
443+
*
444+
* @param mutation the mutation to forward (can be Mutation or CounterMutation)
445+
* @param plan the replica plan
446+
* @param strategy the replication strategy
447+
* @param requestTime the request time
448+
* @param onComplete callback invoked when the write response handler completes
449+
* @return the write response handler
450+
*/
451+
static AbstractWriteResponseHandler<?> forward(IMutation mutation,
452+
ReplicaPlan.ForWrite plan,
453+
AbstractReplicationStrategy strategy,
454+
Dispatcher.RequestTime requestTime,
455+
Consumer<AbstractWriteResponseHandler<?>> onComplete)
456+
{
457+
if (mutation instanceof CounterMutation)
458+
return forwardCounterMutationInternal((CounterMutation) mutation, plan, strategy, requestTime, onComplete);
459+
else
460+
return forwardMutationInternal((Mutation) mutation, plan, strategy, requestTime, onComplete);
461+
}
462+
420463
/**
421464
* Apply a forwarded tracked counter mutation on the leader replica.
422465
* Called by CounterMutationVerbHandler when receiving a forwarded counter write.

0 commit comments

Comments
 (0)