Skip to content

Commit 76d2163

Browse files
committed
fix(cassandra): fix session poisoning on transient failures
- Reset driver session after each transient failure in executeWithRetry() so retries reopen cleanly via lazy open() - Remove redundant finally block in reconnectIfNeeded(); null session directly on DriverException - Store retryBaseDelay as field, reuse in open() (removes double-read) - One-time LOG.warn via AtomicBoolean for commitAsync() retry gap - Tighten defaults: max_delay 60s→10s, max_retries 10→3, interval 5s→1s - Wire retry config via HugeConfig in tests; add cross-validator tests
1 parent 5ac3990 commit 76d2163

3 files changed

Lines changed: 111 additions & 40 deletions

File tree

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public static synchronized CassandraOptions instance() {
148148
"exponential reconnection policy when a Cassandra host " +
149149
"becomes unreachable.",
150150
rangeInt(1000L, Long.MAX_VALUE),
151-
60_000L
151+
10_000L
152152
);
153153

154154
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
@@ -159,7 +159,7 @@ public static synchronized CassandraOptions instance() {
159159
"(NoHostAvailableException / OperationTimedOutException). " +
160160
"Set to 0 to disable query-time retries.",
161161
rangeInt(0, Integer.MAX_VALUE),
162-
10
162+
3
163163
);
164164

