Skip to content

Commit 114d242

Browse files
authored
HDDS-14036. StreamRead: Make preReadSize, responseDataSize and read timeout configurable (#9461)
1 parent d5b9267 commit 114d242

4 files changed

Lines changed: 184 additions & 8 deletions

File tree

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdds.scm;
1919

2020
import com.google.common.base.Preconditions;
21+
import java.time.Duration;
2122
import org.apache.hadoop.hdds.conf.Config;
2223
import org.apache.hadoop.hdds.conf.ConfigGroup;
2324
import org.apache.hadoop.hdds.conf.ConfigTag;
@@ -281,6 +282,27 @@ public class OzoneClientConfig {
281282
tags = ConfigTag.CLIENT)
282283
private int maxConcurrentWritePerKey = 1;
283284

285+
@Config(key = "ozone.client.stream.read.pre-read-size",
286+
defaultValue = "33554432",
287+
type = ConfigType.LONG,
288+
tags = {ConfigTag.CLIENT},
289+
description = "Extra bytes to prefetch during streaming reads.")
290+
private long streamReadPreReadSize = 32L << 20;
291+
292+
@Config(key = "ozone.client.stream.read.response-data-size",
293+
defaultValue = "1048576",
294+
type = ConfigType.INT,
295+
tags = {ConfigTag.CLIENT},
296+
description = "Chunk size of streaming read responses from datanodes.")
297+
private int streamReadResponseDataSize = 1 << 20;
298+
299+
@Config(key = "ozone.client.stream.read.timeout",
300+
defaultValue = "10s",
301+
type = ConfigType.TIME,
302+
tags = {ConfigTag.CLIENT},
303+
description = "Timeout for receiving streaming read responses.")
304+
private Duration streamReadTimeout = Duration.ofSeconds(10);
305+
284306
@PostConstruct
285307
public void validate() {
286308
Preconditions.checkState(streamBufferSize > 0);
@@ -335,6 +357,33 @@ public void validate() {
335357
}
336358
// Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here
337359
}
360+
// Validate streaming read configurations.
361+
// Ensure pre-read size is non-negative. If it's invalid, reset to a sane default.
362+
if (streamReadPreReadSize < 0) {
363+
LOG.warn("Invalid ozone.client.stream.read.pre-read-size = {}. " +
364+
"Resetting to default 32MB.",
365+
streamReadPreReadSize);
366+
streamReadPreReadSize = 32L << 20; // 32MB
367+
}
368+
369+
// Ensure response data size is positive.
370+
if (streamReadResponseDataSize <= 0) {
371+
LOG.warn("Invalid ozone.client.stream.read.response-data-size = {}. " +
372+
"Resetting to default 1MB.",
373+
streamReadResponseDataSize);
374+
streamReadResponseDataSize = 1 << 20; // 1MB
375+
}
376+
377+
// Ensure stream read timeout is a positive duration.
378+
Duration defaultTimeout = Duration.ofSeconds(10);
379+
if (streamReadTimeout == null
380+
|| streamReadTimeout.isZero()
381+
|| streamReadTimeout.isNegative()) {
382+
LOG.warn("Invalid ozone.client.stream.read.timeout = {}. " +
383+
"Resetting to default {}.",
384+
streamReadTimeout, defaultTimeout);
385+
streamReadTimeout = defaultTimeout;
386+
}
338387
}
339388

340389
public long getStreamBufferFlushSize() {
@@ -553,6 +602,30 @@ public void setStreamReadBlock(boolean streamReadBlock) {
553602
this.streamReadBlock = streamReadBlock;
554603
}
555604

605+
public long getStreamReadPreReadSize() {
606+
return streamReadPreReadSize;
607+
}
608+
609+
public int getStreamReadResponseDataSize() {
610+
return streamReadResponseDataSize;
611+
}
612+
613+
public Duration getStreamReadTimeout() {
614+
return streamReadTimeout;
615+
}
616+
617+
public void setStreamReadPreReadSize(long streamReadPreReadSize) {
618+
this.streamReadPreReadSize = streamReadPreReadSize;
619+
}
620+
621+
public void setStreamReadResponseDataSize(int streamReadResponseDataSize) {
622+
this.streamReadResponseDataSize = streamReadResponseDataSize;
623+
}
624+
625+
public void setStreamReadTimeout(Duration streamReadTimeout) {
626+
this.streamReadTimeout = streamReadTimeout;
627+
}
628+
556629
/**
557630
* Enum for indicating what mode to use when combining chunk and block
558631
* checksums to define an aggregate FileChecksum. This should be considered

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.EOFException;
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
25+
import java.time.Duration;
2526
import java.util.concurrent.BlockingQueue;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.ExecutionException;
@@ -70,8 +71,10 @@ public class StreamBlockInputStream extends BlockExtendedInputStream {
7071
private final String name = "stream" + STREAM_ID.getAndIncrement();
7172
private final BlockID blockID;
7273
private final long blockLength;
73-
private final int responseDataSize = 1 << 20; // 1 MB
74-
private final long preReadSize = 32 << 20; // 32 MB
74+
private final int responseDataSize;
75+
private final long preReadSize;
76+
private final Duration readTimeout;
77+
private final long readTimeoutNanos;
7578
private final AtomicReference<Pipeline> pipelineRef = new AtomicReference<>();
7679
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = new AtomicReference<>();
7780
private XceiverClientFactory xceiverClientFactory;
@@ -101,6 +104,10 @@ public StreamBlockInputStream(
101104
this.verifyChecksum = config.isChecksumVerify();
102105
this.retryPolicy = getReadRetryPolicy(config);
103106
this.refreshFunction = refreshFunction;
107+
this.preReadSize = config.getStreamReadPreReadSize();
108+
this.responseDataSize = config.getStreamReadResponseDataSize();
109+
this.readTimeout = config.getStreamReadTimeout();
110+
this.readTimeoutNanos = readTimeout.toNanos();
104111
}
105112

106113
@Override
@@ -328,6 +335,19 @@ public String toString() {
328335
return name;
329336
}
330337

338+
public long getPreReadSize() {
339+
return preReadSize;
340+
}
341+
342+
public int getResponseDataSize() {
343+
return responseDataSize;
344+
}
345+
346+
/** Visible for testing: returns the configured streaming read timeout. */
347+
public Duration getReadTimeout() {
348+
return readTimeout;
349+
}
350+
331351
/**
332352
 * Implementation of a StreamObserver used to receive and buffer streaming gRPC reads.
333353
*/
@@ -351,10 +371,9 @@ void checkError() throws IOException {
351371
}
352372
}
353373

