Skip to content

Commit eff63f2

Browse files
authored
fix: recreate http request on each auth retry and clean up client state (#213)
- Move http.NewRequest inside auth retry loop in doRequest/doRequestRaw so the request body is not consumed on the first attempt - Use headers.Clone() per iteration to avoid header mutation across retries - Move snapshotClientState and clone helpers to clientstate.go - Clear resp.Data after JSON materialization to prevent dual-path state - Fix materializeJSONQueryRows early-return guard for empty typedRows slice - Add internal consistency check in rows.Next for dual typed/raw row state - Expose RowCount() and CellString() as public accessors on QueryResponse
1 parent 6bb24db commit eff63f2

8 files changed

Lines changed: 99 additions & 81 deletions

File tree

arrow.go

Lines changed: 14 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ func (c *APIClient) doRequestRaw(
9292
}
9393

9494
url := c.makeURL(path)
95-
httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
96-
if err != nil {
97-
return nil, errors.Wrap(err, "failed to create http request")
98-
}
99-
httpReq = httpReq.WithContext(ctx)
10095

10196
headers, err := c.makeHeaders(ctx)
10297
if err != nil {
@@ -113,11 +108,6 @@ func (c *APIClient) doRequestRaw(
113108
acceptType = jsonContentType
114109
}
115110
headers.Set(accept, acceptType)
116-
httpReq.Header = headers
117-
118-
if len(c.host) > 0 {
119-
httpReq.Host = c.host
120-
}
121111

122112
authRetryLimit := 2
123113
for i := 1; i <= authRetryLimit; i++ {
@@ -127,6 +117,16 @@ func (c *APIClient) doRequestRaw(
127117
default:
128118
}
129119

120+
httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
121+
if err != nil {
122+
return nil, errors.Wrap(err, "failed to create http request")
123+
}
124+
httpReq = httpReq.WithContext(ctx)
125+
httpReq.Header = headers.Clone()
126+
if len(c.host) > 0 {
127+
httpReq.Host = c.host
128+
}
129+
130130
httpResp, err := c.cli.Do(httpReq)
131131
if err != nil {
132132
return nil, errors.Wrap(ErrDoRequest, err.Error())
@@ -164,6 +164,7 @@ func (c *APIClient) doRequestRaw(
164164
}, nil
165165
}
166166

167+
// unreachable: loop always returns inside on the final iteration
167168
return nil, errors.Errorf("failed to do request after %d retries", authRetryLimit)
168169
}
169170

@@ -615,6 +616,9 @@ func formatArrowTimestampTZDecimalValue(value decimal128.Num) (string, error) {
615616
return ts.In(zone).Format("2006-01-02 15:04:05.000000 -0700"), nil
616617
}
617618

619+
// arrowTimestampTZDecimalToTime decodes a Timestamp_Tz value encoded as Decimal128.
620+
// The server encodes: LowBits = microseconds since Unix epoch (uint64 cast to int64),
621+
// HighBits = timezone offset in seconds from UTC (uint64 cast to int32).
618622
func arrowTimestampTZDecimalToTime(value decimal128.Num) time.Time {
619623
ts := time.UnixMicro(clampArrowTimestampMicros(int64(value.LowBits())))
620624
zone := time.FixedZone("", int(int32(value.HighBits())))
@@ -678,52 +682,6 @@ func clampArrowTimestampMicros(value int64) int64 {
678682
}
679683
}
680684

681-
func (c *APIClient) snapshotClientState() func() {
682-
querySeq := c.QuerySeq
683-
routeHint := c.routeHint
684-
nodeID := c.nodeID
685-
stateRestored := c.stateRestored
686-
sessionStateRaw := cloneRawMessage(c.sessionStateRaw)
687-
sessionState := cloneSessionState(c.sessionState)
688-
689-
return func() {
690-
c.QuerySeq = querySeq
691-
c.routeHint = routeHint
692-
c.nodeID = nodeID
693-
c.stateRestored = stateRestored
694-
c.sessionStateRaw = sessionStateRaw
695-
c.sessionState = sessionState
696-
}
697-
}
698-
699-
func cloneRawMessage(raw *json.RawMessage) *json.RawMessage {
700-
if raw == nil {
701-
return nil
702-
}
703-
cloned := json.RawMessage(append([]byte(nil), (*raw)...))
704-
return &cloned
705-
}
706-
707-
func cloneSessionState(state *SessionState) *SessionState {
708-
if state == nil {
709-
return nil
710-
}
711-
712-
cloned := *state
713-
if state.SecondaryRoles != nil {
714-
roles := append([]string(nil), (*state.SecondaryRoles)...)
715-
cloned.SecondaryRoles = &roles
716-
}
717-
if state.Settings != nil {
718-
settings := make(map[string]string, len(state.Settings))
719-
for key, value := range state.Settings {
720-
settings[key] = value
721-
}
722-
cloned.Settings = settings
723-
}
724-
return &cloned
725-
}
726-
727685
func formatArrowScalar(value interface{}) (string, error) {
728686
switch v := value.(type) {
729687
case nil:

arrow_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,10 @@ func TestQuerySyncFallsBackToJSONWhenArrowRequested(t *testing.T) {
442442

443443
resp, err := client.QuerySync(context.Background(), "SELECT 1")
444444
require.NoError(t, err)
445-
require.Len(t, resp.Data, 1)
446-
require.NotNil(t, resp.Data[0][0])
447-
assert.Equal(t, "1", *resp.Data[0][0])
445+
require.Equal(t, 1, resp.RowCount())
446+
v, ok := resp.CellString(0, 0)
447+
require.True(t, ok)
448+
assert.Equal(t, "1", v)
448449

449450
mu.Lock()
450451
defer mu.Unlock()
@@ -495,9 +496,10 @@ func TestQuerySyncUsesJSONForRestoredState(t *testing.T) {
495496

496497
resp, err := client.QuerySync(context.Background(), "SELECT 1")
497498
require.NoError(t, err)
498-
require.Len(t, resp.Data, 1)
499-
require.NotNil(t, resp.Data[0][0])
500-
assert.Equal(t, "1", *resp.Data[0][0])
499+
require.Equal(t, 1, resp.RowCount())
500+
v, ok := resp.CellString(0, 0)
501+
require.True(t, ok)
502+
assert.Equal(t, "1", v)
501503

502504
mu.Lock()
503505
defer mu.Unlock()

client.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -251,30 +251,19 @@ func (c *APIClient) doRequest(ctx context.Context, method, path string, req inte
251251
}
252252

253253
url := c.makeURL(path)
254-
httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
255-
if err != nil {
256-
return errors.Wrap(err, "failed to create http request")
257-
}
258-
259-
httpReq = httpReq.WithContext(ctx)
260254

261255
headers, err := c.makeHeaders(ctx)
262-
if needSticky && len(c.nodeID) != 0 {
263-
headers.Set(DatabendQueryStickyNode, c.nodeID)
264-
}
265256
if err != nil {
266257
return errors.Wrap(err, "failed to make request headers")
267258
}
259+
if needSticky && len(c.nodeID) != 0 {
260+
headers.Set(DatabendQueryStickyNode, c.nodeID)
261+
}
268262
if method == "GET" && len(c.nodeID) != 0 {
269263
headers.Set(DatabendQueryIDNode, c.nodeID)
270264
}
271265
headers.Set(contentType, jsonContentType)
272266
headers.Set(accept, jsonContentType)
273-
httpReq.Header = headers
274-
275-
if len(c.host) > 0 {
276-
httpReq.Host = c.host
277-
}
278267

279268
authRetryLimit := 2
280269
for i := 1; i <= authRetryLimit; i++ {
@@ -284,6 +273,16 @@ func (c *APIClient) doRequest(ctx context.Context, method, path string, req inte
284273
default:
285274
}
286275

276+
httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
277+
if err != nil {
278+
return errors.Wrap(err, "failed to create http request")
279+
}
280+
httpReq = httpReq.WithContext(ctx)
281+
httpReq.Header = headers.Clone()
282+
if len(c.host) > 0 {
283+
httpReq.Host = c.host
284+
}
285+
287286
httpResp, err := c.cli.Do(httpReq)
288287
if err != nil {
289288
return errors.Wrap(ErrDoRequest, err.Error())

clientstate.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,52 @@ import (
55
"net/http"
66
)
77

8+
func (c *APIClient) snapshotClientState() func() {
9+
querySeq := c.QuerySeq
10+
routeHint := c.routeHint
11+
nodeID := c.nodeID
12+
stateRestored := c.stateRestored
13+
sessionStateRaw := cloneRawMessage(c.sessionStateRaw)
14+
sessionState := cloneSessionState(c.sessionState)
15+
16+
return func() {
17+
c.QuerySeq = querySeq
18+
c.routeHint = routeHint
19+
c.nodeID = nodeID
20+
c.stateRestored = stateRestored
21+
c.sessionStateRaw = sessionStateRaw
22+
c.sessionState = sessionState
23+
}
24+
}
25+
26+
func cloneRawMessage(raw *json.RawMessage) *json.RawMessage {
27+
if raw == nil {
28+
return nil
29+
}
30+
cloned := json.RawMessage(append([]byte(nil), (*raw)...))
31+
return &cloned
32+
}
33+
34+
func cloneSessionState(state *SessionState) *SessionState {
35+
if state == nil {
36+
return nil
37+
}
38+
39+
cloned := *state
40+
if state.SecondaryRoles != nil {
41+
roles := append([]string(nil), (*state.SecondaryRoles)...)
42+
cloned.SecondaryRoles = &roles
43+
}
44+
if state.Settings != nil {
45+
settings := make(map[string]string, len(state.Settings))
46+
for key, value := range state.Settings {
47+
settings[key] = value
48+
}
49+
cloned.Settings = settings
50+
}
51+
return &cloned
52+
}
53+
854
type APIClientState struct {
955
SessionID string
1056
QuerySeq int64

query.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func (r *QueryResponse) ReadFinished() bool {
6262
return r.NextURI == "" || strings.Contains(r.NextURI, "/final")
6363
}
6464

65+
func (r *QueryResponse) RowCount() int {
66+
return r.bufferedRowCount()
67+
}
68+
6569
func (r *QueryResponse) bufferedRowCount() int {
6670
if r == nil {
6771
return 0
@@ -85,6 +89,10 @@ func (r *QueryResponse) cellValue(rowIdx, colIdx int) (driver.Value, bool) {
8589
return nil, false
8690
}
8791

92+
func (r *QueryResponse) CellString(rowIdx, colIdx int) (string, bool) {
93+
return r.cellString(rowIdx, colIdx)
94+
}
95+
8896
func (r *QueryResponse) cellString(rowIdx, colIdx int) (string, bool) {
8997
value, ok := r.cellValue(rowIdx, colIdx)
9098
if !ok || value == nil {

result_rows.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func queryResponseColumnTypeOptions(settings *Settings) (*ColumnTypeOptions, err
2828
}
2929

3030
func materializeJSONQueryRows(resp *QueryResponse) error {
31-
if resp == nil || resp.typedRows != nil || len(resp.Data) == 0 {
31+
if resp == nil || (resp.typedRows != nil && len(resp.typedRows) > 0) || len(resp.Data) == 0 {
3232
return nil
3333
}
3434
if resp.Schema == nil || len(*resp.Schema) != len(resp.Data[0]) {
@@ -62,6 +62,7 @@ func materializeJSONQueryRows(resp *QueryResponse) error {
6262
rows = append(rows, row)
6363
}
6464
resp.typedRows = rows
65+
resp.Data = nil
6566
return nil
6667
}
6768

rows.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ func (r *nextRows) Next(dest []driver.Value) error {
173173
typedRow = r.respData.typedRows[0]
174174
r.respData.typedRows = r.respData.typedRows[1:]
175175
}
176+
if len(typedRow) != 0 && len(lineData) != 0 {
177+
return errors.New("query error: internal error, both typed and raw row data are set")
178+
}
176179
if len(typedRow) != 0 {
177180
if len(typedRow) != len(r.columns) {
178181
return errors.New("query error: internal error, typed data and schema not match")

tests/resume_query_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (s *DatabendTestSuite) TestResumeQueryWithSessionState() {
4343
finalResp, err := secondClient.PollUntilQueryEnd(ctx, resumeResp)
4444
s.Require().NoError(err)
4545
s.Require().NotNil(finalResp)
46+
s.Greater(finalResp.RowCount(), 0)
4647
s.NoError(secondClient.CloseQuery(ctx, finalResp))
4748
}
4849

@@ -65,10 +66,10 @@ func (s *DatabendTestSuite) TestSessionSettingLoadWithState() {
6566
client2.WithState(state)
6667
resp, err := client2.QuerySync(ctx, fmt.Sprintf("SELECT value FROM system.settings WHERE name = '%s'", settingKey))
6768
s.Require().NoError(err)
68-
s.Require().Greater(len(resp.Data), 0)
69-
s.Require().Greater(len(resp.Data[0]), 0)
70-
s.Require().NotNil(resp.Data[0][0])
71-
s.Equal(fmt.Sprintf("%d", settingValue), *resp.Data[0][0])
69+
s.Require().Greater(resp.RowCount(), 0)
70+
v, ok := resp.CellString(0, 0)
71+
s.Require().True(ok)
72+
s.Equal(fmt.Sprintf("%d", settingValue), v)
7273

7374
roundedState := client2.GetState()
7475
s.Require().NotNil(roundedState)

0 commit comments

Comments
 (0)