Skip to content

Commit b646912

Browse files
committed
address comments
1 parent d373e0d commit b646912

6 files changed

Lines changed: 61 additions & 23 deletions

File tree

fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
import org.apache.fluss.types.DecimalType;
5252
import org.apache.fluss.types.RowType;
5353

54-
import java.time.LocalDate;
55-
import java.time.LocalTime;
5654
import java.util.ArrayList;
5755
import java.util.HashMap;
5856
import java.util.List;
@@ -145,9 +143,9 @@ private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType fieldTyp
145143
decimalType.getScale());
146144
}
147145
case DATE:
148-
return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
146+
return (int) pbLiteral.getBigintValue();
149147
case TIME_WITHOUT_TIME_ZONE:
150-
return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 1_000_000L);
148+
return pbLiteral.getIntValue();
151149
case TIMESTAMP_WITHOUT_TIME_ZONE:
152150
return TimestampNtz.fromMillis(
153151
pbLiteral.getTimestampMillisValue(),
@@ -282,10 +280,10 @@ private static PbLiteralValue toPbLiteralValue(DataType type, Object literal) {
282280
pbLiteral.setIntValue((Integer) literal);
283281
break;
284282
case DATE:
285-
pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
283+
pbLiteral.setBigintValue(((Integer) literal).longValue());
286284
break;
287285
case TIME_WITHOUT_TIME_ZONE:
288-
pbLiteral.setIntValue((int) (((LocalTime) literal).toNanoOfDay() / 1_000_000L));
286+
pbLiteral.setIntValue((Integer) literal);
289287
break;
290288
case TIMESTAMP_WITHOUT_TIME_ZONE:
291289
pbLiteral.setTimestampMillisValue(((TimestampNtz) literal).getMillisecond());

fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656

5757
import java.lang.reflect.Field;
5858
import java.math.BigDecimal;
59-
import java.time.LocalDate;
6059
import java.time.LocalTime;
6160
import java.util.Arrays;
6261
import java.util.Collections;
@@ -94,6 +93,20 @@ private static RowType buildRowType(LeafPredicate... predicates) {
9493
return buildRowType(Arrays.asList(predicates));
9594
}
9695

96+
@Test
97+
public void testLeafPredicateTimeIntegerLiteral() {
98+
int timeMillis = 45_000_000;
99+
DataType type = new TimeType(false, 3);
100+
LeafPredicate predicate =
101+
new LeafPredicate(
102+
Equal.INSTANCE, type, 0, "f_time", Collections.singletonList(timeMillis));
103+
RowType rowType = buildRowType(predicate);
104+
PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, rowType);
105+
Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
106+
LeafPredicate lp = (LeafPredicate) result;
107+
assertThat(lp.literals().get(0)).isEqualTo(timeMillis);
108+
}
109+
97110
@Test
98111
public void testLeafPredicateIntEqual() {
99112
DataType type = new IntType(false);
@@ -317,22 +330,17 @@ public void testPbLiteralSerde() {
317330
DataType dateType = new DateType(false);
318331
LeafPredicate datePredicate =
319332
new LeafPredicate(
320-
Equal.INSTANCE,
321-
dateType,
322-
8,
323-
"f_date",
324-
Collections.singletonList(
325-
LocalDate.ofEpochDay(19000L))); // days since epoch
333+
Equal.INSTANCE, dateType, 8, "f_date", Collections.singletonList(19000));
326334
// time
327335
DataType timeType = new TimeType(false, 3);
328-
LocalTime time = LocalTime.of(12, 30);
336+
int timeMillis = (int) (LocalTime.of(12, 30).toNanoOfDay() / 1_000_000L);
329337
LeafPredicate timePredicate =
330338
new LeafPredicate(
331339
Equal.INSTANCE,
332340
timeType,
333341
9,
334342
"f_time",
335-
Collections.singletonList(time)); // millis of day
343+
Collections.singletonList(timeMillis));
336344
// timestamp
337345
DataType tsType = new TimestampType(false, 3);
338346
TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.fluss.spark.row
1919

2020
import org.apache.fluss.row.{InternalRow => FlussInternalRow}
21-
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
21+
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, BytesType => FlussBytesType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
2222
import org.apache.fluss.utils.InternalRowUtils
2323

2424
import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow}
@@ -84,9 +84,9 @@ class FlussAsSparkRow(rowType: RowType) extends SparkInteralRow {
8484
DataConverter.toSparkUTF8String(row.getString(ordinal))
8585
}
8686

87-
override def getBinary(ordinal: Int): Array[Byte] = {
88-
val binaryType = rowType.getTypeAt(ordinal).asInstanceOf[FlussBinaryType]
89-
row.getBinary(ordinal, binaryType.getLength)
87+
override def getBinary(ordinal: Int): Array[Byte] = rowType.getTypeAt(ordinal) match {
88+
case b: FlussBinaryType => row.getBinary(ordinal, b.getLength)
89+
case _: FlussBytesType => row.getBytes(ordinal)
9090
}
9191

9292
override def getInterval(ordinal: Int): CalendarInterval =

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import org.apache.spark.sql.types.{Decimal => SparkDecimal}
2727
import org.apache.spark.unsafe.types.UTF8String
2828

2929
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
30-
import java.time.LocalDate
3130

3231
import scala.jdk.CollectionConverters._
3332

@@ -277,9 +276,8 @@ object SparkPredicateConverter {
277276
}
278277

279278
case DataTypeRoot.DATE =>
280-
// RPC serialization (PredicateMessageUtils) expects LocalDate.
281279
value match {
282-
case d: Integer => LocalDate.ofEpochDay(d.longValue())
280+
case d: Integer => d
283281
case _ => throw new UnsupportedExpression
284282
}
285283

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,40 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
511511
}
512512
}
513513

514+
test("Spark Read: partition pushdown — all supported types") {
515+
withTable("t") {
516+
sql(s"""
517+
|CREATE TABLE $DEFAULT_DATABASE.t (
518+
| id INT,
519+
| p_bool BOOLEAN, p_int INT, p_bigint BIGINT, p_float FLOAT, p_double DOUBLE,
520+
| p_string STRING, p_binary BINARY,
521+
| p_date DATE, p_ts TIMESTAMP, p_ts_ntz TIMESTAMP_NTZ)
522+
|PARTITIONED BY (
523+
| p_bool, p_int, p_bigint, p_float, p_double,
524+
| p_string, p_binary, p_date, p_ts, p_ts_ntz)""".stripMargin)
525+
sql(s"""INSERT INTO $DEFAULT_DATABASE.t VALUES
526+
|(1, false, 10, 99999L, CAST(12.5 AS FLOAT), 7.88, 'hello',
527+
| CAST('Hi' AS BINARY), DATE '2026-01-01',
528+
| TIMESTAMP '2026-01-01 12:00:00', TIMESTAMP_NTZ '2026-01-01 12:00:00'),
529+
|(2, true, 11, 99998L, CAST(13.5 AS FLOAT), 8.88, 'world',
530+
| CAST('Bye' AS BINARY), DATE '2026-01-02',
531+
| TIMESTAMP '2026-01-02 12:00:00', TIMESTAMP_NTZ '2026-01-02 12:00:00')
532+
|""".stripMargin)
533+
534+
val query = sql(s"""
535+
|SELECT id FROM $DEFAULT_DATABASE.t WHERE
536+
| p_bool = false AND p_int = 10 AND p_bigint = 99999L
537+
| AND p_float = CAST(12.5 AS FLOAT) AND p_double = 7.88
538+
| AND p_string = 'hello' AND p_binary = CAST('Hi' AS BINARY)
539+
| AND p_date = DATE '2026-01-01'
540+
| AND p_ts = TIMESTAMP '2026-01-01 12:00:00'
541+
| AND p_ts_ntz = TIMESTAMP_NTZ '2026-01-01 12:00:00'
542+
|""".stripMargin)
543+
checkAnswer(query, Row(1) :: Nil)
544+
assert(partitionPredicate(query).isDefined)
545+
}
546+
}
547+
514548
test("Spark Read: scan description surfaces partition filter when pushed") {
515549
withPartitionedTable {
516550
val withPart = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01'")

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class SparkPredicateConverterTest extends AnyFunSuite {
195195
test("Date literal from epoch days") {
196196
val days = Integer.valueOf(LocalDate.of(2025, 1, 15).toEpochDay.toInt)
197197
val predicate = convert(pred("=", ref("dt"), lit(days, DateType)))
198-
val expected = new PredicateBuilder(rowType).equal(6, LocalDate.of(2025, 1, 15))
198+
val expected = new PredicateBuilder(rowType).equal(6, days)
199199
assertThat(predicate).isEqualTo(expected)
200200
}
201201

0 commit comments

Comments
 (0)