Skip to content

Commit 21862ac

Browse files
slfan1989 authored and thomaschow committed
API, Spark 4.0: Add create_file_list option to RewriteTablePathProcedure. (apache#13837)
1 parent 037197c commit 21862ac

6 files changed

Lines changed: 189 additions & 12 deletions

File tree

api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,19 @@ public interface RewriteTablePath extends Action<RewriteTablePath, RewriteTableP
8686
*/
8787
RewriteTablePath stagingLocation(String stagingLocation);
8888

89+
/**
90+
* Whether to create the file list.
91+
*
92+
* <p>The default value is true, which means the file list will be created. If set to false, the
93+
* file list will not be created.
94+
*
95+
* @param createFileList true to create the file list, false to skip it
96+
* @return this instance for method chaining
97+
*/
98+
default RewriteTablePath createFileList(boolean createFileList) {
99+
return this;
100+
}
101+
89102
/** The action result that contains a summary of the execution. */
90103
interface Result {
91104
/** Staging location of rewritten files */
@@ -112,5 +125,15 @@ interface Result {
112125

113126
/** Name of latest metadata file version */
114127
String latestVersion();
128+
129+
/** Number of delete files with rewritten paths. */
130+
default int rewrittenDeleteFilePathsCount() {
131+
return 0;
132+
}
133+
134+
/** Number of manifest files with rewritten paths. */
135+
default int rewrittenManifestFilePathsCount() {
136+
return 0;
137+
}
115138
}
116139
}

core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,17 @@
2929
interface BaseRewriteTablePath extends RewriteTablePath {
3030

3131
@Value.Immutable
32-
interface Result extends RewriteTablePath.Result {}
32+
interface Result extends RewriteTablePath.Result {
33+
@Override
34+
@Value.Default
35+
default int rewrittenDeleteFilePathsCount() {
36+
return RewriteTablePath.Result.super.rewrittenDeleteFilePathsCount();
37+
}
38+
39+
@Override
40+
@Value.Default
41+
default int rewrittenManifestFilePathsCount() {
42+
return RewriteTablePath.Result.super.rewrittenManifestFilePathsCount();
43+
}
44+
}
3345
}

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,20 @@
2222
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2323
import static org.assertj.core.api.Assertions.atIndex;
2424

25+
import java.io.File;
26+
import java.io.IOException;
2527
import java.nio.file.Path;
2628
import java.util.List;
29+
import org.apache.iceberg.DeleteFile;
2730
import org.apache.iceberg.HasTableOperations;
2831
import org.apache.iceberg.ParameterizedTestExtension;
2932
import org.apache.iceberg.RewriteTablePathUtil;
3033
import org.apache.iceberg.Table;
3134
import org.apache.iceberg.TableUtil;
35+
import org.apache.iceberg.data.FileHelpers;
36+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
37+
import org.apache.iceberg.spark.SparkCatalogConfig;
38+
import org.apache.iceberg.util.Pair;
3239
import org.apache.spark.sql.AnalysisException;
3340
import org.junit.jupiter.api.AfterEach;
3441
import org.junit.jupiter.api.BeforeEach;
@@ -171,8 +178,93 @@ public void testProcedureWithInvalidInput() {
171178
"Cannot find provided version file %s in metadata log.", "v11.metadata.json");
172179
}
173180

