Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm;

import com.google.common.base.Preconditions;
import java.time.Duration;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
Expand Down Expand Up @@ -281,6 +282,27 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int maxConcurrentWritePerKey = 1;

/**
 * Number of extra bytes prefetched ahead of the reader position during
 * streaming reads. Default 33554432 (32 MB); validated to be non-negative
 * in {@code validate()}.
 */
@Config(key = "ozone.client.stream.read.pre-read-size",
defaultValue = "33554432",
type = ConfigType.LONG,
tags = {ConfigTag.CLIENT},
description = "Extra bytes to prefetch during streaming reads.")
private long streamReadPreReadSize = 32L << 20;

/**
 * Size in bytes of each streaming read response chunk returned by
 * datanodes. Default 1048576 (1 MB); validated to be positive in
 * {@code validate()}.
 */
@Config(key = "ozone.client.stream.read.response-data-size",
defaultValue = "1048576",
type = ConfigType.INT,
tags = {ConfigTag.CLIENT},
description = "Chunk size of streaming read responses from datanodes.")
private int streamReadResponseDataSize = 1 << 20;

/**
 * Maximum time to wait for a streaming read response before failing the
 * read. Default 10s; validated to be a positive duration in
 * {@code validate()}.
 */
@Config(key = "ozone.client.stream.read.timeout",
defaultValue = "10s",
type = ConfigType.TIME,
tags = {ConfigTag.CLIENT},
description = "Timeout for receiving streaming read responses.")
private Duration streamReadTimeout = Duration.ofSeconds(10);

@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
Expand Down Expand Up @@ -335,6 +357,33 @@ public void validate() {
}
// Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here
}
// Validate streaming read configurations.
// Ensure pre-read size is non-negative. If it's invalid, reset to a sane default.
if (streamReadPreReadSize < 0) {
LOG.warn("Invalid ozone.client.stream.read.pre-read-size = {}. " +
"Resetting to default 32MB.",
streamReadPreReadSize);
streamReadPreReadSize = 32L << 20; // 32MB
}

// Ensure response data size is positive.
if (streamReadResponseDataSize <= 0) {
LOG.warn("Invalid ozone.client.stream.read.response-data-size = {}. " +
"Resetting to default 1MB.",
streamReadResponseDataSize);
streamReadResponseDataSize = 1 << 20; // 1MB
}

// Ensure stream read timeout is a positive duration.
Duration defaultTimeout = Duration.ofSeconds(10);
if (streamReadTimeout == null
|| streamReadTimeout.isZero()
|| streamReadTimeout.isNegative()) {
LOG.warn("Invalid ozone.client.stream.read.timeout = {}. " +
"Resetting to default {}.",
streamReadTimeout, defaultTimeout);
streamReadTimeout = defaultTimeout;
}
}

