Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-26161.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Include pipeline processing-load snapshot in support bundles."

issues = ["Graylog2/graylog-plugin-enterprise#14145"]
pulls = ["26161"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.plugins.pipelineprocessor.rest;

import com.google.common.collect.Maps;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination;
import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse;

import java.util.List;
import java.util.Map;

/**
* Builds a {@link ProcessingLoadResponse} from cluster-wide debug timer data, shared by the
* {@code /system/pipelines/processing-load} endpoint and the support-bundle snapshot so both stay
* in sync. Callers pass a {@link NodeMetricsFetcher}, keeping this class agnostic of how the
* per-node {@code multipleMetrics} fan-out is transported.
*/
@Singleton
public class ProcessingLoadBuilder {

private final PipelineInterpreterStateUpdater stateUpdater;
private final RuleMetricsConfigService ruleMetricsConfigService;
private final ProcessingLoadService processingLoadService;
private final NodeTimerSnapshotParser snapshotParser;

@Inject
public ProcessingLoadBuilder(PipelineInterpreterStateUpdater stateUpdater,
RuleMetricsConfigService ruleMetricsConfigService,
ProcessingLoadService processingLoadService,
NodeTimerSnapshotParser snapshotParser) {
this.stateUpdater = stateUpdater;
this.ruleMetricsConfigService = ruleMetricsConfigService;
this.processingLoadService = processingLoadService;
this.snapshotParser = snapshotParser;
}

public boolean metricsEnabled() {
return ruleMetricsConfigService.get().metricsEnabled();
}

/**
* The active pipeline-stage-rule combinations, or empty if debug metrics are off. Callers can
* permission-check these before the cluster fan-out, then pass the same list to
* {@link #buildUnfiltered(List, NodeMetricsFetcher)} to avoid resolving them twice.
*/
List<ActiveCombination> activeCombinations() {
if (!metricsEnabled()) {
return List.of();
}
return processingLoadService.activeCombinations(stateUpdater.getLatestState());
}

/**
* Builds the unfiltered response, resolving the active combinations itself. Returns
* {@code unavailable} when metrics are off, no combinations are active, or no node reported data.
*/
public ProcessingLoadResponse buildUnfiltered(NodeMetricsFetcher fetcher) {
return buildUnfiltered(activeCombinations(), fetcher);
}

/**
* Builds the unfiltered response from already-resolved combinations, so a caller that needed
* them (e.g. for a permission check) doesn't recompute them.
*/
ProcessingLoadResponse buildUnfiltered(List<ActiveCombination> combinations, NodeMetricsFetcher fetcher) {
if (combinations.isEmpty()) {
return ProcessingLoadResponse.unavailable();
}

final List<String> timerNames = processingLoadService.expectedTimerNames(combinations);
final Map<String, MetricsSummaryResponse> perNodeResponses = fetcher.fetch(timerNames);
final Map<String, NodeTimerSnapshot> perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size());
perNodeResponses.forEach((nodeId, response) -> perNodeSnapshots.put(nodeId, snapshotParser.parse(response)));

return processingLoadService.compute(combinations, perNodeSnapshots);
}

/**
* Fans a {@code multipleMetrics} call out to every node, returning responses keyed by node id.
* The implementation supplies the transport (REST proxy vs. support-bundle helper).
* A node missing from the map simply has no data.
*/
@FunctionalInterface
public interface NodeMetricsFetcher {
Map<String, MetricsSummaryResponse> fetch(List<String> timerNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.graylog.plugins.pipelineprocessor.rest;

import com.google.common.collect.Maps;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.inject.Inject;
Expand All @@ -29,9 +28,6 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.cluster.NodeService;
Expand All @@ -44,8 +40,8 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

@RequiresAuthentication
@PublicCloudAPI
Expand All @@ -54,25 +50,19 @@
@Produces(MediaType.APPLICATION_JSON)
public class ProcessingLoadResource extends ProxiedResource {

private final PipelineInterpreterStateUpdater stateUpdater;
private final RuleMetricsConfigService ruleMetricsConfigService;
private final ProcessingLoadBuilder processingLoadBuilder;
private final ProcessingLoadService processingLoadService;
private final NodeTimerSnapshotParser snapshotParser;

@Inject
public ProcessingLoadResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService,
PipelineInterpreterStateUpdater stateUpdater,
RuleMetricsConfigService ruleMetricsConfigService,
ProcessingLoadService processingLoadService,
NodeTimerSnapshotParser snapshotParser) {
ProcessingLoadBuilder processingLoadBuilder,
ProcessingLoadService processingLoadService) {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
this.stateUpdater = stateUpdater;
this.ruleMetricsConfigService = ruleMetricsConfigService;
this.processingLoadBuilder = processingLoadBuilder;
this.processingLoadService = processingLoadService;
this.snapshotParser = snapshotParser;
}

@GET
Expand All @@ -81,43 +71,24 @@ public ProcessingLoadResource(NodeService nodeService,
description = "Returns Processing Load % per pipeline-stage-rule, per pipeline, and per rule. " +
"Requires debug metrics enable.")
public ProcessingLoadResponse processingLoad() {
if (!ruleMetricsConfigService.get().metricsEnabled()) {
return ProcessingLoadResponse.unavailable();
}

final PipelineInterpreter.State state = stateUpdater.getLatestState();
final List<ActiveCombination> combinations = processingLoadService.activeCombinations(state);

final List<ActiveCombination> combinations = processingLoadBuilder.activeCombinations();
if (combinations.isEmpty()) {
return ProcessingLoadResponse.unavailable();
}

if (!hasAnyVisibleEntity(combinations)) {
if (combinations.stream().noneMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId()))) {
throw new ForbiddenException("No read permission on any active pipeline or rule.");
}

final List<String> expectedTimerNames = processingLoadService.expectedTimerNames(combinations);
final Map<String, NodeTimerSnapshot> perNodeSnapshots = fetchNodeSnapshots(expectedTimerNames);
final ProcessingLoadResponse full = processingLoadService.compute(combinations, perNodeSnapshots);
final ProcessingLoadResponse full = processingLoadBuilder.buildUnfiltered(combinations, this::fetchPerNodeMetrics);
return processingLoadService.filterByPermissions(full, this::canReadPipeline, this::canReadRule);
}

private Map<String, NodeTimerSnapshot> fetchNodeSnapshots(List<String> metricNames) {
final MetricsReadRequest request = MetricsReadRequest.create(metricNames);
final Map<String, Optional<MetricsSummaryResponse>> perNodeResponses = stripCallResult(
requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request))
);

final Map<String, NodeTimerSnapshot> perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size());
perNodeResponses.forEach((nodeId, response) ->
perNodeSnapshots.put(nodeId, response.map(snapshotParser::parse).orElseGet(NodeTimerSnapshot::empty))
);
return perNodeSnapshots;
}

