Skip to content

Commit 57afde9

Browse files
committed
feat(sync): add guarded fingerprint path lock and syncthing progress
1 parent ec2a3ad commit 57afde9

4 files changed

Lines changed: 190 additions & 30 deletions

File tree

internal/cli/syncthing.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"errors"
99
"fmt"
1010
"io"
11+
"log/slog"
1112
"net/http"
13+
"net/url"
1214
"os"
1315
"os/exec"
1416
"os/signal"
@@ -127,6 +129,9 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm
127129
fmt.Fprintf(cmd.OutOrStdout(), "Remote folder: %s\n", pair.Remote)
128130
fmt.Fprintf(cmd.OutOrStdout(), "Local binary: %s\n", localBinary)
129131
fmt.Fprintln(cmd.OutOrStdout(), "Press Ctrl+C to stop sync tunnel and local syncthing.")
132+
progressCtx, stopProgress := context.WithCancel(context.Background())
133+
defer stopProgress()
134+
go runSyncthingProgressReporter(progressCtx, cmd.OutOrStdout(), localBase, localKey, remoteBase, remoteKey, folderID, localID, remoteID)
130135

131136
sigCh := make(chan os.Signal, 1)
132137
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
@@ -289,6 +294,49 @@ func syncthingDeviceID(ctx context.Context, base, key string) (string, error) {
289294
return payload.MyID, nil
290295
}
291296

297+
func runSyncthingProgressReporter(ctx context.Context, out io.Writer, localBase, localKey, remoteBase, remoteKey, folderID, localID, remoteID string) {
298+
ticker := time.NewTicker(5 * time.Second)
299+
defer ticker.Stop()
300+
emit := func() {
301+
upPct, upNeed, err := syncthingCompletion(ctx, localBase, localKey, folderID, remoteID)
302+
if err != nil {
303+
slog.Debug("syncthing progress read failed", "side", "local", "error", err)
304+
return
305+
}
306+
downPct, downNeed, err := syncthingCompletion(ctx, remoteBase, remoteKey, folderID, localID)
307+
if err != nil {
308+
slog.Debug("syncthing progress read failed", "side", "remote", "error", err)
309+
return
310+
}
311+
fmt.Fprintf(out, "Syncthing progress: up %.1f%% (need=%dB), down %.1f%% (need=%dB)\n", upPct, upNeed, downPct, downNeed)
312+
}
313+
emit()
314+
for {
315+
select {
316+
case <-ctx.Done():
317+
return
318+
case <-ticker.C:
319+
emit()
320+
}
321+
}
322+
}
323+
324+
func syncthingCompletion(ctx context.Context, base, key, folderID, deviceID string) (float64, int64, error) {
325+
path := fmt.Sprintf("/rest/db/completion?folder=%s&device=%s", url.QueryEscape(folderID), url.QueryEscape(deviceID))
326+
body, err := syncthingAPIRequestWithContext(ctx, http.MethodGet, base, key, path, nil, "")
327+
if err != nil {
328+
return 0, 0, err
329+
}
330+
var payload struct {
331+
Completion float64 `json:"completion"`
332+
NeedBytes int64 `json:"needBytes"`
333+
}
334+
if err := json.Unmarshal(body, &payload); err != nil {
335+
return 0, 0, err
336+
}
337+
return payload.Completion, payload.NeedBytes, nil
338+
}
339+
292340
func folderTypesForMode(mode string) (local, remote string) {
293341
switch mode {
294342
case "up":
@@ -407,7 +455,11 @@ func syncthingPost(base, key, path string, body []byte) error {
407455
}
408456

409457
func syncthingAPIRequest(method, base, key, path string, body []byte, contentType string) ([]byte, error) {
410-
req, err := http.NewRequest(method, base+path, bytes.NewReader(body))
458+
return syncthingAPIRequestWithContext(context.Background(), method, base, key, path, body, contentType)
459+
}
460+
461+
func syncthingAPIRequestWithContext(ctx context.Context, method, base, key, path string, body []byte, contentType string) ([]byte, error) {
462+
req, err := http.NewRequestWithContext(ctx, method, base+path, bytes.NewReader(body))
411463
if err != nil {
412464
return nil, err
413465
}

internal/cli/syncthing_test.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package cli
22

3-
import "testing"
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
)
49

510
func TestSyncthingObjectArray(t *testing.T) {
611
arr, err := syncthingObjectArray(map[string]any{"devices": []any{map[string]any{"deviceID": "abc"}}}, "devices")
@@ -25,3 +30,30 @@ func TestSyncthingObjectMapRejectsWrongType(t *testing.T) {
2530
t.Fatal("expected type error")
2631
}
2732
}
33+
34+
func TestSyncthingCompletion(t *testing.T) {
35+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
36+
if r.URL.Path != "/rest/db/completion" {
37+
t.Fatalf("unexpected path %s", r.URL.Path)
38+
}
39+
if got := r.URL.Query().Get("folder"); got != "okdev-sess" {
40+
t.Fatalf("unexpected folder query %q", got)
41+
}
42+
if got := r.URL.Query().Get("device"); got != "REMOTEID" {
43+
t.Fatalf("unexpected device query %q", got)
44+
}
45+
if got := r.Header.Get("X-API-Key"); got != "k" {
46+
t.Fatalf("unexpected API key %q", got)
47+
}
48+
_, _ = w.Write([]byte(`{"completion":99.5,"needBytes":1234}`))
49+
}))
50+
defer srv.Close()
51+
52+
pct, need, err := syncthingCompletion(context.Background(), srv.URL, "k", "okdev-sess", "REMOTEID")
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
if pct != 99.5 || need != 1234 {
57+
t.Fatalf("unexpected completion values pct=%v need=%d", pct, need)
58+
}
59+
}

