diff --git a/changelog/unreleased/pr-26161.toml b/changelog/unreleased/pr-26161.toml new file mode 100644 index 000000000000..e062656205e7 --- /dev/null +++ b/changelog/unreleased/pr-26161.toml @@ -0,0 +1,5 @@ +type = "a" +message = "Include pipeline processing-load snapshot in support bundles." + +issues = ["Graylog2/graylog-plugin-enterprise#14145"] +pulls = ["26161"] diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilder.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilder.java new file mode 100644 index 000000000000..9016a513e39f --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilder.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>.
+ */ +package org.graylog.plugins.pipelineprocessor.rest; + +import com.google.common.collect.Maps; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService; +import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination; +import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse; + +import java.util.List; +import java.util.Map; + +/** + * Builds a {@link ProcessingLoadResponse} from cluster-wide debug timer data, shared by the + * {@code /system/pipelines/processing-load} endpoint and the support-bundle snapshot so both stay + * in sync. Callers pass a {@link NodeMetricsFetcher}, keeping this class agnostic of how the + * per-node {@code multipleMetrics} fan-out is transported. + */ +@Singleton +public class ProcessingLoadBuilder { + + private final PipelineInterpreterStateUpdater stateUpdater; + private final RuleMetricsConfigService ruleMetricsConfigService; + private final ProcessingLoadService processingLoadService; + private final NodeTimerSnapshotParser snapshotParser; + + @Inject + public ProcessingLoadBuilder(PipelineInterpreterStateUpdater stateUpdater, + RuleMetricsConfigService ruleMetricsConfigService, + ProcessingLoadService processingLoadService, + NodeTimerSnapshotParser snapshotParser) { + this.stateUpdater = stateUpdater; + this.ruleMetricsConfigService = ruleMetricsConfigService; + this.processingLoadService = processingLoadService; + this.snapshotParser = snapshotParser; + } + + public boolean metricsEnabled() { + return ruleMetricsConfigService.get().metricsEnabled(); + } + + /** + * The active pipeline-stage-rule combinations, or empty if debug metrics are off. 
Callers can + * permission-check these before the cluster fan-out, then pass the same list to + * {@link #buildUnfiltered(List, NodeMetricsFetcher)} to avoid resolving them twice. + */ + List activeCombinations() { + if (!metricsEnabled()) { + return List.of(); + } + return processingLoadService.activeCombinations(stateUpdater.getLatestState()); + } + + /** + * Builds the unfiltered response, resolving the active combinations itself. Returns + * {@code unavailable} when metrics are off, no combinations are active, or no node reported data. + */ + public ProcessingLoadResponse buildUnfiltered(NodeMetricsFetcher fetcher) { + return buildUnfiltered(activeCombinations(), fetcher); + } + + /** + * Builds the unfiltered response from already-resolved combinations, so a caller that needed + * them (e.g. for a permission check) doesn't recompute them. + */ + ProcessingLoadResponse buildUnfiltered(List combinations, NodeMetricsFetcher fetcher) { + if (combinations.isEmpty()) { + return ProcessingLoadResponse.unavailable(); + } + + final List timerNames = processingLoadService.expectedTimerNames(combinations); + final Map perNodeResponses = fetcher.fetch(timerNames); + final Map perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size()); + perNodeResponses.forEach((nodeId, response) -> perNodeSnapshots.put(nodeId, snapshotParser.parse(response))); + + return processingLoadService.compute(combinations, perNodeSnapshots); + } + + /** + * Fans a {@code multipleMetrics} call out to every node, returning responses keyed by node id. + * The implementation supplies the transport (REST proxy vs. support-bundle helper). + * A node missing from the map simply has no data. 
+ */ + @FunctionalInterface + public interface NodeMetricsFetcher { + Map fetch(List timerNames); + } +} diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadResource.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadResource.java index 8f8c9a72158b..074358816783 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadResource.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadResource.java @@ -16,7 +16,6 @@ */ package org.graylog.plugins.pipelineprocessor.rest; -import com.google.common.collect.Maps; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.inject.Inject; @@ -29,9 +28,6 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; import org.apache.shiro.authz.annotation.RequiresAuthentication; -import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService; -import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; -import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater; import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination; import org.graylog2.audit.jersey.NoAuditEvent; import org.graylog2.cluster.NodeService; @@ -44,8 +40,8 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; @RequiresAuthentication @PublicCloudAPI @@ -54,25 +50,19 @@ @Produces(MediaType.APPLICATION_JSON) public class ProcessingLoadResource extends ProxiedResource { - private final PipelineInterpreterStateUpdater stateUpdater; - private final RuleMetricsConfigService ruleMetricsConfigService; + private final ProcessingLoadBuilder processingLoadBuilder; private final ProcessingLoadService processingLoadService; - private final 
NodeTimerSnapshotParser snapshotParser; @Inject public ProcessingLoadResource(NodeService nodeService, RemoteInterfaceProvider remoteInterfaceProvider, @Context HttpHeaders httpHeaders, @Named("proxiedRequestsExecutorService") ExecutorService executorService, - PipelineInterpreterStateUpdater stateUpdater, - RuleMetricsConfigService ruleMetricsConfigService, - ProcessingLoadService processingLoadService, - NodeTimerSnapshotParser snapshotParser) { + ProcessingLoadBuilder processingLoadBuilder, + ProcessingLoadService processingLoadService) { super(httpHeaders, nodeService, remoteInterfaceProvider, executorService); - this.stateUpdater = stateUpdater; - this.ruleMetricsConfigService = ruleMetricsConfigService; + this.processingLoadBuilder = processingLoadBuilder; this.processingLoadService = processingLoadService; - this.snapshotParser = snapshotParser; } @GET @@ -81,43 +71,24 @@ public ProcessingLoadResource(NodeService nodeService, description = "Returns Processing Load % per pipeline-stage-rule, per pipeline, and per rule. 
" + "Requires debug metrics enable.") public ProcessingLoadResponse processingLoad() { - if (!ruleMetricsConfigService.get().metricsEnabled()) { - return ProcessingLoadResponse.unavailable(); - } - - final PipelineInterpreter.State state = stateUpdater.getLatestState(); - final List combinations = processingLoadService.activeCombinations(state); - + final List combinations = processingLoadBuilder.activeCombinations(); if (combinations.isEmpty()) { return ProcessingLoadResponse.unavailable(); } - - if (!hasAnyVisibleEntity(combinations)) { + if (combinations.stream().noneMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId()))) { throw new ForbiddenException("No read permission on any active pipeline or rule."); } - final List expectedTimerNames = processingLoadService.expectedTimerNames(combinations); - final Map perNodeSnapshots = fetchNodeSnapshots(expectedTimerNames); - final ProcessingLoadResponse full = processingLoadService.compute(combinations, perNodeSnapshots); + final ProcessingLoadResponse full = processingLoadBuilder.buildUnfiltered(combinations, this::fetchPerNodeMetrics); return processingLoadService.filterByPermissions(full, this::canReadPipeline, this::canReadRule); } - private Map fetchNodeSnapshots(List metricNames) { - final MetricsReadRequest request = MetricsReadRequest.create(metricNames); - final Map> perNodeResponses = stripCallResult( - requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request)) - ); - - final Map perNodeSnapshots = Maps.newHashMapWithExpectedSize(perNodeResponses.size()); - perNodeResponses.forEach((nodeId, response) -> - perNodeSnapshots.put(nodeId, response.map(snapshotParser::parse).orElseGet(NodeTimerSnapshot::empty)) - ); - return perNodeSnapshots; - } - - private boolean hasAnyVisibleEntity(List combinations) { - return combinations.stream() - .anyMatch(c -> canReadPipeline(c.pipelineId()) || canReadRule(c.ruleId())); + private Map fetchPerNodeMetrics(List timerNames) { + final 
MetricsReadRequest request = MetricsReadRequest.create(timerNames); + return stripCallResult(requestOnAllNodes(RemoteMetricsResource.class, r -> r.multipleMetrics(request))) + .entrySet().stream() + .filter(e -> e.getValue().isPresent()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); } private boolean canReadPipeline(String pipelineId) { diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleService.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleService.java index 63123aefd618..e87255692d85 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleService.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleService.java @@ -34,6 +34,8 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender; import org.apache.logging.log4j.core.config.Configuration; import org.apache.shiro.subject.Subject; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadResponse; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadBuilder; import org.graylog.security.certutil.KeyStoreDto; import org.graylog2.cluster.NodeService; import org.graylog2.cluster.nodes.DataNodeDto; @@ -42,6 +44,7 @@ import org.graylog2.log4j.MemoryAppender; import org.graylog2.plugin.system.SimpleNodeId; import org.graylog2.rest.RemoteInterfaceProvider; +import org.graylog2.rest.models.system.metrics.requests.MetricsReadRequest; import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse; import org.graylog2.rest.models.system.plugins.responses.PluginList; import org.graylog2.rest.models.system.responses.SystemJVMResponse; @@ -134,6 +137,7 @@ public class SupportBundleService { private final List elasticsearchHosts; private final ClusterAdapter searchDbClusterAdapter; private final DatanodeRestApiProxy datanodeProxy; + private final 
ProcessingLoadBuilder processingLoadBuilder; @Inject public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorService executor, @@ -145,7 +149,9 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer ClusterStatsService clusterStatsService, VersionProbeFactory searchDbProbeFactory, @IndexerHosts List searchDbHosts, - ClusterAdapter searchDbClusterAdapter, DatanodeRestApiProxy datanodeProxy) { + ClusterAdapter searchDbClusterAdapter, + DatanodeRestApiProxy datanodeProxy, + ProcessingLoadBuilder processingLoadBuilder) { this.executor = executor; this.nodeService = nodeService; this.datanodeService = datanodeService; @@ -157,6 +163,7 @@ public SupportBundleService(@Named("proxiedRequestsExecutorService") ExecutorSer this.elasticsearchHosts = searchDbHosts; this.searchDbClusterAdapter = searchDbClusterAdapter; this.datanodeProxy = datanodeProxy; + this.processingLoadBuilder = processingLoadBuilder; } public void buildBundle(HttpHeaders httpHeaders, Subject currentSubject) { @@ -190,7 +197,9 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper, // fetchClusterInfos runs concurrently with per-node collection — they are independent. // Its requestOnAllNodes wrappers run on their own orchestrationExecutor (created inside - // the method), so only 4 simple leaf tasks land on `executor` — no starvation risk. + // the method), so only 4 simple leaf tasks land on `executor`. + // tryFetchProcessingLoadSnapshot adds one more `executor` task that runs at most one fan-out inline + // (none when debug metrics are off). 
final List> futures = new ArrayList<>(); nodeManifests.entrySet().stream() @@ -204,6 +213,11 @@ private void collectBundleData(ProxiedResourceHelper proxiedResourceHelper, futures.add(CompletableFuture.runAsync( () -> tryFetchClusterInfos(proxiedResourceHelper, nodeManifests, spoolDir, errors), executor)); + futures.add(CompletableFuture.runAsync( + () -> tryFetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors), + executor + )); + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(); writeErrors(errors, spoolDir); @@ -231,6 +245,52 @@ private void tryFetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper, } } + private void tryFetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper, + Path spoolDir, List errors) { + try { + if (!processingLoadBuilder.metricsEnabled()) { + return; + } + fetchProcessingLoadSnapshot(proxiedResourceHelper, spoolDir, errors); + } catch (Exception e) { + LOG.warn("Failed to collect pipeline processing-load snapshot for support bundle, skipping", e); + errors.add(BundleError.of("cluster/processing-load", e)); + } + } + + private void fetchProcessingLoadSnapshot(ProxiedResourceHelper proxiedResourceHelper, Path spoolDir, + List errors) throws IOException { + final ProcessingLoadResponse snapshot = processingLoadBuilder.buildUnfiltered( + timerNames -> fetchPerNodeMetrics(proxiedResourceHelper, timerNames, errors)); + + if (!snapshot.available()) { + errors.add(new BundleError("cluster/processing-load", + "Debug metrics are enabled but the Processing Load snapshot had no usable data " + + "(no active pipeline rules, no traffic in the last collection window, or " + + "per-node metric collection failed. See any node-level processing-load " + + "errors). 
Snapshot omitted.", null)); + return; + } + + try (var snapshotFile = new FileOutputStream(spoolDir.resolve("pipeline-processing-load.json").toFile())) { + objectMapper.writerWithDefaultPrettyPrinter().writeValue(snapshotFile, snapshot); + } + } + + private Map fetchPerNodeMetrics( + ProxiedResourceHelper proxiedResourceHelper, List timerNames, List errors) { + final MetricsReadRequest request = MetricsReadRequest.create(timerNames); + return stripCallResult( + proxiedResourceHelper.requestOnAllNodes( + RemoteMetricsResource.class, + r -> r.multipleMetrics(request), + CALL_TIMEOUT + ), + errors, + "processing-load" + ); + } + private void fetchClusterInfos(ProxiedResourceHelper proxiedResourceHelper, Map nodeManifests, Path tmpDir, List errors) throws IOException { // requestOnAllNodes submits per-node tasks to `executor` then blocks on Future#get. // A separate short-lived orchestration executor runs these blocking fan-outs so they diff --git a/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilderTest.java b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilderTest.java new file mode 100644 index 000000000000..c0524b9c0d1a --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/rest/ProcessingLoadBuilderTest.java @@ -0,0 +1,238 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>.
+ */ +package org.graylog.plugins.pipelineprocessor.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.graylog.plugins.pipelineprocessor.ast.Pipeline; +import org.graylog.plugins.pipelineprocessor.ast.Rule; +import org.graylog.plugins.pipelineprocessor.ast.Stage; +import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigDto; +import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService; +import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; +import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater; +import org.graylog.plugins.pipelineprocessor.processors.listeners.RuleMetricsListener; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadService.ActiveCombination; +import org.graylog2.rest.models.system.metrics.responses.MetricsSummaryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.codahale.metrics.MetricRegistry.name; +import static org.assertj.core.api.Assertions.assertThat; +import static org.graylog.plugins.pipelineprocessor.processors.listeners.RuleMetricsListener.Type.EVALUATE; +import static org.graylog.plugins.pipelineprocessor.processors.listeners.RuleMetricsListener.Type.EXECUTE; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ProcessingLoadBuilderTest { + + private static final String RULE_A = "rule-A"; + private static final String PIPELINE_P = "pipeline-P"; + private static final int STAGE_0 = 0; + + private PipelineInterpreterStateUpdater stateUpdater; + private RuleMetricsConfigService ruleMetricsConfigService; + private ProcessingLoadBuilder processingLoadBuilder; + + @BeforeEach + 
void setUp() { + stateUpdater = mock(PipelineInterpreterStateUpdater.class); + ruleMetricsConfigService = mock(RuleMetricsConfigService.class); + final NodeTimerSnapshotParser snapshotParser = new NodeTimerSnapshotParser(new ObjectMapper()); + processingLoadBuilder = new ProcessingLoadBuilder(stateUpdater, ruleMetricsConfigService, new ProcessingLoadService(), snapshotParser); + } + + @Test + void unavailableWhenDebugMetricsOff() { + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(false)); + + final AtomicBoolean fetcherInvoked = new AtomicBoolean(false); + final ProcessingLoadResponse response = processingLoadBuilder.buildUnfiltered(timerNames -> { + fetcherInvoked.set(true); + return Map.of(); + }); + + assertThat(response.available()).isFalse(); + assertThat(fetcherInvoked.get()).isFalse(); + } + + @Test + void unavailableWhenNoActiveCombinations() { + final PipelineInterpreter.State state = stateWith(); + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + when(stateUpdater.getLatestState()).thenReturn(state); + + final AtomicBoolean fetcherInvoked = new AtomicBoolean(false); + final ProcessingLoadResponse response = processingLoadBuilder.buildUnfiltered(timerNames -> { + fetcherInvoked.set(true); + return Map.of(); + }); + + assertThat(response.available()).isFalse(); + assertThat(fetcherInvoked.get()).isFalse(); + } + + @Test + void buildsResponseWhenMetricsEnabledAndDataPresent() { + final PipelineInterpreter.State state = stateWithSinglePipeline(PIPELINE_P, STAGE_0, RULE_A); + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + when(stateUpdater.getLatestState()).thenReturn(state); + + final ProcessingLoadResponse response = processingLoadBuilder.buildUnfiltered(timerNames -> { + assertThat(timerNames).containsExactlyInAnyOrder( + timerName(RULE_A, PIPELINE_P, STAGE_0, EVALUATE), + timerName(RULE_A, PIPELINE_P, STAGE_0, EXECUTE) + ); + return Map.of("node-1", metricsResponse(List.of( + 
timerEntry(timerName(RULE_A, PIPELINE_P, STAGE_0, EVALUATE), 100.0d, 10.0d), + timerEntry(timerName(RULE_A, PIPELINE_P, STAGE_0, EXECUTE), 100.0d, 90.0d) + ))); + }); + + assertThat(response.available()).isTrue(); + assertThat(response.totalCostMicrosecondsPerSecond()).isEqualTo(10_000.0d); + assertThat(response.rules()).extracting(RuleLoad::loadPercent).containsExactly(100.00d); + } + + @Test + void survivesNodeWithoutResponse() { + final PipelineInterpreter.State state = stateWithSinglePipeline(PIPELINE_P, STAGE_0, RULE_A); + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + when(stateUpdater.getLatestState()).thenReturn(state); + + final ProcessingLoadResponse response = processingLoadBuilder.buildUnfiltered(timerNames -> Map.of( + "node-1", metricsResponse(List.of( + timerEntry(timerName(RULE_A, PIPELINE_P, STAGE_0, EVALUATE), 100.0d, 10.0d), + timerEntry(timerName(RULE_A, PIPELINE_P, STAGE_0, EXECUTE), 100.0d, 90.0d) + )) + )); + + assertThat(response.available()).isTrue(); + assertThat(response.totalCostMicrosecondsPerSecond()).isEqualTo(10_000.0d); + } + + @Test + void metricsEnabledReflectsConfig() { + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + assertThat(processingLoadBuilder.metricsEnabled()).isTrue(); + + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(false)); + assertThat(processingLoadBuilder.metricsEnabled()).isFalse(); + } + + @Test + void activeCombinationsEmptyWhenMetricsOff() { + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(false)); + assertThat(processingLoadBuilder.activeCombinations()).isEmpty(); + } + + @Test + void activeCombinationsEmptyWhenNoCombinations() { + final PipelineInterpreter.State state = stateWith(); + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + when(stateUpdater.getLatestState()).thenReturn(state); + + assertThat(processingLoadBuilder.activeCombinations()).isEmpty(); + } + + @Test + void 
activeCombinationsReturnsRuleAndPipelineIds() { + final PipelineInterpreter.State state = stateWithSinglePipeline(PIPELINE_P, STAGE_0, RULE_A); + when(ruleMetricsConfigService.get()).thenReturn(metricsConfig(true)); + when(stateUpdater.getLatestState()).thenReturn(state); + + final List combinations = processingLoadBuilder.activeCombinations(); + assertThat(combinations).hasSize(1); + assertThat(combinations.getFirst().ruleId()).isEqualTo(RULE_A); + assertThat(combinations.getFirst().pipelineId()).isEqualTo(PIPELINE_P); + } + + private static RuleMetricsConfigDto metricsConfig(boolean enabled) { + return RuleMetricsConfigDto.builder().metricsEnabled(enabled).build(); + } + + private static MetricsSummaryResponse metricsResponse(List> entries) { + return MetricsSummaryResponse.create(entries.size(), new ArrayList<>(entries)); + } + + private static Map timerEntry(String name, double fifteenMinuteRate, double meanMicroseconds) { + return Map.of( + "full_name", name, + "metric", Map.of( + "rate", Map.of("fifteen_minute", fifteenMinuteRate), + "time", Map.of("mean", meanMicroseconds) + ) + ); + } + + private static String timerName(String ruleId, String pipelineId, int stage, RuleMetricsListener.Type type) { + return RuleMetricsListener.getMetricName(name(ruleId, pipelineId, String.valueOf(stage)), type); + } + + private static PipelineInterpreter.State stateWith(Pipeline... pipelines) { + final ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Pipeline pipeline : pipelines) { + builder.put(pipeline.id(), pipeline); + } + final PipelineInterpreter.State state = mock(PipelineInterpreter.State.class); + when(state.getCurrentPipelines()).thenReturn(builder.build()); + return state; + } + + private static PipelineInterpreter.State stateWithSinglePipeline(String pipelineId, int stageNumber, String... 
ruleIds) { + final Rule[] rules = new Rule[ruleIds.length]; + for (int i = 0; i < ruleIds.length; i++) { + rules[i] = mockRule(ruleIds[i]); + } + return stateWith(pipeline(pipelineId, List.of(stage(stageNumber, rules)))); + } + + private static Rule mockRule(String id) { + final Rule rule = mock(Rule.class); + when(rule.id()).thenReturn(id); + when(rule.name()).thenReturn(id); + return rule; + } + + private static Stage stage(int stageNumber, Rule... rules) { + final Stage stage = Stage.builder() + .stage(stageNumber) + .match(Stage.Match.ALL) + .ruleReferences(List.of(rules).stream().map(Rule::name).toList()) + .build(); + stage.setRules(List.of(rules)); + return stage; + } + + private static Pipeline pipeline(String pipelineId, List stages) { + final SortedSet sortedStages = new TreeSet<>(Comparator.comparingInt(Stage::stage)); + sortedStages.addAll(stages); + return Pipeline.builder() + .id(pipelineId) + .name(pipelineId) + .stages(sortedStages) + .build(); + } +} diff --git a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceIT.java b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceIT.java index a15d45ee43ae..9931a75b8917 100644 --- a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceIT.java +++ b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceIT.java @@ -20,6 +20,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import jakarta.ws.rs.core.HttpHeaders; import org.apache.shiro.subject.Subject; +import org.graylog.plugins.pipelineprocessor.rest.PipelineLoad; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadResponse; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadBuilder; +import org.graylog.plugins.pipelineprocessor.rest.RuleLoad; +import org.graylog.plugins.pipelineprocessor.rest.StageRuleLoad; import 
org.graylog2.cluster.Node; import org.graylog2.cluster.NodeService; import org.graylog2.cluster.TestNodeService; @@ -35,12 +40,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Stream; @@ -49,6 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.graylog2.shared.utilities.StringUtils.f; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -67,6 +73,7 @@ class SupportBundleServiceIT { private VersionProbeFactory versionProbeFactory; private ClusterAdapter searchDbClusterAdapter; private DatanodeRestApiProxy datanodeProxy; + private ProcessingLoadBuilder processingLoadBuilder; @BeforeEach void setUp() { @@ -79,6 +86,7 @@ void setUp() { versionProbeFactory = mock(VersionProbeFactory.class); searchDbClusterAdapter = mock(ClusterAdapter.class); datanodeProxy = mock(DatanodeRestApiProxy.class); + processingLoadBuilder = mock(ProcessingLoadBuilder.class); } @AfterEach @@ -101,25 +109,17 @@ void buildBundleStillSucceedsAndWritesErrorsJsonWhenCollectionsFail() throws Exc assertThatNoException().isThrownBy( () -> service.buildBundle(mock(HttpHeaders.class), mock(Subject.class))); - // A zip file must have been written to the bundle dir. 
- final Path bundleDir = tempDir.resolve("support-bundle"); - try (final Stream files = Files.list(bundleDir)) { - final Optional zipFile = files.filter(p -> p.getFileName().toString().endsWith(".zip")).findFirst(); - assertThat(zipFile).as("bundle zip file").isPresent(); - - // The zip must contain errors.json with the collected error. - try (final ZipFile zip = new ZipFile(zipFile.get().toFile())) { - final var errorsEntry = zip.getEntry("errors.json"); - assertThat(errorsEntry).as("errors.json entry in bundle zip").isNotNull(); - - final String content = new String( - zip.getInputStream(errorsEntry).readAllBytes(), StandardCharsets.UTF_8); - assertThat(content) - .contains("cluster/datanode-info") - .contains("Simulated datanode service failure"); - } - } + // The zip must contain errors.json with the collected error. + try (final ZipFile zip = openBundleZip()) { + final var errorsEntry = zip.getEntry("errors.json"); + assertThat(errorsEntry).as("errors.json entry in bundle zip").isNotNull(); + final String content = new String( + zip.getInputStream(errorsEntry).readAllBytes(), StandardCharsets.UTF_8); + assertThat(content) + .contains("cluster/datanode-info") + .contains("Simulated datanode service failure"); + } } @Test @@ -141,30 +141,106 @@ void buildBundleRecordsPerNodeFailuresFromStripCallResultInErrorsJson() throws E final SupportBundleService service = new SupportBundleService( executor, nodeServiceWithNode, datanodeService, remoteInterfaceProvider, tempDir, objectMapperProvider, clusterStatsService, versionProbeFactory, - List.of(), searchDbClusterAdapter, datanodeProxy); + List.of(), searchDbClusterAdapter, datanodeProxy, processingLoadBuilder); + + assertThatNoException().isThrownBy( + () -> service.buildBundle(mock(HttpHeaders.class), mock(Subject.class))); + + try (final ZipFile zip = openBundleZip()) { + final var errorsEntry = zip.getEntry("errors.json"); + assertThat(errorsEntry).as("errors.json entry in bundle zip").isNotNull(); + + final String 
content = new String( + zip.getInputStream(errorsEntry).readAllBytes(), StandardCharsets.UTF_8); + assertThat(content) + .contains(f("node/%s/system-overview", failingNodeId)) + .contains(f("node/%s/jvm", failingNodeId)) + .contains(f("node/%s/process-buffer-dump", failingNodeId)) + .contains(f("node/%s/installed-plugins", failingNodeId)); + } + } + + @Test + void buildBundleOmitsProcessingLoadSnapshotWhenDebugMetricsOff() throws Exception { + when(datanodeService.allActive()).thenReturn(Map.of()); + when(processingLoadBuilder.metricsEnabled()).thenReturn(false); + + final SupportBundleService service = buildService(); + assertThatNoException().isThrownBy( + () -> service.buildBundle(mock(HttpHeaders.class), mock(Subject.class))); + + try (final ZipFile zip = openBundleZip()) { + assertThat(zip.getEntry("pipeline-processing-load.json")) + .as("processing-load snapshot must be absent when debug metrics is off") + .isNull(); + } + } + @Test + void buildBundleOmitsProcessingLoadSnapshotWhenMetricsOnButUnavailable() throws Exception { + when(datanodeService.allActive()).thenReturn(Map.of()); + when(processingLoadBuilder.metricsEnabled()).thenReturn(true); + when(processingLoadBuilder.buildUnfiltered(any())).thenReturn(ProcessingLoadResponse.create( + false, 0.0d, List.of(), List.of(), List.of())); + + final SupportBundleService service = buildService(); assertThatNoException().isThrownBy( () -> service.buildBundle(mock(HttpHeaders.class), mock(Subject.class))); + try (final ZipFile zip = openBundleZip()) { + assertThat(zip.getEntry("pipeline-processing-load.json")) + .as("processing-load snapshot must be absent when no usable data is available") + .isNull(); + + final var errorsEntry = zip.getEntry("errors.json"); + assertThat(errorsEntry).as("errors.json breadcrumb entry").isNotNull(); + final String content = new String( + zip.getInputStream(errorsEntry).readAllBytes(), StandardCharsets.UTF_8); + assertThat(content) + .contains("cluster/processing-load") + 
.contains("Snapshot omitted"); + } + } + + @Test + void buildBundleIncludesProcessingLoadSnapshotWhenDebugMetricsOn() throws Exception { + when(datanodeService.allActive()).thenReturn(Map.of()); + when(processingLoadBuilder.metricsEnabled()).thenReturn(true); + + final ProcessingLoadResponse snapshot = ProcessingLoadResponse.create( + true, + 170_000.0d, + List.of(StageRuleLoad.create("rule-A", "pipe-1", 0, 29.41d, 100.0d)), + List.of(PipelineLoad.create("pipe-1", 100.0d)), + List.of(RuleLoad.create("rule-A", 29.41d)) + ); + when(processingLoadBuilder.buildUnfiltered(any())).thenReturn(snapshot); + + final SupportBundleService service = buildService(); + assertThatNoException().isThrownBy( + () -> service.buildBundle(mock(HttpHeaders.class), mock(Subject.class))); + + try (final ZipFile zip = openBundleZip()) { + final var snapshotEntry = zip.getEntry("pipeline-processing-load.json"); + assertThat(snapshotEntry).as("pipeline-processing-load.json entry").isNotNull(); + + final String content = new String( + zip.getInputStream(snapshotEntry).readAllBytes(), StandardCharsets.UTF_8); + assertThat(content) + .contains("\"available\" : true") + .contains("rule-A") + .contains("pipe-1") + .contains("170000"); + } + } + + private ZipFile openBundleZip() throws IOException { final Path bundleDir = tempDir.resolve("support-bundle"); try (final Stream files = Files.list(bundleDir)) { - final Optional zipFile = files - .filter(p -> p.getFileName().toString().endsWith(".zip")) - .findFirst(); - assertThat(zipFile).as("bundle zip file").isPresent(); - - try (final ZipFile zip = new ZipFile(zipFile.get().toFile())) { - final var errorsEntry = zip.getEntry("errors.json"); - assertThat(errorsEntry).as("errors.json entry in bundle zip").isNotNull(); - - final String content = new String( - zip.getInputStream(errorsEntry).readAllBytes(), StandardCharsets.UTF_8); - assertThat(content) - .contains(f("node/%s/system-overview", failingNodeId)) - .contains(f("node/%s/jvm", failingNodeId)) 
- .contains(f("node/%s/process-buffer-dump", failingNodeId)) - .contains(f("node/%s/installed-plugins", failingNodeId)); - } + final Path zipFile = files.filter(p -> p.getFileName().toString().endsWith(".zip")) + .findFirst() + .orElseThrow(() -> new AssertionError("no bundle zip file was written")); + return new ZipFile(zipFile.toFile()); } } @@ -173,6 +249,6 @@ private SupportBundleService buildService() { return new SupportBundleService( executor, nodeService, datanodeService, remoteInterfaceProvider, tempDir, objectMapperProvider, clusterStatsService, versionProbeFactory, - List.of(), searchDbClusterAdapter, datanodeProxy); + List.of(), searchDbClusterAdapter, datanodeProxy, processingLoadBuilder); } } diff --git a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceTest.java b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceTest.java index c86b08a11b31..f2a8198630e5 100644 --- a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/debug/bundle/SupportBundleServiceTest.java @@ -16,6 +16,7 @@ */ package org.graylog2.rest.resources.system.debug.bundle; +import org.graylog.plugins.pipelineprocessor.rest.ProcessingLoadBuilder; import org.graylog2.cluster.NodeService; import org.graylog2.rest.RemoteInterfaceProvider; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; @@ -55,6 +56,9 @@ public class SupportBundleServiceTest { @Mock private ObjectMapperProvider objectMapperProvider; + @Mock + private ProcessingLoadBuilder processingLoadBuilder; + @InjectMocks private SupportBundleService supportBundleService;