Skip to content

Commit db3a420

Browse files
junmuzymuzammil
authored and committed
Spark 4.1 test changes made
1 parent aeae9ac commit db3a420

10 files changed

+3898
-0
lines changed
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark
20+
21+
import org.apache.spark.sql.{Dataset, Row}
22+
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
23+
import org.apache.spark.sql.streaming.StreamTest
24+
25+
/**
 * Tests for reading a Paimon table's change-log (CDC stream) from Spark
 * Structured Streaming.
 *
 * Each test writes to a primary-key table configured with
 * 'changelog-producer' = 'lookup', streams the change-log into an in-memory
 * sink named "mem_table", and asserts on the emitted row kinds:
 * +I (insert), -U (pre-update image), +U (post-update image).
 */
class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {

  import testImplicits._

  test("Paimon CDC Source: batch write and streaming read change-log with default scan mode") {
    withTempDir { checkpointDir =>
      val tableName = "T"
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      spark.sql(s"""
                   |CREATE TABLE $tableName (a INT, b STRING)
                   |TBLPROPERTIES (
                   | 'primary-key'='a',
                   | 'bucket'='2',
                   | 'changelog-producer' = 'lookup')
                   |""".stripMargin)

      // Three batch writes before the stream starts; key 2 is overwritten.
      spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
      spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
      spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")

      val table = loadTable(tableName)
      val location = table.location().toString

      // Stream the change-log rows into an in-memory table for assertions.
      val readStream = spark.readStream
        .format("paimon")
        .option("read.changelog", "true")
        .load(location)
        .writeStream
        .format("memory")
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .queryName("mem_table")
        .outputMode("append")
        .start()

      val currentResult = () => spark.sql("SELECT * FROM mem_table")
      try {
        readStream.processAllAvailable()
        // Default scan mode starts from the latest full snapshot, so the
        // pre-existing data surfaces as inserts carrying the latest values.
        val expectedResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2_new") :: Nil
        checkAnswer(currentResult(), expectedResult1)

        spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
        readStream.processAllAvailable()
        // Updating key 1 emits a -U/+U pair; key 3 is a plain insert.
        val expectedResult2 =
          Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
            Row("+I", 2, "v_2_new") :: Row("+I", 3, "v_3") :: Nil
        checkAnswer(currentResult(), expectedResult2)
      } finally {
        // Always stop the streaming query so the test leaves no background threads.
        readStream.stop()
      }
    }
  }

