
Commit 766c068

Adds option to configure max batch size in readManyByPartitionKeys (#48930)
* Adds option to configure max batch size in readManyByPartitionKeys
* Update changelogs
* Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyByPartitionKeysRequestOptions.java
* Addressed code review comments

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

1 parent: 5ff691f

17 files changed: 216 additions & 13 deletions


sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 #### Features Added
 * Added new `CosmosItemsDataSource.readManyByPartitionKeys` Spark function to execute bulk queries by a list of pk-values with better efficiency. Configure null handling via `spark.cosmos.read.readManyByPk.nullHandling` - default `Null` treats a null PK column as JSON null (`addNullValue`), `None` treats it as `PartitionKey.NONE` (`addNoneValue` / `NOT IS_DEFINED`). These route to different physical partitions - picking the wrong mode silently returns zero rows. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
 * Added Spark config `spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch` (default `1`) to bound the per-task prefetch parallelism the SDK uses inside `readManyByPartitionKeys`. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
+* Added Spark config `spark.cosmos.read.readManyByPk.maxBatchSize` (default `100`) to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)

 #### Breaking Changes
```
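The changelog entries above describe three related knobs: the null-handling mode plus two tuning settings. As a minimal, purely illustrative sketch (the values shown are examples, not the defaults; endpoint and container settings are omitted), a caller might assemble them into a Spark read-options map like this:

```scala
// Hypothetical helper - assembles the readManyByPk-related Spark configs
// added by PR 48801 and PR 48930 into an options map. The numeric values
// passed in are illustrative, not defaults.
object ReadManyConfigExample {
  def readOptions(maxBatchSize: Int, maxPrefetch: Int): Map[String, String] = Map(
    "spark.cosmos.read.readManyByPk.nullHandling" -> "Null",
    "spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch" -> maxPrefetch.toString,
    "spark.cosmos.read.readManyByPk.maxBatchSize" -> maxBatchSize.toString
  )

  def main(args: Array[String]): Unit =
    readOptions(maxBatchSize = 50, maxPrefetch = 4)
      .toSeq.sorted
      .foreach { case (k, v) => println(s"$k=$v") }
}
```

In a real job this map would be merged with the usual account/container settings and passed to the `cosmos.oltp` reader.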

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 #### Features Added
 * Added new `CosmosItemsDataSource.readManyByPartitionKeys` Spark function to execute bulk queries by a list of pk-values with better efficiency. Configure null handling via `spark.cosmos.read.readManyByPk.nullHandling` - default `Null` treats a null PK column as JSON null (`addNullValue`), `None` treats it as `PartitionKey.NONE` (`addNoneValue` / `NOT IS_DEFINED`). These route to different physical partitions - picking the wrong mode silently returns zero rows. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
 * Added Spark config `spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch` (default `1`) to bound the per-task prefetch parallelism the SDK uses inside `readManyByPartitionKeys`. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
+* Added Spark config `spark.cosmos.read.readManyByPk.maxBatchSize` (default `100`) to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)

 #### Breaking Changes
```

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 #### Features Added
 * Added new `CosmosItemsDataSource.readManyByPartitionKeys` Spark function to execute bulk queries by a list of pk-values with better efficiency. Configure null handling via `spark.cosmos.read.readManyByPk.nullHandling` - default `Null` treats a null PK column as JSON null (`addNullValue`), `None` treats it as `PartitionKey.NONE` (`addNoneValue` / `NOT IS_DEFINED`). These route to different physical partitions - picking the wrong mode silently returns zero rows. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
 * Added Spark config `spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch` (default `1`) to bound the per-task prefetch parallelism the SDK uses inside `readManyByPartitionKeys`. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
+* Added Spark config `spark.cosmos.read.readManyByPk.maxBatchSize` (default `100`) to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)

 #### Breaking Changes
```

sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 #### Features Added
 * Added new `CosmosItemsDataSource.readManyByPartitionKeys` Spark function to execute bulk queries by a list of pk-values with better efficiency. Configure null handling via `spark.cosmos.read.readManyByPk.nullHandling` - default `Null` treats a null PK column as JSON null (`addNullValue`), `None` treats it as `PartitionKey.NONE` (`addNoneValue` / `NOT IS_DEFINED`). These route to different physical partitions - picking the wrong mode silently returns zero rows. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
 * Added Spark config `spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch` (default `1`) to bound the per-task prefetch parallelism the SDK uses inside `readManyByPartitionKeys`. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
+* Added Spark config `spark.cosmos.read.readManyByPk.maxBatchSize` (default `100`) to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)

 #### Breaking Changes
```

sdk/cosmos/azure-cosmos-spark_3/dev/README.md

Lines changed: 1 addition & 0 deletions
````diff
@@ -48,6 +48,7 @@ mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true
 mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-5_2-12 clean install
 mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-5_2-13 clean install
 mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_4-0_2-13 clean install
+mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_4-1_2-13 clean install
 ```
````

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 25 additions & 7 deletions
```diff
@@ -94,6 +94,7 @@ private[spark] object CosmosConfigNames {
   val ReadManyFilteringEnabled = "spark.cosmos.read.readManyFiltering.enabled"
   val ReadManyByPkNullHandling = "spark.cosmos.read.readManyByPk.nullHandling"
   val ReadManyByPkMaxConcurrentBatchPrefetch = "spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch"
+  val ReadManyByPkMaxBatchSize = "spark.cosmos.read.readManyByPk.maxBatchSize"
   val ViewsRepositoryPath = "spark.cosmos.views.repositoryPath"
   val DiagnosticsMode = "spark.cosmos.diagnostics"
   val DiagnosticsSamplingMaxCount = "spark.cosmos.diagnostics.sampling.maxCount"
@@ -230,6 +231,7 @@ private[spark] object CosmosConfigNames {
     ReadManyFilteringEnabled,
     ReadManyByPkNullHandling,
     ReadManyByPkMaxConcurrentBatchPrefetch,
+    ReadManyByPkMaxBatchSize,
     ViewsRepositoryPath,
     DiagnosticsMode,
     DiagnosticsSamplingIntervalInSeconds,
@@ -1048,7 +1050,8 @@ private case class CosmosReadConfig(readConsistencyStrategy: ReadConsistencyStra
   readManyFilteringConfig: CosmosReadManyFilteringConfig,
   responseContinuationTokenLimitInKb: Option[Int] = None,
   readManyByPkTreatNullAsNone: Boolean = false,
-  readManyByPkMaxConcurrentBatchPrefetch: Int = 1)
+  readManyByPkMaxConcurrentBatchPrefetch: Option[Int] = None,
+  readManyByPkMaxBatchSize: Option[Int] = None)

 private object SchemaConversionModes extends Enumeration {
   type SchemaConversionMode = Value
@@ -1159,12 +1162,25 @@ private object CosmosReadConfig {
   private val ReadManyByPkMaxConcurrentBatchPrefetch = CosmosConfigEntry[Int](
     key = CosmosConfigNames.ReadManyByPkMaxConcurrentBatchPrefetch,
     mandatory = false,
-    defaultValue = Some(1),
+    defaultValue = None,
     parseFromStringFunction = value => Math.min(64, Math.max(1, value.toInt)),
     helpMessage = "The maximum number of per-physical-partition batches whose first page is prefetched " +
-      "concurrently inside a single Spark task by the SDK's readManyByPartitionKeys execution. The " +
-      "default is `1` - max is `64`, because Spark already parallelises across tasks - increase this when individual " +
-      "tasks span many physical partitions and additional intra-task prefetch is desired."
+      "concurrently inside a single Spark task by the SDK's readManyByPartitionKeys execution. When " +
+      "not set, the SDK default (`min(cpuCnt, 8)`) is used. Max is `64`, because Spark already " +
+      "parallelises across tasks - increase this when individual tasks span many physical partitions " +
+      "and additional intra-task prefetch is desired."
+  )
+
+  private val ReadManyByPkMaxBatchSize = CosmosConfigEntry[Int](
+    key = CosmosConfigNames.ReadManyByPkMaxBatchSize,
+    mandatory = false,
+    defaultValue = None,
+    parseFromStringFunction = value => Math.max(1, value.toInt),
+    helpMessage = "The maximum number of partition key values per batch query sent to a single " +
+      "physical partition. When not set, the SDK default (currently `100`, overridable via the " +
+      "`COSMOS.READ_MANY_BY_PK_MAX_BATCH_SIZE` system property / environment variable) is used. " +
+      "Increasing this value reduces the number of batches (and round-trips) but produces larger " +
+      "IN-clause queries that consume more RUs per request."
   )

   def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
@@ -1191,7 +1207,8 @@ private object CosmosReadConfig {
     val readManyFilteringConfig = CosmosReadManyFilteringConfig.parseCosmosReadManyFilterConfig(cfg)
     val readManyByPkNullHandling = CosmosConfigEntry.parse(cfg, ReadManyByPkNullHandling)
     val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")
-    val readManyByPkMaxConcurrentBatchPrefetch = CosmosConfigEntry.parse(cfg, ReadManyByPkMaxConcurrentBatchPrefetch).getOrElse(1)
+    val readManyByPkMaxConcurrentBatchPrefetch = CosmosConfigEntry.parse(cfg, ReadManyByPkMaxConcurrentBatchPrefetch)
+    val readManyByPkMaxBatchSize = CosmosConfigEntry.parse(cfg, ReadManyByPkMaxBatchSize)

     val effectiveReadConsistencyStrategy = if (readConsistencyStrategyOverride.getOrElse(ReadConsistencyStrategy.DEFAULT) != ReadConsistencyStrategy.DEFAULT) {
       readConsistencyStrategyOverride.get
@@ -1225,7 +1242,8 @@
       readManyFilteringConfig,
       responseContinuationTokenLimitInKb,
       readManyByPkTreatNullAsNone,
-      readManyByPkMaxConcurrentBatchPrefetch)
+      readManyByPkMaxConcurrentBatchPrefetch,
+      readManyByPkMaxBatchSize)
   }
 }
```
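Note that the two `parseFromStringFunction` lambdas encode different clamping rules: the prefetch value is clamped into the range [1, 64], while `maxBatchSize` only enforces a lower bound of 1 and has no upper clamp. A standalone sketch of that parsing semantics, mirroring the lambdas above:

```scala
// Mirrors the parseFromStringFunction lambdas of the two config entries.
// Prefetch is clamped into [1, 64]; batch size only has a lower bound of 1.
object ClampSemantics {
  def parsePrefetch(value: String): Int = Math.min(64, Math.max(1, value.toInt))
  def parseMaxBatchSize(value: String): Int = Math.max(1, value.toInt)

  def main(args: Array[String]): Unit = {
    println(parsePrefetch("100"))     // clamped down to 64
    println(parsePrefetch("0"))       // clamped up to 1
    println(parseMaxBatchSize("0"))   // clamped up to 1
    println(parseMaxBatchSize("500")) // accepted as-is, no upper clamp
  }
}
```

This matches the unit tests further down: `"100"` for prefetch parses to `Some(64)`, while `"500"` for maxBatchSize parses to `Some(500)`.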

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKey.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -51,7 +51,8 @@ private[spark] case class ItemsPartitionReaderWithReadManyByPartitionKey

   private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
   ThroughputControlHelper.populateThroughputControlGroupName(readManyOptionsImpl, readConfig.throughputControlConfig)
-  readManyOptions.setMaxConcurrentBatchPrefetch(readConfig.readManyByPkMaxConcurrentBatchPrefetch)
+  readConfig.readManyByPkMaxConcurrentBatchPrefetch.foreach(readManyOptions.setMaxConcurrentBatchPrefetch)
+  readConfig.readManyByPkMaxBatchSize.foreach(readManyOptions.setMaxBatchSize)

   private val operationContext = {
     assert(taskContext != null)
@@ -226,7 +227,8 @@ private[spark] case class ItemsPartitionReaderWithReadManyByPartitionKey
   // fragile if the SDK ever stopped cloning options internally.
   private val fluxFactory: String => CosmosPagedFlux[SparkRowItem] = { (continuationToken: String) =>
     val perCallOptions = new CosmosReadManyByPartitionKeysRequestOptions()
-    perCallOptions.setMaxConcurrentBatchPrefetch(readConfig.readManyByPkMaxConcurrentBatchPrefetch)
+    readConfig.readManyByPkMaxConcurrentBatchPrefetch.foreach(perCallOptions.setMaxConcurrentBatchPrefetch)
+    readConfig.readManyByPkMaxBatchSize.foreach(perCallOptions.setMaxBatchSize)
     perCallOptions.setContinuationToken(continuationToken)
     val perCallOptionsImpl = ImplementationBridgeHelpers
       .CosmosReadManyByPartitionKeysRequestOptionsHelper
```
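The `Option.foreach` pattern in this diff calls a setter only when the Spark config actually supplied a value; otherwise the request-options object is left untouched and the SDK default wins. A minimal standalone sketch of that pattern (`FakeOptions` is a stand-in for the real `CosmosReadManyByPartitionKeysRequestOptions`, not SDK API):

```scala
// Stand-in for the SDK's request-options class, for illustration only.
final class FakeOptions {
  var maxBatchSize: Option[Int] = None
  def setMaxBatchSize(v: Int): Unit = { maxBatchSize = Some(v) }
}

object ApplyIfConfigured {
  // Calls the setter only when the config supplied a value; a None config
  // leaves the options untouched so the SDK default stays in effect.
  def apply(configured: Option[Int], opts: FakeOptions): FakeOptions = {
    configured.foreach(opts.setMaxBatchSize)
    opts
  }

  def main(args: Array[String]): Unit = {
    println(apply(Some(50), new FakeOptions).maxBatchSize) // prints Some(50)
    println(apply(None, new FakeOptions).maxBatchSize)     // prints None
  }
}
```

This is why the config entries above switched from `defaultValue = Some(1)` to `defaultValue = None`: a hard-coded Spark-side default would otherwise always override the SDK's own default.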

sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala

Lines changed: 68 additions & 0 deletions
```diff
@@ -458,6 +458,8 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
     config.readManyFilteringConfig.readManyFilteringEnabled shouldBe false
     config.readManyFilteringConfig.readManyFilterProperty shouldEqual "_itemIdentity"
     config.readManyByPkTreatNullAsNone shouldBe false
+    config.readManyByPkMaxConcurrentBatchPrefetch shouldBe None
+    config.readManyByPkMaxBatchSize shouldBe None

     userConfig = Map(
       "spark.cosmos.read.forceEventualConsistency" -> "false",
@@ -672,6 +674,72 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
     config.readManyByPkTreatNullAsNone shouldBe true
   }

+  it should "parse readManyByPk maxBatchSize configuration" in {
+    // Default (not specified) should be None - SDK applies its own default
+    var userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false"
+    )
+    var config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxBatchSize shouldBe None
+
+    // Explicit value
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxBatchSize" -> "50"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxBatchSize shouldBe Some(50)
+
+    // Value below 1 should be clamped to 1
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxBatchSize" -> "0"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxBatchSize shouldBe Some(1)
+
+    // Large value should be accepted
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxBatchSize" -> "500"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxBatchSize shouldBe Some(500)
+  }
+
+  it should "parse readManyByPk maxConcurrentBatchPrefetch configuration" in {
+    // Default (not specified) should be None - SDK applies its own default
+    var userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false"
+    )
+    var config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxConcurrentBatchPrefetch shouldBe None
+
+    // Explicit value
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch" -> "4"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxConcurrentBatchPrefetch shouldBe Some(4)
+
+    // Value above 64 should be clamped to 64
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch" -> "100"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxConcurrentBatchPrefetch shouldBe Some(64)
+
+    // Value below 1 should be clamped to 1
+    userConfig = Map(
+      "spark.cosmos.read.forceEventualConsistency" -> "false",
+      "spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch" -> "0"
+    )
+    config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
+    config.readManyByPkMaxConcurrentBatchPrefetch shouldBe Some(1)
+  }
+
   it should "throw on invalid read configuration" in {
     val userConfig = Map(
       "spark.cosmos.read.schemaConversionMode" -> "not a valid value"
```

sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 #### Features Added
 * Added new `CosmosItemsDataSource.readManyByPartitionKeys` Spark function to execute bulk queries by a list of pk-values with better efficiency. Configure null handling via `spark.cosmos.read.readManyByPk.nullHandling` - default `Null` treats a null PK column as JSON null (`addNullValue`), `None` treats it as `PartitionKey.NONE` (`addNoneValue` / `NOT IS_DEFINED`). These route to different physical partitions - picking the wrong mode silently returns zero rows. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
 * Added Spark config `spark.cosmos.read.readManyByPk.maxConcurrentBatchPrefetch` (default `1`) to bound the per-task prefetch parallelism the SDK uses inside `readManyByPartitionKeys`. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
+* Added Spark config `spark.cosmos.read.readManyByPk.maxBatchSize` (default `100`) to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)

 #### Breaking Changes
```

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ReadManyByPartitionKeyTest.java

Lines changed: 37 additions & 0 deletions
```diff
@@ -505,6 +505,43 @@ public void singlePk_readManyByPartitionKey_withRequestOptionsAndMaxConcurrentBa

         cleanupContainer(singlePkContainer);
     }
+
+    @Test(groups = {"emulator"}, timeOut = TIMEOUT)
+    public void singlePk_readManyByPartitionKey_withRequestOptionsAndMaxBatchSize() {
+        // Exercises the per-request maxBatchSize override (precedence over global default).
+        // Use batch size of 1 so every PK ends up in its own batch — verifies results
+        // are still correctly assembled from many small batches.
+        List<ObjectNode> items = createSinglePkItems("batchSzPk1", 2);
+        items.addAll(createSinglePkItems("batchSzPk2", 2));
+        items.addAll(createSinglePkItems("batchSzPk3", 2));
+
+        List<PartitionKey> pkValues = Arrays.asList(
+            new PartitionKey("batchSzPk1"),
+            new PartitionKey("batchSzPk2"),
+            new PartitionKey("batchSzPk3"));
+
+        com.azure.cosmos.models.CosmosReadManyByPartitionKeysRequestOptions options =
+            new com.azure.cosmos.models.CosmosReadManyByPartitionKeysRequestOptions();
+        options.setMaxBatchSize(1);
+
+        CosmosPagedIterable<ObjectNode> results = singlePkContainer.readManyByPartitionKeys(
+            pkValues, options, ObjectNode.class);
+        List<ObjectNode> resultList = results.stream().collect(Collectors.toList());
+
+        assertThat(resultList).hasSize(6);
+        resultList.forEach(item -> {
+            assertThat(item.get("mypk").asText()).isIn("batchSzPk1", "batchSzPk2", "batchSzPk3");
+        });
+
+        cleanupContainer(singlePkContainer);
+    }
+
+    @Test(groups = {"emulator"}, timeOut = TIMEOUT, expectedExceptions = IllegalArgumentException.class)
+    public void singlePk_readManyByPartitionKey_setMaxBatchSizeZeroThrows() {
+        com.azure.cosmos.models.CosmosReadManyByPartitionKeysRequestOptions options =
+            new com.azure.cosmos.models.CosmosReadManyByPartitionKeysRequestOptions();
+        options.setMaxBatchSize(0); // must throw IllegalArgumentException
+    }
     //endregion

     //region Continuation token tests
```
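The first test above forces `maxBatchSize` to 1 so each of the three partition keys lands in its own batch. The batch count the setting controls is a ceiling division of the PK count by the batch size; a small sketch of that arithmetic (the helper is illustrative, not SDK API):

```scala
// Illustrative helper: number of batches produced for a given PK count and
// maxBatchSize, per physical partition. Ceiling division with a lower-bound
// check mirroring the setter's rejection of values < 1.
object BatchCount {
  def batches(pkCount: Int, maxBatchSize: Int): Int = {
    require(maxBatchSize >= 1, "maxBatchSize must be a positive integer")
    (pkCount + maxBatchSize - 1) / maxBatchSize
  }

  def main(args: Array[String]): Unit = {
    println(batches(3, 1))     // 3: each PK in its own batch, as in the test
    println(batches(250, 100)) // 3: SDK default of 100 yields ceil(250/100)
  }
}
```

With the SDK default of 100, the three PKs in the test would have fit in a single batch, which is why the override to 1 is needed to exercise multi-batch assembly.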