private boolean hasAnyVisibleEntity(List<ActiveCombination> combinations) {
return combinations.stream()
.anyMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId()));
private Map<String, MetricsSummaryResponse> fetchPerNodeMetrics(List<String> timerNames) {
final MetricsReadRequest request = MetricsReadRequest.create(timerNames);
return stripCallResult(requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request)))
.entrySet().stream()
.filter(e -> e.getValue().isPresent())
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
}

private boolean canReadPipeline(String pipelineId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.shiro.subject.Subject;
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadResponse;
import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadBuilder;
import org.graylog.security.certutil.KeyStoreDto;
import org.graylog2.cluster.NodeService;
import org.graylog2.cluster.nodes.DataNodeDto;
Expand All @@ -42,6 +44,7 @@
import org.graylog2.log4j.MemoryAppender;
import org.graylog2.plugin.system.SimpleNodeId;
import org.graylog2.rest.RemoteInterfaceProvider;
import org.graylog2.rest.models.system.metrics.requests.MetricsReadRequest;
import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse;
import org.graylog2.rest.models.system.plugins.responses.PluginList;
import org.graylog2.rest.models.system.responses.SystemJVMResponse;
Expand Down Expand Up @@ -134,6 +137,7 @@ public class SupportBundleService {
private final List<URI> elasticsearchHosts;
private final ClusterAdapter searchDbClusterAdapter;
private final DatanodeRestApiProxy datanodeProxy;
private final ProcessingLoadBuilder processingLoadBuilder;

@Inject
public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorService executor,
Expand All @@ -145,7 +149,9 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer
ClusterStatsService clusterStatsService,
VersionProbeFactory searchDbProbeFactory,
@IndexerHosts List<URI> searchDbHosts,
ClusterAdapter searchDbClusterAdapter, DatanodeRestApiProxy datanodeProxy) {
ClusterAdapter searchDbClusterAdapter,
DatanodeRestApiProxy datanodeProxy,
ProcessingLoadBuilder processingLoadBuilder) {
this.executor = executor;
this.nodeService = nodeService;
this.datanodeService = datanodeService;
Expand All @@ -157,6 +163,7 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer
this.elasticsearchHosts = searchDbHosts;
this.searchDbClusterAdapter = searchDbClusterAdapter;
this.datanodeProxy = datanodeProxy;
this.processingLoadBuilder = processingLoadBuilder;
}

public void buildBundle(HttpHeaders httpHeaders, Subject currentSubject) {
Expand Down Expand Up @@ -190,7 +197,9 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper,

// fetchClusterInfos runs concurrently with per-node collection — they are independent.
// Its requestOnAllNodes wrappers run on their own orchestrationExecutor (created inside
// the method), so only 4 simple leaf tasks land on `executor` — no starvation risk.
// the method), so only 4 simple leaf tasks land on `executor`.
// tryFetchProcessingLoadSnapshot adds one more `executor` task that runs at most one fan-out inline
// (none when debug metrics are off).
final List<CompletableFuture<Void>> futures = new ArrayList<>();

nodeManifests.entrySet().stream()
Expand All @@ -204,6 +213,11 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper,
futures.add(CompletableFuture.runAsync(
() -> tryFetchClusterInfos(proxiedResourceHelper, nodeManifests, spoolDir, errors), executor));

futures.add(CompletableFuture.runAsync(
() -> tryFetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors),
executor
));

CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get();

writeErrors(errors, spoolDir);
Expand Down Expand Up @@ -231,6 +245,52 @@ private void tryFetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper,
}
}

