Skip to content

Commit 2cf098d

Browse files
committed
go.mod: bump pgx to v5.9.2 for OAuthTokenProvider support
Bumps github.com/jackc/pgx/v5 from v5.7.2 to v5.9.2. The primary motivation is that v5.9.2 adds OAuthTokenProvider to pgconn.Config, enabling native OAUTHBEARER SASL client support. Four compatibility fixes are required: 1. pgconn.PgConn.SecretKey() now returns []byte instead of uint32, supporting the new PostgreSQL protocol v3.2 variable-length cancel key. The sqlproxy cancel tests recover the original uint32 using binary.BigEndian.Uint32() since the proxy always sends a 4-byte key. 2. pgconn now returns a "conn closed" error from its internal state machine when operations are attempted on an already-closed connection, rather than propagating the OS-level "connection reset by peer" or "unexpected EOF". Two regex assertions in sqlproxy tests are updated to accept all three forms. 3. pgxpool.Config.BeforeAcquire is deprecated in v5.9.2 in favor of PrepareConn, which has the same semantics but also returns an error. The workload pool setup is migrated accordingly. 4. pgx v5.9.2 enforces the PostgreSQL pipeline protocol more strictly: ReadyForQuery must be sent after Sync, not after each Execute. TestPipelineMetric's fake server was sending ReadyForQuery after each Execute (via finishQuery(execPortal)), which worked with v5.7.2's lenient pipeline reader but fails with v5.9.2. Two new test helpers (expectExecPortalPipelined, expectSyncAndSendReadyForQuery) correctly simulate the pipeline response sequence. Closes #171149 Closes #169280 Informs: CRDB-64341 Epic: CRDB-60766 Release note: None
1 parent 17b56ec commit 2cf098d

7 files changed

Lines changed: 68 additions & 25 deletions

File tree

