Skip to content

[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions#1415

Merged
gopalldb merged 39 commits into
mainfrom
design/heartbeat-keep-alive
May 21, 2026
Merged

[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions#1415
gopalldb merged 39 commits into
mainfrom
design/heartbeat-keep-alive

Conversation

@gopalldb
Copy link
Copy Markdown
Collaborator

@gopalldb gopalldb commented Apr 22, 2026

Summary

Design + implementation for PECOBLR-2321: periodic heartbeat polling to keep server-side result state alive while the client consumes results slowly.

Problem

When users read query results slowly (pausing between next() calls), the warehouse can auto-stop after its idle timeout. For inline results (data only on cluster, not uploaded to cloud storage), this means permanent data loss. The user gets errors like INVALID_HANDLE_STATUS or "operation not found".

Solution

A ResultHeartbeatManager that periodically calls GetStatementStatus (SEA) or GetOperationStatus (Thrift) to signal the server that results are still being consumed. Opt-in via EnableHeartbeat=1 connection parameter (default false due to cost implications).

Design doc

docs/design/HEARTBEAT_KEEP_ALIVE.md — includes cross-driver survey, Mermaid diagrams (sequence flows, state machine, class diagram), and detailed lifecycle analysis.

Heartbeat eligibility (skipped when not needed)

Scenario Heartbeat? Reason
SEA cloud fetch (Arrow) Yes Statement must stay alive for URL refresh
Thrift inline (columnar) Yes Data fetched on-demand; server can evict
Thrift cloud fetch Yes Operation handle must stay alive
SEA inline (JSON) No All data loaded into memory at construction
Direct results (CLOSED state) No Server already closed; data fully delivered
Update count (DML) No No result rows; execution polling already kept it alive
Async execution wait No User controls polling via getExecutionResult() — heartbeat starts only when ResultSet is constructed

Error resilience

  • 10 consecutive failures before self-stop
  • Terminal states (CLOSED/ERROR/CANCELED/TIMEDOUT) auto-stop
  • Single success resets failure counter

Zero-leak guarantee

Heartbeat stops in 4 places: next() returns false, ResultSet.close(), Statement.close(), Connection.close()

Implementation

  • New: ResultHeartbeatManager — per-connection manager with ScheduledExecutorService (daemon thread)
  • New: ResultHeartbeatManagerTest — 7 unit tests
  • Modified: DatabricksJdbcUrlParamsEnableHeartbeat (default 0), HeartbeatIntervalSeconds (default 60)
  • Modified: DatabricksConnectionContext — getter methods
  • Modified: DatabricksConnection — creates/shuts down manager
  • Modified: DatabricksResultSet — starts heartbeat in constructor, stops on close/next-false
  • Modified: DatabricksStatement — safety net stop in close()
  • Modified: IDatabricksClientcheckStatementAlive() default method
  • Modified: DatabricksSdkClient — SEA heartbeat via GET /sql/statements/{id}
  • Modified: DatabricksThriftServiceClient — Thrift heartbeat via GetOperationStatus

Design doc for PECOBLR-2321: periodic heartbeat polling to keep
server-side result state alive while the client consumes results
slowly.

Key design points:
- Periodic GetStatementStatus (SEA) / GetOperationStatus (Thrift)
- Opt-in via EnableHeartbeat connection parameter (default false)
- Configurable interval (default 60s, aligned with ADBC C# driver)
- Zero-leak guarantee: stops on ResultSet.close, Statement.close,
  Connection.close, end-of-results, or server terminal state
- Error resilience: 10 consecutive failures before self-stop
- Includes cross-driver survey (ADBC C#, Python, Go, Node.js)
- Mermaid diagrams: sequence flows, state machine, class diagram

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Add periodic heartbeat polling to keep server-side result state alive
while the client consumes results slowly. Prevents warehouse auto-stop
from destroying in-progress results.

New files:
- ResultHeartbeatManager: per-connection manager with shared
  ScheduledExecutorService (daemon thread). Manages start/stop/shutdown
  lifecycle for heartbeats across all statements.
- ResultHeartbeatManagerTest: 7 unit tests covering lifecycle,
  idempotency, interval, shutdown, re-execution.

Connection parameters:
- EnableHeartbeat (default 0): opt-in to enable heartbeat polling
- HeartbeatIntervalSeconds (default 60): polling interval

Protocol support:
- SEA: GET /sql/statements/{id} via checkStatementAlive()
- Thrift: GetOperationStatus via checkStatementAlive()

Heartbeat eligibility (skipped when not needed):
- SEA inline (InlineJsonResult): all data in memory, no server state
- Update count / metadata results: no data to keep alive
- Direct results: server already closed the operation
- Null execution result: nothing to fetch

Error resilience:
- 10 consecutive failures before self-stop (transient error tolerance)
- Single success resets failure counter
- Terminal states (CLOSED/ERROR/CANCELED/TIMEDOUT) stop heartbeat

Cleanup guarantees (zero leak):
- next() returns false → stop
- ResultSet.close() → stop
- Statement.close() → safety net stop
- Connection.close() → shutdown all

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Metadata operations (getTables, getColumns, etc.) can return large
result sets that may need heartbeat. Only UPDATE statements are
excluded (they return update count, no data to keep alive).

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Direct results mean the server already closed the operation and
delivered all data inline. No heartbeat needed — detect via
ExecutionState.CLOSED in the constructor rather than waiting for
the first poll to discover it.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
- No heartbeat during executeAsync() wait — user controls polling
  via getExecutionResult(). Heartbeat starts only when ResultSet is
  constructed and user begins consuming results.
- Updated eligibility table: SEA inline doesn't need heartbeat
  (all data in memory), added update count row.
- Added execution phase vs consumption phase diagram.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
@gopalldb gopalldb changed the title [Design] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions [PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions Apr 22, 2026
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Extract isHeartbeatEligible() as package-visible method for testing.
Add 10 tests covering all eligibility/ineligibility scenarios:

Eligible (heartbeat starts):
- SEA cloud fetch (Arrow) — statement alive for URL refresh
- Thrift inline — data fetched on-demand, server can evict
- Thrift cloud fetch (Arrow) — operation handle alive
- Metadata queries — can return large result sets

Ineligible (heartbeat skipped):
- SEA inline (JSON) — all data in memory
- Direct results (CLOSED) — server already closed
- Update count (DML) — no result rows
- Null execution result — nothing to fetch
- Async PENDING — user controls polling
- Async RUNNING — user controls polling

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1. consecutiveFailures → AtomicInteger (was plain int written by
   scheduler thread, no happens-before guarantee)

2. Stopped flag prevents RPC on closed client/session. stopHeartbeat
   sets AtomicBoolean flag BEFORE cancel(false). In-flight heartbeat
   tick checks flag before RPC, skips if set. Exceptions during
   shutdown don't count as consecutive failures.

3. Constructor leak: verified startHeartbeatIfEnabled() is already
   the last line of the constructor, after all throwing code. No
   change needed — already safe.

4. HeartbeatIntervalSeconds bounds check: reject <= 0 (use default
   60), warn for > 3600 (heartbeat may not keep operation alive).

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1. Null-defense on Thrift response: if getOperationState() returns
   null, assume alive and log warning (prevents NPE)

2. Better logging for heartbeat failures: first failure at INFO,
   terminal (10th) failure at WARN with statement ID and error
   message. Users will now see early signals instead of cryptic
   "operation not found" on next()

3. Stop old heartbeat on re-execute: resetForNewExecution() now
   explicitly calls stopHeartbeat(oldStatementId) before clearing
   state. Prevents wasteful 10-failure self-termination of orphaned
   heartbeats

4. Document cloud-fetch prefetch interaction: noted that
   StreamingChunkProvider/RemoteChunkProvider background RPCs act as
   implicit heartbeat. Explicit heartbeat is still useful for gaps
   (all chunks downloaded, prefetch paused, sliding window full)

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Tests rewritten to be deterministic (CountDownLatch, no Thread.sleep):
- testStoppedFlagSetOnStop: get flag after start, verify set on stop
- testStoppedFlagSetOnShutdown: same pattern, verify set on shutdown
- testStopRacingWithScheduledTick: verify stopped flag prevents RPC
- testShutdownWithBlockedTask: verify shutdownNow fires after 5s
- testReExecutionReplacesHeartbeat: verify old task stops

Fix: startHeartbeat resets stopped flag on new start (was stale
after stop-then-restart cycle).

Add DEBUG log on successful heartbeat start with statementId,
resultType, and interval for support diagnostics.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Verified against dogfood warehouse:
- testHeartbeatKeepsResultsAliveDuringSlowConsumption: execute query,
  read first row, pause 15s (3 heartbeats at 5s interval), read
  remaining 99 rows successfully. All 100 rows returned.
- testHeartbeatStopsOnResultSetClose: verify clean shutdown after close

Run with:
  DATABRICKS_HOST=... DATABRICKS_TOKEN=... DATABRICKS_HTTP_PATH=... \
  mvn -pl jdbc-core test -Dtest="HeartbeatIntegrationTest"

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
// Get the stopped flag from the manager — shared between the heartbeat task and
// stopHeartbeat(). Prevents RPC on a just-closed client/session: stopHeartbeat sets
// the flag before cancel(false), so an in-flight tick sees it and skips the RPC.
final java.util.concurrent.atomic.AtomicBoolean stopped = mgr.getStoppedFlag(statementId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CRITICAL] Orphan stopped flag — heartbeat RPC never actually fires

The stopped flag is captured here at line 328 before mgr.startHeartbeat(...) is called at line 378. Inside ResultHeartbeatManager.startHeartbeat():

// ResultHeartbeatManager.java
void startHeartbeat(StatementId statementId, Runnable heartbeatTask) {
  ...
  stopHeartbeat(statementId);              // line 63 — REMOVES this flag from map AND sets it to true
  getStoppedFlag(statementId).set(false);  // line 66 — computeIfAbsent creates a NEW AtomicBoolean
  ...
}

So the AtomicBoolean captured by the closure here is the removed/orphaned one — permanently set to true. The new flag in the map (which mgr.stopHeartbeat(...) later mutates from DatabricksResultSet.stopHeartbeat, Statement.close, Connection.close) is invisible to the closure.

Net effect: every tick, if (stopped.get()) return; short-circuits → client.checkStatementAlive(statementId) is never called. The whole feature is non-functional.

The integration test only passes because warehouses don't actually expire results in 15s — so the absence of heartbeats isn't observed.

Fix options (any one):

  1. Capture the flag after mgr.startHeartbeat(...) returns.
  2. Reuse the same AtomicBoolean in startHeartbeat/stopHeartbeat (don't remove from the map — just set(true)/set(false)).
  3. Have the closure call mgr.getStoppedFlag(statementId).get() per tick instead of holding a captured reference.

Add a unit test that asserts client.checkStatementAlive is invoked at least once via the production wiring — currently no such test exists.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0a46f52. Lambda reads stopped flag from manager each tick via capturedMgr.getStoppedFlag(capturedStatementId) instead of pre-capturing.

// the flag before cancel(false), so an in-flight tick sees it and skips the RPC.
final java.util.concurrent.atomic.AtomicBoolean stopped = mgr.getStoppedFlag(statementId);

Runnable heartbeatTask =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CRITICAL] Lambda strong-captures this — abandoned ResultSet keeps warehouse alive forever

This lambda invokes stopHeartbeat() (instance method, line 342, 373) and reads statementId (instance field, lines 336/340/352/353/358/367/369). Both implicitly capture this — the entire DatabricksResultSet, including executionResult (Arrow buffers, chunk providers, potentially MB of cached row data).

The future is held in ResultHeartbeatManager.activeHeartbeats for the connection's lifetime. So:

  • A user that does stmt.executeQuery(...).next() once and abandons the ResultSet reference (a real-world bug, but a JDBC driver shouldn't amplify it) will:
    • Never trigger next()→false or close() (the only auto-stop paths)
    • Have the entire ResultSet and its data retained until Connection.close() — typically hours in pooled environments
    • Have the heartbeat poll forever, holding the warehouse open and accumulating cost
  • This is the exact "cost forever" failure mode the design doc Requirements §3 explicitly tries to prevent.
  • It is also a denial-of-service amplifier: an app opening 10k orphaned result sets per hour holds 10k Arrow batches in heap until Connection.close().

The C# ADBC reference avoids this: its poller is per-statement with linked cancellation, so even GC of the statement helps. The Java implementation here is connection-scoped, so GC of the ResultSet alone won't help — the future keeps a hard reference back to the ResultSet.

Fix: Don't capture this. Pull statementId and mgr (or just Runnable stopFn = () -> mgr.stopHeartbeat(localStatementId)) into locals so the lambda has no implicit this reference. Verify with javap -p -c (no synthetic this$0 field on the lambda class) or a simple unit test that holds a WeakReference<DatabricksResultSet> and asserts it's collectable after the strong reference is dropped.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 723ce06. Lambda captures only local finals: client, capturedStatementId, maxConsecutiveFailures, consecutiveFailures, capturedMgr. No reference to this.


try {
DatabricksConnection conn =
(DatabricksConnection) parentStatement.getStatement().getConnection();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CRITICAL] Pooled connections (HikariCP, DBCP, DatabricksPooledConnection) silently get NO heartbeat

This direct cast (DatabricksConnection) parentStatement.getStatement().getConnection() will throw ClassCastException for any pooled connection wrapper:

  • DatabricksPooledConnection returns a JDK dynamic Proxy declaring Connection.class, IDatabricksConnectionInternal.class (see DatabricksPooledConnection.java:155-158) — not DatabricksConnection.
  • HikariCP returns HikariProxyConnection; DBCP returns PoolGuardConnectionWrapper — same story.

The exception is swallowed by the outer catch (Exception e) { LOGGER.debug(...) } at line 384-386 (and again at line 401-402 for stopHeartbeat). Result: users opt in to EnableHeartbeat=1 on the most common Java connection pool deployment, get no protection, and see no error — just a DEBUG line they have to enable to find.

Fix (one of):

  1. connection.unwrap(DatabricksConnection.class) — works through the proxy via IDatabricksConnectionInternal.
  2. Add getHeartbeatManager() to IDatabricksConnectionInternal so the pool proxy forwards it transparently.

Option 2 is cleaner and matches how the rest of the driver handles pooled access.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 723ce06. Uses instanceof + unwrap(DatabricksConnection.class) pattern for both start and stop paths. HikariCP/DBCP proxies handled.

this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement);
this.isClosed = false;
this.wasNull = false;
startHeartbeatIfEnabled();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CRITICAL] Heartbeat never starts on Thrift result sets — feature is dead-on-arrival on the Thrift path

The Thrift constructor (this method, lines 153-196) does not call startHeartbeatIfEnabled(). Only the SEA constructor at line 127 does.

All Thrift result sets are constructed via DatabricksThriftAccessor (executeStatement, getStatementResult, etc.) using this constructor — so on a transportMode=thrift connection with EnableHeartbeat=1, the manager is created and the eligibility logic correctly returns true for THRIFT_INLINE / THRIFT_ARROW_ENABLED, but no heartbeat ever starts.

Per the design doc's eligibility table, Thrift inline (data only on cluster, server-evictable) is one of the most critical scenarios this feature is meant to cover. It's silently broken.

The eligibility tests in ResultSetHeartbeatEligibilityTest.testThriftInlineIsEligible / testThriftArrowIsEligible mock the instance via reflection and bypass the constructor entirely, so they pass while production reality is broken.

Fix: Add startHeartbeatIfEnabled(); at the end of this constructor (line 196). Add a real-constructor smoke test that builds a Thrift DatabricksResultSet via the production constructor and asserts mgr.getActiveHeartbeatCount() == 1.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 723ce06. Both SEA constructor (line 127) and Thrift constructor (line 196) call startHeartbeatIfEnabled().

statementSet.remove(statement);
}
if (heartbeatManager != null) {
heartbeatManager.shutdown();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] heartbeatManager.shutdown() is skipped if any statement.close() throws — scheduler + thread leak

for (IDatabricksStatementInternal statement : statementSet) {
  statement.close(false);          // makes RPCs — can throw
  statementSet.remove(statement);
}
if (heartbeatManager != null) {
  heartbeatManager.shutdown();     // never reached on throw above
}

statement.close(false) issues a closeStatement RPC — any network/server error throws SQLException out of this loop. The heartbeatManager.shutdown() and session.close() calls below it are skipped, leaking:

  • The ScheduledExecutorService daemon thread (yes, daemon — but still leaks until JVM exit)
  • All scheduled futures and references they hold (see the this-capture issue on DatabricksResultSet.java:330-376)

Fix: Wrap in try/finally so heartbeatManager.shutdown() always runs. Also catch per-statement exceptions so the loop completes:

try {
  for (IDatabricksStatementInternal statement : statementSet) {
    try { statement.close(false); } catch (Exception e) {
      LOGGER.warn("Error closing statement: {}", e.getMessage());
    }
    statementSet.remove(statement);
  }
} finally {
  if (heartbeatManager != null) {
    heartbeatManager.shutdown();
  }
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. heartbeatManager.shutdown() is called BEFORE the statement close loop in DatabricksConnection.close().


private static ResultHeartbeatManager createHeartbeatManager(
IDatabricksConnectionContext connectionContext) {
if (connectionContext instanceof DatabricksConnectionContext) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] instanceof DatabricksConnectionContext silently disables heartbeat for any other context impl

isHeartbeatEnabled() and getHeartbeatIntervalSeconds() live on the concrete class DatabricksConnectionContext, not on the IDatabricksConnectionContext interface. Any test mock, test double, or alternate implementation of IDatabricksConnectionContext falls through to return null — heartbeat silently disabled.

This pattern also makes the feature impossible to enable from any future context implementation (e.g., a wrapped/decorated context for telemetry or testing) without modifying this exact instanceof check.

Fix: Add the two methods to IDatabricksConnectionContext with default impls and drop the instanceof:

// IDatabricksConnectionContext.java
default boolean isHeartbeatEnabled() { return false; }
default int getHeartbeatIntervalSeconds() { return 60; }

Then this method becomes:

private static ResultHeartbeatManager createHeartbeatManager(IDatabricksConnectionContext ctx) {
  if (ctx.isHeartbeatEnabled()) {
    return new ResultHeartbeatManager(ctx.getHeartbeatIntervalSeconds());
  }
  return null;
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. isHeartbeatEnabled() and getHeartbeatIntervalSeconds() are default methods on IDatabricksConnectionContext interface. Test mocks and alternate implementations work.

* @param statementId statement to check status for
* @return true if the statement is still in a non-terminal state (alive), false if terminal
*/
default boolean checkStatementAlive(StatementId statementId) throws SQLException {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Default checkStatementAlive returns false — caller treats this as terminal and self-stops

The default return false is interpreted as "terminal state" by the heartbeat task at DatabricksResultSet.java:336-343:

boolean alive = client.checkStatementAlive(statementId);
...
if (!alive) {
  LOGGER.info("Heartbeat detected terminal state for statement {}, stopping", statementId);
  ...
  stopHeartbeat();
}

Any future IDatabricksClient implementation (test fakes, custom transports, third-party impls) that doesn't override this method will:

  1. Stop on the first heartbeat tick
  2. Emit a misleading INFO log saying the statement is in terminal state — when in reality the client doesn't support heartbeat

The two production impls do override, so this is academic for production today, but the default semantics are wrong and surprising.

Fix (one of):

  1. Make this method abstract (no default) — forces every IDatabricksClient implementer to deal with it explicitly.
  2. Throw UnsupportedOperationException from the default and have the caller log "client doesn't support heartbeat" and disable for the connection.
  3. Change default to return true and update the comment to reflect "no-op = always-alive, no actual probe".

Option 1 is preferred — it's a small interface that should require explicit consideration.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Default now throws SQLFeatureNotSupportedException. The heartbeat lambda catches it and exits immediately without counting as a failure.

GetStatementRequest request = new GetStatementRequest().setStatementId(statementId);
Request req = new Request(Request.GET, getStatusPath, apiClient.serialize(request));
req.withHeaders(getHeaders("getStatement"));
GetStatementResponse response = apiClient.execute(req, GetStatementResponse.class);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] No per-RPC timeout — HeartbeatRequestTimeoutSeconds is documented but never implemented

The design doc (docs/design/HEARTBEAT_KEEP_ALIVE.md:423) lists HeartbeatRequestTimeoutSeconds with default 30s. grep -rn HeartbeatRequestTimeoutSeconds src/main/ returns no hits — the URL parameter doesn't exist in DatabricksJdbcUrlParams, and no per-call timeout is set on the SDK or Thrift heartbeat call.

The heartbeat RPC therefore inherits the connection-level HTTP/Thrift timeouts — often minutes, sometimes effectively unbounded if socketTimeout=0. Combined with the single-thread scheduler at ResultHeartbeatManager.java:42, this has three concrete consequences:

  1. Single-point starvation: one stuck heartbeat blocks every other heartbeat on the connection — every other registered statement misses ticks → results expire while the warehouse is still being kept alive (wrong outcome on both axes).
  2. The 10-strike safety net is bypassed: with no timeout, the call hangs rather than throws. consecutiveFailures stays at 0 — the "max 10 failures, then self-stop" guard never fires.
  3. Connection.close() 5s awaitTermination cannot abort the call: Apache HTTP socket I/O is not interruptible by shutdownNow(). Threads keep running until socket-level timeout, blocking app-server hot-redeploy.

Fix: Either implement HeartbeatRequestTimeoutSeconds properly (per-call timeout via Request.withRequestTimeout on SDK / setSocketTimeout on Thrift), or remove the claim from the design doc. The first option is what the doc says, and it's what the C# ADBC reference does (CancellationTokenSource.CancelAfter(_requestTimeoutSeconds)).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection-level HTTP timeout (typically 300s) provides a ceiling. Per-RPC timeout adds complexity for limited benefit since heartbeat RPCs are lightweight (/status endpoint ~100 bytes). Tracked for follow-up if needed.

"Starting heartbeat for statement {} with interval {}s", statementId, intervalSeconds);

ScheduledFuture<?> future =
scheduler.scheduleAtFixedRate(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Use scheduleWithFixedDelay instead of scheduleAtFixedRate — current code bursts on slow ticks

scheduleAtFixedRate semantics: if a tick takes longer than the interval (e.g., a slow heartbeat RPC because of the missing per-RPC timeout — see related comment), subsequent ticks queue up and fire back-to-back as soon as the executor frees. So a slow/recovering server gets hit with a burst of catch-up RPCs at the worst possible time.

scheduleWithFixedDelay measures the gap after each task completes, naturally throttling under server slowness. It's a one-line change and matches the C# ADBC reference (await Task.Delay(...) AFTER each poll completes).

// before
scheduler.scheduleAtFixedRate(heartbeatTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
// after
scheduler.scheduleWithFixedDelay(heartbeatTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);

There's no behavioral reason to prefer fixed-rate here — the polling cadence isn't drift-sensitive (we're not aligned to a wall clock).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Changed to scheduleWithFixedDelay — waits for the current tick to complete before scheduling the next.

return state != StatementState.CANCELED
&& state != StatementState.CLOSED
&& state != StatementState.FAILED;
} catch (IOException e) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] catch (IOException e) is too narrow — SDK runtime exceptions bypass the wrapping

apiClient.execute(...) does NOT only throw IOException. It also throws:

  • DatabricksException / DatabricksError — for HTTP 4xx/5xx responses (e.g., 401 token expired returns DatabricksError, not IOException)
  • RuntimeException — for serialization / NPE on malformed responses

These propagate uncaught past this try/catch. They're eventually caught by the outer catch (Exception e) in DatabricksResultSet.java:344, but:

  1. They bypass the wrapping into DatabricksSQLException(SDK_CLIENT_ERROR) — losing the structured error code surface.
  2. A 401 (token expired during a long iteration) is therefore counted as a regular transient failure, contributing to the 10-strike permanent-kill counter. After 10 minutes of an expiring token, the heartbeat self-terminates and never recovers — even after the user's next session-refreshing call.

The C# ADBC equivalent catches Exception ex for the same reason — see DatabricksOperationStatusPoller.cs:149.

Fix: Widen to catch (Exception e), or explicitly add DatabricksException and RuntimeException. Same goes for DatabricksThriftServiceClient.checkStatementAlive — it currently catches TException, which doesn't cover SDK runtime exceptions either.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. DatabricksSdkClient catches Exception (covers DatabricksError, DatabricksException, IOException, RuntimeException). DatabricksThriftServiceClient catches TException. Heartbeat lambda catches Throwable.

if (statementId != null) {
ResultHeartbeatManager mgr = connection.getHeartbeatManager();
if (mgr != null) {
mgr.stopHeartbeat(statementId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Statement.cancel() does not stop the heartbeat

This cancel() calls cancelStatement on the server but does not call mgr.stopHeartbeat(statementId). Only close() (line 175-181) and resetForNewExecution() (line 982-988) clear the heartbeat.

After cancel() returns, the heartbeat keeps polling against a cancelled operation. In the happy path the server returns CANCELED_STATE and the heartbeat task self-stops on the terminal-state check — fine. But if there's a race or "operation not found" before the server registers the cancel, those errors count as transient failures, churning the 10-strike counter and emitting WARN/INFO log noise for up to ~10 minutes after a successful cancel.

Fix: Add a heartbeat stop to cancel(), mirroring the pattern in close():

public void cancel() throws SQLException {
  ...
  if (statementId != null) {
    ResultHeartbeatManager mgr = connection.getHeartbeatManager();
    if (mgr != null) {
      mgr.stopHeartbeat(statementId);
    }
  }
  this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
  ...
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. cancel() now stops the heartbeat before sending the cancel RPC.

@msrathore-db
Copy link
Copy Markdown
Collaborator

Code Review Squad — Critical findings

I ran a multi-perspective AI review (security, architecture, language, ops, performance, tests, maintainability, agent-compat, devil's advocate) and verified the findings below by reading the actual code on this branch (commit a51bccc). I posted 11 inline comments at specific file:line locations — please address each thread directly.

Most important: the feature does not work as written

Two independent verified bugs make the heartbeat permanently no-op on every code path:

  1. Orphan-flag bugDatabricksResultSet.startHeartbeatIfEnabled captures mgr.getStoppedFlag(statementId) at line 328 before calling mgr.startHeartbeat(...) at line 378. Inside startHeartbeat, the very first thing it does is stopHeartbeat(statementId) which removes that AtomicBoolean from the map and sets it to true; then getStoppedFlag(...).set(false) computeIfAbsents a brand-new AtomicBoolean. The closure now references the orphaned true-forever flag, so the if (stopped.get()) return; guard fires on every tick and client.checkStatementAlive(...) is never called.
  2. Thrift constructor never wires it — only the SEA constructor calls startHeartbeatIfEnabled(). The Thrift constructor at lines 153-196 doesn't, so the entire Thrift path (the protocol the design doc and ADBC reference prioritize) gets nothing even if (1) is fixed.

The integration test passes only because real warehouses don't actually expire results in 15s, so the absence of heartbeats isn't observed. The unit tests bypass the production constructor via reflection on a mocked instance, which is why neither bug was caught.

What I posted as inline comments

Critical (4):

  • Orphan-flag bug → DatabricksResultSet.java:328
  • Thrift constructor missing wiring → DatabricksResultSet.java:127 (referencing the missing call at :196)
  • Lambda strong-captures this (memory leak / cost-forever for abandoned ResultSets) → DatabricksResultSet.java:330
  • Pooled connections (HikariCP, DBCP, DatabricksPooledConnection) silently get no heartbeat due to direct (DatabricksConnection) cast → DatabricksResultSet.java:315

High (7):

  • Connection.close() skips heartbeatManager.shutdown() if any statement.close() throws → DatabricksConnection.java:443
  • instanceof DatabricksConnectionContext silently disables heartbeat for any other context impl → DatabricksConnection.java:70
  • Default IDatabricksClient.checkStatementAlive returns false → caller treats as terminal and self-stops → IDatabricksClient.java:116
  • HeartbeatRequestTimeoutSeconds documented in design doc but never implemented; no per-RPC timeout on heartbeat → DatabricksSdkClient.java:415
  • scheduleAtFixedRate queues bursts on slow ticks; should be scheduleWithFixedDelayResultHeartbeatManager.java:72
  • catch (IOException e) is too narrow — SDK runtime exceptions (e.g., 401 → DatabricksError) bypass wrapping and feed the 10-strike permanent-kill counter → DatabricksSdkClient.java:421
  • Statement.cancel() does not stop the heartbeat → DatabricksStatement.java:179

Other concerns worth addressing (not posted inline to keep this thread focused)

  • No telemetry / metrics for heartbeat operations — IDatabricksClient.checkStatementAlive lacks @DatabricksMetricsTimed; no poll_success/poll_error/max_failures_reached events. C# ADBC has all of these. Fleet-wide "all heartbeats failing" is undetectable.
  • Single-thread scheduler per connection at ResultHeartbeatManager.java:42 — 1000 pooled connections = 1000 OS threads. C# uses a shared ThreadPool. Combined with the missing per-RPC timeout, one stuck heartbeat blocks all others on the connection.
  • Hard-coded maxConsecutiveFailures = 10 inside the lambda at DatabricksResultSet.java:322 — should be a centralized constant or URL param.
  • getHeartbeatIntervalSeconds() validation — non-numeric URL value crashes connection open (raw NumberFormatException); <= 0 silently coerces to default. Use validateAndParsePositiveInteger.
  • Thread name lacks connectionId — design doc promised databricks-jdbc-heartbeat-{connectionId}; code at ResultHeartbeatManager.java:44 is just databricks-jdbc-heartbeat.
  • HeartbeatIntegrationTest will fail (not skip) without env vars; @Tag("e2e") is not excluded by pom.xml surefire config; assertions don't verify behavior (sleep + "no exception thrown").
  • Test coverage gaps: zero unit tests for checkStatementAlive on either client; no test exercises the 10-failure self-stop, terminal-state self-stop, or null-state branch; reflection-based eligibility tests bypass the constructor (which is why the Thrift wiring bug went undetected).
  • Design doc drift: HeartbeatTask/SeaHeartbeatTask/ThriftHeartbeatTask interfaces (design doc lines 285-315) — never built. HeartbeatRequestTimeoutSeconds parameter (line 423) — not implemented. "Retry once after 30s" (line 480) — no such path exists.

Recommendation

This PR is not safe to merge until at least the four critical bugs (especially the orphan-flag one, which makes the entire feature dead code) are fixed and there are real tests that would have caught them — i.e., a test that actually verifies client.checkStatementAlive is invoked through the production wiring on both SEA and Thrift paths.


Feedback? Drop it in #code-review-squad-feedback.

gopalldb added 2 commits May 11, 2026 13:52
Critical fixes:
- C1: Fix orphaned stopped flag — lambda now reads flag from manager each
  tick instead of pre-capturing it. Pre-capture caused flag to be permanently
  true after startHeartbeat() internally reset it.
- C2: Fix lambda strong-capturing 'this' — extract all needed values into
  local finals. Abandoned ResultSets no longer keep polling.
- C3: Fix pooled connection ClassCastException — use JDBC unwrap() pattern
  instead of direct cast to DatabricksConnection.
- C4: Fix Thrift constructor missing startHeartbeatIfEnabled() call.

High-severity fixes:
- H5: Move heartbeatManager.shutdown() before statement close loop in
  Connection.close() so it runs even if statement.close() throws.
- H6: Add isHeartbeatEnabled()/getHeartbeatIntervalSeconds() to
  IDatabricksConnectionContext interface, removing instanceof check.
- H7: Default checkStatementAlive throws SQLFeatureNotSupportedException
  instead of returning false (which was misinterpreted as terminal state).
- H9: Use scheduleWithFixedDelay instead of scheduleAtFixedRate to prevent
  burst catch-up RPCs during server slowness.
- H10: Widen catch in SEA checkStatementAlive from IOException to Exception
  to handle DatabricksError, RuntimeException.
- H11: Statement.cancel() now stops the heartbeat.

H8 (per-RPC timeout) deferred to follow-up PR.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
@gopalldb
Copy link
Copy Markdown
Collaborator Author

Addressed all review findings in commit 723ce06 + merge 7534523:

CRITICAL fixes:

  • C1 (Orphaned stopped flag): Lambda no longer pre-captures the flag. Instead, it reads from mgr.getStoppedFlag(statementId) on each tick via computeIfAbsent, which always returns the current flag. This eliminates the orphan-flag bug where startHeartbeat() internally called stopHeartbeat() → removed old flag → created new one, leaving the lambda with a permanently-true reference.

  • C2 (Lambda captures this): Extracted all needed values into local finals (capturedMgr, capturedStatementId, client, consecutiveFailures). Lambda calls capturedMgr.stopHeartbeat(capturedStatementId) directly instead of this.stopHeartbeat(). Abandoned ResultSets can now be GC'd without keeping the heartbeat alive.

  • C3 (Pooled connections ClassCastException): Replaced direct cast (DatabricksConnection) conn with JDBC standard unwrap() pattern: conn instanceof DatabricksConnection → direct cast, else conn.isWrapperFor(DatabricksConnection.class)conn.unwrap(...). HikariCP/DBCP/DatabricksPooledConnection wrappers now work correctly.

  • C4 (Thrift constructor missing heartbeat): Added startHeartbeatIfEnabled() at end of Thrift constructor. Both SEA and Thrift result sets now start heartbeat.

HIGH fixes:

  • H5 (shutdown skipped on throw): Moved heartbeatManager.shutdown() BEFORE the statement close loop in Connection.close(). Shutdown runs even if statement.close() throws.

  • H6 (instanceof disables heartbeat for mocks): Added isHeartbeatEnabled() and getHeartbeatIntervalSeconds() as default methods on IDatabricksConnectionContext interface. Removed instanceof DatabricksConnectionContext check.

  • H7 (Default returns false → terminal): Changed checkStatementAlive default to throw SQLFeatureNotSupportedException. Heartbeat task counts it as a failure (honest behavior) instead of silently self-terminating with misleading "terminal state" logs.

  • H8 (Per-RPC timeout): Deferred to follow-up PR. Connection-level HTTP timeout (typically 300s) provides a ceiling.

  • H9 (scheduleAtFixedRate burst): Changed to scheduleWithFixedDelay. Naturally throttles during server slowness instead of queuing catch-up RPCs.

  • H10 (catch IOException too narrow): Widened to catch (Exception e) in DatabricksSdkClient.checkStatementAlive(). Catches DatabricksError, DatabricksException, RuntimeException. Token expiry (401) now properly counts as transient failure.

  • H11 (cancel doesn't stop heartbeat): Added mgr.stopHeartbeat(statementId) at the top of Statement.cancel().

All 22 heartbeat tests + 152 statement/connection/session tests pass.

gopalldb added 2 commits May 15, 2026 14:42
Expected condition for clients that don't implement checkStatementAlive.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
…tStatement

Changed checkStatementAlive from GET /sql/statements/{id} (~21KB response
with full manifest/result data) to GET /sql/statements/{id}/status (~100
bytes with just state/error/sqlState). Reduces heartbeat bandwidth by
~99% per tick.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
@gopalldb
Copy link
Copy Markdown
Collaborator Author

Update on F13 (SEA payload size): Fixed in commit 75be9b7.

Changed SEA heartbeat from GET /sql/statements/{id} (full GetStatementResponse ~21KB with manifest/result data) to GET /sql/statements/{id}/status (lightweight StatementStatus ~100 bytes with just state/error/sqlState).

This is the getStatementStatus RPC defined in the SQL Exec API proto — returns only StatementStatus message, no manifest or result data. ~99% bandwidth reduction per heartbeat tick.

&& state != StatementState.CLOSED
&& state != StatementState.FAILED;
} catch (Exception e) {
// H10 fix: Catch all exceptions (DatabricksError, DatabricksException, IOException,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please remove these comments? Does not make any sense to have the H10 numbering

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


private TGetOperationStatusResp getOperationStatus(
// Package-visible for heartbeat access from DatabricksThriftServiceClient
TGetOperationStatusResp getOperationStatus(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should still define the type?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, didn't get the comment

Clean up internal review-tracking prefixes (C1, C2, C3, C4, H5, H6,
H7, H10, H11) from code comments — keep the actual explanations.
Add proper Javadoc for package-visible getOperationStatus method.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
*
* <p>Or set environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH
*/
@Tag("e2e")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] @Tag("e2e") is dead metadata — Surefire is not configured to honor it; HeartbeatIntegrationTest will run on every mvn test

The PR description / dismissed-findings list claims @Tag("e2e") excludes this from unit-test CI. Verified at PR head 632460835:

grep -n "excludedGroups\|excludeTags\|<configuration>" pom.xml
# Returns only the spotless excludes for Arrow files (lines 197-201).
# maven-surefire-plugin (lines 133-137) has only <version>, no <configuration> block.

JUnit 5's @Tag is informational; Surefire requires explicit <excludedGroups> (or junit-platform.properties) to filter. Surefire's default <includes> matches *Test.java, so this class is picked up.

createConnection() at lines 35-38 then throws:

throw new IllegalStateException("Set DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH");

Consequence: Every mvn test run without warehouse credentials (i.e., every standard PR CI run) fails on this test. This is a CI-blocking regression for every future PR until fixed.

Fix (pick one):

<!-- pom.xml: add inside <build><plugins> -->
<plugin>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <excludedGroups>e2e</excludedGroups>
  </configuration>
</plugin>

Or self-skip via JUnit:

@EnabledIfEnvironmentVariable(named = "DATABRICKS_HOST", matches = ".+")
@EnabledIfEnvironmentVariable(named = "DATABRICKS_TOKEN", matches = ".+")
@EnabledIfEnvironmentVariable(named = "DATABRICKS_HTTP_PATH", matches = ".+")
@Tag("e2e")
public class HeartbeatIntegrationTest { ... }

Flagged independently by 8 of 8 reviewers in the multi-perspective review.


Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 27c4a09. Added <excludedGroups>e2e</excludedGroups> to surefire-plugin config. @tag("e2e") tests are now properly excluded from unit test CI.

if (executionResult == null) {
return false;
}
// SEA inline — all data loaded in memory at construction
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Thrift direct-results path starts heartbeat on an already-closed operation — F14 fix landed for SEA but missed on Thrift

This method constructs the DatabricksResultSet at line 267 (which triggers startHeartbeatIfEnabled() from the Thrift ctor at DatabricksResultSet.java:196) and only afterwards at line 287 calls parentStatement.markDirectResultsReceived(). The constructor has no way to observe direct-results state.

DatabricksThriftUtil.getStatementStatus(...) (lines 181-182) maps:

case FINISHED_STATE:
  state = StatementState.SUCCEEDED;   // NOT CLOSED

So the constructor sees SUCCEEDED, and isHeartbeatEligible() at DatabricksResultSet.java:471-498 only filters CLOSED / PENDING / RUNNING. There is no SUCCEEDED-with-direct-results branch — heartbeat is scheduled.

Consequence: For every Thrift query returning direct/inline results with a closeOperation marker (most short queries, many metadata-via-SQL paths), a heartbeat task is scheduled at +intervalSeconds. The first tick calls GetOperationStatus on a handle the server has already closed → throws → counted as transient failure → runs 10 failure ticks (~10 minutes) before self-stopping.

This is exactly the failure mode that F14 was meant to address — the fix landed for SEA (which returns CLOSED explicitly) but not for the Thrift direct-results path.

Fix (pick one):

  • Option A — In DatabricksThriftAccessor.executeStatement, when isDirectResults && response.getDirectResults().isSetCloseOperation(), mark the parent statement before constructing the result set, and have isHeartbeatEligible() consult that flag via a new getter on IDatabricksStatementInternal.
  • Option B — When isDirectResults && hasCloseOperation, pass new StatementStatus().setState(StatementState.CLOSED) to the DatabricksResultSet constructor instead of the raw SUCCEEDED status.
  • Option C — Add if (statementType == StatementType.METADATA) return false; to isHeartbeatEligible (covers a subset of cases — see also F12 in the review).

Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for metadata also, we will include heartbeat. It will close once we fetch all results or directResult case.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isHeartbeatEligible() checks execution state — CLOSED returns false. Direct/inline results on both SEA and Thrift are excluded at the eligibility check, not the constructor. The heartbeat never actually starts for direct results.

@Override
public boolean checkStatementAlive(StatementId statementId) throws DatabricksSQLException {
LOGGER.debug("Heartbeat check for statement {} using Thrift client", statementId);
DatabricksThreadContextHolder.setStatementId(statementId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] DatabricksThreadContextHolder.setStatementId(...) is set here but never cleared → telemetry cross-contamination across heartbeated statements

DatabricksThreadContextHolder.localStatementId is a static ThreadLocal<String>. This method sets it but the body has no matching clearStatementInfo() in a try/finally. Heartbeats for many different statements on the same Connection all run on the same 2 reused daemon threads (ResultHeartbeatManager.HEARTBEAT_THREAD_POOL_SIZE = 2), so after a tick for statement A, the ThreadLocal retains A's id until the next tick for statement B overwrites it.

Consequence: Telemetry emitted from inside the heartbeat RPC stack — or from any unrelated work that happens to land on these reused threads between ticks — is attributed to the wrong statement.

Symmetric gap on the SEA side: DatabricksSdkClient.checkStatementAlive (line 419) does NOT call setStatementId(...) at all. All other SDK methods do (DatabricksSdkClient.java:247, 399, 447). The asymmetry means:

  • SEA heartbeat ticks: MDC-less (orphaned in multi-tenant log aggregation).
  • Thrift heartbeat ticks: MDC sticks until the next tick overwrites.

Fix:

@Override
public boolean checkStatementAlive(StatementId statementId) throws DatabricksSQLException {
  LOGGER.debug("Heartbeat check for statement {} using Thrift client", statementId);
  DatabricksThreadContextHolder.setStatementId(statementId);
  try {
    // ... existing body ...
  } finally {
    DatabricksThreadContextHolder.clearStatementInfo();
  }
}

And add the symmetric setStatementId + try/finally to DatabricksSdkClient.checkStatementAlive.


Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 27c4a09. Added finally { DatabricksThreadContextHolder.clearStatementInfo(); } to Thrift checkStatementAlive(). Statement ID is always cleared after the heartbeat RPC.


public int getHeartbeatIntervalSeconds() {
int interval =
Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Unhandled NumberFormatException aborts connection establishment with RuntimeException (not SQLException)

public int getHeartbeatIntervalSeconds() {
  int interval =
      Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS));  // ← no try/catch
  if (interval <= 0) {
    LOGGER.warn("HeartbeatIntervalSeconds must be positive, got {}. Using default 60.", interval);
    return 60;
  }
  if (interval > 3600) { LOGGER.warn(...); }
  return interval;
}

A JDBC URL with HeartbeatIntervalSeconds=foo, HeartbeatIntervalSeconds=60s, or an un-substituted ${interval} template placeholder throws unchecked NumberFormatException straight out of DatabricksConnection.createHeartbeatManager(...) (called from the constructor at line 53) and out of DriverManager.getConnection(...).

Consequence: A user typo in an optional parameter takes down connection establishment with a raw RuntimeException. The function carefully handles <= 0 (warn + default 60) and > 3600 (warn) but is asymmetric on the parse-failure case. Connection pools (Spring, HikariCP, etc.) expect SQLException from getConnection() and won't gracefully handle the uncaught NumberFormatException — pool init can fail loudly with a confusing stack.

Fix:

public int getHeartbeatIntervalSeconds() {
  String raw = getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS);
  int interval;
  try {
    interval = Integer.parseInt(raw);
  } catch (NumberFormatException e) {
    LOGGER.warn("HeartbeatIntervalSeconds must be an integer, got '{}'. Using default 60.", raw);
    return 60;
  }
  if (interval <= 0) { /* existing */ return 60; }
  if (interval > 3600) { /* existing */ }
  return interval;
}

Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 27c4a09. getHeartbeatIntervalSeconds() now wraps Integer.parseInt() in try-catch for NumberFormatException — falls back to default 60s with WARN log.

return false;
}
// Check execution state
if (executionStatus != null) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] isHeartbeatEligible() does not exclude StatementType.METADATA

boolean isHeartbeatEligible() {
  if (executionResult == null) return false;
  if (resultSetType == ResultSetType.SEA_INLINE) return false;
  if (statementType == StatementType.UPDATE) return false;   // ← UPDATE filtered, METADATA NOT
  if (executionStatus != null) {
    com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState();
    if (state == ExecutionState.CLOSED) return false;
    if (state == ExecutionState.PENDING || state == ExecutionState.RUNNING) return false;
  }
  return true;
}

Metadata calls (getColumns, getTables, getProcedures, getSchemas, etc.) can route through the SQL-execute path via DatabricksMetadataQueryClient and end up constructing a DatabricksResultSet via the standard SQL constructors with StatementType.METADATA. The eligibility check doesn't filter it.

For Thrift metadata calls that aren't direct-results (large catalogs, slow metastore), the result is SUCCEEDED → heartbeat fires.

Consequence: Heartbeat starts on every non-inline metadata call. Metadata operations are typically short and bounded — heartbeat RPCs are wasteful at best. Hot metadata loops (autocomplete, BI-tool reconnects, schema discovery) → bandwidth tax + unnecessary scheduler thread occupation, especially with the missing per-RPC timeout (see separate H8 finding in the review summary).

Fix:

// after the existing UPDATE check
if (statementType == StatementType.METADATA) {
  return false;
}

Also add a unit test in ResultSetHeartbeatEligibilityTest covering StatementType.METADATA × ResultSetType.{SEA_ARROW, THRIFT_*} to pin the behavior.


Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally kept metadata eligible. Operations like getColumns() on large schemas can return thousands of rows and take significant time — they benefit from heartbeat.

}

/** Stops the heartbeat for this result set's statement. Idempotent. */
private void stopHeartbeat() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Stop-heartbeat boilerplate duplicated at 4 call sites + startHeartbeatIfEnabled is a 117-line method with 6 concerns

The same 4-line idiom appears four times across the codebase:

if (statementId != null) {
  ResultHeartbeatManager mgr = connection.getHeartbeatManager();
  if (mgr != null) {
    mgr.stopHeartbeat(statementId);
  }
}

Call sites:

  • DatabricksResultSet.stopHeartbeat() (lines 432-455) — additionally wraps a 12-line pooled-connection unwrap.
  • DatabricksStatement.close() (lines 175-181)
  • DatabricksStatement.cancel() (lines 256-263)
  • DatabricksStatement.resetForNewExecution() (lines 1013-1021)

The cancel path was missing before H11 was fixed — the very existence of H11 demonstrates the maintenance hazard. Any future change to the stop contract (telemetry, draining inflight counter, propagating a stop reason) requires touching 4 sites.

startHeartbeatIfEnabled itself (DatabricksResultSet.java:314-430) is 117 lines doing six things:

  1. Eligibility check (315-320)
  2. Connection unwrap (322-333) — duplicated with stopHeartbeat at lines 437-447
  3. Manager lookup (335-338)
  4. Capture-variable setup with ~13 lines of justification comments (340-356)
  5. Heartbeat lambda — 62 lines with 5 distinct branches (357-419)
  6. Scheduling + log (421-426)

The orphan-flag historical comment (lines 350-354) only makes sense if you've read the C1 review thread. New maintainers will be tempted to "clean up" these comments and silently re-introduce the orphan-flag bug.

Fix:

  1. Add a helper on DatabricksConnection next to getHeartbeatManager():

    void stopHeartbeat(StatementId id) {
      if (id != null && heartbeatManager != null) {
        heartbeatManager.stopHeartbeat(id);
      }
    }

    Replace all 4 call sites with connection.stopHeartbeat(statementId);.

  2. Extract the pooled-connection unwrap block (duplicated in startHeartbeatIfEnabled and stopHeartbeat):

    private DatabricksConnection unwrapConnection() throws SQLException {
      Connection raw = parentStatement.getStatement().getConnection();
      if (raw instanceof DatabricksConnection) return (DatabricksConnection) raw;
      if (raw.isWrapperFor(DatabricksConnection.class)) return raw.unwrap(DatabricksConnection.class);
      return null;
    }
  3. Extract the lambda body into a package-private static helper:

    private static Runnable buildHeartbeatTask(
        IDatabricksClient client, ResultHeartbeatManager mgr, StatementId id,
        int maxConsecutiveFailures) { ... }

    The lambda has zero this references after C2 was fixed — extraction is mechanical. Also makes each failure-handling branch individually unit-testable (today they can only be exercised via the full scheduler — see test-coverage gap finding).

After the refactor, startHeartbeatIfEnabled shrinks to ~20 lines: eligibility → unwrap → mgr.startHeartbeat(id, buildHeartbeatTask(...)) → log.


Posted via /full-review

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplication is intentional — each call site has a specific purpose (close, cancel, next()→false, re-execution). The 4-line idiom is short and idempotent. Extracting to a helper would add a level of indirection for minimal benefit.

gopalldb added 3 commits May 20, 2026 16:54
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
… context, thread names

- Add <excludedGroups>e2e</excludedGroups> to surefire config so
  @tag("e2e") HeartbeatIntegrationTest is excluded from unit test CI
- Wrap getHeartbeatIntervalSeconds() parseInt in try-catch for
  NumberFormatException — falls back to default 60s with WARN log
- Clear DatabricksThreadContextHolder.setStatementId() in finally block
  of Thrift checkStatementAlive() to prevent telemetry cross-contamination
- Include connection UUID in heartbeat thread names for debuggability
  in pooled environments (e.g., "databricks-jdbc-heartbeat-<uuid>-1")
- Resolve merge conflicts: keep heartbeat stop + proactive server close
  in ResultSet.close(), keep heartbeat stop + serverOperationClosed guard
  in Statement.cancel() and resetForNewExecution

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
@gopalldb gopalldb force-pushed the design/heartbeat-keep-alive branch from 1a94ff3 to f1b7cde Compare May 21, 2026 07:09
gopalldb added 2 commits May 21, 2026 14:21
If next() throws (expired link, transient error), the user typically
gives up. Without this fix, heartbeat keeps polling for up to 10
minutes (10 ticks × 60s interval), actively keeping the warehouse
alive for an abandoned ResultSet.

Wrap executionResult.next() in try/catch, call stopHeartbeat() on
exception, then rethrow.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Comment thread NEXT_CHANGELOG.md Outdated
compatibility issues.

### Added
<<<<<<< HEAD
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CRITICAL] Unresolved git merge conflict markers in NEXT_CHANGELOG.md — will break check-next-changelog CI

Lines 37-43 contain raw conflict markers from the merge with upstream/main:

### Added
<<<<<<< HEAD
- Added result set heartbeat / keep-alive ...
=======
- Metadata operations now use SQL SHOW commands ...
>>>>>>> upstream/main

Verified via grep -n "<<<<<<<\|=======\|>>>>>>>" NEXT_CHANGELOG.md → 3 hits (lines 37, 39, 43).

Consequence:

  • The check-next-changelog CI lint will reject this PR.
  • GitHub renders the markers literally — release notes will be unusable.
  • Any agent or grep-based tool indexing the changelog will land in syntactically broken markdown.

Fix: Resolve the conflict by keeping both bullets (they describe separate features):

### Added
- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends, ensuring consistent behavior for SQL warehouses regardless of underlying protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.

Posted via /full-review iter 2

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

e.getMessage());
}
if (failures >= maxConsecutiveFailures) {
LOGGER.warn(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Thrift operation secret leaked via this WARN log — and 4 other heartbeat log sites + 1 thread name

This WARN ("Heartbeat stopped for statement {} after {} consecutive failures…") interpolates capturedStatementId directly. StatementId.toString() (StatementId.java:56-64) returns String.format("%s|%s", guid, secret) for Thrift — so the log line embeds the raw Thrift operation secret in plain text:

WARN  Heartbeat stopped for statement 01f14e36-85ad-17f8-ae5e-dbc8147c6663|a4b9c2e1f3...
      after 10 consecutive failures. Server-side results may expire. Last error: ...

The Thrift secret is bearer-authentication material — it authorises GetOperationStatus/CancelOperation/CloseOperation against the originating statement. Any operator with log-read access (Splunk, Datadog, Sumo, SIEM) can resurrect or cancel a victim's running statement during its lifetime.

Same leak pattern at 5 other heartbeat-PR-owned sites:

  • DatabricksResultSet.java:378-380 — INFO "Heartbeat detected terminal state for statement {}"
  • DatabricksResultSet.java:405-409 — INFO "Heartbeat failed for statement {} (first failure)"
  • DatabricksThriftServiceClient.java:275 — DEBUG (lower impact)
  • DatabricksThriftServiceClient.java:285-287 — WARN "Heartbeat for statement {} received null operation state"
  • DatabricksThriftServiceClient.java:296 — DEBUG

Additional pre-existing site that should be fixed alongside (introduced by sibling PR #1444 f3923fa2, but worth flagging since it's right next to your code):

  • DatabricksStatement.java:1047closeThread.setName("close-stmt-" + prevStatementId) — secret in thread name, visible via jstack, JMX ThreadMXBean, JFR, /proc/<pid>/task/<tid>/comm, async-profiler, JMC, any uncaught-exception handler that logs Thread.currentThread().getName().

Fix: Replace every statementId interpolation in non-DEBUG logs and thread names with statementId.toSQLExecStatementId() (returns only the guid). The repo already exposes StatementId.loggableStatementId(...) documented as "used for logging purposes to avoid logging sensitive information" — the pattern is established.

Example for this line:

LOGGER.warn(
    "Heartbeat stopped for statement {} after {} consecutive failures. "
        + "Server-side results may expire. Last error: {}",
    capturedStatementId.toSQLExecStatementId(),
    failures,
    e.getMessage());

Flagged by security, language, ops, devil's advocate (4/9 consensus).


Posted via /full-review iter 2

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (statementId != null) {
ResultHeartbeatManager mgr = connection.getHeartbeatManager();
if (mgr != null) {
mgr.stopHeartbeat(statementId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Cross-thread cancel() reads statementId without volatile — partial-visibility hazard with the merged resetForNewExecution()

This new heartbeat-stop block reads statementId (and directResultsReceived, serverOperationClosed) under the cancel() path. cancel() is a JDBC-spec cross-thread method (DBA tooling, query timeouts, user-cancel buttons all invoke it from a different thread than the executing thread).

The merged upstream commit f3923fa2 makes directResultsReceived and serverOperationClosed volatile (good — line 62-64 of this file). But statementId (declared around line 46) is NOT volatile, and the new resetForNewExecution() writes this.statementId = null at line 1077.

This creates a partial-happens-before hazard:

  • A reader of the two volatile booleans gets a release-acquire barrier that incidentally publishes prior writes to non-volatile fields.
  • But only if the reads occur in the right order, and the writes happened before the volatile write that triggered the release.
  • The current cancel() guard if (statementId != null && !directResultsReceived && !serverOperationClosed) reads statementId BEFORE the volatile booleans, so the release barrier doesn't help.

Concrete failure mode: Thread A is mid-resetForNewExecution() between this.statementId = null (line 1077) and assignment of the new statementId (subsequent execution). Thread B's cancel() reads a stale non-null statementId from CPU cache, then proceeds into the heartbeat-stop block and into the SDK's cancelStatement(...) RPC. Two outcomes:

  1. Server returns "operation not found" — cancel reports success but had no effect on the live execution. The user thinks they cancelled; they didn't.
  2. Worst case: between thread A's null write and thread B's stale read, the new execution started with a recycled / similar-looking ID; cancel hits the new operation instead of the intended old one.

Fix (minimal):

private volatile StatementId statementId;

at the field declaration around line 46. statementId writes are infrequent (once per execution); volatile cost is negligible.

Stronger fix: wrap resetForNewExecution() and the cross-thread reader in cancel() under a shared lock. The volatile fix is the minimum cross-thread visibility guarantee required for JDBC's documented cancel-from-another-thread contract.

Flagged by language reviewer.


Posted via /full-review iter 2

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// No execution result — nothing to fetch
if (executionResult == null) {
return false;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] F3 still open — Thrift direct-results heartbeat fires on already-closed operations; test suite now codifies the bug as intentional

This eligibility check filters CLOSED, PENDING, and RUNNING but NOT SUCCEEDED-with-direct-results. For the Thrift direct-results path:

  1. DatabricksThriftAccessor.executeStatement polls until FINISHED_STATE
  2. DatabricksThriftUtil.getStatementStatus() (lines 181-182) maps FINISHED_STATE → StatementState.SUCCEEDEDnot CLOSED
  3. The DatabricksResultSet constructor at DatabricksThriftAccessor.java:267 runs startHeartbeatIfEnabled() (via the Thrift constructor at DatabricksResultSet.java:196)
  4. isHeartbeatEligible() returns true because SUCCEEDED ≠ CLOSED
  5. parentStatement.markDirectResultsReceived() is called at DatabricksThriftAccessor.java:287 — but only AFTER the constructor has already scheduled the heartbeat. The eligibility check has no way to observe direct-results state.

Consequence: For every Thrift query returning direct/inline results with a closeOperation marker (the most common Thrift query shape — small SELECT, SHOW, JDBC metadata-via-SQL), a heartbeat task is scheduled. The first tick at +60s calls GetOperationStatus on a server-closed handle, fails, and burns 10 failure ticks (~10 minutes) before self-stopping. Each failure produces an INFO log; the final WARN says "Server-side results may expire" — when the operation was already cleanly closed by the server.

Test suite codifies the bug: ResultSetHeartbeatEligibilityTest.testDirectResultsNotEligible (lines 104-113) only covers the SEA path (StatementState.CLOSED). The Thrift path's StatementState.SUCCEEDED + parentStatement.directResultsReceived=true case has no test — and the related testMetadataQueryIsEligible (lines 80-89) actively asserts assertTrue(rs.isHeartbeatEligible(), "Metadata queries can have large results"), locking in another related false-eligible case (F12).

Fix options (pick one):

(a) Have markDirectResultsReceived() also call stopHeartbeat(statementId):

public void markDirectResultsReceived() {
  this.directResultsReceived = true;
  if (statementId != null && connection != null) {
    ResultHeartbeatManager mgr = connection.getHeartbeatManager();
    if (mgr != null) mgr.stopHeartbeat(statementId);
  }
  LOGGER.info("Statement {} received direct results (server closed operation)", statementId.toSQLExecStatementId());
}

(b) Consult parent statement state inside isHeartbeatEligible():

if (parentStatement != null && parentStatement.isDirectResultsReceived()) {
  return false;
}

This requires exposing isDirectResultsReceived() on IDatabricksStatementInternal.

(c) In DatabricksThriftAccessor.executeStatement, when isDirectResults && response.getDirectResults().isSetCloseOperation(), pass new StatementStatus().setState(StatementState.CLOSED) to the DatabricksResultSet constructor instead of the raw SUCCEEDED status — so the existing CLOSED filter catches it.

After the fix, add a test like:

@Test
void testThriftDirectResultsNotEligible() {
  // construct with StatementState.SUCCEEDED but parentStatement.directResultsReceived=true
  // assert isHeartbeatEligible() == false
}

Flagged by 7 of 9 reviewers (security, language, architecture, ops, agent-compat, maintainability, devil's advocate).


Posted via /full-review iter 2

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled

gopalldb added 2 commits May 21, 2026 15:12
…secret

StatementId.toString() for Thrift returns "guid|secret" which leaks the
secret in log files. All heartbeat log statements now use
toSQLExecStatementId() which returns only the guid.

Affected files: DatabricksResultSet (heartbeat lambda), ResultHeartbeatManager
(start/stop/shutdown), DatabricksThriftServiceClient (checkStatementAlive).

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Copy link
Copy Markdown
Collaborator

@msrathore-db msrathore-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

gopalldb added 2 commits May 21, 2026 15:20
Thrift direct results arrive as SUCCEEDED (not CLOSED), so
isHeartbeatEligible() returns true and heartbeat starts. When
markDirectResultsReceived() is called (after the ResultSet constructor),
it now stops the heartbeat immediately — preventing 10 minutes of
failed GetOperationStatus RPCs on a server-closed handle.

Verified live: Thrift SELECT 1 shows "Starting heartbeat" then
"Stopped heartbeat" in sequence, with no heartbeat activity during
12s sleep.

Added test: testThriftSucceededIsEligible documents that SUCCEEDED
passes eligibility (the stop happens in markDirectResultsReceived).

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
cancel() is a JDBC-spec cross-thread method. Without volatile,
a concurrent cancel() can read a stale statementId from CPU cache
while resetForNewExecution() is writing null on the executing thread.

volatile ensures happens-before visibility. No locks needed — writes
are infrequent (once per execution), cost is negligible.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
@gopalldb gopalldb enabled auto-merge (squash) May 21, 2026 10:15
@gopalldb gopalldb merged commit e1a2c53 into main May 21, 2026
18 checks passed
@gopalldb gopalldb deleted the design/heartbeat-keep-alive branch May 21, 2026 10:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants