Skip to content

Commit 5efb321

Browse files
committed
Add CancelAndDrainContextWatcherHandler
Replace the racy 100ms sleep added in 93a5797 with a deterministic single-";" drain that absorbs stale SQLSTATE 57014 (query_canceled) errors before the connection is reused.

Key changes:

- Add CancelAndDrainContextWatcherHandler implementing ctxwatch.Handler. HandleUnwatchAfterCancel sends exactly one ";" after a successful cancel — sufficient because PostgreSQL's QueryCancelPending is a single flag, not a queue.
- Add mutex-guarded three-state cancel machine (idle/inFlight/sent) on CancelRequest. This prevents double-send from concurrent callers (ctxwatch, asyncClose, direct user code), which would produce multiple 57014 responses that a single drain cannot reconcile.
- Fix data race on pid and secretKey in CancelRequest by grouping both into a backendKeyData struct behind atomic.Pointer.
- Extract cancel protocol constants (cancelRequestCode, negotiateSSLCode, packet field offsets) from magic numbers, referencing CancelRequestPacket in src/include/libpq/pqcomm.h.
1 parent a5680bc commit 5efb321

4 files changed

Lines changed: 850 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# Unreleased
2+
3+
* Add CancelAndDrainContextWatcherHandler that replaces the racy 100ms sleep in CancelRequestContextWatcherHandler with a single-";" drain, preventing stale 57014 (query_canceled) errors from bleeding into the next query on a connection (Sean Chittenden)
4+
* Fix data race on pid and secretKey in CancelRequest by grouping both into an atomically-published backendKeyData struct
5+
* Extract PostgreSQL cancel protocol constants (cancelRequestCode, negotiateSSLCode, packet field offsets) from magic numbers, referencing src/include/libpq/pqcomm.h
6+
* Add mutex-guarded cancel state machine on CancelRequest to prevent double-send of cancel requests from concurrent callers
7+
18
# 5.9.1 (March 22, 2026)
29

310
* Fix: batch result format corruption when using cached prepared statements (reported by Dirkjan Bussink)

pgconn/cancel_and_drain.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package pgconn
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/jackc/pgx/v5/pgconn/ctxwatch"
9+
"github.com/jackc/pgx/v5/pgproto3"
10+
)
11+
12+
// Fallback durations used when the corresponding CancelAndDrainContextWatcherHandler field is left at zero.
const (
	defaultDeadlineDelay = time.Second
	defaultDrainTimeout  = 5 * time.Second
)

// queryCanceledSQLStateCode is SQLSTATE 57014 (query_canceled), the one error code drainOnce absorbs instead of
// treating as fatal.
const queryCanceledSQLStateCode = "57014"

// States of the per-connection cancel state machine, numbered 0, 1, 2 in declaration order.
// HandleUnwatchAfterCancel drains only when it observes cancelStateSent, and resets the state to cancelStateIdle
// before doing so.
const (
	cancelStateIdle = iota
	cancelStateInFlight
	cancelStateSent
)

