diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala
index 8f22181a4fab..9d1a0cafe837 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala
@@ -3,7 +3,7 @@
 package com.azure.cosmos.spark
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, MetadataVersionUtil}
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
 
 import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
@@ -33,7 +33,7 @@ private class ChangeFeedInitialOffsetWriter
         "Log file was malformed: failed to detect the log file version line.")
     }
 
-    MetadataVersionUtil.validateVersion(content.substring(0, indexOfNewLine), VERSION)
+    ChangeFeedInitialOffsetWriter.validateVersion(content.substring(0, indexOfNewLine), VERSION)
     content.substring(indexOfNewLine + 1)
   }
 
@@ -58,3 +58,35 @@ private class ChangeFeedInitialOffsetWriter
     override def toString: String = stringBuilder.toString()
   }
 }
+
+private[spark] object ChangeFeedInitialOffsetWriter {
+  /**
+   * Validates the version string from the log file.
+   * This is inlined to avoid a runtime dependency on MetadataVersionUtil,
+   * which has been relocated in some Spark distributions (e.g. Databricks Runtime 17.3+).
+   */
+  def validateVersion(versionText: String, maxSupportedVersion: Int): Int = {
+    if (versionText.nonEmpty && versionText(0) == 'v') {
+      val version =
+        try {
+          versionText.substring(1).toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(
+              s"Log file was malformed: failed to read correct log version from $versionText.")
+        }
+      if (version > 0 && version <= maxSupportedVersion) {
+        return version
+      }
+      if (version > maxSupportedVersion) {
+        throw new IllegalStateException(
+          s"UnsupportedLogVersion: maximum supported log version " +
+            s"is v$maxSupportedVersion, but encountered v$version. " +
+            s"The log file was produced by a newer version of Spark and cannot be read by this version. " +
+            s"Please upgrade.")
+      }
+    }
+    throw new IllegalStateException(
+      s"Log file was malformed: failed to read correct log version from $versionText.")
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriterSpec.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriterSpec.scala
new file mode 100644
index 000000000000..62aab4214d18
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriterSpec.scala
@@ -0,0 +1,70 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.spark
+
+class ChangeFeedInitialOffsetWriterSpec extends UnitSpec {
+
+  "validateVersion" should "return version for valid version string within supported range" in {
+    ChangeFeedInitialOffsetWriter.validateVersion("v1", 1) shouldBe 1
+  }
+
+  it should "return version when version is less than max supported" in {
+    ChangeFeedInitialOffsetWriter.validateVersion("v1", 5) shouldBe 1
+  }
+
+  it should "return version when version equals max supported" in {
+    ChangeFeedInitialOffsetWriter.validateVersion("v3", 3) shouldBe 3
+  }
+
+  it should "throw IllegalStateException for version exceeding max supported" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("v2", 1)
+    }
+    exception.getMessage should include("UnsupportedLogVersion")
+    exception.getMessage should include("v1")
+    exception.getMessage should include("v2")
+  }
+
+  it should "throw IllegalStateException for non-numeric version" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("vabc", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+
+  it should "throw IllegalStateException for empty string" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+
+  it should "throw IllegalStateException for string without v prefix" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("1", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+
+  it should "throw IllegalStateException for v0 (zero version)" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("v0", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+
+  it should "throw IllegalStateException for negative version" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("v-1", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+
+  it should "throw IllegalStateException for version string with only v" in {
+    val exception = intercept[IllegalStateException] {
+      ChangeFeedInitialOffsetWriter.validateVersion("v", 1)
+    }
+    exception.getMessage should include("malformed")
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala b/sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala
index 23c20f545638..ccad58169342 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala
@@ -111,5 +111,39 @@ df.filter(col("isAlive") === true)
 
 // COMMAND ----------
 
+// Change Feed - micro-batch structured streaming
+// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
+// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
+
+val changeFeedCfg = cfg ++ Map(
+  "spark.cosmos.read.inferSchema.enabled" -> "false",
+  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
+  "spark.cosmos.changeFeed.mode" -> "Incremental"
+)
+
+val testId = java.util.UUID.randomUUID().toString.replace("-", "")
+
+val changeFeedDF = spark
+  .readStream
+  .format("cosmos.oltp.changeFeed")
+  .options(changeFeedCfg)
+  .load()
+
+val microBatchQuery = changeFeedDF
+  .writeStream
+  .format("memory")
+  .queryName(testId)
+  .outputMode("append")
+  .start()
+
+microBatchQuery.processAllAvailable()
+microBatchQuery.stop()
+
+val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
+println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
+assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
+
+// COMMAND ----------
+
 // cleanup
 spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")