Skip to content

Commit 010be96

Browse files
authored
Reduce log noise for k8s NodeRoleWatcher (#19077)
1 parent 7750759 commit 010be96

2 files changed

Lines changed: 43 additions & 3 deletions

File tree

server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -141,8 +141,6 @@ public void childAdded(DiscoveryDruidNode druidNode)
141141
return;
142142
}
143143

144-
LOGGER.info("Node [%s] of role [%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
145-
146144
addNode(druidNode);
147145
}
148146
}
@@ -152,6 +150,7 @@ private void addNode(DiscoveryDruidNode druidNode)
152150
{
153151
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
154152
if (prev == null) {
153+
LOGGER.info("Node [%s] of role [%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
155154
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
156155
if (cacheInitialized.getCount() == 0) {
157156
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
@@ -165,7 +164,7 @@ private void addNode(DiscoveryDruidNode druidNode)
165164
}
166165
}
167166
} else {
168-
LOGGER.error(
167+
LOGGER.debug(
169168
"Node [%s] of role [%s] discovered but existed already [%s].",
170169
druidNode.getDruidNode().getUriToUse(),
171170
nodeRole.getJsonName(),

server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -203,6 +203,47 @@ public void testGetAllNodesBeforeTimeout() throws InterruptedException
203203
assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of());
204204
}
205205

206+
@Test(timeout = 60_000L)
207+
public void testDuplicateChildAddedAfterResetNodesDoesNotNotifyListeners()
208+
{
209+
BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
210+
211+
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
212+
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
213+
214+
// Initial discovery and cache initialization
215+
nodeRoleWatcher.childAdded(broker1);
216+
nodeRoleWatcher.childAdded(broker2);
217+
nodeRoleWatcher.cacheInitialized();
218+
219+
TestListener listener = new TestListener();
220+
nodeRoleWatcher.registerListener(listener);
221+
222+
// Verify listener received the initial nodes
223+
Assert.assertEquals(ImmutableList.of(broker1, broker2), listener.nodesAddedList);
224+
225+
// Simulate watch reconnect: resetNodes with the same set of nodes
226+
LinkedHashMap<String, DiscoveryDruidNode> resetMap = new LinkedHashMap<>();
227+
resetMap.put(broker1.getDruidNode().getHostAndPortToUse(), broker1);
228+
resetMap.put(broker2.getDruidNode().getHostAndPortToUse(), broker2);
229+
nodeRoleWatcher.resetNodes(resetMap);
230+
231+
// No new additions or removals since the node set is unchanged
232+
Assert.assertEquals(ImmutableList.of(broker1, broker2), listener.nodesAddedList);
233+
Assert.assertTrue(listener.nodesRemovedList.isEmpty());
234+
235+
// Simulate K8s watch replaying ADDED events for already-present nodes
236+
nodeRoleWatcher.childAdded(broker1);
237+
nodeRoleWatcher.childAdded(broker2);
238+
239+
// Listeners should NOT be notified again — the duplicate adds are no-ops
240+
Assert.assertEquals(ImmutableList.of(broker1, broker2), listener.nodesAddedList);
241+
Assert.assertTrue(listener.nodesRemovedList.isEmpty());
242+
243+
// The nodes map should still contain exactly the same two nodes
244+
Assert.assertEquals(ImmutableSet.of(broker1, broker2), new HashSet<>(nodeRoleWatcher.getAllNodes()));
245+
}
246+
206247
private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String host)
207248
{
208249
return new DiscoveryDruidNode(

0 commit comments

Comments
 (0)