DEPS.bzl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5134,10 +5134,10 @@ def go_deps():
51345134
name = "com_github_jackc_pgx_v5",
51355135
build_file_proto_mode = "disable_global",
51365136
importpath = "github.com/jackc/pgx/v5",
5137-
sha256 = "c4b5a22a3f3db2e764f5b4df65ad1b2550d4587b3640f73ab27b868136ef9018",
5138-
strip_prefix = "github.com/jackc/pgx/v5@v5.7.2",
5137+
sha256 = "0e8e8456630e580729cd1b06bf402cc6b98b57ec3a867df0e89e6ff27aa361c9",
5138+
strip_prefix = "github.com/jackc/pgx/v5@v5.9.2",
51395139
urls = [
5140-
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.7.2.zip",
5140+
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.9.2.zip",
51415141
],
51425142
)
51435143
go_repository(

build/bazelutil/distdir_files.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ DISTDIR_FILES = {
672672
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20240606120523-5a60cdf6a761.zip": "c9e31c91aebf96eb246bd410d1849cc7666d955a1e20ca2eba1c30b4eb89335f",
673673
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.14.3.zip": "24eb1112e74a18ba22ef7b47f59808060ddef00b98c2ade780d81b283e40f676",
674674
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v4/com_github_jackc_pgx_v4-v4.18.3.zip": "1d5955dc65b8de8f72f9856865b33dcd9a2238f7cf9b1f2a00f1558e7d4965da",
675-
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.7.2.zip": "c4b5a22a3f3db2e764f5b4df65ad1b2550d4587b3640f73ab27b868136ef9018",
675+
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.9.2.zip": "0e8e8456630e580729cd1b06bf402cc6b98b57ec3a867df0e89e6ff27aa361c9",
676676
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/com_github_jackc_puddle-v1.3.0.zip": "b1eb42bb3cf9a430146af79cb183860b9dddfca51844c2d4b447dc2f43becc55",
677677
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.2.zip": "f1f0789098a0bcb5ff3c7024f9ee387a6748446e1bc6713b13a63d763a9fb11e",
678678
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jaegertracing/jaeger/com_github_jaegertracing_jaeger-v1.18.1.zip": "256a95b2a52a66494aca6d354224bb450ff38ce3ea1890af46a7c8dc39203891",

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ require (
185185
github.com/iancoleman/strcase v0.2.0
186186
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040
187187
github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6
188-
github.com/jackc/pgx/v5 v5.7.2
188+
github.com/jackc/pgx/v5 v5.9.2
189189
github.com/jaegertracing/jaeger v1.18.1
190190
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
191191
github.com/jordanlewis/gcassert v0.0.0-20240401195008-3141cbd028c0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,8 +1519,8 @@ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgS
15191519
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
15201520
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
15211521
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
1522-
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
1523-
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
1522+
github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw=
1523+
github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
15241524
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
15251525
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
15261526
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=

pkg/sql/pgwire/conn_test.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func TestPipelineMetric(t *testing.T) {
289289
expectPrepareStmt(ctx, t, "", "SELECT $1::INT8 + $2::INT8", &rd, serverSideConn)
290290
expectBindStmt(ctx, t, "", &rd, serverSideConn)
291291
expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
292-
expectExecPortal(ctx, t, "", &rd, serverSideConn)
292+
expectExecPortalPipelined(ctx, t, "", &rd, serverSideConn)
293293
require.EqualValues(t, 5, serverSideConn.stmtBuf.PipelineCount.Value())
294294

295295
// Send another query in the pipeline.
@@ -310,19 +310,21 @@ func TestPipelineMetric(t *testing.T) {
310310
return nil
311311
})
312312

313-
// Process all of the commands that are in the pipeline.
313+
// Process all of the commands that are in the pipeline. In the PostgreSQL
314+
// pipeline protocol, ReadyForQuery is sent once per Sync (not per Execute),
315+
// so we use expectExecPortalPipelined and expectSyncAndSendReadyForQuery.
314316
expectPrepareStmt(ctx, t, "", "SELECT ($1::INT8 + $2::INT8) + $3::INT8", &rd, serverSideConn)
315317
expectBindStmt(ctx, t, "", &rd, serverSideConn)
316318
expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
317-
expectExecPortal(ctx, t, "", &rd, serverSideConn)
318-
expectSync(ctx, t, &rd)
319+
expectExecPortalPipelined(ctx, t, "", &rd, serverSideConn)
320+
expectSyncAndSendReadyForQuery(ctx, t, &rd, serverSideConn)
319321
require.EqualValues(t, 5, serverSideConn.stmtBuf.PipelineCount.Value())
320322

321323
expectPrepareStmt(ctx, t, "", "SELECT $1::STRING", &rd, serverSideConn)
322324
expectBindStmt(ctx, t, "", &rd, serverSideConn)
323325
expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
324-
expectExecPortal(ctx, t, "", &rd, serverSideConn)
325-
expectSync(ctx, t, &rd)
326+
expectExecPortalPipelined(ctx, t, "", &rd, serverSideConn)
327+
expectSyncAndSendReadyForQuery(ctx, t, &rd, serverSideConn)
326328
require.EqualValues(t, 0, serverSideConn.stmtBuf.PipelineCount.Value())
327329

328330
err = pipeline.Close()
@@ -951,6 +953,46 @@ func expectSync(ctx context.Context, t *testing.T, rd *sql.StmtBufReader) {
951953
}
952954
}
953955

956+
// expectSyncAndSendReadyForQuery reads a Sync command from the stmtBuf and
957+
// sends ReadyForQuery back to the client. In the PostgreSQL pipeline protocol,
958+
// ReadyForQuery is sent after Sync, not after each Execute.
959+
func expectSyncAndSendReadyForQuery(
960+
ctx context.Context, t *testing.T, rd *sql.StmtBufReader, c *conn,
961+
) {
962+
t.Helper()
963+
expectSync(ctx, t, rd)
964+
c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
965+
c.msgBuilder.writeByte('I') // transaction status: no txn
966+
if err := c.msgBuilder.finishMsg(c.conn); err != nil {
967+
t.Fatal(err)
968+
}
969+
}
970+
971+
// expectExecPortalPipelined reads an ExecPortal command from the stmtBuf and
972+
// sends only CommandComplete (no ReadyForQuery) back to the client. This
973+
// matches the PostgreSQL pipeline protocol where ReadyForQuery is sent after
974+
// Sync, not after each Execute.
975+
func expectExecPortalPipelined(
976+
ctx context.Context, t *testing.T, expName string, rd *sql.StmtBufReader, c *conn,
977+
) {
978+
t.Helper()
979+
cmd, err := rd.CurCmd()
980+
if err != nil {
981+
t.Fatal(err)
982+
}
983+
rd.AdvanceOne()
984+
ep, ok := cmd.(sql.ExecPortal)
985+
if !ok {
986+
t.Fatalf("expected command ExecPortal, got: %T (%+v)", cmd, cmd)
987+
}
988+
if ep.Name != expName {
989+
t.Fatalf("expected name %s, got %s", expName, ep.Name)
990+
}
991+
if err := finishQuery(cmdComplete, c); err != nil {
992+
t.Fatal(err)
993+
}
994+
}
995+
954996
func expectExecPortal(
955997
ctx context.Context, t *testing.T, expName string, rd *sql.StmtBufReader, c *conn,
956998
) {

pkg/sqlproxy/proxy_handler_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"crypto/tls"
1212
gosql "database/sql"
13+
"encoding/binary"
1314
"fmt"
1415
"io"
1516
"net"
@@ -438,7 +439,7 @@ func TestPrivateEndpointsACL(t *testing.T) {
438439
"Expected the connection to eventually fail",
439440
)
440441
require.Error(t, err)
441-
require.Regexp(t, "connection reset by peer|unexpected EOF", err.Error())
442+
require.Regexp(t, "connection reset by peer|unexpected EOF|conn closed", err.Error())
442443
require.Equal(t, int64(1), s.metrics.ExpiredClientConnCount.Count())
443444
},
444445
)
@@ -568,7 +569,7 @@ func TestAllowedCIDRRangesACL(t *testing.T) {
568569
time.Second, 5*time.Millisecond,
569570
"Expected the connection to eventually fail",
570571
)
571-
require.Regexp(t, "connection reset by peer|unexpected EOF", err.Error())
572+
require.Regexp(t, "connection reset by peer|unexpected EOF|conn closed", err.Error())
572573
require.Equal(t, int64(1), s.metrics.ExpiredClientConnCount.Count())
573574
})
574575
})
@@ -1700,7 +1701,7 @@ func TestCancelQuery(t *testing.T) {
17001701
cancelFn = func() {
17011702
cancelRequest := proxyCancelRequest{
17021703
ProxyIP: net.IP{},
1703-
SecretKey: conn.PgConn().SecretKey(),
1704+
SecretKey: binary.BigEndian.Uint32(conn.PgConn().SecretKey()),
17041705
ClientIP: net.IP{127, 0, 0, 1},
17051706
}
17061707
u := "http://" + addrs.httpAddr + "/_status/cancel/"
@@ -1730,7 +1731,7 @@ func TestCancelQuery(t *testing.T) {
17301731
_ = conn.PgConn().CancelRequest(ctx)
17311732
}
17321733
defer testutils.TestingHook(&defaultTransferTimeout, 3*time.Minute)()
1733-
origCancelInfo, found := proxy.handler.cancelInfoMap.getCancelInfo(conn.PgConn().SecretKey())
1734+
origCancelInfo, found := proxy.handler.cancelInfoMap.getCancelInfo(binary.BigEndian.Uint32(conn.PgConn().SecretKey()))
17341735
require.True(t, found)
17351736
b := tds.DrainPod(tenantID, tenants[0].SQLAddr())
17361737
require.True(t, b)
@@ -1753,7 +1754,7 @@ func TestCancelQuery(t *testing.T) {
17531754
timeSource.Advance(2 * time.Minute)
17541755
proxy.handler.balancer.RebalanceTenant(ctx, tenantID)
17551756
testutils.SucceedsSoon(t, func() error {
1756-
newCancelInfo, found := proxy.handler.cancelInfoMap.getCancelInfo(conn.PgConn().SecretKey())
1757+
newCancelInfo, found := proxy.handler.cancelInfoMap.getCancelInfo(binary.BigEndian.Uint32(conn.PgConn().SecretKey()))
17571758
if !found {
17581759
return errors.New("expected to find cancel info")
17591760
}
@@ -1775,7 +1776,7 @@ func TestCancelQuery(t *testing.T) {
17751776
snapshot := snapshotMetrics()
17761777
cancelRequest := proxyCancelRequest{
17771778
ProxyIP: net.IP{},
1778-
SecretKey: conn.PgConn().SecretKey(),
1779+
SecretKey: binary.BigEndian.Uint32(conn.PgConn().SecretKey()),
17791780
ClientIP: net.IP{210, 1, 2, 3},
17801781
}
17811782
u := "http://" + addrs.httpAddr + "/_status/cancel/"
@@ -1812,7 +1813,7 @@ func TestCancelQuery(t *testing.T) {
18121813
})()
18131814
crdbRequest := &pgproto3.CancelRequest{
18141815
ProcessID: 1,
1815-
SecretKey: conn.PgConn().SecretKey() + 1,
1816+
SecretKey: binary.BigEndian.Uint32(conn.PgConn().SecretKey()) + 1,
18161817
}
18171818
buf, err := crdbRequest.Encode(nil /* buf */)
18181819
require.NoError(t, err)
@@ -1829,7 +1830,7 @@ func TestCancelQuery(t *testing.T) {
18291830
require.Equal(t, "http://0.0.0.1:8080/_status/cancel/", forwardedTo)
18301831
expectedReq := proxyCancelRequest{
18311832
ProxyIP: net.IP{0, 0, 0, 1},
1832-
SecretKey: conn.PgConn().SecretKey() + 1,
1833+
SecretKey: binary.BigEndian.Uint32(conn.PgConn().SecretKey()) + 1,
18331834
ClientIP: net.IP{127, 0, 0, 1},
18341835
}
18351836
require.Equal(t, expectedReq, forwardedReq)
@@ -1840,7 +1841,7 @@ func TestCancelQuery(t *testing.T) {
18401841
snapshot := snapshotMetrics()
18411842
cancelRequest := proxyCancelRequest{
18421843
ProxyIP: net.IP{},
1843-
SecretKey: conn.PgConn().SecretKey() + 1,
1844+
SecretKey: binary.BigEndian.Uint32(conn.PgConn().SecretKey()) + 1,
18441845
ClientIP: net.IP{127, 0, 0, 1},
18451846
}
18461847
u := "http://" + addrs.httpAddr + "/_status/cancel/"

pkg/workload/pgx_helpers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func NewMultiConnPool(
224224
minConns = numConns
225225
}
226226
poolCfg.MinConns = int32(minConns)
227-
poolCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
227+
poolCfg.PrepareConn = func(ctx context.Context, conn *pgx.Conn) (bool, error) {
228228
m.mu.RLock()
229229
defer m.mu.RUnlock()
230230
for name, sql := range m.mu.preparedStatements {
@@ -233,10 +233,10 @@ func NewMultiConnPool(
233233
// communication to the server.
234234
if _, err := conn.Prepare(ctx, name, sql); err != nil {
235235
log.Dev.Warningf(ctx, "error preparing statement. name=%s sql=%s %v", name, sql, err)
236-
return false
236+
return false, nil
237237
}
238238
}
239-
return true
239+
return true, nil
240240
}
241241

242242
// Attach the supplied tracer to the ConnConfig.

0 commit comments

Comments
 (0)