Skip to content

Commit 6f509d8

Browse files
author
Matthis Gördel
committed
rest of refactor
1 parent 5d49a5e commit 6f509d8

3 files changed

Lines changed: 460 additions & 490 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala

Lines changed: 137 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -81,189 +81,177 @@ trait DSv2CacheTableReadTests extends DSv2ExternalMutationTestBase {
8181

8282
test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table pinned, " +
8383
"REFRESH clears both layers") {
84-
withTestSession { session =>
85-
withTestTableAndViews(session, cachingTestTable) {
86-
session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) USING foo").collect()
87-
session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
88-
89-
session.table(cachingTestTable).cache()
90-
assertTableCached(session, cachingTestTable)
91-
checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
92-
93-
val catalog =
94-
getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
95-
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
96-
97-
// Both CacheManager and connector cache are stale: external write invisible
98-
assertTableCached(session, cachingTestTable)
99-
checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
100-
101-
// REFRESH TABLE calls invalidateTable (clears connector cache) and rebuilds
102-
// the CacheManager entry, so the external write becomes visible.
103-
session.sql(s"REFRESH TABLE $cachingTestTable").collect()
104-
assertTableCached(session, cachingTestTable)
105-
checkRows(session.table(cachingTestTable), Seq(Row(1, 100), Row(2, 200)))
106-
}
84+
withTable(cachingTestTable) {
85+
spark.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) USING foo").collect()
86+
spark.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
87+
88+
spark.table(cachingTestTable).cache()
89+
assertTableCached(cachingTestTable)
90+
checkAnswer(spark.table(cachingTestTable), Seq(Row(1, 100)))
91+
92+
val catalog =
93+
getTableCatalog[CachingInMemoryTableCatalog](spark, "cachingcat")
94+
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
95+
96+
// Both CacheManager and connector cache are stale: external write invisible
97+
assertTableCached(cachingTestTable)
98+
checkAnswer(spark.table(cachingTestTable), Seq(Row(1, 100)))
99+
100+
// REFRESH TABLE calls invalidateTable (clears connector cache) and rebuilds
101+
// the CacheManager entry, so the external write becomes visible.
102+
spark.sql(s"REFRESH TABLE $cachingTestTable").collect()
103+
assertTableCached(cachingTestTable)
104+
checkAnswer(spark.table(cachingTestTable), Seq(Row(1, 100), Row(2, 200)))
107105
}
108106
}
109107

110108
test(s"${testPrefix}SPARK-54022: session write invalidates cache, " +
111109
"then external write invisible") {
112-
withTestSession { session =>
113-
withTestTableAndViews(session, testTable) {
114-
session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
115-
session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
110+
withTable(testTable) {
111+
spark.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
112+
spark.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
116113

117-
session.table(testTable).cache()
118-
assertTableCached(session, testTable)
119-
checkRows(session.table(testTable), Seq(Row(1, 100)))
114+
spark.table(testTable).cache()
115+
assertTableCached(testTable)
116+
checkAnswer(spark.table(testTable), Seq(Row(1, 100)))
120117

121-
session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
122-
assertTableCached(session, testTable)
123-
checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
118+
spark.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
119+
assertTableCached(testTable)
120+
checkAnswer(spark.table(testTable), Seq(Row(1, 100), Row(2, 200)))
124121

125-
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
126-
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(3, 300))
122+
val catalog = getTableCatalog[InMemoryTableCatalog](spark, "testcat")
123+
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(3, 300))
127124

128-
assertTableCached(session, testTable)
129-
checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
125+
assertTableCached(testTable)
126+
checkAnswer(spark.table(testTable), Seq(Row(1, 100), Row(2, 200)))
130127

131-
session.sql(s"REFRESH TABLE $testTable").collect()
132-
assertTableCached(session, testTable)
133-
checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200), Row(3, 300)))
134-
}
128+
spark.sql(s"REFRESH TABLE $testTable").collect()
129+
assertTableCached(testTable)
130+
checkAnswer(spark.table(testTable), Seq(Row(1, 100), Row(2, 200), Row(3, 300)))
135131
}
136132
}
137133

