Skip to content

Commit a2c22ac

Browse files
authored
Fix that the initial WAL file is not counted (#17662)
1 parent 3a714e7 commit a2c22ac

2 files changed

Lines changed: 204 additions & 0 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ protected AbstractWALBuffer(
7575
logDirectory,
7676
WALFileUtils.getLogFileName(
7777
startFileVersion, currentSearchIndex, WALFileStatus.CONTAINS_SEARCH_INDEX)));
78+
// count the newly created WAL file into file number statistics
79+
addFileNum(1);
7880
currentWALFileVersion = startFileVersion;
7981
}
8082

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
20+
21+
import org.apache.iotdb.commons.exception.IllegalPathException;
22+
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
24+
import org.apache.iotdb.consensus.ConsensusFactory;
25+
import org.apache.iotdb.db.conf.IoTDBConfig;
26+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
27+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
28+
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
29+
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
30+
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
31+
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
32+
import org.apache.iotdb.db.utils.EnvironmentUtils;
33+
import org.apache.iotdb.db.utils.constant.TestConstant;
34+
35+
import org.apache.tsfile.common.conf.TSFileConfig;
36+
import org.apache.tsfile.enums.TSDataType;
37+
import org.apache.tsfile.utils.Binary;
38+
import org.apache.tsfile.utils.BitMap;
39+
import org.apache.tsfile.write.schema.MeasurementSchema;
40+
import org.awaitility.Awaitility;
41+
import org.junit.After;
42+
import org.junit.Before;
43+
import org.junit.Test;
44+
45+
import java.io.File;
46+
import java.util.Collections;
47+
48+
import static org.junit.Assert.assertEquals;
49+
50+
/** Tests that WAL file number statistics correctly account for all WAL files. */
51+
public class WALBufferFileNumTest {
52+
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
53+
private static final String identifier = String.valueOf(Integer.MAX_VALUE);
54+
private static final String logDirectory =
55+
TestConstant.BASE_OUTPUT_PATH.concat("wal-file-num-test");
56+
private static final String devicePath = "root.test_sg.test_d";
57+
58+
private WALMode prevMode;
59+
private String prevConsensus;
60+
private long prevWalFileSizeThreshold;
61+
private WALNode walNode;
62+
63+
@Before
64+
public void setUp() throws Exception {
65+
EnvironmentUtils.cleanDir(logDirectory);
66+
prevMode = config.getWalMode();
67+
prevConsensus = config.getDataRegionConsensusProtocolClass();
68+
prevWalFileSizeThreshold = config.getWalFileSizeThresholdInByte();
69+
config.setWalMode(WALMode.SYNC);
70+
config.setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
71+
// use a small threshold so that writes can trigger auto-roll
72+
config.setWalFileSizeThresholdInByte(2 * 1024 * 1024);
73+
}
74+
75+
@After
76+
public void tearDown() throws Exception {
77+
if (walNode != null) {
78+
walNode.close();
79+
}
80+
config.setWalMode(prevMode);
81+
config.setDataRegionConsensusProtocolClass(prevConsensus);
82+
config.setWalFileSizeThresholdInByte(prevWalFileSizeThreshold);
83+
EnvironmentUtils.cleanDir(logDirectory);
84+
}
85+
86+
/** Verify that the initial WAL file writer created in the constructor is counted in fileNum. */
87+
@Test
88+
public void testInitialFileNumAfterConstruction() throws Exception {
89+
walNode = new WALNode(identifier, logDirectory);
90+
// after construction on a fresh directory, there should be exactly 1 WAL file
91+
// (the currentWALFileWriter created in the constructor)
92+
assertEquals(1, walNode.getFileNum());
93+
// verify disk agrees
94+
File[] walFilesOnDisk = WALFileUtils.listAllWALFiles(new File(logDirectory));
95+
assertEquals(1, walFilesOnDisk.length);
96+
}
97+
98+
/**
99+
* Verify that fileNum stays correct after rolling the WAL file. After one roll, there should be 2
100+
* files: the original (now closed) and the new writer.
101+
*/
102+
@Test
103+
public void testFileNumAfterRoll() throws Exception {
104+
walNode = new WALNode(identifier, logDirectory);
105+
assertEquals(1, walNode.getFileNum());
106+
107+
// write some data then roll
108+
walNode.log(
109+
0,
110+
getInsertTabletNode(devicePath, new long[] {1}),
111+
Collections.singletonList(new int[] {0, 1}));
112+
walNode.rollWALFile();
113+
Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed());
114+
115+
// after one roll: 1 closed file + 1 new open file = 2
116+
assertEquals(2, walNode.getFileNum());
117+
File[] walFilesOnDisk = WALFileUtils.listAllWALFiles(new File(logDirectory));
118+
assertEquals(2, walFilesOnDisk.length);
119+
}
120+
121+
/**
122+
* Verify that fileNum stays correct after multiple rolls. After N rolls, there should be N+1
123+
* files on disk and fileNum should match.
124+
*/
125+
@Test
126+
public void testFileNumAfterMultipleRolls() throws Exception {
127+
walNode = new WALNode(identifier, logDirectory);
128+
assertEquals(1, walNode.getFileNum());
129+
130+
int rollCount = 3;
131+
for (int i = 0; i < rollCount; i++) {
132+
walNode.log(
133+
0,
134+
getInsertTabletNode(devicePath, new long[] {i + 1}),
135+
Collections.singletonList(new int[] {0, 1}));
136+
walNode.rollWALFile();
137+
Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed());
138+
}
139+
140+
// rollCount closed files + 1 current open file
141+
long expectedFileNum = rollCount + 1;
142+
assertEquals(expectedFileNum, walNode.getFileNum());
143+
File[] walFilesOnDisk = WALFileUtils.listAllWALFiles(new File(logDirectory));
144+
assertEquals(expectedFileNum, walFilesOnDisk.length);
145+
}
146+
147+
/**
148+
* Verify that WALManager's totalFileNum is consistent with the per-node fileNum for a single WAL
149+
* node.
150+
*/
151+
@Test
152+
public void testTotalFileNumInWALManager() throws Exception {
153+
long totalFileNumBefore = WALManager.getInstance().getTotalFileNum();
154+
walNode = new WALNode(identifier, logDirectory);
155+
156+
// after construction, totalFileNum should increase by 1
157+
assertEquals(totalFileNumBefore + 1, WALManager.getInstance().getTotalFileNum());
158+
159+
// roll once
160+
walNode.log(
161+
0,
162+
getInsertTabletNode(devicePath, new long[] {1}),
163+
Collections.singletonList(new int[] {0, 1}));
164+
walNode.rollWALFile();
165+
Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed());
166+
167+
// after one roll, totalFileNum should increase by 1 more
168+
assertEquals(totalFileNumBefore + 2, WALManager.getInstance().getTotalFileNum());
169+
}
170+
171+
private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
172+
throws IllegalPathException {
173+
TSDataType[] dataTypes = new TSDataType[] {TSDataType.TEXT};
174+
String[] measurements = new String[] {"s1"};
175+
MeasurementSchema[] schemas =
176+
new MeasurementSchema[] {new MeasurementSchema("s1", TSDataType.TEXT)};
177+
178+
Object[] columns = new Object[1];
179+
Binary[] binaryValues = new Binary[times.length];
180+
for (int i = 0; i < times.length; i++) {
181+
binaryValues[i] = new Binary("test" + times[i], TSFileConfig.STRING_CHARSET);
182+
}
183+
columns[0] = binaryValues;
184+
185+
BitMap[] bitMaps = new BitMap[1];
186+
bitMaps[0] = new BitMap(times.length);
187+
188+
InsertTabletNode node =
189+
new InsertTabletNode(
190+
new PlanNodeId(""),
191+
new PartialPath(devicePath),
192+
false,
193+
measurements,
194+
dataTypes,
195+
times,
196+
bitMaps,
197+
columns,
198+
times.length);
199+
node.setMeasurementSchemas(schemas);
200+
return node;
201+
}
202+
}

0 commit comments

Comments
 (0)