Skip to content

Commit 3bdec91

Browse files
committed
Merge remote-tracking branch 'origin/master' into HDDS-14209
2 parents 2cb8544 + e714615 commit 3bdec91

99 files changed

Lines changed: 3319 additions & 2385 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ public class OzoneClientConfig {
8484
tags = ConfigTag.CLIENT)
8585
private boolean datastreamPipelineMode = true;
8686

87+
@Config(key = "ozone.client.datastream.sync.size",
88+
defaultValue = "0B",
89+
type = ConfigType.SIZE,
90+
description = "The minimum size of written data before forcing the datanodes " +
91+
"in the pipeline to flush the pending data to underlying storage." +
92+
" If set to zero or negative, the client will not force the datanodes to flush.",
93+
tags = ConfigTag.CLIENT)
94+
private int dataStreamSyncSize = 0;
95+
8796
@Config(key = "ozone.client.stream.buffer.increment",
8897
defaultValue = "0B",
8998
type = ConfigType.SIZE,
@@ -570,6 +579,10 @@ public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
570579
this.datastreamPipelineMode = datastreamPipelineMode;
571580
}
572581

582+
public int getDataStreamSyncSize() {
583+
return dataStreamSyncSize;
584+
}
585+
573586
public void setHBaseEnhancementsAllowed(boolean isHBaseEnhancementsEnabled) {
574587
this.hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled;
575588
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
131131
private final DataStreamOutput out;
132132
private CompletableFuture<DataStreamReply> dataStreamCloseReply;
133133
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
134-
private static final long SYNC_SIZE = 0; // TODO: disk sync is disabled for now
134+
private final long syncSize;
135135
private long syncPosition = 0;
136136
private StreamBuffer currentBuffer;
137137
private XceiverClientMetrics metrics;
@@ -157,6 +157,7 @@ public BlockDataStreamOutput(
157157
this.xceiverClientFactory = xceiverClientManager;
158158
this.config = config;
159159
this.isDatastreamPipelineMode = config.isDatastreamPipelineMode();
160+
this.syncSize = config.getDataStreamSyncSize();
160161
this.blockID = new AtomicReference<>(blockID);
161162
KeyValue keyValue =
162163
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -647,9 +648,8 @@ public boolean isClosed() {
647648
}
648649

649650
private boolean needSync(long position) {
650-
if (SYNC_SIZE > 0) {
651-
// TODO: or position >= fileLength
652-
if (position - syncPosition >= SYNC_SIZE) {
651+
if (syncSize > 0) {
652+
if (position - syncPosition >= syncSize) {
653653
syncPosition = position;
654654
return true;
655655
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,23 @@ private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) {
110110
return true;
111111
}
112112

113+
/**
114+
* Create a new BlockInputStream for RATIS.
115+
*
116+
* @param blockInfo The blockInfo representing the block.
117+
* @param pipeline The pipeline to be used for reading the block
118+
* @param token The block Access Token
119+
* @param xceiverFactory Factory to create the xceiver in the client
120+
* @param refreshFunction Function to refresh the block location if needed
121+
* @param config The client configuration
122+
* @return BlockInputStream instance.
123+
*/
124+
public BlockInputStream createBlockInputStream(BlockLocationInfo blockInfo,
125+
Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
126+
XceiverClientFactory xceiverFactory,
127+
Function<BlockID, BlockLocationInfo> refreshFunction,
128+
OzoneClientConfig config) throws IOException {
129+
130+
return new BlockInputStream(blockInfo, pipeline, token, xceiverFactory, refreshFunction, config);
131+
}
113132
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,9 @@
2525
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_INTERFACE_KEY;
2626
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_NAMESERVER_KEY;
2727
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_HOST_NAME_KEY;
28-
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
29-
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
30-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
3128
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
3229
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
3330
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
34-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
35-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
36-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
3731
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
3832
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
3933
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
@@ -58,12 +52,9 @@
5852
import java.util.Objects;
5953
import java.util.Optional;
6054
import java.util.OptionalInt;
61-
import java.util.Set;
6255
import java.util.TreeMap;
6356
import java.util.UUID;
6457
import javax.management.ObjectName;
65-
import org.apache.commons.lang3.StringUtils;
66-
import org.apache.commons.lang3.tuple.Pair;
6758
import org.apache.hadoop.conf.ConfigRedactor;
6859
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
6960
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -280,113 +271,6 @@ public static OptionalInt getPortNumberFromConfigKeys(
280271
return OptionalInt.empty();
281272
}
282273

283-
/**
284-
* Retrieve the socket addresses of all storage container managers.
285-
*
286-
* @return A collection of SCM addresses
287-
* @throws IllegalArgumentException If the configuration is invalid
288-
*/
289-
public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
290-
ConfigurationSource conf) {
291-
292-
// First check HA style config, if not defined fall back to OZONE_SCM_NAMES
293-
294-
if (getScmServiceId(conf) != null) {
295-
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
296-
Collection<InetSocketAddress> scmAddressList =
297-
new HashSet<>(scmNodeInfoList.size());
298-
for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
299-
scmAddressList.add(
300-
NetUtils.createSocketAddr(scmNodeInfo.getScmDatanodeAddress()));
301-
}
302-
return scmAddressList;
303-
} else {
304-
// fall back to OZONE_SCM_NAMES.
305-
Collection<String> names =
306-
conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
307-
if (names.isEmpty()) {
308-
throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
309-
+ " need to be a set of valid DNS names or IP addresses."
310-
+ " Empty address list found.");
311-
}
312-
313-
Collection<InetSocketAddress> addresses = new HashSet<>(names.size());
314-
for (String address : names) {
315-
Optional<String> hostname = getHostName(address);
316-
if (!hostname.isPresent()) {
317-
throw new IllegalArgumentException("Invalid hostname for SCM: "
318-
+ address);
319-
}
320-
int port = getHostPort(address)
321-
.orElse(conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
322-
OZONE_SCM_DATANODE_PORT_DEFAULT));
323-
InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
324-
port);
325-
addresses.add(addr);
326-
}
327-
328-
if (addresses.size() > 1) {
329-
LOG.warn("When SCM HA is configured, configure {} appended with " +
330-
"serviceId and nodeId. {} is deprecated.", OZONE_SCM_ADDRESS_KEY,
331-
OZONE_SCM_NAMES);
332-
}
333-
return addresses;
334-
}
335-
}
336-
337-
/**
338-
* Returns the SCM address for datanodes based on the service ID and the SCM addresses.
339-
* @param conf Configuration
340-
* @param scmServiceId SCM service ID
341-
* @param scmNodeIds Requested SCM node IDs
342-
* @return A collection with addresses of the request SCM node IDs.
343-
* Null if there is any wrongly configured SCM address. Note that the returned collection
344-
* might not be ordered the same way as the requested SCM node IDs
345-
*/
346-
public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
347-
ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
348-
Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
349-
for (String scmNodeId : scmNodeIds) {
350-
String addressKey = ConfUtils.addKeySuffixes(
351-
OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
352-
String scmAddress = conf.get(addressKey);
353-
if (scmAddress == null) {
354-
LOG.warn("The SCM address configuration {} is not defined, return nothing", addressKey);
355-
return null;
356-
}
357-
358-
int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
359-
OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
360-
OZONE_SCM_DATANODE_PORT_DEFAULT);
361-
362-
String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
363-
InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
364-
scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
365-
}
366-
return scmNodeAddress;
367-
}
368-
369-
/**
370-
* Retrieve the socket addresses of recon.
371-
*
372-
* @return Recon address
373-
* @throws IllegalArgumentException If the configuration is invalid
374-
*/
375-
public static InetSocketAddress getReconAddresses(
376-
ConfigurationSource conf) {
377-
String name = conf.get(OZONE_RECON_ADDRESS_KEY);
378-
if (StringUtils.isEmpty(name)) {
379-
return null;
380-
}
381-
Optional<String> hostname = getHostName(name);
382-
if (!hostname.isPresent()) {
383-
throw new IllegalArgumentException("Invalid hostname for Recon: "
384-
+ name);
385-
}
386-
int port = getHostPort(name).orElse(OZONE_RECON_DATANODE_PORT_DEFAULT);
387-
return NetUtils.createSocketAddr(hostname.get(), port);
388-
}
389-
390274
/**
391275
* Returns the hostname for this datanode. If the hostname is not
392276
* explicitly configured in the given config, then it is determined

hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java

Lines changed: 0 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,14 @@
1717

1818
package org.apache.hadoop.hdds;
1919

20-
import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
2120
import static org.apache.hadoop.hdds.HddsUtils.processForLogging;
22-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
23-
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
2421
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
2522
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
26-
import static org.assertj.core.api.Assertions.assertThat;
2723
import static org.junit.jupiter.api.Assertions.assertEquals;
28-
import static org.junit.jupiter.api.Assertions.assertNotNull;
2924
import static org.junit.jupiter.api.Assertions.assertThrows;
30-
import static org.junit.jupiter.api.Assertions.assertTrue;
3125

32-
import java.net.InetSocketAddress;
3326
import java.nio.file.Paths;
34-
import java.util.ArrayList;
3527
import java.util.Arrays;
36-
import java.util.Collection;
37-
import java.util.HashMap;
38-
import java.util.Iterator;
3928
import java.util.List;
4029
import java.util.Map;
4130
import java.util.Optional;
@@ -103,128 +92,6 @@ void validatePathRejectsInvalidPath(String path, String ancestor) {
10392
() -> HddsUtils.validatePath(Paths.get(path), Paths.get(ancestor)));
10493
}
10594

106-
@Test
107-
void testGetSCMAddresses() {
108-
final OzoneConfiguration conf = new OzoneConfiguration();
109-
Collection<InetSocketAddress> addresses;
110-
InetSocketAddress addr;
111-
Iterator<InetSocketAddress> it;
112-
113-
// Verify valid IP address setup
114-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
115-
addresses = getSCMAddressForDatanodes(conf);
116-
assertEquals(1, addresses.size());
117-
addr = addresses.iterator().next();
118-
assertEquals("1.2.3.4", addr.getHostName());
119-
assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
120-
121-
// Verify valid hostname setup
122-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
123-
addresses = getSCMAddressForDatanodes(conf);
124-
assertEquals(1, addresses.size());
125-
addr = addresses.iterator().next();
126-
assertEquals("scm1", addr.getHostName());
127-
assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
128-
129-
// Verify valid hostname and port
130-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
131-
addresses = getSCMAddressForDatanodes(conf);
132-
assertEquals(1, addresses.size());
133-
addr = addresses.iterator().next();
134-
assertEquals("scm1", addr.getHostName());
135-
assertEquals(1234, addr.getPort());
136-
137-
final Map<String, Integer> hostsAndPorts = new HashMap<>();
138-
hostsAndPorts.put("scm1", 1234);
139-
hostsAndPorts.put("scm2", 2345);
140-
hostsAndPorts.put("scm3", 3456);
141-
142-
// Verify multiple hosts and port
143-
conf.setStrings(
144-
ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
145-
addresses = getSCMAddressForDatanodes(conf);
146-
assertEquals(3, addresses.size());
147-
it = addresses.iterator();
148-
HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
149-
while (it.hasNext()) {
150-
InetSocketAddress current = it.next();
151-
assertTrue(expected1.remove(current.getHostName(),
152-
current.getPort()));
153-
}
154-
assertThat(expected1).isEmpty();
155-
156-
// Verify names with spaces
157-
conf.setStrings(
158-
ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
159-
addresses = getSCMAddressForDatanodes(conf);
160-
assertEquals(3, addresses.size());
161-
it = addresses.iterator();
162-
HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
163-
while (it.hasNext()) {
164-
InetSocketAddress current = it.next();
165-
assertTrue(expected2.remove(current.getHostName(),
166-
current.getPort()));
167-
}
168-
assertThat(expected2).isEmpty();
169-
170-
// Verify empty value
171-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
172-
assertThrows(IllegalArgumentException.class,
173-
() -> getSCMAddressForDatanodes(conf),
174-
"Empty value should cause an IllegalArgumentException");
175-
176-
// Verify invalid hostname
177-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
178-
assertThrows(IllegalArgumentException.class,
179-
() -> getSCMAddressForDatanodes(conf),
180-
"An invalid hostname should cause an IllegalArgumentException");
181-
182-
// Verify invalid port
183-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
184-
assertThrows(IllegalArgumentException.class,
185-
() -> getSCMAddressForDatanodes(conf),
186-
"An invalid port should cause an IllegalArgumentException");
187-
188-
// Verify a mixed case (valid and invalid value both appears)
189-
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
190-
assertThrows(IllegalArgumentException.class,
191-
() -> getSCMAddressForDatanodes(conf),
192-
"An invalid value should cause an IllegalArgumentException");
193-
}
194-
195-
@Test
196-
void testGetSCMAddressesWithHAConfig() {
197-
OzoneConfiguration conf = new OzoneConfiguration();
198-
String scmServiceId = "scmserviceId";
199-
String[] nodes = new String[]{"scm1", "scm2", "scm3"};
200-
conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
201-
conf.set(ScmConfigKeys.OZONE_SCM_NODES_KEY + "." + scmServiceId,
202-
"scm1,scm2,scm3");
203-
204-
int port = 9880;
205-
List<String> expected = new ArrayList<>();
206-
for (String nodeId : nodes) {
207-
conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_ADDRESS_KEY,
208-
scmServiceId, nodeId), "scm");
209-
conf.setInt(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
210-
scmServiceId, nodeId), ++port);
211-
expected.add("scm" + ":" + port);
212-
}
213-
214-
Collection<InetSocketAddress> scmAddressList =
215-
HddsUtils.getSCMAddressForDatanodes(conf);
216-
217-
assertNotNull(scmAddressList);
218-
assertEquals(3, scmAddressList.size());
219-
220-
for (InetSocketAddress next : scmAddressList) {
221-
expected.remove(next.getHostName() + ":" + next.getPort());
222-
}
223-
224-
assertEquals(0, expected.size());
225-
226-
}
227-
22895
@Test
22996
void testGetNumberFromConfigKeys() {
23097
final String testnum1 = "8";

0 commit comments

Comments
 (0)