Skip to content

Commit 88c26e2

Browse files
authored
[SYSTEMDS-2651] Poll for async compression in federated component tests (#2472)
FedWorkerReadMatrixCompress.verifyRead failed roughly once per ten component-test CI runs because it called FederatedTestUtils.wait(1000) to give the worker time to finish its async compression (kicked off by CompressedMatrixBlockFactory.compressAsync), then asserted that the returned block was a CompressedMatrixBlock. On a contended runner the 1 s sleep was not enough, the subsequent read returned the still-uncompressed block, and the assertion failed. Surefire's rerunFailingTestsCount=2 hid this as a "Flake" rather than a job failure. Add FedWorkerBase.awaitCompressed(long id), which polls getMatrixBlock at 25 ms intervals for up to COMPRESS_TIMEOUT_MS (10 s) and returns as soon as the worker reports the compressed form, or returns the last-observed block on timeout so the caller's assertion still produces a meaningful failure. Convert the three call sites that used the fixed-sleep anti-pattern: - FedWorkerReadMatrixCompress.verifyRead (the actual CI flake) - FedWorkerMatrixCompress.verifySameOrAlsoCompressedAsLocalCompress (polls only when local compresses, so the "do not compress" parametrization stays fast) - FedWorkerMatrixMultiplyWorkload.verifySameOrAlsoCompressedAsLocalCompress Remove the now-unused FederatedTestUtils.wait helper so the anti-pattern is harder to reintroduce.
1 parent 34a19f0 commit 88c26e2

5 files changed

Lines changed: 54 additions & 26 deletions

File tree

src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,19 @@
2626

2727
import org.apache.commons.logging.Log;
2828
import org.apache.commons.logging.LogFactory;
29+
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
2930
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3031
import org.apache.sysds.test.AutomatedTestBase;
3132

3233
public abstract class FedWorkerBase {
3334
protected static final Log LOG = LogFactory.getLog(FedWorkerBase.class.getName());
3435

36+
/** Upper bound (ms) for {@link #awaitCompressed(long)} polling against async worker-side compression. */
37+
protected static final int COMPRESS_TIMEOUT_MS = 10_000;
38+
39+
/** Poll interval used by {@link #awaitCompressed(long)} between successive reads. */
40+
private static final int COMPRESS_POLL_INTERVAL_MS = 25;
41+
3542
private final InetSocketAddress addr;
3643
public final int port;
3744

@@ -70,6 +77,38 @@ public MatrixBlock getMatrixBlock(long id) {
7077
return FederatedTestUtils.getMatrixBlock(id, addr);
7178
}
7279

80+
/**
81+
* Poll the federated worker until the matrix at {@code id} is observed as a
82+
* {@link CompressedMatrixBlock}, or {@link #COMPRESS_TIMEOUT_MS} elapses.
83+
*
84+
* <p>Federated workers compress asynchronously after a PUT/READ_VAR (see
85+
* {@code CompressedMatrixBlockFactory.compressAsync}), so a {@code getMatrixBlock} fired right
86+
* after the operation can race against the in-flight compression and return the uncompressed
87+
* block. Tests that need to observe the compressed form should poll instead of sleeping a fixed
88+
* amount.
89+
*
90+
* <p>On timeout this returns the most recent (uncompressed) read so the caller can produce a
91+
* meaningful assertion failure naming the variable.
92+
*
93+
* @param id federated variable id
94+
* @return the matrix block, compressed if compression finished in time, otherwise the latest read
95+
*/
96+
public MatrixBlock awaitCompressed(long id) {
97+
final long deadline = System.currentTimeMillis() + COMPRESS_TIMEOUT_MS;
98+
MatrixBlock mb = getMatrixBlock(id);
99+
while(!(mb instanceof CompressedMatrixBlock) && System.currentTimeMillis() < deadline) {
100+
try {
101+
Thread.sleep(COMPRESS_POLL_INTERVAL_MS);
102+
}
103+
catch(InterruptedException ie) {
104+
Thread.currentThread().interrupt();
105+
fail("Interrupted while waiting for federated compression of id=" + id);
106+
}
107+
mb = getMatrixBlock(id);
108+
}
109+
return mb;
110+
}
111+
73112
public long matrixMult(long idLeft, long idRight) {
74113
return FederatedTestUtils.exec_MM(idLeft, idRight, addr);
75114
}

src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ public void verifySameOrAlsoCompressedAsLocalCompress() {
6565
// local
6666
final MatrixBlock mbcLocal = CompressedMatrixBlockFactory.compress(mb).getLeft();
6767

68-
// federated
68+
// federated. Compression on the worker is async; poll only when we expect compression to
69+
// match the local result, otherwise a single read is enough.
6970
final long id = putMatrixBlock(mb);
70-
// give the federated site time to compress async.
71-
FederatedTestUtils.wait(1000);
72-
final MatrixBlock mbr = getMatrixBlock(id);
71+
final MatrixBlock mbr = (mbcLocal instanceof CompressedMatrixBlock)
72+
? awaitCompressed(id)
73+
: getMatrixBlock(id);
7374

7475
if(mbcLocal instanceof CompressedMatrixBlock && !(mbr instanceof CompressedMatrixBlock))
75-
fail("Invalid result, the federated site did not compress the matrix block");
76+
fail("Invalid result, the federated site did not compress the matrix block within "
77+
+ COMPRESS_TIMEOUT_MS + "ms");
7678

7779
TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr, 0, 0,
7880
"Not equivalent matrix block returned from federated site");

src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,16 @@ public void verifySameOrAlsoCompressedAsLocalCompress() {
8888
for(int i = 0; i < 9; i++) // chain left side compressed multiplications with idr.
8989
ide = matrixMult(ide, idr);
9090

91-
// give the federated site time to compress async (it should already be done, but just to be safe).
92-
FederatedTestUtils.wait(1000);
93-
94-
// Get back the matrix block stored behind mbr that should be compressed now.
95-
final MatrixBlock mbr_compressed = getMatrixBlock(idr);
91+
// Workload-driven compression runs async on the worker; poll instead of sleeping a fixed
92+
// amount so a slow runner doesn't observe the still-uncompressed block.
93+
final MatrixBlock mbr_compressed = awaitCompressed(idr);
9694

9795
if(!(mbr_compressed instanceof CompressedMatrixBlock))
98-
fail("Invalid result, the federated site did not compress the matrix block based on workload");
96+
fail("Invalid result, the federated site did not compress the matrix block based on workload within "
97+
+ COMPRESS_TIMEOUT_MS + "ms");
9998

10099
TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr_compressed, 0, 0,
101100
"Not equivalent matrix block returned from federated site");
102101
}
103102

104-
105-
106103
}

