Skip to content

Commit a0f31fe

Browse files
authored
Fix ref count of IoTConsensus request not decreased in allocation failure (#16169)
* fix IoTConsensus memory management * Fix ref count of IoTConsensus request not decreased in allocation failure * fix log level
1 parent 8fb982d commit a0f31fe

4 files changed

Lines changed: 216 additions & 29 deletions

File tree

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
2323
import org.apache.iotdb.commons.memory.IMemoryBlock;
2424
import org.apache.iotdb.commons.service.metric.MetricService;
25+
import org.apache.iotdb.commons.utils.TestOnly;
2526
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
2627

2728
import org.slf4j.Logger;
@@ -41,16 +42,58 @@ private IoTConsensusMemoryManager() {
4142
MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
4243
}
4344

44-
public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
45+
public boolean reserve(IndexedConsensusRequest request) {
4546
long prevRef = request.incRef();
4647
if (prevRef == 0) {
47-
return reserve(request.getMemorySize(), fromQueue);
48-
} else {
49-
return true;
48+
boolean reserved = reserve(request.getMemorySize(), true);
49+
if (reserved) {
50+
if (logger.isDebugEnabled()) {
51+
logger.debug(
52+
"Reserving {} bytes for request {} succeeds, current total usage {}",
53+
request.getMemorySize(),
54+
request.getSearchIndex(),
55+
memoryBlock.getUsedMemoryInBytes());
56+
}
57+
} else {
58+
request.decRef();
59+
if (logger.isDebugEnabled()) {
60+
logger.debug(
61+
"Reserving {} bytes for request {} fails, current total usage {}",
62+
request.getMemorySize(),
63+
request.getSearchIndex(),
64+
memoryBlock.getUsedMemoryInBytes());
65+
}
66+
}
67+
return reserved;
68+
} else if (logger.isDebugEnabled()) {
69+
logger.debug(
70+
"Skip memory reservation for {} because its ref count is not 0",
71+
request.getSearchIndex());
72+
}
73+
return true;
74+
}
75+
76+
public boolean reserve(Batch batch) {
77+
boolean reserved = reserve(batch.getMemorySize(), false);
78+
if (reserved && logger.isDebugEnabled()) {
79+
logger.debug(
80+
"Reserving {} bytes for batch {}-{} succeeds, current total usage {}",
81+
batch.getMemorySize(),
82+
batch.getStartIndex(),
83+
batch.getEndIndex(),
84+
memoryBlock.getUsedMemoryInBytes());
85+
} else if (logger.isDebugEnabled()) {
86+
logger.debug(
87+
"Reserving {} bytes for batch {}-{} fails, current total usage {}",
88+
batch.getMemorySize(),
89+
batch.getStartIndex(),
90+
batch.getEndIndex(),
91+
memoryBlock.getUsedMemoryInBytes());
5092
}
93+
return reserved;
5194
}
5295

53-
public boolean reserve(long size, boolean fromQueue) {
96+
private boolean reserve(long size, boolean fromQueue) {
5497
boolean result =
5598
fromQueue
5699
? memoryBlock.allocateIfSufficient(size, maxMemoryRatioForQueue)
@@ -65,14 +108,33 @@ public boolean reserve(long size, boolean fromQueue) {
65108
return result;
66109
}
67110

68-
public void free(IndexedConsensusRequest request, boolean fromQueue) {
111+
public void free(IndexedConsensusRequest request) {
69112
long prevRef = request.decRef();
70113
if (prevRef == 1) {
71-
free(request.getMemorySize(), fromQueue);
114+
free(request.getMemorySize(), true);
115+
if (logger.isDebugEnabled()) {
116+
logger.debug(
117+
"Freed {} bytes for request {}, current total usage {}",
118+
request.getMemorySize(),
119+
request.getSearchIndex(),
120+
memoryBlock.getUsedMemoryInBytes());
121+
}
122+
}
123+
}
124+
125+
public void free(Batch batch) {
126+
free(batch.getMemorySize(), false);
127+
if (logger.isDebugEnabled()) {
128+
logger.debug(
129+
"Freed {} bytes for batch {}-{}, current total usage {}",
130+
batch.getMemorySize(),
131+
batch.getStartIndex(),
132+
batch.getEndIndex(),
133+
getMemorySizeInByte());
72134
}
73135
}
74136

75-
public void free(long size, boolean fromQueue) {
137+
private void free(long size, boolean fromQueue) {
76138
long currentUsedMemory = memoryBlock.release(size);
77139
if (fromQueue) {
78140
queueMemorySizeInByte.addAndGet(-size);
@@ -91,6 +153,28 @@ public void init(IMemoryBlock memoryBlock, double maxMemoryRatioForQueue) {
91153
this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
92154
}
93155

156+
@TestOnly
157+
public void reset() {
158+
this.memoryBlock.release(this.memoryBlock.getUsedMemoryInBytes());
159+
this.queueMemorySizeInByte.set(0);
160+
this.syncMemorySizeInByte.set(0);
161+
}
162+
163+
@TestOnly
164+
public IMemoryBlock getMemoryBlock() {
165+
return memoryBlock;
166+
}
167+
168+
@TestOnly
169+
public void setMemoryBlock(IMemoryBlock memoryBlock) {
170+
this.memoryBlock = memoryBlock;
171+
}
172+
173+
@TestOnly
174+
public Double getMaxMemoryRatioForQueue() {
175+
return maxMemoryRatioForQueue;
176+
}
177+
94178
long getMemorySizeInByte() {
95179
return memoryBlock.getUsedMemoryInBytes();
96180
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -283,27 +283,27 @@ public int getBufferRequestSize() {
283283

284284
/** try to offer a request into queue with memory control. */
285285
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
286-
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
286+
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
287287
return false;
288288
}
289289
boolean success;
290290
try {
291291
success = pendingEntries.offer(indexedConsensusRequest);
292292
} catch (Throwable t) {
293293
// If exception occurs during request offer, the reserved memory should be released
294-
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
294+
iotConsensusMemoryManager.free(indexedConsensusRequest);
295295
throw t;
296296
}
297297
if (!success) {
298298
// If offer failed, the reserved memory should be released
299-
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
299+
iotConsensusMemoryManager.free(indexedConsensusRequest);
300300
}
301301
return success;
302302
}
303303

304304
/** try to remove a request from queue with memory control. */
305305
private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
306-
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
306+
iotConsensusMemoryManager.free(indexedConsensusRequest);
307307
}
308308

309309
public void stop() {
@@ -323,23 +323,13 @@ private void processStopped() {
323323
} catch (InterruptedException e) {
324324
Thread.currentThread().interrupt();
325325
}
326-
long requestSize = 0;
327326
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
328-
long prevRef = indexedConsensusRequest.decRef();
329-
if (prevRef == 1) {
330-
requestSize += indexedConsensusRequest.getMemorySize();
331-
}
327+
iotConsensusMemoryManager.free(indexedConsensusRequest);
332328
}
333329
pendingEntries.clear();
334-
iotConsensusMemoryManager.free(requestSize, true);
335-
requestSize = 0;
336330
for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
337-
long prevRef = indexedConsensusRequest.decRef();
338-
if (prevRef == 1) {
339-
requestSize += indexedConsensusRequest.getMemorySize();
340-
}
331+
iotConsensusMemoryManager.free(indexedConsensusRequest);
341332
}
342-
iotConsensusMemoryManager.free(requestSize, true);
343333
syncStatus.free();
344334
MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
345335
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121

2222
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
2323

24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
2427
import java.util.Iterator;
2528
import java.util.LinkedList;
2629
import java.util.List;
2730

2831
public class SyncStatus {
2932

33+
private static final Logger LOGGER = LoggerFactory.getLogger(SyncStatus.class);
3034
private final IoTConsensusConfig config;
3135
private final IndexController controller;
3236
private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -45,10 +49,18 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
4549
*/
4650
public synchronized void addNextBatch(Batch batch) throws InterruptedException {
4751
while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
48-
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
52+
|| !iotConsensusMemoryManager.reserve(batch))
4953
&& !Thread.interrupted()) {
5054
wait();
5155
}
56+
if (LOGGER.isDebugEnabled()) {
57+
LOGGER.debug(
58+
"Reserved {} bytes for batch {}-{}, current total usage {}",
59+
batch.getMemorySize(),
60+
batch.getStartIndex(),
61+
batch.getEndIndex(),
62+
iotConsensusMemoryManager.getMemorySizeInByte());
63+
}
5264
pendingBatches.add(batch);
5365
}
5466

@@ -65,7 +77,7 @@ public synchronized void removeBatch(Batch batch) {
6577
while (current.isSynced()) {
6678
controller.update(current.getEndIndex(), false);
6779
iterator.remove();
68-
iotConsensusMemoryManager.free(current.getMemorySize(), false);
80+
iotConsensusMemoryManager.free(current);
6981
if (iterator.hasNext()) {
7082
current = iterator.next();
7183
} else {
@@ -78,13 +90,11 @@ public synchronized void removeBatch(Batch batch) {
7890
}
7991

8092
public synchronized void free() {
81-
long size = 0;
8293
for (Batch pendingBatch : pendingBatches) {
83-
size += pendingBatch.getMemorySize();
94+
iotConsensusMemoryManager.free(pendingBatch);
8495
}
8596
pendingBatches.clear();
8697
controller.update(0L, true);
87-
iotConsensusMemoryManager.free(size, false);
8898
}
8999

90100
/** Gets the first index that is not currently synchronized. */
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.consensus.iot.logdispatcher;
21+
22+
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
23+
import org.apache.iotdb.commons.memory.IMemoryBlock;
24+
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
25+
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
26+
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
import java.nio.ByteBuffer;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.List;
35+
36+
import static org.junit.Assert.assertEquals;
37+
import static org.junit.Assert.assertFalse;
38+
import static org.junit.Assert.assertTrue;
39+
40+
public class IoTConsensusMemoryManagerTest {
41+
42+
private IMemoryBlock previousMemoryBlock;
43+
private long memoryBlockSize = 16 * 1024L;
44+
45+
@Before
46+
public void setUp() throws Exception {
47+
previousMemoryBlock = IoTConsensusMemoryManager.getInstance().getMemoryBlock();
48+
IoTConsensusMemoryManager.getInstance()
49+
.setMemoryBlock(new AtomicLongMemoryBlock("Test", null, memoryBlockSize));
50+
}
51+
52+
@After
53+
public void tearDown() throws Exception {
54+
IoTConsensusMemoryManager.getInstance().setMemoryBlock(previousMemoryBlock);
55+
}
56+
57+
@Test
58+
public void testSingleReserveAndRelease() {
59+
testReserveAndRelease(1);
60+
}
61+
62+
@Test
63+
public void testMultiReserveAndRelease() {
64+
testReserveAndRelease(3);
65+
}
66+
67+
private void testReserveAndRelease(int numReservation) {
68+
int allocationSize = 1;
69+
long allocatedSize = 0;
70+
List<IndexedConsensusRequest> requestList = new ArrayList<>();
71+
while (true) {
72+
IndexedConsensusRequest request =
73+
new IndexedConsensusRequest(
74+
0,
75+
Collections.singletonList(
76+
new ByteBufferConsensusRequest(ByteBuffer.allocate(allocationSize))));
77+
request.buildSerializedRequests();
78+
if (allocatedSize + allocationSize
79+
<= memoryBlockSize
80+
* IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()) {
81+
for (int i = 0; i < numReservation; i++) {
82+
assertTrue(IoTConsensusMemoryManager.getInstance().reserve(request));
83+
requestList.add(request);
84+
}
85+
} else {
86+
for (int i = 0; i < numReservation; i++) {
87+
assertFalse(IoTConsensusMemoryManager.getInstance().reserve(request));
88+
}
89+
break;
90+
}
91+
allocatedSize += allocationSize;
92+
}
93+
94+
assertTrue(
95+
IoTConsensusMemoryManager.getInstance().getMemorySizeInByte()
96+
<= memoryBlockSize
97+
* IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue());
98+
for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
99+
IoTConsensusMemoryManager.getInstance().free(indexedConsensusRequest);
100+
}
101+
assertEquals(0, IoTConsensusMemoryManager.getInstance().getMemorySizeInByte());
102+
}
103+
}

0 commit comments

Comments
 (0)