Skip to content

Commit 32fd222

Browse files
hzhaopclaude
andcommitted
fix(grpc): Improve reconnection logic and fix race condition
1. Restore original reconnection logic with TRANSIENT_FAILURE monitoring: - Keep original behavior: only force reconnect when different server selected - When same server selected, rely on gRPC auto-reconnect mechanism - Add TRANSIENT_FAILURE state monitoring to detect prolonged failures - Force rebuild channel if either reconnectCount or transientFailureCount exceeds threshold - Add keepAliveWithoutCalls(true) to detect half-open connections 2. Fix race condition between reportError() and run(): - Wrap all state changes (reconnect flag + notifications) in synchronized blocks - Prevents reconnect flag and listener status from becoming inconsistent - Fixes production issue where reconnect=false but listeners in DISCONNECT state 3. Additional improvements: - Adjust keepalive default from 60s to 120s (reduces overhead by 50%) - Add getState() method to GRPCChannel for state monitoring 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent c534c26 commit 32fd222

4 files changed

Lines changed: 115 additions & 23 deletions

File tree

apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public static class Collector {
216216
*
217217
* This maps to `collector.grpc_keepalive_time` in agent.config.
218218
*/
219-
public static long GRPC_KEEPALIVE_TIME = 60L;
219+
public static long GRPC_KEEPALIVE_TIME = 120L;
220220
/**
221221
* The timeout in seconds to wait for a keepalive ack from the backend.
222222
* If the ack is not received within this time, the connection is considered dead.

apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
4343

4444
if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
4545
channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
46-
.keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS);
46+
.keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS)
47+
.keepAliveWithoutCalls(true);
4748
}
4849

4950
NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
@@ -90,6 +91,10 @@ public boolean isConnected(boolean requestConnection) {
9091
return originChannel.getState(requestConnection) == ConnectivityState.READY;
9192
}
9293

94+
public ConnectivityState getState(boolean requestConnection) {
95+
return originChannel.getState(requestConnection);
96+
}
97+
9398
public static class Builder {
9499
private final String host;
95100
private final int port;

apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java

Lines changed: 107 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.skywalking.apm.agent.core.remote;
2020

2121
import io.grpc.Channel;
22+
import io.grpc.ConnectivityState;
2223
import io.grpc.Status;
2324
import io.grpc.StatusRuntimeException;
2425

@@ -57,6 +58,8 @@ public class GRPCChannelManager implements BootService, Runnable {
5758
private volatile List<String> grpcServers;
5859
private volatile int selectedIdx = -1;
5960
private volatile int reconnectCount = 0;
61+
private volatile int transientFailureCount = 0;
62+
private final Object statusLock = new Object();
6063

6164
@Override
6265
public void prepare() {
@@ -99,7 +102,15 @@ public void shutdown() {
99102

100103
@Override
101104
public void run() {
102-
LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
105+
if (reconnect) {
106+
LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
107+
} else {
108+
LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
109+
}
110+
111+
// Check channel state even when reconnect is false to detect prolonged failures
112+
checkChannelStateAndTriggerReconnectIfNeeded();
113+
103114
if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
104115
grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
105116
.filter(StringUtil::isNotBlank)
@@ -130,26 +141,34 @@ public void run() {
130141
String server = "";
131142
try {
132143
int index = Math.abs(random.nextInt()) % grpcServers.size();
133-
selectedIdx = index;
134144

135145
server = grpcServers.get(index);
136146
String[] ipAndPort = server.split(":");
137147

138-
LOGGER.debug("Attempting to reconnect to gRPC server {}. Shutting down existing channel if any.", server);
139-
if (managedChannel != null) {
140-
managedChannel.shutdownNow();
141-
}
148+
if (index != selectedIdx) {
149+
selectedIdx = index;
150+
LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
151+
createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
152+
} else {
153+
// Same server, increment reconnectCount and check state
154+
reconnectCount++;
142155

143-
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
144-
.addManagedChannelBuilder(new StandardChannelBuilder())
145-
.addManagedChannelBuilder(new TLSChannelBuilder())
146-
.addChannelDecorator(new AgentIDDecorator())
147-
.addChannelDecorator(new AuthenticationDecorator())
148-
.build();
149-
LOGGER.debug("Successfully reconnected to gRPC server {}.", server);
150-
reconnectCount = 0;
151-
reconnect = false;
152-
notify(GRPCChannelStatus.CONNECTED);
156+
// Force reconnect if reconnectCount or transientFailureCount exceeds threshold
157+
boolean forceReconnect = reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD
158+
|| transientFailureCount > Config.Agent.FORCE_RECONNECTION_PERIOD;
159+
160+
if (forceReconnect) {
161+
// Failed to reconnect after multiple attempts, force rebuild channel
162+
LOGGER.warn("Force rebuild channel to {} (reconnectCount={}, transientFailureCount={})",
163+
server, reconnectCount, transientFailureCount);
164+
createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
165+
} else if (managedChannel.isConnected(false)) {
166+
// Reconnect to the same server is automatically done by GRPC,
167+
// therefore we are responsible to check the connectivity and
168+
// set the state and notify listeners
169+
markAsConnected();
170+
}
171+
}
153172

154173
return;
155174
} catch (Throwable t) {
@@ -177,17 +196,85 @@ public Channel getChannel() {
177196
*/
178197
public void reportError(Throwable throwable) {
179198
if (isNetworkError(throwable)) {
199+
triggerReconnect();
200+
}
201+
}
202+
203+
private void notify(GRPCChannelStatus status) {
204+
synchronized (listeners) {
205+
for (GRPCChannelListener listener : listeners) {
206+
try {
207+
listener.statusChanged(status);
208+
} catch (Throwable t) {
209+
LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
210+
}
211+
}
212+
}
213+
}
214+
215+
/**
216+
* Create a new gRPC channel to the specified server and reset connection state.
217+
*/
218+
private void createNewChannel(String host, int port) throws Exception {
219+
if (managedChannel != null) {
220+
managedChannel.shutdownNow();
221+
}
222+
223+
managedChannel = GRPCChannel.newBuilder(host, port)
224+
.addManagedChannelBuilder(new StandardChannelBuilder())
225+
.addManagedChannelBuilder(new TLSChannelBuilder())
226+
.addChannelDecorator(new AgentIDDecorator())
227+
.addChannelDecorator(new AuthenticationDecorator())
228+
.build();
229+
230+
markAsConnected();
231+
}
232+
233+
/**
234+
* Trigger reconnection by setting reconnect flag and notifying listeners.
235+
*/
236+
private void triggerReconnect() {
237+
synchronized (statusLock) {
180238
reconnect = true;
181239
notify(GRPCChannelStatus.DISCONNECT);
182240
}
183241
}
184242

185-
private void notify(GRPCChannelStatus status) {
186-
for (GRPCChannelListener listener : listeners) {
243+
/**
244+
* Mark connection as successful and reset connection state.
245+
*/
246+
private void markAsConnected() {
247+
synchronized (statusLock) {
248+
reconnectCount = 0;
249+
reconnect = false;
250+
notify(GRPCChannelStatus.CONNECTED);
251+
}
252+
}
253+
254+
/**
255+
* Check the connectivity state of existing channel and trigger reconnect if needed.
256+
* This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
257+
*/
258+
private void checkChannelStateAndTriggerReconnectIfNeeded() {
259+
if (managedChannel != null) {
187260
try {
188-
listener.statusChanged(status);
261+
ConnectivityState state = managedChannel.getState(false);
262+
LOGGER.debug("Current channel state: {}", state);
263+
264+
if (state == ConnectivityState.TRANSIENT_FAILURE) {
265+
transientFailureCount++;
266+
LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
267+
} else if (state == ConnectivityState.SHUTDOWN) {
268+
LOGGER.warn("Channel is SHUTDOWN");
269+
if (!reconnect) {
270+
triggerReconnect();
271+
}
272+
} else {
273+
// IDLE, READY, CONNECTING are all normal states
274+
transientFailureCount = 0;
275+
}
189276
} catch (Throwable t) {
190-
LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
277+
LOGGER.error(t, "Error checking channel state");
191278
}
192279
}
193280
}

apm-sniffer/config/agent.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
102102
collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
103103
# The interval in seconds to send a keepalive ping to the backend.
104104
# If this is less than or equal to 0, the keepalive is disabled.
105-
#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:60}
105+
#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
106106
# The timeout in seconds to wait for a keepalive ack from the backend.
107107
# If the ack is not received within this time, the connection is considered dead.
108108
#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}

0 commit comments

Comments
 (0)