Skip to content

Commit 3f3dc49

Browse files
committed
ip/wg: bail from wgconn if proxy has stopped
1 parent 6ea740a commit 3f3dc49

File tree

2 files changed

+39
-22
lines changed

2 files changed

+39
-22
lines changed

intra/ipn/wg/wgconn.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ var (
7676
errNoRawConn = errors.New("wg: bind: no raw conn")
7777
errNotUDP = errors.New("wg: bind: not a UDP conn")
7878
errNoListen = errors.New("wg: bind: listen failed")
79+
errEnded = errors.New("wg: bind: proxy ended")
7980
)
8081

8182
type floodkind int
@@ -96,7 +97,7 @@ func (k floodkind) String() string {
9697
}
9798
}
9899

99-
type rwobserver func(op PktDir, err error)
100+
type rwobserver func(op PktDir, err error) (ended bool)
100101
type connector func(network, to string) (net.PacketConn, error)
101102

102103
type PktDir string
@@ -144,7 +145,8 @@ type StdNetBind struct {
144145
observer rwobserver
145146
sendAddr *core.Volatile[netip.AddrPort] // may be invalid
146147

147-
closed atomic.Bool
148+
closed atomic.Bool // wgconn has been closed
149+
ended atomic.Bool // observer / connector are done
148150
}
149151

150152
// TODO: get d, ep, f, rb through an Opts bag?
@@ -245,6 +247,10 @@ func (s *StdNetBind) RemoteAddr() netip.AddrPort {
245247
}
246248

