@@ -200,78 +200,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
200200 builder.build()
201201 }
202202
/**
 * Formatter for timestamp partition values: UTC wall-clock time with six fractional digits and
 * no zone suffix. Built once because `DateTimeFormatter` instances are immutable and reusable.
 */
private val PartitionTimestampFormatter: java.time.format.DateTimeFormatter =
  java.time.format.DateTimeFormatter
    .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
    .withZone(java.time.ZoneOffset.UTC)

/**
 * Legacy JSON serialization function - removed in favor of protobuf. Kept as reference for
 * conversion logic.
 *
 * Converts a single partition value to a json4s value, choosing the encoding from the textual
 * type name:
 *   - names starting with `timestamp`: the value is read as microseconds since the epoch and
 *     rendered as a `yyyy-MM-dd'T'HH:mm:ss.SSSSSS` string in UTC
 *   - `date`: the value is read as days since the epoch and rendered as an ISO local date
 *   - names starting with `decimal(`, plus `string` and `uuid`: `value.toString`
 *   - `int` / `long`: a JSON integer
 *   - `float` / `double`: a JSON double, except NaN and infinities, which become strings
 *   - `boolean`: a JSON boolean
 *   - anything else: the JSON type is inferred from the runtime class of `value`
 *
 * @param fieldTypeStr
 *   type name of the partition source field, e.g. `date` or a name starting with `decimal(`
 * @param value
 *   the partition value; `null` yields `JNull`
 * @return
 *   the JSON representation of `value`
 * @throws NumberFormatException
 *   if a value of unexpected runtime class cannot be parsed as the number its type name implies
 */
