Skip to content

Commit 16bb95f

Browse files
CEP-45: Implement independent mutation tracking versioning
patch by Blake Eggleston and Aleksey Yeschenko; reviewed by Aleksey Yeschenko and Blake Eggleston for CASSANDRA-21328 Co-authored-by: Aleksey Yeschenko <aleksey@yeschenko.com> Co-authored-by: Blake Eggleston <blake@ultrablake.com>
1 parent e982cf6 commit 16bb95f

75 files changed

Lines changed: 999 additions & 847 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,8 @@ public enum CassandraRelevantProperties
406406
MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
407407
/** Defines the interval for reporting any operations that have timed out. */
408408
MONITORING_REPORT_INTERVAL_MS("cassandra.monitoring_report_interval_ms", "5000"),
409+
/** The most recent version that's supported by all nodes in the cluster **/
410+
MT_CLUSTER_SAFE_VERSION("cassandra.mutation_tracking.cluster_safe_version"),
409411
MV_ALLOW_FILTERING_NONKEY_COLUMNS_UNSAFE("cassandra.mv.allow_filtering_nonkey_columns_unsafe"),
410412
MV_ENABLE_COORDINATOR_BATCHLOG("cassandra.mv_enable_coordinator_batchlog"),
411413
/** mx4jaddress */

src/java/org/apache/cassandra/db/AbstractReadCommandVerbHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ else if (localComparisonEpoch.isAfter(readCommand.serializedAtEpoch()))
108108

