Skip to content

Commit f53e63b

Browse files
committed
Benchmark and little refactor
1 parent 7862105 commit f53e63b

5 files changed

Lines changed: 137 additions & 9 deletions

File tree

common/mux/bench_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package mux_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/xtls/xray-core/common"
8+
"github.com/xtls/xray-core/common/buf"
9+
"github.com/xtls/xray-core/common/mux"
10+
"github.com/xtls/xray-core/common/net"
11+
"github.com/xtls/xray-core/common/session"
12+
"github.com/xtls/xray-core/transport"
13+
"github.com/xtls/xray-core/transport/pipe"
14+
)
15+
16+
func BenchmarkMuxThroughput(b *testing.B) {
17+
serverCtx := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{}})
18+
muxServerUplink, muxServerDownlink := newLinkPair()
19+
dispatcher := TestDispatcher{
20+
OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) {
21+
inputReader, inputWriter := pipe.New(pipe.WithSizeLimit(512 * 1024))
22+
outputReader, outputWriter := pipe.New(pipe.WithSizeLimit(512 * 1024))
23+
go func() {
24+
defer outputWriter.Close()
25+
for {
26+
mb, err := inputReader.ReadMultiBuffer()
27+
if err != nil {
28+
break
29+
}
30+
buf.ReleaseMulti(mb)
31+
}
32+
}()
33+
return &transport.Link{
34+
Reader: outputReader,
35+
Writer: inputWriter,
36+
}, nil
37+
},
38+
}
39+
_, err := mux.NewServerWorker(serverCtx, &dispatcher, muxServerUplink)
40+
common.Must(err)
41+
client, err := mux.NewClientWorker(*muxServerDownlink, mux.ClientStrategy{})
42+
common.Must(err)
43+
clientCtx := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{
44+
Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80),
45+
}})
46+
muxClientUplink, muxClientDownlink := newLinkPair()
47+
go func() {
48+
for {
49+
mb, err := muxClientDownlink.Reader.ReadMultiBuffer()
50+
if err != nil {
51+
break
52+
}
53+
buf.ReleaseMulti(mb)
54+
}
55+
}()
56+
ok := client.Dispatch(clientCtx, muxClientUplink)
57+
if !ok {
58+
b.Fatal("failed to dispatch")
59+
}
60+
payload := make([]byte, 8192)
61+
b.SetBytes(int64(8192))
62+
b.ResetTimer()
63+
64+
for i := 0; i < b.N; i++ {
65+
data := buf.New()
66+
data.Write(payload)
67+
68+
err := muxClientUplink.Writer.WriteMultiBuffer(buf.MultiBuffer{data})
69+
if err != nil {
70+
b.Fatal(err)
71+
}
72+
}
73+
}

common/mux/client.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,14 +336,14 @@ func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool
336336

337337
func (m *ClientWorker) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
338338
if meta.Option.Has(OptionData) {
339-
return buf.Copy(NewStreamReader(reader), buf.Discard)
339+
return CopyChunk(reader, buf.Discard)
340340
}
341341
return nil
342342
}
343343

344344
func (m *ClientWorker) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
345345
if meta.Option.Has(OptionData) {
346-
return buf.Copy(NewStreamReader(reader), buf.Discard)
346+
return CopyChunk(reader, buf.Discard)
347347
}
348348
return nil
349349
}
@@ -359,7 +359,17 @@ func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
359359
closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
360360
closingWriter.Close()
361361

362-
return buf.Copy(NewStreamReader(reader), buf.Discard)
362+
return CopyChunk(reader, buf.Discard)
363+
}
364+
365+
if s.transferType == protocol.TransferTypeStream {
366+
err := CopyChunk(reader, s.output)
367+
if err != nil && buf.IsWriteError(err) {
368+
errors.LogInfoInner(context.Background(), err, "failed to write to downstream. closing session ", s.ID)
369+
s.Close(false)
370+
return err
371+
}
372+
return err
363373
}
364374