181+
@TestTemplate
182+
public void testRewriteTablePathWithoutFileList() {
183+
String location = targetTableDir.toFile().toURI().toString();
184+
Table table = validationCatalog.loadTable(tableIdent);
185+
String metadataJson = TableUtil.metadataFileLocation(table);
186+
187+
List<Object[]> result =
188+
sql(
189+
"CALL %s.system.rewrite_table_path(table => '%s', source_prefix => '%s', target_prefix => '%s', create_file_list => false)",
190+
catalogName, tableIdent, table.location(), location);
191+
assertThat(result).hasSize(1);
192+
assertThat(result.get(0)[0])
193+
.as("Should return correct latest version")
194+
.isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
195+
assertThat(result.get(0)[1])
196+
.as("Check if file list location is correctly marked as N/A when not generated")
197+
.asString()
198+
.isEqualTo("N/A");
199+
}
200+
174201
private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
175202
long fileCount = spark.read().format("text").load(fileListLocation).count();
176203
assertThat(fileCount).isEqualTo(expectedFileCount);
177204
}
205+
206+
@TestTemplate
207+
public void testRewriteTablePathWithManifestAndDeleteCounts() throws IOException {
208+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
209+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
210+
sql("INSERT INTO %s VALUES (3, 'c')", tableName);
211+
212+
Table table = validationCatalog.loadTable(tableIdent);
213+
List<Pair<CharSequence, Long>> rowsToDelete =
214+
Lists.newArrayList(
215+
Pair.of(
216+
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
217+
0L));
218+
219+
File file = new File(removePrefix(table.location()) + "/data/deletes.parquet");
220+
String filePath = file.toURI().toString();
221+
if (SparkCatalogConfig.REST.catalogName().equals(catalogName)) {
222+
// Special case for the REST catalog: its Hive base location is
223+
// expressed as an absolute path rather than a file URI, so the delete
224+
// file path uses the absolute-path form to match it.
225+
filePath = file.getAbsolutePath().toString();
226+
}
227+
228+
DeleteFile positionDeletes =
229+
FileHelpers.writeDeleteFile(table, table.io().newOutputFile(filePath), rowsToDelete)
230+
.first();
231+
232+
table.newRowDelta().addDeletes(positionDeletes).commit();
233+
234+
sql("INSERT INTO %s VALUES (4, 'd')", tableName);
235+
236+
String targetLocation = targetTableDir.toFile().toURI().toString();
237+
String stagingLocation = staging.toFile().toURI().toString();
238+
239+
List<Object[]> result =
240+
sql(
241+
"CALL %s.system.rewrite_table_path("
242+
+ "table => '%s', "
243+
+ "source_prefix => '%s', "
244+
+ "target_prefix => '%s', "
245+
+ "staging_location => '%s', create_file_list => false)",
246+
catalogName, tableIdent, table.location(), targetLocation, stagingLocation);
247+
248+
assertThat(result).hasSize(1);
249+
Object[] row = result.get(0);
250+
251+
int rewrittenManifestFilesCount = ((Number) row[2]).intValue();
252+
int rewrittenDeleteFilesCount = ((Number) row[3]).intValue();
253+
254+
assertThat(rewrittenDeleteFilesCount)
255+
.as(
256+
"Expected exactly 1 delete file to be rewritten, but found "
257+
+ rewrittenDeleteFilesCount)
258+
.isEqualTo(1);
259+
260+
assertThat(rewrittenManifestFilesCount)
261+
.as(
262+
"Expected exactly 5 manifest files to be rewritten, but found "
263+
+ rewrittenManifestFilesCount)
264+
.isEqualTo(5);
265+
}
266+
267+
private String removePrefix(String path) {
268+
return path.substring(path.lastIndexOf(":") + 1);
269+
}
178270
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,14 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
9191

9292
private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
9393
private static final String RESULT_LOCATION = "file-list";
94+
static final String NOT_APPLICABLE = "N/A";
9495

9596
private String sourcePrefix;
9697
private String targetPrefix;
9798
private String startVersionName;
9899
private String endVersionName;
99100
private String stagingDir;
101+
private boolean createFileList = true;
100102

101103
private final Table table;
102104
private Broadcast<Table> tableBroadcast = null;
@@ -150,6 +152,12 @@ public RewriteTablePath stagingLocation(String stagingLocation) {
150152
return this;
151153
}
152154

