Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions changelog/unreleased/issue-25259.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type = "f"
message = "Honor the max_event_age configuration for cluster event cleanup. Default reduced from 24h to 12h."

details.user = """
A clean-up job runs at a period of max_event_age to remove old events from the cluster. Note that
events created just after the last clean-up job will be removed after max_event_age, so the effective
age of events in the cluster is between max_event_age and 2*max_event_age.
The default value of max_event_age is reduced from 24h to 12h to ensure that events are cleaned up in a more timely manner.
"""

issues = ["25259"]
pulls = ["25265"]
6 changes: 4 additions & 2 deletions graylog2-server/src/main/java/org/graylog2/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.graylog2.cluster.lock.MongoLockService;
import org.graylog2.configuration.Documentation;
import org.graylog2.configuration.converters.JavaDurationConverter;
import org.graylog2.configuration.validators.PositiveJavaDurationValidator;
import org.graylog2.notifications.Notification;
import org.graylog2.outputs.BatchSizeConfig;
import org.graylog2.plugin.Tools;
Expand Down Expand Up @@ -298,8 +299,9 @@ public class Configuration extends CaConfiguration implements CommonNodeConfigur
@Parameter(value = "global_inputs_only")
private boolean globalInputsOnly = false;

@Parameter(value = "max_event_age", converter = JavaDurationConverter.class)
private java.time.Duration maxEventAge = java.time.Duration.ofDays(1L);
@Documentation("Maximum age of cluster events before cleanup. The cleanup runs at this interval, so effective event age is between max_event_age and 2*max_event_age. Minimum effective interval is 1 hour. Default: 12h")
@Parameter(value = "max_event_age", converter = JavaDurationConverter.class, validators = PositiveJavaDurationValidator.class)
private java.time.Duration maxEventAge = java.time.Duration.ofHours(12L);

