Skip to content

Commit 83cabd6

Browse files
committed
Fix: recoverable 'I/O reactor has been shut down' on OpenSearch HC5 transport (#27698)
* Fix I/O reactor has been shutdown * Add Safe Consumer Class * Fix review comments (cherry picked from commit 471c55c)
1 parent 1b72b1f commit 83cabd6

5 files changed

Lines changed: 491 additions & 10 deletions

File tree

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
package org.openmetadata.it.tests;
2+
3+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
6+
import static org.junit.jupiter.api.Assertions.assertNotNull;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
import static org.junit.jupiter.api.Assertions.fail;
9+
import static org.junit.jupiter.api.Assumptions.abort;
10+
11+
import java.lang.reflect.Field;
12+
import java.nio.ByteBuffer;
13+
import java.time.Duration;
14+
import java.util.List;
15+
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.TimeoutException;
18+
import lombok.extern.slf4j.Slf4j;
19+
import org.apache.hc.core5.concurrent.FutureCallback;
20+
import org.apache.hc.core5.http.ClassicHttpResponse;
21+
import org.apache.hc.core5.http.EntityDetails;
22+
import org.apache.hc.core5.http.Header;
23+
import org.apache.hc.core5.http.HttpHost;
24+
import org.apache.hc.core5.http.HttpResponse;
25+
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
26+
import org.apache.hc.core5.http.nio.CapacityChannel;
27+
import org.apache.hc.core5.http.protocol.HttpContext;
28+
import org.apache.hc.core5.reactor.IOReactorConfig;
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.TestInstance;
33+
import org.junit.jupiter.api.parallel.Execution;
34+
import org.junit.jupiter.api.parallel.ExecutionMode;
35+
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
36+
import org.openmetadata.service.search.opensearch.SafeResponseConsumer;
37+
import org.openmetadata.service.search.vector.OpenSearchVectorService;
38+
import org.opensearch.testcontainers.OpensearchContainer;
39+
import org.testcontainers.junit.jupiter.Container;
40+
import org.testcontainers.junit.jupiter.Testcontainers;
41+
import org.testcontainers.utility.DockerImageName;
42+
import os.org.opensearch.client.json.jackson.JacksonJsonpMapper;
43+
import os.org.opensearch.client.transport.httpclient5.ApacheHttpClient5Options;
44+
import os.org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport;
45+
import os.org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
46+
import os.org.opensearch.client.transport.httpclient5.HttpAsyncResponseConsumerFactory;
47+
48+
/**
49+
* Regression guards for the three fixes applied against the "I/O reactor has been shut down"
50+
* family of failures (opensearch-java
51+
* <a href="https://github.com/opensearch-project/opensearch-java/issues/1969">#1969</a>; ports
52+
* the pattern from elasticsearch-java
53+
* <a href="https://github.com/elastic/elasticsearch-java/pull/1049">#1049</a>).
54+
*
55+
* <p>Every test here asserts <b>post-fix</b> behavior and will fail if any of the following are
56+
* reverted:
57+
*
58+
* <ul>
59+
* <li>{@link OpenSearchVectorService#close()} must NOT close the shared HC5 transport.
60+
* <li>{@code OpenSearchClient.createApacheHttpClient5Transport} must install
61+
* {@link SafeResponseConsumer} as the outer wrapper on the response-consumer factory.
62+
* <li>{@link SafeResponseConsumer} itself must convert {@code Error} to
63+
* {@code RuntimeException} in every {@code AsyncResponseConsumer} method.
64+
* </ul>
65+
*/
66+
@Slf4j
67+
@Testcontainers
68+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
69+
@Execution(ExecutionMode.CONCURRENT)
70+
class OpensearchHC5ReactorReproIT {
71+
72+
@Container
73+
static OpensearchContainer<?> opensearch =
74+
new OpensearchContainer<>(DockerImageName.parse("opensearchproject/opensearch:3.4.0"))
75+
.withStartupTimeout(Duration.ofMinutes(5))
76+
.withEnv("discovery.type", "single-node")
77+
.withEnv("DISABLE_SECURITY_PLUGIN", "true")
78+
.withEnv("DISABLE_INSTALL_DEMO_CONFIG", "true")
79+
.withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "Test@12345")
80+
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms512m -Xmx512m");
81+
82+
private org.openmetadata.service.search.opensearch.OpenSearchClient osClient;
83+
84+
@BeforeAll
85+
void setUp() {
86+
ElasticSearchConfiguration cfg =
87+
new ElasticSearchConfiguration()
88+
.withHost(opensearch.getHost())
89+
.withPort(opensearch.getMappedPort(9200))
90+
.withScheme("http")
91+
.withConnectionTimeoutSecs(10)
92+
.withSocketTimeoutSecs(30)
93+
.withKeepAliveTimeoutSecs(600)
94+
.withBatchSize(10)
95+
.withClusterAlias("")
96+
.withSearchType(ElasticSearchConfiguration.SearchType.OPENSEARCH);
97+
osClient = new org.openmetadata.service.search.opensearch.OpenSearchClient(cfg);
98+
}
99+
100+
@AfterAll
101+
void tearDown() {
102+
if (osClient != null) {
103+
osClient.close();
104+
}
105+
}
106+
107+
/**
108+
* FIX #1: {@link OpenSearchVectorService#close()} must no-op on the shared transport.
109+
*
110+
* <p>Before the fix, this method called {@code client._transport().close()} on an
111+
* opensearch-java client whose transport is shared with every other manager on the production
112+
* {@code OpenSearchClient}. Closing it there permanently shut down the HC5 IOReactor for the
113+
* whole process — subsequent cluster operations threw "I/O reactor has been shut down" or
114+
* "Connection pool shut down" until JVM restart.
115+
*/
116+
@Test
117+
void vectorServiceCloseMustNotKillSharedTransport() throws Exception {
118+
assertNotNull(
119+
osClient.getNewClient().cluster().health(), "baseline cluster.health() should work");
120+
121+
OpenSearchVectorService sharedTransportConsumer =
122+
new OpenSearchVectorService(osClient.getNewClient(), null);
123+
sharedTransportConsumer.close();
124+
125+
assertDoesNotThrow(
126+
() -> osClient.getNewClient().cluster().health(),
127+
"after OpenSearchVectorService.close(), the main OpenSearchClient must still work — "
128+
+ "the vector service must not close the shared transport");
129+
}
130+
131+
/**
132+
* FIX #2: production {@code OpenSearchClient.createApacheHttpClient5Transport} must install
133+
* {@link SafeResponseConsumer} as the outer wrapper on the response-consumer factory.
134+
*
135+
* <p>Walks the production transport via reflection down to the response-consumer factory and
136+
* asserts the consumer it produces is (or wraps) a {@link SafeResponseConsumer}. Fails if the
137+
* {@code setOptions(...)} call in production wiring is removed or the factory is reconfigured
138+
* to bypass the wrapper.
139+
*/
140+
@Test
141+
void productionTransportMustWrapResponseConsumerWithSafeResponseConsumer() throws Exception {
142+
ApacheHttpClient5Transport transport =
143+
(ApacheHttpClient5Transport) osClient.getLowLevelClient();
144+
assertNotNull(transport, "production OpenSearchClient must expose HC5 transport");
145+
146+
// opensearch-java doesn't expose the transport options via a public getter, so this
147+
// assertion relies on the "transportOptions" field name. If a future upgrade renames or
148+
// repackages that field the test will skip (not fail) — signalling a needed review rather
149+
// than a false regression.
150+
ApacheHttpClient5Options options;
151+
try {
152+
options = (ApacheHttpClient5Options) readField(transport, "transportOptions");
153+
} catch (NoSuchFieldException e) {
154+
abort(
155+
"opensearch-java internal field layout changed (no 'transportOptions' field on "
156+
+ transport.getClass().getName()
157+
+ "). Review the SafeResponseConsumer wiring against the new API and update "
158+
+ "this test.");
159+
return;
160+
}
161+
assertNotNull(options, "transport must have ApacheHttpClient5Options set");
162+
163+
HttpAsyncResponseConsumerFactory factory = options.getHttpAsyncResponseConsumerFactory();
164+
assertNotNull(factory, "options must have a HttpAsyncResponseConsumerFactory");
165+
166+
AsyncResponseConsumer<?> produced = factory.createHttpAsyncResponseConsumer();
167+
assertInstanceOf(
168+
SafeResponseConsumer.class,
169+
produced,
170+
"production consumer factory must produce a SafeResponseConsumer instance. Got "
171+
+ produced.getClass().getName()
172+
+ ". This means OpenSearchClient.createApacheHttpClient5Transport is missing the "
173+
+ "setOptions(...) call that installs SafeResponseConsumer.");
174+
}
175+
176+
/**
177+
* FIX #3: {@link SafeResponseConsumer} must convert {@code Error} thrown from response
178+
* parsing into a {@code RuntimeException} so the HC5 IOReactor's Exception-path catches it and
179+
* keeps running.
180+
*
181+
* <p>We build a transport whose delegate response consumer throws {@link Error} (simulating an
182+
* OOM during response parsing), wrap it with {@link SafeResponseConsumer}, and assert that:
183+
*
184+
* <ul>
185+
* <li>the request that triggered the Error fails fast with a RuntimeException (not hung);
186+
* <li>a subsequent request does NOT throw "I/O reactor has been shut down" — the reactor
187+
* survived.
188+
* </ul>
189+
*
190+
* If SafeResponseConsumer is broken or missing, the follow-up assertion fails with the literal
191+
* production symptom.
192+
*/
193+
@Test
194+
@org.junit.jupiter.api.Timeout(value = 60, unit = TimeUnit.SECONDS)
195+
void safeResponseConsumerMustKeepReactorAliveWhenDelegateThrowsError() throws Exception {
196+
HttpHost host = new HttpHost("http", opensearch.getHost(), opensearch.getMappedPort(9200));
197+
198+
// Factory produces a fresh delegate + wrapper per request (matches production wiring
199+
// and avoids cross-request state sharing on the delegate).
200+
HttpAsyncResponseConsumerFactory wrappedFactory =
201+
() ->
202+
new SafeResponseConsumer<>(
203+
new AsyncResponseConsumer<ClassicHttpResponse>() {
204+
@Override
205+
public void consumeResponse(
206+
HttpResponse response,
207+
EntityDetails entityDetails,
208+
HttpContext context,
209+
FutureCallback<ClassicHttpResponse> resultCallback) {
210+
throw new Error(
211+
"simulated allocation failure in response consumer on selector thread");
212+
}
213+
214+
@Override
215+
public void informationResponse(HttpResponse response, HttpContext context) {}
216+
217+
@Override
218+
public void failed(Exception cause) {}
219+
220+
@Override
221+
public void updateCapacity(CapacityChannel capacityChannel) {}
222+
223+
@Override
224+
public void consume(ByteBuffer src) {}
225+
226+
@Override
227+
public void streamEnd(List<? extends Header> trailers) {}
228+
229+
@Override
230+
public void releaseResources() {}
231+
});
232+
233+
ApacheHttpClient5Options.Builder optsBuilder = ApacheHttpClient5Options.DEFAULT.toBuilder();
234+
optsBuilder.setHttpAsyncResponseConsumerFactory(wrappedFactory);
235+
236+
ApacheHttpClient5Transport transport =
237+
ApacheHttpClient5TransportBuilder.builder(host)
238+
.setMapper(new JacksonJsonpMapper())
239+
.setHttpClientConfigCallback(
240+
hc -> {
241+
hc.setIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
242+
return hc;
243+
})
244+
.setOptions(optsBuilder.build())
245+
.build();
246+
247+
os.org.opensearch.client.opensearch.OpenSearchClient oc =
248+
new os.org.opensearch.client.opensearch.OpenSearchClient(transport);
249+
250+
try {
251+
// Trigger: consumer throws Error → SafeResponseConsumer rewrites to RuntimeException
252+
// → HC5 Exception-path catches it → request fails, reactor stays alive.
253+
CompletableFuture<Throwable> triggerResult =
254+
CompletableFuture.supplyAsync(
255+
() -> {
256+
try {
257+
oc.cluster().health();
258+
return null;
259+
} catch (Throwable t) {
260+
return t;
261+
}
262+
});
263+
Throwable triggerError;
264+
try {
265+
triggerError = triggerResult.get(5, TimeUnit.SECONDS);
266+
} catch (TimeoutException te) {
267+
fail(
268+
"trigger request hung for 5s — SafeResponseConsumer should rethrow as "
269+
+ "RuntimeException so HC5's Exception path completes the future, not orphan it");
270+
return;
271+
}
272+
assertNotNull(triggerError, "trigger request should have failed with wrapped Error");
273+
assertTrue(
274+
chainOf(triggerError).contains("Error consuming response"),
275+
() -> "expected SafeResponseConsumer wrapping: " + chainOf(triggerError));
276+
277+
// No explicit wait needed: SafeResponseConsumer's rewrap-and-rethrow is synchronous
278+
// inside HC5's Exception path, so the reactor has already settled into its post-error
279+
// state by the time triggerResult.get() returned above.
280+
281+
// Follow-up: reactor must still be alive. This request will ALSO hit the failing consumer
282+
// (same factory) and fail, but NOT with "I/O reactor has been shut down".
283+
CompletableFuture<Throwable> followUpResult =
284+
CompletableFuture.supplyAsync(
285+
() -> {
286+
try {
287+
oc.cluster().health();
288+
return null;
289+
} catch (Throwable t) {
290+
return t;
291+
}
292+
});
293+
Throwable followUpError;
294+
try {
295+
followUpError = followUpResult.get(10, TimeUnit.SECONDS);
296+
} catch (TimeoutException hung) {
297+
fail("follow-up request hung for 10s - reactor appears dead");
298+
return;
299+
}
300+
301+
String followUpChain = chainOf(followUpError);
302+
assertFalse(
303+
followUpChain.contains("I/O reactor has been shut down"),
304+
() ->
305+
"SafeResponseConsumer failed to keep the reactor alive. Follow-up request "
306+
+ "reports reactor shutdown: "
307+
+ followUpChain);
308+
} finally {
309+
try {
310+
transport.close();
311+
} catch (Exception e) {
312+
log.debug("Error closing test transport", e);
313+
}
314+
}
315+
}
316+
317+
private static Object readField(Object target, String fieldName) throws Exception {
318+
Class<?> c = target.getClass();
319+
while (c != null) {
320+
try {
321+
Field f = c.getDeclaredField(fieldName);
322+
f.setAccessible(true);
323+
return f.get(target);
324+
} catch (NoSuchFieldException ignored) {
325+
c = c.getSuperclass();
326+
}
327+
}
328+
throw new NoSuchFieldException(
329+
"field '" + fieldName + "' not found on " + target.getClass().getName() + " or parents");
330+
}
331+
332+
private static String chainOf(Throwable root) {
333+
if (root == null) return "<null>";
334+
StringBuilder sb = new StringBuilder();
335+
for (Throwable t = root; t != null; t = t.getCause()) {
336+
sb.append("\n -> ").append(t.getClass().getName()).append(": ").append(t.getMessage());
337+
if (t.getCause() == t) break;
338+
}
339+
return sb.toString();
340+
}
341+
}

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,10 @@ public Rest5Client getLowLevelRestClient(ElasticSearchConfiguration esConfig) {
760760
esConfig.getKeepAliveTimeoutSecs()));
761761
}
762762

