Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public MultipartInputStream(String keyName,
this.length = streamLength;
}

/**
 * Returns the current value of the {@code isStreamBlockInputStream} field.
 *
 * <p>NOTE(review): the name suggests the flag records whether this stream
 * reads through the stream-block read path; where the field is assigned is
 * not visible here — confirm.
 *
 * @return the {@code isStreamBlockInputStream} flag
 */
public boolean isStreamBlockInputStream() {
return isStreamBlockInputStream;
}

@Override
protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
Expand All @@ -33,15 +42,22 @@
import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.om.TestBucket;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
Expand All @@ -52,24 +68,22 @@
public class TestStreamRead {
{
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org"), Level.ERROR);

GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), Level.ERROR);
}

static final int CHUNK_SIZE = 1 << 20; // 1MB
static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB
static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB

/** 128 MB; BLOCK_SIZE is derived from it. Presumably the key size the read tests use — confirm. */
static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
/**
 * Block size given to the client configuration. It equals KEY_SIZE, matching
 * the test's assertions that a key has exactly one block location and that
 * the block file is BLOCK_SIZE bytes long.
 * (BLOCK_SIZE was previously declared twice; the 64 MB declaration is dropped.)
 */
static final int BLOCK_SIZE = KEY_SIZE.getSizeInt();

/** Name of the warm-up key; createKey does not print timing for key names starting with it. */
static final String DUMMY_KEY = "dummyKey";

static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
final OzoneConfiguration conf = new OzoneConfiguration();
Expand All @@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
conf.setFromObject(config);

conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 1);
conf.setQuietMode(true);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB);

ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
.setBlockSize(BLOCK_SIZE)
Expand Down Expand Up @@ -114,53 +127,127 @@ void testReadKey256k() throws Exception {
}

void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Exception {
System.out.println("cluster starting ...");
try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) {
cluster.waitForClusterToBeReady();

System.out.println("cluster ready");

final List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
assertEquals(1, datanodes.size());
final HddsDatanodeService datanode = datanodes.get(0);

OzoneConfiguration conf = cluster.getConf();
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamReadBlock(true);
OzoneConfiguration copy = new OzoneConfiguration(conf);
copy.setFromObject(clientConfig);
final OzoneConfiguration steamReadConf = new OzoneConfiguration(conf);
steamReadConf.setFromObject(clientConfig);

clientConfig.setStreamReadBlock(false);
final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration(conf);
nonSteamReadConf.setFromObject(clientConfig);

final int n = 5;
final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
final SizeInBytes[] readBufferSizes = {
final SizeInBytes[] bufferSizes = {
SizeInBytes.valueOf("32M"),
SizeInBytes.valueOf("8M"),
SizeInBytes.valueOf("1M"),
SizeInBytes.valueOf("4k"),
};

try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
final TestBucket bucket = TestBucket.newBuilder(client).build();
try (OzoneClient streamReadClient = OzoneClientFactory.getRpcClient(steamReadConf);
OzoneClient nonStreamReadClient = OzoneClientFactory.getRpcClient(nonSteamReadConf)) {
final TestBucket testBucket = TestBucket.newBuilder(streamReadClient).build();
final String volume = testBucket.delegate().getVolumeName();
final String bucket = testBucket.delegate().getName();
final String keyName = "key0";

// get the client ready by writing a dummy key
createKey(testBucket.delegate(), DUMMY_KEY, SizeInBytes.ONE_KB, SizeInBytes.ONE_KB);

for (SizeInBytes bufferSize : bufferSizes) {
// create key
System.out.println("---------------------------------------------------------");
createKey(testBucket.delegate(), keyName, keySize, bufferSize);

// get block file and generate md5
final OmKeyInfo info = nonStreamReadClient.getProxy().getKeyInfo(volume, bucket, keyName, false);
final List<OmKeyLocationInfo> locations = info.getLatestVersionLocations().getLocationList();
assertEquals(1, locations.size());
final BlockID blockId = locations.get(0).getBlockID();
final ContainerData containerData = datanode.getDatanodeStateMachine().getContainer().getContainerSet()
.getContainer(blockId.getContainerID()).getContainerData();
final File blockFile = ContainerLayoutVersion.FILE_PER_BLOCK.getChunkFile(containerData, blockId, null);
assertTrue(blockFile.exists());
assertEquals(BLOCK_SIZE, blockFile.length());
final String expectedMd5 = generateMd5(keySize, SizeInBytes.ONE_MB, blockFile);

for (int i = 0; i < n; i++) {
final String keyName = "key" + i;
// run tests
System.out.println("---------------------------------------------------------");
System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
keyName, keySize, bytesPerChecksum);

final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize);
for (SizeInBytes readBufferSize : readBufferSizes) {
runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
final CheckedBiConsumer<SizeInBytes, String, Exception> streamRead = (readBufferSize, md5)
-> streamRead(keySize, readBufferSize, md5, testBucket, keyName);
final CheckedBiConsumer<SizeInBytes, String, Exception> nonStreamRead = (readBufferSize, md5)
-> nonStreamRead(keySize, readBufferSize, md5, nonStreamReadClient, volume, bucket, keyName);
final CheckedBiConsumer<SizeInBytes, String, Exception> fileRead = (readBufferSize, md5)
-> fileRead(keySize, readBufferSize, md5, blockFile);
final List<CheckedBiConsumer<SizeInBytes, String, Exception>> operations
= Arrays.asList(streamRead, nonStreamRead, fileRead);
Collections.shuffle(operations);

for (CheckedBiConsumer<SizeInBytes, String, Exception> op : operations) {
for (int i = 0; i < 5; i++) {
op.accept(bufferSize, null);
}
op.accept(bufferSize, expectedMd5);
}
}
}
}
}

