Skip to content

Commit 9a44459

Browse files
committed
CopyTable support for multiple source and destination prefixes
1 parent 58940f3 commit 9a44459

3 files changed

Lines changed: 343 additions & 255 deletions

File tree

core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java

Lines changed: 111 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -94,26 +94,28 @@ public Set<Pair<String, String>> copyPlan() {
9494
* Create a new table metadata object, replacing path references
9595
*
9696
* @param metadata source table metadata
97-
* @param sourcePrefix source prefix that will be replaced
98-
* @param targetPrefix target prefix that will replace it
97+
* @param prefixMappings source prefix amd target prefix mappings
9998
* @return copy of table metadata with paths replaced
10099
*/
101100
public static TableMetadata replacePaths(
102-
TableMetadata metadata, String sourcePrefix, String targetPrefix) {
103-
String newLocation = metadata.location().replaceFirst(sourcePrefix, targetPrefix);
104-
List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, sourcePrefix, targetPrefix);
101+
TableMetadata metadata, Map<String, String> prefixMappings) {
102+
Map.Entry<String, String> prefixMap = lookupPrefixMappings(metadata.location(), prefixMappings);
103+
if(prefixMap == null) {
104+
throw new IllegalArgumentException("unable to find prefix mapping for path: " + metadata.location());
105+
}
106+
List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, prefixMappings);
105107
List<TableMetadata.MetadataLogEntry> metadataLogEntries =
106-
updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix);
108+
updatePathInMetadataLogs(metadata, prefixMappings);
107109
long snapshotId =
108110
metadata.currentSnapshot() == null ? -1 : metadata.currentSnapshot().snapshotId();
109111
Map<String, String> properties =
110-
updateProperties(metadata.properties(), sourcePrefix, targetPrefix);
112+
updateProperties(metadata.properties(), prefixMappings);
111113

112114
return new TableMetadata(
113115
null,
114116
metadata.formatVersion(),
115117
metadata.uuid(),
116-
newLocation,
118+
prefixMap.getValue(),
117119
metadata.lastSequenceNumber(),
118120
metadata.lastUpdatedMillis(),
119121
metadata.lastColumnId(),
@@ -131,47 +133,48 @@ public static TableMetadata replacePaths(
131133
metadata.snapshotLog(),
132134
metadataLogEntries,
133135
metadata.refs(),
134-
updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix),
136+
updatePathInStatisticsFiles(metadata.statisticsFiles(), prefixMappings),
135137
updatePathInPartitionStatisticsFiles(
136-
metadata.partitionStatisticsFiles(), sourcePrefix, targetPrefix),
138+
metadata.partitionStatisticsFiles(), prefixMappings),
137139
metadata.nextRowId(),
138140
metadata.encryptionKeys(),
139141
metadata.changes());
140142
}
141143

