Skip to content

Commit 9a4986b

Browse files
szetszwoechonesis
authored andcommitted
HDDS-14117. Add nonStreamRead and fileRead cases to tests. (apache#9476)
1 parent 514f663 commit 9a4986b

2 files changed

Lines changed: 144 additions & 61 deletions

File tree

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ public MultipartInputStream(String keyName,
8282
this.length = streamLength;
8383
}
8484

85+
public boolean isStreamBlockInputStream() {
86+
return isStreamBlockInputStream;
87+
}
88+
8589
@Override
8690
protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
8791
throws IOException {

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java

Lines changed: 140 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@
1919

2020
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertFalse;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
2224

25+
import java.io.BufferedInputStream;
26+
import java.io.File;
27+
import java.io.InputStream;
2328
import java.io.OutputStream;
29+
import java.nio.file.Files;
2430
import java.security.MessageDigest;
31+
import java.util.Arrays;
2532
import java.util.Collections;
33+
import java.util.List;
2634
import java.util.concurrent.ThreadLocalRandom;
2735
import org.apache.hadoop.hdds.StringUtils;
36+
import org.apache.hadoop.hdds.client.BlockID;
2837
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
2938
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
3039
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -33,15 +42,22 @@
3342
import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
3443
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
3544
import org.apache.hadoop.ozone.ClientConfigForTesting;
45+
import org.apache.hadoop.ozone.HddsDatanodeService;
3646
import org.apache.hadoop.ozone.MiniOzoneCluster;
37-
import org.apache.hadoop.ozone.OzoneConfigKeys;
3847
import org.apache.hadoop.ozone.client.OzoneBucket;
3948
import org.apache.hadoop.ozone.client.OzoneClient;
4049
import org.apache.hadoop.ozone.client.OzoneClientFactory;
4150
import org.apache.hadoop.ozone.client.io.KeyInputStream;
51+
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
52+
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
53+
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
4254
import org.apache.hadoop.ozone.om.TestBucket;
55+
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
56+
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
4357
import org.apache.ozone.test.GenericTestUtils;
58+
import org.apache.ratis.util.JavaUtils;
4459
import org.apache.ratis.util.SizeInBytes;
60+
import org.apache.ratis.util.function.CheckedBiConsumer;
4561
import org.junit.jupiter.api.Test;
4662
import org.slf4j.LoggerFactory;
4763
import org.slf4j.event.Level;
@@ -52,24 +68,22 @@
5268
public class TestStreamRead {
5369
{
5470
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
55-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
56-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
57-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
58-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
59-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
60-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"), Level.ERROR);
61-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
62-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
63-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
71+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org"), Level.ERROR);
72+
73+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"), Level.ERROR);
74+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"), Level.ERROR);
75+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"), Level.ERROR);
6476
GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), Level.ERROR);
6577
}
6678

6779
static final int CHUNK_SIZE = 1 << 20; // 1MB
6880
static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB
6981
static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB
7082

71-
static final int BLOCK_SIZE = 64 << 20;
7283
static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
84+
static final int BLOCK_SIZE = KEY_SIZE.getSizeInt();
85+
86+
static final String DUMMY_KEY = "dummyKey";
7387

7488
static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
7589
final OzoneConfiguration conf = new OzoneConfiguration();
@@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
7993
conf.setFromObject(config);
8094

8195
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
82-
conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
96+
conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 1);
8397
conf.setQuietMode(true);
84-
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB);
8598

8699
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
87100
.setBlockSize(BLOCK_SIZE)
@@ -114,53 +127,127 @@ void testReadKey256k() throws Exception {
114127
}
115128

