Skip to content

Commit fc3d291

Browse files
committed
fix(cassandra): auto-recover session after Cassandra restart
- Register an ExponentialReconnectionPolicy on the Cluster builder so the DataStax driver keeps retrying downed nodes in the background.
- Wrap every Session.execute() call in executeWithRetry(), which retries with exponential backoff on transient connectivity failures (NoHostAvailableException / OperationTimedOutException).
- Implement reconnectIfNeeded()/reset() so the pool reopens closed sessions and issues a lightweight health check (SELECT now() FROM system.local) before subsequent queries.
- Add tunable options: cassandra.reconnect_base_delay, cassandra.reconnect_max_delay, cassandra.reconnect_max_retries, cassandra.reconnect_interval.
- Add unit tests covering option defaults, overrides, disabling retries, option keys, and a successful retry after transient failures.

Fixes #2740
1 parent 9336b5e commit fc3d291

File tree

3 files changed

+275
-4
lines changed

3 files changed

+275
-4
lines changed

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
130130
positiveInt(),
131131
12 * 60 * 60
132132
);
133+
134+
public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
135+
new ConfigOption<>(
136+
"cassandra.reconnect_base_delay",
137+
"The base delay in milliseconds used by the driver's " +
138+
"exponential reconnection policy when a Cassandra host " +
139+
"becomes unreachable.",
140+
rangeInt(100L, Long.MAX_VALUE),
141+
1000L
142+
);
143+
144+
public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
145+
new ConfigOption<>(
146+
"cassandra.reconnect_max_delay",
147+
"The maximum delay in milliseconds used by the driver's " +
148+
"exponential reconnection policy when a Cassandra host " +
149+
"becomes unreachable.",
150+
rangeInt(1000L, Long.MAX_VALUE),
151+
60_000L
152+
);
153+
154+
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
155+
new ConfigOption<>(
156+
"cassandra.reconnect_max_retries",
157+
"The maximum number of retries applied at query-time when " +
158+
"a Cassandra host is temporarily unreachable " +
159+
"(NoHostAvailableException / OperationTimedOutException). " +
160+
"Set to 0 to disable query-time retries.",
161+
rangeInt(0, Integer.MAX_VALUE),
162+
10
163+
);
164+
165+
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
166+
new ConfigOption<>(
167+
"cassandra.reconnect_interval",
168+
"The interval in milliseconds between query-time retries " +
169+
"when a Cassandra host is temporarily unreachable. The " +
170+
"actual wait grows with exponential backoff, capped at " +
171+
"cassandra.reconnect_max_delay.",
172+
rangeInt(100L, Long.MAX_VALUE),
173+
5000L
174+
);
133175
}

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java

Lines changed: 132 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.hugegraph.backend.store.BackendSessionPool;
2727
import org.apache.hugegraph.config.HugeConfig;
2828
import org.apache.hugegraph.util.E;
29+
import org.apache.hugegraph.util.Log;
30+
import org.slf4j.Logger;
2931

3032
import com.datastax.driver.core.BatchStatement;
3133
import com.datastax.driver.core.Cluster;
@@ -34,22 +36,37 @@
3436
import com.datastax.driver.core.ProtocolOptions.Compression;
3537
import com.datastax.driver.core.ResultSet;
3638
import com.datastax.driver.core.ResultSetFuture;
39+
import com.datastax.driver.core.SimpleStatement;
3740
import com.datastax.driver.core.SocketOptions;
3841
import com.datastax.driver.core.Statement;
42+
import com.datastax.driver.core.exceptions.DriverException;
3943
import com.datastax.driver.core.exceptions.InvalidQueryException;
44+
import com.datastax.driver.core.exceptions.NoHostAvailableException;
45+
import com.datastax.driver.core.exceptions.OperationTimedOutException;
46+
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
4047

