Skip to content

Commit 31a8714

Browse files
authored
rpc streaming primitives (#2759)
built some streaming primitives with windowing for rpc streams. not hooked up to anything yet (or the WshRpc)
1 parent c506823 commit 31a8714

File tree

13 files changed

+1686
-474
lines changed

13 files changed

+1686
-474
lines changed

frontend/app/store/wshclientapi.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,16 @@ class RpcApiType {
582582
return client.wshRpcStream("streamcpudata", data, opts);
583583
}
584584

585+
// command "streamdata" [call]
586+
StreamDataCommand(client: WshClient, data: CommandStreamData, opts?: RpcOpts): Promise<void> {
587+
return client.wshRpcCall("streamdata", data, opts);
588+
}
589+
590+
// command "streamdataack" [call]
591+
StreamDataAckCommand(client: WshClient, data: CommandStreamAckData, opts?: RpcOpts): Promise<void> {
592+
return client.wshRpcCall("streamdataack", data, opts);
593+
}
594+
585595
// command "streamtest" [responsestream]
586596
StreamTestCommand(client: WshClient, opts?: RpcOpts): AsyncGenerator<number, void, boolean> {
587597
return client.wshRpcStream("streamtest", null, opts);

frontend/types/gotypes.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,26 @@ declare global {
461461
builderid: string;
462462
};
463463

464+
// wshrpc.CommandStreamAckData
465+
type CommandStreamAckData = {
466+
id: number;
467+
seq: number;
468+
rwnd: number;
469+
fin?: boolean;
470+
delay?: number;
471+
cancel?: boolean;
472+
error?: string;
473+
};
474+
475+
// wshrpc.CommandStreamData
476+
type CommandStreamData = {
477+
id: number;
478+
seq: number;
479+
data64?: string;
480+
eof?: boolean;
481+
error?: string;
482+
};
483+
464484
// wshrpc.CommandTermGetScrollbackLinesData
465485
type CommandTermGetScrollbackLinesData = {
466486
linestart: number;
@@ -1240,6 +1260,7 @@ declare global {
12401260
"wsh:cmd"?: string;
12411261
"wsh:haderror"?: boolean;
12421262
"conn:conntype"?: string;
1263+
"conn:wsherrorcode"?: string;
12431264
"onboarding:feature"?: "waveai" | "magnify" | "wsh";
12441265
"onboarding:version"?: string;
12451266
"onboarding:githubstar"?: "already" | "star" | "later";

pkg/streamclient/stream_test.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package streamclient
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"testing"
7+
"time"
8+
9+
"github.com/wavetermdev/waveterm/pkg/wshrpc"
10+
)
11+
12+
type fakeTransport struct {
13+
dataChan chan wshrpc.CommandStreamData
14+
ackChan chan wshrpc.CommandStreamAckData
15+
}
16+
17+
func newFakeTransport() *fakeTransport {
18+
return &fakeTransport{
19+
dataChan: make(chan wshrpc.CommandStreamData, 10),
20+
ackChan: make(chan wshrpc.CommandStreamAckData, 10),
21+
}
22+
}
23+
24+
func (ft *fakeTransport) SendData(dataPk wshrpc.CommandStreamData) {
25+
ft.dataChan <- dataPk
26+
}
27+
28+
func (ft *fakeTransport) SendAck(ackPk wshrpc.CommandStreamAckData) {
29+
ft.ackChan <- ackPk
30+
}
31+
32+
func TestBasicReadWrite(t *testing.T) {
33+
transport := newFakeTransport()
34+
35+
reader := NewReader(1, 1024, transport)
36+
writer := NewWriter(1, 1024, transport)
37+
38+
go func() {
39+
for dataPk := range transport.dataChan {
40+
reader.RecvData(dataPk)
41+
}
42+
}()
43+
44+
go func() {
45+
for ackPk := range transport.ackChan {
46+
writer.RecvAck(ackPk)
47+
}
48+
}()
49+
50+
testData := []byte("Hello, World!")
51+
n, err := writer.Write(testData)
52+
if err != nil {
53+
t.Fatalf("Write failed: %v", err)
54+
}
55+
if n != len(testData) {
56+
t.Fatalf("Write returned %d, expected %d", n, len(testData))
57+
}
58+
59+
buf := make([]byte, 1024)
60+
n, err = reader.Read(buf)
61+
if err != nil {
62+
t.Fatalf("Read failed: %v", err)
63+
}
64+
if n != len(testData) {
65+
t.Fatalf("Read returned %d, expected %d", n, len(testData))
66+
}
67+
if !bytes.Equal(buf[:n], testData) {
68+
t.Fatalf("Read data %q doesn't match written data %q", buf[:n], testData)
69+
}
70+
}
71+
72+
func TestEOF(t *testing.T) {
73+
transport := newFakeTransport()
74+
75+
reader := NewReader(1, 1024, transport)
76+
writer := NewWriter(1, 1024, transport)
77+
78+
go func() {
79+
for dataPk := range transport.dataChan {
80+
reader.RecvData(dataPk)
81+
}
82+
}()
83+
84+
go func() {
85+
for ackPk := range transport.ackChan {
86+
writer.RecvAck(ackPk)
87+
}
88+
}()
89+
90+
testData := []byte("Test data")
91+
writer.Write(testData)
92+
writer.Close()
93+
94+
buf := make([]byte, 1024)
95+
n, err := reader.Read(buf)
96+
if err != nil {
97+
t.Fatalf("First read failed: %v", err)
98+
}
99+
if !bytes.Equal(buf[:n], testData) {
100+
t.Fatalf("Read data doesn't match")
101+
}
102+
103+
_, err = reader.Read(buf)
104+
if err != io.EOF {
105+
t.Fatalf("Expected EOF, got %v", err)
106+
}
107+
}
108+
109+
func TestFlowControl(t *testing.T) {
110+
smallWindow := int64(10)
111+
transport := newFakeTransport()
112+
113+
reader := NewReader(1, smallWindow, transport)
114+
writer := NewWriter(1, smallWindow, transport)
115+
116+
go func() {
117+
for dataPk := range transport.dataChan {
118+
reader.RecvData(dataPk)
119+
}
120+
}()
121+
122+
go func() {
123+
for ackPk := range transport.ackChan {
124+
writer.RecvAck(ackPk)
125+
}
126+
}()
127+
128+
largeData := make([]byte, 100)
129+
for i := range largeData {
130+
largeData[i] = byte(i)
131+
}
132+
133+
writeDone := make(chan error)
134+
go func() {
135+
_, err := writer.Write(largeData)
136+
writeDone <- err
137+
}()
138+
139+
received := make([]byte, 0, 100)
140+
buf := make([]byte, 20)
141+
for len(received) < len(largeData) {
142+
n, err := reader.Read(buf)
143+
if err != nil {
144+
t.Fatalf("Read failed: %v", err)
145+
}
146+
received = append(received, buf[:n]...)
147+
}
148+
149+
select {
150+
case err := <-writeDone:
151+
if err != nil {
152+
t.Fatalf("Write failed: %v", err)
153+
}
154+
case <-time.After(2 * time.Second):
155+
t.Fatal("Write didn't complete in time")
156+
}
157+
158+
if !bytes.Equal(received, largeData) {
159+
t.Fatal("Received data doesn't match sent data")
160+
}
161+
}
162+
163+
func TestError(t *testing.T) {
164+
transport := newFakeTransport()
165+
166+
reader := NewReader(1, 1024, transport)
167+
writer := NewWriter(1, 1024, transport)
168+
169+
go func() {
170+
for dataPk := range transport.dataChan {
171+
reader.RecvData(dataPk)
172+
}
173+
}()
174+
175+
go func() {
176+
for ackPk := range transport.ackChan {
177+
writer.RecvAck(ackPk)
178+
}
179+
}()
180+
181+
testErr := io.ErrUnexpectedEOF
182+
writer.CloseWithError(testErr)
183+
184+
buf := make([]byte, 1024)
185+
_, err := reader.Read(buf)
186+
if err == nil {
187+
t.Fatal("Expected error from read")
188+
}
189+
if err.Error() != "stream error: unexpected EOF" {
190+
t.Fatalf("Expected stream error, got: %v", err)
191+
}
192+
}
193+
194+
func TestCancel(t *testing.T) {
195+
transport := newFakeTransport()
196+
197+
reader := NewReader(1, 1024, transport)
198+
writer := NewWriter(1, 1024, transport)
199+
200+
go func() {
201+
for dataPk := range transport.dataChan {
202+
reader.RecvData(dataPk)
203+
}
204+
}()
205+
206+
go func() {
207+
for ackPk := range transport.ackChan {
208+
writer.RecvAck(ackPk)
209+
}
210+
}()
211+
212+
reader.Close()
213+
214+
select {
215+
case <-writer.GetCanceledChan():
216+
// Success
217+
case <-time.After(1 * time.Second):
218+
t.Fatal("Writer not notified of cancellation")
219+
}
220+
221+
_, _, canceled := writer.GetAckState()
222+
if !canceled {
223+
t.Fatal("Writer should be in canceled state")
224+
}
225+
}
226+
227+
func TestMultipleWrites(t *testing.T) {
228+
transport := newFakeTransport()
229+
230+
reader := NewReader(1, 1024, transport)
231+
writer := NewWriter(1, 1024, transport)
232+
233+
go func() {
234+
for dataPk := range transport.dataChan {
235+
reader.RecvData(dataPk)
236+
}
237+
}()
238+
239+
go func() {
240+
for ackPk := range transport.ackChan {
241+
writer.RecvAck(ackPk)
242+
}
243+
}()
244+
245+
messages := []string{"First", "Second", "Third"}
246+
for _, msg := range messages {
247+
_, err := writer.Write([]byte(msg))
248+
if err != nil {
249+
t.Fatalf("Write failed: %v", err)
250+
}
251+
}
252+
253+
expected := "FirstSecondThird"
254+
buf := make([]byte, len(expected))
255+
totalRead := 0
256+
for totalRead < len(expected) {
257+
n, err := reader.Read(buf[totalRead:])
258+
if err != nil {
259+
t.Fatalf("Read failed: %v", err)
260+
}
261+
totalRead += n
262+
}
263+
264+
if string(buf) != expected {
265+
t.Fatalf("Expected %q, got %q", expected, string(buf))
266+
}
267+
}

0 commit comments

Comments
 (0)