-
Notifications
You must be signed in to change notification settings - Fork 596
fix(cassandra): auto-recover session after Cassandra restart #2997
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,8 @@ | |||||||||||||
| import org.apache.hugegraph.backend.store.BackendSessionPool; | ||||||||||||||
| import org.apache.hugegraph.config.HugeConfig; | ||||||||||||||
| import org.apache.hugegraph.util.E; | ||||||||||||||
| import org.apache.hugegraph.util.Log; | ||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||
|
|
||||||||||||||
| import com.datastax.driver.core.BatchStatement; | ||||||||||||||
| import com.datastax.driver.core.Cluster; | ||||||||||||||
|
|
@@ -34,22 +36,37 @@ | |||||||||||||
| import com.datastax.driver.core.ProtocolOptions.Compression; | ||||||||||||||
| import com.datastax.driver.core.ResultSet; | ||||||||||||||
| import com.datastax.driver.core.ResultSetFuture; | ||||||||||||||
| import com.datastax.driver.core.SimpleStatement; | ||||||||||||||
| import com.datastax.driver.core.SocketOptions; | ||||||||||||||
| import com.datastax.driver.core.Statement; | ||||||||||||||
| import com.datastax.driver.core.exceptions.DriverException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.InvalidQueryException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.NoHostAvailableException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.OperationTimedOutException; | ||||||||||||||
| import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; | ||||||||||||||
|
|
||||||||||||||
| public class CassandraSessionPool extends BackendSessionPool { | ||||||||||||||
|
|
||||||||||||||
| private static final Logger LOG = Log.logger(CassandraSessionPool.class); | ||||||||||||||
|
|
||||||||||||||
| private static final int SECOND = 1000; | ||||||||||||||
| private static final String HEALTH_CHECK_CQL = | ||||||||||||||
| "SELECT now() FROM system.local"; | ||||||||||||||
|
|
||||||||||||||
| private Cluster cluster; | ||||||||||||||
| private final String keyspace; | ||||||||||||||
| private int maxRetries; | ||||||||||||||
| private long retryInterval; | ||||||||||||||
| private long retryMaxDelay; | ||||||||||||||
|
|
||||||||||||||
| public CassandraSessionPool(HugeConfig config, | ||||||||||||||
| String keyspace, String store) { | ||||||||||||||
| super(config, keyspace + "/" + store); | ||||||||||||||
| this.cluster = null; | ||||||||||||||
| this.keyspace = keyspace; | ||||||||||||||
| this.maxRetries = 0; | ||||||||||||||
| this.retryInterval = 0L; | ||||||||||||||
| this.retryMaxDelay = 0L; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
|
|
@@ -86,6 +103,26 @@ public synchronized void open() { | |||||||||||||
|
|
||||||||||||||
| builder.withSocketOptions(socketOptions); | ||||||||||||||
|
|
||||||||||||||
| // Reconnection policy: let driver keep retrying nodes in background | ||||||||||||||
| // with exponential backoff after they go down (see issue #2740). | ||||||||||||||
| long reconnectBase = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); | ||||||||||||||
| long reconnectMax = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); | ||||||||||||||
| E.checkArgument(reconnectMax >= reconnectBase, | ||||||||||||||
| "'%s' (%s) must be >= '%s' (%s)", | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), | ||||||||||||||
| reconnectMax, | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), | ||||||||||||||
| reconnectBase); | ||||||||||||||
| builder.withReconnectionPolicy( | ||||||||||||||
| new ExponentialReconnectionPolicy(reconnectBase, reconnectMax)); | ||||||||||||||
| this.retryMaxDelay = reconnectMax; | ||||||||||||||
| this.maxRetries = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); | ||||||||||||||
| this.retryInterval = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); | ||||||||||||||
|
|
||||||||||||||
| // Credential options | ||||||||||||||
| String username = config.get(CassandraOptions.CASSANDRA_USERNAME); | ||||||||||||||
| String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); | ||||||||||||||
|
|
@@ -161,7 +198,7 @@ public void rollback() { | |||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public ResultSet commit() { | ||||||||||||||
| ResultSet rs = this.session.execute(this.batch); | ||||||||||||||
| ResultSet rs = this.executeWithRetry(this.batch); | ||||||||||||||
| // Clear batch if execute() successfully (retained if failed) | ||||||||||||||
| this.batch.clear(); | ||||||||||||||
| return rs; | ||||||||||||||
|
|
@@ -197,15 +234,59 @@ public ResultSet query(Statement statement) { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(Statement statement) { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| return this.executeWithRetry(statement); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(String statement) { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| return this.executeWithRetry(new SimpleStatement(statement)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(String statement, Object... args) { | ||||||||||||||
| return this.session.execute(statement, args); | ||||||||||||||
| return this.executeWithRetry(new SimpleStatement(statement, args)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Execute a statement, retrying on transient connectivity failures | ||||||||||||||
| * (NoHostAvailableException / OperationTimedOutException). The driver | ||||||||||||||
| * itself keeps retrying connections in the background via the | ||||||||||||||
| * reconnection policy, so once Cassandra comes back online, a | ||||||||||||||
| * subsequent attempt here will succeed without restarting the server. | ||||||||||||||
| * See issue #2740. | ||||||||||||||
| */ | ||||||||||||||
| private ResultSet executeWithRetry(Statement statement) { | ||||||||||||||
| int retries = CassandraSessionPool.this.maxRetries; | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The constructor signature being called here is The format string has two To preserve the cause chain and format the message correctly, use the
Suggested change
Should be: throw new BackendException(
"Failed to execute Cassandra query after %s retries: %s",
lastError, retries,
lastError == null ? "<null>" : lastError.getMessage());Wait — looking more carefully at the available constructors:
The fix: throw new BackendException(
"Failed to execute Cassandra query after %s retries",
lastError, retries);This passes |
||||||||||||||
| long interval = CassandraSessionPool.this.retryInterval; | ||||||||||||||
| long maxDelay = CassandraSessionPool.this.retryMaxDelay; | ||||||||||||||
| DriverException lastError = null; | ||||||||||||||
| for (int attempt = 0; attempt <= retries; attempt++) { | ||||||||||||||
| try { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| } catch (NoHostAvailableException | OperationTimedOutException e) { | ||||||||||||||
| lastError = e; | ||||||||||||||
| if (attempt >= retries) { | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| long delay = Math.min(interval * (1L << Math.min(attempt, 20)), | ||||||||||||||
| maxDelay > 0 ? maxDelay : interval); | ||||||||||||||
| LOG.warn("Cassandra temporarily unavailable ({}), " + | ||||||||||||||
| "retry {}/{} in {} ms", | ||||||||||||||
| e.getClass().getSimpleName(), attempt + 1, | ||||||||||||||
| retries, delay); | ||||||||||||||
| try { | ||||||||||||||
| Thread.sleep(delay); | ||||||||||||||
| } catch (InterruptedException ie) { | ||||||||||||||
| Thread.currentThread().interrupt(); | ||||||||||||||
| throw new BackendException("Interrupted while " + | ||||||||||||||
| "waiting to retry " + | ||||||||||||||
| "Cassandra query", ie); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| throw new BackendException("Failed to execute Cassandra query " + | ||||||||||||||
| "after %s retries: %s", | ||||||||||||||
| lastError, retries, | ||||||||||||||
| lastError == null ? "<null>" : | ||||||||||||||
| lastError.getMessage()); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| private void tryOpen() { | ||||||||||||||
|
|
@@ -255,6 +336,53 @@ public boolean hasChanges() { | |||||||||||||
| return this.batch.size() > 0; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Periodic liveness probe invoked by {@link BackendSessionPool} to | ||||||||||||||
| * recover thread-local sessions after Cassandra has been restarted. | ||||||||||||||
| * Reopens the driver session if it was closed and pings the cluster | ||||||||||||||
| * with a lightweight query. Any failure here is swallowed so the | ||||||||||||||
| * caller can still issue the real query, which will drive retries via | ||||||||||||||
| * {@link #executeWithRetry(Statement)}. | ||||||||||||||
| */ | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the health check ( This means if the health check catches a failure, the session is marked as "recently updated" but is actually broken — and Also: if Consider setting |
||||||||||||||
| @Override | ||||||||||||||
| public void reconnectIfNeeded() { | ||||||||||||||
| if (!this.opened) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| if (this.session == null || this.session.isClosed()) { | ||||||||||||||
| this.session = null; | ||||||||||||||
| this.tryOpen(); | ||||||||||||||
| if (this.session == null) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); | ||||||||||||||
| } catch (DriverException e) { | ||||||||||||||
| LOG.debug("Cassandra health-check failed, " + | ||||||||||||||
| "will retry on next query: {}", e.getMessage()); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Force-close the driver session so it is re-opened on the next | ||||||||||||||
| * {@link #opened()} call. Used when a failure is observed and we | ||||||||||||||
| * want to start fresh on the next attempt. | ||||||||||||||
| */ | ||||||||||||||
| @Override | ||||||||||||||
| public void reset() { | ||||||||||||||
| if (this.session == null) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| this.session.close(); | ||||||||||||||
| } catch (Throwable e) { | ||||||||||||||
| LOG.warn("Failed to reset Cassandra session", e); | ||||||||||||||
| } finally { | ||||||||||||||
| this.session = null; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public Collection<Statement> statements() { | ||||||||||||||
| return this.batch.getStatements(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Minor:
Thread.sleep()inexecuteWithRetryblocks the calling threadThis is acceptable for low-QPS scenarios, but for high-throughput workloads with many concurrent queries hitting a downed Cassandra, all request threads will pile up sleeping here (up to 10 retries × 60s max delay = 10 minutes worst case per thread).
Consider whether the retry count and max delay defaults might be too aggressive for production. A thread blocked for minutes can cascade into thread pool exhaustion. An alternative would be to fail faster (e.g., 3 retries with shorter delays) and let the caller/user retry at a higher level.
Not a blocker — just worth considering for the default values.