diff --git a/content/en/blog/releases/Kitex/release-v0_13_0.md b/content/en/blog/releases/Kitex/release-v0_13_0.md new file mode 100644 index 00000000000..231d2b40fb9 --- /dev/null +++ b/content/en/blog/releases/Kitex/release-v0_13_0.md @@ -0,0 +1,114 @@ +--- +title: "Kitex Release v0.13.0" +linkTitle: "Release v0.13.0" +projects: ["Kitex"] +date: 2025-04-07 +description: > +--- + +> We recommend upgrading directly to Kitex version v0.13.1, as we have fixed a potential Goroutine leak issue of the gRPC Client in v0.13.0. + +## **Introduction to Key Changes** + +### **New Features** +1. **New streaming interface StreamX supports gRPC, existing Kitex gRPC users can migrate** + + v0.12.0 released the StreamX interface to optimise the streaming experience, and supported the custom streaming protocol TTHeader Streaming, but did not support gRPC. So existing users could not migrate. + + This version supports gRPC for StreamX, users can migrate to StreamX, and the Server side can be compatible with two streaming protocols at the same time. So there is no need to worry about protocol compatibility after interface migration. + + In particular, when adapting gRPC with StreamX, we found that there are still some inconvenient problems. In order to bring a better experience of using the interface, we have adjusted the StreamX interface for the second time, which will affect the users who have already been using StreamX. We apologise for that. + + User documentation: [StreamX User Documentation](/docs/kitex/tutorials/basic-feature/streamx) + +2. **Prutal - Protobuf's non-generated code serialisation library** + + [Prutal](https://github.com/cloudwego/prutal) is officially open source, on par with Thrift's [Frugal](https://github.com/cloudwego/frugal), and the new version of Kitex integrates Prutal by default. + + Advantages: + + - Minimized Code Product Size: Generating Only Structures, No Runtime Code + + - Leveraging Reflection Optimization Similar to Frugal, Achieving Over 50% Speed Increase + + - Generating Code Compatible with Existing Protobuf and Derivative Versions + + User documentation: [Prutal](/docs/kitex/tutorials/code-gen/prutal) + +### **Feature/Experience Optimization** +1. **TTHeader Streaming: Support interface-level Recv timeout control** + + In addition to the existing Client level, this release of TTHeader Streaming supports interface-level Recv timeout configuration, making configuration more flexible. + + User documentation: [StreamX Timeout Control](/docs/kitex/tutorials/basic-feature/streamx/kitex+streamx+-+stream+timeout+control/) + +2. **Default Thrift transport protocol changed from Buffered to Framed** + + This change can leverage FastCodec for better performance. + +### **Others** +1. **Code Product Simplification** + + - Kitex Tool would not generate the repeated verification code for Set data structure and the ```DeepEqual``` function for each structure by default. + + - If you only want to restore```DeepEqual```, add```-thrift gen_deep_equal=true```to the generation command. + + - If you want to restore the repeated verification of Set, add```-thrift validate_set=true, -thrift gen_deep_equal=true```to the generation command. + + - Kitex Tool would not generate the Apache Codec related code by default. + + - If you want to restore it, add```-thrift no_default_serdes=false```to the generation command. + +2. **Go Supported Version Change** + + Support version Go 1.19~1.24, the lowest supported version becomes Go 1.19. + + if Go version is too low, there will be a prompt when compiling:```note: module requires Go 1.19```. + +## **Full Change** +### Feature +[[#1719](https://github.com/cloudwego/kitex/pull/1719)] feat: prutal for replacing protoc + +[[#1736](https://github.com/cloudwego/kitex/pull/1736)] feat(ttstream): support WithRecvTimeout stream call option + +[[#1702](https://github.com/cloudwego/kitex/pull/1702)] feat(gRPC): add grpc client conn dump to help debug the conn and stream status + +[[#1723](https://github.com/cloudwego/kitex/pull/1723)] feat(codec/thrift): use fastcodec/frugal if apache codec not available + +[[#1724](https://github.com/cloudwego/kitex/pull/1724)] feat: add tail option to support for delayed initialization of some client options + +### Optimize +[[#1728](https://github.com/cloudwego/kitex/pull/1728)] optimize(apache): remove apache codec gen and set default protocol from buffered to framed + +[[#1732](https://github.com/cloudwego/kitex/pull/1732)] optimize(rpcinfo): purify the transport protocol of rpcinfo in a single rpc request + +[[#1711](https://github.com/cloudwego/kitex/pull/1711)] optimize(tool): disable set validate and deep equal code gen to simplify kitex_gen + +[[#1717](https://github.com/cloudwego/kitex/pull/1717)] optimize(gRPC): return more detailed error when received invalid http2 frame + +### Fix +[[#1734](https://github.com/cloudwego/kitex/pull/1734)] fix(ttstream): adjust stream state transition and remove all SetFinalizer to avoid memory leak + +[[#1735](https://github.com/cloudwego/kitex/pull/1735)] fix(generic): support both relative and absolute check for idl includes parse to make it compatible with generation tool + +[[#1725](https://github.com/cloudwego/kitex/pull/1725)] fix: code gen import issue for streamx mode, stream call judgement bug and set ttheader streaming as default + +[[#1727](https://github.com/cloudwego/kitex/pull/1727)] fix(tool): fix tool UseStdLib remains unexcepted lib issue + +### Refactor +[[#1658](https://github.com/cloudwego/kitex/pull/1658)] refactor: streamx api to adapt both grpc and ttheader streaming protocol and provide more user-friendly interface + +[[#1729](https://github.com/cloudwego/kitex/pull/1729)] refactor(tool): move pb tpl code to sep pkg + +### Chore +[[#1743](https://github.com/cloudwego/kitex/pull/1743)] chore: update dependencies version + +[[#1740](https://github.com/cloudwego/kitex/pull/1740)] chore(generic): deprecate NewThriftContentProvider + +[[#1741](https://github.com/cloudwego/kitex/pull/1741)] chore(streamx): remove redundant streamx package + +[[#1738](https://github.com/cloudwego/kitex/pull/1738)] ci: fix typos & crate-ci/typos + +[[#1737](https://github.com/cloudwego/kitex/pull/1737)] chore: update dependency and change go support to 1.19-1.24 + +[[#1720](https://github.com/cloudwego/kitex/pull/1720)] Revert "fix(ttstream): pingpong method refers to server interface defined in Kitex generation code when streamx is enabled and there are other streaming methods" diff --git a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Basic+Programming.md b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Basic+Programming.md index 4164a6707bf..a6dd76f5ec7 100644 --- a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Basic+Programming.md +++ b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Basic+Programming.md @@ -14,10 +14,10 @@ Current support: - Transport protocol: TTHeader - IDL Definition Language and Serialization Protocol: Thrift -- **gRPC Streaming** : ~~~~(planned implementation) +- **gRPC Streaming** - - ~~Transport protocol: gRPC~~ - - ~~IDL Definition Language and Serialization Protocol: Protobuf Encoding~~ + - Transport protocol: gRPC + - IDL Definition Language and Serialization Protocol: Thrift / Protobuf The protocol selected here only affects code generated from IDL. Regardless of the protocol, the following usage is consistent. @@ -44,38 +44,42 @@ service TestService { ``` #### Generate code +Please make sure that Kitex Tool has been upgraded to v0.13.0+: +``` +go install github.com/cloudwego/kitex/tool/cmd/kitex@latest +``` -To maintain compatibility with legacy stream-generated code, Command Line needs to add `the -streamx ` flag. +To maintain compatibility with legacy stream-generated code, Command Line needs to add the `-streamx` flag. ``` -kitex -streamx -module -service P.S.M echo.thrift +kitex -streamx -module -service service echo.thrift ``` -##### Initialization +### Initialization #### Create Client ```go import ".../kitex_gen/echo/testservice" -import "github.com/cloudwego/kitex/client/streamxclient" +import "github.com/cloudwego/kitex/client" cli, err := testservice.NewClient( "a.b.c", - streamxclient.WithStreamRecvMiddleware(...), - streamxclient.WithStreamSendMiddleware(...), + client.WithStreamRecvMiddleware(...), + client.WithStreamSendMiddleware(...), ) ``` #### Create Server ```go -import ".../kitex_gen/echo/streamserver" -import "github.com/cloudwego/kitex/server/streamxserver" +import ".../kitex_gen/echo/testservice" +import "github.com/cloudwego/kitex/server" -svr := streamserver.NewServer( +svr := testservice.NewServer( new(serviceImpl), - streamxserver.WithStreamRecvMiddleware(...), - streamxserver.WithStreamSendMiddleware(...), + server.WithStreamRecvMiddleware(...), + server.WithStreamSendMiddleware(...), ) ``` @@ -101,14 +105,14 @@ client.CloseAndRecv(res) === EOF ==> server.Recv(EOF) #### Client Usage -- [**Must**] : The client must call the CloseAndRecv () or (CloseSend + Recv) method to inform the server that there is no new data to send. +- [**Must**] : The client must call the CloseAndRecv() or (CloseSend + Recv) method to inform the server that there is no new data to send. ```go -ctx, cs, err := cli.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) for i := 0; i < 3; i++ { - err = cs.Send(ctx, req) + err = stream.Send(stream.Context(), req) } -res, err = cs.CloseAndRecv(ctx) +res, err = stream.CloseAndRecv(stream.Context()) ``` #### Server usage @@ -117,14 +121,14 @@ res, err = cs.CloseAndRecv(ctx) ```go -func (si *serviceImpl) ClientStream( - ctx context.Context, stream streamx.ClientStreamingServer[Request, Response] -) (res *Response, err error) { +func (si *serviceImpl) EchoClient( + ctx context.Context, stream echo.TestService_EchoClientServer +) (err error) { for { req, err := stream.Recv(ctx) if err == io.EOF { res := new(Response) - return res, nil + return stream.SendAndClose(ctx, res) } if err != nil { return nil, err @@ -153,12 +157,12 @@ client.Recv(EOF) <== EOF === server handler return #### Client Usage -- [**Must**] : The client must check the io. EOF error and end the loop +- [**Must**] : The client must check the io.EOF error and end the loop ```go -ctx, ss, err := cli.ServerStream(ctx, req) +stream, err := cli.EchoServer(ctx, req) for { - res, err := ss.Recv(ctx) + res, err := stream.Recv(stream.Context()) if errors.Is(err, io.EOF) { break } @@ -168,7 +172,7 @@ for { #### Server usage ```go -func (si *serviceImpl) ServerStream(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *serviceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) error { for i := 0; i < 3; i++ { err := stream.Send(ctx, resp) if err != nil { @@ -204,23 +208,23 @@ client.Recv(EOF) <== EOF === server handler return #### Client Usage - [**Must**] : client must call CloseSend after sending -- [**Must**] : client must judge io. EOF and end the loop when Recv +- [**Must**] : client must judge io.EOF and end the loop when Recv ```go -ctx, bs, err := cli.BidiStream(ctx) +stream, err := cli.EchoBidi(ctx) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for i := 0; i < round; i++ { - err := bs.Send(ctx, req) + err := stream.Send(stream.Context(), req) } - err = bs.CloseSend(ctx) + err = stream.CloseSend(stream.Context()) }() go func() { defer wg.Done() for { - res, err := bs.Recv(ctx) + res, err := stream.Recv(stream.Context()) if errors.Is(err, io.EOF) { break } @@ -234,7 +238,7 @@ wg.Wait() - [**Must**] : The server must determine io. EOF and end the loop when Recv ```go -func (si *serviceImpl) BidiStream(ctx context.Context, stream streamx.BidiStreamingServer[Request, Response]) error { +func (si *serviceImpl) EchoBidi(ctx context.Context, stream echo.TestService_EchoBidiServer) error { for { req, err := stream.Recv(ctx) if err == io.EOF { diff --git a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Error+Handling.md b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Error+Handling.md index 0479d64b808..0b225f43926 100644 --- a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Error+Handling.md +++ b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Error+Handling.md @@ -19,14 +19,14 @@ Unlike RPC, stream errors can occur at any time during stream processing. For e Server implementation: ```go -func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error { // 检查用户账户余额 for isHasBalance (req.UserId) { stream.Send(ctx, res) } // 返回用户余额不足错误 bizErr := kerrors.NewBizStatusErrorWithExtra( - 10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, + 10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, ) return bizErr } @@ -35,11 +35,11 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques Client implementation: ```go -svrStream, err = streamClient.ServerStreamWithErr(ctx, req) +stream, err = cli.ServerStreamWithErr(ctx, req) var err error for { - res, err = stream.Recv(ctx) + res, err = stream.Recv(stream.Context()) if err != nil { break } @@ -52,12 +52,12 @@ if ok { ### Other errors -If the Error returned by the Server is a non-business exception, the framework will be uniformly encapsulated as `(* thrift.ApplicationException) `. At this time, only the error Message can be obtained. +If the Error returned by the Server is a non-business exception, the framework will be uniformly encapsulated as `(*thrift.ApplicationException)`. At this time, only the error Message can be obtained. Server implementation: ```go -func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error { // ... return errors.New("test error") } @@ -66,16 +66,16 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques Client implementation: ```go -svrStream, err = streamClient.ServerStreamWithErr(ctx, req) -test.Assert(t, err == nil, err) +stream, err = cli.ServerStreamWithErr(ctx, req) var err error for { - res, err = stream.Recv(ctx) + res, err = stream.Recv(stream.Context()) if err != nil { break } } + ex, ok := err.(*thrift.ApplicationException) if ok { println(ex.TypeID(), ex.Msg()) diff --git a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Metainfo.md b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Metainfo.md index b348901d253..d5216a0e6f1 100644 --- a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Metainfo.md +++ b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Metainfo.md @@ -8,7 +8,7 @@ description: "" ## Preface -The overall usage of message pass-through is similar to [Kitex - 元信息透传](https://bytedance.larkoffice.com/wiki/Y3ChwldJzihF4Vkb6Ekcie38no4), except that each stream can only **pass-through meta-information when created** , and sending messages cannot pass-through. +The overall usage of message pass-through is similar to [Metainfo](/docs/kitex/tutorials/advanced-feature/metainfo/), except that each stream can only **pass-through meta-information when created** , and sending messages cannot pass-through. ## User guide @@ -18,14 +18,15 @@ The overall usage of message pass-through is similar to [Kitex - 元信息透传 ctx = metainfo.WithPersistentValue(ctx, "k1", "v1") ctx = metainfo.WithValue(ctx, "k2", "v2") -s, err := streamClient.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) ``` +> If using gRPC streaming, make sure the key is uppercase and use _ instead of - or it will be discarded. TTHeader streaming does not make this requirement. ### Server receives meta message ```go -func (s *streamingService) ClientStream(ctx context.Context, - stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { +func (s *streamingService) EchoClient(ctx context.Context, + stream echo.TestService_EchoClientServer) (err error) { v, ok := metainfo.GetPersistentValue(ctx, "k1") // v == "v1" @@ -36,20 +37,24 @@ func (s *streamingService) ClientStream(ctx context.Context, ### Server reverse pass-through meta-message -Reverse pass-through introduces a new concept, Header and Trailer. Any complete data stream must include Header and Trailer. Use these two frames to reverse pass-through information. +Reverse pass-through introduces a new concept, Header and Trailer. Any complete data stream must include Header and Trailer. + +Use these two frames to reverse pass-through information. ```go -func (s *streamingService) ClientStream(ctx context.Context, - stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { +import "github.com/cloudwego/pkg/streaming" + +func (s *streamingService) EchoClient(ctx context.Context, + stream echo.TestService_EchoClientServer) (err error) { - // SetTrailer set 的 trailer 会在 server handler 结束后发送 - err := stream.SetTrailer(streamx.Trailer{"t1": "v1"}) - // 立刻发送 Header - err = stream.SendHeader(streamx.Header{"h1": "v1"}) + // the trailer set by SetTrailer would be sent after server handler finishing + err := stream.SetTrailer(streaming.Trailer{"t1": "v1"}) + // send Header directly + err = stream.SendHeader(streaming.Header{"h1": "v1"}) if err != nil { return err } - // 发送正常数据 + // send normal data err = stream.Send(req) } ``` @@ -57,12 +62,12 @@ func (s *streamingService) ClientStream(ctx context.Context, ### Client receives a reverse pass-through meta message ```go -s, err := streamClient.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) -// Header/Trailer 函数会一直阻塞到对端发送了 Header/Trailer 为止,或中间发生了错误 -hd, err := s.Header() +// Header/Trailer calling would block constantly until the remote side has sent Header/Trailer, or there was an error in the intermediate process +hd, err := stream.Header() // hd["h1"] == "v1" -tl, err := s.Trailer() +tl, err := stream.Trailer() // tl["t1"] == "v1" ``` @@ -72,6 +77,6 @@ tl, err := s.Trailer() Because the concept of flow is different from , under the flow, the Header can be sent independently, which means that my server can send the Header in the first second, send the Data after 10 seconds, and send the Trailer after 1 second. -At the same time, the client can also choose whether to call `.Header () ` or `.Trailer () ` to block. +At the same time, the client can also choose whether to call `.Header()` or `.Trailer ()` to block. -Therefore, the semantics of traditional CTX cannot meet the reverse pass-through function of streams. Forward pass-through is still consistent with the original . +Therefore, the semantics of traditional ctx cannot meet the reverse pass-through function of streams. Forward pass-through is still consistent with the original. diff --git a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Middleware.md b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Middleware.md index ff4f3b64ca2..a159a5c7276 100644 --- a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Middleware.md +++ b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Middleware.md @@ -8,6 +8,27 @@ description: "" ## Middleware Type +### Stream Middleware + +**Trigger timing**: Called when stream is created + +#### Type definition +```go +// client: github.com/cloudwego/kitex/pkg/endpoint/cep +type StreamEndpoint func(ctx context.Context) (stream streaming.ClientStream, err error) +type StreamMiddleware func(next StreamEndpoint) StreamEndpoint + +// server: github.com/cloudwego/kitex/pkg/endpoint/sep +type StreamEndpoint func(ctx context.Context, stream streaming.ServerStream) (err error) +type StreamMiddleware func(next StreamEndpoint) StreamEndpoint +``` + +**Parameter description**: + +- ```stream``` is the stream object created for a single RPC +- After the ```next``` function is executed in the client middleware, the stream is created +- After the ```next``` function is executed in the Server middleware, the server handler completes processing. + ### Stream Recv/Send Middleware **Trigger timing** : Called when streaming messages @@ -15,79 +36,105 @@ description: "" #### Type definition ```go -type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error) -type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error) +// client: github.com/cloudwego/kitex/pkg/endpoint/cep +type StreamRecvEndpoint func(ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) +type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint +// server: github.com/cloudwego/kitex/pkg/endpoint/sep +type StreamRecvEndpoint func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint -type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint + +// similar to send middleware ... ``` -**Parameter description** : +**Parameter description**: - Directly obtain the current stream object - Res/req both represent real requests and responses. - Behavior before and after calling the Next function: -| Middleware type | Before calling Next | After calling Next | -| -------------------- | ---------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | -| StreamRecvMiddleware | - The data is not really collected, just called the stream. Recv () function.

