Skip to content

Commit c793edc

Browse files
committed
Add progress events to copy operations
1 parent cc7fab4 commit c793edc

1 file changed

Lines changed: 137 additions & 31 deletions

File tree

packages/hub/src/lib/copy-files.ts

Lines changed: 137 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { checkCredentials } from "../utils/checkCredentials";
33
import { formatBytes } from "../utils/formatBytes";
44
import { promisesQueue } from "../utils/promisesQueue";
55
import { toRepoId } from "../utils/toRepoId";
6+
import { eventToGenerator } from "../utils/eventToGenerator";
67
import type { CommitOperation, CommitParams } from "./commit";
78
import { commit } from "./commit";
89
import { downloadFile } from "./download-file";
@@ -11,6 +12,23 @@ import { listFiles } from "./list-files";
1112
import type { PathInfo } from "./paths-info";
1213
import { pathsInfo } from "./paths-info";
1314

15+
/**
16+
* Progress events yielded by {@link copyFileIter} / {@link copyFilesIter} / {@link copyFolderIter}.
17+
*
18+
* Currently only `fileDownloaded` is emitted: one event per source file that had to be downloaded
19+
* (small git-stored files that can't be copied server-side). Xet-backed files are copied
20+
* server-side and do not produce events.
21+
*/
22+
export interface CopyProgressEvent {
23+
event: "fileDownloaded";
24+
/** Source path of the file that was just downloaded. */
25+
path: string;
26+
/** Number of files downloaded so far (including this one). */
27+
downloaded: number;
28+
/** Total number of files that will be downloaded. */
29+
total: number;
30+
}
31+
1432
const DOWNLOAD_CONCURRENCY = 5;
1533
const PATHS_INFO_BATCH_SIZE = 100;
1634
const MAX_REPORTED_LFS_PATHS = 5;
@@ -112,6 +130,39 @@ export function copyFile(
112130
});
113131
}
114132

