Skip to content

Commit 25b7b0d

Browse files
[flink][spark] Support dry_run in drop_global_index procedure (#8309)
1 parent 44489c2 commit 25b7b0d

6 files changed

Lines changed: 250 additions & 7 deletions

File tree

docs/docs/flink/procedures.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,14 +1053,16 @@ All available procedures are listed below.
10531053
`table` => 'table',<br/>
10541054
`index_column` => 'columnName',<br/>
10551055
`index_type` => 'indexType',<br/>
1056-
`partitions` => 'partitions')<br/>
1056+
`partitions` => 'partitions',<br/>
1057+
`dry_run` => dryRun)<br/>
10571058
</td>
10581059
<td>
10591060
To drop global index files from a table. Arguments:
10601061
<li>table(required): the target table identifier.</li>
10611062
<li>index_column(required): the column name for which to drop the index.</li>
10621063
<li>index_type(required): the type of global index to drop, e.g., 'btree'.</li>
10631064
<li>partitions(optional): partition specification for selective index deletion.</li>
1065+
<li>dry_run(optional): when true, report how many index files would be dropped without committing any change. Default is false.</li>
10641066
</td>
10651067
<td>
10661068
-- Drop all btree indexes for column 'name'<br/>
@@ -1073,7 +1075,13 @@ All available procedures are listed below.
10731075
`table` => 'default.T',<br/>
10741076
`index_column` => 'name',<br/>
10751077
`index_type` => 'btree',<br/>
1076-
`partitions` => 'pt=p1;pt=p2')
1078+
`partitions` => 'pt=p1;pt=p2')<br/><br/>
1079+
-- Preview what would be dropped without deleting<br/>
1080+
CALL sys.drop_global_index(<br/>
1081+
`table` => 'default.T',<br/>
1082+
`index_column` => 'name',<br/>
1083+
`index_type` => 'btree',<br/>
1084+
`dry_run` => true)
10771085
</td>
10781086
</tr>
10791087
<tr>

