Skip to content

Commit b798aec

Browse files
committed
Storage tiering. Thread storageType via DatanodeBlockID
1 parent 9c96bcb commit b798aec

58 files changed

Lines changed: 1398 additions & 84 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public class BlockOutputStream extends OutputStream {
103103
= new AtomicReference<>();
104104

105105
private final BlockData.Builder containerBlockData;
106+
private final ContainerProtos.StorageTypeProto storageType;
106107
private volatile XceiverClientFactory xceiverClientFactory;
107108
private XceiverClientSpi xceiverClient;
108109
private OzoneClientConfig config;
@@ -177,7 +178,8 @@ public BlockOutputStream(
177178
OzoneClientConfig config,
178179
Token<? extends TokenIdentifier> token,
179180
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
180-
Supplier<ExecutorService> blockOutputStreamResourceProvider
181+
Supplier<ExecutorService> blockOutputStreamResourceProvider,
182+
ContainerProtos.StorageTypeProto storageType
181183
) throws IOException {
182184
this.xceiverClientFactory = xceiverClientManager;
183185
this.config = config;
@@ -195,6 +197,10 @@ public BlockOutputStream(
195197
if (replicationIndex > 0) {
196198
blkIDBuilder.setReplicaIndex(replicationIndex);
197199
}
200+
if (storageType != null) {
201+
blkIDBuilder.setStorageType(storageType);
202+
}
203+
this.storageType = storageType;
198204
this.containerBlockData = BlockData.newBuilder().setBlockID(
199205
blkIDBuilder.build()).addMetadata(keyValue);
200206
this.pipeline = pipeline;
@@ -964,7 +970,8 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
964970
}
965971

966972
asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
967-
blockID.get(), data, tokenString, replicationIndex, blockData, close);
973+
blockID.get(), data, tokenString, replicationIndex, blockData, close,
974+
storageType);
968975
CompletableFuture<ContainerCommandResponseProto>
969976
respFuture = asyncReply.getResponse();
970977
validateFuture = respFuture.thenApplyAsync(e -> {

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,12 @@ public ECBlockOutputStream(
8282
OzoneClientConfig config,
8383
Token<? extends TokenIdentifier> token,
8484
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
85-
Supplier<ExecutorService> executorServiceSupplier
85+
Supplier<ExecutorService> executorServiceSupplier,
86+
ContainerProtos.StorageTypeProto storageType
8687
) throws IOException {
8788
super(blockID, -1, xceiverClientManager,
88-
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
89+
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier,
90+
storageType);
8991
// In EC stream, there will be only one node in pipeline.
9092
this.datanodeDetails = pipeline.getClosestNode();
9193
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,12 @@ public RatisBlockOutputStream(
7878
OzoneClientConfig config,
7979
Token<? extends TokenIdentifier> token,
8080
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
81-
Supplier<ExecutorService> blockOutputStreamResourceProvider
81+
Supplier<ExecutorService> blockOutputStreamResourceProvider,
82+
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.StorageTypeProto storageType
8283
) throws IOException {
8384
super(blockID, blockSize, xceiverClientManager, pipeline,
84-
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
85+
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider,
86+
storageType);
8587
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
8688
}
8789

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
179179
null,
180180
ContainerClientMetrics.acquire(),
181181
streamBufferArgs,
182-
() -> newFixedThreadPool(10));
182+
() -> newFixedThreadPool(10),
183+
null);
183184
}
184185

185186
private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientConfig,
@@ -193,7 +194,7 @@ private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientCo
193194
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, clientConfig);
194195

195196
return new ECBlockOutputStream(blockID, xcm, pipeline, BufferPool.empty(), clientConfig, null,
196-
clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2));
197+
clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2), null);
197198
}
198199

