From 50c342d5232e32efd04b76c1bb7937e7c502d443 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Fri, 12 Jun 2026 12:48:25 -0700 Subject: [PATCH 1/7] Enable ChaosTest + add toxiProxyTest --- build.gradle.kts | 2 +- gradle/libs.versions.toml | 1 + transact/build.gradle.kts | 1 + .../dev/dbos/transact/database/ChaosTest.java | 114 ++++++++++++++---- 4 files changed, 94 insertions(+), 24 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index edfb1ff0..8d4f22e1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -165,7 +165,7 @@ subprojects { tasks.withType { javaLauncher.set(null as JavaLauncher?) } tasks.withType { - useJUnitPlatform() + useJUnitPlatform { if (System.getenv("CI") != "true") excludeTags("ci-only") } testLogging { events("failed") showStandardStreams = true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2c178dc9..4958a1af 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -78,6 +78,7 @@ sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" } system-stubs-jupiter = { module = "uk.org.webcompere:system-stubs-jupiter", version.ref = "system-stubs" } testcontainers-cockroachdb = { module = "org.testcontainers:testcontainers-cockroachdb", version.ref = "testcontainers" } testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" } +testcontainers-toxiproxy = { module = "org.testcontainers:testcontainers-toxiproxy", version.ref = "testcontainers" } [plugins] kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } diff --git a/transact/build.gradle.kts b/transact/build.gradle.kts index 0d756fa2..49744825 100644 --- a/transact/build.gradle.kts +++ b/transact/build.gradle.kts @@ -32,6 +32,7 @@ dependencies { testImplementation(libs.maven.artifact) testImplementation(libs.testcontainers.cockroachdb) testImplementation(libs.testcontainers.postgresql) + testImplementation(libs.testcontainers.toxiproxy) } val projectVersion = project.version.toString() diff --git a/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java b/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java index c9799da8..da2e8e14 100644 --- a/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java @@ -4,18 +4,27 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.migrations.MigrationManager; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; import dev.dbos.transact.workflow.Workflow; import java.sql.SQLException; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import javax.sql.DataSource; -import org.junit.jupiter.api.AutoClose; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import eu.rekawek.toxiproxy.model.ToxicDirection; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.toxiproxy.ToxiproxyContainer; interface ChaosService { @@ -103,40 +112,99 @@ WHERE pid <> pg_backend_pid() } } -// TODO: finish this test, run it many times but only in cloud -// Tracking issue: https://github.com/dbos-inc/dbos-transact-java/issues/319 +@Tag("ci-only") public class ChaosTest { private static final Logger logger = LoggerFactory.getLogger(ChaosTest.class); + private static final int REPETITIONS = 5; - @AutoClose final PgContainer pgContainer = new PgContainer(); - - // @Test + @RepeatedTest(REPETITIONS) public void chaosTest() throws Exception { - var dbosConfig = pgContainer.dbosConfig(); - try (var dataSource = pgContainer.dataSource(); - var dbos = new DBOS(dbosConfig)) { + try (var pgContainer = new PgContainer()) { + var dbosConfig = pgContainer.dbosConfig(); + try (var dataSource = pgContainer.dataSource(); + var dbos = new DBOS(dbosConfig)) { + + var impl = new ChaosServiceImpl(dbos, dataSource); + var proxy = dbos.registerProxy(ChaosService.class, impl); + impl.setSelf(proxy); - var impl = new ChaosServiceImpl(dbos, dataSource); - var proxy = dbos.registerProxy(ChaosService.class, impl); - impl.setSelf(proxy); + dbos.launch(); + DBOSTestAccess.getSystemDatabase(dbos).speedUpPollingForTest(); - dbos.launch(); - DBOSTestAccess.getSystemDatabase(dbos).speedUpPollingForTest(); + assertEquals("Hehehe", proxy.dbLossBetweenSteps()); - assertEquals("Hehehe", proxy.dbLossBetweenSteps()); + assertEquals("Hehehe", proxy.runChildWf()); - assertEquals("Hehehe", proxy.runChildWf()); + var h1 = dbos.startWorkflow(() -> proxy.wfPart1()); + var h2 = dbos.startWorkflow(() -> proxy.wfPart2(h1.workflowId())); - var h1 = dbos.startWorkflow(() -> proxy.wfPart1()); - var h2 = dbos.startWorkflow(() -> proxy.wfPart2(h1.workflowId())); + if (!"Part1hello1".equals(h1.getResult()) || !"Part2v1".equals(h2.getResult())) { + logWorkflowDetails(dataSource, "Part 1", h1.workflowId()); + logWorkflowDetails(dataSource, "Part 2", h2.workflowId()); + } - if (!"Part1hello1".equals(h1.getResult()) || !"Part2v1".equals(h2.getResult())) { - logWorkflowDetails(dataSource, "Part 1", h1.workflowId()); - logWorkflowDetails(dataSource, "Part 2", h2.workflowId()); + assertEquals("Part1hello1", h1.getResult()); + assertEquals("Part2v1", h2.getResult()); } + } + } - assertEquals("Part1hello1", h1.getResult()); - assertEquals("Part2v1", h2.getResult()); + @RepeatedTest(REPETITIONS) + public void toxiProxyTest() throws Exception { + try (var network = Network.newNetwork(); + var pg = PgContainer.getPG()) { + + pg.withNetwork(network).withNetworkAliases("postgres"); + pg.start(); + + var directUrl = pg.getJdbcUrl().replaceFirst("/[^/]+$", "/dbos_test_db"); + MigrationManager.runMigrations(directUrl, pg.getUsername(), pg.getPassword(), "dbos", true); + + try (var toxiContainer = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.9.0")) { + toxiContainer.withNetwork(network); + toxiContainer.start(); + + var toxiClient = + new ToxiproxyClient(toxiContainer.getHost(), toxiContainer.getControlPort()); + var proxy = toxiClient.createProxy("pg", "0.0.0.0:8666", "postgres:5432"); + var proxiedUrl = + "jdbc:postgresql://%s:%d/dbos_test_db" + .formatted(toxiContainer.getHost(), toxiContainer.getMappedPort(8666)); + + var dbosConfig = + DBOSConfig.defaults("chaos-toxi-test") + .withDatabaseUrl(proxiedUrl) + .withDbUser(pg.getUsername()) + .withDbPassword(pg.getPassword()); + + try (var directDs = + dev.dbos.transact.database.SystemDatabase.createDataSource( + directUrl, pg.getUsername(), pg.getPassword()); + var dbos = new DBOS(dbosConfig)) { + + var impl = new ChaosServiceImpl(dbos, directDs); + var svc = dbos.registerProxy(ChaosService.class, impl); + impl.setSelf(svc); + dbos.launch(); + DBOSTestAccess.getSystemDatabase(dbos).speedUpPollingForTest(); + + // Scenario 1: proxy disabled — simulates a sustained network partition + proxy.disable(); + var wf1 = + CompletableFuture.supplyAsync( + () -> dbos.startWorkflow(() -> svc.dbLossBetweenSteps())); + Thread.sleep(3000); + proxy.enable(); + assertEquals("Hehehe", wf1.get(10, TimeUnit.SECONDS).getResult()); + + // Scenario 2: TCP RST mid-execution — simulates a flapping connection + var wf2 = dbos.startWorkflow(() -> svc.dbLossBetweenSteps()); + proxy.toxics().resetPeer("reset", ToxicDirection.DOWNSTREAM, 0); + Thread.sleep(3000); + proxy.toxics().get("reset").remove(); + assertEquals("Hehehe", wf2.getResult()); + } + } } } From b53efddfe8f603be0b1739b9a99a9861a4dbd427 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Fri, 12 Jun 2026 12:51:26 -0700 Subject: [PATCH 2/7] @Tag for scale test --- .../test/java/dev/dbos/transact/execution/ScaleTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java b/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java index bcbbf4eb..a111df23 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/ScaleTest.java @@ -14,8 +14,9 @@ import org.junit.jupiter.api.AutoClose; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,8 @@ public String workflow(String input) { } } -@org.junit.jupiter.api.Timeout(value = 5, unit = java.util.concurrent.TimeUnit.MINUTES) +@Tag("ci-only") +@Timeout(value = 5, unit = java.util.concurrent.TimeUnit.MINUTES) public class ScaleTest { private static final Logger logger = LoggerFactory.getLogger(ScaleTest.class); @@ -51,7 +53,6 @@ void setUp() { } @Test - @EnabledIfEnvironmentVariable(named = "SCALE_TEST", matches = "^true$") public void scaleTest() throws Exception { try (var dbos = new DBOS(dbosConfig)) { var service = dbos.registerProxy(ScaleService.class, new ScaleServiceImpl()); From 26cfe9e3b3c04371510b34d99dd6759d58eb9f7b Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 15 Jun 2026 12:16:54 -0700 Subject: [PATCH 3/7] update retry logic --- .../transact/database/SystemDatabase.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index fd46ab8c..b0dbb704 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -41,6 +41,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.sql.DataSource; @@ -253,14 +254,14 @@ private static boolean isConnectionFailure(SQLException e) { private static boolean isTransientState(SQLException e) { String state = e.getSQLState(); - return state != null && (state.startsWith("40") || state.equals("53300")); + return state != null && (state.startsWith("40") || state.startsWith("53")); } - private static void waitForRecovery(int attempt, long baseDelay) { + private static void sleepWithJitter(double baseMs) { + double jitter = 0.5 + ThreadLocalRandom.current().nextDouble(); // [0.5, 1.5) + long sleepMs = (long) (baseMs * jitter); try { - // Exponential backoff: 1x, 2x, 4x the base delay - long sleepTime = (long) (baseDelay * Math.pow(2, attempt - 1)); - Thread.sleep(sleepTime); + Thread.sleep(sleepMs); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } @@ -285,7 +286,8 @@ interface SqlSupplier { } private T dbRetry(SqlSupplier supplier) { - final int MAX_RETRIES = 20; + double backoffMs = 1000.0; + final double maxBackoffMs = 60_000.0; int attempt = 0; while (true) { if (closed.get()) { @@ -294,22 +296,20 @@ private T dbRetry(SqlSupplier supplier) { try { return supplier.get(); } catch (SQLException e) { - if (++attempt > MAX_RETRIES) { - String msg = "Database operation failed after %d attempts".formatted(attempt); - throw new RuntimeException(msg, e); - } + attempt++; if (e instanceof SQLRecoverableException || isConnectionFailure(e)) { - logger.warn("Recoverable connection error. Resetting client pool.", e); + logger.warn( + "Recoverable connection error (attempt {}), resetting client pool", attempt, e); if (ctx.dataSource() instanceof HikariDataSource hikariDataSource) { hikariDataSource.getHikariPoolMXBean().softEvictConnections(); } - waitForRecovery(attempt, 2000); } else if (e instanceof SQLTransientException || isTransientState(e)) { - logger.warn("Transient DB error. Retrying command.", e); - waitForRecovery(attempt, 500); + logger.warn("Transient DB error (attempt {}), retrying", attempt, e); } else { throw new RuntimeException(e); } + sleepWithJitter(backoffMs); + backoffMs = Math.min(backoffMs * 2, maxBackoffMs); } } } From 7de82e5f14d020e7e9747e6875b899cf7d0a424a Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 15 Jun 2026 15:14:28 -0700 Subject: [PATCH 4/7] reliability improvements --- .../transact/database/SystemDatabase.java | 20 ++++++++++++++++++- .../dev/dbos/transact/database/ChaosTest.java | 3 +++ .../dbos/transact/queue/StaticQueuesTest.java | 3 ++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 7c0ba8b8..7d218a8e 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -227,7 +227,25 @@ public void start() { private static boolean isConnectionFailure(SQLException e) { String state = e.getSQLState(); - return state != null && (state.startsWith("08") || state.startsWith("57")); + if (state != null && (state.startsWith("08") || state.startsWith("57"))) { + return true; + } + // HikariCP and JDBC throw connection errors without a SQLSTATE (e.g. "Connection is closed"). + // Walk the cause chain so wrapped exceptions are also caught. + for (Throwable t = e; t != null; t = t.getCause()) { + String msg = t.getMessage(); + if (msg != null) { + String lower = msg.toLowerCase(); + if (lower.contains("connection is closed") + || lower.contains("connection is not available") + || lower.contains("connection reset") + || lower.contains("broken pipe") + || lower.contains("socket closed")) { + return true; + } + } + } + return false; } private static boolean isTransientState(SQLException e) { diff --git a/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java b/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java index 109ae2bf..467e9651 100644 --- a/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/ChaosTest.java @@ -1,6 +1,7 @@ package dev.dbos.transact.database; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import dev.dbos.transact.DBOS; import dev.dbos.transact.config.DBOSConfig; @@ -118,6 +119,8 @@ public class ChaosTest { @RepeatedTest(REPETITIONS) public void chaosTest() throws Exception { + assumeFalse( + PgContainer.USE_COCKROACH_DB, "pg_terminate_backend() not supported on CockroachDB"); try (var pgContainer = new PgContainer()) { var dbosConfig = pgContainer.dbosConfig(); try (var dataSource = pgContainer.dataSource(); diff --git a/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java index fd6dc4be..8bb6586b 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java @@ -377,7 +377,8 @@ public void testLimiter() throws Exception { times.add(result); } - double waveTolerance = 1.0; + // CockroachDB's slower transaction throughput can spread tasks across a wider window + double waveTolerance = PgContainer.USE_COCKROACH_DB ? 3.0 : 1.0; for (int wave = 0; wave < numWaves; wave++) { for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) { double diff = times.get(i + 1) - times.get(i); From c7a0c6ddf843f54ac5ed259557cce4a3903f3c33 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 15 Jun 2026 16:01:25 -0700 Subject: [PATCH 5/7] update timeout for timeout tests --- .../test/java/dev/dbos/transact/workflow/TimeoutTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java b/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java index 9f1ea3cf..0f902671 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/TimeoutTest.java @@ -54,7 +54,7 @@ public void async() throws Exception { String wfid1 = "wf-124"; String result; - var options = new StartWorkflowOptions(wfid1).withTimeout(3, TimeUnit.SECONDS); + var options = new StartWorkflowOptions(wfid1).withTimeout(15, TimeUnit.SECONDS); var handle = dbos.startWorkflow(() -> simpleService.longWorkflow("12345"), options); result = handle.getResult(); assertEquals("1234512345", result); @@ -130,7 +130,7 @@ public void queued() throws Exception { String result; var options = - new StartWorkflowOptions(wfid1).withQueue(simpleQ).withTimeout(3, TimeUnit.SECONDS); + new StartWorkflowOptions(wfid1).withQueue(simpleQ).withTimeout(15, TimeUnit.SECONDS); WorkflowHandle handle = dbos.startWorkflow(() -> simpleService.longWorkflow("12345"), options); @@ -187,7 +187,7 @@ public void sync() throws Exception { String wfid1 = "wf-128"; String result; - WorkflowOptions options = new WorkflowOptions(wfid1).withTimeout(3, TimeUnit.SECONDS); + WorkflowOptions options = new WorkflowOptions(wfid1).withTimeout(15, TimeUnit.SECONDS); try (var id = options.setContext()) { result = simpleService.longWorkflow("12345"); @@ -244,7 +244,7 @@ public void recovery() throws Exception { String wfid1 = "wf-128"; - WorkflowOptions options = new WorkflowOptions(wfid1).withTimeout(3, TimeUnit.SECONDS); + WorkflowOptions options = new WorkflowOptions(wfid1).withTimeout(15, TimeUnit.SECONDS); try (var id = options.setContext()) { simpleService.workWithString("12345"); From 879145a2bcca9419e9379a4d7edce20cb9c66cfa Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 15 Jun 2026 16:21:16 -0700 Subject: [PATCH 6/7] moar test timeout fixes --- .../java/dev/dbos/transact/database/SignalMapTest.java | 8 ++++---- .../transact/notifications/NotificationServiceTest.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java b/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java index 1a94d7c4..dcde5f38 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java @@ -220,7 +220,7 @@ void testMultipleSubscribersInSeparateThreads() throws Exception { CompletableFuture.runAsync( () -> { try { - done1.complete(map.subscribe(FOO).get(500, TimeUnit.MILLISECONDS)); + done1.complete(map.subscribe(FOO).get(5, TimeUnit.SECONDS)); } catch (Exception e) { done1.completeExceptionally(e); } @@ -228,17 +228,17 @@ void testMultipleSubscribersInSeparateThreads() throws Exception { CompletableFuture.runAsync( () -> { try { - done2.complete(map.subscribe(FOO).get(500, TimeUnit.MILLISECONDS)); + done2.complete(map.subscribe(FOO).get(5, TimeUnit.SECONDS)); } catch (Exception e) { done2.completeExceptionally(e); } }); - Thread.sleep(100); + Thread.sleep(500); map.raiseSignal(FOO); assertTimeoutPreemptively( - Duration.ofSeconds(1), + Duration.ofSeconds(10), () -> { done1.get(); done2.get(); diff --git a/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java b/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java index 5018ca3a..941a0c06 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java @@ -355,7 +355,7 @@ public void timeout() { assertNull(rv); long elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed < 3000, "Call should return in under 3 seconds"); + assertTrue(elapsed < 6000, "Call should return in under 6 seconds"); } @Test From 446a4b229e9abac0c93e46b807a06448eace69c2 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 17 Jun 2026 10:50:21 -0700 Subject: [PATCH 7/7] improve tests --- .../dbos/transact/database/SignalMapTest.java | 41 +++++++++++++------ .../transact/notifications/EventsTest.java | 2 +- .../dbos/transact/queue/StaticQueuesTest.java | 3 +- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java b/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java index dcde5f38..ab822a2e 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SignalMapTest.java @@ -12,6 +12,8 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; @@ -216,11 +218,14 @@ void testSignalFromMainAfterBackgroundSubscribes() throws Exception { void testMultipleSubscribersInSeparateThreads() throws Exception { var done1 = new CompletableFuture(); var done2 = new CompletableFuture(); + var subscribed = new CountDownLatch(2); CompletableFuture.runAsync( () -> { try { - done1.complete(map.subscribe(FOO).get(5, TimeUnit.SECONDS)); + var sub = map.subscribe(FOO); + subscribed.countDown(); + done1.complete(sub.get(5, TimeUnit.SECONDS)); } catch (Exception e) { done1.completeExceptionally(e); } @@ -228,13 +233,17 @@ void testMultipleSubscribersInSeparateThreads() throws Exception { CompletableFuture.runAsync( () -> { try { - done2.complete(map.subscribe(FOO).get(5, TimeUnit.SECONDS)); + var sub = map.subscribe(FOO); + subscribed.countDown(); + done2.complete(sub.get(5, TimeUnit.SECONDS)); } catch (Exception e) { done2.completeExceptionally(e); } }); - Thread.sleep(500); + // Ensure both subscriptions are registered before signalling; the signal is one-shot, + // so a subscriber that registers after raiseSignal would miss it and time out. + assertTrue(subscribed.await(5, TimeUnit.SECONDS), "subscribers did not register in time"); map.raiseSignal(FOO); assertTimeoutPreemptively( @@ -247,16 +256,22 @@ void testMultipleSubscribersInSeparateThreads() throws Exception { @Test void testConcurrentSignalAndSubscribe() throws Exception { - assertTimeoutPreemptively( - Duration.ofSeconds(5), - () -> { - for (int i = 0; i < 1000; i++) { - var m = new SignalMap(); - var sub = m.subscribe(KEY); - CompletableFuture.runAsync(() -> m.raiseSignal(KEY)); - sub.get(1, TimeUnit.SECONDS); - } - }); + // Use a dedicated executor so signal dispatch isn't starved by an overloaded common pool. + var executor = Executors.newCachedThreadPool(); + try { + assertTimeoutPreemptively( + Duration.ofSeconds(10), + () -> { + for (int i = 0; i < 1000; i++) { + var m = new SignalMap(); + var sub = m.subscribe(KEY); + CompletableFuture.runAsync(() -> m.raiseSignal(KEY), executor); + sub.get(1, TimeUnit.SECONDS); + } + }); + } finally { + executor.shutdownNow(); + } } // --- awaitAny --- diff --git a/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java b/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java index c5741910..bde2ec2a 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java @@ -377,7 +377,7 @@ public void set_twice() throws Exception { @Test public void notification() throws Exception { dbos.startWorkflow( - () -> proxy.getWithlatch("id1", "key1", Duration.ofSeconds(5)), + () -> proxy.getWithlatch("id1", "key1", Duration.ofSeconds(20)), new StartWorkflowOptions("id2")); dbos.startWorkflow(() -> proxy.setWithLatch("key1", "value1"), new StartWorkflowOptions("id1")); diff --git a/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java index 8bb6586b..c403a7ca 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java @@ -392,7 +392,8 @@ public void testLimiter() throws Exception { } logger.info("Verified intra-wave timing."); - double periodTolerance = 0.5; + // CockroachDB's slower transaction throughput can widen the inter-wave gap as well + double periodTolerance = PgContainer.USE_COCKROACH_DB ? 1.0 : 0.5; for (int wave = 0; wave < numWaves - 1; wave++) { double startOfNextWave = times.get(limit * (wave + 1)); double startOfCurrentWave = times.get(limit * wave);