Skip to content

Commit f304a25

Browse files
committed
Storage tiering. Pass storage type to SCM
1 parent 6512c95 commit f304a25

12 files changed

Lines changed: 154 additions & 28 deletions

File tree

hadoop-ozone/common/src/main/java/org/apache/hadoop/hdds/protocol/StorageType.java renamed to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/StorageType.java

File renamed without changes.

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeoutException;
2424
import org.apache.hadoop.hdds.client.ReplicationConfig;
2525
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
26+
import org.apache.hadoop.hdds.protocol.StorageType;
2627
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
2728
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
2829
import org.apache.hadoop.hdds.scm.AddSCMRequest;
@@ -87,7 +88,7 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
8788
ReplicationConfig replicationConfig, String owner,
8889
ExcludeList excludeList) throws IOException {
8990
return allocateBlock(size, numBlocks, replicationConfig, owner,
90-
excludeList, null);
91+
excludeList, null, StorageType.DEFAULT);
9192
}
9293

9394
/**
@@ -107,9 +108,17 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
107108
* @return allocated block accessing info (key, pipeline).
108109
* @throws IOException
109110
*/
111+
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
112+
ReplicationConfig replicationConfig, String owner,
113+
ExcludeList excludeList, String clientMachine) throws IOException {
114+
return allocateBlock(size, numBlocks, replicationConfig, owner,
115+
excludeList, clientMachine, StorageType.DEFAULT);
116+
}
117+
110118
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
111119
ReplicationConfig replicationConfig, String owner,
112-
ExcludeList excludeList, String clientMachine) throws IOException;
120+
ExcludeList excludeList, String clientMachine,
121+
StorageType storageType) throws IOException;
113122

114123
/**
115124
* Delete blocks for a set of object keys.

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
3939
import org.apache.hadoop.hdds.conf.StorageUnit;
4040
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
41+
import org.apache.hadoop.hdds.protocol.StorageType;
4142
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
4243
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
4344
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
@@ -173,7 +174,8 @@ public List<AllocatedBlock> allocateBlock(
173174
long size, int num,
174175
ReplicationConfig replicationConfig,
175176
String owner, ExcludeList excludeList,
176-
String clientMachine
177+
String clientMachine,
178+
StorageType storageType
177179
) throws IOException {
178180
Preconditions.checkArgument(size > 0, "block size must be greater than 0");
179181

@@ -189,6 +191,10 @@ public List<AllocatedBlock> allocateBlock(
189191
requestBuilder.setClient(clientMachine);
190192
}
191193

194+
if (storageType != null) {
195+
requestBuilder.setStorageType(storageType.toProto());
196+
}
197+
192198
switch (replicationConfig.getReplicationType()) {
193199
case STAND_ALONE:
194200
requestBuilder.setFactor(

hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ message AllocateScmBlockRequestProto {
161161

162162
optional string client = 9;
163163

164+
optional hadoop.hdds.StorageTypeProto storageType = 10;
164165
}
165166

166167
/**

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.TimeoutException;
2424
import org.apache.hadoop.hdds.client.ReplicationConfig;
25+
import org.apache.hadoop.hdds.protocol.StorageType;
2526
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
2627
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
2728
import org.apache.hadoop.ozone.common.BlockGroup;
@@ -42,8 +43,8 @@ public interface BlockManager extends Closeable {
4243
* @throws IOException
4344
*/
4445
AllocatedBlock allocateBlock(long size, ReplicationConfig replicationConfig,
45-
String owner,
46-
ExcludeList excludeList) throws IOException, TimeoutException;
46+
String owner, ExcludeList excludeList,
47+
StorageType storageType) throws IOException, TimeoutException;
4748

