
Commit b469e86

xinlian12 and Copilot committed
Add diagnostic logging to MSI change feed streaming test
Add detailed logging to capture:
- Endpoint, database, container, auth config used
- Source container record count before streaming
- Streaming query ID
- Full exception details on failure

This will help diagnose why the change feed streaming fails on the MSI notebook but succeeds on the key-based one.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b999e3f commit b469e86

1 file changed

Lines changed: 54 additions & 0 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenarioAadManagedIdentity.scala

@@ -96,5 +96,59 @@ df.filter(col("isAlive") === true)
 
 // COMMAND ----------
 
+// Change Feed - micro-batch structured streaming
+// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
+// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
+
+val changeFeedCfg = cfg ++ Map(
+  "spark.cosmos.read.inferSchema.enabled" -> "false",
+  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
+  "spark.cosmos.changeFeed.mode" -> "Incremental"
+)
+
+val testId = java.util.UUID.randomUUID().toString.replace("-", "")
+
+println(s"Change Feed test: using endpoint=${cosmosEndpoint}")
+println(s"Change Feed test: database=${cosmosDatabaseName}, container=${cosmosContainerName}")
+println(s"Change Feed test: authType=${authType}")
+println(s"Change Feed test: changeFeedCfg keys=${changeFeedCfg.keys.mkString(", ")}")
+
+// Verify the source container is accessible before starting streaming
+val verifyDf = spark.read.format("cosmos.oltp").options(cfg).load()
+val verifyCount = verifyDf.count()
+println(s"Change Feed test: source container has $verifyCount records before streaming")
+
+try {
+  val changeFeedDF = spark
+    .readStream
+    .format("cosmos.oltp.changeFeed")
+    .options(changeFeedCfg)
+    .load()
+
+  val microBatchQuery = changeFeedDF
+    .writeStream
+    .format("memory")
+    .queryName(testId)
+    .outputMode("append")
+    .start()
+
+  println(s"Change Feed test: streaming query started, id=${microBatchQuery.id}")
+  microBatchQuery.processAllAvailable()
+  microBatchQuery.stop()
+
+  val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
+  println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
+  assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
+} catch {
+  case e: Exception =>
+    println(s"Change Feed test FAILED: ${e.getClass.getName}: ${e.getMessage}")
+    if (e.getCause != null) {
+      println(s"Change Feed test cause: ${e.getCause.getClass.getName}: ${e.getCause.getMessage}")
+    }
+    throw e
+}
+
+// COMMAND ----------
+
 // cleanup
 spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${cosmosContainerName};")
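The `changeFeedCfg` in the diff is built by merging the base `cfg` with change-feed-specific options via Scala's `Map#++`, where entries from the right-hand map win on duplicate keys. A minimal sketch of that merge semantics, in plain Scala with no Spark dependency (the endpoint value below is a placeholder, not a real account):

```scala
// Sketch of the cfg ++ Map(...) merge used in the diff: entries from the
// right-hand map override duplicate keys in the base map.
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://example.documents.azure.com:443/", // placeholder
  "spark.cosmos.read.inferSchema.enabled" -> "true"
)

val changeFeedCfg = cfg ++ Map(
  "spark.cosmos.read.inferSchema.enabled" -> "false", // overrides the base entry
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.mode" -> "Incremental"
)

println(changeFeedCfg("spark.cosmos.read.inferSchema.enabled")) // "false"
println(changeFeedCfg.size) // 4: two base keys plus three new, one of them overridden
```

Because the override happens per key, the streaming read inherits the endpoint and auth options from `cfg` unchanged, which is why the diagnostic `println`s in the diff log the base config rather than re-deriving it.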
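The catch block in the diff logs only the immediate `e.getCause`. If the MSI failure turns out to be wrapped more than one level deep, walking the whole cause chain may be useful. A small sketch in plain Scala; `causeChain` is a hypothetical helper for illustration, not part of the connector:

```scala
// Hypothetical helper: collect a Throwable and all of its nested causes.
// (Ignores the pathological case of a cyclic cause chain; fine for logging.)
def causeChain(t: Throwable): List[Throwable] = {
  val chain = scala.collection.mutable.ListBuffer.empty[Throwable]
  var cur: Throwable = t
  while (cur != null) {
    chain += cur
    cur = cur.getCause
  }
  chain.toList
}

// Example: a two-level wrapped failure, logged one line per cause.
val root = new IllegalStateException("token acquisition failed")
val wrapped = new RuntimeException("change feed query failed", root)
causeChain(wrapped).foreach(c => println(s"${c.getClass.getName}: ${c.getMessage}"))
```

Dropping such a helper into the catch block would surface a deeply nested auth error (e.g. one buried under Spark's query-execution wrappers) without changing the test's control flow.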
