Skip to content

Commit 37369e5

Browse files
committed
[SPARK-52812][SQL] Make Spark Connect Catalog.createTable eager
1 parent 846376a commit 37369e5

2 files changed

Lines changed: 22 additions & 7 deletions

File tree

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
9999
import session.implicits._
100100
val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
101101
df1.write.parquet(table1Dir.getPath)
102-
spark.catalog.createTable(parquetTableName, table1Dir.getPath).collect()
102+
spark.catalog.createTable(parquetTableName, table1Dir.getPath)
103103
withTable(orcTableName, jsonTableName) {
104104
withTempPath { table2Dir =>
105105
val df2 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").zipWithIndex.toDF("name", "id")
106106
df2.write.orc(table2Dir.getPath)
107-
spark.catalog.createTable(orcTableName, table2Dir.getPath, "orc").collect()
107+
spark.catalog.createTable(orcTableName, table2Dir.getPath, "orc")
108108
val orcTable = spark.catalog.getTable(orcTableName)
109109
assert(!orcTable.isTemporary)
110110
assert(orcTable.name == orcTableName)
@@ -151,6 +151,19 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
151151
assert(spark.catalog.listTables().collect().isEmpty)
152152
}
153153

154+
test("createTable should be eager") {
155+
val tableName = "eager_table"
156+
withTable(tableName) {
157+
withTempPath { dir =>
158+
val session = spark
159+
import session.implicits._
160+
Seq((1, "a")).toDF("id", "value").write.parquet(dir.getPath)
161+
spark.catalog.createTable(tableName, dir.getPath)
162+
assert(spark.catalog.tableExists(tableName))
163+
}
164+
}
165+
}
166+
154167
test("Cache Table APIs") {
155168
val parquetTableName = "parquet_table"
156169
withTable(parquetTableName) {
@@ -159,7 +172,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
159172
import session.implicits._
160173
val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
161174
df1.write.parquet(table1Dir.getPath)
162-
spark.catalog.createTable(parquetTableName, table1Dir.getPath).collect()
175+
spark.catalog.createTable(parquetTableName, table1Dir.getPath)
163176

164177
// Test cache and uncacheTable
165178
spark.catalog.cacheTable(parquetTableName)
@@ -375,7 +388,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
375388
val session = spark
376389
import session.implicits._
377390
Seq(1).toDF("id").write.parquet(dir.getPath)
378-
spark.catalog.createTable(tbl, dir.getPath).collect()
391+
spark.catalog.createTable(tbl, dir.getPath)
379392
assert(spark.catalog.tableExists(tbl))
380393
spark.catalog.dropTable(tbl)
381394
assert(!spark.catalog.tableExists(tbl))
@@ -445,7 +458,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
445458
val session = spark
446459
import session.implicits._
447460
Seq(1).toDF("id").write.parquet(dir.getPath)
448-
spark.catalog.createTable(t, dir.getPath).collect()
461+
spark.catalog.createTable(t, dir.getPath)
449462
val ddl = spark.catalog.getCreateTableString(t)
450463
assert(ddl.nonEmpty && ddl.toLowerCase(java.util.Locale.ROOT).contains("create"))
451464
}
@@ -470,7 +483,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
470483
val session = spark
471484
import session.implicits._
472485
Seq(1).toDF("id").write.parquet(dir.getPath)
473-
spark.catalog.createTable(t, dir.getPath).collect()
486+
spark.catalog.createTable(t, dir.getPath)
474487
spark.catalog.analyzeTable(t, noScan = true)
475488
}
476489
}

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
484484
schema: StructType,
485485
description: String,
486486
options: Map[String, String]): DataFrame = {
487-
sparkSession.newDataFrame { builder =>
487+
val df = sparkSession.newDataFrame { builder =>
488488
val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
489489
.setTableName(tableName)
490490
.setSource(source)
@@ -494,6 +494,8 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
494494
createTableBuilder.putOptions(k, v)
495495
}
496496
}
497+
df.collect()
498+
df
497499
}
498500

499501
/**

0 commit comments

Comments
 (0)