From 67373bf9309ad949e6c32accf8a4741e65fb12b2 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Thu, 21 May 2026 14:11:57 +0000 Subject: [PATCH] [SPARK-56643][SPARK-56619][CONNECT][SQL][TESTS] Share DSv2 test cases between classic and connect Refactors DSv2 test organization to eliminate duplication between classic and Connect mode tests. Introduces shared test traits that define test scenarios once and run them in both modes. New shared traits in sql/core (accessible from both sql/core and sql/connect): - DSv2ExternalMutationTestBase: base trait with 5 abstract methods (withTestSession, checkRows, getTableCatalog, withTestTableAndViews, testPrefix) and shared externalAppend helper - DSv2TempViewWithStoredPlanTests: 21 temp view tests (7 scenarios x 3 variants) - DSv2RepeatedSQLTests: 9 repeated sql() access tests (3 scenarios x 3 variants) Classic suite (DataSourceV2DataFrameSuite): - Mixes in DSv2TempViewWithStoredPlanTests and DSv2RepeatedSQLTests - Implements abstract methods using classic session, checkAnswer, catalog() - Removes 578 lines of inline test code Connect suites (thin wrappers): - DataSourceV2TempViewConnectSuite (72 lines vs 739 previously) - DataSourceV2RepeatedSQLConnectSuite (144 lines, includes 3 connect-specific "reused DataFrame" tests) Co-authored-by: Isaac --- .../DataSourceV2RepeatedSQLConnectSuite.scala | 144 +++++ .../DataSourceV2TempViewConnectSuite.scala | 72 +++ .../DSv2ExternalMutationTestBase.scala | 87 +++ .../sql/connector/DSv2RepeatedSQLTests.scala | 215 +++++++ .../DSv2TempViewWithStoredPlanTests.scala | 587 +++++++++++++++++ .../DataSourceV2DataFrameSuite.scala | 606 +----------------- 6 files changed, 1133 insertions(+), 578 deletions(-) create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala new file mode 100644 index 0000000000000..c9f04e5aebaf2 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RepeatedSQLConnectSuite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.DSv2RepeatedSQLTests +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.IntegerType + +/** + * Connect-mode runner for [[DSv2RepeatedSQLTests]], plus Connect-specific "reused DataFrame" + * tests that verify Connect's re-analysis behavior when the same DataFrame object is collected + * multiple times across external mutations. + */ +class DataSourceV2RepeatedSQLConnectSuite + extends SparkConnectServerTest + with DSv2RepeatedSQLTests { + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat.copyOnLoad", "true") + .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName) + .set("spark.sql.catalog.cachingcat.copyOnLoad", "true") + + override protected def testPrefix: String = "[connect] " + + override protected def withTestSession(fn: SparkSession => Unit): Unit = + withSession(fn) + + override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = + QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg)) + + override protected def getTableCatalog[C <: TableCatalog: ClassTag]( + session: SparkSession, + catalogName: String): C = { + val serverSession = getServerSession(session) + val catalog = serverSession.sessionState.catalogManager.catalog(catalogName) + val ct = implicitly[ClassTag[C]] + require( + ct.runtimeClass.isInstance(catalog), + s"Expected ${ct.runtimeClass.getName} but got ${catalog.getClass.getName}") + catalog.asInstanceOf[C] + } + + override protected def withTestTableAndViews( + session: SparkSession, + table: String, + views: Seq[String] = Seq.empty)(fn: => Unit): Unit = { + try { fn } + finally { + views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v").collect()) + session.sql(s"DROP TABLE IF EXISTS $table").collect() + } + } + + // Connect-specific: reusing the same DataFrame object across mutations. + // In Connect, each action re-sends the plan for fresh analysis. + + private val T = "testcat.ns1.ns2.tbl" + private val dfTestIdent = Identifier.of(Array("ns1", "ns2"), "tbl") + + test("[connect] reused DataFrame reflects external write") { + withSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + + val df = session.sql(s"SELECT * FROM $T") + checkRows(df, Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(cat = cat, ident = dfTestIdent, row = InternalRow(2, 200)) + + // same df object, Connect re-analyzes and sees the new row + checkRows(df, Seq(Row(1, 100), Row(2, 200))) + } + } + } + + test("[connect] reused DataFrame reflects external schema change") { + withSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + + val df = session.sql(s"SELECT * FROM $T") + checkRows(df, Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true) + cat.alterTable(dfTestIdent, addCol) + + externalAppend(cat = cat, ident = dfTestIdent, row = InternalRow(2, 200, -1)) + + // same df object, Connect re-analyzes and sees the new schema + checkRows(df, Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + test("[connect] reused DataFrame reflects external drop/recreate") { + withSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + + val df = session.sql(s"SELECT * FROM $T") + checkRows(df, Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + cat.dropTable(dfTestIdent) + cat.createTable( + dfTestIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // same df object, Connect re-analyzes against the new empty table + checkRows(df, Seq.empty) + } + } + } +} diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala new file mode 100644 index 0000000000000..eae61ebc53667 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession} +import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog} + +/** + * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]]. + * All test logic lives in the shared trait; this class only provides + * the Connect-specific session, catalog access, and assertion wiring. + */ +class DataSourceV2TempViewConnectSuite + extends SparkConnectServerTest + with DSv2TempViewWithStoredPlanTests { + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat.copyOnLoad", "true") + .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName) + .set("spark.sql.catalog.cachingcat.copyOnLoad", "true") + + override protected def testPrefix: String = "[connect] " + + override protected def withTestSession(fn: SparkSession => Unit): Unit = + withSession(fn) + + override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = + QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg)) + + override protected def getTableCatalog[C <: TableCatalog: ClassTag]( + session: SparkSession, + catalogName: String): C = { + val serverSession = getServerSession(session) + val catalog = serverSession.sessionState.catalogManager.catalog(catalogName) + val ct = implicitly[ClassTag[C]] + require( + ct.runtimeClass.isInstance(catalog), + s"Expected ${ct.runtimeClass.getName} but got ${catalog.getClass.getName}") + catalog.asInstanceOf[C] + } + + override protected def withTestTableAndViews( + session: SparkSession, + table: String, + views: Seq[String] = Seq.empty)(fn: => Unit): Unit = { + try { fn } + finally { + views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v").collect()) + session.sql(s"DROP TABLE IF EXISTS $table").collect() + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala new file mode 100644 index 0000000000000..d3245b63fd7cf --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import scala.reflect.ClassTag + +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Base trait for DSv2 tests that involve external table mutations (writes, schema changes, + * drop/recreate) via the catalog API. + * + * Provides abstract methods so that the same test scenarios can run in both classic mode + * (where the test session IS the server session) and Connect mode (where the test session + * is a Connect client and catalog access requires the server session). + * + * Concrete suites override the abstract methods and mix in one or more of the test traits: + * [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedSQLTests]], [[DSv2CacheTableTests]]. + */ +trait DSv2ExternalMutationTestBase extends SharedSparkSession { + + /** Prefix for test names, e.g. "[connect] " for Connect suites, "" for classic. */ + protected def testPrefix: String = "" + + /** + * Execute a test body with a session. Classic: `fn(spark)`. Connect: `withSession(fn)`. + */ + protected def withTestSession(fn: SparkSession => Unit): Unit + + /** + * Assert that a DataFrame's rows match the expected rows (order-agnostic). + * Classic: delegates to [[org.apache.spark.sql.QueryTest.checkAnswer]]. + * Connect: collects rows and compares with [[org.apache.spark.sql.QueryTest.sameRows]]. + */ + protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit + + /** + * Get a server-side [[TableCatalog]] by name. + * Classic: the session is the server session, so access the catalog directly. + * Connect: get the server session behind the Connect client, then access the catalog. + */ + protected def getTableCatalog[C <: TableCatalog: ClassTag]( + session: SparkSession, + catalogName: String): C + + /** + * Cleanup wrapper: drop views and the table after the test body, even on failure. + * Classic: delegates to `withTable` + manual view drops. + * Connect: `session.sql("DROP ...")` in a finally block. + */ + protected def withTestTableAndViews( + session: SparkSession, + table: String, + views: Seq[String] = Seq.empty)(fn: => Unit): Unit + + /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */ + protected def externalAppend( + cat: TableCatalog, + ident: Identifier, + row: InternalRow): Unit = { + val extTable = cat + .loadTable(ident, util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns()) + extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row))) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala new file mode 100644 index 0000000000000..3c87997d7392f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedSQLTests.scala @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.IntegerType + +/** + * Shared repeated sql() access tests for DSv2 tables. Each sql() call creates a fresh + * plan, so it always sees the latest data, schema, and table identity. + * + * Mixed into both classic [[DataSourceV2DataFrameSuite]] and Connect + * [[org.apache.spark.sql.connect.DataSourceV2RepeatedSQLConnectSuite]]. + */ +trait DSv2RepeatedSQLTests extends DSv2ExternalMutationTestBase { + + private val T = "testcat.ns1.ns2.tbl" + private val CT = "cachingcat.ns1.ns2.tbl" + private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl") + + // Scenario 1: writes + test(s"${testPrefix}repeated sql() reflects session write") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + test(s"${testPrefix}repeated sql() reflects external write") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 1 connector w/ cache (external write, caching connector) + test(s"${testPrefix}connector w/ cache: repeated sql() stale after external write") { + withTestSession { session => + withTestTableAndViews(session, CT) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + // Caching connector returns stale table: external write invisible + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, external write becomes visible + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2: schema changes + test(s"${testPrefix}repeated sql() reflects session schema change") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() + session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + checkRows( + session.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + test(s"${testPrefix}repeated sql() reflects external schema change") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true) + cat.alterTable(testIdent, addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + checkRows( + session.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + // Scenario 2 connector w/ cache (external schema change, caching connector) + test(s"${testPrefix}connector w/ cache: repeated sql() stale after external schema change") { + withTestSession { session => + withTestTableAndViews(session, CT) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true) + cat.alterTable(testIdent, addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + // Caching connector returns stale table: external changes invisible + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, schema change + data visible + session.sql(s"REFRESH TABLE $CT").collect() + checkRows( + session.sql(s"SELECT * FROM $CT"), + Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + // Scenario 3: drop and recreate table + test(s"${testPrefix}repeated sql() reflects session drop/recreate") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + session.sql(s"DROP TABLE $T").collect() + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq.empty) + } + } + } + + test(s"${testPrefix}repeated sql() reflects external drop/recreate") { + withTestSession { session => + withTestTableAndViews(session, T) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + checkRows(session.sql(s"SELECT * FROM $T"), Seq.empty) + } + } + } + + // Scenario 3 connector w/ cache (external drop/recreate, caching connector) + test(s"${testPrefix}connector w/ cache: repeated sql() stale after external drop/recreate") { + withTestSession { session => + withTestTableAndViews(session, CT) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100)").collect() + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // Caching connector returns stale table: drop/recreate invisible + checkRows(session.sql(s"SELECT * FROM $CT"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, new empty table visible + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.sql(s"SELECT * FROM $CT"), Seq.empty) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala new file mode 100644 index 0000000000000..8a46eec5b811a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} + +/** + * Shared temp view with stored plan tests for DSv2 tables. These tests verify that temp views + * backed by DSv2 tables correctly handle data changes, schema changes, and table recreation, + * both via session SQL and external catalog mutations. + * + * Mixed into both classic [[DataSourceV2DataFrameSuite]] and Connect + * [[org.apache.spark.sql.connect.DataSourceV2TempViewConnectSuite]]. + */ +trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase { + + private val T = "testcat.ns1.ns2.tbl" + private val CT = "cachingcat.ns1.ns2.tbl" + private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl") + + // Scenario 1.1 (session write) + test(s"${testPrefix}temp view with stored plan reflects session write") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 1.2 (external write) + test(s"${testPrefix}temp view with stored plan reflects external write") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 1.2 connector w/ cache (external write, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external write") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200)) + + // Caching connector returns stale table: external write invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, external write becomes visible + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.1 (session ADD COLUMN) + test(s"${testPrefix}temp view with stored plan preserves schema after session ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() + session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // view preserves original 2-column schema, filter still applied + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.2 (external ADD COLUMN) + test(s"${testPrefix}temp view with stored plan preserves schema after external ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(testIdent, addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + // view preserves original 2-column schema, filter still applied + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external ADD COLUMN") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(testIdent, addCol) + + externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1)) + + // Caching connector returns stale table: external changes invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + } + + // Scenario 3.1 (session column removal) + test(s"${testPrefix}temp view with stored plan detects session column removal") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 3.2 (external column removal) + test(s"${testPrefix}temp view with stored plan detects external column removal") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(testIdent, dropCol) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 3.2 connector w/ cache (external column removal, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external column removal") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(testIdent, dropCol) + + // Caching connector returns stale table: column removal invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, column removal detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + } + + // Scenario 4.1 (session drop and recreate table) + test(s"${testPrefix}temp view with stored plan resolves to session-recreated table") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val originalTableId = cat.loadTable(testIdent).id + + session.sql(s"DROP TABLE $T").collect() + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + + val newTableId = cat.loadTable(testIdent).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkRows(session.table("v"), Seq.empty) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(2, 200))) + } + } + } + + // Scenario 4.2 (external drop and recreate table) + test(s"${testPrefix}temp view with stored plan resolves to externally recreated table") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val originalTableId = cat.loadTable(testIdent).id + + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val newTableId = cat.loadTable(testIdent).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkRows(session.table("v"), Seq.empty) + + session.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkRows(session.table("v"), Seq(Row(2, 200))) + } + } + } + + // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external drop/recreate") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + cat.dropTable(testIdent) + cat.createTable( + testIdent, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // Caching connector returns stale table: drop/recreate invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, view resolves to new empty table + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq.empty) + } + } + } + + // Scenario 5.1 (session drop and re-add column with same type, multiple views) + test(s"${testPrefix}temp view with stored plan after session drop and re-add column same type" + + " with unfiltered view") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + session.table(T).createOrReplaceTempView("v_no_filter") + session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkRows(session.table("v"), Seq(Row(1, 100))) + checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkRows(session.table("v_filter_is_null"), Seq.empty) + + // drop and re-add column with same name and type + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect() + + // salary values are now null, so the filtered view returns nothing + checkRows(session.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + } + + // Scenario 5.2 (external drop and re-add column with same type) + test(s"${testPrefix}temp view with stored plan after external drop and re-add column " + + "same type") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + session.table(T).createOrReplaceTempView("v_no_filter") + session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkRows(session.table("v"), Seq(Row(1, 100))) + checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkRows(session.table("v_filter_is_null"), Seq.empty) + + // external drop and re-add column via catalog API + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // salary values are now null, so the filtered view returns nothing + checkRows(session.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + } + + // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external drop/re-add column " + + "same type") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // Caching connector returns stale table: column drop/re-add invisible + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, salary values are null + session.sql(s"REFRESH TABLE $CT").collect() + checkRows(session.table("v"), Seq.empty) + } + } + } + + // Scenario 6.1 (session drop and re-add column with different type) + test(s"${testPrefix}temp view with stored plan detects session column type change") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 6.2 (external drop and re-add column with different type) + test(s"${testPrefix}temp view with stored plan detects external column type change") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + cat.alterTable(testIdent, dropCol, addCol) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 6.2 connector w/ cache (external column type change, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external column type change") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + cat.alterTable(testIdent, dropCol, addCol) + + // Caching connector returns stale table: type change invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, type change detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + } + + // Scenario 7.1 (session type widening from INT to BIGINT) + test(s"${testPrefix}temp view with stored plan detects session type widening") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect() + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } + + // Scenario 7.2 (external type widening from INT to BIGINT) + test(s"${testPrefix}temp view with stored plan detects external type widening") { + withTestSession { session => + withTestTableAndViews(session, T, Seq("v")) { + session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + session.table(T).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat") + val updateType = TableChange.updateColumnType(Array("salary"), LongType) + cat.alterTable(testIdent, updateType) + + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } + + // Scenario 7.2 connector w/ cache (external type widening, caching connector) + test(s"${testPrefix}connector w/ cache: temp view stale after external type widening") { + withTestSession { session => + withTestTableAndViews(session, CT, Seq("v")) { + session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect() + session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect() + + session.table(CT).filter("salary < 999").createOrReplaceTempView("v") + checkRows(session.table("v"), Seq(Row(1, 100))) + + val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat") + val updateType = TableChange.updateColumnType(Array("salary"), LongType) + cat.alterTable(testIdent, updateType) + + // Caching connector returns stale table: type change invisible, no error + checkRows(session.table("v"), Seq(Row(1, 100))) + + // REFRESH TABLE invalidates the connector cache, type change detected + session.sql(s"REFRESH TABLE $CT").collect() + checkError( + exception = intercept[AnalysisException] { session.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 4fc93609fb41e..948572c963d7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -21,14 +21,15 @@ import java.util import java.util.Collections import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog} +import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -46,7 +47,9 @@ import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String class DataSourceV2DataFrameSuite - extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) { + extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) + with DSv2TempViewWithStoredPlanTests + with DSv2RepeatedSQLTests { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import testImplicits._ @@ -87,6 +90,28 @@ class DataSourceV2DataFrameSuite catalog.asInstanceOf[InMemoryTableCatalog] } + // DSv2ExternalMutationTestBase implementations for classic mode + override protected def withTestSession(fn: SparkSession => Unit): Unit = fn(spark) + + override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = + checkAnswer(df, expected) + + override protected def getTableCatalog[C <: TableCatalog: ClassTag]( + session: SparkSession, + catalogName: String): C = { + catalog(catalogName).asInstanceOf[C] + } + + override protected def withTestTableAndViews( + session: SparkSession, + table: String, + views: Seq[String] = Seq.empty)(fn: => Unit): Unit = { + withTable(table) { + try { fn } + finally { views.foreach(v => sql(s"DROP VIEW IF EXISTS $v")) } + } + } + override def verifyTable(tableName: String, expected: DataFrame): Unit = { checkAnswer(spark.table(tableName), expected) } @@ -2989,581 +3014,6 @@ class DataSourceV2DataFrameSuite } } - // Temp views with stored plans: scenarios from the DSv2 table refresh tests. - // Each test creates a DSv2 table with initial data, builds a temp view with a filter - // (to demonstrate that the stored plan is non-trivial), and then verifies the view - // behavior after various table modifications (session or external). - - /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */ - // The row layout must match the current table column order. - private def externalAppend( - catalogName: String, - ident: Identifier, - row: InternalRow): Unit = { - val extTable = catalog(catalogName).loadTable(ident, - util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] - val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns()) - extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row))) - } - - // Scenario 1.1 (session write) - test("temp view with stored plan reflects session write") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - sql(s"INSERT INTO $t VALUES (2, 200)") - - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 1.2 (external write) - test("temp view with stored plan reflects external write") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external writer adds (2, 200) via direct catalog API - externalAppend( - catalogName = "testcat", - ident = ident, - row = InternalRow(2, 200)) - - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 1.2 connector w/ cache (external write, caching connector) - test("connector w/ cache: temp view stale after external write") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external writer adds (2, 200) via catalog API (bypasses cache) - externalAppend( - catalogName = "cachingcat", - ident = ident, - row = InternalRow(2, 200)) - - // Caching connector returns stale table: external write invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, external write becomes visible - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 2.1 (session ADD COLUMN) - test("temp view with stored plan preserves schema after session ADD COLUMN") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - sql(s"ALTER TABLE $t ADD COLUMN new_column INT") - sql(s"INSERT INTO $t VALUES (2, 200, -1)") - - // view preserves original 2-column schema, filter still applied - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 2.2 (external ADD COLUMN) - test("temp view with stored plan preserves schema after external ADD COLUMN") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change via catalog API - val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) - catalog("testcat").alterTable(ident, addCol) - - // external writer adds data with new schema - externalAppend( - catalogName = "testcat", - ident = ident, - row = InternalRow(2, 200, -1)) - - // view preserves original 2-column schema, filter still applied - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector) - test("connector w/ cache: temp view stale after external ADD COLUMN") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change + data via catalog API - val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) - catalog("cachingcat").alterTable(ident, addCol) - - externalAppend( - catalogName = "cachingcat", - ident = ident, - row = InternalRow(2, 200, -1)) - - // Caching connector returns stale table: external changes invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) - } - } - - // Scenario 3.1 (session column removal) - test("temp view with stored plan detects session column removal") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // session schema change via SQL - sql(s"ALTER TABLE $t DROP COLUMN salary") - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 3.2 (external column removal) - test("temp view with stored plan detects external column removal") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external schema change via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - catalog("testcat").alterTable(ident, dropCol) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 3.2 connector w/ cache (external column removal, caching connector) - test("connector w/ cache: temp view stale after external column removal") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external column removal via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - catalog("cachingcat").alterTable(ident, dropCol) - - // Caching connector returns stale table: column removal invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, column removal detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - } - } - - // Scenario 4.1 (session drop and recreate table) - test("temp view with stored plan resolves to session-recreated table") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - val originalTableId = catalog("testcat").loadTable(ident).id - - // session drop and recreate via SQL - sql(s"DROP TABLE $t") - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - - val newTableId = catalog("testcat").loadTable(ident).id - assert(originalTableId != newTableId) - - // view resolves to the new empty table - checkAnswer(spark.table("v"), Seq.empty) - - // insert new data and verify the view picks it up - sql(s"INSERT INTO $t VALUES (2, 200)") - checkAnswer(spark.table("v"), Seq(Row(2, 200))) - } - } - - // Scenario 4.2 (external drop and recreate table) - test("temp view with stored plan resolves to externally recreated table") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - val originalTableId = catalog("testcat").loadTable(ident).id - - // external drop and recreate via catalog API - catalog("testcat").dropTable(ident) - catalog("testcat").createTable( - ident, - new TableInfo.Builder() - .withColumns(Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType))) - .build()) - - val newTableId = catalog("testcat").loadTable(ident).id - assert(originalTableId != newTableId) - - // view resolves to the new empty table - checkAnswer(spark.table("v"), Seq.empty) - - // insert new data and verify the view picks it up - sql(s"INSERT INTO $t VALUES (2, 200)") - checkAnswer(spark.table("v"), Seq(Row(2, 200))) - } - } - - // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector) - test("connector w/ cache: temp view stale after external drop/recreate") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and recreate via catalog API - catalog("cachingcat").dropTable(ident) - catalog("cachingcat").createTable( - ident, - new TableInfo.Builder() - .withColumns(Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType))) - .build()) - - // Caching connector returns stale table: drop/recreate invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, view resolves to new empty table - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq.empty) - } - } - - // Scenario 5.1 (session drop and re-add column with same type, multiple views) - test("temp view with stored plan after session drop and re-add column same type" + - " with unfiltered view") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - spark.table(t).createOrReplaceTempView("v_no_filter") - spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) - checkAnswer(spark.table("v_filter_is_null"), Seq.empty) - - // drop and re-add column with same name and type - sql(s"ALTER TABLE $t DROP COLUMN salary") - sql(s"ALTER TABLE $t ADD COLUMN salary INT") - - // salary values are now null, so the filtered view returns nothing - checkAnswer(spark.table("v"), Seq.empty) - // unfiltered view returns rows with null salary - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) - // IS NULL filter now matches all rows - checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) - } - } - - // Scenario 5.2 (external drop and re-add column with same type) - test("temp view with stored plan after external drop and re-add column same type") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - spark.table(t).createOrReplaceTempView("v_no_filter") - spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) - checkAnswer(spark.table("v_filter_is_null"), Seq.empty) - - // external drop and re-add column via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - catalog("testcat").alterTable(ident, dropCol, addCol) - - // salary values are now null, so the filtered view returns nothing - checkAnswer(spark.table("v"), Seq.empty) - // unfiltered view returns rows with null salary - checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) - // IS NULL filter now matches all rows - checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) - } - } - - // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector) - test("connector w/ cache: temp view stale after external drop/re-add column same type") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with same type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - catalog("cachingcat").alterTable(ident, dropCol, addCol) - - // Caching connector returns stale table: column drop/re-add invisible - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, salary values are null - sql(s"REFRESH TABLE $t") - checkAnswer(spark.table("v"), Seq.empty) - } - } - - // Scenario 6.1 (session drop and re-add column with different type) - test("temp view with stored plan detects session column type change") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // drop and re-add column with same name but different type - sql(s"ALTER TABLE $t DROP COLUMN salary") - sql(s"ALTER TABLE $t ADD COLUMN salary STRING") - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 6.2 (external drop and re-add column with different type) - test("temp view with stored plan detects external column type change") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with different type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), StringType, true) - catalog("testcat").alterTable(ident, dropCol, addCol) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 6.2 connector w/ cache (external column type change, caching connector) - test("connector w/ cache: temp view stale after external column type change") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external drop and re-add column with different type via catalog API - val dropCol = TableChange.deleteColumn(Array("salary"), false) - val addCol = TableChange.addColumn(Array("salary"), StringType, true) - catalog("cachingcat").alterTable(ident, dropCol, addCol) - - // Caching connector returns stale table: type change invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, type change detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to STRING")) - } - } - - // Scenario 7.1 (session type widening from INT to BIGINT) - test("temp view with stored plan detects session type widening") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // session type widening via SQL - sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG") - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - - // Scenario 7.2 (external type widening from INT to BIGINT) - test("temp view with stored plan detects external type widening") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // widen salary type from INT to BIGINT via catalog API - val updateType = TableChange.updateColumnType(Array("salary"), LongType) - catalog("testcat").alterTable(ident, updateType) - - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - - // Scenario 7.2 connector w/ cache (external type widening, caching connector) - test("connector w/ cache: temp view stale after external type widening") { - val t = "cachingcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") - - spark.table(t).filter("salary < 999").createOrReplaceTempView("v") - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // external type widening via catalog API - val updateType = TableChange.updateColumnType(Array("salary"), LongType) - catalog("cachingcat").alterTable(ident, updateType) - - // Caching connector returns stale table: type change invisible, no error - checkAnswer(spark.table("v"), Seq(Row(1, 100))) - - // REFRESH TABLE invalidates the connector cache, type change detected - sql(s"REFRESH TABLE $t") - checkError( - exception = intercept[AnalysisException] { spark.table("v").collect() }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` type has changed from INT to BIGINT")) - } - } - test("cached DSv2 table DataFrame is refreshed and reused after insert") { val t = "testcat.ns1.ns2.tbl" withTable(t) {