Skip to content

Commit 77a8670

Browse files
committed
Add monitor to find length of queue at Routers
1 parent 019fc35 commit 77a8670

4 files changed

Lines changed: 164 additions & 0 deletions

File tree

docs/operations/metrics.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
4646
|Metric|Description|Dimensions|Normal value|
4747
|------|-----------|----------|------------|
4848
|`query/time`|Milliseconds taken to complete a query.|Native Query: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.|< 1s|
49+
|`router/http/numRequestsQueued`|Total number of outbound HTTP requests currently queued across all destinations on the Router's HTTP client.|None.|0; sustained high values indicate backpressure toward downstream Brokers.|
4950

5051
### Broker
5152

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.metrics;
21+
22+
import com.google.inject.Inject;
23+
import com.google.inject.Provider;
24+
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
25+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
26+
import org.apache.druid.java.util.metrics.AbstractMonitor;
27+
import org.apache.druid.server.router.Router;
28+
import org.eclipse.jetty.client.HttpClient;
29+
import org.eclipse.jetty.client.HttpDestination;
30+
import org.eclipse.jetty.client.api.Destination;
31+
32+
/**
33+
* Monitor that emits the total number of outbound HTTP requests currently queued
34+
* across all destinations on the Router's Jetty HttpClient.
35+
*/
36+
public class RouterHttpClientMonitor extends AbstractMonitor
37+
{
38+
private final Provider<HttpClient> httpClientProvider;
39+
40+
@Inject
41+
public RouterHttpClientMonitor(@Router Provider<HttpClient> httpClientProvider)
42+
{
43+
this.httpClientProvider = httpClientProvider;
44+
}
45+
46+
@Override
47+
public boolean doMonitor(ServiceEmitter emitter)
48+
{
49+
final HttpClient httpClient = httpClientProvider.get();
50+
51+
int totalQueuedRequests = 0;
52+
for (Destination destination : httpClient.getDestinations()) {
53+
if (destination instanceof HttpDestination) {
54+
totalQueuedRequests += ((HttpDestination) destination).getQueuedRequestCount();
55+
}
56+
}
57+
58+
emitter.emit(new ServiceMetricEvent.Builder().setMetric("router/http/numRequestsQueued", totalQueuedRequests));
59+
60+
return true;
61+
}
62+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.metrics;
21+
22+
import com.google.inject.Provider;
23+
import org.apache.druid.java.util.metrics.StubServiceEmitter;
24+
import org.eclipse.jetty.client.HttpClient;
25+
import org.eclipse.jetty.client.HttpDestination;
26+
import org.eclipse.jetty.client.api.Destination;
27+
import org.junit.Assert;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.Mockito;
31+
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
public class RouterHttpClientMonitorTest
37+
{
38+
private HttpClient httpClient;
39+
private RouterHttpClientMonitor monitor;
40+
41+
@Before
42+
public void setUp()
43+
{
44+
httpClient = Mockito.mock(HttpClient.class);
45+
Provider<HttpClient> httpClientProvider = () -> httpClient;
46+
monitor = new RouterHttpClientMonitor(httpClientProvider);
47+
}
48+
49+
@Test
50+
public void testDoMonitorReturnsTrue()
51+
{
52+
Mockito.when(httpClient.getDestinations()).thenReturn(Collections.emptyList());
53+
Assert.assertTrue(monitor.doMonitor(new StubServiceEmitter("router", "localhost")));
54+
}
55+
56+
@Test
57+
public void testNoDestinationsEmitsZero()
58+
{
59+
Mockito.when(httpClient.getDestinations()).thenReturn(Collections.emptyList());
60+
final StubServiceEmitter emitter = new StubServiceEmitter("router", "localhost");
61+
monitor.doMonitor(emitter);
62+
63+
Assert.assertEquals(1, emitter.getEvents().size());
64+
Map<String, Object> event = emitter.getEvents().get(0).toMap();
65+
Assert.assertEquals("router/http/numRequestsQueued", event.get("metric"));
66+
Assert.assertEquals(0, event.get("value"));
67+
}
68+
69+
@Test
70+
public void testMultipleDestinationsQueueCountSummed()
71+
{
72+
Mockito.when(httpClient.getDestinations()).thenReturn(List.of(mockDest(3), mockDest(5)));
73+
final StubServiceEmitter emitter = new StubServiceEmitter("router", "localhost");
74+
monitor.doMonitor(emitter);
75+
76+
Assert.assertEquals(1, emitter.getEvents().size());
77+
Assert.assertEquals(8, emitter.getEvents().get(0).toMap().get("value"));
78+
}
79+
80+
@Test
81+
public void testNonHttpDestinationIsSkipped()
82+
{
83+
Destination plainDest = Mockito.mock(Destination.class);
84+
Mockito.when(httpClient.getDestinations()).thenReturn(List.of(plainDest, mockDest(4)));
85+
final StubServiceEmitter emitter = new StubServiceEmitter("router", "localhost");
86+
monitor.doMonitor(emitter);
87+
88+
Assert.assertEquals(1, emitter.getEvents().size());
89+
Assert.assertEquals(4, emitter.getEvents().get(0).toMap().get("value"));
90+
}
91+
92+
private static HttpDestination mockDest(int queuedCount)
93+
{
94+
HttpDestination dest = Mockito.mock(HttpDestination.class);
95+
Mockito.when(dest.getQueuedRequestCount()).thenReturn(queuedCount);
96+
return dest;
97+
}
98+
}

services/src/main/java/org/apache/druid/cli/CliRouter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
import org.apache.druid.server.http.RouterResource;
4747
import org.apache.druid.server.http.SelfDiscoveryResource;
4848
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
49+
import org.apache.druid.server.metrics.MetricsModule;
4950
import org.apache.druid.server.metrics.QueryCountStatsProvider;
51+
import org.apache.druid.server.metrics.RouterHttpClientMonitor;
5052
import org.apache.druid.server.router.AvaticaConnectionBalancer;
5153
import org.apache.druid.server.router.CoordinatorRuleManager;
5254
import org.apache.druid.server.router.ManagementProxyConfig;
@@ -114,6 +116,7 @@ protected List<? extends Module> getModules()
114116
.in(LazySingleton.class);
115117

116118
binder.bind(QueryCountStatsProvider.class).to(AsyncQueryForwardingServlet.class).in(LazySingleton.class);
119+
MetricsModule.register(binder, RouterHttpClientMonitor.class);
117120
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
118121

119122
Jerseys.addResource(binder, RouterResource.class);

0 commit comments

Comments
 (0)