Skip to content

Commit 25acfd9

Browse files
Search concurrent test in 512 MB heap size
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 12ac2a8 commit 25acfd9

2 files changed

Lines changed: 89 additions & 8 deletions

File tree

client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,26 @@
1111
import org.apache.http.client.methods.HttpPost;
1212
import org.apache.http.client.methods.HttpPut;
1313
import org.opensearch.OpenSearchStatusException;
14-
import org.opensearch.action.search.CreatePitRequest;
15-
import org.opensearch.action.search.CreatePitResponse;
16-
import org.opensearch.action.search.DeletePitInfo;
17-
import org.opensearch.action.search.DeletePitRequest;
18-
import org.opensearch.action.search.DeletePitResponse;
19-
import org.opensearch.action.search.GetAllPitNodesResponse;
14+
import org.opensearch.action.search.*;
2015
import org.opensearch.common.unit.TimeValue;
2116
import org.opensearch.core.action.ActionListener;
2217
import org.junit.Before;
18+
import org.opensearch.monitor.jvm.JvmInfo;
2319

2420
import java.io.IOException;
21+
import java.lang.management.ManagementFactory;
22+
import java.lang.management.MemoryMXBean;
2523
import java.util.ArrayList;
24+
import java.util.LinkedList;
2625
import java.util.List;
2726
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.CyclicBarrier;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
2930
import java.util.stream.Collectors;
3031

32+
import static org.hamcrest.Matchers.containsString;
33+
3134
/**
3235
* Tests point in time API with rest high level client
3336
*/
@@ -72,6 +75,84 @@ public void testCreateAndDeletePit() throws IOException {
7275
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
7376
}
7477

78+
public void testMaxRunningSearches() throws Exception {
79+
MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
80+
logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024);
81+
logger.info("heap size of env[{}]", JvmInfo.jvmInfo().getMem().getHeapMax());
82+
try {
83+
int numThreads = 50;
84+
List<Thread> threadsList = new LinkedList<>();
85+
logger.info(threadsList.size());
86+
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
87+
for (int i = 0; i < numThreads; i++) {
88+
threadsList.add(new Thread(() -> {
89+
try {
90+
SearchRequest validRequest = new SearchRequest();
91+
validRequest.indices("index");
92+
logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
93+
//client
94+
95+
SearchResponse searchResponse = execute(validRequest, highLevelClient()::search, highLevelClient()::searchAsync);
96+
assertNotNull(searchResponse);
97+
logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
98+
} catch (IOException e) {
99+
fail("submit request failed");
100+
} finally {
101+
try {
102+
103+
barrier.await();
104+
} catch (Exception e) {
105+
fail();
106+
}
107+
}
108+
}
109+
));
110+
}
111+
threadsList.forEach(Thread::start);
112+
barrier.await();
113+
for (Thread thread : threadsList) {
114+
logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
115+
thread.join();
116+
}
117+
118+
119+
//updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0);
120+
threadsList.clear();
121+
AtomicInteger numFailures = new AtomicInteger();
122+
for (int i = 0; i < numThreads; i++) {
123+
threadsList.add(new Thread(() -> {
124+
try {
125+
SearchRequest validRequest = new SearchRequest();
126+
//validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1));
127+
SearchResponse searchResponse = execute(validRequest, highLevelClient()::search, highLevelClient()::searchAsync);
128+
} catch (Exception e) {
129+
assertTrue(e instanceof ResponseException);
130+
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
131+
numFailures.getAndIncrement();
132+
133+
} finally {
134+
try {
135+
numFailures.getAndIncrement();
136+
barrier.await();
137+
} catch (Exception e) {
138+
fail();
139+
}
140+
}
141+
}
142+
));
143+
}
144+
threadsList.forEach(Thread::start);
145+
barrier.await();
146+
for (Thread thread : threadsList) {
147+
thread.join();
148+
}
149+
assertEquals(numFailures.get(), 50);
150+
} catch (Exception e) {
151+
logger.info("========== EXCEPTION : " + e.getMessage());
152+
logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
153+
}
154+
}
155+
75156
public void testDeleteAllAndListAllPits() throws IOException, InterruptedException {
76157
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
77158
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
org.gradle.caching=true
1414
org.gradle.warning.mode=none
1515
org.gradle.parallel=true
16-
org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m \
16+
org.gradle.jvmargs=-Xmx512M -XX:+HeapDumpOnOutOfMemoryError -Xss2m \
1717
--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
1818
--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
1919
--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
2020
--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
2121
--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
22-
options.forkOptions.memoryMaximumSize=3g
22+
options.forkOptions.memoryMaximumSize=512M
2323

2424
# Disable Gradle Enterprise Gradle plugin's test retry
2525
systemProp.gradle.enterprise.testretry.enabled=false

0 commit comments

Comments
 (0)