Skip to content

Commit b379de7

Browse files
committed
address comments
1 parent d373e0d commit b379de7

6 files changed

Lines changed: 89 additions & 23 deletions

File tree

fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
import org.apache.fluss.types.DecimalType;
5252
import org.apache.fluss.types.RowType;
5353

54-
import java.time.LocalDate;
55-
import java.time.LocalTime;
5654
import java.util.ArrayList;
5755
import java.util.HashMap;
5856
import java.util.List;
@@ -145,9 +143,9 @@ private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType fieldTyp
145143
decimalType.getScale());
146144
}
147145
case DATE:
148-
return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
146+
return (int) pbLiteral.getBigintValue();
149147
case TIME_WITHOUT_TIME_ZONE:
150-
return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 1_000_000L);
148+
return pbLiteral.getIntValue();
151149
case TIMESTAMP_WITHOUT_TIME_ZONE:
152150
return TimestampNtz.fromMillis(
153151
pbLiteral.getTimestampMillisValue(),
@@ -282,10 +280,10 @@ private static PbLiteralValue toPbLiteralValue(DataType type, Object literal) {
282280
pbLiteral.setIntValue((Integer) literal);
283281
break;
284282
case DATE:
285-
pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
283+
pbLiteral.setBigintValue(((Integer) literal).longValue());
286284
break;
287285
case TIME_WITHOUT_TIME_ZONE:
288-
pbLiteral.setIntValue((int) (((LocalTime) literal).toNanoOfDay() / 1_000_000L));
286+
pbLiteral.setIntValue((Integer) literal);
289287
break;
290288
case TIMESTAMP_WITHOUT_TIME_ZONE:
291289
pbLiteral.setTimestampMillisValue(((TimestampNtz) literal).getMillisecond());

fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656

5757
import java.lang.reflect.Field;
5858
import java.math.BigDecimal;
59-
import java.time.LocalDate;
6059
import java.time.LocalTime;
6160
import java.util.Arrays;
6261
import java.util.Collections;
@@ -94,6 +93,20 @@ private static RowType buildRowType(LeafPredicate... predicates) {
9493
return buildRowType(Arrays.asList(predicates));
9594
}
9695

96+
@Test
97+
public void testLeafPredicateTimeIntegerLiteral() {
98+
int timeMillis = 45_000_000;
99+
DataType type = new TimeType(false, 3);
100+
LeafPredicate predicate =
101+
new LeafPredicate(
102+
Equal.INSTANCE, type, 0, "f_time", Collections.singletonList(timeMillis));
103+
RowType rowType = buildRowType(predicate);
104+
PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, rowType);
105+
Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
106+
LeafPredicate lp = (LeafPredicate) result;
107+
assertThat(lp.literals().get(0)).isEqualTo(timeMillis);
108+
}
109+
97110
@Test
98111
public void testLeafPredicateIntEqual() {
99112
DataType type = new IntType(false);
@@ -317,22 +330,17 @@ public void testPbLiteralSerde() {
317330
DataType dateType = new DateType(false);
318331
LeafPredicate datePredicate =
319332
new LeafPredicate(
320-
Equal.INSTANCE,
321-
dateType,
322-
8,
323-
"f_date",
324-
Collections.singletonList(
325-
LocalDate.ofEpochDay(19000L))); // days since epoch
333+
Equal.INSTANCE, dateType, 8, "f_date", Collections.singletonList(19000));
326334
// time
327335
DataType timeType = new TimeType(false, 3);
328-
LocalTime time = LocalTime.of(12, 30);
336+
int timeMillis = (int) (LocalTime.of(12, 30).toNanoOfDay() / 1_000_000L);
329337
LeafPredicate timePredicate =
330338
new LeafPredicate(
331339
Equal.INSTANCE,
332340
timeType,
333341
9,
334342
"f_time",
335-
Collections.singletonList(time)); // millis of day
343+
Collections.singletonList(timeMillis));
336344
// timestamp
337345
DataType tsType = new TimestampType(false, 3);
338346
TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.fluss.spark.row
1919

2020
import org.apache.fluss.row.{InternalRow => FlussInternalRow}
21-
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
21+
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, BytesType => FlussBytesType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
2222
import org.apache.fluss.utils.InternalRowUtils
2323

2424
import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow}
@@ -84,9 +84,9 @@ class FlussAsSparkRow(rowType: RowType) extends SparkInteralRow {
8484
DataConverter.toSparkUTF8String(row.getString(ordinal))
8585
}
8686

87-
override def getBinary(ordinal: Int): Array[Byte] = {
88-
val binaryType = rowType.getTypeAt(ordinal).asInstanceOf[FlussBinaryType]
89-
row.getBinary(ordinal, binaryType.getLength)
87+
override def getBinary(ordinal: Int): Array[Byte] = rowType.getTypeAt(ordinal) match {
88+
case b: FlussBinaryType => row.getBinary(ordinal, b.getLength)
89+
case _: FlussBytesType => row.getBytes(ordinal)
9090
}
9191

9292
override def getInterval(ordinal: Int): CalendarInterval =

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import org.apache.spark.sql.types.{Decimal => SparkDecimal}
2727
import org.apache.spark.unsafe.types.UTF8String
2828

2929
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
30-
import java.time.LocalDate
3130

3231
import scala.jdk.CollectionConverters._
3332

@@ -277,9 +276,8 @@ object SparkPredicateConverter {
277276
}
278277

279278
case DataTypeRoot.DATE =>
280-
// RPC serialization (PredicateMessageUtils) expects LocalDate.
281279
value match {
282-
case d: Integer => LocalDate.ofEpochDay(d.longValue())
280+
case d: Integer => d
283281
case _ => throw new UnsupportedExpression
284282
}
285283

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,68 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
511511
}
512512
}
513513

