Skip to content

Commit 9e78f49

Browse files
slfan1989 authored and thomaschow committed
Spark 4.0: Refactor Spark procedures to consistently use ProcedureInput for parameter handling. (apache#13913)
1 parent 21862ac commit 9e78f49

19 files changed

Lines changed: 207 additions & 125 deletions

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ public void testInvalidCherrypickSnapshotCases() {
184184

185185
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", catalogName))
186186
.isInstanceOf(IllegalArgumentException.class)
187-
.hasMessage("Cannot handle an empty identifier for argument table");
187+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
188188

189189
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', '2.2')", catalogName))
190190
.isInstanceOf(IllegalArgumentException.class)

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -189,7 +189,7 @@ public void testInvalidExpireSnapshotsCases() {
189189

190190
assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", catalogName))
191191
.isInstanceOf(IllegalArgumentException.class)
192-
.hasMessage("Cannot handle an empty identifier for argument table");
192+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
193193
}
194194

195195
@TestTemplate

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -190,7 +190,7 @@ public void testInvalidFastForwardBranchCases() {
190190
assertThatThrownBy(
191191
() -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
192192
.isInstanceOf(IllegalArgumentException.class)
193-
.hasMessage("Cannot handle an empty identifier for argument table");
193+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
194194
}
195195

196196
@TestTemplate

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,6 @@ public void testInvalidApplyWapChangesCases() {
182182

183183
assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName))
184184
.isInstanceOf(IllegalArgumentException.class)
185-
.hasMessage("Cannot handle an empty identifier for argument table");
185+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
186186
}
187187
}

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -271,7 +271,7 @@ public void testInvalidRemoveOrphanFilesCases() {
271271

272272
assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName))
273273
.isInstanceOf(IllegalArgumentException.class)
274-
.hasMessage("Cannot handle an empty identifier for argument table");
274+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
275275
}
276276

277277
@TestTemplate

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -326,7 +326,7 @@ public void testInvalidRewriteManifestsCases() {
326326

327327
assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", catalogName))
328328
.isInstanceOf(IllegalArgumentException.class)
329-
.hasMessage("Cannot handle an empty identifier for argument table");
329+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
330330
}
331331

332332
@TestTemplate

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -281,6 +281,6 @@ public void testInvalidRollbackToSnapshotCases() {
281281

282282
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 1L)", catalogName))
283283
.isInstanceOf(IllegalArgumentException.class)
284-
.hasMessage("Cannot handle an empty identifier for argument table");
284+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
285285
}
286286
}

spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -214,7 +214,7 @@ public void testInvalidRollbackToSnapshotCases() {
214214

215215
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName))
216216
.isInstanceOf(IllegalArgumentException.class)
217-
.hasMessage("Cannot parse identifier for arg table: 1");
217+
.hasMessage("Cannot parse identifier for parameter 'table': 1");
218218

219219
assertThatThrownBy(
220220
() -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 1L)", catalogName))
@@ -233,7 +233,7 @@ public void testInvalidRollbackToSnapshotCases() {
233233

234234
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName))
235235
.isInstanceOf(IllegalArgumentException.class)
236-
.hasMessage("Cannot handle an empty identifier for argument table");
236+
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
237237

238238
assertThatThrownBy(
239239
() ->

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure {
4545

4646
static final String NAME = "cherrypick_snapshot";
4747

48+
private static final ProcedureParameter TABLE_PARAM =
49+
requiredInParameter("table", DataTypes.StringType);
50+
private static final ProcedureParameter SNAPSHOT_ID_PARAM =
51+
requiredInParameter("snapshot_id", DataTypes.LongType);
52+
4853
private static final ProcedureParameter[] PARAMETERS =
49-
new ProcedureParameter[] {
50-
requiredInParameter("table", DataTypes.StringType),
51-
requiredInParameter("snapshot_id", DataTypes.LongType)
52-
};
54+
new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
5355

5456
private static final StructType OUTPUT_TYPE =
5557
new StructType(
@@ -83,8 +85,10 @@ public ProcedureParameter[] parameters() {
8385

8486
@Override
8587
public Iterator<Scan> call(InternalRow args) {
86-
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
87-
long snapshotId = args.getLong(1);
88+
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
89+
90+
Identifier tableIdent = input.ident(TABLE_PARAM);
91+
long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
8892

8993
return asScanIterator(
9094
OUTPUT_TYPE,

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java

Lines changed: 30 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
2727
import org.apache.iceberg.spark.actions.SparkActions;
2828
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
29-
import org.apache.iceberg.util.DateTimeUtil;
3029
import org.apache.spark.sql.catalyst.InternalRow;
3130
import org.apache.spark.sql.connector.catalog.Identifier;
3231
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
5150

5251
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
5352

53+
private static final ProcedureParameter TABLE_PARAM =
54+
requiredInParameter("table", DataTypes.StringType);
55+
private static final ProcedureParameter OLDER_THAN_PARAM =
56+
optionalInParameter("older_than", DataTypes.TimestampType);
57+
private static final ProcedureParameter RETAIN_LAST_PARAM =
58+
optionalInParameter("retain_last", DataTypes.IntegerType);
59+
private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
60+
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
61+
private static final ProcedureParameter STREAM_RESULTS_PARAM =
62+
optionalInParameter("stream_results", DataTypes.BooleanType);
63+
private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
64+
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType));
65+
private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
66+
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);
67+
5468
private static final ProcedureParameter[] PARAMETERS =
5569
new ProcedureParameter[] {
56-
requiredInParameter("table", DataTypes.StringType),
57-
optionalInParameter("older_than", DataTypes.TimestampType),
58-
optionalInParameter("retain_last", DataTypes.IntegerType),
59-
optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
60-
optionalInParameter("stream_results", DataTypes.BooleanType),
61-
optionalInParameter("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType)),
62-
optionalInParameter("clean_expired_metadata", DataTypes.BooleanType)
70+
TABLE_PARAM,
71+
OLDER_THAN_PARAM,
72+
RETAIN_LAST_PARAM,
73+
MAX_CONCURRENT_DELETES_PARAM,
74+
STREAM_RESULTS_PARAM,
75+
SNAPSHOT_IDS_PARAM,
76+
CLEAN_EXPIRED_METADATA_PARAM
6377
};
6478

6579
private static final StructType OUTPUT_TYPE =
@@ -104,13 +118,14 @@ public ProcedureParameter[] parameters() {
104118
@Override
105119
@SuppressWarnings("checkstyle:CyclomaticComplexity")
106120
public Iterator<Scan> call(InternalRow args) {
107-
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
108-
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
109-
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
110-
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
111-
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
112-
long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
113-
Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : args.getBoolean(6);
121+
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
122+
Identifier tableIdent = input.ident(TABLE_PARAM);
123+
Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
124+
Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
125+
Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
126+
Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null);
127+
long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
128+
Boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null);
114129

115130
Preconditions.checkArgument(
116131
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,

0 commit comments

Comments
 (0)