private void tryFetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper,
Path spoolDir, List<BundleError> errors) {
try {
if (!processingLoadBuilder.metricsEnabled()) {
return;
}
fetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors);
} catch (Exception e) {
LOG.warn("Failed to collect pipeline processing-load snapshot for support bundle, skipping", e);
errors.add(BundleError.of("cluster/processing-load", e));
}
}

private void fetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper, Path spoolDir,
List<BundleError> errors) throws IOException {
final ProcessingLoadResponse snapshot = processingLoadBuilder.buildUnfiltered(
timerNames -> fetchPerNodeMetrics(proxiedResourceHelper, timerNames, errors));

if (!snapshot.available()) {
errors.add(new BundleError("cluster/processing-load",
"Debug metrics are enabled but the Processing Load snapshot had no usable data "
+ "(no active pipeline rules, no traffic in the last collection window, or "
+ "per-node metric collection failed. See any node-level processing-load "
+ "errors). Snapshot omitted.", null));
Comment thread
patrickmann marked this conversation as resolved.
return;
}

try (var snapshotFile = new FileOutputStream(spoolDir.resolve("pipeline-processing-load.json").toFile())) {
objectMapper.writerWithDefaultPrettyPrinter().writeValue(snapshotFile, snapshot);
}
}

private Map<String, MetricsSummaryResponse> fetchPerNodeMetrics(
ProxiedResourceHelper proxiedResourceHelper, List<String> timerNames, List<BundleError> errors) {
final MetricsReadRequest request = MetricsReadRequest.create(timerNames);
return stripCallResult(
proxiedResourceHelper.requestOnAllNodes(
RemoteMetricsResource.class,
r -> r.multipleMetrics(request),
CALL_TIMEOUT
),
errors,
"processing-load"
);
}

private void fetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper, Map<String, SupportBundleNodeManifest> nodeManifests, Path tmpDir, List<BundleError> errors) throws IOException {
// requestOnAllNodes submits per-node tasks to `executor` then blocks on Future#get.
// A separate short-lived orchestration executor runs these blocking fan-outs so they
Expand Down
Loading
Loading