Skip to content

Commit 00fa7d3

Browse files
xinlian12CopilotCopilot
authored
fix: [SparkConnector] Skip readContainerThroughput when targetThroughput is configured (#48800)
* fix: skip readContainerThroughput when targetThroughput is configured and customer is using AAD --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5663943 commit 00fa7d3

7 files changed

Lines changed: 156 additions & 0 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where `readContainerThroughput` was always called even when `targetThroughput` is explicitly configured, requiring unnecessary `throughputSettings/read` permission for AAD principals. - See [PR 48800](https://github.com/Azure/azure-sdk-for-java/pull/48800)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where `readContainerThroughput` was always called even when `targetThroughput` is explicitly configured, requiring unnecessary `throughputSettings/read` permission for AAD principals. - See [PR 48800](https://github.com/Azure/azure-sdk-for-java/pull/48800)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where `readContainerThroughput` was always called even when `targetThroughput` is explicitly configured, requiring unnecessary `throughputSettings/read` permission for AAD principals. - See [PR 48800](https://github.com/Azure/azure-sdk-for-java/pull/48800)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where `readContainerThroughput` was always called even when `targetThroughput` is explicitly configured, requiring unnecessary `throughputSettings/read` permission for AAD principals. - See [PR 48800](https://github.com/Azure/azure-sdk-for-java/pull/48800)
1011

1112
#### Other Changes
1213

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.throughputControl.sdk.controller;
5+
6+
import com.azure.cosmos.ConnectionMode;
7+
import com.azure.cosmos.CosmosAsyncContainer;
8+
import com.azure.cosmos.CosmosAsyncDatabase;
9+
import com.azure.cosmos.implementation.caches.RxCollectionCache;
10+
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
11+
import com.azure.cosmos.implementation.throughputControl.sdk.config.LocalThroughputControlGroup;
12+
import com.azure.cosmos.implementation.throughputControl.sdk.config.SDKThroughputControlGroupInternal;
13+
import com.azure.cosmos.implementation.throughputControl.sdk.controller.container.SDKThroughputContainerController;
14+
import com.azure.cosmos.models.CosmosContainerProperties;
15+
import com.azure.cosmos.models.CosmosContainerResponse;
16+
import com.azure.cosmos.models.PriorityLevel;
17+
import org.mockito.Mockito;
18+
import org.testng.annotations.Test;
19+
import reactor.core.publisher.Mono;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.UUID;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
26+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
27+
28+
public class SDKThroughputContainerControllerTests {
29+
30+
@Test(groups = "unit")
31+
public void throughputQueryMonoNotSubscribedWhenOnlyTargetThroughputConfigured() {
32+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
33+
CosmosAsyncDatabase databaseMock = Mockito.mock(CosmosAsyncDatabase.class, Mockito.RETURNS_DEEP_STUBS);
34+
Mockito.doReturn("FakeCollection").when(containerMock).getId();
35+
Mockito.doReturn(databaseMock).when(containerMock).getDatabase();
36+
Mockito.doReturn("FakeDatabase").when(databaseMock).getId();
37+
38+
// Mock container.read() for resolveContainerResourceId()
39+
CosmosContainerResponse containerResponseMock = Mockito.mock(CosmosContainerResponse.class);
40+
CosmosContainerProperties containerPropertiesMock = Mockito.mock(CosmosContainerProperties.class);
41+
Mockito.when(containerResponseMock.getProperties()).thenReturn(containerPropertiesMock);
42+
Mockito.when(containerPropertiesMock.getResourceId()).thenReturn("fakeContainerRid");
43+
Mockito.when(containerMock.read()).thenReturn(Mono.just(containerResponseMock));
44+
45+
// Group with targetThroughput only (no threshold)
46+
LocalThroughputControlGroup group = new LocalThroughputControlGroup(
47+
"test-" + UUID.randomUUID(),
48+
containerMock,
49+
6, // targetThroughput
50+
null, // targetThroughputThreshold — NOT set
51+
PriorityLevel.HIGH,
52+
true,
53+
false);
54+
55+
Map<String, SDKThroughputControlGroupInternal> groups = new HashMap<>();
56+
groups.put(group.getGroupName(), group);
57+
58+
AtomicBoolean throughputQuerySubscribed = new AtomicBoolean(false);
59+
Mono<Integer> trackingThroughputQueryMono = Mono.<Integer>just(10000)
60+
.doOnSubscribe(s -> throughputQuerySubscribed.set(true));
61+
62+
RxCollectionCache collectionCacheMock = Mockito.mock(RxCollectionCache.class);
63+
RxPartitionKeyRangeCache pkRangeCacheMock = Mockito.mock(RxPartitionKeyRangeCache.class);
64+
65+
SDKThroughputContainerController controller = new SDKThroughputContainerController(
66+
collectionCacheMock,
67+
ConnectionMode.DIRECT,
68+
groups,
69+
pkRangeCacheMock,
70+
null,
71+
trackingThroughputQueryMono);
72+
73+
// Call init() which drives the full pipeline:
74+
// resolveContainerResourceId -> resolveContainerMaxThroughput -> createAndInitializeGroupControllers
75+
// This exercises both getThroughputResolveLevel() (scope calculation) and
76+
// the resolveContainerMaxThroughput() guard. The last step may fail due to
77+
// insufficient mocking, but resolveContainerMaxThroughput has already completed by then.
78+
try {
79+
controller.<Object>init().block();
80+
} catch (Exception ignored) {
81+
// createAndInitializeGroupControllers may fail — acceptable for this test
82+
}
83+
84+
assertThat(throughputQuerySubscribed.get())
85+
.as("throughputQueryMono should not be subscribed when only targetThroughput is configured")
86+
.isFalse();
87+
}
88+
89+
@Test(groups = "unit")
90+
public void throughputQueryMonoSubscribedWhenThresholdConfigured() {
91+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
92+
CosmosAsyncDatabase databaseMock = Mockito.mock(CosmosAsyncDatabase.class, Mockito.RETURNS_DEEP_STUBS);
93+
Mockito.doReturn("FakeCollection").when(containerMock).getId();
94+
Mockito.doReturn(databaseMock).when(containerMock).getDatabase();
95+
Mockito.doReturn("FakeDatabase").when(databaseMock).getId();
96+
97+
// Mock container.read() for resolveContainerResourceId()
98+
CosmosContainerResponse containerResponseMock = Mockito.mock(CosmosContainerResponse.class);
99+
CosmosContainerProperties containerPropertiesMock = Mockito.mock(CosmosContainerProperties.class);
100+
Mockito.when(containerResponseMock.getProperties()).thenReturn(containerPropertiesMock);
101+
Mockito.when(containerPropertiesMock.getResourceId()).thenReturn("fakeContainerRid");
102+
Mockito.when(containerMock.read()).thenReturn(Mono.just(containerResponseMock));
103+
104+
// Group with targetThroughputThreshold set
105+
LocalThroughputControlGroup group = new LocalThroughputControlGroup(
106+
"test-" + UUID.randomUUID(),
107+
containerMock,
108+
null, // targetThroughput
109+
0.5, // targetThroughputThreshold — IS set
110+
PriorityLevel.HIGH,
111+
true,
112+
false);
113+
114+
Map<String, SDKThroughputControlGroupInternal> groups = new HashMap<>();
115+
groups.put(group.getGroupName(), group);
116+
117+
AtomicBoolean throughputQuerySubscribed = new AtomicBoolean(false);
118+
Mono<Integer> trackingThroughputQueryMono = Mono.<Integer>just(10000)
119+
.doOnSubscribe(s -> throughputQuerySubscribed.set(true));
120+
121+
RxCollectionCache collectionCacheMock = Mockito.mock(RxCollectionCache.class);
122+
RxPartitionKeyRangeCache pkRangeCacheMock = Mockito.mock(RxPartitionKeyRangeCache.class);
123+
124+
SDKThroughputContainerController controller = new SDKThroughputContainerController(
125+
collectionCacheMock,
126+
ConnectionMode.DIRECT,
127+
groups,
128+
pkRangeCacheMock,
129+
null,
130+
trackingThroughputQueryMono);
131+
132+
// Call init() — when targetThroughputThreshold is configured,
133+
// resolveContainerMaxThroughput should subscribe to the throughput query mono.
134+
try {
135+
controller.<Object>init().block();
136+
} catch (Exception ignored) {
137+
// createAndInitializeGroupControllers may fail — acceptable for this test
138+
}
139+
140+
assertThat(throughputQuerySubscribed.get())
141+
.as("throughputQueryMono should be subscribed when targetThroughputThreshold is configured")
142+
.isTrue();
143+
}
144+
}

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where the throughput control `throughputQueryMono` was always subscribed even when `targetThroughput` is used (not `targetThroughputThreshold`), causing unnecessary `throughputSettings/read` permission requirement for AAD principals. - See [PR 48800](https://github.com/Azure/azure-sdk-for-java/pull/48800)
1011
* Fixed an issue where change feed with `startFrom` point-in-time returned `400` on merged partitions by enabling the `CHANGE_FEED_WITH_START_TIME_POST_MERGE` SDK capability.
1112
* Fixed JVM `<clinit>` deadlock when multiple threads concurrently trigger Cosmos SDK class loading for the first time. - See [PR 48689](https://github.com/Azure/azure-sdk-for-java/pull/48689)
1213

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/sdk/controller/container/SDKThroughputContainerController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ private Mono<ThroughputResponse> resolveContainerThroughput() {
183183
}
184184

185185
private Mono<SDKThroughputContainerController> resolveContainerMaxThroughput() {
186+
// When no group uses targetThroughputThreshold, there is no need to resolve the container's
187+
// max throughput. Skip the query to avoid requiring throughputSettings/read permission
188+
// (which AAD principals may not have).
189+
if (this.throughputProvisioningScope == ThroughputProvisioningScope.NONE) {
190+
return Mono.just(this);
191+
}
192+
186193
return this.throughputQueryMono
187194
.flatMap(maxThroughput -> {
188195
this.maxContainerThroughput.set(maxThroughput);

0 commit comments

Comments
 (0)