116129
void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Exception {
130+
System.out.println("cluster starting ...");
117131
try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) {
118132
cluster.waitForClusterToBeReady();
119-
120133
System.out.println("cluster ready");
121134

135+
final List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
136+
assertEquals(1, datanodes.size());
137+
final HddsDatanodeService datanode = datanodes.get(0);
138+
122139
OzoneConfiguration conf = cluster.getConf();
123140
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
124141
clientConfig.setStreamReadBlock(true);
125-
OzoneConfiguration copy = new OzoneConfiguration(conf);
126-
copy.setFromObject(clientConfig);
142+
final OzoneConfiguration steamReadConf = new OzoneConfiguration(conf);
143+
steamReadConf.setFromObject(clientConfig);
144+
145+
clientConfig.setStreamReadBlock(false);
146+
final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration(conf);
147+
nonSteamReadConf.setFromObject(clientConfig);
127148

128-
final int n = 5;
129-
final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
130-
final SizeInBytes[] readBufferSizes = {
149+
final SizeInBytes[] bufferSizes = {
131150
SizeInBytes.valueOf("32M"),
132151
SizeInBytes.valueOf("8M"),
133152
SizeInBytes.valueOf("1M"),
134153
SizeInBytes.valueOf("4k"),
135154
};
136155

137-
try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
138-
final TestBucket bucket = TestBucket.newBuilder(client).build();
156+
try (OzoneClient streamReadClient = OzoneClientFactory.getRpcClient(steamReadConf);
157+
OzoneClient nonStreamReadClient = OzoneClientFactory.getRpcClient(nonSteamReadConf)) {
158+
final TestBucket testBucket = TestBucket.newBuilder(streamReadClient).build();
159+
final String volume = testBucket.delegate().getVolumeName();
160+
final String bucket = testBucket.delegate().getName();
161+
final String keyName = "key0";
162+
163+
// get the client ready by writing a dummy key
164+
createKey(testBucket.delegate(), DUMMY_KEY, SizeInBytes.ONE_KB, SizeInBytes.ONE_KB);
165+
166+
for (SizeInBytes bufferSize : bufferSizes) {
167+
// create key
168+
System.out.println("---------------------------------------------------------");
169+
createKey(testBucket.delegate(), keyName, keySize, bufferSize);
170+
171+
// get block file and generate md5
172+
final OmKeyInfo info = nonStreamReadClient.getProxy().getKeyInfo(volume, bucket, keyName, false);
173+
final List<OmKeyLocationInfo> locations = info.getLatestVersionLocations().getLocationList();
174+
assertEquals(1, locations.size());
175+
final BlockID blockId = locations.get(0).getBlockID();
176+
final ContainerData containerData = datanode.getDatanodeStateMachine().getContainer().getContainerSet()
177+
.getContainer(blockId.getContainerID()).getContainerData();
178+
final File blockFile = ContainerLayoutVersion.FILE_PER_BLOCK.getChunkFile(containerData, blockId, null);
179+
assertTrue(blockFile.exists());
180+
assertEquals(BLOCK_SIZE, blockFile.length());
181+
final String expectedMd5 = generateMd5(keySize, SizeInBytes.ONE_MB, blockFile);
139182

140-
for (int i = 0; i < n; i++) {
141-
final String keyName = "key" + i;
183+
// run tests
142184
System.out.println("---------------------------------------------------------");
143185
System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
144186
keyName, keySize, bytesPerChecksum);
145187

146-
final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize);
147-
for (SizeInBytes readBufferSize : readBufferSizes) {
148-
runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
149-
runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
188+
final CheckedBiConsumer<SizeInBytes, String, Exception> streamRead = (readBufferSize, md5)
189+
-> streamRead(keySize, readBufferSize, md5, testBucket, keyName);
190+
final CheckedBiConsumer<SizeInBytes, String, Exception> nonStreamRead = (readBufferSize, md5)
191+
-> nonStreamRead(keySize, readBufferSize, md5, nonStreamReadClient, volume, bucket, keyName);
192+
final CheckedBiConsumer<SizeInBytes, String, Exception> fileRead = (readBufferSize, md5)
193+
-> fileRead(keySize, readBufferSize, md5, blockFile);
194+
final List<CheckedBiConsumer<SizeInBytes, String, Exception>> operations
195+
= Arrays.asList(streamRead, nonStreamRead, fileRead);
196+
Collections.shuffle(operations);
197+
198+
for (CheckedBiConsumer<SizeInBytes, String, Exception> op : operations) {
199+
for (int i = 0; i < 5; i++) {
200+
op.accept(bufferSize, null);
201+
}
202+
op.accept(bufferSize, expectedMd5);
150203
}
151204
}
152205
}
153206
}
154207
}
155208

