Skip to content

Commit d0a6aba

Browse files
authored
[multistage] Reset GRPC connection backoff when server is re-enabled (#17466)
1 parent 0ea4d1f commit d0a6aba

5 files changed

Lines changed: 266 additions & 4 deletions

File tree

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,9 @@ public void start()
415415
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
416416
_accessControlFactory, _queryQuotaManager, _tableCache, _multiStageQueryThrottler, _failureDetector,
417417
_threadAccountant, multiClusterRoutingContext);
418+
MultiStageBrokerRequestHandler finalHandler = multiStageBrokerRequestHandler;
419+
_routingManager.setServerReenableCallback(
420+
serverInstance -> finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
418421
}
419422
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
420423
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,4 +855,8 @@ public static boolean isYellowError(QueryException e) {
855855
return false;
856856
}
857857
}
858+
859+
public QueryDispatcher getQueryDispatcher() {
860+
return _queryDispatcher;
861+
}
858862
}

pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.locks.ReadWriteLock;
3838
import java.util.concurrent.locks.ReentrantReadWriteLock;
39+
import java.util.function.Consumer;
3940
import javax.annotation.Nullable;
4041
import org.apache.commons.lang3.tuple.Pair;
4142
import org.apache.helix.AccessOption;
@@ -129,6 +130,8 @@ public abstract class BaseBrokerRoutingManager implements RoutingManager, Cluste
129130
private final PinotConfiguration _pinotConfig;
130131
private final boolean _enablePartitionMetadataManager;
131132
private final ExecutorService _executorService;
133+
@Nullable
134+
private Consumer<ServerInstance> _serverReenableCallback;
132135

133136
// Global read-write lock for protecting the global data structures such as _enabledServerInstanceMap,
134137
// _excludedServers, and _routableServers. Write lock must be held if any of these are modified, read lock must be
@@ -180,6 +183,14 @@ public void init(HelixManager helixManager) {
180183
_propertyStore = helixManager.getHelixPropertyStore();
181184
}
182185