354-
ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException {
355-
final long timeoutNanos = timeoutUnit.toNanos(timeout);
374+
ReadBlockResponseProto poll() throws IOException {
356375
final long startTime = System.nanoTime();
357-
final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
376+
final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000);
358377

359378
while (true) {
360379
checkError();
@@ -374,9 +393,9 @@ ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOExceptio
374393
}
375394

376395
final long elapsedNanos = System.nanoTime() - startTime;
377-
if (elapsedNanos >= timeoutNanos) {
396+
if (elapsedNanos >= readTimeoutNanos) {
378397
setFailedAndThrow(new TimeoutIOException(
379-
"Timed out " + timeout + " " + timeoutUnit + " waiting for response"));
398+
"Timed out waiting for response after " + readTimeout));
380399
return null;
381400
}
382401
}
@@ -399,7 +418,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException {
399418
}
400419

401420
ByteBuffer readFromQueue() throws IOException {
402-
final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
421+
final ReadBlockResponseProto readBlock = poll();
403422
// The server always returns data starting from the last checksum boundary. Therefore if the reader position is
404423
// ahead of the position we received from the server, we need to adjust the buffer position accordingly.
405424
// If the reader position is behind

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.jupiter.api.Assertions.assertFalse;
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
2424

25+
import java.time.Duration;
2526
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
2627
import org.junit.jupiter.api.Test;
2728

@@ -76,4 +77,18 @@ void testClientHBaseEnhancementsAllowedFalse() {
7677
assertFalse(subject.getEnablePutblockPiggybacking());
7778
assertEquals(1, subject.getMaxConcurrentWritePerKey());
7879
}
80+
81+
@Test
82+
public void testStreamReadConfigParsing() {
83+
OzoneConfiguration conf = new OzoneConfiguration();
84+
conf.set("ozone.client.stream.read.pre-read-size", "67108864");
85+
conf.set("ozone.client.stream.read.response-data-size", "2097152");
86+
conf.set("ozone.client.stream.read.timeout", "5s");
87+
88+
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
89+
90+
assertEquals(64L << 20, clientConfig.getStreamReadPreReadSize());
91+
assertEquals(2 << 20, clientConfig.getStreamReadResponseDataSize());
92+
assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
93+
}
7994
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.storage;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.mockito.Mockito.mock;
22+
23+
import java.time.Duration;
24+
import java.util.function.Function;
25+
import org.apache.hadoop.hdds.client.BlockID;
26+
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
27+
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
28+
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
29+
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
30+
import org.apache.hadoop.security.token.Token;
31+
import org.junit.jupiter.api.Test;
32+
33+
/**
34+
* Tests for StreamBlockInputStream custom configuration behavior.
35+
*/
36+
public class TestStreamBlockInputStream {
37+
38+
@Test
39+
public void testCustomStreamReadConfigIsApplied() throws Exception {
40+
// Arrange: create a config with non-default values
41+
OzoneClientConfig clientConfig = new OzoneClientConfig();
42+
clientConfig.setStreamReadPreReadSize(64L << 20);
43+
clientConfig.setStreamReadResponseDataSize(2 << 20);
44+
clientConfig.setStreamReadTimeout(Duration.ofSeconds(5));
45+
46+
// Sanity check
47+
assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
48+
// Create a dummy BlockID for the test
49+
BlockID blockID = new BlockID(1L, 1L);
50+
long length = 1024L;
51+
// Create a mock Pipeline instance.
52+
Pipeline pipeline = mock(Pipeline.class);
53+
54+
Token<OzoneBlockTokenIdentifier> token = null;
55+
// Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor
56+
XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class);
57+
Function<BlockID, BlockLocationInfo> refreshFunction = b -> null;
58+
// Create a StreamBlockInputStream instance
59+
try (StreamBlockInputStream sbis = new StreamBlockInputStream(
60+
blockID, length, pipeline, token,
61+
xceiverClientFactory, refreshFunction, clientConfig)) {
62+
63+
// Assert: fields should match config values
64+
assertEquals(64L << 20, sbis.getPreReadSize());
65+
assertEquals(2 << 20, sbis.getResponseDataSize());
66+
assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout());
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)