209+
static void streamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
210+
TestBucket bucket, String keyName) throws Exception {
211+
try (KeyInputStream in = bucket.getKeyInputStream(keyName)) {
212+
assertTrue(in.isStreamBlockInputStream());
213+
runTestReadKey(keySize, bufferSize, expectedMD5, in);
214+
}
215+
}
216+
217+
static void nonStreamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
218+
OzoneClient nonStreamReadClient, String volume, String bucket, String keyName) throws Exception {
219+
final ClientProtocol proxy = nonStreamReadClient.getProxy();
220+
try (KeyInputStream in = (KeyInputStream) proxy.getKey(volume, bucket, keyName).getInputStream()) {
221+
assertFalse(in.isStreamBlockInputStream());
222+
runTestReadKey(keySize, bufferSize, expectedMD5, in);
223+
}
224+
}
225+
226+
static void fileRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
227+
File blockFile) throws Exception {
228+
try (InputStream in = new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) {
229+
runTestReadKey(keySize, bufferSize, expectedMD5, in);
230+
}
231+
}
232+
233+
static String generateMd5(SizeInBytes keySize, SizeInBytes bufferSize, File blockFile) throws Exception {
234+
try (InputStream in = new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) {
235+
return runTestReadKey("generateMd5", keySize, bufferSize, true, in);
236+
}
237+
}
238+
156239
static void print(String name, long keySizeByte, long elapsedNanos, SizeInBytes bufferSize, String computedMD5) {
157240
final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
158241
final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
159-
System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n",
160-
name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb, computedMD5);
242+
if (computedMD5 == null) {
243+
System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB)%n",
244+
name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb);
245+
} else {
246+
System.out.printf("%16s md5=%s%n", name, computedMD5);
247+
}
161248
}
162249

163-
static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
250+
static void createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
164251
throws Exception {
165252
final byte[] buffer = new byte[bufferSize.getSizeInt()];
166253
ThreadLocalRandom.current().nextBytes(buffer);
@@ -176,50 +263,42 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
176263
}
177264
}
178265
final long elapsedNanos = System.nanoTime() - startTime;
179-
180-
final MessageDigest md5 = MessageDigest.getInstance("MD5");
181-
for (long pos = 0; pos < keySizeByte;) {
182-
final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
183-
md5.update(buffer, 0, writeSize);
184-
pos += writeSize;
266+
if (!keyName.startsWith(DUMMY_KEY)) {
267+
print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, null);
185268
}
269+
}
186270

187-
final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
188-
print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
189-
return computedMD5;
271+
static void runTestReadKey(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
272+
InputStream in) throws Exception {
273+
final String method = JavaUtils.getCallerStackTraceElement().getMethodName();
274+
final String computedMD5 = runTestReadKey(method, keySize, bufferSize, expectedMD5 != null, in);
275+
assertEquals(expectedMD5, computedMD5);
190276
}
191277

192-
private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
193-
TestBucket bucket) throws Exception {
278+
static String runTestReadKey(String name, SizeInBytes keySize, SizeInBytes bufferSize, boolean generateMd5,
279+
InputStream in) throws Exception {
194280
final long keySizeByte = keySize.getSize();
195281
final MessageDigest md5 = MessageDigest.getInstance("MD5");
196282
// Read the data fully into a large enough byte array
197283
final byte[] buffer = new byte[bufferSize.getSizeInt()];
198284
final long startTime = System.nanoTime();
199-
try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
200-
int pos = 0;
201-
for (; pos < keySizeByte;) {
202-
final int read = keyInputStream.read(buffer, 0, buffer.length);
203-
if (read == -1) {
204-
break;
205-
}
285+
int pos = 0;
286+
for (; pos < keySizeByte;) {
287+
final int read = in.read(buffer, 0, buffer.length);
288+
if (read == -1) {
289+
break;
290+
}
206291

207-
if (expectedMD5 != null) {
208-
md5.update(buffer, 0, read);
209-
}
210-
pos += read;
292+
if (generateMd5) {
293+
md5.update(buffer, 0, read);
211294
}
212-
assertEquals(keySizeByte, pos);
295+
pos += read;
213296
}
297+
assertEquals(keySizeByte, pos);
214298
final long elapsedNanos = System.nanoTime() - startTime;
215299

216-
final String computedMD5;
217-
if (expectedMD5 == null) {
218-
computedMD5 = null;
219-
} else {
220-
computedMD5 = StringUtils.bytes2Hex(md5.digest());
221-
assertEquals(expectedMD5, computedMD5);
222-
}
223-
print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
300+
final String computedMD5 = generateMd5 ? StringUtils.bytes2Hex(md5.digest()) : null;
301+
print(name, keySizeByte, elapsedNanos, bufferSize, computedMD5);
302+
return computedMD5;
224303
}
225304
}

0 commit comments

Comments
 (0)