diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 7329f2c16b74..23378863c02e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import java.time.Duration; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -281,6 +282,27 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int maxConcurrentWritePerKey = 1; + @Config(key = "ozone.client.stream.read.pre-read-size", + defaultValue = "33554432", + type = ConfigType.LONG, + tags = {ConfigTag.CLIENT}, + description = "Extra bytes to prefetch during streaming reads.") + private long streamReadPreReadSize = 32L << 20; + + @Config(key = "ozone.client.stream.read.response-data-size", + defaultValue = "1048576", + type = ConfigType.INT, + tags = {ConfigTag.CLIENT}, + description = "Chunk size of streaming read responses from datanodes.") + private int streamReadResponseDataSize = 1 << 20; + + @Config(key = "ozone.client.stream.read.timeout", + defaultValue = "10s", + type = ConfigType.TIME, + tags = {ConfigTag.CLIENT}, + description = "Timeout for receiving streaming read responses.") + private Duration streamReadTimeout = Duration.ofSeconds(10); + @PostConstruct public void validate() { Preconditions.checkState(streamBufferSize > 0); @@ -335,6 +357,33 @@ public void validate() { } // Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here } + // Validate streaming read configurations. + // Ensure pre-read size is non-negative. If it's invalid, reset to a sane default. 
+ if (streamReadPreReadSize < 0) { + LOG.warn("Invalid ozone.client.stream.read.pre-read-size = {}. " + + "Resetting to default 32MB.", + streamReadPreReadSize); + streamReadPreReadSize = 32L << 20; // 32MB + } + + // Ensure response data size is positive. + if (streamReadResponseDataSize <= 0) { + LOG.warn("Invalid ozone.client.stream.read.response-data-size = {}. " + + "Resetting to default 1MB.", + streamReadResponseDataSize); + streamReadResponseDataSize = 1 << 20; // 1MB + } + + // Ensure stream read timeout is a positive duration. + Duration defaultTimeout = Duration.ofSeconds(10); + if (streamReadTimeout == null + || streamReadTimeout.isZero() + || streamReadTimeout.isNegative()) { + LOG.warn("Invalid ozone.client.stream.read.timeout = {}. " + + "Resetting to default {}.", + streamReadTimeout, defaultTimeout); + streamReadTimeout = defaultTimeout; + } } public long getStreamBufferFlushSize() { @@ -553,6 +602,30 @@ public void setStreamReadBlock(boolean streamReadBlock) { this.streamReadBlock = streamReadBlock; } + public long getStreamReadPreReadSize() { + return streamReadPreReadSize; + } + + public int getStreamReadResponseDataSize() { + return streamReadResponseDataSize; + } + + public Duration getStreamReadTimeout() { + return streamReadTimeout; + } + + public void setStreamReadPreReadSize(long streamReadPreReadSize) { + this.streamReadPreReadSize = streamReadPreReadSize; + } + + public void setStreamReadResponseDataSize(int streamReadResponseDataSize) { + this.streamReadResponseDataSize = streamReadResponseDataSize; + } + + public void setStreamReadTimeout(Duration streamReadTimeout) { + this.streamReadTimeout = streamReadTimeout; + } + /** * Enum for indicating what mode to use when combining chunk and block * checksums to define an aggregate FileChecksum. 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 72e5d11edc4a..cac3f75c5d35 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -22,6 +22,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -70,8 +71,10 @@ public class StreamBlockInputStream extends BlockExtendedInputStream { private final String name = "stream" + STREAM_ID.getAndIncrement(); private final BlockID blockID; private final long blockLength; - private final int responseDataSize = 1 << 20; // 1 MB - private final long preReadSize = 32 << 20; // 32 MB + private final int responseDataSize; + private final long preReadSize; + private final Duration readTimeout; + private final long readTimeoutNanos; private final AtomicReference<Pipeline> pipelineRef = new AtomicReference<>(); private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = new AtomicReference<>(); private XceiverClientFactory xceiverClientFactory; @@ -101,6 +104,10 @@ public StreamBlockInputStream( this.verifyChecksum = config.isChecksumVerify(); this.retryPolicy = getReadRetryPolicy(config); this.refreshFunction = refreshFunction; + this.preReadSize = config.getStreamReadPreReadSize(); + this.responseDataSize = config.getStreamReadResponseDataSize(); + this.readTimeout = config.getStreamReadTimeout(); + this.readTimeoutNanos = readTimeout.toNanos(); } @Override @@ -328,6 +335,19 @@ public String toString() { return name; } + public long getPreReadSize() { + return preReadSize; + } + + public int getResponseDataSize() { + 
return responseDataSize; + } + + /** Visible for testing: returns the configured streaming read timeout. */ + public Duration getReadTimeout() { + return readTimeout; + } + /** * Implementation of a StreamObserver used to received and buffer streaming GRPC reads. */ @@ -351,10 +371,9 @@ void checkError() throws IOException { } } - ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException { - final long timeoutNanos = timeoutUnit.toNanos(timeout); + ReadBlockResponseProto poll() throws IOException { final long startTime = System.nanoTime(); - final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000); + final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000); while (true) { checkError(); @@ -374,9 +393,9 @@ ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOExceptio } final long elapsedNanos = System.nanoTime() - startTime; - if (elapsedNanos >= timeoutNanos) { + if (elapsedNanos >= readTimeoutNanos) { setFailedAndThrow(new TimeoutIOException( - "Timed out " + timeout + " " + timeoutUnit + " waiting for response")); + "Timed out waiting for response after " + readTimeout)); return null; } } @@ -399,7 +418,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException { } ByteBuffer readFromQueue() throws IOException { - final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS); + final ReadBlockResponseProto readBlock = poll(); // The server always returns data starting from the last checksum boundary. Therefore if the reader position is // ahead of the position we received from the server, we need to adjust the buffer position accordingly. 
// If the reader position is behind diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java index 6aaa829df3ce..5c1eeccff919 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.junit.jupiter.api.Test; @@ -76,4 +77,18 @@ void testClientHBaseEnhancementsAllowedFalse() { assertFalse(subject.getEnablePutblockPiggybacking()); assertEquals(1, subject.getMaxConcurrentWritePerKey()); } + + @Test + public void testStreamReadConfigParsing() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.client.stream.read.pre-read-size", "67108864"); + conf.set("ozone.client.stream.read.response-data-size", "2097152"); + conf.set("ozone.client.stream.read.timeout", "5s"); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + + assertEquals(64L << 20, clientConfig.getStreamReadPreReadSize()); + assertEquals(2 << 20, clientConfig.getStreamReadResponseDataSize()); + assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); + } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java new file mode 100644 index 000000000000..568f7a3b918c --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.function.Function; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import org.junit.jupiter.api.Test; + +/** + * Tests for StreamBlockInputStream custom configuration behavior. 
+ */ +public class TestStreamBlockInputStream { + + @Test + public void testCustomStreamReadConfigIsApplied() throws Exception { + // Arrange: create a config with non-default values + OzoneClientConfig clientConfig = new OzoneClientConfig(); + clientConfig.setStreamReadPreReadSize(64L << 20); + clientConfig.setStreamReadResponseDataSize(2 << 20); + clientConfig.setStreamReadTimeout(Duration.ofSeconds(5)); + + // Sanity check + assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout()); + // Create a dummy BlockID for the test + BlockID blockID = new BlockID(1L, 1L); + long length = 1024L; + // Create a mock Pipeline instance. + Pipeline pipeline = mock(Pipeline.class); + + Token<OzoneBlockTokenIdentifier> token = null; + // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + Function refreshFunction = b -> null; + // Create a StreamBlockInputStream instance + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, length, pipeline, token, + xceiverClientFactory, refreshFunction, clientConfig)) { + + // Assert: fields should match config values + assertEquals(64L << 20, sbis.getPreReadSize()); + assertEquals(2 << 20, sbis.getResponseDataSize()); + assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout()); + } + } +}