Skip to content

Commit b2a199b

Browse files
Add PathBasedTableSuite
1 parent 96e86aa commit b2a199b

1 file changed

Lines changed: 191 additions & 0 deletions

File tree

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector
19+
20+
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
21+
import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
22+
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
23+
import org.apache.spark.sql.test.SharedSparkSession
24+
25+
/**
26+
* Non-transactional tests for SQL resolution of path-based tables surfaced by a
27+
* [[org.apache.spark.sql.connector.catalog.SupportsCatalogOptions]] data source
28+
* (e.g. `pathformat.`/path/to/t``). Covers reads, DDL, CREATE/REPLACE, regression for v1
29+
* file-format direct queries, and the `runSQLOnFile` gate. Transactional behavior is
30+
* covered separately in [[PathBasedTableTransactionSuite]].
31+
*/
32+
class PathBasedTableSuite extends QueryTest with SharedSparkSession {

  import testImplicits._

  // FakePathBasedSource rewrites `pathformat.\`/path/to/t\`` to the session catalog with
  // Identifier(ns = ["pathformat"], name = "/path/to/t"). InMemoryTableCatalog accepts
  // arbitrary namespace/name shapes, so we plug it in as the v2 session catalog.
  private val tablePath = "pathformat.`/path/to/t`"