/**
 * Opens {@code keyName} through {@code bucket}, asserts that the returned
 * {@link KeyInputStream} reports {@code isStreamBlockInputStream()} as true,
 * and then runs the timed read over it.
 *
 * @param keySize number of bytes the read is expected to return
 * @param bufferSize size of the read buffer
 * @param expectedMD5 expected hex digest, or {@code null} to skip digest computation
 * @param bucket test bucket used to open the key
 * @param keyName name of the key to read
 * @throws Exception if opening or reading the key fails
 */
static void streamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
TestBucket bucket, String keyName) throws Exception {
try (KeyInputStream in = bucket.getKeyInputStream(keyName)) {
assertTrue(in.isStreamBlockInputStream());
// Keep this call directly inside this method: runTestReadKey labels its
// output with the name of the method that called it.
runTestReadKey(keySize, bufferSize, expectedMD5, in);
}
}

/**
 * Opens the key through the proxy of {@code nonStreamReadClient}, asserts
 * that the resulting {@link KeyInputStream} reports
 * {@code isStreamBlockInputStream()} as false, and then runs the timed read
 * over it.
 *
 * @param keySize number of bytes the read is expected to return
 * @param bufferSize size of the read buffer
 * @param expectedMD5 expected hex digest, or {@code null} to skip digest computation
 * @param nonStreamReadClient client whose proxy is used to open the key
 * @param volume volume name of the key
 * @param bucket bucket name of the key
 * @param keyName name of the key to read
 * @throws Exception if opening or reading the key fails
 */
static void nonStreamRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
OzoneClient nonStreamReadClient, String volume, String bucket, String keyName) throws Exception {
final ClientProtocol proxy = nonStreamReadClient.getProxy();
// The cast makes the test fail outright if the proxy returns any other stream type.
try (KeyInputStream in = (KeyInputStream) proxy.getKey(volume, bucket, keyName).getInputStream()) {
assertFalse(in.isStreamBlockInputStream());
// Keep this call directly inside this method: runTestReadKey labels its
// output with the name of the method that called it.
runTestReadKey(keySize, bufferSize, expectedMD5, in);
}
}

/**
 * Runs the timed read directly over {@code blockFile} on the local file
 * system, wrapped in a {@link BufferedInputStream} whose internal buffer is
 * {@code bufferSize} bytes.
 *
 * @param keySize number of bytes the read is expected to return
 * @param bufferSize size of both the stream's internal buffer and the read buffer
 * @param expectedMD5 expected hex digest, or {@code null} to skip digest computation
 * @param blockFile file to read
 * @throws Exception if opening or reading the file fails
 */
static void fileRead(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
File blockFile) throws Exception {
try (InputStream in = new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) {
// Keep this call directly inside this method: runTestReadKey labels its
// output with the name of the method that called it.
runTestReadKey(keySize, bufferSize, expectedMD5, in);
}
}

/**
 * Reads {@code blockFile} through the five-argument {@code runTestReadKey}
 * with digest generation switched on and returns the digest it reports.
 * The read is labelled "generateMd5" in the printed output.
 *
 * @param keySize number of bytes the read is expected to return
 * @param bufferSize size of both the stream's internal buffer and the read buffer
 * @param blockFile file to digest
 * @return the digest string produced by {@code runTestReadKey}
 * @throws Exception if opening or reading the file fails
 */
