Skip to content

Commit a1f1436

Browse files
committed
docs: update streamx related documents
1 parent 3f5d5a6 commit a1f1436

8 files changed

Lines changed: 296 additions & 192 deletions

File tree

content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Basic+Programming.md

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ Current support:
1414

1515
- Transport protocol: TTHeader
1616
- IDL Definition Language and Serialization Protocol: Thrift
17-
- **gRPC Streaming** : ~~~~(planned implementation)
17+
- **gRPC Streaming**
1818

19-
- ~~Transport protocol: gRPC~~
20-
- ~~IDL Definition Language and Serialization Protocol: Protobuf Encoding~~
19+
- Transport protocol: gRPC
20+
- IDL Definition Language and Serialization Protocol: Thrift / Protobuf
2121

2222
The protocol selected here only affects code generated from IDL. Regardless of the protocol, the following usage is consistent.
2323

@@ -44,11 +44,15 @@ service TestService {
4444
```
4545

4646
#### Generate code
47+
Please make sure that Kitex Tool has been upgraded to v0.13.0+:
48+
```
49+
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
50+
```
4751

48-
To maintain compatibility with legacy stream-generated code, Command Line needs to add `the -streamx ` flag.
52+
To maintain compatibility with legacy stream-generated code, Command Line needs to add the `-streamx` flag.
4953

5054
```
51-
kitex -streamx -module <go module> -service P.S.M echo.thrift
55+
kitex -streamx -module <go module> -service service echo.thrift
5256
```
5357

5458
##### Initialization
@@ -57,25 +61,25 @@ kitex -streamx -module <go module> -service P.S.M echo.thrift
5761

5862
```go
5963
import ".../kitex_gen/echo/testservice"
60-
import "github.com/cloudwego/kitex/client/streamxclient"
64+
import "github.com/cloudwego/kitex/client"
6165

6266
cli, err := testservice.NewClient(
6367
"a.b.c",
64-
streamxclient.WithStreamRecvMiddleware(...),
65-
streamxclient.WithStreamSendMiddleware(...),
68+
client.WithStreamRecvMiddleware(...),
69+
client.WithStreamSendMiddleware(...),
6670
)
6771
```
6872

6973
#### Create Server
7074

7175
```go
72-
import ".../kitex_gen/echo/streamserver"
73-
import "github.com/cloudwego/kitex/server/streamxserver"
76+
import ".../kitex_gen/echo/testservice"
77+
import "github.com/cloudwego/kitex/server"
7478

75-
svr := streamserver.NewServer(
79+
svr := testservice.NewServer(
7680
new(serviceImpl),
77-
streamxserver.WithStreamRecvMiddleware(...),
78-
streamxserver.WithStreamSendMiddleware(...),
81+
server.WithStreamRecvMiddleware(...),
82+
server.WithStreamSendMiddleware(...),
7983
)
8084
```
8185

@@ -101,14 +105,14 @@ client.CloseAndRecv(res) === EOF ==> server.Recv(EOF)
101105

102106
#### Client Usage
103107

104-
- [**Must**] : The client must call the CloseAndRecv () or (CloseSend + Recv) method to inform the server that there is no new data to send.
108+
- [**Must**] : The client must call the CloseAndRecv() or (CloseSend + Recv) method to inform the server that there is no new data to send.
105109

106110
```go
107-
ctx, cs, err := cli.ClientStream(ctx)
111+
stream, err := cli.EchoClient(ctx)
108112
for i := 0; i < 3; i++ {
109-
err = cs.Send(ctx, req)
113+
err = stream.Send(stream.Context(), req)
110114
}
111-
res, err = cs.CloseAndRecv(ctx)
115+
res, err = stream.CloseAndRecv(stream.Context())
112116
```
113117

114118
#### Server usage
@@ -117,14 +121,14 @@ res, err = cs.CloseAndRecv(ctx)
117121

118122
```go
119123

120-
func (si *serviceImpl) ClientStream(
121-
ctx context.Context, stream streamx.ClientStreamingServer[Request, Response]
122-
) (res *Response, err error) {
124+
func (si *serviceImpl) EchoClient(
125+
ctx context.Context, stream echo.TestService_EchoClientServer
126+
) (err error) {
123127
for {
124128
req, err := stream.Recv(ctx)
125129
if err == io.EOF {
126130
res := new(Response)
127-
return res, nil
131+
return stream.SendAndClose(ctx, res)
128132
}
129133
if err != nil {
130134
return nil, err
@@ -153,12 +157,12 @@ client.Recv(EOF) <== EOF === server handler return
153157

154158
#### Client Usage
155159

156-
- [**Must**] : The client must check the io. EOF error and end the loop
160+
- [**Must**] : The client must check the io.EOF error and end the loop
157161

158162
```go
159-
ctx, ss, err := cli.ServerStream(ctx, req)
163+
stream, err := cli.EchoServer(ctx, req)
160164
for {
161-
res, err := ss.Recv(ctx)
165+
res, err := stream.Recv(stream.Context())
162166
if errors.Is(err, io.EOF) {
163167
break
164168
}
@@ -168,7 +172,7 @@ for {
168172
#### Server usage
169173

170174
```go
171-
func (si *serviceImpl) ServerStream(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error {
175+
func (si *serviceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) error {
172176
for i := 0; i < 3; i++ {
173177
err := stream.Send(ctx, resp)
174178
if err != nil {
@@ -204,23 +208,23 @@ client.Recv(EOF) <== EOF === server handler return
204208
#### Client Usage
205209

206210
- [**Must**] : client must call CloseSend after sending
207-
- [**Must**] : client must judge io. EOF and end the loop when Recv
211+
- [**Must**] : client must judge io.EOF and end the loop when Recv
208212

209213
```go
210-
ctx, bs, err := cli.BidiStream(ctx)
214+
stream, err := cli.EchoBidi(ctx)
211215
var wg sync.WaitGroup
212216
wg.Add(2)
213217
go func() {
214218
defer wg.Done()
215219
for i := 0; i < round; i++ {
216-
err := bs.Send(ctx, req)
220+
err := stream.Send(stream.Context(), req)
217221
}
218-
err = bs.CloseSend(ctx)
222+
err = stream.CloseSend(stream.Context())
219223
}()
220224
go func() {
221225
defer wg.Done()
222226
for {
223-
res, err := bs.Recv(ctx)
227+
res, err := stream.Recv(stream.Context())
224228
if errors.Is(err, io.EOF) {
225229
break
226230
}
@@ -234,7 +238,7 @@ wg.Wait()
234238
- [**Must**] : The server must determine io. EOF and end the loop when Recv
235239

236240
```go
237-
func (si *serviceImpl) BidiStream(ctx context.Context, stream streamx.BidiStreamingServer[Request, Response]) error {
241+
func (si *serviceImpl) EchoBidi(ctx context.Context, stream echo.TestService_EchoBidiServer) error {
238242
for {
239243
req, err := stream.Recv(ctx)
240244
if err == io.EOF {

content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Error+Handling.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ Unlike RPC, stream errors can occur at any time during stream processing. For e
1919
Server implementation:
2020

2121
```go
22-
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error {
22+
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error {
2323
// 检查用户账户余额
2424
for isHasBalance (req.UserId) {
2525
stream.Send(ctx, res)
2626
}
2727
// 返回用户余额不足错误
2828
bizErr := kerrors.NewBizStatusErrorWithExtra(
29-
10001, "insufficient user balance", map[string]string{"testKey": "testVal"}
29+
10001, "insufficient user balance", map[string]string{"testKey": "testVal"},
3030
)
3131
return bizErr
3232
}
@@ -35,11 +35,11 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques
3535
Client implementation:
3636

3737
```go
38-
svrStream, err = streamClient.ServerStreamWithErr(ctx, req)
38+
stream, err = cli.ServerStreamWithErr(ctx, req)
3939

4040
var err error
4141
for {
42-
res, err = stream.Recv(ctx)
42+
res, err = stream.Recv(stream.Context())
4343
if err != nil {
4444
break
4545
}
@@ -52,12 +52,12 @@ if ok {
5252

5353
### Other errors
5454

55-
If the Error returned by the Server is a non-business exception, the framework will be uniformly encapsulated as `(* thrift.ApplicationException) `. At this time, only the error Message can be obtained.
55+
If the Error returned by the Server is a non-business exception, the framework will be uniformly encapsulated as `(*thrift.ApplicationException)`. At this time, only the error Message can be obtained.
5656

5757
Server implementation:
5858

5959
```go
60-
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error {
60+
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error {
6161
// ...
6262
return errors.New("test error")
6363
}
@@ -66,16 +66,16 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques
6666
Client implementation:
6767

6868
```go
69-
svrStream, err = streamClient.ServerStreamWithErr(ctx, req)
70-
test.Assert(t, err == nil, err)
69+
stream, err = cli.ServerStreamWithErr(ctx, req)
7170

7271
var err error
7372
for {
74-
res, err = stream.Recv(ctx)
73+
res, err = stream.Recv(stream.Context())
7574
if err != nil {
7675
break
7776
}
7877
}
78+
7979
ex, ok := err.(*thrift.ApplicationException)
8080
if ok {
8181
println(ex.TypeID(), ex.Msg())

content/en/docs/kitex/Tutorials/basic-feature/streamx/Kitex+StreamX+-+Stream+Metainfo.md

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ The overall usage of message pass-through is similar to [Kitex - 元信息透传
1818
ctx = metainfo.WithPersistentValue(ctx, "k1", "v1")
1919
ctx = metainfo.WithValue(ctx, "k2", "v2")
2020

21-
s, err := streamClient.ClientStream(ctx)
21+
stream, err := cli.EchoClient(ctx)
2222
```
23+
> If using gRPC streaming, make sure the key is uppercase and use _ instead of - or it will be discarded. TTHeader streaming does not make this requirement.
2324
2425
### Server receives meta message
2526

2627
```go
27-
func (s *streamingService) ClientStream(ctx context.Context,
28-
stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) {
28+
func (s *streamingService) EchoClient(ctx context.Context,
29+
stream echo.TestService_EchoClientServer) (err error) {
2930

3031
v, ok := metainfo.GetPersistentValue(ctx, "k1")
3132
// v == "v1"
@@ -39,30 +40,32 @@ func (s *streamingService) ClientStream(ctx context.Context,
3940
Reverse pass-through introduces a new concept, Header and Trailer. Any complete data stream must include Header and Trailer. Use these two frames to reverse pass-through information.
4041

4142
```go
42-
func (s *streamingService) ClientStream(ctx context.Context,
43-
stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) {
43+
import "github.com/cloudwego/pkg/streaming"
44+
45+
func (s *streamingService) EchoClient(ctx context.Context,
46+
stream echo.TestService_EchoClientServer) (err error) {
4447

45-
// SetTrailer set 的 trailer 会在 server handler 结束后发送
46-
err := stream.SetTrailer(streamx.Trailer{"t1": "v1"})
47-
// 立刻发送 Header
48-
err = stream.SendHeader(streamx.Header{"h1": "v1"})
48+
// the trailer set by SetTrailer would be sent after server handler finishing
49+
err := stream.SetTrailer(streaming.Trailer{"t1": "v1"})
50+
// send Header directly
51+
err = stream.SendHeader(streaming.Header{"h1": "v1"})
4952
if err != nil {
5053
return err
5154
}
52-
// 发送正常数据
55+
// send normal data
5356
err = stream.Send(req)
5457
}
5558
```
5659

5760
### Client receives a reverse pass-through meta message
5861

5962
```go
60-
s, err := streamClient.ClientStream(ctx)
63+
stream, err := cli.EchoClient(ctx)
6164

62-
// Header/Trailer 函数会一直阻塞到对端发送了 Header/Trailer 为止,或中间发生了错误
63-
hd, err := s.Header()
65+
// Header/Trailer calling would block constantly until the remote side has sent Header/Trailer, or there was an error in the intermediate process
66+
hd, err := stream.Header()
6467
// hd["h1"] == "v1"
65-
tl, err := s.Trailer()
68+
tl, err := stream.Trailer()
6669
// tl["t1"] == "v1"
6770
```
6871

0 commit comments

Comments
 (0)