private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
  if (value == null) {
    // Guard first: every branch below dereferences `value`.
    JNull
  } else {
    fieldTypeStr match {
      case t if t.startsWith("timestamp") =>
        val micros = value match {
          case l: java.lang.Long => l.longValue()
          case i: java.lang.Integer => i.longValue()
          case _ => value.toString.toLong
        }
        // Floor-based split keeps the nanosecond part non-negative for pre-epoch values;
        // the resulting instant is the same as with truncating division.
        val instant = java.time.Instant.ofEpochSecond(
          Math.floorDiv(micros, 1000000L),
          Math.floorMod(micros, 1000000L) * 1000L)
        JString(PartitionTimestampFormatter.format(instant))

      case "date" =>
        // Pattern match instead of an unchecked cast so a Long (or textual) day count
        // does not fail with ClassCastException.
        val days = value match {
          case i: java.lang.Integer => i.longValue()
          case l: java.lang.Long => l.longValue()
          case _ => value.toString.toLong
        }
        JString(java.time.LocalDate.ofEpochDay(days).toString)

      case d if d.startsWith("decimal(") =>
        // Emitted as a string rather than a JSON number.
        JString(value.toString)

      case "string" | "uuid" =>
        JString(value.toString)

      case "int" | "long" =>
        value match {
          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
          case l: java.lang.Long => JInt(BigInt(l.longValue()))
          case _ => JDecimal(BigDecimal(value.toString))
        }

      case "float" | "double" =>
        value match {
          // NaN/Infinity are not valid JSON numbers - serialize as strings
          case f: java.lang.Float if f.isNaN || f.isInfinite =>
            JString(f.toString)
          case d: java.lang.Double if d.isNaN || d.isInfinite =>
            JString(d.toString)
          case f: java.lang.Float => JDouble(f.doubleValue())
          case d: java.lang.Double => JDouble(d.doubleValue())
          case _ => JDecimal(BigDecimal(value.toString))
        }

      case "boolean" =>
        value match {
          case b: java.lang.Boolean => JBool(b.booleanValue())
          case _ => JBool(value.toString.toBoolean)
        }

      // Fallback: infer JSON type from Java type
      case _ =>
        value match {
          case s: String => JString(s)
          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
          case l: java.lang.Long => JInt(BigInt(l.longValue()))
          case d: java.lang.Double => JDouble(d.doubleValue())
          case f: java.lang.Float => JDouble(f.doubleValue())
          case b: java.lang.Boolean => JBool(b.booleanValue())
          case other => JString(other.toString)
        }
    }
  }
}
274-
275203 /**
276204 * Helper to extract a literal from an Iceberg expression and build a binary predicate.
277205 */
@@ -852,104 +780,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
852780 throw new RuntimeException (msg)
853781 }
854782
855- // Extract partition values for Hive-style partitioning
856- var partitionJsonOpt : Option [String ] = None
857- try {
858- val partitionMethod = contentFileClass.getMethod(" partition" )
859- val partitionStruct = partitionMethod.invoke(dataFile)
860-
861- if (partitionStruct != null ) {
862- // scalastyle:off classforname
863- val structLikeClass =
864- Class .forName(IcebergReflection .ClassNames .STRUCT_LIKE )
865- // scalastyle:on classforname
866- val sizeMethod = structLikeClass.getMethod(" size" )
867- val getMethod =
868- structLikeClass.getMethod(" get" , classOf [Int ], classOf [Class [_]])
869-
870- val partitionSize =
871- sizeMethod.invoke(partitionStruct).asInstanceOf [Int ]
872-
873- if (partitionSize > 0 ) {
874- // Get the partition spec directly from the task
875- // scalastyle:off classforname
876- val partitionScanTaskClass =
877- Class .forName(IcebergReflection .ClassNames .PARTITION_SCAN_TASK )
878- // scalastyle:on classforname
879- val specMethod = partitionScanTaskClass.getMethod(" spec" )
880- val partitionSpec = specMethod.invoke(task)
881-
882- // Build JSON representation of partition values using json4s
883-
884- val partitionMap = scala.collection.mutable.Map [String , JValue ]()
885-
886- if (partitionSpec != null ) {
887- // Get the list of partition fields from the spec
888- val fieldsMethod = partitionSpec.getClass.getMethod(" fields" )
889- val fields = fieldsMethod
890- .invoke(partitionSpec)
891- .asInstanceOf [java.util.List [_]]
892-
893- for (i <- 0 until partitionSize) {
894- val value =
895- getMethod.invoke(partitionStruct, Int .box(i), classOf [Object ])
896-
897- // Get the partition field and check its transform type
898- val partitionField = fields.get(i)
899-
900- // Only inject partition values for IDENTITY transforms
901- val transformMethod =
902- partitionField.getClass.getMethod(" transform" )
903- val transform = transformMethod.invoke(partitionField)
904- val isIdentity =
905- transform.toString == IcebergReflection .Transforms .IDENTITY
906-
907- if (isIdentity) {
908- // Get the source field ID
909- val sourceIdMethod =
910- partitionField.getClass.getMethod(" sourceId" )
911- val sourceFieldId =
912- sourceIdMethod.invoke(partitionField).asInstanceOf [Int ]
913-
914- val jsonValue = if (value == null ) {
915- JNull
916- } else {
917- // Get field type from schema to serialize correctly
918- val fieldTypeStr =
919- try {
920- val findFieldMethod =
921- metadata.tableSchema.getClass
922- .getMethod(" findField" , classOf [Int ])
923- val field = findFieldMethod.invoke(
924- metadata.tableSchema,
925- sourceFieldId.asInstanceOf [Object ])
926- if (field != null ) {
927- val typeMethod = field.getClass.getMethod(" type" )
928- typeMethod.invoke(field).toString
929- } else {
930- " unknown"
931- }
932- } catch {
933- case _ : Exception => " unknown"
934- }
935-
936- partitionValueToJson(fieldTypeStr, value)
937- }
938- partitionMap(sourceFieldId.toString) = jsonValue
939- }
940- }
941- }
942-
943- val partitionJson = compact(render(JObject (partitionMap.toList)))
944- partitionJsonOpt = Some (partitionJson)
945- }
946- }
947- } catch {
948- case e : Exception =>
949- logWarning(
950- s " Failed to extract partition values from DataFile: ${e.getMessage}" )
951- }
952-
953783 val startMethod = contentScanTaskClass.getMethod(" start" )
954784 val start = startMethod.invoke(task).asInstanceOf [Long ]
955785 taskBuilder.setStart(start)
0 commit comments