  /** Installs [[InMemoryTableCatalog]] as the v2 session catalog before every test. */
  override def beforeEach(): Unit = {
    super.beforeEach()
    spark.conf.set(
      V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableCatalog].getName)
  }

  /**
   * Undoes the per-test catalog setup.
   *
   * Each step runs in a `finally` chain so that a failure in an earlier step cannot leave the
   * session-catalog override set, or skip the parent teardown, on the shared session.
   */
  override def afterEach(): Unit = {
    try {
      // SharedSparkSession reuses one SparkSession across tests, so the in-memory catalog's
      // table map would persist between tests. Reset clears registered catalogs so each test
      // sees a fresh session catalog instance.
      spark.sessionState.catalogManager.reset()
    } finally {
      try {
        spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
      } finally {
        super.afterEach()
      }
    }
  }

  test("CREATE then SELECT on path-based table") {
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
    checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
  }

  test("DESCRIBE TABLE resolves path-based table") {
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    checkAnswer(
      sql(s"DESCRIBE TABLE $tablePath").select("col_name", "data_type"),
      Row("id", "int") :: Row("data", "string") :: Nil)
  }

  test("ALTER TABLE on path-based table") {
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    sql(s"ALTER TABLE $tablePath ADD COLUMNS (extra DOUBLE)")
    // Only the first DESCRIBE column (the column name) is inspected; a subset check is used
    // so that any additional rows DESCRIBE may emit do not fail the test.
    val columns = sql(s"DESCRIBE TABLE $tablePath").collect().map(_.getString(0)).toSet
    assert(Set("id", "data", "extra").subsetOf(columns))
  }

  test("DROP TABLE on path-based table") {
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    sql(s"DROP TABLE $tablePath")
    // After the drop, the same identifier must no longer resolve.
    intercept[AnalysisException] {
      sql(s"SELECT * FROM $tablePath")
    }
  }

  test("JOIN across two path-based tables") {
    val a = "pathformat.`/a`"
    val b = "pathformat.`/b`"
    sql(s"CREATE TABLE $a (id INT, x STRING)")
    sql(s"CREATE TABLE $b (id INT, y STRING)")
    sql(s"INSERT INTO $a VALUES (1, 'x1'), (2, 'x2')")
    sql(s"INSERT INTO $b VALUES (1, 'y1'), (2, 'y2')")
    checkAnswer(
      sql(s"SELECT a.id, a.x, b.y FROM $a a JOIN $b b ON a.id = b.id ORDER BY a.id"),
      Row(1, "x1", "y1") :: Row(2, "x2", "y2") :: Nil)
  }

  test("session-config catalog routes non-transactional reads") {
    val target = "tgt"
    // Single definition of the table reference so the three statements below cannot drift.
    val table = "pathformat2.`/p`"
    withSQLConf(
      s"spark.sql.catalog.$target" -> classOf[InMemoryTableCatalog].getName,
      "spark.datasource.pathformat2.catalog" -> target) {
      sql(s"CREATE TABLE $table (id INT, data STRING)")
      sql(s"INSERT INTO $table VALUES (1, 'a')")
      checkAnswer(spark.table(table), Row(1, "a") :: Nil)
      // The table must be registered in the catalog named by the session config.
      val tgt = spark.sessionState.catalogManager.catalog(target)
        .asInstanceOf[InMemoryTableCatalog]
      assert(tgt.listTables(Array("pathformat2")).map(_.name()).contains("/p"))
    }
  }

  test("regression: v1 file format direct query still resolves") {
    withTempDir { dir =>
      val path = new java.io.File(dir, "p.parquet").getCanonicalPath
      Seq((1, "a"), (2, "b")).toDF("id", "data").write.parquet(path)
      checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), Row(1, "a") :: Row(2, "b") :: Nil)
    }
  }

  test("unknown format head produces table-not-found, not ClassNotFoundException") {
    val e = intercept[AnalysisException] {
      sql("SELECT * FROM unknown_fmt.`/path/to/t`")
    }
    // The format head is not a registered data source and not a catalog. Resolution
    // falls through to a normal table-not-found error.
    // Lower-casing uses Locale.ROOT so the match does not depend on the JVM default locale.
    assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" ||
      e.getMessage.toLowerCase(java.util.Locale.ROOT).contains("not found"))
  }

  test("VERSION AS OF on path-based table") {
    val base = "pathformat.`/p`"
    // InMemoryTableCatalog implements time travel by appending the version string to the
    // identifier name (see loadTable(ident, version) — it looks up name + version).
    val versioned = "pathformat.`/pv1`"
    sql(s"CREATE TABLE $base (id INT)")
    sql(s"CREATE TABLE $versioned (id INT)")
    sql(s"INSERT INTO $base VALUES (1)")
    sql(s"INSERT INTO $versioned VALUES (2)")
    // Expecting Row(2) shows the read was served from the versioned table, not the base one.
    checkAnswer(sql(s"SELECT * FROM $base VERSION AS OF 'v1'"), Row(2))
  }

  test("SCO precedence: data source name wins over same-named catalog") {
    // Register a catalog under the same name as the SCO data source short name.
    // Resolution should still route through the SCO resolver, i.e. the table is
    // created under the session catalog (`spark_catalog`), not under "pathformat".
    withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) {
      sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
      sql(s"INSERT INTO $tablePath VALUES (1, 'a')")
      checkAnswer(spark.table(tablePath), Row(1, "a") :: Nil)

      // Table lives in the session catalog under namespace=["pathformat"], not in the
      // catalog registered as "pathformat".
      val sessionCat = spark.sessionState.catalogManager.v2SessionCatalog
        .asInstanceOf[InMemoryTableCatalog]
      assert(sessionCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t"))
      val homonymCat = spark.sessionState.catalogManager.catalog("pathformat")
        .asInstanceOf[InMemoryTableCatalog]
      assert(homonymCat.listTables(Array.empty).isEmpty)
    }
  }

  test("CREATE TABLE AS SELECT on path-based table") {
    sql("CREATE TABLE source (id INT, data STRING)")
    sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')")
    sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
    checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
  }

  test("REPLACE TABLE AS SELECT on path-based table") {
    sql("CREATE TABLE source (id INT, data STRING)")
    sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
    // The replacement narrows both the schema (id only) and the rows (id > 1).
    sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1")
    checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil)
  }

  test("INSERT OVERWRITE on path-based table") {
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
    sql(s"INSERT OVERWRITE $tablePath VALUES (9, 'z')")
    checkAnswer(spark.table(tablePath), Row(9, "z") :: Nil)
  }

  test("DataFrame API regression: read still resolves via SCO") {
    // Create via SQL (exercises the new LookupCatalog SCO seam), read via DataFrame
    // (exercises the pre-existing DataFrameReader SCO path in DataSourceV2Utils).
    // Both paths should land on the same Identifier in the session catalog.
    sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
    sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
    val df = spark.read.format("pathformat").load("/path/to/t")
    checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil)
  }
}

0 commit comments

Comments
 (0)