138134
test(s"${testPrefix}SPARK-54022: cached table pinned against external schema change") {
139-
withTestSession { session =>
140-
withTestTableAndViews(session, testTable) {
141-
session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
142-
session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
143-
144-
session.table(testTable).cache()
145-
assertTableCached(session, testTable)
146-
checkRows(session.table(testTable), Seq(Row(1, 100)))
147-
148-
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
149-
val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
150-
catalog.alterTable(testIdent, addCol)
151-
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
152-
153-
assertTableCached(session, testTable)
154-
checkRows(session.table(testTable), Seq(Row(1, 100)))
155-
156-
session.sql(s"REFRESH TABLE $testTable").collect()
157-
assertTableCached(session, testTable)
158-
checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200, -1)))
159-
}
135+
withTable(testTable) {
136+
spark.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
137+
spark.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
138+
139+
spark.table(testTable).cache()
140+
assertTableCached(testTable)
141+
checkAnswer(spark.table(testTable), Seq(Row(1, 100)))
142+
143+
val catalog = getTableCatalog[InMemoryTableCatalog](spark, "testcat")
144+
val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
145+
catalog.alterTable(testIdent, addCol)
146+
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
147+
148+
assertTableCached(testTable)
149+
checkAnswer(spark.table(testTable), Seq(Row(1, 100)))
150+
151+
spark.sql(s"REFRESH TABLE $testTable").collect()
152+
assertTableCached(testTable)
153+
checkAnswer(spark.table(testTable), Seq(Row(1, 100, null), Row(2, 200, -1)))
160154
}
161155
}
162156

163157
test(s"${testPrefix}SPARK-54022: session schema change invalidates cache, " +
164158
"external write invisible") {
165-
withTestSession { session =>
166-
withTestTableAndViews(session, testTable) {
167-
session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
168-
session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
159+
withTable(testTable) {
160+
spark.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
161+
spark.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
169162

170-
session.table(testTable).cache()
171-
assertTableCached(session, testTable)
172-
checkRows(session.table(testTable), Seq(Row(1, 100)))
163+
spark.table(testTable).cache()
164+
assertTableCached(testTable)
165+
checkAnswer(spark.table(testTable), Seq(Row(1, 100)))
173166

174-
session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect()
175-
assertTableCached(session, testTable)
176-
checkRows(session.table(testTable), Seq(Row(1, 100, null)))
167+
spark.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect()
168+
assertTableCached(testTable)
169+
checkAnswer(spark.table(testTable), Seq(Row(1, 100, null)))
177170

178-
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
179-
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
171+
val catalog = getTableCatalog[InMemoryTableCatalog](spark, "testcat")
172+
externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
180173

181-
assertTableCached(session, testTable)
182-
checkRows(session.table(testTable), Seq(Row(1, 100, null)))
174+
assertTableCached(testTable)
175+
checkAnswer(spark.table(testTable), Seq(Row(1, 100, null)))
183176

184-
session.sql(s"REFRESH TABLE $testTable").collect()
185-
assertTableCached(session, testTable)
186-
checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200, -1)))
187-
}
177+
spark.sql(s"REFRESH TABLE $testTable").collect()
178+
assertTableCached(testTable)
179+
checkAnswer(spark.table(testTable), Seq(Row(1, 100, null), Row(2, 200, -1)))
188180
}
189181
}
190182

