Skip to content

Commit 949e5f8

Browse files
committed
dialers,auto: parallel dial
1 parent 6d6296a commit 949e5f8

6 files changed

Lines changed: 96 additions & 24 deletions

File tree

intra/dialers/retrier.go

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ const maxRetryCount = 3
5050
type retrier struct {
5151
dialers []protect.RDialer
5252
dialerOpts settings.DialerOpts
53-
raddr *net.TCPAddr
54-
laddr *net.TCPAddr // laddr may be nil; TCPAddr.IP may be nil.
53+
racing bool
54+
raddr net.Addr
55+
laddr net.Addr // laddr may be nil; TCPAddr.IP may be nil.
5556

5657
// Flags indicating whether the caller has called CloseRead and CloseWrite.
5758
readDone atomic.Bool
@@ -119,7 +120,7 @@ func calcTimeout(rtt time.Duration) time.Duration {
119120
// These values were chosen to have a <1% false positive rate based on test data.
120121
// False positives trigger an unnecessary retry, which can make connections slower, so they are
121122
// worth avoiding. However, overly long timeouts make retry slower and less useful.
122-
return 800*time.Millisecond + max(2*rtt, 100*time.Millisecond)
123+
return 400*time.Millisecond + max(2*rtt, 100*time.Millisecond)
123124
}
124125

125126
// DialWithSplitRetry returns a TCP connection that transparently retries by
@@ -146,6 +147,33 @@ func DialWithSplitRetry(d *protect.RDial, laddr, raddr *net.TCPAddr) (*retrier,
146147
return r, nil
147148
}
148149

150+
func dialerOptsForRace() settings.DialerOpts {
151+
return settings.DialerOpts{
152+
Strat: settings.SplitNever,
153+
Retry: settings.RetryWithSplit,
154+
}
155+
}
156+
157+
func DialRace(ds []protect.RDialer, laddr, raddr net.Addr) (*retrier, error) {
158+
r := &retrier{
159+
dialers: ds,
160+
dialerOpts: dialerOptsForRace(),
161+
racing: true,
162+
laddr: laddr, // may be nil
163+
raddr: raddr, // must not be nil
164+
retryDoneCh: make(chan struct{}),
165+
}
166+
167+
r.mu.Lock()
168+
defer r.mu.Unlock()
169+
170+
if _, err := r.dialLocked(); err != nil {
171+
return nil, err
172+
}
173+
return r, nil
174+
}
175+
176+
// SycallConn implements core.DuplexConn.
149177
func (r *retrier) SyscallConn() (syscall.RawConn, error) {
150178
r.mu.Lock()
151179
c := r.conn
@@ -157,6 +185,7 @@ func (r *retrier) SyscallConn() (syscall.RawConn, error) {
157185
return nil, syscall.EINVAL
158186
}
159187

188+
// SetKeepAlive implements core.DuplexConn.
160189
func (r *retrier) SetKeepAlive(y bool) error {
161190
r.mu.Lock()
162191
c := r.conn
@@ -242,8 +271,8 @@ func (r *retrier) dialLocked() (c core.DuplexConn, err error) {
242271
r.conn = c // c may be nil
243272
r.timeout = calcTimeout(rtt)
244273

245-
logeif(err)("retrier: dial(%s) %s=>%s; strat: %d, rtt: %dms; err? %v",
246-
r.dialerOpts, laddr(c), r.raddr, strat, rtt.Milliseconds(), err)
274+
logeif(err)("retrier: dial(%s) %s=>%s; strat: %d (race? %t), rtt: %dms; err? %v",
275+
r.dialerOpts, laddr(c), r.raddr, strat, r.racing, rtt.Milliseconds(), err)
247276

248277
return
249278
}
@@ -285,8 +314,8 @@ func (r *retrier) retryWriteReadLocked(buf []byte) (int, error) {
285314

286315
var nw int
287316
nw, r.retryErr = newConn.Write(r.tee)
288-
logeif(r.retryErr)("retrier: retryLocked: strat(%s) %s=>%s; write? %d/%d; err? %v",
289-
r.dialerOpts, laddr(newConn), r.raddr, nw, len(r.tee), r.retryErr)
317+
logeif(r.retryErr)("retrier: retryLocked: strat(%s, racing? %t) %s=>%s; write? %d/%d; err? %v",
318+
r.dialerOpts, r.racing, laddr(newConn), r.raddr, nw, len(r.tee), r.retryErr)
290319
if r.retryErr != nil {
291320
return 0, r.retryErr
292321
}
@@ -309,8 +338,8 @@ func (r *retrier) retryWriteReadLocked(buf []byte) (int, error) {
309338
_ = newConn.SetWriteDeadline(r.writeDeadline)
310339
}
311340

312-
logedcond(readdone || writedone)("retrier: retryLocked: done! strat(%s) %s=>%s; write? %d/%d; closed r/w? %t/%t; deadline r/w: %v/%v",
313-
r.dialerOpts, laddr(newConn), r.raddr, nw, len(r.tee), readdone, writedone, time.Since(r.readDeadline).Seconds(), time.Since(r.writeDeadline).Seconds())
341+
logedcond(readdone || writedone)("retrier: retryLocked: done! strat(%s; racing? %t) %s=>%s; write? %d/%d; closed r/w? %t/%t; deadline r/w: %v/%v",
342+
r.dialerOpts, r.racing, laddr(newConn), r.raddr, nw, len(r.tee), readdone, writedone, time.Since(r.readDeadline).Seconds(), time.Since(r.writeDeadline).Seconds())
314343

315344
return newConn.Read(buf)
316345
}
@@ -349,16 +378,20 @@ func (r *retrier) Read(buf []byte) (n int, err error) {
349378
if !r.retryCompleted() {
350379
defer close(r.retryDoneCh) // signal that retry is complete or unnecessary
351380
var retryerr error
381+
canRetry := r.retryCount < maxRetryCount
382+
if r.racing {
383+
canRetry = r.dialerCount < len(r.dialers)
384+
}
352385
// retry on errs like timeouts or connection resets
353-
for (c == nil || err != nil) && r.retryCount < maxRetryCount {
386+
for (c == nil || err != nil) && canRetry {
354387
r.retryCount++
355388
n, retryerr = r.retryWriteReadLocked(buf)
356389
c = r.conn // re-assign c to newConn, if any; may be nil
357390
if c == nil {
358391
err = core.UniqErr(err, retryerr)
359392
}
360-
logeor(retryerr, log.I)("retrier: read# %d: [%s<=%s] %d; err? %v",
361-
r.retryCount, laddr(c), r.raddr, n, retryerr)
393+
logeor(retryerr, log.I)("retrier: read# %d + (racing? %t / c: %d): [%s<=%s] %d; err? %v",
394+
r.retryCount, r.racing, r.dialerCount, laddr(c), r.raddr, n, retryerr)
362395
}
363396
if c != nil && core.IsNotNil(c) {
364397
_ = c.SetReadDeadline(r.readDeadline)

intra/dialers/split_and_desync.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,9 @@ func desyncWithFixedTtl(d protect.RDialer, local, remote netip.AddrPort, initial
298298
// then returns a TCP connection that may launch TCB Desynchronization
299299
// and split the initial upstream segment.
300300
// ref: github.com/bol-van/zapret/blob/c369f11638/docs/readme.eng.md#dpi-desync-attack
301-
func dialWithSplitAndDesync(d protect.RDialer, laddr, raddr *net.TCPAddr) (*overwriteSplitter, error) {
302-
remote := raddr.AddrPort() // must not be invalid
303-
local := laddr.AddrPort() // can be invalid
301+
func dialWithSplitAndDesync(d protect.RDialer, laddr, raddr net.Addr) (*overwriteSplitter, error) {
302+
remote := netip.MustParseAddrPort(raddr.String()) // must not be invalid
303+
local := netip.MustParseAddrPort(laddr.String()) // can be invalid
304304

305305
if !remote.IsValid() {
306306
log.E("desync: invalid raddr: conv %s to %s", raddr, remote)

intra/ipn/auto.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ import (
1919
"github.com/celzero/firestack/intra/settings"
2020
)
2121

22-
const ttl30s = 30 * time.Second
23-
const shortdelay = 100 * time.Millisecond
22+
const (
23+
ttl30s = 30 * time.Second
24+
shortdelay = 100 * time.Millisecond
25+
parallelDial = true
26+
)
2427

2528
// exit is a proxy that always dials out to the internet.
2629
type auto struct {
@@ -131,6 +134,15 @@ func (h *auto) dial(network, local, remote string) (protect.Conn, error) {
131134

132135
remoteOnly := settings.AutoAlwaysRemote.Load()
133136

137+
if parallelDial {
138+
all := []protect.RDialer{exit, exit64, pro, warp, amz, sep}
139+
if remoteOnly {
140+
all = []protect.RDialer{pro, warp, amz, sep}
141+
}
142+
// TODO: pinning IPs
143+
return parallelDialStrat(all, network, local, remote)
144+
}
145+
134146
previdx, recent := h.exp.Get(remote)
135147

136148
c, who, err := core.Race(

intra/ipn/proxies.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
"errors"
1212
"fmt"
1313
"math/rand"
14+
"net"
1415
"net/netip"
16+
"strconv"
1517
"strings"
1618
"sync"
1719
"time"
@@ -1487,6 +1489,35 @@ func localDialStrat(d *protect.RDial, network, local, remote string) (protect.Co
14871489
return dialers.SplitDialBind(d, network, local, remote)
14881490
}
14891491

1492+
func parallelDialStrat(all []protect.RDialer, network, local, remote string) (protect.Conn, error) {
1493+
return dialers.DialRace(all, str2addr(network, local), str2addr(network, remote))
1494+
}
1495+
1496+
func str2addr(network, addrport string) net.Addr {
1497+
ip, port, err := net.SplitHostPort(addrport)
1498+
if err != nil {
1499+
return nil
1500+
}
1501+
portno, err := strconv.Atoi(port)
1502+
if err != nil {
1503+
return nil
1504+
}
1505+
switch network {
1506+
case "tcp", "tcp4", "tcp6":
1507+
return &net.TCPAddr{
1508+
IP: net.ParseIP(ip),
1509+
Port: portno,
1510+
}
1511+
case "udp", "udp4", "udp6":
1512+
fallthrough
1513+
default:
1514+
return &net.UDPAddr{
1515+
IP: net.ParseIP(ip),
1516+
Port: portno,
1517+
}
1518+
}
1519+
}
1520+
14901521
func firstEmpty(arr []string) bool {
14911522
return len(arr) <= 0 || len(arr[0]) <= 0
14921523
}

intra/protect/xdial.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (d *RDial) DialTCP(network string, laddr, raddr *net.TCPAddr) (*net.TCPConn
233233
return DialTCP(d, network, laddr, raddr)
234234
}
235235

236-
func DialTCP(d RDialer, network string, laddr, raddr *net.TCPAddr) (*net.TCPConn, error) {
236+
func DialTCP(d RDialer, network string, laddr, raddr net.Addr) (*net.TCPConn, error) {
237237
if c, err := d.DialBind(network, laddr.String(), raddr.String()); err != nil {
238238
return nil, err
239239
} else if tc, ok := c.(*net.TCPConn); ok {
@@ -254,7 +254,7 @@ func (d *RDial) DialUDP(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn
254254
return DialUDP(d, network, laddr, raddr)
255255
}
256256

257-
func DialUDP(d RDialer, network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) {
257+
func DialUDP(d RDialer, network string, laddr, raddr net.Addr) (*net.UDPConn, error) {
258258
if c, err := d.DialBind(network, laddr.String(), raddr.String()); err != nil {
259259
return nil, err
260260
} else if uc, ok := c.(*net.UDPConn); ok {

intra/settings/dialeropts.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ type DialerOpts struct {
2626
WriteTimeoutSec int32
2727
}
2828

29-
func newDialerOpts() *DialerOpts {
30-
return &DialerOpts{}
31-
}
32-
3329
func (d DialerOpts) String() string {
3430
s := func() string {
3531
switch d.Strat {
@@ -99,7 +95,7 @@ const (
9995
RetryNever
10096
)
10197

102-
var dialerOpts = newDialerOpts()
98+
var dialerOpts = &DialerOpts{}
10399

104100
// SetDialerOpts sets the dialer options to use.
105101
func SetDialerOpts(strat, retry, timeoutsec int32, keepalive bool) bool {

0 commit comments

Comments
 (0)