docs/docs/spark/procedures.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,9 +538,12 @@ This section introduce all available spark procedures about paimon.
538538
<li>index_column: the name of the indexed column. Cannot be empty.</li>
539539
<li>index_type: type of the index to drop, e.g. 'btree'. Cannot be empty.</li>
540540
<li>partitions: partition filter to limit the partitions from which to drop the index. The comma (",") represents "AND", the semicolon (";") represents "OR". Left empty for all partitions.</li>
541+
<li>dry_run: when true, return the number of index files that would be dropped without committing any change. Default is false.</li>
541542
</td>
542543
<td>
543-
CALL sys.drop_global_index(table => 'default.T', index_column => 'name', index_type => 'btree', partitions => 'pt=p1')
544+
CALL sys.drop_global_index(table => 'default.T', index_column => 'name', index_type => 'btree', partitions => 'pt=p1')<br/><br/>
545+
-- Preview what would be dropped without deleting<br/>
546+
CALL sys.drop_global_index(table => 'default.T', index_column => 'name', index_type => 'btree', dry_run => true)
544547
</td>
545548
</tr>
546549
<tr>

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import javax.annotation.Nullable;
45+
4446
import java.util.ArrayList;
4547
import java.util.Arrays;
4648
import java.util.List;
@@ -71,14 +73,16 @@ public String identifier() {
7173
@ArgumentHint(
7274
name = "partitions",
7375
type = @DataTypeHint("STRING"),
74-
isOptional = true)
76+
isOptional = true),
77+
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
7578
})
7679
public String[] call(
7780
ProcedureContext procedureContext,
7881
String tableId,
7982
String indexColumn,
8083
String indexType,
81-
String partitions)
84+
@Nullable String partitions,
85+
@Nullable Boolean dryRun)
8286
throws Exception {
8387

8488
FileStoreTable table = (FileStoreTable) table(tableId);
@@ -142,6 +146,21 @@ public String[] call(
142146
columnsDesc,
143147
table.name());
144148

149+
// Dry run: report what would be dropped without committing any change.
150+
if (dryRun != null && dryRun) {
151+
return new String[] {
152+
"Dry run: "
153+
+ waitToDelete.size()
154+
+ " "
155+
+ indexTypeLower
156+
+ " global index files would be dropped for columns '"
157+
+ columnsDesc
158+
+ "' on table '"
159+
+ table.name()
160+
+ "'"
161+
};
162+
}
163+
145164
if (waitToDelete.isEmpty()) {
146165
return new String[] {
147166
"No " + indexTypeLower + " global index found for columns '" + columnsDesc + "'"

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,153 @@ public void testDropBtreeGlobalIndex() throws Exception {
115115
assertThat(btreeEntries).isEmpty();
116116
}
117117

118+
@Test
119+
public void testDropGlobalIndexDryRun() throws Exception {
120+
sql(
121+
"CREATE TABLE T ("
122+
+ " id INT,"
123+
+ " name STRING"
124+
+ ") WITH ("
125+
+ " 'bucket' = '-1',"
126+
+ " 'global-index.row-count-per-shard' = '10000',"
127+
+ " 'row-tracking.enabled' = 'true',"
128+
+ " 'data-evolution.enabled' = 'true'"
129+
+ ")");
130+
131+
FileStoreTable table = paimonTable("T");
132+
BatchWriteBuilder builder = table.newBatchWriteBuilder();
133+
try (BatchTableWrite batchTableWrite = builder.newWrite()) {
134+
for (int i = 0; i < 100000; i++) {
135+
batchTableWrite.write(GenericRow.of(i, BinaryString.fromString("name_" + i)));
136+
}
137+
List<CommitMessage> commitMessages = batchTableWrite.prepareCommit();
138+
BatchTableCommit commit = builder.newCommit();
139+
commit.commit(commitMessages);
140+
commit.close();
141+
}
142+
143+
tEnv.getConfig()
144+
.set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
145+
sql(
146+
"CALL sys.create_global_index(`table` => 'default.T', "
147+
+ "`index_column` => 'name', "
148+
+ "`index_type` => 'btree')");
149+
table = paimonTable("T");
150+
List<IndexManifestEntry> btreeEntries =
151+
table.store().newIndexFileHandler().scanEntries().stream()
152+
.filter(entry -> entry.indexFile().indexType().equals("btree"))
153+
.collect(Collectors.toList());
154+
assertThat(btreeEntries).isNotEmpty();
155+
156+
// Dry run: should report how many would be dropped, but keep the index intact.
157+
List<Row> dryRunResult =
158+
sql(
159+
"CALL sys.drop_global_index(`table` => 'default.T', "
160+
+ "`index_column` => 'name', "
161+
+ "`index_type` => 'btree', "
162+
+ "`dry_run` => true)");
163+
assertThat(dryRunResult).hasSize(1);
164+
assertThat(dryRunResult.get(0).getField(0))
165+
.isInstanceOf(String.class)
166+
.asString()
167+
.contains("Dry run")
168+
.contains(String.valueOf(btreeEntries.size()))
169+
.contains("btree")
170+
.contains("name");
171+
172+
// Index files must still be present after a dry run.
173+
table = paimonTable("T");
174+
List<IndexManifestEntry> afterDryRun =
175+
table.store().newIndexFileHandler().scanEntries().stream()
176+
.filter(entry -> entry.indexFile().indexType().equals("btree"))
177+
.collect(Collectors.toList());
178+
assertThat(afterDryRun).hasSameSizeAs(btreeEntries);
179+
}
180+
181+
@Test
182+
public void testDropGlobalIndexDryRunWithPartition() throws Exception {
183+
sql(
184+
"CREATE TABLE T ("
185+
+ " id INT,"
186+
+ " name STRING,"
187+
+ " pt STRING"
188+
+ ") PARTITIONED BY (pt) WITH ("
189+
+ " 'bucket' = '-1',"
190+
+ " 'global-index.row-count-per-shard' = '10000',"
191+
+ " 'row-tracking.enabled' = 'true',"
192+
+ " 'data-evolution.enabled' = 'true'"
193+
+ ")");
194+
195+
FileStoreTable table = paimonTable("T");
196+
BatchWriteBuilder builder = table.newBatchWriteBuilder();
197+
try (BatchTableWrite batchTableWrite = builder.newWrite()) {
198+
for (int i = 0; i < 20000; i++) {
199+
batchTableWrite.write(
200+
GenericRow.of(
201+
i,
202+
BinaryString.fromString("name_" + i),
203+
BinaryString.fromString("p0")));
204+
}
205+
for (int i = 0; i < 20000; i++) {
206+
batchTableWrite.write(
207+
GenericRow.of(
208+
i,
209+
BinaryString.fromString("name_" + i),
210+
BinaryString.fromString("p1")));
211+
}
212+
List<CommitMessage> commitMessages = batchTableWrite.prepareCommit();
213+
BatchTableCommit commit = builder.newCommit();
214+
commit.commit(commitMessages);
215+
commit.close();
216+
}
217+
218+
tEnv.getConfig()
219+
.set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
220+
sql(
221+
"CALL sys.create_global_index(`table` => 'default.T', "
222+
+ "`index_column` => 'name', "
223+
+ "`index_type` => 'btree')");
224+
225+
table = paimonTable("T");
226+
List<IndexManifestEntry> before =
227+
table.store().newIndexFileHandler().scanEntries().stream()
228+
.filter(entry -> entry.indexFile().indexType().equals("btree"))
229+
.collect(Collectors.toList());
230+
assertThat(before).isNotEmpty();
231+
232+
// Dry run scoped to one partition.
233+
String partitionMsg =
234+
(String)
235+
sql("CALL sys.drop_global_index(`table` => 'default.T', "
236+
+ "`index_column` => 'name', "
237+
+ "`index_type` => 'btree', "
238+
+ "`partitions` => 'pt=p1', "
239+
+ "`dry_run` => true)")
240+
.get(0)
241+
.getField(0);
242+
assertThat(partitionMsg).contains("Dry run").contains("btree");
243+
244+
// Dry run over all partitions reports a different (larger) count, proving the
245+
// partition filter narrows the preview.
246+
String allMsg =
247+
(String)
248+
sql("CALL sys.drop_global_index(`table` => 'default.T', "
249+
+ "`index_column` => 'name', "
250+
+ "`index_type` => 'btree', "
251+
+ "`dry_run` => true)")
252+
.get(0)
253+
.getField(0);
254+
assertThat(allMsg).contains("Dry run");
255+
assertThat(partitionMsg).isNotEqualTo(allMsg);
256+
257+
// Neither dry run committed anything.
258+
List<IndexManifestEntry> after =
259+
table.store().newIndexFileHandler().scanEntries().stream()
260+
.filter(entry -> entry.indexFile().indexType().equals("btree"))
261+
.collect(Collectors.toList());
262+
assertThat(after).hasSameSizeAs(before);
263+
}
264+
118265
@Test
119266
public void testDropBtreeGlobalIndexWithPartition() throws Exception {
120267
sql(

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,15 @@ public class DropGlobalIndexProcedure extends BaseProcedure {
6767
ProcedureParameter.required("index_column", DataTypes.StringType),
6868
ProcedureParameter.required("index_type", DataTypes.StringType),
6969
ProcedureParameter.optional("partitions", StringType),
70+
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
7071
};
7172

7273
private static final StructType OUTPUT_TYPE =
7374
new StructType(
7475
new StructField[] {
75-
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
76+
new StructField("result", DataTypes.BooleanType, true, Metadata.empty()),
77+
new StructField(
78+
"dropped_file_count", DataTypes.LongType, true, Metadata.empty())
7679
});
7780

7881
protected DropGlobalIndexProcedure(TableCatalog tableCatalog) {
@@ -103,6 +106,7 @@ public InternalRow[] call(InternalRow args) {
103106
(args.isNullAt(3) || StringUtils.isNullOrWhitespaceOnly(args.getString(3)))
104107
? null
105108
: args.getString(3);
109+
boolean dryRun = !args.isNullAt(4) && args.getBoolean(4);
106110

107111
String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : null;
108112

@@ -172,6 +176,18 @@ public InternalRow[] call(InternalRow args) {
172176
"Waiting for global index to be deleted size: "
173177
+ waitDelete.size());
174178

179+
// Dry run: report how many would be dropped, commit nothing.
180+
if (dryRun) {
181+
return new InternalRow[] {
182+
newInternalRow(true, (long) waitDelete.size())
183+
};
184+
}
185+
186+
// Nothing matched: avoid committing an empty change.
187+
if (waitDelete.isEmpty()) {
188+
return new InternalRow[] {newInternalRow(true, 0L)};
189+
}
190+
175191
Map<BinaryRow, List<IndexFileMeta>> deleteEntries =
176192
waitDelete.stream()
177193
.map(IndexManifestEntry::toDeleteEntry)
@@ -202,7 +218,7 @@ public InternalRow[] call(InternalRow args) {
202218
commit.commit(commitMessages);
203219
}
204220

205-
return new InternalRow[] {newInternalRow(true)};
221+
return new InternalRow[] {newInternalRow(true, (long) waitDelete.size())};
206222
} catch (Exception e) {
207223
throw new RuntimeException(
208224
String.format(

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class DropGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
6363
val totalRowCount = btreeEntries.map(_.indexFile().rowCount()).sum
6464
assert(totalRowCount == 100000L)
6565

66+
val droppedCount = btreeEntries.size
6667
output = spark
6768
.sql("CALL sys.drop_global_index(table => 'test.T', index_column => 'name', index_type => 'btree')")
6869
.collect()
6970
.head
7071

7172
assert(output.getBoolean(0))
73+
assert(output.getLong(1) == droppedCount)
7274

7375
table = loadTable("T")
7476
btreeEntries = table
@@ -81,6 +83,54 @@ class DropGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
8183
}
8284
}
8385

86+
test("drop btree global index dry run") {
87+
withTable("T") {
88+
spark.sql("""
89+
|CREATE TABLE T (id INT, name STRING)
90+
|TBLPROPERTIES (
91+
| 'bucket' = '-1',
92+
| 'global-index.row-count-per-shard' = '10000',
93+
| 'row-tracking.enabled' = 'true',
94+
| 'data-evolution.enabled' = 'true')
95+
|""".stripMargin)
96+
97+
val values =
98+
(0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
99+
spark.sql(s"INSERT INTO T VALUES $values")
100+
101+
spark
102+
.sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree')")
103+
.collect()
104+
105+
var table = loadTable("T")
106+
val before = table
107+
.store()
108+
.newIndexFileHandler()
109+
.scanEntries()
110+
.asScala
111+
.filter(_.indexFile().indexType() == "btree")
112+
assert(before.nonEmpty)
113+
114+
// Dry run: reports the would-drop count but commits nothing.
115+
val output = spark
116+
.sql("CALL sys.drop_global_index(table => 'test.T', index_column => 'name', index_type => 'btree', dry_run => true)")
117+
.collect()
118+
.head
119+
assert(output.getBoolean(0))
120+
assert(output.getLong(1) == before.size)
121+
122+
// Index files must still be present after a dry run.
123+
table = loadTable("T")
124+
val after = table
125+
.store()
126+
.newIndexFileHandler()
127+
.scanEntries()
128+
.asScala
129+
.filter(_.indexFile().indexType() == "btree")
130+
assert(after.size == before.size)
131+
}
132+
}
133+
84134
test("create btree global index with partition") {
85135
withTable("T") {
86136
spark.sql("""

0 commit comments

Comments
 (0)