Conversation
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a new readManyByPartitionKey API surface to the Java Cosmos SDK (sync + async) and wires it through the Spark connector to support PK-only reads (including partial HPK), with query-plan-based validation for custom queries.
Changes:
- Added public `readManyByPartitionKey` overloads in `CosmosAsyncContainer` / `CosmosContainer` and an internal `AsyncDocumentClient` + `RxDocumentClientImpl` implementation that groups PKs by physical partition and issues per-range queries.
- Introduced `ReadManyByPartitionKeyQueryHelper` to compose PK filters into user-provided SQL and added a new config knob for per-partition batching.
- Added Spark support (UDF + PK serialization/parsing helper + reader) and unit/integration tests for query composition and end-to-end behavior.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/docs/readManyByPartitionKey-design.md | Design doc describing the new API, query validation, and Spark integration approach. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java | Adds a helper method to fetch query plans through the gateway for validation. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java | Implements readManyByPartitionKey execution, validation, PK→range grouping, batching, and concurrency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ReadManyByPartitionKeyQueryHelper.java | New helper to build SqlQuerySpec by appending PK filters and extracting table aliases. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java | Adds config/env accessors for max PKs per per-partition query batch. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java | Adds internal interface method for PK-only read-many. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java | Adds sync readManyByPartitionKey overloads. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java | Adds async readManyByPartitionKey overloads and wiring to internal client. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ReadManyByPartitionKeyQueryHelperTest.java | Unit tests for SQL generation, alias extraction, and WHERE detection. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ReadManyByPartitionKeyTest.java | Emulator integration tests for single PK + HPK, partial HPK, projections, and query validation. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKeyITest.scala | Spark integration test for reading by PKs and empty result behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosPartitionKeyHelperSpec.scala | Unit tests for PK string serialization/parsing helpers. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/udf/GetCosmosPartitionKeyValue.scala | Spark UDF to compute _partitionKeyIdentity values. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKey.scala | Spark partition reader that calls new SDK API and converts results to rows. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosReadManyByPartitionKeyReader.scala | Spark reader that maps input rows to PKs and streams results via the partition reader. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosPartitionKeyHelper.scala | Helper for PK serialization/parsing used by the UDF and data source. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosItemsDataSource.scala | Adds Spark entry point to read-many by partition key, including PK extraction logic. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala | Adds _partitionKeyIdentity constant. |
Comments suppressed due to low confidence (1)
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKey.scala:1
- The error message has mismatched parentheses/quoting (`classOf<SparkRowItem]))`), which makes it harder to read and search for. Suggest correcting it to a clean, unambiguous string (e.g., `classOf[SparkRowItem]`) to improve diagnosability.
@sdkReviewAgent
✅ Review complete (34:12) Posted 5 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage
@sdkReviewAgent
    partitionKeys, customQuery, collectionLink, state, diagnosticsFactory, klass),
    staleResourceRetryPolicy
    )
    .onErrorMap(throwable -> {
🟡 Recommendation — Observability: Missing success-path and cancel-path diagnostics merge
The public readManyByPartitionKey chains `.onErrorMap(...)` to merge diagnostics on the error path, but is missing both:
- a `.flatMap` (or `.doOnNext`) to merge diagnostics on the success path
- a `.doOnCancel` to merge diagnostics when the subscriber cancels mid-stream

Compare with the sibling Flux-based API queryDocumentChangeFeedFromPagedFlux, which chains:

    .flatMap(result -> { diagnosticsFactory.merge(...); return Mono.just(result); })
    .onErrorMap(throwable -> { diagnosticsFactory.merge(...); return throwable; })
    .doOnCancel(() -> diagnosticsFactory.merge(...));

Why this matters: If the subscriber cancels mid-stream (e.g., take(N), timeout, or Spark task kill), in-flight diagnostics won't be flushed to the CosmosDiagnosticsContext. This creates a monitoring blind spot — production debugging of cancelled/timed-out operations loses diagnostic data.
Suggested fix: Add matching .doOnNext and .doOnCancel operators after .onErrorMap.
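The contract the comment asks for can be modeled without Reactor: every path a stream can end on (emission, error, cancellation) must flush diagnostics. The sketch below is a minimal self-contained stand-in, not the SDK's actual types; `DiagnosticsFactory` and the method names are illustrative analogs of the `.doOnNext` / `.onErrorMap` / `.doOnCancel` operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for the SDK's diagnostics factory; the real type differs.
class DiagnosticsFactory {
    final List<String> merged = new ArrayList<>();
    void merge(String path) { merged.add(path); }
}

// Models the Reactor chain .doOnNext(...).onErrorMap(...).doOnCancel(...):
// diagnostics are merged on every emission/terminal path, not only on error.
class DiagnosticsMergingStream<T> {
    private final DiagnosticsFactory diagnostics;

    DiagnosticsMergingStream(DiagnosticsFactory diagnostics) {
        this.diagnostics = diagnostics;
    }

    void emit(T item, Consumer<T> downstream) {
        diagnostics.merge("success");   // success path (.doOnNext analog)
        downstream.accept(item);
    }

    RuntimeException mapError(RuntimeException t) {
        diagnostics.merge("error");     // error path (.onErrorMap analog)
        return t;
    }

    void cancel() {
        diagnostics.merge("cancel");    // cancel path (.doOnCancel analog)
    }
}
```

The point of the model: if any one of the three hooks is missing (as `.doOnNext` and `.doOnCancel` currently are), the corresponding path leaves diagnostics unmerged.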
    pkFilter.append(tableAlias);
    pkFilter.append(partitionKeySelectors.get(j));
    pkFilter.append(")");
    }
🟡 Recommendation — Correctness: HPK individual addNoneValue() components generate = {} instead of NOT IS_DEFINED
    parameters.add(new SqlParameter(pkParamName, pkComponents[j]));

In the HPK branch, when a partition key has addNoneValue() at an individual component level (e.g., new PartitionKeyBuilder().add("Redmond").add("98053").addNoneValue().build()), pkComponents[j] at position j=2 is Undefined.value(). This Undefined object extends JsonSerializable, so SqlParameter.setValue() serializes it as {} (an empty JSON object). The generated query becomes c["areaCode"] = @param where @param = {}.
The code only checks pkComponents == null for full PartitionKey.NONE (line 111) — it doesn't detect individual undefined components.
Caveat: The integration test hpk_readManyByPartitionKey_withNoneComponent exercises exactly this scenario and expects 3 results — so either the Cosmos service handles = {} specially for undefined fields, or there's custom serialization I'm not seeing. If the test passes against the emulator, this may be a non-issue. But relying on service-specific parameter interpretation for correctness is fragile.
Suggested fix (if confirmed): In the HPK loop, check each pkComponents[j] for instanceof Undefined. When true, generate NOT IS_DEFINED(alias + selector) instead of alias + selector + " = " + paramName.
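The suggested fix can be sketched as below. `UNDEFINED` is a placeholder sentinel standing in for the SDK's internal `Undefined.value()`, and the alias/selector/parameter names are illustrative, not the helper's actual API.

```java
import java.util.List;

class HpkFilterSketch {
    // Placeholder sentinel for the SDK's internal Undefined.value().
    static final Object UNDEFINED = new Object();

    // Builds the filter clause for one HPK component: an equality predicate
    // with a bound parameter for defined values, NOT IS_DEFINED for an
    // undefined component (instead of serializing Undefined as "{}").
    static String componentFilter(String alias, String selector, Object component,
                                  String paramName, List<Object> params) {
        if (component == UNDEFINED) {
            return "(NOT IS_DEFINED(" + alias + selector + "))";
        }
        params.add(component);
        return "(" + alias + selector + " = " + paramName + ")";
    }
}
```

With this shape, the partial-HPK example in the comment would produce `(NOT IS_DEFINED(c["areaCode"]))` for the third component rather than an equality check against an empty object.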
    rawIterator.next()
    skipped += 1
    }
    if (skipped < pagesCommitted) {
🟡 Recommendation — Resilience: Page-skip retry assumes deterministic cross-partition page ordering
    if (skipped < pagesCommitted) {

The retry approach (replay the flux from scratch and skip pagesCommitted pages) assumes that re-executing the same readManyByPartitionKey call returns pages in the same order. However, the SDK fans out queries to multiple physical partitions with bounded concurrency via Flux.merge. The order in which physical-partition results arrive can vary between invocations due to network timing.
Concrete failure scenario: First execution returns pages [P1-partA, P2-partB, P3-partA]. After emitting P1 and P2, a transient error occurs. On retry, the service returns [P1-partB, P2-partA, P3-partA]. The iterator skips 2 pages (P1-partB and P2-partA), then emits P3-partA. Result: P2-partA was never emitted (data loss) and P1-partA's content may differ from P1-partB's (wrong data).
Mitigation already in place: The currentPagePartiallyConsumed guard prevents mid-page retry, and the page-count mismatch check (this line) catches truncation. These reduce practical risk.
Suggested action: Document this as a known limitation in the class Javadoc. For stronger guarantees, consider tracking page identity (e.g., page hash or first-item ID) rather than ordinal position.
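The stronger guarantee suggested above could look like this sketch, where pages are identified by their first item's id instead of their ordinal position; the `Page` record and `resume` helper are hypothetical names, not the connector's actual code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class PageSkipByIdentity {
    // Hypothetical page shape: identified by its first item's id.
    record Page(String firstItemId, List<String> items) {}

    // On retry, skip only pages whose identity was already committed,
    // regardless of the order in which partitions re-emit them. This avoids
    // the data-loss scenario where ordinal skipping drops a never-emitted page.
    static Iterator<Page> resume(Iterator<Page> replayed, Set<String> committedIds) {
        return new Iterator<Page>() {
            Page next = advance();

            Page advance() {
                while (replayed.hasNext()) {
                    Page p = replayed.next();
                    if (!committedIds.contains(p.firstItemId())) {
                        return p;
                    }
                }
                return null;
            }

            public boolean hasNext() { return next != null; }

            public Page next() {
                Page result = next;
                next = advance();
                return result;
            }
        };
    }
}
```

In the concrete failure scenario from the comment, identity-based skipping would still emit P2-partA on retry even though it arrives at a different ordinal position.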
    val runtimeFilteringEnabled = CosmosConfigEntry.parse(cfg, ReadRuntimeFilteringEnabled)
    val readManyFilteringConfig = CosmosReadManyFilteringConfig.parseCosmosReadManyFilterConfig(cfg)
    val readManyByPkNullHandling = CosmosConfigEntry.parse(cfg, ReadManyByPkNullHandling)
    val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")
🟡 Recommendation — Config Safety: Invalid nullHandling values silently default to "Null"
    val readManyByPkTreatNullAsNone = readManyByPkNullHandling.getOrElse("Null").equalsIgnoreCase("None")
Any value that isn't case-insensitively "None" is silently treated as "Null". A typo like "Non" or "Nul" would silently default to JSON-null semantics with no warning.
Why this matters: The CHANGELOG and help text explicitly warn that "picking the wrong mode for your data will silently return zero rows." A configuration typo is exactly the scenario where this warning applies — the user intended "None" but wrote "Non", gets "Null" semantics, queries the wrong physical partition, and gets zero results with no error.
Suggested fix: Validate that the value is exactly "Null" or "None" (case-insensitive) and throw IllegalArgumentException for any other value. This matches how other enum-like configs in the Spark connector are parsed (e.g., CosmosAuthType, Azure environment type).
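The fail-fast validation could be sketched as follows (in Java for illustration, though the connector code is Scala; the method and message are hypothetical, not the connector's actual parser):

```java
class NullHandlingConfigSketch {
    // Accepts only "Null" or "None" (case-insensitive) and fails fast on
    // typos such as "Non", instead of silently falling back to Null semantics.
    static boolean treatNullAsNone(String raw) {
        String value = (raw == null || raw.isEmpty()) ? "Null" : raw;
        if (value.equalsIgnoreCase("None")) {
            return true;
        }
        if (value.equalsIgnoreCase("Null")) {
            return false;
        }
        throw new IllegalArgumentException(
            "Invalid readManyByPk null-handling value '" + raw
                + "'; expected 'Null' or 'None'.");
    }
}
```

An unset value still defaults to "Null" (preserving current behavior), but any other string surfaces immediately as a configuration error rather than as silently empty query results.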
    cosmosContainerConfig,
    clientCacheItems(0).get,
    clientCacheItems(1))
    // Warm-up readItem: intentionally issues a lookup for a random id/partition-key pair
🟢 Suggestion — Cleanup: Duplicate warm-up comment block
The warm-up readItem comment block is copy-pasted twice (lines 67–70 and 71–74):
    // Warm-up readItem: intentionally issues a lookup for a random id/partition-key pair
    // on the driver so that the collection/routing-map caches are populated before we serialize
    // the client state and broadcast it to executors. ...
This appears to be an accidental duplication. The sibling CosmosReadManyReader has this comment only once. Remove the duplicate to keep the code clean.
    case s: String => builder.add(s)
    case n: java.lang.Number => builder.add(n.doubleValue())
    case b: java.lang.Boolean => builder.add(b.booleanValue())
    case other => builder.add(other.toString)
🟢 Suggestion — Consistency: other.toString fallback silently creates wrong PK instead of throwing
    case other => builder.add(other.toString)
In the null-handling builder path of tryParsePartitionKey, the other catch-all silently stringifies unknown types. This is inconsistent with CosmosItemsDataSource.addPartitionKeyComponent (line 289) which throws IllegalArgumentException for unsupported types.
While String, Number, and Boolean are all handled before other (so Jackson's deserialized primitives won't hit this path), the inconsistency means that an exotic type reaching this path via the UDF would silently produce a wrong partition key (string representation instead of the original type), querying the wrong physical partition and returning zero rows — with no error.
Suggested fix: Replace builder.add(other.toString) with throw new IllegalArgumentException(...) to match the column-based extraction path.
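The strict variant could look like the sketch below, written in Java for illustration (the original is a Scala match); the method name is hypothetical and the return value stands in for what would be passed to the SDK's PartitionKeyBuilder.

```java
class PartitionKeyComponentSketch {
    // Maps a raw value to a supported PK component, mirroring the
    // String/Number/Boolean cases of the Scala match, but throwing for
    // any other type instead of silently stringifying it.
    static Object toComponent(Object value) {
        if (value instanceof String s) {
            return s;
        }
        if (value instanceof Number n) {
            return n.doubleValue();   // Cosmos PK numbers are doubles
        }
        if (value instanceof Boolean b) {
            return b;
        }
        throw new IllegalArgumentException(
            "Unsupported partition key component type: "
                + value.getClass().getName());
    }
}
```

This matches the column-based extraction path's behavior: an exotic type fails loudly at PK construction time instead of routing the query to the wrong physical partition.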
| PK deduplication | Done at Spark layer only, not in the SDK |
| Spark UDF | New `GetCosmosPartitionKeyValue` UDF |
| Custom query validation | Gateway query plan; reject aggregates/ORDER BY/DISTINCT/GROUP BY/DCount/non-streaming ORDER BY/vector/fulltext |
| PK list size | No hard upper-bound enforced; SDK batches internally per physical partition (default 1000 PKs per batch, configurable via `COSMOS.READ_MANY_BY_PK_MAX_BATCH_SIZE`) |
🟡 Recommendation — Documentation: Design doc batch size default inconsistent with code
| PK list size | No hard upper-bound enforced; SDK batches internally per physical partition (default 1000 PKs per batch, configurable via `COSMOS.READ_MANY_BY_PK_MAX_BATCH_SIZE`) |
This says "default 1000" but Configs.java:254 defines DEFAULT_READ_MANY_BY_PK_MAX_BATCH_SIZE = 100. The same mismatch appears in RxDocumentClientImpl.java where the inline comment says "(default 1000)". Please update the design doc and all inline comments to match the actual code default, or change the code default if 1000 was intended.
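For reference, a Configs-style accessor with a system-property override typically has the shape sketched below. The property name matches the one quoted above, and the default of 100 reflects the code (which, per this comment, is what the docs should say too); the class and method names are assumptions, not the SDK's actual code.

```java
class BatchSizeConfigSketch {
    static final String PROPERTY = "COSMOS.READ_MANY_BY_PK_MAX_BATCH_SIZE";
    static final int DEFAULT = 100; // code default per Configs.java, not 1000

    // Reads the per-partition batch size from a system property,
    // falling back to the default for missing or malformed values.
    static int maxPksPerBatch() {
        String raw = System.getProperty(PROPERTY);
        if (raw == null) {
            return DEFAULT;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT; // ignore malformed overrides
        }
    }
}
```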
✅ Review complete (44:59) Posted 9 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage
Description
Adds a new readManyByPartitionKey API surface to the Java Cosmos SDK (sync + async) and wires it through the Spark connector to support PK-only reads (including partial HPK), with query-plan-based validation for custom queries.
Changes:
- Added public readManyByPartitionKey overloads in CosmosAsyncContainer / CosmosContainer and an internal AsyncDocumentClient + RxDocumentClientImpl implementation that groups PKs by physical partition and issues per-range queries.
- Introduced ReadManyByPartitionKeyQueryHelper to compose PK filters into user-provided SQL and added a new config knob for per-partition batching.
- Added Spark support (UDF + PK serialization/parsing helper + reader) and unit/integration tests for query composition and end-to-end behavior.
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines