Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -928,14 +928,20 @@ public void close() {
// SCHEMA_MISMATCH HALT) from users who only call close() and
// never call flush() afterwards.
Throwable terminalError = null;
// Snapshot the latched terminal error that the user thread has
// ALREADY caught (via flush()/at()) before close() ran. If
// flushPendingRows/drainOnClose below also rethrow the same
// Snapshot the exact terminal error instance that a user-thread
// API call ALREADY caught (via flush()/at()) before close() ran.
// If flushPendingRows/drainOnClose below also rethrow the same
// instance, dropping it at the final rethrow avoids
// try-with-resources self-suppression: Throwable.addSuppressed
// raises IllegalArgumentException when primary == suppressed.
Throwable alreadyOwnedByUser = (cursorSendLoop != null && !cursorSendLoop.hasUnsurfacedError())
? cursorSendLoop.getLastError() : null;
// Must stay this single read: the snapshot needs the identity of
// the error the user already owns, and only
// getSynchronouslySurfacedError() holds it. Deriving it from two
// separate latch reads races the I/O thread -- a terminal latched
// between the reads would be adopted as user-owned and silently
// dropped (see CloseOwnershipRaceTest).
Throwable alreadyOwnedByUser = cursorSendLoop != null
? cursorSendLoop.getSynchronouslySurfacedError() : null;

try {
// Only drain when both the engine and the I/O loop are wired
Expand All @@ -957,21 +963,20 @@ public void close() {
// only when no other channel has already delivered it
// to the user. "Already delivered" means either the
// producer thread saw it synchronously via
// flush()/append() (errorSurfacedSynchronously) or the
// async dispatcher delivered it to a user-installed
// custom handler at any point in this sender's life
// (deliveredToCustomHandler). The latter survives a
// setErrorHandler(null) cleanup in test helpers --
// once the user has owned an error, close() should
// not double-signal it. The default no-op logging
// handler does not count as "delivered to user", so a
// config-string-only caller still gets the loud
// rethrow on shutdown.
// flush()/append() (checkUnsurfacedError is silent in
// that case) or the async dispatcher delivered it to a
// user-installed custom handler at any point in this
// sender's life (deliveredToCustomHandler, checked
// here). The latter survives a setErrorHandler(null)
// cleanup in test helpers -- once the user has owned
// an error, close() should not double-signal it. The
// default no-op logging handler does not count as
// "delivered to user", so a config-string-only caller
// still gets the loud rethrow on shutdown.
boolean alreadyDeliveredToCustomHandler = errorDispatcher != null
&& errorDispatcher.hasDeliveredToCustomHandler();
if (!alreadyDeliveredToCustomHandler
&& cursorSendLoop.hasUnsurfacedError()) {
cursorSendLoop.checkError();
if (!alreadyDeliveredToCustomHandler) {
cursorSendLoop.checkUnsurfacedError();
}
// 3) Bounded drain: block until the server has ACK'd
// everything we just published, or until the
Expand Down Expand Up @@ -2558,7 +2563,7 @@ private void checkConnectionError() {
error.fillInStackTrace();
throw error;
}
// Poll the cursor I/O loop's lastError too. Without this, a fatal
// Poll the cursor I/O loop's terminalError too. Without this, a fatal
// wire / server-rejection error recorded by the I/O thread would
// only surface on the next flush() / close() — every row-level
// method (table, longColumn, atNow, etc.) routes through
Expand Down
Loading
Loading