Commit e28edaf

j1wonpark authored and pan3793 committed
[SPARK-57627][CONNECT] Support primary key, foreign key, and SQL type metadata in DatabaseMetaData
### What changes were proposed in this pull request?

This PR implements five `DatabaseMetaData` methods in the Spark Connect JDBC driver (`SparkConnectDatabaseMetaData`) that previously threw `SQLFeatureNotSupportedException`:

- `getPrimaryKeys`: returns an empty `ResultSet` with the JDBC-defined schema. Spark Connect does not expose primary keys over JDBC, so "no primary keys" is represented as an empty result rather than an error.
- `getImportedKeys` / `getExportedKeys` / `getCrossReference`: return an empty `ResultSet` with the JDBC foreign-key schema, for the same reason. All three share a private `emptyForeignKeys` helper.
- `getTypeInfo`: returns a static catalog of the Spark SQL atomic types (12 rows), ordered by `DATA_TYPE`, mirroring the type-code/precision mapping already used by `JdbcTypeUtils`. `TIMESTAMP_NTZ` is omitted because it maps to the same JDBC type code (`Types.TIMESTAMP`) as `TIMESTAMP`; `TIME` is omitted for now because its maximum `PRECISION`/scale representation in `getTypeInfo` is not yet settled.

The result-set schemas (column names, order, and types) match the canonical definitions already used by Spark's Thrift server operations (`GetPrimaryKeysOperation`, `GetCrossReferenceOperation`, `GetTypeInfoOperation`), with one intentional correction: the `KEY_SEQ` column uses the JDBC-spec name `KEY_SEQ` rather than the `KEQ_SEQ` typo inherited from Hive in the Thrift operations.

`getFunctions` is intentionally left throwing and is out of scope for this PR.

### Why are the changes needed?

Returning an empty `ResultSet` (rather than throwing) for metadata that a driver does not support is the conventional JDBC behavior, and it is what other engines in this ecosystem do: Trino returns an empty result set for `getPrimaryKeys`/`getImportedKeys`, and Hive does so for `getImportedKeys`.

Throwing `SQLFeatureNotSupportedException` breaks otherwise-recoverable client introspection: for example, BI tools that probe primary/foreign keys to infer table relationships abort the metadata step instead of degrading to "no keys."

`getTypeInfo` is the one method here for which Spark can return real data: its atomic types are statically known. Hive, Trino, and the Databricks JDBC driver all implement `getTypeInfo`; Spark Connect was the outlier in throwing.

### Does this PR introduce _any_ user-facing change?

Yes. Previously these five methods threw `SQLFeatureNotSupportedException`. After this change:

- `getPrimaryKeys`, `getImportedKeys`, `getExportedKeys`, and `getCrossReference` return an empty `ResultSet` with the JDBC-defined columns.
- `getTypeInfo` returns the catalog of Spark SQL atomic types.

This is a change within the unreleased branch only; the Spark Connect JDBC driver has not been released.

### How was this patch tested?

Added in-process tests to `SparkConnectDatabaseMetaDataSuite`:

- `getPrimaryKeys` and `getImportedKeys`/`getExportedKeys`/`getCrossReference` assert the result-set column schema and that the result is empty.
- `getTypeInfo` asserts the column schema, the rows ordered by `DATA_TYPE`, that every type is nullable and searchable, that only `STRING` is case-sensitive, the per-type `LITERAL_PREFIX`/`LITERAL_SUFFIX` (including the `X'...'` hex syntax for `BINARY`), the `NUM_PREC_RADIX` (10 for numeric types, NULL otherwise), and that `DECIMAL` carries the expected precision and scale.

```
build/sbt 'connect-client-jdbc/testOnly *SparkConnectDatabaseMetaDataSuite'
```

All 10 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

Closes #56688 from j1wonpark/SPARK-57627.

Authored-by: Jiwon Park <jpark92@outlook.kr>
Signed-off-by: Cheng Pan <chengpan@apache.org>
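
The client-side effect described under "Why are the changes needed?" can be illustrated with a minimal sketch. `KeyProbe`, `primaryKeyColumns`, and both stub objects are hypothetical names introduced here for illustration; they are not part of this commit or of the Spark Connect driver. The stubs are built with `java.lang.reflect.Proxy` only so the sketch runs without a server.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.sql.{DatabaseMetaData, ResultSet, SQLFeatureNotSupportedException}
import scala.collection.mutable.ListBuffer
import scala.util.Using

object KeyProbe {
  // Hypothetical client helper: list the primary-key columns of a table.
  // An empty ResultSet simply yields an empty Seq, i.e. "no keys".
  def primaryKeyColumns(md: DatabaseMetaData, table: String): Seq[String] =
    Using.resource(md.getPrimaryKeys(null, null, table)) { rs =>
      val cols = ListBuffer.empty[String]
      while (rs.next()) cols += rs.getString("COLUMN_NAME")
      cols.toList
    }

  // Builds a dynamic-proxy stub of a JDBC interface (assumption: test-only stand-in).
  private def stub[T](cls: Class[T])(handle: Method => AnyRef): T = {
    val handler = new InvocationHandler {
      override def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef =
        handle(m)
    }
    cls.cast(Proxy.newProxyInstance(getClass.getClassLoader, Array[Class[_]](cls), handler))
  }

  // Stub ResultSet with no rows: next() returns false, close() is a no-op.
  private def emptyResultSet: ResultSet = stub(classOf[ResultSet]) { m =>
    if (m.getName == "next") java.lang.Boolean.FALSE else null
  }

  // Stub mirroring the behavior after this change: an empty ResultSet.
  val returnsEmpty: DatabaseMetaData =
    stub(classOf[DatabaseMetaData])(_ => emptyResultSet)

  // Stub mirroring the behavior before this change: the call throws.
  val throwsUnsupported: DatabaseMetaData =
    stub(classOf[DatabaseMetaData])(_ => throw new SQLFeatureNotSupportedException)
}
```

With `KeyProbe.returnsEmpty`, `primaryKeyColumns` returns an empty `Seq` and the caller can continue; with `KeyProbe.throwsUnsupported`, the same call propagates `SQLFeatureNotSupportedException`, which a tool without a dedicated fallback would surface as a failed metadata step.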
1 parent c431311 commit e28edaf

2 files changed

Lines changed: 303 additions & 10 deletions

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala

Lines changed: 157 additions & 10 deletions
@@ -598,26 +598,94 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
       catalog: String, schema: String, table: String): ResultSet =
     throw new SQLFeatureNotSupportedException
 
-  override def getPrimaryKeys(catalog: String, schema: String, table: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+  // Spark supports informational PRIMARY KEY constraints on DSv2 tables (SPARK-51207), but the
+  // Spark Connect JDBC client cannot retrieve them in a structured way yet, so return an empty
+  // result set instead of throwing. JDBC clients call this during schema introspection.
+  override def getPrimaryKeys(catalog: String, schema: String, table: String): ResultSet = {
+    conn.checkOpen()
 
-  override def getImportedKeys(catalog: String, schema: String, table: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+    val df = conn.spark.emptyDataFrame
+      .withColumn("TABLE_CAT", lit(""))
+      .withColumn("TABLE_SCHEM", lit(""))
+      .withColumn("TABLE_NAME", lit(""))
+      .withColumn("COLUMN_NAME", lit(""))
+      .withColumn("KEY_SEQ", lit(0.toShort))
+      .withColumn("PK_NAME", lit(""))
+    new SparkConnectResultSet(df.collectResult())
+  }
 
-  override def getExportedKeys(catalog: String, schema: String, table: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+  // getImportedKeys, getExportedKeys and getCrossReference share the JDBC foreign-key result
+  // schema. Spark supports informational FOREIGN KEY constraints on DSv2 tables (SPARK-51207),
+  // but the Spark Connect JDBC client cannot retrieve them in a structured way yet, so they all
+  // return an empty result set instead of throwing.
+  private def emptyForeignKeys: ResultSet = {
+    val df = conn.spark.emptyDataFrame
+      .withColumn("PKTABLE_CAT", lit(""))
+      .withColumn("PKTABLE_SCHEM", lit(""))
+      .withColumn("PKTABLE_NAME", lit(""))
+      .withColumn("PKCOLUMN_NAME", lit(""))
+      .withColumn("FKTABLE_CAT", lit(""))
+      .withColumn("FKTABLE_SCHEM", lit(""))
+      .withColumn("FKTABLE_NAME", lit(""))
+      .withColumn("FKCOLUMN_NAME", lit(""))
+      .withColumn("KEY_SEQ", lit(0.toShort))
+      .withColumn("UPDATE_RULE", lit(0.toShort))
+      .withColumn("DELETE_RULE", lit(0.toShort))
+      .withColumn("FK_NAME", lit(""))
+      .withColumn("PK_NAME", lit(""))
+      .withColumn("DEFERRABILITY", lit(0.toShort))
+    new SparkConnectResultSet(df.collectResult())
+  }
+
+  override def getImportedKeys(catalog: String, schema: String, table: String): ResultSet = {
+    conn.checkOpen()
+    emptyForeignKeys
+  }
+
+  override def getExportedKeys(catalog: String, schema: String, table: String): ResultSet = {
+    conn.checkOpen()
+    emptyForeignKeys
+  }
 
   override def getCrossReference(
       parentCatalog: String,
       parentSchema: String,
       parentTable: String,
       foreignCatalog: String,
       foreignSchema: String,
-      foreignTable: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+      foreignTable: String): ResultSet = {
+    conn.checkOpen()
+    emptyForeignKeys
+  }
 
-  override def getTypeInfo: ResultSet =
-    throw new SQLFeatureNotSupportedException
+  // Static catalog of the Spark SQL atomic types, so JDBC clients can resolve type
+  // information instead of failing on a thrown exception.
+  override def getTypeInfo: ResultSet = {
+    conn.checkOpen()
+
+    val df = TYPE_INFO
+      .toDF(
+        "TYPE_NAME",
+        "DATA_TYPE",
+        "PRECISION",
+        "LITERAL_PREFIX",
+        "LITERAL_SUFFIX",
+        "CREATE_PARAMS",
+        "NULLABLE",
+        "CASE_SENSITIVE",
+        "SEARCHABLE",
+        "UNSIGNED_ATTRIBUTE",
+        "FIXED_PREC_SCALE",
+        "AUTO_INCREMENT",
+        "LOCAL_TYPE_NAME",
+        "MINIMUM_SCALE",
+        "MAXIMUM_SCALE",
+        "SQL_DATA_TYPE",
+        "SQL_DATETIME_SUB",
+        "NUM_PREC_RADIX")
+      .orderBy("DATA_TYPE")
+    new SparkConnectResultSet(df.collectResult())
+  }
 
   override def getIndexInfo(
       catalog: String,
@@ -816,4 +884,83 @@ object SparkConnectDatabaseMetaData {
   )
 
   private[jdbc] val TABLE_TYPES = Seq("TABLE", "VIEW")
+
+  // One row of the java.sql.DatabaseMetaData.getTypeInfo result.
+  private[jdbc] type TypeInfoRow =
+    (
+      String, // TYPE_NAME
+      Int, // DATA_TYPE
+      Int, // PRECISION
+      String, // LITERAL_PREFIX
+      String, // LITERAL_SUFFIX
+      String, // CREATE_PARAMS
+      Short, // NULLABLE
+      Boolean, // CASE_SENSITIVE
+      Short, // SEARCHABLE
+      Boolean, // UNSIGNED_ATTRIBUTE
+      Boolean, // FIXED_PREC_SCALE
+      Boolean, // AUTO_INCREMENT
+      String, // LOCAL_TYPE_NAME
+      Short, // MINIMUM_SCALE
+      Short, // MAXIMUM_SCALE
+      Int, // SQL_DATA_TYPE
+      Int, // SQL_DATETIME_SUB
+      Integer // NUM_PREC_RADIX (null for non-numeric types)
+    )
+
+  // Fills the columns that are constant across all Spark atomic types: every type is
+  // nullable and searchable, and none are unsigned, fixed-prec-scale, or auto-increment.
+  // `literalPrefix` is also used as the literal suffix unless `literalSuffix` is given;
+  // BINARY is the exception, whose literals use the hex syntax X'...'.
+  private[jdbc] def typeRow(
+      typeName: String,
+      dataType: Int,
+      precision: Int,
+      literalPrefix: String,
+      createParams: String,
+      caseSensitive: Boolean,
+      minScale: Short,
+      maxScale: Short,
+      numPrecRadix: Integer,
+      literalSuffix: String = null): TypeInfoRow =
+    (
+      typeName,
+      dataType,
+      precision,
+      literalPrefix,
+      if (literalSuffix != null) literalSuffix else literalPrefix,
+      createParams,
+      typeNullable.toShort,
+      caseSensitive,
+      typeSearchable.toShort,
+      false,
+      false,
+      false,
+      null,
+      minScale,
+      maxScale,
+      0,
+      0,
+      numPrecRadix)
+
+  // Static JDBC type metadata for the Spark SQL atomic types, mirroring the
+  // JdbcTypeUtils type-code/precision mapping. Only STRING is case-sensitive.
+  // TIMESTAMP_NTZ is omitted because it maps to the same JDBC type code
+  // (Types.TIMESTAMP) as TIMESTAMP, so the TIMESTAMP row already covers it.
+  // TIME is omitted for now because its maximum PRECISION/scale representation
+  // in getTypeInfo is not yet settled.
+  private[jdbc] val TYPE_INFO: Seq[TypeInfoRow] = Seq(
+    typeRow("BOOLEAN", Types.BOOLEAN, 1, null, null, false, 0, 0, null),
+    typeRow("TINYINT", Types.TINYINT, 3, null, null, false, 0, 0, 10),
+    typeRow("SMALLINT", Types.SMALLINT, 5, null, null, false, 0, 0, 10),
+    typeRow("INT", Types.INTEGER, 10, null, null, false, 0, 0, 10),
+    typeRow("BIGINT", Types.BIGINT, 19, null, null, false, 0, 0, 10),
+    typeRow("FLOAT", Types.FLOAT, 7, null, null, false, 0, 0, 10),
+    typeRow("DOUBLE", Types.DOUBLE, 15, null, null, false, 0, 0, 10),
+    typeRow("DECIMAL", Types.DECIMAL, 38, null, "precision,scale", false, 0, 38, 10),
+    typeRow("STRING", Types.VARCHAR, Int.MaxValue, "'", null, true, 0, 0, null),
+    typeRow("BINARY", Types.VARBINARY, Int.MaxValue, "X'", null, false, 0, 0, null,
+      literalSuffix = "'"),
+    typeRow("DATE", Types.DATE, 10, "'", null, false, 0, 0, null),
+    typeRow("TIMESTAMP", Types.TIMESTAMP, 29, "'", null, false, 0, 6, null))
 }
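
`getTypeInfo` orders its rows by `DATA_TYPE`, which is the numeric `java.sql.Types` code, so the result order differs from the declaration order of `TYPE_INFO`. The following sketch reproduces that ordering locally; `TypeInfoOrder` is a hypothetical name, and a plain `sortBy` stands in for the DataFrame `orderBy("DATA_TYPE")` call, so it illustrates the ordering only, not the driver's execution path.

```scala
import java.sql.Types

object TypeInfoOrder {
  // (TYPE_NAME, DATA_TYPE) pairs, hand-copied in the declaration order of TYPE_INFO.
  val declared: Seq[(String, Int)] = Seq(
    "BOOLEAN" -> Types.BOOLEAN,
    "TINYINT" -> Types.TINYINT,
    "SMALLINT" -> Types.SMALLINT,
    "INT" -> Types.INTEGER,
    "BIGINT" -> Types.BIGINT,
    "FLOAT" -> Types.FLOAT,
    "DOUBLE" -> Types.DOUBLE,
    "DECIMAL" -> Types.DECIMAL,
    "STRING" -> Types.VARCHAR,
    "BINARY" -> Types.VARBINARY,
    "DATE" -> Types.DATE,
    "TIMESTAMP" -> Types.TIMESTAMP)

  // Ascending sort on the JDBC type code; the codes are distinct, so the order is total.
  val ordered: Seq[(String, Int)] = declared.sortBy(_._2)
}
```

Because `Types.TINYINT`, `Types.BIGINT`, and `Types.VARBINARY` are negative constants, `TypeInfoOrder.ordered.map(_._1)` starts with `TINYINT`, `BIGINT`, `BINARY` and ends with `DATE`, `TIMESTAMP`.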

sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala

Lines changed: 146 additions & 0 deletions
@@ -808,4 +808,150 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       }
     }
   }
+
+  test("SparkConnectDatabaseMetaData getPrimaryKeys") {
+    withConnection { conn =>
+      val metadata = conn.getMetaData
+      // Spark has no primary keys, so an empty result set with the JDBC schema is returned.
+      Using.resource(metadata.getPrimaryKeys(null, null, null)) { rs =>
+        val rsmd = rs.getMetaData
+        assert((1 to rsmd.getColumnCount).map(rsmd.getColumnName) === Seq(
+          "TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "KEY_SEQ", "PK_NAME"))
+        assert(!rs.next())
+      }
+    }
+  }
+
+  test("SparkConnectDatabaseMetaData getImportedKeys, getExportedKeys and getCrossReference") {
+    withConnection { conn =>
+      val metadata = conn.getMetaData
+      val foreignKeySchema = Seq(
+        "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "PKCOLUMN_NAME",
+        "FKTABLE_CAT", "FKTABLE_SCHEM", "FKTABLE_NAME", "FKCOLUMN_NAME",
+        "KEY_SEQ", "UPDATE_RULE", "DELETE_RULE", "FK_NAME", "PK_NAME", "DEFERRABILITY")
+      // Spark has no foreign keys, so all three return an empty result set with the JDBC schema.
+      Seq(
+        () => metadata.getImportedKeys(null, null, null),
+        () => metadata.getExportedKeys(null, null, null),
+        () => metadata.getCrossReference(null, null, null, null, null, null))
+        .foreach { getForeignKeys =>
+          Using.resource(getForeignKeys()) { rs =>
+            val rsmd = rs.getMetaData
+            assert((1 to rsmd.getColumnCount).map(rsmd.getColumnName) === foreignKeySchema)
+            assert(!rs.next())
+          }
+        }
+    }
+  }
+
+  test("SparkConnectDatabaseMetaData getTypeInfo") {
+    withConnection { conn =>
+      val metadata = conn.getMetaData
+      Using.resource(metadata.getTypeInfo) { rs =>
+        val rsmd = rs.getMetaData
+        assert((1 to rsmd.getColumnCount).map(rsmd.getColumnName) === Seq(
+          "TYPE_NAME", "DATA_TYPE", "PRECISION", "LITERAL_PREFIX", "LITERAL_SUFFIX",
+          "CREATE_PARAMS", "NULLABLE", "CASE_SENSITIVE", "SEARCHABLE", "UNSIGNED_ATTRIBUTE",
+          "FIXED_PREC_SCALE", "AUTO_INCREMENT", "LOCAL_TYPE_NAME", "MINIMUM_SCALE",
+          "MAXIMUM_SCALE", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "NUM_PREC_RADIX"))
+
+        case class TypeInfo(
+          name: String,
+          dataType: Int,
+          precision: Int,
+          literalPrefix: String,
+          literalSuffix: String,
+          createParams: String,
+          caseSensitive: Boolean,
+          nullable: Short,
+          searchable: Short,
+          minScale: Short,
+          maxScale: Short,
+          numPrecRadix: Option[Int])
+        val types = new Iterator[TypeInfo] {
+          def hasNext: Boolean = rs.next()
+          def next(): TypeInfo = TypeInfo(
+            name = rs.getString("TYPE_NAME"),
+            dataType = rs.getInt("DATA_TYPE"),
+            precision = rs.getInt("PRECISION"),
+            literalPrefix = rs.getString("LITERAL_PREFIX"),
+            literalSuffix = rs.getString("LITERAL_SUFFIX"),
+            createParams = rs.getString("CREATE_PARAMS"),
+            caseSensitive = rs.getBoolean("CASE_SENSITIVE"),
+            nullable = rs.getShort("NULLABLE"),
+            searchable = rs.getShort("SEARCHABLE"),
+            minScale = rs.getShort("MINIMUM_SCALE"),
+            maxScale = rs.getShort("MAXIMUM_SCALE"),
+            numPrecRadix =
+              Option(rs.getObject("NUM_PREC_RADIX")).map(_.asInstanceOf[Integer].toInt))
+        }.toSeq
+
+        // results are ordered by DATA_TYPE
+        assert(types.map(t => (t.name, t.dataType)) === Seq(
+          ("TINYINT", Types.TINYINT),
+          ("BIGINT", Types.BIGINT),
+          ("BINARY", Types.VARBINARY),
+          ("DECIMAL", Types.DECIMAL),
+          ("INT", Types.INTEGER),
+          ("SMALLINT", Types.SMALLINT),
+          ("FLOAT", Types.FLOAT),
+          ("DOUBLE", Types.DOUBLE),
+          ("STRING", Types.VARCHAR),
+          ("BOOLEAN", Types.BOOLEAN),
+          ("DATE", Types.DATE),
+          ("TIMESTAMP", Types.TIMESTAMP)))
+
+        // every type is nullable and searchable
+        assert(types.forall(_.nullable == DatabaseMetaData.typeNullable))
+        assert(types.forall(_.searchable == DatabaseMetaData.typeSearchable))
+        // only STRING is case-sensitive
+        assert(types.filter(_.caseSensitive).map(_.name) === Seq("STRING"))
+
+        // string-like types are quoted with a single quote on both sides, except BINARY,
+        // whose literals use the hex syntax X'...'. Numeric types carry no literal quote.
+        val quoted = Map(
+          "STRING" -> ("'", "'"),
+          "DATE" -> ("'", "'"),
+          "TIMESTAMP" -> ("'", "'"),
+          "BINARY" -> ("X'", "'"))
+        types.foreach { t =>
+          val (prefix, suffix) = quoted.getOrElse(t.name, (null, null))
+          assert(t.literalPrefix === prefix, s"unexpected LITERAL_PREFIX for ${t.name}")
+          assert(t.literalSuffix === suffix, s"unexpected LITERAL_SUFFIX for ${t.name}")
+        }
+
+        // PRECISION mirrors JdbcTypeUtils.getPrecision for every type
+        val precisions = Map(
+          "BOOLEAN" -> 1, "TINYINT" -> 3, "SMALLINT" -> 5, "INT" -> 10, "BIGINT" -> 19,
+          "FLOAT" -> 7, "DOUBLE" -> 15, "DECIMAL" -> 38, "STRING" -> Int.MaxValue,
+          "BINARY" -> Int.MaxValue, "DATE" -> 10, "TIMESTAMP" -> 29)
+        types.foreach { t =>
+          assert(t.precision === precisions(t.name), s"unexpected PRECISION for ${t.name}")
+        }
+
+        // CREATE_PARAMS is set only for the parameterized types
+        val createParams = Map("DECIMAL" -> "precision,scale")
+        types.foreach { t =>
+          assert(t.createParams === createParams.getOrElse(t.name, null),
+            s"unexpected CREATE_PARAMS for ${t.name}")
+        }
+
+        // (MINIMUM_SCALE, MAXIMUM_SCALE); types not listed carry no scale (0, 0)
+        val scales = Map("DECIMAL" -> (0, 38), "TIMESTAMP" -> (0, 6))
+        types.foreach { t =>
+          val (minScale, maxScale) = scales.getOrElse(t.name, (0, 0))
+          assert(t.minScale === minScale.toShort, s"unexpected MINIMUM_SCALE for ${t.name}")
+          assert(t.maxScale === maxScale.toShort, s"unexpected MAXIMUM_SCALE for ${t.name}")
+        }
+
+        // NUM_PREC_RADIX is 10 for numeric types and NULL otherwise, mirroring JdbcTypeUtils
+        val numericTypes =
+          Set("TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL")
+        types.foreach { t =>
+          val expected = if (numericTypes.contains(t.name)) Some(10) else None
+          assert(t.numPrecRadix === expected, s"unexpected NUM_PREC_RADIX for ${t.name}")
+        }
+      }
+    }
+  }
 }
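
The `LITERAL_PREFIX`/`LITERAL_SUFFIX` pairs asserted in the `getTypeInfo` test are what a generic JDBC client might use to wrap a literal value. A minimal sketch of that use follows; `LiteralQuoting` and `render` are hypothetical names, the map is hand-copied from the test rather than read from a live `ResultSet`, and the sketch assumes the body contains no single quote that would need escaping.

```scala
object LiteralQuoting {
  // (LITERAL_PREFIX, LITERAL_SUFFIX) per TYPE_NAME, hand-copied from the test's `quoted` map.
  // Types absent from the map have NULL prefix and suffix in getTypeInfo.
  val quotes: Map[String, (String, String)] = Map(
    "STRING" -> ("'", "'"),
    "DATE" -> ("'", "'"),
    "TIMESTAMP" -> ("'", "'"),
    "BINARY" -> ("X'", "'"))

  // Wraps `body` in the type's prefix and suffix; unquoted types pass through unchanged.
  // Assumption: `body` needs no escaping.
  def render(typeName: String, body: String): String =
    quotes.get(typeName) match {
      case Some((prefix, suffix)) => s"$prefix$body$suffix"
      case None => body
    }
}
```

For example, `LiteralQuoting.render("BINARY", "CAFE")` yields `X'CAFE'`, while `LiteralQuoting.render("INT", "42")` yields `42` unchanged.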
