Skip to content

Commit 17f2c55

Browse files
authored
fix(sync): preserve explicit remote path for mesh (#88)
1 parent 561c502 commit 17f2c55

12 files changed

Lines changed: 537 additions & 85 deletions

File tree

internal/cli/mesh.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"fmt"
66
"log/slog"
77
"sort"
8+
"strings"
89
"sync"
910
"time"
1011

1112
"github.com/acmore/okdev/internal/kube"
13+
syncengine "github.com/acmore/okdev/internal/sync"
1214
"github.com/acmore/okdev/internal/workload"
1315
)
1416

@@ -30,11 +32,18 @@ type meshSummary struct {
3032
Receivers []meshReceiverStatus
3133
}
3234

35+
func meshFolderPath(syncPairs []syncengine.Pair, workspaceMountPath string) string {
36+
if len(syncPairs) == 1 && strings.TrimSpace(syncPairs[0].Remote) != "" {
37+
return syncPairs[0].Remote
38+
}
39+
return workspaceMountPath
40+
}
41+
3342
// setupMesh configures syncthing mesh sync between the hub (target) pod and
3443
// all receiver pods in the session. It reads the hub's syncthing device ID,
3544
// then configures each receiver's sidecar to peer with the hub. Finally it
3645
// waits for all receivers to complete initial sync.
37-
func setupMesh(ctx context.Context, opts *Options, k *kube.Client, namespace, sessionName string, labels map[string]string, hubPod, folderID, workspaceMountPath string, timeout time.Duration, onStatus func(string)) (*meshSummary, error) {
46+
func setupMesh(ctx context.Context, opts *Options, k *kube.Client, namespace, sessionName string, labels map[string]string, hubPod, folderID, folderPath string, timeout time.Duration, onStatus func(string)) (*meshSummary, error) {
3847
// 1. Discover receiver pods.
3948
selector := workload.DiscoveryLabelSelector(labels)
4049
if selector != "" {
@@ -185,7 +194,7 @@ func setupMesh(ctx context.Context, opts *Options, k *kube.Client, namespace, se
185194
hubPeers[info.DeviceID] = fmt.Sprintf("tcp://%s:22000", info.Pod.PodIP)
186195
}
187196
if len(hubPeers) > 0 {
188-
if err := configureSyncthingMeshHub(ctx, hubBase, hubKey, hubDeviceID, hubPeers, folderID, workspaceMountPath); err != nil {
197+
if err := configureSyncthingMeshHub(ctx, hubBase, hubKey, hubDeviceID, hubPeers, folderID, folderPath); err != nil {
189198
return nil, fmt.Errorf("configure hub syncthing mesh: %w", err)
190199
}
191200
}
@@ -203,7 +212,7 @@ func setupMesh(ctx context.Context, opts *Options, k *kube.Client, namespace, se
203212
results[idx] = meshReceiverStatus{Pod: ri.Pod.Name, Err: ri.Err}
204213
return
205214
}
206-
results[idx] = configureAndWaitMeshReceiver(ctx, opts, k, namespace, ri.Pod, ri.APIKey, ri.DeviceID, hubBase, hubKey, hubDeviceID, hubAddr, folderID, workspaceMountPath, timeout)
215+
results[idx] = configureAndWaitMeshReceiver(ctx, opts, k, namespace, ri.Pod, ri.APIKey, ri.DeviceID, hubBase, hubKey, hubDeviceID, hubAddr, folderID, folderPath, timeout)
207216
}(i, info)
208217
}
209218
wg.Wait()
@@ -382,7 +391,7 @@ func syncthingMergeMeshHubFolderDevices(existingFolderDevices, devices any, hubD
382391

383392
// configureAndWaitMeshReceiver configures a single receiver pod's syncthing
384393
// to peer with the hub and waits for initial sync to complete.
385-
func configureAndWaitMeshReceiver(ctx context.Context, opts *Options, k *kube.Client, namespace string, pod kube.PodSummary, recvKey, recvDeviceID, hubBase, hubKey, hubDeviceID, hubAddr, folderID, workspaceMountPath string, timeout time.Duration) meshReceiverStatus {
394+
func configureAndWaitMeshReceiver(ctx context.Context, opts *Options, k *kube.Client, namespace string, pod kube.PodSummary, recvKey, recvDeviceID, hubBase, hubKey, hubDeviceID, hubAddr, folderID, folderPath string, timeout time.Duration) meshReceiverStatus {
386395
status := meshReceiverStatus{Pod: pod.Name}
387396

388397
// Ensure the receiver sidecar has syncthing running.
@@ -404,15 +413,15 @@ func configureAndWaitMeshReceiver(ctx context.Context, opts *Options, k *kube.Cl
404413
}
405414

406415
// Ensure workspace dir exists on receiver.
407-
if _, err := execInSyncthingContainer(ctx, k, namespace, pod.Name, fmt.Sprintf("mkdir -p %s", workspaceMountPath)); err != nil {
416+
if _, err := execInSyncthingContainer(ctx, k, namespace, pod.Name, fmt.Sprintf("mkdir -p %s", folderPath)); err != nil {
408417
slog.Debug("mesh: mkdir workspace on receiver", "pod", pod.Name, "error", err)
409418
}
410419

411420
// Configure receiver syncthing to peer with hub as receiveonly.
412421
if err := configureSyncthingPeer(ctx, recvBase, recvKey,
413422
recvDeviceID, hubDeviceID,
414423
hubAddr,
415-
folderID, workspaceMountPath,
424+
folderID, folderPath,
416425
"receiveonly",
417426
60, 1, // rescan/watcher intervals
418427
false, false, // ignoreDelete, relaysEnabled
@@ -443,7 +452,19 @@ func configureAndWaitMeshReceiver(ctx context.Context, opts *Options, k *kube.Cl
443452
}
444453

445454
_, needBytes, pollErr := syncthingCompletion(ctx, recvBase, recvKey, folderID, hubDeviceID)
446-
if pollErr == nil && needBytes == 0 && status.Connected {
455+
filesReady := true
456+
if pollErr == nil && status.Connected {
457+
if hubStatus, err := syncthingFolderStatusInfoForFolder(ctx, hubBase, hubKey, folderID); err == nil && hubStatus.LocalFiles > 0 {
458+
receiverReady, _, readyErr := syncthingFolderHasLocalFiles(ctx, recvBase, recvKey, folderID, hubStatus.LocalFiles)
459+
if readyErr != nil {
460+
slog.Debug("mesh: receiver file materialization poll error", "pod", pod.Name, "error", readyErr)
461+
filesReady = false
462+
} else {
463+
filesReady = receiverReady
464+
}
465+
}
466+
}
467+
if pollErr == nil && needBytes == 0 && status.Connected && filesReady {
447468
status.Synced = true
448469
slog.Debug("mesh: receiver synced", "pod", pod.Name)
449470
return status
@@ -574,7 +595,19 @@ func probeMeshReceiver(ctx context.Context, opts *Options, k *kube.Client, names
574595
_, needBytes, pollErr := syncthingCompletion(ctx, recvBase, recvKey, folderID, hubDeviceID)
575596
if pollErr == nil {
576597
h.NeedBytes = needBytes
577-
h.InSync = needBytes == 0 && h.Connected
598+
filesReady := true
599+
if h.Connected {
600+
if hubStatus, err := syncthingFolderStatusInfoForFolder(ctx, hubBase, hubKey, folderID); err == nil && hubStatus.LocalFiles > 0 {
601+
receiverReady, _, readyErr := syncthingFolderHasLocalFiles(ctx, recvBase, recvKey, folderID, hubStatus.LocalFiles)
602+
if readyErr != nil {
603+
h.Err = fmt.Sprintf("file materialization poll: %v", readyErr)
604+
filesReady = false
605+
} else {
606+
filesReady = receiverReady
607+
}
608+
}
609+
}
610+
h.InSync = needBytes == 0 && h.Connected && filesReady
578611
} else {
579612
h.Err = fmt.Sprintf("completion poll: %v", pollErr)
580613
}
@@ -599,8 +632,8 @@ func brokenMeshReceiverPods(summary *meshHealthSummary) []string {
599632
// repairMeshReceivers re-runs the full mesh setup which reconfigures all
600633
// receivers. setupMesh is idempotent — healthy receivers reconverge quickly
601634
// while broken ones get reconfigured.
602-
func repairMeshReceivers(ctx context.Context, opts *Options, k *kube.Client, namespace, sessionName string, labels map[string]string, hubPod, folderID, workspaceMountPath string, onStatus func(string)) (*meshSummary, error) {
603-
return setupMesh(ctx, opts, k, namespace, sessionName, labels, hubPod, folderID, workspaceMountPath, meshSetupTimeout, onStatus)
635+
func repairMeshReceivers(ctx context.Context, opts *Options, k *kube.Client, namespace, sessionName string, labels map[string]string, hubPod, folderID, folderPath string, onStatus func(string)) (*meshSummary, error) {
636+
return setupMesh(ctx, opts, k, namespace, sessionName, labels, hubPod, folderID, folderPath, meshSetupTimeout, onStatus)
604637
}
605638

606639
// meshReceiverCount returns the number of receiver pods discovered in a session.

internal/cli/mesh_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111

1212
"github.com/acmore/okdev/internal/kube"
13+
syncengine "github.com/acmore/okdev/internal/sync"
1314
)
1415

1516
func TestFormatMeshSummary(t *testing.T) {
@@ -96,3 +97,17 @@ func TestMeshReceiverCount(t *testing.T) {
9697
t.Fatalf("expected 2 receivers, got %d", count)
9798
}
9899
}
100+
101+
func TestMeshFolderPathUsesExplicitRemoteSyncPath(t *testing.T) {
102+
got := meshFolderPath([]syncengine.Pair{{Local: ".", Remote: "/workspace/a"}}, "/workspace")
103+
if got != "/workspace/a" {
104+
t.Fatalf("expected explicit remote sync path, got %q", got)
105+
}
106+
}
107+
108+
func TestMeshFolderPathFallsBackToWorkspaceMountPath(t *testing.T) {
109+
got := meshFolderPath(nil, "/workspace")
110+
if got != "/workspace" {
111+
t.Fatalf("expected workspace mount path fallback, got %q", got)
112+
}
113+
}

internal/cli/sync.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,11 @@ func repairMeshIfNeeded(cmd *cobra.Command, cc *commandContext, hubPod string) e
292292
}
293293

294294
folderID := "okdev-" + cc.sessionName
295-
workspaceMountPath := cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath)
295+
pairs, err := syncengine.ParsePairs(cc.cfg.Spec.Sync.Paths, cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath))
296+
if err != nil {
297+
return fmt.Errorf("parse sync paths: %w", err)
298+
}
299+
folderPath := meshFolderPath(pairs, cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath))
296300

297301
fmt.Fprintf(cmd.OutOrStdout(), "Checking mesh health for %d receiver(s) …\n", count)
298302
health, err := checkMeshHealth(ctx, cc.opts, cc.kube, cc.namespace, cc.sessionName, labels, hubPod, folderID)
@@ -308,7 +312,7 @@ func repairMeshIfNeeded(cmd *cobra.Command, cc *commandContext, hubPod string) e
308312
}
309313

310314
fmt.Fprintf(cmd.OutOrStdout(), "Mesh: %d broken receiver(s): %s\n", len(broken), strings.Join(broken, ", "))
311-
return doMeshRepair(cmd, ctx, cc, labels, hubPod, folderID, workspaceMountPath)
315+
return doMeshRepair(cmd, ctx, cc, labels, hubPod, folderID, folderPath)
312316
}
313317

314318
func forceRepairMesh(cmd *cobra.Command, cc *commandContext, hubPod string) error {
@@ -323,15 +327,19 @@ func forceRepairMesh(cmd *cobra.Command, cc *commandContext, hubPod string) erro
323327
}
324328

325329
folderID := "okdev-" + cc.sessionName
326-
workspaceMountPath := cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath)
330+
pairs, err := syncengine.ParsePairs(cc.cfg.Spec.Sync.Paths, cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath))
331+
if err != nil {
332+
return fmt.Errorf("parse sync paths: %w", err)
333+
}
334+
folderPath := meshFolderPath(pairs, cc.cfg.EffectiveWorkspaceMountPath(cc.cfgPath))
327335

328336
fmt.Fprintf(cmd.OutOrStdout(), "Force resetting mesh for %d receiver(s) …\n", count)
329-
return doMeshRepair(cmd, ctx, cc, labels, hubPod, folderID, workspaceMountPath)
337+
return doMeshRepair(cmd, ctx, cc, labels, hubPod, folderID, folderPath)
330338
}
331339

332-
func doMeshRepair(cmd *cobra.Command, ctx context.Context, cc *commandContext, labels map[string]string, hubPod, folderID, workspaceMountPath string) error {
340+
func doMeshRepair(cmd *cobra.Command, ctx context.Context, cc *commandContext, labels map[string]string, hubPod, folderID, folderPath string) error {
333341
fmt.Fprintln(cmd.OutOrStdout(), "Repairing mesh sync …")
334-
result, meshErr := repairMeshReceivers(ctx, cc.opts, cc.kube, cc.namespace, cc.sessionName, labels, hubPod, folderID, workspaceMountPath, func(status string) {
342+
result, meshErr := repairMeshReceivers(ctx, cc.opts, cc.kube, cc.namespace, cc.sessionName, labels, hubPod, folderID, folderPath, func(status string) {
335343
fmt.Fprintf(cmd.OutOrStdout(), " mesh: %s\n", status)
336344
})
337345
if meshErr != nil {

0 commit comments

Comments
 (0)