public long getStreamBufferFlushSize() {
Expand Down Expand Up @@ -553,6 +602,30 @@ public void setStreamReadBlock(boolean streamReadBlock) {
this.streamReadBlock = streamReadBlock;
}

/** @return bytes to prefetch ahead of the reader during streaming reads. */
public long getStreamReadPreReadSize() {
return streamReadPreReadSize;
}

/** @return chunk size in bytes of streaming read responses from datanodes. */
public int getStreamReadResponseDataSize() {
return streamReadResponseDataSize;
}

/** @return timeout for receiving a streaming read response. */
public Duration getStreamReadTimeout() {
return streamReadTimeout;
}

/** Sets the streaming read prefetch size in bytes. */
public void setStreamReadPreReadSize(long streamReadPreReadSize) {
this.streamReadPreReadSize = streamReadPreReadSize;
}

/** Sets the streaming read response chunk size in bytes. */
public void setStreamReadResponseDataSize(int streamReadResponseDataSize) {
this.streamReadResponseDataSize = streamReadResponseDataSize;
}

/** Sets the timeout for receiving streaming read responses. */
public void setStreamReadTimeout(Duration streamReadTimeout) {
this.streamReadTimeout = streamReadTimeout;
}

/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -70,8 +71,10 @@ public class StreamBlockInputStream extends BlockExtendedInputStream {
private final String name = "stream" + STREAM_ID.getAndIncrement();
private final BlockID blockID;
private final long blockLength;
private final int responseDataSize = 1 << 20; // 1 MB
private final long preReadSize = 32 << 20; // 32 MB
// Chunk size of streaming read responses; from OzoneClientConfig.
private final int responseDataSize;
// Bytes to prefetch ahead of the reader position; from OzoneClientConfig.
private final long preReadSize;
// Timeout waiting for a streaming read response; from OzoneClientConfig.
private final Duration readTimeout;
// readTimeout pre-converted to nanos, cached for the poll loop hot path.
private final long readTimeoutNanos;
private final AtomicReference<Pipeline> pipelineRef = new AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = new AtomicReference<>();
private XceiverClientFactory xceiverClientFactory;
Expand Down Expand Up @@ -101,6 +104,10 @@ public StreamBlockInputStream(
this.verifyChecksum = config.isChecksumVerify();
this.retryPolicy = getReadRetryPolicy(config);
this.refreshFunction = refreshFunction;
this.preReadSize = config.getStreamReadPreReadSize();
this.responseDataSize = config.getStreamReadResponseDataSize();
this.readTimeout = config.getStreamReadTimeout();
this.readTimeoutNanos = readTimeout.toNanos();
}

@Override
Expand Down Expand Up @@ -328,6 +335,19 @@ public String toString() {
return name;
}

/** Visible for testing: returns the configured prefetch size in bytes. */
public long getPreReadSize() {
return preReadSize;
}

/** Visible for testing: returns the configured response chunk size in bytes. */
public int getResponseDataSize() {
return responseDataSize;
}

/** Visible for testing: returns the configured streaming read timeout. */
public Duration getReadTimeout() {
return readTimeout;
}

/**
* Implementation of a StreamObserver used to received and buffer streaming GRPC reads.
*/
Expand All @@ -351,10 +371,9 @@ void checkError() throws IOException {
}
}

ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException {
final long timeoutNanos = timeoutUnit.toNanos(timeout);
ReadBlockResponseProto poll() throws IOException {
final long startTime = System.nanoTime();
final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000);

while (true) {
checkError();
Expand All @@ -374,9 +393,9 @@ ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOExceptio
}

final long elapsedNanos = System.nanoTime() - startTime;
if (elapsedNanos >= timeoutNanos) {
if (elapsedNanos >= readTimeoutNanos) {
setFailedAndThrow(new TimeoutIOException(
"Timed out " + timeout + " " + timeoutUnit + " waiting for response"));
"Timed out waiting for response after " + readTimeout));
return null;
}
}
Expand All @@ -399,7 +418,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException {
}

ByteBuffer readFromQueue() throws IOException {
final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
final ReadBlockResponseProto readBlock = poll();
// The server always returns data starting from the last checksum boundary. Therefore if the reader position is
// ahead of the position we received from the server, we need to adjust the buffer position accordingly.
// If the reader position is behind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -76,4 +77,18 @@ void testClientHBaseEnhancementsAllowedFalse() {
assertFalse(subject.getEnablePutblockPiggybacking());
assertEquals(1, subject.getMaxConcurrentWritePerKey());
}

@Test
void testStreamReadConfigParsing() {
  // Set the three streaming-read keys as raw strings and verify they are
  // parsed onto the typed OzoneClientConfig object. The test method is
  // package-private to match the other JUnit 5 tests in this class.
  OzoneConfiguration conf = new OzoneConfiguration();
  conf.set("ozone.client.stream.read.pre-read-size", "67108864");
  conf.set("ozone.client.stream.read.response-data-size", "2097152");
  conf.set("ozone.client.stream.read.timeout", "5s");

  OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);

  assertEquals(64L << 20, clientConfig.getStreamReadPreReadSize());
  assertEquals(2 << 20, clientConfig.getStreamReadResponseDataSize());
  assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.storage;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@link StreamBlockInputStream} applies custom streaming-read
 * settings from {@link OzoneClientConfig}.
 */
public class TestStreamBlockInputStream {

  @Test
  void testCustomStreamReadConfigIsApplied() throws Exception {
    // Use non-default values so the assertions cannot pass by accident.
    OzoneClientConfig clientConfig = new OzoneClientConfig();
    clientConfig.setStreamReadPreReadSize(64L << 20);    // 64 MB
    clientConfig.setStreamReadResponseDataSize(2 << 20); // 2 MB
    clientConfig.setStreamReadTimeout(Duration.ofSeconds(5));
    // Sanity check the config object before wiring it into the stream.
    assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());

    // The stream never contacts a datanode here, so mocks are sufficient.
    BlockID blockID = new BlockID(1L, 1L);
    long length = 1024L;
    Pipeline pipeline = mock(Pipeline.class);
    XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class);
    Function<BlockID, BlockLocationInfo> refreshFunction = b -> null;
    Token<OzoneBlockTokenIdentifier> token = null;

    try (StreamBlockInputStream stream = new StreamBlockInputStream(
        blockID, length, pipeline, token,
        xceiverClientFactory, refreshFunction, clientConfig)) {
      // The constructor must copy all three settings from the config.
      assertEquals(64L << 20, stream.getPreReadSize());
      assertEquals(2 << 20, stream.getResponseDataSize());
      assertEquals(Duration.ofSeconds(5), stream.getReadTimeout());
    }
  }
}