Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
import session.implicits._
val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
df1.write.parquet(table1Dir.getPath)
spark.catalog.createTable(parquetTableName, table1Dir.getPath).collect()
spark.catalog.createTable(parquetTableName, table1Dir.getPath)
withTable(orcTableName, jsonTableName) {
withTempPath { table2Dir =>
val df2 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").zipWithIndex.toDF("name", "id")
df2.write.orc(table2Dir.getPath)
spark.catalog.createTable(orcTableName, table2Dir.getPath, "orc").collect()
spark.catalog.createTable(orcTableName, table2Dir.getPath, "orc")
val orcTable = spark.catalog.getTable(orcTableName)
assert(!orcTable.isTemporary)
assert(orcTable.name == orcTableName)
Expand All @@ -117,7 +117,6 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
val schema = new StructType().add("id", LongType).add("a", DoubleType)
spark.catalog
.createTable(jsonTableName, "json", schema, Map.empty[String, String])
.collect()
val jsonTable = spark.catalog.getTable("default", jsonTableName)
assert(!jsonTable.isTemporary)
assert(jsonTable.name == jsonTableName)
Expand Down Expand Up @@ -151,6 +150,19 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
assert(spark.catalog.listTables().collect().isEmpty)
}

test("createTable should be eager") {
val tableName = "eager_table"
withTable(tableName) {
withTempPath { dir =>
val session = spark
import session.implicits._
Seq((1, "a")).toDF("id", "value").write.parquet(dir.getPath)
spark.catalog.createTable(tableName, dir.getPath)
assert(spark.catalog.tableExists(tableName))
}
}
}

test("Cache Table APIs") {
val parquetTableName = "parquet_table"
withTable(parquetTableName) {
Expand All @@ -159,7 +171,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
import session.implicits._
val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
df1.write.parquet(table1Dir.getPath)
spark.catalog.createTable(parquetTableName, table1Dir.getPath).collect()
spark.catalog.createTable(parquetTableName, table1Dir.getPath)

// Test cache and uncacheTable
spark.catalog.cacheTable(parquetTableName)
Expand Down Expand Up @@ -375,7 +387,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
val session = spark
import session.implicits._
Seq(1).toDF("id").write.parquet(dir.getPath)
spark.catalog.createTable(tbl, dir.getPath).collect()
spark.catalog.createTable(tbl, dir.getPath)
assert(spark.catalog.tableExists(tbl))
spark.catalog.dropTable(tbl)
assert(!spark.catalog.tableExists(tbl))
Expand Down Expand Up @@ -445,7 +457,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
val session = spark
import session.implicits._
Seq(1).toDF("id").write.parquet(dir.getPath)
spark.catalog.createTable(t, dir.getPath).collect()
spark.catalog.createTable(t, dir.getPath)
val ddl = spark.catalog.getCreateTableString(t)
assert(ddl.nonEmpty && ddl.toLowerCase(java.util.Locale.ROOT).contains("create"))
}
Expand All @@ -470,7 +482,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
val session = spark
import session.implicits._
Seq(1).toDF("id").write.parquet(dir.getPath)
spark.catalog.createTable(t, dir.getPath).collect()
spark.catalog.createTable(t, dir.getPath)
spark.catalog.analyzeTable(t, noScan = true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
* @since 3.5.0
*/
override def createTable(tableName: String, path: String): DataFrame = {
sparkSession.newDataFrame { builder =>
builder.getCatalogBuilder.getCreateTableBuilder
.setTableName(tableName)
.setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
.setDescription("")
.putOptions("path", path)
}
createTable(tableName, path, "parquet")
}

/**
Expand Down Expand Up @@ -484,7 +478,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
schema: StructType,
description: String,
options: Map[String, String]): DataFrame = {
sparkSession.newDataFrame { builder =>
sparkSession.execute { builder =>
val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
.setTableName(tableName)
.setSource(source)
Expand All @@ -494,6 +488,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
createTableBuilder.putOptions(k, v)
}
}
sparkSession.table(tableName)
}

/**
Expand Down