Skip to content

Commit 65e734e

Browse files
authored
[MINOR][CI] Fix leaked threads hanging Java test forks (#2488)
Several Java test suites (most visibly **.component.c** and data.misc/lineage) intermittently ran until the GitHub Actions job timeout even though the tests themselves had completed. The cause was leaked non-daemon threads keeping the surefire fork JVM alive, so the fork never exited and the job stalled until cancelled. There were two sources of such threads. First, in-JVM federated workers: both FederatedWorker's Netty event loops and the test-side worker wrapper threads were non-daemon. Second, CommonThreadPool's fallback pools: when called off the main thread, CommonThreadPool returned Executors.newFixedThreadPool/newCachedThreadPool pools, which default to non-daemon threads, while only the ForkJoinPool-backed variants were already daemon. This PR makes those threads daemon at the source: FederatedWorker now creates its Netty event-loop groups with a daemon thread factory, and CommonThreadPool routes its fixed/cached fallbacks through one too, so daemon behavior is uniform across all pool variants. On the test side, AutomatedTestBase marks spawned worker threads as daemon, TestUtils.shutdownThread bounds its join (30s, warns on stragglers, restores the interrupt flag), and the lineage tests (LineageFedReuseAlg, FedFullReuseTest, FedUDFReuseTest) now shut workers down in a finally block so failures no longer leak workers (the large line counts there are just reindentation from the try/finally wrap). The javaTests.yml job cap stays at 30 minutes, with a comment documenting why it sits above the 600s per-fork surefire timeout, which remains the backstop for genuine hangs.
1 parent ee69e51 commit 65e734e

14 files changed

Lines changed: 142 additions & 96 deletions

File tree

.github/workflows/javaTests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ concurrency:
5050
jobs:
5151
java_tests:
5252
runs-on: ubuntu-24.04
53+
# Job cap kept above the per-fork surefire timeout (test-forkedProcessTimeout,
54+
# 600s) so surefire can kill a hung fork before GitHub Actions cancels the job.
5355
timeout-minutes: 30
5456
strategy:
5557
fail-fast: false

dev/release/src/test/java/org/apache/sysds/validation/Utility.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public static int runCommand(String [] command, String strCurDir, String strOutp
185185
try {
186186
exitValue = process.waitFor();
187187
} catch (InterruptedException ie) {
188+
Thread.currentThread().interrupt();
188189
debugPrint(Constants.DEBUG_ERROR, "Program interrunpted: " + ie);
189190
}
190191
debugPrint(Constants.DEBUG_CODE, "Program '" + String.join(" ", command) + "' exited with exit status " + exitValue, strOutputFile);

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import io.netty.handler.ssl.SslContext;
6666
import io.netty.handler.ssl.SslContextBuilder;
6767
import io.netty.handler.ssl.util.SelfSignedCertificate;
68+
import io.netty.util.concurrent.DefaultThreadFactory;
6869

6970
@SuppressWarnings("deprecation")
7071
public class FederatedWorker {
@@ -99,9 +100,11 @@ private void run() {
99100
LOG.info("Setting up Federated Worker on port " + _port);
100101
int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
101102
final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
102-
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
103+
// Daemon event loops so a leaked in-JVM (test) worker cannot block JVM exit.
104+
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1,
105+
new DefaultThreadFactory("fed-worker-boss", true));
103106
ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
104-
new SynchronousQueue<Runnable>(true));
107+
new SynchronousQueue<Runnable>(true), new DefaultThreadFactory("fed-worker-pool", true));
105108
NioEventLoopGroup workerGroup = new NioEventLoopGroup(EVENT_LOOP_THREADS, workerTPE);
106109

107110
final boolean ssl = ConfigurationManager.isFederatedSSL();

src/main/java/org/apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ public void shutdown() {
148148
_q[i].close();
149149
}
150150
}
151-
catch(InterruptedException ignored) {
151+
catch(InterruptedException e) {
152+
Thread.currentThread().interrupt();
152153
}
153154
}
154155
_writeExec.getQueue().clear();
@@ -174,7 +175,8 @@ public CompletableFuture<Void> scheduleEviction(BlockEntry block) {
174175
int i = (int)(q % WRITER_SIZE);
175176
_q[i].enqueueIfOpen(new Tuple2<>(block, future));
176177
}
177-
catch(InterruptedException ignored) {
178+
catch(InterruptedException e) {
179+
Thread.currentThread().interrupt();
178180
}
179181

180182
return future;

src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.Executors;
3030
import java.util.concurrent.ForkJoinPool;
3131
import java.util.concurrent.Future;
32+
import java.util.concurrent.ThreadFactory;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
3435

@@ -141,11 +142,26 @@ else if(mainThread || threadName.contains("PARFOR") || threadName.contains("FedE
141142
incorrectPoolUse = true;
142143
}
143144

144-
return Executors.newFixedThreadPool(k);
145+
return Executors.newFixedThreadPool(k, daemonThreadFactory());
145146

146147
}
147148
}
148149

