
Commit df7614a

Authored by xinlian12 and Copilot
[SparkConnector][No Review] Fix NoClassDefFoundError for MetadataVersionUtil (#48837)

* Fix NoClassDefFoundError for MetadataVersionUtil in Cosmos Spark connector

  Inline the version validation logic in ChangeFeedInitialOffsetWriter instead of depending on the Spark-internal MetadataVersionUtil, which has been relocated in Databricks Runtime 17.3 LTS (Spark 4.0).

* Add unit tests for inlined validateVersion logic

  Add ChangeFeedInitialOffsetWriterSpec with tests covering:
  - Valid version strings within the supported range
  - A version exceeding the max supported (UnsupportedLogVersion)
  - Malformed versions: non-numeric, empty, missing v prefix, v0, negative, bare v

  Widen companion object visibility to private[spark] for testability.

* Add change feed micro-batch streaming scenarios to Databricks live test notebooks

  Add structured streaming scenarios using cosmos.oltp.changeFeed to both the basicScenario.scala and basicScenarioAadManagedIdentity.scala notebooks. These scenarios exercise the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+). Each scenario:
  - Creates a sink container
  - Reads the change feed from the source via readStream with micro-batching
  - Writes to the sink container via writeStream
  - Validates that records were copied
  - Cleans up both containers

* Fix change feed streaming checkpoint path in Databricks notebooks

  Use file:/tmp/ instead of /tmp/ for the checkpoint location to avoid DBFS access issues on Unity Catalog-enabled Databricks clusters. Also:
  - Remove the unused Trigger import
  - Stop the query before reading the sink to avoid race conditions

* Simplify change feed streaming test to use memory sink

  Replace the cosmos.oltp sink with an in-memory sink to eliminate the need for a separate sink container. This avoids 404 errors from sink container creation/resolution and removes checkpoint path concerns. The test still exercises the full ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths (readStream with cosmos.oltp.changeFeed), which is the goal for validating the MetadataVersionUtil fix.

* Remove change feed streaming scenarios from Databricks notebooks

* Re-add change feed streaming with shared logic in both notebooks

  Both notebooks now use the same pattern: derive changeFeedCfg from the existing cfg map (which already has the correct auth config) plus the change feed-specific options, and write to an in-memory sink to avoid container creation issues. This ensures the key-based and AAD/MSI notebooks exercise identical streaming logic.

* Remove change feed streaming from AAD/MSI notebook

  The MSI notebook shares a cluster with basicScenario, and the Cosmos client cache retains references from the first notebook's proactive connection init. When basicScenario drops the source container during cleanup, the MSI notebook's change feed streaming fails with a 404 on the cached (now-deleted) container. The change feed streaming test in basicScenario already provides sufficient coverage for the ChangeFeedInitialOffsetWriter code paths.

* Add diagnostic logging to MSI change feed streaming test

  Add detailed logging to capture:
  - The endpoint, database, container, and auth config used
  - The source container record count before streaming
  - The streaming query ID
  - Full exception details on failure

  This will help diagnose why the change feed streaming fails on the MSI notebook but succeeds on the key-based one.

* Remove change feed streaming from MSI notebook

  The MSI change feed test passes on a fresh cluster but fails when basicScenario runs first on the same cluster without a restart. basicScenario leaves cached Cosmos client state (proactive connection init on the ephemeral endpoint) that causes the MSI streaming query to resolve to the wrong endpoint, resulting in a 404. The change feed test in basicScenario provides sufficient coverage for the ChangeFeedInitialOffsetWriter/HDFSMetadataLog code paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d86dd5d commit df7614a
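The "shared logic" bullet above derives the streaming config by merging change feed options into the existing base config. A minimal sketch of that `cfg ++ Map(...)` pattern, using placeholder base values (the endpoint, database, and container names here are invented for illustration; only the three change feed options come from this commit):

```scala
// Base notebook config (placeholder values; the real notebooks build this
// with the correct endpoint and auth settings).
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://myaccount.documents.azure.com:443/", // placeholder
  "spark.cosmos.database" -> "sampleDB",                                          // placeholder
  "spark.cosmos.container" -> "sampleContainer"                                   // placeholder
)

// Derive the change feed config from the base config. With Map's ++,
// entries on the right-hand side win on key collisions, so change
// feed-specific options are layered on top of the shared auth config.
val changeFeedCfg = cfg ++ Map(
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.mode" -> "Incremental"
)
```

Because the key sets are disjoint here, the merged map simply carries all six entries; if a key appeared in both maps, the change feed value would override the base value.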

File tree

3 files changed: +137 additions, −2 deletions


sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala

Lines changed: 34 additions & 2 deletions
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, MetadataVersionUtil}
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

 import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
@@ -33,7 +33,7 @@ private class ChangeFeedInitialOffsetWriter
         "Log file was malformed: failed to detect the log file version line.")
     }

-    MetadataVersionUtil.validateVersion(content.substring(0, indexOfNewLine), VERSION)
+    ChangeFeedInitialOffsetWriter.validateVersion(content.substring(0, indexOfNewLine), VERSION)
     content.substring(indexOfNewLine + 1)
   }
@@ -58,3 +58,35 @@ private class ChangeFeedInitialOffsetWriter
     override def toString: String = stringBuilder.toString()
   }
 }
+
+private[spark] object ChangeFeedInitialOffsetWriter {
+  /**
+   * Validates the version string from the log file.
+   * This is inlined to avoid a runtime dependency on MetadataVersionUtil,
+   * which has been relocated in some Spark distributions (e.g. Databricks Runtime 17.3+).
+   */
+  def validateVersion(versionText: String, maxSupportedVersion: Int): Int = {
+    if (versionText.nonEmpty && versionText(0) == 'v') {
+      val version =
+        try {
+          versionText.substring(1).toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(
+              s"Log file was malformed: failed to read correct log version from $versionText.")
+        }
+      if (version > 0 && version <= maxSupportedVersion) {
+        return version
+      }
+      if (version > maxSupportedVersion) {
+        throw new IllegalStateException(
+          s"UnsupportedLogVersion: maximum supported log version " +
+          s"is v$maxSupportedVersion, but encountered v$version. " +
+          s"The log file was produced by a newer version of Spark and cannot be read by this version. " +
+          s"Please upgrade.")
+      }
+    }
+    throw new IllegalStateException(
+      s"Log file was malformed: failed to read correct log version from $versionText.")
+  }
+}
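For readers who want to try the accept/reject rules outside the connector, here is a standalone sketch of the same validation logic. The `VersionCheck` object name is invented for illustration; in the commit the helper lives on the `ChangeFeedInitialOffsetWriter` companion object shown above:

```scala
// Standalone sketch of the inlined version-validation rules, for illustration only.
// Accepts "v<N>" where 1 <= N <= maxSupportedVersion; anything else is rejected.
object VersionCheck {
  def validateVersion(versionText: String, maxSupportedVersion: Int): Int = {
    if (versionText.nonEmpty && versionText(0) == 'v') {
      val version =
        try {
          versionText.substring(1).toInt
        } catch {
          case _: NumberFormatException =>
            // "vabc" and similar: the suffix is not an integer
            throw new IllegalStateException(
              s"Log file was malformed: failed to read correct log version from $versionText.")
        }
      if (version > 0 && version <= maxSupportedVersion) {
        return version
      }
      if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
          s"but encountered v$version.")
      }
      // Zero or negative versions fall through to the malformed case below.
    }
    throw new IllegalStateException(
      s"Log file was malformed: failed to read correct log version from $versionText.")
  }
}
```

Note how `"v-1"` parses successfully to `-1` but is still rejected: it fails both range checks and falls through to the final "malformed" exception, which is why the unit tests below assert on the "malformed" message rather than "UnsupportedLogVersion" for negative versions.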
ChangeFeedInitialOffsetWriterSpec.scala (new file)

Lines changed: 69 additions & 0 deletions

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

class ChangeFeedInitialOffsetWriterSpec extends UnitSpec {

  "validateVersion" should "return version for valid version string within supported range" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v1", 1) shouldBe 1
  }

  it should "return version when version is less than max supported" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v1", 5) shouldBe 1
  }

  it should "return version when version equals max supported" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v3", 3) shouldBe 3
  }

  it should "throw IllegalStateException for version exceeding max supported" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("v2", 1)
    }
    exception.getMessage should include("UnsupportedLogVersion")
    exception.getMessage should include("v1")
    exception.getMessage should include("v2")
  }

  it should "throw IllegalStateException for non-numeric version" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("vabc", 1)
    }
    exception.getMessage should include("malformed")
  }

  it should "throw IllegalStateException for empty string" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("", 1)
    }
    exception.getMessage should include("malformed")
  }

  it should "throw IllegalStateException for string without v prefix" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("1", 1)
    }
    exception.getMessage should include("malformed")
  }

  it should "throw IllegalStateException for v0 (zero version)" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("v0", 1)
    }
    exception.getMessage should include("malformed")
  }

  it should "throw IllegalStateException for negative version" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("v-1", 1)
    }
    exception.getMessage should include("malformed")
  }

  it should "throw IllegalStateException for version string with only v" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("v", 1)
    }
    exception.getMessage should include("malformed")
  }
}

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala

Lines changed: 34 additions & 0 deletions
@@ -111,5 +111,39 @@ df.filter(col("isAlive") === true)

 // COMMAND ----------

+// Change Feed - micro-batch structured streaming
+// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
+// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
+
+val changeFeedCfg = cfg ++ Map(
+  "spark.cosmos.read.inferSchema.enabled" -> "false",
+  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
+  "spark.cosmos.changeFeed.mode" -> "Incremental"
+)
+
+val testId = java.util.UUID.randomUUID().toString.replace("-", "")
+
+val changeFeedDF = spark
+  .readStream
+  .format("cosmos.oltp.changeFeed")
+  .options(changeFeedCfg)
+  .load()
+
+val microBatchQuery = changeFeedDF
+  .writeStream
+  .format("memory")
+  .queryName(testId)
+  .outputMode("append")
+  .start()
+
+microBatchQuery.processAllAvailable()
+microBatchQuery.stop()
+
+val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
+println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
+assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
+
+// COMMAND ----------
+
 // cleanup
 spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")
