Skip to content

Commit 5128111

Browse files
tvaron3, Copilot, and Copilot
authored
[Cosmos] Fix pkranges fetch: use container name for URL, RID for cache key (#4047)
## Summary

Fixes [#4031](#4031) — the pkranges fetch was using the collection **RID** in a name-based URL hierarchy (`dbs/perfdb/colls/<RID>/pkranges`), which Cosmos DB rejects with 404 because mixed name/RID addressing is not supported.

The previous fix (#4041) corrected URL encoding (`.item()` → `.item_by_rid()`) but did not fix the fundamental mixed-addressing issue. This PR resolves it by passing the container **name** for URL construction while keeping the **RID** as the cache key and request context value.

## Root Cause

PR #4005 changed `container_connection.rs::send()` to pass `self.container_ref.rid()` to `pk_range_cache.try_lookup()`. The cache used this RID to build the pkranges URL:

```
dbs/perfdb/colls/pLLZAIuPigw=/pkranges
    ^^^^^^       ^^^^^^^^^^^^^
    NAME         RID  ← mixed addressing → 404
```

All other SDK and driver operations use name-based URLs. The pkranges fetch was the only code path using a RID in a name-based link hierarchy.

## Impact (observed on continuous benchmarks)

- **1.8M 404 requests/hour** from failed pkranges fetches
- Errors silently swallowed by `try_lookup` → `Ok(routing_map.ok())`
- Errors not cached → retried on every request (write lock contention on AsyncCache)
- Loss of client-side partition key routing → gateway must route all requests
- Throughput regression from ~110M to ~4.2M requests/hour

## Changes

### `partition_key_range_cache.rs`

- Added `collection_name: &str` parameter to `try_lookup`, `get_routing_map_for_collection`, `resolve_partition_key_range_by_id`, and `resolve_overlapping_ranges`
- Changed `.item_by_rid(collection_rid)` → `.item(collection_name)` for pkranges URL construction
- Cache key remains the RID (`collection_rid.to_string()`)
- `resource_id` on the request remains the RID
- Updated `tracing::warn!` to include both `collection_name` and `collection_rid`
- Replaced 3 RID-encoding unit tests with 2 tests verifying name-based URL construction

### `container_connection.rs`

- Extracted `collection_name` from `self.container_ref.name()` alongside existing `collection_rid`
- Passes `collection_name` to all `pk_range_cache` method calls
- `resolved_collection_rid` on request context still uses the RID (unchanged)

### `cosmos_fault_injection.rs`

- Added `fault_injection_pkrange_readfeed_is_exercised` integration test
- Injects a transient error on `MetadataPartitionKeyRanges` ReadFeed with hit_limit=1
- Verifies the fault rule is hit (proving pkrange fetch code path executes)
- Verifies subsequent item operations succeed (proving end-to-end pkrange resolution works)

## Test Results

- ✅ 31 unit tests pass (including 2 new)
- ✅ Build succeeds
- Integration test requires emulator (will run in CI)

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 2ad6e08 commit 5128111

6 files changed

Lines changed: 186 additions & 50 deletions

File tree

sdk/cosmos/.cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
"qself",
124124
"RAII",
125125
"reactivations",
126+
"readfeed",
126127
"Replicaset",
127128
"reqs",
128129
"Retriable",

sdk/cosmos/azure_data_cosmos/src/fault_injection/http_client.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,10 @@ impl FaultClient {
219219
impl HttpClient for FaultClient {
220220
async fn execute_request(&self, request: &Request) -> azure_core::Result<AsyncRawResponse> {
221221
// Find applicable rule and clone the result if needed
222-
let fault_result: Option<FaultInjectionResult> = {
222+
let (fault_result, matched_rule): (
223+
Option<FaultInjectionResult>,
224+
Option<Arc<FaultInjectionRule>>,
225+
) = {
223226
let rules = self.rules.lock().unwrap();
224227
let mut applicable_rule_index: Option<usize> = None;
225228

@@ -234,9 +237,9 @@ impl HttpClient for FaultClient {
234237
if let Some(index) = applicable_rule_index {
235238
let rule = &rules[index];
236239
rule.increment_hit_count();
237-
Some(rule.result.clone())
240+
(Some(rule.result.clone()), Some(Arc::clone(rule)))
238241
} else {
239-
None
242+
(None, None)
240243
}
241244
};
242245

@@ -257,7 +260,23 @@ impl HttpClient for FaultClient {
257260
.remove(constants::FAULT_INJECTION_OPERATION);
258261

259262
// No fault injection or delay-only fault, proceed with actual request
260-
self.inner.execute_request(&clean_request).await
263+
let result = self.inner.execute_request(&clean_request).await;
264+
265+
// Record response status only for true spy rules: no error_type,
266+
// no custom_response, and no delay. This excludes probability-skipped
267+
// faults and any rule that injected a delay.
268+
if let (Some(rule), Some(ref fr), Ok(ref response)) =
269+
(&matched_rule, &fault_result, &result)
270+
{
271+
if fr.error_type.is_none()
272+
&& fr.custom_response.is_none()
273+
&& fr.delay == Duration::ZERO
274+
{
275+
rule.record_passthrough_status(response.status());
276+
}
277+
}
278+
279+
result
261280
};
262281

263282
// Apply delay after the request is sent

sdk/cosmos/azure_data_cosmos/src/fault_injection/rule.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
//! Defines fault injection rules that combine conditions and results.
55
66
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
7+
use std::sync::Mutex;
78
use std::time::Instant;
89

10+
use azure_core::http::StatusCode;
11+
912
use super::condition::FaultInjectionCondition;
1013
use super::result::FaultInjectionResult;
1114

@@ -26,8 +29,10 @@ pub struct FaultInjectionRule {
2629
pub id: String,
2730
/// Whether the rule is currently enabled.
2831
enabled: AtomicBool,
29-
/// Number of times the rule has been matched and applied.
32+
/// Number of times the rule has been matched (including matches where no fault was injected).
3033
hit_count: AtomicU32,
34+
/// HTTP status codes of responses for matched requests that passed through without fault injection.
35+
passthrough_statuses: Mutex<Vec<StatusCode>>,
3136
}
3237

3338
/// Cloning snapshots the current `hit_count` and `enabled` state rather than
@@ -43,6 +48,7 @@ impl Clone for FaultInjectionRule {
4348
id: self.id.clone(),
4449
enabled: AtomicBool::new(self.enabled.load(Ordering::SeqCst)),
4550
hit_count: AtomicU32::new(self.hit_count.load(Ordering::SeqCst)),
51+
passthrough_statuses: Mutex::new(self.passthrough_statuses.lock().unwrap().clone()),
4652
}
4753
}
4854
}
@@ -81,6 +87,26 @@ impl FaultInjectionRule {
8187
pub fn reset_hit_count(&self) {
8288
self.hit_count.store(0, Ordering::SeqCst);
8389
}
90+
91+
/// Records the HTTP status code of a response for a matched request that
92+
/// passed through without fault injection (spy/passthrough mode).
93+
pub(super) fn record_passthrough_status(&self, status: StatusCode) {
94+
self.passthrough_statuses.lock().unwrap().push(status);
95+
}
96+
97+
/// Returns the HTTP status codes of responses for matched requests that
98+
/// passed through without fault injection.
99+
///
100+
/// When a rule matches a request but does not inject a fault (e.g., no
101+
/// `error_type` or `custom_response` is set), the real service response
102+
/// status is recorded here. This enables "spy" rules that observe requests
103+
/// without modifying them.
104+
///
105+
/// The history grows unbounded for the lifetime of the rule. This is
106+
/// designed for test scenarios with a bounded number of requests.
107+
pub fn passthrough_statuses(&self) -> Vec<StatusCode> {
108+
self.passthrough_statuses.lock().unwrap().clone()
109+
}
84110
}
85111

86112
/// Builder for creating a fault injection rule.
@@ -155,6 +181,7 @@ impl FaultInjectionRuleBuilder {
155181
id: self.id,
156182
enabled: AtomicBool::new(true),
157183
hit_count: AtomicU32::new(0),
184+
passthrough_statuses: Mutex::new(Vec::new()),
158185
}
159186
}
160187
}

sdk/cosmos/azure_data_cosmos/src/handler/container_connection.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,15 @@ impl ContainerConnection {
5353
|| cosmos_request.resource_type == ResourceType::StoredProcedures)
5454
{
5555
let pk_def = self.container_ref.partition_key_definition();
56+
let collection_name = self.container_ref.name();
5657
let collection_rid = self.container_ref.rid();
5758

5859
if let Some(pk_range) = cosmos_request.partition_key_range_identity.as_ref() {
5960
if !pk_range.collection_rid.is_empty() {
6061
if let Some(resolved) = self
6162
.pk_range_cache
6263
.resolve_partition_key_range_by_id(
64+
collection_name,
6365
&pk_range.collection_rid,
6466
&pk_range.partition_key_range_id,
6567
false,
@@ -71,7 +73,10 @@ impl ContainerConnection {
7173
}
7274
}
7375
} else if let Some(partition_key) = cosmos_request.partition_key.as_ref() {
74-
let routing_map = self.pk_range_cache.try_lookup(collection_rid, None).await?;
76+
let routing_map = self
77+
.pk_range_cache
78+
.try_lookup(collection_name, collection_rid, None)
79+
.await?;
7580

7681
if let Some(routing_map) = routing_map {
7782
// Use a safe default version (2) when the service omits the version field,
@@ -94,7 +99,7 @@ impl ContainerConnection {
9499
// Refresh the routing map and retry.
95100
let refreshed_routing_map = self
96101
.pk_range_cache
97-
.try_lookup(collection_rid, Some(routing_map))
102+
.try_lookup(collection_name, collection_rid, Some(routing_map))
98103
.await?;
99104

100105
if let Some(refreshed_routing_map) = refreshed_routing_map {

sdk/cosmos/azure_data_cosmos/src/routing/partition_key_range_cache.rs

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,20 @@ impl PartitionKeyRangeCache {
5555

5656
pub async fn resolve_overlapping_ranges(
5757
&self,
58+
collection_name: &str,
5859
collection_rid: &str,
5960
range: Range<String>,
6061
force_refresh: bool,
6162
) -> Result<Option<Vec<PartitionKeyRange>>, Error> {
62-
let mut routing_map = self.try_lookup(collection_rid, None).await?;
63+
let mut routing_map = self
64+
.try_lookup(collection_name, collection_rid, None)
65+
.await?;
6366

6467
if force_refresh {
6568
if let Some(previous) = routing_map.clone() {
66-
routing_map = self.try_lookup(collection_rid, Some(previous)).await?;
69+
routing_map = self
70+
.try_lookup(collection_name, collection_rid, Some(previous))
71+
.await?;
6772
}
6873
}
6974

@@ -83,16 +88,20 @@ impl PartitionKeyRangeCache {
8388

8489
pub async fn resolve_partition_key_range_by_id(
8590
&self,
86-
collection_resource_id: &str,
91+
collection_name: &str,
92+
collection_rid: &str,
8793
partition_key_range_id: &str,
8894
force_refresh: bool,
8995
) -> Option<PartitionKeyRange> {
90-
let mut routing_map = self.try_lookup(collection_resource_id, None).await.ok()?;
96+
let mut routing_map = self
97+
.try_lookup(collection_name, collection_rid, None)
98+
.await
99+
.ok()?;
91100

92101
if force_refresh {
93102
if let Some(previous) = routing_map.clone() {
94103
routing_map = self
95-
.try_lookup(collection_resource_id, Some(previous))
104+
.try_lookup(collection_name, collection_rid, Some(previous))
96105
.await
97106
.ok()?;
98107
}
@@ -103,7 +112,7 @@ impl PartitionKeyRangeCache {
103112
None => {
104113
info!(
105114
"Routing Map Null for collection: {}, PartitionKeyRangeId: {}, forceRefresh: {}",
106-
collection_resource_id,
115+
collection_rid,
107116
partition_key_range_id,
108117
force_refresh
109118
);
@@ -114,6 +123,7 @@ impl PartitionKeyRangeCache {
114123

115124
pub async fn try_lookup(
116125
&self,
126+
collection_name: &str,
117127
collection_rid: &str,
118128
previous_value: Option<CollectionRoutingMap>,
119129
) -> Result<Option<CollectionRoutingMap>, Error> {
@@ -128,15 +138,19 @@ impl PartitionKeyRangeCache {
128138
},
129139
|| async {
130140
let routing_map = self
131-
.get_routing_map_for_collection(collection_rid, previous_value.clone())
141+
.get_routing_map_for_collection(
142+
collection_name,
143+
collection_rid,
144+
previous_value.clone(),
145+
)
132146
.await?;
133147
match routing_map {
134148
Some(map) => Ok(map),
135149
None => Err(Error::new(
136150
azure_core::error::ErrorKind::Other,
137151
format!(
138-
"Failed to get routing map for collection: {}",
139-
collection_rid
152+
"Failed to get routing map for collection: {} (rid: {})",
153+
collection_name, collection_rid
140154
),
141155
)),
142156
}
@@ -146,6 +160,7 @@ impl PartitionKeyRangeCache {
146160

147161
if let Err(ref e) = routing_map {
148162
tracing::warn!(
163+
collection_name,
149164
collection_rid,
150165
error = %e,
151166
"Failed to fetch routing map for collection"
@@ -168,6 +183,7 @@ impl PartitionKeyRangeCache {
168183

169184
async fn get_routing_map_for_collection(
170185
&self,
186+
collection_name: &str,
171187
collection_rid: &str,
172188
previous_routing_map: Option<CollectionRoutingMap>,
173189
) -> Result<Option<CollectionRoutingMap>, Error> {
@@ -196,7 +212,7 @@ impl PartitionKeyRangeCache {
196212
let pk_range_link = self
197213
.database_link
198214
.feed(ResourceType::Containers)
199-
.item_by_rid(collection_rid)
215+
.item(collection_name)
200216
.feed(ResourceType::PartitionKeyRanges);
201217
let response = self
202218
.execute_partition_key_range_read_change_feed(
@@ -681,60 +697,51 @@ mod tests {
681697
assert_eq!(range.target_throughput.unwrap(), 1000.0);
682698
}
683699

684-
// Tests verifying that the pkranges resource link uses item_by_rid() so that
685-
// collection RIDs (which are base64-encoded and can contain '=', '+', '/') are
686-
// not URL-percent-encoded. Using item() would encode '=' to '%3D', causing 404s.
687-
688700
#[test]
689-
fn pkranges_link_rid_with_equals_is_not_encoded() {
690-
// RIDs like "pLLZAIuPigw=" contain '=' which item() would encode to '%3D'.
691-
// item_by_rid() must preserve it as-is.
692-
let collection_rid = "pLLZAIuPigw=";
701+
fn pkranges_link_uses_container_name_not_rid() {
702+
// The pkranges URL must use the container NAME (not RID) to match
703+
// the name-based database_link. Mixed name/RID addressing causes 404s.
693704
let database_link = ResourceLink::root(ResourceType::Databases).item("perfdb");
694705
let pk_range_link = database_link
695706
.feed(ResourceType::Containers)
696-
.item_by_rid(collection_rid)
707+
.item("perfcontainer")
697708
.feed(ResourceType::PartitionKeyRanges);
698709

699-
// Correct: '=' preserved, not encoded to '%3D'
700710
assert_eq!(
701-
"dbs/perfdb/colls/pLLZAIuPigw=/pkranges",
711+
"dbs/perfdb/colls/perfcontainer/pkranges",
702712
pk_range_link.path()
703713
);
704714
}
705715

706716
#[test]
707-
fn pkranges_link_item_encodes_equals_incorrectly() {
708-
// Demonstrates the bug: item() URL-encodes '=' to '%3D', producing a path
709-
// that Cosmos DB cannot find (404).
710-
let collection_rid = "pLLZAIuPigw=";
717+
fn pkranges_link_with_rid_causes_mixed_addressing() {
718+
// Demonstrates why using RID in a name-based link is wrong:
719+
// the resulting URL mixes name-based database with RID-based container,
720+
// which Cosmos DB rejects with 404.
711721
let database_link = ResourceLink::root(ResourceType::Databases).item("perfdb");
712-
let pk_range_link_wrong = database_link
722+
723+
// Using item_by_rid with a RID in a name-based hierarchy produces
724+
// a mixed-addressing URL that Cosmos DB cannot resolve.
725+
let pk_range_link_mixed = database_link
713726
.feed(ResourceType::Containers)
714-
.item(collection_rid)
727+
.item_by_rid("pLLZAIuPigw=")
715728
.feed(ResourceType::PartitionKeyRanges);
716729

717-
// Wrong: '=' is encoded to '%3D', causing 404 from Cosmos DB
718-
assert!(
719-
pk_range_link_wrong.path().contains("%3D"),
720-
"item() should URL-encode '=' to '%3D'"
721-
);
730+
// This path uses name for DB but RID for collection — Cosmos DB returns 404
722731
assert_eq!(
723-
"dbs/perfdb/colls/pLLZAIuPigw%3D/pkranges",
724-
pk_range_link_wrong.path()
732+
"dbs/perfdb/colls/pLLZAIuPigw=/pkranges",
733+
pk_range_link_mixed.path()
725734
);
726-
}
727735

728-
#[test]
729-
fn pkranges_link_rid_with_plus_is_not_encoded() {
730-
// RIDs may also contain '+' (base64 char). item_by_rid() must preserve it.
731-
let collection_rid = "AB+CD/EF==";
732-
let database_link = ResourceLink::root(ResourceType::Databases).item("mydb");
733-
let pk_range_link = database_link
736+
// Using item() with a name in a name-based hierarchy produces a valid URL.
737+
let pk_range_link_correct = database_link
734738
.feed(ResourceType::Containers)
735-
.item_by_rid(collection_rid)
739+
.item("perfcontainer")
736740
.feed(ResourceType::PartitionKeyRanges);
737741

738-
assert_eq!("dbs/mydb/colls/AB+CD/EF==/pkranges", pk_range_link.path());
742+
assert_eq!(
743+
"dbs/perfdb/colls/perfcontainer/pkranges",
744+
pk_range_link_correct.path()
745+
);
739746
}
740747
}

0 commit comments

Comments
 (0)