Merged
Showing changes from 39 of 89 commits
9770833
Adding readManyByPartitionKey API
FabianMeiswinkel Apr 13, 2026
ac287bc
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 13, 2026
9a5b3e9
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
a8720c3
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
d499da7
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
c3c542a
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
4416354
Fixing code review comments
FabianMeiswinkel Apr 14, 2026
3ab3f0d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 14, 2026
588a755
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8c5cdb4
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
f548552
Update ReadManyByPartitionKeyTest.java
FabianMeiswinkel Apr 15, 2026
f68cf02
Fixing test issues
FabianMeiswinkel Apr 15, 2026
8b6c4b1
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8ba7f4d
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
56b067a
Reacted to code review feedback
FabianMeiswinkel Apr 16, 2026
fa430e9
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
d9504c9
Fix build issues
FabianMeiswinkel Apr 16, 2026
73151f0
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
681830e
Fixing changelog
FabianMeiswinkel Apr 16, 2026
7f745e6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
0b8905d
Addressing code review comments
FabianMeiswinkel Apr 16, 2026
22abc78
Addressing code review feedback
FabianMeiswinkel Apr 16, 2026
662b1a4
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
c764de9
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
e1e6f5a
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
080ce4a
Update RxDocumentClientImpl.java
FabianMeiswinkel Apr 17, 2026
516bbf3
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
b01f875
Fix readManyByPartitionKey retries
FabianMeiswinkel Apr 17, 2026
7130d4a
Fix PK.None
FabianMeiswinkel Apr 17, 2026
93957f3
Update ReadManyByPartitionKeyQueryHelper.java
FabianMeiswinkel Apr 17, 2026
16dd1d6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
9200f8f
Fix code review feedback
FabianMeiswinkel Apr 17, 2026
4a3ea06
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
e2aa124
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
c34341e
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
c96b6f6
Reacting to code review feedback
FabianMeiswinkel Apr 17, 2026
e306fae
React to code review feedback
FabianMeiswinkel Apr 17, 2026
f34270d
Addressing code review comments
FabianMeiswinkel Apr 17, 2026
55ddac3
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 17, 2026
f13de23
Fixing nested PK for readMany / readAllItems
FabianMeiswinkel Apr 20, 2026
b0d7337
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 20, 2026
ce8fca3
Fixed changelogs
FabianMeiswinkel Apr 20, 2026
992ef1a
Disallow TOP for streamable queries in readManyByPartitionKey-API
FabianMeiswinkel Apr 20, 2026
84af9c3
Fixing quote recognition in pk for readManyByPartitionKey API
FabianMeiswinkel Apr 20, 2026
b0b7349
Fixing confusing side effects when calling GetCurrentRow
FabianMeiswinkel Apr 20, 2026
42a2fb8
Fixed code review comments
FabianMeiswinkel Apr 20, 2026
89ac19e
Fixed code review comment
FabianMeiswinkel Apr 20, 2026
ea28c5d
Avoid getting pkPaths (pk-def) per row
FabianMeiswinkel Apr 20, 2026
35e9d73
Update CosmosReadManyByPartitionKeyReader.scala
FabianMeiswinkel Apr 20, 2026
78bbd15
Renamed readManyByPartitionKey to readManyByPartitionKeys
FabianMeiswinkel Apr 20, 2026
dd83125
Fixing code review comments
FabianMeiswinkel Apr 20, 2026
2246e3f
added test
FabianMeiswinkel Apr 20, 2026
fdfa15f
Fixing continuation Token
FabianMeiswinkel Apr 21, 2026
5da1906
Update RxDocumentClientImpl.java
FabianMeiswinkel Apr 21, 2026
79d2487
Update readManyByPartitionKey-design.md
FabianMeiswinkel Apr 21, 2026
f12582f
Fixed code review comments
FabianMeiswinkel Apr 22, 2026
0d3280a
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 22, 2026
5b44e34
Changed the continuation token to not carry partitionScopes anymore
FabianMeiswinkel Apr 22, 2026
da578f6
Update cspell.yaml
FabianMeiswinkel Apr 22, 2026
f393c3f
Fixing test failure
FabianMeiswinkel Apr 22, 2026
280bc87
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 22, 2026
d4722c2
Fixed test issues
FabianMeiswinkel Apr 22, 2026
26df2ab
Update FeedResponse.java
FabianMeiswinkel Apr 22, 2026
a45aabb
Update FeedResponse.java
FabianMeiswinkel Apr 22, 2026
7299a2c
Fixes test issues
FabianMeiswinkel Apr 22, 2026
dc00653
Fixed null handling in ReadManyByPartitionKeyToken JsonCreator
FabianMeiswinkel Apr 22, 2026
c47f9a6
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 22, 2026
1f2f14b
Renamed CosmosReadManyPartitionKeyRequestOptions to CosmosReadManyPar…
FabianMeiswinkel Apr 22, 2026
9c75a32
Code review fixes
FabianMeiswinkel Apr 22, 2026
e8ea368
Fixed test issues
FabianMeiswinkel Apr 22, 2026
f3aa1da
updates to patch sparse checkout to account for spring and cosmos whi…
scbedd Apr 23, 2026
2c404e6
Update eng/scripts/Generate-ServiceDirectories-From-Project-List.ps1
scbedd Apr 23, 2026
112cf71
Fixing code review comments
FabianMeiswinkel Apr 23, 2026
348257a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 23, 2026
bdc5a2d
Merge branch 'patch-sparse-checkout' of https://github.com/Azure/azur…
FabianMeiswinkel Apr 23, 2026
cbd8a93
Fixes code review comments
FabianMeiswinkel Apr 23, 2026
cf678fa
Fixed code review comments
FabianMeiswinkel Apr 23, 2026
17cfe75
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
bbfa698
Added e2e test for resuming readManyByPartitionKeys with continuation…
FabianMeiswinkel Apr 23, 2026
587cac6
Added and fixed tests
FabianMeiswinkel Apr 23, 2026
6e1cf84
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
59f3d44
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
81a99f3
Update CosmosItemSerializerNoExceptionWrapping.scala
FabianMeiswinkel Apr 23, 2026
2d8f2a0
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 23, 2026
5beab22
Update ReadManyByPartitionKeyContinuationToken.java
FabianMeiswinkel Apr 23, 2026
853bd04
Update DocumentQueryExecutionContextFactory.java
FabianMeiswinkel Apr 23, 2026
0fc0cc6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
53ad60d
Update FaultInjectionWithAvailabilityStrategyTestsBase.java
FabianMeiswinkel Apr 23, 2026
970d209
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 23, 2026
2 changes: 2 additions & 0 deletions sdk/cosmos/.gitignore
@@ -2,3 +2,5 @@

metastore_db/*
spark-warehouse/*

.temp/
@@ -4,13 +4,20 @@
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.TestConfigurations
import com.azure.cosmos.models.{CosmosContainerProperties, CosmosItemRequestOptions, PartitionKey, PartitionKeyDefinition, PartitionKeyDefinitionVersion, PartitionKind, ThroughputProperties}
import com.azure.cosmos.spark.udf.GetCosmosPartitionKeyValue
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StringType

import java.util.UUID
import java.util.{ArrayList, UUID}

import scala.collection.JavaConverters._

class SparkE2EQueryITest
extends SparkE2EQueryITestBase {

// scalastyle:off multiple.string.literals
"spark query" can "return proper Cosmos specific query plan on explain with nullable properties" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -67,4 +74,115 @@ class SparkE2EQueryITest
val item = rowsArray(0)
item.getAs[String]("id") shouldEqual id
}
}

"spark readManyByPartitionKey" can "use a matching top-level partition key column without the UDF" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey)
val requestOptions = new CosmosItemRequestOptions()

Seq("pkA", "pkB").foreach { pkValue =>
val item = objectMapper.createObjectNode()
item.put("id", s"item-$pkValue")
item.put("pk", pkValue)
item.put("payload", s"value-$pkValue")

container.createItem(item, new PartitionKey(pkValue), requestOptions).block()
}

val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainersWithPkAsPartitionKey,
"spark.cosmos.read.inferSchema.enabled" -> "true"
)

val sparkSession = spark
import sparkSession.implicits._

val rows = CosmosItemsDataSource
.readManyByPartitionKey(Seq("pkA", "pkB").toDF("pk"), cfg.asJava)
.selectExpr("id", "pk", "payload")
.collect()

rows should have size 2
rows.map(_.getAs[String]("id")).toSet shouldEqual Set("item-pkA", "item-pkB")
rows.map(_.getAs[String]("pk")).toSet shouldEqual Set("pkA", "pkB")
rows.map(_.getAs[String]("payload")).toSet shouldEqual Set("value-pkA", "value-pkB")
}
"spark readManyByPartitionKey" can "require the UDF for nested partition key paths and succeed with it" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
val containerName = s"nested-pk-${UUID.randomUUID()}"

val pkPaths = new ArrayList[String]()
pkPaths.add("/tenant/id")

val pkDefinition = new PartitionKeyDefinition()
pkDefinition.setPaths(pkPaths)
pkDefinition.setKind(PartitionKind.HASH)
pkDefinition.setVersion(PartitionKeyDefinitionVersion.V2)

val containerProperties = new CosmosContainerProperties(containerName, pkDefinition)
cosmosClient
.getDatabase(cosmosDatabase)
.createContainerIfNotExists(containerProperties, ThroughputProperties.createManualThroughput(400))
.block()

try {
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName)
val requestOptions = new CosmosItemRequestOptions()

Seq("tenantA", "tenantB").foreach { tenantId =>
val item = objectMapper.createObjectNode()
item.put("id", s"item-$tenantId")
item.put("payload", s"value-$tenantId")
item.putObject("tenant").put("id", tenantId)

container.createItem(item, new PartitionKey(tenantId), requestOptions).block()
}

val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> containerName,
"spark.cosmos.read.inferSchema.enabled" -> "true"
)

val sparkSession = spark
import sparkSession.implicits._

val missingUdfError = the[IllegalArgumentException] thrownBy {
CosmosItemsDataSource.readManyByPartitionKey(Seq("tenantA").toDF("tenantId"), cfg.asJava)
}

missingUdfError.getMessage should include("Nested paths cannot be resolved from DataFrame columns automatically")
missingUdfError.getMessage should include("_partitionKeyIdentity")

spark.udf.register("GetCosmosPartitionKeyValue", new GetCosmosPartitionKeyValue(), StringType)

val inputDf = Seq("tenantA", "tenantB")
.toDF("tenantId")
.withColumn("_partitionKeyIdentity", expr("GetCosmosPartitionKeyValue(tenantId)"))

val rows = CosmosItemsDataSource
.readManyByPartitionKey(inputDf, cfg.asJava)
.selectExpr("id", "tenant.id as tenantId")
.collect()

rows should have size 2
rows.map(_.getAs[String]("id")).toSet shouldEqual Set("item-tenantA", "item-tenantB")
rows.map(_.getAs[String]("tenantId")).toSet shouldEqual Set("tenantA", "tenantB")
} finally {
cosmosClient
.getDatabase(cosmosDatabase)
.getContainer(containerName)
.delete()
.block()
}
}

// scalastyle:on multiple.string.literals
}
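The two tests above pivot on one rule: top-level partition key paths (like `/pk`) can be resolved directly from DataFrame columns, while nested paths (like `/tenant/id`) require the `GetCosmosPartitionKeyValue` UDF to produce a `_partitionKeyIdentity` column. A self-contained sketch of that check, with hypothetical names (`PkPathSketch`, `requiresUdf`) standing in for the connector's internal logic:

```java
import java.util.List;

// Model of the decision made before falling back to plain DataFrame columns:
// the leading '/' of each partition key path is stripped, and any remaining
// '/' marks a nested path that can only be supplied via the UDF-produced
// '_partitionKeyIdentity' column.
public class PkPathSketch {
    static boolean requiresUdf(List<String> pkPaths) {
        return pkPaths.stream()
            .map(p -> p.startsWith("/") ? p.substring(1) : p)
            .anyMatch(p -> p.contains("/"));
    }

    public static void main(String[] args) {
        System.out.println(requiresUdf(List.of("/pk")));        // false
        System.out.println(requiresUdf(List.of("/tenant/id"))); // true
    }
}
```

Note that a hierarchical key made of top-level paths (for example `/tenantId` plus `/userId`) still resolves from columns; only a `/` inside a single path forces the UDF route.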
@@ -92,6 +92,7 @@ private[spark] object CosmosConfigNames {
val ReadPartitioningFeedRangeFilter = "spark.cosmos.partitioning.feedRangeFilter"
val ReadRuntimeFilteringEnabled = "spark.cosmos.read.runtimeFiltering.enabled"
val ReadManyFilteringEnabled = "spark.cosmos.read.readManyFiltering.enabled"
val ReadManyByPkNullHandling = "spark.cosmos.read.readManyByPk.nullHandling"
val ViewsRepositoryPath = "spark.cosmos.views.repositoryPath"
val DiagnosticsMode = "spark.cosmos.diagnostics"
val DiagnosticsSamplingMaxCount = "spark.cosmos.diagnostics.sampling.maxCount"
@@ -226,6 +227,7 @@ private[spark] object CosmosConfigNames {
ReadPartitioningFeedRangeFilter,
ReadRuntimeFilteringEnabled,
ReadManyFilteringEnabled,
ReadManyByPkNullHandling,
ViewsRepositoryPath,
DiagnosticsMode,
DiagnosticsSamplingIntervalInSeconds,
@@ -1042,7 +1044,8 @@ private case class CosmosReadConfig(readConsistencyStrategy: ReadConsistencyStra
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
runtimeFilteringEnabled: Boolean,
readManyFilteringConfig: CosmosReadManyFilteringConfig,
responseContinuationTokenLimitInKb: Option[Int] = None)
responseContinuationTokenLimitInKb: Option[Int] = None,
readManyByPkTreatNullAsNone: Boolean = false)

private object SchemaConversionModes extends Enumeration {
type SchemaConversionMode = Value
@@ -1136,6 +1139,20 @@ private object CosmosReadConfig {
helpMessage = " Indicates whether dynamic partition pruning filters will be pushed down when applicable."
)

private val ReadManyByPkNullHandling = CosmosConfigEntry[String](
key = CosmosConfigNames.ReadManyByPkNullHandling,
mandatory = false,
defaultValue = Some("Null"),
parseFromStringFunction = value => value,
helpMessage = "Determines how null values in partition key columns are treated for " +
"readManyByPartitionKey. 'Null' (default) maps null to a JSON null via addNullValue(), which " +
"is appropriate when the document field exists with an explicit null value. 'None' maps null " +
"to PartitionKey.NONE via addNoneValue(), which is only supported for single-path partition keys " +
"and should only be used when the partition key path does not exist at all in the document. " +
"Hierarchical partition keys reject this mode. These two semantics hash to DIFFERENT physical " +
"partitions - picking the wrong mode for your data will silently return zero rows."
)

def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
val readConsistencyStrategyOverride = CosmosConfigEntry.parse(cfg, ReadConsistencyStrategyOverride)
@@ -1158,6 +1175,8 @@
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val runtimeFilteringEnabled = CosmosConfigEntry.parse(cfg, ReadRuntimeFilteringEnabled)
val readManyFilteringConfig = CosmosReadManyFilteringConfig.parseCosmosReadManyFilterConfig(cfg)
val readManyByPkNullHandling = CosmosConfigEntry.parse(cfg, ReadManyByPkNullHandling)
val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")

val effectiveReadConsistencyStrategy = if (readConsistencyStrategyOverride.getOrElse(ReadConsistencyStrategy.DEFAULT) != ReadConsistencyStrategy.DEFAULT) {
readConsistencyStrategyOverride.get
@@ -1189,7 +1208,8 @@
throughputControlConfigOpt,
runtimeFilteringEnabled.get,
readManyFilteringConfig,
responseContinuationTokenLimitInKb)
responseContinuationTokenLimitInKb,
readManyByPkTreatNullAsNone)
}
}
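The `nullHandling` help message above warns that the two modes hash to different physical partitions; the parsing itself reduces to a case-insensitive flag with `'Null'` as the default. A minimal model of that decision, with hypothetical names (`NullHandlingSketch`, `treatNullAsNone`) and no SDK dependency:

```java
// Simplified model of the readManyByPk.nullHandling parsing: 'None'
// (case-insensitive) means nulls map to PartitionKey.NONE via addNoneValue();
// any other value, including the default 'Null', maps to a JSON null via
// addNullValue().
public class NullHandlingSketch {
    static boolean treatNullAsNone(String configuredValue) {
        String effective = (configuredValue == null) ? "Null" : configuredValue;
        return effective.equalsIgnoreCase("None");
    }

    public static void main(String[] args) {
        System.out.println(treatNullAsNone(null));   // false - default 'Null'
        System.out.println(treatNullAsNone("none")); // true  - case-insensitive
    }
}
```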

@@ -45,6 +45,7 @@ private[cosmos] object CosmosConstants {
val Id = "id"
val ETag = "_etag"
val ItemIdentity = "_itemIdentity"
val PartitionKeyIdentity = "_partitionKeyIdentity"
}

object StatusCodes {
@@ -2,9 +2,10 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey}
import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey, PartitionKeyBuilder}
import com.azure.cosmos.spark.CosmosPredicates.assertOnSparkDriver
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.SparkBridgeInternal
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.util
@@ -112,4 +113,156 @@ object CosmosItemsDataSource {

readManyReader.readMany(df.rdd, readManyFilterExtraction)
}

def readManyByPartitionKey(df: DataFrame, userConfig: java.util.Map[String, String]): DataFrame = {
readManyByPartitionKey(df, userConfig, null)
}

def readManyByPartitionKey(
df: DataFrame,
userConfig: java.util.Map[String, String],
userProvidedSchema: StructType): DataFrame = {

val readManyReader = new CosmosReadManyByPartitionKeyReader(
userProvidedSchema,
userConfig.asScala.toMap)

// Resolve the null-handling config up front so both the UDF path and the PK-column path honor it.
val sharedEffectiveConfig = CosmosConfig.getEffectiveConfig(
databaseName = None,
containerName = None,
userConfig.asScala.toMap)
val sharedReadConfig = CosmosReadConfig.parseCosmosReadConfig(sharedEffectiveConfig)
val sharedTreatNullAsNone = sharedReadConfig.readManyByPkTreatNullAsNone

// Option 1: Look for the _partitionKeyIdentity column (produced by GetCosmosPartitionKeyValue UDF)
val pkIdentityFieldExtraction = df
.schema
.find(field => field.name.equals(CosmosConstants.Properties.PartitionKeyIdentity) && field.dataType.equals(StringType))
.map(field => (row: Row) => {
val rawValue = row.getString(row.fieldIndex(field.name))
CosmosPartitionKeyHelper.tryParsePartitionKey(rawValue, sharedTreatNullAsNone)
.getOrElse(throw new IllegalArgumentException(
s"Invalid _partitionKeyIdentity value in row: '$rawValue'. " +
"Expected format: pk([...json...])"))
})

// Option 2: Detect PK columns by matching the container's partition key paths against the DataFrame schema
val pkColumnExtraction: Option[Row => PartitionKey] = if (pkIdentityFieldExtraction.isDefined) {
None // no need to resolve PK paths - _partitionKeyIdentity column takes precedence
} else {
val effectiveConfig = sharedEffectiveConfig
val readConfig = sharedReadConfig
val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(effectiveConfig)
val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(None)
      val calledFrom = "CosmosItemsDataSource.readManyByPartitionKey"
val treatNullAsNone = readConfig.readManyByPkTreatNullAsNone

val pkPaths = Loan(
List[Option[CosmosClientCacheItem]](
Some(
CosmosClientCache(
CosmosClientConfiguration(
effectiveConfig,
readConsistencyStrategy = readConfig.readConsistencyStrategy,
sparkEnvironmentInfo),
None,
calledFrom)),
ThroughputControlHelper.getThroughputControlClientCacheItem(
effectiveConfig,
calledFrom,
None,
sparkEnvironmentInfo)
))
.to(clientCacheItems => {
val container =
ThroughputControlHelper.getContainer(
effectiveConfig,
containerConfig,
clientCacheItems(0).get,
clientCacheItems(1))

val pkDefinition = SparkBridgeInternal
.getContainerPropertiesFromCollectionCache(container)
.getPartitionKeyDefinition

pkDefinition.getPaths.asScala.map(_.stripPrefix("/")).toList
})

// Nested PK paths (containing /) cannot be resolved from top-level DataFrame columns.
// Surface an explicit error so users know to use the UDF-produced _partitionKeyIdentity column.
if (pkPaths.exists(_.contains("/"))) {
throw new IllegalArgumentException(
"Container has nested partition key path(s) " + pkPaths.mkString("[", ",", "]") + ". " +
"Nested paths cannot be resolved from DataFrame columns automatically - add a " +
"'_partitionKeyIdentity' column produced by the GetCosmosPartitionKeyValue UDF.")
}

// Check if ALL PK path columns exist in the DataFrame schema
val dfFieldNames = df.schema.fieldNames.toSet
val allPkColumnsPresent = pkPaths.forall(path => dfFieldNames.contains(path))

if (allPkColumnsPresent && pkPaths.nonEmpty) {
// pkPaths already defined above
Some((row: Row) => {
if (pkPaths.size == 1) {
// Single partition key
buildPartitionKey(row.getAs[Any](pkPaths.head), treatNullAsNone)
} else {
// Hierarchical partition key - build level by level
val builder = new PartitionKeyBuilder()
for (path <- pkPaths) {
addPartitionKeyComponent(builder, row.getAs[Any](path), treatNullAsNone, pkPaths.size)
}
builder.build()
}
})
} else {
None
}
}

val pkExtraction = pkIdentityFieldExtraction
.orElse(pkColumnExtraction)
.getOrElse(
throw new IllegalArgumentException(
"Cannot determine partition key extraction from the input DataFrame. " +
"Either add a '_partitionKeyIdentity' column (using the GetCosmosPartitionKeyValue UDF) " +
"or ensure the DataFrame contains columns matching the container's partition key paths."))

readManyReader.readManyByPartitionKey(df.rdd, pkExtraction)
}

private def addPartitionKeyComponent(
builder: PartitionKeyBuilder,
value: Any,
treatNullAsNone: Boolean,
partitionKeyComponentCount: Int): Unit = {
value match {
case s: String => builder.add(s)
case n: Number => builder.add(n.doubleValue())
case b: Boolean => builder.add(b)
case null =>
CosmosPartitionKeyHelper.validateNoneHandlingForPartitionKeyComponentCount(
partitionKeyComponentCount,
treatNullAsNone)
if (treatNullAsNone) builder.addNoneValue()
else builder.addNullValue()
case other =>
// Reject unknown types rather than silently .toString-ing them - the document field
// was stored with its original type and a stringified value will never match.
// Supported types: String, Number (Byte/Short/Int/Long/Float/Double/BigDecimal), Boolean, null.
throw new IllegalArgumentException(
s"Unsupported partition key column type '${other.getClass.getName}' with value '$other'. " +
"Supported types are String, Number (integral or floating-point), Boolean, and null. " +
"For other source types, convert the column before calling readManyByPartitionKey or use " +
"the GetCosmosPartitionKeyValue UDF to produce a '_partitionKeyIdentity' column.")
}
}

private def buildPartitionKey(value: Any, treatNullAsNone: Boolean): PartitionKey = {
val builder = new PartitionKeyBuilder()
addPartitionKeyComponent(builder, value, treatNullAsNone, 1)
builder.build()
}
}
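The type dispatch in `addPartitionKeyComponent` above can be modeled without the SDK: each supported JVM type maps to exactly one `PartitionKeyBuilder` call, null honors the configured null-handling mode, and everything else is rejected rather than silently stringified. A sketch under those assumptions (the class and method names here are hypothetical; the builder-call names in the returned tags are the ones the diff actually uses):

```java
// Self-contained model of addPartitionKeyComponent's dispatch. Returned
// strings name the PartitionKeyBuilder call the real code would make.
public class PkComponentSketch {
    static String component(Object value, boolean treatNullAsNone, int componentCount) {
        if (value instanceof String) return "add(String)";
        if (value instanceof Number) return "add(double)"; // n.doubleValue() in the real code
        if (value instanceof Boolean) return "add(boolean)";
        if (value == null) {
            if (treatNullAsNone && componentCount > 1) {
                // Mirrors validateNoneHandlingForPartitionKeyComponentCount:
                // PartitionKey.NONE is only valid for single-path partition keys.
                throw new IllegalArgumentException(
                    "'None' null handling is only valid for single-path partition keys");
            }
            return treatNullAsNone ? "addNoneValue()" : "addNullValue()";
        }
        // Unknown types were stored with their original type; a stringified
        // value would never match, so fail loudly.
        throw new IllegalArgumentException(
            "Unsupported partition key column type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(component("tenantA", false, 1)); // add(String)
        System.out.println(component(42L, false, 1));       // add(double)
        System.out.println(component(null, true, 1));       // addNoneValue()
    }
}
```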