Skip to content

Commit 3bf26af

Browse files
Improvements
1 parent b2a199b commit 3bf26af

3 files changed

Lines changed: 29 additions & 21 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec
3030
import org.apache.spark.sql.catalyst.expressions.Literal
3131
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3232
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
33-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
33+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, DataSourceCatalogResolver, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
3434
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
3535
import org.apache.spark.sql.errors.QueryExecutionErrors
3636
import org.apache.spark.sql.execution.SQLExecution
@@ -129,6 +129,30 @@ private[sql] object DataSourceV2Utils extends Logging {
129129
(catalogName, ident)
130130
}
131131

132+
/**
133+
* Resolver bound to a session [[SQLConf]] that maps a multipart SQL identifier
134+
* (e.g. `pathformat.\`/path/to/t\``) to a `(catalogName, identifier)` pair when the head
135+
* names a registered [[SupportsCatalogOptions]] (SCO) data source. Returns `None` for non-SCO
136+
* sources or unknown format heads, letting the caller fall back to standard catalog
137+
* resolution.
138+
*/
139+
def supportsCatalogOptionsResolver(conf: SQLConf): DataSourceCatalogResolver =
140+
(nameParts: Seq[String]) =>
141+
try {
142+
DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap {
143+
case sco: SupportsCatalogOptions =>
144+
val optionsWithPath = getOptionsWithPaths(
145+
CaseInsensitiveMap(Map.empty), nameParts.tail.mkString("."))
146+
val dsOptions = buildDsOptions(sco, conf, optionsWithPath)
147+
Some(extractCatalogAndIdentifier(sco, dsOptions))
148+
case _ => None
149+
}
150+
} catch {
151+
// The format name is not a registered data source. Fall through and let the caller
152+
// treat it as a catalog/namespace name.
153+
case _: ClassNotFoundException => None
154+
}
155+
132156
def loadV2Source(
133157
sparkSession: SparkSession,
134158
provider: TableProvider,

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
2828
import org.apache.spark.sql.catalyst.parser.ParserInterface
2929
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3030
import org.apache.spark.sql.catalyst.rules.Rule
31-
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
3231
import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration}
33-
import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver, SupportsCatalogOptions}
32+
import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver}
3433
import org.apache.spark.sql.errors.QueryCompilationErrors
3534
import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
3635
import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
@@ -162,21 +161,7 @@ abstract class BaseSessionStateBuilder(
162161
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog)
163162

164163
protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver =
165-
(nameParts: Seq[String]) =>
166-
try {
167-
DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap {
168-
case sco: SupportsCatalogOptions =>
169-
val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths(
170-
CaseInsensitiveMap(Map.empty), nameParts.tail.mkString("."))
171-
val dsOptions = DataSourceV2Utils.buildDsOptions(sco, conf, optionsWithPath)
172-
Some(DataSourceV2Utils.extractCatalogAndIdentifier(sco, dsOptions))
173-
case _ => None
174-
}
175-
} catch {
176-
// The format name is not a registered data source. Fall through and let the caller
177-
// treat it as a catalog/namespace name.
178-
case _: ClassNotFoundException => None
179-
}
164+
DataSourceV2Utils.supportsCatalogOptionsResolver(conf)
180165

181166
protected lazy val catalogManager = {
182167
val cm = new CatalogManager(v2SessionCatalog, catalog, dataSourceCatalogResolver)

sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,13 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession {
121121
}
122122
// The format head is not a registered data source and not a catalog. Resolution
123123
// falls through to a normal table-not-found error.
124-
assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" ||
125-
e.getMessage.toLowerCase.contains("not found"))
124+
assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND")
126125
}
127126

128127
test("VERSION AS OF on path-based table") {
129128
val base = "pathformat.`/p`"
130129
// InMemoryTableCatalog implements time travel by appending the version string to the
131-
// identifier name (see loadTable(ident, version) it looks up name + version).
130+
// identifier name (see loadTable(ident, version) - it looks up name + version).
132131
val versioned = "pathformat.`/pv1`"
133132
sql(s"CREATE TABLE $base (id INT)")
134133
sql(s"CREATE TABLE $versioned (id INT)")

0 commit comments

Comments
 (0)