diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala new file mode 100644 index 0000000000000..c9f04e5aebaf2 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.DSv2RepeatedSQLTests +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.IntegerType + +/** + * Connect-mode runner for [[DSv2RepeatedSQLTests]], plus Connect-specific "reused DataFrame" + * tests that verify Connect's re-analysis behavior when the same DataFrame object is collected + * multiple times across external mutations. 
+ */
+class DataSourceV2RepeatedSQLConnectSuite
+  extends SparkConnectServerTest
+  with DSv2RepeatedSQLTests {
+
+  // Registers the plain (`testcat`) and caching (`cachingcat`) in-memory catalogs used by the
+  // shared tests and sets `copyOnLoad` to "true" for both.
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.testcat.copyOnLoad", "true")
+    .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+
+  override protected def testPrefix: String = "[connect] "
+
+  override protected def withTestSession(fn: SparkSession => Unit): Unit =
+    withSession(fn)
+
+  /** Collects `df` and fails the test with the message reported by `QueryTest.sameRows`. */
+  override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit =
+    QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg))
+
+  /**
+   * Looks up `catalogName` on the server session obtained from `getServerSession(session)`
+   * and returns it as `C`. The `require` check fails when the catalog is not a `C`.
+   */
+  override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C = {
+    val expectedClass = implicitly[ClassTag[C]].runtimeClass
+    val catalog = getServerSession(session).sessionState.catalogManager.catalog(catalogName)
+    require(
+      expectedClass.isInstance(catalog),
+      s"Expected ${expectedClass.getName} but got ${catalog.getClass.getName}")
+    catalog.asInstanceOf[C]
+  }
+
+  /**
+   * Runs `fn`, then drops `views` followed by `table`. Every drop statement is attempted even
+   * when `fn` or an earlier drop throws, so one failed cleanup step does not leave the
+   * remaining views or the table behind.
+   */
+  override protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+    try fn
+    finally {
+      val dropViews = views.map(v => s"DROP VIEW IF EXISTS $v")
+      runCleanupStatements(session, dropViews :+ s"DROP TABLE IF EXISTS $table")
+    }
+  }
+
+  /**
+   * Executes `statements` in order. The nested `finally` guarantees that the tail still runs
+   * when the head throws; if several statements fail, the last failure is the one propagated.
+   */
+  private def runCleanupStatements(session: SparkSession, statements: Seq[String]): Unit =
+    statements match {
+      case first +: rest =>
+        try session.sql(first).collect()
+        finally runCleanupStatements(session, rest)
+      case _ =>
+    }
+
+  // Connect-specific: reusing the same DataFrame object across mutations.
+  // In Connect, each action re-sends the plan for fresh analysis.
+
+  private val T = "testcat.ns1.ns2.tbl"
+  private val dfTestIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+  /**
+   * Shared arrange step of the "reused DataFrame" tests: creates `T` holding the single row
+   * (1, 100), builds one DataFrame over it, checks the initial rows, and then passes that same
+   * DataFrame together with the `testcat` catalog to `body`.
+   */
+  private def withReusedDataFrame(body: (DataFrame, InMemoryTableCatalog) => Unit): Unit =
+    withSession { session =>
+      withTestTableAndViews(session, T) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100)").collect()
+
+        val df = session.sql(s"SELECT * FROM $T")
+        checkRows(df, Seq(Row(1, 100)))
+
+        body(df, getTableCatalog[InMemoryTableCatalog](session, "testcat"))
+      }
+    }
+
+  test(s"${testPrefix}reused DataFrame reflects external write") {
+    withReusedDataFrame { (df, cat) =>
+      externalAppend(cat = cat, ident = dfTestIdent, row = InternalRow(2, 200))
+
+      // same df object, Connect re-analyzes and sees the new row
+      checkRows(df, Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  test(s"${testPrefix}reused DataFrame reflects external schema change") {
+    withReusedDataFrame { (df, cat) =>
+      val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+      cat.alterTable(dfTestIdent, addCol)
+
+      externalAppend(cat = cat, ident = dfTestIdent, row = InternalRow(2, 200, -1))
+
+      // same df object, Connect re-analyzes and sees the new schema
+      checkRows(df, Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  test(s"${testPrefix}reused DataFrame reflects external drop/recreate") {
+    withReusedDataFrame { (df, cat) =>
+      cat.dropTable(dfTestIdent)
+      cat.createTable(
+        dfTestIdent,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      // same df object, Connect re-analyzes against the new empty table
+      checkRows(df, Seq.empty)
+    }
+  }
+}
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
new file mode 100644
index 0000000000000..eae61ebc53667
--- /dev/null
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}
+
+/**
+ * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]].
+ * All test logic lives in the shared trait; this class only provides
+ * the Connect-specific session, catalog access, and assertion wiring.
+ */
+class DataSourceV2TempViewConnectSuite
+  extends SparkConnectServerTest
+  with DSv2TempViewWithStoredPlanTests {
+
+  // Registers the plain (`testcat`) and caching (`cachingcat`) in-memory catalogs used by the
+  // shared tests and sets `copyOnLoad` to "true" for both.
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.testcat.copyOnLoad", "true")
+    .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+
+  override protected def testPrefix: String = "[connect] "
+
+  override protected def withTestSession(fn: SparkSession => Unit): Unit =
+    withSession(fn)
+
+  /** Collects `df` and fails the test with the message reported by `QueryTest.sameRows`. */
+  override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit =
+    QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg))
+
+  /**
+   * Looks up `catalogName` on the server session obtained from `getServerSession(session)`
+   * and returns it as `C`. The `require` check fails when the catalog is not a `C`.
+   */
+  override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C = {
+    val expectedClass = implicitly[ClassTag[C]].runtimeClass
+    val catalog = getServerSession(session).sessionState.catalogManager.catalog(catalogName)
+    require(
+      expectedClass.isInstance(catalog),
+      s"Expected ${expectedClass.getName} but got ${catalog.getClass.getName}")
+    catalog.asInstanceOf[C]
+  }
+
+  /**
+   * Runs `fn`, then drops `views` followed by `table`. Every drop statement is attempted even
+   * when `fn` or an earlier drop throws, so one failed cleanup step does not leave the
+   * remaining views or the table behind.
+   */
+  override protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+    try fn
+    finally {
+      val dropViews = views.map(v => s"DROP VIEW IF EXISTS $v")
+      runCleanupStatements(session, dropViews :+ s"DROP TABLE IF EXISTS $table")
+    }
+  }
+
+  /**
+   * Executes `statements` in order. The nested `finally` guarantees that the tail still runs
+   * when the head throws; if several statements fail, the last failure is the one propagated.
+   */
+  private def runCleanupStatements(session: SparkSession, statements: Seq[String]): Unit =
+    statements match {
+      case first +: rest =>
+        try session.sql(first).collect()
+        finally runCleanupStatements(session, rest)
+      case _ =>
+    }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
new file mode 100644
index 0000000000000..d3245b63fd7cf
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import scala.reflect.ClassTag + +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Base trait for DSv2 tests that involve external table mutations (writes, schema changes, + * drop/recreate) via the catalog API. + * + * Provides abstract methods so that the same test scenarios can run in both classic mode + * (where the test session IS the server session) and Connect mode (where the test session + * is a Connect client and catalog access requires the server session). + * + * Concrete suites override the abstract methods and mix in one or more of the test traits: + * [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedSQLTests]], [[DSv2CacheTableTests]]. 
+ */
+trait DSv2ExternalMutationTestBase extends SharedSparkSession {
+
+  /** Prefix for test names, e.g. "[connect] " for Connect suites, "" for classic. */
+  protected def testPrefix: String = ""
+
+  /**
+   * Execute a test body with a session. Classic: `fn(spark)`. Connect: `withSession(fn)`.
+   */
+  protected def withTestSession(fn: SparkSession => Unit): Unit
+
+  /**
+   * Assert that a DataFrame's rows match the expected rows (order-agnostic).
+   * Classic: delegates to [[org.apache.spark.sql.QueryTest.checkAnswer]].
+   * Connect: collects rows and compares with [[org.apache.spark.sql.QueryTest.sameRows]].
+   *
+   * `df` is passed by name, so the DataFrame expression is evaluated by the implementation.
+   */
+  protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit
+
+  /**
+   * Get a server-side [[TableCatalog]] by name.
+   * Classic: the session is the server session, so access the catalog directly.
+   * Connect: get the server session behind the Connect client, then access the catalog.
+   *
+   * @tparam C the concrete catalog type the caller expects back
+   */
+  protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C
+
+  /**
+   * Cleanup wrapper: drop views and the table after the test body, even on failure.
+   * Classic: delegates to `withTable` + manual view drops.
+   * Connect: `session.sql("DROP ...")` in a finally block.
+   */
+  protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit
+
+  /**
+   * Appends a row to a DSv2 table via the catalog API, bypassing the session.
+   *
+   * The table is loaded from `cat` with the INSERT write privilege and `row` is wrapped in a
+   * single [[BufferedRows]] built against the table's current columns.
+   *
+   * @param cat catalog that owns the table
+   * @param ident identifier of the table inside `cat`
+   * @param row row to append
+   * @throws IllegalArgumentException if the loaded table is not an [[InMemoryBaseTable]]
+   */
+  protected def externalAppend(
+      cat: TableCatalog,
+      ident: Identifier,
+      row: InternalRow): Unit = {
+    // Match instead of a blind cast so that an unexpected table type is reported with the
+    // identifier and the actual class rather than a bare ClassCastException.
+    cat.loadTable(ident, util.Set.of(TableWritePrivilege.INSERT)) match {
+      case extTable: InMemoryBaseTable =>
+        val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
+        extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+      case other =>
+        throw new IllegalArgumentException(
+          s"externalAppend requires an InMemoryBaseTable for $ident " +
+            s"but got ${other.getClass.getName}")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala
new file mode 100644
index 0000000000000..3c87997d7392f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Shared repeated sql() access tests for DSv2 tables. 
Each sql() call creates a fresh
+ * plan, so it always sees the latest data, schema, and table identity.
+ *
+ * Mixed into both classic [[DataSourceV2DataFrameSuite]] and Connect
+ * [[org.apache.spark.sql.connect.DataSourceV2RepeatedSQLConnectSuite]].
+ */
+trait DSv2RepeatedSQLTests extends DSv2ExternalMutationTestBase {
+
+  // Fully qualified because this file does not import SparkSession.
+  private type Session = org.apache.spark.sql.SparkSession
+
+  private val plainTable = "testcat.ns1.ns2.tbl"
+  private val cachedTable = "cachingcat.ns1.ns2.tbl"
+  private val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+  /** Two-column (id, salary) definition used when a test recreates the table externally. */
+  private def emptyTableInfo: TableInfo = {
+    val columns = Array(
+      Column.create("id", IntegerType),
+      Column.create("salary", IntegerType))
+    new TableInfo.Builder().withColumns(columns).build()
+  }
+
+  /** The `new_col` INT column change applied by the external schema-change tests. */
+  private def addNewCol: TableChange =
+    TableChange.addColumn(Array("new_col"), IntegerType, true)
+
+  /** Issues a fresh `SELECT *` over `table` and compares the result with `expected`. */
+  private def expectRows(session: Session, table: String, expected: Seq[Row]): Unit =
+    checkRows(session.sql(s"SELECT * FROM $table"), expected)
+
+  /**
+   * Common arrange step: inside a test session and cleanup wrapper, creates `table`, inserts
+   * the row (1, 100), verifies it is readable, and then runs `body`.
+   */
+  private def withSeededTable(table: String)(body: Session => Unit): Unit =
+    withTestSession { session =>
+      withTestTableAndViews(session, table) {
+        session.sql(s"CREATE TABLE $table (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $table VALUES (1, 100)").collect()
+        expectRows(session, table, Seq(Row(1, 100)))
+
+        body(session)
+      }
+    }
+
+  // Scenario 1: writes
+  test(s"${testPrefix}repeated sql() reflects session write") {
+    withSeededTable(plainTable) { session =>
+      session.sql(s"INSERT INTO $plainTable VALUES (2, 200)").collect()
+      expectRows(session, plainTable, Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external write") {
+    withSeededTable(plainTable) { session =>
+      val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+      externalAppend(cat = cat, ident = tableIdent, row = InternalRow(2, 200))
+
+      expectRows(session, plainTable, Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1 connector w/ cache (external write, caching connector)
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external write") {
+    withSeededTable(cachedTable) { session =>
+      val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+      externalAppend(cat = cat, ident = tableIdent, row = InternalRow(2, 200))
+
+      // Caching connector returns stale table: external write invisible
+      expectRows(session, cachedTable, Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes visible
+      session.sql(s"REFRESH TABLE $cachedTable").collect()
+      expectRows(session, cachedTable, Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2: schema changes
+  test(s"${testPrefix}repeated sql() reflects session schema change") {
+    withSeededTable(plainTable) { session =>
+      session.sql(s"ALTER TABLE $plainTable ADD COLUMN new_col INT").collect()
+      session.sql(s"INSERT INTO $plainTable VALUES (2, 200, -1)").collect()
+      expectRows(session, plainTable, Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external schema change") {
+    withSeededTable(plainTable) { session =>
+      val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+      cat.alterTable(tableIdent, addNewCol)
+
+      externalAppend(cat = cat, ident = tableIdent, row = InternalRow(2, 200, -1))
+
+      expectRows(session, plainTable, Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  // Scenario 2 connector w/ cache (external schema change, caching connector)
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external schema change") {
+    withSeededTable(cachedTable) { session =>
+      val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+      cat.alterTable(tableIdent, addNewCol)
+
+      externalAppend(cat = cat, ident = tableIdent, row = InternalRow(2, 200, -1))
+
+      // Caching connector returns stale table: external changes invisible
+      expectRows(session, cachedTable, Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, schema change + data visible
+      session.sql(s"REFRESH TABLE $cachedTable").collect()
+      expectRows(session, cachedTable, Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  // Scenario 3: drop and recreate table
+  test(s"${testPrefix}repeated sql() reflects session drop/recreate") {
+    withSeededTable(plainTable) { session =>
+      session.sql(s"DROP TABLE $plainTable").collect()
+      session.sql(s"CREATE TABLE $plainTable (id INT, salary INT) USING foo").collect()
+      expectRows(session, plainTable, Seq.empty)
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external drop/recreate") {
+    withSeededTable(plainTable) { session =>
+      val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+      cat.dropTable(tableIdent)
+      cat.createTable(tableIdent, emptyTableInfo)
+
+      expectRows(session, plainTable, Seq.empty)
+    }
+  }
+
+  // Scenario 3 connector w/ cache (external drop/recreate, caching connector)
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external drop/recreate") {
+    withSeededTable(cachedTable) { session =>
+      val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+      cat.dropTable(tableIdent)
+      cat.createTable(tableIdent, emptyTableInfo)
+
+      // Caching connector returns stale table: drop/recreate invisible
+      expectRows(session, cachedTable, Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, new empty table visible
+      session.sql(s"REFRESH TABLE $cachedTable").collect()
+      expectRows(session, cachedTable, Seq.empty)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
new file mode 100644
index 0000000000000..8a46eec5b811a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} + +/** + * Shared temp view with stored plan tests for DSv2 tables. These tests verify that temp views + * backed by DSv2 tables correctly handle data changes, schema changes, and table recreation, + * both via session SQL and external catalog mutations. + * + * Mixed into both classic [[DataSourceV2DataFrameSuite]] and Connect + * [[org.apache.spark.sql.connect.DataSourceV2TempViewConnectSuite]]. 
+ */ +trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase { + + private val T = "testcat.ns1.ns2.tbl" + private val CT = "cachingcat.ns1.ns2.tbl" + private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl") + + // Scenario 1.1 (session write) + test(s"${testPrefix}temp view with stored plan reflects session write") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 1.2 (external write) + test(s"${testPrefix}temp view with stored plan reflects external write") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 1.2 connector w/ cache (external write, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external write") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + 
checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + // Caching connector returns stale table: external write invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, external write becomes visible + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.1 (session ADD COLUMN) + test(s"${testPrefix}temp view with stored plan preserves schema after session ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() + session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // view preserves original 2-column schema, filter still applied + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.2 (external ADD COLUMN) + test(s"${testPrefix}temp view with stored plan preserves schema after external ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(testIdent, 
addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + // view preserves original 2-column schema, filter still applied + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(testIdent, addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + // Caching connector returns stale table: external changes invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 3.1 (session column removal) + test(s"${testPrefix}temp view with stored plan detects session column removal") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, 
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 3.2 (external column removal) + test(s"${testPrefix}temp view with stored plan detects external column removal") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(testIdent, dropCol) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 3.2 connector w/ cache (external column removal, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external column removal") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(testIdent, dropCol) + + // Caching connector returns stale 
table: column removal invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, column removal detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 4.1 (session drop and recreate table) + test(s"${testPrefix}temp view with stored plan resolves to session-recreated table") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val originalTableId = cat.loadTable(testIdent).id + + session.sql(s"DROP TABLE $T").collect() + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + + val newTableId = cat.loadTable(testIdent).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkRows(session.table("v"), Seq.empty) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(2, 200))) + } + } + } + + // Scenario 4.2 (external drop and recreate table) + test(s"${testPrefix}temp view with stored plan resolves to externally recreated table") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + 
session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val originalTableId = cat.loadTable(testIdent).id + + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val newTableId = cat.loadTable(testIdent).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkRows(session.table("v"), Seq.empty) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(2, 200))) + } + } + } + + // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external drop/recreate") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // Caching connector returns stale table: drop/recreate invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, view resolves to new empty table + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq.empty) + } + } + } + + // Scenario 5.1 (session drop and re-add column with same type, multiple views) + test(s"${testPrefix}temp view with stored plan after session 
drop and re-add column same type" + + " with unfiltered view") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + session.table(T).createOrReplaceTempView("v_no_filter") + session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkRows(session.table("v"), Seq(Row(1, 100))) + checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkRows(session.table("v_filter_is_null"), Seq.empty) + + // drop and re-add column with same name and type + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect() + + // salary values are now null, so the filtered view returns nothing + checkRows(session.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + } + + // Scenario 5.2 (external drop and re-add column with same type) + test(s"${testPrefix}temp view with stored plan after external drop and re-add column " + + "same type") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + session.table(T).createOrReplaceTempView("v_no_filter") + session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkRows(session.table("v"), Seq(Row(1, 100))) + 
checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkRows(session.table("v_filter_is_null"), Seq.empty) + + // external drop and re-add column via catalog API + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // salary values are now null, so the filtered view returns nothing + checkRows(session.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + } + + // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external drop/re-add column " + + "same type") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // Caching connector returns stale table: column drop/re-add invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, salary values are null + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq.empty) + } + } + } + + // Scenario 6.1 (session drop and re-add column with 
different type) + test(s"${testPrefix}temp view with stored plan detects session column type change") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 6.2 (external drop and re-add column with different type) + test(s"${testPrefix}temp view with stored plan detects external column type change") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + cat.alterTable(testIdent, dropCol, addCol) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + 
"errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 6.2 connector w/ cache (external column type change, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external column type change") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // Caching connector returns stale table: type change invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, type change detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 7.1 (session type widening from INT to BIGINT) + test(s"${testPrefix}temp view with stored plan detects session type widening") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + 
session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } + + // Scenario 7.2 (external type widening from INT to BIGINT) + test(s"${testPrefix}temp view with stored plan detects external type widening") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val updateType = TableChange.updateColumnType(Array("salary"), LongType) + cat.alterTable(testIdent, updateType) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } + + // Scenario 7.2 connector w/ cache (external type widening, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external type widening") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + 
+ val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val updateType = TableChange.updateColumnType(Array("salary"), LongType) + cat.alterTable(testIdent, updateType) + + // Caching connector returns stale table: type change invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, type change detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 4fc93609fb41e..948572c963d7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -21,14 +21,15 @@ import java.util import java.util.Collections import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, 
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog} +import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -46,7 +47,9 @@ import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String class DataSourceV2DataFrameSuite - extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) { + extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) + with DSv2TempViewWithStoredPlanTests + with DSv2RepeatedSQLTests { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import testImplicits._ @@ -87,6 +90,28 @@ class DataSourceV2DataFrameSuite catalog.asInstanceOf[InMemoryTableCatalog] } + // DSv2ExternalMutationTestBase implementations for classic mode + override protected def withTestSession(fn: SparkSession => Unit): Unit = fn(spark) + + override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = + checkAnswer(df, expected) + + override protected def getTableCatalog[C <: TableCatalog: ClassTag]( + session: SparkSession, + catalogName: String): C = { + catalog(catalogName).asInstanceOf[C] + } + + override protected def 
withTestTableAndViews( + session: SparkSession, + table: String, + views: Seq[String] = Seq.empty)(fn: => Unit): Unit = { + withTable(table) { + try { fn } + finally { views.foreach(v => sql(s"DROP VIEW IF EXISTS $v")) } + } + } + override def verifyTable(tableName: String, expected: DataFrame): Unit = { checkAnswer(spark.table(tableName), expected) } @@ -2989,581 +3014,6 @@ class DataSourceV2DataFrameSuite } } - // Temp views with stored plans: scenarios from the DSv2 table refresh tests. - // Each test creates a DSv2 table with initial data, builds a temp view with a filter - // (to demonstrate that the stored plan is non-trivial), and then verifies the view - // behavior after various table modifications (session or external). - - /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */ - // The row layout must match the current table column order. - private def externalAppend( - catalogName: String, - ident: Identifier, - row: InternalRow): Unit = { - val extTable = catalog(catalogName).loadTable(ident, - util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] - val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns()) - extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row))) - } - - // Scenario 1.1 (session write) - test("temp view with stored plan reflects session write") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - sql(s"INSERT INTO $t VALUES (2, 200)") - - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 1.2 (external write) - test("temp view with stored plan reflects external write") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t 
(id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external writer adds (2, 200) via direct catalog API - externalAppend( - catalogName = "testcat", - ident = ident, - row = InternalRow(2, 200)) - - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 1.2 connector w/ cache (external write, caching connector) - test("connector w/ cache: temp view stale after external write") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external writer adds (2, 200) via catalog API (bypasses cache) - externalAppend( - catalogName = "cachingcat", - ident = ident, - row = InternalRow(2, 200)) - - // Caching connector returns stale table: external write invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, external write becomes visible - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 2.1 (session ADD COLUMN) - test("temp view with stored plan preserves schema after session ADD COLUMN") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - sql(s"ALTER TABLE $t ADD COLUMN new_column INT") - sql(s"INSERT INTO $t VALUES (2, 200, -1)") - - // view preserves original 2-column schema, filter still applied - checkAnswer(spark.table("v"), Seq(Row(1, 100), 
Row(2, 200))) - } - } - - // Scenario 2.2 (external ADD COLUMN) - test("temp view with stored plan preserves schema after external ADD COLUMN") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change via catalog API - val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) - catalog("testcat").alterTable(ident, addCol) - - // external writer adds data with new schema - externalAppend( - catalogName = "testcat", - ident = ident, - row = InternalRow(2, 200, -1)) - - // view preserves original 2-column schema, filter still applied - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector) - test("connector w/ cache: temp view stale after external ADD COLUMN") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change + data via catalog API - val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) - catalog("cachingcat").alterTable(ident, addCol) - - externalAppend( - catalogName = "cachingcat", - ident = ident, - row = InternalRow(2, 200, -1)) - - // Caching connector returns stale table: external changes invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq(Row(1, 
100), Row(2, 200))) - } - } - - // Scenario 3.1 (session column removal) - test("temp view with stored plan detects session column removal") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // session schema change via SQL - sql(s"ALTER TABLE $t DROP COLUMN salary") - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 3.2 (external column removal) - test("temp view with stored plan detects external column removal") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - catalog("testcat").alterTable(ident, dropCol) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 3.2 connector w/ cache (external column removal, caching connector) - test("connector w/ cache: temp view stale after external column removal") { - val t = "cachingcat.ns1.ns2.tbl" - val 
ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external column removal via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - catalog("cachingcat").alterTable(ident, dropCol) - - // Caching connector returns stale table: column removal invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, column removal detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 4.1 (session drop and recreate table) - test("temp view with stored plan resolves to session-recreated table") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - val originalTableId = catalog("testcat").loadTable(ident).id - - // session drop and recreate via SQL - sql(s"DROP TABLE $t") - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - - val newTableId = catalog("testcat").loadTable(ident).id - assert(originalTableId != newTableId) - - // view resolves to the new empty table - checkAnswer(spark.table("v"), Seq.empty) - - // insert new data and verify the view picks it up - sql(s"INSERT INTO $t VALUES (2, 200)") - 
checkAnswer(spark.table("v"), Seq(Row(2, 200))) - } - } - - // Scenario 4.2 (external drop and recreate table) - test("temp view with stored plan resolves to externally recreated table") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - val originalTableId = catalog("testcat").loadTable(ident).id - - // external drop and recreate via catalog API - catalog("testcat").dropTable(ident) - catalog("testcat").createTable( - ident, - new TableInfo.Builder() - .withColumns(Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType))) - .build()) - - val newTableId = catalog("testcat").loadTable(ident).id - assert(originalTableId != newTableId) - - // view resolves to the new empty table - checkAnswer(spark.table("v"), Seq.empty) - - // insert new data and verify the view picks it up - sql(s"INSERT INTO $t VALUES (2, 200)") - checkAnswer(spark.table("v"), Seq(Row(2, 200))) - } - } - - // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector) - test("connector w/ cache: temp view stale after external drop/recreate") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and recreate via catalog API - catalog("cachingcat").dropTable(ident) - catalog("cachingcat").createTable( - ident, - new TableInfo.Builder() - .withColumns(Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType))) - .build()) - - // Caching connector 
returns stale table: drop/recreate invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, view resolves to new empty table - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq.empty) - } - } - - // Scenario 5.1 (session drop and re-add column with same type, multiple views) - test("temp view with stored plan after session drop and re-add column same type" + - " with unfiltered view") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - spark.table(t).createOrReplaceTempView("v_no_filter") - spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) - checkAnswer(spark.table("v_filter_is_null"), Seq.empty) - - // drop and re-add column with same name and type - sql(s"ALTER TABLE $t DROP COLUMN salary") - sql(s"ALTER TABLE $t ADD COLUMN salary INT") - - // salary values are now null, so the filtered view returns nothing - checkAnswer(spark.table("v"), Seq.empty) - // unfiltered view returns rows with null salary - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) - // IS NULL filter now matches all rows - checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) - } - } - - // Scenario 5.2 (external drop and re-add column with same type) - test("temp view with stored plan after external drop and re-add column same type") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - 
spark.table(t).createOrReplaceTempView("v_no_filter") - spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) - checkAnswer(spark.table("v_filter_is_null"), Seq.empty) - - // external drop and re-add column via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - catalog("testcat").alterTable(ident, dropCol, addCol) - - // salary values are now null, so the filtered view returns nothing - checkAnswer(spark.table("v"), Seq.empty) - // unfiltered view returns rows with null salary - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) - // IS NULL filter now matches all rows - checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) - } - } - - // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector) - test("connector w/ cache: temp view stale after external drop/re-add column same type") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with same type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - catalog("cachingcat").alterTable(ident, dropCol, addCol) - - // Caching connector returns stale table: column drop/re-add invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, salary values are null - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), 
Seq.empty) - } - } - - // Scenario 6.1 (session drop and re-add column with different type) - test("temp view with stored plan detects session column type change") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // drop and re-add column with same name but different type - sql(s"ALTER TABLE $t DROP COLUMN salary") - sql(s"ALTER TABLE $t ADD COLUMN salary STRING") - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 6.2 (external drop and re-add column with different type) - test("temp view with stored plan detects external column type change") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with different type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), StringType, true) - catalog("testcat").alterTable(ident, dropCol, addCol) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - 
"errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 6.2 connector w/ cache (external column type change, caching connector) - test("connector w/ cache: temp view stale after external column type change") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with different type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), StringType, true) - catalog("cachingcat").alterTable(ident, dropCol, addCol) - - // Caching connector returns stale table: type change invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, type change detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 7.1 (session type widening from INT to BIGINT) - test("temp view with stored plan detects session type widening") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // session type widening via SQL - sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG") - - checkError( - exception = intercept[AnalysisException] { 
spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - - // Scenario 7.2 (external type widening from INT to BIGINT) - test("temp view with stored plan detects external type widening") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // widen salary type from INT to BIGINT via catalog API - val updateType = TableChange.updateColumnType(Array("salary"), LongType) - catalog("testcat").alterTable(ident, updateType) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - - // Scenario 7.2 connector w/ cache (external type widening, caching connector) - test("connector w/ cache: temp view stale after external type widening") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external type widening via catalog API - val updateType = TableChange.updateColumnType(Array("salary"), LongType) - catalog("cachingcat").alterTable(ident, updateType) - - // Caching connector 
returns stale table: type change invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, type change detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - test("cached DSv2 table DataFrame is refreshed and reused after insert") { val t = "testcat.ns1.ns2.tbl" withTable(t) {