- Res parameter is empty | - Data received or encountered an error

- The res parameter has a real value | +| Middleware type | Before calling Next | After calling Next | +| -------------------- | --------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | +| StreamRecvMiddleware | - The data is not really collected, just called the stream.Recv () function.

- Res parameter is empty | - Data received or encountered an error

- The res parameter has a real value | | StreamSendMiddleware | - The data was not actually sent, just called the stream.Send () function

- The req parameter is a real request | - Data transmission completed or encountered an error

- The req parameter is a real request | -#### Examples +### Unary Middlware -**Usage scenario** : Inject relevant business logic when the stream receives/sends messages. +For all non-streaming interfaces, we additionally provide ```UnaryMiddleware``` for injecting middleware that only works on all unary methods. +It is identical to the ```WithMiddleware``` natively supported by kitex, with the difference that the latter can also work on streaming methods. ```go -svr, err := xxx.NewServer( - //... - streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - // ctx has user's token - token, ok := metainfo.GetPersistentValue(ctx, "user_token") - // check token balance - if !hasBalance(token) { - return fmt.Errorf("user dont have enough balance: token=%s", token) - } - return next(ctx, stream, res) - } - }), -) +type UnaryEndpoint Endpoint +type UnaryMiddleware func(next UnaryEndpoint) UnaryEndpoint + +// client.WithUnaryOptions(client.WithUnaryMiddleware(mw)) +// server.WithUnaryOptions(server.WithUnaryMiddleware(mw)) ``` -## Inject Middleware +## Inject Middlewares -#### Inject Client Middleware +### Inject client-side middlewares ```go +import "github.com/cloudwego/client" + cli, err := xxx.NewClient( "a.b.c", - streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - return next(ctx, stream, res) - } - }), - streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { - return func(ctx context.Context, stream streamx.Stream, req any) (err error) { - return next(ctx, stream, req) - } - }), + client.WithStreamOptions( + client.WithStreamMiddleware(func (next cep.StreamEndpoint) cep.StreamEndpoint { + return func (ctx context.Context) (stream streaming.ClientStream, err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("create stream, method: ", ri.Invocation().MethodName()) + return next(ctx) + } + }), + client.WithStreamSendMiddleware(func (next cep.StreamSendEndpoint) cep.StreamSendEndpoint { + return func (ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("stream send message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + client.WithStreamRecvMiddleware(func (next cep.StreamRecvEndpoint) cep.StreamRecvEndpoint { + return func (ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("stream recv message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + ), ) ``` -#### Inject Server Middleware +### Inject server-side middlewares ```go -server, err := xxx.NewServer( - // .... - streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - return next(ctx, stream, res) - } - }), - streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { - return func(ctx context.Context, stream streamx.Stream, req any) (err error) { - return next(ctx, stream, req) - } - }), +import "github.com/cloudwego/server" + +svr, err := xxx.NewServer( + //... + server.WithStreamOptions( + server.WithStreamMiddleware(func(next sep.StreamEndpoint) sep.StreamEndpoint { + return func(ctx context.Context, st streaming.ServerStream) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("create stream, method: ", ri.Invocation().MethodName()) + return next(ctx, st) + } + }), + server.WithStreamRecvMiddleware(func(next sep.StreamRecvEndpoint) sep.StreamRecvEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("stream recv message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + server.WithStreamSendMiddleware(func(next sep.StreamSendEndpoint) sep.StreamSendEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("stream send message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + ), ) ``` diff --git a/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Timeout+Control.md b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Timeout+Control.md new file mode 100644 index 00000000000..2713ed5d83c --- /dev/null +++ b/content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Timeout+Control.md @@ -0,0 +1,33 @@ +--- +title: "StreamX Timeout Control" +date: 2025-01-10 +weight: 5 +keywords: ["Stream Timeout Control Best Practices"] +description: "" +--- + +## Timeout Control Mechanism Supported by TTHeader Streaming +### Recv Timeout +Control timeout for every ```Recv```。 +#### Client Level +It will take effect for every Interface of this Client. + +``` +import ( + "github.com/cloudwego/kitex/client" +) + +cli, err := NewClient("service", client.WithStreamOptions(client.WithStreamRecvTimeout(timeout))) +``` +#### Interface Level(Kitex >= v0.13.0) +It will take effect for single interface. + +``` +import ( + "github.com/cloudwego/kitex/client/callopt/streamcall" +) + +cli, err := NewClient("service") +// Calling one of the downstream streaming interfaces +stream, err := cli.NewStream(ctx, streamcall.WithRecvTimeout(timeout)) +``` diff --git a/content/en/docs/kitex/Tutorials/code-gen/prutal.md b/content/en/docs/kitex/Tutorials/code-gen/prutal.md new file mode 100644 index 00000000000..220172fb1d7 --- /dev/null +++ b/content/en/docs/kitex/Tutorials/code-gen/prutal.md @@ -0,0 +1,86 @@ +--- +title: "Prutal" +date: 2025-04-09 +weight: 11 +keywords: ["Prutal"] +description: "Protobuf's non-generated code serialisation library" +--- + +## **Abstraction** + +Starting from v0.13.0, Kitex will use its own Prutal implementation to replace Protobuf's protoc and protoc-gen-go for code generation and serialisation. + +Kitex will no longer need to install protoc, protoc-gen-go. New users will not be aware of the change. + +Prutal is compatible with the existing Protobuf in terms of serialisation, and generates code that is very compact, has no extra code, and has better serialisation performance than the official one. + +Technically Prutal and Frugal are similar to the use of struct tags and efficient reflection to achieve serialisation, and does not rely on redundant generation code. + +Currently open source, the specific implementation and subsequent benchmark data can be viewed: https://github.com/cloudwego/prutal + +## **Using Advices** + +Initially, it is not recommended that users use Prutal directly, and currently only promises forward and backward compatibility with Prutal integration in Kitex. + +If you find any problems while using Kitex, you can fallback to Protobuf using the environment variable KITEX_TOOL_USE_PROTOC + +``` +KITEX_TOOL_USE_PROTOC=1 kitex --version +``` + +In the future, Kitex will deprecate the protoc implementation. + +It is recommended that if you encounter problems with usage, you can provide feedback to help improve it. + +## **Kitex Tool Updates** + +Since protoc is no longer used as the default code generation tool, the following arguments will be deprecated: + + --protobuf + + --protobuf-plugin + +These two arguments are mainly used for passthrough to protoc, and are not actually used by Kitex itself. If they are still specified, an error will be reported: + +``` +[ERROR] invalid value ‘xxx’ for flag -protobuf: flag is deprecated +[ERROR] invalid value ‘xxx’ for flag -protobuf-plugin: flag is deprecated +``` + +Users can call protoc and related plugins for code generation if they want. Instead of relying on Kitex to call protoc. + +Due to the complexity of generating paths in older protoc implementations, the following arguments do not work in older implementations: + + -gen-path + +which defaults to kitex_gen, has been fixed in the new Prutal. + +## **Prutal and Protobuf compatibility issues** + +Kitex uses Prutal to generate code by default. + +If the user does Marshal / Unmarshal directly from the protobuf library, the new Kitex will generate code with a compile-time error: + +``` +cannot use &YourRequest{} (value of type *YourRequest) as protoreflect.ProtoMessage value in argument to proto.Marshal: *YourRequest does not implement protoreflect.ProtoMessage (missing method ProtoReflect) +``` + +This is because the Protobuf library is strongly bound to the Protobuf generation code, and Protobuf needs to generate a lot of binary data to assist its reflection implementation. + +It is recommended to use the github.com/cloudwego/prutal package directly. + +``` +// MarshalAppend appends the protobuf encoding of v to b and returns the new bytes +func MarshalAppend(b []byte, v interface{}) ([]byte, error) + +// Unmarshal parses the protobuf-encoded data and stores the result in the value pointed to by v. +func Unmarshal(b []byte, v interface{}) error +``` + +Prutal is compatible with Protobuf generated code. So even if the original code was generated by the official Protobuf, you can use Prutal to serialise it for better performance. + +Since Protobuf is complex and has a lot of features, Prutal can't guarantee 100% compatibility, so if you find any problems, please feel free to pull an issue. + +## **Feedback to us** + +If you have any questions, you can submit an [issue](https://github.com/cloudwego/prutal/issues) to Prutal for feedback. diff --git a/content/zh/blog/releases/Kitex/release-v0_13_0.md b/content/zh/blog/releases/Kitex/release-v0_13_0.md new file mode 100644 index 00000000000..d587b0c86a9 --- /dev/null +++ b/content/zh/blog/releases/Kitex/release-v0_13_0.md @@ -0,0 +1,111 @@ +--- +title: "Kitex Release v0.13.0" +linkTitle: "Release v0.13.0" +projects: ["Kitex"] +date: 2025-04-09 +description: > +--- + +> 建议直接升级 Kitex 版本到 v0.13.1,因为我们对 v0.13.0 里 gRPC Client 潜在的 Goroutine 泄漏问题进行了修复 + +## **重要变更介绍** + +### **新特性** +1. **新流式接口 StreamX 支持 gRPC,存量 Kitex gRPC 用户可迁移** + + v0.12.0 发布了 StreamX 接口优化流式体验,支持了自定义流式协议 TTHeader Streaming,但未支持 gRPC,导致存量用户无法迁移。 + + v0.13.0 对 StreamX 支持 gRPC 后,用户可迁移至 StreamX 新接口,Server 端可以同时兼容两个流式协议,无需担心接口迁移后的协议兼容性问题。 + + 特别地,StreamX 在适配 gRPC 时,发现依然有一些不便利的问题,为带来更好的接口使用体验,因此对 StreamX 接口做了二次调整。 + + 已经使用 v0.12.* 的 StreamX 用户会带来影响,在这里表示抱歉。 + + 详见 [StreamX 用户文档](/zh/docs/kitex/tutorials/basic-feature/streamx)。 + +2. **Prutal - Protobuf 的无生成代码序列化库** + + [Prutal](https://github.com/cloudwego/prutal) 正式开源,对标 Thrift 的 [Frugal](https://github.com/cloudwego/frugal) 库,新版本 Kitex 默认集成 Prutal。特点: + + - 产物体积最小化,只需生成结构体 + + - 使用与 Frugal 相似的反射优化,性能优于官方 Protobuf + + - 兼容官方 Protobuf 及衍生版本的生成代码 + + 详细信息参考 [Prutal](/zh/docs/kitex/tutorials/code-gen/prutal)。 + +### **功能/体验优化** +1. **TTHeader Streaming 支持配置接口级别 Recv 超时** + + 本版本 TTHeader Streaming 在原有的 Kitex Client 级别基础上,额外支持接口级别的 Recv 超时配置,配置更为灵活。 + + 详见 [StreamX 流超时控制](/zh/docs/kitex/tutorials/basic-feature/streamx/stream+timeout+control/)。 + +2. **Thrift 默认传输协议由 Buffered 改为 Framed** + + 这个变更可以利用 FastCodec 以获得更高的编解码性能。 + +### **其他** +1. 产物简化 + + - 默认不生成 Set 数据结构的重复校验代码与各结构体的 DeepEqual 函数 + + - 若只想恢复 DeepEqual,生成命令追加```-thrift gen_deep_equal=true``` + + - 若想恢复 Set 的重复校验,生成命令追加```-thrift validate_set=true, -thrift gen_deep_equal=true``` + + - 默认不生成 Apache Codec 相关代码 + + - 若想恢复这部分代码,生成命令追加```-thrift no_default_serdes=false``` +2. Go 支持版本变化 + + 支持版本 Go 1.19~1.24,最低支持版本变为 Go 1.19,如果 Go 版本过低,编译时会有提示:```note: module requires Go 1.19``` + +## **详细变更** +### Feature +[[#1719](https://github.com/cloudwego/kitex/pull/1719)] feat: prutal for replacing protoc + +[[#1736](https://github.com/cloudwego/kitex/pull/1736)] feat(ttstream): support WithRecvTimeout stream call option + +[[#1702](https://github.com/cloudwego/kitex/pull/1702)] feat(gRPC): add grpc client conn dump to help debug the conn and stream status + +[[#1723](https://github.com/cloudwego/kitex/pull/1723)] feat(codec/thrift): use fastcodec/frugal if apache codec not available + +[[#1724](https://github.com/cloudwego/kitex/pull/1724)] feat: add tail option to support for delayed initialization of some client options + +### Optimize +[[#1728](https://github.com/cloudwego/kitex/pull/1728)] optimize(apache): remove apache codec gen and set default protocol from buffered to framed + +[[#1732](https://github.com/cloudwego/kitex/pull/1732)] optimize(rpcinfo): purify the transport protocol of rpcinfo in a single rpc request + +[[#1711](https://github.com/cloudwego/kitex/pull/1711)] optimize(tool): disable set validate and deep equal code gen to simplify kitex_gen + +[[#1717](https://github.com/cloudwego/kitex/pull/1717)] optimize(gRPC): return more detailed error when received invalid http2 frame + +### Fix +[[#1734](https://github.com/cloudwego/kitex/pull/1734)] fix(ttstream): adjust stream state transition and remove all SetFinalizer to avoid memory leak + +[[#1735](https://github.com/cloudwego/kitex/pull/1735)] fix(generic): support both relative and absolute check for idl includes parse to make it compatible with generation tool + +[[#1725](https://github.com/cloudwego/kitex/pull/1725)] fix: code gen import issue for streamx mode, stream call judgement bug and set ttheader streaming as default + +[[#1727](https://github.com/cloudwego/kitex/pull/1727)] fix(tool): fix tool UseStdLib remains unexcepted lib issue. + +### Refactor +[[#1658](https://github.com/cloudwego/kitex/pull/1658)] refactor: streamx api to adapt both grpc and ttheader streaming protocol and provide more user-friendly interface + +[[#1729](https://github.com/cloudwego/kitex/pull/1729)] refactor(tool): move pb tpl code to sep pkg + +### Chore +[[#1743](https://github.com/cloudwego/kitex/pull/1743)] chore: update dependencies version + +[[#1740](https://github.com/cloudwego/kitex/pull/1740)] chore(generic): deprecate NewThriftContentProvider + +[[#1741](https://github.com/cloudwego/kitex/pull/1741)] chore(streamx): remove redundant streamx package + +[[#1738](https://github.com/cloudwego/kitex/pull/1738)] ci: fix typos & crate-ci/typos + +[[#1737](https://github.com/cloudwego/kitex/pull/1737)] chore: update dependency and change go support to 1.19-1.24 + +[[#1720](https://github.com/cloudwego/kitex/pull/1720)] Revert "fix(ttstream): pingpong method refers to server interface defined in Kitex generation code when streamx is enabled and there are other streaming methods" diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Basic+Programming.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Basic+Programming.md index 5132a26682f..11c1b3d0c3a 100644 --- a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Basic+Programming.md +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Basic+Programming.md @@ -14,10 +14,10 @@ description: "" - 传输协议:TTHeader - IDL 定义语言 与 序列化协议:Thrift -- **gRPC Streaming**:~~~~ (计划实现) +- **gRPC Streaming** - - ~~传输协议:gRPC~~ - - ~~IDL 定义语言 与 序列化协议:Protobuf 编码~~ + - 传输协议:gRPC + - IDL 定义语言 与 序列化协议:Thrift / Protobuf 此处选定的协议只影响从 IDL 生成代码,无论哪种协议,以下用法均一致。 @@ -44,11 +44,15 @@ service TestService { ``` #### 生成代码 +请确保 Kitex Tool 已经升级到 v0.13.0+: +``` +go install github.com/cloudwego/kitex/tool/cmd/kitex@latest +``` 为保持与旧流式生成代码的兼容,命令行需加上 `-streamx` flag。 ``` -kitex -streamx -module -service P.S.M echo.thrift +kitex -streamx -module -service service echo.thrift ``` ##### 初始化 @@ -58,22 +62,22 @@ kitex -streamx -module -service P.S.M echo.thrift ```go // 生成代码目录,streamserver 为 IDL 定义的 service name import ".../kitex_gen/echo/testservice" -import "github.com/cloudwego/kitex/client/streamxclient" +import "github.com/cloudwego/kitex/client" cli, err := testservice.NewClient( "a.b.c", - streamxclient.WithStreamRecvMiddleware(...), - streamxclient.WithStreamSendMiddleware(...), + client.WithStreamRecvMiddleware(...), + client.WithStreamSendMiddleware(...), ) ``` #### 创建 Server ```go -import ".../kitex_gen/echo/streamserver" -import "github.com/cloudwego/kitex/server/streamxserver" +import ".../kitex_gen/echo/testservice" +import "github.com/cloudwego/kitex/server" -svr := streamserver.NewServer( +svr := testservice.NewServer( new(serviceImpl), streamxserver.WithStreamRecvMiddleware(...), streamxserver.WithStreamSendMiddleware(...), @@ -105,11 +109,11 @@ client.CloseAndRecv(res) === EOF ==> server.Recv(EOF) - [**必须**]: client 必须调用 CloseAndRecv() 或者 (CloseSend + Recv)方法,告知 Server 不再有新数据发送。 ```go -ctx, cs, err := cli.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) for i := 0; i < 3; i++ { - err = cs.Send(ctx, req) + err = cs.Send(stream.Context(), req) } -res, err = cs.CloseAndRecv(ctx) +res, err = cs.CloseAndRecv(stream.Context()) ``` #### Server 用法 @@ -118,14 +122,14 @@ res, err = cs.CloseAndRecv(ctx) ```go -func (si *serviceImpl) ClientStream( - ctx context.Context, stream streamx.ClientStreamingServer[Request, Response] -) (res *Response, err error) { +func (si *serviceImpl) EchoClient( + ctx context.Context, stream echo.TestService_EchoClientServer +) (err error) { for { req, err := stream.Recv(ctx) if err == io.EOF { res := new(Response) - return res, nil + return stream.SendAndClose(ctx, res) } if err != nil { return nil, err @@ -157,9 +161,9 @@ client.Recv(EOF) <== EOF === server handler return - [**必须**]: client 必须判断 io.EOF 错误,并结束循环 ```go -ctx, ss, err := cli.ServerStream(ctx, req) +stream, err := cli.EchoServer(ctx, req) for { - res, err := ss.Recv(ctx) + res, err := stream.Recv(stream.Context()) if errors.Is(err, io.EOF) { break } @@ -169,7 +173,7 @@ for { #### Server 用法 ```go -func (si *serviceImpl) ServerStream(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *serviceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) error { for i := 0; i < 3; i++ { err := stream.Send(ctx, resp) if err != nil { @@ -208,20 +212,20 @@ client.Recv(EOF) <== EOF === server handler return - [**必须**]: client 必须在 Recv 时,判断 io.EOF 并结束循环 ```go -ctx, bs, err := cli.BidiStream(ctx) +stream, err := cli.EchoBidi(ctx) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for i := 0; i < round; i++ { - err := bs.Send(ctx, req) + err := stream.Send(stream.Context(), req) } - err = bs.CloseSend(ctx) + err = bs.CloseSend(stream.Context()) }() go func() { defer wg.Done() for { - res, err := bs.Recv(ctx) + res, err := stream.Recv(stream.Context()) if errors.Is(err, io.EOF) { break } @@ -235,7 +239,7 @@ wg.Wait() - [**必须**]: server 必须在 Recv 时,判断 io.EOF 并结束循环 ```go -func (si *serviceImpl) BidiStream(ctx context.Context, stream streamx.BidiStreamingServer[Request, Response]) error { +func (si *serviceImpl) EchoBidi(ctx context.Context, stream echo.TestService_EchoBidiServer) error { for { req, err := stream.Recv(ctx) if err == io.EOF { diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Error+Handling.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Error+Handling.md index 2053759c4e6..53198e2e5f8 100644 --- a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Error+Handling.md +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Error+Handling.md @@ -19,14 +19,14 @@ description: "" Server 实现: ```go -func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error { // 检查用户账户余额 for isHasBalance (req.UserId) { stream.Send(ctx, res) } // 返回用户余额不足错误 bizErr := kerrors.NewBizStatusErrorWithExtra( - 10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, + 10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, ) return bizErr } @@ -35,11 +35,11 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques Client 实现: ```go -svrStream, err = streamClient.ServerStreamWithErr(ctx, req) +stream, err = cli.ServerStreamWithErr(ctx, req) var err error for { - res, err = stream.Recv(ctx) + res, err = stream.Recv(stream.Context()) if err != nil { break } @@ -57,7 +57,7 @@ if ok { Server 实现: ```go -func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error { // ... return errors.New("test error") } @@ -66,12 +66,11 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques Client 实现: ```go -svrStream, err = streamClient.ServerStreamWithErr(ctx, req) -test.Assert(t, err == nil, err) +stream, err = cli.ServerStreamWithErr(ctx, req) var err error for { - res, err = stream.Recv(ctx) + res, err = stream.Recv(stream.Context()) if err != nil { break } diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+FAQ.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+FAQ.md index 3b6e76a1111..1471c60dd2a 100644 --- a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+FAQ.md +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+FAQ.md @@ -1,7 +1,7 @@ --- title: "StreamX 流编程常见问题 QA" date: 2025-01-13 -weight: 5 +weight: 6 keywords: ["流编程常见问题 QA"] description: "" --- diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Metainfo.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Metainfo.md index f9f3ca12533..7f32e924aad 100644 --- a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Metainfo.md +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Metainfo.md @@ -8,7 +8,7 @@ description: "" ## 前言 -消息透传总体用法与 [Kitex - 元信息透传](https://bytedance.larkoffice.com/wiki/Y3ChwldJzihF4Vkb6Ekcie38no4)相似,区别在于每一个 stream 只有在**创建时才能透传元信息**,发送消息无法透传。 +消息透传总体用法与 [Metainfo](/zh/docs/kitex/tutorials/advanced-feature/metainfo/) 相似,区别在于每一个 stream 只有在**创建时才能透传元信息**,发送消息无法透传。 ## 使用指南 @@ -18,14 +18,16 @@ description: "" ctx = metainfo.WithPersistentValue(ctx, "k1", "v1") ctx = metainfo.WithValue(ctx, "k2", "v2") -s, err := streamClient.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) ``` +> 如果使用 gRPC streaming,请确保 key 是大写,使用 _ 而不是 -,否则将被丢弃。TTHeader streaming 不做此要求。 + ### Server 接收元消息 ```go -func (s *streamingService) ClientStream(ctx context.Context, - stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { +func (s *streamingService) EchoClient(ctx context.Context, + stream echo.TestService_EchoClientServer) (err error) { v, ok := metainfo.GetPersistentValue(ctx, "k1") // v == "v1" @@ -39,13 +41,15 @@ func (s *streamingService) ClientStream(ctx context.Context, 反向透传引入了一个新的概念,Header 和 Trailer ,任何完整数据流必定包含 Header(头包) 和 Trailer(尾包),使用这两个帧来反向透传信息。 ```go -func (s *streamingService) ClientStream(ctx context.Context, - stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { +import "github.com/cloudwego/pkg/streaming" + +func (s *streamingService) EchoClient(ctx context.Context, + stream echo.TestService_EchoClientServer) (err error) { // SetTrailer set 的 trailer 会在 server handler 结束后发送 - err := stream.SetTrailer(streamx.Trailer{"t1": "v1"}) + err := stream.SetTrailer(streaming.Trailer{"t1": "v1"}) // 立刻发送 Header - err = stream.SendHeader(streamx.Header{"h1": "v1"}) + err = stream.SendHeader(streaming.Header{"h1": "v1"}) if err != nil { return err } @@ -57,12 +61,12 @@ func (s *streamingService) ClientStream(ctx context.Context, ### Client 接收反向透传的元消息 ```go -s, err := streamClient.ClientStream(ctx) +stream, err := cli.EchoClient(ctx) // Header/Trailer 函数会一直阻塞到对端发送了 Header/Trailer 为止,或中间发生了错误 -hd, err := s.Header() +hd, err := stream.Header() // hd["h1"] == "v1" -tl, err := s.Trailer() +tl, err := stream.Trailer() // tl["t1"] == "v1" ``` diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Middleware.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Middleware.md index 90ffc915f19..fe15faac25d 100644 --- a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Middleware.md +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Middleware.md @@ -8,18 +8,43 @@ description: "" ## 中间件类型 -### Stream Recv/Send Middleware +### Stream 中间件 + +**触发时机**:每次创建流时 + +#### 类型定义 +```go +// client: github.com/cloudwego/kitex/pkg/endpoint/cep +type StreamEndpoint func(ctx context.Context) (stream streaming.ClientStream, err error) +type StreamMiddleware func(next StreamEndpoint) StreamEndpoint + +// server: github.com/cloudwego/kitex/pkg/endpoint/sep +type StreamEndpoint func(ctx context.Context, stream streaming.ServerStream) (err error) +type StreamMiddleware func(next StreamEndpoint) StreamEndpoint +``` + +**参数说明**: + +- ```stream``` 为单次 RPC 创建的流对象 +- Client middleware 内 ```next``` 函数执行后,stream 即完成创建 +- Server middleware 内 ```next``` 函数执行后,server handler 即完成处理 + +### Stream Recv/Send 中间件 **触发时机**:流收发消息时调用 #### 类型定义 ```go -type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error) -type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error) +// client: github.com/cloudwego/kitex/pkg/endpoint/cep +type StreamRecvEndpoint func(ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) +type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint +// server: github.com/cloudwego/kitex/pkg/endpoint/sep +type StreamRecvEndpoint func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint -type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint + +// Send middleware 和 Recv middlware 定义类似... ``` **参数说明**: @@ -33,61 +58,81 @@ type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint | StreamRecvMiddleware | - 数据未真正收,刚调用 stream.Recv() 函数

