Skip to content

Commit cd7f495

Browse files
committed
Refactor RedundantStatus to encode vector of states that can be merged independently
Also fix:
- Truncate command on first access, without participants
- Use Ballot.ZERO when invoking CFK.insertOutOfRange where appropriate
- Don't supply a command's own route to ProgressLog.waiting to ensure new keys are incorporated
- Ensure progress in CommandsForKey by setting vestigial commands to ERASED
- Add any missing owned keys to StoreParticipants.route to ensure fetch can make progress
- Recovery must wait for earlier not-accepted transactions if either has the privileged coordinator optimisation
- Inclusive SyncPoint used incorrect topologies for propose phase
- Barrier must not register local listener without up-to-date topology information
- Stop home shard truncating a TxnId to vestigial rather than Invalidated so other shards can make progress

Also improve:
- Validate commands are constructed with non-empty participants
- Remove some unnecessary synchronized keywords
- Clear ok messages on PreAccept and Accept to free up memory
- Introduce TxnId.Cardinality flag so we can optimise single key queries
- Update CommandsForKey serialization to better handle larger flag space
- Configurable which Txn.Kind can result in a CommandStore being marked stale
- Process DefaultProgressLog queue synchronously when relevant state is resident in memory
- Remove defunct CollectMaxApplied version of ListStore bootstrap
- Standardise linearizability violation reporting
- Improve CommandStore.execute method naming to reduce chance of misuse
- Relax Invariant on recovery that we cannot both Accept and Reject, as we could be a stale recovery
- Prune and address some comments

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20282
1 parent 78ab7ee commit cd7f495

110 files changed

Lines changed: 1737 additions & 1734 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

accord-core/src/main/java/accord/api/Agent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020

2121
import java.util.concurrent.TimeUnit;
2222

23+
import javax.annotation.Nullable;
24+
2325
import accord.api.ProgressLog.BlockedUntil;
2426
import accord.local.Command;
2527
import accord.local.Node;
2628
import accord.local.SafeCommandStore;
2729
import accord.messages.ReplyContext;
30+
import accord.primitives.Participants;
2831
import accord.primitives.Ranges;
2932
import accord.primitives.Routable;
3033
import accord.primitives.Status.Phase;
@@ -153,5 +156,5 @@ default EventsListener metricsEventsListener()
153156
long localExpiresAt(TxnId txnId, Phase phase, TimeUnit unit);
154157
long expiresAt(ReplyContext replyContext, TimeUnit unit);
155158

156-
default void onViolation(String message) { throw illegalState(message); }
159+
default void onViolation(String message, Participants<?> participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt) { throw illegalState(message); }
157160
}

accord-core/src/main/java/accord/api/ProtocolModifiers.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import java.util.regex.Matcher;
2222
import java.util.regex.Pattern;
2323

24+
import accord.primitives.Txn;
2425
import accord.primitives.TxnId;
2526
import accord.primitives.TxnId.FastPath;
2627
import accord.primitives.TxnId.FastPaths;
2728
import accord.primitives.TxnId.MediumPath;
2829
import accord.utils.Invariants;
30+
import accord.utils.TinyEnumSet;
2931

3032
import static accord.api.ProtocolModifiers.QuorumEpochIntersections.ChaseFixedPoint.Chase;
3133
import static accord.api.ProtocolModifiers.QuorumEpochIntersections.ChaseFixedPoint.DoNotChase;
@@ -162,7 +164,6 @@ static class Spec
162164

