diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java index a28658b1ebbf..221a48be828d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java @@ -82,6 +82,10 @@ public MultipartInputStream(String keyName, this.length = streamLength; } + public boolean isStreamBlockInputStream() { + return isStreamBlockInputStream; + } + @Override protected synchronized int readWithStrategy(ByteReaderStrategy strategy) throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java index 29a438766303..9fc217b6df3a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java @@ -19,12 +19,21 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Files; import java.security.MessageDigest; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; @@ -33,15 +42,22 @@ import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; import org.apache.hadoop.hdds.utils.db.CodecBuffer; import org.apache.hadoop.ozone.ClientConfigForTesting; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyInputStream; +import org.apache.hadoop.ozone.client.protocol.ClientProtocol; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.om.TestBucket; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.function.CheckedBiConsumer; import org.junit.jupiter.api.Test; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -52,15 +68,11 @@ public class TestStreamRead { { GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR); - GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR); + GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org"), Level.ERROR); + + GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"), Level.ERROR); + GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"), Level.ERROR); + GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"), Level.ERROR); GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), Level.ERROR); } @@ -68,8 +80,10 @@ public class TestStreamRead { static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB - static final int BLOCK_SIZE = 64 << 20; static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M"); + static final int BLOCK_SIZE = KEY_SIZE.getSizeInt(); + + static final String DUMMY_KEY = "dummyKey"; static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception { final OzoneConfiguration conf = new OzoneConfiguration(); @@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception { conf.setFromObject(config); conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); - conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5); + conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 1); conf.setQuietMode(true); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB); ClientConfigForTesting.newBuilder(StorageUnit.BYTES) .setBlockSize(BLOCK_SIZE) @@ -114,53 +127,127 @@ void testReadKey256k() throws Exception { } void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Exception { + System.out.println("cluster starting ..."); try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) { cluster.waitForClusterToBeReady(); - System.out.println("cluster ready"); + final List datanodes = cluster.getHddsDatanodes(); + assertEquals(1, datanodes.size()); + final HddsDatanodeService datanode = datanodes.get(0); + OzoneConfiguration conf = cluster.getConf(); OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setStreamReadBlock(true); - OzoneConfiguration copy = new OzoneConfiguration(conf); - copy.setFromObject(clientConfig); + final OzoneConfiguration steamReadConf = new OzoneConfiguration(conf); + steamReadConf.setFromObject(clientConfig); + + clientConfig.setStreamReadBlock(false); + final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration(conf); + nonSteamReadConf.setFromObject(clientConfig); - final int n = 5; - final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB"); - final SizeInBytes[] readBufferSizes = { + final SizeInBytes[] bufferSizes = { SizeInBytes.valueOf("32M"), SizeInBytes.valueOf("8M"), SizeInBytes.valueOf("1M"), SizeInBytes.valueOf("4k"), }; - try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) { - final TestBucket bucket = TestBucket.newBuilder(client).build(); + try (OzoneClient streamReadClient = OzoneClientFactory.getRpcClient(steamReadConf); + OzoneClient nonStreamReadClient = OzoneClientFactory.getRpcClient(nonSteamReadConf)) { + final TestBucket testBucket = TestBucket.newBuilder(streamReadClient).build(); + final String volume = testBucket.delegate().getVolumeName(); + final String bucket = testBucket.delegate().getName(); + final String keyName = "key0"; + + // get the client ready by writing a dummy key + createKey(testBucket.delegate(), DUMMY_KEY, SizeInBytes.ONE_KB, SizeInBytes.ONE_KB); + + for (SizeInBytes bufferSize : bufferSizes) { + // create key + System.out.println("---------------------------------------------------------"); + createKey(testBucket.delegate(), keyName, keySize, bufferSize); + + // get block file and generate md5 + final OmKeyInfo info = nonStreamReadClient.getProxy().getKeyInfo(volume, bucket, keyName, false); + final List locations = info.getLatestVersionLocations().getLocationList(); + assertEquals(1, locations.size()); + final BlockID blockId = locations.get(0).getBlockID(); + final ContainerData containerData = datanode.getDatanodeStateMachine().getContainer().getContainerSet() + .getContainer(blockId.getContainerID()).getContainerData(); + final File blockFile = ContainerLayoutVersion.FILE_PER_BLOCK.getChunkFile(containerData, blockId, null); + assertTrue(blockFile.exists()); + assertEquals(BLOCK_SIZE, blockFile.length()); + final String expectedMd5 = generateMd5(keySize, SizeInBytes.ONE_MB, blockFile); - for (int i = 0; i < n; i++) { - final String keyName = "key" + i; + // run tests System.out.println("---------------------------------------------------------"); System.out.printf("%s with %s bytes and %s bytesPerChecksum%n", keyName, keySize, bytesPerChecksum); - final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize); - for (SizeInBytes readBufferSize : readBufferSizes) { - runTestReadKey(keyName, keySize, readBufferSize, null, bucket); - runTestReadKey(keyName, keySize, readBufferSize, md5, bucket); + final CheckedBiConsumer streamRead = (readBufferSize, md5) + -> streamRead(keySize, readBufferSize, md5, testBucket, keyName); + final CheckedBiConsumer nonStreamRead = (readBufferSize, md5) + -> nonStreamRead(keySize, readBufferSize, md5, nonStreamReadClient, volume, bucket, keyName); + final CheckedBiConsumer fileRead = (readBufferSize, md5) + -> fileRead(keySize, readBufferSize, md5, blockFile); + final List> operations + = Arrays.asList(streamRead, nonStreamRead, fileRead); + Collections.shuffle(operations); + + for (CheckedBiConsumer op : operations) { + for (int i = 0; i < 5; i++) { + op.accept(bufferSize, null); + } + op.accept(bufferSize, expectedMd5); } } } } } + static void streamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5, + TestBucket bucket, String keyName) throws Exception { + try (KeyInputStream in = bucket.getKeyInputStream(keyName)) { + assertTrue(in.isStreamBlockInputStream()); + runTestReadKey(keySize, bufferSize, expectedMD5, in); + } + } + + static void nonStreamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5, + OzoneClient nonStreamReadClient, String volume, String bucket, String keyName) throws Exception { + final ClientProtocol proxy = nonStreamReadClient.getProxy(); + try (KeyInputStream in = (KeyInputStream) proxy.getKey(volume, bucket, keyName).getInputStream()) { + assertFalse(in.isStreamBlockInputStream()); + runTestReadKey(keySize, bufferSize, expectedMD5, in); + } + } + + static void fileRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5, + File blockFile) throws Exception { + try (InputStream in = new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) { + runTestReadKey(keySize, bufferSize, expectedMD5, in); + } + } + + static String generateMd5(SizeInBytes keySize, SizeInBytes bufferSize, File blockFile) throws Exception { + try (InputStream in = new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) { + return runTestReadKey("generateMd5", keySize, bufferSize, true, in); + } + } + static void print(String name, long keySizeByte, long elapsedNanos, SizeInBytes bufferSize, String computedMD5) { final double keySizeMb = keySizeByte * 1.0 / (1 << 20); final double elapsedSeconds = elapsedNanos / 1_000_000_000.0; - System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n", - name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb, computedMD5); + if (computedMD5 == null) { + System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB)%n", + name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb); + } else { + System.out.printf("%16s md5=%s%n", name, computedMD5); + } } - static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize) + static void createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize) throws Exception { final byte[] buffer = new byte[bufferSize.getSizeInt()]; ThreadLocalRandom.current().nextBytes(buffer); @@ -176,50 +263,42 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, } } final long elapsedNanos = System.nanoTime() - startTime; - - final MessageDigest md5 = MessageDigest.getInstance("MD5"); - for (long pos = 0; pos < keySizeByte;) { - final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos)); - md5.update(buffer, 0, writeSize); - pos += writeSize; + if (!keyName.startsWith(DUMMY_KEY)) { + print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, null); } + } - final String computedMD5 = StringUtils.bytes2Hex(md5.digest()); - print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5); - return computedMD5; + static void runTestReadKey(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5, + InputStream in) throws Exception { + final String method = JavaUtils.getCallerStackTraceElement().getMethodName(); + final String computedMD5 = runTestReadKey(method, keySize, bufferSize, expectedMD5 != null, in); + assertEquals(expectedMD5, computedMD5); } - private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5, - TestBucket bucket) throws Exception { + static String runTestReadKey(String name, SizeInBytes keySize, SizeInBytes bufferSize, boolean generateMd5, + InputStream in) throws Exception { final long keySizeByte = keySize.getSize(); final MessageDigest md5 = MessageDigest.getInstance("MD5"); // Read the data fully into a large enough byte array final byte[] buffer = new byte[bufferSize.getSizeInt()]; final long startTime = System.nanoTime(); - try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) { - int pos = 0; - for (; pos < keySizeByte;) { - final int read = keyInputStream.read(buffer, 0, buffer.length); - if (read == -1) { - break; - } + int pos = 0; + for (; pos < keySizeByte;) { + final int read = in.read(buffer, 0, buffer.length); + if (read == -1) { + break; + } - if (expectedMD5 != null) { - md5.update(buffer, 0, read); - } - pos += read; + if (generateMd5) { + md5.update(buffer, 0, read); } - assertEquals(keySizeByte, pos); + pos += read; } + assertEquals(keySizeByte, pos); final long elapsedNanos = System.nanoTime() - startTime; - final String computedMD5; - if (expectedMD5 == null) { - computedMD5 = null; - } else { - computedMD5 = StringUtils.bytes2Hex(md5.digest()); - assertEquals(expectedMD5, computedMD5); - } - print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5); + final String computedMD5 = generateMd5 ? StringUtils.bytes2Hex(md5.digest()) : null; + print(name, keySizeByte, elapsedNanos, bufferSize, computedMD5); + return computedMD5; } }