Skip to content

Commit ce97bb4

Browse files
committed
feat: skip completed dataset uploads
1 parent 1c580e5 commit ce97bb4

9 files changed

Lines changed: 422 additions & 11 deletions

File tree

file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ object DatasetResource {
202202
sizeBytes: Option[Long] // Size of the changed file (None for directories)
203203
)
204204

205+
/** One file a client intends to upload, described by its path and its size in bytes. */
case class ExistingUploadFile(path: String, sizeBytes: Long)
206+
207+
/** Request payload of the existing-upload-files check: the files to look up in the dataset. */
case class ExistingUploadFilesRequest(files: List[ExistingUploadFile])
208+
205209
case class DatasetDescriptionModification(did: Integer, description: String)
206210

207211
case class DatasetNameModification(did: Integer, name: String)
@@ -1030,6 +1034,55 @@ class DatasetResource extends LazyLogging {
10301034
}
10311035
}
10321036

1037+
/**
  * Reports which of the files a client is about to upload are already present in the
  * dataset with the same path and the same size, so the client can offer to skip them.
  *
  * The current content of the dataset is the latest committed version overlaid with the
  * uncommitted (staged) changes. Any path that has a staged change supersedes its committed
  * entry: a staged removal, or a staged change whose size is unknown, means the path is NOT
  * reported as existing.
  *
  * @param did     id of the dataset to inspect
  * @param request files (path and size in bytes) to look up; a missing body or a missing
  *                `files` field is treated as an empty list
  * @param user    authenticated caller; must have write access to the dataset
  * @return 200 with a JSON object `{"filePaths": [...]}` holding the matching paths, sorted
  * @throws ForbiddenException  if the caller has no write access to the dataset
  * @throws BadRequestException if any requested `sizeBytes` is negative
  */
@POST
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/{did}/existing-upload-files")
@Consumes(Array(MediaType.APPLICATION_JSON))
def findExistingUploadFiles(
    @PathParam("did") did: Integer,
    request: ExistingUploadFilesRequest,
    @Auth user: SessionUser
): Response = {
  val uid = user.getUid
  withTransaction(context) { ctx =>
    if (!userHasWriteAccess(ctx, did, uid)) {
      throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
    }

    // Both the body and its `files` field may be null after JSON deserialization.
    val requested = Option(request)
      .flatMap(r => Option(r.files))
      .getOrElse(List.empty)
      .map { file =>
        val path = validateAndNormalizeFilePathOrThrow(file.path)
        if (file.sizeBytes < 0L) throw new BadRequestException("sizeBytes must be >= 0")
        path -> file.sizeBytes
      }
      .toMap

    val dataset = getDatasetByID(ctx, did)
    val committed = getLatestDatasetVersion(ctx, did)
      .map(v =>
        LakeFSStorageClient
          .retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash)
          .map(obj => obj.getPath -> obj.getSizeBytes.longValue())
      )
      .getOrElse(List.empty)

    val stagedDiffs = LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
    // Every path touched by a staged change overrides whatever the last commit says about it.
    val stagedPaths = stagedDiffs.map(_.getPath).toSet
    val staged = stagedDiffs
      .filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed")))
      .flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue()))

    // Drop committed entries that are superseded by a staged change (including removals),
    // otherwise a file staged for deletion would still be reported as existing.
    val existing =
      (committed.filterNot { case (path, _) => stagedPaths.contains(path) } ++ staged).toMap
    val matches = requested
      .collect { case (path, size) if existing.get(path).contains(size) => path }
      .toList
      .sorted

    Response.ok(Map("filePaths" -> matches.asJava)).build()
  }
}
1085+
10331086
@PUT
10341087
@RolesAllowed(Array("REGULAR", "ADMIN"))
10351088
@Path("/{did}/diff")

file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,55 @@ class DatasetResourceSpec
404404
dashboardDataset.size should be >= 0L
405405
}
406406

