Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.arrow.c.{ArrowArrayStream, Data}
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
Expand All @@ -24,8 +26,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.LanceArrowUtils
import org.apache.spark.sql.util.LanceSerializeUtil.{decode, encode}
import org.apache.spark.unsafe.types.UTF8String
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods.{compact, render}
import org.lance.{CommitBuilder, Dataset, Transaction}
import org.lance.index.{Index, IndexOptions, IndexParams, IndexType}
import org.lance.index.scalar.{BTreeIndexParams, ScalarIndexParams}
Expand Down Expand Up @@ -520,6 +520,8 @@ case class RangeBTreeIndexBuilder(
*/
object IndexUtils {

private val jsonMapper = new ObjectMapper()

/**
* Build an [[IndexType]] from the given index method string.
*
Expand All @@ -539,25 +541,25 @@ object IndexUtils {
if (args.isEmpty) {
"{}"
} else {
val fields = args.map { a =>
val jv = a.value match {
case null => JNull
val node: ObjectNode = jsonMapper.createObjectNode()
args.foreach { a =>
a.value match {
case null => node.putNull(a.name)
case s: java.lang.String =>
val trimmed = s.stripPrefix("\"").stripSuffix("\"").stripPrefix("'").stripSuffix("'")
JString(trimmed)
case b: java.lang.Boolean => JBool(b.booleanValue())
case c: java.lang.Character => JString(String.valueOf(c))
case by: java.lang.Byte => JInt(BigInt(by.intValue()))
case sh: java.lang.Short => JInt(BigInt(sh.intValue()))
case i: java.lang.Integer => JInt(BigInt(i.intValue()))
case l: java.lang.Long => JInt(BigInt(l.longValue()))
case f: java.lang.Float => JDouble(f.doubleValue())
case d: java.lang.Double => JDouble(d.doubleValue())
case other => JString(String.valueOf(other))
node.put(a.name, trimmed)
case b: java.lang.Boolean => node.put(a.name, b.booleanValue())
case c: java.lang.Character => node.put(a.name, String.valueOf(c))
case by: java.lang.Byte => node.put(a.name, by.intValue())
case sh: java.lang.Short => node.put(a.name, sh.intValue())
case i: java.lang.Integer => node.put(a.name, i.intValue())
case l: java.lang.Long => node.put(a.name, l.longValue())
case f: java.lang.Float => node.put(a.name, f.doubleValue())
case d: java.lang.Double => node.put(a.name, d.doubleValue())
case other => node.put(a.name, String.valueOf(other))
}
JField(a.name, jv)
}
compact(render(JObject(fields.toList)))
jsonMapper.writeValueAsString(node)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ package org.apache.spark.sql.util
* It has been modified by the Lance developers to fit the needs of the Lance project.
*/

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.types._
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonAST.{JObject, JString}
import org.lance.spark.LanceConstant
import org.lance.spark.utils.{BlobUtils, Float16Utils, LargeVarCharUtils, VectorUtils}

Expand All @@ -39,6 +38,9 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._

object LanceArrowUtils {

private val mapper = new ObjectMapper()

val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY
val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY
val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY
Expand Down Expand Up @@ -157,8 +159,8 @@ object LanceArrowUtils {
new MetadataBuilder()
.putString(ARROW_LARGE_VAR_CHAR_KEY, "true")
.build()
case _ => Metadata.fromJObject(
JObject(field.getMetadata.asScala.map { case (k, v) => (k, JString(v)) }.toList))
case _ => Metadata.fromJson(
mapper.writeValueAsString(field.getMetadata))
}
StructField(field.getName, dt, field.isNullable, metadata)
}.toArray)
Expand Down Expand Up @@ -207,10 +209,9 @@ object LanceArrowUtils {
large = true
}

implicit val formats: Formats = DefaultFormats
meta = metadata.jsonValue.extract[Map[String, Object]].map { case (k, v) =>
(k, String.valueOf(v))
}
meta = mapper
.readValue(metadata.json, classOf[java.util.LinkedHashMap[_, _]])
.asScala.map { case (k, v) => (k.toString, String.valueOf(v)) }.toMap
}

dt match {
Expand Down
Loading