Skip to content

Commit f0786fd

Browse files
teovlclaude
andcommitted
feat(transport): NAT hole-punch direct-upgrade + streamed send-file + prefer-direct
Make P2P transfers actually go direct (and stay direct) across NAT, and ship large files reliably. - daemon.go: rewrite relayProbeLoop → tryDirectUpgrade. The old loop sent a one-way SendDirectProbe every 5 min, which a stateful NAT/firewall always drops (no conntrack pinhole). Now it fires a beacon-coordinated RequestHolePunch to open the pinhole on both NATs, then pushes encrypted probes at the peer's REAL address so the peer's ClearRelayOnDirect promotes the path. Unpins blackhole-pinned (non-relay-only) peers, resolves fresh when uncached, and runs every 15 s (was 5 min). - tunnel.go: add SendDirectProbeTo — encrypted probe to an explicit real address (the upgrade primitive; the stored peers[] entry for a relay peer is the beacon placeholder). - ipc.go: handlePreferDirect — drop tunnel + cached resolution so the next dial re-runs resolve + punch; unpin relay. - pilotctl/main.go: send-file streams by default (TypeFileStream) and falls back to single-frame TypeFile when the peer never sends an INIT-ACK (back-compat); --no-stream forces legacy; reports transport/sha256/throughput. Adds `prefer-direct` command + --prefer-direct/--timeout flags. Verified on Mac↔GCP-VM (true dual-NAT) and a fresh throwaway VM: tunnel goes relay=False via hole-punch in ~8 s through a default-deny firewall, holds direct through a 50 MB transfer (no flip), byte-perfect sha256, ~7-15× the relay throughput. Survives a cold restart of both ends. go.mod points common/dataexchange at branch commits (pseudo-versions) pending their tagged releases; the version-bump to proper tags happens at release time (v1.11.1), which is intentionally held for review. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 5b500d5 commit f0786fd

6 files changed

Lines changed: 448 additions & 31 deletions

File tree

cmd/pilotctl/main.go

