Skip to content

Commit 8319470

Browse files
committed
Address review comments, remove unused JSON code, and fix unit test to avoid Iceberg optimization
1 parent 2d18f57 commit 8319470

3 files changed

Lines changed: 8 additions & 178 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2652,8 +2652,6 @@ fn convert_spark_types_to_arrow_schema(
26522652

26532653
/// Converts a protobuf PartitionValue to an iceberg Literal.
26542654
///
2655-
/// This replaces JSON parsing with direct protobuf deserialization with a more compact
2656-
/// representation (e.g., timestamps as integers vs strings).
26572655
fn partition_value_to_literal(
26582656
proto_value: &spark_operator::PartitionValue,
26592657
) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
@@ -2696,8 +2694,8 @@ fn partition_value_to_literal(
26962694
// Handle sign extension for negative numbers
26972695
let value = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
26982696
// Negative number - sign extend
2699-
for i in 0..offset {
2700-
buf[i] = 0xFF;
2697+
for byte in buf.iter_mut().take(offset) {
2698+
*byte = 0xFF;
27012699
}
27022700
i128::from_be_bytes(buf)
27032701
} else {

spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala

Lines changed: 0 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -200,78 +200,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
200200
builder.build()
201201
}
202202

203-
/**
204-
* Legacy JSON serialization function - removed in favor of protobuf. Kept as reference for
205-
* conversion logic.
206-
*/
207-
private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
208-
fieldTypeStr match {
209-
case t if t.startsWith("timestamp") =>
210-
val micros = value match {
211-
case l: java.lang.Long => l.longValue()
212-
case i: java.lang.Integer => i.longValue()
213-
case _ => value.toString.toLong
214-
}
215-
val instant = java.time.Instant.ofEpochSecond(micros / 1000000, (micros % 1000000) * 1000)
216-
val formatted = java.time.format.DateTimeFormatter
217-
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
218-
.withZone(java.time.ZoneOffset.UTC)
219-
.format(instant)
220-
JString(formatted)
221-
222-
case "date" =>
223-
val days = value.asInstanceOf[java.lang.Integer].intValue()
224-
val localDate = java.time.LocalDate.ofEpochDay(days.toLong)
225-
JString(localDate.toString)
226-
227-
case d if d.startsWith("decimal(") =>
228-
JString(value.toString)
229-
230-
case "string" =>
231-
JString(value.toString)
232-
233-
case "int" | "long" =>
234-
value match {
235-
case i: java.lang.Integer => JInt(BigInt(i.intValue()))
236-
case l: java.lang.Long => JInt(BigInt(l.longValue()))
237-
case _ => JDecimal(BigDecimal(value.toString))
238-
}
239-
240-
case "float" | "double" =>
241-
value match {
242-
// NaN/Infinity are not valid JSON numbers - serialize as strings
243-
case f: java.lang.Float if f.isNaN || f.isInfinite =>
244-
JString(f.toString)
245-
case d: java.lang.Double if d.isNaN || d.isInfinite =>
246-
JString(d.toString)
247-
case f: java.lang.Float => JDouble(f.doubleValue())
248-
case d: java.lang.Double => JDouble(d.doubleValue())
249-
case _ => JDecimal(BigDecimal(value.toString))
250-
}
251-
252-
case "boolean" =>
253-
value match {
254-
case b: java.lang.Boolean => JBool(b.booleanValue())
255-
case _ => JBool(value.toString.toBoolean)
256-
}
257-
258-
case "uuid" =>
259-
JString(value.toString)
260-
261-
// Fallback: infer JSON type from Java type
262-
case _ =>
263-
value match {
264-
case s: String => JString(s)
265-
case i: java.lang.Integer => JInt(BigInt(i.intValue()))
266-
case l: java.lang.Long => JInt(BigInt(l.longValue()))
267-
case d: java.lang.Double => JDouble(d.doubleValue())
268-
case f: java.lang.Float => JDouble(f.doubleValue())
269-
case b: java.lang.Boolean => JBool(b.booleanValue())
270-
case other => JString(other.toString)
271-
}
272-
}
273-
}
274-
275203
/**
276204
* Helper to extract a literal from an Iceberg expression and build a binary predicate.
277205
*/
@@ -852,104 +780,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
852780
throw new RuntimeException(msg)
853781
}
854782

