Skip to content

Commit c764de9

Browse files
Update CosmosItemsDataSource.scala
1 parent 662b1a4 commit c764de9

1 file changed

Lines changed: 7 additions & 2 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosItemsDataSource.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,13 @@ object CosmosItemsDataSource {
131131
val pkIdentityFieldExtraction = df
132132
.schema
133133
.find(field => field.name.equals(CosmosConstants.Properties.PartitionKeyIdentity) && field.dataType.equals(StringType))
134-
.map(field => (row: Row) =>
135-
CosmosPartitionKeyHelper.tryParsePartitionKey(row.getString(row.fieldIndex(field.name))).get)
134+
.map(field => (row: Row) => {
135+
val rawValue = row.getString(row.fieldIndex(field.name))
136+
CosmosPartitionKeyHelper.tryParsePartitionKey(rawValue)
137+
.getOrElse(throw new IllegalArgumentException(
138+
s"Invalid _partitionKeyIdentity value in row: '$rawValue'. " +
139+
"Expected format: pk([...json...])"))
140+
})
136141

137142
// Option 2: Detect PK columns by matching the container's partition key paths against the DataFrame schema
138143
val pkColumnExtraction: Option[Row => PartitionKey] = if (pkIdentityFieldExtraction.isDefined) {

0 commit comments

Comments
 (0)