internal/sync/native.go

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ var uploadFingerprintCache = struct {
5050
m: map[string]uploadFingerprintEntry{},
5151
}
5252

53+
var uploadPathLocks sync.Map
54+
5355
func ParsePairs(configured []string, defaultRemote string) ([]Pair, error) {
5456
if len(configured) == 0 {
5557
return []Pair{{Local: ".", Remote: defaultRemote}}, nil
@@ -200,43 +202,47 @@ func syncUpPath(ctx context.Context, k Client, namespace, pod, podInstance strin
200202

201203
cacheKey := uploadFingerprintCacheKey(namespace, pod, p, excludes)
202204
if !st.IsDir() {
203-
fingerprint := fmt.Sprintf("file:%d:%d", st.Size(), st.ModTime().UnixNano())
204-
if !force && uploadFingerprintMatches(cacheKey, podInstance, fingerprint) {
205-
return pathStats{Skipped: true}, nil
206-
}
207-
if err := k.CopyToPod(ctx, namespace, absLocal, pod, p.Remote); err != nil {
208-
return pathStats{}, err
209-
}
210-
setUploadFingerprint(cacheKey, podInstance, fingerprint)
211-
return pathStats{UploadBytes: st.Size()}, nil
205+
return withUploadKeyLock(cacheKey, func() (pathStats, error) {
206+
fingerprint := fmt.Sprintf("file:%d:%d", st.Size(), st.ModTime().UnixNano())
207+
if !force && uploadFingerprintMatches(cacheKey, podInstance, fingerprint) {
208+
return pathStats{Skipped: true}, nil
209+
}
210+
if err := k.CopyToPod(ctx, namespace, absLocal, pod, p.Remote); err != nil {
211+
return pathStats{}, err
212+
}
213+
setUploadFingerprint(cacheKey, podInstance, fingerprint)
214+
return pathStats{UploadBytes: st.Size()}, nil
215+
})
212216
}
213217

214218
fingerprint, err := localDirFingerprint(absLocal, excludes)
215219
if err != nil {
216220
return pathStats{}, err
217221
}
218-
if !force && uploadFingerprintMatches(cacheKey, podInstance, fingerprint) {
219-
return pathStats{Skipped: true}, nil
220-
}
222+
return withUploadKeyLock(cacheKey, func() (pathStats, error) {
223+
if !force && uploadFingerprintMatches(cacheKey, podInstance, fingerprint) {
224+
return pathStats{Skipped: true}, nil
225+
}
221226

222-
stream, waitTar, err := startLocalTarStream(ctx, absLocal, excludes)
223-
if err != nil {
224-
return pathStats{}, err
225-
}
226-
defer stream.Close()
227+
stream, waitTar, err := startLocalTarStream(ctx, absLocal, excludes)
228+
if err != nil {
229+
return pathStats{}, err
230+
}
231+
defer stream.Close()
227232

228-
countingStream := &countingReader{Reader: stream}
229-
if err := k.ExtractTarToPod(ctx, namespace, pod, p.Remote, countingStream); err != nil {
230-
if tarErr := waitTar(); tarErr != nil {
231-
return pathStats{}, tarErr
233+
countingStream := &countingReader{Reader: stream}
234+
if err := k.ExtractTarToPod(ctx, namespace, pod, p.Remote, countingStream); err != nil {
235+
if tarErr := waitTar(); tarErr != nil {
236+
return pathStats{}, tarErr
237+
}
238+
return pathStats{}, err
232239
}
233-
return pathStats{}, err
234-
}
235-
if err := waitTar(); err != nil {
236-
return pathStats{}, err
237-
}
238-
setUploadFingerprint(cacheKey, podInstance, fingerprint)
239-
return pathStats{UploadBytes: countingStream.BytesRead()}, nil
240+
if err := waitTar(); err != nil {
241+
return pathStats{}, err
242+
}
243+
setUploadFingerprint(cacheKey, podInstance, fingerprint)
244+
return pathStats{UploadBytes: countingStream.BytesRead()}, nil
245+
})
240246
}
241247

242248
func syncDownPath(ctx context.Context, k Client, namespace, pod string, p Pair, excludes []string) (pathStats, error) {
@@ -436,6 +442,14 @@ func uploadFingerprintCacheLen() int {
436442
return len(uploadFingerprintCache.m)
437443
}
438444

445+
func withUploadKeyLock(key string, fn func() (pathStats, error)) (pathStats, error) {
446+
v, _ := uploadPathLocks.LoadOrStore(key, &sync.Mutex{})
447+
mu := v.(*sync.Mutex)
448+
mu.Lock()
449+
defer mu.Unlock()
450+
return fn()
451+
}
452+
439453
func localDirFingerprint(root string, excludes []string) (string, error) {
440454
var files int64
441455
var bytes int64

internal/sync/native_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"os"
88
"path/filepath"
9+
"sync"
910
"testing"
1011
"time"
1112
)
@@ -157,3 +158,64 @@ func TestRunOnceWithOptionsForceBypassesFingerprint(t *testing.T) {
157158
t.Fatalf("expected force sync to bypass cache, skipped=%d", stats.SkippedPaths)
158159
}
159160
}
161+
162+
type countingCopyClient struct {
163+
mu sync.Mutex
164+
calls int
165+
}
166+
167+
func (c *countingCopyClient) CopyToPod(context.Context, string, string, string, string) error {
168+
c.mu.Lock()
169+
c.calls++
170+
c.mu.Unlock()
171+
time.Sleep(50 * time.Millisecond)
172+
return nil
173+
}
174+
func (c *countingCopyClient) CopyFromPod(context.Context, string, string, string, string) error {
175+
return nil
176+
}
177+
func (c *countingCopyClient) ExtractTarToPod(context.Context, string, string, string, io.Reader) error {
178+
return nil
179+
}
180+
func (c *countingCopyClient) StreamFromPod(context.Context, string, string, string, io.Writer) error {
181+
return nil
182+
}
183+
func (c *countingCopyClient) ExecSh(context.Context, string, string, string) ([]byte, error) {
184+
return []byte("pod-instance"), nil
185+
}
186+
187+
func TestRunOnceWithReportConcurrentUploadsShareFingerprintLock(t *testing.T) {
188+
ResetUploadFingerprintCache()
189+
tmp := t.TempDir()
190+
local := filepath.Join(tmp, "file.txt")
191+
if err := os.WriteFile(local, []byte("hello"), 0o644); err != nil {
192+
t.Fatal(err)
193+
}
194+
195+
client := &countingCopyClient{}
196+
pairs := []Pair{{Local: local, Remote: "/workspace/file.txt"}}
197+
198+
var wg sync.WaitGroup
199+
errCh := make(chan error, 2)
200+
run := func() {
201+
defer wg.Done()
202+
_, err := RunOnceWithReport(context.Background(), "up", client, "ns", "pod", pairs, nil)
203+
errCh <- err
204+
}
205+
wg.Add(2)
206+
go run()
207+
go run()
208+
wg.Wait()
209+
close(errCh)
210+
for err := range errCh {
211+
if err != nil {
212+
t.Fatalf("unexpected sync error: %v", err)
213+
}
214+
}
215+
216+
client.mu.Lock()
217+
defer client.mu.Unlock()
218+
if client.calls != 1 {
219+
t.Fatalf("expected one upload call with lock-protected fingerprint cache, got %d", client.calls)
220+
}
221+
}

0 commit comments

Comments
 (0)