Skip to content

Commit d6ed40a

Browse files
committed
Merge branch 'master' into strong_password
# Conflicts: # integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java # integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
2 parents 35f7c94 + a0a4700 commit d6ed40a

161 files changed

Lines changed: 3846 additions & 1370 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import org.apache.commons.lang3.SystemUtils;
2525
import org.apache.tsfile.utils.Pair;
2626

27-
import java.io.File;
28-
import java.io.IOException;
27+
import java.io.*;
28+
import java.util.HashMap;
2929
import java.util.List;
30+
import java.util.Map;
3031
import java.util.stream.Collectors;
3132
import java.util.stream.IntStream;
3233

@@ -98,15 +99,71 @@ public static int[] searchAvailablePorts() {
9899
}
99100

100101
private static boolean checkPortsAvailable(final List<Integer> ports) {
101-
final String cmd = getSearchAvailablePortCmd(ports);
102102
try {
103-
return Runtime.getRuntime().exec(cmd).waitFor() == 1;
104-
} catch (final IOException ignore) {
105-
// ignore
106-
} catch (final InterruptedException e) {
107-
Thread.currentThread().interrupt();
103+
return listPortOccupation(ports).isEmpty();
104+
} catch (IOException e) {
105+
IoTDBTestLogger.logger.error("Cannot check available ports", e);
106+
return false;
108107
}
109-
return false;
108+
}
109+
110+
public static Map<Integer, Long> listPortOccupation(final List<Integer> ports)
111+
throws IOException {
112+
return SystemUtils.IS_OS_WINDOWS
113+
? listPortOccupationWindows(ports)
114+
: listPortOccupationUnix(ports);
115+
}
116+
117+
public static Map<Integer, Long> listPortOccupation(
118+
final List<Integer> ports,
119+
String cmd,
120+
int targetColumnLength,
121+
int addressColumnIndex,
122+
int pidColumnIndex)
123+
throws IOException {
124+
Process process = Runtime.getRuntime().exec(cmd);
125+
Map<Integer, Long> result = new HashMap<>();
126+
try (BufferedReader reader =
127+
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
128+
String line;
129+
while ((line = reader.readLine()) != null) {
130+
String[] split = line.trim().split("\\s+");
131+
if (split.length != targetColumnLength) {
132+
continue;
133+
}
134+
String localAddress = split[addressColumnIndex];
135+
for (Integer port : ports) {
136+
if (localAddress.endsWith(":" + port)) {
137+
result.put(port, Long.parseLong(split[pidColumnIndex]));
138+
break;
139+
}
140+
}
141+
}
142+
} catch (EOFException ignored) {
143+
}
144+
return result;
145+
}
146+
147+
/**
148+
* List occupied port and the associated pid on windows.
149+
*
150+
* @param ports ports to be checked
151+
* @return (occupiedPort, pid) pairs
152+
*/
153+
public static Map<Integer, Long> listPortOccupationWindows(final List<Integer> ports)
154+
throws IOException {
155+
return listPortOccupation(ports, "netstat -aon -p tcp", 5, 1, 4);
156+
}
157+
158+
/**
159+
* List occupied port and the associated pid on Unix.
160+
*
161+
* @param ports ports to be checked
162+
* @return (occupiedPort, pid) pairs
163+
*/
164+
public static Map<Integer, Long> listPortOccupationUnix(final List<Integer> ports)
165+
throws IOException {
166+
return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9, 1);
110167
}
111168

112169
private static String getSearchAvailablePortCmd(final List<Integer> ports) {
@@ -115,7 +172,7 @@ private static String getSearchAvailablePortCmd(final List<Integer> ports) {
115172

116173
private static String getWindowsSearchPortCmd(final List<Integer> ports) {
117174
return "netstat -aon -p tcp | findStr "
118-
+ ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" "));
175+
+ ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" "));
119176
}
120177

121178
private static String getUnixSearchPortCmd(final List<Integer> ports) {

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,12 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
526526
return this;
527527
}
528528

529+
@Override
530+
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
531+
setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
532+
return this;
533+
}
534+
529535
@Override
530536
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
531537
setProperty("default_storage_group_level", String.valueOf(defaultStorageGroupLevel));

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,13 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
541541
return this;
542542
}
543543

544+
@Override
545+
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
546+
dnConfig.setSubscriptionEnabled(subscriptionEnabled);
547+
cnConfig.setSubscriptionEnabled(subscriptionEnabled);
548+
return this;
549+
}
550+
544551
@Override
545552
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
546553
dnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);

0 commit comments

Comments
 (0)