- res 参数为空 | - 数据已收到或遇到错误

- res 参数有真实值 | | StreamSendMiddleware | - 数据未真正发送,刚调用 stream.Send() 函数

- req 参数为真实请求 | - 数据发送完成或遇到错误

- req 参数为真实请求 | -#### 使用范例 - -**使用场景**:流收/发消息时,注入相关业务逻辑。 +### Unary 中间件 +对所有非流式接口,我们额外提供了 ```UnaryMiddleware``` 用于注入仅对所有 unary 方法生效的中间件,该中间件与 kitex 原生支持的 ```WithMiddleware``` 的方法签名完全一致,区别在于后者可以同时对 streaming 方法生效。 ```go -svr, err := xxx.NewServer( - //... - streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - // ctx 依然含有用户透传的 token - token, ok := metainfo.GetPersistentValue(ctx, "user_token") - // 检查 token 是否有账户余额继续维持会话 - if !hasBalance(token) { - return fmt.Errorf("user dont have enough balance: token=%s", token) - } - return next(ctx, stream, res) - } - }), -) +type UnaryEndpoint Endpoint +type UnaryMiddleware func(next UnaryEndpoint) UnaryEndpoint + +// client.WithUnaryOptions(client.WithUnaryMiddleware(mw)) +// server.WithUnaryOptions(server.WithUnaryMiddleware(mw)) ``` ## 注入中间件 -#### 注入 Client Middleware +### 注入 client 侧的中间件 ```go +import "github.com/cloudwego/client" + cli, err := xxx.NewClient( - "a.b.c", - streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - return next(ctx, stream, res) - } - }), - streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { - return func(ctx context.Context, stream streamx.Stream, req any) (err error) { - return next(ctx, stream, req) - } - }), + "a.b.c", + client.WithStreamOptions( + client.WithStreamMiddleware(func (next cep.StreamEndpoint) cep.StreamEndpoint { + return func (ctx context.Context) (stream streaming.ClientStream, err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("create stream, method: ", ri.Invocation().MethodName()) + return next(ctx) + } + }), + client.WithStreamSendMiddleware(func (next cep.StreamSendEndpoint) cep.StreamSendEndpoint { + return func (ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("stream send message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + client.WithStreamRecvMiddleware(func (next cep.StreamRecvEndpoint) cep.StreamRecvEndpoint { + return func (ctx context.Context, stream streaming.ClientStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(stream.Context()) + println("stream recv message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + ), ) ``` -#### 注入 Server Middleware +### 注入 server 侧的中间件 ```go -server, err := xxx.NewServer( - // .... - streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { - return func(ctx context.Context, stream streamx.Stream, res any) (err error) { - return next(ctx, stream, res) - } - }), - streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { - return func(ctx context.Context, stream streamx.Stream, req any) (err error) { - return next(ctx, stream, req) - } - }), +import "github.com/cloudwego/server" + +svr, err := xxx.NewServer( + //... + server.WithStreamOptions( + server.WithStreamMiddleware(func(next sep.StreamEndpoint) sep.StreamEndpoint { + return func(ctx context.Context, st streaming.ServerStream) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("create stream, method: ", ri.Invocation().MethodName()) + return next(ctx, st) + } + }), + server.WithStreamRecvMiddleware(func(next sep.StreamRecvEndpoint) sep.StreamRecvEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("stream recv message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + server.WithStreamSendMiddleware(func(next sep.StreamSendEndpoint) sep.StreamSendEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, message interface{}) (err error) { + ri := rpcinfo.GetRPCInfo(ctx) + println("stream send message, method: ", ri.Invocation().MethodName()) + return next(ctx, stream, message) + } + }), + ), ) ``` diff --git a/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Timeout+Control.md b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Timeout+Control.md new file mode 100644 index 00000000000..7a994bafbc1 --- /dev/null +++ b/content/zh/docs/kitex/Tutorials/basic-feature/streamx/Stream+Timeout+Control.md @@ -0,0 +1,34 @@ +--- +title: "StreamX 流超时控制" +date: 2025-01-10 +weight: 5 +keywords: ["流超时控制最佳实践"] +description: "" +--- + +## TTHeader Streaming 支持的超时机制 +### Recv Timeout +对每次 Recv 进行超时控制。 +#### Client 级别 +这个 Client 下的所有接口都会生效。 + +``` +import ( + "github.com/cloudwego/kitex/client" +) + +cli, err := NewClient("service", client.WithStreamOptions(client.WithStreamRecvTimeout(timeout))) +``` +#### 接口级别(Kitex >= v0.13.0) +只针对单个接口生效。 + +``` +import ( + "github.com/cloudwego/kitex/client/callopt/streamcall" +) + +cli, err := NewClient("service") +// 调用下游某个流式接口 +stream, err := cli.NewStream(ctx, streamcall.WithRecvTimeout(timeout)) +效果 +``` diff --git a/content/zh/docs/kitex/Tutorials/code-gen/prutal.md b/content/zh/docs/kitex/Tutorials/code-gen/prutal.md new file mode 100644 index 00000000000..3f7dcd2238e --- /dev/null +++ b/content/zh/docs/kitex/Tutorials/code-gen/prutal.md @@ -0,0 +1,87 @@ +--- +title: "Prutal" +date: 2025-04-09 +weight: 11 +keywords: ["Prutal"] +description: "Protobuf 的无生成代码序列化库" +--- + +## **概要** + +Kitex 从 v0.13.0 版本开始将使用自研的 Prutal 实现用于替换 Protobuf 的 protoc 、protoc-gen-go 来生成代码、序列化。 + +Kitex 将不再需要安装 protoc 、protoc-gen-go 。对于新的用户并不会感知相应的变化。 + +Prutal 在序列化上兼容现有的 Protobuf,并且生成代码十分精简、没有多余的代码,并且在序列化性能优于官方。 + +技术上 Prutal 与 Frugal 类似,借助 struct tag 还有高效的反射实现序列化,并不依赖冗余的生成代码。 + +当前已经开源,具体实现还有后续的 benchmark 数据可以关注: https://github.com/cloudwego/prutal + +## **使用建议** + +初期并不建议用户直接使用 Prutal,当前只会承诺通过 Kitex 里 Prutal 集成的向前和向后兼容性。 + +如果在使用 Kitex 过程,发现任何问题,可以使用环境变量 KITEX_TOOL_USE_PROTOC 回退到 protobuf。 + +``` +KITEX_TOOL_USE_PROTOC=1 kitex --version +``` + +长期来看,Kitex 将废弃 protoc 的实现。建议遇到使用上的问题,可以反馈以帮助改进。 + +## **Kitex Tool 变更** + +由于不再使用 protoc 作为默认的代码生成工具,以下的参数将废弃: +``` +--protobuf + +--protobuf-plugin +``` + +这两个参数主要用于直接透传参数到 protoc,Kitex 自身并没有实际在使用。如果仍指定使用,将会报错: + +``` +[ERROR] invalid value "xxx" for flag -protobuf: flag is deprecated +[ERROR] invalid value "xxx" for flag -protobuf-plugin: flag is deprecated +``` + +如果用户有需求,可以自行调用 protoc 以及相关的插件进行代码生成。而不是依赖 Kitex 去调用 protoc。 + +由于旧的 protoc 实现的生成路径问题比较复杂,在老的实现中,以下参数并不生效: + +``` +--gen-path +``` + +其默认值只能为 kitex_gen,在新的 Prutal 中,修复了这个问题。 + +## **Prutal 与 Protobuf 兼容性问题** + +Kitex 默认使用 Prutal 进行生成代码。 + +如果用户直接通过 Protobuf 的库进行 Marshal / Unmarshal, 新 Kitex 的生成代码将会在编译期发生错误: + +``` +cannot use &YourRequest{} (value of type *YourRequest) as protoreflect.ProtoMessage value in argument to proto.Marshal: *YourRequest does not implement protoreflect.ProtoMessage (missing method ProtoReflect) +``` + +这是因为 Protobuf 库与 Protobuf 的生成代码是强绑定的,Protobuf 需要生成大量的二进制数据以协助其反射的实现。 + +建议直接使用 github.com/cloudwego/prutal 包: + +``` +// MarshalAppend appends the protobuf encoding of v to b and returns the new bytes +func MarshalAppend(b []byte, v interface{}) ([]byte, error) + +// Unmarshal parses the protobuf-encoded data and stores the result in the value pointed to by v. +func Unmarshal(b []byte, v interface{}) error +``` + +Prutal 兼容 Protobuf 的生成代码。因此哪怕原本的代码是官方 Protobuf 生成的,都可以使用 Prutal 来序列化,以获取更好的性能。 + +由于 Protobuf 实现上比较复杂,功能十分丰富。Prutal 很难保证 100% 的功能都可以兼容,如果发现任何问题,欢迎反馈。 + +## **问题反馈** + +有任何问题均可以向 prutal 提 [issue](https://github.com/cloudwego/prutal/issues) 进行反馈。