Skip to content

Commit e1a2c53

Browse files
authored
[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions (#1415)
## Summary Design + implementation for PECOBLR-2321: periodic heartbeat polling to keep server-side result state alive while the client consumes results slowly. ## Problem When users read query results slowly (pausing between `next()` calls), the warehouse can auto-stop after its idle timeout. For inline results (data only on cluster, not uploaded to cloud storage), this means permanent data loss. The user gets errors like `INVALID_HANDLE_STATUS` or "operation not found". ## Solution A `ResultHeartbeatManager` that periodically calls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to signal the server that results are still being consumed. Opt-in via `EnableHeartbeat=1` connection parameter (default false due to cost implications). ### Design doc `docs/design/HEARTBEAT_KEEP_ALIVE.md` — includes cross-driver survey, Mermaid diagrams (sequence flows, state machine, class diagram), and detailed lifecycle analysis. ### Heartbeat eligibility (skipped when not needed) | Scenario | Heartbeat? | Reason | |----------|-----------|--------| | SEA cloud fetch (Arrow) | **Yes** | Statement must stay alive for URL refresh | | Thrift inline (columnar) | **Yes** | Data fetched on-demand; server can evict | | Thrift cloud fetch | **Yes** | Operation handle must stay alive | | SEA inline (JSON) | No | All data loaded into memory at construction | | Direct results (CLOSED state) | No | Server already closed; data fully delivered | | Update count (DML) | No | No result rows; execution polling already kept it alive | | Async execution wait | No | User controls polling via `getExecutionResult()` — heartbeat starts only when ResultSet is constructed | ### Error resilience - 10 consecutive failures before self-stop - Terminal states (CLOSED/ERROR/CANCELED/TIMEDOUT) auto-stop - Single success resets failure counter ### Zero-leak guarantee Heartbeat stops in 4 places: `next()` returns false, `ResultSet.close()`, `Statement.close()`, `Connection.close()` ### Implementation - **New**: `ResultHeartbeatManager` — per-connection manager with `ScheduledExecutorService` (daemon thread) - **New**: `ResultHeartbeatManagerTest` — 7 unit tests - **Modified**: `DatabricksJdbcUrlParams` — `EnableHeartbeat` (default 0), `HeartbeatIntervalSeconds` (default 60) - **Modified**: `DatabricksConnectionContext` — getter methods - **Modified**: `DatabricksConnection` — creates/shuts down manager - **Modified**: `DatabricksResultSet` — starts heartbeat in constructor, stops on close/next-false - **Modified**: `DatabricksStatement` — safety net stop in close() - **Modified**: `IDatabricksClient` — `checkStatementAlive()` default method - **Modified**: `DatabricksSdkClient` — SEA heartbeat via GET `/sql/statements/{id}` - **Modified**: `DatabricksThriftServiceClient` — Thrift heartbeat via `GetOperationStatus` --------- Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 6da122a commit e1a2c53

22 files changed

Lines changed: 2061 additions & 7 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ upgrading. These changes do not affect metadata on All-Purpose Clusters.
5353
compatibility issues.
5454

5555
### Added
56+
- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
5657
- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends,
5758
ensuring consistent behavior for SQL warehouses regardless of underlying
5859
protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.

docs/design/HEARTBEAT_KEEP_ALIVE.md

Lines changed: 535 additions & 0 deletions
Large diffs are not rendered by default.

jdbc-core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@
285285
<groupId>org.apache.maven.plugins</groupId>
286286
<artifactId>maven-surefire-plugin</artifactId>
287287
<configuration>
288+
<excludedGroups>e2e</excludedGroups>
288289
<excludes>
289290
<exclude>**/DatabricksDriverExamples.java</exclude>
290291
<exclude>**/integration/**/*.java</exclude>

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DatabricksConnection implements IDatabricksConnection, IDatabricksC
3838
private final Set<IDatabricksStatementInternal> statementSet = ConcurrentHashMap.newKeySet();
3939
private SQLWarning warnings = null;
4040
private final IDatabricksConnectionContext connectionContext;
41+
private final ResultHeartbeatManager heartbeatManager;
4142

4243
/**
4344
* Creates an instance of Databricks connection for given connection context.
@@ -49,6 +50,7 @@ public DatabricksConnection(IDatabricksConnectionContext connectionContext)
4950
this.connectionContext = connectionContext;
5051
DatabricksThreadContextHolder.setConnectionContext(connectionContext);
5152
this.session = new DatabricksSession(connectionContext);
53+
this.heartbeatManager = createHeartbeatManager(connectionContext);
5254
}
5355

5456
@VisibleForTesting
@@ -58,10 +60,27 @@ public DatabricksConnection(
5860
this.connectionContext = connectionContext;
5961
DatabricksThreadContextHolder.setConnectionContext(connectionContext);
6062
this.session = new DatabricksSession(connectionContext, testDatabricksClient);
63+
this.heartbeatManager = createHeartbeatManager(connectionContext);
6164
UserAgentManager.setUserAgent(connectionContext);
6265
TelemetryHelper.updateTelemetryAppName(connectionContext, null);
6366
}
6467

68+
private static ResultHeartbeatManager createHeartbeatManager(
69+
IDatabricksConnectionContext connectionContext) {
70+
// Use interface methods instead of instanceof check so mocks and
71+
// alternate implementations can also enable heartbeat
72+
if (connectionContext.isHeartbeatEnabled()) {
73+
return new ResultHeartbeatManager(
74+
connectionContext.getHeartbeatIntervalSeconds(), connectionContext.getConnectionUuid());
75+
}
76+
return null;
77+
}
78+
79+
/** Returns the heartbeat manager, or null if heartbeat is disabled. */
80+
ResultHeartbeatManager getHeartbeatManager() {
81+
return heartbeatManager;
82+
}
83+
6584
@Override
6685
public void open() throws SQLException {
6786
this.session.open();
@@ -416,6 +435,11 @@ public void rollback() throws SQLException {
416435
@Override
417436
public void close() throws SQLException {
418437
LOGGER.debug("public void close()");
438+
// Shutdown heartbeat FIRST — prevents RPCs on closing connections and
439+
// ensures shutdown runs even if statement.close() throws
440+
if (heartbeatManager != null) {
441+
heartbeatManager.shutdown();
442+
}
419443
for (IDatabricksStatementInternal statement : statementSet) {
420444
statement.close(false);
421445
statementSet.remove(statement);

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,33 @@ public boolean isTelemetryEnabled() {
973973
return getParameter(DatabricksJdbcUrlParams.ENABLE_TELEMETRY).equals("1");
974974
}
975975

976+
public boolean isHeartbeatEnabled() {
977+
return getParameter(DatabricksJdbcUrlParams.ENABLE_HEARTBEAT).equals("1");
978+
}
979+
980+
public int getHeartbeatIntervalSeconds() {
981+
int interval;
982+
try {
983+
interval = Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS));
984+
} catch (NumberFormatException e) {
985+
LOGGER.warn(
986+
"Invalid HeartbeatIntervalSeconds value '{}'. Using default 60.",
987+
getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS));
988+
return 60;
989+
}
990+
if (interval <= 0) {
991+
LOGGER.warn("HeartbeatIntervalSeconds must be positive, got {}. Using default 60.", interval);
992+
return 60;
993+
}
994+
if (interval > 3600) {
995+
LOGGER.warn(
996+
"HeartbeatIntervalSeconds {} is very large (> 1 hour). "
997+
+ "Heartbeat may not keep the operation alive.",
998+
interval);
999+
}
1000+
return interval;
1001+
}
1002+
9761003
@Override
9771004
public String getVolumeOperationAllowedPaths() {
9781005
return getParameter(

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 206 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.databricks.jdbc.common.Nullable;
2222
import com.databricks.jdbc.common.StatementType;
2323
import com.databricks.jdbc.common.util.WarningUtil;
24+
import com.databricks.jdbc.dbclient.IDatabricksClient;
2425
import com.databricks.jdbc.dbclient.impl.common.StatementId;
2526
import com.databricks.jdbc.exception.DatabricksParsingException;
2627
import com.databricks.jdbc.exception.DatabricksSQLException;
@@ -134,6 +135,7 @@ public DatabricksResultSet(
134135
this.maxRowsLimit = resolveMaxRowsLimit(parentStatement);
135136
this.isClosed = false;
136137
this.wasNull = false;
138+
startHeartbeatIfEnabled();
137139
}
138140

139141
@VisibleForTesting
@@ -204,6 +206,7 @@ public DatabricksResultSet(
204206
this.maxRowsLimit = resolveMaxRowsLimit(parentStatement);
205207
this.isClosed = false;
206208
this.wasNull = false;
209+
startHeartbeatIfEnabled();
207210
}
208211

209212
/* Constructing results for getUDTs, getTypeInfo, getProcedures metadata calls */
@@ -305,6 +308,12 @@ public DatabricksResultSet(
305308
@Override
306309
public boolean next() throws SQLException {
307310
checkIfClosed();
311+
if (executionResult == null) {
312+
throw new DatabricksSQLException(
313+
"Cannot iterate: no result data available. "
314+
+ "For async execution, call getExecutionResult() first.",
315+
DatabricksDriverErrorCode.INVALID_STATE);
316+
}
308317
// Client-side maxRows truncation: stop before delegating to the underlying result
309318
// implementation when the limit has been reached. This is skipped during
310319
// getUpdateCount() internal iteration (countingUpdateRows) to avoid breaking DML
@@ -317,15 +326,22 @@ public boolean next() throws SQLException {
317326
rowsReturned);
318327
if (!truncatedByMaxRows) {
319328
truncatedByMaxRows = true;
320-
// Record telemetry for truncated queries so dashboards reflect the truncation
321329
if (cachedTelemetryCollector != null) {
322330
cachedTelemetryCollector.recordResultSetIteration(
323331
statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), false);
324332
}
325333
}
326334
return false;
327335
}
328-
boolean hasNext = this.executionResult.next();
336+
boolean hasNext;
337+
try {
338+
hasNext = this.executionResult.next();
339+
} catch (Exception e) {
340+
// Stop heartbeat on iteration failure — prevents keeping the warehouse alive
341+
// for an abandoned ResultSet (up to 10 ticks × interval before self-stop).
342+
stopHeartbeat();
343+
throw e;
344+
}
329345
// Only count rows for customer iteration, not internal DML counting
330346
// (getUpdateCount() sets countingUpdateRows=true to iterate over affected-row counts
331347
// without inflating the user-visible row counter).
@@ -336,15 +352,21 @@ public boolean next() throws SQLException {
336352
cachedTelemetryCollector.recordResultSetIteration(
337353
statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext);
338354
}
355+
if (!hasNext) {
356+
stopHeartbeat();
357+
}
339358
return hasNext;
340359
}
341360

342361
@Override
343362
public void close() throws DatabricksSQLException {
363+
stopHeartbeat();
344364
// Proactively close server operation when ResultSet is closed explicitly.
345365
closeServerOperation();
346366
isClosed = true;
347-
this.executionResult.close();
367+
if (executionResult != null) {
368+
executionResult.close();
369+
}
348370
if (parentStatement != null) {
349371
parentStatement.handleResultSetClose(this);
350372
}
@@ -357,6 +379,187 @@ private void closeServerOperation() {
357379
}
358380
}
359381

382+
/** Starts heartbeat polling if enabled on the connection and this result set is eligible. */
383+
private void startHeartbeatIfEnabled() {
384+
if (parentStatement == null || statementId == null) {
385+
return;
386+
}
387+
if (!isHeartbeatEligible()) {
388+
return;
389+
}
390+
391+
try {
392+
// Use JDBC unwrap() to handle pooled connection wrappers (HikariCP, DBCP)
393+
java.sql.Connection rawConn = parentStatement.getStatement().getConnection();
394+
DatabricksConnection conn;
395+
if (rawConn instanceof DatabricksConnection) {
396+
conn = (DatabricksConnection) rawConn;
397+
} else if (rawConn.isWrapperFor(DatabricksConnection.class)) {
398+
conn = rawConn.unwrap(DatabricksConnection.class);
399+
} else {
400+
LOGGER.debug("Cannot start heartbeat: connection is not a DatabricksConnection");
401+
return;
402+
}
403+
404+
ResultHeartbeatManager mgr = conn.getHeartbeatManager();
405+
if (mgr == null) {
406+
return; // heartbeat not enabled
407+
}
408+
409+
// Capture only what the lambda needs — avoid capturing 'this' to prevent
410+
// abandoned ResultSets from keeping the warehouse alive via heartbeat.
411+
// Note: capturing 'client' retains a reference to the session/connection. If the
412+
// connection is GC'd without close(), heartbeat RPCs will fail and self-stop after
413+
// maxConsecutiveFailures (10 ticks, ~10 min at 60s interval). Acceptable tradeoff.
414+
final IDatabricksClient client = conn.getSession().getDatabricksClient();
415+
final StatementId capturedStatementId = this.statementId;
416+
final int maxConsecutiveFailures = 10;
417+
final java.util.concurrent.atomic.AtomicInteger consecutiveFailures =
418+
new java.util.concurrent.atomic.AtomicInteger(0);
419+
// Read the stopped flag from the manager on each tick instead of pre-capturing.
420+
// Pre-capturing caused an orphan-flag bug: startHeartbeat() internally calls
421+
// stopHeartbeat() which removes and replaces the flag, leaving the lambda with a
422+
// permanently-true reference. Reading from the manager each tick always gets the
423+
// current flag.
424+
final ResultHeartbeatManager capturedMgr = mgr;
425+
426+
Runnable heartbeatTask =
427+
() -> {
428+
// Read current flag each tick — avoids orphan-flag issue
429+
java.util.concurrent.atomic.AtomicBoolean stopped =
430+
capturedMgr.getStoppedFlag(capturedStatementId);
431+
if (stopped.get()) {
432+
return; // client/session may be closed, skip RPC
433+
}
434+
try {
435+
boolean alive = client.checkStatementAlive(capturedStatementId);
436+
consecutiveFailures.set(0); // reset on success
437+
if (!alive) {
438+
LOGGER.info(
439+
"Heartbeat detected terminal state for statement {}, stopping",
440+
capturedStatementId.toSQLExecStatementId());
441+
capturedMgr.stopHeartbeat(capturedStatementId);
442+
}
443+
} catch (Throwable e) {
444+
if (e instanceof VirtualMachineError) {
445+
capturedMgr.stopHeartbeat(capturedStatementId);
446+
throw (VirtualMachineError) e;
447+
}
448+
if (capturedMgr.getStoppedFlag(capturedStatementId).get()) {
449+
return;
450+
}
451+
if (e instanceof java.sql.SQLFeatureNotSupportedException) {
452+
LOGGER.debug(
453+
"Heartbeat not supported by client for statement {}, stopping",
454+
capturedStatementId.toSQLExecStatementId());
455+
capturedMgr.stopHeartbeat(capturedStatementId);
456+
return;
457+
}
458+
int failures = consecutiveFailures.incrementAndGet();
459+
if (failures == 1) {
460+
LOGGER.info(
461+
"Heartbeat failed for statement {} (first failure): {}",
462+
capturedStatementId.toSQLExecStatementId(),
463+
e.getMessage());
464+
} else {
465+
LOGGER.debug(
466+
"Heartbeat failed for statement {} (failure {}/{}): {}",
467+
capturedStatementId.toSQLExecStatementId(),
468+
failures,
469+
maxConsecutiveFailures,
470+
e.getMessage());
471+
}
472+
if (failures >= maxConsecutiveFailures) {
473+
LOGGER.warn(
474+
"Heartbeat stopped for statement {} after {} consecutive failures. "
475+
+ "Server-side results may expire. Last error: {}",
476+
capturedStatementId.toSQLExecStatementId(),
477+
failures,
478+
e.getMessage());
479+
capturedMgr.stopHeartbeat(capturedStatementId);
480+
}
481+
}
482+
};
483+
484+
mgr.startHeartbeat(capturedStatementId, heartbeatTask);
485+
LOGGER.debug(
486+
"Heartbeat started for statement {} (resultType={}, interval={}s)",
487+
capturedStatementId.toSQLExecStatementId(),
488+
resultSetType,
489+
mgr.getIntervalSeconds());
490+
} catch (Exception e) {
491+
LOGGER.debug("Failed to start heartbeat: {}", e.getMessage());
492+
}
493+
}
494+
495+
/** Stops the heartbeat for this result set's statement. Idempotent. */
496+
private void stopHeartbeat() {
497+
if (parentStatement == null || statementId == null) {
498+
return;
499+
}
500+
try {
501+
// Use same unwrap pattern as startHeartbeatIfEnabled() for pooled connections
502+
java.sql.Connection rawConn = parentStatement.getStatement().getConnection();
503+
DatabricksConnection conn;
504+
if (rawConn instanceof DatabricksConnection) {
505+
conn = (DatabricksConnection) rawConn;
506+
} else if (rawConn.isWrapperFor(DatabricksConnection.class)) {
507+
conn = rawConn.unwrap(DatabricksConnection.class);
508+
} else {
509+
return;
510+
}
511+
ResultHeartbeatManager mgr = conn.getHeartbeatManager();
512+
if (mgr != null) {
513+
mgr.stopHeartbeat(statementId);
514+
}
515+
} catch (Exception e) {
516+
LOGGER.debug("Failed to stop heartbeat: {}", e.getMessage());
517+
}
518+
}
519+
520+
/**
521+
* Determines whether this result set is eligible for heartbeat polling. Package-visible for
522+
* testing.
523+
*
524+
* <p>Heartbeat is NOT needed when:
525+
*
526+
* <ul>
527+
* <li>No execution result (nothing to fetch, also covers async PENDING/RUNNING with no data)
528+
* <li>SEA inline (InlineJsonResult): all rows loaded in memory at construction
529+
* <li>Update count (DML): no result rows to keep alive
530+
* <li>Direct results (CLOSED state): server already closed, data fully delivered
531+
* <li>Async execution (PENDING/RUNNING): user controls polling via getExecutionResult()
532+
* </ul>
533+
*/
534+
boolean isHeartbeatEligible() {
535+
// No execution result — nothing to fetch
536+
if (executionResult == null) {
537+
return false;
538+
}
539+
// SEA inline — all data loaded in memory at construction
540+
if (resultSetType == ResultSetType.SEA_INLINE) {
541+
return false;
542+
}
543+
// Update count — no result rows
544+
if (statementType == StatementType.UPDATE) {
545+
return false;
546+
}
547+
// Check execution state
548+
if (executionStatus != null) {
549+
com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState();
550+
// Direct results — server already closed
551+
if (state == com.databricks.jdbc.api.ExecutionState.CLOSED) {
552+
return false;
553+
}
554+
// Async execution — user controls polling
555+
if (state == com.databricks.jdbc.api.ExecutionState.PENDING
556+
|| state == com.databricks.jdbc.api.ExecutionState.RUNNING) {
557+
return false;
558+
}
559+
}
560+
return true;
561+
}
562+
360563
private static TelemetryCollector resolveTelemetryCollector(
361564
IDatabricksStatementInternal parentStatement) {
362565
try {

0 commit comments

Comments
 (0)