150+
/**
151+
* Thread factory that produces daemon threads. The ForkJoinPool-backed pools already use daemon
152+
* threads; the fallback {@link Executors#newFixedThreadPool} and {@link Executors#newCachedThreadPool}
153+
* pools default to non-daemon threads, which can keep the JVM (e.g. a surefire test fork) alive
154+
* if a caller forgets to shut the pool down. Making them daemon keeps that behavior uniform.
155+
*/
156+
private static ThreadFactory daemonThreadFactory() {
157+
final ThreadFactory base = Executors.defaultThreadFactory();
158+
return r -> {
159+
Thread t = base.newThread(r);
160+
t.setDaemon(true);
161+
return t;
162+
};
163+
}
164+
149165
/**
150166
* Invoke the collection of tasks and shutdown the pool upon job termination.
151167
*
@@ -180,7 +196,7 @@ public synchronized static ExecutorService getDynamicPool() {
180196
// It is guaranteed not to be shut down because of the synchronized barrier
181197
return asyncPool;
182198
else {
183-
asyncPool = Executors.newCachedThreadPool();
199+
asyncPool = Executors.newCachedThreadPool(daemonThreadFactory());
184200
return asyncPool;
185201
}
186202
}

src/test/java/org/apache/sysds/performance/generators/GenMatrices.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void generate(int N) throws InterruptedException {
7272
}
7373
}
7474
catch(InterruptedException e) {
75-
e.printStackTrace();
75+
Thread.currentThread().interrupt();
7676
}
7777
});
7878
}

src/test/java/org/apache/sysds/test/AutomatedTestBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,9 @@ private static Thread spawnLocalFedWorkerThread(int port, String[] otherArgs) {
19391939
LOG.error("Exception in startup of federated worker", e);
19401940
}
19411941
});
1942+
// Daemon so a worker left running by a failed/forgetful test cannot keep the
1943+
// surefire fork JVM alive and stall CI until the job-level timeout.
1944+
t.setDaemon(true);
19421945
t.start();
19431946
return t;
19441947
}
@@ -1979,6 +1982,9 @@ public static Thread startLocalFedWorkerWithArgs(String[] args) {
19791982
LOG.error("Exception in startup of federated worker on port " + port, e);
19801983
}
19811984
});
1985+
// Daemon so a worker left running by a failed/forgetful test cannot keep the
1986+
// surefire fork JVM alive and stall CI until the job-level timeout.
1987+
t.setDaemon(true);
19821988
t.start();
19831989
FederatedWorkerUtils.waitForWorker(t, port, FED_WORKER_WAIT);
19841990
return t;

src/test/java/org/apache/sysds/test/TestUtils.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3489,15 +3489,23 @@ public static void shutdownThreads(Process... ts) {
34893489
}
34903490
}
34913491

3492+
/** Upper bound (ms) on how long {@link #shutdownThread(Thread)} waits for a worker to stop. */
3493+
private static final long THREAD_SHUTDOWN_JOIN_MS = 30_000;
3494+
34923495
public static void shutdownThread(Thread t) {
34933496
// kill the worker
34943497
if( t != null ) {
34953498
t.interrupt();
34963499
try {
3497-
t.join();
3500+
// Bounded join: workers are daemon threads, so even if one ignores the interrupt
3501+
// we must not block cleanup (and the JVM) indefinitely waiting for it.
3502+
t.join(THREAD_SHUTDOWN_JOIN_MS);
3503+
if( t.isAlive() )
3504+
LOG.warn("Federated worker thread " + t.getName()
3505+
+ " did not stop within " + THREAD_SHUTDOWN_JOIN_MS + "ms; leaving it as a daemon.");
34983506
}
34993507
catch (InterruptedException e) {
3500-
e.printStackTrace();
3508+
Thread.currentThread().interrupt();
35013509
}
35023510
}
35033511
}
@@ -3514,7 +3522,8 @@ public static void shutdownThread(Process t) {
35143522
forciblyDestroyed.waitFor(); // Wait until it's definitely terminated
35153523
}
35163524
} catch (InterruptedException e) {
3517-
e.printStackTrace();
3525+
LOG.warn("Interrupted while shutting down federated worker process", e);
3526+
Thread.currentThread().interrupt();
35183527
}
35193528
}
35203529
}

src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedBackendPerformanceTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ public void testBackendPerformance() throws InterruptedException {
9191
taskFutures.forEach(res -> {
9292
try {
9393
Assert.assertEquals("Stats parsed correctly", res.get().statusCode(), 200);
94-
} catch (InterruptedException | ExecutionException e) {
95-
e.printStackTrace();
94+
} catch (InterruptedException e) {
95+
Thread.currentThread().interrupt();
96+
Assert.fail("Interrupted while fetching statistics: " + e.getMessage());
97+
} catch (ExecutionException e) {
98+
Assert.fail("Failed to fetch statistics: " + e.getMessage());
9699
}
97100
});
98101

src/test/java/org/apache/sysds/test/functions/federated/primitives/part5/FederatedMatrixScalarOperationsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ private void runGenericTest(String dmlFile, int scalar) {
209209
compareResults();
210210
}
211211
catch(InterruptedException e) {
212-
e.printStackTrace();
212+
Thread.currentThread().interrupt();
213213
assert (false);
214214
}
215215
finally {

0 commit comments

Comments
 (0)