-
Notifications
You must be signed in to change notification settings - Fork 308
docs: create generic streaming documentation in Chinese version #1326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
880fae4
docs(generic streaming): update streamCli.Close() usage in code examp…
panda-xinyi 7b42ee6
Merge branch 'cloudwego:main' into main
panda-xinyi acf1c45
docs: create generic streaming documentation in Chinese
panda-xinyi 428c235
docs(kitex): align generic streaming code examples with kitex-example…
panda-xinyi b6bec38
docs(kitex): add link to generic streaming demo
panda-xinyi 5d8f1e7
docs(kitex): improve generic streaming documentation with better impo…
panda-xinyi 411bbe3
docs(kitex): clarify distinction between Unary and Ping Pong modes in…
panda-xinyi 44e1410
docs(kitex): clarify io.EOF meaning in streaming documentation
panda-xinyi 09ad7a9
docs(kitex): update Kitex version to v0.13.1 in streaming documentation
panda-xinyi 0fe9aba
docs(kitex): add genericclient imports to all streaming code examples
panda-xinyi 33ccbdf
docs(kitex): use English terminology for streaming modes in Chinese d…
panda-xinyi 5896f57
docs(kitex): refine description of Ping Pong mode in Chinese document
panda-xinyi fcc1bd3
docs(kitex): capitalize Protobuf in both Chinese and English streamin…
panda-xinyi c3754c7
docs(kitex): add imports in streaming examples
panda-xinyi 8d678a5
docs(kitex): add link to Kitex v0.13.1 release notes
panda-xinyi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
276 changes: 276 additions & 0 deletions
276
content/zh/docs/kitex/Tutorials/advanced-feature/generic-call/generic_streaming.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,276 @@ | ||
| --- | ||
| title: "流式泛化调用用户指南" | ||
| date: 2025-05-09 | ||
| weight: 6 | ||
| keywords: ["流式泛化调用用户指南"] | ||
| description: "" | ||
| --- | ||
|
|
||
| ## 简介 | ||
|
|
||
| **Kitex v0.12.0 起支持流式接口的 JSON 泛化调用(仅客户端)**。 | ||
|
|
||
| ## 使用方法 | ||
|
|
||
| ### 泛化流式客户端初始化 | ||
|
|
||
| #### protobuf | ||
|
|
||
| 以如下 Protobuf IDL 为例: | ||
|
|
||
| ```protobuf | ||
| syntax = "proto3"; | ||
| package pb; | ||
| option go_package = "pb"; | ||
|
|
||
| message Request { | ||
| string message = 1; | ||
| } | ||
|
|
||
| message Response { | ||
| string message = 1; | ||
| } | ||
|
|
||
| service StreamingService { | ||
| rpc StreamRequestEcho (stream Request) returns (Response) {} | ||
| rpc StreamResponseEcho (Request) returns (stream Response) {} | ||
| rpc BidirectionalEcho (stream Request) returns (stream Response) {} | ||
| rpc UnaryEcho (Request) returns (Response) {} | ||
| } | ||
| ``` | ||
|
|
||
| 上述 IDL 包含四种方法,分别对应四种场景: | ||
|
|
||
| 1. 客户端流:客户端发送多条消息,服务端返回一条消息后关闭流。 | ||
| 2. 服务端流:客户端发送一条消息,服务端返回多条消息后关闭流,适合大模型等场景。 | ||
| 3. 双向流:客户端和服务端可独立收发消息,顺序可自定义。 | ||
| 4. 单次调用(非流式)。 | ||
|
|
||
| 流式客户端初始化示例: | ||
|
|
||
| ```go | ||
|
DMwangnima marked this conversation as resolved.
|
||
| ctx := context.Background() | ||
|
|
||
| // 初始化泛化客户端 | ||
| dOpts := proto.Options{} | ||
| p, err := generic.NewPbFileProviderWithDynamicGo(idlPath, ctx, dOpts) | ||
|
|
||
| // 创建 JSON 泛化对象 | ||
| g, err := generic.JSONPbGeneric(p) | ||
|
|
||
| // 初始化流式客户端 | ||
| cli, err := genericclient.NewStreamingClient( | ||
| "streaming", | ||
| g, | ||
| client.WithTransportProtocol(transport.GRPC), | ||
| client.WithHostPorts("127.0.0.1:8888"), | ||
| client.WithMetaHandler(transmeta.ClientHTTP2Handler), | ||
| ) | ||
|
|
||
| // ... 其他流式调用示例 ... | ||
| ``` | ||
|
|
||
| #### thrift | ||
|
|
||
| 以如下 Thrift IDL 为例: | ||
|
|
||
| ```thrift | ||
| namespace go echo | ||
|
|
||
| struct Request { | ||
| 1: required string message, | ||
| } | ||
|
|
||
| struct Response { | ||
| 1: required string message, | ||
| } | ||
|
|
||
| service TestService { | ||
| Response Echo (1: Request req) (streaming.mode="bidirectional"), | ||
| Response EchoClient (1: Request req) (streaming.mode="client"), | ||
| Response EchoServer (1: Request req) (streaming.mode="server"), | ||
| // Response EchoUnary (1: Request req) (streaming.mode="unary"), // 不推荐 | ||
|
|
||
| Response EchoPingPong (1: Request req), // KitexThrift,非流式 | ||
| } | ||
| ``` | ||
|
|
||
| 上述 IDL 包含以下场景: | ||
|
|
||
| 1. 客户端流:客户端发送多条消息,服务端返回一条消息后关闭流。 | ||
|
DMwangnima marked this conversation as resolved.
Outdated
|
||
| 2. 服务端流:客户端发送一条消息,服务端返回多条消息后关闭流,适合大模型等场景。 | ||
| 3. 双向流:客户端和服务端可独立收发消息,顺序可自定义。 | ||
| 4. 单次调用(gRPC):带 `streaming.mode` 注解的非流式(不推荐,性能有损失)。 | ||
| 5. 单次调用(KitexThrift):非流式(推荐)。 | ||
|
|
||
| 流式客户端初始化示例: | ||
|
|
||
| ```go | ||
|
DMwangnima marked this conversation as resolved.
|
||
| // 1. 创建 Thrift 文件提供者 | ||
| p, err := generic.NewThriftFileProvider("../idl/streaming.thrift") | ||
|
|
||
| // 2. 创建 JSON Thrift 泛化调用 | ||
| g, err := generic.JSONThriftGeneric(p) | ||
|
|
||
| // 3. 创建流式客户端 | ||
| cli, err := genericclient.NewStreamingClient( | ||
| "streaming_service", | ||
| g, | ||
| client.WithTransportProtocol(transport.GRPC), | ||
| client.WithHostPorts("127.0.0.1:8888"), | ||
| client.WithMetaHandler(transmeta.ClientHTTP2Handler), | ||
| ) | ||
|
|
||
| // ... 其他流式调用示例 ... | ||
| ``` | ||
|
|
||
| ### 客户端流(Client Streaming) | ||
|
|
||
| 示例: | ||
|
|
||
| ```go | ||
| // 使用已创建的流式客户端初始化 client streaming | ||
| streamCli, err := genericclient.NewClientStreaming(ctx, cli, "EchoClient") | ||
|
|
||
| // 发送多个请求 | ||
| for i := 0; i < 3; i++ { | ||
| req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i) | ||
| if err = streamCli.Send(req); err != nil { | ||
| return fmt.Errorf("failed to send: %v", err) | ||
| } | ||
| time.Sleep(time.Second) | ||
| } | ||
|
|
||
| // 接收最终响应 | ||
| resp, err := streamCli.CloseAndRecv() | ||
| strResp, ok := resp.(string) // 响应为 json 字符串 | ||
| ``` | ||
|
|
||
| ### 服务端流(Server Streaming) | ||
|
|
||
| 注意:`Recv` 返回非 nil 错误(包括 `io.EOF`)表示服务端已发送完毕或出错。 | ||
|
DMwangnima marked this conversation as resolved.
Outdated
|
||
|
|
||
| 示例: | ||
|
|
||
| ```go | ||
| // 使用已创建的流式客户端初始化 server streaming,并发送消息 | ||
| streamCli, err := genericclient.NewServerStreaming(ctx, cli, "EchoServer", `{"message": "grpc server streaming generic request"}`) | ||
|
|
||
| // 接收多个响应 | ||
| for { | ||
| resp, err := streamCli.Recv() | ||
| if err == io.EOF { | ||
| fmt.Println("Server streaming message receive done. stream is closed") | ||
| break | ||
| } else if err != nil { | ||
| return fmt.Errorf("failed to receive: %v", err) | ||
| } | ||
|
|
||
| strResp, ok := resp.(string) | ||
| } | ||
| ``` | ||
|
|
||
| ### 双向流(Bidirectional Streaming) | ||
|
|
||
| 示例: | ||
|
|
||
| ```go | ||
| // 使用已创建的流式客户端初始化 bidirectional streaming | ||
| streamCli, err := genericclient.NewBidirectionalStreaming(ctx, cli, "Echo") | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create bidirectional streaming: %v", err) | ||
| } | ||
|
|
||
| wg := &sync.WaitGroup{} | ||
| wg.Add(2) | ||
| var sendErr, recvErr error | ||
|
|
||
| // 发送消息 | ||
| go func() { | ||
| defer func() { | ||
| if p := recover(); p != nil { | ||
| sendErr = fmt.Errorf("panic: %v", p) | ||
| } | ||
| wg.Done() | ||
| }() | ||
|
|
||
| for i := 0; i < 3; i++ { | ||
| req := fmt.Sprintf(`{"message": "grpc bidirectional streaming generic %dth request"}`, i) | ||
| if err = streamCli.Send(req); err != nil { | ||
| sendErr = fmt.Errorf("bidirectionalStreaming send: failed, err = %v", err) | ||
| break | ||
| } | ||
| klog.Infof("BidirectionalStreamingTest send: req = %+v", req) | ||
| } | ||
|
|
||
| // 发送完所有消息后关闭客户端到服务端的流方向 | ||
| if cerr := streamCli.Close(); cerr != nil { | ||
| sendErr = fmt.Errorf("stream close failed: %v", cerr) | ||
| } | ||
| }() | ||
|
|
||
|
|
||
| // 接收消息 | ||
| go func() { | ||
| defer func() { | ||
| if p := recover(); p != nil { | ||
| recvErr = fmt.Errorf("panic: %v", p) | ||
| } | ||
| wg.Done() | ||
| }() | ||
|
|
||
| for { | ||
| resp, err := streamCli.Recv() | ||
| if err == io.EOF { | ||
| klog.Infof("bidirectionalStreaming message receive done. stream is closed") | ||
| break | ||
| } else if err != nil { | ||
| recvErr = fmt.Errorf("failed to recv: %v", err) | ||
| break | ||
| } | ||
|
|
||
| strResp, ok := resp.(string) | ||
| } | ||
| }() | ||
|
|
||
| wg.Wait() | ||
| ``` | ||
|
|
||
| ### 单次调用(PingPong) | ||
|
|
||
| 用法与普通(非流式)泛化调用类似。 | ||
|
|
||
| 示例: | ||
|
|
||
| ```go | ||
| resp, err := cli.GenericCall(ctx, "EchoPingPong", `{"message": "unary request"}`) | ||
| strResp, ok := resp.(string) // 响应为 json 字符串 | ||
| ``` | ||
|
|
||
| ## 常见问题(FAQ) | ||
|
|
||
| ### Recv() got err: rpc error: code = 12 desc = Method not found! | ||
|
|
||
| 该错误出现在 Kitex **protobuf** 泛化流式调用下游为 **gRPC-python**(或其他语言 gRPC 库)时。 | ||
|
|
||
| 根因是 Kitex 没有解析 protobuf idl 的 package,导致 gRPC 请求的 `:path` 缺少 package 部分,gRPC-python 找不到对应方法。 | ||
|
|
||
| 例如: | ||
|
|
||
| - 普通客户端 | ||
|
|
||
| `:path` - /search.gpt_engine.GPTStreamService/GPTGeneration | ||
|
|
||
| - protobuf 泛化客户端 | ||
|
|
||
| `:path` - /GPTStreamService/GPTGeneration | ||
|
|
||
| #### 解决办法 | ||
|
|
||
| 可用如下分支修复,等待 Kitex v1.18.1 正式发布后即可解决: | ||
|
DMwangnima marked this conversation as resolved.
Outdated
|
||
|
|
||
| ```shell | ||
| go get -u github.com/cloudwego/kitex@v0.12.1-0.20241220085925-b5894d2f9e0c | ||
| ``` | ||
|
|
||
| 如需完整 main 函数示例,请参考[官方 demo](https://github.com/cloudwego/kitex-examples/tree/main/generic_streaming)。 | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.