Skip to content

Commit f953fa8

Browse files
authored
Include pipeline processing-load snapshot in support bundles (#26161)
* Collect pipeline processing-load data for support bundles * Minor refactoring * Add changelog
1 parent da7767a commit f953fa8

7 files changed

Lines changed: 542 additions & 83 deletions

File tree

changelog/unreleased/pr-26161.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type = "a"
2+
message = "Include pipeline processing-load snapshot in support bundles."
3+
4+
issues = ["Graylog2/graylog-plugin-enterprise#14145"]
5+
pulls = ["26161"]
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog.plugins.pipelineprocessor.rest;
18+
19+
import com.google.common.collect.Maps;
20+
import jakarta.inject.Inject;
21+
import jakarta.inject.Singleton;
22+
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
23+
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
24+
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination;
25+
import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse;
26+
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Builds a {@link ProcessingLoadResponse} from cluster-wide debug timer data, shared by the
32+
* {@code /system/pipelines/processing-load} endpoint and the support-bundle snapshot so both stay
33+
* in sync. Callers pass a {@link NodeMetricsFetcher}, keeping this class agnostic of how the
34+
* per-node {@code multipleMetrics} fan-out is transported.
35+
*/
36+
@Singleton
37+
public class ProcessingLoadBuilder {
38+
39+
private final PipelineInterpreterStateUpdater stateUpdater;
40+
private final RuleMetricsConfigService ruleMetricsConfigService;
41+
private final ProcessingLoadService processingLoadService;
42+
private final NodeTimerSnapshotParser snapshotParser;
43+
44+
@Inject
45+
public ProcessingLoadBuilder(PipelineInterpreterStateUpdater stateUpdater,
46+
RuleMetricsConfigService ruleMetricsConfigService,
47+
ProcessingLoadService processingLoadService,
48+
NodeTimerSnapshotParser snapshotParser) {
49+
this.stateUpdater = stateUpdater;
50+
this.ruleMetricsConfigService = ruleMetricsConfigService;
51+
this.processingLoadService = processingLoadService;
52+
this.snapshotParser = snapshotParser;
53+
}
54+
55+
public boolean metricsEnabled() {
56+
return ruleMetricsConfigService.get().metricsEnabled();
57+
}
58+
59+
/**
60+
* The active pipeline-stage-rule combinations, or empty if debug metrics are off. Callers can
61+
* permission-check these before the cluster fan-out, then pass the same list to
62+
* {@link #buildUnfiltered(List, NodeMetricsFetcher)} to avoid resolving them twice.
63+
*/
64+
List<ActiveCombination> activeCombinations() {
65+
if (!metricsEnabled()) {
66+
return List.of();
67+
}
68+
return processingLoadService.activeCombinations(stateUpdater.getLatestState());
69+
}
70+
71+
/**
72+
* Builds the unfiltered response, resolving the active combinations itself. Returns
73+
* {@code unavailable} when metrics are off, no combinations are active, or no node reported data.
74+
*/
75+
public ProcessingLoadResponse buildUnfiltered(NodeMetricsFetcher fetcher) {
76+
return buildUnfiltered(activeCombinations(), fetcher);
77+
}
78+
79+
/**
80+
* Builds the unfiltered response from already-resolved combinations, so a caller that needed
81+
* them (e.g. for a permission check) doesn't recompute them.
82+
*/
83+
ProcessingLoadResponse buildUnfiltered(List<ActiveCombination> combinations, NodeMetricsFetcher fetcher) {
84+
if (combinations.isEmpty()) {
85+
return ProcessingLoadResponse.unavailable();
86+
}
87+
88+
final List<String> timerNames = processingLoadService.expectedTimerNames(combinations);
89+
final Map<String, MetricsSummaryResponse> perNodeResponses = fetcher.fetch(timerNames);
90+
final Map<String, NodeTimerSnapshot> perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size());
91+
perNodeResponses.forEach((nodeId, response) -> perNodeSnapshots.put(nodeId, snapshotParser.parse(response)));
92+
93+
return processingLoadService.compute(combinations, perNodeSnapshots);
94+
}
95+
96+
/**
97+
* Fans a {@code multipleMetrics} call out to every node, returning responses keyed by node id.
98+
* The implementation supplies the transport (REST proxy vs. support-bundle helper).
99+
* A node missing from the map simply has no data.
100+
*/
101+
@FunctionalInterface
102+
public interface NodeMetricsFetcher {
103+
Map<String, MetricsSummaryResponse> fetch(List<String> timerNames);
104+
}
105+
}

graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadResource.java

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.graylog.plugins.pipelineprocessor.rest;
1818

19-
import com.google.common.collect.Maps;
2019
import io.swagger.v3.oas.annotations.Operation;
2120
import io.swagger.v3.oas.annotations.tags.Tag;
2221
import jakarta.inject.Inject;
@@ -29,9 +28,6 @@
2928
import jakarta.ws.rs.core.HttpHeaders;
3029
import jakarta.ws.rs.core.MediaType;
3130
import org.apache.shiro.authz.annotation.RequiresAuthentication;
32-
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
33-
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
34-
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
3531
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination;
3632
import org.graylog2.audit.jersey.NoAuditEvent;
3733
import org.graylog2.cluster.NodeService;
@@ -44,8 +40,8 @@
4440

4541
import java.util.List;
4642
import java.util.Map;
47-
import java.util.Optional;
4843
import java.util.concurrent.ExecutorService;
44+
import java.util.stream.Collectors;
4945

5046
@RequiresAuthentication
5147
@PublicCloudAPI
@@ -54,25 +50,19 @@
5450
@Produces(MediaType.APPLICATION_JSON)
5551
public class ProcessingLoadResource extends ProxiedResource {
5652

57-
private final PipelineInterpreterStateUpdater stateUpdater;
58-
private final RuleMetricsConfigService ruleMetricsConfigService;
53+
private final ProcessingLoadBuilder processingLoadBuilder;
5954
private final ProcessingLoadService processingLoadService;
60-
private final NodeTimerSnapshotParser snapshotParser;
6155

6256
@Inject
6357
public ProcessingLoadResource(NodeService nodeService,
6458
RemoteInterfaceProvider remoteInterfaceProvider,
6559
@Context HttpHeaders httpHeaders,
6660
@Named("proxiedRequestsExecutorService") ExecutorService executorService,
67-
PipelineInterpreterStateUpdater stateUpdater,
68-
RuleMetricsConfigService ruleMetricsConfigService,
69-
ProcessingLoadService processingLoadService,
70-
NodeTimerSnapshotParser snapshotParser) {
61+
ProcessingLoadBuilder processingLoadBuilder,
62+
ProcessingLoadService processingLoadService) {
7163
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
72-
this.stateUpdater = stateUpdater;
73-
this.ruleMetricsConfigService = ruleMetricsConfigService;
64+
this.processingLoadBuilder = processingLoadBuilder;
7465
this.processingLoadService = processingLoadService;
75-
this.snapshotParser = snapshotParser;
7666
}
7767

7868
@GET
@@ -81,43 +71,24 @@ public ProcessingLoadResource(NodeService nodeService,
8171
description = "Returns Processing Load % per pipeline-stage-rule, per pipeline, and per rule. " +
8272
"Requires debug metrics enable.")
8373
public ProcessingLoadResponse processingLoad() {
84-
if (!ruleMetricsConfigService.get().metricsEnabled()) {
85-
return ProcessingLoadResponse.unavailable();
86-
}
87-
88-
final PipelineInterpreter.State state = stateUpdater.getLatestState();
89-
final List<ActiveCombination> combinations = processingLoadService.activeCombinations(state);
90-
74+
final List<ActiveCombination> combinations = processingLoadBuilder.activeCombinations();
9175
if (combinations.isEmpty()) {
9276
return ProcessingLoadResponse.unavailable();
9377
}
94-
95-
if (!hasAnyVisibleEntity(combinations)) {
78+
if (combinations.stream().noneMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId()))) {
9679
throw new ForbiddenException("No read permission on any active pipeline or rule.");
9780
}
9881

99-
final List<String> expectedTimerNames = processingLoadService.expectedTimerNames(combinations);
100-
final Map<String, NodeTimerSnapshot> perNodeSnapshots = fetchNodeSnapshots(expectedTimerNames);
101-
final ProcessingLoadResponse full = processingLoadService.compute(combinations, perNodeSnapshots);
82+
final ProcessingLoadResponse full = processingLoadBuilder.buildUnfiltered(combinations, this::fetchPerNodeMetrics);
10283
return processingLoadService.filterByPermissions(full, this::canReadPipeline, this::canReadRule);
10384
}
10485

