Skip to content

Commit 0337399

Browse files
committed
avoid memory leak
1 parent fc3b4e1 commit 0337399

2 files changed

Lines changed: 100 additions & 100 deletions

File tree

flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34-
import java.time.ZoneId;
3534
import java.util.Arrays;
3635
import java.util.List;
3736
import java.util.UUID;
@@ -62,9 +61,7 @@ public void testDoris2Doris() throws Exception {
6261
env.setParallelism(2);
6362
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
6463
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
65-
tEnv.getConfig()
66-
.getConfiguration()
67-
.setString("table.local-time-zone", ZoneId.systemDefault().getId());
64+
tEnv.getConfig().getConfiguration().setString("table.local-time-zone", "Asia/Tokyo");
6865
String sourceDDL =
6966
String.format(
7067
"CREATE TABLE doris_source ("

flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java

Lines changed: 99 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
import org.testcontainers.containers.GenericContainer;
3030
import org.testcontainers.containers.Network;
3131
import org.testcontainers.containers.output.Slf4jLogConsumer;
32-
import org.testcontainers.containers.wait.strategy.Wait;
3332
import org.testcontainers.shaded.org.awaitility.Awaitility;
3433
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;
3534
import org.testcontainers.utility.DockerLoggerFactory;
3635

3736
import java.io.BufferedReader;
37+
import java.io.IOException;
3838
import java.io.InputStream;
3939
import java.io.InputStreamReader;
4040
import java.net.MalformedURLException;
@@ -53,6 +53,7 @@
5353
import java.util.List;
5454
import java.util.Map;
5555
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.atomic.AtomicBoolean;
5657
import java.util.concurrent.locks.LockSupport;
5758

5859
public class DorisContainer implements ContainerService {
@@ -69,6 +70,8 @@ public class DorisContainer implements ContainerService {
6970
private static final String PASSWORD = "";
7071
private final GenericContainer<?> dorisContainer;
7172
private final String systemTimeZone = ZoneId.systemDefault().getId();
73+
private static URLClassLoader jdbcClassLoader;
74+
private static final AtomicBoolean driverInitialized = new AtomicBoolean(false);
7275

7376
public DorisContainer() {
7477
dorisContainer = createDorisContainer();
@@ -123,13 +126,17 @@ public GenericContainer<?> createDorisContainer() {
123126

124127
public void startContainer() {
125128
try {
129+
if (dorisContainer.isRunning()) {
130+
return;
131+
}
126132
LOG.info("Starting doris containers.");
127133
// singleton doris container
128134
dorisContainer.start();
129-
// Wait for container to reach running state during first startup
130-
waitForContainerRunning();
131-
ExecResult feExecResult = dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf");
132-
ExecResult beExecResult = dorisContainer.execInContainer("cat", "/root/be/conf/be.conf");
135+
// print Doris configuration information.
136+
ExecResult feExecResult =
137+
dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf");
138+
ExecResult beExecResult =
139+
dorisContainer.execInContainer("cat", "/root/be/conf/be.conf");
133140
LOG.info("FE config: {}", feExecResult.getStdout());
134141
LOG.info("BE config: {}", beExecResult.getStdout());
135142
initializeJdbcConnection();
@@ -152,9 +159,7 @@ public void restartContainer() {
152159
.restartContainerCmd(dorisContainer.getContainerId())) {
153160
restartCmd.exec();
154161
LOG.info("Restart command executed, waiting for container services to be ready");
155-
// Add service detection logic to ensure Doris container services are fully initialized
156-
// and ready.
157-
waitForContainerRunning();
162+
initializeJdbcConnection();
158163
} catch (Exception e) {
159164
LOG.error("Failed to restart Doris container", e);
160165
throw new RuntimeException("Container restart failed", e);
@@ -190,89 +195,6 @@ private void initializeVariables() throws Exception {
190195
LOG.info("Init variables successfully.");
191196
}
192197

193-
// wait for container running
194-
private void waitForContainerRunning() {
195-
LOG.info("Waiting for Doris services to be accessible...");
196-
197-
try {
198-
Awaitility.await("FE HTTP Service")
199-
.atMost(5, TimeUnit.MINUTES)
200-
.pollInterval(1, TimeUnit.SECONDS)
201-
.until(
202-
() -> {
203-
try {
204-
ExecResult result =
205-
dorisContainer.execInContainer(
206-
"curl",
207-
"-s",
208-
"-o",
209-
"/dev/null",
210-
"-w",
211-
"%{http_code}",
212-
"-m",
213-
"2",
214-
"http://localhost:" + FE.HTTP_PORT);
215-
boolean ready = result.getStdout().equals("200");
216-
LOG.info(
217-
"FE HTTP service on port {} is ready: {}",
218-
FE.HTTP_PORT,
219-
ready);
220-
if (ready) {
221-
LOG.info(
222-
"FE HTTP service on port {} is ready",
223-
FE.HTTP_PORT);
224-
}
225-
return ready;
226-
} catch (Exception e) {
227-
LOG.debug(
228-
"Exception while checking FE HTTP service: {}",
229-
e.getMessage());
230-
return false;
231-
}
232-
});
233-
234-
Awaitility.await("BE HTTP Service")
235-
.atMost(5, TimeUnit.MINUTES)
236-
.pollInterval(1, TimeUnit.SECONDS)
237-
.until(
238-
() -> {
239-
try {
240-
ExecResult result =
241-
dorisContainer.execInContainer(
242-
"curl",
243-
"-s",
244-
"-o",
245-
"/dev/null",
246-
"-w",
247-
"%{http_code}",
248-
"-m",
249-
"2",
250-
"http://localhost:" + BE.WEBSERVICE_PORT);
251-
boolean ready = "200".equals(result.getStdout().trim());
252-
if (ready) {
253-
LOG.info(
254-
"BE HTTP service on port {} is ready",
255-
BE.WEBSERVICE_PORT);
256-
} else {
257-
LOG.debug(
258-
"BE HTTP service on port {} not ready yet, HTTP status: {}",
259-
BE.WEBSERVICE_PORT,
260-
result.getStdout().trim());
261-
}
262-
return ready;
263-
} catch (Exception e) {
264-
LOG.debug(
265-
"Exception while checking BE HTTP service: {}",
266-
e.getMessage());
267-
return false;
268-
}
269-
});
270-
271-
} catch (ConditionTimeoutException e) {
272-
LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready");
273-
}
274-
}
275-
276198
@Override
277199
public String getJdbcUrl() {
278200
return String.format(JDBC_URL, dorisContainer.getHost());
@@ -314,18 +236,34 @@ public void close() {
314236
dorisContainer.close();
315237
LOG.info("Doris container closed successfully.");
316238
}
239+
240+
closeJdbcClassLoader();
317241
}
318242

319243
private void initializeJDBCDriver() throws MalformedURLException {
320-
URLClassLoader urlClassLoader =
321-
new URLClassLoader(
322-
new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader());
323-
LOG.info("Try to connect to Doris.");
324-
Thread.currentThread().setContextClassLoader(urlClassLoader);
244+
// Checks if the driver has already been initialized to avoid memory leak and class loading
245+
// issues.
246+
if (driverInitialized.get()) {
247+
LOG.debug("JDBC driver already initialized, skipping initialization");
248+
return;
249+
}
250+
251+
LOG.info("Initializing JDBC driver");
252+
if (jdbcClassLoader == null) {
253+
jdbcClassLoader =
254+
new URLClassLoader(
255+
new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader());
256+
}
257+
258+
Thread.currentThread().setContextClassLoader(jdbcClassLoader);
259+
driverInitialized.set(true);
325260
}
326261

327262
private void initializeJdbcConnection() throws Exception {
328263
initializeJDBCDriver();
264+
// before connecting to Doris, wait for Doris FE to start,which is to avoid connect Doris
265+
// failed when Doris FE is not ready.
266+
waitDorisFeRunning();
329267
try (Connection connection = getQueryConnection();
330268
Statement statement = connection.createStatement()) {
331269
ResultSet resultSet;
@@ -337,6 +275,71 @@ private void initializeJdbcConnection() throws Exception {
337275
LOG.info("Connected to Doris successfully.");
338276
}
339277

278+
private synchronized void closeJdbcClassLoader() {
279+
if (jdbcClassLoader != null) {
280+
try {
281+
jdbcClassLoader.close();
282+
jdbcClassLoader = null;
283+
driverInitialized.set(false);
284+
LOG.info("JDBC class loader closed successfully");
285+
} catch (IOException e) {
286+
LOG.warn("Failed to close JDBC class loader", e);
287+
}
288+
}
289+
}
290+
291+
/**
292+
* Wait for Doris container to running. If the Doris FE not startup completely, try to connect
293+
* it again until the doris FE is ready.
294+
*/
295+
private void waitDorisFeRunning() {
296+
LOG.info("Waiting for Doris services to be accessible...");
297+
298+
// Poll Doris FE HTTP service every second with a maximum wait time of 5 minutes
299+
// If the service is not available within this time, a timeout exception will be thrown
300+
try {
301+
Awaitility.await("FE HTTP Service")
302+
.atMost(5, TimeUnit.MINUTES)
303+
.pollInterval(1, TimeUnit.SECONDS)
304+
.until(
305+
() -> {
306+
try {
307+
ExecResult result =
308+
dorisContainer.execInContainer(
309+
"curl",
310+
"-s",
311+
"-o",
312+
"/dev/null",
313+
"-w",
314+
"%{http_code}",
315+
"-m",
316+
"2",
317+
"http://localhost:" + FE.HTTP_PORT);
318+
boolean ready = result.getStdout().equals("200");
319+
LOG.info(
320+
"FE HTTP service on port {} is ready: {}",
321+
FE.HTTP_PORT,
322+
ready);
323+
324+
if (ready) {
325+
LOG.info(
326+
"FE HTTP service on port {} is ready",
327+
FE.HTTP_PORT);
328+
}
329+
return ready;
330+
} catch (Exception e) {
331+
LOG.debug(
332+
"Exception while checking FE HTTP service: {}",
333+
e.getMessage());
334+
return false;
335+
}
336+
});
337+
338+
} catch (ConditionTimeoutException e) {
339+
LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready");
340+
}
341+
}
342+
340343
private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
341344
LockSupport.parkNanos(duration.toNanos());
342345
if (rs.next()) {

0 commit comments

Comments
 (0)