514+
test("Spark Read: partition pushdown — BIGINT partition with range predicate") {
515+
withTable("t") {
516+
sql(s"""
517+
|CREATE TABLE $DEFAULT_DATABASE.t (id INT, pt BIGINT)
518+
|PARTITIONED BY (pt)""".stripMargin)
519+
sql(s"INSERT INTO $DEFAULT_DATABASE.t VALUES (1, 2L), (2, 10L), (3, 9999999999L)")
520+
521+
val query = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE pt > 2 ORDER BY id")
522+
checkAnswer(query, Row(2, 10L) :: Row(3, 9999999999L) :: Nil)
523+
assert(partitionPredicate(query).isDefined)
524+
}
525+
}
526+
527+
test("Spark Read: partition pushdown — DATE partition with equality predicate") {
528+
withTable("t") {
529+
sql(s"""
530+
|CREATE TABLE $DEFAULT_DATABASE.t (id INT, pt DATE)
531+
|PARTITIONED BY (pt)""".stripMargin)
532+
sql(s"""INSERT INTO $DEFAULT_DATABASE.t VALUES
533+
|(1, DATE '2026-01-01'), (2, DATE '2026-01-02'), (3, DATE '2026-01-10')
534+
|""".stripMargin)
535+
536+
val query = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE pt = DATE '2026-01-02' ORDER BY id")
537+
checkAnswer(query, Row(2, java.sql.Date.valueOf("2026-01-02")) :: Nil)
538+
assert(partitionPredicate(query).isDefined)
539+
}
540+
}
541+
542+
test("Spark Read: partition pushdown — all supported types") {
543+
withTable("t") {
544+
sql(s"""
545+
|CREATE TABLE $DEFAULT_DATABASE.t (
546+
| id INT,
547+
| p_bool BOOLEAN, p_int INT, p_bigint BIGINT, p_float FLOAT, p_double DOUBLE,
548+
| p_string STRING, p_binary BINARY,
549+
| p_date DATE, p_ts TIMESTAMP, p_ts_ntz TIMESTAMP_NTZ)
550+
|PARTITIONED BY (
551+
| p_bool, p_int, p_bigint, p_float, p_double,
552+
| p_string, p_binary, p_date, p_ts, p_ts_ntz)""".stripMargin)
553+
sql(s"""INSERT INTO $DEFAULT_DATABASE.t VALUES
554+
|(1, false, 10, 99999L, CAST(12.5 AS FLOAT), 7.88, 'hello',
555+
| CAST('Hi' AS BINARY), DATE '2026-01-01',
556+
| TIMESTAMP '2026-01-01 12:00:00', TIMESTAMP_NTZ '2026-01-01 12:00:00'),
557+
|(2, true, 11, 99998L, CAST(13.5 AS FLOAT), 8.88, 'world',
558+
| CAST('Bye' AS BINARY), DATE '2026-01-02',
559+
| TIMESTAMP '2026-01-02 12:00:00', TIMESTAMP_NTZ '2026-01-02 12:00:00')
560+
|""".stripMargin)
561+
562+
val query = sql(s"""
563+
|SELECT id FROM $DEFAULT_DATABASE.t WHERE
564+
| p_bool = false AND p_int = 10 AND p_bigint = 99999L
565+
| AND p_float = CAST(12.5 AS FLOAT) AND p_double = 7.88
566+
| AND p_string = 'hello' AND p_binary = CAST('Hi' AS BINARY)
567+
| AND p_date = DATE '2026-01-01'
568+
| AND p_ts = TIMESTAMP '2026-01-01 12:00:00'
569+
| AND p_ts_ntz = TIMESTAMP_NTZ '2026-01-01 12:00:00'
570+
|""".stripMargin)
571+
checkAnswer(query, Row(1) :: Nil)
572+
assert(partitionPredicate(query).isDefined)
573+
}
574+
}
575+
514576
test("Spark Read: scan description surfaces partition filter when pushed") {
515577
withPartitionedTable {
516578
val withPart = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01'")

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class SparkPredicateConverterTest extends AnyFunSuite {
195195
test("Date literal from epoch days") {
196196
val days = Integer.valueOf(LocalDate.of(2025, 1, 15).toEpochDay.toInt)
197197
val predicate = convert(pred("=", ref("dt"), lit(days, DateType)))
198-
val expected = new PredicateBuilder(rowType).equal(6, LocalDate.of(2025, 1, 15))
198+
val expected = new PredicateBuilder(rowType).equal(6, days)
199199
assertThat(predicate).isEqualTo(expected)
200200
}
201201

0 commit comments

Comments
 (0)