155+
@Override
156+
public RewriteTablePath createFileList(boolean createFileListFlag) {
157+
this.createFileList = createFileListFlag;
158+
return this;
159+
}
160+
153161
@Override
154162
public Result execute() {
155163
validateInputs();
@@ -158,12 +166,7 @@ public Result execute() {
158166
}
159167

160168
private Result doExecute() {
161-
String resultLocation = rebuildMetadata();
162-
return ImmutableRewriteTablePath.Result.builder()
163-
.stagingLocation(stagingDir)
164-
.fileListLocation(resultLocation)
165-
.latestVersion(RewriteTablePathUtil.fileName(endVersionName))
166-
.build();
169+
return rebuildMetadata();
167170
}
168171

169172
private void validateInputs() {
@@ -264,7 +267,7 @@ private String jobDesc() {
264267
* <li>Get all files needed to move
265268
* </ul>
266269
*/
267-
private String rebuildMetadata() {
270+
private Result rebuildMetadata() {
268271
TableMetadata startMetadata =
269272
startVersionName != null
270273
? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
@@ -289,6 +292,7 @@ private String rebuildMetadata() {
289292
.reduce(new RewriteResult<>(), RewriteResult::append);
290293

291294
// rebuild manifest files
295+
Set<ManifestFile> metaFiles = rewriteManifestListResult.toRewrite();
292296
RewriteContentFileResult rewriteManifestResult =
293297
rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());
294298

@@ -300,12 +304,24 @@ private String rebuildMetadata() {
300304
.collect(Collectors.toSet());
301305
rewritePositionDeletes(deleteFiles);
302306

307+
ImmutableRewriteTablePath.Result.Builder builder =
308+
ImmutableRewriteTablePath.Result.builder()
309+
.stagingLocation(stagingDir)
310+
.rewrittenDeleteFilePathsCount(deleteFiles.size())
311+
.rewrittenManifestFilePathsCount(metaFiles.size())
312+
.latestVersion(RewriteTablePathUtil.fileName(endVersionName));
313+
314+
if (!createFileList) {
315+
return builder.fileListLocation(NOT_APPLICABLE).build();
316+
}
317+
303318
Set<Pair<String, String>> copyPlan = Sets.newHashSet();
304319
copyPlan.addAll(rewriteVersionResult.copyPlan());
305320
copyPlan.addAll(rewriteManifestListResult.copyPlan());
306321
copyPlan.addAll(rewriteManifestResult.copyPlan());
322+
String fileListLocation = saveFileList(copyPlan);
307323

308-
return saveFileList(copyPlan);
324+
return builder.fileListLocation(fileListLocation).build();
309325
}
310326

311327
private String saveFileList(Set<Pair<String, String>> filesToMove) {

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class RewriteTablePathProcedure extends BaseProcedure {
5050
optionalInParameter("end_version", DataTypes.StringType);
5151
private static final ProcedureParameter STAGING_LOCATION_PARAM =
5252
optionalInParameter("staging_location", DataTypes.StringType);
53+
private static final ProcedureParameter CREATE_FILE_LIST_PARAM =
54+
optionalInParameter("create_file_list", DataTypes.BooleanType);
5355

5456
private static final ProcedureParameter[] PARAMETERS =
5557
new ProcedureParameter[] {
@@ -58,14 +60,22 @@ public class RewriteTablePathProcedure extends BaseProcedure {
5860
TARGET_PREFIX_PARAM,
5961
START_VERSION_PARAM,
6062
END_VERSION_PARM,
61-
STAGING_LOCATION_PARAM
63+
STAGING_LOCATION_PARAM,
64+
CREATE_FILE_LIST_PARAM
6265
};
6366

6467
private static final StructType OUTPUT_TYPE =
6568
new StructType(
6669
new StructField[] {
6770
new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
68-
new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
71+
new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty()),
72+
new StructField(
73+
"rewritten_manifest_file_paths_count",
74+
DataTypes.IntegerType,
75+
true,
76+
Metadata.empty()),
77+
new StructField(
78+
"rewritten_delete_file_paths_count", DataTypes.IntegerType, true, Metadata.empty())
6979
});
7080

7181
public static SparkProcedures.ProcedureBuilder builder() {
@@ -100,6 +110,7 @@ public Iterator<Scan> call(InternalRow args) {
100110
String startVersion = input.asString(START_VERSION_PARAM, null);
101111
String endVersion = input.asString(END_VERSION_PARM, null);
102112
String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
113+
boolean createFileList = input.asBoolean(CREATE_FILE_LIST_PARAM, true);
103114

104115
return withIcebergTable(
105116
tableIdent,
@@ -116,6 +127,8 @@ public Iterator<Scan> call(InternalRow args) {
116127
action.stagingLocation(stagingLocation);
117128
}
118129

130+
action.createFileList(createFileList);
131+
119132
return asScanIterator(
120133
OUTPUT_TYPE,
121134
toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute()));
@@ -126,7 +139,9 @@ private InternalRow[] toOutputRows(RewriteTablePath.Result result) {
126139
return new InternalRow[] {
127140
newInternalRow(
128141
UTF8String.fromString(result.latestVersion()),
129-
UTF8String.fromString(result.fileListLocation()))
142+
UTF8String.fromString(result.fileListLocation()),
143+
result.rewrittenManifestFilePathsCount(),
144+
result.rewrittenDeleteFilePathsCount())
130145
};
131146
}
132147

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.spark.actions;
2020

21+
import static org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_APPLICABLE;
2122
import static org.apache.iceberg.types.Types.NestedField.optional;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1234,6 +1235,24 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
12341235
assertThat(targetPath2).startsWith(targetTableLocation());
12351236
}
12361237

1238+
@Test
1239+
public void testRewritePathWithoutCreateFileList() throws Exception {
1240+
String targetTableLocation = targetTableLocation();
1241+
1242+
RewriteTablePath.Result result =
1243+
actions()
1244+
.rewriteTablePath(table)
1245+
.rewriteLocationPrefix(tableLocation, targetTableLocation)
1246+
.createFileList(false) // Disable file list creation
1247+
.execute();
1248+
1249+
assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
1250+
1251+
assertThat(result.fileListLocation())
1252+
.as("File list location should not be set when createFileList is false")
1253+
.isEqualTo(NOT_APPLICABLE);
1254+
}
1255+
12371256
protected void checkFileNum(
12381257
int versionFileCount,
12391258
int manifestListCount,

0 commit comments

Comments
 (0)