Skip to content

Commit 07f45d0

Browse files
committed
feat: support server-side part
1 parent 47cd4fd commit 07f45d0

3 files changed

Lines changed: 32 additions & 13 deletions

File tree

pkg/remote/trans/ttstream/server_handler.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,24 +126,21 @@ func (t *svrTransHandler) OnActive(ctx context.Context, conn net.Conn) (context.
126126
// OnRead control the connection level lifecycle.
127127
// only when OnRead return, netpoll can close the connection buffer
128128
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
129+
trans, _ := ctx.Value(serverTransCtxKey{}).(*serverTransport)
130+
if trans == nil {
131+
err = fmt.Errorf("server transport is nil")
132+
return
133+
}
129134
var wg sync.WaitGroup
130135
defer func() {
131136
wg.Wait()
132-
trans, _ := ctx.Value(serverTransCtxKey{}).(*serverTransport)
133-
if trans != nil {
134-
trans.WaitClosed()
135-
}
137+
trans.WaitClosed()
136138
if errors.Is(err, io.EOF) {
137139
err = nil
138140
}
139141
}()
140142
// connection level goroutine
141143
for {
142-
trans, _ := ctx.Value(serverTransCtxKey{}).(*serverTransport)
143-
if trans == nil {
144-
err = fmt.Errorf("server transport is nil")
145-
return
146-
}
147144
var st *serverStream
148145
// ReadStream will block until a stream coming or conn return error
149146
st, err = trans.ReadStream(ctx)

pkg/remote/trans/ttstream/transport_client.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,17 @@ func (t *clientTransport) loopRead() error {
190190
func (t *clientTransport) WriteFrame(fr *Frame) (err error) {
191191
// todo: optimize performance
192192
t.mu.Lock()
193-
defer t.mu.Unlock()
193+
defer func() {
194+
t.mu.Unlock()
195+
if err != nil {
196+
t.Close(err)
197+
}
198+
}()
194199
if err = EncodeFrame(context.Background(), t.writer, fr); err != nil {
195-
t.Close(err)
196200
return err
197201
}
198202
recycleFrame(fr)
199203
if err = t.writer.Flush(); err != nil {
200-
t.Close(err)
201204
return err
202205
}
203206
return nil

pkg/remote/trans/ttstream/transport_server.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"io"
2323
"net"
24+
"sync"
2425
"sync/atomic"
2526

2627
"github.com/cloudwego/gopkg/bufiox"
@@ -35,6 +36,9 @@ import (
3536
type serverTransport struct {
3637
conn netpoll.Connection
3738
stream atomic.Pointer[serverStream]
39+
// mu protects writer
40+
// todo: optimize performance
41+
mu sync.Mutex
3842
writer *writerBuffer
3943
// transport should operate directly on stream
4044
scache []*serverStream // size is streamCacheSize
@@ -141,6 +145,10 @@ func (t *serverTransport) readFrame(reader bufiox.Reader) error {
141145
s = newServerStream(ctx, t, fr.streamFrame)
142146
s.cancelFunc = cFunc
143147
t.storeStream(s)
148+
// todo: 如果能确保一个连接就是被一个 stream 独占的,那么这里不用那么麻烦,直接通知即可
149+
// 是否可能存在新的 stream 开始了,但是连接上继续收到了上一个 stream 的情况?
150+
// 这里需要确保一个流的生命周期结束后,连接才能被分配给下一个流
151+
// 此处需要做防御性编程,确保一个连接只被一个 Stream 占据
144152
err = t.spipe.Write(context.Background(), s)
145153
} else {
146154
// load exist stream
@@ -184,11 +192,22 @@ func (t *serverTransport) loopRead() error {
184192

185193
// WriteFrame is concurrent safe
186194
func (t *serverTransport) WriteFrame(fr *Frame) (err error) {
195+
// todo: optimize performance
196+
t.mu.Lock()
197+
defer func() {
198+
t.mu.Unlock()
199+
if err != nil {
200+
t.Close(err)
201+
}
202+
}()
187203
if err = EncodeFrame(context.Background(), t.writer, fr); err != nil {
188204
return err
189205
}
190206
recycleFrame(fr)
191-
return t.writer.Flush()
207+
if err = t.writer.Flush(); err != nil {
208+
return err
209+
}
210+
return nil
192211
}
193212

194213
func (t *serverTransport) CloseStream(sid int32) (err error) {

0 commit comments

Comments
 (0)