Skip to content

Commit 717bee7

Browse files
authored
[spark] Implement SupportsPushDownLimit DSv2 interface (#3346)
* support pushdown limit * fix comments * rebase
1 parent 35beec1 commit 717bee7

19 files changed

Lines changed: 210 additions & 21 deletions

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ class FlussAppendPartitionReader(
2828
tablePath: TablePath,
2929
projection: Array[Int],
3030
pushedPredicate: Option[Predicate],
31+
limit: Option[Int],
3132
flussPartition: FlussAppendInputPartition,
3233
flussConfig: Configuration)
33-
extends FlussPartitionReader(tablePath, flussConfig) {
34+
extends FlussPartitionReader(tablePath, flussConfig, limit) {
3435

3536
override protected lazy val projectedRowType: RowType = rowType.project(projection)
3637

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ abstract class FlussBatch(
3939
tablePath: TablePath,
4040
tableInfo: TableInfo,
4141
readSchema: StructType,
42+
limit: Option[Int],
4243
flussConfig: Configuration)
4344
extends Batch
4445
with AutoCloseable {
@@ -114,9 +115,10 @@ class FlussAppendBatch(
114115
readSchema: StructType,
115116
pushedPredicate: Option[Predicate],
116117
partitionPredicate: Option[Predicate],
118+
limit: Option[Int],
117119
options: CaseInsensitiveStringMap,
118120
flussConfig: Configuration)
119-
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
121+
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {
120122

121123
override val startOffsetsInitializer: OffsetsInitializer = {
122124
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
@@ -202,6 +204,7 @@ class FlussAppendBatch(
202204
tablePath,
203205
projection,
204206
pushedPredicate,
207+
limit,
205208
options,
206209
flussConfig)
207210
}
@@ -214,9 +217,10 @@ class FlussUpsertBatch(
214217
tableInfo: TableInfo,
215218
readSchema: StructType,
216219
partitionPredicate: Option[Predicate],
220+
limit: Option[Int],
217221
options: CaseInsensitiveStringMap,
218222
flussConfig: Configuration)
219-
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
223+
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {
220224

221225
override val startOffsetsInitializer: OffsetsInitializer = {
222226
val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
@@ -253,6 +257,6 @@ class FlussUpsertBatch(
253257
}
254258

255259
override def createReaderFactory(): PartitionReaderFactory = {
256-
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
260+
new FlussUpsertPartitionReaderFactory(tablePath, projection, limit, options, flussConfig)
257261
}
258262
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ class FlussAppendMicroBatchStream(
271271
checkpointLocation) {
272272

273273
override def createReaderFactory(): PartitionReaderFactory = {
274-
new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
274+
new FlussAppendPartitionReaderFactory(tablePath, projection, None, None, options, flussConfig)
275275
}
276276

277277
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
@@ -352,6 +352,6 @@ class FlussUpsertMicroBatchStream(
352352
}
353353

354354
override def createReaderFactory(): PartitionReaderFactory = {
355-
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
355+
new FlussUpsertPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
356356
}
357357
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import org.apache.spark.sql.connector.read.PartitionReader
3434

3535
import java.time.Duration
3636

37-
abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration)
37+
abstract class FlussPartitionReader(
38+
tablePath: TablePath,
39+
flussConfig: Configuration,
40+
limit: Option[Int])
3841
extends PartitionReader[InternalRow]
3942
with Logging {
4043

@@ -57,6 +60,9 @@ abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configura
5760
def next0(): Boolean
5861

5962
override def next(): Boolean = {
63+
if (limit.exists(numRowsRead >= _)) {
64+
return false
65+
}
6066
val hasNext = next0()
6167
if (hasNext) {
6268
numRowsRead += 1

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class FlussAppendPartitionReaderFactory(
3030
tablePath: TablePath,
3131
projection: Array[Int],
3232
pushedPredicate: Option[Predicate],
33+
limit: Option[Int],
3334
options: CaseInsensitiveStringMap,
3435
flussConfig: Configuration)
3536
extends PartitionReaderFactory {
@@ -40,6 +41,7 @@ class FlussAppendPartitionReaderFactory(
4041
tablePath,
4142
projection,
4243
pushedPredicate,
44+
limit,
4345
flussPartition,
4446
flussConfig
4547
)
@@ -50,6 +52,7 @@ class FlussAppendPartitionReaderFactory(
5052
class FlussUpsertPartitionReaderFactory(
5153
tablePath: TablePath,
5254
projection: Array[Int],
55+
limit: Option[Int],
5356
options: CaseInsensitiveStringMap,
5457
flussConfig: Configuration)
5558
extends PartitionReaderFactory {
@@ -59,6 +62,7 @@ class FlussUpsertPartitionReaderFactory(
5962
new FlussUpsertPartitionReader(
6063
tablePath,
6164
projection,
65+
limit,
6266
upsertPartition,
6367
flussConfig
6468
)

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ trait FlussScan extends Scan {
4343

4444
def partitionPredicate: Option[FlussPredicate] = None
4545

46+
def limit: Option[Int] = None
47+
4648
protected def scanType: String
4749

4850
override def readSchema(): StructType = {
@@ -54,10 +56,14 @@ trait FlussScan extends Scan {
5456
val withPushed =
5557
if (pushedSparkPredicates.isEmpty) base
5658
else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
57-
partitionPredicate match {
59+
val withPartition = partitionPredicate match {
5860
case Some(p) => s"$withPushed [PartitionFilter: $p]"
5961
case None => withPushed
6062
}
63+
limit match {
64+
case Some(l) => s"$withPartition [Limit: $l]"
65+
case None => withPartition
66+
}
6167
}
6268

6369
override def supportedCustomMetrics(): Array[CustomMetric] =
@@ -72,6 +78,7 @@ case class FlussAppendScan(
7278
pushedPredicate: Option[FlussPredicate],
7379
override val partitionPredicate: Option[FlussPredicate],
7480
override val pushedSparkPredicates: Seq[Predicate],
81+
override val limit: Option[Int],
7582
options: CaseInsensitiveStringMap,
7683
flussConfig: Configuration)
7784
extends FlussScan {
@@ -85,6 +92,7 @@ case class FlussAppendScan(
8592
readSchema,
8693
pushedPredicate,
8794
partitionPredicate,
95+
limit,
8896
options,
8997
flussConfig)
9098
}
@@ -108,6 +116,7 @@ case class FlussLakeAppendScan(
108116
pushedPredicate: Option[FlussPredicate],
109117
override val partitionPredicate: Option[FlussPredicate],
110118
override val pushedSparkPredicates: Seq[Predicate],
119+
override val limit: Option[Int],
111120
options: CaseInsensitiveStringMap,
112121
flussConfig: Configuration)
113122
extends FlussScan {
@@ -121,6 +130,7 @@ case class FlussLakeAppendScan(
121130
readSchema,
122131
pushedPredicate,
123132
partitionPredicate,
133+
limit,
124134
options,
125135
flussConfig)
126136
}
@@ -142,14 +152,22 @@ case class FlussUpsertScan(
142152
tableInfo: TableInfo,
143153
requiredSchema: Option[StructType],
144154
override val partitionPredicate: Option[FlussPredicate],
155+
override val limit: Option[Int],
145156
options: CaseInsensitiveStringMap,
146157
flussConfig: Configuration)
147158
extends FlussScan {
148159

149160
override protected val scanType: String = "Upsert"
150161

151162
override def toBatch: Batch = {
152-
new FlussUpsertBatch(tablePath, tableInfo, readSchema, partitionPredicate, options, flussConfig)
163+
new FlussUpsertBatch(
164+
tablePath,
165+
tableInfo,
166+
readSchema,
167+
partitionPredicate,
168+
limit,
169+
options,
170+
flussConfig)
153171
}
154172

155173
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -171,6 +189,7 @@ case class FlussLakeUpsertScan(
171189
pushedPredicate: Option[FlussPredicate],
172190
override val partitionPredicate: Option[FlussPredicate],
173191
override val pushedSparkPredicates: Seq[Predicate],
192+
override val limit: Option[Int],
174193
options: CaseInsensitiveStringMap,
175194
flussConfig: Configuration)
176195
extends FlussScan {
@@ -184,6 +203,7 @@ case class FlussLakeUpsertScan(
184203
readSchema,
185204
pushedPredicate,
186205
partitionPredicate,
206+
limit,
187207
options,
188208
flussConfig)
189209
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils}
2424
import org.apache.fluss.spark.utils.{SparkPartitionPredicate, SparkPredicateConverter}
2525

2626
import org.apache.spark.sql.connector.expressions.filter.Predicate
27-
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
27+
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
2828
import org.apache.spark.sql.types.StructType
2929
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3030

@@ -33,13 +33,22 @@ import java.util.{Collections, IdentityHashMap, Set => JSet}
3333
import scala.collection.JavaConverters._
3434

3535
/** An interface that extends from Spark [[ScanBuilder]]. */
36-
trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
36+
trait FlussScanBuilder
37+
extends ScanBuilder
38+
with SupportsPushDownRequiredColumns
39+
with SupportsPushDownLimit {
3740

3841
protected var requiredSchema: Option[StructType] = None
42+
protected var limit: Option[Int] = None
3943

4044
override def pruneColumns(requiredSchema: StructType): Unit = {
4145
this.requiredSchema = Some(requiredSchema)
4246
}
47+
48+
override def pushLimit(limit: Int): Boolean = {
49+
this.limit = Some(limit)
50+
true
51+
}
4352
}
4453

4554
/** Extracts a partition-key predicate so the scan can skip partitions that can't match. */
@@ -133,6 +142,7 @@ class FlussAppendScanBuilder(
133142
pushedPredicate,
134143
partitionPredicate,
135144
acceptedPredicates.toSeq,
145+
limit,
136146
options,
137147
flussConfig)
138148
}
@@ -154,6 +164,7 @@ class FlussLakeAppendScanBuilder(
154164
pushedPredicate,
155165
partitionPredicate,
156166
acceptedPredicates.toSeq,
167+
limit,
157168
options,
158169
flussConfig)
159170
}
@@ -168,7 +179,14 @@ class FlussUpsertScanBuilder(
168179
extends FlussSupportsPushDownV2Filters {
169180

170181
override def build(): Scan = {
171-
FlussUpsertScan(tablePath, tableInfo, requiredSchema, partitionPredicate, options, flussConfig)
182+
FlussUpsertScan(
183+
tablePath,
184+
tableInfo,
185+
requiredSchema,
186+
partitionPredicate,
187+
limit,
188+
options,
189+
flussConfig)
172190
}
173191
}
174192

@@ -188,6 +206,7 @@ class FlussLakeUpsertScanBuilder(
188206
pushedPredicate,
189207
partitionPredicate,
190208
acceptedPredicates.toSeq,
209+
limit,
191210
options,
192211
flussConfig)
193212
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ import scala.collection.mutable
4646
class FlussUpsertPartitionReader(
4747
tablePath: TablePath,
4848
projection: Array[Int],
49+
limit: Option[Int],
4950
flussPartition: FlussUpsertInputPartition,
5051
flussConfig: Configuration)
51-
extends FlussPartitionReader(tablePath, flussConfig)
52+
extends FlussPartitionReader(tablePath, flussConfig, limit)
5253
with Logging {
5354

5455
override protected lazy val projectedRowType: RowType = rowType.project(projectionWithPks)

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ class FlussLakeAppendBatch(
4242
readSchema: StructType,
4343
pushedPredicate: Option[FlussPredicate],
4444
partitionPredicate: Option[FlussPredicate],
45+
limit: Option[Int],
4546
options: CaseInsensitiveStringMap,
4647
flussConfig: Configuration)
47-
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {
48+
extends FlussLakeBatch(tablePath, tableInfo, readSchema, limit, options, flussConfig) {
4849

4950
// Required by FlussLakeBatch but unused — lake snapshot determines start offsets.
5051
override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()
@@ -59,6 +60,7 @@ class FlussLakeAppendBatch(
5960
tablePath,
6061
projection,
6162
logTailPredicate,
63+
limit,
6264
options,
6365
flussConfig)
6466
} else {
@@ -68,6 +70,7 @@ class FlussLakeAppendBatch(
6870
projection,
6971
pushedPredicate,
7072
logTailPredicate,
73+
limit,
7174
flussConfig)
7275
}
7376
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ class FlussLakeAppendPartitionReader(
3333
partition: FlussLakeInputPartition,
3434
lakeSource: LakeSource[LakeSplit],
3535
projection: Array[Int],
36+
limit: Option[Int],
3637
flussConfig: Configuration)
37-
extends FlussPartitionReader(tablePath, flussConfig)
38+
extends FlussPartitionReader(tablePath, flussConfig, limit)
3839
with Logging {
3940

4041
private var recordIterator: CloseableIterator[LogRecord] = _

0 commit comments

Comments
 (0)