109109
private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message)
110110
{
111-
boolean acceptsTransient = message.verb() == Verb.TRACKED_SUMMARY_REQ;
111+
boolean acceptsTransient = message.verb() == Verb.MT_SUMMARY_REQ;
112112
ReadCommand command = getCommand(message.payload);
113113

114114
if (command.metadata().isVirtual())

src/java/org/apache/cassandra/db/EmbeddableSinglePartitionReadCommand.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ public void serialize(EmbeddableSinglePartitionReadCommand command, DataOutputPl
6767
ReadCommand.serializer.serialize((ReadCommand) command, out, version);
6868
break;
6969
case TRACKED_DATA:
70-
DataRequest.serializer.serialize((DataRequest) command, out, version);
70+
DataRequest.embedded.serialize((DataRequest) command, out, version);
7171
break;
7272
case TRACKED_SUMMARY:
73-
SummaryRequest.serializer.serialize((SummaryRequest) command, out, version);
73+
SummaryRequest.embedded.serialize((SummaryRequest) command, out, version);
7474
break;
7575
default:
7676
throw new IllegalStateException("Unhandled kind: " + command.kind());
@@ -87,9 +87,9 @@ public EmbeddableSinglePartitionReadCommand deserialize(DataInputPlus in, int ve
8787
case UNTRACKED:
8888
return (SinglePartitionReadCommand)ReadCommand.serializer.deserialize(in, version);
8989
case TRACKED_DATA:
90-
return DataRequest.serializer.deserialize(in, version);
90+
return DataRequest.embedded.deserialize(in, version);
9191
case TRACKED_SUMMARY:
92-
return SummaryRequest.serializer.deserialize(in, version);
92+
return SummaryRequest.embedded.deserialize(in, version);
9393
default:
9494
throw new IllegalStateException("Unhandled kind: " + kind);
9595
}
@@ -109,9 +109,9 @@ public long serializedSize(EmbeddableSinglePartitionReadCommand command, int ver
109109
case UNTRACKED:
110110
return size + ReadCommand.serializer.serializedSize((ReadCommand) command, version);
111111
case TRACKED_DATA:
112-
return size + DataRequest.serializer.serializedSize((DataRequest) command, version);
112+
return size + DataRequest.embedded.serializedSize((DataRequest) command, version);
113113
case TRACKED_SUMMARY:
114-
return size + SummaryRequest.serializer.serializedSize((SummaryRequest) command, version);
114+
return size + SummaryRequest.embedded.serializedSize((SummaryRequest) command, version);
115115
default:
116116
throw new IllegalStateException("Unhandled kind: " + command.kind());
117117
}

src/java/org/apache/cassandra/db/IReadResponse.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ public void serialize(IReadResponse response, DataOutputPlus out, int version) t
5050
ReadResponse.serializer.serialize((ReadResponse) response, out, version);
5151
break;
5252
case TRACKED_DATA:
53-
TrackedDataResponse.serializer.serialize((TrackedDataResponse) response, out, version);
53+
TrackedDataResponse.embedded.serialize((TrackedDataResponse) response, out, version);
5454
break;
5555
case TRACKED_SUMMARY:
56-
TrackedSummaryResponse.serializer.serialize((TrackedSummaryResponse) response, out, version);
56+
TrackedSummaryResponse.embedded.serialize((TrackedSummaryResponse) response, out, version);
5757
break;
5858
default:
5959
throw new IllegalStateException("Unhandled kind: " + response.kind());
@@ -70,9 +70,9 @@ public IReadResponse deserialize(DataInputPlus in, int version) throws IOExcepti
7070
case UNTRACKED:
7171
return ReadResponse.serializer.deserialize(in, version);
7272
case TRACKED_DATA:
73-
return TrackedDataResponse.serializer.deserialize(in, version);
73+
return TrackedDataResponse.embedded.deserialize(in, version);
7474
case TRACKED_SUMMARY:
75-
return TrackedSummaryResponse.serializer.deserialize(in, version);
75+
return TrackedSummaryResponse.embedded.deserialize(in, version);
7676
default:
7777
throw new IllegalStateException("Unhandled kind: " + kind);
7878
}
@@ -93,9 +93,9 @@ public long serializedSize(IReadResponse response, int version)
9393
case UNTRACKED:
9494
return size + ReadResponse.serializer.serializedSize((ReadResponse) response, version);
9595
case TRACKED_DATA:
96-
return size + TrackedDataResponse.serializer.serializedSize((TrackedDataResponse) response, version);
96+
return size + TrackedDataResponse.embedded.serializedSize((TrackedDataResponse) response, version);
9797
case TRACKED_SUMMARY:
98-
return size + TrackedSummaryResponse.serializer.serializedSize((TrackedSummaryResponse) response, version);
98+
return size + TrackedSummaryResponse.embedded.serializedSize((TrackedSummaryResponse) response, version);
9999
default:
100100
throw new IllegalStateException("Unhandled kind: " + response.kind());
101101
}

src/java/org/apache/cassandra/db/Mutation.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,8 @@
7373

7474
public class Mutation implements IMutation, Supplier<Mutation>, Commitable
7575
{
76-
public static final MutationSerializer serializer = new MutationSerializer();
7776
public static final int ALLOW_POTENTIAL_TRANSACTION_CONFLICTS = 0x01;
78-
77+
public static final int HAS_MUTATION_ID = 0x02;
7978

8079
private final MutationId id;
8180
// todo this is redundant
@@ -608,17 +607,20 @@ static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializ
608607
DataOutputPlus out,
609608
int version) throws IOException
610609
{
611-
Map<TableId, PartitionUpdate> modifications = mutation.modifications;
610+
boolean hasMutationId = version >= VERSION_61 && !mutation.id.isNone();
612611

613612
if (version >= VERSION_60)
614613
{
615614
int flags = 0;
616615
flags |= potentialTxnConflictsFlag(mutation.potentialTxnConflicts);
616+
if (hasMutationId) flags |= HAS_MUTATION_ID;
617617
out.write(flags);
618618
}
619619

620-
if (version >= MessagingService.VERSION_61)
621-
MutationId.serializer.serialize(mutation.id, out, version);
620+
if (hasMutationId)
621+
MutationId.serializer.serialize(mutation.id, out);
622+
623+
Map<TableId, PartitionUpdate> modifications = mutation.modifications;
622624

623625
/* serialize the modifications in the mutation */
624626
int size = modifications.size();
@@ -640,15 +642,15 @@ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper
640642
teeIn = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT);
641643

