Skip to content

Commit 4333920

Browse files
committed
fix(ttstream): ttstream should not recycle connection when Recv timeout with DisableCancelRemote=true
1 parent 17be90b commit 4333920

28 files changed

Lines changed: 1353 additions & 452 deletions

pkg/remote/trans/ttstream/client_handler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package ttstream
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
"github.com/bytedance/gopkg/cloud/metainfo"
2324
"github.com/cloudwego/gopkg/protocol/ttheader"
@@ -74,6 +75,10 @@ func (c clientTransHandler) NewStream(ctx context.Context, ri rpcinfo.RPCInfo) (
7475
}
7576
strHeader[ttheader.HeaderIDLServiceName] = invocation.ServiceName()
7677
metainfo.SaveMetaInfoToMap(ctx, strHeader)
78+
var tm time.Duration
79+
if ddl, ok := ctx.Deadline(); ok {
80+
tm = time.Until(ddl)
81+
}
7782

7883
trans, err := c.transPool.Get(addr.Network(), addr.String())
7984
if err != nil {
@@ -84,9 +89,10 @@ func (c clientTransHandler) NewStream(ctx context.Context, ri rpcinfo.RPCInfo) (
8489
cs := newClientStream(ctx, trans, streamFrame{sid: genStreamID(), method: method})
8590
// stream should be configured before WriteStream or there would be a race condition for metaFrameHandler
8691

87-
cs.setRecvTimeoutConfig(rconfig)
92+
cs.setRecvTimeoutConfig(rconfig, cs.recvTimeoutCallback)
8893
cs.setMetaFrameHandler(c.metaHandler)
8994
cs.setTraceController(c.traceCtl)
95+
cs.setStreamTimeout(tm)
9096

9197
if err = trans.WriteStream(ctx, cs, intHeader, strHeader); err != nil {
9298
return nil, err
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
//go:build !windows
2+
3+
/*
4+
* Copyright 2026 CloudWeGo Authors
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package ttstream
20+
21+
import (
22+
"context"
23+
"errors"
24+
"testing"
25+
"time"
26+
27+
"github.com/cloudwego/kitex/internal/test"
28+
"github.com/cloudwego/kitex/pkg/kerrors"
29+
"github.com/cloudwego/kitex/pkg/rpcinfo"
30+
"github.com/cloudwego/kitex/pkg/streaming"
31+
"github.com/cloudwego/kitex/pkg/utils"
32+
)
33+
34+
// mockTransPool implements transPool for testing clientTransHandler.NewStream
35+
type mockTransPool struct {
36+
getTrans *clientTransport
37+
getErr error
38+
}
39+
40+
func (p *mockTransPool) Get(network, addr string) (*clientTransport, error) {
41+
return p.getTrans, p.getErr
42+
}
43+
44+
func (p *mockTransPool) Put(trans *clientTransport) {}
45+
46+
// mockHeaderFrameWriteHandler implements HeaderFrameWriteHandler for testing
47+
type mockHeaderFrameWriteHandler struct {
48+
intHeader IntHeader
49+
strHeader streaming.Header
50+
err error
51+
}
52+
53+
func (h *mockHeaderFrameWriteHandler) OnWriteStream(ctx context.Context) (IntHeader, StrHeader, error) {
54+
return h.intHeader, h.strHeader, h.err
55+
}
56+
57+
func newTestRPCInfoCtx(cfg rpcinfo.RPCConfig) (context.Context, rpcinfo.RPCInfo) {
58+
if cfg == nil {
59+
cfg = rpcinfo.NewRPCConfig()
60+
}
61+
ri := rpcinfo.NewRPCInfo(
62+
rpcinfo.NewEndpointInfo("clientSvc", "testMethod", nil, nil),
63+
rpcinfo.NewEndpointInfo("serverSvc", "testMethod", utils.NewNetAddr("tcp", "127.0.0.1:8888"), nil),
64+
rpcinfo.NewInvocation("serverSvc", "testMethod"),
65+
cfg,
66+
nil,
67+
)
68+
ctx := rpcinfo.NewCtxWithRPCInfo(context.Background(), ri)
69+
return ctx, ri
70+
}
71+
72+
func TestNewStream(t *testing.T) {
73+
t.Run("no dest address", func(t *testing.T) {
74+
handler := &clientTransHandler{
75+
transPool: &mockTransPool{},
76+
}
77+
ri := rpcinfo.NewRPCInfo(
78+
nil,
79+
rpcinfo.NewEndpointInfo("serverSvc", "testMethod", nil, nil), // addr = nil
80+
rpcinfo.NewInvocation("serverSvc", "testMethod"),
81+
rpcinfo.NewRPCConfig(), nil,
82+
)
83+
ctx := rpcinfo.NewCtxWithRPCInfo(context.Background(), ri)
84+
_, err := handler.NewStream(ctx, ri)
85+
test.Assert(t, errors.Is(err, kerrors.ErrNoDestAddress), err)
86+
})
87+
t.Run("transPool.Get() fails", func(t *testing.T) {
88+
getErr := errors.New("pool get error")
89+
handler := &clientTransHandler{
90+
transPool: &mockTransPool{getErr: getErr},
91+
}
92+
ctx, ri := newTestRPCInfoCtx(nil)
93+
_, err := handler.NewStream(ctx, ri)
94+
test.Assert(t, err == getErr, err)
95+
})
96+
t.Run("headerHandler.OnWriteStream() fails", func(t *testing.T) {
97+
hdlErr := errors.New("header handler error")
98+
handler := &clientTransHandler{
99+
transPool: &mockTransPool{},
100+
headerHandler: &mockHeaderFrameWriteHandler{err: hdlErr},
101+
}
102+
ctx, ri := newTestRPCInfoCtx(nil)
103+
_, err := handler.NewStream(ctx, ri)
104+
test.Assert(t, err == hdlErr, err)
105+
})
106+
t.Run("success with StreamRecvTimeout", func(t *testing.T) {
107+
cconn, sconn := newTestConnectionPipe(t)
108+
defer cconn.Close()
109+
defer sconn.Close()
110+
111+
ctrans := newClientTransport(cconn, nil)
112+
handler := &clientTransHandler{
113+
transPool: &mockTransPool{getTrans: ctrans},
114+
}
115+
cfg := rpcinfo.NewRPCConfig()
116+
rpcinfo.AsMutableRPCConfig(cfg).SetStreamRecvTimeout(100 * time.Millisecond)
117+
ctx, ri := newTestRPCInfoCtx(cfg)
118+
119+
cs, err := handler.NewStream(ctx, ri)
120+
test.Assert(t, err == nil, err)
121+
test.Assert(t, cs != nil)
122+
123+
cliSt := cs.(*clientStream)
124+
test.Assert(t, cliSt.recvTimeoutConfig.Timeout == 100*time.Millisecond, cliSt.recvTimeoutConfig)
125+
test.Assert(t, !cliSt.recvTimeoutConfig.DisableCancelRemote, cliSt.recvTimeoutConfig)
126+
test.Assert(t, cliSt.stream.recvTimeoutCallback != nil)
127+
})
128+
t.Run("success with StreamRecvTimeoutConfig", func(t *testing.T) {
129+
cconn, sconn := newTestConnectionPipe(t)
130+
defer cconn.Close()
131+
defer sconn.Close()
132+
133+
ctrans := newClientTransport(cconn, nil)
134+
handler := &clientTransHandler{
135+
transPool: &mockTransPool{getTrans: ctrans},
136+
}
137+
cfg := rpcinfo.NewRPCConfig()
138+
rpcinfo.AsMutableRPCConfig(cfg).SetStreamRecvTimeoutConfig(streaming.TimeoutConfig{
139+
Timeout: 100 * time.Millisecond,
140+
DisableCancelRemote: true,
141+
})
142+
ctx, ri := newTestRPCInfoCtx(cfg)
143+
144+
cs, err := handler.NewStream(ctx, ri)
145+
test.Assert(t, err == nil, err)
146+
test.Assert(t, cs != nil)
147+
148+
cliSt := cs.(*clientStream)
149+
test.Assert(t, cliSt.recvTimeoutConfig.Timeout == 100*time.Millisecond, cliSt.recvTimeoutConfig)
150+
test.Assert(t, cliSt.recvTimeoutConfig.DisableCancelRemote, cliSt.recvTimeoutConfig)
151+
test.Assert(t, cliSt.stream.recvTimeoutCallback != nil)
152+
})
153+
t.Run("both set, StreamRecvTimeoutConfig takes priority", func(t *testing.T) {
154+
cconn, sconn := newTestConnectionPipe(t)
155+
defer cconn.Close()
156+
defer sconn.Close()
157+
158+
ctrans := newClientTransport(cconn, nil)
159+
handler := &clientTransHandler{
160+
transPool: &mockTransPool{getTrans: ctrans},
161+
}
162+
cfg := rpcinfo.NewRPCConfig()
163+
rpcinfo.AsMutableRPCConfig(cfg).SetStreamRecvTimeout(500 * time.Millisecond)
164+
rpcinfo.AsMutableRPCConfig(cfg).SetStreamRecvTimeoutConfig(streaming.TimeoutConfig{
165+
Timeout: 100 * time.Millisecond,
166+
DisableCancelRemote: true,
167+
})
168+
ctx, ri := newTestRPCInfoCtx(cfg)
169+
170+
cs, err := handler.NewStream(ctx, ri)
171+
test.Assert(t, err == nil, err)
172+
test.Assert(t, cs != nil)
173+
174+
cliSt := cs.(*clientStream)
175+
test.Assert(t, cliSt.recvTimeoutConfig.Timeout == 100*time.Millisecond, cliSt.recvTimeoutConfig)
176+
test.Assert(t, cliSt.recvTimeoutConfig.DisableCancelRemote, cliSt.recvTimeoutConfig)
177+
test.Assert(t, cliSt.stream.recvTimeoutCallback != nil)
178+
})
179+
t.Run("stream ctx deadline sets streamTimeout", func(t *testing.T) {
180+
cconn, sconn := newTestConnectionPipe(t)
181+
defer cconn.Close()
182+
defer sconn.Close()
183+
184+
ctrans := newClientTransport(cconn, nil)
185+
handler := &clientTransHandler{
186+
transPool: &mockTransPool{getTrans: ctrans},
187+
}
188+
ctx, ri := newTestRPCInfoCtx(nil)
189+
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
190+
defer cancel()
191+
192+
cs, err := handler.NewStream(ctx, ri)
193+
test.Assert(t, err == nil, err)
194+
195+
cliSt := cs.(*clientStream)
196+
// streamTimeout should be set from ctx deadline
197+
test.Assert(t, cliSt.streamTimeout > 0 && cliSt.streamTimeout <= 500*time.Millisecond, cliSt.streamTimeout)
198+
})
199+
t.Run("headerHandler provides custom headers", func(t *testing.T) {
200+
cconn, sconn := newTestConnectionPipe(t)
201+
defer cconn.Close()
202+
defer sconn.Close()
203+
204+
ctrans := newClientTransport(cconn, nil)
205+
handler := &clientTransHandler{
206+
transPool: &mockTransPool{getTrans: ctrans},
207+
headerHandler: &mockHeaderFrameWriteHandler{
208+
intHeader: IntHeader{1: "val1"},
209+
strHeader: streaming.Header{"custom-key": "custom-val"},
210+
},
211+
}
212+
ctx, ri := newTestRPCInfoCtx(nil)
213+
214+
cs, err := handler.NewStream(ctx, ri)
215+
test.Assert(t, err == nil, err)
216+
test.Assert(t, cs != nil)
217+
})
218+
}

pkg/remote/trans/ttstream/client_trans_pool_longconn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package ttstream
1919
import (
2020
"time"
2121

22-
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/container"
22+
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/internal/container"
2323
)
2424

2525
var DefaultLongConnConfig = LongConnConfig{

0 commit comments

Comments
 (0)