105-
private Map<String, NodeTimerSnapshot> fetchNodeSnapshots(List<String> metricNames) {
106-
final MetricsReadRequest request = MetricsReadRequest.create(metricNames);
107-
final Map<String, Optional<MetricsSummaryResponse>> perNodeResponses = stripCallResult(
108-
requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request))
109-
);
110-
111-
final Map<String, NodeTimerSnapshot> perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size());
112-
perNodeResponses.forEach((nodeId, response) ->
113-
perNodeSnapshots.put(nodeId, response.map(snapshotParser::parse).orElseGet(NodeTimerSnapshot::empty))
114-
);
115-
return perNodeSnapshots;
116-
}
117-
118-
private boolean hasAnyVisibleEntity(List<ActiveCombination> combinations) {
119-
return combinations.stream()
120-
.anyMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId()));
86+
private Map<String, MetricsSummaryResponse> fetchPerNodeMetrics(List<String> timerNames) {
87+
final MetricsReadRequest request = MetricsReadRequest.create(timerNames);
88+
return stripCallResult(requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request)))
89+
.entrySet().stream()
90+
.filter(e -> e.getValue().isPresent())
91+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
12192
}
12293

12394
private boolean canReadPipeline(String pipelineId) {

graylog2-server/src/main/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleService.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.logging.log4j.core.appender.RollingFileAppender;
3535
import org.apache.logging.log4j.core.config.Configuration;
3636
import org.apache.shiro.subject.Subject;
37+
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadResponse;
38+
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadBuilder;
3739
import org.graylog.security.certutil.KeyStoreDto;
3840
import org.graylog2.cluster.NodeService;
3941
import org.graylog2.cluster.nodes.DataNodeDto;
@@ -42,6 +44,7 @@
4244
import org.graylog2.log4j.MemoryAppender;
4345
import org.graylog2.plugin.system.SimpleNodeId;
4446
import org.graylog2.rest.RemoteInterfaceProvider;
47+
import org.graylog2.rest.models.system.metrics.requests.MetricsReadRequest;
4548
import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse;
4649
import org.graylog2.rest.models.system.plugins.responses.PluginList;
4750
import org.graylog2.rest.models.system.responses.SystemJVMResponse;
@@ -134,6 +137,7 @@ public class SupportBundleService {
134137
private final List<URI> elasticsearchHosts;
135138
private final ClusterAdapter searchDbClusterAdapter;
136139
private final DatanodeRestApiProxy datanodeProxy;
140+
private final ProcessingLoadBuilder processingLoadBuilder;
137141

138142
@Inject
139143
public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorService executor,
@@ -145,7 +149,9 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer
145149
ClusterStatsService clusterStatsService,
146150
VersionProbeFactory searchDbProbeFactory,
147151
@IndexerHosts List<URI> searchDbHosts,
148-
ClusterAdapter searchDbClusterAdapter, DatanodeRestApiProxy datanodeProxy) {
152+
ClusterAdapter searchDbClusterAdapter,
153+
DatanodeRestApiProxy datanodeProxy,
154+
ProcessingLoadBuilder processingLoadBuilder) {
149155
this.executor = executor;
150156
this.nodeService = nodeService;
151157
this.datanodeService = datanodeService;
@@ -157,6 +163,7 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer
157163
this.elasticsearchHosts = searchDbHosts;
158164
this.searchDbClusterAdapter = searchDbClusterAdapter;
159165
this.datanodeProxy = datanodeProxy;
166+
this.processingLoadBuilder = processingLoadBuilder;
160167
}
161168

162169
public void buildBundle(HttpHeaders httpHeaders, Subject currentSubject) {
@@ -190,7 +197,9 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper,
190197

191198
// fetchClusterInfos runs concurrently with per-node collection — they are independent.
192199
// Its requestOnAllNodes wrappers run on their own orchestrationExecutor (created inside
193-
// the method), so only 4 simple leaf tasks land on `executor` — no starvation risk.
200+
// the method), so only 4 simple leaf tasks land on `executor`.
201+
// tryFetchProcessingLoadSnapshot adds one more `executor` task that runs at most one fan-out inline
202+
// (none when debug metrics are off).
194203
final List<CompletableFuture<Void>> futures = new ArrayList<>();
195204

196205
nodeManifests.entrySet().stream()
@@ -204,6 +213,11 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper,
204213
futures.add(CompletableFuture.runAsync(
205214
() -> tryFetchClusterInfos(proxiedResourceHelper, nodeManifests, spoolDir, errors), executor));
206215

216+
futures.add(CompletableFuture.runAsync(
217+
() -> tryFetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors),
218+
executor
219+
));
220+
207221
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get();
208222

209223
writeErrors(errors, spoolDir);
@@ -231,6 +245,52 @@ private void tryFetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper,
231245
}
232246
}
233247

248+
private void tryFetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper,
249+
Path spoolDir, List<BundleError> errors) {
250+
try {
251+
if (!processingLoadBuilder.metricsEnabled()) {
252+
return;
253+
}
254+
fetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors);
255+
} catch (Exception e) {
256+
LOG.warn("Failed to collect pipeline processing-load snapshot for support bundle, skipping", e);
257+
errors.add(BundleError.of("cluster/processing-load", e));
258+
}
259+
}
260+
261+
private void fetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper, Path spoolDir,
262+
List<BundleError> errors) throws IOException {
263+
final ProcessingLoadResponse snapshot = processingLoadBuilder.buildUnfiltered(
264+
timerNames -> fetchPerNodeMetrics(proxiedResourceHelper, timerNames, errors));
265+
266+
if (!snapshot.available()) {
267+
errors.add(new BundleError("cluster/processing-load",
268+
"Debug metrics are enabled but the Processing Load snapshot had no usable data "
269+
+ "(no active pipeline rules, no traffic in the last collection window, or "
270+
+ "per-node metric collection failed. See any node-level processing-load "
271+
+ "errors). Snapshot omitted.", null));
272+
return;
273+
}
274+
275+
try (var snapshotFile = new FileOutputStream(spoolDir.resolve("pipeline-processing-load.json").toFile())) {
276+
objectMapper.writerWithDefaultPrettyPrinter().writeValue(snapshotFile, snapshot);
277+
}
278+
}
279+
280+
private Map<String, MetricsSummaryResponse> fetchPerNodeMetrics(
281+
ProxiedResourceHelper proxiedResourceHelper, List<String> timerNames, List<BundleError> errors) {
282+
final MetricsReadRequest request = MetricsReadRequest.create(timerNames);
283+
return stripCallResult(
284+
proxiedResourceHelper.requestOnAllNodes(
285+
RemoteMetricsResource.class,
286+
r -> r.multipleMetrics(request),
287+
CALL_TIMEOUT
288+
),
289+
errors,
290+
"processing-load"
291+
);
292+
}
293+
234294
private void fetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper, Map<String, SupportBundleNodeManifest> nodeManifests, Path tmpDir, List<BundleError> errors) throws IOException {
235295
// requestOnAllNodes submits per-node tasks to `executor` then blocks on Future#get.
236296
// A separate short-lived orchestration executor runs these blocking fan-outs so they

0 commit comments

Comments
 (0)