642644
PotentialTxnConflicts potentialTxnConflicts = PotentialTxnConflicts.DISALLOW;
645+
boolean hasMutationId = false;
643646
if (version >= VERSION_60)
644647
{
645-
int flags = teeIn.readByte();
648+
int flags = teeIn.readUnsignedByte();
646649
potentialTxnConflicts = potentialTxnConflicts(flags);
650+
hasMutationId = version >= VERSION_61 && (flags & HAS_MUTATION_ID) == HAS_MUTATION_ID;
647651
}
648652

649-
MutationId id = version >= MessagingService.VERSION_61
650-
? MutationId.serializer.deserialize(teeIn, version)
651-
: MutationId.none();
653+
MutationId id = hasMutationId ? MutationId.serializer.deserialize(teeIn) : MutationId.none();
652654

653655
int size = teeIn.readUnsignedVInt32();
654656
assert size > 0;
@@ -686,32 +688,37 @@ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper
686688
*/
687689
public Pair<DecoratedKey, TableMetadata> deserializeKeyAndTableMetadata(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
688690
{
689-
if (version >= VERSION_60)
690-
in.skipBytes(1); // potentialTxnConflicts
691-
692-
if (version >= VERSION_61)
693-
MutationId.serializer.skip(in, version);
694-
691+
skipHeader(in, version);
695692
int size = in.readUnsignedVInt32();
696693
assert size > 0;
697-
698694
return PartitionUpdate.serializer.deserializeMetadataAndKey(in, version, flag);
699695
}
700696

701-
public TableId deserializeTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
697+
/**
698+
* Return first (out of potentially multiple) table ids in this mutation.
699+
*/
700+
public TableId deserializeFirstTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
702701
{
703-
if (version >= VERSION_60)
704-
in.skipBytes(1); // flags
705-
706-
if (version >= VERSION_61)
707-
MutationId.serializer.skip(in, version);
708-
702+
skipHeader(in, version);
709703
int size = in.readUnsignedVInt32();
710704
assert size > 0;
711-
712705
return PartitionUpdate.serializer.deserializeTableId(in, version, flag);
713706
}
714707

708+
private void skipHeader(DataInputBuffer in, int version) throws IOException
709+
{
710+
boolean hasMutationId = false;
711+
if (version >= VERSION_60)
712+
{
713+
int flags = in.readUnsignedByte();
714+
if (version >= VERSION_61)
715+
hasMutationId = (flags & HAS_MUTATION_ID) == HAS_MUTATION_ID;
716+
}
717+
718+
if (hasMutationId)
719+
MutationId.serializer.skip(in);
720+
}
721+
715722
public Mutation deserialize(DataInputPlus in, int version) throws IOException
716723
{
717724
return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
@@ -723,6 +730,8 @@ public long serializedSize(Mutation mutation, int version)
723730
}
724731
}
725732

