diff --git a/changelog/unreleased/issue-25259.toml b/changelog/unreleased/issue-25259.toml new file mode 100644 index 000000000000..107a6479d3d5 --- /dev/null +++ b/changelog/unreleased/issue-25259.toml @@ -0,0 +1,12 @@ +type = "f" +message = "Honor the max_event_age configuration for cluster event cleanup. Default reduced from 24h to 12h." + +details.user = """ +A clean-up job runs at a period of max_event_age to remove old events from the cluster. Note that +events created just after the last clean-up job will be removed after max_event_age, so the effective +age of events in the cluster is between max_event_age and 2*max_event_age. +The default value of max_event_age is reduced from 24h to 12h to ensure that events are cleaned up in a more timely manner. +""" + +issues = ["25259"] +pulls = ["25265"] diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index 474ca0ee0576..672de8bf6531 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -39,6 +39,7 @@ import org.graylog2.cluster.lock.MongoLockService; import org.graylog2.configuration.Documentation; import org.graylog2.configuration.converters.JavaDurationConverter; +import org.graylog2.configuration.validators.PositiveJavaDurationValidator; import org.graylog2.notifications.Notification; import org.graylog2.outputs.BatchSizeConfig; import org.graylog2.plugin.Tools; @@ -298,8 +299,9 @@ public class Configuration extends CaConfiguration implements CommonNodeConfigur @Parameter(value = "global_inputs_only") private boolean globalInputsOnly = false; - @Parameter(value = "max_event_age", converter = JavaDurationConverter.class) - private java.time.Duration maxEventAge = java.time.Duration.ofDays(1L); + @Documentation("Maximum age of cluster events before cleanup. The cleanup runs at this interval, so effective event age is between max_event_age and 2*max_event_age. Minimum effective interval is 1 hour. Default: 12h") + @Parameter(value = "max_event_age", converter = JavaDurationConverter.class, validators = PositiveJavaDurationValidator.class) + private java.time.Duration maxEventAge = java.time.Duration.ofHours(12L); public boolean maintainsStreamAwareFieldTypes() { return streamAwareFieldTypes; diff --git a/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java b/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java new file mode 100644 index 000000000000..fa26f38cb6ea --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/configuration/validators/PositiveJavaDurationValidator.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.configuration.validators; + +import com.github.joschi.jadconfig.ValidationException; +import com.github.joschi.jadconfig.Validator; + +import java.time.Duration; + +public class PositiveJavaDurationValidator implements Validator { + @Override + public void validate(String name, Duration value) throws ValidationException { + if (value == null || value.isZero() || value.isNegative()) { + throw new ValidationException("Parameter " + name + " must be a positive duration (found " + value + ")"); + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java index b1a6c4926ec2..4f7b4c5ca1cc 100644 --- a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java +++ b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java @@ -25,26 +25,33 @@ import jakarta.inject.Inject; import org.graylog2.database.MongoCollections; import org.graylog2.plugin.periodical.Periodical; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Clock; import java.time.Duration; -import java.util.concurrent.TimeUnit; +import java.time.Instant; public class ClusterEventCleanupPeriodical extends Periodical { private static final Logger LOG = LoggerFactory.getLogger(ClusterEventCleanupPeriodical.class); private static final String COLLECTION_NAME = ClusterEventPeriodical.COLLECTION_NAME; + private static final long MIN_PERIOD_SECONDS = 3600; private final MongoCollection collection; private final Duration maxEventAge; + private final Clock clock; @Inject public ClusterEventCleanupPeriodical(MongoCollections mongoCollections, @Named("max_event_age") Duration maxEventAge) { + this(mongoCollections, maxEventAge, Clock.systemUTC()); + } + + @VisibleForTesting + ClusterEventCleanupPeriodical(MongoCollections mongoCollections, Duration maxEventAge, Clock clock) { this.collection = mongoCollections.collection(COLLECTION_NAME, ClusterEvent.class) .withWriteConcern(WriteConcern.JOURNALED); this.maxEventAge = maxEventAge; + this.clock = clock; } @Override @@ -79,7 +86,7 @@ public int getInitialDelaySeconds() { @Override public int getPeriodSeconds() { - return Ints.saturatedCast(TimeUnit.DAYS.toSeconds(1L)); + return Ints.saturatedCast(Math.max(MIN_PERIOD_SECONDS, maxEventAge.toSeconds())); } @Override @@ -92,7 +99,7 @@ public void doRun() { try { LOG.debug("Removing stale events from MongoDB collection \"{}\"", COLLECTION_NAME); - final long timestamp = DateTime.now(DateTimeZone.UTC).getMillis() - maxEventAge.toMillis(); + final long timestamp = Instant.now(clock).minus(maxEventAge).toEpochMilli(); final var deleted = collection.deleteMany(Filters.lt("timestamp", timestamp)).getDeletedCount(); LOG.debug("Removed {} stale events from \"{}\"", deleted, COLLECTION_NAME); diff --git a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java index 9f283ea57b39..a4d2a4ded71b 100644 --- a/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java +++ b/graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java @@ -79,10 +79,15 @@ static MongoCollection prepareCollection(final MongoConnection mon .collection(COLLECTION_NAME, ClusterEvent.class) .withWriteConcern(WriteConcern.JOURNALED); + try { + collection.dropIndex("timestamp_1_producer_1_consumers_1"); + } catch (MongoException ignored) { + // Old index may not exist + } + collection.createIndex(Indexes.ascending( - "timestamp", - "producer", - "consumers")); + "consumers", + "timestamp")); return collection; } diff --git a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java index 860eb20bfd67..07dc73eaa21b 100644 --- a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java +++ b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java @@ -25,9 +25,6 @@ import org.graylog2.database.MongoCollections; import org.graylog2.database.MongoConnection; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; -import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; -import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -35,7 +32,10 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -43,8 +43,9 @@ public class ClusterEventCleanupPeriodicalTest { @Rule public final MongoDBInstance mongodb = MongoDBInstance.createForClass(); - private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC); + private static final Instant TIME = Instant.parse("2015-04-01T00:00:00Z"); private static final Duration maxEventAge = Duration.ofDays(1); + private static final Clock FIXED_CLOCK = Clock.fixed(TIME, ZoneOffset.UTC); @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); @@ -54,30 +55,28 @@ public class ClusterEventCleanupPeriodicalTest { private ClusterEventCleanupPeriodical clusterEventCleanupPeriodical; @Before - public void setUpService() throws Exception { - DateTimeUtils.setCurrentMillisFixed(TIME.getMillis()); - + public void setUpService() { this.mongoConnection = mongodb.mongoConnection(); this.clusterEventCleanupPeriodical = new ClusterEventCleanupPeriodical(new MongoCollections( new MongoJackObjectMapperProvider(objectMapper), - mongodb.mongoConnection()), maxEventAge); + mongodb.mongoConnection()), maxEventAge, FIXED_CLOCK); } @After public void tearDown() { - DateTimeUtils.setCurrentMillisSystem(); mongoConnection.getMongoDatabase().drop(); } @Test - public void testDoRun() throws Exception { - final var maxEventAgeMillis = maxEventAge.toMillis(); + public void testDoRun() { + final long maxEventAgeMillis = maxEventAge.toMillis(); + final long timeMillis = TIME.toEpochMilli(); final DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); assertThat(insertEvent(collection, 0L)).isTrue(); - assertThat(insertEvent(collection, TIME.getMillis())).isTrue(); - assertThat(insertEvent(collection, TIME.minus(maxEventAgeMillis).getMillis())).isTrue(); - assertThat(insertEvent(collection, TIME.minus(2 * maxEventAgeMillis).getMillis())).isTrue(); + assertThat(insertEvent(collection, timeMillis)).isTrue(); + assertThat(insertEvent(collection, timeMillis - maxEventAgeMillis)).isTrue(); + assertThat(insertEvent(collection, timeMillis - 2 * maxEventAgeMillis)).isTrue(); assertThat(collection.count()).isEqualTo(4L); clusterEventCleanupPeriodical.run(); @@ -85,6 +84,27 @@ public void testDoRun() throws Exception { assertThat(collection.count()).isEqualTo(2L); } + @Test + public void getPeriodSeconds_defaultAge_returns43200() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(12)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(43200); + } + + @Test + public void getPeriodSeconds_shortAge_clampsToMinimum() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofSeconds(10)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(3600); + } + + @Test + public void getPeriodSeconds_customAge_matchesMaxEventAge() { + final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection()); + final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(2)); + assertThat(periodical.getPeriodSeconds()).isEqualTo(7200); + } + private boolean insertEvent(DBCollection collection, long timestamp) { DBObject event = new BasicDBObjectBuilder() .add("timestamp", timestamp) diff --git a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java index cd04bf6941d0..ac00d418d5a2 100644 --- a/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java +++ b/graylog2-server/src/test/java/org/graylog2/events/ClusterEventPeriodicalTest.java @@ -38,7 +38,6 @@ import org.graylog2.shared.plugins.ChainingClassLoader; import org.graylog2.system.debug.DebugEvent; import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.Before; @@ -60,7 +59,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class ClusterEventPeriodicalTest { +class ClusterEventPeriodicalTest { @Rule public final MongoDBInstance mongodb = MongoDBInstance.createForClass(); private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC); @@ -80,9 +79,7 @@ public class ClusterEventPeriodicalTest { private MongoJackObjectMapperProvider objectMapperProvider; @Before - public void setUpService() throws Exception { - DateTimeUtils.setCurrentMillisFixed(TIME.getMillis()); - + public void setUpService() { this.mongoConnection = mongodb.mongoConnection(); this.objectMapperProvider = new MongoJackObjectMapperProvider(objectMapper); @@ -100,17 +97,16 @@ public void setUpService() throws Exception { @After public void tearDown() { - DateTimeUtils.setCurrentMillisSystem(); mongoConnection.getMongoDatabase().drop(); } @Test - public void clusterEventServiceRegistersItselfWithClusterEventBus() throws Exception { + void clusterEventServiceRegistersItselfWithClusterEventBus() { verify(clusterEventBus, times(1)).registerClusterEventSubscriber(clusterEventPeriodical); } @Test - public void runHandlesInvalidPayloadsGracefully() throws Exception { + void runHandlesInvalidPayloadsGracefully() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -137,7 +133,7 @@ public void runHandlesInvalidPayloadsGracefully() throws Exception { } @Test - public void serverEventBusDispatchesTypedEvents() throws Exception { + void serverEventBusDispatchesTypedEvents() { final SimpleEventHandler handler = new SimpleEventHandler(); serverEventBus.register(handler); @@ -169,7 +165,7 @@ public void serverEventBusDispatchesTypedEvents() throws Exception { } @Test - public void runHandlesAutoValueCorrectly() throws Exception { + void runHandlesAutoValueCorrectly() { final DebugEvent event = DebugEvent.create("Node ID", TIME, "test"); DBObject dbObject = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) @@ -197,7 +193,7 @@ public void runHandlesAutoValueCorrectly() throws Exception { } @Test - public void testRun() throws Exception { + void testRun() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -224,7 +220,7 @@ public void testRun() throws Exception { } @Test - public void testPublishClusterEvent() throws Exception { + void testPublishClusterEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); SimpleEvent event = new SimpleEvent("test"); @@ -247,7 +243,7 @@ public void testPublishClusterEvent() throws Exception { } @Test - public void publishClusterEventHandlesAutoValueCorrectly() throws Exception { + void publishClusterEventHandlesAutoValueCorrectly() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); DebugEvent event = DebugEvent.create("Node ID", "Test"); @@ -266,7 +262,7 @@ public void publishClusterEventHandlesAutoValueCorrectly() throws Exception { } @Test - public void publishClusterEventSkipsDeadEvent() throws Exception { + void publishClusterEventSkipsDeadEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); DeadEvent event = new DeadEvent(clusterEventBus, new SimpleEvent("test")); @@ -280,7 +276,7 @@ public void publishClusterEventSkipsDeadEvent() throws Exception { } @Test - public void prepareCollectionCreatesIndexesOnExistingCollection() throws Exception { + void prepareCollectionCreatesIndexesOnExistingCollection() { @SuppressWarnings("deprecation") DBCollection original = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); original.dropIndexes(); @@ -294,7 +290,7 @@ public void prepareCollectionCreatesIndexesOnExistingCollection() throws Excepti } @Test - public void prepareCollectionCreatesCollectionIfItDoesNotExist() throws Exception { + void prepareCollectionCreatesCollectionIfItDoesNotExist() { @SuppressWarnings("deprecation") final DB database = mongoConnection.getDatabase(); database.getCollection(ClusterEventPeriodical.COLLECTION_NAME).drop(); @@ -307,7 +303,7 @@ public void prepareCollectionCreatesCollectionIfItDoesNotExist() throws Exceptio } @Test - public void localNodeIsMarkedAsHavingConsumedEvent() { + void localNodeIsMarkedAsHavingConsumedEvent() { @SuppressWarnings("deprecation") DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME); SimpleEvent event = new SimpleEvent("test"); @@ -320,7 +316,7 @@ public void localNodeIsMarkedAsHavingConsumedEvent() { } @Test - public void localEventIsPostedToServerBusImmediately() { + void localEventIsPostedToServerBusImmediately() { SimpleEvent event = new SimpleEvent("test"); clusterEventPeriodical.publishClusterEvent(event); @@ -329,7 +325,7 @@ public void localEventIsPostedToServerBusImmediately() { } @Test - public void localEventIsNotProcessedFromDB() { + void localEventIsNotProcessedFromDB() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -362,7 +358,7 @@ public Safe(String param) { } @Test - public void testInstantiatesSafeEventClass() { + void testInstantiatesSafeEventClass() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER") @@ -381,7 +377,7 @@ public void testInstantiatesSafeEventClass() { } @Test - public void testIgnoresUnsafeEventClass() { + void testIgnoresUnsafeEventClass() { DBObject event = new BasicDBObjectBuilder() .add("timestamp", TIME.getMillis()) .add("producer", "TEST-PRODUCER")