Skip to content

Commit 0998495

Browse files
committed
Merge branch 'cassandra-6.0' into trunk
* cassandra-6.0: Reduce memory allocations in row merge logic
2 parents a656b7b + 71e8b7c commit 0998495

3 files changed

Lines changed: 54 additions & 17 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Merged from 5.0:
1212

1313

1414
6.0-alpha2
15+
* Reduce memory allocations in row merge logic (CASSANDRA-21359)
1516
* Restore option to avoid hint transfer during decommission (CASSANDRA-21341)
1617
* Add an offline cluster metadata tool (CASSANDRA-19151)
1718
* Accord: Tail Latency Improvements (CASSANDRA-21361)

src/java/org/apache/cassandra/db/rows/Row.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
import org.apache.cassandra.schema.TableMetadata;
4444
import org.apache.cassandra.service.paxos.Commit;
4545
import org.apache.cassandra.utils.BiLongAccumulator;
46+
import org.apache.cassandra.utils.BulkIterator;
4647
import org.apache.cassandra.utils.LongAccumulator;
4748
import org.apache.cassandra.utils.MergeIterator;
4849
import org.apache.cassandra.utils.ObjectSizes;
4950
import org.apache.cassandra.utils.SearchIterator;
5051
import org.apache.cassandra.utils.btree.BTree;
52+
import org.apache.cassandra.utils.btree.UpdateFunction;
5153
import org.apache.cassandra.utils.memory.Cloner;
5254

5355
/**
@@ -716,7 +718,10 @@ public static class Merger
716718
private int rowsToMerge;
717719
private int lastRowSet = -1;
718720

719-
private final List<ColumnData> dataBuffer = new ArrayList<>();
721+
private static final ColumnData[] EMPTY_DATA_BUFFER = new ColumnData[0];
722+
723+
private ColumnData[] dataBuffer = EMPTY_DATA_BUFFER;
724+
private int dataBufferSize;
720725
private final ColumnDataReducer columnDataReducer;
721726

722727
public Merger(int size, boolean hasComplex)
@@ -728,7 +733,8 @@ public Merger(int size, boolean hasComplex)
728733

729734
public void clear()
730735
{
731-
dataBuffer.clear();
736+
Arrays.fill(dataBuffer, 0, dataBufferSize, null);
737+
dataBufferSize = 0;
732738
Arrays.fill(rows, null);
733739
columnDataIterators.clear();
734740
rowsToMerge = 0;
@@ -778,22 +784,51 @@ public Row merge(DeletionTime activeDeletion)
778784
if (activeDeletion.deletes(rowInfo))
779785
rowInfo = LivenessInfo.EMPTY;
780786

787+
int columnsCountEstimation = 0;
781788
for (Row row : rows)
782-
columnDataIterators.add(row == null ? Collections.emptyIterator() : row.iterator());
789+
{
790+
if (row != null)
791+
{
792+
columnDataIterators.add(row.iterator());
793+
columnsCountEstimation = Math.max(columnsCountEstimation, row.columnCount());
794+
}
795+
else
796+
{
797+
columnDataIterators.add(Collections.emptyIterator());
798+
}
799+
}
800+
// pre-size the buffer to the largest column count among the input rows; this is only an estimate, the buffer still grows on demand while merging
801+
if (dataBuffer.length < columnsCountEstimation)
802+
dataBuffer = new ColumnData[columnsCountEstimation];
783803

784804
columnDataReducer.setActiveDeletion(activeDeletion);
785805
Iterator<ColumnData> merged = MergeIterator.get(columnDataIterators, ColumnData.comparator, columnDataReducer);
786806
while (merged.hasNext())
787807
{
788808
ColumnData data = merged.next();
789809
if (data != null)
790-
dataBuffer.add(data);
810+
{
811+
ensureDataBufferCapacity();
812+
dataBuffer[dataBufferSize++] = data;
813+
}
791814
}
792815

793816
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
794-
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
795-
? null
796-
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer));
817+
if (rowInfo.isEmpty() && rowDeletion.isLive() && dataBufferSize == 0)
818+
return null;
819+
820+
try (BulkIterator<ColumnData> it = BulkIterator.of(dataBuffer))
821+
{
822+
return BTreeRow.create(clustering, rowInfo, rowDeletion,
823+
BTree.build(it, dataBufferSize, UpdateFunction.noOp()));
824+
}
825+
}
826+
827+
private void ensureDataBufferCapacity()
828+
{
829+
if (dataBufferSize == dataBuffer.length)
830+
// increase capacity by 50%, with 4 as the minimum capacity
831+
dataBuffer = Arrays.copyOf(dataBuffer, Math.max(dataBuffer.length + (dataBuffer.length >> 1), 4));
797832
}
798833

799834
public Clustering<?> mergedClustering()

src/java/org/apache/cassandra/utils/MergeIterator.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
149149
{
150150
protected final Candidate<In>[] heap;
151151

152+
private final Comparator<? super In> comp;
153+
152154
/** Number of non-exhausted iterators. */
153155
int size;
154156

