Skip to content

Commit b2d152c

Browse files
committed
Merge remote-tracking branch 'upstream/master' into HDDS-13919-conditional-writes
2 parents 7bfb33b + d5b9267 commit b2d152c

885 files changed

Lines changed: 35294 additions & 7738 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/intermittent-test-check.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ jobs:
129129
java-version: ${{ github.event.inputs.java-version }}
130130
- name: Build (most) of Ozone
131131
run: |
132-
args="-DskipRecon -DskipShade -Dmaven.javadoc.skip=true"
132+
args="-DskipRecon -DskipShade -Dmaven.javadoc.skip=true -Drocks_tools_native"
133133
if [[ "${{ github.event.inputs.ratis-ref }}" != "" ]]; then
134134
args="$args -Dratis.version=${{ needs.ratis.outputs.ratis-version }}"
135135
args="$args -Dratis.thirdparty.version=${{ needs.ratis.outputs.thirdparty-version }}"
@@ -199,7 +199,7 @@ jobs:
199199
export OZONE_REPO_CACHED=true
200200
fi
201201
202-
args="-DexcludedGroups=native|slow|unhealthy -DskipShade"
202+
args="-DexcludedGroups=native|slow|unhealthy -DskipShade -Drocks_tools_native"
203203
if [[ "${{ github.event.inputs.ratis-ref }}" != "" ]]; then
204204
args="$args -Dratis.version=${{ needs.ratis.outputs.ratis-version }}"
205205
args="$args -Dratis.thirdparty.version=${{ needs.ratis.outputs.thirdparty-version }}"

dev-support/byteman/hadooprpc.btm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ ENDRULE
3737

3838

3939
RULE Hadoop RPC source IP
40-
CLASS org.apache.hadoop.ipc.Server$RpcCall
40+
CLASS org.apache.hadoop.ipc_.Server$RpcCall
4141
METHOD run
4242
IF true
4343
DO link(Thread.currentThread(), "source", $0.connection.toString())

dev-support/pmd/pmd-ruleset.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,5 @@
5252
</rule>
5353

5454
<exclude-pattern>.*/generated-sources/.*</exclude-pattern>
55+
<exclude-pattern>.*/org/apache/hadoop/.*_/.*</exclude-pattern>
5556
</ruleset>

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ public class OzoneClientConfig {
113113
tags = ConfigTag.CLIENT)
114114
private long streamBufferMaxSize = 32 * 1024 * 1024;
115115

