Skip to content

Commit a4a28c3

Browse files
committed
Add SCM pipeline filtering
1 parent f304a25 commit a4a28c3

11 files changed

Lines changed: 777 additions & 9 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineRequestInformation.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717

1818
package org.apache.hadoop.hdds.scm;
1919

20+
import org.apache.hadoop.hdds.protocol.StorageType;
21+
2022
/**
2123
* The information of the request of pipeline.
2224
*/
2325
public final class PipelineRequestInformation {
2426
private final long size;
27+
private final StorageType storageType;
2528

2629
/**
2730
* Builder for PipelineRequestInformation.
2831
*/
2932
public static class Builder {
3033
private long size;
34+
private StorageType storageType;
3135

3236
public static Builder getBuilder() {
3337
return new Builder();
@@ -43,16 +47,26 @@ public Builder setSize(long sz) {
4347
return this;
4448
}
4549

50+
public Builder setStorageType(StorageType st) {
51+
this.storageType = st;
52+
return this;
53+
}
54+
4655
public PipelineRequestInformation build() {
47-
return new PipelineRequestInformation(size);
56+
return new PipelineRequestInformation(size, storageType);
4857
}
4958
}
5059

51-
private PipelineRequestInformation(long size) {
60+
private PipelineRequestInformation(long size, StorageType storageType) {
5261
this.size = size;
62+
this.storageType = storageType;
5363
}
5464

5565
public long getSize() {
5666
return size;
5767
}
68+
69+
public StorageType getStorageType() {
70+
return storageType;
71+
}
5872
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public AllocatedBlock allocateBlock(final long size,
165165
}
166166

167167
ContainerInfo containerInfo = writableContainerFactory.getContainer(
168-
size, replicationConfig, owner, excludeList);
168+
size, replicationConfig, owner, excludeList, storageType);
169169

170170
if (containerInfo != null) {
171171
return newBlock(containerInfo);
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.pipeline;
19+
20+
import java.util.HashSet;
21+
import java.util.List;
22+
import java.util.Set;
23+
import java.util.UUID;
24+
import java.util.stream.Collectors;
25+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
26+
import org.apache.hadoop.hdds.protocol.StorageType;
27+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
28+
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
29+
import org.apache.hadoop.hdds.scm.node.NodeManager;
30+
import org.apache.hadoop.hdds.scm.node.NodeStatus;
31+
32+
/**
33+
* Utility for filtering pipelines by StorageType. Builds a set of
34+
* qualifying node UUIDs (nodes that have at least one volume of the
35+
* requested StorageType), then filters pipelines to those whose every
36+
* member is in that set.
37+
*/
38+
final class PipelineStorageTypeFilter {
39+
40+
private PipelineStorageTypeFilter() {
41+
}
42+
43+
static Set<UUID> getNodesWithStorageType(NodeManager nodeManager,
44+
StorageType storageType) {
45+
Set<UUID> result = new HashSet<>();
46+
for (DatanodeDetails dn :
47+
nodeManager.getNodes(NodeStatus.inServiceHealthy())) {
48+
DatanodeInfo info = nodeManager.getDatanodeInfo(dn);
49+
if (info == null) {
50+
continue;
51+
}
52+
for (StorageReportProto report : info.getStorageReports()) {
53+
if (StorageType.valueOf(report.getStorageType()) == storageType) {
54+
result.add(dn.getUuid());
55+
break;
56+
}
57+
}
58+
}
59+
return result;
60+
}
61+
62+
static List<Pipeline> filter(List<Pipeline> pipelines,
63+
NodeManager nodeManager, StorageType storageType) {
64+
if (storageType == null) {
65+
return pipelines;
66+
}
67+
Set<UUID> qualifiedNodes =
68+
getNodesWithStorageType(nodeManager, storageType);
69+
return pipelines.stream()
70+
.filter(p -> p.getNodes().stream()
71+
.allMatch(dn -> qualifiedNodes.contains(dn.getUuid())))
72+
.collect(Collectors.toList());
73+
}
74+
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.hdds.client.ECReplicationConfig;
2626
import org.apache.hadoop.hdds.client.ReplicationConfig;
2727
import org.apache.hadoop.hdds.conf.ConfigurationSource;
28+
import org.apache.hadoop.hdds.protocol.StorageType;
2829
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
2930
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
3031
import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
@@ -45,7 +46,8 @@ public WritableContainerFactory(StorageContainerManager scm) {
4546

4647
this.ratisProvider = new WritableRatisContainerProvider(
4748
scm.getPipelineManager(),
48-
scm.getContainerManager(), scm.getPipelineChoosePolicy());
49+
scm.getContainerManager(), scm.getPipelineChoosePolicy(),
50+
scm.getScmNodeManager());
4951
this.standaloneProvider = ratisProvider;
5052

5153
WritableECContainerProviderConfig ecProviderConfig =
@@ -79,6 +81,25 @@ public ContainerInfo getContainer(final long size,
7981
}
8082
}
8183

84+
public ContainerInfo getContainer(final long size,
85+
ReplicationConfig repConfig, String owner, ExcludeList excludeList,
86+
StorageType storageType) throws IOException {
87+
switch (repConfig.getReplicationType()) {
88+
case STAND_ALONE:
89+
return standaloneProvider.getContainer(size, repConfig, owner,
90+
excludeList, storageType);
91+
case RATIS:
92+
return ratisProvider.getContainer(size, repConfig, owner,
93+
excludeList, storageType);
94+
case EC:
95+
return ecProvider.getContainer(size, (ECReplicationConfig) repConfig,
96+
owner, excludeList, storageType);
97+
default:
98+
throw new IOException(repConfig.getReplicationType()
99+
+ " is an invalid replication type");
100+
}
101+
}
102+
82103
private long getConfiguredContainerSize(ConfigurationSource conf) {
83104
return (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
84105
OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import org.apache.hadoop.hdds.client.ReplicationConfig;
22+
import org.apache.hadoop.hdds.protocol.StorageType;
2223
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
2324
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
2425

@@ -53,4 +54,10 @@ ContainerInfo getContainer(long size, T repConfig,
5354
String owner, ExcludeList excludeList)
5455
throws IOException;
5556

57+
default ContainerInfo getContainer(long size, T repConfig,
58+
String owner, ExcludeList excludeList, StorageType storageType)
59+
throws IOException {
60+
return getContainer(size, repConfig, owner, excludeList);
61+
}
62+
5663
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.hdds.conf.PostConstruct;
3535
import org.apache.hadoop.hdds.conf.ReconfigurableConfig;
3636
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
37+
import org.apache.hadoop.hdds.protocol.StorageType;
3738
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
3839
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
3940
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -93,6 +94,20 @@ public WritableECContainerProvider(WritableECContainerProviderConfig config,
9394
public ContainerInfo getContainer(final long size,
9495
ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
9596
throws IOException {
97+
return getContainerInternal(size, repConfig, owner, excludeList, null);
98+
}
99+
100+
@Override
101+
public ContainerInfo getContainer(final long size,
102+
ECReplicationConfig repConfig, String owner, ExcludeList excludeList,
103+
StorageType storageType) throws IOException {
104+
return getContainerInternal(size, repConfig, owner, excludeList,
105+
storageType);
106+
}
107+
108+
private ContainerInfo getContainerInternal(final long size,
109+
ECReplicationConfig repConfig, String owner, ExcludeList excludeList,
110+
StorageType storageType) throws IOException {
96111
int maximumPipelines = getMaximumPipelines(repConfig);
97112
int openPipelineCount;
98113
synchronized (this) {
@@ -115,12 +130,15 @@ public ContainerInfo getContainer(final long size,
115130
}
116131
List<Pipeline> existingPipelines = pipelineManager.getPipelines(
117132
repConfig, Pipeline.PipelineState.OPEN);
133+
existingPipelines = PipelineStorageTypeFilter.filter(
134+
existingPipelines, nodeManager, storageType);
118135
final int pipelineCount = existingPipelines.size();
119136
LOG.debug("Checking existing pipelines: {}", existingPipelines);
120137

121138
PipelineRequestInformation pri =
122139
PipelineRequestInformation.Builder.getBuilder()
123140
.setSize(size)
141+
.setStorageType(storageType)
124142
.build();
125143
while (!existingPipelines.isEmpty()) {
126144
int pipelineIndex =

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.util.List;
2323
import java.util.stream.Collectors;
2424
import org.apache.hadoop.hdds.client.ReplicationConfig;
25+
import org.apache.hadoop.hdds.protocol.StorageType;
2526
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
2627
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
2728
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
2829
import org.apache.hadoop.hdds.scm.container.ContainerManager;
2930
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
3031
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
32+
import org.apache.hadoop.hdds.scm.node.NodeManager;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -43,20 +45,44 @@ public class WritableRatisContainerProvider
4345
private final PipelineManager pipelineManager;
4446
private final PipelineChoosePolicy pipelineChoosePolicy;
4547
private final ContainerManager containerManager;
48+
private final NodeManager nodeManager;
4649

4750
public WritableRatisContainerProvider(
4851
PipelineManager pipelineManager,
4952
ContainerManager containerManager,
50-
PipelineChoosePolicy pipelineChoosePolicy) {
53+
PipelineChoosePolicy pipelineChoosePolicy,
54+
NodeManager nodeManager) {
5155
this.pipelineManager = pipelineManager;
5256
this.containerManager = containerManager;
5357
this.pipelineChoosePolicy = pipelineChoosePolicy;
58+
this.nodeManager = nodeManager;
59+
}
60+
61+
public WritableRatisContainerProvider(
62+
PipelineManager pipelineManager,
63+
ContainerManager containerManager,
64+
PipelineChoosePolicy pipelineChoosePolicy) {
65+
this(pipelineManager, containerManager, pipelineChoosePolicy, null);
5466
}
5567

5668
@Override
5769
public ContainerInfo getContainer(final long size,
5870
ReplicationConfig repConfig, String owner, ExcludeList excludeList)
5971
throws IOException {
72+
return getContainerInternal(size, repConfig, owner, excludeList, null);
73+
}
74+
75+
@Override
76+
public ContainerInfo getContainer(final long size,
77+
ReplicationConfig repConfig, String owner, ExcludeList excludeList,
78+
StorageType storageType) throws IOException {
79+
return getContainerInternal(size, repConfig, owner, excludeList,
80+
storageType);
81+
}
82+
83+
private ContainerInfo getContainerInternal(final long size,
84+
ReplicationConfig repConfig, String owner, ExcludeList excludeList,
85+
StorageType storageType) throws IOException {
6086
/*
6187
Here is the high level logic.
6288
@@ -77,10 +103,13 @@ public ContainerInfo getContainer(final long size,
77103
//in downstream managers.
78104

79105
PipelineRequestInformation req =
80-
PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
106+
PipelineRequestInformation.Builder.getBuilder()
107+
.setSize(size)
108+
.setStorageType(storageType)
109+
.build();
81110

82111
ContainerInfo containerInfo =
83-
getContainer(repConfig, owner, excludeList, req);
112+
getContainer(repConfig, owner, excludeList, req, storageType);
84113
if (containerInfo != null) {
85114
return containerInfo;
86115
}
@@ -126,7 +155,8 @@ public ContainerInfo getContainer(final long size,
126155

127156
// If Exception occurred or successful creation of pipeline do one
128157
// final try to fetch pipelines.
129-
containerInfo = getContainer(repConfig, owner, excludeList, req);
158+
containerInfo = getContainer(repConfig, owner, excludeList, req,
159+
storageType);
130160
if (containerInfo != null) {
131161
return containerInfo;
132162
}
@@ -143,14 +173,19 @@ public ContainerInfo getContainer(final long size,
143173

144174
@Nullable
145175
private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
146-
ExcludeList excludeList, PipelineRequestInformation req) {
176+
ExcludeList excludeList, PipelineRequestInformation req,
177+
StorageType storageType) {
147178
// Acquire pipeline manager lock, to avoid any updates to pipeline
148179
// while allocate container happens. This is to avoid scenario like
149180
// mentioned in HDDS-5655.
150181
pipelineManager.acquireReadLock();
151182
try {
152183
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
153184
excludeList, Pipeline.PipelineState.OPEN);
185+
if (nodeManager != null) {
186+
availablePipelines = PipelineStorageTypeFilter.filter(
187+
availablePipelines, nodeManager, storageType);
188+
}
154189
return selectContainer(availablePipelines, req, owner, excludeList);
155190
} finally {
156191
pipelineManager.releaseReadLock();

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,36 @@ public void testBlockAllocationWithNoAvailablePipelines()
575575
new ExcludeList(), StorageType.DEFAULT));
576576
}
577577

578+
/**
579+
* Integration test: verifies the full BlockManagerImpl → WritableContainerFactory
580+
* → WritableRatisContainerProvider → PipelineStorageTypeFilter chain.
581+
* MockNodeManager reports only DISK volumes. Requesting SSD should fail
582+
* because the filter removes all existing pipelines, and newly created
583+
* pipelines also contain DISK-only nodes so they too get filtered on
584+
* the second attempt.
585+
*/
586+
@Test
587+
public void testAllocateBlockWithNonMatchingStorageTypeFails()
588+
throws Exception {
589+
pipelineManager.createPipeline(replicationConfig);
590+
HddsTestUtils.openAllRatisPipelines(pipelineManager);
591+
592+
// Verify DISK allocation works (baseline — all nodes report DISK)
593+
AllocatedBlock diskBlock = blockManager.allocateBlock(
594+
DEFAULT_BLOCK_SIZE, replicationConfig, OzoneConsts.OZONE,
595+
new ExcludeList(), StorageType.DISK);
596+
assertNotNull(diskBlock);
597+
598+
// SSD allocation should fail: MockNodeManager nodes only have DISK
599+
// volumes, so the PipelineStorageTypeFilter removes all pipelines,
600+
// pipeline creation adds another DISK-only pipeline which also gets
601+
// filtered, resulting in IOException.
602+
assertThrows(IOException.class,
603+
() -> blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
604+
replicationConfig, OzoneConsts.OZONE,
605+
new ExcludeList(), StorageType.SSD));
606+
}
607+
578608
private class DatanodeCommandHandler implements
579609
EventHandler<CommandForDatanode> {
580610

0 commit comments

Comments
 (0)