165165
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
@@ -170,6 +170,6 @@ public static synchronized CassandraOptions instance() {
170170
"actual wait grows with exponential backoff, capped at " +
171171
"cassandra.reconnect_max_delay.",
172172
rangeInt(100L, Long.MAX_VALUE),
173-
5000L
173+
1000L
174174
);
175175
}

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Collection;
2222
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import org.apache.hugegraph.backend.BackendException;
2526
import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
@@ -53,10 +54,21 @@ public class CassandraSessionPool extends BackendSessionPool {
5354
private static final String HEALTH_CHECK_CQL =
5455
"SELECT now() FROM system.local";
5556

57+
/**
58+
* Guards the one-time JVM-wide warning about {@code commitAsync()} not
59+
* being covered by query-time retries. {@link CassandraSessionPool} is
60+
* instantiated once per backend store per graph, so without this guard
61+
* the warning would fire many times on startup for a structural
62+
* limitation that does not change between instances.
63+
*/
64+
private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED =
65+
new AtomicBoolean(false);
66+
5667
private Cluster cluster;
5768
private final String keyspace;
5869
private final int maxRetries;
5970
private final long retryInterval;
71+
private final long retryBaseDelay;
6072
private final long retryMaxDelay;
6173

6274
public CassandraSessionPool(HugeConfig config,
@@ -78,7 +90,14 @@ public CassandraSessionPool(HugeConfig config,
7890
reconnectMax,
7991
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
8092
reconnectBase);
93+
this.retryBaseDelay = reconnectBase;
8194
this.retryMaxDelay = reconnectMax;
95+
96+
if (this.maxRetries > 0 &&
97+
ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) {
98+
LOG.warn("cassandra.reconnect_max_retries={} applies to sync commit()" +
99+
" only. commitAsync() has no retry protection.", this.maxRetries);
100+
}
82101
}
83102

84103
@Override
@@ -117,10 +136,8 @@ public synchronized void open() {
117136

118137
// Reconnection policy: let driver keep retrying nodes in background
119138
// with exponential backoff after they go down (see issue #2740).
120-
long reconnectBase = config.get(
121-
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
122139
builder.withReconnectionPolicy(
123-
new ExponentialReconnectionPolicy(reconnectBase,
140+
new ExponentialReconnectionPolicy(this.retryBaseDelay,
124141
this.retryMaxDelay));
125142

126143
// Credential options
@@ -211,7 +228,11 @@ public void commitAsync() {
211228
int processors = Math.min(statements.size(), 1023);
212229
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
213230
for (Statement s : statements) {
214-
// TODO: commitAsync is not retried (async retry semantics are complex)
231+
// TODO(issue #2740): commitAsync() bypasses executeWithRetry().
232+
// During a Cassandra restart, async writes may fail with
233+
// NoHostAvailableException even when maxRetries > 0. Callers
234+
// must handle CompletableFuture failures. A follow-up will
235+
// wrap each future with retry semantics.
215236
ResultSetFuture future = this.session.executeAsync(s);
216237
results.add(future);
217238

@@ -253,13 +274,19 @@ public ResultSet execute(String statement, Object... args) {
253274
* reconnection policy, so once Cassandra comes back online, a
254275
* subsequent attempt here will succeed without restarting the server.
255276
*
277+
* <p>If the driver session has been discarded (e.g. by
278+
* {@link #reconnectIfNeeded()} after a failed health-check) it is
279+
* lazily reopened at the start of each attempt. After a transient
280+
* failure the session is {@linkplain #reset() reset} so the next
281+
* iteration gets a fresh driver session.
282+
*
256283
* <p><b>Blocking note:</b> retries block the calling thread via
257284
* {@link Thread#sleep(long)}. Worst-case a single call blocks for
258285
* {@code maxRetries * retryMaxDelay} ms. Under high-throughput
259286
* workloads concurrent threads may pile up in {@code sleep()} during
260287
* a Cassandra outage. For such deployments lower
261-
* {@code cassandra.reconnect_max_retries} (default 10) and
262-
* {@code cassandra.reconnect_max_delay} (default 60000ms) so the
288+
* {@code cassandra.reconnect_max_retries} (default 3) and
289+
* {@code cassandra.reconnect_max_delay} (default 10000ms) so the
263290
* request fails fast and pressure is released back to the caller.
264291
*/
265292
private ResultSet executeWithRetry(Statement statement) {
@@ -269,9 +296,18 @@ private ResultSet executeWithRetry(Statement statement) {
269296
DriverException lastError = null;
270297
for (int attempt = 0; attempt <= retries; attempt++) {
271298
try {
299+
if (this.session == null) {
300+
// Lazy reopen: may itself throw NHAE while
301+
// Cassandra is still unreachable; the catch below
302+
// treats that as a transient failure.
303+
this.open();
304+
}
272305
return this.session.execute(statement);
273306
} catch (NoHostAvailableException | OperationTimedOutException e) {
274307
lastError = e;
308+
// Discard the (possibly broken) driver session so the
309+
// next iteration reopens cleanly.
310+
this.reset();
275311
if (attempt >= retries) {
276312
break;
277313
}
@@ -359,9 +395,10 @@ public boolean hasChanges() {
359395
* Periodic liveness probe invoked by {@link BackendSessionPool} to
360396
* recover thread-local sessions after Cassandra has been restarted.
361397
* Reopens the driver session if it was closed and pings the cluster
362-
* with a lightweight query. Any failure here is swallowed so the
363-
* caller can still issue the real query, which will drive retries via
364-
* {@link #executeWithRetry(Statement)}.
398+
* with a lightweight query. On failure the session is discarded via
399+
* {@link #reset()} so the next call to
400+
* {@link #executeWithRetry(Statement)} reopens it; any exception
401+
* here is swallowed so the caller can still issue the real query.
365402
*/
366403
@Override
367404
public void reconnectIfNeeded() {
@@ -377,15 +414,9 @@ public void reconnectIfNeeded() {
377414
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
378415
}
379416
} catch (DriverException e) {
380-
LOG.debug("Cassandra health-check failed, " +
381-
"will retry on next query: {}", e.getMessage());
382-
} finally {
383-
// Keep opened flag consistent with session: if tryOpen()
384-
// failed to reopen, clear opened so the next execute() does
385-
// not NPE before executeWithRetry() can intercept.
386-
if (this.session == null) {
387-
this.opened = false;
388-
}
417+
LOG.debug("Cassandra health-check failed, resetting session: {}",
418+
e.getMessage());
419+
this.session = null;
389420
}
390421
}
391422

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,11 @@ public void testReconnectOptionsHaveSensibleDefaults() {
205205
// HugeGraph keeps running when Cassandra restarts (issue #2740).
206206
Assert.assertEquals(1000L, (long) CassandraOptions
207207
.CASSANDRA_RECONNECT_BASE_DELAY.defaultValue());
208-
Assert.assertEquals(60_000L, (long) CassandraOptions
208+
Assert.assertEquals(10_000L, (long) CassandraOptions
209209
.CASSANDRA_RECONNECT_MAX_DELAY.defaultValue());
210-
Assert.assertEquals(10, (int) CassandraOptions
210+
Assert.assertEquals(3, (int) CassandraOptions
211211
.CASSANDRA_RECONNECT_MAX_RETRIES.defaultValue());
212-
Assert.assertEquals(5000L, (long) CassandraOptions
212+
Assert.assertEquals(1000L, (long) CassandraOptions
213213
.CASSANDRA_RECONNECT_INTERVAL.defaultValue());
214214
}
215215

@@ -251,13 +251,22 @@ public void testReconnectRetriesCanBeDisabled() {
251251

252252
@Test
253253
public void testExecuteWithRetrySucceedsAfterTransientFailures() {
254+
// Configure retry knobs via config so the pool reads them through
255+
// the normal path (no Whitebox overrides on retry fields). Keep the
256+
// values within the validators' lower bounds (base >= 100, max >=
257+
// base, interval >= 100).
254258
Configuration conf = new PropertiesConfiguration();
259+
conf.setProperty(
260+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
261+
conf.setProperty(
262+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
263+
conf.setProperty(
264+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3);
265+
conf.setProperty(
266+
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 100L);
255267
HugeConfig config = new HugeConfig(conf);
256268
CassandraSessionPool pool = new CassandraSessionPool(config,
257269
"ks", "store");
258-
Whitebox.setInternalState(pool, "maxRetries", 3);
259-
Whitebox.setInternalState(pool, "retryInterval", 1L);
260-
Whitebox.setInternalState(pool, "retryMaxDelay", 10L);
261270

262271
com.datastax.driver.core.Session driverSession = Mockito.mock(
263272
com.datastax.driver.core.Session.class);
@@ -269,6 +278,17 @@ public void testExecuteWithRetrySucceedsAfterTransientFailures() {
269278
.thenThrow(transientFailure)
270279
.thenReturn(rs);
271280

281+
// executeWithRetry now resets the driver session on transient
282+
// failures, so the next iteration calls cluster().connect(keyspace)
283+
// to obtain a fresh one. Install a mocked Cluster that hands back
284+
// the same driverSession for each reconnect.
285+
com.datastax.driver.core.Cluster mockCluster = Mockito.mock(
286+
com.datastax.driver.core.Cluster.class);
287+
Mockito.when(mockCluster.isClosed()).thenReturn(false);
288+
Mockito.when(mockCluster.connect(Mockito.anyString()))
289+
.thenReturn(driverSession);
290+
Whitebox.setInternalState(pool, "cluster", mockCluster);
291+
272292
CassandraSessionPool.Session session = pool.new Session();
273293
Whitebox.setInternalState(session, "session", driverSession);
274294

@@ -279,18 +299,38 @@ public void testExecuteWithRetrySucceedsAfterTransientFailures() {
279299
}
280300

281301
@Test
282-
public void testReconnectOptionsExposeExpectedKeys() {
283-
Assert.assertEquals("cassandra.reconnect_base_delay",
284-
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY
285-
.name());
286-
Assert.assertEquals("cassandra.reconnect_max_delay",
287-
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY
288-
.name());
289-
Assert.assertEquals("cassandra.reconnect_max_retries",
290-
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
291-
.name());
292-
Assert.assertEquals("cassandra.reconnect_interval",
293-
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL
294-
.name());
302+
public void testReconnectBaseDelayBelowMinimumRejected() {
303+
// The validator on CASSANDRA_RECONNECT_BASE_DELAY is
304+
// rangeInt(100L, Long.MAX_VALUE); values below 100 must be rejected
305+
// at parse time. Setting the property as a String forces HugeConfig
306+
// to run parseConvert() which invokes the range check.
307+
Configuration conf = new PropertiesConfiguration();
308+
Assert.assertThrows(Exception.class, () -> {
309+
conf.setProperty(
310+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
311+
"50");
312+
new HugeConfig(conf);
313+
});
314+
}
315+
316+
@Test
317+
public void testReconnectMaxDelayLessThanBaseRejected() {
318+
// Both values must pass their individual range validators with margin
319+
// (base >= 100, max >= 1000), so the only thing that can throw is the
320+
// E.checkArgument(max >= base) cross-check inside the pool ctor. Set
321+
// all four reconnect properties explicitly so the test does not depend
322+
// on default values changing in CassandraOptions.
323+
Configuration conf = new PropertiesConfiguration();
324+
conf.setProperty(
325+
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 10_000L);
326+
conf.setProperty(
327+
CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 2_000L);
328+
conf.setProperty(
329+
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3);
330+
conf.setProperty(
331+
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 1_000L);
332+
HugeConfig config = new HugeConfig(conf);
333+
Assert.assertThrows(IllegalArgumentException.class, () ->
334+
new CassandraSessionPool(config, "ks", "store"));
295335
}
296336
}

0 commit comments

Comments
 (0)