Skip to content

Commit bf44372

Browse files
committed
Merge branch 'cassandra-5.0' into cassandra-6.0
* cassandra-5.0: Make synchronization on VectorMemoryIndex inserts more granular
2 parents 5282f81 + 448d98c commit bf44372

7 files changed

Lines changed: 1660 additions & 52 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* Fix a removed TTLed row re-appearance in a materialized view after a cursor compaction (CASSANDRA-21152)
6060
* Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209)
6161
Merged from 5.0:
62+
* Make synchronization on VectorMemoryIndex inserts more granular (CASSANDRA-21160)
6263
* putShortVolatile is not volatile in InMemoryTrie (CASSANDRA-21353)
6364
* Fix RequestFailureReason serializer and nits in a few others (CASSANDRA-21437)
6465
* Remove golang dependency in gen-doc and replace with python implementation (CASSANDRA-21432)

src/java/org/apache/cassandra/index/sai/disk/v1/vector/PrimaryKeyWithScore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.cassandra.index.sai.disk.v1.vector;
2020

21+
import com.google.common.annotations.VisibleForTesting;
22+
2123
import org.apache.cassandra.db.CellSourceIdentifier;
2224
import org.apache.cassandra.db.rows.Cell;
2325
import org.apache.cassandra.db.rows.Row;
@@ -52,6 +54,12 @@ public PrimaryKey primaryKey()
5254
return primaryKey;
5355
}
5456

57+
@VisibleForTesting
58+
public float score()
59+
{
60+
return indexScore;
61+
}
62+
5563
public boolean isIndexDataValid(Row row, long nowInSecs)
5664
{
5765
// If the indexed column is part of the primary key, we don't need this type of validation because we would have

src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ public class VectorMemoryIndex extends MemoryIndex
7575
private final Memtable memtable;
7676
private final LongAdder writeCount = new LongAdder();
7777

78-
private PrimaryKey minimumKey;
79-
private PrimaryKey maximumKey;
78+
private volatile KeyBounds keyBounds;
8079

8180
private final NavigableSet<PrimaryKey> primaryKeys = new ConcurrentSkipListSet<>();
8281

@@ -88,7 +87,7 @@ public VectorMemoryIndex(StorageAttachedIndex index, Memtable memtable)
8887
}
8988

9089
@Override
91-
public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value)
90+
public long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value)
9291
{
9392
if (value == null || value.remaining() == 0 || !index.validateTermSize(key, value, false, null))
9493
return 0;
@@ -100,11 +99,11 @@ public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuf
10099

101100
private long index(PrimaryKey primaryKey, ByteBuffer value)
102101
{
103-
updateKeyBounds(primaryKey);
104-
102+
long bytesUsed = graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL);
105103
writeCount.increment();
106104
primaryKeys.add(primaryKey);
107-
return graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL);
105+
updateKeyBounds(primaryKey);
106+
return bytesUsed;
108107
}
109108

110109
@Override
@@ -131,9 +130,6 @@ public long update(DecoratedKey key, Clustering<?> clustering, ByteBuffer oldVal
131130
{
132131
PrimaryKey primaryKey = index.hasClustering() ? index.keyFactory().create(key, clustering)
133132
: index.keyFactory().create(key);
134-
// update bounds because only rows with vectors are included in the key bounds,
135-
// so if the vector was null before, we won't have included it
136-
updateKeyBounds(primaryKey);
137133

138134
// make the changes in this order, so we don't have a window where the row is not in the index at all
139135
if (newRemaining > 0)
@@ -144,20 +140,18 @@ public long update(DecoratedKey key, Clustering<?> clustering, ByteBuffer oldVal
144140
// remove primary key if it's no longer indexed
145141
if (newRemaining <= 0 && oldRemaining > 0)
146142
primaryKeys.remove(primaryKey);
143+
144+
// update bounds because only rows with vectors are included in the key bounds,
145+
// so if the vector was null before, we won't have included it
146+
updateKeyBounds(primaryKey);
147147
}
148148
return bytesUsed;
149149
}
150150

151-
private void updateKeyBounds(PrimaryKey primaryKey)
151+
private synchronized void updateKeyBounds(PrimaryKey key)
152152
{
153-
if (minimumKey == null)
154-
minimumKey = primaryKey;
155-
else if (primaryKey.compareTo(minimumKey) < 0)
156-
minimumKey = primaryKey;
157-
if (maximumKey == null)
158-
maximumKey = primaryKey;
159-
else if (primaryKey.compareTo(maximumKey) > 0)
160-
maximumKey = primaryKey;
153+
KeyBounds current = keyBounds;
154+
keyBounds = current == null ? new KeyBounds(key, key) : current.withUpdated(key);
161155
}
162156

163157
@Override
@@ -213,15 +207,15 @@ public CloseableIterator<PrimaryKeyWithScore> orderBy(QueryContext queryContext,
213207
@Override
214208
public CloseableIterator<PrimaryKeyWithScore> orderResultsBy(QueryContext queryContext, List<PrimaryKey> results, Expression orderer)
215209
{
216-
if (minimumKey == null)
217-
// This case implies maximumKey is empty too.
210+
KeyBounds bounds = keyBounds;
211+
if (bounds == null)
218212
return CloseableIterator.empty();
219213

220214
int limit = queryContext.limit();
221215

222216
List<PrimaryKey> resultsInRange = results.stream()
223-
.dropWhile(k -> k.compareTo(minimumKey) < 0)
224-
.takeWhile(k -> k.compareTo(maximumKey) <= 0)
217+
.dropWhile(k -> k.compareTo(bounds.minimum) < 0)
218+
.takeWhile(k -> k.compareTo(bounds.maximum) <= 0)
225219
.collect(Collectors.toList());
226220

227221
int maxBruteForceRows = maxBruteForceRows(limit, resultsInRange.size(), graph.size());
@@ -420,4 +414,25 @@ public void close()
420414
FileUtils.closeQuietly(nodeScores);
421415
}
422416
}
417+
418+
private static final class KeyBounds
419+
{
420+
final PrimaryKey minimum;
421+
final PrimaryKey maximum;
422+
423+
KeyBounds(PrimaryKey minimum, PrimaryKey maximum)
424+
{
425+
this.minimum = minimum;
426+
this.maximum = maximum;
427+
}
428+
429+
KeyBounds withUpdated(PrimaryKey key)
430+
{
431+
PrimaryKey newMin = minimum.compareTo(key) > 0 ? key : minimum;
432+
PrimaryKey newMax = maximum.compareTo(key) < 0 ? key : maximum;
433+
434+
// Avoid allocation if nothing changed
435+
return newMin == minimum && newMax == maximum ? this : new KeyBounds(newMin, newMax);
436+
}
437+
}
423438
}