4148
public class CassandraSessionPool extends BackendSessionPool {
4249

50+
private static final Logger LOG = Log.logger(CassandraSessionPool.class);
51+
4352
private static final int SECOND = 1000;
53+
private static final String HEALTH_CHECK_CQL =
54+
"SELECT now() FROM system.local";
4455

4556
private Cluster cluster;
4657
private final String keyspace;
58+
private int maxRetries;
59+
private long retryInterval;
60+
private long retryMaxDelay;
4761

4862
/**
 * Create a session pool named "{keyspace}/{store}".
 *
 * The cluster handle and the query-time retry settings start out unset
 * (null / zero); open() assigns the retry settings from the configuration.
 *
 * @param config   configuration passed on to the parent pool
 * @param keyspace keyspace this pool is bound to
 * @param store    store name, used only to build the pool name
 */
public CassandraSessionPool(HugeConfig config,
                            String keyspace, String store) {
    super(config, keyspace + "/" + store);
    this.keyspace = keyspace;
    this.cluster = null;
    // Retry tuning is not known yet: keep everything at zero for now
    this.retryMaxDelay = 0L;
    this.retryInterval = 0L;
    this.maxRetries = 0;
}
5471

5572
@Override
@@ -86,6 +103,26 @@ public synchronized void open() {
86103

87104
builder.withSocketOptions(socketOptions);
88105

106+
// Reconnection policy: let driver keep retrying nodes in background
107+
// with exponential backoff after they go down (see issue #2740).
108+
long reconnectBase = config.get(
109+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
110+
long reconnectMax = config.get(
111+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
112+
E.checkArgument(reconnectMax >= reconnectBase,
113+
"'%s' (%s) must be >= '%s' (%s)",
114+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
115+
reconnectMax,
116+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
117+
reconnectBase);
118+
builder.withReconnectionPolicy(
119+
new ExponentialReconnectionPolicy(reconnectBase, reconnectMax));
120+
this.retryMaxDelay = reconnectMax;
121+
this.maxRetries = config.get(
122+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
123+
this.retryInterval = config.get(
124+
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL);
125+
89126
// Credential options
90127
String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
91128
String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
@@ -161,7 +198,7 @@ public void rollback() {
161198

162199
@Override
163200
public ResultSet commit() {
164-
ResultSet rs = this.session.execute(this.batch);
201+
ResultSet rs = this.executeWithRetry(this.batch);
165202
// Clear batch if execute() successfully (retained if failed)
166203
this.batch.clear();
167204
return rs;
@@ -197,15 +234,59 @@ public ResultSet query(Statement statement) {
197234
}
198235

199236
public ResultSet execute(Statement statement) {
200-
return this.session.execute(statement);
237+
return this.executeWithRetry(statement);
201238
}
202239

203240
public ResultSet execute(String statement) {
204-
return this.session.execute(statement);
241+
return this.executeWithRetry(new SimpleStatement(statement));
205242
}
206243

207244
public ResultSet execute(String statement, Object... args) {
208-
return this.session.execute(statement, args);
245+
return this.executeWithRetry(new SimpleStatement(statement, args));
246+
}
247+
248+
/**
249+
* Execute a statement, retrying on transient connectivity failures
250+
* (NoHostAvailableException / OperationTimedOutException). The driver
251+
* itself keeps retrying connections in the background via the
252+
* reconnection policy, so once Cassandra comes back online, a
253+
* subsequent attempt here will succeed without restarting the server.
254+
* See issue #2740.
255+
*/
256+
private ResultSet executeWithRetry(Statement statement) {
257+
int retries = CassandraSessionPool.this.maxRetries;
258+
long interval = CassandraSessionPool.this.retryInterval;
259+
long maxDelay = CassandraSessionPool.this.retryMaxDelay;
260+
DriverException lastError = null;
261+
for (int attempt = 0; attempt <= retries; attempt++) {
262+
try {
263+
return this.session.execute(statement);
264+
} catch (NoHostAvailableException | OperationTimedOutException e) {
265+
lastError = e;
266+
if (attempt >= retries) {
267+
break;
268+
}
269+
long delay = Math.min(interval * (1L << Math.min(attempt, 20)),
270+
maxDelay > 0 ? maxDelay : interval);
271+
LOG.warn("Cassandra temporarily unavailable ({}), " +
272+
"retry {}/{} in {} ms",
273+
e.getClass().getSimpleName(), attempt + 1,
274+
retries, delay);
275+
try {
276+
Thread.sleep(delay);
277+
} catch (InterruptedException ie) {
278+
Thread.currentThread().interrupt();
279+
throw new BackendException("Interrupted while " +
280+
"waiting to retry " +
281+
"Cassandra query", ie);
282+
}
283+
}
284+
}
285+
throw new BackendException("Failed to execute Cassandra query " +
286+
"after %s retries: %s",
287+
lastError, retries,
288+
lastError == null ? "<null>" :
289+
lastError.getMessage());
209290
}
210291

211292
private void tryOpen() {
@@ -255,6 +336,53 @@ public boolean hasChanges() {
255336
return this.batch.size() > 0;
256337
}
257338

339+
/**
340+
* Periodic liveness probe invoked by {@link BackendSessionPool} to
341+
* recover thread-local sessions after Cassandra has been restarted.
342+
* Reopens the driver session if it was closed and pings the cluster
343+
* with a lightweight query. Any failure here is swallowed so the
344+
* caller can still issue the real query, which will drive retries via
345+
* {@link #executeWithRetry(Statement)}.
346+
*/
347+
@Override
348+
public void reconnectIfNeeded() {
349+
if (!this.opened) {
350+
return;
351+
}
352+
try {
353+
if (this.session == null || this.session.isClosed()) {
354+
this.session = null;
355+
this.tryOpen();
356+
if (this.session == null) {
357+
return;
358+
}
359+
}
360+
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
361+
} catch (DriverException e) {
362+
LOG.debug("Cassandra health-check failed, " +
363+
"will retry on next query: {}", e.getMessage());
364+
}
365+
}
366+
367+
/**
368+
* Force-close the driver session so it is re-opened on the next
369+
* {@link #opened()} call. Used when a failure is observed and we
370+
* want to start fresh on the next attempt.
371+
*/
372+
@Override
373+
public void reset() {
374+
if (this.session == null) {
375+
return;
376+
}
377+
try {
378+
this.session.close();
379+
} catch (Throwable e) {
380+
LOG.warn("Failed to reset Cassandra session", e);
381+
} finally {
382+
this.session = null;
383+
}
384+
}
385+
258386
public Collection<Statement> statements() {
259387
return this.batch.getStatements();
260388
}

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package org.apache.hugegraph.unit.cassandra;
1919

20+
import java.util.Collections;
2021
import java.util.Map;
2122

2223
import org.apache.commons.configuration2.Configuration;
2324
import org.apache.commons.configuration2.PropertiesConfiguration;
2425
import org.apache.hugegraph.backend.store.cassandra.CassandraOptions;
26+
import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool;
2527
import org.apache.hugegraph.backend.store.cassandra.CassandraStore;
2628
import org.apache.hugegraph.config.HugeConfig;
2729
import org.apache.hugegraph.config.OptionSpace;
@@ -30,7 +32,11 @@
3032
import org.junit.After;
3133
import org.junit.Before;
3234
import org.junit.Test;
35+
import org.mockito.Mockito;
3336

37+
import com.datastax.driver.core.ResultSet;
38+
import com.datastax.driver.core.Statement;
39+
import com.datastax.driver.core.exceptions.NoHostAvailableException;
3440
import com.google.common.collect.ImmutableList;
3541
import com.google.common.collect.ImmutableMap;
3642

@@ -192,4 +198,99 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() {
192198
Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config);
193199
});
194200
}
201+
202+
@Test
203+
public void testReconnectOptionsHaveSensibleDefaults() {
204+
// Runtime-reconnection options must exist with non-zero defaults so
205+
// HugeGraph keeps running when Cassandra restarts (issue #2740).
206+
Assert.assertEquals(1000L, (long) CassandraOptions
207+
.CASSANDRA_RECONNECT_BASE_DELAY.defaultValue());
208+
Assert.assertEquals(60_000L, (long) CassandraOptions
209+
.CASSANDRA_RECONNECT_MAX_DELAY.defaultValue());
210+
Assert.assertEquals(10, (int) CassandraOptions
211+
.CASSANDRA_RECONNECT_MAX_RETRIES.defaultValue());
212+
Assert.assertEquals(5000L, (long) CassandraOptions
213+
.CASSANDRA_RECONNECT_INTERVAL.defaultValue());
214+
}
215+
216+
@Test
217+
public void testReconnectOptionsAreOverridable() {
218+
String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name();
219+
String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name();
220+
String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
221+
.name();
222+
String interval = CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name();
223+
224+
Configuration conf = new PropertiesConfiguration();
225+
conf.setProperty(base, 500L);
226+
conf.setProperty(max, 30_000L);
227+
conf.setProperty(retries, 3);
228+
conf.setProperty(interval, 1000L);
229+
HugeConfig config = new HugeConfig(conf);
230+
231+
Assert.assertEquals(500L, (long) config.get(
232+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY));
233+
Assert.assertEquals(30_000L, (long) config.get(
234+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY));
235+
Assert.assertEquals(3, (int) config.get(
236+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES));
237+
Assert.assertEquals(1000L, (long) config.get(
238+
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL));
239+
}
240+
241+
@Test
242+
public void testReconnectRetriesCanBeDisabled() {
243+
String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
244+
.name();
245+
Configuration conf = new PropertiesConfiguration();
246+
conf.setProperty(retries, 0);
247+
HugeConfig config = new HugeConfig(conf);
248+
Assert.assertEquals(0, (int) config.get(
249+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES));
250+
}
251+
252+
@Test
253+
public void testExecuteWithRetrySucceedsAfterTransientFailures() {
254+
Configuration conf = new PropertiesConfiguration();
255+
HugeConfig config = new HugeConfig(conf);
256+
CassandraSessionPool pool = new CassandraSessionPool(config,
257+
"ks", "store");
258+
Whitebox.setInternalState(pool, "maxRetries", 3);
259+
Whitebox.setInternalState(pool, "retryInterval", 1L);
260+
Whitebox.setInternalState(pool, "retryMaxDelay", 10L);
261+
262+
com.datastax.driver.core.Session driverSession = Mockito.mock(
263+
com.datastax.driver.core.Session.class);
264+
ResultSet rs = Mockito.mock(ResultSet.class);
265+
NoHostAvailableException transientFailure =
266+
new NoHostAvailableException(Collections.emptyMap());
267+
Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
268+
.thenThrow(transientFailure)
269+
.thenThrow(transientFailure)
270+
.thenReturn(rs);
271+
272+
CassandraSessionPool.Session session = pool.new Session();
273+
Whitebox.setInternalState(session, "session", driverSession);
274+
275+
ResultSet result = session.execute("SELECT now() FROM system.local");
276+
Assert.assertSame(rs, result);
277+
Mockito.verify(driverSession, Mockito.times(3))
278+
.execute(Mockito.any(Statement.class));
279+
}
280+
281+
@Test
282+
public void testReconnectOptionsExposeExpectedKeys() {
283+
Assert.assertEquals("cassandra.reconnect_base_delay",
284+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY
285+
.name());
286+
Assert.assertEquals("cassandra.reconnect_max_delay",
287+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY
288+
.name());
289+
Assert.assertEquals("cassandra.reconnect_max_retries",
290+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
291+
.name());
292+
Assert.assertEquals("cassandra.reconnect_interval",
293+
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL
294+
.name());
295+
}
195296
}

0 commit comments

Comments
 (0)