Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
9770833
Adding readManyByPartitionKey API
FabianMeiswinkel Apr 13, 2026
ac287bc
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 13, 2026
9a5b3e9
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
a8720c3
Update sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmo…
FabianMeiswinkel Apr 14, 2026
d499da7
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
c3c542a
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Apr 14, 2026
4416354
´Fixing code review comments
FabianMeiswinkel Apr 14, 2026
3ab3f0d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 14, 2026
588a755
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8c5cdb4
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
f548552
Update ReadManyByPartitionKeyTest.java
FabianMeiswinkel Apr 15, 2026
f68cf02
Fixing test issues
FabianMeiswinkel Apr 15, 2026
8b6c4b1
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 15, 2026
8ba7f4d
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 15, 2026
56b067a
Reacted to code review feedback
FabianMeiswinkel Apr 16, 2026
fa430e9
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
d9504c9
Fix build issues
FabianMeiswinkel Apr 16, 2026
73151f0
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
681830e
Fixing changelog
FabianMeiswinkel Apr 16, 2026
7f745e6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 16, 2026
0b8905d
Addressing code review comments
FabianMeiswinkel Apr 16, 2026
22abc78
Addressing code review feedback
FabianMeiswinkel Apr 16, 2026
662b1a4
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
c764de9
Update CosmosItemsDataSource.scala
FabianMeiswinkel Apr 17, 2026
e1e6f5a
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
080ce4a
Update RxDocumentClientImpl.java
FabianMeiswinkel Apr 17, 2026
516bbf3
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
b01f875
Fix readManyByPartitionKey retries
FabianMeiswinkel Apr 17, 2026
7130d4a
Fix PK.None
FabianMeiswinkel Apr 17, 2026
93957f3
Update ReadManyByPartitionKeyQueryHelper.java
FabianMeiswinkel Apr 17, 2026
16dd1d6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
9200f8f
Fix code review feedback
FabianMeiswinkel Apr 17, 2026
4a3ea06
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
e2aa124
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 17, 2026
c34341e
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 17, 2026
c96b6f6
Reacting to code review feedback
FabianMeiswinkel Apr 17, 2026
e306fae
React to code review feedback
FabianMeiswinkel Apr 17, 2026
f34270d
Addressing code review comments
FabianMeiswinkel Apr 17, 2026
55ddac3
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 17, 2026
f13de23
Fixing nested PK for readMany / readAllItems
FabianMeiswinkel Apr 20, 2026
b0d7337
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 20, 2026
ce8fca3
Fixed changelogs
FabianMeiswinkel Apr 20, 2026
992ef1a
Disallow TOP for streamable queries in readManyByPartitionKey-API
FabianMeiswinkel Apr 20, 2026
84af9c3
Fixing quote recognition in pk for readManyByPartitionKey API
FabianMeiswinkel Apr 20, 2026
b0b7349
Fixing confusing side effects when calling GetCurrentRow
FabianMeiswinkel Apr 20, 2026
42a2fb8
Fixed code review comments
FabianMeiswinkel Apr 20, 2026
89ac19e
Fixed code review comment
FabianMeiswinkel Apr 20, 2026
ea28c5d
Avoid getting pkPaths (pk-def) per row
FabianMeiswinkel Apr 20, 2026
35e9d73
Update CosmosReadManyByPartitionKeyReader.scala
FabianMeiswinkel Apr 20, 2026
78bbd15
Renamed readManyByPartitionKey to readManyByPartitionKeys
FabianMeiswinkel Apr 20, 2026
dd83125
Fixing code review comments
FabianMeiswinkel Apr 20, 2026
2246e3f
added test
FabianMeiswinkel Apr 20, 2026
fdfa15f
Fixing continuation Token
FabianMeiswinkel Apr 21, 2026
5da1906
Update RxDocumentClientImpl.java
FabianMeiswinkel Apr 21, 2026
79d2487
Update readManyByPartitionKey-design.md
FabianMeiswinkel Apr 21, 2026
f12582f
Fixed code review comments
FabianMeiswinkel Apr 22, 2026
0d3280a
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 22, 2026
5b44e34
Changed the continuation token to not carry partitionScopes anymore
FabianMeiswinkel Apr 22, 2026
da578f6
Update cspell.yaml
FabianMeiswinkel Apr 22, 2026
f393c3f
Fixing test failure
FabianMeiswinkel Apr 22, 2026
280bc87
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 22, 2026
d4722c2
Fixed test issues
FabianMeiswinkel Apr 22, 2026
26df2ab
Update FeedResponse.java
FabianMeiswinkel Apr 22, 2026
a45aabb
Update FeedResponse.java
FabianMeiswinkel Apr 22, 2026
7299a2c
Fixes test issues
FabianMeiswinkel Apr 22, 2026
dc00653
Fixed null handinlgin in ReadManyByParittionKeyToken JsonCreator
FabianMeiswinkel Apr 22, 2026
c47f9a6
Update CosmosAsyncContainer.java
FabianMeiswinkel Apr 22, 2026
1f2f14b
Renamed CosmosReadManyPartitionKeyRequestOptions to CosmosReadManyPar…
FabianMeiswinkel Apr 22, 2026
9c75a32
Code review fixes
FabianMeiswinkel Apr 22, 2026
e8ea368
Fixed test issues
FabianMeiswinkel Apr 22, 2026
f3aa1da
updates to patch sparse checkout to account for spring and cosmos whi…
scbedd Apr 23, 2026
2c404e6
Update eng/scripts/Generate-ServiceDirectories-From-Project-List.ps1
scbedd Apr 23, 2026
112cf71
Fixing code review comments
FabianMeiswinkel Apr 23, 2026
348257a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Apr 23, 2026
bdc5a2d
Merge branch 'patch-sparse-checkout' of https://github.com/Azure/azur…
FabianMeiswinkel Apr 23, 2026
cbd8a93
Fixes code review comments
FabianMeiswinkel Apr 23, 2026
cf678fa
Fixed code review comments
FabianMeiswinkel Apr 23, 2026
17cfe75
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
bbfa698
Added e2e test for resuming readManyByPartitionKeys with continuation…
FabianMeiswinkel Apr 23, 2026
587cac6
Added and fixed tests
FabianMeiswinkel Apr 23, 2026
6e1cf84
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
59f3d44
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
81a99f3
Update CosmosItemSerializerNoExceptionWrapping.scala
FabianMeiswinkel Apr 23, 2026
2d8f2a0
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 23, 2026
5beab22
Update ReadManyByPartitionKeyContinuationToken.java
FabianMeiswinkel Apr 23, 2026
853bd04
Update DocumentQueryExecutionContextFactory.java
FabianMeiswinkel Apr 23, 2026
0fc0cc6
Merge branch 'main' into users/fabianm/readManyByPK
FabianMeiswinkel Apr 23, 2026
53ad60d
Update FaultInjectionWithAvailabilityStrategyTestsBase.java
FabianMeiswinkel Apr 23, 2026
970d209
Merge branch 'users/fabianm/readManyByPK' of https://github.com/Azure…
FabianMeiswinkel Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.47.0-beta.1 (Unreleased)