133+
/**
134+
* Async-iterator variant of {@link copyFile} that yields {@link CopyProgressEvent}s while
135+
* downloading non-xet source files (xet-backed files are copied server-side and do not
136+
* emit events). See {@link copyFile} for the semantics.
137+
*
138+
* @example
139+
* ```ts
140+
* for await (const event of copyFileIter({ source, destination, accessToken })) {
141+
* console.log(`downloaded ${event.path} (${event.downloaded}/${event.total})`);
142+
* }
143+
* ```
144+
*/
145+
export function copyFileIter(
146+
params: {
147+
source: CopySource;
148+
destination: CopyDestination;
149+
} & SharedParams,
150+
): AsyncGenerator<CopyProgressEvent, undefined> {
151+
return copyFilesIter({
152+
...(params.accessToken ? { accessToken: params.accessToken } : { credentials: params.credentials }),
153+
destination: params.destination.repo,
154+
files: [
155+
{
156+
source: params.source,
157+
destinationPath: params.destination.path,
158+
},
159+
],
160+
hubUrl: params.hubUrl,
161+
fetch: params.fetch,
162+
abortSignal: params.abortSignal,
163+
});
164+
}
165+
115166
/**
116167
* Copy multiple files (potentially from different source repos/buckets) to the destination
117168
* bucket in a single commit.
@@ -152,11 +203,31 @@ export async function copyFiles(
152203
files: CopyFilesEntry[];
153204
} & SharedParams,
154205
): Promise<undefined> {
206+
const iterator = copyFilesIter(params);
207+
while (true) {
208+
const res = await iterator.next();
209+
if (res.done) {
210+
return undefined;
211+
}
212+
}
213+
}
214+
215+
/**
216+
* Async-iterator variant of {@link copyFiles} that yields {@link CopyProgressEvent}s while
217+
* downloading non-xet source files (xet-backed files are copied server-side and do not
218+
* emit events). See {@link copyFiles} for the semantics.
219+
*/
220+
export async function* copyFilesIter(
221+
params: {
222+
destination: BucketDesignation;
223+
files: CopyFilesEntry[];
224+
} & SharedParams,
225+
): AsyncGenerator<CopyProgressEvent, undefined> {
155226
if (params.files.length === 0) {
156227
return undefined;
157228
}
158229

159-
const operations = await resolveCopyOperations(params, params.files);
230+
const operations = yield* resolveCopyOperationsIter(params, params.files);
160231

161232
await commit({
162233
...(params.accessToken ? { accessToken: params.accessToken } : { credentials: params.credentials }),
@@ -210,6 +281,26 @@ export async function copyFolder(
210281
destination: Omit<CopyDestination, "path"> & { path?: string };
211282
} & SharedParams,
212283
): Promise<undefined> {
284+
const iterator = copyFolderIter(params);
285+
while (true) {
286+
const res = await iterator.next();
287+
if (res.done) {
288+
return undefined;
289+
}
290+
}
291+
}
292+
293+
/**
294+
* Async-iterator variant of {@link copyFolder} that yields {@link CopyProgressEvent}s while
295+
* downloading non-xet source files (xet-backed files are copied server-side and do not
296+
* emit events). See {@link copyFolder} for the semantics.
297+
*/
298+
export async function* copyFolderIter(
299+
params: {
300+
source: Omit<CopySource, "path"> & { path?: string };
301+
destination: Omit<CopyDestination, "path"> & { path?: string };
302+
} & SharedParams,
303+
): AsyncGenerator<CopyProgressEvent, undefined> {
213304
const accessToken = checkCredentials(params);
214305
const sourceRepoId = toRepoId(params.source.repo);
215306
const sourcePath = (params.source.path ?? "").replace(/\/+$/, "");
@@ -273,7 +364,7 @@ export async function copyFolder(
273364
return undefined;
274365
}
275366

276-
await downloadAndFillBlobs({
367+
yield* downloadAndFillBlobsIter({
277368
pendingDownloads,
278369
operations,
279370
accessToken,
@@ -296,8 +387,12 @@ export async function copyFolder(
296387
/**
297388
* Resolve a list of {@link CopyFilesEntry} entries into `CommitOperation`s, batching
298389
* `pathsInfo` calls per source repo and parallelizing downloads for non-xet files.
390+
* Yields one {@link CopyProgressEvent} per downloaded file.
299391
*/
300-
async function resolveCopyOperations(shared: SharedParams, files: CopyFilesEntry[]): Promise<CommitOperation[]> {
392+
async function* resolveCopyOperationsIter(
393+
shared: SharedParams,
394+
files: CopyFilesEntry[],
395+
): AsyncGenerator<CopyProgressEvent, CommitOperation[]> {
301396
const accessToken = checkCredentials(shared);
302397

303398
// Group files by (source repo, source revision) so we can batch pathsInfo calls.
@@ -391,7 +486,7 @@ async function resolveCopyOperations(shared: SharedParams, files: CopyFilesEntry
391486
}
392487
}
393488

394-
await downloadAndFillBlobs({
489+
yield* downloadAndFillBlobsIter({
395490
pendingDownloads,
396491
operations,
397492
accessToken,
@@ -411,39 +506,50 @@ interface PendingDownload {
411506

412507
/**
413508
* Download all `pendingDownloads` in parallel and fill the matching `addOrUpdate`
414-
* placeholder ops in `operations` with the downloaded blob. No-op if the list is empty.
509+
* placeholder ops in `operations` with the downloaded blob. Yields one
510+
* {@link CopyProgressEvent} per file as it completes. No-op if the list is empty.
415511
*/
416-
async function downloadAndFillBlobs(args: {
512+
function downloadAndFillBlobsIter(args: {
417513
pendingDownloads: PendingDownload[];
418514
operations: CommitOperation[];
419515
accessToken: string | undefined;
420516
hubUrl: string | undefined;
421517
fetch: typeof fetch | undefined;
422-
}): Promise<void> {
423-
if (args.pendingDownloads.length === 0) {
424-
return;
425-
}
426-
await promisesQueue(
427-
args.pendingDownloads.map(({ index, repoId, revision, sourcePath }) => async () => {
428-
const blob = await downloadFile({
429-
repo: repoId,
430-
path: sourcePath,
431-
revision,
432-
accessToken: args.accessToken,
433-
hubUrl: args.hubUrl,
434-
fetch: args.fetch,
435-
});
436-
if (!blob) {
437-
throw new Error(`Failed to download '${sourcePath}' from ${repoId.type}s/${repoId.name}`);
438-
}
439-
const op = args.operations[index];
440-
if (op.operation !== "addOrUpdate") {
441-
throw new Error("Internal: expected addOrUpdate placeholder operation");
442-
}
443-
op.content = blob;
444-
}),
445-
DOWNLOAD_CONCURRENCY,
446-
);
518+
}): AsyncGenerator<CopyProgressEvent, void> {
519+
const total = args.pendingDownloads.length;
520+
return eventToGenerator<CopyProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {
521+
if (total === 0) {
522+
returnCallback();
523+
return;
524+
}
525+
let downloaded = 0;
526+
promisesQueue(
527+
args.pendingDownloads.map(({ index, repoId, revision, sourcePath }) => async () => {
528+
const blob = await downloadFile({
529+
repo: repoId,
530+
path: sourcePath,
531+
revision,
532+
accessToken: args.accessToken,
533+
hubUrl: args.hubUrl,
534+
fetch: args.fetch,
535+
});
536+
if (!blob) {
537+
throw new Error(`Failed to download '${sourcePath}' from ${repoId.type}s/${repoId.name}`);
538+
}
539+
const op = args.operations[index];
540+
if (op.operation !== "addOrUpdate") {
541+
throw new Error("Internal: expected addOrUpdate placeholder operation");
542+
}
543+
op.content = blob;
544+
downloaded++;
545+
yieldCallback({ event: "fileDownloaded", path: sourcePath, downloaded, total });
546+
}),
547+
DOWNLOAD_CONCURRENCY,
548+
).then(
549+
() => returnCallback(),
550+
(err) => rejectCallback(err),
551+
);
552+
});
447553
}
448554

449555
/**

0 commit comments

Comments
 (0)