Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ public TsBlock remove() {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
tsBlock.getSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// corresponding LocalSinkChannel.
if (sinkChannel != null) {
Expand Down Expand Up @@ -226,10 +226,10 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
tsBlock.getSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();

// reserve memory failed, we should wait until there is enough memory
if (!Boolean.TRUE.equals(pair.right)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public synchronized void send(TsBlock tsBlock) {
if (noMoreTsBlocks) {
return;
}
long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
long sizeInBytes = tsBlock.getSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
blocked =
Expand All @@ -211,17 +211,16 @@ public synchronized void send(TsBlock tsBlock) {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
retainedSizeInBytes,
sizeInBytes,
maxBytesCanReserve)
.left;
bufferRetainedSizeInBytes += retainedSizeInBytes;
bufferRetainedSizeInBytes += sizeInBytes;

sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
nextSequenceId += 1;
currentTsBlockSize = retainedSizeInBytes;
currentTsBlockSize = sizeInBytes;

// TODO: consider merge multiple NewDataBlockEvent for less network traffic.
submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes));
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ public TsBlock receive() {
if (tsBlock != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[GetTsBlockFromQueue] TsBlock:{} size:{}",
currSequenceId,
tsBlock.getRetainedSizeInBytes());
"[GetTsBlockFromQueue] TsBlock:{} size:{}", currSequenceId, tsBlock.getSizeInBytes());
}
currSequenceId++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void initializeMaxTsBlockLength(TsBlock tsBlock) {
long oneTupleSize =
Math.max(
1,
(tsBlock.getRetainedSizeInBytes() - tsBlock.getTotalInstanceSize())
(tsBlock.getSizeInBytes() - tsBlock.getTotalInstanceSize())
/ tsBlock.getPositionCount());
if (oneTupleSize > maxReturnSize) {
// make sure at least one-tuple-at-a-time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public TsBlock next() throws Exception {
if (tsBlock == null) {
return null;
}
dataSize += tsBlock.getRetainedSizeInBytes();
dataSize += tsBlock.getSizeInBytes();
cacheTsBlock(tsBlock);
} catch (IoTDBException e) {
clear();
Expand Down Expand Up @@ -184,7 +184,7 @@ private void prepareSortReaders() throws IoTDBException {
}

private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
long bytesSize = tsBlock.getRetainedSizeInBytes();
long bytesSize = tsBlock.getSizeInBytes();
if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
cachedBytes += bytesSize;
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static List<TsBlock> createMockTsBlocks(int numOfTsBlocks, long mockTsBlo
for (int i = 0; i < numOfTsBlocks; i++) {
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
mockTsBlocks.add(mockTsBlock);
}

Expand All @@ -50,6 +51,7 @@ public static List<TsBlock> createMockTsBlocks(int numOfTsBlocks, long mockTsBlo
public static TsBlock createMockTsBlock(long mockTsBlockSize) {
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
return mockTsBlock;
}

Expand Down Expand Up @@ -144,6 +146,7 @@ public static TsBlockSerde createMockTsBlockSerde(long mockTsBlockSize) {
TsBlockSerde mockTsBlockSerde = Mockito.mock(TsBlockSerde.class);
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
Mockito.when(mockTsBlockSerde.deserialize(Mockito.any(ByteBuffer.class)))
.thenReturn(mockTsBlock);
return mockTsBlockSerde;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ public void pipelineExchangeOperatorTest() {
public void lastCacheScanOperatorTest() {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L);
Mockito.when(tsBlock.getSizeInBytes()).thenReturn(1024L);
LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(null, null, tsBlock);

assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
Expand Down Expand Up @@ -379,6 +380,7 @@ public void lastQueryOperatorTest() {
public void lastQuerySortOperatorTest() {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
List<Operator> children = new ArrayList<>(4);

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
<tsfile.version>1.1.2-250603-SNAPSHOT</tsfile.version>
<tsfile.version>1.1.2-250616-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
Expand Down
Loading