static String generateMd5(SizeInBytes keySize, SizeInBytes bufferSize, File blockFile) throws Exception {
  final boolean digestRequested = true;
  try (InputStream blockData =
           new BufferedInputStream(Files.newInputStream(blockFile.toPath()), bufferSize.getSizeInt())) {
    final String digest = runTestReadKey("generateMd5", keySize, bufferSize, digestRequested, blockData);
    return digest;
  }
}

/**
 * Prints one result line for a timed operation to standard output.
 *
 * <p>Without a digest, the line reports throughput, elapsed time, buffer
 * size and key size. With a digest, only the digest is printed. Previously an
 * unconditional print ran before this branch, so every call emitted two
 * lines and the first one showed {@code md5=null} when no digest existed.
 *
 * @param name label printed at the start of the line
 * @param keySizeByte number of bytes processed
 * @param elapsedNanos elapsed time in nanoseconds
 * @param bufferSize buffer size used, printed as-is
 * @param computedMD5 hex digest, or {@code null} when none was computed
 */
static void print(String name, long keySizeByte, long elapsedNanos, SizeInBytes bufferSize, String computedMD5) {
  if (computedMD5 != null) {
    System.out.printf("%16s md5=%s%n", name, computedMD5);
    return;
  }
  final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
  final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
  System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB)%n",
      name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb);
}

static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
static void createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
throws Exception {
final byte[] buffer = new byte[bufferSize.getSizeInt()];
ThreadLocalRandom.current().nextBytes(buffer);
Expand All @@ -176,50 +263,42 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
}
}
final long elapsedNanos = System.nanoTime() - startTime;

final MessageDigest md5 = MessageDigest.getInstance("MD5");
for (long pos = 0; pos < keySizeByte;) {
final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
md5.update(buffer, 0, writeSize);
pos += writeSize;
if (!keyName.startsWith(DUMMY_KEY)) {
print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, null);
}
}

final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
return computedMD5;
/**
 * Runs the timed read over {@code in} and asserts that the digest it reports
 * equals {@code expectedMD5}.
 *
 * <p>A digest is requested only when {@code expectedMD5} is non-null. When it
 * is null, the five-argument overload returns null as well, so the final
 * assertion compares null with null.
 *
 * @param keySize number of bytes the read is expected to return
 * @param bufferSize size of the read buffer
 * @param expectedMD5 expected hex digest, or {@code null} to skip digest computation
 * @param in stream to read from; it is not closed here
 * @throws Exception if reading fails
 */
static void runTestReadKey(SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
InputStream in) throws Exception {
// The label for the printed result is the name of the method that called this one.
// NOTE(review): relies on JavaUtils.getCallerStackTraceElement() resolving to
// the direct caller of this method, so this line must stay in this frame — confirm
// against the Ratis implementation.
final String method = JavaUtils.getCallerStackTraceElement().getMethodName();
final String computedMD5 = runTestReadKey(method, keySize, bufferSize, expectedMD5 != null, in);
assertEquals(expectedMD5, computedMD5);
}

private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
TestBucket bucket) throws Exception {
static String runTestReadKey(String name, SizeInBytes keySize, SizeInBytes bufferSize, boolean generateMd5,
InputStream in) throws Exception {
final long keySizeByte = keySize.getSize();
final MessageDigest md5 = MessageDigest.getInstance("MD5");
// Read the data fully into a large enough byte array
final byte[] buffer = new byte[bufferSize.getSizeInt()];
final long startTime = System.nanoTime();
try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
int pos = 0;
for (; pos < keySizeByte;) {
final int read = keyInputStream.read(buffer, 0, buffer.length);
if (read == -1) {
break;
}
int pos = 0;
for (; pos < keySizeByte;) {
final int read = in.read(buffer, 0, buffer.length);
if (read == -1) {
break;
}

if (expectedMD5 != null) {
md5.update(buffer, 0, read);
}
pos += read;
if (generateMd5) {
md5.update(buffer, 0, read);
}
assertEquals(keySizeByte, pos);
pos += read;
}
assertEquals(keySizeByte, pos);
final long elapsedNanos = System.nanoTime() - startTime;

final String computedMD5;
if (expectedMD5 == null) {
computedMD5 = null;
} else {
computedMD5 = StringUtils.bytes2Hex(md5.digest());
assertEquals(expectedMD5, computedMD5);
}
print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
final String computedMD5 = generateMd5 ? StringUtils.bytes2Hex(md5.digest()) : null;
print(name, keySizeByte, elapsedNanos, bufferSize, computedMD5);
return computedMD5;
}
}