116+
@Config(key = "ozone.client.stream.readblock.enable",
117+
defaultValue = "false",
118+
type = ConfigType.BOOLEAN,
119+
description = "Allow ReadBlock to stream all the readChunk in one request.",
120+
tags = ConfigTag.CLIENT)
121+
private boolean streamReadBlock = false;
122+
116123
@Config(key = "ozone.client.max.retries",
117124
defaultValue = "5",
118125
description = "Maximum number of retries by Ozone Client on "
@@ -151,7 +158,7 @@ public class OzoneClientConfig {
151158
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
152159
+ "determines which algorithm would be used to compute checksum for "
153160
+ "chunk data. Default checksum type is CRC32.",
154-
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
161+
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
155162
private String checksumType = ChecksumType.CRC32.name();
156163

157164
@Config(key = "ozone.client.bytes.per.checksum",
@@ -160,7 +167,7 @@ public class OzoneClientConfig {
160167
description = "Checksum will be computed for every bytes per checksum "
161168
+ "number of bytes and stored sequentially. The minimum value for "
162169
+ "this config is 8KB.",
163-
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
170+
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
164171
private int bytesPerChecksum = 16 * 1024;
165172

166173
@Config(key = "ozone.client.verify.checksum",
@@ -538,6 +545,14 @@ public int getMaxConcurrentWritePerKey() {
538545
return this.maxConcurrentWritePerKey;
539546
}
540547

548+
public boolean isStreamReadBlock() {
549+
return streamReadBlock;
550+
}
551+
552+
public void setStreamReadBlock(boolean streamReadBlock) {
553+
this.streamReadBlock = streamReadBlock;
554+
}
555+
541556
/**
542557
* Enum for indicating what mode to use when combining chunk and block
543558
* checksums to define an aggregate FileChecksum. This should be considered

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.hadoop.hdds.scm;
1919

20-
import com.google.common.base.Preconditions;
2120
import java.io.IOException;
21+
import java.util.Objects;
2222
import org.apache.hadoop.hdds.conf.ConfigurationSource;
2323
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
2424
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +49,7 @@ public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustMa
4949
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
5050
this.trustManager = trustManager;
5151
if (securityEnabled) {
52-
Preconditions.checkNotNull(trustManager);
52+
Objects.requireNonNull(trustManager, "trustManager == null");
5353
}
5454
}
5555

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.hadoop.hdds.HddsUtils.processForDebug;
2121

2222
import com.google.common.annotations.VisibleForTesting;
23-
import com.google.common.base.Preconditions;
2423
import java.io.IOException;
2524
import java.io.InterruptedIOException;
2625
import java.util.ArrayList;
@@ -37,6 +36,7 @@
3736
import java.util.concurrent.TimeUnit;
3837
import org.apache.hadoop.hdds.HddsConfigKeys;
3938
import org.apache.hadoop.hdds.HddsUtils;
39+
import org.apache.hadoop.hdds.client.BlockID;
4040
import org.apache.hadoop.hdds.conf.ConfigurationSource;
4141
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
4242
import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -59,10 +59,12 @@
5959
import org.apache.hadoop.ozone.OzoneConfigKeys;
6060
import org.apache.hadoop.ozone.OzoneConsts;
6161
import org.apache.hadoop.util.Time;
62+
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
6263
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
6364
import org.apache.ratis.thirdparty.io.grpc.Status;
6465
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
6566
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
67+
import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
6668
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
6769
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
6870
import org.slf4j.Logger;
@@ -111,8 +113,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
111113
public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
112114
ClientTrustManager trustManager) {
113115
super();
114-
Preconditions.checkNotNull(pipeline);
115-
Preconditions.checkNotNull(config);
116+
Objects.requireNonNull(pipeline, "pipeline == null");
117+
Objects.requireNonNull(config, "config == null");
116118
setTimeout(config.getTimeDuration(OzoneConfigKeys.
117119
OZONE_CLIENT_READ_TIMEOUT, OzoneConfigKeys
118120
.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
@@ -384,28 +386,15 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
384386
});
385387
}
386388

387-
private XceiverClientReply sendCommandWithRetry(
388-
ContainerCommandRequestProto request, List<Validator> validators)
389-
throws IOException {
390-
ContainerCommandResponseProto responseProto = null;
391-
IOException ioException = null;
389+
private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto request) throws IOException {
390+
return sortDatanodes(getRequestBlockID(request), request.getCmdType());
391+
}
392392

393-
// In case of an exception or an error, we will try to read from the
394-
// datanodes in the pipeline in a round-robin fashion.
395-
XceiverClientReply reply = new XceiverClientReply(null);
393+
List<DatanodeDetails> sortDatanodes(DatanodeBlockID blockID, ContainerProtos.Type cmdType) throws IOException {
396394
List<DatanodeDetails> datanodeList = null;
397395

398-
DatanodeBlockID blockID = null;
399-
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
400-
blockID = request.getGetBlock().getBlockID();
401-
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
402-
blockID = request.getReadChunk().getBlockID();
403-
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
404-
blockID = request.getGetSmallFile().getBlock().getBlockID();
405-
}
406-
407396
if (blockID != null) {
408-
if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
397+
if (cmdType != ContainerProtos.Type.ReadChunk) {
409398
datanodeList = pipeline.getNodes();
410399
int getBlockDNLeaderIndex = datanodeList.indexOf(pipeline.getLeaderNode());
411400
if (getBlockDNLeaderIndex > 0) {
@@ -440,6 +429,33 @@ private XceiverClientReply sendCommandWithRetry(
440429
if (!allInService) {
441430
datanodeList = sortDatanodeByOperationalState(datanodeList);
442431
}
432+
return datanodeList;
433+
}
434+
435+
private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto request) {
436+
DatanodeBlockID blockID = null;
437+
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
438+
blockID = request.getGetBlock().getBlockID();
439+
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
440+
blockID = request.getReadChunk().getBlockID();
441+
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
442+
blockID = request.getGetSmallFile().getBlock().getBlockID();
443+
} else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
444+
blockID = request.getReadBlock().getBlockID();
445+
}
446+
return blockID;
447+
}
448+
449+
private XceiverClientReply sendCommandWithRetry(
450+
ContainerCommandRequestProto request, List<Validator> validators)
451+
throws IOException {
452+
ContainerCommandResponseProto responseProto = null;
453+
IOException ioException = null;
454+
455+
// In case of an exception or an error, we will try to read from the
456+
// datanodes in the pipeline in a round-robin fashion.
457+
XceiverClientReply reply = new XceiverClientReply(null);
458+
List<DatanodeDetails> datanodeList = sortDatanodes(request);
443459

444460
for (DatanodeDetails dn : datanodeList) {
445461
try {
@@ -491,11 +507,11 @@ private XceiverClientReply sendCommandWithRetry(
491507
reply.setResponse(CompletableFuture.completedFuture(responseProto));
492508
return reply;
493509
} else {
494-
Objects.requireNonNull(ioException);
510+
Objects.requireNonNull(ioException, "ioException == null");
495511
String message = "Failed to execute command {}";
496512
if (LOG.isDebugEnabled()) {
497513
LOG.debug(message + " on the pipeline {}.",
498-
processForDebug(request), pipeline);
514+
processForDebug(request), pipeline);
499515
} else {
500516
LOG.warn(message + " on the pipeline {}.",
501517
request.getCmdType(), pipeline);
@@ -504,6 +520,61 @@ private XceiverClientReply sendCommandWithRetry(
504520
}
505521
}
506522

523+
@Override
524+
public void streamRead(ContainerCommandRequestProto request,
525+
StreamingReadResponse streamObserver) {
526+
if (LOG.isDebugEnabled()) {
527+
LOG.debug("->{}, send onNext request {}",
528+
streamObserver, TextFormat.shortDebugString(request.getReadBlock()));
529+
}
530+
streamObserver.getRequestObserver().onNext(request);
531+
}
532+
533+
@Override
534+
public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException {
535+
final List<DatanodeDetails> datanodeList = sortDatanodes(null, ContainerProtos.Type.ReadBlock);
536+
IOException lastException = null;
537+
for (DatanodeDetails dn : datanodeList) {
538+
try {
539+
checkOpen(dn);
540+
semaphore.acquire();
541+
XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
542+
if (stub == null) {
543+
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
544+
}
545+
LOG.debug("initStreamRead {} on datanode {}", blockID.getContainerBlockID(), dn);
546+
StreamObserver<ContainerCommandRequestProto> requestObserver = stub
547+
.withDeadlineAfter(timeout, TimeUnit.SECONDS)
548+
.send(streamObserver);
549+
streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
550+
(ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver));
551+
return;
552+
} catch (IOException e) {
553+
LOG.error("Failed to start streaming read to DataNode {}", dn, e);
554+
semaphore.release();
555+
lastException = e;
556+
} catch (InterruptedException e) {
557+
Thread.currentThread().interrupt();
558+
throw new IOException("Interrupted initStreamRead to " + dn + " for " + blockID, e);
559+
}
560+
}
561+
if (lastException != null) {
562+
throw lastException;
563+
} else {
564+
throw new IOException("Failed to start streaming read to any available DataNodes");
565+
}
566+
}
567+
568+
/**
569+
* This method should be called to indicate the end of streaming read. Its primary purpose is to release the
570+
* semaphore acquired when starting the streaming read, but it is also used to update any metrics or debug logs as
571+
* needed.
572+
*/
573+
@Override
574+
public void completeStreamRead() {
575+
semaphore.release();
576+
}
577+
507578
private static List<DatanodeDetails> sortDatanodeByOperationalState(
508579
List<DatanodeDetails> datanodeList) {
509580
List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.common.cache.RemovalListener;
3030
import com.google.common.cache.RemovalNotification;
3131
import java.io.IOException;
32+
import java.util.Objects;
3233
import java.util.concurrent.TimeUnit;
3334
import org.apache.hadoop.hdds.conf.Config;
3435
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -79,8 +80,8 @@ public XceiverClientManager(ConfigurationSource conf,
7980
ScmClientConfig clientConf,
8081
ClientTrustManager trustManager) throws IOException {
8182
super(conf, trustManager);
82-
Preconditions.checkNotNull(clientConf);
83-
Preconditions.checkNotNull(conf);
83+
Objects.requireNonNull(clientConf, "clientConf == null");
84+
Objects.requireNonNull(conf, "conf == null");
8485
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
8586

8687
this.clientCache = CacheBuilder.newBuilder()
@@ -118,7 +119,7 @@ public Cache<String, XceiverClientSpi> getClientCache() {
118119
@Override
119120
public XceiverClientSpi acquireClient(Pipeline pipeline,
120121
boolean topologyAware) throws IOException {
121-
Preconditions.checkNotNull(pipeline);
122+
Objects.requireNonNull(pipeline, "pipeline == null");
122123
Preconditions.checkArgument(pipeline.getNodes() != null);
123124
Preconditions.checkArgument(!pipeline.getNodes().isEmpty(),
124125
NO_REPLICA_FOUND);
@@ -133,7 +134,7 @@ public XceiverClientSpi acquireClient(Pipeline pipeline,
133134
@Override
134135
public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
135136
boolean topologyAware) {
136-
Preconditions.checkNotNull(client);
137+
Objects.requireNonNull(client, "client == null");
137138
synchronized (clientCache) {
138139
client.decrementReference();
139140
if (invalidateClient) {
@@ -246,7 +247,7 @@ public static synchronized void resetXceiverClientMetrics() {
246247
@ConfigGroup(prefix = "scm.container.client")
247248
public static class ScmClientConfig {
248249

249-
@Config(key = "max.size",
250+
@Config(key = "scm.container.client.max.size",
250251
defaultValue = "256",
251252
tags = {OZONE, PERFORMANCE},
252253
description =
@@ -257,7 +258,7 @@ public static class ScmClientConfig {
257258
)
258259
private int maxSize;
259260

260-
@Config(key = "idle.threshold",
261+
@Config(key = "scm.container.client.idle.threshold",
261262
type = ConfigType.TIME, timeUnit = MILLISECONDS,
262263
defaultValue = "10s",
263264
tags = {OZONE, PERFORMANCE},

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,7 @@ public XceiverClientReply sendCommandAsync(
392392
// able to connect to leader in the pipeline, though the
393393
// pipeline can still be functional.
394394
RaftException exception = reply.getException();
395-
Preconditions.checkNotNull(exception, "Raft reply failure but " +
396-
"no exception propagated.");
395+
Objects.requireNonNull(exception, "Raft reply failure but no exception propagated.");
397396
throw new CompletionException(exception);
398397
}
399398
ContainerCommandResponseProto response =

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.hadoop.hdds.scm.client;
1919

20-
import com.google.common.base.Preconditions;
2120
import com.google.common.collect.ImmutableList;
2221
import java.util.HashMap;
2322
import java.util.List;
2423
import java.util.Map;
24+
import java.util.Objects;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.TimeoutException;
2727
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -219,7 +219,7 @@ public static void verifyKeyName(String keyName) {
219219
*/
220220
public static <T> void checkNotNull(T... references) {
221221
for (T ref: references) {
222-
Preconditions.checkNotNull(ref);
222+
Objects.requireNonNull(ref, "ref == null");
223223
}
224224
}
225225

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
105105
if (Thread.currentThread().isInterrupted()) {
106106
setExceptionAndThrow(exception);
107107
}
108-
Objects.requireNonNull(action);
108+
Objects.requireNonNull(action, "action == null");
109109
Preconditions.checkArgument(
110110
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
111111
if (action.delayMillis > 0) {

0 commit comments

Comments
 (0)