163165
static
164166
{
165-
// TODO (expected): configurable
166167
Spec spec = new Spec(false, false, false, false);
167168
txnInstability = spec.txnInstability;
168169
txnDiscardPreAcceptDeps = spec.txnDiscardPreAcceptDeps;
@@ -187,7 +188,7 @@ public static class Toggles
187188
public static void setPermittedFastPaths(FastPaths newPermittedFastPaths) { permittedFastPaths = newPermittedFastPaths; }
188189
public static FastPath ensurePermitted(FastPath path) { return path.toPermitted(permittedFastPaths); }
189190

190-
private static MediumPath defaultMediumPath = MediumPath.TRACK_STABLE;
191+
private static MediumPath defaultMediumPath = MediumPath.TrackStable;
191192
public static MediumPath defaultMediumPath() { return defaultMediumPath; }
192193
public static void setDefaultMediumPath(MediumPath newDefaultMediumPath) { defaultMediumPath = newDefaultMediumPath; }
193194

@@ -203,6 +204,15 @@ public static class Toggles
203204
public static boolean requiresUniqueHlcs() { return requiresUniqueHlcs; }
204205
public static void setRequiresUniqueHlcs(boolean newRequiresUniqueHlcs) { requiresUniqueHlcs = newRequiresUniqueHlcs; }
205206

207+
private static int markStaleIfCannotExecute = TinyEnumSet.encode(Txn.Kind.Write);
208+
public static boolean markStaleIfCannotExecute(Txn.Kind kind) { return TinyEnumSet.contains(markStaleIfCannotExecute, kind); }
209+
public static void setMarkStaleIfCannotExecute(Txn.Kind ... kinds)
210+
{
211+
int newMarkStaleIfCannotExecute = TinyEnumSet.encode(kinds);
212+
Invariants.require(TinyEnumSet.contains(newMarkStaleIfCannotExecute, Txn.Kind.Write));
213+
markStaleIfCannotExecute = newMarkStaleIfCannotExecute;
214+
}
215+
206216
public enum DependencyElision { OFF, ON, IF_DURABLE }
207217
private static DependencyElision dependencyElision = IF_DURABLE;
208218
public static DependencyElision dependencyElision() { return dependencyElision; }

accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,30 +74,29 @@ void onSlowResponseInternal(Id from) {}
7474
abstract long executeAtEpoch();
7575

7676
@Override
77-
public synchronized final void onFailure(Id from, Throwable failure)
77+
public final void onFailure(Id from, Throwable failure)
7878
{
7979
if (!isDone)
8080
onFailureInternal(from, failure);
8181
}
8282

8383
@Override
84-
public final synchronized boolean onCallbackFailure(Id from, Throwable failure)
84+
public final boolean onCallbackFailure(Id from, Throwable failure)
8585
{
8686
if (isDone) return false;
8787
isDone = true;
8888
return tryFailure(failure);
8989
}
9090

9191
@Override
92-
// TODO (expected): shouldn't need synchronized
93-
public final synchronized void onSuccess(Id from, R reply)
92+
public final void onSuccess(Id from, R reply)
9493
{
9594
if (!isDone)
9695
onSuccessInternal(from, reply);
9796
}
9897

9998
@Override
100-
public final synchronized void onSlowResponse(Id from)
99+
public final void onSlowResponse(Id from)
101100
{
102101
if (!isDone)
103102
onSlowResponseInternal(from);

accord-core/src/main/java/accord/coordinate/Barrier.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static accord.local.CommandSummaries.ComputeIsDep.IGNORE;
5858
import static accord.local.CommandSummaries.TestStartedAt.STARTED_AFTER;
5959
import static accord.primitives.Txn.Kind.AnyGloballyVisible;
60+
import static accord.primitives.TxnId.Cardinality.cardinality;
6061
import static accord.utils.Invariants.illegalState;
6162

6263
/**
@@ -149,7 +150,7 @@ public static Barrier barrier(Node node, Seekables<?, ?> keysOrRanges, FullRoute
149150
private static BiFunction<Node, FullRoute<?>, AsyncSyncPoint> wrap(TriFunction<Node, TxnId, FullRoute<?>, AsyncResult<? extends SyncPoint<?>>> syncPointFactory)
150151
{
151152
return (node, route) -> {
152-
TxnId txnId = node.nextTxnId(Txn.Kind.SyncPoint, route.domain());
153+
TxnId txnId = node.nextTxnId(Txn.Kind.SyncPoint, route.domain(), cardinality(route));
153154
return new AsyncSyncPoint(txnId, syncPointFactory.apply(node, txnId, route));
154155
};
155156
}
@@ -186,11 +187,10 @@ private void createSyncPoint()
186187
coordinateSyncPoint = async.async;
187188
if (barrierType.async)
188189
{
189-
Invariants.require(barrierType.async);
190190
TxnId txnId = async.txnId;
191191
long epoch = txnId.epoch();
192192
RoutingKey homeKey = route.homeKey();
193-
node.commandStores().ifLocal(txnId, homeKey, epoch, epoch, safeStore -> register(safeStore, txnId, homeKey))
193+
node.withEpoch(epoch, () -> node.commandStores().ifLocal(txnId, homeKey, epoch, epoch, safeStore -> register(safeStore, txnId, homeKey)))
194194
.begin(node.agent());
195195
}
196196

@@ -278,7 +278,7 @@ class ExistingTransactionCheck extends AsyncResults.AbstractResult<BarrierTxn> i
278278
@Override
279279
public BarrierTxn apply(SafeCommandStore safeStore)
280280
{
281-
// TODO (required): consider these semantics carefully
281+
// TODO (required): it probably isn't safe to use a Read here; but we can and should implement Barriers via writes
282282
// Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
283283
// so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
284284
// this barrier or the transaction we listen on and that is fine
@@ -289,8 +289,9 @@ public BarrierTxn apply(SafeCommandStore safeStore)
289289
//noinspection SillyAssignment,ConstantConditions
290290
safeStore = safeStore; // prevent use in lambda
291291
safeStore.commandStore()
292-
.execute(found.txnId, safeStoreWithTxn -> register(safeStoreWithTxn, found.txnId, found.key))
293-
.begin(node.agent());
292+
.execute(found.txnId, safeStoreWithTxn -> {
293+
register(safeStoreWithTxn, found.txnId, found.key);
294+
}, node.agent());
294295
}
295296
return found;
296297
}

accord-core/src/main/java/accord/coordinate/CheckShards.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ protected CheckShards(Node node, TxnId txnId, U route, long srcEpoch, IncludeInf
6767

6868
private static Topologies topologyFor(Node node, TxnId txnId, Unseekables<?> contact, long epoch)
6969
{
70-
// TODO (expected): only fetch data from source epoch
70+
// TODO (desired): only fetch data from source epoch
7171
return node.topology().preciseEpochs(contact, txnId.epoch(), epoch);
7272
}
7373

accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import accord.primitives.Txn;
3333
import accord.primitives.TxnId;
3434
import accord.topology.Topologies;
35+
import accord.utils.Invariants;
3536
import accord.utils.SortedListMap;
3637
import accord.utils.async.AsyncResult;
3738
import accord.utils.async.AsyncResults;
@@ -125,6 +126,7 @@ void onPreAccepted(Topologies topologies)
125126
{
126127
Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue, ok -> ok.deps);
127128
topologies = node.topology().reselect(topologies, QuorumEpochIntersections.preaccept.include, route, executeAtEpoch, executeAtEpoch, Owned);
128-
new ExecuteEphemeralRead(node, topologies, route, txnId.withEpoch(executeAtEpoch), txn, executeAtEpoch, deps, this).start();
129+
new ExecuteEphemeralRead(node, topologies, route, txnId.withEpoch(executeAtEpoch), txn, deps, this).start();
130+
if (!Invariants.debug()) oks.clear();
129131
}
130132
}

accord-core/src/main/java/accord/coordinate/CoordinateGloballyDurable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
import accord.utils.async.AsyncResult;
3232
import accord.utils.async.AsyncResults.SettableResult;
3333

34-
// TODO (expected): this does not need to query every shard; can disseminate globally any sub-range of the ring
35-
// (indeed, we could slice both the query and dissemination only so that they always overlap)
34+
// TODO (expected): this does not need to query every shard OR more than one replica per shard;
35+
// can disseminate globally any sub-range of the ring.
36+
// In fact, we could simply autonomously disseminate our latest information to some subset of replicas
3637
public class CoordinateGloballyDurable extends SettableResult<Void> implements Callback<DurableBeforeReply>
3738
{
3839
final Node node;
39-
// TODO (expected): this can be a ReadTracker, we only need one response from each shard
4040
final QuorumTracker tracker;
4141
private DurableBefore durableBefore = DurableBefore.EMPTY;
4242

accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import accord.primitives.Txn;
3737
import accord.primitives.TxnId;
3838
import accord.topology.Topologies;
39+
import accord.utils.Invariants;
3940
import accord.utils.SortedListMap;
4041
import accord.utils.WrappableException;
4142

@@ -49,12 +50,10 @@
4950
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
5051
*
5152
* TODO (desired, testing): dedicated burn test to validate outcomes
52-
* TODO (expected):
5353
*/
5454
abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, PreAcceptReply>
5555
{
5656
final PreAcceptTracker<?> tracker;
57-
// TODO (expected): this can be cleared after preaccept
5857
private final SortedListMap<Id, PreAcceptOk> oks;
5958
final Txn txn;
6059
boolean fastPathEnabled = true;
@@ -148,10 +147,10 @@ void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
148147
@Override
149148
void onPreAccepted(Topologies topologies)
150149
{
151-
// TODO (expected): we do not have to take max here if we have enough fast path votes (this may unnecessarily force us onto the slow path)
152150
Timestamp executeAt = oks.foldlNonNullValues((ok, prev) -> mergeMaxAndFlags(ok.witnessedAt, prev), Timestamp.NONE);
153151
node.withEpoch(executeAt.epoch(), this, t -> WrappableException.wrap(t), () -> {
154152
onPreAccepted(topologies, executeAt, oks);
153+
if (!Invariants.debug()) oks.clear();
155154
});
156155
}
157156

accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import static accord.primitives.Timestamp.Flag.HLC_BOUND;
5757
import static accord.primitives.Timestamp.Flag.REJECTED;
5858
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
59+
import static accord.primitives.TxnId.Cardinality.cardinality;
5960

6061
/**
6162
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
@@ -124,14 +125,14 @@ public static <U extends Unseekable> AsyncResult<SyncPoint<U>> inclusiveAndAwait
124125
public static <U extends Unseekable> AsyncResult<SyncPoint<U>> coordinate(Node node, Kind kind, Unseekables<U> keysOrRanges, SyncPointAdapter<SyncPoint<U>> adapter)
125126
{
126127
Invariants.requireArgument(kind.isSyncPoint());
127-
TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
128+
TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain(), cardinality(keysOrRanges));
128129
return node.withEpoch(txnId.epoch(), () -> coordinate(node, txnId, keysOrRanges, adapter)).beginAsResult();
129130
}
130131

131132
public static <U extends Unseekable> AsyncResult<SyncPoint<U>> coordinate(Node node, Kind kind, FullRoute<U> route, SyncPointAdapter<SyncPoint<U>> adapter)
132133
{
133134
Invariants.requireArgument(kind.isSyncPoint());
134-
TxnId txnId = node.nextTxnId(kind, route.domain());
135+
TxnId txnId = node.nextTxnId(kind, route.domain(), cardinality(route));
135136
return node.withEpoch(txnId.epoch(), () -> coordinate(node, txnId, route, adapter)).beginAsResult();
136137
}
137138

@@ -150,7 +151,7 @@ private static <U extends Unseekable> AsyncResult<SyncPoint<U>> coordinate(Node
150151
: TopologyMismatch.checkForMismatchOrPendingRemoval(node.topology().globalForEpoch(txnId.epoch()), txnId, route.homeKey(), route);
151152
if (mismatch != null)
152153
return AsyncResults.failure(mismatch);
153-
CoordinateSyncPoint<SyncPoint<U>> coordinate = new CoordinateSyncPoint<>(node, txnId, adapter.forDecision(node, route, txnId), node.agent().emptySystemTxn(txnId.kind(), txnId.domain()), route, adapter);
154+
CoordinateSyncPoint<SyncPoint<U>> coordinate = new CoordinateSyncPoint<>(node, txnId, adapter.forDecision(node, route, txnId, txnId), node.agent().emptySystemTxn(txnId.kind(), txnId.domain()), route, adapter);
154155
coordinate.start();
155156
return coordinate;
156157
}

accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate;
6565
import static accord.local.Commands.AcceptOutcome.Success;
6666
import static accord.primitives.Timestamp.Flag.REJECTED;
67-
import static accord.primitives.TxnId.FastPath.PRIVILEGED_COORDINATOR_WITH_DEPS;
67+
import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
6868
import static java.util.concurrent.TimeUnit.MICROSECONDS;
6969

7070
/**
@@ -148,7 +148,6 @@ protected CoordinationAdapter<Result> proposeAdapter()
148148
return Adapters.standard();
149149
}
150150

151-
// TODO (expected): override in C* rather than default to configurability here
152151
protected CoordinationAdapter<Result> executeAdapter()
153152
{
154153
return node.coordinationAdapter(txnId, Standard);
@@ -201,7 +200,7 @@ public void accept(PreAcceptReply result, Throwable failure)
201200
// TODO (desired): we can probably still process and record fast path votes from peers, just with different quorum requirements
202201
boolean hasCoordinatorVote = txnId.equals(ok.witnessedAt);
203202
if (!hasCoordinatorVote) fastPathEnabled = false;
204-
Deps deps = hasCoordinatorVote && txnId.is(PRIVILEGED_COORDINATOR_WITH_DEPS) ? ok.deps : null;
203+
Deps deps = hasCoordinatorVote && txnId.is(PrivilegedCoordinatorWithDeps) ? ok.deps : null;
205204
onSuccess(node.id(), ok);
206205
for (Node.Id id : topologies.nodes())
207206
{
@@ -225,7 +224,7 @@ public PreAcceptReply apply(SafeCommandStore safeStore)
225224
SafeCommand safeCommand = safeStore.get(txnId, participants);
226225

227226
Deps deps;
228-
if (txnId.is(PRIVILEGED_COORDINATOR_WITH_DEPS))
227+
if (txnId.is(PrivilegedCoordinatorWithDeps))
229228
{
230229
deps = PreAccept.calculateDeps(safeStore, txnId, participants, minEpoch, txnId, true);
231230
if (deps == null)

0 commit comments

Comments
 (0)