@@ -174,9 +176,10 @@ public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp
174176
this.heap = heap;
175177
size = 0;
176178

179+
this.comp = comp;
177180
for (int i = 0; i < iters.size(); i++)
178181
{
179-
Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp);
182+
Candidate<In> candidate = new Candidate<>(i, iters.get(i));
180183
heap[size++] = candidate;
181184
}
182185
needingAdvance = size;
@@ -292,7 +295,7 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
292295
{
293296
if (!heap[nextIdx].equalParent) // if we were greater than an (or were the) equal parent, we are >= the child
294297
{
295-
int cmp = candidate.compareTo(heap[nextIdx]);
298+
int cmp = candidate.compareTo(heap[nextIdx], comp);
296299
if (cmp <= 0)
297300
{
298301
heap[nextIdx].equalParent = cmp == 0;
@@ -316,12 +319,12 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
316319
if (!heap[nextIdx + 1].equalParent)
317320
{
318321
// pick the smallest of the two children
319-
int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]);
322+
int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx], comp);
320323
if (siblingCmp < 0)
321324
++nextIdx;
322325

323326
// if we're smaller than this, we are done, and must only restore the heap and equalParent properties
324-
int cmp = candidate.compareTo(heap[nextIdx]);
327+
int cmp = candidate.compareTo(heap[nextIdx], comp);
325328
if (cmp <= 0)
326329
{
327330
if (cmp == 0)
@@ -362,7 +365,7 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
362365
// ... but sometimes we will have one last child to compare against, that has no siblings
363366
if (!heap[nextIdx].equalParent)
364367
{
365-
int cmp = candidate.compareTo(heap[nextIdx]);
368+
int cmp = candidate.compareTo(heap[nextIdx], comp);
366369
if (cmp <= 0)
367370
{
368371
heap[nextIdx].equalParent = cmp == 0;
@@ -377,19 +380,17 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
377380
}
378381

379382
// Holds the head item of an iterator it owns; candidates are ordered by that item using the comparator passed to compareTo
380-
protected static final class Candidate<In> implements Comparable<Candidate<In>>
383+
protected static final class Candidate<In>
381384
{
382385
private final Iterator<? extends In> iter;
383-
private final Comparator<? super In> comp;
384386
private final int idx;
385387
private In item;
386388
private In lowerBound;
387389
boolean equalParent;
388390

389-
public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
391+
public Candidate(int idx, Iterator<? extends In> iter)
390392
{
391393
this.iter = iter;
392-
this.comp = comp;
393394
this.idx = idx;
394395
this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null;
395396
}
@@ -410,7 +411,7 @@ protected Candidate<In> advance()
410411
return this;
411412
}
412413

413-
public int compareTo(Candidate<In> that)
414+
int compareTo(Candidate<In> that, Comparator<? super In> comp)
414415
{
415416
assert this.item != null && that.item != null;
416417
int ret = comp.compare(this.item, that.item);

0 commit comments

Comments
 (0)