733+
public static final MutationSerializer serializer = new MutationSerializer();
734+
726735
/**
727736
* There are two implementations of this class. One that keeps the serialized representation on-heap for later
728737
* reuse and one that doesn't. Keeping all sized mutations around may lead to "bad" GC pressure (G1 GC) due to humongous objects.
@@ -780,9 +789,11 @@ long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutati
780789
if (size == 0L)
781790
{
782791
if (version >= VERSION_60)
783-
size += TypeSizes.sizeof((byte)ALLOW_POTENTIAL_TRANSACTION_CONFLICTS); // flags
784-
if (version >= MessagingService.VERSION_61)
785-
size += MutationId.serializer.serializedSize(mutation.id, version);
792+
size += TypeSizes.BYTE_SIZE; // flags
793+
794+
if (version >= MessagingService.VERSION_61 && !mutation.id.isNone())
795+
size += MutationId.serializer.serializedSize(mutation.id);
796+
786797
size += TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
787798
for (PartitionUpdate partitionUpdate : mutation.modifications.values())
788799
size += serializer.serializedSize(partitionUpdate, version);

src/java/org/apache/cassandra/db/SystemKeyspace.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.cassandra.db.partitions.PartitionUpdate;
7272
import org.apache.cassandra.db.rows.Row;
7373
import org.apache.cassandra.db.rows.Rows;
74+
import org.apache.cassandra.dht.AbstractBounds;
7475
import org.apache.cassandra.dht.IPartitioner;
7576
import org.apache.cassandra.dht.LocalPartitioner;
7677
import org.apache.cassandra.dht.Range;
@@ -2027,7 +2028,7 @@ public static ByteBuffer rangeToBytes(Range<Token> range)
20272028
// pass 0 as the version to trigger that legacy code.
20282029
// In the future, it might be worth switching to a stable text format for the ranges to 1) save that and 2)
20292030
// be more user friendly (the serialization format we currently use is pretty custom).
2030-
Range.tokenSerializer.serialize(range, out, 0);
2031+
AbstractBounds.tokenSerializer.serialize(range, out, 0);
20312032
return out.buffer();
20322033
}
20332034
catch (IOException e)
@@ -2042,7 +2043,7 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
20422043
try (DataInputPlus.DataInputStreamPlus in = new DataInputBuffer(ByteBufferUtil.getArray(rawRange)))
20432044
{
20442045
// See rangeToBytes above for why version is 0.
2045-
return (Range<Token>) Range.tokenSerializer.deserialize(in, partitioner, 0);
2046+
return (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, 0);
20462047
}
20472048
catch (IOException e)
20482049
{

src/java/org/apache/cassandra/exceptions/FunctionExecutionException.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.cassandra.io.util.DataInputPlus;
2828
import org.apache.cassandra.io.util.DataOutputPlus;
2929
import org.apache.cassandra.utils.CollectionSerializers;
30+
import org.apache.cassandra.utils.StringSerializer;
3031

3132
public class FunctionExecutionException extends RequestExecutionException
3233
{
@@ -63,7 +64,7 @@ protected void serializeSpecificFields(DataOutputPlus out, int version) throws I
6364
if (functionName.keyspace != null)
6465
out.writeUTF(functionName.keyspace);
6566
out.writeUTF(functionName.name);
66-
CollectionSerializers.serializeList(argTypes, out, version, CollectionSerializers.STRING_SERIALIZER);
67+
CollectionSerializers.serializeList(argTypes, out, version, StringSerializer.instance);
6768
out.writeUTF(detail);
6869
}
6970

@@ -74,7 +75,7 @@ protected long serializedSizeSpecificFields(int version)
7475
if (functionName.keyspace != null)
7576
size += TypeSizes.sizeof(functionName.keyspace);
7677
size += TypeSizes.sizeof(functionName.name);
77-
size += CollectionSerializers.serializedListSize(argTypes, version, CollectionSerializers.STRING_SERIALIZER);
78+
size += CollectionSerializers.serializedListSize(argTypes, version, StringSerializer.instance);
7879
size += TypeSizes.sizeof(detail);
7980
return size;
8081
}
@@ -83,7 +84,7 @@ static FunctionExecutionException deserializeFields(String message, DataInputPlu
8384
{
8485
String keyspace = in.readBoolean() ? in.readUTF() : null;
8586
String name = in.readUTF();
86-
List<String> argTypes = CollectionSerializers.deserializeList(in, version, CollectionSerializers.STRING_SERIALIZER);
87+
List<String> argTypes = CollectionSerializers.deserializeList(in, version, StringSerializer.instance);
8788
String detail = in.readUTF();
8889
return new FunctionExecutionException(new FunctionName(keyspace, name), argTypes, detail);
8990
}

src/java/org/apache/cassandra/io/AsymmetricVersionedSerializer.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
1918
package org.apache.cassandra.io;
2019

2120
import java.io.IOException;
22-
import java.io.UncheckedIOException;
2321
import java.nio.ByteBuffer;
2422

2523
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -42,17 +40,6 @@ default ByteBuffer serialize(In t, Version version) throws IOException
4240
}
4341
}
4442

45-
default ByteBuffer serializeUnchecked(In t, Version version)
46-
{
47-
try
48-
{
49-
return serialize(t, version);
50-
}
51-
catch (IOException e)
52-
{
53-
throw new UncheckedIOException(e);
54-
}
55-
}
5643
Out deserialize(DataInputPlus in, Version version) throws IOException;
5744

5845
default void skip(DataInputPlus in, Version version) throws IOException
@@ -68,17 +55,6 @@ default Out deserialize(ByteBuffer buffer, Version version) throws IOException
6855
}
6956
}
7057

71-
default Out deserializeUnchecked(ByteBuffer buffer, Version version)
72-
{
73-
try
74-
{
75-
return deserialize(buffer, version);
76-
}
77-
catch (IOException e)
78-
{
79-
throw new UncheckedIOException(e);
80-
}
81-
}
8258
long serializedSize(In t, Version version);
8359

8460
static <In, Out, Version> AsymmetricVersionedSerializer<In, Out, Version> from(AsymmetricUnversionedSerializer<In, Out> delegate)

src/java/org/apache/cassandra/io/EmbeddedAsymmetricVersionedSerializer.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
1918
package org.apache.cassandra.io;
2019

2120
import java.io.IOException;
@@ -64,11 +63,6 @@ public Out deserialize(DataInputPlus in) throws IOException
6463
return delegate.deserialize(in, version);
6564
}
6665

67-
public Version deserializeVersion(DataInputPlus in) throws IOException
68-
{
69-
return versionSerializer.deserialize(in);
70-
}
71-
7266
@Override
7367
public long serializedSize(In t, int msgVersion)
7468
{
@@ -81,4 +75,32 @@ public long serializedSize(In t)
8175
return versionSerializer.serializedSize(version)
8276
+ delegate.serializedSize(t, version);
8377
}
78+
79+
public static <A, B> IVersionedAsymmetricSerializer<A, B> accordEmbedded(AsymmetricVersionedSerializer<A, B, org.apache.cassandra.service.accord.serializers.Version> delegate)
80+
{
81+
return new EmbeddedAsymmetricVersionedSerializer<>(
82+
org.apache.cassandra.service.accord.serializers.Version.CLUSTER_SAFE_VERSION,
83+
org.apache.cassandra.service.accord.serializers.Version.Serializer.instance,
84+
delegate
85+
);
86+
}
87+
88+
public static <A, B> IVersionedAsymmetricSerializer<A, B> accordEmbedded(AsymmetricUnversionedSerializer<A, B> delegate)
89+
{
90+
return accordEmbedded(AsymmetricVersionedSerializer.from(delegate));
91+
}
92+
93+
public static <A, B> IVersionedAsymmetricSerializer<A, B> mtEmbedded(AsymmetricVersionedSerializer<A, B, org.apache.cassandra.replication.Version> delegate)
94+
{
95+
return new EmbeddedAsymmetricVersionedSerializer<>(
96+
org.apache.cassandra.replication.Version.CLUSTER_SAFE_VERSION,
97+
org.apache.cassandra.replication.Version.serializer,
98+
delegate
99+
);
100+
}
101+
102+
public static <A, B> IVersionedAsymmetricSerializer<A, B> mtEmbedded(AsymmetricUnversionedSerializer<A, B> delegate)
103+
{
104+
return mtEmbedded(AsymmetricVersionedSerializer.from(delegate));
105+
}
84106
}

0 commit comments

Comments
 (0)