Skip to content

Commit 6c36add

Browse files
authored
feat: add cp transfer progress reporting (#91)
* Add cp transfer progress reporting * Fix cp download path and reconcile wait status * Show pending workloads in status before pods exist
1 parent 86ec7ce commit 6c36add

14 files changed

Lines changed: 1105 additions & 30 deletions

docs/command-reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
- **Multi-pod upload** (`--all`, `--pod`, `--role`, `--label`): fans out the same local source to all matched pods in parallel.
111111
- **Multi-pod download**: downloads from each matched pod into `<dest>/<short-pod-name>/` subdirectories.
112112
- Files are streamed via `cat` pipes. Directories are tar-streamed automatically.
113+
- On a TTY, an in-place progress line shows transferred bytes, transfer rate, and elapsed time (after a few seconds). For multi-pod copies the line aggregates across pods (e.g. `Copying to 8 pods · 3/8 done · 5 in flight · 1.2 GiB · 28.0 MiB/s · 00:42`) and surfaces a noticeably slow pod inline. Progress is suppressed on non-TTY writers (pipes, redirects, CI), so machine-readable output is unaffected.
113114
- `--all`: target all running pods in the session.
114115
- `--pod`: target specific pods by name (repeatable or comma-separated).
115116
- `--role`: target pods by `okdev.io/workload-role` label (case-insensitive).

internal/cli/common.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,22 @@ func (s *transientStatus) clear() {
280280
fmt.Fprint(s.w, "\r\033[K")
281281
}
282282

283+
// printAbove writes a persistent line above the in-place status, clearing the
284+
// current spinner line first so the printed output is not garbled by the
285+
// concurrent spinner re-render. The spinner picks up where it left off on the
286+
// next render tick.
287+
func (s *transientStatus) printAbove(line string) {
288+
if !s.enabled {
289+
return
290+
}
291+
s.mu.Lock()
292+
defer s.mu.Unlock()
293+
if !strings.HasSuffix(line, "\n") {
294+
line += "\n"
295+
}
296+
fmt.Fprintf(s.w, "\r\033[K%s", line)
297+
}
298+
283299
func (s *transientStatus) stop() {
284300
if !s.enabled {
285301
return

internal/cli/cp.go

Lines changed: 89 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,16 @@ func newCpCmd(opts *Options) *cobra.Command {
110110
if targetContainer == "" {
111111
targetContainer = target.Container
112112
}
113-
return runSinglePodCp(cmd.Context(), cc.kube, cc.namespace, target.PodName, targetContainer, localPath, remotePath, upload, cmd.OutOrStdout())
113+
out := cmd.OutOrStdout()
114+
prog := newSinglePodProgress(out, singlePodCpMessage(localPath, remotePath, upload))
115+
prog.start()
116+
err = runSinglePodCpWithProgress(cmd.Context(), cc.kube, cc.namespace, target.PodName, targetContainer, localPath, remotePath, upload, out, prog)
117+
prog.stop()
118+
if err != nil {
119+
return err
120+
}
121+
fmt.Fprintln(out, singlePodCpDoneLine(localPath, remotePath, upload))
122+
return nil
114123
},
115124
}
116125

@@ -152,18 +161,29 @@ func validateCpFlags(allPods bool, podNames []string, role string, labels []stri
152161
}
153162

154163
func runSinglePodCp(ctx context.Context, client *kube.Client, namespace, pod, container, localPath, remotePath string, upload bool, out io.Writer) error {
164+
return runSinglePodCpWithProgress(ctx, client, namespace, pod, container, localPath, remotePath, upload, out, nil)
165+
}
166+
167+
// runSinglePodCpWithProgress performs a single-pod copy and, if prog is
168+
// non-nil, threads it through to the underlying kube methods so byte and
169+
// file counters are updated as data flows.
170+
func runSinglePodCpWithProgress(ctx context.Context, client *kube.Client, namespace, pod, container, localPath, remotePath string, upload bool, out io.Writer, prog *cpProgress) error {
171+
kp := kube.CopyProgress{}
172+
if prog != nil {
173+
kp.OnBytes = prog.addBytes
174+
kp.OnFile = prog.addFile
175+
}
155176
if upload {
156177
info, err := os.Stat(localPath)
157178
if err != nil {
158179
return err
159180
}
160181
if info.IsDir() {
161-
return client.CopyDirToPod(ctx, namespace, pod, container, localPath, remotePath)
182+
return client.CopyDirToPodWithProgress(ctx, namespace, pod, container, localPath, remotePath, kp)
162183
}
163-
return client.CopyToPodInContainer(ctx, namespace, localPath, pod, container, remotePath)
184+
return client.CopyToPodInContainerWithProgress(ctx, namespace, localPath, pod, container, remotePath, kp)
164185
}
165186

166-
// Download: probe remote to decide file vs dir.
167187
isDir, err := client.IsRemoteDir(ctx, namespace, pod, container, remotePath)
168188
if err != nil {
169189
return err
@@ -172,9 +192,13 @@ func runSinglePodCp(ctx context.Context, client *kube.Client, namespace, pod, co
172192
if err := os.MkdirAll(localPath, 0o755); err != nil {
173193
return err
174194
}
175-
return client.CopyDirFromPod(ctx, namespace, pod, container, remotePath, localPath)
195+
return client.CopyDirFromPodWithProgress(ctx, namespace, pod, container, remotePath, localPath, kp)
176196
}
177-
return client.CopyFromPodInContainer(ctx, namespace, pod, container, remotePath, localPath)
197+
targetPath, err := resolveDownloadTargetPath(localPath, "", remotePath, false, 1)
198+
if err != nil {
199+
return err
200+
}
201+
return client.CopyFromPodInContainerWithProgress(ctx, namespace, pod, container, remotePath, targetPath, kp)
178202
}
179203

180204
func multiPodDownloadPath(localPath, shortName, remotePath string, remoteIsDir bool) string {
@@ -192,6 +216,25 @@ func downloadTargetPath(localPath, shortName, remotePath string, remoteIsDir boo
192216
return multiPodDownloadPath(localPath, shortName, remotePath, remoteIsDir)
193217
}
194218

219+
func resolveDownloadTargetPath(localPath, shortName, remotePath string, remoteIsDir bool, podCount int) (string, error) {
220+
targetPath := downloadTargetPath(localPath, shortName, remotePath, remoteIsDir, podCount)
221+
if remoteIsDir || podCount != 1 {
222+
return targetPath, nil
223+
}
224+
225+
info, err := os.Stat(targetPath)
226+
switch {
227+
case err == nil && info.IsDir():
228+
return filepath.Join(targetPath, filepath.Base(remotePath)), nil
229+
case err == nil:
230+
return targetPath, nil
231+
case os.IsNotExist(err):
232+
return targetPath, nil
233+
default:
234+
return "", err
235+
}
236+
}
237+
195238
func downloadSuccessDestination(localPath, shortName string, podCount int) string {
196239
if podCount == 1 {
197240
return localPath
@@ -204,6 +247,25 @@ type cpResult struct {
204247
err error
205248
}
206249

250+
// singlePodCpMessage returns the prefix used in the transient progress line
251+
// for a single-pod copy. Uses an arrow to make direction obvious.
252+
func singlePodCpMessage(localPath, remotePath string, upload bool) string {
253+
if upload {
254+
return fmt.Sprintf("Copying %s → :%s", localPath, remotePath)
255+
}
256+
return fmt.Sprintf("Copying :%s → %s", remotePath, localPath)
257+
}
258+
259+
// singlePodCpDoneLine returns the persistent line printed after a successful
260+
// single-pod copy. Format intentionally mirrors the per-pod success line
261+
// printed by runMultiPodCp so users see consistent output across modes.
262+
func singlePodCpDoneLine(localPath, remotePath string, upload bool) string {
263+
if upload {
264+
return fmt.Sprintf("Copied %s -> :%s", localPath, remotePath)
265+
}
266+
return fmt.Sprintf("Copied :%s -> %s", remotePath, localPath)
267+
}
268+
207269
func runMultiPodCp(cmd *cobra.Command, cc *commandContext, localPath, remotePath string, upload bool, allPods bool, podNames []string, role string, labels []string, exclude []string, container string, fanout int, readyOnly bool) error {
208270
ctx := cmd.Context()
209271
labelSel := selectorForSessionRun(cc.sessionName)
@@ -274,11 +336,18 @@ func runMultiPodCp(cmd *cobra.Command, cc *commandContext, localPath, remotePath
274336
nameMap[n] = shortNames[i]
275337
}
276338

277-
out := cmd.OutOrStdout()
339+
rawOut := cmd.OutOrStdout()
340+
// Wrap output so the spinner re-render and per-pod completion lines
341+
// (written from goroutines draining `results`) cannot interleave.
342+
out := io.Writer(&syncWriter{w: rawOut})
278343
results := make(chan cpResult, len(pods))
279344
sem := make(chan struct{}, effectiveFanout)
280345
podCount := len(pods)
281346

347+
prog := newMultiPodProgress(out, podCount)
348+
prog.start()
349+
defer prog.stop()
350+
282351
var wg sync.WaitGroup
283352
for _, pod := range pods {
284353
wg.Add(1)
@@ -288,17 +357,24 @@ func runMultiPodCp(cmd *cobra.Command, cc *commandContext, localPath, remotePath
288357
defer func() { <-sem }()
289358

290359
short := nameMap[pod.Name]
360+
prog.startPod(short)
361+
defer prog.finishPod(short)
362+
291363
var cpErr error
292364
if upload {
293-
cpErr = runSinglePodCp(ctx, cc.kube, cc.namespace, pod.Name, targetContainer, localPath, remotePath, true, io.Discard)
365+
cpErr = runSinglePodCpWithProgress(ctx, cc.kube, cc.namespace, pod.Name, targetContainer, localPath, remotePath, true, io.Discard, prog)
294366
} else {
295367
isRemoteDir, err := cc.kube.IsRemoteDir(ctx, cc.namespace, pod.Name, targetContainer, remotePath)
296368
if err != nil {
297369
results <- cpResult{pod: pod.Name, err: err}
298370
return
299371
}
300-
podLocalPath := downloadTargetPath(localPath, short, remotePath, isRemoteDir, podCount)
301-
cpErr = runSinglePodCp(ctx, cc.kube, cc.namespace, pod.Name, targetContainer, podLocalPath, remotePath, false, io.Discard)
372+
podLocalPath, err := resolveDownloadTargetPath(localPath, short, remotePath, isRemoteDir, podCount)
373+
if err != nil {
374+
results <- cpResult{pod: pod.Name, err: err}
375+
return
376+
}
377+
cpErr = runSinglePodCpWithProgress(ctx, cc.kube, cc.namespace, pod.Name, targetContainer, podLocalPath, remotePath, false, io.Discard, prog)
302378
}
303379
results <- cpResult{pod: pod.Name, err: cpErr}
304380
}(pod)
@@ -313,13 +389,13 @@ func runMultiPodCp(cmd *cobra.Command, cc *commandContext, localPath, remotePath
313389
for r := range results {
314390
short := nameMap[r.pod]
315391
if r.err != nil {
316-
fmt.Fprintf(out, "%s: error: %v\n", short, r.err)
392+
prog.println(fmt.Sprintf("%s: error: %v", short, r.err))
317393
failures = append(failures, r)
318394
} else {
319395
if upload {
320-
fmt.Fprintf(out, "%s: copied %s -> :%s\n", short, localPath, remotePath)
396+
prog.println(fmt.Sprintf("%s: copied %s -> :%s", short, localPath, remotePath))
321397
} else {
322-
fmt.Fprintf(out, "%s: copied :%s -> %s\n", short, remotePath, downloadSuccessDestination(localPath, short, podCount))
398+
prog.println(fmt.Sprintf("%s: copied :%s -> %s", short, remotePath, downloadSuccessDestination(localPath, short, podCount)))
323399
}
324400
}
325401
}

0 commit comments

Comments
 (0)