Skip to content

Commit bcaa6f4

Browse files
rls: only reset backoff on recovery from TRANSIENT_FAILURE (#9137)
Fixes #8693. This PR continues and finalizes the stale, closed PR #8720. It addresses the remaining feedback from the maintainers and the code review tools to resolve control channel state monitoring issues in the RLS balancer. RELEASE NOTES: N/A. Co-authored-by: ulascansenturk <ulascansenturk@protonmail.com>
1 parent 429e6e0 commit bcaa6f4

2 files changed

Lines changed: 243 additions & 84 deletions

File tree

balancer/rls/balancer_test.go

Lines changed: 197 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
"google.golang.org/grpc/credentials/insecure"
4040
"google.golang.org/grpc/internal"
4141
"google.golang.org/grpc/internal/balancer/stub"
42+
"google.golang.org/grpc/internal/buffer"
43+
"google.golang.org/grpc/internal/grpcsync"
4244
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
4345
"google.golang.org/grpc/internal/testutils"
4446
rlstest "google.golang.org/grpc/internal/testutils/rls"
@@ -975,9 +977,46 @@ func (s) TestDataCachePurging(t *testing.T) {
975977
verifyRLSRequest(t, rlsReqCh, true)
976978
}
977979

980+
// wrappingConnectivityStateSubscriber wraps a grpcsync.Subscriber and forwards
981+
// connectivity state updates to both the delegate and a channel for testing.
982+
type wrappingConnectivityStateSubscriber struct {
983+
delegate grpcsync.Subscriber
984+
connStateCh *buffer.Unbounded
985+
}
986+
987+
func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
988+
w.delegate.OnMessage(msg)
989+
w.connStateCh.Put(msg.(connectivity.State))
990+
}
991+
992+
// waitForConnectivityState waits for one of the specified connectivity states
993+
// to appear on the channel, returning the state that matched first. It skips
994+
// intermediate states that do not match any of the wanted states, and fails
995+
// the test if the context expires before any of the desired states are reached.
996+
func waitForConnectivityState(ctx context.Context, t *testing.T, ch *buffer.Unbounded, wants ...connectivity.State) connectivity.State {
997+
t.Helper()
998+
for {
999+
select {
1000+
case gotState := <-ch.Get():
1001+
ch.Load()
1002+
got := gotState.(connectivity.State)
1003+
for _, want := range wants {
1004+
if got == want {
1005+
return got
1006+
}
1007+
}
1008+
case <-ctx.Done():
1009+
t.Fatalf("Timeout waiting for RLS control channel to become one of %v", wants)
1010+
}
1011+
}
1012+
}
1013+
9781014
// TestControlChannelConnectivityStateMonitoring tests the scenario where the
9791015
// control channel goes down and comes back up again and verifies that backoff
980-
// state is reset for cache entries in this scenario.
1016+
// state is reset for cache entries in this scenario. It also verifies that:
1017+
// - Backoff is NOT reset when the control channel first becomes READY (i.e.,
1018+
// the initial CONNECTING → READY transition should not trigger a backoff reset)
1019+
// - Backoff is reset for READY → TRANSIENT_FAILURE → READY transitions
9811020
func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
9821021
// Create a restartable listener which can close existing connections.
9831022
l, err := testutils.LocalTCPListener()
@@ -1004,6 +1043,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
10041043
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
10051044
defer func() { defaultBackoffStrategy = origBackoffStrategy }()
10061045

1046+
// Override the connectivity state subscriber to wrap the original and
1047+
// make connectivity state changes visible to the test.
1048+
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: buffer.NewUnbounded()}
1049+
origConnectivityStateSubscriber := newConnectivityStateSubscriber
1050+
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
1051+
wrappedSubscriber.delegate = delegate
1052+
return wrappedSubscriber
1053+
}
1054+
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
1055+
10071056
// Register an LB policy to act as the child policy for RLS LB policy.
10081057
childPolicyName := "test-child-policy" + t.Name()
10091058
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
@@ -1038,16 +1087,39 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
10381087
// Make sure an RLS request is sent out.
10391088
verifyRLSRequest(t, rlsReqCh, true)
10401089

1090+
// Wait for the control channel to move to CONNECTING and then to READY.
1091+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Connecting)
1092+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)
1093+
1094+
// Verify that the initial READY state of the control channel did NOT trigger
1095+
// a backoff reset. The resetBackoffHook should only be called when
1096+
// transitioning from TRANSIENT_FAILURE to READY, not for the initial
1097+
// CONNECTING → READY transition.
1098+
select {
1099+
case <-resetBackoffDone:
1100+
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
1101+
case <-time.After(defaultTestShortTimeout):
1102+
}
1103+
10411104
// Stop the RLS server.
10421105
lis.Stop()
10431106

1044-
// Make another RPC similar to the first one. Since the above cache entry
1045-
// would have expired by now, this should trigger another RLS request. And
1046-
// since the RLS server is down, RLS request will fail and the cache entry
1047-
// will enter backoff, and we have overridden the default backoff strategy to
1048-
// return a value which will keep this entry in backoff for the whole duration
1049-
// of the test.
1050-
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
1107+
// When the server is stopped, the control channel connection is closed,
1108+
// causing it to transition to IDLE or TRANSIENT_FAILURE.
1109+
// If it transitions to IDLE, we need to send a new RLS request to force
1110+
// it to attempt reconnection, which will fail and move it to TRANSIENT_FAILURE.
1111+
state := waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Idle, connectivity.TransientFailure)
1112+
1113+
if state == connectivity.Idle {
1114+
// Make another RPC with different headers to bypass any cached backoff entries
1115+
// and force the control channel to attempt a connection, which will fail
1116+
// and move it to TRANSIENT_FAILURE.
1117+
ctxFailed := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
1118+
makeTestRPCAndVerifyError(ctxFailed, t, cc, codes.Unavailable, nil)
1119+
}
1120+
1121+
// Wait for the control channel to move to TRANSIENT_FAILURE.
1122+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.TransientFailure)
10511123

