Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9770833
Adding readManyByPartitionKey API
FabianMeiswinkel Apr 13, 2026
ac287bc
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 13, 2026
9a5b3e9
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
a8720c3
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
d499da7
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
c3c542a
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
4416354
Fixing code review comments
FabianMeiswinkel Apr 14, 2026
3ab3f0d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 14, 2026
588a755
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8c5cdb4
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
f548552
Update ReadManyByPartitionKeyTest.java
FabianMeiswinkel Apr 15, 2026
f68cf02
Fixing test issues
FabianMeiswinkel Apr 15, 2026
8b6c4b1
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8ba7f4d
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
56b067a
Reacted to code review feedback
FabianMeiswinkel Apr 16, 2026
fa430e9
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
d9504c9
Fix build issues
FabianMeiswinkel Apr 16, 2026
73151f0
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
681830e
Fixing changelog
FabianMeiswinkel Apr 16, 2026
7f745e6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
0b8905d
Addressing code review comments
FabianMeiswinkel Apr 16, 2026
22abc78
Addressing code review feedback
FabianMeiswinkel Apr 16, 2026
662b1a4
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
c764de9
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
e1e6f5a
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
080ce4a
Update RxDocumentClientImpl.java
FabianMeiswinkel Apr 17, 2026
516bbf3
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
b01f875
Fix readManyByPartitionKey retries
FabianMeiswinkel Apr 17, 2026
7130d4a
Fix PK.None
FabianMeiswinkel Apr 17, 2026
93957f3
Update ReadManyByPartitionKeyQueryHelper.java
FabianMeiswinkel Apr 17, 2026
16dd1d6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
9200f8f
Fix code review feedback
FabianMeiswinkel Apr 17, 2026
4a3ea06
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
e2aa124
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
c34341e
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
c96b6f6
Reacting to code review feedback
FabianMeiswinkel Apr 17, 2026
e306fae
React to code review feedback
FabianMeiswinkel Apr 17, 2026
f34270d
Addressing code review comments
FabianMeiswinkel Apr 17, 2026
55ddac3
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@

