@@ -12,25 +12,29 @@ import (
1212)
1313
1414type Scheduler struct {
15- Buffer chan buf.MultiBuffer
16- Trigger chan int
17- Error chan error
18- bufferReadLock * sync.Mutex
19- writer buf.Writer
20- addons * Addons
21- trafficState * TrafficState
22- ctx context.Context
15+ Buffer chan buf.MultiBuffer
16+ Trigger chan int
17+ Error chan error
18+ closed chan int
19+ bufferReadLock * sync.Mutex
20+ writer buf.Writer
21+ addons * Addons
22+ trafficState * TrafficState
23+ writeOnceUserUUID * []byte
24+ ctx context.Context
2325}
2426
25- func NewScheduler (w buf.Writer , addon * Addons , state * TrafficState , context context.Context ) * Scheduler {
27+ func NewScheduler (w buf.Writer , addon * Addons , state * TrafficState , userUUID * [] byte , context context.Context ) * Scheduler {
2628 var s = Scheduler {
2729 Buffer : make (chan buf.MultiBuffer , 100 ),
2830 Trigger : make (chan int ),
2931 Error : make (chan error , 100 ),
32+ closed : make (chan int ),
3033 bufferReadLock : new (sync.Mutex ),
3134 writer : w ,
3235 addons : addon ,
3336 trafficState : state ,
37+ writeOnceUserUUID : userUUID ,
3438 ctx : context ,
3539 }
3640 go s .mainLoop ()
@@ -42,6 +46,9 @@ func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context cont
4246
4347func (s * Scheduler ) mainLoop () {
4448 for trigger := range s .Trigger {
49+ if len (s .closed ) > 0 {
50+ return
51+ }
4552 go func () { // each trigger has independent delay, trigger does not block
4653 var d = 0 * time .Millisecond
4754 if s .addons .Delay != nil {
@@ -58,12 +65,31 @@ func(s *Scheduler) mainLoop() {
5865 if sending > 0 {
5966 errors .LogDebug (s .ctx , "Scheduler Trigger for " , sending , " buffer(s) with " , d , " " , trigger )
6067 for i := 0 ; i < sending ; i ++ {
61- s .Error <- s .writer .WriteMultiBuffer (<- s .Buffer )
68+ err := s .writer .WriteMultiBuffer (<- s .Buffer )
69+ if err != nil {
70+ s .Error <- err
71+ s .closed <- 1
72+ return
73+ }
6274 }
63- } else if trigger > 0 {
75+ } else if trigger > 0 && s . trafficState . IsPadding && ShouldStartSeed ( s . addons , s . trafficState ) && ! ShouldStopSeed ( s . addons , s . trafficState ) {
6476 errors .LogDebug (s .ctx , "Scheduler Trigger for fake buffer with " , d , " " , trigger )
77+ s .trafficState .NumberOfPacketSent += 1
6578 mb := make (buf.MultiBuffer , 1 )
66- s .Error <- s .writer .WriteMultiBuffer (mb )
79+ mb [0 ] = XtlsPadding (nil , CommandPaddingContinue , s .writeOnceUserUUID , true , s .addons , s .ctx )
80+ s .trafficState .ByteSent += int64 (mb .Len ())
81+ if s .trafficState .StartTime .IsZero () {
82+ s .trafficState .StartTime = time .Now ()
83+ }
84+ err := s .writer .WriteMultiBuffer (mb )
85+ if err != nil {
86+ s .Error <- err
87+ s .closed <- 1
88+ return
89+ }
90+ if buffered , ok := s .writer .(* buf.BufferedWriter ); ok {
91+ buffered .SetBuffered (false )
92+ }
6793 }
6894 s .bufferReadLock .Unlock ()
6995 }()
@@ -72,7 +98,10 @@ func(s *Scheduler) mainLoop() {
7298
7399func (s * Scheduler ) exampleIndependentScheduler () {
74100 for {
75- time .Sleep (500 * time .Millisecond )
101+ if len (s .closed ) > 0 {
102+ return
103+ }
76104 s .Trigger <- 1 // send fake buffer if no pending
105+ time .Sleep (500 * time .Millisecond )
77106 }
78107}
0 commit comments