247249
func (s *StdNetBind) listenNet(network string, port int) (net.PacketConn, int, error) {
250+
if s.ended.Load() {
251+
return nil, 0, errEnded
252+
}
253+
248254
anyaddr := anyaddr6
249255
if network == "udp4" {
250256
anyaddr = anyaddr4
@@ -286,6 +292,11 @@ func (s *StdNetBind) listenNet(network string, port int) (net.PacketConn, int, e
286292
func (s *StdNetBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) {
287293
s.mu.Lock()
288294
defer s.mu.Unlock()
295+
296+
if s.ended.Load() {
297+
return nil, 0, errEnded
298+
}
299+
289300
s.closed.Store(false)
290301

291302
var err error
@@ -339,7 +350,12 @@ again:
339350
if len(fns) == 0 {
340351
return nil, 0, syscall.EAFNOSUPPORT
341352
}
342-
return fns, uint16(port), nil
353+
354+
var eerr error = nil
355+
if s.ended.Load() {
356+
eerr = errEnded
357+
}
358+
return fns, uint16(port), eerr
343359
}
344360

345361
// Pause implements wgconn
@@ -400,7 +416,7 @@ func (s *StdNetBind) Close() error {
400416
s.blackhole4 = false
401417
s.blackhole6 = false
402418

403-
s.observer(Clo, nil)
419+
s.ended.Store(s.observer(Clo, nil))
404420

405421
log.I("wg: bind: %s close; addrs %s + %s; err4? %v err6? %v", s.id, addr1, addr2, err1, err2)
406422
return core.JoinErr(err1, err2)
@@ -421,7 +437,9 @@ func (s *StdNetBind) makeReceiveFn(uc net.PacketConn) conn.ReceiveFunc {
421437
if !anyTransportTyp && anyProcessed {
422438
op = Crc // processed packets not transport data
423439
}
424-
s.observer(op, err)
440+
if s.observer(op, err) {
441+
s.ended.Store(true)
442+
}
425443
}()
426444

427445
amnezia := s.amnezia.Load()
@@ -479,7 +497,9 @@ func (s *StdNetBind) Send(buf [][]byte, peer conn.Endpoint) (err error) {
479497
if !anyTransportTyp && anyProcessed {
480498
op = Csn // processed packet not transport data
481499
}
482-
s.observer(op, err)
500+
if s.observer(op, err) {
501+
s.ended.Store(true)
502+
}
483503
}()
484504

485505
// the peer endpoint

intra/ipn/wgproxy.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -396,14 +396,13 @@ func (w *wgproxy) Refresh() (err error) {
396396
resetDevice = resetDevice && w.wgtun.ignoreTUNClose.CompareAndSwap(false, true)
397397
if resetDevice {
398398
var newdev *device.Device
399-
const useExistingCfg = ""
400399
// Close the old device before creating the new one.
401400
// w.Device.Down() already set bind.ipv4/ipv6 to nil, so Close() is a
402401
// near no-op on the bind here. Doing it in the other order would have
403402
// Close() re-enter Down() and close the bind that newdevice just opened.
404403
w.Device.Close() // tun.Close() is ignored via ignoreTUNClose
405404
w.events <- tun.EventUp
406-
if newdev, err = newdevice(w.wgtun, w.wgep, useExistingCfg); err == nil {
405+
if newdev, err = newdevice(w.wgtun, w.wgep); err == nil {
407406
w.Device = newdev // TODO: core.Volatile[device.Device]
408407
} else {
409408
w.wgtun.ignoreTUNClose.Store(false) // next Close() must not be silently ignored
@@ -632,7 +631,7 @@ func wgIfConfigOf(id string, txtptr *string) (opts wgifopts, err error) {
632631
opts.peers[v] = peerkey
633632
}
634633
// peer config: carry over public keys
635-
log.D("proxy: wg: %s ifconfig: processing key %q, err? %v", id, k, pfxsfx(v), exx)
634+
log.D("proxy: wg: %s ifconfig: processing key %q=%s, err? %v", id, k, pfxsfx(v), exx)
636635
pcfg.WriteString(line + "\n")
637636
finalizeMH(opts.eps, currentPeer)
638637
if len(v) > 8 {
@@ -749,7 +748,6 @@ func loadIPNets(out *[]netip.Prefix, v string) (err error) {
749748
func NewWgProxy(id string, ctl protect.Controller, px ProxyProvider, lp LinkProps, cfg string) (*wgproxy, error) {
750749
ogcfg := cfg
751750
opts, err := wgIfConfigOf(id, &cfg)
752-
uapicfg := cfg
753751
if err != nil {
754752
log.E("proxy: wg: %s failure getting opts from config %v", id, err)
755753
return nil, err
@@ -768,7 +766,7 @@ func NewWgProxy(id string, ctl protect.Controller, px ProxyProvider, lp LinkProp
768766
wgep = wg.NewEndpoint(wgtun.who(), wgtun.serve, wgtun.remote, wgtun.listener, wgtun.amnezia)
769767
}
770768

771-
wgdev, err := newdevice(wgtun, wgep, uapicfg)
769+
wgdev, err := newdevice(wgtun, wgep)
772770
if err != nil {
773771
return nil, err
774772
}
@@ -785,15 +783,10 @@ func NewWgProxy(id string, ctl protect.Controller, px ProxyProvider, lp LinkProp
785783
return w, nil
786784
}
787785

788-
func newdevice(wgtun *wgtun, wgep wgconn, uapicfg string) (*device.Device, error) {
786+
func newdevice(wgtun *wgtun, wgep wgconn) (*device.Device, error) {
789787
wgdev := device.NewDevice(wgtun, wgep, wglogger(wgtun))
790788

791-
curcfg := wgtun.uapicfg.Load()
792-
if len(uapicfg) <= 0 {
793-
// prefer stored UAPI config which reflects latest update() changes
794-
uapicfg = curcfg
795-
}
796-
789+
uapicfg := wgtun.uapicfg.Load()
797790
err := wgdev.IpcSet(uapicfg)
798791
if err != nil {
799792
defer wgdev.Close()
@@ -809,7 +802,6 @@ func newdevice(wgtun *wgtun, wgep wgconn, uapicfg string) (*device.Device, error
809802
// started by device.NewDevice()
810803
// err = wgdev.Up()
811804
// TODO: wait for wgconn to open?
812-
wgtun.uapicfg.Cas(curcfg, uapicfg)
813805
return wgdev, nil
814806
}
815807

@@ -1620,9 +1612,10 @@ func (h *wgtun) serve(network, local string) (pc net.PacketConn, err error) {
16201612
return
16211613
}
16221614

1623-
func (h *wgtun) listener(op wg.PktDir, err error) {
1615+
func (h *wgtun) listener(op wg.PktDir, err error) (ended bool) {
16241616
s := h.status.Load()
16251617
cur := s
1618+
ended = s == END
16261619

16271620
if op != wg.Clo {
16281621
if op.Read() {
@@ -1635,20 +1628,23 @@ func (h *wgtun) listener(op wg.PktDir, err error) {
16351628
}
16361629

16371630
if s == END || s == TPU { // stopped or paused
1631+
h.statusReason.Store("TXX: paused or stopped")
16381632
log.E("wg: %s listener: %s; status %s; ignoring1", h.tag(), op, pxstatus(s))
16391633
return
16401634
}
16411635

16421636
if s == TUP && op != wg.Opn { // ignore all else but open
1637+
h.statusReason.Store("TUP: waiting for wgconn")
16431638
return
16441639
}
16451640

16461641
why := ""
16471642

16481643
defer func() {
16491644
h.statusReason.Store(why)
1650-
updated := false
1651-
if cur != s {
1645+
updated := cur == s
1646+
ended = s == END
1647+
if !updated {
16521648
updated = h.status.Cas(cur, s)
16531649
}
16541650
logeif(!updated)("wg: %s listener: %s; status %s => %s; transition? %t, statusupdated? %t, why: %s",
@@ -1759,6 +1755,7 @@ func (h *wgtun) listener(op wg.PktDir, err error) {
17591755
}
17601756
// TODO: h.redoPeers()
17611757
}
1758+
return
17621759
}
17631760

17641761
// func Handle(), GetAddr(), Dialer(), Reaches(), Stop(),

0 commit comments

Comments
 (0)