// ErrCancelAlreadyInFlight reports that a cancel request was attempted while another one was still in progress on the
// same connection. More than one cancel request per connection could produce more than one 57014 response, and the
// single-";" drain can only reconcile one. The mutex guard in [PgConn.CancelRequest] prevents this at the protocol
// level; higher-level APIs should hand this error to their callers rather than silently retrying.
var ErrCancelAlreadyInFlight = errors.New("cancel request already in flight for this connection")
28+
29+
// CancelAndDrainContextWatcherHandler handles cancelled contexts by first sending a cancel request, then draining any
30+
// pending SQLSTATE 57014 (query_canceled) with a single ";" round-trip.
31+
//
32+
// Correctness depends on at most one cancel request being in flight per connection at any time. Each cancel request
33+
// causes the server to set QueryCancelPending, which produces exactly one 57014. If two cancel requests were sent,
34+
// two 57014s could arrive -- the first absorbed by the drain, the second bleeding into the next real query. This
35+
// invariant is enforced by [PgConn.CancelRequest]'s mutex-guarded state machine, which blocks concurrent callers
36+
// until the in-flight cancel completes.
37+
type CancelAndDrainContextWatcherHandler struct {
38+
Conn *PgConn
39+
40+
// DeadlineDelay is a net.Conn deadline set when the context is cancelled, used as a fallback to unblock blocked
41+
// reads. Defaults to defaultDeadlineDelay (1s).
42+
DeadlineDelay time.Duration
43+
44+
// DrainTimeout caps the single drain round-trip. Defaults to defaultDrainTimeout (5s).
45+
DrainTimeout time.Duration
46+
47+
doneCtx context.Context //nolint:containedctx // synchronization primitive, not a request-scoped context
48+
doneFn context.CancelFunc
49+
stopFn context.CancelFunc
50+
}
51+
52+
var _ ctxwatch.Handler = (*CancelAndDrainContextWatcherHandler)(nil)
53+
54+
func (h *CancelAndDrainContextWatcherHandler) deadlineDelay() time.Duration {
55+
if h.DeadlineDelay == 0 {
56+
return defaultDeadlineDelay
57+
}
58+
return h.DeadlineDelay
59+
}
60+
61+
func (h *CancelAndDrainContextWatcherHandler) drainTimeout() time.Duration {
62+
if h.DrainTimeout == 0 {
63+
return defaultDrainTimeout
64+
}
65+
return h.DrainTimeout
66+
}
67+
68+
// HandleCancel is called when the watched context is cancelled. It applies a net.Conn deadline as a fallback and fires
69+
// a cancel request in a goroutine. Mutual exclusion (at most one cancel in flight) is enforced by
70+
// [PgConn.CancelRequest], not here -- the ctxwatch.Handler interface does not permit a return value, but CancelRequest
71+
// will block if another cancel is already in progress.
72+
//
73+
// The parent context is inherited (via WithoutCancel) so that values like trace IDs propagate into the cancel request
74+
// without inheriting its already-fired cancellation.
75+
func (h *CancelAndDrainContextWatcherHandler) HandleCancel(ctx context.Context) {
76+
baseCtx := context.WithoutCancel(ctx)
77+
cancelCtx, stop := context.WithCancel(baseCtx)
78+
h.stopFn = stop
79+
80+
h.doneCtx, h.doneFn = context.WithCancel(context.Background())
81+
82+
deadline := time.Now().Add(h.deadlineDelay())
83+
h.Conn.conn.SetDeadline(deadline)
84+
85+
go func() {
86+
defer h.doneFn()
87+
reqCtx, cancel := context.WithDeadline(cancelCtx, deadline)
88+
defer cancel()
89+
h.Conn.CancelRequest(reqCtx)
90+
}()
91+
}
92+
93+
// HandleUnwatchAfterCancel is called after the cancelled query returns. It waits for the cancel goroutine, clears the
94+
// deadline, and -- if the cancel was successfully sent (cancelStateSent) -- sends exactly one ";" to absorb any pending
95+
// 57014. Finally it transitions back to idle.
96+
func (h *CancelAndDrainContextWatcherHandler) HandleUnwatchAfterCancel() {
97+
if h.stopFn != nil {
98+
h.stopFn()
99+
}
100+
if h.doneCtx != nil {
101+
<-h.doneCtx.Done()
102+
}
103+
h.Conn.conn.SetDeadline(time.Time{})
104+
h.doneCtx = nil
105+
h.doneFn = nil
106+
h.stopFn = nil
107+
108+
h.Conn.cancelMu.Lock()
109+
needsDrain := h.Conn.cancelMu.state == cancelStateSent
110+
if needsDrain {
111+
h.Conn.cancelMu.state = cancelStateIdle
112+
}
113+
h.Conn.cancelMu.Unlock()
114+
115+
if !h.Conn.IsClosed() && needsDrain {
116+
ctx, cancel := context.WithTimeout(context.Background(), h.drainTimeout())
117+
defer cancel()
118+
h.Conn.drainOnce(ctx)
119+
}
120+
}
121+
122+
// drainOnce sends a single ";" and reads the response. If the server returns 57014, the cancel was still pending and is
123+
// now consumed. If the server returns a clean EmptyQueryResponse, the cancel was already consumed by the original query.
124+
// Either way the connection is clean after one round-trip -- no loop required.
125+
//
126+
// This design assumes at most one cancel is in flight per connection (enforced by [PgConn.CancelRequest]). A single
127+
// cancel produces at most one QueryCancelPending flag on the server, which yields at most one 57014.
128+
func (pgConn *PgConn) drainOnce(ctx context.Context) {
129+
if deadline, ok := ctx.Deadline(); ok {
130+
pgConn.conn.SetDeadline(deadline)
131+
defer pgConn.conn.SetDeadline(time.Time{})
132+
}
133+
134+
pgConn.frontend.Send(&pgproto3.Query{String: ";"})
135+
if err := pgConn.frontend.Flush(); err != nil {
136+
pgConn.asyncClose()
137+
return
138+
}
139+
140+
for {
141+
msg, err := pgConn.receiveMessage()
142+
if err != nil {
143+
pgConn.asyncClose()
144+
return
145+
}
146+
147+
switch msg := msg.(type) {
148+
case *pgproto3.ReadyForQuery:
149+
return
150+
case *pgproto3.ErrorResponse:
151+
pgErr := ErrorResponseToPgError(msg)
152+
if pgErr.Code != queryCanceledSQLStateCode {
153+
pgConn.asyncClose()
154+
return
155+
}
156+
// 57014 absorbed -- continue reading until ReadyForQuery
157+
case *pgproto3.EmptyQueryResponse:
158+
// Expected response for ";" -- continue reading until ReadyForQuery
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)