855-
// Extract partition values for Hive-style partitioning
856-
var partitionJsonOpt: Option[String] = None
857-
try {
858-
val partitionMethod = contentFileClass.getMethod("partition")
859-
val partitionStruct = partitionMethod.invoke(dataFile)
860-
861-
if (partitionStruct != null) {
862-
// scalastyle:off classforname
863-
val structLikeClass =
864-
Class.forName(IcebergReflection.ClassNames.STRUCT_LIKE)
865-
// scalastyle:on classforname
866-
val sizeMethod = structLikeClass.getMethod("size")
867-
val getMethod =
868-
structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]])
869-
870-
val partitionSize =
871-
sizeMethod.invoke(partitionStruct).asInstanceOf[Int]
872-
873-
if (partitionSize > 0) {
874-
// Get the partition spec directly from the task
875-
// scalastyle:off classforname
876-
val partitionScanTaskClass =
877-
Class.forName(IcebergReflection.ClassNames.PARTITION_SCAN_TASK)
878-
// scalastyle:on classforname
879-
val specMethod = partitionScanTaskClass.getMethod("spec")
880-
val partitionSpec = specMethod.invoke(task)
881-
882-
// Build JSON representation of partition values using json4s
883-
884-
val partitionMap = scala.collection.mutable.Map[String, JValue]()
885-
886-
if (partitionSpec != null) {
887-
// Get the list of partition fields from the spec
888-
val fieldsMethod = partitionSpec.getClass.getMethod("fields")
889-
val fields = fieldsMethod
890-
.invoke(partitionSpec)
891-
.asInstanceOf[java.util.List[_]]
892-
893-
for (i <- 0 until partitionSize) {
894-
val value =
895-
getMethod.invoke(partitionStruct, Int.box(i), classOf[Object])
896-
897-
// Get the partition field and check its transform type
898-
val partitionField = fields.get(i)
899-
900-
// Only inject partition values for IDENTITY transforms
901-
val transformMethod =
902-
partitionField.getClass.getMethod("transform")
903-
val transform = transformMethod.invoke(partitionField)
904-
val isIdentity =
905-
transform.toString == IcebergReflection.Transforms.IDENTITY
906-
907-
if (isIdentity) {
908-
// Get the source field ID
909-
val sourceIdMethod =
910-
partitionField.getClass.getMethod("sourceId")
911-
val sourceFieldId =
912-
sourceIdMethod.invoke(partitionField).asInstanceOf[Int]
913-
914-
val jsonValue = if (value == null) {
915-
JNull
916-
} else {
917-
// Get field type from schema to serialize correctly
918-
val fieldTypeStr =
919-
try {
920-
val findFieldMethod =
921-
metadata.tableSchema.getClass
922-
.getMethod("findField", classOf[Int])
923-
val field = findFieldMethod.invoke(
924-
metadata.tableSchema,
925-
sourceFieldId.asInstanceOf[Object])
926-
if (field != null) {
927-
val typeMethod = field.getClass.getMethod("type")
928-
typeMethod.invoke(field).toString
929-
} else {
930-
"unknown"
931-
}
932-
} catch {
933-
case _: Exception => "unknown"
934-
}
935-
936-
partitionValueToJson(fieldTypeStr, value)
937-
}
938-
partitionMap(sourceFieldId.toString) = jsonValue
939-
}
940-
}
941-
}
942-
943-
val partitionJson = compact(render(JObject(partitionMap.toList)))
944-
partitionJsonOpt = Some(partitionJson)
945-
}
946-
}
947-
} catch {
948-
case e: Exception =>
949-
logWarning(
950-
s"Failed to extract partition values from DataFile: ${e.getMessage}")
951-
}
952-
953783
val startMethod = contentScanTaskClass.getMethod("start")
954784
val start = startMethod.invoke(task).asInstanceOf[Long]
955785
taskBuilder.setStart(start)

spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
166166
test("large scale partitioned table - 100 partitions with many files") {
167167
assume(icebergAvailable, "Iceberg not available in classpath")
168168

169-
withSQLConf("spark.sql.files.maxRecordsPerFile" -> "50") {
169+
withSQLConf(
170+
"spark.sql.files.maxRecordsPerFile" -> "50",
171+
"spark.sql.adaptive.enabled" -> "false") {
170172
spark.sql("""
171173
CREATE TABLE s3_catalog.db.large_partitioned_test (
172174
id INT,
@@ -190,11 +192,11 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
190192
checkIcebergNativeScan(
191193
"SELECT * FROM s3_catalog.db.large_partitioned_test WHERE id < 10 ORDER BY id")
192194
checkIcebergNativeScan(
193-
"SELECT COUNT(*) FROM s3_catalog.db.large_partitioned_test WHERE partition_id = 0")
195+
"SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE partition_id = 0")
194196
checkIcebergNativeScan(
195-
"SELECT COUNT(*) FROM s3_catalog.db.large_partitioned_test WHERE partition_id IN (0, 50, 99)")
197+
"SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE partition_id IN (0, 50, 99)")
196198

197-
spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test")
199+
spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test PURGE")
198200
}
199201
}
200202

0 commit comments

Comments
 (0)