763+
httpAsyncClientBuilder.evictExpiredConnections();
764+
httpAsyncClientBuilder.evictIdleConnections(
765+
org.apache.hc.core5.util.TimeValue.ofSeconds(30));
766+
763767
httpAsyncClientBuilder.useSystemProperties();
764768
});
765769

openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
2727
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
2828
import org.apache.hc.core5.http.HttpHost;
29+
import org.apache.hc.core5.util.TimeValue;
2930
import org.apache.hc.core5.util.Timeout;
3031
import org.jetbrains.annotations.NotNull;
3132
import org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipRequest;
@@ -826,11 +827,12 @@ private ApacheHttpClient5Transport createApacheHttpClient5Transport(
826827
if (esConfig.getKeepAliveTimeoutSecs() != null
827828
&& esConfig.getKeepAliveTimeoutSecs() > 0) {
828829
httpClientBuilder.setKeepAliveStrategy(
829-
(response, context) ->
830-
org.apache.hc.core5.util.TimeValue.ofSeconds(
831-
esConfig.getKeepAliveTimeoutSecs()));
830+
(response, context) -> TimeValue.ofSeconds(esConfig.getKeepAliveTimeoutSecs()));
832831
}
833832

833+
httpClientBuilder.evictExpiredConnections();
834+
httpClientBuilder.evictIdleConnections(TimeValue.ofSeconds(30));
835+
834836
httpClientBuilder.useSystemProperties();
835837

