Skip to content

Commit 8b9feaa

Browse files
teovlclaude
andcommitted
feat(transport): NAT hole-punch direct-upgrade + streamed send-file + prefer-direct
Make P2P transfers actually go direct (and stay direct) across NAT, and ship large files reliably. - daemon.go: rewrite relayProbeLoop → tryDirectUpgrade. The old loop sent a one-way SendDirectProbe every 5 min, which a stateful NAT/firewall always drops (no conntrack pinhole). Now it fires a beacon-coordinated RequestHolePunch to open the pinhole on both NATs, then pushes encrypted probes at the peer's REAL address so the peer's ClearRelayOnDirect promotes the path. Unpins blackhole-pinned (non-relay-only) peers, resolves fresh when uncached, and runs every 15 s (was 5 min). - tunnel.go: add SendDirectProbeTo — encrypted probe to an explicit real address (the upgrade primitive; the stored peers[] entry for a relay peer is the beacon placeholder). - ipc.go: handlePreferDirect — drop tunnel + cached resolution so the next dial re-runs resolve + punch; unpin relay. - pilotctl/main.go: send-file streams by default (TypeFileStream) and falls back to single-frame TypeFile when the peer never sends an INIT-ACK (back-compat); --no-stream forces legacy; reports transport/sha256/throughput. Adds `prefer-direct` command + --prefer-direct/--timeout flags. Verified on Mac↔GCP-VM (true dual-NAT) and a fresh throwaway VM: tunnel goes relay=False via hole-punch in ~8 s through a default-deny firewall, holds direct through a 50 MB transfer (no flip), byte-perfect sha256, ~7-15× the relay throughput. Survives a cold restart of both ends. go.mod points common/dataexchange at branch commits (pseudo-versions) pending their tagged releases; the version-bump to proper tags happens at release time (v1.11.1), which is intentionally held for review. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 0582575 commit 8b9feaa

6 files changed

Lines changed: 449 additions & 32 deletions

File tree

cmd/pilotctl/main.go

