Skip to content

Commit b39955d

Browse files
committed
support pushdown limit
1 parent 03d24f1 commit b39955d

19 files changed

Lines changed: 174 additions & 21 deletions

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ class FlussAppendPartitionReader(
2828
tablePath: TablePath,
2929
projection: Array[Int],
3030
pushedPredicate: Option[Predicate],
31+
limit: Option[Int],
3132
flussPartition: FlussAppendInputPartition,
3233
flussConfig: Configuration)
33-
extends FlussPartitionReader(tablePath, flussConfig) {
34+
extends FlussPartitionReader(tablePath, flussConfig, limit) {
3435

3536
override protected lazy val projectedRowType: RowType = rowType.project(projection)
3637

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ abstract class FlussBatch(
3939
tablePath: TablePath,
4040
tableInfo: TableInfo,
4141
readSchema: StructType,
42+
limit: Option[Int],
4243
flussConfig: Configuration)
4344
extends Batch
4445
with AutoCloseable {
@@ -114,9 +115,10 @@ class FlussAppendBatch(
114115
readSchema: StructType,
115116
pushedPredicate: Option[Predicate],
116117
partitionPredicate: Option[Predicate],
118+
limit: Option[Int],
117119
options: CaseInsensitiveStringMap,
118120
flussConfig: Configuration)
119-
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
121+
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {
120122

121123
override val startOffsetsInitializer: OffsetsInitializer = {
122124
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
@@ -202,6 +204,7 @@ class FlussAppendBatch(
202204
tablePath,
203205
projection,
204206
pushedPredicate,
207+
limit,
205208
options,
206209
flussConfig)
207210
}
@@ -214,9 +217,10 @@ class FlussUpsertBatch(
214217
tableInfo: TableInfo,
215218
readSchema: StructType,
216219
partitionPredicate: Option[Predicate],
220+
limit: Option[Int],
217221
options: CaseInsensitiveStringMap,
218222
flussConfig: Configuration)
219-
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
223+
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {
220224

221225
override val startOffsetsInitializer: OffsetsInitializer = {
222226
val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
@@ -253,6 +257,6 @@ class FlussUpsertBatch(
253257
}
254258

255259
override def createReaderFactory(): PartitionReaderFactory = {
256-
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
260+
new FlussUpsertPartitionReaderFactory(tablePath, projection, limit, options, flussConfig)
257261
}
258262
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ class FlussAppendMicroBatchStream(
271271
checkpointLocation) {
272272

273273
override def createReaderFactory(): PartitionReaderFactory = {
274-
new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
274+
new FlussAppendPartitionReaderFactory(tablePath, projection, None, None, options, flussConfig)
275275
}
276276

277277
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
@@ -352,6 +352,6 @@ class FlussUpsertMicroBatchStream(
352352
}
353353

354354
override def createReaderFactory(): PartitionReaderFactory = {
355-
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
355+
new FlussUpsertPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
356356
}
357357
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import org.apache.spark.sql.connector.read.PartitionReader
3434

3535
import java.time.Duration
3636

37-
abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration)
37+
abstract class FlussPartitionReader(
38+
tablePath: TablePath,
39+
flussConfig: Configuration,
40+
limit: Option[Int])
3841
extends PartitionReader[InternalRow]
3942
with Logging {
4043

@@ -57,6 +60,9 @@ abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configura
5760
def next0(): Boolean
5861

5962
override def next(): Boolean = {
63+
if (limit.isDefined && numRowsRead >= limit.get) {
64+
return false
65+
}
6066
val hasNext = next0()
6167
if (hasNext) {
6268
numRowsRead += 1

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class FlussAppendPartitionReaderFactory(
3030
tablePath: TablePath,
3131
projection: Array[Int],
3232
pushedPredicate: Option[Predicate],
33+
limit: Option[Int],
3334
options: CaseInsensitiveStringMap,
3435
flussConfig: Configuration)
3536
extends PartitionReaderFactory {
@@ -40,6 +41,7 @@ class FlussAppendPartitionReaderFactory(
4041
tablePath,
4142
projection,
4243
pushedPredicate,
44+
limit,
4345
flussPartition,
4446
flussConfig
4547
)
@@ -50,6 +52,7 @@ class FlussAppendPartitionReaderFactory(
5052
class FlussUpsertPartitionReaderFactory(
5153
tablePath: TablePath,
5254
projection: Array[Int],
55+
limit: Option[Int],
5356
options: CaseInsensitiveStringMap,
5457
flussConfig: Configuration)
5558
extends PartitionReaderFactory {
@@ -59,6 +62,7 @@ class FlussUpsertPartitionReaderFactory(
5962
new FlussUpsertPartitionReader(
6063
tablePath,
6164
projection,
65+
limit,
6266
upsertPartition,
6367
flussConfig
6468
)

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ trait FlussScan extends Scan {
4343

4444
def partitionPredicate: Option[FlussPredicate] = None
4545

46+
def limit: Option[Int] = None
47+
4648
protected def scanType: String
4749

4850
override def readSchema(): StructType = {
@@ -54,10 +56,14 @@ trait FlussScan extends Scan {
5456
val withPushed =
5557
if (pushedSparkPredicates.isEmpty) base
5658
else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
57-
partitionPredicate match {
59+
val withPartition = partitionPredicate match {
5860
case Some(p) => s"$withPushed [PartitionFilter: $p]"
5961
case None => withPushed
6062
}
63+
limit match {
64+
case Some(l) => s"$withPartition [Limit: $l]"
65+
case None => withPartition
66+
}
6167
}
6268

6369
override def supportedCustomMetrics(): Array[CustomMetric] =
@@ -72,6 +78,7 @@ case class FlussAppendScan(
7278
pushedPredicate: Option[FlussPredicate],
7379
override val partitionPredicate: Option[FlussPredicate],
7480
override val pushedSparkPredicates: Seq[Predicate],
81+
override val limit: Option[Int],
7582
options: CaseInsensitiveStringMap,
7683
flussConfig: Configuration)
7784
extends FlussScan {
@@ -85,6 +92,7 @@ case class FlussAppendScan(
8592
readSchema,
8693
pushedPredicate,
8794
partitionPredicate,
95+
limit,
8896
options,
8997
flussConfig)
9098
}
@@ -107,6 +115,7 @@ case class FlussLakeAppendScan(
107115
requiredSchema: Option[StructType],
108116
pushedPredicate: Option[FlussPredicate],
109117
override val pushedSparkPredicates: Seq[Predicate],
118+
override val limit: Option[Int],
110119
options: CaseInsensitiveStringMap,
111120
flussConfig: Configuration)
112121
extends FlussScan {
@@ -119,6 +128,7 @@ case class FlussLakeAppendScan(
119128
tableInfo,
120129
readSchema,
121130
pushedPredicate,
131+
limit,
122132
options,
123133
flussConfig)
124134
}
@@ -140,14 +150,22 @@ case class FlussUpsertScan(
140150
tableInfo: TableInfo,
141151
requiredSchema: Option[StructType],
142152
override val partitionPredicate: Option[FlussPredicate],
153+
override val limit: Option[Int],
143154
options: CaseInsensitiveStringMap,
144155
flussConfig: Configuration)
145156
extends FlussScan {
146157

147158
override protected val scanType: String = "Upsert"
148159

149160
override def toBatch: Batch = {
150-
new FlussUpsertBatch(tablePath, tableInfo, readSchema, partitionPredicate, options, flussConfig)
161+
new FlussUpsertBatch(
162+
tablePath,
163+
tableInfo,
164+
readSchema,
165+
partitionPredicate,
166+
limit,
167+
options,
168+
flussConfig)
151169
}
152170

153171
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -168,6 +186,7 @@ case class FlussLakeUpsertScan(
168186
requiredSchema: Option[StructType],
169187
pushedPredicate: Option[FlussPredicate],
170188
override val pushedSparkPredicates: Seq[Predicate],
189+
override val limit: Option[Int],
171190
options: CaseInsensitiveStringMap,
172191
flussConfig: Configuration)
173192
extends FlussScan {
@@ -180,6 +199,7 @@ case class FlussLakeUpsertScan(
180199
tableInfo,
181200
readSchema,
182201
pushedPredicate,
202+
limit,
183203
options,
184204
flussConfig)
185205
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils}
2424
import org.apache.fluss.spark.utils.{SparkPartitionPredicate, SparkPredicateConverter}
2525

2626
import org.apache.spark.sql.connector.expressions.filter.Predicate
27-
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
27+
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
2828
import org.apache.spark.sql.types.StructType
2929
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3030

@@ -33,13 +33,22 @@ import java.util.{Collections, IdentityHashMap, Set => JSet}
3333
import scala.collection.JavaConverters._
3434

3535
/** Base trait for Fluss scan builders, extending Spark's [[ScanBuilder]]. */
36-
trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
36+
trait FlussScanBuilder
37+
extends ScanBuilder
38+
with SupportsPushDownRequiredColumns
39+
with SupportsPushDownLimit {
3740

3841
protected var requiredSchema: Option[StructType] = None
42+
protected var limit: Option[Int] = None
3943

4044
override def pruneColumns(requiredSchema: StructType): Unit = {
4145
this.requiredSchema = Some(requiredSchema)
4246
}
47+
48+
override def pushLimit(limit: Int): Boolean = {
49+
this.limit = Some(limit)
50+
true
51+
}
4352
}
4453

4554
/** Extracts a partition-key predicate so the scan can skip partitions that can't match. */
@@ -132,6 +141,7 @@ class FlussAppendScanBuilder(
132141
pushedPredicate,
133142
partitionPredicate,
134143
acceptedPredicates.toSeq,
144+
limit,
135145
options,
136146
flussConfig)
137147
}
@@ -152,6 +162,7 @@ class FlussLakeAppendScanBuilder(
152162
requiredSchema,
153163
pushedPredicate,
154164
acceptedPredicates.toSeq,
165+
limit,
155166
options,
156167
flussConfig)
157168
}
@@ -166,7 +177,14 @@ class FlussUpsertScanBuilder(
166177
extends FlussSupportsPushDownPartitionFilters {
167178

168179
override def build(): Scan = {
169-
FlussUpsertScan(tablePath, tableInfo, requiredSchema, partitionPredicate, options, flussConfig)
180+
FlussUpsertScan(
181+
tablePath,
182+
tableInfo,
183+
requiredSchema,
184+
partitionPredicate,
185+
limit,
186+
options,
187+
flussConfig)
170188
}
171189
}
172190

@@ -185,6 +203,7 @@ class FlussLakeUpsertScanBuilder(
185203
requiredSchema,
186204
pushedPredicate,
187205
acceptedPredicates.toSeq,
206+
limit,
188207
options,
189208
flussConfig)
190209
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ import scala.collection.mutable
4646
class FlussUpsertPartitionReader(
4747
tablePath: TablePath,
4848
projection: Array[Int],
49+
limit: Option[Int],
4950
flussPartition: FlussUpsertInputPartition,
5051
flussConfig: Configuration)
51-
extends FlussPartitionReader(tablePath, flussConfig)
52+
extends FlussPartitionReader(tablePath, flussConfig, limit)
5253
with Logging {
5354

5455
override protected lazy val projectedRowType: RowType = rowType.project(projectionWithPks)

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ class FlussLakeAppendBatch(
4040
tableInfo: TableInfo,
4141
readSchema: StructType,
4242
pushedPredicate: Option[FlussPredicate],
43+
limit: Option[Int],
4344
options: CaseInsensitiveStringMap,
4445
flussConfig: Configuration)
45-
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {
46+
extends FlussLakeBatch(tablePath, tableInfo, readSchema, limit, options, flussConfig) {
4647

4748
// Required by FlussLakeBatch but unused — lake snapshot determines start offsets.
4849
override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()
@@ -57,6 +58,7 @@ class FlussLakeAppendBatch(
5758
tablePath,
5859
projection,
5960
logTailPredicate,
61+
limit,
6062
options,
6163
flussConfig)
6264
} else {
@@ -66,6 +68,7 @@ class FlussLakeAppendBatch(
6668
projection,
6769
pushedPredicate,
6870
logTailPredicate,
71+
limit,
6972
flussConfig)
7073
}
7174
}

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ class FlussLakeAppendPartitionReader(
3333
partition: FlussLakeInputPartition,
3434
lakeSource: LakeSource[LakeSplit],
3535
projection: Array[Int],
36+
limit: Option[Int],
3637
flussConfig: Configuration)
37-
extends FlussPartitionReader(tablePath, flussConfig)
38+
extends FlussPartitionReader(tablePath, flussConfig, limit)
3839
with Logging {
3940

4041
private var recordIterator: CloseableIterator[LogRecord] = _

0 commit comments

Comments
 (0)