
Commit df08861

Merge branch 'master' into HDDS-5713

2 parents: 64fa938 + cafd9dd

209 files changed: 5630 additions & 2110 deletions


.github/workflows/intermittent-test-check.yml

Lines changed: 2 additions & 2 deletions

@@ -129,7 +129,7 @@ jobs:
           java-version: ${{ github.event.inputs.java-version }}
       - name: Build (most) of Ozone
         run: |
-          args="-DskipRecon -DskipShade -Dmaven.javadoc.skip=true"
+          args="-DskipRecon -DskipShade -Dmaven.javadoc.skip=true -Drocks_tools_native"
           if [[ "${{ github.event.inputs.ratis-ref }}" != "" ]]; then
             args="$args -Dratis.version=${{ needs.ratis.outputs.ratis-version }}"
             args="$args -Dratis.thirdparty.version=${{ needs.ratis.outputs.thirdparty-version }}"

@@ -199,7 +199,7 @@ jobs:
             export OZONE_REPO_CACHED=true
           fi
-          args="-DexcludedGroups=native|slow|unhealthy -DskipShade"
+          args="-DexcludedGroups=native|slow|unhealthy -DskipShade -Drocks_tools_native"
           if [[ "${{ github.event.inputs.ratis-ref }}" != "" ]]; then
             args="$args -Dratis.version=${{ needs.ratis.outputs.ratis-version }}"
             args="$args -Dratis.thirdparty.version=${{ needs.ratis.outputs.thirdparty-version }}"

dev-support/ci/selective_ci_checks.bats

Lines changed: 11 additions & 0 deletions

@@ -309,6 +309,17 @@ load bats-assert/load.bash
   assert_output -p needs-kubernetes-tests=false
 }

+@test "CI workflow change (check.yml)" {
+  run dev-support/ci/selective_ci_checks.sh 1468af02067ec75b255f605816c32f8bf4dfaabf
+
+  assert_output -p 'basic-checks=["author","bats","checkstyle","docs","findbugs","pmd","rat"]'
+  assert_output -p needs-build=true
+  assert_output -p needs-compile=true
+  assert_output -p needs-compose-tests=true
+  assert_output -p needs-integration-tests=true
+  assert_output -p needs-kubernetes-tests=true
+}
+
 @test "CI workflow change (ci.yaml)" {
   run dev-support/ci/selective_ci_checks.sh 90fd5f2adc

dev-support/ci/selective_ci_checks.sh

Lines changed: 1 addition & 0 deletions

@@ -192,6 +192,7 @@ function check_if_tests_are_needed_at_all() {
 function run_all_tests_if_environment_files_changed() {
   start_end::group_start "Check if everything should be run"
   local pattern_array=(
+    "^.github/workflows/check.yml"
     "^.github/workflows/ci.yml"
     "^.github/workflows/post-commit.yml"
     "^dev-support/ci"

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 73 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm;

 import com.google.common.base.Preconditions;
+import java.time.Duration;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;

@@ -281,6 +282,27 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private int maxConcurrentWritePerKey = 1;

+  @Config(key = "ozone.client.stream.read.pre-read-size",
+      defaultValue = "33554432",
+      type = ConfigType.LONG,
+      tags = {ConfigTag.CLIENT},
+      description = "Extra bytes to prefetch during streaming reads.")
+  private long streamReadPreReadSize = 32L << 20;
+
+  @Config(key = "ozone.client.stream.read.response-data-size",
+      defaultValue = "1048576",
+      type = ConfigType.INT,
+      tags = {ConfigTag.CLIENT},
+      description = "Chunk size of streaming read responses from datanodes.")
+  private int streamReadResponseDataSize = 1 << 20;
+
+  @Config(key = "ozone.client.stream.read.timeout",
+      defaultValue = "10s",
+      type = ConfigType.TIME,
+      tags = {ConfigTag.CLIENT},
+      description = "Timeout for receiving streaming read responses.")
+  private Duration streamReadTimeout = Duration.ofSeconds(10);
+
   @PostConstruct
   public void validate() {
     Preconditions.checkState(streamBufferSize > 0);

@@ -335,6 +357,33 @@ public void validate() {
     }
     // Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here
     }
+    // Validate streaming read configurations.
+    // Ensure pre-read size is non-negative. If it's invalid, reset to a sane default.
+    if (streamReadPreReadSize < 0) {
+      LOG.warn("Invalid ozone.client.stream.read.pre-read-size = {}. " +
+              "Resetting to default 32MB.",
+          streamReadPreReadSize);
+      streamReadPreReadSize = 32L << 20; // 32MB
+    }
+
+    // Ensure response data size is positive.
+    if (streamReadResponseDataSize <= 0) {
+      LOG.warn("Invalid ozone.client.stream.read.response-data-size = {}. " +
+              "Resetting to default 1MB.",
+          streamReadResponseDataSize);
+      streamReadResponseDataSize = 1 << 20; // 1MB
+    }
+
+    // Ensure stream read timeout is a positive duration.
+    Duration defaultTimeout = Duration.ofSeconds(10);
+    if (streamReadTimeout == null
+        || streamReadTimeout.isZero()
+        || streamReadTimeout.isNegative()) {
+      LOG.warn("Invalid ozone.client.stream.read.timeout = {}. " +
+              "Resetting to default {}.",
+          streamReadTimeout, defaultTimeout);
+      streamReadTimeout = defaultTimeout;
+    }
   }

   public long getStreamBufferFlushSize() {

@@ -553,6 +602,30 @@ public void setStreamReadBlock(boolean streamReadBlock) {
     this.streamReadBlock = streamReadBlock;
   }

+  public long getStreamReadPreReadSize() {
+    return streamReadPreReadSize;
+  }
+
+  public int getStreamReadResponseDataSize() {
+    return streamReadResponseDataSize;
+  }
+
+  public Duration getStreamReadTimeout() {
+    return streamReadTimeout;
+  }
+
+  public void setStreamReadPreReadSize(long streamReadPreReadSize) {
+    this.streamReadPreReadSize = streamReadPreReadSize;
+  }
+
+  public void setStreamReadResponseDataSize(int streamReadResponseDataSize) {
+    this.streamReadResponseDataSize = streamReadResponseDataSize;
+  }
+
+  public void setStreamReadTimeout(Duration streamReadTimeout) {
+    this.streamReadTimeout = streamReadTimeout;
+  }
+
   /**
    * Enum for indicating what mode to use when combining chunk and block
    * checksums to define an aggregate FileChecksum. This should be considered
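
Taken together, the three new keys bound a streaming read: the client keeps up to ozone.client.stream.read.pre-read-size bytes prefetched ahead of the reader, datanodes return data in chunks of ozone.client.stream.read.response-data-size, and each wait for the next response is capped by ozone.client.stream.read.timeout; validate() falls back to the defaults (32 MB, 1 MB, 10 s) with a warning if an invalid value is supplied. A minimal sketch of overriding the defaults, using only the keys, setters, and getObject() pattern visible in this diff (the class name and main() wiring are illustrative, not part of the commit):

import java.time.Duration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public class StreamReadConfigSketch {
  public static void main(String[] args) {
    // Option 1: set the keys on the configuration and let the @Config
    // annotations populate OzoneClientConfig via getObject().
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("ozone.client.stream.read.pre-read-size", "67108864");     // 64 MB prefetch budget
    conf.set("ozone.client.stream.read.response-data-size", "2097152"); // 2 MB response chunks
    conf.set("ozone.client.stream.read.timeout", "5s");                 // 5 s per-response timeout
    OzoneClientConfig fromKeys = conf.getObject(OzoneClientConfig.class);

    // Option 2: call the new setters directly (as the unit tests below do).
    OzoneClientConfig direct = new OzoneClientConfig();
    direct.setStreamReadPreReadSize(64L << 20);
    direct.setStreamReadResponseDataSize(2 << 20);
    direct.setStreamReadTimeout(Duration.ofSeconds(5));

    // Both routes yield the same effective settings.
    System.out.println(fromKeys.getStreamReadPreReadSize() == direct.getStreamReadPreReadSize());
  }
}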

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java

Lines changed: 27 additions & 8 deletions

@@ -22,6 +22,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;

@@ -70,8 +71,10 @@ public class StreamBlockInputStream extends BlockExtendedInputStream {
   private final String name = "stream" + STREAM_ID.getAndIncrement();
   private final BlockID blockID;
   private final long blockLength;
-  private final int responseDataSize = 1 << 20; // 1 MB
-  private final long preReadSize = 32 << 20; // 32 MB
+  private final int responseDataSize;
+  private final long preReadSize;
+  private final Duration readTimeout;
+  private final long readTimeoutNanos;
   private final AtomicReference<Pipeline> pipelineRef = new AtomicReference<>();
   private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = new AtomicReference<>();
   private XceiverClientFactory xceiverClientFactory;

@@ -101,6 +104,10 @@ public StreamBlockInputStream(
     this.verifyChecksum = config.isChecksumVerify();
     this.retryPolicy = getReadRetryPolicy(config);
     this.refreshFunction = refreshFunction;
+    this.preReadSize = config.getStreamReadPreReadSize();
+    this.responseDataSize = config.getStreamReadResponseDataSize();
+    this.readTimeout = config.getStreamReadTimeout();
+    this.readTimeoutNanos = readTimeout.toNanos();
   }

   @Override

@@ -328,6 +335,19 @@ public String toString() {
     return name;
   }

+  public long getPreReadSize() {
+    return preReadSize;
+  }
+
+  public int getResponseDataSize() {
+    return responseDataSize;
+  }
+
+  /** Visible for testing: returns the configured streaming read timeout. */
+  public Duration getReadTimeout() {
+    return readTimeout;
+  }
+
   /**
    * Implementation of a StreamObserver used to receive and buffer streaming GRPC reads.
    */

@@ -351,10 +371,9 @@ void checkError() throws IOException {
       }
     }

-    ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException {
-      final long timeoutNanos = timeoutUnit.toNanos(timeout);
+    ReadBlockResponseProto poll() throws IOException {
       final long startTime = System.nanoTime();
-      final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
+      final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000);

       while (true) {
         checkError();

@@ -374,9 +393,9 @@ ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOExceptio
         }

         final long elapsedNanos = System.nanoTime() - startTime;
-        if (elapsedNanos >= timeoutNanos) {
+        if (elapsedNanos >= readTimeoutNanos) {
           setFailedAndThrow(new TimeoutIOException(
-              "Timed out " + timeout + " " + timeoutUnit + " waiting for response"));
+              "Timed out waiting for response after " + readTimeout));
           return null;
         }
       }

@@ -399,7 +418,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException {
     }

     ByteBuffer readFromQueue() throws IOException {
-      final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
+      final ReadBlockResponseProto readBlock = poll();
       // The server always returns data starting from the last checksum boundary. Therefore if the reader position is
      // ahead of the position we received from the server, we need to adjust the buffer position accordingly.
      // If the reader position is behind
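
The reworked poll() is an instance of a bounded-polling pattern: instead of blocking on the response queue for the full timeout, it polls in short slices (a tenth of the budget, capped at 100 ms) so that stream failures detected by checkError() surface quickly, while the total elapsed time is checked against the configured budget. A self-contained sketch of the same pattern (generic names, a plain IOException instead of TimeoutIOException, and no checkError() hook; none of these names are the actual class members):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedPoller<T> {
  private final BlockingQueue<T> queue;
  private final long timeoutNanos;

  BoundedPoller(BlockingQueue<T> queue, long timeoutNanos) {
    this.queue = queue;
    this.timeoutNanos = timeoutNanos;
  }

  /** Waits for the next item, polling in short slices so error checks can run between polls. */
  T poll() throws IOException {
    final long startTime = System.nanoTime();
    // Slice the wait: at most a tenth of the budget, and never more than 100 ms.
    final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
    while (true) {
      // A real implementation would re-check stream errors here (cf. checkError()).
      final T item;
      try {
        item = queue.poll(pollTimeoutNanos, TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while waiting for response");
      }
      if (item != null) {
        return item;
      }
      if (System.nanoTime() - startTime >= timeoutNanos) {
        throw new IOException("Timed out waiting for response after " + timeoutNanos + " ns");
      }
    }
  }
}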

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestOzoneClientConfig.java

Lines changed: 15 additions & 0 deletions

@@ -22,6 +22,7 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;

+import java.time.Duration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.junit.jupiter.api.Test;

@@ -76,4 +77,18 @@ void testClientHBaseEnhancementsAllowedFalse() {
     assertFalse(subject.getEnablePutblockPiggybacking());
     assertEquals(1, subject.getMaxConcurrentWritePerKey());
   }
+
+  @Test
+  public void testStreamReadConfigParsing() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.client.stream.read.pre-read-size", "67108864");
+    conf.set("ozone.client.stream.read.response-data-size", "2097152");
+    conf.set("ozone.client.stream.read.timeout", "5s");
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+
+    assertEquals(64L << 20, clientConfig.getStreamReadPreReadSize());
+    assertEquals(2 << 20, clientConfig.getStreamReadResponseDataSize());
+    assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
+  }
 }
hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.time.Duration;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for StreamBlockInputStream custom configuration behavior.
+ */
+public class TestStreamBlockInputStream {
+
+  @Test
+  public void testCustomStreamReadConfigIsApplied() throws Exception {
+    // Arrange: create a config with non-default values
+    OzoneClientConfig clientConfig = new OzoneClientConfig();
+    clientConfig.setStreamReadPreReadSize(64L << 20);
+    clientConfig.setStreamReadResponseDataSize(2 << 20);
+    clientConfig.setStreamReadTimeout(Duration.ofSeconds(5));
+
+    // Sanity check
+    assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
+    // Create a dummy BlockID for the test
+    BlockID blockID = new BlockID(1L, 1L);
+    long length = 1024L;
+    // Create a mock Pipeline instance.
+    Pipeline pipeline = mock(Pipeline.class);
+
+    Token<OzoneBlockTokenIdentifier> token = null;
+    // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor
+    XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class);
+    Function<BlockID, BlockLocationInfo> refreshFunction = b -> null;
+    // Create a StreamBlockInputStream instance
+    try (StreamBlockInputStream sbis = new StreamBlockInputStream(
+        blockID, length, pipeline, token,
+        xceiverClientFactory, refreshFunction, clientConfig)) {
+
+      // Assert: fields should match config values
+      assertEquals(64L << 20, sbis.getPreReadSize());
+      assertEquals(2 << 20, sbis.getResponseDataSize());
+      assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout());
+    }
+  }
+}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

Lines changed: 35 additions & 0 deletions

@@ -31,6 +31,7 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;

@@ -57,10 +58,12 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import javax.management.ObjectName;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.ConfigRedactor;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;

@@ -331,6 +334,38 @@ public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
     }
   }

+  /**
+   * Returns the datanode-facing SCM addresses for the given service ID and requested SCM node IDs.
+   * @param conf Configuration
+   * @param scmServiceId SCM service ID
+   * @param scmNodeIds Requested SCM node IDs
+   * @return A collection with addresses of the requested SCM node IDs.
+   * Null if any SCM address is wrongly configured. Note that the returned collection
+   * might not be ordered the same way as the requested SCM node IDs.
+   */
+  public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
+      ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
+    Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
+    for (String scmNodeId : scmNodeIds) {
+      String addressKey = ConfUtils.addKeySuffixes(
+          OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+      String scmAddress = conf.get(addressKey);
+      if (scmAddress == null) {
+        LOG.warn("The SCM address configuration {} is not defined, return nothing", addressKey);
+        return null;
+      }
+
+      int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
+          OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
+          OZONE_SCM_DATANODE_PORT_DEFAULT);
+
+      String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
+      InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
+      scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
+    }
+    return scmNodeAddress;
+  }
+
   /**
    * Retrieve the socket addresses of recon.
   *
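
A hedged usage sketch of the new overload: it resolves the datanode-facing address of each requested SCM node and pairs the node ID with its socket address. The keys below assume the ozone.scm.address.<serviceId>.<nodeId> pattern that ConfUtils.addKeySuffixes() builds from OZONE_SCM_ADDRESS_KEY; the class name, host names, and node IDs are made up for illustration:

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class ScmAddressLookupSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Per-node SCM addresses for a hypothetical HA service "scmservice".
    conf.set("ozone.scm.address.scmservice.scm1", "scm1.example.com");
    conf.set("ozone.scm.address.scmservice.scm2", "scm2.example.com");

    Set<String> nodeIds = new HashSet<>();
    nodeIds.add("scm1");
    nodeIds.add("scm2");

    Collection<Pair<String, InetSocketAddress>> addresses =
        HddsUtils.getSCMAddressForDatanodes(conf, "scmservice", nodeIds);
    if (addresses == null) {
      // At least one ozone.scm.address.* key was missing or misconfigured.
      System.err.println("SCM addresses not fully configured");
      return;
    }
    for (Pair<String, InetSocketAddress> entry : addresses) {
      // Order is not guaranteed to match the requested node IDs.
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}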