10521124
// Restart the RLS server.
10531125
lis.Restart()
@@ -1063,9 +1135,13 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
10631135
// till the control channel moves back to READY. So, override the
10641136
// backoff strategy to perform a small backoff on this entry.
10651137
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
1066-
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
1138+
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
10671139
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
10681140

1141+
// Wait for the control channel to move back to READY.
1142+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)
1143+
1144+
// Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY.
10691145
select {
10701146
case <-ctx.Done():
10711147
t.Fatalf("Timed out waiting for resetBackoffDone")
@@ -1081,6 +1157,118 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
10811157
verifyRLSRequest(t, rlsReqCh, true)
10821158
}
10831159

1160+
// TestControlChannelIdleTransitionNoBackoffReset tests that READY → IDLE → READY
1161+
// transitions do not trigger backoff resets. This is a benign state change that
1162+
// should not affect cache entry backoff state.
1163+
func (s) TestControlChannelIdleTransitionNoBackoffReset(t *testing.T) {
1164+
// Create a restartable listener which can close existing connections.
1165+
l, err := testutils.LocalTCPListener()
1166+
if err != nil {
1167+
t.Fatalf("net.Listen() failed: %v", err)
1168+
}
1169+
lis := testutils.NewRestartableListener(l)
1170+
1171+
// Start an RLS server with the restartable listener and set the throttler to
1172+
// never throttle requests.
1173+
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
1174+
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
1175+
1176+
// Override the reset backoff hook to get notified.
1177+
resetBackoffCalled := make(chan struct{}, 1)
1178+
origResetBackoffHook := resetBackoffHook
1179+
resetBackoffHook = func() { resetBackoffCalled <- struct{}{} }
1180+
defer func() { resetBackoffHook = origResetBackoffHook }()
1181+
1182+
// Override the backoff strategy to return a large backoff which
1183+
// will make sure the data cache entry remains in backoff for the
1184+
// duration of the test.
1185+
origBackoffStrategy := defaultBackoffStrategy
1186+
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
1187+
defer func() { defaultBackoffStrategy = origBackoffStrategy }()
1188+
1189+
// Override the connectivity state subscriber to wrap the original and
1190+
// make connectivity state changes visible to the test.
1191+
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: buffer.NewUnbounded()}
1192+
origConnectivityStateSubscriber := newConnectivityStateSubscriber
1193+
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
1194+
wrappedSubscriber.delegate = delegate
1195+
return wrappedSubscriber
1196+
}
1197+
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
1198+
1199+
// Register an LB policy to act as the child policy for RLS LB policy.
1200+
childPolicyName := "test-child-policy" + t.Name()
1201+
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
1202+
t.Logf("Registered child policy with name %q", childPolicyName)
1203+
1204+
// Build RLS service config with header matchers, and a very low value for
1205+
// maxAge to ensure that cache entries become invalid very soon.
1206+
rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)
1207+
rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
1208+
1209+
// Start a test backend, and set up the fake RLS server to return this as a
1210+
// target in the RLS response.
1211+
backendCh, backendAddress := startBackend(t)
1212+
rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
1213+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
1214+
})
1215+
1216+
// Register a manual resolver and push the RLS service config through it.
1217+
r := startManualResolverWithConfig(t, rlsConfig)
1218+
1219+
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
1220+
if err != nil {
1221+
t.Fatalf("Failed to create gRPC client: %v", err)
1222+
}
1223+
defer cc.Close()
1224+
1225+
// Make an RPC and ensure it gets routed to the test backend.
1226+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1227+
defer cancel()
1228+
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh)
1229+
1230+
// Make sure an RLS request is sent out.
1231+
verifyRLSRequest(t, rlsReqCh, true)
1232+
1233+
// Wait for the control channel to move to READY.
1234+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)
1235+
1236+
// Verify that the initial READY state did NOT trigger a backoff reset.
1237+
select {
1238+
case <-resetBackoffCalled:
1239+
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
1240+
case <-time.After(defaultTestShortTimeout):
1241+
}
1242+
1243+
// Stop the RLS server to force the control channel to go IDLE. Stop()
1244+
// closes all existing connections on the listener. Since there are no active
1245+
// RPCs on the control channel, the subchannel transitions to IDLE instead of
1246+
// TRANSIENT_FAILURE.
1247+
lis.Stop()
1248+
1249+
// Wait for the control channel to move to IDLE.
1250+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Idle)
1251+
1252+
// Restart the RLS server.
1253+
lis.Restart()
1254+
1255+
// Make another RPC to trigger reconnection. Use different headers to create
1256+
// a new cache entry.
1257+
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
1258+
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
1259+
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)
1260+
1261+
// Wait for the control channel to move back to READY.
1262+
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)
1263+
1264+
// Verify that the READY → IDLE → READY transition did NOT trigger a backoff reset.
1265+
select {
1266+
case <-resetBackoffCalled:
1267+
t.Fatal("Backoff reset was triggered for READY → IDLE → READY transition, want no reset")
1268+
case <-time.After(defaultTestShortTimeout):
1269+
}
1270+
}
1271+
10841272
// testCCWrapper wraps a balancer.ClientConn and overrides UpdateState and
10851273
// stores all state updates pushed by the RLS LB policy.
10861274
type testCCWrapper struct {

0 commit comments

Comments
 (0)