142144
private static Map<String, String> updateProperties(
143-
Map<String, String> tableProperties, String sourcePrefix, String targetPrefix) {
145+
Map<String, String> tableProperties, Map<String, String> prefixMappings) {
144146
Map<String, String> properties = Maps.newHashMap(tableProperties);
145-
updatePathInProperty(properties, sourcePrefix, targetPrefix, TableProperties.OBJECT_STORE_PATH);
146-
updatePathInProperty(
147-
properties, sourcePrefix, targetPrefix, TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
148-
updatePathInProperty(
149-
properties, sourcePrefix, targetPrefix, TableProperties.WRITE_DATA_LOCATION);
150-
updatePathInProperty(
151-
properties, sourcePrefix, targetPrefix, TableProperties.WRITE_METADATA_LOCATION);
152-
147+
for (Map.Entry<String, String> entry : prefixMappings.entrySet()) {
148+
updatePathInProperty(properties, entry.getKey(), entry.getValue(), TableProperties.OBJECT_STORE_PATH);
149+
updatePathInProperty(
150+
properties, entry.getKey(), entry.getValue(), TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
151+
updatePathInProperty(
152+
properties, entry.getKey(), entry.getValue(), TableProperties.WRITE_DATA_LOCATION);
153+
updatePathInProperty(
154+
properties, entry.getKey(), entry.getValue(), TableProperties.WRITE_METADATA_LOCATION);
155+
}
153156
return properties;
154157
}
155158

156159
private static void updatePathInProperty(
157-
Map<String, String> properties,
158-
String sourcePrefix,
159-
String targetPrefix,
160-
String propertyName) {
160+
Map<String, String> properties,
161+
String sourcePrefix,
162+
String targetPrefix,
163+
String propertyName) {
161164
if (properties.containsKey(propertyName)) {
162165
properties.put(
163-
propertyName, newPath(properties.get(propertyName), sourcePrefix, targetPrefix));
166+
propertyName, newPath(properties.get(propertyName), sourcePrefix, targetPrefix));
164167
}
165168
}
166169

167170
private static List<StatisticsFile> updatePathInStatisticsFiles(
168-
List<StatisticsFile> statisticsFiles, String sourcePrefix, String targetPrefix) {
171+
List<StatisticsFile> statisticsFiles, Map<String, String> prefixMappings) {
169172
return statisticsFiles.stream()
170173
.map(
171174
existing ->
172175
new GenericStatisticsFile(
173176
existing.snapshotId(),
174-
newPath(existing.path(), sourcePrefix, targetPrefix),
177+
newPath(existing.path(), prefixMappings),
175178
existing.fileSizeInBytes(),
176179
existing.fileFooterSizeInBytes(),
177180
existing.blobMetadata()))
@@ -183,46 +186,44 @@ private static List<StatisticsFile> updatePathInStatisticsFiles(
183186
* sourcePrefix in the file paths with the targetPrefix.
184187
*
185188
* @param partitionStatisticsFiles The list of PartitionStatisticsFile to update.
186-
* @param sourcePrefix The prefix to be replaced in the file paths.
187-
* @param targetPrefix The new prefix to replace the sourcePrefix in the file paths.
189+
* @param prefixMappings The mappings between source prefix and destination prefix.
188190
* @return A new list of PartitionStatisticsFile with updated file paths.
189191
*/
190192
private static List<PartitionStatisticsFile> updatePathInPartitionStatisticsFiles(
191193
List<PartitionStatisticsFile> partitionStatisticsFiles,
192-
String sourcePrefix,
193-
String targetPrefix) {
194+
Map<String, String> prefixMappings) {
194195

195196
return partitionStatisticsFiles.stream()
196197
.map(
197198
existing ->
198199
ImmutableGenericPartitionStatisticsFile.builder()
199200
.snapshotId(existing.snapshotId())
200-
.path(newPath(existing.path(), sourcePrefix, targetPrefix))
201+
.path(newPath(existing.path(), prefixMappings))
201202
.fileSizeInBytes(existing.fileSizeInBytes())
202203
.build())
203204
.collect(Collectors.toList());
204205
}
205206

206207
private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs(
207-
TableMetadata metadata, String sourcePrefix, String targetPrefix) {
208+
TableMetadata metadata, Map<String, String> prefixMappings) {
208209
List<TableMetadata.MetadataLogEntry> metadataLogEntries =
209210
Lists.newArrayListWithCapacity(metadata.previousFiles().size());
210211
for (TableMetadata.MetadataLogEntry metadataLog : metadata.previousFiles()) {
211212
TableMetadata.MetadataLogEntry newMetadataLog =
212213
new TableMetadata.MetadataLogEntry(
213214
metadataLog.timestampMillis(),
214-
newPath(metadataLog.file(), sourcePrefix, targetPrefix));
215+
newPath(metadataLog.file(), prefixMappings));
215216
metadataLogEntries.add(newMetadataLog);
216217
}
217218
return metadataLogEntries;
218219
}
219220

220221
private static List<Snapshot> updatePathInSnapshots(
221-
TableMetadata metadata, String sourcePrefix, String targetPrefix) {
222+
TableMetadata metadata, Map<String, String> prefixMappings) {
222223
List<Snapshot> newSnapshots = Lists.newArrayListWithCapacity(metadata.snapshots().size());
223224
for (Snapshot snapshot : metadata.snapshots()) {
224225
String newManifestListLocation =
225-
newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix);
226+
newPath(snapshot.manifestListLocation(), prefixMappings);
226227
Snapshot newSnapshot =
227228
new BaseSnapshot(
228229
snapshot.sequenceNumber(),
@@ -248,8 +249,7 @@ private static List<Snapshot> updatePathInSnapshots(
248249
* @param io file io
249250
* @param tableMetadata metadata of table
250251
* @param manifestsToRewrite a list of manifest files to filter for rewrite
251-
* @param sourcePrefix source prefix that will be replaced
252-
* @param targetPrefix target prefix that will replace it
252+
* @param prefixMappings map of source prefix to target prefix mappings
253253
* @param stagingDir staging directory
254254
* @param outputPath location to write the manifest list
255255
* @return a copy plan for manifest files whose metadata were contained in the rewritten manifest
@@ -260,21 +260,24 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
260260
FileIO io,
261261
TableMetadata tableMetadata,
262262
Set<String> manifestsToRewrite,
263-
String sourcePrefix,
264-
String targetPrefix,
263+
Map<String, String> prefixMappings,
265264
String stagingDir,
266265
String outputPath) {
267266
RewriteResult<ManifestFile> result = new RewriteResult<>();
268267
OutputFile outputFile = io.newOutputFile(outputPath);
269268

270269
List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
270+
// Validate that all manifest files can be matched by at least one prefix
271271
manifestFiles.forEach(
272-
mf ->
273-
Preconditions.checkArgument(
274-
mf.path().startsWith(sourcePrefix),
275-
"Encountered manifest file %s not under the source prefix %s",
276-
mf.path(),
277-
sourcePrefix));
272+
mf -> {
273+
boolean hasMatchingPrefix = prefixMappings.keySet().stream()
274+
.anyMatch(prefix -> mf.path().startsWith(prefix));
275+
Preconditions.checkArgument(
276+
hasMatchingPrefix,
277+
"Encountered manifest file %s not matching any source prefix in %s",
278+
mf.path(),
279+
prefixMappings.keySet());
280+
});
278281

279282
EncryptionManager encryptionManager =
280283
(io instanceof EncryptingFileIO)
@@ -293,14 +296,14 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
293296

294297
for (ManifestFile file : manifestFiles) {
295298
ManifestFile newFile = file.copy();
296-
((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix));
299+
String newPath = newPath(newFile.path(), prefixMappings);
300+
((StructLike) newFile).set(0, newPath);
297301
writer.add(newFile);
298302

299303
if (manifestsToRewrite.contains(file.path())) {
300304
result.toRewrite().add(file);
301-
result
302-
.copyPlan()
303-
.add(Pair.of(stagingPath(file.path(), sourcePrefix, stagingDir), newFile.path()));
305+
String stagingFilePath = stagingPath(file.path(),prefixMappings, stagingDir);
306+
result.copyPlan().add(Pair.of(stagingFilePath != null ? stagingFilePath : file.path(), newPath));
304307
}
305308
}
306309
return result;
@@ -544,23 +547,22 @@ PositionDeleteWriter<Record> writer(
544547
* @param outputFile output file to rewrite delete file to
545548
* @param io file io
546549
* @param spec spec of delete file
547-
* @param sourcePrefix source prefix that will be replaced
548-
* @param targetPrefix target prefix to replace it
550+
* @param prefixMappings source prefix and target prefix mappings
549551
* @param posDeleteReaderWriter class to read and write position delete files
550552
*/
551553
public static void rewritePositionDeleteFile(
552554
DeleteFile deleteFile,
553555
OutputFile outputFile,
554556
FileIO io,
555557
PartitionSpec spec,
556-
String sourcePrefix,
557-
String targetPrefix,
558+
Map<String, String> prefixMappings,
558559
PositionDeleteReaderWriter posDeleteReaderWriter)
559560
throws IOException {
560561
String path = deleteFile.location();
561-
if (!path.startsWith(sourcePrefix)) {
562+
Map.Entry<String, String> matchEntry = lookupPrefixMappings(path, prefixMappings);
563+
if (matchEntry == null) {
562564
throw new UnsupportedOperationException(
563-
String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix));
565+
String.format("Expected delete file %s to find in prefixes", path));
564566
}
565567
InputFile sourceFile = io.newInputFile(path);
566568
try (CloseableIterable<Record> reader =
@@ -579,12 +581,12 @@ record = recordIt.next();
579581
posDeleteReaderWriter.writer(
580582
outputFile, deleteFile.format(), spec, deleteFile.partition(), rowSchema)) {
581583

582-
writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
584+
writer.write(newPositionDeleteRecord(record, matchEntry.getKey(), matchEntry.getValue()));
583585

584586
while (recordIt.hasNext()) {
585587
record = recordIt.next();
586588
if (record != null) {
587-
writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
589+
writer.write(newPositionDeleteRecord(record, matchEntry.getKey(), matchEntry.getValue()));
588590
}
589591
}
590592
}
@@ -608,6 +610,43 @@ private static PositionDelete newPositionDeleteRecord(
608610
return delete;
609611
}
610612

613+
/**
614+
* Lookup the longest matching prefix mapping for a given path.
615+
*
616+
* @param path the path to find a prefix mapping for
617+
* @param prefixMappings map of source prefix to target prefix mappings
618+
* @return the Map.Entry with the longest matching source prefix, or null if no match found
619+
*/
620+
public static Map.Entry<String, String> lookupPrefixMappings(
621+
String path, Map<String, String> prefixMappings) {
622+
if (prefixMappings == null || prefixMappings.isEmpty() || path == null) {
623+
return null;
624+
}
625+
626+
// Sort entries by key length in descending order to find the longest matching prefix
627+
return prefixMappings.entrySet().stream()
628+
.filter(entry -> path.startsWith(entry.getKey()))
629+
.max(java.util.Comparator.comparing(entry -> entry.getKey().length()))
630+
.orElse(null);
631+
}
632+
633+
public static String newPath(String path, Map<String, String> prefixMappings) {
634+
if (prefixMappings == null || prefixMappings.isEmpty()) {
635+
return path;
636+
}
637+
638+
// Use the centralized lookup method to find the longest matching prefix
639+
Map.Entry<String, String> entry = lookupPrefixMappings(path, prefixMappings);
640+
if (entry != null) {
641+
String sourcePrefix = entry.getKey();
642+
String targetPrefix = entry.getValue();
643+
return path.replaceFirst(sourcePrefix, targetPrefix != null ? targetPrefix : "");
644+
}
645+
646+
// No matching prefix found, return original path
647+
return path;
648+
}
649+
611650
/**
612651
* Replace path reference
613652
*
@@ -625,6 +664,7 @@ public static String combinePaths(String absolutePath, String relativePath) {
625664
return maybeAppendFileSeparator(absolutePath) + relativePath;
626665
}
627666

667+
628668
/** Returns the file name of a path. */
629669
public static String fileName(String path) {
630670
String filename = path;
@@ -662,4 +702,18 @@ public static String stagingPath(String originalPath, String sourcePrefix, Strin
662702
String relativePath = relativize(originalPath, sourcePrefix);
663703
return combinePaths(stagingDir, relativePath);
664704
}
705+
706+
/**
707+
* Construct a staging path under a given staging directory, preserving relative directory
708+
* structure to avoid conflicts when multiple files have the same name but different paths.
709+
*
710+
* @param originalPath source path
711+
* @param prefixMappings source prefix and target prefix mappings
712+
* @param stagingDir staging directory
713+
* @return a staging path under the staging directory that preserves the relative path structure
714+
*/
715+
public static String stagingPath(String originalPath, Map<String, String> prefixMappings, String stagingDir) {
716+
String relativePath = newPath(originalPath, prefixMappings);
717+
return combinePaths(stagingDir, relativePath);
718+
}
665719
}

0 commit comments

Comments
 (0)