4849
/**
4950
* Deletes a list of blocks in an atomic operation. Internally, SCM

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.hdds.client.ContainerBlockID;
3232
import org.apache.hadoop.hdds.client.ReplicationConfig;
3333
import org.apache.hadoop.hdds.conf.ConfigurationSource;
34+
import org.apache.hadoop.hdds.protocol.StorageType;
3435
import org.apache.hadoop.hdds.conf.StorageUnit;
3536
import org.apache.hadoop.hdds.scm.ScmConfig;
3637
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -145,11 +146,14 @@ public void stop() throws IOException {
145146
@Override
146147
public AllocatedBlock allocateBlock(final long size,
147148
ReplicationConfig replicationConfig,
148-
String owner, ExcludeList excludeList)
149+
String owner, ExcludeList excludeList,
150+
StorageType storageType)
149151
throws IOException {
150152
if (LOG.isTraceEnabled()) {
151153
LOG.trace("Size : {} , replicationConfig: {}", size, replicationConfig);
152154
}
155+
LOG.debug("Allocating block: size={}, replication={}, storageType={}",
156+
size, replicationConfig, storageType);
153157
if (scm.getScmContext().isInSafeMode()) {
154158
throw new SCMException("SafeModePrecheck failed for allocateBlock",
155159
SCMException.ResultCodes.SAFE_MODE_EXCEPTION);

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.stream.Collectors;
2525
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
2626
import org.apache.hadoop.hdds.client.ReplicationConfig;
27+
import org.apache.hadoop.hdds.protocol.StorageType;
2728
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2829
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
2930
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -193,6 +194,10 @@ private Status exceptionToResponseStatus(IOException ex) {
193194
public AllocateScmBlockResponseProto allocateScmBlock(
194195
AllocateScmBlockRequestProto request, int clientVersion)
195196
throws IOException {
197+
StorageType storageType = request.hasStorageType()
198+
? StorageType.valueOf(request.getStorageType())
199+
: StorageType.DEFAULT;
200+
196201
List<AllocatedBlock> allocatedBlocks =
197202
impl.allocateBlock(request.getSize(),
198203
request.getNumBlocks(),
@@ -202,7 +207,8 @@ public AllocateScmBlockResponseProto allocateScmBlock(
202207
request.getEcReplicationConfig()),
203208
request.getOwner(),
204209
ExcludeList.getFromProtoBuf(request.getExcludeList()),
205-
request.getClient());
210+
request.getClient(),
211+
storageType);
206212

207213
AllocateScmBlockResponseProto.Builder builder =
208214
AllocateScmBlockResponseProto.newBuilder();

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.hadoop.hdds.client.ReplicationConfig;
4848
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
4949
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
50+
import org.apache.hadoop.hdds.protocol.StorageType;
5051
import org.apache.hadoop.hdds.protocol.DatanodeID;
5152
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
5253
import org.apache.hadoop.hdds.scm.AddSCMRequest;
@@ -189,7 +190,8 @@ public List<AllocatedBlock> allocateBlock(
189190
long size, int num,
190191
ReplicationConfig replicationConfig,
191192
String owner, ExcludeList excludeList,
192-
String clientMachine
193+
String clientMachine,
194+
StorageType storageType
193195
) throws IOException {
194196
long startNanos = Time.monotonicNowNanos();
195197
Map<String, String> auditMap = Maps.newHashMap();
@@ -198,6 +200,7 @@ public List<AllocatedBlock> allocateBlock(
198200
auditMap.put("replication", replicationConfig.toString());
199201
auditMap.put("owner", owner);
200202
auditMap.put("client", clientMachine);
203+
auditMap.put("storageType", String.valueOf(storageType));
201204
List<AllocatedBlock> blocks = new ArrayList<>(num);
202205

203206
if (LOG.isDebugEnabled()) {
@@ -207,7 +210,8 @@ public List<AllocatedBlock> allocateBlock(
207210
try {
208211
for (int i = 0; i < num; i++) {
209212
AllocatedBlock block = scm.getScmBlockManager()
210-
.allocateBlock(size, replicationConfig, owner, excludeList);
213+
.allocateBlock(size, replicationConfig, owner, excludeList,
214+
storageType);
211215
if (block != null) {
212216
// Sort the datanodes if client machine is specified
213217
final Node client = getClientNode(clientMachine);

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.hdds.HddsConfigKeys;
4545
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
4646
import org.apache.hadoop.hdds.client.ReplicationConfig;
47+
import org.apache.hadoop.hdds.protocol.StorageType;
4748
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
4849
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
4950
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -197,7 +198,8 @@ public void testAllocateBlock() throws Exception {
197198
pipelineManager.createPipeline(replicationConfig);
198199
HddsTestUtils.openAllRatisPipelines(pipelineManager);
199200
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
200-
replicationConfig, OzoneConsts.OZONE, new ExcludeList());
201+
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
202+
StorageType.DEFAULT);
201203
assertNotNull(block);
202204
}
203205

@@ -216,7 +218,7 @@ public void testAllocateBlockWithExclusion() throws Exception {
216218
.get(0).getId());
217219
AllocatedBlock block = blockManager
218220
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
219-
excludeList);
221+
excludeList, StorageType.DEFAULT);
220222
assertNotNull(block);
221223
for (PipelineID id : excludeList.getPipelineIds()) {
222224
assertNotEquals(block.getPipeline().getId(), id);
@@ -227,7 +229,7 @@ public void testAllocateBlockWithExclusion() throws Exception {
227229
}
228230
block = blockManager
229231
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
230-
excludeList);
232+
excludeList, StorageType.DEFAULT);
231233
assertNotNull(block);
232234
assertThat(excludeList.getPipelineIds()).contains(block.getPipeline().getId());
233235
}
@@ -249,7 +251,7 @@ void testAllocateBlockInParallel() throws Exception {
249251
future.complete(blockManager
250252
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
251253
OzoneConsts.OZONE,
252-
new ExcludeList()));
254+
new ExcludeList(), StorageType.DEFAULT));
253255
} catch (IOException e) {
254256
future.completeExceptionally(e);
255257
}
@@ -287,7 +289,7 @@ void testBlockDistribution() throws Exception {
287289
AllocatedBlock block = blockManager
288290
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
289291
OzoneConsts.OZONE,
290-
new ExcludeList());
292+
new ExcludeList(), StorageType.DEFAULT);
291293
long containerId = block.getBlockID().getContainerID();
292294
if (!allocatedBlockMap.containsKey(containerId)) {
293295
blockList = new ArrayList<>();
@@ -343,7 +345,7 @@ void testBlockDistributionWithMultipleDisks() throws Exception {
343345
AllocatedBlock block = blockManager
344346
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
345347
OzoneConsts.OZONE,
346-
new ExcludeList());
348+
new ExcludeList(), StorageType.DEFAULT);
347349
long containerId = block.getBlockID().getContainerID();
348350
if (!allocatedBlockMap.containsKey(containerId)) {
349351
blockList = new ArrayList<>();
@@ -403,7 +405,7 @@ void testBlockDistributionWithMultipleRaftLogDisks() throws Exception {
403405
AllocatedBlock block = blockManager
404406
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
405407
OzoneConsts.OZONE,
406-
new ExcludeList());
408+
new ExcludeList(), StorageType.DEFAULT);
407409
long containerId = block.getBlockID().getContainerID();
408410
if (!allocatedBlockMap.containsKey(containerId)) {
409411
blockList = new ArrayList<>();
@@ -439,7 +441,8 @@ public void testAllocateOversizedBlock() {
439441
long size = 6 * GB;
440442
Throwable t = assertThrows(IOException.class, () ->
441443
blockManager.allocateBlock(size,
442-
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
444+
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
445+
StorageType.DEFAULT));
443446
assertEquals("Unsupported block size: " + size,
444447
t.getMessage());
445448
}
@@ -450,7 +453,8 @@ public void testAllocateBlockFailureInSafeMode() {
450453
// Test1: In safe mode expect an SCMException.
451454
Throwable t = assertThrows(IOException.class, () ->
452455
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
453-
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
456+
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
457+
StorageType.DEFAULT));
454458
assertEquals("SafeModePrecheck failed for allocateBlock",
455459
t.getMessage());
456460
}
@@ -459,7 +463,8 @@ public void testAllocateBlockFailureInSafeMode() {
459463
public void testAllocateBlockSucInSafeMode() throws Exception {
460464
// Test2: Exit safe mode and then try allocateBock again.
461465
assertNotNull(blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
462-
replicationConfig, OzoneConsts.OZONE, new ExcludeList()));
466+
replicationConfig, OzoneConsts.OZONE, new ExcludeList(),
467+
StorageType.DEFAULT));
463468
}
464469

465470
@Test
@@ -472,14 +477,14 @@ public void testMultipleBlockAllocation()
472477

473478
AllocatedBlock allocatedBlock = blockManager
474479
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
475-
new ExcludeList());
480+
new ExcludeList(), StorageType.DEFAULT);
476481
// block should be allocated in different pipelines
477482
GenericTestUtils.waitFor(() -> {
478483
try {
479484
AllocatedBlock block = blockManager
480485
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
481486
OzoneConsts.OZONE,
482-
new ExcludeList());
487+
new ExcludeList(), StorageType.DEFAULT);
483488
return !block.getPipeline().getId()
484489
.equals(allocatedBlock.getPipeline().getId());
485490
} catch (IOException e) {
@@ -525,7 +530,7 @@ public void testMultipleBlockAllocationWithClosedContainer()
525530
blockManager
526531
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
527532
OzoneConsts.OZONE,
528-
new ExcludeList());
533+
new ExcludeList(), StorageType.DEFAULT);
529534
} catch (IOException e) {
530535
}
531536
return verifyNumberOfContainersInPipelines(
@@ -550,7 +555,7 @@ public void testMultipleBlockAllocationWithClosedContainer()
550555
blockManager
551556
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig,
552557
OzoneConsts.OZONE,
553-
new ExcludeList());
558+
new ExcludeList(), StorageType.DEFAULT);
554559
} catch (IOException e) {
555560
}
556561
return verifyNumberOfContainersInPipelines(
@@ -567,7 +572,7 @@ public void testBlockAllocationWithNoAvailablePipelines()
567572
assertEquals(0, pipelineManager.getPipelines(replicationConfig).size());
568573
assertNotNull(blockManager
569574
.allocateBlock(DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
570-
new ExcludeList()));
575+
new ExcludeList(), StorageType.DEFAULT));
571576
}
572577

573578
private class DatanodeCommandHandler implements

0 commit comments

Comments
 (0)