Skip to content

Commit 59a9eef

Browse files
committed
pgconn: add CancelAndDrainContextWatcherHandler
Add a context watcher handler that deterministically drains stale cancel signals instead of relying on a fixed 100ms sleep. When a context is cancelled, HandleCancel sends the cancel request immediately. After the query returns, HandleUnwatchAfterCancel polls SELECT 1 to absorb any in-flight SQLSTATE 57014 before releasing the connection. The drain runs inside Unwatch via HandleUnwatchAfterCancel, so every pgconn operation that watches a context (Exec, ExecParams, Prepare, Deallocate, CopyTo, CopyFrom, Pipeline, WaitForNotification) gets drain-after-cancel automatically with no per-call-site wiring.
1 parent da20f82 commit 59a9eef

2 files changed

Lines changed: 399 additions & 0 deletions

File tree

pgconn/cancel_and_drain.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package pgconn
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/jackc/pgx/v5/pgconn/ctxwatch"
8+
"github.com/jackc/pgx/v5/pgproto3"
9+
)
10+
11+
// CancelAndDrainContextWatcherHandler handles cancelled contexts by sending a cancel request to the server and then
12+
// draining any in-flight SQLSTATE 57014 (query_canceled) by polling SELECT 1. Unlike [CancelRequestContextWatcherHandler],
13+
// no fixed sleep is used; the drain is deterministic.
14+
type CancelAndDrainContextWatcherHandler struct {
15+
Conn *PgConn
16+
17+
// DeadlineDelay is the network deadline set on the connection when the context
18+
// is cancelled, used as a fallback to unblock any blocked read. Defaults to 1s.
19+
DeadlineDelay time.Duration
20+
21+
// DrainTimeout is the maximum time to spend draining a cancelled query's
22+
// in-flight results via SELECT 1 polling. Defaults to 5s.
23+
DrainTimeout time.Duration
24+
25+
cancelFinishedChan chan struct{}
26+
stopFn context.CancelFunc
27+
}
28+
29+
var _ ctxwatch.Handler = (*CancelAndDrainContextWatcherHandler)(nil)
30+
31+
func (h *CancelAndDrainContextWatcherHandler) deadlineDelay() time.Duration {
32+
if h.DeadlineDelay == 0 {
33+
return time.Second
34+
}
35+
return h.DeadlineDelay
36+
}
37+
38+
func (h *CancelAndDrainContextWatcherHandler) drainTimeout() time.Duration {
39+
if h.DrainTimeout == 0 {
40+
return 5 * time.Second
41+
}
42+
return h.DrainTimeout
43+
}
44+
45+
// HandleCancel is called when the context is cancelled. It sets a net.Conn deadline
46+
// as a fallback and sends a PostgreSQL cancel request in a goroutine.
47+
func (h *CancelAndDrainContextWatcherHandler) HandleCancel(_ context.Context) {
48+
h.cancelFinishedChan = make(chan struct{})
49+
cancelCtx, stop := context.WithCancel(context.Background())
50+
h.stopFn = stop
51+
52+
deadline := time.Now().Add(h.deadlineDelay())
53+
h.Conn.conn.SetDeadline(deadline)
54+
55+
doneCh := h.cancelFinishedChan
56+
go func() {
57+
defer close(doneCh)
58+
reqCtx, cancel := context.WithDeadline(cancelCtx, deadline)
59+
defer cancel()
60+
h.Conn.CancelRequest(reqCtx)
61+
}()
62+
}
63+
64+
// HandleUnwatchAfterCancel is called after the cancelled query returns. It stops the cancel goroutine (if still
65+
// running), clears the net.Conn deadline, and drains any in-flight cancel by polling SELECT 1.
66+
func (h *CancelAndDrainContextWatcherHandler) HandleUnwatchAfterCancel() {
67+
if h.stopFn != nil {
68+
h.stopFn()
69+
}
70+
if h.cancelFinishedChan != nil {
71+
<-h.cancelFinishedChan
72+
}
73+
h.Conn.conn.SetDeadline(time.Time{})
74+
h.cancelFinishedChan = nil
75+
h.stopFn = nil
76+
77+
if !h.Conn.IsClosed() {
78+
ctx, cancel := context.WithTimeout(context.Background(), h.drainTimeout())
79+
defer cancel()
80+
h.Conn.execInternalForDrain(ctx)
81+
}
82+
}
83+
84+
// queryCanceledSQLStateCode is SQLSTATE 57014 (query_canceled).
85+
const queryCanceledSQLStateCode = "57014"
86+
87+
// execInternalForDrain sends an empty query (";") in a loop, absorbing any
88+
// SQLSTATE 57014 responses, until the connection is confirmed clean or a
89+
// non-57014 error occurs. On any failure the connection is asyncClosed.
90+
//
91+
// Called while the connection is still logically "busy" from pgconn's perspective
92+
// (lock is held and contextWatcher.Unwatch has been called) but idle from the
93+
// PostgreSQL server's perspective (ReadyForQuery was just received). This means
94+
// it bypasses the normal lock/unlock and contextWatcher.Watch paths.
95+
//
96+
// The deadline from ctx is applied directly to the net.Conn.
97+
func (pgConn *PgConn) execInternalForDrain(ctx context.Context) {
98+
if deadline, ok := ctx.Deadline(); ok {
99+
pgConn.conn.SetDeadline(deadline)
100+
defer pgConn.conn.SetDeadline(time.Time{})
101+
}
102+
103+
outer:
104+
for {
105+
pgConn.frontend.Send(&pgproto3.Query{String: ";"})
106+
if err := pgConn.frontend.Flush(); err != nil {
107+
pgConn.asyncClose()
108+
return
109+
}
110+
clean := true
111+
for {
112+
msg, err := pgConn.receiveMessage()
113+
if err != nil {
114+
pgConn.asyncClose()
115+
return
116+
}
117+
118+
switch msg := msg.(type) {
119+
case *pgproto3.ReadyForQuery:
120+
if !clean {
121+
clean = true
122+
continue outer // absorbed 57014; send another to confirm clean
123+
}
124+
return // clean ReadyForQuery — done
125+
case *pgproto3.ErrorResponse:
126+
pgErr := ErrorResponseToPgError(msg)
127+
if pgErr.Code == queryCanceledSQLStateCode {
128+
clean = false // cancel hit this query; will confirm after ReadyForQuery
129+
} else {
130+
pgConn.asyncClose()
131+
return
132+
}
133+
case *pgproto3.EmptyQueryResponse:
134+
// Expected response for ";".
135+
}
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)