Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ subprojects {
tasks.withType<Test> { javaLauncher.set(null as JavaLauncher?) }

tasks.withType<Test> {
useJUnitPlatform()
useJUnitPlatform { if (System.getenv("CI") != "true") excludeTags("ci-only") }
testLogging {
events("failed")
showStandardStreams = true
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
system-stubs-jupiter = { module = "uk.org.webcompere:system-stubs-jupiter", version.ref = "system-stubs" }
testcontainers-cockroachdb = { module = "org.testcontainers:testcontainers-cockroachdb", version.ref = "testcontainers" }
testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }
testcontainers-toxiproxy = { module = "org.testcontainers:testcontainers-toxiproxy", version.ref = "testcontainers" }

[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand Down
1 change: 1 addition & 0 deletions transact/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.testcontainers.cockroachdb)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.testcontainers.toxiproxy)
}

val projectVersion = project.version.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand Down Expand Up @@ -226,19 +227,37 @@ public void start() {

/**
 * Decides whether a {@link SQLException} looks like a lost or unusable database connection.
 *
 * <p>Two checks are applied in order:
 *
 * <ol>
 *   <li>The SQLSTATE of {@code e} starts with {@code "08"} or {@code "57"} (in PostgreSQL these
 *       are the connection-exception and operator-intervention classes).
 *   <li>Otherwise, the message of {@code e} or of any exception in its cause chain contains one
 *       of a fixed set of connection-failure phrases, compared case-insensitively.
 * </ol>
 *
 * @param e the exception raised by the database operation; must not be {@code null}
 * @return {@code true} if either check matches, {@code false} otherwise
 */
private static boolean isConnectionFailure(SQLException e) {
  String state = e.getSQLState();
  if (state != null && (state.startsWith("08") || state.startsWith("57"))) {
    return true;
  }
  // HikariCP and JDBC throw connection errors without a SQLSTATE (e.g. "Connection is closed").
  // Walk the cause chain so wrapped exceptions are also caught.
  for (Throwable t = e; t != null; t = t.getCause()) {
    String msg = t.getMessage();
    if (msg == null) {
      continue;
    }
    // Locale.ROOT keeps the lower-casing independent of the JVM default locale. With a Turkish
    // or Azeri default, "I" lower-cases to dotless "ı" and the ASCII markers below would no
    // longer match.
    String lower = msg.toLowerCase(Locale.ROOT);
    if (lower.contains("connection is closed")
        || lower.contains("connection is not available")
        || lower.contains("connection reset")
        || lower.contains("broken pipe")
        || lower.contains("socket closed")) {
      return true;
    }
  }
  return false;
}

private static boolean isTransientState(SQLException e) {
String state = e.getSQLState();
return state != null && (state.startsWith("40") || state.equals("53300"));
return state != null && (state.startsWith("40") || state.startsWith("53"));
}

private static void waitForRecovery(int attempt, long baseDelay) {
private static void sleepWithJitter(double baseMs) {
double jitter = 0.5 + ThreadLocalRandom.current().nextDouble(); // [0.5, 1.5)
long sleepMs = (long) (baseMs * jitter);
try {
// Exponential backoff: 1x, 2x, 4x the base delay
long sleepTime = (long) (baseDelay * Math.pow(2, attempt - 1));
Thread.sleep(sleepTime);
Thread.sleep(sleepMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
Expand All @@ -263,7 +282,8 @@ interface SqlSupplier<T> {
}

private <T> T dbRetry(SqlSupplier<T> supplier) {
final int MAX_RETRIES = 20;
double backoffMs = 1000.0;
final double maxBackoffMs = 60_000.0;
int attempt = 0;
while (true) {
if (closed.get()) {
Expand All @@ -272,22 +292,20 @@ private <T> T dbRetry(SqlSupplier<T> supplier) {
try {
return supplier.get();
} catch (SQLException e) {
if (++attempt > MAX_RETRIES) {
String msg = "Database operation failed after %d attempts".formatted(attempt);
throw new RuntimeException(msg, e);
}
attempt++;
if (e instanceof SQLRecoverableException || isConnectionFailure(e)) {
logger.warn("Recoverable connection error. Resetting client pool.", e);
logger.warn(
"Recoverable connection error (attempt {}), resetting client pool", attempt, e);
if (ctx.dataSource() instanceof HikariDataSource hikariDataSource) {
hikariDataSource.getHikariPoolMXBean().softEvictConnections();
}
waitForRecovery(attempt, 2000);
} else if (e instanceof SQLTransientException || isTransientState(e)) {
logger.warn("Transient DB error. Retrying command.", e);
waitForRecovery(attempt, 500);
logger.warn("Transient DB error (attempt {}), retrying", attempt, e);
} else {
throw new RuntimeException(e);
}
sleepWithJitter(backoffMs);
backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
}
}
}
Expand Down
114 changes: 92 additions & 22 deletions transact/src/test/java/dev/dbos/transact/database/ChaosTest.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package dev.dbos.transact.database;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

import dev.dbos.transact.DBOS;
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.migrations.MigrationManager;
import dev.dbos.transact.utils.DBUtils;
import dev.dbos.transact.utils.PgContainer;
import dev.dbos.transact.workflow.Workflow;

import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.junit.jupiter.api.AutoClose;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.toxiproxy.ToxiproxyContainer;

interface ChaosService {

Expand Down Expand Up @@ -102,39 +112,99 @@ WHERE pid <> pg_backend_pid()
}
}

// TODO: finish this test, run it many times but only in cloud
// Tracking issue: https://github.com/dbos-inc/dbos-transact-java/issues/319
@Tag("ci-only")
public class ChaosTest {
private static final Logger logger = LoggerFactory.getLogger(ChaosTest.class);
private static final int REPETITIONS = 5;

@AutoClose final PgContainer pgContainer = new PgContainer();

// @Test
@RepeatedTest(REPETITIONS)
public void chaosTest() throws Exception {
var dbosConfig = pgContainer.dbosConfig();
try (var dataSource = pgContainer.dataSource();
var dbos = new DBOS(dbosConfig)) {
assumeFalse(
PgContainer.USE_COCKROACH_DB, "pg_terminate_backend() not supported on CockroachDB");
try (var pgContainer = new PgContainer()) {
var dbosConfig = pgContainer.dbosConfig();
try (var dataSource = pgContainer.dataSource();
var dbos = new DBOS(dbosConfig)) {

var impl = new ChaosServiceImpl(dbos, dataSource);
var proxy = dbos.registerProxy(ChaosService.class, impl);
impl.setSelf(proxy);

var impl = new ChaosServiceImpl(dbos, dataSource);
var proxy = dbos.registerProxy(ChaosService.class, impl);
impl.setSelf(proxy);
dbos.launch();

dbos.launch();
assertEquals("Hehehe", proxy.dbLossBetweenSteps());

assertEquals("Hehehe", proxy.dbLossBetweenSteps());
assertEquals("Hehehe", proxy.runChildWf());

assertEquals("Hehehe", proxy.runChildWf());
var h1 = dbos.startWorkflow(() -> proxy.wfPart1());
var h2 = dbos.startWorkflow(() -> proxy.wfPart2(h1.workflowId()));

var h1 = dbos.startWorkflow(() -> proxy.wfPart1());
var h2 = dbos.startWorkflow(() -> proxy.wfPart2(h1.workflowId()));
if (!"Part1hello1".equals(h1.getResult()) || !"Part2v1".equals(h2.getResult())) {
logWorkflowDetails(dataSource, "Part 1", h1.workflowId());
logWorkflowDetails(dataSource, "Part 2", h2.workflowId());
}

if (!"Part1hello1".equals(h1.getResult()) || !"Part2v1".equals(h2.getResult())) {
logWorkflowDetails(dataSource, "Part 1", h1.workflowId());
logWorkflowDetails(dataSource, "Part 2", h2.workflowId());
assertEquals("Part1hello1", h1.getResult());
assertEquals("Part2v1", h2.getResult());
}
}
}

assertEquals("Part1hello1", h1.getResult());
assertEquals("Part2v1", h2.getResult());
/**
 * Exercises DBOS workflow execution while the network path to Postgres is disrupted through a
 * Toxiproxy container. DBOS itself connects via the proxy; the workflow implementation is handed
 * a data source that connects to Postgres directly. Two disruptions are applied in sequence:
 * disabling the proxy for a few seconds, then injecting a "reset peer" toxic for a few seconds.
 * Each scenario expects {@code dbLossBetweenSteps()} to still return {@code "Hehehe"}.
 *
 * @throws Exception if container setup, migration, or workflow execution fails
 */
@RepeatedTest(REPETITIONS)
public void toxiProxyTest() throws Exception {
// Postgres and Toxiproxy share one Docker network so the proxy can reach Postgres by alias.
try (var network = Network.newNetwork();
var pg = PgContainer.getPG()) {

pg.withNetwork(network).withNetworkAliases("postgres");
pg.start();

// Direct (un-proxied) URL: everything after the last '/' of the container's JDBC URL is
// replaced with the dbos_test_db database name.
var directUrl = pg.getJdbcUrl().replaceFirst("/[^/]+$", "/dbos_test_db");
// Migrations run over the direct URL, before any proxy exists.
MigrationManager.runMigrations(directUrl, pg.getUsername(), pg.getPassword(), "dbos", true);

try (var toxiContainer = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.9.0")) {
toxiContainer.withNetwork(network);
toxiContainer.start();

var toxiClient =
new ToxiproxyClient(toxiContainer.getHost(), toxiContainer.getControlPort());
// Proxy "pg" listens on port 8666 inside the Toxiproxy container and forwards to the
// Postgres container via its "postgres" network alias.
var proxy = toxiClient.createProxy("pg", "0.0.0.0:8666", "postgres:5432");
// Host-visible URL that routes through the proxy's mapped port.
var proxiedUrl =
"jdbc:postgresql://%s:%d/dbos_test_db"
.formatted(toxiContainer.getHost(), toxiContainer.getMappedPort(8666));

// DBOS is configured with the proxied URL, so its connections are the ones disrupted.
var dbosConfig =
DBOSConfig.defaults("chaos-toxi-test")
.withDatabaseUrl(proxiedUrl)
.withDbUser(pg.getUsername())
.withDbPassword(pg.getPassword());

// directDs uses the direct URL, so the data source given to ChaosServiceImpl is not
// affected by the proxy toxics.
try (var directDs =
dev.dbos.transact.database.SystemDatabase.createDataSource(
directUrl, pg.getUsername(), pg.getPassword());
var dbos = new DBOS(dbosConfig)) {

var impl = new ChaosServiceImpl(dbos, directDs);
var svc = dbos.registerProxy(ChaosService.class, impl);
impl.setSelf(svc);
dbos.launch();

// Scenario 1: proxy disabled — simulates a sustained network partition
proxy.disable();
// startWorkflow is invoked on another thread, so this test thread stays free to
// re-enable the proxy.
var wf1 =
CompletableFuture.supplyAsync(
() -> dbos.startWorkflow(() -> svc.dbLossBetweenSteps()));
// Keep the proxy down for 3 seconds before restoring it.
Thread.sleep(3000);
proxy.enable();
// Wait up to 10 seconds for the workflow handle, then wait for its result.
assertEquals("Hehehe", wf1.get(10, TimeUnit.SECONDS).getResult());

// Scenario 2: TCP RST mid-execution — simulates a flapping connection
var wf2 = dbos.startWorkflow(() -> svc.dbLossBetweenSteps());
// NOTE(review): the final argument 0 is presumably the toxic's timeout in ms (reset
// immediately) — confirm against the Toxiproxy reset_peer documentation.
proxy.toxics().resetPeer("reset", ToxicDirection.DOWNSTREAM, 0);
// Leave the toxic active for 3 seconds, then remove it so the workflow can finish.
Thread.sleep(3000);
proxy.toxics().get("reset").remove();
assertEquals("Hehehe", wf2.getResult());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -216,29 +218,36 @@ void testSignalFromMainAfterBackgroundSubscribes() throws Exception {
void testMultipleSubscribersInSeparateThreads() throws Exception {
var done1 = new CompletableFuture<Void>();
var done2 = new CompletableFuture<Void>();
var subscribed = new CountDownLatch(2);

CompletableFuture.runAsync(
() -> {
try {
done1.complete(map.subscribe(FOO).get(500, TimeUnit.MILLISECONDS));
var sub = map.subscribe(FOO);
subscribed.countDown();
done1.complete(sub.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
done1.completeExceptionally(e);
}
});
CompletableFuture.runAsync(
() -> {
try {
done2.complete(map.subscribe(FOO).get(500, TimeUnit.MILLISECONDS));
var sub = map.subscribe(FOO);
subscribed.countDown();
done2.complete(sub.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
done2.completeExceptionally(e);
}
});

Thread.sleep(100);
// Ensure both subscriptions are registered before signalling; the signal is one-shot,
// so a subscriber that registers after raiseSignal would miss it and time out.
assertTrue(subscribed.await(5, TimeUnit.SECONDS), "subscribers did not register in time");
map.raiseSignal(FOO);

assertTimeoutPreemptively(
Duration.ofSeconds(1),
Duration.ofSeconds(10),
() -> {
done1.get();
done2.get();
Expand All @@ -247,16 +256,22 @@ void testMultipleSubscribersInSeparateThreads() throws Exception {

@Test
void testConcurrentSignalAndSubscribe() throws Exception {
assertTimeoutPreemptively(
Duration.ofSeconds(5),
() -> {
for (int i = 0; i < 1000; i++) {
var m = new SignalMap();
var sub = m.subscribe(KEY);
CompletableFuture.runAsync(() -> m.raiseSignal(KEY));
sub.get(1, TimeUnit.SECONDS);
}
});
// Use a dedicated executor so signal dispatch isn't starved by an overloaded common pool.
var executor = Executors.newCachedThreadPool();
try {
assertTimeoutPreemptively(
Duration.ofSeconds(10),
() -> {
for (int i = 0; i < 1000; i++) {
var m = new SignalMap();
var sub = m.subscribe(KEY);
CompletableFuture.runAsync(() -> m.raiseSignal(KEY), executor);
sub.get(1, TimeUnit.SECONDS);
}
});
} finally {
executor.shutdownNow();
}
}

// --- awaitAny ---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

import org.junit.jupiter.api.AutoClose;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,8 @@ public String workflow(String input) {
}
}

@org.junit.jupiter.api.Timeout(value = 5, unit = java.util.concurrent.TimeUnit.MINUTES)
@Tag("ci-only")
@Timeout(value = 5, unit = java.util.concurrent.TimeUnit.MINUTES)
public class ScaleTest {
private static final Logger logger = LoggerFactory.getLogger(ScaleTest.class);

Expand All @@ -51,7 +53,6 @@ void setUp() {
}

@Test
@EnabledIfEnvironmentVariable(named = "SCALE_TEST", matches = "^true$")
public void scaleTest() throws Exception {
try (var dbos = new DBOS(dbosConfig)) {
var service = dbos.registerProxy(ScaleService.class, new ScaleServiceImpl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public void set_twice() throws Exception {
@Test
public void notification() throws Exception {
dbos.startWorkflow(
() -> proxy.getWithlatch("id1", "key1", Duration.ofSeconds(5)),
() -> proxy.getWithlatch("id1", "key1", Duration.ofSeconds(20)),
new StartWorkflowOptions("id2"));
dbos.startWorkflow(() -> proxy.setWithLatch("key1", "value1"), new StartWorkflowOptions("id1"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public void timeout() {
assertNull(rv);

long elapsed = System.currentTimeMillis() - start;
assertTrue(elapsed < 3000, "Call should return in under 3 seconds");
assertTrue(elapsed < 6000, "Call should return in under 6 seconds");
}

@Test
Expand Down
Loading