@@ -266,7 +266,7 @@ type VisionWriter struct {
266266 ctx context.Context
267267 writeOnceUserUUID * []byte
268268 isUplink bool
269- scheduler * Scheduler
269+ Scheduler * Scheduler
270270}
271271
272272func NewVisionWriter (writer buf.Writer , addon * Addons , state * TrafficState , isUplink bool , context context.Context ) * VisionWriter {
@@ -279,7 +279,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, isUp
279279 ctx : context ,
280280 writeOnceUserUUID : & w ,
281281 isUplink : isUplink ,
282- scheduler : NewScheduler (writer , addon , state , & w , context ),
282+ Scheduler : NewScheduler (writer , addon , state , & w , context ),
283283 }
284284}
285285
@@ -341,12 +341,24 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
341341 if w .trafficState .StartTime .IsZero () {
342342 w .trafficState .StartTime = time .Now ()
343343 }
344- w .scheduler .Buffer <- mb
345- if w .addons .Scheduler == nil {
346- w .scheduler .Trigger <- - 1 // send all buffers
344+ w .Scheduler .Buffer <- mb
345+ w .Scheduler .Trigger <- - 1 // send all buffers if no independent scheduler
346+ if w .addons .Scheduler != nil {
347+ w .Scheduler .TimeoutLock .Lock ()
348+ w .Scheduler .TimeoutCounter ++
349+ w .Scheduler .TimeoutLock .Unlock ()
350+ go func () {
351+ time .Sleep (time .Duration (w .addons .Scheduler .TimeoutMillis ) * time .Millisecond )
352+ w .Scheduler .TimeoutLock .Lock ()
353+ w .Scheduler .TimeoutCounter --
354+ if w .Scheduler .TimeoutCounter == 0 {
355+ w .Scheduler .Trigger <- 0 // send when the latest buffer timeout
356+ }
357+ w .Scheduler .TimeoutLock .Unlock ()
358+ }()
347359 }
348- if len (w .scheduler .Error ) > 0 {
349- return <- w .scheduler .Error
360+ if len (w .Scheduler .Error ) > 0 {
361+ return <- w .Scheduler .Error
350362 }
351363 return nil
352364}
@@ -596,7 +608,7 @@ func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) {
596608// CopyRawConnIfExist use the most efficient copy method.
597609// - If caller don't want to turn on splice, do not pass in both reader conn and writer conn
598610// - writer are from *transport.Link
599- func CopyRawConnIfExist (ctx context.Context , readerConn net.Conn , writerConn net.Conn , writer buf.Writer , timer * signal.ActivityTimer , inTimer * signal.ActivityTimer ) error {
611+ func CopyRawConnIfExist (ctx context.Context , readerConn net.Conn , writerConn net.Conn , writer buf.Writer , timer * signal.ActivityTimer , inTimer * signal.ActivityTimer , scheduler * Scheduler ) error {
600612 readerConn , readCounter , _ := UnwrapRawConn (readerConn )
601613 writerConn , _ , writeCounter := UnwrapRawConn (writerConn )
602614 reader := buf .NewReader (readerConn )
@@ -659,10 +671,13 @@ func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net
659671 if readCounter != nil {
660672 readCounter .Add (int64 (buffer .Len ()))
661673 }
662- timer .Update ()
663674 if werr := writer .WriteMultiBuffer (buffer ); werr != nil {
664675 return werr
665676 }
677+ timer .Update ()
678+ }
679+ if scheduler != nil {
680+ scheduler .Trigger <- 2
666681 }
667682 if err != nil {
668683 return err
0 commit comments