191183
test(s"${testPrefix}SPARK-54022: cached table after external drop and " +
192184
"recreate sees empty table") {
193-
withTestSession { session =>
194-
withTestTableAndViews(session, testTable) {
195-
session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
196-
session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
197-
198-
session.table(testTable).cache()
199-
assertTableCached(session, testTable)
200-
checkRows(session.table(testTable), Seq(Row(1, 100)))
201-
202-
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
203-
val originalTableId = catalog.loadTable(testIdent).id
204-
205-
catalog.dropTable(testIdent)
206-
catalog.createTable(
207-
testIdent,
208-
new TableInfo.Builder()
209-
.withColumns(Array(
210-
Column.create("id", IntegerType),
211-
Column.create("salary", IntegerType)))
212-
.build())
213-
214-
val newTableId = catalog.loadTable(testIdent).id
215-
assert(originalTableId != newTableId)
216-
217-
val result = session.table(testTable)
218-
assert(result.schema.fieldNames.toSeq == Seq("id", "salary"))
219-
checkRows(result, Seq.empty)
220-
221-
// External drop+recreate produces a new table identity, so the prior cache entry
222-
// is unreachable via name lookup (unlike external write/schema change where the
223-
// cache stays pinned).
224-
assert(!session.catalog.isCached(testTable))
225-
226-
session.sql(s"REFRESH TABLE $testTable").collect()
227-
checkRows(session.table(testTable), Seq.empty)
228-
}
185+
withTable(testTable) {
186+
spark.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
187+
spark.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
188+
189+
spark.table(testTable).cache()
190+
assertTableCached(testTable)
191+
checkAnswer(spark.table(testTable), Seq(Row(1, 100)))
192+
193+
val catalog = getTableCatalog[InMemoryTableCatalog](spark, "testcat")
194+
val originalTableId = catalog.loadTable(testIdent).id
195+
196+
catalog.dropTable(testIdent)
197+
catalog.createTable(
198+
testIdent,
199+
new TableInfo.Builder()
200+
.withColumns(Array(
201+
Column.create("id", IntegerType),
202+
Column.create("salary", IntegerType)))
203+
.build())
204+
205+
val newTableId = catalog.loadTable(testIdent).id
206+
assert(originalTableId != newTableId)
207+
208+
val result = spark.table(testTable)
209+
assert(result.schema.fieldNames.toSeq == Seq("id", "salary"))
210+
checkAnswer(result, Seq.empty)
211+
212+
// External drop+recreate produces a new table identity, so the prior cache entry
213+
// is unreachable via name lookup (unlike external write/schema change where the
214+
// cache stays pinned).
215+
assert(!spark.catalog.isCached(testTable))
216+
217+
spark.sql(s"REFRESH TABLE $testTable").collect()
218+
checkAnswer(spark.table(testTable), Seq.empty)
229219
}
230220
}
231221

232222
test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table stale after " +
233223
"external drop and recreate") {
234-
withTestSession { session =>
235-
withTestTableAndViews(session, cachingTestTable) {
236-
session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) USING foo").collect()
237-
session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
238-
239-
session.table(cachingTestTable).cache()
240-
assertTableCached(session, cachingTestTable)
241-
checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
242-
243-
val catalog =
244-
getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
245-
val originalTableId = catalog.loadTable(testIdent).id
246-
247-
catalog.dropTable(testIdent)
248-
catalog.createTable(
249-
testIdent,
250-
new TableInfo.Builder()
251-
.withColumns(Array(
252-
Column.create("id", IntegerType),
253-
Column.create("salary", IntegerType)))
254-
.build())
255-
256-
// CachingInMemoryTableCatalog does not invalidate on drop/create, so loadTable
257-
// still returns the old cached table object. CacheManager still matches and
258-
// serves the stale cached data.
259-
assertTableCached(session, cachingTestTable)
260-
checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
261-
262-
// REFRESH TABLE calls invalidateTable (clears connector cache) and rebuilds
263-
// the CacheManager entry, so the new empty table becomes visible.
264-
session.sql(s"REFRESH TABLE $cachingTestTable").collect()
265-
checkRows(session.table(cachingTestTable), Seq.empty)
266-
}
224+
withTable(cachingTestTable) {
225+
spark.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) USING foo").collect()
226+
spark.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
227+
228+
spark.table(cachingTestTable).cache()
229+
assertTableCached(cachingTestTable)
230+
checkAnswer(spark.table(cachingTestTable), Seq(Row(1, 100)))
231+
232+
val catalog =
233+
getTableCatalog[CachingInMemoryTableCatalog](spark, "cachingcat")
234+
val originalTableId = catalog.loadTable(testIdent).id
235+
236+
catalog.dropTable(testIdent)
237+
catalog.createTable(
238+
testIdent,
239+
new TableInfo.Builder()
240+
.withColumns(Array(
241+
Column.create("id", IntegerType),
242+
Column.create("salary", IntegerType)))
243+
.build())
244+
245+
// CachingInMemoryTableCatalog does not invalidate on drop/create, so loadTable
246+
// still returns the old cached table object. CacheManager still matches and
247+
// serves the stale cached data.
248+
assertTableCached(cachingTestTable)
249+
checkAnswer(spark.table(cachingTestTable), Seq(Row(1, 100)))
250+
251+
// REFRESH TABLE calls invalidateTable (clears connector cache) and rebuilds
252+
// the CacheManager entry, so the new empty table becomes visible.
253+
spark.sql(s"REFRESH TABLE $cachingTestTable").collect()
254+
checkAnswer(spark.table(cachingTestTable), Seq.empty)
267255
}
268256
}
269257
}

0 commit comments

Comments
 (0)