Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ object DatasetResource {
sizeBytes: Option[Long] // Size of the changed file (None for directories)
)

case class ExistingUploadFile(path: String, sizeBytes: Long)

case class ExistingUploadFilesRequest(files: List[ExistingUploadFile])

case class DatasetDescriptionModification(did: Integer, description: String)

case class DatasetNameModification(did: Integer, name: String)
Expand Down Expand Up @@ -1030,6 +1034,65 @@ class DatasetResource extends LazyLogging {
}
}

@POST
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/{did}/existing-upload-files")
@Consumes(Array(MediaType.APPLICATION_JSON))
def findExistingUploadFiles(
@PathParam("did") did: Integer,
request: ExistingUploadFilesRequest,
@Auth user: SessionUser
): Response = {
val uid = user.getUid
withTransaction(context) { ctx =>
if (!userHasWriteAccess(ctx, did, uid)) {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}

val requested = Option(request)
.flatMap(request => Option(request.files))
.getOrElse(List.empty)
.map { file =>
val originalPath = file.path
val path = validateAndNormalizeFilePathOrThrow(originalPath)
if (file.sizeBytes < 0L) throw new BadRequestException("sizeBytes must be >= 0")
(path, originalPath, file.sizeBytes)
}

val dataset = getDatasetByID(ctx, did)
val committed = getLatestDatasetVersion(ctx, did)
.map { v =>
withLakeFSErrorHandling(
s"retrieving committed files of dataset '${dataset.getName}'"
) {
LakeFSStorageClient
.retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash)
.map(obj => obj.getPath -> obj.getSizeBytes.longValue())
}
}
.getOrElse(List.empty)

val staged = withLakeFSErrorHandling(
s"retrieving staged files of dataset '${dataset.getName}'"
) {
LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
}
.filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed")))
.flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue()))

val existing = (committed ++ staged).toMap
val matches = requested
.collect {
case (path, originalPath, size) if existing.get(path).contains(size) => originalPath
}
.toList
.distinct
.sorted

Response.ok(Map("filePaths" -> matches.asJava)).build()
}
}

@PUT
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/{did}/diff")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,175 @@ class DatasetResourceSpec
dashboardDataset.size should be >= 0L
}

"findExistingUploadFiles" should "match committed and staged files by path and size" in {
val repoName = s"existing-upload-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload checks")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)
LakeFSStorageClient.initRepo(repoName)

val committed = "committed".getBytes(StandardCharsets.UTF_8)
LakeFSStorageClient.writeFileToRepo(
repoName,
"committed.csv",
new ByteArrayInputStream(committed)
)
val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit existing file")
val version = new DatasetVersion()
version.setDid(dataset.getDid)
version.setCreatorUid(ownerUser.getUid)
version.setName("v1")
version.setVersionHash(commit.getId)
new DatasetVersionDao(getDSLContext.configuration()).insert(version)

val staged = "staged".getBytes(StandardCharsets.UTF_8)
LakeFSStorageClient.writeFileToRepo(repoName, "staged.csv", new ByteArrayInputStream(staged))

val resp = datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(
DatasetResource.ExistingUploadFile("committed.csv", committed.length),
DatasetResource.ExistingUploadFile("staged.csv", staged.length),
DatasetResource.ExistingUploadFile("wrong-size.csv", staged.length + 1),
DatasetResource.ExistingUploadFile("missing.csv", 1L)
)
),
sessionUser
)

resp.getStatus shouldEqual 200
mapListOfStrings(entityAsScalaMap(resp)("filePaths")) should contain theSameElementsAs List(
"committed.csv",
"staged.csv"
)
}

it should "return the original request path when matching a normalized path" in {
val repoName = s"existing-upload-normalized-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload normalized path checks")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)
LakeFSStorageClient.initRepo(repoName)

val committed = "committed".getBytes(StandardCharsets.UTF_8)
LakeFSStorageClient.writeFileToRepo(
repoName,
"committed.csv",
new ByteArrayInputStream(committed)
)
val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit normalized file")
val version = new DatasetVersion()
version.setDid(dataset.getDid)
version.setCreatorUid(ownerUser.getUid)
version.setName("v1")
version.setVersionHash(commit.getId)
new DatasetVersionDao(getDSLContext.configuration()).insert(version)

val requestPath = "folder/../committed.csv"
val resp = datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile(requestPath, committed.length))
),
sessionUser
)

resp.getStatus shouldEqual 200
mapListOfStrings(entityAsScalaMap(resp)("filePaths")) shouldEqual List(requestPath)
}

it should "treat a missing files list as empty" in {
val repoName = s"existing-upload-empty-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload empty request check")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)
LakeFSStorageClient.initRepo(repoName)

val resp = datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(null),
sessionUser
)

resp.getStatus shouldEqual 200
mapListOfStrings(entityAsScalaMap(resp)("filePaths")) shouldBe empty
}

it should "reject negative file sizes" in {
val ex = intercept[BadRequestException] {
datasetResource.findExistingUploadFiles(
baseDataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("bad-size.csv", -1L))
),
sessionUser
)
}

ex.getMessage should include("sizeBytes")
}

it should "reject users without write access" in {
val ex = intercept[ForbiddenException] {
datasetResource.findExistingUploadFiles(
multipartDataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("private.csv", 1L))
),
multipartNoWriteSessionUser
)
}

assertStatus(ex, 403)
}

it should "surface a LakeFS 404 as NotFoundException when checking a missing repo" in {
val repoName = s"existing-upload-missing-repo-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload missing repo check")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)

val version = new DatasetVersion()
version.setDid(dataset.getDid)
version.setCreatorUid(ownerUser.getUid)
version.setName("v1")
version.setVersionHash("missing-version")
new DatasetVersionDao(getDSLContext.configuration()).insert(version)

val ex = intercept[NotFoundException] {
datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("missing.csv", 1L))
),
sessionUser
)
}

assertStatus(ex, 404)
}

it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in {
val dataset = new Dataset
dataset.setName("get-ds-no-repo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<div><b>Path:</b> {{ data.path }}</div>
<div><b>Size:</b> {{ data.size }}</div>

<div class="hint">An upload session already exists for this path.</div>
<div class="hint">{{ data.hint || "An upload session already exists for this path." }}</div>
</div>
</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface ConflictingFileModalData {
fileName: string;
path: string;
size: string;
hint?: string;
}

@Component({
Expand Down
Loading
Loading