Skip to content

Commit 88cc8d6

Browse files
Change resolution precedence + test improvements
1 parent ae8f4a8 commit 88cc8d6

4 files changed

Lines changed: 162 additions & 64 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,9 @@ private[sql] trait LookupCatalog extends Logging {
121121
// this custom catalog can't be accessed.
122122
Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
123123
} else {
124-
// Path-based data sources (e.g. `pathformat2.'/path/to/t'`) whose format declares a
125-
// catalog via SupportsCatalogOptions are routed to that catalog. Both the catalog and
126-
// the canonical identifier come from the connector.
127-
val (catalogName, ident) =
128-
Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match {
129-
case Some((catName, providerIdent)) => (catName, providerIdent)
130-
case None => (nameParts.head, nameParts.tail.asIdentifier)
131-
}
132-
133124
try {
134-
val catalog = catalogManager.catalog(catalogName)
125+
val catalog = catalogManager.catalog(nameParts.head)
126+
val ident = nameParts.tail.asIdentifier
135127
if (CatalogV2Util.isSessionCatalog(catalog)) {
136128
// Reject only when namespace is empty (e.g. spark_catalog.t with no database).
137129
// Allow multi-part namespace for metadata tables (e.g. default.table.snapshots).
@@ -143,7 +135,18 @@ private[sql] trait LookupCatalog extends Logging {
143135
Some((catalog, ident))
144136
} catch {
145137
case _: CatalogNotFoundException =>
146-
Some((currentCatalog, nameParts.asIdentifier))
138+
// No catalog matched. As a fallback, try path-based data sources:
139+
// formats implementing SupportsCatalogOptions (e.g. `pathformat.`/path/to/t``)
140+
// route to the catalog the connector designates. If no SCO format claims the
141+
// identifier head, fall through to currentCatalog and let later analysis raise
142+
// table-not-found. This matches the v1 file-format precedence (catalog first,
143+
// path-based as fallback).
144+
Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match {
145+
case Some((catName, providerIdent)) =>
146+
Some((catalogManager.catalog(catName), providerIdent))
147+
case None =>
148+
Some((currentCatalog, nameParts.asIdentifier))
149+
}
147150
}
148151
}
149152
}

sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.connector
1919

2020
import java.util
21+
import java.util.Optional
2122

2223
import org.apache.spark.sql.connector.catalog.{Identifier, SessionConfigSupport, SupportsCatalogOptions, SupportsV1OverwriteWithSaveAsTable, Table, TableProvider}
2324
import org.apache.spark.sql.connector.expressions.Transform
@@ -96,10 +97,11 @@ class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled
9697
}
9798

9899
/**
99-
* Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]]
100-
* to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. We rely on
101-
* the default [[SupportsCatalogOptions#extractCatalog]] which returns
102-
* [[org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME]].
100+
* Simulates a path-based connector that implements [[SupportsCatalogOptions]] and routes
101+
* `pathformat.\`/path/to/t\`` SQL identifiers to a dedicated catalog (`pathformat_cat`).
102+
* Tests register that catalog and assert against it so the SCO seam is exercised
103+
* unambiguously: without SCO, `CatalogAndIdentifier` falls back to the current catalog
104+
* (session catalog) and the target catalog stays empty.
103105
*/
104106
class FakePathBasedSource
105107
extends FakeV2ProviderWithCustomSchema
@@ -108,8 +110,18 @@ class FakePathBasedSource
108110

109111
override def shortName(): String = "pathformat"
110112

113+
override def extractCatalog(options: CaseInsensitiveStringMap): String =
114+
FakePathBasedSource.CATALOG_NAME
115+
111116
override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
112117
Identifier.of(Array(shortName()), options.get("path"))
118+
119+
override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] =
120+
Optional.ofNullable(options.get("versionAsOf"))
121+
}
122+
123+
object FakePathBasedSource {
124+
val CATALOG_NAME: String = "pathformat_cat"
113125
}
114126

115127
/**

sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala

Lines changed: 83 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,44 +19,50 @@ package org.apache.spark.sql.connector
1919

2020
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
2121
import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
22-
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
2322
import org.apache.spark.sql.test.SharedSparkSession
2423

2524
/**
2625
* Non-transactional tests for SQL resolution of path-based tables surfaced by a
2726
* [[org.apache.spark.sql.connector.catalog.SupportsCatalogOptions]] data source
28-
* (e.g. `pathformat.`/path/to/t``). Covers reads, DDL, CREATE/REPLACE, regression for v1
29-
* file-format direct queries, and the `runSQLOnFile` gate. Transactional behavior is
30-
* covered separately in [[PathBasedTableTransactionSuite]].
27+
* (e.g. `pathformat.`/path/to/t``). [[FakePathBasedSource]] routes resolution to a
28+
* dedicated `pathformat_cat` catalog rather than the session catalog, so assertions
29+
* against that catalog unambiguously confirm the SCO seam fired — without SCO,
30+
* `CatalogAndIdentifier`'s fallback lands in the (default) session catalog and the
31+
* named catalog stays empty.
3132
*/
3233
class PathBasedTableSuite extends QueryTest with SharedSparkSession {
3334

3435
import testImplicits._
3536

36-
// FakePathBasedSource rewrites `pathformat.\`/path/to/t\`` to the session catalog with
37-
// Identifier(ns = ["pathformat"], name = "/path/to/t"). InMemoryTableCatalog accepts
38-
// arbitrary namespace/name shapes, so we plug it in as the v2 session catalog.
3937
private val tablePath = "pathformat.`/path/to/t`"
4038

39+
private def pathformatCat: InMemoryTableCatalog =
40+
spark.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME)
41+
.asInstanceOf[InMemoryTableCatalog]
42+
4143
override def beforeEach(): Unit = {
4244
super.beforeEach()
4345
spark.conf.set(
44-
V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableCatalog].getName)
46+
s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}",
47+
classOf[InMemoryTableCatalog].getName)
4548
}
4649

4750
override def afterEach(): Unit = {
48-
// SharedSparkSession reuses one SparkSession across tests, so the in-memory catalog's
49-
// table map would persist between tests. Reset clears registered catalogs so each test
50-
// sees a fresh session catalog instance.
51+
// SharedSparkSession reuses one SparkSession across tests. `reset()` drops registered
52+
// non-session catalogs (including pathformat_cat), so the next test starts with a
53+
// fresh InMemoryTableCatalog instance.
5154
spark.sessionState.catalogManager.reset()
52-
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
55+
spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}")
5356
super.afterEach()
5457
}
5558

5659
test("CREATE then SELECT on path-based table") {
5760
sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
5861
sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
5962
checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
63+
// The SCO resolver routed creation/reads to pathformat_cat. Without the seam,
64+
// CREATE would fall through to the (default) session catalog and fail.
65+
assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t"))
6066
}
6167

6268
test("DESCRIBE TABLE resolves path-based table") {
@@ -136,39 +142,43 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession {
136142
checkAnswer(sql(s"SELECT * FROM $base VERSION AS OF 'v1'"), Row(2))
137143
}
138144

139-
test("SCO precedence: data source name wins over same-named catalog") {
140-
// Register a catalog under the same name as the SCO data source short name.
141-
// Resolution should still route through the SCO resolver, i.e. the table is
142-
// created under the session catalog (`spark_catalog`), not under "pathformat".
145+
test("catalog precedence: same-named catalog wins over SCO data source") {
146+
// Register a catalog under the same name as the SCO data source short name. SQL
147+
// resolution should route to the catalog; the SCO resolver is consulted only when
148+
// no catalog claims the head, matching v1 file-format precedence (ResolveSQLOnFile)
149+
// and Delta's ResolveDeltaPathTable extension.
143150
withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) {
144151
sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
145152
sql(s"INSERT INTO $tablePath VALUES (1, 'a')")
146153
checkAnswer(spark.table(tablePath), Row(1, "a") :: Nil)
147154

148-
// Table lives in the session catalog under namespace=["pathformat"], not in the
149-
// catalog registered as "pathformat".
150-
val sessionCat = spark.sessionState.catalogManager.v2SessionCatalog
151-
.asInstanceOf[InMemoryTableCatalog]
152-
assert(sessionCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t"))
155+
// Table lives in the homonym catalog at identifier (ns=[], name="/path/to/t"),
156+
// and the SCO-targeted catalog (pathformat_cat) is untouched because the SCO
157+
// resolver was never consulted.
153158
val homonymCat = spark.sessionState.catalogManager.catalog("pathformat")
154159
.asInstanceOf[InMemoryTableCatalog]
155-
assert(homonymCat.listTables(Array.empty).isEmpty)
160+
assert(homonymCat.listTables(Array.empty).map(_.name()).contains("/path/to/t"))
161+
assert(!pathformatCat.namespaceExists(Array("pathformat")))
156162
}
157163
}
158164

159165
test("CREATE TABLE AS SELECT on path-based table") {
160-
sql("CREATE TABLE source (id INT, data STRING)")
161-
sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')")
162-
sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
163-
checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
166+
withTable("source") {
167+
sql("CREATE TABLE source (id INT, data STRING)")
168+
sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')")
169+
sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
170+
checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
171+
}
164172
}
165173

166174
test("REPLACE TABLE AS SELECT on path-based table") {
167-
sql("CREATE TABLE source (id INT, data STRING)")
168-
sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')")
169-
sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
170-
sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1")
171-
checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil)
175+
withTable("source") {
176+
sql("CREATE TABLE source (id INT, data STRING)")
177+
sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')")
178+
sql(s"CREATE TABLE $tablePath AS SELECT * FROM source")
179+
sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1")
180+
checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil)
181+
}
172182
}
173183

174184
test("INSERT OVERWRITE on path-based table") {
@@ -178,13 +188,53 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession {
178188
checkAnswer(spark.table(tablePath), Row(9, "z") :: Nil)
179189
}
180190

181-
test("DataFrame API regression: read still resolves via SCO") {
191+
test("DataFrame API: read resolves via SCO") {
182192
// Create via SQL (exercises the new LookupCatalog SCO seam), read via DataFrame
183193
// (exercises the pre-existing DataFrameReader SCO path in DataSourceV2Utils).
184-
// Both paths should land on the same Identifier in the session catalog.
194+
// Both paths should land on the same Identifier in pathformat_cat.
185195
sql(s"CREATE TABLE $tablePath (id INT, data STRING)")
186196
sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
187197
val df = spark.read.format("pathformat").load("/path/to/t")
188198
checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil)
189199
}
200+
201+
test("DataFrame API: write via SCO, read via SQL") {
202+
// Write through DataFrameWriter (exercises the refactored buildDsOptions in the v2
203+
// write path), read back via SQL to confirm both entry points land on the same
204+
// Identifier under pathformat_cat.
205+
Seq((1, "a"), (2, "b")).toDF("id", "data")
206+
.write.format("pathformat").save("/path/to/t")
207+
checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
208+
}
209+
210+
test("DataFrame API: time travel via SCO") {
211+
// InMemoryTableCatalog implements time travel by appending the version string to
212+
// the identifier name. SCO time-travel options flow through the refactored
213+
// extractCatalogAndIdentifier helper, so this also guards that path against regressions.
214+
sql("CREATE TABLE pathformat.`/p` (id INT)")
215+
sql("CREATE TABLE pathformat.`/pv1` (id INT)")
216+
sql("INSERT INTO pathformat.`/p` VALUES (1)")
217+
sql("INSERT INTO pathformat.`/pv1` VALUES (2)")
218+
val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p")
219+
checkAnswer(df, Row(2))
220+
}
221+
222+
test("DataFrame API: pure write and read via SCO (no SQL)") {
223+
// Uses only DataFrameWriter/DataFrameReader, so it exercises the v2 SCO entry point
224+
// in DataSourceV2Utils.loadV2Source / the writer's getTableProviderCatalog branch
225+
// independently of LookupCatalog. Survives even when the SQL SCO seam is removed.
226+
Seq((1, "a"), (2, "b")).toDF("id", "data")
227+
.write.format("pathformat").save("/path/to/t")
228+
val df = spark.read.format("pathformat").load("/path/to/t")
229+
checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil)
230+
assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t"))
231+
}
232+
233+
test("DataFrame API: pure time travel via SCO (no SQL)") {
234+
// Pure DataFrame setup so the test does not depend on the SQL SCO seam.
235+
Seq(1).toDF("id").write.format("pathformat").save("/p")
236+
Seq(2).toDF("id").write.format("pathformat").save("/pv1")
237+
val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p")
238+
checkAnswer(df, Row(2))
239+
}
190240
}

0 commit comments

Comments
 (0)