@@ -2,10 +2,8 @@ package dokodemo
22
33import (
44 "context"
5- "runtime"
65 "strconv"
76 "strings"
8- "sync/atomic"
97
108 "github.com/xtls/xray-core/common"
119 "github.com/xtls/xray-core/common/buf"
@@ -14,11 +12,10 @@ import (
1412 "github.com/xtls/xray-core/common/net"
1513 "github.com/xtls/xray-core/common/protocol"
1614 "github.com/xtls/xray-core/common/session"
17- "github.com/xtls/xray-core/common/signal"
18- "github.com/xtls/xray-core/common/task"
1915 "github.com/xtls/xray-core/core"
2016 "github.com/xtls/xray-core/features/policy"
2117 "github.com/xtls/xray-core/features/routing"
18+ "github.com/xtls/xray-core/transport"
2219 "github.com/xtls/xray-core/transport/internet/stat"
2320 "github.com/xtls/xray-core/transport/internet/tls"
2421)
@@ -144,39 +141,11 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
144141 })
145142 errors .LogInfo (ctx , "received request for " , conn .RemoteAddr ())
146143
147- plcy := d .policy ()
148- ctx , cancel := context .WithCancel (ctx )
149- timer := signal .CancelAfterInactivity (ctx , cancel , plcy .Timeouts .ConnectionIdle )
150-
151- if inbound != nil {
152- inbound .Timer = timer
153- }
154-
155- ctx = policy .ContextWithBufferPolicy (ctx , plcy .Buffer )
156- link , err := dispatcher .Dispatch (ctx , dest )
157- if err != nil {
158- return errors .New ("failed to dispatch request" ).Base (err )
159- }
160-
161- requestCount := int32 (1 )
162- requestDone := func () error {
163- defer func () {
164- if atomic .AddInt32 (& requestCount , - 1 ) == 0 {
165- timer .SetTimeout (plcy .Timeouts .DownlinkOnly )
166- }
167- }()
168-
169- var reader buf.Reader
170- if dest .Network == net .Network_UDP {
171- reader = buf .NewPacketReader (conn )
172- } else {
173- reader = buf .NewReader (conn )
174- }
175- if err := buf .Copy (reader , link .Writer , buf .UpdateActivity (timer )); err != nil {
176- return errors .New ("failed to transport request" ).Base (err )
177- }
178-
179- return nil
144+ var reader buf.Reader
145+ if dest .Network == net .Network_TCP {
146+ reader = buf .NewReader (conn )
147+ } else {
148+ reader = buf .NewPacketReader (conn )
180149 }
181150
182151 var writer buf.Writer
@@ -208,72 +177,17 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
208177 return err
209178 }
210179 writer = NewPacketWriter (pConn , & dest , mark , back )
211- defer func () {
212- runtime .Gosched ()
213- common .Interrupt (link .Reader ) // maybe duplicated
214- runtime .Gosched ()
215- writer .(* PacketWriter ).Close () // close fake UDP conns
216- }()
217- /*
218- sockopt := &internet.SocketConfig{
219- Tproxy: internet.SocketConfig_TProxy,
220- }
221- if dest.Address.Family().IsIP() {
222- sockopt.BindAddress = dest.Address.IP()
223- sockopt.BindPort = uint32(dest.Port)
224- }
225- if d.sockopt != nil {
226- sockopt.Mark = d.sockopt.Mark
227- }
228- tConn, err := internet.DialSystem(ctx, net.DestinationFromAddr(conn.RemoteAddr()), sockopt)
229- if err != nil {
230- return err
231- }
232- defer tConn.Close()
233-
234- writer = &buf.SequentialWriter{Writer: tConn}
235- tReader := buf.NewPacketReader(tConn)
236- requestCount++
237- tproxyRequest = func() error {
238- defer func() {
239- if atomic.AddInt32(&requestCount, -1) == 0 {
240- timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
241- }
242- }()
243- if err := buf.Copy(tReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
244- return errors.New("failed to transport request (TPROXY conn)").Base(err)
245- }
246- return nil
247- }
248- */
249- }
250- }
251-
252- responseDone := func () error {
253- defer timer .SetTimeout (plcy .Timeouts .UplinkOnly )
254-
255- if network == net .Network_UDP && destinationOverridden {
256- buf .Copy (link .Reader , writer ) // respect upload's timeout
257- return nil
258- }
259-
260- if err := buf .Copy (link .Reader , writer , buf .UpdateActivity (timer )); err != nil {
261- return errors .New ("failed to transport response" ).Base (err )
180+ defer writer .(* PacketWriter ).Close () // close fake UDP conns
262181 }
263- return nil
264182 }
265183
266- if err := task .Run (ctx ,
267- task .OnSuccess (func () error { return task .Run (ctx , requestDone ) }, task .Close (link .Writer )),
268- responseDone ); err != nil {
269- runtime .Gosched ()
270- common .Interrupt (link .Writer )
271- runtime .Gosched ()
272- common .Interrupt (link .Reader )
273- return errors .New ("connection ends" ).Base (err )
184+ if err := dispatcher .DispatchLink (ctx , dest , & transport.Link {
185+ Reader : & buf.TimeoutWrapperReader {Reader : reader },
186+ Writer : writer },
187+ ); err != nil {
188+ return errors .New ("failed to dispatch request" ).Base (err )
274189 }
275-
276- return nil
190+ return nil // Unlike Dispatch(), DispatchLink() will not return until the outbound finishes Process()
277191}
278192
279193func NewPacketWriter (conn net.PacketConn , d * net.Destination , mark int , back * net.UDPAddr ) buf.Writer {
0 commit comments