Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,7 @@ func (cn *conn) readParseResponse() error {
return nil
case proto.ErrorResponse:
err := parseError(r, "")
_ = cn.readReadyForQuery()
_ = cn.handleError(cn.readReadyForQuery())
return err
default:
cn.err.set(driver.ErrBadConn)
Expand Down Expand Up @@ -1788,7 +1788,7 @@ func (cn *conn) readStatementDescribeResponse() (paramTyps []oid.Oid, colNames [
return paramTyps, colNames, colTyps, nil
case proto.ErrorResponse:
err := parseError(r, "")
_ = cn.readReadyForQuery()
_ = cn.handleError(cn.readReadyForQuery())
return nil, nil, nil, err
default:
cn.err.set(driver.ErrBadConn)
Expand All @@ -1809,7 +1809,7 @@ func (cn *conn) readPortalDescribeResponse() (rowsHeader, error) {
return rowsHeader{}, nil
case proto.ErrorResponse:
err := parseError(r, "")
_ = cn.readReadyForQuery()
_ = cn.handleError(cn.readReadyForQuery())
return rowsHeader{}, err
default:
cn.err.set(driver.ErrBadConn)
Expand All @@ -1827,7 +1827,7 @@ func (cn *conn) readBindResponse() error {
return nil
case proto.ErrorResponse:
err := parseError(r, "")
_ = cn.readReadyForQuery()
_ = cn.handleError(cn.readReadyForQuery())
return err
default:
cn.err.set(driver.ErrBadConn)
Expand All @@ -1853,7 +1853,7 @@ func (cn *conn) postExecuteWorkaround() error {
switch t {
case proto.ErrorResponse:
err := parseError(r, "")
_ = cn.readReadyForQuery()
_ = cn.handleError(cn.readReadyForQuery())
return err
case proto.CommandComplete, proto.DataRow, proto.EmptyQueryResponse:
// the query didn't fail, but we can't process this message
Expand Down
57 changes: 57 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"io"
"maps"
Expand Down Expand Up @@ -578,6 +579,62 @@ func TestUnexpectedEOF(t *testing.T) {
pqtest.QueryRow[int](t, db, `select okay`)
}

// #1320
func TestUnexpectedClose(t *testing.T) {
t.Parallel()

// Emit non-fatal ErrorResponse and close the connection on any Parse.
// Previously this would cause conn.inProgress to remain "stuck" on true
// because cn.err was never set and no ReadyForQuery was received.
var triggered atomic.Bool
f := pqtest.NewFake(t, func(f pqtest.Fake, cn net.Conn) {
f.Startup(cn, nil)
for {
code, _, ok := f.ReadMsg(cn)
if !ok {
return
}
switch code {
case proto.Terminate:
cn.Close()
return
case proto.Query: // Ping(): empty simple query.
f.WriteMsg(cn, proto.EmptyQueryResponse, "")
f.WriteMsg(cn, proto.ReadyForQuery, "I")
case proto.Parse:
triggered.Store(true)
f.WriteMsg(cn, proto.ErrorResponse, "SERROR\x00C08P01\x00Mserver conn crashed?\x00\x00")
cn.Close()
return
}
}
})
defer f.Close()
db := pqtest.MustDB(t, f.DSN())
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

_, err := db.Exec("select $1::int", 1)
if err == nil {
t.Fatal("first Exec: want non-nil error from pooler fault, got nil")
}
if !triggered.Load() {
t.Fatal("server-side fault was never triggered")
}

// Must not short-circuit to errQueryInProgress and have the poisoned conn
// stays in the pool with inProgress=true. database/sql should open a new
// connection we get a proper *pq.Error from the new fault, not the "there
// is already a query being processed" guard.
_, err = db.Exec("select $1::int", 1)
if err == nil {
t.Fatal("second Exec: want non-nil error, got nil")
}
if errors.Is(err, errQueryInProgress) {
t.Fatalf("second Exec: poisoned conn was reused from pool: %v", err)
}
}

func TestConnClose(t *testing.T) {
// Ensure the underlying connection can be closed with Close after an error.
t.Run("CloseBadConn", func(t *testing.T) {
Expand Down