  test("Paimon CDC Source: batch write and streaming read change-log with scan.snapshot-id") {
    withTempDir { checkpointDir =>
      val tableName = "T"
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      spark.sql(s"""
                   |CREATE TABLE $tableName (a INT, b STRING)
                   |TBLPROPERTIES (
                   | 'primary-key'='a',
                   | 'bucket'='2',
                   | 'changelog-producer' = 'lookup')
                   |""".stripMargin)

      spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
      spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
      spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")

      val table = loadTable(tableName)
      val location = table.location().toString

      // Start reading from snapshot 1 so the full change history is replayed,
      // not just the latest compacted snapshot.
      val readStream = spark.readStream
        .format("paimon")
        .option("read.changelog", "true")
        .option("scan.mode", "from-snapshot")
        .option("scan.snapshot-id", 1)
        .load(location)
        .writeStream
        .format("memory")
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .queryName("mem_table")
        .outputMode("append")
        .start()

      val currentResult = () => spark.sql("SELECT * FROM mem_table")
      try {
        readStream.processAllAvailable()
        // Replaying from snapshot 1 includes the original value of key 2 and
        // the -U/+U pair produced by its overwrite.
        val expectedResult1 =
          Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
            Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Nil
        checkAnswer(currentResult(), expectedResult1)

        spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 'v_3')")
        readStream.processAllAvailable()
        val expectedResult2 =
          Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
            Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
            Row("+I", 3, "v_3") :: Nil
        checkAnswer(currentResult(), expectedResult2)
      } finally {
        readStream.stop()
      }
    }
  }

  test("Paimon CDC Source: streaming write and streaming read change-log") {
    withTempDirs { (checkpointDir1, checkpointDir2) =>
      val tableName = "T"
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      spark.sql(s"""
                   |CREATE TABLE $tableName (a INT, b STRING)
                   |TBLPROPERTIES (
                   | 'primary-key'='a',
                   | 'bucket'='2',
                   | 'changelog-producer' = 'lookup')
                   |""".stripMargin)

      val table = loadTable(tableName)
      val location = table.location().toString

      // Streaming write: feed rows through a MemoryStream and append each
      // micro-batch to the Paimon table.
      val inputData = MemoryStream[(Int, String)]
      val writeStream = inputData
        .toDS()
        .toDF("a", "b")
        .writeStream
        .option("checkpointLocation", checkpointDir1.getCanonicalPath)
        .foreachBatch {
          (batch: Dataset[Row], _: Long) =>
            batch.write.format("paimon").mode("append").save(location)
        }
        .start()

      // Streaming read: replay the change-log from the first snapshot.
      val readStream = spark.readStream
        .format("paimon")
        .option("read.changelog", "true")
        .option("scan.mode", "from-snapshot")
        .option("scan.snapshot-id", 1)
        .load(location)
        .writeStream
        .format("memory")
        .option("checkpointLocation", checkpointDir2.getCanonicalPath)
        .queryName("mem_table")
        .outputMode("append")
        .start()

      val currentResult = () => spark.sql("SELECT * FROM mem_table")
      try {
        inputData.addData((1, "v_1"))
        writeStream.processAllAvailable()
        readStream.processAllAvailable()
        val expectedResult1 = Row("+I", 1, "v_1") :: Nil
        checkAnswer(currentResult(), expectedResult1)

        inputData.addData((2, "v_2"))
        writeStream.processAllAvailable()
        readStream.processAllAvailable()
        val expectedResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil
        checkAnswer(currentResult(), expectedResult2)

        inputData.addData((2, "v_2_new"))
        writeStream.processAllAvailable()
        readStream.processAllAvailable()
        // Overwriting key 2 produces a -U/+U pair in the change-log.
        val expectedResult3 =
          Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
            Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Nil
        checkAnswer(currentResult(), expectedResult3)

        inputData.addData((1, "v_1_new"), (3, "v_3"))
        writeStream.processAllAvailable()
        readStream.processAllAvailable()
        val expectedResult4 =
          Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") ::
            Row("+I", 2, "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
            Row("+I", 3, "v_3") :: Nil
        checkAnswer(currentResult(), expectedResult4)
      } finally {
        // Stop BOTH queries: the original version leaked the write stream,
        // leaving its micro-batch thread running after the test.
        readStream.stop()
        writeStream.stop()
      }
    }
  }

  test("Paimon CDC Source: streaming read change-log with audit_log system table") {
    withTable("T") {
      withTempDir { checkpointDir =>
        spark.sql(
          s"""
             |CREATE TABLE T (a INT, b STRING)
             |TBLPROPERTIES ('primary-key'='a','bucket'='2', 'changelog-producer' = 'lookup')
             |""".stripMargin)

        // The audit_log system table exposes the row kind as a leading column.
        val readStream = spark.readStream
          .format("paimon")
          .table("`T$audit_log`")
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .start()

        val currentResult = () => spark.sql("SELECT * FROM mem_table")
        try {
          spark.sql(s"INSERT INTO T VALUES (1, 'v_1')")
          readStream.processAllAvailable()
          checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Nil)

          spark.sql(s"INSERT INTO T VALUES (2, 'v_2')")
          readStream.processAllAvailable()
          checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil)
        } finally {
          readStream.stop()
        }
      }
    }
  }
}

0 commit comments

Comments
 (0)