Skip to content

Commit 69cccd1

Browse files
committed
fix(cassandra): address reconnect retry review
1 parent 76d2163 commit 69cccd1

3 files changed

Lines changed: 184 additions & 49 deletions

File tree

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,20 +151,21 @@ public static synchronized CassandraOptions instance() {
151151
10_000L
152152
);
153153

154-
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
154+
public static final ConfigOption<Integer> CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS =
155155
new ConfigOption<>(
156-
"cassandra.reconnect_max_retries",
157-
"The maximum number of retries applied at query-time when " +
158-
"a Cassandra host is temporarily unreachable " +
159-
"(NoHostAvailableException / OperationTimedOutException). " +
156+
"cassandra.query_retry_max_attempts",
157+
"The maximum number of retry attempts applied at query-time when " +
158+
"a Cassandra host is temporarily unreachable. " +
159+
"OperationTimedOutException is retried only for " +
160+
"idempotent statements. " +
160161
"Set to 0 to disable query-time retries.",
161162
rangeInt(0, Integer.MAX_VALUE),
162163
3
163164
);
164165

165-
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
166+
public static final ConfigOption<Long> CASSANDRA_QUERY_RETRY_INTERVAL =
166167
new ConfigOption<>(
167-
"cassandra.reconnect_interval",
168+
"cassandra.query_retry_interval",
168169
"The interval in milliseconds between query-time retries " +
169170
"when a Cassandra host is temporarily unreachable. The " +
170171
"actual wait grows with exponential backoff, capped at " +

hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ public CassandraSessionPool(HugeConfig config,
7777
this.cluster = null;
7878
this.keyspace = keyspace;
7979
this.maxRetries = config.get(
80-
CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
80+
CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS);
8181
this.retryInterval = config.get(
82-
CassandraOptions.CASSANDRA_RECONNECT_INTERVAL);
82+
CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL);
8383
long reconnectBase = config.get(
8484
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
8585
long reconnectMax = config.get(
@@ -95,7 +95,7 @@ public CassandraSessionPool(HugeConfig config,
9595

9696
if (this.maxRetries > 0 &&
9797
ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) {
98-
LOG.warn("cassandra.reconnect_max_retries={} applies to sync commit()" +
98+
LOG.warn("cassandra.query_retry_max_attempts={} applies to sync commit()" +
9999
" only. commitAsync() has no retry protection.", this.maxRetries);
100100
}
101101
}
@@ -223,17 +223,25 @@ public ResultSet commit() {
223223

224224
public void commitAsync() {
225225
Collection<Statement> statements = this.batch.getStatements();
226+
if (statements.isEmpty()) {
227+
this.batch.clear();
228+
return;
229+
}
226230

227231
int count = 0;
228232
int processors = Math.min(statements.size(), 1023);
229233
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
234+
com.datastax.driver.core.Session driverSession =
235+
this.sessionForAsyncCommit();
230236
for (Statement s : statements) {
231-
// TODO(issue #2740): commitAsync() bypasses executeWithRetry().
237+
// TODO: track async retry support in a follow-up issue.
238+
// commitAsync() bypasses executeWithRetry().
232239
// During a Cassandra restart, async writes may fail with
233240
// NoHostAvailableException even when maxRetries > 0. Callers
234-
// must handle CompletableFuture failures. A follow-up will
235-
// wrap each future with retry semantics.
236-
ResultSetFuture future = this.session.executeAsync(s);
241+
// must handle ResultSetFuture failures surfaced by
242+
// getUninterruptibly(). A follow-up issue should wrap each
243+
// future with retry semantics.
244+
ResultSetFuture future = driverSession.executeAsync(s);
237245
results.add(future);
238246

239247
if (++count > processors) {
@@ -274,18 +282,20 @@ public ResultSet execute(String statement, Object... args) {
274282
* reconnection policy, so once Cassandra comes back online, a
275283
* subsequent attempt here will succeed without restarting the server.
276284
*
285+
* <p>OperationTimedOutException is only retried for statements marked
286+
* idempotent; otherwise a timed-out mutation might be applied once by
287+
* Cassandra and then duplicated by a client-side retry.
288+
*
277289
* <p>If the driver session has been discarded (e.g. by
278290
* {@link #reconnectIfNeeded()} after a failed health-check) it is
279-
* lazily reopened at the start of each attempt. After a transient
280-
* failure the session is {@linkplain #reset() reset} so the next
281-
* iteration gets a fresh driver session.
291+
* lazily reopened at the start of each attempt.
282292
*
283293
* <p><b>Blocking note:</b> retries block the calling thread via
284294
* {@link Thread#sleep(long)}. Worst-case a single call blocks for
285295
* {@code maxRetries * retryMaxDelay} ms. Under high-throughput
286296
* workloads concurrent threads may pile up in {@code sleep()} during
287297
* a Cassandra outage. For such deployments lower
288-
* {@code cassandra.reconnect_max_retries} (default 3) and
298+
* {@code cassandra.query_retry_max_attempts} (default 3) and
289299
* {@code cassandra.reconnect_max_delay} (default 10000ms) so the
290300
* request fails fast and pressure is released back to the caller.
291301
*/
@@ -296,18 +306,23 @@ private ResultSet executeWithRetry(Statement statement) {
296306
DriverException lastError = null;
297307
for (int attempt = 0; attempt <= retries; attempt++) {
298308
try {
299-
if (this.session == null) {
309+
if (this.session == null || this.session.isClosed()) {
300310
// Lazy reopen: may itself throw NHAE while
301311
// Cassandra is still unreachable; the catch below
302312
// treats that as a transient failure.
313+
this.session = null;
303314
this.open();
304315
}
305316
return this.session.execute(statement);
306317
} catch (NoHostAvailableException | OperationTimedOutException e) {
307318
lastError = e;
308-
// Discard the (possibly broken) driver session so the
309-
// next iteration reopens cleanly.
310-
this.reset();
319+
if (e instanceof OperationTimedOutException &&
320+
!Boolean.TRUE.equals(statement.isIdempotent())) {
321+
throw new BackendException(
322+
"Cassandra query timed out and won't be " +
323+
"retried because the statement is not " +
324+
"marked idempotent", e);
325+
}
311326
if (attempt >= retries) {
312327
break;
313328
}
@@ -336,7 +351,7 @@ private ResultSet executeWithRetry(Statement statement) {
336351
}
337352
// Preserve original exception as cause (stack trace + type) by
338353
// pre-formatting the message and using the (String, Throwable)
339-
// constructor explicitly — avoids ambiguity with varargs overloads.
354+
// constructor explicitly to avoid ambiguity with varargs overloads.
340355
String msg = String.format(
341356
"Failed to execute Cassandra query after %s retries: %s",
342357
retries,
@@ -353,6 +368,24 @@ private void tryOpen() {
353368
}
354369
}
355370

371+
private com.datastax.driver.core.Session sessionForAsyncCommit() {
372+
if (this.session == null || this.session.isClosed()) {
373+
this.session = null;
374+
try {
375+
this.open();
376+
} catch (DriverException e) {
377+
throw new BackendException(
378+
"Failed to open Cassandra session for async commit",
379+
e);
380+
}
381+
}
382+
if (this.session == null) {
383+
throw new BackendException(
384+
"Cassandra session is unavailable for async commit");
385+
}
386+
return this.session;
387+
}
388+
356389
@Override
357390
public void open() {
358391
this.opened = true;
@@ -413,10 +446,10 @@ public void reconnectIfNeeded() {
413446
if (this.session != null) {
414447
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
415448
}
416-
} catch (DriverException e) {
449+
} catch (NoHostAvailableException | OperationTimedOutException e) {
417450
LOG.debug("Cassandra health-check failed, resetting session: {}",
418451
e.getMessage());
419-
this.session = null;
452+
this.reset();
420453
}
421454
}
422455

0 commit comments

Comments
 (0)