Lines changed: 159 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,10 +1163,11 @@ Flags:
11631163
11641164
Publish a message to a topic on a remote node.
11651165
`,
1166-
"send-file": `Usage: pilotctl send-file <address|hostname> <filepath> [--timeout <dur>]
1166+
"send-file": `Usage: pilotctl send-file <address|hostname> <filepath> [--timeout <dur>] [--prefer-direct]
11671167
11681168
Send a file to a remote node via the data-exchange stream. Files are
1169-
capped at 256 MiB (the data-exchange frame ceiling).
1169+
capped at 256 MiB (the data-exchange frame ceiling) unless both daemons
1170+
have raised PILOT_DATAEXCHANGE_MAX_FRAME — see the dataexchange package.
11701171
11711172
Flags:
11721173
--timeout <dur> give up if the receiver does not ACK within this
@@ -1176,6 +1177,15 @@ Flags:
11761177
exits with a non-zero code and a clear hint
11771178
instead of hanging until SO_KEEPALIVE trips
11781179
(~120s by default on the OS).
1180+
--prefer-direct drop the existing tunnel + sticky relay flag for
1181+
this peer before dialing, so the daemon retries
1182+
a direct UDP path instead of reusing the
1183+
beacon-mediated relay tunnel. Useful when ping
1184+
works but send-file hangs — typical sign of a
1185+
relay path that established once and got stuck.
1186+
Best-effort: if the peer is genuinely behind a
1187+
symmetric NAT the daemon will still fall back to
1188+
relay within the dial retry budget.
11791189
11801190
What you see during a transfer (TTY only):
11811191
sending <file> to <target>… <Ns> self-rewriting elapsed line
@@ -1518,6 +1528,8 @@ dispatch:
15181528
cmdTrust(cmdArgs)
15191529
case "trusted":
15201530
cmdTrusted(cmdArgs)
1531+
case "prefer-direct":
1532+
cmdPreferDirect(cmdArgs)
15211533

15221534
// Networks
15231535
case "network":
@@ -3690,7 +3702,64 @@ func cmdSendFile(args []string) {
36903702
// Best-effort: warns on stderr and continues if handshake fails.
36913703
maybeAutoHandshake(d, target, false)
36923704

3705+
// --prefer-direct breaks the daemon out of a stuck-on-relay tunnel
3706+
// BEFORE we dial port 1001. Without this, a previously-established
3707+
// relay tunnel is reused and the dial inherits its broken stream
3708+
// behavior. We send the IPC, log what the daemon reset, and proceed
3709+
// regardless — an old daemon returns "unknown command" which we
3710+
// treat as a best-effort hint, not a hard failure.
3711+
if flagBool(flags, "prefer-direct") {
3712+
resp, perr := d.PreferDirect(target.Node)
3713+
switch {
3714+
case perr != nil && strings.Contains(perr.Error(), "unknown command"):
3715+
fmt.Fprintln(os.Stderr, sDim("--prefer-direct: daemon does not support it (pre-v1.12.0); proceeding with existing tunnel"))
3716+
case perr != nil:
3717+
fmt.Fprintln(os.Stderr, sDim("--prefer-direct: "+perr.Error()+" (continuing)"))
3718+
default:
3719+
had, _ := resp["had_tunnel"].(bool)
3720+
wasActive, _ := resp["was_relay_active"].(bool)
3721+
wasPinned, _ := resp["was_relay_pinned"].(bool)
3722+
fmt.Fprintln(os.Stderr, sDim(fmt.Sprintf("--prefer-direct: tunnel=%v relay_was_active=%v relay_was_pinned=%v",
3723+
had, wasActive, wasPinned)))
3724+
}
3725+
}
3726+
36933727
filePath := pos[1]
3728+
filename := filepath.Base(filePath)
3729+
3730+
fi, err := os.Stat(filePath)
3731+
if err != nil {
3732+
if os.IsNotExist(err) {
3733+
fatalCode("not_found", "file not found: %s", filePath)
3734+
}
3735+
if os.IsPermission(err) {
3736+
fatalCode("internal", "permission denied: %s", filePath)
3737+
}
3738+
fatalCode("internal", "stat file: %v", err)
3739+
}
3740+
if fi.IsDir() {
3741+
fatalCode("invalid_argument", "%s is a directory, not a file", filePath)
3742+
}
3743+
size := fi.Size()
3744+
3745+
// Streamed transfer (default): chunked, ACK'd, resumable, end-to-end
3746+
// SHA-256 verified — no per-frame size cap, and big files no longer
3747+
// collapse into one giant frame that stalls over relay (or over a
3748+
// direct link that flips to relay under one-way load). Falls back to
3749+
// the single-frame TypeFile path when the receiver is too old to
3750+
// understand TypeFileStream (it never sends an INIT-ACK).
3751+
if !flagBool(flags, "no-stream") {
3752+
if res, serr := streamSendFile(d, target, filePath, filename, size, timeout); serr == nil {
3753+
outputOK(res)
3754+
return
3755+
} else if !errors.Is(serr, dataexchange.ErrStreamUnsupported) {
3756+
fatalHint("connection_failed",
3757+
"check reachability: pilotctl ping "+target.String()+" · for very large/slow links raise --timeout",
3758+
"streamed send-file failed: %v", serr)
3759+
}
3760+
fmt.Fprintln(os.Stderr, sDim("receiver does not support streamed transfer (pre-v1.12.0); falling back to single-frame TypeFile"))
3761+
}
3762+
36943763
data, err := os.ReadFile(filePath)
36953764
if err != nil {
36963765
if os.IsNotExist(err) {
@@ -3705,14 +3774,12 @@ func cmdSendFile(args []string) {
37053774
// Reject files that would exceed the data-exchange frame cap before
37063775
// opening the connection — keeps the failure path clean and avoids
37073776
// streaming a quarter-gigabyte just to have the receiver close.
3708-
if len(data) > dataexchange.MaxFrameSize {
3777+
if uint32(len(data)) > dataexchange.MaxFrameSize {
37093778
fatalCode("invalid_argument",
3710-
"file too large: %d bytes (max %d). The chunked-streaming protocol planned in docs/PROPOSAL-reliable-file-transfer.md will lift this; until then split the file or compress it.",
3779+
"file too large: %d bytes (max %d) for the legacy single-frame path. Use the default streamed transfer (omit --no-stream) against a v1.12.0+ receiver.",
37113780
len(data), dataexchange.MaxFrameSize)
37123781
}
37133782

3714-
filename := filepath.Base(filePath)
3715-
37163783
client, err := dataexchange.Dial(d, target)
37173784
if err != nil {
37183785
hint := classifyDaemonError(err)
@@ -3798,6 +3865,54 @@ func cmdSendFile(args []string) {
37983865
outputOK(result)
37993866
}
38003867

3868+
// streamSendFile transfers filePath with the chunked, ACK'd, resumable
3869+
// TypeFileStream protocol. It returns a result map on success; the sentinel
3870+
// dataexchange.ErrStreamUnsupported tells the caller to fall back to the
3871+
// single-frame TypeFile path (the receiver is too old). timeout bounds the
3872+
// wait for any single ACK and for the receiver's final verification.
3873+
func streamSendFile(d *driver.Driver, target protocol.Addr, filePath, filename string, size int64, timeout time.Duration) (map[string]interface{}, error) {
3874+
client, err := dataexchange.Dial(d, target)
3875+
if err != nil {
3876+
return nil, err
3877+
}
3878+
defer client.Close()
3879+
3880+
f, err := os.Open(filePath)
3881+
if err != nil {
3882+
return nil, err
3883+
}
3884+
defer f.Close()
3885+
3886+
stop := startWaitProgress(fmt.Sprintf("streaming %s to %s", filename, target))
3887+
start := time.Now()
3888+
res, serr := client.SendFileStream(filename, f, size, timeout)
3889+
stop()
3890+
if serr != nil {
3891+
return nil, serr
3892+
}
3893+
if !res.OK {
3894+
return nil, fmt.Errorf("receiver rejected file: %s", res.Message)
3895+
}
3896+
3897+
elapsed := time.Since(start)
3898+
mbps := 0.0
3899+
if elapsed > 0 {
3900+
mbps = (float64(res.TotalBytes) * 8.0) / (1e6 * elapsed.Seconds())
3901+
}
3902+
return map[string]interface{}{
3903+
"filename": filename,
3904+
"bytes": res.TotalBytes,
3905+
"bytes_sent": res.BytesSent,
3906+
"bytes_resumed": res.BytesResumed,
3907+
"sha256": res.Sha256,
3908+
"destination": target.String(),
3909+
"elapsed_ms": elapsed.Milliseconds(),
3910+
"throughput_mbps": mbps,
3911+
"transport": "filestream",
3912+
"verified": res.OK,
3913+
}, nil
3914+
}
3915+
38013916
func cmdSendMessage(args []string) {
38023917
flags, pos := parseFlags(args)
38033918
if len(pos) < 1 {
@@ -4283,6 +4398,44 @@ func cmdReject(args []string) {
42834398
}
42844399
}
42854400

4401+
// cmdPreferDirect drops the daemon's tunnel + sticky routing state for a
4402+
// peer so the next dial retries a fresh direct UDP path.
4403+
//
4404+
// Use case: ping <peer> works (the small UDP fits through the beacon
4405+
// relay just fine) but send-file <peer> hangs ~120s and EOFs — symptom
4406+
// of a relay-mediated tunnel that established once and got stuck for
4407+
// stream traffic. Calling prefer-direct + retrying the dial routes the
4408+
// next attempt through ensureTunnel's resolve-and-punch path, which
4409+
// usually re-establishes a working direct UDP path.
4410+
func cmdPreferDirect(args []string) {
4411+
if len(args) < 1 {
4412+
fatalCode("invalid_argument", "usage: pilotctl prefer-direct <node_id|address|hostname>")
4413+
}
4414+
d := connectDriver()
4415+
defer d.Close()
4416+
4417+
nodeID := resolveToNodeID(d, args[0])
4418+
resp, err := d.PreferDirect(nodeID)
4419+
if err != nil {
4420+
if strings.Contains(err.Error(), "unknown command") {
4421+
fatalHint("not_implemented",
4422+
"upgrade the daemon: brew upgrade pilotprotocol (or re-run install.sh)",
4423+
"daemon does not support prefer-direct (pre-v1.12.0)")
4424+
}
4425+
fatalCode("connection_failed", "prefer-direct: %v", err)
4426+
}
4427+
if jsonOutput {
4428+
outputOK(resp)
4429+
return
4430+
}
4431+
had, _ := resp["had_tunnel"].(bool)
4432+
wasActive, _ := resp["was_relay_active"].(bool)
4433+
wasPinned, _ := resp["was_relay_pinned"].(bool)
4434+
fmt.Printf("reset routing state for node %d\n", nodeID)
4435+
fmt.Println(sDim(fmt.Sprintf(" tunnel was up: %v · relay was active: %v · relay was pinned: %v", had, wasActive, wasPinned)))
4436+
fmt.Println(sDim(" next dial will re-resolve from registry and prefer direct; falls back to relay if direct still fails"))
4437+
}
4438+
42864439
func cmdUntrust(args []string) {
42874440
if len(args) < 1 {
42884441
fatalCode("invalid_argument", "usage: pilotctl untrust <node_id|address|hostname>")

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ require (
66
github.com/coder/websocket v1.8.14
77
github.com/pilot-protocol/app-store v1.0.1-beta.1.0.20260609061942-8852c785a264
88
github.com/pilot-protocol/beacon v0.2.5
9-
github.com/pilot-protocol/common v0.4.8
10-
github.com/pilot-protocol/dataexchange v0.2.0
9+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6
10+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98
1111
github.com/pilot-protocol/eventstream v0.2.2
1212
github.com/pilot-protocol/handshake v0.2.1
1313
github.com/pilot-protocol/nameserver v0.2.1
@@ -18,10 +18,10 @@ require (
1818
github.com/pilot-protocol/trustedagents v0.2.3
1919
github.com/pilot-protocol/updater v0.2.2-0.20260529065627-220ed5b8383f
2020
github.com/pilot-protocol/webhook v0.2.0
21+
golang.org/x/sys v0.45.0
2122
)
2223

2324
require (
2425
github.com/expr-lang/expr v1.17.8 // indirect
2526
golang.org/x/net v0.55.0 // indirect
26-
golang.org/x/sys v0.45.0 // indirect
2727
)

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ github.com/pilot-protocol/beacon v0.2.5 h1:5+pkSPoA35r+u4Hfrph/ZfOltOyiy8lh1sCfK
1212
github.com/pilot-protocol/beacon v0.2.5/go.mod h1:I/UhEv097g1z/qtAVDZbEhf3R5tzM0Dp71vGHah52A4=
1313
github.com/pilot-protocol/common v0.4.8 h1:eS2Bc+XcZWJ/qhwwOZbXwIWhtNdOijuoEp716kQE+/c=
1414
github.com/pilot-protocol/common v0.4.8/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4=
15+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6 h1:Us3qSMPTBHPDQXFPY07BoUanriw1rVzS6SAHcbddqzY=
16+
github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4=
1517
github.com/pilot-protocol/dataexchange v0.2.0 h1:ldE6AyrES1uvdnn1NBl0KZ7C+SSWNtmeHHU3CQhwSCo=
1618
github.com/pilot-protocol/dataexchange v0.2.0/go.mod h1:JVy2+hr/IjzMPshxjExbGO/4SbJTs7ZJ7iYvT/ODF3Q=
19+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98 h1:Bqgnf4CZC7aZJyDzz/E7agwXotArJg2FvFlNDqouhLo=
20+
github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98/go.mod h1:tM9eyyruBdnxhhUtViasUjnAElwF/r5PQvCYKLdlTLY=
1721
github.com/pilot-protocol/eventstream v0.2.2 h1:E0IjveK7K+dsIbE/5hD3N821FkHzxVsx1tiAORMzt8k=
1822
github.com/pilot-protocol/eventstream v0.2.2/go.mod h1:gUjoMEItW1SRJYEq39VlcIeDe2LcE5B18/4bcaUJNrs=
1923
github.com/pilot-protocol/handshake v0.2.0 h1:uLeV8iNHcsHbcVH+GZ9p7uuIbObA8BReDByF5XGjzB8=

pkg/daemon/daemon.go

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,13 @@ const (
187187
MaxZeroWindowProbes = 30
188188
)
189189

190-
// RelayProbeInterval is how often we probe relay-flagged peers for direct connectivity.
191-
const RelayProbeInterval = 5 * time.Minute
190+
// RelayProbeInterval is how often we attempt to upgrade relay-flagged peers
191+
// to a direct path via coordinated hole-punching. Was 5m (far too slow — a
192+
// peer stuck on relay stayed there for minutes); tightened to 15s so a peer
193+
// that becomes direct-capable (or one that briefly flipped to relay under
194+
// load) is re-punched promptly. The punch + probe is cheap and only runs for
195+
// peers still flagged relay, so it self-limits once a peer is promoted.
196+
const RelayProbeInterval = 15 * time.Second
192197

193198
// EndpointCacheTTL is how long a cached endpoint is considered fresh.
194199
// After this, the entry is stale but still usable as a fallback.
@@ -5088,32 +5093,89 @@ func (d *Daemon) relayProbeLoop() {
50885093
case <-d.stopCh:
50895094
return
50905095
case <-ticker.C:
5091-
relayPeers := d.tunnels.RelayPeerIDs()
5092-
for _, nodeID := range relayPeers {
5093-
// P1-010 fix: send a targeted direct probe without flipping
5094-
// the relay flag. If the peer's direct path has recovered, the
5095-
// response will arrive on tm.sock from their real address;
5096-
// handleEncrypted auto-clears relay mode on a successful
5097-
// direct decrypt. Concurrent traffic (key exchange replies,
5098-
// retransmits) continues going via relay during the probe.
5099-
probe := &protocol.Packet{
5100-
Version: protocol.Version,
5101-
Flags: protocol.FlagACK,
5102-
Protocol: protocol.ProtoControl,
5103-
Src: d.Addr(),
5104-
Dst: protocol.Addr{Network: 0, Node: nodeID},
5105-
SrcPort: protocol.PortPing,
5106-
DstPort: protocol.PortPing,
5107-
Seq: 1,
5108-
}
5109-
if err := d.tunnels.SendDirectProbe(nodeID, probe); err != nil {
5110-
slog.Debug("relay direct-probe skipped", "node_id", nodeID, "error", err)
5111-
}
5096+
for _, nodeID := range d.tunnels.RelayPeerIDs() {
5097+
go d.tryDirectUpgrade(nodeID)
51125098
}
51135099
}
51145100
}
51155101
}
51165102

5103+
// tryDirectUpgrade attempts to promote a relay-flagged peer to a direct path
5104+
// via coordinated NAT hole-punching, then verifies on the rig that the path
5105+
// actually carries traffic.
5106+
//
5107+
// Why this exists: the old relayProbeLoop sent a single one-way SendDirectProbe
5108+
// and assumed the direct path "had recovered". Through a stateful firewall/NAT
5109+
// a one-way probe is dropped — there is no conntrack pinhole until BOTH peers
5110+
// send. So a relay tunnel never upgraded unless the peer was already publicly
5111+
// reachable. Here we (1) coordinate a simultaneous punch via the beacon to
5112+
// open the pinhole on both NATs, then (2) push several encrypted probes at the
5113+
// peer's REAL address (not the beacon placeholder) so the peer's
5114+
// ClearRelayOnDirect promotes the path (DirectClearsRequired=3 direct decrypts).
5115+
func (d *Daemon) tryDirectUpgrade(nodeID uint32) {
5116+
// Only act on peers we have authoritative resolve info for. Punching a
5117+
// relay-only peer would leak its real IP via the beacon PunchCommand
5118+
// (see establishConnection), so require resolve info that says the peer
5119+
// is NOT relay-only before we punch or probe.
5120+
resp, ok := d.cachedResolve(nodeID)
5121+
if !ok {
5122+
// A relay tunnel established via beacon discovery never populated
5123+
// the resolve cache. Resolve fresh so we can target the peer's real
5124+
// address; without this the upgrade can never start.
5125+
if d.regConn == nil {
5126+
return
5127+
}
5128+
r, err := d.regConn.Resolve(nodeID, d.NodeID())
5129+
if err != nil {
5130+
return
5131+
}
5132+
d.cacheResolve(nodeID, r)
5133+
resp = r
5134+
}
5135+
if relayOnly, _ := resp["relay_only"].(bool); relayOnly {
5136+
return
5137+
}
5138+
realAddrStr, _ := resp["real_addr"].(string)
5139+
if realAddrStr == "" {
5140+
return
5141+
}
5142+
realAddr, err := net.ResolveUDPAddr("udp", realAddrStr)
5143+
if err != nil {
5144+
return
5145+
}
5146+
5147+
// A prior blackhole flip may have PINNED the peer to relay; ClearRelayOnDirect
5148+
// will not promote a pinned peer. Unpin it — we've confirmed above it is not
5149+
// registry relay-only, so direct is allowed to win on its merits.
5150+
if d.tunnels.IsRelayPinned(nodeID) {
5151+
d.tunnels.SetRelayPeerPinned(nodeID, false)
5152+
}
5153+
5154+
// Coordinate the punch (opens the conntrack pinhole on both NATs).
5155+
d.tunnels.RequestHolePunch(nodeID)
5156+
5157+
probe := &protocol.Packet{
5158+
Version: protocol.Version,
5159+
Flags: protocol.FlagACK,
5160+
Protocol: protocol.ProtoControl,
5161+
Src: d.Addr(),
5162+
Dst: protocol.Addr{Network: 0, Node: nodeID},
5163+
SrcPort: protocol.PortPing,
5164+
DstPort: protocol.PortPing,
5165+
Seq: 1,
5166+
}
5167+
// Give the punch a moment to land, then probe the real address a few
5168+
// times so at least DirectClearsRequired (=3) direct decrypts arrive
5169+
// while the pinhole is open.
5170+
for i := 0; i < 5; i++ {
5171+
time.Sleep(200 * time.Millisecond)
5172+
if err := d.tunnels.SendDirectProbeTo(nodeID, realAddr, probe); err != nil {
5173+
slog.Debug("direct upgrade probe skipped", "node_id", nodeID, "error", err)
5174+
return
5175+
}
5176+
}
5177+
}
5178+
51175179
// ---------------------------------------------------------------------------
51185180
// Network membership reconciliation: periodic delta detection vs the
51195181
// registry's authoritative membership list. The reconciler is L7-only:

0 commit comments

Comments
 (0)