Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
package com.azure.cosmos.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, MetadataVersionUtil}
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -33,7 +33,7 @@ private class ChangeFeedInitialOffsetWriter
"Log file was malformed: failed to detect the log file version line.")
}

MetadataVersionUtil.validateVersion(content.substring(0, indexOfNewLine), VERSION)
ChangeFeedInitialOffsetWriter.validateVersion(content.substring(0, indexOfNewLine), VERSION)
content.substring(indexOfNewLine + 1)
}

Expand All @@ -58,3 +58,35 @@ private class ChangeFeedInitialOffsetWriter
override def toString: String = stringBuilder.toString()
}
}

private[spark] object ChangeFeedInitialOffsetWriter {
  /**
   * Validates the version line read from an offset log file.
   *
   * This logic is inlined (rather than delegating to Spark's `MetadataVersionUtil`)
   * to avoid a runtime dependency on a class that has been relocated in some
   * Spark distributions (e.g. Databricks Runtime 17.3+).
   *
   * @param versionText the raw version token, expected to look like "v1", "v2", ...
   * @param maxSupportedVersion the highest log version this reader understands
   * @return the parsed version number when it is in the range [1, maxSupportedVersion]
   * @throws IllegalStateException when the token is malformed (empty, missing 'v'
   *         prefix, non-numeric, or non-positive) or when the version exceeds
   *         `maxSupportedVersion`
   */
  def validateVersion(versionText: String, maxSupportedVersion: Int): Int = {
    // Parse "v<digits>" into Some(number); anything else becomes None.
    val parsedVersion: Option[Int] =
      if (versionText.nonEmpty && versionText(0) == 'v') {
        try {
          Some(versionText.substring(1).toInt)
        } catch {
          case _: NumberFormatException => None
        }
      } else {
        None
      }

    parsedVersion match {
      case Some(version) if version > 0 && version <= maxSupportedVersion =>
        version
      case Some(version) if version > maxSupportedVersion =>
        // A newer writer produced this log; refuse to read it rather than misparse it.
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version " +
            s"is v$maxSupportedVersion, but encountered v$version. " +
            s"The log file was produced by a newer version of Spark and cannot be read by this version. " +
            s"Please upgrade.")
      case _ =>
        // Covers: empty input, missing 'v' prefix, non-numeric suffix, and v0/negative.
        throw new IllegalStateException(
          s"Log file was malformed: failed to read correct log version from $versionText.")
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

class ChangeFeedInitialOffsetWriterSpec extends UnitSpec {

  // Shared helper: the given token must be rejected with a "malformed" error.
  private def assertMalformed(versionText: String, maxSupportedVersion: Int): Unit = {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion(versionText, maxSupportedVersion)
    }
    exception.getMessage should include("malformed")
  }

  "validateVersion" should "return version for valid version string within supported range" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v1", 1) shouldBe 1
  }

  it should "return version when version is less than max supported" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v1", 5) shouldBe 1
  }

  it should "return version when version equals max supported" in {
    ChangeFeedInitialOffsetWriter.validateVersion("v3", 3) shouldBe 3
  }

  it should "throw IllegalStateException for version exceeding max supported" in {
    val exception = intercept[IllegalStateException] {
      ChangeFeedInitialOffsetWriter.validateVersion("v2", 1)
    }
    val message = exception.getMessage
    message should include("UnsupportedLogVersion")
    message should include("v1")
    message should include("v2")
  }

  it should "throw IllegalStateException for non-numeric version" in {
    assertMalformed("vabc", 1)
  }

  it should "throw IllegalStateException for empty string" in {
    assertMalformed("", 1)
  }

  it should "throw IllegalStateException for string without v prefix" in {
    assertMalformed("1", 1)
  }

  it should "throw IllegalStateException for v0 (zero version)" in {
    assertMalformed("v0", 1)
  }

  it should "throw IllegalStateException for negative version" in {
    assertMalformed("v-1", 1)
  }

  it should "throw IllegalStateException for version string with only v" in {
    assertMalformed("v", 1)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,39 @@ df.filter(col("isAlive") === true)

// COMMAND ----------

// Change Feed - micro-batch structured streaming
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)

// Reuse the base connector config, reading the change feed from the very
// beginning in incremental mode with schema inference disabled.
val changeFeedOptions = cfg ++ Map(
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.mode" -> "Incremental"
)

// Unique in-memory sink name so reruns of this cell do not collide.
val memorySinkName = java.util.UUID.randomUUID().toString.replace("-", "")

val changeFeedStream = spark
  .readStream
  .format("cosmos.oltp.changeFeed")
  .options(changeFeedOptions)
  .load()

val streamingQuery = changeFeedStream
  .writeStream
  .format("memory")
  .queryName(memorySinkName)
  .outputMode("append")
  .start()

// Drain everything currently available in the change feed, then shut down.
streamingQuery.processAllAvailable()
streamingQuery.stop()

val recordCount = spark.sql(s"SELECT * FROM $memorySinkName").count()
println(s"Change Feed micro-batch streaming: $recordCount records read via change feed")
assert(recordCount >= 2, s"Expected at least 2 records from change feed but found $recordCount")

// COMMAND ----------

// cleanup
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")