public boolean maintainsStreamAwareFieldTypes() {
return streamAwareFieldTypes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.configuration.validators;

import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.Validator;

import java.time.Duration;

public class PositiveJavaDurationValidator implements Validator<Duration> {
@Override
public void validate(String name, Duration value) throws ValidationException {
if (value == null || value.isZero() || value.isNegative()) {
throw new ValidationException("Parameter " + name + " must be a positive duration (found " + value + ")");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,33 @@
import jakarta.inject.Inject;
import org.graylog2.database.MongoCollections;
import org.graylog2.plugin.periodical.Periodical;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.time.Instant;

public class ClusterEventCleanupPeriodical extends Periodical {
private static final Logger LOG = LoggerFactory.getLogger(ClusterEventCleanupPeriodical.class);
private static final String COLLECTION_NAME = ClusterEventPeriodical.COLLECTION_NAME;
private static final long MIN_PERIOD_SECONDS = 3600;

private final MongoCollection<ClusterEvent> collection;
private final Duration maxEventAge;
private final Clock clock;

@Inject
public ClusterEventCleanupPeriodical(MongoCollections mongoCollections, @Named("max_event_age") Duration maxEventAge) {
this(mongoCollections, maxEventAge, Clock.systemUTC());
}

@VisibleForTesting
ClusterEventCleanupPeriodical(MongoCollections mongoCollections, Duration maxEventAge, Clock clock) {
this.collection = mongoCollections.collection(COLLECTION_NAME, ClusterEvent.class)
.withWriteConcern(WriteConcern.JOURNALED);
this.maxEventAge = maxEventAge;
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -79,7 +86,7 @@ public int getInitialDelaySeconds() {

@Override
public int getPeriodSeconds() {
return Ints.saturatedCast(TimeUnit.DAYS.toSeconds(1L));
return Ints.saturatedCast(Math.max(MIN_PERIOD_SECONDS, maxEventAge.toSeconds()));
}

@Override
Expand All @@ -92,7 +99,7 @@ public void doRun() {
try {
LOG.debug("Removing stale events from MongoDB collection \"{}\"", COLLECTION_NAME);

final long timestamp = DateTime.now(DateTimeZone.UTC).getMillis() - maxEventAge.toMillis();
final long timestamp = Instant.now(clock).minus(maxEventAge).toEpochMilli();
final var deleted = collection.deleteMany(Filters.lt("timestamp", timestamp)).getDeletedCount();

LOG.debug("Removed {} stale events from \"{}\"", deleted, COLLECTION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ static MongoCollection<ClusterEvent> prepareCollection(final MongoConnection mon
.collection(COLLECTION_NAME, ClusterEvent.class)
.withWriteConcern(WriteConcern.JOURNALED);

try {
collection.dropIndex("timestamp_1_producer_1_consumers_1");
} catch (MongoException ignored) {
// Old index may not exist
}

collection.createIndex(Indexes.ascending(
"timestamp",
"producer",
"consumers"));
"consumers",
"timestamp"));

return collection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,27 @@
import org.graylog2.database.MongoCollections;
import org.graylog2.database.MongoConnection;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

public class ClusterEventCleanupPeriodicalTest {
@Rule
public final MongoDBInstance mongodb = MongoDBInstance.createForClass();
private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC);
private static final Instant TIME = Instant.parse("2015-04-01T00:00:00Z");
private static final Duration maxEventAge = Duration.ofDays(1);
private static final Clock FIXED_CLOCK = Clock.fixed(TIME, ZoneOffset.UTC);

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
Expand All @@ -54,37 +55,56 @@ public class ClusterEventCleanupPeriodicalTest {
private ClusterEventCleanupPeriodical clusterEventCleanupPeriodical;

@Before
public void setUpService() throws Exception {
DateTimeUtils.setCurrentMillisFixed(TIME.getMillis());

public void setUpService() {
this.mongoConnection = mongodb.mongoConnection();

this.clusterEventCleanupPeriodical = new ClusterEventCleanupPeriodical(new MongoCollections(
new MongoJackObjectMapperProvider(objectMapper),
mongodb.mongoConnection()), maxEventAge);
mongodb.mongoConnection()), maxEventAge, FIXED_CLOCK);
}

@After
public void tearDown() {
DateTimeUtils.setCurrentMillisSystem();
mongoConnection.getMongoDatabase().drop();
}

@Test
public void testDoRun() throws Exception {
final var maxEventAgeMillis = maxEventAge.toMillis();
public void testDoRun() {
final long maxEventAgeMillis = maxEventAge.toMillis();
final long timeMillis = TIME.toEpochMilli();
final DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME);
assertThat(insertEvent(collection, 0L)).isTrue();
assertThat(insertEvent(collection, TIME.getMillis())).isTrue();
assertThat(insertEvent(collection, TIME.minus(maxEventAgeMillis).getMillis())).isTrue();
assertThat(insertEvent(collection, TIME.minus(2 * maxEventAgeMillis).getMillis())).isTrue();
assertThat(insertEvent(collection, timeMillis)).isTrue();
assertThat(insertEvent(collection, timeMillis - maxEventAgeMillis)).isTrue();
assertThat(insertEvent(collection, timeMillis - 2 * maxEventAgeMillis)).isTrue();
assertThat(collection.count()).isEqualTo(4L);

clusterEventCleanupPeriodical.run();

assertThat(collection.count()).isEqualTo(2L);
}

@Test
public void getPeriodSeconds_defaultAge_returns43200() {
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(12));
assertThat(periodical.getPeriodSeconds()).isEqualTo(43200);
}

@Test
public void getPeriodSeconds_shortAge_clampsToMinimum() {
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofSeconds(10));
assertThat(periodical.getPeriodSeconds()).isEqualTo(3600);
}

@Test
public void getPeriodSeconds_customAge_matchesMaxEventAge() {
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(2));
assertThat(periodical.getPeriodSeconds()).isEqualTo(7200);
}

private boolean insertEvent(DBCollection collection, long timestamp) {
DBObject event = new BasicDBObjectBuilder()
.add("timestamp", timestamp)
Expand Down
Loading
Loading