Conversation
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a new readManyByPartitionKey API surface to the Java Cosmos SDK (sync + async) and wires it through the Spark connector to support PK-only reads (including partial HPK), with query-plan-based validation for custom queries.
Changes:
- Added public `readManyByPartitionKey` overloads in `CosmosAsyncContainer`/`CosmosContainer` and an internal `AsyncDocumentClient` + `RxDocumentClientImpl` implementation that groups PKs by physical partition and issues per-range queries.
- Introduced `ReadManyByPartitionKeyQueryHelper` to compose PK filters into user-provided SQL and added a new config knob for per-partition batching.
- Added Spark support (UDF + PK serialization/parsing helper + reader) and unit/integration tests for query composition and end-to-end behavior.
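The per-partition grouping described above can be sketched as follows. This is a minimal illustration with hypothetical stand-in types (a toy `Range` record and a toy hash), not the SDK's real classes: each partition key hashes to an effective partition key (EPK) value that falls into exactly one physical partition's range, and PKs sharing a range are batched into one per-range query.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of grouping partition keys by physical partition range.
// All types and the hash are stand-ins, not the Cosmos SDK's actual API.
public class ReadManyGroupingSketch {

    // Stand-in for a physical partition's EPK range [min, max).
    public record Range(String id, int min, int max) {
        boolean contains(int epk) { return epk >= min && epk < max; }
    }

    public static Map<String, List<String>> groupByRange(List<String> pks, List<Range> ranges) {
        return pks.stream().collect(Collectors.groupingBy(pk -> {
            int epk = Math.floorMod(pk.hashCode(), 100); // stand-in for the real EPK hash
            return ranges.stream().filter(r -> r.contains(epk)).findFirst().orElseThrow().id();
        }));
    }

    public static void main(String[] args) {
        List<Range> ranges = List.of(new Range("range-0", 0, 50), new Range("range-1", 50, 100));
        // One query (or batch of queries) would then be issued per map key.
        System.out.println(groupByRange(List.of("pk1", "pk2", "pk3"), ranges));
    }
}
```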
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| sdk/cosmos/docs/readManyByPartitionKey-design.md | Design doc describing the new API, query validation, and Spark integration approach. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java | Adds a helper method to fetch query plans through the gateway for validation. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java | Implements readManyByPartitionKey execution, validation, PK→range grouping, batching, and concurrency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ReadManyByPartitionKeyQueryHelper.java | New helper to build SqlQuerySpec by appending PK filters and extracting table aliases. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java | Adds config/env accessors for max PKs per per-partition query batch. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java | Adds internal interface method for PK-only read-many. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java | Adds sync readManyByPartitionKey overloads. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java | Adds async readManyByPartitionKey overloads and wiring to internal client. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ReadManyByPartitionKeyQueryHelperTest.java | Unit tests for SQL generation, alias extraction, and WHERE detection. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ReadManyByPartitionKeyTest.java | Emulator integration tests for single PK + HPK, partial HPK, projections, and query validation. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKeyITest.scala | Spark integration test for reading by PKs and empty result behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/CosmosPartitionKeyHelperSpec.scala | Unit tests for PK string serialization/parsing helpers. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/udf/GetCosmosPartitionKeyValue.scala | Spark UDF to compute _partitionKeyIdentity values. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKey.scala | Spark partition reader that calls new SDK API and converts results to rows. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosReadManyByPartitionKeyReader.scala | Spark reader that maps input rows to PKs and streams results via the partition reader. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosPartitionKeyHelper.scala | Helper for PK serialization/parsing used by the UDF and data source. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosItemsDataSource.scala | Adds Spark entry point to read-many by partition key, including PK extraction logic. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala | Adds _partitionKeyIdentity constant. |
Comments suppressed due to low confidence (1)
sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadManyByPartitionKey.scala:1
- The error message has mismatched parentheses/quoting (`classOf<SparkRowItem]`), which makes it harder to read and search for. Suggest correcting it to a clean, unambiguous string (e.g., `classOf[SparkRowItem]`) to improve diagnosability.
// Copyright (c) Microsoft Corporation. All rights reserved.
@sdkReviewAgent

@sdkReviewAgent

✅ Review complete (44:59) Posted 9 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

@sdkReviewAgent

/azp run java - cosmos - spark

Azure Pipelines successfully started running 1 pipeline(s).
```diff
  Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap = new HashMap<>();
- List<String> partitionKeySelectors = createPkSelectors(partitionKeyDefinition);
+ List<String> partitionKeySelectors = ReadManyByPartitionKeyQueryHelper.createPkSelectors(partitionKeyDefinition);
```
🟢 Observation: createPkSelectors refactoring changes behavior for nested PK paths in existing APIs
The old private `createPkSelectors` used `StringUtils.substring(pathPart, 1)`, which treated `/address/city` as a single segment producing `["address/city"]`. The new implementation uses `PathParser.getPathParts`, which correctly splits it into `["address"]["city"]`.
This is a bug fix for nested partition key paths (the old selector looked for a property literally named address/city rather than traversing nested objects), but it changes behavior for two existing code paths:
- `getRangeQueryMap` → used by `readMany(List<CosmosItemIdentity>)`
- `createLogicalPartitionScanQuerySpec` → used by `readAllItemsOfLogicalPartition`
The fix is correct and the risk is low (nested PK paths are uncommon, and the old behavior was wrong), but it may be worth noting in the CHANGELOG since it changes existing readMany behavior.
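The behavioral difference for nested paths can be illustrated with a simplified sketch. This is plain string handling written for illustration, not the SDK's actual `PathParser` logic:

```java
// Illustrative sketch (simplified, not the SDK's PathParser) of why the two
// implementations produce different selectors for the nested partition key
// path "/address/city".
public class PkSelectorSketch {

    // Old behavior: strip the leading '/' and treat the remainder as one segment.
    public static String oldSelector(String pkPath) {
        return "[\"" + pkPath.substring(1) + "\"]";
    }

    // New behavior: split on '/' and emit one ["part"] selector per segment,
    // so the generated query traverses nested objects.
    public static String newSelector(String pkPath) {
        StringBuilder sb = new StringBuilder();
        for (String part : pkPath.substring(1).split("/")) {
            sb.append("[\"").append(part).append("\"]");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(oldSelector("/address/city")); // ["address/city"] -- a literal property name
        System.out.println(newSelector("/address/city")); // ["address"]["city"] -- nested traversal
    }
}
```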
```java
        throw new IllegalArgumentException(
            "Custom query for readMany by partition key must not contain LIMIT.");
    }
    if (queryInfo.hasNonStreamingOrderBy()) {
```
🟡 Remaining: SELECT TOP N queries not rejected by validation
OFFSET and LIMIT were added to the rejection list (fixing the earlier comment), but `queryInfo.hasTop()` is still missing. `SELECT TOP 5 * FROM c` would pass validation and the SDK would split it across N physical partitions × M batches, each independently limiting to 5 rows, returning up to 5 × N × M results instead of the expected 5.
`hasTop()` is available on `QueryInfo` (line 71 of QueryInfo.java) and is used elsewhere in the codebase (e.g., `DocumentQueryExecutionContextFactory:294`).
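The fan-out can be demonstrated with a small simulation, using stand-in data rather than the SDK's actual query pipeline:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Simulation of the TOP fan-out: each per-partition, per-batch query applies
// TOP independently, so the merged result can contain up to
// top x partitions x batches rows.
public class TopFanOutSketch {

    public static List<Integer> runWithTop(List<List<Integer>> perPartitionBatches, int top) {
        return perPartitionBatches.stream()
            .map(batch -> batch.stream().limit(top).collect(Collectors.toList()))
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 3 physical partitions x 2 batches each = 6 independent queries,
        // each holding 10 candidate rows.
        List<List<Integer>> batches = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            batches.add(IntStream.range(0, 10).boxed().collect(Collectors.toList()));
        }
        System.out.println(runWithTop(batches, 5).size()); // prints 30, not the expected 5
    }
}
```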
Suggested fix: Add before the `hasNonStreamingOrderBy()` check:

```java
if (queryInfo.hasTop()) {
    throw new IllegalArgumentException(
        "Custom query for readMany by partition key must not contain TOP.");
}
```

```java
        .stream()
        .map(PathParser::getPathParts)
        .map(pathParts -> pathParts.stream()
            .map(pathPart -> "[\"" + pathPart.replace("\"", "\\") + "\"]")
```
🔴 Bug: Incorrect quote escaping
The expression `pathPart.replace("\"", "\\")` replaces a double quote with a single backslash, not an escaped quote `\"`. The intent is to produce valid JSON-style selectors like `["myField"]`.
Impact: If a partition key path part contains a `"` character, the generated selector is malformed; the query will error or silently match the wrong field.
Fix:

```java
.map(pathPart -> "[\"" + pathPart.replace("\"", "\\\"") + "\"]")
```

(Note: the old `createPkSelectors` in `RxDocumentClientImpl` had the identical bug; this is a pre-existing issue carried forward.)
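The buggy and fixed escaping can be contrasted in isolation. This is a hypothetical standalone class reproducing the same string logic, not the SDK's code:

```java
// Contrast of buggy vs. fixed escaping for a path part containing a double quote.
public class QuoteEscapeSketch {

    public static String buggySelector(String pathPart) {
        // Bug: a quote becomes a lone backslash, producing a malformed selector.
        return "[\"" + pathPart.replace("\"", "\\") + "\"]";
    }

    public static String fixedSelector(String pathPart) {
        // Fix: escape the quote as \" so the selector stays well-formed.
        return "[\"" + pathPart.replace("\"", "\\\"") + "\"]";
    }

    public static void main(String[] args) {
        System.out.println(buggySelector("my\"Field")); // ["my\Field"] -- malformed
        System.out.println(fixedSelector("my\"Field")); // ["my\"Field"]
    }
}
```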
```scala
  cosmosRowConverter.fromRowToInternalRow(getOrCreateIterator.next().row, rowSerializer)
}

def getCurrentRow(): Row = getOrCreateIterator.next().row
```
🟡 Design: get() and getCurrentRow() both advance the underlying iterator
```scala
override def get(): InternalRow = {
  cosmosRowConverter.fromRowToInternalRow(getOrCreateIterator.next().row, rowSerializer)
}

def getCurrentRow(): Row = getOrCreateIterator.next().row
```

Both methods call `getOrCreateIterator.next()`, which advances the iterator. Neither returns the current row; both consume the next one.
Risk: If any code path calls both `get()` and `getCurrentRow()` for the same logical row, one item will be silently skipped. The naming `getCurrentRow` strongly implies a non-advancing read of the current position, but it actually advances. Even if current callers use them in separate contexts, the misleading API surface is a latent bug.
Suggestion: Consider caching the last `next()` result and having `getCurrentRow()` return the cached value without advancing, or renaming it to make the advancing behavior explicit (e.g., `nextRow()`).
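The caching approach could look roughly like the following. This is a hypothetical generic wrapper illustrating the suggestion, not the connector's actual classes:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Sketch of the caching approach: next() advances and caches the element,
// current() re-reads the cached element without advancing.
public class CachingIterator<T> {
    private final Iterator<T> inner;
    private T current;

    public CachingIterator(Iterator<T> inner) { this.inner = inner; }

    public boolean hasNext() { return inner.hasNext(); }

    // Advances the underlying iterator, like get() does today.
    public T next() {
        current = inner.next();
        return current;
    }

    // Non-advancing read of the last element returned by next().
    public T current() {
        if (current == null) throw new NoSuchElementException("next() not called yet");
        return current;
    }

    public static void main(String[] args) {
        CachingIterator<String> it = new CachingIterator<>(List.of("a", "b").iterator());
        System.out.println(it.next());    // a
        System.out.println(it.current()); // a -- "b" is not skipped
        System.out.println(it.next());    // b
    }
}
```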
Description
Adds a new readManyByPartitionKey API surface to the Java Cosmos SDK (sync + async) and wires it through the Spark connector to support PK-only reads (including partial HPK), with query-plan-based validation for custom queries.
Changes:
Added public readManyByPartitionKey overloads in CosmosAsyncContainer / CosmosContainer and an internal AsyncDocumentClient + RxDocumentClientImpl implementation that groups PKs by physical partition and issues per-range queries.
Introduced ReadManyByPartitionKeyQueryHelper to compose PK filters into user-provided SQL and added a new config knob for per-partition batching.
Added Spark support (UDF + PK serialization/parsing helper + reader) and unit/integration tests for query composition and end-to-end behavior.
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines