Skip to content

Commit c6d5e29

Browse files
authored
more accurate mermory size (#15713)
1 parent cb91cbf commit c6d5e29

4 files changed

Lines changed: 18 additions & 15 deletions

File tree

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
2323
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
2424

25-
import java.nio.Buffer;
2625
import java.util.ArrayList;
2726
import java.util.List;
2827

@@ -37,7 +36,7 @@ public class Batch {
3736

3837
private long logEntriesNumFromWAL = 0L;
3938

40-
private long serializedSize;
39+
private long memorySize;
4140
// indicates whether this batch has been successfully synchronized to another node
4241
private boolean synced;
4342

@@ -60,14 +59,12 @@ public void addTLogEntry(TLogEntry entry) {
6059
if (entry.fromWAL) {
6160
logEntriesNumFromWAL++;
6261
}
63-
// TODO Maybe we need to add in additional fields for more accurate calculations
64-
serializedSize +=
65-
entry.getData() == null ? 0 : entry.getData().stream().mapToInt(Buffer::capacity).sum();
62+
memorySize += entry.getMemorySize();
6663
}
6764

6865
public boolean canAccumulate() {
6966
return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
70-
&& serializedSize < config.getReplication().getMaxSizePerBatch();
67+
&& memorySize < config.getReplication().getMaxSizePerBatch();
7168
}
7269

7370
public long getStartIndex() {
@@ -94,8 +91,8 @@ public boolean isEmpty() {
9491
return logEntries.isEmpty();
9592
}
9693

97-
public long getSerializedSize() {
98-
return serializedSize;
94+
public long getMemorySize() {
95+
return memorySize;
9996
}
10097

10198
public long getLogEntriesNumFromWAL() {
@@ -111,8 +108,8 @@ public String toString() {
111108
+ endIndex
112109
+ ", size="
113110
+ logEntries.size()
114-
+ ", serializedSize="
115-
+ serializedSize
111+
+ ", memorySize="
112+
+ memorySize
116113
+ '}';
117114
}
118115
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,8 @@ private boolean constructBatchFromWAL(long currentIndex, long maxIndex, Batch lo
567567
data.buildSerializedRequests();
568568
// construct request from wal
569569
logBatches.addTLogEntry(
570-
new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
570+
new TLogEntry(
571+
data.getSerializedRequests(), data.getSearchIndex(), true, data.getMemorySize()));
571572
}
572573
// In the case of corrupt Data, we return true so that we can send a batch as soon as
573574
// possible, avoiding potential duplication
@@ -577,7 +578,11 @@ private boolean constructBatchFromWAL(long currentIndex, long maxIndex, Batch lo
577578
private void constructBatchIndexedFromConsensusRequest(
578579
IndexedConsensusRequest request, Batch logBatches) {
579580
logBatches.addTLogEntry(
580-
new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false));
581+
new TLogEntry(
582+
request.getSerializedRequests(),
583+
request.getSearchIndex(),
584+
false,
585+
request.getMemorySize()));
581586
}
582587
}
583588
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
4545
*/
4646
public synchronized void addNextBatch(Batch batch) throws InterruptedException {
4747
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48-
|| !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) {
48+
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
4949
wait();
5050
}
5151
pendingBatches.add(batch);
@@ -64,7 +64,7 @@ public synchronized void removeBatch(Batch batch) {
6464
while (current.isSynced()) {
6565
controller.update(current.getEndIndex(), false);
6666
iterator.remove();
67-
iotConsensusMemoryManager.free(current.getSerializedSize(), false);
67+
iotConsensusMemoryManager.free(current.getMemorySize(), false);
6868
if (iterator.hasNext()) {
6969
current = iterator.next();
7070
} else {
@@ -79,7 +79,7 @@ public synchronized void removeBatch(Batch batch) {
7979
public synchronized void free() {
8080
long size = 0;
8181
for (Batch pendingBatch : pendingBatches) {
82-
size += pendingBatch.getSerializedSize();
82+
size += pendingBatch.getMemorySize();
8383
}
8484
pendingBatches.clear();
8585
controller.update(0L, true);

iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TLogEntry {
2626
1: required list<binary> data
2727
2: required i64 searchIndex
2828
3: required bool fromWAL
29+
4: required i64 memorySize
2930
}
3031

3132
struct TSyncLogEntriesReq {

0 commit comments

Comments
 (0)