src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,14 @@ public FedWorkerReadMatrixCompress(int port, String path) {
6565
public void verifyRead() {
6666
MatrixBlock expected = readCSV();
6767
Long id = readMatrix(path);
68-
// give the federated site time to compress async.
69-
FederatedTestUtils.wait(1000);
70-
MatrixBlock actual = getMatrixBlock(id);
68+
// Compression happens async on the worker; poll instead of sleeping a fixed amount.
69+
MatrixBlock actual = awaitCompressed(id);
7170
if(actual instanceof CompressedMatrixBlock){
7271
TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0,
7372
"Not equivalent matrix block read from federated site");
7473
}
7574
else
76-
fail("Did not compress the matrix input");
75+
fail("Did not compress the matrix input within " + COMPRESS_TIMEOUT_MS + "ms");
7776
}
7877

7978
protected MatrixBlock readCSV() {

src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,4 @@ private static void exec(long id, String inst, InetSocketAddress addr, int timeo
190190
fail("Failed to get response from put Matrix Block");
191191
}
192192
}
193-
194-
protected static void wait(int ms) {
195-
try {
196-
Thread.sleep(ms);
197-
}
198-
catch(Exception e) {
199-
fail("Failed to wait");
200-
}
201-
}
202193
}

0 commit comments

Comments
 (0)