Skip to content

Commit 6ac51d3

Browse files
authored
fix(generic): Preserve BLOBS path across copy iterations (#1450)
Changes in this diff: 1. Use the job store to save the ID-to-path mapping. 2. The blobs serializer now parses the tree only one depth at a time, using the job store to link parent folders and subfolders across different iterations.
1 parent bca0c79 commit 6ac51d3

10 files changed

Lines changed: 369 additions & 195 deletions

File tree

extensions/data-transfer/portability-data-transfer-generic/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ plugins {
2121

2222
dependencies {
2323
compile project(':portability-spi-transfer')
24+
compile project(':portability-transfer')
2425
compile project(':portability-spi-cloud')
2526
compile project(':portability-types-common')
2627
compile "com.squareup.okhttp3:okhttp:${okHttpVersion}"
2728
compile "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
2829
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${jacksonVersion}"
30+
testCompile project(':portability-transfer').sourceSets.test.output
31+
testCompile project(':extensions:cloud:portability-cloud-local')
2932
testCompile "com.squareup.okhttp3:mockwebserver:${okHttpVersion}"
3033
testCompile "commons-fileupload:commons-fileupload:1.5"
3134
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.datatransferproject.datatransfer.generic;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import org.datatransferproject.types.common.models.DataModel;
6+
7+
public class BlobIdToName extends DataModel {
8+
9+
private Map<String, String> idToName;
10+
11+
public BlobIdToName() {
12+
this.idToName = new HashMap<>();
13+
}
14+
15+
public String get(String id) {
16+
return idToName.get(id);
17+
}
18+
19+
public String add(String id, String name) {
20+
return idToName.put(id, name);
21+
}
22+
}

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/BlobbySerializer.java

Lines changed: 101 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
import com.fasterxml.jackson.annotation.JsonSubTypes;
88
import com.fasterxml.jackson.annotation.JsonTypeInfo;
99
import com.fasterxml.jackson.annotation.JsonTypeName;
10+
import java.io.IOException;
1011
import java.time.ZonedDateTime;
11-
import java.util.ArrayDeque;
1212
import java.util.ArrayList;
1313
import java.util.List;
1414
import java.util.Optional;
15-
import java.util.Queue;
15+
import java.util.UUID;
16+
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
17+
import org.datatransferproject.transfer.JobMetadata;
1618
import org.datatransferproject.types.common.DownloadableItem;
1719
import org.datatransferproject.types.common.models.blob.BlobbyStorageContainerResource;
1820
import org.datatransferproject.types.common.models.blob.DigitalDocumentWrapper;
@@ -25,8 +27,8 @@
2527
* items the Importer has to download itself (some MEDIA items) from the same interface.
2628
*/
2729
class CachedDownloadableItem implements DownloadableItem {
28-
private String cachedId;
29-
private String name;
30+
private final String cachedId;
31+
private final String name;
3032

3133
public CachedDownloadableItem(String cachedId, String name) {
3234
this.cachedId = cachedId;
@@ -68,6 +70,15 @@ private FileExportData(String folder, String name, Optional<ZonedDateTime> dateM
6870
this.dateModified = dateModified;
6971
}
7072

73+
public static FileExportData fromDtpDigitalDocument(String path, DtpDigitalDocument model) {
74+
return new FileExportData(
75+
path,
76+
model.getName(),
77+
Optional.ofNullable(model.getDateModified())
78+
.filter(string -> !string.isEmpty())
79+
.map(dateString -> ZonedDateTime.parse(model.getDateModified())));
80+
}
81+
7182
public String getFolder() {
7283
return folder;
7384
}
@@ -79,15 +90,6 @@ public String getName() {
7990
public Optional<ZonedDateTime> getDateModified() {
8091
return dateModified;
8192
}
82-
83-
public static FileExportData fromDtpDigitalDocument(String path, DtpDigitalDocument model) {
84-
return new FileExportData(
85-
path,
86-
model.getName(),
87-
Optional.ofNullable(model.getDateModified())
88-
.filter(string -> !string.isEmpty())
89-
.map(dateString -> ZonedDateTime.parse(model.getDateModified())));
90-
}
9193
}
9294

9395
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@@ -106,72 +108,106 @@ public String getPath() {
106108
}
107109

108110
public class BlobbySerializer {
109-
@JsonSubTypes({
110-
@JsonSubTypes.Type(FolderExportData.class),
111-
@JsonSubTypes.Type(FileExportData.class),
112-
})
113-
public interface ExportData {}
114111

115-
static class BlobbyContainerPath {
116-
private BlobbyStorageContainerResource container;
117-
private String path;
112+
static final String SCHEMA_SOURCE =
113+
GenericTransferConstants.SCHEMA_SOURCE_BASE
114+
+ "/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/BlobbySerializer.java";
115+
private static final String BLOB_ID_TO_NAME_KEY = "blobIdToNameKey";
116+
private final TemporaryPerJobDataStore jobStore;
117+
private BlobIdToName blobIdToName;
118118

119-
public BlobbyContainerPath(BlobbyStorageContainerResource container, String path) {
120-
this.container = container;
121-
this.path = path;
122-
}
119+
public BlobbySerializer(TemporaryPerJobDataStore jobStore) {
120+
this.jobStore = jobStore;
121+
}
123122

124-
public BlobbyStorageContainerResource getContainer() {
125-
return container;
126-
}
123+
private void addToJobStore(String id, String name) throws IOException {
124+
initialiseBlobIdToNameIfNot(JobMetadata.getJobId());
125+
blobIdToName.add(id, name);
126+
}
127127

128-
public String getPath() {
129-
return path;
130-
}
128+
private void saveStateToStore() throws IOException {
129+
initialiseBlobIdToNameIfNot(JobMetadata.getJobId());
130+
131+
jobStore.create(JobMetadata.getJobId(), BLOB_ID_TO_NAME_KEY, blobIdToName);
131132
}
132133

133-
static final String SCHEMA_SOURCE =
134-
GenericTransferConstants.SCHEMA_SOURCE_BASE
135-
+ "/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/BlobbySerializer.java";
134+
private String getFromStore(String id) throws IOException {
135+
initialiseBlobIdToNameIfNot(JobMetadata.getJobId());
136+
return blobIdToName.get(id);
137+
}
138+
139+
private void initialiseBlobIdToNameIfNot(UUID jobId) throws IOException {
140+
if (blobIdToName == null) {
136141

137-
public static Iterable<ImportableData<ExportData>> serialize(
138-
BlobbyStorageContainerResource root) {
142+
blobIdToName = jobStore.findData(jobId, BLOB_ID_TO_NAME_KEY, BlobIdToName.class);
143+
144+
if (blobIdToName == null) {
145+
blobIdToName = new BlobIdToName();
146+
}
147+
}
148+
}
149+
150+
/**
151+
* Serializes a BlobbyStorageContainerResource into an iterable of ImportableData objects.
152+
*
153+
* <p>This method only serializes the tree up to a single depth, assuming that the exporter will
154+
* send separate BlobbyStorageContainerResource objects for subfolders. It also stores an
155+
* ID-to-path mapping for folders, which can be used to establish parent-child relationships in
156+
* separate iterations.
157+
*
158+
* @param root The BlobbyStorageContainerResource to serialize.
159+
* @return An iterable of ImportableData objects representing the serialized data.
160+
*/
161+
public Iterable<ImportableData<ExportData>> serialize(BlobbyStorageContainerResource root)
162+
throws IOException {
139163
List<ImportableData<ExportData>> results = new ArrayList<>();
140-
// Search whole tree of container resource
141-
Queue<BlobbyContainerPath> horizon = new ArrayDeque<>();
142-
BlobbyContainerPath containerAndPath = new BlobbyContainerPath(root, "");
143-
do {
144-
BlobbyStorageContainerResource container = containerAndPath.getContainer();
145-
String parentPath = containerAndPath.getPath();
146-
String path = format("%s/%s", parentPath, container.getName());
147-
// Import the current folder
164+
165+
String currentFolderPath = getFromStore(root.getId());
166+
if (currentFolderPath == null) {
167+
currentFolderPath = "/" + root.getName();
168+
}
169+
170+
// Import the current folder
171+
results.add(
172+
new ImportableData<>(
173+
new GenericPayload<>(new FolderExportData(currentFolderPath), SCHEMA_SOURCE),
174+
root.getId(),
175+
currentFolderPath));
176+
177+
// Import all sub folders, not recursively
178+
for (BlobbyStorageContainerResource childFolder : root.getFolders()) {
179+
String path = format("%s/%s", currentFolderPath, childFolder.getName());
148180
results.add(
149181
new ImportableData<>(
150182
new GenericPayload<>(new FolderExportData(path), SCHEMA_SOURCE),
151-
container.getId(),
183+
childFolder.getId(),
152184
path));
185+
addToJobStore(childFolder.getId(), path);
186+
}
153187

154-
// Add all sub-folders to the search tree
155-
for (BlobbyStorageContainerResource child : container.getFolders()) {
156-
horizon.add(new BlobbyContainerPath(child, path));
157-
}
158-
159-
// Import all files in the current folder
160-
// Intentionally done after importing the current folder
161-
for (DigitalDocumentWrapper file : container.getFiles()) {
162-
results.add(
163-
new ImportableFileData<>(
164-
new CachedDownloadableItem(
165-
file.getCachedContentId(), file.getDtpDigitalDocument().getName()),
166-
file.getDtpDigitalDocument().getEncodingFormat(),
167-
new GenericPayload<>(
168-
FileExportData.fromDtpDigitalDocument(path, file.getDtpDigitalDocument()),
169-
SCHEMA_SOURCE),
170-
file.getCachedContentId(),
171-
file.getDtpDigitalDocument().getName()));
172-
}
173-
} while ((containerAndPath = horizon.poll()) != null);
188+
// Import all files in the current folder
189+
// Intentionally done after importing the current folder
190+
for (DigitalDocumentWrapper file : root.getFiles()) {
191+
results.add(
192+
new ImportableFileData<>(
193+
new CachedDownloadableItem(
194+
file.getCachedContentId(), file.getDtpDigitalDocument().getName()),
195+
file.getDtpDigitalDocument().getEncodingFormat(),
196+
new GenericPayload<>(
197+
FileExportData.fromDtpDigitalDocument(
198+
currentFolderPath, file.getDtpDigitalDocument()),
199+
SCHEMA_SOURCE),
200+
file.getCachedContentId(),
201+
file.getDtpDigitalDocument().getName()));
202+
}
174203

204+
saveStateToStore();
175205
return results;
176206
}
207+
208+
@JsonSubTypes({
209+
@JsonSubTypes.Type(FolderExportData.class),
210+
@JsonSubTypes.Type(FileExportData.class),
211+
})
212+
public interface ExportData {}
177213
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package org.datatransferproject.datatransfer.generic;
22

3+
import java.io.IOException;
34
import org.datatransferproject.types.common.models.ContainerResource;
45

56
@FunctionalInterface
67
public interface ContainerSerializer<C extends ContainerResource, R> {
7-
public Iterable<ImportableData<R>> apply(C containerResource);
8+
public Iterable<ImportableData<R>> apply(C containerResource) throws IOException;
89
}

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ public ImportResult importItem(
111111
TokensAndUrlAuthData initialAuthData,
112112
C data)
113113
throws Exception {
114-
115114
OAuthTokenManager tokenManager =
116115
jobTokenManagerMap.computeIfAbsent(
117116
jobId,
@@ -159,6 +158,7 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti
159158

160159
boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
161160
throws IOException, InvalidTokenException, DestinationMemoryFullException {
161+
162162
Request request =
163163
new Request.Builder()
164164
.url(endpoint)

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericTransferExtension.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,11 @@ public void initialize(ExtensionContext context) {
140140
}
141141

142142
if (serviceConfig.supportsVertical(BLOBS)) {
143+
BlobbySerializer serializer = new BlobbySerializer(jobStore);
143144
importerMap.put(
144145
BLOBS,
145-
new GenericFileImporter<BlobbyStorageContainerResource, BlobbySerializer.ExportData>(
146-
BlobbySerializer::serialize,
146+
new GenericFileImporter<>(
147+
serializer::serialize,
147148
appCredentials,
148149
urlAppend(serviceConfig.getEndpoint(), "blobs"),
149150
jobStore,

0 commit comments

Comments
 (0)