From 51aa28f645716b593b11e821479a380c460329e9 Mon Sep 17 00:00:00 2001 From: Patrick Mann Date: Tue, 14 Apr 2026 08:55:28 +0200 Subject: [PATCH] Honor max_event_age in cluster event periodical and improve performance (`7.0`) (#25514) * Honor max_event_age in cluster event periodical and improve performance (#25265) * honor max_event_age * CL * add validation; migrate test format to junit5 style * reduce default and min period * revise default and min value * add config param documentation --------- Co-authored-by: Anton Ebel (cherry picked from commit a2c2f52295a8668afe648f5c85cc4f0b15f0658a) * Drop old cluster_events index when creating the new one The original PR replaced the compound index from (timestamp, producer, consumers) to (consumers, timestamp) but didn't remove the old index. Drop it on startup if present. Co-Authored-By: Claude Opus 4.6 (1M context) * adjust for graylog collection wrapper --------- Co-authored-by: Anton Ebel Co-authored-by: Claude Opus 4.6 (1M context) Co-authored-by: Ismail Belkacim (cherry picked from commit a3e07928756112f5316d40773184343724f16a28) --- changelog/unreleased/issue-25259.toml | 12 +++++ .../main/java/org/graylog2/Configuration.java | 6 ++- .../PositiveJavaDurationValidator.java | 31 ++++++++++++ .../events/ClusterEventCleanupPeriodical.java | 17 +++++-- .../events/ClusterEventPeriodical.java | 11 +++-- .../ClusterEventCleanupPeriodicalTest.java | 48 +++++++++++++------ .../events/ClusterEventPeriodicalTest.java | 38 +++++++-------- 7 files changed, 118 insertions(+), 45 deletions(-) create mode 100644 changelog/unreleased/issue-25259.toml create mode 100644 graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java diff --git a/changelog/unreleased/issue-25259.toml b/changelog/unreleased/issue-25259.toml new file mode 100644 index 000000000000..107a6479d3d5 --- /dev/null +++ b/changelog/unreleased/issue-25259.toml @@ -0,0 +1,12 @@ +type = "f" +message = "Honor the max_event_age configuration for cluster event cleanup. Default reduced from 24h to 12h." + +details.user = """ +A clean-up job runs at a period of max_event_age to remove old events from the cluster. Note that +events created just after the last clean-up job will be removed after max_event_age, so the effective +age of events in the cluster is between max_event_age and 2*max_event_age. +The default value of max_event_age is reduced from 24h to 12h to ensure that events are cleaned up in a more timely manner. +""" + +issues = ["25259"] +pulls = ["25265"] diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index 474ca0ee0576..672de8bf6531 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -39,6 +39,7 @@ import org.graylog2.cluster.lock.MongoLockService; import org.graylog2.configuration.Documentation; import org.graylog2.configuration.converters.JavaDurationConverter; +import org.graylog2.configuration.validators.PositiveJavaDurationValidator; import org.graylog2.notifications.Notification; import org.graylog2.outputs.BatchSizeConfig; import org.graylog2.plugin.Tools; @@ -298,8 +299,9 @@ public class Configuration extends CaConfiguration implements CommonNodeConfigur @Parameter(value = "global_inputs_only") private boolean globalInputsOnly = false; - @Parameter(value = "max_event_age", converter = JavaDurationConverter.class) - private java.time.Duration maxEventAge = java.time.Duration.ofDays(1L); + @Documentation("Maximum age of cluster events before cleanup. The cleanup runs at this interval, so effective event age is between max_event_age and 2*max_event_age. Minimum effective interval is 1 hour. Default: 12h") + @Parameter(value = "max_event_age", converter = JavaDurationConverter.class, validators = PositiveJavaDurationValidator.class) + private java.time.Duration maxEventAge = java.time.Duration.ofHours(12L); public boolean maintainsStreamAwareFieldTypes() { return streamAwareFieldTypes; diff --git a/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java b/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java new file mode 100644 index 000000000000..fa26f38cb6ea --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.configuration.validators; + +import com.github.joschi.jadconfig.ValidationException; +import com.github.joschi.jadconfig.Validator; + +import java.time.Duration; + +public class PositiveJavaDurationValidator implements Validator { + @Override + public void validate(String name, Duration value) throws ValidationException { + if (value == null || value.isZero() || value.isNegative()) { + throw new ValidationException("Parameter " + name + " must be a positive duration (found " + value + ")"); + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java index b1a6c4926ec2..4f7b4c5ca1cc 100644 --- a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java +++ b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java @@ -25,26 +25,33 @@ import jakarta.inject.Inject; import org.graylog2.database.MongoCollections; import org.graylog2.plugin.periodical.Periodical; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Clock; import java.time.Duration; -import java.util.concurrent.TimeUnit; +import java.time.Instant; public class ClusterEventCleanupPeriodical extends Periodical { private static final Logger LOG = LoggerFactory.getLogger(ClusterEventCleanupPeriodical.class); private static final String COLLECTION_NAME = ClusterEventPeriodical.COLLECTION_NAME; + private static final long MIN_PERIOD_SECONDS = 3600; private final MongoCollection collection; private final Duration maxEventAge; + private final Clock clock; @Inject public ClusterEventCleanupPeriodical(MongoCollections mongoCollections, @Named("max_event_age") Duration maxEventAge) { + this(mongoCollections, maxEventAge, Clock.systemUTC()); + } + + @VisibleForTesting + ClusterEventCleanupPeriodical(MongoCollections mongoCollections, Duration maxEventAge, Clock clock) { this.collection = mongoCollections.collection(COLLECTION_NAME, ClusterEvent.class) .withWriteConcern(WriteConcern.JOURNALED); this.maxEventAge = maxEventAge; + this.clock = clock; } @Override @@ -79,7 +86,7 @@ public int getInitialDelaySeconds() { @Override public int getPeriodSeconds() { - return Ints.saturatedCast(TimeUnit.DAYS.toSeconds(1L)); + return Ints.saturatedCast(Math.max(MIN_PERIOD_SECONDS, maxEventAge.toSeconds())); } @Override @@ -92,7 +99,7 @@ public void doRun() { try { LOG.debug("Removing stale events from MongoDB collection \"{}\"", COLLECTION_NAME); - final long timestamp = DateTime.now(DateTimeZone.UTC).getMillis() - maxEventAge.toMillis(); + final long timestamp = Instant.now(clock).minus(maxEventAge).toEpochMilli(); final var deleted = collection.deleteMany(Filters.lt("timestamp", timestamp)).getDeletedCount(); LOG.debug("Removed {} stale events from \"{}\"", deleted, COLLECTION_NAME); diff --git a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java index 9f283ea57b39..a4d2a4ded71b 100644 --- a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java +++ b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java @@ -79,10 +79,15 @@ static MongoCollection prepareCollection(final MongoConnection mon .collection(COLLECTION_NAME, ClusterEvent.class) .withWriteConcern(WriteConcern.JOURNALED); + try { + collection.dropIndex("timestamp_1_producer_1_consumers_1"); + } catch (MongoException ignored) { + // Old index may not exist + } + collection.createIndex(Indexes.ascending( - "timestamp", - "producer", - "consumers")); + "consumers", + "timestamp")); return collection; } diff --git a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java index 860eb20bfd67..07dc73eaa21b 100644 --- a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java +++ b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java @@ -25,9 +25,6 @@ import org.graylog2.database.MongoCollections; import org.graylog2.database.MongoConnection; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; -import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; -import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -35,7 +32,10 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -43,8 +43,9 @@ public class ClusterEventCleanupPeriodicalTest { @Rule public final MongoDBInstance mongodb = MongoDBInstance.createForClass(); - private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC); + private static final Instant TIME = Instant.parse("2015-04-01T00:00:00Z"); private static final Duration maxEventAge = Duration.ofDays(1); + private static final Clock FIXED_CLOCK = Clock.fixed(TIME, ZoneOffset.UTC); @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); @@ -54,30 +55,28 @@ public class ClusterEventCleanupPeriodicalTest { private ClusterEventCleanupPeriodical clusterEventCleanupPeriodical; @Before - public void setUpService() throws Exception { - DateTimeUtils.setCurrentMillisFixed(TIME.getMillis()); - + public void setUpService() { this.mongoConnection = mongodb.mongoConnection(); this.clusterEventCleanupPeriodical = new ClusterEventCleanupPeriodical(new MongoCollections( new MongoJackObjectMapperProvider(objectMapper), - mongodb.mongoConnection()), maxEventAge); + mongodb.mongoConnection()), maxEventAge, FIXED_CLOCK); } @After public void tearDown() { - DateTimeUtils.setCurrentMillisSystem(); mongoConnection.getMongoDatabase().drop(); } @Test - public void testDoRun() throws Exception { - final var maxEventAgeMillis = maxEventAge.toMillis(); + public void testDoRun() { + final long maxEventAgeMillis = maxEventAge.toMillis(); + final long timeMillis = TIME.toEpochMilli(); final DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); assertThat(insertEvent(collection, 0L)).isTrue(); - assertThat(insertEvent(collection, TIME.getMillis())).isTrue(); - assertThat(insertEvent(collection, TIME.minus(maxEventAgeMillis).getMillis())).isTrue(); - assertThat(insertEvent(collection, TIME.minus(2 * maxEventAgeMillis).getMillis())).isTrue(); + assertThat(insertEvent(collection, timeMillis)).isTrue(); + assertThat(insertEvent(collection, timeMillis - maxEventAgeMillis)).isTrue(); + assertThat(insertEvent(collection, timeMillis - 2 * maxEventAgeMillis)).isTrue(); assertThat(collection.count()).isEqualTo(4L); clusterEventCleanupPeriodical.run(); @@ -85,6 +84,27 @@ public void testDoRun() throws Exception { assertThat(collection.count()).isEqualTo(2L); } + @Test + public void getPeriodSeconds_defaultAge_returns43200() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(12)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(43200); + } + + @Test + public void getPeriodSeconds_shortAge_clampsToMinimum() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofSeconds(10)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(3600); + } + + @Test + public void getPeriodSeconds_customAge_matchesMaxEventAge() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(2)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(7200); + } + private boolean insertEvent(DBCollection collection, long timestamp) { DBObject event = new BasicDBObjectBuilder() .add("timestamp", timestamp) diff --git a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java index cd04bf6941d0..ac00d418d5a2 100644 --- a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java +++ b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java @@ -38,7 +38,6 @@ import org.graylog2.shared.plugins.ChainingClassLoader; import org.graylog2.system.debug.DebugEvent; import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.Before; @@ -60,7 +59,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class ClusterEventPeriodicalTest { +class ClusterEventPeriodicalTest { @Rule public final MongoDBInstance mongodb = MongoDBInstance.createForClass(); private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC); @@ -80,9 +79,7 @@ public class ClusterEventPeriodicalTest { private MongoJackObjectMapperProvider objectMapperProvider; @Before - public void setUpService() throws Exception { - DateTimeUtils.setCurrentMillisFixed(TIME.getMillis()); - + public void setUpService() { this.mongoConnection = mongodb.mongoConnection(); this.objectMapperProvider = new MongoJackObjectMapperProvider(objectMapper); @@ -100,17 +97,16 @@ public void setUpService() throws Exception { @After public void tearDown() { - DateTimeUtils.setCurrentMillisSystem(); mongoConnection.getMongoDatabase().drop(); } @Test - public void clusterEventServiceRegistersItselfWithClusterEventBus() throws Exception { + void clusterEventServiceRegistersItselfWithClusterEventBus() { verify(clusterEventBus, times(1)).registerClusterEventSubscriber(clusterEventPeriodical); } @Test - public void runHandlesInvalidPayloadsGracefully() throws Exception { + void runHandlesInvalidPayloadsGracefully() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -137,7 +133,7 @@ public void runHandlesInvalidPayloadsGracefully() throws Exception { } @Test - public void serverEventBusDispatchesTypedEvents() throws Exception { + void serverEventBusDispatchesTypedEvents() { final SimpleEventHandler handler = new SimpleEventHandler(); serverEventBus.register(handler); @@ -169,7 +165,7 @@ public void serverEventBusDispatchesTypedEvents() throws Exception { } @Test - public void runHandlesAutoValueCorrectly() throws Exception { + void runHandlesAutoValueCorrectly() { final DebugEvent event = DebugEvent.create("Node ID", TIME, "test"); DBObject dbObject = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) @@ -197,7 +193,7 @@ public void runHandlesAutoValueCorrectly() throws Exception { } @Test - public void testRun() throws Exception { + void testRun() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -224,7 +220,7 @@ public void testRun() throws Exception { } @Test - public void testPublishClusterEvent() throws Exception { + void testPublishClusterEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); SimpleEvent event = new SimpleEvent("test"); @@ -247,7 +243,7 @@ public void testPublishClusterEvent() throws Exception { } @Test - public void publishClusterEventHandlesAutoValueCorrectly() throws Exception { + void publishClusterEventHandlesAutoValueCorrectly() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); DebugEvent event = DebugEvent.create("Node ID", "Test"); @@ -266,7 +262,7 @@ public void publishClusterEventHandlesAutoValueCorrectly() throws Exception { } @Test - public void publishClusterEventSkipsDeadEvent() throws Exception { + void publishClusterEventSkipsDeadEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); DeadEvent event = new DeadEvent(clusterEventBus, new SimpleEvent("test")); @@ -280,7 +276,7 @@ public void publishClusterEventSkipsDeadEvent() throws Exception { } @Test - public void prepareCollectionCreatesIndexesOnExistingCollection() throws Exception { + void prepareCollectionCreatesIndexesOnExistingCollection() { @SuppressWarnings("deprecation") DBCollection original = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); original.dropIndexes(); @@ -294,7 +290,7 @@ public void prepareCollectionCreatesIndexesOnExistingCollection() throws Excepti } @Test - public void prepareCollectionCreatesCollectionIfItDoesNotExist() throws Exception { + void prepareCollectionCreatesCollectionIfItDoesNotExist() { @SuppressWarnings("deprecation") final DB database = mongoConnection.getDatabase(); database.getCollection(ClusterEventPeriodical.COLLECTION_NAME).drop(); @@ -307,7 +303,7 @@ public void prepareCollectionCreatesCollectionIfItDoesNotExist() throws Exceptio } @Test - public void localNodeIsMarkedAsHavingConsumedEvent() { + void localNodeIsMarkedAsHavingConsumedEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); SimpleEvent event = new SimpleEvent("test"); @@ -320,7 +316,7 @@ public void localNodeIsMarkedAsHavingConsumedEvent() { } @Test - public void localEventIsPostedToServerBusImmediately() { + void localEventIsPostedToServerBusImmediately() { SimpleEvent event = new SimpleEvent("test"); clusterEventPeriodical.publishClusterEvent(event); @@ -329,7 +325,7 @@ public void localEventIsPostedToServerBusImmediately() { } @Test - public void localEventIsNotProcessedFromDB() { + void localEventIsNotProcessedFromDB() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -362,7 +358,7 @@ public Safe(String param) { } @Test - public void testInstantiatesSafeEventClass() { + void testInstantiatesSafeEventClass() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -381,7 +377,7 @@ public void testInstantiatesSafeEventClass() { } @Test - public void testIgnoresUnsafeEventClass() { + void testIgnoresUnsafeEventClass() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER")