Skip to content

Commit ce6266c

Browse files
committed
1. xds: get the authority map from the XdsClient along with other data as in the existing flow
2. xds: modifies existing tests to account for authority
1 parent 4fb3626 commit ce6266c

4 files changed

Lines changed: 107 additions & 35 deletions

File tree

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.grpc.xds;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20-
import com.google.common.collect.ImmutableMap;
2120
import com.google.common.util.concurrent.ListenableFuture;
2221
import io.grpc.LongCounterMetricInstrument;
2322
import io.grpc.LongGaugeMetricInstrument;
@@ -30,11 +29,12 @@
3029
import io.grpc.xds.client.XdsClient.ResourceMetadata;
3130
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
3231
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
33-
import io.grpc.xds.client.XdsClientImpl;
3432
import io.grpc.xds.client.XdsClientMetricReporter;
3533
import io.grpc.xds.client.XdsResourceType;
36-
37-
import java.util.*;
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.Map;
3838
import java.util.concurrent.ExecutionException;
3939
import java.util.concurrent.Future;
4040
import java.util.concurrent.TimeUnit;
@@ -143,7 +143,13 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
143143
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
144144
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);
145145

146-
computeAndReportResourceCounts(xdsClient, metadataByType, callback);
146+
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
147+
getResourceAuthorityCompleted = xdsClient.getSubscribedResourcesAuthoritySnapshot();
148+
149+
Map<XdsResourceType<?>, Map<String, String>> authorityByType =
150+
getResourceAuthorityCompleted.get(10, TimeUnit.SECONDS);
151+
152+
computeAndReportResourceCounts(metadataByType, authorityByType, callback);
147153

148154
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
149155
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
@@ -155,30 +161,29 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
155161
}
156162
}
157163