365375
rr := s.NewReader(reader, &meta.Target)
@@ -378,7 +388,7 @@ func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
378388
s.Close(false)
379389
}
380390
if meta.Option.Has(OptionData) {
381-
return buf.Copy(NewStreamReader(reader), buf.Discard)
391+
return CopyChunk(reader, buf.Discard)
382392
}
383393
return nil
384394
}

common/mux/reader.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,29 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
5757
func NewStreamReader(reader *buf.BufferedReader) buf.Reader {
5858
return crypto.NewChunkStreamReaderWithChunkCount(crypto.PlainChunkSizeParser{}, reader, 1)
5959
}
60+
61+
func CopyChunk(reader *buf.BufferedReader, writer buf.Writer) error {
62+
size, err := serial.ReadUint16(reader)
63+
if err != nil {
64+
return err
65+
}
66+
var writeErr error
67+
for size > 0 {
68+
mb, err := reader.ReadAtMost(int32(size))
69+
if !mb.IsEmpty() {
70+
size -= uint16(mb.Len())
71+
if writeErr == nil {
72+
if err := writer.WriteMultiBuffer(mb); err != nil {
73+
writeErr = err
74+
}
75+
} else {
76+
buf.ReleaseMulti(mb)
77+
}
78+
continue
79+
}
80+
if err != nil {
81+
return err
82+
}
83+
}
84+
return writeErr
85+
}

common/mux/server.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (w *ServerWorker) Close() error {
157157

158158
func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
159159
if meta.Option.Has(OptionData) {
160-
return buf.Copy(NewStreamReader(reader), buf.Discard)
160+
return CopyChunk(reader, buf.Discard)
161161
}
162162
return nil
163163
}
@@ -264,7 +264,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
264264
link, err := w.dispatcher.Dispatch(ctx, meta.Target)
265265
if err != nil {
266266
if meta.Option.Has(OptionData) {
267-
buf.Copy(NewStreamReader(reader), buf.Discard)
267+
CopyChunk(reader, buf.Discard)
268268
}
269269
return errors.New("failed to dispatch request.").Base(err)
270270
}
@@ -287,6 +287,15 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
287287
return nil
288288
}
289289

290+
if s.transferType == protocol.TransferTypeStream {
291+
err = CopyChunk(reader, s.output)
292+
if err != nil && buf.IsWriteError(err) {
293+
s.Close(false)
294+
return err
295+
}
296+
return err
297+
}
298+
290299
rr := s.NewReader(reader, &meta.Target)
291300
err = buf.Copy(rr, s.output)
292301

@@ -308,7 +317,17 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
308317
closingWriter := NewResponseWriter(meta.SessionID, w.link.Writer, protocol.TransferTypeStream)
309318
closingWriter.Close()
310319

311-
return buf.Copy(NewStreamReader(reader), buf.Discard)
320+
return CopyChunk(reader, buf.Discard)
321+
}
322+
323+
if s.transferType == protocol.TransferTypeStream {
324+
err := CopyChunk(reader, s.output)
325+
if err != nil && buf.IsWriteError(err) {
326+
errors.LogInfoInner(context.Background(), err, "failed to write to downstream writer. closing session ", s.ID)
327+
s.Close(false)
328+
return err
329+
}
330+
return err
312331
}
313332

314333
rr := s.NewReader(reader, &meta.Target)
@@ -328,7 +347,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
328347
s.Close(false)
329348
}
330349
if meta.Option.Has(OptionData) {
331-
return buf.Copy(NewStreamReader(reader), buf.Discard)
350+
return CopyChunk(reader, buf.Discard)
332351
}
333352
return nil
334353
}

common/mux/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
func newLinkPair() (*transport.Link, *transport.Link) {
18-
opt := pipe.WithoutSizeLimit()
18+
opt := pipe.WithSizeLimit(512 * 1024)
1919
uplinkReader, uplinkWriter := pipe.New(opt)
2020
downlinkReader, downlinkWriter := pipe.New(opt)
2121

0 commit comments

Comments
 (0)