#### Features Added
* Added new `CosmosItemsDataSource.readManyByPartitionKey` Spark function to execute bulk queries by a list of pk-values with better efficiency. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.47.0-beta.1 (Unreleased)

#### Features Added
* Added new `CosmosItemsDataSource.readManyByPartitionKey` Spark function to execute bulk queries by a list of pk-values with better efficiency. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.47.0-beta.1 (Unreleased)

#### Features Added
* Added new `CosmosItemsDataSource.readManyByPartitionKey` Spark function to execute bulk queries by a list of pk-values with better efficiency. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.47.0-beta.1 (Unreleased)

#### Features Added
* Added new `CosmosItemsDataSource.readManyByPartitionKey` Spark function to execute bulk queries by a list of pk-values with better efficiency. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ private[spark] object CosmosConfigNames {
val ReadPartitioningFeedRangeFilter = "spark.cosmos.partitioning.feedRangeFilter"
val ReadRuntimeFilteringEnabled = "spark.cosmos.read.runtimeFiltering.enabled"
val ReadManyFilteringEnabled = "spark.cosmos.read.readManyFiltering.enabled"
val ReadManyByPkNullHandling = "spark.cosmos.read.readManyByPk.nullHandling"
val ViewsRepositoryPath = "spark.cosmos.views.repositoryPath"
val DiagnosticsMode = "spark.cosmos.diagnostics"
val DiagnosticsSamplingMaxCount = "spark.cosmos.diagnostics.sampling.maxCount"
Expand Down Expand Up @@ -226,6 +227,7 @@ private[spark] object CosmosConfigNames {
ReadPartitioningFeedRangeFilter,
ReadRuntimeFilteringEnabled,
ReadManyFilteringEnabled,
ReadManyByPkNullHandling,
ViewsRepositoryPath,
DiagnosticsMode,
DiagnosticsSamplingIntervalInSeconds,
Expand Down Expand Up @@ -1042,7 +1044,8 @@ private case class CosmosReadConfig(readConsistencyStrategy: ReadConsistencyStra
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
runtimeFilteringEnabled: Boolean,
readManyFilteringConfig: CosmosReadManyFilteringConfig,
responseContinuationTokenLimitInKb: Option[Int] = None)
responseContinuationTokenLimitInKb: Option[Int] = None,
readManyByPkTreatNullAsNone: Boolean = false)

private object SchemaConversionModes extends Enumeration {
type SchemaConversionMode = Value
Expand Down Expand Up @@ -1136,6 +1139,18 @@ private object CosmosReadConfig {
helpMessage = " Indicates whether dynamic partition pruning filters will be pushed down when applicable."
)

private val ReadManyByPkNullHandling = CosmosConfigEntry[String](
key = CosmosConfigNames.ReadManyByPkNullHandling,
mandatory = false,
defaultValue = Some("Null"),
parseFromStringFunction = value => value,
helpMessage = "Determines how null values in hierarchical partition key components are treated " +
"for readManyByPartitionKey. 'Null' (default) maps null to a JSON null value via addNullValue(), " +
"which is appropriate when the document field exists with an explicit null value. " +
"'None' maps null to PartitionKey.NONE via addNoneValue(), which should only be used when the " +
"partition key path does not exist at all in the document."
)

def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
val readConsistencyStrategyOverride = CosmosConfigEntry.parse(cfg, ReadConsistencyStrategyOverride)
Expand All @@ -1158,6 +1173,8 @@ private object CosmosReadConfig {
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val runtimeFilteringEnabled = CosmosConfigEntry.parse(cfg, ReadRuntimeFilteringEnabled)
val readManyFilteringConfig = CosmosReadManyFilteringConfig.parseCosmosReadManyFilterConfig(cfg)
val readManyByPkNullHandling = CosmosConfigEntry.parse(cfg, ReadManyByPkNullHandling)
val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")
Comment thread
FabianMeiswinkel marked this conversation as resolved.

val effectiveReadConsistencyStrategy = if (readConsistencyStrategyOverride.getOrElse(ReadConsistencyStrategy.DEFAULT) != ReadConsistencyStrategy.DEFAULT) {
readConsistencyStrategyOverride.get
Expand Down Expand Up @@ -1189,7 +1206,8 @@ private object CosmosReadConfig {
throughputControlConfigOpt,
runtimeFilteringEnabled.get,
readManyFilteringConfig,
responseContinuationTokenLimitInKb)
responseContinuationTokenLimitInKb,
readManyByPkTreatNullAsNone)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private[cosmos] object CosmosConstants {
val Id = "id"
val ETag = "_etag"
val ItemIdentity = "_itemIdentity"
val PartitionKeyIdentity = "_partitionKeyIdentity"
}

object StatusCodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey}
import com.azure.cosmos.models.{CosmosItemIdentity, PartitionKey, PartitionKeyBuilder}
import com.azure.cosmos.spark.CosmosPredicates.assertOnSparkDriver
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.{SparkBridgeInternal}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.util
Expand Down Expand Up @@ -112,4 +113,127 @@ object CosmosItemsDataSource {

readManyReader.readMany(df.rdd, readManyFilterExtraction)
}

def readManyByPartitionKey(df: DataFrame, userConfig: java.util.Map[String, String]): DataFrame = {
readManyByPartitionKey(df, userConfig, null)
}

def readManyByPartitionKey(
df: DataFrame,
userConfig: java.util.Map[String, String],
userProvidedSchema: StructType): DataFrame = {
Comment thread
FabianMeiswinkel marked this conversation as resolved.
Outdated

val readManyReader = new CosmosReadManyByPartitionKeyReader(
userProvidedSchema,
userConfig.asScala.toMap)

// Option 1: Look for the _partitionKeyIdentity column (produced by GetCosmosPartitionKeyValue UDF)
val pkIdentityFieldExtraction = df
.schema
.find(field => field.name.equals(CosmosConstants.Properties.PartitionKeyIdentity) && field.dataType.equals(StringType))
Comment thread
FabianMeiswinkel marked this conversation as resolved.
.map(field => (row: Row) => {
val rawValue = row.getString(row.fieldIndex(field.name))
CosmosPartitionKeyHelper.tryParsePartitionKey(rawValue)
.getOrElse(throw new IllegalArgumentException(
s"Invalid _partitionKeyIdentity value in row: '$rawValue'. " +
"Expected format: pk([...json...])"))
})

// Option 2: Detect PK columns by matching the container's partition key paths against the DataFrame schema
val pkColumnExtraction: Option[Row => PartitionKey] = if (pkIdentityFieldExtraction.isDefined) {
None // no need to resolve PK paths - _partitionKeyIdentity column takes precedence
} else {
val effectiveConfig = CosmosConfig.getEffectiveConfig(
databaseName = None,
containerName = None,
userConfig.asScala.toMap)
val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveConfig)
val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(effectiveConfig)
val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(None)
val calledFrom = s"CosmosItemsDataSource.readManyByPartitionKey"
val treatNullAsNone = readConfig.readManyByPkTreatNullAsNone

val pkPaths = Loan(
List[Option[CosmosClientCacheItem]](
Some(
CosmosClientCache(
CosmosClientConfiguration(
effectiveConfig,
readConsistencyStrategy = readConfig.readConsistencyStrategy,
sparkEnvironmentInfo),
None,
calledFrom)),
ThroughputControlHelper.getThroughputControlClientCacheItem(
effectiveConfig,
calledFrom,
None,
sparkEnvironmentInfo)
))
.to(clientCacheItems => {
val container =
ThroughputControlHelper.getContainer(
effectiveConfig,
containerConfig,
clientCacheItems(0).get,
clientCacheItems(1))

val pkDefinition = SparkBridgeInternal
Comment thread
FabianMeiswinkel marked this conversation as resolved.
Outdated
Comment thread
FabianMeiswinkel marked this conversation as resolved.
Outdated
.getContainerPropertiesFromCollectionCache(container)
.getPartitionKeyDefinition

pkDefinition.getPaths.asScala.map(_.stripPrefix("/")).toList
})

// Check if ALL PK path columns exist in the DataFrame schema
val dfFieldNames = df.schema.fieldNames.toSet
val allPkColumnsPresent = pkPaths.forall(path => dfFieldNames.contains(path))

if (allPkColumnsPresent && pkPaths.nonEmpty) {
// pkPaths already defined above
Some((row: Row) => {
if (pkPaths.size == 1) {
// Single partition key
buildPartitionKey(row.getAs[Any](pkPaths.head), treatNullAsNone)
} else {
// Hierarchical partition key — build level by level
val builder = new PartitionKeyBuilder()
for (path <- pkPaths) {
addPartitionKeyComponent(builder, row.getAs[Any](path), treatNullAsNone)
}
builder.build()
}
})
} else {
None
}
}

val pkExtraction = pkIdentityFieldExtraction
.orElse(pkColumnExtraction)
.getOrElse(
throw new IllegalArgumentException(
"Cannot determine partition key extraction from the input DataFrame. " +
"Either add a '_partitionKeyIdentity' column (using the GetCosmosPartitionKeyValue UDF) " +
"or ensure the DataFrame contains columns matching the container's partition key paths."))

readManyReader.readManyByPartitionKey(df.rdd, pkExtraction)
}

private def addPartitionKeyComponent(builder: PartitionKeyBuilder, value: Any, treatNullAsNone: Boolean): Unit = {
value match {
case s: String => builder.add(s)
case n: Number => builder.add(n.doubleValue())
case b: Boolean => builder.add(b)
case null =>
if (treatNullAsNone) builder.addNoneValue()
else builder.addNullValue()
case other => builder.add(other.toString)
}
}

private def buildPartitionKey(value: Any, treatNullAsNone: Boolean): PartitionKey = {
val builder = new PartitionKeyBuilder()
addPartitionKeyComponent(builder, value, treatNullAsNone)
builder.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

import com.azure.cosmos.implementation.routing.PartitionKeyInternal
import com.azure.cosmos.implementation.{ImplementationBridgeHelpers, Utils}
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait

import java.util

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

private[spark] object CosmosPartitionKeyHelper extends BasicLoggingTrait {
// pattern will be recognized
// pk(partitionKeyValue)
//
// (?i) : The whole matching is case-insensitive
// pk[(](.*)[)]: partitionKey Value
private val cosmosPartitionKeyStringRegx = """(?i)pk[(](.*)[)]""".r
private val objectMapper = Utils.getSimpleObjectMapper

def getCosmosPartitionKeyValueString(partitionKeyValue: List[Object]): String = {
s"pk(${objectMapper.writeValueAsString(partitionKeyValue.asJava)})"
}

def tryParsePartitionKey(cosmosPartitionKeyString: String): Option[PartitionKey] = {
cosmosPartitionKeyString match {
Comment thread
FabianMeiswinkel marked this conversation as resolved.
case cosmosPartitionKeyStringRegx(pkValue) =>
scala.util.Try(Utils.parse(pkValue, classOf[Object])).toOption.flatMap {
case arrayList: util.ArrayList[Object @unchecked] =>
Some(
ImplementationBridgeHelpers
.PartitionKeyHelper
.getPartitionKeyAccessor
.toPartitionKey(PartitionKeyInternal.fromObjectArray(arrayList.toArray, false)))
case other => Some(new PartitionKey(other))
}
case _ => None
}
}
}
Loading
Loading