158-
private void computeAndReportResourceCounts(XdsClient xdsClient,
159-
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
160-
MetricReporterCallback callback) {
164+
private void computeAndReportResourceCounts(
165+
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
166+
Map<XdsResourceType<?>, Map<String, String>> authorityByType,
167+
MetricReporterCallback callback) {
161168
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
162169
metadataByType.entrySet()) {
163170
XdsResourceType<?> type = metadataByTypeEntry.getKey();
164171

165172
Map<String, Long> resourceCountsByState = new HashMap<>();
166-
List<String> authorities = new ArrayList<>();
167-
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
173+
Map<String, String> authorityByState = new HashMap<>();
174+
for (Map.Entry<String, ResourceMetadata> metadataByName :
175+
metadataByTypeEntry.getValue().entrySet()) {
176+
String resourceName = metadataByName.getKey();
177+
ResourceMetadata metadata = metadataByName.getValue();
168178
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
169179
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
170-
}
171-
for (String resourceName : metadataByTypeEntry.getValue().keySet()) {
172-
authorities.add(xdsClient.getAuthority(type, resourceName));
180+
authorityByState.put(cacheState, authorityByType.get(type).get(resourceName));
173181
}
174182

175-
Iterator<String> authorityIterator = authorities.iterator();
176183
resourceCountsByState.forEach((cacheState, count) -> {
177-
if (authorityIterator.hasNext()) {
178-
String authority = authorityIterator.next();
179-
callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl());
180-
}
181-
});
184+
callback.reportResourceCountGauge(authorityByState.get(cacheState),
185+
count, cacheState, type.typeUrl());
186+
});
182187
}
183188
}
184189

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,20 @@ public Object getSecurityConfig() {
318318
getSubscribedResourcesMetadataSnapshot() {
319319
throw new UnsupportedOperationException();
320320
}
321-
public String getAuthority(XdsResourceType<?> resourceType, String resourceName) {
321+
322+
/**
323+
* Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
324+
* they are at the moment of the call.
325+
*
326+
* <p>The snapshot is a map from the "resource type" to
327+
* a map ("resource name": "authority").
328+
*/
329+
// Must be synchronized.
330+
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
331+
getSubscribedResourcesAuthoritySnapshot() {
322332
throw new UnsupportedOperationException();
323333
}
334+
324335
/**
325336
* Registers a data watcher for the given Xds resource.
326337
*/

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,13 +242,31 @@ public void run() {
242242
return future;
243243
}
244244

245+
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
246+
// ResourceTypes that do not have subscribers does not show up in the snapshot keys.
245247
@Override
246-
public String getAuthority(XdsResourceType<?> resourceType, String resourceName) {
247-
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry = resourceSubscribers.get(resourceType);
248-
if (resourceEntry != null) {
249-
return resourceEntry.get(resourceName).authority;
250-
}
251-
return null;
248+
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
249+
getSubscribedResourcesAuthoritySnapshot() {
250+
final SettableFuture<Map<XdsResourceType<?>, Map<String, String>>> future =
251+
SettableFuture.create();
252+
syncContext.execute(new Runnable() {
253+
@Override
254+
public void run() {
255+
// A map from a "resource type" to a map ("resource name": "authority")
256+
ImmutableMap.Builder<XdsResourceType<?>, Map<String, String>> authoritySnapshot =
257+
ImmutableMap.builder();
258+
for (XdsResourceType<?> resourceType : resourceSubscribers.keySet()) {
259+
ImmutableMap.Builder<String, String> authorityMap = ImmutableMap.builder();
260+
for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
261+
: resourceSubscribers.get(resourceType).entrySet()) {
262+
authorityMap.put(resourceEntry.getKey(), resourceEntry.getValue().authority);
263+
}
264+
authoritySnapshot.put(resourceType, authorityMap.buildOrThrow());
265+
}
266+
future.set(authoritySnapshot.buildOrThrow());
267+
}
268+
});
269+
return future;
252270
}
253271

254272
@Override

xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
public class XdsClientMetricReporterImplTest {
7777

7878
private static final String target = "test-target";
79+
private static final String authority = "test-authority";
7980
private static final String server = "trafficdirector.googleapis.com";
8081
private static final String resourceTypeUrl =
8182
"resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster";
@@ -101,7 +102,6 @@ public void setUp() {
101102

102103
@Test
103104
public void reportResourceUpdates() {
104-
// TODO(dnvindhya): add the "authority" label once available.
105105
reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl);
106106
verify(mockMetricRecorder).addLongCounter(
107107
eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10),
@@ -129,6 +129,8 @@ public void setXdsClient_reportMetrics() throws Exception {
129129
future.set(null);
130130
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
131131
ImmutableMap.of()));
132+
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
133+
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
132134
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
133135
.thenReturn(future);
134136
reporter.setXdsClient(mockXdsClient);
@@ -150,6 +152,8 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() {
150152
future.set(null);
151153
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
152154
ImmutableMap.of()));
155+
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
156+
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
153157

154158
// Create a future that will throw an exception
155159
SettableFuture<Void> serverConnectionsFeature = SettableFuture.create();
@@ -177,6 +181,8 @@ public void metricGauges() {
177181
future.set(null);
178182
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
179183
ImmutableMap.of()));
184+
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
185+
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
180186
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
181187
.thenReturn(future);
182188
reporter.setXdsClient(mockXdsClient);
@@ -222,16 +228,17 @@ public void metricReporterCallback() {
222228
eq(Lists.newArrayList()));
223229

224230
String cacheState = "requested";
225-
callback.reportResourceCountGauge("BuzzLightyear", 10, cacheState, resourceTypeUrl);
231+
callback.reportResourceCountGauge(authority, 10, cacheState, resourceTypeUrl);
226232
verify(mockBatchRecorder, times(1)).recordLongGauge(
227233
eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L),
228-
eq(Arrays.asList(target, cacheState, resourceTypeUrl)),
234+
eq(Arrays.asList(target, authority, cacheState, resourceTypeUrl)),
229235
eq(Collections.emptyList()));
230236
}
231237

232238
@Test
233239
public void reportCallbackMetrics_computeAndReportResourceCounts() {
234240
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
241+
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
235242
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
236243
XdsResourceType<?> routeConfigResource = XdsRouteConfigureResource.getInstance();
237244
XdsResourceType<?> clusterResource = XdsClusterResource.getInstance();
@@ -241,31 +248,44 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
241248
long nanosLastUpdate = 1577923199_606042047L;
242249

243250
Map<String, ResourceMetadata> ldsResourceMetadataMap = new HashMap<>();
251+
Map<String, String> ldsAuthorityMap = new HashMap<>();
244252
ldsResourceMetadataMap.put("resource1",
245253
ResourceMetadata.newResourceMetadataRequested());
254+
ldsAuthorityMap.put("resource1", "authority1");
246255
ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42",
247256
nanosLastUpdate);
248257
ldsResourceMetadataMap.put("resource2", ackedLdsResource);
258+
ldsAuthorityMap.put("resource2", "authority2");
249259
ldsResourceMetadataMap.put("resource3",
250260
ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate));
261+
ldsAuthorityMap.put("resource3", "authority3");
251262
ldsResourceMetadataMap.put("resource4",
252263
ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate,
253264
"nacked after previous ack", true));
265+
ldsAuthorityMap.put("resource4", "authority4");
254266

255267
Map<String, ResourceMetadata> rdsResourceMetadataMap = new HashMap<>();
268+
Map<String, String> rdsAuthorityMap = new HashMap<>();
256269
ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested();
257270
rdsResourceMetadataMap.put("resource5",
258271
ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24",
259272
nanosLastUpdate, "nacked after request", false));
273+
rdsAuthorityMap.put("resource5", "authority5");
260274
rdsResourceMetadataMap.put("resource6",
261275
ResourceMetadata.newResourceMetadataDoesNotExist());
276+
rdsAuthorityMap.put("resource6", "authority6");
262277

263278
Map<String, ResourceMetadata> cdsResourceMetadataMap = new HashMap<>();
279+
Map<String, String> cdsAuthorityMap = new HashMap<>();
264280
cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown());
281+
cdsAuthorityMap.put("resource7", "authority7");
265282

266283
metadataByType.put(listenerResource, ldsResourceMetadataMap);
284+
authorityByType.put(listenerResource, ldsAuthorityMap);
267285
metadataByType.put(routeConfigResource, rdsResourceMetadataMap);
286+
authorityByType.put(routeConfigResource, rdsAuthorityMap);
268287
metadataByType.put(clusterResource, cdsResourceMetadataMap);
288+
authorityByType.put(clusterResource, cdsAuthorityMap);
269289

270290
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
271291
reportServerConnectionsCompleted.set(null);
@@ -277,36 +297,49 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
277297
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
278298
.thenReturn(getResourceMetadataCompleted);
279299

300+
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
301+
getResourceAuthorityCompleted = Futures.immediateFuture(authorityByType);
302+
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
303+
.thenReturn(getResourceAuthorityCompleted);
304+
280305
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
281306

282307
// LDS resource requested
283308
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
284-
eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any());
309+
eq(1L), eq(Arrays.asList(target, "authority1",
310+
"requested", listenerResource.typeUrl())), any());
285311
// LDS resources acked
286312
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
287-
eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any());
313+
eq(2L), eq(Arrays.asList(target, "authority3",
314+
"acked", listenerResource.typeUrl())), any());
288315
// LDS resource nacked but cached
289316
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
290-
eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any());
317+
eq(1L), eq(Arrays.asList(target, "authority4",
318+
"nacked_but_cached", listenerResource.typeUrl())), any());
291319

292320
// RDS resource nacked
293321
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
294-
eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any());
322+
eq(1L), eq(Arrays.asList(target, "authority5",
323+
"nacked", routeConfigResource.typeUrl())), any());
295324
// RDS resource does not exist
296325
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
297-
eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any());
326+
eq(1L), eq(Arrays.asList(target, "authority6",
327+
"does_not_exist", routeConfigResource.typeUrl())), any());
298328

299329
// CDS resource unknown
300330
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
301-
eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any());
331+
eq(1L), eq(Arrays.asList(target, "authority7",
332+
"unknown", clusterResource.typeUrl())), any());
302333
verifyNoMoreInteractions(mockBatchRecorder);
303334
}
304335

305336
@Test
306337
public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() {
307338
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
339+
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
308340
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
309341
metadataByType.put(listenerResource, Collections.emptyMap());
342+
authorityByType.put(listenerResource, Collections.emptyMap());
310343

311344
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
312345
reportServerConnectionsCompleted.set(null);
@@ -318,6 +351,11 @@ public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources(
318351
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
319352
.thenReturn(getResourceMetadataCompleted);
320353

354+
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
355+
getAuthorityCompleted = Futures.immediateFuture(authorityByType);
356+
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
357+
.thenReturn(getAuthorityCompleted);
358+
321359
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
322360

323361
// Verify that reportResourceCountGauge is never called

0 commit comments

Comments
 (0)