metastore_db/*
spark-warehouse/*

.temp/
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.TestConfigurations
import com.azure.cosmos.models.{CosmosContainerProperties, CosmosItemRequestOptions, PartitionKey, PartitionKeyDefinition, PartitionKeyDefinitionVersion, PartitionKind, ThroughputProperties}
import com.azure.cosmos.spark.udf.GetCosmosPartitionKeyValue
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StringType

import java.util.UUID
import java.util.{ArrayList, UUID}

import scala.collection.JavaConverters._

class SparkE2EQueryITest
extends SparkE2EQueryITestBase {

// scalastyle:off multiple.string.literals
"spark query" can "return proper Cosmos specific query plan on explain with nullable properties" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
Expand Down Expand Up @@ -67,4 +74,115 @@ class SparkE2EQueryITest
val item = rowsArray(0)
item.getAs[String]("id") shouldEqual id
}
}

"spark readManyByPartitionKey" can "use a matching top-level partition key column without the UDF" in {
  // Arrange: seed one document per partition key value in a container whose PK path is '/pk'.
  val endpoint = TestConfigurations.HOST
  val masterKey = TestConfigurations.MASTER_KEY
  val targetContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey)
  val itemOptions = new CosmosItemRequestOptions()

  for (pkValue <- Seq("pkA", "pkB")) {
    val doc = objectMapper.createObjectNode()
    doc.put("id", s"item-$pkValue")
    doc.put("pk", pkValue)
    doc.put("payload", s"value-$pkValue")

    targetContainer.createItem(doc, new PartitionKey(pkValue), itemOptions).block()
  }

  val readCfg = Map(
    "spark.cosmos.accountEndpoint" -> endpoint,
    "spark.cosmos.accountKey" -> masterKey,
    "spark.cosmos.database" -> cosmosDatabase,
    "spark.cosmos.container" -> cosmosContainersWithPkAsPartitionKey,
    "spark.cosmos.read.inferSchema.enabled" -> "true"
  )

  val sparkSession = spark
  import sparkSession.implicits._

  // Act: the input DataFrame's 'pk' column matches the container's top-level partition key
  // path, so no '_partitionKeyIdentity' UDF column is needed.
  val resultRows = CosmosItemsDataSource
    .readManyByPartitionKey(Seq("pkA", "pkB").toDF("pk"), readCfg.asJava)
    .selectExpr("id", "pk", "payload")
    .collect()

  // Assert: both seeded documents come back with their original field values.
  resultRows should have size 2
  resultRows.map(_.getAs[String]("id")).toSet shouldEqual Set("item-pkA", "item-pkB")
  resultRows.map(_.getAs[String]("pk")).toSet shouldEqual Set("pkA", "pkB")
  resultRows.map(_.getAs[String]("payload")).toSet shouldEqual Set("value-pkA", "value-pkB")
}
// Verifies the nested-partition-key contract of readManyByPartitionKey:
// 1. Without a '_partitionKeyIdentity' column the call must fail with an actionable error.
// 2. With the GetCosmosPartitionKeyValue UDF producing that column, the read succeeds.
"spark readManyByPartitionKey" can "require the UDF for nested partition key paths and succeed with it" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
// Unique container name so concurrent/repeated test runs don't collide.
val containerName = s"nested-pk-${UUID.randomUUID()}"

// Container with a nested partition key path ('/tenant/id') - this cannot be
// resolved from top-level DataFrame columns automatically.
val pkPaths = new ArrayList[String]()
pkPaths.add("/tenant/id")

val pkDefinition = new PartitionKeyDefinition()
pkDefinition.setPaths(pkPaths)
pkDefinition.setKind(PartitionKind.HASH)
pkDefinition.setVersion(PartitionKeyDefinitionVersion.V2)

val containerProperties = new CosmosContainerProperties(containerName, pkDefinition)
cosmosClient
.getDatabase(cosmosDatabase)
.createContainerIfNotExists(containerProperties, ThroughputProperties.createManualThroughput(400))
.block()

try {
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName)
val requestOptions = new CosmosItemRequestOptions()

// Seed one document per tenant; the PK value lives at the nested path tenant.id.
Seq("tenantA", "tenantB").foreach { tenantId =>
val item = objectMapper.createObjectNode()
item.put("id", s"item-$tenantId")
item.put("payload", s"value-$tenantId")
item.putObject("tenant").put("id", tenantId)

container.createItem(item, new PartitionKey(tenantId), requestOptions).block()
}

val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> containerName,
"spark.cosmos.read.inferSchema.enabled" -> "true"
)

val sparkSession = spark
import sparkSession.implicits._

// Without the UDF-produced column the call must be rejected with guidance.
val missingUdfError = the[IllegalArgumentException] thrownBy {
CosmosItemsDataSource.readManyByPartitionKey(Seq("tenantA").toDF("tenantId"), cfg.asJava)
}

missingUdfError.getMessage should include("Nested paths cannot be resolved from DataFrame columns automatically")
missingUdfError.getMessage should include("_partitionKeyIdentity")

// Register the UDF before it is referenced in the expr() below.
spark.udf.register("GetCosmosPartitionKeyValue", new GetCosmosPartitionKeyValue(), StringType)

// Adding the '_partitionKeyIdentity' column makes the same read succeed.
val inputDf = Seq("tenantA", "tenantB")
.toDF("tenantId")
.withColumn("_partitionKeyIdentity", expr("GetCosmosPartitionKeyValue(tenantId)"))

val rows = CosmosItemsDataSource
.readManyByPartitionKey(inputDf, cfg.asJava)
.selectExpr("id", "tenant.id as tenantId")
.collect()

rows should have size 2
rows.map(_.getAs[String]("id")).toSet shouldEqual Set("item-tenantA", "item-tenantB")
rows.map(_.getAs[String]("tenantId")).toSet shouldEqual Set("tenantA", "tenantB")
} finally {
// Always remove the per-test container, even when assertions fail.
cosmosClient
.getDatabase(cosmosDatabase)
.getContainer(containerName)
.delete()
.block()
}
}

// scalastyle:on multiple.string.literals
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ private[spark] object CosmosConfigNames {
val ReadPartitioningFeedRangeFilter = "spark.cosmos.partitioning.feedRangeFilter"
val ReadRuntimeFilteringEnabled = "spark.cosmos.read.runtimeFiltering.enabled"
val ReadManyFilteringEnabled = "spark.cosmos.read.readManyFiltering.enabled"
val ReadManyByPkNullHandling = "spark.cosmos.read.readManyByPk.nullHandling"
val ViewsRepositoryPath = "spark.cosmos.views.repositoryPath"
val DiagnosticsMode = "spark.cosmos.diagnostics"
val DiagnosticsSamplingMaxCount = "spark.cosmos.diagnostics.sampling.maxCount"
Expand Down Expand Up @@ -226,6 +227,7 @@ private[spark] object CosmosConfigNames {
ReadPartitioningFeedRangeFilter,
ReadRuntimeFilteringEnabled,
ReadManyFilteringEnabled,
ReadManyByPkNullHandling,
ViewsRepositoryPath,
DiagnosticsMode,
DiagnosticsSamplingIntervalInSeconds,
Expand Down Expand Up @@ -1042,7 +1044,8 @@ private case class CosmosReadConfig(readConsistencyStrategy: ReadConsistencyStra
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
runtimeFilteringEnabled: Boolean,
readManyFilteringConfig: CosmosReadManyFilteringConfig,
responseContinuationTokenLimitInKb: Option[Int] = None)
responseContinuationTokenLimitInKb: Option[Int] = None,
readManyByPkTreatNullAsNone: Boolean = false)

private object SchemaConversionModes extends Enumeration {
type SchemaConversionMode = Value
Expand Down Expand Up @@ -1136,6 +1139,20 @@ private object CosmosReadConfig {
helpMessage = " Indicates whether dynamic partition pruning filters will be pushed down when applicable."
)

private val ReadManyByPkNullHandling = CosmosConfigEntry[String](
  key = CosmosConfigNames.ReadManyByPkNullHandling,
  mandatory = false,
  defaultValue = Some("Null"),
  // Validate eagerly: previously any string was accepted and anything other than 'None'
  // silently behaved like 'Null'. Per the help text below, the two modes hash to DIFFERENT
  // physical partitions, so a typo (e.g. 'Nome') would silently return zero rows.
  parseFromStringFunction = value => {
    if (!value.equalsIgnoreCase("Null") && !value.equalsIgnoreCase("None")) {
      throw new IllegalArgumentException(
        s"Invalid value '$value' for configuration '${CosmosConfigNames.ReadManyByPkNullHandling}'. " +
          "Allowed values are 'Null' or 'None'.")
    }
    value
  },
  helpMessage = "Determines how null values in partition key columns are treated for " +
    "readManyByPartitionKey. 'Null' (default) maps null to a JSON null via addNullValue(), which " +
    "is appropriate when the document field exists with an explicit null value. 'None' maps null " +
    "to PartitionKey.NONE via addNoneValue(), which is only supported for single-path partition keys " +
    "and should only be used when the partition key path does not exist at all in the document. " +
    "Hierarchical partition keys reject this mode. These two semantics hash to DIFFERENT physical " +
    "partitions - picking the wrong mode for your data will silently return zero rows."
)

def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
val readConsistencyStrategyOverride = CosmosConfigEntry.parse(cfg, ReadConsistencyStrategyOverride)
Expand All @@ -1158,6 +1175,8 @@ private object CosmosReadConfig {
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val runtimeFilteringEnabled = CosmosConfigEntry.parse(cfg, ReadRuntimeFilteringEnabled)
val readManyFilteringConfig = CosmosReadManyFilteringConfig.parseCosmosReadManyFilterConfig(cfg)
val readManyByPkNullHandling = CosmosConfigEntry.parse(cfg, ReadManyByPkNullHandling)
val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")
Comment thread
FabianMeiswinkel marked this conversation as resolved.

val effectiveReadConsistencyStrategy = if (readConsistencyStrategyOverride.getOrElse(ReadConsistencyStrategy.DEFAULT) != ReadConsistencyStrategy.DEFAULT) {
readConsistencyStrategyOverride.get
Expand Down Expand Up @@ -1189,7 +1208,8 @@ private object CosmosReadConfig {
throughputControlConfigOpt,
runtimeFilteringEnabled.get,
readManyFilteringConfig,
responseContinuationTokenLimitInKb)
responseContinuationTokenLimitInKb,
readManyByPkTreatNullAsNone)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private[cosmos] object CosmosConstants {
val Id = "id"
val ETag = "_etag"
val ItemIdentity = "_itemIdentity"
val PartitionKeyIdentity = "_partitionKeyIdentity"
}

object StatusCodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey}
import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey, PartitionKeyBuilder}
import com.azure.cosmos.spark.CosmosPredicates.assertOnSparkDriver
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.SparkBridgeInternal
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.util
Expand Down Expand Up @@ -112,4 +113,156 @@ object CosmosItemsDataSource {

readManyReader.readMany(df.rdd, readManyFilterExtraction)
}

/**
 * Convenience overload of readManyByPartitionKey that lets the connector infer the
 * result schema instead of requiring a user-provided one.
 *
 * @param df input DataFrame containing either the container's partition key column(s)
 *           or a '_partitionKeyIdentity' column produced by the GetCosmosPartitionKeyValue UDF
 * @param userConfig Cosmos Spark connector configuration (account, database, container, ...)
 * @return a DataFrame with all items belonging to the referenced partition key values
 */
