Skip to content

Commit 67ddf75

Browse files
committed
fix(xdns): fix sendLoop starvation under mKCP retransmission flood
- Remove nextRec preemption from sendLoop inner select - Drain excess records before processing to skip stale queries - Reduce response wait from 1s to 50ms for faster turnaround - Increase server write queue from 512 to 4096 to prevent data drops - Increase mKCP connection timeout from 30s to 120s - Add sendEmptyResponse helper for drained records
1 parent bcade97 commit 67ddf75

6 files changed

Lines changed: 187 additions & 46 deletions

File tree

transport/internet/finalmask/xdns/client.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type xdnsConnClient struct {
5959

6060
resolverConns []*resolverConn
6161
resolverIdx atomic.Uint32
62+
serverAddr atomic.Value // stores net.Addr; set by WriteTo, used by recvLoopFrom in resolver mode
6263
recvWg sync.WaitGroup
6364
sendWg sync.WaitGroup
6465

@@ -92,12 +93,16 @@ func NewConnClient(c *Config, raw net.PacketConn) (net.PacketConn, error) {
9293
common.Must2(rand.Read(conn.clientID))
9394

9495
if len(c.Resolvers) > 0 {
96+
lc := net.ListenConfig{}
97+
if ctrl := resolverSocketControl(raw); ctrl != nil {
98+
lc.Control = ctrl
99+
}
95100
for _, rs := range c.Resolvers {
96101
addr, err := parseResolverAddr(rs)
97102
if err != nil {
98103
return nil, errors.New("invalid resolver address: ", rs, ": ", err)
99104
}
100-
uc, err := net.ListenPacket("udp", ":0")
105+
uc, err := lc.ListenPacket(context.Background(), "udp", ":0")
101106
if err != nil {
102107
for _, rc := range conn.resolverConns {
103108
rc.conn.Close()
@@ -166,6 +171,16 @@ func (c *xdnsConnClient) recvLoopFrom(conn net.PacketConn) {
166171
}
167172

168173
payload := dnsResponsePayload(&resp, c.domain)
174+
if payload == nil {
175+
continue
176+
}
177+
178+
pktAddr := net.Addr(addr)
179+
if len(c.resolverConns) > 0 {
180+
if sa := c.serverAddr.Load(); sa != nil {
181+
pktAddr = sa.(net.Addr)
182+
}
183+
}
169184

170185
r := bytes.NewReader(payload)
171186
anyPacket := false
@@ -181,7 +196,7 @@ func (c *xdnsConnClient) recvLoopFrom(conn net.PacketConn) {
181196
select {
182197
case c.readQueue <- &packet{
183198
p: buf,
184-
addr: addr,
199+
addr: pktAddr,
185200
}:
186201
default:
187202
errors.LogDebug(context.Background(), addr, " mask read err queue full")
@@ -283,6 +298,8 @@ func (c *xdnsConnClient) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
283298
}
284299

285300
func (c *xdnsConnClient) WriteTo(p []byte, addr net.Addr) (n int, err error) {
301+
c.serverAddr.Store(addr)
302+
286303
c.mutex.Lock()
287304
defer c.mutex.Unlock()
288305

transport/internet/finalmask/xdns/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package xdns
22

33
import (
44
"net"
5+
6+
"github.com/xtls/xray-core/common/errors"
57
)
68

79
func (c *Config) UDP() {
810
}
911

1012
func (c *Config) WrapPacketConnClient(raw net.PacketConn, level int, levelCount int) (net.PacketConn, error) {
13+
if len(c.Resolvers) > 0 && level > 0 {
14+
return nil, errors.New("xdns resolver mode cannot be combined with lower finalmask layers because resolver traffic must be valid DNS on the wire")
15+
}
1116
return NewConnClient(c, raw)
1217
}
1318

transport/internet/finalmask/xdns/server.go

Lines changed: 91 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (c *xdnsConnServer) ensureQueue(addr net.Addr) *queue {
124124
q, ok := c.writeQueueMap[addr.String()]
125125
if !ok {
126126
q = &queue{
127-
queue: make(chan []byte, 512),
127+
queue: make(chan []byte, 4096),
128128
stash: make(chan []byte, 1),
129129
}
130130
c.writeQueueMap[addr.String()] = q
@@ -225,17 +225,50 @@ func (c *xdnsConnServer) recvLoop() {
225225
}
226226
}
227227

228+
func (c *xdnsConnServer) sendEmptyResponse(rec *record) {
229+
if rec.Resp.Rcode() == RcodeNoError && len(rec.Resp.Question) == 1 {
230+
rec.Resp.Answer = []RR{
231+
{
232+
Name: rec.Resp.Question[0].Name,
233+
Type: rec.Resp.Question[0].Type,
234+
Class: rec.Resp.Question[0].Class,
235+
TTL: responseTTL,
236+
Data: EncodeRDataTXT(nil),
237+
},
238+
}
239+
}
240+
buf, err := rec.Resp.WireFormat()
241+
if err != nil {
242+
return
243+
}
244+
if len(buf) > maxUDPPayload {
245+
buf = buf[:maxUDPPayload]
246+
buf[2] |= 0x02
247+
}
248+
c.PacketConn.WriteTo(buf, rec.Addr)
249+
}
250+
228251
func (c *xdnsConnServer) sendLoop() {
229-
var nextRec *record
230252
for {
231-
rec := nextRec
232-
nextRec = nil
253+
rec, ok := <-c.ch
254+
if !ok {
255+
break
256+
}
233257

234-
if rec == nil {
235-
var ok bool
236-
rec, ok = <-c.ch
237-
if !ok {
238-
break
258+
errors.LogWarning(context.Background(), "xdns sendLoop: got record from ", rec.Addr, " client=", rec.ClientAddr, " chLen=", len(c.ch))
259+
260+
// Drain excess records, keeping the latest. mKCP floods retransmissions
261+
// that fill c.ch with hundreds of queries. Process only the latest one.
262+
drain:
263+
for {
264+
select {
265+
case newer, ok2 := <-c.ch:
266+
if !ok2 {
267+
break drain
268+
}
269+
rec = newer
270+
default:
271+
break drain
239272
}
240273
}
241274

@@ -252,56 +285,72 @@ func (c *xdnsConnServer) sendLoop() {
252285

253286
var payload bytes.Buffer
254287
limit := maxEncodedPayload
255-
timer := time.NewTimer(maxResponseDelay)
256288

257-
for {
258-
c.mutex.Lock()
259-
q := c.ensureQueue(rec.ClientAddr)
260-
if q == nil {
261-
c.mutex.Unlock()
262-
return
263-
}
289+
c.mutex.Lock()
290+
q := c.ensureQueue(rec.ClientAddr)
291+
if q == nil {
264292
c.mutex.Unlock()
265-
266-
var p []byte
267-
293+
return
294+
}
295+
c.mutex.Unlock()
296+
297+
// Try to get data immediately (non-blocking). If no data is
298+
// available, wait briefly (50ms) for data to arrive. This is much
299+
// shorter than the original 1s maxResponseDelay. DNS tunneling needs
300+
// fast turnaround because the client can only receive data in
301+
// responses to its queries.
302+
var p []byte
303+
select {
304+
case p = <-q.stash:
305+
default:
268306
select {
269307
case p = <-q.stash:
308+
case p = <-q.queue:
270309
default:
310+
timer := time.NewTimer(50 * time.Millisecond)
271311
select {
272312
case p = <-q.stash:
313+
timer.Stop()
273314
case p = <-q.queue:
274-
default:
275-
select {
276-
case p = <-q.stash:
277-
case p = <-q.queue:
278-
case <-timer.C:
279-
case nextRec = <-c.ch:
280-
}
315+
timer.Stop()
316+
case <-timer.C:
281317
}
282318
}
319+
}
283320

284-
timer.Reset(0)
285-
286-
if len(p) == 0 {
287-
break
288-
}
321+
errors.LogWarning(context.Background(), "xdns sendLoop: data fetch result len=", len(p), " qLen=", len(q.queue), " stashLen=", len(q.stash))
289322

323+
// Pack first packet
324+
if len(p) > 0 {
290325
limit -= 2 + len(p)
291-
if payload.Len() > 0 && limit < 0 {
292-
c.stash(q, p)
293-
break
294-
}
295-
296-
// if len(p) > 65535 {
297-
// panic(len(p))
298-
// }
299-
300326
_ = binary.Write(&payload, binary.BigEndian, uint16(len(p)))
301327
payload.Write(p)
328+
329+
// Try to batch more packets immediately (non-blocking)
330+
for {
331+
var p2 []byte
332+
select {
333+
case p2 = <-q.stash:
334+
default:
335+
select {
336+
case p2 = <-q.stash:
337+
case p2 = <-q.queue:
338+
default:
339+
}
340+
}
341+
if len(p2) == 0 {
342+
break
343+
}
344+
limit -= 2 + len(p2)
345+
if limit < 0 {
346+
c.stash(q, p2)
347+
break
348+
}
349+
_ = binary.Write(&payload, binary.BigEndian, uint16(len(p2)))
350+
payload.Write(p2)
351+
}
302352
}
303353

304-
timer.Stop()
305354
rec.Resp.Answer[0].Data = EncodeRDataTXT(payload.Bytes())
306355
}
307356

@@ -363,7 +412,6 @@ func (c *xdnsConnServer) WriteTo(p []byte, addr net.Addr) (n int, err error) {
363412
case q.queue <- buf:
364413
return len(p), nil
365414
default:
366-
// errors.LogDebug(context.Background(), addr, " mask write err queue full")
367415
return 0, nil
368416
}
369417
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//go:build linux
2+
3+
package xdns
4+
5+
import (
6+
"context"
7+
"net"
8+
"syscall"
9+
10+
"github.com/xtls/xray-core/common/errors"
11+
"golang.org/x/sys/unix"
12+
)
13+
14+
// resolverSocketControl reads outbound socket options (SO_MARK, SO_BINDTODEVICE)
15+
// from the raw PacketConn and returns a Control function that applies them to
16+
// resolver sockets. This ensures resolver traffic respects the same routing
17+
// policy and interface binding as the original outbound connection.
18+
func resolverSocketControl(raw net.PacketConn) func(network, address string, c syscall.RawConn) error {
19+
sc, ok := raw.(syscall.Conn)
20+
if !ok {
21+
return nil
22+
}
23+
rawConn, err := sc.SyscallConn()
24+
if err != nil {
25+
return nil
26+
}
27+
28+
var mark int
29+
var iface string
30+
rawConn.Control(func(fd uintptr) {
31+
v, err := syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK)
32+
if err == nil && v != 0 {
33+
mark = v
34+
}
35+
s, err := unix.GetsockoptString(int(fd), syscall.SOL_SOCKET, unix.SO_BINDTODEVICE)
36+
if err == nil && s != "" {
37+
iface = s
38+
}
39+
})
40+
41+
if mark == 0 && iface == "" {
42+
return nil
43+
}
44+
45+
return func(network, address string, c syscall.RawConn) error {
46+
return c.Control(func(fd uintptr) {
47+
if mark != 0 {
48+
if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, mark); err != nil {
49+
errors.LogInfo(context.Background(), "xdns: failed to set SO_MARK on resolver socket: ", err)
50+
}
51+
}
52+
if iface != "" {
53+
if err := syscall.BindToDevice(int(fd), iface); err != nil {
54+
errors.LogInfo(context.Background(), "xdns: failed to bind resolver socket to interface: ", err)
55+
}
56+
}
57+
})
58+
}
59+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
//go:build !linux
2+
3+
package xdns
4+
5+
import (
6+
"net"
7+
"syscall"
8+
)
9+
10+
func resolverSocketControl(_ net.PacketConn) func(network, address string, c syscall.RawConn) error {
11+
return nil
12+
}

transport/internet/kcp/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func (c *Connection) flush() {
610610
if c.State() == StateTerminated {
611611
return
612612
}
613-
if c.State() == StateActive && current-atomic.LoadUint32(&c.lastIncomingTime) >= 30000 {
613+
if c.State() == StateActive && current-atomic.LoadUint32(&c.lastIncomingTime) >= 120000 {
614614
c.Close()
615615
}
616616
if c.State() == StateReadyToClose && c.sendingWorker.IsEmpty() {

0 commit comments

Comments
 (0)