Skip to content

Commit f686cfa

Browse files
committed
Add proxy Scheduler
1 parent e0711b7 commit f686cfa

2 files changed

Lines changed: 86 additions & 3 deletions

File tree

proxy/proxy.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func NewTrafficState(userUUID []byte, flow string) *TrafficState {
146146
return &state
147147
}
148148

149-
// VisionReader is used to read xtls vision protocol
149+
// VisionReader is used to read seed protocol
150150
// Note Vision probably only make sense as the inner most layer of reader, since it need assess traffic state from origin proxy traffic
151151
type VisionReader struct {
152152
buf.Reader
@@ -199,14 +199,15 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
199199
return buffer, err
200200
}
201201

202-
// VisionWriter is used to write xtls vision protocol
202+
// VisionWriter is used to write seed protocol
203203
// Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic
204204
type VisionWriter struct {
205205
buf.Writer
206206
addons *Addons
207207
trafficState *TrafficState
208208
ctx context.Context
209209
writeOnceUserUUID []byte
210+
scheduler *Scheduler
210211
}
211212

212213
func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter {
@@ -218,6 +219,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont
218219
trafficState: state,
219220
ctx: context,
220221
writeOnceUserUUID: w,
222+
scheduler: NewScheduler(writer, addon, state, context),
221223
}
222224
}
223225

@@ -270,7 +272,14 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
270272
if w.trafficState.StartTime.IsZero() {
271273
w.trafficState.StartTime = time.Now()
272274
}
273-
return w.Writer.WriteMultiBuffer(mb)
275+
w.scheduler.Buffer <- mb
276+
if w.addons.Scheduler == nil {
277+
w.scheduler.Trigger <- -1 // send all buffers
278+
}
279+
if len(w.scheduler.Error) > 0 {
280+
return <-w.scheduler.Error
281+
}
282+
return nil
274283
}
275284

276285
// ReshapeMultiBuffer prepare multi buffer for padding stucture (max 21 bytes)

proxy/scheduler.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"math/big"
7+
"sync"
8+
"time"
9+
10+
"github.com/xtls/xray-core/common/buf"
11+
"github.com/xtls/xray-core/common/session"
12+
)
13+
14+
type Scheduler struct {
15+
Buffer chan buf.MultiBuffer
16+
Trigger chan int
17+
Error chan error
18+
bufferReadLock *sync.Mutex
19+
writer buf.Writer
20+
addons *Addons
21+
trafficState *TrafficState
22+
ctx context.Context
23+
}
24+
25+
func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context context.Context) *Scheduler {
26+
var s = Scheduler{
27+
Buffer: make(chan buf.MultiBuffer, 100),
28+
Trigger: make(chan int),
29+
Error: make(chan error, 100),
30+
bufferReadLock: new(sync.Mutex),
31+
writer: w,
32+
addons: addon,
33+
trafficState: state,
34+
ctx: context,
35+
}
36+
go s.mainLoop()
37+
if s.addons.Scheduler != nil {
38+
go s.exampleIndependentScheduler()
39+
}
40+
return &s
41+
}
42+
43+
func(s *Scheduler) mainLoop() {
44+
for trigger := range s.Trigger {
45+
go func() { // each trigger has independent delay, trigger does not block
46+
var d = 0 * time.Millisecond
47+
if s.addons.Delay != nil {
48+
l, err := rand.Int(rand.Reader, big.NewInt(int64(s.addons.Delay.MaxMillis - s.addons.Delay.MinMillis)))
49+
if err != nil {
50+
newError("failed to generate delay", trigger).Base(err).WriteToLog(session.ExportIDToError(s.ctx))
51+
}
52+
d = time.Duration(uint32(l.Int64()) + s.addons.Delay.MinMillis)
53+
time.Sleep(d * time.Millisecond)
54+
}
55+
56+
s.bufferReadLock.Lock() // guard against multiple trigger threads
57+
var sending = len(s.Buffer)
58+
if sending > 0 {
59+
newError("Scheduler Trigger for ", sending, " buffer(s) with ", d, " ", trigger).AtDebug().WriteToLog(session.ExportIDToError(s.ctx))
60+
}
61+
for i := 0; i<sending; i++ {
62+
s.Error <- s.writer.WriteMultiBuffer(<-s.Buffer)
63+
}
64+
s.bufferReadLock.Unlock()
65+
}()
66+
}
67+
}
68+
69+
func(s *Scheduler) exampleIndependentScheduler() {
70+
for {
71+
time.Sleep(500 * time.Millisecond)
72+
s.Trigger <- -1 // send all buffers
73+
}
74+
}

0 commit comments

Comments
 (0)