@@ -117,9 +117,15 @@ public abstract class AutomatedTestBase {
117117 public static boolean TEST_GPU = false ;
118118 public static final double GPU_TOLERANCE = 1e-9 ;
119119
120- // ms wait time
121- public static final int FED_WORKER_WAIT = 3000 ;
122- public static final int FED_MONITOR_WAIT = 10000 ;
120+ /**
121+ * Default upper bound (ms) passed to federated worker readiness waits. The wait returns as soon
122+ * as the worker's TCP port accepts a connection, so this value only affects the deadline used
123+ * when a worker never becomes ready. {@link FederatedWorkerUtils} clamps caller values below its
124+ * enforced floor up to that floor, so the effective ceiling is at least that floor regardless
125+ * of this constant.
126+ */
127+ public static final int FED_WORKER_WAIT = 3000 ;
128+ public static final int FED_MONITOR_WAIT = 10000 ;
123129 public static final int FED_WORKER_WAIT_S = 50 ;
124130
125131
@@ -1642,13 +1648,14 @@ protected Process startLocalFedWorker(int port){
16421648
16431649 /**
16441650 * Start a new JVM for a federated worker at the port.
1645- *
1646- * @param port Port to use for the JVM
1647- * @param sleep The sleep time to wait for the worker to start
1651+ *
1652+ * @param port Port to use for the JVM
1653+ * @param timeoutMs Upper bound on the wait for the worker to become ready, in ms; raised to a
1654+ * minimum value enforced inside {@link FederatedWorkerUtils}.
16481655 * @return The process containing the worker
16491656 */
1650- protected Process startLocalFedWorker (int port , int sleep ){
1651- return startLocalFedWorker (port , null , sleep );
1657+ protected Process startLocalFedWorker (int port , int timeoutMs ){
1658+ return startLocalFedWorker (port , null , timeoutMs );
16521659 }
16531660
16541661 /**
@@ -1665,18 +1672,64 @@ protected Process startLocalFedWorker(int port, String[] addArgs) {
16651672
16661673 /**
16671674 * Start new JVM for a federated worker at the port.
1668- *
1669- * @param port Port to use for the JVM
1670- * @param addArgs The arguments to add
1671- * @param sleep The time to wait for the process to start
1675+ *
1676+ * <p>Returns once the worker's TCP port accepts connections (the worker opens the port after
1677+ * Netty's bind completes), or throws a {@link RuntimeException} after {@code timeoutMs} elapses.
1678+ *
1679+ * @param port Port to use for the JVM
1680+ * @param addArgs The arguments to add
1681+ * @param timeoutMs Upper bound on the wait for the worker to become ready, in ms; raised to a
1682+ * minimum value enforced inside {@link FederatedWorkerUtils}.
16721683 * @return the process associated with the worker.
16731684 */
1674- protected static Process startLocalFedWorker (int port , String [] addArgs , int sleep ) {
1675- Process process = null ;
1685+ protected static Process startLocalFedWorker (int port , String [] addArgs , int timeoutMs ) {
1686+ Process process = spawnLocalFedWorker (port , addArgs );
1687+ FederatedWorkerUtils .waitForWorker (process , port , timeoutMs );
1688+ return process ;
1689+ }
1690+
1691+ /**
1692+ * Start N federated worker JVMs back to back, then wait for all of them to become ready in one
1693+ * shared poll loop. The wall-clock wait scales with the slowest worker rather than the sum of the
1694+ * per-worker waits.
1695+ *
1696+ * @param ports Ports to use, one per worker
1697+ * @return The process per port, in the same order as {@code ports}.
1698+ */
1699+ protected static Process [] startLocalFedWorkers (int [] ports ) {
1700+ return startLocalFedWorkers (ports , null , FED_WORKER_WAIT );
1701+ }
1702+
1703+ /** @see #startLocalFedWorkers(int[], String[], int) */
1704+ protected static Process [] startLocalFedWorkers (int [] ports , String [] addArgs ) {
1705+ return startLocalFedWorkers (ports , addArgs , FED_WORKER_WAIT );
1706+ }
1707+
1708+ /**
1709+ * Start N federated worker JVMs back to back, then wait for all of them to become ready in one
1710+ * shared poll loop.
1711+ *
1712+ * @param ports Ports to use, one per worker
1713+ * @param addArgs Extra worker CLI args (applied to every worker), or null
1714+ * @param timeoutMs Upper bound on the wait, in ms; raised to a minimum value enforced inside
1715+ * {@link FederatedWorkerUtils}.
1716+ * @return The process per port, in the same order as {@code ports}.
1717+ */
1718+ protected static Process [] startLocalFedWorkers (int [] ports , String [] addArgs , int timeoutMs ) {
1719+ Process [] processes = new Process [ports .length ];
1720+ for (int i = 0 ; i < ports .length ; i ++) {
1721+ processes [i ] = spawnLocalFedWorker (ports [i ], addArgs );
1722+ }
1723+ FederatedWorkerUtils .waitForWorkers (processes , ports , timeoutMs );
1724+ return processes ;
1725+ }
1726+
1727+ /** Spawn a federated worker JVM and return without waiting for the port to bind. */
1728+ private static Process spawnLocalFedWorker (int port , String [] addArgs ) {
16761729 String separator = System .getProperty ("file.separator" );
16771730 String classpath = System .getProperty ("java.class.path" );
16781731 String path = System .getProperty ("java.home" ) + separator + "bin" + separator + "java" ;
1679- String [] args = new String [] {path , "-Xmx1000m" , "-Xms1000m" , "-Xmn100m" ,
1732+ String [] args = new String [] {path , "-Xmx1000m" , "-Xms1000m" , "-Xmn100m" ,
16801733 "--add-opens=java.base/java.nio=ALL-UNNAMED" ,
16811734 "--add-opens=java.base/java.io=ALL-UNNAMED" ,
16821735 "--add-opens=java.base/java.util=ALL-UNNAMED" ,
@@ -1701,19 +1754,14 @@ protected static Process startLocalFedWorker(int port, String[] addArgs, int sle
17011754 DMLScript .class .getName (), "-w" , Integer .toString (port ), "-stats" });
17021755 if (addArgs != null )
17031756 args = ArrayUtils .addAll (args , addArgs );
1704-
1705- ProcessBuilder processBuilder = new ProcessBuilder (args ).inheritIO ();
17061757
1758+ ProcessBuilder processBuilder = new ProcessBuilder (args ).inheritIO ();
17071759 try {
1708- process = processBuilder .start ();
1709- // Give some time to startup the worker.
1710- sleep (sleep );
1760+ return processBuilder .start ();
17111761 }
1712- catch (IOException | InterruptedException e ) {
1713- e . printStackTrace ( );
1762+ catch (IOException e ) {
1763+ throw new RuntimeException ( "Failed to launch federated worker process on port " + port , e );
17141764 }
1715- isAlive (process );
1716- return process ;
17171765 }
17181766
17191767 /**
@@ -1743,7 +1791,7 @@ protected Process startLocalFedMonitoring(int port, String[] addArgs) {
17431791 }
17441792
17451793 /**
1746- * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.!
1794+ * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.
17471795 *
17481796 * Also when using the local Fed Worker thread the statistics printing, and clearing from the worker is disabled.
17491797 *
@@ -1769,63 +1817,112 @@ public static Thread startLocalFedWorkerThread(int port, String[] otherArgs) {
17691817 }
17701818
17711819 /**
1772- * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.!
1820+ * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.
17731821 *
17741822 * Also when using the local Fed Worker thread the statistics printing, and clearing from the worker is disabled.
17751823 *
1776- * @param port Port to use
1777- * @param sleep The amount of time to wait for the worker startup. in Milliseconds
1824+ * @param port Port to use
1825+ * @param timeoutMs Upper bound on the wait for the worker to become ready, in ms; raised to a
1826+ * minimum value enforced inside {@link FederatedWorkerUtils}.
17781827 * @return The thread associated with the worker.
17791828 */
1780- public static Thread startLocalFedWorkerThread (int port , int sleep ) {
1781- return startLocalFedWorkerThread (port , null , sleep );
1829+ public static Thread startLocalFedWorkerThread (int port , int timeoutMs ) {
1830+ return startLocalFedWorkerThread (port , null , timeoutMs );
17821831 }
17831832
17841833 /**
1785- * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.!
1786- *
1787- * Also when using the local Fed Worker thread the statistics printing, and clearing from the worker is disabled.
1788- *
1834+ * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.
1835+ *
1836+ * <p>Also when using the local Fed Worker thread the statistics printing, and clearing from the worker is
1837+ * disabled.
1838+ *
1839+ * <p>Returns once the worker's TCP port accepts connections (the worker opens the port after Netty's bind
1840+ * completes), or throws a {@link RuntimeException} after {@code timeoutMs} elapses.
1841+ *
17891842 * @param port Port to use
17901843 * @param otherArgs The command line arguments to start the worker with
1791- * @param sleep The amount of time to wait for the worker startup. in Milliseconds
1844+ * @param timeoutMs Upper bound on the wait for the worker to become ready, in ms; raised to a
1845+ * minimum value enforced inside {@link FederatedWorkerUtils}.
17921846 * @return The thread associated with the worker.
17931847 */
1794- public static Thread startLocalFedWorkerThread (int port , String [] otherArgs , int sleep ) {
1848+ public static Thread startLocalFedWorkerThread (int port , String [] otherArgs , int timeoutMs ) {
1849+ Thread t = spawnLocalFedWorkerThread (port , otherArgs );
1850+ FederatedWorkerUtils .waitForWorker (t , port , timeoutMs );
1851+ return t ;
1852+ }
17951853
1854+ /**
1855+ * Start N federated worker threads in the same JVM back to back, then wait for all of them to
1856+ * become ready in one shared poll loop. The wall-clock wait scales with the slowest worker rather
1857+ * than the sum of the per-worker waits.
1858+ *
1859+ * @param ports Ports to use, one per worker
1860+ * @return The thread per port, in the same order as {@code ports}.
1861+ */
1862+ public static Thread [] startLocalFedWorkerThreads (int [] ports ) {
1863+ return startLocalFedWorkerThreads (ports , null , FED_WORKER_WAIT );
1864+ }
1865+
1866+ /** @see #startLocalFedWorkerThreads(int[], String[], int) */
1867+ public static Thread [] startLocalFedWorkerThreads (int [] ports , String [] otherArgs ) {
1868+ return startLocalFedWorkerThreads (ports , otherArgs , FED_WORKER_WAIT );
1869+ }
1870+
1871+ /**
1872+ * Start N federated worker threads in the same JVM back to back, then wait for all of them to
1873+ * become ready in one shared poll loop.
1874+ *
1875+ * @param ports Ports to use, one per worker
1876+ * @param otherArgs Extra worker CLI args (applied to every worker), or null
1877+ * @param timeoutMs Upper bound on the wait, in ms; raised to a minimum value enforced inside
1878+ * {@link FederatedWorkerUtils}.
1879+ * @return The thread per port, in the same order as {@code ports}.
1880+ */
1881+ public static Thread [] startLocalFedWorkerThreads (int [] ports , String [] otherArgs , int timeoutMs ) {
1882+ Thread [] threads = new Thread [ports .length ];
1883+ for (int i = 0 ; i < ports .length ; i ++) {
1884+ threads [i ] = spawnLocalFedWorkerThread (ports [i ], otherArgs );
1885+ // Sleep THREAD_SPAWN_STAGGER_MS between in-JVM thread spawns to reduce contention on
1886+ // shared static initialization in DMLScript / FederatedWorker (e.g. LineageCacheConfig
1887+ // setters) when multiple worker threads enter main() concurrently.
1888+ if (i + 1 < ports .length ) {
1889+ try {
1890+ java .util .concurrent .TimeUnit .MILLISECONDS .sleep (THREAD_SPAWN_STAGGER_MS );
1891+ }
1892+ catch (InterruptedException e ) {
1893+ Thread .currentThread ().interrupt ();
1894+ throw new RuntimeException ("Interrupted while spawning federated worker threads" , e );
1895+ }
1896+ }
1897+ }
1898+ FederatedWorkerUtils .waitForWorkers (threads , ports , timeoutMs );
1899+ return threads ;
1900+ }
1901+
1902+ private static final int THREAD_SPAWN_STAGGER_MS = 25 ;
1903+
1904+ /** Spawn a federated worker thread in this JVM and return without waiting for the port to bind. */
1905+ private static Thread spawnLocalFedWorkerThread (int port , String [] otherArgs ) {
17961906 ArrayList <String > args = new ArrayList <>();
1797-
17981907 args .add ("-w" );
17991908 args .add (Integer .toString (port ));
1800-
18011909 if (otherArgs != null )
1802- for ( String s : otherArgs )
1910+ for (String s : otherArgs )
18031911 args .add (s );
18041912
18051913 String [] finalArguments = args .toArray (new String [args .size ()]);
18061914 Statistics .allowWorkerStatistics = false ;
18071915
1808- try {
1809- Thread t = new Thread (() -> {
1810- try {
1811- main (finalArguments );
1812- }
1813- catch (Exception e ) {
1814- LOG .error ("Exception in startup of federated worker" , e );
1815- }
1816- });
1817- t .start ();
1818- java .util .concurrent .TimeUnit .MILLISECONDS .sleep (sleep );
1819- if (!t .isAlive ())
1820- throw new RuntimeException ("Failed starting federated worker" );
1821- return t ;
1822- }
1823- catch (InterruptedException e ) {
1824- e .printStackTrace ();
1825- fail ("Failed to start federated worker : " + e );
1826- // should never happen
1827- return null ;
1828- }
1916+ Thread t = new Thread (() -> {
1917+ try {
1918+ main (finalArguments );
1919+ }
1920+ catch (Exception e ) {
1921+ LOG .error ("Exception in startup of federated worker" , e );
1922+ }
1923+ });
1924+ t .start ();
1925+ return t ;
18291926 }
18301927
18311928 public static boolean isAlive (Thread ... threads ){
@@ -1846,28 +1943,43 @@ public static boolean isAlive(Process... processes) {
18461943
18471944 /**
18481945 * Start java worker in same JVM.
1849- *
1946+ *
1947+ * <p>Returns once the worker's TCP port accepts connections (the worker opens the port after
1948+ * Netty's bind completes), or throws a {@link RuntimeException} after the default federated worker
1949+ * timeout elapses. The port is extracted from {@code args}, which must contain {@code "-w" <port>}.
1950+ *
18501951 * @param args the command line arguments
1851- * @return the thread associated with the process.s
1952+ * @return the thread associated with the worker.
18521953 */
18531954 public static Thread startLocalFedWorkerWithArgs (String [] args ) {
1854- Thread t = null ;
1955+ final int port = extractWorkerPort (args );
1956+ Thread t = new Thread (() -> {
1957+ try {
1958+ main (args );
1959+ }
1960+ catch (IOException e ) {
1961+ LOG .error ("Exception in startup of federated worker on port " + port , e );
1962+ }
1963+ });
1964+ t .start ();
1965+ FederatedWorkerUtils .waitForWorker (t , port , FED_WORKER_WAIT );
1966+ return t ;
1967+ }
18551968
1856- try {
1857- t = new Thread (() -> {
1969+ private static int extractWorkerPort (String [] args ) {
1970+ for (int i = 0 ; i < args .length - 1 ; i ++) {
1971+ if ("-w" .equals (args [i ])) {
18581972 try {
1859- main (args );
1973+ return Integer . parseInt (args [ i + 1 ] );
18601974 }
1861- catch (IOException e ) {
1975+ catch (NumberFormatException e ) {
1976+ throw new IllegalArgumentException (
1977+ "Federated worker args contain non-numeric port after -w: " + args [i + 1 ], e );
18621978 }
1863- });
1864- t .start ();
1865- java .util .concurrent .TimeUnit .MILLISECONDS .sleep (FED_WORKER_WAIT );
1866- }
1867- catch (InterruptedException e ) {
1868- // Should happen at closing of the worker so don't print
1979+ }
18691980 }
1870- return t ;
1981+ throw new IllegalArgumentException ("Federated worker args must contain '-w <port>': "
1982+ + Arrays .toString (args ));
18711983 }
18721984
18731985 private boolean rCompareException (boolean exceptionExpected , String errMessage , Throwable e , boolean result ) {
0 commit comments