Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
positiveInt(),
12 * 60 * 60
);

// Reconnection tuning (issue #2740): base/max delay drive the driver's
// background ExponentialReconnectionPolicy; max_retries/interval drive
// the query-time retry loop in CassandraSessionPool.
public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
new ConfigOption<>(
"cassandra.reconnect_base_delay",
"The base delay in milliseconds used by the driver's " +
"exponential reconnection policy when a Cassandra host " +
"becomes unreachable.",
rangeInt(100L, Long.MAX_VALUE),
1000L
);

// NOTE(review): must be >= cassandra.reconnect_base_delay; the relation
// is validated at open() time, not here.
public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
new ConfigOption<>(
"cassandra.reconnect_max_delay",
"The maximum delay in milliseconds used by the driver's " +
"exponential reconnection policy when a Cassandra host " +
"becomes unreachable.",
rangeInt(1000L, Long.MAX_VALUE),
60_000L
);

// 0 disables query-time retries entirely (driver-level background
// reconnection still applies).
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
new ConfigOption<>(
"cassandra.reconnect_max_retries",
"The maximum number of retries applied at query-time when " +
"a Cassandra host is temporarily unreachable " +
"(NoHostAvailableException / OperationTimedOutException). " +
"Set to 0 to disable query-time retries.",
rangeInt(0, Integer.MAX_VALUE),
10
);