199200
/**

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.commons.lang3.builder.HashCodeBuilder;
2929
import org.apache.hadoop.hdds.client.ECReplicationConfig;
3030
import org.apache.hadoop.hdds.client.ReplicationConfig;
31+
import org.apache.hadoop.hdds.protocol.StorageType;
3132
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
3233
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
3334
import org.apache.hadoop.hdds.utils.db.Codec;
@@ -87,6 +88,7 @@ public final class ContainerInfo implements Comparable<ContainerInfo> {
8788
private long sequenceId;
8889
// Health state of the container (determined by ReplicationManager)
8990
private ContainerHealthState healthState;
91+
private final StorageType storageType;
9092

9193
private ContainerInfo(Builder b) {
9294
containerID = ContainerID.valueOf(b.containerID);
@@ -102,6 +104,7 @@ private ContainerInfo(Builder b) {
102104
replicationConfig = b.replicationConfig;
103105
clock = b.clock;
104106
healthState = b.healthState != null ? b.healthState : ContainerHealthState.HEALTHY;
107+
storageType = b.storageType;
105108
}
106109

107110
public static Codec<ContainerInfo> getCodec() {
@@ -126,6 +129,9 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
126129
if (info.hasPipelineID()) {
127130
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
128131
}
132+
if (info.hasStorageType()) {
133+
builder.setStorageType(StorageType.valueOf(info.getStorageType()));
134+
}
129135
return builder.build();
130136

131137
}
@@ -288,9 +294,17 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
288294
builder.setPipelineID(getPipelineID().getProtobuf());
289295
}
290296

297+
if (storageType != null) {
298+
builder.setStorageType(storageType.toProto());
299+
}
300+
291301
return builder.build();
292302
}
293303

304+
public StorageType getStorageType() {
305+
return storageType;
306+
}
307+
294308
public String getOwner() {
295309
return owner;
296310
}
@@ -390,6 +404,7 @@ public static class Builder {
390404
private PipelineID pipelineID;
391405
private ReplicationConfig replicationConfig;
392406
private ContainerHealthState healthState;
407+
private StorageType storageType;
393408

394409
public Builder setPipelineID(PipelineID pipelineId) {
395410
this.pipelineID = pipelineId;
@@ -447,6 +462,11 @@ public Builder setHealthState(ContainerHealthState healthState) {
447462
return this;
448463
}
449464

465+
public Builder setStorageType(StorageType storageType) {
466+
this.storageType = storageType;
467+
return this;
468+
}
469+
450470
/**
451471
* Also resets {@code stateEnterTime}, so make sure to set clock first.
452472
*/

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Objects;
2121
import org.apache.hadoop.hdds.client.BlockID;
22+
import org.apache.hadoop.hdds.protocol.StorageType;
2223
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
2324
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
2425
import org.apache.hadoop.security.token.Token;
@@ -43,6 +44,8 @@ public class BlockLocationInfo {
4344
// The block is under construction. Apply to hsynced file last block.
4445
private boolean underConstruction;
4546

47+
private StorageType storageType;
48+
4649
protected BlockLocationInfo(Builder builder) {
4750
this.blockID = builder.blockID;
4851
this.pipeline = builder.pipeline;
@@ -51,6 +54,7 @@ protected BlockLocationInfo(Builder builder) {
5154
this.token = builder.token;
5255
this.partNumber = builder.partNumber;
5356
this.createVersion = builder.createVersion;
57+
this.storageType = builder.storageType;
5458
}
5559

5660
public void setCreateVersion(long version) {
@@ -121,6 +125,14 @@ public boolean isUnderConstruction() {
121125
return this.underConstruction;
122126
}
123127

128+
public StorageType getStorageType() {
129+
return storageType;
130+
}
131+
132+
public void setStorageType(StorageType storageType) {
133+
this.storageType = storageType;
134+
}
135+
124136
/**
125137
* Builder of BlockLocationInfo.
126138
*/
@@ -132,6 +144,7 @@ public static class Builder {
132144
private Pipeline pipeline;
133145
private int partNumber;
134146
private long createVersion;
147+
private StorageType storageType;
135148

136149
public Builder setBlockID(BlockID blockId) {
137150
this.blockID = blockId;
@@ -168,6 +181,11 @@ public Builder setCreateVersion(long version) {
168181
return this;
169182
}
170183

184+
public Builder setStorageType(StorageType storageType) {
185+
this.storageType = storageType;
186+
return this;
187+
}
188+
171189
public BlockLocationInfo build() {
172190
return new BlockLocationInfo(this);
173191
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -436,17 +436,21 @@ static long getLen(ReadChunkResponseProto response) {
436436
public static XceiverClientReply writeChunkAsync(
437437
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
438438
ByteString data, String tokenString,
439-
int replicationIndex, BlockData blockData, boolean close)
439+
int replicationIndex, BlockData blockData, boolean close,
440+
ContainerProtos.StorageTypeProto storageType)
440441
throws IOException, ExecutionException, InterruptedException {
441442

443+
DatanodeBlockID.Builder blkIDBuilder = DatanodeBlockID.newBuilder()
444+
.setContainerID(blockID.getContainerID())
445+
.setLocalID(blockID.getLocalID())
446+
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId())
447+
.setReplicaIndex(replicationIndex);
448+
if (storageType != null) {
449+
blkIDBuilder.setStorageType(storageType);
450+
}
442451
WriteChunkRequestProto.Builder writeChunkRequest =
443452
WriteChunkRequestProto.newBuilder()
444-
.setBlockID(DatanodeBlockID.newBuilder()
445-
.setContainerID(blockID.getContainerID())
446-
.setLocalID(blockID.getLocalID())
447-
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId())
448-
.setReplicaIndex(replicationIndex)
449-
.build())
453+
.setBlockID(blkIDBuilder.build())
450454
.setChunkData(chunk)
451455
.setData(data);
452456
if (blockData != null) {
@@ -548,6 +552,15 @@ public static void createRecoveringContainer(XceiverClientSpi client,
548552
ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
549553
}
550554

555+
@InterfaceStability.Evolving
556+
public static void createRecoveringContainer(XceiverClientSpi client,
557+
long containerID, String encodedToken, int replicaIndex,
558+
ContainerProtos.StorageTypeProto storageType) throws IOException {
559+
createContainer(client, containerID, encodedToken,
560+
ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex,
561+
storageType);
562+
}
563+
551564
/**
552565
* createContainer call that creates a container on the datanode.
553566
* @param client - client

hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.commons.lang3.builder.HashCodeBuilder;
3333
import org.apache.hadoop.hdds.client.ECReplicationConfig;
3434
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
35+
import org.apache.hadoop.hdds.protocol.StorageType;
3536
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
3637
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
3738
import org.apache.ozone.test.TestClock;
@@ -131,6 +132,29 @@ void restoreState() {
131132
assertThrows(IllegalStateException.class, subject::revertState);
132133
}
133134

135+
@Test
136+
void getProtobufWithStorageType() {
137+
ContainerInfo container = newBuilderForTest()
138+
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
139+
.setStorageType(StorageType.SSD)
140+
.build();
141+
142+
HddsProtos.ContainerInfoProto proto = container.getProtobuf();
143+
ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
144+
assertEquals(StorageType.SSD, recovered.getStorageType());
145+
}
146+
147+
@Test
148+
void getProtobufWithNullStorageType() {
149+
ContainerInfo container = newBuilderForTest()
150+
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
151+
.build();
152+
153+
HddsProtos.ContainerInfoProto proto = container.getProtobuf();
154+
ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
155+
assertEquals(null, recovered.getStorageType());
156+
}
157+
134158
public static ContainerInfo.Builder newBuilderForTest() {
135159
return new ContainerInfo.Builder()
136160
.setContainerID(1234)

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -495,15 +495,36 @@ ContainerCommandResponseProto createContainer(
495495
ContainerProtos.ContainerType.KeyValueContainer;
496496
createRequest.setContainerType(containerType);
497497

498+
ContainerProtos.StorageTypeProto storageType = null;
498499
if (containerRequest.hasWriteChunk()) {
499-
createRequest.setReplicaIndex(
500-
containerRequest.getWriteChunk().getBlockID().getReplicaIndex());
500+
ContainerProtos.DatanodeBlockID bid =
501+
containerRequest.getWriteChunk().getBlockID();
502+
createRequest.setReplicaIndex(bid.getReplicaIndex());
503+
if (bid.hasStorageType()) {
504+
storageType = bid.getStorageType();
505+
}
501506
}
502507

503508
if (containerRequest.hasPutBlock()) {
504-
createRequest.setReplicaIndex(
505-
containerRequest.getPutBlock().getBlockData().getBlockID()
506-
.getReplicaIndex());
509+
ContainerProtos.DatanodeBlockID bid =
510+
containerRequest.getPutBlock().getBlockData().getBlockID();
511+
createRequest.setReplicaIndex(bid.getReplicaIndex());
512+
if (bid.hasStorageType()) {
513+
storageType = bid.getStorageType();
514+
}
515+
}
516+
517+
if (containerRequest.hasPutSmallFile()) {
518+
ContainerProtos.DatanodeBlockID bid =
519+
containerRequest.getPutSmallFile()
520+
.getBlock().getBlockData().getBlockID();
521+
if (bid.hasStorageType()) {
522+
storageType = bid.getStorageType();
523+
}
524+
}
525+
526+
if (storageType != null) {
527+
createRequest.setStorageType(storageType);
507528
}
508529

509530
ContainerCommandRequestProto.Builder requestBuilder =

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hdds.client.ECReplicationConfig;
2929
import org.apache.hadoop.hdds.conf.ConfigurationSource;
3030
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
31+
import org.apache.hadoop.hdds.protocol.StorageType;
3132
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
3233
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
3334
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -159,12 +160,23 @@ public void deleteContainerInState(long containerID, DatanodeDetails dn,
159160
public void createRecoveringContainer(long containerID, DatanodeDetails dn,
160161
ECReplicationConfig repConfig, String encodedToken, int replicaIndex)
161162
throws IOException {
163+
createRecoveringContainer(containerID, dn, repConfig, encodedToken,
164+
replicaIndex, null);
165+
}
166+
167+
public void createRecoveringContainer(long containerID, DatanodeDetails dn,
168+
ECReplicationConfig repConfig, String encodedToken, int replicaIndex,
169+
StorageType storageType) throws IOException {
162170
XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(
163171
singleNodePipeline(dn, repConfig));
164172
try {
173+
ContainerProtos.StorageTypeProto storageTypeProto =
174+
storageType != null
175+
? ContainerProtos.StorageTypeProto.valueOf(storageType.name())
176+
: null;
165177
ContainerProtocolCalls
166178
.createRecoveringContainer(xceiverClient, containerID, encodedToken,
167-
replicaIndex);
179+
replicaIndex, storageTypeProto);
168180
} finally {
169181
this.xceiverClientManager.releaseClient(xceiverClient, false);
170182
}

0 commit comments

Comments
 (0)