def readManyByPartitionKey(df: DataFrame, userConfig: java.util.Map[String, String]): DataFrame =
  readManyByPartitionKey(df, userConfig, null)

/**
 * Reads all items belonging to the partition key values referenced by the input DataFrame.
 *
 * The partition key for each input row is resolved in one of two ways (in priority order):
 *  1. A '_partitionKeyIdentity' string column (produced by the GetCosmosPartitionKeyValue UDF) -
 *     required for containers with nested partition key paths.
 *  2. DataFrame columns whose names match ALL of the container's top-level partition key paths.
 *
 * @param df input DataFrame with partition key information per row
 * @param userConfig Cosmos Spark connector configuration (account, database, container, ...)
 * @param userProvidedSchema optional result schema; when null the schema is inferred
 * @return a DataFrame with all items belonging to the referenced partition key values
 * @throws IllegalArgumentException when no partition key extraction can be determined,
 *                                  when a '_partitionKeyIdentity' value is malformed, or
 *                                  when the container uses nested partition key paths and
 *                                  no '_partitionKeyIdentity' column is present
 */
def readManyByPartitionKey(
  df: DataFrame,
  userConfig: java.util.Map[String, String],
  userProvidedSchema: StructType): DataFrame = {

  val readManyReader = new CosmosReadManyByPartitionKeyReader(
    userProvidedSchema,
    userConfig.asScala.toMap)

  // Resolve the null-handling config up front so both the UDF path and the PK-column path honor it.
  val effectiveConfig = CosmosConfig.getEffectiveConfig(
    databaseName = None,
    containerName = None,
    userConfig.asScala.toMap)
  val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveConfig)
  val treatNullAsNone = readConfig.readManyByPkTreatNullAsNone

  // Option 1: Look for the _partitionKeyIdentity column (produced by GetCosmosPartitionKeyValue UDF)
  val pkIdentityFieldExtraction = df
    .schema
    .find(field => field.name.equals(CosmosConstants.Properties.PartitionKeyIdentity) && field.dataType.equals(StringType))
    .map(field => (row: Row) => {
      val rawValue = row.getString(row.fieldIndex(field.name))
      CosmosPartitionKeyHelper.tryParsePartitionKey(rawValue, treatNullAsNone)
        .getOrElse(throw new IllegalArgumentException(
          s"Invalid _partitionKeyIdentity value in row: '$rawValue'. " +
            "Expected format: pk([...json...])"))
    })

  // Option 2: Detect PK columns by matching the container's partition key paths against the DataFrame schema
  val pkColumnExtraction: Option[Row => PartitionKey] = if (pkIdentityFieldExtraction.isDefined) {
    None // _partitionKeyIdentity column takes precedence - no need to resolve PK paths
  } else {
    val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(effectiveConfig)
    val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(None)
    val calledFrom = "CosmosItemsDataSource.readManyByPartitionKey"

    // Fetch the container's partition key definition; Loan guarantees the cached
    // client items are released even when metadata retrieval fails.
    val pkPaths = Loan(
      List[Option[CosmosClientCacheItem]](
        Some(
          CosmosClientCache(
            CosmosClientConfiguration(
              effectiveConfig,
              readConsistencyStrategy = readConfig.readConsistencyStrategy,
              sparkEnvironmentInfo),
            None,
            calledFrom)),
        ThroughputControlHelper.getThroughputControlClientCacheItem(
          effectiveConfig,
          calledFrom,
          None,
          sparkEnvironmentInfo)
      ))
      .to(clientCacheItems => {
        val container =
          ThroughputControlHelper.getContainer(
            effectiveConfig,
            containerConfig,
            clientCacheItems(0).get,
            clientCacheItems(1))

        val pkDefinition = SparkBridgeInternal
          .getContainerPropertiesFromCollectionCache(container)
          .getPartitionKeyDefinition

        pkDefinition.getPaths.asScala.map(_.stripPrefix("/")).toList
      })

    // Nested PK paths (containing /) cannot be resolved from top-level DataFrame columns.
    // Surface an explicit error so users know to use the UDF-produced _partitionKeyIdentity column.
    if (pkPaths.exists(_.contains("/"))) {
      throw new IllegalArgumentException(
        "Container has nested partition key path(s) " + pkPaths.mkString("[", ",", "]") + ". " +
        "Nested paths cannot be resolved from DataFrame columns automatically - add a " +
        "'_partitionKeyIdentity' column produced by the GetCosmosPartitionKeyValue UDF.")
    }

    // Only use this path when ALL PK path columns exist in the DataFrame schema.
    val dfFieldNames = df.schema.fieldNames.toSet
    val allPkColumnsPresent = pkPaths.forall(path => dfFieldNames.contains(path))

    if (allPkColumnsPresent && pkPaths.nonEmpty) {
      Some((row: Row) => {
        if (pkPaths.size == 1) {
          // Single partition key
          buildPartitionKey(row.getAs[Any](pkPaths.head), treatNullAsNone)
        } else {
          // Hierarchical partition key - build level by level in path order
          val builder = new PartitionKeyBuilder()
          for (path <- pkPaths) {
            addPartitionKeyComponent(builder, row.getAs[Any](path), treatNullAsNone, pkPaths.size)
          }
          builder.build()
        }
      })
    } else {
      None
    }
  }

  val pkExtraction = pkIdentityFieldExtraction
    .orElse(pkColumnExtraction)
    .getOrElse(
      throw new IllegalArgumentException(
        "Cannot determine partition key extraction from the input DataFrame. " +
        "Either add a '_partitionKeyIdentity' column (using the GetCosmosPartitionKeyValue UDF) " +
        "or ensure the DataFrame contains columns matching the container's partition key paths."))

  readManyReader.readManyByPartitionKey(df.rdd, pkExtraction)
}