407+
"findExistingUploadFiles" should "match committed and staged files by path and size" in {
  // Timestamp-suffixed name keeps this repository distinct from those created by other tests.
  val repoName = s"existing-upload-${System.nanoTime()}"
  val dataset = new Dataset
  dataset.setName(repoName)
  dataset.setRepositoryName(repoName)
  dataset.setDescription("existing upload checks")
  dataset.setOwnerUid(ownerUser.getUid)
  dataset.setIsPublic(true)
  dataset.setIsDownloadable(true)
  datasetDao.insert(dataset)
  LakeFSStorageClient.initRepo(repoName)

  // A file that is part of the latest committed dataset version.
  val committed = "committed".getBytes(StandardCharsets.UTF_8)
  LakeFSStorageClient.writeFileToRepo(
    repoName,
    "committed.csv",
    new ByteArrayInputStream(committed)
  )
  val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit existing file")
  val version = new DatasetVersion()
  version.setDid(dataset.getDid)
  version.setCreatorUid(ownerUser.getUid)
  version.setName("v1")
  version.setVersionHash(commit.getId)
  new DatasetVersionDao(getDSLContext.configuration()).insert(version)

  // Files that are only staged (written after the commit).
  val staged = "staged".getBytes(StandardCharsets.UTF_8)
  LakeFSStorageClient.writeFileToRepo(repoName, "staged.csv", new ByteArrayInputStream(staged))

  // This file really exists, so requesting it with a different size exercises the
  // size comparison instead of behaving like a missing path.
  val wrongSize = "wrong-size".getBytes(StandardCharsets.UTF_8)
  LakeFSStorageClient.writeFileToRepo(
    repoName,
    "wrong-size.csv",
    new ByteArrayInputStream(wrongSize)
  )

  val resp = datasetResource.findExistingUploadFiles(
    dataset.getDid,
    DatasetResource.ExistingUploadFilesRequest(
      List(
        DatasetResource.ExistingUploadFile("committed.csv", committed.length),
        DatasetResource.ExistingUploadFile("staged.csv", staged.length),
        DatasetResource.ExistingUploadFile("wrong-size.csv", wrongSize.length + 1),
        DatasetResource.ExistingUploadFile("missing.csv", 1L)
      )
    ),
    sessionUser
  )

  resp.getStatus shouldEqual 200
  mapListOfStrings(entityAsScalaMap(resp)("filePaths")) should contain theSameElementsAs List(
    "committed.csv",
    "staged.csv"
  )
}
455+
407456
it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in {
408457
val dataset = new Dataset
409458
dataset.setName("get-ds-no-repo")

frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<div><b>Path:</b> {{ data.path }}</div>
3030
<div><b>Size:</b> {{ data.size }}</div>
3131

32-
<div class="hint">An upload session already exists for this path.</div>
32+
<div class="hint">{{ data.hint || "An upload session already exists for this path." }}</div>
3333
</div>
3434
</body>
3535
</html>

frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface ConflictingFileModalData {
2424
fileName: string;
2525
path: string;
2626
size: string;
27+
hint?: string;
2728
}
2829

2930
@Component({
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import { of } from "rxjs";
21+
import { NgxFileDropEntry } from "ngx-file-drop";
22+
import { NzModalService } from "ng-zorro-antd/modal";
23+
import { AdminSettingsService } from "../../../service/admin/settings/admin-settings.service";
24+
import { DatasetService } from "../../../service/user/dataset/dataset.service";
25+
import { NotificationService } from "../../../../common/service/notification/notification.service";
26+
import { FileUploadItem } from "../../../type/dashboard-file.interface";
27+
import { FilesUploaderComponent } from "./files-uploader.component";
28+
29+
/** The subset of a modal config that this spec records and asserts on. */
interface CapturedModal {
  nzTitle: string;
  nzData: { path: string; hint?: string };
  nzFooter: { label: string; onClick: () => void }[];
}
40+
41+
/**
 * Polls `condition` up to 20 times, yielding to the macrotask queue after each failed check,
 * and rejects with "condition was not met" if it never becomes true.
 */
const waitUntil = async (condition: () => boolean): Promise<void> => {
  let attemptsLeft = 20;
  while (attemptsLeft > 0) {
    if (condition()) {
      return;
    }
    await new Promise(resolve => setTimeout(resolve, 0));
    attemptsLeft -= 1;
  }
  throw new Error("condition was not met");
};
48+
49+
/**
 * Builds a minimal stand-in for a dropped file entry: `fileEntry.file` immediately hands
 * the given `File` to its success callback.
 */
const droppedFile = (relativePath: string, file: File): NgxFileDropEntry => {
  const fileEntry = {
    isFile: true,
    file: (success: (file: File) => void): void => success(file),
  };
  return { relativePath, fileEntry } as unknown as NgxFileDropEntry;
};
57+
58+
describe("FilesUploaderComponent", () => {
  let component: FilesUploaderComponent;
  let modals: CapturedModal[];

  // Presses the footer button with the given label on a captured modal, if it has one.
  const pressFooterButton = (captured: CapturedModal, label: string): void => {
    captured.nzFooter.find(button => button.label === label)?.onClick();
  };

  beforeEach(() => {
    modals = [];

    const notificationService = { error: vi.fn() } as unknown as NotificationService;
    const adminSettingsService = {
      getSetting: vi.fn().mockReturnValue(of("20")),
    } as unknown as AdminSettingsService;
    // "failed.csv" has an active multipart upload; "done.csv" already exists in the dataset.
    const datasetService = {
      listMultipartUploads: vi.fn().mockReturnValue(of(["failed.csv"])),
      findExistingUploadFiles: vi.fn().mockReturnValue(of(["done.csv"])),
    } as unknown as DatasetService;
    // Record every modal config instead of rendering it.
    const modalService = {
      create: vi.fn(config => {
        modals.push(config as CapturedModal);
        return { destroy: vi.fn() };
      }),
    } as unknown as NzModalService;

    component = new FilesUploaderComponent(
      notificationService,
      adminSettingsService,
      datasetService,
      modalService
    );
    component.ownerEmail = "owner@example.com";
    component.datasetName = "dataset";
    component.did = 7;
  });

  it("asks to resume failed multipart files and skip completed matching files in one retry batch", async () => {
    const emitted = new Promise<FileUploadItem[]>(resolve => component.uploadedFiles.subscribe(resolve));

    const failedEntry = droppedFile("failed.csv", new File(["half"], "failed.csv"));
    const doneEntry = droppedFile("done.csv", new File(["done"], "done.csv"));
    component.fileDropped([failedEntry, doneEntry]);

    // First prompt: the multipart-upload conflict for failed.csv.
    await waitUntil(() => modals.length === 1);
    const conflictModal = modals[0];
    expect(conflictModal.nzTitle).toBe("Conflicting File");
    expect(conflictModal.nzData.path).toBe("failed.csv");
    pressFooterButton(conflictModal, "Resume");

    // Second prompt: the already-existing done.csv.
    await waitUntil(() => modals.length === 2);
    const matchingModal = modals[1];
    expect(matchingModal.nzTitle).toBe("Matching File Found");
    expect(matchingModal.nzData.path).toBe("done.csv");
    expect(matchingModal.nzData.hint).toContain("same path and size");
    pressFooterButton(matchingModal, "Skip");

    const emittedItems = await emitted;
    expect(emittedItems.map(item => item.name)).toEqual(["failed.csv"]);
  });
});

frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export class FilesUploaderComponent {
6565
*/
6666
@Input() ownerEmail: string = "";
6767
@Input() datasetName: string = "";
68+
@Input() did: number | undefined;
6869

6970
@Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
7071

@@ -149,6 +150,41 @@ export class FilesUploaderComponent {
149150
});
150151
}
151152

153+
/**
 * Opens a "Matching File Found" modal for `item` and resolves with the button the user
 * pressed. The "For All" buttons are only offered when `showForAll` is true.
 */
private askUploadOrSkip(
  item: FileUploadItem,
  showForAll: boolean
): Promise<"upload" | "uploadAll" | "skip" | "skipAll"> {
  type Choice = "upload" | "uploadAll" | "skip" | "skipAll";

  return new Promise<Choice>(resolve => {
    const fileName = item.name.split("/").pop() || item.name;
    let modalRef: NzModalRef;

    // Each footer button reports its choice and then closes the modal.
    const makeButton = (label: string, choice: Choice, type?: "primary") => ({
      label,
      type,
      onClick: () => {
        resolve(choice);
        modalRef.destroy();
      },
    });

    const modalData: ConflictingFileModalData = {
      fileName,
      path: item.name,
      size: formatSize(item.file.size),
      hint: "A file with the same path and size exists in this dataset. Skip only if you expect it is the same file.",
    };
    const bulkButtons = showForAll
      ? [makeButton("Upload For All", "uploadAll"), makeButton("Skip For All", "skipAll")]
      : [];

    modalRef = this.modal.create<ConflictingFileModalContentComponent, ConflictingFileModalData>({
      nzTitle: "Matching File Found",
      nzMaskClosable: false,
      nzClosable: false,
      nzContent: ConflictingFileModalContentComponent,
      nzData: modalData,
      nzFooter: [...bulkButtons, makeButton("Upload", "upload"), makeButton("Skip", "skip", "primary")],
    });
  });
}
187+
152188
private async resolveConflicts(items: FileUploadItem[], activePaths: string[]): Promise<FileUploadItem[]> {
153189
const active = new Set(activePaths ?? []);
154190
const isConflict = (p: string) => active.has(p) || active.has(encodeURIComponent(p));
@@ -201,6 +237,27 @@ export class FilesUploaderComponent {
201237
return out;
202238
}
203239

240+
/**
 * Filters `items` against the paths already present in the dataset. Items whose name is not
 * in `existingPaths` are always kept; for the others the user is asked whether to upload or
 * skip, and a "For All" answer is applied to every remaining match without asking again.
 */
private async resolveExistingFiles(items: FileUploadItem[], existingPaths: string[]): Promise<FileUploadItem[]> {
  const knownPaths = new Set(existingPaths ?? []);
  const offerBulkChoices = items.length > 1;
  let decision: "ask" | "uploadAll" | "skipAll" = "ask";
  const kept: FileUploadItem[] = [];

  for (const item of items) {
    const matchesExisting = knownPaths.has(item.name);

    if (matchesExisting && decision === "skipAll") {
      continue;
    }

    if (matchesExisting && decision === "ask") {
      const choice = await this.askUploadOrSkip(item, offerBulkChoices);
      if (choice === "uploadAll" || choice === "skipAll") {
        decision = choice;
      }
      if (choice === "skip" || choice === "skipAll") {
        continue;
      }
    }

    kept.push(item);
  }

  return kept;
}
260+
204261
hideBanner(): void {
205262
this.fileUploadingFinished = false;
206263
}
@@ -254,20 +311,39 @@ export class FilesUploaderComponent {
254311
.then(async results => {
255312
const { ownerEmail, datasetName } = this.getOwnerAndName();
256313

257-
const activePathsPromise =
258-
ownerEmail && datasetName
259-
? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => [])
260-
: [];
261-
262-
const activePaths = await activePathsPromise;
263314
const successfulUploads = results
264315
.filter((r): r is PromiseFulfilledResult<FileUploadItem | null> => r.status === "fulfilled")
265316
.map(r_1 => r_1.value)
266317
.filter((item): item is FileUploadItem => item !== null);
267-
const filteredUploads = await this.resolveConflicts(successfulUploads, activePaths);
268-
if (filteredUploads.length > 0) {
269-
const msg = `${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!`;
270-
this.showFileUploadBanner("success", msg);
318+
319+
const activePathsPromise: Promise<string[]> =
320+
ownerEmail && datasetName
321+
? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => [])
322+
: Promise.resolve([]);
323+
const existingPathsPromise: Promise<string[]> = this.did
324+
? firstValueFrom(
325+
this.datasetService.findExistingUploadFiles(
326+
this.did,
327+
successfulUploads.map(item => ({ path: item.name, sizeBytes: item.file.size }))
328+
)
329+
).catch(() => [])
330+
: Promise.resolve([]);
331+
332+
const [activePaths, existingPaths] = await Promise.all([activePathsPromise, existingPathsPromise]);
333+
const resumableUploads = await this.resolveConflicts(successfulUploads, activePaths);
334+
const filteredUploads = await this.resolveExistingFiles(resumableUploads, existingPaths);
335+
const skippedCount = resumableUploads.length - filteredUploads.length;
336+
if (filteredUploads.length > 0 || skippedCount > 0) {
337+
const messages = [];
338+
if (filteredUploads.length > 0) {
339+
messages.push(
340+
`${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!`
341+
);
342+
}
343+
if (skippedCount > 0) {
344+
messages.push(`${skippedCount} matching file${skippedCount > 1 ? "s were" : " was"} skipped.`);
345+
}
346+
this.showFileUploadBanner(skippedCount > 0 ? "info" : "success", messages.join(" "));
271347
}
272348
const failedCount = results.length - successfulUploads.length;
273349
if (failedCount > 0) {

frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ <h6 style="font-weight: lighter; font-size: 0.9em">Choose a Version:</h6>
338338
nzActive="true"
339339
nzHeader="Create New Version">
340340
<texera-user-files-uploader
341+
[did]="did"
341342
[ownerEmail]="ownerEmail"
342343
[datasetName]="datasetName"
343344
(uploadedFiles)="onNewUploadFilesChanged($event)">

0 commit comments

Comments
 (0)