186+
/**
187+
* Sets a callback to be invoked when a server is re-enabled after being excluded.
188+
* This is useful for resetting gRPC channel state to avoid exponential backoff delays.
189+
*/
190+
public void setServerReenableCallback(Consumer<ServerInstance> callback) {
191+
_serverReenableCallback = callback;
192+
}
193+
183194
private Object getRoutingTableBuildLock(String tableNameWithType) {
184195
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
185196
return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new Object());
@@ -387,6 +398,12 @@ private void processInstanceConfigChangeInternal() {
387398
// NOTE: Remove new enabled server from excluded servers because the server is likely being restarted
388399
if (_excludedServers.remove(instanceId)) {
389400
LOGGER.info("Got excluded server: {} re-enabled, including it into the routing", instanceId);
401+
// We clear any GRPC channel reconnection backoff in this callback when a server is re-enabled. Otherwise,
402+
// MSE queries to this server may fail fast until the next backoff retry succeeds.
403+
if (_serverReenableCallback != null) {
404+
LOGGER.info("Calling server re-enable callback for server: {}", instanceId);
405+
_serverReenableCallback.accept(serverInstance);
406+
}
390407
}
391408
}
392409
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.broker.routing.manager;
20+
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.function.Consumer;
24+
import org.apache.helix.AccessOption;
25+
import org.apache.helix.BaseDataAccessor;
26+
import org.apache.helix.HelixConstants.ChangeType;
27+
import org.apache.helix.HelixDataAccessor;
28+
import org.apache.helix.HelixManager;
29+
import org.apache.helix.PropertyKey;
30+
import org.apache.helix.model.InstanceConfig;
31+
import org.apache.helix.store.zk.ZkHelixPropertyStore;
32+
import org.apache.helix.zookeeper.datamodel.ZNRecord;
33+
import org.apache.pinot.common.metrics.BrokerMetrics;
34+
import org.apache.pinot.core.transport.ServerInstance;
35+
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
36+
import org.apache.pinot.spi.env.PinotConfiguration;
37+
import org.mockito.ArgumentCaptor;
38+
import org.mockito.Mock;
39+
import org.mockito.MockitoAnnotations;
40+
import org.testng.annotations.AfterMethod;
41+
import org.testng.annotations.BeforeMethod;
42+
import org.testng.annotations.Test;
43+
44+
import static org.mockito.ArgumentMatchers.any;
45+
import static org.mockito.ArgumentMatchers.anyInt;
46+
import static org.mockito.ArgumentMatchers.eq;
47+
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.never;
49+
import static org.mockito.Mockito.verify;
50+
import static org.mockito.Mockito.when;
51+
import static org.testng.Assert.assertEquals;
52+
import static org.testng.Assert.assertTrue;
53+
54+
55+
public class BrokerRoutingManagerTest {
56+
private static final String SERVER_INSTANCE_ID = "Server_localhost_8000";
57+
private static final String SERVER_HOST = "localhost";
58+
private static final int SERVER_PORT = 8000;
59+
private static final String INSTANCE_CONFIGS_PATH = "/CONFIGS/PARTICIPANT";
60+
61+
private AutoCloseable _mocks;
62+
63+
@Mock
64+
private BrokerMetrics _brokerMetrics;
65+
66+
@Mock
67+
private ServerRoutingStatsManager _serverRoutingStatsManager;
68+
69+
@Mock
70+
private HelixManager _helixManager;
71+
72+
@Mock
73+
private HelixDataAccessor _helixDataAccessor;
74+
75+
@Mock
76+
private BaseDataAccessor<ZNRecord> _zkDataAccessor;
77+
78+
@Mock
79+
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
80+
81+
@Mock
82+
private PropertyKey.Builder _keyBuilder;
83+
84+
@Mock
85+
private PropertyKey _instanceConfigsKey;
86+
87+
@Mock
88+
private Consumer<ServerInstance> _serverReenableCallback;
89+
90+
private BrokerRoutingManager _routingManager;
91+
92+
@BeforeMethod
93+
public void setUp() {
94+
_mocks = MockitoAnnotations.openMocks(this);
95+
96+
// Set up Helix mocks
97+
when(_helixManager.getHelixDataAccessor()).thenReturn(_helixDataAccessor);
98+
when(_helixManager.getHelixPropertyStore()).thenReturn(_propertyStore);
99+
when(_helixDataAccessor.getBaseDataAccessor()).thenReturn(_zkDataAccessor);
100+
when(_helixDataAccessor.keyBuilder()).thenReturn(_keyBuilder);
101+
when(_keyBuilder.instanceConfigs()).thenReturn(_instanceConfigsKey);
102+
when(_keyBuilder.externalViews()).thenReturn(mock(PropertyKey.class));
103+
when(_keyBuilder.idealStates()).thenReturn(mock(PropertyKey.class));
104+
when(_instanceConfigsKey.getPath()).thenReturn(INSTANCE_CONFIGS_PATH);
105+
106+
// Mock paths for external views and ideal states
107+
PropertyKey evKey = mock(PropertyKey.class);
108+
PropertyKey isKey = mock(PropertyKey.class);
109+
when(_keyBuilder.externalViews()).thenReturn(evKey);
110+
when(_keyBuilder.idealStates()).thenReturn(isKey);
111+
when(evKey.getPath()).thenReturn("/EXTERNALVIEW");
112+
when(isKey.getPath()).thenReturn("/IDEALSTATES");
113+
114+
// Create routing manager
115+
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, new PinotConfiguration());
116+
_routingManager.init(_helixManager);
117+
}
118+
119+
@AfterMethod
120+
public void tearDown()
121+
throws Exception {
122+
_mocks.close();
123+
}
124+
125+
@Test
126+
public void testNoErrorWhenCallbackNotSet() {
127+
// Don't set callback
128+
129+
// Enable server
130+
List<ZNRecord> instanceConfigs = Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
131+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
132+
anyInt(), anyInt())).thenReturn(instanceConfigs);
133+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
134+
135+
// Exclude server
136+
_routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
137+
138+
// Disable then re-enable
139+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
140+
anyInt(), anyInt())).thenReturn(Collections.emptyList());
141+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
142+
143+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
144+
anyInt(), anyInt())).thenReturn(instanceConfigs);
145+
146+
// Should not throw NPE
147+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
148+
149+
// Server should be re-enabled in the map
150+
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
151+
}
152+
153+
@Test
154+
public void testServerReenableCallbackInvokedWhenExcludedServerReenabled() {
155+
// Set up callback
156+
_routingManager.setServerReenableCallback(_serverReenableCallback);
157+
158+
// First, enable the server by processing instance config change
159+
List<ZNRecord> instanceConfigs = Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
160+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
161+
anyInt(), anyInt())).thenReturn(instanceConfigs);
162+
163+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
164+
165+
// Verify server is now in enabled map
166+
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
167+
168+
// Exclude the server (simulating failure detector marking it unhealthy)
169+
_routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
170+
171+
// Now simulate server being disabled then re-enabled (e.g., restart)
172+
// First, disable
173+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
174+
anyInt(), anyInt())).thenReturn(Collections.emptyList());
175+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
176+
177+
// Then re-enable
178+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
179+
anyInt(), anyInt())).thenReturn(instanceConfigs);
180+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
181+
182+
// Verify callback was invoked with correct ServerInstance
183+
ArgumentCaptor<ServerInstance> captor = ArgumentCaptor.forClass(ServerInstance.class);
184+
verify(_serverReenableCallback).accept(captor.capture());
185+
186+
ServerInstance capturedInstance = captor.getValue();
187+
assertEquals(capturedInstance.getHostname(), SERVER_HOST);
188+
assertEquals(capturedInstance.getPort(), SERVER_PORT);
189+
}
190+
191+
@Test
192+
public void testServerReenableCallbackNotInvokedForNewServer() {
193+
// Set up callback
194+
_routingManager.setServerReenableCallback(_serverReenableCallback);
195+
196+
// Enable a new server (never excluded)
197+
List<ZNRecord> instanceConfigs = Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
198+
when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), eq(AccessOption.PERSISTENT),
199+
anyInt(), anyInt())).thenReturn(instanceConfigs);
200+
201+
_routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
202+
203+
// Verify callback was NOT invoked (server was never excluded)
204+
verify(_serverReenableCallback, never()).accept(any());
205+
}
206+
207+
/**
208+
* Creates a ZNRecord representing an enabled server instance.
209+
*/
210+
private ZNRecord createEnabledServerZNRecord(String instanceId) {
211+
ZNRecord record = new ZNRecord(instanceId);
212+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(), "true");
213+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_HOST.name(),
214+
instanceId.split("_")[1]); // Extract host from Server_host_port
215+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.name(),
216+
instanceId.split("_")[2]); // Extract port from Server_host_port
217+
// Don't set IS_SHUTDOWN_IN_PROGRESS or QUERIES_DISABLED (they default to false)
218+
return record;
219+
}
220+
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,8 @@ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeou
255255
public FailureDetector.ServerState checkConnectivityToInstance(ServerInstance serverInstance) {
256256
String hostname = serverInstance.getHostname();
257257
int port = serverInstance.getQueryServicePort();
258-
String hostnamePort = String.format("%s_%d", hostname, port);
259258

260-
DispatchClient client = _dispatchClientMap.get(hostnamePort);
259+
DispatchClient client = _dispatchClientMap.get(toHostnamePortKey(hostname, port));
261260
// Could occur if the cluster is only serving single-stage queries
262261
if (client == null) {
263262
LOGGER.debug("No DispatchClient found for server with instanceId: {}", serverInstance.getInstanceId());
@@ -510,8 +509,27 @@ private MultiStageQueryStats cancelWithStats(long requestId, @Nullable Set<Query
510509
private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) {
511510
String hostname = queryServerInstance.getHostname();
512511
int port = queryServerInstance.getQueryServicePort();
513-
String hostnamePort = String.format("%s_%d", hostname, port);
514-
return _dispatchClientMap.computeIfAbsent(hostnamePort, k -> new DispatchClient(hostname, port, _tlsConfig));
512+
return _dispatchClientMap.computeIfAbsent(toHostnamePortKey(hostname, port),
513+
k -> new DispatchClient(hostname, port, _tlsConfig));
514+
}
515+
516+
/**
517+
* Reset the connection backoff for a server. When the GRPC channel enters a TRANSIENT_FAILURE state from
518+
* connection failures, it will fast fail requests and reconnect with exponential backoff. This method
519+
* resets the backoff so servers that have recovered can be reconnected to immediately.
520+
*/
521+
public void resetClientConnectionBackoff(ServerInstance serverInstance) {
522+
String hostname = serverInstance.getHostname();
523+
int port = serverInstance.getQueryServicePort();
524+
DispatchClient dispatchClient = _dispatchClientMap.get(toHostnamePortKey(hostname, port));
525+
if (dispatchClient != null) {
526+
LOGGER.info("Resetting connection backoff for server: {}", serverInstance.getInstanceId());
527+
dispatchClient.getChannel().resetConnectBackoff();
528+
}
529+
}
530+
531+
private static String toHostnamePortKey(String hostname, int port) {
532+
return String.format("%s_%d", hostname, port);
515533
}
516534

517535
/// Concatenates the results of the sub-plan and returns a [QueryResult] with the concatenated result.

0 commit comments

Comments
 (0)