Skip to content

Commit ab7e2ab

Browse files
committed
fix comments
1 parent b39955d commit ab7e2ab

5 files changed

Lines changed: 39 additions & 3 deletions

File tree

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ abstract class FlussPartitionReader(
6060
def next0(): Boolean
6161

6262
override def next(): Boolean = {
63-
if (limit.isDefined && numRowsRead >= limit.get) {
63+
if (limit.exists(numRowsRead >= _)) {
6464
return false
6565
}
6666
val hasNext = next0()

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -656,6 +656,14 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
656656

657657
val dfLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t LIMIT 2")
658658
assert(flussAppendScans(dfLimit).flatMap(_.limit).distinct == Seq(2))
659+
660+
// Verify limit pushdown actually reduces rows read via metrics
661+
dfLimit.collect()
662+
val batchScanExec = dfLimit.queryExecution.executedPlan.collectFirst {
663+
case b: BatchScanExec => b
664+
}.get
665+
val numRowsRead = batchScanExec.metrics(FlussMetrics.NUM_ROWS_READ).value
666+
assert(numRowsRead == 2L, s"Expected 2 rows read with limit pushdown, got $numRowsRead")
659667
}
660668
}
661669
}

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -453,8 +453,16 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
453453
val dfNoLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t")
454454
assert(flussUpsertScan(dfNoLimit).flatMap(_.limit).isEmpty)
455455

456-
val dfLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t LIMIT 2")
457-
assert(flussUpsertScan(dfLimit).flatMap(_.limit).contains(2))
456+
val dfLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01' LIMIT 1")
457+
assert(flussUpsertScan(dfLimit).flatMap(_.limit).contains(1))
458+
459+
// Verify limit pushdown actually reduces rows read via metrics
460+
dfLimit.collect()
461+
val batchScanExec = dfLimit.queryExecution.executedPlan.collectFirst {
462+
case b: BatchScanExec => b
463+
}.get
464+
val numRowsRead = batchScanExec.metrics(FlussMetrics.NUM_ROWS_READ).value
465+
assert(numRowsRead == 1L, s"Expected 1 rows read with limit pushdown, got $numRowsRead")
458466
}
459467
}
460468

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,10 @@ package org.apache.fluss.spark.lake
2020
import org.apache.fluss.config.{ConfigOptions, Configuration}
2121
import org.apache.fluss.metadata.DataLakeFormat
2222
import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
23+
import org.apache.fluss.spark.read.FlussMetrics
2324

2425
import org.apache.spark.sql.Row
26+
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2527

2628
import java.nio.file.Files
2729

@@ -547,6 +549,14 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase {
547549

548550
val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union_limit LIMIT 2")
549551
assert(flussScan(df).flatMap(_.limit).distinct == Seq(2))
552+
553+
// Verify limit pushdown actually reduces rows read via metrics
554+
df.collect()
555+
val batchScanExec = df.queryExecution.executedPlan.collectFirst {
556+
case b: BatchScanExec => b
557+
}.get
558+
val numRowsRead = batchScanExec.metrics(FlussMetrics.NUM_ROWS_READ).value
559+
assert(numRowsRead == 2L, s"Expected 2 rows read with limit pushdown, got $numRowsRead")
550560
}
551561
}
552562

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,10 @@ package org.apache.fluss.spark.lake
2020
import org.apache.fluss.config.{ConfigOptions, Configuration}
2121
import org.apache.fluss.metadata.DataLakeFormat
2222
import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_NUMBER, PRIMARY_KEY}
23+
import org.apache.fluss.spark.read.FlussMetrics
2324

2425
import org.apache.spark.sql.Row
26+
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2527

2628
import java.nio.file.Files
2729

@@ -481,6 +483,14 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTe
481483
val query =
482484
sql(s"SELECT id, score FROM $DEFAULT_DATABASE.t_pk_union_limit LIMIT 2")
483485
assert(flussScan(query).flatMap(_.limit).distinct == Seq(2))
486+
487+
// Verify limit pushdown actually reduces rows read via metrics
488+
query.collect()
489+
val batchScanExec = query.queryExecution.executedPlan.collectFirst {
490+
case b: BatchScanExec => b
491+
}.get
492+
val numRowsRead = batchScanExec.metrics(FlussMetrics.NUM_ROWS_READ).value
493+
assert(numRowsRead == 2L, s"Expected 2 rows read with limit pushdown, got $numRowsRead")
484494
}
485495
}
486496

0 commit comments

Comments
 (0)