Skip to content

Commit 1eb0090

Browse files
committed
Introduce Range transactions (#23)
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18174
1 parent 686326e commit 1eb0090

103 files changed

Lines changed: 1415 additions & 685 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

accord-core/src/main/java/accord/api/Key.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public interface Key extends Seekable, RoutableKey
3030
@Override
3131
default Key asKey() { return this; }
3232

33+
@Override
34+
default Key slice(Range range) { return this; }
35+
3336
@Override
3437
default Range asRange() { throw new UnsupportedOperationException(); }
3538
}

accord-core/src/main/java/accord/api/Read.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
package accord.api;
2020

2121
import accord.local.SafeCommandStore;
22-
import accord.primitives.Ranges;
23-
import accord.primitives.Keys;
24-
import accord.primitives.Timestamp;
25-
import accord.primitives.Txn;
22+
import accord.primitives.*;
2623
import org.apache.cassandra.utils.concurrent.AsyncPromise;
2724
import org.apache.cassandra.utils.concurrent.Future;
2825

@@ -35,8 +32,8 @@
3532
*/
3633
public interface Read
3734
{
38-
Keys keys();
39-
Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
35+
Seekables<?, ?> keys();
36+
Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
4037

4138
class ReadFuture extends AsyncPromise<Data> implements BiConsumer<Data, Throwable>
4239
{
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
package accord.api;
22

3+
import accord.primitives.Range;
34
import accord.primitives.RoutableKey;
45
import accord.primitives.Unseekable;
6+
import accord.utils.ArrayBuffers;
7+
8+
import java.util.Arrays;
9+
10+
import static accord.utils.ArrayBuffers.cachedRoutingKeys;
511

612
public interface RoutingKey extends Unseekable, RoutableKey
713
{
814
@Override default RoutingKey toUnseekable() { return this; }
15+
Range asRange();
916
}

accord-core/src/main/java/accord/api/Update.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import accord.primitives.Ranges;
2222
import accord.primitives.Keys;
23+
import accord.primitives.Seekables;
2324

2425
import javax.annotation.Nullable;
2526

@@ -30,7 +31,7 @@
3031
*/
3132
public interface Update
3233
{
33-
Keys keys();
34+
Seekables<?, ?> keys();
3435
// null is provided only if nothing was read
3536
Write apply(@Nullable Data data);
3637
Update slice(Ranges ranges);

accord-core/src/main/java/accord/api/Write.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package accord.api;
2020

2121
import accord.local.SafeCommandStore;
22+
import accord.primitives.Seekable;
2223
import accord.primitives.Timestamp;
2324
import org.apache.cassandra.utils.concurrent.Future;
2425

@@ -29,5 +30,5 @@
2930
*/
3031
public interface Write
3132
{
32-
Future<Void> apply(Key key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
33+
Future<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
3334
}

accord-core/src/main/java/accord/coordinate/CheckOn.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static accord.local.PreLoadContext.contextFor;
4141
import static accord.local.SaveStatus.NotWitnessed;
4242
import static accord.local.Status.*;
43+
import static accord.primitives.Routables.Slice.Minimal;
4344

4445
/**
4546
* Check on the status of a transaction. Returns early once enough information has been achieved to meet the requested
@@ -136,21 +137,23 @@ class OnDone implements MapReduceConsume<SafeCommandStore, Void>
136137

137138
public OnDone()
138139
{
139-
Ranges localRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
140-
PartialRoute<?> selfRoute = route().slice(localRanges);
140+
Ranges sliceRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
141+
Ranges covering = route().sliceCovering(sliceRanges, Minimal);
142+
Unseekables<?, ?> intersectingKeys = route().slice(covering, Minimal);
143+
141144
full = (CheckStatusOkFull) merged;
142-
sufficientFor = full.sufficientFor(selfRoute);
145+
sufficientFor = full.sufficientFor(intersectingKeys);
143146
maxRoute = Route.merge((Route)route(), full.route);
144147
progressKey = node.trySelectProgressKey(txnId, maxRoute);
145148

146149
PartialTxn partialTxn = null;
147150
if (sufficientFor.definition.isKnown())
148-
partialTxn = full.partialTxn.slice(localRanges, true).reconstitutePartial(selfRoute);
151+
partialTxn = full.partialTxn.slice(sliceRanges, true).reconstitutePartial(covering);
149152
this.partialTxn = partialTxn;
150153

151154
PartialDeps partialDeps = null;
152-
if (sufficientFor.deps.isDecisionKnown())
153-
partialDeps = full.committedDeps.slice(localRanges).reconstitutePartial(selfRoute);
155+
if (sufficientFor.deps.hasDecidedDeps())
156+
partialDeps = full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
154157
this.partialDeps = partialDeps;
155158
}
156159

@@ -161,7 +164,7 @@ void start()
161164
keys = partialTxn.keys();
162165

163166
Iterable<TxnId> txnIds = Collections.singleton(txnId);
164-
if (sufficientFor.deps.isDecisionKnown())
167+
if (sufficientFor.deps.hasDecidedDeps())
165168
txnIds = Iterables.concat(txnIds, partialDeps.txnIds());
166169

167170
PreLoadContext loadContext = contextFor(txnIds, keys);
@@ -219,7 +222,7 @@ public Void apply(SafeCommandStore safeStore)
219222
if (!safeStore.ranges().at(txnId.epoch()).contains(homeKey))
220223
return null;
221224

222-
Timestamp executeAt = merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
225+
Timestamp executeAt = merged.saveStatus.known.executeAt.hasDecidedExecuteAt() ? merged.executeAt : null;
223226
command.setDurability(safeStore, merged.durability, homeKey, executeAt);
224227
safeStore.progressLog().durable(command, null);
225228
return null;

accord-core/src/main/java/accord/coordinate/CheckShards.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ private static Topologies topologyFor(Node node, TxnId txnId, Unseekables<?, ?>
4242
@Override
4343
protected void contact(Id id)
4444
{
45-
node.send(id, new CheckStatus(txnId, contact.slice(topologies().computeRangesForNode(id)), txnId.epoch(), untilRemoteEpoch, includeInfo), this);
45+
Unseekables<?, ?> unseekables = contact.slice(topologies().computeRangesForNode(id));
46+
node.send(id, new CheckStatus(txnId, unseekables, txnId.epoch(), untilRemoteEpoch, includeInfo), this);
4647
}
4748

4849
protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }

accord-core/src/main/java/accord/coordinate/FetchData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private static Object fetchInternal(Ranges ranges, Known target, Node node, TxnI
8989
// information to fulfil that phase locally we should downgrade the response we give to the callback
9090
Known sufficientFor = ok.sufficientFor(fetch);
9191
// if we discover the executeAt as part of this action, use that to decide if we requested enough info
92-
Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.isDecisionKnown() ? ok.executeAt : null;
92+
Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.hasDecidedExecuteAt() ? ok.executeAt : null;
9393
if (sufficientFor.outcome == OutcomeKnown && (exec == null || untilLocalEpoch < exec.epoch()))
9494
sufficientFor = sufficientFor.with(OutcomeUnknown);
9595
callback.accept(sufficientFor, null);

accord-core/src/main/java/accord/coordinate/Invalidate.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
package accord.coordinate;
2020

2121
import java.util.ArrayList;
22+
import java.util.HashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.function.BiConsumer;
2426

2527
import accord.coordinate.tracking.InvalidationTracker;
2628
import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker;
2729
import accord.coordinate.tracking.RequestStatus;
30+
import accord.local.Node.Id;
2831
import accord.local.Status;
2932
import accord.messages.Commit;
3033
import accord.primitives.*;
@@ -59,6 +62,7 @@ public class Invalidate implements Callback<InvalidateReply>
5962
private final List<InvalidateReply> replies = new ArrayList<>();
6063
private final InvalidationTracker tracker;
6164
private Throwable failure;
65+
private final Map<Id, InvalidateReply> debug = Invariants.debug() ? new HashMap<>() : null;
6266

6367
private Invalidate(Node node, Ballot ballot, TxnId txnId, Unseekables<?, ?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, BiConsumer<Outcome, Throwable> callback)
6468
{
@@ -91,17 +95,18 @@ private void start()
9195
}
9296

9397
@Override
94-
public synchronized void onSuccess(Node.Id from, InvalidateReply reply)
98+
public synchronized void onSuccess(Id from, InvalidateReply reply)
9599
{
96100
if (isDone || isPrepareDone)
97101
return;
98102

103+
if (debug != null) debug.put(from, reply);
99104
replies.add(reply);
100105
handle(tracker.recordSuccess(from, reply.isPromised(), reply.acceptedFastPath));
101106
}
102107

103108
@Override
104-
public void onFailure(Node.Id from, Throwable failure)
109+
public void onFailure(Id from, Throwable failure)
105110
{
106111
if (isDone || isPrepareDone)
107112
return;
@@ -227,8 +232,9 @@ else if (homeKey != null)
227232

228233
// if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
229234
// Probably simplest to do so, but perhaps better for user if we don't.
230-
// TODO (now, rangetxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
231-
RoutingKey invalidateWithKey = invalidateWith.slice(Ranges.of(tracker.promisedShard().range)).get(0).someIntersectingRoutingKey();
235+
Ranges ranges = Ranges.of(tracker.promisedShard().range);
236+
// we look up by TxnId at the target node, so it's fine to pick a RoutingKey even if it's a range transaction
237+
RoutingKey invalidateWithKey = invalidateWith.slice(ranges).get(0).someIntersectingRoutingKey(ranges);
232238
proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, fail) -> {
233239
/*
234240
We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
@@ -271,7 +277,7 @@ private void commitInvalidate()
271277
}
272278

273279
@Override
274-
public void onCallbackFailure(Node.Id from, Throwable failure)
280+
public void onCallbackFailure(Id from, Throwable failure)
275281
{
276282
if (isDone)
277283
return;

accord-core/src/main/java/accord/coordinate/Propose.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package accord.coordinate;
2020

2121
import java.util.ArrayList;
22+
import java.util.HashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.function.BiConsumer;
2426

2527
import accord.api.Result;
@@ -36,9 +38,11 @@
3638
import accord.local.Node.Id;
3739
import accord.messages.Accept;
3840
import accord.messages.Accept.AcceptReply;
41+
import accord.utils.Invariants;
3942

4043
import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
4144
import static accord.coordinate.tracking.RequestStatus.Failed;
45+
import static accord.utils.Invariants.debug;
4246

4347
class Propose implements Callback<AcceptReply>
4448
{
@@ -50,6 +54,7 @@ class Propose implements Callback<AcceptReply>
5054
final Deps deps;
5155

5256
private final List<AcceptReply> acceptOks;
57+
private final Map<Id, AcceptReply> debug = debug() ? new HashMap<>() : null;
5358
private final Timestamp executeAt;
5459
private final QuorumTracker acceptTracker;
5560
private final BiConsumer<Result, Throwable> callback;
@@ -80,7 +85,7 @@ public static void propose(Node node, Topologies topologies, Ballot ballot, TxnI
8085
Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
8186
{
8287
Propose propose = new Propose(node, topologies, ballot, txnId, txn, route, deps, executeAt, callback);
83-
node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps, txn.kind()), propose);
88+
node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps), propose);
8489
}
8590

8691
@Override
@@ -89,6 +94,8 @@ public void onSuccess(Id from, AcceptReply reply)
8994
if (isDone)
9095
return;
9196

97+
if (debug != null) debug.put(from, reply);
98+
9299
switch (reply.outcome())
93100
{
94101
default: throw new IllegalStateException();

0 commit comments

Comments
 (0)