Skip to content

Commit 152aa46

Browse files
committed
[SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
Wire startLocalFedMonitoring through FederatedWorkerUtils.waitForWorker so the monitoring backend's port-bind is polled instead of slept on (fixes flaky FederatedCoordinatorIntegrationCRUDTest), migrate FederatedLogicalTest to the bulk startLocalFedWorkers(int[]) API, and drop the now-unused FED_WORKER_WAIT_S and FED_MONITOR_WAIT constants.
1 parent 684531d commit 152aa46

2 files changed

Lines changed: 49 additions & 34 deletions

File tree

src/test/java/org/apache/sysds/test/AutomatedTestBase.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,10 @@ public abstract class AutomatedTestBase {
118118
public static final double GPU_TOLERANCE = 1e-9;
119119

120120
/**
121-
* Default upper bound (ms) passed to federated worker readiness waits. The wait returns as soon
122-
* as the worker's TCP port accepts a connection, so this value only affects the deadline used
123-
* when a worker never becomes ready. {@link FederatedWorkerUtils} clamps caller values below its
124-
* enforced floor up to that floor, so the effective ceiling is at least that floor regardless
125-
* of this constant.
121+
* Default deadline (ms) for federated worker/monitoring readiness waits and a few legacy
122+
* {@code sleep()} calls. {@link FederatedWorkerUtils} enforces its own minimum floor.
126123
*/
127124
public static final int FED_WORKER_WAIT = 3000;
128-
public static final int FED_MONITOR_WAIT = 10000;
129-
public static final int FED_WORKER_WAIT_S = 50;
130125

131126

132127
// The timeout for a test to fail. All tests must execute in less than this time.
@@ -1765,29 +1760,53 @@ private static Process spawnLocalFedWorker(int port, String[] addArgs) {
17651760
}
17661761

17671762
/**
1768-
* Start new JVM for a federated monitoring backend at the port.
1763+
* Start a new JVM for a federated monitoring backend at the port.
17691764
*
1770-
* @param port Port to use for the JVM
1771-
* @return the process associated with the worker.
1765+
* <p>Returns once the backend's TCP port accepts connections (Netty's bind has completed), or
1766+
* throws a {@link RuntimeException} once the default deadline ({@link #FED_WORKER_WAIT}, raised
1767+
* to the {@link FederatedWorkerUtils} readiness floor if that is higher) elapses.
1768+
*
1769+
* @param port Port to use for the JVM
1770+
* @param addArgs Extra CLI args to append, or null
1771+
* @return the process associated with the monitoring backend.
17721772
*/
17731773
protected Process startLocalFedMonitoring(int port, String[] addArgs) {
1774-
Process process = null;
1774+
return startLocalFedMonitoring(port, addArgs, FED_WORKER_WAIT);
1775+
}
1776+
1777+
/**
1778+
* Start a new JVM for a federated monitoring backend at the port.
1779+
*
1780+
* <p>Returns once the backend's TCP port accepts connections, or throws a
1781+
* {@link RuntimeException} after {@code timeoutMs} elapses. The monitoring server opens the
1782+
* port after Netty's {@code bind().sync()} returns; a successful TCP connect therefore signals
1783+
* that the HTTP listener is ready to accept requests.
1784+
*
1785+
* @param port Port to use for the JVM
1786+
* @param addArgs Extra CLI args to append, or null
1787+
* @param timeoutMs Upper bound on the wait, in ms; raised to a minimum value enforced inside
1788+
* {@link FederatedWorkerUtils}.
1789+
* @return the process associated with the monitoring backend.
1790+
*/
1791+
protected Process startLocalFedMonitoring(int port, String[] addArgs, int timeoutMs) {
1792+
Process process = spawnLocalFedMonitoring(port, addArgs);
1793+
FederatedWorkerUtils.waitForWorker(port, timeoutMs, process::isAlive, "monitoring process");
1794+
return process;
1795+
}
1796+
1797+
/** Spawn a federated monitoring backend JVM and return without waiting for the port to bind. */
1798+
private static Process spawnLocalFedMonitoring(int port, String[] addArgs) {
17751799
String separator = System.getProperty("file.separator");
17761800
String classpath = System.getProperty("java.class.path");
17771801
String path = System.getProperty("java.home") + separator + "bin" + separator + "java";
1778-
String[] args = ArrayUtils.addAll(new String[]{path, "-cp", classpath, DMLScript.class.getName(),
1779-
"-fedMonitoring", Integer.toString(port)}, addArgs);
1780-
ProcessBuilder processBuilder = new ProcessBuilder(args);
1781-
1802+
String[] args = ArrayUtils.addAll(new String[] {path, "-cp", classpath, DMLScript.class.getName(),
1803+
"-fedMonitoring", Integer.toString(port)}, addArgs);
17821804
try {
1783-
process = processBuilder.start();
1784-
// Wait till process is started
1785-
sleep(FED_MONITOR_WAIT);
1805+
return new ProcessBuilder(args).start();
17861806
}
1787-
catch(IOException | InterruptedException e) {
1788-
throw new RuntimeException(e);
1807+
catch(IOException e) {
1808+
throw new RuntimeException("Failed to launch federated monitoring process on port " + port, e);
17891809
}
1790-
return process;
17911810
}
17921811

17931812
/**

src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -372,17 +372,15 @@ public void federatedLogicalTest(String testname, Type op_type, ExecMode execMod
372372
// empty script name because we don't execute any script, just start the worker
373373
fullDMLScriptName = "";
374374
int port1 = getRandomAvailablePort();
375-
int port2 = (!single_fed_worker ? getRandomAvailablePort() : 0);
376-
int port3 = (!single_fed_worker ? getRandomAvailablePort() : 0);
377-
int port4 = (!single_fed_worker ? getRandomAvailablePort() : 0);
378-
Process thread1 = startLocalFedWorker(port1, (!single_fed_worker ? FED_WORKER_WAIT_S : FED_WORKER_WAIT));
379-
Process thread2 = (!single_fed_worker ? startLocalFedWorker(port2, FED_WORKER_WAIT_S) : null);
380-
Process thread3 = (!single_fed_worker ? startLocalFedWorker(port3, FED_WORKER_WAIT_S) : null);
381-
Process thread4 = (!single_fed_worker ? startLocalFedWorker(port4) : null);
382-
383-
375+
int port2 = single_fed_worker ? 0 : getRandomAvailablePort();
376+
int port3 = single_fed_worker ? 0 : getRandomAvailablePort();
377+
int port4 = single_fed_worker ? 0 : getRandomAvailablePort();
378+
Process[] workers = startLocalFedWorkers(single_fed_worker
379+
? new int[] {port1}
380+
: new int[] {port1, port2, port3, port4});
381+
384382
try {
385-
if(!isAlive(thread1))
383+
if(!isAlive(workers))
386384
throw new RuntimeException("Failed starting federated worker");
387385

388386
getAndLoadTestConfiguration(testname);
@@ -449,9 +447,7 @@ public void federatedLogicalTest(String testname, Type op_type, ExecMode execMod
449447
}
450448
}
451449
finally {
452-
TestUtils.shutdownThreads(thread1);
453-
if(!single_fed_worker)
454-
TestUtils.shutdownThreads(thread2, thread3, thread4);
450+
TestUtils.shutdownThreads(workers);
455451

456452
resetExecMode(platform_old);
457453
}

0 commit comments

Comments
 (0)