// Initial wait between query-time retries; grows exponentially and is
// capped by cassandra.reconnect_max_delay.
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
new ConfigOption<>(
"cassandra.reconnect_interval",
"The interval in milliseconds between query-time retries " +
"when a Cassandra host is temporarily unreachable. The " +
"actual wait grows with exponential backoff, capped at " +
"cassandra.reconnect_max_delay.",
rangeInt(100L, Long.MAX_VALUE),
5000L
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
Expand All @@ -34,22 +36,37 @@
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;

public class CassandraSessionPool extends BackendSessionPool {

private static final Logger LOG = Log.logger(CassandraSessionPool.class);

// Milliseconds per second, used for socket-option timeouts elsewhere.
private static final int SECOND = 1000;
// Lightweight liveness probe used by reconnectIfNeeded(); system.local
// is always readable on a live node.
private static final String HEALTH_CHECK_CQL =
"SELECT now() FROM system.local";

// Driver cluster handle; created lazily in open().
private Cluster cluster;
private final String keyspace;
// Query-time retry settings, populated from config in open().
private int maxRetries;
private long retryInterval;
private long retryMaxDelay;

/**
 * Creates a pool identified by "keyspace/store"; the driver cluster and
 * retry settings are initialized later in open(), not here.
 */
public CassandraSessionPool(HugeConfig config,
String keyspace, String store) {
super(config, keyspace + "/" + store);
this.cluster = null;
this.keyspace = keyspace;
// Retry settings stay zero (disabled) until open() reads the config.
this.maxRetries = 0;
this.retryInterval = 0L;
this.retryMaxDelay = 0L;
}

@Override
Expand Down Expand Up @@ -86,6 +103,26 @@ public synchronized void open() {

builder.withSocketOptions(socketOptions);

// Reconnection policy: let driver keep retrying nodes in background
// with exponential backoff after they go down (see issue #2740).
long reconnectBase = config.get(
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
long reconnectMax = config.get(
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
E.checkArgument(reconnectMax >= reconnectBase,
"'%s' (%s) must be >= '%s' (%s)",
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
reconnectMax,
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
reconnectBase);
builder.withReconnectionPolicy(
new ExponentialReconnectionPolicy(reconnectBase, reconnectMax));
this.retryMaxDelay = reconnectMax;
this.maxRetries = config.get(
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
this.retryInterval = config.get(
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL);

// Credential options
String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
Expand Down Expand Up @@ -161,7 +198,7 @@ public void rollback() {

@Override
public ResultSet commit() {
ResultSet rs = this.session.execute(this.batch);
ResultSet rs = this.executeWithRetry(this.batch);
// Clear batch if execute() successfully (retained if failed)
this.batch.clear();
return rs;
Expand Down Expand Up @@ -197,15 +234,59 @@ public ResultSet query(Statement statement) {
}

/**
 * Execute a driver statement, retrying transient connectivity
 * failures via {@link #executeWithRetry(Statement)}.
 */
public ResultSet execute(Statement statement) {
return this.executeWithRetry(statement);
}

/**
 * Execute a raw CQL string, wrapped as a SimpleStatement so it goes
 * through the same retry path as prepared statements.
 */
public ResultSet execute(String statement) {
return this.executeWithRetry(new SimpleStatement(statement));
}

/**
 * Execute a raw CQL string with positional bind values, wrapped as a
 * SimpleStatement so it goes through the same retry path.
 */
public ResultSet execute(String statement, Object... args) {
return this.executeWithRetry(new SimpleStatement(statement, args));
}

/**
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Minor: Thread.sleep() in executeWithRetry blocks the calling thread

This is acceptable for low-QPS scenarios, but for high-throughput workloads with many concurrent queries hitting a downed Cassandra, all request threads will pile up sleeping here (up to 10 retries × 60s max delay = 10 minutes worst case per thread).

Consider whether the retry count and max delay defaults might be too aggressive for production. A thread blocked for minutes can cascade into thread pool exhaustion. An alternative would be to fail faster (e.g., 3 retries with shorter delays) and let the caller/user retry at a higher level.

Not a blocker — just worth considering for the default values.

* Execute a statement, retrying on transient connectivity failures
* (NoHostAvailableException / OperationTimedOutException). The driver
* itself keeps retrying connections in the background via the
* reconnection policy, so once Cassandra comes back online, a
* subsequent attempt here will succeed without restarting the server.
* See issue #2740.
*/
private ResultSet executeWithRetry(Statement statement) {
int retries = CassandraSessionPool.this.maxRetries;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ Critical: BackendException argument order is wrong — lastError is passed as the first vararg instead of as the cause

The constructor signature being called here is BackendException(String message, Object... args), which means lastError is used as a format argument (%s), not as the exception cause. The original exception's stack trace is lost entirely.

The format string has two %s placeholders but the args are (lastError, retries, message) — three args. The first %s will be filled by lastError.toString() (the exception object), the second %s by retries (an int). The message string becomes a dangling unused arg.

To preserve the cause chain and format the message correctly, use the (String, Throwable, Object...) constructor:

Suggested change
int retries = CassandraSessionPool.this.maxRetries;
throw new BackendException("Failed to execute Cassandra query " +
"after %s retries: %s",
lastError, retries,
lastError == null ? "<null>" :
lastError.getMessage());

Should be:

throw new BackendException(
    "Failed to execute Cassandra query after %s retries: %s",
    lastError, retries,
    lastError == null ? "<null>" : lastError.getMessage());

Wait — looking more carefully at the available constructors:

  • BackendException(String message, Throwable cause, Object... args) — this is what you need.

The fix:

throw new BackendException(
    "Failed to execute Cassandra query after %s retries",
    lastError, retries);

This passes lastError as the cause (Throwable) and retries as the format arg. The cause's message is already accessible via getCause().getMessage(), so repeating it in the format string is unnecessary.

long interval = CassandraSessionPool.this.retryInterval;
long maxDelay = CassandraSessionPool.this.retryMaxDelay;
DriverException lastError = null;
for (int attempt = 0; attempt <= retries; attempt++) {
try {
return this.session.execute(statement);
} catch (NoHostAvailableException | OperationTimedOutException e) {
lastError = e;
if (attempt >= retries) {
break;
}
long delay = Math.min(interval * (1L << Math.min(attempt, 20)),
maxDelay > 0 ? maxDelay : interval);
LOG.warn("Cassandra temporarily unavailable ({}), " +
"retry {}/{} in {} ms",
e.getClass().getSimpleName(), attempt + 1,
retries, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BackendException("Interrupted while " +
"waiting to retry " +
"Cassandra query", ie);
}
}
}
throw new BackendException("Failed to execute Cassandra query " +
"after %s retries: %s",
lastError, retries,
lastError == null ? "<null>" :
lastError.getMessage());
}

private void tryOpen() {
Expand Down Expand Up @@ -255,6 +336,53 @@ public boolean hasChanges() {
return this.batch.size() > 0;
}

/**
* Periodic liveness probe invoked by {@link BackendSessionPool} to
* recover thread-local sessions after Cassandra has been restarted.
* Reopens the driver session if it was closed and pings the cluster
* with a lightweight query. Any failure here is swallowed so the
* caller can still issue the real query, which will drive retries via
* {@link #executeWithRetry(Statement)}.
*/
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ reconnectIfNeeded() checks this.opened but never sets it to false on failure

When the health check (HEALTH_CHECK_CQL) fails, the session stays in opened = true state. On the next call to detectSession() from BackendSessionPool, the idle-time check (now - session.updated() > interval) may skip reconnectIfNeeded() entirely because session.update() was called right after.

This means if the health check catches a failure, the session is marked as "recently updated" but is actually broken — and reconnectIfNeeded() won't be called again until the detect interval elapses.

Also: if tryOpen() fails inside reconnectIfNeeded(), the method silently returns. But this.opened was already true (the guard at the top), and this.session is now null. Any subsequent execute() call will NPE on this.session.execute(...) before executeWithRetry can even catch a DriverException.

Consider setting this.opened = false when the session is nulled out, so that tryOpen() is properly re-triggered on the next access.

@Override
public void reconnectIfNeeded() {
if (!this.opened) {
return;
}
try {
if (this.session == null || this.session.isClosed()) {
this.session = null;
this.tryOpen();
if (this.session == null) {
return;
}
}
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
} catch (DriverException e) {
LOG.debug("Cassandra health-check failed, " +
"will retry on next query: {}", e.getMessage());
}
}

/**
 * Force-close the driver session so a fresh one is opened on the next
 * {@link #opened()} call. Invoked after an observed failure when we
 * want the next attempt to start from a clean state.
 */
@Override
public void reset() {
if (this.session != null) {
try {
this.session.close();
} catch (Throwable e) {
LOG.warn("Failed to reset Cassandra session", e);
} finally {
// Always drop the reference, even if close() threw
this.session = null;
}
}
}

/**
 * Returns the statements currently buffered in this session's batch.
 */
public Collection<Statement> statements() {
Collection<Statement> pending = this.batch.getStatements();
return pending;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.hugegraph.unit.cassandra;

import java.util.Collections;
import java.util.Map;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.hugegraph.backend.store.cassandra.CassandraOptions;
import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool;
import org.apache.hugegraph.backend.store.cassandra.CassandraStore;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.OptionSpace;
Expand All @@ -30,7 +32,11 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -192,4 +198,99 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() {
Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config);
});
}

@Test
public void testReconnectOptionsHaveSensibleDefaults() {
// Runtime-reconnection options must exist with non-zero defaults so
// HugeGraph keeps running when Cassandra restarts (issue #2740).
long baseDelay = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY
                                 .defaultValue();
long maxDelay = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY
                                .defaultValue();
int maxRetries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
                                 .defaultValue();
long interval = CassandraOptions.CASSANDRA_RECONNECT_INTERVAL
                                .defaultValue();
Assert.assertEquals(1000L, baseDelay);
Assert.assertEquals(60_000L, maxDelay);
Assert.assertEquals(10, maxRetries);
Assert.assertEquals(5000L, interval);
}

@Test
public void testReconnectOptionsAreOverridable() {
// Every reconnect option must be tunable through the plain
// properties-based configuration.
Configuration conf = new PropertiesConfiguration();
conf.setProperty(
     CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 500L);
conf.setProperty(
     CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 30_000L);
conf.setProperty(
     CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3);
conf.setProperty(
     CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 1000L);

HugeConfig config = new HugeConfig(conf);
Assert.assertEquals(500L, (long) config.get(
                    CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY));
Assert.assertEquals(30_000L, (long) config.get(
                    CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY));
Assert.assertEquals(3, (int) config.get(
                    CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES));
Assert.assertEquals(1000L, (long) config.get(
                    CassandraOptions.CASSANDRA_RECONNECT_INTERVAL));
}

@Test
public void testReconnectRetriesCanBeDisabled() {
// Setting max_retries to 0 must turn off query-time retries.
Configuration conf = new PropertiesConfiguration();
conf.setProperty(
     CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 0);
HugeConfig config = new HugeConfig(conf);
int retries = config.get(
              CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
Assert.assertEquals(0, retries);
}

@Test
public void testExecuteWithRetrySucceedsAfterTransientFailures() {
// A query must succeed transparently once the (mocked) driver
// recovers after a couple of transient NoHostAvailable failures.
HugeConfig config = new HugeConfig(new PropertiesConfiguration());
CassandraSessionPool pool = new CassandraSessionPool(config,
                                                     "ks", "store");
Whitebox.setInternalState(pool, "maxRetries", 3);
Whitebox.setInternalState(pool, "retryInterval", 1L);
Whitebox.setInternalState(pool, "retryMaxDelay", 10L);

com.datastax.driver.core.Session mockedDriver = Mockito.mock(
        com.datastax.driver.core.Session.class);
ResultSet expected = Mockito.mock(ResultSet.class);
NoHostAvailableException noHost =
        new NoHostAvailableException(Collections.emptyMap());
// Fail twice, then recover: the third attempt yields a result.
Mockito.when(mockedDriver.execute(Mockito.any(Statement.class)))
       .thenThrow(noHost)
       .thenThrow(noHost)
       .thenReturn(expected);

CassandraSessionPool.Session session = pool.new Session();
Whitebox.setInternalState(session, "session", mockedDriver);

ResultSet actual = session.execute("SELECT now() FROM system.local");
Assert.assertSame(expected, actual);
Mockito.verify(mockedDriver, Mockito.times(3))
       .execute(Mockito.any(Statement.class));
}

@Test
public void testReconnectOptionsExposeExpectedKeys() {
// The option keys are part of the public config surface: renaming
// any of them would silently break existing user configurations.
Assert.assertEquals(
       "cassandra.reconnect_base_delay",
       CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name());
Assert.assertEquals(
       "cassandra.reconnect_max_delay",
       CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name());
Assert.assertEquals(
       "cassandra.reconnect_max_retries",
       CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name());
Assert.assertEquals(
       "cassandra.reconnect_interval",
       CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name());
}
}
Loading