test/simulator/main/org/apache/cassandra/simulator/asm/NemesisFieldSelectors.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
package org.apache.cassandra.simulator.asm;
2020

21-
import java.util.Collections;
2221
import java.util.HashMap;
2322
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2525
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2626
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -39,10 +39,13 @@
3939
/**
4040
* Define classes that receive special handling.
4141
* At present all instance methods invoked on such classes have nemesis points inserted on either side of them.
42+
*
43+
* Tests that need nemesis behavior on fields without annotating the source class can use
44+
* {@link #register(String, String, NemesisFieldKind)} to dynamically add entries.
4245
*/
4346
public class NemesisFieldSelectors
4447
{
45-
public static final Map<String, Map<String, NemesisFieldKind>> classToFieldToNemesis;
48+
public static final ConcurrentHashMap<String, Map<String, NemesisFieldKind>> classToFieldToNemesis;
4649

4750
static
4851
{
@@ -53,12 +56,49 @@ public class NemesisFieldSelectors
5356
Stream.of(AtomicIntegerFieldUpdater.class, AtomicLongFieldUpdater.class, AtomicReferenceFieldUpdater.class)
5457
.forEach(c -> byClass.put(c, NemesisFieldKind.ATOMICUPDATERX));
5558

56-
Map<String, Map<String, NemesisFieldKind>> byField = new HashMap<>();
59+
ConcurrentHashMap<String, Map<String, NemesisFieldKind>> byField = new ConcurrentHashMap<>();
5760
new Reflections(ConfigurationBuilder.build("org.apache.cassandra").addScanners(new FieldAnnotationsScanner()))
5861
.getFieldsAnnotatedWith(Nemesis.class)
59-
.forEach(field -> byField.computeIfAbsent(dotsToSlashes(field.getDeclaringClass()), ignore -> new HashMap<>())
62+
.forEach(field -> byField.computeIfAbsent(dotsToSlashes(field.getDeclaringClass()), ignore -> new ConcurrentHashMap<>())
6063
.put(field.getName(), byClass.getOrDefault(field.getType(), SIMPLE)));
61-
classToFieldToNemesis = Collections.unmodifiableMap(byField);
64+
classToFieldToNemesis = byField;
65+
}
66+
67+
/**
68+
* Register a field for nemesis handling without requiring a {@link Nemesis} annotation on the source class.
69+
* This allows tests to opt in fields of classes they do not own.
70+
*
71+
* @param className the internal class name (slashes, e.g. "org/apache/cassandra/index/sai/disk/v1/vector/OnHeapGraph")
72+
* @param fieldName the field name as declared in the class
73+
* @param kind the nemesis field kind (typically {@link NemesisFieldKind#SIMPLE} for plain volatile fields,
74+
* {@link NemesisFieldKind#ATOMICX} for AtomicInteger/AtomicLong/AtomicReference/AtomicBoolean fields)
75+
*/
76+
public static void register(String className, String fieldName, NemesisFieldKind kind)
77+
{
78+
classToFieldToNemesis.computeIfAbsent(className, ignore -> new ConcurrentHashMap<>())
79+
.put(fieldName, kind);
80+
}
81+
82+
/**
83+
* Register a field for nemesis handling using the class object directly.
84+
*
85+
* @param clazz the class owning the field
86+
* @param fieldName the field name as declared in the class
87+
* @param kind the nemesis field kind
88+
*/
89+
public static void register(Class<?> clazz, String fieldName, NemesisFieldKind kind)
90+
{
91+
register(dotsToSlashes(clazz), fieldName, kind);
92+
}
93+
94+
/**
95+
* Remove a previously registered nemesis field. Useful for test cleanup.
96+
*/
97+
public static void unregister(Class<?> clazz, String fieldName)
98+
{
99+
Map<String, NemesisFieldKind> fields = classToFieldToNemesis.get(dotsToSlashes(clazz));
100+
if (fields != null)
101+
fields.remove(fieldName);
62102
}
63103

64104
public static NemesisFieldKind.Selector get()

0 commit comments

Comments
 (0)