Skip to content

Commit 319c113

Browse files
authored
Merge pull request #3 from shivaspeaks/client-watcher-changes-2
2 parents 587969d + 910b20b commit 319c113

18 files changed

Lines changed: 1210 additions & 725 deletions

cronet/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ dependencies {
5858

5959
task javadocs(type: Javadoc) {
6060
source = android.sourceSets.main.java.srcDirs
61-
classpath += files(android.getBootClasspath())
61+
// classpath += files(android.getBootClasspath())
6262
classpath += files({
6363
android.libraryVariants.collect { variant ->
6464
variant.javaCompileProvider.get().classpath

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,9 @@ private abstract class XdsWatcherBase<T extends ResourceUpdate>
632632

633633
@Nullable
634634
private StatusOr<T> data;
635+
@Nullable
636+
@SuppressWarnings("unused")
637+
private Status ambientError;
635638

636639

637640
private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
@@ -640,42 +643,39 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
640643
}
641644

642645
@Override
643-
public void onError(Status error) {
644-
checkNotNull(error, "error");
646+
public void onResourceChanged(StatusOr<T> update) {
645647
if (cancelled) {
646648
return;
647649
}
648-
// Don't update configuration on error, if we've already received configuration
649-
if (!hasDataValue()) {
650-
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
651-
String.format("Error retrieving %s: %s: %s",
652-
toContextString(), error.getCode(), error.getDescription())));
653-
maybePublishConfig();
654-
}
655-
}
650+
ambientError = null;
651+
if (update.hasValue()) {
652+
data = update;
653+
subscribeToChildren(update.getValue());
654+
} else {
655+
Status status = update.getStatus();
656+
Status translatedStatus = Status.UNAVAILABLE.withDescription(
657+
String.format("Error retrieving %s: %s. Details: %s%s",
658+
toContextString(),
659+
status.getCode(),
660+
status.getDescription() != null ? status.getDescription() : "",
661+
nodeInfo()));
656662

657-
@Override
658-
public void onResourceDoesNotExist(String resourceName) {
659-
if (cancelled) {
660-
return;
663+
data = StatusOr.fromStatus(translatedStatus);
661664
}
662-
663-
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
664-
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
665-
toContextString() + " does not exist" + nodeInfo()));
666665
maybePublishConfig();
667666
}
668667

669668
@Override
670-
public void onChanged(T update) {
671-
checkNotNull(update, "update");
669+
public void onAmbientError(Status error) {
672670
if (cancelled) {
673671
return;
674672
}
675-
676-
this.data = StatusOr.fromValue(update);
677-
subscribeToChildren(update);
678-
maybePublishConfig();
673+
ambientError = error.withDescription(
674+
String.format("Ambient error for %s: %s. Details: %s%s",
675+
toContextString(),
676+
error.getCode(),
677+
error.getDescription() != null ? error.getDescription() : "",
678+
nodeInfo()));
679679
}
680680

681681
protected abstract void subscribeToChildren(T update);

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 62 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.grpc.ServerServiceDefinition;
4343
import io.grpc.Status;
4444
import io.grpc.StatusException;
45+
import io.grpc.StatusOr;
4546
import io.grpc.SynchronizationContext;
4647
import io.grpc.SynchronizationContext.ScheduledHandle;
4748
import io.grpc.internal.GrpcUtil;
@@ -382,18 +383,30 @@ private DiscoveryState(String resourceName) {
382383
}
383384

384385
@Override
385-
public void onChanged(final LdsUpdate update) {
386+
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
386387
if (stopped) {
387388
return;
388389
}
389-
logger.log(Level.FINEST, "Received Lds update {0}", update);
390-
if (update.listener() == null) {
391-
onResourceDoesNotExist("Non-API");
390+
391+
if (!update.hasValue()) {
392+
Status status = update.getStatus();
393+
StatusException statusException = Status.UNAVAILABLE.withDescription(
394+
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
395+
.withCause(status.asException())
396+
.asException();
397+
handleConfigNotFoundOrMismatch(statusException);
392398
return;
393399
}
394400

395-
String ldsAddress = update.listener().address();
396-
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
401+
final LdsUpdate ldsUpdate = update.getValue();
402+
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
403+
if (ldsUpdate.listener() == null) {
404+
handleConfigNotFoundOrMismatch(
405+
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
406+
return;
407+
}
408+
String ldsAddress = ldsUpdate.listener().address();
409+
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
397410
|| !ipAddressesMatch(ldsAddress)) {
398411
handleConfigNotFoundOrMismatch(
399412
Status.UNKNOWN.withDescription(
@@ -402,16 +415,14 @@ public void onChanged(final LdsUpdate update) {
402415
listenerAddress, ldsAddress)).asException());
403416
return;
404417
}
418+
405419
if (!pendingRds.isEmpty()) {
406-
// filter chain state has not yet been applied to filterChainSelectorManager and there
407-
// are two sets of sslContextProviderSuppliers, so we release the old ones.
408420
releaseSuppliersInFlight();
409421
pendingRds.clear();
410422
}
411423

412-
filterChains = update.listener().filterChains();
413-
defaultFilterChain = update.listener().defaultFilterChain();
414-
// Filters are loaded even if the server isn't serving yet.
424+
filterChains = ldsUpdate.listener().filterChains();
425+
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
415426
updateActiveFilters();
416427

417428
List<FilterChain> allFilterChains = filterChains;
@@ -450,43 +461,33 @@ public void onChanged(final LdsUpdate update) {
450461
}
451462
}
452463

453-
private boolean ipAddressesMatch(String ldsAddress) {
454-
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
455-
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
456-
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
457-
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
458-
return false;
459-
}
460-
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
461-
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
462-
return listenerIp.equals(ldsIp);
463-
}
464-
465-
@Override
466-
public void onResourceDoesNotExist(final String resourceName) {
467-
if (stopped) {
468-
return;
469-
}
470-
StatusException statusException = Status.UNAVAILABLE.withDescription(
471-
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
472-
xdsClient.getBootstrapInfo().node().getId())).asException();
473-
handleConfigNotFoundOrMismatch(statusException);
474-
}
475-
476464
@Override
477-
public void onError(final Status error) {
465+
public void onAmbientError(final Status error) {
478466
if (stopped) {
479467
return;
480468
}
481469
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
482470
Status errorWithNodeId = error.withDescription(
483471
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
484472
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
473+
485474
if (!isServing) {
486475
listener.onNotServing(errorWithNodeId.asException());
487476
}
488477
}
489478

479+
private boolean ipAddressesMatch(String ldsAddress) {
480+
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
481+
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
482+
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
483+
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
484+
return false;
485+
}
486+
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
487+
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
488+
return listenerIp.equals(ldsIp);
489+
}
490+
490491
private void shutdown() {
491492
stopped = true;
492493
cleanUpRouteDiscoveryStates();
@@ -775,54 +776,42 @@ private RouteDiscoveryState(String resourceName) {
775776
}
776777

777778
@Override
778-
public void onChanged(final RdsUpdate update) {
779-
syncContext.execute(new Runnable() {
780-
@Override
781-
public void run() {
782-
if (!routeDiscoveryStates.containsKey(resourceName)) {
783-
return;
784-
}
785-
if (savedVirtualHosts == null && !isPending) {
786-
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
787-
}
788-
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
789-
updateRdsRoutingConfig();
790-
maybeUpdateSelector();
779+
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
780+
syncContext.execute(() -> {
781+
if (!routeDiscoveryStates.containsKey(resourceName)) {
782+
return; // Watcher has been cancelled.
791783
}
792-
});
793-
}
794784

795-
@Override
796-
public void onResourceDoesNotExist(final String resourceName) {
797-
syncContext.execute(new Runnable() {
798-
@Override
799-
public void run() {
800-
if (!routeDiscoveryStates.containsKey(resourceName)) {
801-
return;
785+
if (update.hasValue()) {
786+
if (savedVirtualHosts == null && !isPending) {
787+
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
802788
}
803-
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
789+
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
790+
} else {
791+
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
792+
new Object[]{resourceName, update.getStatus()});
804793
savedVirtualHosts = null;
805-
updateRdsRoutingConfig();
806-
maybeUpdateSelector();
807794
}
795+
// In both cases, a change has occurred that requires a config update.
796+
updateRdsRoutingConfig();
797+
maybeUpdateSelector();
808798
});
809799
}
810800

811801
@Override
812-
public void onError(final Status error) {
813-
syncContext.execute(new Runnable() {
814-
@Override
815-
public void run() {
816-
if (!routeDiscoveryStates.containsKey(resourceName)) {
817-
return;
818-
}
819-
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
820-
Status errorWithNodeId = error.withDescription(
821-
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
822-
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
823-
new Object[]{resourceName, errorWithNodeId});
824-
maybeUpdateSelector();
802+
public void onAmbientError(final Status error) {
803+
syncContext.execute(() -> {
804+
if (!routeDiscoveryStates.containsKey(resourceName)) {
805+
return; // Watcher has been cancelled.
825806
}
807+
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
808+
Status errorWithNodeId = error.withDescription(
809+
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
810+
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
811+
new Object[]{resourceName, errorWithNodeId});
812+
813+
// Per gRFC A88, ambient errors should not trigger a configuration change.
814+
// Therefore, we do NOT call maybeUpdateSelector() here.
826815
});
827816
}
828817

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,9 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
262262
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
263263
if (serverFeatures != null) {
264264
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
265-
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
265+
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
266+
ignoreResourceDeletion = true;
267+
}
266268
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
267269
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
268270
}

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -453,44 +453,29 @@ private void handleRpcStreamClosed(Status status) {
453453
stopwatch.reset();
454454
}
455455

456-
Status newStatus = status;
457-
if (responseReceived) {
458-
// A closed ADS stream after a successful response is not considered an error. Servers may
459-
// close streams for various reasons during normal operation, such as load balancing or
460-
// underlying connection hitting its max connection age limit (see gRFC A9).
461-
if (!status.isOk()) {
462-
newStatus = Status.OK;
463-
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
464-
+ "response was received, so this will not be treated as an error. Cause: {2}",
465-
status.getCode(), status.getDescription(), status.getCause());
466-
} else {
467-
logger.log(XdsLogLevel.DEBUG,
468-
"ADS stream closed by server after a response was received");
469-
}
470-
} else {
471-
// If the ADS stream is closed without ever having received a response from the server, then
472-
// the XdsClient should consider that a connectivity error (see gRFC A57).
456+
Status statusToPropagate = status;
457+
if (!responseReceived && status.isOk()) {
458+
// If the ADS stream is closed with OK without ever having received a response,
459+
// it is a connectivity error (see gRFC A57).
460+
statusToPropagate = Status.UNAVAILABLE.withDescription(
461+
"ADS stream closed with OK before receiving a response");
462+
}
463+
if (!statusToPropagate.isOk()) {
473464
inError = true;
474-
if (status.isOk()) {
475-
newStatus = Status.UNAVAILABLE.withDescription(
476-
"ADS stream closed with OK before receiving a response");
477-
}
478-
logger.log(
479-
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
480-
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
465+
logger.log(XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
466+
statusToPropagate.getCode(), statusToPropagate.getDescription(),
467+
statusToPropagate.getCause());
481468
}
482469

483-
close(newStatus.asException());
484-
485470
// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
486471
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
487472
// concurrently with the stopwatch and schedule.
488473
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
489474
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
475+
close(status.asException());
490476
rpcRetryTimer =
491477
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
492-
493-
xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
478+
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
494479
}
495480

496481
private void close(Exception error) {

0 commit comments

Comments
 (0)