Skip to content

Commit dee64ef

Browse files
committed
little refactor
1 parent 0a1b5bf commit dee64ef

4 files changed

Lines changed: 78 additions & 8 deletions

File tree

common/buf/copy.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ type readError struct {
5656
error
5757
}
5858

59+
func NewReadError(err error) error {
60+
return readError{err}
61+
}
62+
5963
func (e readError) Error() string {
6064
return e.error.Error()
6165
}
@@ -74,6 +78,10 @@ type writeError struct {
7478
error
7579
}
7680

81+
func NewWriteError(err error) error {
82+
return writeError{err}
83+
}
84+
7785
func (e writeError) Error() string {
7886
return e.error.Error()
7987
}

common/mux/client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,14 +332,14 @@ func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool
332332

333333
func (m *ClientWorker) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
334334
if meta.Option.Has(OptionData) {
335-
return buf.Copy(NewStreamReader(reader), buf.Discard)
335+
return CopyChunk(reader, buf.Discard)
336336
}
337337
return nil
338338
}
339339

340340
func (m *ClientWorker) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
341341
if meta.Option.Has(OptionData) {
342-
return buf.Copy(NewStreamReader(reader), buf.Discard)
342+
return CopyChunk(reader, buf.Discard)
343343
}
344344
return nil
345345
}
@@ -355,7 +355,19 @@ func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
355355
closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
356356
closingWriter.Close()
357357

358-
return buf.Copy(NewStreamReader(reader), buf.Discard)
358+
return CopyChunk(reader, buf.Discard)
359+
}
360+
361+
if s.transferType == protocol.TransferTypeStream {
362+
err := CopyChunk(reader, s.output)
363+
if err != nil && buf.IsWriteError(err) {
364+
errors.LogInfoInner(context.Background(), err, "failed to write to downstream. closing session ", s.ID)
365+
s.Close(false)
366+
// down stream can have a write err but don't return the err to terminate the whole mux connection
367+
// because it's still available for other sessions
368+
return nil
369+
}
370+
return err
359371
}
360372

361373
rr := s.NewReader(reader, &meta.Target)
@@ -374,7 +386,7 @@ func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
374386
s.Close(false)
375387
}
376388
if meta.Option.Has(OptionData) {
377-
return buf.Copy(NewStreamReader(reader), buf.Discard)
389+
return CopyChunk(reader, buf.Discard)
378390
}
379391
return nil
380392
}

common/mux/reader.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,32 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
5757
func NewStreamReader(reader *buf.BufferedReader) buf.Reader {
5858
return crypto.NewChunkStreamReaderWithChunkCount(crypto.PlainChunkSizeParser{}, reader, 1)
5959
}
60+
61+
func CopyChunk(reader *buf.BufferedReader, writer buf.Writer) error {
62+
size, err := serial.ReadUint16(reader)
63+
if err != nil {
64+
return err
65+
}
66+
var writeErr error
67+
for size > 0 {
68+
mb, readErr := reader.ReadAtMost(int32(size))
69+
if !mb.IsEmpty() {
70+
size -= uint16(mb.Len())
71+
if writeErr == nil {
72+
if err := writer.WriteMultiBuffer(mb); err != nil {
73+
writeErr = err
74+
}
75+
} else {
76+
buf.ReleaseMulti(mb)
77+
}
78+
continue
79+
}
80+
if readErr != nil {
81+
return buf.NewReadError(readErr)
82+
}
83+
}
84+
if writeErr != nil {
85+
return buf.NewWriteError(writeErr)
86+
}
87+
return nil
88+
}

common/mux/server.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (w *ServerWorker) Close() error {
157157

158158
func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
159159
if meta.Option.Has(OptionData) {
160-
return buf.Copy(NewStreamReader(reader), buf.Discard)
160+
return CopyChunk(reader, buf.Discard)
161161
}
162162
return nil
163163
}
@@ -264,7 +264,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
264264
link, err := w.dispatcher.Dispatch(ctx, meta.Target)
265265
if err != nil {
266266
if meta.Option.Has(OptionData) {
267-
buf.Copy(NewStreamReader(reader), buf.Discard)
267+
CopyChunk(reader, buf.Discard)
268268
}
269269
return errors.New("failed to dispatch request.").Base(err)
270270
}
@@ -287,6 +287,15 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
287287
return nil
288288
}
289289

290+
if s.transferType == protocol.TransferTypeStream {
291+
err = CopyChunk(reader, s.output)
292+
if err != nil && buf.IsWriteError(err) {
293+
s.Close(false)
294+
return err
295+
}
296+
return err
297+
}
298+
290299
rr := s.NewReader(reader, &meta.Target)
291300
err = buf.Copy(rr, s.output)
292301

@@ -308,7 +317,19 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
308317
closingWriter := NewResponseWriter(meta.SessionID, w.link.Writer, protocol.TransferTypeStream)
309318
closingWriter.Close()
310319

311-
return buf.Copy(NewStreamReader(reader), buf.Discard)
320+
return CopyChunk(reader, buf.Discard)
321+
}
322+
323+
if s.transferType == protocol.TransferTypeStream {
324+
err := CopyChunk(reader, s.output)
325+
if err != nil && buf.IsWriteError(err) {
326+
errors.LogInfoInner(context.Background(), err, "failed to write to downstream writer. closing session ", s.ID)
327+
s.Close(false)
328+
// down stream can have a write err but don't return the err to terminate the whole mux connection
329+
// because it's still available for other sessions
330+
return nil
331+
}
332+
return err
312333
}
313334

314335
rr := s.NewReader(reader, &meta.Target)
@@ -328,7 +349,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
328349
s.Close(false)
329350
}
330351
if meta.Option.Has(OptionData) {
331-
return buf.Copy(NewStreamReader(reader), buf.Discard)
352+
return CopyChunk(reader, buf.Discard)
332353
}
333354
return nil
334355
}

0 commit comments

Comments
 (0)