From abdf8542d2e0589fc624a881f1db6950b5cf145b Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sat, 6 Dec 2025 16:55:08 +0800 Subject: [PATCH 01/10] HDDS-14036. StreamRead: Make preReadSize, responseDataSize and read timeout configurable --- .../hadoop/hdds/scm/OzoneClientConfig.java | 33 +++++++++++++++++++ .../scm/storage/StreamBlockInputStream.java | 10 ++++-- .../src/main/resources/ozone-default.xml | 21 ++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 7329f2c16b74..6524c5e128a0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -281,6 +281,27 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int maxConcurrentWritePerKey = 1; + @Config(key = "ozone.client.stream.read.pre-read-size", + defaultValue = "33554432", + type = ConfigType.LONG, + tags = {ConfigTag.CLIENT}, + description = "Extra bytes to prefetch during streaming reads.") + private long streamReadPreReadSize = 32L << 20; + + @Config(key = "ozone.client.stream.read.response-data-size", + defaultValue = "1048576", + type = ConfigType.INT, + tags = {ConfigTag.CLIENT}, + description = "Chunk size of streaming read responses from datanodes.") + private int streamReadResponseDataSize = 1 << 20; + + @Config(key = "ozone.client.stream.read.timeout", + defaultValue = "1000", + type = ConfigType.INT, + tags = {ConfigTag.CLIENT}, + description = "Timeout in ms for receiving streaming read responses.") + private int streamReadTimeoutMs = 1000; + @PostConstruct public void validate() { Preconditions.checkState(streamBufferSize > 0); @@ -553,6 +574,18 @@ public void setStreamReadBlock(boolean streamReadBlock) { this.streamReadBlock = 
streamReadBlock; } + public long getStreamReadPreReadSize() { + return streamReadPreReadSize; + } + + public int getStreamReadResponseDataSize() { + return streamReadResponseDataSize; + } + + public int getStreamReadTimeoutMs() { + return streamReadTimeoutMs; + } + /** * Enum for indicating what mode to use when combining chunk and block * checksums to define an aggregate FileChecksum. This should be considered diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 72e5d11edc4a..e2d9f0b479eb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -70,8 +70,9 @@ public class StreamBlockInputStream extends BlockExtendedInputStream { private final String name = "stream" + STREAM_ID.getAndIncrement(); private final BlockID blockID; private final long blockLength; - private final int responseDataSize = 1 << 20; // 1 MB - private final long preReadSize = 32 << 20; // 32 MB + private final int responseDataSize; // Default size is 1 MB + private final long preReadSize; // Default size is 32 MB + private final int readTimeoutMs; // // Default timeout is 10 second private final AtomicReference pipelineRef = new AtomicReference<>(); private final AtomicReference> tokenRef = new AtomicReference<>(); private XceiverClientFactory xceiverClientFactory; @@ -101,6 +102,9 @@ public StreamBlockInputStream( this.verifyChecksum = config.isChecksumVerify(); this.retryPolicy = getReadRetryPolicy(config); this.refreshFunction = refreshFunction; + this.preReadSize = config.getStreamReadPreReadSize(); + this.responseDataSize = config.getStreamReadResponseDataSize(); + this.readTimeoutMs = config.getStreamReadTimeoutMs(); } @Override @@ -399,7 +403,7 @@ private ByteBuffer read(int 
length, boolean preRead) throws IOException { } ByteBuffer readFromQueue() throws IOException { - final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS); + final ReadBlockResponseProto readBlock = poll(readTimeoutMs, TimeUnit.SECONDS); // The server always returns data starting from the last checksum boundary. Therefore if the reader position is // ahead of the position we received from the server, we need to adjust the buffer position accordingly. // If the reader position is behind diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 658a0d67d5ab..9305e5a7febf 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4852,4 +4852,25 @@ 5m Interval for cleaning up orphan snapshot local data versions corresponding to snapshots + + ozone.scm.client.stream.read.pre-read-size + 33554432 + + Extra bytes to prefetch during streaming block reads. + + + + ozone.scm.client.stream.read.response-data-size + 1048576 + + Maximum chunk size returned by datanode in a single streaming read response. + + + + ozone.scm.client.stream.read.timeout + 10000 + + Timeout (in milliseconds) when waiting for streaming read responses. 
+ + From a58f7ae2e05ca7326353a7d67300c1f6bc567e98 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sun, 7 Dec 2025 10:55:32 +0800 Subject: [PATCH 02/10] Fix failed unit test --- .../hadoop/hdds/scm/OzoneClientConfig.java | 2 +- .../apache/hadoop/ozone/OzoneConfigKeys.java | 21 +++++++++++++++++++ .../src/main/resources/ozone-default.xml | 6 +++--- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 6524c5e128a0..8458dafb19c5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -300,7 +300,7 @@ public class OzoneClientConfig { type = ConfigType.INT, tags = {ConfigTag.CLIENT}, description = "Timeout in ms for receiving streaming read responses.") - private int streamReadTimeoutMs = 1000; + private int streamReadTimeoutMs = 10_000; @PostConstruct public void validate() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index ceca7d0c8824..22a064ea5a88 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -694,6 +694,27 @@ public final class OzoneConfigKeys { "ozone.client.elastic.byte.buffer.pool.max.size"; public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB"; + public static final String + OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE = + "ozone.client.stream.read.pre-read-size"; + public static final long + OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE_DEFAULT = + 32L << 20; + + public static final String + OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE = + 
"ozone.client.stream.read.response-data-size"; + public static final int + OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE_DEFAULT = + 1 << 20; + + public static final String + OZONE_CLIENT_STREAM_READ_TIMEOUT = + "ozone.client.stream.read.timeout"; + public static final int + OZONE_CLIENT_STREAM_READ_TIMEOUT_DEFAULT = + 10_000; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 9305e5a7febf..69ef95b1edaa 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4853,21 +4853,21 @@ Interval for cleaning up orphan snapshot local data versions corresponding to snapshots - ozone.scm.client.stream.read.pre-read-size + ozone.client.stream.read.pre-read-size 33554432 Extra bytes to prefetch during streaming block reads. - ozone.scm.client.stream.read.response-data-size + ozone.client.stream.read.response-data-size 1048576 Maximum chunk size returned by datanode in a single streaming read response. - ozone.scm.client.stream.read.timeout + ozone.client.stream.read.timeout 10000 Timeout (in milliseconds) when waiting for streaming read responses. 
From 7975ce34dfa56da5ed6823f685052b58b0db6acb Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Wed, 10 Dec 2025 02:15:42 +0800 Subject: [PATCH 03/10] Apply fixes based on review feedback --- .../hadoop/hdds/scm/OzoneClientConfig.java | 13 +++--- .../scm/storage/StreamBlockInputStream.java | 29 ++++++++++-- .../src/main/resources/ozone-default.xml | 21 --------- .../rpc/read/TestStreamBlockInputStream.java | 44 +++++++++++++++++++ 4 files changed, 77 insertions(+), 30 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 8458dafb19c5..33cc7264203a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import java.time.Duration; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -296,11 +297,11 @@ public class OzoneClientConfig { private int streamReadResponseDataSize = 1 << 20; @Config(key = "ozone.client.stream.read.timeout", - defaultValue = "1000", - type = ConfigType.INT, + defaultValue = "10s", + type = ConfigType.TIME, tags = {ConfigTag.CLIENT}, - description = "Timeout in ms for receiving streaming read responses.") - private int streamReadTimeoutMs = 10_000; + description = "Timeout for receiving streaming read responses.") + private Duration streamReadTimeout = Duration.ofSeconds(10); @PostConstruct public void validate() { @@ -582,8 +583,8 @@ public int getStreamReadResponseDataSize() { return streamReadResponseDataSize; } - public int getStreamReadTimeoutMs() { - return streamReadTimeoutMs; + public Duration getStreamReadTimeout() { + return streamReadTimeout; } /** diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index e2d9f0b479eb..83c0bdacd7b3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -22,6 +22,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -72,7 +73,7 @@ public class StreamBlockInputStream extends BlockExtendedInputStream { private final long blockLength; private final int responseDataSize; // Default size is 1 MB private final long preReadSize; // Default size is 32 MB - private final int readTimeoutMs; // // Default timeout is 10 second + private final Duration readTimeout; // // Default timeout is 10 second private final AtomicReference pipelineRef = new AtomicReference<>(); private final AtomicReference> tokenRef = new AtomicReference<>(); private XceiverClientFactory xceiverClientFactory; @@ -104,7 +105,7 @@ public StreamBlockInputStream( this.refreshFunction = refreshFunction; this.preReadSize = config.getStreamReadPreReadSize(); this.responseDataSize = config.getStreamReadResponseDataSize(); - this.readTimeoutMs = config.getStreamReadTimeoutMs(); + this.readTimeout = config.getStreamReadTimeout(); } @Override @@ -332,6 +333,19 @@ public String toString() { return name; } + public long getPreReadSize() { + return preReadSize; + } + + public int getResponseDataSize() { + return responseDataSize; + } + + /** Visible for testing: returns the configured streaming read timeout. 
*/ + public Duration getReadTimeout() { + return readTimeout; + } + /** * Implementation of a StreamObserver used to received and buffer streaming GRPC reads. */ @@ -403,7 +417,16 @@ private ByteBuffer read(int length, boolean preRead) throws IOException { } ByteBuffer readFromQueue() throws IOException { - final ReadBlockResponseProto readBlock = poll(readTimeoutMs, TimeUnit.SECONDS); + // Convert Duration -> int seconds for poll(...) + final int timeoutSeconds; + if (readTimeout == null || readTimeout.isZero() || readTimeout.isNegative()) { + timeoutSeconds = 0; + } else { + long sec = readTimeout.getSeconds(); + // Prevent overflow if client config is extremely large + timeoutSeconds = sec > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) sec; + } + final ReadBlockResponseProto readBlock = poll(timeoutSeconds, TimeUnit.SECONDS); // The server always returns data starting from the last checksum boundary. Therefore if the reader position is // ahead of the position we received from the server, we need to adjust the buffer position accordingly. // If the reader position is behind diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 69ef95b1edaa..658a0d67d5ab 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4852,25 +4852,4 @@ 5m Interval for cleaning up orphan snapshot local data versions corresponding to snapshots - - ozone.client.stream.read.pre-read-size - 33554432 - - Extra bytes to prefetch during streaming block reads. - - - - ozone.client.stream.read.response-data-size - 1048576 - - Maximum chunk size returned by datanode in a single streaming read response. - - - - ozone.client.stream.read.timeout - 10000 - - Timeout (in milliseconds) when waiting for streaming read responses. 
- - diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java index 44b753210d91..e3d5eeeeae4f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java @@ -23,19 +23,28 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyInputStream; import org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService; import org.apache.hadoop.ozone.om.TestBucket; +import org.apache.hadoop.security.token.Token; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -280,4 +289,39 @@ private void testReadEmptyBlock() throws Exception { assertEquals(-1, keyInputStream.read()); } } + + @Test + public void 
testCustomStreamReadConfigIsApplied() throws Exception { + // Arrange: create a config with non-default values + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.client.stream.read.pre-read-size", "67108864"); + conf.set("ozone.client.stream.read.response-data-size", "2097152"); + conf.set("ozone.client.stream.read.timeout", "5s"); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + + // Sanity check + assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); + + BlockID blockID = new BlockID(1L, 1L); + long length = 1024L; + + // 💡 Pipeline 用 mock 就可以,不需要真的建一個 + Pipeline pipeline = Mockito.mock(Pipeline.class); + + Token token = null; + XceiverClientFactory xceiverClientFactory = Mockito.mock(XceiverClientFactory.class); + Function refreshFunction = b -> null; + + // Act + StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, length, pipeline, token, + xceiverClientFactory, refreshFunction, clientConfig); + + // Assert + assertEquals(64L << 20, sbis.getPreReadSize()); + assertEquals(2 << 20, sbis.getResponseDataSize()); + assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); + } + } From e2538b20d1402cb83d82f56a29deafa6e167bf48 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Wed, 10 Dec 2025 20:24:34 +0800 Subject: [PATCH 04/10] Add comments for clarity --- .../client/rpc/read/TestStreamBlockInputStream.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java index e3d5eeeeae4f..0c35243cba6b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java @@ -302,18 +302,17 @@ public void testCustomStreamReadConfigIsApplied() throws Exception { // Sanity check assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); - + // Create a dummy BlockID for the test BlockID blockID = new BlockID(1L, 1L); long length = 1024L; - - // 💡 Pipeline 用 mock 就可以,不需要真的建一個 + // Use Mockito to create a mock Pipeline instance. Pipeline pipeline = Mockito.mock(Pipeline.class); Token token = null; + // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor XceiverClientFactory xceiverClientFactory = Mockito.mock(XceiverClientFactory.class); Function refreshFunction = b -> null; - - // Act + // Create a StreamBlockInputStream instance StreamBlockInputStream sbis = new StreamBlockInputStream( blockID, length, pipeline, token, xceiverClientFactory, refreshFunction, clientConfig); From 99a2c5142d1dda766106ab8eb375eb350df38da9 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Wed, 10 Dec 2025 21:58:13 +0800 Subject: [PATCH 05/10] Fix findbugs problem --- .../client/rpc/read/TestStreamBlockInputStream.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java index 0c35243cba6b..13d784d4d747 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java @@ -313,14 +313,15 @@ public void testCustomStreamReadConfigIsApplied() throws Exception { XceiverClientFactory xceiverClientFactory = 
Mockito.mock(XceiverClientFactory.class); Function refreshFunction = b -> null; // Create a StreamBlockInputStream instance - StreamBlockInputStream sbis = new StreamBlockInputStream( + try (StreamBlockInputStream sbis = new StreamBlockInputStream( blockID, length, pipeline, token, - xceiverClientFactory, refreshFunction, clientConfig); + xceiverClientFactory, refreshFunction, clientConfig)) { - // Assert - assertEquals(64L << 20, sbis.getPreReadSize()); - assertEquals(2 << 20, sbis.getResponseDataSize()); - assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); + // Assert: fields should match config values + assertEquals(64L << 20, sbis.getPreReadSize()); + assertEquals(2 << 20, sbis.getResponseDataSize()); + assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); + } } } From 9025662fd48102f1d01b8e7647fc8739b121178d Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Thu, 11 Dec 2025 23:57:26 +0800 Subject: [PATCH 06/10] Update the patch according to reviewer suggestions --- .../hadoop/hdds/scm/OzoneClientConfig.java | 39 +++++++++++++++ .../scm/storage/StreamBlockInputStream.java | 25 ++++------ .../hdds/scm/TestOzoneClientConfig.java | 15 ++++++ .../storage/TestStreamBlockInputStream.java | 49 +++++++++++++++++++ .../apache/hadoop/ozone/OzoneConfigKeys.java | 23 +-------- .../rpc/read/TestStreamBlockInputStream.java | 44 ----------------- 6 files changed, 113 insertions(+), 82 deletions(-) create mode 100644 hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 33cc7264203a..23378863c02e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -357,6 +357,33 @@ public void 
validate() { } // Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here } + // Validate streaming read configurations. + // Ensure pre-read size is non-negative. If it's invalid, reset to a sane default. + if (streamReadPreReadSize < 0) { + LOG.warn("Invalid ozone.client.stream.read.pre-read-size = {}. " + + "Resetting to default 32MB.", + streamReadPreReadSize); + streamReadPreReadSize = 32L << 20; // 32MB + } + + // Ensure response data size is positive. + if (streamReadResponseDataSize <= 0) { + LOG.warn("Invalid ozone.client.stream.read.response-data-size = {}. " + + "Resetting to default 1MB.", + streamReadResponseDataSize); + streamReadResponseDataSize = 1 << 20; // 1MB + } + + // Ensure stream read timeout is a positive duration. + Duration defaultTimeout = Duration.ofSeconds(10); + if (streamReadTimeout == null + || streamReadTimeout.isZero() + || streamReadTimeout.isNegative()) { + LOG.warn("Invalid ozone.client.stream.read.timeout = {}. " + + "Resetting to default {}.", + streamReadTimeout, defaultTimeout); + streamReadTimeout = defaultTimeout; + } } public long getStreamBufferFlushSize() { @@ -587,6 +614,18 @@ public Duration getStreamReadTimeout() { return streamReadTimeout; } + public void setStreamReadPreReadSize(long streamReadPreReadSize) { + this.streamReadPreReadSize = streamReadPreReadSize; + } + + public void setStreamReadResponseDataSize(int streamReadResponseDataSize) { + this.streamReadResponseDataSize = streamReadResponseDataSize; + } + + public void setStreamReadTimeout(Duration streamReadTimeout) { + this.streamReadTimeout = streamReadTimeout; + } + /** * Enum for indicating what mode to use when combining chunk and block * checksums to define an aggregate FileChecksum. 
This should be considered diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 83c0bdacd7b3..cefdb19c4d68 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -71,9 +71,10 @@ public class StreamBlockInputStream extends BlockExtendedInputStream { private final String name = "stream" + STREAM_ID.getAndIncrement(); private final BlockID blockID; private final long blockLength; - private final int responseDataSize; // Default size is 1 MB - private final long preReadSize; // Default size is 32 MB - private final Duration readTimeout; // // Default timeout is 10 second + private final int responseDataSize; + private final long preReadSize; + private final Duration readTimeout; + private final long readTimeoutNanos; private final AtomicReference pipelineRef = new AtomicReference<>(); private final AtomicReference> tokenRef = new AtomicReference<>(); private XceiverClientFactory xceiverClientFactory; @@ -106,6 +107,7 @@ public StreamBlockInputStream( this.preReadSize = config.getStreamReadPreReadSize(); this.responseDataSize = config.getStreamReadResponseDataSize(); this.readTimeout = config.getStreamReadTimeout(); + this.readTimeoutNanos = readTimeout.toNanos(); } @Override @@ -369,8 +371,8 @@ void checkError() throws IOException { } } - ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException { - final long timeoutNanos = timeoutUnit.toNanos(timeout); + ReadBlockResponseProto poll() throws IOException { + final long timeoutNanos = readTimeoutNanos; final long startTime = System.nanoTime(); final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000); @@ -394,7 +396,7 @@ ReadBlockResponseProto poll(int timeout, TimeUnit 
timeoutUnit) throws IOExceptio final long elapsedNanos = System.nanoTime() - startTime; if (elapsedNanos >= timeoutNanos) { setFailedAndThrow(new TimeoutIOException( - "Timed out " + timeout + " " + timeoutUnit + " waiting for response")); + "Timed out waiting for response after " + readTimeout)); return null; } } @@ -417,16 +419,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException { } ByteBuffer readFromQueue() throws IOException { - // Convert Duration -> int seconds for poll(...) - final int timeoutSeconds; - if (readTimeout == null || readTimeout.isZero() || readTimeout.isNegative()) { - timeoutSeconds = 0; - } else { - long sec = readTimeout.getSeconds(); - // Prevent overflow if client config is extremely large - timeoutSeconds = sec > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) sec; - } - final ReadBlockResponseProto readBlock = poll(timeoutSeconds, TimeUnit.SECONDS); + final ReadBlockResponseProto readBlock = poll(); // The server always returns data starting from the last checksum boundary. Therefore if the reader position is // ahead of the position we received from the server, we need to adjust the buffer position accordingly. 
// If the reader position is behind diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java index 6aaa829df3ce..5ca219034687 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.junit.jupiter.api.Test; +import java.time.Duration; class TestOzoneClientConfig { @@ -76,4 +77,18 @@ void testClientHBaseEnhancementsAllowedFalse() { assertFalse(subject.getEnablePutblockPiggybacking()); assertEquals(1, subject.getMaxConcurrentWritePerKey()); } + + @Test + public void testStreamReadConfigParsing() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.client.stream.read.pre-read-size", "67108864"); + conf.set("ozone.client.stream.read.response-data-size", "2097152"); + conf.set("ozone.client.stream.read.timeout", "5s"); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + + assertEquals(64L << 20, clientConfig.getStreamReadPreReadSize()); + assertEquals(2 << 20, clientConfig.getStreamReadResponseDataSize()); + assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); + } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java new file mode 100644 index 000000000000..e6c85d823966 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.hdds.scm.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +import 
org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.util.function.Function; + +public class TestStreamBlockInputStream { + + @Test + public void testCustomStreamReadConfigIsApplied() throws Exception { + // Arrange: create a config with non-default values + OzoneClientConfig clientConfig = new OzoneClientConfig(); + clientConfig.setStreamReadPreReadSize(64L << 20); + clientConfig.setStreamReadResponseDataSize(2 << 20); + clientConfig.setStreamReadTimeout(Duration.ofSeconds(5)); + + // Sanity check + assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); + // Create a dummy BlockID for the test + BlockID blockID = new BlockID(1L, 1L); + long length = 1024L; + // Create a mock Pipeline instance. 
+ Pipeline pipeline = mock(Pipeline.class); + + Token token = null; + // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + Function refreshFunction = b -> null; + // Create a StreamBlockInputStream instance + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, length, pipeline, token, + xceiverClientFactory, refreshFunction, clientConfig)) { + + // Assert: fields should match config values + assertEquals(64L << 20, sbis.getPreReadSize()); + assertEquals(2 << 20, sbis.getResponseDataSize()); + assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 22a064ea5a88..940ad2e155fa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -693,28 +693,7 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE = "ozone.client.elastic.byte.buffer.pool.max.size"; public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB"; - - public static final String - OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE = - "ozone.client.stream.read.pre-read-size"; - public static final long - OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE_DEFAULT = - 32L << 20; - - public static final String - OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE = - "ozone.client.stream.read.response-data-size"; - public static final int - OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE_DEFAULT = - 1 << 20; - - public static final String - OZONE_CLIENT_STREAM_READ_TIMEOUT = - "ozone.client.stream.read.timeout"; - public static final int - OZONE_CLIENT_STREAM_READ_TIMEOUT_DEFAULT = - 10_000; - + /** * There is 
no need to instantiate this class. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java index 13d784d4d747..44b753210d91 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java @@ -23,28 +23,19 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; -import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyInputStream; import org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService; import org.apache.hadoop.ozone.om.TestBucket; -import org.apache.hadoop.security.token.Token; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -289,39 +280,4 @@ private void testReadEmptyBlock() throws Exception { assertEquals(-1, 
keyInputStream.read()); } } - - @Test - public void testCustomStreamReadConfigIsApplied() throws Exception { - // Arrange: create a config with non-default values - OzoneConfiguration conf = new OzoneConfiguration(); - conf.set("ozone.client.stream.read.pre-read-size", "67108864"); - conf.set("ozone.client.stream.read.response-data-size", "2097152"); - conf.set("ozone.client.stream.read.timeout", "5s"); - - OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); - - // Sanity check - assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); - // Create a dummy BlockID for the test - BlockID blockID = new BlockID(1L, 1L); - long length = 1024L; - // Use Mockito to create a mock Pipeline instance. - Pipeline pipeline = Mockito.mock(Pipeline.class); - - Token token = null; - // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor - XceiverClientFactory xceiverClientFactory = Mockito.mock(XceiverClientFactory.class); - Function refreshFunction = b -> null; - // Create a StreamBlockInputStream instance - try (StreamBlockInputStream sbis = new StreamBlockInputStream( - blockID, length, pipeline, token, - xceiverClientFactory, refreshFunction, clientConfig)) { - - // Assert: fields should match config values - assertEquals(64L << 20, sbis.getPreReadSize()); - assertEquals(2 << 20, sbis.getResponseDataSize()); - assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); - } - } - } From dbf468c0f062b5072e5664407135f6c07119694b Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Fri, 12 Dec 2025 00:13:21 +0800 Subject: [PATCH 07/10] Fix checkstyle --- .../hdds/scm/TestOzoneClientConfig.java | 2 +- .../storage/TestStreamBlockInputStream.java | 24 +++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java index 5ca219034687..5c1eeccff919 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java @@ -22,9 +22,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.junit.jupiter.api.Test; -import java.time.Duration; class TestOzoneClientConfig { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java index e6c85d823966..568f7a3b918c 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -1,8 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hdds.scm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; +import java.time.Duration; +import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -10,9 +29,10 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.junit.jupiter.api.Test; -import java.time.Duration; -import java.util.function.Function; +/** + * Tests for StreamBlockInputStream custom configuration behavior. + */ public class TestStreamBlockInputStream { @Test From 4befcdf19dbecc53c6077373743f6c178c78887c Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sat, 13 Dec 2025 09:44:26 +0800 Subject: [PATCH 08/10] Remove timeoutNanos and whitespace --- .../hadoop/hdds/scm/storage/StreamBlockInputStream.java | 5 ++--- .../main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index cefdb19c4d68..cac3f75c5d35 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -372,9 +372,8 @@ void checkError() throws IOException { } ReadBlockResponseProto poll() throws IOException { - final long timeoutNanos = readTimeoutNanos; final long startTime = System.nanoTime(); - final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000); + final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000); while (true) { checkError(); @@ -394,7 +393,7 @@
ReadBlockResponseProto poll() throws IOException { } final long elapsedNanos = System.nanoTime() - startTime; - if (elapsedNanos >= timeoutNanos) { + if (elapsedNanos >= readTimeoutNanos) { setFailedAndThrow(new TimeoutIOException( "Timed out waiting for response after " + readTimeout)); return null; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 940ad2e155fa..8ca20171c3b9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -693,10 +693,10 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE = "ozone.client.elastic.byte.buffer.pool.max.size"; public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB"; - /** * There is no need to instantiate this class. 
*/ + private OzoneConfigKeys() { } } From d17873248cfed2d17bcf63c5b4da6c6f6c8dcf78 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sat, 13 Dec 2025 16:54:02 +0800 Subject: [PATCH 09/10] Remove whitespace change --- .../src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 8ca20171c3b9..940ad2e155fa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -693,10 +693,10 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE = "ozone.client.elastic.byte.buffer.pool.max.size"; public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB"; + /** * There is no need to instantiate this class. 
*/ - private OzoneConfigKeys() { } } From c200ae693a73a52ee6b2e6215c2d07bd0a6f271a Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sat, 13 Dec 2025 17:07:53 +0800 Subject: [PATCH 10/10] Restore OzoneConfigKeys to remove whitespace change --- .../src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 940ad2e155fa..ceca7d0c8824 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -693,7 +693,7 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE = "ozone.client.elastic.byte.buffer.pool.max.size"; public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB"; - + /** * There is no need to instantiate this class. */