diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index d52ef85b..a009c956 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -13,6 +13,8 @@ */ package org.apache.spark.sql.execution.datasources.v2 +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.arrow.c.{ArrowArrayStream, Data} import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter} @@ -24,8 +26,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.LanceArrowUtils import org.apache.spark.sql.util.LanceSerializeUtil.{decode, encode} import org.apache.spark.unsafe.types.UTF8String -import org.json4s.JsonAST._ -import org.json4s.jackson.JsonMethods.{compact, render} import org.lance.{CommitBuilder, Dataset, Transaction} import org.lance.index.{Index, IndexOptions, IndexParams, IndexType} import org.lance.index.scalar.{BTreeIndexParams, ScalarIndexParams} @@ -520,6 +520,8 @@ case class RangeBTreeIndexBuilder( */ object IndexUtils { + private val jsonMapper = new ObjectMapper() + /** * Build an [[IndexType]] from the given index method string. * @@ -539,25 +541,25 @@ object IndexUtils { if (args.isEmpty) { "{}" } else { - val fields = args.map { a => - val jv = a.value match { - case null => JNull + val node: ObjectNode = jsonMapper.createObjectNode() + args.foreach { a => + a.value match { + case null => node.putNull(a.name) case s: java.lang.String => val trimmed = s.stripPrefix("\"").stripSuffix("\"").stripPrefix("'").stripSuffix("'") - JString(trimmed) - case b: java.lang.Boolean => JBool(b.booleanValue()) - case c: java.lang.Character => JString(String.valueOf(c)) - case by: java.lang.Byte => JInt(BigInt(by.intValue())) - case sh: java.lang.Short => JInt(BigInt(sh.intValue())) - case i: java.lang.Integer => JInt(BigInt(i.intValue())) - case l: java.lang.Long => JInt(BigInt(l.longValue())) - case f: java.lang.Float => JDouble(f.doubleValue()) - case d: java.lang.Double => JDouble(d.doubleValue()) - case other => JString(String.valueOf(other)) + node.put(a.name, trimmed) + case b: java.lang.Boolean => node.put(a.name, b.booleanValue()) + case c: java.lang.Character => node.put(a.name, String.valueOf(c)) + case by: java.lang.Byte => node.put(a.name, by.intValue()) + case sh: java.lang.Short => node.put(a.name, sh.intValue()) + case i: java.lang.Integer => node.put(a.name, i.intValue()) + case l: java.lang.Long => node.put(a.name, l.longValue()) + case f: java.lang.Float => node.put(a.name, f.doubleValue()) + case d: java.lang.Double => node.put(a.name, d.doubleValue()) + case other => node.put(a.name, String.valueOf(other)) } - JField(a.name, jv) } - compact(render(JObject(fields.toList))) + jsonMapper.writeValueAsString(node) } } diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala index 4c876da3..b45ec8a4 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala @@ -23,13 +23,12 @@ package org.apache.spark.sql.util * It has been modified by the Lance developers to fit the needs of the Lance project. */ +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.arrow.vector.complex.MapVector import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.types._ -import org.json4s.{DefaultFormats, Formats} -import org.json4s.JsonAST.{JObject, JString} import org.lance.spark.LanceConstant import org.lance.spark.utils.{BlobUtils, Float16Utils, LargeVarCharUtils, VectorUtils} @@ -39,6 +38,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ object LanceArrowUtils { + + private val mapper = new ObjectMapper() + val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY @@ -157,8 +159,8 @@ object LanceArrowUtils { new MetadataBuilder() .putString(ARROW_LARGE_VAR_CHAR_KEY, "true") .build() - case _ => Metadata.fromJObject( - JObject(field.getMetadata.asScala.map { case (k, v) => (k, JString(v)) }.toList)) + case _ => Metadata.fromJson( + mapper.writeValueAsString(field.getMetadata)) } StructField(field.getName, dt, field.isNullable, metadata) }.toArray) @@ -207,10 +209,9 @@ object LanceArrowUtils { large = true } - implicit val formats: Formats = DefaultFormats - meta = metadata.jsonValue.extract[Map[String, Object]].map { case (k, v) => - (k, String.valueOf(v)) - } + meta = mapper + .readValue(metadata.json, classOf[java.util.LinkedHashMap[_, _]]) + .asScala.map { case (k, v) => (k.toString, String.valueOf(v)) }.toMap } dt match {