836838
return httpClientBuilder;
@@ -842,6 +844,17 @@ private ApacheHttpClient5Transport createApacheHttpClient5Transport(
842844
.setConnectTimeout(Timeout.ofSeconds(esConfig.getConnectionTimeoutSecs()))
843845
.setResponseTimeout(Timeout.ofSeconds(esConfig.getSocketTimeoutSecs())));
844846

847+
var defaultFactory =
848+
os.org.opensearch.client.transport.httpclient5.ApacheHttpClient5Options.DEFAULT
849+
.getHttpAsyncResponseConsumerFactory();
850+
os.org.opensearch.client.transport.httpclient5.HttpAsyncResponseConsumerFactory safeFactory =
851+
() -> new SafeResponseConsumer<>(defaultFactory.createHttpAsyncResponseConsumer());
852+
var optsBuilder =
853+
os.org.opensearch.client.transport.httpclient5.ApacheHttpClient5Options.DEFAULT
854+
.toBuilder();
855+
optsBuilder.setHttpAsyncResponseConsumerFactory(safeFactory);
856+
builder.setOptions(optsBuilder.build());
857+
845858
builder.setCompressionEnabled(true);
846859
builder.setChunkedEnabled(true);
847860
return builder.build();

0 commit comments

Comments
 (0)