/**
 * Appends one partition-key component to the builder, translating a JVM value from a
 * Spark Row into the Cosmos partition key type system (String, double, boolean, or null/None).
 */
private def addPartitionKeyComponent(
  builder: PartitionKeyBuilder,
  value: Any,
  treatNullAsNone: Boolean,
  partitionKeyComponentCount: Int): Unit = {
  value match {
    case stringValue: String =>
      builder.add(stringValue)
    case numberValue: Number =>
      // Cosmos numeric PK components are doubles
      builder.add(numberValue.doubleValue())
    case booleanValue: Boolean =>
      builder.add(booleanValue)
    case null =>
      // PartitionKey.NONE is only legal for single-path containers; fail fast otherwise.
      CosmosPartitionKeyHelper.validateNoneHandlingForPartitionKeyComponentCount(
        partitionKeyComponentCount,
        treatNullAsNone)
      if (treatNullAsNone) {
        builder.addNoneValue()
      } else {
        builder.addNullValue()
      }
    case unsupported =>
      // Reject unknown types rather than silently .toString-ing them - the document field
      // was stored with its original type and a stringified value will never match.
      throw new IllegalArgumentException(
        s"Unsupported partition key column type '${unsupported.getClass.getName}' with value '$unsupported'. " +
        "Supported types are String, Number (integral or floating-point), Boolean, and null. " +
        "For other source types, convert the column before calling readManyByPartitionKey or use " +
        "the GetCosmosPartitionKeyValue UDF to produce a '_partitionKeyIdentity' column.")
  }
}

// Builds a single-component PartitionKey from one column value, applying the
// configured null handling ('Null' vs 'None' semantics).
private def buildPartitionKey(value: Any, treatNullAsNone: Boolean): PartitionKey = {
  val pkBuilder = new PartitionKeyBuilder()
  addPartitionKeyComponent(pkBuilder, value, treatNullAsNone, partitionKeyComponentCount = 1)
  pkBuilder.build()
}
}
Loading