Lines changed: 159 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,10 +1176,11 @@ Flags:
11761176
11771177
Publish a message to a topic on a remote node.
11781178
`,
1179-
"send-file": `Usage: pilotctl send-file <address|hostname> <filepath> [--timeout <dur>]
1179+
"send-file": `Usage: pilotctl send-file <address|hostname> <filepath> [--timeout <dur>] [--prefer-direct]
11801180
11811181
Send a file to a remote node via the data-exchange stream. Files are
1182-
capped at 256 MiB (the data-exchange frame ceiling).
1182+
capped at 256 MiB (the data-exchange frame ceiling) unless both daemons
1183+
have raised PILOT_DATAEXCHANGE_MAX_FRAME — see the dataexchange package.
11831184
11841185
Flags:
11851186
--timeout <dur> give up if the receiver does not ACK within this
@@ -1189,6 +1190,15 @@ Flags:
11891190
exits with a non-zero code and a clear hint
11901191
instead of hanging until SO_KEEPALIVE trips
11911192
(~120s by default on the OS).
1193+
--prefer-direct drop the existing tunnel + sticky relay flag for
1194+
this peer before dialing, so the daemon retries
1195+
a direct UDP path instead of reusing the
1196+
beacon-mediated relay tunnel. Useful when ping
1197+
works but send-file hangs — typical sign of a
1198+
relay path that established once and got stuck.
1199+
Best-effort: if the peer is genuinely behind a
1200+
symmetric NAT the daemon will still fall back to
1201+
relay within the dial retry budget.
11921202
11931203
What you see during a transfer (TTY only):
11941204
sending <file> to <target>… <Ns> self-rewriting elapsed line
@@ -1538,6 +1548,8 @@ dispatch:
15381548
cmdTrust(cmdArgs)
15391549
case "trusted":
15401550
cmdTrusted(cmdArgs)
1551+
case "prefer-direct":
1552+
cmdPreferDirect(cmdArgs)
15411553

15421554
// Networks
15431555
case "network":
@@ -3710,7 +3722,64 @@ func cmdSendFile(args []string) {
37103722
// Best-effort: warns on stderr and continues if handshake fails.
37113723
maybeAutoHandshake(d, target, false)
37123724

3725+
// --prefer-direct breaks the daemon out of a stuck-on-relay tunnel
3726+
// BEFORE we dial port 1001. Without this, a previously-established
3727+
// relay tunnel is reused and the dial inherits its broken stream
3728+
// behavior. We send the IPC, log what the daemon reset, and proceed
3729+
// regardless — an old daemon returns "unknown command" which we
3730+
// treat as a best-effort hint, not a hard failure.
3731+
if flagBool(flags, "prefer-direct") {
3732+
resp, perr := d.PreferDirect(target.Node)
3733+
switch {
3734+
case perr != nil && strings.Contains(perr.Error(), "unknown command"):
3735+
fmt.Fprintln(os.Stderr, sDim("--prefer-direct: daemon does not support it (pre-v1.12.0); proceeding with existing tunnel"))
3736+
case perr != nil:
3737+
fmt.Fprintln(os.Stderr, sDim("--prefer-direct: "+perr.Error()+" (continuing)"))
3738+
default:
3739+
had, _ := resp["had_tunnel"].(bool)
3740+
wasActive, _ := resp["was_relay_active"].(bool)
3741+
wasPinned, _ := resp["was_relay_pinned"].(bool)
3742+
fmt.Fprintln(os.Stderr, sDim(fmt.Sprintf("--prefer-direct: tunnel=%v relay_was_active=%v relay_was_pinned=%v",
3743+
had, wasActive, wasPinned)))
3744+
}
3745+
}
3746+
37133747
filePath := pos[1]
3748+
filename := filepath.Base(filePath)
3749+
3750+
fi, err := os.Stat(filePath)
3751+
if err != nil {
3752+
if os.IsNotExist(err) {
3753+
fatalCode("not_found", "file not found: %s", filePath)
3754+
}
3755+
if os.IsPermission(err) {
3756+
fatalCode("internal", "permission denied: %s", filePath)
3757+
}
3758+
fatalCode("internal", "stat file: %v", err)
3759+
}
3760+
if fi.IsDir() {
3761+
fatalCode("invalid_argument", "%s is a directory, not a file", filePath)
3762+
}
3763+
size := fi.Size()
3764+
3765+
// Streamed transfer (default): chunked, ACK'd, resumable, end-to-end
3766+
// SHA-256 verified — no per-frame size cap, and big files no longer
3767+
// collapse into one giant frame that stalls over relay (or over a
3768+
// direct link that flips to relay under one-way load). Falls back to
3769+
// the single-frame TypeFile path when the receiver is too old to
3770+
// understand TypeFileStream (it never sends an INIT-ACK).
3771+
if !flagBool(flags, "no-stream") {
3772+
if res, serr := streamSendFile(d, target, filePath, filename, size, timeout); serr == nil {
3773+
outputOK(res)
3774+
return
3775+
} else if !errors.Is(serr, dataexchange.ErrStreamUnsupported) {
3776+
fatalHint("connection_failed",
3777+
"check reachability: pilotctl ping "+target.String()+" · for very large/slow links raise --timeout",
3778+
"streamed send-file failed: %v", serr)
3779+
}
3780+
fmt.Fprintln(os.Stderr, sDim("receiver does not support streamed transfer (pre-v1.12.0); falling back to single-frame TypeFile"))
3781+
}
3782+
37143783
data, err := os.ReadFile(filePath)
37153784
if err != nil {
37163785
if os.IsNotExist(err) {
@@ -3725,14 +3794,12 @@ func cmdSendFile(args []string) {
37253794
// Reject files that would exceed the data-exchange frame cap before
37263795
// opening the connection — keeps the failure path clean and avoids
37273796
// streaming a quarter-gigabyte just to have the receiver close.
3728-
if len(data) > dataexchange.MaxFrameSize {
3797+
if uint32(len(data)) > dataexchange.MaxFrameSize {
37293798
fatalCode("invalid_argument",
3730-
"file too large: %d bytes (max %d). The chunked-streaming protocol planned in docs/PROPOSAL-reliable-file-transfer.md will lift this; until then split the file or compress it.",
3799+
"file too large: %d bytes (max %d) for the legacy single-frame path. Use the default streamed transfer (omit --no-stream) against a v1.12.0+ receiver.",
37313800
len(data), dataexchange.MaxFrameSize)
37323801
}
37333802

3734-
filename := filepath.Base(filePath)
3735-
37363803
client, err := dataexchange.Dial(d, target)
37373804
if err != nil {
37383805
hint := classifyDaemonError(err)
@@ -3818,6 +3885,54 @@ func cmdSendFile(args []string) {
38183885
outputOK(result)
38193886
}
38203887

3888+
// streamSendFile transfers filePath with the chunked, ACK'd, resumable
3889+
// TypeFileStream protocol. It returns a result map on success; the sentinel
3890+
// dataexchange.ErrStreamUnsupported tells the caller to fall back to the
3891+
// single-frame TypeFile path (the receiver is too old). timeout bounds the
3892+
// wait for any single ACK and for the receiver's final verification.
3893+
func streamSendFile(d *driver.Driver, target protocol.Addr, filePath, filename string, size int64, timeout time.Duration) (map[string]interface{}, error) {
3894+
client, err := dataexchange.Dial(d, target)
3895+
if err != nil {
3896+
return nil, err
3897+
}
3898+
defer client.Close()
3899+
3900+
f, err := os.Open(filePath)
3901+
if err != nil {
3902+
return nil, err
3903+
}
3904+
defer f.Close()
3905+
3906+
stop := startWaitProgress(fmt.Sprintf("streaming %s to %s", filename, target))
3907+
start := time.Now()
3908+
res, serr := client.SendFileStream(filename, f, size, timeout)
3909+
stop()
3910+
if serr != nil {
3911+
return nil, serr
3912+
}
3913+
if !res.OK {
3914+
return nil, fmt.Errorf("receiver rejected file: %s", res.Message)
3915+
}
3916+
3917+
elapsed := time.Since(start)
3918+
mbps := 0.0
3919+
if elapsed > 0 {
3920+
mbps = (float64(res.TotalBytes) * 8.0) / (1e6 * elapsed.Seconds())
3921+
}
3922+
return map[string]interface{}{
3923+
"filename": filename,
3924+
"bytes": res.TotalBytes,
3925+
"bytes_sent": res.BytesSent,
3926+
"bytes_resumed": res.BytesResumed,
3927+
"sha256": res.Sha256,
3928+
"destination": target.String(),
3929+
"elapsed_ms": elapsed.Milliseconds(),
3930+
"throughput_mbps": mbps,
3931+
"transport": "filestream",
3932+
"verified": res.OK,
3933+
}, nil
3934+
}
3935+
38213936
func cmdSendMessage(args []string) {
38223937
flags, pos := parseFlags(args)
38233938
if len(pos) < 1 {
@@ -4303,6 +4418,44 @@ func cmdReject(args []string) {
43034418
}
43044419
}
43054420

4421+
// cmdPreferDirect drops the daemon's tunnel + sticky routing state for a
4422+
// peer so the next dial retries a fresh direct UDP path.
4423+
//
4424+
// Use case: ping <peer> works (the small UDP fits through the beacon
4425+
// relay just fine) but send-file <peer> hangs ~120s and EOFs — symptom
4426+
// of a relay-mediated tunnel that established once and got stuck for
4427+
// stream traffic. Calling prefer-direct + retrying the dial routes the
4428+
// next attempt through ensureTunnel's resolve-and-punch path, which
4429+
// usually re-establishes a working direct UDP path.
4430+
func cmdPreferDirect(args []string) {
4431+
if len(args) < 1 {
4432+
fatalCode("invalid_argument", "usage: pilotctl prefer-direct <node_id|address|hostname>")
4433+
}
4434+
d := connectDriver()
4435+
defer d.Close()
4436+
4437+
nodeID := resolveToNodeID(d, args[0])
4438+
resp, err := d.PreferDirect(nodeID)
4439+
if err != nil {
4440+
if strings.Contains(err.Error(), "unknown command") {
4441+
fatalHint("not_implemented",
4442+
"upgrade the daemon: brew upgrade pilotprotocol (or re-run install.sh)",
4443+
"daemon does not support prefer-direct (pre-v1.12.0)")
4444+
}
4445+
fatalCode("connection_failed", "prefer-direct: %v", err)
4446+
}
4447+
if jsonOutput {
4448+
outputOK(resp)
4449+
return
4450+
}
4451+
had, _ := resp["had_tunnel"].(bool)
4452+
wasActive, _ := resp["was_relay_active"].(bool)
4453+
wasPinned, _ := resp["was_relay_pinned"].(bool)
4454+
fmt.Printf("reset routing state for node %d\n", nodeID)
4455+
fmt.Println(sDim(fmt.Sprintf(" tunnel was up: %v · relay was active: %v · relay was pinned: %v", had, wasActive, wasPinned)))
4456+
fmt.Println(sDim(" next dial will re-resolve from registry and prefer direct; falls back to relay if direct still fails"))
4457+
}
4458+
43064459
func cmdUntrust(args []string) {
43074460
if len(args) < 1 {
43084461
fatalCode("invalid_argument", "usage: pilotctl untrust <node_id|address|hostname>")

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ require (
66
github.com/coder/websocket v1.8.14
77
github.com/pilot-protocol/app-store v1.0.1-beta.1.0.20260609061942-8852c785a264
88
github.com/pilot-protocol/beacon v0.2.6
9-
github.com/pilot-protocol/common v0.4.8
10-
github.com/pilot-protocol/dataexchange v0.2.0
9+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6
10+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98
1111
github.com/pilot-protocol/eventstream v0.2.2
1212
github.com/pilot-protocol/handshake v0.2.1
1313
github.com/pilot-protocol/nameserver v0.2.1

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ github.com/pilot-protocol/beacon v0.2.6 h1:grxwaVyPRUT0W6coyjYfNkO0rpzOIrwrKn94S
1414
github.com/pilot-protocol/beacon v0.2.6/go.mod h1:I/UhEv097g1z/qtAVDZbEhf3R5tzM0Dp71vGHah52A4=
1515
github.com/pilot-protocol/common v0.4.8 h1:eS2Bc+XcZWJ/qhwwOZbXwIWhtNdOijuoEp716kQE+/c=
1616
github.com/pilot-protocol/common v0.4.8/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4=
17+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6 h1:Us3qSMPTBHPDQXFPY07BoUanriw1rVzS6SAHcbddqzY=
18+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4=
1719
github.com/pilot-protocol/dataexchange v0.2.0 h1:ldE6AyrES1uvdnn1NBl0KZ7C+SSWNtmeHHU3CQhwSCo=
1820
github.com/pilot-protocol/dataexchange v0.2.0/go.mod h1:JVy2+hr/IjzMPshxjExbGO/4SbJTs7ZJ7iYvT/ODF3Q=
21+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98 h1:Bqgnf4CZC7aZJyDzz/E7agwXotArJg2FvFlNDqouhLo=
22+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98/go.mod h1:tM9eyyruBdnxhhUtViasUjnAElwF/r5PQvCYKLdlTLY=
1923
github.com/pilot-protocol/eventstream v0.2.2 h1:E0IjveK7K+dsIbE/5hD3N821FkHzxVsx1tiAORMzt8k=
2024
github.com/pilot-protocol/eventstream v0.2.2/go.mod h1:gUjoMEItW1SRJYEq39VlcIeDe2LcE5B18/4bcaUJNrs=
2125
github.com/pilot-protocol/handshake v0.2.0 h1:uLeV8iNHcsHbcVH+GZ9p7uuIbObA8BReDByF5XGjzB8=

pkg/daemon/daemon.go

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,13 @@ const (
196196
MaxZeroWindowProbes = 30
197197
)
198198

199-
// RelayProbeInterval is how often we probe relay-flagged peers for direct connectivity.
200-
const RelayProbeInterval = 5 * time.Minute
199+
// RelayProbeInterval is how often we attempt to upgrade relay-flagged peers
200+
// to a direct path via coordinated hole-punching. Was 5m (far too slow — a
201+
// peer stuck on relay stayed there for minutes); tightened to 15s so a peer
202+
// that becomes direct-capable (or one that briefly flipped to relay under
203+
// load) is re-punched promptly. The punch + probe is cheap and only runs for
204+
// peers still flagged relay, so it self-limits once a peer is promoted.
205+
const RelayProbeInterval = 15 * time.Second
201206

202207
// EndpointCacheTTL is how long a cached endpoint is considered fresh.
203208
// After this, the entry is stale but still usable as a fallback.
@@ -5204,32 +5209,89 @@ func (d *Daemon) relayProbeLoop() {
52045209
case <-d.stopCh:
52055210
return
52065211
case <-ticker.C:
5207-
relayPeers := d.tunnels.RelayPeerIDs()
5208-
for _, nodeID := range relayPeers {
5209-
// P1-010 fix: send a targeted direct probe without flipping
5210-
// the relay flag. If the peer's direct path has recovered, the
5211-
// response will arrive on tm.sock from their real address;
5212-
// handleEncrypted auto-clears relay mode on a successful
5213-
// direct decrypt. Concurrent traffic (key exchange replies,
5214-
// retransmits) continues going via relay during the probe.
5215-
probe := &protocol.Packet{
5216-
Version: protocol.Version,
5217-
Flags: protocol.FlagACK,
5218-
Protocol: protocol.ProtoControl,
5219-
Src: d.Addr(),
5220-
Dst: protocol.Addr{Network: 0, Node: nodeID},
5221-
SrcPort: protocol.PortPing,
5222-
DstPort: protocol.PortPing,
5223-
Seq: 1,
5224-
}
5225-
if err := d.tunnels.SendDirectProbe(nodeID, probe); err != nil {
5226-
slog.Debug("relay direct-probe skipped", "node_id", nodeID, "error", err)
5227-
}
5212+
for _, nodeID := range d.tunnels.RelayPeerIDs() {
5213+
go d.tryDirectUpgrade(nodeID)
52285214
}
52295215
}
52305216
}
52315217
}
52325218

5219+
// tryDirectUpgrade attempts to promote a relay-flagged peer to a direct path
5220+
// via coordinated NAT hole-punching, then verifies on the rig that the path
5221+
// actually carries traffic.
5222+
//
5223+
// Why this exists: the old relayProbeLoop sent a single one-way SendDirectProbe
5224+
// and assumed the direct path "had recovered". Through a stateful firewall/NAT
5225+
// a one-way probe is dropped — there is no conntrack pinhole until BOTH peers
5226+
// send. So a relay tunnel never upgraded unless the peer was already publicly
5227+
// reachable. Here we (1) coordinate a simultaneous punch via the beacon to
5228+
// open the pinhole on both NATs, then (2) push several encrypted probes at the
5229+
// peer's REAL address (not the beacon placeholder) so the peer's
5230+
// ClearRelayOnDirect promotes the path (DirectClearsRequired=3 direct decrypts).
5231+
func (d *Daemon) tryDirectUpgrade(nodeID uint32) {
5232+
// Only act on peers we have authoritative resolve info for. Punching a
5233+
// relay-only peer would leak its real IP via the beacon PunchCommand
5234+
// (see establishConnection), so require resolve info that says the peer
5235+
// is NOT relay-only before we punch or probe.
5236+
resp, ok := d.cachedResolve(nodeID)
5237+
if !ok {
5238+
// A relay tunnel established via beacon discovery never populated
5239+
// the resolve cache. Resolve fresh so we can target the peer's real
5240+
// address; without this the upgrade can never start.
5241+
if d.regConn == nil {
5242+
return
5243+
}
5244+
r, err := d.regConn.Resolve(nodeID, d.NodeID())
5245+
if err != nil {
5246+
return
5247+
}
5248+
d.cacheResolve(nodeID, r)
5249+
resp = r
5250+
}
5251+
if relayOnly, _ := resp["relay_only"].(bool); relayOnly {
5252+
return
5253+
}
5254+
realAddrStr, _ := resp["real_addr"].(string)
5255+
if realAddrStr == "" {
5256+
return
5257+
}
5258+
realAddr, err := net.ResolveUDPAddr("udp", realAddrStr)
5259+
if err != nil {
5260+
return
5261+
}
5262+
5263+
// A prior blackhole flip may have PINNED the peer to relay; ClearRelayOnDirect
5264+
// will not promote a pinned peer. Unpin it — we've confirmed above it is not
5265+
// registry relay-only, so direct is allowed to win on its merits.
5266+
if d.tunnels.IsRelayPinned(nodeID) {
5267+
d.tunnels.SetRelayPeerPinned(nodeID, false)
5268+
}
5269+
5270+
// Coordinate the punch (opens the conntrack pinhole on both NATs).
5271+
d.tunnels.RequestHolePunch(nodeID)
5272+
5273+
probe := &protocol.Packet{
5274+
Version: protocol.Version,
5275+
Flags: protocol.FlagACK,
5276+
Protocol: protocol.ProtoControl,
5277+
Src: d.Addr(),
5278+
Dst: protocol.Addr{Network: 0, Node: nodeID},
5279+
SrcPort: protocol.PortPing,
5280+
DstPort: protocol.PortPing,
5281+
Seq: 1,
5282+
}
5283+
// Give the punch a moment to land, then probe the real address a few
5284+
// times so at least DirectClearsRequired (=3) direct decrypts arrive
5285+
// while the pinhole is open.
5286+
for i := 0; i < 5; i++ {
5287+
time.Sleep(200 * time.Millisecond)
5288+
if err := d.tunnels.SendDirectProbeTo(nodeID, realAddr, probe); err != nil {
5289+
slog.Debug("direct upgrade probe skipped", "node_id", nodeID, "error", err)
5290+
return
5291+
}
5292+
}
5293+
}
5294+
52335295
// ---------------------------------------------------------------------------
52345296
// Network membership reconciliation: periodic delta detection vs the
52355297
// registry's authoritative membership list. The reconciler is L7-only:

0 commit comments

Comments
 (0)