From ca4e71f2dcd4829269cf9b9dd817e354df9ee12f Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Tue, 26 May 2026 13:16:40 +0000 Subject: [PATCH] [SYSTEMDS-2651] Poll for async compression in federated component tests FedWorkerReadMatrixCompress.verifyRead failed roughly once per ten component-test CI runs because it called FederatedTestUtils.wait(1000) to give the worker time to finish its async compression (kicked off by CompressedMatrixBlockFactory.compressAsync), then asserted that the returned block was a CompressedMatrixBlock. On a contended runner the 1 s sleep was not enough, the subsequent read returned the still-uncompressed block, and the assertion failed. Surefire's rerunFailingTestsCount=2 hid this as a "Flake" rather than a job failure. Add FedWorkerBase.awaitCompressed(long id), which polls getMatrixBlock at 25 ms intervals for up to COMPRESS_TIMEOUT_MS (10 s) and returns as soon as the worker reports the compressed form, or returns the last-observed block on timeout so the caller's assertion still produces a meaningful failure. Convert the three call sites that used the fixed-sleep anti-pattern: - FedWorkerReadMatrixCompress.verifyRead (the actual CI flake) - FedWorkerMatrixCompress.verifySameOrAlsoCompressedAsLocalCompress (polls only when local compresses, so the "do not compress" parametrization stays fast) - FedWorkerMatrixMultiplyWorkload.verifySameOrAlsoCompressedAsLocalCompress Remove the now-unused FederatedTestUtils.wait helper so the anti-pattern is harder to reintroduce. 
--- .../component/federated/FedWorkerBase.java | 39 +++++++++++++++++++ .../federated/FedWorkerMatrixCompress.java | 12 +++--- .../FedWorkerMatrixMultiplyWorkload.java | 13 +++---- .../FedWorkerReadMatrixCompress.java | 7 ++-- .../federated/FederatedTestUtils.java | 9 ----- 5 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java index 1bf5d330066..2c854b4a81b 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java @@ -26,12 +26,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.test.AutomatedTestBase; public abstract class FedWorkerBase { protected static final Log LOG = LogFactory.getLog(FedWorkerBase.class.getName()); + /** Upper bound (ms) for {@link #awaitCompressed(long)} polling against async worker-side compression. */ + protected static final int COMPRESS_TIMEOUT_MS = 10_000; + + /** Poll interval used by {@link #awaitCompressed(long)} between successive reads. */ + private static final int COMPRESS_POLL_INTERVAL_MS = 25; + private final InetSocketAddress addr; public final int port; @@ -70,6 +77,38 @@ public MatrixBlock getMatrixBlock(long id) { return FederatedTestUtils.getMatrixBlock(id, addr); } + /** + * Poll the federated worker until the matrix at {@code id} is observed as a + * {@link CompressedMatrixBlock}, or {@link #COMPRESS_TIMEOUT_MS} elapses. + * + *

Federated workers compress asynchronously after a PUT/READ_VAR (see + * {@code CompressedMatrixBlockFactory.compressAsync}), so a {@code getMatrixBlock} fired right + * after the operation can race against the in-flight compression and return the uncompressed + * block. Tests that need to observe the compressed form should poll instead of sleeping a fixed + * amount. + * + *

On timeout this returns the most recent (uncompressed) read so the caller can produce a + * meaningful assertion failure naming the variable. + * + * @param id federated variable id + * @return the matrix block, compressed if compression finished in time, otherwise the latest read + */ + public MatrixBlock awaitCompressed(long id) { + final long deadline = System.currentTimeMillis() + COMPRESS_TIMEOUT_MS; + MatrixBlock mb = getMatrixBlock(id); + while(!(mb instanceof CompressedMatrixBlock) && System.currentTimeMillis() < deadline) { + try { + Thread.sleep(COMPRESS_POLL_INTERVAL_MS); + } + catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + fail("Interrupted while waiting for federated compression of id=" + id); + } + mb = getMatrixBlock(id); + } + return mb; + } + public long matrixMult(long idLeft, long idRight) { return FederatedTestUtils.exec_MM(idLeft, idRight, addr); } diff --git a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java index 29c6f94e7a3..2b5ff327ef3 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java @@ -65,14 +65,16 @@ public void verifySameOrAlsoCompressedAsLocalCompress() { // local final MatrixBlock mbcLocal = CompressedMatrixBlockFactory.compress(mb).getLeft(); - // federated + // federated. Compression on the worker is async; poll only when we expect compression to + // match the local result, otherwise a single read is enough. final long id = putMatrixBlock(mb); - // give the federated site time to compress async. - FederatedTestUtils.wait(1000); - final MatrixBlock mbr = getMatrixBlock(id); + final MatrixBlock mbr = (mbcLocal instanceof CompressedMatrixBlock) + ? 
awaitCompressed(id) + : getMatrixBlock(id); if(mbcLocal instanceof CompressedMatrixBlock && !(mbr instanceof CompressedMatrixBlock)) - fail("Invalid result, the federated site did not compress the matrix block"); + fail("Invalid result, the federated site did not compress the matrix block within " + + COMPRESS_TIMEOUT_MS + "ms"); TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr, 0, 0, "Not equivalent matrix block returned from federated site"); diff --git a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java index 06a193368c1..59c9a093c40 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java @@ -88,19 +88,16 @@ public void verifySameOrAlsoCompressedAsLocalCompress() { for(int i = 0; i < 9; i++) // chain left side compressed multiplications with idr. ide = matrixMult(ide, idr); - // give the federated site time to compress async (it should already be done, but just to be safe). - FederatedTestUtils.wait(1000); - - // Get back the matrix block stored behind mbr that should be compressed now. - final MatrixBlock mbr_compressed = getMatrixBlock(idr); + // Workload-driven compression runs async on the worker; poll instead of sleeping a fixed + // amount so a slow runner doesn't observe the still-uncompressed block. 
+ final MatrixBlock mbr_compressed = awaitCompressed(idr); if(!(mbr_compressed instanceof CompressedMatrixBlock)) - fail("Invalid result, the federated site did not compress the matrix block based on workload"); + fail("Invalid result, the federated site did not compress the matrix block based on workload within " + + COMPRESS_TIMEOUT_MS + "ms"); TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr_compressed, 0, 0, "Not equivalent matrix block returned from federated site"); } - - } diff --git a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java index ed47a87e1e8..d94cd367a1d 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java @@ -65,15 +65,14 @@ public FedWorkerReadMatrixCompress(int port, String path) { public void verifyRead() { MatrixBlock expected = readCSV(); Long id = readMatrix(path); - // give the federated site time to compress async. - FederatedTestUtils.wait(1000); - MatrixBlock actual = getMatrixBlock(id); + // Compression happens async on the worker; poll instead of sleeping a fixed amount. 
+ MatrixBlock actual = awaitCompressed(id); if(actual instanceof CompressedMatrixBlock){ TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0, "Not equivalent matrix block read from federated site"); } else - fail("Did not compress the matrix input"); + fail("Did not compress the matrix input within " + COMPRESS_TIMEOUT_MS + "ms"); } protected MatrixBlock readCSV() { diff --git a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java index 4d3796892aa..9b589c35f7d 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java @@ -190,13 +190,4 @@ private static void exec(long id, String inst, InetSocketAddress addr, int timeo fail("Failed to get response from put Matrix Block"); } } - - protected static void wait(int ms) { - try { - Thread.sleep(ms); - } - catch(Exception e) { - fail("Failed to wait"); - } - } }