Skip to content

Commit 34a19f0

Browse files
authored
[SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
Wire startLocalFedMonitoring through FederatedWorkerUtils.waitForWorker so the monitoring backend's port-bind is polled instead of slept on (fixes flaky FederatedCoordinatorIntegrationCRUDTest), migrate FederatedLogicalTest to the bulk startLocalFedWorkers(int[]) API, and drop the now-unused FED_WORKER_WAIT_S and FED_MONITOR_WAIT constants.
1 parent b150d8e commit 34a19f0

2 files changed

Lines changed: 49 additions & 35 deletions

File tree

src/test/java/org/apache/sysds/test/AutomatedTestBase.java

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.apache.sysds.test;
2121

2222
import static java.lang.Math.ceil;
23-
import static java.lang.Thread.sleep;
2423
import static org.junit.Assert.assertEquals;
2524
import static org.junit.Assert.fail;
2625

@@ -118,15 +117,10 @@ public abstract class AutomatedTestBase {
118117
public static final double GPU_TOLERANCE = 1e-9;
119118

120119
/**
121-
* Default upper bound (ms) passed to federated worker readiness waits. The wait returns as soon
122-
* as the worker's TCP port accepts a connection, so this value only affects the deadline used
123-
* when a worker never becomes ready. {@link FederatedWorkerUtils} clamps caller values below its
124-
* enforced floor up to that floor, so the effective ceiling is at least that floor regardless
125-
* of this constant.
120+
* Default deadline (ms) for federated worker/monitoring readiness waits and a few legacy
121+
* {@code sleep()} calls. {@link FederatedWorkerUtils} enforces its own minimum floor.
126122
*/
127123
public static final int FED_WORKER_WAIT = 3000;
128-
public static final int FED_MONITOR_WAIT = 10000;
129-
public static final int FED_WORKER_WAIT_S = 50;
130124

131125

132126
// The timeout for a test to fail. all tests must execute in less than this time.
@@ -1765,29 +1759,53 @@ private static Process spawnLocalFedWorker(int port, String[] addArgs) {
17651759
}
17661760

17671761
/**
1768-
* Start new JVM for a federated monitoring backend at the port.
1762+
* Start a new JVM for a federated monitoring backend at the port.
17691763
*
1770-
* @param port Port to use for the JVM
1771-
* @return the process associated with the worker.
1764+
* <p>Returns once the backend's TCP port accepts connections (Netty's bind has completed), or
1765+
* throws a {@link RuntimeException} once the {@link FederatedWorkerUtils} readiness floor
1766+
* elapses.
1767+
*
1768+
* @param port Port to use for the JVM
1769+
* @param addArgs Extra CLI args to append, or null
1770+
* @return the process associated with the monitoring backend.
17721771
*/
17731772
protected Process startLocalFedMonitoring(int port, String[] addArgs) {
1774-
Process process = null;
1773+
return startLocalFedMonitoring(port, addArgs, FED_WORKER_WAIT);
1774+
}
1775+
1776+
/**
1777+
* Start a new JVM for a federated monitoring backend at the port.
1778+
*
1779+
* <p>Returns once the backend's TCP port accepts connections, or throws a
1780+
* {@link RuntimeException} after {@code timeoutMs} elapses. The monitoring server opens the
1781+
* port after Netty's {@code bind().sync()} returns; a successful TCP connect therefore signals
1782+
* that the HTTP listener is ready to accept requests.
1783+
*
1784+
* @param port Port to use for the JVM
1785+
* @param addArgs Extra CLI args to append, or null
1786+
* @param timeoutMs Upper bound on the wait, in ms; raised to a minimum value enforced inside
1787+
* {@link FederatedWorkerUtils}.
1788+
* @return the process associated with the monitoring backend.
1789+
*/
1790+
protected Process startLocalFedMonitoring(int port, String[] addArgs, int timeoutMs) {
1791+
Process process = spawnLocalFedMonitoring(port, addArgs);
1792+
FederatedWorkerUtils.waitForWorker(port, timeoutMs, process::isAlive, "monitoring process");
1793+
return process;
1794+
}
1795+
1796+
/** Spawn a federated monitoring backend JVM and return without waiting for the port to bind. */
1797+
private static Process spawnLocalFedMonitoring(int port, String[] addArgs) {
17751798
String separator = System.getProperty("file.separator");
17761799
String classpath = System.getProperty("java.class.path");
17771800
String path = System.getProperty("java.home") + separator + "bin" + separator + "java";
1778-
String[] args = ArrayUtils.addAll(new String[]{path, "-cp", classpath, DMLScript.class.getName(),
1779-
"-fedMonitoring", Integer.toString(port)}, addArgs);
1780-
ProcessBuilder processBuilder = new ProcessBuilder(args);
1781-
1801+
String[] args = ArrayUtils.addAll(new String[] {path, "-cp", classpath, DMLScript.class.getName(),
1802+
"-fedMonitoring", Integer.toString(port)}, addArgs);
17821803
try {
1783-
process = processBuilder.start();
1784-
// Wait till process is started
1785-
sleep(FED_MONITOR_WAIT);
1804+
return new ProcessBuilder(args).start();
17861805
}
1787-
catch(IOException | InterruptedException e) {
1788-
throw new RuntimeException(e);
1806+
catch(IOException e) {
1807+
throw new RuntimeException("Failed to launch federated monitoring process on port " + port, e);
17891808
}
1790-
return process;
17911809
}
17921810

17931811
/**

src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -372,17 +372,15 @@ public void federatedLogicalTest(String testname, Type op_type, ExecMode execMod
372372
// empty script name because we don't execute any script, just start the worker
373373
fullDMLScriptName = "";
374374
int port1 = getRandomAvailablePort();
375-
int port2 = (!single_fed_worker ? getRandomAvailablePort() : 0);
376-
int port3 = (!single_fed_worker ? getRandomAvailablePort() : 0);
377-
int port4 = (!single_fed_worker ? getRandomAvailablePort() : 0);
378-
Process thread1 = startLocalFedWorker(port1, (!single_fed_worker ? FED_WORKER_WAIT_S : FED_WORKER_WAIT));
379-
Process thread2 = (!single_fed_worker ? startLocalFedWorker(port2, FED_WORKER_WAIT_S) : null);
380-
Process thread3 = (!single_fed_worker ? startLocalFedWorker(port3, FED_WORKER_WAIT_S) : null);
381-
Process thread4 = (!single_fed_worker ? startLocalFedWorker(port4) : null);
382-
383-
375+
int port2 = single_fed_worker ? 0 : getRandomAvailablePort();
376+
int port3 = single_fed_worker ? 0 : getRandomAvailablePort();
377+
int port4 = single_fed_worker ? 0 : getRandomAvailablePort();
378+
Process[] workers = startLocalFedWorkers(single_fed_worker
379+
? new int[] {port1}
380+
: new int[] {port1, port2, port3, port4});
381+
384382
try {
385-
if(!isAlive(thread1))
383+
if(!isAlive(workers))
386384
throw new RuntimeException("Failed starting federated worker");
387385

388386
getAndLoadTestConfiguration(testname);
@@ -449,9 +447,7 @@ public void federatedLogicalTest(String testname, Type op_type, ExecMode execMod
449447
}
450448
}
451449
finally {
452-
TestUtils.shutdownThreads(thread1);
453-
if(!single_fed_worker)
454-
TestUtils.shutdownThreads(thread2, thread3, thread4);
450+
TestUtils.shutdownThreads(workers);
455451

456452
resetExecMode(platform_old);
457453
}

0 commit comments

Comments
 (0)