Skip to content

Commit 3696079

Browse files
committed
fix(streaming): decouple stream ctx timeout from per-recv timeout to fix ttstream connection reuse
Previously, RecvMsg created a child context from the stream ctx for recv timeout. This coupled stream-level timeout with per-recv timeout, causing incorrect behavior: - When DisableCancelRemote=true, stream ctx expiration was misidentified as recv timeout, preventing the RST frame from being sent and breaking connection reuse. - When stream ctx expired before recv timeout, the error was reported as recv timeout instead of stream timeout. Changes: - Redesign Pipe to support two-level ctx: a pipe-level ctx (stream lifecycle) and a per-read ctx (individual recv timeout), each with its own callback - Separate stream timeout (error 12015) from recv timeout (error 12014) so they trigger independent close/cancel logic - Fix Pipe.ReadCtx to drain remaining items after detecting closed state, preventing data loss in Write+Close race - Move container package to internal to prevent external dependency - Update streaming interface comments to accurately describe client/server side RecvMsg/SendMsg semantics and error caching behavior
1 parent 89f0337 commit 3696079

28 files changed

Lines changed: 990 additions & 457 deletions

pkg/remote/trans/ttstream/client_handler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package ttstream
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
"github.com/bytedance/gopkg/cloud/metainfo"
2324
"github.com/cloudwego/gopkg/protocol/ttheader"
@@ -74,6 +75,10 @@ func (c clientTransHandler) NewStream(ctx context.Context, ri rpcinfo.RPCInfo) (
7475
}
7576
strHeader[ttheader.HeaderIDLServiceName] = invocation.ServiceName()
7677
metainfo.SaveMetaInfoToMap(ctx, strHeader)
78+
var tm time.Duration
79+
if ddl, ok := ctx.Deadline(); ok {
80+
tm = time.Until(ddl)
81+
}
7782

7883
trans, err := c.transPool.Get(addr.Network(), addr.String())
7984
if err != nil {
@@ -87,6 +92,7 @@ func (c clientTransHandler) NewStream(ctx context.Context, ri rpcinfo.RPCInfo) (
8792
cs.setRecvTimeoutConfig(rconfig)
8893
cs.setMetaFrameHandler(c.metaHandler)
8994
cs.setTraceController(c.traceCtl)
95+
cs.setStreamTimeout(tm)
9096

9197
if err = trans.WriteStream(ctx, cs, intHeader, strHeader); err != nil {
9298
return nil, err

pkg/remote/trans/ttstream/client_trans_pool_longconn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package ttstream
1919
import (
2020
"time"
2121

22-
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/container"
22+
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/internal/container"
2323
)
2424

2525
var DefaultLongConnConfig = LongConnConfig{

pkg/remote/trans/ttstream/container/pipe.go

Lines changed: 0 additions & 152 deletions
This file was deleted.

pkg/remote/trans/ttstream/container/pipe_test.go

Lines changed: 0 additions & 163 deletions
This file was deleted.

pkg/remote/trans/ttstream/exception.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"strings"
23+
"time"
2324

2425
"github.com/cloudwego/kitex/pkg/kerrors"
2526
"github.com/cloudwego/kitex/pkg/streaming"
@@ -49,6 +50,10 @@ func newStreamRecvTimeoutException(cfg streaming.TimeoutConfig) *Exception {
4950
return newException(fmt.Sprintf("stream Recv timeout, timeout config=%+v", cfg), kerrors.ErrStreamingTimeout, 12014).withSide(clientSide)
5051
}
5152

53+
func newStreamTimeoutException(tm time.Duration) *Exception {
54+
return newException(fmt.Sprintf("stream timeout, timeout in ctx: %+v", tm), kerrors.ErrStreamingTimeout, 12015).withSide(clientSide)
55+
}
56+
5257
const (
5358
setSide = 1 << iota
5459
setCancelPath
File renamed without changes.

pkg/remote/trans/ttstream/container/object_pool.go renamed to pkg/remote/trans/ttstream/internal/container/object_pool.go

File renamed without changes.

pkg/remote/trans/ttstream/container/object_pool_test.go renamed to pkg/remote/trans/ttstream/internal/container/object_pool_test.go

File renamed without changes.

0 commit comments

Comments
 (0)