Skip to content

Commit e57247b

Browse files
authored
Spark 3.4: Backport Async Micro Batch Planner to 3.4 (#16311)
Backport of #15992 to spark/v3.4. Stacked on PR #16307 (#15683 SerializableFileIOWithSize), which is itself a backport. Adaptations from the source PR: - SparkMicroBatchStream.java was replaced wholesale with the v3.5 post-#15992 version because v3.4 had structural drift; the refactor extracts the planning logic into the new planner classes and there are no v3.4-only features in this file. - TestStructuredStreamingRead3.java was likewise replaced with the v3.5 version (which adds parameterized sync/async coverage). The only non-mechanical change is using 'SparkCatalogConfig.SPARK' instead of 'SparkCatalogConfig.SPARK_SESSION', because v3.4 still uses the older enum name.
1 parent bdbb375 commit e57247b

12 files changed

Lines changed: 1588 additions & 347 deletions

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,39 @@ public int maxRecordsPerMicroBatch() {
265265
.parse();
266266
}
267267

268+
public boolean asyncMicroBatchPlanningEnabled() {
269+
return confParser
270+
.booleanConf()
271+
.option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
272+
.sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
273+
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
274+
.parse();
275+
}
276+
277+
public long streamingSnapshotPollingIntervalMs() {
278+
return confParser
279+
.longConf()
280+
.option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
281+
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
282+
.parse();
283+
}
284+
285+
public long asyncQueuePreloadFileLimit() {
286+
return confParser
287+
.longConf()
288+
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
289+
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
290+
.parse();
291+
}
292+
293+
public long asyncQueuePreloadRowLimit() {
294+
return confParser
295+
.longConf()
296+
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
297+
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
298+
.parse();
299+
}
300+
268301
public boolean preserveDataGrouping() {
269302
return confParser
270303
.booleanConf()

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,21 @@ private SparkReadOptions() {}
9292
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
9393
"streaming-max-rows-per-micro-batch";
9494

95+
// Enable async micro batch planning
96+
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
97+
"async-micro-batch-planning-enabled";
98+
99+
// Polling interval for async planner to refresh table metadata (ms)
100+
public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
101+
"streaming-snapshot-polling-interval-ms";
102+
public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 30000L;
103+
104+
// Initial queue preload limits for async micro batch planner
105+
public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = "async-queue-preload-file-limit";
106+
public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
107+
public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = "async-queue-preload-row-limit";
108+
public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
109+
95110
// Table path
96111
public static final String PATH = "path";
97112

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,9 @@ private SparkSQLProperties() {}
106106
// Controls whether to report available column statistics to Spark for query optimization.
107107
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
108108
public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
109+
110+
// Controls whether to enable async micro batch planning for session
111+
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
112+
"spark.sql.iceberg.async-micro-batch-planning-enabled";
113+
public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
109114
}

0 commit comments

Comments
 (0)